Analytics Core
The core analytics engine provides the foundational infrastructure for collecting, processing, and analyzing metrics from enterprise RAG systems.
Core Classes
AnalyticsEngine
Description: Main analytics engine for collecting and processing system metrics
Parameters:
- config (AnalyticsConfig): Analytics configuration
- storage_backend (str): Storage backend ("postgresql", "redis", "mongodb")
- enable_real_time (bool): Enable real-time analytics (default: True)
- batch_size (int): Batch size for processing (default: 1000)
Returns: AnalyticsEngine instance
Example:
from datetime import datetime

from recoagent.analytics.core import AnalyticsEngine, AnalyticsConfig

# Create configuration
config = AnalyticsConfig(
    enable_user_tracking=True,
    enable_performance_metrics=True,
    enable_business_metrics=True,
    retention_days=90
)

# Create analytics engine
analytics = AnalyticsEngine(
    config=config,
    storage_backend="postgresql",
    enable_real_time=True
)

# Track events
analytics.track_event(
    event_type="query_start",
    user_id="user_123",
    session_id="session_456",
    properties={"query": "machine learning", "timestamp": datetime.utcnow()}
)
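The batch_size parameter suggests that events are processed in fixed-size groups. The following is a minimal sketch of that idea, assuming plain dictionaries as events; iter_batches is a hypothetical helper for illustration and is not part of recoagent, whose internal batching may work differently.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def iter_batches(events: Iterable[Dict], batch_size: int = 1000) -> Iterator[List[Dict]]:
    """Yield consecutive lists of at most `batch_size` events (hypothetical helper)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(events)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 2500 synthetic events split with the documented default batch size of 1000
events = [{"event_type": "query_start", "user_id": f"user_{i}"} for i in range(2500)]
batch_sizes = [len(batch) for batch in iter_batches(events, batch_size=1000)]
print(batch_sizes)  # [1000, 1000, 500]
```

The last batch is smaller than batch_size, so any consumer of such batches should not assume a fixed length.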
EventTracker
Description: Event tracking system for user interactions and system events
Parameters:
- analytics_engine (AnalyticsEngine): Analytics engine instance
- enable_batching (bool): Enable event batching (default: True)
- batch_interval (int): Batch interval in seconds (default: 5)
Returns: EventTracker instance
Example:
from recoagent.analytics.core import EventTracker

# Create event tracker
tracker = EventTracker(
    analytics_engine=analytics,
    enable_batching=True,
    batch_interval=10
)

# Track user events
tracker.track_query_start(user_id="user_123", query="AI applications")
tracker.track_query_complete(user_id="user_123", response_time=1.5, results_count=10)
tracker.track_user_feedback(user_id="user_123", rating=5, feedback="Very helpful")
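To illustrate what enable_batching and batch_interval could mean in practice, here is a rough sketch of a time-based buffer. IntervalBatcher is a hypothetical class written for this example and is not the EventTracker implementation; the injectable clock is an assumption made so the behavior can be demonstrated without waiting.

```python
import time
from typing import Callable, Dict, List


class IntervalBatcher:
    """Hypothetical buffer that flushes once `batch_interval` seconds have elapsed."""

    def __init__(
        self,
        flush: Callable[[List[Dict]], None],
        batch_interval: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._flush = flush
        self._interval = batch_interval
        self._clock = clock
        self._buffer: List[Dict] = []
        self._last_flush = clock()

    def add(self, event: Dict) -> None:
        """Buffer the event and flush if the interval has passed since the last flush."""
        self._buffer.append(event)
        if self._clock() - self._last_flush >= self._interval:
            self.flush()

    def flush(self) -> None:
        """Hand the buffered events to the flush callback and reset the timer."""
        if self._buffer:
            self._flush(self._buffer)
            self._buffer = []
        self._last_flush = self._clock()


# Demonstration with a fake clock so no real waiting is needed
fake_now = [0.0]
sent: List[List[Dict]] = []
batcher = IntervalBatcher(sent.append, batch_interval=10, clock=lambda: fake_now[0])

batcher.add({"event_type": "query_start"})      # t=0: buffered
fake_now[0] = 4.0
batcher.add({"event_type": "query_complete"})   # t=4: buffered
fake_now[0] = 10.0
batcher.add({"event_type": "user_feedback"})    # t=10: interval reached, all three flushed
print(len(sent), len(sent[0]))
```

A buffer like this only flushes when a new event arrives; a production tracker would likely also flush on a timer and at shutdown so that trailing events are not lost.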
MetricsCollector
Description: Collects and aggregates system performance metrics
Parameters:
- analytics_engine (AnalyticsEngine): Analytics engine instance
- collection_interval (int): Collection interval in seconds (default: 60)
- metrics_types (List[str]): Types of metrics to collect
Returns: MetricsCollector instance
Example:
from recoagent.analytics.core import MetricsCollector

# Create metrics collector
collector = MetricsCollector(
    analytics_engine=analytics,
    collection_interval=30,
    metrics_types=["performance", "usage", "quality"]
)

# Start collecting metrics
collector.start_collection()

# Get collected metrics
performance_metrics = collector.get_performance_metrics()
usage_metrics = collector.get_usage_metrics()
quality_metrics = collector.get_quality_metrics()
AnalyticsConfig
Description: Configuration for analytics system
Fields:
- enable_user_tracking (bool): Enable user behavior tracking
- enable_performance_metrics (bool): Enable performance metrics
- enable_business_metrics (bool): Enable business metrics
- retention_days (int): Data retention period in days
- privacy_mode (bool): Enable privacy protection
- sampling_rate (float): Event sampling rate (0.0-1.0)
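The documented range for sampling_rate (0.0-1.0) can be made concrete with a small sketch. ConfigSketch is a hypothetical stand-in that mirrors the field names above; its default values and validation rules are assumptions, not the behavior of AnalyticsConfig. The should_sample helper shows one common way to apply a sampling rate deterministically, by hashing an event ID.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class ConfigSketch:
    """Hypothetical stand-in mirroring the documented fields; defaults are assumed."""

    enable_user_tracking: bool = True
    enable_performance_metrics: bool = True
    enable_business_metrics: bool = True
    retention_days: int = 90
    privacy_mode: bool = False
    sampling_rate: float = 1.0

    def __post_init__(self) -> None:
        if not 0.0 <= self.sampling_rate <= 1.0:
            raise ValueError("sampling_rate must be between 0.0 and 1.0")
        if self.retention_days <= 0:
            raise ValueError("retention_days must be positive")


def should_sample(event_id: str, sampling_rate: float) -> bool:
    """Keep roughly `sampling_rate` of events, always deciding the same way per ID."""
    digest = hashlib.sha256(event_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:4], "big") / 2**32  # value in [0, 1)
    return bucket < sampling_rate


config = ConfigSketch(sampling_rate=0.25)
kept = sum(should_sample(f"event_{i}", config.sampling_rate) for i in range(10_000))
print(f"kept {kept} of 10000 events")
```

Hash-based sampling keeps the decision stable across retries and processes, which random sampling does not.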
Usage Examples
Basic Analytics Setup
from datetime import datetime

from recoagent.analytics.core import AnalyticsEngine, AnalyticsConfig, EventTracker

# Create analytics configuration
config = AnalyticsConfig(
    enable_user_tracking=True,
    enable_performance_metrics=True,
    enable_business_metrics=True,
    retention_days=90,
    privacy_mode=True,
    sampling_rate=1.0
)

# Initialize analytics engine
analytics = AnalyticsEngine(
    config=config,
    storage_backend="postgresql",
    enable_real_time=True
)

# Create event tracker
tracker = EventTracker(analytics_engine=analytics)

# Track user session
tracker.track_session_start(
    user_id="user_123",
    session_id="session_456",
    user_segment="power_user",
    properties={"source": "web", "device": "desktop"}
)

# Track query events
tracker.track_query_start(
    user_id="user_123",
    session_id="session_456",
    query="machine learning algorithms",
    properties={"query_length": 25, "timestamp": datetime.utcnow()}
)
tracker.track_query_complete(
    user_id="user_123",
    session_id="session_456",
    response_time=2.5,
    results_count=15,
    properties={"satisfaction_score": 0.8}
)
Performance Metrics Collection
import asyncio
from datetime import datetime

from recoagent.analytics.core import MetricsCollector

# Create metrics collector
collector = MetricsCollector(
    analytics_engine=analytics,
    collection_interval=60,
    metrics_types=["performance", "usage", "quality"]
)

# Background collection loop.
# get_cpu_usage(), get_memory_usage(), get_disk_usage() and the RAG get_*() helpers
# are application-supplied functions that this example does not define.
async def collect_metrics():
    collector.start_collection()
    # Collect custom metrics
    while True:
        # Collect system performance
        cpu_usage = get_cpu_usage()
        memory_usage = get_memory_usage()
        disk_usage = get_disk_usage()
        collector.record_system_metrics({
            "cpu_usage": cpu_usage,
            "memory_usage": memory_usage,
            "disk_usage": disk_usage,
            "timestamp": datetime.utcnow()
        })
        # Collect RAG-specific metrics
        rag_metrics = {
            "avg_retrieval_time": get_avg_retrieval_time(),
            "avg_generation_time": get_avg_generation_time(),
            "total_queries": get_total_queries(),
            "success_rate": get_success_rate()
        }
        collector.record_rag_metrics(rag_metrics)
        await asyncio.sleep(60)  # Collect every minute

# Run metrics collection.
# From a script, start the event loop with asyncio.run(). Inside an already running
# event loop, schedule it with asyncio.create_task(collect_metrics()) instead.
asyncio.run(collect_metrics())
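The system helpers called in the collection loop are not defined in this document. As a rough, standard-library-only sketch, two of them could look like the following. The function bodies are assumptions for illustration: the load average is only a proxy for CPU usage and is unavailable on some platforms, and memory usage is omitted because the standard library has no portable call for it (third-party packages such as psutil are commonly used for that).

```python
import os
import shutil
from typing import Optional


def get_disk_usage(path: str = "/") -> float:
    """Return the used share of the filesystem containing `path`, in percent."""
    usage = shutil.disk_usage(path)
    return 100.0 * usage.used / usage.total


def get_cpu_usage() -> Optional[float]:
    """Approximate CPU load as the 1-minute load average per core, in percent.

    Returns None where the load average is unavailable (for example on Windows).
    """
    if not hasattr(os, "getloadavg"):
        return None
    try:
        one_minute_load, _, _ = os.getloadavg()
    except OSError:
        return None
    return 100.0 * one_minute_load / (os.cpu_count() or 1)


disk_percent = get_disk_usage()
cpu_percent = get_cpu_usage()
print(f"disk: {disk_percent:.1f}%")
print(f"cpu (load-average proxy): {cpu_percent}")
```

Because the load average counts runnable processes rather than busy time, the CPU figure can exceed 100 on an overloaded machine.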
User Behavior Analytics
from datetime import timedelta

from recoagent.analytics.core import UserBehaviorAnalyzer

# Create user behavior analyzer
behavior_analyzer = UserBehaviorAnalyzer(analytics_engine=analytics)

# Analyze user patterns
user_patterns = behavior_analyzer.analyze_user_patterns(
    user_id="user_123",
    time_range=timedelta(days=30)
)
print(f"User activity level: {user_patterns['activity_level']}")
print(f"Preferred query types: {user_patterns['preferred_query_types']}")
print(f"Peak usage hours: {user_patterns['peak_usage_hours']}")

# Segment users
user_segments = behavior_analyzer.segment_users()
print(f"Power users: {len(user_segments['power_users'])}")
print(f"Casual users: {len(user_segments['casual_users'])}")
print(f"New users: {len(user_segments['new_users'])}")

# Analyze query patterns
query_patterns = behavior_analyzer.analyze_query_patterns()
print(f"Most common queries: {query_patterns['common_queries']}")
print(f"Query complexity distribution: {query_patterns['complexity_distribution']}")
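The criteria behind the power_users, casual_users, and new_users segments are not stated here. The sketch below shows one plausible rule set, purely as an illustration: the thresholds (a 7-day window for new users, 20 queries for power users) and the segment_users_sketch function are assumptions, not the logic of UserBehaviorAnalyzer.segment_users().

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Tuple


def segment_users_sketch(
    events: Iterable[Tuple[str, datetime]],
    now: datetime,
    new_user_window: timedelta = timedelta(days=7),   # assumed threshold
    power_user_min_queries: int = 20,                 # assumed threshold
) -> Dict[str, List[str]]:
    """Group users by first-seen date and query count (hypothetical rules)."""
    first_seen: Dict[str, datetime] = {}
    query_counts: Counter = Counter()
    for user_id, timestamp in events:
        query_counts[user_id] += 1
        if user_id not in first_seen or timestamp < first_seen[user_id]:
            first_seen[user_id] = timestamp

    segments: Dict[str, List[str]] = {"power_users": [], "casual_users": [], "new_users": []}
    for user_id in sorted(query_counts):
        if now - first_seen[user_id] <= new_user_window:
            segments["new_users"].append(user_id)
        elif query_counts[user_id] >= power_user_min_queries:
            segments["power_users"].append(user_id)
        else:
            segments["casual_users"].append(user_id)
    return segments


now = datetime(2024, 1, 31)
events = (
    [("user_a", datetime(2024, 1, 1) + timedelta(days=i)) for i in range(25)]
    + [("user_b", datetime(2024, 1, 5)), ("user_b", datetime(2024, 1, 6)), ("user_b", datetime(2024, 1, 20))]
    + [("user_c", datetime(2024, 1, 29)), ("user_c", datetime(2024, 1, 30))]
)
segments = segment_users_sketch(events, now)
print(segments)
```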
Business Intelligence Analytics
from datetime import datetime

from recoagent.analytics.core import BusinessIntelligenceAnalyzer

# Create BI analyzer
bi_analyzer = BusinessIntelligenceAnalyzer(analytics_engine=analytics)

# Generate business reports
business_report = bi_analyzer.generate_business_report(
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 1, 31)
)
print("=== Business Intelligence Report ===")
print(f"Total active users: {business_report['total_active_users']}")
print(f"Total queries: {business_report['total_queries']}")
print(f"Average response time: {business_report['avg_response_time']:.2f}s")
print(f"User satisfaction: {business_report['user_satisfaction']:.2f}")
print(f"System uptime: {business_report['system_uptime']:.2f}%")

# Analyze user engagement
engagement_metrics = bi_analyzer.analyze_engagement()
print(f"Daily active users: {engagement_metrics['dau']}")
print(f"Monthly active users: {engagement_metrics['mau']}")
print(f"User retention rate: {engagement_metrics['retention_rate']:.2f}%")

# Cost analysis
cost_analysis = bi_analyzer.analyze_costs()
print(f"Total API costs: ${cost_analysis['api_costs']:.2f}")
print(f"Infrastructure costs: ${cost_analysis['infrastructure_costs']:.2f}")
print(f"Cost per query: ${cost_analysis['cost_per_query']:.4f}")
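For readers unfamiliar with the engagement and cost figures, the sketch below works through common definitions on a tiny data set. These definitions are assumptions: the analyzer may compute dau, mau, and cost_per_query differently, and both helper functions are hypothetical.

```python
from datetime import date
from typing import Iterable, Set, Tuple


def active_users(events: Iterable[Tuple[str, date]], start: date, end: date) -> Set[str]:
    """Return the distinct users with at least one event in [start, end]."""
    return {user_id for user_id, day in events if start <= day <= end}


def cost_per_query(api_costs: float, infrastructure_costs: float, total_queries: int) -> float:
    """Total cost divided by query count; 0.0 when there were no queries."""
    if total_queries == 0:
        return 0.0
    return (api_costs + infrastructure_costs) / total_queries


events = [
    ("user_1", date(2024, 1, 31)),
    ("user_2", date(2024, 1, 31)),
    ("user_1", date(2024, 1, 15)),
    ("user_3", date(2024, 1, 10)),
]
dau = len(active_users(events, date(2024, 1, 31), date(2024, 1, 31)))
mau = len(active_users(events, date(2024, 1, 1), date(2024, 1, 31)))
unit_cost = cost_per_query(api_costs=120.0, infrastructure_costs=80.0, total_queries=4000)

print(f"DAU: {dau}, MAU: {mau}, DAU/MAU: {dau / mau:.2f}")  # DAU: 2, MAU: 3, DAU/MAU: 0.67
print(f"Cost per query: ${unit_cost:.4f}")                   # Cost per query: $0.0500
```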
Real-time Analytics Dashboard
import asyncio

from recoagent.analytics.core import RealTimeDashboard

# Create real-time dashboard
dashboard = RealTimeDashboard(
    analytics_engine=analytics,
    update_interval=5  # Update every 5 seconds
)

# Start dashboard
async def run_dashboard():
    await dashboard.start()

    # Add custom metrics
    dashboard.add_metric(
        name="active_users",
        query="SELECT COUNT(DISTINCT user_id) FROM events WHERE timestamp > NOW() - INTERVAL '1 hour'",
        chart_type="gauge"
    )
    dashboard.add_metric(
        name="query_volume",
        query="SELECT COUNT(*) FROM events WHERE event_type = 'query_start' AND timestamp > NOW() - INTERVAL '1 hour'",
        chart_type="line"
    )
    dashboard.add_metric(
        name="avg_response_time",
        query="SELECT AVG(response_time) FROM events WHERE event_type = 'query_complete' AND timestamp > NOW() - INTERVAL '1 hour'",
        chart_type="bar"
    )

# Run dashboard
asyncio.run(run_dashboard())
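The three dashboard queries use PostgreSQL interval syntax against an events table whose schema is not documented here. To show what they compute, the sketch below runs equivalent aggregations on an in-memory SQLite database. The column layout is an assumption inferred from the queries, and the time filter is rewritten with SQLite's datetime('now', '-1 hour') because SQLite does not support NOW() - INTERVAL.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

conn = sqlite3.connect(":memory:")
# Assumed schema, inferred from the columns the dashboard queries reference
conn.execute(
    "CREATE TABLE events (user_id TEXT, event_type TEXT, response_time REAL, timestamp TEXT)"
)


def minutes_ago(minutes: int) -> str:
    """UTC timestamp in the 'YYYY-MM-DD HH:MM:SS' form that SQLite's datetime() returns."""
    moment = datetime.now(timezone.utc) - timedelta(minutes=minutes)
    return moment.strftime("%Y-%m-%d %H:%M:%S")


rows = [
    ("user_1", "query_start", None, minutes_ago(5)),
    ("user_1", "query_complete", 1.0, minutes_ago(4)),
    ("user_2", "query_start", None, minutes_ago(30)),
    ("user_2", "query_complete", 2.0, minutes_ago(29)),
    ("user_3", "query_start", None, minutes_ago(180)),  # older than one hour, excluded
]
conn.executemany("INSERT INTO events VALUES (?, ?, ?, ?)", rows)

last_hour = "timestamp > datetime('now', '-1 hour')"
active_users = conn.execute(
    f"SELECT COUNT(DISTINCT user_id) FROM events WHERE {last_hour}"
).fetchone()[0]
query_volume = conn.execute(
    f"SELECT COUNT(*) FROM events WHERE event_type = 'query_start' AND {last_hour}"
).fetchone()[0]
avg_response_time = conn.execute(
    f"SELECT AVG(response_time) FROM events WHERE event_type = 'query_complete' AND {last_hour}"
).fetchone()[0]

print(active_users, query_volume, avg_response_time)  # 2 2 1.5
conn.close()
```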
Custom Event Tracking
from recoagent.analytics.core import CustomEventTracker

# Create custom event tracker
custom_tracker = CustomEventTracker(analytics_engine=analytics)

# Track custom business events
custom_tracker.track_custom_event(
    event_name="feature_adoption",
    user_id="user_123",
    properties={
        "feature_name": "advanced_search",
        "adoption_stage": "trial",
        "usage_frequency": "daily",
        "satisfaction_rating": 4
    }
)

# Track conversion events
custom_tracker.track_conversion(
    user_id="user_123",
    conversion_type="premium_upgrade",
    value=99.99,
    properties={
        "plan": "enterprise",
        "upgrade_reason": "advanced_features",
        "trial_duration_days": 14
    }
)

# Track A/B test events
custom_tracker.track_ab_test(
    user_id="user_123",
    test_name="search_interface_v2",
    variant="treatment",
    properties={
        "conversion": True,
        "time_to_conversion": 300,
        "interactions": 15
    }
)
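The A/B test call takes a variant that the caller has already chosen. How variants are assigned is outside the scope of this page; one widely used approach is deterministic hashing, sketched below. assign_variant is a hypothetical helper, not a recoagent function, and the 50/50 split is an assumption.

```python
import hashlib


def assign_variant(user_id: str, test_name: str, treatment_share: float = 0.5) -> str:
    """Map a user to 'treatment' or 'control', always the same way for a given test."""
    key = f"{test_name}:{user_id}".encode("utf-8")
    bucket = int.from_bytes(hashlib.sha256(key).digest()[:4], "big") / 2**32  # in [0, 1)
    return "treatment" if bucket < treatment_share else "control"


variant = assign_variant("user_123", "search_interface_v2")
treatment_count = sum(
    assign_variant(f"user_{i}", "search_interface_v2") == "treatment" for i in range(10_000)
)
print(variant)
print(f"{treatment_count} of 10000 users in treatment")
```

Including the test name in the hash key keeps assignments independent across experiments, so a user in the treatment group of one test is not systematically in the treatment group of another.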
API Reference
AnalyticsEngine Methods
track_event(event_type: str, user_id: str, properties: Dict) -> None
Track a custom event
Parameters:
- event_type (str): Type of event
- user_id (str): User identifier
- properties (Dict): Event properties
get_metrics(time_range: timedelta) -> Dict
Get aggregated metrics for time range
Parameters:
- time_range (timedelta): Time range for metrics
Returns: Dictionary with metrics
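The keys of the returned dictionary are not listed here. As an illustration of aggregating over a trailing time range, the sketch below filters in-memory events by a cutoff and summarizes them. aggregate_metrics and its output keys are hypothetical and do not describe the actual return value of get_metrics().

```python
from datetime import datetime, timedelta
from statistics import mean
from typing import Any, Dict, List


def aggregate_metrics(
    events: List[Dict[str, Any]], time_range: timedelta, now: datetime
) -> Dict[str, Any]:
    """Summarize events whose timestamp falls within `time_range` before `now`."""
    cutoff = now - time_range
    recent = [event for event in events if event["timestamp"] >= cutoff]
    response_times = [
        event["response_time"] for event in recent if event["event_type"] == "query_complete"
    ]
    return {
        "total_events": len(recent),
        "unique_users": len({event["user_id"] for event in recent}),
        "avg_response_time": mean(response_times) if response_times else None,
    }


now = datetime(2024, 1, 31, 12, 0)
events = [
    {"user_id": "user_123", "event_type": "query_start", "timestamp": now - timedelta(minutes=10)},
    {"user_id": "user_123", "event_type": "query_complete", "response_time": 1.5,
     "timestamp": now - timedelta(minutes=9)},
    {"user_id": "user_456", "event_type": "query_complete", "response_time": 2.5,
     "timestamp": now - timedelta(minutes=30)},
    {"user_id": "user_789", "event_type": "query_complete", "response_time": 9.0,
     "timestamp": now - timedelta(days=2)},  # outside the one-hour range
]
metrics = aggregate_metrics(events, timedelta(hours=1), now)
print(metrics)  # {'total_events': 3, 'unique_users': 2, 'avg_response_time': 2.0}
```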
export_data(format: str, filters: Dict) -> str
Export analytics data
Parameters:
- format (str): Export format ("json", "csv", "parquet")
- filters (Dict): Data filters
Returns: Exported data
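To make the "json" and "csv" formats concrete, here is a standard-library sketch that serializes a list of event dictionaries to a string. export_events is a hypothetical function, not the implementation of export_data(); the "parquet" format is left out because it requires a third-party library.

```python
import csv
import io
import json
from typing import Any, Dict, List


def export_events(events: List[Dict[str, Any]], format: str = "json") -> str:
    """Serialize events to a JSON or CSV string (hypothetical helper)."""
    if format == "json":
        # default=str covers values such as datetimes that json cannot encode natively
        return json.dumps(events, indent=2, default=str)
    if format == "csv":
        # Use the union of all keys so events with extra properties still fit
        fieldnames = sorted({key for event in events for key in event})
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
        writer.writeheader()
        writer.writerows(events)  # missing keys are written as empty cells
        return buffer.getvalue()
    raise ValueError(f"unsupported format: {format!r}")


events = [
    {"event_type": "query_start", "user_id": "user_123"},
    {"event_type": "query_complete", "user_id": "user_123", "response_time": 1.5},
]
json_text = export_events(events, format="json")
csv_text = export_events(events, format="csv")
print(csv_text)
```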
EventTracker Methods
track_query_start(user_id: str, query: str, properties: Dict) -> None
Track query start event
Parameters:
- user_id (str): User identifier
- query (str): Search query
- properties (Dict): Additional properties
track_query_complete(user_id: str, response_time: float, results_count: int) -> None
Track query completion event
Parameters:
- user_id (str): User identifier
- response_time (float): Response time in seconds
- results_count (int): Number of results returned
track_user_feedback(user_id: str, rating: int, feedback: str) -> None
Track user feedback event
Parameters:
- user_id (str): User identifier
- rating (int): User rating (1-5)
- feedback (str): User feedback text
MetricsCollector Methods
start_collection() -> None
Start background metrics collection
get_performance_metrics() -> Dict
Get performance metrics
Returns: Dictionary with performance metrics
get_usage_metrics() -> Dict
Get usage metrics
Returns: Dictionary with usage metrics
get_quality_metrics() -> Dict
Get quality metrics
Returns: Dictionary with quality metrics
See Also
- Analytics A/B Testing - A/B testing framework
- Analytics Dashboards - Analytics dashboards
- Observability Tracing - Distributed tracing
- Observability Metrics - Metrics collection