READ_KAFKA

Syntax

SELECT ... FROM READ_KAFKA(
    'bootstrap',
    'topic',
    'topic_pattern',
    'group_id',
    'STARTING_OFFSETS',
    'ENDING_OFFSETS',
    'STARTING_OFFSETS_TIMESTAMP',
    'ENDING_OFFSETS_TIMESTAMP',
    'KEY_FORMAT',
    'VALUE_FORMAT',
    MAX_ERROR_NUMBER,
    MAP()
)

The read_kafka function reads data from a Kafka cluster. It accepts the following parameters:

  • bootstrap: Kafka server address, such as 1.2.3.1:9092,1.2.3.2:9092.
  • topic: Kafka topic name, multiple topics separated by commas, such as topicA,topicB.
  • topic_pattern: Topic regular expression. Not supported yet; leave it empty, e.g. ''.
  • group_id: Kafka consumer group ID.
  • STARTING_OFFSETS: Specify the starting offset to read from, default is earliest.
  • ENDING_OFFSETS: Specify the ending offset to read to, default is latest.
  • STARTING_OFFSETS_TIMESTAMP: Specify the timestamp for the starting offset.
  • ENDING_OFFSETS_TIMESTAMP: Specify the timestamp for the ending offset.
  • KEY_FORMAT: Specify the format of the key to read, type is STRING and case-insensitive. Currently only supports raw format.
  • VALUE_FORMAT: Specify the format of the value to read, type is STRING and case-insensitive. Currently only supports raw format.
  • MAX_ERROR_NUMBER: The maximum number of error rows allowed within the read window. Must be greater than or equal to 0; the valid range is 0-100000. Default is 0, meaning no error rows are allowed.
  • MAP(): Additional properties passed through to the Kafka client. Each key is Kafka's native configuration name prefixed with kafka., for example MAP('kafka.security.protocol', 'PLAINTEXT', 'kafka.auto.offset.reset', 'earliest'). Refer to the Kafka documentation for valid keys and values.
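As a sketch of the timestamp parameters above, the following hypothetical query reads a bounded window between two timestamps instead of named offsets. The broker address, topic, and group ID are placeholders, and the timestamp values are assumed to be epoch milliseconds (the unit is not specified above):

-- Hypothetical bounded read between two timestamps.
-- Assumes epoch-millisecond timestamps and placeholder broker/topic names.
SELECT cast(value as string) as value_str
FROM read_kafka(
    '1.2.3.1:9092',      -- bootstrap
    'my_topic',          -- topic
    '',                  -- topic_pattern (not supported yet; leave empty)
    'my_group',          -- group_id
    '', '',              -- STARTING_OFFSETS / ENDING_OFFSETS (unused here)
    '1700000000000',     -- STARTING_OFFSETS_TIMESTAMP
    '1700003600000',     -- ENDING_OFFSETS_TIMESTAMP
    'RAW', 'RAW',
    0,
    MAP()
)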

read_kafka returns the following columns:

Field            Meaning                        Type
topic            Kafka topic name               STRING
partition        Data partition ID              INT
offset           Offset in Kafka partition      BIGINT
timestamp        Kafka message timestamp        TIMESTAMP_LTZ
timestamp_type   Kafka message timestamp type   STRING
headers          Kafka message headers          MAP<STRING, BINARY>
key              Kafka message key              BINARY
value            Kafka message value            BINARY
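The metadata columns above can also be aggregated for quick inspection. A sketch (using the same placeholder broker, topic, and group names as the syntax example) that reports the message count and latest offset seen per partition:

-- Sketch: messages and latest observed offset per partition.
SELECT partition, count(*) as msg_count, max(offset) as max_offset
FROM read_kafka(
    '1.2.3.1:9092', 'my_topic', '', 'my_group',
    'earliest', 'latest', '', '',
    'RAW', 'RAW',
    0,
    MAP()
)
GROUP BY partition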

Notes

  • When using read_kafka, ensure that the Kafka cluster is network-reachable from Lakehouse.

Examples

SELECT topic, partition, offset, `timestamp`, `timestamp_type`, headers,
       cast(key as string) as key_str, cast(value as string) as value_str
FROM read_kafka(
    '1.2.3.1:9092,1.2.3.2:9092',
    'my_topic',
    '',                -- topic_pattern: not supported yet, leave empty
    'my_group',
    'earliest', 'latest', '', '',
    'RAW', 'RAW',
    0,
    MAP('kafka.security.protocol', 'PLAINTEXT', 'kafka.auto.offset.reset', 'earliest')
)