Apache Kafka
Apache Kafka is a distributed streaming platform capable of handling trillions of events a day. It enables you to publish and subscribe to streams of records, store records in a fault-tolerant way, and process them as they occur. Kafka is widely used for stream processing, log aggregation, and real-time analytics.
Celery
Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation but supports scheduling as well. The execution units, called tasks, are executed concurrently on one or more worker nodes using multiprocessing, Eventlet, or Gevent.
Setup and Requirements
Before diving into the implementation, ensure you have a running Kafka cluster and a Redis or RabbitMQ instance to act as Celery's message broker (this guide uses Redis). You'll also need Python installed on your system. This guide assumes basic knowledge of Python and familiarity with command-line tools.
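If you don't already have these services running, one quick way to bring them up locally is with Docker. The commands below are a minimal sketch, assuming Docker is installed and using the official apache/kafka and redis images on their default ports:
docker run -d --name kafka -p 9092:9092 apache/kafka:latest
docker run -d --name redis -p 6379:6379 redis:latest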
Step 1. Installing Dependencies
Install the necessary Python libraries for Kafka and Celery:
pip install confluent-kafka celery redis
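As a quick sanity check that the installation succeeded, you can print the client library versions (the exact numbers will vary):
python -c "import confluent_kafka; print(confluent_kafka.version())"
celery --version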
Step 2. Creating a Kafka Producer
A Kafka producer sends messages to Kafka topics. The following Python script demonstrates a simple producer sending a message to a specific topic:
from confluent_kafka import Producer

config = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(config)

def delivery_report(err, msg):
    # Called once per message to report delivery success or failure
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

topic = 'test_topic'
message = 'Hello, Kafka!'

# produce() is asynchronous; flush() blocks until all callbacks have fired
producer.produce(topic, message.encode('utf-8'), callback=delivery_report)
producer.flush()
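In practice you will usually produce many messages and want delivery callbacks served as you go, rather than only at flush time. The loop below is a minimal sketch of that pattern, reusing the producer and delivery_report from above; the per-message key is a hypothetical example (messages with the same key always land on the same partition):
# Send several keyed messages; poll(0) serves any pending delivery callbacks
for i in range(5):
    key = f'user-{i}'  # hypothetical key for illustration
    value = f'event number {i}'
    producer.produce(topic, value.encode('utf-8'), key=key.encode('utf-8'), callback=delivery_report)
    producer.poll(0)
producer.flush()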
Step 3. Setting Up Celery
Define your Celery app and tasks in a celery_app.py file. The task will process messages consumed from Kafka:
from celery import Celery

# Redis serves as the message broker; adjust the URL for your environment
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_message(message):
    print(f"Processing message: {message}")
    # Insert message processing logic here
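Real processing logic can fail transiently, for example when a downstream service is briefly unavailable. Celery supports automatic retries; the variant below is a sketch assuming you want up to three retries with a fixed five-second delay. The bind=True option gives the task access to self, which exposes self.retry:
@app.task(bind=True, max_retries=3, default_retry_delay=5)
def process_message_with_retry(self, message):
    try:
        print(f"Processing message: {message}")
        # Real processing logic here
    except Exception as exc:
        # Re-queue the task; Celery retries up to max_retries times
        raise self.retry(exc=exc)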
Step 4. Creating a Kafka Consumer to Offload Tasks to Celery
The Kafka consumer script consumes messages from the Kafka topic and uses Celery to process them asynchronously:
from confluent_kafka import Consumer
from celery_app import process_message

config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(config)
consumer.subscribe(['test_topic'])

try:
    while True:
        # Wait up to one second for a new message
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # Hand the message off to Celery for asynchronous processing
        process_message.delay(msg.value().decode('utf-8'))
finally:
    consumer.close()
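One design note: with the default enable.auto.commit=True, offsets are committed in the background, so a crash between the background commit and the .delay() call can skip messages. If you need at-least-once hand-off, a common variant is to disable auto-commit and commit each offset only after the task has been enqueued. A sketch of the two changed spots, under that assumption:
config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # take manual control of offset commits
}

# Inside the poll loop, commit only after the task is safely enqueued:
process_message.delay(msg.value().decode('utf-8'))
consumer.commit(message=msg, asynchronous=False)  # block until the offset is committed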
Step 5. Running the System
- Start the Kafka cluster and ensure the topic test_topic exists (a sample creation command follows this list).
- Start a Celery worker: from your project directory, run:
celery -A celery_app worker --loglevel=info
- Run the Kafka producer script to send messages to Kafka.
- Run the Kafka consumer script; it consumes messages from Kafka and offloads them to Celery for processing.
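If the topic does not exist yet, you can create it with the kafka-topics tool that ships with Kafka (a single-partition example for local testing; the script's path varies by installation, and real workloads will want more partitions and replicas):
kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1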
Conclusion
Integrating Kafka with Celery provides a powerful architecture for processing data streams asynchronously. Kafka handles the high-throughput, distributed messaging, while Celery allows for flexible, distributed task processing. This setup is ideal for applications requiring real-time data processing and analytics, scalable microservices, and asynchronous task execution in distributed systems.