How to Build a RAG System with a Self-Querying Retriever in LangChain

RAG + Filtering with Metadata = Great Movie Recommendations 🍿

Ed Izaguirre
Towards Data Science

Image of a person watching television. Image created in DALL·E 3.

Links

Recently, I was browsing Max trying to find a movie to watch. Typically this involves scrolling through the various lists presented to me, reading a few descriptions, and then picking something that sounds vaguely interesting. Sometimes it is a hit, sometimes not so much. I usually only touch the search function if I already know the title of the film I want to watch or the name of an actor I want to see. Otherwise, searching is just not very useful.

I was suddenly hit with an idea: why can’t I use natural language to search for movies based more on their vibe or substance, rather than just a title or actor? For example, why can’t I fire up Max, Netflix, or Hulu and type one of the following queries into the search bar:

  • Find me drama movies in English that are less than 2 hours long and feature pets.
  • Recommend zombie movies, but make sure they are funny.
  • I liked ‘Everything Everywhere All at Once’. Give me a similar film, but darker.

The beauty of this approach goes beyond a more natural way to search for films. This approach also preserves a user’s privacy. Rather than mine a user’s actions, likes, and dislikes to feed to a recommender system, this system uses no user data at all. The only thing required is a query.

So I built Film Search. This is a RAG-based system that takes in a user’s query, embeds it, and does a similarity search to find similar films. But it goes beyond vanilla RAG: this system uses what is called a self-querying retriever, which allows movies to be filtered by their metadata before the similarity search is done. So if a user asks to “Recommend horror movies made after 1980 that feature lots of explosions”, the search will first filter out all films that are not horror movies made after 1980, and only then do a similarity search for films that “feature lots of explosions”.
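To make that concrete, here is a rough sketch of the kind of decomposition the self-querying retriever produces for such a query. The field names match the metadata defined later in this article, but the dictionary itself is purely illustrative; the actual objects and filter syntax appear in the retriever section below.

user_query = "Recommend horror movies made after 1980 that feature lots of explosions"

# Illustrative sketch only: the retriever splits the request into a semantic
# query (embedded for similarity search) and a structured metadata filter
# (applied before the similarity search is run).
decomposed = {
    "query": "lots of explosions",
    "filter": 'and(eq("Genre", "Horror"), gt("Release Year", 1980))',
}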

In this article, I will provide a high-level overview of how I made this system. The full code is provided in the links above if you want to go deeper.

Let’s dive in.

Retrieving the data

The data for this project came from The Movie Database (TMDB), with permission from the owner. Their API was simple to use, well maintained, and not heavily rate limited. I pulled the following film attributes from their API:

  • Title
  • Runtime (minutes)
  • Language
  • Overview
  • Release Year
  • Genre
  • Keywords describing the film
  • Actors
  • Directors
  • Places to stream
  • Places to buy
  • Places to rent
  • List of Production Companies

Below is a snippet of how data was pulled using the TMDB API and Python's requests library:

import time

import requests


def get_data(API_key, Movie_ID, max_retries=5):
    """
    Function to pull details of your film of interest in JSON format.

    parameters:
    API_key (str): Your API key for TMDB
    Movie_ID (str): TMDB id for film of interest

    returns:
    dict: JSON-formatted dictionary containing all details of your film of
    interest
    """
    query = 'https://api.themoviedb.org/3/movie/' + Movie_ID + \
        '?api_key=' + API_key + '&append_to_response=keywords,' + \
        'watch/providers,credits'
    for i in range(max_retries):
        response = requests.get(query)
        if response.status_code == 429:
            # If the response was a 429, wait and then try again
            print(f"Request limit reached. Waiting and retrying ({i+1}/{max_retries})")
            time.sleep(2 ** i)  # Exponential backoff
        else:
            details = response.json()
            return details

Notice that the query requires a movie ID (these were also obtained using TMDB), as well as append_to_response, which allows me to pull several types of data (e.g. keywords, watch providers, and credits, meaning directors and actors) in addition to some basic information about the film. There is also some basic scaffolding code that retries in case I hit a rate limit, although in practice I never did.
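As an aside, the movie IDs themselves can be gathered from TMDB's discover endpoint. Below is a minimal sketch, assuming the standard discover parameters and a hypothetical helper name; this is not the project's exact code.

import requests


def get_movie_ids_for_year(API_key, year, num_pages=5):
    """Hypothetical helper: collect popular film IDs for one year from TMDB."""
    ids = []
    for page in range(1, num_pages + 1):
        query = (
            'https://api.themoviedb.org/3/discover/movie'
            f'?api_key={API_key}&primary_release_year={year}'
            f'&sort_by=popularity.desc&page={page}'
        )
        response = requests.get(query)
        results = response.json().get('results', [])
        ids.extend(str(movie['id']) for movie in results)
    # TMDB returns 20 results per page, so five pages is roughly the top 100 films
    return ids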

We then have to parse the JSON response. Here is a snippet showing how this was done for parsing the actors and directors who worked on a film:

# 'details' is the JSON dictionary returned by get_data above
credits = details['credits']
actor_list, director_list = [], []

# Parsing cast: keep only the top-billed actors
cast = credits['cast']
NUM_ACTORS = 5
for member in cast[:NUM_ACTORS]:
    actor_list.append(member["name"])

# Parsing crew: keep only the directors
crew = credits['crew']
for member in crew:
    if member['job'] == 'Director':
        director_list.append(member["name"])

actor_str = ', '.join(list(set(actor_list)))
director_str = ', '.join(list(set(director_list)))

Note that I limited the number of actors to the top five in a film. I also had to specify that I was only interested in directors, as the response included other types of crew members such as editors, costume designers, etc.
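The streaming, buying, and renting providers come from the appended watch/providers data. Here is a minimal sketch of that parsing, assuming the US region and TMDB's documented response shape; it is not the project's verbatim code.

# Parsing watch providers for the US region
providers = details.get('watch/providers', {}).get('results', {}).get('US', {})

stream_list = [p['provider_name'] for p in providers.get('flatrate', [])]
buy_list = [p['provider_name'] for p in providers.get('buy', [])]
rent_list = [p['provider_name'] for p in providers.get('rent', [])]

stream_str = ', '.join(stream_list)
buy_str = ', '.join(buy_list)
rent_str = ', '.join(rent_list)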

All of this data was then compiled into CSV files. Each attribute listed above became a column, and each row represents a particular film. Below is a short snippet of films from the 2008_movie_collection_data.csv file that was created programmatically. For this project I got roughly the top 100 films from the years 1920–2023.

Snippet of movie data for demonstration purposes. By author.

Believe it or not, I still have not seen Kung Fu Panda. Perhaps I’ll have to after this project.
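For reference, here is a minimal sketch of how one of these per-year CSV files can be assembled with pandas, reusing get_data from above and the hypothetical get_movie_ids_for_year helper sketched earlier. Column names mirror the attribute list; the repository's exact code may differ.

import pandas as pd

rows = []
for movie_id in get_movie_ids_for_year(API_key, 2008):
    details = get_data(API_key, movie_id)
    rows.append({
        'Title': details['title'],
        'Runtime (minutes)': details['runtime'],
        'Language': details['original_language'],  # raw code such as 'en'; may be mapped to a full name
        'Overview': details['overview'],
        'Release Year': details['release_date'][:4],
        # ... remaining attributes (genre, keywords, actors, directors, providers) parsed as above
    })

pd.DataFrame(rows).to_csv('./data/2008_movie_collection_data.csv', index=False)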

Upload documents to Pinecone

Next I had to upload the CSV data to Pinecone. Typically chunking is important in a RAG system, but here each “document” (a row of a CSV file) is fairly short, so chunking was not a concern. I first had to convert each CSV row into a LangChain document, and then specify which fields should be the primary content and which fields should be the metadata.

Here is a snippet of code used to construct these documents:

# Imports assumed for this snippet (paths may vary slightly with LangChain version)
from langchain_community.document_loaders import CSVLoader, DirectoryLoader
from langchain.chains.query_constructor.base import AttributeInfo

# Loading in data from all csv files
loader = DirectoryLoader(
    path="./data",
    glob="*.csv",
    loader_cls=CSVLoader,
    show_progress=True)

docs = loader.load()

metadata_field_info = [
    AttributeInfo(
        name="Title", description="The title of the movie", type="string"),
    AttributeInfo(name="Runtime (minutes)",
                  description="The runtime of the movie in minutes", type="integer"),
    AttributeInfo(name="Language",
                  description="The language of the movie", type="string"),
    ...
]

for doc in docs:
    # Parse the page_content string into a dictionary
    page_content_dict = dict(line.split(": ", 1)
                             for line in doc.page_content.split("\n") if ": " in line)

    doc.page_content = 'Overview: ' + page_content_dict.get(
        'Overview') + '. Keywords: ' + page_content_dict.get('Keywords')
    doc.metadata = {field.name: page_content_dict.get(
        field.name) for field in metadata_field_info}

    # Convert fields from string to list of strings
    # (fields_to_convert_list and convert_to_list are helpers defined elsewhere in the repo)
    for field in fields_to_convert_list:
        convert_to_list(doc, field)

    # Convert fields from string to integers
    # (fields_to_convert_int and convert_to_int are helpers defined elsewhere in the repo)
    for field in fields_to_convert_int:
        convert_to_int(doc, field)

DirectoryLoader from LangChain takes care of loading all CSV files into documents. Then I need to specify what should be page_content and what should be metadata. This is an important decision. page_content will be embedded and used in similarity search during the retrieval phase. metadata will be used solely for filtering purposes before the similarity search is done. I decided to embed the overview and keywords properties, and keep the rest of the properties as metadata. Further tweaking could be done to see if the title should also be included in page_content, but I found this configuration works well for most user queries.
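A quick sanity check on one transformed document makes the split clear (the output shown in the comments is illustrative only):

# Inspect one transformed document
print(docs[0].page_content)
# Overview: <the film's overview>. Keywords: <comma-separated keywords>
print(docs[0].metadata)
# {'Title': ..., 'Runtime (minutes)': ..., 'Language': ..., 'Release Year': ..., ...}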

Then the documents have to be uploaded to Pinecone. This is a fairly straightforward process:

# Imports assumed for this snippet
import os

from pinecone import Pinecone, PodSpec

from langchain.indexes import SQLRecordManager, index
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore

# Create empty index
PINECONE_KEY, PINECONE_INDEX_NAME = os.getenv(
    'PINECONE_API_KEY'), os.getenv('PINECONE_INDEX_NAME')

pc = Pinecone(api_key=PINECONE_KEY)

# Comment out if the index has already been created
pc.create_index(
    name=PINECONE_INDEX_NAME,
    dimension=1536,
    metric="cosine",
    spec=PodSpec(
        environment="gcp-starter"
    )
)

# Target index and check status
pc_index = pc.Index(PINECONE_INDEX_NAME)
print(pc_index.describe_index_stats())

embeddings = OpenAIEmbeddings(model='text-embedding-ada-002')

vectorstore = PineconeVectorStore(
    pc_index, embeddings
)

# Create record manager to track what has already been uploaded
namespace = f"pinecone/{PINECONE_INDEX_NAME}"
record_manager = SQLRecordManager(
    namespace, db_url="sqlite:///record_manager_cache.sql"
)

record_manager.create_schema()

# Upload documents to Pinecone
index(docs, record_manager, vectorstore,
      cleanup="full", source_id_key="Website")

I’ll just highlight a few things here:

  • Using an SQLRecordManager ensures that duplicate documents are not uploaded to Pinecone if this code is run multiple times. If a document is modified, only that document gets updated in the vector store (see the sketch after this list).
  • We are using the classic text-embedding-ada-002 from OpenAI as our embedding model.
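To illustrate the deduplication, the index call reports what it added, updated, skipped, and deleted, so rerunning the upload over unchanged documents should skip everything. A minimal sketch, assuming LangChain's documented return format for index():

# Rerun the indexing step; unchanged documents are skipped thanks to the record manager
result = index(docs, record_manager, vectorstore,
               cleanup="full", source_id_key="Website")
print(result)
# e.g. {'num_added': 0, 'num_updated': 0, 'num_skipped': len(docs), 'num_deleted': 0}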

Creating the self-querying retriever

The self-querying retriever will allow us to filter the movies that are retrieved during RAG via the metadata we defined earlier. This will dramatically increase the usefulness of our film recommender.

One important consideration when choosing your vector store is to make sure that it supports filtering by metadata, because not all of them do. LangChain maintains a list of vector stores that support self-querying retrieval. Another important consideration is what types of comparators are allowed for each vector store. Comparators are the method by which we filter via metadata. For example, we can use the eq comparator to make sure that our film falls under the science fiction genre: eq('Genre', 'Science Fiction'). Not all vector stores allow all comparators. As an example, check out the allowed comparators in Chroma and how they differ from the comparators in Pinecone. We need to tell the model which comparators are allowed, to prevent it from accidentally writing a forbidden query.
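Under the hood, the structured filter the model writes gets converted into the vector store's native syntax by a translator class. Here is a minimal sketch of what that mapping looks like for Pinecone, assuming LangChain's query-constructor IR classes (the import path may vary slightly with your LangChain version):

from langchain.chains.query_constructor.ir import Comparator, Comparison

# A structured comparison written by the query-construction model...
comparison = Comparison(comparator=Comparator.EQ, attribute="Genre", value="Science Fiction")

# ...is translated by PineconeTranslator into Pinecone's native metadata filter,
# which looks roughly like: {"Genre": {"$eq": "Science Fiction"}}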

In addition to telling the model what comparators exist, we can also feed the model examples of user queries and corresponding filters. This is known as few-shot learning, and it is invaluable to help guide your model.

To see where this helps, take a look at the following two user queries:

  • “Recommend some films by Yorgos Lanthimos.”
  • “Films similar to Yorgos Lanthimos movies.”

It is easy for my metadata filtering model to write the same filter query for each of these examples, even though I want them to be treated differently. The first should yield only films directed by Lanthimos, while the second should yield films that have a similar vibe to Lanthimos films. To ensure this behavior, I spoon-feed the model examples of my desired behavior. The beauty of language models is that they can use their “reasoning” abilities and world knowledge to generalize from these few-shot examples to other user queries.

# Imports assumed for this snippet (paths may vary slightly with LangChain version)
from langchain.chains.query_constructor.base import (
    StructuredQueryOutputParser,
    get_query_constructor_prompt,
)
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.retrievers.self_query.pinecone import PineconeTranslator

document_content_description = "Brief overview of a movie, along with keywords"

# Define allowed comparators list
allowed_comparators = [
    "$eq",  # Equal to (number, string, boolean)
    "$ne",  # Not equal to (number, string, boolean)
    "$gt",  # Greater than (number)
    "$gte",  # Greater than or equal to (number)
    "$lt",  # Less than (number)
    "$lte",  # Less than or equal to (number)
    "$in",  # In array (string or number)
    "$nin",  # Not in array (string or number)
    "$exists",  # Has the specified metadata field (boolean)
]

# Few-shot examples of user queries and the structured queries they should produce
examples = [
    (
        "Recommend some films by Yorgos Lanthimos.",
        {
            "query": "Yorgos Lanthimos",
            "filter": 'in("Directors", ["Yorgos Lanthimos"])',
        },
    ),
    (
        "Films similar to Yorgos Lanthimos movies.",
        {
            "query": "Dark comedy, absurd, Greek Weird Wave",
            "filter": 'NO_FILTER',
        },
    ),
    ...
]

metadata_field_info = [
    AttributeInfo(
        name="Title", description="The title of the movie", type="string"),
    AttributeInfo(name="Runtime (minutes)",
                  description="The runtime of the movie in minutes", type="integer"),
    AttributeInfo(name="Language",
                  description="The language of the movie", type="string"),
    ...
]

constructor_prompt = get_query_constructor_prompt(
    document_content_description,
    metadata_field_info,
    allowed_comparators=allowed_comparators,
    examples=examples,
)

output_parser = StructuredQueryOutputParser.from_components()

# query_model is a ChatOpenAI instance (GPT-4 Turbo) defined elsewhere
query_constructor = constructor_prompt | query_model | output_parser

retriever = SelfQueryRetriever(
    query_constructor=query_constructor,
    vectorstore=vectorstore,
    structured_query_translator=PineconeTranslator(),
    search_kwargs={'k': 10}
)

In addition to examples, the model also has to know a description of each metadata field. This helps it understand what metadata filtering is possible.

Finally, we construct our chain. Here query_model is an instance of GPT-4 Turbo accessed via the OpenAI API. I recommend using GPT-4 instead of GPT-3.5 for writing these metadata filter queries, since this is a critical step and one that GPT-3.5 messes up more frequently. search_kwargs={'k': 10} tells the retriever to pull up the ten most similar films based on the user query.
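To sanity-check the query construction step on its own, the constructor chain can be invoked directly before it is wired into the retriever. A minimal sketch; the exact structured output depends on the model:

# Inspect the structured query the LLM writes for a user request
structured = query_constructor.invoke(
    {"query": "Recommend horror movies made after 1980 that feature lots of explosions"}
)
print(structured.query)   # e.g. "lots of explosions"
print(structured.filter)  # e.g. an Operation combining eq("Genre", "Horror") and gt("Release Year", 1980)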

Creating the chat model

Finally, after building the self-querying retriever we can build the standard RAG model on top of it. We begin by defining our chat model. This is what I call a summary model because it takes in a context (retrieved films + system message) and responds with a summary of each recommendation. This model can be GPT-3.5 Turbo if you are trying to keep costs down, or GPT-4 Turbo if you want the absolute best results.

In the system message I tell the bot what its goal is, and provide a series of recommendations and restrictions, the most important of which is to not recommend a film that is not provided to it by the self-querying retriever. In testing, I was having issues when a user query yielded no films from the database. For example, the query: “Recommend some horror films starring Matt Damon directed by Wes Anderson made before 1980” would cause the self-querying retriever to retrieve no films (because as awesome as it sounds that movie doesn’t exist). Presented with no film data in its context, the model would use its own (faulty) memory to try and recommend some films. This is not good behavior. I don’t want a Netflix recommender to discuss films that are not in the database. The system message below managed to stop this behavior. I did notice that GPT-4 is better at following instructions than GPT-3.5, which is expected.

# Imports assumed for this snippet
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI

chat_model = ChatOpenAI(
    model=SUMMARY_MODEL_NAME,  # set elsewhere, e.g. GPT-3.5 Turbo or GPT-4 Turbo
    temperature=0,
    streaming=True,
)

prompt = ChatPromptTemplate.from_messages(
    [
        (
            'system',
            """
            Your goal is to recommend films to users based on their
            query and the retrieved context. If a retrieved film doesn't seem
            relevant, omit it from your response. If your context is empty
            or none of the retrieved films are relevant, do not recommend films,
            but instead tell the user you couldn't find any films
            that match their query. Aim for three to five film recommendations,
            as long as the films are relevant. You cannot recommend more than
            five films. Your recommendation should be relevant, original, and
            at least two to three sentences long.

            YOU CANNOT RECOMMEND A FILM IF IT DOES NOT APPEAR IN YOUR
            CONTEXT.

            # TEMPLATE FOR OUTPUT
            - **Title of Film**:
            - Runtime:
            - Release Year:
            - Streaming:
            - (Your reasoning for recommending this film)

            Question: {question}
            Context: {context}
            """
        ),
    ]
)


def format_docs(docs):
    return "\n\n".join(f"{doc.page_content}\n\nMetadata: {doc.metadata}" for doc in docs)


# Create a chatbot Question & Answer chain from the retriever
rag_chain_from_docs = (
    RunnablePassthrough.assign(
        context=(lambda x: format_docs(x["context"])))
    | prompt
    | chat_model
    | StrOutputParser()
)

rag_chain_with_source = RunnableParallel(
    {"context": retriever, "question": RunnablePassthrough()}
).assign(answer=rag_chain_from_docs)

format_docs is used to format the information presented to the model so that it is easy to understand and parse. We present to the model both the page_content (overview and keywords) and the metadata (all other movie properties): anything it might need to better recommend a film to the user.

rag_chain_from_docs is a chain that takes the retrieved documents, formats them using format_docs, and feeds the formatted documents into the context that the model then uses to answer the question. Finally, we create rag_chain_with_source, which is a RunnableParallel that, as its name suggests, runs two operations in parallel: the self-querying retriever goes off to retrieve similar documents, while the query is simply passed to the model via RunnablePassthrough(). The results from the parallel components are then combined, and rag_chain_from_docs is used to generate the answer. Here source refers to the retriever, which has access to all of the ‘source’ documents.
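Invoking the full chain without streaming returns a dictionary holding all three pieces. A minimal sketch:

# Non-streaming invocation: the result holds the retrieved context,
# the original question, and the generated answer together.
result = rag_chain_with_source.invoke(
    "Find me drama movies in English that are less than 2 hours long and feature pets."
)
print(result["question"])      # the original query
print(len(result["context"]))  # up to 10 retrieved film documents
print(result["answer"])        # the formatted recommendations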

Because I want the answer to be streamed (i.e. presented to the user chunk by chunk, like ChatGPT does), we use the following code:

for chunk in rag_chain_with_source.stream(query):
    for key in chunk:
        if key == 'answer':
            yield chunk[key]

Demonstration

Now to the fun part: playing with the model. As mentioned previously, Streamlit was used to create the frontend and for hosting the app. I won’t discuss the code for the UI here; please see the raw code for details on the implementation. It is fairly straightforward, and there are lots of other examples on the Streamlit website.

Film Search UI. By author.

There are several suggestions you can use, but let’s try our own query:

Example query and model response. By author.

Behind the scenes, the self-querying retriever made sure to filter out any films that were not in the French language. Then, it performed a similarity search for “coming of age stories”, resulting in ten films in the context. Finally, the summarizer bot selected five films for recommendation. Note the range of films suggested: release years range from as early as 1959 to as late as 2012. For convenience, I ensure the bot includes the film’s runtime, release year, streaming providers, and a brief recommendation handcrafted by the bot.

(Side note: If you haven’t seen The 400 Blows, stop whatever you are doing, and go watch it immediately.)

Qualities that normally are seen as negatives in a large language model, such as the non-deterministic nature of its responses, are now positive. Ask the model the same question twice and you may get slightly different recommendations.

It is important to note some limitations of the current implementation:

  • There is no saving of recommendations. Users likely would want to revisit old recommendations.
  • The raw data from The Movie Database has to be updated manually. Automating this and having it update weekly would be a good idea.
  • The self-querying retriever can write poor metadata filters for ambiguous queries. For example, the query “Ben Affleck films” could be problematic: it could mean films where Ben Affleck is the star, or films that were directed by Ben Affleck. This is an example where clarification of the query would be helpful.

Possible improvements to this project include re-ranking the documents after retrieval. It could also be interesting to have a chat model that you can converse with in multi-turn conversations, rather than just a QA bot. One could also create an agent recommender that prompts the user with a clarifying question if the query is not clear.

Have fun searching for films!
