Robin Moffatt

June 26, 2023

lakeFS now directly supports Apache Iceberg tables. Using straightforward table identifiers you can switch between branches when reading and writing data:

SELECT * FROM catalog.ref.db.table;

lakeFS itself remains format agnostic, happily providing version control for data whether binary, tabular, or just a lump of text. The benefit of this new functionality is that the same table that you create and populate in the main branch…

CREATE TABLE lakefs.main.rmoff.my_table (id int, data string);

INSERT INTO lakefs.main.rmoff.my_table VALUES(1,"foo");

…is then available to continue working with if you branch main to a new branch such as dev:

SELECT * FROM lakefs.dev.rmoff.my_table;
+---+----+
| id|data|
+---+----+
|  1| foo|
+---+----+

Working with Apache Iceberg tables in lakeFS

To illustrate the benefits of using lakeFS and Apache Iceberg together, I’m going to show you an example based on a subset of the NYC Filming Permits dataset. I’m using PySpark in a notebook that you can run for yourself as part of the lakeFS Samples repository.

As you follow this example, you’ll hopefully notice how closely the workflow and statements align to how you would work with Iceberg anyway.

Configuration

There are three parts to configuring Iceberg and lakeFS to work together.

The first is to add the dependency io.lakefs:lakefs-iceberg:v0.1.2:

.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,io.lakefs:lakefs-iceberg:v0.1.2") \

You then need to set up the Spark SQL catalog. lakeFS has its own implementation of SparkCatalog, which handles the writing of the Iceberg data to lakeFS as well as reading from it and branching:

.config("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog") \
.config("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}") \
.config("spark.sql.catalog.lakefs.uri", lakefsEndPoint) \

You can also set the lakeFS catalog to be the default one, which means that you don’t need to include the prefix when referencing tables. This is what I’ve done for the examples below.

.config("spark.sql.defaultCatalog", "lakefs") \

Finally, you also need to configure the S3A Hadoop FileSystem for lakeFS:

.config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.endpoint", lakefsEndPoint) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.access.key", lakefsAccessKey) \
.config("spark.hadoop.fs.s3a.secret.key", lakefsSecretKey) \

Creating and Loading an Iceberg Table on a lakeFS branch

First, we load a dataframe with the dataset:

df = spark.read.option("inferSchema","true").option("multiline","true").json("/data/nyc_film_permits.json")

and then we write it as an Iceberg table under lakeFS. The only difference here from what we’d otherwise do is that we’re prefixing the database and table with the branch (main); the lakefs catalog prefix isn’t needed because we set it as the default:

df.write.saveAsTable("main.nyc.permits")

Over in the lakeFS UI, we can see that the Iceberg table’s data has been written successfully, along with its metadata, under the main branch of our repository (lakefs-iceberg-nyc).

This is just a regular Iceberg table that we can query:

SELECT borough, count(*) AS permit_cnt
FROM main.nyc.permits
GROUP BY borough;
+-------------+----------+
|      borough|permit_cnt|
+-------------+----------+
|       Queens|       168|
|        Bronx|        28|
|    Manhattan|       463|
|     Brooklyn|       334|
|Staten Island|         7|
+-------------+----------+

Now we can go ahead and commit this data, giving us a point to which we can roll back if we want to, as well as create a branch for development work.

lakefs.commits.commit(repo.id, "main", CommitCreation(
    message="Initial data load",
    metadata={'author': 'rmoff',
              'data source': 'https://data.cityofnewyork.us/City-Government/Film-Permits/tg4x-b46p'}
) )

The metadata is optional, but rather useful since we can refer back to a commit and find out information about it. Since it’s a freeform field, the commit author can include as much data as will be useful. This could include source identifiers, code commit IDs, program signatures, and so on.
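
As a quick sketch of what referring back looks like with the same Python client, you can walk the commit log of the branch and read the metadata off each commit. This assumes the lakefs and repo objects used above:

# Sketch: list the commits on main and print each one's metadata.
# Assumes `lakefs` (client) and `repo` (repository) from earlier in the notebook.
for c in lakefs.refs.log_commits(repo.id, "main").results:
    print(c.id, c.message, c.metadata)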

Here’s the Really Neat bit—branching Iceberg tables natively within lakeFS

With a table created and committed to the main branch, let’s go ahead and create a dev branch on which we can experiment with some data transformations.

lakefs.branches.create_branch(repo.id, 
                              BranchCreation(name="dev",
                                             source="main"))

Now we can query the data using our dev branch reference:

SELECT count(*)
FROM dev.nyc.permits;
+-----+
|count|
+-----+
| 1000|
+-----+

Making changes on the branch

The joy of data version control is that we can make changes on branches that are isolated.

Perhaps it’s a regular data load that we want to quality-check before making it available for use downstream (the Write-Audit-Publish pattern). Or maybe it’s some noodling around with data manipulation or one-off processing that needs to be done against live data but we want to check first before putting hand to keyboard in anger.

Whatever it is we’re doing, the dev branch we just created is our own little domain of safety.

Let’s go big! Let’s see what happens when we delete the contents of the table with a careless DELETE, omitting an all-important predicate:

DELETE FROM dev.nyc.permits;

How’s that data looking now?

SELECT count(*)
FROM dev.nyc.permits;
+-----+
|count|
+-----+
|    0|
+-----+

But over on the main branch the data is safe and unsullied 😌

SELECT count(*)
FROM main.nyc.permits;
+-----+
|count|
+-----+
| 1000|
+-----+

So, we carelessly deleted all our data from the dev version of the table. Let’s revel in the pleasure of the data version control that lakeFS provides and reverse that deletion:

lakefs.branches.reset_branch(repo.id, 
                             "dev",
                             ResetCreation(type="common_prefix", 
                                           path="nyc/permits/"))

As if by magic 🪄 our data’s back. 🎉

SELECT count(*)
FROM dev.nyc.permits;
+-----+
|count|
+-----+
| 1000|
+-----+

🦸🏻 lakeFS’ Super Power (one of them): Multi-Collection Consistency 📚

Let’s say that we’ve learnt from our ham-fisted DELETE efforts and are now ready to develop in earnest. We’re going to do two things:

  • Delete all rows for permits in Manhattan from the table
  • Build an aggregate of the data to show how many permits we issued by category

If everything works, we’ll promote the changes to main. If it doesn’t, we’ll promote none of it.

Keeping the base table and derived aggregate in line with each other is important. It would be really confusing, for example, if we promoted the aggregate table that we’re about to build to main without the changed underlying nyc.permits table. Users expect to be able to drill from an aggregate into the base, and to roll up the base to see the same numbers as in the aggregate.

First, we make the changes:


DELETE FROM dev.nyc.permits WHERE borough='Manhattan';

CREATE OR REPLACE TABLE dev.nyc.agg_permit_category AS
SELECT category, count(*) permit_cnt
FROM dev.nyc.permits
GROUP BY category;

Then we validate them within the branch:

  • No Manhattan rows in the base data:
SELECT borough, count(*) permit_cnt
FROM dev.nyc.permits
GROUP BY borough;
+-------------+----------+
|      borough|permit_cnt|
+-------------+----------+
|       Queens|       168|
|        Bronx|        28|
|     Brooklyn|       334|
|Staten Island|         7|
+-------------+----------+
  • Aggregate has been successfully built:
SELECT * FROM dev.nyc.agg_permit_category LIMIT 5;
+----------+----------+
|  category|permit_cnt|
+----------+----------+
|Television|       429|
|       WEB|        17|
|Commercial|        26|
|      Film|        24|
|   Theater|        27|
+----------+----------+

Finally, having made the changes and confirmed that they worked, we commit both pieces of work:

lakefs.commits.commit(repo.id, "dev", 
                      CommitCreation(
                          message="Remove data for Manhattan from permits dataset, build category aggregate",
                          metadata={"etl job name": "etl_job_42",
                                    "author": "rmoff"}
                      ))

and then merge this into main:

lakefs.refs.merge_into_branch(repository=repo.id, 
                              source_ref="dev", 
                              destination_branch="main")

It’s worth noting that we could have committed the two changes to dev independently if we wanted to be able to roll back one of them within the dev branch. The multi-collection consistency comes in the all-or-nothing merge from dev into main.
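
As a sketch, committing the two changes independently would have looked something like this (the commit messages here are hypothetical):

# Hypothetical sketch: one commit per logical change on the dev branch,
# so either change could be rolled back on its own before merging.
lakefs.commits.commit(repo.id, "dev", CommitCreation(
    message="Remove data for Manhattan from permits dataset"))

lakefs.commits.commit(repo.id, "dev", CommitCreation(
    message="Build category aggregate"))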

🎁 Bonus Feature: Reference Expressions are Supported!

So far, I’ve shown you examples that have used a branch name as the reference to the data in lakeFS. You can also use tags and reference expressions. The latter in particular is extremely powerful as it enables you to reference previous commits, parents, and committed state directly.
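
Tags work just as you’d expect. Here’s a sketch, assuming the same lakefs_client Python client used above and a hypothetical tag name v1 pointing at the head of main:

# Hypothetical sketch: create a tag on main, then read the table as of that tag.
from lakefs_client.models import TagCreation

lakefs.tags.create_tag(repo.id, TagCreation(id="v1", ref="main"))

spark.sql("SELECT count(*) FROM v1.nyc.permits").show()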

Here’s an example of a reference expression. First off, we’ve got a new table with three rows in it, on a branch called dev:

SELECT * FROM dev.rmoff.my_table;
+---+----+
| id|data|
+---+----+
|  0|test|
|  1| foo|
|  2| bar|
+---+----+

We’ve committed this data in lakeFS, and now we add a new row to it:

INSERT INTO dev.rmoff.my_table VALUES(3,"wibble");

This new row shows up just as we would expect:

SELECT * FROM dev.rmoff.my_table;
+---+------+
| id|  data|
+---+------+
|  0|  test|
|  1|   foo|
|  2|   bar|
|  3|wibble|
+---+------+

So now we have the first three rows of data (id 0 – id 2) which are already committed, and then the new one we added (id 3). We know that, because we just ran it—but what about a more complex example where we have real tables with many rows and columns? How do we know what’s committed and what’s not?

lakeFS supports reference expressions and we can use one of them here—the @, which gives us only the committed changes for a given reference (in this case, a branch name):

SELECT * FROM `dev@`.rmoff.my_table;

(Note the use of backticks to quote the reference expression)

+---+----+
| id|data|
+---+----+
|  1| foo|
|  2| bar|
|  0|test|
+---+----+

If we know the current state of the table (committed + uncommitted), and we know what the committed state is, then we can derive the uncommitted state. That’s pretty useful! We’ll use the SQL EXCEPT operator to do it:

SELECT * FROM `dev`.rmoff.my_table 
EXCEPT 
SELECT * FROM `dev@`.rmoff.my_table;
+---+------+
| id|  data|
+---+------+
|  3|wibble|
+---+------+

Other examples include:

  • dev~17 is the commit 17 commits before the head of dev
  • dev^2 is the second parent of dev
  • dev^ is the same as dev^1, i.e. the first parent of dev (see the sketch below)
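
These expressions go in the table identifier just like branch names do. For example, here’s a sketch of reading the table as it was one commit before the head of dev (quoted with backticks, as before):

# Sketch: query the table as of the commit immediately before the head of dev.
spark.sql("SELECT * FROM `dev~1`.rmoff.my_table").show()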

A Peek Under the Covers

What’s so neat about what’s happening here? Well, by default Iceberg writes absolute paths in its metadata, and if you step outside of a version-control tool’s context, why wouldn’t it? It makes complete sense!

  "location" : "s3a://iceberg-01/nyc/permits",

But if you want to take advantage of the goodness that lakeFS provides—almost transparently through its S3 gateway—things aren’t as straightforward.

lakeFS uses the repository and branch references in the path as part of its version control model, so an Iceberg table might end up written to something like

  "location" : "s3a://iceberg-01/main/nyc/permits",

And this is where using absolute paths causes problems. When lakeFS creates a branch, it creates a copy-on-write clone of the existing files.

Let’s take the above example, in which we’ve written a table to the main branch: its metadata records an absolute location under main.

But when we create a new dev branch from it, the copy-on-write clone carries that same metadata across unchanged. Since the metadata still references a path in main, when we write to dev we’ll actually end up writing to the absolute path in main 😱

lakeFS now supports Iceberg directly, and it does this by implementing a SparkCatalog which in turn handles branch references and stores paths in the metadata on disk as relative paths.

At the moment, this is implemented in the lakeFS Catalog, with plans for other catalogs on the roadmap. If you’re interested in more details check out the design document.

Go on…Try It Out Today

Apache Iceberg is one of the most popular open table formats, and lakeFS is the best (in our opinion!) data version control tool. Now you can combine the two seamlessly.

The integration is available on Maven already, with documentation available on how to use it.

The lakeFS Samples repository includes a full stack of the components you need to try it out—Jupyter, PySpark, and lakeFS—with a bunch of example notebooks including one for Iceberg and lakeFS.
