Industrial IoT Device Health Monitoring Data Warehouse Best Practices
Build a multi-layer data warehouse that turns real-time production-line sensor data into device health scores and predictive maintenance alerts. Using a dataset of 20 industrial devices and 100 sensor events, this guide walks through the end-to-end Kafka PIPE → Bronze → Silver → Gold pipeline and covers three key platform capabilities: BloomFilter Index, Column Masking, and SQL UDF.
Overview
The typical data pipeline for IoT device health monitoring is: sensor reporting → real-time ingestion → raw storage (Bronze) → cleansing and labeling (Silver) → metric aggregation and alerting (Gold).
Singdata Lakehouse addresses several core challenges through the following combination:
Challenge | Solution
----------+---------
Real-time sensor data ingestion with high-frequency millisecond writes | Kafka PIPE continuous ingestion, with no need to write custom consumers
Automatic incremental computation across Bronze → Silver → Gold | Dynamic Table with declarative SQL; the system auto-schedules the dependency chain
Sensitive fields such as device location coordinates need masking | Column Masking bound to the column, transparent to non-privileged users
device_id is a high-cardinality column with frequent point lookups | BloomFilter Index for fast on-demand filtering
Anomaly detection scoring logic is reusable | SQL UDF encapsulating the weighted health score formula
SQL Commands Used
Command / Function | Purpose | Notes
-------------------+---------+------
CREATE TABLE | Create the Bronze-layer raw event table and device master table | Regular tables, serving as upstream for Dynamic Tables
CREATE BLOOMFILTER INDEX | Create a BloomFilter index on the device_id column | Suitable for point-lookup filtering on high-cardinality columns
ingest_time uses DEFAULT CURRENT_TIMESTAMP(), which is auto-populated when the Kafka PIPE writes data — no need to include it in the message body.
Create BloomFilter Index
Both the Silver and Gold layers will filter by device_id. This column has cardinality on the order of the number of devices (high cardinality), making it a good candidate for a BloomFilter Index.
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_device_id
ON TABLE bronze_sensor_events (device_id);
⚠️ Note: CREATE BLOOMFILTER INDEX requires the same schema context as the target table. Run USE SCHEMA iot_health or pass -s iot_health before executing, otherwise you will get an "index and table must in the same schema" error.
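The access pattern this index is meant to serve is an equality filter on device_id. A minimal sketch of such a point lookup follows; it assumes the device ID 'DEV012' exists in your Bronze data (that device appears in the results later in this guide) and uses the column names from the sample INSERT statement.

```sql
-- Point lookup on the indexed column: an equality predicate on device_id
-- is the kind of filter a BloomFilter index is intended to speed up.
SELECT event_id, device_id, temperature, vibration, pressure, event_time
FROM iot_health.bronze_sensor_events
WHERE device_id = 'DEV012'   -- assumed to exist in the loaded data
ORDER BY event_time;
```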
Configure Kafka PIPE
The Kafka PIPE will attempt to connect to the Kafka broker and verify the subscription relationship during the DDL phase. Replace the broker address and topic name in the statement below with your actual values before creating the pipe.
-- First create the raw string receiving table; the PIPE writes JSON strings
CREATE TABLE IF NOT EXISTS iot_health.kafka_raw_events (value STRING);
-- Create the Kafka PIPE
CREATE PIPE IF NOT EXISTS iot_health.pipe_sensor_events
    VIRTUAL_CLUSTER = 'DEFAULT'
    BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO iot_health.kafka_raw_events
FROM (
    SELECT CAST(value AS STRING) AS value
    FROM READ_KAFKA(
        '<kafka-broker>:9092',  -- replace with your actual broker address
        'iot_sensor_events',    -- topic name
        '',
        'cz_iot_consumer',      -- consumer group ID
        '', '', '', '',         -- parameters 5-8: leave empty inside a PIPE
        'raw', 'raw',
        0,
        map()
    )
);
💡 Tip: In the PIPE DDL, positional parameters 5–8 of READ_KAFKA (start/end offsets, timestamps) must be left empty and are managed automatically by the PIPE runtime. Only fill them in manually when using READ_KAFKA standalone for a one-time ad-hoc query.
Once the PIPE is created it is in running state by default, consuming in batches every BATCH_INTERVAL_IN_SECONDS seconds.
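After at least one batch interval has elapsed, one way to confirm that the pipe is delivering messages is to inspect the receiving table directly. This is a minimal sketch and assumes the topic has already received messages.

```sql
-- How many raw Kafka messages have landed so far
SELECT COUNT(*) AS raw_message_count
FROM iot_health.kafka_raw_events;

-- Peek at a few raw JSON strings to confirm the payload looks as expected
SELECT value
FROM iot_health.kafka_raw_events
LIMIT 5;
```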
Load Sample Data
Import from local CSV (recommended)
-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/your/bronze_sensor_events.csv' TO USER VOLUME FILE 'bronze_sensor_events.csv';
-- Step 2: COPY INTO the table from User Volume
COPY INTO iot_health.bronze_sensor_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('bronze_sensor_events.csv');
You can also insert a small batch of test data inline (no CSV file required).
This guide uses direct INSERT to simulate the effect of Kafka messages being parsed and written:
INSERT INTO iot_health.bronze_sensor_events
(event_id, device_id, device_type, temperature, vibration,
pressure, humidity, fault_label, error_code, event_time)
VALUES
('EVT001','DEV001','pump', 72.3,3.2, 98.5,45.2,0,NULL, CAST('2026-06-01 08:00:00' AS TIMESTAMP)),
('EVT003','DEV003','compressor',91.5,4.1,88.3,38.7,1,'E001',CAST('2026-06-01 08:02:00' AS TIMESTAMP)),
('EVT005','DEV005','motor', 55.2,8.9,101.2,50.0,1,'E002', CAST('2026-06-01 08:04:00' AS TIMESTAMP)),
('EVT006','DEV006','valve', 63.4,1.5,130.0,41.5,1,'E003', CAST('2026-06-01 08:05:00' AS TIMESTAMP))
-- ... full 100 rows omitted here
;
Verify the Bronze layer row count:
SELECT COUNT(*) AS bronze_row_count FROM iot_health.bronze_sensor_events;
bronze_row_count
----------------
100
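A matching row count does not rule out repeated events. As an additional check, the following sketch lists any event_id that appears more than once. It assumes event_id is meant to be unique per event, which this guide does not state explicitly.

```sql
-- Any row returned here is an event_id loaded more than once
SELECT event_id, COUNT(*) AS occurrences
FROM iot_health.bronze_sensor_events
GROUP BY event_id
HAVING COUNT(*) > 1;
```

An empty result means every event_id in the Bronze table is distinct.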
Device Master Table and Column Masking
Create Table
CREATE TABLE IF NOT EXISTS iot_health.device_master (
    device_id     STRING,
    device_name   STRING,
    device_type   STRING,
    location_lat  DOUBLE,  -- sensitive field: latitude
    location_lon  DOUBLE,  -- sensitive field: longitude
    install_date  DATE,
    manufacturer  STRING,
    model         STRING,
    status        STRING
);
Load Device Master Data
Import from local CSV (recommended)
-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/your/device_master.csv' TO USER VOLUME FILE 'device_master.csv';
-- Step 2: COPY INTO the table from User Volume
COPY INTO iot_health.device_master
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('device_master.csv');
You can also insert a small batch of test data inline (no CSV file required):
INSERT INTO iot_health.device_master VALUES
('DEV001','Pump-Alpha-01', 'pump', 31.2304,121.4737,CAST('2022-03-15' AS DATE),'SiemensCN','P300','active'),
('DEV002','Motor-Beta-01', 'motor', 31.2310,121.4740,CAST('2022-04-20' AS DATE),'ABB', 'M500','active'),
('DEV003','Compressor-Gamma-01','compressor',31.2315,121.4745,CAST('2021-11-10' AS DATE),'Atlas', 'C200','active')
-- ... full 20 rows omitted here
;
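Because the Silver layer LEFT JOINs Bronze events to device_master, an event whose device_id has no master row passes through with NULL dimension fields. The sketch below is one way to spot such events before building the downstream layers. The alias orphan_events is illustrative.

```sql
-- Events whose device_id has no matching row in device_master
SELECT e.device_id, COUNT(*) AS orphan_events
FROM iot_health.bronze_sensor_events e
LEFT JOIN iot_health.device_master d
    ON e.device_id = d.device_id
WHERE d.device_id IS NULL
GROUP BY e.device_id;
```

An empty result means every event can be enriched with device dimensions.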
Create Masking Function and Bind to Coordinate Columns
location_lat and location_lon represent the device installation location and are sensitive data. The approach: privileged accounts (usernames listed in the masking policy) see the original precision; other users see coordinates rounded to 1 decimal place.
-- Create the masking function
CREATE OR REPLACE FUNCTION iot_health.mask_location_coord(coord DOUBLE)
RETURNS DOUBLE
AS CASE
    WHEN current_user() IN ('privileged_user') THEN coord  -- replace with actual authorized username
    ELSE ROUND(coord, 1)
END;
-- Bind to location_lat
ALTER TABLE iot_health.device_master
CHANGE COLUMN location_lat
SET MASK iot_health.mask_location_coord;
-- Bind to location_lon
ALTER TABLE iot_health.device_master
CHANGE COLUMN location_lon
SET MASK iot_health.mask_location_coord;
💡 Tip: Replace 'privileged_user' with the actual usernames that need to see plaintext data. Column Masking uses the current_user() function to match the current connection's username. All authorized usernames must be explicitly listed in the IN() list.
⚠️ Note: The Column Masking effect applies transparently to all queries (including Dynamic Tables). When the Silver layer JOINs device_master, non-privileged users will see low-precision masked coordinates.
Verify the masking behavior:
-- Admin account query (sees original precision)
SELECT device_id, location_lat, location_lon FROM iot_health.device_master LIMIT 3;
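For an account that is not in the authorized list, the mask reduces to ROUND(coord, 1). The following sketch applies that expression to the DEV001 coordinates from the sample INSERT above to show what such a user would see. The column aliases are illustrative.

```sql
-- What the ELSE branch of mask_location_coord returns for DEV001
SELECT
    ROUND(31.2304, 1)  AS masked_lat,   -- 31.2
    ROUND(121.4737, 1) AS masked_lon;   -- 121.5
```

One decimal place of latitude corresponds to roughly 11 km, so the masked value still identifies the general region but not the installation site.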
-- Start at 100, subtract the weighted penalties, then clamp to [0, 100]
CREATE OR REPLACE FUNCTION iot_health.calc_health_score(
    temperature DOUBLE,
    vibration   DOUBLE,
    pressure    DOUBLE,
    fault_label INT
)
RETURNS DOUBLE
AS GREATEST(0.0, LEAST(100.0,
    100.0
    - (temperature / 100.0 * 30.0)
    - (vibration / 10.0 * 30.0)
    - (pressure / 200.0 * 20.0)
    - (fault_label * 20.0)
));
Verify the function:
-- Normal device: temperature 75, vibration 3.5, pressure 99, no fault
SELECT iot_health.calc_health_score(75.0, 3.5, 99.0, 0) AS sample_score;
sample_score
------------
57.1
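💡 Tip: A health score of 57.1 falls in the RED zone (the Gold layer assigns RED below 60, YELLOW from 60 up to 80, and GREEN at 80 and above). Even a device with no fault and no threshold breach loses 42.9 points to the temperature, vibration, and pressure terms, which shows how heavily these metrics weigh on the score. Adjust the weight coefficients as needed.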
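To see where the 57.1 comes from, the formula can be evaluated term by term. The sketch below repeats the arithmetic of calc_health_score for the sample inputs (75.0, 3.5, 99.0, 0). The column aliases are illustrative.

```sql
-- Per-term breakdown of the sample health score
SELECT
    ROUND(75.0 / 100.0 * 30.0, 1) AS temperature_penalty,  -- 22.5
    ROUND(3.5 / 10.0 * 30.0, 1)   AS vibration_penalty,    -- 10.5
    ROUND(99.0 / 200.0 * 20.0, 1) AS pressure_penalty,     -- 9.9
    ROUND(0 * 20.0, 1)            AS fault_penalty,        -- 0.0
    ROUND(100.0 - 22.5 - 10.5 - 9.9 - 0.0, 1) AS score;    -- 57.1
```

A breakdown like this makes it easier to judge which coefficient to change when tuning the weights.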
Silver Layer Dynamic Table: Cleansing and Anomaly Labeling
The Silver layer performs two tasks on top of the Bronze raw events:
LEFT JOIN device_master to enrich each event with device name, manufacturer, install date, and other dimension fields
Mark anomalies (is_anomaly) and compute risk scores (risk_score) so the Gold layer can aggregate directly
CREATE DYNAMIC TABLE IF NOT EXISTS iot_health.silver_device_events
REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
    e.event_id,
    e.device_id,
    e.device_type,
    e.temperature,
    e.vibration,
    e.pressure,
    e.humidity,
    e.fault_label,
    e.error_code,
    e.event_time,
    e.ingest_time,
    d.device_name,
    d.manufacturer,
    d.model,
    d.install_date,
    d.status AS device_status,
    -- Anomaly flag: temperature > 90, or vibration > 8, or pressure > 120
    CASE WHEN e.temperature > 90 OR e.vibration > 8 OR e.pressure > 120
         THEN 1 ELSE 0 END AS is_anomaly,
    -- Weighted risk score (higher = more dangerous)
    ROUND(e.temperature / 100.0 + e.vibration / 10.0 + e.pressure / 200.0, 4) AS risk_score
FROM iot_health.bronze_sensor_events e
LEFT JOIN iot_health.device_master d ON e.device_id = d.device_id;
vibration | > 8 mm/s | Alert threshold for Class B machinery per ISO 10816 standard
pressure | > 120 bar | Typical upper design pressure limit for industrial pipelines
Manually trigger the initial refresh:
REFRESH DYNAMIC TABLE iot_health.silver_device_events;
SELECT COUNT(*) AS silver_count FROM iot_health.silver_device_events;
silver_count
------------
100
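The is_anomaly flag records that at least one threshold was breached, but not which one. The following sketch counts breaches per rule on the Silver table, using the same thresholds as the CASE expression above. The column aliases are illustrative.

```sql
-- Breach counts per threshold rule; one event can count under several rules
SELECT
    SUM(CASE WHEN temperature > 90 THEN 1 ELSE 0 END) AS temperature_breaches,
    SUM(CASE WHEN vibration > 8 THEN 1 ELSE 0 END)    AS vibration_breaches,
    SUM(CASE WHEN pressure > 120 THEN 1 ELSE 0 END)   AS pressure_breaches,
    SUM(is_anomaly)                                   AS anomalous_events
FROM iot_health.silver_device_events;
```

Because the rules are combined with OR, the three per-rule counts can add up to more than anomalous_events.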
Gold Layer Dynamic Table: Device-Level Aggregation and Alerting
The Gold layer aggregates Silver data at device_id + hourly window granularity, calls the calc_health_score UDF to compute health scores, and outputs three alert levels.
CREATE DYNAMIC TABLE IF NOT EXISTS iot_health.gold_device_health
REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
    device_id,
    device_name,
    manufacturer,
    model,
    device_status,
    DATE_TRUNC('hour', event_time) AS hour_window,
    COUNT(*) AS event_count,
    ROUND(AVG(temperature), 2) AS avg_temperature,
    ROUND(AVG(vibration), 2) AS avg_vibration,
    ROUND(AVG(pressure), 2) AS avg_pressure,
    SUM(is_anomaly) AS anomaly_count,
    ROUND(iot_health.calc_health_score(
        AVG(temperature),
        AVG(vibration),
        AVG(pressure),
        CAST(MAX(fault_label) AS INT)
    ), 2) AS health_score,
    CASE
        WHEN iot_health.calc_health_score(
                 AVG(temperature), AVG(vibration),
                 AVG(pressure), CAST(MAX(fault_label) AS INT)
             ) >= 80 THEN 'GREEN'
        WHEN iot_health.calc_health_score(
                 AVG(temperature), AVG(vibration),
                 AVG(pressure), CAST(MAX(fault_label) AS INT)
             ) >= 60 THEN 'YELLOW'
        ELSE 'RED'
    END AS alert_level
FROM iot_health.silver_device_events
GROUP BY
    device_id, device_name, manufacturer, model, device_status,
    DATE_TRUNC('hour', event_time);
MAX(fault_label) captures the most severe fault state within the window (if any single event has a fault, MAX = 1), preventing average values from masking transient faults.
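To see how often this choice matters, the sketch below lists device-hour windows in which some but not all events carry a fault. These are the windows where MAX(fault_label) and AVG(fault_label) disagree. The aliases any_fault and fault_ratio are illustrative.

```sql
-- Windows with a mix of faulty and healthy events
SELECT
    device_id,
    DATE_TRUNC('hour', event_time) AS hour_window,
    COUNT(*)                       AS event_count,
    MAX(fault_label)               AS any_fault,    -- 1 if any event faulted
    ROUND(AVG(fault_label), 2)     AS fault_ratio   -- share of faulty events
FROM iot_health.silver_device_events
GROUP BY device_id, DATE_TRUNC('hour', event_time)
HAVING MAX(fault_label) = 1 AND AVG(fault_label) < 1
ORDER BY fault_ratio;
```

Windows with a low fault_ratio are the ones where an averaged fault signal would have hidden a transient fault.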
Manually trigger the initial refresh and view results:
REFRESH DYNAMIC TABLE iot_health.gold_device_health;
SELECT device_id, device_name, hour_window, avg_temperature,
avg_vibration, avg_pressure, anomaly_count, health_score, alert_level
FROM iot_health.gold_device_health
ORDER BY health_score ASC
LIMIT 10;
DEV012 (Motor-Beta-04) is continuously in RED status across all observed hours. The root cause is elevated temperature (88–91°C) and vibration (6.5–7.5 mm/s). Although neither single metric exceeds the threshold (temperature threshold is 90), the weighted health score drops to 20–21. Prioritize inspection of motor cooling and bearing wear.
DEV019 (Motor-Beta-06) has vibration consistently above 8 mm/s (alert threshold), which is the direct cause of RED alerts. Additionally, fault_label=1 deducts 20 points. Inspect shaft alignment immediately.
In the current dataset, 83 device-hour records are RED and 17 are YELLOW, with no GREEN — indicating the simulated dataset contains many high-load scenarios.
View alert level distribution:
SELECT alert_level, COUNT(*) AS device_hour_count
FROM iot_health.gold_device_health
GROUP BY alert_level
ORDER BY alert_level;
alert_level | device_hour_count
------------+------------------
RED | 83
YELLOW | 17
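The distribution shows how many device-hours are RED but not which devices drive them. One way to build an inspection priority list is to rank devices by their number of RED hours, as in this sketch. The aliases red_hours and worst_score are illustrative.

```sql
-- Devices ranked by how many hourly windows were RED
SELECT
    device_id,
    device_name,
    COUNT(*)          AS red_hours,
    MIN(health_score) AS worst_score
FROM iot_health.gold_device_health
WHERE alert_level = 'RED'
GROUP BY device_id, device_name
ORDER BY red_hours DESC, worst_score ASC;
```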
Data Warehouse Object Summary
After completing the full build, objects in the iot_health schema:
BloomFilter Index does not automatically apply to existing data: CREATE BLOOMFILTER INDEX only accelerates new data written after the index is created. If the table already contains a large amount of existing data, the BloomFilter filtering acceleration is limited. The BLOOMFILTER index type does not support BUILD INDEX; to cover existing data you would need to rebuild the table.
Dynamic Table incremental refresh depends on Bronze layer change tracking: The first REFRESH performs a full snapshot computation. Subsequent incremental refreshes only process rows added or changed in the Bronze layer since the last refresh point. If the Bronze layer uses INSERT OVERWRITE, Dynamic Tables will fall back to full refresh.
Semantics of MAX(fault_label) in calc_health_score: The Gold layer passes MAX(fault_label) rather than AVG(fault_label) to the UDF, so a single fault event applies the full 20-point fault deduction to the entire hourly window. If the business requires "alert only when more than 50% of events are faults", change it to CASE WHEN AVG(fault_label) > 0.5 THEN 1 ELSE 0 END.
Column Masking applies transparently to Dynamic Tables: When a Dynamic Table reads location_lat/location_lon from device_master, non-privileged users receive low-precision masked values, and any downstream layer that selects those columns stores the masked values as well. The Silver definition in this guide does not select the coordinate columns, so this applies only if you add them. If high-precision coordinates are needed for spatial analysis, query device_master directly using a privileged account.
⚠️ Manual verification required: Column Masking currently matches authorization by username via current_user(). All usernames that need to see plaintext data must be added one by one to the whitelist in the masking function. If your Lakehouse version supports role-based dynamic evaluation (e.g., HAS_ROLE('role_name')), you can use roles instead of username lists for more flexible management. Contact Singdata technical support to confirm whether this function is supported in your version.
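The majority-fault rule mentioned in the MAX(fault_label) note above can be previewed as an ad-hoc query before changing the Gold definition. This is a sketch only: it reuses the Silver table and the calc_health_score UDF from this guide, and the aliases strict_score and majority_score are illustrative.

```sql
-- Compare the current rule (any fault) with the majority-fault variant
SELECT
    device_id,
    DATE_TRUNC('hour', event_time) AS hour_window,
    ROUND(iot_health.calc_health_score(
        AVG(temperature), AVG(vibration), AVG(pressure),
        CAST(MAX(fault_label) AS INT)                        -- any fault in window
    ), 2) AS strict_score,
    ROUND(iot_health.calc_health_score(
        AVG(temperature), AVG(vibration), AVG(pressure),
        CASE WHEN AVG(fault_label) > 0.5 THEN 1 ELSE 0 END   -- majority of events faulted
    ), 2) AS majority_score
FROM iot_health.silver_device_events
GROUP BY device_id, DATE_TRUNC('hour', event_time);
```

Rows where the two scores differ by 20 points are the windows whose score would change under the majority rule.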