Token Optimization

This example demonstrates the complete token optimization system including context compression, response caching, cost monitoring, optimization recommendations, and cost modeling.

Overview​

The token optimization system provides:

  • Context Compression - Reduce token usage while preserving information
  • Response Caching - Cache responses to avoid redundant LLM calls
  • Cost Monitoring - Track and analyze token usage costs
  • Optimization Recommendations - Automated suggestions for cost reduction
  • Cost Modeling - Predict costs and ROI of optimization strategies
  • Interactive Dashboard - Real-time cost and performance monitoring

Prerequisites​

  • RecoAgent installed and configured
  • Token optimization packages available
  • OpenAI API key (or other LLM provider)
  • Python 3.8+ with required dependencies

Basic Setup​

1. Initialize Token Optimization System​

from packages.rag.token_optimization import (
    TokenOptimizer, TokenOptimizationConfig, CompressionStrategy,
    PruningStrategy, OptimizationLevel, create_token_optimizer
)
from packages.rag.optimization_recommender import (
    OptimizationRecommender, create_optimization_recommender
)
from packages.rag.cost_modeling import (
    CostModelingSystem, create_cost_modeling_system
)
from packages.rag.cost_dashboard import (
    CostAnalyticsDashboard, DashboardConfig, create_cost_dashboard
)


class TokenOptimizationDemo:
    """Comprehensive demonstration of token optimization capabilities."""

    def __init__(self):
        # Initialize token optimizer
        self.optimizer = create_token_optimizer(
            config=TokenOptimizationConfig(
                compression_strategy=CompressionStrategy.HYBRID,
                pruning_strategy=PruningStrategy.RELEVANCE_BASED,
                optimization_level=OptimizationLevel.BALANCED,
                enable_caching=True,
                cache_ttl=3600,
                max_context_tokens=4000,
                target_compression_ratio=0.7
            )
        )

        # Initialize cost modeling system
        self.cost_system = create_cost_modeling_system(
            provider="openai",
            model="gpt-4",
            cost_per_token=0.00003
        )

        # Initialize optimization recommender
        self.recommender = create_optimization_recommender(
            cost_system=self.cost_system
        )

        # Initialize cost dashboard
        self.dashboard = create_cost_dashboard(
            config=DashboardConfig(
                refresh_interval=30,
                enable_real_time=True
            )
        )

2. Configure Optimization Settings​

def configure_optimization_settings(self):
    """Configure optimization settings for different use cases."""
    print("āš™ļø Configuring optimization settings...")

    # Configure for different optimization levels
    optimization_configs = {
        'maximum_savings': TokenOptimizationConfig(
            compression_strategy=CompressionStrategy.AGGRESSIVE,
            pruning_strategy=PruningStrategy.RELEVANCE_BASED,
            optimization_level=OptimizationLevel.MAXIMUM_SAVINGS,
            target_compression_ratio=0.5,
            enable_caching=True,
            cache_ttl=7200
        ),
        'balanced': TokenOptimizationConfig(
            compression_strategy=CompressionStrategy.HYBRID,
            pruning_strategy=PruningStrategy.RELEVANCE_BASED,
            optimization_level=OptimizationLevel.BALANCED,
            target_compression_ratio=0.7,
            enable_caching=True,
            cache_ttl=3600
        ),
        'quality_focused': TokenOptimizationConfig(
            compression_strategy=CompressionStrategy.CONSERVATIVE,
            pruning_strategy=PruningStrategy.MINIMAL,
            optimization_level=OptimizationLevel.QUALITY_FOCUSED,
            target_compression_ratio=0.9,
            enable_caching=True,
            cache_ttl=1800
        )
    }

    for name, config in optimization_configs.items():
        print(f"āœ… Configured {name} optimization strategy")

    return optimization_configs

Context Compression​

Demonstrate Compression Strategies​

async def demonstrate_compression(self):
    """Demonstrate different compression strategies."""
    print("\nšŸ—œļø Context Compression Demo")
    print("=" * 40)

    # Sample context that needs compression
    long_context = """
    RecoAgent is an enterprise RAG platform built with LangGraph and LangChain.
    It provides comprehensive retrieval-augmented generation capabilities for
    enterprise applications. The platform includes hybrid retrieval combining
    BM25 and vector search, built-in evaluation with RAGAS metrics, safety
    guardrails using NVIDIA NeMo Guardrails, and support for multiple vector
    stores including OpenSearch, Azure AI Search, and Vertex AI. The system
    also includes LangSmith integration for observability and monitoring,
    comprehensive analytics and business intelligence features, and advanced
    security measures including input sanitization and prompt injection prevention.
    """

    print(f"Original context length: {len(long_context)} characters")

    # Test different compression strategies
    strategies = [
        CompressionStrategy.AGGRESSIVE,
        CompressionStrategy.HYBRID,
        CompressionStrategy.CONSERVATIVE
    ]

    results = {}
    for strategy in strategies:
        print(f"\n{strategy.value.title()} Compression:")

        # Compress context
        compressed = await self.optimizer.compress_context(
            context=long_context,
            strategy=strategy,
            target_ratio=0.7
        )

        print(f"  Original tokens: {compressed.original_tokens}")
        print(f"  Compressed tokens: {compressed.compressed_tokens}")
        print(f"  Compression ratio: {compressed.compression_ratio:.2f}")
        print(f"  Quality score: {compressed.quality_score:.2f}")
        print(f"  Compressed text: {compressed.compressed_text[:100]}...")

        results[strategy.value] = compressed

    # Return the result for every strategy, not just the last one
    return results

Intelligent Pruning​

async def demonstrate_pruning(self):
    """Demonstrate intelligent context pruning."""
    print("\nāœ‚ļø Intelligent Pruning Demo")
    print("=" * 40)

    # Sample context with multiple sections
    context_sections = [
        "Introduction: RecoAgent is a comprehensive RAG platform...",
        "Features: The platform includes hybrid retrieval, evaluation metrics...",
        "Architecture: The system follows a modular design with three main packages...",
        "Installation: To install RecoAgent, use pip install recoagent...",
        "Configuration: RecoAgent can be configured through environment variables...",
        "Use Cases: The platform is suitable for enterprise knowledge management..."
    ]

    query = "How do I install RecoAgent?"

    print(f"Query: {query}")
    print(f"Original context sections: {len(context_sections)}")

    # Prune context based on relevance
    pruned_context = await self.optimizer.prune_context(
        context_sections=context_sections,
        query=query,
        strategy=PruningStrategy.RELEVANCE_BASED,
        max_sections=3
    )

    print("\nPruned context:")
    print(f"  Sections kept: {len(pruned_context.kept_sections)}")
    print(f"  Sections removed: {len(pruned_context.removed_sections)}")
    print(f"  Relevance scores: {pruned_context.relevance_scores}")
    print(f"  Token reduction: {pruned_context.token_reduction:.1%}")

    print("\nKept sections:")
    for i, section in enumerate(pruned_context.kept_sections):
        print(f"  {i+1}. {section[:50]}...")

    return pruned_context

Response Caching​

Implement Smart Caching​

async def demonstrate_caching(self):
    """Demonstrate intelligent response caching."""
    print("\nšŸ’¾ Response Caching Demo")
    print("=" * 40)

    # Sample queries for caching demonstration
    queries = [
        "What is RecoAgent?",
        "How does hybrid search work?",
        "What is RecoAgent?",  # Duplicate query
        "What evaluation metrics are available?",
        "How does hybrid search work?",  # Another duplicate
        "What vector stores are supported?"
    ]

    cache_hits = 0
    cache_misses = 0

    for i, query in enumerate(queries):
        print(f"\nQuery {i+1}: {query}")

        # Check cache first
        cached_response = await self.optimizer.get_cached_response(query)

        if cached_response:
            cache_hits += 1
            print(f"  āœ… Cache hit! Response: {cached_response['response'][:50]}...")
            print(f"  šŸ’° Cost saved: ${cached_response['cost_saved']:.4f}")
        else:
            cache_misses += 1
            print("  āŒ Cache miss - generating new response")

            # Simulate response generation
            response = f"Response to: {query}"
            cost = 0.001  # Simulated cost

            # Cache the response
            await self.optimizer.cache_response(
                query=query,
                response=response,
                cost=cost,
                ttl=3600
            )

    print("\nCaching Statistics:")
    print(f"  Cache hits: {cache_hits}")
    print(f"  Cache misses: {cache_misses}")
    print(f"  Hit rate: {cache_hits / len(queries):.1%}")

    return {
        'cache_hits': cache_hits,
        'cache_misses': cache_misses,
        'hit_rate': cache_hits / len(queries)
    }

Cost Monitoring​

Track Token Usage​

from datetime import datetime, timedelta

async def track_token_usage(self):
    """Track and analyze token usage patterns."""
    print("\nšŸ’° Token Usage Tracking")
    print("=" * 40)

    # Simulate token usage data
    usage_data = [
        {'timestamp': datetime.now() - timedelta(hours=2), 'tokens': 1500, 'cost': 0.045},
        {'timestamp': datetime.now() - timedelta(hours=1), 'tokens': 2300, 'cost': 0.069},
        {'timestamp': datetime.now() - timedelta(minutes=30), 'tokens': 1800, 'cost': 0.054},
        {'timestamp': datetime.now() - timedelta(minutes=15), 'tokens': 1200, 'cost': 0.036},
        {'timestamp': datetime.now(), 'tokens': 2000, 'cost': 0.060}
    ]

    # Track usage
    for data in usage_data:
        await self.cost_system.track_usage(
            timestamp=data['timestamp'],
            tokens_used=data['tokens'],
            cost=data['cost'],
            model="gpt-4",
            user_id="demo_user"
        )

    # Get usage analytics
    analytics = await self.cost_system.get_usage_analytics(
        time_range=timedelta(hours=24)
    )

    print("Usage Analytics (24 hours):")
    print(f"  Total tokens used: {analytics['total_tokens']:,}")
    print(f"  Total cost: ${analytics['total_cost']:.2f}")
    print(f"  Average tokens per request: {analytics['avg_tokens_per_request']:.0f}")
    print(f"  Peak usage: {analytics['peak_tokens']} tokens at {analytics['peak_time']}")
    print(f"  Cost per token: ${analytics['cost_per_token']:.6f}")

    # Get cost trends
    trends = await self.cost_system.get_cost_trends(
        time_range=timedelta(hours=24),
        granularity="hourly"
    )

    print("\nCost Trends:")
    for trend in trends:
        print(f"  {trend['timestamp']}: ${trend['cost']:.2f} ({trend['tokens']} tokens)")

    return analytics

Cost Prediction​

async def predict_costs(self):
    """Predict future costs based on usage patterns."""
    print("\nšŸ”® Cost Prediction")
    print("=" * 40)

    # Predict costs for different scenarios
    scenarios = [
        {
            'name': 'Current Usage',
            'daily_queries': 1000,
            'avg_tokens_per_query': 2000
        },
        {
            'name': 'Increased Usage',
            'daily_queries': 2000,
            'avg_tokens_per_query': 2000
        },
        {
            'name': 'Optimized Usage',
            'daily_queries': 1000,
            'avg_tokens_per_query': 1400  # 30% reduction
        }
    ]

    predictions = []
    for scenario in scenarios:
        prediction = await self.cost_system.predict_costs(
            daily_queries=scenario['daily_queries'],
            avg_tokens_per_query=scenario['avg_tokens_per_query'],
            time_horizon=30  # 30 days
        )
        predictions.append(prediction)

        print(f"\n{scenario['name']} Scenario:")
        print(f"  Daily cost: ${prediction['daily_cost']:.2f}")
        print(f"  Monthly cost: ${prediction['monthly_cost']:.2f}")
        print(f"  Annual cost: ${prediction['annual_cost']:.2f}")
        print(f"  Total tokens (monthly): {prediction['monthly_tokens']:,}")

    return predictions
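Under the hood, this kind of projection reduces to simple arithmetic. A minimal sketch, assuming the gpt-4 rate of $0.00003 per token used in the setup (`predict_monthly_cost` is a hypothetical helper, not part of the cost modeling API):

```python
def predict_monthly_cost(daily_queries: int, avg_tokens_per_query: int,
                         cost_per_token: float = 0.00003, days: int = 30) -> float:
    """Monthly cost = queries/day * tokens/query * $/token * days."""
    return daily_queries * avg_tokens_per_query * cost_per_token * days

# Current Usage scenario: 1000 queries/day at 2000 tokens each
print(predict_monthly_cost(1000, 2000))   # $1800.00/month
# Optimized Usage scenario: 30% fewer tokens per query
print(predict_monthly_cost(1000, 1400))   # $1260.00/month
```

The gap between the two calls is the dollar value of a 30% token reduction, which is what the ROI figures in the recommendations below are built on.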

Optimization Recommendations​

Get Automated Recommendations​

async def get_optimization_recommendations(self):
    """Get automated optimization recommendations."""
    print("\nšŸ’” Optimization Recommendations")
    print("=" * 40)

    # Analyze current usage patterns
    usage_patterns = await self.recommender.analyze_usage_patterns(
        time_range=timedelta(days=7)
    )

    print("Usage Pattern Analysis:")
    print(f"  Average query length: {usage_patterns['avg_query_length']} tokens")
    print(f"  Context utilization: {usage_patterns['context_utilization']:.1%}")
    print(f"  Cache hit rate: {usage_patterns['cache_hit_rate']:.1%}")
    print(f"  Duplicate queries: {usage_patterns['duplicate_queries']:.1%}")

    # Get recommendations
    recommendations = await self.recommender.get_recommendations(
        usage_patterns=usage_patterns,
        optimization_goal="cost_reduction"
    )

    print("\nOptimization Recommendations:")
    for i, rec in enumerate(recommendations, 1):
        print(f"  {i}. {rec['title']}")
        print(f"     Impact: {rec['impact']}")
        print(f"     Effort: {rec['effort']}")
        print(f"     Expected savings: {rec['expected_savings']:.1%}")
        print(f"     Description: {rec['description']}")

    # Calculate potential savings
    total_savings = sum(rec['expected_savings'] for rec in recommendations)
    print(f"\nTotal potential savings: {total_savings:.1%}")

    return recommendations

Implement Recommendations​

async def implement_recommendations(self, recommendations):
    """Implement optimization recommendations."""
    print("\nšŸš€ Implementing Recommendations")
    print("=" * 40)

    implementation_results = []

    for rec in recommendations:
        print(f"\nImplementing: {rec['title']}")

        # Simulate implementation
        result = await self.recommender.implement_recommendation(
            recommendation=rec,
            dry_run=False
        )

        print(f"  Status: {result['status']}")
        print(f"  Actual savings: {result['actual_savings']:.1%}")
        print(f"  Implementation time: {result['implementation_time']:.1f} minutes")

        implementation_results.append(result)

    # Calculate total impact
    total_actual_savings = sum(r['actual_savings'] for r in implementation_results)
    print(f"\nTotal actual savings: {total_actual_savings:.1%}")

    return implementation_results

Interactive Dashboard​

Create Cost Dashboard​

async def create_cost_dashboard(self):
    """Create interactive cost monitoring dashboard."""
    print("\nšŸ“Š Cost Dashboard")
    print("=" * 40)

    # Configure dashboard
    dashboard_config = {
        'title': 'Token Optimization Dashboard',
        'refresh_interval': 30,
        'widgets': [
            {
                'type': 'cost_overview',
                'title': 'Cost Overview',
                'time_range': '24h'
            },
            {
                'type': 'token_usage',
                'title': 'Token Usage Trends',
                'time_range': '7d'
            },
            {
                'type': 'optimization_impact',
                'title': 'Optimization Impact',
                'time_range': '30d'
            },
            {
                'type': 'cache_performance',
                'title': 'Cache Performance',
                'time_range': '24h'
            },
            {
                'type': 'recommendations',
                'title': 'Active Recommendations',
                'time_range': 'realtime'
            }
        ]
    }

    # Create dashboard
    dashboard_url = await self.dashboard.create_dashboard(dashboard_config)
    print(f"āœ… Dashboard created: {dashboard_url}")

    # Get current metrics
    metrics = await self.dashboard.get_current_metrics()
    print("\nCurrent Metrics:")
    print(f"  Today's cost: ${metrics['todays_cost']:.2f}")
    print(f"  Tokens used today: {metrics['todays_tokens']:,}")
    print(f"  Cache hit rate: {metrics['cache_hit_rate']:.1%}")
    print(f"  Optimization savings: {metrics['optimization_savings']:.1%}")

    return dashboard_url

Complete Example​

Here's a complete working example:

#!/usr/bin/env python3
"""
Complete Token Optimization Example
"""

import asyncio
from datetime import datetime, timedelta


async def main():
    """Main demo function."""
    print("šŸ’° Comprehensive Token Optimization Demo")
    print("=" * 50)

    # Initialize optimization demo
    demo = TokenOptimizationDemo()

    # Configure optimization settings
    demo.configure_optimization_settings()

    # Demonstrate compression
    await demo.demonstrate_compression()

    # Demonstrate pruning
    await demo.demonstrate_pruning()

    # Demonstrate caching
    caching_stats = await demo.demonstrate_caching()

    # Track token usage
    usage_analytics = await demo.track_token_usage()

    # Predict costs
    cost_predictions = await demo.predict_costs()

    # Get recommendations
    recommendations = await demo.get_optimization_recommendations()

    # Implement recommendations
    implementation_results = await demo.implement_recommendations(recommendations)

    # Create dashboard
    dashboard_url = await demo.create_cost_dashboard()

    print("\nšŸŽ‰ Token Optimization Demo Complete!")
    print(f"šŸ“Š Dashboard available at: {dashboard_url}")
    print(f"šŸ’¾ Cache hit rate: {caching_stats['hit_rate']:.1%}")
    print(f"šŸ’° Total actual savings: {sum(r['actual_savings'] for r in implementation_results):.1%}")


if __name__ == "__main__":
    asyncio.run(main())

Best Practices​

1. Compression Strategy​

  • Use aggressive compression for cost-sensitive applications
  • Use conservative compression for quality-critical use cases
  • Monitor quality scores to ensure information preservation
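These trade-offs can be captured in a small lookup keyed by your application's priority. A hedged sketch (the `PROFILES` dict and `pick_profile` helper are illustrative stand-ins, not part of the RecoAgent API; the strategy names and target ratios mirror the configs shown earlier):

```python
# Map an application priority to a compression profile.
# Ratios follow the earlier configs: 0.5 aggressive, 0.7 balanced, 0.9 conservative.
PROFILES = {
    "cost": {"strategy": "aggressive", "target_ratio": 0.5},
    "balanced": {"strategy": "hybrid", "target_ratio": 0.7},
    "quality": {"strategy": "conservative", "target_ratio": 0.9},
}

def pick_profile(priority: str) -> dict:
    """Return a compression profile, falling back to 'balanced' for unknown inputs."""
    return PROFILES.get(priority, PROFILES["balanced"])

print(pick_profile("cost"))     # aggressive, 0.5
print(pick_profile("unknown"))  # falls back to hybrid, 0.7
```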

2. Caching Implementation​

  • Set appropriate TTL values based on content freshness needs
  • Use semantic similarity for cache key matching
  • Monitor cache hit rates and adjust strategies
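Semantic key matching means a near-duplicate query can reuse a cached answer even when the wording differs slightly. A minimal sketch using word-count cosine similarity as a cheap stand-in for embedding-based similarity (`find_cached` and the 0.85 threshold are illustrative assumptions, not the library's cache internals):

```python
import math
from collections import Counter

def cosine_similarity(a: str, b: str) -> float:
    """Cosine similarity over word-count vectors (a rough proxy for embeddings)."""
    va, vb = Counter(a.lower().split()), Counter(b.lower().split())
    dot = sum(va[w] * vb[w] for w in va)
    na = math.sqrt(sum(c * c for c in va.values()))
    nb = math.sqrt(sum(c * c for c in vb.values()))
    return dot / (na * nb) if na and nb else 0.0

def find_cached(query: str, cache: dict, threshold: float = 0.85):
    """Return the cached response for the most similar key, or None below threshold."""
    best_key, best_score = None, 0.0
    for key in cache:
        score = cosine_similarity(query, key)
        if score > best_score:
            best_key, best_score = key, score
    return cache[best_key] if best_score >= threshold else None
```

In production you would swap the word-count vectors for real embeddings; the threshold logic stays the same.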

3. Cost Monitoring​

  • Set up alerts for unusual cost spikes
  • Track cost trends and patterns
  • Regular cost optimization reviews
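A spike alert can be as simple as comparing the latest interval against the average of the preceding ones. A hedged sketch (`detect_cost_spike` and the 2x factor are illustrative choices, not the monitoring system's actual alerting rule):

```python
def detect_cost_spike(hourly_costs: list[float], factor: float = 2.0) -> bool:
    """Flag a spike when the latest hourly cost exceeds `factor` times the
    average of all preceding hours."""
    if len(hourly_costs) < 2:
        return False  # not enough history to compare against
    *history, latest = hourly_costs
    baseline = sum(history) / len(history)
    return latest > factor * baseline

print(detect_cost_spike([0.05, 0.05, 0.05, 0.20]))  # True: 4x the baseline
print(detect_cost_spike([0.05, 0.05, 0.05, 0.06]))  # False: within normal range
```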

4. Recommendation Implementation​

  • Prioritize high-impact, low-effort recommendations
  • Test recommendations in staging environments
  • Monitor actual vs. predicted savings
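"High-impact, low-effort first" can be expressed as sorting by expected savings per unit of effort. A minimal sketch (the `EFFORT_WEIGHT` mapping and field names mirror the recommendation dicts printed earlier, but this helper is hypothetical, not the recommender's internal ranking):

```python
EFFORT_WEIGHT = {"low": 1, "medium": 2, "high": 3}

def prioritize(recommendations: list[dict]) -> list[dict]:
    """Sort recommendations by expected savings per unit of effort, highest first."""
    return sorted(
        recommendations,
        key=lambda r: r["expected_savings"] / EFFORT_WEIGHT[r["effort"]],
        reverse=True,
    )

recs = [
    {"title": "Enable semantic caching", "expected_savings": 0.10, "effort": "high"},
    {"title": "Raise cache TTL", "expected_savings": 0.08, "effort": "low"},
]
# The low-effort TTL change ranks first despite smaller absolute savings.
print([r["title"] for r in prioritize(recs)])
```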

Troubleshooting​

Common Issues​

High Compression Loss

# Adjust compression settings
config = TokenOptimizationConfig(
    target_compression_ratio=0.8,  # Less aggressive
    optimization_level=OptimizationLevel.QUALITY_FOCUSED
)

Low Cache Hit Rate

# Improve cache key generation
config = TokenOptimizationConfig(
    enable_semantic_caching=True,
    cache_ttl=7200,  # Longer TTL
    similarity_threshold=0.85
)

Inaccurate Cost Predictions

# Improve prediction model
cost_system = CostModelingSystem(
    enable_ml_predictions=True,
    training_data_size=10000,
    model_update_frequency="weekly"
)

Next Steps​

  1. šŸ“Š Cost Monitoring - Advanced cost tracking setup
  2. šŸ”§ Optimization Configuration - Production optimization
  3. šŸ“ˆ Performance Analysis - Detailed cost analysis
  4. šŸŽÆ ROI Measurement - Optimization ROI tracking

Summary​

This example demonstrated:

  • āœ… Context Compression - Multiple compression strategies
  • āœ… Intelligent Pruning - Relevance-based context reduction
  • āœ… Response Caching - Smart caching for cost reduction
  • āœ… Cost Monitoring - Comprehensive usage tracking
  • āœ… Cost Prediction - Future cost forecasting
  • āœ… Optimization Recommendations - Automated suggestions
  • āœ… Interactive Dashboard - Real-time monitoring interface

You now have a complete token optimization system that can significantly reduce costs while maintaining response quality.