Pipe
Pipe is the Lakehouse's continuous data ingestion object. Created via SQL DDL, it runs automatically, continuously reading data from Kafka or object storage (OSS/COS/S3) and writing it to target tables without any manual triggering.
Think of a Pipe as an "automated conveyor belt" — once data files are uploaded to an OSS subdirectory or messages are written to Kafka, the Pipe automatically detects and loads them. Unlike scheduled jobs, a Pipe runs persistently; files are typically ingested within about 30 seconds of being uploaded.
Pipe vs. Studio Sync Jobs
| Dimension | Pipe | Studio Sync Job |
|---|---|---|
| Creation method | SQL DDL | Studio visual interface |
| Applicable sources | Kafka, OSS/COS/S3 | Relational databases, Kafka, object storage |
| Management method | SQL commands | Studio interface |
| Suitable for | Teams that prefer SQL and code-based management | Teams that prefer visual configuration |
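For the sources both support (Kafka and object storage), the two are functionally equivalent; choose based on your workflow preference. For relational database sources, use a Studio Sync Job.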
Pipe Types
Object Storage Pipe (OSS/COS/S3)
Continuously scans for new files in object storage and ingests them. Two modes are supported:
| Mode | Trigger | Source File Handling | Extra Configuration |
|---|---|---|---|
| LIST_PURGE | Periodic polling scan (approx. 30 seconds) | Permanently deletes source files after ingestion | No extra configuration needed |
| EVENT_NOTIFICATION | Object storage event notification (near real-time) | Retains source files | Requires MNS message queue configuration; OSS and S3 only |
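The trade-offs in the table can be turned into a small decision helper. This is only a sketch of the selection logic; `choose_ingest_mode` and its arguments are hypothetical names, not part of the Pipe syntax. It assumes that when deleting source files is acceptable, LIST_PURGE is preferred because it needs no extra configuration.

```python
# Storage types that support EVENT_NOTIFICATION, per the table above.
EVENT_NOTIFICATION_STORES = {"OSS", "S3"}
SUPPORTED_STORES = {"OSS", "COS", "S3"}


def choose_ingest_mode(storage, keep_source_files):
    """Pick an ingest mode from the storage type and retention need (illustrative)."""
    storage = storage.upper()
    if storage not in SUPPORTED_STORES:
        raise ValueError(f"unsupported storage: {storage}")
    if not keep_source_files:
        # LIST_PURGE deletes source files after ingestion and needs no extra setup.
        return "LIST_PURGE"
    if storage in EVENT_NOTIFICATION_STORES:
        # EVENT_NOTIFICATION retains source files but requires MNS configuration.
        return "EVENT_NOTIFICATION"
    raise ValueError("COS supports only LIST_PURGE, which deletes source files")


mode = choose_ingest_mode("oss", keep_source_files=True)
```

If near real-time latency matters more than setup effort, EVENT_NOTIFICATION may still be the better choice on OSS or S3 even when source files do not need to be kept.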
Deduplication mechanism: Pipe uses load_history to record the file paths already ingested. A file at the same path is only ingested once — re-uploading the same file will not trigger a duplicate ingestion. Records are retained for 7 days.
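The path-based deduplication described above can be pictured with a toy model. This is not the Pipe's implementation: `LoadHistoryModel` is a hypothetical class, and treating a path as new once its record has expired is an assumption drawn from the 7-day retention, not a documented behavior.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=7)  # retention period stated above


class LoadHistoryModel:
    """Toy stand-in for load_history: remembers ingested file paths for 7 days."""

    def __init__(self):
        self._seen = {}  # file path -> time the path was ingested

    def should_ingest(self, path, now):
        # Drop records that have aged out of the retention window (assumed behavior).
        self._seen = {p: t for p, t in self._seen.items() if now - t < RETENTION}
        if path in self._seen:
            return False  # same path already recorded: skip it
        self._seen[path] = now
        return True


history = LoadHistoryModel()
t0 = datetime(2024, 1, 1)
first = history.should_ingest("data/orders_001.csv", t0)
again = history.should_ingest("data/orders_001.csv", t0 + timedelta(hours=1))
```

Here `first` is `True` and `again` is `False`: the key is the file path, so a re-upload with changed content at the same path is still skipped. To load corrected data, upload it under a new path.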
Kafka Pipe
Creates a persistent consumer group that continuously pulls data from a Kafka topic in batches and writes it to a table.
Kafka Pipe Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| VIRTUAL_CLUSTER | Yes | — | Compute cluster for executing COPY jobs |
| INITIAL_DELAY_IN_SECONDS | No | 0 | Delay before the first job is scheduled (seconds) |
| BATCH_INTERVAL_IN_SECONDS | No | 60 | Batch processing interval (seconds) |
| BATCH_SIZE_PER_KAFKA_PARTITION | No | 500000 | Maximum messages per Kafka partition per batch |
| MAX_SKIP_BATCH_COUNT_ON_ERROR | No | 30 | Maximum number of batches to skip on error |
| RESET_KAFKA_GROUP_OFFSETS | No | none | Initial Kafka offset on startup. Options: none (no action), valid (reset expired offsets), earliest, latest, ${TIMESTAMP_MILLISECONDS} |
| COPY_JOB_HINT | No | — | Reserved parameter. Supports IGNORE_TMP_FILE (default true), which filters files starting with . or _temporary |
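The batch parameters together imply a rough throughput ceiling. The sketch below is back-of-the-envelope arithmetic, assuming one batch runs per interval and each batch reads at most BATCH_SIZE_PER_KAFKA_PARTITION messages from every partition; the function name and the 6-partition topic are illustrative.

```python
def max_messages_per_second(partitions,
                            batch_size_per_partition=500_000,
                            batch_interval_seconds=60):
    """Upper bound on sustained throughput, using the defaults from the table."""
    return partitions * batch_size_per_partition / batch_interval_seconds


# Hypothetical topic with 6 partitions and default Pipe settings.
ceiling = max_messages_per_second(partitions=6)
print(f"{ceiling:.0f} messages/s")  # prints "50000 messages/s"
```

If the topic's steady-state produce rate is above this figure, the Pipe would fall behind under these assumptions; raising BATCH_SIZE_PER_KAFKA_PARTITION or lowering BATCH_INTERVAL_IN_SECONDS raises the ceiling.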
load_history Function
View the COPY job file ingestion history for a table. Records are retained for 7 days. Pipe uses load_history to avoid re-ingesting the same files.
ALTER PIPE Syntax
Only one property can be modified per statement. To change several properties, run one ALTER PIPE statement per property.
DESC PIPE Field Reference
DESC PIPE returns key-value format output. Key fields:
| Field | Description |
|---|---|
| pipe_status | RUNNING / PAUSED / INVALID |
| pipe_kind | VOLUME (object storage) or KAFKA |
| properties | Shows ingest_mode and virtual_cluster configuration |
| input_name | Data source, in the format volume:catalog.schema.volume_name or kafka_table_stream:workspace.schema.stream |
| output_name | Target table full path catalog.schema.table |
| invalid_reason | Error reason when the Pipe is in an invalid state; empty when normal |
| pipe_latency | Kafka Pipe consumption lag (offsetLag of 0 means no backlog) |
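When DESC PIPE output is processed by a script, the input_name value can be split into its source kind and three-part name. The following is a minimal sketch based only on the two formats listed in the table; `parse_input_name` and the sample value are hypothetical.

```python
KNOWN_KINDS = ("volume", "kafka_table_stream")  # prefixes listed in the table


def parse_input_name(input_name):
    """Split 'kind:a.b.c' into (kind, (a, b, c)); raise on any other shape."""
    kind, sep, path = input_name.partition(":")
    parts = path.split(".")
    if not sep or kind not in KNOWN_KINDS or len(parts) != 3 or not all(parts):
        raise ValueError(f"unexpected input_name format: {input_name!r}")
    return kind, tuple(parts)


# Sample value with made-up catalog, schema, and volume names.
kind, name = parse_input_name("volume:my_catalog.public.orders_volume")
```

The parser rejects anything outside the two documented formats rather than guessing, so a format change in DESC PIPE output surfaces as an error instead of silently wrong data.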
Constraints and Limitations
- Object storage Pipe: the COPY statement does not support `FILES`, `REGEXP`, or `SUBDIRECTORY` parameters
- Object storage Pipe: each Pipe corresponds to a dedicated Volume; different Pipes cannot share the same Volume
- Kafka Pipe: a single Pipe can only contain one `READ_KAFKA` function
- The `COPY` statement cannot be modified after creation; to change the ingestion logic, drop the Pipe and recreate it
DESC PIPE key field reference:
| Field | Description |
|---|---|
| pipe_status | RUNNING / PAUSED |
| pipe_kind | VOLUME (object storage) or KAFKA |
| properties | Shows ingest_mode and virtual_cluster configuration |
| input_name | Data source, in the format volume:catalog.schema.volume_name |
| output_name | Target table full path catalog.schema.table |
| invalid_reason | Error reason when the Pipe is in an invalid state; empty when normal |
To view Pipe execution history: filter job history by query_tag in the format pipe.workspace_name.schema_name.pipe_name.
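Building the query_tag by hand is error-prone, so a small helper can assemble it from the three names. This sketch only formats the string described above; `pipe_query_tag`, the sample names, and the shape of the `jobs` records are hypothetical, and how job history is actually fetched is outside its scope.

```python
def pipe_query_tag(workspace_name, schema_name, pipe_name):
    """Return the query_tag a Pipe's jobs carry: pipe.<workspace>.<schema>.<pipe>."""
    return f"pipe.{workspace_name}.{schema_name}.{pipe_name}"


tag = pipe_query_tag("my_workspace", "public", "orders_pipe")

# Hypothetical job-history records, already fetched by some other means.
jobs = [
    {"job_id": "j1", "query_tag": "pipe.my_workspace.public.orders_pipe"},
    {"job_id": "j2", "query_tag": "adhoc"},
]
pipe_jobs = [job for job in jobs if job["query_tag"] == tag]
```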
Important Notes
- Volume cannot point to the bucket root path: `LOCATION` must be a subdirectory; otherwise Pipe creation will fail
- Each Pipe requires a dedicated Volume: different Pipes cannot share the same Volume
- COPY statement cannot be modified: to change the ingestion logic, drop the Pipe and recreate it
- Data loading order is not strictly guaranteed
- Recommended file sizes: gzip-compressed files should be under 50 MB; uncompressed CSV/Parquet files should be 128 MB–256 MB
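The file-size recommendation in the last note can be checked before upload. The sketch below is one possible pre-upload check, assuming gzip files are identified by a `.gz` suffix and that 1 MB means 1024 × 1024 bytes; both are assumptions, and `within_recommended_size` is a hypothetical helper.

```python
MB = 1024 * 1024  # assumption: binary megabytes


def within_recommended_size(filename, size_bytes):
    """Return True if the file falls in the size range recommended above."""
    if filename.endswith(".gz"):
        # gzip-compressed files: under 50 MB
        return size_bytes < 50 * MB
    # uncompressed CSV/Parquet files: 128 MB to 256 MB
    return 128 * MB <= size_bytes <= 256 * MB


ok = within_recommended_size("orders_001.parquet", 200 * MB)
```

A `False` result does not mean the file fails to load; the sizes are recommendations, so the check is best used as a warning in an upload script rather than a hard gate.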
Related Documentation
- Pipe Overview — How it works and applicable scenarios
- Object Storage Pipe — Complete EVENT_NOTIFICATION mode configuration
- Kafka Pipe — READ_KAFKA parameter reference, offset management
- Pipe Syntax Reference — Complete DDL syntax
- Data Pipelines and Change Capture — Scenario selection and operation workflows
