Storage & Indexing - Theory and Practice
Understanding document storage patterns and indexing pipelines
The Document Storage Problem
Challenge: Where to Store What?
When building a document search system, you have different types of data with different needs:
Data Type | Size | Access Pattern | Search Needs |
---|---|---|---|
Raw Document | 1-100MB | Rare (download only) | None |
Extracted Text | 10-1000KB | Never directly | Full-text search |
Document Chunks | 500-1000 chars | Frequent (search) | Vector + keyword |
Metadata | 1-10KB | Very frequent | Filtering, facets |
The Mistake: Using one storage system for everything.
The Solution: Match storage to data characteristics.
Storage Architecture Patterns
Pattern 1: Layered Storage
┌───────────────────────────────────────────────────┐
│                Application Layer                  │
└───────────────┬───────────────────────────────────┘
                │
┌───────────────┴───────────────────────────────────┐
│     Storage Layer (3 systems for 3 purposes)      │
│                                                   │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐       │
│  │    S3    │   │OpenSearch│   │ MongoDB  │       │
│  │          │   │          │   │          │       │
│  │ Raw Docs │   │ Vectors  │   │ Metadata │       │
│  │  100MB   │   │  Chunks  │   │   10KB   │       │
│  │ Immutable│   │Searchable│   │ Mutable  │       │
│  └──────────┘   └──────────┘   └──────────┘       │
└───────────────────────────────────────────────────┘
Why Three Systems?
S3 for Raw Documents:
- Documents are large (1-100MB)
- Documents are immutable (never change)
- Access is rare (only for download)
- Need original format (PDF, not just text)
- Cost: $0.023/GB/month (cheap!)
OpenSearch for Searchable Content:
- Chunks are small (500-1000 chars)
- Need full-text search (BM25)
- Need vector search (k-NN)
- Frequent access (every search)
- Cost: $100-500/month (optimized for search)
MongoDB for Metadata:
- Small structured data (document info)
- Frequent updates (status, stats)
- Complex queries (filters, aggregations)
- Relationships (similar docs)
- Cost: $50-200/month (optimized for metadata)
Total Cost: ~$150-723/month for thousands of documents
Single System: would be either expensive or slow
Pattern 2: Write-Through Cache
Concept: Write to multiple systems atomically.
def store_document(doc):
    """Store a document in all three systems."""
    # 1. Write to S3 (source of truth)
    s3_key = upload_to_s3(doc.file_bytes, doc.id)

    # 2. Write to MongoDB (metadata)
    mongodb.insert({
        "doc_id": doc.id,
        "s3_key": s3_key,
        "filename": doc.filename,
        "status": "indexing",
        # ... additional metadata fields
    })

    # 3. Write to OpenSearch (searchable chunks)
    chunks = chunk_document(doc.content)
    for chunk in chunks:
        opensearch.index({
            "chunk_id": chunk.id,
            "doc_id": doc.id,
            "content": chunk.content,
            "embedding": get_embedding(chunk.content)
        })

    # 4. Update status once everything is written
    mongodb.update({"doc_id": doc.id}, {"status": "indexed"})
Benefits:
- Consistency across systems
- Atomic operations
- Clear success/failure
Challenges:
- What if step 2 fails after step 1 succeeds?
- How to handle partial failures?
Solution: Idempotent operations + retry
def store_document_with_retry(doc):
    """Store with retry logic; every step is safe to repeat."""
    # Step 1: S3 (idempotent - can retry safely)
    s3_key = upload_to_s3_with_retry(doc)

    # Step 2: MongoDB (idempotent - upsert by doc_id)
    metadata = {"doc_id": doc.id, "s3_key": s3_key, "filename": doc.filename, "status": "indexing"}
    mongodb.upsert({"doc_id": doc.id}, metadata)

    # Step 3: OpenSearch (idempotent - indexing with the same ID overwrites)
    chunks = chunk_document(doc.content)
    for chunk in chunks:
        opensearch.index(chunk)  # Same ID = overwrite

    # If any step fails, just retry the whole function.
    # All operations are idempotent!
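The upload_to_s3_with_retry helper above is not defined in this section. A minimal sketch of what it could look like, assuming boto3 and simple exponential backoff (bucket name, key scheme, and retry count are illustrative):

import time

import boto3
from botocore.exceptions import ClientError

s3_client = boto3.client("s3")

def upload_to_s3_with_retry(doc, bucket="my-docs", max_attempts=3):
    """Idempotent upload: the same document always maps to the same key."""
    s3_key = f"documents/{doc.id}/{doc.filename}"
    for attempt in range(1, max_attempts + 1):
        try:
            s3_client.put_object(Bucket=bucket, Key=s3_key, Body=doc.file_bytes)
            return s3_key
        except ClientError:
            if attempt == max_attempts:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff: 2s, 4s, ...

Because the key is derived from the document ID, retrying after a partial failure simply overwrites the same object.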
S3 Storage Deep Dive
Why S3 (or S3-Compatible)?
S3 Durability: 99.999999999% (11 nines)
What This Means: If you store 10,000,000 files, you can expect to lose 1 file every 10,000 years.
How It Works:
- Data replicated across 3+ availability zones
- Checksum verification on every read/write
- Automatic error detection and healing
vs Local Storage:
- Typical HDD annual failure rate: ~1-5%
- A single unreplicated disk therefore has a 1-5% chance each year of losing everything stored on it
- S3's 11 nines of durability is many orders of magnitude more reliable
S3 Lifecycle Policies
Concept: Automatically transition data between storage classes.
Document uploaded
    ↓
Hot storage (S3 Standard) - $0.023/GB/month
    ↓ After 30 days
Warm storage (S3 IA) - $0.0125/GB/month
    ↓ After 90 days
Cold storage (Glacier) - $0.004/GB/month
    ↓ After 7 years (retention period)
Deleted
Cost Savings:
- Year 1: $0.023/GB/month (Standard)
- Years 2-7: $0.004/GB/month (Glacier)
- Average over 7 years: ~$0.007/GB/month (roughly 70% cheaper than keeping everything in Standard)
Implementation:
{
"Rules": [{
"Id": "Archive old documents",
"Status": "Enabled",
"Transitions": [
{
"Days": 30,
"StorageClass": "STANDARD_IA"
},
{
"Days": 90,
"StorageClass": "GLACIER"
}
],
"Expiration": {
"Days": 2555 // 7 years
}
}]
}
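The same rules can also be applied programmatically. A sketch using boto3's put_bucket_lifecycle_configuration (bucket name and prefix are illustrative):

import boto3

s3 = boto3.client("s3")

# Apply the lifecycle rules shown above to the bucket
s3.put_bucket_lifecycle_configuration(
    Bucket="my-docs",
    LifecycleConfiguration={
        "Rules": [{
            "ID": "Archive old documents",
            "Status": "Enabled",
            "Filter": {"Prefix": "documents/"},  # only applies to this prefix
            "Transitions": [
                {"Days": 30, "StorageClass": "STANDARD_IA"},
                {"Days": 90, "StorageClass": "GLACIER"},
            ],
            "Expiration": {"Days": 2555},  # ~7 years
        }]
    },
)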
S3 Security
Server-Side Encryption (SSE):
import boto3

s3_client = boto3.client("s3")

# Encrypt at rest automatically
s3_client.put_object(
    Bucket='my-docs',
    Key='sensitive.pdf',
    Body=file_bytes,
    ServerSideEncryption='AES256'  # ← Transparent encryption
)

# Reading is transparent too
file_bytes = s3_client.get_object(
    Bucket='my-docs',
    Key='sensitive.pdf'
)['Body'].read()  # ← Automatically decrypted
Access Control:
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::ACCOUNT:role/app-role"},
"Action": ["s3:GetObject", "s3:PutObject"],
"Resource": "arn:aws:s3:::my-docs/documents/*",
"Condition": {
"StringEquals": {"s3:x-amz-server-side-encryption": "AES256"}
}
}]
}
Key Points:
- Only specific role can access
- Must use encryption
- Limited to documents/ prefix
- Can add IP restrictions, MFA, etc.
Indexing Pipeline Theory
What is an Indexing Pipeline?
Definition: A series of transformations from raw document to searchable representation.
The Transformation:
Raw PDF (2MB) ──► Searchable Chunks
        ↓
  [Multiple Stages]
        ↓
   Magic Happens
        ↓
Query: "revenue" → Results in 50ms
Without a Pipeline: How does a 2MB PDF become searchable in 50ms?
Answer: Pre-processing (indexing) does the heavy work once, so every search afterwards is fast.
Index-Time vs Query-Time Tradeoff
Fundamental Tradeoff in Search:
┌───────────────────────────────────────┐
│ Index-Time (Do Once)                  │
│ - Parse document                      │
│ - Extract text                        │
│ - Chunk into pieces                   │
│ - Generate embeddings                 │
│ - Build BM25 index                    │
│ - Store in optimized format           │
│ Time: Seconds to minutes              │
│ Cost: Higher (GPU for embeddings)     │
└───────────────────────────────────────┘
                   ↓
┌───────────────────────────────────────┐
│ Query-Time (Every Search)             │
│ - Embed query                         │
│ - Search BM25 index                   │
│ - Search vector index                 │
│ - Fuse results                        │
│ Time: Milliseconds                    │
│ Cost: Lower (just lookups)            │
└───────────────────────────────────────┘
Why This Works:
- Index once (expensive), search many times (cheap)
- Pre-computation amortizes cost
- Similar to: compile once, run many times
Chunking Theory
Why Chunk Documents?
Problem: Documents are long, but embedding models accept only a fixed-size input.
Document: 10,000 words
Embedding model: Max 512 tokens (~380 words)
Options:
- Truncate (❌ Bad)
  - Only embed the first 380 words
  - Lose 96% of the document!
- Chunk (✅ Good)
  - Split into 27 chunks of ~380 words
  - Embed each chunk separately
  - Search across all chunks
Chunking Strategies:
Fixed-Size (Simple):
chunks = [text[i:i+500] for i in range(0, len(text), 500)]
- Pro: Simple, predictable
- Con: May split mid-sentence
Semantic (Better):
# Split on paragraph boundaries, sentences, etc.
# Keep chunks between 400-600 words
# Preserve semantic units
- Pro: Preserves meaning
- Con: Slightly complex
Our Choice: Use the existing SemanticChunker (battle-tested); a rough sketch of the idea follows.
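As an illustration only (this is not the actual SemanticChunker, whose interface appears later on this page), a paragraph-boundary chunker might look like:

def chunk_by_paragraphs(text: str, max_words: int = 500) -> list[str]:
    """Greedy paragraph packing: keep chunks under max_words without splitting paragraphs."""
    chunks, current, current_len = [], [], 0
    for paragraph in text.split("\n\n"):
        words = paragraph.split()
        # Start a new chunk when the next paragraph would overflow the budget
        if current and current_len + len(words) > max_words:
            chunks.append("\n\n".join(current))
            current, current_len = [], 0
        current.append(paragraph)
        current_len += len(words)
    if current:
        chunks.append("\n\n".join(current))
    return chunks  # a single very long paragraph still becomes its own (oversized) chunk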
Overlap Theory
Concept: Chunks share some content.
Document: "A B C D E F G H I J"
Without overlap:
Chunk 1: [A B C D]
Chunk 2: [E F G H]
Chunk 3: [I J]
With 50% overlap:
Chunk 1: [A B C D]
Chunk 2: [ C D E F]
Chunk 3: [ E F G H]
Chunk 4: [ G H I J]
Why Overlap?
Problem: Important information spans chunk boundary.
Without overlap:
Chunk 1: "...The CEO announced"
Chunk 2: "record revenue of $1B..."
Query: "CEO revenue announcement"
Might miss! (no chunk has all three concepts)
With overlap:
Chunk 1: "...The CEO announced"
Chunk 2: "...The CEO announced record revenue of $1B..."
↑ Overlap preserves context!
Query matches Chunk 2 perfectly!
Tradeoff:
- More overlap = Better context, More chunks to store
- Less overlap = Less storage, May miss context
- Sweet spot: 20-25% overlap (100-150 words)
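As a minimal illustration of the overlap scheme above, a word-based sliding window might look like this (sizes are illustrative; the real chunker handles overlap internally):

def chunk_with_overlap(text: str, chunk_size: int = 500, overlap: int = 100) -> list[str]:
    """Fixed-size word chunks where consecutive chunks share `overlap` words."""
    words = text.split()
    step = chunk_size - overlap  # assumes overlap < chunk_size; the window advances by `step` words
    return [
        " ".join(words[i:i + chunk_size])
        for i in range(0, len(words), step)
    ]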
Implementation Details
S3 Integration
See: packages/rag/document_search/s3_storage.py (~200 lines)
Key Features:
- Upload/download documents
- Pre-signed URLs (time-limited access; see the sketch below)
- Metadata management
- Lifecycle configuration
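For the pre-signed URL feature, the standard boto3 call looks roughly like this (bucket, key, and expiry are illustrative):

import boto3

s3_client = boto3.client("s3")

# Time-limited download link: valid for 1 hour, no AWS credentials needed by the caller
url = s3_client.generate_presigned_url(
    "get_object",
    Params={"Bucket": "my-docs", "Key": "documents/2025/10/uuid/report.pdf"},
    ExpiresIn=3600,
)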
Indexing Pipeline
See: packages/rag/document_search/indexing.py (~300 lines)
Stages:
- Load document (uses document_loader.py)
- Upload to S3 (uses s3_storage.py)
- Chunk document (uses chunkers.py - REUSED)
- Generate embeddings (function passed in)
- Index in vector store (uses stores.py - REUSED)
Reuse Ratio: 60% of indexing logic is reused!
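To see how these stages fit together, here is a condensed orchestration sketch. It is not the actual indexing.py code, and the names load_document, s3_storage.upload, and embed_batch are illustrative stand-ins for the components listed above:

def index_document(file_path, s3_storage, chunker, embed_batch, store):
    """Run the five stages for one document; components are passed in."""
    # 1. Load (document_loader.py)
    doc = load_document(file_path)
    # 2. Archive the original (s3_storage.py; method name assumed)
    s3_key = s3_storage.upload(doc.file_bytes, doc.id)
    # 3. Chunk (REUSED chunkers.py)
    chunks = chunker.chunk(doc.content, doc.metadata)
    # 4. Embed (batched; the embedding function is injected)
    embeddings = embed_batch([c.content for c in chunks])
    # 5. Index (REUSED bulk insert in stores.py)
    store.add_documents([
        {"chunk_id": c.chunk_id, "content": c.content,
         "embedding": e, "metadata": {**c.metadata, "s3_key": s3_key}}
        for c, e in zip(chunks, embeddings)
    ])
    return len(chunks)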
Performance Considerations
Indexing Performance
Bottlenecks:
- Document Parsing: PDF parsing can be slow
  - Optimization: Use pypdf (fast), not pdfminer (slow)
  - Our choice: pypdf ✅
- Embedding Generation: Calls to an API or model inference
  - Optimization: Batch embeddings (10-100 at once)
  - Optimization: Cache embeddings for identical content
  - Our approach: Batch when possible
- Vector Store Insertion: Network + indexing overhead
  - Optimization: Bulk insert API
  - Our approach: Use the existing add_documents() with bulk insert
Target Performance (per document):
- PDF parsing: < 5s for 100 pages
- Chunking: < 1s for 10,000 words
- Embedding: < 2s for 20 chunks (batched)
- Indexing: < 2s for 20 chunks (bulk)
- Total: < 10s for typical document
Search Performance
Index Size Impact:
Documents | Chunks | Search Latency | Why |
---|---|---|---|
100 | 2,000 | ~50ms | Small, fits in RAM |
10,000 | 200,000 | ~100ms | Moderate, good indexes |
1,000,000 | 20M | ~200-500ms | Large, distributed |
Optimization Strategies:
- Proper Indexing: OpenSearch k-NN parameters (see the index mapping sketch below)
  - ef_construction: 512 (build quality)
  - ef_search: 512 (search quality)
  - m: 16 (connections per node)
- Sharding: 1M documents across 3 shards = ~333K per shard = faster parallel search
- Caching: hot queries cached in Redis
  - 40-60% hit rate typical
  - 5ms vs 500ms (100x faster)
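A sketch of how those k-NN parameters might appear in an OpenSearch index definition via the opensearch-py client. The host, index name, embedding dimension, engine, and space type are assumptions for illustration, not values taken from this codebase:

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

# Create an index with HNSW k-NN settings matching the parameters above
client.indices.create(
    index="kb_articles",
    body={
        "settings": {
            "index": {
                "knn": True,
                "knn.algo_param.ef_search": 512,   # search quality
                "number_of_shards": 3,             # sharding for parallel search
            }
        },
        "mappings": {
            "properties": {
                "content": {"type": "text"},       # BM25 full-text field
                "embedding": {
                    "type": "knn_vector",
                    "dimension": 768,              # assumed embedding size
                    "method": {
                        "name": "hnsw",
                        "space_type": "cosinesimil",
                        "engine": "nmslib",
                        "parameters": {"ef_construction": 512, "m": 16},
                    },
                },
            }
        },
    },
)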
Metadata Management
What Metadata to Track?
Document Level:
{
"document_id": "uuid",
"filename": "report.pdf",
"format": "pdf",
"size_bytes": 2048576,
"s3_key": "documents/2025/10/uuid/report.pdf",
"uploaded_at": "2025-10-09T12:00:00Z",
"uploaded_by": "user123",
"status": "indexed",
"content_metadata": {
"num_pages": 45,
"title": "Q3 Financial Report",
"author": "Finance Team",
"created_date": "2025-09-30"
},
"processing": {
"chunks_created": 67,
"processing_time_ms": 8500,
"indexed_at": "2025-10-09T12:00:10Z"
},
"search_stats": {
"times_retrieved": 0,
"last_accessed": null,
"avg_relevance": 0
}
}
Chunk Level (in OpenSearch):
{
"chunk_id": "uuid_chunk_0",
"document_id": "uuid",
"content": "Q3 revenue increased by 25%...",
"embedding": [0.23, -0.45, ...],
"metadata": {
"chunk_index": 0,
"page_number": 5,
"section": "Executive Summary",
"document_title": "Q3 Financial Report"
}
}
Why Split?
- Document metadata changes rarely → MongoDB
- Chunks searched frequently → OpenSearch
- Clear separation of concerns
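Because the MongoDB side is filtered and sorted frequently, it would typically carry a few secondary indexes. A minimal pymongo sketch, assuming the field names from the metadata example above (connection string and database/collection names are placeholders):

from pymongo import ASCENDING, DESCENDING, MongoClient

client = MongoClient("mongodb://localhost:27017")
documents = client["docsearch"]["documents"]

# Look up a document by its ID (unique)
documents.create_index([("document_id", ASCENDING)], unique=True)

# Common filters: status + most recent uploads first
documents.create_index([("status", ASCENDING), ("uploaded_at", DESCENDING)])

# Example query: the 20 most recently uploaded indexed documents
recent = documents.find({"status": "indexed"}).sort("uploaded_at", -1).limit(20)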
Indexing Pipeline Patterns
Pattern: Pipeline with Progress Tracking
Why: Users want to know "How much longer?"
class IndexingProgress:
    """Track indexing progress."""

    stages = ["loading", "chunking", "embedding", "indexing"]

    def __init__(self, doc_id):
        self.doc_id = doc_id
        self.current_stage = 0
        self.total_stages = len(self.stages)
        self.progress_pct = 0

    def update(self, stage_name, pct_complete=None):
        if stage_name == "complete":
            # Final sentinel: the whole pipeline is done
            self.current_stage = self.total_stages - 1
            self.progress_pct = 100
        else:
            self.current_stage = self.stages.index(stage_name)
            if pct_complete is not None:
                # Within-stage progress (pct_complete is 0.0-1.0)
                stage_progress = (self.current_stage + pct_complete) / self.total_stages
                self.progress_pct = stage_progress * 100
            else:
                # Stage complete
                self.progress_pct = (self.current_stage + 1) / self.total_stages * 100
        # Emit event for UI
        emit_progress(self.doc_id, self.progress_pct, stage_name)


# Usage
progress = IndexingProgress(doc.id)

progress.update("loading")         # 25% complete
doc = load_document(file_path)

progress.update("chunking")        # 50% complete
chunks = chunker.chunk(doc.content)

progress.update("embedding", 0.0)  # 50% complete
for i, chunk in enumerate(chunks):
    embedding = get_embedding(chunk)
    progress.update("embedding", (i + 1) / len(chunks))  # 50-75% complete

progress.update("indexing", 0.0)   # 75% complete
store.index(chunks)

progress.update("complete")        # 100% complete
Benefits:
- Real-time progress for users
- Debugging (which stage failed?)
- UX improvement (no black box)
Pattern: Batch Processing
Concept: Process multiple items together for efficiency.
Why Batching Helps:
Without Batching:
# 100 chunks → 100 API calls
for chunk in chunks:  # 100 iterations
    embedding = call_api(chunk.content)  # 100 network round-trips
# Latency: 100 × 50ms = 5000ms
With Batching:
# 100 chunks → 1 API call
embeddings = call_api_batch([c.content for c in chunks])  # 1 network round-trip
# Latency: 1 × 200ms = 200ms (25x faster!)
Batch Size Tradeoffs:
Batch Size | Latency | Memory | Failure Impact |
---|---|---|---|
1 | High (many calls) | Low | Small (1 chunk) |
10 | Medium | Medium | Medium (10 chunks) |
100 | Low (1 call) | High | Large (100 chunks) |
Sweet Spot: 10-50 items per batch
Implementation in Our Pipeline:
# Batch embeddings
indexed_chunks = []
batch_size = 20

for i in range(0, len(chunks), batch_size):
    batch = chunks[i:i + batch_size]

    # Single API call for the batch
    batch_contents = [c.content for c in batch]
    batch_embeddings = embedding_function_batch(batch_contents)

    # Combine
    for chunk, embedding in zip(batch, batch_embeddings):
        indexed_chunks.append({
            'chunk_id': chunk.chunk_id,
            'content': chunk.content,
            'embedding': embedding,
            'metadata': chunk.metadata
        })

# Bulk insert to vector store (REUSED method)
store.add_documents(indexed_chunks)  # Single bulk operation
Reusing Existing Components - Deep Dive
Integration Point 1: Chunker
Existing Component: packages/rag/chunkers.py
What It Provides:
class SemanticChunker:
    """Chunks by semantic boundaries."""

    def chunk(self, content: str, metadata: Dict) -> List[Chunk]:
        # Returns Chunk objects with:
        # - content (str)
        # - metadata (dict)
        # - chunk_id (str)
        # - source (str)
        # - start_char, end_char (int)
        ...
How We Use It:
# In our indexing pipeline
from packages.rag.chunkers import SemanticChunker
chunker = SemanticChunker(chunk_size=500, chunk_overlap=100)
chunks = chunker.chunk(doc.content, doc.metadata)
# We get Chunk objects ready for embedding!
# No need to implement chunking logic
What We Reuse:
- Semantic boundary detection
- Overlap management
- Metadata preservation
- Chunk ID generation
What We Add: None (just use it)
Integration Point 2: VectorStore
Existing Component: packages/rag/stores.py
Interface:
class VectorStore(ABC):
    @abstractmethod
    def add_documents(self, documents: List[VectorDocument]) -> bool:
        pass

    @abstractmethod
    def search(self, query_embedding: List[float], k: int) -> List[Dict]:
        pass

    @abstractmethod
    def delete_documents(self, document_ids: List[str]) -> bool:
        pass
Implementations Available:
- OpenSearchStore - k-NN + BM25
- MongoDBStore - Atlas Vector Search
- AzureAISearchStore - Azure AI Search
How We Use It:
# In our indexing pipeline
from packages.rag.stores import VectorDocument

# Convert to VectorDocument format
vector_docs = [
    VectorDocument(
        id=chunk['chunk_id'],
        content=chunk['content'],
        embedding=chunk['embedding'],
        metadata=chunk['metadata']
    )
    for chunk in indexed_chunks
]

# Use existing bulk insert
success = store.add_documents(vector_docs)  # ← Existing method!
What We Reuse:
- Bulk insertion logic
- Connection pooling
- Error handling
- Retry mechanisms
What We Add: Orchestration (calling it correctly)
Integration Point 3: HybridRetriever
Existing Component: packages/rag/retrievers.py
What It Does:
class HybridRetriever:
    """BM25 + Vector with RRF fusion."""

    def __init__(self, vector_retriever, bm25_retriever, alpha=0.5):
        self.vector_retriever = vector_retriever
        self.bm25_retriever = bm25_retriever
        self.alpha = alpha

    def retrieve(self, query: str, k: int) -> List[RetrievalResult]:
        # 1. BM25 search
        bm25_results = self.bm25_retriever.retrieve(query, k)
        # 2. Vector search
        vector_results = self.vector_retriever.retrieve(query, k)
        # 3. RRF fusion
        fused = self._reciprocal_rank_fusion(bm25_results, vector_results)
        return fused[:k]
How We Use It:
# In our pipeline factory
vector_retriever = VectorRetriever(vector_store)
bm25_retriever = BM25Retriever(k1=1.2, b=0.75)
bm25_retriever.add_chunks(chunks)

hybrid = HybridRetriever(
    vector_retriever=vector_retriever,
    bm25_retriever=bm25_retriever,
    alpha=0.5  # Profile-specific
)

# Our pipeline just orchestrates
pipeline = DocumentSearchPipeline(
    retriever=hybrid,  # ← Existing component
    ...
)
What We Reuse (ALL of it):
- BM25 implementation
- Vector search logic
- RRF fusion algorithm
- Score normalization
- Result deduplication
What We Add: Profile-based alpha configuration
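The _reciprocal_rank_fusion step referenced in HybridRetriever is not shown in the excerpt above. As a rough illustration of the standard RRF formula it implements (the constant k=60 is the conventional default, assumed here; result objects are assumed to expose a chunk_id):

def reciprocal_rank_fusion(bm25_results, vector_results, k=60):
    """Combine two ranked lists: score(d) = sum over lists of 1 / (k + rank(d))."""
    scores = {}
    for results in (bm25_results, vector_results):
        for rank, result in enumerate(results, start=1):
            scores[result.chunk_id] = scores.get(result.chunk_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)

Rank-based fusion like this avoids having to normalize BM25 and cosine scores onto a common scale.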
Real-World Example: Knowledge Base Indexing
Scenario
Customer support team has 500 knowledge-base (KB) articles in PDF format.
Requirements
- Index all 500 articles
- Full-text search across all
- Semantic search for concepts
- < 1 hour total indexing time
- < 100ms search latency
Implementation
# 1. Initialize (using existing components)
from packages.rag.stores import OpenSearchStore
from packages.rag.document_search import (
DocumentIndexingPipeline,
S3DocumentStorage,
create_s3_storage
)
# Existing vector store
store = OpenSearchStore(
    host="opensearch.prod.company.com",
    index_name="kb_articles"
)

# NEW S3 storage
s3_storage = create_s3_storage(
    bucket_name="company-kb-articles",
    region="us-west-2"
)

# NEW indexing pipeline (orchestrates existing components)
pipeline = DocumentIndexingPipeline(
    store=store,                  # ← REUSED
    embedding_function=embed_fn,  # ← REUSED
    chunker=None,                 # ← Will use existing SemanticChunker
    s3_storage=s3_storage         # ← NEW
)
# 2. Index all articles
import glob
pdf_files = glob.glob("kb_articles/*.pdf")
results = pipeline.batch_index_documents(pdf_files)
# 3. Check results
stats = pipeline.get_indexing_stats(results)
print(f"Indexed: {stats['successful']}/{stats['total_documents']}")
print(f"Chunks: {stats['total_chunks_indexed']}")
print(f"Avg time: {stats['avg_processing_time_ms']:.2f}ms")
# Expected output:
# Indexed: 500/500
# Chunks: 8,435
# Avg time: 6,200ms
# Total time: ~52 minutes (well under 1 hour!)
Results
- ✅ All 500 articles indexed
- ✅ 8,435 searchable chunks created
- ✅ Search latency: 85ms average (< 100ms target)
- ✅ Storage cost: ~$0.02/month for the originals (500 × 2MB ≈ 1GB in S3 Standard)
Summary
Key Concepts
Storage Patterns:
- Use S3 for raw documents (cheap, durable)
- Use Vector Store for search (fast, optimized)
- Use MongoDB for metadata (flexible, queryable)
Indexing Theory:
- Index once (expensive), search many (cheap)
- Chunking preserves semantic units
- Overlap prevents context loss
- Batching improves efficiency
Reuse Strategy:
- Survey before building (80% exists)
- Compose existing components
- Add only unique functionality (grounded summarization)
- Benefit from existing improvements
Implementation Files:
- S3 Storage: packages/rag/document_search/s3_storage.py
- Indexing: packages/rag/document_search/indexing.py
- Reused Components: packages/rag/{stores,chunkers,retrievers}.py
Next: Grounded Summarization - Our unique contribution