Description

Table Stream is a real-time data stream used to record data manipulation language (DML) changes made to a table, including insert, update, and delete operations. At the same time, Table Stream also provides metadata about each change, so you can take appropriate actions based on the changed data. Table Stream can be created on Table, Dynamic Table, and Materialized View.

Table Stream Syntax

CREATE [OR REPLACE] TABLE STREAM [IF NOT EXISTS]
  <name>
  ON TABLE <table_name>
  [TIMESTAMP AS OF timestamp_expression]
  [COMMENT = '<string_literal>']
  WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY|STANDARD','SHOW_INITIAL_ROWS'='TRUE|FALSE');
  • <name>: The name of the Table Stream.

  • <table_name>: The base table from which to get the incremental data. Views are not supported.

  • TIMESTAMP AS OF timestamp_expression: (Optional) Specifies the timestamp expression from which the Table Stream should start receiving updates from the underlying table. If this parameter is omitted, the Table Stream will start receiving updates from the current time.

    • The timestamp_expression returns a standard timestamp type expression. The earliest timestamp specified by TIMESTAMP AS OF depends on the TIME TRAVEL(data_retention_days) parameter. If the specified version does not exist, an error will be reported. If not specified, the current timestamp version data will be used, for example:
      • '2023-11-07 14:49:18', a string that can be cast to a timestamp.
      • cast('2023-11-07 14:49:18 Asia/Shanghai' as timestamp).
      • current_timestamp() - interval '12' hours.
      • Any other expression that is itself a timestamp or can be cast to a timestamp.
  • COMMENT: (Optional) Comment for the Table Stream.

  • 'TABLE_STREAM_MODE' = 'APPEND_ONLY|STANDARD': (Required) Choose one of the two values, APPEND_ONLY or STANDARD.

    • APPEND_ONLY records only the INSERT operations of the object. Update and delete operations are not recorded. For example, if 10 rows were initially inserted into the table and then 5 rows were deleted without moving the pointer, the Table Stream would still record 10 rows of operations.
    • STANDARD mode: In STANDARD mode, all DML changes to the source object can be tracked, including inserts, updates, and deletes (including table truncation). This row-level change is provided by joining and processing all delta data changes. The delta changes in the Table Stream refer to the data changes that occur between two transaction points. For example, if a row is inserted and then updated after the Table Stream's offset, the delta change is a new row. If a row is inserted and then deleted after the stream's offset, the delta change is no row. In other words, the delta change reflects the latest state of the source object, not the historical changes.
  • SHOW_INITIAL_ROWS: Optional parameter. When specified as TRUE, the stream records the version of the table at the time of creation. The first time the stream is consumed, it always takes the version at the time of creation, and all data is in insert mode. After the first consumption, subsequent behavior is the same as before, with delta data between versions.

Notes

  • Before creating a Table Stream, the following operations must be performed on the base table:
ALTER TABLE table_name set PROPERTIES ('change_tracking' = 'true');
  • The earliest timestamp specified by TIMESTAMP AS OF depends on the TIME TRAVEL(data_retention_days) parameter. If the specified version does not exist, an error will be reported. This parameter defines the length of time that deleted data is retained. By default, Lakehouse retains data for one day. Depending on your business needs, you can extend or shorten the data retention period by adjusting the data_retention_days parameter. Please note that adjusting the data retention period may affect storage costs. Extending the retention period will increase storage requirements, which may increase related costs.
  • Data written through real-time uploads can only be read after one minute, and Table Stream can only read committed data. Data written by real-time tasks needs to wait for 1 minute to be confirmed, so Table Stream also needs to wait for 1 minute to see it.

Use Cases

Case 1: Create a Table Stream in APPEND_ONLY mode

-- Clean up the environment
DROP      TABLE IF EXISTS data_change_test;

DROP      TABLE STREAM IF EXISTS data_change_test_stream;

-- Create a test table
CREATE    TABLE data_change_test (id int, name string);

INSERT    INTO data_change_test
VALUES    (1, 'apple');

-- Create a TABLE stream on data_change_test to capture incremental records inserted from the current time
ALTER     TABLE data_change_test set PROPERTIES ('change_tracking' = 'true');

CREATE    TABLE STREAM data_change_test_stream ON TABLE data_change_test
WITH      PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');

-- Insert test data
INSERT    INTO data_change_test
VALUES    (2, 'banana');

-- View STREAM data
SELECT    *
FROM      data_change_test_stream;

Case 2: Create a Table Stream in STANDARD Mode

-- Clean up the environment
DROP      TABLE IF EXISTS data_change_test;

DROP      TABLE STREAM IF EXISTS data_change_test_stream;

-- Create test table
CREATE    TABLE data_change_test (id int, name string);

INSERT    INTO data_change_test
VALUES    (1, 'apple');

-- Enable change_tracking on data_change_test
ALTER     TABLE data_change_test set PROPERTIES ('change_tracking' = 'true');

-- Create Table Stream in STANDARD mode
CREATE    TABLE STREAM data_change_test_stream ON TABLE data_change_test
WITH      PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');

-- Insert test data
INSERT    INTO data_change_test
VALUES    (2, 'banana');

-- Update test data
update data_change_test set name = 'orange'
WHERE     id = 2;

-- Delete test data
DELETE
FROM      data_change_test
WHERE     id = 1;

-- View stream data
SELECT    *
FROM      data_change_test_stream;