Building a Content Platform Recommendation System Data Warehouse
Using the Steam game platform dataset (Steam Dataset 2025, approximately 240,000 games and 3.9 million reviews), this guide combines user interaction behaviors (views, likes, shares) with content metadata to build a three-layer data warehouse that supports feature engineering for recommendation models. It demonstrates the complete end-to-end pipeline, Kafka PIPE → OSS PIPE → External Function → Bronze → Silver → Gold → ZettaPark, and covers four key platform capabilities: Vector Index (IVFPQ vector recall), Inverted Index (full-text search), Dynamic Tables (daily Gold layer refresh), and BloomFilter Index.
Overview
The core challenge of a content recommendation system is unifying user behavior signals and content semantic information into the same feature space. Singdata Lakehouse addresses the key data warehouse construction problems through the following combination:
Problem | Solution
User behavior events are written in real time at high frequency and require low-latency ingestion | Kafka PIPE provides continuous ingestion, with no need to write consumers manually
CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_interaction_events (
event_id STRING,
user_id STRING,
content_id STRING,
event_type STRING, -- watch / like / share
session_id STRING,
duration_sec INT, -- Watch duration (seconds), 0 for like/share
event_time TIMESTAMP,
platform STRING, -- pc / mobile / console
ingest_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);
Create BloomFilter Index
Silver layer queries frequently filter by content_id, a high-cardinality column. This makes it a good fit for a BloomFilter Index, which can rule out data that cannot contain the requested value in an equality filter.
-- Must execute in the best_practice_content_rec context
USE SCHEMA best_practice_content_rec;
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_content_id
ON TABLE bronze_interaction_events (content_id);
⚠️ Note: CREATE BLOOMFILTER INDEX must run in the same schema context as the target table. Running it from another schema produces an "index and table must in the same schema" error. Use USE SCHEMA to switch context, or specify the -s parameter in cz-cli.
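As a rough illustration of why a Bloom filter helps with equality filters on a high-cardinality column, the sketch below keeps one filter per data file and skips files whose filter rules out the requested content_id. The file layout, filter size, and hashing scheme are hypothetical; this guide does not describe how Lakehouse implements BloomFilter Index internally. A Bloom filter can return false positives (a file scanned needlessly) but never false negatives.

```python
import hashlib


class BloomFilter:
    """Bit array plus k hash positions per key (one byte per bit for readability)."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)

    def _positions(self, key: str):
        # Derive k positions from salted SHA-256 digests of the key.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key: str) -> bool:
        # False: key is definitely absent. True: key may be present.
        return all(self.bits[pos] for pos in self._positions(key))


def files_to_scan(file_filters, content_id):
    """Keep only the files whose filter cannot rule out content_id."""
    return [name for name, bf in file_filters.items() if bf.might_contain(content_id)]


# Hypothetical layout: three data files holding these content_id values.
files = {
    "part-0": ["GAME_730", "GAME_570"],
    "part-1": ["GAME_292030"],
    "part-2": ["GAME_1091500", "GAME_271590"],
}
file_filters = {}
for name, ids in files.items():
    bf = BloomFilter()
    for cid in ids:
        bf.add(cid)
    file_filters[name] = bf

print(files_to_scan(file_filters, "GAME_292030"))  # part-1, plus any false positives
```

This also shows why BloomFilter suits content_id rather than low-cardinality columns such as platform: with only three distinct values, nearly every file would contain each value, so almost nothing could be skipped.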
💡 Tip: The examples below use cz-cli (the Singdata Lakehouse command-line tool). If cz-cli is not installed, refer to the cz-cli setup guide. Alternatively, you can execute SQL in Lakehouse Studio → Development → SQL Editor and configure scheduled tasks on the Studio → Tasks page.
Configure Kafka PIPE
User behavior events are collected by the client SDK and sent to a Kafka topic. PIPE continuously consumes and writes to the Bronze table.
First create the raw receiver table (PIPE writes JSON strings), then create the PIPE:
CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_kafka_raw_events (
value STRING
);
CREATE PIPE IF NOT EXISTS best_practice_content_rec.pipe_user_events
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = '30'
AS
COPY INTO best_practice_content_rec.bronze_kafka_raw_events
FROM (
SELECT CAST(value AS STRING) AS value
FROM READ_KAFKA(
'<kafka-broker>:9092', -- replace with actual broker address
'user_behavior_events', -- topic name
'',
'cz_content_rec_consumer', -- consumer group ID
'','','','',
'raw', 'raw',
0,
map()
)
);
💡 Tip: In the PIPE DDL, positional parameters 5 to 8 of READ_KAFKA (start/end offsets and timestamps) must be left empty; the PIPE manages them automatically at runtime.
Option 1: Write via actual Kafka (recommended)
In production, the client SDK serializes each behavior event as JSON and sends it to the Kafka topic, and the PIPE automatically consumes the messages and writes them to bronze_kafka_raw_events.
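A minimal sketch of constructing such messages in Python, assuming the kafka-python package. Keys mirror the bronze_interaction_events columns, and event_time is sent as a 'YYYY-MM-DD HH:MM:SS' string; that format is an assumption, since the step that parses bronze_kafka_raw_events into bronze_interaction_events is not shown in this guide. The broker placeholder and topic name come from the PIPE DDL above.

```python
import json
from datetime import datetime


def build_event(event_id, user_id, content_id, event_type, session_id,
                duration_sec, event_time, platform):
    """Build one behavior event whose keys mirror the bronze_interaction_events columns."""
    if event_type not in ("watch", "like", "share"):
        raise ValueError(f"unexpected event_type: {event_type}")
    return {
        "event_id": event_id,
        "user_id": user_id,
        "content_id": content_id,
        "event_type": event_type,
        "session_id": session_id,
        "duration_sec": duration_sec if event_type == "watch" else 0,  # 0 for like/share
        "event_time": event_time.strftime("%Y-%m-%d %H:%M:%S"),  # assumed format
        "platform": platform,
    }


def serialize(event):
    """UTF-8 JSON bytes: the message value that lands in bronze_kafka_raw_events.value."""
    return json.dumps(event, ensure_ascii=False).encode("utf-8")


def send_events(events, bootstrap_servers="<kafka-broker>:9092", topic="user_behavior_events"):
    """Publish events with kafka-python (pip install kafka-python); needs a reachable broker."""
    from kafka import KafkaProducer  # lazy import so the helpers above work without Kafka

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers, value_serializer=serialize)
    for event in events:
        producer.send(topic, event)
    producer.flush()  # block until buffered messages are delivered
    producer.close()


event = build_event("EVT001", "USR001", "GAME_730", "watch", "SES001",
                    3600, datetime(2026, 5, 1, 10, 0, 0), "pc")
payload = serialize(event)
print(payload.decode("utf-8"))
```

Calling send_events([event]) with a real broker address publishes the message; the PIPE then stores the decoded JSON text in the value column.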
Option 2: INSERT simulation (without a Kafka environment)
If Kafka is not yet configured, use the following approaches to write to bronze_interaction_events, simulating the effect of parsed Kafka messages. The downstream Silver layer logic can still be verified.
Load from local CSV (recommended)
-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/interaction_events.csv' TO USER VOLUME FILE 'interaction_events.csv';
-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_content_rec.bronze_interaction_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('interaction_events.csv');
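If you do not have an interaction_events.csv at hand, the sketch below generates a synthetic one with Python's standard csv module. The header follows the column list of the INSERT statement below, with ingest_time omitted on the assumption that its DEFAULT fills it during COPY INTO (worth verifying). The user and session counts, the event mix, and the 5 to 7200 second watch range are arbitrary illustrative choices; the range deliberately includes watches under 60 seconds so the Silver noise filter has something to remove.

```python
import csv
import random
from datetime import datetime, timedelta

COLUMNS = ["event_id", "user_id", "content_id", "event_type",
           "session_id", "duration_sec", "event_time", "platform"]
CONTENT_IDS = ["GAME_730", "GAME_570", "GAME_292030", "GAME_1091500", "GAME_271590"]


def generate_events(n, seed=42, start=datetime(2026, 5, 1)):
    """Deterministic synthetic events spread over two days."""
    rng = random.Random(seed)
    rows = []
    for i in range(1, n + 1):
        event_type = rng.choices(["watch", "like", "share"], weights=[6, 3, 1])[0]
        # Some watches fall below 60 s; like/share carry no duration.
        duration = rng.randint(5, 7200) if event_type == "watch" else 0
        ts = start + timedelta(minutes=rng.randint(0, 2 * 24 * 60 - 1))
        rows.append({
            "event_id": f"EVT{i:06d}",
            "user_id": f"USR{rng.randint(1, 50):03d}",
            "content_id": rng.choice(CONTENT_IDS),
            "event_type": event_type,
            "session_id": f"SES{rng.randint(1, 200):04d}",
            "duration_sec": duration,
            "event_time": ts.strftime("%Y-%m-%d %H:%M:%S"),
            "platform": rng.choice(["pc", "mobile", "console"]),
        })
    return rows


def write_csv(rows, path="interaction_events.csv"):
    """Write rows with a header line, matching 'header'='true' in the COPY INTO options."""
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=COLUMNS)
        writer.writeheader()
        writer.writerows(rows)
```

Calling write_csv(generate_events(1000)) writes the file to the current directory, ready for the PUT step above.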
You can also insert small batches of test data inline (no CSV file needed):
INSERT INTO best_practice_content_rec.bronze_interaction_events
(event_id, user_id, content_id, event_type, session_id, duration_sec, event_time, platform)
VALUES
('EVT001','USR001','GAME_730', 'watch', 'SES001', 3600, CAST('2026-05-01 10:00:00' AS TIMESTAMP), 'pc'),
('EVT002','USR001','GAME_570', 'like', 'SES001', 0, CAST('2026-05-01 10:05:00' AS TIMESTAMP), 'pc'),
('EVT003','USR002','GAME_730', 'share', 'SES002', 0, CAST('2026-05-01 11:00:00' AS TIMESTAMP), 'mobile'),
('EVT004','USR002','GAME_292030', 'watch', 'SES002', 7200, CAST('2026-05-01 11:30:00' AS TIMESTAMP), 'mobile'),
('EVT005','USR003','GAME_1091500','watch', 'SES003', 1800, CAST('2026-05-01 12:00:00' AS TIMESTAMP), 'pc'),
('EVT006','USR003','GAME_730', 'like', 'SES003', 0, CAST('2026-05-01 12:10:00' AS TIMESTAMP), 'pc'),
('EVT007','USR004','GAME_570', 'watch', 'SES004', 5400, CAST('2026-05-01 13:00:00' AS TIMESTAMP), 'console'),
('EVT008','USR004','GAME_292030', 'like', 'SES004', 0, CAST('2026-05-01 13:20:00' AS TIMESTAMP), 'console'),
('EVT009','USR005','GAME_1091500','share', 'SES005', 0, CAST('2026-05-01 14:00:00' AS TIMESTAMP), 'mobile'),
('EVT010','USR005','GAME_730', 'watch', 'SES005', 2700, CAST('2026-05-01 14:15:00' AS TIMESTAMP), 'mobile'),
('EVT011','USR001','GAME_1091500','watch', 'SES006', 4320, CAST('2026-05-02 09:00:00' AS TIMESTAMP), 'pc'),
('EVT012','USR002','GAME_570', 'share', 'SES007', 0, CAST('2026-05-02 10:00:00' AS TIMESTAMP), 'pc'),
('EVT013','USR003','GAME_292030', 'watch', 'SES008', 6600, CAST('2026-05-02 11:00:00' AS TIMESTAMP), 'console'),
('EVT014','USR004','GAME_730', 'share', 'SES009', 0, CAST('2026-05-02 12:00:00' AS TIMESTAMP), 'mobile'),
('EVT015','USR005','GAME_570', 'like', 'SES010', 0, CAST('2026-05-02 13:00:00' AS TIMESTAMP), 'pc');
Verify Bronze layer row count:
SELECT COUNT(*) AS bronze_event_count
FROM best_practice_content_rec.bronze_interaction_events;
Content descriptions may be in various languages. Create an Inverted Index on the description column with the chinese analyzer:
USE SCHEMA best_practice_content_rec;
CREATE INVERTED INDEX IF NOT EXISTS idx_inv_description
ON TABLE bronze_content_metadata (description)
WITH PROPERTIES ('analyzer' = 'chinese');
Build the index on existing data (CREATE INDEX only takes effect on newly written data; existing data must be built manually):
USE SCHEMA best_practice_content_rec;
BUILD INDEX idx_inv_description ON bronze_content_metadata;
⚠️ Note: BUILD INDEX does not accept the ON TABLE keyword. The correct syntax is BUILD INDEX <index_name> ON <table_name>, and it must be executed in the same schema context as the table.
Configure OSS PIPE (Bulk Import Content Metadata)
The content operations team periodically uploads new game information in CSV format to OSS. The OSS PIPE uses LIST_PURGE mode to automatically scan and import:
-- First create a Storage Connection (OSS access credentials)
CREATE STORAGE CONNECTION IF NOT EXISTS best_practice_content_rec.conn_content_oss
TYPE = OSS
ACCESS_ID = '<your-access-id>'
ACCESS_KEY = '<your-access-key>'
ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com';
-- Create External Volume
CREATE EXTERNAL VOLUME IF NOT EXISTS best_practice_content_rec.vol_content_metadata
TYPE = OSS
BUCKET = '<your-bucket>'
PATH = 'content-rec/metadata/'
CONNECTION = conn_content_oss;
-- Create OSS PIPE (LIST_PURGE: delete source files after import to prevent re-import)
CREATE PIPE IF NOT EXISTS best_practice_content_rec.pipe_content_metadata
VIRTUAL_CLUSTER = 'DEFAULT'
INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO best_practice_content_rec.bronze_content_metadata
FROM VOLUME vol_content_metadata
USING csv
OPTIONS('header'='true', 'sep'=',', 'quote'='"');
💡 Tip: LIST_PURGE mode deletes source files from the Volume after a successful import, which suits ETL scenarios. To keep the original files for reruns, use LIST mode (no deletion).
Insert Sample Data
Using five classic games from Steam Dataset 2025 as examples:
INSERT INTO best_practice_content_rec.bronze_content_metadata
(content_id, title, description, tags, category, release_date,
language, developer, price, positive_pct)
VALUES
('GAME_730', 'Counter-Strike 2',
'Multiplayer competitive FPS game where players join terrorist or counter-terrorist teams',
'FPS,Shooter,Multiplayer,Competitive,Action',
'Action', CAST('2023-09-27' AS DATE), 'en', 'Valve', 0.0, 0.78),
('GAME_570', 'Dota 2',
'Team battle strategy game, two teams of 5 compete for victory',
'MOBA,Strategy,Multiplayer,Free to Play',
'Strategy', CAST('2013-07-09' AS DATE), 'en', 'Valve', 0.0, 0.84),
('GAME_292030', 'The Witcher 3: Wild Hunt',
'Open world role-playing game with rich storyline',
'RPG,Open World,Adventure,Story Rich,Fantasy',
'RPG', CAST('2015-05-18' AS DATE), 'en', 'CD Projekt Red', 39.99, 0.97),
('GAME_1091500','Cyberpunk 2077',
'Open world RPG set in a futuristic cyberpunk world',
'RPG,Open World,Cyberpunk,Action,Sci-fi',
'RPG', CAST('2020-12-10' AS DATE), 'en', 'CD Projekt Red', 59.99, 0.79),
('GAME_271590', 'Grand Theft Auto V',
'Open world action adventure game',
'Action,Open World,Multiplayer,Crime',
'Action', CAST('2015-04-14' AS DATE), 'en', 'Rockstar Games', 29.99, 0.88);
Validate full-text search:
SELECT content_id, title, description
FROM best_practice_content_rec.bronze_content_metadata
WHERE MATCH_ALL(description, 'Open world');
content_id | title | description
-------------+--------------------------+----------------------
GAME_292030 | The Witcher 3: Wild Hunt | Open world role-playing game with rich storyline
GAME_1091500 | Cyberpunk 2077 | Open world RPG set in a futuristic cyberpunk world
GAME_271590 | Grand Theft Auto V | Open world action adventure game
MATCH_ALL requires every query term to appear in the description, so exactly the three open-world games are recalled.
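The idea behind the inverted index and MATCH_ALL can be sketched in a few lines of Python: map each term to the set of content_ids whose description contains it, then intersect the sets for all query terms. The lowercase alphanumeric tokenizer is a stand-in assumption and does not reproduce the chinese analyzer; match_any is included only for contrast and is not claimed to mirror any Lakehouse function.

```python
import re
from collections import defaultdict

DESCRIPTIONS = {
    "GAME_730": "Multiplayer competitive FPS game where players join terrorist or counter-terrorist teams",
    "GAME_570": "Team battle strategy game, two teams of 5 compete for victory",
    "GAME_292030": "Open world role-playing game with rich storyline",
    "GAME_1091500": "Open world RPG set in a futuristic cyberpunk world",
    "GAME_271590": "Open world action adventure game",
}


def tokenize(text):
    """Lowercase and split on non-alphanumeric characters (stand-in for a real analyzer)."""
    return re.findall(r"[a-z0-9]+", text.lower())


def build_inverted_index(docs):
    """term -> set of document ids containing that term."""
    index = defaultdict(set)
    for doc_id, text in docs.items():
        for term in tokenize(text):
            index[term].add(doc_id)
    return index


def match_all(index, query):
    """Documents containing every query term (intersection of posting sets)."""
    postings = [index.get(term, set()) for term in tokenize(query)]
    return set.intersection(*postings) if postings else set()


def match_any(index, query):
    """Documents containing at least one query term (union), for comparison."""
    return set().union(*(index.get(term, set()) for term in tokenize(query)))


index = build_inverted_index(DESCRIPTIONS)
print(sorted(match_all(index, "Open world")))
```

Because lookups touch only the posting sets of the query terms, the cost depends on how many documents contain those terms rather than on scanning every description.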
Bronze Layer: Content Embedding Table (External Function + Vector Type)
Create Table
CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_content_embedding (
content_id STRING,
title STRING,
description STRING,
tags STRING,
embedding VECTOR(128) -- 128-dimensional Embedding generated by External Function
);
External Function for Embedding Generation
In production, deploy an External Function to call a vectorization model (e.g., DashScope text-embedding-v3) to automatically generate Embeddings:
-- Assuming External Function text2vec has been deployed
-- Usage example (executed in operations scripts or ZettaPark Tasks)
INSERT INTO best_practice_content_rec.bronze_content_embedding
(content_id, title, description, tags, embedding)
SELECT
content_id,
title,
description,
tags,
CAST(best_practice_content_rec.text2vec(description) AS VECTOR(128))
FROM best_practice_content_rec.bronze_content_metadata;
In test environments where an External Function has not yet been deployed, you can use SQL to generate deterministic 128-dimensional test vectors. The vector_seed below is only for constructing executable sample data and does not represent real model Embeddings:
INSERT INTO best_practice_content_rec.bronze_content_embedding
(content_id, title, description, tags, embedding)
SELECT
content_id,
title,
description,
tags,
CAST(
CONCAT(
'[',
ARRAY_JOIN(
TRANSFORM(
SEQUENCE(1, 128),
x -> CAST(ROUND(((x * 37 + vector_seed) % 1000) / 1000.0, 4) AS STRING)
),
','
),
']'
) AS VECTOR(128)
) AS embedding
FROM (
SELECT 'GAME_730' AS content_id, 'Counter-Strike 2' AS title,
'Multiplayer competitive FPS game where players join terrorist or counter-terrorist teams' AS description,
'FPS,Shooter,Multiplayer,Competitive,Action' AS tags,
300 AS vector_seed
UNION ALL
SELECT 'GAME_570', 'Dota 2',
'Team battle strategy game, two teams of 5 compete for victory',
'MOBA,Strategy,Multiplayer,Free to Play',
420
UNION ALL
SELECT 'GAME_292030', 'The Witcher 3: Wild Hunt',
'Open world role-playing game with rich storyline',
'RPG,Open World,Adventure,Story Rich,Fantasy',
180
UNION ALL
SELECT 'GAME_1091500', 'Cyberpunk 2077',
'Open world RPG set in a futuristic cyberpunk world',
'RPG,Open World,Cyberpunk,Action,Sci-fi',
190
UNION ALL
SELECT 'GAME_271590', 'Grand Theft Auto V',
'Open world action adventure game',
'Action,Open World,Multiplayer,Crime',
260
) s;
Verify the vector dimensions of the inserted data:
SELECT content_id, size(embedding) AS vector_dim
FROM best_practice_content_rec.bronze_content_embedding
ORDER BY content_id;
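To inspect the test vectors outside SQL, the sketch below mirrors the generator expression in Python and formats the result as a '[v1,...,v128]' string. With seed 300 (GAME_730) it reproduces the 128 values used in the single-record INSERT that follows, so it is a convenient way to produce such literals instead of typing them by hand. The helper names are hypothetical.

```python
def seed_vector(seed: int, dim: int = 128) -> list[float]:
    """Python mirror of ROUND(((x * 37 + vector_seed) % 1000) / 1000.0, 4) for x = 1..dim."""
    return [round(((x * 37 + seed) % 1000) / 1000.0, 4) for x in range(1, dim + 1)]


def to_vector_literal(values: list[float]) -> str:
    """Format values as the '[v1,v2,...]' string passed to CAST(... AS VECTOR(N))."""
    return "[" + ",".join(f"{v:.4f}" for v in values) + "]"


literal = to_vector_literal(seed_vector(300))  # GAME_730
print(literal[:40], "...")
```

Counting the comma-separated values in a literal before running the INSERT is a cheap guard against the dimension mismatch described in the tip below, which would otherwise surface only as a NULL vector.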
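To manually construct a single test record, you can also CAST a complete 128-dimensional array string directly to VECTOR(128). The following example can be executed directly; if you already ran the INSERT above, delete the existing GAME_730 row first, otherwise it will appear twice in the table: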
INSERT INTO best_practice_content_rec.bronze_content_embedding
(content_id, title, description, tags, embedding)
VALUES
('GAME_730', 'Counter-Strike 2', 'Multiplayer competitive FPS game where players join terrorist or counter-terrorist teams',
'FPS,Shooter,Multiplayer,Competitive,Action',
CAST('[0.3370,0.3740,0.4110,0.4480,0.4850,0.5220,0.5590,0.5960,0.6330,0.6700,0.7070,0.7440,0.7810,0.8180,0.8550,0.8920,0.9290,0.9660,0.0030,0.0400,0.0770,0.1140,0.1510,0.1880,0.2250,0.2620,0.2990,0.3360,0.3730,0.4100,0.4470,0.4840,0.5210,0.5580,0.5950,0.6320,0.6690,0.7060,0.7430,0.7800,0.8170,0.8540,0.8910,0.9280,0.9650,0.0020,0.0390,0.0760,0.1130,0.1500,0.1870,0.2240,0.2610,0.2980,0.3350,0.3720,0.4090,0.4460,0.4830,0.5200,0.5570,0.5940,0.6310,0.6680,0.7050,0.7420,0.7790,0.8160,0.8530,0.8900,0.9270,0.9640,0.0010,0.0380,0.0750,0.1120,0.1490,0.1860,0.2230,0.2600,0.2970,0.3340,0.3710,0.4080,0.4450,0.4820,0.5190,0.5560,0.5930,0.6300,0.6670,0.7040,0.7410,0.7780,0.8150,0.8520,0.8890,0.9260,0.9630,0.0000,0.0370,0.0740,0.1110,0.1480,0.1850,0.2220,0.2590,0.2960,0.3330,0.3700,0.4070,0.4440,0.4810,0.5180,0.5550,0.5920,0.6290,0.6660,0.7030,0.7400,0.7770,0.8140,0.8510,0.8880,0.9250,0.9620,0.9990,0.0360]' AS VECTOR(128)));
💡 Tip: The TO_VECTOR function is not available in the current version. To generate VECTOR column data, use the CAST('<array_string>' AS VECTOR(N)) syntax. N must match the dimension declared at table creation; a mismatched dimension results in a NULL vector, and subsequent COSINE_DISTANCE calls also return NULL.
Create Vector Index (IVFPQ)
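IVFPQ (Inverted File + Product Quantization) is a commonly used ANN index type in recommendation systems. The IVF part partitions vectors into nlist clusters so a query scans only the clusters nearest to it, and the PQ part compresses each vector into short codes so distances are computed on compact data. Together they cut search time and memory at some cost in recall: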
USE SCHEMA best_practice_content_rec;
CREATE VECTOR INDEX IF NOT EXISTS idx_vec_content_embedding
ON TABLE bronze_content_embedding (embedding)
PROPERTIES(
'index_type' = 'IVFPQ',
'distance_function' = 'cosine',
'nlist' = '100', -- Number of cluster centers; increase for larger datasets
'M' = '32', -- Number of sub-quantizers; affects precision and speed
'm' = '4' -- Number of compressed bytes
);
Build the index on existing data:
USE SCHEMA best_practice_content_rec;
BUILD INDEX idx_vec_content_embedding ON bronze_content_embedding;
⚠️ Note: nlist is recommended to be between sqrt(N) and 4*sqrt(N), where N is the total number of vectors. For the roughly 240,000 games in Steam Dataset 2025 that is about 490 to 1,960, so treat nlist=100 and M=32 above only as starting values and tune them against recall and latency metrics.
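The sketch below illustrates only the IVF half of IVFPQ in pure Python: k-means picks nlist centroids, every vector is filed under its nearest centroid, and a query scans just the nprobe closest lists. PQ compression is omitted, squared Euclidean distance stands in for cosine, and nprobe is a generic IVF search parameter that does not appear in the DDL above. nlist_range applies the sqrt(N) to 4*sqrt(N) rule from the note.

```python
import math
import random


def nlist_range(n_vectors: int) -> tuple[int, int]:
    """Floors of sqrt(N) and 4*sqrt(N) (computed exactly as isqrt(N) and isqrt(16N))."""
    return math.isqrt(n_vectors), math.isqrt(16 * n_vectors)


def sq_l2(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def nearest(vec, centroids, k=1):
    """Indices of the k centroids closest to vec."""
    return sorted(range(len(centroids)), key=lambda c: sq_l2(vec, centroids[c]))[:k]


def train_centroids(vectors, nlist, iters=10, seed=0):
    """Plain k-means (Lloyd's algorithm) for the coarse quantizer."""
    rng = random.Random(seed)
    centroids = [list(v) for v in rng.sample(vectors, nlist)]
    for _ in range(iters):
        buckets = [[] for _ in range(nlist)]
        for v in vectors:
            buckets[nearest(v, centroids)[0]].append(v)
        for c, members in enumerate(buckets):
            if members:  # keep the old centroid if a cluster empties out
                centroids[c] = [sum(col) / len(members) for col in zip(*members)]
    return centroids


def build_ivf(vectors, centroids):
    """Inverted lists: centroid index -> ids of the vectors assigned to it."""
    lists = {c: [] for c in range(len(centroids))}
    for vid, v in enumerate(vectors):
        lists[nearest(v, centroids)[0]].append(vid)
    return lists


def ivf_search(query, vectors, centroids, lists, top_k=3, nprobe=1):
    """Scan only the nprobe closest lists; return (top ids, number of vectors scanned)."""
    candidates = [vid for c in nearest(query, centroids, nprobe) for vid in lists[c]]
    ranked = sorted(candidates, key=lambda vid: sq_l2(query, vectors[vid]))
    return ranked[:top_k], len(candidates)


rng = random.Random(7)
vectors = [[rng.random() for _ in range(8)] for _ in range(200)]
centroids = train_centroids(vectors, nlist=8)
lists = build_ivf(vectors, centroids)
query = [rng.random() for _ in range(8)]
approx_ids, scanned = ivf_search(query, vectors, centroids, lists, nprobe=2)
print(nlist_range(240_000), approx_ids, f"scanned {scanned} of {len(vectors)}")
```

With nprobe equal to nlist the search degenerates to an exact scan; a smaller nprobe trades some recall for fewer distance computations, which is the same trade-off nlist tuning controls on the index side.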
Vector Similarity Search
Using The Witcher 3's Embedding as the query vector to find the most similar content:
SELECT
e.content_id,
e.title,
ROUND(COSINE_DISTANCE(e.embedding, q.query_embedding), 6) AS cos_dist
FROM best_practice_content_rec.bronze_content_embedding e
CROSS JOIN (
SELECT embedding AS query_embedding
FROM best_practice_content_rec.bronze_content_embedding
WHERE content_id = 'GAME_292030'
) q
ORDER BY cos_dist ASC
LIMIT 4;
content_id | title | cos_dist
-------------+--------------------------+---------------------
GAME_292030 | The Witcher 3: Wild Hunt | 0
GAME_1091500 | Cyberpunk 2077 | 0.045093998312950134
GAME_271590 | Grand Theft Auto V | 0.09233599901199341
GAME_730 | Counter-Strike 2 | 0.16785000264644623
A COSINE_DISTANCE of 0 means the vectors point in the same direction (here, the query vector itself). The test vectors are generated from fixed seeds only to verify that VECTOR writes, index building, and similarity SQL execute correctly; the distances carry business meaning only when the vectors come from a real embedding model.
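To check the ranking logic outside Lakehouse, the sketch below recomputes cosine distance as 1 minus cosine similarity over the same seed vectors (seeds copied from the test-data INSERT above) and sorts ascending, as the query does. It brute-forces every candidate, like the CROSS JOIN query, rather than modeling the IVFPQ index. Lakehouse may store vectors in single precision, so the printed digits can differ slightly from the output above.

```python
import math

SEEDS = {  # vector_seed values from the test-data INSERT
    "GAME_730": 300, "GAME_570": 420, "GAME_292030": 180,
    "GAME_1091500": 190, "GAME_271590": 260,
}


def seed_vector(seed, dim=128):
    return [round(((x * 37 + seed) % 1000) / 1000.0, 4) for x in range(1, dim + 1)]


def cosine_distance(a, b):
    """1 - cosine similarity: 0 for the same direction, up to 2 for opposite directions."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def top_k(query_id, k=4):
    """Rank all items by distance to the query item, smallest first (ORDER BY ... ASC LIMIT k)."""
    query = seed_vector(SEEDS[query_id])
    scored = [(cid, cosine_distance(seed_vector(s), query)) for cid, s in SEEDS.items()]
    return sorted(scored, key=lambda item: item[1])[:k]


for cid, dist in top_k("GAME_292030"):
    print(f"{cid:<14}{dist:.6f}")
```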
Silver Layer Dynamic Table: Denoising, Cleansing, and Interaction Sequences
The Silver layer performs two tasks on top of Bronze behavior events:
LEFT JOIN bronze_content_metadata to enrich each event with content title, category, developer, and other dimensions
Assign each event an interaction weight by signal strength and flag invalid interactions (is_valid): watch events shorter than 60 seconds are treated as invalid browsing noise
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.silver_user_content_interactions
AS
SELECT
e.event_id,
e.user_id,
e.content_id,
e.event_type,
e.session_id,
e.duration_sec,
e.event_time,
e.platform,
m.title AS content_title,
m.tags AS content_tags,
m.category AS content_category,
m.developer AS developer,
-- Interaction weight by signal strength, from high to low
CASE
WHEN e.event_type = 'share' THEN 3.0
WHEN e.event_type = 'like' THEN 2.0
WHEN e.event_type = 'watch' AND e.duration_sec >= 60 THEN 1.0
ELSE 0.0
END AS interaction_weight,
-- Valid interaction flag (watch duration < 60s treated as noise)
CASE
WHEN e.event_type IN ('like','share') THEN 1
WHEN e.event_type = 'watch' AND e.duration_sec >= 60 THEN 1
ELSE 0
END AS is_valid
FROM best_practice_content_rec.bronze_interaction_events e
LEFT JOIN best_practice_content_rec.bronze_content_metadata m
ON e.content_id = m.content_id;
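Watch events under 60 seconds are most likely accidental clicks or channel switching, so they get interaction_weight 0.0 and is_valid = 0, and the Gold layer removes them as noise.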
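A plain-Python mirror of the two CASE expressions can be handy for unit-testing the weighting rules before changing the DT. It is a hand-written sketch, not generated from the DDL; representing a missing duration as None and treating it like SQL NULL (so it never counts as a valid watch) is an assumption.

```python
def interaction_weight(event_type, duration_sec):
    """Mirror of the interaction_weight CASE: share 3.0 > like 2.0 > watch of 60 s or more 1.0."""
    if event_type == "share":
        return 3.0
    if event_type == "like":
        return 2.0
    if event_type == "watch" and duration_sec is not None and duration_sec >= 60:
        return 1.0
    return 0.0  # short or NULL-duration watches and unknown event types


def is_valid(event_type, duration_sec):
    """Mirror of the is_valid CASE: watches under 60 s are noise."""
    if event_type in ("like", "share"):
        return 1
    if event_type == "watch" and duration_sec is not None and duration_sec >= 60:
        return 1
    return 0


for event in [("watch", 30), ("watch", 60), ("like", 0), ("share", 0), ("click", 5)]:
    print(event, interaction_weight(*event), is_valid(*event))
```

The two rules agree by construction: an event is valid exactly when its weight is positive, which is why the Gold layer can filter on is_valid and still sum interaction_weight safely.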
Manually trigger the first refresh:
REFRESH DYNAMIC TABLE best_practice_content_rec.silver_user_content_interactions;
SELECT COUNT(*) AS silver_count
FROM best_practice_content_rec.silver_user_content_interactions;
silver_count
------------
15
View sample user-content interaction sequences:
SELECT user_id, content_id, content_title, event_type, interaction_weight, is_valid
FROM best_practice_content_rec.silver_user_content_interactions
ORDER BY user_id, event_time
LIMIT 10;
Gold Layer Dynamic Table: Content Popularity
The Gold layer aggregates content popularity metrics from the Silver layer at content_id + date granularity for use as content-side features in recommendation models.
This DT is partitioned by stat_date and uses static partition mode:
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.gold_content_popularity
PARTITIONED BY (stat_date)
TBLPROPERTIES ('static_partitions' = 'true')
AS
SELECT
content_id,
content_title,
content_category,
developer,
CAST(DATE_TRUNC('day', event_time) AS DATE) AS stat_date,
COUNT(*) AS total_interactions,
SUM(CASE WHEN event_type = 'watch' AND duration_sec >= 60 THEN 1 ELSE 0 END) AS watch_count,
SUM(CASE WHEN event_type = 'like' THEN 1 ELSE 0 END) AS like_count,
SUM(CASE WHEN event_type = 'share' THEN 1 ELSE 0 END) AS share_count,
SUM(interaction_weight) AS weighted_score,
COUNT(DISTINCT user_id) AS unique_users,
ROUND(AVG(CASE WHEN event_type = 'watch' THEN duration_sec ELSE NULL END), 2) AS avg_watch_sec
FROM best_practice_content_rec.silver_user_content_interactions
WHERE is_valid = 1
GROUP BY
content_id, content_title, content_category, developer,
CAST(DATE_TRUNC('day', event_time) AS DATE);
⚠️ Note: A Dynamic Table with partitions must explicitly declare TBLPROPERTIES ('static_partitions' = 'true') to use static partitioning mode. Without this declaration, the system defaults to dynamic partition inference, which may cause abnormal partition data overwrites during incremental refresh.
Manually trigger the first refresh and view results:
REFRESH DYNAMIC TABLE best_practice_content_rec.gold_content_popularity;
SELECT content_id, content_title, content_category, stat_date,
total_interactions, watch_count, like_count, share_count,
weighted_score, unique_users
FROM best_practice_content_rec.gold_content_popularity
ORDER BY weighted_score DESC;
Result interpretation: Counter-Strike 2 (GAME_730) had the highest weighted score on 2026-05-01 (7.0) because 4 different users interacted with it that day (watch×2, like×1, share×1), with the high share weight boosting the total. Cyberpunk 2077's weighted score on May 1 (4.0) surpassed Dota 2's on the same day (3.0), mainly due to one share event (weight 3.0).
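The interpretation above can be cross-checked by redoing the Gold aggregation in plain Python over the 15 sample events (event_id, session_id, and platform are dropped because the aggregation does not use them). This sketch mirrors only the WHERE and GROUP BY logic of gold_content_popularity; it says nothing about how the DT refreshes.

```python
from collections import defaultdict

# The 15 sample events: (user_id, content_id, event_type, duration_sec, stat_date)
EVENTS = [
    ("USR001", "GAME_730", "watch", 3600, "2026-05-01"),
    ("USR001", "GAME_570", "like", 0, "2026-05-01"),
    ("USR002", "GAME_730", "share", 0, "2026-05-01"),
    ("USR002", "GAME_292030", "watch", 7200, "2026-05-01"),
    ("USR003", "GAME_1091500", "watch", 1800, "2026-05-01"),
    ("USR003", "GAME_730", "like", 0, "2026-05-01"),
    ("USR004", "GAME_570", "watch", 5400, "2026-05-01"),
    ("USR004", "GAME_292030", "like", 0, "2026-05-01"),
    ("USR005", "GAME_1091500", "share", 0, "2026-05-01"),
    ("USR005", "GAME_730", "watch", 2700, "2026-05-01"),
    ("USR001", "GAME_1091500", "watch", 4320, "2026-05-02"),
    ("USR002", "GAME_570", "share", 0, "2026-05-02"),
    ("USR003", "GAME_292030", "watch", 6600, "2026-05-02"),
    ("USR004", "GAME_730", "share", 0, "2026-05-02"),
    ("USR005", "GAME_570", "like", 0, "2026-05-02"),
]
WEIGHTS = {"share": 3.0, "like": 2.0, "watch": 1.0}


def is_valid(event_type, duration_sec):
    return event_type in ("like", "share") or (event_type == "watch" and duration_sec >= 60)


def content_popularity(events):
    """Group valid events by (content_id, stat_date), like the Gold DT's GROUP BY."""
    groups = defaultdict(lambda: {"watch": 0, "like": 0, "share": 0, "score": 0.0, "users": set()})
    for user_id, content_id, event_type, duration_sec, stat_date in events:
        if not is_valid(event_type, duration_sec):  # WHERE is_valid = 1
            continue
        g = groups[(content_id, stat_date)]
        g[event_type] += 1
        g["score"] += WEIGHTS[event_type]
        g["users"].add(user_id)
    return {
        key: {"total_interactions": g["watch"] + g["like"] + g["share"],
              "watch_count": g["watch"], "like_count": g["like"], "share_count": g["share"],
              "weighted_score": g["score"], "unique_users": len(g["users"])}
        for key, g in groups.items()
    }


popularity = content_popularity(EVENTS)
for (content_id, stat_date), m in sorted(popularity.items(), key=lambda kv: -kv[1]["weighted_score"]):
    print(content_id, stat_date, m["weighted_score"], m["unique_users"])
```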
Gold Layer Dynamic Table: User Interest Vectors
User interest vectors aggregate each user's weighted interaction scores across content categories, serving as user-side features for recommendation models:
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.gold_user_interest_profile
AS
SELECT
user_id,
CAST(DATE_TRUNC('day', MAX(event_time)) AS DATE) AS profile_date,
COUNT(DISTINCT content_id) AS content_count,
SUM(interaction_weight) AS total_weight,
COLLECT_LIST(content_id) AS interacted_content_ids,
-- Accumulate weights by content category to form category interest vectors
SUM(CASE WHEN content_category = 'Action' THEN interaction_weight ELSE 0 END) AS action_score,
SUM(CASE WHEN content_category = 'Strategy' THEN interaction_weight ELSE 0 END) AS strategy_score,
SUM(CASE WHEN content_category = 'RPG' THEN interaction_weight ELSE 0 END) AS rpg_score
FROM best_practice_content_rec.silver_user_content_interactions
WHERE is_valid = 1
GROUP BY user_id;
REFRESH DYNAMIC TABLE best_practice_content_rec.gold_user_interest_profile;
SELECT user_id, content_count, total_weight,
action_score, strategy_score, rpg_score,
interacted_content_ids
FROM best_practice_content_rec.gold_user_interest_profile
ORDER BY total_weight DESC;
Result interpretation: USR002 is balanced between action and strategy (action=3.0, strategy=3.0), indicating broad interests. USR005's rpg_score (3.0) is the highest of its three categories (strategy=2.0, action=1.0), suggesting an RPG preference, so the recommendation system can prioritize RPG content for this user. The interacted_content_ids field can be used directly to construct training samples for item-to-item collaborative filtering.
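As a sketch of how interacted_content_ids can feed item-to-item collaborative filtering, the code below counts, for each pair of games, how many users interacted with both. The lists are the interacted_content_ids values for the 15 sample events (COLLECT_LIST does not guarantee element order). Plain co-occurrence counting is one simple choice among many; it ignores interaction_weight, which could be added as a per-pair weight.

```python
from collections import Counter
from itertools import combinations

# interacted_content_ids per user for the 15 sample events.
INTERACTED = {
    "USR001": ["GAME_730", "GAME_570", "GAME_1091500"],
    "USR002": ["GAME_730", "GAME_292030", "GAME_570"],
    "USR003": ["GAME_1091500", "GAME_730", "GAME_292030"],
    "USR004": ["GAME_570", "GAME_292030", "GAME_730"],
    "USR005": ["GAME_1091500", "GAME_730", "GAME_570"],
}


def co_occurrence(interacted):
    """Count how many users interacted with each unordered pair of items."""
    counts = Counter()
    for items in interacted.values():
        for a, b in combinations(sorted(set(items)), 2):  # a < b, duplicates removed
            counts[(a, b)] += 1
    return counts


def similar_items(item, counts, k=3):
    """Items most often co-interacted with `item`; ties broken by content_id."""
    neighbors = []
    for (a, b), n in counts.items():
        if item == a:
            neighbors.append((b, n))
        elif item == b:
            neighbors.append((a, n))
    return sorted(neighbors, key=lambda pair: (-pair[1], pair[0]))[:k]


counts = co_occurrence(INTERACTED)
print(similar_items("GAME_730", counts))
```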
Dynamic Table Refresh Scheduling (Lakehouse Studio Tasks)
Do not write REFRESH INTERVAL in the DDL. Create refresh tasks in Lakehouse Studio to uniformly manage scheduling, alerts, and data quality checks.
Create a SQL-type task named refresh_gold_content_popularity under the best_practices/content_rec/ path in Studio. It runs daily at 02:00 and executes the REFRESH DYNAMIC TABLE statements for both Gold DTs (gold_content_popularity and gold_user_interest_profile).
💡 Tip: Monitoring alerts (e.g., refresh failure notifications) and data quality rules (e.g., an alert when the Gold layer row count falls below a threshold) can be attached to the Studio Task so they are managed together at one task node. This is the core benefit of not writing REFRESH INTERVAL.
The Silver layer DT should also have a separate refresh task (same path, task name refresh_silver_interactions), or merge Silver and Gold refreshes into a single DAG executed in dependency order: trigger Gold refresh after Silver completes.
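To illustrate what "executed in dependency order" means, the sketch below uses Python's standard graphlib to derive a run order for the three tasks named in this section. It is not how Studio schedules DAGs; the dependency map is a hypothetical model of the chain in which Silver refreshes first, then Gold, then the feature export.

```python
from graphlib import TopologicalSorter

# Hypothetical task graph: each task maps to the set of tasks it must wait for.
TASK_DEPENDENCIES = {
    "refresh_silver_interactions": set(),
    "refresh_gold_content_popularity": {"refresh_silver_interactions"},
    "feature_engineering_export": {"refresh_gold_content_popularity"},
}


def execution_order(dependencies):
    """One valid run order; static_order() raises graphlib.CycleError on circular dependencies."""
    return list(TopologicalSorter(dependencies).static_order())


print(execution_order(TASK_DEPENDENCIES))
```

Modeling the chain explicitly makes it obvious that a Gold refresh scheduled independently of Silver could read stale Silver data, which is the situation the single-DAG option avoids.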
ZettaPark Python Task: Feature Engineering and Sample Export
ZettaPark Tasks run Python scripts that directly access the Gold layer to generate the feature matrices needed for recommendation model training.
Create a VIRTUAL type task named feature_engineering_export under the best_practices/content_rec/ path in Studio.
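A minimal sketch of the feature-matrix step in plain Python. It assumes the Gold rows have already been fetched as lists of dicts; reading them through ZettaPark and writing the output to a Volume are not shown, because this guide does not cover those API calls. Field names follow gold_user_interest_profile, and the weighted scores are per-content totals over both sample days. CONTENT_FEATURES, user_category_affinity, and the label rule (1 if the user interacted with the content, else 0) are hypothetical choices for illustration.

```python
import csv
import io

# Two users from gold_user_interest_profile (sample-data values).
USER_PROFILES = [
    {"user_id": "USR002", "action_score": 3.0, "strategy_score": 3.0, "rpg_score": 1.0,
     "interacted_content_ids": ["GAME_730", "GAME_292030", "GAME_570"]},
    {"user_id": "USR005", "action_score": 1.0, "strategy_score": 2.0, "rpg_score": 3.0,
     "interacted_content_ids": ["GAME_1091500", "GAME_730", "GAME_570"]},
]
# Hypothetical content-side features: category plus weighted_score summed over stat_date.
CONTENT_FEATURES = {
    "GAME_730": {"category": "Action", "weighted_score": 10.0},
    "GAME_570": {"category": "Strategy", "weighted_score": 8.0},
    "GAME_292030": {"category": "RPG", "weighted_score": 4.0},
    "GAME_1091500": {"category": "RPG", "weighted_score": 5.0},
}
CATEGORY_COLUMN = {"Action": "action_score", "Strategy": "strategy_score", "RPG": "rpg_score"}


def build_feature_rows(profiles, contents):
    """One row per (user, content) pair: user affinity for the content's category, content score, label."""
    rows = []
    for p in profiles:
        seen = set(p["interacted_content_ids"])
        for cid, c in sorted(contents.items()):
            rows.append({
                "user_id": p["user_id"],
                "content_id": cid,
                "user_category_affinity": p[CATEGORY_COLUMN[c["category"]]],
                "content_weighted_score": c["weighted_score"],
                "label": 1 if cid in seen else 0,
            })
    return rows


def to_csv(rows):
    """Serialize rows to CSV text, e.g. before uploading to a Volume."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


print(to_csv(build_feature_rows(USER_PROFILES, CONTENT_FEATURES)))
```

Treating every non-interacted pair as a negative is the simplest labeling choice; real pipelines often sample negatives instead.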
💡 Tip: ZettaPark Tasks are created as VIRTUAL type in Studio. Once the connection is configured, Lakehouse tables can be operated on directly. Feature export frequency typically matches the Gold layer refresh task frequency. Configure it as a downstream dependency task in Studio to ensure feature data is always based on the latest Gold layer results.
Data Warehouse Object Summary
After completing the full build, the objects in the best_practice_content_rec schema:
Kafka Topic (user_behavior_events)
    | Kafka PIPE (pipe_user_events · BATCH_INTERVAL=30s)
    v
bronze_interaction_events            bronze_content_metadata
BloomFilter Index (content_id)       Inverted Index (description, chinese)
    |                                    |
    +-----------------+------------------+
                      v
      silver_user_content_interactions (Dynamic Table)
      interaction_weight · is_valid denoising
                      |
         +------------+-------------------+
         v                                v
gold_content_popularity (DT)      gold_user_interest_profile (DT)
PARTITIONED BY (stat_date)        category-level interest vectors
static_partitions=true
         |
         v
ZettaPark Python Task
feature_engineering_export
-> user x content feature matrix -> Volume

OSS Volume (vol_content_metadata)
    | OSS PIPE (pipe_content_metadata · LIST_PURGE)
    v
bronze_content_metadata
    | External Function (text2vec)
    v
bronze_content_embedding (VECTOR(128))
Vector Index IVFPQ (idx_vec_content_embedding)
-> COSINE_DISTANCE ANN recall

Studio Task scheduling path: best_practices/content_rec/
    refresh_gold_content_popularity (daily 02:00, refreshes two Gold DTs)
    feature_engineering_export (triggered after Gold refresh completes)
Notes
BloomFilter Index does not support BUILD INDEX: BloomFilter type indexes only take effect for data written after creation and do not support BUILD INDEX on existing data (unlike Vector / Inverted Indexes). To cover existing large volumes of historical data, the table must be rebuilt and data re-inserted.
Vector Index and Inverted Index must be explicitly built: CREATE INDEX only processes newly added data going forward. For existing data, manually execute BUILD INDEX <index_name> ON <table_name> (in the same schema context), otherwise vector retrieval and full-text search results will not include data from before index creation.
Partitioned Dynamic Tables must declare static_partitions: A DT with PARTITIONED BY must set TBLPROPERTIES ('static_partitions' = 'true'). Without this declaration, the system uses dynamic partition inference, which may cause old partition data to be overwritten or lost during incremental refresh.
Dynamic Tables do not write REFRESH INTERVAL: Use Lakehouse Studio Tasks (path best_practices/content_rec/) to uniformly manage refresh scheduling. Alert rules and data quality checks can be attached to the same task node. Do not write the REFRESH INTERVAL parameter in CREATE DYNAMIC TABLE DDLs.
COSINE_DISTANCE: lower means more similar: Unlike cosine similarity (-1 to 1), COSINE_DISTANCE (1 minus cosine similarity) returns values from 0 to 2, where 0 means the vectors point in the same direction. Use ORDER BY cos_dist ASC when querying, and take the top-K results as approximate nearest neighbors.
Kafka PIPE DDL validates broker connection: When executing CREATE PIPE, the system will attempt to connect to the Kafka broker to verify that the topic exists. In development scenarios without a Kafka environment, create the target table first, use INSERT to simulate data, verify downstream Silver/Gold logic, and create the PIPE once the Kafka environment is ready.
OSS PIPE LIST_PURGE mode is irreversible: Source files are deleted from the Volume after a successful import. If the business requires retaining files (e.g., for reruns), use LIST mode and add deduplication logic to the Bronze layer (e.g., keep only the most recent row per content_id by load time).
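A minimal sketch of the deduplication idea: for each content_id, keep only the row with the most recent load time. The load_time field is hypothetical (bronze_content_metadata as shown in this guide has no such column), so you would need to add one, for example filled at import time. In SQL the usual equivalent is a ROW_NUMBER() window partitioned by content_id and ordered by load_time descending; check the Lakehouse SQL reference before relying on it.

```python
from datetime import datetime


def latest_per_content(rows):
    """Keep the row with the greatest load_time for each content_id."""
    latest = {}
    for row in rows:
        current = latest.get(row["content_id"])
        if current is None or row["load_time"] > current["load_time"]:
            latest[row["content_id"]] = row
    return list(latest.values())


rows = [
    {"content_id": "GAME_730", "price": 0.0, "load_time": datetime(2026, 5, 1, 2, 0)},
    {"content_id": "GAME_730", "price": 0.0, "load_time": datetime(2026, 5, 2, 2, 0)},  # rerun of the same file
    {"content_id": "GAME_292030", "price": 39.99, "load_time": datetime(2026, 5, 1, 2, 0)},
    {"content_id": "GAME_292030", "price": 29.99, "load_time": datetime(2026, 5, 3, 2, 0)},  # hypothetical price update
]
deduped = latest_per_content(rows)
print(deduped)
```

Keeping the latest row, rather than taking DISTINCT over every column, also resolves genuine updates such as a price change between two uploads of the same game.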