Iddo Avneri, Author

Last updated on April 26, 2024

Introduction

Schema validation ensures that the data stored in the lake conforms to a predefined schema, which specifies the structure, format, and constraints of the data.

It’s important for:

  • Consistency: Data lakes typically store large volumes of data from a variety of sources. Without schema validation, there is a risk that data with inconsistent or invalid formats could be stored in the lake, which could lead to errors and inconsistencies when processing the data.
  • Quality: Schema validation helps to ensure that the data stored in the lake is of high quality. By enforcing data constraints and standards, schema validation can help to identify and flag data quality issues, such as missing or incorrect values, before they can cause problems downstream.
  • Efficiency: By enforcing a consistent schema across all data in the lake, schema validation can help to streamline data processing and analysis. This can help to reduce the time and effort required to clean, transform, and analyze the data, and can improve the overall efficiency of the data pipeline.
  • Compliance: Many organizations are subject to regulatory and compliance requirements that require them to maintain strict data standards and controls. Schema validation can help to ensure that the data stored in the lake meets these requirements, and can provide a clear audit trail of data lineage and quality.

Data lakes provide a lot of flexibility compared to a more rigid data model that is available in a data warehouse; there are typically a variety of data sources ingested into a data lake. Different data sources may have different schema definitions, and schema evolution may occur over time as new data is added. This can make it difficult to enforce a consistent schema across all data in the lake, and can lead to inconsistencies and errors when processing the data.

Moreover, due to the ever-growing complexity of data pipelines built on top of data lakes, there is rarely one consistent schema. Data pipelines may involve multiple steps and transformations, each of which may require different schema definitions. As data is processed and transformed, the schema may change, making it difficult to enforce schema validation across the entire pipeline.
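To make the drift problem concrete, here is a minimal, dependency-free sketch (illustrative only; it is not how lakeFS itself implements validation) that compares a reference schema against an incoming one:

```python
# Illustrative only: model schemas as {column_name: type_name} dicts and
# report how an incoming schema drifted from the reference.

def schema_diff(reference: dict, incoming: dict) -> dict:
    """Return columns added, removed, or retyped in the incoming schema."""
    added = sorted(set(incoming) - set(reference))
    removed = sorted(set(reference) - set(incoming))
    retyped = sorted(c for c in set(reference) & set(incoming)
                     if reference[c] != incoming[c])
    return {"added": added, "removed": removed, "retyped": retyped}

reference = {"User_ID": "int", "Country": "string", "Gender": "string"}
incoming = {"User_ID": "int", "Country_Name": "string", "Gender": "string"}

diff = schema_diff(reference, incoming)
print(diff)  # a rename shows up as one removal plus one addition
```

Note that a simple comparison like this cannot tell a rename apart from an unrelated drop-and-add, which is one reason validation is usually enforced at a controlled promotion point rather than ad hoc.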

Schema validation: Approach

Like source control systems, lakeFS allows you to configure actions that are triggered when predefined events occur: these are lakeFS hooks.

Hooks are custom scripts or programs that can be executed by the lakeFS platform in response to specific events or actions. These events can include actions such as committing changes, merging branches, creating new branches, creating or deleting tags etc.

For example, a post-create-branch hook runs on the new branch after it has been created, and a pre-merge hook runs on the source branch when a merge occurs, before the merge is finalized.

In this post, we will review, step by step, how to create a pre-merge hook that validates that the schema of Parquet files has not changed before they are merged into production.

Implementation of lakeFS for schema validation

Prerequisites

To follow this guide, you will need:

  1. A lakeFS server (you can either install one, spin one up for free on the cloud, or run one as part of the sample-repo)
  2. OPTIONAL: Use the sample-repo to spin up a notebook you can configure to run against the lakeFS Server. 

In this example, we will create a delta table in an ingest branch and merge it into production. Then, we will change the schema of the table and try to merge it again (simulating promoting data into production):


Step 1: Configuration set up

First, running in a Python notebook, we will set some global variables and install the packages used in this example.

Set up lakeFS’ credentials (if you don’t know your credentials, you can create a new key under the “administration” tab in lakeFS):

lakefsEndPoint = 'https://YOURLAKEFSCLOUD.lakefscloud.io' # e.g. 'https://LAKEFSAddress' or 'http://host.docker.internal:8000' (if lakeFS is running in a local Docker container)
lakefsAccessKey = '' # Your access key
lakefsSecretKey = '' # Your secret key

Next, we will set up some global variables with the names of the repository and branches:

repo = "schema-validation-example-repo"
mainBranch = "main"
ingestionBranch = "ingestion_branch"

Each lakeFS repository must have a unique storage namespace, so we will set that as well:

storageNamespace = 's3://' # e.g. "s3://username-lakefs-cloud/"

If you have not done so before, you will need to configure your storage to work with lakeFS. In this example, I’m using AWS. However, you can use Azure, GCP, or on-premise object stores such as MinIO.


Note: If you spun up a lakeFS Cloud instance, and have not yet connected your storage, copy the storage namespace of the sample repository and append a string to it.
For example, if lakeFS Cloud created this sample-repo for you:

Sample repo created on lakeFS Cloud

You can configure: 

storageNamespace = 's3://lakefs-sample-us-east-1-production/AROA5OU4KHZHHFCX4PTOM:2ae87b7718e5bb16573c021e542dd0ec429b7ccc1a4f9d0e3f17d6ee99253655/my_random_string' 
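Since each repository needs its own unique namespace, one convenient way to build the appended string (a sketch; the base path below is a placeholder, not a real bucket) is a random suffix:

```python
import uuid

# Placeholder: substitute the storage namespace copied from your sample
# repository in the lakeFS UI.
baseNamespace = 's3://your-sample-repo-namespace'

# Appending a random hex suffix keeps the namespace unique per repository.
storageNamespace = f"{baseNamespace}/{uuid.uuid4().hex}"
print(storageNamespace)
```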

We will use Python code in our notebook, so we need to import the lakeFS Python client packages:

import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient

import os
from pyspark.sql.types import ByteType, IntegerType, LongType, StringType, StructType, StructField

And configure our client:

%xmode Minimal
if 'client' not in locals():
    # lakeFS credentials and endpoint
    configuration = lakefs_client.Configuration()
    configuration.username = lakefsAccessKey
    configuration.password = lakefsSecretKey
    configuration.host = lakefsEndPoint

    client = LakeFSClient(configuration)
    print("Created lakeFS client.")

In this example, we will create delta tables. Add the relevant packages:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell'

lakeFS exposes an S3 Gateway that enables applications to communicate with lakeFS the same way they would with S3. (Alternatively, you can use the lakeFS Hadoop file system client).

To configure the gateway:

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", lakefsAccessKey)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", lakefsSecretKey)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", lakefsEndPoint)
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")

We are now all set to start benefiting from lakeFS version control at scale in our notebook.

Step 2: Building the repository and hooks

Using the Python client, we will create our repository:

client.repositories.create_repository(
    repository_creation=models.RepositoryCreation(
        name=repo,
        storage_namespace=storageNamespace,
        default_branch=mainBranch))

In this example, we will use a pre-merge hook to validate that the schema has not changed. Action files must be uploaded with the prefix _lakefs_actions/ to the lakeFS repository. A failure to parse an action file will result in a failed run.

We will upload a hook configuration action file called pre-merge-schema-validation.yaml that has the following content:

#Parquet schema validator
#Args:
# - locations (list of strings): locations to look for parquet files under
# - sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them
# - column_block_list (list of strings): column-name patterns that must not appear
#Example hook declaration: (_lakefs_actions/pre-merge-schema-validation.yaml):
name: pre merge checks on main branch
on:
  pre-merge:
    branches:
      - main
hooks:
  - id: check_schema_changes
    type: lua
    properties:
      script_path: scripts/parquet_schema_change.lua # location of this script in the repository
      args:
        sample: false
        column_block_list: # the Lua script iterates this list, so it must be present
          - Country_.*
        locations:
          - tables/customers/
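Because a malformed action file only fails the run at merge time, it can save a round trip to sanity-check the structure locally before uploading. A minimal illustrative check (assuming the YAML has already been parsed into a dict, e.g. with PyYAML; this is not a lakeFS API):

```python
def check_action(action: dict) -> list:
    """Return a list of structural problems found in a parsed action file."""
    problems = []
    for key in ("name", "on", "hooks"):
        if key not in action:
            problems.append(f"missing top-level key: {key}")
    for hook in action.get("hooks", []):
        for key in ("id", "type"):
            if key not in hook:
                problems.append(f"hook missing key: {key}")
    return problems

# The action above, as it would look after parsing the YAML:
action = {
    "name": "pre merge checks on main branch",
    "on": {"pre-merge": {"branches": ["main"]}},
    "hooks": [{"id": "check_schema_changes", "type": "lua",
               "properties": {"script_path": "scripts/parquet_schema_change.lua"}}],
}
print(check_action(action))  # [] means the basic structure looks right
```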

In our sample repo, this file (pre-merge-schema-validation.yaml) is located in the subdirectory LuaHooks. We need to upload the file to the lakeFS repository under _lakefs_actions:

hooks_config_yaml = "pre-merge-schema-validation.yaml"
hooks_prefix = "_lakefs_actions"
with open(f'./LuaHooks/{hooks_config_yaml}', 'rb') as f:
    client.objects.upload_object(repository=repo,
                                branch=mainBranch,
                                path=f'{hooks_prefix}/{hooks_config_yaml}',
                                content=f
                                )

We just configured an action script that will execute scripts/parquet_schema_change.lua before merging into main. 

Next, we will create the script itself (parquet_schema_change.lua) and upload it to the scripts directory. As you can see, in this example we run hooks without relying on external components, using an embedded Lua VM.

In the sample-repo, this file also sits in the LuaHooks subdirectory and contains this code:

--[[
Parquet schema validator

Args:
- locations (list of strings): locations to look for parquet files under
- sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them
]]


lakefs = require("lakefs")
strings = require("strings")
parquet = require("encoding/parquet")
regexp = require("regexp")
path = require("path")


visited_directories = {}

for _, location in ipairs(args.locations) do
    after = ""
    has_more = true
    need_more = true
    print("checking location: " .. location)
    while has_more do
        print("running diff, location = " .. location .. " after = " .. after)
        local code, resp = lakefs.diff_refs(action.repository_id, action.branch_id, action.source_ref, after, location)
        if code ~= 200 then
            error("could not diff: " .. resp.message)
        end

        for _, result in pairs(resp.results) do
            p = path.parse(result.path)
            print("checking: '" .. result.path .. "'")
            if not args.sample or (p.parent and not visited_directories[p.parent]) then
                if result.path_type == "object" and result.type ~= "removed" then
                    if strings.has_suffix(p.base_name, ".parquet") then
                        -- check it!
                        code, content = lakefs.get_object(action.repository_id, action.source_ref, result.path)
                        if code ~= 200 then
                            error("could not fetch data file: HTTP " .. tostring(code) .. ", body:\n" .. content)
                        end
                        schema = parquet.get_schema(content)
                        for _, column in ipairs(schema) do
                            for _, pattern in ipairs(args.column_block_list) do
                                if regexp.match(pattern, column.name) then
                                    error("Column is not allowed: '" .. column.name .. "': type: " .. column.type .. " in path: " .. result.path)
                                end
                            end
                        end
                        print("\t all columns are valid")
                        visited_directories[p.parent] = true
                    end
                end
            else
                print("\t skipping path, directory already sampled")
            end
        end

        -- pagination
        has_more = resp.pagination.has_more
        after = resp.pagination.next_offset
    end
end
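The heart of the script is the block-list test: every column name in each Parquet file’s schema is matched against the patterns in args.column_block_list, and any match fails the hook. The same check, sketched in Python for understanding only (lakeFS executes the Lua version above):

```python
import re

def blocked_columns(schema, column_block_list):
    """Return (column, pattern) pairs for every column matching a blocked pattern."""
    hits = []
    for column in schema:
        for pattern in column_block_list:
            if re.search(pattern, column):
                hits.append((column, pattern))
    return hits

columns = ["User_ID", "Country_Name", "Gender"]
print(blocked_columns(columns, [r"Country_.*"]))  # [('Country_Name', 'Country_.*')]
```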

We will upload the file (this time parquet_schema_change.lua) from the LuaHooks directory to our lakeFS repository in the location the action configuration file is configured to execute it (i.e. under the scripts folder):

hooks_script = "parquet_schema_change.lua"
scripts_prefix = "scripts"

with open(f'./LuaHooks/{hooks_script}', 'rb') as f:
    client.objects.upload_object(repository=repo,
                                branch=mainBranch,
                                path=f'{scripts_prefix}/{hooks_script}',
                                content=f
                                )

After uploading the action file, we need to commit the changes for them to take effect:

client.commits.commit(
    repository=repo,
    branch=mainBranch,
    commit_creation=models.CommitCreation(
        message='Added hook config file and schema validation scripts'))

If you switch to your lakeFS UI, you should see the following directory structure and files under the main directory:

Directory structure within the lakeFS UI
Pre-merge schema validation as it appears within the lakeFS UI
Schema validation scripts as it appears within the lakeFS UI

Step 3: Running the first ETL with the original schema 

Typically, with lakeFS, ingestion and transformation are done in a branch separate from the production branch. This approach enables many lakeFS capabilities, to name a few: isolation, auditing, data quality validation, atomic data promotion, rollbacks, and reproducibility.

Therefore, we will create an ingestion branch:

client.branches.create_branch(
    repository=repo,
    branch_creation=models.BranchCreation(
        name=ingestionBranch, source=mainBranch))

Next, we will use the Orion Star – Sports and outdoors RDBMS dataset from Kaggle. Specifically, we will use CUSTOMER.csv, which we will upload from data/samples/OrionStar/ in our sample repository.

First, we will define the table schema:

customersSchema = StructType([
  StructField("User_ID", IntegerType(), False),
  StructField("Country", StringType(), False),
  StructField("Gender", StringType(), False),
  StructField("Personal_ID", IntegerType(), True),
  StructField("Customer_Name", StringType(), False),
  StructField("Customer_FirstName", StringType(), False),
  StructField("Customer_LastName", StringType(), False),
  StructField("Birth_Date", StringType(), False),
  StructField("Customer_Address", StringType(), False),
  StructField("Street_ID", LongType(), False),
  StructField("Street_Number", IntegerType(), False),
  StructField("Customer_Type_ID", IntegerType(), False)
])

Next, create a delta table from the csv file, and upload it to our repository:

customersTablePath = f"s3a://{repo}/{ingestionBranch}/tables/customers"
df = spark.read.csv('./data/samples/OrionStar/CUSTOMER.csv',header=True,schema=customersSchema)
df.write.format("delta").mode("overwrite").save(customersTablePath)

Commit the changes:

client.commits.commit(
    repository=repo,
    branch=ingestionBranch,
    commit_creation=models.CommitCreation(
        message='Added customers Delta table',
        metadata={'using': 'python_api'}))

And promote the data to production via a merge:

client.refs.merge_into_branch(
    repository=repo,
    source_ref=ingestionBranch,
    destination_branch=mainBranch)

By now, we have executed the following flow:

Schema validation flow that has been executed

Step 4: Change the schema and try to promote the table to production

For simplicity, we will change the name of one of the columns, from Country to Country_Name:

customersSchema = StructType([
  StructField("User_ID", IntegerType(), False),
  StructField("Country_Name", StringType(), False), # Column renamed from Country to Country_Name
  StructField("Gender", StringType(), False),
  StructField("Personal_ID", IntegerType(), True),
  StructField("Customer_Name", StringType(), False),
  StructField("Customer_FirstName", StringType(), False),
  StructField("Customer_LastName", StringType(), False),
  StructField("Birth_Date", StringType(), False),
  StructField("Customer_Address", StringType(), False),
  StructField("Street_ID", LongType(), False),
  StructField("Street_Number", IntegerType(), False),
  StructField("Customer_Type_ID", IntegerType(), False)
])

Re-create the delta table in the ingest branch:

customersTablePath = f"s3a://{repo}/{ingestionBranch}/tables/customers"
df = spark.read.csv('./data/samples/OrionStar/CUSTOMER.csv',header=True,schema=customersSchema)
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(customersTablePath)

Commit the changes:

client.commits.commit(
    repository=repo,
    branch=ingestionBranch,
    commit_creation=models.CommitCreation(
        message='Added customers table with schema changes',
        metadata={'using': 'python_api'}))

And attempt to promote the data into production:

client.refs.merge_into_branch(
    repository=repo,
    source_ref=ingestionBranch,
    destination_branch=mainBranch)

This data will not be promoted into production:

Due to the schema change, the merge fails with a Precondition Failed error: the pre-merge hook prevented the promotion

From within the lakeFS UI, we can go to the repository, and click on the “Actions” tab. Click on the Run ID of the failed action, select the “pre merge checks on main branch,” expand check_schema_changes and see the error message as well. 

Summary

Schema validation on a data lake is vital but challenging due to the diverse and raw nature of the stored data. Data lakes house large volumes of data from various sources, making it crucial to validate schemas for consistency, integrity, and data quality. However, the difficulty lies in accommodating different data formats, including structured, semi-structured, and unstructured data. Managing schema evolution, data transformations, and compatibility checks across these formats require robust techniques and tools. Additionally, the decentralized nature of data lakes, where multiple users and systems can modify data, further complicates schema validation. Effective schema validation is essential for data governance, integration, and accurate analytics.

In this guide, we demonstrated how to create a pre-merge hook in lakeFS that validates the schema files before merging them into the production branch. 

We defined a hook configuration file and a Lua script for schema validation. We then performed an ETL process by creating an ingestion branch, defining the table schema, creating a table, and then atomically promoted the data to the production branch through a merge.

Finally, we attempted to change the schema of the table and promoted it to production again. However, the pre-merge hook prevented the promotion due to schema changes, resulting in a Precondition Failed error.

This approach of using pre-merge hooks for schema validation can be valuable for ensuring data integrity and preventing incompatible schema changes from being merged into the main branch. It provides an additional layer of quality control and helps maintain the consistency of data.

Take lakeFS for a spin and try it out yourself. 
