
Tips for Running Smoothly with Airflow

December 2, 2022

Airflow is great in many ways. During a recent project, our client’s stakeholders waxed lyrical about the benefits of switching to Airflow, including better tools to figure out what happened in an individual workflow instance, and the ability for developers to work on the business logic without having the entire workflow system’s complexity in view.

In practice, though, there are some snags that anyone new to Airflow should watch out for. The following is a list of “gotchas” and suggestions we’ve learned during that aforementioned project. Being aware of these snags can help you get up and running more quickly, reduce frustrations, and even set expectations when switching to, or working with, Airflow.

Global DAG defaults

Right from the start, we recommend implementing a simple common structure for passing settings into the DAG declaration for values that will be used across all DAGs. This proved useful for us when we wanted to change the retry policy for tasks across all DAGs. The defaults can then be overridden on a per-DAG basis if necessary.

from airflow import DAG
from client.settings import dag_defaults

dag_defaults["tags"] += ["foo", "bar"]  # extend or override the shared defaults per DAG as needed

with DAG(dag_id="xyz_v001", **dag_defaults) as dag:
    ...  # task definitions go here

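For reference, the shared settings module might look something like the following. This is a minimal sketch; the keys and values here are illustrative, not prescriptive.

# client/settings.py (illustrative)
from datetime import datetime, timedelta

dag_defaults = {
    "catchup": False,
    "tags": ["client"],
    "default_args": {
        "start_date": datetime(2022, 1, 1),
        "retries": 3,
        "retry_delay": timedelta(minutes=5),
    },
}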

When to use TaskFlow tasks vs. Operators

When we started out with Airflow, there were no clear documented guidelines about when to use Operators and when to use TaskFlow. Eventually, we developed our own rules of use.

One advantage to using Operators that we recently came across is that you can define trigger_rules for Operators at the DAG level, whereas for TaskFlow tasks they must be defined at the task level, leaving you with less flexibility in the flow of a DAG. That’s not such a big deal when your TaskFlow task is limited to a few DAGs, but it could be a problem if all of your code is based on TaskFlow tasks.
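
As a minimal sketch of the difference (the DAG and tasks here are purely illustrative): a classic Operator can pick up its trigger rule from the DAG’s default_args, while a TaskFlow task declares it in its own decorator.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="trigger_rule_demo_v001",
    start_date=datetime(2022, 12, 1),
    schedule_interval=None,
    default_args={"trigger_rule": TriggerRule.ALL_DONE},  # applied to classic Operators in this DAG
) as dag:
    cleanup_op = BashOperator(task_id="cleanup_op", bash_command="echo cleanup")

    @task(trigger_rule=TriggerRule.ALL_DONE)  # TaskFlow: must be set on the task itself
    def cleanup_task():
        ...

    cleanup_task()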

Code/package structuring

Airflow will constantly parse all code in the dags directory, so you’ll want to limit what you place there. During the project, we came up with a structure similar to the following, which still seems to be working well without much change one year later:

dags/my_dag_v001.py
dags/tests/ - excluded from DAG parsing via dags/.airflowignore
dag_plugins/client_name/factories.py - reusable TaskGroups shared by multiple DAGs (see the sketch after this list)
dag_plugins/client_name/tasks.py - reusable tasks shared by more than one DAG
dag_plugins/client_name/my_dag/tasks.py - TaskFlow tasks used in just one DAG
dag_plugins/client_name/my_dag/tests/ - all of that DAG's unit tests
dag_plugins/client_API_name/operators
dag_plugins/client_API_name/hooks
dag_plugins/client_API_name/sensors
dag_plugins/client_API_name/triggers
dag_plugins/client_API_name/tests/[hooks, operators, etc.]
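
To illustrate the factories.py entry, a reusable TaskGroup factory might look roughly like this. It's a minimal sketch: the import path, group name, and the fetch_items/validate_items tasks are illustrative stand-ins for whatever shared TaskFlow tasks live in client_name/tasks.py.

# dag_plugins/client_name/factories.py (illustrative)
from airflow.decorators import task_group

from client_name.tasks import fetch_items, validate_items  # hypothetical shared TaskFlow tasks


@task_group(group_id="ingest_items")
def ingest_items(source: str):
    """Fetch and validate items from a source system; reused across several DAGs."""
    validate_items(fetch_items(source))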

For the DAG name, we use a suffix of “_vnnn”, where “nnn” is the number of that particular DAG version. A couple of examples might be dag_name_v001.py and dag_name_v002.py. We do this because we know there will be changes to our client’s process over time. When that time comes, they’ll want to keep the existing DAG so they can review and rerun old DAG run instances without them being subject to the new DAG’s structure.

The Airflow project has discussed changing this so that the structure is stored with a DAG run instance, but it’s not there yet. The main point is to remember, when changing DAGs, that there may well be previous DAG run instances you’ll need to consider. This applies to Operators and other tasks as well.

Airflow is slow. Really slow.

The client we were engaged with provided us with 2019 i7 MacBook Pros with 16GB of RAM, but slow iteration times were still common during development. At idle, running only the Airflow containers with no DAGs or tasks executing, 300-500% CPU utilization and loud fan noise were the norm.

For those who can’t work directly on a powerful Linux machine, we recommend looking into a Docker context with file sharing for the dags and dag_plugins mounts, so that the Airflow containers actually run on a fast Linux machine.

This should improve somewhat once Airflow has a release with Python 3.11 support and once AIP-45 removes the double DAG parsing. However, AIP-45 has been pushed back at least once, and it’s hard to know at this point whether those changes will be enough to completely remove the frustration, or whether other features will offset the gains, so running development containers on a powerful machine is likely to remain a good idea.

Airflow inconsistencies and development methodology

API and documentation
When you’re just starting out and reading the documentation, Airflow’s API seems nicely designed, and in many ways it is. Once you get into development, though, you find that the API or documentation misses things that could have made real-world use and the developer experience easier.

For a simple example, one friction point for a while was the inconsistency between using Operator output and TaskFlow output in a DAG without pulling XComs manually.

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import BaseOperator

from client.settings import dag_defaults


class CustomOperator(BaseOperator):
    def execute(self, context):
        return {"source": "operator"}


@task
def custom_task():
    return {"source": "taskflow"}


@task
def output_task(operator_format, task_flow_format):
    print(operator_format, task_flow_format)


with DAG(dag_id="sample_v001", **dag_defaults) as dag:
    wf_task = custom_task()                          # a TaskFlow call already hands you its output
    op_task = CustomOperator(task_id="op_task")
    out_task = output_task(wf_task, op_task.output)  # the Operator needs the extra .output

A small but nice consistency improvement for a DAG developer would be to have .output on both; then it requires no thought. Having no .output at all, as with TaskFlow, would be even more Pythonic, but would require a major version bump.

Task naming in the UI
In addition to this, the above code shows up in the Airflow UI as op_task and custom_task: the Operator is displayed by its task_id, while the TaskFlow task is displayed by its function name, so even the naming for operators and tasks in the UI is inconsistent.

We chose to give naming priority to what appears in the UI, which required some creative naming at times; for example, making the task’s variable name a slightly more formal version of the name visible in the UI.

# The UI will show these two tasks as "format_items_payload" and "create_items".
create_items_payload = format_items_payload(previous_step_data)
create_items = CreateItemsOperator(task_id="create_items", payload=create_items_payload)

Another issue we noticed is that Airflow features sometimes get released before they’re really complete. A recent example is Dynamic Task Mapping. There’s no way to have Dynamic Task Mapping fan out into a chain of tasks; it can only expand a single task to run a certain number of times. There’s also no way to map into a TaskGroup as a workaround at this time, though that’s in development now.
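
For context, here’s roughly what that single-task fan-out looks like. This is a minimal sketch reusing the dag_defaults pattern from earlier; the tasks themselves are illustrative.

from airflow import DAG
from airflow.decorators import task

from client.settings import dag_defaults

with DAG(dag_id="mapping_demo_v001", **dag_defaults) as dag:

    @task
    def list_batches():
        return [["a", "b"], ["c"], ["d", "e", "f"]]

    @task
    def process_batch(batch):
        return len(batch)

    # .expand() fans a single task out over the upstream result; at the time of writing
    # there was no way to expand a whole chain of tasks or a TaskGroup the same way.
    process_batch.expand(batch=list_batches())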

In our use case, at least, we almost never needed to fan out just a single task, so for now the Dynamic Task Mapping feature is practically useless to us. Making it worse, the documentation didn’t cover this limitation, so time was wasted trying to figure out a way to make it work.

There will be bugs

In a year of development on this engagement we saw a number of bugs that arguably should never have made it into release, including some that still haven’t been resolved months later despite issues being filed.

Development environment bug
Using the Airflow Quick Start development environment, changes to the code will automatically restart all the containers, right? Nope, not for the triggerer container.

You make your change, you see a bunch of containers restart in the mass of logs that docker-compose writes to stdout, and then you spend an age trying to figure out what’s wrong with your code to cause the unexpected output. You don’t immediately suspect Airflow: this is your first trigger, and up to now there’s been no issue with code not updating, except when things get extra slow, which happens now and again but eventually comes right. So you wait and wait.

Then you get to the point where you just know it’s not your code anymore and you start digging deeper. You wonder why your trigger’s logs aren’t even showing up in the task’s log tab in the first place. That’s not right; they should be there, but they’re just not.

Only then do you go looking for the trigger’s logs in the docker-compose stdout and realize the logging output there matches the old code. The triggerer container hasn’t been getting restarted! Now you need to restart it manually every single time you change the code.

Production bugs
We hit a bug with the Graph view not updating as the DAG progressed, and another bug with deferred sensors. An issue had already been filed and a PR opened, but we would have been waiting some time for the next release. We needed this to work, and we couldn’t downgrade without losing other functionality we were relying on.

Our suggestion for this general problem, and one we were happy to have already implemented, is to maintain a fork of Airflow and apply the patch from the PR to your own deployment until a production Airflow release ships with the fix. Bonus: our client got a happy reminder of why Open Source is so amazing.

Log all the things

This obviously applies to most code, but it especially applies to complex workflows. It will save you a lot of tickets and debugging time up front if stakeholders can look at a task log themselves, without a developer in the loop, see that the data somewhere in the workflow wasn’t what they expected it to be, and trace that back to the input data provided when the DAG run was initiated. It’s really nice to have a workflow go through testing and require zero developer input for source data problems or issues in processes carried out on other systems.

We settled on logging the input and output of all Operators, TaskFlow tasks, hooks, and requests at the very least, as well as steps along the way in more complex tasks. Extensive, plainly worded logging in sensors and triggers that tracks what’s happening will save you time in the long run when they’re executing and you’re trying to figure out what the trigger is currently doing and, most importantly, why it’s still waiting or has proceeded.

The use of plain wording helps non-developers understand what they need to do to get the task to complete without contacting a developer.
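
As a minimal sketch of what that looks like in practice (the task and payload shape here are illustrative), standard Python logging inside a TaskFlow task ends up in that task’s log tab in the UI:

import logging

from airflow.decorators import task

log = logging.getLogger(__name__)


@task
def create_items(payload: dict) -> dict:
    items = payload.get("items", [])
    log.info("Received %s items from the previous step.", len(items))
    created = {"created": len(items)}
    log.info("Created %s items; handing the summary to the next task.", created["created"])
    return created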

Unit Tests. Write them.

They’re great not just for helping ensure your code keeps working on an ongoing basis, but also for speeding up the fix when you find out it doesn’t.

Unit tests saved us a ton of time recently when we needed to rework a complex sensor and trigger combination because of an unlikely edge case. We were able to first add a test reproducing the edge case, rework the trigger, and run the tests for the fix without having to make matching changes in the sensor, and without the cycle of constantly restarting the trigger container by hand, waiting for Airflow to settle, then clearing and running the task and waiting for the result.

That’s tedious. Running pytest isn’t.

With that done, we could just as easily tackle the sensor changes. Overall, having the tests made iteration much quicker, even with the non-trivial time investment put into them earlier on.
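
As a rough sketch of what such a test can look like, assuming a hypothetical ItemsReadyTrigger with a check_ready() poll method (none of these names come from Airflow itself, and the async test relies on the pytest-asyncio plugin):

import pytest

from client_api_name.triggers import ItemsReadyTrigger  # hypothetical module and class


@pytest.mark.asyncio  # requires pytest-asyncio
async def test_trigger_fires_when_items_are_ready(monkeypatch):
    trigger = ItemsReadyTrigger(batch_id="batch-123", poll_interval=0)

    async def fake_check(self):
        return True  # pretend the external system reports the batch as ready immediately

    monkeypatch.setattr(ItemsReadyTrigger, "check_ready", fake_check, raising=False)

    # A trigger's run() is an async generator that yields TriggerEvents when its condition is met.
    events = [event async for event in trigger.run()]
    assert len(events) == 1
    assert events[0].payload == {"status": "ready"}

A test like that runs in seconds under pytest, with no containers involved.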
