Вопрос проверяет понимание внутреннего устройства сенсоров и умение расширять Airflow под свои задачи.
Собственный сенсор создаётся путём наследования от BaseSensorOperator. В нём реализуется метод poke, который проверяет условие. Airflow вызывает этот метод с заданным интервалом. Для асинхронной версии используется deferrable-модель и триггеры. Такой подход позволяет реализовать ожидание любых внешних условий.
Иногда стандартных сенсоров недостаточно, и тогда требуется написать свой. Airflow предоставляет для этого понятный и расширяемый механизм.
Custom sensor — это пользовательский сенсор, реализованный через наследование от базового класса Airflow.
Самый простой вариант — синхронный сенсор.
Нужно:
унаследоваться от BaseSensorOperator;
реализовать метод poke.
Пример упрощённой реализации:
from airflow.sensors.base import BaseSensorOperator
class MySensor(BaseSensorOperator):
def poke(self, context):
# проверить условие
return check_condition()
Если poke вернул:
True → сенсор завершён;
False → Airflow подождёт poke_interval и попробует снова.
Для долгих ожиданий лучше использовать асинхронную модель.
Deferrable-сенсор:
проверяет условие;
если не готово — вызывает defer;
передаёт управление trigger-у.
def execute(self, context):
if not condition():
self.defer(trigger=MyTrigger())
Trigger:
асинхронно ждёт событие;
возвращает управление сенсору.
Имеет смысл, если:
нет готового сенсора;
логика проверки специфична;
важно оптимизировать ресурсы.
всегда задавайте timeout;
логируйте проверки;
тестируйте сенсор отдельно от DAG.
Собственные сенсоры позволяют адаптировать Airflow под любые внешние условия. Для коротких ожиданий подойдут blocking-сенсоры, для долгих — deferrable-модель с триггерами.