A Comprehensive Guide to Importing Data into Singdata Lakehouse

Data Ingestion: Using ZettaPark PUT File to Ingest Data

Overview

Using the Zettapark Python library provided by Singdata Lakehouse, you can programmatically upload the data generated in the test data generation step to the data lake managed by Singdata Lakehouse, completing the ingestion.

Data lake operations require first creating a data lake connection and a Volume; after that, you can PUT data into the data lake, as sketched below.
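
For reference, both can be created with SQL through a Zettapark session (the session itself is created in the implementation steps below). The sketch here is illustrative only: the connection name, endpoint, credentials, and DDL options are assumptions to adapt from the Singdata Lakehouse documentation; the bucket path matches the ingest_demo Volume used later in this guide.

# Illustrative sketch only -- statement options and names are assumptions;
# consult the Singdata Lakehouse documentation for the exact DDL.
session.sql("""
CREATE STORAGE CONNECTION IF NOT EXISTS my_oss_conn
  TYPE oss
  ENDPOINT = '<your-oss-endpoint>'
  ACCESS_ID = '<your-access-id>'
  ACCESS_KEY = '<your-access-key>'
""").show()
# The hypothetical connection name my_oss_conn is reused when creating the Volume
session.sql("""
CREATE EXTERNAL VOLUME IF NOT EXISTS ingest_demo
  LOCATION 'oss://yourbucketname/ingest_demo'
  USING CONNECTION my_oss_conn
  DIRECTORY = (enable = true)
""").show()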

Use Cases

This approach suits users who are familiar with Python programming. It leverages Python's flexibility and the DataFrame abstraction to carry out data cleaning, transformation, and other data engineering and preparation tasks, especially those closely tied to AI analysis.

Implementation Steps

In addition to uploading, you can also download files from the data lake directly to your local machine (shown at the end of this section).

Put Local Files into the Data Lake Managed by Singdata Lakehouse (Volume) via Zettapark
# !pip install clickzetta_zettapark_python  -i https://pypi.tuna.tsinghua.edu.cn/simple
from clickzetta.zettapark.session import Session
import json,requests
import os
from datetime import datetime
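
The session below reads its connection parameters from config/config-ingest.json. A hypothetical example of that file is shown here; the key names follow the ClickZetta/Zettapark connection parameters, and every value must be replaced with your own account information.

{
  "service": "<your-service-endpoint>",
  "instance": "<your-instance-name>",
  "workspace": "<your-workspace>",
  "schema": "<your-schema>",
  "vcluster": "<your-virtual-cluster>",
  "username": "<your-username>",
  "password": "<your-password>"
}
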
Create a Session to Singdata Lakehouse
# Read parameters from the configuration file
with open('config/config-ingest.json', 'r') as config_file:
    config = json.load(config_file)

print("Connecting to Singdata Lakehouse.....\n")

# Create session
session = Session.builder.configs(config).create()

print("Connection successful!...\n")

Connecting to Singdata Lakehouse.....

Connection successful!...

PUT file to Singdata Lakehouse Data Lake Volume

Please change 'data/' to the directory where the data generated in the 'Test Data Generation' step is stored.

# Upload each local file into the matching sub-directory of the ingest_demo Volume
for filename in os.listdir("data/"):
    file_path = os.path.join("data/", filename)
    if filename.endswith(".gz"):
        session.file.put(file_path, "volume://ingest_demo/gz/")
    elif filename.endswith(".csv"):
        session.file.put(file_path, "volume://ingest_demo/csv/")
    elif filename.endswith(".json"):
        session.file.put(file_path, "volume://ingest_demo/json/")

# Or upload all files in the directory at once
# session.file.put("data/", "volume://ingest_demo/gz/")

Resynchronize the Data Lake Volume Directory to the Lakehouse

After the PUT operations, refresh the Volume directory so that the newly uploaded files become visible to the Lakehouse.

# Refresh the Volume directory metadata.
# NOTE: the exact refresh DDL is assumed here; check the Singdata Lakehouse docs.
alter_datalake_sql = "ALTER VOLUME ingest_demo REFRESH"
session.sql(alter_datalake_sql).show()

---------------------
|result_message     |
---------------------
|OPERATION SUCCEED  |
---------------------

Check the files on the Singdata Lakehouse data lake Volume again; the data has been successfully ingested.
session.sql("select * from directory(volume ingest_demo)").show()

------------------------------------------------------------------------------------------------------------------------
|relative_path                |url                                                |size     |last_modified_time        |
------------------------------------------------------------------------------------------------------------------------
|gz/lift_tickets_data.csv.gz  |oss://yourbucketname/ingest_demo/gz/lift_ticket... |9717050  |2024-12-27 19:24:21+08:00 |
|gz/lift_tickets_data.json.gz |oss://yourbucketname/ingest_demo/gz/lift_ticket... |11146044 |2024-12-27 19:24:19+08:00 |
------------------------------------------------------------------------------------------------------------------------
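
If you want to use this listing programmatically instead of only printing it, the same query can be collected into rows (a minimal sketch, assuming Zettapark's Snowpark-style collect() method):

# Collect the directory listing and inspect it in Python
rows = session.sql("select relative_path, size from directory(volume ingest_demo)").collect()
print(f"{len(rows)} files found on the volume")
for row in rows:
    print(row)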

Test pulling a file from the data lake back to the local machine
session.file.get("volume://ingest_demo/gz/lift_tickets_data.json.gz","tmp/gz/")

[GetResult(file='tmp/gz/lift_tickets_data.json.gz', size=11146044, status='DOWNLOADED', message='')]
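
As a quick sanity check, the size of the downloaded file should match the 11146044 bytes reported by GetResult and by the directory listing:

# Compare the local file size with the size reported above
print(os.path.getsize("tmp/gz/lift_tickets_data.json.gz"))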

Verify the number of rows in the data lake file

Validate the data by checking the number of rows in the file. The query returns 100000, which matches the row count of the original file, so from a row-count perspective the data has been ingested correctly.

datalake_data_verify_sql = """
select count() from volume ingest_demo (txid string) using csv
 options(
    'header'='true',
    'sep'=',',
    'compression' = 'gzip'
 ) files('gz/lift_tickets_data.csv.gz')
 limit 10
"""
session.sql(datalake_data_verify_sql).show()

-------------
|count()    |
-------------
|100000     |
-------------
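
To cross-check the 100000 figure against the source data, you can also count the rows in the local file (a small sketch; the path assumes the data/ directory used for the upload, and one line is subtracted for the CSV header):

import gzip

# Count data rows in the local gzipped CSV (minus 1 for the header line)
with gzip.open("data/lift_tickets_data.csv.gz", "rt") as f:
    local_rows = sum(1 for _ in f) - 1
print(local_rows)  # expected: 100000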

Query data in the data lake file
datalake_data_analytics_sql = """
select * from volume ingest_demo (txid string,name string, address_state string) using csv
 options(
    'header'='true',
    'sep'=',',
    'compression' = 'gzip'
 ) files('gz/lift_tickets_data.csv.gz')
 limit 10
"""
session.sql(datalake_data_analytics_sql).show()

----------------------------------------------------------------------------------------------------
|txid                                 |name                       |address_state                   |
----------------------------------------------------------------------------------------------------
|80a7a77b-4941-46f3-bf1a-760bb46f12da |0xbb6eabaf2eb3c3d2ea164eba |Xin Rong Ji                     |
|976b4512-1b07-43f4-a8e4-1fe86a7e1ee4 |0xa08ab7945cf87fc0b5095dc  |Da Dong Roast Duck              |
|4c49f5cc-0bd4-4a7e-8f61-f4a501a0dd24 |0xdf7bd805b890815a4e0a008c |Jing Ya Tang                    |
|8579071f-1c8b-4214-9a4d-096e6403bc52 |0x3113aa5ae86c522f3176829e |New Mainland Chinese Restaurant |
|31962471-ad3b-463d-ab36-d1b1ab041a36 |0x28c6168f44e09cacd82ecfe9 |Shun Feng Seafood Restaurant    |
|f253d271-092d-4261-8703-a440cc149c39 |0xab306bea9de6a13426361153 |Chang An No.1                   |
|5e52e443-2c03-4ce2-a95d-992d7cb3f54e |0x52000c48116d3a4667c3b607 |Yu Bao Xuan                     |
|e45f3806-972c-4617-b4ab-f2cbfc449de1 |0x247dd8c03cab559125a63d1b |Dian Ke Dian Lai                |
|9abeadfa-ecac-42fb-9dd7-33377e2e5387 |0x9824bf4d4f7e12590f692148 |Chuan Ban Restaurant            |
|c8938377-27a0-4f1f-9800-00c169729fd3 |0x4b65182989de9a3d13943b10 |Nan Men Hotpot                  |
----------------------------------------------------------------------------------------------------
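
Instead of printing the result, the query can also be kept as a Zettapark DataFrame and handed off to Python for further processing, for example conversion to pandas (a minimal sketch, assuming Zettapark exposes a Snowpark-style to_pandas() method):

# Keep the query result as a DataFrame and convert it for downstream analysis
df = session.sql(datalake_data_analytics_sql)
pdf = df.to_pandas()  # assumption: a Snowpark-style to_pandas() is available
print(pdf.head())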

Close Zettapark Session
session.close()

Next Steps Recommendations

  • Clean and transform data using Zettapark in Dataframe format
  • Call ML and LLM related interfaces in Python code to deeply integrate Data+AI
  • Analyze data in data lake files using SQL in Singdata Lakehouse Studio

Documentation

Zettapark Quick Start