Quickstart
This section shows how to add the connector to your application.
Download
The connector is distributed as a ZIP archive which you can download from the Release Notes page.
The connector is also available on Confluent Hub.
If you prefer to build the connector from source:
- Clone the GitHub repository.
- Run mvn package to generate the connector archive.
- Look for couchbase-kafka-connect-couchbase-<version>.zip under the target directory.
Getting Started
However you obtained the connector archive, go ahead and unzip it now.
The result should be a directory called couchbase-kafka-connect-couchbase-<version>. The rest of this guide will refer to this directory as $KAFKA_CONNECT_COUCHBASE_HOME.
This guide assumes you have already installed Couchbase Server locally and have loaded the sample bucket called travel-sample. (It’s fine if you want to use a different bucket; neither the connector nor this guide depends on the contents of the documents in the bucket.)
You’ll also need a local installation of Apache Kafka or Confluent Platform Kafka.
Set Up Kafka
If you already have an installation of Kafka and know how to start the servers, feel free to skip this section.
Still reading? Don’t worry, setting up a basic installation is pretty easy. Download either Apache Kafka or Confluent Platform Kafka. For simplicity, this guide assumes you’re installing from a ZIP or TAR archive, so steer clear of the deb/rpm packages for now.
Decompress the Apache Kafka or Confluent Platform archive and move the resulting directory under ~/opt (or wherever you like to keep this kind of software). The rest of this guide refers to the root of the installation directory as $KAFKA_HOME or $CONFLUENT_HOME.
Be aware that some config files are in different relative locations depending on whether you’re using Apache Kafka or Confluent Platform Kafka.
Make sure the Kafka command line tools are in your path:
export PATH=<path-to-apache-kafka-or-confluent>/bin:$PATH
Start the Kafka Servers
If you’re using Confluent Platform Kafka, you can use the confluent command to start the servers in development mode:
confluent local services schema-registry start
If you’re not using Confluent, the commands are slightly different, but the idea is the same. Start the servers by running these commands, each in a separate terminal:
zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
kafka-server-start.sh $KAFKA_HOME/config/server.properties
Couchbase Source Connector
The source connector listens for changes to Couchbase documents and publishes them to a Kafka topic.
If you’re more interested in saving Kafka messages to Couchbase, skip ahead to the Couchbase Sink Connector section.
Configure the Source Connector
The Couchbase connector distribution includes sample config files. Look inside $KAFKA_CONNECT_COUCHBASE_HOME/etc and edit the quickstart-couchbase-source.properties file.
Take a moment to peruse the configuration options specified here. Some are standard options available to all connectors. The rest are specific to the Couchbase connector.
For this exercise, change the value of couchbase.bucket to travel-sample (or whichever bucket you want to stream from). For couchbase.username and couchbase.password, supply the credentials of a Couchbase user who has the "Data DCP Reader" role for the bucket.
If you have not yet created such a user, now is a good time to read about Creating and Managing Users with the UI.
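For example, after editing, the relevant lines of the sample config might look like this (the username and password shown here are placeholders; substitute the credentials of your own Couchbase user):
couchbase.bucket=travel-sample
# placeholder credentials -- replace with your own
couchbase.username=kafka-connect-user
couchbase.password=secret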
Run the Source Connector
Kafka connectors can be run in standalone or distributed mode. For now let’s run the connector in standalone mode, using the CLASSPATH environment variable to include the Couchbase connector JARs in the class path.
For Confluent Platform Kafka:
cd $KAFKA_CONNECT_COUCHBASE_HOME
env CLASSPATH=lib/* \
connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
etc/quickstart-couchbase-source.properties
Or for Apache Kafka:
cd $KAFKA_CONNECT_COUCHBASE_HOME
env CLASSPATH=lib/* \
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties \
etc/quickstart-couchbase-source.properties
Alternatively, Run the Connector with Class Loader Isolation
Class loader isolation prevents plugins that use different versions of the same Java library from interfering with one another. Instead of adding the plugin JARs to the class path when running the connect worker, you can install them in a location called the "plugin path."
There are multiple ways to do this.
You can choose whichever method is most convenient.
All of these methods involve the plugin.path worker config property, which is defined in the connect-*.properties file referenced by the above commands.
Once the connector is in the plugin path (using any of the methods described below), it is no longer necessary to set the CLASSPATH environment variable when running the connect worker.
Option 1: Use Confluent Platform tools
Confluent Platform users can install the connector by running the confluent-hub install command.
To install from a ZIP archive on the local filesystem:
confluent-hub install couchbase-kafka-connect-couchbase-<version>.zip
Alternatively, you can download and install the connector directly from the Confluent Hub component repository:
confluent-hub install couchbase/kafka-connect-couchbase:<version>
Option 2: Modify the plugin.path config property
Edit the connect worker config file (connect-*.properties) and search for the plugin.path property. Change the value of this property to include the full path to the couchbase-kafka-connect-couchbase-<version> directory (which we have been calling $KAFKA_CONNECT_COUCHBASE_HOME).
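For example, assuming the connector was unzipped to /opt/kafka-connect-couchbase (a hypothetical location), the property might look like:
# /opt/kafka-connect-couchbase is a hypothetical install location
plugin.path=/opt/kafka-connect-couchbase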
Option 3: Copy the connector to an existing plugin.path location
Copy the $KAFKA_CONNECT_COUCHBASE_HOME/lib directory to one of the directories already listed in the plugin path. You may also wish to rename the directory from lib to kafka-connect-couchbase.
For example, if the plugin.path property is /opt/connectors, you want to end up with a directory structure like:
opt
`-- connectors
`-- kafka-connect-couchbase
|-- kafka-connect-couchbase-<version>.jar
|-- java-client-<version>.jar
|-- core-io-<version>.jar
`-- (and the other JARs too)
Observe Messages Published by Couchbase Source Connector
The sample config file tells the source connector to publish to a topic called test-default.
Let’s use the Kafka command-line tools to spy on the contents of the topic.
For Confluent Platform Kafka:
kafka-console-consumer --bootstrap-server localhost:9092 \
--property print.key=true \
--topic test-default --from-beginning
When a topic contains messages in Avro format, Confluent users should view the messages by running kafka-avro-console-consumer instead of kafka-console-consumer.
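For example, assuming the Schema Registry is listening on its default local port (an assumption; adjust the URL to match your environment), the Avro-aware consumer invocation looks much the same:
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
    --property schema.registry.url=http://localhost:8081 \
    --topic test-default --from-beginning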
Or for Apache Kafka:
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--property print.key=true \
--topic test-default --from-beginning
Each line of the output represents a document in Couchbase. Every time a Couchbase document is created, modified, or deleted, the console consumer prints another line containing the updated version of the document.
Once the consumer catches up to the current state of the bucket, try creating, updating, or deleting a document via the Couchbase Web Console and observe how the change is propagated to the Kafka topic.
Changing the format of published messages
A component called a "source handler" determines the content of the published messages.
The sample config uses RawJsonSourceHandler, which publishes JSON messages identical to the Couchbase documents. This section describes how to use this source handler, and then discusses alternate source handlers.
RawJsonSourceHandler
This handler always publishes records in JSON format, and requires the value converter be set to ByteArrayConverter, which acts as a pass-through for the output byte array containing the JSON.
key.converter=org.apache.kafka.connect.storage.StringConverter
couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
When a Couchbase document is deleted, RawJsonSourceHandler sends a Kafka message with a null value. If instead you wish to ignore deletion events, filter them out with the DropIfNullValue transform:
transforms=ignoreDeletes
transforms.ignoreDeletes.type=com.couchbase.connect.kafka.transform.DropIfNullValue
As a performance optimization, RawJsonSourceHandler and its cousin RawJsonWithMetadataSourceHandler create Kafka Connect records whose values are byte arrays. If you wish to use these handlers together with transforms that modify document content, the record value must be converted from a byte array to a compatible format. To do this, include the DeserializeJson transform as the first in the chain and set value.converter to JsonConverter instead of ByteArrayConverter, like so:
couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
transforms=deserializeJson,someOtherTransform
transforms.deserializeJson.type=com.couchbase.connect.kafka.transform.DeserializeJson
transforms.someOtherTransform.type=...
RawJsonWithMetadataSourceHandler
This source handler is similar to RawJsonSourceHandler, but it wraps the Couchbase document content in an envelope that includes document metadata.
Like RawJsonSourceHandler, it requires the ByteArrayConverter value converter (unless you are using Single Message Transforms, in which case you should use JsonConverter and execute DeserializeJson as the first transform).
key.converter=org.apache.kafka.connect.storage.StringConverter
couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonWithMetadataSourceHandler
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
When you use this source handler, each record has an event field whose value indicates the type of change represented by the message.
The possible values are:
- mutation: A change to document content, including creation and changes made via subdocument commands.
- deletion: Removal or expiration of the document.
- expiration: Reserved for document expiration (Couchbase Server does not currently send this event type, but may in future versions).
For mutation messages, the entire content of the Couchbase document is present as the value of the content field.
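To illustrate, a mutation message for a travel-sample document might look something like this (the metadata values are hypothetical; the field names match the schema shown later in the DefaultSchemaSourceHandler section):
{
  "event": "mutation",
  "partition": 512,
  "key": "airline_10",
  "cas": 1545084560615800832,
  "bySeqno": 1,
  "revSeqno": 1,
  "expiration": 0,
  "flags": 33554432,
  "lockTime": 0,
  "content": {"id": 10, "type": "airline", "name": "40-Mile Air"}
}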
DefaultSchemaSourceHandler
This source handler generates records whose values contain the same kind of metadata envelope as RawJsonWithMetadataSourceHandler.
It differs in that it defines a schema for the envelope.
You can use this source handler with any value converter; specify whichever converter matches your desired publication format.
couchbase.source.handler=com.couchbase.connect.kafka.handler.source.DefaultSchemaSourceHandler
The schema used by this source handler defines the Couchbase document content to be a byte array.
If you use JsonConverter, this byte array will be serialized as a single Base64-encoded string. If this is not the behavior you want, consider using one of the raw JSON source handlers instead.
For reference, the Avro schema for this payload format is shown below.
{
"type": "record",
"name": "DcpMessage",
"namespace": "com.couchbase",
"fields": [
{
"name": "event",
"type": "string"
},
{
"name": "partition",
"type": {
"type": "int",
"connect.type": "int16"
}
},
{
"name": "key",
"type": "string"
},
{
"name": "cas",
"type": "long"
},
{
"name": "bySeqno",
"type": "long"
},
{
"name": "revSeqno",
"type": "long"
},
{
"name": "expiration",
"type": [
"null",
"int"
]
},
{
"name": "flags",
"type": [
"null",
"int"
]
},
{
"name": "lockTime",
"type": [
"null",
"int"
]
},
{
"name": "content",
"type": [
"null",
"bytes"
]
}
],
"connect.name": "com.couchbase.DcpMessage"
}
Writing a custom SourceHandler
If none of the existing source handlers meet your requirements, you can write your own. The connector’s GitHub repository includes an example project you can use as a template for creating your own source handlers and Single Message Transforms.
Couchbase Sink Connector
Now let’s talk about the sink connector, which reads messages from one or more Kafka topics and writes them to Couchbase Server.
The sink connector will attempt to convert message values to JSON. If the conversion fails, the connector will fall back to treating the value as a String BLOB.
If the Kafka key is a primitive type, the connector will use it as the document ID.
If the Kafka key is absent or of a complex type (array or struct), the document ID will be generated as topic/partition/offset.
Alternatively, the document ID can come from the body of the Kafka message. Provide a couchbase.document.id property whose value is a JSON Pointer identifying the document ID node. If you want the connector to remove this node before persisting the document to Couchbase, provide a couchbase.remove.document.id property with value true. If the connector fails to locate the document ID node, it will fall back to using the Kafka key or topic/partition/offset as described above.
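For example, if each message body carries a top-level id field that should become the document ID and be stripped from the persisted document (a hypothetical message layout), the config would look like:
couchbase.document.id=/id
couchbase.remove.document.id=true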
As of version 3.2.2, if the Kafka message body is null, the sink connector will delete the Couchbase document whose ID matches the Kafka message key.
Configure and Run the Sink Connector
In the $KAFKA_CONNECT_COUCHBASE_HOME/etc directory there is a file called quickstart-couchbase-sink.properties. Customize this file as described in Configure the Source Connector, except that this time the bucket will receive messages, and the user must have write access to the bucket.
Note: Make sure to specify an existing bucket, otherwise the sink connector will fail. You may wish to create a new bucket to receive the messages.
To run the sink connector, use the same command as described in Run the Source Connector, but pass quickstart-couchbase-sink.properties as the second argument to connect-standalone instead of quickstart-couchbase-source.properties.
Send Test Messages
Now that the Couchbase Sink Connector is running, let’s give it some messages to import:
git clone https://github.com/couchbase/kafka-connect-couchbase.git
cd kafka-connect-couchbase/examples/json-producer
mvn compile exec:java
The producer will send some messages and then terminate. If all goes well, the messages will appear in the Couchbase bucket you specified in the sink connector config.
If you wish to see how the Couchbase Sink Connector behaves in the absence of message keys, modify the publishMessage method in the example source code to set the message keys to null, then rerun the producer.
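The exact signature of publishMessage depends on the example project’s source, but with the standard Kafka producer API, sending a record with a null key looks something like this sketch (topic name and payload are placeholders):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NullKeyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Topic name and payload are placeholders; match them to your sink config.
            // A null key makes the sink connector generate the document ID
            // as topic/partition/offset.
            producer.send(new ProducerRecord<>("test-default", null, "{\"airport\":\"SFO\"}"));
        }
    }
}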
Alternatively, if you want the Couchbase document ID to be the airport code, edit quickstart-couchbase-sink.properties and set couchbase.document.id=/airport, restart the sink connector, and run the producer again.
Modify Documents Before Writing to Couchbase
Kafka Connect supports Single Message Transforms that let you change the structure or content of a message. To experiment with this feature, try adding these lines to your sink connector configuration:
transforms=addMagicWord
transforms.addMagicWord.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addMagicWord.static.field=magicWord
transforms.addMagicWord.static.value=xyzzy
Now if you restart the sink connector and send some more test messages, each new Couchbase document should have a "magicWord" field with value "xyzzy".
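For example, a message whose body was {"airport": "SFO"} (an illustrative payload) would be persisted to Couchbase as:
{
  "airport": "SFO",
  "magicWord": "xyzzy"
}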
If the built-in transforms are not sufficient, you can write Java code to implement more complex logic.
The custom-extensions project on GitHub includes a sample CustomTransform which you can use as a starting point for creating your own transforms.
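For reference, a custom transform is a Java class that implements Kafka Connect’s Transformation interface. The following minimal sketch (the class name and package are hypothetical, and the apply method is a no-op placeholder) shows the overall shape of such a class:
package com.example; // hypothetical package

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Minimal sketch of a custom Single Message Transform.
public class MyTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
        // Read any custom config properties here.
    }

    @Override
    public R apply(R record) {
        // Inspect or rebuild the record here, typically via record.newRecord(...).
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
        // Release any resources.
    }
}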