Build a Supply Chain and Logistics Tracking Data Warehouse
A supply chain data warehouse has to ingest three pipelines in parallel, each with a different latency: OMS order status must be visible within seconds, WMS inventory snapshots are synced in daily batches, and TMS logistics EDI files arrive periodically. Using a Kaggle retail dataset (orders, inventory, logistics, and suppliers, covering the complete ODS→DWD→DWS→ADS pipeline), this document demonstrates how to use Singdata Lakehouse to integrate these three heterogeneous streams into a unified supply chain visibility data warehouse that monitors SKU inventory turnover and shipment on-time rates.
Overview
| Problem | Singdata Solution |
| --- | --- |
| OMS order status changes need real-time warehouse sync with no hourly delay | PostgreSQL CDC real-time sync; order status changes written to ODS within seconds |
| WMS inventory data is large, and historical snapshots are queried by warehouse and date | MySQL multi-table offline sync + PARTITIONED BY (dt) for date partition pruning; the warehouse dimension is filtered via WHERE |
| Logistics providers supply EDI files that need periodic batch import | OSS PIPE continuously monitors the bucket; new files automatically trigger COPY INTO |
| Multi-layer aggregation (DWD→DWS→ADS) has complex dependency chains requiring automatic orchestration | Dynamic Tables refreshed by Studio Tasks with task dependencies, so each layer runs only after its upstream completes |
Standardize multi-source status codes to unified business semantics
| Function | Purpose | Usage |
| --- | --- | --- |
| DATEDIFF | Calculate in-transit days transit_days | Supports COALESCE to handle undelivered shipments |
| DATE_FORMAT | Group statistics by month (yyyy-MM) | Used for ADS monthly SLA reports |
| NULLIF | Avoid division-by-zero exceptions | Divide by NULLIF(count, 0) when calculating SLA compliance rate |
Prerequisites
Use a separate schema to isolate all test tables in this document:
CREATE SCHEMA IF NOT EXISTS best_practice_supply_chain;
💡 Tip: The examples below use cz-cli (the Singdata ClickZetta Lakehouse command-line tool) to execute operations. If cz-cli is not installed, see https://singdata.com/documents/setup_cz_cli; if you prefer not to use the command line, you can also execute SQL in Lakehouse Studio under Development -> SQL Editor and configure and trigger scheduled tasks on the Studio -> Tasks page.
cz-cli sql "CREATE SCHEMA IF NOT EXISTS best_practice_supply_chain" -p skill_test --write
Result:
{"data":{},"time_ms":101}
ODS Layer: Three-Stream Heterogeneous Data Ingestion
The ODS layer corresponds to three source systems, each ingested using a different approach.
Create Tables
OMS Orders Table (PostgreSQL CDC target table)
CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_orders (
order_id BIGINT,
order_date DATE,
customer_id BIGINT,
store_id INT,
status STRING,
total_amount DECIMAL(12,2),
currency STRING,
created_at TIMESTAMP,
updated_at TIMESTAMP
)
COMMENT 'ODS: raw orders from OMS (synced via PostgreSQL CDC)'
PARTITIONED BY (dt STRING);
OMS Order Items Table (PostgreSQL CDC target table)
CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_order_items (
item_id BIGINT,
order_id BIGINT,
product_id BIGINT,
sku_code STRING,
quantity INT,
unit_price DECIMAL(10,2),
discount DECIMAL(10,2),
warehouse_id INT,
created_at TIMESTAMP
)
COMMENT 'ODS: raw order line items from OMS'
PARTITIONED BY (dt STRING);
TMS Shipments Table (OSS PIPE target table)
CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_shipments (
shipment_id BIGINT,
order_id BIGINT,
carrier_code STRING,
tracking_number STRING,
origin_warehouse INT,
dest_city STRING,
dest_province STRING,
shipped_at TIMESTAMP,
expected_delivery DATE,
actual_delivery DATE,
status STRING,
created_at TIMESTAMP
)
COMMENT 'ODS: logistics shipment events from TMS / EDI files (via OSS PIPE)'
PARTITIONED BY (dt STRING);
WMS Supplier Master Data Table
CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_suppliers (
supplier_id INT,
supplier_name STRING,
contact_name STRING,
country STRING,
city STRING,
sla_days INT,
tier STRING,
created_at TIMESTAMP
)
COMMENT 'ODS: supplier master data from WMS';
WMS Inventory Snapshots Table (MySQL offline sync target table)
CREATE TABLE IF NOT EXISTS best_practice_supply_chain.doc_ods_inventory (
snapshot_id BIGINT,
snapshot_date DATE,
warehouse_id INT,
sku_code STRING,
product_id BIGINT,
quantity_on_hand INT,
quantity_reserved INT,
quantity_in_transit INT,
reorder_point INT,
created_at TIMESTAMP
)
COMMENT 'ODS: WMS inventory snapshots (synced via MySQL batch offline sync)'
PARTITIONED BY (dt STRING);
⚠️ Note: Column names in PARTITIONED BY cannot duplicate column names in the columns definition, otherwise a key.found error is reported. Although the inventory table is queried on both warehouse and date dimensions, only one partition column dt STRING is defined; the warehouse dimension is filtered via WHERE clause pushdown rather than partition pruning.
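The naming rule in the note above can be checked before any DDL is submitted. The following is a minimal Python sketch, not part of the Lakehouse tooling: the helper name `duplicated_partition_columns` is hypothetical, and treating column names as case-insensitive is an assumption.

```python
def duplicated_partition_columns(columns, partition_columns):
    """Return partition column names that also appear in the regular column list.

    Assumption: names are compared case-insensitively.
    """
    regular = {name.lower() for name in columns}
    return [name for name in partition_columns if name.lower() in regular]


# Regular columns of doc_ods_inventory, as defined in the DDL above.
inventory_columns = [
    "snapshot_id", "snapshot_date", "warehouse_id", "sku_code", "product_id",
    "quantity_on_hand", "quantity_reserved", "quantity_in_transit",
    "reorder_point", "created_at",
]

# dt is not a regular column, so PARTITIONED BY (dt STRING) is accepted.
print(duplicated_partition_columns(inventory_columns, ["dt"]))

# warehouse_id is already a regular column, so it would clash.
print(duplicated_partition_columns(inventory_columns, ["warehouse_id", "dt"]))
```

The second call reports `warehouse_id`, which is why the inventory table keeps `dt` as its only partition column.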
OSS PIPE to Ingest TMS EDI Files
Logistics providers upload EDI files to an OSS bucket every night, automatically imported into the shipments table via PIPE:
-- Prerequisite: OSS Storage Connection and External Volume already created
CREATE PIPE IF NOT EXISTS best_practice_supply_chain.pipe_ods_shipments
AS
COPY INTO best_practice_supply_chain.doc_ods_shipments
FROM VOLUME oss_logistics_vol
USING csv
OPTIONS('header'='true', 'sep'=',');
💡 Tip: PIPE uses LIST_PURGE scan mode by default (periodically polls the Volume for new files). If OSS event notifications are enabled, you can switch to INGEST_MODE = EVENT_NOTIFICATION for second-level file triggering.
DWD Layer: Order Lifecycle Event Standardization
The DWD layer joins and widens the three core ODS tables, deriving delivery_flag (on_time/delayed/overdue/pending) and transit_days (in-transit days), providing a unified order event view.
Create Tables
Order Event Wide Table (Dynamic Table)
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dwd_order_events
COMMENT 'DWD: standardized order lifecycle events with shipment join'
AS
SELECT
o.order_id,
o.order_date,
o.customer_id,
o.store_id,
o.status AS order_status,
o.total_amount,
o.currency,
o.created_at AS order_created_at,
o.updated_at AS order_updated_at,
oi.item_id,
oi.product_id,
oi.sku_code,
oi.quantity,
oi.unit_price,
oi.discount,
oi.warehouse_id,
(oi.unit_price * oi.quantity - oi.discount) AS line_amount,
s.shipment_id,
s.carrier_code,
s.tracking_number,
s.shipped_at,
s.expected_delivery,
s.actual_delivery,
s.status AS shipment_status,
s.dest_city,
s.dest_province,
CASE
WHEN s.actual_delivery IS NOT NULL AND s.actual_delivery <= s.expected_delivery THEN 'on_time'
WHEN s.actual_delivery IS NOT NULL AND s.actual_delivery > s.expected_delivery THEN 'delayed'
WHEN s.actual_delivery IS NULL AND CURRENT_DATE() > s.expected_delivery THEN 'overdue'
ELSE 'pending'
END AS delivery_flag,
DATEDIFF(COALESCE(s.actual_delivery, CURRENT_DATE()), s.shipped_at) AS transit_days
FROM best_practice_supply_chain.doc_ods_orders o
JOIN best_practice_supply_chain.doc_ods_order_items oi ON o.order_id = oi.order_id
LEFT JOIN best_practice_supply_chain.doc_ods_shipments s ON o.order_id = s.order_id;
Inventory Event Wide Table (Dynamic Table)
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dwd_inventory_events
COMMENT 'DWD: enriched inventory snapshots with availability calculation'
AS
SELECT
inv.snapshot_date,
inv.warehouse_id,
inv.sku_code,
inv.product_id,
inv.quantity_on_hand,
inv.quantity_reserved,
inv.quantity_in_transit,
inv.reorder_point,
(inv.quantity_on_hand - inv.quantity_reserved) AS available_quantity,
CASE
WHEN (inv.quantity_on_hand - inv.quantity_reserved) <= 0 THEN 'out_of_stock'
WHEN (inv.quantity_on_hand - inv.quantity_reserved) < inv.reorder_point THEN 'low_stock'
ELSE 'normal'
END AS stock_status
FROM best_practice_supply_chain.doc_ods_inventory inv;
Query the order event wide table to verify delivery_flag and transit_days derived fields:
SELECT order_id, sku_code, order_status, delivery_flag, transit_days
FROM best_practice_supply_chain.doc_dwd_order_events
ORDER BY order_id
LIMIT 10;
| order_id | sku_code | order_status | delivery_flag | transit_days |
| --- | --- | --- | --- | --- |
| 100001 | SKU-A001 | delivered | delayed | 4 |
| 100001 | SKU-B012 | delivered | delayed | 4 |
| 100002 | SKU-C005 | shipped | overdue | 795 |
| 100003 | SKU-A001 | processing | pending | null |
| 100004 | SKU-E007 | delivered | delayed | 4 |
| 100005 | SKU-B012 | cancelled | pending | null |
| 100006 | SKU-C005 | delivered | delayed | 4 |
| 100007 | SKU-F001 | shipped | overdue | 793 |
| 100008 | SKU-A001 | delivered | delayed | 4 |
delivery_flag value interpretation: on_time means the shipment was delivered on or before the expected delivery date; delayed means actual delivery was later than the expected delivery date; overdue means the shipment has not been delivered and the expected delivery date has already passed; pending covers the remaining cases, such as orders not yet shipped or already cancelled. For undelivered shipments, transit_days is computed against COALESCE(actual_delivery, CURRENT_DATE()); rows with no shipment return null.
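To make the branching easier to follow, the CASE expression and the transit_days calculation can be mirrored in plain Python. This is a minimal sketch under stated assumptions: DATEDIFF is assumed to return the calendar-day difference between its two arguments, and all dates below are hypothetical values, not rows from the dataset.

```python
from datetime import date, datetime


def delivery_flag(expected, actual, today):
    """Mirror of the CASE expression in doc_dwd_order_events."""
    if expected is None:
        # In SQL every comparison against NULL is unknown, so the ELSE branch applies.
        return "pending"
    if actual is not None:
        return "on_time" if actual <= expected else "delayed"
    return "overdue" if today > expected else "pending"


def transit_days(shipped_at, actual, today):
    """Mirror of DATEDIFF(COALESCE(actual_delivery, CURRENT_DATE()), shipped_at).

    Assumption: DATEDIFF counts whole calendar days between the two dates.
    """
    if shipped_at is None:
        return None  # no shipment row, so the SQL result is NULL
    end = actual if actual is not None else today
    return (end - shipped_at.date()).days


today = date(2024, 4, 10)  # hypothetical refresh date

# Delivered two days late.
print(delivery_flag(date(2024, 4, 3), date(2024, 4, 5), today))     # delayed
print(transit_days(datetime(2024, 4, 1, 10, 0), date(2024, 4, 5), today))  # 4

# Shipped, not delivered, expected date already passed.
print(delivery_flag(date(2024, 4, 4), None, today))                 # overdue
print(transit_days(datetime(2024, 4, 2, 9, 30), None, today))       # 8

# No shipment yet (LEFT JOIN produced NULLs).
print(delivery_flag(None, None, today))                             # pending
print(transit_days(None, None, today))                              # None
```

Because the undelivered case is measured against the refresh date, the same shipment yields a larger transit_days on every later refresh.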
Query the inventory event wide table:
SELECT warehouse_id, sku_code, quantity_on_hand, available_quantity, stock_status
FROM best_practice_supply_chain.doc_dwd_inventory_events
ORDER BY warehouse_id, sku_code;
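| warehouse_id | sku_code | quantity_on_hand | available_quantity | stock_status |
| --- | --- | --- | --- | --- |
| 1 | SKU-A001 | 380 | 335 | normal |
| 1 | SKU-B012 | 210 | 180 | normal |
| 1 | SKU-F001 | 180 | 155 | normal |
| 2 | SKU-A001 | 150 | 130 | normal |
| 2 | SKU-C005 | 560 | 480 | normal |
| 2 | SKU-G009 | 320 | 280 | normal |
| 3 | SKU-D020 | 95 | 85 | normal |
| 3 | SKU-E007 | 42 | 37 | normal |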
available_quantity = quantity_on_hand - quantity_reserved, representing the actual shippable quantity after deducting pre-allocated inventory.
DWS Layer: SKU Inventory and Route Timeliness Aggregation
The DWS layer aggregates on two dimensions based on the DWD layer: SKU daily sales summary (for inventory turnover analysis) and carrier route timeliness summary (for SLA performance evaluation).
Create Tables
SKU Daily Sales Aggregation Table (Dynamic Table)
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dws_sku_daily_sales
COMMENT 'DWS: daily SKU-level sales and inventory turnover aggregation'
AS
SELECT
e.order_date,
e.sku_code,
e.warehouse_id,
COUNT(DISTINCT e.order_id) AS order_count,
SUM(e.quantity) AS total_quantity_sold,
SUM(e.line_amount) AS total_revenue,
AVG(e.unit_price) AS avg_unit_price,
SUM(CASE WHEN e.delivery_flag = 'on_time' THEN 1 ELSE 0 END) AS on_time_count,
SUM(CASE WHEN e.delivery_flag = 'delayed' THEN 1 ELSE 0 END) AS delayed_count,
SUM(CASE WHEN e.delivery_flag = 'overdue' THEN 1 ELSE 0 END) AS overdue_count
FROM best_practice_supply_chain.doc_dwd_order_events e
WHERE e.order_status NOT IN ('cancelled')
GROUP BY e.order_date, e.sku_code, e.warehouse_id;
Carrier Route Timeliness Aggregation Table (Dynamic Table)
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_dws_carrier_timeliness
COMMENT 'DWS: carrier on-time delivery rate and route performance aggregation'
AS
SELECT
s.carrier_code,
s.dest_province,
DATE_TRUNC('week', s.shipped_at) AS ship_week,
COUNT(*) AS total_shipments,
SUM(CASE WHEN e.delivery_flag = 'on_time' THEN 1 ELSE 0 END) AS on_time_shipments,
SUM(CASE WHEN e.delivery_flag = 'delayed' THEN 1 ELSE 0 END) AS delayed_shipments,
ROUND(
SUM(CASE WHEN e.delivery_flag = 'on_time' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2
) AS on_time_rate_pct,
AVG(e.transit_days) AS avg_transit_days
FROM best_practice_supply_chain.doc_dwd_order_events e
JOIN best_practice_supply_chain.doc_ods_shipments s ON e.shipment_id = s.shipment_id
WHERE e.shipment_status IN ('delivered', 'in_transit')
GROUP BY s.carrier_code, s.dest_province, DATE_TRUNC('week', s.shipped_at);
Query DWS Aggregation Results
SKU sales summary (merged across all dates by SKU):
SELECT
sku_code,
SUM(total_quantity_sold) AS qty,
ROUND(SUM(total_revenue), 2) AS revenue
FROM best_practice_supply_chain.doc_dws_sku_daily_sales
GROUP BY sku_code
ORDER BY revenue DESC;
| sku_code | qty | revenue |
| --- | --- | --- |
| SKU-A001 | 8 | 694.00 |
| SKU-F001 | 2 | 680.00 |
| SKU-C005 | 11 | 554.50 |
| SKU-D020 | 4 | 486.00 |
| SKU-E007 | 1 | 215.30 |
| SKU-B012 | 1 | 180.90 |
| SKU-G009 | 1 | 63.00 |
SKU-A001 leads in revenue (694.00) on 8 units sold; SKU-C005 has the highest unit count (11 units) but a low unit price, so it ranks only third in total revenue, while SKU-F001 reaches 680.00 from just 2 units. This kind of disparity between sales volume and revenue is a core input for replenishment priority decisions.
Carrier timeliness summary (aggregated across weeks):
SELECT
carrier_code,
SUM(total_shipments) AS shipments,
ROUND(AVG(on_time_rate_pct), 2) AS avg_ontime_pct,
ROUND(AVG(avg_transit_days), 1) AS avg_transit
FROM best_practice_supply_chain.doc_dws_carrier_timeliness
GROUP BY carrier_code
ORDER BY avg_ontime_pct DESC;
| carrier_code | shipments | avg_ontime_pct | avg_transit |
| --- | --- | --- | --- |
| YTO | 2 | 0.00 | 4 |
| SF | 3 | 0.00 | 4 |
| ZTO | 1 | 0.00 | 4 |
| BEST | 1 | 0.00 | 793 |
| JD | 1 | 0.00 | 795 |
BEST and JD show avg_transit_days of 793 and 795 days because their shipments have a status of in_transit (undelivered), so transit_days is calculated with CURRENT_DATE() as the cutoff. This is expected behavior for historical test data. In real production data, the DWS layer serves as the data source for operational monitoring dashboards that identify routes with abnormally high average in-transit days.
ADS Layer: Supplier SLA Reports and Inventory Alerts
The ADS layer directly serves business decisions: supplier compliance management and inventory replenishment alerts.
Create Tables
Supplier SLA Monthly Report (Dynamic Table)
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_ads_supplier_sla_report
COMMENT 'ADS: supplier SLA compliance report — monthly delivery performance vs contracted SLA days'
AS
SELECT
sup.supplier_id,
sup.supplier_name,
sup.tier AS supplier_tier,
sup.sla_days AS contracted_sla_days,
DATE_FORMAT(o.order_date, 'yyyy-MM') AS stat_month,
COUNT(DISTINCT o.order_id) AS total_orders,
SUM(CASE WHEN dwd.delivery_flag = 'on_time' THEN 1 ELSE 0 END) AS on_time_orders,
SUM(CASE WHEN dwd.delivery_flag = 'delayed' THEN 1 ELSE 0 END) AS delayed_orders,
ROUND(
SUM(CASE WHEN dwd.delivery_flag = 'on_time' THEN 1 ELSE 0 END) * 100.0
/ NULLIF(COUNT(DISTINCT o.order_id), 0), 2
) AS on_time_rate_pct,
AVG(dwd.transit_days) AS avg_transit_days,
CASE
WHEN ROUND(
SUM(CASE WHEN dwd.delivery_flag='on_time' THEN 1 ELSE 0 END) * 100.0
/ NULLIF(COUNT(DISTINCT o.order_id), 0), 2
) >= 95 THEN 'SLA_MET'
WHEN ROUND(
SUM(CASE WHEN dwd.delivery_flag='on_time' THEN 1 ELSE 0 END) * 100.0
/ NULLIF(COUNT(DISTINCT o.order_id), 0), 2
) >= 80 THEN 'SLA_AT_RISK'
ELSE 'SLA_BREACH'
END AS sla_status
FROM best_practice_supply_chain.doc_dwd_order_events dwd
JOIN best_practice_supply_chain.doc_ods_orders o ON dwd.order_id = o.order_id
JOIN best_practice_supply_chain.doc_ods_order_items oi ON dwd.item_id = oi.item_id
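-- Note: orders and order items carry no supplier_id column; this join matches suppliers on warehouse_id
JOIN best_practice_supply_chain.doc_ods_suppliers sup ON oi.warehouse_id = sup.supplier_id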
WHERE dwd.order_status != 'cancelled'
AND dwd.shipment_status IS NOT NULL
GROUP BY
sup.supplier_id, sup.supplier_name, sup.tier, sup.sla_days,
DATE_FORMAT(o.order_date, 'yyyy-MM');
Inventory Alert Table (Dynamic Table, high-frequency REFRESH every 5 min)
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_supply_chain.doc_ads_inventory_alert
COMMENT 'ADS: real-time inventory alert — low stock and out-of-stock SKUs requiring reorder'
AS
SELECT
inv.snapshot_date,
inv.warehouse_id,
inv.sku_code,
inv.product_id,
inv.quantity_on_hand,
inv.available_quantity,
inv.reorder_point,
inv.stock_status,
CASE
WHEN inv.stock_status = 'out_of_stock' THEN 'URGENT'
WHEN inv.stock_status = 'low_stock' THEN 'WARNING'
ELSE NULL
END AS alert_level,
(inv.reorder_point * 2 - inv.quantity_on_hand) AS suggested_reorder_qty
FROM best_practice_supply_chain.doc_dwd_inventory_events inv
WHERE inv.stock_status IN ('out_of_stock', 'low_stock');
💡 Tip: The inventory alert table retains only SKUs that need to be addressed (WHERE stock_status IN ('out_of_stock', 'low_stock')), resulting in far fewer rows than the full DWD table. The computational overhead of a 5-minute refresh is very low.
Query ADS Alert Data
Supplier SLA monthly compliance status:
SELECT
supplier_name,
supplier_tier,
contracted_sla_days,
stat_month,
total_orders,
on_time_orders,
on_time_rate_pct,
ROUND(avg_transit_days, 1) AS avg_transit,
sla_status
FROM best_practice_supply_chain.doc_ads_supplier_sla_report
ORDER BY supplier_name;
| supplier_name | supplier_tier | contracted_sla_days | stat_month | total_orders | on_time_orders | on_time_rate_pct | avg_transit | sla_status |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| IndiaMakers Inc. | B | 7 | 2024-04 | 2 | 0 | 0.00 | 398.5 | SLA_BREACH |
| ShenzhenTech Co. | A | 3 | 2024-04 | 2 | 0 | 0.00 | 4.0 | SLA_BREACH |
| VietnamFactory Ltd. | B | 5 | 2024-04 | 3 | 0 | 0.00 | 267.7 | SLA_BREACH |
All suppliers show SLA_BREACH for 2024-04 because no shipment in the test data counts as on_time: delivered shipments arrived after their expected dates (delivery_flag = delayed), and undelivered shipments are long past their expected dates (delivery_flag = overdue). The contracted_sla_days field comes from doc_ods_suppliers.sla_days and records the maximum in-transit days promised in the contract; combined with avg_transit_days, it lets you directly assess the gap between a supplier's actual performance and their contractual commitment.
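The rate and status logic of the SLA report can be traced with a small Python sketch. It mirrors the NULLIF-guarded division and the 95/80 thresholds from the DDL; the function names are hypothetical, and inputs other than the 0-of-2 case are made-up values. One detail is worth seeing explicitly: when the rate is NULL, both SQL comparisons are unknown, so the CASE falls through to SLA_BREACH.

```python
def on_time_rate_pct(on_time_orders, total_orders):
    """Mirror of ROUND(on_time * 100.0 / NULLIF(total, 0), 2)."""
    if total_orders == 0:
        return None  # NULLIF turns the zero denominator into NULL
    return round(on_time_orders * 100.0 / total_orders, 2)


def sla_status(rate_pct):
    """Mirror of the CASE expression that derives sla_status."""
    if rate_pct is not None and rate_pct >= 95:
        return "SLA_MET"
    if rate_pct is not None and rate_pct >= 80:
        return "SLA_AT_RISK"
    # Reached for low rates and also for a NULL rate.
    return "SLA_BREACH"


for on_time, total in [(0, 2), (17, 20), (19, 20), (0, 0)]:
    rate = on_time_rate_pct(on_time, total)
    print(on_time, total, rate, sla_status(rate))
```

The `(0, 2)` case corresponds to the report rows above: a rate of 0.00 and SLA_BREACH.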
Inventory alert list:
SELECT
snapshot_date,
warehouse_id,
sku_code,
available_quantity,
reorder_point,
stock_status,
alert_level,
suggested_reorder_qty
FROM best_practice_supply_chain.doc_ads_inventory_alert
ORDER BY alert_level, warehouse_id;
| snapshot_date | warehouse_id | sku_code | available_quantity | reorder_point | stock_status | alert_level | suggested_reorder_qty |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 2024-04-02 | 3 | SKU-E007 | 0 | 20 | out_of_stock | URGENT | 40 |
| 2024-04-02 | 3 | SKU-D020 | 5 | 30 | low_stock | WARNING | 35 |
alert_level = URGENT means available inventory is exhausted and immediate replenishment is needed; alert_level = WARNING means available inventory is below the reorder point and replenishment is recommended soon. suggested_reorder_qty = reorder_point * 2 - quantity_on_hand is a simple replenishment estimate (replenish up to twice the reorder point); the coefficient can be adjusted based on actual turnover rate.
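The alert derivation chains three small rules: availability, stock status, and the reorder estimate. The sketch below restates them in Python so they can be traced by hand. The function names are hypothetical, and the quantity_on_hand and quantity_reserved inputs are assumed values chosen to be consistent with the two alert rows above (the query output does not show them).

```python
def stock_status(on_hand, reserved, reorder_point):
    """Mirror of the CASE expression in doc_dwd_inventory_events."""
    available = on_hand - reserved
    if available <= 0:
        return "out_of_stock"
    if available < reorder_point:
        return "low_stock"
    return "normal"


def inventory_alert(on_hand, reserved, reorder_point):
    """Mirror of doc_ads_inventory_alert; returns None for rows the WHERE clause drops."""
    status = stock_status(on_hand, reserved, reorder_point)
    level = {"out_of_stock": "URGENT", "low_stock": "WARNING"}.get(status)
    if level is None:
        return None
    return {
        "available_quantity": on_hand - reserved,
        "stock_status": status,
        "alert_level": level,
        "suggested_reorder_qty": reorder_point * 2 - on_hand,
    }


# Assumed inputs consistent with the SKU-E007 row: nothing on hand, reorder point 20.
print(inventory_alert(on_hand=0, reserved=0, reorder_point=20))

# Assumed inputs consistent with the SKU-D020 row: 25 on hand, 20 reserved, reorder point 30.
print(inventory_alert(on_hand=25, reserved=20, reorder_point=30))

# Healthy stock produces no alert row.
print(inventory_alert(on_hand=380, reserved=45, reorder_point=100))
```

Note that the estimate subtracts quantity_on_hand, not available_quantity, so a SKU that is fully reserved but physically well stocked can yield a small or even negative suggestion; clamping at zero may be worth considering when adapting the formula.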
Dynamic Table Cascading Refresh Verification
Run the following query to confirm that all 6 Dynamic Tables have been created:
SHOW DYNAMIC TABLES IN best_practice_supply_chain;
| schema_name | table_name | is_dynamic |
| --- | --- | --- |
| best_practice_supply_chain | doc_ads_inventory_alert | true |
| best_practice_supply_chain | doc_ads_supplier_sla_report | true |
| best_practice_supply_chain | doc_dwd_inventory_events | true |
| best_practice_supply_chain | doc_dwd_order_events | true |
| best_practice_supply_chain | doc_dws_carrier_timeliness | true |
| best_practice_supply_chain | doc_dws_sku_daily_sales | true |
Cascading dependency chain:
ODS raw tables (static)
↓
doc_dwd_order_events ← JOIN orders + order_items + shipments
doc_dwd_inventory_events ← inventory snapshots + available quantity derivation
↓ (task dependency)
doc_dws_sku_daily_sales ← aggregate by date × SKU × warehouse
doc_dws_carrier_timeliness ← aggregate by carrier × province × week
↓ (task dependency)
doc_ads_inventory_alert ← filter low-stock SKUs
doc_ads_supplier_sla_report ← monthly supplier compliance rating
None of the 6 Dynamic Tables specifies REFRESH INTERVAL in its DDL. The refresh order is guaranteed by the scheduling dependencies of Studio Tasks (see the next section).
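A valid refresh order can be derived mechanically from the table dependencies. The following Python sketch uses the standard-library `graphlib` module (Python 3.9+) and is only an illustration of dependency-ordered execution, not how Studio Tasks is implemented. The dependency map is read off the FROM clauses of the Dynamic Table DDL in this document, with the static ODS tables omitted.

```python
from graphlib import TopologicalSorter

# Each Dynamic Table maps to the Dynamic Tables it reads from (ODS tables omitted).
dependencies = {
    "doc_dwd_order_events": set(),
    "doc_dwd_inventory_events": set(),
    "doc_dws_sku_daily_sales": {"doc_dwd_order_events"},
    "doc_dws_carrier_timeliness": {"doc_dwd_order_events"},
    "doc_ads_supplier_sla_report": {"doc_dwd_order_events"},
    "doc_ads_inventory_alert": {"doc_dwd_inventory_events"},
}

# static_order() yields every table after all of its upstream tables.
refresh_order = list(TopologicalSorter(dependencies).static_order())

for position, table in enumerate(refresh_order, start=1):
    print(position, table)
```

Any order produced this way refreshes both DWD tables before the tables that read them, which is the property that fixed schedule times alone do not guarantee.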
Configure Studio Scheduled Tasks
In production, Dynamic Table periodic refresh is managed through Studio Tasks rather than writing REFRESH INTERVAL in the DDL. The benefits: scheduling time and dependency relationships can be adjusted without rebuilding tables, and alert rules can be attached to tasks so that on-call personnel are notified promptly on refresh failures.
Schedule times alone cannot guarantee that a downstream task starts only after its upstream completes: if the upstream runs longer than expected, DWS computation may start before the DWD data is refreshed. Use save-config --deps to configure task dependencies so that each downstream task is triggered by upstream completion.
💡 Tip: Studio Tasks support attaching alert rules to tasks. For example, if on a given day refresh_dwd_order_events_sc refreshes but doc_dwd_order_events has 0 rows, you can configure an alert on the task to trigger a notification to on-call personnel. You can also configure scheduling and dependencies through the Lakehouse Studio Development -> Tasks page UI instead of using the command line.
Notes
Partition column naming: Column names in PARTITIONED BY cannot duplicate field names in columns, otherwise a key.found error is reported. Although the ODS inventory table is queried on both warehouse and date dimensions, only dt STRING is defined as the partition column; the warehouse dimension is filtered via WHERE clause rather than partition pruning.
PostgreSQL CDC table structure: Column definitions for CDC target tables must align with source table fields. Type mismatches cause the CDC task to report implicit cast not allowed errors; explicit CAST('...' AS TIMESTAMP) is needed when inserting.
OSS PIPE FILES() limitation: PIPE definitions do not support FILES('filename') or SUBDIRECTORY 'dirname' to filter specific files; only the entire Volume path can be scanned. If EDI files come from multiple logistics providers with different formats, it is recommended to create separate Volumes and PIPEs for each provider.
Dynamic Tables should not write REFRESH INTERVAL in DDL: All Dynamic Table periodic refresh is managed through Studio Tasks. Studio Tasks support configuring scheduling dependencies (downstream triggers only after upstream completes), which is more reliable than fixed time intervals. They also support attaching data quality rules and alerts to the same task; writing REFRESH INTERVAL in DDL bypasses this management mechanism.
NULLIF to prevent division by zero: When calculating SLA compliance rate, use NULLIF(COUNT(DISTINCT order_id), 0) in the denominator to avoid division-by-zero exceptions. When a supplier has no shipped orders in a given month, on_time_rate_pct returns NULL rather than an error.
Behavior of CURRENT_DATE() in Dynamic Tables: CURRENT_DATE() in Dynamic Tables is recalculated on each refresh; transit_days automatically increases over time, suitable for monitoring overdue undelivered shipments.