Этот вопрос проверяет глубокое понимание систем очередей сообщений, их архитектуры и практических аспектов использования в распределенных системах.
Архитектура очередей основана на системе обмена сообщениями между компонентами приложения. Несколько очередей и exchange нужны для разделения нагрузки и приоритетов задач. Например, отдельные очереди для срочных email, фоновой обработки изображений и отчетов. Exchange распределяют сообщения по правилам маршрутизации. Это позволяет обрабатывать задачи с разной скоростью, изолировать сбои и масштабировать компоненты независимо.
Создает и отправляет сообщения
Не знает о получателях напрямую
Обрабатывает сообщения из очередей
Может масштабироваться горизонтально
Принимает сообщения от producers
Определяет правила маршрутизации
Временное хранилище сообщений
Гарантирует доставку
// Высокоприоритетная очередь для срочных уведомлений
$channel->queue_declare('emails_high_priority', false, true, false, false);
// Низкоприоритетная очередь для массовых рассылок
$channel->queue_declare('emails_low_priority', false, true, false, false);Проблемы в обработке отчетов не влияют на отправку email
Каждая очередь может иметь собственные настройки retry policy
Маршрутизация по точному совпадению routing key
Использование: Точечная доставка конкретным обработчикам
// Producer отправляет в direct exchange
$channel->basic_publish(
$message,
'notifications_direct',
'user_registration'
);
// Consumer слушает конкретную очередь
$channel->queue_bind('email_service', 'notifications_direct', 'user_registration');Рассылка во все привязанные очереди
Использование: Широковещательные уведомления
// Отправка в fanout exchange - получат все подписчики
$channel->basic_publish(
$message,
'audit_log_fanout',
'' // routing key игнорируется
);
// Несколько сервисов получают одинаковые сообщения
$channel->queue_bind('analytics_queue', 'audit_log_fanout');
$channel->queue_bind('backup_queue', 'audit_log_fanout');Маршрутизация по шаблонам routing key
Использование: Сложные сценарии маршрутизации
// Сообщения маршрутизируются по шаблонам
$channel->basic_publish($message, 'events_topic', 'user.profile.updated');
$channel->basic_publish($message, 'events_topic', 'order.status.changed');
// Очереди подписываются на интересующие события
$channel->queue_bind('notification_queue', 'events_topic', 'user.*.updated');
$channel->queue_bind('analytics_queue', 'events_topic', '*.status.*');Exchange: "orders_topic"
|
+-- Queue: "payment_processing" (routing: "order.created")
| └-- Consumers: 3 воркера для обработки платежей
|
+-- Queue: "email_notifications" (routing: "order.*")
| └-- Consumers: 2 воркера для отправки email
|
+-- Queue: "inventory_updates" (routing: "order.created")
| └-- Consumers: 1 воркер для обновления склада
|
+-- Queue: "analytics_events" (routing: "order.#")
└-- Consumers: 1 воркер для аналитикиКаждый тип задач масштабируется независимо
Можно добавлять воркеров для медленных очередей
Сбой в одном обработчике не блокирует всю систему
Сообщения сохраняются до успешной обработки
Легко добавлять новые обработчики
Возможность изменять логику маршрутизации
class QueueManager {
private $channel;
public function __construct() {
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$this->channel = $connection->channel();
// Объявляем exchange
$this->channel->exchange_declare('application_events', 'topic', false, true, false);
// Создаем очереди для разных сервисов
$this->setupEmailQueue();
$this->setupAnalyticsQueue();
}
private function setupEmailQueue() {
$this->channel->queue_declare('email_queue', false, true, false, false);
$this->channel->queue_bind('email_queue', 'application_events', 'user.*');
}
}Использование нескольких очередей и exchange позволяет создавать гибкие, масштабируемые и отказоустойчивые системы. Разделение по типам задач и приоритетам обеспечивает оптимальное использование ресурсов и стабильность работы приложения.