Lakehouse Continuous Data Ingestion Guide (Pipe)
Overview
Pipe is Singdata Lakehouse's continuous data ingestion pipeline. It automatically and continuously loads data from Kafka or from object storage (OSS/S3/COS) into Lakehouse tables. You do not need to configure scheduled jobs manually: Pipe maintains consumption offsets and runs continuously. This guide is organized by business scenario to help you quickly learn how to create and monitor Pipes.
Quick Navigation
- Create Kafka Pipe -- Continuously import data from Kafka topics
- Create OSS Pipe -- Continuously import files from object storage
- Check Pipe Status -- Monitor pipeline running status
- Manual Trigger Execution -- Execute a data import immediately
- Drop Pipe -- Clean up pipelines that are no longer needed
Related SQL Commands
| Command | Purpose | Applicable Scenario |
|---|---|---|
| CREATE PIPE ... AS COPY INTO ... FROM READ_KAFKA(...) | Create a Kafka pipeline | Real-time logs, message queue ingestion |
| CREATE PIPE ... AS COPY INTO ... FROM VOLUME ... | Create a Volume pipeline | Continuous object storage file import |
| SHOW PIPES | View pipeline list | Monitor Pipe status |
| DESC PIPE | View pipeline details | Check configuration and runtime information |
| DROP PIPE | Drop a pipeline | Clean up unused pipelines |
Prerequisites
The examples in this guide use simulated Kafka topics and Volume paths.
Create Kafka Pipe
Use CREATE PIPE with the READ_KAFKA function to define the logic for continuous Kafka import.
Parameter Description:
- BATCH_INTERVAL_IN_SECONDS: The interval between batch imports, in seconds. A value of 30 runs a batch import every 30 seconds.
- The offset parameters in READ_KAFKA are left empty, so Pipe manages consumption offsets automatically.
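Pipe's offset handling is internal to Lakehouse. The Python model below is only a conceptual sketch. It assumes the common pattern of committing an offset after a batch has been written. The names OffsetStore and run_batch are hypothetical and are not part of any Lakehouse or Kafka client API. The model shows two things:

- Why a restarted Pipe resumes where it left off.
- Why a failure between writing and committing replays a batch.

```python
from dataclasses import dataclass, field


@dataclass
class OffsetStore:
    """Hypothetical stand-in for the consumer-group offsets that Pipe manages."""
    committed: dict = field(default_factory=dict)  # partition -> next offset to read

    def get(self, partition: int) -> int:
        # No commit yet: start from offset 0 (an assumption for this sketch).
        return self.committed.get(partition, 0)

    def commit(self, partition: int, next_offset: int) -> None:
        self.committed[partition] = next_offset


def run_batch(log, partition, store, table, max_records, crash_before_commit=False):
    """Read one batch from `log` at the committed offset and append it to `table`."""
    start = store.get(partition)
    batch = log[start:start + max_records]
    table.extend(batch)  # 1. write the batch to the target table
    if crash_before_commit:
        # Simulated failure: rows were written but the offset was not committed.
        raise RuntimeError("worker crashed before committing offset")
    store.commit(partition, start + len(batch))  # 2. commit only after the write
    return len(batch)


log = ["m0", "m1", "m2", "m3", "m4"]  # messages in one partition
store, table = OffsetStore(), []

run_batch(log, 0, store, table, max_records=2)  # loads m0, m1; commits offset 2
try:
    run_batch(log, 0, store, table, max_records=2, crash_before_commit=True)
except RuntimeError:
    pass  # m2, m3 were written, but offset 2 is still the committed position
run_batch(log, 0, store, table, max_records=2)  # restart resumes at offset 2
print(table)  # ['m0', 'm1', 'm2', 'm3', 'm2', 'm3']
```

The replayed m2 and m3 illustrate the at-least-once behavior mentioned in the Notes section.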
Create OSS Pipe
Use CREATE PIPE with a Volume to define the logic for continuous object storage file import.
Operating Modes:
- LIST_PURGE (default): Scans files in the Volume and auto-deletes them after import.
- LIST: Scans and imports, but preserves the source files.
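The difference between the two modes can be modeled locally, with a temporary directory standing in for the Volume. This is a conceptual Python sketch, not Lakehouse code; the function scan_and_import is hypothetical. One part is an assumption: this guide does not say how Pipe remembers which files LIST mode has already imported, so the sketch tracks them in an `imported` set of file names.

```python
import os
import tempfile


def scan_and_import(volume_dir, mode, imported, table):
    """Scan `volume_dir` once; `mode` is "LIST_PURGE" or "LIST"."""
    if mode not in ("LIST_PURGE", "LIST"):
        raise ValueError(f"unknown mode: {mode}")
    new_files = 0
    for name in sorted(os.listdir(volume_dir)):
        if name in imported:
            continue  # LIST: file was loaded in an earlier scan, skip it
        path = os.path.join(volume_dir, name)
        with open(path, encoding="utf-8") as f:
            table.extend(line.rstrip("\n") for line in f)
        new_files += 1
        if mode == "LIST_PURGE":
            os.remove(path)  # LIST_PURGE: delete the source file after import
        else:
            imported.add(name)  # LIST: keep the file, remember it was imported
    return new_files


with tempfile.TemporaryDirectory() as vol:
    for i in range(2):
        with open(os.path.join(vol, f"part-{i}.csv"), "w", encoding="utf-8") as f:
            f.write(f"row{i}\n")

    list_table, seen = [], set()
    first = scan_and_import(vol, "LIST", seen, list_table)
    second = scan_and_import(vol, "LIST", seen, list_table)  # nothing new
    files_after_list = sorted(os.listdir(vol))

    purge_table = []
    purged = scan_and_import(vol, "LIST_PURGE", set(), purge_table)
    files_after_purge = os.listdir(vol)

print(first, second, files_after_list)  # 2 0 ['part-0.csv', 'part-1.csv']
print(purged, files_after_purge)        # 2 []
```

LIST leaves the source files in place, so it needs some record of what was already loaded. LIST_PURGE avoids re-importing simply by deleting each file once it has been loaded.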
Check Pipe Status
Use SHOW PIPES to view the running status of all Pipes.
Key Field Description:
- status: Running status (RUNNING / SUSPENDED / FAILED)
- pipe_kind: Pipeline type (READ_KAFKA / VOLUME)
- copy_statement: The underlying COPY INTO statement
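For alerting, a monitoring job can flag every Pipe whose status is not RUNNING. The Python sketch below assumes the SHOW PIPES result has already been fetched as a list of dictionaries keyed by column name; fetching it is out of scope. Only status, pipe_kind and copy_statement come from this guide. The `name` column, the helper unhealthy_pipes, and the sample rows are hypothetical.

```python
from collections import defaultdict

HEALTHY = "RUNNING"


def unhealthy_pipes(rows):
    """Group Pipes that are not RUNNING by pipe_kind, e.g. for an alert."""
    by_kind = defaultdict(list)
    for row in rows:
        if row["status"] != HEALTHY:
            by_kind[row["pipe_kind"]].append((row["name"], row["status"]))
    return dict(by_kind)


# Sample rows shaped like SHOW PIPES output (illustrative values only).
rows = [
    {"name": "kafka_logs_pipe", "status": "RUNNING", "pipe_kind": "READ_KAFKA",
     "copy_statement": "COPY INTO ..."},
    {"name": "oss_files_pipe", "status": "FAILED", "pipe_kind": "VOLUME",
     "copy_statement": "COPY INTO ..."},
    {"name": "kafka_audit_pipe", "status": "SUSPENDED", "pipe_kind": "READ_KAFKA",
     "copy_statement": "COPY INTO ..."},
]

print(unhealthy_pipes(rows))
# {'VOLUME': [('oss_files_pipe', 'FAILED')], 'READ_KAFKA': [('kafka_audit_pipe', 'SUSPENDED')]}
```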
Manual Trigger Execution
Pipes run automatically based on BATCH_INTERVAL_IN_SECONDS by default, but can also be triggered manually via ALTER PIPE.
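The interaction between the batch interval and a manual trigger can be pictured with a small scheduler model. This is a Python sketch under stated assumptions, not the Lakehouse scheduler:

- The interval is measured from the previous batch.
- A manual trigger causes a batch at the next check.
- Any batch, including a manual one, restarts the interval.

This guide does not say whether a manual run resets the timer. The class BatchScheduler is hypothetical, and the ALTER PIPE syntax itself is not modeled.

```python
class BatchScheduler:
    """Decides when a pipe-like loop runs its next batch (hypothetical model)."""

    def __init__(self, interval_seconds):
        self.interval = interval_seconds
        self.last_run = None          # time of the most recent batch
        self.manual_requested = False

    def trigger(self):
        # Models a manual trigger: run at the next check regardless of the interval.
        self.manual_requested = True

    def should_run(self, now):
        due = self.last_run is None or now - self.last_run >= self.interval
        if due or self.manual_requested:
            self.last_run = now       # assumption: any run restarts the interval
            self.manual_requested = False
            return True
        return False


sched = BatchScheduler(interval_seconds=30)
runs = []
for t in range(0, 91, 5):             # check every 5 seconds for 90 seconds
    if t == 40:
        sched.trigger()               # manual run requested at t=40
    if sched.should_run(t):
        runs.append(t)
print(runs)  # [0, 30, 40, 70]
```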
Drop Pipe
Use DROP PIPE to delete pipelines that are no longer needed.
Clean Up Test Data
After verifying the Pipe, it is recommended to clean up the test tables.
Notes
- Consumption Offset Management: Kafka Pipes automatically manage consumer group offsets. After restarts or failure recovery, consumption resumes from the last offset.
- Error Handling: Use the max_errors parameter to control how many erroneous records are tolerated. If the threshold is exceeded, the Pipe pauses and reports an error.
- VCluster Selection: It is recommended to run Pipes on an INTEGRATION or GENERAL type VCluster.
- Permission Requirements: Creating a Pipe requires INSERT permission on the target table and read permission on the Kafka topic or Volume.
- Idempotency: Pipes provide at-least-once semantics by default, so a record may be loaded more than once after a failure. It is recommended to define a primary key on the target table or to use MERGE INTO for deduplication.
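Two of the notes above, error tolerance and idempotency, can be illustrated together in Python. This is a conceptual sketch, not Lakehouse behavior. It assumes:

- Bad records are counted per batch.
- A batch over the max_errors threshold is rejected as a whole.
- The target table is a dict keyed by primary key, where a keyed upsert plays the role that MERGE INTO plays in SQL.

The names PipePaused, parse and load_batch are hypothetical. So is the "id,value" record format.

```python
class PipePaused(Exception):
    """Raised when a batch has more bad records than max_errors allows."""


def parse(line):
    # Hypothetical record format "id,value"; anything else is a bad record.
    key, sep, value = line.partition(",")
    if not sep or not key.strip().isdigit():
        raise ValueError(f"bad record: {line!r}")
    return int(key), value.strip()


def load_batch(lines, table, max_errors):
    """Upsert the good records of one batch into `table` (dict keyed by primary key)."""
    good, errors = [], 0
    for line in lines:
        try:
            good.append(parse(line))
        except ValueError:
            errors += 1
    if errors > max_errors:
        # Assumption: the whole batch is rejected and the pipe stops.
        raise PipePaused(f"{errors} bad records > max_errors={max_errors}")
    for key, value in good:
        table[key] = value  # keyed upsert: a replayed record does not create a duplicate
    return errors


table = {}
batch = ["1,alpha", "2,beta", "oops", "3,gamma"]
load_batch(batch, table, max_errors=1)  # 1 bad record, within the threshold
load_batch(batch, table, max_errors=1)  # same batch redelivered (at-least-once)
print(table)  # {1: 'alpha', 2: 'beta', 3: 'gamma'}

try:
    load_batch(["x", "y"], table, max_errors=1)
except PipePaused as exc:
    print("paused:", exc)  # paused: 2 bad records > max_errors=1
```

Because each row is written by key, the redelivered batch leaves the table unchanged. Without a key, an append-only table would hold every record twice.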
