Prompt Compression and Query Optimization

A summary of the “Prompt Compression and Query Optimization” course on DeepLearning.AI.

Introduction

This course shows how to combine mature database features with vector search to minimize the cost of serving a large RAG application. You will learn how to build a standard database with an additional vector index. In RAG applications, if the retrieved context is particularly long, the resulting prompt is very long as well, which is costly when each query retrieves, say, 10,000 tokens. If you run a rental comparison website that serves a million queries each day and LLM input tokens cost $10 per million tokens, you could end up spending more than 36 million dollars per year. To help you save money, this course covers how to keep the retrieved results as compact and relevant as possible.
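
As a quick sanity check on that figure, here is the back-of-the-envelope arithmetic (the query volume and pricing are the course's illustrative assumptions):

QUERIES_PER_DAY = 1_000_000
TOKENS_PER_PROMPT = 10_000
USD_PER_MILLION_INPUT_TOKENS = 10

daily_cost = QUERIES_PER_DAY * TOKENS_PER_PROMPT / 1_000_000 * USD_PER_MILLION_INPUT_TOKENS
annual_cost = daily_cost * 365
print(f"${annual_cost:,.0f} per year")  # $36,500,000 per year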

Let's go over some of the strategies you'll learn. Consider the rental application again. A pre-filter or post-filter can be used to narrow results by the number of bedrooms or bathrooms. An efficient pre-filter is prepared at the database index construction stage: you create an index whose entries match popular queries. For example, if you know you commonly receive inquiries about the number of bedrooms, you can build an index that includes the bedrooms field. This is pre-filtering.

Post-filtering, on the other hand, occurs after a vector search query has been executed, and the result is subsequently filtered to choose the subset that meets the required criteria. Large-scale applications may employ both of these strategies simultaneously.

Another strategy for reducing the size of the result is called projection, which selects a subset of the fields returned by a query. For example, out of 15 fields on a rental listing, you may only wish to return three: name, number of bedrooms, and price. You could perform all of these operations directly in your application, but the database can optimize them for performance and enforce role-based access control.
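
As a rough sketch of the idea in plain Python (the listing fields here are hypothetical stand-ins; the real projection stage appears later in this course):

# Toy projection: keep only three of a listing's many fields.
listing = {
    "name": "Cozy riverside duplex", "bedrooms": 3, "price": 80,
    "summary": "...", "house_rules": "...", "transit": "...",  # plus many more fields
}
projected = {key: listing[key] for key in ("name", "bedrooms", "price")}
print(projected)  # {'name': 'Cozy riverside duplex', 'bedrooms': 3, 'price': 80}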

Another powerful strategy is to rerank search results. For example, after conducting a semantic search using the text embeddings of the rental descriptions, you can rerank the results based on additional data fields such as average star rating or number of reviews, moving the most desirable results to the top of the list and creating better context for the LLM.
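
A minimal sketch of the reranking idea, assuming hypothetical rating and review-count fields on each retrieved result:

# Toy reranking: push highly rated, well-reviewed results to the top.
results = [
    {"name": "Listing A", "avg_rating": 4.2, "num_reviews": 15},
    {"name": "Listing B", "avg_rating": 4.9, "num_reviews": 210},
    {"name": "Listing C", "avg_rating": 4.9, "num_reviews": 32},
]
reranked = sorted(results, key=lambda r: (r["avg_rating"], r["num_reviews"]), reverse=True)
print([r["name"] for r in reranked])  # ['Listing B', 'Listing C', 'Listing A']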

The final technique is prompt compression. If the retrieved information is lengthy, putting it all into an LLM prompt results in an extremely long prompt that is expensive to process. To reduce these expenses, you can employ a small, low-cost LLM that has been fine-tuned to compress prompts before they are sent to the final LLM. There are many opportunities to improve relevance and save costs.

Vanilla Vector Search

String/text search

There is a massive amount of material on the internet, and there are several ways to compare its similarity to a query. Text search is the default strategy, which matches query keywords to the data. Another option is to retrieve data based on its context, semantics, and meaning.

context/semantics/vector/meaning search

Vector search is used to comprehend context (semantic search), anticipate preferences (recommendations), and improve LLM processing (retrieval augmentation). Vector search involves encoding data into high-dimensional vector representations, storing and indexing those vectors in a specialized vector database, and performing searches based on similarity matching between query vectors and database vectors.
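
At its core, similarity matching compares a query vector against stored vectors with a metric such as cosine similarity. A minimal sketch with toy three-dimensional vectors (real embeddings have hundreds or thousands of dimensions):

import numpy as np

def cosine_similarity(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

query_vec = np.array([0.1, 0.8, 0.3])
stored = {"doc1": np.array([0.1, 0.7, 0.4]), "doc2": np.array([0.9, 0.1, 0.0])}
ranked = sorted(stored, key=lambda name: cosine_similarity(query_vec, stored[name]), reverse=True)
print(ranked)  # ['doc1', 'doc2']; doc1 is closer to the query in the embedding space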

RAG (Retrieval Augmented Generation) is a system architecture built around this kind of data retrieval (vector search). RAG supplements an LLM with additional material: it finds information relevant to the query and adds it to the prompt.

LLM chatting vs RAG enhanced LLM chatting

RAG benefits include providing LLMs with relevant, up-to-date information, lowering the likelihood of hallucinations, reducing the amount of information fed into LLMs, reducing the need to fine-tune LLMs for domain-specific scenarios, and leveraging private and domain-specific data.

You will need a vector database to store a vectorized version of your data. This course uses MongoDB for that purpose (MongoDB sponsors the course, but you could also use another vector database).

MongoDB

MongoDB is a developer-oriented NoSQL database that can store operational, transactional, and vector data. It is flexible, scalable, and performant.

Relational Data Example
NoSQL Data Example

In MongoDB, a document is the fundamental data unit, formatted like a JSON object. Each document is a collection of key-value pairs, analogous to a row in a relational database. Documents are dynamic, which means they can have different fields and structures within the same collection. Documents consist of the following elements: a JSON-like structure, field-value pairs, nested documents, and rich data types.
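
For illustration, a hypothetical listing document showing those elements (the field values are made up, loosely modeled on the dataset used later):

listing_doc = {
    "_id": 10006546,
    "name": "Charming riverside duplex",     # field-value pair
    "bedrooms": 3,
    "price": 80,
    "amenities": ["TV", "Wifi", "Kitchen"],  # array value
    "address": {                             # nested document
        "market": "Porto",
        "country": "Portugal",
        "location": {"type": "Point", "coordinates": [-8.61308, 41.1413]},
    },
}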

Consider data modelling to guarantee that your documents are appropriately structured. Data modelling is the process of designing the structure of documents and collections so that data is efficiently represented and organized. It is about deciding how to store and link data among documents to improve efficiency and scalability and to serve your application's specific data access patterns. The structure and format of your data in the database can in turn influence the layout of your application components. Ideally, you should start from the demands of your application rather than from the data itself. First ask yourself, "How will I access my data?" The answer dictates how you should structure your data.
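
For example, if the dominant access pattern is "show a listing together with its reviews," embedding the reviews inside the listing document serves that read with a single query (a sketch; the field names are illustrative):

# Embedding related data inside one document to match the access pattern.
listing_doc = {
    "name": "Charming riverside duplex",
    "reviews": [  # embedded, so no join or lookup is needed at read time
        {"reviewer_name": "Ana", "comments": "Great location."},
        {"reviewer_name": "Joao", "comments": "Spotless and quiet."},
    ],
}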

MongoDB allows you to apply your existing knowledge of pipelines, which is common in data processing and machine learning. The pipeline notion can be used for ideas within a database layer.

When running queries in MongoDB, you create an aggregation pipeline. An aggregation pipeline can be thought of as a series of data processing stages, each of which transforms the data that passes through it. Because the pipeline chains together multiple data transformations, this approach enables complex query composition within MongoDB: a collection of operations, known as stages, that reshape the input data to produce the desired results.

Aggregation Pipeline Visualization
Aggregation pipeline query example

The aggregation pipeline above filters for posts from January 2021 onward, groups them by category, calculates average likes and comments per category, and sorts the results in descending order of average likes.
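
A hedged reconstruction of that pipeline (the posts collection and its field names are assumptions based on the description, not part of the course dataset):

from datetime import datetime

pipeline = [
    {"$match": {"date": {"$gte": datetime(2021, 1, 1)}}},  # posts from January 2021 onward
    {"$group": {
        "_id": "$category",                                # group posts by category
        "avgLikes": {"$avg": "$likes"},
        "avgComments": {"$avg": "$comments"},
    }},
    {"$sort": {"avgLikes": -1}},                           # descending by average likes
]
# results = db.posts.aggregate(pipeline)  # requires a connected pymongo database handle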

In AI applications, data validation is required to ensure that data conforms to a specific model, which lowers the likelihood of errors in the production system. Pydantic is a Python library that supports data validation, modeling, and management. Pydantic provides tools for creating data schemas, such as object definitions and property lists, and it ensures that data follows the predefined schemas, data types, formats, and constraints. If data fails to match the validation criteria, Pydantic raises an exception that describes the individual validation errors.
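
A minimal Pydantic sketch (a hypothetical model; the course's real models follow below): invalid data raises a ValidationError that names the offending field.

from pydantic import BaseModel, ValidationError

class ListingPreview(BaseModel):
    name: str
    bedrooms: int
    price: float

try:
    ListingPreview(name="Cozy loft", bedrooms="two", price=95.0)  # "two" cannot be parsed as an int
except ValidationError as e:
    print(e)  # reports which field failed validation and why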

Before we start coding, let's go over the dataset. It consists of 5,000 Airbnb listings hosted on Hugging Face, including details such as address, description, transit, reviews, and comments. This course teaches you how to build an Airbnb listing recommendation system using RAG techniques. Each record contains image embeddings generated from the listing's photographs as well as text embeddings generated from the content of the space attribute; the text embeddings were created with OpenAI's text-embedding-ada-002 model.

Here are the steps we'll take in the coding component of this lesson.

1. Data loading and preparation
2. Database setup
3. Data ingestion
4. Vector search implementation
5. Handling user queries

Let's create a RAG recommendation system that leverages vector search to extract relevant results from a vector database and provide context to an LLM.

# Warning control
import warnings
warnings.filterwarnings('ignore')

#!pip install datasets pandas openai pymongo pydantic

Getting API Keys

You need to enter your own MONGO_URI and OPENAI_API_KEY keys in the following cell.

import os
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv()) # read local .env file
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
MONGO_URI = os.environ.get("MONGO_URI")

1.1 Data Loading

# 1. Dataset Loading
from datasets import load_dataset
import pandas as pd

# NOTE: Make sure you have a Hugging Face token (HF_TOKEN) in your development environment
# NOTE: https://huggingface.co/datasets/MongoDB/airbnb_embeddings
# NOTE: This dataset contains several records, each representing an Airbnb listing
# NOTE: This dataset contains text and image embeddings, but this lesson only uses the text embeddings
dataset = load_dataset("MongoDB/airbnb_embeddings", streaming=True, split="train")
dataset = dataset.take(100)
# Convert the dataset to a pandas dataframe
dataset_df = pd.DataFrame(dataset)
dataset_df.head(5)
print("Columns:", dataset_df.columns)

1.2 Document Modelling

from typing import List, Optional
from pydantic import BaseModel, ValidationError
from datetime import datetime

class Host(BaseModel):
    host_id: str
    host_url: str
    host_name: str
    host_location: str
    host_about: str
    host_response_time: Optional[str] = None
    host_thumbnail_url: str
    host_picture_url: str
    host_response_rate: Optional[int] = None
    host_is_superhost: bool
    host_has_profile_pic: bool
    host_identity_verified: bool

class Location(BaseModel):
    type: str
    coordinates: List[float]
    is_location_exact: bool

class Address(BaseModel):
    street: str
    government_area: str
    market: str
    country: str
    country_code: str
    location: Location

class Review(BaseModel):
    _id: str
    date: Optional[datetime] = None
    listing_id: str
    reviewer_id: str
    reviewer_name: Optional[str] = None
    comments: Optional[str] = None

class Listing(BaseModel):
    _id: int
    listing_url: str
    name: str
    summary: str
    space: str
    description: str
    neighborhood_overview: Optional[str] = None
    notes: Optional[str] = None
    transit: Optional[str] = None
    access: str
    interaction: Optional[str] = None
    house_rules: str
    property_type: str
    room_type: str
    bed_type: str
    minimum_nights: int
    maximum_nights: int
    cancellation_policy: str
    last_scraped: Optional[datetime] = None
    calendar_last_scraped: Optional[datetime] = None
    first_review: Optional[datetime] = None
    last_review: Optional[datetime] = None
    accommodates: int
    bedrooms: Optional[float] = 0
    beds: Optional[float] = 0
    number_of_reviews: int
    bathrooms: Optional[float] = 0
    amenities: List[str]
    price: int
    security_deposit: Optional[float] = None
    cleaning_fee: Optional[float] = None
    extra_people: int
    guests_included: int
    images: dict
    host: Host
    address: Address
    availability: dict
    review_scores: dict
    reviews: List[Review]
    text_embeddings: List[float]

records = dataset_df.to_dict(orient='records')

# Pandas may emit NaT/NaN for missing values; replace them with None so validation succeeds
for record in records:
    for key, value in record.items():
        # Check if the value is list-like; if so, process each element
        if isinstance(value, list):
            record[key] = [None if pd.isnull(v) else v for v in value]
        # For scalar values, check directly
        else:
            if pd.isnull(value):
                record[key] = None

try:
    # Convert each dictionary to a Listing instance
    listings = [Listing(**record).dict() for record in records]
    # Get an overview of a single datapoint
    print(listings[0].keys())
except ValidationError as e:
    print(e)

1.3 Database Creation and Connection

from pymongo.mongo_client import MongoClient
from pymongo.operations import SearchIndexModel

database_name = "airbnb_dataset"
collection_name = "listings_reviews"


def get_mongo_client(mongo_uri):
    """Establish connection to the MongoDB."""
    # Gateway to interacting with a MongoDB database cluster
    client = MongoClient(mongo_uri, appname="devrel.deeplearningai.lesson1.python")
    print("Connection to MongoDB successful")
    return client

if not MONGO_URI:
    print("MONGO_URI not set in environment variables")

mongo_client = get_mongo_client(MONGO_URI)

# Pymongo client of database and collection
db = mongo_client.get_database(database_name)
collection = db.get_collection(collection_name)

# Delete any existing records in the collection
collection.delete_many({})

1.4 Data Ingestion

# The ingestion process might take a few minutes
collection.insert_many(listings)
print("Data ingestion into MongoDB completed")

1.5 Vector Search Index Definition

# NOTE: This dataset contains text and image embeddings, but this lesson only uses the text embeddings
# The field containing the text embeddings on each document within the listings_reviews collection
text_embedding_field_name = "text_embeddings"
# MongoDB Atlas Vector Search index name
vector_search_index_name_text = "vector_index_text"

vector_search_index_model = SearchIndexModel(
    definition={
        "mappings": {  # describes how fields in the database documents are indexed and stored
            "dynamic": True,  # automatically index new fields that appear in the document
            "fields": {  # properties of the fields that will be indexed
                text_embedding_field_name: {
                    "dimensions": 1536,  # size of the vector
                    "similarity": "cosine",  # function used to compute the similarity between vectors
                    "type": "knnVector",
                }
            },
        }
    },
    name=vector_search_index_name_text,  # identifier for the vector search index
)

# Check if the index already exists
index_exists = False
for index in collection.list_indexes():
    print(index)
    if index['name'] == vector_search_index_name_text:
        index_exists = True
        break

import time

# Create the index if it doesn't exist
if not index_exists:
    try:
        result = collection.create_search_index(model=vector_search_index_model)
        print("Creating index...")
        time.sleep(20)  # Sleep for 20 seconds to ensure the vector index has completed its initial sync before use
        print("Index created successfully:", result)
        print("Wait a few minutes before conducting search with index to ensure index initialization")
    except Exception as e:
        print(f"Error creating vector search index: {str(e)}")
else:
    print(f"Index '{vector_search_index_name_text}' already exists.")
# NOTE: If the output of this process is "Error creating vector search index: Duplicate Index",
# you may proceed to the next cell if you intend to reuse the previously created index

import openai
openai.api_key = OPENAI_API_KEY

def get_embedding(text):
    """Generate an embedding for the given text using OpenAI's API."""
    # Check for valid input
    if not text or not isinstance(text, str):
        return None
    try:
        # Call the OpenAI API to get the embedding
        # NOTE: the query embedding model and dimensions must be compatible
        # with the embeddings stored in the database
        embedding = openai.embeddings.create(
            input=text,
            model="text-embedding-3-small",
            dimensions=1536,
        ).data[0].embedding
        return embedding
    except Exception as e:
        print(f"Error in get_embedding: {e}")
        return None

1.6 Compose Vector Search Query

def vector_search(user_query, db, collection, vector_index="vector_index_text"):
    """
    Perform a vector search in the MongoDB collection based on the user query.

    Args:
        user_query (str): The user's query string.
        db (MongoClient.database): The database object.
        collection (MongoCollection): The MongoDB collection to search.
        vector_index (str): Name of the vector search index to use.

    Returns:
        list: A list of matching documents.
    """

    # Generate embedding for the user query
    query_embedding = get_embedding(user_query)

    if query_embedding is None:
        return "Invalid query or embedding generation failed."

    # Define the vector search stage
    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_index,  # specifies the index to use for the search
            "queryVector": query_embedding,  # the vector representing the query
            "path": text_embedding_field_name,  # field in the documents containing the vectors to search against
            "numCandidates": 150,  # number of candidate matches to consider
            "limit": 20  # return top 20 matches
        }
    }

    # Define the aggregation pipeline with the vector search stage
    pipeline = [vector_search_stage]

    # Execute the search
    results = collection.aggregate(pipeline)

    explain_query_execution = db.command(  # sends a database command directly to the MongoDB server
        'explain', {  # returns information about how MongoDB executes a query or command without actually running it
            'aggregate': collection.name,  # specifies the name of the collection on which the aggregation is performed
            'pipeline': pipeline,  # the aggregation pipeline to analyze
            'cursor': {}  # indicates that default cursor behavior should be used
        },
        verbosity='executionStats')  # detailed statistics about the execution of each stage of the aggregation pipeline

    vector_search_explain = explain_query_execution['stages'][0]['$vectorSearch']
    millis_elapsed = vector_search_explain['explain']['collectStats']['millisElapsed']

    print(f"Total time for the execution to complete on the database server: {millis_elapsed} milliseconds")

    return list(results)

1.7 Handling User Query

class SearchResultItem(BaseModel):
    name: str
    accommodates: Optional[int] = None
    address: Address
    summary: Optional[str] = None
    description: Optional[str] = None
    neighborhood_overview: Optional[str] = None
    notes: Optional[str] = None
notes: Optional[str] = None

from IPython.display import display, HTML

def handle_user_query(query, db, collection):
    # Retrieve relevant listings from the database via vector search
    get_knowledge = vector_search(query, db, collection)

    # Check if there are any results
    if not get_knowledge:
        return "No results found.", "No source information available."

    # Convert search results into a list of SearchResultItem models
    search_results_models = [
        SearchResultItem(**result)
        for result in get_knowledge
    ]

    # Convert search results into a DataFrame for better rendering in Jupyter
    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])

    # Generate system response using OpenAI's chat completion
    completion = openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "You are an Airbnb listing recommendation system."},
            {
                "role": "user",
                "content": f"Answer this user query: {query} with the following context:\n{search_results_df}"
            }
        ]
    )

    system_response = completion.choices[0].message.content

    # Print User Question and System Response
    print(f"- User Question:\n{query}\n")
    print(f"- System Response:\n{system_response}\n")

    # Display the DataFrame as an HTML table
    display(HTML(search_results_df.to_html()))

    # Return the generated response as a string
    return system_response

query = """
I want to stay in a place that's warm and friendly,
and not too far from resturants, can you recommend a place?
Include a reason as to why you've chosen your selection.
"""

handle_user_query(query, db, collection)

Filtering With Metadata

Let's create a MongoDB aggregation pipeline and learn how to use metadata to narrow and refine search results. Metadata is supplemental information that complements and characterizes the primary data: it provides additional context, attributes, and information about the core data. Metadata improves relevancy and allows for filtering and sorting, which can reduce the scope of the vector search query.

For example, the Mona Lisa's image can be the key data, while the title, artist, medium, dimension, date completed, and location can be the metadata. You can save this in MongoDB.
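
Sketched as a document (with hypothetical values), the primary data and its metadata might look like this:

artwork_doc = {
    "image_url": "https://example.com/mona_lisa.jpg",  # primary data (placeholder URL)
    "image_embedding": [0.12, -0.08, 0.33],            # hypothetical, truncated vector
    "metadata": {
        "title": "Mona Lisa",
        "artist": "Leonardo da Vinci",
        "medium": "Oil on poplar panel",
        "location": "Louvre Museum, Paris",
    },
}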

To streamline the results of a vector search operation and increase relevance, metadata is used in filtering stages that are combined with the vector search stage in an aggregation pipeline. Stages enable the creation of sophisticated database queries. There are two filtering techniques: post-filtering and pre-filtering.

Post Filtering

Here, a vector search is performed first, and its results are then narrowed based on particular criteria known as filters. The initial vector search operates on all of the data, and a filter stage after it further reduces the returned results according to the chosen criteria.

Pre Filtering

Here, the filter operation is applied to the entire dataset first, removing records that do not meet the filter requirements. After this initial reduction, the vector search is applied to the filter stage's output. This method reduces the amount of data over which vector search computes similarity.

The key distinction is that with post-filtering, records that are semantically similar to the user's query vector may be discarded by the filter stage after the search, so information or records that would otherwise match can be lost from the final result.
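
The trade-off can be seen with a self-contained toy model, using a brute-force nearest-neighbor search over random two-dimensional vectors as a stand-in for vector search:

import numpy as np

rng = np.random.default_rng(0)
docs = [{"id": i, "vec": rng.random(2), "bedrooms": i % 4} for i in range(100)]
query_vec = rng.random(2)

def top_k(items, k=5):
    """Brute-force nearest neighbors by Euclidean distance (vector search stand-in)."""
    return sorted(items, key=lambda d: float(np.linalg.norm(d["vec"] - query_vec)))[:k]

post = [d for d in top_k(docs, k=5) if d["bedrooms"] == 2]  # filter AFTER truncating to the top 5
pre = top_k([d for d in docs if d["bedrooms"] == 2], k=5)   # filter BEFORE searching
print(len(post), len(pre))  # post-filtering can return fewer (even zero) eligible matches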

Let's set up a new pipeline, add the post-filter stage, handle the user query, then add the pre-filter stage and repeat the process.

# Warning control
import warnings
warnings.filterwarnings('ignore')

import custom_utils

Data Loading

# 1. Dataset Loading
from datasets import load_dataset
import pandas as pd

dataset = load_dataset("MongoDB/airbnb_embeddings", streaming=True, split="train")
dataset = dataset.take(100)
# Convert the dataset to a pandas dataframe
dataset_df = pd.DataFrame(dataset)
dataset_df.head(5)
print("Columns:", dataset_df.columns)

Document Modelling

The process_records() method accepts the supplied dataset and conforms each data point to the model. The result is a Python list of data points, each of which is an Airbnb listing that meets the model's specifications.

listings = custom_utils.process_records(dataset_df)

Database Creation and Connection

db, collection = custom_utils.connect_to_database()
# Delete any existing records in the collection
collection.delete_many({})

Data Ingestion

# The ingestion process might take a few minutes
collection.insert_many(listings)
print("Data ingestion into MongoDB completed")

Vector Search Index Definition

custom_utils.setup_vector_search_index(collection=collection)

Compose Vector Search Query

def vector_search(user_query, db, collection, additional_stages=[], vector_index="vector_index_text"):
    """
    Perform a vector search in the MongoDB collection based on the user query.

    Args:
        user_query (str): The user's query string.
        db (MongoClient.database): The database object.
        collection (MongoCollection): The MongoDB collection to search.
        additional_stages (list): Additional aggregation stages to include in the pipeline.
        vector_index (str): Name of the vector search index to use.

    Returns:
        list: A list of matching documents.
    """

    # Generate embedding for the user query
    query_embedding = custom_utils.get_embedding(user_query)

    if query_embedding is None:
        return "Invalid query or embedding generation failed."

    # Define the vector search stage
    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_index,  # specifies the index to use for the search
            "queryVector": query_embedding,  # the vector representing the query
            "path": "text_embeddings",  # field in the documents containing the vectors to search against
            "numCandidates": 150,  # number of candidate matches to consider
            "limit": 20,  # return top 20 matches
        }
    }

    # Define the aggregation pipeline with the vector search stage and additional stages
    pipeline = [vector_search_stage] + additional_stages

    # Execute the search
    results = collection.aggregate(pipeline)

    explain_query_execution = db.command(  # sends a database command directly to the MongoDB server
        'explain', {  # returns information about how MongoDB executes a query or command without actually running it
            'aggregate': collection.name,  # specifies the name of the collection on which the aggregation is performed
            'pipeline': pipeline,  # the aggregation pipeline to analyze
            'cursor': {}  # indicates that default cursor behavior should be used
        },
        verbosity='executionStats')  # detailed statistics about the execution of each stage of the aggregation pipeline

    vector_search_explain = explain_query_execution['stages'][0]['$vectorSearch']
    millis_elapsed = vector_search_explain['explain']['collectStats']['millisElapsed']

    print(f"Total time for the execution to complete on the database server: {millis_elapsed} milliseconds")

    return list(results)

Handling User Query

from pydantic import BaseModel
from typing import Optional

class SearchResultItem(BaseModel):
    name: str
    accommodates: Optional[int] = None
    bedrooms: Optional[int] = None
    address: custom_utils.Address
    space: Optional[str] = None

from IPython.display import display, HTML

def handle_user_query(query, db, collection, stages=[], vector_index="vector_index_text"):
    # Retrieve relevant listings from the database via vector search
    get_knowledge = vector_search(query, db, collection, stages, vector_index)

    # Check if there are any results
    if not get_knowledge:
        return "No results found.", "No source information available."

    # Convert search results into a list of SearchResultItem models
    search_results_models = [
        SearchResultItem(**result)
        for result in get_knowledge
    ]

    # Convert search results into a DataFrame for better rendering in Jupyter
    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])

    # Generate system response using OpenAI's chat completion
    completion = custom_utils.openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "You are an Airbnb listing recommendation system."},
            {
                "role": "user",
                "content": f"Answer this user query: {query} with the following context:\n{search_results_df}"
            }
        ]
    )

    system_response = completion.choices[0].message.content

    # Print User Question and System Response
    print(f"- User Question:\n{query}\n")
    print(f"- System Response:\n{system_response}\n")

    # Display the DataFrame as an HTML table
    display(HTML(search_results_df.to_html()))

    # Return the generated response as a string
    return system_response

Adding a Post Filter to Vector Search (Match Operator)

import re

# Specify the metadata field to limit documents on
search_path = "address.country"

# Create a match stage
match_stage = {
    "$match": {
        search_path: re.compile(r"United States"),
        "accommodates": {"$gt": 1, "$lt": 5}
    }
}

additional_stages = [match_stage]

query = """
I want to stay in a place that's warm and friendly,
and not too far from resturants, can you recommend a place?
Include a reason as to why you've chosen your selection"
"""

handle_user_query(query, db, collection, additional_stages)

Adding a PreFilter to Vector Search

from pymongo.operations import SearchIndexModel
import time

vector_index_with_filter = "vector_index_with_filter"

new_vector_search_index_model = SearchIndexModel(
    definition={
        "mappings": {
            "dynamic": True,
            "fields": {
                "text_embeddings": {
                    "dimensions": 1536,
                    "similarity": "cosine",
                    "type": "knnVector",
                },
                "accommodates": {
                    "type": "number"
                },
                "bedrooms": {
                    "type": "number"
                },
            },
        }
    },
    name=vector_index_with_filter,
)

# Create the new index
try:
    result = collection.create_search_index(model=new_vector_search_index_model)
    print("Creating index...")
    time.sleep(20)  # Sleep for 20 seconds to ensure the vector index has completed its initial sync before use
    print("New index created successfully:", result)
except Exception as e:
    print(f"Error creating new vector search index: {str(e)}")

def vector_search(user_query, db, collection, additional_stages=[], vector_index="vector_index_text"):
    query_embedding = custom_utils.get_embedding(user_query)
    if query_embedding is None:
        return "Invalid query or embedding generation failed."

    vector_search_stage = {
        "$vectorSearch": {
            "index": vector_index,  # specifies the index to use for the search
            "queryVector": query_embedding,  # the vector representing the query
            "path": "text_embeddings",  # field in the documents containing the vectors to search against
            "numCandidates": 150,  # number of candidate matches to consider
            "limit": 20,  # return top 20 matches
            "filter": {  # pre-filter applied during the vector search itself
                "$and": [
                    {"accommodates": {"$gte": 2}},
                    {"bedrooms": {"$lte": 7}}
                ]
            },
        }
    }
    pipeline = [vector_search_stage] + additional_stages
    results = collection.aggregate(pipeline)

    explain_query_execution = db.command(  # sends a database command directly to the MongoDB server
        'explain', {  # returns information about how MongoDB executes a query or command without actually running it
            'aggregate': collection.name,  # specifies the name of the collection on which the aggregation is performed
            'pipeline': pipeline,  # the aggregation pipeline to analyze
            'cursor': {}  # indicates that default cursor behavior should be used
        },
        verbosity='executionStats')  # detailed statistics about the execution of each stage of the aggregation pipeline

    vector_search_explain = explain_query_execution['stages'][0]['$vectorSearch']
    millis_elapsed = vector_search_explain['explain']['collectStats']['millisElapsed']

    print(f"Total time for the execution to complete on the database server: {millis_elapsed} milliseconds")
    return list(results)

query = """
I want to stay in a place that's warm and friendly,
and not too far from resturants, can you recommend a place?
Include a reason as to why you've chosen your selection"
"""

handle_user_query(
query,
db,
collection,
vector_index=vector_index_with_filter
)

Projections

Let's learn how to simplify database outputs by adding a projection stage to the MongoDB aggregation pipeline. This significantly reduces the quantity of data returned, improving performance and data handling.

Without projection, the aggregation pipeline returns every field in each document even though we do not need them all, and the application layer must strip the unwanted attributes or fields. This has negative consequences, such as higher network traffic and processing time, because the unwanted data must still be transferred and then filtered out at the application layer. Instead, MongoDB allows the inclusion or exclusion of individual fields as an additional stage in the aggregation pipeline. This is accomplished via a technique known as projection, which yields the same number of documents as the previous stage but reduces the fields returned in each document.

Same number of documents, but less data per document

By definition, projection is the process of extracting specific fields from a document rather than returning the complete document. The projection mechanism in MongoDB works by specifying the fields to include or exclude in the final documents. In the Mona Lisa example, the metadata could be projected down to just the title, medium, and date completed to use less data.

The benefits of projection include less overall memory consumption on the client side, reduced data transfer, and optimized query speed.

# Warning control
import warnings
warnings.filterwarnings('ignore')

import custom_utils

Data Loading

from datasets import load_dataset
import pandas as pd

dataset = load_dataset("MongoDB/airbnb_embeddings", streaming=True, split="train")
dataset = dataset.take(100)
# Convert the dataset to a pandas dataframe
dataset_df = pd.DataFrame(dataset)
dataset_df.head(5)

print("Columns:", dataset_df.columns)

Document Modelling

listings = custom_utils.process_records(dataset_df)

Database Creation and Connection

db, collection = custom_utils.connect_to_database()
# Delete any existing records in the collection
collection.delete_many({})

Data Ingestion

collection.insert_many(listings)
print("Data ingestion into MongoDB completed")

Vector Search Index Definition

Let's create a vector search index optimized for filtering on the accommodates and bedrooms attributes.

# Create vector search index
custom_utils.setup_vector_search_index_with_filter(collection=collection)

Handling User Query

from pydantic import BaseModel
from typing import Optional

# Note: Ensure that the projection document in the projection stage matches the search result model.
class SearchResultItem(BaseModel):
    name: str
    accommodates: Optional[int] = None
    address: custom_utils.Address
    summary: Optional[str] = None
    space: Optional[str] = None
    neighborhood_overview: Optional[str] = None
    notes: Optional[str] = None
    score: Optional[float] = None

from IPython.display import display, HTML

def handle_user_query(query, db, collection, stages=[], vector_index="vector_index_text"):
    get_knowledge = custom_utils.vector_search_with_filter(query, db, collection, stages, vector_index)

    if not get_knowledge:
        return "No results found.", "No source information available."

    print("List of all fields of the first document, before model conformance")
    print(get_knowledge[0].keys())

    search_results_models = [
        SearchResultItem(**result)
        for result in get_knowledge
    ]

    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])

    completion = custom_utils.openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "You are an Airbnb listing recommendation system."},
            {
                "role": "user",
                "content": f"Answer this user query: {query} with the following context:\n{search_results_df}"
            }
        ]
    )
    system_response = completion.choices[0].message.content
    print(f"- User Question:\n{query}\n")
    print(f"- System Response:\n{system_response}\n")
    display(HTML(search_results_df.to_html()))
    return system_response

Adding a Projection Stage


projection_stage = {
    "$project": {
        "_id": 0,  # exclude the default _id field
        "name": 1,
        "accommodates": 1,
        "address.street": 1,
        "address.government_area": 1,
        "address.market": 1,
        "address.country": 1,
        "address.country_code": 1,
        "address.location.type": 1,
        "address.location.coordinates": 1,
        "address.location.is_location_exact": 1,
        "summary": 1,
        "space": 1,
        "neighborhood_overview": 1,
        "notes": 1,
        "score": {"$meta": "vectorSearchScore"}  # include the vector search relevance score
    }
}

additional_stages = [projection_stage]

Results

The fields present in the returned documents are exactly the ones specified in the projection stage.

query = """
I want to stay in a place that's warm and friendly,
and not too far from resturants, can you recommend a place?
Include a reason as to why you've chosen your selection"
"""

handle_user_query(
query,
db,
collection,
additional_stages,
vector_index="vector_index_with_filter"
)

Boosting

Let's learn how to reorder documents to improve information retrieval relevance and quality, using specified metadata values to determine each document's position in the reordered results.

Boosting is the process of prioritizing or reranking documents using metadata or past database actions. Boosting can increase relevance, retrieval credibility, and customize results.

Numerical fields such as average rating or number of reviews are a good fit for boosted ranking.

Let's set up the RAG pipeline, including essential stages, boosting logic, and handling user requests.

# Warning control
import warnings
warnings.filterwarnings('ignore')

import custom_utils

Data Loading

from datasets import load_dataset
import pandas as pd

dataset = load_dataset("MongoDB/airbnb_embeddings", streaming=True, split="train")
dataset = dataset.take(100)
# Convert the dataset to a pandas dataframe
dataset_df = pd.DataFrame(dataset)
dataset_df.head(5)
print("Columns:", dataset_df.columns)

Document Modelling

listings = custom_utils.process_records(dataset_df)

Database Creation and Connection

db, collection = custom_utils.connect_to_database()
# Delete any existing records in the collection
collection.delete_many({})

Data Ingestion

collection.insert_many(listings)
print("Data ingestion into MongoDB completed")

Vector Search Index Definition

# Create vector search index
custom_utils.setup_vector_search_index_with_filter(collection=collection)

Handling User Query

from pydantic import BaseModel
from typing import Optional

class SearchResultItem(BaseModel):
    name: str
    accommodates: Optional[int] = None
    address: custom_utils.Address
    averageReviewScore: Optional[float] = None
    number_of_reviews: Optional[float] = None
    combinedScore: Optional[float] = None

from IPython.display import display, HTML

def handle_user_query(query, db, collection, stages=[], vector_index="vector_index_text"):
    get_knowledge = custom_utils.vector_search_with_filter(query, db, collection, stages, vector_index)

    if not get_knowledge:
        return "No results found.", "No source information available."

    print("List of all fields of the first document, before model conformance")
    print(get_knowledge[0].keys())

    search_results_models = [
        SearchResultItem(**result)
        for result in get_knowledge
    ]

    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])

    completion = custom_utils.openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "You are an Airbnb listing recommendation system."},
            {
                "role": "user",
                "content": f"Answer this user query: {query} with the following context:\n{search_results_df}"
            }
        ]
    )
    system_response = completion.choices[0].message.content
    print(f"- User Question:\n{query}\n")
    print(f"- System Response:\n{system_response}\n")
    display(HTML(search_results_df.to_html()))
    return system_response

Boosting Search Results After Vector Search

Let's add two more fields to each document returned by the database operation. The averageReviewScore stage walks over the review score components in each document and computes their average: it adds together all six review score values and divides by the number of components, which in this case is 6.

review_average_stage = {
    "$addFields": {
        "averageReviewScore": {
            "$divide": [
                {
                    "$add": [
                        "$review_scores.review_scores_accuracy",
                        "$review_scores.review_scores_cleanliness",
                        "$review_scores.review_scores_checkin",
                        "$review_scores.review_scores_communication",
                        "$review_scores.review_scores_location",
                        "$review_scores.review_scores_value",
                    ]
                },
                6  # Divide by the number of review score types to get the average
            ]
        },
        # Calculate a score boost factor based on the number of reviews
        "reviewCountBoost": "$number_of_reviews"
    }
}

weighting_stage = {
    "$addFields": {
        "combinedScore": {
            # Example formula that combines average review score and review count boost
            "$add": [
                {"$multiply": ["$averageReviewScore", 0.9]},  # Weighted average review score
                {"$multiply": ["$reviewCountBoost", 0.1]}  # Weighted review count boost
            ]
        }
    }
}

# Apply the combinedScore for sorting
sorting_stage_sort = {
    "$sort": {"combinedScore": -1}  # Descending order to boost higher combined scores
}

additional_stages = [review_average_stage, weighting_stage, sorting_stage_sort]

Results

query = """
I want to stay in a place that's warm and friendly,
and not too far from resturants, can you recommend a place?
Include a reason as to why you've chosen your selection"
"""

handle_user_query(
query,
db,
collection,
additional_stages,
vector_index="vector_index_with_filter"
)

Prompt Compression

LLMs with huge context windows are becoming the new normal; the disadvantages are cost and latency. Prompt compression seeks to address this. Prompt compression is the systematic reduction of the tokens fed into a large language model, with the goal of keeping or closely matching the output quality of the original, uncompressed prompt.

Let's configure the RAG pipeline, add the necessary stages, apply the compression logic, and handle user queries.

# Warning control
import warnings
warnings.filterwarnings('ignore')
#pip install llmlingua
import custom_utils

Data Loading

from datasets import load_dataset
import pandas as pd

dataset = load_dataset("MongoDB/airbnb_embeddings", streaming=True, split="train")
dataset = dataset.take(100)
# Convert the dataset to a pandas dataframe
dataset_df = pd.DataFrame(dataset)
dataset_df.head(5)
print("Columns:", dataset_df.columns)

Document Modelling

listings = custom_utils.process_records(dataset_df)

Database Creation and Connection

db, collection = custom_utils.connect_to_database()
# Delete any existing records in the collection
collection.delete_many({})

Data Ingestion

collection.insert_many(listings)
print("Data ingestion into MongoDB completed")

Vector Search Index Definition

# Create vector search index
custom_utils.setup_vector_search_index_with_filter(collection=collection)

Handling User Query

from pydantic import BaseModel
from typing import Optional

class SearchResultItem(BaseModel):
    name: str
    accommodates: Optional[int] = None
    address: custom_utils.Address
    neighborhood_overview: Optional[str] = None
    notes: Optional[str] = None
    averageReviewScore: Optional[float] = None
    number_of_reviews: Optional[float] = None
    combinedScore: Optional[float] = None

Boosting Search Results After Vector Search

review_average_stage = {
    "$addFields": {
        "averageReviewScore": {
            "$divide": [
                {
                    "$add": [
                        "$review_scores.review_scores_accuracy",
                        "$review_scores.review_scores_cleanliness",
                        "$review_scores.review_scores_checkin",
                        "$review_scores.review_scores_communication",
                        "$review_scores.review_scores_location",
                        "$review_scores.review_scores_value",
                    ]
                },
                6  # Divide by the number of review score types to get the average
            ]
        },
        # Calculate a score boost factor based on the number of reviews
        "reviewCountBoost": "$number_of_reviews"
    }
}

weighting_stage = {
    "$addFields": {
        "combinedScore": {
            # Example formula that combines average review score and review count boost
            "$add": [
                {"$multiply": ["$averageReviewScore", 0.3]},  # Weighted average review score
                {"$multiply": ["$reviewCountBoost", 0.7]}  # Weighted review count boost
            ]
        }
    }
}

# Apply the combinedScore for sorting
sorting_stage_sort = {
    "$sort": {"combinedScore": -1}  # Descending order to boost higher combined scores
}

additional_stages = [review_average_stage, weighting_stage, sorting_stage_sort]

Modified Handling User Query

from IPython.display import display, HTML
import pprint

def handle_user_query(query, db, collection, stages=[], vector_index="vector_index_text"):
    get_knowledge = custom_utils.vector_search_with_filter(query, db, collection, stages, vector_index)

    if not get_knowledge:
        return "No results found.", "No source information available."

    search_results_models = [
        SearchResultItem(**result)
        for result in get_knowledge
    ]

    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])

    print("Uncompressed Prompt (Query Info):\n")
    print(search_results_df)

    completion = custom_utils.openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "You are an Airbnb listing recommendation system."},
            {
                "role": "user",
                "content": f"Answer this user query: {query} with the following context:\n{search_results_df}"
            }
        ]
    )
    system_response = completion.choices[0].message.content
    print(f"- User Question:\n{query}\n")
    print(f"- System Response:\n{system_response}\n")
    display(HTML(search_results_df.to_html()))
    return system_response

query = """
I want to stay in a place that's warm and friendly,
and not too far from resturants, can you recommend a place?
Include a reason as to why you've chosen your selection"
"""

handle_user_query(
query,
db,
collection,
additional_stages,
vector_index="vector_index_with_filter"
)
Uncompressed output

Prompt Compression

We use a smaller LLM to compress a long prompt into a short one. The input prompt must be built in a specific way, with a component-based structure of three fields: demonstration, instruction, and question.

import json
from llmlingua import PromptCompressor

llm_lingua = PromptCompressor(
    model_name="microsoft/llmlingua-2-bert-base-multilingual-cased-meetingbank",
    model_config={"revision": "main"},
    use_llmlingua2=True,  # use the latest LLMLingua-2 approach
    device_map="cpu",
)

# Function definition
def compress_query_prompt(query):
    demonstration_str = query['demonstration_str']
    instruction = query['instruction']
    question = query['question']

    # ~6x compression
    compressed_prompt = llm_lingua.compress_prompt(
        demonstration_str.split("\n"),
        instruction=instruction,
        question=question,
        target_token=500,  # target length of the compressed prompt
        rank_method="longllmlingua",
        context_budget="+100",  # the compressed prompt may exceed the target by up to 100 tokens
        dynamic_context_compression_ratio=0.4,  # how tokens are allocated across contexts
        reorder_context="sort",  # reorder the context before compression
    )

    return json.dumps(compressed_prompt, indent=4)

def handle_user_query_with_compression(query, db, collection, stages=[], vector_index="vector_index_text"):
    # Perform vector search to get knowledge from the database
    get_knowledge = custom_utils.vector_search_with_filter(query, db, collection, stages, vector_index)

    # Check if there are any results
    if not get_knowledge:
        return None, "No results found."

    # Convert search results into a list of SearchResultItem models
    search_results_models = [SearchResultItem(**result) for result in get_knowledge]

    # Convert search results into a DataFrame for better rendering
    search_results_df = pd.DataFrame([item.dict() for item in search_results_models])

    # Prepare information for compression
    query_info = {
        'demonstration_str': search_results_df.to_string(),  # results from the information retrieval process
        'instruction': "Write a high-quality answer for the given question using only the provided search results.",
        'question': query  # user query
    }

    # Compress the query prompt using the predefined function
    compressed_prompt = compress_query_prompt(query_info)

    # Optional: Print compressed prompt for debugging
    print("Compressed Prompt:\n")
    pprint.pprint(compressed_prompt)
    print("\n" + "=" * 80 + "\n")

    return search_results_df, compressed_prompt

def handle_system_response(query, compressed_prompt):
    # Generate system response using OpenAI's completion
    completion = custom_utils.openai.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {
                "role": "system",
                "content": "You are an Airbnb listing recommendation system."
            },
            {
                "role": "user",
                "content": f"Answer this user query: {query} with the following context:\n{compressed_prompt}"
            }
        ]
    )

    system_response = completion.choices[0].message.content

    # Print User Question, System Response
    print(f"- User Question:\n{query}\n")
    print(f"- System Response:\n{system_response}\n")

    # Return the system response
    return system_response

# Compress the query and get search results
results, compressed_prompt = handle_user_query_with_compression(
    query,
    db,
    collection,
    additional_stages,
    vector_index="vector_index_with_filter"
)

As you can see above, the 4,284 original tokens have been reduced to 512, an 8.4x compression ratio, saving about $0.2 per prompt. These savings accumulate over time and make a significant difference at scale.

if compressed_prompt:
    # Handle the system response with the compressed prompt
    system_response = handle_system_response(query, compressed_prompt)
else:
    print("No valid results to display.")

More Learning Resources:

The MongoDB Developer Center, for tutorials, articles, and videos: www.mongodb.com/developer/

The GenAI Showcase repo, for code showcasing AI application builds and demos:

The DeepLearning.AI forum, for questions about this course: https://community.deeplearning.ai/

References

DeepLearning.AI (2024), Prompt Compression and Query Optimization:

https://learn.deeplearning.ai/courses/prompt-compression-and-query-optimization/
