Getting Started

    To get started with the Couchbase Spark connector quickly, learn how to add the connector to your Spark project and run simple queries.

    Quickstart

    The following examples use Scala and its sbt build tool, but you can also use the connector through Maven or Gradle, and from Java.

    Create a new sbt project and add the following content to the build.sbt file. This code includes all the Spark dependencies, as well as the Couchbase Spark connector. Just enough to get you started!

    Here is a reference to the Scala docs.

    name := "my-first-couchbase-spark-project"
    
    organization := "my.organization"
    
    version := "1.0.0-SNAPSHOT"
    
    scalaVersion := "2.12.14"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "3.3.0",
      "org.apache.spark" %% "spark-sql" % "3.3.0",
      "com.couchbase.client" %% "spark-connector" % "3.3.1"
    )

    Now, under src/main/scala/, create a Quickstart.scala class with the following skeleton:

    object Quickstart {
    
    	def main(args: Array[String]): Unit = {
    
    	}
    
    }

    If you are not familiar with Scala, this is the equivalent of Java’s public static void main(String[] args) method. The following code goes inside the main body.

    When it comes to Spark, you always need to set up a configuration and initialize the SparkSession. The following snippet does that and also provides the necessary properties to connect to the Couchbase cluster. Please adjust the values to fit your environment.

    // Import needed for the session builder
    import org.apache.spark.sql.SparkSession
    
    // Configure the Spark Session
    val spark = SparkSession
      .builder()
      .appName("Couchbase Quickstart")
      .master("local") // use the JVM as the master, great for testing
      .config("spark.couchbase.connectionString", "127.0.0.1")
      .config("spark.couchbase.username", "username")
      .config("spark.couchbase.password", "password")
      .getOrCreate()

    In this configuration, we are using the JVM-local spark master to run our jobs. The Couchbase cluster is located on our local machine (127.0.0.1) and we use the user username with a password of password.

    To showcase different APIs, the following examples use the Keyspace explicitly. It is also possible to configure an implicit bucket, scope, etc. through the configuration properties. See the configuration documentation for more information.
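
    As a minimal sketch of the implicit configuration mentioned above, the session below sets a default bucket, scope, and collection. The property names spark.couchbase.implicitBucket, spark.couchbase.implicitScope, and spark.couchbase.implicitCollection are assumptions that should be checked against the configuration documentation for your connector version. The inventory scope and airline collection assume the travel-sample layout of Server 7.0 and later.

```scala
import org.apache.spark.sql.SparkSession

object ImplicitKeyspaceConfig {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Couchbase Quickstart")
      .master("local")
      .config("spark.couchbase.connectionString", "127.0.0.1")
      .config("spark.couchbase.username", "username")
      .config("spark.couchbase.password", "password")
      // Assumed property names: defaults used when no keyspace is given
      .config("spark.couchbase.implicitBucket", "travel-sample")
      .config("spark.couchbase.implicitScope", "inventory")
      .config("spark.couchbase.implicitCollection", "airline")
      .getOrCreate()

    spark.stop()
  }
}
```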

    Creating and saving RDDs

    The fundamental data type in Spark is the RDD (Resilient Distributed Dataset). The Couchbase Spark connector allows you to read and write RDDs against the cluster.

    Add the following line to your code so that all Couchbase-specific methods are available on the SparkContext:

    import com.couchbase.spark._

    There are different APIs available for each service (KeyValue, Query, etc.), and one of the most commonly used operations is fetching documents by ID. This can be achieved by using couchbaseGet:

    // Imports needed, but let your IDE handle that!
    import com.couchbase.client.scala.json.JsonObject
    import com.couchbase.spark.kv.Get
    
    // IDs of all documents we want to fetch
    val ids = Seq(Get("airline_10"), Get("airline_10642"), Get("airline_10748"))
    
    // The bucket from which we want to fetch the keys
    val keyspace = Keyspace(bucket = Some("travel-sample"))
    
    spark
      // access the spark context
      .sparkContext
      // perform the get with ids and keyspace
      .couchbaseGet(ids, keyspace)
      // collect all spark results back to the master
      .collect()
      // print the content from each GetResult
      .foreach(getResult => println(getResult.contentAs[JsonObject].get))

    First, the IDs we want to fetch are specified (and wrapped in Get case classes). Then, the Keyspace tells the command where to look for the data. In this case, we tell it to use the travel-sample bucket. Note that if you do not provide a scope or collection, the default ones are assumed. This way, the code is compatible with Couchbase Server versions before 7.0, but also with newer ones.
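
    For Server 7.0 and later, the same lookup can target a specific scope and collection rather than the defaults. The following is a sketch under assumptions: it takes the Keyspace case class to accept optional scope and collection fields alongside bucket, and it uses the inventory scope and airline collection of the newer travel-sample layout. The printAirlines helper is a hypothetical name and expects a session configured as shown earlier.

```scala
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark._
import com.couchbase.spark.kv.Get
import org.apache.spark.sql.SparkSession

object CollectionGet {

  // Hypothetical helper: fetches documents from an explicit scope and
  // collection instead of the default ones.
  def printAirlines(spark: SparkSession): Unit = {
    val ids = Seq(Get("airline_10"), Get("airline_10642"))

    // Assumed field names; scope and collection follow the 7.0+ sample data
    val keyspace = Keyspace(
      bucket = Some("travel-sample"),
      scope = Some("inventory"),
      collection = Some("airline")
    )

    spark.sparkContext
      .couchbaseGet(ids, keyspace)
      .collect()
      .foreach(getResult => println(getResult.contentAs[JsonObject].get))
  }
}
```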

    For each ID that is found, a GetResult is returned; it is the same type that the Scala SDK returns directly. In the example above, the content is converted into a generic JsonObject and printed.

    Spark by default is very verbose in its logging, but looking towards the end of the execution you should see logs similar to this:

    {"country":"United States","iata":"Q5","name":"40-Mile Air","callsign":"MILE-AIR","icao":"MLA","id":10,"type":"airline"}
    {"country":"United Kingdom","iata":null,"name":"Jc royal.britannica","callsign":null,"icao":"JRB","id":10642,"type":"airline"}
    {"country":"United States","iata":"ZQ","name":"Locair","callsign":"LOCAIR","icao":"LOC","id":10748,"type":"airline"}

    The connector also allows you to save RDDs as documents.

    The following code loads documents from the travel-sample bucket and then writes them into a different bucket. The same approach works for all kinds of RDDs; they could also come from different data sources.

    // Additional import needed for the Upsert case class
    import com.couchbase.spark.kv.Upsert
    
    // IDs of all documents we want to fetch
    val ids = Seq(Get("airline_10"), Get("airline_10642"), Get("airline_10748"))
    
    // The bucket from where and into where we want to read/write
    val fromKeyspace = Keyspace(bucket = Some("travel-sample"))
    val toKeyspace = Keyspace(bucket = Some("foo"))
    
    spark
      .sparkContext
      // Retrieve the documents by ID
      .couchbaseGet(ids, fromKeyspace)
      // Turn the results into an "Upsert" case class...
      .map(getResult => Upsert(getResult.id, getResult.contentAs[JsonObject].get))
      // Which is accepted by the couchbaseUpsert method to store the documents
      .couchbaseUpsert(toKeyspace)
      .collect()
      .foreach(println)

    The println this time will show the results of the mutations, returning the Scala SDK’s MutationResult for each successful mutation.

    Next up is an introduction to Query/Analytics and DataFrames.

    Working with SparkSQL and DataFrames

    DataFrames are, in essence, RDDs with a schema. Their rows are represented by the SparkSQL Row type.

    You need to have at least a primary index created on the travel-sample bucket to make the following examples work. If you haven’t done so already, run a CREATE PRIMARY INDEX ON `travel-sample` query.

    Because a DataFrame is like an RDD but with a schema and Couchbase is a schemaless database at its heart, you need a way to either define or infer a schema. The connector has built-in schema inference, but if you have a large or diverse data set, you need to give it some clues on filtering (or use scopes and collections with Server 7.0 and later).

    Suppose you want a DataFrame for all airlines, and you know that the JSON content has a type field with the value airline. You can pass this information to the connector for automatic schema inference:

    // Import needed for the option keys
    import com.couchbase.spark.query.QueryOptions
    
    // Create a DataFrame with Schema Inference
    val airlines = spark.read.format("couchbase.query")
      .option(QueryOptions.Filter, "type = 'airline'")
      .option(QueryOptions.Bucket, "travel-sample")
      .load()
    
    // Print The Schema
    airlines.printSchema()

    The code automatically infers the schema and prints it in this format:

    root
     |-- __META_ID: string (nullable = true)
     |-- callsign: string (nullable = true)
     |-- country: string (nullable = true)
     |-- iata: string (nullable = true)
     |-- icao: string (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
     |-- type: string (nullable = true)

    If you are using collections, something similar can be achieved without having to add the filter (since the airline collection only contains airlines in the first place):

    val airlines = spark.read.format("couchbase.query")
      .option(QueryOptions.Bucket, "travel-sample")
      .option(QueryOptions.Scope, "inventory")
      .option(QueryOptions.Collection, "airline")
      .load()
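
    Where inference is not wanted, a schema can also be defined by hand. The sketch below builds one with Spark's StructType and passes it through the standard DataFrameReader.schema method. Whether the connector honors a user-provided schema in this way is an assumption to verify against its documentation, and loadAirlines is a hypothetical helper name.

```scala
import com.couchbase.spark.query.QueryOptions
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ExplicitSchema {

  // Hypothetical helper: loads airlines with a hand-written schema
  def loadAirlines(spark: SparkSession): DataFrame = {
    // Only the fields of interest are declared
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("callsign", StringType, nullable = true),
      StructField("country", StringType, nullable = true)
    ))

    spark.read
      .format("couchbase.query")
      // Assumption: the connector accepts a user-provided schema here
      .schema(schema)
      .option(QueryOptions.Filter, "type = 'airline'")
      .option(QueryOptions.Bucket, "travel-sample")
      .load()
  }
}
```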

    Next you can perform an actual query where you are interested only in the name and callsign. This example sorts the rows by name and shows only the first 10.

    // Used for the $-sign to name a column
    import spark.sqlContext.implicits._
    
    airlines
      .select("name", "callsign")
      .sort($"name".asc)
      .show(10)

    The code prints the results on the console like this:

    +--------------------+-----------+
    |                name|   callsign|
    +--------------------+-----------+
    |         40-Mile Air|   MILE-AIR|
    |         AD Aviation|  FLIGHTVUE|
    |        ATA Airlines|     AMTRAN|
    |          Access Air|    CYCLONE|
    |          Aigle Azur| AIGLE AZUR|
    |         Air Austral|    REUNION|
    |Air Caledonie Int...|   AIRCALIN|
    |       Air Caraïbes|FRENCH WEST|
    |  Air Cargo Carriers|NIGHT CARGO|
    |          Air Cudlua|     Cudlua|
    +--------------------+-----------+
    only showing top 10 rows

    Working with Datasets

    A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.

    The following example creates a Dataset out of a DataFrame and maps it to a case class. It then uses the case class to extract fields out of the result set in a type-safe way:

    // Create a DataFrame with Schema Inference
    val airlines = spark.read.format("couchbase.query")
      //.option(QueryOptions.Filter, "type = 'airline'")
      .option(QueryOptions.Bucket, "travel-sample")
      .option(QueryOptions.Scope, "inventory")
      .option(QueryOptions.Collection, "airline")
      .load()
    
    // Contains the Encoder
    import spark.implicits._
    
    // Create a Dataset from the DataFrame
    val airlinesDS = airlines.as[Airline]
    airlinesDS
      .limit(10)
      .collect()
      .foreach(println)

    The Airline case class itself is defined outside of the main object:

    case class Airline(name: String, country: String)

    When run, this should print:

    Airline(40-Mile Air,United States)
    Airline(Texas Wings,United States)
    Airline(Atifly,United States)
    Airline(Jc royal.britannica,United Kingdom)
    Airline(Locair,United States)
    Airline(SeaPort Airlines,United States)
    Airline(Alaska Central Express,United States)
    Airline(Astraeus,United Kingdom)
    Airline(Air Austral,France)
    Airline(Airlinair,France)

    Accessing the SDK directly

    Most of the time you do not need to access the SDK directly, but it is possible when you need lower-level operations that the connector does not expose.

    To do this, obtain a CouchbaseConnection and pass it a CouchbaseConfig. From there, the Cluster, Bucket, etc. are available.

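    import com.couchbase.spark.config.{CouchbaseConfig, CouchbaseConnection}
    
    // Build the connector configuration from the Spark configuration
    val config = CouchbaseConfig(spark.sparkContext.getConf)
    // Obtain the connection and, from it, the SDK Cluster
    val sdk = CouchbaseConnection()
    val cluster = sdk.cluster(config)
    
    // Run a query through the Scala SDK directly
    val result = cluster.query("select 1=1")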
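
    The Scala SDK reports errors through Try rather than by throwing, so the query result needs to be unwrapped before use. The following is a sketch of one way to do that. It assumes CouchbaseConfig and CouchbaseConnection live in the com.couchbase.spark.config package, and runQuery is a hypothetical helper that expects a session configured as shown earlier.

```scala
import scala.util.{Failure, Success}

import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.config.{CouchbaseConfig, CouchbaseConnection}
import org.apache.spark.sql.SparkSession

object DirectSdkAccess {

  // Hypothetical helper: runs a query through the SDK and prints each row
  def runQuery(spark: SparkSession): Unit = {
    val config = CouchbaseConfig(spark.sparkContext.getConf)
    val cluster = CouchbaseConnection().cluster(config)

    // Both query and rowsAs return a Try, so they are chained with flatMap
    cluster.query("select 1=1").flatMap(_.rowsAs[JsonObject]) match {
      case Success(rows) => rows.foreach(println)
      case Failure(err)  => println(s"Query failed: $err")
    }
  }
}
```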