Introduction
Schema validation ensures that the data stored in the lake conforms to a predefined schema, which specifies the structure, format, and constraints of the data.
It’s important for:
- Consistency: Data lakes typically store large volumes of data from a variety of sources. Without schema validation, there is a risk that data with inconsistent or invalid formats could be stored in the lake, which could lead to errors and inconsistencies when processing the data.
- Quality: Schema validation helps to ensure that the data stored in the lake is of high quality. By enforcing data constraints and standards, schema validation can help to identify and flag data quality issues, such as missing or incorrect values, before they can cause problems downstream.
- Efficiency: By enforcing a consistent schema across all data in the lake, schema validation can help to streamline data processing and analysis. This can help to reduce the time and effort required to clean, transform, and analyze the data, and can improve the overall efficiency of the data pipeline.
- Compliance: Many organizations are subject to regulatory and compliance requirements that require them to maintain strict data standards and controls. Schema validation can help to ensure that the data stored in the lake meets these requirements, and can provide a clear audit trail of data lineage and quality.
Data lakes provide a lot of flexibility compared to a more rigid data model that is available in a data warehouse; there are typically a variety of data sources ingested into a data lake. Different data sources may have different schema definitions, and schema evolution may occur over time as new data is added. This can make it difficult to enforce a consistent schema across all data in the lake, and can lead to inconsistencies and errors when processing the data.
Moreover, due to the ever-growing complexity of data pipelines built on top of data lakes, there is no single consistent schema. Data pipelines may involve multiple steps and transformations, each of which may require different schema definitions. As data is processed and transformed, the schema may change, making it challenging to enforce schema validation across the entire pipeline.
Schema validation: Approach
Like source control systems, lakeFS allows you to configure actions that are triggered when predefined events occur. Those are the lakeFS hooks.
Hooks are custom scripts or programs that can be executed by the lakeFS platform in response to specific events or actions. These events can include actions such as committing changes, merging branches, creating new branches, creating or deleting tags etc.
For example, a post-create-branch hook runs on the new branch after the branch is created, and a pre-merge hook runs on the source branch when the merge occurs, before the merge is finalized.
In this post, we will review, step by step, how to create a pre-merge hook that validates that the schema of Parquet files has not changed from the existing schema.
Implementation of lakeFS for schema validation
Prerequisites
To follow this guide, you will need:
- A lakeFS server (you can either install one, spin one up for free on the cloud, or run one as part of the sample-repo)
- OPTIONAL: Use the sample-repo to spin up a notebook you can configure to run against the lakeFS Server.
In this example, we will create a delta table in an ingest branch and merge it into production. Then, we will change the schema of the table and try to merge it again (simulating promoting data into production):


Step 1: Configuration setup
First, we will set some global variables, and install packages that will be useful in this example, running in a Python notebook.
Set up lakeFS’ credentials (if you don’t know your credentials, you can create a new key under the “administration” tab in lakeFS):
lakefsEndPoint = 'https://YOURLAKEFSCLOUD.lakefscloud.io' # e.g. 'https://LAKEFSAddress' or 'http://host.docker.internal:8000' (if lakeFS is running in a local Docker container)
lakefsAccessKey = '' # Your access key
lakefsSecretKey = '' # Your secret key
Next, we will set up some global variables with the names of the repository and branches:
repo = "schema-validation-example-repo"
mainBranch = "main"
ingestionBranch = "ingestion_branch"
Each lakeFS repository must have a unique storage namespace, so we will set that as well:
storageNamespace = 's3://' # e.g. "s3://username-lakefs-cloud/"
If you have not done so before, you will need to configure your storage to work with lakeFS. In this example, I’m using AWS. However, you can use Azure, GCP or on-premise object stores such as MinIO.
Note: If you spun up a lakeFS Cloud instance, and have not yet connected your storage, copy the storage namespace of the sample repository and append a string to it.
For example, if lakeFS Cloud created this sample-repo for you:


You can configure:
storageNamespace = 's3://lakefs-sample-us-east-1-production/AROA5OU4KHZHHFCX4PTOM:2ae87b7718e5bb16573c021e542dd0ec429b7ccc1a4f9d0e3f17d6ee99253655/my_random_string'
We will use Python code in our notebook. Therefore, we will need to import the lakeFS python client packages:
import lakefs_client
from lakefs_client import models
from lakefs_client.client import LakeFSClient
import os
from pyspark.sql.types import ByteType, IntegerType, LongType, StringType, StructType, StructField
And configure our client:
%xmode Minimal
if not 'client' in locals():
    # lakeFS credentials and endpoint
    configuration = lakefs_client.Configuration()
    configuration.username = lakefsAccessKey
    configuration.password = lakefsSecretKey
    configuration.host = lakefsEndPoint
    client = LakeFSClient(configuration)
    print("Created lakeFS client.")
In this example, we will create delta tables. Add the relevant packages:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:2.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" pyspark-shell'
lakeFS exposes an S3 Gateway that enables applications to communicate with lakeFS the same way they would with S3. (Alternatively, you can use the lakeFS Hadoop file system client).
To configure the gateway:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", lakefsAccessKey)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", lakefsSecretKey)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", lakefsEndPoint)
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
We are now all set to start benefiting from lakeFS version control at scale in our notebook.
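Before building the repository, it can be useful to verify that the client can reach the lakeFS server. The snippet below is an optional sanity check, assuming the legacy lakefs_client API used throughout this post:
# Optional sanity check: list the repositories visible to these credentials
# to confirm the client can authenticate against the lakeFS endpoint.
repo_list = client.repositories.list_repositories()
for r in repo_list.results:
    print(r.id, r.storage_namespace)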
Step 2: Building the repository and hooks
Using the Python client, we will create our repository:
client.repositories.create_repository(
    repository_creation=models.RepositoryCreation(
        name=repo,
        storage_namespace=storageNamespace,
        default_branch=mainBranch))
In this example, we will use a pre-merge hook to validate that the schema has not changed. Action files should be uploaded to the lakeFS repository under the prefix _lakefs_actions/. A failure to parse an action file will result in a failed run.
We will upload a hook configuration action file called pre-merge-schema-validation.yaml with the following content:
#Parquet schema Validator
#Args:
# - locations (list of strings): locations to look for parquet files under
# - sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them
#Example hook declaration: (_lakefs_actions/pre-merge-schema-validation.yaml):
name: pre merge checks on main branch
on:
  pre-merge:
    branches:
      - main
hooks:
  - id: check_schema_changes
    type: lua
    properties:
      script_path: scripts/parquet_schema_change.lua # location of this script in the repository
      args:
        sample: false
        locations:
          - tables/customers/
In our sample repo, this file (pre-merge-schema-validation.yaml) is located in the LuaHooks subdirectory. We need to upload the file to the lakeFS repository under _lakefs_actions:
hooks_config_yaml = "pre-merge-schema-validation.yaml"
hooks_prefix = "_lakefs_actions"
with open(f'./LuaHooks/{hooks_config_yaml}', 'rb') as f:
    client.objects.upload_object(repository=repo,
                                 branch=mainBranch,
                                 path=f'{hooks_prefix}/{hooks_config_yaml}',
                                 content=f
                                 )
We just configured an action script that will execute scripts/parquet_schema_change.lua before merging into main.
Next, we will create the script itself (parquet_schema_change.lua) and upload it to the scripts directory. As you can see, in this example we are running hooks on an embedded Lua VM, without relying on external components. In the sample-repo this file also sits in the LuaHooks subdirectory and contains this code:
--[[
Parquet schema validator

Args:
 - locations (list of strings): locations to look for parquet files under
 - sample (boolean): whether reading one new/changed file per directory is enough, or go through all of them
]]

lakefs = require("lakefs")
strings = require("strings")
parquet = require("encoding/parquet")
regexp = require("regexp")
path = require("path")

visited_directories = {}

for _, location in ipairs(args.locations) do
    after = ""
    has_more = true
    need_more = true
    print("checking location: " .. location)
    while has_more do
        print("running diff, location = " .. location .. " after = " .. after)
        local code, resp = lakefs.diff_refs(action.repository_id, action.branch_id, action.source_ref, after, location)
        if code ~= 200 then
            error("could not diff: " .. resp.message)
        end

        for _, result in pairs(resp.results) do
            p = path.parse(result.path)
            print("checking: '" .. result.path .. "'")
            if not args.sample or (p.parent and not visited_directories[p.parent]) then
                if result.path_type == "object" and result.type ~= "removed" then
                    if strings.has_suffix(p.base_name, ".parquet") then
                        -- check it!
                        code, content = lakefs.get_object(action.repository_id, action.source_ref, result.path)
                        if code ~= 200 then
                            error("could not fetch data file: HTTP " .. tostring(code) .. "body:\n" .. content)
                        end
                        schema = parquet.get_schema(content)
                        for _, column in ipairs(schema) do
                            for _, pattern in ipairs(args.column_block_list) do
                                if regexp.match(pattern, column.name) then
                                    error("Column is not allowed: '" .. column.name .. "': type: " .. column.type .. " in path: " .. result.path)
                                end
                            end
                        end
                        print("\t all columns are valid")
                        visited_directories[p.parent] = true
                    end
                end
            else
                print("\t skipping path, directory already sampled")
            end
        end

        -- pagination
        has_more = resp.pagination.has_more
        after = resp.pagination.next_offset
    end
end
We will upload the file (this time parquet_schema_change.lua) from the LuaHooks directory to our lakeFS repository, in the location where the action configuration file expects to find it (i.e. under the scripts folder):
lua_script_file = "parquet_schema_change.lua"
scripts_prefix = "scripts"
# Upload the Lua script to the scripts/ folder referenced by the hook configuration
with open(f'./LuaHooks/{lua_script_file}', 'rb') as f:
    client.objects.upload_object(repository=repo,
                                 branch=mainBranch,
                                 path=f'{scripts_prefix}/{lua_script_file}',
                                 content=f
                                 )
After uploading the action configuration file and the Lua script, we need to commit the changes for them to take effect:
client.commits.commit(
    repository=repo,
    branch=mainBranch,
    commit_creation=models.CommitCreation(
        message='Added hook config file and schema validation scripts'))
If you switch to your lakeFS UI, you should see the following directory structure and files under the main directory:






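You can also verify the uploads from the notebook rather than the UI. The sketch below assumes the same legacy lakefs_client API and the _lakefs_actions/ and scripts/ prefixes created above:
# List the committed hook configuration and script objects on main.
for prefix in ["_lakefs_actions/", "scripts/"]:
    listing = client.objects.list_objects(repository=repo, ref=mainBranch, prefix=prefix)
    for obj in listing.results:
        print(obj.path)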
Step 3: Running the first ETL with the original schema
Typically, with lakeFS, ingestion and transformation are done in a branch separate from the production branch. This approach unlocks many capabilities, to name a few: isolation, auditing, data quality validation, atomic data promotion, rollbacks, and reproducibility.
Therefore, we will create an ingestion branch:
client.branches.create_branch(
    repository=repo,
    branch_creation=models.BranchCreation(
        name=ingestionBranch, source=mainBranch))
Next, we will use the Orion Star – Sports and Outdoors RDBMS dataset from Kaggle. Specifically, we will use CUSTOMER.csv, which we will upload from data/samples/OrionStar/ in our sample repository.
First, we will define the table schema:
customersSchema = StructType([
    StructField("User_ID", IntegerType(), False),
    StructField("Country", StringType(), False),
    StructField("Gender", StringType(), False),
    StructField("Personal_ID", IntegerType(), True),
    StructField("Customer_Name", StringType(), False),
    StructField("Customer_FirstName", StringType(), False),
    StructField("Customer_LastName", StringType(), False),
    StructField("Birth_Date", StringType(), False),
    StructField("Customer_Address", StringType(), False),
    StructField("Street_ID", LongType(), False),
    StructField("Street_Number", IntegerType(), False),
    StructField("Customer_Type_ID", IntegerType(), False)
])
Next, create a delta table from the csv file, and upload it to our repository:
customersTablePath = f"s3a://{repo}/{ingestionBranch}/tables/customers"
df = spark.read.csv('./data/samples/OrionStar/CUSTOMER.csv',header=True,schema=customersSchema)
df.write.format("delta").mode("overwrite").save(customersTablePath)
Commit the changes:
client.commits.commit(
    repository=repo,
    branch=ingestionBranch,
    commit_creation=models.CommitCreation(
        message='Added customers Delta table',
        metadata={'using': 'python_api'}))
And promote the data to production via a merge:
client.refs.merge_into_branch(
    repository=repo,
    source_ref=ingestionBranch,
    destination_branch=mainBranch)
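Optionally, you can confirm the promotion by inspecting the latest commits on main. A minimal sketch, again assuming the legacy lakefs_client API used above:
# Show the most recent commits on the main branch after the merge.
commit_log = client.refs.log_commits(repository=repo, ref=mainBranch, amount=5)
for c in commit_log.results:
    print(c.id[:8], c.message)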
By now, we have executed the following flow:


Step 4: Change the schema and try to promote the table to production
For simplicity, we will change the name of one of the columns, renaming Country to Country_Name:
customersSchema = StructType([
    StructField("User_ID", IntegerType(), False),
    StructField("Country_Name", StringType(), False),  # Column name changed from Country to Country_Name
    StructField("Gender", StringType(), False),
    StructField("Personal_ID", IntegerType(), True),
    StructField("Customer_Name", StringType(), False),
    StructField("Customer_FirstName", StringType(), False),
    StructField("Customer_LastName", StringType(), False),
    StructField("Birth_Date", StringType(), False),
    StructField("Customer_Address", StringType(), False),
    StructField("Street_ID", LongType(), False),
    StructField("Street_Number", IntegerType(), False),
    StructField("Customer_Type_ID", IntegerType(), False)
])
Re-create the delta table in the ingest branch:
customersTablePath = f"s3a://{repo}/{ingestionBranch}/tables/customers"
df = spark.read.csv('./data/samples/OrionStar/CUSTOMER.csv',header=True,schema=customersSchema)
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(customersTablePath)
Commit the changes:
client.commits.commit(
    repository=repo,
    branch=ingestionBranch,
    commit_creation=models.CommitCreation(
        message='Added customers table with schema changes',
        metadata={'using': 'python_api'}))
And attempt to promote the data into production:
client.refs.merge_into_branch(
    repository=repo,
    source_ref=ingestionBranch,
    destination_branch=mainBranch)
This data will not be promoted into production:


From within the lakeFS UI, we can go to the repository and click on the “Actions” tab. Click on the Run ID of the failed action, select “pre merge checks on main branch,” and expand check_schema_changes to see the error message.


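You can also detect the blocked merge programmatically. The following is a minimal sketch, assuming the legacy lakefs_client API used throughout this post; when a pre-merge hook rejects a merge, the client raises an ApiException with a 412 Precondition Failed status:
from lakefs_client.exceptions import ApiException

try:
    client.refs.merge_into_branch(
        repository=repo,
        source_ref=ingestionBranch,
        destination_branch=mainBranch)
except ApiException as e:
    # e.status is expected to be 412 (Precondition Failed) when a hook blocks the merge.
    print(f"Merge blocked by pre-merge hook: HTTP {e.status} {e.reason}")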
Summary
Schema validation on a data lake is vital but challenging due to the diverse and raw nature of the stored data. Data lakes house large volumes of data from various sources, making it crucial to validate schemas for consistency, integrity, and data quality. However, the difficulty lies in accommodating different data formats, including structured, semi-structured, and unstructured data. Managing schema evolution, data transformations, and compatibility checks across these formats requires robust techniques and tools. Additionally, the decentralized nature of data lakes, where multiple users and systems can modify data, further complicates schema validation. Effective schema validation is essential for data governance, integration, and accurate analytics.
In this guide, we demonstrated how to create a pre-merge hook in lakeFS that validates the schema of files before merging them into the production branch.
We defined a hook configuration file and a Lua script for schema validation. We then performed an ETL process: creating an ingestion branch, defining the table schema, creating a table, and atomically promoting the data to the production branch through a merge.
Finally, we attempted to change the schema of the table and promoted it to production again. However, the pre-merge hook prevented the promotion due to schema changes, resulting in a Precondition Failed error.
This approach of using pre-merge hooks for schema validation can be valuable for ensuring data integrity and preventing incompatible schema changes from being merged into the main branch. It provides an additional layer of quality control and helps maintain the consistency of data.
Take lakeFS for a spin and try it out yourself.