Function Overview
Through the flink-connector-lakehouse plugin, Lakehouse integrates seamlessly with Flink, enabling efficient data writing into Lakehouse. The plugin writes data through real-time ingestion interfaces, ensuring timely data processing.
Lakehouse provides two Flink Connector writing modes, igs-dynamic-table and igs-dynamic-table-append-only, to meet the needs of different scenarios.
- igs-dynamic-table: Supports append mode and Flink CDC scenarios, and writes to Lakehouse primary key tables. When Flink consumes CDC logs containing insert, update, and delete operations and the Lakehouse server-side table has primary key attributes, igs-dynamic-table applies the corresponding update and delete operations to the Lakehouse table.
- igs-dynamic-table-append-only: Suitable for scenarios where data must never be updated or deleted, and writes to regular (non primary key) tables in Lakehouse. Even when Flink CDC synchronizes data, this mode only appends rows, so existing data always remains in its original state and is unaffected by subsequent operations. If your goal is to avoid deletes and updates, igs-dynamic-table-append-only is the safer choice.
Category | Details |
---|---|
Supported Types | Result tables only; source tables and dimension tables are not supported |
Running Mode | Streaming mode |
Version Compatibility
Flink Version | Lakehouse Flink Connector Version |
---|---|
1.14, 1.15, 1.16, 1.17 | Please contact Lakehouse support |
When using Flink CDC, version 2.3 or later is recommended.
Usage
Maven Introduction
The Maven repository coordinates are as follows:
Usage
Writing Using SQL
General Configuration Options
Parameter | Required | Default Value | Description |
---|---|---|---|
connector | Yes | - | igs-dynamic-table: supports append mode and Flink CDC scenarios; the Lakehouse table is typically a primary key table. igs-dynamic-table-append-only: append only; the Lakehouse table is a regular table. |
curl | Yes | - | Lakehouse JDBC connection address; can be obtained from the workspace page (replace the jdbc protocol with igs). |
schema-name | Yes | - | Schema to be written |
table-name | Yes | - | Table to be written |
sink.parallelism | Yes | - | Degree of parallelism for writing |
properites | Yes | - | Connection properties; set to "authentication":"true" |
workspace | No | - | Workspace name |
flush.mode | No | AUTO_FLUSH_BACKGROUND | Data flush mode. AUTO_FLUSH_SYNC: waits for the previous flush to complete before the next write proceeds. AUTO_FLUSH_BACKGROUND: flushes asynchronously, allowing multiple writes to proceed without waiting for the previous flush to complete. |
showDebugLog | No | False | Whether to enable debug logs |
mutation.flush.interval | No | 10 * 1000 | Time limit after which buffered data is flushed and submitted to the server. Data is submitted whenever any one of mutation.buffer.lines.num, mutation.buffer.space, or mutation.flush.interval is reached first. |
mutation.buffer.space | No | 5 * 1024 * 1024 | Buffer accumulation size limit (bytes). When reached, buffered data is flushed and submitted to the server. If a single import reaches the MB level, increase this parameter to speed up the import. Data is submitted whenever any one of mutation.buffer.lines.num, mutation.buffer.space, or mutation.flush.interval is reached first. |
mutation.buffer.max.num | No | 5 | Maximum number of buffers that can exist simultaneously. When a buffer reaches mutation.buffer.lines.num entries it is sent asynchronously; as long as the number of buffers does not exceed mutation.buffer.max.num, writes can continue into a new buffer even if earlier buffers have not finished sending. This allows higher concurrency because the system does not wait for all buffers to be emptied before accepting new data. In short, mutation.buffer.max.num behaves like a JDBC connection pool size. |
mutation.buffer.lines.num | No | 100 | Limit on the number of rows accumulated in each buffer. When reached, writing switches to a new buffer and continues accumulating until the mutation.buffer.max.num limit is reached, which triggers a flush. |
error.type.handler | No | com.clickzetta.platform.client.api.ErrorTypeHandler$TerminateErrorTypeHandler | Default (terminates the program on error): com.clickzetta.platform.client.api.ErrorTypeHandler$TerminateErrorTypeHandler. Optional (does not terminate the program): com.clickzetta.platform.client.api.ErrorTypeHandler$DefaultErrorTypeHandler; carries a potential risk of data loss. |
request.failed.retry.enable | No | false | Whether to enable the retry mechanism for mutate failures. |
request.failed.retry.times | No | 3 | Maximum number of retries for mutate failures. |
request.failed.retry.internal.ms | No | 1000 | Retry interval in ms; by default failed requests are retried every 1000 ms. |
request.failed.retry.status | No | THROTTLED | Optional values: THROTTLED, FAILED, NOT_FOUND, INTERNAL_ERROR, PRECHECK_FAILED, STREAM_UNAVAILABLE |
mapping.operation.type.to | Yes | None | When connector is set to igs-dynamic-table-append-only, specifies the field in the Lakehouse table that records the CDC operation type. The field must be of STRING type. Written enumeration values: INSERT|UPDATE_BEFORE|UPDATE_AFTER|DELETE |
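As an illustration of how these options fit together, here is a minimal sketch, executed from Java, that writes a bounded datagen stream into a Lakehouse table via Flink SQL. The connection address, schema, table, and column names (and the datagen source itself) are placeholder assumptions; replace them with values from your own workspace page.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class LakehouseSqlSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical bounded datagen source, used only to produce a few example rows.
        tEnv.executeSql(
                "CREATE TABLE demo_source (" +
                "  id   INT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'number-of-rows' = '10'" +
                ")");

        // Lakehouse sink declared with the general configuration options above.
        // The connection address, schema, and table name are placeholders.
        tEnv.executeSql(
                "CREATE TABLE lakehouse_sink (" +
                "  id   INT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector'        = 'igs-dynamic-table'," +        // or 'igs-dynamic-table-append-only'
                "  'curl'             = 'igs://<host>:<port>'," +      // placeholder connection address
                "  'schema-name'      = 'public'," +
                "  'table-name'       = 'demo_table'," +
                "  'sink.parallelism' = '1'," +
                "  'properites'       = '\"authentication\":\"true\"'," +
                "  'flush.mode'       = 'AUTO_FLUSH_BACKGROUND'" +     // tuning options such as mutation.* can be added the same way
                ")");

        // Submits the streaming insert job.
        tEnv.executeSql("INSERT INTO lakehouse_sink SELECT id, name FROM demo_source");
    }
}
```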
Writing using DataStream method
- For parameter meanings and settings, refer to the general configuration options above; see the sketch below.
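The connector's native DataStream sink classes are not listed in this document, so the following is only a hedged sketch of one way to write from a DataStream program: bridge the stream into the Table API and reuse the SQL options above. The stream contents, connection address, schema, and table name are assumptions.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class LakehouseDataStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Stand-in for any real DataStream<Row>; explicit row type info lets the
        // Table API derive the column names and types.
        DataStream<Row> stream = env
                .fromElements(Row.of(1, "alice"), Row.of(2, "bob"))
                .returns(Types.ROW_NAMED(new String[] {"id", "name"}, Types.INT, Types.STRING));

        Table table = tEnv.fromDataStream(stream);

        // Sink declared with the same SQL options as in the previous section;
        // connection address, schema, and table name are placeholders.
        tEnv.executeSql(
                "CREATE TABLE lakehouse_sink (" +
                "  id   INT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector'        = 'igs-dynamic-table-append-only'," +
                "  'curl'             = 'igs://<host>:<port>'," +
                "  'schema-name'      = 'public'," +
                "  'table-name'       = 'demo_table'," +
                "  'sink.parallelism' = '1'," +
                "  'properites'       = '\"authentication\":\"true\"'" +
                ")");

        // Writes the converted DataStream into the Lakehouse table.
        table.executeInsert("lakehouse_sink");
    }
}
```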
Usage Restrictions
- Currently, only result tables are supported; source tables and dimension tables are not supported.
Specific Use Cases
Real-time Data Synchronization: Using Flink CDC to Write MySQL Data into Lakehouse Primary Key Table
Overview
This article details how to use the igs-dynamic-table mode of the Lakehouse Flink Connector to synchronize MySQL change data capture (CDC) logs into a Lakehouse primary key table in real time. In igs-dynamic-table mode, data in the Lakehouse primary key table is updated automatically based on the primary key, ensuring data consistency and freshness.
STEP 1: Environment Preparation
- Use IntelliJ IDEA as the development tool; familiarity with Flink programming is assumed.
- Obtain the Lakehouse connection information, which can be viewed in Lakehouse Studio under Management -> Workspace, and replace the jdbc protocol in the connection URL with igs.
- Set up a MySQL database locally.
- Download the Flink Connector package provided by Lakehouse (currently provided by Lakehouse for download). After downloading, install the jar into the local Maven repository so it can be referenced and packaged in Maven projects.
- Modify the pom.xml file and add the required dependencies.
STEP 2: Create a table in MySQL and insert test data
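The actual DDL for this step is not reproduced above, so the sketch below sets up a hypothetical orders table and two test rows over plain JDBC; the database name, credentials, and table schema are assumptions, and the same statements can equally be run from any MySQL client. Note that MySQL binary logging must be enabled for Flink CDC to capture changes.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MysqlTestDataSetup {
    public static void main(String[] args) throws Exception {
        // Placeholder database, user, and password; requires mysql-connector-j on the classpath.
        String url = "jdbc:mysql://localhost:3306/demo_db";
        try (Connection conn = DriverManager.getConnection(url, "root", "password");
             Statement stmt = conn.createStatement()) {

            // Hypothetical orders table, reused by the later pipeline sketches.
            stmt.execute(
                    "CREATE TABLE IF NOT EXISTS orders (" +
                    "  id    INT PRIMARY KEY," +
                    "  name  VARCHAR(64)," +
                    "  price DECIMAL(10, 2)" +
                    ")");

            stmt.execute("INSERT INTO orders VALUES (1, 'apple', 3.50)");
            stmt.execute("INSERT INTO orders VALUES (2, 'banana', 1.20)");
        }
    }
}
```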
STEP 3: Create Primary Key Table in Lakehouse
Lakehouse's PRIMARY KEY is used to ensure the uniqueness of each record in the table. In the Lakehouse architecture, for tables with a defined primary key, the system will automatically deduplicate data based on the primary key value during real-time data writing. Once a primary key is set, you will not be able to perform insert, delete, or update operations through SQL statements, nor can you add or delete columns.
STEP 4: Write code and start the task in IDEA
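The job code itself is not reproduced here; the following is a hedged sketch of what the STEP 4 program might look like: a mysql-cdc source table feeding an igs-dynamic-table sink via Flink SQL executed from Java. Host, credentials, the Lakehouse connection address, schema, and the orders column layout are assumptions carried over from the earlier sketches, and the Lakehouse primary key table from STEP 3 is assumed to have matching columns.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlCdcToLakehousePk {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);   // Flink CDC relies on checkpointing for fault tolerance
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // CDC source reading the MySQL binlog of the hypothetical orders table.
        tEnv.executeSql(
                "CREATE TABLE mysql_orders (" +
                "  id    INT," +
                "  name  STRING," +
                "  price DECIMAL(10, 2)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'password'," +
                "  'database-name' = 'demo_db'," +
                "  'table-name' = 'orders'" +
                ")");

        // Lakehouse primary key table as the sink; in igs-dynamic-table mode the
        // updates and deletes from the binlog are applied to the Lakehouse table.
        // Declaring the primary key here is assumed to mirror the server-side table.
        tEnv.executeSql(
                "CREATE TABLE lakehouse_orders (" +
                "  id    INT," +
                "  name  STRING," +
                "  price DECIMAL(10, 2)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'igs-dynamic-table'," +
                "  'curl' = 'igs://<host>:<port>'," +
                "  'schema-name' = 'public'," +
                "  'table-name' = 'orders'," +
                "  'sink.parallelism' = '1'," +
                "  'properites' = '\"authentication\":\"true\"'" +
                ")");

        tEnv.executeSql("INSERT INTO lakehouse_orders SELECT * FROM mysql_orders");
    }
}
```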
STEP 5: Data Synchronization Verification
- After startup completes, the MySQL data is automatically synchronized to Lakehouse. Query the data in Lakehouse.
- Update a record in MySQL
- Query Lakehouse again; the data remains consistent with MySQL
Real-time Data Synchronization: Writing MySQL Data into Lakehouse Regular Tables Using Flink CDC
Overview
This document details how to use the igs-dynamic-table-append-only mode of the Lakehouse Flink Connector to synchronize MySQL change data capture (CDC) logs into regular Lakehouse tables in real time. In igs-dynamic-table-append-only mode, Lakehouse records the original CDC logs as-is, without updating existing data in Lakehouse.
STEP 1: Environment Preparation
- Use IntelliJ IDEA as the development tool; familiarity with Flink programming is assumed.
- Obtain the Lakehouse connection information, which can be viewed in Lakehouse Studio under Management -> Workspace, and replace the jdbc protocol in the connection URL with igs.
- Set up a MySQL database locally.
- Download the Flink Connector package provided by Lakehouse (currently provided by Lakehouse for download). After downloading, install the jar into the local Maven repository so it can be referenced and packaged in Maven projects.
- Modify the pom.xml file and add the required dependencies.
STEP 2: Create a table in MySQL and insert test data
STEP 3: Create a Regular Table in the Lakehouse
The source_operate field is used to record the MySQL log operation type. In the Lakehouse Flink Connector, you can configure mapping.operation.type.to to specify the field in the Lakehouse table into which the CDC operation type is written.
STEP 4: Write code and start the task in IDEA
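As with the previous use case, the following is only a hedged sketch of the STEP 4 program: the same mysql-cdc source, but an igs-dynamic-table-append-only sink with mapping.operation.type.to pointing at the source_operate column created in STEP 3. Connection details and the orders schema are assumptions; it is also assumed that the connector itself fills source_operate, so that column is not declared in the Flink DDL.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlCdcToLakehouseAppendOnly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);   // Flink CDC relies on checkpointing for fault tolerance
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // CDC source reading the MySQL binlog of the hypothetical orders table.
        tEnv.executeSql(
                "CREATE TABLE mysql_orders (" +
                "  id    INT," +
                "  name  STRING," +
                "  price DECIMAL(10, 2)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'localhost'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'password'," +
                "  'database-name' = 'demo_db'," +
                "  'table-name' = 'orders'" +
                ")");

        // Regular (non primary key) Lakehouse table; every INSERT / UPDATE_BEFORE /
        // UPDATE_AFTER / DELETE from the binlog is appended as a new row, and the
        // operation type lands in the source_operate column of the Lakehouse table.
        tEnv.executeSql(
                "CREATE TABLE lakehouse_orders_log (" +
                "  id    INT," +
                "  name  STRING," +
                "  price DECIMAL(10, 2)" +
                ") WITH (" +
                "  'connector' = 'igs-dynamic-table-append-only'," +
                "  'curl' = 'igs://<host>:<port>'," +
                "  'schema-name' = 'public'," +
                "  'table-name' = 'orders_log'," +
                "  'sink.parallelism' = '1'," +
                "  'properites' = '\"authentication\":\"true\"'," +
                "  'mapping.operation.type.to' = 'source_operate'" +
                ")");

        tEnv.executeSql("INSERT INTO lakehouse_orders_log SELECT id, name, price FROM mysql_orders");
    }
}
```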
STEP 5: Data Synchronization Verification
- After startup completes, the MySQL data is automatically synchronized to Lakehouse. Query the data in Lakehouse.
- Update a record in MySQL
- Query Lakehouse again; a record of every MySQL operation is kept