Continuous Data Import from Kafka Using Pipe
Kafka Pipe Syntax
- `<pipe_name>`: The name of the Pipe object to create.
- `VIRTUAL_CLUSTER`: The name of the virtual cluster to use.
- `BATCH_INTERVAL_IN_SECONDS`: (Optional) The batch interval in seconds; default is 60.
- `BATCH_SIZE_PER_KAFKA_PARTITION`: (Optional) The batch size per Kafka partition; default is 500,000 records.
- `MAX_SKIP_BATCH_COUNT_ON_ERROR`: (Optional) The maximum retry count for batches skipped on error; default is 30.
- `RESET_KAFKA_GROUP_OFFSETS`: (Optional) Sets the initial Kafka offset used when the pipe starts. This property cannot be modified after the pipe is created. Possible values are `latest`, `earliest`, `none`, `valid`, and `${TIMESTAMP_MILLISECONDS}`:
  - `none`: No action (default).
  - `valid`: Checks whether the group's current offset has expired and resets expired partitions to the current earliest offset.
  - `earliest`: Resets to the current earliest offset.
  - `latest`: Resets to the current latest offset.
  - `${TIMESTAMP_MILLISECONDS}`: Resets to the offset corresponding to the given millisecond timestamp, for example `'1737789688000'` (which corresponds to January 25, 2025, 15:21:28).
Usage Example
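The example itself did not survive in this copy of the document. The sketch below is an illustration composed only from the pipe properties listed above and the `read_kafka` function documented in the next section; the pipe name, virtual cluster, target table, topic, and broker addresses are all hypothetical, and the exact clause placement should be verified against the product's CREATE PIPE reference.

```sql
-- Illustrative sketch only; names and addresses are placeholders.
CREATE PIPE my_kafka_pipe
  VIRTUAL_CLUSTER = 'my_vc'
  BATCH_INTERVAL_IN_SECONDS = 60
  BATCH_SIZE_PER_KAFKA_PARTITION = 500000
  RESET_KAFKA_GROUP_OFFSETS = 'earliest'
AS
COPY INTO my_table
FROM (
  -- read_kafka returns Kafka records in tabular form (see below).
  SELECT CAST(value AS STRING) AS raw_value
  FROM read_kafka('1.2.3.1:9092,1.2.3.2:9092', 'topicA', '', 'my_group')
);
```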
Function: read_kafka
Function Description
Read data from an Apache Kafka cluster and return the data in tabular form.
Function Syntax
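The syntax block is missing from this section. Based on the parameter list below, the call has roughly the following shape; the positional order simply follows the order in which the parameters are described, which is an assumption.

```sql
-- Assumed shape, derived from the documented parameter list:
read_kafka(
  bootstrap, topic, topic_pattern, group_id,
  STARTING_OFFSETS, ENDING_OFFSETS,
  STARTING_OFFSETS_TIMESTAMP, ENDING_OFFSETS_TIMESTAMP,
  KEY_FORMAT, VALUE_FORMAT, MAX_ERROR_NUMBER,
  kafka_parameters
)
```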
Parameter Description
- bootstrap: Comma-separated Kafka broker server addresses, such as `1.2.3.1:9092,1.2.3.2:9092`.
- topic: Kafka topic name; separate multiple topics with commas, such as `topicA,topicB`.
- topic_pattern: Topic regex; not supported yet, leave it empty by default, for example: `''`.
- group_id: Kafka consumer group ID.
- STARTING_OFFSETS: Specifies the starting offset to read from; default is `latest`. This parameter does not need to be passed in the pipe.
- ENDING_OFFSETS: Specifies the ending offset; default is `latest`. This parameter does not need to be passed in the pipe.
- STARTING_OFFSETS_TIMESTAMP: Specifies the timestamp for the starting offset. This parameter does not need to be passed in the pipe.
- ENDING_OFFSETS_TIMESTAMP: Specifies the timestamp for the ending offset. This parameter does not need to be passed in the pipe.
- KEY_FORMAT: Specifies the format of the key to read, case-insensitive STRING type. Currently, only raw format is supported.
- VALUE_FORMAT: Specifies the format of the value to read, case-insensitive STRING type. Currently, only raw format is supported.
- MAX_ERROR_NUMBER: The maximum number of error rows allowed within the reading window, in the range 0-100000. The default is 0, meaning no error rows are allowed.
- kafka_parameters: Additional parameters passed through to Kafka, each prefixed with `kafka.` and using Kafka's own option names, in the format `MAP('kafka.security.protocol', 'PLAINTEXT', 'kafka.auto.offset.reset', 'latest')`. See the Kafka documentation for valid options and values.
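As a hedged illustration of the parameters above: the broker addresses, topics, and group ID are placeholders, and passing the trailing optional arguments positionally (including `NULL` for the unused timestamps) is an assumption about the call convention.

```sql
-- Illustrative call; argument order mirrors the parameter list above.
SELECT topic, partition, offset, timestamp, CAST(value AS STRING) AS payload
FROM read_kafka(
  '1.2.3.1:9092,1.2.3.2:9092',   -- bootstrap
  'topicA,topicB',               -- topic (comma-separated)
  '',                            -- topic_pattern (not yet supported)
  'my_group',                    -- group_id
  'earliest',                    -- STARTING_OFFSETS
  'latest',                      -- ENDING_OFFSETS
  NULL, NULL,                    -- offset timestamps (assumed omissible)
  'raw', 'raw',                  -- KEY_FORMAT, VALUE_FORMAT
  0,                             -- MAX_ERROR_NUMBER
  MAP('kafka.security.protocol', 'PLAINTEXT')  -- kafka_parameters
);
```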
Return Values
| Field | Meaning | Type |
|---|---|---|
| topic | Kafka topic name | STRING |
| partition | Data partition ID | INT |
| offset | Offset within the Kafka partition | BIGINT |
| timestamp | Kafka message timestamp | TIMESTAMP_LTZ |
| timestamp_type | Kafka message timestamp type | STRING |
| headers | Kafka message headers | MAP<STRING, BINARY> |
| key | Kafka message key | BINARY |
| value | Kafka message value | BINARY |
Status Monitoring and Management
Viewing Kafka Consumption Latency
Use the `DESC PIPE` command and inspect the JSON string in the `pipe_latency` field, which contains:

- `lastConsumeTimestamp`: The timestamp of the most recently consumed message.
- `offsetLag`: The backlog of unconsumed Kafka data, in offsets.
- `timeLag`: Consumption latency, calculated as the current time minus `lastConsumeTimestamp`. If Kafka consumption is abnormal, the value is -1.
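A short illustration of checking latency follows; the pipe name is hypothetical and the JSON values are made up purely to show the shape of the `pipe_latency` field.

```sql
-- Hypothetical pipe name; the pipe_latency column holds a JSON string
-- with the three fields described above, e.g.:
-- {"lastConsumeTimestamp": 1737789688000, "offsetLag": 1200, "timeLag": 35}
DESC PIPE my_kafka_pipe;
```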
Viewing Pipe Execution History
Since each Pipe execution triggers a copy operation, you can view all operations in the job history. Use the `query_tag` in the Job History to filter. All Pipe copy jobs are tagged in the format `pipe.workspace_name.schema_name.pipe_name`, making them easy to track and manage.
Stopping and Starting a Pipe
- Pause a Pipe:
- Resume a Pipe:
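The statements themselves are missing above. Syntax along the following lines is typical of pipe-based loaders, but the property name `PIPE_EXECUTION_PAUSED` and the pipe name are assumptions; verify them against the product's ALTER PIPE reference.

```sql
-- Pause a Pipe (property name is an assumption):
ALTER PIPE my_kafka_pipe SET PIPE_EXECUTION_PAUSED = true;

-- Resume a Pipe:
ALTER PIPE my_kafka_pipe SET PIPE_EXECUTION_PAUSED = false;
```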
Modifying Pipe Properties
You can modify the properties of a Pipe, but only one property at a time. If multiple properties need to be modified, execute the `ALTER` command once per property. Below are the modifiable properties and their syntax:
Examples:
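The examples are missing from this copy. The statements below are a sketch using the property names documented in this section; the pipe name is hypothetical and the exact `SET` syntax is an assumption to be checked against the ALTER PIPE reference.

```sql
-- One property per ALTER statement (pipe name is a placeholder):
ALTER PIPE my_kafka_pipe SET BATCH_INTERVAL_IN_SECONDS = 120;
ALTER PIPE my_kafka_pipe SET VIRTUAL_CLUSTER = 'another_vc';
ALTER PIPE my_kafka_pipe SET COPY_JOB_HINT = '{"cz.sql.split.kafka.strategy":"size"}';
```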
Note
- Modifying the logic of the COPY statement is not supported. If you need to modify it, delete the Pipe and recreate it.
- When modifying the `COPY_JOB_HINT` of a Pipe, the new setting overwrites all existing hints. If your Pipe already has hints (e.g., `{"cz.sql.split.kafka.strategy":"size"}`), include every hint you still need when setting new ones; otherwise the existing hints will be lost. Separate multiple parameters with commas.