Back in June we happily announced that lakeFS ♥️ Iceberg, and our Iceberg support has kept growing since then. A new experimental feature lets you use Spark SQL to compute “data diffs” between two versions of an Iceberg table stored in two different lakeFS versions.
Reminder: lakeFS versions are Iceberg schemas
The lakeFS Iceberg catalog maps lakeFS branches, tags, and version refs to schemas in Iceberg. The repository in our earlier blog post stored the table under the prefix nyc/permits. Through the Iceberg catalog we can read that table at any commit:
- lakefs.main.nyc.permits is the table version currently visible on the main branch.
- lakefs.dev.nyc.permits is the table version currently visible on the dev branch.
- lakefs.`main@`.nyc.permits is the committed table version on the main branch.
- lakefs.`dev~2`.nyc.permits is the table version 2 commits before the head of the dev branch.
- lakefs.release_tag.nyc.permits is the table version at tag release_tag.
- lakefs.`0afbcc8`.nyc.permits is the table version at the commit whose digest starts with “0afbcc8…”.
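The naming rule in the list above can be captured in a few lines of Python. This is a hypothetical helper (quote_ref and table_at_ref are illustrative names, not part of any lakeFS library), and it assumes, as the examples suggest, that a ref containing characters such as @ or ~, or starting with a digit, must be wrapped in backticks. It quotes conservatively: anything that is not a plain identifier gets backticks.

```python
import re

# Hypothetical helper. Refs made only of letters, digits and underscores, and
# not starting with a digit, are used bare. Anything else (main@, dev~2, a
# commit digest such as 0afbcc8) is conservatively wrapped in backticks.
_BARE_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_ref(ref: str) -> str:
    """Return ref as a Spark SQL identifier, backtick-quoting it if needed."""
    if _BARE_IDENTIFIER.fullmatch(ref):
        return ref
    # Inside backticks, a literal backtick is escaped by doubling it.
    return "`" + ref.replace("`", "``") + "`"


def table_at_ref(catalog: str, ref: str, table: str) -> str:
    """Build catalog.ref.namespace.table for a table at a given lakeFS ref."""
    return f"{catalog}.{quote_ref(ref)}.{table}"


for ref in ["main", "main@", "dev~2", "release_tag", "0afbcc8"]:
    print(table_at_ref("lakefs", ref, "nyc.permits"))
```

Running it prints the same five table names used in the list, one per line.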
Data diffs between table versions
The lakeFS Spark Extensions JAR adds syntax to Spark SQL to make it easy to compare versions of the same table.
Compare data between refs
To compare branches main and dev, run:
SELECT * FROM refs_data_diff('lakefs', 'main', 'dev', 'nyc.permits');
This works for any ref. For example, to compare the committed version on main with the version 2 commits before the head of dev, run:
SELECT * FROM refs_data_diff('lakefs', 'main@', 'dev~2', 'nyc.permits');
refs_data_diff is a SQL table-valued function (TVF). Its output is the difference: a relation (like a view) with the table's schema plus a single extra column, lakefs_change.
- Rows that appear only in the first version of the table (in the example, on branch main) appear in the difference with lakefs_change set to '-'.
- Rows that appear only in the second version of the table (in the example, on branch dev) appear in the difference with lakefs_change set to '+'.
- Rows that appear in both versions of the table do not appear in the difference.
Spark does not materialize the difference as a table: like a view, it is computed only when a query reads it.
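To make these rules concrete, here is a small pure-Python model of what the difference contains. It is only an illustration on made-up rows (data_diff is a hypothetical name, not part of the lakeFS extension), and it assumes set semantics, which matches the EXCEPT-based implementation described under “How the data diff extension works”.

```python
def data_diff(first_rows, second_rows):
    """Model refs_data_diff semantics on two collections of row tuples.

    Rows only in the first version get '-', rows only in the second get '+',
    and rows present in both are omitted. Like SQL EXCEPT, this works on
    distinct rows, so duplicate rows within one version collapse.
    """
    first, second = set(first_rows), set(second_rows)
    added = [("+",) + row for row in sorted(second - first)]
    removed = [("-",) + row for row in sorted(first - second)]
    # Sorted here only for readability; Spark makes no ordering guarantee.
    return added + removed


# Made-up (EventId, Borough) rows standing in for two versions of a table.
main_rows = [(1, "Queens"), (2, "Bronx"), (3, "Brooklyn")]
dev_rows = [(2, "Bronx"), (3, "Brooklyn"), (4, "Queens")]

# Same argument order as refs_data_diff('lakefs', 'main', 'dev', ...).
for change in data_diff(main_rows, dev_rows):
    print(change)
```

Rows 2 and 3 exist in both versions and drop out. Row 4 exists only on dev and is marked '+', and row 1 exists only on main and is marked '-'.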
Business logic in your diffs
A plain list of every changed row is of limited use. Because refs_data_diff is part of Spark SQL, you can use ordinary SQL to filter, aggregate, or otherwise process your diffs.
For instance, how many permits changed in each borough between two versions of the table?
SELECT lakefs_change, borough, count(*) AS permit_diffs_cnt FROM refs_data_diff('lakefs', 'main~1', 'main', 'nyc.permits') GROUP BY lakefs_change, borough;
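When the HEAD version of main holds data as of 2023-10-18 and the previous version, main~1, holds data as of 2023-10-17, this query returns one row per change type and borough, counting the permits that were added ('+') or removed ('-') between the two days.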
What changed in Queens?
SELECT lakefs_change, EventId, ParkingHeld FROM refs_data_diff('lakefs', 'main~1', 'main', 'nyc.permits') WHERE Borough='Queens';
|lakefs_change|EventId|ParkingHeld|
|+|730689|BEACH 56TH PLACE between ROCKAWAY BEACH BLVD and DEAD END, ROCKAWAY BEACH BLVD between BEACH 56TH PLACE and BEACH 59TH ST|
|+|730001|80 STREET between JUNIPER BLVD NORTH and JUNIPER BLVD SOUTH, 81 STREET between JUNIPER BLVD SOUTH and PENELOPE AVENUE|
|+|730663|BEACH 108 STREET between ROCKAWAY BEACH BOULEVARD and ROCKAWAY BEACH DRIVE, BEACH 108 STREET between ROCKAWAY BEACH DRIVE and SHORE FRONT PARKWAY, SHORE FRONT PARKWAY between Beach 101st St and Beach 102nd St, SHORE FRONT PARKWAY between BEACH 102 STREET and BEACH 105 STREET|
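Because the difference is just another relation, the two queries above are ordinary SQL over rows that carry a lakefs_change marker. The Python sketch below mirrors them on a handful of made-up diff rows (not real permit data) to show what the GROUP BY and the WHERE clause compute; it is an illustration, not how Spark executes them.

```python
from collections import Counter

# Made-up diff rows in the shape returned by refs_data_diff:
# (lakefs_change, EventId, Borough).
diff_rows = [
    ("+", 101, "Queens"),
    ("+", 102, "Queens"),
    ("-", 103, "Queens"),
    ("+", 104, "Bronx"),
]

# Like GROUP BY lakefs_change, borough with count(*) AS permit_diffs_cnt.
permit_diffs_cnt = Counter((change, borough) for change, _, borough in diff_rows)

# Like WHERE Borough = 'Queens', keeping lakefs_change and EventId.
queens = [
    (change, event_id)
    for change, event_id, borough in diff_rows
    if borough == "Queens"
]

print(dict(permit_diffs_cnt))
print(queens)
```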
If you already have a lakeFS setup with a Spark cluster, you’re all set! Otherwise you will need to:
- Create a lakeFS installation. You can follow our Quickstart; the fastest way is to get a lakeFS Cloud playground from the link at the bottom.
- Connect Spark to lakeFS. See our Apache Spark integration guide on how to connect Spark to lakeFS using S3A. If you do not have a Spark cluster, running spark-sql as shown below runs in local mode by default, which is enough to try this out.
Under the hood
Adding data diff to lakeFS Iceberg
To use lakeFS with Iceberg, you need to install and configure:
- lakeFS support
- lakeFS Iceberg support
To gain access to the lakeFS SQL extension that provides refs_data_diff you additionally need to install and configure:
- lakeFS SQL extensions JAR
Here’s how to do it from the command line:
% spark-sql --packages io.lakefs:lakefs-spark-extensions_2.12:0.0.3,io.lakefs:lakefs-iceberg:0.1.3,org.apache.hadoop:hadoop-aws:3.3.1,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1 \
    -c spark.hadoop.fs.s3a.access.key=AKIA... \
    -c spark.hadoop.fs.s3a.secret.key=SHH... \
    -c spark.sql.catalog.lakefs.warehouse=lakefs://REPO \
    -c spark.hadoop.fs.s3a.endpoint=https://ORG.us-east-1.lakefscloud.io \
    -c spark.sql.catalog.lakefs.uri=https://ORG.us-east-1.lakefscloud.io \
    -c spark.sql.catalog.lakefs=org.apache.iceberg.spark.SparkCatalog \
    -c spark.sql.catalog.lakefs.catalog-impl=io.lakefs.iceberg.LakeFSCatalog \
    -c spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.lakefs.iceberg.extension.LakeFSSparkSessionExtensions \
    -c spark.hadoop.fs.s3a.path.style.access=true \
    -c spark.sql.catalog.lakefs.cache-enabled=false ...
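Two parts of this command add and configure the lakeFS Spark Extensions: the io.lakefs:lakefs-spark-extensions_2.12 package, and the io.lakefs.iceberg.extension.LakeFSSparkSessionExtensions entry in spark.sql.extensions.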
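If you launch spark-sql from a script, keeping these settings in one place makes them easier to review. Here is a minimal Python sketch that only builds and prints the command line. It assumes the same package versions and properties as the command above; AKIA..., SHH..., REPO and ORG are the same placeholders and must be replaced with your own values, and spark_sql_command is a hypothetical helper name.

```python
import shlex

# Same packages as in the spark-sql command above.
PACKAGES = [
    "io.lakefs:lakefs-spark-extensions_2.12:0.0.3",
    "io.lakefs:lakefs-iceberg:0.1.3",
    "org.apache.hadoop:hadoop-aws:3.3.1",
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1",
]

# Same properties as above. AKIA..., SHH..., REPO and ORG are placeholders.
CONF = {
    "spark.hadoop.fs.s3a.access.key": "AKIA...",
    "spark.hadoop.fs.s3a.secret.key": "SHH...",
    "spark.sql.catalog.lakefs.warehouse": "lakefs://REPO",
    "spark.hadoop.fs.s3a.endpoint": "https://ORG.us-east-1.lakefscloud.io",
    "spark.sql.catalog.lakefs.uri": "https://ORG.us-east-1.lakefscloud.io",
    "spark.sql.catalog.lakefs": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.lakefs.catalog-impl": "io.lakefs.iceberg.LakeFSCatalog",
    # The lakeFS entry in this list is what makes refs_data_diff available.
    "spark.sql.extensions": ",".join([
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "io.lakefs.iceberg.extension.LakeFSSparkSessionExtensions",
    ]),
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.sql.catalog.lakefs.cache-enabled": "false",
}


def spark_sql_command(packages, conf):
    """Return the spark-sql argument list for the given packages and properties."""
    args = ["spark-sql", "--packages", ",".join(packages)]
    for key, value in conf.items():
        args += ["-c", f"{key}={value}"]
    return args


# Print a shell-quoted command line (shlex.join needs Python 3.8+).
print(shlex.join(spark_sql_command(PACKAGES, CONF)))
```

Passing the returned list to subprocess.run would start spark-sql with the same settings once the placeholders are filled in.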
How the data diff extension works
This Spark extension defines a new SQL table-valued function whose output behaves exactly like a view. It translates an expression such as
refs_data_diff('lakefs', 'dev', 'main', 'table')
into the query
(SELECT '+' lakefs_change, *
 FROM (SELECT * FROM lakefs.main.table
       EXCEPT
       SELECT * FROM lakefs.dev.table))
UNION ALL
(SELECT '-' lakefs_change, *
 FROM (SELECT * FROM lakefs.dev.table
       EXCEPT
       SELECT * FROM lakefs.main.table))
The extension also takes care of correctly quoting table names and identifiers in the generated SQL.
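The translation above can be reproduced outside Spark to see how it behaves. This sketch makes several assumptions: it uses Python's built-in sqlite3 module instead of Spark, plain table names instead of lakeFS refs, and it drops the outer parentheses around each branch of the UNION ALL because SQLite does not accept them. diff_query is a hypothetical helper, not the extension's code.

```python
import sqlite3


def diff_query(from_table: str, to_table: str) -> str:
    """Build an EXCEPT / UNION ALL query shaped like the translation above.

    Rows only in to_table are tagged '+', rows only in from_table '-'.
    Table names are assumed to be already quoted.
    """
    return (
        f"SELECT '+' AS lakefs_change, * FROM "
        f"(SELECT * FROM {to_table} EXCEPT SELECT * FROM {from_table}) "
        f"UNION ALL "
        f"SELECT '-' AS lakefs_change, * FROM "
        f"(SELECT * FROM {from_table} EXCEPT SELECT * FROM {to_table})"
    )


# Two tiny in-memory tables standing in for the 'dev' and 'main' versions.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dev_permits (event_id INTEGER, borough TEXT)")
conn.execute("CREATE TABLE main_permits (event_id INTEGER, borough TEXT)")
conn.executemany("INSERT INTO dev_permits VALUES (?, ?)",
                 [(1, "Queens"), (2, "Bronx")])
conn.executemany("INSERT INTO main_permits VALUES (?, ?)",
                 [(2, "Bronx"), (3, "Queens")])

# Mirrors refs_data_diff('lakefs', 'dev', 'main', ...): dev first, main second.
rows = conn.execute(diff_query("dev_permits", "main_permits")).fetchall()
print(sorted(rows))
```

Like Spark's EXCEPT, SQLite's EXCEPT removes duplicate rows, so a row that appears twice in one version shows up at most once in the diff.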
- Grab a lakeFS environment on the lakeFS playground if you don’t already have one.
- Join us on the lakeFS
- Read all about lakeFS Iceberg integration on our docs, or read about the lakeFS Spark extensions.