By Ali Elborey

RAG That Doesn't Rot: Freshness-Aware Retrieval with Incremental Indexing + Query Rewrites + Reranking

Tags: rag, retrieval-augmented-generation, freshness, incremental-indexing, reranking, query-rewrites, vector-search, embeddings, python, production, knowledge-management

Freshness-Aware RAG Pipeline

Most RAG demos look good. They retrieve relevant docs. They generate coherent answers. They cite sources. Then they fail quietly in production.

The problem isn’t the retrieval algorithm. It’s not the embedding model. It’s not the LLM. The problem is that content changes, and most RAG systems don’t handle change well.

This article focuses on one specific problem: knowledge freshness. You’ll see how to build RAG that stays accurate when docs update daily, policies change, pages get removed, and multiple versions of “truth” exist.

The Real Failure Mode: Stale Context

Here’s what happens. A user asks: “What’s our refund policy?”

Your RAG system retrieves a document from last month. It says refunds are allowed within 30 days. The answer is well-written. It cites the source. But the policy changed yesterday. Refunds are now allowed within 60 days.

The user follows the old policy. They get rejected. They lose trust. They complain.

This isn’t a hypothetical. It happens in production. Docs update daily. Policies change. Pages get removed. Multiple versions exist. Your RAG system needs to handle this.

The Answer Is Well-Written, But Cites Outdated Docs

The model generates good text. It’s coherent. It’s grammatically correct. It cites sources. But the sources are stale.

# User query: "What's our refund policy?"
# Retrieved doc (from last month): "Refunds allowed within 30 days"
# Actual policy (updated yesterday): "Refunds allowed within 60 days"

response = llm.generate(
    context="Refunds are allowed within 30 days of purchase...",
    query="What's our refund policy?"
)
# Response: "Our refund policy allows returns within 30 days..."
# This is wrong. The policy changed.

The retrieval worked. The generation worked. But the answer is wrong because the context is stale.

Users Lose Trust Fast When RAG Is Confidently Wrong

When RAG is wrong, it’s confidently wrong. It cites sources. It sounds authoritative. Users trust it. Then they find out it’s outdated. Trust evaporates.

This is worse than saying “I don’t know.” Saying “I don’t know” is honest. Giving a confident wrong answer is misleading.

Define “Freshness”

Freshness isn’t the same as relevance. A document can be highly relevant but stale. A document can be fresh but irrelevant.

Freshness vs Relevance (Not the Same)

Relevance measures how well a document matches the query semantically. Freshness measures how recent the document is.

# High relevance, low freshness
old_doc = {
    "content": "Our refund policy allows returns within 30 days",
    "published_at": "2025-11-01",
    "relevance_score": 0.95
}

# Lower relevance, higher freshness
new_doc = {
    "content": "We've updated our shipping times. Standard shipping now takes 3-5 days.",
    "published_at": "2026-01-04",
    "relevance_score": 0.75
}

# Query: "What's our refund policy?"
# old_doc has higher relevance, but it's stale
# new_doc is fresh, but less relevant

You need both. High relevance. High freshness. But when you have to choose, freshness matters more for time-sensitive queries.

What “Fresh Enough” Means Depends on the Domain

“Fresh enough” depends on what you’re building.

  • Policy docs: Need updates within hours. Policies change frequently.
  • Product catalogs: Need updates daily. Prices and availability change.
  • Technical documentation: Need updates weekly. APIs evolve.
  • Historical data: Freshness doesn’t matter. It’s historical.

from datetime import timedelta

FRESHNESS_THRESHOLDS = {
    "policy": timedelta(hours=6),        # 6 hours
    "product": timedelta(days=1),        # 1 day
    "documentation": timedelta(days=7),  # 7 days
    "historical": None                   # No threshold
}

Define thresholds per domain. Use them in retrieval and reranking.
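
Here's a minimal sketch of applying those thresholds. It assumes the FRESHNESS_THRESHOLDS dict above and a published_at timestamp on each document.

from datetime import datetime, timedelta

def is_fresh(doc_type: str, published_at: datetime) -> bool:
    """Check a document against its domain-specific freshness threshold."""
    threshold = FRESHNESS_THRESHOLDS.get(doc_type)
    if threshold is None:
        return True  # No threshold: historical content never goes stale
    return datetime.now() - published_at <= threshold

# Example: a policy doc published 2 days ago exceeds its 6-hour threshold
is_fresh("policy", datetime.now() - timedelta(days=2))  # False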

Indexing Strategy That Supports Change

Most RAG systems re-index everything when content changes. That’s slow. That’s expensive. That doesn’t scale.

Instead, use incremental indexing. Only re-embed changed chunks. Track what changed. Handle deletions. Version your content.

Incremental Ingestion (Only Re-Embed Changed Chunks)

Don’t re-embed everything. Only re-embed what changed.

import hashlib
from datetime import datetime
from typing import Optional

class DocumentTracker:
    def __init__(self):
        self.doc_hashes = {}  # doc_id -> content_hash
        self.last_indexed = {}  # doc_id -> timestamp
    
    def compute_hash(self, content: str) -> str:
        """Compute content hash to detect changes."""
        return hashlib.sha256(content.encode()).hexdigest()
    
    def has_changed(self, doc_id: str, content: str) -> bool:
        """Check if document content has changed."""
        current_hash = self.compute_hash(content)
        old_hash = self.doc_hashes.get(doc_id)
        
        if old_hash is None:
            return True  # New document
        
        return current_hash != old_hash
    
    def mark_indexed(self, doc_id: str, content: str):
        """Mark document as indexed with current hash."""
        self.doc_hashes[doc_id] = self.compute_hash(content)
        self.last_indexed[doc_id] = datetime.now()

When you crawl documents, check if they changed. Only re-embed if the hash changed.

def incremental_ingest(doc_id: str, content: str, tracker: DocumentTracker):
    """Only re-embed if content changed."""
    if not tracker.has_changed(doc_id, content):
        print(f"Doc {doc_id} unchanged, skipping")
        return
    
    # Content changed, re-embed
    chunks = chunk_document(content)
    embeddings = generate_embeddings(chunks)
    
    # Update index
    update_vector_index(doc_id, chunks, embeddings)
    
    # Mark as indexed
    tracker.mark_indexed(doc_id, content)

This saves time. This saves money. This scales.

Content Hashing + Last-Modified Tracking

Use content hashing to detect changes. Use last-modified timestamps to track freshness.

class DocumentMetadata:
    def __init__(self, doc_id: str, content: str, published_at: datetime):
        self.doc_id = doc_id
        self.content = content
        self.content_hash = hashlib.sha256(content.encode()).hexdigest()
        self.published_at = published_at
        self.indexed_at = datetime.now()
        self.version = 1
    
    def update(self, new_content: str, new_published_at: datetime):
        """Update document and increment version."""
        self.content = new_content
        self.content_hash = hashlib.sha256(new_content.encode()).hexdigest()
        self.published_at = new_published_at
        self.indexed_at = datetime.now()
        self.version += 1

Store metadata with each document. Use it for freshness scoring.
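
A quick usage sketch of the class above, with a hypothetical refund-policy doc:

from datetime import datetime

# Index the original policy doc
meta = DocumentMetadata(
    doc_id="refund-policy",
    content="Refunds are allowed within 30 days of purchase.",
    published_at=datetime(2025, 11, 1)
)

# The policy changes: update content, bump the version, refresh timestamps
meta.update(
    new_content="Refunds are allowed within 60 days of purchase.",
    new_published_at=datetime(2026, 1, 4)
)
# meta.version == 2, meta.indexed_at == time of re-indexing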

Tombstones for Deleted Content (Don’t Keep Retrieving Ghosts)

When a document is deleted, mark it as deleted. Don’t keep retrieving it.

class VectorIndex:
    def __init__(self):
        self.embeddings = {}
        self.metadata = {}
        self.tombstones = set()  # doc_ids that are deleted
    
    def delete_document(self, doc_id: str):
        """Mark document as deleted."""
        self.tombstones.add(doc_id)
        # Optionally: remove from index immediately
        # Or: keep in index but filter during retrieval
    
    def is_deleted(self, doc_id: str) -> bool:
        """Check if document is deleted."""
        return doc_id in self.tombstones
    
    def retrieve(self, query_embedding, top_k: int = 10):
        """Retrieve documents, filtering out deleted ones."""
        results = self._similarity_search(query_embedding, top_k * 2)
        
        # Filter out deleted documents
        filtered = [
            r for r in results 
            if not self.is_deleted(r.doc_id)
        ]
        
        return filtered[:top_k]

Tombstones prevent retrieving deleted content. They also let you audit what was deleted and when.
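
If you want the audit trail too, one option is to record when each document was tombstoned. A small sketch, separate from the class above:

from datetime import datetime

class TombstoneLog:
    """Record which docs were deleted and when, for auditing."""
    def __init__(self):
        self.deleted_at = {}  # doc_id -> deletion timestamp

    def tombstone(self, doc_id: str):
        self.deleted_at[doc_id] = datetime.now()

    def is_deleted(self, doc_id: str) -> bool:
        return doc_id in self.deleted_at

    def audit(self) -> list[tuple[str, datetime]]:
        """List deletions, newest first."""
        return sorted(self.deleted_at.items(), key=lambda kv: kv[1], reverse=True)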

Chunk Versioning

Store doc_id, chunk_id, version, published_at, indexed_at. Keep old versions for audit. Bias retrieval to newest.

class ChunkMetadata:
    def __init__(
        self,
        doc_id: str,
        chunk_id: str,
        version: int,
        published_at: datetime,
        indexed_at: datetime
    ):
        self.doc_id = doc_id
        self.chunk_id = chunk_id
        self.version = version
        self.published_at = published_at
        self.indexed_at = indexed_at
    
    def to_dict(self):
        return {
            "doc_id": self.doc_id,
            "chunk_id": self.chunk_id,
            "version": self.version,
            "published_at": self.published_at.isoformat(),
            "indexed_at": self.indexed_at.isoformat()
        }

When you store chunks in your vector database, store metadata too. Use it for freshness scoring and version tracking.
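
One way to bias retrieval to the newest version: after retrieval, keep only the latest version of each chunk. A sketch, assuming each result carries its ChunkMetadata on a metadata attribute:

def keep_latest_versions(results: list) -> list:
    """Drop superseded chunk versions from a retrieval result set."""
    latest = {}
    for r in results:
        key = (r.metadata.doc_id, r.metadata.chunk_id)
        if key not in latest or r.metadata.version > latest[key].metadata.version:
            latest[key] = r
    return list(latest.values())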

Retrieval Pipeline That Includes Freshness

Retrieval isn’t just about relevance. It’s about relevance and freshness. Use query rewrites. Use hybrid search. Use reranking that considers both.

Query Rewrite Step (Short, Focused, Retrieval-Friendly Query)

Rewrite queries to be retrieval-friendly. Remove filler words. Focus on key terms.

import string

def rewrite_query(query: str) -> str:
    """Rewrite query for better retrieval."""
    # Strip punctuation and remove common stop words
    stop_words = {"what", "is", "our", "the", "a", "an", "how", "does", "do"}
    cleaned = query.lower().translate(str.maketrans("", "", string.punctuation))
    filtered = [w for w in cleaned.split() if w not in stop_words]
    
    # Keep original if too short after filtering
    if len(filtered) < 2:
        return query
    
    return " ".join(filtered)

# Example
original = "What is our refund policy?"
rewritten = rewrite_query(original)  # "refund policy"

Short, focused queries work better for retrieval. They match key terms. They reduce noise.

Hybrid Search (Keyword + Vector) If You Can

Use both keyword search and vector search. Combine results.

def hybrid_search(
    query: str,
    query_embedding: list[float],
    vector_index: VectorIndex,
    keyword_index: KeywordIndex,
    top_k: int = 10
) -> list[SearchResult]:
    """Combine vector and keyword search."""
    
    # Vector search
    vector_results = vector_index.search(query_embedding, top_k=top_k * 2)
    
    # Keyword search
    keyword_results = keyword_index.search(query, top_k=top_k * 2)
    
    # Combine and deduplicate
    combined = {}
    for result in vector_results + keyword_results:
        doc_id = result.doc_id
        if doc_id not in combined:
            combined[doc_id] = result
        else:
            # Merge scores
            combined[doc_id].score = max(
                combined[doc_id].score,
                result.score
            )
    
    # Sort by score
    sorted_results = sorted(
        combined.values(),
        key=lambda x: x.score,
        reverse=True
    )
    
    return sorted_results[:top_k]

Hybrid search catches both semantic matches (vector) and exact term matches (keyword). It’s more robust.
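
If vector and keyword scores aren't on comparable scales, taking the max can skew results. Reciprocal rank fusion is a common alternative that combines by rank instead of raw score. A minimal sketch:

def reciprocal_rank_fusion(result_lists: list[list], k: int = 60) -> dict[str, float]:
    """Combine ranked result lists by rank position instead of raw score."""
    fused = {}
    for results in result_lists:
        for rank, result in enumerate(results):
            fused[result.doc_id] = fused.get(result.doc_id, 0.0) + 1.0 / (k + rank + 1)
    return fused  # doc_id -> fused score; sort descending for the final order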

Reranking That Considers: Relevance Score, Recency Score, Source Priority

Rerank results using multiple signals: relevance, recency, source priority.

from datetime import datetime, timedelta
import math

def compute_freshness_score(
    published_at: datetime,
    decay_half_life_days: int = 30
) -> float:
    """Compute freshness score using exponential decay."""
    age_days = (datetime.now() - published_at).days
    
    if age_days < 0:
        return 1.0  # Future dates (shouldn't happen)
    
    # Exponential decay: score halves every decay_half_life_days
    score = math.exp(-age_days * math.log(2) / decay_half_life_days)
    return max(0.0, min(1.0, score))

def compute_source_priority(source_type: str) -> float:
    """Compute source priority score."""
    priorities = {
        "policy": 1.0,
        "official_docs": 0.9,
        "wiki": 0.7,
        "forum": 0.5,
        "blog": 0.6
    }
    return priorities.get(source_type, 0.5)

def rerank_with_freshness(
    results: list[SearchResult],
    relevance_weight: float = 0.6,
    freshness_weight: float = 0.3,
    source_weight: float = 0.1
) -> list[SearchResult]:
    """Rerank results considering relevance, freshness, and source."""
    
    for result in results:
        # Normalize relevance score (assume 0-1 range)
        relevance = result.relevance_score
        
        # Compute freshness
        freshness = compute_freshness_score(result.published_at)
        
        # Compute source priority
        source_priority = compute_source_priority(result.source_type)
        
        # Combined score
        combined_score = (
            relevance_weight * relevance +
            freshness_weight * freshness +
            source_weight * source_priority
        )
        
        result.combined_score = combined_score
    
    # Sort by combined score
    return sorted(results, key=lambda x: x.combined_score, reverse=True)

This reranking boosts fresh, relevant, authoritative sources. It demotes stale, low-priority sources.
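
Here's how the retrieval steps fit together. A sketch using the functions above; embed_query stands in for whatever embedding call you use:

def retrieve_fresh_context(
    query: str,
    vector_index: VectorIndex,
    keyword_index: KeywordIndex,
    top_k: int = 5
) -> list[SearchResult]:
    """Rewrite, search, then rerank with freshness and source priority."""
    rewritten = rewrite_query(query)
    query_embedding = embed_query(rewritten)  # assumed embedding helper

    # Over-fetch candidates, then rerank with freshness and source priority
    candidates = hybrid_search(
        rewritten, query_embedding, vector_index, keyword_index, top_k=top_k * 3
    )
    reranked = rerank_with_freshness(candidates)
    return reranked[:top_k]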

Answering Rules That Reduce “RAG Hallucinations”

When context is weak, refuse. When claims need citations, require them. When quoting, keep it short.

Require Citations for Key Claims

For important claims, require citations. Don’t let the model make unsupported statements.

def generate_with_citations(
    query: str,
    context: list[str],
    require_citations: bool = True
) -> str:
    """Generate answer with required citations."""
    
    prompt = f"""Answer the following question using only the provided context.
    
Context:
{chr(10).join(f"[{i+1}] {ctx}" for i, ctx in enumerate(context))}

Question: {query}

{"IMPORTANT: You must cite sources using [1], [2], etc. for all factual claims." if require_citations else ""}

Answer:"""
    
    response = llm.generate(prompt)
    
    # Validate citations if required
    if require_citations:
        citations = extract_citations(response)
        if not citations:
            return "I don't have enough information in the provided sources to answer this question."
    
    return response

Citations make answers verifiable. They let users check sources. They reduce hallucinations.
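
The extract_citations helper isn't shown above. A minimal version, assuming the [1]-style markers from the prompt:

import re

def extract_citations(response: str) -> list[int]:
    """Pull [1]-style citation markers out of a generated answer."""
    return [int(m) for m in re.findall(r"\[(\d+)\]", response)]

# Example
extract_citations("Refunds are allowed within 60 days [1], processed in 5-7 days [2].")
# [1, 2]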

Refuse When Context Is Weak (“I Don’t Have Enough in the Provided Sources”)

When context is weak, refuse. Don’t guess. Don’t hallucinate.

def should_refuse(context: list[SearchResult], min_relevance: float = 0.7) -> bool:
    """Decide if we should refuse to answer."""
    
    if not context:
        return True
    
    # Check if any retrieved chunk is highly relevant
    max_relevance = max(chunk.relevance_score for chunk in context)
    
    if max_relevance < min_relevance:
        return True
    
    return False

def safe_answer(query: str, context: list[SearchResult]) -> str:
    """Answer only if context is strong enough."""
    
    if should_refuse(context):
        return "I don't have enough information in the provided sources to answer this question accurately."
    
    return generate_with_citations(query, [chunk.content for chunk in context])

Refusing is better than guessing. It’s honest. It builds trust.

Quote Small Snippets When Needed (Short, Not Walls of Text)

When quoting, keep it short. Quote the relevant part. Don’t quote entire paragraphs.

def extract_relevant_snippet(text: str, query: str, max_length: int = 200) -> str:
    """Extract short relevant snippet from text."""
    
    # Find sentences containing query terms
    sentences = text.split('.')
    query_terms = set(query.lower().split())
    
    relevant_sentences = []
    for sentence in sentences:
        sentence_lower = sentence.lower()
        if any(term in sentence_lower for term in query_terms):
            relevant_sentences.append(sentence.strip())
    
    # Combine until we hit max_length
    snippet = ""
    for sentence in relevant_sentences:
        if len(snippet) + len(sentence) > max_length:
            break
        snippet += sentence + ". "
    
    return snippet.strip()

# Example
text = "Our refund policy allows returns within 60 days of purchase. Customers must provide a receipt. Items must be in original condition. Refunds are processed within 5-7 business days."
query = "refund policy"
snippet = extract_relevant_snippet(text, query, max_length=100)
# "Our refund policy allows returns within 60 days of purchase."

Short quotes are readable. They focus on what matters. They don’t overwhelm.

Evaluation That Catches Staleness

Create a freshness test set. Include time-based questions. Measure citation accuracy and stale citation rate.

Create a “Freshness Test Set”

Build test cases that check freshness. Include questions that depend on recent updates.

FRESHNESS_TEST_CASES = [
    {
        "query": "What is our refund policy as of January 2026?",
        "expected_doc_id": "refund-policy-v2",
        "expected_answer_contains": ["60 days"],
        "should_not_contain": ["30 days"],
        "published_after": "2026-01-01"
    },
    {
        "query": "What are the current shipping rates?",
        "expected_doc_id": "shipping-rates-v3",
        "expected_answer_contains": ["$5.99"],
        "should_not_contain": ["$4.99"],
        "published_after": "2025-12-15"
    }
]

Test cases check that answers use fresh docs, not stale ones.
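
A minimal harness for these cases. answer_question is a placeholder for your pipeline's entry point:

def run_freshness_tests(test_cases: list[dict]) -> dict:
    """Run freshness test cases and collect failures."""
    failures = []
    for case in test_cases:
        # Placeholder: assume it returns the answer text plus the doc IDs it cited
        answer, cited_doc_ids = answer_question(case["query"])

        stale_citation = case["expected_doc_id"] not in cited_doc_ids
        missing_expected = not all(s in answer for s in case["expected_answer_contains"])
        has_forbidden = any(s in answer for s in case["should_not_contain"])

        if stale_citation or missing_expected or has_forbidden:
            failures.append(case["query"])

    return {"total": len(test_cases), "failures": failures}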

Include Time-Based Questions Like “As of This Month…”

Time-based questions test freshness explicitly.

def is_time_based_query(query: str) -> bool:
    """Check if query is time-sensitive."""
    time_indicators = [
        "as of", "current", "latest", "recent", "updated",
        "this month", "this year", "now", "today"
    ]
    query_lower = query.lower()
    return any(indicator in query_lower for indicator in time_indicators)

def handle_time_based_query(query: str, results: list[SearchResult]) -> list[SearchResult]:
    """Boost freshness for time-based queries."""
    if is_time_based_query(query):
        # Increase freshness weight
        return rerank_with_freshness(
            results,
            relevance_weight=0.4,
            freshness_weight=0.5,  # Higher freshness weight
            source_weight=0.1
        )
    return results

Time-based queries need fresher results. Boost freshness for them.

Metrics: Citation Accuracy, Stale Citation Rate, “Answer Changes When Docs Change” Sanity Checks

Track metrics that matter for freshness.

class FreshnessMetrics:
    def __init__(self):
        self.total_queries = 0
        self.correct_citations = 0
        self.stale_citations = 0
        self.answer_changes = 0
    
    def record_citation(self, doc_id: str, expected_doc_id: str, is_stale: bool):
        """Record citation accuracy."""
        self.total_queries += 1
        
        if doc_id == expected_doc_id:
            self.correct_citations += 1
        elif is_stale:
            self.stale_citations += 1
    
    def record_answer_change(self, old_answer: str, new_answer: str):
        """Record when answer changes after doc update."""
        if old_answer != new_answer:
            self.answer_changes += 1
    
    def get_report(self) -> dict:
        """Get metrics report."""
        return {
            "total_queries": self.total_queries,
            "citation_accuracy": self.correct_citations / self.total_queries if self.total_queries > 0 else 0,
            "stale_citation_rate": self.stale_citations / self.total_queries if self.total_queries > 0 else 0,
            "answer_change_rate": self.answer_changes / self.total_queries if self.total_queries > 0 else 0
        }

Track these metrics. Alert when stale citation rate is high. Alert when answers don’t change after doc updates.
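
A small check on top of FreshnessMetrics. The thresholds are illustrative; alert is the same assumed notification helper used in the monitoring section below:

def check_freshness_metrics(metrics: FreshnessMetrics):
    """Alert when freshness metrics cross illustrative thresholds."""
    report = metrics.get_report()

    if report["stale_citation_rate"] > 0.05:   # more than 5% stale citations
        alert("Stale citation rate too high", **report)

    if report["citation_accuracy"] < 0.9:      # below 90% correct citations
        alert("Citation accuracy dropping", **report)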

Operational Checklist

Run re-ingestion regularly. Have a backfill strategy. Monitor retrieval drift and stale hits.

Re-Ingestion Cadence (Hourly/Daily)

Set up regular re-ingestion. Hourly for policy docs. Daily for product catalogs. Weekly for documentation.

import schedule  # third-party "schedule" library

REINGESTION_SCHEDULE = {
    "policy": "hourly",
    "product": "daily",
    "documentation": "weekly",
    "historical": "never"
}

def schedule_reingestion():
    """Schedule re-ingestion based on document type."""
    for doc_type, frequency in REINGESTION_SCHEDULE.items():
        if frequency == "hourly":
            schedule.every().hour.do(reingest_documents, doc_type)
        elif frequency == "daily":
            schedule.every().day.do(reingest_documents, doc_type)
        elif frequency == "weekly":
            schedule.every().week.do(reingest_documents, doc_type)

Automate re-ingestion. Don’t rely on manual runs.
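
The schedule library needs a loop that runs pending jobs. A minimal runner:

import time
import schedule

def run_scheduler():
    """Register re-ingestion jobs, then run them forever."""
    schedule_reingestion()
    while True:
        schedule.run_pending()
        time.sleep(60)  # check once a minute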

Backfill Strategy

When you add freshness tracking, backfill metadata for existing docs.

def backfill_metadata():
    """Backfill metadata for existing documents."""
    all_docs = get_all_documents()
    
    for doc in all_docs:
        if not has_metadata(doc.id):
            # Infer published_at from file modification time
            published_at = get_file_mtime(doc.path)
            
            # Create metadata (DocumentMetadata defaults to version 1)
            metadata = DocumentMetadata(
                doc_id=doc.id,
                content=doc.content,
                published_at=published_at
            )
            
            save_metadata(metadata)

Backfill lets you track freshness for existing content.

Monitoring: Retrieval Drift, Top Retrieved Doc IDs, Stale-Hit Alerts

Monitor what’s being retrieved. Alert on stale hits.

def monitor_retrieval(query: str, results: list[SearchResult]):
    """Monitor retrieval for freshness issues."""
    
    # Check for stale results
    stale_threshold = timedelta(days=30)
    now = datetime.now()
    
    stale_count = 0
    for result in results:
        age = now - result.published_at
        if age > stale_threshold:
            stale_count += 1
    
    # Alert if too many stale results
    if stale_count > len(results) * 0.3:  # More than 30% stale
        alert(
            f"High stale result rate for query: {query}",
            stale_count=stale_count,
            total_results=len(results)
        )
    
    # Log top retrieved doc IDs
    top_doc_ids = [r.doc_id for r in results[:5]]
    log_retrieval(query, top_doc_ids)

Monitoring catches issues early. Alerts let you fix problems before users notice.

Code Samples

The code repository includes three runnable examples:

  1. Incremental Ingestion: Crawl documents, compute content hashes, only re-embed when hash changes, write metadata
  2. Freshness-Aware Retrieval: Retrieve top N by relevance, compute freshness score, combine scores, return top K with citations
  3. Evaluation Harness: JSONL dataset of questions with expected source doc IDs, measure stale citation rate and top-k hit rate, print report

See the GitHub repository for complete, runnable code.

Summary

RAG systems fail in production when content changes. The answer looks good, but it’s wrong because the context is stale.

Fix this by:

  1. Using incremental indexing (only re-embed changed chunks)
  2. Tracking content hashes and last-modified timestamps
  3. Using tombstones for deleted content
  4. Versioning chunks with metadata
  5. Reranking with freshness scores
  6. Requiring citations for key claims
  7. Refusing when context is weak
  8. Evaluating with freshness test sets
  9. Monitoring stale citation rates

Freshness-aware retrieval keeps RAG accurate over time. It’s not just about relevance. It’s about relevance and freshness together.

Start with incremental indexing. Add freshness scoring. Monitor stale hits. Your users will notice the difference.
