Ride-Hailing Operations Multi-City Supply-Demand Analysis Data Warehouse Best Practices

Integrate ride-hailing platform passenger order events, driver GPS trajectories, and historical trip data to build a city-level supply-demand analysis data warehouse that supports dynamic pricing and driver incentive strategy calculations. This guide uses the NYC Yellow Taxi Trip Data as the dataset and demonstrates the complete end-to-end construction of Kafka PIPE → ODS → DWD → DWS → ADS, covering six core capabilities: Kafka real-time ingestion, BloomFilter indexing, Dynamic Table partitioned incremental aggregation, Table Stream + incentive batch processing, SQL UDFs, and Lakehouse Studio task scheduling.


Overview

The core challenge for a ride-hailing data warehouse is to carry data through this chain: high-frequency GPS events + multi-city sharded orders → real-time supply-demand ratio → dynamic pricing signals → driver incentive settlements.

Singdata Lakehouse addresses these core problems with the following combination:

Problem | Solution
--------+---------
Driver GPS positions are reported at high frequency, with per-second writes | Kafka PIPE continuous ingestion; no custom consumer code required
The order system is distributed across multi-city MySQL shards | MySQL CDC full-database mirroring; a single PIPE merges multiple sources
ODS → DWD → DWS needs automatic incremental computation | Dynamic Tables: declarative SQL, with the system maintaining the refresh dependency chain
DWS needs time-period partitions (morning/evening peak, night, off-peak) for query acceleration | Partitioned Dynamic Tables with PARTITIONED BY (time_period)
Newly completed orders must trigger driver incentive batch processing | Table Stream captures incremental trips; a ZettaPark task consumes them
Reverse geocoding (coordinates → administrative area) | External Function calling a map API (this guide demonstrates the function mechanism with SQL UDFs instead)

SQL Commands Used

Command / Function | Purpose | Notes
-------------------+---------+------
CREATE TABLE | Build the ODS raw trip table and the incentive results table | Regular tables; the ODS table is the Dynamic Table upstream
CREATE BLOOMFILTER INDEX | Create a filter index on pickup_longitude | Accelerates point queries on high-cardinality coordinate columns
CREATE PIPE | Create a Kafka continuous ingestion pipeline | Real-time ingestion of GPS and order events
CREATE FUNCTION | Create SQL UDFs | calc_trip_duration_min, calc_surge_factor
CREATE DYNAMIC TABLE | Create DWD / DWS / ADS incremental computation tables | Declarative SQL; the system refreshes incrementally
CREATE TABLE STREAM | Create an APPEND_ONLY trip change stream | Captures newly completed orders to trigger incentive batch processing
REFRESH DYNAMIC TABLE | Manually trigger a single refresh | Used for initial builds or debugging

Prerequisites

All examples in this guide run under the best_practice_ride_hailing schema.

CREATE SCHEMA IF NOT EXISTS best_practice_ride_hailing;

Download the Dataset

kaggle datasets download -d elemento/nyc-yellow-taxi-trip-data \
  --unzip -p /tmp/ride_hailing/

After unzipping, you get 4 CSV files (January 2015, January–March 2016). This guide uses the first 100 rows of yellow_tripdata_2015-01.csv as the demo dataset, with 19 columns including pickup/dropoff time, location coordinates, trip distance, fare, tip, etc.
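The upload step below expects a file named nyc_trips_data.csv holding the header plus the first 100 data rows. Here is a minimal Python sketch for producing it with the standard csv module; the function name extract_head and the paths in the usage line are assumptions, not part of the original workflow.

```python
import csv
from itertools import islice


def extract_head(src_path: str, dst_path: str, n_rows: int = 100) -> int:
    """Copy the header row plus the first n_rows data rows of a CSV file.

    Returns the number of data rows written, which is smaller than n_rows
    only if the source file is shorter.
    """
    with open(src_path, newline="") as src, open(dst_path, "w", newline="") as dst:
        reader = csv.reader(src)
        writer = csv.writer(dst)
        writer.writerow(next(reader))  # keep the header row
        written = 0
        for row in islice(reader, n_rows):
            writer.writerow(row)
            written += 1
    return written
```

Usage (assumed paths): extract_head("/tmp/ride_hailing/yellow_tripdata_2015-01.csv", "/tmp/ride_hailing/nyc_trips_data.csv"). The header row is kept because the COPY INTO step uses 'header'='true'.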

Create the ODS Table

CREATE TABLE IF NOT EXISTS best_practice_ride_hailing.doc_ods_trips (
    vendor_id              INT,
    pickup_datetime        TIMESTAMP,
    dropoff_datetime       TIMESTAMP,
    passenger_count        INT,
    trip_distance          DOUBLE,
    pickup_longitude       DOUBLE,
    pickup_latitude        DOUBLE,
    rate_code_id           INT,
    store_fwd_flag         STRING,
    dropoff_longitude      DOUBLE,
    dropoff_latitude       DOUBLE,
    payment_type           INT,
    fare_amount            DOUBLE,
    extra                  DOUBLE,
    mta_tax                DOUBLE,
    tip_amount             DOUBLE,
    tolls_amount           DOUBLE,
    improvement_surcharge  DOUBLE,
    total_amount           DOUBLE,
    ingest_time            TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

ingest_time uses DEFAULT CURRENT_TIMESTAMP(), so it is filled in automatically at write time; upstream messages do not need to carry it.

Create BloomFilter Index

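Queries that filter on pickup coordinates are high-frequency operations on a ride-hailing platform. The high-cardinality pickup_longitude column is a good candidate for BloomFilter acceleration of equality (point) filters such as pickup_longitude = -73.993896. A BloomFilter does not help range predicates, such as the bounding boxes used in geofencing.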

CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_pickup_lon
ON TABLE best_practice_ride_hailing.doc_ods_trips (pickup_longitude);
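The following is a minimal, generic Bloom filter in Python, included only to show why this index type helps equality lookups: for an exact value it answers "definitely absent" or "possibly present". It is not Singdata's implementation; the bit-array size, hash count, and SHA-256-based hashing are arbitrary choices for illustration.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter: no false negatives, some false positives."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, value: float):
        # Derive num_hashes bit positions from salted SHA-256 digests of the value.
        key = repr(value).encode("utf-8")
        for i in range(self.num_hashes):
            digest = hashlib.sha256(bytes([i]) + key).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value: float) -> None:
        for pos in self._positions(value):
            self.bits[pos] = 1

    def might_contain(self, value: float) -> bool:
        # False means "definitely absent"; True means "possibly present".
        return all(self.bits[pos] for pos in self._positions(value))
```

A storage block whose filter returns False for pickup_longitude = -73.993896 can be skipped without reading it. A range predicate such as pickup_longitude BETWEEN -74.0 AND -73.9 cannot be answered this way, because the filter only records hashes of exact values.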


ODS Layer: Real-time Ingestion and Historical Data Import

Kafka PIPE Real-time Ingestion

In production, driver GPS positions and order status changes are reported to Kafka in real time. First create a raw JSON receiver table, then create the PIPE:

-- Raw table for receiving Kafka messages
CREATE TABLE IF NOT EXISTS best_practice_ride_hailing.doc_ods_kafka_raw (
    value STRING
);

-- Create Kafka PIPE
CREATE PIPE IF NOT EXISTS best_practice_ride_hailing.pipe_trip_events
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '30'
AS
COPY INTO best_practice_ride_hailing.doc_ods_kafka_raw
FROM (
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',   -- replace with actual broker address
        'nyc_trip_events',       -- topic name
        '',
        'cz_ride_consumer',      -- consumer group ID
        '', '', '', '',
        'raw', 'raw',
        0,
        map()
    )
);

Option 1: Write via Kafka (recommended)

When a Kafka environment is available, you can trigger PIPE ingestion by sending messages to the nyc_trip_events topic. The following kafka-python producer example shows how to construct and send a trip event message:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['<kafka-broker>:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

trip_event = {
    "vendor_id": 2,
    "pickup_datetime": "2015-01-15 19:05:39",
    "dropoff_datetime": "2015-01-15 19:23:42",
    "passenger_count": 1,
    "trip_distance": 1.59,
    "pickup_longitude": -73.993896,
    "pickup_latitude": 40.750110,
    "dropoff_longitude": -73.974784,
    "dropoff_latitude": 40.750617,
    "payment_type": 1,
    "fare_amount": 12.0,
    "tip_amount": 3.25,
    "total_amount": 17.05
}

producer.send('nyc_trip_events', value=trip_event)
producer.flush()  # block until the message is actually delivered
print(f"Sent trip event: {trip_event['pickup_datetime']}")

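The PIPE consumes messages in batches every BATCH_INTERVAL_IN_SECONDS seconds (30 in this example) and writes each raw message to doc_ods_kafka_raw, where downstream Dynamic Tables can parse the JSON into typed columns. The DWD layer in this guide reads from doc_ods_trips, which the file import below populates.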
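This guide does not show the parsing step itself. As a hedged illustration of what it has to do, here is a Python sketch that maps one message body onto the doc_ods_trips column order; the helper name parse_trip_event is hypothetical. Fields absent from the producer example above (such as rate_code_id, extra, or mta_tax) become None, which would load as NULL, and ingest_time is left to its column default.

```python
import json
from datetime import datetime

# Column order of doc_ods_trips, excluding ingest_time (filled by its DEFAULT).
ODS_COLUMNS = [
    "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count",
    "trip_distance", "pickup_longitude", "pickup_latitude", "rate_code_id",
    "store_fwd_flag", "dropoff_longitude", "dropoff_latitude", "payment_type",
    "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount",
]
TIMESTAMP_COLUMNS = {"pickup_datetime", "dropoff_datetime"}


def parse_trip_event(raw_value: str) -> tuple:
    """Map one Kafka message body onto the ODS column order (hypothetical helper)."""
    event = json.loads(raw_value)
    row = []
    for col in ODS_COLUMNS:
        val = event.get(col)  # missing keys become None (NULL)
        if val is not None and col in TIMESTAMP_COLUMNS:
            val = datetime.strptime(val, "%Y-%m-%d %H:%M:%S")
        row.append(val)
    return tuple(row)
```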

Option 2: Batch file import (when no Kafka environment is available)

If Kafka is not yet configured, save the data as a local CSV file, upload it to your User Volume with the SQL PUT command (for example, from cz-cli), and then load it with COPY INTO:

Import from local CSV (recommended)

-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/nyc_trips_data.csv' TO USER VOLUME FILE 'nyc_trips_data.csv';

-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_ride_hailing.doc_ods_trips
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('nyc_trips_data.csv');

Verify ODS row count:

SELECT COUNT(*) AS ods_row_count FROM best_practice_ride_hailing.doc_ods_trips;

ods_row_count
-------------
100


DWD Layer Dynamic Table: Trip Standardization and Feature Computation

The DWD layer performs two tasks on top of ODS:

  1. Calls the calc_trip_duration_min SQL UDF to compute trip duration, avoiding repetition of the time difference formula in multiple places
  2. Labels time periods (time_period) and calculates fare per mile (fare_per_mile) and tip rate (tip_rate_pct) for direct aggregation in the DWS layer

Create the Trip Duration UDF

CREATE OR REPLACE FUNCTION best_practice_ride_hailing.calc_trip_duration_min(
    pickup_ts  TIMESTAMP,
    dropoff_ts TIMESTAMP
)
RETURNS DOUBLE
AS ROUND((UNIX_TIMESTAMP(dropoff_ts) - UNIX_TIMESTAMP(pickup_ts)) / 60.0, 2);

Verify the function (first record: 19:05:39 → 19:23:42, a trip duration of 18.05 minutes):

SELECT best_practice_ride_hailing.calc_trip_duration_min( CAST('2015-01-15 19:05:39' AS TIMESTAMP), CAST('2015-01-15 19:23:42' AS TIMESTAMP) ) AS duration_min;

duration_min
------------
18.05
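For local sanity checks, the UDF's arithmetic can be mirrored in Python. This is a sketch outside the warehouse, and it assumes whole-second timestamps, as in the dataset. Python's round() operates on binary floats with round-half-to-even, so at exact .xx5 boundaries it can differ from SQL ROUND.

```python
from datetime import datetime


def calc_trip_duration_min(pickup_ts: datetime, dropoff_ts: datetime) -> float:
    """Python mirror of the calc_trip_duration_min SQL UDF."""
    seconds = (dropoff_ts - pickup_ts).total_seconds()
    return round(seconds / 60.0, 2)
```

With the first record, calc_trip_duration_min(datetime(2015, 1, 15, 19, 5, 39), datetime(2015, 1, 15, 19, 23, 42)) gives 18.05, matching the SQL result (1083 seconds / 60).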

Create the DWD Dynamic Table

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.dwd_trip_events
AS
SELECT
    vendor_id,
    pickup_datetime,
    dropoff_datetime,
    passenger_count,
    trip_distance,
    pickup_longitude,
    pickup_latitude,
    dropoff_longitude,
    dropoff_latitude,
    rate_code_id,
    store_fwd_flag,
    payment_type,
    fare_amount,
    tip_amount,
    tolls_amount,
    total_amount,
    -- Trip duration in minutes, computed by the SQL UDF
    best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) AS trip_duration_min,
    -- Time-period label by pickup hour
    CASE
        WHEN HOUR(pickup_datetime) BETWEEN 7 AND 9 THEN 'morning_peak'
        WHEN HOUR(pickup_datetime) BETWEEN 17 AND 19 THEN 'evening_peak'
        WHEN HOUR(pickup_datetime) BETWEEN 22 AND 23
          OR HOUR(pickup_datetime) BETWEEN 0 AND 5 THEN 'night'
        ELSE 'offpeak'
    END AS time_period,
    -- Fare per mile; +0.001 is a small smoothing term in the denominator
    CASE
        WHEN trip_distance > 0
         AND best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) > 0
        THEN ROUND(fare_amount / (trip_distance + 0.001), 2)
        ELSE NULL
    END AS fare_per_mile,
    -- Tip as a percentage of the total amount
    CASE
        WHEN best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) > 0
        THEN ROUND(tip_amount / (total_amount + 0.001) * 100, 2)
        ELSE NULL
    END AS tip_rate_pct,
    ingest_time
FROM best_practice_ride_hailing.doc_ods_trips
WHERE pickup_datetime IS NOT NULL
  AND dropoff_datetime IS NOT NULL
  AND trip_distance >= 0
  AND total_amount > 0;

Trigger the first manual refresh:

REFRESH DYNAMIC TABLE best_practice_ride_hailing.dwd_trip_events;

SELECT COUNT(*) AS dwd_count FROM best_practice_ride_hailing.dwd_trip_events;

dwd_count
---------
100

View sample evening peak trips:

SELECT vendor_id, pickup_datetime, trip_distance, trip_duration_min, time_period, fare_per_mile, tip_rate_pct FROM best_practice_ride_hailing.dwd_trip_events WHERE time_period = 'evening_peak' ORDER BY total_amount DESC LIMIT 5;

vendor_id | pickup_datetime     | trip_distance | trip_duration_min | time_period  | fare_per_mile | tip_rate_pct
----------+---------------------+---------------+-------------------+--------------+---------------+-------------
2         | 2015-01-15T19:05:42 | 18.06         | 43.42             | evening_peak | 2.88          | 9.36
1         | 2015-01-10T19:12:21 | 16.4          | 33.78             | evening_peak | 3.17          | 15.92
2         | 2015-01-15T19:05:40 | 8.33          | 22.63             | evening_peak | 3.12          | 19.61
2         | 2015-01-15T19:05:41 | 7.13          | 14.68             | evening_peak | 3.02          | 16.19
2         | 2015-01-15T19:05:43 | 0.01          | 0.02              | evening_peak | 5454.55       | 0

Results interpretation: The longest evening peak trip (18.06 miles, 43.42 minutes) earned approximately $2.88 per mile with a 9.4% tip rate. The extreme short-distance outlier (0.01 miles) has a distorted fare_per_mile because its denominator is tiny; add WHERE trip_distance > 0.5 for actual analysis.
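The DWD derivations can be mirrored in Python to check edge cases before refreshing. In this sketch (function names are hypothetical), label_time_period reproduces the CASE on pickup hour, and fare_per_mile reproduces the fare expression with an optional min_distance threshold: 0.0 matches the DWD logic, while 0.5 mimics the suggested analysis filter by returning None in place of SQL NULL, which AVG skips.

```python
from typing import Optional


def label_time_period(hour: int) -> str:
    """Mirror of the DWD CASE expression that buckets pickup hours."""
    if 7 <= hour <= 9:
        return "morning_peak"
    if 17 <= hour <= 19:
        return "evening_peak"
    if 22 <= hour <= 23 or 0 <= hour <= 5:
        return "night"
    return "offpeak"


def fare_per_mile(fare_amount: float, trip_distance: float,
                  duration_min: float, min_distance: float = 0.0) -> Optional[float]:
    """Mirror of the DWD fare_per_mile expression.

    min_distance=0.0 reproduces the DWD condition (trip_distance > 0);
    min_distance=0.5 returns None for very short trips.
    """
    if trip_distance > min_distance and duration_min > 0:
        return round(fare_amount / (trip_distance + 0.001), 2)
    return None
```

label_time_period(19) returns 'evening_peak' and label_time_period(3) returns 'night'; hours 6, 10 to 16, 20, and 21 fall through to 'offpeak'.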


DWS Layer Dynamic Table: Time-Period Supply-Demand Aggregation (Static Partitions)

The DWS layer partitions by time period (time_period), storing morning/evening peak, night, and off-peak in separate partitions. Queries can skip irrelevant partitions via partition pruning to accelerate supply-demand ratio calculations.

Create the Dynamic Pricing Multiplier UDF

CREATE OR REPLACE FUNCTION best_practice_ride_hailing.calc_surge_factor(
    trip_count  INT,
    time_period STRING
)
RETURNS DOUBLE
AS CASE
    WHEN time_period IN ('morning_peak', 'evening_peak') AND trip_count > 15 THEN 1.8
    WHEN time_period IN ('morning_peak', 'evening_peak') AND trip_count > 10 THEN 1.5
    WHEN time_period = 'night' AND trip_count > 10 THEN 1.3
    ELSE 1.0
END;

Verify:

SELECT best_practice_ride_hailing.calc_surge_factor(20, 'morning_peak') AS surge_peak, best_practice_ride_hailing.calc_surge_factor(8, 'offpeak') AS surge_offpeak, best_practice_ride_hailing.calc_surge_factor(12, 'night') AS surge_night;

surge_peak | surge_offpeak | surge_night
-----------+---------------+------------
1.8        | 1             | 1.3
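The same thresholds expressed in Python make the boundary behavior easy to test. This mirror of calc_surge_factor is a sketch for local checks only.

```python
PEAK_PERIODS = {"morning_peak", "evening_peak"}


def calc_surge_factor(trip_count: int, time_period: str) -> float:
    """Python mirror of the calc_surge_factor SQL UDF (demo thresholds)."""
    if time_period in PEAK_PERIODS and trip_count > 15:
        return 1.8
    if time_period in PEAK_PERIODS and trip_count > 10:
        return 1.5
    if time_period == "night" and trip_count > 10:
        return 1.3
    return 1.0
```

Because the comparisons are strict (>), exactly 15 peak-period trips yields 1.5 rather than 1.8, and exactly 10 yields 1.0.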

Create the Partitioned DWS Dynamic Table

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.dws_hourly_stats (
    hour_window,
    time_period,
    trip_count,
    total_passengers,
    avg_distance_miles,
    avg_duration_min,
    avg_fare,
    avg_tip_rate_pct,
    total_revenue,
    avg_fare_per_mile,
    credit_card_trips,
    cash_trips
)
PARTITIONED BY (time_period)
AS
SELECT
    DATE_TRUNC('hour', pickup_datetime)                 AS hour_window,
    time_period,
    COUNT(*)                                            AS trip_count,
    SUM(passenger_count)                                AS total_passengers,
    ROUND(AVG(trip_distance), 2)                        AS avg_distance_miles,
    ROUND(AVG(trip_duration_min), 2)                    AS avg_duration_min,
    ROUND(AVG(fare_amount), 2)                          AS avg_fare,
    ROUND(AVG(tip_rate_pct), 2)                         AS avg_tip_rate_pct,
    ROUND(SUM(total_amount), 2)                         AS total_revenue,
    ROUND(AVG(fare_per_mile), 2)                        AS avg_fare_per_mile,
    SUM(CASE WHEN payment_type = 1 THEN 1 ELSE 0 END)   AS credit_card_trips,
    SUM(CASE WHEN payment_type = 2 THEN 1 ELSE 0 END)   AS cash_trips
FROM best_practice_ride_hailing.dwd_trip_events
WHERE time_period = SESSION_CONFIGS()['dt.args.time_period']
GROUP BY DATE_TRUNC('hour', pickup_datetime), time_period;

Refresh each time period partition:

SET dt.args.time_period = 'morning_peak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'morning_peak');

SET dt.args.time_period = 'evening_peak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'evening_peak');

SET dt.args.time_period = 'night';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'night');

SET dt.args.time_period = 'offpeak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'offpeak');
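The four SET/REFRESH pairs differ only in the partition value, and each SET must name the same period as the PARTITION clause that follows it. Generating them from a single list keeps the two in sync. The Python helper below is hypothetical; its output is equivalent to the statements above and could be pasted in as task content.

```python
from typing import Sequence

TIME_PERIODS = ["morning_peak", "evening_peak", "night", "offpeak"]
DWS_TABLE = "best_practice_ride_hailing.dws_hourly_stats"


def build_partition_refresh_script(table: str, periods: Sequence[str]) -> str:
    """Emit one SET + REFRESH pair per time_period partition, in order."""
    statements = []
    for period in periods:
        statements.append(f"SET dt.args.time_period = '{period}';")
        statements.append(
            f"REFRESH DYNAMIC TABLE {table} PARTITION (time_period = '{period}');"
        )
    return "\n".join(statements)
```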

View supply-demand summary by time period:

SELECT hour_window, time_period, trip_count, avg_distance_miles, avg_fare, total_revenue, credit_card_trips, cash_trips FROM best_practice_ride_hailing.dws_hourly_stats ORDER BY hour_window, time_period;

hour_window         | time_period  | trip_count | avg_distance_miles | avg_fare | total_revenue | credit_card_trips | cash_trips
--------------------+--------------+------------+--------------------+----------+---------------+-------------------+-----------
2015-01-04T13:00:00 | offpeak      | 17         | 2.16               | 9.21     | 198.26        | 10                | 7
2015-01-10T19:00:00 | evening_peak | 2          | 9.65               | 32.75    | 77.1          | 1                 | 1
2015-01-10T20:00:00 | offpeak      | 14         | 3.3                | 13.68    | 237.43        | 7                 | 7
2015-01-15T14:00:00 | offpeak      | 22         | 4.19               | 17.75    | 481.82        | 11                | 11
2015-01-15T19:00:00 | evening_peak | 22         | 3.22               | 15.84    | 447.35        | 19                | 3
2015-01-25T00:00:00 | night        | 21         | 2.65               | 11.76    | 308.95        | 15                | 6
2015-01-26T12:00:00 | offpeak      | 2          | 2.65               | 11.25    | 25.65         | 1                 | 1

Results interpretation:

  • The January 15th evening peak hour (22 trips, average fare $15.84) and the 14:00 off-peak hour (22 trips, $17.75) have the same trip count, but the off-peak trips are longer (4.19 vs. 3.22 miles) and generate higher total revenue ($481.82 vs. $447.35).
  • Credit card payments dominate the January 15th evening peak hour (19/22 = 86%) and also lead at night (15/21 = 71%), which can inform targeted promotions for card payment scenarios.

Supply-demand summary (combined by time period):

SELECT
    time_period,
    SUM(trip_count)              AS total_trips,
    ROUND(AVG(avg_fare), 2)      AS mean_hourly_avg_fare,
    ROUND(SUM(total_revenue), 2) AS total_revenue
FROM best_practice_ride_hailing.dws_hourly_stats
GROUP BY time_period
ORDER BY total_trips DESC;

time_period  | total_trips | mean_hourly_avg_fare | total_revenue
-------------+-------------+----------------------+--------------
offpeak      | 55          | 12.97                | 943.16
evening_peak | 24          | 24.3                 | 524.45
night        | 21          | 11.76                | 308.95
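Averaging the hourly avg_fare values gives every hour equal weight, so the 2-trip 19:00 hour on January 10th counts as much as the 22-trip 19:00 hour on January 15th. The Python sketch below compares that simple mean with a trip-weighted mean, using the evening_peak values from the hourly output above; the result is approximate because the stored hourly averages are already rounded. In SQL, the trip-weighted form would be ROUND(SUM(avg_fare * trip_count) / SUM(trip_count), 2).

```python
from typing import Sequence


def simple_mean(values: Sequence[float]) -> float:
    """Unweighted mean: every hour counts equally."""
    return sum(values) / len(values)


def trip_weighted_mean(hourly_avgs: Sequence[float], trip_counts: Sequence[int]) -> float:
    """Mean of hourly averages weighted by each hour's trip count."""
    total_trips = sum(trip_counts)
    return sum(a * n for a, n in zip(hourly_avgs, trip_counts)) / total_trips


# evening_peak rows from the hourly DWS output: avg_fare and trip_count per hour
evening_avg_fares = [32.75, 15.84]
evening_trip_counts = [2, 22]

unweighted = simple_mean(evening_avg_fares)                            # about 24.30
weighted = trip_weighted_mean(evening_avg_fares, evening_trip_counts)  # about 17.25
```

For evening_peak the simple mean is about 24.30 while the trip-weighted mean is about 17.25, so the unweighted figure overstates the typical peak fare in this sample.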


ADS Layer Dynamic Table: Trip Efficiency and Driver Incentive Data Mart

The ADS layer aggregates at day × time period × payment type granularity, producing trip efficiency profiles and distance segment labels that dynamic pricing models and driver incentive programs can consume directly.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.ads_trip_efficiency
AS
SELECT
    DATE(pickup_datetime)             AS trip_date,
    time_period,
    payment_type,
    COUNT(*)                          AS trip_count,
    ROUND(AVG(trip_distance), 2)      AS avg_distance_miles,
    ROUND(AVG(trip_duration_min), 2)  AS avg_duration_min,
    ROUND(AVG(fare_per_mile), 2)      AS avg_fare_per_mile,
    ROUND(AVG(tip_rate_pct), 2)       AS avg_tip_rate_pct,
    ROUND(SUM(total_amount), 2)       AS total_revenue,
    ROUND(AVG(total_amount), 2)       AS avg_trip_revenue,
    CASE
        WHEN AVG(trip_distance) >= 5 THEN 'long_haul'
        WHEN AVG(trip_distance) >= 2 THEN 'medium'
        ELSE 'short'
    END                               AS distance_segment
FROM best_practice_ride_hailing.dwd_trip_events
GROUP BY DATE(pickup_datetime), time_period, payment_type;

Trigger manual refresh:

REFRESH DYNAMIC TABLE best_practice_ride_hailing.ads_trip_efficiency;

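View the highest-revenue time period × payment type combinations (payment_type is not selected, so rows that share a date and time period belong to different payment types):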

SELECT trip_date, time_period, trip_count, avg_distance_miles, avg_fare_per_mile, avg_tip_rate_pct, total_revenue, distance_segment FROM best_practice_ride_hailing.ads_trip_efficiency ORDER BY total_revenue DESC LIMIT 8;

trip_date  | time_period  | trip_count | avg_distance_miles | avg_fare_per_mile | avg_tip_rate_pct | total_revenue | distance_segment
-----------+--------------+------------+--------------------+-------------------+------------------+---------------+-----------------
2015-01-15 | evening_peak | 19         | 3.31               | 291.86            | 14.43            | 402.95        | medium
2015-01-15 | offpeak      | 11         | 5.15               | 6.06              | 16.85            | 310.02        | long_haul
2015-01-25 | night        | 15         | 2.57               | 4.7               | 15.34            | 226.15        | medium
2015-01-15 | offpeak      | 11         | 3.23               | 5.97              | 0                | 171.8         | medium
2015-01-04 | offpeak      | 10         | 3.07               | 4.92              | 14.07            | 152.66        | medium
2015-01-10 | offpeak      | 7          | 2.73               | 6.29              | 14.4             | 120.5         | medium
2015-01-10 | offpeak      | 7          | 3.87               | 5.99              | 0                | 116.93        | medium
2015-01-25 | night        | 6          | 2.84               | 24.39             | 0                | 82.8          | medium

Results interpretation:

  • January 15th evening peak credit card trips (19 trips) have the highest total revenue ($402.95) and an average tip rate of 14.43%, making this the priority time period for incentives.
  • The extreme avg_fare_per_mile of 291.86 in the first row is skewed by a single extremely short trip (0.01 miles) in that group. Add WHERE trip_distance > 0.5 for production use.
  • Off-peak long-distance trips (the January 15th off-peak group averaging 5.15 miles, with $310.02 in revenue) deserve a dedicated mileage incentive pool in incentive allocation.
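The ADS grouping and its distance_segment label can be sketched in Python to show that the label is derived from each group's average distance rather than from individual trips. Function names and the input tuple layout (trip_date, time_period, payment_type, trip_distance, total_amount) are assumptions for illustration.

```python
from collections import defaultdict
from typing import Iterable, List, Tuple


def distance_segment(avg_distance: float) -> str:
    """Mirror of the ADS CASE on AVG(trip_distance)."""
    if avg_distance >= 5:
        return "long_haul"
    if avg_distance >= 2:
        return "medium"
    return "short"


def build_ads_rows(trips: Iterable[Tuple[str, str, int, float, float]]) -> List[tuple]:
    """Aggregate trips at (trip_date, time_period, payment_type) granularity.

    Each output row is (trip_date, time_period, payment_type, trip_count,
    avg_distance_miles, total_revenue, distance_segment), sorted by revenue.
    """
    groups = defaultdict(list)
    for trip_date, period, payment_type, distance, amount in trips:
        groups[(trip_date, period, payment_type)].append((distance, amount))
    rows = []
    for key, items in groups.items():
        avg_dist = sum(d for d, _ in items) / len(items)
        revenue = round(sum(a for _, a in items), 2)
        rows.append((*key, len(items), round(avg_dist, 2), revenue, distance_segment(avg_dist)))
    return sorted(rows, key=lambda r: r[5], reverse=True)
```

Because the label uses the group average, a group that mixes a few long trips with many short ones can still be labeled medium; segmenting individual trips before aggregating is the alternative if per-trip mileage incentives are needed.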

Table Stream + Incentive Batch Processing

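The driver incentive calculation for a ride-hailing platform follows this flow: each new batch of completed orders → count each driver's trips for the day → determine the incentive tier → write to the incentive results table. Table Stream + ZettaPark Task fits this pattern. Because the NYC dataset has no driver identifier, this demo groups by vendor_id (the taxi technology provider) as a stand-in for the driver.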

Create a Table Stream

CREATE TABLE STREAM IF NOT EXISTS best_practice_ride_hailing.stream_new_trips ON TABLE best_practice_ride_hailing.doc_ods_trips WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');

Once new rows are appended to doc_ods_trips, the Stream exposes those incremental rows. In this demo, 10 trips were appended after the Stream was created:

SELECT COUNT(*) AS stream_rows FROM best_practice_ride_hailing.stream_new_trips;

stream_rows
-----------
10

SELECT vendor_id, pickup_datetime, trip_distance, total_amount, fare_amount, tip_amount FROM best_practice_ride_hailing.stream_new_trips ORDER BY pickup_datetime LIMIT 5;

vendor_id | pickup_datetime     | trip_distance | total_amount | fare_amount | tip_amount
----------+---------------------+---------------+--------------+-------------+-----------
1         | 2015-01-26T12:41:09 | 0.5           | 5.3          | 4.5         | 0
1         | 2015-01-26T12:41:09 | 0.8           | 5.8          | 5           | 0
1         | 2015-01-26T12:41:10 | 1.1           | 18.35        | 14.5        | 3.05
1         | 2015-01-26T12:41:10 | 2.9           | 14.8         | 14          | 0
1         | 2015-01-26T12:41:11 | 0.3           | 4.8          | 4           | 0

Create the Incentive Results Table and Consume the Stream

CREATE TABLE IF NOT EXISTS best_practice_ride_hailing.doc_driver_incentive_batch (
    batch_date      DATE,
    vendor_id       INT,
    new_trip_count  INT,
    new_revenue     DOUBLE,
    avg_trip_value  DOUBLE,
    incentive_tier  STRING,
    processed_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Consume the Stream and write incentive results:

INSERT INTO best_practice_ride_hailing.doc_driver_incentive_batch
    (batch_date, vendor_id, new_trip_count, new_revenue, avg_trip_value, incentive_tier)
SELECT
    DATE(pickup_datetime)         AS batch_date,
    vendor_id,
    COUNT(*)                      AS new_trip_count,
    ROUND(SUM(total_amount), 2)   AS new_revenue,
    ROUND(AVG(total_amount), 2)   AS avg_trip_value,
    CASE
        WHEN COUNT(*) >= 5 THEN 'gold'
        WHEN COUNT(*) >= 3 THEN 'silver'
        ELSE 'bronze'
    END                           AS incentive_tier
FROM best_practice_ride_hailing.stream_new_trips
GROUP BY DATE(pickup_datetime), vendor_id;

SELECT batch_date, vendor_id, new_trip_count, new_revenue, incentive_tier FROM best_practice_ride_hailing.doc_driver_incentive_batch;

batch_date | vendor_id | new_trip_count | new_revenue | incentive_tier
-----------+-----------+----------------+-------------+---------------
2015-01-26 | 1         | 10             | 108.86      | gold

Results interpretation: Vendor 1 added 10 new trips on this day with total revenue of $108.86, reaching the gold incentive tier (≥5 trips). Consuming the Stream with INSERT advances its offset automatically, so the next INSERT processes only rows appended after that point; no manual cursor management is needed.
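The offset behavior can be modeled with a toy append-only stream in Python. This is a conceptual sketch, not the Table Stream implementation: the class and function names are hypothetical, peek() leaves the offset in place (as the SELECT queries above do), and consume() advances it the way the INSERT does. The tier thresholds match the INSERT statement.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Trip = Tuple[str, int]  # (batch_date, vendor_id): the only fields the tiering needs


class AppendOnlyStream:
    """Toy append-only change stream over a Python list (illustration only)."""

    def __init__(self, table: List[Trip]) -> None:
        self.table = table
        self.offset = len(table)  # rows present at creation are not visible

    def peek(self) -> List[Trip]:
        """Read pending rows without moving the offset (like a plain SELECT)."""
        return self.table[self.offset:]

    def consume(self) -> List[Trip]:
        """Return pending rows and advance the offset (like INSERT ... SELECT)."""
        end = len(self.table)
        rows = self.table[self.offset:end]
        self.offset = end
        return rows


def incentive_tier(trip_count: int) -> str:
    """Same thresholds as the CASE in the INSERT statement."""
    if trip_count >= 5:
        return "gold"
    if trip_count >= 3:
        return "silver"
    return "bronze"


def run_incentive_batch(stream: AppendOnlyStream) -> Dict[Trip, Tuple[int, str]]:
    """Consume new trips and return {(batch_date, vendor_id): (count, tier)}."""
    counts: Dict[Trip, int] = defaultdict(int)
    for batch_date, vendor_id in stream.consume():
        counts[(batch_date, vendor_id)] += 1
    return {key: (n, incentive_tier(n)) for key, n in counts.items()}
```

Each downstream consumer would hold its own AppendOnlyStream over the same table, which corresponds to creating one Stream object per consumer.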


Studio Task Scheduling

In this guide, periodic Dynamic Table refreshes are scheduled through Studio Tasks rather than a REFRESH INTERVAL clause in the DDL. The following commands create three refresh tasks under the skill_test profile:

# 1. DWD trip standardization refresh task (every 15 minutes)
cz-cli task create refresh_dwd_trip_events --type SQL -p skill_test
# Returns: {"data":{"id":10354660,...}}
cz-cli task save-content 10354660 \
  --content "REFRESH DYNAMIC TABLE best_practice_ride_hailing.dwd_trip_events;" \
  -p skill_test
cz-cli task save-cron 10354660 --cron "0 */15 * * * ?" -p skill_test

# 2. DWS time-period partition refresh task (every 30 minutes)
cz-cli task create refresh_dws_hourly_stats --type SQL -p skill_test
# Returns: {"data":{"id":10354661,...}}
cz-cli task save-content 10354661 \
  --content "SET dt.args.time_period = 'morning_peak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'morning_peak');
SET dt.args.time_period = 'evening_peak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'evening_peak');
SET dt.args.time_period = 'night';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'night');
SET dt.args.time_period = 'offpeak';
REFRESH DYNAMIC TABLE best_practice_ride_hailing.dws_hourly_stats PARTITION (time_period = 'offpeak');" \
  -p skill_test
cz-cli task save-cron 10354661 --cron "0 */30 * * * ?" -p skill_test

# 3. ADS trip efficiency refresh task (daily at 1 AM)
cz-cli task create refresh_ads_trip_efficiency --type SQL -p skill_test
# Returns: {"data":{"id":10353704,...}}
cz-cli task save-content 10353704 \
  --content "REFRESH DYNAMIC TABLE best_practice_ride_hailing.ads_trip_efficiency;" \
  -p skill_test
cz-cli task save-cron 10353704 --cron "0 0 1 * * ?" -p skill_test
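The --cron values appear to use a six-field, Quartz-style layout: seconds, minutes, hours, day of month, month, day of week, with ? meaning "no specific value". The hypothetical Python helper below expands the time fields to sanity-check a schedule before saving it. It supports only *, */n, ?, and single numbers, and it ignores the date fields, so it is not a full cron parser.

```python
from typing import List, Tuple


def expand_field(field: str, low: int, high: int) -> List[int]:
    """Expand one cron field into the concrete values it matches."""
    if field in ("*", "?"):
        return list(range(low, high + 1))
    if field.startswith("*/"):
        step = int(field[2:])
        return list(range(low, high + 1, step))
    value = int(field)
    if not low <= value <= high:
        raise ValueError(f"{value} outside [{low}, {high}]")
    return [value]


def daily_fire_times(expr: str) -> List[Tuple[int, int, int]]:
    """List (hour, minute, second) triples a 6-field expression fires at each day."""
    sec, minute, hour, _dom, _month, _dow = expr.split()
    return [
        (h, m, s)
        for h in expand_field(hour, 0, 23)
        for m in expand_field(minute, 0, 59)
        for s in expand_field(sec, 0, 59)
    ]
```

daily_fire_times("0 */15 * * * ?") yields 96 firing times per day, "0 */30 * * * ?" yields 48, and "0 0 1 * * ?" yields only 01:00:00.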


Data Warehouse Object Overview

After full construction, objects under the best_practice_ride_hailing schema:

SHOW TABLES IN best_practice_ride_hailing;

schema_name                | table_name                 | is_dynamic
---------------------------+----------------------------+-----------
best_practice_ride_hailing | doc_ods_trips              | false
best_practice_ride_hailing | doc_ods_kafka_raw          | false
best_practice_ride_hailing | doc_driver_incentive_batch | false
best_practice_ride_hailing | dwd_trip_events            | true
best_practice_ride_hailing | dws_hourly_stats           | true
best_practice_ride_hailing | ads_trip_efficiency        | true

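Data flow architecture:

  • Kafka topic nyc_trip_events → pipe_trip_events → doc_ods_kafka_raw
  • Local CSV → User Volume → COPY INTO → doc_ods_trips
  • doc_ods_trips → dwd_trip_events → dws_hourly_stats (partitioned by time_period) and ads_trip_efficiency
  • doc_ods_trips → stream_new_trips → doc_driver_incentive_batch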


Notes

  • BloomFilter indexes do not cover existing data: CREATE BLOOMFILTER INDEX applies only to data written after the index is created, so historical trip data already in the table is not indexed. The BloomFilter type does not support BUILD INDEX, so covering existing data requires rebuilding the table.

  • Partitioned Dynamic Tables use static partition declarations: dws_hourly_stats is declared with PARTITIONED BY (time_period), and its query reads the target partition from SESSION_CONFIGS()['dt.args.time_period'], so each per-partition REFRESH must be preceded by SET dt.args.time_period = '<period>'. REFRESH INTERVAL cannot be written directly in the DDL; scheduling is managed through Studio Tasks.

  • Table Stream offsets advance automatically after consumption: each INSERT INTO ... SELECT FROM stream_new_trips advances the consumption offset. If several downstream consumers need the same changes, create a separate Stream object for each one. Consumers sharing one Stream compete for the same offset, so rows consumed by one are no longer visible to the others.

  • calc_surge_factor thresholds are example values: the current thresholds (during peak periods, more than 10 trips triggers 1.5× and more than 15 triggers 1.8×; at night, more than 10 triggers 1.3×) were chosen for the demo dataset. In production, calibrate them dynamically from each city's historical supply-demand data.

  • The first Dynamic Table refresh is a full snapshot: the first REFRESH of dwd_trip_events scans doc_ods_trips in full. Subsequent refreshes are incremental and process only rows that are new or changed since the last refresh. Writing to the ODS layer with INSERT OVERWRITE causes downstream Dynamic Tables to fall back to full refresh.
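The difference between the first full refresh and later incremental refreshes can be illustrated with a toy per-period trip count in Python. It is a conceptual model of append-only incremental maintenance, not how the Dynamic Table engine works; the names are hypothetical. The incremental version folds in only rows past its last position and stays equal to a full recount, until the source is overwritten and the saved position no longer describes the data.

```python
from collections import Counter
from typing import List


def full_refresh(periods: List[str]) -> Counter:
    """Recount trips per time_period from scratch (full refresh)."""
    return Counter(periods)


class IncrementalCount:
    """Maintain per-period trip counts by folding in appended rows only."""

    def __init__(self) -> None:
        self.state: Counter = Counter()
        self.processed = 0  # number of source rows already folded in

    def refresh(self, periods: List[str]) -> Counter:
        # Only rows beyond the last processed position are read.
        for period in periods[self.processed:]:
            self.state[period] += 1
        self.processed = len(periods)
        return self.state
```

After rows are appended, IncrementalCount.refresh matches full_refresh; after the source list is replaced wholesale, it no longer does, which is why an overwrite requires a full recomputation.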