Airflow: How to Generate DAGs Dynamically
Introduction
Apache Airflow is an open-source workflow management platform. Airflow is written in Python, and workflows are created via Python scripts. Airflow is designed under the principle of “configuration as code”. While other “configuration as code” workflow platforms exist using markup languages like XML, using Python allows developers to import libraries and classes to help them create their workflows.
Problem statement
While working on orchestration with Airflow, I came across a requirement to scale up an Airflow pipeline to hundreds of different categories. The orchestration of each pipeline is exactly the same. There were two options to solve the problem:
- Create multiple copies of the same code with a different DAG name and different configuration values for each.
- Create the DAGs dynamically at runtime, using different DAG names, config values and parameters.
The first approach requires high maintenance: a large amount of duplicated code to manage, and higher effort whenever any logic within the pipelines needs to change. Hence, dynamic generation of Airflow DAGs is the solution: a single piece of code generates the DAGs at runtime, and it can create 'N' DAGs from one definition.
Pipeline configurations
Import the required libraries: the DAG class, the operators we will use, and a couple of date helpers:
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
To create dynamic DAGs, first define a list; its items determine how many DAGs are generated.
items = ['Item1', 'Item2', 'Items3']
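As a variation (a minimal sketch, not part of the original pipeline), the list could instead be pulled from an Airflow Variable, so new items can be added from the UI without editing the file. Here 'dag_items' is a hypothetical Variable name assumed to hold a JSON list such as ["Item1", "Item2", "Items3"]:

from airflow.models import Variable

# "dag_items" is a hypothetical Variable; the default keeps the file
# parseable before the Variable has been created
items = Variable.get('dag_items',
                     default_var=['Item1', 'Item2', 'Items3'],
                     deserialize_json=True)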
Next, we need a FOR loop that creates one DAG per item in the list given above.
# build a DAG for each item in the list
for idx, item in enumerate(items):
    dag_id = 'DAG_{}'.format(item)
    default_args = {'owner': 'airflow',
                    'start_date': days_ago(1),
                    'depends_on_past': False,
                    'retries': 1,
                    'retry_delay': timedelta(hours=5)
                    }
    schedule = '@daily'
    dag_number = idx
    globals()[dag_id] = create_dag(dag_id,
                                   schedule,
                                   dag_number,
                                   default_args)
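The globals() assignment is the key step: when Airflow parses the file, it only picks up DAG objects that are reachable as module-level (global) variables. For the three items above, the loop is therefore equivalent to writing by hand:

# what the loop effectively produces for the three items
# (schedule and default_args as defined above)
DAG_Item1 = create_dag('DAG_Item1', schedule, 0, default_args)
DAG_Item2 = create_dag('DAG_Item2', schedule, 1, default_args)
DAG_Items3 = create_dag('DAG_Items3', schedule, 2, default_args)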
Next, we need to define the ‘create_dag’ function that is called in the FOR loop above. Note that in the actual file this function must be defined before the loop that calls it, as the complete listing at the end shows.
def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):
Inside the function, capture the item name for which the DAG is being created; it can be used to look up per-category values from config files.
    category = dag_id.split("_")[1]
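For instance, the category can key into a per-item configuration mapping. This is an illustrative sketch; CATEGORY_CONFIG and its keys are hypothetical, not part of the original pipeline:

# hypothetical per-category settings, defined at module level; in
# practice this could be loaded from a JSON/YAML config file
CATEGORY_CONFIG = {
    'Item1': {'source_table': 'raw.item1'},
    'Item2': {'source_table': 'raw.item2'},
    'Items3': {'source_table': 'raw.items3'},
}

# inside create_dag, right after "category" is derived:
config = CATEGORY_CONFIG.get(category, {})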
Next, we need to define the DAG object itself:
    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)
Next, we need to create the different tasks as per our requirements. Note that hello_world_py reads category and dag_number from the enclosing function's scope, so they do not need to be passed to the PythonOperator:
    def hello_world_py(*args):
        print(category)
        print('This is DAG: {}'.format(str(dag_number)))

    with dag:
        start_op = DummyOperator(task_id="Start")
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)
        end_op = DummyOperator(task_id="I-am-done")
        start_op >> t1 >> end_op

    return dag
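More tasks can be added to the same with dag: block before return dag. As an illustrative sketch (the BashOperator task and its command are placeholders, not part of the original pipeline):

from airflow.operators.bash_operator import BashOperator

# inside the "with dag:" block of create_dag
t2 = BashOperator(
    task_id='echo_category',
    bash_command='echo "processing {}"'.format(category))
t1 >> t2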
After saving the file, we will be able to see three DAGs (DAG_Item1, DAG_Item2 and DAG_Items3) created on the Airflow UI.
Within each DAG, the tasks created are Start, hello_world and I-am-done, executed in that order.
The logs of ‘hello_world’ show the item name for which each DAG was created; this value can be further utilized for per-category customization.
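For example, for DAG_Item2 (where dag_number is 1), the two print statements would write the following lines into the task log:

Item2
This is DAG: 1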
The overall code is given below:
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):
    # capture the item name encoded in the DAG id
    category = dag_id.split("_")[1]

    def hello_world_py(*args):
        print(category)
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        start_op = DummyOperator(task_id="Start")
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)
        end_op = DummyOperator(task_id="I-am-done")
        start_op >> t1 >> end_op

    return dag


items = ['Item1', 'Item2', 'Items3']

# build a DAG for each item in the list
for idx, item in enumerate(items):
    dag_id = 'DAG_{}'.format(item)
    default_args = {'owner': 'airflow',
                    'start_date': days_ago(1),
                    'depends_on_past': False,
                    'retries': 1,
                    'retry_delay': timedelta(hours=5)
                    }
    schedule = '@daily'
    dag_number = idx
    globals()[dag_id] = create_dag(dag_id,
                                   schedule,
                                   dag_number,
                                   default_args)
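As a quick sanity check (a sketch, assuming the code above is saved as dynamic_dags.py on the PYTHONPATH; the filename is hypothetical), the generated DAGs can be listed by importing the module directly:

import dynamic_dags
from airflow import DAG

# every module-level DAG object is one generated pipeline
for name, obj in vars(dynamic_dags).items():
    if isinstance(obj, DAG):
        print(name, '->', obj.dag_id)
# expected output: DAG_Item1, DAG_Item2 and DAG_Items3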
Summary
We have learnt how to create 'N' DAGs dynamically at runtime from a single piece of code. The pattern also leaves a lot of scope to extend the tasks inside each DAG as per our requirements.