<< ALL BLOG POSTS

Improving Big Data: A Guide to Enhanced Pipelines

|
January 25, 2024
Table of Contents

Seeking to improve your big data pipeline? This blog walks you through enhancements made to a client's system using Airflow Datasets, DAG Dependencies, Azure Durable Functions, and edge cases.

Learn how we added functionality and flexibility by streamlining data integration, minimizing cost increases, and creating a scalable pipeline development process.

These enhancements:
  1. Streamlined data integration: We streamlined the integration of data from various sources using Python file-like objects, enabling direct streaming to storage. This approach not only simplifies the data handling process but also significantly reduces costs. Additionally, the introduction of Azure Durable Functions has allowed for seamless management of data workflows, offering the flexibility to easily adapt to future requirements.
  2. Minimized cost increases: Our refined approach leverages more cost-effective server options and requires minimal storage, thereby optimizing the investment in cloud resources. This strategic choice delivers both immediate and long-term financial benefits.
  3. Created a scalable pipeline development process: We designed the pipeline development process to facilitate easy migration to alternative cloud providers, protecting the client's investment into the future. Furthermore, our streamlined configuration processes for sourcemarts* and tables** significantly reduced the learning curve for new developers, ensuring rapid adaptation and effective utilization of new features.

For tech leaders, this project demonstrates the value of innovative data management solutions. Strategic thinking, combined with leading-edge technology, can help you solve complex data challenges.

Read more about our big data services and projects, and watch this presentation, “Too Big for DAG Factories,” for a deeper dive into how you can transform, optimize and scale your data workflow.

Interested in the technical details for enhancing a data pipeline, which integrates Airflow, Azure DataFactory, and Databricks? Keep reading.

Getting Started

This article explores enhancements we implemented for a longstanding client's system. Initially, we developed a medallion architecture for them. Our latest modifications introduced prerequisite processing steps to their data pipelines, significantly boosting the system's capabilities and flexibility. We also reduced their dependency on proprietary cloud architectures, which can often introduce challenges. Through these updates, we optimized the client’s infrastructure for better performance and greater independence.

Specifically, the client outlined several key requirements for enhancing their data pipeline, which integrates Airflow, Azure DataFactory, and Databricks.

These requirements focused on:
  1. Handling of Azure Blob Storage sources
  2. Handling of JSON file sources
  3. Handling of API sources
  4. Handling compressed files from sources, e.g. zip
  5. Implementing sourcemart-level processing, meaning certain tasks must be completed before initiating table-level Directed Acyclic Graphs (DAGs)

These tasks were not identified as a singular challenge but rather as a series of distinct issues, prioritized in the sequence listed above.

The project involved a considerable number of sourcemarts, each requiring careful attention. This "Too Big for DAG Factories?" blog offers valuable insights into the complexities of managing extensive sourcemarts. The blog is particularly useful for understanding the unique configurations of each sourcemart and their tables, which are defined using JSON. This background informed many of our strategic decisions in addressing the client's needs.

Azure Blob Storage Support

The first task we worked on was accessing Azure Blob Storage sources. A prototype was built using Azure Functions and the azure-storage-blob library.

From the beginning, we believed the most effective strategy for ensuring long-term flexibility and functionality was to manage the data using Python file-like objects. This approach was chosen with the anticipation of adding support for JSON files later, and with the understanding that a modular design would offer greater possibilities for future enhancements.

Initially, this file-like object choice was a preference mainly to allow interoperability between the various sources and destinations we would use. It also turned out to be a big benefit when looking into the APIs.

The traditional process requires you to:
  • Load the data from the source into the Azure Function container memory
  • Save to the Azure storage used by the container running the Azure Function
  • Copy from that container to the client's raw storage

We could instead stream directly from the source straight to the raw storage with the use of file-like objects using readinto().

This method also makes it possible to choose less expensive server instances with lower memory and minimal storage for Azure Functions, which further cuts down on resource usage and costs.

JSON File Sources

The next functionality we added to the prototype was the handling of JSON files, namely JSONP, which has a format of one JSON object per line.

The advantage of using file-like objects continued to be evident throughout the project. In this instance, we were able to pass the file-like object to a modular Transformer class which would read the JSONP stream by line and write out to another file-like object representing the output on the client's raw storage using csv.writer(). At this stage, the pre-existing DAG pipelines could then process the data.

Durable Functions Repository Structure

After refactoring the prototype, we implemented Durable Functions which allowed data to move seamlessly from one "Processor" to another. This approach supports various workflows like unzipping JSONP files and converting them into CSV format.

The project structure now looks like this:
  1. function_app.py — Azure's entry point for instantiating Durable Functions
  2. azure_shim/ — The interface between Azure activities/adapters and Python processors
    • activity_triggers/activity_triggers.py — registers adapters
    • adapters/
      • adapter_base.py — shared functionality on all adapters
      • decompress.py — sets up connections between source, destination, and the sourcemart configured decompression handler class
      • transform.py — sets up the connections between source, destination, and the sourcemart configured transformation class
  3. mounters/ — modular source type support
    • base.py — empty, a placeholder for shared functionality
    • azure_blob.py — set up blob client, readall and readinto support for the blob client
    • api.py — not implemented due to time constraints, here for an example of extensibility
    • nfs.py — not implemented in favor of continued DataFactory use due to time constraints, here for an example of extensibility
  4. writers/
    • azure_blob_writer.py — extends io.BufferedRandom, implements write(), flush() and close()
    • azure_blob_csv_writer.py — extends azure_blob_writer.py, CSV write() required a different encoding
  5. processors/ — these do the real work
    • processor_base.py
    • decompress/
      • base.py
      • zip.py
    • transform/ — note: transform destination format will always be CSV
      • base.py
      • jsonl.py — iterates over input file-like stream and loads JSON object row, transforming to CSV based on sourcemart configuration

With this structure, we have a modular means for handling multiple types of source data (JSON, Zip) from potentially any source type (API, network share, etc.) based entirely on configuration passed into the entry point.

Separating azure_shim gives the client flexibility. In the future, if they decide to switch from their current cloud provider (Azure) to another, they can do so without redeveloping the entire process. If they decide to move, the adjustments needed would be limited to an equivalent entry point file for that provider and a matching "shim" ported to that cloud provider’s APIs.

Furthermore, development can continue on both the current (Azure) and the new cloud platforms simultaneously, making the transition smoother.

Sourcemart-level Pipelines

Before we could integrate the new features into the existing pipeline, the client shifted their priorities. They placed the highest importance on updating the system to handle sourcemart-level pipelines. This update was necessary to support new sourcemarts. Additionally, adding the decompress support becomes a top priority. This feature was not yet included in the prototype; initially, only JSON file support was available.

The key requirement for these tasks was to set up a system that could run one or more configurable processes before table pipelines were run.

A few goals became clear:
  1. Design the changes to the sourcemart and table configuration JSON so that the developers onboarding new sourcemarts would need minimal training for the new structures.
  2. Minimize changes to the DAG, as the process for retrieving the sourcemart-level data would be much the same as the process for retrieving the table-level data.
  3. Complete the sourcemart-level pipelines as a prerequisite for starting the currently scheduled table level pipelines.

Our solutions to these issues were facilitated by Airflow Datasets.

Datasets can be viewed as functionally similar to pub-sub systems. In other words, DAG tasks can be configured to notify datasets when they have completed successfully, and DAGs can be configured to instantiated based on Datasets being notified.

Specifying to a Dataset/Datasets that a DAG is complete:
ds_update = DummyOperator(  
    task_id="trigger_outlet_datasets",  
    outlets=[Dataset(name) for name in end_task_outlets_names],  
)
Specifying that a DAG can be instantiated by a Dataset/Datasets:
dag = DAG(  
    schedule=[Dataset(name) for name in schedule_names]
    ...
)

We now have an Airflow native and low-resource method of handling the dependencies between DAGs. This means we didn't have to make changes of the DAGs to either trigger another DAG or check for the completion of a prerequisite DAG, which would have consumed more resources and been more brittle.

Current Architecture:
Current Architecture.png
New Architecture:

To ease into the concepts, I'll introduce the new architecture by iteration.

First we broke the medallion architecture DAG into two stages:

  1. Collector - retrieves data from the source
  2. Refiner - processes the data from raw all the way through to the final EDW
New Architecture.png

Note that from this point on the contents of the refiner have been simplified down to "RawToBronze -> EDW" but includes all subsequent stages as in the “Current Architecture” diagram.

As you can see, we now treat all sourcemart-level and table-level pipelines as table-level pipelines because the functionality required for each is essentially the same. The added benefit is that this gives us consistency between configurations for both.

In the DAG factory code, we continue to have just one factory creating both the Collector and Refiner. This is because shared items like DAG scheduling, end-of-pipeline dataset updates, and a number of stages not shown here for brevity are needed across both the Collector and the Refiner. Differences in task population between the Collector and the Refiner are determined based on the type.

The sourcemart configuration for the last diagram is straightforward:
{
    "upstreams": ["T1"],
    "downstreams": ["T2", "T3"]
}

One benefit to splitting the DAG into the Collector and the Refiner should now be visible. It would allow us to start the table DAGs without waiting for the Refiner stage from the sourcemart table to complete. In most cases, the table-level pipelines only needs the raw data.

DAG Scheduling

  1. To maintain the existing schedules on table-level pipelines the DAG factory specifies a simple scheduled no-op DAG that notifies the dataset that you can see above T2 SrcToRaw and below T3 SrcToRaw. Thanks to Airflow Datasets the T2 Collector can pick up the "scheduled" notification to both ds T2 and the sourcemart pipeline’s dsr1 and begin execution.
  2. This had an added benefit that some "Dag Dependency" functionality can now be replaced by the use of Datasets, instead of having dedicated tasks inside of each DAG, reducing the tasks and lines of custom code in our code base. That's where Datasets dse2 and dse3 come in to play. With this structure DAGs can be therefore be chained.

Now we have the ability to run a sourcemart-level pipeline in advance of the table-level pipelines, but what does this have to do with the Durable Functions mentioned at the start of this article?

Processors run after the Collector has downloaded data.png

In the diagram above you can now see we have processors that can run after the Collector has downloaded the data, or before the Refiner has started operating on the data. The reason for the Refiner Processor capability will become clear soon.

For the T1 Collector Processor, we specify this in the table-level configuration. This allows handling of situations where the sourcemart level pipeline needs to, for example, download a zip file which is then extracted to raw storage, and then each table potentially needs to transform the data to match its own requirements.

At this point we then needed to consider what happens if there are multiple sourcemart-level pipelines which need to merge and/or transform data for the table-level pipelines.

One use case we were given was a sourcemart that provided three zip files. Each file contained a number of files hat needed to be processed in order to output a number of CSVs. The CSVs would be then used as the table level data.

It made more sense to have an intermediary processing step to handle this case, rather than having each sourcemart-level pipeline or table-level pipeline check to see if their siblings had already completed before carrying out the transformations.

Intermediary processing step.png
Sourcemart configuration with Shared Processors
{
    "upstreams": ["T1", "T2"],
    "downstreams": ["T3", "T4"]
    "processors": [
        {
            "processor_name": "decompress",
            "processor_type": "zip",
            "processor_config": {
                "src": {...},
                "dest": {...}
            }
        },
        {
            "processor_name": "transform",
            "processor_type": "JSONL"
            "processor_config": {
                "src": {...},
                "dest": {...}
            }
        }
    ]
}

(Some configuration details omitted for brevity.)

Now is a good time to explain why there's both a Collector and Refiner Processor, as well as the main reason the decision to split the pipeline into a Collector and Refiner was made.

The use case exists where the developer onboarding the sourcemart wants to unzip files in the sourcemart pipeline but not run any Refiner directly off that, and conversely not need any Collector happen at all for T3 or T4. A possible example is a case where all the data is contained in the zip file, but still requires some separate transformation processing for T3 and T4 tables.

In Airflow, we hand off the request to start the Processors by using a method which returns a TaskGroup. This TaskGroup passes configuration data to an Airflow SimpleHTTPOperator task call. That instantiate Azure's Durable Function where the response contains a url which is then polled using a HTTPSensor task to wait for completion.

Using the TaskGroup-generating function we could easily add support for all of these use cases in the DAG.

The complexity comes in linking the Datasets and DAGs together because we want to link to and from middle of the sourcemart- and table-level pipelines. For example:

Linking Datasets and DAGs.png
Sourcemart Configuration example with Shared Processor, with skipping of Refiners/Collectors:
{
    "upstreams": ["T1", "T2"],
    "downstreams": ["T3", "T4"]
    "skip_upstreams_refiners": ["T1", "T2"],
    "skip_downstreams_collectors": ["T3", "T4"]
    "processors": [
        {
            "processor_name": "decompress",
            "processor_type": "zip",
            "processor_config": {
                "src": {...},
                "dest": {...}
            }
        },
        {
            "processor_name": "transform",
            "processor_type": "JSONL"
            "processor_config": {
                "src": {...},
                "dest": {...}
            }
        }
    ]
}

(Note: configurations for tables' Collector and Refiner processors is similar.)

The ability to skip certain Collectors and Refiners gives onboarding developers the flexibility to have some Collectors or some Refiners, depending on their sourcemart's needs.

As you might imagine, this dependency parsing gets complex quickly. You don't want to calculate this information in the DAG factory because it will put extra demand on the scheduler for something that will only change rarely (e.g.: on new deployment).

Instead, during deployment, we simplify the work for developers setting up sourcemarts by converting their basic dependency requirements into complex JSON objects. These objects are then used effectively by the sourcemart DAG factories, helping to reduce the load on the Airflow scheduler. This approach is part of our ongoing efforts to minimize any possible issues with resource usage.

Unit Testing

Given the numerous ways DAGs and Datasets can now interact, it was crucial to write unit tests to check the creation of dependencies among these various entry and exit points.

Time spent on developing unit tests proved very beneficial. We were able to refactor and optimize the code when necessary. This process was faster because we didn't have to manually review the impact of changes on each DAG and Dataset in Airflow, which saved time and money. This was especially helpful given the lag with Airflow's scheduler and the need to enhance configurations during the deployment process.

 

References

*A sourcemart is where data comes from, often grouped by who provides it. For example, it could be data from a product supplier.

**A table refers to a collection of data organized around a specific subject, such as transaction logs, shipment details, or records of service delivery, provided by a particular vendor.

Related Posts
How can we assist you?
Thank you! Your submission has been received!
Oops! Something went wrong while submitting the form.