
Java API

    In addition to the primary Scala API, the connector provides convenience APIs when accessed from Java.

    Couchbase from the SparkContext

    To use the Java API in Spark, you need to initialize a JavaSparkContext:

    SparkConf conf = new SparkConf()
        .setAppName("javaSample")
        .setMaster("local[*]") // use the JVM as the master, great for testing
        .set("com.couchbase.bucket.travel-sample", ""); // open the travel-sample bucket with an empty password
    
    JavaSparkContext sc = new JavaSparkContext(conf);

    Since Spark 2.0, you can also use the SparkSession:

    SparkSession spark = SparkSession
        .builder()
        .appName("JavaExample")
        .master("local[*]") // use the JVM as the master, great for testing
        .config("spark.couchbase.nodes", "127.0.0.1") // connect to Couchbase on localhost
        .config("spark.couchbase.bucket.travel-sample", "") // open the travel-sample bucket with an empty password
        .getOrCreate();
    
    // The Java wrapper around the SparkContext
    JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

    Since Java doesn’t support Scala’s implicit conversions and imports, the connector provides a helper class to achieve similar functionality:

    // The Couchbase-Enabled spark context
    CouchbaseSparkContext csc = couchbaseContext(sc);

    The couchbaseContext method is used through a static import. In general, you want to statically import the following:

    import static com.couchbase.spark.japi.CouchbaseDocumentRDD.couchbaseDocumentRDD;
    import static com.couchbase.spark.japi.CouchbaseSparkContext.couchbaseContext;

    Now you can create RDDs through Key/Value, Views or N1QL:

    // Load docs through K/V
    List<JsonDocument> docs = csc
        .couchbaseGet(Arrays.asList("airline_10226", "airline_10748"))
        .collect();
    
    System.out.println(docs);
    
    // Perform a N1QL query
    List<CouchbaseQueryRow> results = csc
        .couchbaseQuery(N1qlQuery.simple("SELECT * FROM `travel-sample` LIMIT 10"))
        .collect();
    
    System.out.println(results);

    Mapping RDDs to Couchbase APIs

    An RDD can be wrapped with the couchbaseRDD static method to expose all of the Couchbase-specific functions. So instead of fetching the documents directly from the SparkContext, it can also be done like this:

    import static com.couchbase.spark.japi.CouchbaseRDD.couchbaseRDD;
    //...
    JavaRDD<String> ids = sc.parallelize(Arrays.asList("airline_10226", "airline_10748"));
    docs = couchbaseRDD(ids).couchbaseGet().collect();
    System.out.println(docs);

    The CouchbaseRDD exposes the following methods (a short sketch of the subdocument and view variants is shown after the list):

    • couchbaseGet: Fetch documents via their unique Document ID.

    • couchbaseSubdocLookup: Fetch fragments of a document.

    • couchbaseView: Query a Couchbase View.

    • couchbaseSpatialView: Query a Couchbase Spatial View.

    • couchbaseQuery: Perform a N1QL Query.
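
    For completeness, here is a minimal sketch of the subdocument and view variants. It assumes the Java signatures mirror the Scala API (a list of document IDs plus the paths to fetch for subdocument lookups), and the beer/brewery_beers view name is purely illustrative — substitute a view that actually exists on your bucket:

    // Fetch only two fields instead of the full document bodies
    List<SubdocLookupResult> fragments = csc
        .couchbaseSubdocLookup(Arrays.asList("airline_10226", "airline_10748"), "name", "iata")
        .collect();
    
    System.out.println(fragments);
    
    // Query the (hypothetical) beer/brewery_beers view, limited to 10 rows
    List<CouchbaseViewRow> rows = csc
        .couchbaseView(ViewQuery.from("beer", "brewery_beers").limit(10))
        .collect();
    
    System.out.println(rows);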

    Using SparkSQL with Couchbase from Java

    Using SparkSQL from Java is possible because the Java API provides wrappers for both the DataFrameReader and DataFrameWriter APIs. All you need to do is wrap the instances returned by Spark, as in the following example, to get access to all Couchbase-specific methods:

    import static com.couchbase.spark.japi.CouchbaseDataFrameReader.couchbaseReader;
    //...
    
    // Use SparkSQL from Java
    SQLContext sql = new SQLContext(sc);
    
    // Wrap the Reader and create the DataFrame from Couchbase
    Dataset<Row> airlines = couchbaseReader(sql.read()).couchbase(new EqualTo("type", "airline"));
    
    // Print the number of airlines
    System.out.println("Number of Airlines: " + airlines.count());

    Using Datasets with Couchbase

    Since Datasets work with actual Java objects, first create one:

    import java.io.Serializable;
    
    public class Airport implements Serializable {
        private String name;
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }

    Next, you can convert a DataFrame to a Dataset through the .as() API (available since Spark 1.6):

    Dataset<Airport> airports = couchbaseReader(sql.read())
        .couchbase(new EqualTo("type", "airport"))
        .select(new Column("airportname").as("name"))
        .as(Encoders.bean(Airport.class));
    
    List<Airport> allAirports = airports.collectAsList();
    System.out.println(allAirports.size());

    Writing to Couchbase

    If you want to store documents in Couchbase, use the couchbaseDocumentRDD method:

    // Create one JsonDocument and store it in Couchbase
    couchbaseDocumentRDD(
        sc.parallelize(Arrays.asList(JsonDocument.create("doc1", JsonObject.empty())))
    ).saveToCouchbase();
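
    By default, documents are upserted. If the Java API mirrors the Scala one, you can pass a StoreMode to change that behavior; the following sketch assumes saveToCouchbase accepts the connector's StoreMode enum:

    import com.couchbase.spark.StoreMode;
    //...
    
    // Fail (instead of overwriting) if "doc1" already exists
    couchbaseDocumentRDD(
        sc.parallelize(Arrays.asList(JsonDocument.create("doc1", JsonObject.empty())))
    ).saveToCouchbase(StoreMode.INSERT_AND_FAIL);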