Graph-Driven Financial Fraud Gang Detection Data Warehouse

This guide builds an entity relationship graph with accounts, devices, and IPs as nodes and transaction relationships and login bindings as edges, in order to identify fraud gangs and dark-market ring networks. Using a sample dataset of 20 account nodes, 10 device nodes, 10 IP nodes, and 25 transaction edges, it demonstrates end to end how to build a four-layer ODS → DWD → DWS → ADS data warehouse, covering incremental edge table updates with MERGE INTO, Dynamic Table aggregation, gang risk scoring with a SQL UDF, and efficient point lookups with a BloomFilter Index.


Overview

The typical data pipeline for financial fraud gang detection is: account registration/transaction data real-time ingestion → raw node/edge storage (ODS) → shared-device/transaction relationship edge building (DWD) → gang statistics and risk scoring (DWS) → high-risk account blacklist output (ADS).

Singdata Lakehouse addresses several core challenges through the following combination:

Problem | Solution
--------+---------
Accounts sharing the same device is the strongest gang association signal | MERGE INTO incrementally maintains account-device association edge tables with no duplicates or omissions
Transaction graph has a huge number of nodes; cross-node aggregation is slow | Dynamic Table automatically maintains DWD/DWS aggregation results incrementally
Risk scoring logic needs to be reused across multiple downstream systems | SQL UDF encapsulates multi-factor weighted scoring formula
device_id has high cardinality; lookups of associated accounts by device are frequent | BloomFilter Index for precise filtering, reducing full table scan overhead
IP city/carrier fields require keyword search | Inverted Index accelerates exact matching on the city column
Graph algorithms (community detection, PageRank) exceed SQL capabilities | ZettaPark Python Task + NetworkX runs graph algorithms

SQL Commands Used

Command / Function | Purpose | Notes
-------------------+---------+------
CREATE TABLE | Create ODS layer node and edge tables | Regular tables, upstream for Dynamic Tables
CREATE BLOOMFILTER INDEX | Create filter indexes on src_account_id and device_id columns | Suitable for point-lookup filtering on high-cardinality columns
CREATE INVERTED INDEX | Create keyword index on IP node city column | Exact match on city dimension filter
MERGE INTO | Incrementally update account-device association edge table | Upsert by primary key, avoiding duplicate edges
CREATE FUNCTION | Create gang risk scoring UDF calc_gang_risk_score | Encapsulates multi-factor weighted scoring formula
CREATE DYNAMIC TABLE | Create DWD / DWS / ADS layer incremental computation tables | System automatically detects upstream changes and incrementally refreshes
REFRESH DYNAMIC TABLE | Manually trigger a single refresh | Used during initial build or debugging

Prerequisites

All examples in this document run under the best_practice_fraud_graph schema.

CREATE SCHEMA IF NOT EXISTS best_practice_fraud_graph;


ODS Layer: Raw Node and Edge Tables

Create Tables

-- Account node: records registration information and risk labels for each account
CREATE TABLE IF NOT EXISTS best_practice_fraud_graph.doc_account_node (
    account_id       STRING,
    register_time    TIMESTAMP,
    register_ip      STRING,
    phone_tail       STRING,
    id_cert_hash     STRING,
    account_age_days INT,
    is_verified      INT,
    risk_label       INT  -- 0: normal 1: known fraud
);

-- Device node: records basic device attributes
CREATE TABLE IF NOT EXISTS best_practice_fraud_graph.doc_device_node (
    device_id     STRING,
    device_type   STRING,
    os_type       STRING,
    first_seen    TIMESTAMP,
    account_count INT
);

-- IP node: records basic IP attributes and risk scores
CREATE TABLE IF NOT EXISTS best_practice_fraud_graph.doc_ip_node (
    ip_addr       STRING,
    isp           STRING,
    city          STRING,
    risk_score    DOUBLE,
    account_count INT
);

-- Transaction edge: fund transfer relationships between accounts
CREATE TABLE IF NOT EXISTS best_practice_fraud_graph.doc_transaction_edge (
    txn_id         STRING,
    src_account_id STRING,
    dst_account_id STRING,
    amount         DOUBLE,
    txn_time       TIMESTAMP,
    channel        STRING,
    status         STRING,
    is_suspicious  INT
);

-- Account-device association edge: binding relationship of accounts logging in via devices
CREATE TABLE IF NOT EXISTS best_practice_fraud_graph.doc_account_device_edge (
    account_id  STRING,
    device_id   STRING,
    first_seen  TIMESTAMP,
    last_seen   TIMESTAMP,
    login_count INT
);

Create BloomFilter Index and Inverted Index

doc_transaction_edge.src_account_id and doc_account_device_edge.device_id are both high-cardinality columns with frequent point lookups, making them suitable for BloomFilter Index.

-- Transaction edge: exact filtering by originating account
CREATE BLOOMFILTER INDEX IF NOT EXISTS best_practice_fraud_graph.idx_bf_txn_src
    ON TABLE best_practice_fraud_graph.doc_transaction_edge (src_account_id);

-- Account-device edge: exact filtering by device ID
CREATE BLOOMFILTER INDEX IF NOT EXISTS best_practice_fraud_graph.idx_bf_device_id
    ON TABLE best_practice_fraud_graph.doc_account_device_edge (device_id);

-- IP node: exact keyword matching by city
CREATE INVERTED INDEX IF NOT EXISTS best_practice_fraud_graph.idx_inv_city
    ON TABLE best_practice_fraud_graph.doc_ip_node (city)
    PROPERTIES ('analyzer'='keyword');
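To illustrate why a Bloom filter suits point lookups on a high-cardinality column such as device_id, the following is a minimal Python sketch. It is not how Singdata Lakehouse implements its BloomFilter Index; the class name, the bit-array size, the hash scheme, and the idea of one filter per data block are all assumptions made for illustration. The property that matters is that a negative answer is definitive, so a block whose filter reports "absent" can be skipped without being scanned.

```python
import hashlib


class TinyBloomFilter:
    """Illustrative Bloom filter; sizes and hashing are assumptions."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = [False] * num_bits

    def _positions(self, value: str):
        # Derive num_hashes bit positions by salting a SHA-256 hash.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, value: str) -> None:
        for pos in self._positions(value):
            self.bits[pos] = True

    def might_contain(self, value: str) -> bool:
        # False means definitely absent; True means possibly present.
        return all(self.bits[pos] for pos in self._positions(value))


# One filter for a hypothetical data block of doc_account_device_edge.
block_filter = TinyBloomFilter()
for device_id in ["D001", "D002", "D003"]:
    block_filter.add(device_id)

present = block_filter.might_contain("D001")
```

A value that was added always reports True (no false negatives). A value that was never added usually reports False, but can occasionally report True, in which case the block is scanned and the row-level predicate filters it out.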

Load Sample Data

The sample data simulates account registration, login binding, and fund transfers. It can be loaded in either of two ways.

Load data from a local CSV file (recommended):

-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/your/doc_account_node.csv' TO USER VOLUME FILE 'doc_account_node.csv';

-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_fraud_graph.doc_account_node
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_account_node.csv');

You can also insert a small batch of test data inline (no CSV file required):

-- Insert 20 account nodes (A001–A020)
INSERT INTO best_practice_fraud_graph.doc_account_node VALUES
('A001', CAST('2025-01-10 09:00:00' AS TIMESTAMP), '192.168.10.1', '8801', 'hash_id_001', 147, 1, 0),
('A002', CAST('2025-01-10 09:05:00' AS TIMESTAMP), '192.168.10.1', '8802', 'hash_id_002', 147, 1, 1),
('A003', CAST('2025-01-10 09:10:00' AS TIMESTAMP), '192.168.10.1', '8803', 'hash_id_003', 147, 0, 1),
-- ... A004–A020 (complete 20 rows)
;

Verify ODS layer row counts:

SELECT COUNT(*) AS account_count FROM best_practice_fraud_graph.doc_account_node;

account_count
-------------
20

SELECT COUNT(*) AS txn_count FROM best_practice_fraud_graph.doc_transaction_edge;
SELECT COUNT(*) AS edge_count FROM best_practice_fraud_graph.doc_account_device_edge;

txn_count
---------
25

edge_count
----------
20

Data structure notes: In the sample data, accounts A001/A002/A003 all used the same IP 192.168.10.1 during registration and logged in via the same device D001. There are multiple rapid transfer transactions among A001, A002, and A003 with is_suspicious=1. This forms a typical fraud gang pattern.

Incrementally Update Edge Tables via MERGE INTO

In production, new login events are continuously generated, and the binding relationships between accounts and devices need incremental updates rather than full replacements. MERGE INTO can update last_seen and login_count when an existing (account_id, device_id) combination is found, and insert a new row for first-time occurrences:

MERGE INTO best_practice_fraud_graph.doc_account_device_edge AS t
USING (
    SELECT
        'A001' AS account_id,
        'D001' AS device_id,
        CAST('2025-06-01 10:00:00' AS TIMESTAMP) AS last_seen,
        1 AS new_logins
) AS s
ON t.account_id = s.account_id AND t.device_id = s.device_id
WHEN MATCHED THEN UPDATE SET
    last_seen = s.last_seen,
    login_count = t.login_count + s.new_logins
WHEN NOT MATCHED THEN INSERT (account_id, device_id, first_seen, last_seen, login_count)
    VALUES (s.account_id, s.device_id, s.last_seen, s.last_seen, s.new_logins);
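The upsert semantics of the MERGE statement above can be sketched in plain Python with a dictionary keyed by (account_id, device_id). This only mirrors the WHEN MATCHED / WHEN NOT MATCHED branches for illustration; the starting row values, the merge_login helper, and the account A021 are hypothetical and do not come from the sample data.

```python
from datetime import datetime

# Edge table keyed by (account_id, device_id); starting values are hypothetical.
edges = {
    ("A001", "D001"): {
        "first_seen": datetime(2025, 1, 10, 9, 0),
        "last_seen": datetime(2025, 5, 30, 8, 0),
        "login_count": 10,
    }
}


def merge_login(edges, account_id, device_id, seen_at, new_logins=1):
    """Upsert one login event, mimicking WHEN MATCHED / WHEN NOT MATCHED."""
    key = (account_id, device_id)
    if key in edges:
        # WHEN MATCHED: refresh last_seen and accumulate login_count.
        edges[key]["last_seen"] = seen_at
        edges[key]["login_count"] += new_logins
    else:
        # WHEN NOT MATCHED: insert a new edge; first_seen equals last_seen.
        edges[key] = {
            "first_seen": seen_at,
            "last_seen": seen_at,
            "login_count": new_logins,
        }


merge_login(edges, "A001", "D001", datetime(2025, 6, 1, 10, 0))  # existing edge
merge_login(edges, "A021", "D001", datetime(2025, 6, 1, 10, 5))  # new edge (hypothetical account)
```

Because the key is the (account_id, device_id) combination, replaying a login for an existing pair updates the row in place rather than adding a duplicate edge.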


Gang Risk Scoring UDF

Encapsulate the multi-factor gang risk scoring logic as a SQL UDF for reuse in both the DWS and ADS layers.

Scoring formula: suspicious transaction rate × 40 + shared-device account pair score (≥2 pairs = 30 points, 1 pair = 15 points, otherwise 0) + registration IP risk × 20 + 10 points if the identity is unverified. The result is clamped to the range 0 to 100.

CREATE OR REPLACE FUNCTION best_practice_fraud_graph.calc_gang_risk_score(
    suspicious_rate     DOUBLE,
    shared_device_pairs INT,
    ip_risk_score       DOUBLE,
    is_verified         INT
)
RETURNS DOUBLE
AS GREATEST(0.0, LEAST(100.0,
    suspicious_rate * 40.0
    + CASE
        WHEN shared_device_pairs >= 2 THEN 30.0
        WHEN shared_device_pairs = 1 THEN 15.0
        ELSE 0.0
      END
    + ip_risk_score * 20.0
    + CASE WHEN is_verified = 0 THEN 10.0 ELSE 0.0 END
));

Validate the function — high-risk account (100% suspicious rate, 2 shared device pairs, high-risk IP, unverified):

SELECT best_practice_fraud_graph.calc_gang_risk_score(1.0, 2, 0.85, 0) AS sample_score;

sample_score
------------
97

This account scores 97, falling in the HIGH risk range, and can be directly blocked by the system.
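To check the arithmetic behind the sample score by hand, here is a small Python mirror of the SQL UDF. It is a sketch for verification only, not part of the warehouse; the Python function simply reuses the UDF's name and parameter names.

```python
def calc_gang_risk_score(suspicious_rate: float, shared_device_pairs: int,
                         ip_risk_score: float, is_verified: int) -> float:
    """Python mirror of the SQL UDF, for checking the arithmetic by hand."""
    if shared_device_pairs >= 2:
        pair_points = 30.0
    elif shared_device_pairs == 1:
        pair_points = 15.0
    else:
        pair_points = 0.0
    unverified_points = 10.0 if is_verified == 0 else 0.0
    raw = (suspicious_rate * 40.0 + pair_points
           + ip_risk_score * 20.0 + unverified_points)
    # Clamp to [0, 100], like GREATEST(0.0, LEAST(100.0, ...)).
    return max(0.0, min(100.0, raw))


# 40 (suspicious rate) + 30 (pairs) + 17 (IP risk) + 10 (unverified)
sample_score = round(calc_gang_risk_score(1.0, 2, 0.85, 0), 2)
# Same inputs but verified: the 10-point unverified term drops out.
verified_score = round(calc_gang_risk_score(1.0, 2, 0.85, 1), 2)
```

With these inputs the four terms contribute 40, 30, 17, and 10 points, which sum to the 97 shown above; the verified variant loses only the last term.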


DWD Layer Dynamic Table: Relationship Graph Edge Tables

The DWD layer does two things: first, it performs a self join on the account-device association edges to find "shared-device account pairs"; second, it joins the transaction edges with account nodes to enrich each transaction with both parties' registration IPs and risk labels.

Shared-Device Account Pairs

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fraud_graph.dwd_shared_device_pairs AS
SELECT
    a1.account_id AS account_id_1,
    a2.account_id AS account_id_2,
    a1.device_id AS shared_device_id,
    a1.login_count AS login_count_1,
    a2.login_count AS login_count_2,
    LEAST(a1.last_seen, a2.last_seen) AS last_shared_time
FROM best_practice_fraud_graph.doc_account_device_edge a1
JOIN best_practice_fraud_graph.doc_account_device_edge a2
    ON a1.device_id = a2.device_id
    AND a1.account_id < a2.account_id;  -- each unordered pair once, no self-pairs

Transaction Graph Edges (with Risk Labels)

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fraud_graph.dwd_txn_graph_edge AS
SELECT
    t.txn_id,
    t.src_account_id,
    t.dst_account_id,
    t.amount,
    t.txn_time,
    t.channel,
    t.status,
    t.is_suspicious,
    a_src.register_ip AS src_register_ip,
    a_dst.register_ip AS dst_register_ip,
    a_src.risk_label AS src_risk_label,
    a_dst.risk_label AS dst_risk_label
FROM best_practice_fraud_graph.doc_transaction_edge t
JOIN best_practice_fraud_graph.doc_account_node a_src
    ON t.src_account_id = a_src.account_id
JOIN best_practice_fraud_graph.doc_account_node a_dst
    ON t.dst_account_id = a_dst.account_id;

Manually trigger the first refresh:

REFRESH DYNAMIC TABLE best_practice_fraud_graph.dwd_shared_device_pairs;
REFRESH DYNAMIC TABLE best_practice_fraud_graph.dwd_txn_graph_edge;

SELECT COUNT(*) AS pair_count FROM best_practice_fraud_graph.dwd_shared_device_pairs;
SELECT COUNT(*) AS edge_count FROM best_practice_fraud_graph.dwd_txn_graph_edge;

pair_count
----------
11

edge_count
----------
25

Query account pairs under device D001 — the most typical gang characteristic:

SELECT account_id_1, account_id_2, shared_device_id, login_count_1, login_count_2
FROM best_practice_fraud_graph.dwd_shared_device_pairs
WHERE shared_device_id = 'D001';

account_id_1 | account_id_2 | shared_device_id | login_count_1 | login_count_2
-------------+--------------+------------------+---------------+--------------
A001         | A003         | D001             | 35            | 22
A002         | A003         | D001             | 28            | 22
A001         | A002         | D001             | 35            | 28

Result interpretation: Device D001 is shared by three accounts A001/A002/A003, forming three pairwise association pairs (C(3,2)=3). These three accounts also share the same registration IP 192.168.10.1 in the ODS layer, and their registration times are concentrated on the same day — a typical batch-registration fraud gang pattern.
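The pair-building logic of the self join can be sketched in Python to show where the C(3,2)=3 pairs come from. This is an illustration of the join condition only, using the three D001 edges described above; the variable names are assumptions.

```python
from collections import defaultdict
from itertools import combinations

# (account_id, device_id) edges for the D001 cluster described above.
edges = [("A001", "D001"), ("A002", "D001"), ("A003", "D001")]

accounts_by_device = defaultdict(list)
for account_id, device_id in edges:
    accounts_by_device[device_id].append(account_id)

pairs = []
for device_id, accounts in accounts_by_device.items():
    # Sorting plus combinations() reproduces a1.account_id < a2.account_id:
    # each unordered pair appears once and no account pairs with itself.
    for account_1, account_2 in combinations(sorted(accounts), 2):
        pairs.append((account_1, account_2, device_id))
```

A device shared by n accounts yields n(n-1)/2 pairs, so pair counts grow quadratically with cluster size; this is why a single heavily shared device stands out so clearly in the DWS statistics.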

Creating Scheduled Tasks in Lakehouse Studio

Dynamic Table periodic refresh is managed through Lakehouse Studio Tasks, not by writing REFRESH INTERVAL in the DDL. In Studio under Development → Tasks, create two "Refresh Dynamic Table" tasks under the path best_practices/fraud_graph/:

  • Task name: refresh_dwd_fraud_graph
  • Operations: REFRESH DYNAMIC TABLE best_practice_fraud_graph.dwd_shared_device_pairs, REFRESH DYNAMIC TABLE best_practice_fraud_graph.dwd_txn_graph_edge
  • Schedule: trigger every 10 minutes
  • On the same task, you can attach data quality checks (e.g., trigger an alert when pair_count < 1) and monitoring rules

DWS Layer Dynamic Table: Aggregation Statistics and Gang Features

The DWS layer aggregates the two edge tables from the DWD layer to produce: per-device account cluster statistics (whether multiple account pairs share a device) and per-account suspicious transaction statistics (suspicious rate, amount, destination diversity).

Device Cluster Statistics

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fraud_graph.dws_device_cluster_stats AS
SELECT
    shared_device_id AS device_id,
    COUNT(DISTINCT account_id_1) + COUNT(DISTINCT account_id_2) AS approx_account_count,
    SUM(login_count_1 + login_count_2) AS total_login_count,
    MIN(last_shared_time) AS earliest_shared,
    MAX(last_shared_time) AS latest_shared,
    COUNT(*) AS pair_count
FROM best_practice_fraud_graph.dwd_shared_device_pairs
GROUP BY shared_device_id;

Account Transaction Risk Statistics

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fraud_graph.dws_account_txn_risk AS
SELECT
    src_account_id AS account_id,
    COUNT(*) AS total_txn_count,
    SUM(CASE WHEN is_suspicious = 1 THEN 1 ELSE 0 END) AS suspicious_count,
    ROUND(SUM(amount), 2) AS total_amount,
    ROUND(AVG(amount), 2) AS avg_amount,
    COUNT(DISTINCT dst_account_id) AS unique_dst_count,
    COUNT(DISTINCT channel) AS channel_diversity,
    ROUND(
        SUM(CASE WHEN is_suspicious = 1 THEN 1 ELSE 0 END) * 1.0 / COUNT(*),
        4
    ) AS suspicious_rate
FROM best_practice_fraud_graph.dwd_txn_graph_edge
GROUP BY src_account_id;

Manually trigger refresh:

REFRESH DYNAMIC TABLE best_practice_fraud_graph.dws_device_cluster_stats;
REFRESH DYNAMIC TABLE best_practice_fraud_graph.dws_account_txn_risk;

View device cluster statistics:

SELECT device_id, approx_account_count, total_login_count, pair_count
FROM best_practice_fraud_graph.dws_device_cluster_stats
ORDER BY pair_count DESC;

device_id | approx_account_count | total_login_count | pair_count
----------+----------------------+-------------------+-----------
D001      | 4                    | 170               | 3
D005      | 2                    | 38                | 1
D003      | 2                    | 55                | 1
D002      | 2                    | 33                | 1
D006      | 2                    | 41                | 1
...

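Result interpretation: Device D001 has pair_count=3, three times that of every other device listed (all 1), and its total_login_count of 170 is also the highest, which marks it as the core gang device. Note that approx_account_count is an overcount: it adds the distinct accounts in each pair column, so A002, which appears in both columns, is counted twice and D001 shows 4 even though only three accounts share it. Each of the other devices is shared by 2 accounts, forming several two-person fraud pairs.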
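The D001 row can be reproduced from the three pair rows shown in the DWD layer. The Python sketch below recomputes the aggregates by hand and also shows the exact account count for comparison; exact_account_count is not a column in the Dynamic Table and is included here only as an illustration.

```python
# D001 rows from dwd_shared_device_pairs:
# (account_id_1, account_id_2, login_count_1, login_count_2)
d001_pairs = [
    ("A001", "A003", 35, 22),
    ("A002", "A003", 28, 22),
    ("A001", "A002", 35, 28),
]

distinct_left = {row[0] for row in d001_pairs}   # accounts seen as account_id_1
distinct_right = {row[1] for row in d001_pairs}  # accounts seen as account_id_2

# COUNT(DISTINCT account_id_1) + COUNT(DISTINCT account_id_2)
approx_account_count = len(distinct_left) + len(distinct_right)
# Illustrative only: the true number of distinct accounts on the device.
exact_account_count = len(distinct_left | distinct_right)
# SUM(login_count_1 + login_count_2)
total_login_count = sum(row[2] + row[3] for row in d001_pairs)
pair_count = len(d001_pairs)
```

Since each account's login count is added once per pair it belongs to, total_login_count also grows with cluster size rather than reflecting raw login volume.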

View account transaction risk statistics (TOP 5):

SELECT account_id, total_txn_count, suspicious_count, total_amount, suspicious_rate
FROM best_practice_fraud_graph.dws_account_txn_risk
ORDER BY suspicious_rate DESC, total_amount DESC
LIMIT 5;

account_id | total_txn_count | suspicious_count | total_amount | suspicious_rate
-----------+-----------------+------------------+--------------+----------------
A001       | 3               | 3                | 1230         | 1.0000
A009       | 2               | 2                | 850          | 1.0000
A015       | 1               | 1                | 800          | 1.0000
A016       | 1               | 1                | 790          | 1.0000
A006       | 1               | 1                | 700          | 1.0000

Result interpretation: A001 has a 100% suspicious transaction rate and initiated 3 transactions totaling 1,230 yuan, making it the core transfer node in this gang. The destination accounts of these transactions are A002, A003, and A004; A002 and A003 are the other members of the D001 device cluster.
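The per-account aggregation in dws_account_txn_risk can be sketched in Python as a group-by on the source account. The transactions below are hypothetical (accounts X1 to X3 and all amounts are invented for illustration and are not the guide's sample data); only the aggregation logic follows the SQL.

```python
from collections import defaultdict

# Hypothetical rows: (src_account_id, dst_account_id, amount, is_suspicious)
transactions = [
    ("X1", "X2", 100.0, 1),
    ("X1", "X3", 250.0, 1),
    ("X2", "X3", 80.0, 0),
    ("X2", "X1", 120.0, 1),
]

grouped = defaultdict(list)
for src, dst, amount, is_suspicious in transactions:
    grouped[src].append((dst, amount, is_suspicious))

stats = {}
for src, rows in grouped.items():
    suspicious_count = sum(flag for _, _, flag in rows)
    stats[src] = {
        "total_txn_count": len(rows),
        "suspicious_count": suspicious_count,
        "total_amount": round(sum(amount for _, amount, _ in rows), 2),
        "unique_dst_count": len({dst for dst, _, _ in rows}),
        "suspicious_rate": round(suspicious_count / len(rows), 4),
    }
```

Because the grouping key is the source account, an account that only receives funds (X3 here) gets no row at all, so it will also be absent from anything that starts from this table.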


ADS Layer Dynamic Table: High-Risk Account Blacklist

The ADS layer merges the DWS layer aggregation features with ODS layer account registration information, calls the calc_gang_risk_score UDF for scoring, and outputs the high-risk account blacklist.

CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_fraud_graph.ads_high_risk_account_blacklist AS
SELECT
    r.account_id,
    r.total_txn_count,
    r.suspicious_count,
    r.suspicious_rate,
    r.total_amount,
    COALESCE(dc.pair_count, 0) AS shared_device_pair_count,
    COALESCE(ip.risk_score, 0.0) AS register_ip_risk,
    an.is_verified,
    an.risk_label AS original_risk_label,
    ROUND(
        best_practice_fraud_graph.calc_gang_risk_score(
            r.suspicious_rate,
            CAST(COALESCE(dc.pair_count, 0) AS INT),
            COALESCE(ip.risk_score, 0.0),
            an.is_verified
        ),
        2
    ) AS gang_risk_score,
    CASE
        WHEN best_practice_fraud_graph.calc_gang_risk_score(
            r.suspicious_rate,
            CAST(COALESCE(dc.pair_count, 0) AS INT),
            COALESCE(ip.risk_score, 0.0),
            an.is_verified
        ) >= 80 THEN 'HIGH'
        WHEN best_practice_fraud_graph.calc_gang_risk_score(
            r.suspicious_rate,
            CAST(COALESCE(dc.pair_count, 0) AS INT),
            COALESCE(ip.risk_score, 0.0),
            an.is_verified
        ) >= 50 THEN 'MEDIUM'
        ELSE 'LOW'
    END AS risk_level,
    CURRENT_TIMESTAMP() AS score_time
FROM best_practice_fraud_graph.dws_account_txn_risk r
JOIN best_practice_fraud_graph.doc_account_node an
    ON r.account_id = an.account_id
-- The original LEFT JOIN to doc_account_device_edge was removed: none of its
-- columns were selected, and it could only duplicate rows for accounts with
-- more than one device.
LEFT JOIN (
    -- Caveat: an account that appears as both account_id_1 and account_id_2
    -- produces two rows here (one per side), so the result can contain more
    -- than one row per account; the queries below deduplicate by account.
    SELECT account_id_1 AS account_id, COUNT(*) AS pair_count
    FROM best_practice_fraud_graph.dwd_shared_device_pairs
    GROUP BY account_id_1
    UNION ALL
    SELECT account_id_2 AS account_id, COUNT(*) AS pair_count
    FROM best_practice_fraud_graph.dwd_shared_device_pairs
    GROUP BY account_id_2
) dc
    ON r.account_id = dc.account_id
LEFT JOIN best_practice_fraud_graph.doc_ip_node ip
    ON an.register_ip = ip.ip_addr;
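The per-account pair count used for scoring comes from the UNION ALL subquery, which counts each side of the pair table separately. The Python sketch below replays that logic on the three D001 pairs to show what the subquery returns. The final total_pairs line is one possible way to get a single total per account; it is an assumption about intent, not something the ADS query does.

```python
from collections import Counter

# D001 pairs from dwd_shared_device_pairs: (account_id_1, account_id_2)
pairs = [("A001", "A003"), ("A002", "A003"), ("A001", "A002")]

left_counts = Counter(a1 for a1, _ in pairs)   # GROUP BY account_id_1
right_counts = Counter(a2 for _, a2 in pairs)  # GROUP BY account_id_2

# UNION ALL keeps both branches, so one account can yield two rows.
dc_rows = sorted(list(left_counts.items()) + list(right_counts.items()))

# Possible alternative (assumption): one total per account across both sides.
total_pairs = left_counts + right_counts
```

Here A001 and A003 each get one row with pair_count 2, while A002 gets two rows with pair_count 1 each. This is why the queries that read the blacklist deduplicate with GROUP BY or COUNT(DISTINCT account_id).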

Manually trigger refresh:

REFRESH DYNAMIC TABLE best_practice_fraud_graph.ads_high_risk_account_blacklist;

View HIGH risk accounts:

SELECT account_id, gang_risk_score, risk_level, suspicious_rate, shared_device_pair_count
FROM best_practice_fraud_graph.ads_high_risk_account_blacklist
WHERE risk_level = 'HIGH'
GROUP BY account_id, gang_risk_score, risk_level, suspicious_rate, shared_device_pair_count
ORDER BY gang_risk_score DESC;

account_id | gang_risk_score | risk_level | suspicious_rate | shared_device_pair_count
-----------+-----------------+------------+-----------------+-------------------------
A003       | 97              | HIGH       | 1.0000          | 2
A001       | 87              | HIGH       | 1.0000          | 2
A012       | 80.6            | HIGH       | 1.0000          | 1

Result interpretation:

  • A003 (97 points): 100% suspicious rate, appeared in shared-device account pairs twice (paired with both A001 and A002), registration IP risk 0.85, unverified identity — all four factors are in the high-risk zone, yielding the highest score.
  • A001 (87 points): Also a member of the D001 device gang. Its identity is verified, so the 10-point unverified term does not apply; the score is lower but still HIGH.
  • A012 (80.6 points): Shared device D006 with A011, 100% suspicious rate, unverified — a cross-device fraud pair member.

View risk level distribution:

SELECT risk_level, COUNT(DISTINCT account_id) AS account_count
FROM best_practice_fraud_graph.ads_high_risk_account_blacklist
GROUP BY risk_level
ORDER BY account_count DESC;

risk_level | account_count
-----------+--------------
MEDIUM     | 14
HIGH       | 3
LOW        | 3

In the current dataset, 3 accounts are marked as HIGH (recommend immediate suspension), 14 as MEDIUM (strengthen verification or limit transactions), and 3 as LOW (normal operations).
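The level thresholds in the ADS CASE expression and the recommended handling can be summarized in a short Python sketch. The function name risk_level and the RECOMMENDED_ACTION mapping are illustrative names; the thresholds and action wording follow the text above.

```python
def risk_level(gang_risk_score: float) -> str:
    """Map a score to the level used in the ADS CASE expression."""
    if gang_risk_score >= 80:
        return "HIGH"
    if gang_risk_score >= 50:
        return "MEDIUM"
    return "LOW"


# Handling recommended in this guide for each level.
RECOMMENDED_ACTION = {
    "HIGH": "immediate suspension",
    "MEDIUM": "strengthen verification or limit transactions",
    "LOW": "normal operations",
}

# Scores reported for the three HIGH accounts above.
reported_scores = {"A003": 97, "A001": 87, "A012": 80.6}
levels = {account: risk_level(score) for account, score in reported_scores.items()}
```

Both boundaries are inclusive on the upper level: a score of exactly 80 is HIGH and a score of exactly 50 is MEDIUM, which is why A012 at 80.6 lands in HIGH only narrowly.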


ZettaPark Python Task: Graph Algorithm Extension

For graph algorithms that cannot be directly expressed in SQL (connected components, PageRank, community detection), create a ZettaPark Python Task in Lakehouse Studio, run the algorithms with NetworkX, and write the results back to the Lakehouse.

Reference code structure (runs in a Studio Python Task):

import networkx as nx
import pandas as pd
from clickzetta_zettapark.session import Session

session = Session.builder.config("profile", "skill_test").create()

# Read shared-device account pairs from the DWD layer
pairs = session.sql("""
    SELECT account_id_1, account_id_2
    FROM best_practice_fraud_graph.dwd_shared_device_pairs
""").to_pandas()

# Build an undirected graph: each node is an account,
# each edge is a shared-device relationship
G = nx.from_pandas_edgelist(pairs, 'account_id_1', 'account_id_2')

# Find connected components (i.e., gang groupings)
components = list(nx.connected_components(G))

# Flatten components into one row per account
gang_assignments = []
for gang_id, members in enumerate(components):
    for account in members:
        gang_assignments.append({
            'account_id': account,
            'gang_id': f'GANG_{gang_id:04d}',
            'gang_size': len(members)
        })

# Write results back to the Lakehouse
df = pd.DataFrame(gang_assignments)
session.write_pandas(df, 'ads_gang_component_map',
                     schema='best_practice_fraud_graph', overwrite=True)
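To see what the connected-components step produces without a Lakehouse session or NetworkX installed, here is a dependency-free sketch that swaps in a simple union-find. It is a stand-in for nx.connected_components, not the method used in the task above, and the input is limited to the D001 pairs and the A011/A012 pair on D006 mentioned in this guide.

```python
def connected_components(pairs):
    """Group accounts into components with a simple union-find."""
    parent = {}

    def find(node):
        parent.setdefault(node, node)
        while parent[node] != node:
            parent[node] = parent[parent[node]]  # path halving
            node = parent[node]
        return node

    for left, right in pairs:
        root_left, root_right = find(left), find(right)
        if root_left != root_right:
            parent[root_right] = root_left

    groups = {}
    for node in parent:
        groups.setdefault(find(node), set()).add(node)
    # Sort for a stable order; set iteration order is not guaranteed.
    return sorted(groups.values(), key=lambda members: sorted(members))


# D001 pairs plus the A011/A012 pair on D006.
pairs = [("A001", "A002"), ("A001", "A003"), ("A002", "A003"), ("A011", "A012")]
components = connected_components(pairs)
gang_sizes = [len(members) for members in components]
```

On this input the accounts fall into two groups, one of size 3 and one of size 2. Unlike the pairwise view in the DWD layer, components also merge accounts linked only indirectly, for example two accounts that never shared a device with each other but both shared one with a third account.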


Data Warehouse Object Overview

SHOW TABLES IN best_practice_fraud_graph;

schema_name               | table_name                      | is_dynamic
--------------------------+---------------------------------+-----------
best_practice_fraud_graph | doc_account_node                | false
best_practice_fraud_graph | doc_device_node                 | false
best_practice_fraud_graph | doc_ip_node                     | false
best_practice_fraud_graph | doc_transaction_edge            | false
best_practice_fraud_graph | doc_account_device_edge         | false
best_practice_fraud_graph | dwd_shared_device_pairs         | true
best_practice_fraud_graph | dwd_txn_graph_edge              | true
best_practice_fraud_graph | dws_device_cluster_stats        | true
best_practice_fraud_graph | dws_account_txn_risk            | true
best_practice_fraud_graph | ads_high_risk_account_blacklist | true

Data flow summary:

doc_account_device_edge (ODS)
    ↓ SELF JOIN (shared-device account pairs)
dwd_shared_device_pairs (DWD, Dynamic Table)
    ↓ GROUP BY device_id
dws_device_cluster_stats (DWS, Dynamic Table)
    ↓
      ↘
        ads_high_risk_account_blacklist (ADS, Dynamic Table)
      ↗     ← calc_gang_risk_score() SQL UDF
dws_account_txn_risk (DWS, Dynamic Table)
    ↑ GROUP BY src_account_id
dwd_txn_graph_edge (DWD, Dynamic Table)
    ↑ JOIN account node risk labels
doc_transaction_edge + doc_account_node (ODS)


Notes

  • BloomFilter Index does not automatically apply to existing data: CREATE BLOOMFILTER INDEX only accelerates data written after the index is created. BloomFilter type does not support BUILD INDEX; to apply to existing data, the table must be rebuilt. Inverted Index supports BUILD INDEX and can be rebuilt for existing data after creation.

  • Semantics of Dynamic Table incremental refresh: The first REFRESH performs a full snapshot computation; subsequent incremental refreshes only process rows added or changed in the ODS layer since the last refresh point. If the ODS layer uses INSERT OVERWRITE to write data, the Dynamic Table degrades to a full refresh — use APPEND mode writing or continuous append via Kafka PIPE instead.

  • Do not write REFRESH INTERVAL in DDL: Dynamic Table periodic refresh is managed via Lakehouse Studio Tasks. Writing REFRESH INTERVAL in DDL prevents appending monitoring alerts and quality check rules to the same task. Use the refresh tasks scheduled under best_practices/fraud_graph/ in Studio instead.

  • ADS layer CURRENT_TIMESTAMP() score timestamp: The score_time field uses CURRENT_TIMESTAMP() and is updated to the current time on each refresh, not a historical scoring snapshot. For historical auditing, include the refresh task execution timestamp when writing back.

  • UDF called multiple times in the ADS layer: calc_gang_risk_score is invoked three times per row, once in the SELECT expression and twice in the CASE branches. To optimize, use a subquery to calculate the score once and perform the CASE judgment in the outer query.

  • Timing dependency for ZettaPark NetworkX Task: The graph algorithm task reads dwd_shared_device_pairs, so it should run only after the upstream Dynamic Table refreshes (through the ADS layer) complete. Configure task dependencies in Lakehouse Studio to avoid reading stale or partially refreshed data.