This is a basic data engineering example that shows how to perform fundamental DataFrame operations, such as reading, joining, grouping, and writing data, with Singdata Zettapark Python code. It uses the free sample database built into Singdata Lakehouse (clickzetta_sample_data.tpch_100g) as the data source.
The steps are as follows:
1. Connect to Singdata Lakehouse via Zettapark
2. Join 2 large tables on the supplier key (LINEITEM with 600 million rows and SUPPLIER with 1 million rows)
3. Summarize the data by supplier and part number to calculate the sum, min, and max quantity
4. Demonstrate on-demand scaling by resizing the virtual compute cluster
5. Write the resulting DataFrame to a Singdata Lakehouse physical table (80 million rows)
6. Compare the execution time of the same task on different virtual compute cluster sizes
On the larger cluster, the entire operation, from adjusting compute resources to reading, joining, summarizing, and writing the data, takes approximately 30 seconds, demonstrating the performance and instant scalability of Singdata Lakehouse.
Installing Singdata Zettapark
# !pip install clickzetta-zettapark-python
Connecting to Singdata Lakehouse via Zettapark (Without PySpark)
# Only the imports this notebook actually uses (duplicate and unused imports removed)
import time
import json
from clickzetta.zettapark.session import Session
Before running the notebook, change the connection parameters in config.json to match your environment.
import json
from clickzetta.zettapark.session import Session
# 1- Create a session to connect to Singdata Lakehouse
# Read parameters from configuration file
with open('config.json', 'r') as config_file:
    config = json.load(config_file)
print("Connecting to Singdata Lakehouse.....\n")
# Create session
session = Session.builder.configs(config).create()
print("Connected successfully!...\n")
Connecting to Singdata Lakehouse.....
Connected successfully!...
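# Virtual cluster settings (assumption: reuse the vcluster and schema from config.json,
# so that resizing affects the cluster this session actually runs on)
VCLUSTER_Name = config["vcluster"]
Schema_Name = config["schema"]
VCLUSTER_Size = "XSMALL"    # baseline size
VCLUSTER_ReSize = "MEDIUM"  # size for the scaled-up run
sql_cmd = f"CREATE VCLUSTER IF NOT EXISTS {VCLUSTER_Name} VCLUSTER_SIZE = {VCLUSTER_Size} AUTO_SUSPEND_IN_SECOND = 10"
session.sql(sql_cmd).collect()
print(f"{VCLUSTER_Size} VCLUSTER ready\n")
session.use_schema(Schema_Name)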
XSMALL VCLUSTER ready
⚠️ Note: The VCLUSTER_SIZE parameter of a compute cluster accepts both T-shirt sizes (XSMALL, SMALL, LARGE, etc.) and numeric values (1, 2, 4, 16, etc.), giving a finer range of cluster specifications for different scenarios. For more information, see: VCluster Size Specification Change Description
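Because the notebook builds its CREATE/ALTER VCLUSTER statements with f-strings, a small validation step can catch a mistyped size or cluster name before it reaches the server. The sketch below is a hypothetical helper, not part of Zettapark: it accepts either form of size described in the note, quotes T-shirt names the way the notebook's ALTER statements do, and assumes (unverified) that numeric sizes may be written unquoted. The cluster name "demo_vc" is likewise made up for illustration.

```python
import re

# Hypothetical helper (not part of Zettapark): validates inputs before they are
# interpolated into VCLUSTER SQL, so a typo or stray quote fails fast instead
# of producing a malformed statement.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
_SIZE_NAME = re.compile(r"[A-Za-z0-9]+")


def resize_vcluster_sql(vcluster_name, size):
    """Build an ALTER VCLUSTER ... SET VCLUSTER_SIZE statement.

    size is either a T-shirt size name such as "XSMALL" or "MEDIUM", or a
    positive integer. Names are upper-cased and quoted like the notebook's
    ALTER statements; emitting integers unquoted is an assumption.
    """
    if not _IDENTIFIER.fullmatch(vcluster_name):
        raise ValueError(f"invalid VCLUSTER name: {vcluster_name!r}")
    if isinstance(size, bool):  # bool is a subclass of int; reject it explicitly
        raise ValueError("size must be a size name or a positive integer")
    if isinstance(size, int):
        if size <= 0:
            raise ValueError(f"numeric size must be positive: {size}")
        size_sql = str(size)
    elif isinstance(size, str) and _SIZE_NAME.fullmatch(size):
        size_sql = f"'{size.upper()}'"
    else:
        raise ValueError(f"invalid VCLUSTER size: {size!r}")
    return f"ALTER VCLUSTER {vcluster_name} SET VCLUSTER_SIZE = {size_sql}"


# "demo_vc" is a hypothetical cluster name used only for illustration.
print(resize_vcluster_sql("demo_vc", "medium"))  # ALTER VCLUSTER demo_vc SET VCLUSTER_SIZE = 'MEDIUM'
print(resize_vcluster_sql("demo_vc", 4))         # ALTER VCLUSTER demo_vc SET VCLUSTER_SIZE = 4
```

In the notebook, the result would be passed to session.sql(...).collect() in place of a hand-built f-string.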
Example config.json (replace each value with your own connection details):
{
"username": "Replace with your username",
"password": "Replace with your password",
"service": "Replace with your service address",
"instance": "Replace with your instance ID",
"workspace": "Replace with your workspace",
"schema": "Replace with your schema",
"vcluster": "Replace with your virtual cluster",
"sdk_job_timeout": 60,
"hints": {
"sdk.job.timeout": 60,
"query_tag": "test_conn_hints_zettapark"
}
}
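The session is created directly from this file, so a forgotten "Replace with ..." value only surfaces later as a connection error. A minimal pre-flight check is sketched below; it assumes every connection key in the example above is required, and load_config and REQUIRED_KEYS are hypothetical names, not Zettapark APIs.

```python
import json

# Keys copied from the example config.json; treating every one of them as
# required is an assumption of this sketch.
REQUIRED_KEYS = ("username", "password", "service", "instance",
                 "workspace", "schema", "vcluster")


def load_config(path):
    """Load config.json and fail early on missing keys or unedited placeholders."""
    with open(path, "r", encoding="utf-8") as config_file:
        config = json.load(config_file)
    # Keys that are absent or empty
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    # Keys that still hold the template's "Replace with ..." text
    placeholders = [
        key for key in REQUIRED_KEYS
        if isinstance(config.get(key), str) and config[key].startswith("Replace with")
    ]
    if missing or placeholders:
        raise ValueError(
            f"config.json needs attention: missing={missing}, "
            f"still placeholders={placeholders}"
        )
    return config
```

With the helper in place, the connection cell could call load_config('config.json') and pass the result to Session.builder.configs(...).create().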
Starting the Data Engineering Process
# Module-qualified functions avoid shadowing Python's built-in sum/min/max
import clickzetta.zettapark.functions as F
print("Joining and aggregating 2 large tables (600M & 1M rows), then writing results to a new table (80M rows)...\n")
# 2 - Define the source tables
dfLineItems = session.table("clickzetta_sample_data.tpch_100g.LINEITEM")  # ~600 million rows
dfSuppliers = session.table("clickzetta_sample_data.tpch_100g.SUPPLIER")  # 1 million rows
print('Lineitems Table: %s rows' % dfLineItems.count())
print('Suppliers Table: %s rows' % dfSuppliers.count())
# 3 - Join the tables on the supplier key
dfJoinTables = dfLineItems.join(dfSuppliers, dfLineItems["L_SUPPKEY"] == dfSuppliers["S_SUPPKEY"])
# 4 - Summarize by supplier and part: total, min, and max quantity
dfSummary = dfJoinTables.groupBy("S_NAME", "L_PARTKEY").agg(
    F.sum(F.col("L_QUANTITY")).alias("TOTAL_QTY"),
    F.min(F.col("L_QUANTITY")).alias("MIN_QTY"),
    F.max(F.col("L_QUANTITY")).alias("MAX_QTY")
)
dfSummary.show()
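To make the join and aggregation above concrete, here is a pure-Python sketch on a handful of made-up rows (the keys, quantities, and Supplier#1/Supplier#2 names are invented for illustration). It uses a hash join, building a lookup on the smaller SUPPLIER side and probing it with LINEITEM rows; that is one common strategy, not necessarily the plan Lakehouse chooses for dfJoinTables. Aggregates are kept as running values, so the sketch also avoids the built-in sum/min/max names.

```python
# Toy stand-ins for LINEITEM and SUPPLIER, with only the columns used above.
lineitems = [
    {"L_SUPPKEY": 1, "L_PARTKEY": 10, "L_QUANTITY": 5},
    {"L_SUPPKEY": 1, "L_PARTKEY": 10, "L_QUANTITY": 7},
    {"L_SUPPKEY": 2, "L_PARTKEY": 10, "L_QUANTITY": 3},
    {"L_SUPPKEY": 2, "L_PARTKEY": 20, "L_QUANTITY": 9},
    {"L_SUPPKEY": 3, "L_PARTKEY": 30, "L_QUANTITY": 4},  # no matching supplier
]
suppliers = [
    {"S_SUPPKEY": 1, "S_NAME": "Supplier#1"},
    {"S_SUPPKEY": 2, "S_NAME": "Supplier#2"},
]


def join_and_summarize(lineitems, suppliers):
    # Build side of the hash join: S_SUPPKEY -> S_NAME
    name_by_key = {s["S_SUPPKEY"]: s["S_NAME"] for s in suppliers}
    summary = {}
    for row in lineitems:
        name = name_by_key.get(row["L_SUPPKEY"])
        if name is None:  # inner join: drop line items without a supplier
            continue
        key = (name, row["L_PARTKEY"])  # GROUP BY S_NAME, L_PARTKEY
        qty = row["L_QUANTITY"]
        agg = summary.get(key)
        if agg is None:
            summary[key] = {"TOTAL_QTY": qty, "MIN_QTY": qty, "MAX_QTY": qty}
        else:
            agg["TOTAL_QTY"] += qty
            if qty < agg["MIN_QTY"]:
                agg["MIN_QTY"] = qty
            if qty > agg["MAX_QTY"]:
                agg["MAX_QTY"] = qty
    return summary


summary = join_and_summarize(lineitems, suppliers)
for key, aggs in sorted(summary.items()):
    print(key, aggs)
```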
3. Running the Same Computation Task on Different Compute Resources (Virtual Clusters) to Demonstrate Elastic Scaling
start_time = time.time()
# 4 - Resize the virtual compute cluster to XSMALL
print(f"Resizing to {VCLUSTER_Size} ..")
sql_cmd = f"ALTER VCLUSTER {VCLUSTER_Name} SET VCLUSTER_SIZE = '{VCLUSTER_Size}' "
session.sql(sql_cmd).collect()
print("Done!...\n\n")
# 5 - Write results to a new table (80 million rows)
# <-- This is when all previous operations compile and execute as a single job
print("Creating target SALES_SUMMARY table...\n\n")
dfSummary.write.mode("overwrite").saveAsTable("SALES_SUMMARY")
print("Target table created!...")
# 6 - Query results (80 million rows)
print("Querying results...\n")
dfSales = session.table("SALES_SUMMARY")
dfSales.show()
end_time = time.time()
print("--- Joining, summarizing, and writing results to new table took %s seconds --- \n" % int(end_time - start_time))
print("--- Wrote %s rows to SALES_SUMMARY table" % dfSales.count())
# 7 - Scale the virtual compute cluster down to XSMALL
print("Scaling VCLUSTER down to XSMALL...\n")
sql_cmd = "ALTER VCLUSTER {} SET VCLUSTER_SIZE = 'XSMALL'".format(VCLUSTER_Name)
session.sql(sql_cmd).collect()
print("Done!...\n")
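The comment in the cell above notes that the earlier DataFrame steps compile and execute as a single job only when saveAsTable is called. The toy class below illustrates that idea of deferred (lazy) execution in plain Python: transformations are only recorded, and the work happens when an action runs. LazyFrame is a hypothetical teaching class, not how Zettapark is implemented; in Zettapark the deferred steps are executed by Lakehouse as a job. Calls such as count() and show() in the notebook are actions as well, so they also trigger execution.

```python
class LazyFrame:
    """Hypothetical toy class: records transformations and runs them only
    when the collect() action is called."""

    def __init__(self, rows, steps=()):
        self._rows = rows
        self._steps = tuple(steps)

    def filter(self, predicate):
        # Return a new frame with one more pending step; nothing runs yet.
        return LazyFrame(self._rows, self._steps + (("filter", predicate),))

    def map(self, fn):
        return LazyFrame(self._rows, self._steps + (("map", fn),))

    def collect(self):
        # The action: apply every pending step in a single pass over the data.
        out = []
        for row in self._rows:
            keep = True
            for kind, fn in self._steps:
                if kind == "filter":
                    if not fn(row):
                        keep = False
                        break
                else:
                    row = fn(row)
            if keep:
                out.append(row)
        return out


calls = []


def double(x):
    calls.append(x)  # record when the transformation actually runs
    return x * 2


plan = LazyFrame([1, 2, 3, 4]).filter(lambda x: x % 2 == 0).map(double)
print(len(calls))      # 0: building the plan executed nothing
print(plan.collect())  # [4, 8]
print(len(calls))      # 2: the work happened only at collect()
```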
start_time = time.time()
# 4 - Increase the virtual compute cluster size to MEDIUM
print(f"Resizing {VCLUSTER_Size} to {VCLUSTER_ReSize} ..")
sql_cmd = f"ALTER VCLUSTER {VCLUSTER_Name} SET VCLUSTER_SIZE = '{VCLUSTER_ReSize}'"
session.sql(sql_cmd).collect()
print("Done!...\n\n")
# 5 - Write results to a new table (80 million rows)
# <-- This is when all previous operations compile and execute as a single job
print("Creating target SALES_SUMMARY table...\n\n")
dfSummary.write.mode("overwrite").saveAsTable("SALES_SUMMARY")
print("Target table created!...")
# 6 - Query results (80 million rows)
print("Querying results...\n")
dfSales = session.table("SALES_SUMMARY")
dfSales.show()
end_time = time.time()
print("--- Joining, summarizing, and writing results to new table took %s seconds --- \n" % int(end_time - start_time))
print("--- Wrote %s rows to SALES_SUMMARY table" % dfSales.count())
# 7 - Scale the virtual compute cluster down to XSMALL
print("Scaling VCLUSTER down to XSMALL...\n")
sql_cmd = f"ALTER VCLUSTER {VCLUSTER_Name} SET VCLUSTER_SIZE = '{VCLUSTER_Size}'"
session.sql(sql_cmd).collect()
print("Done!...\n")
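The two timed cells above differ only in the target cluster size. One way to avoid duplicating them is a hypothetical benchmark_sizes helper that takes the resize and workload steps as callables; the sketch below runs it with stand-in callables so it works without a Lakehouse connection. Unlike the notebook, which scales down after each run, it restores the cluster once at the end, inside a finally block so a failed run still scales down.

```python
import time
from typing import Callable, Dict, Iterable, Optional


def benchmark_sizes(
    sizes: Iterable[str],
    resize: Callable[[str], None],
    workload: Callable[[], None],
    restore: Optional[str] = None,
) -> Dict[str, float]:
    """For each size: resize the cluster, run the workload, record elapsed seconds.

    Like the notebook cells, the timing includes the resize itself. If restore
    is given, the cluster is resized to it afterwards, even if a run fails.
    """
    timings: Dict[str, float] = {}
    try:
        for size in sizes:
            start = time.perf_counter()  # monotonic clock, suited to measuring intervals
            resize(size)
            workload()
            timings[size] = time.perf_counter() - start
    finally:
        if restore is not None:
            resize(restore)
    return timings


# Demo with stand-in callables instead of a real Lakehouse session.
resized = []
timings = benchmark_sizes(
    ["XSMALL", "MEDIUM"],
    resize=resized.append,
    workload=lambda: time.sleep(0.01),
    restore="XSMALL",
)
print(resized)  # ['XSMALL', 'MEDIUM', 'XSMALL']
for size, seconds in timings.items():
    print(f"{size}: {seconds:.3f} s")
```

Wiring it to the notebook would mean passing a resize function that runs the ALTER VCLUSTER statement through session.sql(...).collect() and a workload that calls dfSummary.write.mode("overwrite").saveAsTable("SALES_SUMMARY"); that wiring is an untested assumption.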
Quick Migration: Code is essentially the same as Spark/PySpark, with no need to learn a new language.
Cheaper: Computation is fully serverless, scales up or down in under a second, and runs (and incurs costs) only when in use.
Sub-Second Instant Scaling: For the same computation task, the XSMALL virtual compute cluster (VCluster) takes about 75 seconds, while the MEDIUM size takes only about 20 seconds.
Faster: Eliminates all unnecessary data movement, resulting in shorter computation times and lower costs.
Easier to Use: Less manual effort is required, because both compute and storage need almost no maintenance.
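The timings in the list above translate into a speedup ratio with simple arithmetic. The sketch uses only the approximate figures quoted there (about 75 seconds on XSMALL and about 20 seconds on MEDIUM); whether the faster run is also cheaper depends on how each cluster size is billed, which it does not model.

```python
def speedup(baseline_seconds: float, scaled_seconds: float) -> float:
    """How many times faster the scaled run is than the baseline run."""
    return baseline_seconds / scaled_seconds


# Approximate timings quoted above: XSMALL ~75 s, MEDIUM ~20 s.
xsmall_s, medium_s = 75.0, 20.0
print(f"speedup: {speedup(xsmall_s, medium_s):.2f}x")  # speedup: 3.75x
print(f"time saved: {xsmall_s - medium_s:.0f} s")      # time saved: 55 s
```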