How to Set Up an Airflow Sensor in Reschedule Mode
Introduction
Airflow sensors are a special kind of operator designed to wait for something to happen. When a sensor runs, it checks whether a certain condition is met; only then is it marked successful, letting its downstream tasks execute. This makes DAGs event driven. Setting a sensor's mode to 'reschedule' lets it wait without holding a worker slot, so worker resources are used more efficiently.
Sensors Overview
Sensors are a type of operator that checks, at a regular interval, whether a supplied condition is satisfied. As soon as the condition is met, the task is marked successful and the DAG moves on to its downstream tasks. If the condition has not been met, the sensor waits for another interval before checking again.
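To make the check-and-wait cycle concrete, here is a minimal pure-Python model of it. It is a sketch, not Airflow's implementation: the function `first_success_time` and its simulated clock are hypothetical, and no real sleeping happens, so the result is simply the simulated second at which the sensor first sees the condition as true.

```python
from typing import Callable


def first_success_time(condition: Callable[[int], bool], poke_interval: int,
                       max_checks: int = 1000) -> int:
    """Return the simulated time (seconds) of the first check that finds the condition true.

    Hypothetical model: the sensor checks at t = 0, poke_interval, 2 * poke_interval, ...
    """
    t = 0
    for _ in range(max_checks):
        if condition(t):  # the "poke": is the awaited event visible yet?
            return t
        t += poke_interval  # not yet: wait one interval, then check again
    raise RuntimeError("condition never became true within max_checks")


# The awaited event happens at t = 95 s; checking every 30 s notices it at t = 120 s.
print(first_success_time(lambda t: t >= 95, poke_interval=30))
```

Note that the sensor only notices the event on its next check, so a longer interval means fewer checks but a later reaction.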
All sensors inherit the following parameters from BaseSensorOperator:
- timeout: The maximum amount of time, in seconds, that the sensor keeps checking the condition. If the condition has not been met within this time, the sensor task fails. The default is 7 days (604800 seconds).
- soft_fail: If this parameter is set to True, the task is marked as skipped instead of failed when the condition is not met by the timeout.
- poke_interval: The time in seconds that the sensor waits between checks. In reschedule mode, it is the delay before the rescheduled task runs its next check. The default is 60 seconds.
- exponential_backoff: If this parameter is set to True, the sensor waits exponentially longer between successive checks.
- mode: How the sensor operates. There are two modes:
  - poke: This is the default mode. In poke mode, the sensor occupies a worker slot for its entire execution time and sleeps between checks. This mode is best if you expect the sensor to run only briefly.
  - reschedule: In this mode, if the condition is not met, the sensor releases its worker slot and reschedules the next check for a later time. This mode is best if you expect the sensor to run for a long time, because it is less resource intensive and frees up workers for other tasks.
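The trade-off between the two modes, and how timeout, soft_fail and exponential_backoff shape a sensor's lifetime, can be sketched with a small pure-Python simulation. It is a simplified model under stated assumptions, not Airflow's scheduling code: `simulate_sensor` and `SensorResult` are hypothetical names, every check is assumed to cost one second of worker time, and backoff is modelled as simple doubling, which is not Airflow's exact formula.

```python
from dataclasses import dataclass


@dataclass
class SensorResult:
    state: str         # "success", "failed" or "skipped"
    last_check: int    # simulated second of the final check
    checks: int        # how many times the condition was evaluated
    slot_seconds: int  # simulated seconds a worker slot was occupied


def simulate_sensor(condition_met_at, mode="poke", poke_interval=60, timeout=600,
                    soft_fail=False, exponential_backoff=False, check_cost=1):
    """Hypothetical model of a sensor task (defaults chosen for illustration only).

    Assumptions: each check costs `check_cost` seconds of worker time, checks
    continue while elapsed time <= timeout, and exponential_backoff just doubles
    the wait after every failed check (Airflow's real formula differs).
    """
    if mode not in ("poke", "reschedule"):
        raise ValueError(f"unknown mode: {mode!r}")
    t, interval, checks, last_check, state = 0, poke_interval, 0, 0, None
    while t <= timeout:
        checks += 1
        last_check = t
        if t >= condition_met_at:
            state = "success"
            break
        t += interval
        if exponential_backoff:
            interval *= 2
    if state is None:
        # Timeout reached without the condition being met.
        state = "skipped" if soft_fail else "failed"
    if mode == "poke":
        # Slot held from the first check to the end of the last one, sleeps included.
        slot_seconds = last_check + check_cost
    else:
        # Slot released between checks: only the checks themselves use worker time.
        slot_seconds = checks * check_cost
    return SensorResult(state, last_check, checks, slot_seconds)


for mode in ("poke", "reschedule"):
    print(mode, simulate_sensor(condition_met_at=250, mode=mode))
```

With the condition becoming true at 250 s, both modes succeed on the check at 300 s, but in this model poke mode holds a worker slot for 301 s while reschedule mode holds it for only 6 s. That gap grows with the waiting time, which is why reschedule suits long-running sensors.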
Beyond these common parameters, each sensor type takes its own arguments describing what it waits for, such as an S3 key or an external DAG ID.
Example Implementations
Some commonly used Airflow sensors are S3KeySensor, DateTimeSensor, ExternalTaskSensor, HttpSensor, SqlSensor and PythonSensor. Here is an example using ExternalTaskSensor, which waits for a task or a whole DAG run in another DAG.
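```python
from datetime import datetime, timedelta

from airflow import DAG
# Airflow 2.x import paths. From Airflow 2.3, DummyOperator is also available
# as EmptyOperator (airflow.operators.empty).
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 10, 6),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='0 13 * * *')

# Use ExternalTaskSensor to wait for the whole Parent_dag run to succeed
# (external_task_id=None). To wait for a single task instead, set
# external_task_id='cook_dinner'. Once the sensor succeeds, have_dinner runs.
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id=None,
    # Look for the Parent_dag run whose logical date is 5 minutes earlier,
    # i.e. Parent_dag is assumed to be scheduled at '55 12 * * *'.
    execution_delta=timedelta(minutes=5),
    mode='reschedule',  # release the worker slot between checks
    poke_interval=60,   # check every 60 seconds
    timeout=360,        # fail if Parent_dag has not succeeded within 6 minutes
    dag=dag,            # attach the sensor to Child_dag
)

have_dinner = DummyOperator(
    task_id='have_dinner',
    trigger_rule='all_success',
    dag=dag,
)

wait_for_dinner >> have_dinner
```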
[Figure: DAG flow]
This DAG waits for the upstream DAG, Parent_dag, to complete its scheduled run successfully. The ExternalTaskSensor task in this example (wait_for_dinner) checks the status of the matching Parent_dag run, the one whose logical date is execution_delta (5 minutes) earlier, every 60 seconds (the poke_interval) until that run reaches the success state. Because the mode is set to reschedule, the task releases its worker slot between checks and sits in the up_for_reschedule state instead. The timeout is set to 6 minutes, measured from the sensor's first check across all reschedules, so if the upstream run has not succeeded by then, the task fails. Once the sensor's success criterion is met, the DAG moves on to the downstream task, have_dinner.
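The execution_delta and timeout values can be sanity-checked with plain date arithmetic. This sketch assumes a Child_dag run whose logical date is 2021-10-06 13:00 (from its '0 13 * * *' schedule); ExternalTaskSensor subtracts execution_delta from that date to find the Parent_dag run to watch, so Parent_dag needs a run at 12:55 for the sensor to ever succeed.

```python
from datetime import datetime, timedelta

# Assumed logical date of one Child_dag run ('0 13 * * *' schedule).
child_logical_date = datetime(2021, 10, 6, 13, 0)
execution_delta = timedelta(minutes=5)
timeout = timedelta(minutes=6)

# The sensor watches the Parent_dag run at (logical date - execution_delta).
parent_logical_date = child_logical_date - execution_delta
print("Parent_dag run to wait for:", parent_logical_date)

# The sensor's timeout parameter is given in seconds.
print("timeout in seconds:", int(timeout.total_seconds()))
```

If Parent_dag runs on a different schedule, no run exists at the computed date and the sensor simply keeps checking until it times out, so the two schedules and execution_delta must agree.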