Worker
Location: apps/worker/
Tech Stack: Celery, Redis/RabbitMQ
Purpose: Asynchronous task processing for scaling and batch operations
Overview
The Worker app uses Celery for asynchronous task processing, enabling you to handle long-running operations, batch processing, and scheduled tasks without blocking API requests.
Use this when:
- Processing large batches of documents
- Long-running operations (>30 seconds)
- Scheduled tasks (daily indexing, etc.)
- Scaling horizontally
- Decoupling API from heavy processing
Quick Start
# Start Redis (message broker)
redis-server
# Start Celery worker
cd apps/worker
pip install -r requirements.txt
celery -A worker worker --loglevel=info
# In another terminal, send tasks
python send_task.py
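send_task.py is not shown in this snippet; a minimal sketch, assuming the index_documents task described under Task Examples below, could look like this:
# send_task.py -- minimal sketch (assumes the index_documents task defined in worker/tasks.py)
from worker import index_documents

documents = [{"id": "doc-1", "text": "RAG combines retrieval with generation."}]
task = index_documents.delay(documents=documents, collection="knowledge_base")
print(f"Submitted task {task.id}; current state is {task.state}")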
Architecture
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│              │     │              │     │              │
│  API/Client  │────▶│    Redis     │────▶│   Worker 1   │
│              │     │   (Broker)   │     │              │
└──────────────┘     └──────────────┘     └──────────────┘
                            │
                            │            ┌──────────────┐
                            ├───────────▶│   Worker 2   │
                            │            │              │
                            │            └──────────────┘
                            │
                            │            ┌──────────────┐
                            └───────────▶│   Worker N   │
                                         │              │
                                         └──────────────┘
Available Tasks
1. Document Indexing
# Index large document batches
from worker import index_documents
# Send task
task = index_documents.delay(
documents=document_list,
collection="knowledge_base"
)
# Check status
print(task.state) # PENDING, STARTED, SUCCESS, FAILURE
result = task.get() # Blocks until complete
2. Batch Query Processing
# Process multiple queries
from worker import process_batch_queries
task = process_batch_queries.delay(
queries=["What is RAG?", "How does hybrid search work?"],
output_file="results.json"
)
3. Scheduled Evaluation
# Run evaluation on schedule
from celery.schedules import crontab
from worker import run_evaluation

# Every day at midnight (the periodic_task decorator was removed in Celery 5;
# register a normal task and schedule it via beat_schedule instead)
@celery_app.task
def daily_evaluation():
    run_evaluation(
        dataset="test_set",
        metrics=["precision", "recall", "faithfulness"]
    )

celery_app.conf.beat_schedule['daily-evaluation'] = {
    'task': 'worker.tasks.daily_evaluation',
    'schedule': crontab(hour=0, minute=0),
}
4. Document Refresh
# Re-index updated documents
from worker import refresh_documents
task = refresh_documents.delay(
source="s3://docs-bucket/",
incremental=True
)
Configuration
Celery Config
# apps/worker/celeryconfig.py
from celery import Celery
from celery.schedules import crontab
# Initialize Celery
celery_app = Celery(
'recoagent_worker',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
# Configuration
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
task_track_started=True,
task_time_limit=3600, # 1 hour max
task_soft_time_limit=3300, # Soft limit
worker_prefetch_multiplier=1, # Tasks per worker
worker_max_tasks_per_child=100, # Restart after N tasks
)
# Periodic tasks
celery_app.conf.beat_schedule = {
'nightly-indexing': {
'task': 'worker.tasks.index_documents',
'schedule': crontab(hour=2, minute=0), # 2 AM daily
},
'hourly-evaluation': {
'task': 'worker.tasks.run_evaluation',
'schedule': crontab(minute=0), # Every hour
},
}
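Beat entries for tasks that take required arguments also need 'args' or 'kwargs'. For example, the hourly-evaluation entry above could pass its dataset and metrics like this (values are illustrative):
'hourly-evaluation': {
    'task': 'worker.tasks.run_evaluation',
    'schedule': crontab(minute=0),
    'kwargs': {'dataset': 'test_set', 'metrics': ['precision', 'recall', 'faithfulness']},
},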
Environment Variables
# .env
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1
# Alternative: RabbitMQ
# CELERY_BROKER_URL=amqp://guest@localhost//
# Worker settings
CELERY_WORKER_CONCURRENCY=4
CELERY_WORKER_MAX_MEMORY_PER_CHILD=500000 # 500MB
# Task settings
CELERY_TASK_TIME_LIMIT=3600
CELERY_TASK_SOFT_TIME_LIMIT=3300
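For the Celery config above to honour these variables instead of hard-coded URLs, one approach (a sketch, not necessarily how apps/worker is wired) is to read them at startup:
# celeryconfig.py (sketch) -- pull broker/backend settings from the environment
import os
from celery import Celery

celery_app = Celery(
    'recoagent_worker',
    broker=os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
)
celery_app.conf.worker_concurrency = int(os.getenv('CELERY_WORKER_CONCURRENCY', '4'))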
Task Examples
Document Indexing Task
# worker/tasks.py
from recoagent import RecoAgent
from celeryconfig import celery_app  # adjust to wherever the Celery app above is defined

@celery_app.task(bind=True, max_retries=3)
def index_documents(self, documents, collection):
    try:
        agent = RecoAgent()
        # Index in batches of 100
        for i in range(0, len(documents), 100):
            batch = documents[i:i+100]
            agent.add_documents(batch, collection=collection)
            # Report progress to the result backend
            self.update_state(
                state='PROGRESS',
                meta={'current': i, 'total': len(documents)}
            )
        return {'status': 'success', 'indexed': len(documents)}
    except Exception as exc:
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
Query Processing Task
@celery_app.task
def process_query(question, user_id):
agent = RecoAgent()
response = agent.ask(question)
# Save result
save_result(user_id, question, response)
# Send notification
notify_user(user_id, "Your query is ready!")
return response.answer
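save_result and notify_user are not defined in this snippet (treat them as placeholder helpers). Dispatching the task from an API handler is a one-liner:
# Fire-and-forget from an API endpoint (the user id value is illustrative)
process_query.delay("What is RAG?", "user-123")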
Batch Processing Task
from celery import group

@celery_app.task
def process_batch(queries):
    # Fan the queries out in parallel. Never call result.get() inside a
    # task -- Celery rejects synchronous sub-task waits. Save the group
    # result and return its id so the caller can collect it later.
    job = group(process_query.s(q) for q in queries)
    result = job.apply_async()
    result.save()
    return result.id
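Because the sketch above returns the saved group id rather than waiting inside the task, the caller collects the results afterwards (this assumes a result backend that supports saved group results, such as Redis):
# Caller side (outside any task): restore the group and wait for it
batch = process_batch.delay(["What is RAG?", "How does hybrid search work?"])
group_id = batch.get()                                    # id of the saved GroupResult
answers = celery_app.GroupResult.restore(group_id).get()  # blocks until all sub-tasks finish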
Task Patterns
Chain (Sequential)
# Execute tasks in sequence
from celery import chain
workflow = chain(
index_documents.s(docs, "kb"),
run_evaluation.s("kb"),
send_report.s()
)
workflow.apply_async()
Group (Parallel)
# Execute tasks in parallel
from celery import group
job = group(
process_query.s("Query 1"),
process_query.s("Query 2"),
process_query.s("Query 3")
)
result = job.apply_async()
Chord (Parallel + Callback)
# Process in parallel, then aggregate
from celery import chord
workflow = chord(
[process_query.s(q) for q in queries]
)(aggregate_results.s())
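aggregate_results is not defined above; a minimal sketch of the chord callback (it receives the list of results from the parallel group) could be:
@celery_app.task
def aggregate_results(results):
    # Chord callback: `results` is the list of return values from the group
    return {"count": len(results), "answers": results}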
Monitoring
Flower Dashboard
# Install Flower
pip install flower
# Start monitoring dashboard
celery -A worker flower
# Open: http://localhost:5555
Flower Features:
- Real-time task monitoring
- Worker management
- Task history
- Statistics and graphs
- Task retry/revoke
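Besides Flower, workers can be inspected programmatically through Celery's control API:
# Query live workers over the broker connection
insp = celery_app.control.inspect()
print(insp.ping())        # e.g. {'celery@host': {'ok': 'pong'}}
print(insp.active())      # tasks currently executing, per worker
print(insp.registered())  # task names each worker knows about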
Task Status Checking
import time

# Check task status
task = index_documents.delay(docs, collection="knowledge_base")
# Polling
while not task.ready():
    print(f"Status: {task.state}")
    if task.state == 'PROGRESS':
        print(f"Progress: {task.info}")
    time.sleep(1)
# Get result
result = task.get()
print(f"Result: {result}")
Scaling Workers
Multiple Workers
# Start multiple workers on one host (give each a unique node name with -n)
celery -A worker worker -n worker1@%h --concurrency=4 --loglevel=info &
celery -A worker worker -n worker2@%h --concurrency=4 --loglevel=info &
celery -A worker worker -n worker3@%h --concurrency=4 --loglevel=info &
Dedicated Queue Workers
# Worker for high-priority tasks
celery -A worker worker -Q high_priority --concurrency=8
# Worker for batch processing
celery -A worker worker -Q batch --concurrency=2
# Worker for scheduled tasks
celery -A worker worker -Q scheduled --concurrency=4
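For these dedicated workers to receive anything, tasks must be routed to the matching queues. A sketch using the task paths from this doc (queue names match the -Q flags above):
# Route tasks to the queues the dedicated workers consume
celery_app.conf.task_routes = {
    'worker.tasks.process_query': {'queue': 'high_priority'},
    'worker.tasks.index_documents': {'queue': 'batch'},
    'worker.tasks.run_evaluation': {'queue': 'scheduled'},
}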
Docker Scaling
# docker-compose.yml
services:
redis:
image: redis:latest
worker:
build: .
command: celery -A worker worker --loglevel=info
depends_on:
- redis
deploy:
replicas: 5 # 5 workers
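deploy.replicas is honoured by Swarm and recent Compose versions; you can also scale explicitly from the command line:
# Start the stack with 5 worker replicas
docker compose up -d --scale worker=5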
Scheduled Tasks
Periodic Beat Scheduler
# Start beat scheduler
celery -A worker beat --loglevel=info
# In production, run beat and worker separately
celery -A worker beat --loglevel=info &
celery -A worker worker --loglevel=info &
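For local development you can also run beat embedded inside a worker process with -B (not recommended in production):
celery -A worker worker -B --loglevel=info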
Example Schedules
# Every day at 2 AM
'schedule': crontab(hour=2, minute=0)
# Every hour
'schedule': crontab(minute=0)
# Every Monday at 9 AM
'schedule': crontab(day_of_week=1, hour=9, minute=0)
# Every 15 minutes
'schedule': crontab(minute='*/15')
Troubleshooting
| Issue | Solution |
| --- | --- |
| Tasks not executing | Check Redis is running: redis-cli ping |
| Worker crashes | Check logs, increase memory limits |
| Tasks timeout | Increase task_time_limit |
| Tasks stuck | Check worker is running, inspect with Flower |
| Memory leaks | Set worker_max_tasks_per_child |
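A few quick checks for the issues above:
# Is the broker reachable?
redis-cli ping                   # expect: PONG
# Are any workers alive?
celery -A worker inspect ping
# What are they doing right now?
celery -A worker inspect active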
Related Docs
- Celery Official Docs
- Production API - API that sends tasks to workers
- Deploy to Production
Scale horizontally! Add more workers as you grow!