Pipe Continuous Ingestion

A Pipe is the Singdata Lakehouse object for continuous ingestion: it keeps importing data from Kafka or object storage (OSS/S3/COS) into Lakehouse tables.

You can think of a Pipe as an automated conveyor belt. Once data files are uploaded to OSS/S3/COS or messages are written to Kafka, the Pipe detects them and loads them into the target table automatically. You do not need to trigger anything by hand or configure scheduled tasks.

Selection Guide: If you are used to managing data pipelines with SQL, choose Pipe. If you need to connect to relational databases (MySQL, PostgreSQL, etc.) or prefer visual configuration, choose Studio sync tasks.

What Is a Pipe

A Pipe is a SQL object created via DDL statements. Once created, the Pipe runs continuously, automatically reading data from the data source and writing it to the target table.

Differences from Studio Sync Tasks:

Dimension            | Pipe                                                   | Studio Sync Task
Creation Method      | SQL DDL                                                | Studio visual interface
Management Method    | SQL commands                                           | Studio interface + cz-cli
Applicable Scenarios | Familiar with SQL; need code-based pipeline management | Prefer visual configuration, or need to connect to relational databases
Data Sources         | Kafka, object storage                                  | Relational databases, Kafka, object storage

For the sources both support (Kafka and object storage), the two are functionally equivalent. Choose based on how you prefer to work.
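The selection guide and comparison table above reduce to a small decision rule. Relational sources always go to Studio. For Kafka and object storage, the choice comes down to preference. The sketch below encodes that rule. The source labels ("kafka", "object_storage", "relational_db") and the `prefers_sql` flag are hypothetical names used only for illustration. This is not a Lakehouse or Studio API.

```python
# Conceptual sketch of the selection guide; not a Lakehouse or Studio API.
# Source labels and the prefers_sql flag are hypothetical.

PIPE_SOURCES = {"kafka", "object_storage"}
STUDIO_SOURCES = {"relational_db", "kafka", "object_storage"}


def choose_ingestion_tool(source: str, prefers_sql: bool) -> str:
    """Return "Pipe" or "Studio sync task" following the selection guide."""
    if source not in STUDIO_SOURCES:
        raise ValueError(f"unknown source type: {source}")
    # Relational databases (MySQL, PostgreSQL, ...) are supported only by Studio.
    if source not in PIPE_SOURCES:
        return "Studio sync task"
    # For sources both tools support, the choice is a matter of preference.
    return "Pipe" if prefers_sql else "Studio sync task"


if __name__ == "__main__":
    print(choose_ingestion_tool("kafka", prefers_sql=True))          # Pipe
    print(choose_ingestion_tool("relational_db", prefers_sql=True))  # Studio sync task
```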

Pipe Types

Kafka Pipe

Continuously consumes data from a Kafka topic and writes it to a Lakehouse table.

CREATE PIPE kafka_pipe AS
  COPY INTO orders FROM READ_KAFKA(...) USING JSON;

Two Ingestion Paths:

  1. READ_KAFKA Pipe (Recommended): Uses the READ_KAFKA() function directly in the Pipe
  2. Kafka External Table + Table Stream: Create a Kafka external table first, then consume it through a Table Stream
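To picture what "continuously consumes data from a Kafka topic" means, here is a toy model. The topic is a Python list of JSON message strings, and the target table is a list of rows. Each run parses only the messages that arrived since the previous run, mirroring `USING JSON` in the Pipe definition. This assumes the Pipe tracks how far it has read with an offset. The `KafkaPipeModel` class and its `next_offset` field are hypothetical, and this section does not describe how a real Pipe records its consumption position.

```python
import json
from dataclasses import dataclass, field


@dataclass
class KafkaPipeModel:
    """Toy model of a Kafka Pipe (hypothetical; not the Lakehouse implementation).

    Assumption: the Pipe remembers how far it has read (an offset), so each run
    processes only messages that arrived since the previous run.
    """
    table: list = field(default_factory=list)
    next_offset: int = 0

    def run_once(self, topic: list[str]) -> int:
        new_messages = topic[self.next_offset:]
        for raw in new_messages:
            # Mirrors USING JSON: each message value is parsed as a JSON row.
            self.table.append(json.loads(raw))
        self.next_offset = len(topic)
        return len(new_messages)


if __name__ == "__main__":
    topic = ['{"order_id": 1}', '{"order_id": 2}']
    pipe = KafkaPipeModel()
    print(pipe.run_once(topic))  # 2
    topic.append('{"order_id": 3}')
    print(pipe.run_once(topic))  # 1: only the new message
    print([row["order_id"] for row in pipe.table])  # [1, 2, 3]
```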

Object Storage Pipe

Continuously scans OSS/S3/COS for new files and imports them.

CREATE PIPE oss_pipe
    VIRTUAL_CLUSTER = 'default'
    INGEST_MODE = 'LIST_PURGE'
    AS COPY INTO orders FROM VOLUME my_volume USING CSV PURGE = TRUE;
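One LIST_PURGE polling cycle can be sketched as follows: list the files in the volume, import each one into the table, and delete it once the import succeeds (`PURGE = TRUE`). This is a conceptual model only. The volume is a dict from file name to CSV text, and `list_purge_scan` is a hypothetical name, not part of Lakehouse. In this model, deletion is what keeps a file from being imported twice. With `purge=False`, the next scan would pick up the same files again.

```python
import csv
import io


def list_purge_scan(volume: dict[str, str], table: list[dict], purge: bool = True) -> list[str]:
    """One polling cycle of a LIST_PURGE-style Pipe (hypothetical model).

    volume maps file names to CSV text with a header row; parsed rows are
    appended to table. With purge=True each file is deleted only after its
    rows were imported; if parsing raises, that file stays in the volume.
    """
    ingested = []
    for name in sorted(volume):          # list the directory (sorted copy, safe to delete)
        rows = list(csv.DictReader(io.StringIO(volume[name])))
        table.extend(rows)               # import into the target table
        ingested.append(name)
        if purge:
            del volume[name]             # PURGE = TRUE: remove the source file
    return ingested
```

Calling it repeatedly on the same `volume` dict simulates periodic polling. Each call ingests only files uploaded since the previous call.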

Comparison of the Two Scan Modes (INGEST_MODE):

Dimension                | LIST_PURGE                                                                 | EVENT_NOTIFICATION
Trigger Method           | Periodic polling scan of the directory                                     | Object storage event notification (near real-time trigger)
Supported Clouds         | OSS, S3, COS                                                               | OSS, S3 only
Authorization Method     | Secret key or Role ARN                                                     | Role ARN only
Source File Processing   | Deletes source files after successful import (requires PURGE = TRUE)       | Preserves source files
Configuration Complexity | Simple, no extra configuration needed                                      | Requires MNS queue configuration
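In contrast to LIST_PURGE, EVENT_NOTIFICATION reacts to notifications instead of listing the directory, and it leaves source files in place. The toy model below shows the difference. A `deque` stands in for the notification queue (the MNS queue in the table), and each event carries the key of a newly created object. Only objects named in events are ingested. The function name, the event format, and the line-per-row ingestion are assumptions for illustration, not the Lakehouse event schema.

```python
from collections import deque


def drain_notifications(events: deque, bucket: dict[str, str], table: list[str]) -> int:
    """Conceptual model of EVENT_NOTIFICATION mode (hypothetical).

    events stands in for the notification queue; each entry is the key of a
    newly created object. Only those objects are read, and nothing is
    deleted from bucket, matching "Preserves source files".
    """
    processed = 0
    while events:
        key = events.popleft()                  # next "object created" event
        table.extend(bucket[key].splitlines())  # ingest the object's lines as rows
        processed += 1
    return processed
```

A file sitting in the bucket without a matching event is never touched. That is why this mode depends on the queue being configured correctly.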

Pipe Lifecycle

Create Pipe --> Auto Run --> Continuous Ingestion
     |              |
     v              v
  Suspend Pipe   Monitor Status
     |
     v
  Resume Pipe
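The lifecycle diagram can be read as a two-state machine. A newly created Pipe starts running immediately. It can be suspended, and a suspended Pipe can be resumed. The sketch below encodes only those transitions. The state names `RUNNING`/`SUSPENDED` and the action strings are hypothetical and may not match the status values that `SHOW PIPES` reports. The model also assumes that a suspended Pipe does not ingest.

```python
from enum import Enum


class PipeState(Enum):
    RUNNING = "running"      # hypothetical state names, not Lakehouse status values
    SUSPENDED = "suspended"


# Transitions from the lifecycle diagram: suspend a running Pipe, resume a suspended one.
TRANSITIONS = {
    (PipeState.RUNNING, "suspend"): PipeState.SUSPENDED,
    (PipeState.SUSPENDED, "resume"): PipeState.RUNNING,
}


class PipeLifecycle:
    def __init__(self) -> None:
        # Create Pipe --> Auto Run: no separate start step.
        self.state = PipeState.RUNNING

    def apply(self, action: str) -> PipeState:
        key = (self.state, action)
        if key not in TRANSITIONS:
            raise ValueError(f"cannot {action} a Pipe that is {self.state.value}")
        self.state = TRANSITIONS[key]
        return self.state

    def ingests(self) -> bool:
        # Assumption: only a running Pipe keeps ingesting.
        return self.state is PipeState.RUNNING
```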

Monitoring Pipes

-- Check Pipe status
SHOW PIPES;

-- View Pipe details
DESC PIPE my_pipe;

Applicable Scenarios

  • Kafka Real-time Ingestion: log data and business events written in real time
  • Object Storage Batch Import: CSV/JSON/Parquet files uploaded on a regular basis are ingested automatically
  • Replace Scheduled Tasks: no Cron configuration needed, because the Pipe runs continuously