Вопрос проверяет понимание механизмов идемпотентности и дедупликации сообщений в системах очередей.
В распределенных системах сообщения могут быть доставлены несколько раз из-за сетевых сбоев, повторных отправок или особенностей брокера. Чтобы избежать повторной обработки, consumer должен реализовать механизм дедупликации. Основной подход — использование уникального идентификатора сообщения (message ID), который генерируется продюсером или брокером.
import redis
import json
r = redis.Redis()
def process_message(message):
msg_id = message['id']
# Проверяем, обрабатывался ли ID
if r.sismember('processed_ids', msg_id):
print(f"Duplicate message {msg_id}, skipping")
return
# Обрабатываем сообщение
print(f"Processing {msg_id}: {message['data']}")
# Сохраняем ID как обработанный
r.sadd('processed_ids', msg_id)
# Устанавливаем TTL для очистки старых ID
r.expire('processed_ids', 86400)
# Пример использования
msg = {'id': 'abc-123', 'data': 'order_created'}
process_message(msg)
process_message(msg) # Дубликат будет пропущенДедупликация сообщений критична для обеспечения надежности и корректности в системах с очередями. Использование уникальных ID и идемпотентной логики позволяет consumer безопасно обрабатывать повторные доставки, что особенно важно в финансовых, заказных и других транзакционных системах.