Streaming Handler
The Streaming Handler provides unified streaming support across all LLM providers with Server-Sent Events (SSE) and WebSocket formats.
Features
- Multi-Format Support: SSE and WebSocket streaming
- Provider Agnostic: Works with any LLM provider
- Real-time Metrics: Performance and cost tracking
- Error Handling: Graceful error recovery
- Connection Management: Automatic connection handling
Quick Start
from packages.llm import StreamingHandler, StreamFormat, LiteLLMProvider, LiteLLMConfig
# Configure LLM provider
config = LiteLLMConfig(
model="gpt-4",
enable_streaming=True
)
llm_provider = LiteLLMProvider(config)
# Create streaming handler
handler = StreamingHandler(llm_provider, StreamFormat.SSE)
# Stream response
async for chunk in handler.stream_sse(messages):
print(chunk)
Streaming Formats
Server-Sent Events (SSE)
from packages.llm import StreamingHandler, StreamFormat
# Create SSE handler
handler = StreamingHandler(llm_provider, StreamFormat.SSE)
# Stream as SSE
async def stream_response():
async for chunk in handler.stream_sse(messages):
yield chunk
# Use in FastAPI
from fastapi.responses import StreamingResponse
from sse_starlette import EventSourceResponse
@app.post("/stream")
async def stream_endpoint():
return EventSourceResponse(
stream_response(),
media_type="text/event-stream"
)
WebSocket Streaming
from fastapi import WebSocket
from packages.llm import StreamingHandler, StreamFormat
# Create WebSocket handler
handler = StreamingHandler(llm_provider, StreamFormat.WEBSOCKET)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# Stream over WebSocket
await handler.stream_websocket(websocket, messages)
Message Formats
SSE Format
data: {"content": "Hello", "metadata": {}, "is_final": false}
data: {"content": " world", "metadata": {}, "is_final": false}
data: {"content": "", "metadata": {"final_response": "Hello world"}, "is_final": true}
WebSocket Format
{"type": "chunk", "content": "Hello"}
{"type": "chunk", "content": " world"}
{"type": "complete", "content": "Hello world", "metadata": {}}
Error Handling
import json
# SSE with error handling
async def stream_with_error_handling():
try:
async for chunk in handler.stream_sse(messages):
yield chunk
except Exception as e:
error_data = {
"content": "",
"metadata": {},
"is_final": True,
"error": str(e)
}
yield f"data: {json.dumps(error_data)}\n\n"
# WebSocket with error handling
async def websocket_with_error_handling(websocket: WebSocket):
try:
await handler.stream_websocket(websocket, messages)
except Exception as e:
await websocket.send_text(json.dumps({
"type": "error",
"content": str(e)
}))
Advanced Configuration
Custom Headers
# SSE with custom headers
return EventSourceResponse(
stream_response(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Cache-Control"
}
)
Timeout Configuration
# Configure timeout
config = LiteLLMConfig(
model="gpt-4",
stream_timeout=120, # 2 minutes
enable_streaming=True
)
Integration Examples
FastAPI Integration
from fastapi import FastAPI, WebSocket
from fastapi.responses import StreamingResponse
from sse_starlette import EventSourceResponse
from packages.llm import StreamingHandler, LiteLLMProvider, LiteLLMConfig
app = FastAPI()
# QueryRequest is a Pydantic model with a `query: str` field
@app.post("/query/stream")
async def query_stream(request: QueryRequest):
# Configure LLM
config = LiteLLMConfig(
model="gpt-4",
enable_streaming=True
)
llm_provider = LiteLLMProvider(config)
handler = StreamingHandler(llm_provider, StreamFormat.SSE)
# Convert query to messages
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": request.query}
]
# Stream response
async def generate_stream():
async for chunk in handler.stream_sse(messages):
yield chunk
return EventSourceResponse(
generate_stream(),
media_type="text/event-stream"
)
@app.websocket("/query/ws")
async def query_websocket(websocket: WebSocket):
await websocket.accept()
# Configure LLM
config = LiteLLMConfig(
model="gpt-4",
enable_streaming=True
)
llm_provider = LiteLLMProvider(config)
handler = StreamingHandler(llm_provider, StreamFormat.WEBSOCKET)
while True:
# Receive message. NOTE: in production, wrap this loop in
# try/except WebSocketDisconnect so a client disconnect is
# handled gracefully instead of raising an unhandled error.
data = await websocket.receive_text()
message_data = json.loads(data)
query = message_data.get("query", "")
if not query:
await websocket.send_text(json.dumps({
"type": "error",
"content": "No query provided"
}))
continue
# Convert to messages
messages = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": query}
]
# Stream response
await handler.stream_websocket(websocket, messages)
Client-Side Usage
SSE Client
// JavaScript SSE client
// NOTE: the native EventSource API can only issue GET requests and cannot
// send a request body. The /query/stream endpoint above is registered with
// @app.post, so to consume it from the browser use fetch() and read the
// response body as a ReadableStream (or expose a GET variant of the endpoint).
const eventSource = new EventSource('/query/stream');
eventSource.onmessage = function(event) {
const data = JSON.parse(event.data);
if (data.is_final) {
console.log('Stream complete:', data.metadata.final_response);
eventSource.close();
} else {
console.log('Chunk:', data.content);
// Append to UI
document.getElementById('response').innerHTML += data.content;
}
};
eventSource.onerror = function(event) {
console.error('Stream error:', event);
eventSource.close();
};
WebSocket Client
// JavaScript WebSocket client
const ws = new WebSocket('ws://localhost:8000/query/ws');
ws.onopen = function() {
// Send query
ws.send(JSON.stringify({
query: "Hello, how are you?"
}));
};
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
switch(data.type) {
case 'chunk':
console.log('Chunk:', data.content);
// Append to UI
document.getElementById('response').innerHTML += data.content;
break;
case 'complete':
console.log('Complete:', data.content);
break;
case 'error':
console.error('Error:', data.content);
break;
}
};
ws.onerror = function(error) {
console.error('WebSocket error:', error);
};
Performance Considerations
Connection Limits
# Configure connection limits
config = LiteLLMConfig(
model="gpt-4",
max_concurrent_streams=10, # Limit concurrent streams
enable_streaming=True
)
Memory Management
# Stream with memory management
async def stream_with_cleanup():
try:
async for chunk in handler.stream_sse(messages):
yield chunk
finally:
# Cleanup resources
await handler.cleanup()
Best Practices
- Use Appropriate Format: SSE for simple streaming, WebSocket for bidirectional
- Handle Errors: Always implement error handling
- Monitor Performance: Track streaming metrics
- Cleanup Resources: Properly close connections
- Test Thoroughly: Test with various network conditions
API Reference
StreamingHandler
| Parameter | Type | Description |
|---|---|---|
| llm_provider | LiteLLMProvider | LLM provider instance |
| format | StreamFormat | Streaming format (SSE/WEBSOCKET) |
Methods
| Method | Description |
|---|---|
| stream_sse(messages) | Stream as Server-Sent Events |
| stream_websocket(websocket, messages) | Stream over WebSocket |
| cleanup() | Cleanup resources |
StreamFormat
| Value | Description |
|---|---|
| StreamFormat.SSE | Server-Sent Events |
| StreamFormat.WEBSOCKET | WebSocket |
Troubleshooting
Common Issues
- Connection Drops: Check network stability and timeout settings
- Memory Leaks: Ensure proper cleanup of resources
- Performance Issues: Monitor concurrent connections and optimize
- Error Handling: Implement comprehensive error handling
Debug Mode
# Enable debug logging
import logging
logging.getLogger('packages.llm.streaming_handler').setLevel(logging.DEBUG)