# Streaming Handler

The Streaming Handler provides unified streaming support across all LLM providers with Server-Sent Events (SSE) and WebSocket formats.

Features

  • Multi-Format Support: SSE and WebSocket streaming
  • Provider Agnostic: Works with any LLM provider
  • Real-time Metrics: Performance and cost tracking
  • Error Handling: Graceful error recovery
  • Connection Management: Automatic connection handling

Quick Start

import asyncio

from packages.llm import StreamingHandler, StreamFormat, LiteLLMProvider, LiteLLMConfig

# Configure LLM provider
config = LiteLLMConfig(
    model="gpt-4",
    enable_streaming=True,
)
llm_provider = LiteLLMProvider(config)

# Create streaming handler
handler = StreamingHandler(llm_provider, StreamFormat.SSE)

# Stream response — `async for` must run inside a coroutine
async def main():
    messages = [{"role": "user", "content": "Hello!"}]
    async for chunk in handler.stream_sse(messages):
        print(chunk)

asyncio.run(main())

Streaming Formats

Server-Sent Events (SSE)

from packages.llm import StreamingHandler, StreamFormat

# Create SSE handler
handler = StreamingHandler(llm_provider, StreamFormat.SSE)

# Stream as SSE: re-yield each pre-formatted SSE chunk from the handler
async def stream_response():
    async for chunk in handler.stream_sse(messages):
        yield chunk

# Use in FastAPI
from fastapi.responses import StreamingResponse
from sse_starlette import EventSourceResponse

@app.post("/stream")
async def stream_endpoint():
    # EventSourceResponse already defaults to text/event-stream;
    # passing media_type explicitly just makes the intent obvious.
    return EventSourceResponse(
        stream_response(),
        media_type="text/event-stream",
    )

WebSocket Streaming

from fastapi import WebSocket
from packages.llm import StreamingHandler, StreamFormat

# Create WebSocket handler
handler = StreamingHandler(llm_provider, StreamFormat.WEBSOCKET)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    # Accept the connection before sending any frames.
    await websocket.accept()

    # Stream over WebSocket — the handler emits chunk/complete frames itself.
    await handler.stream_websocket(websocket, messages)

Message Formats

SSE Format

data: {"content": "Hello", "metadata": {}, "is_final": false}

data: {"content": " world", "metadata": {}, "is_final": false}

data: {"content": "", "metadata": {"final_response": "Hello world"}, "is_final": true}

WebSocket Format

{"type": "chunk", "content": "Hello"}
{"type": "chunk", "content": " world"}
{"type": "complete", "content": "Hello world", "metadata": {}}

Error Handling

import json

# SSE with error handling: on failure, emit one final SSE frame that
# carries the error so the client can stop listening cleanly.
async def stream_with_error_handling():
    try:
        async for chunk in handler.stream_sse(messages):
            yield chunk
    except Exception as e:
        error_data = {
            "content": "",
            "metadata": {},
            "is_final": True,
            "error": str(e),
        }
        yield f"data: {json.dumps(error_data)}\n\n"

import json

# WebSocket with error handling: report failures as an error frame
# instead of dropping the connection silently.
async def websocket_with_error_handling(websocket: WebSocket):
    try:
        await handler.stream_websocket(websocket, messages)
    except Exception as e:
        await websocket.send_text(json.dumps({
            "type": "error",
            "content": str(e),
        }))

Advanced Configuration

Custom Headers

# SSE with custom headers
return EventSourceResponse(
stream_response(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Headers": "Cache-Control"
}
)

Timeout Configuration

# Configure timeout
config = LiteLLMConfig(
    model="gpt-4",
    stream_timeout=120,  # 2 minutes
    enable_streaming=True,
)

Integration Examples

FastAPI Integration

from fastapi import FastAPI, WebSocket
from fastapi.responses import StreamingResponse
from sse_starlette import EventSourceResponse
# StreamFormat was missing from this import even though it is used below.
from packages.llm import StreamingHandler, StreamFormat, LiteLLMProvider, LiteLLMConfig

app = FastAPI()

# NOTE(review): QueryRequest is assumed to be a Pydantic model with a
# `query: str` field, defined elsewhere — confirm against your project.
@app.post("/query/stream")
async def query_stream(request: QueryRequest):
    # Configure LLM
    config = LiteLLMConfig(
        model="gpt-4",
        enable_streaming=True,
    )
    llm_provider = LiteLLMProvider(config)
    handler = StreamingHandler(llm_provider, StreamFormat.SSE)

    # Convert the incoming query into a chat message list
    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": request.query},
    ]

    # Stream response
    async def generate_stream():
        async for chunk in handler.stream_sse(messages):
            yield chunk

    return EventSourceResponse(
        generate_stream(),
        media_type="text/event-stream",
    )

# Requires: import json, and `from fastapi import WebSocketDisconnect`
@app.websocket("/query/ws")
async def query_websocket(websocket: WebSocket):
    await websocket.accept()

    # Configure LLM once per connection
    config = LiteLLMConfig(
        model="gpt-4",
        enable_streaming=True,
    )
    llm_provider = LiteLLMProvider(config)
    handler = StreamingHandler(llm_provider, StreamFormat.WEBSOCKET)

    try:
        while True:
            # Receive one query per message
            data = await websocket.receive_text()
            message_data = json.loads(data)
            query = message_data.get("query", "")

            if not query:
                await websocket.send_text(json.dumps({
                    "type": "error",
                    "content": "No query provided",
                }))
                continue

            # Convert to chat messages
            messages = [
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": query},
            ]

            # Stream response back over the same socket
            await handler.stream_websocket(websocket, messages)
    except WebSocketDisconnect:
        # Client closed the connection — exit the receive loop cleanly
        # instead of letting receive_text() raise unhandled.
        pass

Client-Side Usage

SSE Client

// JavaScript SSE client
// NOTE: EventSource always issues a GET request. The /query/stream
// endpoint above is declared with POST, so either expose a GET route
// or use fetch() with a streaming body reader instead.
const eventSource = new EventSource('/query/stream');

eventSource.onmessage = function(event) {
    const data = JSON.parse(event.data);

    if (data.is_final) {
        console.log('Stream complete:', data.metadata.final_response);
        eventSource.close();
    } else {
        console.log('Chunk:', data.content);
        // Append to UI
        document.getElementById('response').innerHTML += data.content;
    }
};

eventSource.onerror = function(event) {
    console.error('Stream error:', event);
    eventSource.close();
};

WebSocket Client

// JavaScript WebSocket client
const ws = new WebSocket('ws://localhost:8000/query/ws');

ws.onopen = function() {
    // Send query once the socket is open
    ws.send(JSON.stringify({
        query: "Hello, how are you?"
    }));
};

ws.onmessage = function(event) {
    const data = JSON.parse(event.data);

    // Frame types match the server's WebSocket format: chunk/complete/error
    switch(data.type) {
        case 'chunk':
            console.log('Chunk:', data.content);
            // Append to UI
            document.getElementById('response').innerHTML += data.content;
            break;
        case 'complete':
            console.log('Complete:', data.content);
            break;
        case 'error':
            console.error('Error:', data.content);
            break;
    }
};

ws.onerror = function(error) {
    console.error('WebSocket error:', error);
};

Performance Considerations

Connection Limits

# Configure connection limits
config = LiteLLMConfig(
    model="gpt-4",
    max_concurrent_streams=10,  # Limit concurrent streams
    enable_streaming=True,
)

Memory Management

# Stream with memory management: cleanup runs even if the consumer
# abandons the generator mid-stream.
async def stream_with_cleanup():
    try:
        async for chunk in handler.stream_sse(messages):
            yield chunk
    finally:
        # Release handler resources unconditionally
        await handler.cleanup()

Best Practices

  1. Use Appropriate Format: SSE for simple streaming, WebSocket for bidirectional
  2. Handle Errors: Always implement error handling
  3. Monitor Performance: Track streaming metrics
  4. Cleanup Resources: Properly close connections
  5. Test Thoroughly: Test with various network conditions

API Reference

StreamingHandler

| Parameter | Type | Description |
| --- | --- | --- |
| `llm_provider` | `LiteLLMProvider` | LLM provider instance |
| `format` | `StreamFormat` | Streaming format (SSE/WEBSOCKET) |

Methods

| Method | Description |
| --- | --- |
| `stream_sse(messages)` | Stream as Server-Sent Events |
| `stream_websocket(websocket, messages)` | Stream over WebSocket |
| `cleanup()` | Cleanup resources |

StreamFormat

| Value | Description |
| --- | --- |
| `StreamFormat.SSE` | Server-Sent Events |
| `StreamFormat.WEBSOCKET` | WebSocket |

Troubleshooting

Common Issues

  1. Connection Drops: Check network stability and timeout settings
  2. Memory Leaks: Ensure proper cleanup of resources
  3. Performance Issues: Monitor concurrent connections and optimize
  4. Error Handling: Implement comprehensive error handling

Debug Mode

# Enable debug logging for the streaming handler module
import logging

stream_logger = logging.getLogger('packages.llm.streaming_handler')
stream_logger.setLevel(logging.DEBUG)