Using Dagster’s Embedded ELT for Lakehouse Data Ingestion
In today’s data-driven world, organizations face a common challenge: avoiding vendor lock-in while managing data effectively. While powerful data warehouses like BigQuery and Snowflake excel in performance, they come with the downside of limited portability. On the other hand, data lakes offer flexible, cost-effective storage but can quickly turn into unwieldy data dumps.
Enter the lakehouse: a hybrid solution that decouples compute from storage while addressing the shortcomings of traditional data warehouses and offering the flexibility of data lakes. In this article, we’ll explore how combining Apache Iceberg, an open table format, with Dagster’s embedded ELT framework simplifies building a scalable, open data lakehouse — allowing you to streamline your ETL pipelines and future-proof your data infrastructure.
One of the core components of a data lakehouse is a robust table format capable of managing large datasets efficiently. This is where Apache Iceberg excels. According to its website, Iceberg is a high-performance format designed for massive analytics tables. Unlike traditional data lakes, which can devolve into chaotic data dumps, Iceberg brings order and structure with its support for schema evolution, partitioning, and ACID transactions. Its unique approach to metadata management, along with seamless integration with various compute engines, ensures that your data remains reliable, scalable, and well-organized. Best of all, Iceberg allows consistent data management without locking you into proprietary platforms.
Another core component of a data lakehouse is the catalog, which acts as the metadata store for your tables, tracking schema changes, table locations, and partitions. In simple terms, the catalog enables the lakehouse to manage metadata and work efficiently with your data. I like to say that any engine capable of interfacing with your catalog can integrate with your lakehouse and manipulate your data.
While Apache Iceberg itself promotes an open and flexible architecture, the choice of a catalog can significantly impact your ability to maintain that flexibility. Although Iceberg is designed to avoid vendor lock-in, relying on a specific catalog — such as those provided by proprietary platforms — can introduce limitations. When architecting your lakehouse, it’s crucial to consider not just your current tools but also the ones you’re likely to integrate with in the future. Choosing a catalog that solves for most, if not all, of your needs is key. Ultimately, preserving the open nature of your lakehouse means selecting a catalog that maintains the portability and interoperability Iceberg offers, ensuring your architecture remains adaptable to evolving business needs and technologies.
As I explored the prospect of building an ETL pipeline for a lakehouse, one key question emerged: what would data ingestion look like if I had to store data in the Iceberg format? To answer this, I had to consider the foundational components necessary for storing data efficiently in Iceberg. I sought a tool that was lightweight, easy to get started with, and flexible enough to address common ingestion challenges like schema evolution, fault tolerance, and scalability. My primary goal was to design a solution that could handle batch loads effectively, scale, and remain easy to maintain, all while minimizing the number of tools needed to handle various sources and file formats.
My initial approach involved PyIceberg, a Python implementation for accessing Iceberg tables without requiring a JVM. However, I quickly encountered limitations. Since I wanted to limit the number of tools I was using for ingestion, I initially thought of building an ingestion framework around PyIceberg with a unified interface that abstracted away the nuances of different sources. The idea was to create a declarative, user-friendly tool, but this required writing a significant amount of custom code to tie everything together. At the time of writing, PyIceberg didn’t support merge-style statements, which made it impossible to achieve incremental loads.
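To ground that, here is a minimal sketch of the kind of append-only load PyIceberg does support, assuming a REST-compatible catalog; the endpoint, credentials, and table name are placeholders rather than this project’s configuration.

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Connect to an Iceberg REST-compatible catalog (endpoint and keys are placeholders).
catalog = load_catalog(
    "lakehouse",
    **{
        "uri": "http://localhost:19120/iceberg",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "minio",
        "s3.secret-access-key": "minio123",
    },
)

# A small Arrow batch standing in for extracted source data.
batch = pa.table({"id": [1, 2], "email": ["a@example.com", "b@example.com"]})

# Appends to an existing table are straightforward; merge/upsert semantics
# were the missing piece for incremental loads.
table = catalog.load_table("customers.users")
table.append(batch)
```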
I was also interested in tracking ingestion metadata, performing schema validation, and exploring the limits of a self-serviced data quality platform. However, I soon realized that I was on the verge of re-inventing the wheel and over-engineering the solution. That prompted me to take a step back.
The limitations I faced with PyIceberg, along with the challenges of building a custom framework, forced me to search for a more creative solution. That’s when I discovered DLT (Data Load Tool), a lightweight Python framework designed to simplify data movement and solve common data extraction and loading problems. DLT abstracts away the chaos of managing ingestion code for various sources, and its rest_api client provides a declarative interface for ingesting data from any API. Additionally, since DLT is a Python-first framework, I could easily extend its logic to handle custom requirements if needed.
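For illustration, here is roughly what a declarative rest_api source looks like in dlt; the base URL, token, resource name, and cursor field are placeholders for whatever API you are ingesting.

```python
import dlt
from dlt.sources.rest_api import rest_api_source

# A declarative description of an API: one client block plus a list of resources.
source = rest_api_source(
    {
        "client": {
            "base_url": "http://localhost:8000/",
            "auth": {"type": "bearer", "token": dlt.secrets["api_token"]},
        },
        "resources": [
            {
                "name": "customers",
                "endpoint": {
                    "path": "customers",
                    "params": {
                        # Incremental extraction driven by an updated_at cursor.
                        "updated_after": {
                            "type": "incremental",
                            "cursor_path": "updated_at",
                            "initial_value": "2024-01-01T00:00:00",
                        },
                    },
                },
            },
        ],
    }
)
```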
With DLT solving most of the challenges I had in mind, my next step was figuring out how to write data to an Iceberg table. Initially, I considered building a custom destination, but my limited understanding of catalog specs and how to support various catalogs was going to either derail me or slow me down significantly. In the end, I opted for Dremio to write to Iceberg via DLT. While I wasn’t thrilled about being tied to a single technology, Dremio’s ability to integrate with various catalogs made it a reasonable compromise for my needs.
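Wiring a source like that into a pipeline that writes to Iceberg through Dremio then looks roughly like the sketch below, with the filesystem (Minio) destination acting as the staging layer. The pipeline and dataset names are made up, and the credentials resolve from the env values described later in this article.

```python
import dlt

# Dremio as the destination, with the filesystem (Minio bucket) used for staging.
pipeline = dlt.pipeline(
    pipeline_name="api_to_lakehouse",
    destination="dremio",
    staging="filesystem",
    dataset_name="customers_raw",
)

# `source` is the rest_api source from the previous snippet.
load_info = pipeline.run(source)
print(load_info)
```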
Once I confirmed that I could spin up Dremio with Docker and use Minio for object storage, I needed to decide which catalog to integrate with. I considered both Nessie and Hive Metastore, but ultimately chose Nessie. Its Git-like features and its implementation of the Iceberg REST specification gave me confidence that it could be future-proof. Next, I designed a hypothetical use case for an organization needing to ingest both API data and data from an SFTP server into a central data lakehouse. That’s when I encountered a challenge: DLT’s fsspec-based filesystem connector didn’t handle ingestion from SFTP out of the box. Again, I faced two options: extend the implementation or look for an alternative.
That’s when I discovered SlingData, which had an SFTP implementation in its filesystem connector and could also write to Iceberg via Trino. This discovery excited me because it introduced more variety in compute options and lessened the risk of vendor lock-in even further.
Finally, after identifying the tools to help build my ingestion framework, I needed a way to orchestrate everything. I was thrilled to learn that Dagster offered seamless integration with the tools I had chosen under its embedded ELT framework. Using Dagster, I was able to unify my efforts, tying them together into a single framework that significantly enhanced observability and streamlined orchestration.
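To give a feel for how this comes together, below is a rough sketch of both integrations living in one Dagster code location via the dagster-embedded-elt package. The connection names mirror the env.yaml shown later, while the API source, stream paths, and table names are illustrative rather than lifted from the repository.

```python
import dlt
from dagster import Definitions, EnvVar
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource, sling_assets
from dlt.sources.rest_api import rest_api_source

# dlt leg: REST API -> Dremio (Iceberg tables managed by Nessie).
api_source = rest_api_source(
    {"client": {"base_url": "http://localhost:8000/"}, "resources": ["customers"]}
)
api_pipeline = dlt.pipeline(
    pipeline_name="api_to_lakehouse",
    destination="dremio",
    staging="filesystem",
    dataset_name="customers_raw",
)

@dlt_assets(dlt_source=api_source, dlt_pipeline=api_pipeline)
def api_assets(context, dlt_resource: DagsterDltResource):
    yield from dlt_resource.run(context=context)

# Sling leg: SFTP -> Trino (Iceberg catalog). Connection URLs come from the .env file.
sling = SlingResource(
    connections=[
        SlingConnectionResource(
            name="SFTP_CONN", type="sftp",
            connection_string=EnvVar("SLING_SFTP_CONNECTION_URL"),
        ),
        SlingConnectionResource(
            name="TRINO_CONN", type="trino",
            connection_string=EnvVar("SLING_TRINO_CONNECTION_URL"),
        ),
    ]
)

replication_config = {
    "source": "SFTP_CONN",
    "target": "TRINO_CONN",
    "streams": {"/uploads/customers/*.csv": {"object": "customers.sftp_customers"}},
}

@sling_assets(replication_config=replication_config)
def sftp_assets(context, sling: SlingResource):
    yield from sling.replicate(context=context)

defs = Definitions(
    assets=[api_assets, sftp_assets],
    resources={"dlt_resource": DagsterDltResource(), "sling": sling},
)
```

Everything lands in one asset graph, so lineage, scheduling, and run logs for both the API and SFTP loads live in a single Dagster UI.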
Implementing the Architecture
To implement a local version of the pipeline, I leveraged Docker images of the technologies in the architecture I just described (Sling, Trino, dltHub, Dagster, Dremio, Nessie, and Minio). I also wrote a custom API with the FastAPI framework, with features like pagination, authentication, and the ability to filter records based on dates. My primary motivation for building the API was to create something that gave users full control over the incremental changes that occur in real-life scenarios. The implementation of the ETL pipeline can be found here.
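The actual API lives in the repository, but a stripped-down sketch of the kind of endpoint it exposes might look like the following: bearer-token auth, limit/offset pagination, and an updated_after filter for incremental pulls. The route, field names, and token here are illustrative, not the real implementation.

```python
from datetime import datetime

from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

app = FastAPI()
security = HTTPBearer()

# In-memory stand-in for the dataset the real API serves.
CUSTOMERS = [
    {"id": 1, "email": "a@example.com", "updated_at": "2024-06-01T00:00:00"},
    {"id": 2, "email": "b@example.com", "updated_at": "2024-07-15T00:00:00"},
]

def check_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    # Placeholder secret; the real service would validate properly.
    if credentials.credentials != "local-dev-token":
        raise HTTPException(status_code=401, detail="Invalid token")

@app.get("/customers", dependencies=[Depends(check_token)])
def list_customers(
    updated_after: datetime | None = None,
    limit: int = Query(100, le=1000),
    offset: int = 0,
):
    # Date filter enables incremental pulls; limit/offset handles pagination.
    rows = [
        c for c in CUSTOMERS
        if updated_after is None or datetime.fromisoformat(c["updated_at"]) > updated_after
    ]
    return {"data": rows[offset : offset + limit], "total": len(rows)}
```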
All configurations required to get the different services to work properly can be found in the infra directory.
Sling leverages an env.yaml file in the Sling home directory to help users manage connections in an easily scalable manner. The .sling directory is set as the Sling home directory so it can be managed as part of version control. The Dockerfile used to build our API service is saved in the Dockerfiles sub-folder. The atmoz/sftp image uses a users.conf file to manage SFTP username/password pairs, and this is saved in the sftp sub-folder. The Trino sub-folder is mounted as a volume for the Trino container to allow us to configure the catalog through property files like iceberg.properties. I’ve also added some scripts to show how you can start up Trino and load data into Iceberg on startup.
To ease the process of setting up the lakehouse, I’ve created a Makefile with commands that automate most of the manual work required to get the system up and running.
- After cloning the repository, start by executing `make lakehouse-setup`. On the first run, this will create the virtual environment, install the required libraries, and create the env files needed by the system.
- Open the env file at ./dagster/.env and fill it with:
- DESTINATION__FILESYSTEM__BUCKET_URL — The bucket dlt will stage files in before writing them to Iceberg tables.
- DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID — The AWS access key, which we’ve already configured for the Minio service in the docker-compose file.
- DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY — The AWS secret key, which we’ve already configured to be minio123 in the docker-compose file.
- DESTINATION__FILESYSTEM__CREDENTIALS__ENDPOINT_URL — Since we are using Minio, an S3-compatible storage, in place of S3, we must set this value to a URL that lets us call the Minio API.
- DESTINATION__DREMIO__STAGING_DATA_SOURCE — The name of your S3 source connection in Dremio (Minio in our case); whatever you name that connection is the value you provide here.
- DESTINATION__DREMIO__CREDENTIALS__DATABASE — This is the name used to configure the connection to Nessie in Dremio.
- DESTINATION__DREMIO__CREDENTIALS__PASSWORD — The Dremio account password.
- DESTINATION__DREMIO__CREDENTIALS__USERNAME — The Dremio account username.
- DESTINATION__DREMIO__CREDENTIALS__HOST — The Dremio host, which can be set to localhost in our setup.
- DESTINATION__DREMIO__CREDENTIALS__PORT — The port we bound to Dremio’s container port.
- SLING_HOME_DIR — This value lets us manage our Sling connections using the env.yaml approach.
- SLING_TRINO_CONNECTION_URL — The Sling connection URL for Trino.
- SLING_SFTP_CONNECTION_URL — The Sling connection URL for the SFTP server.
- Note that variables with double underscores should not be carelessly modified: the convention is deliberate and is used by dlt to correctly resolve environment variables into its configuration. You can learn more about this concept here.
- Open the env file at ./infra/.sling/env.yaml and provide the connection credentials. The values should match those below, since they’re managed via the docker-compose file and the values at ./infra/sftp/users.conf:
connections:
  SFTP_CONN:
    type: sftp
    host: localhost
    password: password1
    port: "2222"
    user: user1
  TRINO_CONN:
    type: trino
    http_url: http://trino:@localhost:8080?catalog=iceberg&schema=customers
variables: {}
Trino is configured not to require a password, so your URL only contains the username.
- Start the lakehouse services with `make lakehouse-init`.
- Open Dremio in your web browser at http://0.0.0.0:9047/ and sign up. Ensure the username and password you use match the values you set for DESTINATION__DREMIO__CREDENTIALS__USERNAME and DESTINATION__DREMIO__CREDENTIALS__PASSWORD in your .env file.
- Create a connection to Nessie. Ensure your connection name is the same as the value for DESTINATION__DREMIO__CREDENTIALS__DATABASE in your .env.
- Ensure that the authentication is set to None.
- Set the endpoint URL to http://nessie:19120/api/v2.
- In the storage tab, set the storage provider to AWS.
- Set authentication method to AWS Access Key.
- Set the AWS access key and secret key to the values you configured for DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID and DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY, which should match the Minio service’s environment variable values in the docker-compose file.
- For the connection properties in the storage tab, set the following values:
- fs.s3a.path.style.access : True
- fs.s3a.endpoint : minio:9000
- dremio.s3.compat : true
- Uncheck the encrypt connection checkbox.
- Click Save, and this should connect Dremio to Nessie.
- Create a connection to Minio in Dremio.
- Ensure the connection name is the same as the value of the DESTINATION__DREMIO__STAGING_DATA_SOURCE env variable; this is how dlt knows which source to reference for object storage in Dremio.
- Set authentication method to AWS Access Key.
- Set the AWS access key and secret key to the values you configured for DESTINATION__FILESYSTEM__CREDENTIALS__AWS_ACCESS_KEY_ID and DESTINATION__FILESYSTEM__CREDENTIALS__AWS_SECRET_ACCESS_KEY, which should match the Minio service’s environment variable values in the docker-compose file.
- Uncheck the encrypt connection checkbox.
- Enable compatibility mode.
- Set the root path to /.
- Set the default CTAS format to ICEBERG.
- Add the following connection properties:
- fs.s3a.path.style.access : True
- fs.s3a.endpoint : minio:9000
- Click Save.
- Start the Dagster server with `make lakehouse-serve`.
- Open Dagster at http://127.0.0.1:3000/, which should land you on the default view. Then click on View global asset lineage in the top right corner and click Materialize all to run both the Sling and dlt assets.
You can now query your Iceberg tables in Dremio.
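If you’d rather verify from Python than the Dremio UI, a quick check through Trino might look like the snippet below; treat sftp_customers as a placeholder for whatever table your Sling replication wrote.

```python
from trino.dbapi import connect

# Query the Iceberg catalog that Sling loaded through Trino.
conn = connect(host="localhost", port=8080, user="trino", catalog="iceberg", schema="customers")
cur = conn.cursor()
cur.execute("SELECT count(*) FROM sftp_customers")
print(cur.fetchall())
```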
Trying it for Yourself
In this article, I’ve shared how I explored Apache Iceberg, designed an ingestion pipeline, and walked you through the code, showing how to leverage Dagster to build ingestion pipelines for your data lakehouse. Hopefully this has been an effective demonstration that you can try on your own.
Have questions about my process or how your organization can benefit from this configuration? Contact us. Our data experts would be happy to help you with this setup or to discuss potential use cases for it.