Python Task Usage Practice
In the task development module, the Python task is a task type that runs Python code in a lightweight resource container. It provides environment isolation between tasks and offers basic environment customization capabilities. This article introduces some usage practices for Python tasks.
Runtime Environment and Customization
Python tasks run in a system-preset Pod environment with Python 3 pre-installed (currently 3.9.2; the version may be updated in the future).
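Because the interpreter version may change over time, a task can print the version it is actually running on:

import sys

# Print the interpreter version of the current Pod environment
print(sys.version)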
The default system image includes commonly used dependency packages that support connecting to and accessing data in Singdata Lakehouse, as well as working with object storage services such as Alibaba Cloud OSS and Tencent Cloud COS. These dependencies include, but are not limited to (the snippet after this list shows how to check what is actually installed):
- clickzetta-connector
- clickzetta-sqlalchemy
- cos-python-sdk-v5
- numpy
- oss2
- pandas
- six
- urllib3
- ...
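To see exactly which packages and versions are present in your environment, you can enumerate the installed distributions from within a task:

import importlib.metadata

# Print every installed distribution with its version, sorted by name
for dist in sorted(importlib.metadata.distributions(),
                   key=lambda d: d.metadata["Name"].lower()):
    print(dist.metadata["Name"], dist.version)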
To meet specific runtime requirements, the Pod environment provides limited environment customization capabilities: you can install additional packages under the /home/system_normal path. The snippets below demonstrate how to install custom packages and use them in the Python environment. Please note that the Pod environment is destroyed after the Python task completes, so any environment customizations are not retained.
import os
import sys

# Install custom packages into the writable path (placeholder package names)
os.system('pip install some_custom_package --target /home/system_normal')
os.system('pip install another_custom_package --target /home/system_normal')

# Make the install location importable, then import the packages
sys.path.append('/home/system_normal')
import some_custom_package
import another_custom_package
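A more explicit variant invokes pip through subprocess, installs into /home/system_normal via --target (here pulling from the Tsinghua PyPI mirror), and appends that path to sys.path before importing: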
import subprocess
import sys

# Install mysql-connector-python into the writable path from a PyPI mirror
subprocess.check_call([
    sys.executable, "-m", "pip", "install", "mysql-connector-python",
    "--target", "/home/system_normal",
    "-i", "https://pypi.tuna.tsinghua.edu.cn/simple",
])

# Make the installed package importable
sys.path.append('/home/system_normal')
import mysql.connector

# Create connection
connection = mysql.connector.connect(
    host='127.0.0.1',        # Database host address
    user='****',             # Database username
    password='***********',  # Database password
    database='demo'          # Name of the database to connect to
)

# Create cursor
cursor = connection.cursor()

# Execute query
query = "show tables"  # Replace with your SQL query
cursor.execute(query)

# Fetch query results
results = cursor.fetchall()
print("Query results:")
for row in results:
    print(row)

# Close cursor and connection
cursor.close()
connection.close()
Adjusting Runtime Resource Size
By default, the Pod provides 0.5 CPU cores and 512MB of memory. If needed, you can adjust the resource allocation in the task scheduling configuration using the following parameters:
- pod.limit.cpu: Sets the number of CPU cores. Must be greater than 0 (for example, 1), up to a maximum of 4. The default is 0.5.
- pod.limit.memory: Sets the memory size as a value followed by a unit (for example, 2G), up to a maximum of 8G. The default is 512M.
By configuring these parameters appropriately, you can give Python tasks sufficient resources for different computational needs while avoiding resource waste.
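For example, to give a compute-heavy task 2 CPU cores and 4G of memory, you would set the two parameters as follows (the key names are the ones listed above; exactly how the key/value pairs are entered depends on your scheduling configuration interface):

pod.limit.cpu=2
pod.limit.memory=4G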
More Use Cases
Querying Lakehouse Data Using Python Database API
from clickzetta import connect

# Establish connection
conn = connect(
    username='your_username',
    password='your_password',
    service='api.singdata.com',
    instance='your_instance',
    workspace='your_workspace',
    schema='public',
    vcluster='default'
)

# Create cursor object
cursor = conn.cursor()

# Execute SQL query
cursor.execute('SELECT * FROM clickzetta_sample_data.ecommerce_events_history.ecommerce_events_multicategorystore_live LIMIT 10;')

# Fetch query results
results = cursor.fetchall()
for row in results:
    print(row)
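Because the connector exposes the Python Database API, the standard cursor method fetchmany() should be available as well; for result sets that might strain the Pod's default 512M of memory, it lets you fetch rows in batches instead of calling fetchall(). A minimal sketch, reusing the cursor from the example above:

# Stream rows in fixed-size batches rather than materializing all of them
cursor.execute('SELECT * FROM clickzetta_sample_data.ecommerce_events_history.ecommerce_events_multicategorystore_live;')
while True:
    batch = cursor.fetchmany(1000)  # up to 1000 rows per call
    if not batch:
        break
    for row in batch:
        print(row)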
Using SQLAlchemy Interface to Query Lakehouse Data
from sqlalchemy import create_engine
from sqlalchemy import text

# Create an instance of the SQLAlchemy engine for ClickZetta Lakehouse
engine = create_engine(
    "clickzetta://username:password@instance.api.singdata.com/workspace?schema=schema&vcluster=default"
)

# Execute SQL query
sql = text("SELECT * FROM ecommerce_events_multicategorystore_live;")

# Execute the query using the engine
with engine.connect() as conn:
    result = conn.execute(sql)
    for row in result:
        print(row)
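Since pandas ships in the default image, the same engine can also feed query results straight into a DataFrame. A minimal sketch, assuming the installed pandas version accepts a SQLAlchemy engine as the connection argument (recent releases do):

import pandas as pd

# Load the query result into a DataFrame for further analysis
df = pd.read_sql(sql, engine)
print(df.head())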
Using Python to Batch Upload Data to Lakehouse
from clickzetta import connect

# Establish connection
conn = connect(
    username='your_username',
    password='your_password',
    service='api.singdata.com',
    instance='your_instance',
    workspace='your_workspace',
    schema='public',
    vcluster='default'
)

# Create a bulkload stream targeting the destination table
bulkload_stream = conn.create_bulkload_stream(schema='public', table='bulkload_test')

# Open a writer and write one million rows
writer = bulkload_stream.open_writer(0)
for index in range(1000000):
    row = writer.create_row()
    row.set_value('i', index)    # column 'i': integer value
    row.set_value('s', 'Hello')  # column 's': string value
    row.set_value('d', 123.456)  # column 'd': double value
    writer.write(row)
writer.close()

# Commit the stream; the data becomes visible in the table only after commit
bulkload_stream.commit()