Using Python to Upload Data in Batch (BulkLoadV1)
Singdata Lakehouse provides a Python API for batch data upload (Bulkload) through the `clickzetta-connector` package. With this API, data is sent directly from the client to the storage system. The transfer itself consumes no compute resources. The data becomes visible only after an explicit commit, and the commit consumes a small amount of compute resources. Bulkload is suited to high-throughput scenarios where data freshness requirements are relatively relaxed.
The Bulkload APIs support single-threaded data upload.
Installation
If you have an older version of the SDK installed, uninstall it first to avoid conflicts:
Install the latest version (requires Python >= 3.7):
Bulk Import Principles
The Bulkload SDK provides an efficient mechanism for importing data into Singdata Lakehouse. Below is a simplified description of how it works, followed by a flow diagram:
- Data Upload: Through the SDK, your data is first uploaded to the object storage service. The performance of this step is affected by local network speed and the number of concurrent connections.
- Trigger Import: After the data upload is complete, calling the `bulkloadStream.commit()` method makes the SDK automatically run a SQL command. That command imports the data from object storage into the Lakehouse table. Do not call `bulkloadStream.commit()` repeatedly within a single task; call it only once, at the end.
- Compute Resources: A General Purpose Virtual Cluster is recommended for uploading data, because general purpose compute resources are better suited to batch jobs and data-loading jobs. The speed of importing data from object storage into the Lakehouse table depends on the size of the configured compute resources.
- Shard Upload Optimization: When processing compressed data larger than 1GB, assign a unique shard ID to each concurrent thread or process in the `createRow` method. This takes full advantage of multi-threaded or multi-process parallelism and can significantly improve import efficiency. As a best practice, use as many shard IDs as there are concurrent workers, so that each worker has its own shard ID. If multiple concurrent workers share the same shard ID, later writes may overwrite earlier ones, and the previously written data is lost. To make sure the data from all shards is imported into the table, call the `bulkloadStream.commit()` method once, after all concurrent operations are complete, to commit the entire import task.
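The overwrite hazard in the last item can be modeled without a Lakehouse connection. This is a minimal sketch that assumes an upload under a given shard ID replaces any earlier upload under the same ID. `ShardStore`, `split_into_chunks`, and `upload_in_parallel` are hypothetical helpers and are not part of `clickzetta-connector`. Workers run on a `concurrent.futures.ThreadPoolExecutor`, and the single commit step runs only after every worker has finished.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ShardStore:
    """Hypothetical model: one slot per shard ID; a re-upload replaces the slot."""

    def __init__(self):
        self._lock = threading.Lock()
        self._shards = {}

    def upload(self, shard_id, rows):
        with self._lock:
            # Same shard ID -> the rows stored earlier under that ID are lost.
            self._shards[shard_id] = list(rows)

    def commit(self):
        # Called once, after all workers are done: gather every shard's rows.
        return [row for sid in sorted(self._shards) for row in self._shards[sid]]


def split_into_chunks(rows, num_workers):
    """Deal rows round-robin into one chunk per worker."""
    return [rows[i::num_workers] for i in range(num_workers)]


def upload_in_parallel(rows, num_workers, shard_ids):
    store = ShardStore()
    chunks = split_into_chunks(rows, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = [pool.submit(store.upload, sid, chunk)
                   for sid, chunk in zip(shard_ids, chunks)]
        for future in futures:
            future.result()  # surface worker errors before committing
    return store.commit()


rows = list(range(8))
# One unique shard ID per worker: every row survives.
print(len(upload_in_parallel(rows, 4, shard_ids=[0, 1, 2, 3])))  # 8
# Two workers share shard ID 0: one of their chunks is silently dropped.
print(len(upload_in_parallel(rows, 4, shard_ids=[0, 0, 2, 3])))  # 6
```

Deriving each shard ID from the worker index, as `[0, 1, 2, 3]` does here, guarantees uniqueness. The second call shows the silent data loss that a collision causes.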
The following is a flow diagram of the bulk import principle:
Single-threaded Writing
Assume the target table for the upload is `public.bulkload_test`, with the following DDL:
Complete sample code for single-threaded mode:
API Step-by-Step Explanation
- Create the `connection` object, replacing the parameters with the values for your environment:
| Parameter | Required | Description |
|---|---|---|
| username | Y | Username |
| password | Y | Password |
| service | Y | The address used to connect to Lakehouse, e.g., <region_id>.api.singdata.com. You can find it in the JDBC connection string in Lakehouse Studio under Management -> Workspace |
| instance | Y | The Lakehouse instance. You can find it in the JDBC connection string in Lakehouse Studio under Management -> Workspace |
| workspace | Y | Workspace to use |
| vcluster | Y | Virtual cluster (VC) to use |
| schema | Y | Schema name to access |
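Rather than hard-coding credentials, you can read the parameters in the table from environment variables and check them before connecting. This is a minimal sketch. The `LAKEHOUSE_*` variable names and the `load_connection_params` helper are hypothetical, and the sample values are placeholders.

```python
import os

# Every parameter in the connection table is marked as required ("Y").
REQUIRED_PARAMS = ("username", "password", "service", "instance",
                   "workspace", "vcluster", "schema")


def load_connection_params(env=None, prefix="LAKEHOUSE_"):
    """Read connection parameters from environment variables.

    The naming convention (LAKEHOUSE_USERNAME, LAKEHOUSE_PASSWORD, ...) is
    hypothetical. Raises ValueError naming every required parameter that is
    missing or empty, so a misconfiguration fails before any network call.
    """
    env = os.environ if env is None else env
    params = {name: env.get(prefix + name.upper(), "") for name in REQUIRED_PARAMS}
    missing = [name for name, value in params.items() if not value]
    if missing:
        raise ValueError("missing connection parameters: " + ", ".join(missing))
    return params


# Placeholder values; replace them with the details for your workspace.
sample_env = {
    "LAKEHOUSE_USERNAME": "my_user",
    "LAKEHOUSE_PASSWORD": "my_password",
    "LAKEHOUSE_SERVICE": "<region_id>.api.singdata.com",
    "LAKEHOUSE_INSTANCE": "my_instance",
    "LAKEHOUSE_WORKSPACE": "my_workspace",
    "LAKEHOUSE_VCLUSTER": "my_vcluster",
    "LAKEHOUSE_SCHEMA": "public",
}
params = load_connection_params(sample_env)
```

The resulting dictionary is meant to be unpacked as keyword arguments into the connector's connection call. Confirm the exact function name and accepted keywords against the `clickzetta-connector` sample code for your version.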
- Create the `BulkLoad Stream` object, specifying the target table and the upload mode:
  - Required parameters:
    - `table`: Table name
  - Optional parameters:
    - `schema`: If not specified, the `schema` specified in the `connection` object is used.
    - `operation`:
      - `BulkLoadOperation.APPEND`: Append mode (all written data is treated as new data, with no impact on existing data)
      - `BulkLoadOperation.OVERWRITE`: Overwrite mode (clears the existing table data and writes the new data to the table)
    - `partition_spec`: Specifies partition information for the target table and controls how written data is partitioned.
      - Non-partitioned table: Omit this parameter or leave it empty.
      - Partitioned table:
        - Static partition write: All data is written to one designated, fixed partition. The `partition_spec` value is used when writing to the target table, regardless of the actual values in the partition columns of the source data. The format is 'partition_col1=value1,partition_col2=value2'.
        - Dynamic partition write: Each row is written to the partition that matches the actual values of its partition columns. Omit this parameter, and the system automatically creates or writes to the appropriate partitions.
- Create a writer and write data: Each bulkload stream can create multiple writers, and different writers must be identified by different IDs. Using multiple writers enables multi-threaded concurrent writing within a single commit.
  - Write data: Data written through a writer is stored directly in the storage system as Parquet files, and the writer automatically splits files based on the amount of data written. After a writer finishes writing, you must explicitly call `writer.close()` to ensure data integrity.
- Commit the stream: Before committing, make sure all writers have finished writing and have been closed. After a successful commit, the data becomes visible in the table.
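The stream and writer rules above can be checked end to end with an in-memory stand-in that needs no Lakehouse connection. These rules are: a distinct ID per writer, `close()` before commit, exactly one commit, and the effect of `operation` and a static `partition_spec`. Everything in this sketch is hypothetical. `StubStream`, `StubWriter`, `write_chunk`, and `load_concurrently` are illustrative names, and the local `BulkLoadOperation` enum only mirrors the two member names documented here. Consult the connector's own sample code for the real class and method names before adapting the pattern.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from enum import Enum


class BulkLoadOperation(Enum):
    """Local stand-in mirroring the two documented modes (not the connector's enum)."""
    APPEND = "append"
    OVERWRITE = "overwrite"


def parse_partition_spec(spec):
    """'col1=v1,col2=v2' -> {'col1': 'v1', 'col2': 'v2'}; empty -> {} (dynamic write)."""
    return dict(pair.split("=", 1) for pair in spec.split(",")) if spec else {}


class StubWriter:
    def __init__(self, stream, writer_id):
        self.stream, self.writer_id = stream, writer_id
        self.rows, self.closed = [], False

    def write(self, row):
        if self.closed:
            raise RuntimeError("writer already closed")
        # Static partition write: spec values override the row's own partition columns.
        self.rows.append({**row, **self.stream.static_partition})

    def close(self):
        with self.stream.lock:
            self.stream.staged[self.writer_id] = self.rows
        self.closed = True


class StubStream:
    """Hypothetical stand-in for a bulkload stream; rows become visible only on commit()."""

    def __init__(self, table_rows, operation=BulkLoadOperation.APPEND, partition_spec=""):
        self.table_rows = table_rows  # current table contents, mutated on commit
        self.operation = operation
        self.static_partition = parse_partition_spec(partition_spec)
        self.lock = threading.Lock()
        self.writers, self.staged = {}, {}

    def open_writer(self, writer_id):
        with self.lock:
            if writer_id in self.writers:
                raise ValueError(f"writer ID {writer_id} is already in use")
            writer = self.writers[writer_id] = StubWriter(self, writer_id)
        return writer

    def commit(self):
        if not all(w.closed for w in self.writers.values()):
            raise RuntimeError("close every writer before committing")
        new_rows = [row for wid in sorted(self.staged) for row in self.staged[wid]]
        if self.operation is BulkLoadOperation.OVERWRITE:
            self.table_rows.clear()  # documented OVERWRITE: existing data is cleared
        self.table_rows.extend(new_rows)


def write_chunk(stream, writer_id, rows):
    writer = stream.open_writer(writer_id)  # one distinct ID per worker
    try:
        for row in rows:
            writer.write(row)
    finally:
        writer.close()  # always close, even if a write fails


def load_concurrently(stream, rows, num_workers):
    chunks = [rows[i::num_workers] for i in range(num_workers)]
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = [pool.submit(write_chunk, stream, wid, chunk)
                   for wid, chunk in enumerate(chunks)]
        for future in futures:
            future.result()  # re-raise worker errors instead of committing
    stream.commit()  # exactly once, after every writer is closed


table = [{"id": 0, "dt": "2023-12-31"}]
stream = StubStream(table, BulkLoadOperation.APPEND, partition_spec="dt=2024-01-01")
load_concurrently(stream, [{"id": i, "dt": "ignored"} for i in range(1, 5)], num_workers=2)
print(len(table))  # 5
```

The `try`/`finally` in `write_chunk` guarantees each writer is closed even when a write fails. Meanwhile, `future.result()` re-raises that failure, so the commit is skipped rather than publishing a partial load.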


