Using Table Stream and Pipe to Import Kafka Data into Lakehouse
1. Background Introduction
In the field of big data processing, efficiently importing streaming data from Kafka into a Lakehouse (data lake warehouse) is a common requirement. CloudTech provides powerful Table Stream and Pipe functionalities that simplify and enhance this process. This article will detail how to use Table Stream and Pipe to import Kafka data into a Lakehouse, including the complete process of creating a Kafka external table and a Kafka Table Stream.
2. Operational Steps
Creating a Kafka External Table
Before using Table Stream and Pipe, we need to create an external table integrated with Kafka to access data in Kafka.
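A minimal sketch of such a definition; the connector option names, broker addresses, topic name, and the single raw `value` column below are illustrative assumptions, and the actual Kafka external table syntax may differ in your environment:

```sql
-- Illustrative sketch only: option names are placeholders, not confirmed syntax.
-- Replace the brokers and topic with your own Kafka settings.
CREATE EXTERNAL TABLE external_table_kafka
USING kafka
OPTIONS (
  'kafka.bootstrap.servers' = 'broker1:9092,broker2:9092',
  'subscribe' = 'my_kafka_topic'
);
```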
Creating a Table Stream
Create a Table Stream on the Kafka external table to capture real-time data changes in Kafka.
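A sketch of the statement, using the names described below; the property wrapping syntax is an assumption:

```sql
-- Creates an append-only Table Stream on the Kafka external table.
CREATE TABLE STREAM kafka_table_stream_pipe1
ON TABLE external_table_kafka
WITH PROPERTIES ('table_stream_mode' = 'append_only');
```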
- `kafka_table_stream_pipe1`: Name of the Table Stream.
- `ON TABLE external_table_kafka`: Specifies that the Table Stream is created on the previously created Kafka external table.
- `table_stream_mode='append_only'`: Sets the Table Stream to append-only mode, meaning it captures only newly added data rows.
After creation, you can verify the data in the Table Stream with the following query:
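```sql
-- Sketch: read the captured Kafka messages, casting the raw value column to STRING.
SELECT CAST(value AS STRING) FROM kafka_table_stream_pipe1;
```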
This query converts the `value` field in the Table Stream to a string type and returns it for subsequent processing.
Creating a Target Table
Next, create a target table to store data imported from Kafka.
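For example, matching the field descriptions below:

```sql
-- Target table that will receive the imported Kafka data.
CREATE TABLE kafak_sink_table_1 (
  a TIMESTAMP,
  b STRING
);
```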
- `kafak_sink_table_1`: Name of the target table.
- `a TIMESTAMP`: First field, storing timestamp data.
- `b STRING`: Second field, storing string data.
Creating a Pipe
Finally, use a Pipe to continuously import data from the Table Stream into the target table.
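A sketch assembled from the clauses described below; the overall `CREATE PIPE ... AS COPY INTO ... FROM (SELECT ...)` shape is an assumption about the exact syntax:

```sql
-- Continuously copies new rows from the Table Stream into the target table.
CREATE PIPE kafka_pipe_stream
  VIRTUAL_CLUSTER = 'test_alter'
AS
COPY INTO kafak_sink_table_1
FROM (
  SELECT CURRENT_TIMESTAMP(), CAST(value AS STRING)
  FROM kafka_table_stream_pipe1
);
```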
- `kafka_pipe_stream`: Name of the Pipe.
- `VIRTUAL_CLUSTER = 'test_alter'`: Specifies the virtual cluster to use.
- `COPY INTO kafak_sink_table_1`: Copies data into the target table `kafak_sink_table_1`.
- `SELECT CURRENT_TIMESTAMP(), CAST(value AS STRING) FROM kafka_table_stream_pipe1`: Selects data from the Table Stream, using the current timestamp and the converted `value` field as the two columns for the target table.
Other Configurable Properties:
- `INITIAL_DELAY_IN_SECONDS`: (Optional) Initial job scheduling delay, default 0 seconds.
- `BATCH_INTERVAL_IN_SECONDS`: (Optional) Batch processing interval, default 60 seconds.
- `BATCH_SIZE_PER_KAFKA_PARTITION`: (Optional) Batch size per Kafka partition, default 500,000 records.
- `MAX_SKIP_BATCH_COUNT_ON_ERROR`: (Optional) Maximum number of batches to skip on error, default 30.
- `RESET_KAFKA_GROUP_OFFSETS`: (Optional) Initial Kafka offset to use when the Pipe starts. Cannot be modified after creation. Possible values: `latest`, `earliest`, `none`, `valid`, `${TIMESTAMP_MILLISECONDS}`.
  - `none`: Default; no action.
  - `valid`: Checks whether the group's current offsets have expired and resets expired partitions to the current earliest offset.
  - `earliest`: Resets to the current earliest offset.
  - `latest`: Resets to the current latest offset.
  - `${TIMESTAMP_MILLISECONDS}`: Resets to the offset corresponding to the millisecond timestamp, e.g., '1737789688000' (2025-01-25 15:21:28).
3. Verifying Results
You can verify whether the data has been successfully imported by querying the target table:
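```sql
-- Sketch: inspect recently imported rows in the target table.
SELECT * FROM kafak_sink_table_1 ORDER BY a DESC LIMIT 10;
```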
Additionally, check the running status of the Pipe to ensure it is functioning properly:
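```sql
-- The exact command name is an assumption; it lists Pipes and their status.
SHOW PIPES;
```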
This command lists all created Pipes and their status information, including whether they are running and the last run time.
4. Status Monitoring and Management
Checking Kafka Consumption Latency
Use the `DESC PIPE` command and inspect the JSON string in the `pipe_latency` field, as in the example below:
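```sql
-- Sketch: the pipe_latency column returned by DESC PIPE contains a JSON string
-- such as {"lastConsumeTimestamp": 1737789688000, "offsetLag": 1200, "timeLag": 35}
-- (values illustrative only).
DESC PIPE kafka_pipe_stream;
```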
- `lastConsumeTimestamp`: Timestamp of the last consumed offset.
- `offsetLag`: The backlog of Kafka data.
- `timeLag`: Consumption latency, calculated as the current time minus `lastConsumeTimestamp`. If Kafka consumption is abnormal, the value is -1.
Viewing Pipe Execution History
Since each Pipe execution is a copy operation, you can view all operations in the job history. Use the `query_tag` in the Job History to filter: all Pipe copy jobs are tagged in the format `pipe.workspace_name.schema_name.pipe_name`, making them easy to track and manage.
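A hypothetical filter, assuming the job history is exposed as a queryable view; the view name `information_schema.job_history` and the workspace/schema names are assumptions for illustration:

```sql
-- Hypothetical: filter Pipe copy jobs by their query_tag.
SELECT *
FROM information_schema.job_history
WHERE query_tag = 'pipe.my_workspace.my_schema.kafka_pipe_stream';
```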
Stopping and Starting a Pipe
- Pause a Pipe
- Resume a Pipe
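Both operations use `ALTER PIPE`; a minimal sketch, assuming scheduling is controlled by a boolean property named `PIPE_EXECUTION_PAUSED` (the property name is an assumption):

```sql
-- Pause the Pipe (property name assumed for illustration).
ALTER PIPE kafka_pipe_stream SET PIPE_EXECUTION_PAUSED = true;

-- Resume the Pipe.
ALTER PIPE kafka_pipe_stream SET PIPE_EXECUTION_PAUSED = false;
```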
Modifying Pipe Properties
You can modify Pipe properties, but only one property at a time. If multiple properties need to be modified, execute the `ALTER` command multiple times. Below are the modifiable properties and their syntax:
Examples:
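```sql
-- Sketch: each ALTER PIPE statement changes exactly one property. Property names
-- are taken from this article; the SET syntax itself is an assumption.
ALTER PIPE kafka_pipe_stream SET BATCH_INTERVAL_IN_SECONDS = 30;

ALTER PIPE kafka_pipe_stream SET BATCH_SIZE_PER_KAFKA_PARTITION = 200000;

-- When changing COPY_JOB_HINT, include every hint you still need, because the new
-- value replaces the existing hints (see the Note below).
ALTER PIPE kafka_pipe_stream
  SET COPY_JOB_HINT = '{"cz.sql.split.kafka.strategy":"size"}';
```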
Note
- Modifying the COPY statement logic is not supported. If needed, delete the Pipe and recreate it.
- When modifying the `COPY_JOB_HINT` of a Pipe, the new setting overwrites the existing hints. If your Pipe already has hints (e.g., `{"cz.sql.split.kafka.strategy":"size"}`), you must set all required hints together when adding new ones; otherwise, the existing hints will be overwritten. Separate multiple parameters with commas.