Вопрос проверяет понимание работы KafkaListener в Spring для асинхронного потребления сообщений из Kafka.
KafkaListener — это аннотация из Spring Kafka, которая позволяет методам в Spring-бинах асинхронно потреблять сообщения из Kafka. Она автоматически настраивает контейнер слушателя (KafkaMessageListenerContainer), который подписывается на указанные топики и вызывает аннотированный метод при получении каждого сообщения. Это избавляет разработчика от ручного управления потребителями и циклами опроса.
topics — массив топиков для прослушивания.groupId — идентификатор группы потребителей (если не указан, используется значение из конфигурации).containerFactory — ссылка на фабрику контейнеров для настройки десериализации и других параметров.@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}В этом примере метод listen будет вызываться для каждого сообщения из топика my-topic в группе my-group. Сообщение автоматически десериализуется в строку.
Для работы KafkaListener необходимо настроить KafkaListenerContainerFactory в конфигурации Spring:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // количество потоков
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
}Аннотация @EnableKafka включает поддержку KafkaListener в приложении.
KafkaListener упрощает интеграцию Kafka с Spring, позволяя сосредоточиться на бизнес-логике обработки сообщений, а не на инфраструктурных деталях. Это особенно полезно в микросервисных архитектурах, где требуется надежная асинхронная коммуникация.