Using Python for Bulk Data Upload (BulkLoad)
Clickzetta Lakehouse provides a Python API for bulk data upload (BulkLoad) through the clickzetta-connector package. The API sends data directly from the client to the storage system without consuming computational resources during the transfer; the data becomes visible only after an explicit commit (the commit itself consumes a small amount of computational resources). It is suitable for scenarios with high throughput and relatively relaxed requirements on data freshness.
The BulkLoad API supports both single-threaded and distributed data upload scenarios.
Installation
Install clickzetta-ingestion-python, choosing the cloud environment packages as needed:
Type | Command | Comment |
---|---|---|
all | pip install "clickzetta-ingestion-python[all]" | Install all cloud environment packages |
s3 | pip install "clickzetta-ingestion-python[s3]" | Install Amazon cloud environment package |
amazon | pip install "clickzetta-ingestion-python[amazon]" | Install Amazon cloud environment package |
aws | pip install "clickzetta-ingestion-python[aws]" | Install Amazon cloud environment package |
oss | pip install "clickzetta-ingestion-python[oss]" | Install Alibaba Cloud environment package |
aliyun | pip install "clickzetta-ingestion-python[aliyun]" | Install Alibaba Cloud environment package |
cos | pip install "clickzetta-ingestion-python[cos]" | Install Tencent Cloud environment package |
tencent | pip install "clickzetta-ingestion-python[tencent]" | Install Tencent Cloud environment package |
gcp | pip install "clickzetta-ingestion-python[gcp]" | Install Google Cloud environment package |
pip install "clickzetta-ingestion-python[google]" | Install Google Cloud environment package |
Batch Import Principle
The batch upload SDK provides an efficient data import mechanism suitable for Singdata Lakehouse. Below is a simplified description and flowchart of its working principle:
- Data Upload: Through the SDK, your data is first uploaded to the object storage service. The performance of this step is affected by local network speed and the number of concurrent connections.
- Trigger Import: After the data upload is complete, calling the `bulkloadStream.commit()` method makes the SDK trigger an SQL command that imports the data from the object storage into the Lakehouse table. It is not recommended to call `bulkloadStream.commit()` frequently within one task; call it only once, at the end.
- Computing Resources: A General Purpose Virtual Cluster is recommended for uploading data, since general-purpose computing resources are better suited to running batch jobs and data-loading jobs. The speed of importing data from object storage into the Lakehouse table depends on the size of the computing resources you configure.
- Shard Upload Optimization: When processing compressed data larger than 1 GB, it is recommended to assign a unique shard ID to each concurrent thread or process in the `createRow` method. This fully exploits the parallelism of multithreading or multiprocessing and significantly improves data import efficiency. The best practice is to derive the number of shard IDs from the number of concurrent workers, so that each worker has its own shard ID; if multiple workers are assigned the same shard ID, the data written later may overwrite what was written earlier, losing the previous data. To ensure that all shard data is correctly imported into the table, call the `bulkloadStream.commit()` method once, after all concurrent workers have finished (see the sketch after this list).
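A minimal sketch of this pattern, assuming the stream object described below; method names such as `open_writer()`, `create_row()`, `set_value()` and `write()` are assumptions about the SDK, not confirmed signatures:

```python
from concurrent.futures import ThreadPoolExecutor

def upload_shard(bulkload_stream, shard_id, rows):
    # One writer per worker, identified by a unique shard ID.
    writer = bulkload_stream.open_writer(shard_id)
    for record in rows:
        row = writer.create_row()
        for column, value in record.items():
            row.set_value(column, value)
        writer.write(row)
    writer.close()  # close every writer before committing

def upload_all(bulkload_stream, shards):
    # shards: one batch of rows per concurrent worker
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        for shard_id, rows in enumerate(shards):
            pool.submit(upload_shard, bulkload_stream, shard_id, rows)
    bulkload_stream.commit()  # a single commit after all writers have closed
```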
Below is the flowchart of the batch import principle:
Single-threaded Write
Assuming the target table for uploading data is `public.bulkload_test`, the DDL is as follows:
Complete sample code for single-threaded mode:
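Below is a minimal sketch of the single-threaded flow. It assumes a hypothetical two-column table (`id INT, name STRING`); the import path and method names such as `connect()`, `create_bulkload_stream()`, `open_writer()`, `create_row()`, `set_value()` and `write()` are assumptions, so check the clickzetta-connector reference for the exact signatures.

```python
import clickzetta
from clickzetta.bulkload.bulkload_enums import BulkLoadOperation  # assumed module path

# 1. Create the connection (all values are placeholders).
conn = clickzetta.connect(
    username='your_username',
    password='your_password',
    service='region.api.singdata.com',
    instance='your_instance',
    workspace='your_workspace',
    vcluster='your_vcluster',
    schema='public',
)

# 2. Create a BulkLoad stream for public.bulkload_test
#    (assumed columns: id INT, name STRING).
bulkload_stream = conn.create_bulkload_stream(
    schema='public',
    table='bulkload_test',
    operation=BulkLoadOperation.APPEND,
)

# 3. Single-threaded, so one writer with ID 0; write a few rows.
writer = bulkload_stream.open_writer(0)
for i in range(10):
    row = writer.create_row()
    row.set_value('id', i)
    row.set_value('name', f'name_{i}')
    writer.write(row)
writer.close()  # ensure all buffered data is flushed

# 4. Commit once; the data becomes visible after a successful commit.
bulkload_stream.commit()
```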
API Step-by-Step Explanation
- Create a `connection` object, replacing the parameters according to your actual situation:
Parameter | Required | Description |
---|---|---|
username | Y | Username |
password | Y | Password |
service | Y | Address for connecting to the Lakehouse, e.g. region.api.singdata.com. It can be found in the JDBC connection string under Lakehouse Studio Management -> Workspace |
instance | Y | Can be found in the JDBC connection string under Lakehouse Studio Management -> Workspace |
workspace | Y | Workspace in use |
vcluster | Y | VC in use |
schema | Y | Name of the schema being accessed |
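A sketch of this step, assuming the connector exposes a `connect()` helper (all values are placeholders):

```python
import clickzetta

conn = clickzetta.connect(
    username='your_username',
    password='your_password',
    service='region.api.singdata.com',   # Lakehouse API address
    instance='your_instance',            # taken from the JDBC connection string
    workspace='your_workspace',
    vcluster='your_vcluster',
    schema='public',
)
```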
- Create a `BulkLoad Stream` object, specifying the target table for the upload, the upload method, and so on:
  - Required parameter:
    - `table`: table name.
  - Optional parameters:
    - `schema`: if not specified, the `schema` specified by the `connection` is used.
    - `operation`:
      - `BulkLoadOperation.APPEND`: incremental mode (all written data is treated as new data, without affecting old data).
      - `BulkLoadOperation.OVERWRITE`: overwrite mode (clears old table data and writes the new data into the table).
    - `partition_spec`: if there are no partition columns, ignore this parameter. If there are partition columns, fill it in using the corresponding format, for example `'pt=xx,item=xx'`, separated by commas. Note that if partition column values are specified, the partition columns of the table being written to take the values defined by this parameter, regardless of the partition column values in the data source.
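For example (a sketch; `create_bulkload_stream` and its keyword arguments are assumptions based on the parameters listed above):

```python
from clickzetta.bulkload.bulkload_enums import BulkLoadOperation  # assumed module path

# Append to a partitioned table; every written row lands in the partition
# defined by partition_spec, regardless of the source values.
bulkload_stream = conn.create_bulkload_stream(
    schema='public',
    table='bulkload_test',
    operation=BulkLoadOperation.APPEND,
    partition_spec='pt=xx,item=xx',
)
```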
- Create a `writer` and write data: each `bulkload stream` can create multiple `writers`, and different `writers` must be identified by different IDs. Using multiple `writers` enables multi-threaded concurrent writing or distributed concurrent writing within a single commit.
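For example (the `open_writer()` name is an assumption; the point is that each writer carries its own ID):

```python
# Each writer within the same bulkload stream needs a distinct ID.
writer_0 = bulkload_stream.open_writer(0)
writer_1 = bulkload_stream.open_writer(1)
```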
- Writing Data: the data written by a writer directly forms the corresponding Parquet files in the storage system. The writer automatically splits the files based on the amount of data written. When a writer finishes writing, explicitly call `writer.close()` to ensure data integrity.
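For example (`create_row()`, `set_value()` and `write()` are assumed names matching the `createRow` step described above):

```python
for record in [{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}]:
    row = writer_0.create_row()
    for column, value in record.items():
        row.set_value(column, value)
    writer_0.write(row)
writer_0.close()  # must be closed before the stream is committed
```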
- Submit the stream. Before committing, make sure that all writers have finished writing and have been closed. After a successful commit, the data is visible in the table.
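For example (using the stream object created above):

```python
# Commit once, after every writer has been closed; the data then becomes visible.
bulkload_stream.commit()
```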
Distributed Mode Writing
A bulkload stream is identified by a stream ID (a UUID). Another process can obtain the stream object through the connection's `get_bulkload_stream` method and create its own writer on it, which enables distributed writing.
Example code
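A minimal sketch of the distributed flow, assuming a hypothetical driver/worker split: the driver creates the stream and distributes its stream ID, each worker re-opens the stream with `get_bulkload_stream` and writes one shard, and the driver commits once after all workers finish. Names such as `create_bulkload_stream()`, `get_stream_id()`, `open_writer()`, `create_row()` and `set_value()`, as well as the exact `get_bulkload_stream` signature, are assumptions; check the SDK reference.

```python
import clickzetta
from clickzetta.bulkload.bulkload_enums import BulkLoadOperation  # assumed module path

def make_connection():
    # Placeholders; replace with your own environment settings.
    return clickzetta.connect(
        username='your_username', password='your_password',
        service='region.api.singdata.com', instance='your_instance',
        workspace='your_workspace', vcluster='your_vcluster', schema='public')

# --- Driver process: create the stream and hand its ID to the workers. ---
def driver():
    conn = make_connection()
    stream = conn.create_bulkload_stream(
        schema='public', table='bulkload_test',
        operation=BulkLoadOperation.APPEND)
    stream_id = stream.get_stream_id()  # assumed accessor for the stream's UUID
    # ... distribute stream_id to the workers and wait for them to finish ...
    stream.commit()  # single commit after all workers have closed their writers
    return stream_id

# --- Worker process: re-open the stream by ID and write one shard. ---
def worker(stream_id, shard_id, rows):
    conn = make_connection()
    stream = conn.get_bulkload_stream(  # assumed signature
        schema='public', table='bulkload_test', stream_id=stream_id)
    writer = stream.open_writer(shard_id)  # unique shard ID per worker
    for record in rows:
        row = writer.create_row()
        for column, value in record.items():
            row.set_value(column, value)
        writer.write(row)
    writer.close()
```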