Introducing Kubeflow and lakeFS
Kubeflow is a cloud-native ML platform that simplifies the training and deployment of machine learning pipelines on Kubernetes. An ML project using Kubeflow will consist of isolated components for each stage of the ML lifecycle. And each component of a Kubeflow pipeline is packaged as a Docker image and executed in a Kubernetes Pod.
The result is a machine learning workflow that is portable, scalable, and reproducible. These are properties that align Kubeflow nicely with the ethos of lakeFS for data.
lakeFS is wrapper layer around an object store that enables git-like operations such as branching or committing over large datasets. The result is repeatable, atomic and versioned data lake operations resulting is better manageability of data.
Machine Learning Experiment Reproducibility
Reproducibility of a machine learning experiment is a useful property in many situations. Say one day you report 98% accuracy of your model on a training dataset and the next day you see that value drop 10 points. It’s useful to be able to re-create the original accuracy, if only to prove that you’re not crazy or made some silly error.
There are different ways people go about making training and testing datasets reproducible in this way to specific point in time. One approach involves copying a portion of the production dataset to a separate location or under a different name. Another approach involves hardcoding specific date ranges in the query that sources the data to the model.
While both approaches can work, they incur unnecessary cost in data duplication or involve a manual, error-prone process of date manipulation.
In contrast, LakeFS commits provide an elegant solution to the problem of selecting a specific point-in-time slice of a dataset or datasets. For this reason, it makes sense to include lakeFS operations like branching and committing to a Kubeflow pipeline and achieve full experiment reproducibility.
The only question left is how to do this. Absent a native integration between the two tools, we need to implement our own solution to make it work.
We’re happy to share that we were able to make it work. Let’s see how!
Running lakeFS commands within a Kubeflow component
The way to integrate lakeFS with a Kubeflow pipeline is to create Kubeflow components that perform lakeFS operations. Currently, there are two methods to create lakeFS ContainerOps classes:
- Implement a function-based ContainerOp that uses lakeFS’s Python API to invoke lakeFS operations.
- Implement a ContainerOp that uses the
lakectlCLI docker image to invoke lakeFS operations.
To implement a function-based component that invokes lakeFS operations, you can use the pip-installable lakeFS Python OpenAPI client.
The code example below shows how to create a Kubeflow component that performs the create branch lakeFS operation.
Create Branch Example Operation
from kfp import components def create_branch(repo_name, branch_name, source_branch): import lakefs_client from lakefs_client import models from lakefs_client.client import LakeFSClient # lakeFS credentials and endpoint configuration = lakefs_client.Configuration() configuration.username = 'AKIAIOSFODNN7EXAMPLE' configuration.password = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' configuration.host = 'https://lakefs.example.com' client = LakeFSClient(configuration) client.branches.create_branch( repository=repo_name, branch_creation=models.BranchCreation( name=branch_name, source=source_branch) ) # Convert the function to a lakeFS pipeline step. create_branch_op = components.func_to_container_op( func=create_branch, # Type in the lakeFS version you are using packages_to_install=['lakefs_client==<lakeFS version>'])
Any lakeFS operation supported by the OpenAPI can be included in a component. For example, you could implement a commit and merge function-based Container Operator. Check out the API reference for a complete list of methods.
The other approach to running lakeFS commands within Kubeflow uses a non-function based ContainerOp. In this approach, you should use the
treeverse/lakectl docker image. Within the container built from this image you can run lakeFS CLI commands to execute the desired lakeFS operations.
lakectl to work with Kubeflow, you will need to pass your lakeFS configurations as environment variables named:
Commit Example Operation
- Commit changes to a branch: A ContainerOp that commits uncommitted changes to
from kubernetes.client.models import V1EnvVar def commit_op(): return dsl.ContainerOp( name='commit', image='treeverse/lakectl', arguments=['commit', 'lakefs://example-repo/example-branch', '-m', 'commit message']) .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_ACCESS_KEY_ID',value='AKIAIOSFODNN7EXAMPLE')) .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY',value='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')) .add_env_variable(V1EnvVar(name='LAKECTL_SERVER_ENDPOINT_URL',value='https://lakefs.example.com') )
Merge Example Operation
2. Merge two lakeFS branches: A ContainerOp that merges
example-branch into the
main branch of
def merge_op(): return dsl.ContainerOp( name='merge', image='treeverse/lakectl', arguments=['merge', 'lakefs://example-repo/example-branch', 'lakefs://example-repo/main']) .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_ACCESS_KEY_ID',value='AKIAIOSFODNN7EXAMPLE')) .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY',value='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')) .add_env_variable(V1EnvVar(name='LAKECTL_SERVER_ENDPOINT_URL',value='https://lakefs.example.com') )
You can invoke any lakeFS operation supported by
lakectl by implementing it as a ContainerOp. Check out the complete CLI reference for the list of supported operations.
Adding lakeFS Components to a Kubeflow pipeline
Let’s put it all together, and create an example Kubeflow pipeline that implements a simple ETL workflow, and makes use of lakeFS operators to feature branch creation and commits.
def lakectl_pipeline(): create_branch_task = create_branch_op('example-repo', 'example-branch', 'main') # A function-based component extract_task = example_extract_op() commit_task = commit_op() transform_task = example_transform_op() commit_task = commit_op() load_task = example_load_op()
We hope this post is helpful for anyone looking to integrate lakeFS and Kubeflow in a machine learning workflow. Although the above approach works, it does involve creating custom components and could be made smoother with a native integration.
The lakeFS project is an open source technology that provides a git-like version control interface for data lakes, with seamless integration to popular data tools and frameworks.
Our mission is to maximize the manageability of open source data analytics solutions that scale.
Read Related Articles.
Best Practices to Easily Adopt lakeFS
lakeFS is gaining momentum as a solution for data versioning on top of an object store, and more and more data driven organizations adopt lakeFS
lakeFS Product Offerings Overview: Open Source vs. Enterprise vs. Cloud
What is lakeFS? lakeFS is a platform that helps data engineers build scalable and resilient data lakes running on object storage. It provides version control,
The Airflow and lakeFS Integration: Step-by-Step Configuration Tutorial
Introduction lakeFS makes creating isolated environments for data ingestion instantaneous so you can run data ingestion jobs without impacting your production data and merge ingested