
4 posts tagged with "spark"


Starlake Iceberg integration

· 3 min read
Hayssam Saleh
Starlake Core Team

Introduction

This is a quick walkthrough of how to use Starlake to load data into Iceberg and how to run transformations on Iceberg tables with Starlake. To make sure everything works, we will then query those tables with DuckDB.

Project setup

We will use the starlake bootstrap command to create a new project.

$ mkdir starlake-iceberg
$ cd starlake-iceberg
$ starlake bootstrap

This will create a new project with a default configuration.

Let's update the application.sl.yml file to use Iceberg. We do not need to include any extra library, since the Iceberg jars are distributed with Starlake.

application:
  defaultWriteFormat: iceberg
  spark:
    sql.extensions: "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
    sql.catalog.spark_catalog.type: hadoop
    sql.catalog.local: org.apache.iceberg.spark.SparkCatalog
    sql.catalog.local.type: hadoop
    sql.catalog.spark_catalog.warehouse: "{{SL_ROOT}}/warehouse"
    sql.catalog.local.warehouse: "{{SL_ROOT}}/warehouse"
    sql.defaultCatalog: local

The Iceberg metadata is stored in the warehouse folder.

By default, Starlake uses the DuckDB connection. Let's activate the spark_local connection instead so that Iceberg is used: create the file env.ICEBERG.sl.yml in the metadata directory with the following content:

version: 1
env:
  activeConnection: spark_local

Loading data into Iceberg

The bootstrap comes with sample files. We just need to run the following commands to load the data into Iceberg.

$ export SL_ENV=ICEBERG # to use definitions in the env.ICEBERG.sl.yml file
$ starlake autoload

That's it! We have loaded the data into Iceberg.
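
If you want a quick sanity check outside of Starlake, a Spark session pointed at the same warehouse can read the tables directly. The following is a minimal PySpark sketch, not part of the original walkthrough; it assumes pyspark is installed and that the iceberg-spark-runtime coordinates match your Spark, Scala and Iceberg versions.

import os
from pyspark.sql import SparkSession

warehouse = os.path.abspath("warehouse")  # run this from the project root

spark = (
    SparkSession.builder
    .appName("iceberg-check")
    # Adjust the runtime jar coordinates to your Spark/Scala/Iceberg versions
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0")
    # Same catalog settings as in application.sl.yml above
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", warehouse)
    .getOrCreate()
)

# autoload should have created the starbake schema and its tables
spark.sql("SHOW TABLES IN local.starbake").show()
spark.sql("SELECT * FROM local.starbake.orders LIMIT 5").show()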

Run transformations with Starlake on Iceberg tables

Let's create a new transformation.

$ mkdir metadata/transform/kpi
$ touch metadata/transform/kpi/revenue_summary.sql

Edit the file metadata/transform/kpi/revenue_summary.sql with the following content:

SELECT
    o.order_id,
    o.timestamp AS order_date,
    SUM(ol.quantity * ol.sale_price) AS total_revenue
FROM
    starbake.orders o
    JOIN starbake.order_lines ol ON o.order_id = ol.order_id
GROUP BY
    o.order_id, o.timestamp

Let's first preview the results of the transformation.

$ starlake transform  --name kpi.revenue_summary --interactive table
+--------+-----------------------+------------------+
|order_id|             order_date|     total_revenue|
+--------+-----------------------+------------------+
|      40|2024-02-11 06:49:28.665|             68.24|
|       8|2024-01-23 20:47:53.667|              8.68|
|      27|2024-02-26 01:12:45.282|              30.0|
|      46|2024-02-10 18:27:05.732|              45.0|
|      56|2024-01-30 07:33:08.621|              75.0|
|      35|2024-01-17 00:30:21.277|             18.18|
|       3|2024-02-10 23:10:30.685|             16.84|
|      54|2024-02-05 08:03:21.197|            115.64|
|      48|2024-02-17 10:05:36.367|             17.06|
|      45|2024-01-16 04:21:01.494|             45.44|
|      98| 2024-01-16 10:47:28.92|             72.72|
|      78| 2024-01-07 07:48:02.53|              45.0|
| ...
+--------+-----------------------+------------------+

Now, let's run the transformation.

$ starlake transform  --name kpi.revenue_summary

You should now see the loaded data in the Iceberg tables and the transformation result in the kpi.revenue_summary Iceberg table. The warehouse folder has the following structure:

warehouse/
├── audit
│   ├── audit
│   └── rejected
├── kpi
│   └── revenue_summary
└── starbake
    ├── order_lines
    ├── orders
    └── products

Querying data

To query the data, we can use DuckDB.

$ duckdb
v1.1.1 af39bd0dcf
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.

> INSTALL iceberg;
> LOAD iceberg;
> WITH o AS (
    SELECT * FROM iceberg_scan('warehouse/starbake/orders')
  ),
  ol AS (
    SELECT * FROM iceberg_scan('warehouse/starbake/order_lines')
  )
  SELECT
    o.order_id,
    o.timestamp AS order_date,
    SUM(ol.quantity * ol.sale_price) AS total_revenue
  FROM
    o
    JOIN ol ON o.order_id = ol.order_id
  GROUP BY
    o.order_id, o.timestamp;
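
The same query can also be scripted through DuckDB's Python API. Here is a minimal sketch, not from the original post, assuming the duckdb Python package is installed and the script runs from the project root:

import duckdb

con = duckdb.connect()  # transient in-memory database
con.install_extension("iceberg")
con.load_extension("iceberg")

revenue = con.sql("""
    WITH o AS (
        SELECT * FROM iceberg_scan('warehouse/starbake/orders')
    ),
    ol AS (
        SELECT * FROM iceberg_scan('warehouse/starbake/order_lines')
    )
    SELECT
        o.order_id,
        o.timestamp AS order_date,
        SUM(ol.quantity * ol.sale_price) AS total_revenue
    FROM o
    JOIN ol ON o.order_id = ol.order_id
    GROUP BY o.order_id, o.timestamp
""")
revenue.show()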

That's it! We have queried the Iceberg data with DuckDB.

Full code is available here.

Polars versus Spark

· 6 min read
Hayssam Saleh
Starlake Core Team

Introduction

Polars is often compared to Spark. In this post, I will highlight the main differences and the best use cases for each in my data engineering activities.

As a Data Engineer, I primarily focus on the following goals:

  1. Parsing files, validating the input data, and loading it into the target data warehouse.
  2. Once the data is loaded, applying transformations by joining and aggregating the data to build KPIs.

However, on a daily basis, I also need to develop on my laptop and test my work locally before delivering it to the CI pipeline and then to production.

What about my fellow data scientist colleagues? They need to run their workload on production data through their favorite notebook environment.
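
To give a flavor of the comparison before diving in, here is the same simple KPI (revenue per order) written with both libraries. This sketch is not taken from the post; the file paths and column names are made up.

import polars as pl
from pyspark.sql import SparkSession, functions as F

# Polars: single-machine, eager by default, very convenient on a laptop
orders = pl.read_parquet("data/orders.parquet")
order_lines = pl.read_parquet("data/order_lines.parquet")
revenue_pl = (
    orders.join(order_lines, on="order_id")
    .group_by("order_id")
    .agg((pl.col("quantity") * pl.col("sale_price")).sum().alias("total_revenue"))
)

# PySpark: distributed, better suited to large production workloads
spark = SparkSession.builder.appName("polars-vs-spark").getOrCreate()
orders_df = spark.read.parquet("data/orders.parquet")
order_lines_df = spark.read.parquet("data/order_lines.parquet")
revenue_spark = (
    orders_df.join(order_lines_df, "order_id")
    .groupBy("order_id")
    .agg(F.sum(F.col("quantity") * F.col("sale_price")).alias("total_revenue"))
)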

Handling Dynamic Partitioning and Merge with Spark on BigQuery

· 7 min read
Hayssam Saleh
Starlake Core Team

Data loading strategies

When loading data into BigQuery, you may want to:

  • Overwrite the existing data and replace it with the incoming data.
  • Append the incoming data to the existing data.
  • Dynamic partition overwrite, where only the partitions to which the incoming data belong are overwritten (see the sketch below).
  • Merge the incoming data with the existing data, keeping the newest version of each record.

For performance reasons, when dealing with huge amounts of data, tables are usually split into multiple partitions. BigQuery supports range partitioning, which is uncommon, and date/time partitioning, which is the most widely used type of partitioning.
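
Before looking at the BigQuery specifics, the strategies above can be sketched with plain PySpark. This is only an illustration, not the code from the post: the table and column names (sales, incoming, dt, order_id) are made up, and the last strategy needs a table format that supports MERGE INTO when using plain Spark SQL.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("load-strategies").getOrCreate()
incoming_df = spark.read.parquet("/tmp/incoming")  # hypothetical incoming batch

# 1. Overwrite: replace the whole table with the incoming data
incoming_df.write.mode("overwrite").partitionBy("dt").saveAsTable("sales")

# 2. Append: add the incoming rows to the existing table
incoming_df.write.mode("append").saveAsTable("sales")

# 3. Dynamic partition overwrite: only the partitions present in the incoming
#    data are rewritten; all other partitions are left untouched
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
incoming_df.write.mode("overwrite").insertInto("sales")

# 4. Merge: keep the newest version of each record. With plain Spark SQL this
#    requires a table format that supports MERGE INTO (Iceberg, Delta, ...);
#    the post covers how to achieve the same result on BigQuery.
incoming_df.createOrReplaceTempView("incoming")
spark.sql("""
    MERGE INTO sales AS t
    USING incoming AS s
    ON t.order_id = s.order_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")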