Data Import into Lakehouse: Through Python Scripts
Importing data into a database is a common requirement in modern data processing. Python scripts suit this task well: they can be automated, they can preprocess data before loading, and they work with many different databases. This article uses a PostgreSQL database as the source and demonstrates how to import its data into Singdata Lakehouse.
Advantages
- Automation: By creating automated tasks that run periodically or on demand, real-time data updates or migrations can be achieved.
- Data Processing: Python has powerful data processing capabilities, allowing data to be preprocessed, cleaned, and transformed before importing into the database, improving data quality and import success rates.
- Multi-Database Support: Python provides a wide range of database drivers and libraries, supporting relational databases (such as MySQL, PostgreSQL, SQLite, etc.) and non-relational databases (such as MongoDB, Redis, etc.).
- Error Handling and Logging: When writing import scripts with Python, exceptions can be caught and detailed logs recorded, making it easier to track and resolve issues during the data import process.
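As an illustration of the error handling and logging point, the following is a minimal sketch that wraps each import step so that its start, success, or failure is logged. The helper name run_step and the logger name lakehouse_import are hypothetical and are not part of any Singdata or PostgreSQL library; only the Python standard library is used.

```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("lakehouse_import")  # hypothetical logger name


def run_step(name, func, *args, **kwargs):
    """Run one import step and log its start, success, or failure."""
    logger.info("Starting step: %s", name)
    try:
        result = func(*args, **kwargs)
    except Exception:
        # logger.exception records the full traceback; the error is then
        # re-raised so the caller can decide whether to stop the import.
        logger.exception("Step failed: %s", name)
        raise
    logger.info("Finished step: %s", name)
    return result
```

Any step in the guide below could be passed through such a wrapper, for example run_step("read source", pd.read_sql_query, query, conn_pg).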
Applicable Scenarios
- Import data from CSV or Excel files into a database.
- Preprocess data by applying statistical methods or standardization techniques before importing it into the database.
- Database migration tasks involving data validation and integrity checks.
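The first two scenarios can be sketched with pandas as follows. This is only an illustration under stated assumptions: the helper names load_and_validate and standardize, and the column names in the usage note, are hypothetical, and z-score standardization is just one of many possible preprocessing techniques.

```python
import io

import pandas as pd


def load_and_validate(csv_source, required_columns):
    """Read a CSV file (path or file-like object) and check required columns."""
    df = pd.read_csv(csv_source)
    missing = [col for col in required_columns if col not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    return df


def standardize(series):
    """Return the z-score of a numeric column (population standard deviation)."""
    centered = series - series.mean()
    std = series.std(ddof=0)
    # A constant column has zero spread; return the centered values unchanged.
    return centered if std == 0 else centered / std
```

A call such as load_and_validate("orders.csv", ["id", "total"]) followed by standardize(df["total"]) would then yield a column ready for import; the file name here is a placeholder.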
Operation Guide
1. Import Required Python Packages
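from sqlalchemy import create_engine, text  # text() wraps the SQL statements in steps 6 and 8
import pandas as pd
import psycopg2
import pygwalker as pyg
# connect() is used for bulk loading below; it is assumed here to come from
# the Singdata Lakehouse (clickzetta) Python connector package
from clickzetta import connect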
2. Create a Connection to the PostgreSQL Database
def create_conn_pg(database: str, user: str, password: str, host: str, port: str):
    # Open a psycopg2 connection to the source PostgreSQL database
    connection = psycopg2.connect(
        dbname=database,
        user=user,
        password=password,
        host=host,
        port=port
    )
    return connection
3. Use pd.read_sql_query() to Query Data from the Database and Store It in a DataFrame
conn_pg = create_conn_pg("<database>", "<username>", "<password>", "<host>", "<port>")
query = "SELECT * FROM public.orders;"
df = pd.read_sql_query(query, conn_pg)
conn_pg.close()
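For source tables too large to hold in memory at once, pd.read_sql_query() accepts a chunksize argument and then returns an iterator of DataFrames instead of a single one. The sketch below demonstrates this with an in-memory SQLite database as a stand-in for PostgreSQL, so that it runs without a server; the helper name read_in_chunks and the demo table are hypothetical.

```python
import sqlite3

import pandas as pd


def read_in_chunks(query, connection, chunk_size=1000):
    """Yield DataFrames of at most chunk_size rows each."""
    yield from pd.read_sql_query(query, connection, chunksize=chunk_size)


# Stand-in source database (SQLite in memory, not PostgreSQL)
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE orders (id INTEGER, total REAL)")
demo_conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(i, i * 1.5) for i in range(5)],
)

# Consume every chunk before closing the connection
chunk_sizes = [
    len(chunk)
    for chunk in read_in_chunks("SELECT * FROM orders", demo_conn, chunk_size=2)
]
demo_conn.close()
print(chunk_sizes)  # [2, 2, 1]
```

Each chunk could be cleaned and written to the Lakehouse before the next one is fetched, which keeps memory usage bounded by the chunk size.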
4. View PostgreSQL Data and Perform Cleaning and Visual Analysis
View total row count:
print(df.shape[0])
Replace all NaN values with 0:
df.fillna(0, inplace=True)
Display the first 5 rows:
print(df.head(5))
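Note that df.fillna(0) applies to every column, including created_at, where a missing timestamp would become 0. If that is not desired, a per-column fill is one alternative. The following sketch assumes the numeric column names from the target table in step 6; the helper name fill_numeric_gaps and the defaults mapping are hypothetical.

```python
import pandas as pd

# Assumed defaults for the numeric columns of the orders table
NUMERIC_DEFAULTS = {"subtotal": 0, "tax": 0, "total": 0, "discount": 0, "quantity": 0}


def fill_numeric_gaps(df, defaults=NUMERIC_DEFAULTS):
    """Return a copy of df with NaN replaced only in the listed columns."""
    present = {col: value for col, value in defaults.items() if col in df.columns}
    # Passing a dict to fillna fills each named column with its own value
    # and leaves all other columns (such as created_at) untouched.
    return df.fillna(value=present)
```

Because the function returns a new DataFrame rather than modifying the input in place, the original data stays available for comparison.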
5. Use PyGWalker for Pivot Analysis (Optional)
walker = pyg.walk(df)
6. Create a Target Table in Singdata Lakehouse
engine_cz = create_engine("clickzetta://<username>:<password>@<instanceid>.<region_id>.api.singdata.com/<workspacename>?virtualcluster=<vcluster>&schema=<public>")
# The table is named orders so that it matches the bulk load and cleanup steps below
sql_cz = text("""
    CREATE TABLE IF NOT EXISTS orders (
        id INT,
        user_id INT,
        product_id INT,
        subtotal DECIMAL(8, 2),
        tax DECIMAL(8, 2),
        total DECIMAL(16, 3),
        discount DECIMAL(8, 2),
        created_at TIMESTAMP,
        quantity INT
    );
""")
with engine_cz.connect() as conn:
    results = conn.execute(sql_cz)
7. Bulk Load the DataFrame into Singdata Lakehouse
# Connect to Singdata Lakehouse with the Python connector
conn_cz = connect(
    username="<username>",
    password="<password>",
    service="<region_id>.api.singdata.com",
    instance="<instanceid>",
    workspace="<workspacename>",
    schema="public",
    vcluster="default"
)
# Open a bulk load stream on the target table and a writer for partition 0
bulkload_stream = conn_cz.create_bulkload_stream(schema="public", table="orders")
writer = bulkload_stream.open_writer(0)
# Write the DataFrame row by row
for index, row_data in df.iterrows():
    row = writer.create_row()
    row.set_value("id", row_data["id"])
    row.set_value("created_at", row_data["created_at"])
    row.set_value("user_id", row_data["user_id"])
    row.set_value("product_id", row_data["product_id"])
    row.set_value("subtotal", row_data["subtotal"])
    row.set_value("tax", row_data["tax"])
    row.set_value("total", row_data["total"])
    row.set_value("discount", row_data["discount"])
    row.set_value("quantity", row_data["quantity"])
    writer.write(row)
# Close the writer, then commit the stream to make the data visible
writer.close()
bulkload_stream.commit()
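Values taken from a DataFrame row are often pandas or NumPy scalars (for example numpy.int64 or pandas.Timestamp) rather than built-in Python types. Whether the bulk load writer accepts such values is not stated here, so the following is only a defensive sketch: a hypothetical helper, to_native, that converts a value to a plain Python object before it is passed to row.set_value(). Mapping missing values to None is likewise an assumption about what the target expects.

```python
import datetime

import numpy as np
import pandas as pd


def to_native(value):
    """Convert a pandas/NumPy scalar to a built-in Python object."""
    if pd.isna(value):
        # NaN and NaT become None (assumed to represent NULL in the target)
        return None
    if isinstance(value, pd.Timestamp):
        return value.to_pydatetime()
    if isinstance(value, np.generic):
        # .item() returns the equivalent Python int, float, bool, etc.
        return value.item()
    return value
```

In the loop above, a call would then look like row.set_value("id", to_native(row_data["id"])).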
8. Clean Up Resources
# Drop the Singdata Lakehouse target table
sql_cz = text("DROP TABLE IF EXISTS orders;")
with engine_cz.connect() as conn:
    results = conn.execute(sql_cz)