Airflow — How to Generate DAGs Dynamically

Introduction

Airflow Dynamic DAGs

Apache Airflow is an open-source workflow management platform. Airflow is written in Python, and workflows are created via Python scripts. Airflow is designed under the principle of “configuration as code”. While other “configuration as code” workflow platforms exist using markup languages like XML, using Python allows developers to import libraries and classes to help them create their workflows.

Problem statement

While working on orchestration with Airflow, I came across a requirement to scale up an Airflow pipeline to hundreds of different categories. The orchestration of each pipeline is exactly the same. I had two options to solve the problem:

  1. Create multiple copies of the same code with a different DAG name and different configuration values for each.
  2. Generate the DAGs dynamically at runtime, with different DAG names, config values and parameters.

The first approach requires high maintenance: a large amount of duplicated code to manage, and higher effort whenever any logic within the pipelines needs to change. Hence dynamic generation of Airflow DAGs is the solution: a single piece of code that can create 'N' DAGs at runtime.

Pipeline configurations

Import the required libraries:

from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

To create the dynamic DAGs, first define a list; one DAG will be generated per item:

items = ['Item1', 'Item2', 'Items3']
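As a sketch (the JSON payload and its shape here are assumptions, not part of the original pipeline), the list driving DAG generation could instead be loaded from an external config file, so new categories can be added without touching the code:

```python
import json

# Assumed config payload; in practice this string would be read from a
# JSON (or YAML) file shipped alongside the DAG code.
CONFIG_JSON = '{"categories": ["Item1", "Item2", "Items3"]}'

def load_categories(raw):
    """Parse the category list that drives DAG generation."""
    return json.loads(raw)["categories"]

categories = load_categories(CONFIG_JSON)
```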

Next, we need a for loop that iterates over the list and creates 'N' DAGs, one per item:

# build a DAG for each item in the list
for idx, item in enumerate(items):
    dag_id = 'DAG_{}'.format(item)
    default_args = {'owner': 'airflow',
                    'start_date': days_ago(1),
                    'depends_on_past': False,
                    'retries': 1,
                    'retry_delay': timedelta(hours=5)}
    schedule = '@daily'
    dag_number = idx
    globals()[dag_id] = create_dag(dag_id,
                                   schedule,
                                   dag_number,
                                   default_args)

Next, we need to define the 'create_dag' function called in the for loop above:

def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

Next, we capture the name for which the DAG is created; it can later be used to map different config values from config files:

    category = dag_id.split("_")[1]
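Since the category name is recovered from the dag_id, it can key into per-category settings. A minimal sketch of such a lookup (the config keys and values below are invented for illustration; in a real pipeline this mapping would typically live in a config file rather than in code):

```python
# Hypothetical per-category configuration, keyed by category name.
CATEGORY_CONFIG = {
    "Item1": {"source_table": "raw_item1", "retries": 1},
    "Item2": {"source_table": "raw_item2", "retries": 3},
}

def config_for(dag_id):
    """Look up the settings for the category encoded in the dag_id."""
    category = dag_id.split("_")[1]
    return CATEGORY_CONFIG.get(category, {})

cfg = config_for("DAG_Item2")
```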

Next, we define the DAG object itself:

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

Next, we create the different tasks as per our requirements. Note that the callable closes over category and dag_number, so each generated DAG prints its own values:

    def hello_world_py(*args):
        print(category)
        print('This is DAG: {}'.format(str(dag_number)))

    with dag:
        start_op = DummyOperator(task_id="Start")

        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

        end_op = DummyOperator(task_id="I-am-done")

        start_op >> t1 >> end_op

    return dag

After saving the file, we will be able to see 3 DAGs on the Airflow UI, as shown below:

Dynamic DAGs Created.

The tasks created within each DAG are:

Tasks

The logs of 'hello_world' show the item name for which the DAG was created. This can be utilized for further customization.

Logs of ‘hello_world’ task printed item name.
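One way to take this further (a sketch, not part of the original pipeline): pass the per-DAG values into the callable explicitly via op_kwargs instead of relying on the closure, which keeps the callable reusable across DAGs:

```python
def hello_world_py(category, dag_number, **kwargs):
    """Reusable callable: per-DAG values arrive as arguments."""
    message = 'This is DAG {} for category {}'.format(dag_number, category)
    print(message)
    return message

# Inside create_dag, the task would then be declared as (sketch):
# t1 = PythonOperator(
#     task_id='hello_world',
#     python_callable=hello_world_py,
#     op_kwargs={'category': category, 'dag_number': dag_number})
```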

The overall code is given below:

from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):
    category = dag_id.split("_")[1]

    def hello_world_py(*args):
        print(category)
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        start_op = DummyOperator(task_id="Start")

        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

        end_op = DummyOperator(task_id="I-am-done")

        start_op >> t1 >> end_op

    return dag


items = ['Item1', 'Item2', 'Items3']

# build a DAG for each item in the list
for idx, item in enumerate(items):
    dag_id = 'DAG_{}'.format(item)
    default_args = {'owner': 'airflow',
                    'start_date': days_ago(1),
                    'depends_on_past': False,
                    'retries': 1,
                    'retry_delay': timedelta(hours=5)}
    schedule = '@daily'
    dag_number = idx
    globals()[dag_id] = create_dag(dag_id,
                                   schedule,
                                   dag_number,
                                   default_args)

Summary

We have learnt how to create 'N' DAGs dynamically at runtime from a single piece of code. The pattern can also be extended as per our requirements with different tasks and per-category configurations.


Hi, I am a Certified Google Cloud Data engineer. I use Medium platform to share my experience with other members of Medium network.

Vibhor Gupta
