Complete Guide to Importing Data into Singdata Lakehouse

Data Ingestion: Real-time Multi-table Synchronization via Singdata Lakehouse Studio (CDC, Public Network Connection)

Overview

Use Case

This approach fits scenarios where the existing data sources (databases or data warehouses) are reachable over the public network (for example, through public NAT mapping), multiple tables must be synchronized at the same time, data freshness requirements are high (often at the minute or even second level), and a higher synchronization cost is acceptable in exchange for keeping the Lakehouse tables current with the source tables.

Singdata Lakehouse Studio's real-time multi-table synchronization supports multi-table merging, allowing data from multiple source tables in a structured system to be merged into a single Singdata Lakehouse target table.

Implementation Steps

Create a New Real-time Multi-table Synchronization Task

Navigate to Development -> Tasks, click "+", select "Real-time Multi-table Synchronization", and create a new real-time multi-table synchronization task.

Task Name: 06_multi_table_rt_sync_from_pg

Select "PostgreSQL" as the Source Data

Select Synchronization Objects

Singdata Lakehouse Studio automatically reads the source database schema so you can select the tables to synchronize:

Database CDC Configuration

Create a Slot

Plugin Type: pgoutput

Slot Name: slot_for_multi_table_ingest_demo
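If you want to prepare or verify the replication slot on the source database yourself, the sketch below shows one way to do it with psycopg2. The connection parameters are placeholders to adjust for your environment, and Studio can also manage the slot for you, so treat this as an optional check rather than a required step. It also confirms that wal_level is set to logical, which pgoutput-based CDC requires.

import psycopg2

# Connection parameters are placeholders; adjust them to your source database.
conn = psycopg2.connect(dbname='postgres', user='postgres',
                        password='postgres', host='localhost', port='5432')
conn.autocommit = True
cur = conn.cursor()

# pgoutput-based CDC requires logical WAL; this should print 'logical'.
cur.execute("SHOW wal_level;")
print("wal_level =", cur.fetchone()[0])

# Create the logical replication slot used by the sync task if it does not exist yet.
cur.execute("SELECT 1 FROM pg_replication_slots WHERE slot_name = %s;",
            ('slot_for_multi_table_ingest_demo',))
if cur.fetchone() is None:
    cur.execute("SELECT pg_create_logical_replication_slot(%s, 'pgoutput');",
                ('slot_for_multi_table_ingest_demo',))
    print("Slot created.")
else:
    print("Slot already exists.")

cur.close()
conn.close()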

Target Configuration

Rule Expression: multitable_sync_{SOURCE_TABLE}. This adds the prefix multitable_sync_ to each target table name so these tables can be distinguished from tables synchronized by other methods; for example, the source table lift_tickets_data is written to the target table multitable_sync_lift_tickets_data.

Check field mapping to ensure all mappings are successful:

Configure Synchronization Rules

Submit

Submit the real-time multi-table synchronization task after configuration:

Operations and Maintenance

Maintain the real-time multi-table synchronization task:

Start

Start the real-time multi-table synchronization task:

For the first start of this real-time multi-table synchronization task, select "Stateless Start" and "Full Data Synchronization":

After starting, you can view the detailed status of the task. Once the full synchronization is complete, the system will automatically begin incremental real-time synchronization.

Progress Monitoring

Full synchronization in progress:

Full synchronization complete.

Real-time synchronization begins:

Incremental Real-time Synchronization

To exercise incremental real-time synchronization, insert incremental data into the PostgreSQL database. In VS Code, create a new file named "rt_data_generate_insert_into_pg.py" and copy the following code into it:

import os
import sys
import uuid
import random
import time
import psycopg2
from faker import Faker
from datetime import date, datetime, timedelta
from dotenv import load_dotenv

load_dotenv()
fake = Faker('zh_CN')  # Use Chinese locale
resorts = ["大董烤鸭", "京雅堂", "新荣记", "仿膳饭庄", "全聚德", 
           "利群烤鸭店", "鼎泰丰", "海底捞", "江苏会所", "店客店来",
           "周黑鸭", "夜上海", "香宫", "长安壹号", "翡翠餐厅", "北京饭店",
           "四川豆花饭庄", "海底捞火锅", "川办餐厅", "南门火锅",
           "胡同", "翠园", "利苑酒家", "御宝轩", "金鼎轩", 
           "外婆家", "大董", "顺峰海鲜酒家", "小龙坎火锅",
           "新大陆中餐厅", "京兆尹", "鼎泰丰(台湾)", "滇池来客", 
           "绿波廊", "南美时光"]

# Load database credentials from environment variables, falling back to local defaults
DB_NAME = os.getenv('DB_NAME', 'postgres')
DB_USER = os.getenv('DB_USER', 'postgres')
DB_PASSWORD = os.getenv('DB_PASSWORD', 'postgres')
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = os.getenv('DB_PORT', '5432')

def connect_db():
    conn = psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT
    )
    return conn

def random_date_in_2025():
    start_date = date(2025, 1, 1)
    end_date = date(2025, 12, 31)
    return start_date + timedelta(days=random.randint(0, (end_date - start_date).days))

def random_datetime_between(start_year, end_year):
    start_datetime = datetime(start_year, 1, 1)
    end_datetime = datetime(end_year, 12, 31, 23, 59, 59)
    random_seconds = random.randint(0, int((end_datetime - start_datetime).total_seconds()))
    return start_datetime + timedelta(seconds=random_seconds)

def insert_lift_ticket(cursor, lift_ticket):
    cursor.execute("""
        INSERT INTO ingest_demo.lift_tickets_data (txid, rfid, resort, purchase_time, expiration_time, days, name, address_street, address_city, address_state, address_postalcode, phone, email, emergency_contact_name, emergency_contact_phone)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        lift_ticket['txid'], lift_ticket['rfid'], lift_ticket['resort'],
        lift_ticket['purchase_time'], lift_ticket['expiration_time'],
        lift_ticket['days'], lift_ticket['name'], lift_ticket['address_street'],
        lift_ticket['address_city'], lift_ticket['address_state'],
        lift_ticket['address_postalcode'], lift_ticket['phone'],
        lift_ticket['email'], lift_ticket['emergency_contact_name'],
        lift_ticket['emergency_contact_phone']
    ))

def generate_lift_ticket():
    lift_ticket = {
        'txid': str(uuid.uuid4()),
        'rfid': hex(random.getrandbits(96)),
        'resort': fake.random_element(elements=resorts),
        'purchase_time': random_datetime_between(2021, 2024),
        'expiration_time': random_date_in_2025(),
        'days': fake.random_int(min=1, max=7),
        'name': fake.name(),
        'address_street': fake.street_address(),
        'address_city': fake.city(),
        'address_state': fake.province(),
        'address_postalcode': fake.postcode(),
        'phone': fake.phone_number(),
        'email': fake.email(),
        'emergency_contact_name': fake.name(),
        'emergency_contact_phone': fake.phone_number(),
    }
    return lift_ticket

def main(total_count, batch_size, sleep_time):
    conn = connect_db()
    cursor = conn.cursor()
    
    batch_data = []
    for _ in range(total_count):
        lift_ticket = generate_lift_ticket()
        batch_data.append(lift_ticket)
        
        if len(batch_data) >= batch_size:
            for ticket in batch_data:
                insert_lift_ticket(cursor, ticket)
            conn.commit()
            batch_data = []
            time.sleep(sleep_time)
    
    # Insert any remaining data
    if batch_data:
        for ticket in batch_data:
            insert_lift_ticket(cursor, ticket)
        conn.commit()

    cursor.close()
    conn.close()

if __name__ == "__main__":
    if len(sys.argv) < 4:
        print("Please provide total rows, rows per batch, and sleep seconds per batch. For example: python rt_data_generate_insert_into_pg.py 1000 100 10")
        sys.exit(1)
    
    total_count = int(sys.argv[1])
    batch_size = int(sys.argv[2])
    sleep_time = int(sys.argv[3])
    
    main(total_count, batch_size, sleep_time)
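The connection values above fall back to local defaults. If you prefer to keep credentials out of the script, you can supply them through a .env file in the same directory, which load_dotenv() reads at startup. A minimal example, with placeholder values for your environment:

DB_NAME=postgres
DB_USER=postgres
DB_PASSWORD=postgres
DB_HOST=localhost
DB_PORT=5432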

In VS Code, open a new terminal and run the following command to activate the Python environment created in the "Environment Setup" step. If you are already in the cz-ingest-examples environment, skip this step.

conda activate cz-ingest-examples

Then run the following command in the same terminal to insert 100,000 rows into the ingest_demo.lift_tickets_data table, 100 rows per batch, with a 10-second sleep between batches:

python rt_data_generate_insert_into_pg.py 100000 100 10

In Singdata Lakehouse Studio, view the real-time synchronization progress:

Next Steps

Insert new data into the data source to view the results of incremental synchronization.
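For example, re-run rt_data_generate_insert_into_pg.py with a smaller row count and watch the new rows appear on the Lakehouse side. To spot-check that changes are flowing, you can also poll the row count on the source table while the generator is running and compare it with the count of multitable_sync_lift_tickets_data shown in Studio or queried in the Lakehouse. A minimal sketch against the PostgreSQL source, reusing the same placeholder connection parameters as above:

import time
from datetime import datetime
import psycopg2

# Placeholder connection parameters; adjust them to your source database.
conn = psycopg2.connect(dbname='postgres', user='postgres',
                        password='postgres', host='localhost', port='5432')
conn.autocommit = True
cur = conn.cursor()

# Print the source row count every 10 seconds for about a minute; compare it
# with the multitable_sync_lift_tickets_data row count on the Lakehouse side.
for _ in range(6):
    cur.execute("SELECT count(*) FROM ingest_demo.lift_tickets_data;")
    print(datetime.now(), "source rows:", cur.fetchone()[0])
    time.sleep(10)

cur.close()
conn.close()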

"Stop" the running of this synchronization task. The virtual compute cluster set for this task has an automatic stop feature, which will stop running within the "auto stop" seconds after the job stops, saving costs and achieving on-demand operation.

Resources

Real-time Data Writing

Multi-table Real-time Synchronization

Capture Change Data (CDC) and Data Processing through Multi-table Real-time Synchronization and Dynamic Tables in Singdata Lakehouse