Getting Started
To get started with the Couchbase Spark connector quickly, learn how to add the connector to your Spark project and run simple queries.
Quickstart
Because most people using Spark are in the Scala ecosystem, the following examples use Scala and its sbt dependency manager.
Create a new sbt project and add the following content to the build.sbt file.
This includes all the Spark dependencies as well as the Couchbase Spark connector, just enough to get you started.
The connector also publishes Scaladoc if you want to explore the full API.
name := "my-first-couchbase-spark-project"
organization := "my.organization"
version := "1.0.0-SNAPSHOT"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.3.0",
"org.apache.spark" %% "spark-streaming" % "2.3.0",
"org.apache.spark" %% "spark-sql" % "2.3.0",
"com.couchbase.client" %% "spark-connector" % "2.3.0"
)
Now, under src/main/scala/, create a Quickstart.scala file with the following skeleton:
object Quickstart {
def main(args: Array[String]): Unit = {
}
}
If you are not familiar with Scala, this is more or less the equivalent of Java's public static void main(String[] args) method.
Because empty methods make the compiler sad, fill it with proper code now.
When it comes to Spark, you always need to set up a configuration and initialize a SparkSession (or, in pre-2.0 versions, a SparkContext directly).
The following snippet does that and also instructs the Couchbase Spark connector to open a bucket in the background.
// Configure Spark
val spark = SparkSession
.builder()
.appName("KeyValueExample")
.master("local[*]") // use the JVM as the master, great for testing
.config("spark.couchbase.nodes", "127.0.0.1") // connect to Couchbase Server on localhost
.config("spark.couchbase.username", "Administrator") // with given credentials
.config("spark.couchbase.password", "password")
.config("spark.couchbase.bucket.travel-sample", "") // open the travel-sample bucket
.getOrCreate()
// The SparkContext for easy access
val sc = spark.sparkContext
By default, the Spark Connector connects to the default
bucket.
However, this example uses actual data from the travel-sample
bucket that ships with Couchbase Server, so the code connects to that bucket.
The Couchbase Spark connector also supports opening more buckets in parallel.
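For example, additional buckets are opened simply by adding more spark.couchbase.bucket.<name> entries to the configuration. The following sketch assumes the beer-sample sample bucket has also been loaded:
// Open two buckets at once; each spark.couchbase.bucket.<name> entry opens one bucket
val spark = SparkSession
  .builder()
  .appName("MultiBucketExample")
  .master("local[*]")
  .config("spark.couchbase.nodes", "127.0.0.1")
  .config("spark.couchbase.username", "Administrator")
  .config("spark.couchbase.password", "password")
  .config("spark.couchbase.bucket.travel-sample", "")
  .config("spark.couchbase.bucket.beer-sample", "") // beer-sample is assumed to be loaded as well
  .getOrCreate()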
Creating a JAR for use with spark-submit
By compiling the application into a JAR file, it can be run with the standard Spark tool spark-submit.
In this case, the Spark dependencies can be declared as provided, as they will be supplied by spark-submit:
name := "my-first-couchbase-spark-project"
organization := "my.organization"
version := "1.0.0-SNAPSHOT"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.3.0" % "provided",
"org.apache.spark" %% "spark-streaming" % "2.3.0" % "provided",
"org.apache.spark" %% "spark-sql" % "2.3.0" % "provided",
"com.couchbase.client" %% "spark-connector" % "2.3.0"
)
spark-submit allows the Spark session’s configuration to be supplied on the command line, so the Quickstart example above can be simplified to:
// Configure Spark
val spark = SparkSession
.builder()
.appName("KeyValueExample")
.getOrCreate()
The next step is to build a “fat JAR” containing all the application’s dependencies so it can easily be run with spark-submit.
One good option is to use the sbt-assembly plugin, as shown below.
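A minimal setup, assuming the sbt-assembly plugin (the plugin version shown here is only an example), is to add it to project/plugins.sbt and then run sbt assembly, which produces the fat JAR under target/scala-2.11/:
// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")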
Finally, the created JAR can be run, supplying the required Couchbase configuration options:
spark-submit --master local[*] \
  --conf spark.couchbase.username=Administrator \
  --conf spark.couchbase.password=password \
  --conf spark.couchbase.bucket.travel-sample="" \
  --class Quickstart target/scala-2.11/quickstart-assembly-0.1.jar
Creating and saving RDDs
Now that the SparkContext
is created, you can perform operations against Couchbase.
A common scenario is to create resilient distributed datasets (RDDs) out of documents stored in Couchbase.
The easiest way is probably to pass in document IDs as strings.
Before getting further into the actual code, make sure to have the following imports in place; the connector import is what makes the implicit methods available, and the document classes come from the underlying Java SDK.
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.spark._
Use the couchbaseGet
method on the SparkContext
to fetch documents from Couchbase and create an RDD.
sc
.couchbaseGet[JsonDocument](Seq("airline_10123", "airline_10748"))
.collect()
.foreach(println)
Make sure to specify which document type you want (in this case, a JsonDocument).
If you forget to specify the document type, an exception is thrown because the connector does not know what format to use for the results.
Then call collect()
to aggregate the results and print them out to the command line.
Spark outputs lots of information, so you’ll find the output somewhere in the logs:
JsonDocument{id='airline_10748', cas=314538279043072, expiry=0, content={"country":"United States","iata":"ZQ","name":"Locair","callsign":"LOCAIR","icao":"LOC","id":10748,"type":"airline"}}
JsonDocument{id='airline_10123', cas=314538278125568, expiry=0, content={"country":"United States","iata":"TQ","name":"Texas Wings","callsign":"TXW","icao":"TXW","id":10123,"type":"airline"}}
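JsonDocument is not the only supported target type; the same call can, for example, return the content as a plain JSON string. The following sketch uses the Java SDK’s RawJsonDocument for that purpose:
import com.couchbase.client.java.document.RawJsonDocument
import com.couchbase.spark._

// Fetch the same documents, but receive each content as a raw JSON string
sc
  .couchbaseGet[RawJsonDocument](Seq("airline_10123", "airline_10748"))
  .map(_.content()) // content() returns the JSON as a String
  .collect()
  .foreach(println)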
But since just loading data is only half the fun, the connector also provides a convenient way to save documents. The following code loads the documents as before, but then modifies their content and IDs before saving them back. You can imagine taking any kind of data source, mapping it to documents, and storing them back in Couchbase.
sc
.couchbaseGet[JsonDocument](Seq("airline_10123", "airline_10748"))
.map(oldDoc => {
val id = "my_" + oldDoc.id()
val content = JsonObject.create().put("name", oldDoc.content().getString("name"))
JsonDocument.create(id, content)
})
.saveToCouchbase()
We utilize the saveToCouchbase() method available on our RDD to store a modified version of our original JsonDocument.
Go find your modified document in the Couchbase Server UI! Look for "my_airline_10123", which will just have the name of the airline as its content.
Congratulations! You’ve successfully performed your first ETL job (extract-transform-load) using Couchbase and Spark. Next up is a whirlwind tour of N1QL and Spark DataFrames.
Accessing a Couchbase Bucket
In the Couchbase Java client, operations are performed against the Couchbase cluster using a Bucket object. Generally developers will not need direct access to this Bucket, as the Spark connector provides its own methods to perform operations against the cluster. But if required, a Bucket can be obtained like this:
val cbConfig = CouchbaseConfig(spark.sparkContext.getConf)
val bucket: Bucket = CouchbaseConnection().bucket(cbConfig, BUCKET_NAME)
The bucket name must have been specified earlier while configuring the SparkSession, e.g. with .config("spark.couchbase.bucket." + BUCKET_NAME, "").
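With the Bucket in hand, the full Java SDK API is available. As a small sketch, a single document can be fetched directly, bypassing Spark entirely:
import com.couchbase.client.java.document.JsonDocument

// A plain key/value lookup through the SDK, outside of any Spark job
val doc: JsonDocument = bucket.get("airline_10123")
println(doc.content())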
Working with DataFrames
DataFrames were introduced in Spark 1.3 and have matured even further in Spark 1.4. The nature of the queries fits very well with what Couchbase N1QL provides.
To try this, you need Couchbase Server version 4.0 or later.
Note that you need at least a primary index created on the travel-sample bucket for the following examples to work.
If you haven’t done so already, run a CREATE PRIMARY INDEX ON `travel-sample` query.
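If you prefer to stay in Scala, the statement can also be issued once through the connector’s N1QL support on the SparkContext; this is just a sketch, and running the statement in the Query Workbench or cbq works equally well:
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._

// Create the primary index on travel-sample (only needed once)
sc
  .couchbaseQuery(N1qlQuery.simple("CREATE PRIMARY INDEX ON `travel-sample`"))
  .collect()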
In older Spark versions you had to create a SQLContext
like this:
val sql = new SQLContext(sc)
But if you are using the SparkSession
you can access most of the methods directly from it.
Note that you can also always get the SQLContext
out of the session via:
val sql = spark.sqlContext
Also, don’t forget the Couchbase imports again for all the automatic method goodness:
import com.couchbase.spark.sql._
Because a DataFrame is like an RDD but with a schema, and Couchbase is at heart a schemaless database, you need a way to either define or infer a schema.
The connector has built-in schema inference, but if you have a large or diverse data set, you need to give it some clues for filtering.
Suppose you want a DataFrame
for all airlines, and you know that the JSON content has a type
field with the value airline
.
You can pass this information to the connector for automatic schema inference:
import org.apache.spark.sql.sources.EqualTo

// Create a DataFrame with schema inference, filtered on documents whose type field equals "airline"
val airlines = sql.read.couchbase(schemaFilter = EqualTo("type", "airline"))
// Print The Schema
airlines.printSchema()
The code automatically infers the schema and prints it in this format:
root
 |-- META_ID: string (nullable = true)
 |-- callsign: string (nullable = true)
 |-- country: string (nullable = true)
 |-- iata: string (nullable = true)
 |-- icao: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
Next you can perform an actual query where you are interested only in the name and callsign.
This example sorts by callsign in descending order and loads only the first 10 rows.
airlines
  .select("name", "callsign")
  .sort(airlines("callsign").desc)
  .show(10)
The code prints the results on the console like this:
+-------+--------------------+
|   name|            callsign|
+-------+--------------------+
|   EASY|             easyJet|
|   BABY|             bmibaby|
|MIDLAND|                 bmi|
|   null|          Yellowtail|
|   null|               XOJET|
|STARWAY|   XL Airways France|
|   XAIR|            XAIR USA|
|  WORLD|       World Airways|
|WESTERN|    Western Airlines|
|   RUBY|Vision Airlines (V2)|
+-------+--------------------+
Working with Datasets
Spark 1.6 introduces Datasets, a typesafe way to work on top of Spark SQL. Since they are built on top of DataFrames, using them with Couchbase is easy.
The following example creates a Dataset out of a DataFrame and maps it to a case class. It then uses the case class to extract fields from the result set in a typesafe way:
// Spark SQL Setup
val spark: SparkSession = .... /* set up your SparkSession as usual */
import spark.implicits._

// A case class matching a subset of the airline fields; define it at top level so Spark can derive an encoder
case class Airline(name: String, iata: String, icao: String, country: String)

val airlines = spark.read.couchbase(schemaFilter = EqualTo("type", "airline")).as[Airline]

// Print schema
airlines.printSchema()

// Print airline names that start with "a"
airlines.map(_.name).filter(_.toLowerCase.startsWith("a")).foreach(println(_))