Robin Moffatt

June 1, 2023

Building resilient data pipelines and delivering trusted data to users are important duties of a data engineer. Using the Write-Audit-Publish (WAP) pattern in the pipelines we build can be a really useful component in helping us deliver on this requirement.

Be sure to check out my previous blogs where I explained WAP in detail, why you should consider using it, and how different technologies do (or don’t) support it.

In this article, I’m going to show you exactly how WAP can work in practice. The technology stack is:

  • Compute: Apache Spark v3.3 (using pySpark and Spark SQL)
  • Storage format: Delta Lake v2.3.0
  • Data Version Control: lakeFS v0.100.0
  • Storage: MinIO

The principles shown here would work just as well with other object stores, including S3, GCS, and Azure Blob Storage (ADLS Gen2). I could also swap out Delta for (or use it alongside) Apache Hudi, Apache Iceberg, Parquet, etc.

The environment is provided through the Docker Compose-provisioned Everything Bagel stack which you can run yourself to try out the notebook on which this blog is based.

Setup

The first thing we’ll do is set up the data to give you a feel for what it is and how we handle it. We’ll commit that to the main branch. If you just want to see WAP in practice, then skip below to the “WAP stuff happens now” section.

Import the source data

I’m using data from The Morrow Plots, an agricultural dataset from Illinois with over 130 years of information about crop farming, including crop variety details, planting density, and yields. My thanks as always to Data Is Plural for helping publicise the availability of interesting data!

lakeFS provides an S3-compatible interface, using MinIO in the background for the actual storage of the data. Before loading the data into Delta, I’m going to add the source CSV files to lakeFS so that they’re under source control too. Since we’re talking about S3, boto3 is a useful library here for copying files from local storage to S3-compatible storage (i.e. lakeFS):


import os
import boto3

# Create an S3 client that points at the lakeFS S3 gateway
s3 = boto3.client('s3',
                  endpoint_url='http://lakefs:8000/',
                  aws_access_key_id='AKIAIOSFODNN7EXAMPLE',
                  aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')

# Here's the source data folder
folder_path = '/home/jovyan/work/WAP/DOI-10-13012-b2idb-7865141_v1'

# Set the S3 bucket name and key prefix
bucket_name = repo.id
branch = "main"
key_prefix = f"{branch}/src/"

# Iterate over the files in the folder and upload each file to S3
for root, dirs, files in os.walk(folder_path):
    for file in files:
        local_path = os.path.join(root, file)
        s3_key = os.path.join(key_prefix, os.path.relpath(local_path, folder_path))
        s3.upload_file(local_path, bucket_name, s3_key)
        print(f"Uploaded {local_path} to {bucket_name}/{s3_key}")

Now in lakeFS we have the source data, as well as related files including the data dictionary in PDF form.

If you happen to peek under the covers at the storage layer, you’ll see the actual data files:

However, I’m showing you this for curiosity and background alone – the point of lakeFS is that it worries about the object store, and you simply interface with lakeFS alone when it comes to storage.

Having written the data to lakeFS, we follow standard software development practice and commit it to the branch (since it’s a logical, self-contained unit of work that we’d want the option to roll back to).

api_instance.commit(repo.id,
                    'main',
                    CommitCreation(
                        message="Import the source data",
                        metadata={'url': 'https://databank.illinois.edu/datasets/IDB-7865141',
                                  'citation': 'Morrow Plots Data Curation Working Group (2022): Morrow Plots Treatment and Yield Data. University of Illinois at Urbana-Champaign. https://doi.org/10.13012/B2IDB-7865141_V1'
                                 }) )

Here’s where we’re at – a set of data committed to the main branch.

Load the source data

With the source data imported, we can now load it and store it as a Delta table. We’re also going to build two aggregates on the data. Since we’re making changes, we’ll create a branch to work on:

branch="initial_load"
api_instance.create_branch(repo.id, 
                           BranchCreation(
                                name=branch,
                                source="main"
                            ))

The root of the S3 path is the name of the lakeFS repository (called example). To this, we add the branch name, and that gives us the base path for the data that we want to store.

data_dir=repo.storage_namespace.replace('s3','s3a')
base_datapath=(f"{data_dir}/{branch}")
print(f"Branch:\t\t{branch}\nBase data path:\t{base_datapath}")
Branch:         initial_load
Base data path: s3a://example/initial_load

Using this base data path, we can load the source CSV into a data frame and use it to build the Delta table.

src_csv=(f"{base_datapath}/src/morrow-plots_v01_1888-2019_soil.csv")
df = spark.read.csv(src_csv,inferSchema=True,header=True)
df.write.format("delta").mode('overwrite').save(f"{base_datapath}/raw/soil")

We’ll commit our changes before going any further.

api_instance.commit(repo.id, branch, CommitCreation(
    message="Convert CSV to Delta Lake table",
    metadata={'src_file': 'morrow-plots_v01_1888-2019_soil.csv'}
) )

Here’s how it shows up in the web UI:

Build aggregates

We’re still using the initial_load branch because, well, it’s the initial load. From our unprocessed soil table, we’ll build a couple of pre-computed aggregates for two of the dimensions, variety and plot:

dims = {
    "variety",
    "plot"
}

for dim in dims:
    spark.sql(f"""CREATE OR REPLACE TABLE agg_{dim} USING DELTA 
                    LOCATION '{base_datapath}/aggs/agg_{dim}'
                    AS SELECT {dim}, 
                              COUNT(*) AS record_ct, 
                              PERCENTILE_APPROX(yield_bush,0.5) AS median_yield,
                              min(year) AS min_year,
                              max(year) AS max_year
                        FROM delta.`{base_datapath}/raw/soil`
                        GROUP BY {dim}""")

spark.sql("show tables").show()
+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|  default|   agg_plot|      false|
|  default|agg_variety|      false|
+---------+-----------+-----------+


Commit and Merge

We’re happy with the aggregates, so we commit the change and then merge it back into main:

api_instance.commit(repo.id, branch, CommitCreation(message="Build two aggregate tables"))

lakefs.refs.merge_into_branch(repository=repo.id, source_ref=branch, destination_branch='main')

👀 WAP stuff happens now 👇🏻

So this is what we came here for – what does it look like to use Write-Audit-Publish?

We’ve got some more soil data, for 2020 and 2021, and we’re going to load it using WAP along the way to check data quality and catch any problems with the pipeline.

Just as any good software developer would do, we’re going to branch from our mainline and work on a dedicated branch. Branches are cheap – lakeFS only stores data as it changes. So the pattern you’d use with ETL/ELT jobs looks like this:

  1. Branch from main to a branch dedicated to the job
  2. Do the processing
  3. Once it’s completed and verified, merge back to main
  4. Delete the branch (see the sketch below)
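
Step 4 isn’t shown in the rest of this walkthrough, but for completeness, here’s a minimal sketch of the clean-up, assuming the branches API of the lakeFS Python client:

# Once the job's branch has been merged back to main, it can be deleted
lakefs.branches.delete_branch(repository=repo.id, branch=branch)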

Let’s create the branch:

branch="load_new_data_20-21"
api_instance.create_branch(repo.id, BranchCreation(branch, "main"))

As we saw above, the branch name is an integral part of the S3 path that we read from and write to, so we’ll update the variable that we’re using for this:

base_datapath=(f"{data_dir}/{branch}")
print(f"Branch:\t\t{branch}\nBase data path:\t{base_datapath}")
Branch:         load_new_data_20-21
Base data path: s3a://example/load_new_data_20-21

Add the new data to the base table

For the convenience of this walkthrough, the new data is actually in a CSV file in the same location as before. Let’s load it into a data frame and verify that it’s data for the years that we’re expecting:

src_csv=(f"{data_dir}/{branch}/src/morrow-plots_v01x_2020-2021_soil.csv")
df = spark.read.csv(src_csv,inferSchema=True,header=True)
df.createOrReplaceTempView("soil_src")
spark.sql("SELECT min(year),max(year) FROM soil_src").show()
+---------+---------+
|min(year)|max(year)|
+---------+---------+
|     2020|     2021|
+---------+---------+

This looks good, so we’ll now append it to the existing table. Remember we’re writing the data to the current branch – the table on the main branch is unaffected.

df.write.format("delta").mode('append').save(f"{base_datapath}/raw/soil")

BUT this fails!

AnalysisException: Failed to merge fields 'rotation' and 'rotation'. 
Failed to merge incompatible data types StringType and IntegerType

Bear in mind that we’re importing from CSV files, which by their nature don’t carry data types. Spark takes a best guess at the type of each column, and what it chose for the rotation field differs between the initial import and the new set of data.
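
One way to sidestep this kind of drift entirely (a sketch, not what the notebook does) would be to skip inference and read the new CSV using the schema of the existing Delta table:

# Reuse the existing table's schema rather than inferring types from the CSV
existing_schema = spark.read.format("delta").load(f"{base_datapath}/raw/soil").schema
df = spark.read.csv(src_csv, header=True, schema=existing_schema)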

So, now we align the schemas and then write the newly-aligned data to the existing table:

from pyspark.sql.functions import col

existing_table = spark.read.format("delta").load(f"{base_datapath}/raw/soil")

# Get the schema of each table
existing_schema = existing_table.schema
new_schema = df.schema

# Compare the schema of the tables
if existing_schema == new_schema:
    print("The schema of the two tables is the same.")
else:
    print("The schema of the two tables is different.")
    # Find any mismatched field types
    for field1, field2 in zip(existing_schema, new_schema):
        if field1 != field2:
            print(f"Field {field1.name} has type {field1.dataType} in existing table, but type {field2.dataType} in import data. Fixing.")
            # Rename the column if needed and cast it to match the existing table's type
            df = (df.withColumnRenamed(field2.name, field1.name)
                    .withColumn(field1.name, col(field1.name).cast(field1.dataType)))

df.write.format("delta").mode('append').save(f"{base_datapath}/raw/soil")

This succeeds, and now we have two versions of the soil table. The main one is what all end-users are querying, and continues to show the previous data.

The new version of the table includes the data for the additional two years:
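
If you’re following along without the screenshots, a quick way to see this from Spark is to query the raw table on each ref (the paths assume the example repository used throughout this post):

# main is untouched; the job branch contains the appended 2020-2021 rows
for ref in ['main', 'load_new_data_20-21']:
    spark.sql(f"""SELECT '{ref}'   AS ref,
                         COUNT(*)  AS row_ct,
                         MIN(year) AS min_year,
                         MAX(year) AS max_year
                    FROM delta.`s3a://example/{ref}/raw/soil`""").show(truncate=False)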

Rebuild the aggregates with the new data

dims = {
    "variety",
    "plot"
}

for dim in dims:
    spark.sql(f"""CREATE OR REPLACE TABLE agg_{dim} USING DELTA 
                    LOCATION '{base_datapath}/aggs/agg_{dim}'
                    AS SELECT {dim}, 
                              COUNT(*) AS record_ct, 
                              PERCENTILE_APPROX(yield_bush,0.5) AS median_yield,
                              min(year) AS min_year,
                              max(year) AS max_year
                        FROM delta.`{base_datapath}/raw/soil`
                        GROUP BY {dim}""")

Just as with the raw table, the aggregate changes are not published yet. The main copy of the data remains as it was before.

Let’s query each instance to double-check.

Staged/unpublished data (branch load_new_data_20-21)

SELECT "agg_variety" AS table,
    COUNT(*) AS row_ct,
    min(min_year),
    max(max_year)
FROM delta.`s3a://example/load_new_data_20-21/aggs/agg_variety`
UNION ALL
SELECT "agg_plot" AS table,
    COUNT(*) AS row_ct,
    min(min_year),
    max(max_year)
FROM delta.`s3a://example/load_new_data_20-21/aggs/agg_plot`
table        row_ct  min(min_year)  max(max_year)
agg_variety  46      1888           2021
agg_plot     25      1888           2021

Published data (branch main)

SELECT "agg_variety" AS table,
    COUNT(*) AS row_ct,
    min(min_year),
    max(max_year)
FROM delta.`s3a://example/main/aggs/agg_variety`
UNION ALL
SELECT "agg_plot" AS table,
    COUNT(*) AS row_ct,
    min(min_year),
    max(max_year)
FROM delta.`s3a://example/main/aggs/agg_plot`
table        row_ct  min(min_year)  max(max_year)
agg_variety  46      1888           2019
agg_plot     24      1888           2019

Audit

At the moment, the data is written to the audit branch (load_new_data_20-21), but not published to main. How you audit the data is up to you. The nice thing about the data being staged is that you can do it within the same ETL job, or have another tool do it.

Here’s a very simple example of doing this in Python. We’re going to programmatically check that:

  1. The year on each table we’ve loaded matches the most recent year in the source CSV file
  2. From a data quality point of view, there should be no NULLs in the dimension field for each aggregate

Has the latest data been added to each table?

from pyspark.sql.functions import desc

src_csv=(f"{data_dir}/{branch}/src/morrow-plots_v01x_2020-2021_soil.csv")
latest_year_src = spark.read.csv(src_csv,inferSchema=True,header=True).orderBy(desc("year")).first()["year"]
print(f"Latest year in source file is {latest_year_src}")
Latest year in source file is 2021

Then we get the latest year in each table in turn and compare it to the source:

tables = {
    "raw/soil": "year",
    "aggs/agg_variety": "max_year",
    "aggs/agg_plot": "max_year"
}

for table, year_col in tables.items():
    df = spark.read.format("delta").load(f"{base_datapath}/{table}")
    latest_year = df.selectExpr(f"max({year_col})").collect()[0][0]

    if ( latest_year_src!=latest_year ):
        raise ValueError(f"Audit failed: latest year on {table} ({latest_year}) does not match source ({latest_year_src})")
    else:
        print(f"🙌 Audit has passed: the latest year on {table} ({latest_year}) matches the source ({latest_year_src})")

This gives us the following output and a warm fuzzy feeling on the inside that so far the data quality is looking good 🤗

🙌 Audit has passed: the latest year on raw/soil (2021) matches the source (2021)
🙌 Audit has passed: the latest year on aggs/agg_variety (2021) matches the source (2021)
🙌 Audit has passed: the latest year on aggs/agg_plot (2021) matches the source (2021)

Are there any NULLs in the dimension of each aggregate?

dims = {
    "variety",
    "plot"
}

for dim in dims:
    ct=spark.sql(f"SELECT COUNT(*) AS ct FROM agg_{dim} WHERE {dim} IS NULL").first()['ct']

    if ( ct!=0 ):
        raise ValueError(f"💀 Audit failed: Aggregate agg_{dim} has {ct} NULL row(s)")
    else:
        print(f"🙌 Audit has passed: Aggregate agg_{dim} has 0 NULL rows")

When we run this check, the output is less cheerful:

ValueError: 💀 Audit failed: Aggregate agg_plot has 1 NULL row(s)

The audit failed! Oh noes! 🙀

Why would plot be null? Let’s look at the underlying data. First, is it just the new data (for 2020 and 2021)?

SELECT * FROM delta.`{base_datapath}/raw/soil` 
WHERE plot IS NULL AND year NOT IN (2020,2021);

This returns zero rows, meaning that the NULL values for plot are only in the new data (2020-2021). Let’s look at the source CSV file:

src=spark.read.text(src_csv)
src.show(1)
src.sample(fraction=0.2, seed=42).show()
+--------------------+
|               value|
+--------------------+
|phase,year,plot,p...|
+--------------------+
only showing top 1 row

+--------------------+
|               value|
+--------------------+
|5,2020,,3,SE,1,tr...|
|5,2020,,4,SE,2,fa...|
|5,2020,,5,NW,3,fa...|
|5,2021,,3,NW,1,tr...|
|5,2021,,4,NE,2,tr...|
|5,2021,,4,SW,2,tr...|
|5,2021,,5,SE,3,tr...|
+--------------------+

We can see from the header that the third field is plot, and based on a random sample of the file it appears to be empty in every row.

So what went wrong?

🤦🏻 It turns out we’re using the wrong source file! How very silly of us.

Let’s fix that (the filename should contain v01, not v01x as before) and check that it looks better:

src_csv=(f"{data_dir}/{branch}/src/morrow-plots_v01_2020-2021_soil.csv")
src=spark.read.text(src_csv)
src.show(1)
src.sample(fraction=0.2, seed=42).show()
+--------------------+
|               value|
+--------------------+
|phase,year,plot,p...|
+--------------------+
only showing top 1 row

+--------------------+
|               value|
+--------------------+
|5,2020,3SC,3,SE,1...|
|5,2020,4SD,4,SE,2...|
|5,2020,5NB,5,NW,3...|
|5,2021,3NB,3,NW,1...|
|5,2021,4ND,4,NE,2...|
|5,2021,4SB,4,SW,2...|
|5,2021,5SD,5,SE,3...|
+--------------------+

That does look better – we can see the plot value clearly in the source.

Reverting the changes and running the load again

So what we need to do now is re-run the load of the new data. The current branch has the (invalid) data loaded but not yet committed, as we can see from the state in lakeFS:

Before we can re-run the data load we need to reset the branch to undo the existing unpublished work.

lakefs.branches.reset_branch(repo.id, branch, ResetCreation("reset"))

Now our branch is back to how it was when we branched it from main.

We re-run the load of the base and aggregate tables, bringing it to the point it was at earlier, before the failed audit. The difference this time is that the correct source file has been used and there should be no NULLs where NULLs don’t belong.
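
The re-run itself is just the same code as before pointed at the corrected source file; here’s a condensed sketch of it:

# Load the corrected source file and append it to the raw table
src_csv = f"{data_dir}/{branch}/src/morrow-plots_v01_2020-2021_soil.csv"
df = spark.read.csv(src_csv, inferSchema=True, header=True)
# ...apply the same schema alignment as before, then append...
df.write.format("delta").mode('append').save(f"{base_datapath}/raw/soil")

# Rebuild the two aggregates
for dim in ["variety", "plot"]:
    spark.sql(f"""CREATE OR REPLACE TABLE agg_{dim} USING DELTA
                    LOCATION '{base_datapath}/aggs/agg_{dim}'
                    AS SELECT {dim},
                              COUNT(*) AS record_ct,
                              PERCENTILE_APPROX(yield_bush,0.5) AS median_yield,
                              MIN(year) AS min_year,
                              MAX(year) AS max_year
                        FROM delta.`{base_datapath}/raw/soil`
                        GROUP BY {dim}""")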

Re-running the Audit

We use the same data quality checks as above, and this time they both pass:

🙌 Audit has passed: the latest year on raw/soil (2021) matches the source (2021)
🙌 Audit has passed: the latest year on aggs/agg_variety (2021) matches the source (2021)
🙌 Audit has passed: the latest year on aggs/agg_plot (2021) matches the source (2021)

🙌 Audit has passed: Aggregate agg_plot has 0 NULL rows
🙌 Audit has passed: Aggregate agg_variety has 0 NULL rows

Since the audit passed, let’s commit the changes to the branch.

api_instance.commit(repo.id, branch, CommitCreation(
    message="Add data for years 2020 and 2021",
    metadata={'src_file': 'morrow-plots_v01_2020-2021_soil.csv'}
) )

🎈🎈🎈🎉🎉🎉🎉 The Audit passed! 🎉🎉🎉🎉🎈🎈🎈
👉🏻 We are now ready to Publish

The data quality checks have passed, and the data’s been committed – we’re now ready to merge the data to main and, in doing so, fulfil the Publish stage of WAP. Once in main, all users of the data will see the updates that until now have been happening in this isolated branch.

We can use lakeFS to do a diff between the two branches:
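
If you’d rather pull that diff programmatically than look at it in the web UI, here’s a sketch using the refs API of the lakeFS Python client (the type and path fields are assumed from the client’s Diff model):

# List what has changed on the job branch relative to main
diff = lakefs.refs.diff_refs(repository=repo.id,
                             left_ref='main',
                             right_ref='load_new_data_20-21')
for change in diff.results:
    print(change.type, change.path)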

Rather usefully, we can also look at the difference between the tables themselves, based on the Delta history:

You can read more about Delta diff in lakeFS in this article.

So let’s go ahead and do the merge.

lakefs.refs.merge_into_branch(repository=repo.id, 
                              source_ref='load_new_data_20-21', 
                              destination_branch='main')

After all of that, our merry users querying the main branch now get to see the new, validated data:
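
If you want to confirm that from Spark rather than the UI, re-running the earlier aggregate query against the main paths should now report a max_year of 2021:

# The published aggregates on main now include the 2020-2021 data
spark.sql("""SELECT 'agg_variety' AS table, COUNT(*) AS row_ct, min(min_year), max(max_year)
               FROM delta.`s3a://example/main/aggs/agg_variety`
              UNION ALL
             SELECT 'agg_plot' AS table, COUNT(*) AS row_ct, min(min_year), max(max_year)
               FROM delta.`s3a://example/main/aggs/agg_plot`""").show()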

Where Next?

To try this out for yourself, spin up the Everything Bagel Docker Compose stack and run the notebook that this post is based on. For more on the Write-Audit-Publish pattern itself – what it is, why you’d use it, and how other technologies support it – check out my previous blogs.