Data changes frequently, which makes it difficult to keep track of its exact state over time. Often, people maintain only one state of their data: its current state. Data lake best practices call for reproducibility, which lets us time travel between different versions of the data and take a snapshot of it at different times and at different stages of alteration. We can also create branches and test new versions on the same input data. In this article we share how to achieve reproducible data with lakeFS, and dive into two specific examples of data lake best practices:
- Building Reproducible Data Pipelines with Airflow and lakeFS
- Building Reproducible Experiments with Kubeflow and lakeFS
The Benefits of Reproducible data
Frequent data changes are a necessity when working with large amounts of data, yet they can hurt the work, because it becomes hard to:
- Debug a data issue
- Validate machine learning training accuracy (re-running a model over different data gives different results)
- Comply with data audits
Exposing a Git-like interface to data allows keeping track of more than just the current state of data. It enables operations such as branching or committing over large datasets. The result is repeatable, atomic, and versioned data lake operations, which make the data easier to manage.
How do I achieve Reproducible data with lakeFS?
To make data reproducible, we recommend taking a new commit of your lakeFS repository every time the data in it changes. As long as there’s a commit taken, the process to reproduce a given state is as simple as reading the data from a path that includes the unique commit_id generated for each commit.
To read data at its current state, we can use a static path containing the repository and branch names. For example, if you have a repository named example with a branch named main, reading the latest state of this data into a Spark Dataframe is always:
df = spark.read.parquet("s3://example/main/")
Note: The code above assumes that all objects in the repository under this path are stored in parquet format. If a different format is used, the applicable Spark read method should be used.
In a lakeFS repository, we are capable of taking many commits over the data, making many points in time reproducible.
In the repository above, a new commit is taken each time a model training script is run, and the commit message includes the specific run number.
If we wanted to re-run the model training script and reproduce the exact same results for a historical run, say run #435, we could copy the commit ID associated with the run and read the data into a dataframe like so:
df = spark.read.parquet("s3://example/296e54fbee5e176f3f4f4aeb7e087f9d57515750e8c3d033b8b841778613cb23/training_dataset/")
The ability to reference a specific commit_id in code simplifies reproducing the specific state of a data collection, or even of multiple collections. This has many applications that are common in data development, such as historical debugging, identifying deltas in a data collection, audit compliance, and more.
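The two read patterns above differ only in the ref segment of the path: a branch name for the latest state, a commit ID for a fixed state. A minimal sketch of a helper that builds such paths follows. The name lakefs_path is hypothetical and is not part of lakeFS or Spark.

```python
def lakefs_path(repository, ref, path=""):
    """Build a path of the form s3://<repository>/<ref>/<path>.

    `ref` may be a branch name (latest state) or a commit ID (fixed state).
    """
    if not repository or not ref:
        raise ValueError("repository and ref are required")
    return f"s3://{repository}/{ref}/{path.lstrip('/')}"


# Latest state of the main branch.
latest = lakefs_path("example", "main")

# Fixed state at the commit taken for a historical run.
commit_id = "296e54fbee5e176f3f4f4aeb7e087f9d57515750e8c3d033b8b841778613cb23"
pinned = lakefs_path("example", commit_id, "training_dataset/")

print(latest)
print(pinned)
```

Either string can then be passed to spark.read.parquet, so switching a job from "latest" to "as of run #435" is a one-argument change.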
Building Reproducible Data Pipelines with Airflow and lakeFS
There are two aspects we will need to handle in order to run Airflow with lakeFS:
Access and Insert data through lakeFS
Since lakeFS supports the AWS S3 API, it works seamlessly with all operators that work on top of S3 (such as SparkSubmitOperator, S3FileTransformOperator, etc.).
All we need to do is set lakeFS as the endpoint URL and use our lakeFS credentials instead of our S3 credentials, and that’s about it.
We can then run tasks on lakeFS using the lakeFS path convention s3://[REPOSITORY]/[BRANCH]/PATH/TO/OBJECT.
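As an illustration of "set the endpoint and swap the credentials", here is a minimal sketch for Spark jobs that read through Hadoop's S3A connector. The fs.s3a.* keys are standard Hadoop S3A settings. The helper name lakefs_s3a_conf is hypothetical, and we assume your job uses S3A (paths with the s3a:// scheme); other S3 clients expose the same three settings under different names.

```python
def lakefs_s3a_conf(endpoint, access_key, secret_key):
    """Return Spark settings that point the S3A connector at a lakeFS server."""
    return {
        "spark.hadoop.fs.s3a.endpoint": endpoint,
        "spark.hadoop.fs.s3a.access.key": access_key,
        "spark.hadoop.fs.s3a.secret.key": secret_key,
        # Address objects as <endpoint>/<repository>/... rather than by
        # bucket subdomain, since the repository name takes the bucket's place.
        "spark.hadoop.fs.s3a.path.style.access": "true",
    }


conf = lakefs_s3a_conf(
    "https://lakefs.example.com",
    "AKIAIOSFODNN7EXAMPLE",
    "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
)

# Print the setting names only, so the secret does not end up in logs.
for key in sorted(conf):
    print(key)
```

A dictionary like this can be handed to the conf argument of SparkSubmitOperator, or applied one key at a time with SparkSession.builder.config(key, value).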
Run lakeFS commands such as creating branches, committing, merging, etc.
We currently have two options to run lakeFS commands with Airflow.
lakeFS will soon have a dedicated Airflow operator, but until then, using the BashOperator with lakectl commands is quite simple.
For example, committing with the bash operator would look like this:
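# Airflow 2 import path; older releases use airflow.operators.bash_operator.
from airflow.operators.bash import BashOperator

# Commit the output of the extract task; `dag` is the DAG object defined elsewhere.
commit_extract = BashOperator(
    task_id='commit_extract',
    bash_command='lakectl commit lakefs://example_repo@example_dag_branch -m "extract data"',
    dag=dag,
)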
After adding commits to our DAG it will look like this:
Looking at committed data
After running the DAG, we can go to the lakeFS UI and explore the changes it produced.
We can view the list of commits.
And we can explore the data at a specific commit.
From this view, you can compare the two commits to see how a task affects your data (e.g. review the changes done by the transform task).
Data can easily be read from a commit into a Spark DataFrame for reproduction by passing the commit ID in the path, as in the training_dataset example earlier.
Since commits in lakeFS are guaranteed to be immutable, we can reproduce the transform phase exactly, using the input data as it existed at the time of computation. Storing the Git commit hash of the job as a metadata field in the lakeFS commit lets us pair that data with the same code.
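One way to record the Git commit hash is through lakectl's key=value metadata flag on commit. The sketch below builds a shell command suitable for a BashOperator's bash_command. The helper name, the metadata key git_commit_hash, and the hash value are all hypothetical, and the --meta flag and the lakefs://repo/branch URI form should be checked against the CLI reference for your lakectl version.

```python
import shlex


def lakectl_commit_command(branch_uri, message, git_commit_hash):
    """Build a `lakectl commit` command that records the job's Git hash."""
    args = [
        "lakectl", "commit", branch_uri,
        "-m", message,
        # Assumed metadata key; any key name works as long as it is consistent.
        "--meta", f"git_commit_hash={git_commit_hash}",
    ]
    # Quote each argument so messages with spaces survive the shell.
    return " ".join(shlex.quote(arg) for arg in args)


cmd = lakectl_commit_command(
    "lakefs://example-repo/example-branch",
    "transform data",
    "0a1b2c3",  # placeholder; in practice, the output of `git rev-parse HEAD`
)
print(cmd)
```

To reproduce a run later, read the metadata from the lakeFS commit, check out that Git hash, and read the data using the lakeFS commit ID.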
Building Reproducible Experiments with Kubeflow and lakeFS
Machine Learning Experiment – Reproducible data
Reproducibility of a machine learning experiment is a useful property in many situations. Say one day you report 98% accuracy of your model on a training dataset, and the next day you see that value drop 10 points. It’s useful to be able to re-create the original accuracy, if only to prove that you’re not crazy and didn’t make some silly error.
There are different ways people go about making training and testing datasets reproducible in this way to a specific point in time. One approach involves copying a portion of the production dataset to a separate location or under a different name. Another approach involves hardcoding specific date ranges in the query that sources the data to the model.
While both approaches can work, they incur unnecessary cost in data duplication or involve a manual, error-prone process of date manipulation.
In contrast, lakeFS commits provide an elegant solution to the problem of selecting a specific point-in-time slice of a dataset or datasets. For this reason, it makes sense to include lakeFS operations like branching and committing in a Kubeflow pipeline and achieve full experiment reproducibility.
The only question left is how to do this. Absent a native integration between the two tools, we need to implement our own solution to make it work.
We’re happy to share that we were able to make it work. Let’s see how!
Running lakeFS commands within a Kubeflow component
The way to integrate lakeFS with a Kubeflow pipeline is to create Kubeflow components that perform lakeFS operations. Currently, there are two methods to create lakeFS ContainerOps classes:
- Implement a function-based ContainerOp that uses lakeFS’s Python API to invoke lakeFS operations.
- Implement a ContainerOp that uses the lakectl CLI docker image to invoke lakeFS operations.
The code example below shows how to create a Kubeflow component that performs the create branch lakeFS operation.
Create Branch Example Operation
from kfp import components

def create_branch(repo_name, branch_name, source_branch):
    # Imports live inside the function so they run in the component's container.
    import lakefs_client
    from lakefs_client import models
    from lakefs_client.client import LakeFSClient

    # lakeFS credentials and endpoint
    configuration = lakefs_client.Configuration()
    configuration.username = 'AKIAIOSFODNN7EXAMPLE'
    configuration.password = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
    configuration.host = 'https://lakefs.example.com'
    client = LakeFSClient(configuration)

    client.branches.create_branch(
        repository=repo_name,
        branch_creation=models.BranchCreation(name=branch_name, source=source_branch))

# Convert the function to a Kubeflow pipeline step.
create_branch_op = components.func_to_container_op(
    func=create_branch,
    # Type in the lakeFS version you are using
    packages_to_install=['lakefs_client==<lakeFS version>'])
Any lakeFS operation supported by the OpenAPI can be included in a component. For example, you could implement a commit and merge function-based Container Operator. Check out the API reference for a complete list of methods.
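As a sketch of what a commit component could look like, the function below follows the same pattern as create_branch. The commits.commit call and the CommitCreation model reflect our understanding of the lakefs_client package, so verify them against the API reference for your version. Passing the endpoint and credentials as parameters, rather than hardcoding them, is our own choice for this illustration.

```python
def commit_changes(repo_name, branch_name, message, host, access_key, secret_key):
    """Commit the uncommitted changes on a lakeFS branch."""
    # Imports live inside the function so they run in the component's container.
    import lakefs_client
    from lakefs_client import models
    from lakefs_client.client import LakeFSClient

    # lakeFS credentials and endpoint, supplied by the caller
    configuration = lakefs_client.Configuration()
    configuration.username = access_key
    configuration.password = secret_key
    configuration.host = host
    client = LakeFSClient(configuration)

    client.commits.commit(
        repository=repo_name,
        branch=branch_name,
        commit_creation=models.CommitCreation(message=message))


# Converting it to a pipeline step mirrors create_branch_op:
#
#   from kfp import components
#   commit_changes_op = components.func_to_container_op(
#       func=commit_changes,
#       packages_to_install=['lakefs_client==<lakeFS version>'])
```

A merge component would have the same shape, with the merge method from the API reference in place of the commit call.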
The other approach to running lakeFS commands within Kubeflow uses a non-function based ContainerOp. In this approach, you should use the treeverse/lakectl docker image. Within the container built from this image you can run lakeFS CLI commands to execute the desired lakeFS operations.
For lakectl to work with Kubeflow, you will need to pass your lakeFS configurations as environment variables named:
LAKECTL_CREDENTIALS_ACCESS_KEY_ID: AKIAIOSFODNN7EXAMPLE
LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
LAKECTL_SERVER_ENDPOINT_URL: https://lakefs.example.com
Commit Example Operation
1. Commit changes to a branch: A ContainerOp that commits uncommitted changes to example-branch on example-repo.
from kfp import dsl
from kubernetes.client.models import V1EnvVar

def commit_op():
    # Parentheses let the chained calls span several lines.
    return (
        dsl.ContainerOp(
            name='commit',
            image='treeverse/lakectl',
            arguments=['commit', 'lakefs://example-repo/example-branch', '-m', 'commit message'])
        .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_ACCESS_KEY_ID', value='AKIAIOSFODNN7EXAMPLE'))
        .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY', value='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'))
        .add_env_variable(V1EnvVar(name='LAKECTL_SERVER_ENDPOINT_URL', value='https://lakefs.example.com'))
    )
Merge Example Operation
2. Merge two lakeFS branches: A ContainerOp that merges example-branch into the main branch of example-repo.
def merge_op():
    # Parentheses let the chained calls span several lines.
    return (
        dsl.ContainerOp(
            name='merge',
            image='treeverse/lakectl',
            arguments=['merge', 'lakefs://example-repo/example-branch', 'lakefs://example-repo/main'])
        .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_ACCESS_KEY_ID', value='AKIAIOSFODNN7EXAMPLE'))
        .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY', value='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'))
        .add_env_variable(V1EnvVar(name='LAKECTL_SERVER_ENDPOINT_URL', value='https://lakefs.example.com'))
    )
You can invoke any lakeFS operation supported by lakectl by implementing it as a ContainerOp. Check out the complete CLI reference for the list of supported operations.
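Both ContainerOps repeat the same three environment variables with literal credentials. A minimal sketch of one way to collect them in a single place follows. The helper lakectl_env and the constant LAKECTL_ENV_NAMES are hypothetical names, and reading the values from the environment of the process that builds the pipeline is an assumption about your setup.

```python
import os

# The three settings lakectl reads from its environment.
LAKECTL_ENV_NAMES = (
    "LAKECTL_CREDENTIALS_ACCESS_KEY_ID",
    "LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY",
    "LAKECTL_SERVER_ENDPOINT_URL",
)


def lakectl_env(source=None):
    """Collect the lakectl settings from `source` (defaults to os.environ)."""
    source = os.environ if source is None else source
    missing = [name for name in LAKECTL_ENV_NAMES if not source.get(name)]
    if missing:
        # Fail when the pipeline is built, not when the container starts.
        raise KeyError("missing lakectl settings: " + ", ".join(missing))
    return {name: source[name] for name in LAKECTL_ENV_NAMES}


example_env = lakectl_env({
    "LAKECTL_CREDENTIALS_ACCESS_KEY_ID": "AKIAIOSFODNN7EXAMPLE",
    "LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    "LAKECTL_SERVER_ENDPOINT_URL": "https://lakefs.example.com",
})
print(sorted(example_env))
```

Inside a ContainerOp factory, the result can be applied in a loop: for name, value in lakectl_env().items(): op.add_env_variable(V1EnvVar(name=name, value=value)).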
Adding lakeFS Components to a Kubeflow pipeline
Let’s put it all together and create an example Kubeflow pipeline that implements a simple ETL workflow and uses lakeFS operators for branch creation and commits.
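def lakectl_pipeline():
    # example_extract_op, example_transform_op and example_load_op are placeholders not defined in this article.
    create_branch_task = create_branch_op('example-repo', 'example-branch', 'main')  # A function-based component
    # The tasks share no data dependencies, so .after() makes the intended order explicit.
    extract_task = example_extract_op().after(create_branch_task)
    commit_extract_task = commit_op().after(extract_task)
    transform_task = example_transform_op().after(commit_extract_task)
    commit_transform_task = commit_op().after(transform_task)
    load_task = example_load_op().after(commit_transform_task)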
Thank you for reading how to create Reproducible data with lakeFS. With lakeFS, your data lake is versioned and you can easily time-travel between consistent snapshots of the lake. The ability to reproduce and work on an isolated branch can also be useful to backfill, manipulate data, run end-to-end tests on a dedicated branch, and more.