When building a data lake, there is perhaps no more consequential decision than the format data will be stored in. The outcome will have a direct effect on its performance, usability, and compatibility.
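It is inspiring that by simply changing the format data is stored in, we can unlock new functionality and improve the performance of the overall system. Apache Hudi, Apache Iceberg, and Delta Lake, the three formats compared here, all address some of the most pressing issues with data lakes: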
- Atomic Transactions — Guaranteeing that update or append operations to the lake don’t fail midway and leave data in a corrupted state.
- Consistent Updates — Preventing reads from failing or returning incomplete results during writes. Also handling potentially concurrent writes that conflict.
- Data & Metadata Scalability — Avoiding bottlenecks with object store APIs and related metadata when tables grow to the size of thousands of partitions and billions of files.
Let’s take a closer look at the approaches of each format with respect to update performance, concurrency, and compatibility with other tools. Finally, we’ll provide a recommendation on which format makes the most sense for your data lake.
Originally open-sourced by Uber, Hudi was designed to support incremental updates over columnar data formats. It supports ingesting data from multiple sources, primarily Apache Spark and Apache Flink. It also provides a Spark-based utility to read from external sources such as Apache Kafka.
Iceberg supports Apache Spark for both reads and writes, including Spark’s structured streaming. Trino (PrestoSQL) is also supported for reads with limited support for deletions. Apache Flink is supported for both reading and writing. Finally, Iceberg offers read support for Apache Hive.
Delta Lake is maintained as an open-source project by Databricks (creators of Apache Spark) and not surprisingly provides deep integration with Spark for both reading and writing.
Read support is available for Presto, AWS Athena, AWS Redshift Spectrum, and Snowflake using Hive’s SymlinkTextInputFormat. However, this requires exporting a symlink.txt file for every Delta table partition, which, as you might suspect, becomes expensive to maintain for larger tables.
Update Performance & Throughput
Row-level updates over large, immutable objects can be supported in several ways, each with its own trade-offs between performance and throughput.
Let’s look at the strategy each data format employs for UPSERT operations. We’ll also touch on additional optimizations related to read performance.
Hudi tables are flexible (and explicit) in the performance trade-offs they offer when handling UPSERTS. The trade-offs differ between the two different types of Hudi tables:
- Copy on Write Table — Updates are written exclusively in columnar parquet files, creating new objects. This increases the cost of writes, but reduces the read amplification down to zero, making it ideal for read-heavy workloads.
- Merge on Read Table — Updates are written immediately in row-based log files and periodically merged into columnar parquet. Interestingly, queries can either include the latest log file data or not, providing a useful knob for users to choose between data latency and query efficiency.
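To make the two table types concrete, here is a minimal in-memory sketch of the trade-off. It is not Hudi code: the class names, the `files_rewritten` counter, and the `include_log` flag are hypothetical stand-ins for the behavior described above, with a plain dict playing the role of a columnar base file.

```python
from typing import Dict, List

Record = Dict[str, int]  # maps a record key to its value


class CopyOnWriteTable:
    """Every upsert rewrites the base file; reads touch only the base file."""

    def __init__(self) -> None:
        self.base: Record = {}
        self.files_rewritten = 0

    def upsert(self, updates: Record) -> None:
        # Build a brand new "file" containing the old rows plus the changes.
        self.base = {**self.base, **updates}
        self.files_rewritten += 1

    def read(self) -> Record:
        return dict(self.base)


class MergeOnReadTable:
    """Upserts append to a row log; compaction folds the log into the base."""

    def __init__(self) -> None:
        self.base: Record = {}
        self.log: List[Record] = []
        self.files_rewritten = 0

    def upsert(self, updates: Record) -> None:
        # Cheap write: append only, no base file rewrite.
        self.log.append(dict(updates))

    def read(self, include_log: bool = True) -> Record:
        # include_log=False is the faster read that may return stale data.
        result = dict(self.base)
        if include_log:
            for entry in self.log:
                result.update(entry)
        return result

    def compact(self) -> None:
        # Periodic merge of the row log into the base file.
        self.base = self.read(include_log=True)
        self.log.clear()
        self.files_rewritten += 1


cow, mor = CopyOnWriteTable(), MergeOnReadTable()
for batch in ({"a": 1}, {"b": 2}, {"a": 3}):
    cow.upsert(batch)
    mor.upsert(batch)
```

After three upserts the copy-on-write table has rewritten its base three times, while the merge-on-read table has rewritten nothing and pays the merge cost at read or compaction time instead.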
For more information on the tunable performance trade-offs provided by Hudi, see Performance Latency for Hudi Writing.
Hudi further optimizes compactions by utilizing Key Indexing to efficiently keep track of which files contain stale records.
With the release of Spark 3.0 last summer, Iceberg supports upserts via MERGE INTO queries. They work using the straightforward copy-on-write approach in which files with records that require an update get immediately rewritten.
Where Iceberg excels is on read performance with tables containing a large number of partitions. By maintaining manifest files that map objects to partitions and keep column-level statistics, Iceberg avoids expensive object-store directory listings or the need to fetch partition data from Hive.
Additionally, Iceberg’s manifests allow a single file to be assigned to multiple partitions simultaneously. This makes Iceberg tables efficient at partition pruning and improves the latency of highly selective queries.
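The idea of planning a scan from manifest statistics rather than from directory listings can be sketched in a few lines. This is an illustration only: `ManifestEntry`, its fields, and `plan_scan` are hypothetical names, and a real Iceberg manifest tracks far more than one column's min and max.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ManifestEntry:
    path: str        # object-store key of the data file
    partition: str   # partition value the file belongs to
    min_ts: int      # smallest value of column `ts` in the file
    max_ts: int      # largest value of column `ts` in the file


def plan_scan(manifest: List[ManifestEntry], lo: int, hi: int) -> List[str]:
    """Return files whose [min_ts, max_ts] range overlaps the query range [lo, hi]."""
    return [e.path for e in manifest if e.max_ts >= lo and e.min_ts <= hi]


manifest = [
    ManifestEntry("day=1/a.parquet", "day=1", 0, 99),
    ManifestEntry("day=1/b.parquet", "day=1", 100, 199),
    ManifestEntry("day=2/c.parquet", "day=2", 200, 299),
]
files_to_read = plan_scan(manifest, lo=150, hi=210)
```

No object-store listing happens here: the planner decides which files to open purely from metadata it already holds, which is why the approach scales to tables with many partitions.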
During a MERGE operation, Delta uses metadata-informed data skipping to categorize files as needing data inserted, updated, or deleted. It then performs these operations and records them as “commits” in a JSON log file called the Delta Log. These log files are rewritten every 10 commits as a Parquet “checkpoint” file that saves the entire state of the table to prevent costly log file traversals.
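A toy model shows why checkpoints keep log traversal cheap. This is a sketch, not the Delta Lake implementation: `ToyLog` and its methods are hypothetical, commits are kept in memory rather than as JSON files, and the choice to checkpoint whenever the version number is a positive multiple of 10 is an assumption made for the example.

```python
from typing import Dict, List, Set, Tuple

CHECKPOINT_INTERVAL = 10  # the "every 10 commits" from the text


class ToyLog:
    def __init__(self) -> None:
        self.commits: List[Dict[str, List[str]]] = []  # commit i is version i
        self.checkpoints: Dict[int, Set[str]] = {}     # version -> full file set

    def commit(self, add: List[str], remove: List[str]) -> int:
        self.commits.append({"add": list(add), "remove": list(remove)})
        version = len(self.commits) - 1
        if version > 0 and version % CHECKPOINT_INTERVAL == 0:
            # Save the entire table state so later readers can start here.
            self.checkpoints[version] = self._replay(version)[0]
        return version

    def _replay(self, version: int) -> Tuple[Set[str], int]:
        """Return (live files at `version`, number of commits read)."""
        starts = [v for v in self.checkpoints if v <= version]
        if starts:
            start = max(starts)
            files = set(self.checkpoints[start])
            first = start + 1
        else:
            files, first = set(), 0
        for entry in self.commits[first:version + 1]:
            files -= set(entry["remove"])
            files |= set(entry["add"])
        return files, version + 1 - first

    def snapshot(self) -> Tuple[Set[str], int]:
        return self._replay(len(self.commits) - 1)


log = ToyLog()
for i in range(25):
    log.commit(add=[f"f{i}.parquet"], remove=[f"f{i - 1}.parquet"] if i else [])
live_files, commits_read = log.snapshot()
```

With 25 commits in the log, reconstructing the current state reads the checkpoint at version 20 plus the four commits after it, instead of replaying all 25.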
To stay performant, Delta tables need to undergo periodic compaction processes that take many small parquet files and combine them into fewer, larger files (optimally ~1GB, but at least 128MB in size). Delta Engine, Databricks’ proprietary version, supports Auto-Compaction where this process is triggered automatically, as well as other behind-the-scenes write optimizations.
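Compaction is essentially a bin-packing problem, and a rough sketch of a planner might look like the following. The greedy first-fit strategy, the function name `plan_compaction`, and the rule that only files under 128MB are candidates are all assumptions for illustration; this is not how Delta or Delta Engine actually selects files.

```python
from typing import List

MB = 1024 * 1024
TARGET_BYTES = 1024 * MB  # the ~1GB optimum mentioned in the text
SMALL_BYTES = 128 * MB    # assumed threshold: smaller files are candidates


def plan_compaction(file_sizes: List[int]) -> List[List[int]]:
    """Greedy first-fit: group small files into bins of at most TARGET_BYTES."""
    small = sorted((s for s in file_sizes if s < SMALL_BYTES), reverse=True)
    bins: List[List[int]] = []
    for size in small:
        for b in bins:
            if sum(b) + size <= TARGET_BYTES:
                b.append(size)
                break
        else:
            # No existing bin has room, so start a new output file.
            bins.append([size])
    # Rewriting a bin that holds a single file would gain nothing.
    return [b for b in bins if len(b) > 1]


plan = plan_compaction([100 * MB] * 12 + [2048 * MB])
```

Each inner list in `plan` is one group of small files to be rewritten as a single larger file. Files that are already large, like the 2GB one above, are left alone.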
Allowing in-place updates for data tables means dealing with concurrency.
What happens if someone reads a table while it is being updated? And what happens when multiple writers make conflicting changes at the same time?
Typically databases solve this problem via Multiversion Concurrency Control (MVCC), a method that utilizes a logical transaction log where all changes are appended.
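As a minimal sketch of the MVCC idea, assume a table is an append-only list of immutable snapshots. `VersionedTable` and its methods are hypothetical names for illustration, not any format's API.

```python
from typing import Dict, List


class VersionedTable:
    """Append-only list of immutable snapshots; version N is snapshots[N]."""

    def __init__(self) -> None:
        self._snapshots: List[Dict[str, int]] = [{}]

    def latest_version(self) -> int:
        return len(self._snapshots) - 1

    def read(self, version: int) -> Dict[str, int]:
        # Readers pin a version and never observe later writes.
        return dict(self._snapshots[version])

    def write(self, changes: Dict[str, int]) -> int:
        # A write never mutates an old snapshot; it appends a new one.
        self._snapshots.append({**self._snapshots[-1], **changes})
        return self.latest_version()


table = VersionedTable()
pinned = table.write({"a": 1})       # a reader starts at this version
table.write({"a": 2, "b": 3})        # a writer commits while it reads
reader_view = table.read(pinned)     # still the state as of `pinned`
```

A reader that pinned its version before the second write keeps seeing a complete, consistent table, which answers the first question above: reads neither fail nor return partial results during an update.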
Another approach called Optimistic Concurrency Control (OCC) allows multiple writes to occur simultaneously, only checking for conflicts before a final commit. If a conflict is detected, one of the transactions is retried until it succeeds.
True to form, Hudi offers both MVCC and OCC concurrency controls.
MVCC with Hudi means all writes must be totally ordered in its central log. To offer this guarantee, Hudi limits write concurrency to 1, meaning there can only be a single writer to a table at a given point in time.
Iceberg tables support Optimistic Concurrency (OCC) by performing an atomic swapping operation on metadata files during updates.
The way this works is that every write creates a new table “snapshot”. Writers then attempt a Compare-And-Swap (CAS) operation on a special value holding the current snapshot ID. If no other writer replaces the snapshot during a commit, the operation succeeds. If another writer commits in the meantime, the swap fails and the writer must retry until it succeeds.
On distributed file systems such as HDFS, this can be done natively. For S3, an additional component is required to store pointers (currently only Hive Metastore is supported).
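The compare-and-swap commit described above can be sketched with a lock standing in for whatever atomic primitive the storage layer or metastore provides. Everything here is hypothetical: `SnapshotPointer`, `commit_with_retry`, and the use of sequential integers as snapshot IDs are simplifications for the example, not Iceberg's API.

```python
import threading


class SnapshotPointer:
    """Holds the current snapshot ID; compare_and_swap is atomic."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._current = 0

    def get(self) -> int:
        with self._lock:
            return self._current

    def compare_and_swap(self, expected: int, new: int) -> bool:
        with self._lock:
            if self._current != expected:
                return False  # another writer committed first
            self._current = new
            return True


def commit_with_retry(pointer: SnapshotPointer, max_attempts: int = 1000) -> int:
    """Advance the pointer by one snapshot; return the number of attempts used."""
    for attempt in range(1, max_attempts + 1):
        base = pointer.get()
        # A real writer would write its new data and metadata files here,
        # based on snapshot `base`, before attempting the swap.
        if pointer.compare_and_swap(base, base + 1):
            return attempt
    raise RuntimeError("gave up after too many conflicts")


pointer = SnapshotPointer()
writers = [threading.Thread(target=commit_with_retry, args=(pointer,)) for _ in range(8)]
for w in writers:
    w.start()
for w in writers:
    w.join()
```

However the eight writers interleave, each one eventually lands exactly one commit, so the pointer ends at 8. The retry count per writer varies from run to run.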
The Delta documentation explains that it uses Optimistic Concurrency Control to handle concurrency, since the majority of data lake operations append data to a time-ordered partition and won’t conflict.
In the situation where two processes add commits to a Delta Log file, Delta will “silently and seamlessly” check if the file changes overlap and allow both to succeed if possible.
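A simplified version of that overlap check might look like this. It is a sketch under stated assumptions: `Txn` and `conflicts` are hypothetical names, and the rule shown (a conflict exists only if the earlier commit removed a file the later one read or removed) is a deliberately reduced stand-in for the fuller set of checks a real implementation performs.

```python
from typing import FrozenSet, NamedTuple


class Txn(NamedTuple):
    read_files: FrozenSet[str]     # files the transaction read
    removed_files: FrozenSet[str]  # files it rewrote or deleted
    added_files: FrozenSet[str]    # new files it wrote


def conflicts(mine: Txn, winner: Txn) -> bool:
    """True if `winner`, which committed first, invalidates `mine`."""
    # The winner removed a file that I read or also tried to remove.
    return bool(winner.removed_files & (mine.read_files | mine.removed_files))


empty: FrozenSet[str] = frozenset()
append_a = Txn(empty, empty, frozenset({"p=1/a.parquet"}))
append_b = Txn(empty, empty, frozenset({"p=1/b.parquet"}))
old = frozenset({"p=1/old.parquet"})
update_x = Txn(old, old, frozenset({"p=1/new-x.parquet"}))
update_y = Txn(old, old, frozenset({"p=1/new-y.parquet"}))

both_appends_ok = not conflicts(append_a, append_b)
updates_collide = conflicts(update_x, update_y)
```

Two blind appends touch disjoint files, so both can succeed. Two updates that rewrite the same file cannot, and the later one has to be retried against the new table state.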
The implication, though, is that the underlying object store needs a way of providing either a CAS operation or a way of failing a write when multiple writers start to overwrite each other’s log entries.
As with Iceberg, this functionality is available out of the box on HDFS but is not supported by S3. As a result, writing from multiple Spark clusters is not supported in Delta on AWS with true transactional guarantees.
Note: The proprietary Delta Engine version supports multi-cluster writes on S3 using an external synchronization server managed by Databricks itself.
So which one is right for you?
If you’ve made it this far, you’ve learned about some of the important similarities and differences between Apache Hudi, Delta Lake, and Apache Iceberg.
The time has come to decide which format makes the most sense for your use case! My recommendation is guided by which scenario is most applicable:
Go with Iceberg if…
Your primary pain point is not changes to existing records, but the metadata burden of managing huge tables on an object store (more than 10k partitions). Adopting Iceberg will relieve performance issues related to S3 object listing or Hive Metastore partition enumeration.
On the other hand, support for deletions and mutations is still preliminary, and there is operational overhead involved with data retention.
Go with Hudi if…
You use a variety of query engines and require flexibility in how you manage mutating datasets. Be mindful that supporting tooling and the overall developer experience can be rough around the edges. Installing and tuning Hudi for real, large-scale production workloads is possible, but it carries operational overhead.
If you’re using AWS managed services such as Athena, Glue, or EMR, Hudi already comes pre-installed and configured, and is supported by AWS.
Go with Delta Lake if…
You’re primarily a Spark shop and expect relatively low write throughput. If you are also already a Databricks customer, Delta Engine brings significant improvements to both read and write performance and concurrency, and it can make sense to double down on their ecosystem.
For other Apache Spark distributions, it is important to understand that Delta Lake, while open source, will likely always lag behind Delta Engine, which serves as a product differentiator.
Integration With lakeFS
If you were wondering “Can I use lakeFS in conjunction with these data formats?”… The answer is Yes!
lakeFS works symbiotically with any of Delta, Iceberg, or Hudi, providing the ability to branch, merge, rollback, etc. across any number of tables.
Since these formats operate at the table level, they provide no guarantees for operations that span multiple tables. With lakeFS, it is possible to modify multiple tables on a single isolated branch, then merge those changes atomically into a main branch, achieving cross-table consistency.
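The branch-then-merge pattern can be illustrated with a toy in-memory model. To be clear, this is not the lakeFS API: `ToyRepo` and its methods are hypothetical, each table's state is reduced to a string label, and the merge shown is a plain fast-forward with no conflict handling.

```python
from typing import Dict

Commit = Dict[str, str]  # table name -> identifier of that table's state


class ToyRepo:
    def __init__(self, initial: Commit) -> None:
        self.branches: Dict[str, Commit] = {"main": dict(initial)}

    def branch(self, name: str, source: str = "main") -> None:
        # A branch starts as an isolated copy of the source's table states.
        self.branches[name] = dict(self.branches[source])

    def write(self, branch: str, table: str, state: str) -> None:
        self.branches[branch][table] = state

    def merge(self, source: str, dest: str = "main") -> None:
        # One reference swap publishes every table change together.
        self.branches[dest] = dict(self.branches[source])


repo = ToyRepo({"orders": "v1", "customers": "v1"})
repo.branch("etl")
repo.write("etl", "orders", "v2")
repo.write("etl", "customers", "v2")
main_before_merge = dict(repo.branches["main"])
repo.merge("etl")
main_after_merge = dict(repo.branches["main"])
```

Readers of `main` see either both tables at `v1` or both at `v2`, never a mix, which is the cross-table consistency that the table formats alone don't provide.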
This article was contributed to by Paul Singman.
The lakeFS project is an open source technology that provides a git-like version control interface for data lakes, with seamless integration to popular data tools and frameworks.
Our mission is to maximize the manageability of open source data analytics solutions that scale.