Multi-Engine Iceberg Data Lake Federation Query Pipeline Best Practices

Upstream Spark, Flink, or PyIceberg writes data in Iceberg format to OSS/S3/COS. Singdata Lakehouse federates queries directly through an External Catalog (Iceberg REST) without copying data. Dynamic Tables then handle incremental Silver/Gold layer processing and output report metrics. This guide uses 30 simulated orders and 10 product dimension records as the dataset, demonstrating the complete end-to-end setup of this architecture.


Overview

The core challenge with a multi-engine Iceberg data lake is that several write engines each maintain their own Iceberg tables, while the analytics layer must join this data with internal tables and still handle Iceberg DELETE/UPDATE semantics correctly.

Singdata Lakehouse addresses these core problems with the following combination:

Problem | Solution
--- | ---
Iceberg tables written by Spark have DELETE files that PIPE cannot recognize | External Catalog reads snapshots via REST API and correctly applies deletion vectors
Multiple engines write separately, schema may evolve | External Catalog automatically tracks Iceberg schema versions with no manual column mapping required
Large data volumes, no desire to fully copy into Lakehouse | External Catalog zero-copy federation, data files remain in OSS/S3
Analytics layer needs multi-layer Silver/Gold processing | Dynamic Tables use External Catalog tables as upstream and auto-incrementally refresh
Downstream Spark/Trino needs to read Lakehouse internal tables | Lakehouse itself provides a standard Iceberg REST Catalog interface for bidirectional interoperability
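The schema-evolution row relies on a property of the Iceberg format: columns are identified by numeric field IDs recorded in table metadata, not by name or position. The following is a simplified in-memory model (not the real metadata or Parquet layout) that contrasts field-ID resolution with name-based mapping after a column rename; the two schemas and the rename from amount to unit_price are hypothetical.

```python
# Toy model of Iceberg column resolution by field ID (not the real format).
# Schema v1 called field 3 "amount"; schema v2 renamed it to "unit_price"
# and added field 4 "discount_rate".
SCHEMA_V1 = {1: "order_id", 2: "quantity", 3: "amount"}
SCHEMA_V2 = {1: "order_id", 2: "quantity", 3: "unit_price", 4: "discount_rate"}

# A data file written under schema v1: column values keyed by field ID.
old_file = {1: ["ORD001"], 2: [1], 3: [3299.0]}


def read_by_field_id(data_file, current_schema, num_rows):
    """Project a data file onto the current schema using field IDs.

    Columns added after the file was written are filled with None.
    """
    return {
        name: data_file.get(field_id, [None] * num_rows)
        for field_id, name in current_schema.items()
    }


def read_by_name(data_file, file_schema, current_schema, num_rows):
    """Match columns by the names stored in the file, ignoring field IDs."""
    by_name = {file_schema[fid]: values for fid, values in data_file.items()}
    return {
        name: by_name.get(name, [None] * num_rows)
        for name in current_schema.values()
    }


print(read_by_field_id(old_file, SCHEMA_V2, 1))
# {'order_id': ['ORD001'], 'quantity': [1], 'unit_price': [3299.0], 'discount_rate': [None]}
print(read_by_name(old_file, SCHEMA_V1, SCHEMA_V2, 1))
# {'order_id': ['ORD001'], 'quantity': [1], 'unit_price': [None], 'discount_rate': [None]}
```

The name-based reader silently loses unit_price after the rename, which is why a file scanner that ignores Iceberg metadata needs hand-maintained column mappings.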

SQL Commands Used

Command / Feature | Purpose | Notes
--- | --- | ---
CREATE STORAGE CONNECTION | Declare credentials for accessing OSS/S3/COS | Used by External Catalog when reading Parquet data files
CREATE CATALOG CONNECTION TYPE ICEBERG_REST | Connect to the Iceberg REST Catalog service | Stores authentication info (URI, OAuth, etc.)
CREATE EXTERNAL CATALOG | Mount an Iceberg Catalog, mapping to three-level catalog.schema.table naming | Federation query entry point
SELECT catalog.schema.table | Federated query of Iceberg data without landing it | Supports snapshot skipping and delete file merging
CREATE DYNAMIC TABLE | Use External Catalog tables as upstream, define Silver/Gold processing logic | Declarative SQL, system auto-incrementally refreshes
REFRESH DYNAMIC TABLE | Trigger a manual refresh | Used for initial builds or debugging

PIPE vs External Catalog (Iceberg REST): Choosing the Right Approach

Two common approaches to ingesting Iceberg data suit different scenarios:

Dimension | PIPE (LIST_PURGE / EVENT_NOTIFICATION) | External Catalog (Iceberg REST)
--- | --- | ---
File understanding | Scans Parquet files; does not read Iceberg metadata | Loads table metadata and snapshots through the REST API, then reads manifests to locate data and delete files
DELETE/UPDATE handling | Cannot identify delete files; sees only data files | Correctly applies deletion vectors, so results are accurate
Schema evolution | Requires manually maintained column mappings, which is error-prone | Detects column changes automatically by following Iceberg schema versions
Data landing | Data is written to Lakehouse internal tables | Data stays in OSS/S3; zero-copy federation
Applicable scenarios | One-time historical file imports, append-only writes | Iceberg tables shared by multiple engines, workloads with UPDATE/DELETE
Prerequisites | Requires a Volume and a Storage Connection | Requires an Iceberg REST Catalog service
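The DELETE/UPDATE row is the main reason to prefer the External Catalog once data changes. In Iceberg's merge-on-read mode, an engine that deletes or updates a row can leave the original data file untouched and write a separate delete file; a position delete file, for example, lists (data file path, row position) pairs. The toy model below uses hypothetical file paths and an in-memory layout rather than real Parquet files or Iceberg metadata, and shows why reading data files alone returns stale rows. Real tables may also use equality deletes or deletion vectors.

```python
# Toy model of Iceberg merge-on-read deletes (not the real file format).
data_files = {
    "data/00001.parquet": [
        {"order_id": "ORD025", "status": "completed"},
        {"order_id": "ORD026", "status": "completed"},
        {"order_id": "ORD027", "status": "completed"},
    ],
    # An UPDATE wrote the new version of ORD026 into a new data file.
    "data/00002.parquet": [
        {"order_id": "ORD026", "status": "cancelled"},
    ],
}
# The same UPDATE wrote a position delete marking row 1 of the old file.
position_deletes = {("data/00001.parquet", 1)}


def scan_data_files_only(files):
    """Read every data file and ignore delete files."""
    return [row for rows in files.values() for row in rows]


def scan_with_deletes(files, deletes):
    """Skip any row whose (file, position) appears in a position delete file."""
    return [
        row
        for path, rows in files.items()
        for pos, row in enumerate(rows)
        if (path, pos) not in deletes
    ]


naive = scan_data_files_only(data_files)
merged = scan_with_deletes(data_files, position_deletes)
print(len(naive), len(merged))  # 4 3
print([r["status"] for r in naive if r["order_id"] == "ORD026"])   # ['completed', 'cancelled']
print([r["status"] for r in merged if r["order_id"] == "ORD026"])  # ['cancelled']
```

The data-files-only scan sees both versions of ORD026, so a cancelled order would still be counted as revenue.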

Prerequisites

All examples in this guide run under the best_practice_iceberg_fed schema.

CREATE SCHEMA IF NOT EXISTS best_practice_iceberg_fed;


External Catalog Layer: Connecting to an Iceberg REST Catalog

Prerequisites

External Catalog (Iceberg REST) requires the following environment to be set up in advance:

  • Write side: An engine capable of writing Iceberg format (Apache Spark, Flink, PyIceberg)
  • Iceberg REST Catalog service: Choose any of the following:
    • Open-source self-hosted: Apache Polaris, Apache Gravitino, Project Nessie
    • Cloud-managed: Snowflake Open Catalog, AWS Glue (Iceberg REST mode)
  • Object storage: OSS (Alibaba Cloud), S3 (AWS), or COS (Tencent Cloud) to store Parquet data files

Step 1: Create a Storage Connection

A Storage Connection stores the credentials needed to access Parquet data files. The External Catalog uses this connection for authentication when reading data files.

OSS (Alibaba Cloud) example:

CREATE STORAGE CONNECTION IF NOT EXISTS iceberg_oss_conn TYPE OSS ACCESS_ID = '<your-access-key-id>' ACCESS_KEY = '<your-access-key-secret>' ENDPOINT = 'oss-cn-hangzhou-internal.aliyuncs.com';

S3 (AWS) example:

CREATE STORAGE CONNECTION IF NOT EXISTS iceberg_s3_conn TYPE S3 ACCESS_KEY = '<your-access-key-id>' SECRET_KEY = '<your-secret-access-key>' ENDPOINT = 's3.cn-north-1.amazonaws.com.cn' REGION = 'cn-north-1';

Step 2: Create a Catalog Connection (TYPE ICEBERG_REST)

A Catalog Connection stores the API endpoint and authentication credentials for the Iceberg REST Catalog service.

Generic Iceberg REST Catalog (no auth, e.g. Nessie / self-hosted Gravitino):

CREATE CATALOG CONNECTION IF NOT EXISTS iceberg_rest_conn TYPE ICEBERG_REST URI = 'https://your-iceberg-catalog.example.com/api/catalog' ACCESS_REGION = 'cn-hangzhou';

With OAuth authentication (e.g. Apache Polaris / Snowflake Open Catalog):

CREATE CATALOG CONNECTION IF NOT EXISTS polaris_conn
  TYPE ICEBERG_REST
  URI = 'https://<account>.snowflakecomputing.com/polaris/api/catalog'
  ACCESS_REGION = 'ap-southeast-1'
  OAUTH_CLIENT_ID = '<your-client-id>'
  OAUTH_CLIENT_SECRET = '<your-client-secret>'
  OAUTH_SCOPE = 'PRINCIPAL_ROLE:ALL'
  NAMESPACE = '<your_database>'
  WAREHOUSE = '<your_catalog_name>'
  WITH PROPERTIES (
    'client.region' = 'ap-southeast-1',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO'
  );
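The OAUTH_CLIENT_ID, OAUTH_CLIENT_SECRET and OAUTH_SCOPE parameters correspond to an OAuth2 client-credentials grant. As a sketch of that exchange, assuming the catalog exposes the token endpoint defined in the Apache Iceberg REST catalog specification (POST {uri}/v1/oauth/tokens, as Apache Polaris does), the snippet below builds the request with the standard library without sending it. The account name and credentials are placeholders, and how Lakehouse performs the exchange internally is not described in this guide.

```python
from urllib.parse import parse_qs, urlencode
from urllib.request import Request


def build_token_request(catalog_uri, client_id, client_secret, scope):
    """Build (but do not send) a client-credentials token request
    against {catalog_uri}/v1/oauth/tokens."""
    body = urlencode(
        {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
            "scope": scope,
        }
    ).encode("utf-8")
    return Request(
        url=catalog_uri.rstrip("/") + "/v1/oauth/tokens",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


req = build_token_request(
    "https://myaccount.snowflakecomputing.com/polaris/api/catalog",  # placeholder URI
    "my-client-id",         # placeholder for OAUTH_CLIENT_ID
    "my-client-secret",     # placeholder for OAUTH_CLIENT_SECRET
    "PRINCIPAL_ROLE:ALL",   # same value as OAUTH_SCOPE above
)
print(req.get_method(), req.full_url)
# POST https://myaccount.snowflakecomputing.com/polaris/api/catalog/v1/oauth/tokens
print(parse_qs(req.data.decode("utf-8"))["scope"])  # ['PRINCIPAL_ROLE:ALL']
```

Sending this request yourself (for example with urllib.request.urlopen) is a quick way to confirm that the client ID, secret and scope are valid before running the DDL, which fails if the service rejects them.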

Step 3: Create an External Catalog

Create an External Catalog based on the Catalog Connection, mapped to three-level catalog.schema.table naming:

CREATE EXTERNAL CATALOG iceberg_catalog CONNECTION iceberg_rest_conn;

After successful creation, you can view the schemas and tables in the Catalog:

-- View all schemas in the Catalog
SHOW SCHEMAS IN iceberg_catalog;
-- View tables under a specific schema
SHOW TABLES IN iceberg_catalog.ecommerce;
-- Federated query of Iceberg table (zero data copy)
SELECT * FROM iceberg_catalog.ecommerce.orders LIMIT 10;
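In the Apache Iceberg REST catalog specification, these three statements line up with the listNamespaces, listTables and loadTable operations; the loadTable response carries the table metadata, including its snapshots, after which data files are read from object storage. The sketch below only builds the URLs (no network access) to make the catalog.schema.table mapping concrete. The URI and empty prefix are placeholders, and the exact calls the External Catalog issues are an assumption, not something this guide documents.

```python
from urllib.parse import quote

# Placeholders: the URI configured in iceberg_rest_conn, and an optional path
# prefix (a catalog may advertise one through GET {uri}/v1/config).
CATALOG_URI = "https://your-iceberg-catalog.example.com/api/catalog"
PREFIX = ""


def rest_url(*segments):
    """Build {uri}/v1[/{prefix}]/<segments>, URL-encoding each segment."""
    parts = ["v1"] + ([PREFIX] if PREFIX else [])
    parts += [quote(segment, safe="") for segment in segments]
    return CATALOG_URI.rstrip("/") + "/" + "/".join(parts)


def show_schemas():
    """SHOW SCHEMAS IN iceberg_catalog -> listNamespaces."""
    return "GET", rest_url("namespaces")


def show_tables(schema):
    """SHOW TABLES IN iceberg_catalog.<schema> -> listTables."""
    return "GET", rest_url("namespaces", schema, "tables")


def load_table(qualified_name):
    """SELECT ... FROM <catalog>.<schema>.<table> -> loadTable.

    The catalog part selects which External Catalog (and connection) to use;
    only the schema (namespace) and table appear in the REST path.
    """
    _catalog, schema, table = qualified_name.split(".")
    return "GET", rest_url("namespaces", schema, "tables", table)


for method, url in (show_schemas(), show_tables("ecommerce"),
                    load_table("iceberg_catalog.ecommerce.orders")):
    print(method, url)
```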


Simulated Data Layer: Local Tables as Iceberg External Table Substitutes

If no Iceberg REST Catalog environment is available, you can use Lakehouse internal tables to stand in for the Iceberg tables and still verify the downstream Dynamic Table processing logic.

Create the Product Dimension Table

CREATE TABLE IF NOT EXISTS best_practice_iceberg_fed.doc_products_local (
  product_id STRING,
  product_name STRING,
  category STRING,
  brand STRING,
  cost_price DOUBLE,
  list_price DOUBLE
);

Import from a local CSV file (recommended):

-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_products_local.csv' TO USER VOLUME FILE 'doc_products_local.csv';

-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_iceberg_fed.doc_products_local
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_products_local.csv');
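The guide does not ship doc_products_local.csv. As a convenience, the hypothetical helper below produces one with the same 10 rows as the inline INSERT statement in this section, a header row and comma separators to match OPTIONS('header'='true', 'sep'=','). It assumes COPY INTO matches the CSV columns to the table in DDL order; adjust the column list if your layout differs.

```python
import csv
import io

# Column order follows the doc_products_local DDL.
COLUMNS = ["product_id", "product_name", "category", "brand", "cost_price", "list_price"]

# The same 10 rows as the inline INSERT statement in this section.
PRODUCTS = [
    ("P001", "Smartphone X1", "Electronics", "TechBrand", 1800.0, 3299.0),
    ("P002", "Laptop Pro 14", "Electronics", "TechBrand", 4200.0, 7999.0),
    ("P003", "Wireless Earbuds", "Electronics", "SoundMax", 180.0, 499.0),
    ("P004", "Cotton T-Shirt", "Apparel", "FashionCo", 30.0, 129.0),
    ("P005", "Running Shoes", "Apparel", "SportMax", 220.0, 699.0),
    ("P006", "Coffee Maker", "Kitchen", "HomeChef", 350.0, 899.0),
    ("P007", "Yoga Mat", "Sports", "FitLife", 45.0, 199.0),
    ("P008", "Backpack 30L", "Accessories", "TravelPro", 80.0, 299.0),
    ("P009", "LED Desk Lamp", "Furniture", "LightUp", 55.0, 199.0),
    ("P010", "Protein Powder 1kg", "Health", "NutriPlus", 120.0, 349.0),
]


def products_csv_text():
    """Render the header plus rows as CSV text with '\n' line endings."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(COLUMNS)
    writer.writerows(PRODUCTS)
    return buf.getvalue()


def write_products_csv(path="doc_products_local.csv"):
    """Write the CSV to `path` and return the path for use with PUT."""
    with open(path, "w", encoding="utf-8", newline="") as f:
        f.write(products_csv_text())
    return path
```

Calling write_products_csv() creates doc_products_local.csv in the current directory; that file is what Step 1 uploads with PUT.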

You can also directly insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_iceberg_fed.doc_products_local VALUES
  ('P001','Smartphone X1','Electronics','TechBrand',1800.0,3299.0),
  ('P002','Laptop Pro 14','Electronics','TechBrand',4200.0,7999.0),
  ('P003','Wireless Earbuds','Electronics','SoundMax',180.0,499.0),
  ('P004','Cotton T-Shirt','Apparel','FashionCo',30.0,129.0),
  ('P005','Running Shoes','Apparel','SportMax',220.0,699.0),
  ('P006','Coffee Maker','Kitchen','HomeChef',350.0,899.0),
  ('P007','Yoga Mat','Sports','FitLife',45.0,199.0),
  ('P008','Backpack 30L','Accessories','TravelPro',80.0,299.0),
  ('P009','LED Desk Lamp','Furniture','LightUp',55.0,199.0),
  ('P010','Protein Powder 1kg','Health','NutriPlus',120.0,349.0);

Create the Order Fact Table (Simulating Iceberg External Table Reads)

CREATE TABLE IF NOT EXISTS best_practice_iceberg_fed.doc_orders_local (
  order_id STRING,
  customer_id STRING,
  product_id STRING,
  region STRING,
  order_date DATE,
  quantity INT,
  unit_price DOUBLE,
  discount_rate DOUBLE,
  status STRING,
  ingest_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

Import from a local CSV file (recommended):

-- Step 1: Upload the local CSV file to User Volume via SQL PUT
PUT '/path/to/doc_orders_local.csv' TO USER VOLUME FILE 'doc_orders_local.csv';

-- Step 2: COPY INTO the table from User Volume
COPY INTO best_practice_iceberg_fed.doc_orders_local
FROM USER VOLUME
USING csv
OPTIONS('header'='true', 'sep'=',', 'nullValue'='')
FILES ('doc_orders_local.csv');

You can also directly insert a small batch of test data inline (no CSV file needed):

INSERT INTO best_practice_iceberg_fed.doc_orders_local
  (order_id, customer_id, product_id, region, order_date, quantity, unit_price, discount_rate, status)
VALUES
  ('ORD001','C101','P001','East',DATE '2024-01-05',1,3299.0,0.0,'completed'),
  ('ORD002','C102','P002','West',DATE '2024-01-06',1,7999.0,0.05,'completed'),
  ('ORD003','C103','P003','East',DATE '2024-01-07',2,499.0,0.0,'completed'),
  ('ORD004','C104','P004','South',DATE '2024-01-08',3,129.0,0.1,'completed'),
  ('ORD005','C105','P005','North',DATE '2024-01-09',1,699.0,0.0,'completed'),
  ('ORD006','C101','P006','East',DATE '2024-01-10',1,899.0,0.0,'completed'),
  ('ORD007','C106','P007','West',DATE '2024-01-11',2,199.0,0.0,'completed'),
  ('ORD008','C107','P008','South',DATE '2024-01-12',1,299.0,0.0,'completed'),
  ('ORD009','C108','P001','North',DATE '2024-01-13',2,3299.0,0.1,'completed'),
  ('ORD010','C109','P009','East',DATE '2024-01-14',1,199.0,0.0,'completed'),
  ('ORD011','C110','P010','West',DATE '2024-01-15',2,349.0,0.0,'completed'),
  ('ORD012','C102','P003','East',DATE '2024-01-16',1,499.0,0.0,'completed'),
  ('ORD013','C111','P002','North',DATE '2024-01-17',1,7999.0,0.0,'completed'),
  ('ORD014','C112','P004','South',DATE '2024-01-18',5,129.0,0.15,'completed'),
  ('ORD015','C103','P005','East',DATE '2024-01-19',1,699.0,0.0,'completed'),
  ('ORD016','C113','P006','West',DATE '2024-01-20',1,899.0,0.05,'completed'),
  ('ORD017','C114','P007','North',DATE '2024-01-21',3,199.0,0.0,'completed'),
  ('ORD018','C115','P008','East',DATE '2024-01-22',2,299.0,0.0,'completed'),
  ('ORD019','C116','P001','South',DATE '2024-01-23',1,3299.0,0.0,'completed'),
  ('ORD020','C117','P009','West',DATE '2024-01-24',2,199.0,0.0,'completed'),
  ('ORD021','C118','P010','North',DATE '2024-01-25',1,349.0,0.0,'completed'),
  ('ORD022','C101','P002','East',DATE '2024-02-01',1,7999.0,0.0,'completed'),
  ('ORD023','C119','P003','West',DATE '2024-02-02',3,499.0,0.1,'completed'),
  ('ORD024','C120','P004','South',DATE '2024-02-03',2,129.0,0.0,'completed'),
  ('ORD025','C104','P005','East',DATE '2024-02-04',1,699.0,0.0,'completed'),
  ('ORD026','C121','P001','North',DATE '2024-02-05',1,3299.0,0.0,'cancelled'),
  ('ORD027','C122','P006','West',DATE '2024-02-06',1,899.0,0.0,'completed'),
  ('ORD028','C123','P007','East',DATE '2024-02-07',2,199.0,0.0,'completed'),
  ('ORD029','C124','P010','South',DATE '2024-02-08',3,349.0,0.0,'completed'),
  ('ORD030','C125','P002','North',DATE '2024-02-09',1,7999.0,0.05,'completed');

Verify data was written:

SELECT COUNT(*) AS order_count FROM best_practice_iceberg_fed.doc_orders_local;

Result:

order_count
30

Silver Layer: Dynamic Table Cleansing + Dimension Join

The Silver layer joins the order table (corresponding to the Iceberg external table) with the product dimension table, filters out cancelled orders, and calculates actual revenue and gross profit.

In a real Iceberg federation environment, replace best_practice_iceberg_fed.doc_orders_local with iceberg_catalog.ecommerce.orders; the rest of the DDL stays the same.

CREATE DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders AS
SELECT
  o.order_id,
  o.customer_id,
  o.product_id,
  p.product_name,
  p.category,
  p.brand,
  o.region,
  o.order_date,
  YEAR(o.order_date) AS order_year,
  MONTH(o.order_date) AS order_month,
  o.quantity,
  o.unit_price,
  o.discount_rate,
  ROUND(o.quantity * o.unit_price * (1 - o.discount_rate), 2) AS net_revenue,
  ROUND(o.quantity * (o.unit_price - p.cost_price) * (1 - o.discount_rate), 2) AS gross_profit,
  o.status,
  o.ingest_ts
FROM best_practice_iceberg_fed.doc_orders_local o
LEFT JOIN best_practice_iceberg_fed.doc_products_local p
  ON o.product_id = p.product_id
WHERE o.status = 'completed';

Trigger the first manual refresh and then query the Silver layer results:

REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders;

SELECT order_id, customer_id, product_name, category, region, order_date, net_revenue, gross_profit
FROM best_practice_iceberg_fed.dt_silver_orders
ORDER BY order_date
LIMIT 10;

Result:

order_id | customer_id | product_name | category | region | order_date | net_revenue | gross_profit
--- | --- | --- | --- | --- | --- | --- | ---
ORD001 | C101 | Smartphone X1 | Electronics | East | 2024-01-05 | 3299.0 | 1499.0
ORD002 | C102 | Laptop Pro 14 | Electronics | West | 2024-01-06 | 7599.05 | 3609.05
ORD003 | C103 | Wireless Earbuds | Electronics | East | 2024-01-07 | 998.0 | 638.0
ORD004 | C104 | Cotton T-Shirt | Apparel | South | 2024-01-08 | 348.3 | 267.3
ORD005 | C105 | Running Shoes | Apparel | North | 2024-01-09 | 699.0 | 479.0
ORD006 | C101 | Coffee Maker | Kitchen | East | 2024-01-10 | 899.0 | 549.0
ORD007 | C106 | Yoga Mat | Sports | West | 2024-01-11 | 398.0 | 308.0
ORD008 | C107 | Backpack 30L | Accessories | South | 2024-01-12 | 299.0 | 219.0
ORD009 | C108 | Smartphone X1 | Electronics | North | 2024-01-13 | 5938.2 | 2698.2
ORD010 | C109 | LED Desk Lamp | Furniture | East | 2024-01-14 | 199.0 | 144.0

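The Silver layer filters out ORD026 (status = 'cancelled', 3,299 yuan), leaving 29 completed orders. net_revenue is the discounted order amount, quantity * unit_price * (1 - discount_rate). gross_profit applies the same discount factor to the per-unit margin, quantity * (unit_price - cost_price) * (1 - discount_rate), so the cost is discounted as well; if discounts should reduce only the selling price, use net_revenue - quantity * cost_price instead (the results in this guide use the original formula).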
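To double-check the Silver arithmetic, the sketch below runs the same net_revenue and gross_profit expressions on three sample orders plus the cancelled ORD026, using Python's built-in sqlite3 module as a stand-in engine. SQLite is not Lakehouse, and the trimmed-down table layout is an assumption made to keep the example short. It also computes the variant in which the discount lowers only the selling price.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE products (product_id TEXT, cost_price REAL);
    CREATE TABLE orders (order_id TEXT, product_id TEXT, quantity INTEGER,
                         unit_price REAL, discount_rate REAL, status TEXT);
""")
conn.executemany("INSERT INTO products VALUES (?, ?)",
                 [("P001", 1800.0), ("P002", 4200.0), ("P004", 30.0)])
conn.executemany("INSERT INTO orders VALUES (?, ?, ?, ?, ?, ?)", [
    ("ORD002", "P002", 1, 7999.0, 0.05, "completed"),
    ("ORD004", "P004", 3, 129.0, 0.1, "completed"),
    ("ORD009", "P001", 2, 3299.0, 0.1, "completed"),
    ("ORD026", "P001", 1, 3299.0, 0.0, "cancelled"),  # removed by the WHERE clause
])

rows = conn.execute("""
    SELECT o.order_id,
           ROUND(o.quantity * o.unit_price * (1 - o.discount_rate), 2) AS net_revenue,
           -- Formula used by dt_silver_orders: discount applied to the unit margin
           ROUND(o.quantity * (o.unit_price - p.cost_price) * (1 - o.discount_rate), 2) AS gross_profit,
           -- Variant: discount lowers the price only; cost is not discounted
           ROUND(o.quantity * o.unit_price * (1 - o.discount_rate)
                 - o.quantity * p.cost_price, 2) AS gross_profit_alt
    FROM orders o
    LEFT JOIN products p ON o.product_id = p.product_id
    WHERE o.status = 'completed'
    ORDER BY o.order_id
""").fetchall()

for order_id, net, gp, gp_alt in rows:
    print(f"{order_id} net_revenue={net:.2f} gross_profit={gp:.2f} alt={gp_alt:.2f}")
# ORD002 net_revenue=7599.05 gross_profit=3609.05 alt=3399.05
# ORD004 net_revenue=348.30 gross_profit=267.30 alt=258.30
# ORD009 net_revenue=5938.20 gross_profit=2698.20 alt=2338.20
```

The first two columns match the Silver result table; the alt column shows how much the profit figures move when cost is not discounted.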


Gold Layer: Dynamic Table Aggregated Metrics

The Gold layer aggregates on top of Silver by region, category, and year-month, outputting order count, total revenue, gross profit, and profit margin per dimension for direct BI tool queries.

CREATE DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics AS
SELECT
  region,
  category,
  order_year,
  order_month,
  COUNT(order_id) AS order_count,
  SUM(quantity) AS total_qty,
  ROUND(SUM(net_revenue), 2) AS total_revenue,
  ROUND(SUM(gross_profit), 2) AS total_profit,
  ROUND(SUM(gross_profit) / NULLIF(SUM(net_revenue), 0) * 100, 2) AS profit_margin_pct
FROM best_practice_iceberg_fed.dt_silver_orders
GROUP BY region, category, order_year, order_month;

REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics;

SELECT region, category, order_year, order_month, order_count, total_revenue, total_profit, profit_margin_pct
FROM best_practice_iceberg_fed.dt_gold_regional_metrics
ORDER BY total_revenue DESC
LIMIT 10;

Result:

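region | category | order_year | order_month | order_count | total_revenue | total_profit | profit_margin_pct
--- | --- | --- | --- | --- | --- | --- | ---
North | Electronics | 2024 | 1 | 2 | 13937.2 | 6497.2 | 46.62
East | Electronics | 2024 | 2 | 1 | 7999.0 | 3799.0 | 47.49
West | Electronics | 2024 | 1 | 1 | 7599.05 | 3609.05 | 47.49
North | Electronics | 2024 | 2 | 1 | 7599.05 | 3609.05 | 47.49
East | Electronics | 2024 | 1 | 3 | 4796.0 | 2456.0 | 51.21
South | Electronics | 2024 | 1 | 1 | 3299.0 | 1499.0 | 45.44
West | Electronics | 2024 | 2 | 1 | 1347.3 | 861.3 | 63.93
South | Health | 2024 | 2 | 1 | 1047.0 | 687.0 | 65.62
East | Kitchen | 2024 | 1 | 1 | 899.0 | 549.0 | 61.07
West | Kitchen | 2024 | 2 | 1 | 899.0 | 549.0 | 61.07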

Among the top 10 rows by revenue, Electronics dominates (North, January: 13,937.2 yuan), while the Health and Kitchen rows show profit margins above 60%, compared with roughly 45% to 51% for most Electronics rows. BI tools can connect directly to this table to power regional sales dashboards.
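The Gold aggregation can be checked the same way. The sketch below feeds the Silver-layer values of the five January Electronics orders in North and East into sqlite3 (a stand-in engine, not Lakehouse) and runs the dt_gold_regional_metrics aggregation on them; the single-table layout is an assumption for brevity.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE silver (order_id TEXT, region TEXT, category TEXT,
                         order_year INTEGER, order_month INTEGER,
                         quantity INTEGER, net_revenue REAL, gross_profit REAL)
""")
# Silver-layer values for the January Electronics orders in North and East.
conn.executemany("INSERT INTO silver VALUES (?, ?, ?, ?, ?, ?, ?, ?)", [
    ("ORD009", "North", "Electronics", 2024, 1, 2, 5938.2, 2698.2),
    ("ORD013", "North", "Electronics", 2024, 1, 1, 7999.0, 3799.0),
    ("ORD001", "East", "Electronics", 2024, 1, 1, 3299.0, 1499.0),
    ("ORD003", "East", "Electronics", 2024, 1, 2, 998.0, 638.0),
    ("ORD012", "East", "Electronics", 2024, 1, 1, 499.0, 319.0),
])

# Same aggregation as dt_gold_regional_metrics; NULLIF avoids division by zero.
gold = conn.execute("""
    SELECT region, category, order_year, order_month,
           COUNT(order_id) AS order_count,
           SUM(quantity) AS total_qty,
           ROUND(SUM(net_revenue), 2) AS total_revenue,
           ROUND(SUM(gross_profit), 2) AS total_profit,
           ROUND(SUM(gross_profit) / NULLIF(SUM(net_revenue), 0) * 100, 2) AS profit_margin_pct
    FROM silver
    GROUP BY region, category, order_year, order_month
    ORDER BY total_revenue DESC
""").fetchall()

for region, category, year, month, orders, qty, revenue, profit, margin in gold:
    print(f"{region} {category} {year}-{month:02d}: orders={orders} qty={qty} "
          f"revenue={revenue:.2f} profit={profit:.2f} margin={margin:.2f}%")
# North Electronics 2024-01: orders=2 qty=3 revenue=13937.20 profit=6497.20 margin=46.62%
# East Electronics 2024-01: orders=3 qty=4 revenue=4796.00 profit=2456.00 margin=51.21%
```

Both rows agree with the corresponding rows of the Gold result table.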


Configure Refresh Scheduling

Periodic refresh of the Dynamic Tables is scheduled through Lakehouse Studio tasks rather than declared in the DDL. This lets you attach monitoring alerts and data quality check rules to the same task, giving you a single place to operate the pipeline.

The following tasks have been created under the best_practices/iceberg_fed/ path:

Task Name | SQL Content | Schedule
--- | --- | ---
refresh_dt_silver_orders | REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders | Every hour (0 0/1 * * ?)
refresh_dt_gold_metrics | REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics | Every hour (0 0/1 * * ?)

cz-cli workflow for creating tasks:

# 1. Create a subdirectory under best_practices
cz-cli task create-folder iceberg_fed -p skill_test --parent 186117
# 2. Create the Silver refresh task (use the folder ID from the previous step)
cz-cli task create refresh_dt_silver_orders -p skill_test --type SQL --folder <folder_id>
cz-cli task save-content refresh_dt_silver_orders -p skill_test \
  --content "REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_silver_orders"
cz-cli task save-cron refresh_dt_silver_orders -p skill_test --cron "0 0/1 * * ?"
# 3. Create the Gold refresh task
cz-cli task create refresh_dt_gold_metrics -p skill_test --type SQL --folder <folder_id>
cz-cli task save-content refresh_dt_gold_metrics -p skill_test \
  --content "REFRESH DYNAMIC TABLE best_practice_iceberg_fed.dt_gold_regional_metrics"
cz-cli task save-cron refresh_dt_gold_metrics -p skill_test --cron "0 0/1 * * ?"
# 4. Deploy to production
cz-cli task deploy refresh_dt_silver_orders -p skill_test
cz-cli task deploy refresh_dt_gold_metrics -p skill_test
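Both tasks use the expression 0 0/1 * * ?. Assuming Studio reads it as five fields (minute, hour, day of month, month, day of week, with ? meaning "no specific value" as in Quartz), which matches the "Every hour" label, it fires at minute 0 of every hour. The hypothetical helper below, which is not part of cz-cli or Studio, expands the minute and hour fields and lists the next few fire times under that assumption.

```python
from datetime import datetime, timedelta


def expand_field(field, lo, hi):
    """Expand one cron field into sorted values.

    Supports *, ?, n, a-b, a/step, a-b/step and comma-separated lists.
    """
    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base in ("*", "?"):
            start, end = lo, hi
        elif "-" in base:
            start, end = (int(x) for x in base.split("-"))
        else:
            start = int(base)
            end = hi if step_text else start  # "a/step" runs from a to the maximum
        values.update(range(start, end + 1, step))
    return sorted(values)


def next_runs(expr, after, count):
    """Next `count` fire times strictly after `after` for a five-field
    expression (minute hour day-of-month month day-of-week)."""
    minute_f, hour_f, dom_f, month_f, dow_f = expr.split()
    if any(f not in ("*", "?") for f in (dom_f, month_f, dow_f)):
        raise ValueError("this sketch only handles * or ? in the day/month fields")
    minutes = set(expand_field(minute_f, 0, 59))
    hours = set(expand_field(hour_f, 0, 23))
    t = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    runs = []
    while len(runs) < count:  # step minute by minute; fine for a sketch
        if t.hour in hours and t.minute in minutes:
            runs.append(t)
        t += timedelta(minutes=1)
    return runs


for run in next_runs("0 0/1 * * ?", datetime(2024, 1, 5, 10, 15), 3):
    print(run)
# 2024-01-05 11:00:00
# 2024-01-05 12:00:00
# 2024-01-05 13:00:00
```

Because both tasks fire at the same minute, the Gold refresh can start before the Silver refresh finishes. Under the same five-field assumption, one option is to offset the Gold task, for example 10 0/1 * * ?, which next_runs places at 10 minutes past each hour.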


Bidirectional Interoperability: Lakehouse as an Iceberg REST Provider

In addition to reading external Iceberg tables, Singdata Lakehouse exposes its own standard Iceberg REST Catalog interface. External engines such as Spark and Trino can use it to read Lakehouse internal tables in the reverse direction, enabling bidirectional data sharing:

  • Direction 1 (main flow of this guide): External Spark writes Iceberg → Lakehouse External Catalog federated read
  • Direction 2 (reverse): Lakehouse internal tables → expose Iceberg REST API externally → external Spark/Trino reads

For how to expose Lakehouse tables through the Iceberg REST API, see Accessing Lakehouse from Spark via Iceberg REST Catalog.
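For orientation, the Apache Iceberg Spark runtime configures a REST catalog through spark.sql.catalog.<name> properties. The sketch below assembles those properties as spark-submit flags; the catalog name, URI and warehouse are placeholders, and the actual endpoint, warehouse value and credential format for Lakehouse should be taken from the documentation referenced above.

```python
def spark_iceberg_rest_conf(catalog_name, uri, warehouse=None, credential=None):
    """Spark properties for an Iceberg REST catalog (Apache Iceberg Spark runtime).

    All argument values are placeholders supplied by the caller.
    """
    prefix = f"spark.sql.catalog.{catalog_name}"
    conf = {
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": uri,
    }
    if warehouse:
        conf[f"{prefix}.warehouse"] = warehouse
    if credential:
        # OAuth2 client credentials in "<client_id>:<client_secret>" form.
        conf[f"{prefix}.credential"] = credential
    return conf


conf = spark_iceberg_rest_conf(
    "lakehouse",                                       # hypothetical catalog name
    uri="https://<lakehouse-iceberg-rest-endpoint>",   # placeholder
    warehouse="<warehouse>",                           # placeholder
)
for key, value in conf.items():
    print(f"--conf {key}={value}")
```

With these properties set, a Lakehouse table would be addressed from Spark SQL as lakehouse.<schema>.<table>, mirroring the three-level naming used on the Lakehouse side.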


Notes

  • External Catalog and Catalog Connection creation validates REST API reachability; if the Iceberg REST Catalog service is not running or the network is unavailable, DDL will fail
  • Currently, only the instance_admin role can query External Catalogs; after writing results to internal tables via Dynamic Tables, downstream access can be controlled with normal table permissions
  • When a Dynamic Table references an External Catalog table, use three-level naming in the DDL (catalog.schema.table); use two-level naming for internal tables (schema.table)
  • REFRESH DYNAMIC TABLE on a Dynamic Table that references an External Catalog table triggers a complete Iceberg snapshot read; if the upstream Iceberg table changes frequently, the scheduling interval should be no shorter than 5 minutes to avoid excessive REST API calls
  • In CREATE CATALOG CONNECTION ... TYPE ICEBERG_REST, do not put = after TYPE and do not separate top-level parameters with commas (commas belong only inside WITH PROPERTIES (...)); both are common syntax errors
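The last note can be turned into a quick pre-flight check. The helper below is a hypothetical heuristic, not a Lakehouse or cz-cli feature, and it does not validate the full grammar: it flags an = directly after TYPE and any comma that sits outside quotes and outside parentheses, so commas inside WITH PROPERTIES (...) remain allowed.

```python
import re


def check_catalog_connection_ddl(sql):
    """Return a list of problems found in a CREATE CATALOG CONNECTION statement."""
    problems = []
    if re.search(r"\bTYPE\s*=", sql, flags=re.IGNORECASE):
        problems.append("remove '=' after TYPE")
    depth = 0
    in_quote = False
    for ch in sql:
        if ch == "'":
            in_quote = not in_quote      # '' escapes toggle twice, so they cancel
        elif not in_quote:
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
            elif ch == "," and depth == 0:
                problems.append("remove commas between top-level parameters")
                break
    return problems


bad = "CREATE CATALOG CONNECTION c TYPE = ICEBERG_REST, URI = 'https://example.com';"
print(check_catalog_connection_ddl(bad))
# ["remove '=' after TYPE", 'remove commas between top-level parameters']
```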