Quickstart

    This section shows how to add the connector to your application.

    Download

    The connector is shipped as a zip archive which you can download from the Release Notes page.

    If you prefer to build the connector from source:

    1. Clone the GitHub repository.

    2. Run mvn package to generate the connector archive.

    3. Look for kafka-connect-couchbase-<version>.zip in the target directory.
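
    Putting those steps together, a minimal shell session might look like this (the repository URL is an assumption; confirm it against the Release Notes page):

    # Assumed repository location; verify before cloning.
    git clone https://github.com/couchbase/kafka-connect-couchbase.git
    cd kafka-connect-couchbase
    mvn package
    ls target/kafka-connect-couchbase-*.zip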

    Getting Started

    However you obtained the connector archive, go ahead and unzip it now. The result should be a directory called kafka-connect-couchbase-<version>. The rest of this guide will refer to this directory as $KAFKA_CONNECT_COUCHBASE_HOME.
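
    For example:

    unzip kafka-connect-couchbase-<version>.zip
    export KAFKA_CONNECT_COUCHBASE_HOME=$PWD/kafka-connect-couchbase-<version>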

    This guide assumes you have already installed Couchbase Server locally and have loaded the sample bucket called travel-sample. (It’s fine if you want to use a different bucket; neither the connector nor this guide depend on the contents of the documents in the bucket.)

    You’ll also need a local installation of Apache Kafka or Confluent Platform Kafka.

    Set Up Kafka

    If you already have an installation of Kafka and know how to start the servers, feel free to skip this section.

    Still reading? Don’t worry, setting up a basic installation is pretty easy. Download either Apache Kafka or Confluent Platform Kafka. For simplicity, this guide assumes you’re installing from a ZIP or TAR archive, so steer clear of the deb/rpm packages for now.

    Decompress the Apache Kafka or Confluent Platform archive and move the resulting directory under ~/opt (or wherever you like to keep this kind of software). The rest of this guide refers to the root of the installation directory as $KAFKA_HOME or $CONFLUENT_HOME. Be aware that some config files are in different relative locations depending on whether you’re using Apache Kafka or Confluent Platform Kafka.

    Make sure the Kafka command line tools are in your path:

    export PATH=<path-to-apache-kafka-or-confluent>/bin:$PATH

    Start the Kafka Servers

    If you’re using Confluent Platform Kafka, start the servers by running these commands, each in a separate terminal:

    zookeeper-server-start $CONFLUENT_HOME/etc/kafka/zookeeper.properties
    kafka-server-start $CONFLUENT_HOME/etc/kafka/server.properties
    schema-registry-start $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties

    Confluent 3.3.0 introduced a CLI tool that lets you start all the servers with a single command. It also provides a more sophisticated way to manage connectors than the technique described in this guide. See the Confluent CLI documentation for details.
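
    With the CLI installed, you can bring up ZooKeeper, Kafka, and the Schema Registry together; the exact invocation below is an assumption based on the Confluent 3.3.0 CLI:

    confluent start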

    If you’re not using Confluent, the commands are slightly different, but the idea is the same. Start the servers by running these commands, each in a separate terminal:

    zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
    kafka-server-start.sh $KAFKA_HOME/config/server.properties
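
    To sanity-check that the broker is up, you can list the topics; this assumes the default ports from the sample config files:

    kafka-topics.sh --zookeeper localhost:2181 --list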

    Couchbase Source Connector

    The source connector listens for changes to Couchbase documents and publishes them to a Kafka topic.

    If you’re more interested in saving Kafka messages to Couchbase, skip ahead to the Couchbase Sink Connector section.

    Configure the Source Connector

    The Couchbase connector distribution includes sample config files. Look inside $KAFKA_CONNECT_COUCHBASE_HOME/config and edit the quickstart-couchbase-source.properties file.

    Take a moment to peruse the configuration options specified here. Some are standard options available to all connectors. The rest are specific to the Couchbase connector.

    For this exercise, change the value of connection.bucket to travel-sample (or whichever bucket you want to stream from). For connection.username and connection.password, supply the credentials of a Couchbase user who has the "Data DCP Reader" role for the bucket. If you have not yet created such a user, now is a good time to read about Creating and Managing Users with the UI.

    For Couchbase Server versions prior to 5.0, leave the username blank. Set the password property to the bucket password, or leave it blank if the bucket does not have a password. The sample buckets do not have passwords.
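
    After editing, the relevant lines of quickstart-couchbase-source.properties might look something like this (the username and password values here are placeholders):

    connection.bucket=travel-sample
    connection.username=my_dcp_user
    connection.password=secret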

    Run the Source Connector

    Kafka connectors can be run in standalone or distributed mode. For now let’s run the connector in standalone mode, using the CLASSPATH environment variable to include the Couchbase connector JAR in the class path.

    For Confluent Platform Kafka:

    cd $KAFKA_CONNECT_COUCHBASE_HOME
    env CLASSPATH=./* \
        connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties \
                           config/quickstart-couchbase-source.properties

    Or for Apache Kafka:

    cd $KAFKA_CONNECT_COUCHBASE_HOME
    env CLASSPATH=./* \
        connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties \
                           config/quickstart-couchbase-source.properties

    Alternatively, Run the Connector with Class Loader Isolation

    Apache Kafka version 0.11.0 (and Confluent Platform 3.3.0) introduced a mechanism for plugin class path isolation. To take advantage of this feature, edit the connect worker config file (the connect-*.properties file in the above commands). Modify the plugin.path property to include the parent directory of kafka-connect-couchbase-<version>.jar.

    Run the connector using the same commands as above, but omitting the env CLASSPATH=./* prefix. Each Kafka Connect plugin will use a separate class loader, removing the possibility of dependency conflicts.
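
    As an illustration, if kafka-connect-couchbase-<version>.jar lives under /opt/kafka-connect-couchbase-<version>, the worker config would contain a line like the following (the path is hypothetical; use the actual parent directory of the JAR):

    plugin.path=/opt/kafka-connect-couchbase-<version>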

    Observe Messages Published by Couchbase Source Connector

    The sample config file tells the source connector to publish to a topic called test-default. Let’s use the Kafka command-line tools to spy on the contents of the topic.

    For Confluent Platform Kafka:

    kafka-avro-console-consumer --bootstrap-server localhost:9092 \
                                --topic test-default --from-beginning

    When a topic contains JSON messages, Confluent users should view the messages by running kafka-console-consumer instead of kafka-avro-console-consumer.

    Or for Apache Kafka:

    kafka-console-consumer.sh --bootstrap-server localhost:9092 \
                              --topic test-default --from-beginning

    The expected output is a stream of Couchbase event notification messages, at least one for each document in the bucket. The messages include document metadata as well as content. The document content is transferred as a byte array (encoded as Base64 if the connector is configured to use JSON for message values).

    Each message has an event field whose value indicates the type of change represented by the message. The possible values are:

    • mutation: A change to document content, including creation and changes made via subdocument commands.

    • deletion: Removal or expiration of the document.

    • expiration: Reserved for document expiration (Couchbase Server does not currently send this event type, but may in future versions).

    Once the consumer catches up to the current state of the bucket, try adding a new document to the bucket via the Couchbase Web Console. The consumer will print a notification of type mutation. Now delete the document and watch for an event of type deletion.

    Perhaps it goes without saying, but all of the offset management and fault tolerance features of Kafka Connect work with the Couchbase connector. You can kill and restart the processes and they will pick up where they left off.

    The shape of the message payload is controlled by the dcp.message.converter.class property of the connector config. By default it is set to com.couchbase.connect.kafka.converter.SchemaConverter, which formats each notification into a structure that holds document metadata and contents. For reference, the Avro schema for this payload format is shown below:

    {
      "type": "record",
      "name": "DcpMessage",
      "namespace": "com.couchbase",
      "fields": [
        {
          "name": "event",
          "type": "string"
        },
        {
          "name": "partition",
          "type": {
            "type": "int",
            "connect.type": "int16"
          }
        },
        {
          "name": "key",
          "type": "string"
        },
        {
          "name": "cas",
          "type": "long"
        },
        {
          "name": "bySeqno",
          "type": "long"
        },
        {
          "name": "revSeqno",
          "type": "long"
        },
        {
          "name": "expiration",
          "type": [
            "null",
            "int"
          ]
        },
        {
          "name": "flags",
          "type": [
            "null",
            "int"
          ]
        },
        {
          "name": "lockTime",
          "type": [
            "null",
            "int"
          ]
        },
        {
          "name": "content",
          "type": [
            "null",
            "bytes"
          ]
        }
      ],
      "connect.name": "com.couchbase.DcpMessage"
    }
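
    For illustration only, a mutation event for the travel-sample document airline_10 might be rendered by the Avro console consumer roughly like this (the numeric values are invented, the content is shortened, and optional fields appear in Avro's JSON union encoding):

    {
      "event": "mutation",
      "partition": 5,
      "key": "airline_10",
      "cas": 1504930368479690752,
      "bySeqno": 1,
      "revSeqno": 1,
      "expiration": {"int": 0},
      "flags": {"int": 33554438},
      "lockTime": {"int": 0},
      "content": {"bytes": "{\"id\":10,\"type\":\"airline\"}"}
    }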

    Publishing Raw JSON Messages

    The information in this section applies to connector versions 3.4.3 and later.

    You can configure the connector to publish schemaless JSON messages identical to the JSON documents from Couchbase. To enable this behavior, set the dcp.message.converter.class and value.converter source connector configuration properties like so:

    dcp.message.converter.class=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

    When a Couchbase document is deleted, RawJsonSourceHandler sends a Kafka message with a null value. If instead you wish to ignore deletion events, filter them out with the DropIfNullValue transform:

    transforms=ignoreDeletes
    transforms.ignoreDeletes.type=com.couchbase.connect.kafka.transform.DropIfNullValue

    As a performance optimization, RawJsonSourceHandler and its cousin RawJsonWithMetadataSourceHandler create Kafka Connect records whose values are byte arrays. If you wish to use these handlers together with transforms that modify document content, the record value must be converted from a byte array to a compatible format. To do this, include the DeserializeJson transform as the first in the chain and set value.converter to JsonConverter instead of ByteArrayConverter like so:

    dcp.message.converter.class=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
    
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    
    transforms=deserializeJson,someOtherTransform
    transforms.deserializeJson.type=com.couchbase.connect.kafka.transform.DeserializeJson
    transforms.someOtherTransform.type=...

    Couchbase Sink Connector

    Now let’s talk about the sink connector, which reads messages from one or more Kafka topics and writes them to Couchbase Server.

    The sink connector will attempt to convert message values to JSON. If the conversion fails, the connector will fall back to treating the value as a String BLOB.

    If the Kafka key is a primitive type, the connector will use it as the document ID. If the Kafka key is absent or of complex type (array or struct), the document ID will be generated as topic/partition/offset.

    Alternatively, the document ID can come from the body of the Kafka message. Provide a couchbase.document.id property whose value is a JSON Pointer identifying the document ID node. If you want the connector to remove this node before persisting the document to Couchbase, provide a couchbase.remove.document.id property with value true. If the connector fails to locate the document ID node, it will fall back to using the Kafka key or topic/partition/offset as described above.
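
    For example, given a hypothetical message body of {"airport": "SFO", "name": "San Francisco International"}, these sink settings would use "SFO" as the document ID and remove the airport field before persisting the document:

    couchbase.document.id=/airport
    couchbase.remove.document.id=true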

    As of version 3.2.2, if the Kafka message body is null, the sink connector will delete the Couchbase document whose ID matches the Kafka message key.

    Configure and Run the Sink Connector

    In the $KAFKA_CONNECT_COUCHBASE_HOME/config directory there is a file called quickstart-couchbase-sink.properties. Customize this file as described in Configure the Source Connector, except that this time the bucket will receive messages, so the user must have write access to the bucket.

    Note: Make sure to specify an existing bucket, otherwise the sink connector will fail. You may wish to create a new bucket to receive the messages.

    To run the sink connector, use the same command as described in Run the Source Connector, but pass quickstart-couchbase-sink.properties as the second argument to connect-standalone instead of quickstart-couchbase-source.properties.
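
    For example, with Confluent Platform Kafka:

    cd $KAFKA_CONNECT_COUCHBASE_HOME
    env CLASSPATH=./* \
        connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties \
                           config/quickstart-couchbase-sink.properties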

    Send Test Messages

    Now that the Couchbase Sink Connector is running, let’s give it some messages to import:

    cd $KAFKA_CONNECT_COUCHBASE_HOME/examples/json-producer
    mvn compile exec:java

    The producer will send some messages and then terminate. If all goes well, the messages will appear in the Couchbase bucket you specified in the sink connector config.

    If you wish to see how the Couchbase Sink Connector behaves in the absence of message keys, modify the publishMessage method in the example source code to set the message keys to null, then rerun the producer.

    Alternatively, if you want the Couchbase document ID to be the airport code, edit quickstart-couchbase-sink.properties and set couchbase.document.id=/airport, restart the sink connector, and run the producer again.

    Modify Documents Before Writing to Couchbase

    Kafka Connect supports Single Message Transforms that let you change the structure or content of a message. To experiment with this feature, try adding these lines to your sink connector configuration:

    transforms=addMagicWord
    transforms.addMagicWord.type=org.apache.kafka.connect.transforms.InsertField$Value
    transforms.addMagicWord.static.field=magicWord
    transforms.addMagicWord.static.value=xyzzy

    Now if you restart the sink connector and send some more test messages, each new Couchbase document should have a "magicWord" field with value "xyzzy".
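
    For instance, a hypothetical message body of {"airport": "SFO"} would be written to Couchbase as:

    {"airport": "SFO", "magicWord": "xyzzy"}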

    If the built-in transforms are not sufficient, you can write Java code to implement more complex logic. Since version 3.4.3, the examples/custom-extensions project in $KAFKA_CONNECT_COUCHBASE_HOME includes a sample CustomTransform which may be used as a starting point for creating your own transforms.
