Overview
[Preview Release] This feature is currently in public preview.
This document describes how to create an external table in SQL that connects to the Kafka message queue system. Once the external table is defined, you can read data streams from Kafka and query and analyze them as tables.
Create Storage Connection
First, create a storage connection to the Kafka server. Connections that require certificates are not currently supported.
Syntax
Parameter Description
- connection_name: The name of the connection, used for subsequent references.
- TYPE: The type of connection; here it is kafka.
- BOOTSTRAP_SERVERS: The address list of the Kafka cluster, formatted as ['host1:port1', 'host2:port2', ...].
- SECURITY_PROTOCOL: The security protocol, for example PLAINTEXT.
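As an illustration of how the parameters above fit together, the statement might look like the following sketch. The connection name my_kafka_conn and the broker addresses are hypothetical placeholders, and the clause order and quoting are assumptions inferred from the parameter list rather than confirmed syntax, so check them against the product's SQL reference before use.

```sql
-- Sketch only: clause order and quoting are assumed from the parameter list.
-- my_kafka_conn and the broker addresses are hypothetical placeholders.
CREATE STORAGE CONNECTION my_kafka_conn
    TYPE kafka
    BOOTSTRAP_SERVERS = ['broker1.example.com:9092', 'broker2.example.com:9092']
    SECURITY_PROTOCOL = 'PLAINTEXT';
```

PLAINTEXT is used here because connections requiring certificates are not supported.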
Create External Table
Syntax
Parameter Description
- external_table_name: The name of the external table.
- Field Description
Field | Meaning | Type |
---|---|---|
topic | Kafka topic name | STRING |
partition | Data partition ID | INT |
offset | Offset in the Kafka partition | BIGINT |
timestamp | Kafka message timestamp | TIMESTAMP_LTZ |
timestamp_type | Kafka message timestamp type | STRING |
headers | Kafka message headers | MAP<STRING, BINARY> |
key | Message key, stored as binary. You can convert it to a readable string with a type conversion such as cast(key_column as string) | BINARY |
value | Message body, stored as binary. You can convert it to a readable string with a type conversion such as cast(value_column as string) | BINARY |
- USING kafka: Specifies Kafka as the data source.
- OPTIONS:
  - group_id: Kafka consumer group ID.
  - topics: Kafka topic name.
  - starting_offset: Starting offset; can be earliest or latest. Default is earliest.
  - ending_offset: Ending offset; can be earliest or latest. Default is latest.
  - cz.kafka.seek.timeout.ms: Kafka seek timeout, in milliseconds.
  - cz.kafka.request.retry.times: Number of times a Kafka request is retried.
  - cz.kafka.request.retry.intervalMs: Interval between Kafka request retries, in milliseconds.
- CONNECTION: Specifies the name of the previously created storage connection.
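Putting the pieces above together, an external table definition and a first query might look like the following sketch. The table name kafka_events, the connection name my_kafka_conn, the consumer group, the topic, and the numeric option values are all hypothetical. The clause layout, the backtick quoting of column names, and the string quoting of option keys and values are assumptions inferred from the parameter descriptions, not confirmed syntax.

```sql
-- Sketch only: statement layout and quoting are assumed, not confirmed syntax.
-- kafka_events, my_kafka_conn, demo_group and demo_topic are hypothetical names.
CREATE EXTERNAL TABLE IF NOT EXISTS kafka_events (
    `topic`          STRING,
    `partition`      INT,
    `offset`         BIGINT,
    `timestamp`      TIMESTAMP_LTZ,
    `timestamp_type` STRING,
    `headers`        MAP<STRING, BINARY>,
    `key`            BINARY,
    `value`          BINARY
)
USING kafka
OPTIONS (
    'group_id' = 'demo_group',
    'topics' = 'demo_topic',
    'starting_offset' = 'earliest',
    'ending_offset' = 'latest',
    -- Illustrative values only, not recommended defaults.
    'cz.kafka.seek.timeout.ms' = '2000',
    'cz.kafka.request.retry.times' = '5',
    'cz.kafka.request.retry.intervalMs' = '2000'
)
CONNECTION my_kafka_conn;

-- key and value are BINARY, so cast them to STRING to read them as text.
SELECT
    `partition`,
    `offset`,
    `timestamp`,
    CAST(`key` AS STRING)   AS key_text,
    CAST(`value` AS STRING) AS value_text
FROM kafka_events
LIMIT 10;
```

The column names and types follow the field table in this section; backticks are used because names such as partition, offset, and timestamp are commonly reserved words in SQL dialects.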