Этот вопрос проверяет понимание проблемы “плохих” сообщений (poison message) и способов изоляции ошибок, чтобы очередь не блокировалась.
Если одно сообщение постоянно вызывает падение воркера, оно может бесконечно переотправляться и блокировать очередь. В таких случаях вводят лимит повторных попыток и отправляют сообщение в DLQ после превышения порога. Также важно оборачивать обработку в try/except, чтобы процесс не падал целиком. Дополнительно используют изоляцию воркеров и отдельные очереди для проблемных задач.
Проблема “плохого” сообщения возникает, когда входные данные или логика обработки приводят к исключению, и сообщение повторно доставляется снова и снова.
Если используется prefetch=1 или малое число воркеров:
сообщение доставляется
обработка падает
сообщение возвращается в очередь
тот же воркер снова получает его
В итоге прогресс не происходит.
Обычно применяют счетчик попыток:
сохраняют число ретраев в заголовке сообщения
после N попыток отправляют в DLQ
Сообщения, которые не удалось обработать:
не блокируют основную очередь
могут анализироваться отдельно
Подходы:
обработка в отдельном процессе
worker pool, где падение одного процесса не влияет на другие
Обработчик должен перехватывать исключения:
def on_message(ch, method, properties, body):
try:
process(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
# отправка в DLQ или retry
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
Валидация входных данных до тяжелой обработки
Таймауты на внешние вызовы
Идемпотентная обработка
Вывод
Если одно сообщение падает постоянно, его нужно изолировать: ограничить ретраи, отправить в DLQ и не позволять ему блокировать очередь.