
Storage & Indexing - Theory and Practice

Understanding document storage patterns and indexing pipelines


The Document Storage Problem

Challenge: Where to Store What?

When building a document search system, you have different types of data with different needs:

| Data Type       | Size           | Access Pattern       | Search Needs      |
|-----------------|----------------|----------------------|-------------------|
| Raw Document    | 1-100 MB       | Rare (download only) | None              |
| Extracted Text  | 10-1000 KB     | Never directly       | Full-text search  |
| Document Chunks | 500-1000 chars | Frequent (search)    | Vector + keyword  |
| Metadata        | 1-10 KB        | Very frequent        | Filtering, facets |

The Mistake: Using one storage system for everything.

The Solution: Match storage to data characteristics.


Storage Architecture Patterns

Pattern 1: Layered Storage

┌─────────────────────────────────────────────────┐
│ Application Layer │
└──────────────┬──────────────────────────────────┘

┌──────────────┴──────────────────────────────────┐
│ Storage Layer (3 systems for 3 purposes) │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ S3 │ │OpenSearch│ │ MongoDB │ │
│ │ │ │ │ │ │ │
│ │ Raw Docs │ │ Vectors │ │ Metadata │ │
│ │ 100MB │ │ Chunks │ │ 10KB │ │
│ │ Immutable│ │ Searchable│ │ Mutable │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────┘

Why Three Systems?

S3 for Raw Documents:

  • Documents are large (1-100MB)
  • Documents are immutable (never change)
  • Access is rare (only for download)
  • Need original format (PDF, not just text)
  • Cost: $0.023/GB/month (cheap!)

OpenSearch for Searchable Content:

  • Chunks are small (500-1000 chars)
  • Need full-text search (BM25)
  • Need vector search (k-NN)
  • Frequent access (every search)
  • Cost: $100-500/month (optimized for search)

MongoDB for Metadata:

  • Small structured data (document info)
  • Frequent updates (status, stats)
  • Complex queries (filters, aggregations)
  • Relationships (similar docs)
  • Cost: $50-200/month (optimized for metadata)

Total Cost: $150-723/month for thousands of documents
Single System: would be either expensive or slow
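
To make the layering concrete, here is a minimal sketch of how the three clients might be wired together, assuming boto3, opensearch-py, and pymongo; the bucket, host, and database names are placeholders, not values from this project.

import boto3
from opensearchpy import OpenSearch
from pymongo import MongoClient

# S3: large, immutable raw documents (cheap, durable)
s3 = boto3.client("s3", region_name="us-west-2")

# OpenSearch: small, frequently searched chunks (BM25 + k-NN)
opensearch = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

# MongoDB: small, frequently updated metadata (filters, aggregations)
mongo_client = MongoClient("mongodb://localhost:27017")
metadata_collection = mongo_client["docsearch"]["documents"]

Each client is touched by a different part of the pipeline, which is what makes the per-system cost and performance tradeoffs possible.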

Pattern 2: Write-Through Cache

Concept: Write to multiple systems atomically.

def store_document(doc):
    """Store in all three systems."""

    # 1. Write to S3 (source of truth)
    s3_key = upload_to_s3(doc.file_bytes, doc.id)

    # 2. Write to MongoDB (metadata)
    mongodb.insert({
        "doc_id": doc.id,
        "s3_key": s3_key,
        "filename": doc.filename,
        "status": "indexing",
        ...
    })

    # 3. Write to OpenSearch (searchable)
    chunks = chunk_document(doc.content)
    for chunk in chunks:
        opensearch.index({
            "chunk_id": chunk.id,
            "doc_id": doc.id,
            "content": chunk.content,
            "embedding": get_embedding(chunk.content)
        })

    # 4. Update status
    mongodb.update({"doc_id": doc.id}, {"status": "indexed"})

Benefits:

  • Consistency across systems
  • Atomic operations
  • Clear success/failure

Challenges:

  • What if step 2 fails after step 1 succeeds?
  • How to handle partial failures?

Solution: Idempotent operations + retry

def store_document_with_retry(doc):
    """Store with retry logic."""

    # Step 1: S3 (idempotent - can retry safely)
    s3_key = upload_to_s3_with_retry(doc)

    # Step 2: MongoDB (idempotent - upsert)
    mongodb.upsert({"doc_id": doc.id}, metadata)

    # Step 3: OpenSearch (idempotent - index overwrites)
    for chunk in chunks:
        opensearch.index(chunk)  # Same ID = overwrite

    # If any step fails, just retry the whole function.
    # All operations are idempotent!
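
As an illustration of the idempotent-retry idea, here is a minimal sketch of what an upload helper like upload_to_s3_with_retry could look like, assuming boto3; the bucket name, key layout, and backoff values are assumptions, not the project's actual implementation.

import time
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")

def upload_to_s3_with_retry(doc, bucket="my-docs", max_attempts=3):
    # Key derived from the document ID: retries overwrite the same object (idempotent)
    key = f"documents/{doc.id}/{doc.filename}"
    for attempt in range(1, max_attempts + 1):
        try:
            s3.put_object(Bucket=bucket, Key=key, Body=doc.file_bytes)
            return key
        except ClientError:
            if attempt == max_attempts:
                raise
            time.sleep(2 ** attempt)  # exponential backoff: 2s, 4s, ...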

S3 Storage Deep Dive

Why S3 (or S3-Compatible)?

S3 Durability: 99.999999999% (11 nines)

What This Means: If you store 10,000,000 files, you can expect to lose 1 file every 10,000 years.

How It Works:

  • Data replicated across 3+ availability zones
  • Checksum verification on every read/write
  • Automatic error detection and healing

vs Local Storage:

  • HDD failure rate: ~1-5% per year
  • Files on a single drive share its fate: one drive failure can lose all of them at once
  • 11 nines of durability is orders of magnitude more reliable

S3 Lifecycle Policies

Concept: Automatically transition data between storage classes.

Document uploaded
↓
Hot storage (S3 Standard) - $0.023/GB/month
↓ After 30 days
Warm storage (S3 Standard-IA) - $0.0125/GB/month
↓ After 90 days
Cold storage (S3 Glacier) - $0.004/GB/month
↓ After 7 years (retention period)
Deleted

Cost Savings:

  • Year 1: $0.023/GB/month (Standard)
  • Years 2-7: $0.004/GB/month (Glacier)
  • Average over 7 years: ~$0.007/GB/month (roughly 70% savings)

Implementation:

{
  "Rules": [{
    "Id": "Archive old documents",
    "Status": "Enabled",
    "Transitions": [
      { "Days": 30, "StorageClass": "STANDARD_IA" },
      { "Days": 90, "StorageClass": "GLACIER" }
    ],
    "Expiration": { "Days": 2555 }  // 7 years
  }]
}
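
If the bucket is managed from code rather than the console, the same rule can be applied with boto3's put_bucket_lifecycle_configuration. A sketch, assuming a placeholder bucket name and scoping the rule to the documents/ prefix:

import boto3

s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="my-docs",
    LifecycleConfiguration={
        "Rules": [{
            "ID": "Archive old documents",
            "Status": "Enabled",
            "Filter": {"Prefix": "documents/"},  # only archive the documents/ prefix
            "Transitions": [
                {"Days": 30, "StorageClass": "STANDARD_IA"},
                {"Days": 90, "StorageClass": "GLACIER"},
            ],
            "Expiration": {"Days": 2555},  # ~7 years
        }]
    },
)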

S3 Security

Server-Side Encryption (SSE):

# Encrypt at rest automatically
s3_client.put_object(
    Bucket='my-docs',
    Key='sensitive.pdf',
    Body=file_bytes,
    ServerSideEncryption='AES256'  # ← Transparent encryption
)

# Reading is transparent too
file_bytes = s3_client.get_object(
    Bucket='my-docs',
    Key='sensitive.pdf'
)['Body'].read()  # ← Automatically decrypted

Access Control:

{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {"AWS": "arn:aws:iam::ACCOUNT:role/app-role"},
    "Action": ["s3:GetObject", "s3:PutObject"],
    "Resource": "arn:aws:s3:::my-docs/documents/*",
    "Condition": {
      "StringEquals": {"s3:x-amz-server-side-encryption": "AES256"}
    }
  }]
}

Key Points:

  • Only specific role can access
  • Must use encryption
  • Limited to documents/ prefix
  • Can add IP restrictions, MFA, etc.

Indexing Pipeline Theory

What is an Indexing Pipeline?

Definition: A series of transformations from raw document to searchable representation.

The Transformation:

Raw PDF (2MB) →→→ Searchable Chunks

[Multiple stages - this is where the "magic" happens]

Query: "revenue" → Results in 50ms

The question: how does a 2MB PDF become searchable in 50ms?

Answer: Pre-processing (indexing) does the heavy work once, so every search after that is fast.

Index-Time vs Query-Time Tradeoff

Fundamental Tradeoff in Search:

┌─────────────────────────────────────┐
│ Index-Time (Do Once) │
│ - Parse document │
│ - Extract text │
│ - Chunk into pieces │
│ - Generate embeddings │
│ - Build BM25 index │
│ - Store in optimized format │
│ Time: Seconds to minutes │
│ Cost: Higher (GPU for embeddings) │
└─────────────────────────────────────┘

┌─────────────────────────────────────┐
│ Query-Time (Every Search) │
│ - Embed query │
│ - Search BM25 index │
│ - Search vector index │
│ - Fuse results │
│ Time: Milliseconds │
│ Cost: Lower (just lookups) │
└─────────────────────────────────────┘

Why This Works:

  • Index once (expensive), search many times (cheap)
  • Pre-computation amortizes cost
  • Similar to: compile once, run many times

Chunking Theory

Why Chunk Documents?

Problem: Documents are long, embeddings are fixed-size.

Document: 10,000 words
Embedding model: Max 512 tokens (~380 words)

Options:

  1. Truncate (❌ Bad)

    Only embed first 380 words
    Lose 96% of document!
  2. Chunk (✅ Good)

    Split into 27 chunks of 380 words
    Embed each chunk separately
    Search across all chunks

Chunking Strategies:

Fixed-Size (Simple):

chunks = [text[i:i+500] for i in range(0, len(text), 500)]
  • Pro: Simple, predictable
  • Con: May split mid-sentence

Semantic (Better):

# Split on paragraph boundaries, sentences, etc.
# Keep chunks between 400-600 words
# Preserve semantic units
  • Pro: Preserves meaning
  • Con: Slightly complex

Our Choice: Use existing SemanticChunker (battle-tested)
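
To make the strategy concrete without reproducing SemanticChunker, here is a minimal sketch of paragraph-boundary chunking: split on blank lines and pack paragraphs up to a size budget. The function name and size limit are illustrative, not the real implementation.

def chunk_by_paragraphs(text: str, max_chars: int = 2500) -> list[str]:
    """Pack whole paragraphs into chunks so no chunk splits mid-sentence."""
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks, current = [], ""
    for para in paragraphs:
        if current and len(current) + len(para) > max_chars:
            chunks.append(current)  # close the chunk at a paragraph boundary
            current = para
        else:
            current = f"{current}\n\n{para}" if current else para
    if current:
        chunks.append(current)
    return chunks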

Overlap Theory

Concept: Chunks share some content.

Document: "A B C D E F G H I J"

Without overlap:
Chunk 1: [A B C D]
Chunk 2: [E F G H]
Chunk 3: [I J]

With 50% overlap:
Chunk 1: [A B C D]
Chunk 2: [ C D E F]
Chunk 3: [ E F G H]
Chunk 4: [ G H I J]

Why Overlap?

Problem: Important information spans chunk boundary.

Without overlap:
Chunk 1: "...The CEO announced"
Chunk 2: "record revenue of $1B..."

Query: "CEO revenue announcement"
Might miss! (no chunk has all three concepts)

With overlap:
Chunk 1: "...The CEO announced"
Chunk 2: "...The CEO announced record revenue of $1B..."
← Overlap preserves context!

Query matches Chunk 2 perfectly!

Tradeoff:

  • More overlap = Better context, More chunks to store
  • Less overlap = Less storage, May miss context
  • Sweet spot: 20-25% overlap (100-150 words)
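
A minimal sketch of word-level chunking with overlap, in line with the 20-25% sweet spot above; the sizes are illustrative, and the real chunker handles boundaries more carefully.

def chunk_with_overlap(text: str, chunk_words: int = 500, overlap_words: int = 100) -> list[str]:
    """Sliding window over words: each chunk repeats the last `overlap_words` of the previous one."""
    words = text.split()
    step = chunk_words - overlap_words  # e.g. 500 - 100 = 400 new words per chunk
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_words]))
        if start + chunk_words >= len(words):  # the last window reached the end
            break
    return chunks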

Implementation Details

S3 Integration

See: packages/rag/document_search/s3_storage.py (~200 lines)

Key Features:

  • Upload/download documents
  • Pre-signed URLs (time-limited access)
  • Metadata management
  • Lifecycle configuration
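
For example, the pre-signed URL feature listed above is typically a thin wrapper over boto3's generate_presigned_url; a sketch with placeholder arguments (not the module's actual interface):

import boto3

s3 = boto3.client("s3")

def get_download_url(bucket: str, key: str, expires_seconds: int = 3600) -> str:
    """Return a time-limited download link for a stored document."""
    return s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_seconds,  # the URL stops working after this many seconds
    )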

Indexing Pipeline

See: packages/rag/document_search/indexing.py (~300 lines)

Stages:

  1. Load document (uses document_loader.py)
  2. Upload to S3 (uses s3_storage.py)
  3. Chunk document (uses chunkers.py - REUSED)
  4. Generate embeddings (function passed in)
  5. Index in vector store (uses stores.py - REUSED)

Reuse Ratio: 60% of indexing logic is reused!


Performance Considerations

Indexing Performance

Bottlenecks:

  1. Document Parsing: PDF parsing can be slow

    • Optimization: Use pypdf (fast) not pdfminer (slow)
    • Our choice: pypdf ✅
  2. Embedding Generation: Calls to API or model inference

    • Optimization: Batch embeddings (10-100 at once)
    • Optimization: Cache embeddings for identical content
    • Our approach: Batch when possible
  3. Vector Store Insertion: Network + indexing overhead

    • Optimization: Bulk insert API
    • Our approach: Use existing add_documents() with bulk
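
The embedding cache mentioned under bottleneck 2 can be as simple as keying embeddings by a content hash, so identical chunks are only embedded once. A sketch with an in-memory dict (a real deployment might back this with Redis or a database); the helper names are illustrative.

import hashlib

_embedding_cache: dict[str, list[float]] = {}

def get_embedding_cached(content: str, embed_fn) -> list[float]:
    """Embed `content`, reusing a cached vector when the exact text was seen before."""
    key = hashlib.sha256(content.encode("utf-8")).hexdigest()
    if key not in _embedding_cache:
        _embedding_cache[key] = embed_fn(content)  # only pay for unseen content
    return _embedding_cache[key]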

Target Performance (per document):

  • PDF parsing: < 5s for 100 pages
  • Chunking: < 1s for 10,000 words
  • Embedding: < 2s for 20 chunks (batched)
  • Indexing: < 2s for 20 chunks (bulk)
  • Total: < 10s for typical document

Search Performance

Index Size Impact:

| Documents | Chunks  | Search Latency | Why                    |
|-----------|---------|----------------|------------------------|
| 100       | 2,000   | ~50ms          | Small, fits in RAM     |
| 10,000    | 200,000 | ~100ms         | Moderate, good indexes |
| 1,000,000 | 20M     | ~200-500ms     | Large, distributed     |

Optimization Strategies:

  1. Proper Indexing:

    OpenSearch k-NN parameters:
    - ef_construction: 512 (build quality)
    - ef_search: 512 (search quality)
    - m: 16 (connections per node)
  2. Sharding:

    1M documents across 3 shards
    = 333K per shard
    = Faster parallel search
  3. Caching:

    Hot queries cached in Redis
    40-60% hit rate typical
    5ms vs 500ms (100x faster)
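
As a concrete example of the k-NN parameters in item 1, here is a sketch of creating an OpenSearch index with opensearch-py; the host, index name, vector dimension, and engine choice are assumptions, not this project's configuration.

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

client.indices.create(
    index="kb_articles",
    body={
        "settings": {"index": {"knn": True, "knn.algo_param.ef_search": 512}},
        "mappings": {
            "properties": {
                "content": {"type": "text"},  # BM25 full-text field
                "embedding": {
                    "type": "knn_vector",
                    "dimension": 768,  # must match the embedding model's output size
                    "method": {
                        "name": "hnsw",
                        "space_type": "cosinesimil",
                        "engine": "nmslib",
                        "parameters": {"ef_construction": 512, "m": 16},
                    },
                },
            }
        },
    },
)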

Metadata Management

What Metadata to Track?

Document Level:

{
  "document_id": "uuid",
  "filename": "report.pdf",
  "format": "pdf",
  "size_bytes": 2048576,
  "s3_key": "documents/2025/10/uuid/report.pdf",
  "uploaded_at": "2025-10-09T12:00:00Z",
  "uploaded_by": "user123",
  "status": "indexed",

  "content_metadata": {
    "num_pages": 45,
    "title": "Q3 Financial Report",
    "author": "Finance Team",
    "created_date": "2025-09-30"
  },

  "processing": {
    "chunks_created": 67,
    "processing_time_ms": 8500,
    "indexed_at": "2025-10-09T12:00:10Z"
  },

  "search_stats": {
    "times_retrieved": 0,
    "last_accessed": null,
    "avg_relevance": 0
  }
}

Chunk Level (in OpenSearch):

{
  "chunk_id": "uuid_chunk_0",
  "document_id": "uuid",
  "content": "Q3 revenue increased by 25%...",
  "embedding": [0.23, -0.45, ...],

  "metadata": {
    "chunk_index": 0,
    "page_number": 5,
    "section": "Executive Summary",
    "document_title": "Q3 Financial Report"
  }
}

Why Split?:

  • Document metadata changes rarely → MongoDB
  • Chunks searched frequently → OpenSearch
  • Clear separation of concerns
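
For instance, the mutable search_stats block lives in MongoDB and can be bumped with a single update after each retrieval. A sketch using pymongo, with a placeholder connection URI and the field names from the document above:

from datetime import datetime, timezone
from pymongo import MongoClient

documents = MongoClient("mongodb://localhost:27017")["docsearch"]["documents"]

def record_retrieval(document_id: str) -> None:
    """Increment retrieval count and stamp last access for one document."""
    documents.update_one(
        {"document_id": document_id},
        {
            "$inc": {"search_stats.times_retrieved": 1},
            "$set": {"search_stats.last_accessed": datetime.now(timezone.utc).isoformat()},
        },
    )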

Indexing Pipeline Patterns

Pattern: Pipeline with Progress Tracking

Why: Users want to know "How much longer?"

class IndexingProgress:
    """Track indexing progress."""

    stages = ["loading", "chunking", "embedding", "indexing"]

    def __init__(self, doc_id):
        self.doc_id = doc_id
        self.current_stage = 0
        self.total_stages = len(self.stages)
        self.progress_pct = 0

    def update(self, stage_name, pct_complete=None):
        self.current_stage = self.stages.index(stage_name)
        if pct_complete is not None:  # 0.0 is a valid "just started" value
            # Within-stage progress
            stage_progress = (self.current_stage + pct_complete) / self.total_stages
            self.progress_pct = stage_progress * 100
        else:
            # Stage complete
            self.progress_pct = (self.current_stage + 1) / self.total_stages * 100

        # Emit event for UI
        emit_progress(self.doc_id, self.progress_pct, self.stages[self.current_stage])

# Usage
progress = IndexingProgress(doc.id)

progress.update("loading")   # 25% complete
doc = load_document(file_path)

progress.update("chunking")  # 50% complete
chunks = chunker.chunk(doc.content)

progress.update("embedding", 0.0)  # 50% complete
for i, chunk in enumerate(chunks):
    embedding = get_embedding(chunk)
    progress.update("embedding", (i + 1) / len(chunks))  # 50-75% complete

progress.update("indexing", 0.0)  # 75% complete
store.index(chunks)

progress.update("indexing")  # 100% complete - all stages done

Benefits:

  • Real-time progress for users
  • Debugging (which stage failed?)
  • UX improvement (no black box)

Pattern: Batch Processing

Concept: Process multiple items together for efficiency.

Why Batching Helps:

Without Batching:

# 100 chunks → 100 API calls
for chunk in chunks:  # 100 iterations
    embedding = call_api(chunk.content)  # 100 network round-trips
# Latency: 100 × 50ms = 5000ms

With Batching:

# 100 chunks → 1 API call
embeddings = call_api_batch([c.content for c in chunks]) # 1 network round-trip
# Latency: 1 × 200ms = 200ms (25x faster!)

Batch Size Tradeoffs:

| Batch Size | Latency           | Memory | Failure Impact     |
|------------|-------------------|--------|--------------------|
| 1          | High (many calls) | Low    | Small (1 chunk)    |
| 10         | Medium            | Medium | Medium (10 chunks) |
| 100        | Low (1 call)      | High   | Large (100 chunks) |

Sweet Spot: 10-50 items per batch

Implementation in Our Pipeline:

# Batch embeddings
indexed_chunks = []
batch_size = 20

for i in range(0, len(chunks), batch_size):
    batch = chunks[i:i + batch_size]

    # Single API call for batch
    batch_contents = [c.content for c in batch]
    batch_embeddings = embedding_function_batch(batch_contents)

    # Combine
    for chunk, embedding in zip(batch, batch_embeddings):
        indexed_chunks.append({
            'chunk_id': chunk.chunk_id,
            'content': chunk.content,
            'embedding': embedding,
            'metadata': chunk.metadata
        })

# Bulk insert to vector store (REUSED method)
store.add_documents(indexed_chunks)  # Single bulk operation

Reusing Existing Components - Deep Dive

Integration Point 1: Chunker

Existing Component: packages/rag/chunkers.py

What It Provides:

class SemanticChunker:
    """Chunks by semantic boundaries."""

    def chunk(self, content: str, metadata: Dict) -> List[Chunk]:
        # Returns Chunk objects with:
        # - content (str)
        # - metadata (dict)
        # - chunk_id (str)
        # - source (str)
        # - start_char, end_char (int)
        ...

How We Use It:

# In our indexing pipeline
from packages.rag.chunkers import SemanticChunker

chunker = SemanticChunker(chunk_size=500, chunk_overlap=100)
chunks = chunker.chunk(doc.content, doc.metadata)

# We get Chunk objects ready for embedding!
# No need to implement chunking logic

What We Reuse:

  • Semantic boundary detection
  • Overlap management
  • Metadata preservation
  • Chunk ID generation

What We Add: None (just use it)

Integration Point 2: VectorStore

Existing Component: packages/rag/stores.py

Interface:

from abc import ABC, abstractmethod
from typing import Dict, List

class VectorStore(ABC):
    @abstractmethod
    def add_documents(self, documents: List[VectorDocument]) -> bool:
        pass

    @abstractmethod
    def search(self, query_embedding: List[float], k: int) -> List[Dict]:
        pass

    @abstractmethod
    def delete_documents(self, document_ids: List[str]) -> bool:
        pass

Implementations Available:

  • OpenSearchStore - k-NN + BM25
  • MongoDBStore - Atlas Vector Search
  • AzureAISearchStore - Azure AI Search

How We Use It:

# In our indexing pipeline
from packages.rag.stores import VectorDocument

# Convert to VectorDocument format
vector_docs = [
    VectorDocument(
        id=chunk['chunk_id'],
        content=chunk['content'],
        embedding=chunk['embedding'],
        metadata=chunk['metadata']
    )
    for chunk in indexed_chunks
]

# Use existing bulk insert
success = store.add_documents(vector_docs) # ← Existing method!

What We Reuse:

  • Bulk insertion logic
  • Connection pooling
  • Error handling
  • Retry mechanisms

What We Add: Orchestration (calling it correctly)

Integration Point 3: HybridRetriever

Existing Component: packages/rag/retrievers.py

What It Does:

class HybridRetriever:
    """BM25 + Vector with RRF fusion."""

    def __init__(self, vector_retriever, bm25_retriever, alpha=0.5):
        self.vector_retriever = vector_retriever
        self.bm25_retriever = bm25_retriever
        self.alpha = alpha

    def retrieve(self, query: str, k: int) -> List[RetrievalResult]:
        # 1. BM25 search
        bm25_results = self.bm25_retriever.retrieve(query, k)

        # 2. Vector search
        vector_results = self.vector_retriever.retrieve(query, k)

        # 3. RRF fusion
        fused = self._reciprocal_rank_fusion(bm25_results, vector_results)

        return fused[:k]

How We Use It:

# In our pipeline factory
vector_retriever = VectorRetriever(vector_store)
bm25_retriever = BM25Retriever(k1=1.2, b=0.75)
bm25_retriever.add_chunks(chunks)

hybrid = HybridRetriever(
    vector_retriever=vector_retriever,
    bm25_retriever=bm25_retriever,
    alpha=0.5  # Profile-specific
)

# Our pipeline just orchestrates
pipeline = DocumentSearchPipeline(
    retriever=hybrid,  # ← Existing component
    ...
)

What We Reuse (ALL of it):

  • BM25 implementation
  • Vector search logic
  • RRF fusion algorithm
  • Score normalization
  • Result deduplication

What We Add: Profile-based alpha configuration
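
What "profile-based alpha configuration" might look like in code: a small mapping from search profile to fusion weight, passed through to the existing HybridRetriever. The profile names and values here are illustrative, not the project's defaults.

from packages.rag.retrievers import HybridRetriever  # existing component

# Hypothetical profiles: lower alpha favors BM25, higher alpha favors vector search
SEARCH_PROFILES = {
    "keyword_heavy": 0.3,
    "balanced": 0.5,
    "semantic": 0.7,
}

def build_hybrid_retriever(vector_retriever, bm25_retriever, profile: str = "balanced"):
    alpha = SEARCH_PROFILES.get(profile, 0.5)  # fall back to a balanced blend
    return HybridRetriever(
        vector_retriever=vector_retriever,
        bm25_retriever=bm25_retriever,
        alpha=alpha,
    )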


Real-World Example: Knowledge Base Indexing

Scenario

Customer support team has 500 KB articles (PDF format).

Requirements

  • Index all 500 articles
  • Full-text search across all
  • Semantic search for concepts
  • < 1 hour total indexing time
  • < 100ms search latency

Implementation

# 1. Initialize (using existing components)
from packages.rag.stores import OpenSearchStore
from packages.rag.document_search import (
    DocumentIndexingPipeline,
    S3DocumentStorage,
    create_s3_storage
)

# Existing vector store
store = OpenSearchStore(
    host="opensearch.prod.company.com",
    index_name="kb_articles"
)

# NEW S3 storage
s3_storage = create_s3_storage(
    bucket_name="company-kb-articles",
    region="us-west-2"
)

# NEW indexing pipeline (orchestrates existing components)
pipeline = DocumentIndexingPipeline(
    store=store,                  # ← REUSED
    embedding_function=embed_fn,  # ← REUSED
    chunker=None,                 # ← Will use existing SemanticChunker
    s3_storage=s3_storage         # ← NEW
)

# 2. Index all articles
import glob

pdf_files = glob.glob("kb_articles/*.pdf")
results = pipeline.batch_index_documents(pdf_files)

# 3. Check results
stats = pipeline.get_indexing_stats(results)
print(f"Indexed: {stats['successful']}/{stats['total_documents']}")
print(f"Chunks: {stats['total_chunks_indexed']}")
print(f"Avg time: {stats['avg_processing_time_ms']:.2f}ms")

# Expected output:
# Indexed: 500/500
# Chunks: 8,435
# Avg time: 6,200ms
# Total time: ~52 minutes (well under 1 hour!)

Results

  • ✅ All 500 articles indexed
  • ✅ 8,435 searchable chunks created
  • ✅ Search latency: 85ms average (< 100ms target)
  • ✅ Storage cost: well under $1/month (500 × 2 MB ≈ 1 GB in S3 Standard)

Summary

Key Concepts

Storage Patterns:

  • Use S3 for raw documents (cheap, durable)
  • Use Vector Store for search (fast, optimized)
  • Use MongoDB for metadata (flexible, queryable)

Indexing Theory:

  • Index once (expensive), search many (cheap)
  • Chunking preserves semantic units
  • Overlap prevents context loss
  • Batching improves efficiency

Reuse Strategy:

  • Survey before building (80% exists)
  • Compose existing components
  • Add only unique functionality (grounded summarization)
  • Benefit from existing improvements

Implementation Files:

  • S3 Storage: packages/rag/document_search/s3_storage.py
  • Indexing: packages/rag/document_search/indexing.py
  • Reused Components: packages/rag/{stores,chunkers,retrievers}.py

Next: Grounded Summarization - Our unique contribution