Primary Key

Primary Key (PRIMARY KEY) ensures the uniqueness of each record in a table. In the Lakehouse architecture, when data is written in real time to a table with a defined primary key, the system automatically deduplicates records based on the primary key value. This is particularly important for Change Data Capture (CDC) scenarios: for example, you can synchronize MySQL binlog changes to the Lakehouse in real time while keeping the data consistent. Note, however, that once a primary key is set, the table no longer accepts insert, delete, or update operations through SQL statements, and columns cannot be added or removed; all data must be handled through the real-time data interface.
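
As an illustration (with hypothetical values), suppose three change events for the same primary key arrive in order on a table with PRIMARY KEY (id):

UPSERT (id=1, col='a')   -- row (1, 'a') is inserted
UPSERT (id=1, col='b')   -- the key already exists, so the row becomes (1, 'b')
DELETE (id=1)            -- the row is removed

Only the latest state of each primary key value is retained in the table.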

Primary Key Table Creation Syntax

  • Normal Table
CREATE TABLE pk_table
(
    id int,
    col string,
    PRIMARY KEY (id)
);
CREATE TABLE pk_table
(
    id int PRIMARY KEY,
    col string
);
  • Bucket table. If the created table contains bucket columns, the specified cluster key must include the primary key column, and the sort key must also include the primary key.
CREATE TABLE pk_table
(
    id int,
    col string,
    cluster_key string,
    PRIMARY KEY (id)
)
CLUSTERED BY (id, cluster_key) SORTED BY (id) INTO 16 BUCKETS;
  • Partition table. If the table contains partition columns, those columns must be included in the primary key.
CREATE TABLE pk_table
(
    id int,
    col string,
    pt string,
    PRIMARY KEY (id, pt)
) PARTITIONED BY (pt);

Usage Notes

  • If the created table contains bucket columns, the primary key column must be included when specifying the cluster key, and the sort key must also include the primary key.
  • If the created table has partition columns, those columns must be included in the primary key.
  • Currently, primary key tables can only be written through the real-time interface; SQL operations are not supported.

Usage Examples

Create Table

When creating a Lakehouse table for CDC writes, a primary key must be specified; writes are deduplicated based on the primary key to ensure data accuracy. A primary key table does not support SQL operations, including adding or modifying columns, and can only be written to through real-time streaming. Below is an example of creating a table:

CREATE TABLE igs_test_upsert (
    id int PRIMARY KEY,
    event varchar(100),
    event_time STRING
);
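
Because the table defines a primary key, direct SQL DML against it is rejected. For example, a statement like the following would fail (illustrative; the exact error depends on the version):

INSERT INTO igs_test_upsert VALUES (1, 'page_view', '2024-01-01 00:00:00');

Such data must instead be written through the real-time interface described below.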

SDK Real-time Write Stream

Create Real-time Write Stream

To create a real-time write stream using the SDK, you need to specify the operation type (CDC) and related options. Below is an example of creating a real-time write stream:

RowStream stream = client.newRealtimeStreamBuilder()
    .operate(RowStream.RealtimeOperate.CDC)
    .options(options)
    .schema(schema)
    .table(table)
    .build();
// Close the stream and release stream resources; this must be called when writing is finished
stream.close();
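
In practice, the stream is kept open while rows are written and closed afterwards, so that resources are released even if writing fails. A minimal lifecycle sketch, assuming only the builder calls shown above:

RowStream stream = client.newRealtimeStreamBuilder()
    .operate(RowStream.RealtimeOperate.CDC)
    .options(options)
    .schema(schema)
    .table(table)
    .build();
try {
    // ... create and submit rows here (see the write examples below) ...
} finally {
    stream.close(); // always release stream resources
}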

Specify Operation Type

Depending on your requirements, different operation types can be specified:

  • Stream.Operator.UPSERT: Insert or update a row. If the target row does not exist, insert it; if it already exists, update it.
  • Stream.Operator.DELETE_IGNORE: Delete a row. If the target row does not exist, it is automatically ignored.

Use Native Java SDK to Write

Row row = stream.createRow(Stream.Operator.UPSERT); // Insert or update row
Row rowToDelete = stream.createRow(Stream.Operator.DELETE_IGNORE); // Delete row
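
Putting the pieces together, a minimal write sketch might look like the following. The setValue and apply calls are hypothetical placeholders for the SDK's column-setter and row-submit methods; consult the Java Real-time Programming Interface reference for the exact names.

// Sketch only: setValue(...) and apply(...) are assumed method names, not confirmed SDK API
Row row = stream.createRow(Stream.Operator.UPSERT);
row.setValue("id", 1);                             // primary key column
row.setValue("event", "page_view");
row.setValue("event_time", "2024-01-01 00:00:00");
stream.apply(row);                                 // submit the upsert

Row rowToDelete = stream.createRow(Stream.Operator.DELETE_IGNORE);
rowToDelete.setValue("id", 2);                     // the primary key locates the row to delete
stream.apply(rowToDelete);                         // ignored if id=2 does not exist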

Using Lakehouse Real-time Sync Function to Write

Refer to the documentation: Multi-table Real-time Sync.

The Flink connector is built on the RealtimeStream SDK and is used for real-time data synchronization. See Flink Connector.

References

  • Java Real-time Programming Interface
  • Using Java SDK to Read Kafka Data for Real-time Upload
  • Singdata Lakehouse Multi-table Real-time Sync Implementing CDC