Marketing Attribution and Uplift Modeling Data Warehouse Best Practices

Building on multi-channel attribution analysis, this article introduces a causal inference perspective — distinguishing users who "would have purchased anyway" from users who "purchased because of marketing intervention" — to enable precise budget allocation. Using a dataset of 4 marketing campaigns, 50 exposure records, and 30 conversion records, this article demonstrates an end-to-end Kafka PIPE → ODS → DWD → DWS → ADS build process, covering three platform capabilities: Dynamic Table incremental calculation, BITMAP set operations, and ZettaPark Python Task.


Overview

The core challenge of Uplift Modeling is this: attribution analysis can tell you how many users who clicked an ad went on to convert, but it cannot tell you whether those users would have purchased anyway without the ad. Answering that question requires a causal comparison between a treatment group and a control group, which breaks down into the following data warehouse tasks:

Problem | Singdata Solution
--------+------------------
Real-time ingestion of in-app conversion events | Kafka PIPE continuous consumption; no manual consumer code needed
Batch import of DMP exposure/click logs | OSS PIPE batch import with automatic file tracking
Automatically maintained user-campaign touchpoint wide table | Dynamic Table: declarative SQL with incremental refresh
Treatment/control group set operations | BITMAP functions: set intersections, unions, and differences over hundreds of millions of users in seconds
Running S-Learner / T-Learner / X-Learner | ZettaPark Python Task calling the EconML causal inference library
Scheduled refresh and quality alerts | Lakehouse Studio task scheduling with attached monitoring rules

SQL Commands Used

Command / Function | Purpose | Notes
-------------------+---------+------
CREATE TABLE | Build ODS layer exposure, conversion, and user feature tables | Regular tables, written by Kafka PIPE and INSERT
CREATE PIPE | Create a Kafka continuous ingestion pipeline | Bound to the conversion event target table
CREATE DYNAMIC TABLE | Build DWD / DWS / ADS three-layer incremental calculation tables | No REFRESH INTERVAL; scheduling managed by Lakehouse Studio tasks
REFRESH DYNAMIC TABLE | Manually trigger a single refresh | Used during initial build or debugging
GROUP_BITMAP_STATE | Build a user ID set bitmap | Treatment/control group user sets
GROUP_BITMAP | Calculate set cardinality | Bitmap equivalent of COUNT(DISTINCT)
BITMAP_AND | Compute the intersection of two bitmaps | Find users who are "in treatment group AND converted"
BITMAP_COUNT | Compute bitmap cardinality | Used together with BITMAP_AND

Prerequisites

All examples in this article run under the best_practice_uplift_model schema.

CREATE SCHEMA IF NOT EXISTS best_practice_uplift_model;


ODS Layer: Raw Data Ingestion

Create Tables

Three ODS raw tables: exposure records, conversion records, and user features.

-- Ad exposure table (including experiment group assignment)
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.doc_exposures (
    user_id       STRING,
    campaign_id   STRING,
    channel       STRING,
    exposure_time TIMESTAMP,
    is_treated    INT        -- 1 = treatment group (received marketing intervention), 0 = control group
);

-- Conversion event table
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.doc_conversions (
    user_id         STRING,
    conversion_time TIMESTAMP,
    order_value     DOUBLE
);

-- User profile feature table (from DMP)
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.doc_user_features (
    user_id                   STRING,
    age_group                 STRING,
    region                    STRING,
    historical_purchase_count INT
);

Kafka Real-Time Write of Conversion Events

Method 1: Write via Kafka (recommended)

In a production environment, in-app conversion events are reported in real time via a Kafka Topic. With a Kafka broker configured, create the PIPE for continuous consumption:

-- First create a raw string receiving table; the PIPE writes each message value as a JSON string
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.kafka_raw_conversions (value STRING);

-- Create the Kafka PIPE
CREATE PIPE IF NOT EXISTS best_practice_uplift_model.pipe_conversions
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_uplift_model.kafka_raw_conversions
FROM (
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',      -- Replace with actual broker address
        'marketing_conversions',    -- Topic name
        '',
        'cz_uplift_consumer',       -- Consumer group ID
        '', '', '', '',
        'raw',
        'raw',
        0,
        map()
    )
);

Once created, the PIPE starts running by default and consumes the topic in 60-second batches (BATCH_INTERVAL_IN_SECONDS). The following Python example, which uses the kafka-python client, sends a JSON message to the topic:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['<kafka-broker>:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

conversion_event = {
    "user_id": "U001",
    "conversion_time": "2026-05-01 14:22:00",
    "order_value": 258.00
}
producer.send('marketing_conversions', value=conversion_event)
producer.flush()
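The DWD layer reads doc_conversions, but the PIPE above lands each message as a raw JSON string in kafka_raw_conversions, so a parsing step has to sit in between (for example, a scheduled INSERT ... SELECT that extracts the fields). Whatever mechanism you use, it must turn each string into a typed row and discard malformed messages. Below is a plain-Python sketch of that parsing logic, assuming the message shape produced by the producer example; parse_conversion is a hypothetical helper, not a Singdata API.

```python
import json
from datetime import datetime
from typing import Optional, Tuple

Row = Tuple[str, datetime, float]

def parse_conversion(raw: str) -> Optional[Row]:
    """Convert one Kafka message value into a doc_conversions row, or None if malformed.

    Assumes the JSON shape sent by the producer example:
    {"user_id": ..., "conversion_time": "YYYY-MM-DD HH:MM:SS", "order_value": ...}
    """
    try:
        msg = json.loads(raw)
        return (
            str(msg["user_id"]),
            datetime.strptime(msg["conversion_time"], "%Y-%m-%d %H:%M:%S"),
            float(msg["order_value"]),
        )
    except (ValueError, KeyError, TypeError):
        # Bad JSON, missing field, wrong timestamp format, or non-numeric order_value
        return None

raw_values = [
    '{"user_id": "U001", "conversion_time": "2026-05-01 14:22:00", "order_value": 258.0}',
    '{"user_id": "U002"}',   # missing fields: dropped
    'not json',              # malformed: dropped
]
rows = [r for r in (parse_conversion(v) for v in raw_values) if r is not None]
print(rows)
```

Dropping bad messages instead of failing the whole batch is a design choice; in production you would likely also count or log the rejected values.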

Method 2: INSERT simulation (when no Kafka environment is available)

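If Kafka is not configured, load the data with one of the following methods: upload CSV files and COPY INTO, or INSERT inline test rows. The steps below cover all three ODS tables, starting with exposures.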

Import from local CSV (recommended):

-- Step 1: Upload local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_exposures.csv' TO USER VOLUME FILE 'doc_exposures.csv';

-- Step 2: COPY INTO table from User Volume
COPY INTO best_practice_uplift_model.doc_exposures
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_exposures.csv');

Alternatively, insert a small batch of test data inline (no CSV file required) with INSERT INTO directly on the target table. The following INSERT was executed via cz-cli; only the first 10 of its 50 rows are shown:

INSERT INTO best_practice_uplift_model.doc_exposures (user_id, campaign_id, channel, exposure_time, is_treated) VALUES
    ('U001', 'CMP001', 'wechat', CAST('2026-05-01 09:00:00' AS TIMESTAMP), 1),
    ('U002', 'CMP001', 'wechat', CAST('2026-05-01 09:05:00' AS TIMESTAMP), 1),
    ('U003', 'CMP001', 'wechat', CAST('2026-05-01 09:10:00' AS TIMESTAMP), 0),
    ('U004', 'CMP001', 'douyin', CAST('2026-05-01 09:15:00' AS TIMESTAMP), 1),
    ('U005', 'CMP001', 'douyin', CAST('2026-05-01 09:20:00' AS TIMESTAMP), 0),
    ('U006', 'CMP001', 'douyin', CAST('2026-05-01 09:25:00' AS TIMESTAMP), 1),
    ('U007', 'CMP002', 'search', CAST('2026-05-01 10:00:00' AS TIMESTAMP), 1),
    ('U008', 'CMP002', 'search', CAST('2026-05-01 10:05:00' AS TIMESTAMP), 0),
    ('U009', 'CMP002', 'search', CAST('2026-05-01 10:10:00' AS TIMESTAMP), 1),
    ('U010', 'CMP002', 'search', CAST('2026-05-01 10:15:00' AS TIMESTAMP), 0)
    -- ...50 records total, including 30 treatment group users (is_treated=1) and 20 control group users (is_treated=0)
;

Verify data write results:

SELECT is_treated, COUNT(*) AS users FROM best_practice_uplift_model.doc_exposures GROUP BY is_treated ORDER BY is_treated;

is_treated | users
-----------+------
0          | 20
1          | 30

Write conversion records (30 records, one for each treatment group user; the control group has no conversions in this simulated data):

Import from local CSV (recommended):

-- Step 1: Upload local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_conversions.csv' TO USER VOLUME FILE 'doc_conversions.csv';

-- Step 2: COPY INTO table from User Volume
COPY INTO best_practice_uplift_model.doc_conversions
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_conversions.csv');

You can also insert a small batch of test data inline (no CSV file required):

INSERT INTO best_practice_uplift_model.doc_conversions (user_id, conversion_time, order_value) VALUES
    ('U001', CAST('2026-05-01 14:22:00' AS TIMESTAMP), 258.00),
    ('U002', CAST('2026-05-01 15:10:00' AS TIMESTAMP), 189.50),
    ('U004', CAST('2026-05-01 16:05:00' AS TIMESTAMP), 320.00),
    ('U007', CAST('2026-05-01 18:00:00' AS TIMESTAMP), 450.00),
    ('U009', CAST('2026-05-01 19:10:00' AS TIMESTAMP), 175.00)
    -- ...30 records total
;

Write user features (20 records, from DMP audience package):

Import from local CSV (recommended):

-- Step 1: Upload local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_user_features.csv' TO USER VOLUME FILE 'doc_user_features.csv';

-- Step 2: COPY INTO table from User Volume
COPY INTO best_practice_uplift_model.doc_user_features
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_user_features.csv');

You can also insert a small batch of test data inline (no CSV file required):

INSERT INTO best_practice_uplift_model.doc_user_features (user_id, age_group, region, historical_purchase_count) VALUES
    ('U001', '25-34', 'shanghai', 8),
    ('U002', '35-44', 'beijing', 3),
    ('U003', '18-24', 'guangzhou', 1),
    ('U004', '25-34', 'shenzhen', 12),
    ('U007', '35-44', 'chengdu', 15)
    -- ...20 records total
;


DWD Layer: User-Campaign Touchpoint Wide Table

Create Dynamic Table

The DWD layer JOINs three ODS tables into a single wide table, which is the foundation for all downstream analysis.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.dwd_user_campaign_facts AS
SELECT e.user_id,
       e.campaign_id,
       e.channel,
       e.exposure_time,
       e.is_treated,
       f.age_group,
       f.region,
       f.historical_purchase_count,
       CASE WHEN c.user_id IS NOT NULL THEN 1 ELSE 0 END AS is_converted,
       c.order_value,
       c.conversion_time
FROM best_practice_uplift_model.doc_exposures e
LEFT JOIN best_practice_uplift_model.doc_user_features f ON e.user_id = f.user_id
LEFT JOIN best_practice_uplift_model.doc_conversions c ON e.user_id = c.user_id;

Manually trigger the first refresh:

REFRESH DYNAMIC TABLE best_practice_uplift_model.dwd_user_campaign_facts;

View the first few rows of the wide table:

SELECT user_id, campaign_id, channel, is_treated, age_group, region, is_converted, order_value FROM best_practice_uplift_model.dwd_user_campaign_facts LIMIT 5;

user_id | campaign_id | channel | is_treated | age_group | region | is_converted | order_value
--------+-------------+---------+------------+-----------+--------+--------------+------------
U031    | CMP004      | wechat  | 1          | null      | null   | 1            | 480
U036    | CMP004      | email   | 1          | null      | null   | 1            | 175
U039    | CMP004      | sms     | 1          | null      | null   | 1            | 165
U044    | CMP002      | display | 1          | null      | null   | 1            | 280
U045    | CMP003      | email   | 1          | null      | null   | 1            | 350

Users with null age_group and region are users not covered in ODS user_features (doc_user_features has only 20 records while the exposure table has 50). The LEFT JOIN retains all exposure records.
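To make the JOIN semantics concrete, here is a plain-Python sketch of what dwd_user_campaign_facts computes for a few hypothetical rows: every exposure is kept, missing features become None (NULL), and is_converted is 1 exactly when a conversion row matches the user_id. It also shows one consequence of joining on user_id alone: a user with two conversion records yields two wide-table rows (and, since there is no time condition, a conversion is matched even if it happened before the exposure). build_wide_table is a hypothetical name used only for illustration.

```python
from typing import Dict, List

# Hypothetical sample rows mirroring the three ODS tables
exposures = [
    {"user_id": "U001", "campaign_id": "CMP001", "is_treated": 1},
    {"user_id": "U003", "campaign_id": "CMP001", "is_treated": 0},
    {"user_id": "U031", "campaign_id": "CMP004", "is_treated": 1},
]
user_features = {"U001": {"age_group": "25-34"}, "U003": {"age_group": "18-24"}}
conversions = [{"user_id": "U001", "order_value": 258.0},
               {"user_id": "U031", "order_value": 480.0}]

def build_wide_table(exposures, user_features, conversions) -> List[Dict]:
    # Index conversions by user_id; a user may have several conversion rows
    conv_by_user: Dict[str, List[Dict]] = {}
    for c in conversions:
        conv_by_user.setdefault(c["user_id"], []).append(c)
    out = []
    for e in exposures:                                  # LEFT JOIN: every exposure survives
        f = user_features.get(e["user_id"], {})          # missing feature row -> NULLs
        matches = conv_by_user.get(e["user_id"]) or [None]
        for c in matches:                                # one output row per matching conversion
            out.append({
                **e,
                "age_group": f.get("age_group"),
                "is_converted": 1 if c is not None else 0,
                "order_value": c["order_value"] if c is not None else None,
            })
    return out

for row in build_wide_table(exposures, user_features, conversions):
    print(row)
```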


DWS Layer: Channel-Level Uplift Aggregation

Create Dynamic Table

The DWS layer aggregates by campaign_id × channel × is_treated granularity, outputting conversion rates and average order values for each channel in treatment and control groups.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.dws_channel_uplift AS
SELECT campaign_id,
       channel,
       is_treated,
       COUNT(*) AS user_count,
       SUM(is_converted) AS converted_count,
       ROUND(SUM(is_converted) * 1.0 / COUNT(*), 4) AS cvr,
       ROUND(AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END), 2) AS avg_order_value
FROM best_practice_uplift_model.dwd_user_campaign_facts
GROUP BY campaign_id, channel, is_treated;

Manually trigger refresh and view results:

REFRESH DYNAMIC TABLE best_practice_uplift_model.dws_channel_uplift;

SELECT campaign_id, channel, is_treated, user_count, converted_count, cvr, avg_order_value
FROM best_practice_uplift_model.dws_channel_uplift
ORDER BY campaign_id, channel, is_treated
LIMIT 10;

campaign_id | channel | is_treated | user_count | converted_count | cvr    | avg_order_value
------------+---------+------------+------------+-----------------+--------+----------------
CMP001      | douyin  | 0          | 2          | 0               | 0.0000 | 0
CMP001      | douyin  | 1          | 3          | 3               | 1.0000 | 261.63
CMP001      | search  | 0          | 1          | 0               | 0.0000 | 0
CMP001      | search  | 1          | 1          | 1               | 1.0000 | 430
CMP001      | wechat  | 0          | 3          | 0               | 0.0000 | 0
CMP001      | wechat  | 1          | 4          | 4               | 1.0000 | 210.13
CMP002      | display | 0          | 2          | 0               | 0.0000 | 0
CMP002      | display | 1          | 4          | 4               | 1.0000 | 182
CMP002      | search  | 0          | 4          | 0               | 0.0000 | 0
CMP002      | search  | 1          | 3          | 3               | 1.0000 | 281.67
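The cvr and avg_order_value expressions can be checked by hand. The sketch below applies the same formulas to a few hypothetical wide-table rows, grouped by (campaign_id, channel, is_treated): conversions divided by all users, and order value averaged over all users with NULL treated as 0. The function name aggregate is illustrative only.

```python
from collections import defaultdict

# Hypothetical wide-table rows: (campaign_id, channel, is_treated, is_converted, order_value)
rows = [
    ("CMP001", "wechat", 1, 1, 200.0),
    ("CMP001", "wechat", 1, 0, None),
    ("CMP001", "wechat", 0, 0, None),
    ("CMP001", "wechat", 0, 1, 120.0),
]

def aggregate(rows):
    acc = defaultdict(lambda: {"user_count": 0, "converted_count": 0, "revenue": 0.0})
    for campaign, channel, treated, converted, value in rows:
        a = acc[(campaign, channel, treated)]
        a["user_count"] += 1                                  # COUNT(*)
        a["converted_count"] += converted                     # SUM(is_converted)
        a["revenue"] += value if converted == 1 else 0.0      # CASE ... ELSE 0 END
    return {
        key: {
            "user_count": a["user_count"],
            "converted_count": a["converted_count"],
            "cvr": round(a["converted_count"] / a["user_count"], 4),
            # Denominator is every user in the group, mirroring AVG(CASE ... ELSE 0 END)
            "avg_order_value": round(a["revenue"] / a["user_count"], 2),
        }
        for key, a in acc.items()
    }

for key, metrics in sorted(aggregate(rows).items()):
    print(key, metrics)
```

For the treated group this gives avg_order_value = 100.0; a plain AVG(order_value) would give 200.0 instead, because SQL AVG skips NULLs and would divide by converted users only.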

BITMAP Set Operations: Intersection of Treatment Group and Converted Users

BITMAP functions perform set operations between the treatment/control groups and the converted population directly on user-ID bitmaps, which scales to hundreds of millions of users without large JOINs.

-- Calculate the number of users in treatment and control groups (BITMAP cardinality)
SELECT
    GROUP_BITMAP(CASE WHEN is_treated = 1 THEN CAST(SUBSTR(user_id, 2) AS BIGINT) END) AS treated_count,
    GROUP_BITMAP(CASE WHEN is_treated = 0 THEN CAST(SUBSTR(user_id, 2) AS BIGINT) END) AS control_count
FROM best_practice_uplift_model.doc_exposures;

treated_count | control_count
--------------+--------------
30            | 20

Calculate the number of users "in treatment group AND converted" (intersection):

WITH treated_set AS (
    SELECT GROUP_BITMAP_STATE(CAST(SUBSTR(user_id, 2) AS BIGINT)) AS bm
    FROM best_practice_uplift_model.doc_exposures
    WHERE is_treated = 1
),
converted_set AS (
    SELECT GROUP_BITMAP_STATE(CAST(SUBSTR(user_id, 2) AS BIGINT)) AS bm
    FROM best_practice_uplift_model.doc_conversions
)
SELECT BITMAP_COUNT(BITMAP_AND(t.bm, c.bm)) AS treated_and_converted
FROM treated_set t
CROSS JOIN converted_set c;

treated_and_converted
---------------------
30

All 30 treatment group users appear in the conversion records, which matches the simulated data design: every conversion record belongs to a treatment group user.
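Conceptually, each GROUP_BITMAP_STATE result is a set of integer IDs, and BITMAP_AND followed by BITMAP_COUNT is set intersection followed by cardinality. The sketch below reproduces that logic with Python's built-in sets on hypothetical IDs, including the SUBSTR-and-CAST step that turns 'U001' into 1, and adds a set difference (treated users who did not convert) for comparison. It illustrates the set semantics only, not how the bitmaps are stored.

```python
def to_bitmap_id(user_id: str) -> int:
    # Mirrors CAST(SUBSTR(user_id, 2) AS BIGINT): drop the one-character prefix
    return int(user_id[1:])

# Hypothetical inputs
treated_users = ["U001", "U002", "U004", "U007"]
control_users = ["U003", "U005"]
converted_users = ["U001", "U004", "U005"]

treated = {to_bitmap_id(u) for u in treated_users}       # GROUP_BITMAP_STATE(...) WHERE is_treated = 1
control = {to_bitmap_id(u) for u in control_users}
converted = {to_bitmap_id(u) for u in converted_users}

treated_and_converted = len(treated & converted)          # BITMAP_COUNT(BITMAP_AND(t.bm, c.bm))
control_and_converted = len(control & converted)
treated_not_converted = sorted(treated - converted)       # set difference
print(treated_and_converted, control_and_converted, treated_not_converted)
```

Dividing the two intersection counts by the group sizes gives the treated and control conversion rates that the ADS layer subtracts.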


ADS Layer: Uplift Scoring and ROI Recommendations

Create Dynamic Table

For each campaign and channel, the ADS layer calculates the Uplift CVR (treatment group conversion rate minus control group conversion rate) and the Uplift ARPU (treatment group revenue per user minus control group revenue per user), and assigns each pair one of three tiers.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.ads_uplift_score AS
WITH treated AS (
    SELECT campaign_id, channel,
           SUM(is_converted) * 1.0 / COUNT(*) AS cvr_treated,
           AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) AS arpu_treated,
           COUNT(*) AS cnt_treated
    FROM best_practice_uplift_model.dwd_user_campaign_facts
    WHERE is_treated = 1
    GROUP BY campaign_id, channel
),
control AS (
    SELECT campaign_id, channel,
           SUM(is_converted) * 1.0 / COUNT(*) AS cvr_control,
           AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) AS arpu_control,
           COUNT(*) AS cnt_control
    FROM best_practice_uplift_model.dwd_user_campaign_facts
    WHERE is_treated = 0
    GROUP BY campaign_id, channel
)
SELECT t.campaign_id,
       t.channel,
       ROUND(t.cvr_treated, 4) AS cvr_treated,
       ROUND(c.cvr_control, 4) AS cvr_control,
       ROUND(t.cvr_treated - c.cvr_control, 4) AS uplift_cvr,
       ROUND(t.arpu_treated - c.arpu_control, 2) AS uplift_arpu,
       t.cnt_treated,
       c.cnt_control,
       CASE
           WHEN t.cvr_treated - c.cvr_control > 0.5 THEN 'HIGH'
           WHEN t.cvr_treated - c.cvr_control > 0.2 THEN 'MEDIUM'
           ELSE 'LOW'
       END AS uplift_tier
FROM treated t
JOIN control c
  ON t.campaign_id = c.campaign_id
 AND t.channel = c.channel;

Uplift tier threshold explanation:

Tier | Condition | Meaning
-----+-----------+--------
HIGH | uplift_cvr > 0.5 | Marketing intervention raises the conversion rate by more than 50 percentage points; strongly recommend increasing investment
MEDIUM | 0.2 < uplift_cvr ≤ 0.5 | Moderate effect; decide whether to scale based on order value
LOW | uplift_cvr ≤ 0.2 | Weak marketing effect; may be reaching many users who "would have purchased anyway"
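The ADS formulas and tier thresholds can be restated as a small function. This is a sketch of the same arithmetic for one (campaign_id, channel) pair, not part of the pipeline; uplift_row is a hypothetical name, and the counts in the usage line are hypothetical, chosen to resemble a control group with organic conversions.

```python
def uplift_row(conv_t: int, n_t: int, rev_t: float,
               conv_c: int, n_c: int, rev_c: float) -> dict:
    """Compute the ads_uplift_score metrics for one (campaign_id, channel) pair."""
    cvr_t, cvr_c = conv_t / n_t, conv_c / n_c
    uplift_cvr = cvr_t - cvr_c
    # ARPU averages revenue over all users in the group (non-converters contribute 0)
    uplift_arpu = rev_t / n_t - rev_c / n_c
    if uplift_cvr > 0.5:
        tier = "HIGH"
    elif uplift_cvr > 0.2:
        tier = "MEDIUM"
    else:
        tier = "LOW"
    return {"uplift_cvr": round(uplift_cvr, 4),
            "uplift_arpu": round(uplift_arpu, 2),
            "uplift_tier": tier}

# Hypothetical: 300 of 1,000 treated users convert vs. 220 of 1,000 control users
print(uplift_row(300, 1000, 60000.0, 220, 1000, 41800.0))
```

This shows why the control group matters: a 30% treated conversion rate looks strong on its own, but against 22% organic conversion the uplift is only 0.08, which falls in the LOW tier.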

Manually trigger refresh and view scoring results:

REFRESH DYNAMIC TABLE best_practice_uplift_model.ads_uplift_score;

SELECT campaign_id, channel, cvr_treated, cvr_control, uplift_cvr, uplift_arpu,
       cnt_treated, cnt_control, uplift_tier
FROM best_practice_uplift_model.ads_uplift_score
ORDER BY campaign_id, channel;

campaign_id | channel | cvr_treated | cvr_control | uplift_cvr | uplift_arpu | cnt_treated | cnt_control | uplift_tier
------------+---------+-------------+-------------+------------+-------------+-------------+-------------+------------
CMP001      | douyin  | 1.0000      | 0.0000      | 1.0000     | 261.63      | 3           | 2           | HIGH
CMP001      | search  | 1.0000      | 0.0000      | 1.0000     | 430.00      | 1           | 1           | HIGH
CMP001      | wechat  | 1.0000      | 0.0000      | 1.0000     | 210.13      | 4           | 3           | HIGH
CMP002      | display | 1.0000      | 0.0000      | 1.0000     | 182.00      | 4           | 2           | HIGH
CMP002      | search  | 1.0000      | 0.0000      | 1.0000     | 281.67      | 3           | 4           | HIGH
CMP003      | email   | 1.0000      | 0.0000      | 1.0000     | 413.33      | 3           | 1           | HIGH
CMP003      | push    | 1.0000      | 0.0000      | 1.0000     | 217.50      | 2           | 2           | HIGH
CMP003      | sms     | 1.0000      | 0.0000      | 1.0000     | 244.50      | 2           | 1           | HIGH
CMP004      | douyin  | 1.0000      | 0.0000      | 1.0000     | 200.00      | 2           | 1           | HIGH
CMP004      | email   | 1.0000      | 0.0000      | 1.0000     | 175.00      | 1           | 1           | HIGH
CMP004      | push    | 1.0000      | 0.0000      | 1.0000     | 240.00      | 1           | 1           | HIGH
CMP004      | wechat  | 1.0000      | 0.0000      | 1.0000     | 300.00      | 2           | 1           | HIGH

Result interpretation:

  • In the simulated data, the control group (is_treated=0) has no conversion records, so every campaign-channel pair has uplift_cvr = 1.0 and is rated HIGH. In real production data, control group users convert organically, so uplift_cvr is much lower and should separate channels into different tiers.
  • Ranked by uplift_arpu, CMP001/search (¥430.00) and CMP003/email (¥413.33) deliver the highest incremental revenue per user, making them the top candidates for budget allocation. These figures rest on only 1 and 3 treated users respectively, so treat them as illustrative.
  • CMP002/display (¥182.00) is near the bottom of the uplift_arpu ranking; only CMP004/email (¥175.00, a single treated user) is lower. Its conversion rate is also 100% in this dataset, so the gap comes from order value rather than conversion. In production, a channel that mostly reaches users who would have purchased anyway shows a low uplift_cvr even when its raw conversion rate looks high.

Channel ROI Analysis

SELECT channel,
       SUM(CASE WHEN is_treated = 1 AND is_converted = 1 THEN order_value ELSE 0 END) AS treated_revenue,
       SUM(CASE WHEN is_treated = 1 THEN 1 ELSE 0 END) AS treated_users,
       ROUND(
           SUM(CASE WHEN is_treated = 1 AND is_converted = 1 THEN order_value ELSE 0 END)
           / NULLIF(SUM(CASE WHEN is_treated = 1 THEN 1 ELSE 0 END), 0),
           2) AS roi_per_treated_user
FROM best_practice_uplift_model.dwd_user_campaign_facts
GROUP BY channel
ORDER BY roi_per_treated_user DESC;

channel | treated_revenue | treated_users | roi_per_treated_user
--------+-----------------+---------------+---------------------
email   | 1415.00         | 4             | 353.75
search  | 1275.00         | 4             | 318.75
wechat  | 1440.50         | 6             | 240.08
douyin  | 1184.90         | 5             | 236.98
sms     | 944.00          | 4             | 236.00
push    | 675.00          | 3             | 225.00
display | 728.00          | 4             | 182.00

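Result interpretation: email and search generate the highest revenue per treatment group user and are the priority budget allocation targets. Note that roi_per_treated_user is treated revenue divided by treated users: it includes no cost data and, unlike uplift_arpu, ignores the control group, so it measures revenue rather than incremental return. The display channel has the lowest value (¥182.00). It reaches as many treated users as email and search (4 each), so the gap comes from lower order values rather than from reach.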


ZettaPark Python Task: Meta-Learner Uplift Modeling

Scenario Description

The SQL layers calculate a simple CVR difference, an estimate of the Average Treatment Effect (ATE) that suits channel-level aggregate analysis. A ZettaPark Python Task goes further and estimates Individual Treatment Effects (ITE) with the Meta-Learner framework (S-Learner / T-Learner / X-Learner), identifying the true Persuadables: users who convert only when they receive the marketing intervention.
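The T-Learner example in this section fits one model per group. For comparison, an S-Learner fits a single model with the treatment flag as an extra feature and reads the ITE as the difference between its predictions with the flag set to 1 and to 0. The sketch below runs on synthetic data generated inside the block (not the article's dataset), uses scikit-learn's GradientBoostingClassifier as the base learner to match the T-Learner example, and its column names are hypothetical.

```python
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier

rng = np.random.default_rng(0)
n = 2000
hist_purchase = rng.integers(0, 20, size=n)          # synthetic covariate
treated = rng.integers(0, 2, size=n)                 # randomized assignment (RCT)
# Synthetic ground truth: treatment only helps users with few past purchases
base_p = 0.10 + 0.02 * np.minimum(hist_purchase, 10)
lift = np.where(hist_purchase < 5, 0.25, 0.0)
converted = rng.random(n) < base_p + treated * lift

# S-Learner: one model on [covariates, treatment flag]
X = np.column_stack([hist_purchase, treated])
model = GradientBoostingClassifier(n_estimators=50, random_state=42)
model.fit(X, converted.astype(int))

# ITE = P(convert | x, T=1) - P(convert | x, T=0)
X1 = np.column_stack([hist_purchase, np.ones(n)])
X0 = np.column_stack([hist_purchase, np.zeros(n)])
ite = model.predict_proba(X1)[:, 1] - model.predict_proba(X0)[:, 1]

low = ite[hist_purchase < 5].mean()
high = ite[hist_purchase >= 5].mean()
print(f"mean ITE, hist_purchase < 5: {low:.3f}; >= 5: {high:.3f}")
```

The S-Learner is the simplest option, but because the treatment flag is just one feature among many, a tree model can underuse it when the effect is small; fitting separate models per group, as the T-Learner does, avoids that.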

Code Example (T-Learner)

from clickzetta_zettapark.session import Session
from sklearn.ensemble import GradientBoostingClassifier
import pandas as pd

# Connect to Lakehouse via ZettaPark
session = Session.builder.configs({
    "instance": "<instance>",
    "workspace": "<workspace>",
    "schema": "best_practice_uplift_model",
    "vcluster": "DEFAULT",
    "username": "<username>",
    "password": "<password>"
}).create()

# Read the DWD wide table (only users that have profile features)
df = session.sql("""
    SELECT user_id, is_treated, is_converted,
           COALESCE(historical_purchase_count, 0) AS hist_purchase,
           CASE age_group
               WHEN '18-24' THEN 1 WHEN '25-34' THEN 2
               WHEN '35-44' THEN 3 WHEN '45-54' THEN 4
               ELSE 0
           END AS age_bucket
    FROM best_practice_uplift_model.dwd_user_campaign_facts
    WHERE age_group IS NOT NULL
""").to_pandas()

features = ['hist_purchase', 'age_bucket']

def predict_conversion_proba(train: pd.DataFrame, X_all: pd.DataFrame) -> pd.Series:
    """Fit one group's model and return P(convert) for every row of X_all."""
    y = train['is_converted']
    if y.nunique() < 2:
        # GradientBoostingClassifier needs both classes; if the group has only one
        # (e.g. the simulated control group never converts), use that constant rate
        return pd.Series(float(y.iloc[0]), index=X_all.index)
    model = GradientBoostingClassifier(n_estimators=50, random_state=42)
    model.fit(train[features], y)
    return pd.Series(model.predict_proba(X_all)[:, 1], index=X_all.index)

# T-Learner: separate models for the treatment and control groups
treatment_df = df[df['is_treated'] == 1]
control_df = df[df['is_treated'] == 0]
df['p1'] = predict_conversion_proba(treatment_df, df[features])
df['p0'] = predict_conversion_proba(control_df, df[features])
df['ite'] = df['p1'] - df['p0']   # Individual Treatment Effect

# Write back to Lakehouse
result_df = session.create_dataframe(df[['user_id', 'ite']])
result_df.write.save_as_table(
    "best_practice_uplift_model.ads_user_ite_scores",
    mode="overwrite"
)
print(f"ITE scores written: {len(df)} users")

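Using EconML's X-Learner

X-Learner is designed for cases where the treatment and control groups differ greatly in size. It fits an outcome model per group, imputes each user's treatment effect using the other group's model, fits second-stage models to those imputed effects, and combines the two estimates with propensity-score weights. It still needs enough samples in both groups: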

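# Continues from the T-Learner example: reuses df and features
from econml.metalearners import XLearner
from sklearn.ensemble import RandomForestRegressor

# Outcome and effect models are regressors: on a 0/1 outcome they estimate
# conversion probabilities, and the stage-two imputed effects are continuous.
# The propensity model keeps EconML's default (logistic regression).
xl = XLearner(
    models=RandomForestRegressor(n_estimators=100, random_state=42)
)
X = df[features].values
T = df['is_treated'].values
Y = df['is_converted'].values
xl.fit(Y, T, X=X)
ite_scores = xl.effect(X)   # one treatment-effect estimate per row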
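To make X-Learner's two stages concrete, here is a simplified from-scratch sketch using scikit-learn regressors and a logistic-regression propensity model. It is not EconML's code, and it runs on synthetic data generated inside the block with a deliberately small treated group (about 20%), the situation X-Learner targets; mu0, mu1, tau0, tau1, and g are hypothetical names for the stage models.

```python
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(1)
n = 3000
X = rng.uniform(0, 1, size=(n, 2))                  # synthetic covariates
T = (rng.random(n) < 0.2).astype(int)                # unbalanced: ~20% treated
true_tau = 0.3 * (X[:, 0] > 0.5)                     # effect only where x0 > 0.5
Y = (rng.random(n) < 0.1 + 0.2 * X[:, 1] + T * true_tau).astype(float)

def rf():
    return RandomForestRegressor(n_estimators=100, min_samples_leaf=20, random_state=42)

# Stage 1: outcome model per group (regressors on a 0/1 target estimate probabilities)
mu0 = rf().fit(X[T == 0], Y[T == 0])
mu1 = rf().fit(X[T == 1], Y[T == 1])

# Stage 2: imputed individual effects, each using the *other* group's model
d1 = Y[T == 1] - mu0.predict(X[T == 1])             # treated: observed minus predicted untreated
d0 = mu1.predict(X[T == 0]) - Y[T == 0]             # control: predicted treated minus observed
tau1 = rf().fit(X[T == 1], d1)
tau0 = rf().fit(X[T == 0], d0)

# Combine with the propensity score g(x) = P(T=1 | x)
g = LogisticRegression().fit(X, T).predict_proba(X)[:, 1]
ite = g * tau0.predict(X) + (1 - g) * tau1.predict(X)

print("mean ITE where x0 > 0.5: ", round(ite[X[:, 0] > 0.5].mean(), 3))
print("mean ITE where x0 <= 0.5:", round(ite[X[:, 0] <= 0.5].mean(), 3))
```

Because g(x) is about 0.2 here, the final estimate leans mostly on tau1, which is trained on treated users' imputed effects computed with mu0, the outcome model fitted on the large control group.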


Scheduling Management: Lakehouse Studio Tasks

The Dynamic Table DDL defines no refresh schedule. Periodic refreshes are instead scheduled as Lakehouse Studio tasks, which lets you attach monitoring alerts and data quality check rules to the same task.

Create the following refresh tasks under the best_practices/uplift_model/ path in Studio:

  1. refresh_dwd_user_campaign_facts

    • SQL: REFRESH DYNAMIC TABLE best_practice_uplift_model.dwd_user_campaign_facts
    • Schedule: every hour on the hour
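    • Quality check: is_converted is never NULL (the CASE expression maps unmatched rows to 0), so apply the 80% null-ratio threshold to a nullable column instead, for example ensure the share of rows with age_group IS NULL (users missing from doc_user_features) does not exceed 80%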
  2. refresh_dws_channel_uplift

    • SQL: REFRESH DYNAMIC TABLE best_practice_uplift_model.dws_channel_uplift
    • Schedule: trigger after refresh_dwd_user_campaign_facts completes successfully
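    • Alert: send an anomaly notification when cvr is 0 for every treatment group row (is_treated = 1); dws_channel_uplift has no uplift_cvr column, which is computed in ads_uplift_score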
  3. refresh_ads_uplift_score

    • SQL: REFRESH DYNAMIC TABLE best_practice_uplift_model.ads_uplift_score
    • Schedule: trigger after refresh_dws_channel_uplift completes successfully
    • Alert: send an alert when the number of HIGH-tier channels drops by more than 50% compared with the previous day
  4. run_uplift_ml_task (ZettaPark Python Task)

    • Script: T-Learner code from the previous section
    • Schedule: daily at 02:00
    • Output: written to ads_user_ite_scores
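The null-ratio quality check and the HIGH-tier drop alert listed above reduce to simple comparisons. The sketch below writes them as plain Python predicates, as one might prototype them before configuring monitoring rules in Studio; null_ratio and high_tier_dropped are hypothetical names, and skipping the drop check when the previous day had no HIGH channels is an assumption, not a Studio behavior.

```python
from typing import Iterable, Mapping

def null_ratio(rows: Iterable[Mapping], column: str) -> float:
    """Share of rows whose `column` is None (SQL NULL)."""
    rows = list(rows)
    if not rows:
        return 0.0
    return sum(1 for r in rows if r.get(column) is None) / len(rows)

def high_tier_dropped(today_high: int, yesterday_high: int, max_drop: float = 0.5) -> bool:
    """True when the HIGH-tier channel count fell by more than max_drop vs. the previous day."""
    if yesterday_high == 0:
        return False          # assumption: nothing to compare against
    return (yesterday_high - today_high) / yesterday_high > max_drop

sample = [{"age_group": "25-34"}, {"age_group": None},
          {"age_group": None}, {"age_group": "18-24"}]
print(null_ratio(sample, "age_group") > 0.8)      # the check fails only above 80%
print(high_tier_dropped(today_high=5, yesterday_high=12))
```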

Configure the Lakehouse Studio task DAG dependencies to ensure the data layers refresh sequentially from DWD to ADS before triggering the machine learning task.
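The dependency chain is a small DAG. As a sketch using Python's standard graphlib module (not a Studio API), the following derives a valid execution order from the task names above; it assumes run_uplift_ml_task is made to depend on refresh_ads_uplift_score so that it runs only after the ADS refresh.

```python
from graphlib import TopologicalSorter

# task -> set of upstream tasks it waits for
dependencies = {
    "refresh_dwd_user_campaign_facts": set(),
    "refresh_dws_channel_uplift": {"refresh_dwd_user_campaign_facts"},
    "refresh_ads_uplift_score": {"refresh_dws_channel_uplift"},
    "run_uplift_ml_task": {"refresh_ads_uplift_score"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(" -> ".join(order))
```

If a dependency loop is introduced by mistake, static_order() raises graphlib.CycleError, which is the kind of misconfiguration a scheduler DAG has to reject as well.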


Incremental Calculation Notes

dwd_user_campaign_facts reads the three ODS tables directly, while dws_channel_uplift and ads_uplift_score both read dwd_user_campaign_facts. When new exposure or conversion records are inserted into the ODS tables, the next refresh of each Dynamic Table detects the upstream changes and processes them incrementally instead of rerunning the full query.

This is especially important for Uplift Modeling scenarios:

  • After each new campaign ends, batch-write the final exposure and conversion data for the treatment/control groups to ODS
  • Trigger the chained refresh of DWD → DWS → ADS
  • The ADS layer immediately shows the latest Uplift CVR and tier results, driving the next round of budget decisions
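Why incremental refresh pays off for the DWS aggregate can be seen with COUNT and SUM, which can be updated from changed rows alone. The sketch below is a conceptual illustration in plain Python, not the Dynamic Table engine's actual algorithm: it keeps per-group totals, applies a batch of changes expressed as inserts (+1) and deletes (-1), and yields the same result as a full recomputation. A late conversion that flips is_converted for an existing exposure is modeled as deleting the old row and inserting the new one.

```python
from collections import defaultdict

def full_aggregate(rows):
    totals = defaultdict(lambda: [0, 0])              # key -> [user_count, converted_count]
    for campaign, channel, treated, converted in rows:
        t = totals[(campaign, channel, treated)]
        t[0] += 1
        t[1] += converted
    return {k: tuple(v) for k, v in totals.items() if v[0] > 0}

def apply_changes(totals, changes):
    """Update aggregates from (+1 insert / -1 delete, row) pairs without rescanning old rows."""
    state = defaultdict(lambda: [0, 0], {k: list(v) for k, v in totals.items()})
    for sign, (campaign, channel, treated, converted) in changes:
        t = state[(campaign, channel, treated)]
        t[0] += sign
        t[1] += sign * converted
    return {k: tuple(v) for k, v in state.items() if v[0] > 0}

old_rows = [("CMP001", "wechat", 1, 0), ("CMP001", "wechat", 0, 0)]
# A late conversion for the treated user (update = delete old + insert new), plus a new exposure
changes = [(-1, ("CMP001", "wechat", 1, 0)), (+1, ("CMP001", "wechat", 1, 1)),
           (+1, ("CMP002", "search", 0, 0))]
new_rows = [("CMP001", "wechat", 1, 1), ("CMP001", "wechat", 0, 0), ("CMP002", "search", 0, 0)]

incremental = apply_changes(full_aggregate(old_rows), changes)
print(incremental)
print(incremental == full_aggregate(new_rows))
```

Ratios such as cvr are not updated directly; they are recomputed from the maintained counts, which is why COUNT and SUM are the natural state to keep.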

Notes

  • Dynamic Table DDL does not include REFRESH INTERVAL; scheduling is managed by Lakehouse Studio tasks, with monitoring alerts and quality check rules attachable.
  • BITMAP functions (GROUP_BITMAP_STATE / BITMAP_AND / BITMAP_COUNT) require integer user IDs, so string IDs must be converted to BIGINT. CAST(SUBSTR(user_id, 2) AS BIGINT) works in this dataset because every user_id is a single prefix character followed by digits.
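  • Meta-Learners as used here assume unconfounded treatment assignment, which a Randomized Controlled Trial (RCT) guarantees by design. If the experimental grouping is biased, ITE estimates will be distorted; propensity-score methods can correct for bias from observed user features, but not from unobserved confounders.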
  • The Uplift CVR difference requires treatment and control groups to be paired within the same campaign and channel. If a campaign-channel pair has a treatment group but no control group, the JOIN filters it out and ads_uplift_score has no row for it. In this dataset, CMP004/sms is dropped this way: its treated users appear in dwd_user_campaign_facts, but ads_uplift_score has no CMP004/sms row.
  • avg_order_value in the DWS layer uses AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) rather than AVG(order_value). Unconverted users have a NULL order_value, which AVG would skip; mapping it to 0 makes the denominator the total user count rather than only converted users. The metric is therefore revenue per user (the quantity the ADS layer calls ARPU), not revenue per order.
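The SUBSTR conversion breaks down when IDs do not follow the one-prefix-plus-digits pattern, or when numeric parts collide across prefixes ('U001' and 'V001' both become 1). A common alternative is a dictionary that assigns each distinct string ID a dense integer once and reuses it. The sketch below shows that mapping in Python; in the warehouse it would be a persisted mapping table joined before building bitmaps, and IdDictionary is a hypothetical name.

```python
from typing import Dict, Iterable, List

class IdDictionary:
    """Assigns each distinct string ID a stable, dense integer (1, 2, 3, ...)."""

    def __init__(self) -> None:
        self._ids: Dict[str, int] = {}

    def encode(self, user_id: str) -> int:
        if user_id not in self._ids:
            self._ids[user_id] = len(self._ids) + 1   # a new ID gets the next integer
        return self._ids[user_id]

    def encode_all(self, user_ids: Iterable[str]) -> List[int]:
        return [self.encode(u) for u in user_ids]

d = IdDictionary()
print(d.encode_all(["U001", "V001", "U001", "wx_9f3a"]))
```

Dense integers also keep bitmaps compact, since IDs are not spread across a sparse numeric range.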