Introduction
lakeFS makes creating isolated environments for data ingestion instantaneous, so you can run ingestion jobs without impacting your production data and then merge the ingested data into production atomically. This frees you from spending time on environment maintenance and makes it possible to create as many environments as needed.
If ingested data turns out to be wrong, lakeFS also lets engineers revert production data to an error-free state with a simple one-line command, so you don't have to spend time removing bad data from production.
But developing a data ingestion pipeline requires automation and orchestration for repeated queries, data quality checks, and overall cluster operations. This makes Airflow the perfect orchestrator to pair with lakeFS. With Airflow, you can orchestrate each step of your data ingestion pipeline, integrate with services that clean your data, and store and publish your results using SQL and Python code.
This tutorial will review all steps needed to integrate Apache Airflow with lakeFS. If you don’t have an existing Airflow environment, then this tutorial also includes steps to run Managed Apache Airflow on AWS.
This tutorial assumes that lakeFS is already set up and running against your storage (in this example, AWS S3), and is focused on setting up the integration of Airflow and lakeFS.
Prerequisites
- lakeFS Server (you can deploy one independently or use the hosted solution lakeFS Cloud).
- lakeFS Credentials (Key & Secret) or the permissions to create those credentials.
- Airflow environment (if you don’t have one, refer to the Airflow installation documentation or follow Step 1 below to create a Managed Airflow environment on AWS)
Step 1 – Create Managed Airflow Environment on AWS
If you would like to run Apache Airflow on AWS, refer to the Get Started user guide for Amazon Managed Workflows for Apache Airflow and follow the instructions below.
- Open your browser and log in to your AWS Console.
- Search for s3 in the top search bar and select the S3 service:
- Click the “Create bucket” button to create an S3 bucket to store Airflow-related content, e.g. Airflow DAGs, plugins.zip, and the requirements.txt file:
- Enter the bucket name, select the same AWS Region where you want to run Airflow, choose Block all public access, and enable Bucket Versioning:
- Once the S3 bucket is created, select it from the list of buckets and click the “Create folder” button to create a dags folder:
- Create a requirements.txt file locally on your computer with the following content:
airflow-provider-lakefs
apache-airflow-providers-apache-spark
lakefs-client
- Go back to the newly created S3 bucket in your browser and click the “Upload” button:
- Click the “Add files” button and select the requirements.txt file from your computer, then click the “Upload” button to upload it:
- After the upload completes, your S3 bucket should contain the dags sub-folder and the requirements.txt file (a scripted alternative using boto3 is sketched at the end of this step):
- Search for mwaa in the top search bar of the AWS Console and select the “Managed Apache Airflow” service:
- Click the “Create environment” button:
- Enter the environment name under Environment details, then select the Airflow version and the weekly maintenance window:
- Scroll down, enter the S3 bucket, DAGs folder, and requirements file information (created in the previous steps), and click “Next”:
- Choose an existing VPC or create a new MWAA VPC. If you create a new MWAA VPC, refresh the dropdown list and choose it once it has been created:
- OPTIONAL: If you want the Airflow UI to be accessible over the internet, select the Public network option:
- Select the environment class and adjust the worker and scheduler counts. The following selection will work for this tutorial; you can change it based on your future workload requirements:
- Scroll down and click “Next”:
- Review your environment configuration and click “Save”.
- It takes 30-45 minutes to create an Airflow environment.
- Once the Airflow environment is available, click the “Open Airflow UI” link to open the Airflow UI:
- If you would like to use the Airflow CLI instead of the UI, refer to the CLI sections of the Accessing the Apache Airflow UI documentation.
- Go to Step 3.
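If you prefer to script the S3 portion of this step instead of clicking through the console, the sketch below uses boto3 to upload requirements.txt and create the dags/ prefix. It is a minimal sketch, assuming boto3 is installed and your AWS credentials are configured; the bucket name is a placeholder.
# A minimal sketch (not part of the tutorial): scripted alternative to the console upload.
import boto3

s3 = boto3.client("s3")
bucket = "<YOUR_MWAA_BUCKET_NAME>"  # placeholder: the bucket created above

# Upload the requirements file to the bucket root.
s3.upload_file("requirements.txt", bucket, "requirements.txt")

# Create an empty "dags/" prefix so MWAA has a DAGs folder to watch.
s3.put_object(Bucket=bucket, Key="dags/")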
Step 2 – Install Python packages
If you followed “Step 1 – Create Managed Airflow Environment on AWS”, skip this step; the packages are installed via the requirements.txt file you uploaded.
- Install the lakeFS Airflow provider package on the Airflow server:
pip install airflow-provider-lakefs
- Install the Apache Spark provider package on the Airflow server:
pip install apache-airflow-providers-apache-spark
- Install the lakeFS Python client on the Airflow server:
pip install lakefs-client
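To confirm that Airflow can import the packages you just installed, you can run a quick check like the sketch below with the same Python interpreter that runs Airflow. The module names are assumptions derived from the package names above and may differ between versions.
# Sanity-check sketch: confirms the installed packages are importable.
import importlib.util

for module in ("lakefs_provider", "lakefs_client", "airflow.providers.apache.spark"):
    found = importlib.util.find_spec(module) is not None
    print(f"{module}: {'OK' if found else 'MISSING'}")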
Step 3 – Acquire lakeFS Key and Secret
In this step, we will acquire the lakeFS Access Key and Secret Key that will be used in Airflow in the following steps. If you already have an Access Key and Secret Key, you can skip this step.
Note: To create a new access key, you need either the AuthManageOwnCredentials or the AuthFullAccess lakeFS policy attached to your user.
- Log in to lakeFS and click Administration -> Create Access Key
A new key will be generated:
As instructed, copy the Secret Access Key and store it somewhere safe. You will not be able to access it again (though you can always create new keys).
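If you want to verify the new key pair before wiring it into Airflow, the sketch below uses the lakefs-client package installed earlier to list your repositories. It is a minimal sketch: the endpoint and key values are placeholders, and attribute names may differ between lakefs-client versions.
# Minimal credential check (a sketch, not part of the tutorial DAGs).
import lakefs_client
from lakefs_client.client import LakeFSClient

configuration = lakefs_client.Configuration()
configuration.host = "https://<LAKEFS_ENDPOINT>"       # no trailing slash
configuration.username = "<LAKEFS_ACCESS_KEY_ID>"      # access key ID
configuration.password = "<LAKEFS_SECRET_ACCESS_KEY>"  # secret access key

client = LakeFSClient(configuration)
print(client.repositories.list_repositories())  # succeeds only if the keys are valid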
Step 4 – Create a lakeFS connection on Airflow
To access and authenticate with the lakeFS server, you will create a new Airflow connection of type HTTP and reference it from your DAGs. You can do that using the Airflow UI or the CLI.
Create a lakeFS connection by using Airflow UI
- Click the “Connections” sub-menu under the “Admin” menu:
- Click the “+” button to add a new connection:
- Enter the following values for the connection fields and click the “Test” button. If the test succeeds, click the “Save” button; if it fails, check your lakeFS server endpoint and access keys.
Note: Replace <LAKEFS_ENDPOINT> with your lakeFS server endpoint (without a trailing slash). If you are using lakeFS Cloud, it will be in this format: https://orgname.aws_region_name.lakefscloud.io
Note: Replace <LAKEFS_ACCESS_KEY_ID> and <LAKEFS_SECRET_ACCESS_KEY> with the keys that you created in the previous step.
Connection Id: conn_lakefs
Connection Type: HTTP
Host: <LAKEFS_ENDPOINT>
Extra: {"access_key_id":"<LAKEFS_ACCESS_KEY_ID>","secret_access_key":"<LAKEFS_SECRET_ACCESS_KEY>"}
Create a lakeFS connection by using Airflow CLI
- Run the following Airflow CLI command to create the connection:
Note: Replace <LAKEFS_ENDPOINT> with your lakeFS server endpoint (without a trailing slash). If you are using lakeFS Cloud, it will be in this format: https://orgname.aws_region_name.lakefscloud.io
Note: Replace <LAKEFS_ACCESS_KEY_ID> and <LAKEFS_SECRET_ACCESS_KEY> with the keys that you created in the previous step.
airflow connections add conn_lakefs --conn-type=http \
--conn-host=<LAKEFS_ENDPOINT> \
--conn-extra='{"access_key_id":"<LAKEFS_ACCESS_KEY_ID>","secret_access_key":"<LAKEFS_SECRET_ACCESS_KEY>"}'
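Whichever method you use, the lakeFS operators will look the connection up by its ID at runtime. As an illustration (not part of the tutorial), the sketch below reads the stored connection back with Airflow’s BaseHook, which is a quick way to confirm it was saved correctly; run it inside your Airflow environment, for example from a PythonOperator task.
# Illustrative sketch: read back the connection created above.
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("conn_lakefs")
print(conn.host)                               # lakeFS endpoint
print(conn.extra_dejson.get("access_key_id"))  # access key from the Extra field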
Step 5 – Create Airflow variables
We will create a few variables which will be used in Airflow DAGs. You can do that by using the Airflow UI or the CLI.
Create Airflow variables by using Airflow UI
- Create a variables.json file locally on your computer with the following content:
{ "lakefsEndPoint": "<LAKEFS_ENDPOINT>", "lakefsAccessKey": "<LAKEFS_ACCESS_KEY_ID>", "lakefsSecretKey": "<LAKEFS_SECRET_ACCESS_KEY>", "repo": "<LAKEFS_REPO_NAME>", "sourceBranch": "<LAKEFS_DEFAULT_BRANCH_NAME>", "newBranch": "airflow_demo_existing_dag", "conn_lakefs": "conn_lakefs"}
Note: Replace <LAKEFS_ENDPOINT> with your lakeFS server endpoint (without a trailing slash). If you are using lakeFS Cloud, it will be in this format: https://orgname.aws_region_name.lakefscloud.io
Note: Replace <LAKEFS_ACCESS_KEY_ID> and <LAKEFS_SECRET_ACCESS_KEY> with the keys that you created in Step 3.
Note: Replace <LAKEFS_REPO_NAME> with your lakeFS repository name. If you haven’t created the repository yet, follow the instructions in the documentation.
Note: Replace <LAKEFS_DEFAULT_BRANCH_NAME> with the main branch name that you used when creating the repository.
You can find the repository name and default branch name on the lakeFS home page:
- Click the “Variables” sub-menu under the “Admin” menu in the Airflow UI:
- Click the “Choose File” button, select the variables.json file from your computer, and click the “Import Variables” button.
- The following 7 variables will be created:
Create Airflow variables by using CLI
- Create variables to store lakeFS credentials:
Note: Replace <LAKEFS_ENDPOINT> with your lakeFS server endpoint (without a trailing slash). If you are using lakeFS Cloud, it will be in this format: https://orgname.aws_region_name.lakefscloud.io
Note: Replace <LAKEFS_ACCESS_KEY_ID> and <LAKEFS_SECRET_ACCESS_KEY> with the keys that you created in Step 3.
airflow variables set lakefsEndPoint <LAKEFS_ENDPOINT>
airflow variables set lakefsAccessKey <LAKEFS_ACCESS_KEY_ID>
airflow variables set lakefsSecretKey <LAKEFS_SECRET_ACCESS_KEY>
- Create variables to store lakeFS versioning information:
Note: Replace <LAKEFS_REPO_NAME> with your lakeFS repository name. If you haven’t created the repository yet, follow the instructions in the documentation.
Note: Replace <LAKEFS_DEFAULT_BRANCH_NAME> with the main branch name that you used when creating the repository.
You can find the repository name and default branch name on the lakeFS home page:
airflow variables set repo <LAKEFS_REPO_NAME>
airflow variables set sourceBranch <LAKEFS_DEFAULT_BRANCH_NAME>
airflow variables set newBranch 'airflow_demo_existing_dag'
- Create a variable to store the connection name created in the previous step:
airflow variables set conn_lakefs 'conn_lakefs'
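These variables are what the DAGs in the following steps read at run time. As an illustration (not the tutorial code itself), a DAG retrieves them roughly like the sketch below.
# Illustrative sketch of how a DAG reads the variables created above.
from airflow.models import Variable

repo = Variable.get("repo")                   # lakeFS repository name
source_branch = Variable.get("sourceBranch")  # e.g. your main branch
new_branch = Variable.get("newBranch")        # isolated branch for the DAG run
lakefs_conn_id = Variable.get("conn_lakefs")  # Airflow connection ID from Step 4

print(repo, source_branch, new_branch, lakefs_conn_id)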
Step 6 – Airflow tutorial
- We will use the Working with TaskFlow Airflow tutorial and modify it to use lakeFS’s version control functionality. This tutorial includes a simple ETL DAG (don’t worry if you don’t have this tutorial in your Airflow environment):
- Download the modified tutorial lakefs_tutorial_taskflow_api_etl.py manually from the lakeFS samples repo, or run the following command on your computer:
wget https://raw.githubusercontent.com/treeverse/lakeFS-samples/main/03-multiple-samples/airflow/Existing_DAG/lakefs_tutorial_taskflow_api_etl.py
- If you would like to review the changes we made to the tutorial, open the lakefs_tutorial_taskflow_api_etl.py file in a text editor and search for “[START of lakeFS Code]”.
- If you are using Managed Airflow on AWS, follow the next few instructions. Otherwise, copy the downloaded lakefs_tutorial_taskflow_api_etl.py file into the DAGs folder referenced in your Airflow configuration file (airflow.cfg) and go to Step 7.
Note: The default location for your DAGs is ~/airflow/dags.
- Go to the S3 bucket you created in Step 1 and click the “dags” link to open the dags sub-folder:
- Click the “Upload” button inside the dags sub-folder:
- Click the “Add files” button and select the lakefs_tutorial_taskflow_api_etl.py file from your computer, then click the “Upload” button to upload it:
Step 7 – lakeFS Wrapper DAG
We will run the Airflow tutorial DAG on an isolated branch in lakeFS. With this approach, we create a branch off our production environment in lakeFS (that is, an isolated, production-identical environment created without copying any data) and run the DAG on that branch. Once the run completes, we merge the final output of the DAG back into production.
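Conceptually, the wrapper does three things: it creates a branch from the source branch, triggers the tutorial DAG to run against that branch, and merges the branch back once the run succeeds. The sketch below illustrates that flow using operators from the airflow-provider-lakefs package; it is a simplified approximation, operator names and parameters may differ between provider versions, and the downloaded lakefs_wrapper_dag.py is the source of truth.
# Simplified sketch of the branch -> run -> merge pattern (not the actual wrapper DAG).
import pendulum
from airflow.decorators import dag
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from lakefs_provider.operators.create_branch_operator import LakeFSCreateBranchOperator
from lakefs_provider.operators.merge_operator import LakeFSMergeOperator


@dag(start_date=pendulum.datetime(2023, 1, 1), schedule_interval=None, catchup=False)
def lakefs_wrapper_sketch():
    # Branch off production so the ETL runs in isolation.
    create_branch = LakeFSCreateBranchOperator(
        task_id="create_branch",
        lakefs_conn_id=Variable.get("conn_lakefs"),
        repo=Variable.get("repo"),
        branch=Variable.get("newBranch"),
        source_branch=Variable.get("sourceBranch"),
    )

    # Run the (modified) tutorial DAG and wait for it to finish.
    run_etl = TriggerDagRunOperator(
        task_id="trigger_tutorial_dag",
        trigger_dag_id="lakefs_tutorial_taskflow_api_etl",
        wait_for_completion=True,
    )

    # Merge the isolated branch back into production atomically.
    merge_branch = LakeFSMergeOperator(
        task_id="merge_branch",
        lakefs_conn_id=Variable.get("conn_lakefs"),
        repo=Variable.get("repo"),
        source_ref=Variable.get("newBranch"),
        destination_branch=Variable.get("sourceBranch"),
        msg="Merge ETL output back into production",
        metadata={"workflow": "lakefs_wrapper_sketch"},
    )

    create_branch >> run_etl >> merge_branch


lakefs_wrapper_sketch()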
- Download the wrapper DAG that lakeFS created to trigger the Airflow tutorial DAG on an isolated branch, or run the following command on your computer:
wget https://raw.githubusercontent.com/treeverse/lakeFS-samples/main/03-multiple-samples/airflow/Existing_DAG/lakefs_wrapper_dag.py
- If you are using Managed Airflow on AWS, follow the next few instructions. Otherwise, copy the downloaded lakefs_wrapper_dag.py file into the DAGs folder referenced in your Airflow configuration file (airflow.cfg) and go to Step 8.
Note: The default location for your DAGs is ~/airflow/dags.
- Go to the S3 bucket you created in Step 1 and click the “dags” link to open the dags sub-folder:
- Click the “Upload” button inside the dags sub-folder:
- Click the “Add files” button and select the lakefs_wrapper_dag.py file from your computer, then click the “Upload” button to upload it:
Step 8 – Trigger DAG
- You should see these 2 DAGs in the Airflow UI:
Note: You might need to wait a minute for the DAGs to appear in your UI.
- You can unpause and trigger the DAGs using either the Airflow UI or the CLI.
- Use the toggle switch in the UI to unpause both DAGs, or use the CLI commands in the next instruction:
- Unpause both DAGs using the CLI:
airflow dags unpause lakefs_wrapper_dag
airflow dags unpause lakefs_tutorial_taskflow_api_etl
- You should see these 2 DAGs unpaused in the Airflow UI:
- Click the Play button for lakefs_wrapper_dag and then “Trigger DAG”, or use the CLI command in the next instruction (IMPORTANT: trigger lakefs_wrapper_dag only):
- Trigger only lakefs_wrapper_dag using the CLI:
airflow dags trigger lakefs_wrapper_dag
Step 9 – Visualize DAG
- To access the graphical representation of the DAG, click on lakefs_wrapper_dag in the Airflow UI:
- Click the “Graph” icon to see the graphical representation of the DAG and the status of each task:
- As the DAG runs, switch over to the lakeFS UI and observe the branches, merges, and commits taking place via the Airflow DAG:
- You can also click on any completed task box in the Airflow UI, then click the Log button and search for “lakeFS URL” (this URL will take you to the applicable branch/commit/data file).
Next steps
In this guide, you learned how to integrate Apache Airflow and lakeFS. For more examples of how Airflow and lakeFS work better together, read our blog or join our Slack community.