Conversational Search Core
Core conversational search engine providing natural language understanding, memory management, intelligent query processing, and resilient API integration for enterprise conversational interfaces.
Core Classes
ConversationalSearchEngine
Description: Main conversational search engine orchestrating all components
Parameters:
nlu_engine(ContextAwareNLUEngine): Natural language understanding engine
memory_manager(SimpleMemoryManager): Memory management component
api_mapping(SimpleAPIMappingEngine): API mapping engine
resilient_client(ResilientAPIClient): Resilient API client
smart_cache(SmartCacheManager): Smart caching manager
Returns: ConversationalSearchEngine instance
Example:
from langchain_core.messages import HumanMessage
from recoagent.conversational_search import ConversationalSearchEngine, ConversationState
# Create conversational search engine from previously constructed components
search_engine = ConversationalSearchEngine(
    nlu_engine=nlu_engine,
    memory_manager=memory_manager,
    api_mapping=api_mapping,
    resilient_client=resilient_client,
    smart_cache=smart_cache
)
# Process conversational query (await must run inside an async function)
state = ConversationState(
    messages=[HumanMessage(content="I need help with machine learning")],
    query="machine learning help",
    session_id="session_123"
)
result = await search_engine.process_conversation(state)
ContextAwareNLUEngine
Description: Natural language understanding engine with context awareness
Parameters:
intent_classifier(IntentClassifier): Intent classification component
entity_extractor(EntityExtractor): Entity extraction component
context_manager(ContextManager): Context management component
confidence_threshold(float): Confidence threshold for classification (default: 0.7)
Returns: ContextAwareNLUEngine instance
Example:
from recoagent.conversational_search import ContextAwareNLUEngine
# Create NLU engine
nlu_engine = ContextAwareNLUEngine(
    intent_classifier=intent_classifier,
    entity_extractor=entity_extractor,
    context_manager=context_manager,
    confidence_threshold=0.8
)
# Process natural language input
nlu_result = await nlu_engine.process_input(
    text="I want to learn about deep learning",
    context={"previous_intent": "education", "user_level": "beginner"}
)
print(f"Intent: {nlu_result.intent}")
print(f"Entities: {nlu_result.entities}")
print(f"Confidence: {nlu_result.confidence}")
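To make the role of confidence_threshold concrete, here is a minimal sketch of one way a threshold could gate intent selection. It is not the ContextAwareNLUEngine implementation; the names `IntentDecision` and `pick_intent`, and the idea that the classifier yields a score per intent, are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class IntentDecision:
    """Hypothetical result type; None means no intent was confident enough."""
    intent: Optional[str]
    confidence: float


def pick_intent(scores: Dict[str, float], threshold: float = 0.7) -> IntentDecision:
    """Return the top-scoring intent, or None if its score is below the threshold."""
    if not scores:
        return IntentDecision(intent=None, confidence=0.0)
    best_intent = max(scores, key=scores.get)
    best_score = scores[best_intent]
    if best_score < threshold:
        # Below the threshold: report the score but withhold the intent
        return IntentDecision(intent=None, confidence=best_score)
    return IntentDecision(intent=best_intent, confidence=best_score)
```

Under this sketch, raising the threshold from 0.7 to 0.8 trades coverage for precision: more inputs come back with no intent, which a caller might answer with a clarifying question.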
SimpleMemoryManager
Description: Memory management for conversational context
Parameters:
max_memory_size(int): Maximum memory size (default: 1000)
memory_decay(float): Memory decay factor (default: 0.9)
enable_entity_tracking(bool): Enable entity tracking (default: True)
Returns: SimpleMemoryManager instance
Example:
from recoagent.conversational_search import SimpleMemoryManager
# Create memory manager
memory_manager = SimpleMemoryManager(
    max_memory_size=500,
    memory_decay=0.8,
    enable_entity_tracking=True
)
# Store conversation context
memory_manager.store_context(
    session_id="session_123",
    context={
        "user_preferences": {"topic": "AI", "level": "intermediate"},
        "recent_queries": ["machine learning", "neural networks"],
        "entities": ["AI", "machine learning", "neural networks"]
    }
)
# Retrieve context
context = memory_manager.get_context("session_123")
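The page does not say how max_memory_size and memory_decay are applied internally. As a rough illustration only, the sketch below assumes the size is a cap on stored turns and the decay is an exponential down-weighting of older turns. The class `DecayingEntityMemory` and its methods are hypothetical and are not part of recoagent.

```python
from collections import deque
from typing import Deque, Dict, List


class DecayingEntityMemory:
    """Hypothetical illustration; not the SimpleMemoryManager implementation."""

    def __init__(self, max_memory_size: int = 1000, memory_decay: float = 0.9) -> None:
        self.memory_decay = memory_decay
        # deque(maxlen=...) silently drops the oldest turn once the cap is reached
        self.turns: Deque[List[str]] = deque(maxlen=max_memory_size)

    def store_turn(self, entities: List[str]) -> None:
        """Record the entities mentioned in one conversation turn."""
        self.turns.append(list(entities))

    def entity_weights(self) -> Dict[str, float]:
        """Weight each entity by decay**age, where the newest turn has age 0."""
        weights: Dict[str, float] = {}
        newest = len(self.turns) - 1
        for index, entities in enumerate(self.turns):
            weight = self.memory_decay ** (newest - index)
            for entity in entities:
                weights[entity] = weights.get(entity, 0.0) + weight
        return weights
```

With this reading, a decay of 0.9 keeps older entities influential for many turns, while a value such as 0.5 makes the context track only the last few turns.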
ResilientAPIClient
Description: Resilient API client with fallback and error handling
Parameters:
base_url(str): Base API URL
timeout(int): Request timeout in seconds (default: 30)
max_retries(int): Maximum retry attempts (default: 3)
fallback_apis(List[str]): List of fallback API URLs
Returns: ResilientAPIClient instance
Example:
from recoagent.conversational_search import ResilientAPIClient
# Create resilient API client
api_client = ResilientAPIClient(
    base_url="https://api.example.com",
    timeout=30,
    max_retries=3,
    fallback_apis=["https://backup1.example.com", "https://backup2.example.com"]
)
# Make resilient API call
response = await api_client.make_request(
    endpoint="/search",
    method="POST",
    data={"query": "machine learning"},
    retry_on_failure=True
)
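The parameters above suggest a retry-then-fallback strategy. The sketch below shows one common way to combine max_retries with an ordered list of fallback URLs, using exponential backoff between attempts. It is an assumption about the general technique, not the ResilientAPIClient source; `request_with_fallback`, `send`, and `backoff_seconds` are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


async def request_with_fallback(
    send: Callable[[str], Awaitable[dict]],
    base_urls: List[str],
    max_retries: int = 3,
    backoff_seconds: float = 0.5,
) -> dict:
    """Try each base URL in order, attempting each one up to max_retries times."""
    last_error: Optional[Exception] = None
    for base_url in base_urls:
        for attempt in range(max_retries):
            try:
                return await send(base_url)
            except Exception as error:  # a real client would catch narrower errors
                last_error = error
                # Exponential backoff between attempts on the same URL
                if attempt < max_retries - 1:
                    await asyncio.sleep(backoff_seconds * (2 ** attempt))
    raise RuntimeError(f"All endpoints failed: {last_error}") from last_error
```

Here `send` stands in for whatever coroutine performs the HTTP call, so the retry policy can be exercised without a network connection.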
Usage Examples
Basic Conversational Search
from recoagent.conversational_search import ConversationalSearchEngine, ConversationState
from langchain_core.messages import HumanMessage, AIMessage
# Create conversational search engine
search_engine = ConversationalSearchEngine(
    nlu_engine=nlu_engine,
    memory_manager=memory_manager,
    api_mapping=api_mapping,
    resilient_client=api_client,
    smart_cache=smart_cache
)
# Simulate conversation
conversation_turns = [
    "I need help with machine learning",
    "What are the best algorithms for beginners?",
    "Can you recommend some resources?",
    "What about deep learning?"
]
# Initialize conversation state
state = ConversationState(
    messages=[],
    query="",
    session_id="session_123"
)
for turn in conversation_turns:
    # Add user message
    state["messages"].append(HumanMessage(content=turn))
    state["query"] = turn
    # Process conversation
    result = await search_engine.process_conversation(state)
    # Add AI response
    state["messages"].append(AIMessage(content=result.response))
    print(f"User: {turn}")
    print(f"AI: {result.response}")
    print(f"Intent: {result.metadata.get('intent')}")
    print(f"Entities: {result.metadata.get('entities')}")
    print("---")
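The examples on this page call await at the top level, which works in a notebook or async REPL but not in a plain script. The sketch below shows the same multi-turn loop wrapped in a coroutine that can be started with asyncio.run. `StubSearchEngine` and `StubResponse` are hypothetical stand-ins so the snippet runs without recoagent installed, and plain tuples replace the message classes.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class StubResponse:
    """Hypothetical stand-in for ConversationalResponse."""
    response: str
    metadata: Dict[str, Any] = field(default_factory=dict)


class StubSearchEngine:
    """Hypothetical stand-in for ConversationalSearchEngine."""

    async def process_conversation(self, state: Dict[str, Any]) -> StubResponse:
        return StubResponse(response=f"You asked: {state['query']}")


async def run_conversation(engine: Any, turns: List[str]) -> List[str]:
    """Feed each turn to the engine and collect its replies in order."""
    state: Dict[str, Any] = {"messages": [], "query": "", "session_id": "session_123"}
    replies: List[str] = []
    for turn in turns:
        state["messages"].append(("user", turn))
        state["query"] = turn
        result = await engine.process_conversation(state)
        state["messages"].append(("assistant", result.response))
        replies.append(result.response)
    return replies
```

In a script, the loop would be started with `asyncio.run(run_conversation(engine, turns))`, passing the real engine in place of the stub.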
Advanced NLU Processing
from recoagent.conversational_search import ContextAwareNLUEngine
# Create advanced NLU engine
nlu_engine = ContextAwareNLUEngine(
    intent_classifier=intent_classifier,
    entity_extractor=entity_extractor,
    context_manager=context_manager,
    confidence_threshold=0.7
)
# Process complex queries with context
complex_queries = [
    {
        "text": "I want to learn about machine learning",
        "context": {"user_level": "beginner", "previous_topic": None}
    },
    {
        "text": "What about deep learning specifically?",
        "context": {"user_level": "beginner", "previous_topic": "machine learning"}
    },
    {
        "text": "Show me advanced neural network architectures",
        "context": {"user_level": "intermediate", "previous_topic": "deep learning"}
    }
]
for query_data in complex_queries:
    nlu_result = await nlu_engine.process_input(
        text=query_data["text"],
        context=query_data["context"]
    )
    print(f"Query: {query_data['text']}")
    print(f"Intent: {nlu_result.intent}")
    print(f"Confidence: {nlu_result.confidence:.3f}")
    print(f"Entities: {nlu_result.entities}")
    print(f"Context Updated: {nlu_result.context_updated}")
    print("---")
Memory Management and Context Tracking
from recoagent.conversational_search import SimpleMemoryManager
# Create memory manager
memory_manager = SimpleMemoryManager(
    max_memory_size=1000,
    memory_decay=0.9,
    enable_entity_tracking=True
)
# Simulate multi-turn conversation
conversation_data = [
    {
        "turn": 1,
        "user": "I'm interested in AI",
        "entities": ["AI"],
        "intent": "topic_interest"
    },
    {
        "turn": 2,
        "user": "What are the main applications?",
        "entities": ["applications"],
        "intent": "information_request"
    },
    {
        "turn": 3,
        "user": "Tell me more about healthcare AI",
        "entities": ["healthcare", "AI"],
        "intent": "specific_information"
    }
]
# Process conversation with memory
for turn_data in conversation_data:
    # Store turn in memory
    memory_manager.store_turn(
        session_id="session_123",
        turn_number=turn_data["turn"],
        user_input=turn_data["user"],
        entities=turn_data["entities"],
        intent=turn_data["intent"]
    )
    # Get accumulated context
    context = memory_manager.get_context("session_123")
    print(f"Turn {turn_data['turn']}: {turn_data['user']}")
    print(f"Accumulated entities: {context.get('entities', [])}")
    print(f"Intent history: {context.get('intent_history', [])}")
    print(f"Memory size: {len(context.get('turns', []))}")
    print("---")
Resilient API Integration
from recoagent.conversational_search import ResilientAPIClient, FallbackManager
# Create resilient API client
api_client = ResilientAPIClient(
    base_url="https://api.example.com",
    timeout=30,
    max_retries=3,
    fallback_apis=[
        "https://backup1.example.com",
        "https://backup2.example.com"
    ]
)
# Create fallback manager
fallback_manager = FallbackManager(
    primary_apis=["https://api.example.com"],
    fallback_apis=["https://backup1.example.com", "https://backup2.example.com"],
    health_check_interval=60
)
# Make resilient API calls
async def make_resilient_search(query: str):
    """Make resilient search API call."""
    try:
        # Try primary API
        response = await api_client.make_request(
            endpoint="/search",
            method="POST",
            data={"query": query},
            retry_on_failure=True
        )
        print(f"✅ Primary API success: {response.status_code}")
        return response.json()
    except Exception as e:
        print(f"❌ Primary API failed: {e}")
        # Try fallback APIs in the order the manager reports them
        for fallback_url in fallback_manager.get_available_fallbacks():
            try:
                fallback_client = ResilientAPIClient(base_url=fallback_url)
                response = await fallback_client.make_request(
                    endpoint="/search",
                    method="POST",
                    data={"query": query}
                )
                print(f"✅ Fallback API success: {fallback_url}")
                return response.json()
            except Exception as fallback_error:
                print(f"❌ Fallback API failed: {fallback_url} - {fallback_error}")
                continue
        # Reached only when the primary and every fallback have failed
        raise RuntimeError("All APIs failed") from e
# Test resilient search
queries = [
    "machine learning algorithms",
    "deep learning frameworks",
    "AI applications in healthcare"
]
for query in queries:
    try:
        result = await make_resilient_search(query)
        print(f"Search results for '{query}': {len(result.get('results', []))} items")
    except Exception as e:
        print(f"Search failed for '{query}': {e}")
    print("---")
Smart Caching for Conversational Search
from datetime import datetime, timezone
from typing import Dict
from recoagent.conversational_search import SmartCacheManager
# Create smart cache manager
smart_cache = SmartCacheManager(
    cache_backend="redis",
    similarity_threshold=0.8,
    enable_semantic_caching=True,
    cache_ttl=3600
)
# Cache conversational responses
async def cache_conversational_response(query: str, response: str, context: Dict):
    """Cache conversational response with context."""
    cache_key = smart_cache.generate_key(query, context)
    await smart_cache.set(
        key=cache_key,
        value={
            "response": response,
            "context": context,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
            "timestamp": datetime.now(timezone.utc).isoformat()
        },
        ttl=3600
    )
    print(f"Cached response for: {query}")
# Retrieve cached responses
async def get_cached_response(query: str, context: Dict):
    """Get cached response if available."""
    # Try exact match first
    cache_key = smart_cache.generate_key(query, context)
    cached_response = await smart_cache.get(cache_key)
    if cached_response:
        print(f"✅ Exact cache hit for: {query}")
        return cached_response
    # Try semantic similarity
    similar_response = await smart_cache.get_similar(query, context)
    if similar_response:
        print(f"✅ Semantic cache hit for: {query}")
        return similar_response
    print(f"❌ Cache miss for: {query}")
    return None
# Test smart caching
test_queries = [
    ("What is machine learning?", {"topic": "AI", "level": "beginner"}),
    ("Tell me about ML", {"topic": "AI", "level": "beginner"}),  # Similar to first
    ("How does artificial intelligence work?", {"topic": "AI", "level": "beginner"})  # Different
]
for query, context in test_queries:
    # Check cache first
    cached = await get_cached_response(query, context)
    if not cached:
        # Generate new response (simulate)
        response = f"Response for: {query}"
        await cache_conversational_response(query, response, context)
    else:
        print(f"Using cached response: {cached['response']}")
    print("---")
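The similarity_threshold setting implies that a semantic lookup compares the new query against stored ones and accepts the closest match only if it is similar enough. The sketch below illustrates that idea with cosine similarity over embedding vectors. It is not the SmartCacheManager implementation; `TinySemanticCache`, its methods, and the assumption that queries arrive already embedded as float lists are all hypothetical.

```python
import math
from typing import List, Optional, Tuple


def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class TinySemanticCache:
    """Hypothetical in-memory cache keyed by query embeddings."""

    def __init__(self, similarity_threshold: float = 0.8) -> None:
        self.similarity_threshold = similarity_threshold
        self.entries: List[Tuple[List[float], str]] = []

    def set(self, embedding: List[float], response: str) -> None:
        self.entries.append((list(embedding), response))

    def get_similar(self, embedding: List[float]) -> Optional[str]:
        """Return the closest stored response if it clears the threshold."""
        best_score, best_response = 0.0, None
        for stored, response in self.entries:
            score = cosine_similarity(embedding, stored)
            if score > best_score:
                best_score, best_response = score, response
        return best_response if best_score >= self.similarity_threshold else None
```

A linear scan is enough for a sketch; a production cache would more likely use a vector index, and the threshold would need tuning against the embedding model in use.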
Multi-Modal Conversational Search
from recoagent.conversational_search import ConversationalSearchEngine
# Create multi-modal search engine
multi_modal_engine = ConversationalSearchEngine(
    nlu_engine=nlu_engine,
    memory_manager=memory_manager,
    api_mapping=api_mapping,
    resilient_client=api_client,
    smart_cache=smart_cache,
    enable_multi_modal=True
)
# Process multi-modal queries
multi_modal_queries = [
    {
        "type": "text",
        "content": "Show me images of neural networks",
        "context": {"modality": "visual", "topic": "neural networks"}
    },
    {
        "type": "voice",
        "content": "audio_query.wav",
        "context": {"modality": "audio", "language": "en"}
    },
    {
        "type": "image",
        "content": "neural_network_diagram.png",
        "context": {"modality": "image", "task": "explain"}
    }
]
for query in multi_modal_queries:
    result = await multi_modal_engine.process_multi_modal_query(
        query_type=query["type"],
        content=query["content"],
        context=query["context"]
    )
    print(f"Query Type: {query['type']}")
    print(f"Response: {result.response}")
    print(f"Modality: {result.metadata.get('modality')}")
    print(f"Processing Time: {result.metadata.get('processing_time')}ms")
    print("---")
API Reference
ConversationalSearchEngine Methods
process_conversation(state: ConversationState) -> ConversationalResponse
Process conversational query with state management
Parameters:
state(ConversationState): Conversation state
Returns: ConversationalResponse with response and metadata
process_multi_modal_query(query_type: str, content: Any, context: Dict) -> ConversationalResponse
Process multi-modal query
Parameters:
query_type(str): Type of query (text, voice, image)
content(Any): Query content
context(Dict): Query context
Returns: ConversationalResponse with multi-modal response
ContextAwareNLUEngine Methods
process_input(text: str, context: Dict = None) -> NLUResult
Process natural language input with context
Parameters:
text(str): Input text
context(Dict, optional): Context information
Returns: NLUResult with intent, entities, and confidence
update_context(session_id: str, context: Dict) -> None
Update conversation context
Parameters:
session_id(str): Session identifier
context(Dict): Context to update
SimpleMemoryManager Methods
store_context(session_id: str, context: Dict) -> None
Store conversation context
Parameters:
session_id(str): Session identifier
context(Dict): Context to store
get_context(session_id: str) -> Dict
Get conversation context
Parameters:
session_id(str): Session identifier
Returns: Stored context dictionary
ResilientAPIClient Methods
make_request(endpoint: str, method: str, data: Dict = None) -> Response
Make resilient API request
Parameters:
endpoint(str): API endpoint
method(str): HTTP method
data(Dict, optional): Request data
Returns: API response
health_check() -> bool
Check API health
Returns: True if API is healthy
See Also
- RAG Query Understanding - Query processing
- Channel Adapters - Communication channels
- Analytics Core - Conversational analytics
- Security Core - Conversational security