
Intro to Airflow

by Calvin H. 2022. 6. 1.

Introduction

Airflow, first developed at Airbnb, was created to manage a vast number of workflows.

Simply put, Airflow is workflow management with code. By code, I mean Python, so a Python programmer can easily pick up Airflow to control workflows.

Characteristics

Operators

As mentioned above, Airflow is driven by Python, which makes it easier to control the necessary jobs. Along with plain Python code, Airflow provides operators. Operators are, simply put, tools that help you write that code. For example, there are a Bash operator, a Postgres operator, Google Cloud operators, and so on, which allow easier control of each kind of job.

Basically they are tools that can be used like libraries.
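
As a rough sketch, assuming Airflow 2.x import paths and made-up names, using an operator inside a DAG file looks something like this:

```python
# Minimal sketch of a DAG using an operator (Airflow 2.x paths assumed;
# dag_id and the command are placeholders for illustration).
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_operators",        # hypothetical DAG name
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # The BashOperator runs a shell command as one step of the workflow.
    say_hello = BashOperator(
        task_id="say_hello",
        bash_command="echo 'hello from airflow'",
    )
```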

Operators vs Tasks

From a user standpoint, a task is not much different from an operator.

However, from an engineering point of view, the two are different. Although they may look the same when it comes to executing a given step, they differ in their objectives and responsibilities.

An operator defines a single step of code execution, and a task wraps such an operator. In other words, a single task is a single step, but within that step the operator is the actual executor. Furthermore, a task exposes and manages the state of its step: the state is tracked mainly at the task level, while the operator actually executes and produces that state.

Thus, from a user standpoint, where one mostly observes DAG runs, a task is not much different from an operator, since only the output is visible and the distinction stays hidden.
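
A small illustration of the distinction, as a hedged sketch with made-up task names: the operator class defines what runs, while each instance attached to a DAG becomes a task whose state Airflow tracks by its task_id.

```python
# Hedged sketch: one operator class, two tasks. Airflow tracks state
# (queued, running, success, failed, ...) per task_id, not per operator class.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="operator_vs_task", start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")
    load = BashOperator(task_id="load", bash_command="echo loading")

    # In the UI you see two tasks, "extract" and "load", each with its own
    # state per DAG run; both being BashOperators is an implementation detail.
    extract >> load
```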

DAG

A Directed Acyclic Graph (DAG) is the form of workflow Airflow uses. Thinking of it as a graph is helpful, with the caveat that there is no circular flow. In other words, it is a one-way road, except that multiple nodes can connect into one node and one node can branch out to many.

This allows careful control, since one can organize the nodes so that a single node fans out to many others and many nodes converge back into a single one. Along such a flow, a node can wait for its upstream nodes to complete.
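
As a hedged sketch with made-up task names, such a fan-out/fan-in structure can be wired up like this:

```python
# Hedged sketch of fan-out/fan-in: "start" leads to three parallel branches,
# which all converge into "join"; "join" waits for every branch to finish.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="fan_out_fan_in", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    start = BashOperator(task_id="start", bash_command="echo start")
    branches = [
        BashOperator(task_id=f"branch_{i}", bash_command=f"echo branch {i}")
        for i in range(3)
    ]
    join = BashOperator(task_id="join", bash_command="echo join")

    start >> branches   # one node leads to many
    branches >> join    # many nodes lead back to one
```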

Not exactly an ETL tool

Contrary to what some people may think, Airflow itself is not an ETL tool. It is a workflow management tool, meaning it uses DAGs defined in code to control different operations.

Of course, one can use that code to organize and construct an ETL process, which means that Airflow, used in the right situation, can be quite powerful.

As a workflow manager, Airflow can schedule runs on intervals or on conditions, allowing periodic execution while keeping the logs of all such runs.
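
Tying the previous point together, here is a hedged sketch of what "code for an ETL process" could look like in Airflow; the extract/transform/load functions are placeholders, not a real pipeline:

```python
# Hypothetical ETL-shaped DAG; extract/transform/load are stubs for illustration.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def extract():
    # e.g. pull rows from an API or a source database (stubbed out here)
    pass


def transform():
    # e.g. clean and aggregate the extracted data (stubbed out here)
    pass


def load():
    # e.g. write the result to a warehouse or object storage (stubbed out here)
    pass


with DAG(dag_id="etl_sketch", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    t_extract = PythonOperator(task_id="extract", python_callable=extract)
    t_transform = PythonOperator(task_id="transform", python_callable=transform)
    t_load = PythonOperator(task_id="load", python_callable=load)

    t_extract >> t_transform >> t_load
```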

Main Components

Airflow has three main components: the scheduler, the workers, and the webserver. The scheduler keeps the DAGs running at their specified intervals, handles backfills, keeps track of task runs, and so on. The workers are the actual executors that run the code: they carry out the desired behavior when handed tasks from a queue. Lastly, the webserver is the web UI, built with Flask (a Python web application framework), that lets users interact with Airflow and manage, view, and operate DAGs.

However, the web UI doesn't allow the creation of DAGs; it can only manage what already exists in the 'dags' folder.

Scheduling

To understand scheduling in Airflow, one needs to understand that it is interval based. Basically, a run represents an 'interval'. For example, if you schedule a DAG starting Jan 1, 2021 with a daily interval, the first run does not happen on Jan 1, 2021; it actually runs at 00:00 on Jan 2, 2021. That is because Airflow executes a run at the first moment after its interval has ended. It may seem strange at first, but each execution keeps track of the 'interval' it represents.

Thus, a run at 00:00 on Jan 2, 2021 represents the daily interval from 00:00 Jan 1, 2021 to 23:59 Jan 1, 2021.
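
A hedged sketch of this behavior, assuming Airflow 2.2+ where the interval is exposed as template variables (the dag_id is made up):

```python
# Hedged sketch: a daily DAG starting Jan 1, 2021. Its first run is triggered
# around 00:00 on Jan 2, 2021 and covers the Jan 1 interval.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="interval_demo", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily") as dag:
    BashOperator(
        task_id="report_interval",
        # Airflow fills these templates at run time; for the first run they
        # would show 2021-01-01T00:00 to 2021-01-02T00:00.
        bash_command="echo '{{ data_interval_start }} to {{ data_interval_end }}'",
    )
```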

This is different from plain cron jobs. Cron operates with point-in-time scheduling: if you want to run something at 00:00 on Jan 2, 2021, it runs at that specified time. The advantage of interval scheduling is that you know the start and end of the period a given run covers, while with a cron job you don't really know whether a run at 00:00 on Jan 2, 2021 represents 6 hours' worth of work, 12 hours' worth, and so on. The time at which it ran is all the information you get.

Furthermore, with Python's datetime timedelta, Airflow can schedule runs based on an arbitrary amount of time: once per 3 days, every 3 weeks, every 6 hours, and so on. This is pretty hard to simulate with cron, since you have to pinpoint the exact times the job runs. Even if you manage a once-per-3-days pattern within a given month, it may not line up as intended once the next month starts. For example, if the last run lands on Jan 31, 2021, the next run will be on Feb 1, 2021, because a cron job's day-of-month counter resets each month. So one may have to implement extra measures to take care of such discrepancies.
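
A hedged sketch of timedelta-based scheduling (dag_id made up):

```python
# Hedged sketch: run every 3 days via a timedelta, which a single cron
# expression can't express cleanly across month boundaries.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="every_three_days", start_date=datetime(2021, 1, 1),
         schedule_interval=timedelta(days=3), catchup=False) as dag:
    BashOperator(task_id="run", bash_command="echo 'runs once per 3 days'")
```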

Dockerizing Operators

Why would you want to dockerize operators? Why not just write plain Python code and keep a single Python file per DAG? Containerizing obviously takes more work and coordination to orchestrate.

However, containerizing operators actually has some advantages. For one, dependencies may differ from one operator to another: different steps may need different versions of certain libraries, and managing those per-operator dependencies in one environment can be hard.

So if one only has to use the Docker operator, things get much simpler, since each Docker operator execution can pin its required versions separately inside its own image.
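
A hedged sketch of such a task, assuming the apache-airflow-providers-docker package is installed; the image name and command are hypothetical:

```python
# Hedged sketch: one containerized step. The image pins its own dependencies,
# independent of the Airflow environment. Image and command are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

with DAG(dag_id="dockerized_step", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    fetch = DockerOperator(
        task_id="fetch_data",
        image="mycompany/fetcher:1.2.0",          # hypothetical image
        command="python fetch.py",
        docker_url="unix://var/run/docker.sock",  # talk to the local Docker daemon
        network_mode="bridge",
    )
```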

Another advantage is a uniform approach to running different tasks. If one has to use many different operators, one has to keep track of each operator's usage and interface. By using a single Docker operator, one only needs to know how that one operator handles tasks, and each containerized step can be debugged separately.

Finally, it improves testability. Since each operator runs in its own container, each one has its own development lifecycle and can be tested on its own, with fewer shared testing dependencies.

Cloud Deployment

To deploy Airflow to the cloud, there are various strategies to consider. For one, you have to be familiar with a specific cloud provider (AWS, GCP, Azure, etc.), because each provider differs in services and usage.

If deploying on AWS, one option is to manage things simply with AWS Glue, AWS Athena, and AWS S3. Basically, use AWS Glue to handle the crawling and processing (ETL), save the results to S3, then query them from AWS Athena by defining a schema over the S3 objects. The resulting files can then be saved back to S3.

Another example is using ECS, the Elastic Container Service. That is, you can use ECR (the Elastic Container Registry) and Fargate along with the tasks and services provided within ECS. By also setting up a separate AWS RDS Postgres database (or another RDBMS), you can organize another form of Airflow deployment on AWS. The point is that you have to take care of DAG storage, log storage, and the metadata database on the storage/database side. For the Airflow worker, flower, scheduler, webserver, and Redis, you can base them all on a single image (e.g. a Docker image), turn them into containers, and create an ECS task for each container. By the way, a task in an ECS cluster can be thought of as roughly a docker-compose unit (from my understanding). And by setting up services on ECS, you can run these 'tasks'.

Deployment Experience Log

With the help of 'Designing Data Pipelines with Apache Airflow' and some Medium blog posts, I managed to gather the courage to try a new tech stack on AWS, namely ECS. The first difficulty was understanding what ECS is made up of.

ECS is basically like Kubernetes, except that unlike k8s you don't have to manage as many settings yourself. Inside an ECS cluster there are services, which are basically containers in execution; you can specify what runs and when. And within these services are tasks. A task is basically like a docker-compose file: a group of containers. It defines which images the containers are based on and which containers should be running, along with optional port mappings, environment settings, VPC configuration, etc.

The above paragraph mentioned images; these are container images, simply put, Docker images. There is a service called ECR that lets you store and manage these images, and with the AWS CLI you can even push them directly.

I got stuck at the load balancer part. After setting up ECS, ECR, tasks, services, etc., the problem was that I couldn't wrap my head around VPCs and how to connect a load balancer to an ECS service.

The second problem was understanding AWS Fargate. Fargate is "a serverless compute engine" that lets you use only the minimum required resources. Unlike EC2, with Fargate you have a compute engine running only when you need it, keeping costs and resource usage lower.

Docker-compose

Deploying Airflow with Docker is quite easy, since the Airflow documentation has simple steps you can follow.

The guide is on the official documentation site; it actually provides instructions built around a docker-compose file.

So you 'curl' the compose file and simply start it with docker-compose up airflow-init followed by docker-compose up. If you get a UID- or GID-related error while the databases are being set up, make sure you created three directories in the same folder as the compose file: logs, dags, and plugins. Then create an .env file containing "AIRFLOW_UID=$(id -u)" and "AIRFLOW_GID=0", or simply follow the page and enter the commands it gives.

With docker-compose, I actually just created an AWS EC2 instance with the SSH and HTTP ports opened and put it up there.

My EC2 instance was Ubuntu 20.04, size t2.medium, because I tested locally and found you need at least 2~3 GB of RAM to run the entire docker-compose stack. Around 4 CPUs is OK if you don't have many DAGs. You should make sure the Airflow webserver, scheduler, and workers have enough CPU and RAM. Redis is also included in the stack (used by the Celery workers and Flower), so you might want either more RAM on the EC2 instance or a separate instance for the Redis connection.

For the Airflow metadata database, I chose to create a free-tier AWS RDS Postgres instance since, as I mentioned before, I don't have many DAGs.

The downside of using an EC2 instance is that you can't control RAM and CPU separately, and resources aren't elastic at the level of the instance itself.

Another downside of running docker-compose is that if you have a deployment cycle that restarts the Docker containers periodically, you will have to restart either all the related containers or a chosen subset of them.

In my case, I simply created a GitHub repository for the DAGs and the docker-compose file, and linked a webhook to Jenkins that deploys the repository contents to the AWS EC2 instance through SSH file publishing.

Building up the CI/CD pipeline wasn't hard, since I didn't have any tests or health checks during deployment. Which also means that one wrong config in the repo might end up as a disaster...

All in all, I think this cycle is enough for a simple deployment, given that one keeps the DAGs separate from the webserver.

On AWS MWAA

AWS offers Managed Workflows for Apache Airflow (MWAA), similar to GCP's Cloud Composer. However, the pricing isn't great... hence the reason I moved to AWS EC2 with docker-compose. Another problem is that you can't control the 'inner' parts of the service, for example the database, logs, etc. They are all handled by AWS since it is, after all, a 'managed' service.
