Airflow is an excellent orchestrator for managing serious data pipelines. But as that infrastructure scales up, how you go about managing all those DAGs becomes very important to maintain efficiency.
The standard approach is to create template files, one for each DAG that will or may be used; however, this can lead to a pretty unwieldy file directory. Another method would be to create a “DAG factory,” which can churn out thousands of DAGs dynamically from a single configuration file.
First, a little terminology.
A DAG, or directed acyclic graph, defines a series of tasks that are interdependent but do not loop around. In other words, it’s a workflow with a defined beginning and end, but potential for many branching paths in between.
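To make the definition concrete, here is a minimal sketch that uses Python's standard-library `graphlib` rather than Airflow itself. The task names are made up for illustration. It models a workflow that branches and rejoins, and shows that adding a loop makes the graph invalid.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks it depends on.
workflow = {
    "extract": set(),
    "clean": {"extract"},
    "aggregate": {"extract"},
    "load": {"clean", "aggregate"},
}

# A valid DAG can always be put into an order that respects dependencies.
order = list(TopologicalSorter(workflow).static_order())
print(order)

# Making "extract" depend on "load" introduces a loop, so it is no longer acyclic.
looped = dict(workflow, extract={"load"})
try:
    list(TopologicalSorter(looped).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

The two middle tasks may run in either order, which is the "branching paths" part of the definition.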
Airflow loads DAGs from Python source files, which it looks for inside its configured DAG_FOLDER. It will take each file, execute it, and then load any DAG objects from that file. You can learn more about how this works in Airflow’s documentation.
The simplest way of creating a DAG is to write it as a static Python file.
However, sometimes manually writing DAGs isn’t practical. Maybe you have hundreds or thousands of DAGs that do similar things with just a parameter changing between them. One option is template files.
Basically, rather than writing out the code for each individual DAG, you write a template that leaves the varying parameters as placeholders. Then, when you push the code to production, a build step renders that template once per set of parameters, producing numerous DAG files that are identical apart from those minor changes.
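As an illustration of the template approach, here is a minimal sketch using the standard library's `string.Template`. The template text, the parameter names, and the `render_dag_files` helper are all hypothetical, and the DAG body itself is left out; a real setup might use a different templating tool.

```python
from string import Template

# Hypothetical template for a DAG file; the DAG body is elided on purpose.
DAG_TEMPLATE = Template(
    '"""Generated file for $sourcemart.$table. Do not edit by hand."""\n'
    'DAG_ID = "${sourcemart}__${table}"\n'
    'SCHEDULE = "$schedule"\n'
)

# Hypothetical parameters, one entry per DAG file to generate.
params = [
    {"sourcemart": "sales", "table": "orders", "schedule": "@daily"},
    {"sourcemart": "sales", "table": "refunds", "schedule": "@hourly"},
]


def render_dag_files(template, params):
    """Return a mapping of file name to rendered source, one per DAG."""
    return {
        f"{p['sourcemart']}__{p['table']}.py": template.substitute(p)
        for p in params
    }


rendered = render_dag_files(DAG_TEMPLATE, params)
for name in rendered:
    print(name)
```

Each rendered file is a separate artifact on disk, which is where the unwieldy directory comes from as the parameter list grows.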
Yet another option is to create a DAG factory, where a single YAML configuration file is used to generate a vast array of DAGs without the need for templates; however, dynamically generating DAGs can cause performance issues when used at scale.
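The factory idea can be sketched roughly as follows, assuming the YAML file has already been parsed into a nested dict (for example with PyYAML's `yaml.safe_load`). The config contents are invented, and `build_dag` is a hypothetical stand-in that returns a plain dict so the sketch runs without Airflow; a real factory would construct an Airflow DAG object there.

```python
# Hypothetical config, shaped like a parsed YAML file:
# source mart -> table -> settings.
config = {
    "sales": {"orders": {"schedule": "@daily"}, "refunds": {"schedule": "@hourly"}},
    "hr": {"employees": {"schedule": "@weekly"}},
}


def build_dag(dag_id, settings):
    """Stand-in for real DAG construction; returns a plain dict here."""
    return {"dag_id": dag_id, **settings}


def generate_dags(config):
    """Yield one DAG per table in the configuration."""
    for sourcemart, tables in config.items():
        for table, settings in tables.items():
            yield build_dag(f"{sourcemart}__{table}", settings)


generated = {dag["dag_id"]: dag for dag in generate_dags(config)}

# Airflow discovers DAG objects bound to module-level names, so a factory
# file typically registers each generated DAG in globals().
globals().update(generated)
print(sorted(generated))
```

Because this loop runs every time the file is parsed, its cost grows with the size of the configuration, which is the scaling concern mentioned above.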
Traditionally, when you use templates, Airflow is essentially supplying the DAG factory, so implementing a DAG factory in your infrastructure can introduce unknowns into the equation.
Astronomer has some really useful insight into how this all works. You can read more about single-file methods of generating DAGs in this article.
If you’re outgrowing Airflow’s tolerance for how long it takes a single DAG factory to churn out all of its DAGs, one solution is to clone that DAG factory into smaller versions of itself that each manage a portion of the overall task.
This was our situation: thousands of DAGs whose shape and other parameters were contained in a single configuration bundle, organized as a hierarchy of source mart and table definitions. Individual DAG files were too numerous to manage well, and because Airflow re-imports and evaluates the DAG files on every task execution, the overhead of building every DAG from this configuration led to inconsistencies in Airflow’s behavior and execution.
The solution was to keep the factory, but subdivide its scope of work. While the original factory would iterate over all of the source marts, tables, and sub-DAGs, we could restrict it to sub-trees of the overall configuration, which would be faster.
Since every copy of the factory is just a clone of the same code, each copy can derive its scope from the file name assigned to it. The mechanism used here is certainly unorthodox, but it was effective.
The magic comes down to deriving the scope from the file’s name:
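```python
from pathlib import Path


def get_sourcemart_and_table(fname):
    # `config` is the module-level configuration dict loaded by the factory.
    stem = Path(fname).stem
    if "__" in stem:
        # "mart__table.py": split only on the first "__".
        sourcemart_name, table_name = stem.split("__", 1)
    elif stem in config:
        # "mart.py": every table in that source mart.
        sourcemart_name, table_name = stem, None
    else:
        # Unrecognized name, such as the factory file itself: no filtering.
        sourcemart_name, table_name = None, None
    return sourcemart_name, table_name
```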
And then use those parameters to filter down the configuration data for source mart and/or table:
```python
def get_sourcemart_configs(sourcemart_filter) -> dict:
    if sourcemart_filter and sourcemart_filter in config:
        return {
            sourcemart_filter: config[sourcemart_filter],
        }
    return config


def get_table_configs(sourcemart_name, table_filter=None) -> dict:
    if table_filter:
        return {
            table_filter: config[sourcemart_name][table_filter],
        }
    return config[sourcemart_name]
```
With those two functions, we can then change the loops that were previously iterating over every source mart and every table from:
```python
def main():
    sourcemarts = config
    for sourcemart in sourcemarts:
        tables = sourcemarts[sourcemart]
        for table in tables:
            print(f"{sourcemart=}, {table=}")
            ...  # create dag for table


main()
```
to something more like:
```python
def main(filename):
    sourcemart_name, table_name = get_sourcemart_and_table(filename)
    sourcemarts = get_sourcemart_configs(sourcemart_name)
    for sourcemart in sourcemarts:
        tables = get_table_configs(sourcemart, table_name)
        for table in tables:
            print(f"{sourcemart=}, {table=}")
            ...  # create dag for table


main(__file__)
```
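To see how the file name narrows the work, the following self-contained sketch condenses the name parsing and filtering into a single hypothetical helper, `select_scope`, and runs it against a made-up `config`. The source mart and table names are illustrative only, and unlike the functions above, this helper skips a table that is not in the configuration instead of raising an error.

```python
from pathlib import Path

# Made-up configuration: source mart -> table -> settings.
config = {
    "mysourcemart": {"table_a": {}, "table_b": {}},
    "othersourcemart": {"special_table": {}, "plain_table": {}},
}


def select_scope(filename, config):
    """Return the (sourcemart, table) pairs a factory file should build."""
    stem = Path(filename).stem
    if "__" in stem:
        sourcemart_filter, table_filter = stem.split("__", 1)
    elif stem in config:
        sourcemart_filter, table_filter = stem, None
    else:
        sourcemart_filter, table_filter = None, None

    # An unknown or missing source mart means no filter: consider everything.
    if sourcemart_filter in config:
        sourcemarts = {sourcemart_filter: config[sourcemart_filter]}
    else:
        sourcemarts = config

    return [
        (sourcemart, table)
        for sourcemart, tables in sourcemarts.items()
        for table in tables
        if table_filter is None or table == table_filter
    ]


everything = select_scope("dags/dag_factory.py", config)
one_mart = select_scope("dags/mysourcemart.py", config)
one_table = select_scope("dags/othersourcemart__special_table.py", config)
print(len(everything), one_mart, one_table)
```

The same code builds four DAGs, two DAGs, or one DAG depending only on the name it is loaded under.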
Now we can fill the Airflow DAGs folder with softlinks named after the source marts or source marts and tables.
```bash
ln -s dag_factory.py dags/mysourcemart.py  # will process every table in `mysourcemart`
ln -s dag_factory.py dags/othersourcemart__special_table.py  # enable only this table
```
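The same links can be created from Python, which may be convenient if the list of source marts is itself generated. This is a minimal sketch that assumes `dag_factory.py` lives inside the `dags` folder; it uses a temporary directory so that it can run anywhere on a system that permits symlinks. Note that a relative link target is resolved from the directory containing the link, not from the current working directory.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    dags = Path(tmp) / "dags"
    dags.mkdir()
    factory = dags / "dag_factory.py"
    factory.write_text("# factory code would live here\n")

    link_stems = []
    for name in ("mysourcemart.py", "othersourcemart__special_table.py"):
        link = dags / name
        # Relative target: resolved from the link's own directory (dags/).
        link.symlink_to("dag_factory.py")
        # The link keeps its own name, which is what the factory inspects...
        link_stems.append(link.stem)
        # ...while its contents come from the single shared factory file.
        assert link.resolve() == factory.resolve()

print(link_stems)
```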
Every DAG is built from a single source with no custom or one-off wrappers, and an update to the factory code is seen immediately by all of the DAGs linked to that file.
So now we won’t outgrow the factory because we can subdivide arbitrarily with simple link commands.