
Worker

Location: apps/worker/
Tech Stack: Celery, Redis/RabbitMQ
Purpose: Asynchronous task processing for scaling and batch operations

🎯 Overview

The Worker app uses Celery for asynchronous task processing, enabling you to handle long-running operations, batch processing, and scheduled tasks without blocking API requests.

Use this when:

  • Processing large batches of documents
  • Long-running operations (>30 seconds)
  • Scheduled tasks (daily indexing, etc.)
  • Scaling horizontally
  • Decoupling API from heavy processing

⚡ Quick Start

# Start Redis (message broker)
redis-server

# Start Celery worker
cd apps/worker
pip install -r requirements.txt
celery -A worker worker --loglevel=info

# In another terminal, send tasks
python send_task.py
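
send_task.py is not shown on this page; a minimal sketch of what it might contain, using the index_documents task described below (the example documents are placeholders):

# send_task.py -- minimal sketch; the real script may differ
from worker import index_documents

# Placeholder payload
documents = [{"id": "doc-1", "text": "RecoAgent supports hybrid retrieval."}]

task = index_documents.delay(documents=documents, collection="knowledge_base")
print(f"Submitted task {task.id}, state: {task.state}")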

πŸ—οΈ Architecture​

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ β”‚ β”‚ β”‚ β”‚ β”‚
β”‚ API/Client │─────▢│ Redis │─────▢│ Worker 1 β”‚
β”‚ β”‚ β”‚ (Broker) β”‚ β”‚ β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
└───────────────▢│ Worker 2 β”‚
β”‚ β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Worker N β”‚
β”‚ β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

📋 Available Tasks

1. Document Indexing

# Index large document batches
from worker import index_documents

# Send task
task = index_documents.delay(
    documents=document_list,
    collection="knowledge_base"
)

# Check status
print(task.state)    # PENDING, STARTED, SUCCESS, FAILURE
result = task.get()  # Blocks until complete

2. Batch Query Processing

# Process multiple queries
from worker import process_batch_queries

task = process_batch_queries.delay(
    queries=["What is RAG?", "How does hybrid search work?"],
    output_file="results.json"
)

3. Scheduled Evaluation

# Run evaluation on a schedule via celery beat (see beat_schedule below)
from worker import run_evaluation

@celery_app.task
def daily_evaluation():
    # Scheduled every day at midnight with crontab(hour=0, minute=0)
    run_evaluation(
        dataset="test_set",
        metrics=["precision", "recall", "faithfulness"]
    )

4. Document Refresh

# Re-index updated documents
from worker import refresh_documents

task = refresh_documents.delay(
    source="s3://docs-bucket/",
    incremental=True
)

🔧 Configuration

Celery Config

# apps/worker/celeryconfig.py
from celery import Celery
from celery.schedules import crontab

# Initialize Celery
celery_app = Celery(
    'recoagent_worker',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Configuration
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=3600,            # 1 hour max
    task_soft_time_limit=3300,       # Soft limit
    worker_prefetch_multiplier=1,    # Tasks prefetched per worker process
    worker_max_tasks_per_child=100,  # Restart after N tasks
)

# Periodic tasks
celery_app.conf.beat_schedule = {
    'nightly-indexing': {
        'task': 'worker.tasks.index_documents',
        'schedule': crontab(hour=2, minute=0),  # 2 AM daily
    },
    'hourly-evaluation': {
        'task': 'worker.tasks.run_evaluation',
        'schedule': crontab(minute=0),  # Every hour
    },
}
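
With these limits, Celery raises SoftTimeLimitExceeded inside the task at 3300 seconds and hard-kills it at 3600. A sketch of handling the soft limit in a hypothetical long-running task:

from celery.exceptions import SoftTimeLimitExceeded

@celery_app.task
def long_running_index(documents):
    indexed = 0
    try:
        for doc in documents:
            # ... expensive per-document work here ...
            indexed += 1
    except SoftTimeLimitExceeded:
        # Soft limit (3300s) fired: stop cleanly before the hard kill at 3600s
        return {'status': 'partial', 'indexed': indexed}
    return {'status': 'success', 'indexed': indexed}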

Environment Variables

# .env
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/1

# Alternative: RabbitMQ
# CELERY_BROKER_URL=amqp://guest@localhost//

# Worker settings
CELERY_WORKER_CONCURRENCY=4
CELERY_WORKER_MAX_MEMORY_PER_CHILD=500000 # ~500MB (value in KB)

# Task settings
CELERY_TASK_TIME_LIMIT=3600
CELERY_TASK_SOFT_TIME_LIMIT=3300
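
The celeryconfig.py above hard-codes the broker and backend URLs; a sketch of reading these variables instead (plain os.getenv, assuming the variables are exported or loaded from .env):

import os
from celery import Celery

celery_app = Celery(
    'recoagent_worker',
    broker=os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
)
celery_app.conf.update(
    task_time_limit=int(os.getenv('CELERY_TASK_TIME_LIMIT', '3600')),
    task_soft_time_limit=int(os.getenv('CELERY_TASK_SOFT_TIME_LIMIT', '3300')),
)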

πŸ“ Task Examples​

Document Indexing Task

# worker/tasks.py
from recoagent import RecoAgent
from celeryconfig import celery_app  # Celery app defined in celeryconfig.py

@celery_app.task(bind=True, max_retries=3)
def index_documents(self, documents, collection):
    try:
        agent = RecoAgent()

        # Index in batches of 100
        for i in range(0, len(documents), 100):
            batch = documents[i:i+100]
            agent.add_documents(batch, collection=collection)

            # Update progress
            self.update_state(
                state='PROGRESS',
                meta={'current': i, 'total': len(documents)}
            )

        return {'status': 'success', 'indexed': len(documents)}

    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

Query Processing Task

@celery_app.task
def process_query(question, user_id):
    agent = RecoAgent()
    response = agent.ask(question)

    # Save result
    save_result(user_id, question, response)

    # Send notification
    notify_user(user_id, "Your query is ready!")

    return response.answer

Batch Processing Task

from celery import group

@celery_app.task
def process_batch(queries):
    # Fan out all queries in parallel. Avoid calling result.get() inside a
    # task -- it can deadlock the worker. Save the group and return its id
    # so callers can collect results later (or use a chord, shown below).
    job = group(process_query.s(q) for q in queries)
    result = job.apply_async()
    result.save()  # requires a result backend (Redis here)
    return result.id
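
Picking up those results later (e.g. from the API process) could look like this; group_id stands for the value returned by process_batch above:

from celery.result import GroupResult

group_result = GroupResult.restore(group_id, app=celery_app)
if group_result is not None and group_result.ready():
    answers = group_result.get()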

🎯 Task Patterns

Chain (Sequential)

# Execute tasks in sequence; each task's return value is passed
# as the first argument to the next task in the chain
from celery import chain

workflow = chain(
    index_documents.s(docs, "kb"),
    run_evaluation.s("kb"),
    send_report.s()
)
workflow.apply_async()

Group (Parallel)

# Execute tasks in parallel
from celery import group

job = group(
    process_query.s("Query 1"),
    process_query.s("Query 2"),
    process_query.s("Query 3")
)
result = job.apply_async()

Chord (Parallel + Callback)

# Process in parallel, then aggregate results in a callback
from celery import chord

workflow = chord(
    [process_query.s(q) for q in queries]
)(aggregate_results.s())
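
aggregate_results is not defined in this section; a minimal sketch of what the chord callback might look like (the callback receives the list of return values from the parallel group):

@celery_app.task
def aggregate_results(results):
    # 'results' is the list of answers returned by the process_query tasks
    return {
        'count': len(results),
        'answers': results,
    }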

📊 Monitoring

Flower Dashboard

# Install Flower
pip install flower

# Start monitoring dashboard
celery -A worker flower

# Open: http://localhost:5555

Flower Features:

  • Real-time task monitoring
  • Worker management
  • Task history
  • Statistics and graphs
  • Task retry/revoke
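
For quick checks from a script or shell without Flower, Celery's built-in inspect API exposes similar information (the import path for celery_app is an assumption; see apps/worker/celeryconfig.py):

from celeryconfig import celery_app

insp = celery_app.control.inspect()
print(insp.active())     # tasks currently executing on each worker
print(insp.reserved())   # tasks prefetched by workers but not yet started
print(insp.scheduled())  # tasks waiting on an ETA or countdown
print(insp.stats())      # per-worker statistics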

Task Status Checking

import time

# Check task status
task = index_documents.delay(docs, "knowledge_base")

# Polling
while not task.ready():
    print(f"Status: {task.state}")
    if task.state == 'PROGRESS':
        print(f"Progress: {task.info}")
    time.sleep(1)

# Get result
result = task.get()
print(f"Result: {result}")

🚀 Scaling Workers

Multiple Workers

# Start multiple workers (give each a unique node name with -n)
celery -A worker worker -n worker1@%h --concurrency=4 --loglevel=info &
celery -A worker worker -n worker2@%h --concurrency=4 --loglevel=info &
celery -A worker worker -n worker3@%h --concurrency=4 --loglevel=info &

Dedicated Queue Workers

# Worker for high-priority tasks
celery -A worker worker -Q high_priority --concurrency=8

# Worker for batch processing
celery -A worker worker -Q batch --concurrency=2

# Worker for scheduled tasks
celery -A worker worker -Q scheduled --concurrency=4
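
Workers bound to a queue only receive tasks routed to it. A sketch of routing with task_routes (queue names match the commands above; the task paths are assumptions based on worker/tasks.py):

# In celeryconfig.py
celery_app.conf.task_routes = {
    'worker.tasks.process_query': {'queue': 'high_priority'},
    'worker.tasks.index_documents': {'queue': 'batch'},
    'worker.tasks.run_evaluation': {'queue': 'scheduled'},
}

# Or pick the queue per call
index_documents.apply_async(args=[documents, "knowledge_base"], queue='batch')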

Docker Scaling

# docker-compose.yml
services:
  redis:
    image: redis:latest

  worker:
    build: .
    command: celery -A worker worker --loglevel=info
    depends_on:
      - redis
    deploy:
      replicas: 5  # 5 workers

🔔 Scheduled Tasks

Periodic Beat Scheduler

# Start beat scheduler
celery -A worker beat --loglevel=info

# In production, run beat and worker separately
celery -A worker beat --loglevel=info &
celery -A worker worker --loglevel=info &

Example Schedules

# Every day at 2 AM
'schedule': crontab(hour=2, minute=0)

# Every hour
'schedule': crontab(minute=0)

# Every Monday at 9 AM
'schedule': crontab(day_of_week=1, hour=9, minute=0)

# Every 15 minutes
'schedule': crontab(minute='*/15')
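
Periodic tasks can also be registered in code instead of the beat_schedule dict, using Celery's on_after_configure hook (daily_evaluation refers to the task sketched earlier):

from celery.schedules import crontab

@celery_app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Same effect as a beat_schedule entry: run daily_evaluation at midnight UTC
    sender.add_periodic_task(crontab(hour=0, minute=0), daily_evaluation.s())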

🆘 Troubleshooting

| Issue | Solution |
|-------|----------|
| Tasks not executing | Check Redis is running: redis-cli ping |
| Worker crashes | Check logs, increase memory limits |
| Tasks timeout | Increase task_time_limit |
| Tasks stuck | Check worker is running, inspect with Flower |
| Memory leaks | Set worker_max_tasks_per_child |

Scale horizontally: add more workers as you grow! ⚙️