# Vector Databases + RAG Pipelines: Building Production-Ready AI Applications
## Introduction
The AI landscape in 2025 is dominated by Large Language Models (LLMs) that can generate human-like text, but these models have a critical limitation: they can only work with the information they were trained on, and they often “hallucinate” or make up facts when asked about topics outside their training data. This is where Retrieval-Augmented Generation (RAG) comes in as a game-changing solution.
### The Hallucination Problem and RAG Solution
Traditional LLMs like GPT-4, Claude, and others are incredibly powerful at generating coherent text, but they suffer from several fundamental limitations:
- Knowledge Cutoff: They can only access information from their training data, which has a fixed cutoff date
- Hallucination: They often generate plausible-sounding but incorrect information when they don’t know the answer
- Lack of Citations: They can’t provide sources or references for their claims
- Static Knowledge: They can’t access real-time or domain-specific information
RAG solves these problems by combining the generative capabilities of LLMs with the retrieval capabilities of vector databases. Instead of relying solely on the model’s internal knowledge, RAG systems:
- Retrieve relevant information from a knowledge base using semantic search
- Augment the LLM’s context with this retrieved information
- Generate responses that are grounded in the retrieved facts
This approach ensures that AI applications can provide accurate, up-to-date, and verifiable information while maintaining the natural language generation capabilities that make LLMs so powerful.
### The Critical Role of Vector Databases
Vector databases are the backbone of modern RAG systems. They store and index high-dimensional vector representations (embeddings) of text, images, and other data types, enabling lightning-fast similarity searches across massive datasets.
Unlike traditional databases, which rely on exact or simple keyword matching, vector databases can find semantically similar content even when the exact words differ. This is crucial for RAG systems because:
- Semantic Understanding: They can find relevant documents even when the query uses different terminology
- Scalability: They can handle millions of documents with sub-second query times
- Flexibility: They support multiple data types (text, images, audio) in the same system
- Real-time Updates: They can be updated incrementally as new information becomes available
### The Production-Ready Challenge
While the concept of RAG is straightforward, building production-ready RAG systems presents significant challenges:
- Latency Requirements: Users expect responses in seconds, not minutes
- Accuracy Demands: The retrieved information must be highly relevant to the query
- Scalability Needs: Systems must handle thousands of concurrent users
- Cost Optimization: Embedding generation and storage can be expensive
- Data Freshness: Information must stay current and accurate
In this comprehensive guide, we’ll explore how to build robust, scalable RAG pipelines using modern vector databases and best practices for production deployment.
## How Vector Databases Work
To understand RAG systems, we need to dive deep into how vector databases work and why they’re essential for semantic search.
### Understanding Embeddings
At the heart of vector databases are embeddings—numerical representations of text, images, or other data that capture semantic meaning. Embeddings are created by neural networks that have been trained to understand the relationships between different pieces of content.
```python
# Example: Generating embeddings with OpenAI
import openai

client = openai.OpenAI()  # reads OPENAI_API_KEY from the environment

def generate_embedding(text):
    """Generate an embedding for a piece of text using OpenAI's API."""
    response = client.embeddings.create(
        model="text-embedding-ada-002",
        input=text
    )
    return response.data[0].embedding

# Example usage
text = "Vector databases enable semantic search across large datasets"
embedding = generate_embedding(text)
print(f"Embedding dimension: {len(embedding)}")  # 1536 dimensions
print(f"Sample values: {embedding[:5]}")         # First 5 values
```
Embeddings have several key properties:
- High Dimensionality: Modern embeddings typically have 384-1536 dimensions
- Semantic Similarity: Similar concepts have similar embedding vectors
- Mathematical Operations: You can perform vector operations like addition, subtraction, and similarity calculations (see the sketch after this list)
- Language Agnostic: With multilingual embedding models, the same concept expressed in different languages maps to similar vectors
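As a quick illustration of these properties, here is a minimal sketch using the same local sentence-transformers model that appears later in this article (all-MiniLM-L6-v2, 384 dimensions); the example sentences are just placeholders:

```python
# Sketch: inspecting embedding properties with a local model
import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")  # 384-dimensional embeddings
sentences = [
    "How do I reset my password?",
    "I forgot my login credentials",   # semantically close to the first
    "The weather is nice today",       # unrelated
]
vectors = model.encode(sentences)
print(vectors.shape)  # (3, 384) -> high-dimensional

# "Mathematical operations": cosine similarity is just a normalized dot product
def cosine(a, b):
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

print(cosine(vectors[0], vectors[1]))  # relatively high: similar meaning
print(cosine(vectors[0], vectors[2]))  # lower: unrelated meaning
```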
### Similarity Search and Distance Metrics
Vector databases use similarity search to find the most relevant documents for a query. This involves calculating the distance between the query embedding and all stored document embeddings.
```python
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def calculate_similarity(query_embedding, document_embeddings):
    """Calculate cosine similarity between a query and each document."""
    similarities = cosine_similarity([query_embedding], document_embeddings)[0]
    return similarities

def find_most_similar(query_embedding, document_embeddings, top_k=5):
    """Find the top-k most similar documents."""
    similarities = calculate_similarity(query_embedding, document_embeddings)
    top_indices = np.argsort(similarities)[::-1][:top_k]
    return [(i, similarities[i]) for i in top_indices]

# Example usage
query = "How do vector databases work?"
query_embedding = generate_embedding(query)

documents = [
    "Vector databases store high-dimensional embeddings for semantic search",
    "Machine learning models generate embeddings from text and images",
    "Traditional databases use exact matching while vector databases use similarity",
    "RAG systems combine retrieval with generation for better AI responses"
]
document_embeddings = [generate_embedding(doc) for doc in documents]

similar_docs = find_most_similar(query_embedding, document_embeddings, top_k=3)
for idx, similarity in similar_docs:
    print(f"Document {idx}: {documents[idx]} (similarity: {similarity:.3f})")
```
Common distance metrics include the following (see the sketch after this list):
- Cosine Similarity: Measures the angle between vectors (most common for text)
- Euclidean Distance: Measures straight-line distance between points
- Dot Product: Measures the magnitude of projection of one vector onto another
- Manhattan Distance: Measures distance along grid lines
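As a small sketch of how each metric is computed (plain NumPy, toy 3-dimensional vectors chosen only for illustration):

```python
# Sketch: computing the four common metrics with NumPy
import numpy as np

a = np.array([0.1, 0.3, 0.5])
b = np.array([0.2, 0.1, 0.4])

cosine_similarity = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))  # angle between vectors
euclidean_distance = np.linalg.norm(a - b)                                  # straight-line distance
dot_product = np.dot(a, b)                                                  # unnormalized projection
manhattan_distance = np.sum(np.abs(a - b))                                  # distance along grid lines

print(cosine_similarity, euclidean_distance, dot_product, manhattan_distance)
```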
### Approximate Nearest Neighbor (ANN) Indexes
Searching through millions of embeddings using exact similarity calculations would be prohibitively slow. Vector databases use Approximate Nearest Neighbor (ANN) indexes to trade perfect accuracy for dramatic speed improvements.
```python
# Example: Using FAISS for efficient similarity search
import faiss
import numpy as np

class VectorIndex:
    def __init__(self, dimension=1536):
        # Inner-product index over normalized vectors == cosine similarity.
        # Note: IndexFlatIP is an exact (brute-force) index; see the ANN sketch below.
        self.index = faiss.IndexFlatIP(dimension)
        self.documents = []

    def add_documents(self, embeddings, documents):
        """Add documents and their embeddings to the index."""
        # Normalize embeddings so inner product equals cosine similarity
        embeddings_normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
        self.index.add(embeddings_normalized.astype('float32'))
        self.documents.extend(documents)

    def search(self, query_embedding, k=5):
        """Search for the k most similar documents."""
        # Normalize the query embedding
        query_normalized = query_embedding / np.linalg.norm(query_embedding)
        query_normalized = query_normalized.reshape(1, -1).astype('float32')

        # Search the index
        similarities, indices = self.index.search(query_normalized, k)

        results = []
        for i, (similarity, idx) in enumerate(zip(similarities[0], indices[0])):
            if 0 <= idx < len(self.documents):  # Valid index
                results.append({
                    'document': self.documents[idx],
                    'similarity': float(similarity),
                    'rank': i + 1
                })
        return results

# Example usage
index = VectorIndex(dimension=1536)
documents = [
    "Vector databases enable semantic search across large datasets",
    "Machine learning models generate embeddings from text and images",
    "Traditional databases use exact matching while vector databases use similarity",
    "RAG systems combine retrieval with generation for better AI responses"
]
embeddings = np.array([generate_embedding(doc) for doc in documents])
index.add_documents(embeddings, documents)

query = "How do vector databases work?"
query_embedding = np.array(generate_embedding(query))
results = index.search(query_embedding, k=3)
for result in results:
    print(f"Rank {result['rank']}: {result['document']} (similarity: {result['similarity']:.3f})")
```
### Vector Database Comparison
Different vector databases offer different trade-offs between performance, scalability, and ease of use:
#### PostgreSQL with pgvector
- Pros: ACID compliance, SQL interface, mature ecosystem
- Cons: Limited scalability for very large datasets
- Best for: Applications requiring transactional consistency
```sql
-- Example: Setting up pgvector
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    embedding vector(1536),
    metadata JSONB
);

-- Create an index for similarity search
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- Insert a document with embedding
INSERT INTO documents (content, embedding, metadata)
VALUES (
    'Vector databases enable semantic search',
    '[0.1, 0.2, 0.3, ...]'::vector,
    '{"source": "article", "date": "2025-08-30"}'
);

-- Search for similar documents
SELECT content, embedding <=> '[0.1, 0.2, 0.3, ...]'::vector AS distance
FROM documents
ORDER BY embedding <=> '[0.1, 0.2, 0.3, ...]'::vector
LIMIT 5;
```
#### Pinecone
- Pros: Managed service, excellent scalability, real-time updates
- Cons: Vendor lock-in, cost at scale
- Best for: Rapid prototyping and production deployments
```python
import pinecone

# Initialize Pinecone
pinecone.init(api_key="your-api-key", environment="us-west1-gcp")

# Create an index
pinecone.create_index(
    name="documents",
    dimension=1536,
    metric="cosine"
)

# Get the index
index = pinecone.Index("documents")

# Upsert vectors
vectors = [
    ("doc1", [0.1, 0.2, 0.3, ...], {"content": "Vector databases enable semantic search"}),
    ("doc2", [0.2, 0.3, 0.4, ...], {"content": "Machine learning models generate embeddings"})
]
index.upsert(vectors=vectors)

# Query the index
results = index.query(
    vector=[0.1, 0.2, 0.3, ...],
    top_k=5,
    include_metadata=True
)
```
#### Weaviate
- Pros: GraphQL interface, multi-modal support, schema flexibility
- Cons: Steeper learning curve, smaller community
- Best for: Complex data relationships and multi-modal applications
```python
import weaviate
from weaviate.classes.init import Auth
from weaviate.classes.config import Property, DataType, Configure

# Initialize Weaviate client
client = weaviate.connect_to_wcs(
    cluster_url="your-cluster-url",
    auth_credentials=Auth.api_key("your-api-key")
)

# Create a collection
client.collections.create(
    name="Document",
    properties=[
        Property(name="content", data_type=DataType.TEXT),
        Property(name="source", data_type=DataType.TEXT)
    ],
    vectorizer_config=Configure.Vectorizer.text2vec_openai()
)

# Add documents
documents = client.collections.get("Document")
documents.data.insert_many([
    {"content": "Vector databases enable semantic search", "source": "article"},
    {"content": "Machine learning models generate embeddings", "source": "tutorial"}
])

# Search documents
response = documents.query.near_text(
    query="How do vector databases work?",
    limit=5
)
```
#### Milvus
- Pros: High performance, open source, excellent scalability
- Cons: Complex setup, requires infrastructure management
- Best for: Large-scale production deployments with custom infrastructure
```python
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType

# Connect to Milvus
connections.connect("default", host="localhost", port="19530")

# Define collection schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536)
]
schema = CollectionSchema(fields, "documents")

# Create collection
collection = Collection("documents", schema)

# Create index
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}
collection.create_index("embedding", index_params)

# Insert data (column-wise: contents, then embeddings; the auto_id primary key is omitted)
data = [
    ["Vector databases enable semantic search", "Machine learning models generate embeddings"],
    [[0.1, 0.2, 0.3, ...], [0.2, 0.3, 0.4, ...]]  # embeddings
]
collection.insert(data)

# Load the collection into memory before searching
collection.load()

# Search
search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
results = collection.search(
    data=[[0.1, 0.2, 0.3, ...]],
    anns_field="embedding",
    param=search_params,
    limit=5,
    output_fields=["content"]
)
```
## Architecture of a RAG Pipeline
Building a production-ready RAG system requires careful consideration of each component in the pipeline. Let’s explore the complete architecture and how each piece works together.
### High-Level RAG Architecture
A typical RAG pipeline consists of five main components:
- Data Ingestion: Collecting and preprocessing documents
- Embedding Generation: Converting text into vector representations
- Vector Storage: Storing embeddings in a vector database
- Retrieval: Finding relevant documents for a query
- Generation: Using retrieved context to generate responses
```python
# Complete RAG Pipeline Architecture
import asyncio
from typing import List, Dict, Any
from dataclasses import dataclass

@dataclass
class Document:
    id: str
    content: str
    metadata: Dict[str, Any]
    embedding: List[float] = None

@dataclass
class Query:
    text: str
    embedding: List[float] = None
    filters: Dict[str, Any] = None

@dataclass
class RAGResponse:
    answer: str
    sources: List[Document]
    confidence: float
    metadata: Dict[str, Any]

class RAGPipeline:
    def __init__(self, embedding_model, vector_db, llm):
        self.embedding_model = embedding_model
        self.vector_db = vector_db
        self.llm = llm

    async def ingest_documents(self, documents: List[Document]) -> None:
        """Ingest documents into the RAG system."""
        # Generate embeddings for all documents
        for doc in documents:
            doc.embedding = await self.embedding_model.embed(doc.content)
        # Store in vector database
        await self.vector_db.add_documents(documents)

    async def query(self, query: Query) -> RAGResponse:
        """Process a query through the RAG pipeline."""
        # Generate embedding for query
        query.embedding = await self.embedding_model.embed(query.text)

        # Retrieve relevant documents
        retrieved_docs = await self.vector_db.search(
            query.embedding,
            top_k=5,
            filters=query.filters
        )

        # Construct context from retrieved documents
        context = self._build_context(retrieved_docs)

        # Generate response using LLM
        prompt = self._create_prompt(query.text, context)
        answer = await self.llm.generate(prompt)

        return RAGResponse(
            answer=answer,
            sources=retrieved_docs,
            confidence=self._calculate_confidence(retrieved_docs),
            metadata={"context_length": len(context)}
        )

    def _build_context(self, documents: List[Document]) -> str:
        """Build context string from retrieved documents."""
        context_parts = []
        for i, doc in enumerate(documents, 1):
            context_parts.append(f"Document {i}:\n{doc.content}\n")
        return "\n".join(context_parts)

    def _create_prompt(self, query: str, context: str) -> str:
        """Create prompt for LLM with retrieved context."""
        return f"""Based on the following context, answer the question. If the context doesn't contain enough information to answer the question, say so.

Context:
{context}

Question: {query}

Answer:"""

    def _calculate_confidence(self, documents: List[Document]) -> float:
        """Calculate confidence score based on retrieved documents."""
        if not documents:
            return 0.0
        # Simple confidence based on the number of retrieved documents
        return min(1.0, len(documents) / 3.0)
```
### Data Ingestion and Preprocessing
The quality of your RAG system depends heavily on the quality of your ingested data. Proper preprocessing is crucial for good retrieval performance.
```python
import re
import hashlib
from typing import List, Dict, Any
from dataclasses import dataclass

@dataclass
class ProcessedDocument:
    id: str
    content: str
    metadata: Dict[str, Any]
    chunks: List[str]

class DocumentProcessor:
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def process_document(self, raw_content: str, metadata: Dict[str, Any] = None) -> ProcessedDocument:
        """Process a raw document into chunks suitable for embedding."""
        # Clean the content
        cleaned_content = self._clean_text(raw_content)
        # Generate document ID
        doc_id = self._generate_id(cleaned_content, metadata)
        # Split into chunks
        chunks = self._chunk_text(cleaned_content)
        return ProcessedDocument(
            id=doc_id,
            content=cleaned_content,
            metadata=metadata or {},
            chunks=chunks
        )

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Collapse whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove special characters but keep punctuation
        text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)]', '', text)
        # Normalize to ASCII (note: this drops non-ASCII characters entirely)
        text = text.encode('ascii', 'ignore').decode('ascii')
        return text.strip()

    def _generate_id(self, content: str, metadata: Dict[str, Any]) -> str:
        """Generate a unique ID for the document."""
        # Use metadata if available, otherwise hash the content
        if metadata and 'id' in metadata:
            return str(metadata['id'])
        content_hash = hashlib.md5(content.encode()).hexdigest()
        return f"doc_{content_hash[:8]}"

    def _chunk_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks."""
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            if end >= len(text):
                # Last chunk
                chunks.append(text[start:].strip())
                break
            # Try to break at a sentence boundary
            last_period = text.rfind('.', start, end)
            last_exclamation = text.rfind('!', start, end)
            last_question = text.rfind('?', start, end)
            sentence_end = max(last_period, last_exclamation, last_question)
            if sentence_end > start:
                end = sentence_end + 1
            chunks.append(text[start:end].strip())
            # Move start for the next chunk, guaranteeing forward progress
            # (without the max(), a sentence boundary early in the window could
            # move start backwards and loop forever)
            start = max(end - self.chunk_overlap, start + 1)
        return chunks

# Example usage
processor = DocumentProcessor(chunk_size=1000, chunk_overlap=200)

raw_document = """
Vector databases are specialized databases designed to store and query high-dimensional vector data efficiently.
They are particularly useful for machine learning applications, especially those involving similarity search and
semantic understanding. Unlike traditional relational databases that store structured data in tables, vector
databases store embeddings - numerical representations of data that capture semantic meaning.

The key advantage of vector databases is their ability to perform similarity searches quickly. When you have
millions of documents, finding the most similar ones to a given query becomes computationally expensive with
traditional methods. Vector databases use specialized indexing techniques like Approximate Nearest Neighbor (ANN)
algorithms to make these searches fast and scalable.

Common use cases for vector databases include:
1. Semantic search engines
2. Recommendation systems
3. Image and video similarity search
4. Natural language processing applications
5. Anomaly detection systems
"""

processed_doc = processor.process_document(
    raw_document,
    metadata={"source": "article", "topic": "vector databases", "date": "2025-08-30"}
)

print(f"Document ID: {processed_doc.id}")
print(f"Number of chunks: {len(processed_doc.chunks)}")
for i, chunk in enumerate(processed_doc.chunks):
    print(f"Chunk {i+1}: {chunk[:100]}...")
```
### Embedding Generation Strategies
Choosing the right embedding model and generation strategy is critical for RAG performance.
```python
import asyncio
from typing import List
import openai
from sentence_transformers import SentenceTransformer

class EmbeddingGenerator:
    def __init__(self, model_name: str = "text-embedding-ada-002", batch_size: int = 100):
        self.model_name = model_name
        self.batch_size = batch_size
        # Initialize the embedding model
        if model_name.startswith("text-embedding-"):
            # OpenAI embedding model
            self.model_type = "openai"
            self.client = openai.AsyncOpenAI()
        else:
            # Local sentence-transformers model
            self.model_type = "local"
            self.model = SentenceTransformer(model_name)

    async def embed_texts(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a list of texts."""
        if self.model_type == "openai":
            return await self._embed_with_openai(texts)
        return self._embed_with_local_model(texts)

    async def _embed_with_openai(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings using the OpenAI API."""
        embeddings = []
        # Process in batches to avoid rate limits
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i:i + self.batch_size]
            try:
                response = await self.client.embeddings.create(
                    model=self.model_name,
                    input=batch
                )
                embeddings.extend([data.embedding for data in response.data])
                # Small delay to respect rate limits
                await asyncio.sleep(0.1)
            except Exception as e:
                print(f"Error generating embeddings for batch {i}: {e}")
                # Return zero vectors for failed batches
                embeddings.extend([[0.0] * 1536] * len(batch))
        return embeddings

    def _embed_with_local_model(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings using a local sentence-transformers model."""
        embeddings = self.model.encode(texts, convert_to_tensor=False)
        return embeddings.tolist()

    def embed_single(self, text: str) -> List[float]:
        """Generate an embedding for a single text."""
        if self.model_type == "openai":
            # For single embeddings we can use the synchronous client
            response = openai.OpenAI().embeddings.create(
                model=self.model_name,
                input=text
            )
            return response.data[0].embedding
        embedding = self.model.encode([text], convert_to_tensor=False)
        return embedding[0].tolist()

# Example usage with different embedding strategies
async def demonstrate_embedding_strategies():
    # Strategy 1: OpenAI embeddings (high quality, paid)
    openai_generator = EmbeddingGenerator("text-embedding-ada-002")
    # Strategy 2: Local embeddings (free, good quality)
    local_generator = EmbeddingGenerator("all-MiniLM-L6-v2")

    texts = [
        "Vector databases enable semantic search",
        "Machine learning models generate embeddings",
        "RAG systems combine retrieval with generation"
    ]

    # Generate embeddings (both paths are async methods)
    openai_embeddings = await openai_generator.embed_texts(texts)
    local_embeddings = await local_generator.embed_texts(texts)

    print(f"OpenAI embedding dimension: {len(openai_embeddings[0])}")
    print(f"Local embedding dimension: {len(local_embeddings[0])}")

    # Compare similarity between different embedding types
    from sklearn.metrics.pairwise import cosine_similarity

    # Similarity between the first and second text
    openai_sim = cosine_similarity([openai_embeddings[0]], [openai_embeddings[1]])[0][0]
    local_sim = cosine_similarity([local_embeddings[0]], [local_embeddings[1]])[0][0]

    print(f"OpenAI similarity: {openai_sim:.3f}")
    print(f"Local similarity: {local_sim:.3f}")

# Run the demonstration
# asyncio.run(demonstrate_embedding_strategies())
```
### Retrieval Strategies
The retrieval component is where the magic happens. Different retrieval strategies can significantly impact RAG performance.
```python
from typing import List, Dict, Any, Optional

class RetrievalEngine:
    def __init__(self, vector_db, reranker=None):
        self.vector_db = vector_db
        self.reranker = reranker

    async def retrieve(
        self,
        query_embedding: List[float],
        top_k: int = 10,
        filters: Optional[Dict[str, Any]] = None,
        use_reranker: bool = False,
        query_text: Optional[str] = None  # required when reranking with a cross-encoder
    ) -> List[Dict[str, Any]]:
        """Retrieve relevant documents for a query."""
        # Retrieve a larger candidate pool if we are going to rerank
        initial_k = top_k * 3 if use_reranker else top_k

        # Get initial candidates
        candidates = await self.vector_db.search(
            query_embedding,
            top_k=initial_k,
            filters=filters
        )

        if use_reranker and self.reranker and query_text:
            # Rerank candidates using a more sophisticated model
            reranked = await self._rerank_candidates(query_text, candidates)
            return reranked[:top_k]

        return candidates[:top_k]

    async def _rerank_candidates(
        self,
        query_text: str,
        candidates: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Rerank candidates using a cross-encoder model."""
        if not self.reranker:
            return candidates

        # A cross-encoder scores (query, document) pairs
        pairs = [(query_text, candidate['content']) for candidate in candidates]

        # Get reranker scores
        scores = await self.reranker.score(pairs)

        # Sort candidates by reranker scores
        scored_candidates = list(zip(candidates, scores))
        scored_candidates.sort(key=lambda x: x[1], reverse=True)

        # Return reranked candidates
        return [candidate for candidate, score in scored_candidates]

    async def hybrid_search(
        self,
        query: str,
        query_embedding: List[float],
        top_k: int = 10,
        alpha: float = 0.5
    ) -> List[Dict[str, Any]]:
        """Perform hybrid search combining vector and keyword search."""
        # Vector search
        vector_results = await self.vector_db.search(
            query_embedding,
            top_k=top_k
        )

        # Keyword search (BM25 or similar)
        keyword_results = await self.vector_db.keyword_search(
            query,
            top_k=top_k
        )

        # Combine results using weighted reciprocal rank fusion
        combined = self._reciprocal_rank_fusion(
            vector_results,
            keyword_results,
            alpha=alpha
        )
        return combined[:top_k]

    def _reciprocal_rank_fusion(
        self,
        results1: List[Dict[str, Any]],
        results2: List[Dict[str, Any]],
        alpha: float = 0.5
    ) -> List[Dict[str, Any]]:
        """Combine two result sets using weighted reciprocal rank fusion."""
        # Map doc_id -> document and fused score
        rank_map = {}

        # Add ranks from the first result set
        for i, result in enumerate(results1):
            doc_id = result['id']
            if doc_id not in rank_map:
                rank_map[doc_id] = {'doc': result, 'score': 0.0}
            rank_map[doc_id]['score'] += alpha / (i + 1)

        # Add ranks from the second result set
        for i, result in enumerate(results2):
            doc_id = result['id']
            if doc_id not in rank_map:
                rank_map[doc_id] = {'doc': result, 'score': 0.0}
            rank_map[doc_id]['score'] += (1 - alpha) / (i + 1)

        # Sort by combined score
        combined = list(rank_map.values())
        combined.sort(key=lambda x: x['score'], reverse=True)
        return [item['doc'] for item in combined]

# Example usage
class MockVectorDB:
    async def search(self, embedding, top_k, filters=None):
        # Mock implementation
        return [
            {'id': f'doc_{i}', 'content': f'Document {i}', 'score': 0.9 - i * 0.1}
            for i in range(top_k)
        ]

    async def keyword_search(self, query, top_k):
        # Mock implementation
        return [
            {'id': f'doc_{i}', 'content': f'Document {i}', 'score': 0.8 - i * 0.1}
            for i in range(top_k)
        ]

# retrieval_engine = RetrievalEngine(MockVectorDB())
# results = await retrieval_engine.hybrid_search("vector databases", [0.1] * 1536)
```
### Generation and Response Synthesis
The final step in the RAG pipeline is generating a coherent response using the retrieved context.
```python
from typing import List, Dict, Any
import openai
from dataclasses import dataclass
@dataclass
class GenerationConfig:
model: str = "gpt-4"
temperature: float = 0.1
max_tokens: int = 1000
system_prompt: str = None
class ResponseGenerator:
def __init__(self, config: GenerationConfig):
self.config = config
self.client = openai.AsyncOpenAI()
async def generate_response(
self,
query: str,
context: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Generate a response using retrieved context"""
# Build the prompt
prompt = self._build_prompt(query, context)
# Generate response
response = await self.client.chat.completions.create(
model=self.config.model,
messages=[
{"role": "system", "content": self.config.system_prompt or self._get_default_system_prompt()},
{"role": "user", "content": prompt}
],
temperature=self.config.temperature,
max_tokens=self.config.max_tokens
)
answer = response.choices[0].message.content
# Extract sources
sources = self._extract_sources(context)
return {
"answer": answer,
"sources": sources,
"context_used": len(context),
"model": self.config.model
}
def _build_prompt(self, query: str, context: List[Dict[str, Any]]) -> str:
"""Build a prompt with retrieved context"""
context_text = self._format_context(context)
prompt = f"""Based on the following context, answer the question. If the context doesn't contain enough information to answer the question accurately, say so.
Context:
{context_text}
Question: {query}
Please provide a comprehensive answer based on the context provided. If you reference specific information, indicate which source it comes from."""
return prompt
def _format_context(self, context: List[Dict[str, Any]]) -> str:
"""Format context for the prompt"""
formatted_parts = []
for i, doc in enumerate(context, 1):
content = doc.get('content', '')
source = doc.get('metadata', {}).get('source', f'Document {i}')
formatted_parts.append(f"Source {i} ({source}):\n{content}\n")
return "\n".join(formatted_parts)
def _extract_sources(self, context: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Extract source information from context"""
sources = []
for doc in context:
source = {
"id": doc.get('id'),
"content": doc.get('content', '')[:200] + "...",
"metadata": doc.get('metadata', {}),
"score": doc.get('score', 0.0)
}
sources.append(source)
return sources
def _get_default_system_prompt(self) -> str:
"""Get default system prompt"""
return """You are a helpful AI assistant that provides accurate, well-sourced answers based on the context provided. Always cite your sources when possible and be transparent about the limitations of the information available."""
async def generate_with_citations(
self,
query: str,
context: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Generate response with inline citations"""
# Add citation instructions to the prompt
citation_prompt = f"""Based on the following context, answer the question with inline citations. Use [Source X] format to cite specific information.
Context:
{self._format_context(context)}
Question: {query}
Answer with citations:"""
response = await self.client.chat.completions.create(
model=self.config.model,
messages=[
{"role": "system", "content": "You are a helpful AI assistant that provides well-cited answers based on the provided context."},
{"role": "user", "content": citation_prompt}
],
temperature=self.config.temperature,
max_tokens=self.config.max_tokens
)
answer = response.choices[0].message.content
return {
"answer": answer,
"sources": self._extract_sources(context),
"has_citations": "[Source" in answer
}
# Example usage
config = GenerationConfig(
model="gpt-4",
temperature=0.1,
max_tokens=500
)
generator = ResponseGenerator(config)
# Mock context
context = [
{
"id": "doc_1",
"content": "Vector databases are specialized databases designed to store and query high-dimensional vector data efficiently.",
"metadata": {"source": "article", "author": "John Doe"},
"score": 0.95
},
{
"id": "doc_2",
"content": "RAG systems combine retrieval with generation to provide more accurate and up-to-date responses.",
"metadata": {"source": "tutorial", "author": "Jane Smith"},
"score": 0.87
}
]
# Generate response
# response = await generator.generate_response("What are vector databases?", context)
# print(response['answer'])
```
## Implementing RAG with pgvector
Now let's build a complete, production-ready RAG system using PostgreSQL with the pgvector extension. This approach gives us the benefits of a mature, ACID-compliant database while adding powerful vector search capabilities.
### Setting Up PostgreSQL with pgvector
First, let's set up our database infrastructure:
```sql
-- Enable the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;
-- Create our documents table
CREATE TABLE documents (
id SERIAL PRIMARY KEY,
content TEXT NOT NULL,
embedding vector(1536),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
-- Create indexes for efficient querying
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
CREATE INDEX ON documents USING gin (metadata);
CREATE INDEX ON documents (created_at);
-- Create a function to update the updated_at timestamp
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ language 'plpgsql';
-- Create trigger to automatically update updated_at
CREATE TRIGGER update_documents_updated_at
BEFORE UPDATE ON documents
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
```
### Python Implementation
Now let’s create a complete Python implementation:
```python
import asyncio
import asyncpg
import openai
import numpy as np
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import json
from datetime import datetime
@dataclass
class Document:
id: Optional[int]
content: str
embedding: Optional[List[float]]
metadata: Dict[str, Any]
@dataclass
class SearchResult:
id: int
content: str
metadata: Dict[str, Any]
similarity: float
class PgVectorRAG:
def __init__(self, db_url: str, openai_api_key: str):
self.db_url = db_url
self.openai_client = openai.AsyncOpenAI(api_key=openai_api_key)
self.embedding_model = "text-embedding-ada-002"
self.llm_model = "gpt-4"
async def connect(self):
"""Establish database connection"""
self.pool = await asyncpg.create_pool(self.db_url)
async def close(self):
"""Close database connection"""
await self.pool.close()
async def add_documents(self, documents: List[Document]) -> List[int]:
"""Add documents to the database with embeddings"""
# Generate embeddings for all documents
for doc in documents:
if doc.embedding is None:
doc.embedding = await self._generate_embedding(doc.content)
# Insert documents into database
async with self.pool.acquire() as conn:
ids = []
for doc in documents:
row = await conn.fetchrow(
"""
INSERT INTO documents (content, embedding, metadata)
VALUES ($1, $2, $3)
RETURNING id
""",
doc.content,
doc.embedding,
json.dumps(doc.metadata)
)
ids.append(row['id'])
return ids
async def search(
self,
query: str,
top_k: int = 5,
similarity_threshold: float = 0.7,
filters: Optional[Dict[str, Any]] = None
) -> List[SearchResult]:
"""Search for similar documents"""
# Generate embedding for query
query_embedding = await self._generate_embedding(query)
# Build the search query
search_query = """
SELECT id, content, metadata,
1 - (embedding <=> $1) as similarity
FROM documents
WHERE 1 - (embedding <=> $1) > $2
"""
params = [query_embedding, similarity_threshold]
# Add filters if provided
if filters:
filter_conditions = []
for key, value in filters.items():
filter_conditions.append(f"metadata->>'{key}' = ${len(params) + 1}")
params.append(str(value))
if filter_conditions:
search_query += " AND " + " AND ".join(filter_conditions)
# Use the next free placeholder for LIMIT so it doesn't collide with filter parameters
search_query += f" ORDER BY embedding <=> $1 LIMIT ${len(params) + 1}"
params.append(top_k)
# Execute search
async with self.pool.acquire() as conn:
rows = await conn.fetch(search_query, *params)
results = []
for row in rows:
results.append(SearchResult(
id=row['id'],
content=row['content'],
metadata=json.loads(row['metadata']) if row['metadata'] else {},
similarity=float(row['similarity'])
))
return results
async def generate_response(
self,
query: str,
search_results: List[SearchResult],
max_tokens: int = 500
) -> Dict[str, Any]:
"""Generate a response using retrieved context"""
if not search_results:
return {
"answer": "I couldn't find any relevant information to answer your question.",
"sources": [],
"confidence": 0.0
}
# Build context from search results
context = self._build_context(search_results)
# Create prompt
prompt = f"""Based on the following context, answer the question. If the context doesn't contain enough information to answer the question accurately, say so.
Context:
{context}
Question: {query}
Answer:"""
# Generate response
response = await self.openai_client.chat.completions.create(
model=self.llm_model,
messages=[
{"role": "system", "content": "You are a helpful AI assistant that provides accurate answers based on the provided context."},
{"role": "user", "content": prompt}
],
max_tokens=max_tokens,
temperature=0.1
)
answer = response.choices[0].message.content
# Calculate confidence based on similarity scores
avg_similarity = np.mean([r.similarity for r in search_results])
return {
"answer": answer,
"sources": [
{
"id": r.id,
"content": r.content[:200] + "...",
"similarity": r.similarity,
"metadata": r.metadata
}
for r in search_results
],
"confidence": avg_similarity,
"context_length": len(context)
}
async def _generate_embedding(self, text: str) -> List[float]:
"""Generate embedding for text using OpenAI"""
response = await self.openai_client.embeddings.create(
model=self.embedding_model,
input=text
)
return response.data[0].embedding
def _build_context(self, search_results: List[SearchResult]) -> str:
"""Build context string from search results"""
context_parts = []
for i, result in enumerate(search_results, 1):
context_parts.append(f"Source {i} (similarity: {result.similarity:.3f}):\n{result.content}\n")
return "\n".join(context_parts)
async def get_document_stats(self) -> Dict[str, Any]:
"""Get statistics about the documents in the database"""
async with self.pool.acquire() as conn:
# Total documents
total_docs = await conn.fetchval("SELECT COUNT(*) FROM documents")
# Documents with embeddings
docs_with_embeddings = await conn.fetchval(
"SELECT COUNT(*) FROM documents WHERE embedding IS NOT NULL"
)
# Average content length
avg_length = await conn.fetchval(
"SELECT AVG(LENGTH(content)) FROM documents"
)
# Most recent document
latest_doc = await conn.fetchval(
"SELECT MAX(created_at) FROM documents"
)
return {
"total_documents": total_docs,
"documents_with_embeddings": docs_with_embeddings,
"average_content_length": avg_length,
"latest_document_date": latest_doc.isoformat() if latest_doc else None
}
# Example usage
async def main():
# Initialize RAG system
rag = PgVectorRAG(
db_url="postgresql://user:password@localhost/ragdb",
openai_api_key="your-openai-api-key"
)
await rag.connect()
# Add some sample documents
documents = [
Document(
id=None,
content="Vector databases are specialized databases designed to store and query high-dimensional vector data efficiently.",
embedding=None,
metadata={"source": "article", "topic": "vector databases", "author": "John Doe"}
),
Document(
id=None,
content="RAG systems combine retrieval with generation to provide more accurate and up-to-date responses than traditional language models.",
embedding=None,
metadata={"source": "tutorial", "topic": "RAG", "author": "Jane Smith"}
),
Document(
id=None,
content="PostgreSQL with pgvector extension provides ACID compliance and powerful vector search capabilities in a single database.",
embedding=None,
metadata={"source": "documentation", "topic": "pgvector", "author": "PostgreSQL Team"}
)
]
# Add documents to database
doc_ids = await rag.add_documents(documents)
print(f"Added {len(doc_ids)} documents with IDs: {doc_ids}")
# Search for relevant documents
query = "What are vector databases and how do they work?"
search_results = await rag.search(query, top_k=3)
print(f"Found {len(search_results)} relevant documents:")
for result in search_results:
print(f"- ID {result.id}: {result.content[:100]}... (similarity: {result.similarity:.3f})")
# Generate response
response = await rag.generate_response(query, search_results)
print(f"\nGenerated Answer:\n{response['answer']}")
print(f"\nConfidence: {response['confidence']:.3f}")
print(f"Sources used: {len(response['sources'])}")
# Get statistics
stats = await rag.get_document_stats()
print(f"\nDatabase Statistics:\n{json.dumps(stats, indent=2)}")
await rag.close()
# Run the example
# asyncio.run(main())
```
### Advanced pgvector Features
Let’s explore some advanced features that make pgvector powerful for production RAG systems:
```sql
-- Create a partitioned table for large-scale deployments
CREATE TABLE documents_partitioned (
id SERIAL,
content TEXT NOT NULL,
embedding vector(1536),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
partition_key DATE DEFAULT CURRENT_DATE
) PARTITION BY RANGE (partition_key);
-- Create partitions for different time periods
CREATE TABLE documents_2025_01 PARTITION OF documents_partitioned
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE documents_2025_02 PARTITION OF documents_partitioned
FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
-- Create indexes on partitions
CREATE INDEX ON documents_2025_01 USING ivfflat (embedding vector_cosine_ops) WITH (lists = 50);
CREATE INDEX ON documents_2025_02 USING ivfflat (embedding vector_cosine_ops) WITH (lists = 50);
-- Create a function for hybrid search (vector + text)
CREATE OR REPLACE FUNCTION hybrid_search(
query_embedding vector(1536),
query_text text,
similarity_threshold float DEFAULT 0.7,
limit_count int DEFAULT 10
)
RETURNS TABLE (
id int,
content text,
metadata jsonb,
similarity float,
text_rank float
) AS $$
BEGIN
RETURN QUERY
SELECT
d.id,
d.content,
d.metadata,
1 - (d.embedding <=> query_embedding) as similarity,
ts_rank(to_tsvector('english', d.content), plainto_tsquery('english', query_text)) as text_rank
FROM documents d
WHERE 1 - (d.embedding <=> query_embedding) > similarity_threshold
OR to_tsvector('english', d.content) @@ plainto_tsquery('english', query_text)
ORDER BY
GREATEST(
1 - (d.embedding <=> query_embedding),
ts_rank(to_tsvector('english', d.content), plainto_tsquery('english', query_text))
) DESC
LIMIT limit_count;
END;
$$ LANGUAGE plpgsql;
-- Create a function for batch similarity search
CREATE OR REPLACE FUNCTION batch_similarity_search(
query_embeddings vector(1536)[],
similarity_threshold float DEFAULT 0.7,
limit_per_query int DEFAULT 5
)
RETURNS TABLE (
query_index int,
doc_id int,
content text,
similarity float
) AS $$
DECLARE
query_embedding vector(1536);
query_idx int := 1;
BEGIN
FOREACH query_embedding IN ARRAY query_embeddings
LOOP
RETURN QUERY
SELECT
query_idx,
d.id,
d.content,
1 - (d.embedding <=> query_embedding) as similarity
FROM documents d
WHERE 1 - (d.embedding <=> query_embedding) > similarity_threshold
ORDER BY d.embedding <=> query_embedding
LIMIT limit_per_query;
query_idx := query_idx + 1;
END LOOP;
END;
$$ LANGUAGE plpgsql;
```
### Performance Optimization
For production deployments, consider these optimization strategies:
```python
class OptimizedPgVectorRAG(PgVectorRAG):
def __init__(self, db_url: str, openai_api_key: str, cache_size: int = 1000):
super().__init__(db_url, openai_api_key)
self.cache_size = cache_size
self.embedding_cache = {}
self.response_cache = {}
async def _generate_embedding_cached(self, text: str) -> List[float]:
"""Generate embedding with caching"""
# Key the cache on the text hash directly; reducing it modulo cache_size would
# let different texts collide and return the wrong embedding
cache_key = hash(text)
if cache_key in self.embedding_cache:
return self.embedding_cache[cache_key]
embedding = await self._generate_embedding(text)
self.embedding_cache[cache_key] = embedding
return embedding
async def search_with_caching(
self,
query: str,
top_k: int = 5,
use_cache: bool = True
) -> List[SearchResult]:
"""Search with response caching"""
if use_cache:
cache_key = f"{hash(query)}_{top_k}"
if cache_key in self.response_cache:
return self.response_cache[cache_key]
results = await self.search(query, top_k)
if use_cache:
self.response_cache[cache_key] = results
return results
async def batch_search(
self,
queries: List[str],
top_k: int = 5
) -> List[List[SearchResult]]:
"""Perform batch search for multiple queries"""
# Generate embeddings for all queries
embeddings = []
for query in queries:
embedding = await self._generate_embedding_cached(query)
embeddings.append(embedding)
# Use batch similarity search
async with self.pool.acquire() as conn:
# Convert embeddings to PostgreSQL array format
embedding_array = f"ARRAY{embeddings}"
rows = await conn.fetch(
f"SELECT * FROM batch_similarity_search({embedding_array}, 0.7, {top_k})"
)
# Group results by query
results_by_query = [[] for _ in queries]
for row in rows:
query_idx = row['query_index'] - 1
if query_idx < len(queries):
results_by_query[query_idx].append(SearchResult(
id=row['doc_id'],
content=row['content'],
metadata={},
similarity=float(row['similarity'])
))
return results_by_query
async def get_performance_metrics(self) -> Dict[str, Any]:
"""Get performance metrics for the RAG system"""
async with self.pool.acquire() as conn:
# Index usage statistics
index_stats = await conn.fetch("""
SELECT schemaname, tablename, indexname, idx_scan, idx_tup_read, idx_tup_fetch
FROM pg_stat_user_indexes
WHERE indexname LIKE '%embedding%'
""")
# Query performance statistics
query_stats = await conn.fetch("""
SELECT query, calls, total_time, mean_time
FROM pg_stat_statements
WHERE query LIKE '%embedding%'
ORDER BY total_time DESC
LIMIT 10
""")
return {
"index_statistics": [dict(row) for row in index_stats],
"query_statistics": [dict(row) for row in query_stats],
"cache_hit_rate": len(self.embedding_cache) / self.cache_size if self.cache_size > 0 else 0
}
```
## Scaling RAG Pipelines
As your RAG system grows, you'll need to address scalability challenges across multiple dimensions: data volume, query throughput, and system complexity.
### Horizontal Scaling with Sharding
For large-scale deployments, consider sharding your vector database across multiple instances:
```python
import hashlib
from typing import List, Dict, Any
import asyncio
class ShardedVectorDB:
def __init__(self, shard_configs: List[Dict[str, str]]):
"""
Initialize sharded vector database
shard_configs: List of database connection strings for each shard
"""
self.shards = []
self.shard_count = len(shard_configs)
# Initialize connections to all shards
for config in shard_configs:
shard = PgVectorRAG(config['db_url'], config['openai_api_key'])
self.shards.append(shard)
async def connect_all(self):
"""Connect to all shards"""
await asyncio.gather(*[shard.connect() for shard in self.shards])
async def close_all(self):
"""Close all shard connections"""
await asyncio.gather(*[shard.close() for shard in self.shards])
def _get_shard_for_document(self, document_id: str) -> int:
"""Determine which shard to use for a document"""
hash_value = int(hashlib.md5(document_id.encode()).hexdigest(), 16)
return hash_value % self.shard_count
async def add_documents(self, documents: List[Document]) -> List[int]:
"""Add documents to appropriate shards"""
# Group documents by shard
shard_docs = [[] for _ in range(self.shard_count)]
for doc in documents:
shard_idx = self._get_shard_for_document(doc.content)
shard_docs[shard_idx].append(doc)
# Add documents to each shard
all_ids = []
for shard_idx, docs in enumerate(shard_docs):
if docs:
ids = await self.shards[shard_idx].add_documents(docs)
all_ids.extend(ids)
return all_ids
async def search(self, query: str, top_k: int = 5) -> List[SearchResult]:
"""Search across all shards and merge results"""
# Search all shards in parallel
search_tasks = [
shard.search(query, top_k=top_k)
for shard in self.shards
]
all_results = await asyncio.gather(*search_tasks)
# Merge and sort results
merged_results = []
for results in all_results:
merged_results.extend(results)
# Sort by similarity and return top_k
merged_results.sort(key=lambda x: x.similarity, reverse=True)
return merged_results[:top_k]
# Example usage
shard_configs = [
{"db_url": "postgresql://user:pass@shard1/ragdb", "openai_api_key": "key1"},
{"db_url": "postgresql://user:pass@shard2/ragdb", "openai_api_key": "key2"},
{"db_url": "postgresql://user:pass@shard3/ragdb", "openai_api_key": "key3"}
]
sharded_db = ShardedVectorDB(shard_configs)
# await sharded_db.connect_all()
```
### Hybrid Search Strategies
Combine vector search with traditional text search for better results:
```python
from rank_bm25 import BM25Okapi
import re
class HybridSearchRAG:
def __init__(self, vector_db: PgVectorRAG):
self.vector_db = vector_db
self.bm25_index = None
self.documents = []
async def build_bm25_index(self, documents: List[Document]):
"""Build BM25 index for keyword search"""
self.documents = documents
# Tokenize documents
tokenized_docs = []
for doc in documents:
tokens = self._tokenize(doc.content)
tokenized_docs.append(tokens)
# Build BM25 index
self.bm25_index = BM25Okapi(tokenized_docs)
def _tokenize(self, text: str) -> List[str]:
"""Simple tokenization"""
return re.findall(r'\w+', text.lower())
async def hybrid_search(
self,
query: str,
top_k: int = 5,
vector_weight: float = 0.7,
keyword_weight: float = 0.3
) -> List[SearchResult]:
"""Perform hybrid search combining vector and keyword search"""
# Vector search
vector_results = await self.vector_db.search(query, top_k=top_k * 2)
# Keyword search
query_tokens = self._tokenize(query)
keyword_scores = self.bm25_index.get_scores(query_tokens)
# Create keyword results
keyword_results = []
for i, score in enumerate(keyword_scores):
if score > 0:
keyword_results.append(SearchResult(
id=self.documents[i].id,
content=self.documents[i].content,
metadata=self.documents[i].metadata,
similarity=score
))
# Sort keyword results by score
keyword_results.sort(key=lambda x: x.similarity, reverse=True)
keyword_results = keyword_results[:top_k * 2]
# Combine results using reciprocal rank fusion
combined = self._reciprocal_rank_fusion(
vector_results,
keyword_results,
vector_weight=vector_weight,
keyword_weight=keyword_weight
)
return combined[:top_k]
def _reciprocal_rank_fusion(
self,
vector_results: List[SearchResult],
keyword_results: List[SearchResult],
vector_weight: float = 0.7,
keyword_weight: float = 0.3
) -> List[SearchResult]:
"""Combine results using weighted reciprocal rank fusion"""
# Create mapping from doc_id to combined score
doc_scores = {}
# Add vector search scores
for i, result in enumerate(vector_results):
doc_id = result.id
if doc_id not in doc_scores:
doc_scores[doc_id] = {
'doc': result,
'score': 0.0,
'vector_rank': i + 1,
'keyword_rank': None
}
doc_scores[doc_id]['score'] += vector_weight / (i + 1)
# Add keyword search scores
for i, result in enumerate(keyword_results):
doc_id = result.id
if doc_id not in doc_scores:
doc_scores[doc_id] = {
'doc': result,
'score': 0.0,
'vector_rank': None,
'keyword_rank': i + 1
}
else:
doc_scores[doc_id]['keyword_rank'] = i + 1
doc_scores[doc_id]['score'] += keyword_weight / (i + 1)
# Sort by combined score
combined = list(doc_scores.values())
combined.sort(key=lambda x: x['score'], reverse=True)
return [item['doc'] for item in combined]
```
### Caching and Performance Optimization
Implement comprehensive caching strategies for production systems:
```python
import redis
import pickle
import time
from functools import wraps
class CachedRAG:
def __init__(self, rag_system: PgVectorRAG, redis_url: str = "redis://localhost:6379"):
self.rag = rag_system
self.redis_client = redis.from_url(redis_url)
self.cache_ttl = 3600 # 1 hour
async def cached_search(self, query: str, top_k: int = 5) -> List[SearchResult]:
"""Search with caching"""
cache_key = f"search:{hash(query)}_{top_k}"
# Try to get from cache
cached_result = self.redis_client.get(cache_key)
if cached_result:
return pickle.loads(cached_result)
# Perform search
results = await self.rag.search(query, top_k)
# Cache results
self.redis_client.setex(
cache_key,
self.cache_ttl,
pickle.dumps(results)
)
return results
async def cached_generate_response(
self,
query: str,
search_results: List[SearchResult]
) -> Dict[str, Any]:
"""Generate response with caching"""
# Create cache key based on query and top result IDs
top_result_ids = [r.id for r in search_results[:3]]
cache_key = f"response:{hash(query)}_{hash(tuple(top_result_ids))}"
# Try to get from cache
cached_result = self.redis_client.get(cache_key)
if cached_result:
return pickle.loads(cached_result)
# Generate response
response = await self.rag.generate_response(query, search_results)
# Cache response
self.redis_client.setex(
cache_key,
self.cache_ttl,
pickle.dumps(response)
)
return response
async def batch_process_with_caching(
self,
queries: List[str],
top_k: int = 5
) -> List[Dict[str, Any]]:
"""Process multiple queries with intelligent caching"""
results = []
for query in queries:
# Check if we have a cached response first
cache_key = f"full_response:{hash(query)}_{top_k}"
cached_response = self.redis_client.get(cache_key)
if cached_response:
results.append(pickle.loads(cached_response))
else:
# Perform full RAG pipeline
search_results = await self.cached_search(query, top_k)
response = await self.cached_generate_response(query, search_results)
# Cache full response
self.redis_client.setex(
cache_key,
self.cache_ttl,
pickle.dumps(response)
)
results.append(response)
return results
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics"""
info = self.redis_client.info()
return {
"total_connections_received": info.get("total_connections_received", 0),
"total_commands_processed": info.get("total_commands_processed", 0),
"keyspace_hits": info.get("keyspace_hits", 0),
"keyspace_misses": info.get("keyspace_misses", 0),
"used_memory_human": info.get("used_memory_human", "0B"),
"hit_rate": info.get("keyspace_hits", 0) / max(info.get("keyspace_hits", 0) + info.get("keyspace_misses", 0), 1)
}
# Example usage
# cached_rag = CachedRAG(rag_system, redis_url="redis://localhost:6379")
# results = await cached_rag.batch_process_with_caching(["What are vector databases?", "How does RAG work?"])
```
### Monitoring and Cost Optimization
Implement comprehensive monitoring for production RAG systems:
```python
import time
import logging
from dataclasses import dataclass
from typing import Dict, Any, List
import asyncio
@dataclass
class RAGMetrics:
query_latency: float
embedding_latency: float
search_latency: float
generation_latency: float
tokens_used: int
cost_estimate: float
cache_hit: bool
result_count: int
class RAGMonitor:
def __init__(self):
self.metrics_history: List[RAGMetrics] = []
self.logger = logging.getLogger(__name__)
async def monitor_rag_call(self, func, *args, **kwargs):
"""Monitor a RAG function call and collect metrics"""
start_time = time.time()
try:
# Track embedding generation time
embedding_start = time.time()
# ... embedding generation code ...
embedding_latency = time.time() - embedding_start
# Track search time
search_start = time.time()
# ... search code ...
search_latency = time.time() - search_start
# Track generation time
generation_start = time.time()
result = await func(*args, **kwargs)
generation_latency = time.time() - generation_start
# Calculate total latency
total_latency = time.time() - start_time
# Estimate costs (example rates)
embedding_cost = 0.0001 # per 1K tokens
generation_cost = 0.03 # per 1K tokens
metrics = RAGMetrics(
query_latency=total_latency,
embedding_latency=embedding_latency,
search_latency=search_latency,
generation_latency=generation_latency,
tokens_used=result.get('tokens_used', 0),
cost_estimate=result.get('cost_estimate', 0.0),
cache_hit=result.get('cache_hit', False),
result_count=len(result.get('sources', []))
)
self.metrics_history.append(metrics)
# Log metrics
self.logger.info(f"RAG Query - Latency: {total_latency:.3f}s, Cost: ${metrics.cost_estimate:.4f}")
return result
except Exception as e:
self.logger.error(f"RAG query failed: {e}")
raise
def get_performance_summary(self) -> Dict[str, Any]:
"""Get performance summary from collected metrics"""
if not self.metrics_history:
return {}
latencies = [m.query_latency for m in self.metrics_history]
costs = [m.cost_estimate for m in self.metrics_history]
cache_hits = [m.cache_hit for m in self.metrics_history]
return {
"total_queries": len(self.metrics_history),
"average_latency": sum(latencies) / len(latencies),
"p95_latency": sorted(latencies)[int(len(latencies) * 0.95)],
"p99_latency": sorted(latencies)[int(len(latencies) * 0.99)],
"total_cost": sum(costs),
"average_cost_per_query": sum(costs) / len(costs),
"cache_hit_rate": sum(cache_hits) / len(cache_hits),
"average_results_per_query": sum(m.result_count for m in self.metrics_history) / len(self.metrics_history)
}
def generate_cost_report(self) -> Dict[str, Any]:
"""Generate detailed cost analysis"""
if not self.metrics_history:
return {}
total_embedding_cost = sum(m.cost_estimate * 0.3 for m in self.metrics_history) # Estimate 30% for embeddings
total_generation_cost = sum(m.cost_estimate * 0.7 for m in self.metrics_history) # Estimate 70% for generation
return {
"total_cost": sum(m.cost_estimate for m in self.metrics_history),
"embedding_cost": total_embedding_cost,
"generation_cost": total_generation_cost,
"cost_per_query": sum(m.cost_estimate for m in self.metrics_history) / len(self.metrics_history),
"estimated_monthly_cost": sum(m.cost_estimate for m in self.metrics_history) * 30, # Assuming daily usage
"cost_breakdown": {
"embeddings": f"{total_embedding_cost / sum(m.cost_estimate for m in self.metrics_history) * 100:.1f}%",
"generation": f"{total_generation_cost / sum(m.cost_estimate for m in self.metrics_history) * 100:.1f}%"
}
}
# Example usage
# monitor = RAGMonitor()
# result = await monitor.monitor_rag_call(rag_system.query, "What are vector databases?")
# performance_summary = monitor.get_performance_summary()
# cost_report = monitor.generate_cost_report()
```
Advanced Use Cases
RAG systems are incredibly versatile and can be applied to a wide range of advanced use cases beyond simple question answering.
Multi-Modal RAG
Extend RAG to handle images, audio, and other data types:
from PIL import Image
import torch
from transformers import CLIPProcessor, CLIPModel
import numpy as np

class MultiModalRAG:
    def __init__(self, vector_db: PgVectorRAG):
        self.vector_db = vector_db
        self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    def encode_image(self, image_path: str) -> List[float]:
        """Encode image to vector representation"""
        image = Image.open(image_path)
        inputs = self.clip_processor(images=image, return_tensors="pt")
        with torch.no_grad():
            image_features = self.clip_model.get_image_features(**inputs)
        return image_features.numpy().flatten().tolist()

    def encode_text(self, text: str) -> List[float]:
        """Encode text to vector representation"""
        inputs = self.clip_processor(text=text, return_tensors="pt", padding=True)
        with torch.no_grad():
            text_features = self.clip_model.get_text_features(**inputs)
        return text_features.numpy().flatten().tolist()
    async def search_by_image(self, image_path: str, top_k: int = 5) -> List[SearchResult]:
        """Search for documents similar to an image"""
        image_embedding = self.encode_image(image_path)
        # Search in vector database
        results = await self.vector_db.search_by_embedding(image_embedding, top_k)
        return results

    async def search_by_text_and_image(
        self,
        text: str,
        image_path: str,
        top_k: int = 5
    ) -> List[SearchResult]:
        """Search using both text and image"""
        text_embedding = self.encode_text(text)
        image_embedding = self.encode_image(image_path)
        # Combine embeddings (simple average)
        combined_embedding = [
            (t + i) / 2 for t, i in zip(text_embedding, image_embedding)
        ]
        results = await self.vector_db.search_by_embedding(combined_embedding, top_k)
        return results
# Example usage
# multimodal_rag = MultiModalRAG(vector_db)
# results = await multimodal_rag.search_by_image("product_image.jpg")
# combined_results = await multimodal_rag.search_by_text_and_image("red shoes", "shoe_image.jpg")
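One caveat with the simple average above: CLIP text and image embeddings can have quite different norms, so one modality can dominate the combined vector. A common refinement, sketched below, is to L2-normalize each embedding before combining them and to expose a weight for how much the text should count relative to the image. Treat it as an optional tweak to the MultiModalRAG class, not something its search methods require.
import math
from typing import List

def combine_embeddings(
    text_embedding: List[float],
    image_embedding: List[float],
    text_weight: float = 0.5,
) -> List[float]:
    """L2-normalize each modality, then take a weighted sum (text_weight in [0, 1])."""
    def normalize(vec: List[float]) -> List[float]:
        norm = math.sqrt(sum(v * v for v in vec)) or 1.0
        return [v / norm for v in vec]

    text_norm = normalize(text_embedding)
    image_norm = normalize(image_embedding)
    return [
        text_weight * t + (1.0 - text_weight) * i
        for t, i in zip(text_norm, image_norm)
    ]

# Example: lean slightly toward the text query
# combined = combine_embeddings(text_emb, image_emb, text_weight=0.6)
search_by_text_and_image could then call combine_embeddings in place of the plain average.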
Domain-Specific Knowledge Bases
Build specialized RAG systems for specific domains:
class MedicalRAG:
    def __init__(self, vector_db: PgVectorRAG):
        self.vector_db = vector_db
        self.medical_terms = self._load_medical_terminology()

    def _load_medical_terminology(self) -> Dict[str, str]:
        """Load medical terminology for query expansion"""
        return {
            "heart attack": "myocardial infarction",
            "high blood pressure": "hypertension",
            "diabetes": "diabetes mellitus",
            # Add more medical term mappings
        }

    def expand_medical_query(self, query: str) -> str:
        """Expand query with medical terminology"""
        expanded_query = query
        for term, medical_term in self.medical_terms.items():
            if term.lower() in query.lower():
                expanded_query += f" OR {medical_term}"
        return expanded_query
    async def medical_search(self, query: str, top_k: int = 5) -> List[SearchResult]:
        """Search with medical query expansion"""
        expanded_query = self.expand_medical_query(query)
        results = await self.vector_db.search(expanded_query, top_k)
        # Filter results for medical relevance
        medical_results = [
            result for result in results
            if self._is_medically_relevant(result.content)
        ]
        return medical_results[:top_k]

    def _is_medically_relevant(self, content: str) -> bool:
        """Check if content is medically relevant"""
        medical_keywords = [
            "diagnosis", "treatment", "symptoms", "patient", "clinical",
            "medical", "health", "disease", "condition", "therapy"
        ]
        content_lower = content.lower()
        return any(keyword in content_lower for keyword in medical_keywords)
class LegalRAG:
    def __init__(self, vector_db: PgVectorRAG):
        self.vector_db = vector_db
        self.jurisdiction = "US"  # Default jurisdiction

    async def legal_search(
        self,
        query: str,
        jurisdiction: str = None,
        case_law_only: bool = False,
        top_k: int = 5
    ) -> List[SearchResult]:
        """Search legal documents with jurisdiction filtering"""
        # Add jurisdiction filter
        filters = {}
        if jurisdiction:
            filters["jurisdiction"] = jurisdiction
        elif self.jurisdiction:
            filters["jurisdiction"] = self.jurisdiction
        if case_law_only:
            filters["document_type"] = "case_law"
        results = await self.vector_db.search(query, top_k, filters=filters)
        return results
    async def cite_check(self, text: str) -> List[Dict[str, Any]]:
        """Check citations in legal text"""
        # Extract potential citations
        citations = self._extract_citations(text)
        # Verify citations against knowledge base
        verified_citations = []
        for citation in citations:
            search_results = await self.vector_db.search(citation, top_k=1)
            if search_results:
                verified_citations.append({
                    "citation": citation,
                    "verified": True,
                    "source": search_results[0].content[:200]
                })
            else:
                verified_citations.append({
                    "citation": citation,
                    "verified": False,
                    "source": None
                })
        return verified_citations
    def _extract_citations(self, text: str) -> List[str]:
        """Extract legal citations from text"""
        import re
        # Common legal citation patterns
        patterns = [
            r'\d+ U\.S\. \d+',            # Supreme Court cases, e.g. 123 U.S. 456
            r'\d+ F\.\d*d? \d+',          # Federal Reporter, e.g. 123 F.3d 456
            r'\d+ S\.Ct\. \d+',           # Supreme Court Reporter, e.g. 123 S.Ct. 456
            r'\d+ L\.Ed\.(?:\d+d)? \d+',  # Lawyers' Edition, e.g. 123 L.Ed.2d 456
        ]
        citations = []
        for pattern in patterns:
            matches = re.findall(pattern, text)
            citations.extend(matches)
        return citations
# Example usage
# medical_rag = MedicalRAG(vector_db)
# medical_results = await medical_rag.medical_search("What are the symptoms of diabetes?")
# legal_rag = LegalRAG(vector_db)
# legal_results = await legal_rag.legal_search("contract breach", jurisdiction="California")
# citations = await legal_rag.cite_check("The court held in Smith v. Jones, 123 U.S. 456...")
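Note that the " OR myocardial infarction" suffix added by expand_medical_query only helps if the underlying search applies keyword semantics; a pure embedding search has no notion of boolean operators. A more robust pattern with semantic search is to run one retrieval per query variant and merge the results, as in the sketch below. It reuses the MedicalRAG class above and assumes SearchResult exposes id and similarity, as elsewhere in this guide.
async def multi_query_medical_search(
    medical_rag: MedicalRAG, query: str, top_k: int = 5
) -> List[SearchResult]:
    """Search each terminology variant separately and merge by best similarity."""
    variants = [query] + [
        medical_term
        for term, medical_term in medical_rag.medical_terms.items()
        if term.lower() in query.lower()
    ]
    best_by_id: Dict[str, SearchResult] = {}
    for variant in variants:
        for result in await medical_rag.vector_db.search(variant, top_k):
            existing = best_by_id.get(result.id)
            if existing is None or result.similarity > existing.similarity:
                best_by_id[result.id] = result
    merged = sorted(best_by_id.values(), key=lambda r: r.similarity, reverse=True)
    return merged[:top_k]

# Example usage
# results = await multi_query_medical_search(medical_rag, "What are the symptoms of a heart attack?")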
Enterprise Search
Build comprehensive enterprise search systems:
class EnterpriseRAG:
    def __init__(self, vector_db: PgVectorRAG):
        self.vector_db = vector_db
        self.departments = ["engineering", "marketing", "sales", "hr", "finance"]
        self.access_control = self._load_access_control()

    def _load_access_control(self) -> Dict[str, List[str]]:
        """Load user access permissions"""
        return {
            "user1": ["engineering", "marketing"],
            "user2": ["sales", "finance"],
            "admin": self.departments
        }
    async def enterprise_search(
        self,
        query: str,
        user_id: str,
        departments: List[str] = None,
        document_types: List[str] = None,
        date_range: tuple = None,
        top_k: int = 10
    ) -> List[SearchResult]:
        """Search with enterprise access controls"""
        # Get user permissions
        user_permissions = self.access_control.get(user_id, [])
        # Build filters
        filters = {}
        if departments:
            # Filter by requested departments (must be in user permissions)
            allowed_departments = [d for d in departments if d in user_permissions]
            if allowed_departments:
                filters["department"] = allowed_departments
            else:
                # None of the requested departments are permitted; fall back to the
                # user's own permissions so the department filter is always applied
                filters["department"] = user_permissions
        else:
            # Use all user permissions
            filters["department"] = user_permissions
        if document_types:
            filters["document_type"] = document_types
        if date_range:
            start_date, end_date = date_range
            filters["date_range"] = f"{start_date} TO {end_date}"
        # Perform search
        results = await self.vector_db.search(query, top_k, filters=filters)
        # Add access control metadata
        for result in results:
            result.metadata["access_level"] = "user"
            result.metadata["departments"] = filters.get("department", [])
        return results
    async def search_across_sources(
        self,
        query: str,
        sources: List[str],
        user_id: str
    ) -> Dict[str, List[SearchResult]]:
        """Search across multiple data sources"""
        results_by_source = {}
        for source in sources:
            # Check if user has access to this source
            if self._user_has_access(user_id, source):
                source_results = await self.vector_db.search(
                    query,
                    top_k=5,
                    filters={"source": source}
                )
                results_by_source[source] = source_results
        return results_by_source

    def _user_has_access(self, user_id: str, source: str) -> bool:
        """Check if user has access to a specific source"""
        user_permissions = self.access_control.get(user_id, [])
        # Admins bypass the per-source check; everyone else needs an explicit grant
        return user_id == "admin" or source in user_permissions
    async def generate_enterprise_report(
        self,
        query: str,
        user_id: str,
        report_type: str = "summary"
    ) -> Dict[str, Any]:
        """Generate enterprise report from search results"""
        # Get search results
        results = await self.enterprise_search(query, user_id, top_k=20)
        if report_type == "summary":
            return self._generate_summary_report(results)
        elif report_type == "detailed":
            return self._generate_detailed_report(results)
        elif report_type == "analytics":
            return self._generate_analytics_report(results)
        else:
            raise ValueError(f"Unknown report type: {report_type}")
    def _generate_summary_report(self, results: List[SearchResult]) -> Dict[str, Any]:
        """Generate summary report"""
        departments = {}
        document_types = {}
        for result in results:
            dept = result.metadata.get("department", "unknown")
            doc_type = result.metadata.get("document_type", "unknown")
            departments[dept] = departments.get(dept, 0) + 1
            document_types[doc_type] = document_types.get(doc_type, 0) + 1
        return {
            "total_results": len(results),
            "departments": departments,
            "document_types": document_types,
            "average_similarity": sum(r.similarity for r in results) / len(results) if results else 0,
            "top_results": [
                {
                    "content": r.content[:200] + "...",
                    "department": r.metadata.get("department"),
                    "similarity": r.similarity
                }
                for r in results[:5]
            ]
        }
    def _generate_detailed_report(self, results: List[SearchResult]) -> Dict[str, Any]:
        """Generate detailed report"""
        return {
            "total_results": len(results),
            "results": [
                {
                    "id": r.id,
                    "content": r.content,
                    "metadata": r.metadata,
                    "similarity": r.similarity
                }
                for r in results
            ]
        }
    def _generate_analytics_report(self, results: List[SearchResult]) -> Dict[str, Any]:
        """Generate analytics report"""
        # Calculate various analytics
        similarities = [r.similarity for r in results]
        return {
            "total_results": len(results),
            "similarity_stats": {
                "mean": sum(similarities) / len(similarities) if similarities else 0,
                "median": sorted(similarities)[len(similarities) // 2] if similarities else 0,
                "min": min(similarities) if similarities else 0,
                "max": max(similarities) if similarities else 0
            },
            "department_distribution": self._get_distribution(results, "department"),
            "document_type_distribution": self._get_distribution(results, "document_type"),
            "date_distribution": self._get_date_distribution(results)
        }
    def _get_distribution(self, results: List[SearchResult], key: str) -> Dict[str, int]:
        """Get distribution of a metadata key"""
        distribution = {}
        for result in results:
            value = result.metadata.get(key, "unknown")
            distribution[value] = distribution.get(value, 0) + 1
        return distribution

    def _get_date_distribution(self, results: List[SearchResult]) -> Dict[str, int]:
        """Get date distribution of results"""
        date_dist = {}
        for result in results:
            date = result.metadata.get("created_date", "unknown")
            if date != "unknown":
                year_month = date[:7]  # YYYY-MM
                date_dist[year_month] = date_dist.get(year_month, 0) + 1
        return date_dist
# Example usage
# enterprise_rag = EnterpriseRAG(vector_db)
# results = await enterprise_rag.enterprise_search(
# "Q4 sales performance",
# user_id="user1",
# departments=["sales", "marketing"]
# )
# report = await enterprise_rag.generate_enterprise_report(
# "Q4 sales performance",
# user_id="user1",
# report_type="analytics"
# )
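Filtering by department after retrieval works, but for defense in depth the permission filter can also be pushed down into the database query itself, so documents a user cannot see never leave Postgres. The sketch below shows roughly what that could look like with pgvector and asyncpg; the table and column names (documents, content, metadata, embedding) are assumptions and would need to match the schema behind your PgVectorRAG setup.
import asyncpg
from typing import List

async def search_with_acl(
    pool: asyncpg.Pool,
    query_embedding: List[float],
    allowed_departments: List[str],
    top_k: int = 10,
):
    """Cosine-distance search restricted to departments the user may access."""
    # pgvector accepts the '[x,y,...]' text form when cast to ::vector
    embedding_literal = "[" + ",".join(str(x) for x in query_embedding) + "]"
    rows = await pool.fetch(
        """
        SELECT id, content, metadata, 1 - (embedding <=> $1::vector) AS similarity
        FROM documents
        WHERE metadata->>'department' = ANY($2::text[])
        ORDER BY embedding <=> $1::vector
        LIMIT $3
        """,
        embedding_literal,
        allowed_departments,
        top_k,
    )
    return rows
The same WHERE-clause approach extends to document types and date ranges, so the post-retrieval checks in EnterpriseRAG become a second line of defense rather than the only one.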
Conclusion
The landscape of AI applications is rapidly evolving, and RAG systems with vector databases are at the forefront of this transformation. As we’ve explored throughout this comprehensive guide, building production-ready RAG pipelines requires careful consideration of multiple components: from choosing the right vector database to implementing efficient retrieval strategies and scaling for enterprise use.
The Future of RAG Systems
The future of RAG systems is incredibly promising, with several exciting developments on the horizon:
AI Agents and Autonomous Systems: RAG systems are evolving into intelligent agents that can not only retrieve and generate information but also take actions based on that information. These agents will be able to interact with multiple data sources, make decisions, and execute tasks autonomously.
Self-Improving Indexes: Future RAG systems will feature self-improving vector indexes that automatically optimize themselves based on user interactions, query patterns, and feedback. This will lead to continuously improving search quality and relevance.
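Even today you can approximate this with a lightweight feedback loop: record which retrieved documents users actually found helpful, and use that signal to boost or demote results at ranking time. The sketch below is one simple way to do it; it is not tied to any particular vector database, and the blending weight is an arbitrary illustrative choice.
class FeedbackReranker:
    """Re-rank search results using accumulated user feedback (thumbs up/down)."""

    def __init__(self, feedback_weight: float = 0.1):
        self.feedback_weight = feedback_weight
        self.scores: Dict[str, int] = {}  # document id -> net helpful votes

    def record_feedback(self, doc_id: str, helpful: bool) -> None:
        self.scores[doc_id] = self.scores.get(doc_id, 0) + (1 if helpful else -1)

    def rerank(self, results: List[SearchResult]) -> List[SearchResult]:
        def adjusted(result: SearchResult) -> float:
            # Blend vector similarity with a bounded feedback boost
            boost = max(-1.0, min(1.0, self.scores.get(result.id, 0) / 10))
            return result.similarity + self.feedback_weight * boost
        return sorted(results, key=adjusted, reverse=True)

# reranker = FeedbackReranker()
# reranker.record_feedback(doc_id="doc_42", helpful=True)
# reranked = reranker.rerank(search_results)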
Multimodal RAG: The integration of text, images, audio, and video in RAG systems will become more sophisticated, enabling truly multimodal AI applications that can understand and reason across different types of content.
Real-Time Learning: RAG systems will increasingly incorporate real-time learning capabilities, allowing them to adapt to new information as it becomes available without requiring complete retraining.
Federated RAG: Organizations will deploy federated RAG systems that can search across multiple distributed knowledge bases while maintaining data privacy and security.
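A basic version of this pattern is already buildable: fan a query out to several independent vector stores (one per business unit, region, or partner), keep the raw documents where they live, and merge only the ranked results. Here is a hedged sketch, assuming each backend exposes the same async search interface used throughout this guide.
import asyncio

async def federated_search(
    backends: Dict[str, PgVectorRAG],
    query: str,
    top_k: int = 5,
) -> List[SearchResult]:
    """Query every knowledge base concurrently and merge by similarity."""
    results_per_backend = await asyncio.gather(
        *(db.search(query, top_k) for db in backends.values())
    )
    merged: List[SearchResult] = []
    for name, results in zip(backends.keys(), results_per_backend):
        for result in results:
            result.metadata["knowledge_base"] = name  # keep provenance for citations
            merged.append(result)
    return sorted(merged, key=lambda r: r.similarity, reverse=True)[:top_k]

# federated = {"engineering_kb": eng_db, "support_kb": support_db}
# results = await federated_search(federated, "deployment checklist")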
Getting Started with RAG
If you’re ready to implement RAG in your organization, start with these steps:
- Assess Your Data: Identify the knowledge sources that would benefit from RAG integration
- Choose Your Vector Database: Select a vector database that fits your scale, budget, and technical requirements
- Start Small: Begin with a focused use case and expand gradually (a minimal starting point is sketched just after this list)
- Measure and Iterate: Continuously monitor performance and user feedback
- Scale Thoughtfully: Plan for growth from the beginning
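To make "start small" concrete, the smallest useful pipeline condenses the patterns covered earlier into a single retrieval call and a single grounded generation. The sketch below assumes the PgVectorRAG search interface from earlier sections and the OpenAI Python SDK (1.x); the model name and prompt wording are placeholders to adjust for your deployment.
from openai import AsyncOpenAI

client = AsyncOpenAI()

async def answer(vector_db: PgVectorRAG, question: str, top_k: int = 3) -> str:
    """Retrieve top_k chunks and generate an answer grounded in them."""
    results = await vector_db.search(question, top_k)
    context = "\n\n".join(r.content for r in results)
    response = await client.chat.completions.create(
        model="gpt-4o-mini",  # placeholder model name
        messages=[
            {"role": "system", "content": "Answer using only the provided context. Say so if the context is insufficient."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
        ],
    )
    return response.choices[0].message.content

# answer_text = await answer(vector_db, "How do we rotate API keys?")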
Key Takeaways
- RAG solves the hallucination problem by grounding AI responses in retrieved facts
- Vector databases are essential for efficient semantic search at scale
- Production RAG requires careful architecture including caching, monitoring, and optimization
- Hybrid search strategies combine the best of vector and keyword search
- Domain-specific RAG systems can provide specialized capabilities for different industries
- Enterprise RAG systems require careful attention to access control and governance
The combination of vector databases and RAG pipelines represents a fundamental shift in how we build AI applications. By providing accurate, up-to-date, and verifiable information, RAG systems enable AI applications that are not just impressive but truly useful and trustworthy.
As the technology continues to mature, organizations that successfully implement RAG systems will find themselves with a significant competitive advantage—the ability to build AI applications that can access and reason about their specific knowledge while maintaining the natural language capabilities that make AI so powerful.
The future of AI is not just about bigger models—it’s about smarter systems that can access the right information at the right time. RAG with vector databases is the foundation upon which this future is being built.