Вопрос проверяет понимание построения ETL-пайплайнов в Apache Airflow, что необходимо для организации процессов обработки данных.
ETL-пайплайны в Apache Airflow реализуются через Directed Acyclic Graphs (DAG), которые представляют собой набор задач с определенными зависимостями. Каждая задача — это атомарная операция, например, извлечение данных из API, их трансформация или загрузка в хранилище. DAG описывает логику выполнения: какие задачи запускаются последовательно, а какие параллельно.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract():
# Извлечение данных из источника
return {'data': [1, 2, 3]}
def transform(**context):
data = context['ti'].xcom_pull(task_ids='extract')
# Трансформация данных
return [x * 2 for x in data['data']]
def load(**context):
data = context['ti'].xcom_pull(task_ids='transform')
# Загрузка в базу данных
print(f'Loading {data}')
with DAG(
'etl_pipeline',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily',
catchup=False
) as dag:
extract_task = PythonOperator(task_id='extract', python_callable=extract)
transform_task = PythonOperator(task_id='transform', python_callable=transform)
load_task = PythonOperator(task_id='load', python_callable=load)
extract_task >> transform_task >> load_taskВывод: Airflow идеально подходит для сложных ETL-процессов с множеством зависимостей, где требуется надежность, мониторинг и гибкость в управлении расписанием.