Deduplication
This example demonstrates the complete deduplication workflow: document processing, duplicate detection with multiple algorithms, manual review workflows, impact analysis, monitoring, and performance optimization.
Overview
The deduplication system provides:
- Multiple Detection Algorithms - MinHash, SimHash, and semantic similarity (illustrated in the sketch after this list)
- Hierarchical Processing - Document, chunk, and sentence-level deduplication
- Manual Review Workflows - Human-in-the-loop validation
- Impact Analysis - Measure deduplication effectiveness
- Monitoring Dashboard - Real-time deduplication metrics
- Optimization Tools - Performance tuning and configuration
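The RecoAgent APIs used throughout this example wrap these detection signals for you, but the underlying ideas are standard. As a rough, standalone illustration (not RecoAgent code, and assuming the third-party datasketch and sentence-transformers packages are installed), exact duplicates can be caught with a content hash, near duplicates with a MinHash estimate of Jaccard similarity, and paraphrases with embedding cosine similarity:

# Conceptual sketch only -- DocumentDeduplicator wraps equivalents of these three signals.
import hashlib
from datasketch import MinHash                                  # pip install datasketch
from sentence_transformers import SentenceTransformer, util     # pip install sentence-transformers

a = "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain."
b = "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain. It provides hybrid retrieval."

# 1. Exact match: identical normalized content hashes to the same digest.
exact = hashlib.sha256(a.lower().encode()).hexdigest() == hashlib.sha256(b.lower().encode()).hexdigest()

# 2. Near duplicate: MinHash estimates the Jaccard overlap of the token sets.
def minhash(text: str, num_perm: int = 128) -> MinHash:
    m = MinHash(num_perm=num_perm)
    for token in text.lower().split():
        m.update(token.encode("utf8"))
    return m

jaccard = minhash(a).jaccard(minhash(b))

# 3. Semantic duplicate: cosine similarity of sentence embeddings.
model = SentenceTransformer("all-MiniLM-L6-v2")
emb = model.encode([a, b], convert_to_tensor=True)
cosine = float(util.cos_sim(emb[0], emb[1]))

print(f"exact={exact}, jaccard={jaccard:.2f}, cosine={cosine:.2f}")

Each signal trades cost for coverage: hashing is cheapest but only finds byte-identical text, MinHash catches copy-edited text, and embeddings catch paraphrases at the highest compute cost.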
Prerequisites
- RecoAgent installed and configured
- Deduplication packages available
- Sample documents for testing
- Python 3.8+ with required dependencies
Basic Setup
1. Initialize Deduplication System
from datetime import datetime, timedelta  # used by the review workflow examples below

from packages.rag.deduplication import (
    DeduplicationConfig, DocumentDeduplicator, HierarchicalDeduplicator,
    SimilarityAlgorithm, DeduplicationAction
)
from packages.rag.deduplication_review import (
    ReviewWorkflowManager, ReviewerProfile, ReviewDecision, ReviewPriority
)
from packages.rag.deduplication_analytics import ImpactAnalyzer
from packages.rag.deduplication_tools import DeduplicationAnalyzer, ManualReviewInterface
from packages.rag.deduplication_monitoring import DeduplicationMonitoringDashboard


class DeduplicationExample:
    """Example implementation of the deduplication system."""

    def __init__(self):
        # Configure deduplication system
        self.config = DeduplicationConfig(
            # Similarity thresholds
            exact_match_threshold=1.0,
            near_duplicate_threshold=0.85,
            semantic_similarity_threshold=0.75,
            manual_review_threshold=0.65,

            # Algorithm settings
            minhash_num_permutations=128,
            minhash_band_size=8,
            simhash_hash_bits=64,
            semantic_model_name="all-MiniLM-L6-v2",

            # Processing settings
            batch_size=100,
            max_candidates_per_doc=50,
            enable_parallel_processing=True
        )

        # Initialize deduplication components
        self.deduplicator = DocumentDeduplicator(self.config)
        self.hierarchical_deduplicator = HierarchicalDeduplicator(self.config)
        self.review_manager = ReviewWorkflowManager()
        self.impact_analyzer = ImpactAnalyzer()
        self.monitoring_dashboard = DeduplicationMonitoringDashboard()
2. Load Sample Documents
def load_sample_documents(self):
    """Load sample documents for deduplication testing."""
    print("Loading sample documents...")

    sample_documents = [
        {
            "id": "doc_1",
            "content": "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain.",
            "source": "introduction.txt",
            "metadata": {"category": "overview", "author": "team"}
        },
        {
            "id": "doc_2",
            "content": "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain. It provides hybrid retrieval capabilities.",
            "source": "features.txt",
            "metadata": {"category": "features", "author": "team"}
        },
        {
            "id": "doc_3",
            "content": "The RecoAgent platform combines retrieval-augmented generation with agent orchestration.",
            "source": "architecture.txt",
            "metadata": {"category": "architecture", "author": "team"}
        },
        {
            "id": "doc_4",
            "content": "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain.",
            "source": "duplicate_intro.txt",
            "metadata": {"category": "overview", "author": "team"}
        }
    ]

    print(f"Loaded {len(sample_documents)} sample documents")
    return sample_documents
Document Processing
Fingerprint Generation
async def demonstrate_fingerprinting(self, documents):
    """Demonstrate document fingerprinting capabilities."""
    print("\nDocument Fingerprinting")
    print("=" * 40)

    for doc in documents:
        print(f"\nDocument: {doc['id']}")
        print(f"Content: {doc['content'][:50]}...")

        # Generate fingerprints using different algorithms
        fingerprints = await self.deduplicator.generate_fingerprints(doc)

        print(f"MinHash Fingerprint: {fingerprints.minhash[:20]}...")
        print(f"SimHash Fingerprint: {fingerprints.simhash}")
        print(f"Semantic Embedding: {fingerprints.semantic_embedding[:10]}...")
        print(f"Content Hash: {fingerprints.content_hash}")

    return fingerprints
Batch Processing
async def process_document_batch(self, documents):
    """Process a batch of documents for deduplication."""
    print("\nBatch Processing")
    print("=" * 40)

    # Process documents in batches
    batch_size = self.config.batch_size
    processed_docs = []

    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        print(f"Processing batch {i//batch_size + 1}: {len(batch)} documents")

        # Process batch
        batch_results = await self.deduplicator.process_batch(batch)
        processed_docs.extend(batch_results)

        print(f"Processed {len(batch_results)} documents")

    return processed_docs
Duplicate Detection
Multi-Algorithm Detection
async def detect_duplicates(self, documents):
    """Detect duplicates using multiple algorithms."""
    print("\nDuplicate Detection")
    print("=" * 40)

    # Detect exact matches
    exact_matches = await self.deduplicator.detect_exact_matches(documents)
    print(f"Exact matches found: {len(exact_matches)}")

    # Detect near duplicates using MinHash
    near_duplicates = await self.deduplicator.detect_near_duplicates(
        documents,
        algorithm=SimilarityAlgorithm.MINHASH,
        threshold=self.config.near_duplicate_threshold
    )
    print(f"Near duplicates (MinHash): {len(near_duplicates)}")

    # Detect semantic duplicates
    semantic_duplicates = await self.deduplicator.detect_semantic_duplicates(
        documents,
        threshold=self.config.semantic_similarity_threshold
    )
    print(f"Semantic duplicates: {len(semantic_duplicates)}")

    # Combine results
    all_duplicates = self.deduplicator.combine_duplicate_results([
        exact_matches,
        near_duplicates,
        semantic_duplicates
    ])
    print(f"Total duplicate groups: {len(all_duplicates)}")

    return all_duplicates
Hierarchical Deduplication
async def hierarchical_deduplication(self, documents):
    """Demonstrate hierarchical deduplication."""
    print("\nHierarchical Deduplication")
    print("=" * 40)

    # Document-level deduplication
    doc_duplicates = await self.hierarchical_deduplicator.deduplicate_documents(documents)
    print(f"Document-level duplicates: {len(doc_duplicates)}")

    # Chunk-level deduplication
    chunk_duplicates = await self.hierarchical_deduplicator.deduplicate_chunks(documents)
    print(f"Chunk-level duplicates: {len(chunk_duplicates)}")

    # Sentence-level deduplication
    sentence_duplicates = await self.hierarchical_deduplicator.deduplicate_sentences(documents)
    print(f"Sentence-level duplicates: {len(sentence_duplicates)}")

    # Analyze hierarchical results
    analysis = await self.hierarchical_deduplicator.analyze_hierarchical_results({
        'document': doc_duplicates,
        'chunk': chunk_duplicates,
        'sentence': sentence_duplicates
    })

    print("\nHierarchical Analysis:")
    print(f"  Total duplicates found: {analysis['total_duplicates']}")
    print(f"  Processing efficiency: {analysis['efficiency']:.1%}")
    print(f"  Memory savings: {analysis['memory_savings']:.1%}")

    return analysis
Manual Review Workflow
Review Queue Management
async def setup_review_workflow(self, duplicates):
    """Set up manual review workflow for duplicates."""
    print("\nManual Review Workflow")
    print("=" * 40)

    # Create reviewer profiles
    reviewers = [
        ReviewerProfile(
            id="reviewer_1",
            name="John Smith",
            expertise=["technical", "content"],
            max_reviews_per_day=50
        ),
        ReviewerProfile(
            id="reviewer_2",
            name="Jane Doe",
            expertise=["business", "content"],
            max_reviews_per_day=30
        )
    ]

    for reviewer in reviewers:
        await self.review_manager.register_reviewer(reviewer)
        print(f"Registered reviewer: {reviewer.name}")

    # Create review tasks
    review_tasks = []
    for duplicate_group in duplicates:
        task = await self.review_manager.create_review_task(
            duplicate_group=duplicate_group,
            priority=ReviewPriority.MEDIUM,
            estimated_time_minutes=5
        )
        review_tasks.append(task)

    print(f"Created {len(review_tasks)} review tasks")

    # Simulate review process
    for i, task in enumerate(review_tasks[:3]):  # Review first 3 tasks
        print(f"\nReviewing task {i+1}: {task.id}")

        # Simulate reviewer decision
        decision = ReviewDecision(
            task_id=task.id,
            reviewer_id="reviewer_1",
            decision="keep_primary",  # or "merge", "keep_all", "delete_all"
            confidence=0.9,
            comments="Clear duplicate, keeping the primary document",
            timestamp=datetime.now()
        )

        await self.review_manager.submit_review_decision(decision)
        print(f"Review decision submitted: {decision.decision}")

    return review_tasks
Review Analytics
async def analyze_review_performance(self):
    """Analyze manual review performance."""
    print("\nReview Performance Analysis")
    print("=" * 40)

    # Get review metrics
    metrics = await self.review_manager.get_review_metrics(
        time_range=timedelta(days=7)
    )

    print("Review Metrics (7 days):")
    print(f"  Total tasks created: {metrics['total_tasks']}")
    print(f"  Tasks completed: {metrics['completed_tasks']}")
    print(f"  Average review time: {metrics['avg_review_time']:.1f} minutes")
    print(f"  Reviewer productivity: {metrics['reviewer_productivity']:.1f} tasks/day")

    # Get decision distribution
    decisions = await self.review_manager.get_decision_distribution()
    print("\nDecision Distribution:")
    for decision, count in decisions.items():
        print(f"  {decision}: {count}")

    # Get reviewer performance
    reviewer_performance = await self.review_manager.get_reviewer_performance()
    print("\nReviewer Performance:")
    for reviewer_id, performance in reviewer_performance.items():
        print(f"  {reviewer_id}: {performance['tasks_completed']} tasks, "
              f"{performance['avg_confidence']:.2f} confidence")
Impact Analysis
Deduplication Impact
async def analyze_deduplication_impact(self, documents, duplicates):
    """Analyze the impact of deduplication."""
    print("\nImpact Analysis")
    print("=" * 40)

    # Calculate storage impact
    storage_impact = await self.impact_analyzer.calculate_storage_impact(
        original_documents=documents,
        duplicate_groups=duplicates
    )

    print("Storage Impact:")
    print(f"  Original size: {storage_impact['original_size']:.2f} MB")
    print(f"  After deduplication: {storage_impact['deduplicated_size']:.2f} MB")
    print(f"  Space saved: {storage_impact['space_saved']:.2f} MB ({storage_impact['space_saved_percent']:.1%})")

    # Calculate performance impact
    performance_impact = await self.impact_analyzer.calculate_performance_impact(
        original_documents=documents,
        duplicate_groups=duplicates
    )

    print("\nPerformance Impact:")
    print(f"  Indexing time reduction: {performance_impact['indexing_time_reduction']:.1%}")
    print(f"  Search time improvement: {performance_impact['search_time_improvement']:.1%}")
    print(f"  Memory usage reduction: {performance_impact['memory_reduction']:.1%}")

    # Calculate quality impact
    quality_impact = await self.impact_analyzer.calculate_quality_impact(
        original_documents=documents,
        duplicate_groups=duplicates
    )

    print("\nQuality Impact:")
    print(f"  Content diversity: {quality_impact['content_diversity']:.2f}")
    print(f"  Information density: {quality_impact['information_density']:.2f}")
    print(f"  Relevance preservation: {quality_impact['relevance_preservation']:.2f}")

    return {
        'storage': storage_impact,
        'performance': performance_impact,
        'quality': quality_impact
    }
Monitoring and Optimization
Real-time Monitoring
async def setup_monitoring(self):
    """Set up real-time deduplication monitoring."""
    print("\nMonitoring Dashboard")
    print("=" * 40)

    # Create monitoring dashboard
    dashboard_config = {
        'title': 'Deduplication Monitoring Dashboard',
        'refresh_interval': 30,  # seconds
        'widgets': [
            {
                'type': 'duplicate_detection_rate',
                'title': 'Duplicate Detection Rate',
                'time_range': '24h'
            },
            {
                'type': 'processing_performance',
                'title': 'Processing Performance',
                'time_range': '1h'
            },
            {
                'type': 'review_queue_status',
                'title': 'Review Queue Status',
                'time_range': 'realtime'
            },
            {
                'type': 'impact_metrics',
                'title': 'Impact Metrics',
                'time_range': '7d'
            }
        ]
    }

    dashboard_url = await self.monitoring_dashboard.create_dashboard(dashboard_config)
    print(f"Monitoring dashboard created: {dashboard_url}")

    # Get current metrics
    metrics = await self.monitoring_dashboard.get_current_metrics()

    print("\nCurrent Metrics:")
    print(f"  Documents processed: {metrics['documents_processed']}")
    print(f"  Duplicates detected: {metrics['duplicates_detected']}")
    print(f"  Processing rate: {metrics['processing_rate']:.1f} docs/sec")
    print(f"  Review queue size: {metrics['review_queue_size']}")

    return dashboard_url
Performance Optimization
async def optimize_performance(self):
    """Optimize deduplication performance."""
    print("\nPerformance Optimization")
    print("=" * 40)

    # Get performance recommendations
    recommendations = await self.deduplicator.get_performance_recommendations()

    print("Performance Recommendations:")
    for rec in recommendations:
        print(f"  • {rec['description']}")
        print(f"    Expected improvement: {rec['expected_improvement']:.1%}")
        print(f"    Implementation effort: {rec['effort']}")

    # Apply optimizations
    optimization_results = await self.deduplicator.apply_optimizations([
        'enable_parallel_processing',
        'optimize_batch_size',
        'enable_caching'
    ])

    print("\nApplied Optimizations:")
    for opt, result in optimization_results.items():
        print(f"  {opt}: {result['status']} - {result['performance_gain']:.1%} improvement")

    return optimization_results
Complete Example
Here's a complete working example:
#!/usr/bin/env python3
"""
Complete Deduplication System Example
"""

import asyncio
from datetime import datetime, timedelta


async def main():
    """Main demo function."""
    print("Comprehensive Deduplication System Demo")
    print("=" * 50)

    # Initialize deduplication system
    dedup_example = DeduplicationExample()

    # Load sample documents
    documents = dedup_example.load_sample_documents()

    # Demonstrate fingerprinting
    await dedup_example.demonstrate_fingerprinting(documents)

    # Process documents
    processed_docs = await dedup_example.process_document_batch(documents)

    # Detect duplicates
    duplicates = await dedup_example.detect_duplicates(documents)

    # Hierarchical deduplication
    hierarchical_analysis = await dedup_example.hierarchical_deduplication(documents)

    # Set up review workflow
    review_tasks = await dedup_example.setup_review_workflow(duplicates)

    # Analyze review performance
    await dedup_example.analyze_review_performance()

    # Analyze impact
    impact_analysis = await dedup_example.analyze_deduplication_impact(documents, duplicates)

    # Set up monitoring
    dashboard_url = await dedup_example.setup_monitoring()

    # Optimize performance
    await dedup_example.optimize_performance()

    print("\nDeduplication Demo Complete!")
    print(f"Dashboard available at: {dashboard_url}")


if __name__ == "__main__":
    asyncio.run(main())
Best Practices
1. Algorithm Selection
- Use exact matching for perfect duplicates
- Use MinHash for near-duplicate detection
- Use semantic similarity for content duplicates
- Combine multiple algorithms for best results (see the sketch below)
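One hypothetical way to combine the signals is a precedence policy keyed to the thresholds from DeduplicationConfig. The function below is only an illustration of that precedence, not the library's combine_duplicate_results implementation; classify_pair and its default thresholds are assumptions for this sketch.

# Hypothetical combination policy -- not the library's implementation.
def classify_pair(exact: bool, jaccard: float, cosine: float,
                  near_threshold: float = 0.85,
                  semantic_threshold: float = 0.75,
                  review_threshold: float = 0.65) -> str:
    """Map the three similarity signals onto an action, checking the cheapest signal first."""
    if exact:
        return "exact_duplicate"               # identical content hashes
    if jaccard >= near_threshold:
        return "near_duplicate"                # MinHash catches copy-edited text
    if cosine >= semantic_threshold:
        return "semantic_duplicate"            # embeddings catch paraphrases
    if max(jaccard, cosine) >= review_threshold:
        return "needs_manual_review"           # borderline -- queue for a reviewer
    return "unique"

print(classify_pair(exact=False, jaccard=0.91, cosine=0.95))  # near_duplicate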
2. Threshold Tuning
- Start with conservative thresholds
- Monitor false positive rates (one way to do this is sketched below)
- Adjust based on your data characteristics
- Re-optimize thresholds regularly
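For example, if you keep a small hand-labeled set of document pairs, you can sweep candidate thresholds and watch the false positive rate before touching the production config. This is a standalone sketch with made-up scores and labels, not a RecoAgent API:

# Made-up labeled pairs: (similarity score, human-labeled is_duplicate).
labeled_pairs = [
    (0.97, True), (0.91, True), (0.88, False),
    (0.82, True), (0.74, False), (0.60, False),
]

for threshold in (0.70, 0.75, 0.80, 0.85, 0.90):
    flagged = [(score, truth) for score, truth in labeled_pairs if score >= threshold]
    false_pos = sum(1 for _, truth in flagged if not truth)
    fp_rate = false_pos / len(flagged) if flagged else 0.0
    print(f"threshold={threshold:.2f}  flagged={len(flagged)}  false_positive_rate={fp_rate:.0%}")

Raising the threshold lowers the false positive rate but also lets more real duplicates through, so pick the lowest threshold whose false positive rate your reviewers can tolerate.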
3. Review Workflow
- Prioritize high-confidence duplicates (see the queue sketch below)
- Provide clear review guidelines
- Track reviewer performance
- Provide regular training and feedback
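A minimal sketch of "prioritize high-confidence duplicates" using only the standard library; ReviewWorkflowManager handles queueing for you, so this only shows the ordering idea, and the group dictionaries are assumed for illustration:

import heapq
from dataclasses import dataclass, field

@dataclass(order=True)
class QueuedReview:
    sort_key: float
    group_id: str = field(compare=False)

def build_review_queue(duplicate_groups):
    """Yield group ids so the highest-similarity groups are reviewed first."""
    queue = []
    for group in duplicate_groups:  # e.g. {"id": ..., "similarity": ...}
        # heapq is a min-heap, so negate similarity to pop the most confident group first.
        heapq.heappush(queue, QueuedReview(-group["similarity"], group["id"]))
    while queue:
        yield heapq.heappop(queue).group_id

groups = [{"id": "g1", "similarity": 0.66}, {"id": "g2", "similarity": 0.95}, {"id": "g3", "similarity": 0.88}]
print(list(build_review_queue(groups)))  # ['g2', 'g3', 'g1']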
4. Performance Optimization
- Use parallel processing for large datasets (see the sketch below)
- Optimize batch sizes
- Enable caching for repeated operations
- Monitor resource usage
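The enable_parallel_processing flag covers this inside the library. If you pre-process documents yourself, the same idea looks roughly like the sketch below, where fingerprint_batch is a hypothetical stand-in for any CPU-bound per-batch step:

import hashlib
from concurrent.futures import ProcessPoolExecutor

def fingerprint_batch(batch):
    """Hypothetical CPU-bound step: hash every document in a batch."""
    return [hashlib.sha256(doc["content"].encode()).hexdigest() for doc in batch]

def fingerprint_all(documents, batch_size=100, max_workers=4):
    """Fan batches out across worker processes; tune batch_size so each task is non-trivial."""
    batches = [documents[i:i + batch_size] for i in range(0, len(documents), batch_size)]
    results = []
    # Call this under `if __name__ == "__main__":` when running as a script,
    # since ProcessPoolExecutor re-imports the module in worker processes.
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        for batch_result in pool.map(fingerprint_batch, batches):
            results.extend(batch_result)
    return results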
Troubleshooting
Common Issues
High False Positive Rate
# Adjust similarity thresholds
config = DeduplicationConfig(
    near_duplicate_threshold=0.9,  # Increase threshold
    semantic_similarity_threshold=0.8
)
Slow Processing
# Enable parallel processing
config = DeduplicationConfig(
    enable_parallel_processing=True,
    batch_size=200,  # Increase batch size
    max_workers=4
)
Memory Issues
# Reduce batch size and enable streaming
config = DeduplicationConfig(
    batch_size=50,
    enable_streaming=True,
    max_memory_usage="2GB"
)
Next Steps
- Deduplication Monitoring - Advanced monitoring setup
- Deduplication Configuration - Production configuration
- Impact Analysis - Detailed impact measurement
- Optimization Guide - Performance optimization
Summary
This example demonstrated:
- Document Processing - Fingerprinting and batch processing
- Duplicate Detection - Multi-algorithm duplicate identification
- Hierarchical Deduplication - Document, chunk, and sentence-level processing
- Manual Review - Human-in-the-loop validation workflows
- Impact Analysis - Comprehensive impact measurement
- Monitoring - Real-time deduplication metrics
- Optimization - Performance tuning and configuration
You now have a complete deduplication system that efficiently identifies and manages duplicate content in your RAG application!