PySpark → ZettaPark Migration in Practice: F1 Racing Data Engineering Project

If you're familiar with PySpark, the learning curve for migrating to ZettaPark is much lower than you might expect. ZettaPark's DataFrame API is highly compatible with PySpark — select, filter, join, Window functions, and aggregation functions all use identical syntax. You don't need to learn a new API or rewrite your business logic. The changes are limited to 4 environment configuration points.

This article validates that claim with a real project: a complete migration of an F1 racing data engineering project built on the Apache Spark stack (Azure Databricks + PySpark) to Singdata Lakehouse, passing all 71 automated validation checks.

Full code on GitHub: spark2lakehouse-formula1


Original Project

spark2lakehouse-formula1 is forked from FerhattSimsekk/formula1-data-engineering. The original stack is Azure Databricks + PySpark + Delta Lake. The project implements a complete data engineering pipeline, from API download and data ingestion through multi-table join transformations to analytical-layer output, covering all 125 races across the 2018–2023 seasons. It includes 8 core tables (circuits, races, drivers, constructors, race results, pit stops, lap times, and qualifying results) totaling roughly 144,500 rows.

The migrated code lives in the 03_lakehouse/ directory and can be compared file-by-file with 01_spark/.

Technology Stack Comparison

| Aspect | Original Project | After Migration |
|---|---|---|
| Compute Engine | Apache Spark (Azure Databricks) | Singdata Lakehouse |
| DataFrame API | PySpark (pyspark.sql) | ZettaPark (clickzetta.zettapark) |
| Storage Format | Delta Lake | Lakehouse native tables |
| File Storage | ADLS (/mnt/... mount) | Volume (vol://schema.vol/...) |
| Session Management | spark (injected globally by Databricks) | Session.builder.configs({...}).create() |
| SQL Execution | spark.sql(q) runs DDL/DML immediately | session.sql(q).collect() triggers execution |
| Partitioned Write | df.write.partitionBy("col") | PARTITIONED BY (col TYPE) DDL |
| Temporary Views | CREATE TEMP VIEW | df.create_or_replace_temp_view() |
| Runtime Environment | Databricks Notebook | Local Python script / Jupyter |

The differences in the table look significant, but their impact on migration effort is minimal. What changes is the runtime environment and the API package name — the data processing logic itself stays the same. 90% of the code can be reused directly. The 4 required changes are all mechanical replacements with no business logic rewriting. DataFrame operations (select, filter, join, groupBy, Window functions, aggregation functions) have identical syntax and require no changes.



Conclusion First

90% of PySpark code can be reused directly. Changes are limited to 4 areas.

| Change | Effort | Notes |
|---|---|---|
| Import path replacement | Very low | Mechanical replacement, no logic changes |
| Explicit Session creation | Very low | Databricks injects spark globally; ZettaPark requires explicit creation |
| Add .collect() to DDL/DML | Low | Lazy execution: without it, nothing runs |
| File path format | Low | /mnt/... → vol://schema.vol/... |

Window functions, aggregations, JOINs, filters, sorting — the core operations of data engineering all have identical syntax and need no changes.


Project Background

The data source is the F1 racing API (Jolpica), covering 125 races across the 2018–2023 seasons. The data architecture has two layers: f1_processed (ingestion layer) and f1_presentation (analytical layer).

| Table | Rows | Description |
|---|---|---|
| circuits | 78 | Circuits across 34 countries |
| races | 125 | Full 2018–2023 seasons |
| drivers | 37 | Race drivers |
| constructors | 15 | Race teams |
| results | 2,500 | Race results |
| pit_stops | 4,294 | Pit stop records |
| lap_times | 134,957 | Lap times |
| qualifying | 2,497 | Qualifying results |

Migration Steps

Step 1: Replace Import Paths

This step is purely mechanical; a global find-and-replace in your editor is enough:

    # Before (PySpark)
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    # After (ZettaPark)
    from clickzetta.zettapark import functions as F
    from clickzetta.zettapark.window import Window
    from clickzetta.zettapark.types import StructType, StructField, StringType, IntegerType
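If the codebase spans many scripts, the same find-and-replace can be scripted. The sketch below is hypothetical (rewrite_imports and rewrite_tree are not part of the project). It assumes every pyspark.sql module you import has a same-named counterpart under clickzetta.zettapark, as the three modules shown here do, so review the resulting diff for imports such as SparkSession, which ZettaPark replaces with Session.

```python
import re
from pathlib import Path
from typing import List

# Matches "from pyspark.sql..." or "import pyspark.sql..." at the start of a line.
_IMPORT_RE = re.compile(r"^([ \t]*(?:from|import)[ \t]+)pyspark\.sql\b", re.MULTILINE)


def rewrite_imports(source: str) -> str:
    """Replace the pyspark.sql package prefix with clickzetta.zettapark in import lines."""
    return _IMPORT_RE.sub(r"\1clickzetta.zettapark", source)


def rewrite_tree(root: str) -> List[str]:
    """Rewrite every .py file under root in place; return the paths that changed."""
    changed = []
    for path in sorted(Path(root).rglob("*.py")):
        text = path.read_text(encoding="utf-8")
        new_text = rewrite_imports(text)
        if new_text != text:
            path.write_text(new_text, encoding="utf-8")
            changed.append(str(path))
    return changed
```

Only import statements are touched; string literals that happen to contain "pyspark.sql" are left alone.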

Step 2: Replace Session Creation

In Databricks, spark is globally injected and requires no setup. ZettaPark requires explicit Session creation:

    # Before (Databricks: spark is available globally)
    df = spark.read.csv("/mnt/formula1dltr/raw/circuits.csv", header=True)

    # After (ZettaPark)
    import os

    from clickzetta.zettapark.session import Session
    from dotenv import load_dotenv

    load_dotenv()  # load CLICKZETTA_* variables from a local .env file

    session = Session.builder.configs({
        "username": os.environ["CLICKZETTA_USERNAME"],
        "password": os.environ["CLICKZETTA_PASSWORD"],
        "service": os.environ["CLICKZETTA_SERVICE"],
        "instance": os.environ["CLICKZETTA_INSTANCE"],
        "workspace": os.environ["CLICKZETTA_WORKSPACE"],
        "schema": "f1_processed",
        "vcluster": os.environ.get("CLICKZETTA_VCLUSTER", "default_ap"),
    }).create()

    df = session.read.option("header", "true").csv(
        "vol://f1_raw.formula1_raw_vol/raw/circuits.csv"
    )
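With the code above, a missing environment variable surfaces as a bare KeyError on the first os.environ[...] lookup. The hypothetical helper below (not part of the project) builds the same configuration dictionary from the CLICKZETTA_* variables and reports every missing name at once; the default vcluster value default_ap is taken from the example.

```python
import os
from typing import Mapping

# Keys taken from the Session.builder.configs({...}) call above.
REQUIRED_KEYS = ("username", "password", "service", "instance", "workspace")


def build_session_config(schema: str, env: Mapping[str, str] = os.environ) -> dict:
    """Assemble the ZettaPark session config, failing fast on missing variables."""
    missing = [f"CLICKZETTA_{k.upper()}" for k in REQUIRED_KEYS
               if not env.get(f"CLICKZETTA_{k.upper()}")]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    config = {k: env[f"CLICKZETTA_{k.upper()}"] for k in REQUIRED_KEYS}
    config["schema"] = schema
    config["vcluster"] = env.get("CLICKZETTA_VCLUSTER", "default_ap")
    return config

# Usage (requires ZettaPark and a reachable Lakehouse service):
# from clickzetta.zettapark.session import Session
# session = Session.builder.configs(build_session_config("f1_processed")).create()
```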

Step 3: Add .collect() to DDL/DML

ZettaPark's session.sql() is lazy — it returns a DataFrame object and won't actually execute without an action. This is the easiest thing to miss.

    # Before (PySpark: executes immediately)
    spark.sql("MERGE INTO f1_processed.circuits tgt USING src ON ...")
    spark.sql("CREATE TABLE f1_processed.circuits AS SELECT ...")

    # After (ZettaPark: must add .collect())
    session.sql("MERGE INTO f1_processed.circuits tgt USING src ON ...").collect()
    session.sql("CREATE TABLE f1_processed.circuits AS SELECT ...").collect()

SELECT queries can stay lazy while you keep composing DataFrame operations on the result; call .collect() (or another action such as .show()) when you need the rows back:

    # Query and retrieve results
    rows = session.sql("SELECT COUNT(*) FROM f1_processed.circuits").collect()
    print(rows[0][0])  # 78

Step 4: Replace File Paths

The original project uses ADLS mount paths. ZettaPark uses Volume paths:

    # Before
    "/mnt/formula1dltr/raw/circuits.csv"
    "/mnt/formula1dltr/processed/circuits"

    # After
    "vol://f1_raw.formula1_raw_vol/raw/circuits.csv"
    # For writes, use save_as_table directly; no path is needed
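For read paths spread across many notebooks, the mapping can be centralized in one function. This is a hypothetical helper that assumes everything formerly under the /mnt/formula1dltr mount now lives in the single volume f1_raw.formula1_raw_vol with the same relative layout, generalizing the one mapping shown above. Processed outputs become tables via save_as_table, so they need no mapping.

```python
from pathlib import PurePosixPath


def to_volume_path(mount_path: str,
                   mount_root: str = "/mnt/formula1dltr",
                   volume: str = "f1_raw.formula1_raw_vol") -> str:
    """Map an ADLS mount path to the equivalent vol:// path.

    Raises ValueError if mount_path is not under mount_root.
    """
    relative = PurePosixPath(mount_path).relative_to(mount_root)
    return f"vol://{volume}/{relative.as_posix()}"
```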

Uploading local files to a Volume:

    session.file.put(
        "/tmp/circuits.csv",
        "vol://f1_raw.formula1_raw_vol/raw/circuits.csv",
        auto_compress=False,
        overwrite=True,
    )
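When the raw layer consists of several files, the single put call above can be wrapped in a loop. upload_csvs is a hypothetical helper; it uses only the session.file.put call and keyword arguments shown above and uploads every *.csv in a local directory to one volume folder.

```python
from pathlib import Path
from typing import List


def upload_csvs(session, local_dir: str, volume_dir: str) -> List[str]:
    """Upload every *.csv file in local_dir to volume_dir; return the target paths."""
    targets = []
    for csv_path in sorted(Path(local_dir).glob("*.csv")):
        target = f"{volume_dir.rstrip('/')}/{csv_path.name}"
        # Same call and flags as the single-file upload above.
        session.file.put(str(csv_path), target, auto_compress=False, overwrite=True)
        targets.append(target)
    return targets
```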


Fully Compatible APIs

The following APIs were used in this project and require no modifications:

    # Filter and select
    df.filter(F.col("race_year") == 2023)
    df.select("race_id", "race_year", "circuit_ref")

    # JOIN (multi-table); the parentheses let the chain span several lines
    (
        results_df
        .join(drivers_df, results_df["driver_id"] == drivers_df["driver_ref"])
        .join(races_df, results_df["race_id"] == races_df["race_id"])
    )

    # Aggregation
    df.groupBy("race_year", "driver_name") \
        .agg(
            F.sum("points").alias("total_points"),
            F.count("*").alias("total_races"),
            F.sum(F.when(F.col("position") == 1, 1).otherwise(0)).alias("wins"),
        )

    # Window functions
    window_spec = Window.partitionBy("race_year").orderBy(
        F.desc("total_points"), F.desc("wins")
    )
    df.select(
        "race_year", "driver_name", "total_points", "wins",
        F.rank().over(window_spec).alias("rank"),
    )

    # MERGE INTO (same UPDATE SET * / INSERT * syntax as on Databricks);
    # _merge_src is a temp view registered with df.create_or_replace_temp_view()
    session.sql("""
        MERGE INTO f1_processed.circuits tgt
        USING _merge_src src
        ON tgt.circuit_id = src.circuit_id
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """).collect()


4 Real Gotchas from the Migration

These are issues encountered during the actual migration — not documented anywhere, only discovered by running the code.

Gotcha 1: withColumn Triggers Server-Side Schema Resolution in Older Versions

Symptom: After calling df.withColumn("rank", F.rank().over(window_spec)), column names become internal-prefix formats like r_f6fw_race_id, or the call throws an error outright.

Root cause: In older versions of ZettaPark, withColumn calls self._output, which triggers a server-side schema pre-resolution request. Under certain conditions this returns column names with internal prefixes.

Fix: Use select() + .alias() to handle all column operations in one pass:

    # Problematic (older versions)
    df = df.withColumn("rank", F.rank().over(window_spec))

    # Compatible with all versions
    df = df.select(
        "race_year", "driver_name", "total_points", "wins",
        F.rank().over(window_spec).alias("rank"),
    )

Gotcha 2: After Multi-Table JOINs, select() Must Explicitly Specify Column Sources

Symptom: After a three-table JOIN, calling select("race_id", "driver_name", ...) throws an error, or the written table has column names with internal prefixes.

Root cause: In older versions of ZettaPark, after multi-table JOINs, columns with the same name from different sources get internal hash prefixes (e.g., r_f6fw_race_id) to disambiguate them.

Fix: After a JOIN, explicitly specify the source DataFrame and .alias() for every column in select():

    final_df = (
        results_df
        .join(race_circuits_df, results_df["race_id"] == race_circuits_df["race_id"])
        .join(drivers_df, results_df["driver_id"] == drivers_df["driver_ref"])
        .join(constructors_df, results_df["constructor_id"] == constructors_df["constructor_ref"])
        .select(
            race_circuits_df["race_id"].alias("race_id"),
            race_circuits_df["race_year"].alias("race_year"),
            race_circuits_df["circuit_location"].alias("location"),
            drivers_df["name"].alias("driver_name"),
            drivers_df["nationality"].alias("driver_nationality"),
            constructors_df["name"].alias("team"),
            results_df["grid"].alias("grid"),
            results_df["fastest_lap"].alias("fastest_lap"),
            results_df["time"].alias("race_time"),
            results_df["points"].alias("points"),
            results_df["position"].alias("position"),
            results_df["file_date"].alias("file_date"),
        )
    )
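Since the symptom of Gotchas 1 and 2 is a silently renamed column, it can pay to check output column names before writing. The sketch below is hypothetical: it assumes the DataFrame exposes a PySpark-style columns list (for example final_df.columns) whose names may carry backticks, as the field names handled in _create_partitioned_table later in this article do. The internal-prefix regex is a heuristic inferred from the single example r_f6fw_race_id.

```python
import re
from typing import Iterable

# Heuristic inferred from the example r_f6fw_race_id: "r_" + 4 characters + "_".
_INTERNAL_PREFIX = re.compile(r"^r_[0-9a-z]{4}_")


def check_output_columns(actual: Iterable[str], expected: Iterable[str]) -> None:
    """Raise ValueError if columns are missing, unexpected, or carry an internal prefix."""
    actual_names = [c.strip("`").lower() for c in actual]
    expected_names = [c.lower() for c in expected]
    prefixed = [c for c in actual_names if _INTERNAL_PREFIX.match(c)]
    missing = sorted(set(expected_names) - set(actual_names))
    unexpected = sorted(set(actual_names) - set(expected_names))
    if prefixed or missing or unexpected:
        raise ValueError(
            f"prefixed={prefixed} missing={missing} unexpected={unexpected}"
        )

# Usage (assumed API): check_output_columns(final_df.columns, ["race_id", "race_year", ...])
```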

Gotcha 3: CREATE TEMP VIEW Is Not Supported

Symptom: session.sql("CREATE TEMP VIEW v AS SELECT ...").collect() throws: CZLH-42000 Syntax error - missing KW_ENDPOINT at 'VIEW'.

Root cause: Singdata Lakehouse SQL does not support the CREATE TEMP VIEW syntax.

Fix: Use the DataFrame's create_or_replace_temp_view() method, which behaves identically to PySpark's createOrReplaceTempView:

    # Not supported
    session.sql("CREATE TEMP VIEW race_result_updated AS SELECT ...").collect()

    # Correct approach
    df.create_or_replace_temp_view("race_result_updated")
    session.sql("SELECT * FROM race_result_updated WHERE ...").collect()

Gotcha 4: Column Name Ambiguity in MERGE INTO VALUES Clause

Symptom: In a MERGE INTO statement's WHEN NOT MATCHED THEN INSERT ... VALUES (...), bare column names in the VALUES clause are resolved as columns from the target table (tgt), resulting in null inserts.

Root cause: In Lakehouse SQL, bare column names in the VALUES clause resolve to tgt, not src.

Fix: Prefix all column names in the VALUES clause with src.:

    session.sql("""
        MERGE INTO f1_presentation.calculated_race_results tgt
        USING race_result_updated src
        ON (tgt.driver_id = src.driver_id AND tgt.race_id = src.race_id)
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT
            (race_year, team_name, driver_id, driver_name, race_id,
             position, points, calculated_points, created_date)
        VALUES
            (src.race_year, src.team_name, src.driver_id, src.driver_name, src.race_id,
             src.position, src.points, src.calculated_points, src.created_date)
    """).collect()
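With a long column list, hand-typing src. in front of every VALUES entry is easy to get wrong. A small generator keeps the prefixes consistent. build_merge_sql is a hypothetical helper, not part of the project; it emits the same MERGE shape as the statement above, with every VALUES entry qualified by src.

```python
from typing import Sequence


def build_merge_sql(target: str, source: str,
                    keys: Sequence[str], columns: Sequence[str]) -> str:
    """Build a MERGE INTO statement whose VALUES entries are all src.-qualified."""
    if not keys or not columns:
        raise ValueError("keys and columns must be non-empty")
    on_clause = " AND ".join(f"tgt.{k} = src.{k}" for k in keys)
    insert_cols = ", ".join(columns)
    # Qualify every VALUES entry so none of them can resolve to the target table.
    values = ", ".join(f"src.{c}" for c in columns)
    return (
        f"MERGE INTO {target} tgt USING {source} src ON ({on_clause}) "
        "WHEN MATCHED THEN UPDATE SET * "
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({values})"
    )

# Usage: session.sql(build_merge_sql("f1_presentation.calculated_race_results",
#     "race_result_updated", ["driver_id", "race_id"], [...])).collect()
```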


Incremental Write Wrapper: merge_delta_data

The original PySpark project uses overwrite_partition() for incremental writes. The ZettaPark version replaces it with merge_delta_data(), which uses MERGE INTO to upsert rows instead of overwriting whole partitions:

    def merge_delta_data(input_df, db_name, table_name, merge_condition, partition_column):
        session = input_df.session
        full_name = f"{db_name}.{table_name}"

        # Check if table exists
        table_exists = False
        try:
            session.sql(f"DESCRIBE TABLE {full_name}").collect()
            table_exists = True
        except Exception:
            pass  # Table doesn't exist: proceed to create it

        if table_exists:
            input_df.create_or_replace_temp_view("_merge_src")
            session.sql(f"""
                MERGE INTO {full_name} tgt
                USING _merge_src src
                ON {merge_condition}
                WHEN MATCHED THEN UPDATE SET *
                WHEN NOT MATCHED THEN INSERT *
            """).collect()
        else:
            _create_partitioned_table(session, full_name, input_df, partition_column)

The benefit of this wrapper: the same code creates the table on first run and performs incremental updates on subsequent runs — no need for the caller to check whether the table exists.

Partitioned table creation requires hand-written DDL because ZettaPark's save_as_table does not support a partition_by parameter:

    def _create_partitioned_table(session, full_name, input_df, partition_column):
        fields = input_df.schema.fields

        def sql_type(dt):
            name = type(dt).__name__
            mapping = {
                "LongType": "BIGINT",
                "IntegerType": "INT",
                "ShortType": "SMALLINT",
                "DoubleType": "DOUBLE",
                "FloatType": "FLOAT",
                "StringType": "STRING",
                "BooleanType": "BOOLEAN",
                "DateType": "DATE",
                "TimestampType": "TIMESTAMP",
            }
            if name == "DecimalType":
                return f"DECIMAL({dt.precision},{dt.scale})"
            return mapping.get(name, "STRING")  # unmapped types fall back to STRING

        col_defs = ", ".join(
            f"{f.name.strip('`')} {sql_type(f.datatype)}"
            for f in fields
            if f.name.strip('`') != partition_column
        )
        part_field = next(f for f in fields if f.name.strip('`') == partition_column)
        part_def = f"{part_field.name.strip('`')} {sql_type(part_field.datatype)}"
        session.sql(
            f"CREATE TABLE {full_name} ({col_defs}) PARTITIONED BY ({part_def})"
        ).collect()

        # Must use explicit column names: SELECT * won't work here.
        # Reason: create_or_replace_temp_view normalizes column names (race_id → raceid).
        # Explicit column names match by position, bypassing this issue.
        all_cols = ", ".join(f.name.strip('`') for f in fields)
        input_df.create_or_replace_temp_view("_insert_src")
        session.sql(
            f"INSERT INTO {full_name} ({all_cols}) SELECT {all_cols} FROM _insert_src"
        ).collect()
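To inspect the DDL this produces without a live session, the column-definition logic can be pulled out into a pure function. This is a hypothetical refactoring (build_partitioned_ddl is not in the project). It takes (name, SQL type) pairs, i.e. what sql_type() above yields per field, and applies the same rules: backticks are stripped, and the partition column moves out of the column list into PARTITIONED BY.

```python
from typing import Sequence, Tuple


def build_partitioned_ddl(full_name: str,
                          columns: Sequence[Tuple[str, str]],
                          partition_column: str) -> str:
    """Return CREATE TABLE ... PARTITIONED BY DDL from (name, sql_type) pairs."""
    cleaned = [(name.strip("`"), sql_type) for name, sql_type in columns]
    types = dict(cleaned)
    if partition_column not in types:
        raise ValueError(f"partition column {partition_column!r} not found")
    # Regular columns keep their order; the partition column moves to PARTITIONED BY.
    col_defs = ", ".join(f"{n} {t}" for n, t in cleaned if n != partition_column)
    return (f"CREATE TABLE {full_name} ({col_defs}) "
            f"PARTITIONED BY ({partition_column} {types[partition_column]})")
```

Because the partition column ends up last in the table definition, the INSERT in _create_partitioned_table lists columns explicitly rather than relying on positional order.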


Data Source Compatibility Issues

The original project used the Ergast API (now discontinued). This project switches to the Jolpica API. A few issues here are unrelated to ZettaPark but must be addressed during migration.

Primary Key Types Changed

The original data used integer IDs; the Jolpica API uses string refs (e.g., "hamilton", "mercedes"). JOIN conditions need to be updated accordingly:

    # Original (integer IDs)
    results_df.join(drivers_df, results_df["driverId"] == drivers_df["driverId"])

    # Jolpica data (string refs)
    results_df.join(drivers_df, results_df["driver_id"] == drivers_df["driver_ref"])
    results_df.join(constructors_df, results_df["constructor_id"] == constructors_df["constructor_ref"])

Global Endpoint Returns Only 100 Records

Jolpica's global /drivers.json?limit=1000 endpoint actually returns only 100 records (sorted alphabetically), so major 2018–2023 drivers such as Hamilton and Vettel are missing; the global constructors data is truncated the same way, dropping teams such as Mercedes and Red Bull.

Fix: use the per-season endpoint and download by season, then merge and deduplicate:

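    import json
    import urllib.request


    def fetch_json(url):
        # Minimal stand-in for the project's own helper (assumed: HTTP GET + JSON decode)
        with urllib.request.urlopen(url, timeout=30) as resp:
            return json.load(resp)


    SEASONS = list(range(2018, 2024))  # 2018–2023 inclusive
    seen_drivers = {}
    for season in SEASONS:
        url = f"https://api.jolpi.ca/ergast/f1/{season}/drivers.json?limit=100"
        data = fetch_json(url)
        for d in data["MRData"]["DriverTable"]["Drivers"]:
            ref = d["driverId"]
            if ref not in seen_drivers:  # deduplicate across seasons
                seen_drivers[ref] = {
                    "driverId": len(seen_drivers) + 1,  # surrogate integer ID
                    "driverRef": ref,
                    "forename": d["givenName"],
                    "surname": d["familyName"],
                    "nationality": d.get("nationality", ""),
                    # ...
                }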

The symptom of this issue is that f1_presentation.race_results contains only 66 rows (should be 2,500), because most driver_id values can't find a matching driver_ref during the JOIN.
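An alternative to per-season downloads, not used in this project, is paging through the global endpoint. Ergast-format responses carry limit, offset, and total fields in MRData, and the Ergast API accepted an offset query parameter; the sketch below assumes Jolpica honours the same parameters with a 100-row page cap. fetch_all_drivers is a hypothetical helper, and fetch_json can be any function that GETs a URL and decodes the JSON body.

```python
from typing import Callable, List


def fetch_all_drivers(fetch_json: Callable[[str], dict], base_url: str,
                      page_size: int = 100) -> List[dict]:
    """Page through an Ergast-format drivers endpoint until MRData.total is reached."""
    drivers: List[dict] = []
    offset = 0
    while True:
        data = fetch_json(f"{base_url}?limit={page_size}&offset={offset}")
        mr = data["MRData"]
        page = mr["DriverTable"]["Drivers"]
        drivers.extend(page)
        offset += len(page)
        # Ergast-format responses report "total" as a string.
        if not page or offset >= int(mr["total"]):
            break
    return drivers
```

Per-season downloads return only the 37 drivers who raced in 2018–2023, whereas paging the global endpoint fetches the whole historical list, which then has to be filtered.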


End-to-End Validation

After migration, data correctness was verified with 71 automated checks:

    cd 03_lakehouse
    python ../04_validate.py

Validation covers 5 dimensions:

  1. Row count checks: All 12 tables have data with expected row counts
  2. NULL checks: Key columns (primary keys, foreign keys, business fields) have no null values
  3. Duplicate detection: No duplicate driver per race, no duplicate driver_ref
  4. Business rules: position is between 1–20, points are non-negative, calculated_points = 11 - position, rank minimum is 1
  5. Cross-layer consistency: driver_standings.total_points equals the year-aggregated result from race_results
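The business-rule and cross-layer checks listed above reduce to plain assertions over rows. The sketch below is hypothetical and is not the contents of 04_validate.py. It assumes rows have already been collected into dictionaries keyed by the race_results and driver_standings column names used earlier (race_id, race_year, driver_name, position, points, total_points, rank), and it covers only a subset of the 71 checks.

```python
from collections import defaultdict
from typing import Iterable, List, Mapping


def check_race_results(rows: Iterable[Mapping]) -> List[str]:
    """Duplicate and business-rule checks on race_results-style rows."""
    errors = []
    seen = set()
    for r in rows:
        key = (r["race_id"], r["driver_name"])
        if key in seen:
            errors.append(f"duplicate driver in race: {key}")
        seen.add(key)
        pos = r["position"]
        if pos is None or not 1 <= pos <= 20:
            errors.append(f"{key}: position {pos!r} outside 1-20")
        elif "calculated_points" in r and r["calculated_points"] != 11 - pos:
            errors.append(f"{key}: calculated_points != 11 - position")
        if r["points"] is None or r["points"] < 0:
            errors.append(f"{key}: negative or missing points {r['points']!r}")
    return errors


def check_standings(race_results: Iterable[Mapping],
                    standings: Iterable[Mapping]) -> List[str]:
    """Cross-layer check: total_points must equal the per-year sum of points."""
    totals = defaultdict(float)
    for r in race_results:
        totals[(r["race_year"], r["driver_name"])] += r["points"]
    errors = []
    for s in standings:
        key = (s["race_year"], s["driver_name"])
        expected = totals.get(key, 0.0)
        if abs(expected - s["total_points"]) > 1e-6:
            errors.append(f"{key}: standings {s['total_points']} != results {expected}")
        if s["rank"] < 1:
            errors.append(f"{key}: rank {s['rank']} below 1")
    return errors
```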

Actual result: 71/71 all passed.


Full Compatibility Reference

| API | PySpark | ZettaPark | Status |
|---|---|---|---|
| df.select() | | | Fully compatible |
| df.filter() / df.where() | | | Fully compatible |
| df.join(other, cond, how) | | | Fully compatible |
| df.groupBy().agg() | | | Fully compatible |
| df.sort() / df.orderBy() | | | Fully compatible |
| df.limit(n) / df.count() | | | Fully compatible |
| df.show() / df.collect() | | | Fully compatible |
| df.dropDuplicates(keys) | | | Fully compatible |
| df.union(other) / df.distinct() | | | Fully compatible |
| F.col() / F.lit() / F.when().otherwise() | | | Fully compatible |
| F.sum() / F.count() / F.avg() / F.max() / F.min() | | | Fully compatible |
| F.rank() / F.dense_rank() / F.row_number() | | | Fully compatible |
| F.current_timestamp() / F.current_date() | | | Fully compatible |
| F.concat() / F.trim() / F.upper() / F.lower() | | | Fully compatible |
| F.col(...).isNull() / .isNotNull() | | | Fully compatible |
| Window.partitionBy().orderBy() | | | Fully compatible |
| MERGE INTO (SQL) | | | Fully compatible (see Gotcha 4 for the VALUES clause) |
| df.withColumn() | | ✅ Fixed in 0.1.5 | Use select().alias() in older versions |
| session.sql() DDL/DML | Immediate | Lazy; requires .collect() | ⚠️ Must change |
| df.write.partitionBy() | | ❌ Not supported | Use PARTITIONED BY DDL instead |
| CREATE TEMP VIEW (SQL) | | ❌ Not supported | Use create_or_replace_temp_view() instead |
| File paths | /mnt/... | vol://schema.vol/... | ⚠️ Different format |
| Import paths | pyspark.sql | clickzetta.zettapark | ⚠️ Mechanical replacement |

Migration Conclusion

ZettaPark is highly compatible with PySpark's DataFrame API. The core migration effort is in environment adaptation, not business logic rewriting.

Fully compatible (no changes needed):

  • DataFrame operations: select, filter, join, groupBy, agg, sort, limit, union, distinct
  • Function library: F.col(), F.when().otherwise(), F.sum/count/avg/max/min, F.rank/dense_rank/row_number, F.concat/trim/upper/lower, F.current_timestamp()
  • Window functions: Window.partitionBy().orderBy() syntax is identical
  • SQL syntax: MERGE INTO (mind the VALUES-clause resolution in Gotcha 4), UPDATE, DELETE, INSERT, and CTEs work as-is

4 required changes:

| Change | Reason | Effort |
|---|---|---|
| Import path: pyspark.sql → clickzetta.zettapark | Different package name | Very low: global replace |
| Explicit Session creation | Databricks injects spark globally; ZettaPark requires explicit creation | Very low: one-time |
| Add .collect() to DDL/DML | ZettaPark is lazy; without it, nothing executes | Low: check each statement |
| File path: /mnt/... → vol://... | Different storage system | Low: format replacement |

This project passed 71/71 validation checks on the migrated data, covering row counts, NULLs, duplicates, business rules, and cross-layer consistency.


References