Spark Connector Overview

The Lakehouse Connector for Spark lets you use Lakehouse as an Apache Spark data source, just like other data sources (PostgreSQL, HDFS, S3, etc.).

Interaction between Lakehouse and Spark

This connector supports bidirectional data movement between Lakehouse and Spark clusters. Using the connector, you can:

  • Read tables from Lakehouse and convert them into Spark DataFrames
  • Write data from Spark DataFrames into tables in Lakehouse

Data Transfer Process

Data transfer between the two systems is achieved through a Lakehouse volume that is automatically created and managed by the connector.

  • When a query is executed against Lakehouse, the results are first staged in this temporary volume before being loaded into Spark.
  • Writes to Lakehouse go through the bulkload SDK, which performs batch writes.

Connection Parameters

| Parameter | Required | Description |
| --- | --- | --- |
| endpoint | Yes | The endpoint address for connecting to Lakehouse, e.g. 6861c888.cn-shanghai-alicloud.api.clickzetta.com. You can find the JDBC connection string in Lakehouse Studio under Management -> Workspace; the domain name in the JDBC connection string is the endpoint. |
| username | Yes | Username |
| password | Yes | Password |
| workspace | Yes | The workspace to use |
| virtualCluster | Yes | The virtual cluster to use |
| schema | Yes | The schema to access |
| table | Yes | The name of the table to access |
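
For reference, the sketch below shows how these parameters map to .option() calls when reading through the connector from PySpark; every connection value is a placeholder to be replaced with your own workspace settings. The command-line sections that follow show the same pattern in spark-shell, spark-sql, and pyspark.

# Minimal read sketch; all connection values below are placeholders
df = spark.read.format("clickzetta") \
    .option("endpoint", "your_instance.api.clickzetta.com") \
    .option("username", "your_user") \
    .option("password", "your_password") \
    .option("workspace", "your_workspace") \
    .option("virtualCluster", "default") \
    .option("schema", "public") \
    .option("table", "your_table") \
    .load()
df.show()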

Connect to Lakehouse using Spark Command Line

  • Download the Spark 3.4 binary package from the official Spark website. In this case, the download used is spark-3.4.3-bin-hadoop3.tgz
  1. Use the spark-shell command line to connect to Lakehouse locally
bin/spark-shell --jars ./spark_connector/spark-clickzetta-1.0.0-SNAPSHOT-jar-with-dependencies.jar
import org.apache.spark.sql.functions.col
import com.clickzetta.spark.clickzetta.ClickzettaOptions
val readDf = spark.read.format("clickzetta").
  option(ClickzettaOptions.CZ_ENDPOINT, "lakehouse_url").
  option(ClickzettaOptions.CZ_USERNAME, "user").
  option(ClickzettaOptions.CZ_PASSWORD, "password").
  option(ClickzettaOptions.CZ_WORKSPACE, "quick_start").
  option(ClickzettaOptions.CZ_VIRTUAL_CLUSTER, "default").
  option(ClickzettaOptions.CZ_SCHEMA, "public").
  option(ClickzettaOptions.CZ_TABLE, "birds_test").
  load()
readDf.show()
2. Use the spark-sql command line to connect to Lakehouse
bin/spark-sql --jars ./jars/spark-clickzetta-1.0.0-SNAPSHOT-jar-with-dependencies.jar
CREATE TABLE lakehouse_table
USING clickzetta
OPTIONS (
    endpoint 'lakehouse_url',
    username 'user',
    password 'password',
    workspace 'quick_start',
    virtualCluster 'default',
    schema 'public',
    table 'birds_test'
);
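
Once the mapping table is created, it can be queried with ordinary SQL statements in the same spark-sql session, for example:

SELECT * FROM lakehouse_table LIMIT 10;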

3. Use the pyspark command line to connect to Lakehouse

bin/pyspark --jars ./jars/spark-clickzetta-1.0.0-SNAPSHOT-jar-with-dependencies.jar
df = spark.read.format("clickzetta") \
    .option("endpoint", "jnsxwfyr.api.singdata.com") \
    .option("username", "user") \
    .option("password", "password") \
    .option("workspace", "quick_start") \
    .option("virtualCluster", "default") \
    .option("schema", "public") \
    .option("table", "birds_test") \
    .load()
df.show()

Lakehouse Spark Connector Usage Restrictions

  • Writing to primary key (PK) tables is not supported
  • All fields of the target table must be written; writing only a subset of fields is not supported (see the sketch below)
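
Because partial-field writes are not supported, the DataFrame you write should carry every column of the target table, in the table's column order. The sketch below illustrates this from PySpark for a table with columns user_id, movie_id and rating (such as the sample_movie_data table used in the case below); all connection values are placeholders.

# df is the DataFrame holding the data to write; select all columns of the target table
full_df = df.select("user_id", "movie_id", "rating")
full_df.write.format("clickzetta") \
    .option("endpoint", "your_instance.api.clickzetta.com") \
    .option("username", "your_user") \
    .option("password", "your_password") \
    .option("workspace", "your_workspace") \
    .option("virtualCluster", "default") \
    .option("schema", "public") \
    .option("table", "sample_movie_data") \
    .mode("overwrite") \
    .save()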

Practical Case of Using Spark Connector

Using Spark to Write Data to Lakehouse

Overview

Seamless data transfer between platforms is crucial for effective data management and analysis. A common customer scenario is processing data with Spark, writing the results into Lakehouse, and then connecting BI reports to Lakehouse for queries.

We will write a Spark program and run it in the Spark environment, using the Connector provided by Lakehouse to write data into Lakehouse.

Environment Requirements

  • Proficiency in Spark programming. You can refer to Developing Spark with IntelliJ IDEA

  • The dataset used in this case is the movie rating dataset from Spark's GitHub. You can download it by clicking the download button on this link

  • Create a table in Lakehouse

    • create table sample_movie_data(user_id int,movie_id int,rating float);
  • Download the Spark Connector package provided by Lakehouse (currently the download package is provided by Lakehouse support). After downloading, add the jar to the local maven repository to facilitate referencing and packaging in maven projects.

    • mvn install:install-file -Dfile=./Downloads/spark-clickzetta-1.0.0-SNAPSHOT-jar-with-dependencies.jar -DgroupId=com.clickzetta -DartifactId=spark-clickzetta -Dversion=1.0.0-SNAPSHOT -Dpackaging=jar
  • Modify the pom.xml file and add the following dependencies

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spark.version>3.4.0</spark.version>
    <scala.version>2.12.17</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>


    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>com.clickzetta</groupId>
        <artifactId>spark-clickzetta</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- scala compilation plugin -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
        </plugin>
        <!-- java compilation plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <!-- plugin version -->
            <version>3.8.1</version>
            <!-- compilation level forced to jdk1.8 -->
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <!-- encoding format -->
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- maven packaging plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- specify the location of main -->
                                <mainClass>*</mainClass>
                            </transformer>
                        </transformers>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <!-- filter out unnecessary jar packages -->
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
  • Writing Scala Code
import com.clickzetta.spark.clickzetta.ClickzettaOptions
import org.apache.spark.sql.SparkSession
object SparkLH {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local")
      .appName("ClickzettaSourceSuite")
      .getOrCreate()

    // Read data file
    val rdd = spark.sparkContext.textFile("./Downloads/sample_movielens_data.txt")
    // Split data
    val rddSplit = rdd.map(line => line.split("::"))

    // Convert to DataFrame with column names matching the target table
    import spark.implicits._
    val df = rddSplit.map(arr => (arr(0).toInt, arr(1).toInt, arr(2).toFloat)).toDF("user_id", "movie_id", "rating")
    // Write to Lakehouse
    df.write.format("clickzetta")
      .option(ClickzettaOptions.CZ_ENDPOINT, "jnsxwfyr.api.clickzetta.com")
      .option(ClickzettaOptions.CZ_USERNAME, "username")
      .option(ClickzettaOptions.CZ_PASSWORD, "password")
      .option(ClickzettaOptions.CZ_WORKSPACE, "quick_start")
      .option(ClickzettaOptions.CZ_VIRTUAL_CLUSTER, "default")
      .option(ClickzettaOptions.CZ_SCHEMA, "public")
      .option(ClickzettaOptions.CZ_TABLE, "sample_movie_data")
      .mode("overwrite")
      .save()
    spark.stop()
  }
}
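
After the code compiles, package the project and submit the resulting jar in the usual way. The jar name depends on your project's pom.xml and is shown here only as a placeholder:

mvn clean package
bin/spark-submit --class SparkLH ./target/your-project-jar-with-dependencies.jar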

Using Spark ML to Process Lakehouse Data

Objective:

Read the data that already exists in Lakehouse, use Spark ML to train a recommendation model that predicts user ratings for movies, and use regression metrics (RMSE, R-squared) to evaluate the model's performance.

Environment Preparation

  • The data used in this case is the movie rating dataset from Spark's GitHub; you can download it by clicking the download button on this link. Use the "Using Spark to Write Data to Lakehouse" case above to write it into Lakehouse.

  • Create a table in the Lakehouse

    • create table sample_movie_data(user_id int,movie_id int,rating float);
  • Install Python (version 3.6 or higher)

  • Install Pyspark

pip install pyspark
  • Download the Spark Connector package provided by Lakehouse.

    • When running locally with pyspark, you can reference the jar package in the code.
    • os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars ./Downloads/spark-clickzetta-1.0.0-SNAPSHOT-jar-with-dependencies.jar pyspark-shell'
  • If running in cluster mode, you can use --jars to specify the required dependency jar packages

    •  spark-submit --master <master-url> --deploy-mode cluster --jars /path/to/your.jar my_app.py
  • Writing Python Code

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.mllib.evaluation import RegressionMetrics
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars ./Downloads/spark-clickzetta-1.0.0-SNAPSHOT-jar-with-dependencies.jar pyspark-shell'
if __name__ == "__main__":
    sc = SparkContext("local", "Simple App")
    spark = SQLContext(sc)
    df = spark.read.format("clickzetta") \
        .option("endpoint", "jnsxwfyr.api.clickzetta.com") \
        .option("username", "user") \
        .option("password", "password") \
        .option("workspace", "quick_start") \
        .option("virtualCluster", "default") \
        .option("schema", "public") \
        .option("table", "sample_movie_data") \
        .load()

    # Convert the DataFrame to an RDD of Rating objects (ratings are centered by subtracting 2.5)
    ratings = df.rdd.map(lambda row: Rating(row.user_id, row.movie_id, row.rating - 2.5))

    # Continue with subsequent model training and evaluation steps
    model = ALS.train(ratings, rank=10, iterations=10, lambda_=0.01)

    usersProducts = ratings.map(lambda r: (r.user, r.product))
    predictions = model.predictAll(usersProducts).map(lambda r: ((r.user, r.product), r.rating))

    ratingsTuple = ratings.map(lambda r: ((r.user, r.product), r.rating))
    scoreAndLabels = predictions.join(ratingsTuple).map(lambda tup: tup[1])

    metrics = RegressionMetrics(scoreAndLabels)

    # Root mean squared error
    print("RMSE = %s" % metrics.rootMeanSquaredError)

    # R-squared
    print("R-squared = %s" % metrics.r2)
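
As an optional follow-up, the predictions can be written back to Lakehouse through the same connector so that BI tools can query them. The sketch below would be added at the end of the main block; it assumes a hypothetical target table named sample_movie_predictions (user_id int, movie_id int, predicted_rating float) has already been created in Lakehouse, and reuses the same placeholder connection values as above.

    # Optional: write predictions back to Lakehouse (sample_movie_predictions is a hypothetical table)
    pred_df = spark.createDataFrame(
        predictions.map(lambda t: (t[0][0], t[0][1], float(t[1]))),
        ["user_id", "movie_id", "predicted_rating"])
    pred_df.write.format("clickzetta") \
        .option("endpoint", "jnsxwfyr.api.clickzetta.com") \
        .option("username", "user") \
        .option("password", "password") \
        .option("workspace", "quick_start") \
        .option("virtualCluster", "default") \
        .option("schema", "public") \
        .option("table", "sample_movie_predictions") \
        .mode("overwrite") \
        .save()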