Working With RDDs

    Spark operates on resilient distributed datasets (RDDs). When you need to extract data out of Couchbase, the Couchbase Spark connector creates RDDs for you. You can create RDDs through key-value lookups, views, or N1QL queries, and you can persist RDDs back to Couchbase.

    Creating RDDs

    To access the implicit methods that the Couchbase Spark connector provides for working with RDDs, add the following import to your code:

    import com.couchbase.spark._

    This import adds Couchbase-specific methods to the SparkContext. Each method name starts with couchbase.
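
    The connector exposes these methods through Scala implicits, so after the import they can be called on SparkContext as if they were defined there. The toy sketch below only illustrates that general language mechanism with an implicit class; Context, Enrichment, ContextOps, and couchbaseDemo are hypothetical names and are not part of the connector or of Spark.

    ```scala
    // Hypothetical stand-in for SparkContext; nothing here comes from the connector.
    class Context(val appName: String)

    object Enrichment {
      // An implicit class adds methods to an existing type while it is in scope,
      // similar in spirit to what `import com.couchbase.spark._` brings in.
      implicit class ContextOps(val ctx: Context) extends AnyVal {
        // Hypothetical method: it only tags each requested ID with the app name.
        def couchbaseDemo(ids: Seq[String]): Seq[String] =
          ids.map(id => s"${ctx.appName}:$id")
      }
    }

    object EnrichmentDemo {
      import Enrichment._
      // Compiles only because ContextOps is in scope via the import above.
      val result: Seq[String] = new Context("demo").couchbaseDemo(Seq("airline_10123"))
    }
    ```

    Without the import of Enrichment._, the call to couchbaseDemo would not compile, which is why the connector's import is required before the couchbase methods become available.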

    The code examples here use these imports:

    import com.couchbase.spark._
    import org.apache.spark.SparkConf
    import com.couchbase.client.java.document.JsonDocument
    import com.couchbase.client.java.document.json.{JsonArray, JsonObject}
    import com.couchbase.client.java.query.N1qlQuery
    import com.couchbase.client.java.view.ViewQuery
    import com.couchbase.client.java.analytics.AnalyticsQuery
    import org.apache.spark.sql.SparkSession

    If you want to create an RDD for specific documents stored in a bucket, you can specify the IDs directly. The connector fetches them from the server and turns them into an RDD:

    sc.couchbaseGet[JsonDocument](Seq("airline_10123", "airline_10748"))
      .collect()
      .foreach(println)

    Because Spark logs a lot of information, you may have to look through the logs to find the output:

    JsonDocument{id='airline_10748', cas=314538279043072, expiry=0, content={"country":"United States","iata":"ZQ","name":"Locair","callsign":"LOCAIR","icao":"LOC","id":10748,"type":"airline"}}
    JsonDocument{id='airline_10123', cas=314538278125568, expiry=0, content={"country":"United States","iata":"TQ","name":"Texas Wings","callsign":"TXW","icao":"TXW","id":10123,"type":"airline"}}

    It is critical that you specify the target document type; otherwise, the client does not know how to convert the stored data. This is because Couchbase has first-class JSON support but can also store any other data. You can even store serialized objects or protobuf-encoded documents, but then you lose the secondary indexing capabilities.

    If you are unsure what to pick, stick with JsonDocument. If you need raw access to the JSON data, use RawJsonDocument instead.
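
    To see why the target type matters, consider a store that only holds bytes: the caller's type argument is what selects how those bytes are decoded. The sketch below models this with a Scala type class. Decoder, InMemoryBucket, and get are hypothetical; they stand in for, rather than reproduce, how the connector converts data into JsonDocument or RawJsonDocument.

    ```scala
    // Hypothetical type class describing how to decode a stored value's raw bytes.
    trait Decoder[T] {
      def decode(bytes: Array[Byte]): T
    }

    object Decoder {
      // Decode the bytes as UTF-8 text, e.g. JSON kept as an unparsed string.
      implicit val utf8Text: Decoder[String] = new Decoder[String] {
        def decode(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
      }
      // Keep the bytes as opaque binary data, e.g. a serialized object.
      implicit val binary: Decoder[Vector[Byte]] = new Decoder[Vector[Byte]] {
        def decode(bytes: Array[Byte]): Vector[Byte] = bytes.toVector
      }
    }

    object InMemoryBucket {
      // Hypothetical bucket contents: the store itself only holds bytes.
      private val data: Map[String, Array[Byte]] =
        Map("airline_10123" -> """{"name":"Texas Wings"}""".getBytes("UTF-8"))

      // The type argument picks the decoder, much like the target document type
      // in sc.couchbaseGet[JsonDocument](...) tells the connector how to convert.
      def get[T](id: String)(implicit decoder: Decoder[T]): Option[T] =
        data.get(id).map(decoder.decode)
    }
    ```

    Requesting a type that has no decoder, such as InMemoryBucket.get[Int](...), fails to compile in this sketch, which is the compile-time counterpart of the client not knowing how to convert the data.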

    You can also perform a view query (or a spatial view query) to extract rows and turn them into an RDD. Given the following map function, defined as the view by_name in the design document airlines on the travel-sample bucket:

    function(doc, meta) {
    	if (doc.type == "airline") {
    		emit(doc.name, null);
    	}
    }
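
    Conceptually, a view index holds the rows emitted by the map function, sorted by key, and limit(10) returns the first ten. The following sketch simulates that in memory for the map function above. Doc, mapFn, and query are hypothetical, and the case-insensitive sort is only an approximation of the collation Couchbase applies to view keys.

    ```scala
    object ViewSketch {
      // Hypothetical minimal document: ID, type field, and name field.
      final case class Doc(id: String, docType: String, name: String)

      // Mirrors the map function: emit(doc.name, ...) for airline documents.
      // Each emitted row keeps the document ID, as view rows do.
      def mapFn(doc: Doc): Seq[(String, String)] =
        if (doc.docType == "airline") Seq(doc.name -> doc.id) else Seq.empty

      // Rows are ordered by key, then limit(n) keeps the first n rows.
      // Case-insensitive ordering only approximates the real view collation.
      def query(docs: Seq[Doc], limit: Int): Seq[(String, String)] =
        docs.flatMap(mapFn).sortBy(_._1.toLowerCase).take(limit)
    }
    ```

    Documents whose type is not airline never reach the index, so they can never appear in the query result regardless of the limit.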

    Creating an RDD for the first 10 results works like this:

    sc.couchbaseView(ViewQuery.from("airlines", "by_name").limit(10))
      .collect()
      .foreach(println)

    This prints the following:

    CouchbaseViewRow(airline_10,40-Mile Air,null)
    CouchbaseViewRow(airline_792,Access Air,null)
    CouchbaseViewRow(airline_665,AD Aviation,null)
    CouchbaseViewRow(airline_21,Aigle Azur,null)
    CouchbaseViewRow(airline_1191,Air Austral,null)
    CouchbaseViewRow(airline_139,Air Caledonie International,null)
    CouchbaseViewRow(airline_567,Air Carabes,null)
    CouchbaseViewRow(airline_149,Air Cargo Carriers,null)
    CouchbaseViewRow(airline_16881,Air Cudlua,null)
    CouchbaseViewRow(airline_882,Air Florida,null)

    If you want to load the full content for each document, you can mix both approaches:

    sc.couchbaseView(ViewQuery.from("airlines", "by_name").limit(10))
      .map(_.id)
      .couchbaseGet[JsonDocument]()
      .collect()
      .foreach(println)

    Here you can see that couchbaseGet is available not only on the context but also on every RDD[String]. Neat, right? The same approach also works for spatial views; just use the couchbaseSpatialView() method instead.
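
    The pattern above has two steps: the view returns lightweight rows, and only their document IDs are used for key-value lookups of the full documents. Here is a minimal in-memory sketch of that flow, assuming view rows are (key, documentId) pairs. TwoStepLookup, bucket, and fetchAll are hypothetical, and dropping IDs that have no matching document is this sketch's own choice, not documented connector behavior.

    ```scala
    object TwoStepLookup {
      // Hypothetical bucket contents keyed by document ID.
      val bucket: Map[String, String] = Map(
        "airline_10" -> """{"name":"40-Mile Air"}""",
        "airline_792" -> """{"name":"Access Air"}"""
      )

      // Step 1: keep only the document ID from each (key, documentId) row,
      // like .map(_.id) on the view result.
      // Step 2: resolve each ID with a key-value lookup; unknown IDs are dropped.
      def fetchAll(rows: Seq[(String, String)]): Seq[(String, String)] =
        rows.map(_._2).flatMap(id => bucket.get(id).map(doc => id -> doc))
    }
    ```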

    You can use N1QL to run efficient queries against the JSON data in a Couchbase bucket. The following query is very similar to the earlier view query, so you can compare the two approaches.

    You need at least a primary index on the travel-sample bucket for the following examples to work. If you haven’t created one yet, run the query CREATE PRIMARY INDEX ON `travel-sample`.

    val query = "SELECT name FROM `travel-sample` WHERE type = 'airline' ORDER BY name ASC LIMIT 10"
    sc.couchbaseQuery(N1qlQuery.simple(query))
      .collect()
      .foreach(println)

    If you want to fetch the full document contents based on some criteria, here is a slightly more complex example that selects only the document IDs and then fetches each document by ID:

    val query = "SELECT META(`travel-sample`).id as id FROM `travel-sample` WHERE type = 'airline' ORDER BY name ASC LIMIT 10"
    
    sc.couchbaseQuery(N1qlQuery.simple(query))
      .map(_.value.getString("id"))
      .couchbaseGet[JsonDocument]()
      .collect()
      .foreach(println)

    While this gives you the most flexibility with querying, we recommend using the higher-level Spark SQL components through the DataFrame API. See the Spark SQL section for more information.

    Finally, if you’re running Couchbase Analytics (available in Couchbase Server 6.0.0 and above), you can query it like this:

    sc.couchbaseAnalytics(AnalyticsQuery.simple("""SELECT "Hello, world!" AS greeting"""))
      .collect()
      .foreach(println(_))

    Persisting RDDs

    Creating an RDD is only half the story. After you’ve done your aggregation, filtering, and machine learning, you normally want to persist the results somewhere. Couchbase provides the saveToCouchbase() method on every RDD[Document].

    The following example extracts all airline names through N1QL, groups them by country, and stores each country's list as a separate document. Since RDDs are source-agnostic, the same approach can be used, for example, to load data out of HDFS and then store the results in Couchbase.

    val query = "SELECT name, country FROM `travel-sample` WHERE type = 'airline' ORDER BY name"
    sc
      .couchbaseQuery(N1qlQuery.simple(query))
      // Group the query rows by the airline's country
      .groupBy(_.value.getString("country"))
      .map { case (country, rows) =>
        // Collect the airline names for this country into a JSON array
        val airports = JsonArray.create()
        rows.map(_.value.getString("name")).foreach(name => airports.add(name))
        val content = JsonObject.create().put("airports", airports)
        // One document per country, e.g. "airports::United States"
        JsonDocument.create("airports::" + country, content)
      }
      .saveToCouchbase()

    This creates one document per country, listing the names of that country's airlines under the airports field. For example, the document with the ID airports::United States contains:

    {
      "airports": [
        "40-Mile Air",
        "ATA Airlines",
        "Access Air",
        "Air Cargo Carriers",
        "Air Florida",
        ...
        "United States Air Force",
        "Usa Sky Cargo",
        "Virgin America",
        "Vision Airlines (V2)",
        "Western Airlines",
        "World Airways",
        "XAIR USA",
        "XOJET",
        "Yellowtail"
      ]
    }
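
    The groupBy and map steps of the Spark example can be traced with plain Scala collections. This sketch assumes rows that carry only a name and a country. Row, GroupByCountry, and toDocuments are hypothetical, and the result maps each document ID to its list of names instead of building JsonDocument objects.

    ```scala
    object GroupByCountry {
      // Hypothetical query row: the two fields selected by the N1QL query.
      final case class Row(name: String, country: String)

      // Groups airline names by country and builds one (documentId -> names)
      // entry per country, mirroring the groupBy and map steps above.
      def toDocuments(rows: Seq[Row]): Map[String, Seq[String]] =
        rows.groupBy(_.country).map { case (country, rs) =>
          ("airports::" + country) -> rs.map(_.name)
        }
    }
    ```

    One difference: Scala's groupBy on a local Seq keeps the input order within each group, while Spark's RDD groupBy shuffles data and does not guarantee the ordering of elements within each group, so the query's ORDER BY is not guaranteed to survive inside each stored list.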

    By default, saveToCouchbase() uses StoreMode.UPSERT. Other modes are available, including insert and replace variants that either ignore or fail on errors.
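
    The difference between these modes comes down to what happens when the document ID already exists or is missing. The sketch below models the usual Couchbase key-value semantics on an in-memory map: upsert always writes, insert fails if the ID exists, and replace fails if it does not; the ignoring variants leave the store unchanged instead of failing. StoreModes, Mode, write, and writeIgnoring are hypothetical and do not mirror the connector's StoreMode enum.

    ```scala
    object StoreModes {
      // Hypothetical write modes; not the connector's StoreMode values.
      sealed trait Mode
      case object Upsert extends Mode  // always writes
      case object Insert extends Mode  // fails if the ID already exists
      case object Replace extends Mode // fails if the ID does not exist

      // Returns the updated store, or Left(error) when the mode's precondition fails.
      def write(store: Map[String, String], id: String, value: String,
                mode: Mode): Either[String, Map[String, String]] =
        mode match {
          case Upsert                         => Right(store.updated(id, value))
          case Insert if store.contains(id)   => Left(s"document $id already exists")
          case Insert                         => Right(store.updated(id, value))
          case Replace if !store.contains(id) => Left(s"document $id does not exist")
          case Replace                        => Right(store.updated(id, value))
        }

      // "Ignore" variants keep the store unchanged instead of surfacing the error.
      def writeIgnoring(store: Map[String, String], id: String, value: String,
                        mode: Mode): Map[String, String] =
        write(store, id, value, mode).getOrElse(store)
    }
    ```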

    We are also working on better support for automatic conversions from different types so that you don’t have to map to a specific document type manually.

    Parallelism

    On each active executor, saveToCouchbase() writes the documents to Couchbase in batches. The default batch size is 128, meaning that up to 128 documents are written concurrently per executor; you can change this with the maxConcurrent parameter.
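
    Here is a simplified model of this batching, written with Scala futures: documents are split into groups of maxConcurrent, each group is written concurrently, and the next group starts once the current one completes. BatchedWrites and writeAll are hypothetical, and the connector's actual scheduling inside an executor may differ (for example, it could keep a sliding window of in-flight writes rather than fixed batches).

    ```scala
    import java.util.concurrent.atomic.AtomicInteger
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    object BatchedWrites {
      // Writes docs in batches of maxConcurrent: writes within a batch run
      // concurrently, and the next batch starts only after the current one ends.
      // Returns the highest number of writes observed in flight at once.
      def writeAll(docs: Seq[String], maxConcurrent: Int)(write: String => Unit): Int = {
        require(maxConcurrent > 0, "maxConcurrent must be positive")
        val inFlight = new AtomicInteger(0)
        val peak = new AtomicInteger(0)
        docs.grouped(maxConcurrent).foreach { batch =>
          val futures = batch.map { doc =>
            Future {
              val now = inFlight.incrementAndGet()
              peak.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
              try write(doc) finally inFlight.decrementAndGet()
            }
          }
          // Wait for the whole batch before starting the next one.
          Await.result(Future.sequence(futures), 30.seconds)
        }
        peak.get()
      }
    }
    ```

    Lowering maxConcurrent in this model directly lowers the peak number of simultaneous writes, which is why reducing the batch size is the first suggestion below for timeouts.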

    If you see timeouts or other errors while writing, try a) reducing the batch size and b) adjusting the retry settings. You can adjust the retry settings like this:

    val conf = new SparkConf()
      // ... skipping some settings
      // Maximum number of times to retry on error.  Here we're optimising for resilience over speed.
      .set("com.couchbase.maxRetries", "10")
      // The delay will increase exponentially from the minimum to the maximum, in milliseconds.
      .set("com.couchbase.maxRetryDelay", "5000")
      .set("com.couchbase.minRetryDelay", "1000")
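
    To get a feel for how these settings interact, the sketch below computes a retry delay schedule. It assumes the delay starts at minRetryDelay and doubles on each retry until it reaches maxRetryDelay. The text only says the growth is exponential, so the doubling factor is an assumption, and RetrySchedule and delays are hypothetical names.

    ```scala
    object RetrySchedule {
      // Delay in milliseconds before each retry, assuming the delay starts at
      // minDelay, doubles after every retry, and is capped at maxDelay.
      def delays(maxRetries: Int, minDelay: Long, maxDelay: Long): List[Long] =
        Iterator
          .iterate(minDelay)(d => math.min(d * 2, maxDelay))
          .take(maxRetries)
          .toList
    }
    ```

    With the values above (10 retries, 1000 ms minimum, 5000 ms maximum), this schedule is 1000, 2000, 4000, and then 5000 ms for the remaining seven retries, so a write that exhausts every retry would spend about 42 seconds waiting under this assumption.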

    Subdocument API

    If you are running Couchbase Server 4.5 or later, you can use the subdocument (“subdoc”) API to fetch only a subset of the fields from a document. This reduces network overhead and the amount of data you move between operations.

    The couchbaseSubdocLookup method is available on both the SparkContext and RDDs, and accepts two or three arguments depending on the context (on an RDD, the IDs come from the RDD itself, so ids is omitted):

    • ids: The list of IDs to fetch the fragments from.

    • get: The paths of the JSON document that should be returned.

    • exists: The paths of the JSON document that should be checked for existence.
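
    Conceptually, a subdocument lookup evaluates each path on the server and returns only the requested fragments, plus an existence flag for each exists path. The following in-memory sketch models that on a nested Map, with dotted paths such as geo.lat for nested fields. SubdocSketch, LookupResult, resolve, and lookup are hypothetical, and the result shape is only loosely modelled on the SubdocLookupResult output in the example that follows (it omits the CAS value).

    ```scala
    object SubdocSketch {
      // Hypothetical result shape: requested fragments plus existence flags.
      final case class LookupResult(id: String, content: Map[String, Any], exists: Map[String, Boolean])

      // Resolves a dotted path such as "geo.lat" by walking nested maps.
      def resolve(doc: Map[String, Any], path: String): Option[Any] =
        path.split('.').foldLeft(Option[Any](doc)) {
          case (Some(m: Map[_, _]), key) => m.asInstanceOf[Map[String, Any]].get(key)
          case _                         => None
        }

      // Returns only the fields listed in `get`, plus a flag per `exists` path,
      // so the rest of the document never has to be transferred.
      def lookup(id: String, doc: Map[String, Any], get: Seq[String], exists: Seq[String]): LookupResult =
        LookupResult(
          id,
          get.flatMap(p => resolve(doc, p).map(p -> _)).toMap,
          exists.map(p => p -> resolve(doc, p).isDefined).toMap
        )
    }
    ```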

    The following example fetches only two fields from an airline and checks if the foobar field exists in the document:

    sc.parallelize(Seq("airline_10123"))
      .couchbaseSubdocLookup(get = Seq("name", "iata"), exists = Seq("foobar"))
      .collect()
      .foreach(println)

    This prints:

    SubdocLookupResult(airline_10123,0,Map(name -> Texas Wings, iata -> TQ),Map(foobar -> false))

    You can then extract the information from the SubdocLookupResult and use it in your RDD flow. Note that the Couchbase SDK also supports subdocument mutations, which will be added to the Spark connector in a future release.