Ingesting Data from Alibaba Cloud Data Lake into Singdata Lakehouse's Three-Layer Data Warehouse
About the Three-Layer Data Warehouse
In modern data lakehouse architectures, a three-layer data warehouse is typically divided into a Bronze layer, a Silver layer, and a Gold layer. This architecture provides a systematic way to manage data at each stage of refinement, from raw data to high-quality, analysis-ready data.
1. Bronze Layer (Raw Data Layer)
The Bronze layer is the bottom layer of the data warehouse, used to store raw data extracted from various data sources. This data is unprocessed and retains its original form.
Characteristics:
- Data state: Raw, unprocessed data.
- Data sources: Various data sources (databases, logs, files, etc.).
- Purpose: Provide raw data backup and data lineage tracing, ensuring data integrity and auditability.
2. Silver Layer (Cleaning and Transformation Layer)
The Silver layer stores cleaned and transformed data. Data in this layer has gone through ETL (Extract, Transform, Load) processing, which removes noise and redundancy and converts the data into structured, standardized formats.
Characteristics:
- Data state: Cleaned and standardized data.
- Data operations: Cleaning, deduplication, data transformation, and integration.
- Purpose: Provide high-quality, structured data for further processing and analysis.
3. Gold Layer (Business Information Layer)
The Gold layer is the key data layer for analytics and business use. It stores data that has been further optimized and aggregated, and typically supports business intelligence (BI), data analytics, and reporting applications.
Characteristics:
- Data state: High-quality, aggregated, and optimized data.
- Data operations: Data aggregation, multi-dimensional analysis, data modeling.
- Purpose: Support data analysis, business intelligence, and decision-making, providing optimized data views.
Advantages of the Three-Layer Data Warehouse Architecture
- Data management efficiency: Layered storage and processing make data easier to manage and maintain.
- Data quality improvement: The cleaning and transformation (Silver) layer ensures data consistency and accuracy.
- Efficient data access: Optimized data structures in the business information (Gold) layer improve query performance.
- Flexibility: The architecture adapts to different business requirements and supports integrating and processing many kinds of data sources.
Through this three-layer architecture, enterprises can more effectively manage and analyze data, ensuring proper processing and optimization at every stage from data collection to analysis.
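As a rough illustration of how records move through the three layers, the following Python sketch uses plain lists of dictionaries. It is not Singdata Lakehouse code: the sample rows, the `to_silver` and `to_gold` helpers, and the single cleaning rule are hypothetical and only mirror the roles described above (Bronze keeps raw rows untouched, Silver cleans them, Gold aggregates them).

```python
from collections import defaultdict

# Bronze: raw records kept exactly as ingested (hypothetical sample rows)
bronze = [
    {"order_id": "a1", "region": "EU", "amount": "120.5"},
    {"order_id": "a2", "region": "EU", "amount": "-3"},    # invalid amount
    {"order_id": None, "region": "ASIA", "amount": "40"},  # missing key
    {"order_id": "a4", "region": "ASIA", "amount": "60"},
]


def to_silver(rows):
    """Clean and standardize: drop rows without a key or with a non-positive amount."""
    cleaned = []
    for row in rows:
        if row["order_id"] is None:
            continue
        amount = float(row["amount"])  # standardize the type
        if amount <= 0:
            continue
        cleaned.append({"order_id": row["order_id"], "region": row["region"], "amount": amount})
    return cleaned


def to_gold(rows):
    """Aggregate cleaned rows into a business-level view: total sales per region."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["region"]] += row["amount"]
    return dict(totals)


silver = to_silver(bronze)
gold = to_gold(silver)
print(gold)  # {'EU': 120.5, 'ASIA': 60.0}
```

The Bronze list is never modified, so the raw input stays available for auditing and reprocessing while the Silver and Gold results are derived from it.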
Prerequisites
Implementation Solution Based on Singdata Lakehouse
This solution builds a multi-layer data warehouse on Singdata Lakehouse with three layers: a Bronze layer for data ingestion, a Silver layer for cleaning and transforming data, and a Gold layer for business-level aggregation and data modeling.

Bronze Layer
The Bronze layer ingests data from Alibaba Cloud Object Storage Service (OSS) into Singdata Lakehouse using a data lake Connection, External Volumes, Lakehouse Pipes, and Table Streams. Each External Volume specifies where the data is located in Alibaba Cloud OSS, and a Pipe defined on each Volume automatically and continuously loads new files from it into a Lakehouse table.
Finally, a Table Stream is created on each table to track the rows added to it. The Silver layer uses these streams to identify changes in the Bronze layer tables and apply them to the corresponding Silver layer tables.
Silver Layer
The Silver layer focuses on data cleaning and transformation. It takes the raw data from the Bronze layer and transforms it to meet business requirements. These transformations include handling missing or out-of-range values, validating data, and discarding unused or low-value records.
Customer data:
| Transformation | Details |
|---|---|
| Email validation | Ensure email is not empty; rows with a missing email (or customer ID) are discarded |
| Customer type | Standardize customer type to "Regular", "Premium", or "Unknown" |
| Age validation | Ensure age is between 18 and 120; otherwise set it to NULL |
| Gender standardization | Classify gender as "Male", "Female", or "Other" |
| Total purchases validation | Ensure total purchases is a non-negative number; default to 0 if invalid |

Product data:
| Transformation | Details |
|---|---|
| Price validation | Ensure price is non-negative; negative prices are set to 0 |
| Stock quantity validation | Ensure stock quantity is non-negative; negative values are set to 0 |
| Rating validation | Ensure rating is between 0 and 5; out-of-range values are set to 0 |

Order data:
| Transformation | Details |
|---|---|
| Amount validation | Ensure transaction amount is greater than 0; other rows are discarded |
| Transaction ID validation | Ensure transaction ID is not empty; rows without one are discarded |
Gold Layer
The Gold layer uses the transformed data from the Silver layer to create dynamic tables for business analysis. For example, DT_RegionAnalysis is a unified data view that combines all three tables to analyze sales performance across regions and identify the best-performing ones.
Beyond the simple dynamic tables demonstrated in this project, many additional analyses can be performed at the Gold layer.
Data Flow

Implementation Steps Based on Singdata Lakehouse
In Lakehouse Studio, navigate to Development -> Tasks.

Click "+" to create the following directory:
- 01_QuickStarts_Data_from_Alicloud_Datalake_to_3Layer_Clickzetta_Data_Warehouse
Click "+" to create the following SQL tasks, and click Run after creation:

Build the Lakehouse Environment
Create SQL task: 01_Env_Setup
CREATE VCLUSTER IF NOT EXISTS Three_Layer_DWH_VC
VCLUSTER_SIZE = XSMALL
VCLUSTER_TYPE = GENERAL
AUTO_SUSPEND_IN_SECOND = 60
AUTO_RESUME = TRUE
COMMENT 'virtual cluster for Three_Layer_DWH';
-- Use our VCLUSTER
USE VCLUSTER Three_Layer_DWH_VC;
-- Create and Use SCHEMA
CREATE SCHEMA IF NOT EXISTS Three_Layer_DWH_SCH;
USE SCHEMA Three_Layer_DWH_SCH;
Develop the Bronze Layer
Create directory: 01_Bronze_Layer

Create Data Lake Connection
Create SQL task: 00_DataLake_Connections
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Create Storage Connection to the data lake
CREATE STORAGE CONNECTION if not exists hz_ingestion_demo
TYPE oss
ENDPOINT = 'oss-cn-hangzhou-internal.aliyuncs.com'
access_id = 'Please enter your access_id'
access_key = 'Please enter your access_key'
comments = 'hangzhou oss private endpoint for ingest demo';
Create Data Lake Volumes
Create data lake Volumes, each corresponding to the storage location of customer, product, and order data files.
Create SQL task: VOLUME_FOR_RAW_CUSTOMER
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Create Volume, the location of data lake storage files
CREATE EXTERNAL VOLUME if not exists VOLUME_FOR_RAW_CUSTOMER
LOCATION 'oss://yourbucketname/VOLUME_FOR_RAW_CUSTOMER'
USING connection hz_ingestion_demo -- storage Connection
DIRECTORY = (
enable = TRUE
)
recursive = TRUE;
-- Sync the data lake Volume directory to Lakehouse
ALTER volume VOLUME_FOR_RAW_CUSTOMER refresh;
-- View files on the Singdata Lakehouse data lake Volume
SELECT * from directory(volume VOLUME_FOR_RAW_CUSTOMER);
Create SQL task: VOLUME_FOR_RAW_ORDER
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Create Volume, the location of data lake storage files
CREATE EXTERNAL VOLUME if not exists VOLUME_FOR_RAW_ORDER
LOCATION 'oss://yourbucketname/VOLUME_FOR_RAW_ORDER'
USING connection hz_ingestion_demo -- storage Connection
DIRECTORY = (
enable = TRUE
)
recursive = TRUE;
-- Sync the data lake Volume directory to Lakehouse
ALTER volume VOLUME_FOR_RAW_ORDER refresh;
-- View files on the Singdata Lakehouse data lake Volume
SELECT * from directory(volume VOLUME_FOR_RAW_ORDER);
Create SQL task: VOLUME_FOR_RAW_PRODUCT
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Create Volume, the location of data lake storage files
CREATE EXTERNAL VOLUME if not exists VOLUME_FOR_RAW_PRODUCT
LOCATION 'oss://yourbucketname/VOLUME_FOR_RAW_PRODUCT'
USING connection hz_ingestion_demo -- storage Connection
DIRECTORY = (
enable = TRUE
)
recursive = TRUE;
-- Sync the data lake Volume directory to Lakehouse
ALTER volume VOLUME_FOR_RAW_PRODUCT refresh;
-- View files on the Singdata Lakehouse data lake Volume
SELECT * from directory(volume VOLUME_FOR_RAW_PRODUCT);
Create Tables
Create tables to store the raw customer, product, and order data.
Create SQL task: RAW_CUSTOMER
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Create the table to store customer data
CREATE TABLE IF NOT EXISTS raw_customer (
customer_id INT,
name STRING,
email STRING,
country STRING,
customer_type STRING,
registration_date STRING,
age INT,
gender STRING,
total_purchases INT,
ingestion_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
Create SQL task: RAW_ORDER
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Create table to store order data
CREATE TABLE IF NOT EXISTS raw_order (
customer_id INT,
payment_method STRING,
product_id INT,
quantity INT,
store_type STRING,
total_amount DOUBLE,
transaction_date DATE,
transaction_id STRING,
ingestion_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
Create SQL task: RAW_PRODUCT
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Create the table to store the product data
CREATE TABLE IF NOT EXISTS raw_product (
product_id INT,
name STRING,
category STRING,
brand STRING,
price FLOAT,
stock_quantity INT,
rating FLOAT,
is_active BOOLEAN,
ingestion_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
Create Pipes
Create Pipes that continuously ingest the customer, product, and order files from each Volume into the corresponding raw table in Singdata Lakehouse.
Create SQL task: PIPE_FOR_CUSTOMER
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
CREATE PIPE IF NOT EXISTS PIPE_FOR_CUSTOMER
VIRTUAL_CLUSTER = 'Three_Layer_DWH_VC'
-- Use LIST_PURGE mode to scan for the latest files
INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO raw_customer FROM VOLUME VOLUME_FOR_RAW_CUSTOMER (
customer_id INT,
name STRING,
email STRING,
country STRING,
customer_type STRING,
registration_date STRING,
age INT,
gender STRING,
total_purchases INT,
ingestion_timestamp TIMESTAMP_NTZ
)
USING CSV OPTIONS (
'header'='true'
)
-- Must add purge parameter to delete data after successful import
PURGE=true
;
Create SQL task: PIPE_FOR_ORDER
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
CREATE PIPE IF NOT EXISTS PIPE_FOR_ORDER
VIRTUAL_CLUSTER = 'Three_Layer_DWH_VC'
-- Use LIST_PURGE mode to scan for the latest files
INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO raw_order FROM VOLUME VOLUME_FOR_RAW_ORDER (
customer_id INT,
payment_method STRING,
product_id INT,
quantity INT,
store_type STRING,
total_amount DOUBLE,
transaction_date DATE,
transaction_id STRING,
ingestion_timestamp TIMESTAMP_NTZ
)
USING CSV OPTIONS (
'header'='true'
)
-- Must add purge parameter to delete data after successful import
PURGE=true
;
Create SQL task: PIPE_FOR_PRODUCT
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
CREATE PIPE IF NOT EXISTS PIPE_FOR_PRODUCT
VIRTUAL_CLUSTER = 'Three_Layer_DWH_VC'
-- Use LIST_PURGE mode to scan for the latest files
INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO raw_product FROM VOLUME VOLUME_FOR_RAW_PRODUCT (
product_id INT,
name STRING,
category STRING,
brand STRING,
price FLOAT,
stock_quantity INT,
rating FLOAT,
is_active BOOLEAN,
ingestion_timestamp TIMESTAMP_NTZ
)
USING CSV OPTIONS (
'header'='true'
)
-- Must add purge parameter to delete data after successful import
PURGE=true
;
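Lakehouse manages the internal scheduling of a LIST_PURGE Pipe, and that scheduling is not shown here. As a mental model only, the following Python sketch imitates what the SQL comments above describe: list the files currently in a location, load each CSV (treating the first line as a header, as `'header'='true'` does), and delete the file after a successful load so it is never loaded twice. A temporary local directory stands in for the Volume, a Python list stands in for the table, and `ingest_once` is a hypothetical helper.

```python
import csv
import os
import tempfile


def ingest_once(volume_dir, table):
    """One hypothetical LIST_PURGE-style cycle: list files, load each CSV, then purge it."""
    loaded = []
    for name in sorted(os.listdir(volume_dir)):
        if not name.endswith(".csv"):
            continue
        path = os.path.join(volume_dir, name)
        with open(path, newline="") as f:
            # Like 'header'='true': the first line supplies the column names
            rows = list(csv.DictReader(f))
        table.extend(rows)
        # Like PURGE=true: remove the file only after its rows were loaded
        os.remove(path)
        loaded.append(name)
    return loaded


raw_customer = []  # stands in for the raw_customer table
with tempfile.TemporaryDirectory() as volume_dir:
    with open(os.path.join(volume_dir, "raw_customer_1.csv"), "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["customer_id", "email"])
        writer.writerow(["1", "a@example.com"])
        writer.writerow(["2", "b@example.com"])
    first_pass = ingest_once(volume_dir, raw_customer)   # ['raw_customer_1.csv']
    second_pass = ingest_once(volume_dir, raw_customer)  # [] because the file was purged

print(first_pass, second_pass, len(raw_customer))
```

In this model, each uploaded file is picked up by exactly one cycle, and later cycles see only files that arrived after the previous one.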
Create Table Streams
Create a Table Stream on each raw table. In APPEND_ONLY mode, each stream captures the rows newly appended to its table, and the Silver layer tasks then consume these rows.
Create SQL task: CUSTOMER_CHANGES_STREAM
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
CREATE TABLE STREAM IF NOT EXISTS customer_changes_stream
ON TABLE raw_customer
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
Create SQL task: ORDER_CHANGES_STREAM
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
CREATE TABLE STREAM IF NOT EXISTS order_changes_stream
ON TABLE raw_order
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
Create SQL task: PRODUCT_CHANGES_STREAM
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
CREATE TABLE STREAM IF NOT EXISTS product_changes_stream
ON TABLE raw_product
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
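The following Python sketch is a simplified mental model of an APPEND_ONLY Table Stream, not the Lakehouse implementation. It rests on several assumptions: the stream keeps an offset into its table; a plain read returns the rows appended after that offset without moving it; and a committed DML statement that reads the stream (such as the MERGE tasks in the Silver layer) advances the offset. `AppendOnlyStream` is a hypothetical class.

```python
class AppendOnlyStream:
    """Hypothetical model of an append-only stream over a list-backed table."""

    def __init__(self, table):
        self.table = table
        # The raw tables are empty when the streams are created in this guide,
        # so the offset starts at 0
        self.offset = 0

    def peek(self):
        """Rows appended since the last consumption; the offset does not move."""
        return self.table[self.offset:]

    def consume(self):
        """Return pending rows and advance the offset, like a committed DML read."""
        changes = self.table[self.offset:]
        self.offset = len(self.table)
        return changes


raw_customer = []
stream = AppendOnlyStream(raw_customer)

raw_customer.extend([{"customer_id": 1}, {"customer_id": 2}])  # a Pipe loads a file
peeked = stream.peek()            # 2 rows, offset unchanged
first_batch = stream.consume()    # the same 2 rows; offset moves to 2
raw_customer.append({"customer_id": 3})
second_batch = stream.consume()   # only customer 3
third_batch = stream.consume()    # nothing new

print(len(peeked), len(first_batch), second_batch, third_batch)
```

Under these assumptions, each scheduled MERGE task processes only the rows that arrived since its previous successful run.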
Develop the Silver Layer

Create Tables
Create Silver layer Tables to store cleaned and transformed data.
Create SQL task: SILVER_CUSTOMER
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Silver Customer Table
CREATE TABLE IF NOT EXISTS SILVER_CUSTOMER (
customer_id INT,
name STRING,
email STRING,
country STRING,
customer_type STRING,
registration_date DATE,
age INT,
gender STRING,
total_purchases INT,
last_updated_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
Create SQL task: SILVER_ORDERS
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Silver Order Table
CREATE TABLE IF NOT EXISTS SILVER_ORDERS (
transaction_id STRING,
customer_id INT,
product_id INT,
quantity INT,
store_type STRING,
total_amount DOUBLE,
transaction_date DATE,
payment_method STRING,
last_updated_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
Create SQL task: SILVER_PRODUCT
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Silver Product Table
CREATE TABLE IF NOT EXISTS SILVER_PRODUCT (
product_id INT,
name STRING,
category STRING,
brand STRING,
price FLOAT,
stock_quantity INT,
rating FLOAT,
is_active BOOLEAN,
last_updated_timestamp TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
Create SQL tasks that clean and transform the raw data read from the Table Streams and merge the results into the Silver tables.
Create SQL task: CustomerTransform
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
-- Merge changes into silver layer
MERGE INTO silver_customer AS target
USING (
SELECT
customer_id,
name,
email,
country,
-- Customer type standardization
CASE
WHEN TRIM(UPPER(customer_type)) IN ('REGULAR', 'REG', 'R') THEN 'Regular'
WHEN TRIM(UPPER(customer_type)) IN ('PREMIUM', 'PREM', 'P') THEN 'Premium'
ELSE 'Unknown'
END AS customer_type,
-- Convert registration_date to DATE type for compatibility
CAST(registration_date AS DATE) AS registration_date,
-- Age validation
CASE
WHEN age BETWEEN 18 AND 120 THEN age
ELSE NULL
END AS age,
-- Gender standardization
CASE
WHEN TRIM(UPPER(gender)) IN ('M', 'MALE') THEN 'Male'
WHEN TRIM(UPPER(gender)) IN ('F', 'FEMALE') THEN 'Female'
ELSE 'Other'
END AS gender,
-- Total purchases validation
CASE
WHEN total_purchases >= 0 THEN total_purchases
ELSE 0
END AS total_purchases,
current_timestamp() AS last_updated_timestamp
FROM customer_changes_stream
WHERE customer_id IS NOT NULL AND email IS NOT NULL -- Basic data quality rule
) AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN
UPDATE SET
name = source.name,
email = source.email,
country = source.country,
customer_type = source.customer_type,
registration_date = source.registration_date,
age = source.age,
gender = source.gender,
total_purchases = source.total_purchases,
last_updated_timestamp = source.last_updated_timestamp
WHEN NOT MATCHED THEN
INSERT (customer_id, name, email, country, customer_type, registration_date, age, gender, total_purchases, last_updated_timestamp)
VALUES (source.customer_id, source.name, source.email, source.country, source.customer_type, source.registration_date, source.age, source.gender, source.total_purchases, source.last_updated_timestamp);
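To check the CustomerTransform logic outside the Lakehouse, the following Python sketch mirrors a subset of its rules and its MERGE behavior. The rules covered are the WHERE filter and the customer type, age, gender, and total purchases expressions. For the MERGE, a matched `customer_id` is updated and an unmatched one is inserted. `None` plays the role of SQL NULL, and the helper names are hypothetical. The sketch lets the last row win if a batch contains the same `customer_id` twice; it does not model how the real MERGE treats that case.

```python
REGULAR = {"REGULAR", "REG", "R"}
PREMIUM = {"PREMIUM", "PREM", "P"}


def norm(value):
    """Approximate TRIM(UPPER(x)); None stays None, like SQL NULL."""
    return value.strip().upper() if value is not None else None


def clean_customer(row):
    ctype = norm(row.get("customer_type"))
    gender = norm(row.get("gender"))
    age = row.get("age")
    purchases = row.get("total_purchases")
    if ctype in REGULAR:
        customer_type = "Regular"
    elif ctype in PREMIUM:
        customer_type = "Premium"
    else:
        customer_type = "Unknown"  # includes NULL and values such as "VIP"
    if gender in ("M", "MALE"):
        gender_std = "Male"
    elif gender in ("F", "FEMALE"):
        gender_std = "Female"
    else:
        gender_std = "Other"
    return {
        "customer_id": row["customer_id"],
        "email": row["email"],
        "customer_type": customer_type,
        # Age outside 18..120 (or NULL) becomes NULL
        "age": age if age is not None and 18 <= age <= 120 else None,
        "gender": gender_std,
        # NULL >= 0 is not true in SQL, so NULL also falls through to 0
        "total_purchases": purchases if purchases is not None and purchases >= 0 else 0,
    }


def merge_customers(target, changes):
    """Upsert into a dict keyed by customer_id (MATCHED: update, NOT MATCHED: insert)."""
    for row in changes:
        # WHERE customer_id IS NOT NULL AND email IS NOT NULL
        if row.get("customer_id") is None or row.get("email") is None:
            continue
        target[row["customer_id"]] = clean_customer(row)
    return target


silver_customer = {}
merge_customers(silver_customer, [
    {"customer_id": 1, "email": "a@example.com", "customer_type": " prem ",
     "age": 17, "gender": "f", "total_purchases": -5},
    {"customer_id": 2, "email": None, "customer_type": "R",
     "age": 30, "gender": "M", "total_purchases": 3},
])
after_insert = dict(silver_customer[1])
merge_customers(silver_customer, [
    {"customer_id": 1, "email": "new@example.com", "customer_type": "VIP",
     "age": 40, "gender": None, "total_purchases": 12},
])
print(after_insert)
print(silver_customer)
```

The sample data generator at the end of this guide emits "VIP" as one of its customer types, and this rule maps it to "Unknown".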
Create SQL task: OrderTransform
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
MERGE INTO silver_orders AS target
USING (
SELECT
transaction_id,
customer_id,
product_id,
quantity,
store_type,
total_amount,
transaction_date,
payment_method,
CURRENT_TIMESTAMP() AS last_updated_timestamp
FROM order_changes_stream
WHERE transaction_id IS NOT NULL AND total_amount > 0 -- Basic data quality rules
) AS source
ON target.transaction_id = source.transaction_id
WHEN MATCHED THEN
UPDATE SET
customer_id = source.customer_id,
product_id = source.product_id,
quantity = source.quantity,
store_type = source.store_type,
total_amount = source.total_amount,
transaction_date = source.transaction_date,
payment_method = source.payment_method,
last_updated_timestamp = source.last_updated_timestamp
WHEN NOT MATCHED THEN
INSERT (transaction_id, customer_id, product_id, quantity, store_type, total_amount, transaction_date, payment_method, last_updated_timestamp)
VALUES (source.transaction_id, source.customer_id, source.product_id, source.quantity, source.store_type, source.total_amount, source.transaction_date, source.payment_method, source.last_updated_timestamp);
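The OrderTransform filter is easy to misread around NULLs. Because `total_amount > 0` is not true for a NULL amount, such rows are dropped along with zero or negative amounts. The Python sketch below models the filter and the upsert keyed on `transaction_id`. `merge_orders` is hypothetical, and `None` stands in for SQL NULL.

```python
def merge_orders(target, changes):
    """Hypothetical model of OrderTransform: filter, then upsert by transaction_id."""
    for row in changes:
        amount = row.get("total_amount")
        # WHERE transaction_id IS NOT NULL AND total_amount > 0
        if row.get("transaction_id") is None or amount is None or amount <= 0:
            continue
        target[row["transaction_id"]] = dict(row)  # MATCHED: update, NOT MATCHED: insert
    return target


silver_orders = {}
merge_orders(silver_orders, [
    {"transaction_id": "t1", "total_amount": 10.0},
    {"transaction_id": "t2", "total_amount": 0},     # dropped: not > 0
    {"transaction_id": None, "total_amount": 5.0},   # dropped: no transaction_id
    {"transaction_id": "t3", "total_amount": None},  # dropped: NULL amount
])
merge_orders(silver_orders, [{"transaction_id": "t1", "total_amount": 12.5}])
print(silver_orders)  # {'t1': {'transaction_id': 't1', 'total_amount': 12.5}}
```

The test data generator at the end of this guide draws `total_amount` with `fake.random_number(digits=5, fix_len=False)`, which can return 0. As a result, this filter may occasionally drop a generated order.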
Create SQL task: ProductTransform
-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
MERGE INTO silver_product AS target
USING (
SELECT
product_id,
name,
category,
-- Price validation and normalization
CASE
WHEN price < 0 THEN 0
ELSE price
END AS price,
brand,
-- Stock quantity validation
CASE
WHEN stock_quantity >= 0 THEN stock_quantity
ELSE 0
END AS stock_quantity,
-- Rating validation
CASE
WHEN rating BETWEEN 0 AND 5 THEN rating
ELSE 0
END AS rating,
is_active,
CURRENT_TIMESTAMP() AS last_updated_timestamp
FROM product_changes_stream
) AS source
ON target.product_id = source.product_id
WHEN MATCHED THEN
UPDATE SET
name = source.name,
category = source.category,
price = source.price,
brand = source.brand,
stock_quantity = source.stock_quantity,
rating = source.rating,
is_active = source.is_active,
last_updated_timestamp = source.last_updated_timestamp
WHEN NOT MATCHED THEN
INSERT (product_id, name, category, price, brand, stock_quantity, rating, is_active, last_updated_timestamp)
VALUES (source.product_id, source.name, source.category, source.price, source.brand, source.stock_quantity, source.rating, source.is_active, source.last_updated_timestamp);
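The ProductTransform CASE expressions clamp values rather than dropping rows, and they treat NULL differently for each column. A NULL price stays NULL (because `price < 0` is not true), while a NULL stock quantity or rating falls through to 0. The hypothetical `clean_product` function below mirrors those expressions in Python, with `None` as SQL NULL.

```python
def clean_product(row):
    """Hypothetical mirror of the ProductTransform CASE expressions."""
    price = row.get("price")
    stock = row.get("stock_quantity")
    rating = row.get("rating")
    return {
        **row,
        # CASE WHEN price < 0 THEN 0 ELSE price END
        "price": 0 if price is not None and price < 0 else price,
        # CASE WHEN stock_quantity >= 0 THEN stock_quantity ELSE 0 END
        "stock_quantity": stock if stock is not None and stock >= 0 else 0,
        # CASE WHEN rating BETWEEN 0 AND 5 THEN rating ELSE 0 END
        "rating": rating if rating is not None and 0 <= rating <= 5 else 0,
    }


bad = clean_product({"product_id": 1, "price": -2.5, "stock_quantity": -1, "rating": 7.3})
nulls = clean_product({"product_id": 2, "price": None, "stock_quantity": None, "rating": 5.0})
print(bad)
print(nulls)

# Assumed generator ratings: round(n / 10, 1) for n in 10..99, i.e. 1.0 to 9.9
ratings = [round(n / 10, 1) for n in range(10, 100)]
zeroed = sum(1 for r in ratings if clean_product({"rating": r})["rating"] == 0)
print(zeroed, "of", len(ratings))  # 49 of 90
```

This assumes the generator's `fake.random_number(digits=2, fix_len=True)` returns integers from 10 to 99. Under that assumption, the sample ratings range from 1.0 to 9.9, and 49 of the 90 possible values (everything above 5.0) are reset to 0 by this rule.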
Develop the Gold Layer

Develop Dynamic Tables

Create dynamic tables that support business analysis of the data.
Create Dynamic Table: DynamicTable_ProductAnalysis
SELECT
p.CATEGORY,
c.GENDER,
SUM(o.TOTAL_AMOUNT) AS TOTAL_SALES,
AVG(p.RATING) AS AVG_RATING
FROM SILVER_ORDERS AS o
JOIN SILVER_PRODUCT AS p
ON o.product_id = p.product_id
JOIN SILVER_CUSTOMER AS c
ON o.customer_id = c.customer_id
GROUP BY
p.CATEGORY,
c.GENDER
ORDER BY
c.GENDER,
TOTAL_SALES DESC;
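One detail of this query is easy to miss. `AVG(p.RATING)` is computed over the joined order rows, so a product's rating counts once per order rather than once per product. The inner joins also drop any order with no matching product or customer. The following Python sketch reproduces that aggregation on hypothetical in-memory rows; `product_analysis` is not a Lakehouse API.

```python
from collections import defaultdict


def product_analysis(orders, products, customers):
    """Hypothetical model: inner-join orders to products and customers, group by (category, gender)."""
    product_by_id = {p["product_id"]: p for p in products}
    customer_by_id = {c["customer_id"]: c for c in customers}
    groups = defaultdict(lambda: {"total_sales": 0.0, "ratings": []})
    for o in orders:
        p = product_by_id.get(o["product_id"])
        c = customer_by_id.get(o["customer_id"])
        if p is None or c is None:
            continue  # inner join: unmatched orders are dropped
        g = groups[(p["category"], c["gender"])]
        g["total_sales"] += o["total_amount"]
        # One rating per joined order row; Silver ratings are never NULL, so no NULL handling
        g["ratings"].append(p["rating"])
    return {
        key: {"total_sales": g["total_sales"], "avg_rating": sum(g["ratings"]) / len(g["ratings"])}
        for key, g in groups.items()
    }


products = [
    {"product_id": 1, "category": "toys", "rating": 4.0},
    {"product_id": 2, "category": "toys", "rating": 2.0},
]
customers = [{"customer_id": 1, "gender": "Female"}]
orders = [
    {"product_id": 1, "customer_id": 1, "total_amount": 10.0},
    {"product_id": 1, "customer_id": 1, "total_amount": 20.0},
    {"product_id": 2, "customer_id": 1, "total_amount": 5.0},
    {"product_id": 3, "customer_id": 1, "total_amount": 99.0},  # no such product
]
result = product_analysis(orders, products, customers)
print(result)
```

Here the average rating for ("toys", "Female") is 10 / 3 (about 3.33) rather than the per-product average of 3.0, because product 1 appears in two orders. The order for product 3 is excluded from the total sales of 35.0.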
Create Dynamic Table: DynamicTable_RegionAnalysis
SELECT
CASE
WHEN c.COUNTRY IN ('USA', 'Canada') THEN 'NA'
WHEN c.COUNTRY IN ('Brazil') THEN 'SA'
WHEN c.COUNTRY IN ('Australia') THEN 'AUS'
WHEN c.COUNTRY IN ('Germany', 'UK', 'France') THEN 'EU'
WHEN c.COUNTRY IN ('China', 'India', 'Japan') THEN 'ASIA'
ELSE 'UNKNOWN'
END AS REGION,
o.STORE_TYPE,
SUM(o.TOTAL_AMOUNT) AS TOTAL_SALES,
AVG(o.TOTAL_AMOUNT) AS AVG_SALE,
AVG(o.QUANTITY) AS AVG_QUANTITY
FROM SILVER_ORDERS AS o
JOIN SILVER_PRODUCT AS p
ON o.product_id = p.product_id
JOIN SILVER_CUSTOMER AS c
ON o.customer_id = c.customer_id
GROUP BY
REGION,
o.STORE_TYPE
ORDER BY
TOTAL_SALES DESC,
AVG_SALE DESC,
AVG_QUANTITY DESC;
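The region CASE expression is a fixed country-to-region lookup. Any country that is not listed, or is NULL, lands in 'UNKNOWN'. The test data generator uses `fake.country()`, which draws from a long list of country names, so expect many sample customers to fall into 'UNKNOWN'. The Python sketch below models the lookup, the inner joins, the per-group metrics, and the ORDER BY. `region_analysis` and `REGION_BY_COUNTRY` are hypothetical names.

```python
from collections import defaultdict

# Same mapping as the CASE expression in the query
REGION_BY_COUNTRY = {
    "USA": "NA", "Canada": "NA",
    "Brazil": "SA",
    "Australia": "AUS",
    "Germany": "EU", "UK": "EU", "France": "EU",
    "China": "ASIA", "India": "ASIA", "Japan": "ASIA",
}


def region_analysis(orders, products, customers):
    """Hypothetical model of the region query: join, group by (region, store_type), sort."""
    product_ids = {p["product_id"] for p in products}
    customer_by_id = {c["customer_id"]: c for c in customers}
    groups = defaultdict(list)
    for o in orders:
        c = customer_by_id.get(o["customer_id"])
        if c is None or o["product_id"] not in product_ids:
            continue  # both joins are inner joins
        region = REGION_BY_COUNTRY.get(c["country"], "UNKNOWN")  # ELSE 'UNKNOWN'
        groups[(region, o["store_type"])].append(o)
    rows = []
    for (region, store_type), group in groups.items():
        total = sum(o["total_amount"] for o in group)
        rows.append({
            "region": region,
            "store_type": store_type,
            "total_sales": total,
            "avg_sale": total / len(group),
            "avg_quantity": sum(o["quantity"] for o in group) / len(group),
        })
    # ORDER BY TOTAL_SALES DESC, AVG_SALE DESC, AVG_QUANTITY DESC
    rows.sort(key=lambda r: (r["total_sales"], r["avg_sale"], r["avg_quantity"]), reverse=True)
    return rows


customers = [{"customer_id": 1, "country": "Germany"}, {"customer_id": 2, "country": "Peru"}]
products = [{"product_id": 1}]
orders = [
    {"customer_id": 1, "product_id": 1, "store_type": "Online", "total_amount": 100.0, "quantity": 2},
    {"customer_id": 1, "product_id": 1, "store_type": "Online", "total_amount": 50.0, "quantity": 4},
    {"customer_id": 2, "product_id": 1, "store_type": "Physical", "total_amount": 30.0, "quantity": 1},
    {"customer_id": 1, "product_id": 9, "store_type": "Online", "total_amount": 999.0, "quantity": 1},
]
rows = region_analysis(orders, products, customers)
for row in rows:
    print(row)
```

The join to SILVER_PRODUCT contributes no columns to this query, but it still filters the result: the order for the missing product 9 is excluded.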
Schedule the Silver Layer Transformation Tasks
Follow the steps below to schedule the three Silver layer data transformation tasks on a one-minute interval.
Set scheduling parameters:

Then submit:

Repeat these steps for all three data transformation tasks so that each one is scheduled and started.
With a 1-minute interval, new rows in the raw tables typically reach the Silver layer tables within about a minute.
Enable Auto-Refresh for Gold Layer Dynamic Tables
Follow the steps below to start the dynamic tables.

Set the running cluster to "Three_Layer_DWH_VC", the refresh mode to "Auto Refresh", and the refresh interval to "1 Minute". Then submit.
Verify Object Creation Results
Navigate to Development -> Tasks and create a new SQL task named "09_Test_Verification":

-- Use our VCLUSTER and SCHEMA
USE VCLUSTER Three_Layer_DWH_VC;
USE SCHEMA Three_Layer_DWH_SCH;
SHOW tables;
SHOW volumes;
SHOW pipes;
SHOW table streams;
SHOW tables result:

SHOW volumes result:

SHOW pipes result:

SHOW table streams result:

View detailed information for each object using the following commands:
DESC TABLE EXTENDED raw_customer;
DESC VOLUME volume_for_raw_customer;
DESC PIPE pipe_for_customer;
DESC TABLE STREAM customer_changes_stream;
DESC TABLE EXTENDED dt_productanalysis;
Generate Test Data and PUT It into the Data Lake
# pip install faker
import csv
import json
import os
import uuid
from datetime import datetime

from faker import Faker
from clickzetta.zettapark.session import Session

fake = Faker()
file_path = 'FakeDataset'


# Create a CSV file and fill it with generated rows for the given raw table
def create_csv_file(file_path, table_name, record_count):
    # Make sure the target directory (for example FakeDataset/customer) exists
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, 'w', newline='') as csvfile:
        if table_name == "raw_customer":
            fieldnames = ["customer_id", "name", "email", "country", "customer_type",
                          "registration_date", "age", "gender", "total_purchases", "ingestion_timestamp"]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for i in range(1, record_count + 1):
                writer.writerow(
                    {
                        "customer_id": i,
                        "name": fake.name(),
                        "email": fake.email(),
                        "country": fake.country(),
                        "customer_type": fake.random_element(elements=("Regular", "Premium", "VIP")),
                        "registration_date": fake.date(),
                        "age": fake.random_int(min=18, max=120),
                        "gender": fake.random_element(elements=("Male", "Female", "Other")),
                        "total_purchases": fake.random_int(min=0, max=1000),
                        "ingestion_timestamp": fake.date_time_this_year().isoformat()
                    }
                )
        elif table_name == "raw_product":
            fieldnames = ["product_id", "name", "category", "brand", "price",
                          "stock_quantity", "rating", "is_active", "ingestion_timestamp"]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for i in range(1, record_count + 1):
                writer.writerow(
                    {
                        "product_id": i,
                        "name": fake.word(),
                        "category": fake.word(),
                        "brand": fake.company(),
                        "price": round(fake.random_number(digits=5, fix_len=False), 2),
                        "stock_quantity": fake.random_int(min=0, max=1000),
                        "rating": round(fake.random_number(digits=2, fix_len=True) / 10, 1),
                        "is_active": fake.boolean(),
                        "ingestion_timestamp": fake.date_time_this_year().isoformat()
                    }
                )
        elif table_name == "raw_order":
            fieldnames = ["customer_id", "payment_method", "product_id", "quantity",
                          "store_type", "total_amount", "transaction_date",
                          "transaction_id", "ingestion_timestamp"]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for _ in range(record_count):
                writer.writerow(
                    {
                        "customer_id": fake.random_int(min=1, max=100),
                        "payment_method": fake.random_element(elements=("Credit Card", "PayPal", "Bank Transfer")),
                        "product_id": fake.random_int(min=1, max=100),
                        "quantity": fake.random_int(min=1, max=10),
                        "store_type": fake.random_element(elements=("Online", "Physical")),
                        "total_amount": round(fake.random_number(digits=5, fix_len=False), 2),
                        "transaction_date": fake.date(),
                        "transaction_id": str(uuid.uuid4()),
                        "ingestion_timestamp": fake.date_time_this_year().isoformat()
                    }
                )


# Upload a local file to a Lakehouse Volume
def put_file_into_volume(filename, volumename):
    # Read connection parameters from the config file
    with open('security/config-uat-3layer-dwh.json', 'r') as config_file:
        config = json.load(config_file)
    # Create a session
    session = Session.builder.configs(config).create()
    session.file.put(filename, f"volume://{volumename}/")
    session.sql(f"show volume directory {volumename}").show()
    session.close()


if __name__ == '__main__':
    # Set FIRST_RUN = True for the first run (customers, products, and orders);
    # set it to False for later runs, which only generate new order data
    FIRST_RUN = True
    current_time = datetime.now().strftime("%Y%m%d%H%M%S")
    print(current_time)
    if FIRST_RUN:
        create_csv_file(f"{file_path}/customer/raw_customer_{current_time}.csv", "raw_customer", 100)
        put_file_into_volume(f"{file_path}/customer/raw_customer_{current_time}.csv", "VOLUME_FOR_RAW_CUSTOMER")
        create_csv_file(f"{file_path}/product/raw_product_{current_time}.csv", "raw_product", 100)
        put_file_into_volume(f"{file_path}/product/raw_product_{current_time}.csv", "VOLUME_FOR_RAW_PRODUCT")
        create_csv_file(f"{file_path}/order/raw_order_{current_time}.csv", "raw_order", 10000)
        put_file_into_volume(f"{file_path}/order/raw_order_{current_time}.csv", "VOLUME_FOR_RAW_ORDER")
    else:
        # Second and later runs: only generate order data
        create_csv_file(f"{file_path}/order/raw_order_{current_time}.csv", "raw_order", 100000)
        put_file_into_volume(f"{file_path}/order/raw_order_{current_time}.csv", "VOLUME_FOR_RAW_ORDER")
The first run generates data for products, customers, and orders. Subsequent runs only need to generate new order data; run the script as many times as you like to upload multiple order files.
Note: Because the Pipes are created with PURGE=true, each file PUT into a Volume is deleted automatically after the Pipe has loaded it.