Apache Airflow lets you build multistep workflows across multiple technologies. Its programmatic approach to scheduling and monitoring workflows helps users build complicated ETLs on their data that would be difficult to achieve otherwise.
This has enabled ETLs to evolve from simple single steps into complicated, parallelized, multi-step advanced transformations.
The challenge is, complicated ETLs mean complicated troubleshooting. When step number 17 out of 50 fails, it is incredibly difficult to understand after the fact what caused the failure.
What should you do when a failure happens? An effective approach is to first revert the production data to a consistent state (from before the issue occurred) so the data stays available, and then investigate the problem.
lakeFS lets engineers revert production data to an error-free state with a simple one-line command. It supports Git-like branching, committing, and reverting operations on the data lake, enabling a safe way to troubleshoot production issues.
This article shows how to use lakeFS to quickly analyze and troubleshoot failed Airflow jobs, improving data integrity and trustworthiness.
Here’s an idea: Treat your data like code
lakeFS lets you manage your data the same way you manage code, using Git-like operations, with a quick setup (under 20 minutes). It sits on top of the object store (Amazon S3, Google Cloud Storage, Azure Blob Storage, etc.) and provides Git-like capabilities such as branch, commit, merge, and revert.
All of that happens through an API, so the ecosystem of tools that we have running in our environment – Airflow being one of them – can either access the object store in a normal way or access the data through lakeFS and enjoy data versioning. The only difference is that before accessing a collection on a bucket, you need to include the name of a branch or a commit identifier.
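To make the path difference concrete, here is a minimal sketch of how a branch-qualified path can be built. It assumes access through the lakeFS S3-compatible endpoint, where the repository name takes the place of the bucket and the branch (or commit ID) is the first path component. The helper name `lakefs_uri`, the repository `example-repo`, and the branch names are made up for illustration.

```python
def lakefs_uri(repository: str, ref: str, path: str, scheme: str = "s3a") -> str:
    """Build an object URI that addresses `path` on a specific branch or commit."""
    if not repository or not ref:
        raise ValueError("repository and ref are both required")
    return f"{scheme}://{repository}/{ref}/{path.lstrip('/')}"


# Hypothetical names: the same collection, read from two different branches.
prod_uri = lakefs_uri("example-repo", "main", "collections/events/")
test_uri = lakefs_uri("example-repo", "etl-test-branch", "collections/events/")
```

Only the `ref` component changes between the two URIs, so a job can be pointed at an isolated branch by changing a single parameter.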
Option 1: Run DAGs in Isolation
With this approach, we create a branch from our production environment (that is, an isolated environment identical to production, created without copying any data) and run the DAG on that branch. Once the DAG completes, we merge its final output back into production.
This approach is probably the best choice for a quick win.
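The control flow of this option can be sketched in a few lines of Python. This is only an illustration under stated assumptions: `create_branch`, `run_dag`, and `merge` are hypothetical callables that you would wire to your lakeFS client and your Airflow trigger. They are not lakeFS or Airflow APIs.

```python
from typing import Callable


def run_in_isolation(
    create_branch: Callable[[str, str], None],
    run_dag: Callable[[str], None],
    merge: Callable[[str, str], None],
    branch: str,
    production: str = "main",
) -> bool:
    """Run a DAG on an isolated branch; merge into production only on success."""
    create_branch(branch, production)  # branch off production (hypothetical hook)
    try:
        run_dag(branch)  # the DAG reads and writes only on `branch`
    except Exception:
        # Production is untouched; the branch is kept for troubleshooting.
        return False
    merge(branch, production)  # promote the DAG output to production
    return True
```

The point of the design is that a failing run never reaches the merge call, so production only ever sees the output of a DAG that finished successfully.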
Option 2: Execute Git actions within a DAG
With this approach, we not only run the DAG on an isolated branch (as in the previous option) but also run Git actions inside the DAG execution. This means we get a commit history of the changes made by each step executed within the DAG.
This approach requires more code; however, it provides more value than the previous one.
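As a rough sketch of what committing inside the DAG buys you, the function below runs a list of steps and commits after each one that succeeds. The `commit` callable is a hypothetical hook that you would connect to your lakeFS client; it is assumed to take a message and return a commit ID. In a real DAG, each step and each commit would typically be its own Airflow task.

```python
from typing import Callable, List, Optional, Tuple

Step = Tuple[str, Callable[[], None]]


def run_steps_with_commits(
    steps: List[Step],
    commit: Callable[[str], str],
) -> Tuple[List[str], Optional[str]]:
    """Run steps in order, committing after each one that succeeds.

    Returns (commit_ids, failed_step); failed_step is None if every step passed.
    """
    commit_ids: List[str] = []
    for name, task in steps:
        try:
            task()
        except Exception:
            # Stop here: the last commit ID marks the data state before the failure.
            return commit_ids, name
        commit_ids.append(commit(f"after {name}"))
    return commit_ids, None
```

When a step fails, the returned commit IDs tell you exactly which data state each earlier step produced, which is what makes after-the-fact troubleshooting tractable.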
- Run a lakeFS Server
To demonstrate this, you will need a lakeFS server running.
You can use a lakeFS playground, which is an isolated lakeFS environment (using our storage, as opposed to your buckets). Alternatively, to run this against your own data, spin up a local lakeFS environment or provision a lakeFS Cloud environment.
- Reference notebook example
We provide this sample, which you can use to run the Airflow DAGs with lakeFS against your lakeFS server.
git clone https://github.com/treeverse/lakeFS-samples && cd lakeFS-samples/03-apache-spark-python-demo
Then, build & run:
docker build -t lakefs-spark-python-demo .
docker run -d -p 8888:8888 -p 4040:4040 -p 8080:8080 --user root -e GRANT_SUDO=yes -v $PWD:/home/jovyan -v $PWD/jupyter_notebook_config.py:/home/jovyan/.jupyter/jupyter_notebook_config.py --name lakefs-spark-python-demo lakefs-spark-python-demo
Note: you need to be able to run a Docker container on your machine in order to run this example.
Once the container is running, access the notebook UI by going to http://127.0.0.1:8888/ in your browser:
- Modify the notebooks
Click on the file to review and edit it:
To run this example, we need to enter the details of our lakeFS server (the one created here).
Enter lakeFS Credentials:
lakefsEndPoint = '<lakeFS Endpoint URL>' # e.g. 'https://username.aws_region_name.lakefscloud.io'
lakefsAccessKey = '<lakeFS Access Key>'
lakefsSecretKey = '<lakeFS Secret Key>'
If you are using the playground, for example, the link you received has all the details, which you can simply copy into the fields above.
Since we are creating a repository, we will need to change the storage namespace to point to a unique path. If you are using your own bucket (not playground), insert the path to your bucket. If you are using the playground, you will want to create a repository in a subdirectory of the sample repository that was automatically created for you.
For example, if you log in to your playground and see:
Add a subdirectory to the existing path (in this case, s3://treeverse-demo-lakefs-storage-production/user_big-grouper/my-repo). That is, set:
storageNamespace = 's3://treeverse-demo-lakefs-storage-production/user_big-grouper/my-repo/dag1/' # e.g. "s3://username-lakefs-cloud/"
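If you create several repositories, a tiny helper can keep the namespaces unique and consistently formatted. This is just a convenience sketch and not part of the sample notebook; the function name `repo_storage_namespace` is made up for illustration.

```python
def repo_storage_namespace(base: str, subdirectory: str) -> str:
    """Append a subdirectory to a base storage namespace, ending in one slash."""
    return f"{base.rstrip('/')}/{subdirectory.strip('/')}/"


# Reproduces the value used above from the playground's existing path.
storageNamespace = repo_storage_namespace(
    "s3://treeverse-demo-lakefs-storage-production/user_big-grouper/my-repo",
    "dag1",
)
```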
- Run notebooks
You can now run the notebook step by step and view the changes in your lakeFS Server.
- The Setup tasks will start Airflow, configure the DAG, and install a lakeFS Python client.
Make sure to click on the links to Airflow when prompted. For example, here:
Copy the password from above and click on the link to the DAG graph, then paste the password:
This gives you access to the graphical representation of the DAG:
- As the DAGs run, don’t forget to switch over to lakeFS and observe the branches, merges, and commits taking place.
- Troubleshooting DAGs
The second notebook runs the same DAG twice: once with valid input and once with invalid input.
When you run the second DAG against the bad input data (“lakefs_test_latest_file.csv”), the task will fail. Now, with lakeFS, we can see the exact data at the point of the failure in that specific commit.
To do so, click on the “etl_task3” task box:
Then click the Log button:
Search the log for “Exception” to find the error:
This exception happens because column “_c4” (the 5th column) is missing from the latest file.
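A failure like this can also be reproduced outside the DAG with a quick column-count check. The sketch below uses only the Python standard library and is not part of the sample notebooks. The two in-memory CSV samples are made up for illustration and do not reflect the actual contents of the demo files.

```python
import csv
import io
from typing import List, TextIO


def rows_missing_columns(source: TextIO, expected_columns: int = 5) -> List[int]:
    """Return 1-based row numbers that have fewer than `expected_columns` fields."""
    short_rows = []
    for row_number, row in enumerate(csv.reader(source), start=1):
        if row and len(row) < expected_columns:  # skip blank lines
            short_rows.append(row_number)
    return short_rows


# Made-up sample data: the second row of `bad` lacks a 5th column.
good = io.StringIO("1,a,b,c,d\n2,e,f,g,h\n")
bad = io.StringIO("1,a,b,c,d\n2,e,f,g\n")

good_result = rows_missing_columns(good)
bad_result = rows_missing_columns(bad)
```

Running a check like this against the file downloaded from the failing branch confirms whether the input, rather than the task logic, caused the exception.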
- Get the snapshot of the data at the time of the error
In the same log file, search for a link to your lakeFS server. You will see the logged message with a link to the branch at the time of the failure:
Click on this link to open the lakeFS server on that branch:
You can now download the file (lakefs_test_latest_file.csv) and see that it is indeed missing the 5th column:
We went over two scenarios: running an existing DAG in isolation and creating a new DAG with Git-like actions. In both cases, you achieve:
- Isolated staging environments, with no data copying.
- Promote only quality data to production: atomically promote the data only once the DAG finishes execution successfully.
- If bad data was introduced to production, revert to a good state in milliseconds.
lakeFS not only saves on storage (up to 80%) but also dramatically increases productivity, as these tasks are otherwise manual and time-consuming. Cherry on top: if a production outage ever happens, you can always revert to where you were before.
We also provide enterprise support as well as lakeFS Cloud – a hosted solution with additional features.