DataOps Pipeline Data Quality Gates Best Practices
Automatically running assertion checks after each pipeline layer refresh, isolating non-conforming data into a Quarantine zone, and triggering alerts when a gate fails — these are the core components of a DataOps closed-loop quality control system. This guide uses an e-commerce event stream as the dataset, demonstrating the complete end-to-end setup of Bronze → Quality Gate → Passed (Silver) / Quarantine, and covers three key capabilities: Dynamic Table built-in quality filtering, Lakehouse Studio task DAG dependency orchestration, and information_schema.job_history trend tracking.
Overview
The core challenge for DataOps data quality is: after the data warehouse refreshes, how do you know whether the data that just came in is trustworthy? Singdata Lakehouse addresses this with the following combination:
Problem
Solution
Bronze layer has null values, negative numbers, and out-of-range anomalies
Dynamic Table built-in WHERE filter — only rows that pass checks flow into Silver
Non-conforming data needs separate storage for manual review
doc_events_quarantine Dynamic Table automatically labels rows with quarantine_reason
Quality check results need to be traceable and trend-analyzable
doc_quality_results + doc_quality_summary persist check details from each run
The order between pipeline refresh and quality checks must be guaranteed
Studio Task DAG: refresh Passed/Quarantine first → then refresh Summary → then evaluate gate → then trigger Gold
Gate failures need timely push notifications
Studio Task configures Webhook to push alerts to Lark/DingTalk operations channels
SQL Commands Used
Command / Function
Purpose
Notes
CREATE TABLE
Create Bronze raw event table, quality rules table, and quality results table
Static tables serving as upstream data sources for Dynamic Tables
CREATE DYNAMIC TABLE
Create Passed / Quarantine / Summary layers
No REFRESH INTERVAL — scheduling managed by Studio Tasks
REFRESH DYNAMIC TABLE
Manually trigger first refresh
Used for initial builds or debugging
FILTER (WHERE ...)
Conditional filtering in aggregate functions, counts rows of each dirty data type
Used for failed row count calculation in quality rules
SHOW TABLES
View all objects under a schema
Confirm table creation status
sys.information_schema.job_history
Track elapsed time and status of each Dynamic Table refresh
Analyze quality check run trends
Prerequisites
All examples in this guide run under the best_practice_dataops_quality schema.
CREATE SCHEMA IF NOT EXISTS best_practice_dataops_quality;
Bronze Layer: Raw Event Table with Dirty Data
Create the Table
CREATE TABLE IF NOT EXISTS best_practice_dataops_quality.doc_raw_events (
event_id STRING,
user_id STRING,
event_type STRING,
amount DOUBLE,
ts TIMESTAMP,
region STRING,
platform STRING,
status STRING
);
Insert Sample Data (with Dirty Data)
The dataset intentionally includes four types of quality problems: user_id IS NULL (3 rows), amount IS NULL (4 rows), purchase type with amount < 0 (1 row: EVT012), refund type with amount < 0 (4 rows: EVT003/EVT011/EVT021/EVT028 — valid negative refunds), amount > 10000 (2 rows), and logical cross-event duplicates (EVT001/EVT006/EVT016 have the same user_id + ts + amount).
Import from a local CSV file (recommended):
-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_raw_events.csv' TO USER VOLUME FILE 'doc_raw_events.csv';
-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_dataops_quality.doc_raw_events
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_raw_events.csv');
You can also directly insert a small batch of test data inline (no CSV file needed):
SELECT COUNT(*) AS total_rows FROM best_practice_dataops_quality.doc_raw_events;
total_rows
----------
33
Quality Rule Definitions
Quality rules are maintained in a table-driven way. Each rule includes: a check SQL expression (returning failure rate), an acceptable threshold, and an alert severity level.
-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_quality_rules.csv' TO USER VOLUME FILE 'doc_quality_rules.csv';
-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_dataops_quality.doc_quality_rules
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_quality_rules.csv');
You can also directly insert a small batch of test data inline (no CSV file needed):
INSERT INTO best_practice_dataops_quality.doc_quality_rules VALUES
('R001','user_id_not_null', 'doc_raw_events','user_id',
'COUNT(*) FILTER (WHERE user_id IS NULL) * 1.0 / COUNT(*)',
0.02, 'ERROR', 'user_id null rate must be below 2%'),
('R002','amount_not_null', 'doc_raw_events','amount',
'COUNT(*) FILTER (WHERE amount IS NULL) * 1.0 / COUNT(*)',
0.05, 'ERROR', 'amount null rate must be below 5%'),
('R003','amount_positive', 'doc_raw_events','amount',
'COUNT(*) FILTER (WHERE event_type = ''purchase'' AND amount < 0) * 1.0 / COUNT(*)',
0.0, 'ERROR', 'purchase amount must not be negative'),
('R004','amount_range_check', 'doc_raw_events','amount',
'COUNT(*) FILTER (WHERE amount > 10000) * 1.0 / COUNT(*)',
0.01, 'WARNING', 'amount > 10000 rate must be below 1%'),
('R005','event_type_whitelist', 'doc_raw_events','event_type',
'COUNT(*) FILTER (WHERE event_type NOT IN (''purchase'',''refund'',''login'')) * 1.0 / COUNT(*)',
0.0, 'ERROR', 'event_type must be in whitelist'),
('R006','status_whitelist', 'doc_raw_events','status',
'COUNT(*) FILTER (WHERE status NOT IN (''completed'',''pending'',''error'')) * 1.0 / COUNT(*)',
0.0, 'ERROR', 'status must be in whitelist'),
('R007','duplicate_event_id', 'doc_raw_events','event_id',
'(COUNT(*) - COUNT(DISTINCT event_id)) * 1.0 / COUNT(*)',
0.0, 'ERROR', 'event_id must be unique'),
('R008','ts_not_future', 'doc_raw_events','ts',
'COUNT(*) FILTER (WHERE ts > CURRENT_TIMESTAMP()) * 1.0 / COUNT(*)',
0.0, 'ERROR', 'ts must not be in the future'),
('R009','region_not_null', 'doc_raw_events','region',
'COUNT(*) FILTER (WHERE region IS NULL) * 1.0 / COUNT(*)',
0.0, 'WARNING', 'region should not be null'),
('R010','duplicate_events', 'doc_raw_events','*',
'(COUNT(*) - COUNT(DISTINCT event_id || CAST(ts AS STRING) || COALESCE(user_id,''''))) * 1.0 / COUNT(*)',
0.05,'WARNING', 'logical duplicate rate must be below 5%');
The sql_expr field in each rule stores a directly executable SQL expression. The actual check is performed by a quality check task that dynamically concatenates and runs the expression. threshold = 0.0 means zero tolerance — any single failed row triggers an alert.
Results interpretation:user_id null rate is 9.09%, far exceeding R001's threshold of 2%; amount null rate is 12.12%, exceeding R002's threshold of 5%; purchase type negative rate is 3.03%, triggering zero-tolerance rule R003 (negative values for refund type are valid refunds and are not counted); out-of-range rate is 6.06%, exceeding R004's threshold of 1%. event_id has no duplicates itself (R007 passes), but logical duplicates exist (events with the same user_id + ts + amount).
Write Check Results
Import from a local CSV file (recommended):
-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_quality_results.csv' TO USER VOLUME FILE 'doc_quality_results.csv';
-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_dataops_quality.doc_quality_results
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_quality_results.csv');
You can also directly insert a small batch of test data inline (no CSV file needed):
Quality Gate Layer: Passed and Quarantine Dynamic Tables
Passed Layer: Let Only Clean Data Flow into Silver
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_dataops_quality.doc_events_passed
AS
SELECT
event_id,
user_id,
event_type,
amount,
ts,
region,
platform,
status
FROM best_practice_dataops_quality.doc_raw_events
WHERE user_id IS NOT NULL
AND amount IS NOT NULL
AND (event_type != 'purchase' OR amount >= 0)
AND amount <= 10000
AND event_type IN ('purchase','refund','login')
AND status IN ('completed','pending','error');
⚠️ Note: The Dynamic Table DDL does not include REFRESH INTERVAL. Refresh scheduling is managed by Studio Tasks (see the "Studio Task DAG Orchestration" section).
💡 Tip: The WHERE conditions correspond directly to quality rules R001–R006. (event_type != 'purchase' OR amount >= 0) means only purchase type is forced to have amount >= 0. Negative amounts for refund type (refunds) are valid and should not be filtered. After each new data write to the Bronze layer, the Dynamic Table incrementally refreshes, and only rows that pass all gate conditions will appear in doc_events_passed.
Quarantine Layer: Store Non-Conforming Data in Isolation
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_dataops_quality.doc_events_quarantine
AS
SELECT
event_id,
user_id,
event_type,
amount,
ts,
region,
platform,
status,
CASE
WHEN user_id IS NULL THEN 'null_user_id'
WHEN amount IS NULL THEN 'null_amount'
WHEN event_type = 'purchase' AND amount < 0 THEN 'negative_amount'
WHEN amount > 10000 THEN 'amount_out_of_range'
WHEN event_type NOT IN ('purchase','refund','login') THEN 'invalid_event_type'
ELSE 'other'
END AS quarantine_reason
FROM best_practice_dataops_quality.doc_raw_events
WHERE user_id IS NULL
OR amount IS NULL
OR (event_type = 'purchase' AND amount < 0)
OR amount > 10000
OR event_type NOT IN ('purchase','refund','login');
Quality Summary Dynamic Table
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_dataops_quality.doc_quality_summary
AS
SELECT
pipeline_run,
check_ts,
COUNT(*) AS total_rules,
COUNT(*) FILTER (WHERE passed = true) AS passed_rules,
COUNT(*) FILTER (WHERE passed = false) AS failed_rules,
COUNT(*) FILTER (WHERE passed = false AND severity = 'ERROR') AS error_count,
COUNT(*) FILTER (WHERE passed = false AND severity = 'WARNING') AS warning_count,
ROUND(COUNT(*) FILTER (WHERE passed = true) * 1.0 / COUNT(*), 4) AS pass_rate,
CASE
WHEN COUNT(*) FILTER (WHERE passed = false AND severity = 'ERROR') > 0 THEN 'BLOCKED'
WHEN COUNT(*) FILTER (WHERE passed = false AND severity = 'WARNING') > 0 THEN 'WARNING'
ELSE 'PASSED'
END AS gate_decision
FROM best_practice_dataops_quality.doc_quality_results
GROUP BY pipeline_run, check_ts;
gate_decision is the final gate verdict: any single failed ERROR rule outputs BLOCKED, and the downstream Gold layer's Studio Task will not trigger a refresh upon seeing BLOCKED.
Results interpretation: Of 33 raw events, 23 passed all quality gates and entered the Silver layer; 10 were quarantined. The main quarantine reasons are amount null values (4 rows, e.g. EVT005/EVT014/EVT023/EVT033) and user_id null values (3 rows), followed by oversized amounts (EVT007/EVT019) and purchase type negative amounts (EVT012). Negative refund type amounts (EVT003/EVT011/EVT021/EVT028) are valid refunds and passed the quality gate to enter the Silver layer.
Results interpretation: In this run, 5 out of 10 rules failed, including 3 ERROR rules (user_id null rate, amount null rate, negative amount) and 2 WARNING rules (oversized amount, logical duplicates). gate_decision = BLOCKED means the downstream Gold layer should not refresh after this run.
View Rule Details
SELECT rule_id, rule_name, total_rows, failed_rows, fail_rate, threshold, passed, severity
FROM best_practice_dataops_quality.doc_quality_results
ORDER BY passed ASC, severity DESC, fail_rate DESC;
Depends on: task_gate_eval (executes only on PASSED branch)
Configure scheduling: Cron expression 0/30 * * * ? (every 30 minutes), set in the task properties of task_refresh_gate.
💡 Tip: After attaching monitoring alert rules to a Studio Task, quality check failures, task timeouts, and node run errors can all send notifications through the alert rules configured on the same task — no need to separately configure additional monitoring.
⚠️ Note: The REFRESH DYNAMIC TABLE command itself synchronously waits for the refresh to complete. Studio Task execution logs will show the actual refresh elapsed time, making it easy to track performance regressions.
Track Quality Trends with information_schema
sys.information_schema.job_history records the execution status of each Dynamic Table refresh, and can be used to track the run history of quality check tasks:
SELECT
job_id,
status,
ROUND(execution_time, 2) AS exec_s,
rows_produced,
rows_inserted,
start_time,
SUBSTR(job_text, 1, 80) AS sql_preview
FROM sys.information_schema.job_history
WHERE pt_date = CAST(CURRENT_DATE() AS STRING)
AND LOWER(job_text) LIKE '%best_practice_dataops_quality%'
ORDER BY start_time DESC
LIMIT 10;
💡 Tip: Wrap the above query into a Dynamic Table doc_pipeline_run_trend, aggregating by day for success/failure counts and average elapsed time. This can be fed directly into a BI tool as the data source for a DataOps quality dashboard.
Dynamic Tables do not use REFRESH INTERVAL: All Dynamic Table refresh cycles are managed uniformly in Studio Tasks. This allows attaching monitoring alerts, dependency conditions, and other rules to the same task, avoiding splitting DDL and scheduling configuration across two places.
Quarantine data is not automatically repaired: Data in doc_events_quarantine requires manual review to decide whether to repair and write back to the Bronze layer or discard. Consider running periodic cleanup tasks on the Quarantine table to avoid historical junk data accumulating continuously.
Semantics of FILTER (WHERE ...):COUNT(*) FILTER (WHERE condition) counts only rows satisfying the condition, equivalent to SUM(CASE WHEN condition THEN 1 ELSE 0 END), but with cleaner syntax. Not all aggregate functions support FILTER — MEDIAN does not.
Dynamic Table first full refresh: The first REFRESH DYNAMIC TABLE performs a full scan of upstream; subsequent incremental refreshes only process rows that are new or changed since the last refresh point. If the Bronze layer uses INSERT OVERWRITE to write data, this causes the Dynamic Table to degrade to full refresh every time.
Gate thresholds need business adjustment: The example R001 threshold of 0.02 (2%) is reasonable for real-time data streams, but batch historical data migration scenarios may need temporary relaxation. It is recommended to maintain threshold versions in the doc_quality_rules table and restore strict thresholds after migration is complete.
gate_decision = BLOCKED does not affect already refreshed Silver data:BLOCKED only informs the downstream Gold layer not to refresh after this run. Data already in doc_events_passed will not be rolled back. If rollback is needed, use Time Travel (RESTORE TABLE ... TO TIMESTAMP) to restore to the snapshot before the last refresh.