Zettapark Dynamic Table Guide

A Dynamic Table is an incremental computation object in Singdata Lakehouse. You define the transformation logic with a DataFrame, and the system automatically detects upstream changes and refreshes the result incrementally, with no manual scheduling required. Zettapark creates Dynamic Tables directly from a DataFrame via create_or_replace_dynamic_table().
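To make "refreshes the result incrementally" concrete, here is a conceptual sketch in plain Python of incremental maintenance for a per-category summary like the orders_summary table built later in this guide. It is not how Lakehouse implements refresh, and the class and function names are hypothetical. The state keeps count and sum rather than the average, because both can be adjusted for inserted and deleted rows and the average can always be derived from them.

```python
from decimal import Decimal


class IncrementalCategorySummary:
    """Keeps count and sum per category so row changes can be applied in place."""

    def __init__(self):
        self._state = {}  # category -> [order_count, total_amount]

    def apply_changes(self, inserted=(), deleted=()):
        """Fold one batch of (category, amount) inserts and deletes into the state."""
        for category, amount in inserted:
            entry = self._state.setdefault(category, [0, Decimal("0")])
            entry[0] += 1
            entry[1] += amount
        for category, amount in deleted:
            entry = self._state[category]  # KeyError if the row was never inserted
            entry[0] -= 1
            entry[1] -= amount
            if entry[0] == 0:
                del self._state[category]  # an emptied group leaves the result

    def result(self):
        """Return {category: (order_count, total_amount, avg_amount)}."""
        return {
            category: (count, total, total / count)
            for category, (count, total) in sorted(self._state.items())
        }


def full_recompute(rows):
    """Full-refresh baseline: rebuild the summary from every current row."""
    summary = IncrementalCategorySummary()
    summary.apply_changes(inserted=rows)
    return summary.result()
```

full_recompute plays the role of a full refresh. An incremental refresh touches only the changed rows, yet it should produce the same result.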


Prerequisites

from clickzetta.zettapark.session import Session
from clickzetta.zettapark import functions as F

session = Session.builder.configs({
    "username": "your_username",
    "password": "your_password",
    "service": "cn-shanghai-alicloud.api.singdata.com",
    "instance": "your_instance",
    "workspace": "your_workspace",
    "schema": "public",
    "vcluster": "default"
}).create()
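Hard-coding credentials as above is fine for a quick test. A minimal sketch of reading them from environment variables instead follows. The CZ_* variable names and the helper build_session_config are hypothetical, and the defaults are copied from the configuration above.

```python
import os

# Hypothetical environment variable names; pick whatever your team uses.
ENV_KEYS = {
    "username": "CZ_USERNAME",
    "password": "CZ_PASSWORD",
    "service": "CZ_SERVICE",
    "instance": "CZ_INSTANCE",
    "workspace": "CZ_WORKSPACE",
    "schema": "CZ_SCHEMA",
    "vcluster": "CZ_VCLUSTER",
}

# Fallbacks taken from the example configuration above.
DEFAULTS = {
    "service": "cn-shanghai-alicloud.api.singdata.com",
    "schema": "public",
    "vcluster": "default",
}


def build_session_config(environ=None):
    """Assemble the dict passed to Session.builder.configs() from the environment."""
    environ = os.environ if environ is None else environ
    config, missing = {}, []
    for key, env_name in ENV_KEYS.items():
        value = environ.get(env_name, DEFAULTS.get(key))
        if value is None:
            missing.append(env_name)
        else:
            config[key] = value
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return config

# Usage with Zettapark installed and an instance reachable:
# session = Session.builder.configs(build_session_config()).create()
```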

Create the source table and insert test data:

session.sql("""
    CREATE TABLE IF NOT EXISTS orders (
        id INT,
        user_id INT,
        category STRING,
        amount DECIMAL(10,2),
        status STRING,
        order_date STRING
    )
""").collect()

session.sql("""
    INSERT INTO orders VALUES
    (1, 101, 'A', 100.0, 'paid', '2024-01-15'),
    (2, 102, 'A', 200.0, 'paid', '2024-01-15'),
    (3, 101, 'B', 300.0, 'pending', '2024-01-16'),
    (4, 103, 'B', 150.0, 'paid', '2024-01-16')
""").collect()


Create a Dynamic Table

Aggregation Summary

source = session.table("orders")

Define the transformation logic:

agg_df = source.group_by("category").agg(
    F.count(F.col("id")).alias("order_count"),
    F.sum(F.col("amount")).alias("total_amount"),
    F.avg(F.col("amount")).alias("avg_amount")
)

Create the Dynamic Table with automatic refresh every minute:

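agg_df.create_or_replace_dynamic_table(
    "orders_summary",
    lag="1 minute",       # refresh interval
    warehouse="default"   # compute cluster to use
)

# A new Dynamic Table is empty until its first refresh, so trigger one before reading
session.sql("ALTER DYNAMIC TABLE orders_summary REFRESH").collect()
session.table("orders_summary").show()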

+--------+-----------+------------+----------+
|category|order_count|total_amount|avg_amount|
+--------+-----------+------------+----------+
|       A|          2|      300.00|    150.00|
|       B|          2|      450.00|    225.00|
+--------+-----------+------------+----------+
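The output above can be checked without a Lakehouse connection by reproducing the same aggregation in plain Python over the four test rows. This is a sketch for sanity-checking expected values only. summarize_by_category is a hypothetical helper, and because the test data contains no NULLs, the NULL-skipping behavior of count and avg is not modeled.

```python
from collections import defaultdict
from decimal import Decimal

# The rows inserted into orders above: (id, user_id, category, amount, status, order_date)
ORDERS = [
    (1, 101, "A", Decimal("100.00"), "paid", "2024-01-15"),
    (2, 102, "A", Decimal("200.00"), "paid", "2024-01-15"),
    (3, 101, "B", Decimal("300.00"), "pending", "2024-01-16"),
    (4, 103, "B", Decimal("150.00"), "paid", "2024-01-16"),
]


def summarize_by_category(rows):
    """Plain-Python equivalent of group_by("category").agg(count, sum, avg)."""
    groups = defaultdict(list)
    for _id, _user_id, category, amount, _status, _date in rows:
        groups[category].append(amount)
    summary = {}
    for category, amounts in sorted(groups.items()):
        total = sum(amounts, Decimal("0"))
        summary[category] = (len(amounts), total, total / len(amounts))
    return summary
```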

Filter + Computed Columns

filtered_df = (
    source
    .filter(F.col("amount") > 150)                                  # strict: 150.00 is excluded
    .with_column("amount_tax", F.col("amount") * 1.13)              # amount including 13% tax
    .with_column("year", F.year(F.to_date(F.col("order_date"))))    # parse the string date
)

filtered_df.create_or_replace_dynamic_table(
    "orders_filtered",
    lag="5 minutes",
    warehouse="default"
)
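Which rows should land in orders_filtered can be worked out by hand. The sketch below mirrors the filter and the two computed columns in plain Python, using Decimal arithmetic as an assumption (the exact result type Lakehouse assigns to amount * 1.13 is not stated in this guide). expected_filtered_rows is a hypothetical helper.

```python
from datetime import date
from decimal import Decimal

COLUMNS = ("id", "user_id", "category", "amount", "status", "order_date")
ORDERS = [dict(zip(COLUMNS, row)) for row in [
    (1, 101, "A", Decimal("100.00"), "paid", "2024-01-15"),
    (2, 102, "A", Decimal("200.00"), "paid", "2024-01-15"),
    (3, 101, "B", Decimal("300.00"), "pending", "2024-01-16"),
    (4, 103, "B", Decimal("150.00"), "paid", "2024-01-16"),
]]

TAX_RATE = Decimal("1.13")  # same multiplier as the amount_tax column


def expected_filtered_rows(rows):
    """Mirror filter(amount > 150) plus the amount_tax and year columns."""
    return [
        {
            **row,
            "amount_tax": row["amount"] * TAX_RATE,
            "year": date.fromisoformat(row["order_date"]).year,
        }
        for row in rows
        if row["amount"] > 150  # strict inequality drops order 4 (150.00)
    ]
```

Only orders 2 and 3 pass, with amount_tax values of 226.00 and 339.00 and year 2024.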

Multi-table Join

Create the users table first:

session.sql("""
    CREATE TABLE IF NOT EXISTS users (
        user_id INT,
        name STRING,
        city STRING
    )
""").collect()

session.sql("""
    INSERT INTO users VALUES
    (101, 'Alice', 'Beijing'),
    (102, 'Bob', 'Shanghai'),
    (103, 'Carol', 'Guangzhou')
""").collect()

orders = session.table("orders")
users = session.table("users")

paid = (
    orders.filter(F.col("status") == "paid")
    .select(F.col("id"), orders["user_id"].alias("o_uid"), F.col("amount"), F.col("order_date"))
)

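Both tables have a user_id column, so paid renames its copy to o_uid to keep the joined result free of duplicate column names. Join on the renamed key and select the output columns: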

joined = (
    paid.join(users, paid["o_uid"] == users["user_id"])
    .select("user_id", "name", "amount", "order_date")
)

joined.create_or_replace_dynamic_table(
    "paid_orders_with_user",
    lag="1 minute",
    warehouse="default"
)
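The expected contents of paid_orders_with_user can be derived the same way. This plain-Python sketch keeps paid orders and looks each o_uid up in a dictionary built from users (a hash join), assuming join() performs an inner join by default. expected_paid_orders_with_user is a hypothetical helper.

```python
from decimal import Decimal

# (id, user_id, category, amount, status, order_date), as inserted above
ORDERS = [
    (1, 101, "A", Decimal("100.00"), "paid", "2024-01-15"),
    (2, 102, "A", Decimal("200.00"), "paid", "2024-01-15"),
    (3, 101, "B", Decimal("300.00"), "pending", "2024-01-16"),
    (4, 103, "B", Decimal("150.00"), "paid", "2024-01-16"),
]
USERS = [(101, "Alice", "Beijing"), (102, "Bob", "Shanghai"), (103, "Carol", "Guangzhou")]


def expected_paid_orders_with_user(orders, users):
    """Mirror filter(status == 'paid') inner-joined to users on user_id."""
    name_by_id = {user_id: name for user_id, name, _city in users}
    return [
        {"user_id": o_uid, "name": name_by_id[o_uid], "amount": amount, "order_date": order_date}
        for _id, o_uid, _category, amount, status, order_date in orders
        if status == "paid" and o_uid in name_by_id  # inner join drops unmatched orders
    ]
```

Order 3 is pending and drops out, leaving one row each for Alice, Bob, and Carol.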


Manage Dynamic Tables

Inspect Structure

session.sql("DESC DYNAMIC TABLE orders_summary").collect()

Suspend and Resume

Suspend automatic refresh:

session.sql("ALTER DYNAMIC TABLE orders_summary SUSPEND").collect()

Resume automatic refresh:

session.sql("ALTER DYNAMIC TABLE orders_summary RESUME").collect()

Trigger a Manual Refresh

session.sql("ALTER DYNAMIC TABLE orders_summary REFRESH").collect()
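Because a new Dynamic Table is empty until its first refresh, scripts that read it right after creation may want to wait for data. A minimal sketch follows that assumes nothing about whether the refresh statement blocks: it issues the refresh shown above, then polls SELECT COUNT(*) until the table is non-empty or a timeout expires. refresh_and_wait is a hypothetical helper, the default timeout and poll interval are arbitrary, and run_sql is any callable returning rows that support positional indexing (for example lambda s: session.sql(s).collect(), assuming Zettapark rows index like tuples). A table whose query legitimately returns no rows will time out, so treat the row count as a heuristic.

```python
import time


def refresh_and_wait(run_sql, table_name, timeout_s=300.0, poll_s=5.0,
                     sleep=time.sleep, clock=time.monotonic):
    """Trigger a manual refresh, then poll until the table has at least one row.

    Returns the observed row count; raises TimeoutError if the table stays empty.
    table_name is interpolated directly, so pass only trusted identifiers.
    """
    run_sql(f"ALTER DYNAMIC TABLE {table_name} REFRESH")
    deadline = clock() + timeout_s
    while True:
        count = run_sql(f"SELECT COUNT(*) FROM {table_name}")[0][0]
        if count > 0:
            return count
        if clock() >= deadline:
            raise TimeoutError(f"{table_name} still empty after {timeout_s}s")
        sleep(poll_s)
```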

Drop

session.sql("DROP DYNAMIC TABLE IF EXISTS orders_summary").collect()
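The management statements in this section differ only in the verb. If table names come from configuration, a small hypothetical helper can build them from the statements above and reject anything that is not a plain or dot-qualified identifier, which keeps arbitrary text out of the SQL string. Quoted identifiers are deliberately not supported in this sketch.

```python
import re

# Plain or dot-qualified identifiers only, e.g. orders_summary or public.orders_summary.
IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")

# Statement templates copied from the examples in this section.
TEMPLATES = {
    "describe": "DESC DYNAMIC TABLE {name}",
    "suspend": "ALTER DYNAMIC TABLE {name} SUSPEND",
    "resume": "ALTER DYNAMIC TABLE {name} RESUME",
    "refresh": "ALTER DYNAMIC TABLE {name} REFRESH",
    "drop": "DROP DYNAMIC TABLE IF EXISTS {name}",
}


def dynamic_table_sql(action, name):
    """Return the management statement for action, rejecting unsafe table names."""
    if not IDENTIFIER.fullmatch(name):
        raise ValueError(f"unsupported identifier: {name!r}")
    try:
        template = TEMPLATES[action]
    except KeyError:
        raise ValueError(f"unknown action: {action!r}") from None
    return template.format(name=name)

# Usage: session.sql(dynamic_table_sql("suspend", "orders_summary")).collect()
```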


Dynamic Table vs. Manual Scheduling

Aspect | Dynamic Table | Studio SQL Task + Scheduler
--- | --- | ---
Definition | DataFrame API or SQL | SQL script
Refresh trigger | System auto-detects upstream changes | Cron schedule or manual trigger
Incremental computation | System automatically chooses incremental or full refresh | Incremental logic must be written manually
Dependency management | Upstream/downstream dependencies resolved automatically | Task dependencies configured manually
Best for | Continuously refreshed aggregations and transformations | Complex business logic requiring precise execution timing
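The dependency management row means upstream tables are brought up to date before the tables that read from them. The ordering problem itself is a topological sort. The sketch below uses Python's standard graphlib to compute one valid refresh order for the tables in this guide; it illustrates the concept only and is not the Lakehouse scheduler. category_report is a hypothetical second-level table that reads orders_summary.

```python
from graphlib import TopologicalSorter

# Each table maps to the set of tables it reads from (its upstream dependencies).
PIPELINE = {
    "orders_summary": {"orders"},
    "orders_filtered": {"orders"},
    "paid_orders_with_user": {"orders", "users"},
    "category_report": {"orders_summary"},  # hypothetical downstream Dynamic Table
}


def refresh_order(dependencies):
    """Return all tables in an order where every upstream precedes its dependents.

    Raises graphlib.CycleError if the dependencies contain a cycle.
    """
    return list(TopologicalSorter(dependencies).static_order())
```

Several orders can be valid (orders and users may come in either order, for instance), so any check should compare positions rather than expect one exact sequence.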

Notes

  • The lag parameter accepts formats such as "1 minute", "5 minutes", and "1 hour". The minimum interval is 1 minute.
  • A newly created Dynamic Table starts empty and is populated after the first refresh.
  • To change a Dynamic Table's definition, use create_or_replace_dynamic_table again (this rebuilds the table).
  • Upstream tables must have change_tracking enabled to support incremental refresh.
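Validating lag strings before calling create_or_replace_dynamic_table() catches typos early. A minimal sketch that accepts only the minute and hour units shown in these notes (the service may accept more) and enforces the 1-minute minimum; parse_lag is a hypothetical helper.

```python
import re
from datetime import timedelta

# Only the units used in the examples above; other units may also be valid server-side.
UNITS = {"minute": "minutes", "minutes": "minutes", "hour": "hours", "hours": "hours"}
LAG_PATTERN = re.compile(r"\s*(\d+)\s+([A-Za-z]+)\s*")
MIN_LAG = timedelta(minutes=1)


def parse_lag(lag):
    """Convert a lag string such as "5 minutes" to a timedelta, enforcing the minimum."""
    match = LAG_PATTERN.fullmatch(lag)
    if not match:
        raise ValueError(f"unrecognized lag format: {lag!r}")
    amount, unit = int(match.group(1)), match.group(2).lower()
    if unit not in UNITS:
        raise ValueError(f"unsupported lag unit: {unit!r}")
    interval = timedelta(**{UNITS[unit]: amount})
    if interval < MIN_LAG:
        raise ValueError(f"lag {lag!r} is below the 1 minute minimum")
    return interval
```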

Related Documents

Document | Description
--- | ---
Dynamic Table Overview | Full Dynamic Table concepts and SQL syntax
Zettapark Table Stream Guide | Processing CDC incremental changes with Zettapark
Zettapark Data Engineering Guide | Multi-table joins, window functions, and more