
It’s been a while since my last LLM post and I’m excited to share that my prototype has been successfully productionized as Outside’s first LLM-powered chatbot, Scout. If you are an Outside+ member, you can check it out over at https://scout.outsideonline.com/.
This journey began as my weekend curiosity project back in March 2023. I had the idea to build a Q&A chatbot using OpenAI’s LLMs and Outside’s content as a knowledge base. Later I shared my prototype at our internal product demo day and was thrilled by the interest it sparked. Scout quickly became an official project. On November 28th, 2023, we launched Scout to a limited group of Outside+ members. Fast forward to today, April 12th, 2024: over 28.3k unique users have already used this Outdoor Companion AI tool.
I couldn’t be more grateful for this moonstruck experience and I’ve been planning to write a mini-series to share some behind-the-scenes insights into what it takes to bring LLM & RAG powered apps to life. So far I’ve planned to cover the following three parts:
- 🦦 Part 1: Automate Pinecone Daily Upsert with Celery and Slack monitoring
- 🦦 Part 2: Building an LLM Websocket API in Django with Postman Testing
- 🦦 Part 3: Monitoring LLM Apps with Datadog: synthetic tests, OpenAI, and Pinecone usage tracking
This post will dive into Part 1: setting up scheduled tasks with Celery Beat to automatically upsert embeddings into the Pinecone vector database. We’ll also set up Slack updates for easy monitoring. Let’s get started!
The Promise of RAG: updated and relevant data
LLMs typically have a training data cutoff date; the current gpt-4-turbo’s knowledge was cut off in December 2023 (as of my writing in April 2024). The promise of RAG is that we can equip LLMs with fresher, domain-specific data to reduce hallucinations and improve the user experience. Hence the question: how can we keep the knowledge base fresh and up to date? The answer: use Celery and Celery Beat to schedule a periodic task (daily or weekly) that embeds newly published content and sends it to Pinecone.
Celery vs Celery Beat vs Celery Worker
When my coworker first heard about using Celery, he was utterly confused: "If it’s 「celery」, why add 「beet」? Two vegetables?!!" If you read the official definitions and still find yourself confused about which is which, you are not alone. I have two analogies that might just help untangle this confusion once and for all.
The first analogy: imagine Celery as an orchestra and Celery Beat as the conductor, who sets the pace and rhythm of the performance. The "Beat" here is a musical beat 🎵 and has nothing to do with the vegetable beet.
The second one is my favorite: imagine Celery as a beehive, each Celery Worker as a worker bee, and Celery Beat as one diligent bee tasked with keeping track of the schedule. Now let’s bring the official definitions back:
- Celery: a task queue, which can be used to distribute tasks across threads or machines
- Celery Beat: a scheduler, which can be used to schedule periodic tasks
- Celery Worker: the worker that executes the tasks.
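To make the hive analogy concrete, here is a minimal, hypothetical sketch (a toy project called `proj` with a Redis broker, not Scout’s actual setup): the Celery app holds the task queue, Beat enqueues scheduled tasks, and workers execute them.

```python
# proj/celery.py — a toy Celery app (names and broker URL are illustrative)
from celery import Celery
from celery.schedules import crontab

app = Celery("proj", broker="redis://localhost:6379/0")

@app.task
def say_hello():
    # A trivial task a worker will execute once it's pulled off the queue
    print("hello from a Celery worker")

# Celery Beat reads this schedule and enqueues the task every day at 1am;
# a Celery Worker then picks it up and runs it.
app.conf.beat_schedule = {
    "daily-hello": {
        "task": "proj.celery.say_hello",
        "schedule": crontab(hour=1, minute=0),
    },
}
```

You run the two roles as separate processes: `celery -A proj worker -l info` starts the hive of workers, and `celery -A proj beat -l info` starts the scheduler bee. Beat only enqueues tasks; the workers do the actual work.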

Now that we have an intuitive understanding of what Celery can help us with, let’s write code for a real-world example to make it more concrete.
Daily Pinecone Upsert Pipeline
As a media company, we typically have new articles published every day. So we want to capture the newly published articles, create embeddings, and store those embeddings in Pinecone.
Step 1: capture the new articles
This one is relatively easy: we use SQL to select articles added to our ContentItem table in the past 25 hours (the extra hour gives consecutive daily runs a small overlap, so nothing slips through the cracks).
```sql
SELECT
    item_uuid AS id,
    item_source,
    item_title AS title,
    body_content AS content,
    item_url,
    item_image_url,
    item_created_at
FROM content_items
WHERE
    item_created_at >= CURRENT_TIMESTAMP - INTERVAL '25' HOUR
    AND item_type = 'article'
    AND sponsored = 0
    AND syndicated = FALSE
    AND item_uuid IS NOT NULL
    AND item_title IS NOT NULL
    AND LENGTH(body_content) > 100
ORDER BY item_created_at DESC
```
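If you want to sanity-check the query on its own before wiring it into the pipeline, here is a rough sketch (the file path mirrors the layout used in Step 2, and `connection` stands for whatever DB connection or SQLAlchemy engine your project uses):

```python
import pandas as pd
from django.db import connection  # or your SQLAlchemy engine

# Preview what the daily window would pick up (path is illustrative)
with open("discovery_service/sql/get_articles_for_scout.sql") as f:
    df = pd.read_sql(f.read(), connection)

print(f"{len(df)} articles found in the past 25 hours")
print(df[["title", "item_created_at"]].head())
```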
Step 2: Fetch -> Embed -> Upsert to Pinecone
This is the main script that defines the daily task to fetch new articles, embed them and upsert to Pinecone.
```python
import logging
import os
from uuid import uuid4

import pandas as pd
import pinecone
import tiktoken
from django.conf import settings
from django.db import connection  # or a SQLAlchemy engine, depending on your setup
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter

from discovery_service import slack  # local Slack helper from Step 3; adjust the path to your project

logger = logging.getLogger(__name__)


class UpdatePinecone:
    """
    Daily Task to Fetch, Embed and Upsert Data to Pinecone
    """

    def __init__(self, content_type):
        self.batch_limit = 100
        self.texts = []
        self.metadatas = []
        self.upserted_items = set()
        self.omitted_items = set()
        self.content_type = content_type
        self.text_splitter = self.initialize_text_splitter()

    def initialize_text_splitter(self):
        if self.content_type == 'articles':
            return RecursiveCharacterTextSplitter(
                chunk_size=400,
                chunk_overlap=20,
                length_function=self.tiktoken_len,
                separators=["\n\n", "\n", " ", ""]
            )
        elif self.content_type == 'videos':
            return RecursiveCharacterTextSplitter(
                chunk_size=600,
                chunk_overlap=40,
                length_function=self.tiktoken_len,
                separators=["\n\n", "\n", " ", ""]
            )

    def connect_pinecone(self):
        # Get relevant settings
        openai_api_key = settings.OPENAI_API_KEY
        pinecone_api_key = settings.PINECONE_API_KEY
        index_name = "scout"
        # Initialize Pinecone
        pinecone.init(api_key=pinecone_api_key,
                      environment="us-east-1-aws")
        index = pinecone.Index(index_name)
        embed = OpenAIEmbeddings(openai_api_key=openai_api_key, max_retries=2)
        return index, embed

    def tiktoken_len(self, text):
        tokenizer = tiktoken.get_encoding('p50k_base')
        tokens = tokenizer.encode(
            text,
            disallowed_special=()
        )
        return len(tokens)

    def embed_upsert_batch(self, index, embed):
        # Generate unique ids for each text chunk
        ids = [str(uuid4()) for _ in range(len(self.texts))]
        embeds = embed.embed_documents(self.texts)
        try:
            index.upsert(vectors=zip(ids, embeds, self.metadatas))
            self.upserted_items.update(
                metadata['item_url'] for metadata in self.metadatas)
        except Exception as e:
            logger.error(f"Error during upsert: {str(e)}")
            self.omitted_items.update(
                metadata['item_url'] for metadata in self.metadatas)

    def upsert_pinecone(self):
        # 0. Connect to Pinecone
        index, embed = self.connect_pinecone()
        # 1. Get Scout relevant item data from db based on content type
        logger.info(f"FETCH {self.content_type.upper()} CONTENT ITEM DATA FOR SCOUT")
        sql_file_path = os.path.join(
            os.path.dirname(os.path.dirname(__file__)),
            'discovery_service', 'sql', f'get_{self.content_type}_for_scout.sql')
        with open(sql_file_path) as sql_file:
            query = sql_file.read()
        df_item = pd.read_sql(query, connection)
        # Convert df to a list of dicts
        data = df_item.to_dict('records')
        # 2. Embed and upsert to Pinecone
        for i, record in enumerate(data):
            metadata = {
                'item_uuid': str(record['id']),
                'item_source': record['item_source'],
                'item_title': record['title'],
                'item_url': record['item_url'],
                'item_image_url': record['item_image_url'],
                'item_created_at': record['item_created_at']
            }
            record_texts = self.text_splitter.split_text(record['content'])
            record_metadatas = [{
                "chunk": j, "text": text, **metadata
            } for j, text in enumerate(record_texts)]
            self.texts.extend(record_texts)
            self.metadatas.extend(record_metadatas)
            # 5a. len(texts) is now at the batch limit, so we'll upsert it, empty the list, and repeat
            if len(self.texts) >= self.batch_limit:
                self.embed_upsert_batch(index, embed)
                # Reset self.texts and self.metadatas
                self.texts = []
                self.metadatas = []
        # 5b. If there are any remaining items in texts, upsert those as well
        if len(self.texts) > 0:
            self.embed_upsert_batch(index, embed)
        # 6. Send Slack message with summary
        slack.send_pinecone_upsert_report(self.content_type, self.upserted_items, self.omitted_items)
```
Above is the full script, which includes the usual components for a Pinecone upsert:
- `upserted_items` and `omitted_items`: two sets that track how many articles have been successfully embedded and upserted, and how many failed.
- `initialize_text_splitter`: a function worth pointing out. It’s based on LangChain’s recursive text splitter, but uses different parameters depending on the content type, because articles and videos differ:
  - for articles: the body_content already has paragraph (`\n\n`) and sentence (`\n`) separators, which yields meaningful chunks with 400 tokens per chunk + 20 tokens of overlap
  - for videos: the text comes from transcripts, which are full of line breaks and captions like "(tranquil music)", so we need larger chunks (600 tokens + 40 tokens of overlap) to create meaningful embeddings. Another tip for embedding video transcripts: I found that transcripts alone weren’t yielding good results. My current hack is to concatenate item_title, item_subtitle and the transcript as the video’s content (see the sketch after this list). This produces much better Q&A results.
- `upsert_pinecone`: the most important function, which does the heavy lifting of fetching and upserting data.
  - Get Scout relevant item data: reads the SQL file we created in Step 1, which fetches articles published in the past 25 hours.
  - Up until step 5b, it’s adapted from James Briggs’s sample code for `pinecone-client==2.2.4`.
  - What might be most helpful to you is the final step, sending a Slack message with the summary, which posts to our dedicated Slack channel.
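For the video hack mentioned above, the idea is simply to glue the title and subtitle onto the transcript before chunking. A rough sketch (field names like `item_subtitle` and `transcript` are illustrative and may not match our actual schema):

```python
def build_video_content(record: dict) -> str:
    # Concatenate title, subtitle and transcript so every chunk carries topical
    # context, not just raw transcript text full of captions like "(tranquil music)".
    parts = [record.get("item_title"), record.get("item_subtitle"), record.get("transcript")]
    return "\n\n".join(p for p in parts if p)
```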

Step 3: Slack monitoring
In order to send Slack messages with the task summary, you’ll need a separate Python script like the one below:
```python
import json
import logging
import socket

import requests
from django.conf import settings

logger = logging.getLogger(__name__)

# Use your organization's Slack webhook URL
webhook_url = 'https://hooks.slack.com/services/xxxxxx'


def _send(channel, msg, icon=":partydeploy:"):
    if settings.SEND_SLACK:
        host = socket.gethostname()
        slack_data = {"channel": channel, "username": "discovobot",
                      "text": f"{settings.DEPLOYED_ENV} [{host}]\n{msg}",
                      "icon_emoji": icon}
        requests.post(
            webhook_url, data=json.dumps(slack_data),
            headers={'Content-Type': 'application/json'}
        )
    else:
        logger.debug(msg)


def send_pinecone_upsert_report(content_type, upserted_items, omitted_items):
    msg = (f"🌐 Pinecone Daily Report:\n{'-' * 25}\n"
           f"📈 Upserted {len(upserted_items)} unique URLs.\n"
           f"🚫 Omitted {len(omitted_items)} URLs.")
    _send("#discovery-data-report", msg)
```
Notice that the `_send` function checks `settings.SEND_SLACK` and tags each message with the `DEPLOYED_ENV`, since you might have several environments like `prod`, `stage` and `dev`. You can use this to, for example, only send the Slack summary when the pipeline is running in the production environment.
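For reference, the helper above relies on two settings, `SEND_SLACK` and `DEPLOYED_ENV`. One hypothetical way to define them in `settings.py` (the env-var wiring is just an example, not our exact configuration):

```python
# settings.py (illustrative)
import os

DEPLOYED_ENV = os.environ.get("DEPLOYED_ENV", "dev")  # e.g. "prod", "stage", "dev"
SEND_SLACK = DEPLOYED_ENV == "prod"  # only post to Slack from production
```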
Step 4: Add task to tasks.py
We store the long Python script created in Step 2 as `update_pinecone_task.py` under the `scripts` folder. Then we just need to import it and decorate the task with either `@shared_task` or `@app.task`.
```python
# tasks.py — `app` is your project's Celery application instance,
# and `settings` is django.conf.settings
@app.task(ignore_result=True)
def upsert_content_to_pinecone(content_type: str):
    from scripts.update_pinecone_task import UpdatePinecone

    if 'prod' not in settings.ENV:
        return
    update = UpdatePinecone(content_type=content_type)
    update.upsert_pinecone()
```
Notice that we only want to run this in the `prod` environment, because we don’t want to accidentally embed articles from our stage database into the Pinecone index. If you have a stage environment for your Pinecone instance, you could also run the pipeline in stage.
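If you want to trigger the task by hand (say, from a Django shell while testing), you can enqueue it directly; a quick sketch, keeping in mind the prod guard above means it returns early outside `prod`:

```python
from discovery_service.tasks import upsert_content_to_pinecone

# Enqueue asynchronously: a running Celery worker picks it up
upsert_content_to_pinecone.delay("articles")

# Or run it synchronously in the current process for debugging
upsert_content_to_pinecone.apply(kwargs={"content_type": "articles"})
```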
Step 5: Add our task to Celery Beat Schedule
This is the final step! In Django, we have `settings.py`, where we can add our task with the desired schedule to the Celery Beat schedule.
```python
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "Upsert Articles to Pinecone": {
        "task": "discovery_service.tasks.upsert_content_to_pinecone",
        "args": [],
        "kwargs": {"content_type": "articles"},
        "schedule": crontab(hour=1, minute=0),  # daily at 1am (MTN)
    },
    "Upsert Videos to Pinecone": {
        "task": "discovery_service.tasks.upsert_content_to_pinecone",
        "args": [],
        "kwargs": {"content_type": "videos"},
        "schedule": crontab(hour=2, minute=0, day_of_week=6),  # every Sunday at 2am (MTN)
    },
}
```
Configuring Celery Beat Schedule
In `settings.py`, we define the Celery Beat schedule using a dictionary. Within this configuration, we specify the task to execute (`discovery_service.tasks.upsert_content_to_pinecone`), along with any required arguments or keyword arguments. The schedule itself is set with `crontab(hour=1, minute=0)`, so the article update task runs daily at 1 am (in whatever timezone your Celery app is configured with; ours is Mountain Time). Because we don’t publish new videos every day, the video task runs on a weekly schedule instead of a daily one.
To customize the crontab schedule, you can find more details here.
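For example, a few crontab variations you might adapt (standard Celery `crontab` arguments; these are not Scout’s actual schedules):

```python
from celery.schedules import crontab

crontab(minute=0, hour="*/3")              # every 3 hours, on the hour
crontab(hour=7, minute=30, day_of_week=1)  # every Monday at 7:30
crontab(minute="*/15")                     # every 15 minutes
```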
Summary
Let’s recap the workflow involved in setting up the Celery Beat task. Key files include `update_pinecone_task.py` and `tasks.py`. The former contains the logic for fetching, embedding, and upserting data into Pinecone, while the latter defines a Celery task, `upsert_content_to_pinecone()`, that orchestrates this process. Additionally, we use `settings.py` to configure the Celery Beat schedule, ensuring our article task runs daily and our video task runs weekly at specific times.
The heart of our Celery task resides in `update_pinecone_task.py`. Here, we define a class, `UpdatePinecone`, which encapsulates the logic for connecting to Pinecone, fetching data, embedding it, and performing the upsert. Error handling and reporting mechanisms, such as sending Slack messages, are integrated to ensure robustness and visibility into task execution.
That’s all for today. I hope this post helps you keep your knowledge base up to date if you’re building LLM RAG apps. If you have any questions, feel free to reach out or leave a comment here. And remember, if you’re considering building a WebSocket API for LLMs using Django, be sure to come back for Part 2 of this Productionize LLM series!
PS: I recently started a newsletter to share more personal / less technical stuff that has captured my fancy. If you’d like to receive occasional emails from me, feel free to sign up! (No pressure, only if you feel like it~) My wonderful mother-in-law subscribed after reading the first issue, so you’ll be in good company! 😁