VerdictDB on Apache Spark

We will write a simple example program in Scala, the primary language of Apache Spark. To compile the example program, the following tool must be installed.

  1. sbt: see the sbt installation guide (a quick check that it works is shown below).
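
To confirm that sbt is on your PATH, you can run the following (the exact output depends on the installed version):

$ sbt about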

Create an empty project

The following command creates a project that prints out "hello".

$ sbt new sbt/scala-seed.g8

A minimal Scala project.

name [Scala Seed Project]: hello-verdict

Template applied in ./hello-verdict

Move into the project directory: cd hello-verdict.

Remove the src/test directory, which we do not need: rm -rf src/test.
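
After these two steps, the project should look roughly like the following (the exact files come from the scala-seed template and may differ slightly between template versions):

hello-verdict
├── build.sbt
├── project
│   ├── Dependencies.scala
│   └── build.properties
└── src
    └── main
        └── scala
            └── example
                └── Hello.scala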

Configure build settings to use Spark and VerdictDB

Add the following line to build.sbt, below the existing import Dependencies._ line. As of this writing, the latest version of Apache Spark supports only Scala 2.11.

scalaVersion := "2.11.1"

Also, replace the existing dependency list with the following:

libraryDependencies ++= Seq(
  scalaTest % Test,
  "org.verdictdb" % "verdictdb-core" % "0.5.8",
  "org.apache.spark" %% "spark-core" % "2.3.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.3.1" % "provided"
)

This dependency declaration lets the build tool (sbt) download the required libraries automatically. The two Spark dependencies are marked provided because spark-submit supplies them at runtime, so they are not bundled into the application jar.
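
Note that the packaging step below uses sbt assembly, which requires the sbt-assembly plugin; the scala-seed template does not include it. One way to enable it (the plugin version here is only an example contemporary with Spark 2.3) is to create project/assembly.sbt containing the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7")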

Write an example program

Edit src/main/scala/example/Hello.scala as follows:

package example

import org.apache.spark.sql.SparkSession
import org.verdictdb.VerdictContext
import org.verdictdb.connection.SparkConnection
import scala.util.Random

object Hello extends App {
  val spark = SparkSession
    .builder()
    .appName("VerdictDB basic example")
    .enableHiveSupport()
    .getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")
  import spark.implicits._
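  // Wrap the SparkSession in a VerdictContext so queries can be issued through VerdictDB.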
  val verdict = VerdictContext.fromSparkSession(spark)

  // prepare data
  prepareData(spark, verdict)

  // run a query and print its result
  val rs = verdict.sql("select count(*) from myschema.sales")
  rs.printCsv()

  // only the following lines should be printed (the count is approximate, so its exact value may vary)
  // c2
  // 950.0

  def prepareData(spark: SparkSession, verdict: VerdictContext): Unit = {
    // create a schema and a table
    spark.sql("DROP SCHEMA IF EXISTS myschema CASCADE")
    spark.sql("CREATE SCHEMA IF NOT EXISTS myschema")
    spark.sql("CREATE TABLE IF NOT EXISTS myschema.sales (product string, price double)")

    // insert 1000 rows of randomly generated sales data
    val productList = List("milk", "egg", "juice")
    val rand = new Random()
    val values = (0 until 1000).map { _ =>
      val randInt: Int = rand.nextInt(3)
      val product: String = productList(randInt)
      val price: Double = (randInt + 2) * 10 + rand.nextInt(10)
      f"('$product', $price%.0f)"
    }
    spark.sql("INSERT INTO myschema.sales VALUES " + values.mkString(", "))

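    // BYPASS sends a statement directly to Spark, skipping VerdictDB's query rewriting;
    // here it clears any scramble and VerdictDB metadata left over from a previous run.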
    verdict.sql("BYPASS DROP TABLE IF EXISTS myschema.sales_scramble")
    verdict.sql("BYPASS DROP SCHEMA IF EXISTS verdictdbtemp CASCADE")
    verdict.sql("BYPASS DROP SCHEMA IF EXISTS verdictdbmeta CASCADE")
    verdict.sql("CREATE SCRAMBLE myschema.sales_scramble FROM myschema.sales BLOCKSIZE 100")
  }
}

Package and submit

$ sbt assembly
$ spark-submit --class example.Hello target/scala-2.11/Hello-assembly-0.1.0-SNAPSHOT.jar
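
To test on a single machine, you can run the same jar with a local master (assuming a local Spark installation):

$ spark-submit --master "local[*]" --class example.Hello target/scala-2.11/Hello-assembly-0.1.0-SNAPSHOT.jar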

The complete example program is available in a public GitHub repository; see the directory verdictdb_on_spark.