
Analytics Core

The core analytics engine provides the foundational infrastructure for collecting, processing, and analyzing metrics from enterprise RAG systems.

Core Classes

AnalyticsEngine

Description: Main analytics engine for collecting and processing system metrics

Parameters:

  • config (AnalyticsConfig): Analytics configuration
  • storage_backend (str): Storage backend; one of "postgresql", "redis", or "mongodb"
  • enable_real_time (bool): Enable real-time analytics (default: True)
  • batch_size (int): Batch size for processing (default: 1000)

Returns: AnalyticsEngine instance

Example:

from datetime import datetime, timezone

from recoagent.analytics.core import AnalyticsEngine, AnalyticsConfig

# Create configuration
config = AnalyticsConfig(
    enable_user_tracking=True,
    enable_performance_metrics=True,
    enable_business_metrics=True,
    retention_days=90
)

# Create analytics engine
analytics = AnalyticsEngine(
    config=config,
    storage_backend="postgresql",
    enable_real_time=True
)

# Track events (timestamps are timezone-aware UTC)
analytics.track_event(
    event_type="query_start",
    user_id="user_123",
    session_id="session_456",
    properties={"query": "machine learning", "timestamp": datetime.now(timezone.utc)}
)

EventTracker

Description: Event tracking system for user interactions and system events

Parameters:

  • analytics_engine (AnalyticsEngine): Analytics engine instance
  • enable_batching (bool): Enable event batching (default: True)
  • batch_interval (int): Batch interval in seconds (default: 5)

Returns: EventTracker instance

Example:

from recoagent.analytics.core import EventTracker

# Create event tracker
tracker = EventTracker(
    analytics_engine=analytics,
    enable_batching=True,
    batch_interval=10
)

# Track user events
tracker.track_query_start(user_id="user_123", query="AI applications")
tracker.track_query_complete(user_id="user_123", response_time=1.5, results_count=10)
tracker.track_user_feedback(user_id="user_123", rating=5, feedback="Very helpful")
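
The `enable_batching` and `batch_interval` parameters suggest that events are buffered and flushed periodically. The sketch below illustrates that idea in isolation. `BatchBuffer`, `max_size`, `sink`, and `clock` are hypothetical names used for illustration; they are not part of `recoagent`, and the library's actual batching behavior may differ.

```python
import time
from typing import Any, Callable, Dict, List


class BatchBuffer:
    """Hypothetical event buffer: flushes when full or when the interval elapses."""

    def __init__(
        self,
        sink: Callable[[List[Dict[str, Any]]], None],
        batch_interval: float = 5.0,
        max_size: int = 1000,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._sink = sink
        self._interval = batch_interval
        self._max_size = max_size
        self._clock = clock
        self._events: List[Dict[str, Any]] = []
        self._last_flush = clock()

    def add(self, event: Dict[str, Any]) -> None:
        """Buffer one event, flushing if a size or time threshold is reached."""
        self._events.append(event)
        interval_elapsed = self._clock() - self._last_flush >= self._interval
        if len(self._events) >= self._max_size or interval_elapsed:
            self.flush()

    def flush(self) -> None:
        """Send any buffered events to the sink and reset the timer."""
        if self._events:
            self._sink(self._events)
            self._events = []
        self._last_flush = self._clock()


# Usage: collect flushed batches in a list instead of writing to storage.
batches: List[List[Dict[str, Any]]] = []
buffer = BatchBuffer(sink=batches.append, batch_interval=10.0, max_size=2)
for i in range(5):
    buffer.add({"event_type": "query_start", "user_id": f"user_{i}"})
buffer.flush()  # flush the final partial batch
```

With `max_size=2`, the five events arrive at the sink as two full batches followed by one partial batch. A longer interval means fewer writes, but more events are lost if the process stops before a flush.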

MetricsCollector

Description: Collects and aggregates system performance metrics

Parameters:

  • analytics_engine (AnalyticsEngine): Analytics engine instance
  • collection_interval (int): Collection interval in seconds (default: 60)
  • metrics_types (List[str]): Types of metrics to collect

Returns: MetricsCollector instance

Example:

from recoagent.analytics.core import MetricsCollector

# Create metrics collector
collector = MetricsCollector(
    analytics_engine=analytics,
    collection_interval=30,
    metrics_types=["performance", "usage", "quality"]
)

# Start collecting metrics
collector.start_collection()

# Get collected metrics
performance_metrics = collector.get_performance_metrics()
usage_metrics = collector.get_usage_metrics()
quality_metrics = collector.get_quality_metrics()

AnalyticsConfig

Description: Configuration for analytics system

Fields:

  • enable_user_tracking (bool): Enable user behavior tracking
  • enable_performance_metrics (bool): Enable performance metrics
  • enable_business_metrics (bool): Enable business metrics
  • retention_days (int): Data retention period in days
  • privacy_mode (bool): Enable privacy protection
  • sampling_rate (float): Event sampling rate (0.0-1.0)
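
To make `sampling_rate` and `retention_days` concrete, the sketch below shows one common way such settings are applied: deterministic per-user sampling based on a hash of the user ID, and a retention cutoff computed from a reference time. This page does not say how `recoagent` implements either setting, so `should_sample` and `retention_cutoff` are hypothetical helpers written under that assumption.

```python
import hashlib
from datetime import datetime, timedelta, timezone


def should_sample(user_id: str, sampling_rate: float) -> bool:
    """Return True if this user's events fall inside the sampled fraction.

    Hashing keeps the decision stable for a given user across calls.
    """
    if not 0.0 <= sampling_rate <= 1.0:
        raise ValueError("sampling_rate must be between 0.0 and 1.0")
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    # Interpret the first 8 bytes of the hash as an integer bucket in [0, 2**64).
    bucket = int.from_bytes(digest[:8], "big")
    return bucket < sampling_rate * 2**64


def retention_cutoff(now: datetime, retention_days: int) -> datetime:
    """Events older than the returned timestamp are past the retention period."""
    return now - timedelta(days=retention_days)


now = datetime(2024, 4, 1, tzinfo=timezone.utc)
cutoff = retention_cutoff(now, retention_days=90)
print(cutoff.date())  # 2024-01-02
```

A rate of 1.0 keeps every user and a rate of 0.0 keeps none. Sampling by user rather than by event keeps each sampled user's session history complete.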

Usage Examples

Basic Analytics Setup

from datetime import datetime, timezone

from recoagent.analytics.core import AnalyticsEngine, AnalyticsConfig, EventTracker

# Create analytics configuration
config = AnalyticsConfig(
    enable_user_tracking=True,
    enable_performance_metrics=True,
    enable_business_metrics=True,
    retention_days=90,
    privacy_mode=True,
    sampling_rate=1.0
)

# Initialize analytics engine
analytics = AnalyticsEngine(
    config=config,
    storage_backend="postgresql",
    enable_real_time=True
)

# Create event tracker
tracker = EventTracker(analytics_engine=analytics)

# Track user session
tracker.track_session_start(
    user_id="user_123",
    session_id="session_456",
    user_segment="power_user",
    properties={"source": "web", "device": "desktop"}
)

# Track query events
tracker.track_query_start(
    user_id="user_123",
    session_id="session_456",
    query="machine learning algorithms",
    properties={"query_length": 27, "timestamp": datetime.now(timezone.utc)}
)

tracker.track_query_complete(
    user_id="user_123",
    session_id="session_456",
    response_time=2.5,
    results_count=15,
    properties={"satisfaction_score": 0.8}
)

Performance Metrics Collection

import asyncio
from datetime import datetime, timezone

from recoagent.analytics.core import MetricsCollector

# Create metrics collector
collector = MetricsCollector(
    analytics_engine=analytics,
    collection_interval=60,
    metrics_types=["performance", "usage", "quality"]
)

# Start background collection.
# The get_*() helpers below are placeholders that your application must supply.
async def collect_metrics():
    collector.start_collection()

    # Collect custom metrics
    while True:
        # Collect system performance
        cpu_usage = get_cpu_usage()
        memory_usage = get_memory_usage()
        disk_usage = get_disk_usage()

        collector.record_system_metrics({
            "cpu_usage": cpu_usage,
            "memory_usage": memory_usage,
            "disk_usage": disk_usage,
            "timestamp": datetime.now(timezone.utc)
        })

        # Collect RAG-specific metrics
        rag_metrics = {
            "avg_retrieval_time": get_avg_retrieval_time(),
            "avg_generation_time": get_avg_generation_time(),
            "total_queries": get_total_queries(),
            "success_rate": get_success_rate()
        }

        collector.record_rag_metrics(rag_metrics)

        await asyncio.sleep(60)  # Collect every minute

# Run metrics collection. asyncio.create_task() only works inside a running
# event loop, so use asyncio.run() when starting from synchronous code.
asyncio.run(collect_metrics())

User Behavior Analytics

from datetime import timedelta

from recoagent.analytics.core import UserBehaviorAnalyzer

# Create user behavior analyzer
behavior_analyzer = UserBehaviorAnalyzer(analytics_engine=analytics)

# Analyze user patterns
user_patterns = behavior_analyzer.analyze_user_patterns(
    user_id="user_123",
    time_range=timedelta(days=30)
)

print(f"User activity level: {user_patterns['activity_level']}")
print(f"Preferred query types: {user_patterns['preferred_query_types']}")
print(f"Peak usage hours: {user_patterns['peak_usage_hours']}")

# Segment users
user_segments = behavior_analyzer.segment_users()
print(f"Power users: {len(user_segments['power_users'])}")
print(f"Casual users: {len(user_segments['casual_users'])}")
print(f"New users: {len(user_segments['new_users'])}")

# Analyze query patterns
query_patterns = behavior_analyzer.analyze_query_patterns()
print(f"Most common queries: {query_patterns['common_queries']}")
print(f"Query complexity distribution: {query_patterns['complexity_distribution']}")

Business Intelligence Analytics

from datetime import datetime

from recoagent.analytics.core import BusinessIntelligenceAnalyzer

# Create BI analyzer
bi_analyzer = BusinessIntelligenceAnalyzer(analytics_engine=analytics)

# Generate business reports
business_report = bi_analyzer.generate_business_report(
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 1, 31)
)

print("=== Business Intelligence Report ===")
print(f"Total active users: {business_report['total_active_users']}")
print(f"Total queries: {business_report['total_queries']}")
print(f"Average response time: {business_report['avg_response_time']:.2f}s")
print(f"User satisfaction: {business_report['user_satisfaction']:.2f}")
print(f"System uptime: {business_report['system_uptime']:.2f}%")

# Analyze user engagement
engagement_metrics = bi_analyzer.analyze_engagement()
print(f"Daily active users: {engagement_metrics['dau']}")
print(f"Monthly active users: {engagement_metrics['mau']}")
print(f"User retention rate: {engagement_metrics['retention_rate']:.2f}%")

# Cost analysis
cost_analysis = bi_analyzer.analyze_costs()
print(f"Total API costs: ${cost_analysis['api_costs']:.2f}")
print(f"Infrastructure costs: ${cost_analysis['infrastructure_costs']:.2f}")
print(f"Cost per query: ${cost_analysis['cost_per_query']:.4f}")
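
The engagement and cost figures printed above are derived quantities. As a rough illustration of the arithmetic behind such figures, the sketch below computes a DAU/MAU ratio and a cost per query from raw numbers. The formulas and the sample values are assumptions for illustration; this page does not say how `BusinessIntelligenceAnalyzer` computes its results.

```python
def stickiness(dau: int, mau: int) -> float:
    """DAU/MAU ratio as a percentage; 0.0 when there are no monthly users."""
    return 100.0 * dau / mau if mau else 0.0


def cost_per_query(api_costs: float, infrastructure_costs: float, total_queries: int) -> float:
    """Total spend divided by query volume; 0.0 when there were no queries."""
    if total_queries == 0:
        return 0.0
    return (api_costs + infrastructure_costs) / total_queries


# Made-up sample values, not real measurements.
print(f"Stickiness: {stickiness(dau=1_200, mau=4_800):.2f}%")  # Stickiness: 25.00%
print(f"Cost per query: ${cost_per_query(300.0, 200.0, 125_000):.4f}")  # Cost per query: $0.0040
```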

Real-time Analytics Dashboard

import asyncio

from recoagent.analytics.core import RealTimeDashboard

# Create real-time dashboard
dashboard = RealTimeDashboard(
    analytics_engine=analytics,
    update_interval=5  # Update every 5 seconds
)

# Start dashboard
async def run_dashboard():
    await dashboard.start()

# Add custom metrics (registered before the dashboard starts)
dashboard.add_metric(
    name="active_users",
    query="SELECT COUNT(DISTINCT user_id) FROM events WHERE timestamp > NOW() - INTERVAL '1 hour'",
    chart_type="gauge"
)

dashboard.add_metric(
    name="query_volume",
    query="SELECT COUNT(*) FROM events WHERE event_type = 'query_start' AND timestamp > NOW() - INTERVAL '1 hour'",
    chart_type="line"
)

dashboard.add_metric(
    name="avg_response_time",
    query="SELECT AVG(response_time) FROM events WHERE event_type = 'query_complete' AND timestamp > NOW() - INTERVAL '1 hour'",
    chart_type="bar"
)

# Run dashboard
asyncio.run(run_dashboard())

Custom Event Tracking

from recoagent.analytics.core import CustomEventTracker

# Create custom event tracker
custom_tracker = CustomEventTracker(analytics_engine=analytics)

# Track custom business events
custom_tracker.track_custom_event(
    event_name="feature_adoption",
    user_id="user_123",
    properties={
        "feature_name": "advanced_search",
        "adoption_stage": "trial",
        "usage_frequency": "daily",
        "satisfaction_rating": 4
    }
)

# Track conversion events
custom_tracker.track_conversion(
    user_id="user_123",
    conversion_type="premium_upgrade",
    value=99.99,
    properties={
        "plan": "enterprise",
        "upgrade_reason": "advanced_features",
        "trial_duration_days": 14
    }
)

# Track A/B test events
custom_tracker.track_ab_test(
    user_id="user_123",
    test_name="search_interface_v2",
    variant="treatment",
    properties={
        "conversion": True,
        "time_to_conversion": 300,
        "interactions": 15
    }
)

API Reference

AnalyticsEngine Methods

track_event(event_type: str, user_id: str, properties: Dict) -> None

Track a custom event

Parameters:

  • event_type (str): Type of event
  • user_id (str): User identifier
  • properties (Dict): Event properties

get_metrics(time_range: timedelta) -> Dict

Get aggregated metrics for a time range

Parameters:

  • time_range (timedelta): Time range for metrics

Returns: Dictionary with metrics
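
As an illustration of what aggregating over a `time_range` can involve, the sketch below filters in-memory events to a trailing window and summarizes them. The event shape, the returned keys, and `aggregate_metrics` itself are hypothetical; the contents of the dictionary that `get_metrics` actually returns are not documented on this page.

```python
from datetime import datetime, timedelta, timezone
from statistics import mean
from typing import Any, Dict, List


def aggregate_metrics(
    events: List[Dict[str, Any]], time_range: timedelta, now: datetime
) -> Dict[str, Any]:
    """Summarize events whose timestamp falls within the trailing time_range."""
    cutoff = now - time_range
    recent = [e for e in events if e["timestamp"] >= cutoff]
    response_times = [
        e["response_time"] for e in recent if e["event_type"] == "query_complete"
    ]
    return {
        "total_events": len(recent),
        "unique_users": len({e["user_id"] for e in recent}),
        "avg_response_time": mean(response_times) if response_times else None,
    }


# Made-up sample events; the third one falls outside the one-day window.
now = datetime(2024, 1, 31, 12, 0, tzinfo=timezone.utc)
events = [
    {"event_type": "query_complete", "user_id": "user_123",
     "timestamp": now - timedelta(hours=1), "response_time": 1.5},
    {"event_type": "query_complete", "user_id": "user_456",
     "timestamp": now - timedelta(hours=2), "response_time": 2.5},
    {"event_type": "query_complete", "user_id": "user_123",
     "timestamp": now - timedelta(days=3), "response_time": 9.0},
]
summary = aggregate_metrics(events, timedelta(days=1), now)
print(summary)
```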

export_data(format: str, filters: Dict) -> str

Export analytics data

Parameters:

  • format (str): Export format ("json", "csv", "parquet")
  • filters (Dict): Data filters

Returns: Exported data as a string
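
The sketch below shows how a `format` argument and a `filters` dictionary might drive an export, using only the standard library. It is an illustration under assumptions, not the library's implementation: `export_events` is hypothetical, filters are treated as exact-match key/value pairs, and Parquet is left out because it requires a third-party package.

```python
import csv
import io
import json
from typing import Any, Dict, List


def export_events(events: List[Dict[str, Any]], format: str, filters: Dict[str, Any]) -> str:
    """Serialize the events that match every filter; `format` mirrors the documented name."""
    selected = [e for e in events if all(e.get(k) == v for k, v in filters.items())]
    if format == "json":
        return json.dumps(selected, sort_keys=True)
    if format == "csv":
        if not selected:
            return ""
        out = io.StringIO()
        # Use the union of keys so events with differing properties share one header.
        fieldnames = sorted({key for event in selected for key in event})
        writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator="\n")
        writer.writeheader()
        writer.writerows(selected)
        return out.getvalue()
    raise ValueError(f"unsupported format: {format!r}")


# Made-up sample events.
events = [
    {"event_type": "query_start", "user_id": "user_123"},
    {"event_type": "query_complete", "user_id": "user_123"},
    {"event_type": "query_start", "user_id": "user_456"},
]
csv_text = export_events(events, "csv", {"event_type": "query_start"})
print(csv_text)
```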

EventTracker Methods

track_query_start(user_id: str, query: str, properties: Dict) -> None

Track query start event

Parameters:

  • user_id (str): User identifier
  • query (str): Search query
  • properties (Dict): Additional properties

track_query_complete(user_id: str, response_time: float, results_count: int) -> None

Track query completion event

Parameters:

  • user_id (str): User identifier
  • response_time (float): Response time in seconds
  • results_count (int): Number of results returned

track_user_feedback(user_id: str, rating: int, feedback: str) -> None

Track user feedback event

Parameters:

  • user_id (str): User identifier
  • rating (int): User rating (1-5)
  • feedback (str): User feedback text

MetricsCollector Methods

start_collection() -> None

Start background metrics collection

get_performance_metrics() -> Dict

Get performance metrics

Returns: Dictionary with performance metrics

get_usage_metrics() -> Dict

Get usage metrics

Returns: Dictionary with usage metrics

get_quality_metrics() -> Dict

Get quality metrics

Returns: Dictionary with quality metrics

See Also