Seeking to improve your big data pipeline? This blog walks you through enhancements made to a client's system using Airflow Datasets, DAG dependencies, and Azure Durable Functions, along with the edge cases we handled.
Learn how we added functionality and flexibility by streamlining data integration, minimizing cost increases, and creating a scalable pipeline development process.
For tech leaders, this project demonstrates the value of innovative data management solutions. Strategic thinking, combined with leading-edge technology, can help you solve complex data challenges.
Read more about our big data services and projects, and watch this presentation, “Too Big for DAG Factories,” for a deeper dive into how you can transform, optimize and scale your data workflow.
Interested in the technical details for enhancing a data pipeline, which integrates Airflow, Azure DataFactory, and Databricks? Keep reading.
This article explores enhancements we implemented for a longstanding client's system. Initially, we developed a medallion architecture for them. Our latest modifications introduced prerequisite processing steps to their data pipelines, significantly boosting the system's capabilities and flexibility. We also reduced their dependency on proprietary cloud architectures, which can often introduce challenges. Through these updates, we optimized the client’s infrastructure for better performance and greater independence.
Specifically, the client outlined several key requirements for enhancing their data pipeline, which integrates Airflow, Azure DataFactory, and Databricks.
These tasks were not identified as a single challenge but rather as a series of distinct issues, prioritized in the order the client listed them.
The project involved a considerable number of sourcemarts, each requiring careful attention. This "Too Big for DAG Factories?" blog offers valuable insights into the complexities of managing extensive sourcemarts. The blog is particularly useful for understanding the unique configurations of each sourcemart and their tables, which are defined using JSON. This background informed many of our strategic decisions in addressing the client's needs.
The first task we worked on was accessing Azure Blob Storage sources. A prototype was built using Azure Functions and the azure-storage-blob library.
From the beginning, we believed the most effective strategy for ensuring long-term flexibility and functionality was to manage the data using Python file-like objects. This approach was chosen with the anticipation of adding support for JSON files later, and with the understanding that a modular design would offer greater possibilities for future enhancements.
Initially, the file-like object choice was mainly a preference, to allow interoperability between the various sources and destinations we would use. It also turned out to be a big benefit once we looked into the APIs: rather than reading an entire blob into memory, we could stream directly from the source straight to the raw storage using readinto() on file-like objects.
This method also makes it possible to choose less expensive server instances with lower memory and minimal storage for Azure Functions, which further cuts down on resource usage and costs.
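As a rough sketch of the streaming approach (the helper name and connection handling here are ours, not the client's code), the azure-storage-blob downloader can write straight into any writable file-like destination:

from azure.storage.blob import BlobClient

def copy_blob_to_raw(conn_str: str, container: str, blob_name: str, dest_file_obj) -> None:
    # Hypothetical helper: stream a source blob into a writable file-like
    # object without holding the whole payload in memory.
    blob = BlobClient.from_connection_string(
        conn_str=conn_str, container_name=container, blob_name=blob_name
    )
    downloader = blob.download_blob()   # returns a StorageStreamDownloader
    downloader.readinto(dest_file_obj)  # stream chunks straight into the destination

The destination can be a local file, an in-memory buffer, or a shim over the client's raw storage, which is exactly the interoperability the file-like approach buys us.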
The next functionality we added to the prototype was the handling of JSON files, specifically JSONL (JSON Lines), a format with one JSON object per line.
The advantage of using file-like objects continued to be evident throughout the project. In this instance, we were able to pass the file-like object to a modular Transformer class, which would read the JSONL stream line by line and write to another file-like object, representing the output on the client's raw storage, using csv.writer(). At this stage, the pre-existing DAG pipelines could then process the data.
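A simplified sketch of what such a Transformer might look like (the class name and field handling are illustrative, not the client's actual implementation):

import csv
import json

class JsonlToCsvTransformer:
    # Hypothetical transformer: reads a JSONL stream (one JSON object per
    # line) and writes CSV rows to another file-like object, one record at
    # a time, so neither file is ever fully loaded.
    def __init__(self, fieldnames):
        self.fieldnames = fieldnames

    def transform(self, src, dest) -> None:
        writer = csv.writer(dest)
        writer.writerow(self.fieldnames)  # header row
        for line in src:
            if not line.strip():
                continue
            record = json.loads(line)
            writer.writerow([record.get(name, "") for name in self.fieldnames])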
After refactoring the prototype, we implemented Durable Functions, which allow data to move seamlessly from one "Processor" to another. This approach supports workflows like unzipping source archives and converting JSONL files into CSV format.
A few implementation details and edge cases came up along the way:

- readall() and readinto() support for the blob client
- io.BufferedRandom for the output side, which implements write(), flush() and close()
- write() required a different encoding

With this structure, we have a modular means of handling multiple types of source data (JSON, Zip) from potentially any source type (API, network share, etc.), based entirely on configuration passed into the entry point.
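The writable shim is worth a closer look. Below is a minimal sketch of the kind of file-like wrapper we mean, assuming an Azure append blob as the destination (the class name and buffering strategy are illustrative):

from azure.storage.blob import BlobClient

class AppendBlobWriter:
    # Hypothetical file-like shim over an append blob, exposing the
    # write()/flush()/close() surface that csv.writer() and the
    # Transformers expect.
    def __init__(self, blob_client: BlobClient, encoding: str = "utf-8"):
        self._blob = blob_client
        self._encoding = encoding
        self._buffer = bytearray()
        self._blob.create_append_blob()

    def write(self, data) -> int:
        # csv.writer() hands us str; the blob API wants bytes, hence the
        # explicit encoding step mentioned above.
        if isinstance(data, str):
            data = data.encode(self._encoding)
        self._buffer.extend(data)
        return len(data)

    def flush(self) -> None:
        if self._buffer:
            self._blob.append_block(bytes(self._buffer))
            self._buffer.clear()

    def close(self) -> None:
        self.flush()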
Separating azure_shim gives the client flexibility. In the future, if they decide to switch from their current cloud provider (Azure) to another, they can do so without redeveloping the entire process. If they decide to move, the adjustments needed would be limited to an equivalent entry point file for that provider and a matching "shim" ported to that cloud provider's APIs.
Furthermore, development can continue on both the current (Azure) and the new cloud platforms simultaneously, making the transition smoother.
Before we could integrate the new features into the existing pipeline, the client shifted their priorities. They placed the highest importance on updating the system to handle sourcemart-level pipelines. This update was necessary to support new sourcemarts. Additionally, adding decompress support became a top priority. This feature was not yet included in the prototype; initially, only JSON file support was available.
The key requirement for these tasks was to set up a system that could run one or more configurable processes before table pipelines were run.
Our solutions to these issues were facilitated by Airflow Datasets.
Datasets can be viewed as functionally similar to a pub-sub system. In other words, DAG tasks can be configured to notify Datasets when they complete successfully, and DAGs can be configured to run when the Datasets they are scheduled on are updated.
# End-of-pipeline task that updates ("notifies") its outlet Datasets
ds_update = DummyOperator(
    task_id="trigger_outlet_datasets",
    outlets=[Dataset(name) for name in end_task_outlets_names],
)
# Downstream DAG scheduled off those Datasets instead of a time-based schedule
dag = DAG(
    schedule=[Dataset(name) for name in schedule_names],
    ...
)
We now have an Airflow-native, low-resource method of handling the dependencies between DAGs. This meant we didn't have to modify the DAGs themselves to trigger another DAG or poll for the completion of a prerequisite DAG, which would have consumed more resources and been more brittle.
To ease into the concepts, I'll introduce the new architecture one iteration at a time.
First we broke the medallion architecture DAG into two stages:
Note that from this point on, the contents of the Refiner have been simplified to "RawToBronze -> EDW" but include all the subsequent stages shown in the “Current Architecture” diagram.
As you can see, we now treat all sourcemart-level and table-level pipelines as table-level pipelines because the functionality required for each is essentially the same. The added benefit is that this gives us consistency between configurations for both.
In the DAG factory code, we continue to have just one factory creating both the Collector and Refiner. This is because shared items like DAG scheduling, end-of-pipeline dataset updates, and a number of stages not shown here for brevity are needed across both the Collector and the Refiner. Differences in task population between the Collector and the Refiner are determined based on the type.
{
"upstreams": ["T1"],
"downstreams": ["T2", "T3"]
}
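To make the mapping concrete, here is a condensed, hypothetical sketch of how a factory can turn the upstreams/downstreams configuration above into Dataset wiring (the ds_<table> naming convention and dag_id format are ours for illustration):

import pendulum
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.dummy import DummyOperator

def build_table_dag(table: str, config: dict) -> DAG:
    # Schedule off the upstream tables' Datasets; notify this table's own
    # Dataset at the end so downstream DAGs can schedule off it in turn.
    upstream_datasets = [Dataset(f"ds_{name}") for name in config.get("upstreams", [])]

    dag = DAG(
        dag_id=f"{table}_pipeline",
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        schedule=upstream_datasets or None,  # dataset-triggered when upstreams exist
    )
    with dag:
        # ... Collector or Refiner tasks populated here based on the type ...
        DummyOperator(
            task_id="trigger_outlet_datasets",
            outlets=[Dataset(f"ds_{table}")],
        )
    return dag

Note that the downstreams list needs no explicit wiring here: each downstream DAG schedules itself off this table's Dataset.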
One benefit of splitting the DAG into the Collector and the Refiner should now be visible: it allows us to start the table-level DAGs without waiting for the sourcemart table's Refiner stage to complete. In most cases, the table-level pipelines only need the raw data.
The diagram shows T2 SrcToRaw and, below it, T3 SrcToRaw. Thanks to Airflow Datasets, the T2 Collector can pick up the "scheduled" notification on both dsT2 and the sourcemart pipeline's dsr1 and begin execution. This is where dse2 and dse3 come into play; with this structure, DAGs can therefore be chained.

Now we have the ability to run a sourcemart-level pipeline in advance of the table-level pipelines, but what does this have to do with the Durable Functions mentioned at the start of this article?
In the diagram above you can now see we have processors that can run after the Collector has downloaded the data, or before the Refiner has started operating on the data. The reason for the Refiner Processor capability will become clear soon.
For the T1 Collector Processor, we specify this in the table-level configuration. This allows handling of situations where the sourcemart level pipeline needs to, for example, download a zip file which is then extracted to raw storage, and then each table potentially needs to transform the data to match its own requirements.
At this point we then needed to consider what happens if there are multiple sourcemart-level pipelines which need to merge and/or transform data for the table-level pipelines.
One use case we were given was a sourcemart that provided three zip files. Each file contained a number of files that needed to be processed in order to output a number of CSVs. The CSVs would then be used as the table-level data.
It made more sense to have an intermediary processing step to handle this case, rather than having each sourcemart-level pipeline or table-level pipeline check to see if their siblings had already completed before carrying out the transformations.
{
"upstreams": ["T1", "T2"],
"downstreams": ["T3", "T4"]
"processors": [
{
"processor_name": "decompress",
"processor_type": "zip",
"processor_config": {
"src": {...},
"dest": {...}
}
},
{
"processor_name": "transform",
"processor_type": "JSONL"
"processor_config": {
"src": {...},
"dest": {...}
}
}
]
}
(Some configuration details omitted for brevity.)
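On the Azure side, the orchestration over this list can stay very small. A minimal sketch, assuming a Durable Functions orchestrator and an activity named RunProcessor (both names and the payload shape are assumptions, not the client's actual function code):

import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    # Walk the processors list from the configuration and run each one as
    # its own activity, in order, passing its src/dest config along.
    config = context.get_input()
    for processor in config.get("processors", []):
        yield context.call_activity("RunProcessor", processor)
    return "done"

main = df.Orchestrator.create(orchestrator_function)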
Now is a good time to explain why there is both a Collector and a Refiner Processor, as well as the main reason we decided to split the pipeline into a Collector and a Refiner in the first place.
There is a use case where the developer onboarding the sourcemart wants to unzip files in the sourcemart pipeline without running any Refiner directly off it, and conversely doesn't need any Collector to run at all for T3 or T4. A possible example is a case where all the data is contained in the zip file but still requires separate transformation processing for the T3 and T4 tables.
In Airflow, we hand off the request to start the Processors using a method that returns a TaskGroup. The TaskGroup passes configuration data to a SimpleHttpOperator task, which starts Azure's Durable Function; the response contains a URL that is then polled by an HttpSensor task to wait for completion.
Using the TaskGroup-generating function we could easily add support for all of these use cases in the DAG.
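A condensed sketch of that helper (the connection ID, endpoint, and orchestrator route are assumptions; statusQueryGetUri and runtimeStatus are the standard fields returned by the Durable Functions HTTP starter and status endpoint):

import json

from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.utils.task_group import TaskGroup

def processor_task_group(group_id: str, processor_config: dict) -> TaskGroup:
    with TaskGroup(group_id=group_id) as group:
        start = SimpleHttpOperator(
            task_id="start_processors",
            http_conn_id="azure_functions",                # assumed Airflow connection
            endpoint="api/orchestrators/processor_chain",  # assumed HTTP-start route
            method="POST",
            data=json.dumps(processor_config),
            headers={"Content-Type": "application/json"},
            # Push the Durable Functions status URL to XCom for the sensor below.
            response_filter=lambda response: response.json()["statusQueryGetUri"],
        )
        wait = HttpSensor(
            task_id="wait_for_processors",
            http_conn_id="azure_functions",
            # In practice the absolute statusQueryGetUri may need its host stripped
            # to fit the connection's base URL.
            endpoint="{{ ti.xcom_pull(task_ids='" + group_id + ".start_processors') }}",
            response_check=lambda response: response.json()["runtimeStatus"] == "Completed",
            poke_interval=60,
        )
        start >> wait
    return group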
The complexity comes in linking the Datasets and DAGs together, because we want to link to and from the middle of the sourcemart- and table-level pipelines. For example:
{
"upstreams": ["T1", "T2"],
"downstreams": ["T3", "T4"]
"skip_upstreams_refiners": ["T1", "T2"],
"skip_downstreams_collectors": ["T3", "T4"]
"processors": [
{
"processor_name": "decompress",
"processor_type": "zip",
"processor_config": {
"src": {...},
"dest": {...}
}
},
{
"processor_name": "transform",
"processor_type": "JSONL"
"processor_config": {
"src": {...},
"dest": {...}
}
}
]
}
(Note: the configurations for tables' Collector and Refiner processors are similar.)
The ability to skip certain Collectors and Refiners gives onboarding developers the flexibility to have some Collectors or some Refiners, depending on their sourcemart's needs.
As you might imagine, this dependency parsing gets complex quickly. You don't want to calculate this information in the DAG factory, because it would put extra demand on the scheduler for something that changes only rarely (e.g., on a new deployment).
Instead, during deployment, we simplify the work for developers setting up sourcemarts by converting their basic dependency requirements into complex JSON objects. These objects are then used effectively by the sourcemart DAG factories, helping to reduce the load on the Airflow scheduler. This approach is part of our ongoing efforts to minimize any possible issues with resource usage.
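As a rough illustration of that deploy-time expansion (the field names follow the configuration examples above; the dataset naming scheme and skip semantics shown here are ours, not the client's exact logic):

def expand_dependencies(config: dict) -> dict:
    # Hypothetical deploy-time step: turn the onboarding developer's simple
    # upstream/downstream lists into explicit dataset wiring so the DAG
    # factories, and therefore the Airflow scheduler, never have to derive it.
    skip_refiners = set(config.get("skip_upstreams_refiners", []))
    skip_collectors = set(config.get("skip_downstreams_collectors", []))

    expanded = dict(config)
    # Schedule off an upstream Collector's dataset when its Refiner is skipped,
    # otherwise off its Refiner's dataset.
    expanded["schedule_datasets"] = [
        f"ds_{t}_collector" if t in skip_refiners else f"ds_{t}_refiner"
        for t in config.get("upstreams", [])
    ]
    # Notify a downstream Refiner directly when its Collector is skipped,
    # otherwise notify its Collector.
    expanded["outlet_datasets"] = [
        f"ds_{t}_refiner" if t in skip_collectors else f"ds_{t}_collector"
        for t in config.get("downstreams", [])
    ]
    return expanded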
Given the numerous ways DAGs and Datasets can now interact, it was crucial to write unit tests to check the creation of dependencies among these various entry and exit points.
Time spent on developing unit tests proved very beneficial. We were able to refactor and optimize the code when necessary. This process was faster because we didn't have to manually review the impact of changes on each DAG and Dataset in Airflow, which saved time and money. This was especially helpful given the lag with Airflow's scheduler and the need to enhance configurations during the deployment process.
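To give a flavor of those tests, here is a small pytest-style sketch built on Airflow's DagBag (the DAG id, task id, and dataset URIs are placeholders):

from airflow.models import DagBag

def test_collector_notifies_downstream_datasets():
    # Load the real DAG files and assert the end-of-pipeline task exposes
    # the Dataset outlets the dependency configuration promised.
    dagbag = DagBag(dag_folder="dags/", include_examples=False)
    assert not dagbag.import_errors

    dag = dagbag.get_dag("T1_collector")
    task = dag.get_task("trigger_outlet_datasets")
    outlet_uris = {dataset.uri for dataset in task.outlets}
    assert {"ds_T2", "ds_T3"} <= outlet_uris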
*A sourcemart is where data comes from, often grouped by who provides it. For example, it could be data from a product supplier.
**A table refers to a collection of data organized around a specific subject, such as transaction logs, shipment details, or records of service delivery, provided by a particular vendor.