CREATE PIPE

Creates a Pipe object for continuously importing data from object storage or Kafka into the Lakehouse.

Import Data from Object Storage

CREATE PIPE [IF NOT EXISTS] <pipe_name>
    VIRTUAL_CLUSTER = 'virtual_cluster_name'
    INGEST_MODE = 'LIST_PURGE' | 'EVENT_NOTIFICATION'
    [COPY_JOB_HINT = '']
AS <copy_statement>;

Parameter Description:

| Parameter | Required | Description |
|---|---|---|
| pipe_name | Yes | Name of the Pipe object |
| VIRTUAL_CLUSTER | Yes | Name of the compute cluster used to execute COPY jobs |
| INGEST_MODE | Yes | Data ingestion mode: LIST_PURGE (polling scan) or EVENT_NOTIFICATION (triggered by event notifications) |
| COPY_JOB_HINT | No | Lakehouse reserved parameter. Supports IGNORE_TMP_FILE (true or false, default true), which filters out files or directories whose names start with . or _temporary |
| copy_statement | Yes | A standard COPY INTO statement. Supports the ON_ERROR parameter (CONTINUE or ABORT) to control the error handling strategy |

Usage Restrictions:

  • COPY statements in a Pipe do not support the FILES, REGEXP, or SUBDIRECTORY parameters.
  • Each Pipe must use its own dedicated Volume; a Volume cannot be shared between Pipes.
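
As an illustration of the syntax and restrictions above, here is a minimal sketch of a Pipe that polls a Volume in LIST_PURGE mode. All object names (orders_pipe, orders_raw, orders_volume) and the cluster name 'default' are hypothetical. The shape of the COPY INTO clause (FROM VOLUME, USING CSV, and the position of ON_ERROR) is an assumption and should be checked against the COPY INTO reference.

```sql
-- Hypothetical names: orders_pipe, orders_raw, orders_volume, 'default'.
-- The COPY INTO clause is an assumed form, not confirmed by this page.
-- It deliberately omits FILES, REGEXP, and SUBDIRECTORY, which a Pipe
-- does not support.
CREATE PIPE IF NOT EXISTS orders_pipe
    VIRTUAL_CLUSTER = 'default'
    INGEST_MODE = 'LIST_PURGE'
AS
COPY INTO orders_raw
FROM VOLUME orders_volume
USING CSV
ON_ERROR = CONTINUE;
```

Because a Volume cannot be shared, a second Pipe that loads a different table would need its own Volume rather than reusing orders_volume.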

Import Data from Kafka

CREATE PIPE [IF NOT EXISTS] <pipe_name>
    VIRTUAL_CLUSTER = 'virtual_cluster_name'
    [INITIAL_DELAY_IN_SECONDS = '']
    [BATCH_INTERVAL_IN_SECONDS = '']
    [BATCH_SIZE_PER_KAFKA_PARTITION = '']
    [MAX_SKIP_BATCH_COUNT_ON_ERROR = '']
    [RESET_KAFKA_GROUP_OFFSETS = '']
    [COPY_JOB_HINT = '']
AS <copy_statement>;

Parameter Description:

| Parameter | Required | Default | Description |
|---|---|---|---|
| VIRTUAL_CLUSTER | Yes | -- | Name of the compute cluster used to execute COPY jobs |
| INITIAL_DELAY_IN_SECONDS | No | 0 | Delay in seconds before the first job is scheduled |
| BATCH_INTERVAL_IN_SECONDS | No | 60 | Batch processing interval in seconds |
| BATCH_SIZE_PER_KAFKA_PARTITION | No | 500000 | Maximum number of messages per batch per Kafka partition |
| MAX_SKIP_BATCH_COUNT_ON_ERROR | No | 30 | Maximum number of retries to skip a batch on error |
| RESET_KAFKA_GROUP_OFFSETS | No | none | Initial Kafka offset position when the Pipe starts. Options: none (no action), valid (reset expired offsets), earliest, latest, ${TIMESTAMP_MILLISECONDS} |
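
The optional parameters above might be combined as in the following sketch. The Pipe name kafka_orders_pipe and the cluster name 'default' are hypothetical, and the chosen values are illustrative rather than recommended. The COPY statement is left as the <copy_statement> placeholder from the syntax above, because this page does not show the form of a Kafka-reading COPY INTO statement; it must be replaced before the statement can run.

```sql
-- Hypothetical names: kafka_orders_pipe, 'default'.
-- Values are quoted strings, following the syntax block above.
CREATE PIPE IF NOT EXISTS kafka_orders_pipe
    VIRTUAL_CLUSTER = 'default'
    INITIAL_DELAY_IN_SECONDS = '30'            -- wait 30 s before the first job
    BATCH_INTERVAL_IN_SECONDS = '120'          -- run a batch every 2 minutes
    BATCH_SIZE_PER_KAFKA_PARTITION = '100000'  -- cap messages per partition per batch
    RESET_KAFKA_GROUP_OFFSETS = 'earliest'     -- start from the oldest available offsets
AS <copy_statement>;
```

Parameters that are omitted, such as MAX_SKIP_BATCH_COUNT_ON_ERROR here, fall back to the defaults listed in the table.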