Vector Databases + RAG Pipelines: Building Production-Ready AI Applications

Tags: ai, machine-learning, vector-databases, rag, retrieval-augmented-generation, embeddings, pgvector, pinecone, weaviate, milvus, postgresql, llm, openai, python

Introduction

The AI landscape in 2025 is dominated by Large Language Models (LLMs) that can generate human-like text, but these models have a critical limitation: they can only work with the information they were trained on, and they often “hallucinate” or make up facts when asked about topics outside their training data. This is where Retrieval-Augmented Generation (RAG) comes in as a game-changing solution.

The Hallucination Problem and RAG Solution

Traditional LLMs like GPT-4, Claude, and others are incredibly powerful at generating coherent text, but they suffer from several fundamental limitations:

  • Knowledge Cutoff: They can only access information from their training data, which has a fixed cutoff date
  • Hallucination: They often generate plausible-sounding but incorrect information when they don’t know the answer
  • Lack of Citations: They can’t provide sources or references for their claims
  • Static Knowledge: They can’t access real-time or domain-specific information

RAG solves these problems by combining the generative capabilities of LLMs with the retrieval capabilities of vector databases. Instead of relying solely on the model’s internal knowledge, RAG systems:

  1. Retrieve relevant information from a knowledge base using semantic search
  2. Augment the LLM’s context with this retrieved information
  3. Generate responses that are grounded in the retrieved facts

This approach ensures that AI applications can provide accurate, up-to-date, and verifiable information while maintaining the natural language generation capabilities that make LLMs so powerful.
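
Before diving into the individual components, here is a deliberately tiny, self-contained sketch of that three-step loop. The keyword-overlap "retriever" and template-based "generator" are toy stand-ins of my own (not a real embedding model, vector database, or LLM), but the retrieve → augment → generate structure is exactly what the rest of this guide builds out in earnest.

from typing import List

# Toy knowledge base; in a real system these would be chunked documents
KNOWLEDGE_BASE = [
    "RAG retrieves supporting documents before the model generates an answer.",
    "Vector databases store embeddings so semantically similar text can be found.",
    "LLMs can hallucinate when asked about topics outside their training data.",
]

def retrieve(question: str, top_k: int = 2) -> List[str]:
    """Step 1 (Retrieve): rank documents by naive word overlap with the question."""
    q_words = set(question.lower().split())
    ranked = sorted(
        KNOWLEDGE_BASE,
        key=lambda doc: len(q_words & set(doc.lower().split())),
        reverse=True,
    )
    return ranked[:top_k]

def augment(question: str, docs: List[str]) -> str:
    """Step 2 (Augment): build a prompt that grounds the model in retrieved facts."""
    context = "\n".join(f"- {doc}" for doc in docs)
    return f"Context:\n{context}\n\nQuestion: {question}\nAnswer:"

def generate(prompt: str) -> str:
    """Step 3 (Generate): in production this is an LLM call; here we just show the prompt."""
    return f"(An LLM would answer here, grounded in:)\n{prompt}"

question = "Why do LLMs hallucinate, and how does RAG help?"
print(generate(augment(question, retrieve(question))))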

The Critical Role of Vector Databases

Vector databases are the backbone of modern RAG systems. They store and index high-dimensional vector representations (embeddings) of text, images, and other data types, enabling lightning-fast similarity searches across massive datasets.

Unlike traditional databases, which rely on exact or keyword matching, vector databases can find semantically similar content even when the exact words don’t match. This is crucial for RAG systems because:

  • Semantic Understanding: They can find relevant documents even when the query uses different terminology
  • Scalability: They can handle millions of documents with sub-second query times
  • Flexibility: They support multiple data types (text, images, audio) in the same system
  • Real-time Updates: They can be updated incrementally as new information becomes available

The Production-Ready Challenge

While the concept of RAG is straightforward, building production-ready RAG systems presents significant challenges:

  • Latency Requirements: Users expect responses in seconds, not minutes
  • Accuracy Demands: The retrieved information must be highly relevant to the query
  • Scalability Needs: Systems must handle thousands of concurrent users
  • Cost Optimization: Embedding generation and storage can be expensive
  • Data Freshness: Information must stay current and accurate

In this comprehensive guide, we’ll explore how to build robust, scalable RAG pipelines using modern vector databases and best practices for production deployment.

How Vector Databases Work

To understand RAG systems, we need to dive deep into how vector databases work and why they’re essential for semantic search.

Understanding Embeddings

At the heart of vector databases are embeddings—numerical representations of text, images, or other data that capture semantic meaning. Embeddings are created by neural networks that have been trained to understand the relationships between different pieces of content.

# Example: Generating embeddings with OpenAI (v1+ client API)
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def generate_embedding(text):
    """Generate embedding for a piece of text using OpenAI's API"""
    response = client.embeddings.create(
        model="text-embedding-ada-002",
        input=text
    )
    return response.data[0].embedding

# Example usage
text = "Vector databases enable semantic search across large datasets"
embedding = generate_embedding(text)
print(f"Embedding dimension: {len(embedding)}")  # 1536 dimensions
print(f"Sample values: {embedding[:5]}")  # First 5 values

Embeddings have several key properties:

  • High Dimensionality: Modern embeddings typically have 384-1536 dimensions
  • Semantic Similarity: Similar concepts have similar embedding vectors
  • Mathematical Operations: You can perform vector operations like addition, subtraction, and similarity calculations
  • Language Agnostic: With multilingual embedding models, the same concept expressed in different languages maps to nearby vectors
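
To see the semantic-similarity property from the list above in practice, here is a small sketch using the open-source all-MiniLM-L6-v2 sentence-transformer (384-dimensional embeddings): a paraphrase scores far higher than an unrelated sentence.

# Similar meaning => similar vectors, using a local sentence-transformer model
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

model = SentenceTransformer("all-MiniLM-L6-v2")
sentences = [
    "A cat sat on the mat.",
    "A feline rested on the rug.",     # same meaning, different words
    "The stock market fell sharply."   # unrelated topic
]
vectors = model.encode(sentences)

print(vectors.shape)  # (3, 384)
print(cosine_similarity([vectors[0]], [vectors[1]])[0][0])  # high: paraphrase
print(cosine_similarity([vectors[0]], [vectors[2]])[0][0])  # low: unrelated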

Similarity Search and Distance Metrics

Vector databases use similarity search to find the most relevant documents for a query. This involves calculating the distance between the query embedding and all stored document embeddings.

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def calculate_similarity(query_embedding, document_embeddings):
    """Calculate cosine similarity between query and documents"""
    similarities = cosine_similarity([query_embedding], document_embeddings)[0]
    return similarities

def find_most_similar(query_embedding, document_embeddings, top_k=5):
    """Find the top-k most similar documents"""
    similarities = calculate_similarity(query_embedding, document_embeddings)
    top_indices = np.argsort(similarities)[::-1][:top_k]
    return [(i, similarities[i]) for i in top_indices]

# Example usage
query = "How do vector databases work?"
query_embedding = generate_embedding(query)

documents = [
    "Vector databases store high-dimensional embeddings for semantic search",
    "Machine learning models generate embeddings from text and images",
    "Traditional databases use exact matching while vector databases use similarity",
    "RAG systems combine retrieval with generation for better AI responses"
]

document_embeddings = [generate_embedding(doc) for doc in documents]
similar_docs = find_most_similar(query_embedding, document_embeddings, top_k=3)

for idx, similarity in similar_docs:
    print(f"Document {idx}: {documents[idx]} (similarity: {similarity:.3f})")

Common distance metrics include:

  • Cosine Similarity: Measures the angle between vectors (most common for text)
  • Euclidean Distance: Measures straight-line distance between points
  • Dot Product: Measures the magnitude of projection of one vector onto another
  • Manhattan Distance: Measures distance along grid lines
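
A quick sketch computing each of these metrics on two toy 3-dimensional vectors with NumPy and SciPy; real embeddings have hundreds of dimensions, but the formulas are identical.

# The four metrics above on two toy vectors
import numpy as np
from scipy.spatial.distance import cosine, euclidean, cityblock

a = np.array([1.0, 2.0, 3.0])
b = np.array([2.0, 3.0, 4.0])

print("Cosine similarity:", 1 - cosine(a, b))   # scipy's cosine() is a distance
print("Euclidean distance:", euclidean(a, b))   # straight-line distance
print("Dot product:", float(np.dot(a, b)))      # unnormalized projection
print("Manhattan distance:", cityblock(a, b))   # distance along grid lines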

Approximate Nearest Neighbor (ANN) Indexes

Searching through millions of embeddings using exact similarity calculations would be prohibitively slow. Vector databases use Approximate Nearest Neighbor (ANN) indexes to trade perfect accuracy for dramatic speed improvements.

# Example: Using FAISS for efficient similarity search
import faiss
import numpy as np

class VectorIndex:
    def __init__(self, dimension=1536):
        # Create a FAISS index for cosine similarity
        self.index = faiss.IndexFlatIP(dimension)  # Inner product for cosine similarity
        self.documents = []
    
    def add_documents(self, embeddings, documents):
        """Add documents and their embeddings to the index"""
        # Normalize embeddings for cosine similarity
        embeddings_normalized = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
        self.index.add(embeddings_normalized.astype('float32'))
        self.documents.extend(documents)
    
    def search(self, query_embedding, k=5):
        """Search for the k most similar documents"""
        # Normalize query embedding
        query_normalized = query_embedding / np.linalg.norm(query_embedding)
        query_normalized = query_normalized.reshape(1, -1).astype('float32')
        
        # Search the index
        similarities, indices = self.index.search(query_normalized, k)
        
        results = []
        for i, (similarity, idx) in enumerate(zip(similarities[0], indices[0])):
            if idx < len(self.documents):  # Valid index
                results.append({
                    'document': self.documents[idx],
                    'similarity': float(similarity),
                    'rank': i + 1
                })
        
        return results

# Example usage
index = VectorIndex(dimension=1536)
documents = [
    "Vector databases enable semantic search across large datasets",
    "Machine learning models generate embeddings from text and images",
    "Traditional databases use exact matching while vector databases use similarity",
    "RAG systems combine retrieval with generation for better AI responses"
]

embeddings = np.array([generate_embedding(doc) for doc in documents])
index.add_documents(embeddings, documents)

query = "How do vector databases work?"
query_embedding = generate_embedding(query)
results = index.search(query_embedding, k=3)

for result in results:
    print(f"Rank {result['rank']}: {result['document']} (similarity: {result['similarity']:.3f})")

Vector Database Comparison

Different vector databases offer different trade-offs between performance, scalability, and ease of use:

PostgreSQL with pgvector

Pros: ACID compliance, SQL interface, mature ecosystem
Cons: Limited scalability for very large datasets
Best for: Applications requiring transactional consistency

-- Example: Setting up pgvector
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    embedding vector(1536),
    metadata JSONB
);

-- Create an index for similarity search
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);

-- Insert a document with embedding
INSERT INTO documents (content, embedding, metadata)
VALUES (
    'Vector databases enable semantic search',
    '[0.1, 0.2, 0.3, ...]'::vector,
    '{"source": "article", "date": "2025-08-30"}'
);

-- Search for similar documents
SELECT content, embedding <=> '[0.1, 0.2, 0.3, ...]'::vector as distance
FROM documents
ORDER BY embedding <=> '[0.1, 0.2, 0.3, ...]'::vector
LIMIT 5;

Pinecone

Pros: Managed service, excellent scalability, real-time updates
Cons: Vendor lock-in, cost at scale
Best for: Rapid prototyping and production deployments

from pinecone import Pinecone, ServerlessSpec

# Initialize the Pinecone client (the older pinecone.init() API is deprecated)
pc = Pinecone(api_key="your-api-key")

# Create a serverless index
pc.create_index(
    name="documents",
    dimension=1536,
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1")
)

# Get the index
index = pc.Index("documents")

# Upsert vectors
vectors = [
    ("doc1", [0.1, 0.2, 0.3, ...], {"content": "Vector databases enable semantic search"}),
    ("doc2", [0.2, 0.3, 0.4, ...], {"content": "Machine learning models generate embeddings"})
]
index.upsert(vectors=vectors)

# Query the index
results = index.query(
    vector=[0.1, 0.2, 0.3, ...],
    top_k=5,
    include_metadata=True
)

Weaviate

Pros: GraphQL interface, multi-modal support, schema flexibility
Cons: Steeper learning curve, smaller community
Best for: Complex data relationships and multi-modal applications

import weaviate
from weaviate.classes.init import Auth

# Initialize Weaviate client
client = weaviate.connect_to_wcs(
    cluster_url="your-cluster-url",
    auth_credentials=Auth.api_key("your-api-key")
)

# Create a collection
client.collections.create(
    name="Document",
    properties=[
        weaviate.classes.config.Property(name="content", data_type=weaviate.classes.config.DataType.TEXT),
        weaviate.classes.config.Property(name="source", data_type=weaviate.classes.config.DataType.TEXT)
    ],
    vectorizer_config=weaviate.classes.config.Configure.Vectorizer.text2vec_openai()
)

# Add documents
documents = client.collections.get("Document")
documents.data.insert_many([
    {"content": "Vector databases enable semantic search", "source": "article"},
    {"content": "Machine learning models generate embeddings", "source": "tutorial"}
])

# Search documents
response = documents.query.near_text(
    query="How do vector databases work?",
    limit=5
)

Milvus

Pros: High performance, open source, excellent scalability
Cons: Complex setup, requires infrastructure management
Best for: Large-scale production deployments with custom infrastructure

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility

# Connect to Milvus
connections.connect("default", host="localhost", port="19530")

# Define collection schema
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536)
]
schema = CollectionSchema(fields, "documents")

# Create collection
collection = Collection("documents", schema)

# Create index
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024}
}
collection.create_index("embedding", index_params)

# Insert data: one list per non-auto field, in schema order (content, then embedding)
data = [
    ["Vector databases enable semantic search", "Machine learning models generate embeddings"],
    [[0.1, 0.2, 0.3, ...], [0.2, 0.3, 0.4, ...]]  # embeddings
]
collection.insert(data)

# Load the collection into memory before searching
collection.load()

# Search
search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
results = collection.search(
    data=[[0.1, 0.2, 0.3, ...]],
    anns_field="embedding",
    param=search_params,
    limit=5,
    output_fields=["content"]
)

Architecture of a RAG Pipeline

Building a production-ready RAG system requires careful consideration of each component in the pipeline. Let’s explore the complete architecture and how each piece works together.

High-Level RAG Architecture

A typical RAG pipeline consists of five main components:

  1. Data Ingestion: Collecting and preprocessing documents
  2. Embedding Generation: Converting text into vector representations
  3. Vector Storage: Storing embeddings in a vector database
  4. Retrieval: Finding relevant documents for a query
  5. Generation: Using retrieved context to generate responses

# Complete RAG Pipeline Architecture
import asyncio
from typing import List, Dict, Any
from dataclasses import dataclass

@dataclass
class Document:
    id: str
    content: str
    metadata: Dict[str, Any]
    embedding: List[float] = None

@dataclass
class Query:
    text: str
    embedding: List[float] = None
    filters: Dict[str, Any] = None

@dataclass
class RAGResponse:
    answer: str
    sources: List[Document]
    confidence: float
    metadata: Dict[str, Any]

class RAGPipeline:
    def __init__(self, embedding_model, vector_db, llm):
        self.embedding_model = embedding_model
        self.vector_db = vector_db
        self.llm = llm
    
    async def ingest_documents(self, documents: List[Document]) -> None:
        """Ingest documents into the RAG system"""
        # Generate embeddings for all documents
        for doc in documents:
            doc.embedding = await self.embedding_model.embed(doc.content)
        
        # Store in vector database
        await self.vector_db.add_documents(documents)
    
    async def query(self, query: Query) -> RAGResponse:
        """Process a query through the RAG pipeline"""
        # Generate embedding for query
        query.embedding = await self.embedding_model.embed(query.text)
        
        # Retrieve relevant documents
        retrieved_docs = await self.vector_db.search(
            query.embedding, 
            top_k=5,
            filters=query.filters
        )
        
        # Construct context from retrieved documents
        context = self._build_context(retrieved_docs)
        
        # Generate response using LLM
        prompt = self._create_prompt(query.text, context)
        answer = await self.llm.generate(prompt)
        
        return RAGResponse(
            answer=answer,
            sources=retrieved_docs,
            confidence=self._calculate_confidence(retrieved_docs),
            metadata={"context_length": len(context)}
        )
    
    def _build_context(self, documents: List[Document]) -> str:
        """Build context string from retrieved documents"""
        context_parts = []
        for i, doc in enumerate(documents, 1):
            context_parts.append(f"Document {i}:\n{doc.content}\n")
        return "\n".join(context_parts)
    
    def _create_prompt(self, query: str, context: str) -> str:
        """Create prompt for LLM with retrieved context"""
        return f"""Based on the following context, answer the question. If the context doesn't contain enough information to answer the question, say so.

Context:
{context}

Question: {query}

Answer:"""
    
    def _calculate_confidence(self, documents: List[Document]) -> float:
        """Calculate confidence score based on retrieved documents"""
        if not documents:
            return 0.0
        # Simple confidence based on number and relevance of retrieved documents
        return min(1.0, len(documents) / 3.0)

Data Ingestion and Preprocessing

The quality of your RAG system depends heavily on the quality of your ingested data. Proper preprocessing is crucial for good retrieval performance.

import re
from typing import List, Dict, Any
from dataclasses import dataclass
import hashlib

@dataclass
class ProcessedDocument:
    id: str
    content: str
    metadata: Dict[str, Any]
    chunks: List[str]

class DocumentProcessor:
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    
    def process_document(self, raw_content: str, metadata: Dict[str, Any] = None) -> ProcessedDocument:
        """Process a raw document into chunks suitable for embedding"""
        # Clean the content
        cleaned_content = self._clean_text(raw_content)
        
        # Generate document ID
        doc_id = self._generate_id(cleaned_content, metadata)
        
        # Split into chunks
        chunks = self._chunk_text(cleaned_content)
        
        return ProcessedDocument(
            id=doc_id,
            content=cleaned_content,
            metadata=metadata or {},
            chunks=chunks
        )
    
    def _clean_text(self, text: str) -> str:
        """Clean and normalize text"""
        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text)
        
        # Remove special characters but keep punctuation
        text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)]', '', text)
        
        # Drop non-ASCII characters (crude unicode normalization)
        text = text.encode('ascii', 'ignore').decode('ascii')
        
        return text.strip()
    
    def _generate_id(self, content: str, metadata: Dict[str, Any]) -> str:
        """Generate a unique ID for the document"""
        # Use metadata if available, otherwise hash the content
        if metadata and 'id' in metadata:
            return str(metadata['id'])
        
        # Create hash from content
        content_hash = hashlib.md5(content.encode()).hexdigest()
        return f"doc_{content_hash[:8]}"
    
    def _chunk_text(self, text: str) -> List[str]:
        """Split text into overlapping chunks"""
        chunks = []
        start = 0
        
        while start < len(text):
            # Find the end of the chunk
            end = start + self.chunk_size
            
            if end >= len(text):
                # Last chunk
                chunks.append(text[start:].strip())
                break
            
            # Try to break at a sentence boundary
            last_period = text.rfind('.', start, end)
            last_exclamation = text.rfind('!', start, end)
            last_question = text.rfind('?', start, end)
            
            # Find the latest sentence boundary
            sentence_end = max(last_period, last_exclamation, last_question)
            
            if sentence_end > start:
                end = sentence_end + 1
            
            chunks.append(text[start:end].strip())
            
            # Move the start position for the next chunk, keeping an overlap
            # but always making forward progress to avoid an infinite loop
            start = max(end - self.chunk_overlap, start + 1)
        
        return chunks

# Example usage
processor = DocumentProcessor(chunk_size=1000, chunk_overlap=200)

raw_document = """
Vector databases are specialized databases designed to store and query high-dimensional vector data efficiently. 
They are particularly useful for machine learning applications, especially those involving similarity search and 
semantic understanding. Unlike traditional relational databases that store structured data in tables, vector 
databases store embeddings - numerical representations of data that capture semantic meaning.

The key advantage of vector databases is their ability to perform similarity searches quickly. When you have 
millions of documents, finding the most similar ones to a given query becomes computationally expensive with 
traditional methods. Vector databases use specialized indexing techniques like Approximate Nearest Neighbor (ANN) 
algorithms to make these searches fast and scalable.

Common use cases for vector databases include:
1. Semantic search engines
2. Recommendation systems
3. Image and video similarity search
4. Natural language processing applications
5. Anomaly detection systems
"""

processed_doc = processor.process_document(
    raw_document,
    metadata={"source": "article", "topic": "vector databases", "date": "2025-08-30"}
)

print(f"Document ID: {processed_doc.id}")
print(f"Number of chunks: {len(processed_doc.chunks)}")
for i, chunk in enumerate(processed_doc.chunks):
    print(f"Chunk {i+1}: {chunk[:100]}...")

Embedding Generation Strategies

Choosing the right embedding model and generation strategy is critical for RAG performance.

import asyncio
from typing import List, Dict, Any
import openai
from sentence_transformers import SentenceTransformer
import numpy as np

class EmbeddingGenerator:
    def __init__(self, model_name: str = "text-embedding-ada-002", batch_size: int = 100):
        self.model_name = model_name
        self.batch_size = batch_size
        
        # Initialize the embedding model
        if model_name.startswith("text-embedding-"):
            # OpenAI embedding model
            self.model_type = "openai"
            self.client = openai.AsyncOpenAI()
        else:
            # Local sentence transformer model
            self.model_type = "local"
            self.model = SentenceTransformer(model_name)
    
    async def embed_texts(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a list of texts"""
        if self.model_type == "openai":
            return await self._embed_with_openai(texts)
        else:
            return self._embed_with_local_model(texts)
    
    async def _embed_with_openai(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings using OpenAI API"""
        embeddings = []
        
        # Process in batches to avoid rate limits
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i:i + self.batch_size]
            
            try:
                response = await self.client.embeddings.create(
                    model=self.model_name,
                    input=batch
                )
                
                batch_embeddings = [data.embedding for data in response.data]
                embeddings.extend(batch_embeddings)
                
                # Add small delay to respect rate limits
                await asyncio.sleep(0.1)
                
            except Exception as e:
                print(f"Error generating embeddings for batch {i}: {e}")
                # Return zero vectors for failed batches
                embeddings.extend([[0.0] * 1536] * len(batch))
        
        return embeddings
    
    def _embed_with_local_model(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings using local sentence transformer model"""
        embeddings = self.model.encode(texts, convert_to_tensor=False)
        return embeddings.tolist()
    
    def embed_single(self, text: str) -> List[float]:
        """Generate embedding for a single text"""
        if self.model_type == "openai":
            # Use a synchronous client for one-off calls
            sync_client = openai.OpenAI()
            response = sync_client.embeddings.create(
                model=self.model_name,
                input=text
            )
            return response.data[0].embedding
        else:
            embedding = self.model.encode([text], convert_to_tensor=False)
            return embedding[0].tolist()

# Example usage with different embedding strategies
async def demonstrate_embedding_strategies():
    # Strategy 1: OpenAI embeddings (high quality, paid)
    openai_generator = EmbeddingGenerator("text-embedding-ada-002")
    
    # Strategy 2: Local embeddings (free, good quality)
    local_generator = EmbeddingGenerator("all-MiniLM-L6-v2")
    
    texts = [
        "Vector databases enable semantic search",
        "Machine learning models generate embeddings",
        "RAG systems combine retrieval with generation"
    ]
    
    # Generate embeddings
    openai_embeddings = await openai_generator.embed_texts(texts)
    local_embeddings = await local_generator.embed_texts(texts)  # embed_texts is async
    
    print(f"OpenAI embedding dimension: {len(openai_embeddings[0])}")
    print(f"Local embedding dimension: {len(local_embeddings[0])}")
    
    # Compare similarity between different embedding types
    from sklearn.metrics.pairwise import cosine_similarity
    
    # Calculate similarity between first and second text
    openai_sim = cosine_similarity([openai_embeddings[0]], [openai_embeddings[1]])[0][0]
    local_sim = cosine_similarity([local_embeddings[0]], [local_embeddings[1]])[0][0]
    
    print(f"OpenAI similarity: {openai_sim:.3f}")
    print(f"Local similarity: {local_sim:.3f}")

# Run the demonstration
# asyncio.run(demonstrate_embedding_strategies())

Retrieval Strategies

The retrieval component is where the magic happens. Different retrieval strategies can significantly impact RAG performance.

from typing import List, Dict, Any, Optional
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class RetrievalEngine:
    def __init__(self, vector_db, reranker=None):
        self.vector_db = vector_db
        self.reranker = reranker
    
    async def retrieve(
        self, 
        query_text: str,
        query_embedding: List[float], 
        top_k: int = 10,
        filters: Optional[Dict[str, Any]] = None,
        use_reranker: bool = False
    ) -> List[Dict[str, Any]]:
        """Retrieve relevant documents for a query"""
        
        # Initial retrieval with larger k if using reranker
        initial_k = top_k * 3 if use_reranker else top_k
        
        # Get initial candidates
        candidates = await self.vector_db.search(
            query_embedding,
            top_k=initial_k,
            filters=filters
        )
        
        if use_reranker and self.reranker:
            # Rerank candidates using a more sophisticated model
            reranked = await self._rerank_candidates(query_text, candidates)
            return reranked[:top_k]
        
        return candidates[:top_k]
    
    async def _rerank_candidates(
        self, 
        query_text: str, 
        candidates: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Rerank candidates using a cross-encoder model"""
        if not self.reranker:
            return candidates
        
        # Prepare (query, document) pairs for the cross-encoder
        pairs = []
        for candidate in candidates:
            pairs.append((query_text, candidate['content']))
        
        # Get reranker scores
        scores = await self.reranker.score(pairs)
        
        # Sort candidates by reranker scores
        scored_candidates = list(zip(candidates, scores))
        scored_candidates.sort(key=lambda x: x[1], reverse=True)
        
        # Return reranked candidates
        return [candidate for candidate, score in scored_candidates]
    
    async def hybrid_search(
        self,
        query: str,
        query_embedding: List[float],
        top_k: int = 10,
        alpha: float = 0.5
    ) -> List[Dict[str, Any]]:
        """Perform hybrid search combining vector and keyword search"""
        
        # Vector search
        vector_results = await self.vector_db.search(
            query_embedding,
            top_k=top_k
        )
        
        # Keyword search (BM25 or similar)
        keyword_results = await self.vector_db.keyword_search(
            query,
            top_k=top_k
        )
        
        # Combine results using reciprocal rank fusion
        combined = self._reciprocal_rank_fusion(
            vector_results, 
            keyword_results, 
            alpha=alpha
        )
        
        return combined[:top_k]
    
    def _reciprocal_rank_fusion(
        self, 
        results1: List[Dict[str, Any]], 
        results2: List[Dict[str, Any]], 
        alpha: float = 0.5
    ) -> List[Dict[str, Any]]:
        """Combine two result sets using reciprocal rank fusion"""
        
        # Create mapping from doc_id to rank
        rank_map = {}
        
        # Add ranks from first result set
        for i, result in enumerate(results1):
            doc_id = result['id']
            if doc_id not in rank_map:
                rank_map[doc_id] = {'doc': result, 'score': 0.0}
            rank_map[doc_id]['score'] += alpha / (i + 1)
        
        # Add ranks from second result set
        for i, result in enumerate(results2):
            doc_id = result['id']
            if doc_id not in rank_map:
                rank_map[doc_id] = {'doc': result, 'score': 0.0}
            rank_map[doc_id]['score'] += (1 - alpha) / (i + 1)
        
        # Sort by combined score
        combined = list(rank_map.values())
        combined.sort(key=lambda x: x['score'], reverse=True)
        
        return [item['doc'] for item in combined]

# Example usage
class MockVectorDB:
    async def search(self, embedding, top_k, filters=None):
        # Mock implementation
        return [
            {'id': f'doc_{i}', 'content': f'Document {i}', 'score': 0.9 - i*0.1}
            for i in range(top_k)
        ]
    
    async def keyword_search(self, query, top_k):
        # Mock implementation
        return [
            {'id': f'doc_{i}', 'content': f'Document {i}', 'score': 0.8 - i*0.1}
            for i in range(top_k)
        ]

# retrieval_engine = RetrievalEngine(MockVectorDB())
# results = await retrieval_engine.hybrid_search("vector databases", [0.1]*1536)

Generation and Response Synthesis

The final step in the RAG pipeline is generating a coherent response using the retrieved context.

from typing import List, Dict, Any
import openai
from dataclasses import dataclass

@dataclass
class GenerationConfig:
    model: str = "gpt-4"
    temperature: float = 0.1
    max_tokens: int = 1000
    system_prompt: str = None

class ResponseGenerator:
    def __init__(self, config: GenerationConfig):
        self.config = config
        self.client = openai.AsyncOpenAI()
    
    async def generate_response(
        self, 
        query: str, 
        context: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Generate a response using retrieved context"""
        
        # Build the prompt
        prompt = self._build_prompt(query, context)
        
        # Generate response
        response = await self.client.chat.completions.create(
            model=self.config.model,
            messages=[
                {"role": "system", "content": self.config.system_prompt or self._get_default_system_prompt()},
                {"role": "user", "content": prompt}
            ],
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens
        )
        
        answer = response.choices[0].message.content
        
        # Extract sources
        sources = self._extract_sources(context)
        
        return {
            "answer": answer,
            "sources": sources,
            "context_used": len(context),
            "model": self.config.model
        }
    
    def _build_prompt(self, query: str, context: List[Dict[str, Any]]) -> str:
        """Build a prompt with retrieved context"""
        context_text = self._format_context(context)
        
        prompt = f"""Based on the following context, answer the question. If the context doesn't contain enough information to answer the question accurately, say so.

Context:
{context_text}

Question: {query}

Please provide a comprehensive answer based on the context provided. If you reference specific information, indicate which source it comes from."""

        return prompt
    
    def _format_context(self, context: List[Dict[str, Any]]) -> str:
        """Format context for the prompt"""
        formatted_parts = []
        
        for i, doc in enumerate(context, 1):
            content = doc.get('content', '')
            source = doc.get('metadata', {}).get('source', f'Document {i}')
            
            formatted_parts.append(f"Source {i} ({source}):\n{content}\n")
        
        return "\n".join(formatted_parts)
    
    def _extract_sources(self, context: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Extract source information from context"""
        sources = []
        
        for doc in context:
            source = {
                "id": doc.get('id'),
                "content": doc.get('content', '')[:200] + "...",
                "metadata": doc.get('metadata', {}),
                "score": doc.get('score', 0.0)
            }
            sources.append(source)
        
        return sources
    
    def _get_default_system_prompt(self) -> str:
        """Get default system prompt"""
        return """You are a helpful AI assistant that provides accurate, well-sourced answers based on the context provided. Always cite your sources when possible and be transparent about the limitations of the information available."""
    
    async def generate_with_citations(
        self, 
        query: str, 
        context: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Generate response with inline citations"""
        
        # Add citation instructions to the prompt
        citation_prompt = f"""Based on the following context, answer the question with inline citations. Use [Source X] format to cite specific information.

Context:
{self._format_context(context)}

Question: {query}

Answer with citations:"""
        
        response = await self.client.chat.completions.create(
            model=self.config.model,
            messages=[
                {"role": "system", "content": "You are a helpful AI assistant that provides well-cited answers based on the provided context."},
                {"role": "user", "content": citation_prompt}
            ],
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens
        )
        
        answer = response.choices[0].message.content
        
        return {
            "answer": answer,
            "sources": self._extract_sources(context),
            "has_citations": "[Source" in answer
        }

# Example usage
config = GenerationConfig(
    model="gpt-4",
    temperature=0.1,
    max_tokens=500
)

generator = ResponseGenerator(config)

# Mock context
context = [
    {
        "id": "doc_1",
        "content": "Vector databases are specialized databases designed to store and query high-dimensional vector data efficiently.",
        "metadata": {"source": "article", "author": "John Doe"},
        "score": 0.95
    },
    {
        "id": "doc_2", 
        "content": "RAG systems combine retrieval with generation to provide more accurate and up-to-date responses.",
        "metadata": {"source": "tutorial", "author": "Jane Smith"},
        "score": 0.87
    }
]

# Generate response
# response = await generator.generate_response("What are vector databases?", context)
# print(response['answer'])

Implementing RAG with pgvector

Now let's build a complete, production-ready RAG system using PostgreSQL with the pgvector extension. This approach gives us the benefits of a mature, ACID-compliant database while adding powerful vector search capabilities.

Setting Up PostgreSQL with pgvector

First, let's set up our database infrastructure:

-- Enable the pgvector extension
CREATE EXTENSION IF NOT EXISTS vector;

-- Create our documents table
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    content TEXT NOT NULL,
    embedding vector(1536),
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Create indexes for efficient querying
CREATE INDEX ON documents USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
CREATE INDEX ON documents USING gin (metadata);
CREATE INDEX ON documents (created_at);

-- Create a function to update the updated_at timestamp
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ language 'plpgsql';

-- Create trigger to automatically update updated_at
CREATE TRIGGER update_documents_updated_at 
    BEFORE UPDATE ON documents 
    FOR EACH ROW 
    EXECUTE FUNCTION update_updated_at_column();

Python Implementation

Now let’s create a complete Python implementation:

import asyncio
import asyncpg
import openai
import numpy as np
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import json
from datetime import datetime
from pgvector.asyncpg import register_vector  # from the `pgvector` package; lets asyncpg encode/decode the vector type

@dataclass
class Document:
    id: Optional[int]
    content: str
    embedding: Optional[List[float]]
    metadata: Dict[str, Any]

@dataclass
class SearchResult:
    id: int
    content: str
    metadata: Dict[str, Any]
    similarity: float

class PgVectorRAG:
    def __init__(self, db_url: str, openai_api_key: str):
        self.db_url = db_url
        self.openai_client = openai.AsyncOpenAI(api_key=openai_api_key)
        self.embedding_model = "text-embedding-ada-002"
        self.llm_model = "gpt-4"
    
    async def connect(self):
        """Establish database connection"""
        # Register the pgvector codec on every new connection so Python lists
        # can be passed directly as vector parameters
        self.pool = await asyncpg.create_pool(self.db_url, init=register_vector)
    
    async def close(self):
        """Close database connection"""
        await self.pool.close()
    
    async def add_documents(self, documents: List[Document]) -> List[int]:
        """Add documents to the database with embeddings"""
        # Generate embeddings for all documents
        for doc in documents:
            if doc.embedding is None:
                doc.embedding = await self._generate_embedding(doc.content)
        
        # Insert documents into database
        async with self.pool.acquire() as conn:
            ids = []
            for doc in documents:
                row = await conn.fetchrow(
                    """
                    INSERT INTO documents (content, embedding, metadata)
                    VALUES ($1, $2, $3)
                    RETURNING id
                    """,
                    doc.content,
                    doc.embedding,
                    json.dumps(doc.metadata)
                )
                ids.append(row['id'])
            
            return ids
    
    async def search(
        self, 
        query: str, 
        top_k: int = 5,
        similarity_threshold: float = 0.7,
        filters: Optional[Dict[str, Any]] = None
    ) -> List[SearchResult]:
        """Search for similar documents"""
        # Generate embedding for query
        query_embedding = await self._generate_embedding(query)
        
        # Build the search query
        search_query = """
        SELECT id, content, metadata, 
               1 - (embedding <=> $1) as similarity
        FROM documents
        WHERE 1 - (embedding <=> $1) > $2
        """
        
        params = [query_embedding, similarity_threshold]
        
        # Add filters if provided
        if filters:
            filter_conditions = []
            for key, value in filters.items():
                filter_conditions.append(f"metadata->>'{key}' = ${len(params) + 1}")
                params.append(str(value))
            
            if filter_conditions:
                search_query += " AND " + " AND ".join(filter_conditions)
        
        search_query += f" ORDER BY embedding <=> $1 LIMIT ${len(params) + 1}"
        params.append(top_k)
        
        # Execute search
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(search_query, *params)
            
            results = []
            for row in rows:
                results.append(SearchResult(
                    id=row['id'],
                    content=row['content'],
                    metadata=json.loads(row['metadata']) if row['metadata'] else {},
                    similarity=float(row['similarity'])
                ))
            
            return results
    
    async def generate_response(
        self, 
        query: str, 
        search_results: List[SearchResult],
        max_tokens: int = 500
    ) -> Dict[str, Any]:
        """Generate a response using retrieved context"""
        if not search_results:
            return {
                "answer": "I couldn't find any relevant information to answer your question.",
                "sources": [],
                "confidence": 0.0
            }
        
        # Build context from search results
        context = self._build_context(search_results)
        
        # Create prompt
        prompt = f"""Based on the following context, answer the question. If the context doesn't contain enough information to answer the question accurately, say so.

Context:
{context}

Question: {query}

Answer:"""
        
        # Generate response
        response = await self.openai_client.chat.completions.create(
            model=self.llm_model,
            messages=[
                {"role": "system", "content": "You are a helpful AI assistant that provides accurate answers based on the provided context."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=max_tokens,
            temperature=0.1
        )
        
        answer = response.choices[0].message.content
        
        # Calculate confidence based on similarity scores
        avg_similarity = np.mean([r.similarity for r in search_results])
        
        return {
            "answer": answer,
            "sources": [
                {
                    "id": r.id,
                    "content": r.content[:200] + "...",
                    "similarity": r.similarity,
                    "metadata": r.metadata
                }
                for r in search_results
            ],
            "confidence": avg_similarity,
            "context_length": len(context)
        }
    
    async def _generate_embedding(self, text: str) -> List[float]:
        """Generate embedding for text using OpenAI"""
        response = await self.openai_client.embeddings.create(
            model=self.embedding_model,
            input=text
        )
        return response.data[0].embedding
    
    def _build_context(self, search_results: List[SearchResult]) -> str:
        """Build context string from search results"""
        context_parts = []
        for i, result in enumerate(search_results, 1):
            context_parts.append(f"Source {i} (similarity: {result.similarity:.3f}):\n{result.content}\n")
        return "\n".join(context_parts)
    
    async def get_document_stats(self) -> Dict[str, Any]:
        """Get statistics about the documents in the database"""
        async with self.pool.acquire() as conn:
            # Total documents
            total_docs = await conn.fetchval("SELECT COUNT(*) FROM documents")
            
            # Documents with embeddings
            docs_with_embeddings = await conn.fetchval(
                "SELECT COUNT(*) FROM documents WHERE embedding IS NOT NULL"
            )
            
            # Average content length
            avg_length = await conn.fetchval(
                "SELECT AVG(LENGTH(content)) FROM documents"
            )
            
            # Most recent document
            latest_doc = await conn.fetchval(
                "SELECT MAX(created_at) FROM documents"
            )
            
            return {
                "total_documents": total_docs,
                "documents_with_embeddings": docs_with_embeddings,
                "average_content_length": avg_length,
                "latest_document_date": latest_doc.isoformat() if latest_doc else None
            }

# Example usage
async def main():
    # Initialize RAG system
    rag = PgVectorRAG(
        db_url="postgresql://user:password@localhost/ragdb",
        openai_api_key="your-openai-api-key"
    )
    
    await rag.connect()
    
    # Add some sample documents
    documents = [
        Document(
            id=None,
            content="Vector databases are specialized databases designed to store and query high-dimensional vector data efficiently.",
            embedding=None,
            metadata={"source": "article", "topic": "vector databases", "author": "John Doe"}
        ),
        Document(
            id=None,
            content="RAG systems combine retrieval with generation to provide more accurate and up-to-date responses than traditional language models.",
            embedding=None,
            metadata={"source": "tutorial", "topic": "RAG", "author": "Jane Smith"}
        ),
        Document(
            id=None,
            content="PostgreSQL with pgvector extension provides ACID compliance and powerful vector search capabilities in a single database.",
            embedding=None,
            metadata={"source": "documentation", "topic": "pgvector", "author": "PostgreSQL Team"}
        )
    ]
    
    # Add documents to database
    doc_ids = await rag.add_documents(documents)
    print(f"Added {len(doc_ids)} documents with IDs: {doc_ids}")
    
    # Search for relevant documents
    query = "What are vector databases and how do they work?"
    search_results = await rag.search(query, top_k=3)
    
    print(f"Found {len(search_results)} relevant documents:")
    for result in search_results:
        print(f"- ID {result.id}: {result.content[:100]}... (similarity: {result.similarity:.3f})")
    
    # Generate response
    response = await rag.generate_response(query, search_results)
    
    print(f"\nGenerated Answer:\n{response['answer']}")
    print(f"\nConfidence: {response['confidence']:.3f}")
    print(f"Sources used: {len(response['sources'])}")
    
    # Get statistics
    stats = await rag.get_document_stats()
    print(f"\nDatabase Statistics:\n{json.dumps(stats, indent=2)}")
    
    await rag.close()

# Run the example
# asyncio.run(main())

Advanced pgvector Features

Let’s explore some advanced features that make pgvector powerful for production RAG systems:

-- Create a partitioned table for large-scale deployments
CREATE TABLE documents_partitioned (
    id SERIAL,
    content TEXT NOT NULL,
    embedding vector(1536),
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    partition_key DATE DEFAULT CURRENT_DATE
) PARTITION BY RANGE (partition_key);

-- Create partitions for different time periods
CREATE TABLE documents_2025_01 PARTITION OF documents_partitioned
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE documents_2025_02 PARTITION OF documents_partitioned
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- Create indexes on partitions
CREATE INDEX ON documents_2025_01 USING ivfflat (embedding vector_cosine_ops) WITH (lists = 50);
CREATE INDEX ON documents_2025_02 USING ivfflat (embedding vector_cosine_ops) WITH (lists = 50);

-- Create a function for hybrid search (vector + text)
CREATE OR REPLACE FUNCTION hybrid_search(
    query_embedding vector(1536),
    query_text text,
    similarity_threshold float DEFAULT 0.7,
    limit_count int DEFAULT 10
)
RETURNS TABLE (
    id int,
    content text,
    metadata jsonb,
    similarity float,
    text_rank float
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        d.id,
        d.content,
        d.metadata,
        1 - (d.embedding <=> query_embedding) as similarity,
        ts_rank(to_tsvector('english', d.content), plainto_tsquery('english', query_text)) as text_rank
    FROM documents d
    WHERE 1 - (d.embedding <=> query_embedding) > similarity_threshold
       OR to_tsvector('english', d.content) @@ plainto_tsquery('english', query_text)
    ORDER BY 
        GREATEST(
            1 - (d.embedding <=> query_embedding),
            ts_rank(to_tsvector('english', d.content), plainto_tsquery('english', query_text))
        ) DESC
    LIMIT limit_count;
END;
$$ LANGUAGE plpgsql;

-- Create a function for batch similarity search
CREATE OR REPLACE FUNCTION batch_similarity_search(
    query_embeddings vector(1536)[],
    similarity_threshold float DEFAULT 0.7,
    limit_per_query int DEFAULT 5
)
RETURNS TABLE (
    query_index int,
    doc_id int,
    content text,
    similarity float
) AS $$
DECLARE
    query_embedding vector(1536);
    query_idx int := 1;
BEGIN
    FOREACH query_embedding IN ARRAY query_embeddings
    LOOP
        RETURN QUERY
        SELECT 
            query_idx,
            d.id,
            d.content,
            1 - (d.embedding <=> query_embedding) as similarity
        FROM documents d
        WHERE 1 - (d.embedding <=> query_embedding) > similarity_threshold
        ORDER BY d.embedding <=> query_embedding
        LIMIT limit_per_query;
        
        query_idx := query_idx + 1;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
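
To call the hybrid_search function from Python, a small subclass of PgVectorRAG along the following lines could be used. This is a sketch: the HybridPgVectorRAG name and hybrid_search_sql method are illustrative, and passing the embedding directly as a vector parameter assumes the pgvector codec is registered on the pool connections, as done in connect() above.

class HybridPgVectorRAG(PgVectorRAG):
    async def hybrid_search_sql(self, query: str, top_k: int = 10) -> List[Dict[str, Any]]:
        """Call the hybrid_search() SQL function and return plain dicts"""
        query_embedding = await self._generate_embedding(query)
        
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM hybrid_search($1, $2, $3, $4)",
                query_embedding,   # vector parameter (needs the pgvector codec)
                query,             # query_text used for full-text ranking
                0.7,               # similarity_threshold
                top_k              # limit_count
            )
        
        return [
            {
                "id": row["id"],
                "content": row["content"],
                "metadata": json.loads(row["metadata"]) if row["metadata"] else {},
                "similarity": float(row["similarity"]),
                "text_rank": float(row["text_rank"])
            }
            for row in rows
        ]

A call such as `await rag.hybrid_search_sql("vector databases", top_k=5)` would then return rows ranked by the better of their vector similarity and full-text scores, mirroring the GREATEST() ordering in the SQL function.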

Performance Optimization

For production deployments, consider these optimization strategies:

class OptimizedPgVectorRAG(PgVectorRAG):
    def __init__(self, db_url: str, openai_api_key: str, cache_size: int = 1000):
        super().__init__(db_url, openai_api_key)
        self.cache_size = cache_size
        self.embedding_cache = {}
        self.response_cache = {}
    
    async def _generate_embedding_cached(self, text: str) -> List[float]:
        """Generate embedding with caching"""
        # Key on the text itself; hashing modulo cache_size would let two
        # different texts collide and return the wrong embedding
        if text in self.embedding_cache:
            return self.embedding_cache[text]
        
        embedding = await self._generate_embedding(text)
        
        # Simple size cap: evict the oldest entry once the cache is full
        if len(self.embedding_cache) >= self.cache_size:
            self.embedding_cache.pop(next(iter(self.embedding_cache)))
        self.embedding_cache[text] = embedding
        
        return embedding
    
    async def search_with_caching(
        self, 
        query: str, 
        top_k: int = 5,
        use_cache: bool = True
    ) -> List[SearchResult]:
        """Search with response caching"""
        if use_cache:
            cache_key = f"{hash(query)}_{top_k}"
            if cache_key in self.response_cache:
                return self.response_cache[cache_key]
        
        results = await self.search(query, top_k)
        
        if use_cache:
            self.response_cache[cache_key] = results
        
        return results
    
    async def batch_search(
        self, 
        queries: List[str], 
        top_k: int = 5
    ) -> List[List[SearchResult]]:
        """Perform batch search for multiple queries"""
        # Generate embeddings for all queries
        embeddings = []
        for query in queries:
            embedding = await self._generate_embedding_cached(query)
            embeddings.append(embedding)
        
        # Use the batch similarity search function defined in SQL. With the
        # pgvector codec registered on the pool (see connect()), the list of
        # embeddings can be passed as a vector[] parameter instead of being
        # interpolated into the SQL string.
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM batch_similarity_search($1::vector(1536)[], $2, $3)",
                embeddings,
                0.7,
                top_k
            )
            
            # Group results by query
            results_by_query = [[] for _ in queries]
            for row in rows:
                query_idx = row['query_index'] - 1
                if query_idx < len(queries):
                    results_by_query[query_idx].append(SearchResult(
                        id=row['doc_id'],
                        content=row['content'],
                        metadata={},
                        similarity=float(row['similarity'])
                    ))
            
            return results_by_query
    
    async def get_performance_metrics(self) -> Dict[str, Any]:
        """Get performance metrics for the RAG system"""
        async with self.pool.acquire() as conn:
            # Index usage statistics
            index_stats = await conn.fetch("""
                SELECT schemaname, tablename, indexname, idx_scan, idx_tup_read, idx_tup_fetch
                FROM pg_stat_user_indexes 
                WHERE indexname LIKE '%embedding%'
            """)
            
            # Query performance statistics
            query_stats = await conn.fetch("""
                SELECT query, calls, total_time, mean_time
                FROM pg_stat_statements 
                WHERE query LIKE '%embedding%'
                ORDER BY total_time DESC
                LIMIT 10
            """)
            
            return {
                "index_statistics": [dict(row) for row in index_stats],
                "query_statistics": [dict(row) for row in query_stats],
                # Fraction of the cache capacity currently filled (not a true hit rate)
                "cache_fill_ratio": len(self.embedding_cache) / self.cache_size if self.cache_size > 0 else 0
            }

Scaling RAG Pipelines

As your RAG system grows, you'll need to address scalability challenges across multiple dimensions: data volume, query throughput, and system complexity.

Horizontal Scaling with Sharding

For large-scale deployments, consider sharding your vector database across multiple instances:

import hashlib
from typing import List, Dict, Any
import asyncio

class ShardedVectorDB:
    def __init__(self, shard_configs: List[Dict[str, str]]):
        """
        Initialize sharded vector database
        shard_configs: List of database connection strings for each shard
        """
        self.shards = []
        self.shard_count = len(shard_configs)
        
        # Initialize connections to all shards
        for config in shard_configs:
            shard = PgVectorRAG(config['db_url'], config['openai_api_key'])
            self.shards.append(shard)
    
    async def connect_all(self):
        """Connect to all shards"""
        await asyncio.gather(*[shard.connect() for shard in self.shards])
    
    async def close_all(self):
        """Close all shard connections"""
        await asyncio.gather(*[shard.close() for shard in self.shards])
    
    def _get_shard_for_document(self, doc_key: str) -> int:
        """Determine which shard to use, hashing a stable key for the document
        (below we hash the content; an explicit document ID works equally well)"""
        hash_value = int(hashlib.md5(doc_key.encode()).hexdigest(), 16)
        return hash_value % self.shard_count
    
    async def add_documents(self, documents: List[Document]) -> List[int]:
        """Add documents to appropriate shards"""
        # Group documents by shard
        shard_docs = [[] for _ in range(self.shard_count)]
        
        for doc in documents:
            shard_idx = self._get_shard_for_document(doc.content)
            shard_docs[shard_idx].append(doc)
        
        # Add documents to each shard
        all_ids = []
        for shard_idx, docs in enumerate(shard_docs):
            if docs:
                ids = await self.shards[shard_idx].add_documents(docs)
                all_ids.extend(ids)
        
        return all_ids
    
    async def search(self, query: str, top_k: int = 5) -> List[SearchResult]:
        """Search across all shards and merge results"""
        # Search all shards in parallel
        search_tasks = [
            shard.search(query, top_k=top_k) 
            for shard in self.shards
        ]
        
        all_results = await asyncio.gather(*search_tasks)
        
        # Merge and sort results
        merged_results = []
        for results in all_results:
            merged_results.extend(results)
        
        # Sort by similarity and return top_k
        merged_results.sort(key=lambda x: x.similarity, reverse=True)
        return merged_results[:top_k]

# Example usage
shard_configs = [
    {"db_url": "postgresql://user:pass@shard1/ragdb", "openai_api_key": "key1"},
    {"db_url": "postgresql://user:pass@shard2/ragdb", "openai_api_key": "key2"},
    {"db_url": "postgresql://user:pass@shard3/ragdb", "openai_api_key": "key3"}
]

sharded_db = ShardedVectorDB(shard_configs)
# await sharded_db.connect_all()
```
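
As an end-to-end sketch (assuming the `Document` dataclass and `PgVectorRAG` class from earlier in this guide; the constructor arguments and connection strings shown here are assumptions, not fixed APIs), indexing and querying the sharded deployment could look like this:

```python
import asyncio

async def sharded_round_trip():
    """Illustrative only: add a few documents, then fan a query out across the shards."""
    sharded_db = ShardedVectorDB(shard_configs)
    await sharded_db.connect_all()
    try:
        docs = [
            Document(content="pgvector adds vector similarity search to PostgreSQL",
                     metadata={"topic": "pgvector"}),
            Document(content="RAG grounds LLM answers in retrieved documents",
                     metadata={"topic": "rag"}),
        ]
        await sharded_db.add_documents(docs)

        # Each shard is searched in parallel; the merged top-k comes back sorted by similarity
        results = await sharded_db.search("How does RAG reduce hallucinations?", top_k=3)
        for r in results:
            print(f"{r.similarity:.3f}  {r.content[:60]}")
    finally:
        await sharded_db.close_all()

# asyncio.run(sharded_round_trip())
```

One trade-off to keep in mind: modulo hashing ties each document to a fixed shard count, so adding or removing shards later means re-sharding existing data.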

### Hybrid Search Strategies

Combine vector search with traditional text search for better results:

```python
from rank_bm25 import BM25Okapi
import re

class HybridSearchRAG:
    def __init__(self, vector_db: PgVectorRAG):
        self.vector_db = vector_db
        self.bm25_index = None
        self.documents = []
    
    async def build_bm25_index(self, documents: List[Document]):
        """Build BM25 index for keyword search"""
        self.documents = documents
        
        # Tokenize documents
        tokenized_docs = []
        for doc in documents:
            tokens = self._tokenize(doc.content)
            tokenized_docs.append(tokens)
        
        # Build BM25 index
        self.bm25_index = BM25Okapi(tokenized_docs)
    
    def _tokenize(self, text: str) -> List[str]:
        """Simple tokenization"""
        return re.findall(r'\w+', text.lower())
    
    async def hybrid_search(
        self, 
        query: str, 
        top_k: int = 5,
        vector_weight: float = 0.7,
        keyword_weight: float = 0.3
    ) -> List[SearchResult]:
        """Perform hybrid search combining vector and keyword search"""
        
        # Vector search
        vector_results = await self.vector_db.search(query, top_k=top_k * 2)
        
        # Keyword search
        query_tokens = self._tokenize(query)
        keyword_scores = self.bm25_index.get_scores(query_tokens)
        
        # Create keyword results
        keyword_results = []
        for i, score in enumerate(keyword_scores):
            if score > 0:
                keyword_results.append(SearchResult(
                    id=self.documents[i].id,
                    content=self.documents[i].content,
                    metadata=self.documents[i].metadata,
                    similarity=score
                ))
        
        # Sort keyword results by score
        keyword_results.sort(key=lambda x: x.similarity, reverse=True)
        keyword_results = keyword_results[:top_k * 2]
        
        # Combine results using reciprocal rank fusion
        combined = self._reciprocal_rank_fusion(
            vector_results, 
            keyword_results,
            vector_weight=vector_weight,
            keyword_weight=keyword_weight
        )
        
        return combined[:top_k]
    
    def _reciprocal_rank_fusion(
        self, 
        vector_results: List[SearchResult],
        keyword_results: List[SearchResult],
        vector_weight: float = 0.7,
        keyword_weight: float = 0.3
    ) -> List[SearchResult]:
        """Combine results using weighted reciprocal rank fusion"""
        
        # Create mapping from doc_id to combined score
        doc_scores = {}
        
        # Add vector search scores
        for i, result in enumerate(vector_results):
            doc_id = result.id
            if doc_id not in doc_scores:
                doc_scores[doc_id] = {
                    'doc': result,
                    'score': 0.0,
                    'vector_rank': i + 1,
                    'keyword_rank': None
                }
            doc_scores[doc_id]['score'] += vector_weight / (i + 1)
        
        # Add keyword search scores
        for i, result in enumerate(keyword_results):
            doc_id = result.id
            if doc_id not in doc_scores:
                doc_scores[doc_id] = {
                    'doc': result,
                    'score': 0.0,
                    'vector_rank': None,
                    'keyword_rank': i + 1
                }
            else:
                doc_scores[doc_id]['keyword_rank'] = i + 1
            doc_scores[doc_id]['score'] += keyword_weight / (i + 1)
        
        # Sort by combined score
        combined = list(doc_scores.values())
        combined.sort(key=lambda x: x['score'], reverse=True)
        
        return [item['doc'] for item in combined]
```
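
A usage sketch for the hybrid searcher, assuming `vector_db` is an already-connected `PgVectorRAG` instance and `documents` is the same list you indexed into it:

```python
# Illustrative wiring: build the BM25 index once, then run weighted hybrid queries
hybrid_rag = HybridSearchRAG(vector_db)

async def run_hybrid_query(documents):
    # BM25 needs the raw documents up front; the vector side is already stored in Postgres
    await hybrid_rag.build_bm25_index(documents)

    results = await hybrid_rag.hybrid_search(
        "tuning pgvector index parameters",
        top_k=5,
        vector_weight=0.7,   # favor semantic matches
        keyword_weight=0.3,  # still reward exact terminology
    )
    for r in results:
        print(f"{r.similarity:.3f}  {r.content[:60]}")
```

Because the fusion step works on ranks rather than raw scores, the BM25 and cosine-similarity scales never need to be normalized against each other.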

### Caching and Performance Optimization

Implement comprehensive caching strategies for production systems:

```python
import hashlib
import pickle

import redis

class CachedRAG:
    def __init__(self, rag_system: PgVectorRAG, redis_url: str = "redis://localhost:6379"):
        self.rag = rag_system
        # Note: a synchronous Redis client blocks the event loop; prefer redis.asyncio in fully async services
        self.redis_client = redis.from_url(redis_url)
        self.cache_ttl = 3600  # 1 hour
    
    async def cached_search(self, query: str, top_k: int = 5) -> List[SearchResult]:
        """Search with caching"""
        cache_key = f"search:{hash(query)}_{top_k}"
        
        # Try to get from cache
        cached_result = self.redis_client.get(cache_key)
        if cached_result:
            return pickle.loads(cached_result)
        
        # Perform search
        results = await self.rag.search(query, top_k)
        
        # Cache results
        self.redis_client.setex(
            cache_key, 
            self.cache_ttl, 
            pickle.dumps(results)
        )
        
        return results
    
    async def cached_generate_response(
        self, 
        query: str, 
        search_results: List[SearchResult]
    ) -> Dict[str, Any]:
        """Generate response with caching"""
        # Build a stable cache key from the query digest and the top result IDs
        top_result_ids = [r.id for r in search_results[:3]]
        query_digest = hashlib.sha256(query.encode()).hexdigest()
        cache_key = f"response:{query_digest}:{'-'.join(str(i) for i in top_result_ids)}"
        
        # Try to get from cache
        cached_result = self.redis_client.get(cache_key)
        if cached_result:
            return pickle.loads(cached_result)
        
        # Generate response
        response = await self.rag.generate_response(query, search_results)
        
        # Cache response
        self.redis_client.setex(
            cache_key, 
            self.cache_ttl, 
            pickle.dumps(response)
        )
        
        return response
    
    async def batch_process_with_caching(
        self, 
        queries: List[str], 
        top_k: int = 5
    ) -> List[Dict[str, Any]]:
        """Process multiple queries with intelligent caching"""
        results = []
        
        for query in queries:
            # Check if we have a cached response first
            cache_key = f"full_response:{hash(query)}_{top_k}"
            cached_response = self.redis_client.get(cache_key)
            
            if cached_response:
                results.append(pickle.loads(cached_response))
            else:
                # Perform full RAG pipeline
                search_results = await self.cached_search(query, top_k)
                response = await self.cached_generate_response(query, search_results)
                
                # Cache full response
                self.redis_client.setex(
                    cache_key, 
                    self.cache_ttl, 
                    pickle.dumps(response)
                )
                
                results.append(response)
        
        return results
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        info = self.redis_client.info()
        return {
            "total_connections_received": info.get("total_connections_received", 0),
            "total_commands_processed": info.get("total_commands_processed", 0),
            "keyspace_hits": info.get("keyspace_hits", 0),
            "keyspace_misses": info.get("keyspace_misses", 0),
            "used_memory_human": info.get("used_memory_human", "0B"),
            "hit_rate": info.get("keyspace_hits", 0) / max(info.get("keyspace_hits", 0) + info.get("keyspace_misses", 0), 1)
        }

# Example usage
# cached_rag = CachedRAG(rag_system, redis_url="redis://localhost:6379")
# results = await cached_rag.batch_process_with_caching(["What are vector databases?", "How does RAG work?"])
```
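
Caching query embeddings is often the cheapest win of all, since repeated or identical queries otherwise trigger a new embedding API call every time. A minimal sketch, assuming your RAG class exposes an async embedding helper (the `compute_fn` callback stands in for whatever that method is called in your codebase):

```python
import hashlib
import json

class EmbeddingCache:
    """Redis-backed cache for query embeddings, keyed by a stable hash of the text."""

    def __init__(self, redis_client, ttl_seconds: int = 86400):
        self.redis = redis_client
        self.ttl = ttl_seconds

    async def get_or_compute(self, text: str, compute_fn):
        key = f"emb:{hashlib.sha256(text.encode()).hexdigest()}"
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        embedding = await compute_fn(text)  # e.g. the OpenAI embeddings call
        self.redis.setex(key, self.ttl, json.dumps(embedding))
        return embedding

# Hypothetical usage alongside CachedRAG:
# emb_cache = EmbeddingCache(cached_rag.redis_client)
# query_vector = await emb_cache.get_or_compute(query, rag_system.get_embedding)
```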

### Monitoring and Cost Optimization

Implement comprehensive monitoring for production RAG systems:

```python
import time
import logging
from dataclasses import dataclass
from typing import Dict, Any, List
import asyncio

@dataclass
class RAGMetrics:
    query_latency: float
    embedding_latency: float
    search_latency: float
    generation_latency: float
    tokens_used: int
    cost_estimate: float
    cache_hit: bool
    result_count: int

class RAGMonitor:
    def __init__(self):
        self.metrics_history: List[RAGMetrics] = []
        self.logger = logging.getLogger(__name__)
    
    async def monitor_rag_call(self, func, *args, **kwargs):
        """Monitor a RAG function call and collect metrics"""
        start_time = time.time()
        
        try:
            # Per-stage hooks: wrap real embedding and search calls here to capture their
            # latencies; as written, these placeholders measure ~0s (see the sketch below)
            embedding_start = time.time()
            # ... embedding generation code ...
            embedding_latency = time.time() - embedding_start
            
            # Track search time
            search_start = time.time()
            # ... search code ...
            search_latency = time.time() - search_start
            
            # Track generation time
            generation_start = time.time()
            result = await func(*args, **kwargs)
            generation_latency = time.time() - generation_start
            
            # Calculate total latency
            total_latency = time.time() - start_time
            
            # Estimate cost if the callee did not report one (example rates in USD per 1K tokens)
            embedding_rate = 0.0001
            generation_rate = 0.03
            tokens_used = result.get('tokens_used', 0)
            cost_estimate = result.get(
                'cost_estimate',
                (tokens_used / 1000) * (embedding_rate + generation_rate)
            )
            
            metrics = RAGMetrics(
                query_latency=total_latency,
                embedding_latency=embedding_latency,
                search_latency=search_latency,
                generation_latency=generation_latency,
                tokens_used=tokens_used,
                cost_estimate=cost_estimate,
                cache_hit=result.get('cache_hit', False),
                result_count=len(result.get('sources', []))
            )
            
            self.metrics_history.append(metrics)
            
            # Log metrics
            self.logger.info(f"RAG Query - Latency: {total_latency:.3f}s, Cost: ${metrics.cost_estimate:.4f}")
            
            return result
            
        except Exception as e:
            self.logger.error(f"RAG query failed: {e}")
            raise
    
    def get_performance_summary(self) -> Dict[str, Any]:
        """Get performance summary from collected metrics"""
        if not self.metrics_history:
            return {}
        
        latencies = [m.query_latency for m in self.metrics_history]
        costs = [m.cost_estimate for m in self.metrics_history]
        cache_hits = [m.cache_hit for m in self.metrics_history]
        
        return {
            "total_queries": len(self.metrics_history),
            "average_latency": sum(latencies) / len(latencies),
            "p95_latency": sorted(latencies)[int(len(latencies) * 0.95)],
            "p99_latency": sorted(latencies)[int(len(latencies) * 0.99)],
            "total_cost": sum(costs),
            "average_cost_per_query": sum(costs) / len(costs),
            "cache_hit_rate": sum(cache_hits) / len(cache_hits),
            "average_results_per_query": sum(m.result_count for m in self.metrics_history) / len(self.metrics_history)
        }
    
    def generate_cost_report(self) -> Dict[str, Any]:
        """Generate detailed cost analysis"""
        if not self.metrics_history:
            return {}
        
        total_embedding_cost = sum(m.cost_estimate * 0.3 for m in self.metrics_history)  # Estimate 30% for embeddings
        total_generation_cost = sum(m.cost_estimate * 0.7 for m in self.metrics_history)  # Estimate 70% for generation
        
        return {
            "total_cost": sum(m.cost_estimate for m in self.metrics_history),
            "embedding_cost": total_embedding_cost,
            "generation_cost": total_generation_cost,
            "cost_per_query": sum(m.cost_estimate for m in self.metrics_history) / len(self.metrics_history),
            "estimated_monthly_cost": sum(m.cost_estimate for m in self.metrics_history) * 30,  # Assuming daily usage
            "cost_breakdown": {
                "embeddings": f"{total_embedding_cost / sum(m.cost_estimate for m in self.metrics_history) * 100:.1f}%",
                "generation": f"{total_generation_cost / sum(m.cost_estimate for m in self.metrics_history) * 100:.1f}%"
            }
        }

# Example usage
# monitor = RAGMonitor()
# result = await monitor.monitor_rag_call(rag_system.query, "What are vector databases?")
# performance_summary = monitor.get_performance_summary()
# cost_report = monitor.generate_cost_report()
```
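
The monitor above only times the wrapped call as a whole; the per-stage latencies become meaningful once the pipeline itself records them. One lightweight option is a small timing context manager that each stage runs under, with the durations reported back in the response dict (the `timings` key and the stage names below are assumptions, not part of the earlier classes):

```python
import time
from contextlib import contextmanager

@contextmanager
def timed_stage(timings: dict, stage: str):
    """Record the wall-clock duration of one named pipeline stage into `timings`."""
    start = time.time()
    try:
        yield
    finally:
        timings[stage] = time.time() - start

# Inside a pipeline method (sketch):
# timings = {}
# with timed_stage(timings, "embedding"):
#     query_embedding = await self.get_embedding(query)   # hypothetical helper
# with timed_stage(timings, "search"):
#     results = await self.search_by_embedding(query_embedding, top_k)
# return {"answer": answer, "timings": timings, "tokens_used": tokens}
```

RAGMonitor can then read `result["timings"]` instead of relying on the placeholder timers.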

## Advanced Use Cases

RAG systems are incredibly versatile and can be applied to a wide range of advanced use cases beyond simple question answering.

### Multi-Modal RAG

Extend RAG to handle images, audio, and other data types:

```python
from PIL import Image
import torch
from transformers import CLIPProcessor, CLIPModel
import numpy as np

class MultiModalRAG:
    def __init__(self, vector_db: PgVectorRAG):
        self.vector_db = vector_db
        self.clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
    
    def encode_image(self, image_path: str) -> List[float]:
        """Encode image to vector representation"""
        image = Image.open(image_path)
        inputs = self.clip_processor(images=image, return_tensors="pt")
        
        with torch.no_grad():
            image_features = self.clip_model.get_image_features(**inputs)
        
        return image_features.numpy().flatten().tolist()
    
    def encode_text(self, text: str) -> List[float]:
        """Encode text to vector representation"""
        inputs = self.clip_processor(text=text, return_tensors="pt", padding=True)
        
        with torch.no_grad():
            text_features = self.clip_model.get_text_features(**inputs)
        
        return text_features.numpy().flatten().tolist()
    
    async def search_by_image(self, image_path: str, top_k: int = 5) -> List[SearchResult]:
        """Search for documents similar to an image"""
        image_embedding = self.encode_image(image_path)
        
        # Search in vector database
        results = await self.vector_db.search_by_embedding(image_embedding, top_k)
        return results
    
    async def search_by_text_and_image(
        self, 
        text: str, 
        image_path: str, 
        top_k: int = 5
    ) -> List[SearchResult]:
        """Search using both text and image"""
        text_embedding = self.encode_text(text)
        image_embedding = self.encode_image(image_path)
        
        # Combine embeddings (simple average)
        combined_embedding = [
            (t + i) / 2 for t, i in zip(text_embedding, image_embedding)
        ]
        
        results = await self.vector_db.search_by_embedding(combined_embedding, top_k)
        return results

# Example usage
# multimodal_rag = MultiModalRAG(vector_db)
# results = await multimodal_rag.search_by_image("product_image.jpg")
# combined_results = await multimodal_rag.search_by_text_and_image("red shoes", "shoe_image.jpg")
```
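
One practical caveat: CLIP ViT-B/32 produces 512-dimensional vectors, while the OpenAI text embeddings used elsewhere in this guide have a different dimensionality, so image embeddings generally need their own pgvector column or table. A minimal sketch of a dedicated table, assuming pgvector 0.5+ for the HNSW index (the table and column names are illustrative):

```python
import asyncpg

async def create_image_embeddings_table(db_url: str):
    """Create a separate table sized for 512-dim CLIP image embeddings (illustrative schema)."""
    conn = await asyncpg.connect(db_url)
    try:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS image_documents (
                id BIGSERIAL PRIMARY KEY,
                image_path TEXT NOT NULL,
                metadata JSONB DEFAULT '{}',
                embedding vector(512)  -- CLIP ViT-B/32 output size
            );
            CREATE INDEX IF NOT EXISTS image_documents_embedding_idx
                ON image_documents USING hnsw (embedding vector_cosine_ops);
        """)
    finally:
        await conn.close()
```

Because both encoders in `MultiModalRAG` project into the same CLIP space, text queries encoded with `encode_text` can be searched directly against this image table.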

### Domain-Specific Knowledge Bases

Build specialized RAG systems for specific domains:

```python
class MedicalRAG:
    def __init__(self, vector_db: PgVectorRAG):
        self.vector_db = vector_db
        self.medical_terms = self._load_medical_terminology()
    
    def _load_medical_terminology(self) -> Dict[str, str]:
        """Load medical terminology for query expansion"""
        return {
            "heart attack": "myocardial infarction",
            "high blood pressure": "hypertension",
            "diabetes": "diabetes mellitus",
            # Add more medical term mappings
        }
    
    def expand_medical_query(self, query: str) -> str:
        """Expand query with medical terminology"""
        expanded_query = query
        for term, medical_term in self.medical_terms.items():
            if term.lower() in query.lower():
                expanded_query += f" OR {medical_term}"
        
        return expanded_query
    
    async def medical_search(self, query: str, top_k: int = 5) -> List[SearchResult]:
        """Search with medical query expansion"""
        expanded_query = self.expand_medical_query(query)
        results = await self.vector_db.search(expanded_query, top_k)
        
        # Filter results for medical relevance
        medical_results = [
            result for result in results 
            if self._is_medically_relevant(result.content)
        ]
        
        return medical_results[:top_k]
    
    def _is_medically_relevant(self, content: str) -> bool:
        """Check if content is medically relevant"""
        medical_keywords = [
            "diagnosis", "treatment", "symptoms", "patient", "clinical",
            "medical", "health", "disease", "condition", "therapy"
        ]
        
        content_lower = content.lower()
        return any(keyword in content_lower for keyword in medical_keywords)

class LegalRAG:
    def __init__(self, vector_db: PgVectorRAG):
        self.vector_db = vector_db
        self.jurisdiction = "US"  # Default jurisdiction
    
    async def legal_search(
        self, 
        query: str, 
        jurisdiction: str = None,
        case_law_only: bool = False,
        top_k: int = 5
    ) -> List[SearchResult]:
        """Search legal documents with jurisdiction filtering"""
        
        # Add jurisdiction filter
        filters = {}
        if jurisdiction:
            filters["jurisdiction"] = jurisdiction
        elif self.jurisdiction:
            filters["jurisdiction"] = self.jurisdiction
        
        if case_law_only:
            filters["document_type"] = "case_law"
        
        results = await self.vector_db.search(query, top_k, filters=filters)
        return results
    
    async def cite_check(self, text: str) -> List[Dict[str, Any]]:
        """Check citations in legal text"""
        # Extract potential citations
        citations = self._extract_citations(text)
        
        # Verify citations against knowledge base
        verified_citations = []
        for citation in citations:
            search_results = await self.vector_db.search(citation, top_k=1)
            if search_results:
                verified_citations.append({
                    "citation": citation,
                    "verified": True,
                    "source": search_results[0].content[:200]
                })
            else:
                verified_citations.append({
                    "citation": citation,
                    "verified": False,
                    "source": None
                })
        
        return verified_citations
    
    def _extract_citations(self, text: str) -> List[str]:
        """Extract legal citations from text"""
        import re
        
        # Common legal citation patterns
        patterns = [
            r'\d+ U\.S\. \d+',               # Supreme Court (U.S. Reports)
            r'\d+ F\.(?:2d|3d|4th)? ?\d+',   # Federal Reporter (F., F.2d, F.3d, F.4th)
            r'\d+ S\.Ct\. \d+',              # Supreme Court Reporter
            r'\d+ L\.Ed\.(?:2d)? ?\d+',      # Lawyers' Edition (L.Ed., L.Ed.2d)
        ]
        
        citations = []
        for pattern in patterns:
            matches = re.findall(pattern, text)
            citations.extend(matches)
        
        return citations

# Example usage
# medical_rag = MedicalRAG(vector_db)
# medical_results = await medical_rag.medical_search("What are the symptoms of diabetes?")

# legal_rag = LegalRAG(vector_db)
# legal_results = await legal_rag.legal_search("contract breach", jurisdiction="California")
# citations = await legal_rag.cite_check("The court held in Smith v. Jones, 123 U.S. 456...")
```

### Enterprise Search with Access Controls

Build comprehensive enterprise search systems that respect per-user access controls across departments and data sources:

```python
class EnterpriseRAG:
    def __init__(self, vector_db: PgVectorRAG):
        self.vector_db = vector_db
        self.departments = ["engineering", "marketing", "sales", "hr", "finance"]
        self.access_control = self._load_access_control()
    
    def _load_access_control(self) -> Dict[str, List[str]]:
        """Load user access permissions"""
        return {
            "user1": ["engineering", "marketing"],
            "user2": ["sales", "finance"],
            "admin": self.departments
        }
    
    async def enterprise_search(
        self, 
        query: str, 
        user_id: str,
        departments: List[str] = None,
        document_types: List[str] = None,
        date_range: tuple = None,
        top_k: int = 10
    ) -> List[SearchResult]:
        """Search with enterprise access controls"""
        
        # Get user permissions
        user_permissions = self.access_control.get(user_id, [])
        
        # Build filters
        filters = {}
        
        if departments:
            # Filter by requested departments (must be in user permissions)
            allowed_departments = [d for d in departments if d in user_permissions]
            if allowed_departments:
                filters["department"] = allowed_departments
        else:
            # Use all user permissions
            filters["department"] = user_permissions
        
        if document_types:
            filters["document_type"] = document_types
        
        if date_range:
            start_date, end_date = date_range
            filters["date_range"] = f"{start_date} TO {end_date}"
        
        # Perform search
        results = await self.vector_db.search(query, top_k, filters=filters)
        
        # Add access control metadata
        for result in results:
            result.metadata["access_level"] = "user"
            result.metadata["departments"] = filters.get("department", [])
        
        return results
    
    async def search_across_sources(
        self, 
        query: str, 
        sources: List[str],
        user_id: str
    ) -> Dict[str, List[SearchResult]]:
        """Search across multiple data sources"""
        
        results_by_source = {}
        
        for source in sources:
            # Check if user has access to this source
            if self._user_has_access(user_id, source):
                source_results = await self.vector_db.search(
                    query, 
                    top_k=5,
                    filters={"source": source}
                )
                results_by_source[source] = source_results
        
        return results_by_source
    
    def _user_has_access(self, user_id: str, source: str) -> bool:
        """Check if the user has access to a specific source"""
        user_permissions = self.access_control.get(user_id, [])
        # Admin accounts are granted everything; other users need the source in their permission list
        return user_id == "admin" or source in user_permissions
    
    async def generate_enterprise_report(
        self, 
        query: str, 
        user_id: str,
        report_type: str = "summary"
    ) -> Dict[str, Any]:
        """Generate enterprise report from search results"""
        
        # Get search results
        results = await self.enterprise_search(query, user_id, top_k=20)
        
        if report_type == "summary":
            return self._generate_summary_report(results)
        elif report_type == "detailed":
            return self._generate_detailed_report(results)
        elif report_type == "analytics":
            return self._generate_analytics_report(results)
        else:
            raise ValueError(f"Unknown report type: {report_type}")
    
    def _generate_summary_report(self, results: List[SearchResult]) -> Dict[str, Any]:
        """Generate summary report"""
        departments = {}
        document_types = {}
        
        for result in results:
            dept = result.metadata.get("department", "unknown")
            doc_type = result.metadata.get("document_type", "unknown")
            
            departments[dept] = departments.get(dept, 0) + 1
            document_types[doc_type] = document_types.get(doc_type, 0) + 1
        
        return {
            "total_results": len(results),
            "departments": departments,
            "document_types": document_types,
            "average_similarity": sum(r.similarity for r in results) / len(results) if results else 0,
            "top_results": [
                {
                    "content": r.content[:200] + "...",
                    "department": r.metadata.get("department"),
                    "similarity": r.similarity
                }
                for r in results[:5]
            ]
        }
    
    def _generate_detailed_report(self, results: List[SearchResult]) -> Dict[str, Any]:
        """Generate detailed report"""
        return {
            "total_results": len(results),
            "results": [
                {
                    "id": r.id,
                    "content": r.content,
                    "metadata": r.metadata,
                    "similarity": r.similarity
                }
                for r in results
            ]
        }
    
    def _generate_analytics_report(self, results: List[SearchResult]) -> Dict[str, Any]:
        """Generate analytics report"""
        # Calculate various analytics
        similarities = [r.similarity for r in results]
        
        return {
            "total_results": len(results),
            "similarity_stats": {
                "mean": sum(similarities) / len(similarities) if similarities else 0,
                "median": sorted(similarities)[len(similarities)//2] if similarities else 0,
                "min": min(similarities) if similarities else 0,
                "max": max(similarities) if similarities else 0
            },
            "department_distribution": self._get_distribution(results, "department"),
            "document_type_distribution": self._get_distribution(results, "document_type"),
            "date_distribution": self._get_date_distribution(results)
        }
    
    def _get_distribution(self, results: List[SearchResult], key: str) -> Dict[str, int]:
        """Get distribution of a metadata key"""
        distribution = {}
        for result in results:
            value = result.metadata.get(key, "unknown")
            distribution[value] = distribution.get(value, 0) + 1
        return distribution
    
    def _get_date_distribution(self, results: List[SearchResult]) -> Dict[str, int]:
        """Get date distribution of results"""
        date_dist = {}
        for result in results:
            date = result.metadata.get("created_date", "unknown")
            if date != "unknown":
                year_month = date[:7]  # YYYY-MM
                date_dist[year_month] = date_dist.get(year_month, 0) + 1
        return date_dist

# Example usage
# enterprise_rag = EnterpriseRAG(vector_db)
# results = await enterprise_rag.enterprise_search(
#     "Q4 sales performance", 
#     user_id="user1", 
#     departments=["sales", "marketing"]
# )
# report = await enterprise_rag.generate_enterprise_report(
#     "Q4 sales performance", 
#     user_id="user1", 
#     report_type="analytics"
# )
```

## Conclusion

The landscape of AI applications is rapidly evolving, and RAG systems with vector databases are at the forefront of this transformation. As we’ve explored throughout this comprehensive guide, building production-ready RAG pipelines requires careful consideration of multiple components: from choosing the right vector database to implementing efficient retrieval strategies and scaling for enterprise use.

### The Future of RAG Systems

The future of RAG systems is incredibly promising, with several exciting developments on the horizon:

AI Agents and Autonomous Systems: RAG systems are evolving into intelligent agents that can not only retrieve and generate information but also take actions based on that information. These agents will be able to interact with multiple data sources, make decisions, and execute tasks autonomously.

Self-Improving Indexes: Future RAG systems will feature self-improving vector indexes that automatically optimize themselves based on user interactions, query patterns, and feedback. This will lead to continuously improving search quality and relevance.

Multimodal RAG: The integration of text, images, audio, and video in RAG systems will become more sophisticated, enabling truly multimodal AI applications that can understand and reason across different types of content.

Real-Time Learning: RAG systems will increasingly incorporate real-time learning capabilities, allowing them to adapt to new information as it becomes available without requiring complete retraining.

Federated RAG: Organizations will deploy federated RAG systems that can search across multiple distributed knowledge bases while maintaining data privacy and security.

### Getting Started with RAG

If you’re ready to implement RAG in your organization, start with these steps:

  1. Assess Your Data: Identify the knowledge sources that would benefit from RAG integration
  2. Choose Your Vector Database: Select a vector database that fits your scale, budget, and technical requirements
  3. Start Small: Begin with a focused use case and expand gradually
  4. Measure and Iterate: Continuously monitor performance and user feedback
  5. Scale Thoughtfully: Plan for growth from the beginning

### Key Takeaways

- RAG solves the hallucination problem by grounding AI responses in retrieved facts
- Vector databases are essential for efficient semantic search at scale
- Production RAG requires careful architecture including caching, monitoring, and optimization
- Hybrid search strategies combine the best of vector and keyword search
- Domain-specific RAG systems can provide specialized capabilities for different industries
- Enterprise RAG systems require careful attention to access control and governance

The combination of vector databases and RAG pipelines represents a fundamental shift in how we build AI applications. By providing accurate, up-to-date, and verifiable information, RAG systems enable AI applications that are not just impressive but truly useful and trustworthy.

As the technology continues to mature, organizations that successfully implement RAG systems will find themselves with a significant competitive advantage—the ability to build AI applications that can access and reason about their specific knowledge while maintaining the natural language capabilities that make AI so powerful.

The future of AI is not just about bigger models—it’s about smarter systems that can access the right information at the right time. RAG with vector databases is the foundation upon which this future is being built.
