Query Expansion Optimization Guide

Performance Optimization Strategies

Database Optimization

Indexing Strategy

-- Primary indexes for fast lookups
CREATE INDEX idx_synonyms_term_domain ON synonyms(term, domain);
CREATE INDEX idx_synonyms_confidence ON synonyms(confidence);
CREATE INDEX idx_synonyms_usage ON synonyms(usage_count);

-- Composite indexes for complex queries
CREATE INDEX idx_synonyms_domain_role ON synonyms(domain, user_role);
CREATE INDEX idx_synonyms_domain_dept ON synonyms(domain, department);

-- History table indexes
CREATE INDEX idx_expansion_history_user_date ON expansion_history(user_id, created_at);
CREATE INDEX idx_expansion_history_type ON expansion_history(expansion_type);
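
If the table and indexes above are in place, SQLite's EXPLAIN QUERY PLAN confirms whether the hot lookup path actually uses them. A minimal sketch, assuming a local synonyms.db file containing the schema above:

# Hypothetical check that the composite index covers the hot lookup path.
import sqlite3

with sqlite3.connect("synonyms.db") as conn:
    plan = conn.execute(
        "EXPLAIN QUERY PLAN "
        "SELECT synonym FROM synonyms WHERE term = ? AND domain = ?",
        ("db", "technical"),
    ).fetchall()
    for row in plan:
        print(row)  # expect: SEARCH synonyms USING INDEX idx_synonyms_term_domain ...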

Query Optimization

# Use prepared statements for repeated queries
async def get_synonyms_optimized(self, term: str, domain: str):
    query = """
        SELECT term, synonym, domain, confidence, relevance_score, usage_count,
               success_rate, source, context, user_role, department, metadata,
               created_at, updated_at
        FROM synonyms
        WHERE term = ? AND domain = ? AND confidence > 0.5
        ORDER BY confidence DESC, relevance_score DESC
        LIMIT 10
    """
    # Use connection pooling and prepared statements (see the sketch below)
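
One possible completion of the stub above, using aiosqlite as the async driver (an assumption; any pooled async driver works the same way). Python's sqlite3 module keeps a per-connection statement cache, so reusing the same SQL string effectively reuses the prepared statement:

import aiosqlite  # assumed dependency; swap in your pooled driver of choice

async def get_synonyms_optimized(db_path: str, term: str, domain: str):
    async with aiosqlite.connect(db_path) as db:
        async with db.execute(
            """
            SELECT term, synonym, confidence, relevance_score
            FROM synonyms
            WHERE term = ? AND domain = ? AND confidence > 0.5
            ORDER BY confidence DESC, relevance_score DESC
            LIMIT 10
            """,
            (term, domain),
        ) as cursor:
            return await cursor.fetchall()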

Caching Implementation

Redis Caching

import pickle
import redis  # assumes the redis-py client is installed
from typing import List

class CachedSynonymDatabase:
    def __init__(self, db_path: str, redis_url: str = "redis://localhost:6379"):
        self.db = SynonymDatabase(db_path)
        self.redis = redis.from_url(redis_url)
        self.cache_ttl = 300  # 5 minutes

    async def get_synonyms(self, term: str, domain: str) -> List[Synonym]:
        cache_key = f"synonyms:{term}:{domain}"

        # Try cache first (pickle is safe here only because we wrote the data)
        cached = self.redis.get(cache_key)
        if cached:
            return pickle.loads(cached)

        # Fall back to the database
        synonyms = await self.db.get_synonyms(term, domain)

        # Cache the result with a TTL
        self.redis.setex(cache_key, self.cache_ttl, pickle.dumps(synonyms))

        return synonyms
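
Writes should invalidate the corresponding cache key so a stale entry does not survive until the TTL expires. A minimal sketch of a write-through method for the same class, assuming the key scheme above:

    async def add_synonym(self, synonym: Synonym) -> bool:
        # Write through to the database, then drop the now-stale cache entry
        ok = await self.db.add_synonym(synonym)
        if ok:
            self.redis.delete(f"synonyms:{synonym.term}:{synonym.domain}")
        return ok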

In-Memory Caching

from functools import lru_cache
from typing import Dict, List

class MemoryCachedExpansionSystem:
    def __init__(self, synonym_db):
        # synonym_db is assumed to expose a synchronous get_synonyms(term, domain)
        self.synonym_db = synonym_db
        self.expansion_cache: Dict[str, List[ExpansionResult]] = {}
        self.max_cache_size = 10000

    @lru_cache(maxsize=1000)
    def get_cached_synonyms(self, term: str, domain: str) -> List[Synonym]:
        # Note: lru_cache on a method keys on (self, term, domain) and keeps
        # the instance alive for the lifetime of the cache
        return self.synonym_db.get_synonyms(term, domain)

    def cache_expansion(self, query: str, context_hash: str, expansions: List[ExpansionResult]):
        cache_key = f"{query}:{context_hash}"
        if len(self.expansion_cache) < self.max_cache_size:
            self.expansion_cache[cache_key] = expansions

Parallel Processing

Async Strategy Execution

import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict

logger = logging.getLogger(__name__)

class ParallelExpansionSystem:
    def __init__(self, strategies: Dict[ExpansionType, Any], max_workers: int = 4):
        # strategies maps ExpansionType -> strategy object with expand_query()
        self.strategies = strategies
        self.executor = ThreadPoolExecutor(max_workers=max_workers)  # for CPU-bound strategies

    async def expand_query_parallel(self, query: str, context: ExpansionContext):
        # Launch all strategies concurrently
        tasks = []
        for strategy_type, strategy in self.strategies.items():
            task = asyncio.create_task(
                self._run_strategy_safe(strategy, query, context)
            )
            tasks.append((strategy_type, task))

        # Collect results, tolerating individual failures
        results = []
        for strategy_type, task in tasks:
            try:
                expansions = await task
                results.extend(expansions)
            except Exception as e:
                logger.warning(f"Strategy {strategy_type} failed: {e}")

        return results

    async def _run_strategy_safe(self, strategy, query: str, context: ExpansionContext):
        try:
            return await strategy.expand_query(query, context)
        except Exception as e:
            logger.error(f"Strategy execution failed: {e}")
            return []
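
An equivalent formulation uses asyncio.gather; with return_exceptions=True a single failing strategy surfaces as a value instead of cancelling the others:

    async def expand_query_parallel_gather(self, query: str, context: ExpansionContext):
        # Sketch of the same fan-out via gather instead of explicit tasks
        items = list(self.strategies.items())
        outcomes = await asyncio.gather(
            *(strategy.expand_query(query, context) for _, strategy in items),
            return_exceptions=True,
        )
        results = []
        for (strategy_type, _), outcome in zip(items, outcomes):
            if isinstance(outcome, Exception):
                logger.warning(f"Strategy {strategy_type} failed: {outcome}")
            else:
                results.extend(outcome)
        return results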

Memory Optimization

Efficient Data Structures

from dataclasses import dataclass
from typing import NamedTuple
import sys

# Use NamedTuple for immutable data
class SynonymData(NamedTuple):
    term: str
    synonym: str
    domain: str
    confidence: float
    relevance_score: float

# Use __slots__ for memory efficiency (on Python 3.10+, @dataclass(slots=True)
# achieves the same thing without listing the names twice)
@dataclass
class OptimizedSynonym:
    __slots__ = ['term', 'synonym', 'domain', 'confidence', 'relevance_score', 'usage_count']

    term: str
    synonym: str
    domain: str
    confidence: float
    relevance_score: float
    usage_count: int
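
The savings can be spot-checked with sys.getsizeof (which is why sys is imported above); exact numbers vary by Python version, but the slotted instance avoids the per-instance __dict__ entirely:

# Rough per-instance comparison; results are illustrative, not guaranteed.
plain = SynonymData("db", "database", "technical", 0.9, 0.8)
slotted = OptimizedSynonym("db", "database", "technical", 0.9, 0.8, 42)

print(sys.getsizeof(plain))    # compact tuple layout
print(sys.getsizeof(slotted))  # no __dict__; a regular instance would also
                               # carry sys.getsizeof(instance.__dict__)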

Lazy Loading

import sqlite3
from collections import defaultdict
from typing import List

class LazySynonymLoader:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._synonyms_cache = {}
        self._loaded_domains = set()

    async def get_synonyms(self, term: str, domain: str) -> List[Synonym]:
        # Load the domain's synonyms on first access only
        if domain not in self._loaded_domains:
            await self._load_domain_synonyms(domain)

        return self._synonyms_cache.get(domain, {}).get(term, [])

    async def _load_domain_synonyms(self, domain: str):
        # Load all synonyms for the domain in one query; the column list
        # matches SynonymData's five fields exactly
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT term, synonym, domain, confidence, relevance_score
                FROM synonyms WHERE domain = ?
            """, (domain,))

            domain_synonyms = defaultdict(list)
            for row in cursor.fetchall():
                synonym = SynonymData(*row)
                domain_synonyms[synonym.term].append(synonym)

        self._synonyms_cache[domain] = domain_synonyms
        self._loaded_domains.add(domain)

Quality Optimization

Synonym Quality Scoring

Multi-Factor Quality Score

def calculate_synonym_quality(synonym: Synonym) -> float:
    """Calculate comprehensive quality score for synonym."""

    # Base confidence score (0-1)
    confidence_score = synonym.confidence

    # Usage-based score (0-1)
    usage_score = min(synonym.usage_count / 100, 1.0)

    # Success rate score (0-1)
    success_score = synonym.success_rate

    # Relevance score (0-1)
    relevance_score = synonym.relevance_score

    # Source credibility (0-1)
    source_scores = {
        'manual_curation': 1.0,
        'domain_expert': 0.9,
        'user_feedback': 0.7,
        'ml_discovery': 0.6,
        'collaborative': 0.5
    }
    source_score = source_scores.get(synonym.source, 0.5)

    # Weighted combination
    quality_score = (
        confidence_score * 0.3 +
        usage_score * 0.2 +
        success_score * 0.2 +
        relevance_score * 0.2 +
        source_score * 0.1
    )

    return min(1.0, quality_score)

Dynamic Quality Thresholds

class AdaptiveQualityThresholds:
    def __init__(self):
        self.base_threshold = 0.6
        self.adaptive_factor = 0.1

    def get_threshold(self, domain: str, user_role: str) -> float:
        """Get quality threshold based on context."""

        # Domain-specific adjustments (positive = stricter)
        domain_adjustments = {
            'technical': 0.1,   # Higher standards for technical terms
            'medical': 0.15,    # Very high standards for medical terms
            'legal': 0.12,      # High standards for legal terms
            'general': 0.0      # Standard threshold
        }

        # Role-specific adjustments (positive = stricter)
        role_adjustments = {
            'expert': -0.05,    # Experts can tolerate lower-quality expansions
            'beginner': 0.05,   # Beginners need higher-quality expansions
            'general': 0.0      # Standard threshold
        }

        adjustment = (
            domain_adjustments.get(domain, 0.0) +
            role_adjustments.get(user_role, 0.0)
        )

        return max(0.3, min(0.9, self.base_threshold + adjustment))
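
The two pieces compose naturally: score each candidate, then keep only those that clear the context-dependent bar. A hypothetical helper combining the functions above:

def filter_by_quality(synonyms: List[Synonym], domain: str, user_role: str,
                      thresholds: AdaptiveQualityThresholds) -> List[Synonym]:
    # Keep only synonyms whose composite quality clears the contextual threshold
    threshold = thresholds.get_threshold(domain, user_role)
    return [s for s in synonyms if calculate_synonym_quality(s) >= threshold]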

Expansion Strategy Optimization

Dynamic Strategy Selection

import re
from collections import defaultdict
from typing import Dict, List

class IntelligentStrategySelector:
    def __init__(self):
        self.strategy_performance = defaultdict(list)
        self.query_patterns = {
            'acronym_heavy': r'\b[A-Z]{2,6}\b',
            'technical': r'\b(API|SQL|HTTP|XML|JSON)\b',
            'long_query': r'^.{50,}$',
            'short_query': r'^.{1,10}$'
        }

    def select_strategies(self, query: str, context: ExpansionContext) -> List[ExpansionType]:
        """Select optimal strategies based on query characteristics."""

        strategies = []

        # Always include synonym expansion
        strategies.append(ExpansionType.SYNONYM_EXPANSION)

        # Pattern-based selection
        if self._matches_pattern(query, 'acronym_heavy'):
            strategies.append(ExpansionType.ACRONYM_EXPANSION)

        if self._matches_pattern(query, 'technical'):
            strategies.append(ExpansionType.ACRONYM_EXPANSION)
            strategies.append(ExpansionType.CONTEXTUAL)

        if self._matches_pattern(query, 'long_query'):
            strategies.append(ExpansionType.SEMANTIC)

        if self._matches_pattern(query, 'short_query'):
            strategies.append(ExpansionType.CONTEXTUAL)

        # Drop duplicates while preserving order (a strategy can match twice)
        strategies = list(dict.fromkeys(strategies))

        # Performance-based selection: drop strategies that underperform in this domain
        domain_performance = self._get_domain_performance(context.domain)
        for strategy in strategies.copy():
            if domain_performance.get(strategy, 0) < 0.5:
                strategies.remove(strategy)

        return strategies

    def _matches_pattern(self, query: str, pattern_name: str) -> bool:
        pattern = self.query_patterns.get(pattern_name)
        if pattern:
            return bool(re.search(pattern, query))
        return False

    def _get_domain_performance(self, domain: str) -> Dict[ExpansionType, float]:
        """Get strategy performance for domain."""
        # In practice, this would query performance metrics
        return {
            ExpansionType.SYNONYM_EXPANSION: 0.8,
            ExpansionType.ACRONYM_EXPANSION: 0.7,
            ExpansionType.SEMANTIC: 0.6,
            ExpansionType.CONTEXTUAL: 0.75
        }

Confidence-Based Filtering

class ConfidenceFilter:
    def __init__(self):
        self.min_confidence = 0.5
        self.max_expansions = 5

    def filter_expansions(self, expansions: List[ExpansionResult]) -> List[ExpansionResult]:
        """Filter expansions by confidence and relevance."""

        # Score each expansion with a weighted composite
        scored_expansions = []
        for expansion in expansions:
            score = (
                expansion.confidence_score * 0.6 +
                expansion.relevance_score * 0.4
            )
            scored_expansions.append((score, expansion))

        # Sort by score (descending)
        scored_expansions.sort(key=lambda x: x[0], reverse=True)

        # Filter by confidence and limit count
        filtered = []
        for score, expansion in scored_expansions:
            if (expansion.confidence_score >= self.min_confidence and
                    len(filtered) < self.max_expansions):
                filtered.append(expansion)

        return filtered

Learning and Adaptation

User Feedback Integration

from collections import defaultdict

class AdaptiveLearningSystem:
    def __init__(self, synonym_db):
        # synonym_db is assumed to expose an async add_synonym() upsert
        self.synonym_db = synonym_db
        self.feedback_weights = defaultdict(float)
        self.learning_rate = 0.1

    async def process_feedback(self, expansion: ExpansionResult, feedback: Dict[str, Any]):
        """Process user feedback to improve future expansions."""

        was_helpful = feedback.get('was_helpful', False)

        # Update synonym success rates
        for synonym in expansion.synonyms_used:
            if was_helpful:
                synonym.success_rate = min(1.0, synonym.success_rate + self.learning_rate)
            else:
                synonym.success_rate = max(0.0, synonym.success_rate - self.learning_rate)

            # Persist the updated synonym
            await self.synonym_db.add_synonym(synonym)

        # Update strategy weights, clamping at zero so normalization stays valid
        strategy = expansion.expansion_type
        if was_helpful:
            self.feedback_weights[strategy] += self.learning_rate
        else:
            self.feedback_weights[strategy] = max(
                0.0, self.feedback_weights[strategy] - self.learning_rate
            )

        # Normalize weights
        self._normalize_weights()

    def _normalize_weights(self):
        """Normalize feedback weights to prevent drift."""
        total_weight = sum(self.feedback_weights.values())
        if total_weight > 0:
            for strategy in self.feedback_weights:
                self.feedback_weights[strategy] /= total_weight

Continuous Improvement

class ContinuousImprovementSystem:
    def __init__(self, analytics):
        # analytics is assumed to expose an async get_expansion_metrics()
        self.analytics = analytics
        self.improvement_threshold = 0.05
        self.check_interval = 24 * 60 * 60  # 24 hours, in seconds

    async def run_improvement_cycle(self):
        """Run one continuous-improvement cycle."""

        # Analyze performance metrics
        metrics = await self.analytics.get_expansion_metrics()

        # Identify improvement opportunities
        improvements = self._identify_improvements(metrics)

        # Apply improvements
        for improvement in improvements:
            await self._apply_improvement(improvement)

    def _identify_improvements(self, metrics) -> List[Dict[str, Any]]:
        """Identify potential improvements."""
        improvements = []

        # Low success rate: loosen the confidence threshold
        if metrics.success_rate < 0.6:
            improvements.append({
                'type': 'confidence_threshold',
                'action': 'lower_threshold',
                'current': 0.5,
                'suggested': 0.4
            })

        # High false-positive rate: tighten the relevance threshold
        if metrics.false_positive_rate > 0.3:
            improvements.append({
                'type': 'relevance_threshold',
                'action': 'raise_threshold',
                'current': 0.6,
                'suggested': 0.7
            })

        return improvements

    async def _apply_improvement(self, improvement: Dict[str, Any]):
        """Apply a specific improvement."""
        if improvement['type'] == 'confidence_threshold':
            # Update the expansion system's configuration
            pass
        elif improvement['type'] == 'relevance_threshold':
            # Update the expansion system's configuration
            pass
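
check_interval is defined above but not yet consumed; a simple driver loop (a sketch, assuming the system lives inside a long-running asyncio task and logger is configured as earlier) ties it together:

import asyncio

async def run_forever(system: ContinuousImprovementSystem):
    # Re-run the improvement cycle once per check_interval (24 hours by default)
    while True:
        try:
            await system.run_improvement_cycle()
        except Exception as e:
            logger.error(f"Improvement cycle failed: {e}")
        await asyncio.sleep(system.check_interval)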

Scalability Optimization

Horizontal Scaling

Database Sharding

class ShardedSynonymDatabase:
    def __init__(self, shard_config: Dict[str, str]):
        self.shards = {}
        for domain, db_path in shard_config.items():
            self.shards[domain] = SynonymDatabase(db_path)
        self.default_shard = self.shards.get('default')

    async def get_synonyms(self, term: str, domain: str) -> List[Synonym]:
        shard = self.shards.get(domain, self.default_shard)
        if shard:
            return await shard.get_synonyms(term, domain)
        return []

    async def add_synonym(self, synonym: Synonym) -> bool:
        shard = self.shards.get(synonym.domain, self.default_shard)
        if shard:
            return await shard.add_synonym(synonym)
        return False
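
Construction is the only part callers see change; the file paths below are illustrative:

# Hypothetical shard layout: one SQLite file per high-traffic domain.
sharded_db = ShardedSynonymDatabase({
    "technical": "synonyms_technical.db",
    "medical": "synonyms_medical.db",
    "default": "synonyms_default.db",  # fallback for unmapped domains
})
# Inside an async context:
#     synonyms = await sharded_db.get_synonyms("db", "technical")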

Load Balancing

class LoadBalancedExpansionSystem:
    def __init__(self, expansion_systems: List[QueryExpansionSystem]):
        self.systems = expansion_systems
        self.current_index = 0
        self.system_health = [True] * len(expansion_systems)

    async def expand_query(self, query: str, context: ExpansionContext):
        """Distribute expansion requests across systems round-robin."""

        # Try each system at most once, starting from the current index;
        # advancing on every attempt keeps traffic spread across systems
        for _ in range(len(self.systems)):
            index = self.current_index
            self.current_index = (self.current_index + 1) % len(self.systems)

            if self.system_health[index]:
                try:
                    return await self.systems[index].expand_query(query, context)
                except Exception as e:
                    logger.error(f"System {index} failed: {e}")
                    self.system_health[index] = False

        raise RuntimeError("All expansion systems are unhealthy")

Vertical Scaling

Memory Optimization

import psutil  # third-party; used for process memory introspection

class MemoryOptimizedExpansionSystem:
    def __init__(self, max_memory_mb: int = 512):
        self.max_memory_mb = max_memory_mb
        # Cache values are dicts carrying a 'last_used' timestamp for eviction
        self.synonym_cache = {}
        self.cache_size_limit = max_memory_mb * 1024 * 1024 // 1000  # rough entry-count estimate

    def _check_memory_usage(self):
        """Check whether the process is within its memory budget."""
        process = psutil.Process()
        memory_mb = process.memory_info().rss / 1024 / 1024
        return memory_mb < self.max_memory_mb

    def _evict_cache_if_needed(self):
        """Evict the least recently used cache entries."""
        if len(self.synonym_cache) > self.cache_size_limit:
            # Order entries by recency of use
            sorted_items = sorted(
                self.synonym_cache.items(),
                key=lambda x: x[1]['last_used']
            )

            # Remove the oldest 20% of the cache
            remove_count = len(sorted_items) // 5
            for key, _ in sorted_items[:remove_count]:
                del self.synonym_cache[key]

Monitoring and Alerting

Performance Monitoring

from collections import defaultdict

class ExpansionPerformanceMonitor:
    def __init__(self):
        self.metrics = {
            'response_times': [],
            'success_rates': [],
            'error_counts': defaultdict(int),
            'throughput': 0
        }

    def record_expansion(self, response_time: float, success: bool, error: str = None):
        """Record expansion performance metrics."""
        self.metrics['response_times'].append(response_time)
        self.metrics['success_rates'].append(success)
        self.metrics['throughput'] += 1

        if error:
            self.metrics['error_counts'][error] += 1

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get performance summary (per-minute throughput assumes a one-hour window)."""
        response_times = self.metrics['response_times']
        success_rates = self.metrics['success_rates']

        return {
            'avg_response_time': sum(response_times) / len(response_times) if response_times else 0,
            'p95_response_time': sorted(response_times)[int(len(response_times) * 0.95)] if response_times else 0,
            'success_rate': sum(success_rates) / len(success_rates) if success_rates else 0,
            'throughput_per_minute': self.metrics['throughput'] / 60,
            'error_distribution': dict(self.metrics['error_counts'])
        }

Alerting System

class ExpansionAlertingSystem:
    def __init__(self):
        # Thresholds assume response times are recorded in milliseconds
        self.alert_thresholds = {
            'response_time_ms': 1000,
            'success_rate': 0.7,
            'error_rate': 0.1
        }
        self.alert_handlers = []

    def add_alert_handler(self, handler):
        """Register an alert handler (see the example below)."""
        self.alert_handlers.append(handler)

    async def check_alerts(self, metrics: Dict[str, Any]):
        """Check for alert conditions."""
        alerts = []

        # Response time alert
        if metrics['avg_response_time'] > self.alert_thresholds['response_time_ms']:
            alerts.append({
                'type': 'high_response_time',
                'message': f"Average response time {metrics['avg_response_time']:.2f}ms exceeds threshold",
                'severity': 'warning'
            })

        # Success rate alert
        if metrics['success_rate'] < self.alert_thresholds['success_rate']:
            alerts.append({
                'type': 'low_success_rate',
                'message': f"Success rate {metrics['success_rate']:.2%} below threshold",
                'severity': 'critical'
            })

        # Send alerts through every registered handler
        for alert in alerts:
            for handler in self.alert_handlers:
                await handler.send_alert(alert)
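
A handler only needs an async send_alert(alert) method. A minimal logging-based handler (a sketch; logging configuration assumed) might look like:

import logging
from typing import Any, Dict

logger = logging.getLogger(__name__)

class LoggingAlertHandler:
    """Writes alerts to the application log."""

    async def send_alert(self, alert: Dict[str, Any]):
        level = logging.CRITICAL if alert['severity'] == 'critical' else logging.WARNING
        logger.log(level, f"[{alert['type']}] {alert['message']}")

alerting = ExpansionAlertingSystem()
alerting.add_alert_handler(LoggingAlertHandler())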

This optimization guide provides comprehensive strategies for improving the performance, quality, and scalability of the query expansion system.