Tal Sofer, Paul Singman
June 30, 2021

Introducing Kubeflow and lakeFS

Kubeflow is a cloud-native ML platform that simplifies the training and deployment of machine learning pipelines on Kubernetes. An ML project using Kubeflow consists of isolated components for each stage of the ML lifecycle. Each component of a Kubeflow pipeline is packaged as a Docker image and executed in a Kubernetes Pod.

The result is a machine learning workflow that is portable, scalable, and reproducible. These are properties that align Kubeflow nicely with the ethos of lakeFS for data. 

lakeFS is a wrapper layer around an object store that enables git-like operations, such as branching and committing, over large datasets. The result is repeatable, atomic, and versioned data lake operations, which make data easier to manage.

Machine Learning Experiment Reproducibility

Reproducibility of a machine learning experiment is a useful property in many situations. Say one day you report 98% accuracy of your model on a training dataset and the next day you see that value drop 10 points. It’s useful to be able to re-create the original accuracy, if only to prove that you’re not crazy and didn’t make some silly error.

There are different ways people go about making training and testing datasets reproducible to a specific point in time. One approach involves copying a portion of the production dataset to a separate location or under a different name. Another approach involves hardcoding specific date ranges in the query that sources the data to the model.

While both approaches can work, they incur unnecessary cost in data duplication or involve a manual, error-prone process of date manipulation.

In contrast, lakeFS commits provide an elegant solution to the problem of selecting a specific point-in-time slice of a dataset or datasets. For this reason, it makes sense to include lakeFS operations like branching and committing in a Kubeflow pipeline to achieve full experiment reproducibility.
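To make the point-in-time idea concrete, here is a minimal sketch of addressing data by commit. It assumes the data is read through lakeFS's S3-compatible gateway, where the repository takes the place of the bucket and the first path element is a ref (a branch name or a commit ID). The helper name `pinned_path`, the commit ID, and the file path are hypothetical and only illustrate the pattern.

```python
def pinned_path(repo: str, ref: str, path: str, scheme: str = "s3") -> str:
    """Build an object URI that reads `path` as of `ref`.

    `ref` can be a branch name (which moves as new commits land) or a
    commit ID (which is immutable, so the URI always resolves to the same data).
    """
    if not repo or not ref:
        raise ValueError("repo and ref must be non-empty")
    return f"{scheme}://{repo}/{ref}/{path.lstrip('/')}"


# Hypothetical values: record the commit ID next to the experiment results,
# then reuse it later to re-read exactly the same training data.
training_commit = "c1a2b3d4"  # placeholder, not a real commit ID
print(pinned_path("example-repo", training_commit, "datasets/train.parquet"))
```

Storing the commit ID with each experiment run is what replaces the copied datasets or hardcoded date ranges described above.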

The only question left is how to do this. Absent a native integration between the two tools, we need to implement our own solution to make it work.

We’re happy to share that we were able to make it work. Let’s see how!

Running lakeFS commands within a Kubeflow component

The way to integrate lakeFS with a Kubeflow pipeline is to create Kubeflow components that perform lakeFS operations. Currently, there are two ways to create lakeFS ContainerOps:

  1. Implement a function-based ContainerOp that uses lakeFS’s Python API to invoke lakeFS operations.
  2. Implement a ContainerOp that uses the lakectl CLI Docker image to invoke lakeFS operations.

Function-based ContainerOps

To implement a function-based component that invokes lakeFS operations, you can use the pip-installable lakeFS Python OpenAPI client.

The code example below shows how to create a Kubeflow component that performs the create branch lakeFS operation. 

Create Branch Example Operation
from kfp import components

def create_branch(repo_name, branch_name, source_branch):
    # Imports go inside the function so they are available in the component's container.
    import lakefs_client
    from lakefs_client import models
    from lakefs_client.client import LakeFSClient

    # lakeFS credentials and endpoint
    configuration = lakefs_client.Configuration()
    configuration.username = 'AKIAIOSFODNN7EXAMPLE'
    configuration.password = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
    configuration.host = 'https://lakefs.example.com'
    client = LakeFSClient(configuration)

    client.branches.create_branch(
        repository=repo_name,
        branch_creation=models.BranchCreation(
            name=branch_name,
            source=source_branch))

# Convert the function to a lakeFS pipeline step.
create_branch_op = components.func_to_container_op(
    func=create_branch,
    # Type in the lakeFS version you are using
    packages_to_install=['lakefs_client==<lakeFS version>'])

Any lakeFS operation supported by the OpenAPI can be included in a component. For example, you could implement function-based ContainerOps for commit and merge. Check out the API reference for a complete list of methods.
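As an illustration, a commit component could follow the same pattern as the create branch example. This is a sketch rather than a definitive implementation: it assumes the client exposes `client.commits.commit` and `models.CommitCreation`, so check the API reference for the lakeFS version you run. The function name `commit_branch` is hypothetical, and the credentials are the same placeholders used above.

```python
def commit_branch(repo_name, branch_name, message):
    # Imports go inside the function so they are available when the
    # function body runs inside the component's container.
    import lakefs_client
    from lakefs_client import models
    from lakefs_client.client import LakeFSClient

    # lakeFS credentials and endpoint (placeholder values)
    configuration = lakefs_client.Configuration()
    configuration.username = 'AKIAIOSFODNN7EXAMPLE'
    configuration.password = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
    configuration.host = 'https://lakefs.example.com'
    client = LakeFSClient(configuration)

    # Assumed client call: commit all uncommitted changes on the branch.
    client.commits.commit(
        repository=repo_name,
        branch=branch_name,
        commit_creation=models.CommitCreation(message=message),
    )
```

The function can then be converted into a pipeline step with `components.func_to_container_op`, in the same way as `create_branch`.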

Non-function-based ContainerOps

The other approach to running lakeFS commands within Kubeflow uses a non-function-based ContainerOp. In this approach, you use the treeverse/lakectl Docker image. Within the container built from this image you can run lakeFS CLI commands to execute the desired lakeFS operations.

For lakectl to work with Kubeflow, you will need to pass your lakeFS configurations as environment variables named:

  • LAKECTL_CREDENTIALS_ACCESS_KEY_ID: AKIAIOSFODNN7EXAMPLE
  • LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
  • LAKECTL_SERVER_ENDPOINT_URL: https://lakefs.example.com
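
If you would rather not hardcode these values in pipeline code, one option is to collect them from the environment of the process that builds the pipeline. The sketch below is only an illustration under that assumption: the helper `lakectl_env` is hypothetical, and the variable names follow the ones used in the ContainerOp examples in this post.

```python
import os

# The three settings lakectl needs, as named in this post's ContainerOp examples.
REQUIRED_LAKECTL_VARS = (
    "LAKECTL_CREDENTIALS_ACCESS_KEY_ID",
    "LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY",
    "LAKECTL_SERVER_ENDPOINT_URL",
)


def lakectl_env(source=None):
    """Return the lakectl settings found in `source` (defaults to os.environ).

    Raises KeyError that lists every missing or empty setting, so a
    misconfigured pipeline fails before any component is created.
    """
    source = os.environ if source is None else source
    missing = [name for name in REQUIRED_LAKECTL_VARS if not source.get(name)]
    if missing:
        raise KeyError("missing lakectl settings: " + ", ".join(missing))
    return {name: source[name] for name in REQUIRED_LAKECTL_VARS}
```

Each name and value pair in the returned dictionary can then be passed to `V1EnvVar` when building a ContainerOp.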

Commit Example Operation

  1. Commit changes to a branch: A ContainerOp that commits uncommitted changes to example-branch on example-repo.

from kfp import dsl
from kubernetes.client.models import V1EnvVar

def commit_op():
    # The outer parentheses let the chained calls span several lines.
    return (
        dsl.ContainerOp(
            name='commit',
            image='treeverse/lakectl',
            arguments=['commit', 'lakefs://example-repo/example-branch', '-m', 'commit message'])
        .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_ACCESS_KEY_ID', value='AKIAIOSFODNN7EXAMPLE'))
        .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY', value='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'))
        .add_env_variable(V1EnvVar(name='LAKECTL_SERVER_ENDPOINT_URL', value='https://lakefs.example.com'))
    )

Merge Example Operation

  2. Merge two lakeFS branches: A ContainerOp that merges example-branch into the main branch of example-repo.

from kfp import dsl
from kubernetes.client.models import V1EnvVar

def merge_op():
    # The outer parentheses let the chained calls span several lines.
    return (
        dsl.ContainerOp(
            name='merge',
            image='treeverse/lakectl',
            arguments=['merge', 'lakefs://example-repo/example-branch', 'lakefs://example-repo/main'])
        .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_ACCESS_KEY_ID', value='AKIAIOSFODNN7EXAMPLE'))
        .add_env_variable(V1EnvVar(name='LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY', value='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'))
        .add_env_variable(V1EnvVar(name='LAKECTL_SERVER_ENDPOINT_URL', value='https://lakefs.example.com'))
    )

You can invoke any lakeFS operation supported by lakectl by implementing it as a ContainerOp. Check out the complete CLI reference for the list of supported operations.
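Since each lakectl-based ContainerOp differs mainly in its `arguments` list, it can help to build those lists in one place. The helpers below are a hypothetical sketch that mirrors the argument patterns of the commit and merge examples in this post; they are not part of lakectl or Kubeflow.

```python
def lakefs_uri(repo: str, ref: str) -> str:
    """Format a repository and ref (branch or commit) as a lakefs:// URI."""
    return f"lakefs://{repo}/{ref}"


def commit_args(repo: str, branch: str, message: str) -> list:
    """Arguments for `lakectl commit` on the given branch."""
    return ["commit", lakefs_uri(repo, branch), "-m", message]


def merge_args(repo: str, source_branch: str, destination_branch: str) -> list:
    """Arguments for `lakectl merge`, source first and destination second."""
    return [
        "merge",
        lakefs_uri(repo, source_branch),
        lakefs_uri(repo, destination_branch),
    ]


print(commit_args("example-repo", "example-branch", "commit message"))
print(merge_args("example-repo", "example-branch", "main"))
```

The returned lists can be passed directly as the `arguments` parameter of a ContainerOp that uses the treeverse/lakectl image.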

Adding lakeFS Components to a Kubeflow pipeline

Let’s put it all together and create an example Kubeflow pipeline that implements a simple ETL workflow and uses lakeFS components for branch creation and commits.

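def lakectl_pipeline():
    # example_extract_op, example_transform_op, and example_load_op stand in for your own ETL components.
    create_branch_task = create_branch_op('example-repo', 'example-branch', 'main')  # A function-based component
    # Tasks without dependencies are not ordered, so chain them explicitly with .after().
    extract_task = example_extract_op().after(create_branch_task)
    commit_extract_task = commit_op().after(extract_task)
    transform_task = example_transform_op().after(commit_extract_task)
    commit_transform_task = commit_op().after(transform_task)
    load_task = example_load_op().after(commit_transform_task)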

Getting Involved

We hope this post is helpful for anyone looking to integrate lakeFS and Kubeflow in a machine learning workflow. Although the above approach works, it does involve creating custom components and could be made smoother with a native integration. 

If this is something you have interest in:
