Clarification System Architecture
This document provides an overview of the contextual clarification system architecture and explains how it integrates with the enterprise RAG system to detect ambiguous queries, generate follow-up questions, and preserve context across interactions.
Table of Contents
- System Overview
- Core Components
- Data Flow
- Integration Points
- Configuration Options
- Deployment Considerations
System Overview
The clarification system is designed to address the common problem of ambiguous queries in enterprise RAG applications. It provides intelligent clarification capabilities that help users get better answers by asking relevant follow-up questions while preserving context across interactions.
Key Features
- Intelligent Ambiguity Detection: Identifies various types of ambiguity in user queries
- Contextual Question Generation: Creates helpful clarification questions based on available context
- Progressive Disclosure: Provides partial answers while asking for clarification
- Context Preservation: Maintains conversation context across clarification exchanges
- User Preference Learning: Adapts clarification behavior based on user preferences
- Smart Suggestions: Offers intelligent suggestions for common clarifications
- Comprehensive Analytics: Tracks effectiveness and user satisfaction
- A/B Testing Framework: Enables testing of different clarification approaches
Core Components
1. Ambiguity Detection System
The ambiguity detection system analyzes queries to identify various types of ambiguity:
```python
from packages.rag.clarification_system import AmbiguityDetector, AmbiguityType

detector = AmbiguityDetector()

# Detect ambiguity in a query
result = await detector.detect_ambiguity(
    query="How do I fix it?",
    context={"recent_messages": [{"content": "API is broken", "type": "user"}]},
    domain="technical"
)

print(f"Is ambiguous: {result.is_ambiguous}")
print(f"Ambiguity types: {result.ambiguity_types}")
print(f"Confidence scores: {result.confidence_scores}")
```
Ambiguity Types
- Pronoun Reference: "it", "this", "that" without clear antecedents
- Context Dependent: "previous", "above", "next" requiring conversation context
- Incomplete Query: Missing key information like "How to" without specifics
- Multiple Interpretations: Queries that could mean different things
- Vague Terms: "good", "better", "best" without clear criteria
- Domain Specific: Technical terms needing clarification
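As a rough illustration of how these ambiguity types can be flagged, the sketch below uses simple keyword heuristics. It is not the detector's actual implementation; the term lists and scoring are illustrative assumptions only.

```python
import re

# Illustrative keyword lists; the real AmbiguityDetector is assumed to use richer
# signals (coreference, domain lexicons, conversation history) than these heuristics.
PRONOUNS = {"it", "this", "that", "they", "them"}
VAGUE_TERMS = {"good", "better", "best", "fast", "slow"}
CONTEXT_TERMS = {"previous", "above", "next", "earlier"}

def heuristic_ambiguity_types(query: str) -> dict:
    """Return per-type scores in [0, 1] based on simple keyword matches."""
    tokens = re.findall(r"[a-z']+", query.lower())
    return {
        "pronoun_reference": min(1.0, sum(t in PRONOUNS for t in tokens) * 0.5),
        "vague_terms": min(1.0, sum(t in VAGUE_TERMS for t in tokens) * 0.5),
        "context_dependent": min(1.0, sum(t in CONTEXT_TERMS for t in tokens) * 0.5),
        "incomplete_query": 0.6 if len(tokens) < 4 else 0.0,
    }

print(heuristic_ambiguity_types("How do I fix it?"))
# {'pronoun_reference': 0.5, 'vague_terms': 0.0, 'context_dependent': 0.0, 'incomplete_query': 0.0}
```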
2. Clarification Question Generator
Generates intelligent clarification questions based on detected ambiguity:
```python
from packages.rag.clarification_system import ClarificationQuestionGenerator, ClarificationStrategy

generator = ClarificationQuestionGenerator()

# Generate clarification questions
questions = await generator.generate_clarification_questions(
    ambiguity_detection=result,
    context=clarification_context,
    strategy=ClarificationStrategy.DIRECT_QUESTION
)

for question in questions:
    print(f"Question: {question.question_text}")
    print(f"Strategy: {question.strategy}")
    print(f"Options: {question.options}")
```
Question Strategies
- Direct Question: Specific, targeted clarification questions
- Suggest Options: Multiple choice questions with predefined options
- Progressive Disclosure: Partial answers with follow-up questions
- Context Exploration: Questions that explore available context
- Smart Suggestions: AI-generated suggestions based on patterns
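A minimal sketch of how detected ambiguity types might be mapped to one of the strategies above. The mapping, the string identifiers, and the fallback are assumptions for illustration, not the generator's actual policy:

```python
# Assumed priority mapping from ambiguity type to strategy name; the real generator
# may instead weigh confidence scores, context, and user preferences.
STRATEGY_BY_TYPE = {
    "multiple_interpretations": "suggest_options",
    "pronoun_reference": "direct_question",
    "context_dependent": "context_exploration",
    "incomplete_query": "progressive_disclosure",
    "vague_terms": "direct_question",
}

def choose_strategy(ambiguity_types: list[str]) -> str:
    """Return the strategy for the first recognized ambiguity type, defaulting to smart suggestions."""
    for ambiguity_type in ambiguity_types:
        if ambiguity_type in STRATEGY_BY_TYPE:
            return STRATEGY_BY_TYPE[ambiguity_type]
    return "smart_suggestions"

print(choose_strategy(["pronoun_reference", "vague_terms"]))  # direct_question
```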
3. Context Preservation Manager
Maintains conversation context across clarification exchanges:
```python
from packages.rag.clarification_system import ContextPreservationManager

context_manager = ContextPreservationManager()

# Preserve context with user response
preserved_context = await context_manager.preserve_context(
    original_query="How do I fix it?",
    clarification_context=clarification_context,
    user_response="The authentication API"
)

print(f"Updated query: {preserved_context['updated_query']}")
print(f"Resolved ambiguities: {preserved_context['resolved_ambiguities']}")
```
4. User Preference Manager
Learns and adapts to user preferences for clarification behavior:
```python
from packages.rag.user_preference_manager import UserPreferenceManager, PreferenceType

preference_manager = UserPreferenceManager()

# Get user preferences
preferences = await preference_manager.get_user_preferences("user123")
print(f"Clarification frequency: {preferences[PreferenceType.CLARIFICATION_FREQUENCY]}")
print(f"Clarification style: {preferences[PreferenceType.CLARIFICATION_STYLE]}")

# Learn from interaction
await preference_manager.learn_from_interaction(
    user_id="user123",
    original_query="How do I fix it?",
    clarification_questions=["What are you referring to?"],
    user_response="The API",
    resolution_success=True,
    satisfaction_score=0.8
)
```
Preference Types
- Clarification Frequency: How often to ask for clarification
- Clarification Style: Preferred communication style
- Communication Preference: Professional vs. casual tone
- Domain Preference: Preferred technical domains
- Interaction Pattern: Learning from user behavior patterns
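As a simplified sketch of how a numeric preference could be updated after each interaction, the snippet below uses the `learning_rate` and `memory_decay` values shown later in the configuration section. The data structure and update rule are illustrative assumptions; the real UserPreferenceManager likely combines more signals:

```python
from dataclasses import dataclass

@dataclass
class PreferenceEstimate:
    value: float        # e.g. desired clarification frequency in [0, 1]
    confidence: float   # how certain we are about this estimate
    interactions: int   # number of observations so far

def update_preference(estimate: PreferenceEstimate,
                      observed: float,
                      learning_rate: float = 0.1,
                      memory_decay: float = 0.95) -> PreferenceEstimate:
    """Blend the new observation into the running estimate and refresh confidence."""
    value = (1 - learning_rate) * estimate.value + learning_rate * observed
    # Older evidence counts slightly less (memory_decay); each new observation adds confidence.
    confidence = min(1.0, estimate.confidence * memory_decay + learning_rate)
    return PreferenceEstimate(value, confidence, estimate.interactions + 1)

est = PreferenceEstimate(value=0.5, confidence=0.3, interactions=4)
# The user accepted a clarification (observed signal 1.0), so the frequency preference drifts upward.
print(update_preference(est, observed=1.0))
```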
5. Smart Suggestion System
Provides intelligent suggestions for common clarifications:
```python
from packages.rag.smart_suggestions import SmartSuggestionSystem, SuggestionContext

suggestion_system = SmartSuggestionSystem()

# Create suggestion context
context = SuggestionContext(
    original_query="How do I optimize it?",
    ambiguity_types=["pronoun_reference", "vague_terms"],
    available_documents=[{"title": "Performance Guide", "content": "..."}],
    conversation_history=[{"content": "I need help with performance", "type": "user"}],
    user_profile={"user_id": "user123"},
    domain="technical",
    session_metadata={"session_id": "session123"}
)

# Generate suggestions
suggestions = await suggestion_system.generate_suggestions(context)

for suggestion in suggestions:
    print(f"Suggestion: {suggestion.suggestion_text}")
    print(f"Confidence: {suggestion.confidence}")
    print(f"Source: {suggestion.source}")
```
Suggestion Sources
- Pattern Analysis: Based on query patterns and templates
- User History: Based on user's previous interactions
- Domain Knowledge: Based on domain-specific knowledge
- Collaborative: Based on similar users' successful interactions
- ML Model: Based on machine learning predictions
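A hedged sketch of how suggestions from these different sources might be merged and ranked before being shown to the user. The per-source weights are illustrative assumptions, not the system's actual values:

```python
from dataclasses import dataclass

@dataclass
class Suggestion:
    suggestion_text: str
    confidence: float
    source: str

# Illustrative per-source weights (assumption): user history and collaborative signals
# are trusted slightly more than generic pattern matches.
SOURCE_WEIGHTS = {
    "pattern_analysis": 0.8,
    "user_history": 1.0,
    "domain_knowledge": 0.9,
    "collaborative": 1.0,
    "ml_model": 0.9,
}

def rank_suggestions(suggestions: list[Suggestion], top_k: int = 3) -> list[Suggestion]:
    """Order suggestions by source-weighted confidence and keep the top_k."""
    return sorted(
        suggestions,
        key=lambda s: s.confidence * SOURCE_WEIGHTS.get(s.source, 0.5),
        reverse=True,
    )[:top_k]
```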
6. Analytics System
Tracks and analyzes clarification effectiveness:
```python
from packages.rag.clarification_analytics import ClarificationAnalytics, AnalyticsEventType

analytics = ClarificationAnalytics()

# Record events
await analytics.record_event(
    event_type=AnalyticsEventType.CLARIFICATION_REQUESTED,
    user_id="user123",
    session_id="session123",
    data={"questions": ["What are you referring to?"]}
)

# Get metrics
metrics = await analytics.get_clarification_metrics()
print(f"Clarification rate: {metrics.clarification_rate:.2%}")
print(f"Resolution rate: {metrics.resolution_rate:.2%}")
print(f"User satisfaction: {metrics.user_satisfaction_score:.2f}")
```
7. A/B Testing Framework
Enables testing of different clarification approaches:
```python
from packages.rag.ab_testing_framework import ABTestManager, TestType, MetricType

ab_manager = ABTestManager()

# Create A/B test
test = await ab_manager.create_test(
    name="Clarification Strategy Test",
    description="Test different clarification strategies",
    test_type=TestType.CLARIFICATION_STRATEGY,
    primary_metric=MetricType.USER_SATISFACTION,
    variants=[
        {
            "name": "Direct Questions",
            "configuration": {"strategy": "direct", "frequency": "moderate"}
        },
        {
            "name": "Progressive Disclosure",
            "configuration": {"strategy": "progressive", "frequency": "often"}
        }
    ]
)

# Start test
await ab_manager.start_test(test.test_id)

# Get user's variant
variant = await ab_manager.get_test_variant(test.test_id, "user123")
print(f"User variant: {variant}")
```
Data Flow
1. Query Processing Flow
An incoming query is first analyzed by the ambiguity detection system. If the query is ambiguous, the question generator produces clarification questions (optionally alongside a partial answer); otherwise the query continues through the normal RAG pipeline.
2. Context Preservation Flow
When the user answers a clarification question, the context preservation manager merges the response with the original query and recent conversation history to produce an updated, disambiguated query, which is then answered normally.
3. User Preference Learning Flow
Each clarification exchange is recorded by the analytics system, and the user preference manager uses resolution success and satisfaction scores to adapt how often, and in what style, clarification is requested for that user. The sketch below ties these flows together.
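This orchestration sketch is an assumption for illustration: `system` is a hypothetical container bundling the components described in this document, and attribute names such as `rag_pipeline` and `default_strategy` are placeholders rather than real APIs.

```python
async def handle_turn(query, context, user_id, system):
    """Flow 1: analyze the query and either ask for clarification or answer it."""
    detection = await system.detector.detect_ambiguity(query, context, domain="general")
    if detection.is_ambiguous:
        questions = await system.generator.generate_clarification_questions(
            ambiguity_detection=detection, context=context,
            strategy=system.default_strategy,
        )
        return {"needs_clarification": True, "questions": questions}
    answer = await system.rag_pipeline.answer(query, context)  # hypothetical retrieval call
    return {"needs_clarification": False, "answer": answer}


async def handle_clarification_reply(original_query, clarification_context,
                                     user_response, user_id, system):
    """Flows 2 and 3: fold the reply into the query, answer it, then learn from the outcome."""
    preserved = await system.context_manager.preserve_context(
        original_query=original_query,
        clarification_context=clarification_context,
        user_response=user_response,
    )
    answer = await system.rag_pipeline.answer(preserved["updated_query"], clarification_context)
    await system.preference_manager.learn_from_interaction(
        user_id=user_id, original_query=original_query,
        clarification_questions=clarification_context.get("questions", []),
        user_response=user_response, resolution_success=True, satisfaction_score=None,
    )
    return answer
```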
Integration Points
1. RAG System Integration
The clarification system integrates with the existing RAG system through several key points:
```python
# Integration with query understanding
from packages.rag.query_understanding import QueryUnderstandingPipeline
from packages.rag.clarification_system import ClarificationSystem


# Extend query understanding with clarification
class EnhancedQueryUnderstandingPipeline(QueryUnderstandingPipeline):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.clarification_system = ClarificationSystem()

    async def process_query(self, query, *args, **kwargs):
        # First, do normal query understanding
        analysis = await super().process_query(query, *args, **kwargs)

        # Then check for clarification needs
        if analysis.needs_clarification:
            clarification_response = await self.clarification_system.process_query(
                query, kwargs.get('context'), kwargs.get('domain', 'general')
            )
            # Enhance analysis with clarification
            analysis.clarification_response = clarification_response

        return analysis
```
2. Memory System Integration
```python
# Integration with conversation memory
from recoagent.memory import MemoryManager, MessageType  # MessageType assumed to live alongside MemoryManager
from packages.rag.clarification_system import ContextPreservationManager


class ClarificationAwareMemoryManager(MemoryManager):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.context_preservation = ContextPreservationManager()

    async def add_message_with_clarification(self, thread_id, message_type, content,
                                             clarification_context=None):
        # Add message to memory
        await self.add_message(thread_id, message_type, content)

        # If this is a clarification response, preserve context
        if clarification_context and message_type == MessageType.USER:
            preserved_context = await self.context_preservation.preserve_context(
                clarification_context['original_query'],
                clarification_context,
                content
            )
            # Store preserved context in memory
            await self.store_clarification_context(thread_id, preserved_context)
```
3. API Integration
```python
# Integration with FastAPI
from fastapi import FastAPI, HTTPException
from packages.rag.clarification_system import ClarificationSystem

# QueryRequest, ClarificationResponseRequest (request models) and process_normal_query
# are assumed to be defined elsewhere in the application.

app = FastAPI()
clarification_system = ClarificationSystem()


@app.post("/query-with-clarification")
async def query_with_clarification(request: QueryRequest):
    """Enhanced query endpoint with clarification support."""
    # Process query through clarification system
    clarification_response = await clarification_system.process_query(
        query=request.query,
        context=request.context,
        domain=request.domain,
        user_id=request.user_id
    )

    if clarification_response.needs_clarification:
        return {
            "needs_clarification": True,
            "questions": [
                {
                    "text": q.question_text,
                    "type": q.question_type,
                    "options": q.options,
                    "partial_answer": q.partial_answer
                }
                for q in clarification_response.questions
            ],
            "suggestions": clarification_response.suggestions,
            "reasoning": clarification_response.reasoning
        }
    else:
        # Process query normally
        result = await process_normal_query(request.query)
        return {
            "needs_clarification": False,
            "answer": result["answer"],
            "context": result["context"]
        }


@app.post("/clarification-response")
async def handle_clarification_response(request: ClarificationResponseRequest):
    """Handle user's response to clarification questions."""
    # Handle clarification response
    preserved_context = await clarification_system.handle_clarification_response(
        request.original_query,
        request.clarification_context,
        request.user_response
    )

    # Process updated query
    updated_query = preserved_context['updated_query']
    result = await process_normal_query(updated_query)

    return {
        "answer": result["answer"],
        "updated_query": updated_query,
        "context_preserved": True
    }
```
Configuration Options
1. System Configuration
```yaml
# clarification_config.yaml
clarification_system:
  ambiguity_detection:
    thresholds:
      pronoun_reference: 0.3
      context_dependent: 0.4
      vague_terms: 0.2
      incomplete_query: 0.5
      multiple_interpretations: 0.3
      domain_specific: 0.3
    enable_caching: true
    cache_size: 1000
    cache_ttl: 3600
  question_generation:
    max_questions: 3
    enable_progressive_disclosure: true
    enable_smart_suggestions: true
    templates:
      direct_questions:
        pronoun_reference:
          - "What specific {term} are you referring to?"
          - "Could you clarify what you mean by '{term}'?"
        vague_terms:
          - "Could you be more specific about what you mean by '{term}'?"
          - "What criteria are you using to define '{term}'?"
  context_preservation:
    max_history_messages: 10
    max_context_age_hours: 24
    enable_compression: true
    compression_algorithm: "gzip"
  user_preferences:
    learning_rate: 0.1
    memory_decay: 0.95
    confidence_threshold: 0.7
    min_interactions: 5
  analytics:
    enable_real_time: true
    batch_size: 100
    retention_days: 90
  ab_testing:
    enable_testing: true
    default_test_duration_days: 14
    min_sample_size: 100
```
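A small sketch of wiring this file into the system at startup using PyYAML. The constructor keyword `thresholds` is an assumption about AmbiguityDetector's signature and may need adjusting:

```python
import yaml

from packages.rag.clarification_system import AmbiguityDetector

with open("clarification_config.yaml") as f:
    config = yaml.safe_load(f)["clarification_system"]

# Pass the per-type thresholds to the detector (keyword name assumed).
detector = AmbiguityDetector(thresholds=config["ambiguity_detection"]["thresholds"])
max_questions = config["question_generation"]["max_questions"]
```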
2. Domain-Specific Configuration
```yaml
# domain_config.yaml
domains:
  technical:
    ambiguity_patterns:
      api: ["endpoint", "service", "microservice"]
      database: ["table", "schema", "query", "index"]
      server: ["host", "instance", "container", "pod"]
    question_templates:
      api_ambiguity:
        - "Which API are you referring to - {api_options}?"
        - "Are you asking about the REST API or GraphQL API?"
    suggestion_templates:
      - "Common API issues: authentication, rate limiting, data format"
      - "API troubleshooting: check endpoints, verify credentials, test with Postman"
  business:
    ambiguity_patterns:
      revenue: ["income", "sales", "profit", "earnings"]
      customer: ["client", "user", "subscriber", "buyer"]
      strategy: ["plan", "approach", "method", "tactic"]
    question_templates:
      business_ambiguity:
        - "Which business area are you asking about?"
        - "Are you looking for financial or operational information?"
```
3. User Preference Configuration
```yaml
# user_preferences_config.yaml
default_preferences:
  clarification_frequency: "moderate"
  clarification_style: "direct"
  communication_preference: "professional"
  domain_preference: "general"

preference_learning:
  enable_learning: true
  learning_algorithms:
    - "pattern_recognition"
    - "temporal_analysis"
    - "cross_user_learning"
  adaptation_speed:
    fast_learners: 0.2
    normal_learners: 0.1
    slow_learners: 0.05
  confidence_thresholds:
    high_confidence: 0.8
    medium_confidence: 0.6
    low_confidence: 0.4
```
Deployment Considerations
1. Performance Requirements
- Response Time: < 2 seconds for clarification question generation
- Throughput: Support 1000+ concurrent clarification requests
- Memory Usage: < 500MB per clarification system instance
- Database Performance: < 100ms for preference lookups
2. Scalability Considerations
```python
# Horizontal scaling configuration
import redis

# RedisCache and LocalCache are cache-backend wrappers assumed to exist elsewhere.
class ScalableClarificationSystem:
    def __init__(self, redis_url=None, database_url=None):
        self.redis_client = redis.from_url(redis_url) if redis_url else None
        self.database_url = database_url

        # Use Redis for caching if available
        if self.redis_client:
            self.cache_backend = RedisCache(self.redis_client)
        else:
            self.cache_backend = LocalCache()

    async def process_query(self, query, *args, **kwargs):
        # Check distributed cache first
        cache_key = self._generate_cache_key(query, kwargs)
        cached_result = await self.cache_backend.get(cache_key)
        if cached_result:
            return cached_result

        # Process query
        result = await self._process_query_internal(query, *args, **kwargs)

        # Cache result
        await self.cache_backend.set(cache_key, result, ttl=3600)
        return result
```
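The cache key referenced above must be stable across processes so replicas share cache hits. One hedged way to build it (the `_generate_cache_key` helper above is assumed to do something similar) is to hash the query together with the request parameters that affect the answer:

```python
import hashlib
import json

def generate_cache_key(query: str, params: dict) -> str:
    """Build a deterministic cache key from the query and answer-relevant parameters."""
    # Sort keys so the same parameters always serialize identically.
    payload = json.dumps({"query": query, "params": params}, sort_keys=True, default=str)
    return "clarification:" + hashlib.sha256(payload.encode()).hexdigest()

print(generate_cache_key("How do I fix it?", {"domain": "technical", "user_id": "user123"}))
```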
3. Monitoring and Observability
```python
# Monitoring setup
import logging
import time

from packages.observability import MetricsCollector, StructuredLogger
from packages.rag.clarification_system import ClarificationSystem


class MonitoredClarificationSystem(ClarificationSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = MetricsCollector()
        self.logger = StructuredLogger("clarification_system")

    async def process_query(self, query, *args, **kwargs):
        start_time = time.time()
        try:
            result = await super().process_query(query, *args, **kwargs)

            # Record metrics
            processing_time = time.time() - start_time
            self.metrics.record_clarification_metrics({
                'processing_time_ms': processing_time * 1000,
                'needs_clarification': result.needs_clarification,
                'question_count': len(result.questions),
                'confidence': result.confidence
            })

            # Log interaction
            self.logger.log_clarification_interaction({
                'query': query,
                'needs_clarification': result.needs_clarification,
                'processing_time_ms': processing_time * 1000
            })

            return result
        except Exception as e:
            self.logger.log_error({
                'error': str(e),
                'query': query,
                'processing_time_ms': (time.time() - start_time) * 1000
            })
            raise
```
4. Security Considerations
- Data Privacy: User preferences and conversation history should be encrypted
- Access Control: Implement proper authentication and authorization
- Audit Logging: Log all clarification interactions for compliance
- Data Retention: Implement proper data retention policies
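For the data-privacy point above, a minimal sketch of encrypting stored preference data with a symmetric key, using the cryptography package's Fernet. Key management and the storage layer are out of scope here:

```python
import json
from cryptography.fernet import Fernet

# In production the key would come from a secrets manager, not be generated inline.
key = Fernet.generate_key()
fernet = Fernet(key)

preferences = {"clarification_frequency": "moderate", "clarification_style": "direct"}

# Encrypt before persisting, decrypt after loading.
ciphertext = fernet.encrypt(json.dumps(preferences).encode())
restored = json.loads(fernet.decrypt(ciphertext).decode())
assert restored == preferences
```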
5. Backup and Recovery
```python
# Backup configuration
import json

from packages.rag.clarification_system import ClarificationSystem
from packages.rag.user_preference_manager import PreferenceType


class BackupEnabledClarificationSystem(ClarificationSystem):
    # self.preference_manager is assumed to be provided by ClarificationSystem.

    async def backup_preferences(self, backup_path):
        """Backup user preferences to file."""
        preferences = await self.preference_manager.get_all_preferences()
        with open(backup_path, 'w') as f:
            json.dump(preferences, f, indent=2, default=str)

    async def restore_preferences(self, backup_path):
        """Restore user preferences from backup."""
        with open(backup_path, 'r') as f:
            preferences = json.load(f)

        for user_id, user_preferences in preferences.items():
            for pref_type, pref_data in user_preferences.items():
                await self.preference_manager.update_user_preference(
                    user_id, PreferenceType(pref_type), pref_data['value'], pref_data['confidence']
                )
```
This architecture provides a robust, scalable, and maintainable clarification system that integrates with existing enterprise RAG applications and improves both user experience and answer quality.