Sink Configuration Options

    +

    Reference of the sink connector options.

    Connection

    couchbase.seed.nodes

    Addresses of Couchbase Server nodes, delimited by commas.

    If a custom port is specified, it must be the KV port (which is normally 11210 for insecure connections, or 11207 for secure connections).

    • Type: list

    • Importance: high

    couchbase.username

    Name of the Couchbase user to authenticate as.

    • Type: string

    • Importance: high

    couchbase.password

    Password of the Couchbase user.

    May be overridden with the KAFKA_COUCHBASE_PASSWORD environment variable.

    • Type: password

    • Importance: high

    couchbase.bucket

    Name of the Couchbase bucket to use.

    This property is required unless using the experimental AnalyticsSinkHandler.

    • Type: string

    • Default: ""

    • Importance: high

    couchbase.network

    The network selection strategy for connecting to a Couchbase Server cluster that advertises alternate addresses.

    A Couchbase node running inside a container environment (like Docker or Kubernetes) might be configured to advertise both its address within the container environment (known as its "default" address) as well as an "external" address for use by clients connecting from outside the environment.

    Setting the 'couchbase.network' config property to 'default' or 'external' forces the selection of the respective addresses. Setting the value to 'auto' tells the connector to select whichever network contains the addresses specified in the 'couchbase.seed.nodes' config property.

    • Type: string

    • Default: auto

    • Importance: medium

    couchbase.bootstrap.timeout

    On startup, the connector will wait this long for a Couchbase connection to be established. If a connection is not established before the timeout expires, the connector will terminate.

    • Type: string

    • Default: 30s

    • Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m

    • Importance: medium

    Security

    couchbase.enable.tls

    Use secure connection to Couchbase Server.

    If true, you must also tell the connector which certificate to trust. Specify a certificate file with 'couchbase.trust.certificate.path', or a Java keystore file with 'couchbase.trust.store.path' and 'couchbase.trust.store.password'.

    couchbase.enable.hostname.verification

    Set this to false to disable TLS hostname verification for Couchbase connections. Less secure, but might be required if for some reason you can’t issue a certificate whose Subject Alternative Names match the hostname used to connect to the server. Only disable if you understand the impact and can accept the risks.

    • Type: boolean

    • Default: true

    • Importance: medium

    couchbase.trust.store.path

    Absolute filesystem path to the Java keystore holding the CA certificate used by Couchbase Server.

    If you want to use a PEM file instead of a Java keystore, specify couchbase.trust.certificate.path instead.

    • Type: string

    • Default: ""

    • Importance: medium

    couchbase.trust.store.password

    Password for accessing the trust store.

    May be overridden with the KAFKA_COUCHBASE_TRUST_STORE_PASSWORD environment variable.

    • Type: password

    • Default: [hidden]

    • Importance: medium

    couchbase.trust.certificate.path

    Absolute filesystem path to the PEM file containing the CA certificate used by Couchbase Server.

    If you want to use a Java keystore instead of a PEM file, specify couchbase.trust.store.path instead.

    • Type: string

    • Default: ""

    • Importance: medium

    couchbase.client.certificate.path

    Absolute filesystem path to a Java keystore or PKCS12 bundle holding the private key and certificate chain to use for client certificate authentication (mutual TLS).

    If you supply a value for this config property, the couchbase.username and couchbase.password properties will be ignored.

    • Type: string

    • Default: ""

    • Importance: medium

    couchbase.client.certificate.password

    Password for accessing the client certificate.

    May be overridden with the KAFKA_COUCHBASE_CLIENT_CERTIFICATE_PASSWORD environment variable.

    • Type: password

    • Default: [hidden]

    • Importance: medium

    Logging

    couchbase.log.redaction

    Determines which kinds of sensitive log messages from the Couchbase connector will be tagged for later redaction by the Couchbase log redaction tool. NONE = no tagging; PARTIAL = user data is tagged; FULL = user, meta, and system data is tagged.

    • Type: string

    • Default: NONE

    • Valid Values: One of [NONE, PARTIAL, FULL]

    • Importance: medium

    couchbase.log.document.lifecycle

    If true, document lifecycle milestones will be logged at INFO level instead of DEBUG. Enabling this feature lets you watch documents flow through the connector. Disabled by default because it generates many log messages.

    • Type: boolean

    • Default: false

    • Importance: medium

    Sink Behavior

    couchbase.default.collection

    Qualified name (scope.collection) of the destination collection for messages from topics that don’t have an entry in the couchbase.topic.to.collection map.

    • Type: string

    • Default: _default._default

    • Valid Values: A collection name qualified by a scope name (scope.collection)

    • Importance: medium

    couchbase.topic.to.collection

    A map from Kafka topic to Couchbase collection.

    Topic and collection are joined by an equals sign. Map entries are delimited by commas.

    For example, if you want to write messages from topic "topic1" to collection "scope-a.invoices", and messages from topic "topic2" to collection "scope-a.widgets", you would write: "topic1=scope-a.invoices,topic2=scope-a.widgets".

    Defaults to an empty map, with all documents going to the collection specified by couchbase.default.collection.

    • Type: list

    • Default: ""

    • Valid Values: topic=scope.collection,…​

    • Importance: medium

    couchbase.sink.handler

    The fully-qualified class name of the sink handler to use. The sink handler determines how the Kafka record is translated into actions on Couchbase documents.

    The built-in handlers are: com.couchbase.connect.kafka.handler.sink.UpsertSinkHandler, com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler, and com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler.

    You can customize the sink connector’s behavior by implementing your own SinkHandler.

    • Type: class

    • Default: com.couchbase.connect.kafka.handler.sink.UpsertSinkHandler

    • Importance: medium

    couchbase.document.mode

    Overrides the couchbase.sink.handler property.

    A value of N1QL forces the handler to com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler. A value of SUBDOCUMENT forces the handler to com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler.

    DEPRECATED. Please set the couchbase.sink.handler property instead.
    • Type: string

    • Default: DOCUMENT

    • Valid Values: One of [DOCUMENT, SUBDOCUMENT, N1QL]

    • Importance: medium

    couchbase.document.id

    Format string to use for the Couchbase document ID (overriding the message key). May refer to document fields via placeholders like ${/path/to/field}

    • Type: string

    • Default: ""

    • Importance: medium

    couchbase.remove.document.id

    Whether to remove the ID identified by 'couchbase.documentId' from the document before storing in Couchbase.

    • Type: boolean

    • Default: false

    • Importance: medium

    couchbase.document.expiration

    Document expiration time specified as an integer followed by a time unit (s = seconds, m = minutes, h = hours, d = days). For example, to have documents expire after 30 minutes, set this value to "30m".

    A value of "0" (the default) means documents never expire.

    • Type: string

    • Default: 0

    • Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m

    • Importance: medium

    couchbase.retry.timeout

    Retry failed writes to Couchbase until this deadline is reached. If time runs out, the connector terminates.

    A value of 0 (the default) means the connector will terminate immediately when a write fails.

    This retry timeout is distinct from the KV timeout (which you can set via couchbase.env.*). The KV timeout affects an individual write attempt, while the retry timeout spans multiple attempts and makes the connector resilient to more kinds of transient failures.
    Try not to confuse this with the Kafka Connect framework’s built-in errors.retry.timeout config property, which applies only to failures occurring before the framework delivers the record to the Couchbase connector.

    UNCOMMITTED; this feature may change in a patch release without notice.

    • Since: 4.1.4

    • Type: string

    • Default: 0

    • Valid Values: An integer followed by a time unit (ms = milliseconds, s = seconds, m = minutes, h = hours, d = days). For example, to specify 30 minutes: 30m

    • Importance: medium

    Durability

    couchbase.durability

    The preferred way to specify an enhanced durability requirement when using Couchbase Server 6.5 or later.

    The default value of NONE means a write is considered successful as soon as it reaches the memory of the active node.

    If you set this to anything other than NONE, then you must not set couchbase.persist.to or couchbase.replicate.to.
    • Type: string

    • Default: NONE

    • Valid Values: One of [NONE, MAJORITY, MAJORITY_AND_PERSIST_TO_ACTIVE, PERSIST_TO_MAJORITY]

    • Importance: medium

    couchbase.persist.to

    For Couchbase Server versions prior to 6.5, this is how you require the connector to verify a write is persisted to disk on a certain number of replicas before considering the write successful.

    If you’re using Couchbase Server 6.5 or later, we recommend using the couchbase.durability property instead.

    • Type: string

    • Default: NONE

    • Valid Values: One of [NONE, ACTIVE, ONE, TWO, THREE, FOUR]

    • Importance: medium

    couchbase.replicate.to

    For Couchbase Server versions prior to 6.5, this is how you require the connector to verify a write has reached the memory of a certain number of replicas before considering the write successful.

    If you’re using Couchbase Server 6.5 or later, we recommend using the couchbase.durability property instead.

    • Type: string

    • Default: NONE

    • Valid Values: One of [NONE, ONE, TWO, THREE]

    • Importance: medium

    N1ql Sink Handler

    couchbase.n1ql.operation

    The type of update to use when couchbase.sink.handler is set to com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler.

    This property is specific to N1qlSinkHandler.

    • Type: string

    • Default: UPDATE

    • Valid Values: One of [UPDATE, UPDATE_WHERE]

    • Importance: medium

    couchbase.n1ql.where.fields

    When using the UPDATE_WHERE operation, this is the list of document fields that must match the Kafka message in order for the document to be updated with the remaining message fields. To match against a literal value instead of a message field, use a colon to delimit the document field name and the target value. For example, "type:widget,color" matches documents whose 'type' field is 'widget' and whose 'color' field matches the 'color' field of the Kafka message.

    This property is specific to N1qlSinkHandler.

    • Type: list

    • Default: ""

    • Importance: medium

    couchbase.n1ql.create.document

    Controls whether to create the document if it does not exist.

    This property is specific to N1qlSinkHandler.

    • Type: boolean

    • Default: true

    • Importance: medium

    Sub Document Sink Handler

    couchbase.subdocument.path

    JSON Pointer to the property of the Kafka message whose value is the subdocument path to use when modifying the Couchbase document.

    This property is specific to SubDocumentSinkHandler.

    • Type: string

    • Default: ""

    • Importance: medium

    couchbase.subdocument.operation

    Setting to indicate the type of update to a sub-document.

    This property is specific to SubDocumentSinkHandler.

    • Type: string

    • Default: UPSERT

    • Valid Values: One of [UPSERT, ARRAY_PREPEND, ARRAY_APPEND]

    • Importance: medium

    couchbase.subdocument.create.path

    Whether to add the parent paths if they are missing in the document.

    This property is specific to SubDocumentSinkHandler.

    • Type: boolean

    • Default: true

    • Importance: medium

    couchbase.subdocument.create.document

    This property controls whether to create the document if it does not exist.

    This property is specific to SubDocumentSinkHandler.

    • Type: boolean

    • Default: true

    • Importance: medium

    Couchbase Java SDK Settings

    couchbase.env.*

    Any system property recognized by the Couchbase Java SDK may be specified in the Sink connector config if you omit the com. prefix from the system property name.

    For example, the Couchbase Java SDK recognizes the system property com.couchbase.env.timeout.kvTimeout. To specify this setting in the connector config, use the property name couchbase.env.timeout.kvTimeout.

    If you’re thinking about increasing the KV timeout to handle transient error conditions, consider using the connector’s couchbase.retry.timeout config property instead. The retry timeout is a more robust way to handle all kinds of write failures.

    For a list of recognized properties, see Java SDK Client Settings.

    UNCOMMITTED; this feature may change in a patch release without notice.

    • Since: 4.0.6

    Parent topic: Kafka Connector