Вопрос проверяет понимание того, как вручную реализовать систему очередей и воркеров, без использования готовых фреймворков вроде Celery.
Обработку задач можно реализовать напрямую через клиент RabbitMQ, например с помощью библиотеки pika. API выступает как producer и публикует сообщения в очередь, а отдельный процесс-воркер работает как consumer и обрабатывает задачи. Нужно вручную реализовать подтверждения сообщений, обработку ошибок и ретраи. Такой подход проще по зависимостям, но требует больше инфраструктурного кода.
Celery — это обертка над брокером сообщений, но вся базовая логика может быть реализована напрямую.
Определение: Producer — сервис, отправляющий сообщения в очередь.
Определение: Consumer — сервис, который читает сообщения и обрабатывает их.
Минимальная схема:
API публикует сообщение в очередь.
RabbitMQ хранит сообщения.
Воркер забирает сообщения и выполняет задачи.
Упрощенный пример публикации:
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="tasks")
channel.basic_publish(
exchange="",
routing_key="tasks",
body=json.dumps({"task": "send_email", "user_id": 10})
)
def callback(ch, method, properties, body):
# обработка задачи
print(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue="tasks", on_message_callback=callback)
channel.start_consuming()
Важно:
всегда отправлять ack,
обрабатывать ошибки,
логировать задачи.
Если не используется Celery, придется самостоятельно решить:
Ретраи задач
DLQ
Мониторинг
Ограничение параллелизма
Планирование задач
Подход без Celery имеет смысл:
при небольшой системе,
когда нужен полный контроль,
когда Celery слишком тяжелый.
RabbitMQ можно использовать напрямую через Python-клиенты, но тогда ответственность за ретраи, мониторинг и надежность ложится на разработчика.