
Conversational Search Core

Core conversational search engine providing natural language understanding, memory management, intelligent query processing, and resilient API integration for enterprise conversational interfaces.

Core Classes

ConversationalSearchEngine

Description: Main conversational search engine orchestrating all components

Parameters:

  • nlu_engine (ContextAwareNLUEngine): Natural language understanding engine
  • memory_manager (SimpleMemoryManager): Memory management component
  • api_mapping (SimpleAPIMappingEngine): API mapping engine
  • resilient_client (ResilientAPIClient): Resilient API client
  • smart_cache (SmartCacheManager): Smart caching manager

Returns: ConversationalSearchEngine instance

Example:

from recoagent.conversational_search import ConversationalSearchEngine, ConversationState
from langchain_core.messages import HumanMessage

# Create conversational search engine
search_engine = ConversationalSearchEngine(
    nlu_engine=nlu_engine,
    memory_manager=memory_manager,
    api_mapping=api_mapping,
    resilient_client=resilient_client,
    smart_cache=smart_cache
)

# Process conversational query
state = ConversationState(
    messages=[HumanMessage(content="I need help with machine learning")],
    query="machine learning help",
    session_id="session_123"
)

result = await search_engine.process_conversation(state)

ContextAwareNLUEngine

Description: Natural language understanding engine with context awareness

Parameters:

  • intent_classifier (IntentClassifier): Intent classification component
  • entity_extractor (EntityExtractor): Entity extraction component
  • context_manager (ContextManager): Context management component
  • confidence_threshold (float): Confidence threshold for classification (default: 0.7)

Returns: ContextAwareNLUEngine instance

Example:

from recoagent.conversational_search import ContextAwareNLUEngine

# Create NLU engine
nlu_engine = ContextAwareNLUEngine(
    intent_classifier=intent_classifier,
    entity_extractor=entity_extractor,
    context_manager=context_manager,
    confidence_threshold=0.8
)

# Process natural language input
nlu_result = await nlu_engine.process_input(
    text="I want to learn about deep learning",
    context={"previous_intent": "education", "user_level": "beginner"}
)

print(f"Intent: {nlu_result.intent}")
print(f"Entities: {nlu_result.entities}")
print(f"Confidence: {nlu_result.confidence}")
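The confidence_threshold decides when a classification result is trusted. The engine's internal gating logic is not documented here, so the sketch below is only an illustration of the idea; the resolve_intent helper and the fallback intent name are hypothetical, not part of the library:

```python
# Illustrative sketch: gate an intent prediction on a confidence threshold.
# None of these names come from recoagent; they only show the concept.
FALLBACK_INTENT = "clarification_request"

def resolve_intent(predicted_intent: str, confidence: float,
                   threshold: float = 0.7) -> str:
    """Accept the prediction only when confidence clears the threshold."""
    if confidence >= threshold:
        return predicted_intent
    # Low confidence: ask the user to clarify instead of guessing.
    return FALLBACK_INTENT

print(resolve_intent("education", 0.85))  # trusted prediction
print(resolve_intent("education", 0.55))  # below threshold, falls back
```

Raising the threshold (as in the example above, 0.8) trades coverage for precision: more queries are routed to clarification, but accepted intents are more reliable.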

SimpleMemoryManager

Description: Memory management for conversational context

Parameters:

  • max_memory_size (int): Maximum memory size (default: 1000)
  • memory_decay (float): Memory decay factor (default: 0.9)
  • enable_entity_tracking (bool): Enable entity tracking (default: True)

Returns: SimpleMemoryManager instance

Example:

from recoagent.conversational_search import SimpleMemoryManager

# Create memory manager
memory_manager = SimpleMemoryManager(
    max_memory_size=500,
    memory_decay=0.8,
    enable_entity_tracking=True
)

# Store conversation context
memory_manager.store_context(
    session_id="session_123",
    context={
        "user_preferences": {"topic": "AI", "level": "intermediate"},
        "recent_queries": ["machine learning", "neural networks"],
        "entities": ["AI", "machine learning", "neural networks"]
    }
)

# Retrieve context
context = memory_manager.get_context("session_123")
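How memory_decay is applied internally is not documented; a common interpretation is exponential down-weighting, where a turn that is `age` turns old carries weight `decay ** age`. A self-contained sketch of that interpretation (not the library's actual code):

```python
def decayed_weights(num_turns: int, decay: float = 0.9) -> list[float]:
    """Weight for each stored turn, oldest first: decay ** age.

    The newest turn has age 0 (weight 1.0); older turns shrink
    geometrically, so with decay=0.8 a turn three exchanges old
    contributes roughly half as much as the latest one.
    """
    return [decay ** (num_turns - 1 - i) for i in range(num_turns)]

weights = decayed_weights(4, decay=0.8)
print(weights)  # oldest first; the newest turn has weight 1.0
```

Under this reading, a smaller memory_decay makes the manager forget old context faster, which suits short, topic-hopping sessions.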

ResilientAPIClient

Description: Resilient API client with fallback and error handling

Parameters:

  • base_url (str): Base API URL
  • timeout (int): Request timeout in seconds (default: 30)
  • max_retries (int): Maximum retry attempts (default: 3)
  • fallback_apis (List[str]): List of fallback API URLs

Returns: ResilientAPIClient instance

Example:

from recoagent.conversational_search import ResilientAPIClient

# Create resilient API client
api_client = ResilientAPIClient(
    base_url="https://api.example.com",
    timeout=30,
    max_retries=3,
    fallback_apis=["https://backup1.example.com", "https://backup2.example.com"]
)

# Make resilient API call
response = await api_client.make_request(
    endpoint="/search",
    method="POST",
    data={"query": "machine learning"},
    retry_on_failure=True
)
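The spacing between the max_retries attempts is not documented; a typical policy is exponential backoff with a cap. The backoff_delays helper below is an illustrative sketch of that policy, not the client's actual implementation:

```python
def backoff_delays(max_retries: int, base: float = 1.0,
                   cap: float = 30.0) -> list[float]:
    """Delay before each retry: base * 2**attempt, capped at `cap` seconds."""
    return [min(base * (2 ** attempt), cap) for attempt in range(max_retries)]

print(backoff_delays(3))            # [1.0, 2.0, 4.0]
print(backoff_delays(6, cap=10.0))  # later attempts are capped at 10.0
```

Capping the delay keeps worst-case latency bounded even when max_retries is large; many clients also add random jitter to avoid synchronized retry storms.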

Usage Examples

from recoagent.conversational_search import ConversationalSearchEngine, ConversationState
from langchain_core.messages import HumanMessage, AIMessage

# Create conversational search engine
search_engine = ConversationalSearchEngine(
    nlu_engine=nlu_engine,
    memory_manager=memory_manager,
    api_mapping=api_mapping,
    resilient_client=api_client,
    smart_cache=smart_cache
)

# Simulate conversation
conversation_turns = [
    "I need help with machine learning",
    "What are the best algorithms for beginners?",
    "Can you recommend some resources?",
    "What about deep learning?"
]

# Process conversation
state = ConversationState(
    messages=[],
    query="",
    session_id="session_123"
)

for turn in conversation_turns:
    # Add user message
    state["messages"].append(HumanMessage(content=turn))
    state["query"] = turn

    # Process conversation
    result = await search_engine.process_conversation(state)

    # Add AI response
    state["messages"].append(AIMessage(content=result.response))

    print(f"User: {turn}")
    print(f"AI: {result.response}")
    print(f"Intent: {result.metadata.get('intent')}")
    print(f"Entities: {result.metadata.get('entities')}")
    print("---")

Advanced NLU Processing

from recoagent.conversational_search import ContextAwareNLUEngine

# Create advanced NLU engine
nlu_engine = ContextAwareNLUEngine(
    intent_classifier=intent_classifier,
    entity_extractor=entity_extractor,
    context_manager=context_manager,
    confidence_threshold=0.7
)

# Process complex queries with context
complex_queries = [
    {
        "text": "I want to learn about machine learning",
        "context": {"user_level": "beginner", "previous_topic": None}
    },
    {
        "text": "What about deep learning specifically?",
        "context": {"user_level": "beginner", "previous_topic": "machine learning"}
    },
    {
        "text": "Show me advanced neural network architectures",
        "context": {"user_level": "intermediate", "previous_topic": "deep learning"}
    }
]

for query_data in complex_queries:
    nlu_result = await nlu_engine.process_input(
        text=query_data["text"],
        context=query_data["context"]
    )

    print(f"Query: {query_data['text']}")
    print(f"Intent: {nlu_result.intent}")
    print(f"Confidence: {nlu_result.confidence:.3f}")
    print(f"Entities: {nlu_result.entities}")
    print(f"Context Updated: {nlu_result.context_updated}")
    print("---")

Memory Management and Context Tracking

from recoagent.conversational_search import SimpleMemoryManager

# Create memory manager
memory_manager = SimpleMemoryManager(
    max_memory_size=1000,
    memory_decay=0.9,
    enable_entity_tracking=True
)

# Simulate multi-turn conversation
conversation_data = [
    {
        "turn": 1,
        "user": "I'm interested in AI",
        "entities": ["AI"],
        "intent": "topic_interest"
    },
    {
        "turn": 2,
        "user": "What are the main applications?",
        "entities": ["applications"],
        "intent": "information_request"
    },
    {
        "turn": 3,
        "user": "Tell me more about healthcare AI",
        "entities": ["healthcare", "AI"],
        "intent": "specific_information"
    }
]

# Process conversation with memory
for turn_data in conversation_data:
    # Store turn in memory
    memory_manager.store_turn(
        session_id="session_123",
        turn_number=turn_data["turn"],
        user_input=turn_data["user"],
        entities=turn_data["entities"],
        intent=turn_data["intent"]
    )

    # Get accumulated context
    context = memory_manager.get_context("session_123")

    print(f"Turn {turn_data['turn']}: {turn_data['user']}")
    print(f"Accumulated entities: {context.get('entities', [])}")
    print(f"Intent history: {context.get('intent_history', [])}")
    print(f"Memory size: {len(context.get('turns', []))}")
    print("---")

Resilient API Integration

from recoagent.conversational_search import ResilientAPIClient, FallbackManager

# Create resilient API client
api_client = ResilientAPIClient(
    base_url="https://api.example.com",
    timeout=30,
    max_retries=3,
    fallback_apis=[
        "https://backup1.example.com",
        "https://backup2.example.com"
    ]
)

# Create fallback manager
fallback_manager = FallbackManager(
    primary_apis=["https://api.example.com"],
    fallback_apis=["https://backup1.example.com", "https://backup2.example.com"],
    health_check_interval=60
)

# Make resilient API calls
async def make_resilient_search(query: str):
    """Make resilient search API call."""
    try:
        # Try primary API
        response = await api_client.make_request(
            endpoint="/search",
            method="POST",
            data={"query": query},
            retry_on_failure=True
        )

        print(f"✅ Primary API success: {response.status_code}")
        return response.json()

    except Exception as e:
        print(f"❌ Primary API failed: {str(e)}")

        # Try fallback APIs
        for fallback_url in fallback_manager.get_available_fallbacks():
            try:
                fallback_client = ResilientAPIClient(base_url=fallback_url)
                response = await fallback_client.make_request(
                    endpoint="/search",
                    method="POST",
                    data={"query": query}
                )

                print(f"✅ Fallback API success: {fallback_url}")
                return response.json()

            except Exception as fallback_error:
                print(f"❌ Fallback API failed: {fallback_url} - {str(fallback_error)}")
                continue

        raise Exception("All APIs failed")

# Test resilient search
queries = [
    "machine learning algorithms",
    "deep learning frameworks",
    "AI applications in healthcare"
]

for query in queries:
    try:
        result = await make_resilient_search(query)
        print(f"Search results for '{query}': {len(result.get('results', []))} items")
    except Exception as e:
        print(f"Search failed for '{query}': {str(e)}")
    print("---")
Smart Caching

from typing import Dict
from datetime import datetime

from recoagent.conversational_search import SmartCacheManager

# Create smart cache manager
smart_cache = SmartCacheManager(
    cache_backend="redis",
    similarity_threshold=0.8,
    enable_semantic_caching=True,
    cache_ttl=3600
)

# Cache conversational responses
async def cache_conversational_response(query: str, response: str, context: Dict):
    """Cache conversational response with context."""
    cache_key = smart_cache.generate_key(query, context)

    await smart_cache.set(
        key=cache_key,
        value={
            "response": response,
            "context": context,
            "timestamp": datetime.utcnow().isoformat()
        },
        ttl=3600
    )

    print(f"Cached response for: {query}")

# Retrieve cached responses
async def get_cached_response(query: str, context: Dict):
    """Get cached response if available."""
    # Try exact match first
    cache_key = smart_cache.generate_key(query, context)
    cached_response = await smart_cache.get(cache_key)

    if cached_response:
        print(f"✅ Exact cache hit for: {query}")
        return cached_response

    # Try semantic similarity
    similar_response = await smart_cache.get_similar(query, context)
    if similar_response:
        print(f"✅ Semantic cache hit for: {query}")
        return similar_response

    print(f"❌ Cache miss for: {query}")
    return None

# Test smart caching
test_queries = [
    ("What is machine learning?", {"topic": "AI", "level": "beginner"}),
    ("Tell me about ML", {"topic": "AI", "level": "beginner"}),  # Similar to first
    ("How does artificial intelligence work?", {"topic": "AI", "level": "beginner"})  # Different
]

for query, context in test_queries:
    # Check cache first
    cached = await get_cached_response(query, context)

    if not cached:
        # Generate new response (simulate)
        response = f"Response for: {query}"
        await cache_conversational_response(query, response, context)
    else:
        print(f"Using cached response: {cached['response']}")

    print("---")
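A semantic cache hit amounts to comparing the new query with cached ones and accepting matches at or above similarity_threshold. How SmartCacheManager embeds and compares queries is not documented, so this is a minimal sketch with toy embedding vectors and cosine similarity:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm

def cache_hit(query_vec: list[float], cached_vec: list[float],
              threshold: float = 0.8) -> bool:
    """Treat the cached entry as a hit when similarity clears the threshold."""
    return cosine_similarity(query_vec, cached_vec) >= threshold

# Toy 3-dimensional "embeddings"; real ones would come from an encoder model.
ml_query = [0.9, 0.1, 0.0]         # "What is machine learning?"
ml_paraphrase = [0.85, 0.2, 0.05]  # "Tell me about ML"
unrelated = [0.0, 0.1, 0.95]       # an off-topic query

print(cache_hit(ml_query, ml_paraphrase))  # paraphrase: hit
print(cache_hit(ml_query, unrelated))      # unrelated: miss
```

This is why "Tell me about ML" in the test above can reuse the response cached for "What is machine learning?" while the unrelated query misses.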
Multi-Modal Query Processing

from recoagent.conversational_search import ConversationalSearchEngine

# Create multi-modal search engine
multi_modal_engine = ConversationalSearchEngine(
    nlu_engine=nlu_engine,
    memory_manager=memory_manager,
    api_mapping=api_mapping,
    resilient_client=api_client,
    smart_cache=smart_cache,
    enable_multi_modal=True
)

# Process multi-modal queries
multi_modal_queries = [
    {
        "type": "text",
        "content": "Show me images of neural networks",
        "context": {"modality": "visual", "topic": "neural networks"}
    },
    {
        "type": "voice",
        "content": "audio_query.wav",
        "context": {"modality": "audio", "language": "en"}
    },
    {
        "type": "image",
        "content": "neural_network_diagram.png",
        "context": {"modality": "image", "task": "explain"}
    }
]

for query in multi_modal_queries:
    result = await multi_modal_engine.process_multi_modal_query(
        query_type=query["type"],
        content=query["content"],
        context=query["context"]
    )

    print(f"Query Type: {query['type']}")
    print(f"Response: {result.response}")
    print(f"Modality: {result.metadata.get('modality')}")
    print(f"Processing Time: {result.metadata.get('processing_time')}ms")
    print("---")

API Reference

ConversationalSearchEngine Methods

process_conversation(state: ConversationState) -> ConversationalResponse

Process conversational query with state management

Parameters:

  • state (ConversationState): Conversation state

Returns: ConversationalResponse with response and metadata

process_multi_modal_query(query_type: str, content: Any, context: Dict) -> ConversationalResponse

Process multi-modal query

Parameters:

  • query_type (str): Type of query (text, voice, image)
  • content (Any): Query content
  • context (Dict): Query context

Returns: ConversationalResponse with multi-modal response

ContextAwareNLUEngine Methods

process_input(text: str, context: Dict = None) -> NLUResult

Process natural language input with context

Parameters:

  • text (str): Input text
  • context (Dict, optional): Context information

Returns: NLUResult with intent, entities, and confidence

update_context(session_id: str, context: Dict) -> None

Update conversation context

Parameters:

  • session_id (str): Session identifier
  • context (Dict): Context to update

SimpleMemoryManager Methods

store_context(session_id: str, context: Dict) -> None

Store conversation context

Parameters:

  • session_id (str): Session identifier
  • context (Dict): Context to store

get_context(session_id: str) -> Dict

Get conversation context

Parameters:

  • session_id (str): Session identifier

Returns: Stored context dictionary

ResilientAPIClient Methods

make_request(endpoint: str, method: str, data: Dict = None, retry_on_failure: bool = True) -> Response

Make resilient API request

Parameters:

  • endpoint (str): API endpoint
  • method (str): HTTP method
  • data (Dict, optional): Request data
  • retry_on_failure (bool, optional): Retry the request on failure

Returns: API response

health_check() -> bool

Check API health

Returns: True if API is healthy
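Combining health_check() with the fallback URL list suggests a pick-first-healthy routing rule. A self-contained sketch of that rule, where the health results are stubbed in a dictionary rather than produced by real probes (pick_endpoint is illustrative, not a library function):

```python
def pick_endpoint(endpoints: list[str], health: dict[str, bool]) -> str:
    """Return the first endpoint whose health check passed."""
    for url in endpoints:
        if health.get(url, False):
            return url
    # Mirrors the "All APIs failed" error raised in the resilient-search example.
    raise RuntimeError("All APIs failed")

endpoints = [
    "https://api.example.com",       # primary
    "https://backup1.example.com",
    "https://backup2.example.com",
]
# Stubbed results; in practice each value would come from health_check().
health = {
    "https://api.example.com": False,
    "https://backup1.example.com": True,
    "https://backup2.example.com": True,
}

print(pick_endpoint(endpoints, health))  # first healthy fallback
```

Running the check on a fixed interval (as FallbackManager's health_check_interval suggests) keeps this routing table fresh without probing on every request.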
