A Table Stream records the incremental changes (INSERT / UPDATE / DELETE) made to a table. Zettapark lets you read a Stream directly with session.table(), so the change data can be handled as a DataFrame when building incremental ETL pipelines.
Prerequisites
from clickzetta.zettapark.session import Session
from clickzetta.zettapark import functions as F
session = Session.builder.configs({
    "username": "your_username",
    "password": "your_password",
    "service": "cn-shanghai-alicloud.api.singdata.com",
    "instance": "your_instance",
    "workspace": "your_workspace",
    "schema": "public",
    "vcluster": "default"
}).create()
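If you would rather not hard-code credentials in a script, the same configuration dict can be assembled from environment variables. The following is a minimal sketch: the helper name load_config and the CZ_* variable names are hypothetical and not part of Zettapark; only the dict keys come from the example above.

```python
import os

# Hypothetical environment variable names; rename them to suit your deployment.
ENV_KEYS = {
    "username": "CZ_USERNAME",
    "password": "CZ_PASSWORD",
    "service": "CZ_SERVICE",
    "instance": "CZ_INSTANCE",
    "workspace": "CZ_WORKSPACE",
    "schema": "CZ_SCHEMA",
    "vcluster": "CZ_VCLUSTER",
}

# Fallbacks taken from the example configuration above.
DEFAULTS = {"schema": "public", "vcluster": "default"}


def load_config(env=None):
    """Build the Session config dict from environment variables."""
    env = os.environ if env is None else env
    config, missing = {}, []
    for key, var in ENV_KEYS.items():
        value = env.get(var, DEFAULTS.get(key))
        if value is None:
            missing.append(var)
        else:
            config[key] = value
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return config


# Usage (needs a reachable Lakehouse instance, so it is left commented out):
# session = Session.builder.configs(load_config()).create()
```

Failing fast on a missing variable keeps a misconfigured job from reaching the connection step with a partial config.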
Create the Source Table and Stream
Create the source table and enable change tracking:
session.sql("""
    CREATE TABLE IF NOT EXISTS orders (
        id INT, name STRING, amount DECIMAL(10,2), status STRING
    )
""").collect()
session.sql("""
ALTER TABLE orders SET PROPERTIES ('change_tracking' = 'true')
""").collect()
session.sql("INSERT INTO orders VALUES (4, 'Dave', 3000.00, 'active')").collect()
session.sql("UPDATE orders SET amount = 1500.00 WHERE id = 1").collect()
session.sql("DELETE FROM orders WHERE id = 3").collect()
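Changes like the ones above can be read back through the Stream and persisted to a staging table. The sketch below assumes a Stream named orders_stream already exists on orders (the name is hypothetical, and creating the Stream is not shown here). It also assumes that Zettapark exposes a Snowpark-style df.write.save_as_table(name, mode=...) writer. The text confirms session.table() and save_as_table(), but the mode argument is an assumption.

```python
def stage_stream_changes(session,
                         stream_name="orders_stream",       # hypothetical Stream name
                         staging_table="orders_changes_tmp"):
    """Read the pending changes from a Stream and write them to a staging table."""
    # session.table() on a Stream returns the change data as a DataFrame.
    changes = session.table(stream_name)
    # save_as_table() is an action, so this is the point at which the Stream
    # is consumed. mode="overwrite" is assumed to replace a stale staging table.
    changes.write.save_as_table(staging_table, mode="overwrite")
    return staging_table


# Usage with a live session (left commented out):
# stage_stream_changes(session)
```

Writing to a staging table in a single action means the changes are captured once and can then be inspected or merged without reading the Stream again.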
Once the Stream has been read and consumed by an action, its offset advances automatically, so the next read returns only the changes made since then.
For production use, write the Stream changes to a staging table (orders_changes_tmp below), then apply them to the target table with SQL MERGE INTO to get upsert semantics:
session.sql("""
    MERGE INTO orders_target AS t
    USING orders_changes_tmp AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET
        t.name = s.name, t.amount = s.amount, t.status = s.status
    WHEN NOT MATCHED THEN INSERT (id, name, amount, status)
        VALUES (s.id, s.name, s.amount, s.status)
""").collect()
session.sql("DROP TABLE IF EXISTS orders_changes_tmp").collect()
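Because a Stream can carry UPDATE_BEFORE and DELETE records as well as inserts, it can help to spell out the end state the merge step is meant to reach. The following is a pure-Python model of that logic, not Zettapark API. The field name change_type, the change-type labels INSERT and DELETE, and the starting rows for ids 1 and 3 are hypothetical placeholders.

```python
from decimal import Decimal


def apply_changes(target, changes):
    """Return a new dict keyed by id with the change records applied in order."""
    result = dict(target)
    for change in changes:
        kind = change["change_type"]
        row = {k: v for k, v in change.items() if k != "change_type"}
        if kind == "UPDATE_BEFORE":
            continue                         # pre-update image, not needed for an upsert
        if kind == "DELETE":
            result.pop(row["id"], None)      # remove the row if it is present
        elif kind in ("INSERT", "UPDATE_AFTER"):
            result[row["id"]] = row          # insert or overwrite
        else:
            raise ValueError(f"unknown change type: {kind!r}")
    return result


# Hypothetical starting state of the target table.
target = {
    1: {"id": 1, "name": "Alice", "amount": Decimal("1000.00"), "status": "active"},
    3: {"id": 3, "name": "Carol", "amount": Decimal("500.00"), "status": "active"},
}

# Change records corresponding to the INSERT, UPDATE and DELETE statements shown earlier.
changes = [
    {"change_type": "INSERT", "id": 4, "name": "Dave",
     "amount": Decimal("3000.00"), "status": "active"},
    {"change_type": "UPDATE_BEFORE", "id": 1, "name": "Alice",
     "amount": Decimal("1000.00"), "status": "active"},
    {"change_type": "UPDATE_AFTER", "id": 1, "name": "Alice",
     "amount": Decimal("1500.00"), "status": "active"},
    {"change_type": "DELETE", "id": 3, "name": "Carol",
     "amount": Decimal("500.00"), "status": "active"},
]

merged = apply_changes(target, changes)
```

In this model an upsert-only merge would keep id 3 in the target. If deletes must propagate, the staged rows need to be filtered or routed by change type before the merge runs.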
Notes
Stream offset advancement: The offset advances automatically whenever a read of the Stream is followed by an action (collect(), save_as_table(), etc.), including reads made only for inspection. The next read returns only new changes.
UPDATE produces two records: UPDATE_BEFORE (pre-update) and UPDATE_AFTER (post-update). You typically only need UPDATE_AFTER.
Stream does not store data: A Stream is a cursor object. Data remains in the source table and no extra storage is consumed.
change_tracking cannot be set at table creation: You must enable it after table creation using ALTER TABLE SET PROPERTIES.
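The offset and cursor behavior described in these notes can be pictured with a toy model. This is a conceptual sketch only, not how the Lakehouse implements Streams; the class ToyStream and the string change records are invented for illustration.

```python
class ToyStream:
    """Toy model of a Stream as a cursor over a table's change log."""

    def __init__(self, change_log):
        # The stream holds only a reference to the log plus an integer offset;
        # it does not keep its own copy of the change data.
        self._log = change_log
        # A new stream starts at the current end of the log.
        self._offset = len(change_log)

    def read(self):
        """Return the changes recorded since the last read and advance the offset."""
        pending = self._log[self._offset:]
        self._offset = len(self._log)
        return pending


change_log = []                      # stands in for the source table's tracked changes
stream = ToyStream(change_log)

change_log.append("INSERT id=4")
change_log.append("UPDATE id=1")

first = stream.read()                # both changes recorded so far
second = stream.read()               # nothing new since the previous read

change_log.append("DELETE id=3")
third = stream.read()                # only the change made after the earlier reads
```

The point of the model is that a consumed change is not returned again, so any step that needs the same changes twice should persist them first.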