I previously looked at what the Write-Audit-Publish (WAP) pattern is and how it can be a really useful tool for data engineers for checking and enforcing data quality. In this article I’m now going to see how to implement WAP in several different technologies.
The implementation of the Write-Audit-Publish (WAP) pattern is done at the storage layer since it’s here that data is written and published. Within the context of this article, I’m only considering data lake/house storage. I’ll also touch on similar techniques that might be worth investigating on RDBMS such as Oracle.
Just like any other pattern, beware of vendors selling you their tool as the tool for that pattern. Any user of Git will know that you can’t just “use Git” but you have to understand it first, and the same goes for WAP.
That said, some tools can be used to implement WAP more effectively than others. If you’ve already adopted a particular tool, this guide aims to help you understand your options for implementing WAP using it. If you have yet to choose a tool but want to have the option of using WAP in the future, then the details below should help you choose well 🙂
This is a lakeFS blog so, of course, I’m going to evaluate support in such a way as to cast lakeFS in a favourable light – but this is actually an area where lakeFS excels since it was built for it 😉
Considerations for WAP tool choice
These are the broad areas over which I’d consider the suitability of a technology for use with WAP:
- Granularity – At what level can you write and publish (or rollback) data? Is it per file, table, catalog, or entire lake?
- Data Format – Can you decouple your choice of technology from the format of the data, or are the two tied together? Do you need to store any other data formats within the same logical group as the data being published?
- Isolation – Can you do a “dirty read” of the data from another client once written and not published? This is important if you want to do an asynchronous audit of the data from a different process or tool (such as an external data quality tool).
- Ease of Use – Do you have to tie yourself in knots to fit the tool to the WAP pattern, or does it flow naturally?
- Cross-language support – given that not everyone lives and breathes the JVM, how accessible is the WAP functionality to a lowly SQL-hack fudging their way through this brave new world of data engineering?
Apache Iceberg
Of the three open table formats, Iceberg has made the biggest strides forward in this area, with active discussion and development for WAP.
Initially, support was limited to a somewhat fiddly workflow in which you had to track down the snapshot ID of the write before being able to query it for audit, but with the addition of branches and tags in the 1.2.0 release (video) it’s now really intuitive to use 🤩
WAP in Apache Iceberg
You can see this in action and try it out for yourself in this notebook.
Write
The process is described well in the documentation. The only bit that’s not entirely obvious—and is perhaps a hangover from when branches weren’t supported but “integrated audit” was—is how you address a branch and what session parameters you need to set.
However you do it, you need to create a branch before you write your changes to the table:
ALTER TABLE db.table
CREATE BRANCH etl_job_42
From here you can take one of two approaches in the write:
- Write directly to the branch, using the branch_<branch-name> suffix on the table name:

DELETE FROM db.table.branch_etl_job_42 WHERE borough='Manhattan'
OR
- Configure the table for WAP, set the spark.wap.branch session parameter to the branch, and then perform the write:

-- Enable WAP for the table
ALTER TABLE db.table SET TBLPROPERTIES (
  'write.wap.enabled'='true'
)

-- Set the branch used for this session for reads and writes
spark.conf.set('spark.wap.branch', 'etl_job_42')

-- Write
DELETE FROM db.table WHERE borough='Manhattan'
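If you're driving this from PySpark rather than SQL cells, the second option strings together in a few lines. Here's a minimal sketch, assuming an existing SparkSession (spark) that already has the Iceberg SQL extensions and catalog configured, and using the db.table / etl_job_42 names from the examples above:

branch = "etl_job_42"

# Create the audit branch on the table
spark.sql(f"ALTER TABLE db.table CREATE BRANCH {branch}")

# Enable WAP on the table and point this session's reads and writes at the branch
spark.sql("ALTER TABLE db.table SET TBLPROPERTIES ('write.wap.enabled'='true')")
spark.conf.set("spark.wap.branch", branch)

# Write - only the branch sees this change until it is published
spark.sql("DELETE FROM db.table WHERE borough='Manhattan'")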
Audit
Once you’ve written the data, you can read it from the branch to check it. This can be in the same process, or a completely external one. The key thing is that the main table remains unchanged, so users won’t see any of the changed data at this stage.
To read from the branch you have not one, but three different options:
- Add a .branch_<branch-name> suffix:

SELECT foo, bar FROM db.table.branch_etl_job_42;
- Use the VERSION AS OF clause:

SELECT foo, bar FROM db.table VERSION AS OF 'etl_job_42';
- Set spark.wap.branch:

spark.conf.set('spark.wap.branch', 'etl_job_42')
SELECT foo, bar FROM db.table;
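To give an idea of what an asynchronous audit might look like from PySpark, here's a hypothetical check that fails the pipeline if the branch still contains rows the write was supposed to remove. It reuses the assumptions above (an existing SparkSession, plus the illustrative db.table and etl_job_42 names):

# The write deleted Manhattan rows, so none should remain on the audit branch.
# Nothing here touches what readers of the main table see.
remaining = (spark.sql("SELECT count(*) AS ct FROM db.table.branch_etl_job_42 "
                       "WHERE borough = 'Manhattan'")
             .first()["ct"])

if remaining > 0:
    # Abort before publishing; the branch can be inspected (or dropped) as-is
    raise ValueError(f"Audit failed: {remaining} Manhattan rows still on the branch")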
Publish
Once your audit passes, you are ready to publish the branch. You do this with a fast-forward merge of the branch using the manageSnapshots().fastForwardBranch API:
table
    .manageSnapshots()
    .fastForwardBranch("main", "etl_job_42")
    .commit();
Unlike the manageSnapshots().cherrypick API, for which there is a stored procedure exposed (cherrypick_snapshot), as of 1.2.1 there isn't one for fastForwardBranch, so you have to invoke the API directly.
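Once the branch has been merged, you'll probably also want to tidy it up. Assuming your Iceberg version supports the same branch DDL shown earlier, a sketch of that cleanup (again via Spark SQL from PySpark) might be:

# Drop the now-published audit branch; its snapshots live on in main's history
spark.sql("ALTER TABLE db.table DROP BRANCH etl_job_42")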
Iceberg – WAP Summary
- Granularity: Table level only
- Data Format: Apache Iceberg only
- Isolation / Dirty Reads: Yes, can query the audit table from any other Iceberg client
- Ease of use: Very easy to use, extremely intuitive and logical syntax
- Cross-language support: Everything except the merge can be done from SQL or PySpark
Apache Hudi
Hudi has what I would call WAP-lite, with the use of Pre-Commit Validators which were added back in v0.9 in 2021. It ships with three of these validators, and you can extend it to define your own.
Crucially, it doesn’t allow for an asynchronous audit. Instead, it’s an all-or-nothing operation. This sample from the documentation gives an idea of how it works:
df.write.format("hudi").mode(Overwrite).
option(TABLE_NAME, tableName).
option("hoodie.precommit.validators",
"org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator").
option("hoodie.precommit.validators.single.value.sql.queries",
"select count(*) from <TABLE_NAME> where col=null#0").
save(basePath)
At the point of writing your data (WRITE), the pre-commit validator is invoked (AUDIT), and if it succeeds the write goes ahead (PUBLISH).
You can see from this that there's no option to actually stage the written data for audit by a secondary process. This strikes me as limiting, both in terms of tooling and for troubleshooting. If the validator fails, I guess you land the faulty data in a temp area from which you can inspect and fix it, and then re-run the original process.
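For the PySpark-inclined, the same validator configuration should be expressible as plain option strings. This is an untested sketch based on the documentation snippet above, where the table name and path are hypothetical, assuming an existing SparkSession with the Hudi bundle on the classpath and a DataFrame df to write:

table_name = "my_hudi_table"    # hypothetical
base_path = "s3://bucket/path"  # hypothetical

hudi_options = {
    "hoodie.table.name": table_name,
    # Fail the commit unless the query below returns the expected result (0)
    "hoodie.precommit.validators":
        "org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator",
    "hoodie.precommit.validators.single.value.sql.queries":
        "select count(*) from <TABLE_NAME> where col=null#0",
}

df.write.format("hudi").options(**hudi_options).mode("overwrite").save(base_path)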
Unlike Iceberg—with its wap-based configuration naming—Hudi isn't claiming to implement WAP as such; it's just that the pre-commit validators approximately fit the pattern. If Hudi is your tool and the validators fit fine with your pipeline implementation, that's great and there's no reason not to use them.
Hudi – WAP Summary
- Granularity: Individual write operation
- Data Format: Apache Hudi only
- Isolation / Dirty Reads: No – audit only possible within the write process itself
- Ease of use: If you have to use it, it’ll be usable, I’m sure. It’s not easy to pick up from the outside though, and it definitely doesn’t fit the WAP pattern seamlessly
- Cross-language support: I struggled to find any actual code examples, so this is clearly the domain of JVM folk or maybe those more accomplished at PySpark than me (which is not a high bar for sure 😉)
Delta Lake
This one’s simple – so far as I can tell, you can’t do WAP with Delta Lake.
If you use Databricks Runtime (DBR) you have the option of using a Shallow clone. I haven't tried it out, but looking at the docs, it seems you'd need to roll your own Publish logic (compared to, for example, Iceberg's branch merge). The example given in the docs is a bespoke MERGE INTO statement that's specific to the fields and logic of the write operation. This strikes me as cumbersome and error-prone compared to the simpler options.
Delta Lake (on Databricks) – WAP Summary
- Granularity: Table level
- Data Format: Delta Lake on DBR only
- Isolation / Dirty Reads: Yes
- Ease of use: Fiddly, to say the least
- Cross-language support: Java/Scala/Python/SQL
RDBMS
It's worth remembering that whilst the data engineering world was busy burning things down and rebuilding them (along with the features within, such as transactions), RDBMSs have been merrily doing this for decades. In the same way that Apache Hudi offers a "Pre-Commit Validator", so you can BEGIN TRANSACTION in an RDBMS, write your data, audit it, and then COMMIT to publish.
For something closer to the WAP pattern, with the flexibility of an asynchronous audit by an external process, you have things like partition exchange. With this you write to a temporary table, audit it (from a different process if you want), and then publish it by switching it in as a partition of the table from which users read. Oracle calls this partition exchange (with variations on a theme, including hybrid partitions), and SQL Server calls it SWITCH IN.
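To make the shape of this concrete, here's a rough sketch using a generic Python DB-API connection. The table, partition, and column names are hypothetical, and the exchange DDL shown is Oracle-flavoured (SQL Server would use ALTER TABLE ... SWITCH instead), so check the exact syntax against your database's documentation:

# Rough WAP-via-partition-exchange sketch; conn is a DB-API connection
# (e.g. from oracledb or pyodbc) and all object names are made up.
def write_audit_publish(conn):
    cur = conn.cursor()

    # WRITE: load the new data into a staging table that mirrors the target
    cur.execute("INSERT INTO sales_stage SELECT * FROM sales_incoming")

    # AUDIT: run whatever checks you need against the staged data
    cur.execute("SELECT COUNT(*) FROM sales_stage WHERE amount IS NULL")
    if cur.fetchone()[0] > 0:
        conn.rollback()
        raise ValueError("Audit failed: NULL amounts in staged data")

    # PUBLISH: swap the staging table in as a partition of the table users query
    cur.execute("ALTER TABLE sales EXCHANGE PARTITION p_2023_06 WITH TABLE sales_stage")
    conn.commit()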
RDBMS – WAP Summary
- Granularity: Table partition level (partition exchange)
- Data Format: RDBMS
- Isolation / Dirty Reads: Yes
- Ease of use: Fairly easy, but constrained to partitioned tables only
- Cross-language support: SQL plus whatever APIs the RDBMS supports
lakeFS
OK, okay, ok. This is a lakeFS blog. What else am I going to say except that lakeFS is the best? Of course. But, perhaps it's not! But probably, it is 😉
lakeFS works like Git, but for your data lake. The best example of a WAP-supporting tool so far is Apache Iceberg with its explicit use of branches and merging. lakeFS is branches and merging, which means that it supports what Iceberg does, and then some, namely:
- Any data format. This includes the open table formats (Iceberg, Hudi, and Delta), file formats such as Parquet and CSV, and binary files such as images
- Any compute engine that supports S3
- Cross-collection consistency within an entire data lake if required. In other words, multi-table transaction support.
WAP in lakeFS
You can see this in action in this notebook, and an example implementation of it in the next blog in this series.
lakeFS is a layer on top of your S3-compatible storage, with native clients for Spark and an S3-compatible gateway, meaning that it will interface seamlessly with pretty much any data processing tool.
The only thing to really wrap your head around is that instead of addressing your data with literal object store paths such as s3://my-bucket/foo/bar/table, you use lakeFS-based paths, which include the name of the repository and the branch:

s3a://my-repo/branch/foo/bar/table
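As an illustration, pointing Spark at lakeFS through the S3 gateway typically just means overriding the S3A endpoint and credentials. A minimal sketch, where the endpoint URL and credentials are placeholders for your own lakeFS install:

from pyspark.sql import SparkSession

# Point Spark's S3A filesystem at the lakeFS S3 gateway instead of S3 itself
spark = (SparkSession.builder
         .appName("lakefs-wap")
         .config("spark.hadoop.fs.s3a.endpoint", "http://lakefs:8000")   # placeholder
         .config("spark.hadoop.fs.s3a.access.key", "<lakefs-access-key>")
         .config("spark.hadoop.fs.s3a.secret.key", "<lakefs-secret-key>")
         .config("spark.hadoop.fs.s3a.path.style.access", "true")
         .getOrCreate())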
Write
Before you make your write, you need to create a branch. Branches in lakeFS are copy-on-write, meaning that they're basically "free" – you only start storing data (other than metadata) on disk once you write changes to the branch, and even then only for the data that has changed.
You can create a branch in various ways in lakeFS including from the Python client:
lakefs.create_branch(repository="my-repo",
                     branch_creation=BranchCreation(
                         name="my-etl-job",
                         source="main"))
You can also use the CLI (lakectl branch create), the web interface, or call the REST API directly.
With a branch created, you go ahead and make your changes to one or more tables, in whatever format you want to write.
When writing the data, you use the branch name in the path to which you are writing:
repo="my-repo"
branch="my-etl-job"
base_datapath=(f"s3a://{repo}/{branch}")
df.write.mode('overwrite').save(f"{base_datapath}/my/table")
The beauty of lakeFS is that you could be using plain Parquet files or open table formats such as Delta Lake (or Hudi and Iceberg too):
CREATE OR REPLACE TABLE this_is_a_delta_table
USING DELTA LOCATION 's3a://my-repo/my-etl-job/this_is_a_delta_table'
AS SELECT * FROM src_table
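A comparable write from PySpark rather than SQL might look like this – a sketch that reuses the base_datapath variable from above and assumes a Delta-enabled Spark session and a DataFrame df:

# Write the DataFrame as a Delta table on the lakeFS branch path
(df.write
   .format("delta")
   .mode("overwrite")
   .save(f"{base_datapath}/this_is_a_delta_table"))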
At this point, the data on the main branch is entirely unchanged and isolated from any of the processing done here. With the data written, you're now ready to audit it.
Audit
Audit can be an entirely standalone process. Anything that can read data from S3 can read data from lakeFS, and thus perform the audit.
For example, we could check for NULLs in the loaded data of a Delta table:
SELECT COUNT(*) AS ct
FROM delta.`s3a://my-repo/my-etl-job/this_is_a_delta_table`
WHERE year IS NULL
or inspect a CSV file that we saved as part of the Write phase:
spark.read.text('s3a://my-repo/my-etl-job/some_src_data.csv').sample(fraction=0.2, seed=42).show()
and so on. Whether it's Spark, Trino, Presto, Python, or any number of other data processing tools – they all work with lakeFS.
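If the audit runs as its own job, you might wrap a check like the NULL count above in a simple gate. Here's a sketch, assuming the same Spark session and the illustrative repo/branch/table names used so far:

# Fail the audit job if the Delta table on the branch contains NULL years
nulls = (spark.sql("SELECT COUNT(*) AS ct "
                   "FROM delta.`s3a://my-repo/my-etl-job/this_is_a_delta_table` "
                   "WHERE year IS NULL")
         .first()["ct"])

if nulls > 0:
    raise ValueError(f"Audit failed: {nulls} rows with NULL year - not publishing")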
Publish
If you're happy with the audit, then you can move on to publish. This is done by merging the branch (bear in mind that lakeFS merges committed changes, so commit your writes on the branch first). As with creating a branch, there are a variety of ways to invoke the API – here's one using the lakectl tool:
lakectl merge \
lakefs://my-repo/my-etl-job \
lakefs://my-repo/main
Unpublish
Wait, what? WAPU? Well, not really. But if you need to, you can unpublish, also known as rollback or revert:
lakectl branch revert \
lakefs://my-repo/main \
main --parent-number 1 --yes
lakeFS – WAP Summary
- Granularity: Single file up to entire data lake. Supports multi-table transactions (cross-collection consistency)
- Data Format: Any and every format is supported
- Isolation / Dirty Reads: Yes. You can query a branch from any tool that can read from S3
- Ease of use: Very easy to use, extremely intuitive and logical syntax
- Cross-language support: Web interface, REST API, CLI, and clients for Python and Java
Project Nessie
Nessie describes itself as a Transactional Catalog for Data Lakes. It also provides Git-like semantics, as lakeFS does, but with some important differences.
Whilst lakeFS opts to control objects at the store level itself—and thus provides compatibility with any S3-supporting tool—Nessie implements it at the Catalog level. This means that in some senses the integration is tighter, but conversely that it's more limited.
Nessie was born out of close collaboration with the Apache Iceberg project and has good support for it. It has also recently added some limited support for Delta Lake (although not on Databricks).
WAP in Nessie
You can see this in action in this notebook.
Write
Once you’ve configured Nessie as your catalog (e.g., in Spark), you create a branch:
CREATE BRANCH etl_job_42
IN my_catalog FROM main
You then set the context for your session to this branch:
USE REFERENCE etl_job_42 IN my_catalog
After this you can go ahead and make changes to as many tables as you want:
DELETE FROM my_catalog.table WHERE borough='Manhattan'
Audit
Once you’ve written the data, you can read it from the branch to check it. This can be in the same process, or via an external one. The key thing is that the table(s) in the main branch remain unchanged, so users won’t see any of the changed data at this stage.
To read from the branch you have two different options:
- Set the context, just as when you did the write (in fact, if it's in the same session you don't need to set it again as it will still be valid):

USE REFERENCE etl_job_42 IN my_catalog;
SELECT my, audit, query FROM my_catalog.table;
- Reference the table using @<branch name> directly:

SELECT my, audit, query FROM my_catalog.`table@etl_job_42`;
Publish
To publish the data, you merge your branch back into main for all to see:
MERGE BRANCH etl_job_42 INTO main IN my_catalog
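Since the Nessie steps are all plain Spark SQL, the whole flow strings together neatly from PySpark too. A minimal sketch, assuming a Spark session with Nessie configured as my_catalog and the example names used above:

branch = "etl_job_42"

# WRITE: create a branch, point the session at it, and make the changes
spark.sql(f"CREATE BRANCH {branch} IN my_catalog FROM main")
spark.sql(f"USE REFERENCE {branch} IN my_catalog")
spark.sql("DELETE FROM my_catalog.table WHERE borough='Manhattan'")

# AUDIT: query the branch; main is untouched at this point
remaining = (spark.sql(f"SELECT count(*) AS ct FROM my_catalog.`table@{branch}` "
                       "WHERE borough='Manhattan'")
             .first()["ct"])

# PUBLISH: merge the branch back into main only if the audit passes
if remaining == 0:
    spark.sql(f"MERGE BRANCH {branch} INTO main IN my_catalog")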
Nessie – WAP Summary
- Granularity: Catalog level (i.e. supports multi-table transactions)
- Data Format: Iceberg, plus limited support for Delta (not on Databricks)
- Isolation / Dirty Reads: Yes
- Ease of use: Very intuitive syntax
- Cross-language support: SQL, Python, and Java. The documentation is first-class for SQL, and perhaps less comprehensive for Python and Java.
So You Want to WAP; Which Tool Should You Use?
What would a vendor blog be without a comparison chart positioning their tool as the clear choice? 🤣
How you build your pipelines, how complex they are, and how many of them you have are all factors that will help you determine which tool is going to be the most suitable here.
Should you adopt a particular tool just for its suitability for use with WAP? Probably not. Should a tool’s ability to support WAP factor into your overall choice of tool? It’s not a bad idea.
I’ve already split out the criteria below, but would narrate the main decision points thus:
- Are you using an open-table format already? If so, you may find it has a WAP implementation already that will work great for you and you’re all set. Apache Iceberg is a good example of this.
- Do you have multiple formats of data (heterogeneous data lake), or want to keep your options open for the future? lakeFS will fit well here.
- lakeFS is a great fit if you want to use something like Apache Hudi or Delta Lake but want full support for the WAP pattern.
- What’s your language(s) of choice for implementation? Some tools are Java-heavy, others SQL, whilst some have good all-round coverage
- At what granularity do you want your WAP to operate? Per table, or more broadly? Nessie and lakeFS both support branching across a whole catalog (Nessie) or data lake (lakeFS).
| | Tool | Granularity | Data Format | Isolation / Dirty Reads | Ease of use | Cross-language support |
|---|---|---|---|---|---|---|
| 🟢 | Apache Iceberg | Per table | Iceberg only | Yes | Very easy to use, extremely intuitive and logical syntax | Everything except the merge can be done from SQL, Python, or Java |
| 🟢 | lakeFS | Single file up to entire data lake. Supports multi-table transactions (cross-collection consistency) | Any and every format is supported | Yes | Easy to use, intuitive and logical syntax | Web interface, REST API, CLI, and clients for Python and Java |
| 🟢 | Nessie | Catalog level (i.e. supports multi-table transactions) | Iceberg, plus limited support for Delta | Yes | Very intuitive syntax | SQL, Python, and Java. The documentation is first-class for SQL, and perhaps less comprehensive for Python and Java |
| 🟠 | Apache Hudi | Individual write operation | Hudi only | No – audit only possible within the write process itself | If you have to use it, it'll be usable, I'm sure. It's not easy to pick up from the outside though, and it definitely doesn't fit the WAP pattern seamlessly | I struggled to find any actual code examples, so this is clearly the domain of JVM folk, or maybe those more accomplished at PySpark than me (which is not a high bar for sure 😉) |
| 🟠 | Delta Lake (Databricks Runtime only) | Per table (shallow clone) | Delta Lake only | Yes | Fiddly, to say the least. Doesn't fit the WAP pattern intuitively | Java/Scala/Python/SQL |
| 🟠 | RDBMS (Oracle and SQL Server) | Table partition only | RDBMS | Yes | Fairly easy, but constrained to partitioned tables only | SQL plus whatever APIs the RDBMS supports |
And finally…
In the final part of this blog series, I'm going to take a closer look at one of the tools shown in this article – lakeFS. With a notebook that you can take and run for yourself, I demonstrate an implementation of Write-Audit-Publish in a step-by-step guide.