Вопрос проверяет понимание принципов обработки потоковых данных с использованием Spark Structured Streaming.
Spark Structured Streaming обрабатывает данные микропакетами, используя ту же API, что и для пакетной обработки. Данные поступают из Kafka, Kinesis или файловой системы, обрабатываются через DataFrame API и выводятся в хранилища или дашборды. Гарантируется exactly-once семантика обработки.
Основные концепции:
Источники данных (Sources):
Apache Kafka (наиболее популярный вариант)
Amazon Kinesis
Файловые системы (HDFS, S3)
Сокеты (для тестирования)
Обработка (Processing):
Использование знакомого DataFrame/Dataset API
Окна агрегации (tumbling, sliding, session)
Водяные знаки (watermarks) для обработки задержанных данных
Приемники (Sinks):
Файловые системы (Parquet, JSON)
Базы данных (Cassandra, MySQL)
Консоль (для отладки)
Память (для стриминговых запросов)
Пример обработки данных из Kafka:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("StructuredStreamingExample") \
.getOrCreate()
# Чтение из Kafka
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "topic1") \
.load()
# Преобразование JSON-данных
from pyspark.sql.functions import from_json, col
json_schema = "name STRING, age INT"
parsed_df = df.select(
from_json(col("value").cast("string"), json_schema).alias("data")) \
.select("data.*")
# Агрегация по окнам
windowed_counts = parsed_df \
.groupBy(
window(col("timestamp"), "10 minutes"),
col("name")) \
.count()
# Запись в консоль (для отладки)
query = windowed_counts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()Типы окон в агрегации:
Tumbling windows (фиксированные):
Непересекающиеся интервалы (каждые 5 минут)
Пример: подсчет событий за каждые 10 минут
Sliding windows (скользящие):
Перекрывающиеся интервалы
Пример: подсчет за последние 10 минут, обновляемый каждую минуту
Session windows (сессии):
Динамические интервалы активности
Пример: сессия пользователя на сайте
Гарантии доставки:
Exactly-once: Каждое событие обрабатывается ровно один раз
Достигается через:
Транзакционные приемники
Чекпоинты (checkpointing)
Write-ahead logs
Когда использовать:
Обработка логов в реальном времени
Мониторинг IoT-устройств
Обнаружение аномалий
Обновление дашбордов в реальном времени