Continuous Data Import from Object Storage Using Pipe
Pipe is a powerful data import feature in the Lakehouse platform. It reads data directly from object storage at a fixed frequency and imports it into the Lakehouse. Through a file detection mechanism, Pipe loads files in micro-batches, giving users fast access to the latest data. It is particularly suitable for scenarios that require real-time or near-real-time data processing.
How Pipe Works
- File Detection:
  - EVENT_NOTIFICATION_MODE: Requires enabling a message service; Alibaba Cloud Message Service (MNS) notifies Lakehouse when new files are uploaded. Currently, only Alibaba Cloud OSS and AWS S3 are supported.
  - LIST_PURGE mode: Periodically scans directories, synchronizes files that have not yet been recorded, and deletes the original files after synchronization.
- COPY Statement: Defines the source location of the data files and the target table, supporting multiple file formats.
- Automated Loading: Automatically detects new files and executes the COPY statement.
- Duplicate Import Prevention: To avoid duplicate imports, the load_history function records the files that COPY has already imported into the current table. When Pipe executes, it deduplicates on the table name and imported file name recorded in load_history, so files that have already been loaded are not imported again. If you need to re-import a file that is already recorded, execute the COPY command manually. load_history records are currently retained for 7 days.
- Pipe Import Job History: Since each execution is a COPY issued by the Pipe, all operations appear in the job history. Filter the job history by query_tag: every Pipe-executed COPY job is tagged in the format `pipe.workspace_name.schema_name.pipe_name`, making it easy to track and manage.
Use Cases
- Real-time Data Synchronization: Your data lands in object storage and must be synchronized frequently so that the latest data is available in a timely manner.
- Cost Optimization: Importing and exporting data through object storage avoids public network traffic charges. Within the same region in particular, you can point the storage connection at the intranet endpoint, further reducing costs.
Notes
- When using EVENT_NOTIFICATION_MODE, the storage connection must be created with role ARN authorization.
- LIST_PURGE mode supports both key and role ARN authorization methods.
- Recommended File Size: around 50 MB for gzip-compressed files; between 128 MB and 256 MB for uncompressed CSV and PARQUET files.
- Data Loading Order: Data loading cannot guarantee strict order.
- Pipe Latency: Pipe loading time is affected by various factors, including file format, size, and the complexity of the COPY statement.
Cost
Charged based on the computing resources used when loading files.
PIPE Syntax
- `<pipe_name>`: The name of the Pipe object you want to create.
- `VIRTUAL_CLUSTER`: Specify the name of the virtual cluster.
- `INGEST_MODE`: Set to `LIST_PURGE` or `EVENT_NOTIFICATION` to determine the data import mode.
- `COPY_JOB_HINT`: Optional, Lakehouse reserved parameter.
- `IGNORE_TMP_FILE`: Can be `true` or `false`; the default is `true`. When enabled, files or directories whose names start with a dot (`.`) or `_temporary` are filtered out, for example `s3://my_bucket/a/b/.SUCCESS`, `oss://my_bucket/a/b/_temporary/`, or `oss://my_bucket/a/b/_temporary_123/`.
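Putting these parameters together, the DDL shape is roughly as follows. This is a minimal sketch: all object names are placeholders, and the COPY body follows the COPY INTO syntax referenced in the next section.

```sql
-- Minimal sketch; pipe, cluster, table, and volume names are placeholders.
CREATE PIPE my_pipe
  VIRTUAL_CLUSTER = 'my_vc'        -- virtual cluster that runs the COPY jobs
  INGEST_MODE = 'LIST_PURGE'       -- or 'EVENT_NOTIFICATION'
  IGNORE_TMP_FILE = true           -- skip dot-files and _temporary paths (default)
AS
COPY INTO my_schema.my_table
FROM VOLUME my_volume;             -- source location; see "Supported File Formats" below
```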
Supported File Formats
Refer to COPY INTO import.
PIPE Loading Examples
Using File Scan Mode (LIST_PURGE)
Specific Steps
Step 1: Create connection and volume
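For Alibaba Cloud OSS, this step might look like the sketch below, assuming access-key authorization (LIST_PURGE accepts both key and role ARN authorization). The names, endpoint, and exact option spellings are illustrative; follow the storage connection documentation for your cloud.

```sql
-- Illustrative connection using access-key authorization; values are placeholders.
CREATE STORAGE CONNECTION my_oss_conn
  TYPE = oss
  ENDPOINT = 'oss-cn-hangzhou-internal.aliyuncs.com'  -- intranet endpoint avoids public traffic
  ACCESS_ID = '<access_key_id>'
  ACCESS_KEY = '<access_key_secret>';

-- External volume pointing at the directory Pipe will scan.
CREATE EXTERNAL VOLUME my_volume
  LOCATION 'oss://my_bucket/data/'
  USING CONNECTION my_oss_conn;
```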
Step 2: Run the COPY command on its own to verify that the import succeeds
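A one-off COPY along these lines verifies the path, format, and permissions before wiring it into a Pipe (the column list and format options are illustrative; see the COPY INTO documentation for the exact syntax):

```sql
-- Run once by hand; if this succeeds, the same statement can drive the Pipe.
COPY INTO my_schema.my_table
FROM VOLUME my_volume (id INT, name STRING)
USING csv
OPTIONS ('header' = 'true');
```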
Step 3: Use the verified statement to build the Pipe object
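Wrapping the verified COPY statement, again with placeholder names:

```sql
CREATE PIPE my_pipe
  VIRTUAL_CLUSTER = 'my_vc'
  INGEST_MODE = 'LIST_PURGE'  -- scan-file mode: list new files, load them, purge the originals
AS
COPY INTO my_schema.my_table
FROM VOLUME my_volume (id INT, name STRING)
USING csv
OPTIONS ('header' = 'true');
```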
Step 4: View Pipe Execution History and Imported Files
- View the execution status of Pipe COPY jobs: filter the job history by query_tag. All COPY jobs executed by the Pipe are tagged in the format `pipe.workspace_name.schema_name.pipe_name`.
- View the history of files imported by COPY jobs via load_history.

Both lookups are sketched below.
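Roughly, assuming the job history is exposed as an information_schema view and that load_history is callable as a table function (both names here are assumptions; check your platform reference):

```sql
-- COPY jobs issued by one Pipe, located via their query_tag.
SELECT *
FROM information_schema.job_history
WHERE query_tag = 'pipe.my_workspace.my_schema.my_pipe';

-- Files recorded as imported into the target table (retained for 7 days).
SELECT * FROM load_history('my_schema.my_table');
```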
Using Message Service Notification Mode (Only Supports Alibaba Cloud OSS and AWS S3)
Step 1: Enable Alibaba Cloud Message Service (MNS)
- Enable the Message Service MNS in the Alibaba Cloud console.
- Configure MNS to listen to the OSS (Object Storage Service) folder to be synchronized; refer to the Alibaba Cloud documentation for details.
Step 2: Authorize Lakehouse to Read OSS. Following the role ARN method in Alibaba Cloud Storage Connection Creation, authorize Lakehouse to read the corresponding OSS bucket.
Step 3: Authorize Lakehouse to Access MNS
Step 4: Create Storage Connection
Step 5: Create Volume
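Steps 4 and 5 together might look like the following sketch. As noted above, EVENT_NOTIFICATION_MODE requires role ARN authorization on the connection; the ARN, endpoint, and object names are all placeholders.

```sql
-- Illustrative role-ARN-based connection; values are placeholders.
CREATE STORAGE CONNECTION my_oss_conn
  TYPE = oss
  ENDPOINT = 'oss-cn-hangzhou-internal.aliyuncs.com'
  ROLE_ARN = 'acs:ram::<account_id>:role/<lakehouse_role>';

-- Volume over the folder that MNS is watching.
CREATE EXTERNAL VOLUME my_volume
  LOCATION 'oss://my_bucket/data/'
  USING CONNECTION my_oss_conn;
```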
Step 6: Create Pipe
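Finally, the Pipe is created with INGEST_MODE set to EVENT_NOTIFICATION. How the MNS queue is bound to the Pipe is deployment-specific and not shown here; all names are placeholders.

```sql
CREATE PIPE my_event_pipe
  VIRTUAL_CLUSTER = 'my_vc'
  INGEST_MODE = 'EVENT_NOTIFICATION'  -- new files are announced via MNS instead of periodic scans
AS
COPY INTO my_schema.my_table
FROM VOLUME my_volume;  -- same COPY body as in the scan-file example
```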