Installation
Uninstall Old Versions
If you have previously installed an older version of the SDK, uninstall the old versions of the clickzetta-connector and clickzetta-sqlalchemy packages:
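For example, with pip:

```
pip uninstall clickzetta-connector clickzetta-sqlalchemy
```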
Note: The clickzetta-connector package will no longer be maintained in future versions. Please install the clickzetta-connector-python and clickzetta-ingestion-python packages as needed.
Install clickzetta-ingestion-python-v2
Install the clickzetta-ingestion-python-v2 package, which requires Python 3.6 or higher:
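For example, with pip:

```
pip install clickzetta-ingestion-python-v2
```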
Real-time Write Principle
The real-time write feature of the Lakehouse Python SDK is an efficient data stream processing tool that allows users to upload and store data in Lakehouse in real-time. Here's how it works:
- SDK Uploads Data: Users upload data to the Lakehouse Ingestion Service via the Python SDK.
- Ingestion Service Processing: Upon receiving the data, the Ingestion Service writes it directly to the Lakehouse table. At this stage, the data is stored as temporary intermediate files, known as a hybrid table.
- Query Real-time Data: Before the data is committed, users can query (select) the newly written data. However, this data is not visible to table streams, materialized views, or dynamic tables.
- Data Commit: The newly written data is automatically committed after approximately one minute. After commitment, table streams, materialized views, and dynamic tables can read this data.
- Hybrid Table Becomes a Regular Table: After data commitment, a background process merges the hybrid table into a regular table. Once the merge is complete, users can perform update operations (update, merge, delete).
Applicable Scenarios
The real-time write feature of the Lakehouse Python SDK is suitable for the following scenarios:
- Short-Interval Data Import: If your application requires data import at very short intervals (e.g., 5 minutes or less), the real-time write SDK can meet your needs.
- Frequent Small Data Submissions: For applications that need to submit data frequently but in small volumes each time, the real-time write SDK provides an efficient solution.
- Real-time Data Analysis: The real-time write SDK is suitable for applications that require immediate data analysis and response, such as real-time monitoring, event tracking, and real-time reporting.
Notes
- Data written in real-time can be queried within seconds.
- Real-time writes currently support schema-change detection only through the internally provided Flink Connector sink operator (single concurrency). For schema changes in any other scenario, you need to stop the real-time write task, wait for some time after the schema change (approximately 90 minutes), and then restart the task.
- Table streams, materialized views, and dynamic tables can only see committed data. Since real-time data takes about one minute to be committed, table streams likewise need to wait about one minute before the data becomes visible.
Data Type Support
The Lakehouse Python SDK supports the following data type mappings:
| SQL Data Type | Python Internal Data Structure |
| --- | --- |
| BOOLEAN | bool |
| STRING / JSON | str |
| CHAR(n) / VARCHAR(n) | str (truncated on overflow) |
| BINARY | bytes |
| DECIMAL | Decimal |
| INT8 | int |
| INT16 | int |
| INT32 | int |
| INT64 | int |
| FLOAT | float |
| DOUBLE | float |
| DATE | date |
| TIMESTAMP_LTZ | datetime(tz=timezone) |
| TIMESTAMP_NTZ | datetime |
| INTERVAL_DAY_TIME | interval_day_time |
| INTERVAL_YEAR_MONTH | - |
| ARRAY | list |
| MAP | map |
| STRUCT | json or collections.namedtuple |
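As a quick illustration of these mappings, the sketch below binds values of several types to a row object (obtained from stream.create_row, described in the sections that follow); the column names are hypothetical.

```python
from decimal import Decimal
from datetime import date, datetime, timezone

# Hypothetical columns on an existing row object; set_value is the
# documented way to bind a Python value to a column.
row = stream.create_row(RowOperator.INSERT)
row.set_value("flag", True)                      # BOOLEAN       -> bool
row.set_value("name", "alice")                   # STRING        -> str
row.set_value("price", Decimal("19.99"))         # DECIMAL       -> Decimal
row.set_value("birthday", date(1990, 1, 1))      # DATE          -> date
row.set_value("ts", datetime.now(timezone.utc))  # TIMESTAMP_LTZ -> tz-aware datetime
row.set_value("tags", ["a", "b"])                # ARRAY         -> list
```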
Creating a Real-time Data Stream via Client
To create a real-time data stream, you first need to connect using the Lakehouse client:
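A minimal connection-and-stream sketch, with assumptions flagged: the connect parameters follow the common ClickZetta connector style, and the stream-creation method name create_realtime_stream is an assumption; only the operate and options parameters are documented on this page.

```python
from clickzetta import connect  # assumed import path; adjust to your SDK version
# The enum import below is an assumption; adjust to your SDK version.
from clickzetta.ingestion.realtime import RealtimeOperation

conn = connect(
    username="your_user",
    password="your_password",
    service="api.clickzetta.com",  # assumption: your service endpoint
    instance="your_instance",
    workspace="your_workspace",
    schema="public",
    vcluster="default",
)

stream = conn.create_realtime_stream(       # assumed method name
    table="target_table",
    operate=RealtimeOperation.APPEND_ONLY,  # use RealtimeOperation.CDC for PK tables
    # options=...  # optional; see the RealtimeOptionsBuilder section below
)
```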
Parameter Description:
- operate: Pass an enumeration value. The real-time interface supports RealtimeOperation.APPEND_ONLY and RealtimeOperation.CDC.
  - Use RealtimeOperation.APPEND_ONLY for regular tables.
  - Primary key tables must use RealtimeOperation.CDC.
- options: Used to pass parameters for the real-time write stream. See the detailed options below.
Options
In the Python SDK, you can configure the parameters of the real-time write stream using the RealtimeOptionsBuilder class. These parameters are optional, and it is recommended to use the default values.
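A sketch of building an options object: the with_* setter names come from the parameter tables below, while the chained-builder style, the import path, and the build() finalizer are assumptions.

```python
# Assumed import path; adjust to your SDK version.
from clickzetta.ingestion.realtime import RealtimeOptionsBuilder, FlushMode

options = (
    RealtimeOptionsBuilder()
    .with_flush_mode(FlushMode.AUTO_FLUSH_BACKGROUND)
    .with_mutation_buffer_lines_num(1000)          # rows per buffer
    .with_mutation_buffer_space(10 * 1024 * 1024)  # bytes per buffer
    .with_mutation_buffer_max_num(50)              # buffer pool capacity
    .with_flush_interval(10)                       # assumption: value in seconds
    .with_request_failed_retry_enable(True)
    .with_request_failed_retry_times(5)
    .build()                                       # assumption: builder finalizer
)
```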
Flush Control
| Parameter Name | Default Value | Description |
| --- | --- | --- |
| with_flush_mode | AUTO_FLUSH_BACKGROUND | Data flush policy. Options:<br>- AUTO_FLUSH_SYNC: synchronous flush (blocking, preserves order)<br>- AUTO_FLUSH_BACKGROUND: asynchronous flush (high throughput)<br>- MANUAL_FLUSH: manually triggered flush<br>⚠️ Primary key table restriction: PK tables must use AUTO_FLUSH_SYNC mode |
Buffer Configuration
| Parameter Name | Default Value | Unit | Description |
| --- | --- | --- | --- |
| with_mutation_buffer_lines_num | 1000 | rows | Row threshold: maximum number of rows per buffer; a flush is triggered when this is reached |
| with_mutation_buffer_space | 10 MB (10 * 1024 * 1024) | bytes | Space threshold: maximum memory usage per buffer; a flush is triggered when either the row or the space threshold is reached |
| with_mutation_buffer_max_num | 50 | buffers | Buffer pool capacity: number of buffers allowed to exist simultaneously (similar to a connection pool) |
Timed Flush
| Parameter Name | Default Value | Description |
| --- | --- | --- |
| with_flush_interval | 10 seconds | Maximum delay: forced flush interval when the buffer is not full |
Retry Mechanism Parameters
| Parameter Name | Default Value | Description |
| --- | --- | --- |
| with_request_failed_retry_enable | TRUE | Whether to enable the retry mechanism for failed requests |
| with_request_failed_retry_times | 5 | Maximum retry attempts per operation |
| with_request_failed_retry_internal_ms | 5000 ms | Retry interval (milliseconds) |
Advanced Configuration
| Parameter Name | Default Value | Description |
| --- | --- | --- |
| with_request_failed_retry_status | THROTTLED, INTERNAL_ERROR, FAILED, PRECHECK_FAILED | Retry trigger conditions:<br>- THROTTLED: flow control<br>- INTERNAL_ERROR: server-side error<br>- FAILED: general failure<br>- PRECHECK_FAILED: pre-check failure |
| with_request_failed_retry_log_debug_enable | TRUE | Whether to log detailed retry information (DEBUG level) |
Writing Data (Row)
Create a specific data object (Row) using the stream.create_row method, and encapsulate the data into the Row object using the row.set_value method.
Row Type Description:
- When the Stream is created with RealtimeOperation.APPEND_ONLY, only RowOperator.INSERT type Rows can be created.
- When the Stream is created with RealtimeOperation.CDC, the following Row types can be used:
  - RowOperator.UPSERT: Insert a row, or update the row if it already exists.
  - RowOperator.DELETE_IGNORE: Delete a row, or ignore it if the target row does not exist.
Note: Primary key tables (PK tables) must use CDC mode and can only use UPSERT and DELETE_IGNORE operation types.
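A minimal sketch of building a row: create_row and set_value are documented above, while the call that hands the finished row back to the stream is not shown on this page, so apply_row below is a placeholder assumption.

```python
# create_row / set_value are documented; apply_row is a placeholder name.
row = stream.create_row(RowOperator.INSERT)
row.set_value("id", 1)
row.set_value("name", "alice")
stream.apply_row(row)  # assumption: submits the row to the stream's buffer
```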
Data Commit to Server
By calling the stream.flush() method, data is immediately committed to the server. If this method is not called explicitly, data is committed automatically when any of the following conditions is met:
- Reaching the buffer size set by with_mutation_buffer_space.
- Reaching the row count set by with_mutation_buffer_lines_num.
- Reaching the time interval set by with_flush_interval.
Example
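Below is an end-to-end sketch for a regular table in APPEND_ONLY mode. It reuses the documented calls (create_row, set_value, flush) together with the assumed connection and stream-creation entry points from the sections above; adjust names to your SDK version.

```python
from clickzetta import connect  # assumed import path
# Assumed enum import path; adjust to your SDK version.
from clickzetta.ingestion.realtime import RealtimeOperation, RowOperator

conn = connect(
    username="your_user", password="your_password",
    service="api.clickzetta.com", instance="your_instance",
    workspace="your_workspace", schema="public", vcluster="default",
)

# Regular tables use APPEND_ONLY; create_realtime_stream is an assumed name.
stream = conn.create_realtime_stream(
    table="events",
    operate=RealtimeOperation.APPEND_ONLY,
)

# APPEND_ONLY streams only accept INSERT rows.
for i in range(10):
    row = stream.create_row(RowOperator.INSERT)
    row.set_value("id", i)
    row.set_value("message", f"event-{i}")
    stream.apply_row(row)  # placeholder name for the row-submission call

stream.flush()  # documented: force-commit buffered data to the server
stream.close()  # assumption: release the stream when finished
```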
Primary Key Table (PK Table) Write Example
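A corresponding sketch for a primary key table, continuing from the connection above and under the same naming assumptions. The documented restrictions are that PK tables must use CDC mode with AUTO_FLUSH_SYNC, and only UPSERT and DELETE_IGNORE rows.

```python
# PK tables must use AUTO_FLUSH_SYNC (documented restriction);
# the build() finalizer is an assumption.
options = (
    RealtimeOptionsBuilder()
    .with_flush_mode(FlushMode.AUTO_FLUSH_SYNC)
    .build()
)

stream = conn.create_realtime_stream(  # assumed entry point, as above
    table="users_pk",
    operate=RealtimeOperation.CDC,
    options=options,
)

# UPSERT: insert, or update if the primary key already exists.
row = stream.create_row(RowOperator.UPSERT)
row.set_value("user_id", 42)   # primary key column
row.set_value("name", "alice")
stream.apply_row(row)          # placeholder name, as above

# DELETE_IGNORE: delete by primary key, ignore if the row is absent.
row = stream.create_row(RowOperator.DELETE_IGNORE)
row.set_value("user_id", 43)
stream.apply_row(row)

stream.flush()
```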
Common Issues and Solutions
1. Primary Key Table Write Failure
Issue: Error when writing data to a primary key table.
Solution:
- Ensure the correct operation type is used: primary key tables must use RealtimeOperation.CDC mode and can only use RowOperator.UPSERT and RowOperator.DELETE_IGNORE operations.
- Ensure the primary key field is correctly set.
- Primary key tables do not support FlushMode.AUTO_FLUSH_BACKGROUND; it will automatically be reset to FlushMode.AUTO_FLUSH_SYNC.
- Partition columns must be a subset of the primary key.
2. High Memory Usage
Issue: High memory usage when writing large amounts of data.
Solution:
- Reduce the values of with_mutation_buffer_lines_num and with_mutation_buffer_space.
- Call stream.flush() regularly to flush data manually, but avoid flushing too frequently, as that may produce a large number of small files.
- Consider writing data in batches instead of writing large amounts of data at once.
Summary
The real-time write feature of the ClickZetta Python SDK provides an efficient and flexible way to write data, supporting various data types and operation modes. By configuring parameters appropriately, you can optimize write performance for different scenarios.
For primary key tables and regular tables, different operation modes and row operation types are required. Primary key tables must use CDC mode and can only use UPSERT and DELETE_IGNORE operations; regular tables typically use APPEND_ONLY mode and INSERT operations.