DAGs In Apache Airflow

Introduction

DAGs are the most important component of Apache Airflow. DAG stands for Directed Acyclic Graph: a graph made up of nodes and edges, in which every edge is directed and no path ever loops back on itself. In a nutshell, a DAG is a data pipeline, and each node in the DAG is a task such as "Download a file from S3", "Query a MySQL database", or "Send an email".

This article explains what a DAG is, its common properties, how to write and schedule one, and other basics. By covering these topics step by step, we will develop our first workflow with a simple operator, which Airflow will then schedule and monitor.

Directed Acyclic Graph(DAG)

In a DAG, each node represents a task, and the graph must not contain any loop. A real-world DAG might be: "Download File", "Transform it using Pandas", "Insert into DB", and "Email that the process is completed".

[Figure: a valid DAG: Download File → Transform using Pandas → Insert into DB → Send Email]

This is an example of a correct DAG: each node is a task, there are no loops, and the tasks execute one after the other.

[Figure: an invalid graph in which an edge points back to the download task, forming a loop]

This is an incorrect DAG because it contains a loop: the workflow would keep downloading the file forever and never terminate.
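To make the structure concrete, here is a minimal sketch of the correct pipeline above, using EmptyOperator placeholders for the tasks (the dag_id and task ids are illustrative; EmptyOperator ships with recent Airflow 2 releases):

from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator

with DAG("file_pipeline", start_date=datetime(2022, 1, 1)) as dag:
    # Placeholder tasks standing in for the real work
    download = EmptyOperator(task_id="download_file")
    transform = EmptyOperator(task_id="transform_with_pandas")
    insert = EmptyOperator(task_id="insert_into_db")
    email = EmptyOperator(task_id="send_email")

    # The >> operator draws the directed edges of the graph
    download >> transform >> insert >> email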

Writing a DAG

An Airflow data pipeline is a Python script that contains the DAG object. The first step is to import the modules required for the DAG and its operators.

from airflow import DAG

with DAG() as dag:   # skeleton; required parameters are added below
    ...

The import is required for instantiating a DAG object, and the with statement creates our DAG, which is the data pipeline. As written, this DAG is of no use; we need to pass certain parameters to DAG(). The first parameter is the "dag_id": every DAG inside Airflow is identified by a unique id.

with DAG("basic") as dag:
    ...

The second parameter is "start_date", the date from which the DAG is eligible to be scheduled; the first DAG run is triggered once start_date plus one schedule interval has elapsed.

from airflow import DAG
from datetime import datetime

with DAG("basic", start_date=datetime(2022, 1, 1)) as dag:
    ...

The third parameter is "schedule_interval"; using schedule_interval, the DAG can be run at specified intervals.

from airflow import DAG
from datetime import datetime

with DAG("basic", start_date=datetime(2022, 1, 1), schedule_interval="@daily") as dag:
    ...

That's it; these are the only steps needed to create a DAG. Of course, the DAG will do nothing yet, as we have not defined any task; the article covers task creation in a later section.

DAG scheduling

To run the DAG at regular intervals, we configure the interval through the 'schedule_interval' argument. When 'schedule_interval' is set to None, the DAG is never scheduled automatically and can only be triggered manually, for example through the Airflow UI.
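For instance, here is a minimal sketch of a manually triggered DAG (the dag_id "manual_only" is illustrative):

from airflow import DAG
from datetime import datetime

# schedule_interval=None: no automatic runs; trigger manually from the UI or CLI
with DAG("manual_only", start_date=datetime(2022, 1, 1), schedule_interval=None) as dag:
    ...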

Airflow provides several convenient cron presets that can be used for DAG scheduling:

  • @once - execute only once
  • @hourly - run once an hour, at the beginning of the hour
  • @daily - run once a day at midnight
  • @weekly - run once a week at midnight on Sunday
  • @monthly - run once a month at midnight on the first day of the month
  • @yearly - run once a year at midnight on January 1

The DAG can also be scheduled using cron expressions. The presets above cover runs at midnight every day, week, month, or year, but what if the requirement is to execute a DAG every 5 or 10 minutes, or every Thursday at 10:00 AM? In these circumstances, cron-based scheduling should be used.
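For example, a minimal sketch of a DAG that runs every Thursday at 10:00 AM (the dag_id is illustrative):

from airflow import DAG
from datetime import datetime

# "0 10 * * 4": minute 0, hour 10, any day of month, any month, Thursday (4)
with DAG("thursday_job", start_date=datetime(2022, 1, 1), schedule_interval="0 10 * * 4") as dag:
    ...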

A cron expression consists of five fields separated by spaces, where each field is either an asterisk (*) or a value, and its position determines what it controls: minute, hour, day of month, month, and day of week.

Running a DAG every 5 minutes can be expressed with the cron expression "*/5 * * * *". A few more examples:

  • 5 9 * * 3 - at 9:05 on Wednesday
  • */1 * * * * - run every minute
  • 0 15 * * * - every day at 15:00
  • 0 15 * 12 * - every day in December at 15:00

from airflow import DAG
from datetime import datetime

with DAG("basic", start_date=datetime(2022, 1, 1), schedule_interval="*/5 * * * *") as dag:
    ...

This DAG runs every 5 minutes.

Airflow's 'schedule_interval' also supports frequency-based scheduling, which can be easier to read than a cron expression; for that, a timedelta object from the datetime module is used.

from airflow import DAG
from datetime import datetime, timedelta

with DAG("basic", start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=5)) as dag:
    ...

The DAG will run once every 5 days.

from airflow import DAG
from datetime import datetime, timedelta

with DAG("basic", start_date=datetime(2022, 1, 1), schedule_interval=timedelta(days=3, hours=15)) as dag:
    ...

This DAG runs every 3 days and 15 hours. Note that a timedelta expresses a fixed frequency; unlike cron, it cannot pin a run to a particular weekday or time of day, so it cannot express "every Wednesday at 15:00".
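When the requirement really is "every Wednesday at 15:00", a cron expression is the right tool. A minimal sketch (the dag_id is illustrative):

from airflow import DAG
from datetime import datetime

# "0 15 * * 3": minute 0, hour 15, any day of month, any month, Wednesday (3)
with DAG("wednesday_report", start_date=datetime(2022, 1, 1), schedule_interval="0 15 * * 3") as dag:
    ...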

Defining Task

Tasks inside a DAG are configured using operators. In this article, we will use the PythonOperator to run a Python function that prints "Hello World" every 5 minutes.

from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator

# The callable that the task will execute
def print_message():
    print("HELLO WORLD!!!")

with DAG("basic", start_date=datetime(2022, 1, 1), schedule_interval="*/5 * * * *") as dag:
    task = PythonOperator(
        task_id="dag_basics",
        python_callable=print_message,
    )
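The task can also be exercised without waiting for the scheduler, using the Airflow CLI's test command, which runs a single task instance for a given date without recording state in the metadata database:

airflow tasks test basic dag_basics 2022-01-01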

Airflow UI

Once the DAG file is placed in Airflow's dags folder, the scheduler picks it up and the DAG appears in the Airflow UI, where it can be enabled, triggered, and monitored.

[Figure: the "basic" DAG listed in the Airflow UI]

Summary

The article covered the basics of DAGs, the properties required to run a DAG, and DAG scheduling in depth. Upcoming articles will explain operators in detail.

