Ride-Hailing Operations Multi-City Supply-Demand Analysis Data Warehouse Best Practices
Integrate ride-hailing platform passenger order events, driver GPS trajectories, and historical trip data to build a city-level supply-demand analysis data warehouse that supports dynamic pricing and driver incentive strategy calculations. This guide uses the NYC Yellow Taxi Trip Data as the dataset and demonstrates the end-to-end construction of Kafka PIPE → ODS → DWD → DWS → ADS, covering the following core capabilities: Kafka real-time ingestion, Dynamic Table partitioned incremental aggregation, Table Stream + incentive batch processing, SQL UDFs, and Lakehouse Studio task scheduling.
Overview
The core challenge for a ride-hailing data warehouse is: high-frequency GPS events + multi-city sharded orders → real-time supply-demand ratio → dynamic pricing signals → driver incentive settlements.
Singdata Lakehouse addresses these core problems with the following combination:
| Problem | Solution |
| --- | --- |
| Driver GPS positions reported at high frequency, second-level writes | Kafka PIPE continuous ingestion, no need to write custom consumers |
| Order system distributed across multi-city MySQL shards | MySQL CDC full database mirroring, single PIPE merges multiple sources |
After unzipping, you get 4 CSV files (January 2015, January–March 2016). This guide uses the first 100 rows of yellow_tripdata_2015-01.csv as the demo dataset, with 19 columns including pickup/dropoff time, location coordinates, trip distance, fare, tip, etc.
ingest_time uses DEFAULT CURRENT_TIMESTAMP(), so it is populated automatically when Kafka PIPE writes a row and does not need to be carried in the message body.
Create BloomFilter Index
Geofencing queries by pickup coordinates are high-frequency operations on the ride-hailing platform. The high-cardinality pickup_longitude column is suitable for BloomFilter acceleration.
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_pickup_lon
ON TABLE doc_ods_trips (pickup_longitude);
⚠️ Note: CREATE BLOOMFILTER INDEX requires the same schema context as the target table. Execute USE SCHEMA best_practice_ride_hailing first, or pass the -s best_practice_ride_hailing parameter when running via cz-cli. Otherwise, the statement fails with an "index and table must in the same schema" error.
ODS Layer: Real-time Ingestion and Historical Data Import
Kafka PIPE Real-time Ingestion
In production, driver GPS positions and order status changes are reported to Kafka in real time. First create a raw JSON receiver table, then create the PIPE:
-- Raw table for receiving Kafka messages
CREATE TABLE IF NOT EXISTS best_practice_ride_hailing.doc_ods_kafka_raw (
    value STRING
);
-- Create Kafka PIPE
CREATE PIPE IF NOT EXISTS best_practice_ride_hailing.pipe_trip_events
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '30'
AS
COPY INTO best_practice_ride_hailing.doc_ods_kafka_raw
FROM (
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',  -- replace with actual broker address
        'nyc_trip_events',      -- topic name
        '',
        'cz_ride_consumer',     -- consumer group ID
        '', '', '', '',
        'raw', 'raw',
        0,
        map()
    )
);
💡 Tip: In READ_KAFKA within the PIPE DDL, positional parameters 5–8 (start/end offsets, timestamps) must be left empty because the PIPE runtime manages them automatically.
Option 1: Write via Kafka (recommended)
When a Kafka environment is available, you can trigger PIPE ingestion by sending messages to the nyc_trip_events topic. The following kafka-python producer example shows how to construct and send a trip event message:
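A minimal sketch of such a producer, assuming JSON-encoded message bodies whose field names mirror the ODS columns. The sample values, the helper names build_trip_event and send_trip_events, and the choice of JSON are illustrative assumptions rather than a format confirmed by this guide. The kafka-python import is deferred so the serialization helper can be tried without a broker.

```python
import json

TOPIC = "nyc_trip_events"  # topic the PIPE reads from


def build_trip_event(row: dict) -> bytes:
    """Serialize one trip record as a UTF-8 JSON message body."""
    # ingest_time is omitted on purpose: the table default fills it in on write.
    return json.dumps(row).encode("utf-8")


def send_trip_events(rows, bootstrap_servers):
    """Send each row to the topic. Requires `pip install kafka-python`."""
    # Imported lazily so build_trip_event can be used without the package.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    for row in rows:
        producer.send(TOPIC, value=build_trip_event(row))
    producer.flush()  # wait until all buffered messages are sent
    producer.close()


# Illustrative record; field names are assumed to match the ODS columns.
sample_trip = {
    "vendor_id": 1,
    "pickup_datetime": "2015-01-15 19:05:39",
    "dropoff_datetime": "2015-01-15 19:23:42",
    "passenger_count": 1,
    "trip_distance": 3.2,
    "fare_amount": 14.0,
    "tip_amount": 2.0,
    "total_amount": 17.3,
}

payload = build_trip_event(sample_trip)
print(payload.decode("utf-8"))
# To actually send it: send_trip_events([sample_trip], "<kafka-broker>:9092")
```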
The PIPE batch-consumes every BATCH_INTERVAL_IN_SECONDS seconds. Messages are automatically written to doc_ods_kafka_raw and then parsed by downstream Dynamic Tables.
Option 2: INSERT simulation (when no Kafka environment is available)
If Kafka is not yet configured, save the data as a local CSV file first, upload it to User Volume via cz-cli, then import using COPY INTO (recommended):
💡 Tip: The examples below use cz-cli (Singdata Lakehouse command-line tool) to perform operations. If cz-cli is not installed, refer to the cz-cli setup guide. If you prefer not to use the CLI, you can also execute SQL in Lakehouse Studio under Development -> SQL Editor and configure and trigger scheduled tasks under Studio -> Tasks.
Import from local CSV (recommended)
-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/nyc_trips_data.csv' TO USER VOLUME FILE 'nyc_trips_data.csv';
-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_ride_hailing.doc_ods_trips
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('nyc_trips_data.csv');
Verify ODS row count:
SELECT COUNT(*) AS ods_row_count FROM best_practice_ride_hailing.doc_ods_trips;
ods_row_count
-------------
100
DWD Layer Dynamic Table: Trip Standardization and Feature Computation
The DWD layer performs two tasks on top of ODS:
Calls the calc_trip_duration_min SQL UDF to compute trip duration, avoiding repetition of the time difference formula in multiple places
Labels time periods (time_period) and calculates fare per mile (fare_per_mile) and tip rate (tip_rate_pct) for direct aggregation in the DWS layer
Create the Trip Duration UDF
CREATE OR REPLACE FUNCTION best_practice_ride_hailing.calc_trip_duration_min(
pickup_ts TIMESTAMP,
dropoff_ts TIMESTAMP
)
RETURNS DOUBLE
AS ROUND((UNIX_TIMESTAMP(dropoff_ts) - UNIX_TIMESTAMP(pickup_ts)) / 60.0, 2);
Verify the function (first record: 19:05:39 β 19:23:42, trip duration 18.05 minutes):
SELECT best_practice_ride_hailing.calc_trip_duration_min(
CAST('2015-01-15 19:05:39' AS TIMESTAMP),
CAST('2015-01-15 19:23:42' AS TIMESTAMP)
) AS duration_min;
duration_min
------------
18.05
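For a quick cross-check outside the warehouse, the same arithmetic can be sketched in Python. This is an illustrative equivalent of the UDF, not part of the pipeline; rounding behavior at exact half values may differ between Python's round and SQL ROUND.

```python
from datetime import datetime


def calc_trip_duration_min(pickup_ts: datetime, dropoff_ts: datetime) -> float:
    """Trip duration in minutes, rounded to 2 decimals (mirrors the SQL UDF)."""
    seconds = (dropoff_ts - pickup_ts).total_seconds()
    return round(seconds / 60.0, 2)


FMT = "%Y-%m-%d %H:%M:%S"
pickup = datetime.strptime("2015-01-15 19:05:39", FMT)
dropoff = datetime.strptime("2015-01-15 19:23:42", FMT)

# 18 min 3 s = 1083 s, and 1083 / 60 = 18.05
print(calc_trip_duration_min(pickup, dropoff))  # 18.05
```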
Create the DWD Dynamic Table
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.dwd_trip_events
AS
SELECT
vendor_id,
pickup_datetime,
dropoff_datetime,
passenger_count,
trip_distance,
pickup_longitude,
pickup_latitude,
dropoff_longitude,
dropoff_latitude,
rate_code_id,
store_fwd_flag,
payment_type,
fare_amount,
tip_amount,
tolls_amount,
total_amount,
best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) AS trip_duration_min,
CASE
WHEN HOUR(pickup_datetime) BETWEEN 7 AND 9 THEN 'morning_peak'
WHEN HOUR(pickup_datetime) BETWEEN 17 AND 19 THEN 'evening_peak'
WHEN HOUR(pickup_datetime) BETWEEN 22 AND 23
OR HOUR(pickup_datetime) BETWEEN 0 AND 5 THEN 'night'
ELSE 'offpeak'
END AS time_period,
CASE
WHEN trip_distance > 0
AND best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) > 0
THEN ROUND(fare_amount / (trip_distance + 0.001), 2)
ELSE NULL
END AS fare_per_mile,
CASE
WHEN best_practice_ride_hailing.calc_trip_duration_min(pickup_datetime, dropoff_datetime) > 0
THEN ROUND(tip_amount / (total_amount + 0.001) * 100, 2)
ELSE NULL
END AS tip_rate_pct,
ingest_time
FROM best_practice_ride_hailing.doc_ods_trips
WHERE pickup_datetime IS NOT NULL
AND dropoff_datetime IS NOT NULL
AND trip_distance >= 0
AND total_amount > 0;
⚠️ Note: The CREATE DYNAMIC TABLE DDL does not include REFRESH INTERVAL. Refresh scheduling is managed through Studio Tasks (see the "Studio Task Scheduling" section below), allowing data quality checks and alert rules to be attached to the same task.
SELECT COUNT(*) AS dwd_count FROM best_practice_ride_hailing.dwd_trip_events;
dwd_count
---------
100
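The time_period labeling used by dwd_trip_events can be checked in isolation with a small Python sketch of the same CASE logic. The function name classify_time_period is hypothetical; the hour boundaries are copied from the DDL. It makes the edge cases explicit: hours 6, 10 to 16, 20, and 21 all fall into offpeak.

```python
def classify_time_period(hour: int) -> str:
    """Map an hour of day (0-23) to the time_period label used in the DWD layer."""
    if 7 <= hour <= 9:
        return "morning_peak"
    if 17 <= hour <= 19:
        return "evening_peak"
    if 22 <= hour <= 23 or 0 <= hour <= 5:
        return "night"
    return "offpeak"


# Print the label for every hour to make the boundaries visible.
for h in range(24):
    print(f"{h:02d}:00 -> {classify_time_period(h)}")
```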
View sample evening peak trips:
SELECT vendor_id, pickup_datetime, trip_distance, trip_duration_min,
time_period, fare_per_mile, tip_rate_pct
FROM best_practice_ride_hailing.dwd_trip_events
WHERE time_period = 'evening_peak'
ORDER BY total_amount DESC
LIMIT 5;
Results interpretation: Evening peak long-distance trip (18 miles, 43 minutes) has a fare of approximately $2.88/mile with a 9.4% tip rate. The extreme short-distance outlier (0.01 miles) has a distorted fare_per_mile due to an extremely small denominator; add WHERE trip_distance > 0.5 when doing actual analysis.
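To see why very short trips distort fare_per_mile, the following sketch applies the same formula as the DWD layer to two hypothetical trips. The fare values are made up for illustration and are not rows from the dataset.

```python
def fare_per_mile(fare_amount: float, trip_distance: float, duration_min: float):
    """Same guard and formula as the DWD CASE expression; None stands in for NULL."""
    if trip_distance > 0 and duration_min > 0:
        return round(fare_amount / (trip_distance + 0.001), 2)
    return None


# Hypothetical trips: (fare_amount, trip_distance in miles, duration in minutes)
typical_trip = (12.0, 1.59, 18.05)
tiny_trip = (3.0, 0.01, 2.0)

print(fare_per_mile(*typical_trip))  # 7.54
print(fare_per_mile(*tiny_trip))     # 272.73, dominated by the tiny denominator

# Filtering on distance, as suggested above, drops the outlier before averaging.
trips = [typical_trip, tiny_trip]
kept = [t for t in trips if t[1] > 0.5]
print(len(kept))  # 1
```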
The DWS layer partitions by time period (time_period), storing morning/evening peak, night, and off-peak in separate partitions. Queries can skip irrelevant partitions via partition pruning to accelerate supply-demand ratio calculations.
Create the Dynamic Pricing Multiplier UDF
CREATE OR REPLACE FUNCTION best_practice_ride_hailing.calc_surge_factor(
trip_count INT,
time_period STRING
)
RETURNS DOUBLE
AS CASE
WHEN time_period IN ('morning_peak', 'evening_peak') AND trip_count > 15 THEN 1.8
WHEN time_period IN ('morning_peak', 'evening_peak') AND trip_count > 10 THEN 1.5
WHEN time_period = 'night' AND trip_count > 10 THEN 1.3
ELSE 1.0
END;
Verify:
SELECT
best_practice_ride_hailing.calc_surge_factor(20, 'morning_peak') AS surge_peak,
best_practice_ride_hailing.calc_surge_factor(8, 'offpeak') AS surge_offpeak,
best_practice_ride_hailing.calc_surge_factor(12, 'night') AS surge_night;
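The branch order of calc_surge_factor matters, since the first matching WHEN wins. A Python sketch of the same rules, intended only for reasoning about the thresholds, shows the values the verification query should return and how the boundaries behave:

```python
def calc_surge_factor(trip_count: int, time_period: str) -> float:
    """Mirror of the SQL UDF: the first matching rule determines the multiplier."""
    is_peak = time_period in ("morning_peak", "evening_peak")
    if is_peak and trip_count > 15:
        return 1.8
    if is_peak and trip_count > 10:
        return 1.5
    if time_period == "night" and trip_count > 10:
        return 1.3
    return 1.0


# Same inputs as the verification query above.
print(calc_surge_factor(20, "morning_peak"))  # 1.8
print(calc_surge_factor(8, "offpeak"))        # 1.0
print(calc_surge_factor(12, "night"))         # 1.3

# Thresholds are strict: exactly 15 peak trips still yields 1.5, exactly 10 yields 1.0.
print(calc_surge_factor(15, "evening_peak"))  # 1.5
print(calc_surge_factor(10, "evening_peak"))  # 1.0
```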
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.dws_hourly_stats (
hour_window, time_period, trip_count, total_passengers,
avg_distance_miles, avg_duration_min, avg_fare, avg_tip_rate_pct,
total_revenue, avg_fare_per_mile, credit_card_trips, cash_trips
)
PARTITIONED BY (time_period)
AS
SELECT
DATE_TRUNC('hour', pickup_datetime) AS hour_window,
time_period,
COUNT(*) AS trip_count,
SUM(passenger_count) AS total_passengers,
ROUND(AVG(trip_distance), 2) AS avg_distance_miles,
ROUND(AVG(trip_duration_min), 2) AS avg_duration_min,
ROUND(AVG(fare_amount), 2) AS avg_fare,
ROUND(AVG(tip_rate_pct), 2) AS avg_tip_rate_pct,
ROUND(SUM(total_amount), 2) AS total_revenue,
ROUND(AVG(fare_per_mile), 2) AS avg_fare_per_mile,
SUM(CASE WHEN payment_type = 1 THEN 1 ELSE 0 END) AS credit_card_trips,
SUM(CASE WHEN payment_type = 2 THEN 1 ELSE 0 END) AS cash_trips
FROM best_practice_ride_hailing.dwd_trip_events
WHERE time_period = SESSION_CONFIGS()['dt.args.time_period']
GROUP BY DATE_TRUNC('hour', pickup_datetime), time_period;
⚠️ Note: A partitioned Dynamic Table must explicitly declare PARTITIONED BY; dynamic partition inference cannot be relied upon. SESSION_CONFIGS()['dt.args.xxx'] returns STRING type; use CAST to convert to the target type (INT / DATE, etc.) in the query if needed. In this example, it is compared directly to the STRING column time_period, so no additional conversion is needed.
SELECT hour_window, time_period, trip_count, avg_distance_miles,
avg_fare, total_revenue, credit_card_trips, cash_trips
FROM best_practice_ride_hailing.dws_hourly_stats
ORDER BY hour_window, time_period;
January 15th evening peak (22 trips, average fare $15.84) vs. off-peak (22 trips, $17.75) have similar trip counts, but off-peak trips are longer (4.19 vs. 3.22 miles) with higher total revenue ($481 vs. $447).
Credit card payment ratio is high during evening peak (19/22 = 86%) and also tends toward credit card at night (15/21 = 71%), which can be used for targeted payment scenario promotions.
Supply-demand summary (combined by time period):
SELECT time_period,
SUM(trip_count) AS total_trips,
ROUND(SUM(avg_fare * trip_count) / SUM(trip_count), 2) AS weighted_avg_fare,
ROUND(SUM(total_revenue), 2) AS total_revenue
FROM best_practice_ride_hailing.dws_hourly_stats
GROUP BY time_period
ORDER BY total_trips DESC;
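When rolling hourly rows up to a time period, a plain average of hourly averages and a trip-weighted average diverge whenever the hours carry different trip counts. A small sketch with hypothetical hourly rows (not values from the demo dataset) illustrates the gap:

```python
def simple_avg_fare(hourly_rows):
    """Unweighted mean of the hourly avg_fare values."""
    return sum(r["avg_fare"] for r in hourly_rows) / len(hourly_rows)


def weighted_avg_fare(hourly_rows):
    """Mean fare weighted by the number of trips in each hour."""
    total_trips = sum(r["trip_count"] for r in hourly_rows)
    total_fare = sum(r["avg_fare"] * r["trip_count"] for r in hourly_rows)
    return total_fare / total_trips


# Hypothetical hourly rows for one time period.
hourly = [
    {"trip_count": 2, "avg_fare": 10.0},
    {"trip_count": 8, "avg_fare": 20.0},
]

print(simple_avg_fare(hourly))    # 15.0
print(weighted_avg_fare(hourly))  # 18.0
```

The weighted figure matches what averaging over the individual trips would give, which is usually the number a pricing model expects.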
ADS Layer Dynamic Table: Trip Efficiency and Driver Incentive Data Mart
The ADS layer aggregates at day × time period × payment type granularity, outputting trip efficiency profiles and distance segment labels for direct consumption by dynamic pricing models and driver incentive programs.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_ride_hailing.ads_trip_efficiency
AS
SELECT
DATE(pickup_datetime) AS trip_date,
time_period,
payment_type,
COUNT(*) AS trip_count,
ROUND(AVG(trip_distance), 2) AS avg_distance_miles,
ROUND(AVG(trip_duration_min), 2) AS avg_duration_min,
ROUND(AVG(fare_per_mile), 2) AS avg_fare_per_mile,
ROUND(AVG(tip_rate_pct), 2) AS avg_tip_rate_pct,
ROUND(SUM(total_amount), 2) AS total_revenue,
ROUND(AVG(total_amount), 2) AS avg_trip_revenue,
CASE
WHEN AVG(trip_distance) >= 5 THEN 'long_haul'
WHEN AVG(trip_distance) >= 2 THEN 'medium'
ELSE 'short'
END AS distance_segment
FROM best_practice_ride_hailing.dwd_trip_events
GROUP BY DATE(pickup_datetime), time_period, payment_type;
January 15th evening peak credit card trips (19 trips) have the highest total revenue ($402.95) with an average tip rate of 14.4%, making this the priority time period for incentives.
The extreme avg_fare_per_mile value of 291.86 in the first row comes from an extremely short trip (0.01 miles). Add WHERE trip_distance > 0.5 for production use.
Off-peak long-distance trips (5+ miles, $310.02) deserve a dedicated mileage incentive pool in incentive allocation.
Table Stream + Incentive Batch Processing
The driver incentive calculation for a ride-hailing platform requires: each new batch of completed orders → count driver trips for the day → determine incentive tier → write to incentive results table. Table Stream + ZettaPark Task matches this pattern exactly.
Create a Table Stream
CREATE TABLE STREAM IF NOT EXISTS best_practice_ride_hailing.stream_new_trips
ON TABLE best_practice_ride_hailing.doc_ods_trips
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
After new rows are written to doc_ods_trips, the Stream captures these incremental rows:
SELECT COUNT(*) AS stream_rows FROM best_practice_ride_hailing.stream_new_trips;
stream_rows
-----------
10
SELECT vendor_id, pickup_datetime, trip_distance, total_amount, fare_amount, tip_amount
FROM best_practice_ride_hailing.stream_new_trips
ORDER BY pickup_datetime
LIMIT 5;
INSERT INTO best_practice_ride_hailing.doc_driver_incentive_batch
(batch_date, vendor_id, new_trip_count, new_revenue, avg_trip_value, incentive_tier)
SELECT
DATE(pickup_datetime) AS batch_date,
vendor_id,
COUNT(*) AS new_trip_count,
ROUND(SUM(total_amount), 2) AS new_revenue,
ROUND(AVG(total_amount), 2) AS avg_trip_value,
CASE
WHEN COUNT(*) >= 5 THEN 'gold'
WHEN COUNT(*) >= 3 THEN 'silver'
ELSE 'bronze'
END AS incentive_tier
FROM best_practice_ride_hailing.stream_new_trips
GROUP BY DATE(pickup_datetime), vendor_id;
SELECT batch_date, vendor_id, new_trip_count, new_revenue, incentive_tier
FROM best_practice_ride_hailing.doc_driver_incentive_batch;
Results interpretation: Vendor 1 added 10 new trips on this day with total revenue of $108.86, reaching the gold incentive tier (≥5 trips). After Stream consumption, the offset automatically advances. The next INSERT will only process rows added after that point, so no manual cursor management is needed.
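The grouping and tier logic of the INSERT above can be prototyped in plain Python before wiring it into a scheduled task. The sketch below uses hypothetical input rows and helper names (incentive_tier, build_incentive_batch); it mirrors the SQL aggregation and is not ZettaPark code.

```python
from collections import defaultdict


def incentive_tier(trip_count: int) -> str:
    """Tier thresholds copied from the CASE expression in the INSERT."""
    if trip_count >= 5:
        return "gold"
    if trip_count >= 3:
        return "silver"
    return "bronze"


def build_incentive_batch(rows):
    """Group new trips by (date, vendor_id) and compute the batch columns."""
    groups = defaultdict(list)
    for row in rows:
        batch_date = row["pickup_datetime"][:10]  # 'YYYY-MM-DD' prefix
        groups[(batch_date, row["vendor_id"])].append(row["total_amount"])

    batch = []
    for (batch_date, vendor_id), amounts in sorted(groups.items()):
        batch.append({
            "batch_date": batch_date,
            "vendor_id": vendor_id,
            "new_trip_count": len(amounts),
            "new_revenue": round(sum(amounts), 2),
            "avg_trip_value": round(sum(amounts) / len(amounts), 2),
            "incentive_tier": incentive_tier(len(amounts)),
        })
    return batch


# Hypothetical increment: 5 trips for vendor 1 and 3 trips for vendor 2.
new_rows = (
    [{"pickup_datetime": "2015-01-15 19:05:39", "vendor_id": 1, "total_amount": 10.0}] * 5
    + [{"pickup_datetime": "2015-01-15 20:10:00", "vendor_id": 2, "total_amount": 8.5}] * 3
)

for record in build_incentive_batch(new_rows):
    print(record)
```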
💡 Tip: In production, this INSERT INTO ... SELECT FROM stream operation should be orchestrated via a Lakehouse Studio ZettaPark Task with a scheduled trigger (e.g., every hour). The Stream offset automatically updates after task execution, and repeated runs will not produce duplicate data.
Studio Task Scheduling
Dynamic Table periodic refresh is managed through Studio Tasks, not by writing REFRESH INTERVAL in the DDL. Three refresh tasks are created under the skill_test profile:
💡 Tip: Studio Tasks support attaching data quality checks and alert notifications to the same task. For example, if dws_hourly_stats has zero rows after a DWS refresh, you can set an alert on the task to trigger a Lark message notification to the on-call team.
Data Warehouse Object Overview
After full construction, objects under the best_practice_ride_hailing schema:
BloomFilter Index does not automatically take effect on existing data: CREATE BLOOMFILTER INDEX only takes effect for new data written after creation. Existing historical trip data is not covered by the index. The BloomFilter type does not support BUILD INDEX for rebuild; to cover existing data, the table must be rebuilt.
Partitioned Dynamic Tables must use static partition declarations: dws_hourly_stats uses PARTITIONED BY (time_period) and must use SESSION_CONFIGS()['dt.args.time_period'] for per-partition refresh. REFRESH INTERVAL cannot be written directly in the DDL; scheduling is managed through Studio Tasks.
Table Stream offset automatically advances after consumption: Each INSERT INTO ... SELECT FROM stream operation on stream_new_trips advances the consumption offset. If the same Stream is consumed by multiple downstream consumers, each consumer needs to create its own independent Stream object. Sharing one Stream object will cause consumption competition.
calc_surge_factor thresholds are example values: The current multiplier thresholds (during peak periods, more than 15 trips triggers 1.8× and more than 10 trips triggers 1.5×) are set based on the demo dataset. In production, calibrate them dynamically based on each city's historical supply-demand data.
Dynamic Table first refresh is a full snapshot: The first REFRESH of dwd_trip_events performs a full scan of doc_ods_trips. Subsequent incremental refreshes only process rows that are new or changed since the last refresh point. Using INSERT OVERWRITE to write to the ODS layer causes Dynamic Tables to degrade to full refresh.
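The Table Stream offset behavior described in the notes above can be pictured with a toy model: a cursor over an append-only list. This is only a conceptual sketch, not how Lakehouse implements streams; the class AppendOnlyStream is hypothetical, and it assumes a stream exposes only rows appended after it was created.

```python
class AppendOnlyStream:
    """Toy model: a read cursor over a growing list of table rows."""

    def __init__(self, table_rows):
        self._rows = table_rows
        # Assumption: only rows appended after creation are visible.
        self._offset = len(table_rows)

    def pending(self):
        """Rows not yet consumed; reading alone does not move the offset."""
        return list(self._rows[self._offset:])

    def consume(self):
        """Return pending rows and advance the offset, like INSERT ... SELECT."""
        rows = self.pending()
        self._offset = len(self._rows)
        return rows


table = ["t1", "t2"]

# Case 1: two jobs share one stream, so the second job gets nothing.
shared = AppendOnlyStream(table)
table.extend(["t3", "t4", "t5"])
incentive_rows = shared.consume()
audit_rows = shared.consume()
print(incentive_rows, audit_rows)  # ['t3', 't4', 't5'] []

# Case 2: each job owns its stream, so both see the same new rows.
incentive_stream = AppendOnlyStream(table)
audit_stream = AppendOnlyStream(table)
table.append("t6")
print(incentive_stream.consume(), audit_stream.consume())  # ['t6'] ['t6']
```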