Guy Hardonag
February 3, 2021

In this post, we'll see how easy it is to use lakeFS with an existing Airflow DAG to make every step in a pipeline completely reproducible, in both code and data. We do this without modifying the code or logic of our jobs, by wrapping each step with a lakeFS commit.

An example data pipeline

Let's take a simple ETL workflow with three steps:

  1. Extract data from an external source in its raw JSON representation
  2. Transform it into a set of partitioned, columnar Parquet files
  3. Load it into a location used by our data visualization tool of choice
[Figure: ETL workflow]

Integrating Airflow with lakeFS

There are two aspects we will need to handle in order to run Airflow with lakeFS:

Accessing and inserting data through lakeFS

Since lakeFS supports the AWS S3 API, it works with operators that read and write data through S3 (such as SparkSubmitOperator, S3FileTransformOperator, etc.).

All we need to do is set the lakeFS server as the S3 endpoint URL and use our lakeFS credentials instead of our AWS S3 credentials. That's about it.
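As a minimal sketch of that setup, the snippet below defines an Airflow AWS connection whose endpoint is lakeFS. It assumes Airflow 2.3+ (which accepts JSON in `AIRFLOW_CONN_<CONN_ID>` environment variables) and an Amazon provider version that reads `endpoint_url` from the connection extras. The connection ID `lakefs_s3`, the endpoint URL, and the keys are hypothetical placeholders.

```python
import json
import os

# Hypothetical values: replace with your lakeFS server URL and a lakeFS access key pair.
LAKEFS_ENDPOINT = "https://lakefs.example.com"
LAKEFS_ACCESS_KEY_ID = "AKIA-EXAMPLE"
LAKEFS_SECRET_ACCESS_KEY = "example-secret"


def lakefs_aws_connection(endpoint: str, key_id: str, secret: str) -> str:
    """Serialize an Airflow AWS connection whose S3 endpoint is lakeFS rather than AWS."""
    return json.dumps(
        {
            "conn_type": "aws",
            "login": key_id,  # lakeFS access key ID instead of an AWS one
            "password": secret,  # lakeFS secret access key
            "extra": {"endpoint_url": endpoint},  # send S3 calls to lakeFS
        }
    )


# Airflow looks up connection "lakefs_s3" in the AIRFLOW_CONN_LAKEFS_S3 environment variable.
os.environ["AIRFLOW_CONN_LAKEFS_S3"] = lakefs_aws_connection(
    LAKEFS_ENDPOINT, LAKEFS_ACCESS_KEY_ID, LAKEFS_SECRET_ACCESS_KEY
)
```

In a real deployment the variable would be set in the Airflow environment rather than in a DAG file. S3-based operators would then point their connection parameter (for example `aws_conn_id`, or `source_aws_conn_id`/`dest_aws_conn_id` on S3FileTransformOperator) at `lakefs_s3`.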

We can then run tasks against lakeFS using the lakeFS path convention: s3://[REPOSITORY]/[BRANCH]/PATH/TO/OBJECT
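A small helper makes the path convention explicit. This is a sketch; the function name and the object path `raw/events.json` are hypothetical. Because lakeFS also accepts a commit ID in place of the branch name (as used later when reading from a commit), the second argument is called `ref`.

```python
def lakefs_s3_uri(repository: str, ref: str, path: str) -> str:
    """Build s3://[REPOSITORY]/[BRANCH]/PATH/TO/OBJECT; `ref` may be a branch or a commit ID."""
    return f"s3://{repository}/{ref}/{path.lstrip('/')}"


# Where the extract step might write its raw JSON on the DAG's branch (hypothetical path).
raw_input = lakefs_s3_uri("my-repo", "example_dag_branch", "raw/events.json")
```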

The lakeFS docs explain, with examples, how to use lakeFS with the AWS CLI, Spark, Presto, and many other tools.
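For Spark jobs launched from Airflow, one common approach is to point Hadoop's S3A filesystem at lakeFS through Spark properties. The sketch below uses standard S3A configuration keys; the endpoint and keys are hypothetical placeholders. With S3A, paths use the `s3a://` scheme (for example `s3a://my-repo/example_dag_branch/...`).

```python
def lakefs_spark_conf(endpoint: str, access_key: str, secret_key: str) -> dict:
    """Spark properties that route Hadoop S3A requests to lakeFS instead of AWS S3."""
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,  # lakeFS access key ID
        "spark.hadoop.fs.s3a.secret.key": secret_key,  # lakeFS secret access key
        # Path-style requests (endpoint/repo/...) instead of virtual-hosted ones (repo.endpoint/...).
        "spark.hadoop.fs.s3a.path.style.access": "true",
    }


# Hypothetical endpoint and credentials.
conf = lakefs_spark_conf("https://lakefs.example.com", "AKIA-EXAMPLE", "example-secret")
```

A dictionary like this could be passed as the `conf` argument of SparkSubmitOperator, leaving the Spark job's own code unchanged.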

Running lakeFS commands such as creating branches, committing, and merging

We currently have two options for running lakeFS commands from Airflow: use the SimpleHttpOperator to send requests to the lakeFS API, or use the BashOperator to run lakectl commands.
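To show what the API route involves, here is a sketch that builds (but does not send) a commit request with the standard library. It assumes the lakeFS v1 commit endpoint `POST /api/v1/repositories/{repo}/branches/{branch}/commits`, a JSON body with `message` and `metadata`, and HTTP basic auth with a lakeFS key pair; the function name, server URL, and keys are hypothetical.

```python
import base64
import json
import urllib.request


def build_commit_request(base_url, repo, branch, message, access_key, secret_key, metadata=None):
    """Build a POST request for the (assumed) lakeFS v1 commit endpoint without sending it."""
    url = f"{base_url.rstrip('/')}/api/v1/repositories/{repo}/branches/{branch}/commits"
    body = json.dumps({"message": message, "metadata": metadata or {}}).encode("utf-8")
    # Basic auth: base64("<access key id>:<secret access key>").
    token = base64.b64encode(f"{access_key}:{secret_key}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
    )


req = build_commit_request(
    "https://lakefs.example.com", "my-repo", "example_dag_branch",
    "extract data", "AKIA-EXAMPLE", "example-secret",
)
# urllib.request.urlopen(req) would send it.
```

With SimpleHttpOperator, the same endpoint path, method, JSON body, and headers would be supplied as operator arguments, with the server address and credentials kept in an Airflow HTTP connection.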

Soon lakeFS will have a dedicated Airflow operator, but until then, running lakectl commands with the BashOperator is quite simple.

For example, a commit task using the BashOperator looks like this:

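```python
from airflow.operators.bash import BashOperator  # Airflow 1.10: airflow.operators.bash_operator

# Commit what the extract task wrote to example_dag_branch (`dag` is defined earlier).
# Newer lakectl releases address branches as lakefs://<repo>/<branch> instead of repo@branch.
commit_extract = BashOperator(
    task_id='commit_extract',
    bash_command='lakectl commit lakefs://example_repo@example_dag_branch -m "extract data"',
    dag=dag,
)
```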

After adding a commit task after each step, our DAG looks like this:

[Figure: Airflow DAG]
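The pattern generalizes to every step: each existing task is followed by a commit task of the same form as `commit_extract`. The sketch below computes the task order and the lakectl command for each commit task; the helper names are hypothetical, and the repository and branch names are the ones from the example above.

```python
ETL_STEPS = ["extract", "transform", "load"]


def task_order(steps):
    """Each step is followed by its commit task: extract, commit_extract, transform, ..."""
    return [task_id for step in steps for task_id in (step, f"commit_{step}")]


def commit_commands(repo, branch, steps):
    """Map each commit task ID to the lakectl command it runs."""
    return {
        f"commit_{step}": f'lakectl commit lakefs://{repo}@{branch} -m "{step} data"'
        for step in steps
    }


order = task_order(ETL_STEPS)
commands = commit_commands("example_repo", "example_dag_branch", ETL_STEPS)
```

In the DAG file, each entry in `commands` could become a BashOperator, and `airflow.models.baseoperator.chain` could link the tasks in `order`. The extract, transform, and load tasks themselves stay exactly as they were.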

Looking at committed data

After running the DAG, we can go to the lakeFS UI and explore the changes it produced.

We can view the list of commits:

[Figure: lakeFS commit list]

And explore the data at a specific commit:

[Figure: lakeFS view of a specific commit]

From this view, you can compare two commits to see how a task affected your data (e.g., review the changes made by the transform task).

[Figure: comparing two commits in lakeFS]

To reproduce a step, we can read data from a specific commit into a Spark DataFrame by using the commit ID in place of the branch name:

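```python
# Read the transform output as it was at commit 7f326dc3b1 (the commit ID replaces the branch name).
spark.read.parquet("s3://my-repo/7f326dc3b1/transform").show()
```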

Since lakeFS commits are immutable, we can reproduce the transform phase exactly: the input is the same data that existed at the time of computation, and by storing the Git commit hash of the job as a metadata field in the lakeFS commit, we can run the same code as well.
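A minimal sketch of attaching the Git hash, assuming lakectl's `--meta key=value` flag on `lakectl commit` and that the job's code is checked out in a Git repository. The metadata key `git_commit_hash` and the helper names are hypothetical; `shlex.quote` keeps the generated command safe to pass to a BashOperator.

```python
import shlex
import subprocess


def current_git_sha() -> str:
    """Return the Git commit hash of the checked-out job code (run inside the Git repo)."""
    result = subprocess.run(
        ["git", "rev-parse", "HEAD"], check=True, capture_output=True, text=True
    )
    return result.stdout.strip()


def lakectl_commit_command(repo: str, branch: str, message: str, metadata: dict) -> str:
    """Build a shell-safe lakectl commit command that attaches key=value metadata."""
    parts = ["lakectl", "commit", f"lakefs://{repo}@{branch}", "-m", message]
    for key, value in sorted(metadata.items()):
        parts += ["--meta", f"{key}={value}"]
    return " ".join(shlex.quote(part) for part in parts)


# In a DAG file the metadata could be {"git_commit_hash": current_git_sha()}; a fixed hash is used here.
cmd = lakectl_commit_command(
    "my-repo", "example_dag_branch", "transform data", {"git_commit_hash": "abc123"}
)
```

Note that `current_git_sha()` reports the hash of whatever checkout the calling process runs in, so it should be evaluated where the job's code actually lives.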

Summary

In short, with Airflow and lakeFS you can easily build reproducible data pipelines using commits, without having to modify the code and logic of your job.

The ability to reproduce and work on an isolated branch can also be useful to backfill, manipulate data, run end-to-end tests on a dedicated branch, and more.
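For example, a backfill or an end-to-end test could run on its own branch per DAG run. The sketch below derives such a branch name from the DAG ID and run ID. It assumes lakeFS branch names may contain only letters, digits, underscores, and hyphens, so other characters (such as the `:` and `+` in Airflow run IDs) are replaced; the function name is hypothetical.

```python
import re


def branch_for_run(dag_id: str, run_id: str) -> str:
    """Derive a per-run branch name, replacing anything outside [A-Za-z0-9_-] with '_'."""
    return re.sub(r"[^\w-]", "_", f"{dag_id}_{run_id}", flags=re.ASCII)


branch = branch_for_run("example_dag", "backfill__2021-02-03T00:00:00+00:00")
```

The branch could be created with `lakectl branch create` before the extract step and merged back with `lakectl merge` only after the run succeeds.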

Check out the lakeFS docs for recommended branching models for different data processing use cases.

Feel free to join our Slack and let us know how you're planning to integrate Airflow with lakeFS.
