Spark SQL Integration

    Spark SQL integration depends on N1QL, which is available in Couchbase Server 4.0 and later. To use Spark SQL queries, you need to create and persist DataFrames/Datasets via the Spark SQL DataFrame/Dataset API.

    All examples presented on this page require at least a primary index on the travel-sample data set. If you haven’t done so already, you can create a primary index by executing this N1QL statement: CREATE PRIMARY INDEX ON `travel-sample`.
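
    If you prefer to issue this statement from Spark itself rather than through cbq or the Query Workbench, a minimal sketch might look like the following (this assumes the SparkSession shown in the next section and the RDD-level N1QL support that comes with import com.couchbase.spark._):

    // Create the primary index through the connector's N1QL support
    import com.couchbase.spark._
    import com.couchbase.client.java.query.N1qlQuery

    spark.sparkContext
      .couchbaseQuery(N1qlQuery.simple("CREATE PRIMARY INDEX ON `travel-sample`"))
      .collect() // force execution; the result rows can be ignored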

    DataFrame creation

    Before you can create a DataFrame with Couchbase, you need to create a SparkSession.

    import org.apache.spark.sql.SparkSession

    // Configure Spark
    val spark = SparkSession
      .builder()
      .appName("SparkSQLExample")
      .master("local[*]") // use the local JVM as the master, great for testing
      .config("spark.couchbase.nodes", "127.0.0.1") // connect to Couchbase on localhost
      .config("spark.couchbase.bucket.travel-sample", "") // open the travel-sample bucket with an empty password
      .getOrCreate()

    Also, you need to make sure that the following import is present:

    import com.couchbase.spark.sql._

    The easiest way to create a DataFrame looks like this:

    val dataFrame = spark.read.couchbase()

    While this is the simplest approach, it has a shortcoming: it performs automatic schema inference over the full data set, which is unlikely to produce the right schema (especially if you have a large or diverse data set).

    There are two ways to address this shortcoming: you can either provide a manual schema or narrow down the automatic schema inference with an explicit predicate. The latter approach has the added benefit that the predicate is applied to every query, which improves performance.

    If you want to get automatic schema inference on all airlines, you can specify it like this:

    import org.apache.spark.sql.sources.EqualTo

    val airline = spark.read.couchbase(EqualTo("type", "airline"))

    If you call airline.printSchema(), it will print:

    root
     |-- META_ID: string (nullable = true)
     |-- callsign: string (nullable = true)
     |-- country: string (nullable = true)
     |-- iata: string (nullable = true)
     |-- icao: string (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
     |-- type: string (nullable = true)

    Not only did it automatically infer the schema, it also added a META_ID field which corresponds to the document ID if applicable.
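
    Because META_ID is a regular column in the resulting DataFrame, you can select and filter on it like any other field. As a small sketch building on the airline DataFrame from above:

    // META_ID behaves like any other column, so it can be selected directly
    airline
      .select("META_ID", "name")
      .show(5)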

    A manual schema can also be provided if the automatic inference does not work properly:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    spark.read.couchbase(StructType(
      StructField("name", StringType) ::
      StructField("type", StringType) :: Nil
    ))

    Now that you have a DataFrame, you can apply all the operations that Spark SQL provides. A simple example is to load specific fields from the DataFrame and print some of those records:

    airline
      .select("name", "callsign")
      .sort(airline("callsign").desc)
      .show(10)
    +-------+--------------------+
    |   name|            callsign|
    +-------+--------------------+
    |   EASY|             easyJet|
    |   BABY|             bmibaby|
    |MIDLAND|                 bmi|
    |   null|          Yellowtail|
    |   null|               XOJET|
    |STARWAY|   XL Airways France|
    |   XAIR|            XAIR USA|
    |  WORLD|       World Airways|
    |WESTERN|    Western Airlines|
    |   RUBY|Vision Airlines (V2)|
    +-------+--------------------+

    It is also possible to provide a custom schema together with a predicate, which gives you maximum flexibility in describing your data layout while still optimizing performance when retrieving unstructured data.
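
    One way this might look, using the direct DataFrameReader options that are described in more detail below (the field selection here is just an illustration):

    // Combine an explicit schema with a schema filter / predicate
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val airlinesWithSchema = spark.read
      .format("com.couchbase.spark.sql.DefaultSource")
      .schema(StructType(
        StructField("name", StringType) ::
        StructField("callsign", StringType) :: Nil
      ))
      .option("schemaFilter", "`type` = 'airline'")
      .load()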

    For a more complete example, imagine you have landmarks in the Hadoop Distributed File System (HDFS) and airports in Couchbase. For each airport code, you want to find all landmarks in the same city:

    // Load Landmarks from HDFS
    val landmarks = spark.read.json("hdfs://127.0.0.1:8091/landmarks/*")
    landmarks.createOrReplaceTempView("landmarks")
    
    // Load Airports from Couchbase
    val airports = spark.read.couchbase(schemaFilter = EqualTo("type", "airport"))
    
    // find all landmarks in the same city as the given FAA code
    val toFind = "SFO" // try SFO or LAX
    
    airports
      .join(landmarks, airports("city") === landmarks("city"))
      .select(airports("faa"), landmarks("name"), landmarks("url"))
      .where(airports("faa") === toFind and landmarks("url").isNotNull)
      .orderBy(landmarks("name").asc)
      .show(20)

    Direct DataFrame access

    If you are using DataFrames from Scala, the implicit imports are a great way to simplify your code. If you want to use DataFrames from other languages, direct access is the preferred approach.

    In fact, the implicit imports are mainly just syntactic sugar over the direct access methods.

    The following code shows how to create a DataFrame directly, setting all the options manually:

    val df = spark.read
      // Define the source, required
      .format("com.couchbase.spark.sql.DefaultSource")
    
      // Either set the schema filter for inference from a Spark SQL Filter...
      .option("schemaFilter", N1QLRelation.filterToExpression(EqualTo("type", "airline")))
      // ...or set it directly as a string...
      .option("schemaFilter", "`type` = 'airline'")
      // ...or provide the schema directly
      .schema(StructType(
        StructField("name", StringType):: Nil
      ))
    
      .load()

    You can also provide all kinds of options directly, either to Spark itself or for advanced functionality in the N1QL integration. Currently, the following options are supported (see the example after this list):

    • idField: The name of the document ID field, defaults to "META_ID".

    • bucket: The name of the bucket to use, which is required if more than one bucket is opened.
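
    As a small sketch of how these options might be set on the direct reader (the bucket is the travel-sample bucket used throughout this page, and DOC_ID is an arbitrary name chosen for illustration):

    val airlinesDf = spark.read
      .format("com.couchbase.spark.sql.DefaultSource")
      .option("bucket", "travel-sample") // required if more than one bucket is opened
      .option("idField", "DOC_ID") // rename the document ID column from the default META_ID
      .option("schemaFilter", "`type` = 'airline'")
      .load()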

    DataFrame persistence

    It is also possible to persist DataFrames into Couchbase. The important part is that a META_ID field (or a differently named one, if configured) exists that can be mapped to the unique document ID. All the other fields in the DataFrame are converted into JSON and stored as the document content.

    import spark.implicits._

    // Case class for the example; the first field holds the document ID
    case class Person(uid: String, name: String, age: Int)

    val people = spark.sparkContext.parallelize(Seq(
      Person("user::michael", "Michael", 27),
      Person("user::tom", "Tom", 33)
    )).toDF()
    people.createOrReplaceTempView("people")

    people.write.couchbase(Map("idField" -> "uid"))

    In this example, the DataFrame is persisted into Couchbase and the document ID field is mapped to uid.
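
    Assuming the Person case class sketched above, the stored documents end up with the IDs user::michael and user::tom, and the remaining fields become the JSON content, roughly like this for the first record:

    {"name": "Michael", "age": 27}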

    Working with Datasets

    Spark 1.6 introduced Datasets, and since they are built on top of DataFrames, the Couchbase Spark SQL integration supports them out of the box.

    You can call .as[Target] on your DataFrame to turn it into its typesafe counterpart (most of the time, Target is a case class). Consider the following case class:

    // Airline has subset of the fields that are in the database
    case class Airline(name: String, iata: String, icao: String, country: String)

    Make sure to import the implicits for the SparkSession:

    import spark.implicits._

    You can now create a DataFrame as usual which can be turned into a Dataset:

    val airlines = spark.read.couchbase(schemaFilter = EqualTo("type", "airline")).as[Airline]

    If you want to print all airlines whose name starts with "A", you can access the properties of the case class:

    airlines
      .map(_.name)
      .filter(_.toLowerCase.startsWith("a"))
      .foreach(println)

    For more information on Datasets, please refer to the Spark Dataset Docs.