Batch Upload Data Using Java SDK
This document describes how to use the Java SDK's BulkloadStream to batch load data into Lakehouse. It is suited to one-off imports of large volumes of data, supports custom data sources, and gives you full control over the import logic. The example below reads a local file; if your data source is already covered by object storage or Lakehouse Studio data integration, the COPY command or data integration is recommended instead.
Reference Documentation
Java SDK Bulk Data Upload
Application Scenarios
- Suitable for business scenarios that require bulk uploading of large amounts of data.
- Suitable for developers familiar with Java and needing to customize data import logic.
Usage Restrictions
- BulkloadStream does not support writing to primary key (pk) tables.
- Not suitable for frequent uploads where the interval between loads is less than five minutes.
Usage Example
This example reads a local CSV file: the olist_order_items_dataset data from the Brazilian E-commerce public dataset. As noted above, prefer the COPY command or data integration when the data source is supported by object storage or Lakehouse Studio.
Prerequisites
- Create the target table:
create table bulk_order_items (
    order_id STRING,
    order_item_id INT,
    product_id STRING,
    seller_id STRING,
    shipping_limit_date STRING,
    price DOUBLE,
    freight_value DOUBLE
);
- Have INSERT permission on the target table.
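If the connected user does not yet have write access, INSERT can usually be granted by an account with sufficient privileges. The statement below is only a sketch assuming a standard SQL-style GRANT and a hypothetical user name load_user; the exact privilege and principal syntax may differ in your environment, so refer to the Lakehouse permission management documentation.
-- Hypothetical example (syntax may vary): grant INSERT on the target table to the user that runs the bulkload
GRANT INSERT ON TABLE public.bulk_order_items TO USER load_user;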
Developing with Java Code
Maven Dependency
Add the Lakehouse Maven dependency to the project's pom.xml file. The latest version of the dependency can be found in the Maven repository.
<dependency>
    <groupId>com.clickzetta</groupId>
    <artifactId>clickzetta-java</artifactId>
    <version>1.3.1</version>
</dependency>
Writing Java Code
- Initialize Lakehouse Client and BulkloadStream: Create the BulkloadFile class, and initialize the Lakehouse connection and the BulkloadStream object.
- Read Local CSV File and Write to Lakehouse: Use Java IO streams to read the local CSV file and write the data to Lakehouse line by line.
import com.clickzetta.client.BulkloadStream;
import com.clickzetta.client.ClickZettaClient;
import com.clickzetta.client.RowStream;
import com.clickzetta.client.StreamState;
import com.clickzetta.platform.client.api.Row;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.text.MessageFormat;

public class BulkloadFile {
    private static ClickZettaClient client;
    private static final String password = "";
    private static final String table = "bulk_order_items";
    private static final String workspace = "";
    private static final String schema = "public";
    private static final String vc = "default";
    private static final String user = "";
    static BulkloadStream bulkloadStream;

    public static void main(String[] args) throws Exception {
        initialize();
        File csvFile = new File("olist_order_items_dataset.csv");
        BufferedReader reader = new BufferedReader(new FileReader(csvFile));
        // Skip the header row
        reader.readLine();
        // Insert data into the table
        String line;
        while ((line = reader.readLine()) != null) {
            // Note: a plain split on "," assumes no field contains an embedded comma;
            // use a CSV parser for data with quoted fields
            String[] values = line.split(",");
            // Ensure type conversion is consistent with the server-side column types
            String orderId = values[0];
            int orderItemId = Integer.parseInt(values[1]); // Convert order_item_id to int
            String productId = values[2];
            String sellerId = values[3];
            String shippingLimitDate = values[4];
            double price = Double.parseDouble(values[5]);
            double freightValue = Double.parseDouble(values[6]);

            Row row = bulkloadStream.createRow();
            // Set column values by ordinal position
            row.setValue(0, orderId);
            row.setValue(1, orderItemId);
            row.setValue(2, productId);
            row.setValue(3, sellerId);
            row.setValue(4, shippingLimitDate);
            row.setValue(5, price);
            row.setValue(6, freightValue);
            // apply() must be called, otherwise the row is not sent to the server
            bulkloadStream.apply(row);
        }
        // Close resources
        reader.close();
        bulkloadStream.close();
        waitForBulkloadCompletion();
        client.close();
        System.out.println("Data inserted successfully!");
    }

    private static void initialize() throws Exception {
        String url = MessageFormat.format("jdbc:clickzetta://jnsxwfyr.uat-api.clickzetta.com/{0}?" +
                        "schema={1}&username={2}&password={3}&virtualcluster={4}",
                workspace, schema, user, password, vc);
        client = ClickZettaClient.newBuilder().url(url).build();
        bulkloadStream = client.newBulkloadStreamBuilder().schema(schema).table(table)
                .operate(RowStream.BulkLoadOperate.APPEND)
                .build();
    }

    private static void waitForBulkloadCompletion() throws InterruptedException {
        // Poll until the bulkload stream finishes committing on the server
        while (bulkloadStream.getState() == StreamState.RUNNING) {
            Thread.sleep(1000);
        }
        if (bulkloadStream.getState() == StreamState.FAILED) {
            throw new RuntimeException(bulkloadStream.getErrorMessage());
        }
    }
}
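After the program prints the success message, a quick way to confirm the load is to run a count query against the target table from Lakehouse Studio or any SQL client connected to the same workspace. This is only a sanity-check sketch; the expected count is the number of data rows in olist_order_items_dataset.csv, excluding the header.
-- Verify that the bulkload landed the expected number of rows
SELECT COUNT(*) FROM public.bulk_order_items;
-- Optionally spot-check a few rows
SELECT * FROM public.bulk_order_items LIMIT 10;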