Deduplication

This example demonstrates the complete deduplication workflow: document processing, duplicate detection with multiple algorithms, manual review, impact analysis, and monitoring.

Overview

The deduplication system provides:

  • Multiple Detection Algorithms - MinHash, SimHash, and semantic similarity
  • Hierarchical Processing - Document, chunk, and sentence-level deduplication
  • Manual Review Workflows - Human-in-the-loop validation
  • Impact Analysis - Measure deduplication effectiveness
  • Monitoring Dashboard - Real-time deduplication metrics
  • Optimization Tools - Performance tuning and configuration

Prerequisites

  • RecoAgent installed and configured
  • Deduplication packages available
  • Sample documents for testing
  • Python 3.8+ with required dependencies

Basic Setup

1. Initialize Deduplication System

from datetime import datetime, timedelta  # used by the review workflow methods below

from packages.rag.deduplication import (
    DeduplicationConfig, DocumentDeduplicator, HierarchicalDeduplicator,
    SimilarityAlgorithm, DeduplicationAction
)
from packages.rag.deduplication_review import (
    ReviewWorkflowManager, ReviewerProfile, ReviewDecision, ReviewPriority
)
from packages.rag.deduplication_analytics import ImpactAnalyzer
from packages.rag.deduplication_tools import DeduplicationAnalyzer, ManualReviewInterface
from packages.rag.deduplication_monitoring import DeduplicationMonitoringDashboard

class DeduplicationExample:
    """Example implementation of the deduplication system."""

    def __init__(self):
        # Configure deduplication system
        self.config = DeduplicationConfig(
            # Similarity thresholds
            exact_match_threshold=1.0,
            near_duplicate_threshold=0.85,
            semantic_similarity_threshold=0.75,
            manual_review_threshold=0.65,

            # Algorithm settings
            minhash_num_permutations=128,
            minhash_band_size=8,
            simhash_hash_bits=64,
            semantic_model_name="all-MiniLM-L6-v2",

            # Processing settings
            batch_size=100,
            max_candidates_per_doc=50,
            enable_parallel_processing=True
        )

        # Initialize deduplication components
        self.deduplicator = DocumentDeduplicator(self.config)
        self.hierarchical_deduplicator = HierarchicalDeduplicator(self.config)
        self.review_manager = ReviewWorkflowManager()
        self.impact_analyzer = ImpactAnalyzer()
        self.monitoring_dashboard = DeduplicationMonitoringDashboard()

2. Load Sample Documents

def load_sample_documents(self):
    """Load sample documents for deduplication testing."""
    print("šŸ“„ Loading sample documents...")

    sample_documents = [
        {
            "id": "doc_1",
            "content": "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain.",
            "source": "introduction.txt",
            "metadata": {"category": "overview", "author": "team"}
        },
        {
            "id": "doc_2",
            "content": "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain. It provides hybrid retrieval capabilities.",
            "source": "features.txt",
            "metadata": {"category": "features", "author": "team"}
        },
        {
            "id": "doc_3",
            "content": "The RecoAgent platform combines retrieval-augmented generation with agent orchestration.",
            "source": "architecture.txt",
            "metadata": {"category": "architecture", "author": "team"}
        },
        {
            "id": "doc_4",
            "content": "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain.",
            "source": "duplicate_intro.txt",
            "metadata": {"category": "overview", "author": "team"}
        }
    ]

    print(f"āœ… Loaded {len(sample_documents)} sample documents")
    return sample_documents

Document Processing

Fingerprint Generation

async def demonstrate_fingerprinting(self, documents):
    """Demonstrate document fingerprinting capabilities."""
    print("\nšŸ” Document Fingerprinting")
    print("=" * 40)

    for doc in documents:
        print(f"\nDocument: {doc['id']}")
        print(f"Content: {doc['content'][:50]}...")

        # Generate fingerprints using different algorithms
        fingerprints = await self.deduplicator.generate_fingerprints(doc)

        print(f"MinHash Fingerprint: {fingerprints.minhash[:20]}...")
        print(f"SimHash Fingerprint: {fingerprints.simhash}")
        print(f"Semantic Embedding: {fingerprints.semantic_embedding[:10]}...")
        print(f"Content Hash: {fingerprints.content_hash}")

    return fingerprints
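
To build intuition for what these fingerprints are, here is a minimal, standalone sketch of a content hash (for exact matches) and a MinHash fingerprint (for near duplicates) using hashlib and the datasketch library. It illustrates the general technique only; it is not the internal implementation of DocumentDeduplicator.

import hashlib

from datasketch import MinHash

def content_hash(text):
    # Exact-match fingerprint: SHA-256 of whitespace-normalized, lowercased text
    normalized = " ".join(text.lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

def minhash_fingerprint(text, num_perm=128):
    # Near-duplicate fingerprint: MinHash over 3-word shingles
    m = MinHash(num_perm=num_perm)
    tokens = text.lower().split()
    for shingle in zip(tokens, tokens[1:], tokens[2:]):
        m.update(" ".join(shingle).encode("utf-8"))
    return m

a = minhash_fingerprint("RecoAgent is an enterprise RAG platform built with LangGraph and LangChain.")
b = minhash_fingerprint("RecoAgent is an enterprise RAG platform built with LangGraph and LangChain. It provides hybrid retrieval capabilities.")
print(f"Estimated Jaccard similarity: {a.jaccard(b):.2f}")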

Batch Processing

async def process_document_batch(self, documents):
    """Process a batch of documents for deduplication."""
    print("\nšŸ“¦ Batch Processing")
    print("=" * 40)

    # Process documents in batches
    batch_size = self.config.batch_size
    processed_docs = []

    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        print(f"Processing batch {i // batch_size + 1}: {len(batch)} documents")

        # Process batch
        batch_results = await self.deduplicator.process_batch(batch)
        processed_docs.extend(batch_results)

        print(f"āœ… Processed {len(batch_results)} documents")

    return processed_docs

Duplicate Detection

Multi-Algorithm Detection

async def detect_duplicates(self, documents):
    """Detect duplicates using multiple algorithms."""
    print("\nšŸ” Duplicate Detection")
    print("=" * 40)

    # Detect exact matches
    exact_matches = await self.deduplicator.detect_exact_matches(documents)
    print(f"Exact matches found: {len(exact_matches)}")

    # Detect near duplicates using MinHash
    near_duplicates = await self.deduplicator.detect_near_duplicates(
        documents,
        algorithm=SimilarityAlgorithm.MINHASH,
        threshold=self.config.near_duplicate_threshold
    )
    print(f"Near duplicates (MinHash): {len(near_duplicates)}")

    # Detect semantic duplicates
    semantic_duplicates = await self.deduplicator.detect_semantic_duplicates(
        documents,
        threshold=self.config.semantic_similarity_threshold
    )
    print(f"Semantic duplicates: {len(semantic_duplicates)}")

    # Combine results
    all_duplicates = self.deduplicator.combine_duplicate_results([
        exact_matches,
        near_duplicates,
        semantic_duplicates
    ])

    print(f"Total duplicate groups: {len(all_duplicates)}")

    return all_duplicates
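
MinHash-based near-duplicate detection typically scales by indexing fingerprints in a locality-sensitive hash (LSH) index, so candidates are found without comparing every pair of documents. Here is a minimal sketch with the datasketch library, again as an illustration of the technique rather than the platform's internals:

from datasketch import MinHash, MinHashLSH

def minhash_of(text, num_perm=128):
    m = MinHash(num_perm=num_perm)
    for token in set(text.lower().split()):
        m.update(token.encode("utf-8"))
    return m

docs = {
    "doc_1": "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain.",
    "doc_3": "The RecoAgent platform combines retrieval-augmented generation with agent orchestration.",
    "doc_4": "RecoAgent is an enterprise RAG platform built with LangGraph and LangChain.",
}

# Index every document once, then query for near-duplicate candidates
lsh = MinHashLSH(threshold=0.85, num_perm=128)
for doc_id, text in docs.items():
    lsh.insert(doc_id, minhash_of(text))

for doc_id, text in docs.items():
    candidates = [c for c in lsh.query(minhash_of(text)) if c != doc_id]
    if candidates:
        print(f"{doc_id} has near-duplicate candidates: {candidates}")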

Hierarchical Deduplication

async def hierarchical_deduplication(self, documents):
    """Demonstrate hierarchical deduplication."""
    print("\nšŸ—ļø Hierarchical Deduplication")
    print("=" * 40)

    # Document-level deduplication
    doc_duplicates = await self.hierarchical_deduplicator.deduplicate_documents(documents)
    print(f"Document-level duplicates: {len(doc_duplicates)}")

    # Chunk-level deduplication
    chunk_duplicates = await self.hierarchical_deduplicator.deduplicate_chunks(documents)
    print(f"Chunk-level duplicates: {len(chunk_duplicates)}")

    # Sentence-level deduplication
    sentence_duplicates = await self.hierarchical_deduplicator.deduplicate_sentences(documents)
    print(f"Sentence-level duplicates: {len(sentence_duplicates)}")

    # Analyze hierarchical results
    analysis = await self.hierarchical_deduplicator.analyze_hierarchical_results({
        'document': doc_duplicates,
        'chunk': chunk_duplicates,
        'sentence': sentence_duplicates
    })

    print("\nHierarchical Analysis:")
    print(f"  Total duplicates found: {analysis['total_duplicates']}")
    print(f"  Processing efficiency: {analysis['efficiency']:.1%}")
    print(f"  Memory savings: {analysis['memory_savings']:.1%}")

    return analysis
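
The sentence level of this hierarchy can be illustrated in a few lines: hash each sentence and keep only its first occurrence across the corpus. A minimal sketch, assuming simple regex sentence splitting; HierarchicalDeduplicator may segment and compare sentences differently.

import hashlib
import re

def dedup_sentences(documents):
    """Keep each sentence only the first time it appears across all documents."""
    seen = set()
    kept = {}
    for doc in documents:
        sentences = re.split(r"(?<=[.!?])\s+", doc["content"])
        kept[doc["id"]] = []
        for sentence in sentences:
            digest = hashlib.sha256(sentence.strip().lower().encode("utf-8")).hexdigest()
            if digest not in seen:
                seen.add(digest)
                kept[doc["id"]].append(sentence)
    return kept

# With the sample documents above, doc_4 keeps no sentences:
# its only sentence already appeared in doc_1.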

Manual Review Workflow

Review Queue Management

async def setup_review_workflow(self, duplicates):
    """Set up manual review workflow for duplicates."""
    print("\nšŸ‘„ Manual Review Workflow")
    print("=" * 40)

    # Create reviewer profiles
    reviewers = [
        ReviewerProfile(
            id="reviewer_1",
            name="John Smith",
            expertise=["technical", "content"],
            max_reviews_per_day=50
        ),
        ReviewerProfile(
            id="reviewer_2",
            name="Jane Doe",
            expertise=["business", "content"],
            max_reviews_per_day=30
        )
    ]

    for reviewer in reviewers:
        await self.review_manager.register_reviewer(reviewer)
        print(f"āœ… Registered reviewer: {reviewer.name}")

    # Create review tasks
    review_tasks = []
    for duplicate_group in duplicates:
        task = await self.review_manager.create_review_task(
            duplicate_group=duplicate_group,
            priority=ReviewPriority.MEDIUM,
            estimated_time_minutes=5
        )
        review_tasks.append(task)

    print(f"āœ… Created {len(review_tasks)} review tasks")

    # Simulate review process
    for i, task in enumerate(review_tasks[:3]):  # Review first 3 tasks
        print(f"\nReviewing task {i + 1}: {task.id}")

        # Simulate reviewer decision
        decision = ReviewDecision(
            task_id=task.id,
            reviewer_id="reviewer_1",
            decision="keep_primary",  # or "merge", "keep_all", "delete_all"
            confidence=0.9,
            comments="Clear duplicate, keeping the primary document",
            timestamp=datetime.now()
        )

        await self.review_manager.submit_review_decision(decision)
        print(f"āœ… Review decision submitted: {decision.decision}")

    return review_tasks

Review Analytics

async def analyze_review_performance(self):
    """Analyze manual review performance."""
    print("\nšŸ“Š Review Performance Analysis")
    print("=" * 40)

    # Get review metrics
    metrics = await self.review_manager.get_review_metrics(
        time_range=timedelta(days=7)
    )

    print("Review Metrics (7 days):")
    print(f"  Total tasks created: {metrics['total_tasks']}")
    print(f"  Tasks completed: {metrics['completed_tasks']}")
    print(f"  Average review time: {metrics['avg_review_time']:.1f} minutes")
    print(f"  Reviewer productivity: {metrics['reviewer_productivity']:.1f} tasks/day")

    # Get decision distribution
    decisions = await self.review_manager.get_decision_distribution()
    print("\nDecision Distribution:")
    for decision, count in decisions.items():
        print(f"  {decision}: {count}")

    # Get reviewer performance
    reviewer_performance = await self.review_manager.get_reviewer_performance()
    print("\nReviewer Performance:")
    for reviewer_id, performance in reviewer_performance.items():
        print(f"  {reviewer_id}: {performance['tasks_completed']} tasks, "
              f"{performance['avg_confidence']:.2f} confidence")

Impact Analysis

Deduplication Impact

async def analyze_deduplication_impact(self, documents, duplicates):
    """Analyze the impact of deduplication."""
    print("\nšŸ“ˆ Impact Analysis")
    print("=" * 40)

    # Calculate storage impact
    storage_impact = await self.impact_analyzer.calculate_storage_impact(
        original_documents=documents,
        duplicate_groups=duplicates
    )

    print("Storage Impact:")
    print(f"  Original size: {storage_impact['original_size']:.2f} MB")
    print(f"  After deduplication: {storage_impact['deduplicated_size']:.2f} MB")
    print(f"  Space saved: {storage_impact['space_saved']:.2f} MB ({storage_impact['space_saved_percent']:.1%})")

    # Calculate performance impact
    performance_impact = await self.impact_analyzer.calculate_performance_impact(
        original_documents=documents,
        duplicate_groups=duplicates
    )

    print("\nPerformance Impact:")
    print(f"  Indexing time reduction: {performance_impact['indexing_time_reduction']:.1%}")
    print(f"  Search time improvement: {performance_impact['search_time_improvement']:.1%}")
    print(f"  Memory usage reduction: {performance_impact['memory_reduction']:.1%}")

    # Calculate quality impact
    quality_impact = await self.impact_analyzer.calculate_quality_impact(
        original_documents=documents,
        duplicate_groups=duplicates
    )

    print("\nQuality Impact:")
    print(f"  Content diversity: {quality_impact['content_diversity']:.2f}")
    print(f"  Information density: {quality_impact['information_density']:.2f}")
    print(f"  Relevance preservation: {quality_impact['relevance_preservation']:.2f}")

    return {
        'storage': storage_impact,
        'performance': performance_impact,
        'quality': quality_impact
    }
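
The storage arithmetic behind a report like this is straightforward: sum document sizes, then subtract everything that would be removed. A minimal sketch, assuming each duplicate group is a list of document IDs whose first entry is the primary kept copy (the actual group structure used by ImpactAnalyzer may differ):

def storage_savings(documents, duplicate_groups):
    """Estimate space saved by keeping one primary document per duplicate group."""
    size_by_id = {d["id"]: len(d["content"].encode("utf-8")) for d in documents}
    original = sum(size_by_id.values())
    # Every group member except the first (primary) is removed
    removed = sum(size_by_id[doc_id] for group in duplicate_groups for doc_id in group[1:])
    return {
        "original_bytes": original,
        "deduplicated_bytes": original - removed,
        "space_saved_percent": removed / original if original else 0.0,
    }

print(storage_savings(
    [{"id": "doc_1", "content": "same text"}, {"id": "doc_4", "content": "same text"}],
    [["doc_1", "doc_4"]]
))  # -> 50% saved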

Monitoring and Optimization

Real-time Monitoring

async def setup_monitoring(self):
    """Set up real-time deduplication monitoring."""
    print("\nšŸ“Š Monitoring Dashboard")
    print("=" * 40)

    # Create monitoring dashboard
    dashboard_config = {
        'title': 'Deduplication Monitoring Dashboard',
        'refresh_interval': 30,  # seconds
        'widgets': [
            {
                'type': 'duplicate_detection_rate',
                'title': 'Duplicate Detection Rate',
                'time_range': '24h'
            },
            {
                'type': 'processing_performance',
                'title': 'Processing Performance',
                'time_range': '1h'
            },
            {
                'type': 'review_queue_status',
                'title': 'Review Queue Status',
                'time_range': 'realtime'
            },
            {
                'type': 'impact_metrics',
                'title': 'Impact Metrics',
                'time_range': '7d'
            }
        ]
    }

    dashboard_url = await self.monitoring_dashboard.create_dashboard(dashboard_config)
    print(f"āœ… Monitoring dashboard created: {dashboard_url}")

    # Get current metrics
    metrics = await self.monitoring_dashboard.get_current_metrics()
    print("\nCurrent Metrics:")
    print(f"  Documents processed: {metrics['documents_processed']}")
    print(f"  Duplicates detected: {metrics['duplicates_detected']}")
    print(f"  Processing rate: {metrics['processing_rate']:.1f} docs/sec")
    print(f"  Review queue size: {metrics['review_queue_size']}")

    return dashboard_url

Performance Optimization

async def optimize_performance(self):
    """Optimize deduplication performance."""
    print("\n⚔ Performance Optimization")
    print("=" * 40)

    # Get performance recommendations
    recommendations = await self.deduplicator.get_performance_recommendations()

    print("Performance Recommendations:")
    for rec in recommendations:
        print(f"  • {rec['description']}")
        print(f"    Expected improvement: {rec['expected_improvement']:.1%}")
        print(f"    Implementation effort: {rec['effort']}")

    # Apply optimizations
    optimization_results = await self.deduplicator.apply_optimizations([
        'enable_parallel_processing',
        'optimize_batch_size',
        'enable_caching'
    ])

    print("\nApplied Optimizations:")
    for opt, result in optimization_results.items():
        print(f"  {opt}: {result['status']} - {result['performance_gain']:.1%} improvement")

    return optimization_results

Complete Example

Here's a complete working example:

#!/usr/bin/env python3
"""
Complete Deduplication System Example
"""

import asyncio
from datetime import datetime, timedelta

# Assumes the DeduplicationExample class from the setup section above
# is defined in this module (or imported from it).

async def main():
    """Main demo function."""
    print("šŸ”„ Comprehensive Deduplication System Demo")
    print("=" * 50)

    # Initialize deduplication system
    dedup_example = DeduplicationExample()

    # Load sample documents
    documents = dedup_example.load_sample_documents()

    # Demonstrate fingerprinting
    await dedup_example.demonstrate_fingerprinting(documents)

    # Process documents
    processed_docs = await dedup_example.process_document_batch(documents)

    # Detect duplicates
    duplicates = await dedup_example.detect_duplicates(documents)

    # Hierarchical deduplication
    hierarchical_analysis = await dedup_example.hierarchical_deduplication(documents)

    # Set up review workflow
    review_tasks = await dedup_example.setup_review_workflow(duplicates)

    # Analyze review performance
    await dedup_example.analyze_review_performance()

    # Analyze impact
    impact_analysis = await dedup_example.analyze_deduplication_impact(documents, duplicates)

    # Set up monitoring
    dashboard_url = await dedup_example.setup_monitoring()

    # Optimize performance
    await dedup_example.optimize_performance()

    print("\nšŸŽ‰ Deduplication Demo Complete!")
    print(f"šŸ“Š Dashboard available at: {dashboard_url}")

if __name__ == "__main__":
    asyncio.run(main())

Best Practices

1. Algorithm Selection

  • Use exact matching for perfect duplicates
  • Use MinHash for near-duplicate detection
  • Use semantic similarity for paraphrased or reworded content
  • Combine multiple algorithms for the best results, as in the cascade sketched below
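
A minimal sketch of such a cascade: run the cheapest check first and only fall through to embeddings when needed. It assumes the datasketch and sentence-transformers libraries and reuses the thresholds from the configuration above; the helper names are hypothetical.

import hashlib

from datasketch import MinHash
from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer("all-MiniLM-L6-v2")

def normalized_hash(text):
    return hashlib.sha256(" ".join(text.lower().split()).encode("utf-8")).hexdigest()

def minhash_of(text):
    m = MinHash(num_perm=128)
    for token in set(text.lower().split()):
        m.update(token.encode("utf-8"))
    return m

def classify_pair(a, b):
    """Cheapest check first; embeddings only as a last resort."""
    if normalized_hash(a) == normalized_hash(b):
        return "exact_match"
    if minhash_of(a).jaccard(minhash_of(b)) >= 0.85:
        return "near_duplicate"
    if util.cos_sim(model.encode(a), model.encode(b)).item() >= 0.75:
        return "semantic_duplicate"
    return "unique"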

2. Threshold Tuning

  • Start with conservative thresholds
  • Monitor false positive rates
  • Adjust based on your data characteristics
  • Re-tune thresholds regularly as your corpus evolves, for example with a sweep like the one below
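
A minimal sketch of a threshold sweep over a small hand-labeled set of pairs. Here similarity_fn is any pairwise similarity score (MinHash Jaccard, embedding cosine, and so on), and the labeled pairs are something you assemble yourself:

def sweep_thresholds(labeled_pairs, similarity_fn, thresholds=(0.6, 0.7, 0.8, 0.9)):
    """labeled_pairs: list of (text_a, text_b, is_duplicate) tuples."""
    for t in thresholds:
        results = [(similarity_fn(a, b) >= t, truth) for a, b, truth in labeled_pairs]
        false_pos = sum(1 for pred, truth in results if pred and not truth)
        false_neg = sum(1 for pred, truth in results if not pred and truth)
        print(f"threshold={t:.2f}  false_positives={false_pos}  false_negatives={false_neg}")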

3. Review Workflow

  • Prioritize high-confidence duplicates
  • Provide clear review guidelines
  • Track reviewer performance
  • Regular training and feedback

4. Performance Optimization

  • Use parallel processing for large datasets (see the sketch after this list)
  • Optimize batch sizes
  • Enable caching for repeated operations
  • Monitor resource usage
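
As one illustration of the parallel-processing point above, batches of documents can be fingerprinted across worker processes with the standard library. This is a generic sketch, independent of the RecoAgent APIs:

import hashlib
from concurrent.futures import ProcessPoolExecutor

def fingerprint_batch(batch):
    # CPU-bound work done inside each worker process
    return [hashlib.sha256(text.encode("utf-8")).hexdigest() for text in batch]

def parallel_fingerprints(texts, batch_size=100, workers=4):
    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    with ProcessPoolExecutor(max_workers=workers) as pool:
        return [fp for batch in pool.map(fingerprint_batch, batches) for fp in batch]

if __name__ == "__main__":
    print(len(parallel_fingerprints([f"document {i}" for i in range(1000)])))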

Troubleshooting

Common Issues

High False Positive Rate

# Adjust similarity thresholds
config = DeduplicationConfig(
    near_duplicate_threshold=0.9,  # Increase threshold
    semantic_similarity_threshold=0.8
)

Slow Processing

# Enable parallel processing
config = DeduplicationConfig(
    enable_parallel_processing=True,
    batch_size=200,  # Increase batch size
    max_workers=4
)

Memory Issues

# Reduce batch size and enable streaming
config = DeduplicationConfig(
    batch_size=50,
    enable_streaming=True,
    max_memory_usage="2GB"
)

Next Steps

  1. šŸ“Š Deduplication Monitoring - Advanced monitoring setup
  2. šŸ”§ Deduplication Configuration - Production configuration
  3. šŸ“ˆ Impact Analysis - Detailed impact measurement
  4. šŸŽÆ Optimization Guide - Performance optimization

Summary

This example demonstrated:

  • āœ… Document Processing - Fingerprinting and batch processing
  • āœ… Duplicate Detection - Multi-algorithm duplicate identification
  • āœ… Hierarchical Deduplication - Document, chunk, and sentence-level processing
  • āœ… Manual Review - Human-in-the-loop validation workflows
  • āœ… Impact Analysis - Comprehensive impact measurement
  • āœ… Monitoring - Real-time deduplication metrics
  • āœ… Optimization - Performance tuning and configuration

You now have a complete deduplication system that efficiently identifies and manages duplicate content in your RAG application!