Building a Content Platform Recommendation System Data Warehouse

Using the Steam game platform dataset (Steam Dataset 2025, approximately 240,000 games and 3.9 million reviews), this guide combines user interaction behaviors (views, likes, shares) with content metadata to build a three-layer data warehouse that supports feature engineering for recommendation models. It demonstrates the complete end-to-end pipeline, Kafka PIPE → OSS PIPE → External Function → Bronze → Silver → Gold → ZettaPark, and covers four key platform capabilities: Vector Index (IVFPQ vector recall), Inverted Index (full-text search), Dynamic Tables (daily Gold layer refresh), and BloomFilter Index.


Overview

The core challenge of a content recommendation system is unifying user behavior signals and content semantic information into the same feature space. Singdata Lakehouse addresses the key data warehouse construction problems through the following combination:

Problem | Solution
User behavior events arrive in real time at high frequency and need low-latency ingestion | Kafka PIPE ingests continuously; no hand-written consumers are needed
Content metadata (titles, tags, descriptions) is imported in bulk | OSS PIPE scans the volume and imports new files automatically as they land
Content text needs Embeddings for similarity recall | An External Function calls the embedding model; results are stored in a VECTOR column
Approximate nearest neighbor (ANN) retrieval over large-scale content vectors | IVFPQ Vector Index, giving sub-linear search time for vector recall
Full-text search over content titles and descriptions | Inverted Index with the Chinese analyzer, queried with the MATCH_ALL function
Incremental computation from Bronze → Silver → Gold | Dynamic Tables defined in declarative SQL and refreshed incrementally; Studio tasks run the refreshes in dependency order
Feature engineering scripts need access to Gold layer data | ZettaPark Python Task operates directly on Lakehouse tables

SQL Commands Used

Command / Function | Purpose | Notes
CREATE TABLE | Create the Bronze raw tables and the Embedding storage table | Supports VECTOR(N) columns
CREATE BLOOMFILTER INDEX | Create a BloomFilter index on the content_id column | Accelerates point lookups on high-cardinality columns
CREATE INVERTED INDEX | Create an Inverted Index on the description column | analyzer='chinese'
BUILD INDEX | Build an index over existing data | Required for both Vector and Inverted indexes
CREATE VECTOR INDEX | Create an IVFPQ vector index to accelerate ANN retrieval | PROPERTIES('index_type'='IVFPQ')
COSINE_DISTANCE | Calculate the cosine distance between two vectors | Used for similarity ranking; lower means more similar
MATCH_ALL | Full-text search; returns a boolean | Requires an Inverted Index that has been created and built
CREATE PIPE | Create a Kafka or OSS continuous ingestion pipeline | Bound to a Bronze layer target table
CREATE DYNAMIC TABLE | Create the Silver / Gold incremental computation tables | No REFRESH INTERVAL; scheduled via Lakehouse Studio Tasks
REFRESH DYNAMIC TABLE | Manually trigger one refresh | Used during the initial build or for debugging
PARTITIONED BY | Partition a Gold layer DT by date | Use with TBLPROPERTIES('static_partitions'='true')

Prerequisites

All examples in this guide run under the best_practice_content_rec schema.

CREATE SCHEMA IF NOT EXISTS best_practice_content_rec;


Bronze Layer: Behavior Event Table (Kafka PIPE Target)

Create Table

CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_interaction_events (
    event_id      STRING,
    user_id       STRING,
    content_id    STRING,
    event_type    STRING,      -- watch / like / share
    session_id    STRING,
    duration_sec  INT,         -- Watch duration (seconds), 0 for like/share
    event_time    TIMESTAMP,
    platform      STRING,      -- pc / mobile / console
    ingest_time   TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Create BloomFilter Index

Silver layer queries frequently filter and join on content_id, a high-cardinality column, which makes it a good fit for a BloomFilter Index.

-- Must be executed in the best_practice_content_rec schema context
USE SCHEMA best_practice_content_rec;
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_content_id
ON TABLE bronze_interaction_events (content_id);
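A generic sketch of the data structure shows why a BloomFilter suits this column. Each data file keeps a small bit array. A lookup hashes the key to k bit positions, and a file whose bits are not all set cannot contain the key, so the engine can skip it. The Python below is a textbook Bloom filter. The per-file layout, the SHA-256 double hashing, and the sizes `m_bits`/`k_hashes` are illustrative assumptions, not the Lakehouse's internal implementation.

```python
import hashlib


class BloomFilter:
    """Textbook Bloom filter: k bit positions per key in an m-bit array."""

    def __init__(self, m_bits: int, k_hashes: int) -> None:
        self.m = m_bits
        self.k = k_hashes
        self.bits = bytearray((m_bits + 7) // 8)

    def _positions(self, key: str) -> list[int]:
        # Double hashing: derive k positions from two 64-bit slices of one digest
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # odd step size
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, key: str) -> None:
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, key: str) -> bool:
        # False means definitely absent; True means possibly present
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(key))


# Hypothetical layout: one filter per data file of bronze_interaction_events
FILES = {
    "part-0": ["GAME_730", "GAME_570"],
    "part-1": ["GAME_292030", "GAME_1091500"],
}
FILTERS = {}
for name, content_ids in FILES.items():
    bf = BloomFilter(m_bits=1024, k_hashes=3)
    for cid in content_ids:
        bf.add(cid)
    FILTERS[name] = bf


def files_to_scan(content_id: str) -> list[str]:
    """Files whose filter says the content_id may be present."""
    return [name for name, bf in FILTERS.items() if bf.might_contain(content_id)]
```

A filter can report a false positive, which only costs an extra file scan. It never reports a false negative, so it is safe for equality filters. It pays off most on high-cardinality columns, where each value lives in only a few files.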

Configure Kafka PIPE

User behavior events are collected by the client SDK and sent to a Kafka topic. PIPE continuously consumes and writes to the Bronze table.

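The PIPE lands each message as a raw JSON string in bronze_kafka_raw_events. A separate step, not shown in this guide, must parse those strings into the typed columns of bronze_interaction_events. First create the raw receiver table, then create the PIPE: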

CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_kafka_raw_events (
    value STRING
);

CREATE PIPE IF NOT EXISTS best_practice_content_rec.pipe_user_events
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '30'
AS
COPY INTO best_practice_content_rec.bronze_kafka_raw_events
FROM (
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',       -- replace with the actual broker address
        'user_behavior_events',      -- topic name
        '',
        'cz_content_rec_consumer',   -- consumer group ID
        '', '', '', '',
        'raw',
        'raw',
        0,
        map()
    )
);
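The PIPE stores each message verbatim in the value column, so some later step has to map that JSON string onto the columns of bronze_interaction_events. The guide does not show that step. The Python sketch below only illustrates the field mapping and type conversion. It uses a hypothetical helper, parse_event, and assumes messages look like the kafka-python example in this section, with event_time formatted as YYYY-MM-DD HH:MM:SS.

```python
import json
from datetime import datetime

# Columns of bronze_interaction_events that come from the message;
# ingest_time is left to the table's DEFAULT CURRENT_TIMESTAMP()
COLUMNS = [
    "event_id", "user_id", "content_id", "event_type",
    "session_id", "duration_sec", "event_time", "platform",
]


def parse_event(raw_value: str) -> dict:
    """Hypothetical helper: one raw Kafka `value` string -> one typed row."""
    payload = json.loads(raw_value)
    row = {col: payload.get(col) for col in COLUMNS}
    # like/share messages may omit the duration; store 0 as the column comment expects
    row["duration_sec"] = int(row["duration_sec"] or 0)
    row["event_time"] = datetime.strptime(row["event_time"], "%Y-%m-%d %H:%M:%S")
    return row


raw = json.dumps({
    "event_id": "EVT002", "user_id": "USR001", "content_id": "GAME_570",
    "event_type": "like", "session_id": "SES001",
    "event_time": "2026-05-01 10:05:00", "platform": "pc",
})
row = parse_event(raw)
```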

Option 1: Write via actual Kafka (recommended)

In production, the client SDK serializes behavior events as JSON and sends them to the Kafka topic, and the PIPE consumes them and writes them to bronze_kafka_raw_events. The following example uses kafka-python to produce such a message:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['<kafka-broker>:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

event = {
    "event_id": "EVT001",
    "user_id": "USR001",
    "content_id": "GAME_730",
    "event_type": "watch",
    "session_id": "SES001",
    "duration_sec": 3600,
    "event_time": "2026-05-01 10:00:00",
    "platform": "pc",
}

producer.send('user_behavior_events', value=event)
producer.flush()

Option 2: INSERT simulation (without a Kafka environment)

If Kafka is not yet configured, write directly to bronze_interaction_events using one of the approaches below. This simulates already-parsed Kafka messages, so you can still verify the downstream Silver layer logic.

Load from local CSV (recommended)

-- Step 1: Upload the local CSV file to the User Volume with SQL PUT
PUT '/path/to/interaction_events.csv' TO USER VOLUME FILE 'interaction_events.csv';

-- Step 2: COPY INTO the table from the User Volume
COPY INTO best_practice_content_rec.bronze_interaction_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('interaction_events.csv');

You can also insert small batches of test data inline (no CSV file needed):

INSERT INTO best_practice_content_rec.bronze_interaction_events
    (event_id, user_id, content_id, event_type, session_id, duration_sec, event_time, platform)
VALUES
    ('EVT001','USR001','GAME_730',    'watch', 'SES001', 3600, CAST('2026-05-01 10:00:00' AS TIMESTAMP), 'pc'),
    ('EVT002','USR001','GAME_570',    'like',  'SES001', 0,    CAST('2026-05-01 10:05:00' AS TIMESTAMP), 'pc'),
    ('EVT003','USR002','GAME_730',    'share', 'SES002', 0,    CAST('2026-05-01 11:00:00' AS TIMESTAMP), 'mobile'),
    ('EVT004','USR002','GAME_292030', 'watch', 'SES002', 7200, CAST('2026-05-01 11:30:00' AS TIMESTAMP), 'mobile'),
    ('EVT005','USR003','GAME_1091500','watch', 'SES003', 1800, CAST('2026-05-01 12:00:00' AS TIMESTAMP), 'pc'),
    ('EVT006','USR003','GAME_730',    'like',  'SES003', 0,    CAST('2026-05-01 12:10:00' AS TIMESTAMP), 'pc'),
    ('EVT007','USR004','GAME_570',    'watch', 'SES004', 5400, CAST('2026-05-01 13:00:00' AS TIMESTAMP), 'console'),
    ('EVT008','USR004','GAME_292030', 'like',  'SES004', 0,    CAST('2026-05-01 13:20:00' AS TIMESTAMP), 'console'),
    ('EVT009','USR005','GAME_1091500','share', 'SES005', 0,    CAST('2026-05-01 14:00:00' AS TIMESTAMP), 'mobile'),
    ('EVT010','USR005','GAME_730',    'watch', 'SES005', 2700, CAST('2026-05-01 14:15:00' AS TIMESTAMP), 'mobile'),
    ('EVT011','USR001','GAME_1091500','watch', 'SES006', 4320, CAST('2026-05-02 09:00:00' AS TIMESTAMP), 'pc'),
    ('EVT012','USR002','GAME_570',    'share', 'SES007', 0,    CAST('2026-05-02 10:00:00' AS TIMESTAMP), 'pc'),
    ('EVT013','USR003','GAME_292030', 'watch', 'SES008', 6600, CAST('2026-05-02 11:00:00' AS TIMESTAMP), 'console'),
    ('EVT014','USR004','GAME_730',    'share', 'SES009', 0,    CAST('2026-05-02 12:00:00' AS TIMESTAMP), 'mobile'),
    ('EVT015','USR005','GAME_570',    'like',  'SES010', 0,    CAST('2026-05-02 13:00:00' AS TIMESTAMP), 'pc');

Verify Bronze layer row count:

SELECT COUNT(*) AS bronze_event_count FROM best_practice_content_rec.bronze_interaction_events;

bronze_event_count
------------------
15


Bronze Layer: Content Metadata Table (OSS PIPE Import)

Create Table

CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_content_metadata (
    content_id    STRING,
    title         STRING,
    description   STRING,     -- Content description, supports full-text search
    tags          STRING,     -- Comma-separated tags
    category      STRING,
    release_date  DATE,
    language      STRING,
    developer     STRING,
    price         DOUBLE,
    positive_pct  DOUBLE,     -- Positive review rate (0-1)
    load_time     TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Create Inverted Index (Full-Text Search)

Content descriptions may be in various languages. Create an Inverted Index on the description column with the chinese analyzer:

USE SCHEMA best_practice_content_rec;
CREATE INVERTED INDEX IF NOT EXISTS idx_inv_description
ON TABLE bronze_content_metadata (description)
WITH PROPERTIES ('analyzer' = 'chinese');

Build the index on existing data (CREATE INDEX only takes effect on newly written data; existing data must be built manually):

USE SCHEMA best_practice_content_rec;
BUILD INDEX idx_inv_description ON bronze_content_metadata;

Configure OSS PIPE (Bulk Import Content Metadata)

The content operations team periodically uploads new game information in CSV format to OSS. The OSS PIPE uses LIST_PURGE mode to automatically scan and import:

-- First create a Storage Connection (OSS access credentials)
CREATE STORAGE CONNECTION IF NOT EXISTS best_practice_content_rec.conn_content_oss
    TYPE = OSS
    ACCESS_ID = '<your-access-id>'
    ACCESS_KEY = '<your-access-key>'
    ENDPOINT = 'oss-cn-hangzhou.aliyuncs.com';

-- Create the External Volume
CREATE EXTERNAL VOLUME IF NOT EXISTS best_practice_content_rec.vol_content_metadata
    TYPE = OSS
    BUCKET = '<your-bucket>'
    PATH = 'content-rec/metadata/'
    CONNECTION = conn_content_oss;

-- Create the OSS PIPE (LIST_PURGE: delete source files after import to prevent re-import)
CREATE PIPE IF NOT EXISTS best_practice_content_rec.pipe_content_metadata
    VIRTUAL_CLUSTER = 'DEFAULT'
    INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO best_practice_content_rec.bronze_content_metadata
FROM VOLUME vol_content_metadata
USING csv
OPTIONS('header'='true', 'sep'=',', 'quote'='"');

Insert Sample Data

Using five classic games from Steam Dataset 2025 as examples:

INSERT INTO best_practice_content_rec.bronze_content_metadata
    (content_id, title, description, tags, category, release_date, language, developer, price, positive_pct)
VALUES
    ('GAME_730', 'Counter-Strike 2',
     'Multiplayer competitive FPS game where players join terrorist or counter-terrorist teams',
     'FPS,Shooter,Multiplayer,Competitive,Action', 'Action', CAST('2023-09-27' AS DATE), 'en', 'Valve', 0.0, 0.78),
    ('GAME_570', 'Dota 2',
     'Team battle strategy game, two teams of 5 compete for victory',
     'MOBA,Strategy,Multiplayer,Free to Play', 'Strategy', CAST('2013-07-09' AS DATE), 'en', 'Valve', 0.0, 0.84),
    ('GAME_292030', 'The Witcher 3: Wild Hunt',
     'Open world role-playing game with rich storyline',
     'RPG,Open World,Adventure,Story Rich,Fantasy', 'RPG', CAST('2015-05-18' AS DATE), 'en', 'CD Projekt Red', 39.99, 0.97),
    ('GAME_1091500','Cyberpunk 2077',
     'Open world RPG set in a futuristic cyberpunk world',
     'RPG,Open World,Cyberpunk,Action,Sci-fi', 'RPG', CAST('2020-12-10' AS DATE), 'en', 'CD Projekt Red', 59.99, 0.79),
    ('GAME_271590', 'Grand Theft Auto V',
     'Open world action adventure game',
     'Action,Open World,Multiplayer,Crime', 'Action', CAST('2015-04-14' AS DATE), 'en', 'Rockstar Games', 29.99, 0.88);

Validate full-text search:

SELECT content_id, title, description FROM best_practice_content_rec.bronze_content_metadata WHERE MATCH_ALL(description, 'Open world');

content_id   | title                    | description
-------------+--------------------------+---------------------------------------------------
GAME_292030  | The Witcher 3: Wild Hunt | Open world role-playing game with rich storyline
GAME_1091500 | Cyberpunk 2077           | Open world RPG set in a futuristic cyberpunk world
GAME_271590  | Grand Theft Auto V       | Open world action adventure game

All three open-world games are recalled. Counter-Strike 2 and Dota 2, whose descriptions do not contain both terms, are excluded.
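Conceptually, the Inverted Index maps each term to the set of rows containing it. MATCH_ALL keeps a row only when every query term is present, which amounts to intersecting posting lists. The Python sketch below reproduces the result above under a simplifying assumption: its tokenizer only lowercases and splits on non-alphanumeric characters. That tokenizer stands in for the 'chinese' analyzer and does not reproduce it.

```python
import re
from collections import defaultdict


def tokenize(text: str) -> list[str]:
    # Simplified analyzer (assumption): lowercase, split on non-alphanumerics
    return [t for t in re.split(r"[^0-9a-z]+", text.lower()) if t]


def build_inverted_index(docs: dict[str, str]) -> dict[str, set[str]]:
    """term -> set of document ids (the posting list)."""
    index = defaultdict(set)
    for doc_id, text in docs.items():
        for term in tokenize(text):
            index[term].add(doc_id)
    return index


def match_all(index: dict[str, set[str]], query: str) -> set[str]:
    """Documents containing every query term: intersection of posting lists."""
    postings = [index.get(term, set()) for term in tokenize(query)]
    return set.intersection(*postings) if postings else set()


DESCRIPTIONS = {
    "GAME_730": "Multiplayer competitive FPS game where players join terrorist or counter-terrorist teams",
    "GAME_570": "Team battle strategy game, two teams of 5 compete for victory",
    "GAME_292030": "Open world role-playing game with rich storyline",
    "GAME_1091500": "Open world RPG set in a futuristic cyberpunk world",
    "GAME_271590": "Open world action adventure game",
}
INDEX = build_inverted_index(DESCRIPTIONS)
```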


Bronze Layer: Content Embedding Table (External Function + Vector Type)

Create Table

CREATE TABLE IF NOT EXISTS best_practice_content_rec.bronze_content_embedding (
    content_id   STRING,
    title        STRING,
    description  STRING,
    tags         STRING,
    embedding    VECTOR(128)   -- 128-dimensional Embedding generated by External Function
);

External Function for Embedding Generation

In production, deploy an External Function to call a vectorization model (e.g., DashScope text-embedding-v3) to automatically generate Embeddings:

-- Assumes the External Function text2vec has already been deployed
-- Usage example (run in operations scripts or ZettaPark Tasks)
INSERT INTO best_practice_content_rec.bronze_content_embedding
    (content_id, title, description, tags, embedding)
SELECT content_id, title, description, tags,
       CAST(best_practice_content_rec.text2vec(description) AS VECTOR(128))
FROM best_practice_content_rec.bronze_content_metadata;

In test environments where an External Function has not yet been deployed, you can use SQL to generate deterministic 128-dimensional test vectors. The vector_seed below is only for constructing executable sample data and does not represent real model Embeddings:

INSERT INTO best_practice_content_rec.bronze_content_embedding
    (content_id, title, description, tags, embedding)
SELECT
    content_id,
    title,
    description,
    tags,
    CAST(
        CONCAT(
            '[',
            ARRAY_JOIN(
                TRANSFORM(
                    SEQUENCE(1, 128),
                    x -> CAST(ROUND(((x * 37 + vector_seed) % 1000) / 1000.0, 4) AS STRING)
                ),
                ','
            ),
            ']'
        ) AS VECTOR(128)
    ) AS embedding
FROM (
    SELECT 'GAME_730' AS content_id, 'Counter-Strike 2' AS title,
           'Multiplayer competitive FPS game where players join terrorist or counter-terrorist teams' AS description,
           'FPS,Shooter,Multiplayer,Competitive,Action' AS tags,
           300 AS vector_seed
    UNION ALL
    SELECT 'GAME_570', 'Dota 2',
           'Team battle strategy game, two teams of 5 compete for victory',
           'MOBA,Strategy,Multiplayer,Free to Play', 420
    UNION ALL
    SELECT 'GAME_292030', 'The Witcher 3: Wild Hunt',
           'Open world role-playing game with rich storyline',
           'RPG,Open World,Adventure,Story Rich,Fantasy', 180
    UNION ALL
    SELECT 'GAME_1091500', 'Cyberpunk 2077',
           'Open world RPG set in a futuristic cyberpunk world',
           'RPG,Open World,Cyberpunk,Action,Sci-fi', 190
    UNION ALL
    SELECT 'GAME_271590', 'Grand Theft Auto V',
           'Open world action adventure game',
           'Action,Open World,Multiplayer,Crime', 260
) s;

Verify the vector dimensions of the inserted data:

SELECT content_id, size(embedding) AS vector_dim FROM best_practice_content_rec.bronze_content_embedding ORDER BY content_id;

content_id   | vector_dim
-------------+-----------
GAME_1091500 | 128
GAME_271590  | 128
GAME_292030  | 128
GAME_570     | 128
GAME_730     | 128

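To construct a single test record manually, you can also CAST a complete 128-dimensional array string directly to VECTOR(128). The following statement runs as-is. Use it instead of the batch insert above, not in addition to it; running both leaves two GAME_730 rows in the table.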

INSERT INTO best_practice_content_rec.bronze_content_embedding (content_id, title, description, tags, embedding) VALUES ('GAME_730', 'Counter-Strike 2', 'Multiplayer competitive FPS game where players join terrorist or counter-terrorist teams', 'FPS,Shooter,Multiplayer,Competitive,Action', CAST('[0.3370,0.3740,0.4110,0.4480,0.4850,0.5220,0.5590,0.5960,0.6330,0.6700,0.7070,0.7440,0.7810,0.8180,0.8550,0.8920,0.9290,0.9660,0.0030,0.0400,0.0770,0.1140,0.1510,0.1880,0.2250,0.2620,0.2990,0.3360,0.3730,0.4100,0.4470,0.4840,0.5210,0.5580,0.5950,0.6320,0.6690,0.7060,0.7430,0.7800,0.8170,0.8540,0.8910,0.9280,0.9650,0.0020,0.0390,0.0760,0.1130,0.1500,0.1870,0.2240,0.2610,0.2980,0.3350,0.3720,0.4090,0.4460,0.4830,0.5200,0.5570,0.5940,0.6310,0.6680,0.7050,0.7420,0.7790,0.8160,0.8530,0.8900,0.9270,0.9640,0.0010,0.0380,0.0750,0.1120,0.1490,0.1860,0.2230,0.2600,0.2970,0.3340,0.3710,0.4080,0.4450,0.4820,0.5190,0.5560,0.5930,0.6300,0.6670,0.7040,0.7410,0.7780,0.8150,0.8520,0.8890,0.9260,0.9630,0.0000,0.0370,0.0740,0.1110,0.1480,0.1850,0.2220,0.2590,0.2960,0.3330,0.3700,0.4070,0.4440,0.4810,0.5180,0.5550,0.5920,0.6290,0.6660,0.7030,0.7400,0.7770,0.8140,0.8510,0.8880,0.9250,0.9620,0.9990,0.0360]' AS VECTOR(128)));
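The literal above is the seed-300 vector from the batch insert, written out in full. The same deterministic formula, ((x * 37 + seed) % 1000) / 1000 for x = 1..128, can be reproduced in Python to generate or check such literals. The helper names seeded_vector and to_vector_literal are hypothetical, and the output is sample data, not a real embedding.

```python
def seeded_vector(seed: int, dim: int = 128) -> list[float]:
    """Mirror of the SQL TRANSFORM(SEQUENCE(1, 128), ...) expression."""
    return [round(((x * 37 + seed) % 1000) / 1000.0, 4) for x in range(1, dim + 1)]


def to_vector_literal(values: list[float]) -> str:
    """Format values as the '[v1,v2,...]' string that is CAST to VECTOR(128)."""
    return "[" + ",".join(f"{v:.4f}" for v in values) + "]"


# Seeds used by the batch INSERT for each sample game
SEEDS = {"GAME_730": 300, "GAME_570": 420, "GAME_292030": 180,
         "GAME_1091500": 190, "GAME_271590": 260}
LITERALS = {cid: to_vector_literal(seeded_vector(s)) for cid, s in SEEDS.items()}
literal_730 = LITERALS["GAME_730"]
```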

Create Vector Index (IVFPQ)

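IVFPQ (Inverted File + Product Quantization) is an ANN index type commonly used in recommendation systems. The IVF part clusters the vectors into nlist partitions and, at query time, scans only the partitions closest to the query. The PQ part splits each vector into sub-vectors and stores each one as a short code, which reduces memory use and speeds up distance computation. Together they avoid comparing the query against every vector, at the cost of approximate results: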

USE SCHEMA best_practice_content_rec;
CREATE VECTOR INDEX IF NOT EXISTS idx_vec_content_embedding
ON TABLE bronze_content_embedding (embedding)
PROPERTIES(
    'index_type' = 'IVFPQ',
    'distance_function' = 'cosine',
    'nlist' = '100',   -- Number of cluster centers; increase for larger datasets
    'M' = '32',        -- Number of sub-quantizers; affects precision and speed
    'm' = '4'          -- Number of compressed bytes
);

Build the index on existing data:

USE SCHEMA best_practice_content_rec;
BUILD INDEX idx_vec_content_embedding ON bronze_content_embedding;
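A minimal sketch shows where the sub-linear cost comes from. It covers only the inverted-file half of IVFPQ, with no product quantization. Vectors are assigned to their nearest centroid, and a query ranks only the members of the nprobe closest lists by cosine distance. The centroids here are fixed by hand rather than trained with k-means. nprobe is a generic IVF knob, not one of the Lakehouse index properties shown above.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity: 0 = same direction, 1 = orthogonal, 2 = opposite."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def build_ivf(vectors: dict[str, list[float]], centroids: list[list[float]]) -> dict[int, list[str]]:
    """Assign every vector to its nearest centroid, producing the inverted lists."""
    lists: dict[int, list[str]] = {c: [] for c in range(len(centroids))}
    for vid, vec in vectors.items():
        nearest = min(range(len(centroids)), key=lambda c: cosine_distance(vec, centroids[c]))
        lists[nearest].append(vid)
    return lists


def ivf_search(query, vectors, centroids, lists, nprobe: int, k: int) -> list[str]:
    """Probe only the nprobe closest lists, then rank their members exactly."""
    probe = sorted(range(len(centroids)), key=lambda c: cosine_distance(query, centroids[c]))[:nprobe]
    candidates = [vid for c in probe for vid in lists[c]]
    candidates.sort(key=lambda vid: cosine_distance(query, vectors[vid]))
    return candidates[:k]


# Toy 2-D data: two clear clusters around the x and y axes
VECTORS = {"a": [1.0, 0.0], "b": [0.9, 0.1], "c": [0.0, 1.0], "d": [0.1, 0.9]}
CENTROIDS = [[1.0, 0.0], [0.0, 1.0]]
LISTS = build_ivf(VECTORS, CENTROIDS)
```

With nprobe=1, the query never looks at c or d. That saving is the speedup, and it is also why results are approximate: a true neighbor sitting in an unprobed list is missed. PQ would additionally compress each stored vector; it is left out here for brevity.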

Use The Witcher 3's Embedding as the query vector to find the most similar content:

SELECT e.content_id,
       e.title,
       ROUND(COSINE_DISTANCE(e.embedding, q.query_embedding), 6) AS cos_dist
FROM best_practice_content_rec.bronze_content_embedding e
CROSS JOIN (
    SELECT embedding AS query_embedding
    FROM best_practice_content_rec.bronze_content_embedding
    WHERE content_id = 'GAME_292030'
) q
ORDER BY cos_dist ASC
LIMIT 4;

content_id   | title                    | cos_dist
-------------+--------------------------+---------------------
GAME_292030  | The Witcher 3: Wild Hunt | 0
GAME_1091500 | Cyberpunk 2077           | 0.045093998312950134
GAME_271590  | Grand Theft Auto V       | 0.09233599901199341
GAME_730     | Counter-Strike 2         | 0.16785000264644623

A COSINE_DISTANCE of 0 means identical vectors (here, the query vector itself). The test vectors are generated with fixed seeds only to verify that VECTOR writes, index building, and the similarity SQL run as written. In production, use vectors from a real Embedding model so that the rankings carry business meaning.


Silver Layer Dynamic Table: Denoising, Cleansing, and Interaction Sequences

The Silver layer performs two tasks on top of Bronze behavior events:

  1. LEFT JOIN bronze_content_metadata to enrich each event with content title, category, developer, and other dimensions
  2. Calculate normalized interaction weights and flag invalid interactions (is_valid): watch events shorter than 60 seconds are marked as invalid browsing noise

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.silver_user_content_interactions AS
SELECT
    e.event_id,
    e.user_id,
    e.content_id,
    e.event_type,
    e.session_id,
    e.duration_sec,
    e.event_time,
    e.platform,
    m.title     AS content_title,
    m.tags      AS content_tags,
    m.category  AS content_category,
    m.developer AS developer,
    -- Normalized interaction weight: signal strength from high to low
    CASE
        WHEN e.event_type = 'share' THEN 3.0
        WHEN e.event_type = 'like' THEN 2.0
        WHEN e.event_type = 'watch' AND e.duration_sec >= 60 THEN 1.0
        ELSE 0.0
    END AS interaction_weight,
    -- Valid interaction flag (watch duration < 60s treated as noise)
    CASE
        WHEN e.event_type IN ('like','share') THEN 1
        WHEN e.event_type = 'watch' AND e.duration_sec >= 60 THEN 1
        ELSE 0
    END AS is_valid
FROM best_practice_content_rec.bronze_interaction_events e
LEFT JOIN best_practice_content_rec.bronze_content_metadata m
    ON e.content_id = m.content_id;

Interaction weight design rationale:

Event Type | Weight | Rationale
share | 3.0 | Strong intent signal: active sharing
like | 2.0 | Explicit positive feedback
watch (≥60s) | 1.0 | Implicit preference; the duration threshold filters out accidental triggers
watch (<60s) | 0.0 (invalid) | Likely an accidental click or channel switching; removed as noise
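The two CASE expressions can be mirrored in plain Python, which makes it easy to unit-test the 60-second threshold before changing it in the DT. This is a sketch with hypothetical function names. NULL handling follows SQL semantics: a NULL duration never satisfies >= 60.

```python
from typing import Optional

MIN_WATCH_SEC = 60  # watch events shorter than this are treated as noise


def interaction_weight(event_type: str, duration_sec: Optional[int]) -> float:
    """Mirror of the Silver CASE expression for interaction_weight."""
    if event_type == "share":
        return 3.0
    if event_type == "like":
        return 2.0
    if event_type == "watch" and duration_sec is not None and duration_sec >= MIN_WATCH_SEC:
        return 1.0
    return 0.0


def is_valid(event_type: str, duration_sec: Optional[int]) -> int:
    """Mirror of the is_valid CASE: like/share always count, watch needs >= 60 s."""
    if event_type in ("like", "share"):
        return 1
    if event_type == "watch" and duration_sec is not None and duration_sec >= MIN_WATCH_SEC:
        return 1
    return 0
```

Under these rules a row is valid exactly when its weight is positive. The Gold filter WHERE is_valid = 1 therefore drops only zero-weight rows.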

Manually trigger the first refresh:

REFRESH DYNAMIC TABLE best_practice_content_rec.silver_user_content_interactions;

SELECT COUNT(*) AS silver_count
FROM best_practice_content_rec.silver_user_content_interactions;

silver_count
------------
15

View sample user-content interaction sequences:

SELECT user_id, content_id, content_title, event_type, interaction_weight, is_valid FROM best_practice_content_rec.silver_user_content_interactions ORDER BY user_id, event_time LIMIT 10;

user_id | content_id   | content_title            | event_type | interaction_weight | is_valid
--------+--------------+--------------------------+------------+--------------------+---------
USR001  | GAME_730     | Counter-Strike 2         | watch      | 1.0                | 1
USR001  | GAME_570     | Dota 2                   | like       | 2.0                | 1
USR001  | GAME_1091500 | Cyberpunk 2077           | watch      | 1.0                | 1
USR002  | GAME_730     | Counter-Strike 2         | share      | 3.0                | 1
USR002  | GAME_292030  | The Witcher 3: Wild Hunt | watch      | 1.0                | 1
USR002  | GAME_570     | Dota 2                   | share      | 3.0                | 1
USR003  | GAME_1091500 | Cyberpunk 2077           | watch      | 1.0                | 1
USR003  | GAME_730     | Counter-Strike 2         | like       | 2.0                | 1
USR003  | GAME_292030  | The Witcher 3: Wild Hunt | watch      | 1.0                | 1
USR004  | GAME_570     | Dota 2                   | watch      | 1.0                | 1

All 15 sample records are valid interactions (is_valid=1). Like and share events are always valid, and every simulated watch event lasts at least 60 seconds (the shortest is 1,800 seconds).


Gold Layer Dynamic Table: Content Popularity Metrics

The Gold layer aggregates content popularity metrics from the Silver layer at content_id + date granularity for use as content-side features in recommendation models.

This DT is partitioned by stat_date and declared as static partition mode:

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.gold_content_popularity
PARTITIONED BY (stat_date)
TBLPROPERTIES ('static_partitions' = 'true')
AS
SELECT
    content_id,
    content_title,
    content_category,
    developer,
    CAST(DATE_TRUNC('day', event_time) AS DATE) AS stat_date,
    COUNT(*) AS total_interactions,
    SUM(CASE WHEN event_type = 'watch' AND duration_sec >= 60 THEN 1 ELSE 0 END) AS watch_count,
    SUM(CASE WHEN event_type = 'like' THEN 1 ELSE 0 END) AS like_count,
    SUM(CASE WHEN event_type = 'share' THEN 1 ELSE 0 END) AS share_count,
    SUM(interaction_weight) AS weighted_score,
    COUNT(DISTINCT user_id) AS unique_users,
    ROUND(AVG(CASE WHEN event_type = 'watch' THEN duration_sec ELSE NULL END), 2) AS avg_watch_sec
FROM best_practice_content_rec.silver_user_content_interactions
WHERE is_valid = 1
GROUP BY content_id, content_title, content_category, developer,
         CAST(DATE_TRUNC('day', event_time) AS DATE);

Manually trigger the first refresh and view results:

REFRESH DYNAMIC TABLE best_practice_content_rec.gold_content_popularity;

SELECT content_id, content_title, content_category, stat_date,
       total_interactions, watch_count, like_count, share_count,
       weighted_score, unique_users
FROM best_practice_content_rec.gold_content_popularity
ORDER BY weighted_score DESC;

content_id   | content_title            | content_category | stat_date  | total | watch | like | share | weighted_score | unique_users
-------------+--------------------------+------------------+------------+-------+-------+------+-------+----------------+-------------
GAME_730     | Counter-Strike 2         | Action           | 2026-05-01 | 4     | 2     | 1    | 1     | 7.0            | 4
GAME_570     | Dota 2                   | Strategy         | 2026-05-02 | 2     | 0     | 1    | 1     | 5.0            | 2
GAME_1091500 | Cyberpunk 2077           | RPG              | 2026-05-01 | 2     | 1     | 0    | 1     | 4.0            | 2
GAME_730     | Counter-Strike 2         | Action           | 2026-05-02 | 1     | 0     | 0    | 1     | 3.0            | 1
GAME_570     | Dota 2                   | Strategy         | 2026-05-01 | 2     | 1     | 1    | 0     | 3.0            | 2
GAME_292030  | The Witcher 3: Wild Hunt | RPG              | 2026-05-01 | 2     | 1     | 1    | 0     | 3.0            | 2
GAME_1091500 | Cyberpunk 2077           | RPG              | 2026-05-02 | 1     | 1     | 0    | 0     | 1.0            | 1
GAME_292030  | The Witcher 3: Wild Hunt | RPG              | 2026-05-02 | 1     | 1     | 0    | 0     | 1.0            | 1

Result interpretation: Counter-Strike 2 (GAME_730) had the highest weighted score on 2026-05-01 (7.0), because 4 different users interacted with it that day (watch×2, like×1, share×1), and the high share weight boosted the total. Cyberpunk 2077's weighted score on May 1 (4.0) surpassed Dota 2's on the same day (3.0), mainly because of one share event (weight 3.0).
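Because the sample data is tiny, the Gold numbers above can be cross-checked without the Lakehouse. The Python sketch below replays the 15 sample events and groups them by (content_id, date) the way the DT does. It is a verification aid covering only total_interactions, weighted_score, and unique_users, and its names are hypothetical.

```python
from collections import defaultdict

# All sample events are valid, so every watch carries weight 1.0
WEIGHTS = {"share": 3.0, "like": 2.0, "watch": 1.0}

# (user_id, content_id, event_type, event date) for EVT001..EVT015
EVENTS = [
    ("USR001", "GAME_730", "watch", "2026-05-01"),
    ("USR001", "GAME_570", "like", "2026-05-01"),
    ("USR002", "GAME_730", "share", "2026-05-01"),
    ("USR002", "GAME_292030", "watch", "2026-05-01"),
    ("USR003", "GAME_1091500", "watch", "2026-05-01"),
    ("USR003", "GAME_730", "like", "2026-05-01"),
    ("USR004", "GAME_570", "watch", "2026-05-01"),
    ("USR004", "GAME_292030", "like", "2026-05-01"),
    ("USR005", "GAME_1091500", "share", "2026-05-01"),
    ("USR005", "GAME_730", "watch", "2026-05-01"),
    ("USR001", "GAME_1091500", "watch", "2026-05-02"),
    ("USR002", "GAME_570", "share", "2026-05-02"),
    ("USR003", "GAME_292030", "watch", "2026-05-02"),
    ("USR004", "GAME_730", "share", "2026-05-02"),
    ("USR005", "GAME_570", "like", "2026-05-02"),
]


def content_popularity(events):
    """Group by (content_id, date): row count, weighted score, distinct users."""
    groups = defaultdict(list)
    for user_id, content_id, event_type, day in events:
        groups[(content_id, day)].append((user_id, event_type))
    return {
        key: {
            "total_interactions": len(rows),
            "weighted_score": sum(WEIGHTS[etype] for _, etype in rows),
            "unique_users": len({user for user, _ in rows}),
        }
        for key, rows in groups.items()
    }


POPULARITY = content_popularity(EVENTS)
```

ORDER BY weighted_score DESC does not fix the order of tied rows (the three 3.0 rows), so their order in the output above may differ between runs.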


Gold Layer Dynamic Table: User Interest Vectors

User interest vectors aggregate each user's weighted interaction scores across content categories, serving as user-side features for recommendation models:

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_content_rec.gold_user_interest_profile AS
SELECT
    user_id,
    CAST(DATE_TRUNC('day', MAX(event_time)) AS DATE) AS profile_date,
    COUNT(DISTINCT content_id) AS content_count,
    SUM(interaction_weight) AS total_weight,
    COLLECT_LIST(content_id) AS interacted_content_ids,
    -- Accumulate weights by content category to form category interest vectors
    SUM(CASE WHEN content_category = 'Action' THEN interaction_weight ELSE 0 END) AS action_score,
    SUM(CASE WHEN content_category = 'Strategy' THEN interaction_weight ELSE 0 END) AS strategy_score,
    SUM(CASE WHEN content_category = 'RPG' THEN interaction_weight ELSE 0 END) AS rpg_score
FROM best_practice_content_rec.silver_user_content_interactions
WHERE is_valid = 1
GROUP BY user_id;

REFRESH DYNAMIC TABLE best_practice_content_rec.gold_user_interest_profile;

SELECT user_id, content_count, total_weight, action_score, strategy_score, rpg_score, interacted_content_ids
FROM best_practice_content_rec.gold_user_interest_profile
ORDER BY total_weight DESC;

user_id | content_count | total_weight | action_score | strategy_score | rpg_score | interacted_content_ids
--------+---------------+--------------+--------------+----------------+-----------+--------------------------------------
USR002  | 3             | 7.0          | 3.0          | 3.0            | 1.0       | [GAME_730, GAME_292030, GAME_570]
USR004  | 3             | 6.0          | 3.0          | 1.0            | 2.0       | [GAME_570, GAME_292030, GAME_730]
USR005  | 3             | 6.0          | 1.0          | 2.0            | 3.0       | [GAME_1091500, GAME_730, GAME_570]
USR001  | 3             | 4.0          | 1.0          | 2.0            | 1.0       | [GAME_730, GAME_570, GAME_1091500]
USR003  | 3             | 4.0          | 2.0          | 0.0            | 2.0       | [GAME_1091500, GAME_730, GAME_292030]

Result interpretation: USR002's weight is split evenly between action (3.0) and strategy (3.0), indicating broad interests. USR005's rpg_score (3.0) is its highest category score (action 1.0, strategy 2.0), suggesting an RPG preference, so the recommendation system could prioritize RPG content for this user. The interacted_content_ids field can be used directly to build training samples for item-to-item collaborative filtering, although the element order produced by COLLECT_LIST should not be relied on.
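The category columns are a pivot of interaction weights by content category. The minimal Python sketch below performs that pivot and reproduces the USR002 and USR005 rows above; only their events are included, for brevity. interest_profile is a hypothetical name, and the category map is copied from the sample metadata.

```python
# content_id -> category, from bronze_content_metadata
CATEGORY = {"GAME_730": "Action", "GAME_570": "Strategy",
            "GAME_292030": "RPG", "GAME_1091500": "RPG", "GAME_271590": "Action"}
WEIGHTS = {"share": 3.0, "like": 2.0, "watch": 1.0}  # valid events only
SCORED_CATEGORIES = ("Action", "Strategy", "RPG")  # one output column each

EVENTS = [
    ("USR002", "GAME_730", "share"),
    ("USR002", "GAME_292030", "watch"),
    ("USR002", "GAME_570", "share"),
    ("USR005", "GAME_1091500", "share"),
    ("USR005", "GAME_730", "watch"),
    ("USR005", "GAME_570", "like"),
]


def interest_profile(events):
    """Sum interaction weights per (user, category), like the SUM(CASE ...) columns."""
    profiles = {}
    for user, content_id, event_type in events:
        scores = profiles.setdefault(user, dict.fromkeys(SCORED_CATEGORIES, 0.0))
        category = CATEGORY.get(content_id)
        if category in scores:  # other categories feed no column, as in the SQL
            scores[category] += WEIGHTS[event_type]
    return profiles


PROFILES = interest_profile(EVENTS)
top_category = {user: max(scores, key=scores.get) for user, scores in PROFILES.items()}
```

USR002 ties on Action and Strategy, so any "top category" rule needs an explicit tie-break. Here max() simply returns the first tied category in column order.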


Dynamic Table Refresh Scheduling (Lakehouse Studio Tasks)

Do not write REFRESH INTERVAL in the DDL. Create refresh tasks in Lakehouse Studio to uniformly manage scheduling, alerts, and data quality checks.

Set up the Studio tasks under the path best_practices/content_rec/ as follows:

  1. Create a SQL-type task refresh_gold_content_popularity under the best_practices/content_rec/ path in Studio
  2. Task content:

REFRESH DYNAMIC TABLE best_practice_content_rec.gold_content_popularity;
REFRESH DYNAMIC TABLE best_practice_content_rec.gold_user_interest_profile;

  3. Configure the schedule (run daily at 02:00):

cz-cli task save-cron refresh_gold_content_popularity -p skill_test --cron "0 2 * * *"

  4. Publish the task (scheduled execution starts once it is live):

cz-cli task deploy refresh_gold_content_popularity -p skill_test

The Silver layer DT also needs its own refresh. Either create a separate task on the same path (task name refresh_silver_interactions), or combine the Silver and Gold refreshes into a single DAG that refreshes Silver first and triggers the Gold refreshes after it completes. Otherwise the Gold DTs recompute from whatever Silver contained at its last refresh.
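The dependency order can be derived from the FROM clauses in this guide rather than maintained by hand. The Python sketch below uses the standard-library graphlib to compute a valid run order for a single DAG task. The graph is transcribed from the DDL and the ZettaPark task in this guide. Generating REFRESH statements this way is only an illustration, not a Studio feature.

```python
from graphlib import TopologicalSorter

SCHEMA = "best_practice_content_rec"

# node -> the nodes it reads from
DEPENDENCIES = {
    "silver_user_content_interactions": {"bronze_interaction_events", "bronze_content_metadata"},
    "gold_content_popularity": {"silver_user_content_interactions"},
    "gold_user_interest_profile": {"silver_user_content_interactions"},
    "feature_engineering_export": {"gold_content_popularity", "gold_user_interest_profile"},
}
DYNAMIC_TABLES = {"silver_user_content_interactions", "gold_content_popularity",
                  "gold_user_interest_profile"}


def run_order(deps: dict[str, set[str]]) -> list[str]:
    """Topological order, skipping Bronze tables (filled by PIPEs, not refreshed)."""
    order = TopologicalSorter(deps).static_order()
    return [n for n in order if n in DYNAMIC_TABLES or n == "feature_engineering_export"]


PLAN = run_order(DEPENDENCIES)
STATEMENTS = [f"REFRESH DYNAMIC TABLE {SCHEMA}.{n};" for n in PLAN if n in DYNAMIC_TABLES]
```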


ZettaPark Python Task: Feature Engineering and Sample Export

ZettaPark Tasks run Python scripts that directly access the Gold layer to generate the feature matrices needed for recommendation model training.

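Create a VIRTUAL type task feature_engineering_export under the best_practices/content_rec/ path in Studio. Note that vol_content_metadata is also the source volume of the LIST_PURGE OSS PIPE, so exporting features into it may expose the Parquet files to that PIPE; a dedicated volume is the safer target. Script example: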

from clickzetta_zettapark.session import Session

session = Session.builder.configs({
    "instance": "<instance>",
    "workspace": "<workspace>",
    "schema": "best_practice_content_rec",
    "vcluster": "DEFAULT",
    "username": "<username>",
    "password": "<password>",
}).create()

# Read Gold layer user interest vectors
user_profile_df = session.table("gold_user_interest_profile")

# Read Gold layer content popularity
content_pop_df = session.table("gold_content_popularity")

# Cross JOIN to build user-content feature pairs
feature_df = user_profile_df.join(
    content_pop_df, how="cross"
).select(
    "user_id",
    "content_id",
    "total_weight",
    "action_score",
    "strategy_score",
    "rpg_score",
    "weighted_score",
    "unique_users",
)

# Export as Parquet to Volume
feature_df.write.mode("overwrite").parquet(
    "volume://vol_content_metadata/features/user_content_features.parquet"
)
print(f"Feature matrix exported: {feature_df.count()} rows")

session.close()
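The cross join yields one row per (user, Gold row) pair. With the sample data that is 5 users × 8 content-date rows = 40 rows, and at full Steam scale the count grows multiplicatively. The pure-Python sketch below shows the same pairing with itertools.product and adds an optional candidate filter. The candidate_ids parameter is a hypothetical addition for illustration; the ZettaPark script above does not have it.

```python
from itertools import product
from typing import Optional

USER_FEATURES = ("total_weight", "action_score", "strategy_score", "rpg_score")


def cross_features(user_rows: list[dict], content_rows: list[dict],
                   candidate_ids: Optional[set[str]] = None) -> list[dict]:
    """Pair every user row with every content row, optionally limited to candidates."""
    if candidate_ids is not None:
        content_rows = [c for c in content_rows if c["content_id"] in candidate_ids]
    return [
        {"user_id": u["user_id"], "content_id": c["content_id"],
         **{col: u[col] for col in USER_FEATURES},
         "weighted_score": c["weighted_score"], "unique_users": c["unique_users"]}
        for u, c in product(user_rows, content_rows)
    ]


# A few rows copied from the Gold outputs above
USERS = [
    {"user_id": "USR002", "total_weight": 7.0, "action_score": 3.0, "strategy_score": 3.0, "rpg_score": 1.0},
    {"user_id": "USR005", "total_weight": 6.0, "action_score": 1.0, "strategy_score": 2.0, "rpg_score": 3.0},
]
CONTENT = [
    {"content_id": "GAME_730", "weighted_score": 7.0, "unique_users": 4},
    {"content_id": "GAME_570", "weighted_score": 5.0, "unique_users": 2},
    {"content_id": "GAME_1091500", "weighted_score": 4.0, "unique_users": 2},
]
```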


Data Warehouse Object Summary

After the full build is complete, the best_practice_content_rec schema contains the following objects (bronze_kafka_raw_events is also listed if you created the Kafka receiver table):

SHOW TABLES IN best_practice_content_rec;

schema_name               | table_name                       | is_dynamic
--------------------------+----------------------------------+-----------
best_practice_content_rec | bronze_content_embedding         | false
best_practice_content_rec | bronze_content_metadata          | false
best_practice_content_rec | bronze_interaction_events        | false
best_practice_content_rec | gold_content_popularity          | true
best_practice_content_rec | gold_user_interest_profile       | true
best_practice_content_rec | silver_user_content_interactions | true

Layer hierarchy:

Kafka Topic (user_behavior_events)
        |
        |  Kafka PIPE (pipe_user_events · BATCH_INTERVAL=30s)
        v
bronze_interaction_events                 bronze_content_metadata
  BloomFilter Index (content_id)            Inverted Index (description, chinese)
        |                                         |
        +--------------------+--------------------+
                             v
           silver_user_content_interactions (Dynamic Table)
           interaction_weight · is_valid denoising
                             |
              +--------------+---------------+
              v                              v
  gold_content_popularity (DT)     gold_user_interest_profile (DT)
  PARTITIONED BY (stat_date)       category-level interest vectors
  static_partitions=true
              |                              |
              +--------------+---------------+
                             v
           ZettaPark Python Task feature_engineering_export
           -> user x content feature matrix -> Volume

OSS Volume (vol_content_metadata)
        |
        |  OSS PIPE (pipe_content_metadata · LIST_PURGE)
        v
bronze_content_metadata
        |
        |  External Function (text2vec)
        v
bronze_content_embedding (VECTOR(128))
  Vector Index IVFPQ (idx_vec_content_embedding) -> COSINE_DISTANCE ANN recall

Studio Task scheduling path: best_practices/content_rec/

  • refresh_gold_content_popularity (daily 02:00, refreshes two Gold DTs)
  • feature_engineering_export (triggered after Gold refresh completes)

Notes

  • BloomFilter Index does not support BUILD INDEX: BloomFilter type indexes only take effect for data written after creation and do not support BUILD INDEX on existing data (unlike Vector / Inverted Indexes). To cover existing large volumes of historical data, the table must be rebuilt and data re-inserted.

  • Vector Index and Inverted Index must be explicitly built: CREATE INDEX only processes newly added data going forward. For existing data, manually execute BUILD INDEX <index_name> ON <table_name> (in the same schema context), otherwise vector retrieval and full-text search results will not include data from before index creation.

  • Partitioned Dynamic Tables must declare static_partitions: A DT with PARTITIONED BY must set TBLPROPERTIES ('static_partitions' = 'true'). Without this declaration, the system uses dynamic partition inference, which may cause old partition data to be overwritten or lost during incremental refresh.

  • Do not set REFRESH INTERVAL on Dynamic Tables: Use Lakehouse Studio Tasks (path best_practices/content_rec/) to manage refresh scheduling in one place; alert rules and data quality checks can be attached to the same task node. Leave the REFRESH INTERVAL parameter out of CREATE DYNAMIC TABLE DDL.

  • COSINE_DISTANCE: lower means more similar: COSINE_DISTANCE is 1 minus cosine similarity. Cosine similarity ranges from -1 to 1, so COSINE_DISTANCE ranges from 0 to 2, where 0 means the two vectors point in the same direction. Use ORDER BY cos_dist ASC when querying, and take the TOP-K results as approximate nearest neighbors.

  • Kafka PIPE DDL validates broker connection: When executing CREATE PIPE, the system will attempt to connect to the Kafka broker to verify that the topic exists. In development scenarios without a Kafka environment, create the target table first, use INSERT to simulate data, verify downstream Silver/Gold logic, and create the PIPE once the Kafka environment is ready.

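  • OSS PIPE LIST_PURGE mode is irreversible: Source files are deleted from the Volume after a successful import. If the business needs to retain files (e.g., for reruns), use LIST mode and add deduplication logic to the Bronze layer. A DISTINCT on content_id + load_time will not remove re-imported rows, because load_time defaults to CURRENT_TIMESTAMP() and differs on every import; keep only the latest row per content_id by load_time instead.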
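Deduplication in LIST mode amounts to keeping the most recently loaded row for each content_id. In SQL this is typically written with ROW_NUMBER() OVER (PARTITION BY content_id ORDER BY load_time DESC) = 1. The small Python sketch below applies the same rule; the function name and the rows are illustrative.

```python
from datetime import datetime


def latest_per_content(rows: list[dict]) -> list[dict]:
    """Keep only the most recently loaded row for each content_id."""
    latest: dict[str, dict] = {}
    for row in rows:
        current = latest.get(row["content_id"])
        if current is None or row["load_time"] > current["load_time"]:
            latest[row["content_id"]] = row
    return list(latest.values())


ROWS = [
    # The same file imported twice in LIST mode: identical content, different load_time
    {"content_id": "GAME_730", "price": 0.0, "load_time": datetime(2026, 5, 1, 2, 0)},
    {"content_id": "GAME_570", "price": 0.0, "load_time": datetime(2026, 5, 1, 2, 0)},
    {"content_id": "GAME_730", "price": 0.0, "load_time": datetime(2026, 5, 2, 2, 0)},
]
DEDUPED = latest_per_content(ROWS)
```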