Apache recently released Airflow 2.3.0. Since the previous feature release, Apache Airflow 2.2.0, this new release contains over 700 commits, including 50 new features, 99 improvements, 85 bug fixes, and several documentation changes.
Airflow 2.3 is a landmark release. Its new dynamic task mapping API, in particular, is a foundational feature that will improve iteratively in each new release of Airflow.
Here is a glimpse of the major updates:
- Dynamic task mapping
- The new Grid View, replacing the Tree View
- JSON serialization for connections
- The airflow db downgrade and airflow db upgrade commands
- Smaller features such as a branch task decorator, ShortCircuitOperator configurability, the EmptyOperator, a configurable execution_timeout, and the ALL_SKIPPED trigger rule
Let’s discuss these updates in detail.
Dynamic task mapping lets Airflow trigger tasks based on input conditions that are not known in advance. This new feature adds the possibility of creating tasks dynamically at runtime, similar to defining tasks in a loop.
Rather than having the DAG file collect the data and act on it by itself, the scheduler can do this based on the results of a previous task: just before execution, it makes copies of the mapped task, one for each input. With dynamic mapping, the list to iterate over can itself be generated by an upstream task.
Representation
In the Grid View, a mapped task is displayed with square brackets ([]) after its name, along with the number of mapped task instances created at runtime.
Benefits
The number of parallel task instances no longer has to be known when the DAG is written; it adapts to the data at runtime, and each mapped instance can be inspected and retried individually.
Limitations
Only a list or a dict can be pushed as the mapping input; an UnmappableXComTypePushed exception is thrown at runtime for any other data type.
Implementation
Dynamic Task Mapping is simple to implement with two new functions: expand(), which defines the parameters to map the task over, and partial(), which defines the parameters that stay constant across all mapped instances.
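A minimal sketch of both functions using the TaskFlow API (the task names and values here are illustrative):

```python
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2022, 5, 1), catchup=False)
def dynamic_mapping_example():
    @task
    def make_list():
        # The length of this list is only known at runtime; the scheduler
        # creates one mapped instance of add() per element.
        return [1, 2, 3]

    @task
    def add(x, y):
        return x + y

    # partial() pins the arguments shared by every mapped instance;
    # expand() maps the task over the output of make_list().
    add.partial(y=10).expand(x=make_list())


example_dag = dynamic_mapping_example()
```

At runtime this expands into three add task instances, one per element of the returned list.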
The Grid View replaces the Tree View, with a focus on task history. The Grid View makes the DAG display cleaner and provides more useful information than the previous Tree View. For example, it can quickly filter on a task's upstream tasks, so we can easily monitor the DAG, and it shows directly how long a DAG run takes to finish all task instances, which gives a sense of the DAG's average performance.
Benefits
A cleaner DAG display, quick filtering of a task's upstream tasks, and at-a-glance run durations and task history.
You can now create connections using the JSON serialization format.
Previously, connections could generally only be stored in the Airflow URI format. With this change, they can be serialized as JSON.
The Airflow URI format can be very tricky to work with, and although the Connection.get_uri convenience method has been available for some time, using JSON is just simpler. JSON serialization can also be used when setting connections in environment variables.
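For instance, a connection can now be supplied as JSON through an environment variable (a sketch; the connection name and credentials are illustrative):

```python
import json
import os

# Equivalent to an AIRFLOW_CONN_* URI-style connection, but in the
# new JSON serialization format.
os.environ["AIRFLOW_CONN_MY_POSTGRES"] = json.dumps(
    {
        "conn_type": "postgres",
        "login": "user",
        "password": "secret",
        "host": "localhost",
        "port": 5432,
        "schema": "mydb",
        "extra": {"sslmode": "require"},
    }
)
```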
Airflow 2.3.0 adds a new command for downgrading or upgrading the database to your chosen version.
The downgrade/upgrade SQL scripts can also be generated for your database, so you can run them against your database manually or just review the SQL queries that the downgrade/upgrade command would run.
airflow db downgrade downgrades the database to your chosen version, and airflow db upgrade upgrades your database with the schema changes of the Airflow version you're upgrading to. Alternatively, you may downgrade/upgrade to a specific Alembic revision ID.
If you want to preview the commands but not execute them, use the --show-sql-only option.
The --from-revision and --from-version options may only be used in conjunction with the --show-sql-only option, because when actually running migrations we should always downgrade from the current revision.
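For example, to preview the SQL for a downgrade without running it (the target version here is illustrative):

```
airflow db downgrade --to-version "2.2.3" --show-sql-only
```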
_BranchPythonDecoratedOperator
A decorator was added for BranchPythonOperator, which wraps a Python callable and captures args/kwargs when called for execution:

```python
import random

from airflow.decorators import task

options = ["task_a", "task_b"]  # illustrative task_ids of the candidate branches


@task.branch(task_id="branching")
def random_choice():
    return random.choice(options)
```
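The decorated callable returns the task_id (or list of task_ids) of the downstream task(s) to follow; all other directly downstream tasks are skipped.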
ShortCircuitOperator configurability for respecting downstream trigger rules
A new ignore_downstream_trigger_rules parameter allows users to have the ShortCircuitOperator perform a "hard short" (i.e., blindly skip all downstream tasks; the current behavior) or a "soft short" (i.e., only the immediate downstream task(s) are skipped, and the Scheduler is left to handle the trigger rules appropriately), as sketched below.
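A minimal sketch of the soft-short behavior (the task name and callable are illustrative):

```python
from airflow.operators.python import ShortCircuitOperator

# ignore_downstream_trigger_rules=True (the default) skips all downstream
# tasks ("hard short"); False skips only the immediate downstream tasks and
# lets the Scheduler evaluate the remaining trigger rules ("soft short").
condition_check = ShortCircuitOperator(
    task_id="condition_check",
    python_callable=lambda: False,  # a falsy return value short-circuits
    ignore_downstream_trigger_rules=False,
)
```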
DummyOperator replaced with EmptyOperator
DummyOperator is deprecated in favor of the new EmptyOperator, an operator that does nothing.
execution_timeout configurable
The execution_timeout attribute can now be globally configured via airflow.cfg. The default value is still None. Users can set an integer value, which is interpreted as seconds and passed into a timedelta object, to define the timeout via configuration.
ALL_SKIPPED trigger rule
A new Trigger Rule for when all upstream tasks are in a skipped state. Example:

```python
start = DummyOperator(task_id='start')
success = DummyOperator(task_id='success', trigger_rule='all_skipped')
skip = DummyOperator(task_id='skip', trigger_rule='all_skipped')
```
The improvement in Airflow with this new release is astounding. It eliminates a growing amount of the tedious, repetitive work that developers would otherwise have to do manually.
Airflow's new Grid View and dynamic task mapping API not only improve each user's experience but also formalize and incorporate use-case-specific logic that would otherwise need to be built by skilled developers. The new release keeps complete backward compatibility, so code created in earlier versions should continue to run. You could say that, with each new release, Airflow breaks the vicious cycle of writing and maintaining custom code for each use case.