A Comprehensive Guide to Importing Data into Singdata Lakehouse

Data Ingestion: Loading Files from the Web into the Lakehouse via Singdata Lakehouse Studio's Built-in Python Node

Overview

Singdata Lakehouse Studio comes with a built-in Python node that allows you to develop and run Python code.

Use Case

This approach suits scenarios where third-party Python libraries are needed to process files during data ingestion. In this example, the Python SDK for Alibaba Cloud Object Storage Service (OSS), oss2, is used.
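
Since the task depends on a third-party SDK, it is worth confirming that oss2 is importable in the Python node's runtime before running the full task. A minimal check (how packages get installed into the node environment depends on your Studio setup):

import importlib.util

# Fail fast with a clear message if the oss2 SDK is missing from the runtime
if importlib.util.find_spec('oss2') is None:
    raise RuntimeError('oss2 is not installed in this Python node environment')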

Implementation Steps

Create a New Python Task

Navigate to Development -> Tasks, click "+", and create a new Python task.

Task Name: 05_Loading Files from the Web into the Lakehouse via Studio's Built-in Python Node.

Develop Python Task Code

Paste the following code into the code editor of the newly created Python task:

import io
import subprocess

import oss2

# Alibaba Cloud OSS configuration
ACCESS_KEY_ID = '${ak}'      # injected from the task parameter "ak"
ACCESS_KEY_SECRET = '${sk}'  # injected from the task parameter "sk"
BUCKET_NAME = 'yourbucketname'
ENDPOINT = 'oss-cn-hangzhou-internal.aliyuncs.com'
# Object key prefix inside the bucket (the bucket itself is passed to oss2.Bucket below)
ROOT_PATH = 'ingest_demo/from_web'

try:
    # Construct the wget command; note the raw.githubusercontent.com URL, which
    # serves the file itself (a github.com/.../blob/... URL returns an HTML page)
    url = ("https://raw.githubusercontent.com/yunqiqiliang/clickzetta_quickstart/"
           "main/a_comprehensive_guide_to_ingesting_data_into_clickzetta/data/"
           "lift_tickets_data.csv.gz")
    cmd = ["wget", "-qO-", url]
    print(f"wget cmd: {cmd}")

    # Execute the wget command and capture the downloaded bytes from stdout
    wget_output = subprocess.check_output(cmd)
    print("Wget file done...")

    # Wrap the downloaded bytes in an in-memory file object
    file_obj = io.BytesIO(wget_output)
except Exception as e:
    print(f"An error occurred: {e}")
    file_obj = None
    raise

if file_obj:
    try:
        # Initialize the Alibaba Cloud OSS client
        auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
        bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME)

        # Upload the in-memory file to OSS
        oss_path = f"{ROOT_PATH}/lift_tickets_data.csv.gz"
        print(f"oss path: {oss_path}")
        bucket.put_object(oss_path, file_obj)
        print("Put file to oss done...")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Close the in-memory file object
        file_obj.close()
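
If wget is not available in the node's runtime, the same download can be done with Python's standard library alone. A minimal sketch, using the same raw-file URL as above:

import io
import urllib.request

# Download the file over HTTPS and wrap the bytes in an in-memory file object;
# urllib.request ships with CPython, so no external binary is needed
url = ("https://raw.githubusercontent.com/yunqiqiliang/clickzetta_quickstart/"
       "main/a_comprehensive_guide_to_ingesting_data_into_clickzetta/data/"
       "lift_tickets_data.csv.gz")
with urllib.request.urlopen(url) as resp:
    file_obj = io.BytesIO(resp.read())
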
Configure Parameters for the Task

There are two parameters:

ACCESS_KEY_ID = '${ak}'
ACCESS_KEY_SECRET = '${sk}'

Open the task's scheduling configuration to set default values for the parameters.

Click "Load Parameters from Code" and fill in the corresponding values.

Run the Test

Click "Run" to execute the Python code.

Check the Upload Results

Log in to the Alibaba Cloud OSS console to view the uploaded files.
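
The check can also be scripted. A minimal sketch that lists what landed under the ingestion prefix, reusing the same oss2 configuration as the task above:

import oss2

# Same configuration as the ingestion task
ACCESS_KEY_ID = '${ak}'
ACCESS_KEY_SECRET = '${sk}'
BUCKET_NAME = 'yourbucketname'
ENDPOINT = 'oss-cn-hangzhou-internal.aliyuncs.com'

auth = oss2.Auth(ACCESS_KEY_ID, ACCESS_KEY_SECRET)
bucket = oss2.Bucket(auth, ENDPOINT, BUCKET_NAME)

# List every object under the ingestion prefix, printing its key and size in bytes
for obj in oss2.ObjectIterator(bucket, prefix='ingest_demo/from_web/'):
    print(obj.key, obj.size)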

Next Steps

  • Schedule Python tasks to achieve periodic data lake ingestion (see the date-partitioning sketch after this list)
  • Analyze the files loaded into the data lake using SQL
  • Form a complete ELT workflow with other tasks
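
For the scheduling item above, a common pattern is to partition the OSS object key by load date so that each scheduled run writes to its own prefix. A minimal sketch (the dt= partition naming is an illustrative assumption, not part of the task above):

from datetime import datetime

# Build a date-partitioned object key so repeated runs do not overwrite each other
load_date = datetime.now().strftime('%Y%m%d')
oss_path = f"ingest_demo/from_web/dt={load_date}/lift_tickets_data.csv.gz"
print(oss_path)  # e.g. ingest_demo/from_web/dt=20240115/lift_tickets_data.csv.gz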

Resources

Studio Python Task Node