Version Requirements

Connector  Spark   Scala
1.0.0      3.4.0   2.12.13

Getting the Connector

Please contact Lakehouse technical support to obtain the connector package.

Parameter Description

Parameter       Required  Description
endpoint        Y         Endpoint address of the Lakehouse service, e.g. tmwmzxzs.dev-api.clickzetta.com
username        Y         Username
password        Y         Password
workspace       Y         Workspace to use
virtualCluster  Y         Virtual cluster to use
schema          Y         Name of the schema to access
table           Y         Name of the table to access
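
These parameters are passed as DataFrame reader/writer options, as the examples below show. As a minimal sketch (assuming the connector's ClickzettaOptions constants are plain string option keys; all values here are placeholders), the parameters can be collected in one map and reused for both reads and writes:

import org.apache.spark.sql.SparkSession
// ClickzettaOptions is provided by the connector JAR.

val spark = SparkSession.builder.master("local").appName("options-sketch").getOrCreate()

// Placeholder values; replace with your own connection details.
val czOptions = Map(
  ClickzettaOptions.CZ_ENDPOINT        -> "tmwmzxzs.dev-api.clickzetta.com",
  ClickzettaOptions.CZ_USERNAME        -> "<username>",
  ClickzettaOptions.CZ_PASSWORD        -> "<password>",
  ClickzettaOptions.CZ_WORKSPACE       -> "<workspace>",
  ClickzettaOptions.CZ_VIRTUAL_CLUSTER -> "<virtual_cluster>",
  ClickzettaOptions.CZ_SCHEMA          -> "<schema>",
  ClickzettaOptions.CZ_TABLE           -> "<table>"
)

// The same option map works for both directions.
val df = spark.read.format("clickzetta").options(czOptions).load()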

Data Type Mapping

Spark Data Type    Lakehouse Data Type
BooleanType        BOOLEAN
ByteType           INT8
ShortType          INT16
IntegerType        INT32
LongType           INT64
FloatType          FLOAT32
DoubleType         FLOAT64
DecimalType        DECIMAL
StringType         STRING
BinaryType         BINARY
DateType           DATE
TimestampNTZType   TIMESTAMP_NTZ
TimestampType      TIMESTAMP_LTZ
ArrayType          ARRAY
MapType            MAP
StructType         STRUCT
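
The complex types nest in the usual Spark fashion. As an illustrative sketch (the column names are hypothetical), a Spark schema whose fields map to ARRAY, MAP, and STRUCT columns on the Lakehouse side might look like this:

import org.apache.spark.sql.types._

// Illustrative column names; each Spark type maps per the table above.
val complexSchema = StructType(Seq(
  StructField("v_array",  ArrayType(IntegerType)),        // -> ARRAY
  StructField("v_map",    MapType(StringType, LongType)), // -> MAP
  StructField("v_struct", StructType(Seq(                 // -> STRUCT
    StructField("x", DoubleType),
    StructField("y", DoubleType)
  )))
))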

Usage and Testing

Preparation

Local Test POM Dependencies

<properties>
    <spark.version>3.4.0</spark.version>
    <scala.version>2.12.13</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>log4j-slf4j2-impl</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-1.2-api</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
            <exclusion>
                <artifactId>log4j-api</artifactId>
                <groupId>org.apache.logging.log4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>

    <dependency>
        <groupId>com.clickzetta</groupId>
        <artifactId>spark-clickzetta</artifactId>
        <version>1.0.0-SNAPSHOT</version>
        <scope>system</scope>
        <systemPath>${package_path_prefix}/spark-clickzetta-1.0.0-SNAPSHOT-jar-with-dependencies.jar</systemPath>
    </dependency>
</dependencies>
    
Note: ${package_path_prefix} must be replaced with the path of the locally downloaded JAR package.
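
If a system-scoped dependency is undesirable, the downloaded JAR can instead be installed into the local Maven repository with mvn install:install-file (passing -Dfile, -DgroupId=com.clickzetta, -DartifactId=spark-clickzetta, -Dversion=1.0.0-SNAPSHOT, -Dpackaging=jar) and then referenced as a regular dependency.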

Create Lakehouse Table

CREATE TABLE spark_connector_test_table(
    v_bool boolean,
    v_byte byte,
    v_short short,
    v_int int,
    v_long long,
    v_float float,
    v_double double,
    v_string string,
    v_decimal decimal(10,2),
    v_timestamp timestamp,
    v_date date
);

Using Spark to Write Data to Lakehouse

Write Restrictions (the connector follows the bulkload write restrictions)

  • Writing to primary-key (PK) tables is not supported
  • All fields must be written; writing a subset of fields is not supported

DataFrame Writing

import java.sql.{Date, Timestamp}

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
// ClickzettaOptions is provided by the connector JAR; import it from the connector package.

val spark = SparkSession.builder
  .master("local")
  .appName("ClickzettaSourceSuite")
  .getOrCreate()

// The schema must cover every column of the target table (see the write restrictions above).
val rowSchema = StructType(
  List(
    StructField("v_bool", BooleanType),
    StructField("v_byte", ByteType),
    StructField("v_short", ShortType),
    StructField("v_int", IntegerType),
    StructField("v_long", LongType),
    StructField("v_float", FloatType),
    StructField("v_double", DoubleType),
    StructField("v_string", StringType),
    StructField("v_decimal", DecimalType.apply(10, 2)),
    StructField("v_timestamp", TimestampType),
    StructField("v_date", DateType)
  )
)

// bulkload requires writing all fields
val data = spark.sparkContext.parallelize(
  Seq(
    Row(true,
      1.toByte,
      1.toShort,
      1,
      1L,
      1.1f,
      1.1d,
      "001",
      BigDecimal(100.123),
      Timestamp.valueOf("2024-05-12 17:49:11.873533"),
      Date.valueOf("2024-05-13")
    )
  )
)
val df = spark.createDataFrame(data, rowSchema)

// endpoint, username, password, workspace, virtualCluster, schema, table and
// accessMode hold the connection values described under Parameter Description.
df.write.format("clickzetta")
  .option(ClickzettaOptions.CZ_ENDPOINT, endpoint)
  .option(ClickzettaOptions.CZ_USERNAME, username)
  .option(ClickzettaOptions.CZ_PASSWORD, password)
  .option(ClickzettaOptions.CZ_WORKSPACE, workspace)
  .option(ClickzettaOptions.CZ_VIRTUAL_CLUSTER, virtualCluster)
  .option(ClickzettaOptions.CZ_SCHEMA, schema)
  .option(ClickzettaOptions.CZ_TABLE, table)
  .option(ClickzettaOptions.CZ_ACCESS_MODE, accessMode)
  .mode("append")
  .save()

Using lakehouse sqlline to View Data
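
After a successful write, the data can be verified from the Lakehouse side, for example by running select * from spark_connector_test_table; in sqlline.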

Using Spark to Read Data from Lakehouse

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder
  .master("local")
  .appName("ClickzettaSourceSuite")
  .getOrCreate()

// Same connection options as for writing.
val readDf = spark.read.format("clickzetta")
  .option(ClickzettaOptions.CZ_ENDPOINT, endpoint)
  .option(ClickzettaOptions.CZ_USERNAME, username)
  .option(ClickzettaOptions.CZ_PASSWORD, password)
  .option(ClickzettaOptions.CZ_WORKSPACE, workspace)
  .option(ClickzettaOptions.CZ_VIRTUAL_CLUSTER, virtualCluster)
  .option(ClickzettaOptions.CZ_SCHEMA, schema)
  .option(ClickzettaOptions.CZ_TABLE, table)
  .option(ClickzettaOptions.CZ_ACCESS_MODE, accessMode)
  .load()
readDf.printSchema()

// Read through the DataFrame API.
val result1 = readDf.select(col("`v_string`"), col("`v_date`")).collect()
println("----------read from df-----------")
for (row <- result1) {
  println(row.toString())
}

// Read through Spark SQL via a temporary view.
readDf.createOrReplaceTempView("tmp_spark_connector_test_table")
val result2 = spark.sql("select * from tmp_spark_connector_test_table where v_float = 1.1f").collect()
println("----------read from sql-----------")
for (row <- result2) {
  println(row.toString())
}
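
With the single row written in the example above, both reads should return that row: the DataFrame read prints only the selected v_string and v_date values (e.g. [001,2024-05-13]), while the SQL read prints the full row.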