A conversation full of memories, Photo by Juri Gianfrancesco on Unsplash.

Building a Conversational Agent with Memory Microservice with OpenAI and FastAPI

Crafting Context-Aware Conversational Agents: A Deep Dive into OpenAI and FastAPI Integration

Cesar Flores
Towards Data Science
30 min read · Aug 17, 2023

Introduction

In this tutorial, we will explore the process of creating a Conversational Agent with a memory microservice using OpenAI and FastAPI. Conversational Agents have become a crucial component in various applications, including customer support, virtual assistants, and information retrieval systems. However, many traditional chatbot implementations lack the ability to retain context during a conversation, resulting in limited capabilities and frustrating user experiences. Retaining that context is especially challenging when building agent services that follow a microservice architecture.

The link to the GitHub repository is at the bottom of the article.

Motivation

The motivation behind this tutorial is to address the limitation of traditional chatbot implementations and create a Conversational Agent with a memory microservice, which becomes especially crucial when deploying agents within complex environments like Kubernetes. In Kubernetes or similar container orchestration systems, microservices are subject to frequent restarts, updates, and scaling operations. During these events, the state of the conversation in traditional buffers for chatbots would be lost, leading to disjointed interactions and poor user experiences.

By building a Conversational Agent with a memory microservice, we can ensure that crucial conversation context is preserved even in the face of microservice restarts or updates or when interactions are not continuous. This preservation of state allows the agent to seamlessly pick up conversations where they left off, maintaining continuity and providing a more natural and personalized user experience. Furthermore, this approach aligns with the best practices of modern application development, where containerized microservices often interact with other components, making the memory microservice a valuable addition to the conversational agent’s architecture in such distributed setups.

The Stack We Will Be Using

For this project, we will primarily work with the following technologies and tools:

  1. OpenAI GPT-3.5: We will leverage OpenAI’s GPT-3.5 language model, which is capable of performing various natural language processing tasks, including text generation and multi-turn conversation when the previous messages are supplied with each request. We will need an OpenAI API key, which you can generate and manage from your OpenAI account.
  2. FastAPI: FastAPI will serve as the backbone of our microservice, providing the infrastructure for handling HTTP requests, managing conversation states, and integrating with the OpenAI API. FastAPI is great for building microservices with Python.

The Development Cycle

In this section, we will dive into the step-by-step process of building our Conversational Agent with a memory microservice. The development cycle will include:

  1. Environment Setup: We’ll create a virtual environment and install the necessary dependencies, including OpenAI’s Python library and FastAPI.
  2. Designing the Memory Microservice: We’ll outline the architecture and design of the memory microservice, which will be responsible for storing and managing conversation context.
  3. Integrating OpenAI: We’ll integrate OpenAI’s GPT-3.5 model into our application and define the logic for processing user messages and generating responses.
  4. Testing: We’ll gradually test our conversational agent.

Environment Setup

For this setup, we will use the following structure to build the microservice. It makes it easy to add other services to the same project later, and I personally like this structure.

├── Dockerfile <--- Container
├── requirements.txt <--- Libraries and Dependencies
├── setup.py <--- Build and distribute microservices as Python packages
└── src
    ├── agents <--- Name of your Microservice
    │   ├── __init__.py
    │   ├── api
    │   │   ├── __init__.py
    │   │   ├── routes.py
    │   │   └── schemas.py
    │   ├── crud.py
    │   ├── database.py
    │   ├── main.py
    │   ├── models.py
    │   └── processing.py
    └── agentsfwrk <--- Name of your Common Framework
        ├── __init__.py
        ├── integrations.py
        └── logger.py

We need to create a folder named src in the project, which will contain the Python code for the services. In our case, agents contains all the code associated with our conversational agents and the API, and agentsfwrk is our common framework for use across services.

The Dockerfile contains the instructions to build the image once the code is ready, requirements.txt lists the libraries used in our project, and setup.py contains the instructions to build and distribute our project.

For now, just create the service folders along with their __init__.py files, and add the following requirements.txt and setup.py to the root of the project. Leave the Dockerfile empty, as we will come back to it in the Deployment Cycle section.

# requirements.txt
fastapi==0.95.2
ipykernel==6.22.0
jupyter-bokeh==2.0.2
jupyterlab==3.6.3
openai==0.27.6
pandas==2.0.1
sqlalchemy-orm==1.2.10
sqlalchemy==2.0.15
uvicorn<0.22.0,>=0.21.1

# setup.py
from setuptools import find_packages, setup

setup(
    name = 'conversational-agents',
    version = '0.1',
    description = 'microservices for conversational agents',
    packages = find_packages('src'),
    package_dir = {'': 'src'},
    # The fields below are optional
    author = 'XXX XXXX',
    author_email = 'XXXX@XXXXX.ai',
    maintainer = 'XXX XXXX',
    maintainer_email = 'XXXX@XXXXX.ai',
)

Let’s activate the virtual environment and run pip install -r requirements.txt in the terminal. We will not run the setup file yet, so let’s move on to the next section.

Designing the Common Framework

We will design our common framework so we can use it across all the microservices built in the project. This is not strictly necessary for small projects, but thinking about the future, you can expand it to use multiple LLM providers, add other libraries to interact with your own data (e.g. LangChain, VoCode), and add other common capabilities such as voice and image services, without needing to implement them in each microservice.

Create the folder and the files following the agentsfwrk structure. Each file and its description are below:

└── agentsfwrk <--- Name of your Common Framework
    ├── __init__.py
    ├── integrations.py
    └── logger.py

The logger is a very basic utility to set up a common logging module, and you can define it as follows:

import logging
import multiprocessing
import sys

APP_LOGGER_NAME = 'CaiApp'

def setup_applevel_logger(logger_name = APP_LOGGER_NAME, file_name = None):
    """
    Setup the logger for the application
    """
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    sh = logging.StreamHandler(sys.stdout)
    sh.setFormatter(formatter)
    logger.handlers.clear()
    logger.addHandler(sh)
    if file_name:
        fh = logging.FileHandler(file_name)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

    return logger

def get_multiprocessing_logger(file_name = None):
    """
    Setup the logger for the application for multiprocessing
    """
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    sh = logging.StreamHandler(sys.stdout)
    sh.setFormatter(formatter)

    if not len(logger.handlers):
        logger.addHandler(sh)

    if file_name:
        fh = logging.FileHandler(file_name)
        fh.setFormatter(formatter)
        logger.addHandler(fh)

    return logger


def get_logger(module_name, logger_name = None):
    """
    Get the logger for the module
    """
    return logging.getLogger(logger_name or APP_LOGGER_NAME).getChild(module_name)
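
To see why get_logger returns a child of the application logger, here is a minimal standard-library sketch of the same parent/child pattern. The in-memory stream and the simplified format string are assumptions made for illustration so that the output can be inspected; they are not part of logger.py.

```python
import io
import logging

APP_LOGGER_NAME = "CaiApp"  # same name as in logger.py

# Stand-in for setup_applevel_logger: a single handler on the application logger.
stream = io.StringIO()  # hypothetical sink, used here instead of sys.stdout
app_logger = logging.getLogger(APP_LOGGER_NAME)
app_logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(name)s - %(levelname)s - %(message)s"))
app_logger.handlers.clear()
app_logger.addHandler(handler)

# Stand-in for get_logger(__name__): a child logger with no handlers of its own.
module_logger = app_logger.getChild("agents.api.routes")
module_logger.info("Creating agent")

# The record propagates up to the handler configured on the parent.
logged = stream.getvalue()
print(logged)
```

Because the child logger propagates its records to the parent, each module only needs to call get_logger, while handlers and formatting are configured once in main.py.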

Next, our integration layer is done via the integration module. This file acts as a middleman between the microservices logic and OpenAI, and it’s designed to expose LLM providers in a common manner for our application. Here, we can implement common ways to handle exceptions, errors, retries, and timeouts in requests or in responses. I learned from a very good manager to always place an integration layer between external services/APIs and the inside world of our application.
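
As a rough illustration of the kind of retry handling such a layer can centralize, the sketch below wraps an arbitrary call in a bounded retry loop with a safe fallback answer. TransientError, call_with_retries, and flaky_provider are hypothetical names used only for this example; they stand in for the provider's exceptions and API call and are not part of the OpenAI library.

```python
import time
from typing import Callable, Tuple, Type


class TransientError(Exception):
    """Hypothetical stand-in for a provider error such as a rate limit."""


def call_with_retries(
    func: Callable[[], str],
    retry_exceptions: Tuple[Type[Exception], ...] = (TransientError,),
    max_attempts: int = 3,
    wait_seconds: float = 0.0,
    fallback: str = "Sorry, I'm having technical issues.",
) -> str:
    """Call func, retrying on the listed exceptions; return fallback if every attempt fails."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_exceptions:
            if attempt == max_attempts:
                return fallback
            time.sleep(wait_seconds)
    return fallback


calls = {"count": 0}


def flaky_provider() -> str:
    """Fails twice, then succeeds, to exercise the retry path."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("temporary failure")
    return "Hello from the model"


def always_failing() -> str:
    """Never succeeds, to exercise the fallback path."""
    raise TransientError("still failing")


result = call_with_retries(flaky_provider)
fallback_result = call_with_retries(always_failing)
print(result, "|", fallback_result)
```

Keeping this logic in one place means the endpoints never deal with provider exceptions directly; they only see an answer or a safe fallback.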

The integration code is defined below:

# integrations.py
# LLM provider common module
import json
import os
import time
from typing import Union

import openai
from openai.error import APIConnectionError, APIError, RateLimitError

import agentsfwrk.logger as logger

log = logger.get_logger(__name__)

openai.api_key = os.getenv('OPENAI_API_KEY')

class OpenAIIntegrationService:
    def __init__(
        self,
        context: Union[str, dict],
        instruction: Union[str, dict]
    ) -> None:

        self.context = context
        self.instructions = instruction

        # Chat endpoints use a list of message dicts
        if isinstance(self.context, dict):
            self.messages = []
            self.messages.append(self.context)

        # Completion endpoints use a single string
        elif isinstance(self.context, str):
            self.messages = self.instructions + self.context


    def get_models(self):
        return openai.Model.list()

    def add_chat_history(self, messages: list):
        """
        Adds chat history to the conversation.
        """
        self.messages += messages

    def answer_to_prompt(self, model: str, prompt: str, **kwargs):
        """
        Collects prompts from user, appends to messages from the same conversation
        and return responses from the gpt models.
        """
        # Preserve the messages in the conversation
        self.messages.append(
            {
                'role': 'user',
                'content': prompt
            }
        )

        retry_exceptions = (APIError, APIConnectionError, RateLimitError)
        for attempt in range(3):
            try:
                response = openai.ChatCompletion.create(
                    model = model,
                    messages = self.messages,
                    **kwargs
                )
                break
            except retry_exceptions as e:
                if attempt == 2:
                    log.error(f"Last attempt failed, Exception occurred: {e}.")
                    return {
                        "answer": "Sorry, I'm having technical issues."
                    }
                retry_time = getattr(e, 'retry_after', 3)
                log.error(f"Exception occurred: {e}. Retrying in {retry_time} seconds...")
                time.sleep(retry_time)

        response_message = response.choices[0].message["content"]
        response_data = {"answer": response_message}
        self.messages.append(
            {
                'role': 'assistant',
                'content': response_message
            }
        )

        return response_data

    def answer_to_simple_prompt(self, model: str, prompt: str, **kwargs) -> dict:
        """
        Collects context and appends a prompt from a user and return response from
        the gpt model given an instruction.
        This method only allows one message exchange.
        """

        messages = self.messages + f"\n<Client>: {prompt} \n"

        retry_exceptions = (APIError, APIConnectionError, RateLimitError)
        for attempt in range(3):
            try:
                response = openai.Completion.create(
                    model = model,
                    prompt = messages,
                    **kwargs
                )
                break
            except retry_exceptions as e:
                if attempt == 2:
                    log.error(f"Last attempt failed, Exception occurred: {e}.")
                    return {
                        "intent": False,
                        "answer": "Sorry, I'm having technical issues."
                    }
                retry_time = getattr(e, 'retry_after', 3)
                log.error(f"Exception occurred: {e}. Retrying in {retry_time} seconds...")
                time.sleep(retry_time)

        response_message = response.choices[0].text

        try:
            response_data = json.loads(response_message)
            answer_text = response_data.get('answer')
            if answer_text is not None:
                self.messages = self.messages + f"\n<Client>: {prompt} \n" + f"<Agent>: {answer_text} \n"
            else:
                raise ValueError("The response from the model is not valid.")
        except ValueError as e:
            log.error(f"Error occurred while parsing response: {e}")
            log.error(f"Prompt from the user: {prompt}")
            log.error(f"Response from the model: {response_message}")
            log.info("Returning a safe response to the user.")
            response_data = {
                "intent": False,
                "answer": response_message
            }

        return response_data


    def verify_end_conversation(self):
        """
        Verify if the conversation has ended by checking the last message from the user
        and the last message from the assistant.
        """
        pass

    def verify_goal_conversation(self, model: str, **kwargs):
        """
        Verify if the conversation has reached the goal by checking the conversation history.
        Format the response as specified in the instructions.
        """
        messages = self.messages.copy()
        messages.append(self.instructions)

        retry_exceptions = (APIError, APIConnectionError, RateLimitError)
        for attempt in range(3):
            try:
                response = openai.ChatCompletion.create(
                    model = model,
                    messages = messages,
                    **kwargs
                )
                break
            except retry_exceptions as e:
                if attempt == 2:
                    log.error(f"Last attempt failed, Exception occurred: {e}.")
                    raise
                retry_time = getattr(e, 'retry_after', 3)
                log.error(f"Exception occurred: {e}. Retrying in {retry_time} seconds...")
                time.sleep(retry_time)

        response_message = response.choices[0].message["content"]
        try:
            response_data = json.loads(response_message)
            if response_data.get('summary') is None:
                raise ValueError("The response from the model is not valid. Missing summary.")
        except ValueError as e:
            log.error(f"Error occurred while parsing response: {e}")
            log.error(f"Response from the model: {response_message}")
            log.info("Returning a safe response to the user.")
            raise

        return response_data

Some notes about the integration module:

  • The OpenAI key is read from an environment variable named “OPENAI_API_KEY”. We should generate this key and define the variable in our terminal or with the python-dotenv library.
  • There are two methods to integrate with GPT models, one for the chat endpoint (answer_to_prompt) and one for the completion endpoint (answer_to_simple_prompt). We will focus on the usage of the first one.
  • There is a method to check the goal of a conversation — verify_goal_conversation, which simply follows the instructions of agents and creates a summary of it.
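
To make the chat path more concrete, the following sketch mirrors how answer_to_prompt grows the message list, with the OpenAI call replaced by a stub. ChatMemory and fake_model are hypothetical names for this illustration only, and the example messages are made up.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]


class ChatMemory:
    """Keeps the chat-format message list that a chat completion call would receive."""

    def __init__(self, context: Message) -> None:
        # The system message (the agent's context) always comes first.
        self.messages: List[Message] = [context]

    def add_chat_history(self, messages: List[Message]) -> None:
        self.messages += messages

    def answer_to_prompt(self, prompt: str, model_call: Callable[[List[Message]], str]) -> dict:
        # Store the user's prompt, call the model with the full history, store the answer.
        self.messages.append({"role": "user", "content": prompt})
        answer = model_call(self.messages)
        self.messages.append({"role": "assistant", "content": answer})
        return {"answer": answer}


def fake_model(messages: List[Message]) -> str:
    """Hypothetical stand-in for the OpenAI call: reports how many messages it received."""
    return f"I can see {len(messages)} messages."


memory = ChatMemory({"role": "system", "content": "You are an agent that answers questions about stocks."})
memory.add_chat_history([{"role": "assistant", "content": "Hello, how can I help you?"}])
reply = memory.answer_to_prompt("What is a stock?", fake_model)
roles = [message["role"] for message in memory.messages]
print(reply, roles)
```

The model itself keeps no state between calls, so every request carries the whole list; this is why the history has to be stored somewhere that survives restarts.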

Designing the (Memory) Microservice

The best exercise is to design the service and then draw a diagram to visualize what it needs to do, including the actors and their actions when interacting with it. Let’s start by describing our application in simple terms:

  • Our microservice is a provider of artificially intelligent agents, which are experts on a subject and are expected to have conversations in response to an outbound message and following prompts.
  • Our agents can hold multiple conversations and are packed with memory that is to be persisted, which means they must be able to retain the conversation history regardless of the session of the client who is interacting with the agents.
  • The agents should receive, at creation, clear instructions on how to handle a conversation and respond accordingly during the course of it.
  • For programmatic integration, the agents should also follow an expected response shape.

Our design looks like the following diagram:

Conversational Agents Design — Image by author

With this simple diagram, we know that our microservice needs to implement methods that are responsible for these specific tasks:

  1. Creation of agents & definition of instructions
  2. Conversation starter & preservation of conversation history
  3. Chat with agents

We will code these functionalities in order, but before we dive into that, we will build the skeleton of our application.

Application Skeleton

To kickstart the development, we begin by building the FastAPI app skeleton. The app skeleton consists of essential components, including the main application script, database configuration, processing script, and routing modules. The main script serves as the entry point for the application, where we set up the FastAPI instance.

Main File

Let’s create/open the main.py file in your agents folder and type the following code, which simply defines a root endpoint.

from fastapi import FastAPI

from agentsfwrk.logger import setup_applevel_logger

log = setup_applevel_logger(file_name = 'agents.log')

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello there conversational ai user!"}

Database Configuration

We then create/open the database configuration script called database.py, which establishes the connection to our local database for storing and retrieving conversation context. We will start by using a local SQLite for simplicity, but feel free to try other databases for your environment.

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///agents.db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args = {"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit = False, autoflush = False, bind = engine)

Base = declarative_base()
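
The reason for writing to a database file rather than an in-process buffer can be shown with a small standard-library sketch. It uses sqlite3 directly instead of SQLAlchemy, and the file name, table, and helper functions are assumptions made only for this example.

```python
import os
import sqlite3
import tempfile

# Hypothetical database file in a temporary folder, so the example leaves agents.db untouched.
db_path = os.path.join(tempfile.mkdtemp(), "agents_demo.db")


def save_message(path: str, text: str) -> None:
    """Open a fresh connection, store one message, and close the connection."""
    conn = sqlite3.connect(path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY, body TEXT)")
        conn.execute("INSERT INTO messages (body) VALUES (?)", (text,))
        conn.commit()
    finally:
        conn.close()


def load_messages(path: str) -> list:
    """Open another connection, as a restarted service would, and read everything back."""
    conn = sqlite3.connect(path)
    try:
        return [row[0] for row in conn.execute("SELECT body FROM messages ORDER BY id")]
    finally:
        conn.close()


save_message(db_path, "Hello")
save_message(db_path, "How can I help?")
restored = load_messages(db_path)
print(restored)
```

Each call uses its own connection, which loosely imitates a service that has been restarted between requests: the history is still there because it lives in the file.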

API Routes

Finally, we define routing modules that handle incoming HTTP requests, encompassing endpoints responsible for processing user interactions. Let’s create the api folder, create/open the routes.py file, and paste the following code.

from typing import List

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

import agents.api.schemas
import agents.models
from agents.database import SessionLocal, engine

from agentsfwrk import integrations, logger

log = logger.get_logger(__name__)

agents.models.Base.metadata.create_all(bind = engine)

# Router basic information
router = APIRouter(
    prefix = "/agents",
    tags = ["Chat"],
    responses = {404: {"description": "Not found"}}
)

# Dependency: Used to get the database in our endpoints.
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Root endpoint for the router.
@router.get("/")
async def agents_root():
    return {"message": "Hello there conversational ai!"}

With this structured skeleton, we are ready to start coding the application we designed.
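
The get_db dependency in the skeleton relies on a generator with try/finally so that the session is always closed. A minimal sketch of that lifecycle, driven by hand, looks like this; FakeSession is a hypothetical stand-in for a SQLAlchemy session, and the manual next/close calls only approximate what the framework does around a request.

```python
class FakeSession:
    """Hypothetical stand-in for a database session; it only tracks whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def get_db():
    db = FakeSession()
    try:
        yield db      # the endpoint runs while the generator is paused here
    finally:
        db.close()    # runs once the generator is finalized, even after an error


dependency = get_db()
session = next(dependency)            # roughly what happens before the endpoint is called
open_during_request = not session.closed
dependency.close()                    # roughly what happens once the request is finished
closed_after_request = session.closed
print(open_during_request, closed_after_request)
```

Each request therefore gets its own session, and no session is left open if an endpoint raises.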

Create Agents and Assign Instructions

In this section, we will focus on implementing the “Create Agent” endpoint. This endpoint enables users to initiate new conversations and interact with agents, providing a context and a set of instructions for the agent to follow throughout the rest of the conversation. We will start by introducing two Data Models for this process: One for the Database and another one for the API. We will be using Pydantic for our data models. Create/Open the schemas.py file in the api folder, and define the Agent base, Agent Create, and Agent data model.

from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel

class AgentBase(BaseModel): # <-- Base model
    context: str # <-- Our agents context
    first_message: str # <-- Our agents will approach the users with a first message.
    response_shape: str # <-- The expected shape (for programmatic communication) of the response of each agent's interaction with the user
    instructions: str # <-- Set of instructions that our agent should follow.

class AgentCreate(AgentBase): # <-- Creation data model
    pass

class Agent(AgentBase): # <-- Agent data model
    id: str
    timestamp: datetime = datetime.utcnow()

    class Config:
        orm_mode = True

The fields in the agent’s data model are detailed below:

  • Context: This is an overall context of what the agent is.
  • First message: Our agents are intended to start a conversation with the users. This can be as simple as “Hello, how can I help you?” or something like “Hi, you requested an agent to help you find information about stocks, is that correct?”.
  • Response shape: This field is mainly used for specifying the output format of our agent’s response and should be used for transforming the text output of our LLM to a desired shape for programmatic communication. For example, we may want to specify that our agent should wrap the response in a JSON format with a key named response, e.g. {'response': "string"}.
  • Instructions: This field holds the instructions and guidelines each agent should follow during the whole conversation, such as “Gather the following entities [e1, e2, e3, …] during each interaction” or “Reply to the user until he is no longer interested in the conversation” or “Don’t deviate from the main topic and drive the conversation back to the main goal when needed”.
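
As an illustration of why the response shape matters for programmatic use, the sketch below parses a model reply that is expected to be a JSON object with a response key, and falls back to the raw text when the model does not comply. parse_agent_reply and the parsed flag are hypothetical and not part of the article's code; the sample replies are made up.

```python
import json


def parse_agent_reply(raw_reply: str, key: str = "response") -> dict:
    """Return the value under key if the reply is a JSON object containing it, else the raw text."""
    try:
        data = json.loads(raw_reply)
    except ValueError:
        # Not valid JSON at all (json.JSONDecodeError is a subclass of ValueError).
        return {key: raw_reply, "parsed": False}
    if not isinstance(data, dict) or key not in data:
        return {key: raw_reply, "parsed": False}
    return {key: data[key], "parsed": True}


good = parse_agent_reply('{"response": "Sure, which stock are you interested in?"}')
bad = parse_agent_reply("{'response': 'single quotes are not valid JSON'}")
print(good)
print(bad)
```

Note that a shape written with single quotes, as in the example above, is a Python-style literal rather than strict JSON, so a fallback like this one is worth having if the model echoes the shape literally.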

We now proceed to open the models.py file, where we will code our database table that belongs to the agent's entity.

from sqlalchemy import Column, ForeignKey, String, DateTime, JSON
from sqlalchemy.orm import relationship
from datetime import datetime

from agents.database import Base

class Agent(Base):
    __tablename__ = "agents"

    id = Column(String, primary_key = True, index = True)
    timestamp = Column(DateTime, default = datetime.utcnow)

    context = Column(String, nullable = False)
    first_message = Column(String, nullable = False)
    response_shape = Column(JSON, nullable = False)
    instructions = Column(String, nullable = False)

This code is very similar to the Pydantic model; it defines the agent table in our database.

With our two data models in place, we are ready to implement the creation of the Agent. For this, we will start by modifying the routes.py file and adding the endpoint:

@router.post("/create-agent", response_model = agents.api.schemas.Agent)
async def create_agent(agent: agents.api.schemas.AgentCreate, db: Session = Depends(get_db)):
    """
    Create an agent
    """
    log.info("Creating agent")
    # Placeholder: db_agent stays undefined until the CRUD function is written in the next step.
    # db_agent = agents.crud.create_agent(db, agent)
    log.info(f"Agent created with id: {db_agent.id}")

    return db_agent

We need to create a new function that receives an Agent object from the request and creates it in the database. For this, we will create/open the crud.py file, which will hold all the interactions with the database (CREATE, READ, UPDATE, DELETE).

# crud.py
import uuid
from sqlalchemy.orm import Session
from agents import models
from agents.api import schemas

def create_agent(db: Session, agent: schemas.AgentCreate):
    """
    Create an agent in the database
    """
    db_agent = models.Agent(
        id = str(uuid.uuid4()),
        context = agent.context,
        first_message = agent.first_message,
        response_shape = agent.response_shape,
        instructions = agent.instructions
    )
    db.add(db_agent)
    db.commit()
    db.refresh(db_agent)

    return db_agent

With our function created, we can now go back to routes.py, import the crud module, and use it in the endpoint’s method.

import agents.crud

@router.post("/create-agent", response_model = agents.api.schemas.Agent)
async def create_agent(agent: agents.api.schemas.AgentCreate, db: Session = Depends(get_db)):
    """
    Create an agent endpoint.
    """
    log.info(f"Creating agent: {agent.json()}")
    db_agent = agents.crud.create_agent(db, agent)
    log.info(f"Agent created with id: {db_agent.id}")

    return db_agent

Now let’s go back to the main.py file and add the “agents” router. The modifications are marked with NOTE comments:

# main.py
from fastapi import FastAPI

from agents.api.routes import router as ai_agents # NOTE: <-- new addition
from agentsfwrk.logger import setup_applevel_logger

log = setup_applevel_logger(file_name = 'agents.log')

app = FastAPI()
app.include_router(router = ai_agents) # NOTE: <-- new addition

@app.get("/")
async def root():
    return {"message": "Hello there conversational ai user!"}

Let’s test this functionality. First, we need to install our services as a Python package; second, we start the application on port 8000.

# Run from the root of the project.
$ pip install -e .
# Command to run the app.
$ uvicorn agents.main:app --host 0.0.0.0 --port 8000 --reload

Navigate to http://0.0.0.0:8000/docs, where you will see the Swagger UI with the endpoint to test. Submit your payload and check the output.

create-agent endpoint from Swagger UI — Image by author

We will continue developing our application, but testing the first endpoint is a good sign of progress.
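
If you prefer a script over the Swagger UI, a request to the same endpoint can be assembled with the standard library. This is a minimal sketch that assumes the service is reachable at http://localhost:8000; the payload values are made up for illustration, and the sending part is left commented out so the snippet can be read and run without a server.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # assumption: the uvicorn command above is running locally

# Hypothetical payload; the keys follow the AgentCreate schema.
payload = {
    "context": "You are an agent that helps users find information about stocks.",
    "first_message": "Hello, how can I help you?",
    "response_shape": "{'response': 'string'}",
    "instructions": "Don't deviate from the main topic.",
}


def build_create_agent_request(body: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request for the create-agent endpoint."""
    return urllib.request.Request(
        url=f"{BASE_URL}/agents/create-agent",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_agent_request(payload)
print(request.get_method(), request.full_url)

# Uncomment to send the request once the service is running:
# with urllib.request.urlopen(request) as response:
#     agent = json.loads(response.read())
#     print(agent["id"])
```

The id in the response is the value to keep, since the following endpoints need it to attach conversations to this agent.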

Create Conversations & Preserve Conversation History

Our next step is to allow users to interact with our agents. We want users to interact with specific agents, so we will need to pass the ID of the agent along with the first interaction message from the user. Let’s make some modifications to the Agent data model so each agent can have multiple conversations by introducing the Conversation entity. Open the schemas.py file and add the following models:

class ConversationBase(BaseModel): # <-- base of our conversations, they must belong to an agent
    agent_id: str

class ConversationCreate(ConversationBase): # <-- conversation creation object
    pass

class Conversation(ConversationBase): # <-- The conversation objects
    id: str
    timestamp: datetime = datetime.utcnow()

    class Config:
        orm_mode = True

class Agent(AgentBase): # <-- Agent data model
    id: str
    timestamp: datetime = datetime.utcnow()
    conversations: List[Conversation] = [] # <-- NOTE: we have added the conversation as a list of Conversations objects.

    class Config:
        orm_mode = True

Note that we have modified the Agent data model and added conversations to it, so that each agent can hold multiple conversations, as designed in our diagram.

We have to modify our database object and include the conversation table in the database model script. We will open the models.py file and modify the code as follows:

# models.py

class Agent(Base):
    __tablename__ = "agents"

    id = Column(String, primary_key = True, index = True)
    timestamp = Column(DateTime, default = datetime.utcnow)

    context = Column(String, nullable = False)
    first_message = Column(String, nullable = False)
    response_shape = Column(JSON, nullable = False)
    instructions = Column(String, nullable = False)

    conversations = relationship("Conversation", back_populates = "agent") # <-- NOTE: We add the conversation relationship into the agents table

class Conversation(Base):
    __tablename__ = "conversations"

    id = Column(String, primary_key = True, index = True)
    agent_id = Column(String, ForeignKey("agents.id"))
    timestamp = Column(DateTime, default = datetime.utcnow)

    agent = relationship("Agent", back_populates = "conversations") # <-- We add the relationship between the conversation and the agent

Note how we added the relationship between the conversations per each agent in the agents table, and also the relationship between a conversation with an agent in the conversations table.
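
Underneath the ORM, this relationship is a plain one-to-many link through a foreign key. The sketch below shows the same idea with the standard-library sqlite3 module and an in-memory database. The table layout is simplified, and foreign key enforcement is switched on explicitly for this connection as an assumption of the example; it is not something the article's database.py configures.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # assumption: enforce foreign keys on this connection
conn.execute("CREATE TABLE agents (id TEXT PRIMARY KEY, context TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE conversations (id TEXT PRIMARY KEY, agent_id TEXT REFERENCES agents(id))"
)

# One agent holding two conversations.
agent_id = str(uuid.uuid4())
conn.execute("INSERT INTO agents VALUES (?, ?)", (agent_id, "stock expert"))
for _ in range(2):
    conn.execute("INSERT INTO conversations VALUES (?, ?)", (str(uuid.uuid4()), agent_id))

count = conn.execute(
    "SELECT COUNT(*) FROM conversations WHERE agent_id = ?", (agent_id,)
).fetchone()[0]

# A conversation pointing at an agent that does not exist is rejected.
try:
    conn.execute("INSERT INTO conversations VALUES (?, ?)", (str(uuid.uuid4()), "missing-agent"))
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True

print(count, orphan_rejected)
```

The relationship attributes in models.py expose this same link as Python attributes, so agent.conversations and conversation.agent can be read without writing the join by hand.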

We will now create a set of CRUD functions to retrieve the agent and conversations by their IDs, which will help us to craft our process of creating a conversation and preserving its history. Let’s open the crud.py file and add the following functions:

def get_agent(db: Session, agent_id: str):
    """
    Get an agent by its id
    """
    return db.query(models.Agent).filter(models.Agent.id == agent_id).first()

def get_conversation(db: Session, conversation_id: str):
    """
    Get a conversation by its id
    """
    return db.query(models.Conversation).filter(models.Conversation.id == conversation_id).first()

def create_conversation(db: Session, conversation: schemas.ConversationCreate):
    """
    Create a conversation
    """
    db_conversation = models.Conversation(
        id = str(uuid.uuid4()),
        agent_id = conversation.agent_id,
    )
    db.add(db_conversation)
    db.commit()
    db.refresh(db_conversation)

    return db_conversation

These new functions will help us during the normal workflow of our application: we can now get an agent by its ID, get a conversation by its ID, and create a conversation by providing the ID of the agent that should hold it. The conversation ID itself is generated for us.

We can go ahead and create an endpoint that creates a conversation. Open the routes.py and add the following code:

@router.post("/create-conversation", response_model = agents.api.schemas.Conversation)
async def create_conversation(conversation: agents.api.schemas.ConversationCreate, db: Session = Depends(get_db)):
    """
    Create a conversation linked to an agent
    """
    log.info(f"Creating conversation assigned to agent id: {conversation.agent_id}")
    db_conversation = agents.crud.create_conversation(db, conversation)
    log.info(f"Conversation created with id: {db_conversation.id}")

    return db_conversation

With this method ready we are still one step away from having an actual conversational endpoint, which we will review next.

It is important to make a distinction here. When we initialize an agent, we can either create a conversation up front without triggering a two-way exchange of messages, or create the conversation only when the “Chat with an agent” endpoint is called. This provides some flexibility in orchestrating workflows outside the microservice: in some cases you may want to initialize the agents, kick off conversations with clients ahead of time, and start preserving the message history as messages come in.

create-conversation endpoint from Swagger UI — Image by author
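
The two ways of obtaining a conversation described above can be sketched with an in-memory dictionary standing in for the conversations table. All function names here are hypothetical and only illustrate the trade-off; the real service would go through crud.py.

```python
import uuid
from typing import Dict, Optional

# Hypothetical in-memory stand-in for the conversations table.
conversations: Dict[str, dict] = {}


def create_conversation(agent_id: str) -> dict:
    """Create a conversation up front, before any message is exchanged."""
    conversation = {"id": str(uuid.uuid4()), "agent_id": agent_id}
    conversations[conversation["id"]] = conversation
    return conversation


def get_conversation_strict(conversation_id: str) -> dict:
    """Strict variant: the caller must have created the conversation first."""
    if conversation_id not in conversations:
        raise LookupError("Conversation not found. Please create conversation first.")
    return conversations[conversation_id]


def get_or_create_conversation(conversation_id: Optional[str], agent_id: str) -> dict:
    """Lenient variant: create the conversation on the fly when the first message arrives."""
    if conversation_id in conversations:
        return conversations[conversation_id]
    return create_conversation(agent_id)


first = create_conversation("agent-1")
same = get_or_create_conversation(first["id"], "agent-1")
fresh = get_or_create_conversation(None, "agent-1")

try:
    get_conversation_strict("unknown-id")
    strict_failed = False
except LookupError:
    strict_failed = True

print(same["id"] == first["id"], fresh["id"] != first["id"], strict_failed)
```

The strict variant keeps orchestration outside the service explicit, while the lenient one saves the client a round trip at the cost of needing the agent ID in the chat request.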

Important Note: if you are following this guide step by step and see an error related to the database schema at this point, it is because we are not applying migrations to the database with each modification of the schemas. Make sure you stop the application (exit the terminal command) and delete the agents.db file that is created at runtime. You will need to run each endpoint again and take note of the new IDs.

Chat with an agent

We are going to introduce the last entity type in our application which is the Message entity. This one is responsible for modeling the interaction between a client’s message and an agent’s message (two-way exchange of messages). We will also add API data models that are used to define the structure of the response of our endpoints. Let’s go ahead and create the data models and API response types first; open the schemas.py file, and modify the code:

##########################################
# Internal schemas
##########################################
class MessageBase(BaseModel): # <-- Every message is composed by user/client message and the agent
    user_message: str
    agent_message: str

class MessageCreate(MessageBase):
    pass

class Message(MessageBase): # <-- Data model for the Message entity
    id: str
    timestamp: datetime = datetime.utcnow()
    conversation_id: str

    class Config:
        orm_mode = True

##########################################
# API schemas
##########################################
class UserMessage(BaseModel):
    conversation_id: str
    message: str

class ChatAgentResponse(BaseModel):
    conversation_id: str
    response: str

We now have to add the data model in our database models script which represents the table in our database. Open the models.py file and modify as below:

# models.py

class Conversation(Base):
    __tablename__ = "conversations"

    id = Column(String, primary_key = True, index = True)
    agent_id = Column(String, ForeignKey("agents.id"))
    timestamp = Column(DateTime, default = datetime.utcnow)

    agent = relationship("Agent", back_populates = "conversations")
    messages = relationship("Message", back_populates = "conversation") # <-- We define the relationship between the conversation and the multiple messages in them.

class Message(Base):
    __tablename__ = "messages"

    id = Column(String, primary_key = True, index = True)
    timestamp = Column(DateTime, default = datetime.utcnow)

    user_message = Column(String)
    agent_message = Column(String)

    conversation_id = Column(String, ForeignKey("conversations.id")) # <-- A message belongs to a conversation
    conversation = relationship("Conversation", back_populates = "messages") # <-- We define the relationship between the messages and the conversation.

Note that we have modified our Conversations table to define the relationship between messages and conversation and we created a new table that represents the interactions (exchange of messages) that should belong to a conversation.
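
Since each stored row holds one user message and the agent's reply to it, the chat history can be rebuilt by expanding every row into two chat-format messages. The following sketch shows one possible way to do that; build_chat_history and the sample rows are hypothetical and use plain tuples in place of the ORM objects.

```python
from typing import Dict, List, Tuple


def build_chat_history(rows: List[Tuple[str, str]]) -> List[Dict[str, str]]:
    """Expand (user_message, agent_message) rows into alternating user/assistant messages."""
    history: List[Dict[str, str]] = []
    for user_message, agent_message in rows:
        history.append({"role": "user", "content": user_message})
        history.append({"role": "assistant", "content": agent_message})
    return history


# Made-up rows, ordered from oldest to newest.
stored_rows = [
    ("What is a stock?", "A stock is a share of ownership in a company."),
    ("Thanks!", "You're welcome."),
]
history = build_chat_history(stored_rows)
print(history)
```

The resulting list has the shape expected by add_chat_history in the integration module, which is how a restarted service could pick a conversation up where it left off.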

We are now going to add a new CRUD function to interact with the database and create a message for a conversation. Let’s open the crud.py file and add the following function:

def create_conversation_message(db: Session, message: schemas.MessageCreate, conversation_id: str):
    """
    Create a message for a conversation
    """
    db_message = models.Message(
        id = str(uuid.uuid4()),
        user_message = message.user_message,
        agent_message = message.agent_message,
        conversation_id = conversation_id
    )
    db.add(db_message)
    db.commit()
    db.refresh(db_message)

    return db_message

Now we are ready to build the final and most interesting endpoint, the chat-agent endpoint. Let’s open the routes.py file and follow the code along as we will be implementing some processing functions on the way.

@router.post("/chat-agent", response_model = agents.api.schemas.ChatAgentResponse)
async def chat_completion(message: agents.api.schemas.UserMessage, db: Session = Depends(get_db)):
    """
    Get a response from the GPT model given a message from the client using the chat
    completion endpoint.

    The response is a json object with the following structure:
    ```
    {
        "conversation_id": "string",
        "response": "string"
    }
    ```
    """
    log.info(f"User conversation id: {message.conversation_id}")
    log.info(f"User message: {message.message}")

    conversation = agents.crud.get_conversation(db, message.conversation_id)

    if not conversation:
        # If there are no conversations, we can choose to create one on the fly OR raise an exception.
        # Which ever you choose, make sure to uncomment when necessary.

        # Option 1:
        # conversation = agents.crud.create_conversation(db, message.conversation_id)

        # Option 2: raise (not return) so FastAPI turns the exception into a 404 response.
        raise HTTPException(
            status_code = 404,
            detail = "Conversation not found. Please create conversation first."
        )

    log.info(f"Conversation id: {conversation.id}")

In this section of the endpoint, we make sure the conversation exists: we either create it or raise an exception. The next step is to prepare the data that will be sent to OpenAI via our integration. For this, we will create a set of processing functions in the processing.py file that craft the context, first message, instructions, and expected response shape for the LLM.

# processing.py

import json

########################################
# Chat Properties
########################################
def craft_agent_chat_context(context: str) -> dict:
    """
    Craft the context for the agent to use for chat endpoints.
    """
    agent_chat_context = {
        "role": "system",
        "content": context
    }
    return agent_chat_context

def craft_agent_chat_first_message(content: str) -> dict:
    """
    Craft the first message for the agent to use for chat endpoints.
    """
    agent_chat_first_message = {
        "role": "assistant",
        "content": content
    }
    return agent_chat_first_message

def craft_agent_chat_instructions(instructions: str, response_shape: str) -> dict:
    """
    Craft the instructions for the agent to use for chat endpoints.
    """
    agent_instructions = {
        "role": "user",
        "content": instructions + f"\n\nFollow a RFC8259 compliant JSON with a shape of: {json.dumps(response_shape)} format without deviation."
    }
    return agent_instructions

Note that the last function expects the response_shape defined during the creation of the agent. This input is appended to the instructions sent to the LLM during the course of a conversation, guiding the agent to follow the guidelines and return the response as a JSON object.
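To make the effect of the response shape concrete, the sketch below repeats the last function so it can run on its own and prints the instruction message it produces. The instructions and response_shape values are hypothetical; in the service they come from the agent stored in the database.

```python
import json

def craft_agent_chat_instructions(instructions: str, response_shape: str) -> dict:
    """Same logic as the function above, repeated so this snippet is self-contained."""
    return {
        "role": "user",
        "content": instructions
        + f"\n\nFollow a RFC8259 compliant JSON with a shape of: {json.dumps(response_shape)} format without deviation.",
    }

# Hypothetical values used only for illustration.
message = craft_agent_chat_instructions(
    instructions="Extract the recipes mentioned in the conversation.",
    response_shape="{'recipes': 'List of strings'}",
)
print(message["content"])
```

Because response_shape is already a string, json.dumps wraps it in double quotes rather than re-encoding it as a JSON object, so the model sees the shape as a quoted description.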

Let’s go back to the routes.py file and finish our endpoint implementation:

# New imports from the processing module.
from agents.processing import (
    craft_agent_chat_context,
    craft_agent_chat_first_message,
    craft_agent_chat_instructions
)

@router.post("/chat-agent", response_model = agents.api.schemas.ChatAgentResponse)
async def chat_completion(message: agents.api.schemas.UserMessage, db: Session = Depends(get_db)):
    """
    Get a response from the GPT model given a message from the client using the chat
    completion endpoint.

    The response is a json object with the following structure:
    ```
    {
        "conversation_id": "string",
        "response": "string"
    }
    ```
    """
    log.info(f"User conversation id: {message.conversation_id}")
    log.info(f"User message: {message.message}")

    conversation = agents.crud.get_conversation(db, message.conversation_id)

    if not conversation:
        # If there are no conversations, we can choose to create one on the fly OR raise an exception.
        # Whichever you choose, make sure to uncomment when necessary.

        # Option 1:
        # conversation = agents.crud.create_conversation(db, message.conversation_id)

        # Option 2:
        # NOTE: The exception must be raised (not returned) so FastAPI sends a 404 response.
        raise HTTPException(
            status_code = 404,
            detail = "Conversation not found. Please create conversation first."
        )

    log.info(f"Conversation id: {conversation.id}")

    # NOTE: We are crafting the context first and passing the chat messages in a list
    # appending the first message (the approach from the agent) to it.
    context = craft_agent_chat_context(conversation.agent.context)
    chat_messages = [craft_agent_chat_first_message(conversation.agent.first_message)]

    # NOTE: Append to the conversation all messages until the last interaction from the agent
    # If there are no messages, then this has no effect.
    # Otherwise, we append each in order by timestamp (which makes logical sense).
    hist_messages = conversation.messages
    hist_messages.sort(key = lambda x: x.timestamp, reverse = False)
    if len(hist_messages) > 0:
        for mes in hist_messages:
            log.info(f"Conversation history message: {mes.user_message} | {mes.agent_message}")
            chat_messages.append(
                {
                    "role": "user",
                    "content": mes.user_message
                }
            )
            chat_messages.append(
                {
                    "role": "assistant",
                    "content": mes.agent_message
                }
            )
        # NOTE: We could control the conversation by simply adding
        # rules to the length of the history.
        if len(hist_messages) > 10:
            # Finish the conversation gracefully.
            log.info("Conversation history is too long, finishing conversation.")
            api_response = agents.api.schemas.ChatAgentResponse(
                conversation_id = message.conversation_id,
                response = "This conversation is over, good bye."
            )
            return api_response

    # Send the message to the AI agent and get the response
    service = integrations.OpenAIIntegrationService(
        context = context,
        instruction = craft_agent_chat_instructions(
            conversation.agent.instructions,
            conversation.agent.response_shape
        )
    )
    service.add_chat_history(messages = chat_messages)

    response = service.answer_to_prompt(
        # We can test different OpenAI models.
        model = "gpt-3.5-turbo",
        prompt = message.message,
        # We can test different parameters too.
        temperature = 0.5,
        max_tokens = 1000,
        frequency_penalty = 0.5,
        presence_penalty = 0
    )

    log.info(f"Agent response: {response}")

    # Prepare response to the user
    api_response = agents.api.schemas.ChatAgentResponse(
        conversation_id = message.conversation_id,
        response = response.get('answer')
    )

    # Save interaction to database
    db_message = agents.crud.create_conversation_message(
        db = db,
        conversation_id = conversation.id,
        message = agents.api.schemas.MessageCreate(
            user_message = message.message,
            agent_message = response.get('answer'),
        ),
    )
    log.info(f"Conversation message id {db_message.id} saved to database")

    return api_response

Voilà! This is our final endpoint implementation. If we look at the notes added to the code, we see that the process is quite straightforward:

  1. We make sure the conversation exists in our database (or we create one)
  2. We craft the context and instructions to the agent from our database
  3. We make use of the “memory” of the agent by pulling the conversation history
  4. Finally, we request the agent response through OpenAI’s GPT-3.5 Turbo model and return the response to the client.
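The memory step above can be sketched as a pure function that rebuilds the list of chat messages from stored interactions. This is a simplified illustration, not the service's actual implementation: StoredMessage and build_chat_messages are hypothetical names, and placing the instructions right before the new prompt is an assumption, since the real ordering is decided inside OpenAIIntegrationService.

```python
from datetime import datetime
from typing import NamedTuple

class StoredMessage(NamedTuple):
    """Hypothetical stand-in for a row of the messages table."""
    timestamp: datetime
    user_message: str
    agent_message: str

def build_chat_messages(context, first_message, history, instructions, prompt):
    """Assemble the chat messages in the order they were exchanged."""
    messages = [
        {"role": "system", "content": context},
        {"role": "assistant", "content": first_message},
    ]
    # Replay stored interactions oldest first, one user/assistant pair per row.
    for item in sorted(history, key=lambda m: m.timestamp):
        messages.append({"role": "user", "content": item.user_message})
        messages.append({"role": "assistant", "content": item.agent_message})
    # Assumption: the instructions go right before the new user prompt.
    messages.append({"role": "user", "content": instructions})
    messages.append({"role": "user", "content": prompt})
    return messages

# Rows are deliberately out of order to show the sort by timestamp.
history = [
    StoredMessage(datetime(2023, 8, 17, 10, 5), "Second question", "Second answer"),
    StoredMessage(datetime(2023, 8, 17, 10, 0), "First question", "First answer"),
]
chat = build_chat_messages(
    context="You are a chef.",
    first_message="Hello, I am your personal chef.",
    history=history,
    instructions="Reply as JSON.",
    prompt="Is the recipe ok for my friend?",
)
for entry in chat:
    print(entry["role"], "|", entry["content"])
```

Since the history lives in the database rather than in process memory, the same list can be rebuilt after a restart of the service, which is the point of the memory microservice.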

Local Testing Our Agents

Now we are ready to test the complete workflow of our microservice. We will start by going to our terminal and typing uvicorn agents.main:app --host 0.0.0.0 --port 8000 --reload to launch the app. Next, we will navigate to our Swagger UI at http://0.0.0.0:8000/docs and submit the following requests:

  • Create the agent: Give a payload that you’d like to test. I will submit the following:
{
"context": "You are a chef specializing in Mediterranean food that provides recipes with a maximum of 10 simple ingredients. The user can have many food preferences or ingredient preferences, and your job is always to analyze and guide them to use simple ingredients for the recipes you suggest and these should also be Mediterranean. The response should include detailed information on the recipe. The response should also include questions to the user when necessary. If you think your response may be inaccurate or vague, do not write it and answer with the exact text: `I don't have a response.`",
"first_message": "Hello, I am your personal chef and cooking advisor and I am here to help you with your meal preferences and your cooking skills. What can I do for you today?",
"response_shape": "{'recipes': 'List of strings with the name of the recipes', 'ingredients': 'List of the ingredients used in the recipes', 'summary': 'String, summary of the conversation'}",
"instructions": "Run through the conversation messages and discard any messages that are not relevant for cooking. Focus on extracting the recipes that were mentioned in the conversation and for each of them extract the list of ingredients. Make sure to provide a summary of the conversation when asked."
}
  • Create the conversation: Assign the conversation to the agent_id that you got from the previous response.
{
"agent_id": "Replace with the UUID from the agent you just created."
}
  • Chat with our agent: Our agent starts the conversation by asking us a question or approaching us with a message (its first_message). We continue the conversation by submitting a message back. I’ll use this one:
{
"conversation_id": "74485ccb-21e5-45bd-8008-001cbd6d3e37",
"message": "Hey I am interested in cooking dinner for me and my friends tomorrow night, and I need your advice on what to cook for 5 people."
}
chat-agent endpoint response from Swagger UI — Image by author

Our agent replied to us with a response and we can continue this conversation by replying in a natural way.

# Our question
{
"conversation_id": "74485ccb-21e5-45bd-8008-001cbd6d3e37",
"message": "There is someone that is allergic to peanuts and that's it. My only preference is seafood and anything that goes perfectly with white wine."
}
# Our agent's response
{
"conversation_id": "74485ccb-21e5-45bd-8008-001cbd6d3e37",
"response": "Great! We can definitely work around the peanut allergy and incorporate seafood into the menu. How about we start with a refreshing appetizer of Grilled Shrimp Skewers with Lemon and Garlic? It's simple, delicious, and pairs well with white wine.\n\nHere's the recipe for Grilled Shrimp Skewers with Lemon and Garlic:\n\nIngredients:\n1. 1 pound of large shrimp, peeled and deveined\n2. 2 cloves of garlic, minced\n3. 2 tablespoons of fresh lemon juice\n4. 2 tablespoons of olive oil\n5. Salt and pepper to taste\n6. Wooden skewers, soaked in water for 30 minutes\n\nInstructions:\n1. Preheat your grill to medium-high heat.\n2. In a bowl, combine minced garlic, lemon juice, olive oil, salt, and pepper.\n3. Thread the shrimp onto the soaked wooden skewers.\n4. Brush the shrimp skewers with the garlic-lemon marinade on both sides.\n5. Place the skewers on the preheated grill and cook for about 2-3 minutes per side until they turn pink and opaque.\n6. Remove from the grill and serve hot.\n\nFor the main course, how about a Mediterranean-style Baked Salmon? It's flavorful, healthy, and pairs wonderfully with white wine.\n\nHere's the recipe for Mediterranean Baked Salmon:\n\nIngredients:\n1. 1 pound of salmon fillets\n2. 2 tablespoons of olive oil\n3. 2 cloves of garlic, minced\n4. 1 teaspoon of dried oregano\n5. Juice of half a lemon\n6. Salt and pepper to taste\n\nInstructions:\n1. Preheat your oven to 375°F (190°C).\n2. Place the salmon fillets on a baking sheet lined with parchment paper or aluminum foil.\n3. In a small bowl, mix together olive oil, minced garlic, dried oregano, lemon juice, salt, and pepper.\n4. Brush the mixture over the salmon fillets, ensuring they are evenly coated.\n5. Bake the salmon in the preheated oven for about 12-15 minutes or until it flakes easily with a fork.\n6. Remove from the oven and let it rest for a few minutes before serving.\n\nTo complement the meal, you can serve a side of Roasted Mediterranean Vegetables. 
It's a colorful and nutritious addition to your dinner.\n\nHere's the recipe for Roasted Mediterranean Vegetables:\n\nIngredients:\n1. 1 red bell pepper, sliced\n2. 1 yellow bell pepper, sliced\n3. 1 zucchini, sliced\n4. 1 eggplant, diced\n5. 1 red onion, sliced\n6. 2 tablespoons of olive oil\n7. Salt and pepper to taste\n8. Fresh herbs like basil or parsley (optional)\n\nInstructions:\n1. Preheat your oven to 425°F (220°C).\n2. In a large bowl, toss together all the vegetables with olive oil, salt, and pepper until well coated.\n3. Spread the vegetables in a single layer on a baking sheet.\n4. Roast in the preheated oven for about 20-25 minutes or until they are tender and slightly caramelized.\n5. Remove from the oven and sprinkle with fresh herbs if desired.\n\nFor dessert, you can keep it simple yet satisfying with a classic Mediterranean dessert like Fresh Fruit Salad with Honey and Mint.\n\nHere's how you can prepare Fresh Fruit Salad with Honey and Mint:\n\nIngredients:\n1. Assorted fresh fruits of your choice (such as berries, melons, grapes)\n2. Honey for drizzling\n3. Fresh mint leaves for garnish\n\nInstructions:\n1. Wash and cut the fresh fruits into bite-sized pieces.\n2. Combine all the fruits in a bowl.\n3. Drizzle honey over the fruit salad and gently toss to coat.\n4. Garnish with fresh mint leaves.\n5. Serve chilled.\n\nI hope you and your friends enjoy this Mediterranean dinner menu! If you have any other questions or need further assistance, feel free to ask."
}
# Our question - Note that we are not mentioning the allergy, as we expect the agent to remember we are talking about an allergy to peanuts.
{
"conversation_id": "74485ccb-21e5-45bd-8008-001cbd6d3e37",
"message": "Is the recipe ok for my friend who is allergic?"
}
# Our agent's response - Note that the agent recalls the peanut allergy from earlier in the conversation.
{
"conversation_id": "74485ccb-21e5-45bd-8008-001cbd6d3e37",
"response": "Yes, the Grilled Shrimp Skewers with Lemon and Garlic recipe should be safe for your friend with a peanut allergy. However, it's always important to double-check the ingredients you use to ensure they are free from any potential allergens or cross-contamination."
}

Go ahead and play around with the code and your new agents. In the next section, I will focus on the deployment of this service.
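If you prefer to script these requests instead of using the Swagger UI, the sketch below builds the same chat-agent call with Python's standard library. It assumes the service is running locally and that the routes are mounted under /agents, as in the Streamlit client later in this article; build_chat_request and send_chat_message are hypothetical helper names.

```python
import json
import urllib.request

# Assumption: local service with routes mounted under /agents.
API_URL = "http://0.0.0.0:8000/agents"

def build_chat_request(conversation_id: str, message: str) -> urllib.request.Request:
    """Build (but do not send) the POST request for the chat-agent endpoint."""
    payload = json.dumps({"conversation_id": conversation_id, "message": message})
    return urllib.request.Request(
        url=API_URL + "/chat-agent",
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def send_chat_message(conversation_id: str, message: str) -> dict:
    """Send the request and decode the JSON body; requires the service to be running."""
    request = build_chat_request(conversation_id, message)
    with urllib.request.urlopen(request, timeout=60) as response:
        return json.loads(response.read().decode("utf-8"))

# Building the request does not touch the network, so it is safe to inspect offline.
request = build_chat_request(
    "74485ccb-21e5-45bd-8008-001cbd6d3e37",
    "Is the recipe ok for my friend who is allergic?",
)
print(request.get_method(), request.full_url)
```

Calling send_chat_message with the same arguments should return the conversation_id and response fields shown above, provided the conversation exists in your database.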

The Deployment Cycle

We will deploy our application in a cloud container environment such as Kubernetes, Azure Container Service, or AWS Elastic Container Service. This is where we create a Docker image containing our code so we can run it in one of these environments. Go ahead and open the Dockerfile we created at the start and paste the following code:

# Dockerfile
FROM python:3.10-slim-bullseye

# Set the working directory
WORKDIR /app

# Copy the project files to the container
COPY . .

# Install the package using setup.py
RUN pip install -e .

# Install dependencies
RUN pip install pip -U && \
    pip install --no-cache-dir -r requirements.txt

# Set the environment variable
ARG OPENAI_API_KEY
ENV OPENAI_API_KEY=$OPENAI_API_KEY

# Expose the necessary ports
EXPOSE 8000

# Run the application
# CMD ["uvicorn", "agents.main:app", "--host", "0.0.0.0", "--port", "8000"]

The Dockerfile installs the app, and the CMD that would run it is commented out. Uncomment it if you want to run the container locally as a standalone service. For other environments such as Kubernetes, the command is set in the command section of the deployment or pod manifest.

Build the image, wait until the build is completed, and then test it by running the run command, which is below:

# Build the image
$ docker build --build-arg OPENAI_API_KEY=<Replace with your OpenAI Key> -t agents-app .
# Run the container with the command from the agents app (Use -d flag for the detached run).
$ docker run -p 8000:8000 agents-app uvicorn agents.main:app --host 0.0.0.0 --port 8000
# Output
INFO: Started server process [1]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO: 172.17.0.1:41766 - "GET / HTTP/1.1" 200 OK
INFO: 172.17.0.1:41766 - "GET /favicon.ico HTTP/1.1" 404 Not Found
INFO: 172.17.0.1:41770 - "GET /docs HTTP/1.1" 200 OK
INFO: 172.17.0.1:41770 - "GET /openapi.json HTTP/1.1" 200 OK

Great, you are ready to start using the application in your deployment environment.

Finally, we will try to integrate this microservice with a front-end application that will serve the agents and the conversations by calling the endpoints internally, which is the common way of building and interacting between services using this architecture.

The Usage Cycle

We can use this new service in multiple ways, and I will only focus on building a front-end application that calls the endpoints from our agents and makes it possible for users to interact via a UI. We will use Streamlit for this, as it is a simple way to spin up a front-end using Python.

Important Note: There are additional utilities that I added to our agents’ service that you can copy directly from the repository. Search for get_agents(), get_conversations(), and get_messages() in the crud.py module and the corresponding routes in api/routes.py.

  • Install Streamlit and add it to our requirements.txt.
# Pin a version if you need
$ pip install streamlit==1.25.0
# Our requirements.txt (added streamlit)
$ cat requirements.txt
fastapi==0.95.2
ipykernel==6.22.0
jupyter-bokeh==2.0.2
jupyterlab==3.6.3
openai==0.27.6
pandas==2.0.1
sqlalchemy-orm==1.2.10
sqlalchemy==2.0.15
streamlit==1.25.0
uvicorn<0.22.0,>=0.21.1
  • Create the application: First create a folder named frontend inside our src folder, then create a new file named main.py in it and add the following code.
import streamlit as st
import requests

API_URL = "http://0.0.0.0:8000/agents" # We will use our local URL and port defined of our microservice for this example

def get_agents():
    """
    Get the list of available agents from the API
    """
    response = requests.get(API_URL + "/get-agents")
    if response.status_code == 200:
        agents = response.json()
        return agents

    return []

def get_conversations(agent_id: str):
    """
    Get the list of conversations for the agent with the given ID
    """
    response = requests.get(API_URL + "/get-conversations", params = {"agent_id": agent_id})
    if response.status_code == 200:
        conversations = response.json()
        return conversations

    return []

def get_messages(conversation_id: str):
    """
    Get the list of messages for the conversation with the given ID
    """
    response = requests.get(API_URL + "/get-messages", params = {"conversation_id": conversation_id})
    if response.status_code == 200:
        messages = response.json()
        return messages

    return []

def send_message(conversation_id, message):
    """
    Send a message to the agent through the conversation with the given ID
    """
    payload = {"conversation_id": conversation_id, "message": message}
    response = requests.post(API_URL + "/chat-agent", json = payload)
    if response.status_code == 200:
        return response.json()

    return {"response": "Error"}

def main():
    st.set_page_config(page_title = "🤗💬 AIChat")

    with st.sidebar:
        st.title("Conversational Agent Chat")

        # Dropdown to select agent
        agents = get_agents()
        agent_ids = [agent["id"] for agent in agents]
        selected_agent = st.selectbox("Select an Agent:", agent_ids)

        for agent in agents:
            if agent["id"] == selected_agent:
                selected_agent_context = agent["context"]
                selected_agent_first_message = agent["first_message"]

        # Dropdown to select conversation
        conversations = get_conversations(selected_agent)
        conversation_ids = [conversation["id"] for conversation in conversations]
        selected_conversation = st.selectbox("Select a Conversation:", conversation_ids)

    if selected_conversation is None:
        st.write("Please select a conversation from the dropdown.")
    else:
        st.write(f"**Selected Agent**: {selected_agent}")
        st.write(f"**Selected Conversation**: {selected_conversation}")

        # Display chat messages
        st.title("Chat")
        st.write("This is a chat interface for the selected agent and conversation. You can send messages to the agent and see its responses.")
        st.write(f"**Agent Context**: {selected_agent_context}")

        messages = get_messages(selected_conversation)
        with st.chat_message("assistant"):
            st.write(selected_agent_first_message)

        for message in messages:
            with st.chat_message("user"):
                st.write(message["user_message"])
            with st.chat_message("assistant"):
                st.write(message["agent_message"])

        # User-provided prompt
        if prompt := st.chat_input("Send a message:"):
            with st.chat_message("user"):
                st.write(prompt)
            with st.spinner("Thinking..."):
                response = send_message(selected_conversation, prompt)
                with st.chat_message("assistant"):
                    st.write(response["response"])

if __name__ == "__main__":
    main()

The code above connects to our agents microservice via API calls and allows the user to select an agent and a conversation and chat with the agent, similar to what ChatGPT provides. Let’s run this app by opening another terminal (make sure you have the agents microservice up and running on port 8000) and typing $ streamlit run src/frontend/main.py. You are ready to go!

AI Chat Streamlit App — Image by author

Future Improvements and Conclusion

Future Improvements

There are several exciting opportunities for enhancing our Conversational Agent with a memory microservice. These improvements introduce advanced capabilities that can extend user interactions and expand the scope of our applications or overall system.

  1. Enhanced Error Handling: To ensure robust and reliable conversations, we could implement code that gracefully handles unexpected user inputs, API failures (from OpenAI or other services), and other issues that could arise during real-time interactions.
  2. Integrated Buffers and Conversation Summaries: Integrating the buffers implemented by the LangChain framework could optimize token management, enabling conversations to span longer periods without running into token limits. Additionally, incorporating conversation summaries allows users to review the ongoing discussion, aiding context retention and improving the overall user experience. Take note of the agent instructions and the response shape to extend this easily in our code.
  3. Data-aware Applications: We could create agents with unique, internal knowledge by connecting our agents’ models to other sources of data such as internal databases. This involves training or integrating models that can understand and respond to complex queries based on your organization’s own data and information. Check LangChain’s data connection modules.
  4. Model Diversification: While we’ve only used OpenAI’s GPT-3.5 model, the landscape of language model providers is expanding rapidly. Testing models from other providers enables comparative analysis, uncovers strengths and weaknesses, and helps us choose the best fit for specific use cases. Try playing with different LLM integrations such as HuggingFace, Cohere, Google’s, etc.
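As a starting point for the buffer idea in item 2, here is a minimal sketch of a sliding-window history that keeps only the most recent messages within a size budget. It is not LangChain's implementation: trim_history is a hypothetical helper, and counting characters is only a crude stand-in for counting tokens, which would require the model's tokenizer.

```python
def trim_history(chat_messages, max_characters):
    """Keep the most recent messages whose combined content fits the budget."""
    kept = []
    used = 0
    # Walk from the newest message backwards so recent context survives.
    for message in reversed(chat_messages):
        size = len(message["content"])
        if used + size > max_characters:
            break
        kept.append(message)
        used += size
    kept.reverse()  # restore chronological order
    return kept

# Hypothetical history: 50, 30, and 20 characters of content.
history = [
    {"role": "user", "content": "a" * 50},
    {"role": "assistant", "content": "b" * 30},
    {"role": "user", "content": "c" * 20},
]
trimmed = trim_history(history, max_characters=60)
print([m["role"] for m in trimmed])
```

In the chat-agent endpoint, a function like this could be applied to the messages built from the history before they are handed to the integration service, replacing the hard stop after ten interactions with a rolling window.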

Conclusion

We have developed a microservice that provides intelligent agents powered by OpenAI GPT models and have proven how these agents can be packed with memory that lives outside of the client’s session. By adopting this architecture, we have unlocked a world of possibilities. From context-aware conversations to seamlessly integrating with sophisticated language models, our stack has become capable of providing new features to our products.

This implementation and its tangible benefits make it clear that AI is within reach of anyone with the right tools and approach. The use of AI-powered agents is not only about prompt engineering but also about how we build tools and engage with them more effectively, offering personalized experiences and tackling complex tasks with the finesse and precision that AI and software engineering can provide. So, whether you’re building a customer support system, a sales virtual assistant, a personal chef, or something entirely new, remember that the journey starts with a touch of code and an abundance of imagination. The possibilities are limitless.

The whole code for this article is on GitHub. You can find me on LinkedIn; feel free to connect!
