Real-time Write Principle
The SDK for real-time writing to Lakehouse is an efficient stream-processing tool that allows users to upload and store data in Lakehouse in real time. The working principle of real-time writing is as follows:
- SDK Uploads Data: Users upload data to Lakehouse's Ingestion Service in real time through the SDK.
- Ingestion Service Processing: After the Ingestion Service receives the data, it writes the data directly into the Lakehouse table. At this stage, the data is stored as temporary intermediate files, and the table in this state is called a hybrid table.
- Query Real-time Data: Before the data is committed, users can query (SELECT) the newly written real-time data, but this data is not visible to table streams, materialized views, or dynamic tables.
- Data Commit: Newly written data is automatically committed after about one minute. After the commit, table streams, materialized views, and dynamic tables can read this data.
- Hybrid Table Becomes Regular Table: After the data is committed, a background process merges the hybrid table into a regular table. Once the merge is complete, users can perform update operations (UPDATE/MERGE/DELETE).
Applicable Scenarios
The SDK for real-time writing to Lakehouse is suitable for the following scenarios:
- Short Interval Data Import: If your application scenario requires data to be imported at very short intervals (such as 5 minutes or less), the real-time write SDK can meet your needs.
- Frequent Small Data Submissions: For situations where data needs to be submitted frequently but the amount of data submitted each time is small, the real-time write SDK provides an efficient solution.
- Real-time Data Analysis: The real-time write SDK is suitable for applications that require immediate analysis and response to data, such as real-time monitoring, event tracking, and real-time reporting.
Notes
- Data written in real time can be queried within seconds.
- Real-time schema change detection is currently supported only through the schema-change-aware sink operator (single concurrency) provided internally by the Flink Connector. In other scenarios, to change the table structure you must first stop the real-time write task, and restart it only after a period of time (about 90 minutes) following the table structure change.
- Table streams, materialized views, and dynamic tables can only see committed data. Data written by a real-time task takes about one minute to be committed, so a table stream also needs to wait about one minute before it can see that data.
Create Real-time Data Stream via Client
To create a real-time data stream, you first need to use the ClickZetta client:
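A minimal sketch of the setup is shown below. The builder entry points (`ClickZettaClient.newBuilder()`, `newRealtimeStreamBuilder()`) and the connection URL format are assumptions based on common SDK conventions; check the SDK reference for the exact class and method names.

```java
// Sketch only: class names, builder methods, and the URL format below are assumptions.
ClickZettaClient client = ClickZettaClient.newBuilder()
        .url("<lakehouse-connection-url>")                 // placeholder connection string
        .build();

// Create a real-time data stream on the target table (append-only mode here).
RowStream stream = client.newRealtimeStreamBuilder()
        .operate(RowStream.RealTimeOperate.APPEND_ONLY)
        .schema("<schema>")
        .table("<table>")
        .build();
```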
Options
The following options can be passed to the real-time data stream to control write behavior. All of these parameters are optional, and the default values are recommended.
Parameter Description
Category | Parameter | Default Value | Description
---|---|---|---
Write performance | `withFlushMode` | `FlushMode.AUTO_FLUSH_BACKGROUND` | Data flush mode. `FlushMode.AUTO_FLUSH_SYNC`: wait for the previous flush to complete before the next write proceeds. `FlushMode.AUTO_FLUSH_BACKGROUND`: asynchronous flush; multiple writes can proceed without waiting for the previous write to complete.
Write performance | `withMutationBufferLinesNum` | 100 | Maximum number of rows accumulated in each buffer. When this limit is reached, the buffered data is flushed and submitted to the server. If a single import reaches the MB level, increase this value to speed up the import. Data is submitted when either `withMutationBufferLinesNum` or `withFlushInterval` is reached, whichever comes first.
Write performance | `withMutationBufferMaxNum` | 5 | Maximum number of buffers that can exist at the same time (while `withMutationBufferLinesNum` controls when a single buffer is sent). Even if a previous buffer has not finished writing, new data can continue to be written to new buffers as long as the buffer count does not exceed this limit, so data can be processed and sent with higher concurrency. In short, `withMutationBufferMaxNum` works like a JDBC connection pool.
Write performance | `withMutationBufferSpace` | 5 * 1024 * 1024 (5 MB) | Maximum size of each buffer. When this limit is reached, the buffered data is flushed and submitted to the server. If a single import reaches the MB level, increase this value to speed up the import. Data is submitted when either `withMutationBufferSpace` or `withMutationBufferLinesNum` is reached, whichever comes first.
Write performance | `withFlushInterval` | 10 * 1000 (10 seconds) | Flush interval in milliseconds. When this interval elapses, the buffered data is flushed and submitted to the server. Data is submitted when any of `withFlushInterval`, `withMutationBufferSpace`, or `withMutationBufferLinesNum` is reached, whichever comes first.
Retry mechanism | `withRequestFailedRetryEnable` | FALSE | Whether to enable the retry mechanism for failed requests (TRUE/FALSE).
Retry mechanism | `withRequestFailedRetryTimes` | 5 | Maximum number of retries for failed requests.
Retry mechanism | `withRequestFailedRetryInternalMs` | 5000 (5 seconds) | Interval between retries after a failure, in milliseconds.
Retry mechanism | `withRequestFailedRetryLogDebugEnable` | FALSE | Whether to enable debug logging for retries.
Retry mechanism | `withRequestFailedRetryStatus` | `RegisterStatus.RetryStatus.THROTTLED` | Error statuses that trigger a retry; multiple values are separated by commas. Supported values: `RegisterStatus.RetryStatus.THROTTLED`, `RegisterStatus.RetryStatus.INTERNAL_ERROR`, `RegisterStatus.RetryStatus.FAILED`, `RegisterStatus.RetryStatus.PRECHECK_FAILED`.
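For illustration, the parameters above can be assembled into an options object along the following lines. This is a minimal sketch: the `Options.builder()` entry point and the way the options object is attached to the stream builder are assumptions; only the `with...` setters themselves are taken from the table above.

```java
// Sketch only: Options.builder() and surrounding class names are assumptions;
// the with... setters correspond to the parameters in the table above.
Options options = Options.builder()
        .withFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)     // asynchronous background flush
        .withMutationBufferLinesNum(100)                     // flush a buffer after 100 rows
        .withMutationBufferSpace(5 * 1024 * 1024)            // ... or after 5 MB of buffered data
        .withFlushInterval(10 * 1000)                        // ... or every 10 seconds
        .withRequestFailedRetryEnable(true)                  // retry failed requests
        .withRequestFailedRetryTimes(5)
        .withRequestFailedRetryInternalMs(5000)
        .withRequestFailedRetryStatus(RegisterStatus.RetryStatus.THROTTLED)
        .build();
// The resulting options object is then passed to the real-time stream builder,
// e.g. client.newRealtimeStreamBuilder().options(options)... (builder name assumed).
```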
Write Data (Row)
Create a data object (Row) through the `stream.createRow` method, and encapsulate the data into the Row object through the `row.setValue` method.
```java
// Create a Row, set its column values, and apply it to the stream.
Row row = stream.createRow(Stream.Operator.INSERT);
row.setValue("id", t);                      // "t" is the record's id value (e.g., a loop counter)
row.setValue("name", String.valueOf(t));
stream.apply(row);
```
- When the Stream is created as `RowStream.RealTimeOperate.APPEND_ONLY`, only `Stream.Operator.INSERT` type Rows can be created.
- When the Stream is created as `RowStream.RealTimeOperate.CDC`, all of the following Row types can be used:
  - `Stream.Operator.INSERT`: Insert a row; an error is reported if the target row already exists.
  - `Stream.Operator.DELETE`: Delete a row; an error is reported if the target row does not exist.
  - `Stream.Operator.UPDATE`: Update a row; an error is reported if the target row does not exist.
  - `Stream.Operator.UPSERT`: Insert a row; if the target row already exists, update that row.
  - `Stream.Operator.INSERT_IGNORE`: Insert a row; if the target row already exists, it is automatically ignored.
Data Submission to Server
Call the `((RealtimeStream) stream).flush()` method to submit buffered data to the server. If `flush()` is not called, the data is sent to the server when any one of `withMutationBufferSpace`, `withMutationBufferLinesNum`, or `withFlushInterval` is reached, whichever comes first.
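A minimal sketch of an explicit flush; the `close()` calls are assumptions about the SDK's shutdown API.

```java
// Force any buffered rows to be submitted to the server immediately.
((RealtimeStream) stream).flush();

// When writing is finished, release resources. close() is assumed here;
// check the SDK reference for the exact shutdown calls.
stream.close();
client.close();
```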
Specific Cases
Append Write to Ordinary Table
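As a sketch of the end-to-end flow (builder entry points and the connection URL are placeholders and assumptions, as noted above): create an APPEND_ONLY stream, apply INSERT rows, and flush.

```java
// Sketch: append-only real-time write to an ordinary (non-primary-key) table.
// Builder entry points and the URL are assumptions; createRow/setValue/apply/flush
// follow the API shown earlier in this document.
ClickZettaClient client = ClickZettaClient.newBuilder()
        .url("<lakehouse-connection-url>")
        .build();
RowStream stream = client.newRealtimeStreamBuilder()
        .operate(RowStream.RealTimeOperate.APPEND_ONLY)
        .schema("<schema>")
        .table("<table>")
        .build();

for (int t = 0; t < 1000; t++) {
    Row row = stream.createRow(Stream.Operator.INSERT);  // APPEND_ONLY streams accept INSERT only
    row.setValue("id", t);
    row.setValue("name", String.valueOf(t));
    stream.apply(row);
}

((RealtimeStream) stream).flush();  // submit any remaining buffered rows
stream.close();                     // close() is assumed; see the SDK reference
client.close();
```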
Writing Vector Types
Creating a Table
Writing with Java
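A minimal sketch, assuming the table has a vector column named `embedding` and that `row.setValue` accepts a Java `float[]` for vector columns; verify the expected value type for vector columns in the SDK reference.

```java
// Sketch: writing a row that includes a vector column ("embedding" is a hypothetical column name).
// The float[] representation for vector values is an assumption.
Row row = stream.createRow(Stream.Operator.INSERT);
row.setValue("id", 1);
row.setValue("embedding", new float[] {0.12f, 0.34f, 0.56f, 0.78f});
stream.apply(row);
((RealtimeStream) stream).flush();
```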
CDC Real-time Writing
Lakehouse supports the CDC (Change Data Capture) function of databases, writing data into Lakehouse tables in a streaming manner and updating table data in real time. Synchronization is achieved by RealtimeStream applying insert and delete row operations in real time. Data can also be written using the Flink connector and the IGS SDK. When creating the table, a primary key must be set to ensure the uniqueness and consistency of the data.
Creating a Table
When creating a Lakehouse table for CDC writing, a primary key must be specified; CDC writes deduplicate data based on the primary key to ensure data accuracy. A primary key table created this way does not support SQL operations and can only be written to through real-time write streams; it also does not support adding or modifying columns using SQL. Below is an example of creating a table:
IGS SDK Real-time Write Stream
Create Real-time Write Stream
To create a real-time write stream using the IGS SDK, you need to specify the operation type (CDC) and related options. Below is an example of creating a real-time write stream:
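A minimal sketch is shown below (builder entry points are assumptions, as before; the operation type `RowStream.RealTimeOperate.CDC` and the options are as described in this document).

```java
// Sketch: creating a CDC real-time write stream on a primary-key table.
RowStream stream = client.newRealtimeStreamBuilder()   // builder name is an assumption
        .operate(RowStream.RealTimeOperate.CDC)         // CDC operation type
        .options(options)                               // optional write/retry options (see Options)
        .schema("<schema>")
        .table("<pk_table>")
        .build();
```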
Specify Operation Type
According to the requirements, different operation types can be specified:
- `Stream.Operator.UPSERT`: Insert or update a row. If the target row does not exist, it is inserted; if it already exists, it is updated.
- `Stream.Operator.DELETE_IGNORE`: Delete a row. If the target row does not exist, it is automatically ignored.
Use Native Java SDK to Write
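A minimal sketch of CDC-style writes using the stream created above; the operator types are those listed in this document, while the `close()` call is an assumption about the SDK's shutdown API.

```java
// UPSERT: insert the row, or update it if a row with the same primary key already exists.
Row upsert = stream.createRow(Stream.Operator.UPSERT);
upsert.setValue("id", 1);                 // primary-key column
upsert.setValue("name", "alice");
stream.apply(upsert);

// DELETE_IGNORE: delete the row; missing primary keys are silently ignored.
Row delete = stream.createRow(Stream.Operator.DELETE_IGNORE);
delete.setValue("id", 2);                 // identifies the row to delete by primary key
stream.apply(delete);

((RealtimeStream) stream).flush();        // submit buffered changes
stream.close();                           // close() is assumed; see the SDK reference
```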
Using Lakehouse Real-time Sync Function to Write
Refer to the documentation Multi-table Real-time Sync
Using Flink Connector to Write
The Flink connector is built on top of the RealtimeStream SDK and is used for real-time data synchronization. See Flink Connector