Jonathan Rosenberg, Tal Sofer
August 15, 2022

lakeFS is an interface to the data lake, or the parts of the data lake one chooses to version control. The lakeFS interface is S3 compatible, and hence easily used with all common data applications, including Spark.

In some cases, lakeFS is first adopted by the teams responsible for the data ingested into the lake, as it allows testing the quality of the ingested data in isolation before exposing it to consumers. Other datasets, originating from other sources, may be introduced directly to an S3 bucket not managed by lakeFS. In that case, consumers of the data for more advanced analysis, such as ETLs and ML modeling, may consume datasets both from lakeFS and directly from S3.

A common concern in such cases is: “I use explicit paths to the data within my Spark jobs. Some of the datasets used by the Spark jobs are managed in lakeFS, and some are managed directly on S3. Do I need to change my Spark jobs’ code to the new explicit lakeFS paths?” This concern is usually backed by a facepalm gesture 🤦, as it is an exhausting, error-prone change to make.

Concerning indeed. 

Well, the answer is NO. There is no need to change the code. Just like our Boto3 router, you can use the Hadoop RouterFS to apply the right data source to an explicit S3 path in existing code, using configuration only.

How do I do this, you ask? Let’s get right to it.

How to configure RouterFS

Configure Spark to use RouterFS

Instruct Spark to use RouterFS as the file system implementation for the URIs you would like to transform at runtime by adding the following property to your Spark configuration:

    fs.${fromFsScheme}.impl=io.lakefs.routerfs.RouterFileSystem

For example, by adding the fs.s3a.impl=io.lakefs.routerfs.RouterFileSystem configuration, you instruct Spark to use RouterFS as the file system for any URI with scheme=s3a.
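One way to set such a property (the application name and path here are illustrative, not from the original post) is on the spark-submit command line or in spark-defaults.conf. Note the spark.hadoop. prefix, which Spark uses to pass properties through to the Hadoop configuration:

```shell
# Hypothetical example: enable RouterFS for the s3a scheme at submit time.
spark-submit \
  --conf spark.hadoop.fs.s3a.impl=io.lakefs.routerfs.RouterFileSystem \
  my_app.py
```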

Add custom mapping configurations

RouterFS consumes your mapping configurations to understand which paths it needs to modify and how to modify them. It then performs a simple prefix replacement accordingly.

Mapping configurations are Hadoop properties of the form:

    routerfs.mapping.${fromFsScheme}.${mappingIdx}.(replace|with)=${path-prefix}

For a given URI, RouterFS scans the mapping configurations defined for the URI’s scheme, searches for the first mapping configuration that matches the URI prefix, and transforms the URI according to the matching configuration.

Check out the instructions for how to define your mapping configurations correctly. 

Default file system

For each mapped scheme, you should configure a default file system implementation to fall back on when no mapping is found.

Add the following configuration for each scheme you configured RouterFS to handle:

    routerfs.default.fs.${fromFsScheme}=${the file system you used for this scheme without RouterFS}

For example, by adding routerfs.default.fs.s3a=org.apache.hadoop.fs.s3a.S3AFileSystem, you instruct RouterFS to use S3AFileSystem for any URI with scheme=s3a for which RouterFS did not find a mapping configuration.

When no mapping is found

If RouterFS can’t find a matching mapping configuration, it routes the URI to the default file system configured for the URI’s scheme.

Example

Given the following mapping configurations:

    fs.s3a.impl=io.lakefs.routerfs.RouterFileSystem
    routerfs.mapping.s3a.1.replace=s3a://bucket/dir1/ # mapping src
    routerfs.mapping.s3a.1.with=lakefs://repo/main/ # mapping dst
    routerfs.mapping.s3a.2.replace=s3a://bucket/dir2/ # mapping src
    routerfs.mapping.s3a.2.with=lakefs://example-repo/dev/ # mapping dst
    routerfs.default.fs.s3a=org.apache.hadoop.fs.s3a.S3AFileSystem # default file system implementation for the `s3a` scheme
For the URI s3a://bucket/dir1/foo.parquet, RouterFS performs the following steps:
  1. Scan all routerfs mapping configurations that include the s3a scheme in their key: routerfs.mapping.s3a.${mappingIdx}.replace.
  2. Iterate over the configurations in the priority order specified by ${mappingIdx} and try to match the URI prefix against the configuration values. The iteration stops once it reaches the s3a://bucket/dir1/ prefix, which matches the URI s3a://bucket/dir1/foo.parquet.
  3. Replace the prefix with the destination mapping value lakefs://repo/main/ to create the desired URI: lakefs://repo/main/foo.parquet.
For the URI s3a://bucket/dir3/bar.parquet, RouterFS performs the following steps:
  1. Scan all routerfs mapping configurations that include the s3a scheme in their key: routerfs.mapping.s3a.${mappingIdx}.replace.
  2. Iterate over the configurations in the priority order specified by ${mappingIdx} and try to match the URI prefix against the configuration values. The iteration ends with no matching mapping.
  3. Fall back to the default file system implementation (S3AFileSystem) and leave the URI as is.
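The resolution steps above can be sketched in plain Python. This is an illustrative model of the matching logic only, not RouterFS’s actual implementation:

```python
def route(uri, mappings):
    """Resolve a URI against ordered (replace, with) prefix mappings.

    mappings: list of (src_prefix, dst_prefix) tuples, ordered by ${mappingIdx}.
    Returns the rewritten URI, or the URI unchanged (to be handled by the
    default file system) when no prefix matches.
    """
    for src_prefix, dst_prefix in mappings:  # iterate in priority order
        if uri.startswith(src_prefix):       # first match wins
            return dst_prefix + uri[len(src_prefix):]
    return uri  # no match: fall back to the default file system, URI as is

mappings = [
    ("s3a://bucket/dir1/", "lakefs://repo/main/"),
    ("s3a://bucket/dir2/", "lakefs://example-repo/dev/"),
]

print(route("s3a://bucket/dir1/foo.parquet", mappings))  # lakefs://repo/main/foo.parquet
print(route("s3a://bucket/dir3/bar.parquet", mappings))  # s3a://bucket/dir3/bar.parquet
```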

Configure File System Implementations

The final configuration step is to instruct Spark which file system to use for each URI scheme. Make sure to add this configuration for any URI scheme you defined a mapping configuration for. For example, to instruct Spark to use S3AFileSystem for any URI with scheme=lakefs:

    fs.lakefs.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

Usage

Run your Spark Application with RouterFS

After building RouterFS, the build artifact is a jar under the target directory. Supply this jar to your Spark application when running it, or place it under your $SPARK_HOME/jars directory.
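A typical way to attach the jar (the jar file name below is hypothetical; use the artifact your build actually produces) is spark-submit’s --jars flag:

```shell
# Hypothetical example: attach the RouterFS jar built under target/.
spark-submit \
  --jars target/router-fs.jar \
  my_app.py
```

Alternatively, copying the jar into $SPARK_HOME/jars puts it on the classpath of every application without per-job flags.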

Usage with lakeFS

The current version of RouterFS only works for Spark applications that interact with lakeFS via the S3 Gateway. That is, you can’t use both RouterFS and LakeFSFileSystem together, but we have concrete plans to make this work.

Configuring RouterFS to work with lakeFS and S3

The current version of RouterFS requires the use of S3AFileSystem’s per-bucket configuration functionality to support multiple mappings that use S3AFileSystem as their file system implementation. That means that the compiled Hadoop version should be >= 2.8.0.

The per-bucket configurations treat the first part of the path (also called the “authority”) as the bucket for which the S3A file system properties are configured.

For example, for the following configurations:

    fs.s3a.impl=io.lakefs.routerfs.RouterFileSystem
    routerfs.mapping.s3a.1.replace=s3a://bucket/dir/
    routerfs.mapping.s3a.1.with=lakefs://repo/branch/
    routerfs.default.fs.s3a=org.apache.hadoop.fs.s3a.S3AFileSystem

    fs.lakefs.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

    # The following configs will be used when URIs of the form `lakefs://repo/...` are addressed
    fs.s3a.bucket.repo.endpoint=https://lakefs.example.com
    fs.s3a.bucket.repo.access.key=AKIAlakefs12345EXAMPLE
    fs.s3a.bucket.repo.secret.key=abc/lakefs/1234567bPxRfiCYEXAMPLEKEY
    ...
    # The following configs will be used when any non-mapped s3a URI is addressed
    fs.s3a.endpoint=https://s3.us-east-1.amazonaws.com
    fs.s3a.access.key=...
    fs.s3a.secret.key=...

The configurations that begin with fs.s3a.bucket.repo will be used when trying to access lakefs://repo/<path>.

All other fs.s3a.<conf> properties will be used for the general case.
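Conceptually, the per-bucket lookup checks for a bucket-qualified key first and falls back to the general key. A minimal sketch of that resolution in plain Python (illustrative only, not Hadoop’s implementation):

```python
def s3a_property(conf, bucket, key):
    """Resolve an S3A property with per-bucket override semantics.

    conf: dict mapping Hadoop property names to values.
    Looks up fs.s3a.bucket.<bucket>.<key> first, then falls back
    to the general fs.s3a.<key>.
    """
    per_bucket = f"fs.s3a.bucket.{bucket}.{key}"
    return conf.get(per_bucket, conf.get(f"fs.s3a.{key}"))

conf = {
    "fs.s3a.bucket.repo.endpoint": "https://lakefs.example.com",
    "fs.s3a.endpoint": "https://s3.us-east-1.amazonaws.com",
}

# URIs routed to lakefs://repo/... resolve the lakeFS endpoint;
# any other authority falls back to the general S3 endpoint.
print(s3a_property(conf, "repo", "endpoint"))   # https://lakefs.example.com
print(s3a_property(conf, "other", "endpoint"))  # https://s3.us-east-1.amazonaws.com
```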

Working example

Please refer to the sample app.

Conclusion

The lakeFS community is continuously working to reduce the friction of integrating lakeFS into the workflow, to allow the benefits of lakeFS with as little overhead as possible.

The Hadoop RouterFS is another step in the journey of using lakeFS to enable data engineering best practices for data resilience, with a smooth and simple integration into your workflow.

If you liked this capability and would like to learn more about it, feel free to book some time with one of our engineers.
