Apache Airflow 2.0: A Practical Jump Start


In this article, I would like to give you a jump-start tutorial for Apache Airflow 2.0: understand the basic concepts and create a workflow pipeline from scratch.

So let's get started!

Introduction

Workflows are created using Python scripts, which define how your tasks are executed. They are modeled as Directed Acyclic Graphs (DAGs).

Airflow DAG

Workflow execution is driven by the schedule you provide, which follows the Unix cron format (or one of Airflow's presets such as @daily). Once you create your Python scripts and place them in Airflow's dags folder, Airflow automatically picks them up and creates the workflows for you.
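As a small illustration (the DAG name and times below are made up, not part of this tutorial's pipeline), the schedule can be either a raw cron expression or one of Airflow's presets:

from datetime import datetime
from airflow.models import DAG

# '0 6 * * *' is standard cron for "every day at 06:00";
# '@daily' would run once a day at midnight instead.
with DAG('cron_schedule_example',
         start_date=datetime(2020, 1, 1),
         schedule_interval='0 6 * * *',
         catchup=False) as dag:
    pass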

How difficult are the Python scripts?

Not difficult at all; they are pretty straightforward. Let me explain.

Key Concepts

  • BashOperator — used to run bash commands.
  • PythonOperator — used to run a Python function you define.
  • BranchPythonOperator — used to branch the workflow based on the result of a Python callable.
  • DummyOperator — a no-op placeholder task, useful for structuring the DAG.

There are quite a few other operators that help you make an HTTP call, connect to a Postgres instance, send messages to Slack, and so on. You can find more operators here.
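As a quick sketch of what instantiating these operators looks like (the DAG name and callable below are made up for illustration, and the import paths assume Airflow 2.x):

from datetime import datetime

from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def _say_hello():
    print("hello from a PythonOperator")


with DAG('operator_demo',                  # hypothetical DAG, for illustration only
         start_date=datetime(2020, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:

    say_hello = PythonOperator(task_id='say_hello', python_callable=_say_hello)
    print_date = BashOperator(task_id='print_date', bash_command='date')

    say_hello >> print_date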

Finally, with the theory done, let's do something exciting: create our first Airflow DAG.

Creating an Airflow DAG

Our example DAG will:

  1. Check if the URL is available.
  2. Fetch some data from the URL.
  3. Extract certain fields from the response.
  4. Print the extracted fields using the bash echo command.

Example Airflow DAG

A pretty simple workflow, but there are some useful concepts that I will explain as we go.

So let's start.

Running Apache Airflow with Docker

With the official docker-compose.yml from the Airflow documentation placed in a folder on your machine, you can bring everything up with:

docker-compose up -d

Once your containers are up and running, a dags folder is created on your local machine, alongside the docker-compose.yml file. We are going to place our Python workflows in this dags folder. You can access the Airflow web UI at http://localhost:8080.

Creating your DAG Definition

from datetime import datetime
from airflow.models import DAG

default_args = {
    'start_date': datetime(2020, 1, 1)
}

with DAG('user_processing',
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:
    ...

With this, you have just created a new workflow, but without any tasks to execute yet. The first parameter is the name of the DAG, followed by the schedule on which you want it to be triggered. The default_args dictionary carries arguments shared by all tasks, such as start_date, which tells Airflow the earliest date from which the DAG can be scheduled. Setting catchup=False prevents Airflow from back-filling runs for all the past schedule intervals between start_date and today.

Next, let's create our first task.

Now, to check if our URL is reachable, we are going to use a sensor. Sensors are operators that wait for a certain condition to be met before allowing the workflow to continue; in this case, it's the HttpSensor.

from airflow.providers.http.sensors.http import HttpSensor

with DAG('user_processing',
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False) as dag:

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/'
    )

To create the HttpSensor, we provide it with a task id, an HTTP connection id, and an endpoint. The task id uniquely identifies the task within the DAG, and the endpoint is the path that is appended to the connection's base URL. The remaining parameter, http_conn_id, needs a bit more explanation.

Airflow provides a mechanism to create and store configurations that you can reuse across workflows. One such type is "Connections". Here you can define connections to external systems such as AWS, Elasticsearch, a Kubernetes cluster, and many more.

In our case, we use the HTTP connection type: we provide the base URL we want to call and set the connection id to user_api. We want to call https://randomuser.me/api/, which returns a JSON response containing some random user information.

Airflow Connection Creation
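If you prefer not to click through the UI (Admin -> Connections), the same connection can also be created from code. This is just a sketch of one possible approach, not part of the original walkthrough:

from airflow import settings
from airflow.models import Connection

# Create the 'user_api' HTTP connection and save it to Airflow's metadata database.
conn = Connection(
    conn_id='user_api',
    conn_type='http',
    host='https://randomuser.me/'
)
session = settings.Session()
session.add(conn)
session.commit()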

Finally, with this configured, the task is ready to make a call to the URL with the endpoint provided.
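One more note on sensors before moving on: they also accept a few polling-related parameters inherited from the base sensor class. As a sketch (these particular values are arbitrary examples, not part of the original DAG):

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='user_api',
        endpoint='api/',
        poke_interval=30,    # re-check the endpoint every 30 seconds
        timeout=300,         # fail the task if the API is not reachable within 5 minutes
        mode='reschedule'    # free the worker slot between checks
    )

By default, sensors poke every 60 seconds and time out after 7 days, so the minimal version above simply keeps those defaults.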

So with that, we just created our first task and learned a few new concepts along the way.

Now, once the first task succeeds, we want to execute the task that makes the actual call to fetch the user details. For this, we are going to use an HTTP operator called the SimpleHttpOperator.

# new import at the top of the file
from airflow.providers.http.operators.http import SimpleHttpOperator

    is_api_available = HttpSensor(
        ...
    )

    extracting_user = SimpleHttpOperator(
        task_id='extracting_user',
        http_conn_id='user_api',
        endpoint='api/',
        method='GET'
    )

As you can see, we have the task_id to identify this task, the http_conn_id we spoke about before, the endpoint, and the HTTP method we want to use.

That's it!

Now you must be wondering what happens to the response. For this, let's look into another concept called XComs.

XComs

XComs (short for "cross-communications") let tasks exchange small pieces of data. The SimpleHttpOperator pushes its response into XCom, so we will use another task to retrieve that value and process it. You can read more about XComs here
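As a minimal sketch of the mechanism itself (the task ids, key, and callables below are made up for illustration), one task can push a value through its task instance and another can pull it back:

def _push_value(ti):
    # Push a value to XCom under an explicit key.
    ti.xcom_push(key='greeting', value='hello')


def _pull_value(ti):
    # Pull it back by referencing the pushing task's task_id.
    greeting = ti.xcom_pull(task_ids='push_task', key='greeting')
    print(greeting)

Operators that return a value, like the SimpleHttpOperator, push it automatically under the default key return_value, which is what we will pull in the next step.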

Once the data is fetched, we are going to create a processing task, which will work on the data that was pushed into XCom by the previous task. We will do this with a Python function, so the new operator we are going to use is the PythonOperator.

# new import at the top of the file
from airflow.operators.python import PythonOperator

    extracting_user = SimpleHttpOperator(
        ...
    )

    processing_user = PythonOperator(
        task_id='processing_user',
        python_callable=_processing_user
    )
DAG Tasks

The PythonOperator is a simple operator that takes a task_id and a Python callable. Let's look at the function.

import json

from airflow.models import Variable
from pandas import json_normalize


def _processing_user(ti):
    # Pull the raw response pushed to XCom by the 'extracting_user' task
    users_txt = ti.xcom_pull(task_ids=["extracting_user"])[0]
    users = json.loads(users_txt)

    if not len(users) or 'results' not in users:
        raise ValueError("User is empty")

    user = users['results'][0]
    user_map = {
        'firstname': user['name']['first'],
        'lastname': user['name']['last']
    }
    processed_user = json_normalize(user_map)
    # Store the result in an Airflow Variable so other tasks can read it
    Variable.set("user", processed_user)

This function receives a Task Instance (referred to as ti). We use this task instance to pull the information that our extracting_user task pushed to XCom, via the xcom_pull call. We then validate the input and extract a few fields into a dictionary. Finally, we normalize the dictionary and store it in something called Variables. Let's look at what Variables are.

Variables
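Variables are Airflow's built-in key-value store: anything saved there is visible to every DAG and task, either through the Variable API or through templating. A minimal sketch of the API (the example value below is made up; the key matches the one set in _processing_user):

from airflow.models import Variable

Variable.set("user", '{"firstname": "John", "lastname": "Doe"}')  # store a value
stored = Variable.get("user")                                      # read it back from any task or DAG
print(stored)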

Now that we have stored the value inside a Variable, let's print it to the logs using the echo command. To do this, we are going to use the BashOperator.

# new import at the top of the file
from airflow.operators.bash import BashOperator

    processing_user = PythonOperator(
        ...
    )

    print_user = BashOperator(
        task_id='log_user',
        bash_command='echo "{{ var.value.user }}"'
    )

This bash_command introduces another important concept called templating.

Templating
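The double curly braces in the bash_command above are Jinja templates: Airflow renders them right before the task runs, so {{ var.value.user }} is replaced with the current value of the user Variable. As a quick sketch (the task id here is hypothetical), templated fields also give you access to runtime values such as the execution date:

from airflow.operators.bash import BashOperator

# bash_command is a templated field, so Jinja expressions inside it are
# rendered at runtime: {{ ds }} becomes the run's logical date and
# {{ var.value.user }} the value of the 'user' Variable.
templated_echo = BashOperator(
    task_id='templated_echo',
    bash_command='echo "date: {{ ds }}, user: {{ var.value.user }}"'
)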

Now, how would you know whether a particular operator supports templating? You can look for "template_fields" in the operator's API reference, which lists the fields that support templating. For example, you can have a look at the HttpSensor operator reference here.

Ordering Your Tasks

task_1 >> task_2

This means task_1 will run before task_2 (the reverse operator, task_2 << task_1, expresses the same dependency). In our case, we order the tasks as

is_api_available >> extracting_user >> processing_user >> print_user
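If you prefer, the same ordering can be expressed with the chain helper instead of the bit-shift operators; a sketch equivalent to the line above:

from airflow.models.baseoperator import chain

# Identical to: is_api_available >> extracting_user >> processing_user >> print_user
chain(is_api_available, extracting_user, processing_user, print_user)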

And there we go! We just created a simple Airflow DAG from scratch, covering some of the important concepts of Airflow along the way. Now enable the DAG in the web UI and trigger it; it will start executing.

Apache Airflow Successful Execution

As usual, I have uploaded the code on GitHub. :)

This article was originally published on RefactorFirst.

Feel free to share this article and follow me on Twitter. You can also subscribe to my newsletter on RefactorFirst.com

Enjoy!!

You can always say a thank you and support my articles by buying me a coffee. :)
