dbt Real-Time Data Pipeline in Practice
Two Real-Time Pipeline Modes
dbt + Singdata Lakehouse supports two real-time data pipeline modes for different scenarios:
| Mode | Mechanism | Use Cases |
|---|---|---|
| Dynamic Table | Declarative SQL; the system automatically refreshes incrementally on a schedule | Aggregation, transformation, multi-layer data warehouse pipelines |
| Table Stream + CDC | Captures row-level changes (INSERT/UPDATE/DELETE) on a table; consumers read the change stream | Audit logs, dimension change tracking, downstream real-time sync |
The two can be combined: use Table Stream to capture source table changes, and use Dynamic Table to aggregate the change data.
Dynamic Table
What Is a Dynamic Table
A Dynamic Table is Singdata Lakehouse's declarative incremental computation object. You only need to define a SQL statement, and the system automatically computes and maintains the result on the configured refresh schedule — no scheduling scripts to write, no state to manage, and failed refreshes are automatically retried.
Incremental computation mechanism: A Dynamic Table does not recompute everything from scratch on each refresh. Instead, it analyzes the SQL's dependencies and processes only the changed portions of upstream data. For example, if a Dynamic Table runs a GROUP BY over an orders table and 100 new rows are added to that table, the system recomputes only the groups affected by those 100 rows rather than scanning all the data. This is also why the first refresh after creation is a full refresh (it establishes the baseline that later increments build on), while subsequent refreshes typically take only a few seconds.
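The incremental idea can be sketched in plain Python. This is a toy model, not the Lakehouse engine: it handles INSERT-only batches and aggregates (COUNT, SUM) that can be updated by adding the new rows' contributions, and the table and column names are hypothetical.

```python
from collections import defaultdict


def full_refresh(rows):
    """Baseline refresh: scan every row and build per-customer totals."""
    state = defaultdict(lambda: {"order_count": 0, "amount": 0})
    for r in rows:
        group = state[r["customer_id"]]
        group["order_count"] += 1
        group["amount"] += r["amount"]
    return dict(state)


def incremental_refresh(state, new_rows):
    """Apply only newly inserted rows; return the groups that were touched."""
    touched = set()
    for r in new_rows:
        key = r["customer_id"]
        group = state.setdefault(key, {"order_count": 0, "amount": 0})
        group["order_count"] += 1
        group["amount"] += r["amount"]
        touched.add(key)
    return touched


# Hypothetical sample data.
orders = [
    {"customer_id": "c1", "amount": 10},
    {"customer_id": "c2", "amount": 5},
    {"customer_id": "c3", "amount": 7},
]
state = full_refresh(orders)  # first refresh: full scan to establish the baseline

batch = [{"customer_id": "c1", "amount": 3}, {"customer_id": "c4", "amount": 8}]
touched = incremental_refresh(state, batch)  # later refresh: only c1 and c4 change

print(sorted(touched))  # ['c1', 'c4']
print(state == full_refresh(orders + batch))  # True
```

Groups c2 and c3 are never visited during the second refresh, yet the result matches a full recomputation. Updates and deletes would need extra bookkeeping (for example, subtracting the old row's contribution), which this sketch omits.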
Differences from a regular incremental model:
| Aspect | incremental model | Dynamic Table |
|---|---|---|
| Trigger | Manual dbt run | System auto-refreshes on a schedule |
| Refresh logic | You write {% if is_incremental() %} filters | System automatically handles incremental computation |
| Suitable for | Need precise control over run timing | Need continuous automatic refresh |
| View refresh status | N/A | SHOW DYNAMIC TABLE REFRESH HISTORY |
Basic Usage
Two key parameters:
- refresh_interval: Refresh schedule; supports values such as '5 MINUTE', '1 HOUR', and '1 DAY' (use the singular, uppercase form of the unit)
- refresh_vc: Name of the compute cluster used for refreshes. Because Dynamic Tables typically run aggregation queries, an analytics cluster (default_ap) is recommended
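To catch malformed intervals before running dbt build, a small pre-flight check can enforce the format described above. This is a hypothetical helper, not part of dbt or Singdata Lakehouse, and it only accepts the three units named in this document, although the parameter description says other units exist ("etc.").

```python
import re
from datetime import timedelta

# Units named in this document; the real list may be longer.
UNITS = {
    "MINUTE": timedelta(minutes=1),
    "HOUR": timedelta(hours=1),
    "DAY": timedelta(days=1),
}
PATTERN = re.compile(r"(\d+) ([A-Z]+)")


def parse_refresh_interval(value: str) -> timedelta:
    """Parse strings like '5 MINUTE'; reject plural or lowercase units such as '5 minutes'."""
    match = PATTERN.fullmatch(value)
    if match is None or match.group(2) not in UNITS:
        raise ValueError(
            f"expected '<n> MINUTE|HOUR|DAY' (singular, uppercase), got {value!r}"
        )
    return int(match.group(1)) * UNITS[match.group(2)]


print(parse_refresh_interval("5 MINUTE"))  # 0:05:00
print(parse_refresh_interval("1 DAY"))     # 1 day, 0:00:00
```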
After dbt build creates the Dynamic Table, the system immediately begins auto-refreshing on the schedule — no additional action needed.
Manually Triggering a Refresh
When you need the latest data immediately, you can trigger a manual refresh:
Practical Example: Real-Time Order Aggregation
The following is an actual Dynamic Table running in snowflake-dbt2lakehouse-dbt, automatically refreshing order aggregation metrics every hour:
Jinja's {% for %} loop automatically expands aggregation columns for multiple statuses, avoiding repetitive code.
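To see what such a loop produces, the Python sketch below renders the same kind of column list by hand. The status values, the count(case when ...) expression, and the <status>_orders column names are hypothetical; the actual model's list and expressions are not reproduced here.

```python
# Hypothetical order statuses; in a dbt model these would typically live in a Jinja list.
STATUSES = ["pending", "shipped", "delivered"]


def status_columns(statuses):
    """Render one aggregation column per status, as a loop of this shape would:

    {% for s in statuses %}
    count(case when status = '{{ s }}' then 1 end) as {{ s }}_orders{% if not loop.last %},{% endif %}
    {% endfor %}
    """
    return ",\n".join(
        f"count(case when status = '{s}' then 1 end) as {s}_orders" for s in statuses
    )


print(status_columns(STATUSES))
# count(case when status = 'pending' then 1 end) as pending_orders,
# count(case when status = 'shipped' then 1 end) as shipped_orders,
# count(case when status = 'delivered' then 1 end) as delivered_orders
```

Supporting a new status then means editing one list instead of copying another aggregation column.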
Dynamic Table Referencing a Dynamic Table
A Dynamic Table can reference another Dynamic Table, building a multi-layer pipeline:
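Conceptually, a chain of Dynamic Tables forms a dependency graph, and a layer only sees new data once the layers it reads from have been refreshed. The sketch below uses Python's standard graphlib module to derive an upstream-first order for a hypothetical three-layer chain. It illustrates the dependency structure only and does not describe how Singdata Lakehouse actually schedules chained refreshes.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical pipeline: each key reads from the tables in its value set.
depends_on = {
    "dt_order_daily": {"orders"},            # layer 1: Dynamic Table over the source table
    "dt_order_monthly": {"dt_order_daily"},  # layer 2: Dynamic Table over layer 1
    "dt_customer_report": {"dt_order_monthly", "customers"},
}

order = list(TopologicalSorter(depends_on).static_order())
print(order)

# Every table appears after all the tables it reads from.
assert all(order.index(d) < order.index(t) for t, deps in depends_on.items() for d in deps)
```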
Viewing Refresh Status
SHOW DYNAMIC TABLE REFRESH HISTORY returns the start time, end time, status (SUCCEED/FAILED), and refresh mode (INCREMENTAL/FULL) of each refresh.
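Once the history rows are fetched into Python (through whatever database driver your environment uses), a few lines can flag failures and compare refresh durations by mode. The rows below are made-up sample values, and the field names start_time, end_time, status, and refresh_mode are assumptions; check the actual column names the command returns.

```python
from datetime import datetime

# Made-up sample rows; real column names and values may differ.
history = [
    {"start_time": datetime(2024, 1, 1, 10, 0, 0), "end_time": datetime(2024, 1, 1, 10, 2, 0),
     "status": "SUCCEED", "refresh_mode": "FULL"},
    {"start_time": datetime(2024, 1, 1, 11, 0, 0), "end_time": datetime(2024, 1, 1, 11, 0, 4),
     "status": "SUCCEED", "refresh_mode": "INCREMENTAL"},
    {"start_time": datetime(2024, 1, 1, 12, 0, 0), "end_time": datetime(2024, 1, 1, 12, 0, 9),
     "status": "FAILED", "refresh_mode": "INCREMENTAL"},
]


def summarize(rows):
    """Count failed refreshes and collect durations (in seconds) per refresh mode."""
    failed = sum(1 for r in rows if r["status"] == "FAILED")
    seconds_by_mode = {}
    for r in rows:
        seconds = (r["end_time"] - r["start_time"]).total_seconds()
        seconds_by_mode.setdefault(r["refresh_mode"], []).append(seconds)
    return {"failed": failed, "seconds_by_mode": seconds_by_mode}


print(summarize(history))
# {'failed': 1, 'seconds_by_mode': {'FULL': [120.0], 'INCREMENTAL': [4.0, 9.0]}}
```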
Table Stream (CDC)
What Is a Table Stream
Table Stream is Singdata Lakehouse's change data capture (CDC) mechanism. After creating a Stream on a table, every INSERT, UPDATE, and DELETE operation on that table is recorded, and consumers can read these change records.
Core characteristics of a Stream:
- Does not copy data: A Stream only records changes; it does not store a full data copy
- Offset advances only on DML: DML operations that consume the Stream (INSERT/MERGE, etc.) advance the offset; a plain SELECT does not
- System columns: Each change row carries three system columns: __change_type, __commit_timestamp, and __commit_version
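A toy Python model makes the shape of the change rows concrete. It assumes the __change_type values INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE (the values this document refers to), records an UPDATE as a before/after pair in the same commit, uses a simple counter in place of __commit_version, and omits __commit_timestamp. The in-memory list only stands in for however the Lakehouse actually tracks changes.

```python
class TrackedTable:
    """Toy table that appends a change row for every DML statement (STANDARD-mode style)."""

    def __init__(self):
        self.rows = {}      # primary key -> current business columns
        self.changes = []   # append-only change rows, oldest first
        self.version = 0    # stand-in for __commit_version

    def _record(self, change_type, row):
        self.changes.append({**row, "__change_type": change_type,
                             "__commit_version": self.version})

    def insert(self, key, row):
        self.version += 1
        self.rows[key] = row
        self._record("INSERT", row)

    def update(self, key, **new_values):
        self.version += 1
        before = self.rows[key]
        after = {**before, **new_values}
        self.rows[key] = after
        self._record("UPDATE_BEFORE", before)  # old image
        self._record("UPDATE_AFTER", after)    # new image, same commit

    def delete(self, key):
        self.version += 1
        self._record("DELETE", self.rows.pop(key))


table = TrackedTable()
table.insert(1, {"customer_key": 1, "name": "Ann"})
table.update(1, name="Anne")
table.delete(1)
for change in table.changes:
    print(change["__change_type"], change["__commit_version"], change["name"])
# INSERT 1 Ann
# UPDATE_BEFORE 2 Ann
# UPDATE_AFTER 2 Anne
# DELETE 3 Anne
```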
Using Table Stream in dbt
Step 1: Declare the Stream in sources.yml
Step 2: Reference it in the model using source()
SELECT * EXCEPT to Filter System Columns
When consuming a Stream, you typically only need the business columns, not the system columns. Use SELECT * EXCEPT(...) to filter out system columns without hardcoding all business column names:
This way, when new columns are added to the source table, the consuming model does not need to be modified.
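The same idea, sketched in Python with a change row represented as a dict: listing only the columns to drop, rather than the ones to keep, is what lets new business columns flow through. The sample row and the segment column are hypothetical.

```python
# System column names as listed in this document.
SYSTEM_COLUMNS = {"__change_type", "__commit_timestamp", "__commit_version"}


def business_columns(row):
    """Python analogue of SELECT * EXCEPT(...): keep every column except the system ones."""
    return {name: value for name, value in row.items() if name not in SYSTEM_COLUMNS}


change = {"customer_key": 1, "name": "Ann", "__change_type": "INSERT",
          "__commit_timestamp": "2024-01-01 10:00:00", "__commit_version": 7}
print(business_columns(change))  # {'customer_key': 1, 'name': 'Ann'}

# A column added to the source table later passes through with no code change.
change["segment"] = "retail"
print(business_columns(change))  # {'customer_key': 1, 'name': 'Ann', 'segment': 'retail'}
```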
Practical Example: Customer Dimension Change Tracking
The following is an actual CDC pipeline running in snowflake-dbt2lakehouse-dbt, tracking every change to the dim_customers table:
Step 1: Macro to create the Stream (macros/get_table_stream.sql)
This macro automatically creates the Stream at model run time (if it does not exist) and returns the Stream name for use in the FROM clause. TABLE_STREAM_MODE = 'STANDARD' captures all three types of changes: INSERT/UPDATE/DELETE.
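The macro's control flow (create the Stream only if it is missing, then hand back its name) can be sketched in Python. This mirrors the behavior described above but is not the macro itself: the <table>_stream naming convention and the create_stream callback are hypothetical, and the real macro issues its DDL through dbt.

```python
def get_table_stream(table_name, existing_streams, create_stream):
    """Return the Stream name for table_name, creating the Stream only on first use.

    existing_streams: set of Stream names already present (hypothetical lookup).
    create_stream: callback that would issue the CREATE statement with
                   TABLE_STREAM_MODE = 'STANDARD' (hypothetical).
    """
    stream_name = f"{table_name}_stream"  # hypothetical naming convention
    if stream_name not in existing_streams:
        create_stream(stream_name, table_name)
        existing_streams.add(stream_name)
    return stream_name


created = []
streams = set()
for _ in range(3):  # three dbt runs of the same model
    name = get_table_stream("dim_customers", streams,
                            lambda s, t: created.append((s, t)))
print(name, created)  # dim_customers_stream [('dim_customers_stream', 'dim_customers')]
```

Because the existence check runs on every model run, repeated dbt runs reuse the existing Stream instead of trying to create it again.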
Step 2: Consume the Stream (models/gold/dim_customer_changes.sql)
A few key points:
- get_table_stream(ref('dim_customers')) automatically creates a Stream on dim_customers and returns the Stream name
- The * except (...) clause filters out the three system columns, which are then selected separately under aliases
- The condition where not (__change_type = 'DELETE' and __change_type != 'UPDATE_BEFORE') filters out DELETE rows and keeps INSERT, UPDATE_BEFORE, and UPDATE_AFTER. Its second clause is redundant, because a row whose __change_type is 'DELETE' can never also be 'UPDATE_BEFORE', so the condition is equivalent to __change_type != 'DELETE'
- qualify row_number() = 1 keeps only the most recent change record for each customer_key
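Both clauses can be checked with a small Python model. The first part confirms that the WHERE condition behaves exactly like __change_type != 'DELETE' for every change type; the second mimics the qualify step by keeping the latest surviving row per customer_key. Ordering by __commit_version and breaking ties by position in the Stream are assumptions of this sketch, since the model's actual ORDER BY is not shown; if an UPDATE_BEFORE/UPDATE_AFTER pair shares a commit, the real ORDER BY needs a tie-breaker to pick the after image deterministically. The sample rows are hypothetical.

```python
CHANGE_TYPES = ["INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"]


def passes_filter(change_type):
    # where not (__change_type = 'DELETE' and __change_type != 'UPDATE_BEFORE')
    return not (change_type == "DELETE" and change_type != "UPDATE_BEFORE")


# The second clause never changes the outcome.
assert all(passes_filter(t) == (t != "DELETE") for t in CHANGE_TYPES)


def latest_per_key(changes, key="customer_key"):
    """Analogue of qualify row_number() over (partition by key order by ... desc) = 1."""
    best = {}
    for position, row in enumerate(changes):
        if not passes_filter(row["__change_type"]):  # WHERE runs before QUALIFY
            continue
        rank = (row["__commit_version"], position)  # tie-break on Stream position (assumption)
        if row[key] not in best or rank > best[row[key]][0]:
            best[row[key]] = (rank, row)
    return {k: row for k, (_, row) in best.items()}


changes = [  # hypothetical Stream contents
    {"customer_key": 1, "name": "Ann", "__change_type": "INSERT", "__commit_version": 1},
    {"customer_key": 2, "name": "Bo", "__change_type": "INSERT", "__commit_version": 1},
    {"customer_key": 1, "name": "Ann", "__change_type": "UPDATE_BEFORE", "__commit_version": 2},
    {"customer_key": 1, "name": "Anne", "__change_type": "UPDATE_AFTER", "__commit_version": 2},
    {"customer_key": 2, "name": "Bo", "__change_type": "DELETE", "__commit_version": 3},
]
for k, row in sorted(latest_per_key(changes).items()):
    print(k, row["__change_type"], row["name"])
# 1 UPDATE_AFTER Anne
# 2 INSERT Bo
```

Customer 2 still shows its INSERT row: because DELETE rows are removed before the qualify step, a deleted customer's last non-delete change is what remains.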
Stream Offset Management
A Stream's offset records "where the consumer last read up to." The offset only advances after a DML operation (INSERT/MERGE, etc.) succeeds; SELECT does not advance it. This design guarantees idempotency: if consumption fails, the same data can be read again on the next attempt without losing any changes.
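On each incremental run, a dbt incremental model writes to its target table with a DML statement such as MERGE INTO, so the Stream's offset advances after every successful dbt run. If dbt run fails midway, the offset does not advance, and the next run reprocesses the same batch of changes. This gives "at-least-once" semantics: no change is lost, but a batch may be processed more than once, so the consuming model should be safe to re-run (for example, by merging on a unique key).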
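The offset rules can be modeled in a few lines of Python. ToyStream, merge_by_key, and failing_write are hypothetical names; the sketch only captures the rules stated above (reads never move the offset, a successful consuming write does, a failed one does not) and assumes the consuming write is an upsert keyed on customer_key, which is what makes reprocessing a batch harmless.

```python
class ToyStream:
    """Toy model of Stream offset semantics."""

    def __init__(self, change_log):
        self.change_log = change_log  # shared, append-only list of change rows
        self.offset = 0               # index of the first change not yet consumed

    def read(self):
        # Like a plain SELECT: returns pending changes, offset unchanged.
        return self.change_log[self.offset:]

    def consume_into(self, target, write):
        # Like INSERT/MERGE reading the Stream: advance only if the write succeeds.
        pending = self.read()
        write(target, pending)        # if this raises, the offset stays where it was
        self.offset += len(pending)


def merge_by_key(target, rows):
    """Idempotent upsert: applying the same rows twice gives the same result."""
    for row in rows:
        target[row["customer_key"]] = row


def failing_write(target, rows):
    raise RuntimeError("simulated dbt run failure")


log = [{"customer_key": 1, "name": "Ann"}, {"customer_key": 2, "name": "Bo"}]
stream = ToyStream(log)
target = {}

stream.read()                       # reading alone does not consume anything
try:
    stream.consume_into(target, failing_write)
except RuntimeError:
    pass
print(stream.offset)                # 0: the failed run left the offset in place

stream.consume_into(target, merge_by_key)  # retry sees the same two changes
print(stream.offset, sorted(target))       # 2 [1, 2]
print(stream.read())                       # []
```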
Combined Usage: Stream + Dynamic Table
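A typical real-time pipeline architecture: a Table Stream captures changes on the source table, a dbt incremental model consumes the Stream and merges the changes into a table, and a Dynamic Table aggregates that table on its own refresh schedule.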
Advantages of this architecture:
- Stream guarantees no changes are missed
- Dynamic Table automatically maintains aggregation results without manual scheduling
- The two are decoupled and can have their refresh frequencies adjusted independently
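A compact simulation can show what decoupling means in practice: changes are consumed on every tick, while the aggregate refreshes on its own, slower cadence and still ends up consistent with the merged data. Everything here is hypothetical (names, data, cadence): the change log and offset stand in for the Table Stream, the customers dict for the incremental model's target, and each Counter snapshot for one Dynamic Table refresh. Deletes are left out for brevity.

```python
from collections import Counter


def run_pipeline(batches, aggregate_every):
    """Toy Stream -> incremental model -> Dynamic Table pipeline with independent cadences."""
    change_log, offset = [], 0   # Table Stream: captured changes + consumer offset
    customers = {}               # incremental model target, merged by customer_key
    snapshots = []               # one entry per aggregate ("Dynamic Table") refresh
    for tick, batch in enumerate(batches, start=1):
        change_log.extend(batch)                 # DML on the source table is captured
        for row in change_log[offset:]:          # consumer run: merge pending changes
            customers[row["customer_key"]] = row
        offset = len(change_log)                 # offset advances after the merge
        if tick % aggregate_every == 0:          # aggregate refreshes on its own schedule
            snapshots.append(Counter(r["region"] for r in customers.values()))
    return customers, snapshots


batches = [
    [{"customer_key": 1, "region": "east"}, {"customer_key": 2, "region": "west"}],
    [{"customer_key": 1, "region": "west"}],  # customer 1 moves region
    [{"customer_key": 3, "region": "east"}],
    [],
]
customers, snapshots = run_pipeline(batches, aggregate_every=2)
print(snapshots)  # [Counter({'west': 2}), Counter({'west': 2, 'east': 1})]
```

Changing aggregate_every alters how fresh the aggregate is without touching the consumer loop, which is the independence the list above describes.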
