Industrial IoT Device Health Monitoring Data Warehouse Best Practices

Build a multi-layer data warehouse that turns real-time production-line sensor data into device health scores and predictive maintenance alerts. Using a dataset of 20 industrial devices and 100 sensor events, this guide walks through the complete end-to-end Kafka PIPE → Bronze → Silver → Gold pipeline, covering three key platform capabilities: BloomFilter Index, Column Masking, and SQL UDF.


Overview

The typical data pipeline for IoT device health monitoring is: sensor reporting → real-time ingestion → raw storage (Bronze) → cleansing and labeling (Silver) → metric aggregation and alerting (Gold).

Singdata Lakehouse addresses several core challenges through the following combination:

Challenge | Solution
----------+---------
Real-time sensor data ingestion with high-frequency millisecond writes | Kafka PIPE continuous ingestion; no need to write custom consumers
Automatic incremental computation across Bronze → Silver → Gold | Dynamic Table with declarative SQL; the system auto-schedules the dependency chain
Sensitive fields such as device location coordinates need masking | Column Masking bound to the column, transparent to non-privileged users
device_id is a high-cardinality column with frequent point lookups | BloomFilter Index for fast on-demand filtering
Anomaly detection scoring logic is reusable | SQL UDF encapsulating the weighted health score formula

SQL Commands Used

Command / Function | Purpose | Notes
-------------------+---------+------
CREATE TABLE | Create the Bronze-layer raw event table and device master table | Regular tables, serving as upstream for Dynamic Tables
CREATE BLOOMFILTER INDEX | Create a BloomFilter index on the device_id column | Suitable for point-lookup filtering on high-cardinality columns
CREATE PIPE | Create a Kafka continuous ingestion pipeline | Bound to its target table (kafka_raw_events in this guide)
CREATE FUNCTION | Create SQL UDF calc_health_score | Encapsulates the weighted health scoring formula
ALTER TABLE ... CHANGE COLUMN ... SET MASK | Bind a Column Masking policy | Masks sensitive latitude/longitude columns
CREATE DYNAMIC TABLE | Create Silver / Gold layer incremental computation tables | System automatically detects upstream changes and refreshes incrementally
REFRESH DYNAMIC TABLE | Manually trigger a single refresh | Used during initial build or debugging

Prerequisites

All examples in this guide run under the iot_health schema.

CREATE SCHEMA IF NOT EXISTS iot_health;


Bronze Layer: Raw Sensor Event Table

Create Table

CREATE TABLE IF NOT EXISTS iot_health.bronze_sensor_events (
    event_id     STRING,
    device_id    STRING,
    device_type  STRING,
    temperature  DOUBLE,
    vibration    DOUBLE,
    pressure     DOUBLE,
    humidity     DOUBLE,
    fault_label  INT,
    error_code   STRING,
    event_time   TIMESTAMP,
    ingest_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

ingest_time uses DEFAULT CURRENT_TIMESTAMP(), so it is auto-populated whenever a row is written without an explicit value. It does not need to be included in the Kafka message body.

Create BloomFilter Index

Both the Silver and Gold layers will filter by device_id. The cardinality of this column scales with the number of devices, so in a production fleet it is a high-cardinality column and a good candidate for a BloomFilter Index.

CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_device_id ON TABLE iot_health.bronze_sensor_events (device_id);
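To illustrate why a Bloom filter suits point lookups on device_id, here is a minimal Python sketch of the data structure. It is not the platform's implementation: the class, the bit-array size, and the hash scheme are all assumptions chosen for readability. The key property is that a "no" answer is always correct, so data that cannot contain the requested device can be skipped, while a "yes" answer may occasionally be a false positive.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: may report false positives, never false negatives."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per bit, for readability

    def _positions(self, key: str):
        # Derive num_hashes bit positions by salting the key with the hash index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key: str) -> bool:
        return all(self.bits[pos] for pos in self._positions(key))


# Hypothetical data file that holds events for three devices only.
file_filter = BloomFilter()
for device_id in ("DEV001", "DEV003", "DEV005"):
    file_filter.add(device_id)

# A lookup for a device that was added always answers True,
# so the file is scanned; a False answer would let the reader skip it.
print(file_filter.might_contain("DEV003"))  # True
```

With only a handful of distinct values per file, almost every lookup for an absent device returns False, which is where the filtering benefit comes from.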

Configure Kafka PIPE

The Kafka PIPE will attempt to connect to the Kafka broker and verify the subscription relationship during the DDL phase. Replace the <kafka-broker> placeholder and the topic name (iot_sensor_events) with your actual values before creating the pipe.

-- First create the raw string receiving table; the PIPE writes JSON strings
CREATE TABLE IF NOT EXISTS iot_health.kafka_raw_events (value STRING);

-- Create the Kafka PIPE
CREATE PIPE IF NOT EXISTS iot_health.pipe_sensor_events
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO iot_health.kafka_raw_events
FROM (
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',   -- replace with your actual broker address
        'iot_sensor_events',     -- topic name
        '',
        'cz_iot_consumer',       -- consumer group ID
        '', '', '', '',
        'raw',
        'raw',
        0,
        map()
    )
);

Once the PIPE is created it is in running state by default, consuming in batches every BATCH_INTERVAL_IN_SECONDS seconds.
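The PIPE above lands each message as a raw JSON string in kafka_raw_events, while the Bronze table expects typed columns. The following Python sketch illustrates the shape of that parsing step outside the warehouse. The message layout is an assumption (a JSON object whose keys match the Bronze column names, with event_time formatted as 'YYYY-MM-DD HH:MM:SS'), and parse_sensor_message is a hypothetical helper, not part of the platform.

```python
import json
from datetime import datetime

# Bronze columns supplied by the message; ingest_time is filled by its DEFAULT.
BRONZE_COLUMNS = (
    "event_id", "device_id", "device_type", "temperature", "vibration",
    "pressure", "humidity", "fault_label", "error_code", "event_time",
)


def parse_sensor_message(raw_value: str) -> dict:
    """Convert one raw JSON message into a dict keyed by Bronze column name."""
    payload = json.loads(raw_value)
    row = {col: payload.get(col) for col in BRONZE_COLUMNS}  # missing keys -> None
    for metric in ("temperature", "vibration", "pressure", "humidity"):
        if row[metric] is not None:
            row[metric] = float(row[metric])
    if row["fault_label"] is not None:
        row["fault_label"] = int(row["fault_label"])
    if row["event_time"] is not None:
        row["event_time"] = datetime.strptime(row["event_time"], "%Y-%m-%d %H:%M:%S")
    return row


# Assumed message shape, using the values of sample event EVT001.
sample = (
    '{"event_id": "EVT001", "device_id": "DEV001", "device_type": "pump", '
    '"temperature": 72.3, "vibration": 3.2, "pressure": 98.5, "humidity": 45.2, '
    '"fault_label": 0, "error_code": null, "event_time": "2026-06-01 08:00:00"}'
)
row = parse_sensor_message(sample)
print(row["device_id"], row["temperature"], row["event_time"])
# DEV001 72.3 2026-06-01 08:00:00
```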

Load Sample Data

Import from local CSV (recommended)

-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/your/bronze_sensor_events.csv' TO USER VOLUME FILE 'bronze_sensor_events.csv';

-- Step 2: COPY INTO the table from User Volume
COPY INTO iot_health.bronze_sensor_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('bronze_sensor_events.csv');

You can also insert a small batch of test data inline (no CSV file required).

This guide uses direct INSERT to simulate the effect of Kafka messages being parsed and written:

INSERT INTO iot_health.bronze_sensor_events
    (event_id, device_id, device_type, temperature, vibration, pressure, humidity, fault_label, error_code, event_time)
VALUES
    ('EVT001','DEV001','pump',       72.3, 3.2,  98.5, 45.2, 0, NULL,   CAST('2026-06-01 08:00:00' AS TIMESTAMP)),
    ('EVT003','DEV003','compressor', 91.5, 4.1,  88.3, 38.7, 1, 'E001', CAST('2026-06-01 08:02:00' AS TIMESTAMP)),
    ('EVT005','DEV005','motor',      55.2, 8.9, 101.2, 50.0, 1, 'E002', CAST('2026-06-01 08:04:00' AS TIMESTAMP)),
    ('EVT006','DEV006','valve',      63.4, 1.5, 130.0, 41.5, 1, 'E003', CAST('2026-06-01 08:05:00' AS TIMESTAMP))
    -- ... full 100 rows omitted here
;

Verify the Bronze layer row count:

SELECT COUNT(*) AS bronze_row_count
FROM iot_health.bronze_sensor_events;

bronze_row_count
----------------
100


Device Master Table and Column Masking

Create Table

CREATE TABLE IF NOT EXISTS iot_health.device_master (
    device_id     STRING,
    device_name   STRING,
    device_type   STRING,
    location_lat  DOUBLE,   -- sensitive field: latitude
    location_lon  DOUBLE,   -- sensitive field: longitude
    install_date  DATE,
    manufacturer  STRING,
    model         STRING,
    status        STRING
);

Load Device Master Data

Import from local CSV (recommended)

-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/your/device_master.csv' TO USER VOLUME FILE 'device_master.csv';

-- Step 2: COPY INTO the table from User Volume
COPY INTO iot_health.device_master
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('device_master.csv');

You can also insert a small batch of test data inline (no CSV file required):

INSERT INTO iot_health.device_master VALUES
    ('DEV001','Pump-Alpha-01',      'pump',       31.2304, 121.4737, CAST('2022-03-15' AS DATE), 'SiemensCN', 'P300', 'active'),
    ('DEV002','Motor-Beta-01',      'motor',      31.2310, 121.4740, CAST('2022-04-20' AS DATE), 'ABB',       'M500', 'active'),
    ('DEV003','Compressor-Gamma-01','compressor', 31.2315, 121.4745, CAST('2021-11-10' AS DATE), 'Atlas',     'C200', 'active')
    -- ... full 20 rows omitted here
;

Create Masking Function and Bind to Coordinate Columns

location_lat and location_lon represent the device installation location and are sensitive data. The approach: privileged accounts (usernames listed in the masking policy) see the original precision; other users see coordinates rounded to 1 decimal place.

-- Create the masking function
CREATE OR REPLACE FUNCTION iot_health.mask_location_coord(coord DOUBLE)
RETURNS DOUBLE
AS CASE
    WHEN current_user() IN ('privileged_user') THEN coord   -- replace with actual authorized username
    ELSE ROUND(coord, 1)
END;

-- Bind to location_lat
ALTER TABLE iot_health.device_master CHANGE COLUMN location_lat SET MASK iot_health.mask_location_coord;

-- Bind to location_lon
ALTER TABLE iot_health.device_master CHANGE COLUMN location_lon SET MASK iot_health.mask_location_coord;

Verify the masking behavior:

-- Admin account query (sees original precision)
SELECT device_id, location_lat, location_lon
FROM iot_health.device_master
LIMIT 3;

device_id | location_lat | location_lon
----------+--------------+-------------
DEV001    | 31.2304      | 121.4737
DEV002    | 31.231       | 121.474
DEV003    | 31.2315      | 121.4745
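The output above shows only the privileged view. As a rough check of what other users should see, the Python sketch below mirrors the CASE expression of mask_location_coord and applies it to the same three rows. The user name analyst_user is hypothetical, and Python's round may differ from SQL ROUND on exact ties, which none of these values hit.

```python
PRIVILEGED_USERS = frozenset({"privileged_user"})  # same placeholder as the SQL function


def mask_location_coord(coord: float, current_user: str) -> float:
    """Mirror of the SQL masking function: full precision only for privileged users."""
    if current_user in PRIVILEGED_USERS:
        return coord
    return round(coord, 1)


# The three device_master rows inserted earlier in this guide.
devices = [
    ("DEV001", 31.2304, 121.4737),
    ("DEV002", 31.2310, 121.4740),
    ("DEV003", 31.2315, 121.4745),
]

for device_id, lat, lon in devices:
    print(
        device_id,
        mask_location_coord(lat, "analyst_user"),  # hypothetical non-privileged user
        mask_location_coord(lon, "analyst_user"),
    )
# DEV001 31.2 121.5
# DEV002 31.2 121.5
# DEV003 31.2 121.5
```

All three devices collapse to the same masked coordinates, which is the intended effect: one decimal place of latitude corresponds to roughly 11 km, so individual installations are no longer distinguishable.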


Health Score UDF

Encapsulate the anomaly detection scoring logic in a SQL UDF so that it is defined once and can be reused wherever a health score is needed. In this guide it is called from the Gold layer.

Scoring formula: 100 - (temperature/100 × 30 + vibration/10 × 30 + pressure/200 × 20 + fault_label × 20), capped at 100, floored at 0.

CREATE OR REPLACE FUNCTION iot_health.calc_health_score(
    temperature DOUBLE,
    vibration   DOUBLE,
    pressure    DOUBLE,
    fault_label INT
)
RETURNS DOUBLE
AS GREATEST(0.0, LEAST(100.0,
    100.0
    - (temperature / 100.0 * 30.0)
    - (vibration / 10.0 * 30.0)
    - (pressure / 200.0 * 20.0)
    - (fault_label * 20.0)
));

Verify the function:

-- Normal device: temperature 75, vibration 3.5, pressure 99, no fault
SELECT iot_health.calc_health_score(75.0, 3.5, 99.0, 0) AS sample_score;

sample_score
------------
57.1
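To make the arithmetic behind the sample score easy to check by hand, here is a small Python mirror of the formula. It is a sketch for offline verification, not a replacement for the UDF. For the sample above the deductions are 22.5 (temperature), 10.5 (vibration), and 9.9 (pressure), giving 100 - 42.9 = 57.1.

```python
def calc_health_score(temperature: float, vibration: float,
                      pressure: float, fault_label: int) -> float:
    """Python mirror of the SQL UDF: weighted deductions, clamped to [0, 100]."""
    penalty = (
        temperature / 100.0 * 30.0   # up to 30 points at 100 degrees
        + vibration / 10.0 * 30.0    # up to 30 points at 10 mm/s
        + pressure / 200.0 * 20.0    # up to 20 points at 200 bar
        + fault_label * 20.0         # flat 20 points when a fault is flagged
    )
    return max(0.0, min(100.0, 100.0 - penalty))


# Same inputs as the SQL verification query.
print(round(calc_health_score(75.0, 3.5, 99.0, 0), 2))    # 57.1
# Identical readings with a fault flagged lose a further 20 points.
print(round(calc_health_score(75.0, 3.5, 99.0, 1), 2))    # 37.1
# Readings beyond the normalisation ranges are floored at 0.
print(calc_health_score(120.0, 12.0, 250.0, 1))           # 0.0
```

Note that this sample scores below 60 even without a fault, so under the Gold-layer thresholds it would already be classified as RED.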


Silver Layer Dynamic Table: Cleansing and Anomaly Labeling

The Silver layer performs two tasks on top of the Bronze raw events:

  1. LEFT JOIN device_master to enrich each event with device name, manufacturer, install date, and other dimension fields
  2. Mark anomalies (is_anomaly) and compute risk scores (risk_score) so the Gold layer can aggregate directly

CREATE DYNAMIC TABLE IF NOT EXISTS iot_health.silver_device_events
REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
    e.event_id,
    e.device_id,
    e.device_type,
    e.temperature,
    e.vibration,
    e.pressure,
    e.humidity,
    e.fault_label,
    e.error_code,
    e.event_time,
    e.ingest_time,
    d.device_name,
    d.manufacturer,
    d.model,
    d.install_date,
    d.status AS device_status,
    -- Anomaly flag: temperature > 90, or vibration > 8, or pressure > 120
    CASE
        WHEN e.temperature > 90 OR e.vibration > 8 OR e.pressure > 120 THEN 1
        ELSE 0
    END AS is_anomaly,
    -- Weighted risk score (higher = more dangerous)
    ROUND(e.temperature / 100.0 + e.vibration / 10.0 + e.pressure / 200.0, 4) AS risk_score
FROM iot_health.bronze_sensor_events e
LEFT JOIN iot_health.device_master d
    ON e.device_id = d.device_id;

Anomaly threshold explanation:

Metric | Threshold | Rationale
-------+-----------+----------
temperature | > 90 °C | Critical overheating point; sustained exceedance damages insulation
vibration | > 8 mm/s | Alert threshold for Class B machinery per ISO 10816 standard
pressure | > 120 bar | Typical upper design pressure limit for industrial pipelines
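As a worked example of these thresholds, the Python sketch below applies the same is_anomaly and risk_score expressions to the four sample events from the inline INSERT earlier in this guide. It mirrors the Silver-layer SQL for illustration only; the function names are chosen to match the column names.

```python
def is_anomaly(temperature: float, vibration: float, pressure: float) -> int:
    """1 if any metric is strictly above its threshold, else 0."""
    return int(temperature > 90 or vibration > 8 or pressure > 120)


def risk_score(temperature: float, vibration: float, pressure: float) -> float:
    """Sum of the three metrics after scaling by 100, 10 and 200, rounded to 4 places."""
    return round(temperature / 100.0 + vibration / 10.0 + pressure / 200.0, 4)


# (event_id, temperature, vibration, pressure) from the sample INSERT.
events = [
    ("EVT001", 72.3, 3.2, 98.5),    # all metrics below their thresholds
    ("EVT003", 91.5, 4.1, 88.3),    # temperature above 90
    ("EVT005", 55.2, 8.9, 101.2),   # vibration above 8
    ("EVT006", 63.4, 1.5, 130.0),   # pressure above 120
]

for event_id, t, v, p in events:
    print(event_id, is_anomaly(t, v, p), risk_score(t, v, p))
# EVT001 0 1.5355
# EVT003 1 1.7665
# EVT005 1 1.948
# EVT006 1 1.434
```

EVT006 has the lowest risk_score of the four yet is still flagged, because the flag reacts to any single metric crossing its threshold while the score blends all three.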

Manually trigger the initial refresh:

REFRESH DYNAMIC TABLE iot_health.silver_device_events;

SELECT COUNT(*) AS silver_count
FROM iot_health.silver_device_events;

silver_count
------------
100


Gold Layer Dynamic Table: Device-Level Aggregation and Alerting

The Gold layer aggregates Silver data at device_id + hourly window granularity, calls the calc_health_score UDF to compute health scores, and outputs three alert levels.

CREATE DYNAMIC TABLE IF NOT EXISTS iot_health.gold_device_health
REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
    device_id,
    device_name,
    manufacturer,
    model,
    device_status,
    DATE_TRUNC('hour', event_time) AS hour_window,
    COUNT(*)                       AS event_count,
    ROUND(AVG(temperature), 2)     AS avg_temperature,
    ROUND(AVG(vibration), 2)       AS avg_vibration,
    ROUND(AVG(pressure), 2)        AS avg_pressure,
    SUM(is_anomaly)                AS anomaly_count,
    ROUND(iot_health.calc_health_score(
        AVG(temperature), AVG(vibration), AVG(pressure),
        CAST(MAX(fault_label) AS INT)
    ), 2) AS health_score,
    CASE
        WHEN iot_health.calc_health_score(
                 AVG(temperature), AVG(vibration), AVG(pressure),
                 CAST(MAX(fault_label) AS INT)
             ) >= 80 THEN 'GREEN'
        WHEN iot_health.calc_health_score(
                 AVG(temperature), AVG(vibration), AVG(pressure),
                 CAST(MAX(fault_label) AS INT)
             ) >= 60 THEN 'YELLOW'
        ELSE 'RED'
    END AS alert_level
FROM iot_health.silver_device_events
GROUP BY
    device_id,
    device_name,
    manufacturer,
    model,
    device_status,
    DATE_TRUNC('hour', event_time);

MAX(fault_label) captures the most severe fault state within the window (if any single event has a fault, MAX = 1), preventing average values from masking transient faults.
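The Gold-layer logic can be traced end to end with a short Python sketch: group events by device and hour, average the metrics, take the maximum fault_label, then score and classify. The two events below are hypothetical, chosen only so that their averages (88.7, 7.5, 108.0) match one row of the result set shown later; only one of them carries a fault, yet MAX still applies the deduction to the whole window.

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean


def calc_health_score(temperature, vibration, pressure, fault_label):
    """Python mirror of the calc_health_score UDF."""
    penalty = (temperature / 100.0 * 30.0 + vibration / 10.0 * 30.0
               + pressure / 200.0 * 20.0 + fault_label * 20.0)
    return max(0.0, min(100.0, 100.0 - penalty))


def alert_level(score: float) -> str:
    """Same thresholds as the CASE expression in the Gold table."""
    if score >= 80:
        return "GREEN"
    if score >= 60:
        return "YELLOW"
    return "RED"


def aggregate_hourly(events):
    """Group by (device_id, hour) and compute (health_score, alert_level)."""
    windows = defaultdict(list)
    for e in events:
        # Equivalent of DATE_TRUNC('hour', event_time).
        hour = e["event_time"].replace(minute=0, second=0, microsecond=0)
        windows[(e["device_id"], hour)].append(e)
    result = {}
    for key, rows in windows.items():
        score = calc_health_score(
            mean(r["temperature"] for r in rows),
            mean(r["vibration"] for r in rows),
            mean(r["pressure"] for r in rows),
            max(r["fault_label"] for r in rows),  # one fault flags the whole window
        )
        result[key] = (round(score, 2), alert_level(score))
    return result


# Hypothetical events for one device within the same hour.
events = [
    {"device_id": "DEV012", "event_time": datetime(2026, 6, 1, 8, 10),
     "temperature": 88.0, "vibration": 7.0, "pressure": 106.0, "fault_label": 0},
    {"device_id": "DEV012", "event_time": datetime(2026, 6, 1, 8, 40),
     "temperature": 89.4, "vibration": 8.0, "pressure": 110.0, "fault_label": 1},
]
result = aggregate_hourly(events)
print(result[("DEV012", datetime(2026, 6, 1, 8))])  # (20.09, 'RED')
```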

Manually trigger the initial refresh and view results:

REFRESH DYNAMIC TABLE iot_health.gold_device_health;

SELECT device_id, device_name, hour_window,
       avg_temperature, avg_vibration, avg_pressure,
       anomaly_count, health_score, alert_level
FROM iot_health.gold_device_health
ORDER BY health_score ASC
LIMIT 10;

device_id | device_name   | hour_window         | avg_temperature | avg_vibration | avg_pressure | anomaly_count | health_score | alert_level
----------+---------------+---------------------+-----------------+---------------+--------------+---------------+--------------+------------
DEV012    | Motor-Beta-04 | 2026-06-01T08:00:00 | 88.7            | 7.5           | 108.0        | 0             | 20.09        | RED
DEV012    | Motor-Beta-04 | 2026-06-01T09:00:00 | 87.9            | 7.2           | 109.3        | 0             | 21.10        | RED
DEV012    | Motor-Beta-04 | 2026-06-01T10:00:00 | 89.1            | 7.0           | 110.5        | 0             | 21.22        | RED
DEV012    | Motor-Beta-04 | 2026-06-01T11:00:00 | 90.2            | 6.8           | 111.7        | 1             | 21.37        | RED
DEV019    | Motor-Beta-06 | 2026-06-01T12:00:00 | 60.8            | 10.4          | 90.4         | 1             | 21.52        | RED
DEV012    | Motor-Beta-04 | 2026-06-01T12:00:00 | 91.4            | 6.5           | 112.8        | 1             | 21.80        | RED
DEV019    | Motor-Beta-06 | 2026-06-01T11:00:00 | 59.9            | 10.1          | 89.7         | 1             | 22.76        | RED
DEV019    | Motor-Beta-06 | 2026-06-01T10:00:00 | 59.0            | 9.8           | 89.0         | 1             | 24.00        | RED
DEV015    | Motor-Beta-05 | 2026-06-01T12:00:00 | 96.8            | 5.6           | 92.3         | 1             | 24.93        | RED
DEV019    | Motor-Beta-06 | 2026-06-01T09:00:00 | 58.1            | 9.5           | 88.3         | 1             | 25.24        | RED

Result interpretation:

  • DEV012 (Motor-Beta-04) is in RED status in every observed hour. The root cause is elevated temperature (88–91 °C) and vibration (6.5–7.5 mm/s), and the scores shown also include the 20-point fault_label deduction. From 08:00 to 10:00 no single metric exceeds its anomaly threshold (anomaly_count = 0), yet the weighted health score is already only about 20–21. From 11:00 onward the average temperature also crosses the 90 °C threshold. Prioritize inspection of motor cooling and bearing wear.
  • DEV019 (Motor-Beta-06) has vibration consistently above 8 mm/s (alert threshold), which is the direct cause of RED alerts. Additionally, fault_label=1 deducts 20 points. Inspect shaft alignment immediately.
  • In the current dataset, 83 device-hour records are RED and 17 are YELLOW, with no GREEN — indicating the simulated dataset contains many high-load scenarios.

View alert level distribution:

SELECT alert_level, COUNT(*) AS device_hour_count
FROM iot_health.gold_device_health
GROUP BY alert_level
ORDER BY alert_level;

alert_level | device_hour_count
------------+------------------
RED         | 83
YELLOW      | 17


Data Warehouse Object Summary

After completing the full build, objects in the iot_health schema:

SHOW TABLES IN iot_health;

schema_name | table_name           | is_dynamic
------------+----------------------+-----------
iot_health  | bronze_sensor_events | false
iot_health  | device_master        | false
iot_health  | kafka_raw_events     | false
iot_health  | silver_device_events | true
iot_health  | gold_device_health   | true

Architecture diagram:

Kafka (real-time)
    │
    ▼  pipe_sensor_events (Kafka PIPE)
kafka_raw_events

bronze_sensor_events
    │  ← INSERT (simulated / production writes)
    │  BloomFilter Index (device_id)
    │
    │  device_master (device master data)
    │  Column Masking (location_lat / location_lon)
    │
    ▼  REFRESH INTERVAL 1 MINUTE
silver_device_events (Dynamic Table)
    is_anomaly / risk_score / dimension enrichment
    │
    ▼  REFRESH INTERVAL 1 MINUTE
gold_device_health (Dynamic Table)
    health_score (calc_health_score UDF)
    alert_level (GREEN / YELLOW / RED)


Notes

  • BloomFilter Index does not automatically apply to existing data: CREATE BLOOMFILTER INDEX only accelerates new data written after the index is created. If the table already contains a large amount of existing data, the BloomFilter filtering acceleration is limited. The BLOOMFILTER index type does not support BUILD INDEX; to cover existing data you would need to rebuild the table.

  • Dynamic Table incremental refresh depends on Bronze layer change tracking: The first REFRESH performs a full snapshot computation. Subsequent incremental refreshes only process rows added or changed in the Bronze layer since the last refresh point. If the Bronze layer uses INSERT OVERWRITE, Dynamic Tables will fall back to full refresh.

  • Semantics of MAX(fault_label) in calc_health_score: The Gold layer uses MAX(fault_label) rather than AVG(fault_label) so that a single fault event can pull the entire hourly window into RED status. If the business requires "alert only when more than 50% of events are faults", change it to CASE WHEN AVG(fault_label) > 0.5 THEN 1 ELSE 0 END.

  • Column Masking applies transparently to Dynamic Tables: When a query reads device_master, non-privileged users receive low-precision masked location_lat/location_lon values, and a Dynamic Table that selects these columns stores the masked values as well. The Silver table in this guide does not select the coordinate columns, so no coordinates reach Silver or Gold. If high-precision coordinates are needed for spatial analysis, query device_master directly using a privileged account.
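The note on MAX(fault_label) versus the "more than 50% of events" rule can be made concrete with a small Python comparison. The label lists are hypothetical hourly windows; the two functions mirror MAX(fault_label) and CASE WHEN AVG(fault_label) > 0.5 THEN 1 ELSE 0 END respectively.

```python
def window_fault_max(labels: list) -> int:
    """MAX(fault_label): a single faulty event flags the whole window."""
    return max(labels)


def window_fault_majority(labels: list) -> int:
    """AVG(fault_label) > 0.5: flag only when more than half the events are faults."""
    return 1 if sum(labels) / len(labels) > 0.5 else 0


# Hypothetical fault_label sequences for three hourly windows.
windows = {
    "one transient fault": [0, 0, 0, 1],
    "mostly faulty":       [1, 1, 1, 0],
    "exactly half":        [0, 1],
}

for name, labels in windows.items():
    print(name, window_fault_max(labels), window_fault_majority(labels))
# one transient fault 1 0
# mostly faulty 1 1
# exactly half 1 0
```

Because the flag is worth 20 points in calc_health_score, the choice of rule can move a window across an alert boundary: the "one transient fault" window scores 20 points higher under the majority rule than under MAX.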