Pipe Syntax

This document describes the syntax for creating a Pipe object to automate importing data from object storage or Kafka into the Lakehouse. A Pipe simplifies the data import process and helps keep data flowing into the Lakehouse efficiently without manual loading.

CREATE PIPE Syntax

Use the following syntax to create a Pipe object that automates importing data from object storage or Kafka into the Lakehouse.

Importing Data from Object Storage

To create a Pipe object that imports data from object storage, use the following syntax; a complete example follows the parameter descriptions.

-- Syntax for creating a Pipe from object storage
CREATE PIPE [ IF NOT EXISTS ] <pipe_name>
    VIRTUAL_CLUSTER = 'virtual_cluster_name'
    INGEST_MODE='LIST_PURGE'|'EVENT_NOTIFICATION'
    [COPY_JOB_HINT='']
    [IGNORE_TMP_FILE=true|false]
AS <copy_statement>;
  • <pipe_name>: The name of the Pipe object you want to create.
  • VIRTUAL_CLUSTER: Specify the name of the virtual cluster.
  • INGEST_MODE: Set to LIST_PURGE or EVENT_NOTIFICATION to determine the data ingestion mode.
  • COPY_JOB_HINT: Optional. A Lakehouse reserved parameter.
  • IGNORE_TMP_FILE: Optional. Valid values are true and false; the default is true. When true, files and directories whose names start with a dot (.) or _temporary are filtered out, for example s3://my_bucket/a/b/.SUCCESS, oss://my_bucket/a/b/_temporary/, or oss://my_bucket/a/b/_temporary_123/.
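
For reference, the following is a minimal sketch of a Pipe that ingests files from an object storage volume. All object names (my_pipe, my_vc, my_schema.my_table, my_volume) are hypothetical, and the COPY statement is only an outline; adapt it to your actual table, volume, and file format.

-- Hypothetical example: object names are illustrative and the COPY statement is only an outline;
-- the exact COPY INTO ... FROM VOLUME options depend on your table and file format.
CREATE PIPE IF NOT EXISTS my_pipe
    VIRTUAL_CLUSTER = 'my_vc'
    INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO my_schema.my_table FROM VOLUME my_volume;  -- add file format and other COPY options as needed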

Importing Data from Kafka

To create a Pipe object that imports data from Kafka, use the following syntax; a complete example follows the parameter descriptions.

-- Syntax for creating a Pipe from Kafka
CREATE PIPE [ IF NOT EXISTS ] <pipe_name>
    VIRTUAL_CLUSTER = 'virtual_cluster_name'
    [BATCH_INTERVAL_IN_SECONDS='']
    [BATCH_SIZE_PER_KAFKA_PARTITION='']
    [MAX_SKIP_BATCH_COUNT_ON_ERROR='']
    [RESET_KAFKA_GROUP_OFFSETS='']
    [COPY_JOB_HINT='']
AS <copy_statement>;
  • <pipe_name>: The name of the Pipe object you want to create.
  • VIRTUAL_CLUSTER: Specify the name of the virtual cluster.
  • BATCH_INTERVAL_IN_SECONDS: (Optional) Set the batch interval; the default is 60 seconds.
  • BATCH_SIZE_PER_KAFKA_PARTITION: (Optional) Set the batch size per Kafka partition; the default is 500,000 records.
  • MAX_SKIP_BATCH_COUNT_ON_ERROR: (Optional) Set the maximum number of batches that can be skipped when errors occur; the default is 30.
  • RESET_KAFKA_GROUP_OFFSETS: (Optional) Sets the initial offset for Kafka when starting the pipe. This property cannot be modified after the pipe is created. Possible values are latest, earliest, none, valid, and ${TIMESTAMP_MILLISECONDS}.
    • none: Do not reset offsets. This is the default behavior.
    • valid: Checks if the current offset in the group is expired and resets expired partitions to the current earliest offset.
    • earliest: Resets to the current earliest offset.
    • latest: Resets to the current latest offset.
    • ${TIMESTAMP_MILLISECONDS}: Resets to the offset corresponding to the millisecond timestamp, for example, '1737789688000' (which corresponds to January 25, 2025, 15:21:28).
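
For reference, the following is a minimal sketch of a Pipe that ingests data from Kafka. The object names are hypothetical, and the read_kafka arguments and the SELECT list are left as placeholders since the exact read_kafka signature is documented separately; adapt them to your topic's message format and target table.

-- Hypothetical example: names are illustrative and the read_kafka arguments are omitted;
-- see the read_kafka documentation for its exact signature.
CREATE PIPE IF NOT EXISTS my_kafka_pipe
    VIRTUAL_CLUSTER = 'my_vc'
    BATCH_INTERVAL_IN_SECONDS = '60'
    RESET_KAFKA_GROUP_OFFSETS = 'earliest'
AS
COPY INTO my_schema.my_table
FROM (
    SELECT ...              -- map Kafka message fields to the target table's columns
    FROM read_kafka(...)    -- only one read_kafka call is allowed per pipe
);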

Pause and Start PIPE

You can control the execution state of the PIPE to manage the data synchronization process.

  • Pause PIPE:
ALTER PIPE pipe_name SET PIPE_EXECUTION_PAUSED = true;
  • Start PIPE:
ALTER PIPE pipe_name SET PIPE_EXECUTION_PAUSED = false;

View Pipe Details

View detailed information of a specific Pipe object.

DESC PIPE <name>;

View Pipe List and Object Details

List all Pipe objects and their detailed information.

SHOW PIPES;

Delete Pipe Object

When a Pipe object is no longer needed, you can use the following command to delete it.

DROP PIPE <name>;

load_history function

Function Description: The load_history function returns the history of files imported into a table by COPY jobs; this history is retained for 7 days. When a Pipe runs, it also checks load_history to avoid re-importing files that have already been loaded, which ensures data uniqueness.

Function Syntax:

load_history('schema_name.table_name')
  • schema_name.table_name: The schema-qualified name of the table whose import history you want to view.

Use Case:

SELECT * FROM load_history('myschema.mytable');

Constraints and Limitations

  • When the data source is Kafka, only one read_kafka function call is allowed in a pipe.
  • When the data source is object storage, only one volume object is allowed in a pipe.