Machine Learning (ML) has come a long way from being a supporting feature of software to becoming a well-rounded solution to real-world problems. As businesses incorporate more ML, data volumes have grown from a few megabytes a day to gigabytes per minute. While this deluge of data is essential for building systems capable of handling real-time challenges, it creates a critical need for processes and tools that can handle such volumes efficiently. MLOps tools like data pipelines have emerged to streamline and automate the order in which processing steps run, ensuring that datasets of different sizes are loaded and prepared properly.
How to Automate Data Pipelines with Apache Airflow?
Data pipelines are made of tasks that, when executed in the desired order, achieve the required result. Let us assume that you have been approached by the owner of a company that manufactures anti-pollution face masks. They would like to use ML to make their operation more efficient by using air quality index (AQI) data to predict sales trends. The owner would like you to implement a data pipeline that creates an ML model correlating mask sales with AQI variations. After all, many cities see disturbing AQI levels, and masks are crucial during those times. This model can then be used to predict demand for the company’s face masks in the coming weeks, depending on the AQI forecasts produced by your system. So the ML solution that powers your neat little dashboard could look something like this:
The data pipeline for the use case can be translated to a DAG that looks like this:
As you can see, the sales and AQI data are independent of each other. Their preparations are independent tasks and can therefore be done in parallel. The workflow has thus been optimized. This was a relatively straightforward use case, and more complex business problems would translate to a multi-faceted solution with complicated workflows.
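The idea that independent tasks can run side by side can be sketched in plain Python, without Airflow. The task names below are hypothetical stand-ins for the mask sales use case, and the standard-library graphlib module is used only to illustrate how a dependency graph yields batches of tasks that are ready at the same time. It is not how Airflow itself is implemented.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each task maps to the set of tasks it depends on.
dependencies = {
    "fetch_sales": set(),
    "fetch_aqi": set(),
    "clean_sales": {"fetch_sales"},
    "clean_aqi": {"fetch_aqi"},
    "join_datasets": {"clean_sales", "clean_aqi"},
    "train_model": {"join_datasets"},
}


def parallel_batches(deps):
    """Group tasks into batches; tasks within one batch do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # every task whose dependencies are done
        batches.append(ready)
        sorter.done(*ready)  # mark the whole batch as finished
    return batches


batches = parallel_batches(dependencies)
for step, batch in enumerate(batches, start=1):
    print(step, batch)
```

Each printed batch holds tasks that could run in parallel: the two fetch tasks come first, then the two cleaning tasks, and only then the join and the model training.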
Recommended Reading: What is a DAG?
So What is Apache Airflow?
Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows programmatically. It lets you define workflows as code, and this programmability makes workflows easier to maintain, test, and collaborate on. In Airflow, data pipeline DAGs (directed acyclic graphs) are programmed through Python scripts called DAG files. Each DAG file describes tasks and the dependencies between them. A typical Airflow pipeline would look like this.
You may also ask: what is Airflow used for? Authoring a workflow sets a sequence of tasks, while scheduling plans and controls the order of the tasks and decides when a particular task should run. All this could be very tedious if you had to write scripts for each of these functions, and Airflow makes it a smooth experience.
Recommended Reading: Components of Airflow: Operators
Development Environment of Apache Airflow
Let us now see what makes up the development environment for Airflow, which is aptly named Breeze. It will help you better understand the entities you would create and work with.
- Airflow Scheduler is responsible for the monitoring of DAGs. It triggers the scheduled workflows and submits the tasks to the executor. It is responsible for keeping tabs on the DAG folder, checking the tasks in each DAG, and triggering them when they are ready. The scheduler does all this by spawning a periodic process that reads the metadata database to check the status of each task and decides further actions. In your Breeze environment, the scheduler can be started by the command 'airflow scheduler', and its configuration can be controlled through the airflow.cfg configuration file.
- The Airflow executor, together with its workers, handles the running of the tasks defined in a DAG once the scheduler submits them. By default, Airflow uses the SequentialExecutor, which is limited in functionality because it runs one task at a time, but works well with SQLite. Based on the resource-handling strategy and your preference, you may choose among Debug Executor, Local Executor, Dask Executor, Celery Executor, and Kubernetes Executor, or scale out with Mesos.
Celery Executor can ensure high availability since it employs workers that function in a distributed manner, making it preferable over the SequentialExecutor in some cases. Moreover, it adds redundancy, as the work of a failed worker node can be taken over by another.
- Airflow webserver is a handy user interface (UI) to inspect, trigger, and debug the behavior of DAGs and tasks, like the one seen in the pictures above. You can perform many actions in this UI, including triggering a task and monitoring its execution and duration. Additionally, the UI offers two views for browsing the structure of DAGs: a graph view and a tree view. The logs can also be consulted for debugging. The command 'airflow webserver' will launch the web UI in the Breeze environment.
- A folder of DAG files, which is read by the scheduler, the executor, and any of its workers so that Airflow can function.
- Airflow metastore is a database that holds the requisites and states of Airflow activities. The requisites include secrets and their management, which allow authorized access to a hosted Airflow instance. This is required because Airflow communications happen over the less safe HTTP, and the webserver needs to be secured through a private key and an SSL certificate. For the scheduling function, the metastore saves the interpretation of the DAG scripts and the states of the workflows. The scheduler also updates the metastore with dynamic DAGs created for the periodic flows. The next actor, the executor, picks up the tasks enqueued by the scheduler and executes them. Status changes such as running, successful, and failed are registered in the metastore. All these database operations are carried out through SQLAlchemy. Therefore, you need to consider compatible databases like MySQL and Postgres if you do not wish to use the default SQLite database.
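To make the hand-off between scheduler, executor, and metastore more concrete, here is a minimal sketch that records task states in an in-memory SQLite database. The table layout, the helper functions, and the DAG and task names are all hypothetical. Airflow's real schema is different and is accessed through SQLAlchemy rather than the sqlite3 module used here.

```python
import sqlite3

# In-memory stand-in for the metastore; the table layout is hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_state ("
    "dag_id TEXT, task_id TEXT, state TEXT, "
    "PRIMARY KEY (dag_id, task_id))"
)


def set_state(dag_id, task_id, state):
    """Insert or overwrite the recorded state of one task."""
    conn.execute(
        "INSERT OR REPLACE INTO task_state (dag_id, task_id, state) VALUES (?, ?, ?)",
        (dag_id, task_id, state),
    )
    conn.commit()


def get_state(dag_id, task_id):
    """Return the recorded state of a task, or None if it was never recorded."""
    row = conn.execute(
        "SELECT state FROM task_state WHERE dag_id = ? AND task_id = ?",
        (dag_id, task_id),
    ).fetchone()
    return row[0] if row else None


# Scheduler side: the task is ready, so it is enqueued.
set_state("mask_sales", "clean_aqi", "queued")

# Executor side: pick the task up, run it, and record the outcome.
set_state("mask_sales", "clean_aqi", "running")
try:
    total = sum([1, 2, 3])  # placeholder for the real task work
    set_state("mask_sales", "clean_aqi", "success")
except Exception:
    set_state("mask_sales", "clean_aqi", "failed")

print(get_state("mask_sales", "clean_aqi"))
```

Because every state change goes through the database, the scheduler, the executor, and the web UI can all agree on what has happened without talking to each other directly.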
How to Install Apache Airflow?
Now that you have decided to take Airflow for a spin, you can follow easy installation steps from the Quick Start guide. You will have the option to run it from a Python environment or in Docker. Read on to know more about setting up Apache Airflow.
For a Python environment, 'pip install apache-airflow' is the simple command to install the Airflow package. After installing Airflow, you can start it by initializing the metastore, creating a user, copying the mask sales DAG (or any use-case DAG that you have written) into the DAGs directory, and starting the scheduler and webserver with the following commands:
Docker containers are also a popular method to create isolated environments if you wish to run a set of Python packages and avoid dependency conflicts. Running Docker containers would require the installation of a Docker Engine on your machine. This command would let you run Airflow in Docker:
While we ran this single command for demonstration, the Airflow webserver, scheduler, and metastore should be restricted to separate containers in a production setting. Once the required services have been started, you can view Airflow on http://localhost:8080 and log in with your username and password.
The first view of the web interface is the login screen. Logging in takes you to the main page, which gives an extensive overview of the different DAGs and summaries of their recent runs. A newly created user may also find some sample DAGs to get started with.
Clicking on a specific DAG will give you a graph view of the selected DAG. This view shows you the tasks' organization and interdependencies for that DAG.
Airflow also offers a tree view that is more detailed than the graph view. In the tree view, you can see the current and previous runs of the associated DAG.
Airflow’s tree view is the richest visual provided by the web interface, as it gives you a quick overview of how a DAG has performed over time and allows you to debug failed tasks. As highlighted in the image, one column shows a single execution of a DAG workflow, comprising all the completed tasks. A single task corresponds to a cell in the tree view, so a row of cells shows the status of one task over time. The color of each cell indicates the result of the corresponding task. You can also click on a task cell for more details about a given task instance, or reset the state of a task so that Airflow reruns it.
If a task fails, Airflow can retry it a configured number of times (the task's retries setting) before marking the task execution as a failure. You may also configure a wait time between attempts (retry_delay) to help tasks recover from intermittent failures.
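The retry behavior can be illustrated with a small plain-Python sketch. The run_with_retries helper and the flaky_download task are hypothetical and only mimic the idea of a retry count and a wait between attempts. In Airflow you would configure retries on the task instead of writing such a loop yourself.

```python
import time


def run_with_retries(task, retries=2, retry_delay=0.0):
    """Call task(); on failure, wait retry_delay seconds and try again.

    `retries` is the number of extra attempts allowed after the first one.
    Returns the task result together with the number of attempts used.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > retries:
                raise  # retries exhausted: the run counts as failed
            time.sleep(retry_delay)  # give an intermittent problem time to clear


# Hypothetical flaky task: fails twice, then succeeds.
calls = {"count": 0}


def flaky_download():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network problem")
    return "aqi data"


result, attempts = run_with_retries(flaky_download, retries=2, retry_delay=0.01)
print(result, attempts)
```

With two retries allowed, the third attempt succeeds. With only one retry, the same task would have been marked as failed.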
Things to Consider Before Starting with Airflow
Bet you cannot wait to get started with Airflow, but please spend a little more time here to save yourself some unnecessary headaches.
- You opened the graph view on the web interface and decided to run it. You click on a task to execute it, but nothing happens. The chances are that the pause switch on the top-left is off, and you need to turn this toggle switch on first.
- Always give each DAG a unique identifier (its dag_id) when defining the DAG file.
- You defined X number of tasks for a DAG, but only Y are running. The scheduler is not busy, so what should you do? Open airflow.cfg and raise the values of dag_concurrency and max_active_runs_per_dag.
- “Help! The webserver is crashing with an error 503.” The only information you have is the cryptic text: ‘Service Temporarily Unavailable Error.’ The good news is that it is not error number 500, which would mean that the server hit an internal error and could not handle the request at all. Error code 503 signals an overload: the webserver may be busy with other internal requests, or hanging processes may have created a bottleneck, so it needs more time to get to your request. You may restart the webserver if it has been running for a long time. Alternatively, you may increase the value of web_server_master_timeout or web_server_worker_timeout to extend the wait time before Airflow decides that the webserver cannot cater to you.
- Your team member had scheduled executions for 5:00 am, and when you checked the latest logs, the dates were a day behind. Should you wait a little longer? No. By default, Airflow dates and times are in UTC. You may either condition yourself and your team to work with UTC or change the time zone (the default_timezone setting) in airflow.cfg.
- You set the start_date of a DAG to datetime.now() and wait for the magic to happen. Then wait a bit more *sounds of crickets chirping*. The DAG will not start. Airflow would see instantaneous date and time in the start_date field and assume that the DAG is not ready. Always specify a past date to make your DAG look prim and ready for execution.
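The start_date pitfall from the last point can be pictured with a simplified model. The sketch assumes that a scheduled run is triggered only once the interval that begins at start_date has fully elapsed. The first_run_is_due function is a hypothetical helper for illustration, not part of Airflow.

```python
from datetime import datetime, timedelta, timezone


def first_run_is_due(start_date, interval, now):
    """Simplified model: the first run is due once [start_date, start_date + interval) has ended."""
    return now >= start_date + interval


interval = timedelta(days=1)
now = datetime(2024, 1, 10, 5, 0, tzinfo=timezone.utc)

# A fixed date in the past: its first interval ended long ago.
fixed_start = datetime(2024, 1, 1, tzinfo=timezone.utc)

# What datetime.now() amounts to: the start moves forward every time the
# DAG file is evaluated, so the end of the first interval is always ahead.
moving_start = now

fixed_due = first_run_is_due(fixed_start, interval, now)
moving_due = first_run_is_due(moving_start, interval, now)
print(fixed_due, moving_due)
```

The fixed start date yields a run that is due, while the moving one never catches up, no matter how long you wait.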
The Good and Bad of Apache Airflow
If you are interested in implementing data pipelines with Apache Airflow, you also need to weigh its pros and cons. Some of the primary advantages offered by Airflow are:
- Airflow is an open-source framework. So no vendor-captive limitations, yay!
- You can program pipelines that are as complex as you wish to code. It all depends on your proficiency in the Python programming language.
- Python, again an open-source language, offers many types of libraries. The DAG files written in Python can be easily extended as per your needs.
- Another advantage of being open-source? You can find many community-developed extensions that cater to different databases and cloud services.
- You can schedule the data pipelines to run regularly and use their results for incremental processing. Also, features like backfilling let you easily re-process historical data. Such features save you from recomputing intermediate results.
- Airflow has an easy-to-use web interface that lets you see and modify pipelines or debug in case of any failures.
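Incremental processing and backfilling, mentioned in the list above, both come down to treating each schedule interval as its own unit of work. The following plain-Python sketch shows the idea under the assumption of a daily schedule. The daily_intervals and backfill helpers are hypothetical and do not call Airflow.

```python
from datetime import date, timedelta


def daily_intervals(start, end):
    """Yield (interval_start, interval_end) pairs for each day in [start, end)."""
    current = start
    while current < end:
        yield current, current + timedelta(days=1)
        current += timedelta(days=1)


def backfill(start, end, process, already_done=frozenset()):
    """Run process(day) for every daily interval that has no result yet."""
    handled = []
    for day, _next_day in daily_intervals(start, end):
        if day in already_done:
            continue  # incremental: keep the result that already exists
        process(day)
        handled.append(day)
    return handled


processed_log = []
done = {date(2024, 1, 2)}  # this day was processed by an earlier run
handled = backfill(date(2024, 1, 1), date(2024, 1, 4), processed_log.append, done)
print(handled)
```

Only the days without an existing result are processed, which is what saves the recomputation of intermediate results.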
Airflow’s design choices may make it less suitable for certain cases despite numerous advantages.
- Since Airflow was designed to run batch-oriented tasks, it may not be the best fit to implement dynamic pipelines where tasks would be modified between each run.
- Python programming experience is a major requirement to implement DAGs for Airflow. For teams having scant programming experience, a workflow manager with a graphical interface like Azure Data Factory may be more suitable.
- Airflow DAGs need regular maintenance or versioning to achieve stable pipelines.
- Airflow is mainly a workflow management platform. It cannot manage data lineages and their versioning. You’ll probably need to combine Airflow with other specialized tools for such additional requirements.
I hope this post gently introduced Apache Airflow to you and showed you how to get started with it. Airflow has extensive functionalities that you can explore through the rich user interface and wide community support.