Build a Real-Time Financial Risk Control Data Warehouse
Ingest bank card transaction streams into the Lakehouse in real time to build an ODS → DWD → DWS → ADS four-layer risk control data warehouse that outputs a real-time risk score for each transaction for use by interception systems. This document uses the Kaggle Credit Card Transactions Fraud Detection Dataset as its data foundation, demonstrating the complete pipeline of Kafka PIPE real-time ingestion → Dynamic Table sliding window aggregation → SQL UDF scoring → Column Masking.
Overview
The core challenge of financial risk control is throughput and latency: transaction streams arrive at thousands of events per second, and risk features must be computed and an interception decision returned within milliseconds. Typical pain points and the corresponding Singdata solutions are as follows:
Problem
Solution
Transaction streams ingested in real time; Kafka messages need to land in the warehouse within seconds
System automatically sorts and refreshes by reference relationships
REFRESH DYNAMIC TABLE | Manually trigger a single refresh | Manually trigger after initial setup to validate the pipeline
CREATE FUNCTION | Create SQL UDF calc_txn_risk_score | Encapsulates risk scoring formula
ALTER TABLE ... CHANGE COLUMN ... SET MASK | Bind Column Masking policy | Differentiated display for cc_num and other PII columns
GRANT / REVOKE | Configure RBAC role permissions | Three-role model (analyst / interception / audit)
Prerequisites
All examples in this document run under the best_practice_financial_risk schema.
CREATE SCHEMA IF NOT EXISTS best_practice_financial_risk;
Result:
{}
ODS Layer: Raw Transaction Table and Customer Master Data
Create Tables
The transaction main table records each card swipe event; the customer master table records cardholder profile information.
CREATE TABLE IF NOT EXISTS best_practice_financial_risk.ods_transactions (
txn_id STRING,
cc_num STRING, -- Bank card number, bound to Column Masking
merchant STRING,
category STRING,
amt DOUBLE,
first_name STRING,
last_name STRING,
gender STRING,
street STRING,
city STRING,
state STRING,
zip STRING,
lat DOUBLE, -- Cardholder location (latitude)
long_ DOUBLE, -- Cardholder location (longitude)
city_pop BIGINT,
job STRING,
dob STRING, -- Date of birth (string format)
trans_num STRING,
unix_time BIGINT,
merch_lat DOUBLE, -- Merchant location (latitude)
merch_long DOUBLE, -- Merchant location (longitude)
is_fraud INT, -- Fraud label: 0 normal / 1 fraud
trans_date_trans_time TIMESTAMP
);
CREATE TABLE IF NOT EXISTS best_practice_financial_risk.ods_customers (
cc_num STRING,
first_name STRING,
last_name STRING,
gender STRING,
street STRING,
city STRING,
state STRING,
zip STRING,
lat DOUBLE,
long_ DOUBLE,
city_pop BIGINT,
job STRING,
dob STRING
);
Configure Kafka PIPE
In production, transaction data is ingested into the ODS layer via Kafka in real time. The following is a PIPE configuration template; replace the broker address and topic name with your actual values before creating the PIPE.
-- First create a raw message receiving table (JSON string)
CREATE TABLE IF NOT EXISTS best_practice_financial_risk.kafka_txn_raw (value STRING);
-- Create Kafka PIPE
CREATE PIPE IF NOT EXISTS best_practice_financial_risk.pipe_txn_stream
VIRTUAL_CLUSTER = 'DEFAULT'
BATCH_INTERVAL_IN_SECONDS = '60'
AS
COPY INTO best_practice_financial_risk.kafka_txn_raw
FROM (
SELECT CAST(value AS STRING) AS value
FROM READ_KAFKA(
'<kafka-broker>:9092', -- Replace with the actual broker address
'bank_transactions', -- Kafka topic name
'',
'cz_fraud_consumer', -- consumer group ID
'','','','',
'raw', 'raw',
0,
map()
)
);
💡 Tip: READ_KAFKA positional parameters 5–8 (start/end offsets, timestamps) must be left empty in the PIPE DDL and are automatically managed by the PIPE runtime. After creation, the PIPE is running by default and consumes in batches every 60 seconds.
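The PIPE lands each Kafka message as a raw string in kafka_txn_raw; the step that turns that string into typed ods_transactions columns is not shown in this document. The following is a minimal Python sketch of that parsing logic, assuming each message is a flat JSON object whose keys match the ods_transactions column names. The payload shape, the FIELD_TYPES subset, and the helper name parse_txn_message are assumptions for illustration and are not part of the Singdata API.

```python
import json
from datetime import datetime

# Hypothetical subset of ods_transactions columns and their target types.
FIELD_TYPES = {
    "txn_id": str,
    "cc_num": str,
    "merchant": str,
    "category": str,
    "amt": float,
    "unix_time": int,
    "is_fraud": int,
}

def parse_txn_message(raw: str) -> dict:
    """Parse one raw Kafka message (assumed to be flat JSON) into typed fields."""
    payload = json.loads(raw)
    row = {name: cast(payload[name]) for name, cast in FIELD_TYPES.items()}
    # Assumed timestamp format 'YYYY-MM-DD HH:MM:SS', as used in the sample INSERTs.
    row["trans_date_trans_time"] = datetime.strptime(
        payload["trans_date_trans_time"], "%Y-%m-%d %H:%M:%S"
    )
    return row

# Hypothetical message built from the TXN001 sample row.
sample_message = json.dumps({
    "txn_id": "TXN001",
    "cc_num": "4532117694074009",
    "merchant": "fraud_Kirlin and Sons",
    "category": "grocery_pos",
    "amt": 9.36,
    "unix_time": 1325376018,
    "is_fraud": 0,
    "trans_date_trans_time": "2020-01-01 00:00:18",
})
parsed_row = parse_txn_message(sample_message)
```

A missing key raises KeyError here, which makes malformed messages visible early; a production parser would more likely route such messages to a dead-letter table.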
Load Test Data
This document uses a subset of the Kaggle fraud-detection dataset, loaded with INSERT statements to simulate Kafka messages that have already been parsed and written to the ODS tables.
Load data from a local CSV file (recommended):
-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/your/data.csv' TO USER VOLUME FILE 'data.csv';
-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_financial_risk.ods_customers
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('data.csv');
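Before uploading, it can help to check that the CSV lines up with the ods_customers definition. The sketch below is a local pre-flight check in Python, assuming the file has a header row whose names match the table's column names in order; whether COPY INTO maps columns by position or by name is not stated in this document, and check_customer_csv is a hypothetical helper.

```python
import csv
import io

# Column order of ods_customers, copied from the CREATE TABLE statement.
CUSTOMER_COLUMNS = [
    "cc_num", "first_name", "last_name", "gender", "street", "city",
    "state", "zip", "lat", "long_", "city_pop", "job", "dob",
]

def check_customer_csv(text: str) -> list:
    """Return a list of problems; an empty list means the file looks loadable."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader, None)
    if header != CUSTOMER_COLUMNS:
        return [f"header mismatch: {header}"]
    problems = []
    for line_no, row in enumerate(reader, start=2):
        if len(row) != len(CUSTOMER_COLUMNS):
            problems.append(
                f"line {line_no}: expected {len(CUSTOMER_COLUMNS)} fields, got {len(row)}"
            )
        elif row[0] == "":
            # With 'nullValue'='' an empty cc_num would load as NULL and never join.
            problems.append(f"line {line_no}: empty cc_num")
    return problems
```

To check a real file, read it with open(path, newline="", encoding="utf-8").read() and pass the text to check_customer_csv.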
You can also insert a small batch of test data inline (no CSV file required):
INSERT INTO best_practice_financial_risk.ods_customers VALUES
('4532117694074009','John','Smith','M','123 Oak St','Austin','TX','78701',30.2672,-97.7431,950000,'Software Engineer','1985-06-15'),
('4716058826889367','Mary','Johnson','F','456 Elm Ave','Dallas','TX','75201',32.7767,-96.7970,1343000,'Accountant','1990-03-22'),
('4929429090508220','Robert','Williams','M','789 Pine Rd','Houston','TX','77001',29.7604,-95.3698,2300000,'Doctor','1978-11-08'),
('4532117691234567','Linda','Brown','F','321 Maple Dr','San Antonio','TX','78205',29.4241,-98.4936,1434000,'Teacher','1995-07-30'),
('4716058821111222','James','Davis','M','654 Birch Ln','Phoenix','AZ','85001',33.4484,-112.0740,1600000,'Manager','1982-09-14'),
('4929429095555666','Patricia','Miller','F','987 Cedar St','Chicago','IL','60601',41.8781,-87.6298,2700000,'Nurse','1988-04-25'),
('4532117697654321','Michael','Wilson','M','147 Spruce Ave','Los Angeles','CA','90001',34.0522,-118.2437,3980000,'Engineer','1975-12-03'),
('4716058828888999','Jennifer','Moore','F','258 Walnut Rd','New York','NY','10001',40.7128,-74.0060,8336000,'Lawyer','1992-08-17'),
('4929429093333444','David','Taylor','M','369 Hickory Dr','Seattle','WA','98101',47.6062,-122.3321,724000,'Data Scientist','1986-02-28'),
('4532117692222333','Barbara','Anderson','F','741 Ash Blvd','Miami','FL','33101',25.7617,-80.1918,460000,'Marketing','1993-11-11');
INSERT INTO best_practice_financial_risk.ods_transactions VALUES
('TXN001','4532117694074009','fraud_Kirlin and Sons','grocery_pos',9.36,'John','Smith','M','123 Oak St','Austin','TX','78701',30.2672,-97.7431,950000,'Software Engineer','1985-06-15','tx001',1325376018,30.4127,-97.8974,0,CAST('2020-01-01 00:00:18' AS TIMESTAMP)),
('TXN002','4716058826889367','fraud_Sporer-Keebler','entertainment',2529.0,'Mary','Johnson','F','456 Elm Ave','Dallas','TX','75201',32.7767,-96.7970,1343000,'Accountant','1990-03-22','tx002',1325376075,33.4897,-96.9132,1,CAST('2020-01-01 00:01:15' AS TIMESTAMP)),
('TXN007','4532117697654321','fraud_Olson, Becker and Koch','shopping_net',1987.40,'Michael','Wilson','M','147 Spruce Ave','Los Angeles','CA','90001',34.0522,-118.2437,3980000,'Engineer','1975-12-03','tx007',1325379440,34.1808,-118.4634,1,CAST('2020-01-01 00:57:20' AS TIMESTAMP)),
('TXN018','4716058828888999','fraud_Sauer-Kessler','entertainment',4500.00,'Jennifer','Moore','F','258 Walnut Rd','New York','NY','10001',40.7128,-74.0060,8336000,'Lawyer','1992-08-17','tx018',1325386200,40.9345,-74.1234,1,CAST('2020-01-01 02:50:00' AS TIMESTAMP))
-- ... complete 20 rows, abbreviated here
;
Verify ODS layer row counts:
SELECT COUNT(*) AS total_txns,
SUM(is_fraud) AS fraud_count,
ROUND(SUM(is_fraud)*100.0/COUNT(*), 1) AS fraud_rate_pct
FROM best_practice_financial_risk.ods_transactions;
Column Masking: Protecting Card Numbers
Bank card numbers are highly sensitive PII and must be masked for unauthorized users, who see only the last 4 digits.
-- Create the masking function
CREATE OR REPLACE FUNCTION best_practice_financial_risk.mask_cc_num(cc_num STRING)
RETURNS STRING
AS CASE
WHEN current_user() IN ('privileged_user') THEN cc_num -- Replace with the actual authorized username
ELSE CONCAT('****-****-****-', SUBSTRING(cc_num, LENGTH(cc_num) - 3, 4))
END;
-- Bind to the ods_transactions.cc_num column
ALTER TABLE best_practice_financial_risk.ods_transactions
CHANGE COLUMN cc_num
SET MASK best_practice_financial_risk.mask_cc_num;
💡 Tip: Replace 'privileged_user' with the actual username that needs to view plaintext data. Column Masking matches the current connection's username via the current_user() function; all authorized usernames must be explicitly listed in the IN() list.
⚠️ Note: Column masking applies transparently to all queries, including downstream Dynamic Tables. In the DWD layer JOIN, non-privileged users already see the card number in ****-****-****-4009 format.
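The masking rule can be checked offline with a small Python mirror of the SQL function. This is a sketch for reasoning about the output format only; the current_user argument and the privileged_users whitelist stand in for the SQL current_user() call and IN() list, and are assumptions of this example.

```python
def mask_cc_num(cc_num: str, current_user: str,
                privileged_users=frozenset({"privileged_user"})) -> str:
    """Mirror of the SQL rule: plaintext for whitelisted users, last 4 digits otherwise."""
    if current_user in privileged_users:
        return cc_num
    # SQL SUBSTRING(cc_num, LENGTH(cc_num) - 3, 4) is 1-based and takes the last
    # 4 characters; cc_num[-4:] is the Python equivalent for inputs of length >= 4.
    return "****-****-****-" + cc_num[-4:]

masked = mask_cc_num("4532117694074009", "some_analyst")      # '****-****-****-4009'
plain = mask_cc_num("4532117694074009", "privileged_user")    # '4532117694074009'
```

Inputs shorter than 4 characters are not modeled here and may behave differently in SQL.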
Verify the binding effect (admin account sees the original values):
SELECT txn_id, cc_num, amt, is_fraud
FROM best_practice_financial_risk.ods_transactions
LIMIT 3;
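DWD Layer: Transaction Detail Enrichment
The DWD layer uses a Dynamic Table to JOIN the ODS transaction stream with the customer master data. It enriches each transaction with cardholder profile information and computes the geographic distance between the cardholder's location and the merchant's location. The distance expression is an equirectangular approximation (labeled "simplified Haversine" in the code comment), which stays close to the great-circle distance over short ranges.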
Create Table
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_financial_risk.dwd_txn_events
REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
t.txn_id,
t.cc_num,
t.trans_num,
t.trans_date_trans_time AS txn_time,
t.unix_time,
t.merchant,
t.category,
t.amt,
t.is_fraud,
t.merch_lat,
t.merch_long,
-- Geographic distance: cardholder location vs. merchant location (km, simplified Haversine)
ROUND(
111.2 * SQRT(
POWER(t.lat - t.merch_lat, 2) +
POWER((t.long_ - t.merch_long) * COS(RADIANS(t.lat)), 2)
), 2
) AS dist_km,
c.first_name,
c.last_name,
c.gender,
c.city,
c.state,
c.job,
c.dob,
YEAR(t.trans_date_trans_time) -
CAST(SUBSTRING(c.dob, 1, 4) AS INT) AS age
FROM best_practice_financial_risk.ods_transactions t
LEFT JOIN best_practice_financial_risk.ods_customers c
ON t.cc_num = c.cc_num;
The dist_km column calculates the straight-line distance between the cardholder's registered address and the transaction merchant. A larger distance indicates the cardholder is spending far from their usual residence, which is a higher risk signal.
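To see how the dist_km expression relates to a full Haversine calculation, the Python sketch below implements both. dist_km_sql repeats the arithmetic of the SQL expression; haversine_km is the standard great-circle formula with an assumed mean Earth radius of 6371 km. The function names are illustrative.

```python
import math

def dist_km_sql(lat, lon, merch_lat, merch_lon):
    """Same arithmetic as the dist_km expression in dwd_txn_events."""
    dlat = lat - merch_lat
    dlon = (lon - merch_lon) * math.cos(math.radians(lat))
    return round(111.2 * math.sqrt(dlat ** 2 + dlon ** 2), 2)

def haversine_km(lat, lon, merch_lat, merch_lon, radius_km=6371.0):
    """Full Haversine great-circle distance, for comparison."""
    phi1, phi2 = math.radians(lat), math.radians(merch_lat)
    dphi = phi2 - phi1
    dlmb = math.radians(merch_lon - lon)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * radius_km * math.asin(math.sqrt(a))

# Coordinates of the TXN001 sample row (cardholder in Austin, nearby merchant).
approx = dist_km_sql(30.2672, -97.7431, 30.4127, -97.8974)
exact = haversine_km(30.2672, -97.7431, 30.4127, -97.8974)
```

For city-scale distances the two values agree closely; the approximation drifts as the distance grows or near the poles, which matters little when the result is only compared against the 50 km and 100 km thresholds.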
SELECT txn_id, cc_num, merchant, category, amt, is_fraud, dist_km, city, state, age
FROM best_practice_financial_risk.dwd_txn_events
ORDER BY txn_time
LIMIT 5;
All 20 records in the DWD layer are joined to customer information, and the dist_km and age fields are fully computed.
SELECT COUNT(*) AS dwd_count FROM best_practice_financial_risk.dwd_txn_events;
dwd_count
---------
20
DWS Layer: User Risk Feature Aggregation
The DWS layer aggregates historical transaction features by cardholder (cc_num), including spending mean, volatility, historical fraud count, and high-amount transaction frequency. These features are the core inputs for ADS layer risk scoring.
Create Table
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_financial_risk.dws_user_risk_features
REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
cc_num,
first_name,
last_name,
state,
job,
age,
-- Transaction statistics (full history)
COUNT(*) AS txn_total,
ROUND(SUM(amt), 2) AS amt_total,
ROUND(AVG(amt), 2) AS amt_avg,
ROUND(MAX(amt), 2) AS amt_max,
ROUND(STDDEV_POP(amt), 2) AS amt_stddev,
-- Historical fraud records
SUM(is_fraud) AS fraud_history_count,
-- Transaction count by category
COUNT(CASE WHEN category = 'shopping_net' THEN 1 END) AS cat_shopping_net,
COUNT(CASE WHEN category = 'entertainment' THEN 1 END) AS cat_entertainment,
COUNT(CASE WHEN category = 'grocery_pos' THEN 1 END) AS cat_grocery,
COUNT(CASE WHEN category = 'food_dining' THEN 1 END) AS cat_food_dining,
-- Most recent transaction time
MAX(txn_time) AS last_txn_time,
-- High-amount transaction count (single transaction > 1000)
COUNT(CASE WHEN amt > 1000 THEN 1 END) AS high_amt_txn_count
FROM best_practice_financial_risk.dwd_txn_events
GROUP BY cc_num, first_name, last_name, state, job, age;
Jennifer Moore's historical spending standard deviation (amt_stddev = 2239.23) is very high, indicating highly volatile spending behavior, which is a high-risk signal. The DWS layer generates 10 user risk profiles in total.
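The per-card aggregation can be reproduced on a handful of rows to sanity-check values such as amt_stddev. The Python sketch below covers a subset of the DWS columns; statistics.pstdev is the population standard deviation, matching STDDEV_POP. The input tuples are hypothetical and are not the document's sample data.

```python
from collections import defaultdict
from statistics import mean, pstdev

def user_risk_features(txns):
    """Aggregate (cc_num, amt, is_fraud) tuples into per-card features."""
    by_card = defaultdict(list)
    for cc_num, amt, is_fraud in txns:
        by_card[cc_num].append((amt, is_fraud))
    features = {}
    for cc_num, rows in by_card.items():
        amounts = [amt for amt, _ in rows]
        features[cc_num] = {
            "txn_total": len(rows),
            "amt_avg": round(mean(amounts), 2),
            "amt_max": round(max(amounts), 2),
            "amt_stddev": round(pstdev(amounts), 2),  # population std dev, like STDDEV_POP
            "fraud_history_count": sum(flag for _, flag in rows),
            "high_amt_txn_count": sum(1 for amt in amounts if amt > 1000),
        }
    return features

# Hypothetical transactions for two cards.
sample_txns = [("card_a", 20.0, 0), ("card_a", 4500.0, 1), ("card_b", 50.0, 0)]
sample_features = user_risk_features(sample_txns)
```

Note that a card with a single transaction has a standard deviation of 0, so the amount deviation factor in the scoring UDF falls back to 0 for that card.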
Risk Scoring UDF
Encapsulate the scoring logic as a SQL UDF for direct invocation in the ADS layer; the formula is concise and auditable.
The score is composed of four factors (total capped at 100, minimum 0):
| Factor | Calculation | Max Points |
| --- | --- | --- |
| Amount deviation | (amt - hist_avg) / hist_stddev × 10, max 40 points | 40 |
| Geographic distance | >100 km from cardholder: +20 points, >50 km: +10 points | 20 |
| Historical fraud | +15 points per historical fraud record | No cap (truncated at 100) |
| High-amount frequency | +5 points per historical transaction >1000 | No cap (truncated at 100) |
CREATE FUNCTION best_practice_financial_risk.calc_txn_risk_score(
p_amt DOUBLE, -- Current transaction amount
p_hist_avg DOUBLE, -- User historical mean
p_hist_stddev DOUBLE, -- User historical standard deviation
p_dist_km DOUBLE, -- Distance between cardholder and merchant (km)
p_fraud_history DOUBLE, -- Historical fraud count
p_high_count DOUBLE -- High-amount transaction count
)
RETURNS DOUBLE
AS LEAST(100.0, GREATEST(0.0,
-- Amount deviation factor
CASE
WHEN p_hist_stddev > 0.0
THEN LEAST(40.0, ((p_amt - p_hist_avg) / p_hist_stddev) * 10.0)
ELSE 0.0
END
-- Geographic distance factor
+ CASE WHEN p_dist_km > 100.0 THEN 20.0 WHEN p_dist_km > 50.0 THEN 10.0 ELSE 0.0 END
-- Historical fraud factor
+ p_fraud_history * 15.0
-- High-amount frequency factor
+ p_high_count * 5.0
));
⚠️ Note: All UDF parameter types use DOUBLE to avoid overload conflicts with BIGINT parameters. When calling, explicitly convert fraud_history_count (BIGINT column): CAST(fraud_history_count AS DOUBLE).
The fraudulent transaction with a large amount deviation and 80 km distance scores 40 (MEDIUM); the small normal spending scores 0 (LOW).
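The scoring formula is easy to trace by hand with a Python port of the UDF. The sketch below follows the SQL expression term by term; the input values in the examples are hypothetical and chosen only to exercise each factor. SQL NULL handling (for example, a LEFT JOIN that finds no DWS row) is not modeled.

```python
def calc_txn_risk_score(amt, hist_avg, hist_stddev, dist_km, fraud_history, high_count):
    """Python port of the calc_txn_risk_score SQL UDF."""
    # Amount deviation factor: capped at 40, no lower bound.
    if hist_stddev > 0.0:
        amount_factor = min(40.0, (amt - hist_avg) / hist_stddev * 10.0)
    else:
        amount_factor = 0.0
    # Geographic distance factor.
    if dist_km > 100.0:
        distance_factor = 20.0
    elif dist_km > 50.0:
        distance_factor = 10.0
    else:
        distance_factor = 0.0
    raw = amount_factor + distance_factor + fraud_history * 15.0 + high_count * 5.0
    # Clamp the total to the 0..100 range.
    return min(100.0, max(0.0, raw))

# Hypothetical inputs: 3 standard deviations above the mean, 80 km away.
score_medium = calc_txn_risk_score(400.0, 100.0, 100.0, 80.0, 0.0, 0.0)      # 30 + 10 = 40.0
# Hypothetical inputs: small purchase below the historical mean, nearby merchant.
score_low = calc_txn_risk_score(9.36, 100.0, 100.0, 5.0, 0.0, 0.0)           # clamped to 0.0
# Hypothetical inputs: every factor at or above its cap.
score_capped = calc_txn_risk_score(5000.0, 100.0, 100.0, 150.0, 2.0, 3.0)    # 105 clamped to 100.0
```

Because the amount factor has only an upper cap, a transaction below the cardholder's historical mean contributes a negative value and offsets points from the other factors before the final clamp.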
ADS Layer: Real-Time Risk Score Output
The ADS layer is the final output of the risk control data warehouse. It joins DWD and DWS, calls the UDF to compute real-time risk scores, and applies risk level labels for direct querying by the interception system.
Create Table
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_financial_risk.ads_txn_risk_score
REFRESH INTERVAL 1 MINUTE VCLUSTER DEFAULT
AS
SELECT
t.txn_id,
t.cc_num,
t.txn_time,
t.merchant,
t.category,
t.amt,
t.dist_km,
t.city,
t.state,
t.is_fraud,
u.amt_avg,
u.amt_stddev,
u.fraud_history_count,
u.high_amt_txn_count,
-- Real-time risk score
ROUND(best_practice_financial_risk.calc_txn_risk_score(
t.amt,
u.amt_avg,
u.amt_stddev,
t.dist_km,
CAST(u.fraud_history_count AS DOUBLE),
CAST(u.high_amt_txn_count AS DOUBLE)
), 2) AS risk_score,
-- Risk level
CASE
WHEN best_practice_financial_risk.calc_txn_risk_score(
t.amt, u.amt_avg, u.amt_stddev, t.dist_km,
CAST(u.fraud_history_count AS DOUBLE),
CAST(u.high_amt_txn_count AS DOUBLE)
) >= 60 THEN 'HIGH'
WHEN best_practice_financial_risk.calc_txn_risk_score(
t.amt, u.amt_avg, u.amt_stddev, t.dist_km,
CAST(u.fraud_history_count AS DOUBLE),
CAST(u.high_amt_txn_count AS DOUBLE)
) >= 30 THEN 'MEDIUM'
ELSE 'LOW'
END AS risk_level
FROM best_practice_financial_risk.dwd_txn_events t
LEFT JOIN best_practice_financial_risk.dws_user_risk_features u
ON t.cc_num = u.cc_num;
SELECT
risk_level,
COUNT(*) AS txn_count,
SUM(is_fraud) AS fraud_in_bucket,
ROUND(SUM(is_fraud)*100.0/COUNT(*), 1) AS fraud_rate_pct,
ROUND(AVG(risk_score), 1) AS avg_score
FROM best_practice_financial_risk.ads_txn_risk_score
GROUP BY risk_level
ORDER BY avg_score DESC;
In this 20-row sample, the fraud rate in the MEDIUM risk range reaches 100%, which suggests that the scoring rules pick up the high-risk transactions they target. The LOW range still contains 18.8% fraud because some fraudulent transactions have relatively small amounts that do not trigger the amount deviation factor. A sample this small cannot validate the model; future improvements can incorporate sequential behavioral features to further optimize the scoring.
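The bucket summary can be reproduced offline to see how sensitive the per-level fraud rates are to individual rows. The Python sketch below applies the same thresholds as the risk_level CASE expression (60 and above is HIGH, 30 and above is MEDIUM); the scored pairs are hypothetical and are not the document's results.

```python
def risk_level(score):
    """Same thresholds as the CASE expression in ads_txn_risk_score."""
    if score >= 60:
        return "HIGH"
    if score >= 30:
        return "MEDIUM"
    return "LOW"

def fraud_rate_by_level(scored):
    """scored: iterable of (risk_score, is_fraud).
    Returns {level: (txn_count, fraud_count, fraud_rate_pct)}."""
    counts = {}
    for score, is_fraud in scored:
        level = risk_level(score)
        total, fraud = counts.get(level, (0, 0))
        counts[level] = (total + 1, fraud + is_fraud)
    return {
        level: (total, fraud, round(fraud * 100.0 / total, 1))
        for level, (total, fraud) in counts.items()
    }

# Hypothetical (risk_score, is_fraud) pairs.
sample_scored = [(40.0, 1), (35.0, 1), (0.0, 0), (10.0, 1), (5.0, 0), (0.0, 0)]
summary = fraud_rate_by_level(sample_scored)
```

With only a few rows per bucket, relabeling a single transaction moves a bucket's fraud rate by many percentage points, so rates from small samples are best read as a smoke test rather than a model evaluation.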
Fraud Rate Analysis by Category
SELECT
category,
COUNT(*) AS txn_count,
SUM(is_fraud) AS fraud_count,
ROUND(SUM(is_fraud)*100.0/COUNT(*), 1) AS fraud_rate_pct,
ROUND(AVG(amt), 2) AS avg_amt,
ROUND(MAX(amt), 2) AS max_amt
FROM best_practice_financial_risk.ads_txn_risk_score
GROUP BY category
ORDER BY fraud_rate_pct DESC;
entertainment and shopping_net are the categories with the highest fraud incidence, and fraudulent transaction amounts are far higher than the average for normal spending. This provides a direct basis for risk control rules: additional transaction limits or secondary verification triggers can be set for these two categories.
RBAC: Three-Role Permission Model
The risk control scenario involves three types of users requiring differentiated authorization:
Retrieves risk scores and levels only; cannot view raw transaction details
audit_admin | All layers, including PII raw values | Compliance auditing; can view full cc_num and names
-- Create roles
CREATE ROLE IF NOT EXISTS risk_analyst;
CREATE ROLE IF NOT EXISTS risk_interception;
CREATE ROLE IF NOT EXISTS audit_admin;
-- risk_analyst: can query DWD and DWS (use DYNAMIC TABLE keyword for dynamic tables)
GRANT SELECT ON DYNAMIC TABLE best_practice_financial_risk.dwd_txn_events
TO ROLE risk_analyst;
GRANT SELECT ON DYNAMIC TABLE best_practice_financial_risk.dws_user_risk_features
TO ROLE risk_analyst;
-- risk_interception: only queries ADS output (dynamic table)
GRANT SELECT ON DYNAMIC TABLE best_practice_financial_risk.ads_txn_risk_score
TO ROLE risk_interception;
-- audit_admin: access to all layers
GRANT SELECT ON TABLE best_practice_financial_risk.ods_transactions
TO ROLE audit_admin;
GRANT SELECT ON TABLE best_practice_financial_risk.ods_customers
TO ROLE audit_admin;
GRANT SELECT ON DYNAMIC TABLE best_practice_financial_risk.dwd_txn_events
TO ROLE audit_admin;
GRANT SELECT ON DYNAMIC TABLE best_practice_financial_risk.dws_user_risk_features
TO ROLE audit_admin;
GRANT SELECT ON DYNAMIC TABLE best_practice_financial_risk.ads_txn_risk_score
TO ROLE audit_admin;
💡 Tip: When the risk_analyst role queries dwd_txn_events, Column Masking automatically applies — cc_num is displayed as ****-****-****-4009 without additional configuration; masking and authorization are decoupled.
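The GRANT statements above amount to a small permission matrix, which can be written down and checked independently of the Lakehouse. The Python sketch below encodes the grants exactly as listed and adds a hypothetical can_select helper; it models only SELECT grants on these five objects, not masking or role inheritance.

```python
# SELECT grants per role, transcribed from the GRANT statements.
GRANTS = {
    "risk_analyst": {"dwd_txn_events", "dws_user_risk_features"},
    "risk_interception": {"ads_txn_risk_score"},
    "audit_admin": {
        "ods_transactions",
        "ods_customers",
        "dwd_txn_events",
        "dws_user_risk_features",
        "ads_txn_risk_score",
    },
}

def can_select(role: str, table: str) -> bool:
    """True if the role was granted SELECT on the table in this matrix."""
    return table in GRANTS.get(role, set())

# Least-privilege expectations for the three-role model.
interception_reads_ads = can_select("risk_interception", "ads_txn_risk_score")
interception_reads_ods = can_select("risk_interception", "ods_transactions")
analyst_reads_ods = can_select("risk_analyst", "ods_transactions")
```

Keeping such a matrix next to the DDL gives reviewers one place to confirm that no role reaches the ODS tables unintentionally.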
Notes
Dynamic Table parameter format: The correct format is REFRESH INTERVAL N MINUTE VCLUSTER DEFAULT, not REFRESH_MODE = INCREMENTAL or REFRESH_INTERVAL = '1 minute'.
Column Masking: Masking applies transparently to SELECT operations on downstream Dynamic Tables as well.
UDF parameter types: All parameters for calc_txn_risk_score are defined as DOUBLE; when called from Dynamic Tables, BIGINT aggregate columns (fraud_history_count, high_amt_txn_count) must be explicitly converted with CAST(col AS DOUBLE).
BloomFilter Index use cases: cc_num is a high-cardinality column suitable for CREATE BLOOMFILTER INDEX; when creating, the index must be in the same schema context as the target table (USE SCHEMA best_practice_financial_risk), otherwise an "index and table must in the same schema" error is reported.
Kafka PIPE creation timing: When the PIPE DDL is executed, it attempts to connect to the Kafka broker to verify the subscription. Create it only after the Kafka cluster and topic are ready.
Limitations of the risk scoring formula: The simple rule-based scoring used in this document (amount deviation + geographic distance + historical fraud) is suitable for demonstration. In production, it is recommended to call a machine learning model API via an External Function and write the scoring results back to the ADS layer.
⚠️ Requires manual verification: Column Masking currently matches authorization by username via current_user(). All usernames authorized to view plaintext must be added one by one to the whitelist in the masking function. If your Lakehouse version supports role-based dynamic checking (e.g., HAS_ROLE('role_name')), roles can replace the username list for more flexible maintenance. Contact Singdata technical support to confirm whether the current version supports this function.