Building an Analysis-Oriented Modern Data Stack Based on Singdata Lakehouse

This article introduces how to build an analysis-oriented Modern Data Stack based on Singdata Lakehouse, Metabase, and MindsDB.

Solution Architecture

Features of the Modern Data Stack solution based on Singdata Lakehouse:

  • Migrate the AWS data warehouse through the data lake, using Singdata Lakehouse to integrate lake and warehouse and significantly reduce data storage, computing, and maintenance costs.
  • Unlimited storage and efficient migration. The entire data channel uses cloud object storage to achieve a storage-compute separation architecture, avoiding the bandwidth and storage capacity bottlenecks of intermediate server nodes in traditional solutions.
  • Singdata Lakehouse + Metabase for simplified BI analysis: visual data exploration takes just two mouse clicks, which greatly lowers the barrier for business personnel to analyze data.
  • Singdata Lakehouse + MindsDB for 100% SQL-based AI and LLM-enhanced analysis. Data engineers and BI analysts can do this in SQL alone, without mastering other, more complex languages.
  • Lower skill requirements across the entire data stack (cloud infrastructure, data lake, data warehouse, BI, AI), making it easier for enterprises to find and staff the people they need.

  The entire solution focuses on simplicity and ease of use, helping enterprises shift their focus from data infrastructure to data analysis, achieving modernization of data analysis.

The above diagram shows how we will migrate and build the Modern Data Stack, summarized as follows:

  • Use the Redshift UNLOAD command to unload data to Parquet files in an S3 bucket.
  • Load data directly from Parquet files in the AWS S3 bucket into Singdata Lakehouse tables using Singdata Lakehouse SELECT * FROM VOLUME, achieving rapid data warehousing (in this example, loading a table with over 20 million rows takes only 30 seconds).
  • BI application: Explore and analyze data through Metabase (from table to dashboard with just two mouse clicks, yes, just two mouse clicks).
  • AI application: Predict house prices through MindsDB (100% SQL-based model prediction).

Solution Components

  • AWS:

    • Redshift
    • S3
  • Singdata

    • Singdata Lakehouse, a multi-cloud and integrated data platform. It adopts a fully managed SaaS service model, providing enterprises with a simplified data architecture.
    • Singdata Lakehouse Driver for Metabase.
    • Singdata Lakehouse Connector for MindsDB.
  • Data Analysis

    • Metabase with Lakehouse Driver on Docker. Metabase is a complete BI platform, but its design philosophy is very different from Superset. Metabase focuses heavily on the experience of business personnel (such as product managers, marketing operators) when using this tool, allowing them to freely explore data and answer their own questions.
    • MindsDB with Lakehouse Connector on Docker. MindsDB can build models directly on data in Singdata Lakehouse, removing the specialist steps of data processing and machine learning model construction. Data analysts and BI analysts can use it out of the box without data engineering or modeling expertise, which lowers the barrier to modeling so that anyone can analyze data and apply algorithms.
    • Zeppelin with Lakehouse JDBC Interpreter on Docker
    • Zeppelin with MySQL JDBC Interpreter on Docker (connecting to MindsDB's MySQL interface)

Why Use Singdata Lakehouse?

  • Fully Managed: Singdata Lakehouse provides a fully managed, cloud-based Lakehouse service that is easy to use and scale. You don't have to manage and maintain your own hardware and infrastructure, which avoids time-consuming and expensive operations work.
  • Cost Savings: Compared to Redshift, the total cost of ownership (TCO) of Singdata Lakehouse is usually lower because it charges based on usage without requiring upfront commitments. Singdata Lakehouse's highly flexible pricing model allows you to pay only for the resources you actually use, rather than being locked into a fixed cost model.
  • Scalability: Singdata Lakehouse is designed to handle large amounts of data and can scale up or down as needed, making it a good choice for enterprises with significant fluctuations in computing loads. Singdata Lakehouse data is stored in cloud object storage services, achieving "unlimited scalability" in data scale.
  • Performance: Singdata Lakehouse adopts a Single Engine All Data architecture, separating compute and storage, enabling it to process query tasks faster than Redshift.
  • Easy to Use: Singdata Lakehouse provides an integrated platform for data integration, development, operations, and governance, making development and management easier without the need to integrate multiple separate solutions.
  • Data Source Support: Singdata Lakehouse supports multiple data sources and formats, including structured and semi-structured data. In most cases, BI and AI applications can be developed using only SQL.
  • Data Integration: The built-in data integration of Singdata Lakehouse supports a wide range of data sources, making data loading and preparation for analysis easier.

  Overall, migrating to Singdata Lakehouse can help you save time and money, and enable you to handle and analyze data more easily and effectively.

Implementation Steps

Data Extraction (E)

Unload Housing Price Sales Data from Redshift to S3

The Redshift UNLOAD command unloads the result of a query to one or more text, JSON, or Apache Parquet files on Amazon S3, using Amazon S3 server-side encryption (SSE-S3). Its syntax is:

UNLOAD ('select-statement')
TO 's3://object-path/name-prefix'
authorization
[ option, ...] 

where authorization is
IAM_ROLE { default | 'arn:aws:iam::<AWS account-id-1>:role/<role-name>[,arn:aws:iam::<AWS account-id-2>:role/<role-name>][,...]' }
            
where option is
{ [ FORMAT [ AS ] ] CSV | PARQUET | JSON
| PARTITION BY ( column_name [, ... ] ) [ INCLUDE ]
| MANIFEST [ VERBOSE ]
| HEADER
| DELIMITER [ AS ] 'delimiter-char'
| FIXEDWIDTH [ AS ] 'fixedwidth-spec'
| ENCRYPTED [ AUTO ]
| BZIP2
| GZIP
| ZSTD
| ADDQUOTES
| NULL [ AS ] 'null-string'
| ESCAPE
| ALLOWOVERWRITE
| CLEANPATH
| PARALLEL [ { ON | TRUE } | { OFF | FALSE } ]
| MAXFILESIZE [AS] max-size [ MB | GB ]
| ROWGROUPSIZE [AS] size [ MB | GB ]
| REGION [AS] 'aws-region' }
| EXTENSION 'extension-name'
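
As a concrete illustration of the syntax above, the sketch below assembles an UNLOAD statement that writes query results to Parquet. The table name, bucket path, and role ARN are hypothetical placeholders, and `build_unload_sql` is an illustrative helper (not part of any AWS library) that only formats a string to be run from a Redshift SQL client.

```python
def build_unload_sql(select_sql: str, s3_prefix: str, iam_role_arn: str) -> str:
    """Return a Redshift UNLOAD statement that writes Parquet files to S3."""
    # UNLOAD receives the query as a quoted string, so any single quotes
    # inside the query have to be doubled.
    escaped_query = select_sql.replace("'", "''")
    return (
        f"UNLOAD ('{escaped_query}')\n"
        f"TO '{s3_prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Hypothetical table, bucket, and role; replace with your own values.
unload_sql = build_unload_sql(
    "select * from house_prices where county = 'AVON'",
    "s3://example-bucket/house_prices/data/",
    "arn:aws:iam::123456789012:role/ExampleRedshiftUnloadRole",
)
print(unload_sql)
```

Keeping the statement in a helper makes it easier to reuse the same target prefix when the files are later read from the volume.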

Data Lake Data Exploration: Explore Parquet Data on AWS S3 with Singdata Lakehouse

View the total number of rows of data (requires pre-creation of Singdata Lakehouse STORAGE CONNECTION and EXTERNAL VOLUME):

Preview Data

select 
        *
    from volume hz_qiliang_csv_volume(
    price  int,
    date  int,
    postcode1  binary,
    postcode2  binary,
    type  binary,
    is_new  int,
    duration  binary,
    addr1  binary,
    addr2  binary,
    street  binary,
    locality  binary,
    town  binary,
    district  binary,
    county  binary
) USING parquet
regexp '/house_prices_iceberg/data/000.*.parquet'
limit 10;

In Singdata Lakehouse, executing the above query yields the following result:

Data Ingestion: Load Data from S3 (L) to Singdata Lakehouse and Perform Data Transformation (T)

use schema public_datasets;
create table if not exists house_prices_paid_from_oss_parquet as
select price,
        cast(date*24*3600 as timestamp) as date,
        cast(postcode1 as string) as postcode1,
        cast(postcode2 as string) as postcode2,
        cast(type as string) as type,
        is_new,
        cast(duration as string) as duration,
        cast(addr1 as string) as addr1,
        cast(addr2 as string) as addr2,
        cast(street as string) as street,
        cast(locality as string) as locality,
        cast(town as string) as town,
        cast(district as string) as district,
        cast(county as string) as county
    from volume public.hz_qiliang_csv_volume(
    price  int,
    date  int,
    postcode1  binary,
    postcode2  binary,
    type  binary,
    is_new  int,
    duration  binary,
    addr1  binary,
    addr2  binary,
    street  binary,
    locality  binary,
    town  binary,
    district  binary,
    county  binary
) USING parquet
regexp '/house_prices_iceberg/data/000.*.parquet'
order by date,county,price;
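
The expression `cast(date*24*3600 as timestamp)` suggests that the Parquet `date` column stores a day count since the Unix epoch; this is an assumption inferred from the multiplication by the number of seconds in a day. The same arithmetic can be checked in Python before loading. `days_to_timestamp` is an illustrative helper name.

```python
from datetime import datetime, timezone

SECONDS_PER_DAY = 24 * 3600


def days_to_timestamp(days_since_epoch: int) -> datetime:
    """Convert a day count since 1970-01-01 into a UTC timestamp."""
    return datetime.fromtimestamp(days_since_epoch * SECONDS_PER_DAY, tz=timezone.utc)


# 9131 days after 1970-01-01 is 1995-01-01 (25 years including 6 leap days).
print(days_to_timestamp(9131))  # 1995-01-01 00:00:00+00:00
```

If a sampled value converts to an implausible year, the column is probably stored in a different unit and the cast should be adjusted.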

Verify the number of rows of data in the warehouse:

Explore the data in the warehouse using SQL:

BI Application: Explore and analyze data in Singdata Lakehouse through Metabase

Create a database connection to Singdata Lakehouse in Metabase

Explore and analyze data through Metabase (just two clicks, yes, just two!)

Select the database and table:

Browse and analyze data through Metabase

Explore and analyze data through Metabase:

AI Application: Predict and analyze house prices through MindsDB (Only SQL)

The data flow in this section: Zeppelin -> MindsDB -> Singdata Lakehouse.

  • Zeppelin creates a new Interpreter connection to MindsDB through MySQL JDBC Driver
  • MindsDB connects to Singdata Lakehouse through the clickzetta handler (based on Python SQLAlchemy)

Build model training data in Singdata Lakehouse

drop table if exists house_prices_paid_grouped_by_features;
create table if not exists house_prices_paid_grouped_by_features as 
SELECT
  postcode1,
  postcode2,
  TYPE,
  is_new,
  duration,
  street,
  town,
  district,
  county,
  round(max(price)) as max_price,
  round(min(price)) as min_price,
  round(avg(price)) as avg_price,
  count(*) as paid_times
FROM
  house_prices_paid_from_oss_parquet
WHERE  postcode1 !='' and  postcode2 !=''
GROUP BY 1,2,3,4,5,6,7,8,9
ORDER BY 9,1,2,3,4,5,6,7,8
LIMIT 10000;
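
To make the shape of the training table concrete, the sketch below reproduces the same grouping logic in plain Python on a few made-up rows. The sample records are hypothetical, and only three of the nine feature columns are used for brevity. Python's `round` may also treat exact .5 values differently from the SQL `round` function.

```python
from collections import defaultdict

# Hypothetical sample rows; the real table has over 20 million.
sample_sales = [
    {"postcode1": "BS32", "postcode2": "9DF", "type": "terraced", "price": 100000},
    {"postcode1": "BS32", "postcode2": "9DF", "type": "terraced", "price": 120000},
    {"postcode1": "AB1", "postcode2": "2CD", "type": "flat", "price": 90000},
    {"postcode1": "", "postcode2": "", "type": "flat", "price": 50000},
]


def group_by_features(rows, feature_names):
    """Aggregate sale prices per feature combination, like the SQL GROUP BY."""
    prices_by_group = defaultdict(list)
    for row in rows:
        # Mirrors: WHERE postcode1 != '' AND postcode2 != ''
        if row["postcode1"] == "" or row["postcode2"] == "":
            continue
        key = tuple(row[name] for name in feature_names)
        prices_by_group[key].append(row["price"])

    result = []
    for key in sorted(prices_by_group):
        prices = prices_by_group[key]
        record = dict(zip(feature_names, key))
        record.update(
            max_price=max(prices),
            min_price=min(prices),
            avg_price=round(sum(prices) / len(prices)),
            paid_times=len(prices),
        )
        result.append(record)
    return result


grouped = group_by_features(sample_sales, ["postcode1", "postcode2", "type"])
for record in grouped:
    print(record)
```

Each output record corresponds to one row of house_prices_paid_grouped_by_features: the feature values plus the four aggregates that the model is later trained on.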

Create a Zeppelin Interpreter to Connect MindsDB via MySQL JDBC
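
The interpreter settings are not listed in this section. As a rough sketch, the properties for a Zeppelin JDBC interpreter pointing at MindsDB's MySQL interface could be assembled as below. It assumes a default MindsDB installation (MySQL API on port 47335, user `mindsdb` with an empty password) and the MySQL Connector/J 8 driver class; the host name is a hypothetical placeholder, and `zeppelin_jdbc_properties` is an illustrative helper.

```python
def zeppelin_jdbc_properties(host, port=47335, database="mindsdb",
                             user="mindsdb", password=""):
    """Build the key/value pairs to enter in Zeppelin's JDBC interpreter form."""
    return {
        "default.driver": "com.mysql.cj.jdbc.Driver",  # assumes Connector/J 8.x
        "default.url": f"jdbc:mysql://{host}:{port}/{database}",
        "default.user": user,
        "default.password": password,
    }


# Hypothetical host; use the address of your MindsDB container.
props = zeppelin_jdbc_properties("mindsdb.example.internal")
for key, value in props.items():
    print(f"{key} = {value}")
```

The MySQL Connector/J jar also has to be available to the interpreter, for example as an interpreter dependency.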

Create a New Notebook in Zeppelin

MindsDB connects to Singdata Lakehouse, using Singdata Lakehouse as a data source

--MindsDB connects to Singdata Lakehouse
CREATE DATABASE if not exists clickzetta_uat_public_datasets    --- display name for database.
WITH ENGINE = 'clickzetta',                                     --- name of the mindsdb handler
PARAMETERS = {
    "service": "api.singdata.com",                        --- ClickZetta Lakehouse service address.
    "workspace": "********",                                       --- ClickZetta workspace.
    "instance": "********",                                     --- account instance id.
    "vcluster": "default",                                      --- vcluster
    "username": "********",                                      --- your username.
    "password": "********",                                    --- Your password.
    "schema": "public_datasets"                                 --- common schema PUBLIC.
};

Create Model

Create a prediction model to predict avg_price, the average sale price for each combination of house features.

--Create prediction model
CREATE MODEL IF NOT EXISTS
  clickzetta.uk_house_prices_grouped_by_features_model_avg_price
FROM clickzetta_uat_public_datasets  (SELECT * FROM house_prices_paid_grouped_by_features)
PREDICT avg_price;
--View model status
DESCRIBE clickzetta.uk_house_prices_grouped_by_features_model_avg_price;

House Price Prediction

--MAKE A PREDICTION
SELECT avg_price, 
       avg_price_explain 
FROM clickzetta.uk_house_prices_grouped_by_features_model_avg_price
WHERE postcode1 = 'BS32'
AND postcode2= '9DF'
AND type= 'terraced'
AND is_new =1
AND duration= 'freehold'
AND street= 'FERNDENE'
AND town= 'BRISTOL'
AND district= 'NORTHAVON'
AND county= 'AVON';

Prediction results:

avg_price        avg_price_explain
1306        {"predicted_value": 1306, "confidence": 0.97, "anomaly": null, "truth": null, "confidence_lower_bound": 0, "confidence_upper_bound": 7654}
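
The avg_price_explain column is a JSON string, so it can be unpacked on the client side to inspect the confidence bounds. The sketch below parses the exact result shown above; `summarize_prediction` is an illustrative helper name, not a MindsDB function.

```python
import json

# The avg_price_explain value from the prediction result above.
explain_json = (
    '{"predicted_value": 1306, "confidence": 0.97, "anomaly": null, '
    '"truth": null, "confidence_lower_bound": 0, "confidence_upper_bound": 7654}'
)


def summarize_prediction(raw: str) -> dict:
    """Extract the prediction, its confidence, and the width of its interval."""
    info = json.loads(raw)
    lower = info["confidence_lower_bound"]
    upper = info["confidence_upper_bound"]
    return {
        "predicted": info["predicted_value"],
        "confidence": info["confidence"],
        "lower": lower,
        "upper": upper,
        "interval_width": upper - lower,
    }


summary = summarize_prediction(explain_json)
print(summary)
```

Here the interval from 0 to 7654 is wide relative to the predicted value of 1306, which is worth keeping in mind when reading individual predictions.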

Batch House Price Prediction

-- Bulk predictions by joining a table with your model:
SELECT t.*, m.avg_price as predicted_avg_price,m.avg_price_explain
FROM clickzetta_uat_public_datasets.house_prices_paid_grouped_by_features as t 
JOIN clickzetta.uk_house_prices_grouped_by_features_model_avg_price as m
LIMIT 100;

Appendix

Metabase, MindsDB, Zeppelin Environment Installation and Deployment Guide

  • Metabase with Lakehouse Driver on Docker
  • MindsDB with Lakehouse Connector on Docker
  • Zeppelin with Lakehouse JDBC Driver

Preview Parquet file schema and data through Python code, and generate Singdata Lakehouse SQL code

import os
import pyarrow.parquet as pq

def print_parquet_file_head(file_path, num_rows=10):
    # Open the Parquet file
    parquet_file = pq.ParquetFile(file_path)
    
    # Read the first row group of the Parquet file into a pandas DataFrame
    table = parquet_file.read_row_group(0, columns=None, use_threads=True)
    df = table.to_pandas()

    # Truncate the DataFrame to the desired number of rows
    if len(df) > num_rows:
        df = df.head(num_rows)

    # Print DataFrame with headers
    print(df)

def print_parquet_schema(file_path):
    # Open the Parquet file
    parquet_file = pq.ParquetFile(file_path)
    
    # Get schema information and build SQL fragment
    schema = parquet_file.schema.to_arrow_schema()
    sql_parts = []
    for field in schema:
        field_name = field.name
        field_type = str(field.type)
        sql_parts.append(f"    {field_name}  {field_type}")

    # Combine the list of fields into an SQL string
    sql_fields = ",\n".join(sql_parts)
    file_name = os.path.basename(file_path)

    # Print the final SQL statement format
    print(f"""-- Schema for {file_name}
select * from volume hz_qiliang_csv_volume(
{sql_fields}
) USING parquet
files('/amazon_reviews/{file_name}');
""")

# Update the directory path as needed
local_directory = "/Users/liangmo/Documents/yqgithub/qiliang_py"

# List all relevant Parquet files in the given directory
parquet_files = [f for f in os.listdir(local_directory) if f.endswith('.parquet') and f.startswith('000')]

# Print the schema and head for each Parquet file
for file_name in parquet_files:
    file_path = os.path.join(local_directory, file_name)
    try:
        print_parquet_schema(file_path)
        print_parquet_file_head(file_path)  # Function call to print the top rows
    except Exception as e:
        print(f"Error processing {file_path}: {e}")

Input example: