A Comprehensive Guide to Importing Data into Singdata Lakehouse
Data Ingestion: Batch and Real-time Data Loading via the Java SDK
Overview
Singdata Lakehouse provides a Java SDK that allows data to be loaded into Singdata Lakehouse tables through Java and SQL programming in popular IDEs (such as VS Code).
Use Cases
This method facilitates batch data loading and is suitable for large-scale data ingestion and real-time uploads in a Java programming environment, as Singdata Lakehouse has been optimized for writing large volumes of data.
Implementation Steps
Download Code
Download the code for this guide from the GitHub repository to your local machine (skip this step if you have already downloaded it).
Add the directory to VS Code.

Modify Parameters
Rename the config/config-ingest-sample.json file to config/config-ingest.json, and modify the parameter values in config-ingest.json.
{
  "username": "Please enter your username",
  "password": "Please enter your password",
  "service": "Please enter your service address, e.g., api.clickzetta.com",
  "instance": "Please enter your instance ID",
  "workspace": "Please enter your workspace, e.g., gharchive",
  "schema": "Please enter your schema, e.g., public",
  "vcluster": "Please enter your virtual cluster, e.g., default_ap",
  "sdk_job_timeout": 10,
  "hints": {
    "sdk.job.timeout": 3,
    "query_tag": "a_comprehensive_guide_to_ingesting_data_into_clickzetta"
  }
}
Batch Loading
Run BulkLoadFile.java in VS Code:

import com.clickzetta.client.BulkloadStream; import com.clickzetta.client.ClickZettaClient; import com.clickzetta.client.RowStream; import com.clickzetta.client.StreamState; import com.clickzetta.platform.client.api.Row; import org.json.JSONObject; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; import java.text.MessageFormat; import java.nio.file.Files; import java.nio.file.Paths; import org.apache.log4j.PropertyConfigurator; public class BulkloadFile { private static ClickZettaClient client; private static String service; private static String instance; private static String password; private static String table = "lift\_tuckets\_import\_by\_java\_sdk\_bulkload"; private static String workspace; private static String schema; private static String vc; private static String user; static BulkloadStream bulkloadStream; public static void main(String\[] args) throws Exception { // Load log4j configuration file PropertyConfigurator.configure("config/log4j.properties"); // Read configuration file String content = new String(Files.readAllBytes(Paths.get("config/config-ingest.json"))); JSONObject config = new JSONObject(content); // Get values from JSON configuration file service = config.getString("service"); instance = config.getString("instance"); password = config.getString("password"); workspace = config.getString("workspace"); schema = config.getString("schema"); vc = config.getString("vcluster"); user = config.getString("username"); // Initialize initialize(); // Count the number of lines in the file int fileLineCount = countFileLines("data/lift\_tickets\_data.csv"); System.out.println("Total lines in file: " + fileLineCount); // Create table createTable(); // If the target table exists, delete the data in the table deleteTableData(); // Insert data insertData(); // Check the number of 
rows in the table int tableRowCount = countTableRows(); System.out.println("Total rows in table: " + tableRowCount); // Compare the number of lines in the file with the number of rows in the table if (fileLineCount == tableRowCount) { System.out.println("Data inserted successfully! The row count matches."); } else { System.out.println("Data insertion failed! The row count does not match."); } // Close client client.close(); } private static void initialize() throws Exception { String url = MessageFormat.format("jdbc\:clickzetta://{1}.{0}/{2}?" + "schema={3}\&username={4}\&password={5}\&virtualcluster={6}&", service, instance, workspace, schema, user, password, vc); client = ClickZettaClient.newBuilder().url(url).build(); } private static int countFileLines(String filePath) throws Exception { try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { int lines = 0; while (reader.readLine() != null) lines++; return lines - 1; // Subtract the header line } } private static void createTable() throws Exception { String url = MessageFormat.format("jdbc\:clickzetta://{1}.{0}/{2}?" 
+ "schema={3}\&username={4}\&password={5}\&virtualcluster={6}&", service, instance, workspace, schema, user, password, vc); String createTableSQL = "CREATE TABLE if not exists " + table + " (" + "\`txid\` string," + "\`rfid\` string," + "\`resort\` string," + "\`purchase\_time\` string," + "\`expiration\_time\` string," + "\`days\` int," + "\`name\` string," + "\`address\_street\` string," + "\`address\_city\` string," + "\`address\_state\` string," + "\`address\_postalcode\` string," + "\`phone\` string," + "\`email\` string," + "\`emergency\_contact\_name\` string," + "\`emergency\_contact\_phone\` string);"; try (Connection conn = DriverManager.getConnection(url, user, password); PreparedStatement pstmt = conn.prepareStatement(createTableSQL)) { pstmt.executeUpdate(); System.out.println("Table created successfully."); } catch (Exception e) { // Ignore the error and continue System.out.println("Ignoring exception: " + e.getMessage()); } } private static void deleteTableData() throws Exception { String url = MessageFormat.format("jdbc\:clickzetta://{1}.{0}/{2}?" + "schema={3}\&username={4}\&password={5}\&virtualcluster={6}&", service, instance, workspace, schema, user, password, vc); try (Connection conn = DriverManager.getConnection(url, user, password); PreparedStatement pstmt = conn.prepareStatement("DELETE FROM " + schema + "." 
+ table)) { pstmt.executeUpdate(); System.out.println("Data deleted successfully from table: " + table); } } private static void insertData() throws Exception { bulkloadStream = client.newBulkloadStreamBuilder().schema(schema).table(table) .operate(RowStream.BulkLoadOperate.APPEND) .build(); File csvFile = new File("data/lift\_tickets\_data.csv"); BufferedReader reader = new BufferedReader(new FileReader(csvFile)); // Skip the header line reader.readLine(); // Skip the first line (header) // Insert data into the database String line; while ((line = reader.readLine()) != null) { String\[] values = line.split(","); // Type conversion to keep consistent with the server-side type String id = values\[0]; // ID is of string type String contentValue = values\[1]; Row row = bulkloadStream.createRow(); // Set parameter values row\.setValue(0, id); row\.setValue(1, contentValue); // Must call this method, otherwise data cannot be sent to the server bulkloadStream.apply(row); } // Close resources reader.close(); bulkloadStream.close(); waitForBulkloadCompletion(); } private static int countTableRows() throws Exception { String url = MessageFormat.format("jdbc\:clickzetta://{1}.{0}/{2}?" + "schema={3}\&username={4}\&password={5}\&virtualcluster={6}&", service, instance, workspace, schema, user, password, vc); try (Connection conn = DriverManager.getConnection(url, user, password); Statement stmt = conn.createStatement()) { String countSQL = "SELECT COUNT(\*) FROM " + schema + "." + table; try (ResultSet rs = stmt.executeQuery(countSQL)) { if (rs.next()) { return rs.getInt(1); } else { throw new Exception("Failed to count table rows."); } } } } private static void waitForBulkloadCompletion() throws InterruptedException { while (bulkloadStream.getState() == StreamState.RUNNING) { Thread.sleep(1000); } if (bulkloadStream.getState() == StreamState.FAILED) { throw new ArithmeticException(bulkloadStream.getErrorMessage()); } } }
View the running results:
Real-time Loading
Run StreamingInsert.java in VS Code:
import com.clickzetta.client.ClickZettaClient; import com.clickzetta.client.RealtimeStream; import com.clickzetta.client.RowStream; import com.clickzetta.platform.client.api.Options; import com.clickzetta.platform.client.api.Row; import com.clickzetta.platform.client.api.Stream; import com.github.javafaker.Faker; import org.json.JSONObject; import org.apache.log4j.PropertyConfigurator; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; import java.text.MessageFormat; import java.util.Date; import java.util.Random; import java.util.UUID; import java.io.IOException; import java.util.Locale; public class StreamingInsert { private static ClickZettaClient client; private static String service; private static String instance; private static String password; private static String table = "lift\_tuckets\_import\_by\_java\_sdk\_realtime\_ingest"; private static String workspace; private static String schema; private static String vc; private static String user; static RealtimeStream realtimeStream; public static void main(String\[] args) throws Exception { // Load log4j configuration file PropertyConfigurator.configure("config/log4j.properties"); // Read configuration file String content = new String(Files.readAllBytes(Paths.get("config/config-ingest.json"))); JSONObject config = new JSONObject(content); // Get values from JSON configuration file service = config.getString("service"); instance = config.getString("instance"); password = config.getString("password"); workspace = config.getString("workspace"); schema = config.getString("schema"); vc = config.getString("vcluster"); user = config.getString("username"); // Initialize String url = MessageFormat.format("jdbc\:clickzetta://{1}.{0}/{2}?" 
+ "schema={3}\&username={4}\&password={5}\&virtualcluster={6}&", service, instance, workspace, schema, user, password, vc); Options options = Options.builder().build(); client = ClickZettaClient.newBuilder().url(url).build(); // Check and create target table checkAndCreateTable(url); realtimeStream = client.newRealtimeStreamBuilder() .operate(RowStream.RealTimeOperate.CDC) .options(options) .schema(schema) .table(table) .build(); Faker faker = new Faker(new Locale("zh", "CN")); String\[] resorts = {"Resort 1", "Resort 2", "Resort 3"}; Random random = new Random(); int duration = 200; int maxRetries = 3; // Record start time long startTime = System.currentTimeMillis(); System.out.println("Start time: " + new Date(startTime)); int totalRecords = 0; while (duration > 0) { for (int t = 1; t < 11; t++) { Row row = realtimeStream.createRow(Stream.Operator.UPSERT); row\.setValue("txid", UUID.randomUUID().toString()); row\.setValue("rfid", Long.toHexString(random.nextLong() & ((1L << 96) - 1))); row\.setValue("resort", faker.options().option(resorts)); row\.setValue("purchase\_time", new Date().toString()); row\.setValue("expiration\_time", new Date(System.currentTimeMillis() + 86400000L).toString()); row\.setValue("days", faker.number().numberBetween(1, 7)); row\.setValue("name", faker.name().fullName()); row\.setValue("address\_street", faker.address().streetAddress()); row\.setValue("address\_city", faker.address().city()); row\.setValue("address\_state", faker.address().state()); row\.setValue("address\_postalcode", faker.address().zipCode()); row\.setValue("phone", faker.phoneNumber().phoneNumber()); row\.setValue("email", faker.internet().emailAddress()); row\.setValue("emergency\_contact\_name", faker.name().fullName()); row\.setValue("emergency\_contact\_phone", faker.phoneNumber().phoneNumber()); int attempts = 0; boolean success = false; while (attempts < maxRetries && !success) { try { realtimeStream.apply(row); success = true; } catch (IOException e) { 
attempts++; System.err.println("Attempt " + attempts + " failed: " + e.getMessage()); if (attempts == maxRetries) { throw e; } Thread.sleep(1000); // Wait 1 second before retrying } } totalRecords++; } Thread.sleep(200); duration = duration - 1; } realtimeStream.close(); client.close(); // Record end time long endTime = System.currentTimeMillis(); System.out.println("End time: " + new Date(endTime)); // Calculate average records inserted per second double elapsedTimeInSeconds = (endTime - startTime) / 1000.0; double recordsPerSecond = totalRecords / elapsedTimeInSeconds; System.out.println("Total records inserted: " + totalRecords); System.out.println("Elapsed time (seconds): " + elapsedTimeInSeconds); System.out.println("Average records per second: " + recordsPerSecond); } private static void checkAndCreateTable(String url) throws Exception { String checkTableSQL = "SELECT 1 FROM " + schema + "." + table + " LIMIT 1"; String createTableSQL = "CREATE TABLE if not exists " + table + " (" + "\`txid\` string PRIMARY KEY," + "\`rfid\` string," + "\`resort\` string," + "\`purchase\_time\` string," + "\`expiration\_time\` string," + "\`days\` int," + "\`name\` string," + "\`address\_street\` string," + "\`address\_city\` string," + "\`address\_state\` string," + "\`address\_postalcode\` string," + "\`phone\` string," + "\`email\` string," + "\`emergency\_contact\_name\` string," + "\`emergency\_contact\_phone\` string);"; try (Connection conn = DriverManager.getConnection(url, user, password); Statement stmt = conn.createStatement()) { try (ResultSet rs = stmt.executeQuery(checkTableSQL)) { // If the table exists, do nothing } catch (Exception e) { // If the table does not exist, create the table try (PreparedStatement pstmt = conn.prepareStatement(createTableSQL)) { pstmt.executeUpdate(); System.out.println("Table created successfully."); } } } } }
View the running results:
Next Steps
View the imported data through Studio Data Management. Clean and transform the imported data. Explore and analyze the imported data through Data GPT.
Resources
Java SDK Introduction