Pipe Syntax
This document provides detailed syntax for creating a Pipe object to automate the import of data from object storage or Kafka into the Lakehouse. The Pipe object is a powerful tool that can help users simplify the data import process and ensure efficient data flow.
CREATE PIPE Syntax
Use the following syntax to create a Pipe object to automate the import of data from object storage into the Lakehouse.
Importing Data from Object Storage
To create a Pipe object to import data from object storage, you can use the following syntax:
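A minimal sketch of the statement shape, assuming a hypothetical target table `my_schema.my_target_table`, volume `my_csv_volume`, and virtual cluster `ingest_vc`; exact clause spellings may differ in your Lakehouse version:

```sql
CREATE PIPE my_pipe
  VIRTUAL_CLUSTER = 'ingest_vc'
  INGEST_MODE = 'LIST_PURGE'       -- or 'EVENT_NOTIFICATION'
  IGNORE_TMP_FILE = true           -- default: skip dot-files and _temporary paths
AS
COPY INTO my_schema.my_target_table
FROM VOLUME my_csv_volume          -- files on object storage
FILE_FORMAT = (TYPE = csv);        -- format clause is illustrative
```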
- `<pipe_name>`: The name of the Pipe object you want to create.
- `VIRTUAL_CLUSTER`: Specify the name of the virtual cluster.
- `INGEST_MODE`: Set to `LIST_PURGE` or `EVENT_NOTIFICATION` to determine the data ingestion mode.
- `COPY_JOB_HINT`: Optional. A Lakehouse reserved parameter.
- `IGNORE_TMP_FILE`: `true` or `false`; the default is `true`. This parameter filters out files or directories whose names start with a dot (`.`) or with `_temporary`, for example `s3://my_bucket/a/b/.SUCCESS`, `oss://my_bucket/a/b/_temporary/`, or `oss://my_bucket/a/b/_temporary_123/`.
Import data from Kafka
To create a Pipe object to import data from Kafka, you can use the following syntax:
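A minimal sketch, assuming a hypothetical target table `my_schema.my_events` and virtual cluster `ingest_vc`; the `read_kafka` argument list is illustrative only (see the `read_kafka` reference for its actual parameters):

```sql
CREATE PIPE my_kafka_pipe
  VIRTUAL_CLUSTER = 'ingest_vc'
  BATCH_INTERVAL_IN_SECONDS = 60           -- default 60
  BATCH_SIZE_PER_KAFKA_PARTITION = 500000  -- default 500,000 records
  MAX_SKIP_BATCH_COUNT_ON_ERROR = 30       -- default 30
  RESET_KAFKA_GROUP_OFFSETS = 'earliest'   -- fixed once the pipe is created
AS
COPY INTO my_schema.my_events
FROM (
  SELECT *
  FROM read_kafka(...)                     -- broker, topic, and other arguments elided
);
```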
- `<pipe_name>`: The name of the Pipe object you want to create.
- `VIRTUAL_CLUSTER`: Specify the name of the virtual cluster.
- `BATCH_INTERVAL_IN_SECONDS`: (Optional) Set the batch interval time; the default is 60 seconds.
- `BATCH_SIZE_PER_KAFKA_PARTITION`: (Optional) Set the batch size per Kafka partition; the default is 500,000 records.
- `MAX_SKIP_BATCH_COUNT_ON_ERROR`: (Optional) Set the maximum retry count for skipped batches on error; the default is 30.
- `RESET_KAFKA_GROUP_OFFSETS`: (Optional) Sets the initial Kafka offset used when the pipe starts. This property cannot be modified after the pipe is created. Possible values:
  - `none`: No action (the default).
  - `valid`: Checks whether the group's current offset has expired and resets expired partitions to the current earliest offset.
  - `earliest`: Resets to the current earliest offset.
  - `latest`: Resets to the current latest offset.
  - `${TIMESTAMP_MILLISECONDS}`: Resets to the offset corresponding to the given millisecond timestamp, for example `'1737789688000'` (which corresponds to January 25, 2025, 15:21:28).
Instructions
- Using read_kafka to Continuously Import Kafka Data
- Using Kafka Table Stream to Continuously Import Kafka Data
Pause and Start PIPE
You can control the execution state of the PIPE to manage the data synchronization process.
- Pause PIPE:
- Start PIPE:
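As a sketch, assuming a pipe named `my_pipe` and that the pause switch is exposed as a pipe property (the property name here is an assumption; check your Lakehouse reference):

```sql
-- Pause the pipe
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = true;

-- Start (resume) the pipe
ALTER PIPE my_pipe SET PIPE_EXECUTION_PAUSED = false;
```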
View Pipe Details
View detailed information of a specific Pipe object.
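For example, assuming a pipe named `my_pipe` (the keyword may be `DESC` or `DESCRIBE` depending on the dialect):

```sql
DESC PIPE my_pipe;
```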
View Pipe List and Object Details
List all Pipe objects and their detailed information.
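A sketch of the listing command:

```sql
SHOW PIPES;
```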
Delete Pipe Object
When a Pipe object is no longer needed, you can use the following command to delete it.
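For example, assuming a pipe named `my_pipe` (the `IF EXISTS` guard is an assumption):

```sql
DROP PIPE IF EXISTS my_pipe;
```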
load_history function
Function Description: The load_history function is used to view a table's COPY job import file history, which is retained for 7 days. When a Pipe runs, it consults load_history to avoid re-importing files that already exist, ensuring the uniqueness of the data.
Function Syntax:
- schema_name.table_name: Specify the table name to view the import history.
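A sketch of the call shape; passing `schema_name.table_name` as a single quoted string is an assumption:

```sql
load_history('schema_name.table_name')
```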
Use Case:
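For example, assuming a hypothetical table `public.orders`:

```sql
-- View the files recorded as imported into public.orders within the 7-day retention window
SELECT * FROM load_history('public.orders');
```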
Constraints and Limitations
- When the data source is Kafka: only one read_kafka function is allowed in a pipe.
- When the data source is object storage: only one volume object is allowed in a pipe.