Itai Admi
May 25, 2021
lakeFS Airflow Integration

Today we are excited to announce the official release of the lakeFS Airflow provider! The package lets you integrate lakeFS functionality into your Airflow DAGs. It is published on PyPI, so you can install it in your project with:

pip install airflow-provider-lakefs

Once installed, you can import and use the lakeFS-specific operators in your Airflow scripts the same way you import any other operator.

In a previous post, we’ve already demonstrated the power of Airflow and lakeFS together by wrapping Airflow tasks in lakeFS commits.

This process is now even easier with the release of the lakeFS-Airflow provider. The new module exposes several operations for interacting with a lakeFS server:

  • CreateBranchOperator: Create a new lakeFS branch from the source branch (defaults to main).
  • CommitOperator: Commit uncommitted changes to a branch.
  • MergeOperator: Merge one lakeFS branch into another.

New types of sensors are also available to help synchronize running a DAG with external operations:

  • CommitSensor: Wait for a commit to be made on a branch.
  • FileSensor: Wait for a given file to be present.
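Conceptually, a sensor checks a condition repeatedly until it holds or a timeout expires. The sketch below illustrates that polling pattern in plain Python. It is not the provider's implementation: `wait_until` and the fake commit feed are hypothetical names used only for illustration.

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool],
               poke_interval: float = 1.0,
               timeout: float = 10.0) -> bool:
    """Poll `condition` until it returns True or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)


# Hypothetical stand-in for "has a commit landed on the branch yet?".
# A real sensor would ask the lakeFS server instead of reading this list.
commit_checks = iter([False, False, True])
result = wait_until(lambda: next(commit_checks), poke_interval=0.01, timeout=1.0)
print(result)
```

In a DAG, the sensor task sits upstream of the work that depends on the commit or file, so downstream tasks start only after the condition holds.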

With these primitives available, you can build an Airflow DAG that uses lakeFS to make data pipelines atomic, isolated, and reproducible.

Let’s go over an example use-case of lakeFS with Airflow to show what this all looks like.

An Airflow + lakeFS Example

Take the case of a basic task in Airflow that runs a Spark job and outputs the results to S3.

While this task will enjoy all the benefits that come with Airflow orchestration, it can be made better by incorporating the lakeFS-provided capabilities listed above.

To do this, we will take this basic Spark task and wrap it with a few lakeFS operators. The final DAG will have the following steps:

  • Create a new branch (CreateBranchOperator)
  • Run the Spark Submit job (updating the input/output paths to point to the lakeFS repo + branch)
  • Commit the resulting data (CommitOperator)
  • Merge the result to the main production branch (MergeOperator)

Here’s a full code example of the new task:
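A minimal sketch of such a DAG follows, assuming Airflow 2.x and Spark already configured to reach lakeFS through its S3-compatible endpoint. The `lakefs_provider` module paths and the operator parameter names (`source_branch`, `msg`, `source_ref`, `destination_branch`) are assumptions to verify against your installed provider version. The repository name, the application path, and the helpers `lakefs_uri` and `build_dag` are hypothetical.

```python
from datetime import datetime

# 'example-branch' and 'conn_lakefs' follow the names used in this post;
# 'example-repo' is a hypothetical repository name.
DEFAULT_ARGS = {
    "owner": "lakeFS",
    "repo": "example-repo",
    "branch": "example-branch",
    "default-branch": "main",
    "lakefs_conn_id": "conn_lakefs",
}


def lakefs_uri(repo: str, branch: str, path: str) -> str:
    """Build a URI of the form s3a://<repo>/<branch>/<path>."""
    return f"s3a://{repo}/{branch}/{path.lstrip('/')}"


def build_dag():
    """Assemble the four-step DAG. Imports are local so this file loads without Airflow."""
    from airflow import DAG
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    # Assumed module paths; check them against the installed provider.
    from lakefs_provider.operators.create_branch_operator import CreateBranchOperator
    from lakefs_provider.operators.commit_operator import CommitOperator
    from lakefs_provider.operators.merge_operator import MergeOperator

    repo, branch = DEFAULT_ARGS["repo"], DEFAULT_ARGS["branch"]
    with DAG(
        dag_id="lakefs_spark_example",
        default_args=DEFAULT_ARGS,  # repo, branch and connection reach each lakeFS operator
        start_date=datetime(2021, 5, 25),
        schedule_interval=None,  # Airflow 2.x argument; run on manual trigger only
    ) as dag:
        task_create_branch = CreateBranchOperator(
            task_id="create_branch",
            source_branch=DEFAULT_ARGS["default-branch"],
        )
        task_spark = SparkSubmitOperator(
            task_id="spark_job",
            application="/path/to/job.py",  # hypothetical Spark application
            application_args=[
                lakefs_uri(repo, branch, "input/"),
                lakefs_uri(repo, branch, "output/"),
            ],
        )
        task_commit = CommitOperator(
            task_id="commit",
            msg="committing Spark output from Airflow",
        )
        task_merge = MergeOperator(
            task_id="merge_branches",
            source_ref=branch,
            destination_branch=DEFAULT_ARGS["default-branch"],
            msg="merging job output to the default branch",
        )
        task_create_branch >> task_spark >> task_commit >> task_merge
    return dag
```

In a DAG file, assign `dag = build_dag()` at module level so the Airflow scheduler can discover it.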

Note: Production DAGs typically include more than a single task. You can use lakeFS operations to commit intermediate outputs to the designated branch after the execution of each job.

Note: The DAG’s default args are passed to each operator. For example, the branch created by ‘task_create_branch’ is ‘example-branch’.

The Benefits

  • Data Debugging Environment — Say the Spark task fails. Since we created an isolated branch as the first step, we can access that branch to easily see an exact snapshot of the data when the error occurred.
  • CI/CD Data Workflow — By running the Spark job over an isolated branch of data, we give ourselves the opportunity to configure a pre-merge hook that runs data quality checks over the results before merging them to the production branch. In this way, we guarantee that bad data won’t pollute downstream consumers.

Adding an Airflow Connection

In order for the integration to work, Airflow needs to be able to communicate with lakeFS.

For authenticating to the lakeFS server, you need to create a new Airflow Connection of type HTTP and pass it to your DAG. You can do that using the Airflow UI or the CLI. Here’s an example Airflow command to create the connection:

$ airflow connections add conn_lakefs --conn-type=HTTP --conn-host=http://<LAKEFS_ENDPOINT> --conn-extra='{"access_key_id":"<LAKEFS_ACCESS_KEY_ID>","secret_access_key":"<LAKEFS_SECRET_ACCESS_KEY>"}'
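If you script the connection setup, building the `--conn-extra` JSON in code avoids shell quoting mistakes. The sketch below assembles the same command as an argument list. The function name `lakefs_connection_command` is hypothetical, and the placeholder values must be replaced with your own endpoint and credentials.

```python
import json
import shlex
from typing import List


def lakefs_connection_command(endpoint: str,
                              access_key_id: str,
                              secret_access_key: str,
                              conn_id: str = "conn_lakefs") -> List[str]:
    """Return the `airflow connections add` command as an argument list."""
    extra = json.dumps({
        "access_key_id": access_key_id,
        "secret_access_key": secret_access_key,
    })
    return [
        "airflow", "connections", "add", conn_id,
        "--conn-type=HTTP",
        f"--conn-host={endpoint}",
        f"--conn-extra={extra}",
    ]


cmd = lakefs_connection_command(
    "http://<LAKEFS_ENDPOINT>",
    "<LAKEFS_ACCESS_KEY_ID>",
    "<LAKEFS_SECRET_ACCESS_KEY>",
)
# Print a shell-safe version of the command for inspection.
print(shlex.join(cmd))
```

Passing `cmd` to `subprocess.run(cmd, check=True)` would run it without going through a shell, so no extra quoting is needed.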

Note: If you’re unsure of where to get lakeFS credentials, check out how to create a lakeFS repository in the docs.

Visualizing the new DAG in the Airflow UI

In the Graph View of the Airflow UI, we see all four tasks, allowing us to trace the progress and track the history of each step.

Learn More

Integrating lakeFS commands into existing Airflow DAGs is now easier than ever before.

If you are interested in trying this out on your own DAGs, feel free to check out our documentation pages, join our Slack community, or create an issue on the lakeFS GitHub repository.

We look forward to hearing from you!
