Вопрос проверяет понимание использования Kafka как брокера сообщений для асинхронной синхронизации данных между Redis и Postgres.
Kafka используется как система обмена сообщениями (event streaming platform), которая обеспечивает надежную и асинхронную передачу данных между сервисами. В контексте синхронизации Redis (кэш) и Postgres (основная база данных) Kafka позволяет поддерживать актуальность кэша без прямой связи между этими системами.
Когда в Postgres происходят изменения (вставка, обновление, удаление), приложение-продюсер отправляет сообщение в определенный топик Kafka. Это сообщение содержит информацию об изменении (например, тип операции и идентификатор записи). Затем приложение-консюмер, подписанное на этот топик, получает сообщение и обновляет соответствующие данные в Redis. Таким образом, Redis всегда содержит актуальные данные, а Postgres остается источником истины.
// Продюсер (отправка события при изменении в Postgres)
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
const producer = kafka.producer();
async function sendUpdate(userId, newData) {
await producer.connect();
await producer.send({
topic: 'user-updates',
messages: [{ value: JSON.stringify({ userId, ...newData }) }],
});
await producer.disconnect();
}
// Консюмер (обновление Redis)
const consumer = kafka.consumer({ groupId: 'redis-sync-group' });
async function run() {
await consumer.connect();
await consumer.subscribe({ topic: 'user-updates', fromBeginning: true });
await consumer.run({
eachMessage: async ({ message }) => {
const { userId, ...data } = JSON.parse(message.value.toString());
await redisClient.set(`user:${userId}`, JSON.stringify(data));
},
});
}
Вывод: Использование Kafka для синхронизации Redis и Postgres оправдано в системах с высокой нагрузкой, где важна производительность чтения (кэш) и надежность хранения (база данных). Это позволяет разделить ответственность между сервисами и обеспечить согласованность данных в конечном счете.