This guide centralizes firewall logs, identity and access management (IAM) authentication logs, and application access logs in Singdata Lakehouse to build a low-cost, high-coverage threat detection data warehouse that replaces or augments traditional SIEM analytics. Using a Kaggle cybersecurity threat detection log dataset (firewall, IDS, and application logs; 30 representative samples), it demonstrates the complete OSS PIPE → Bronze → Silver → Gold build process end to end and covers five key platform capabilities: BloomFilter Index, Inverted Index, Dynamic Table, SQL UDF, and Time Travel.
Overview
The core challenge of SOC log analysis is to ingest massive heterogeneous logs (firewall/IDS/application) rapidly and identify genuinely threatening behavior, while still supporting post-incident forensic analysis.
Problem | Singdata Solution
--------|------------------
Log agents write to OSS buckets and need automatic ingestion | OSS PIPE (EVENT_NOTIFICATION mode): a new file triggers ingestion as soon as it lands
High-cardinality source_ip / dest_ip columns with frequent point queries | BloomFilter Index: filters non-matching data blocks and reduces scan volume
Attack tool fingerprint and request path keyword search | Inverted Index (english / keyword tokenizer): supports full-text matching
Auto-refresh from raw logs → normalization → threat aggregation | Dynamic Table with declarative SQL and incremental computation
Real-time IP threat intelligence API for tagging malicious IPs | External Function: embed external threat intelligence service calls in SQL
Post-incident need to trace back the complete data state at attack time | Time Travel: query historical snapshots by timestamp or version
SQL Commands Used
Command / Function | Purpose | Notes
-------------------|---------|------
CREATE TABLE | Create Bronze layer raw log table | Regular table serving as upstream source for Dynamic Tables
CREATE BLOOMFILTER INDEX | Create BloomFilter Index on source_ip / dest_ip | Suitable for point-query filtering on high-cardinality IP columns
CREATE INVERTED INDEX | Create Inverted Index on user_agent / request_path | Supports full-text search for attack tool fingerprints
CREATE PIPE | Create OSS continuous ingestion pipeline | EVENT_NOTIFICATION mode: a new file automatically triggers ingestion
ingest_time uses DEFAULT CURRENT_TIMESTAMP() and is automatically populated when OSS PIPE writes; it does not need to be carried in the log body.
Create BloomFilter Index
source_ip and dest_ip are high-cardinality string columns. Security analysts frequently query specific IPs as point queries, making them suitable for BloomFilter Index.
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_source_ip
ON TABLE doc_raw_logs (source_ip);
CREATE BLOOMFILTER INDEX IF NOT EXISTS idx_bf_dest_ip
ON TABLE doc_raw_logs (dest_ip);
⚠️ Note: CREATE BLOOMFILTER INDEX requires the same schema context as the target table. Before executing, run USE SCHEMA best_practice_soc_log or use the -s best_practice_soc_log parameter, otherwise you will get an "index and table must in the same schema" error.
Create Inverted Index
Attack tools typically leave fingerprint strings in user_agent (e.g., SQLMap, Nmap, Metasploit), and request_path contains injection payloads (e.g., /admin?id=1 OR 1=1). Create Inverted Indexes on both columns to support full-text search:
CREATE INVERTED INDEX IF NOT EXISTS idx_inv_user_agent
ON TABLE doc_raw_logs (user_agent)
PROPERTIES('analyzer'='english');
CREATE INVERTED INDEX IF NOT EXISTS idx_inv_request_path
ON TABLE doc_raw_logs (request_path)
PROPERTIES('analyzer'='keyword');
user_agent uses the english tokenizer (splits by word); request_path uses keyword (treated as a whole, no splitting) to preserve URL semantics.
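To make the difference between the two tokenizers concrete, here is a minimal Python sketch. The `english_analyzer` function is only a rough stand-in (lowercase, then split on non-alphanumeric characters); the platform's real english analyzer may apply further rules such as stemming or stop-word removal, so treat the exact tokens as an assumption.

```python
import re


def english_analyzer(text: str) -> list[str]:
    """Rough stand-in: lowercase, then split on non-alphanumeric characters."""
    return [token for token in re.split(r"[^0-9a-z]+", text.lower()) if token]


def keyword_analyzer(text: str) -> list[str]:
    """The whole value is indexed as a single term, unchanged."""
    return [text]


# user_agent is split into words, so a search for "sqlmap" can match.
user_agent_terms = english_analyzer("SQLMap/1.6-dev")

# request_path stays intact, so the URL and its payload are matched as a whole.
request_path_terms = keyword_analyzer("/admin?id=1 OR 1=1")

print(user_agent_terms)
print(request_path_terms)
```

Under these assumptions a word-level search finds the tool name inside user_agent, while request_path only matches when the full path string is supplied.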
Verify index creation:
SHOW INDEXES FROM best_practice_soc_log.doc_raw_logs;
Data Ingestion: OSS PIPE (EVENT_NOTIFICATION Mode)
In production, log agents (Fluentd / Logstash) write logs to an OSS bucket, and OSS event notifications trigger the PIPE to ingest the new files automatically.
Option 1: Continuous ingestion via OSS PIPE (recommended)
First create a Storage Connection and Volume pointing to the log bucket, then create the PIPE:
-- Create Volume after an OSS Storage Connection is available
CREATE VOLUME soc_log_vol EXTERNAL
STORAGE_CONNECTION = '<your_oss_connection>'
LOCATION = 'oss://<your-bucket>/soc-logs/';
-- Create OSS PIPE (EVENT_NOTIFICATION mode)
CREATE PIPE IF NOT EXISTS best_practice_soc_log.pipe_raw_logs
VIRTUAL_CLUSTER = 'DEFAULT'
AUTO_INGEST = TRUE
AS
COPY INTO best_practice_soc_log.doc_raw_logs
(log_timestamp, source_ip, dest_ip, protocol, action,
threat_label, log_type, bytes_transferred, user_agent, request_path)
FROM (
SELECT
TO_TIMESTAMP($1, 'YYYY-MM-DD"T"HH24:MI:SS'),
$2, $3, $4, $5, $6, $7, $8::BIGINT, $9, $10
FROM VOLUME soc_log_vol
)
USING csv
OPTIONS('header'='true', 'sep'=',');
💡 Tip: AUTO_INGEST = TRUE enables EVENT_NOTIFICATION mode. After configuring OSS bucket event notifications to point to the PIPE, new files landing in the bucket trigger ingestion — typically with less than 1 minute of latency. For bulk importing historical logs, you can also manually run ALTER PIPE pipe_raw_logs SET INGEST_MODE = LIST_PURGE.
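The column mapping in the PIPE's SELECT (timestamp parsing for the first field, an integer cast for the eighth) can be checked offline before files are shipped to the bucket. The following Python sketch mirrors that mapping; the sample CSV row is hypothetical and only illustrates the expected layout.

```python
import csv
import io
from datetime import datetime

# Column order matches the COPY INTO column list in the PIPE definition.
COLUMNS = [
    "log_timestamp", "source_ip", "dest_ip", "protocol", "action",
    "threat_label", "log_type", "bytes_transferred", "user_agent", "request_path",
]


def parse_log_row(fields: list[str]) -> dict:
    """Apply the same conversions as the PIPE: timestamp parse and integer cast."""
    record = dict(zip(COLUMNS, fields))
    # Equivalent of TO_TIMESTAMP($1, 'YYYY-MM-DD"T"HH24:MI:SS')
    record["log_timestamp"] = datetime.strptime(
        record["log_timestamp"], "%Y-%m-%dT%H:%M:%S"
    )
    # Equivalent of $8::BIGINT
    record["bytes_transferred"] = int(record["bytes_transferred"])
    return record


# Hypothetical file content: one header line and one data row.
sample = (
    ",".join(COLUMNS) + "\n"
    "2026-06-06T23:30:00,185.220.101.33,192.168.1.10,TCP,blocked,"
    "malicious,firewall,1024,SQLMap/1.6-dev,/admin\n"
)

reader = csv.reader(io.StringIO(sample))
next(reader)  # skip the header row, like 'header'='true'
records = [parse_log_row(row) for row in reader]
print(records[0])
```

A row that fails either conversion raises a ValueError here, which is a cheap way to catch malformed files before they reach the Bronze table.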
Option 2: INSERT simulation (when OSS is unavailable)
If OSS is not yet configured, load the data directly to simulate the state after OSS PIPE has parsed and written it. You can either import a local CSV file or insert rows with INSERT INTO.
Import from local CSV (recommended):
-- Step 1: Upload local CSV to User Volume via SQL PUT
PUT '/path/to/your/data.csv' TO USER VOLUME FILE 'data.csv';
-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_soc_log.doc_raw_logs
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('data.csv');
You can also insert test data inline (no CSV file needed):
Verify Bronze layer row count and threat distribution:
SELECT threat_label, log_type, COUNT(*) AS cnt
FROM best_practice_soc_log.doc_raw_logs
GROUP BY threat_label, log_type
ORDER BY threat_label, cnt DESC;
4 logs carrying the SQLMap signature were found, 1 of which is already labeled suspicious. Rows with a benign label indicate the tool's operation did not trigger a threat judgment in this instance, but they are still worth monitoring.
Search by Request Path Keyword (BloomFilter-Assisted IP Filtering)
Combine BloomFilter Index point queries for a specific IP with path analysis to understand attack intent:
SELECT log_id, source_ip, threat_label, action, request_path
FROM best_practice_soc_log.doc_raw_logs
WHERE source_ip = '185.220.101.33';
The BloomFilter Index skips data blocks that definitely do not contain this IP, so only blocks that may contain a match are scanned.
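To illustrate block-level pruning, here is a minimal Python sketch of a Bloom filter kept per data block. It is a conceptual model only, not the platform's implementation: the filter size, the hash construction, and the block layout are all assumptions chosen for readability.

```python
import hashlib


class BloomFilter:
    """Tiny Bloom filter: may report false positives, never false negatives."""

    def __init__(self, size_bits: int = 1024, num_hashes: int = 3) -> None:
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, value: str):
        # Derive num_hashes bit positions by salting a SHA-256 digest.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, value: str) -> None:
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value: str) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(value))


def build_block_filters(blocks: list[list[str]]) -> list[BloomFilter]:
    """Build one filter per block, as would happen when data is written."""
    filters = []
    for block in blocks:
        bloom = BloomFilter()
        for ip in block:
            bloom.add(ip)
        filters.append(bloom)
    return filters


def blocks_to_scan(filters: list[BloomFilter], target_ip: str) -> list[int]:
    """Return indexes of blocks whose filter cannot rule out target_ip."""
    return [i for i, bloom in enumerate(filters) if bloom.might_contain(target_ip)]


# Hypothetical data blocks, each holding the source_ip values of a few rows.
blocks = [
    ["192.168.1.10", "10.0.0.5"],
    ["185.220.101.33", "198.199.119.1"],
    ["45.95.147.236", "91.240.118.172"],
]
filters = build_block_filters(blocks)
scan = blocks_to_scan(filters, "185.220.101.33")
print(scan)
```

The block that really holds the IP is always in the result. Other blocks appear only on a false positive, which is why the index reduces scan volume without ever hiding a matching row.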
IP Risk Classification UDF
Encapsulate IP risk determination logic as a SQL UDF, reusable in both Silver and Gold layers:
CREATE OR REPLACE FUNCTION best_practice_soc_log.classify_ip_risk(
ip STRING,
threat_label STRING,
is_attack_tool INT
)
RETURNS STRING
AS CASE
WHEN threat_label = 'malicious' THEN 'HIGH'
WHEN threat_label = 'suspicious' AND is_attack_tool = 1 THEN 'HIGH'
WHEN threat_label = 'suspicious' THEN 'MEDIUM'
WHEN is_attack_tool = 1 THEN 'MEDIUM'
ELSE 'LOW'
END;
💡 Tip: In production scenarios, this UDF can be replaced with an External Function that calls external threat intelligence APIs (e.g., AbuseIPDB, VirusTotal) to tag IPs directly in SQL. For External Function development, see the related documentation section.
Verify the UDF:
SELECT source_ip, threat_label, is_attack_tool,
best_practice_soc_log.classify_ip_risk(source_ip, threat_label, is_attack_tool) AS ip_risk_level
FROM best_practice_soc_log.doc_silver_normalized_logs
WHERE threat_label != 'benign'
ORDER BY ip_risk_level
LIMIT 8;
source_ip | threat_label | is_attack_tool | ip_risk_level
-----------------|--------------|----------------|---------------
103.22.200.174 | suspicious | 1 | HIGH
45.142.213.99 | suspicious | 1 | HIGH
185.220.101.33 | malicious | 1 | HIGH
198.199.119.1 | malicious | 0 | HIGH
91.240.118.172 | malicious | 1 | HIGH
104.21.58.152 | malicious | 0 | HIGH
45.95.147.236 | malicious | 1 | HIGH
177.52.183.80 | suspicious | 0 | MEDIUM
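The decision order of the CASE expression matters: the first matching branch wins. A Python mirror of the same logic, shown below as a sketch for unit-testing the rules outside the warehouse, can be checked against rows from the verification output above.

```python
def classify_ip_risk(ip: str, threat_label: str, is_attack_tool: int) -> str:
    """Python mirror of the SQL UDF; branches are evaluated top to bottom."""
    # ip is accepted for signature parity; like the SQL version, it is unused.
    if threat_label == "malicious":
        return "HIGH"
    if threat_label == "suspicious" and is_attack_tool == 1:
        return "HIGH"
    if threat_label == "suspicious":
        return "MEDIUM"
    if is_attack_tool == 1:
        return "MEDIUM"
    return "LOW"


# Rows taken from the verification output above:
# (source_ip, threat_label, is_attack_tool, expected ip_risk_level)
observed = [
    ("103.22.200.174", "suspicious", 1, "HIGH"),
    ("198.199.119.1", "malicious", 0, "HIGH"),
    ("177.52.183.80", "suspicious", 0, "MEDIUM"),
]

for ip, label, tool, expected in observed:
    assert classify_ip_risk(ip, label, tool) == expected

print("all rows match")
```

Note that a benign row carrying an attack tool fingerprint is still rated MEDIUM, because the fourth branch does not depend on threat_label.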
Silver Layer Dynamic Table: Normalization and Threat Tagging
The Silver layer performs three operations on top of the Bronze raw logs: classifying source IPs as internal or external, mapping threat labels to numeric levels, and flagging attack tool fingerprints.
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_soc_log.doc_silver_normalized_logs
AS
SELECT
log_id,
log_timestamp,
DATE(log_timestamp) AS log_date,
HOUR(log_timestamp) AS log_hour,
source_ip,
dest_ip,
protocol,
action,
threat_label,
log_type,
bytes_transferred,
user_agent,
request_path,
ingest_time,
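-- Whether source IP is internal (RFC 1918: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16)
CASE
WHEN source_ip LIKE '192.168.%'
OR source_ip LIKE '10.%'
-- 172.16.0.0/12 spans 172.16.x.x through 172.31.x.x
OR source_ip LIKE '172.16.%' OR source_ip LIKE '172.17.%'
OR source_ip LIKE '172.18.%' OR source_ip LIKE '172.19.%'
OR source_ip LIKE '172.2_.%'
OR source_ip LIKE '172.30.%' OR source_ip LIKE '172.31.%'
THEN 1 ELSE 0
END AS is_internal_src,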
-- Threat level digitization
CASE threat_label
WHEN 'malicious' THEN 3
WHEN 'suspicious' THEN 2
WHEN 'benign' THEN 1
ELSE 0
END AS threat_level,
-- Attack tool fingerprint
CASE WHEN user_agent IN (
'SQLMap/1.6-dev','Nmap Scripting Engine',
'Metasploit v6.3','Cobalt Strike','Havoc/0.7')
THEN 1 ELSE 0
END AS is_attack_tool
FROM best_practice_soc_log.doc_raw_logs;
⚠️ Note: Do not write REFRESH INTERVAL in Dynamic Table DDL. Refresh scheduling is managed via Lakehouse Studio Tasks (see the "Configure Refresh Tasks" section below).
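String-prefix checks for private addresses are easy to get subtly wrong, because RFC 1918 reserves 172.16.0.0/12, which spans 172.16.x.x through 172.31.x.x. The following Python sketch uses the standard ipaddress module as a reference implementation; it can be used to validate the is_internal_src column against sample addresses. The function name simply mirrors the Silver column and is not a platform API.

```python
import ipaddress

# The three private IPv4 ranges defined by RFC 1918.
RFC1918_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
]


def is_internal_src(ip: str) -> int:
    """Return 1 if ip falls inside an RFC 1918 range, else 0."""
    address = ipaddress.ip_address(ip)
    return int(any(address in network for network in RFC1918_NETWORKS))


for candidate in ["10.1.2.3", "172.20.5.9", "172.32.0.1", "185.220.101.33"]:
    print(candidate, is_internal_src(candidate))
```

Comparing this function's output with the Silver table on a sample of source_ip values is a quick way to confirm that internal traffic is not leaking into the external IP rankings.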
The Gold layer provides two aggregation tables: daily per-IP threat summary, and high-risk external IP rankings.
Daily Threat Summary
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_soc_log.doc_gold_threat_summary
AS
SELECT
source_ip,
log_date,
log_type,
COUNT(*) AS total_events,
SUM(CASE WHEN threat_label = 'malicious' THEN 1 ELSE 0 END) AS malicious_cnt,
SUM(CASE WHEN threat_label = 'suspicious' THEN 1 ELSE 0 END) AS suspicious_cnt,
SUM(CASE WHEN action = 'blocked' THEN 1 ELSE 0 END) AS blocked_cnt,
SUM(is_attack_tool) AS attack_tool_cnt,
MAX(threat_level) AS max_threat_level,
SUM(bytes_transferred) AS total_bytes,
COUNT(DISTINCT dest_ip) AS unique_dest_count
FROM best_practice_soc_log.doc_silver_normalized_logs
GROUP BY source_ip, log_date, log_type;
High-Risk External IP Rankings
CREATE DYNAMIC TABLE IF NOT EXISTS best_practice_soc_log.doc_gold_high_risk_ips
AS
SELECT
source_ip,
COUNT(DISTINCT log_date) AS active_days,
COUNT(*) AS total_events,
SUM(CASE WHEN threat_label IN ('malicious','suspicious') THEN 1 ELSE 0 END) AS threat_events,
SUM(is_attack_tool) AS attack_tool_hits,
ROUND(
SUM(CASE WHEN threat_label IN ('malicious','suspicious') THEN 1.0 ELSE 0.0 END)
/ COUNT(*) * 100, 2
) AS threat_rate_pct,
MAX(log_timestamp) AS last_seen,
COUNT(DISTINCT dest_ip) AS targets_count
FROM best_practice_soc_log.doc_silver_normalized_logs
WHERE is_internal_src = 0
GROUP BY source_ip
HAVING threat_events > 0;
⚠️ Note: Neither Gold layer Dynamic Table declares REFRESH INTERVAL. Both are scheduled via Lakehouse Studio Tasks (see below).
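The high-risk ranking logic (external sources only, at least one threat event, threat rate as a percentage) can be dry-run locally. The sketch below uses Python's built-in sqlite3 module on a handful of hypothetical Silver rows; SQLite is only a stand-in engine here, and the table name `silver` and the sample IPs are assumptions made for the example.

```python
import sqlite3

# Hypothetical Silver rows: (source_ip, log_date, threat_label, is_internal_src)
rows = [
    ("203.0.113.7", "2026-06-06", "malicious", 0),
    ("203.0.113.7", "2026-06-06", "benign", 0),
    ("203.0.113.7", "2026-06-07", "suspicious", 0),
    ("198.51.100.4", "2026-06-06", "benign", 0),   # no threat events: filtered by HAVING
    ("10.0.0.5", "2026-06-06", "malicious", 1),    # internal source: filtered by WHERE
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE silver ("
    "source_ip TEXT, log_date TEXT, threat_label TEXT, is_internal_src INTEGER)"
)
conn.executemany("INSERT INTO silver VALUES (?, ?, ?, ?)", rows)

result = conn.execute(
    """
    SELECT
        source_ip,
        COUNT(DISTINCT log_date) AS active_days,
        COUNT(*) AS total_events,
        SUM(CASE WHEN threat_label IN ('malicious','suspicious') THEN 1 ELSE 0 END)
            AS threat_events,
        ROUND(
            SUM(CASE WHEN threat_label IN ('malicious','suspicious') THEN 1.0 ELSE 0.0 END)
            / COUNT(*) * 100, 2
        ) AS threat_rate_pct
    FROM silver
    WHERE is_internal_src = 0
    GROUP BY source_ip
    HAVING SUM(CASE WHEN threat_label IN ('malicious','suspicious') THEN 1 ELSE 0 END) > 0
    """
).fetchall()
conn.close()

print(result)
```

Only 203.0.113.7 survives both filters: it is active on 2 days, with 2 threat events out of 3 total, which gives a threat rate of 66.67 percent.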
max_threat_level = 3 is the highest threat level (malicious). The DNS tunneling behavior of 104.21.58.152 (blocked_cnt = 0 but malicious_cnt = 1) deserves special attention — this traffic was not blocked.
Configure Refresh Tasks
Dynamic Table periodic refresh is managed via Lakehouse Studio Tasks, which can have monitoring alerts and data quality checks attached.
💡 Tip: The examples below use cz-cli (the Singdata ClickZetta Lakehouse CLI tool). If cz-cli is not installed, see cz-cli Setup Guide. If you prefer not to use the command line, you can also run SQL in the Development → SQL Editor of Lakehouse Studio, and configure/trigger scheduled tasks in the Studio → Tasks page.
💡 Tip: In the Lakehouse Studio task interface, you can add a dependency relationship to refresh_soc_gold to ensure Silver refreshes before Gold. The same task can also have data quality rules (e.g., alert when malicious_cnt spikes beyond a threshold) and execution failure alert notifications attached.
After a security incident, analysts need to trace back the complete data state at the time of the attack. Time Travel provides historical snapshot queries based on timestamps.
View Version History
DESC HISTORY best_practice_soc_log.doc_raw_logs LIMIT 5;
-- Trace back data state at 2026-06-06 23:37:30
SELECT COUNT(*) AS row_count
FROM best_practice_soc_log.doc_raw_logs
TIMESTAMP AS OF '2026-06-06 23:37:30';
row_count
---------
30
Trace Attack Path
For known malicious IPs, trace back all operations within the attack time window:
SELECT log_timestamp, source_ip, dest_ip, protocol, action, request_path
FROM best_practice_soc_log.doc_raw_logs
TIMESTAMP AS OF '2026-06-06 23:39:07'
WHERE source_ip IN ('185.220.101.33', '198.199.119.1', '45.95.147.236')
ORDER BY log_timestamp;
The Time Travel retention period defaults to 7 days. Within this window, any historical version can be queried at any time without additional backups.
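Before running a forensic query, it can help to check whether the target timestamp is still inside the retention window. The Python sketch below encodes only the 7-day default stated above; the helper name and the example clock values are hypothetical.

```python
from datetime import datetime, timedelta


def within_retention(
    snapshot_time: datetime, now: datetime, retention_days: int = 7
) -> bool:
    """True if snapshot_time is in the past and not older than the retention window."""
    age = now - snapshot_time
    return timedelta(0) <= age <= timedelta(days=retention_days)


snapshot = datetime(2026, 6, 6, 23, 37, 30)

# Four days later the snapshot is still queryable.
print(within_retention(snapshot, datetime(2026, 6, 10, 9, 0, 0)))

# After more than seven days it has aged out of the default window.
print(within_retention(snapshot, datetime(2026, 6, 14, 9, 0, 0)))
```

If an investigation is likely to start later than the window allows, the retention period needs to be extended before the incident data ages out.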
Attack Path Analysis
Aggregate the Bronze raw logs by request_path to quickly identify high-frequency attack paths:
SELECT
request_path,
COUNT(*) AS total_hits,
SUM(CASE WHEN threat_label IN ('malicious','suspicious') THEN 1 ELSE 0 END) AS threat_hits
FROM best_practice_soc_log.doc_raw_logs
GROUP BY request_path
ORDER BY threat_hits DESC, total_hits DESC
LIMIT 8;
The / path has 13 hits with 3 being threats. /etc/passwd and /shell.php are classic system file probe and webshell upload signatures — even a single access requires immediate response.
Notes
BloomFilter Index only applies to newly added data: After the index is created, run BUILD INDEX idx_bf_source_ip ON TABLE doc_raw_logs to rebuild for existing historical data. It automatically takes effect for newly written data.
Inverted Index tokenizer selection: Use english for user_agent (splits by word) and keyword for request_path (treated as a whole). Choosing the wrong tokenizer will cause MATCH_ALL to fail to match expected content.
Do not write REFRESH INTERVAL in Dynamic Table DDL: Refresh scheduling must be managed via Lakehouse Studio Tasks to enable attaching alerts and data quality checks to the same task.
Dynamic Table static partition mode: To partition Silver/Gold layer Dynamic Tables by log_date, you must use PARTITION BY with static partition declaration — dynamic partition inference cannot be used.
Time Travel retention period: Default is 7 days. Historical versions beyond the retention period cannot be queried. For scenarios requiring longer retention for compliance, configure DATA_RETENTION_TIME_IN_DAYS at table creation time.
OSS PIPE EVENT_NOTIFICATION mode: Requires configuring event notification rules in the OSS console, pointing ObjectCreated:* events to the message queue endpoint provided by Lakehouse.
External Function calling threat intelligence APIs: The example classify_ip_risk is a SQL UDF — a simplified logical version. For production scenarios, call real APIs such as AbuseIPDB via External Function. See the related documentation section.
Column Masking: You can set masking policies on PII data such as source_ip.
Related Documentation
CREATE PIPE — OSS PIPE and Kafka PIPE syntax reference
⚠️ Pending manual verification: Column masking currently matches authorized users by username via current_user(), requiring all users with permission to view plaintext to be added one by one to the masking function's allowlist. If your Lakehouse version supports role-based dynamic evaluation (e.g., HAS_ROLE('role_name')), roles can replace the username list for more flexible maintenance. Please contact Singdata technical support to confirm whether the current version supports this function.