Marketing Attribution and Uplift Modeling Data Warehouse Best Practices
Building on multi-channel attribution analysis, this article introduces a causal inference perspective — distinguishing users who "would have purchased anyway" from users who "purchased because of marketing intervention" — to enable precise budget allocation. Using a dataset of 4 marketing campaigns, 50 exposure records, and 30 conversion records, this article demonstrates an end-to-end Kafka PIPE → ODS → DWD → DWS → ADS build process, covering three platform capabilities: Dynamic Table incremental calculation, BITMAP set operations, and ZettaPark Python Task.
Overview
The core challenge of Uplift Modeling is that attribution analysis can only tell you how many users who clicked on an ad went on to convert; it cannot answer whether those users would have purchased anyway without the ad. Answering that requires a causal comparison between treatment and control groups, which translates into the following sub-tasks at the data warehouse level:
| Problem | Singdata Solution |
| --- | --- |
| Real-time ingestion of in-app conversion events | Kafka PIPE continuous consumption, no manual consumer code needed |
| Batch import of DMP exposure/click logs | OSS PIPE batch import with automatic file tracking |
No REFRESH INTERVAL; scheduling managed by Lakehouse Studio tasks
| REFRESH DYNAMIC TABLE | Manually trigger a single refresh | Used during initial build or debugging |
| GROUP_BITMAP_STATE | Build user ID set bitmap | Treatment/control group user sets |
| GROUP_BITMAP | Calculate set cardinality | Bitmap equivalent of COUNT(DISTINCT) |
| BITMAP_AND | Compute the intersection of two bitmaps | Find users who are "in treatment group AND converted" |
| BITMAP_COUNT | Compute bitmap cardinality | Used together with BITMAP_AND |
Prerequisites
All examples in this article run under the best_practice_uplift_model schema.
CREATE SCHEMA IF NOT EXISTS best_practice_uplift_model;
ODS Layer: Raw Data Ingestion
Create Tables
Three ODS raw tables: exposure records, conversion records, and user features.
-- Ad exposure table (including experiment group assignment)
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.doc_exposures (
user_id STRING,
campaign_id STRING,
channel STRING,
exposure_time TIMESTAMP,
is_treated INT -- 1=treatment group (received marketing intervention), 0=control group
);
-- Conversion event table
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.doc_conversions (
user_id STRING,
conversion_time TIMESTAMP,
order_value DOUBLE
);
-- User profile feature table (from DMP)
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.doc_user_features (
user_id STRING,
age_group STRING,
region STRING,
historical_purchase_count INT
);
Kafka Real-Time Write of Conversion Events
Method 1: Write via actual Kafka (recommended)
In a production environment, in-app conversion events are reported in real time via a Kafka Topic. With a Kafka broker configured, create the PIPE for continuous consumption:
-- First create a raw string receiving table; PIPE writes JSON strings
CREATE TABLE IF NOT EXISTS best_practice_uplift_model.kafka_raw_conversions (value STRING);
-- Create Kafka PIPE
CREATE PIPE IF NOT EXISTS best_practice_uplift_model.pipe_conversions
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_uplift_model.kafka_raw_conversions
FROM (
SELECT CAST(value AS STRING) AS value
FROM READ_KAFKA(
'<kafka-broker>:9092', -- Replace with actual broker address
'marketing_conversions', -- Topic name
'',
'cz_uplift_consumer', -- Consumer group ID
'','','','',
'raw', 'raw',
0,
map()
)
);
After the PIPE is created it runs by default, consuming in batches every 60 seconds. Python example for sending JSON messages to the topic:
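A minimal sketch of such a producer, assuming the kafka-python package (`pip install kafka-python`). The payload layout simply mirrors the doc_conversions columns, and the helper names `build_message`, `send_conversions`, and `make_producer` are illustrative assumptions rather than anything defined by the platform.

```python
import json


def build_message(user_id, conversion_time, order_value):
    """Serialize one conversion event as UTF-8 JSON (assumed payload layout)."""
    event = {
        "user_id": user_id,
        "conversion_time": conversion_time,  # 'YYYY-MM-DD HH:MM:SS' string
        "order_value": order_value,
    }
    return json.dumps(event).encode("utf-8")


def send_conversions(producer, topic, events):
    """Send every (user_id, conversion_time, order_value) tuple, then flush."""
    for user_id, conversion_time, order_value in events:
        producer.send(topic, value=build_message(user_id, conversion_time, order_value))
    producer.flush()  # block until buffered messages are delivered
    return len(events)


def make_producer(broker):
    """Create a kafka-python producer; imported lazily so the helpers above work without it."""
    from kafka import KafkaProducer
    return KafkaProducer(bootstrap_servers=broker)
```

With a reachable broker, `send_conversions(make_producer("<kafka-broker>:9092"), "marketing_conversions", [("U001", "2026-05-01 14:22:00", 258.00)])` would publish one message, which the PIPE then lands as a JSON string in kafka_raw_conversions.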
Method 2: INSERT simulation (when no Kafka environment is available)
If Kafka is not configured, use the following methods to write data.
Import from local CSV (recommended):
-- Step 1: Upload local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_exposures.csv' TO USER VOLUME FILE 'doc_exposures.csv';
-- Step 2: COPY INTO table from User Volume
COPY INTO best_practice_uplift_model.doc_exposures
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_exposures.csv');
You can also insert a small batch of test data inline (no CSV file required):
💡 Tip: The example below uses cz-cli (the Singdata Lakehouse command-line tool). If cz-cli is not installed, refer to the cz-cli setup guide. If you prefer not to use the command line, you can execute SQL in Develop -> SQL Editor in Lakehouse Studio and configure and trigger scheduled tasks in the Studio -> Task page.
When Kafka is not configured, use INSERT INTO to write directly to the target table, which simulates Kafka messages that have already been parsed and written. The following INSERT was executed via cz-cli:
INSERT INTO best_practice_uplift_model.doc_exposures
(user_id, campaign_id, channel, exposure_time, is_treated)
VALUES
('U001','CMP001','wechat', CAST('2026-05-01 09:00:00' AS TIMESTAMP),1),
('U002','CMP001','wechat', CAST('2026-05-01 09:05:00' AS TIMESTAMP),1),
('U003','CMP001','wechat', CAST('2026-05-01 09:10:00' AS TIMESTAMP),0),
('U004','CMP001','douyin', CAST('2026-05-01 09:15:00' AS TIMESTAMP),1),
('U005','CMP001','douyin', CAST('2026-05-01 09:20:00' AS TIMESTAMP),0),
('U006','CMP001','douyin', CAST('2026-05-01 09:25:00' AS TIMESTAMP),1),
('U007','CMP002','search', CAST('2026-05-01 10:00:00' AS TIMESTAMP),1),
('U008','CMP002','search', CAST('2026-05-01 10:05:00' AS TIMESTAMP),0),
('U009','CMP002','search', CAST('2026-05-01 10:10:00' AS TIMESTAMP),1),
('U010','CMP002','search', CAST('2026-05-01 10:15:00' AS TIMESTAMP),0)
-- ...50 records total, including 30 treatment group users (is_treated=1) and 20 control group users (is_treated=0)
;
Verify data write results:
SELECT is_treated, COUNT(*) AS users
FROM best_practice_uplift_model.doc_exposures
GROUP BY is_treated
ORDER BY is_treated;
Write conversion records (30 records, covering high-intent users in the treatment group):
Import from local CSV (recommended):
-- Step 1: Upload local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_conversions.csv' TO USER VOLUME FILE 'doc_conversions.csv';
-- Step 2: COPY INTO table from User Volume
COPY INTO best_practice_uplift_model.doc_conversions
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_conversions.csv');
You can also insert a small batch of test data inline (no CSV file required):
INSERT INTO best_practice_uplift_model.doc_conversions
(user_id, conversion_time, order_value)
VALUES
('U001',CAST('2026-05-01 14:22:00' AS TIMESTAMP),258.00),
('U002',CAST('2026-05-01 15:10:00' AS TIMESTAMP),189.50),
('U004',CAST('2026-05-01 16:05:00' AS TIMESTAMP),320.00),
('U007',CAST('2026-05-01 18:00:00' AS TIMESTAMP),450.00),
('U009',CAST('2026-05-01 19:10:00' AS TIMESTAMP),175.00)
-- ...30 records total
;
Write user features (20 records, from DMP audience package):
Import from local CSV (recommended):
-- Step 1: Upload local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_user_features.csv' TO USER VOLUME FILE 'doc_user_features.csv';
-- Step 2: COPY INTO table from User Volume
COPY INTO best_practice_uplift_model.doc_user_features
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_user_features.csv');
You can also insert a small batch of test data inline (no CSV file required):
INSERT INTO best_practice_uplift_model.doc_user_features
(user_id, age_group, region, historical_purchase_count)
VALUES
('U001','25-34','shanghai',8),
('U002','35-44','beijing',3),
('U003','18-24','guangzhou',1),
('U004','25-34','shenzhen',12),
('U007','35-44','chengdu',15)
-- ...20 records total
;
DWD Layer: User-Campaign Touchpoint Wide Table
Create Dynamic Table
The DWD layer JOINs three ODS tables into a single wide table, which is the foundation for all downstream analysis.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.dwd_user_campaign_facts
AS
SELECT
e.user_id,
e.campaign_id,
e.channel,
e.exposure_time,
e.is_treated,
f.age_group,
f.region,
f.historical_purchase_count,
CASE WHEN c.user_id IS NOT NULL THEN 1 ELSE 0 END AS is_converted,
c.order_value,
c.conversion_time
FROM best_practice_uplift_model.doc_exposures e
LEFT JOIN best_practice_uplift_model.doc_user_features f ON e.user_id = f.user_id
LEFT JOIN best_practice_uplift_model.doc_conversions c ON e.user_id = c.user_id;
⚠️ Note: CREATE DYNAMIC TABLE DDL does not include REFRESH INTERVAL. Scheduling is managed by Lakehouse Studio tasks (see the "Scheduling Management" section).
Users with null age_group and region are users not covered in ODS user_features (doc_user_features has only 20 records while the exposure table has 50). The LEFT JOIN retains all exposure records.
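The LEFT JOIN behaviour described above can be sketched in plain Python. This is only an illustration on made-up in-memory rows, not platform code: `U999` is a hypothetical user with no feature record, and `build_wide_rows` is a name introduced here.

```python
exposures = [
    {"user_id": "U001", "is_treated": 1},
    {"user_id": "U003", "is_treated": 0},
    {"user_id": "U999", "is_treated": 1},  # hypothetical user with no feature row
]
features = {"U001": {"age_group": "25-34"}, "U003": {"age_group": "18-24"}}
conversions = {"U001": {"order_value": 258.00}}


def build_wide_rows(exposures, features, conversions):
    """Left-join features and conversions onto exposures, keyed by user_id."""
    rows = []
    for e in exposures:
        f = features.get(e["user_id"], {})
        c = conversions.get(e["user_id"])
        rows.append({
            "user_id": e["user_id"],
            "is_treated": e["is_treated"],
            "age_group": f.get("age_group"),            # None when no feature row exists
            "is_converted": 1 if c is not None else 0,  # mirrors the CASE WHEN expression
            "order_value": c["order_value"] if c else None,
        })
    return rows


wide = build_wide_rows(exposures, features, conversions)
```

Every exposure survives the join, and `U999` comes back with `age_group` set to `None`. The dictionary lookup assumes at most one conversion per user; in SQL, a user with several conversion rows would produce one output row per match, which would inflate COUNT(*) in the downstream layers.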
DWS Layer: Channel-Level Uplift Aggregation
Create Dynamic Table
The DWS layer aggregates by campaign_id × channel × is_treated granularity, outputting conversion rates and average order values for each channel in treatment and control groups.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.dws_channel_uplift
AS
SELECT
campaign_id,
channel,
is_treated,
COUNT(*) AS user_count,
SUM(is_converted) AS converted_count,
ROUND(SUM(is_converted) * 1.0 / COUNT(*), 4) AS cvr,
ROUND(AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END), 2) AS avg_order_value
FROM best_practice_uplift_model.dwd_user_campaign_facts
GROUP BY campaign_id, channel, is_treated;
Manually trigger refresh and view results:
REFRESH DYNAMIC TABLE best_practice_uplift_model.dws_channel_uplift;
SELECT campaign_id, channel, is_treated,
user_count, converted_count, cvr, avg_order_value
FROM best_practice_uplift_model.dws_channel_uplift
ORDER BY campaign_id, channel, is_treated
LIMIT 10;
BITMAP Set Operations: Intersection of Treatment Group and Converted Users
BITMAP functions are suitable for quickly completing set operations between treatment/control groups and converted user populations at the scale of hundreds of millions of users, avoiding large-scale JOINs.
-- Calculate the number of users in treatment and control groups (BITMAP cardinality)
SELECT
GROUP_BITMAP(CASE WHEN is_treated=1
THEN CAST(SUBSTR(user_id,2) AS BIGINT) END) AS treated_count,
GROUP_BITMAP(CASE WHEN is_treated=0
THEN CAST(SUBSTR(user_id,2) AS BIGINT) END) AS control_count
FROM best_practice_uplift_model.doc_exposures;
💡 Tip: GROUP_BITMAP returns set cardinality (INT), equivalent to COUNT(DISTINCT user_id) but using compressed bitmaps — performance advantage is significant with tens of millions of users. If you need the bitmap object itself (for subsequent set operations), use GROUP_BITMAP_STATE instead.
Calculate the number of users "in treatment group AND converted" (intersection):
WITH treated_set AS (
SELECT GROUP_BITMAP_STATE(CAST(SUBSTR(user_id,2) AS BIGINT)) AS bm
FROM best_practice_uplift_model.doc_exposures
WHERE is_treated = 1
),
converted_set AS (
SELECT GROUP_BITMAP_STATE(CAST(SUBSTR(user_id,2) AS BIGINT)) AS bm
FROM best_practice_uplift_model.doc_conversions
)
SELECT BITMAP_COUNT(BITMAP_AND(t.bm, c.bm)) AS treated_and_converted
FROM treated_set t CROSS JOIN converted_set c;
treated_and_converted
---------------------
30
All 30 treatment group users appear in the conversion records — consistent with the simulated data design (all conversion records come from treatment group users).
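To make the set semantics concrete, the sketch below imitates the three bitmap functions with a plain Python integer used as a bitset. It is a conceptual model only: the platform's compressed bitmap format is not reproduced, the helper names are invented for this example, and the user lists are a small toy subset rather than the full dataset.

```python
def to_number(user_id):
    """Mirror CAST(SUBSTR(user_id, 2) AS BIGINT): drop the one-letter prefix."""
    return int(user_id[1:])


def bitmap_state(numbers):
    """Like GROUP_BITMAP_STATE: set bit i for every user number i."""
    bm = 0
    for i in numbers:
        bm |= 1 << i
    return bm


def bitmap_count(bm):
    """Like BITMAP_COUNT: number of set bits, i.e. the set cardinality."""
    return bin(bm).count("1")


treated = bitmap_state(to_number(u) for u in ["U001", "U002", "U004", "U006"])
converted = bitmap_state(to_number(u) for u in ["U001", "U002", "U004", "U007"])

# Like BITMAP_AND followed by BITMAP_COUNT: users in both sets
treated_and_converted = bitmap_count(treated & converted)
```

Here three users (U001, U002, U004) are in both toy sets, so the intersection count is 3. Duplicate IDs set the same bit twice, which is why the cardinality behaves like COUNT(DISTINCT).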
ADS Layer: Uplift Scoring and ROI Recommendations
Create Dynamic Table
The ADS layer calculates the Uplift CVR (treatment group conversion rate - control group conversion rate) and Uplift ARPU (incremental revenue per user) for each channel, and labels them with three tier levels.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_uplift_model.ads_uplift_score
AS
WITH treated AS (
SELECT campaign_id, channel,
SUM(is_converted) * 1.0 / COUNT(*) AS cvr_treated,
AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) AS arpu_treated,
COUNT(*) AS cnt_treated
FROM best_practice_uplift_model.dwd_user_campaign_facts
WHERE is_treated = 1
GROUP BY campaign_id, channel
),
control AS (
SELECT campaign_id, channel,
SUM(is_converted) * 1.0 / COUNT(*) AS cvr_control,
AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) AS arpu_control,
COUNT(*) AS cnt_control
FROM best_practice_uplift_model.dwd_user_campaign_facts
WHERE is_treated = 0
GROUP BY campaign_id, channel
)
SELECT
t.campaign_id,
t.channel,
ROUND(t.cvr_treated, 4) AS cvr_treated,
ROUND(c.cvr_control, 4) AS cvr_control,
ROUND(t.cvr_treated - c.cvr_control, 4) AS uplift_cvr,
ROUND(t.arpu_treated - c.arpu_control, 2) AS uplift_arpu,
t.cnt_treated,
c.cnt_control,
CASE
WHEN t.cvr_treated - c.cvr_control > 0.5 THEN 'HIGH'
WHEN t.cvr_treated - c.cvr_control > 0.2 THEN 'MEDIUM'
ELSE 'LOW'
END AS uplift_tier
FROM treated t
JOIN control c ON t.campaign_id = c.campaign_id AND t.channel = c.channel;
Uplift tier threshold explanation:
| Tier | Condition | Meaning |
| --- | --- | --- |
| HIGH | uplift_cvr > 0.5 | Marketing intervention raises the conversion rate by more than 50 percentage points; strongly recommend increasing investment |
| MEDIUM | 0.2 < uplift_cvr ≤ 0.5 | Moderate effect; decide whether to scale based on order value |
| LOW | uplift_cvr ≤ 0.2 | Weak marketing effect; may be reaching many users who "would have purchased anyway" |
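As a worked example of the tiering rule, the following sketch recomputes uplift_cvr and the tier from raw counts, using the same thresholds as the CASE expression. The function name and the input counts are hypothetical.

```python
def uplift_tier(treated_conv, treated_n, control_conv, control_n):
    """Return (uplift_cvr rounded to 4 places, tier) for one campaign/channel pair."""
    cvr_treated = treated_conv / treated_n
    cvr_control = control_conv / control_n
    uplift = cvr_treated - cvr_control
    # Thresholds are applied to the unrounded difference, as in the SQL
    if uplift > 0.5:
        tier = "HIGH"
    elif uplift > 0.2:
        tier = "MEDIUM"
    else:
        tier = "LOW"
    return round(uplift, 4), tier


# Hypothetical channel: 30 of 100 treated users convert, 5 of 100 control users convert
example = uplift_tier(30, 100, 5, 100)
```

For these counts the conversion rates are 0.30 and 0.05, so the uplift is 0.25 and the channel lands in MEDIUM. A channel where every treated user converts and no control user does yields an uplift of 1.0 and HIGH.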
Manually trigger refresh and view scoring results:
REFRESH DYNAMIC TABLE best_practice_uplift_model.ads_uplift_score;
SELECT campaign_id, channel, cvr_treated, cvr_control,
uplift_cvr, uplift_arpu, cnt_treated, cnt_control, uplift_tier
FROM best_practice_uplift_model.ads_uplift_score
ORDER BY campaign_id, channel;
In the simulated data, the control group (is_treated=0) has no conversion records, so all channels have uplift_cvr = 1.0 and are all rated HIGH. In real production data, the control group will have organic conversions, and uplift_cvr typically falls between 0.05 and 0.30 with clear stratification across channels.
Ranked by uplift_arpu: search (¥430) and email (¥413) channels have the highest incremental revenue per user, making them the top priorities for budget allocation.
display (¥182) channel has the lowest uplift_arpu. Although its conversion rate is also 100% in this dataset, display ads typically have higher organic conversion rates in practice, resulting in lower Uplift CVR.
Channel ROI Analysis
SELECT
channel,
SUM(CASE WHEN is_treated=1 AND is_converted=1 THEN order_value ELSE 0 END) AS treated_revenue,
SUM(CASE WHEN is_treated=1 THEN 1 ELSE 0 END) AS treated_users,
ROUND(
SUM(CASE WHEN is_treated=1 AND is_converted=1 THEN order_value ELSE 0 END) /
NULLIF(SUM(CASE WHEN is_treated=1 THEN 1 ELSE 0 END), 0),
2) AS roi_per_treated_user
FROM best_practice_uplift_model.dwd_user_campaign_facts
GROUP BY channel
ORDER BY roi_per_treated_user DESC;
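Result interpretation: email and search channels generate the highest average revenue per treatment group user and are the priority budget allocation targets. The display channel's total revenue (¥728) is reasonable, but its revenue per treated user is the lowest, indicating that it reaches many users with lower individual value. Note that roi_per_treated_user is revenue divided by the number of treated users; it does not account for channel spend, so a true ROI comparison would also need per-channel cost data.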
The SQL layer calculates a simple CVR difference (ATE, Average Treatment Effect), suitable for channel-level aggregated analysis. ZettaPark Python Task goes further to estimate Individual Treatment Effects (ITE) at the individual level, using the Meta-Learner framework (S-Learner / T-Learner / X-Learner) to identify which users are true Persuadables (with room to be influenced).
Code Example (T-Learner)
from clickzetta_zettapark.session import Session
from sklearn.ensemble import GradientBoostingClassifier
import pandas as pd
# Connect to Lakehouse via ZettaPark
session = Session.builder.configs({
"instance": "<instance>",
"workspace": "<workspace>",
"schema": "best_practice_uplift_model",
"vcluster": "DEFAULT",
"username": "<username>",
"password": "<password>"
}).create()
# Read DWD wide table
df = session.sql("""
SELECT user_id, is_treated, is_converted,
COALESCE(historical_purchase_count, 0) AS hist_purchase,
CASE age_group
WHEN '18-24' THEN 1
WHEN '25-34' THEN 2
WHEN '35-44' THEN 3
WHEN '45-54' THEN 4
ELSE 0
END AS age_bucket
FROM best_practice_uplift_model.dwd_user_campaign_facts
WHERE age_group IS NOT NULL
""").to_pandas()
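# T-Learner: train separate models for treatment and control groups
features = ['hist_purchase', 'age_bucket']
treatment_df = df[df['is_treated'] == 1]
control_df = df[df['is_treated'] == 0]
# Each group must contain both converted and unconverted users; otherwise the
# classifier cannot be fitted. The simulated dataset in this article does not
# meet this condition (its control group has no conversions).
for name, group in [('treatment', treatment_df), ('control', control_df)]:
    if group['is_converted'].nunique() < 2:
        raise ValueError(f"{name} group needs both converted and unconverted users")
m1 = GradientBoostingClassifier(n_estimators=50, random_state=42)
m0 = GradientBoostingClassifier(n_estimators=50, random_state=42)
m1.fit(treatment_df[features], treatment_df['is_converted'])
m0.fit(control_df[features], control_df['is_converted'])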
# Predict ITE (Individual Treatment Effect) for all users
df['p1'] = m1.predict_proba(df[features])[:, 1]
df['p0'] = m0.predict_proba(df[features])[:, 1]
df['ite'] = df['p1'] - df['p0']
# Write back to Lakehouse
result_df = session.create_dataframe(df[['user_id', 'ite']])
result_df.write.save_as_table(
"best_practice_uplift_model.ads_user_ite_scores",
mode="overwrite"
)
print(f"ITE scores written: {len(df)} users")
💡 Tip: The above code is deployed as a Python Task in Studio with a daily scheduled task for automatic retraining and writing ITE scores back to the ADS layer. ITE > 0 indicates the user may convert due to marketing intervention (Persuadable); ITE < 0 indicates the intervention may backfire (Do Not Disturb).
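One possible way to turn the ITE scores into a targeting list is sketched below. The function name, the "Neutral" label for a score of exactly zero, and the sample scores are assumptions made for this illustration; only the sign interpretation comes from the tip above.

```python
def segment_by_ite(ite_scores, top_k=None):
    """Label users by the sign of their ITE and rank the Persuadables."""
    segments = {}
    for user_id, ite in ite_scores.items():
        if ite > 0:
            segments[user_id] = "Persuadable"
        elif ite < 0:
            segments[user_id] = "Do Not Disturb"
        else:
            segments[user_id] = "Neutral"  # assumed label; the article defines only the sign cases
    ranked = sorted(
        (u for u, s in segments.items() if s == "Persuadable"),
        key=lambda u: ite_scores[u],
        reverse=True,
    )
    # top_k=None keeps every Persuadable user
    return segments, ranked[:top_k]


# Hypothetical scores, not actual model output
scores = {"U001": 0.32, "U002": -0.05, "U003": 0.0, "U004": 0.11}
segments, targets = segment_by_ite(scores, top_k=1)
```

With these scores, U001 and U004 are Persuadable, U002 is Do Not Disturb, and `targets` holds only U001 because `top_k=1`. In practice the cut-off would be chosen from budget and contact cost rather than a fixed count.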
Using EconML's X-Learner (Higher Precision)
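For scenarios with sufficient sample sizes, X-Learner can give more accurate estimates, particularly when the treatment and control groups differ in size. It fits an outcome model per group, uses each model to impute individual treatment effects for the opposite group, fits effect models on those imputed values, and combines the two with propensity-score weights. It does not by itself remove confounding bias caused by non-random assignment. Example with EconML: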
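from econml.metalearners import XLearner
from sklearn.ensemble import RandomForestRegressor
# EconML's meta-learners call predict() on the outcome models, so a regressor
# fitted on the 0/1 outcome is used here to obtain conversion probabilities
# rather than hard class labels.
xl = XLearner(
    models=RandomForestRegressor(n_estimators=100, random_state=42)
)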
X = df[features].values
T = df['is_treated'].values
Y = df['is_converted'].values
xl.fit(Y, T, X=X)
ite_scores = xl.effect(X)
⚠️ Note: Meta-Learners assume random assignment to treatment and control groups (a randomized controlled trial, RCT), and the T-Learner in particular works best when both groups are large enough to fit a reliable model. If the experimental design has selection bias (e.g., high-intent users are disproportionately assigned to the treatment group), a Propensity Score correction must be applied before running the Meta-Learner.
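One common form of propensity score correction is inverse propensity weighting (IPW). The sketch below assumes the propensity scores have already been estimated (for example by regressing is_treated on the user features) and compares a naive difference in conversion rates with the normalized IPW estimate. All rows are made up, and the function names are introduced only for this example.

```python
def naive_uplift(rows):
    """Difference in mean outcome between treated and control rows."""
    treated = [y for t, y, _ in rows if t == 1]
    control = [y for t, y, _ in rows if t == 0]
    return sum(treated) / len(treated) - sum(control) / len(control)


def ipw_uplift(rows):
    """Normalized inverse-propensity-weighted difference in mean outcome."""
    # Treated rows are weighted by 1/e, control rows by 1/(1-e)
    wt = [(1 / e, y) for t, y, e in rows if t == 1]
    wc = [(1 / (1 - e), y) for t, y, e in rows if t == 0]
    mean_t = sum(w * y for w, y in wt) / sum(w for w, _ in wt)
    mean_c = sum(w * y for w, y in wc) / sum(w for w, _ in wc)
    return mean_t - mean_c


# Rows are (is_treated, is_converted, propensity); values are invented.
# High-intent users always convert and are mostly treated (propensity 0.8);
# low-intent users never convert and are mostly in control (propensity 0.2).
high_intent = [(1, 1, 0.8)] * 4 + [(0, 1, 0.8)]
low_intent = [(1, 0, 0.2)] + [(0, 0, 0.2)] * 4
rows = high_intent + low_intent

naive = naive_uplift(rows)
corrected = ipw_uplift(rows)
```

In this toy data the treatment has no effect at all, yet the naive difference is about 0.6 because high-intent users were assigned to treatment more often. The weighted estimate is approximately 0, which is the selection bias the note above warns about.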
Scheduling Management: Lakehouse Studio Tasks
Periodic refresh of Dynamic Tables is not written in DDL; instead, scheduling is managed via Lakehouse Studio tasks, which allows monitoring alerts and data quality check rules to be attached to the same task.
Create the following refresh tasks under the best_practices/uplift_model/ path in Studio:
Schedule: trigger after refresh_dws_channel_uplift completes successfully
Alert: send alert when the number of HIGH tier channels drops more than 50% compared to the previous day
run_uplift_ml_task (ZettaPark Python Task)
Script: T-Learner code from the previous section
Schedule: daily at 02:00
Output: written to ads_user_ite_scores
Configure the Lakehouse Studio task DAG dependencies to ensure the data layers refresh sequentially from DWD to ADS before triggering the machine learning task.
💡 Tip: Lakehouse Studio tasks support configuring data quality rules on task nodes, such as checking row count thresholds, NULL ratios, and field value ranges. Compared to manually triggering REFRESH, Lakehouse Studio task scheduling provides audit logs and supports alert notifications (Feishu, DingTalk, WeCom).
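The alert condition on HIGH tier channels mentioned above can be expressed as a small check. This is a sketch of the rule only, with a hypothetical function name and inputs; it does not use any Lakehouse Studio API.

```python
def high_tier_alert(today_tiers, yesterday_tiers, max_drop=0.5):
    """Return True when the number of HIGH channels fell by more than max_drop."""
    high_today = sum(1 for t in today_tiers if t == "HIGH")
    high_prev = sum(1 for t in yesterday_tiers if t == "HIGH")
    if high_prev == 0:
        return False  # nothing to compare against, so no drop can be measured
    drop = (high_prev - high_today) / high_prev
    return drop > max_drop


# Hypothetical tier lists read from ads_uplift_score on two consecutive days
yesterday = ["HIGH", "HIGH", "HIGH", "HIGH", "LOW"]
today = ["HIGH", "MEDIUM", "LOW", "LOW", "LOW"]
should_alert = high_tier_alert(today, yesterday)
```

Here the HIGH count falls from 4 to 1, a 75% drop, so the check returns True. A fall from 4 to 2 is exactly 50% and does not trigger, matching the "more than 50%" wording.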
Incremental Calculation Notes
dwd_user_campaign_facts and dws_channel_uplift are both Dynamic Tables: the former reads the three ODS tables, and the latter reads the former. When new exposure or conversion records are inserted upstream, the next refresh detects the changes and computes incrementally, without a full rerun.
This is especially important for Uplift Modeling scenarios:
1. After each new campaign ends, batch-write the final exposure and conversion data for the treatment/control groups to ODS
2. Trigger the chained refresh of DWD → DWS → ADS
3. The ADS layer immediately shows the latest Uplift CVR and tier results, driving the next round of budget decisions
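The chained refresh in the steps above can be sketched as a dependency-ordered list of REFRESH statements, built with Python's standard `graphlib` module (Python 3.9+). The dependency map is an assumption that follows the scheduling order described in this article; note that the ads_uplift_score query itself reads dwd_user_campaign_facts.

```python
from graphlib import TopologicalSorter

SCHEMA = "best_practice_uplift_model"

# Each table maps to the set of tables that must be refreshed before it
refresh_deps = {
    "dwd_user_campaign_facts": set(),
    "dws_channel_uplift": {"dwd_user_campaign_facts"},
    "ads_uplift_score": {"dws_channel_uplift"},
}

# static_order() yields every table after all of its prerequisites
refresh_order = list(TopologicalSorter(refresh_deps).static_order())

statements = [f"REFRESH DYNAMIC TABLE {SCHEMA}.{table};" for table in refresh_order]
```

The resulting statements can be run one after another from any SQL client, or each can become one node of a Studio task DAG so that a failure stops the downstream refreshes.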
Notes
Dynamic Table DDL does not include REFRESH INTERVAL; scheduling is managed by Lakehouse Studio tasks, with monitoring alerts and quality check rules attachable.
BITMAP functions (GROUP_BITMAP_STATE / BITMAP_AND / BITMAP_COUNT) require user IDs to be converted to BIGINT type. When user_id is a string, use CAST(SUBSTR(user_id, 2) AS BIGINT) to extract the numeric portion.
Meta-Learners depend on the Randomized Controlled Trial (RCT) assumption. If the experimental grouping is biased, ITE estimation results will be distorted and must be corrected using Propensity Scores.
The Uplift CVR difference calculation requires treatment and control groups to be paired within the same channel and campaign. If a channel has only a treatment group but no control group, the JOIN will filter out that channel and no row will appear in ads_uplift_score for it.
avg_order_value in the DWS layer uses AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END) rather than AVG(order_value), to treat unconverted users' order_value (NULL) as 0, ensuring the denominator is the total user count rather than just converted users.
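The difference between the two averages in the last note is easy to see on a tiny example. The values below reuse two order amounts from the sample inserts and add two hypothetical unconverted users, represented as `None` to stand in for SQL NULL.

```python
# order_value per exposed user; None marks a user who did not convert
order_values = [258.00, None, 320.00, None]

# AVG(CASE WHEN is_converted = 1 THEN order_value ELSE 0 END):
# unconverted users count as 0, so the denominator is all users
avg_over_all_users = sum(v if v is not None else 0 for v in order_values) / len(order_values)

# AVG(order_value): NULLs are skipped, so the denominator is converted users only
converted_values = [v for v in order_values if v is not None]
avg_over_converted = sum(converted_values) / len(converted_values)
```

The first expression gives 144.5 (578 spread over four users) and the second gives 289.0 (578 over two converted users), so the DWS column behaves like revenue per exposed user rather than average basket size.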