Distributed Transactions from the Java SDK

    A practical guide to using Couchbase’s distributed ACID transactions, via the Java API.

    Below we show you how to create Transactions, step-by-step. You may also want to start with our transactions examples repository, which features useful downloadable examples of using Distributed Transactions.

    Javadocs are available online.

    Requirements

    • Couchbase Server 6.6 or above.

    • Couchbase Java client 3.0.7 or above. It is recommended to use the client version that the transactions library pulls in as a transitive dependency from Maven.

    Getting Started

    Couchbase transactions require no additional components or services to be configured. Simply add the transactions library into your project. The latest version, as of November 2020, is 1.1.3.

    With Gradle this can be accomplished by modifying these sections of your build.gradle file like so:

    dependencies {
        compile group: 'com.couchbase.client', name: 'couchbase-transactions', version: '1.1.3'
    }

    Use the same group, artifact, and version in your Maven-compatible tool of choice.

    A complete, simple Gradle project is available in our transactions examples repository.

    Initializing Transactions

    Here are all imports used by the following examples:

    import com.couchbase.client.core.cnc.Event;
    import com.couchbase.client.java.Bucket;
    import com.couchbase.client.java.Cluster;
    import com.couchbase.client.java.Collection;
    import com.couchbase.client.java.ReactiveCollection;
    import com.couchbase.client.java.json.JsonObject;
    import com.couchbase.client.java.kv.GetResult;
    import com.couchbase.client.java.query.QueryOptions;
    import com.couchbase.client.java.query.QueryProfile;
    import com.couchbase.client.java.query.QueryResult;
    import com.couchbase.transactions.TransactionDurabilityLevel;
    import com.couchbase.transactions.TransactionGetResult;
    import com.couchbase.transactions.TransactionQueryOptions;
    import com.couchbase.transactions.TransactionResult;
    import com.couchbase.transactions.Transactions;
    import com.couchbase.transactions.config.TransactionConfigBuilder;
    import com.couchbase.transactions.deferred.TransactionSerializedContext;
    import com.couchbase.transactions.error.TransactionCommitAmbiguous;
    import com.couchbase.transactions.error.TransactionFailed;
    import com.couchbase.transactions.log.IllegalDocumentState;
    import com.couchbase.transactions.log.LogDefer;
    import com.couchbase.transactions.log.TransactionCleanupAttempt;
    import com.couchbase.transactions.log.TransactionCleanupEndRunEvent;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.logging.Logger;
    
    import static com.couchbase.client.java.query.QueryOptions.queryOptions;

    The starting point is the Transactions object. It is very important that the application ensures that only one of these is created, as it performs automated background processes that should not be duplicated.

    // Initialize the Couchbase cluster
    Cluster cluster = Cluster.connect("localhost", "transactor", "mypass");
    Bucket bucket = cluster.bucket("transact");
    Collection collection = bucket.defaultCollection();
    
    // Create the single Transactions object
    Transactions transactions = Transactions.create(cluster);
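
    One way an application might enforce the single-instance rule is to route every access through a holder that creates the object lazily and at most once. The following is a minimal sketch using only the JDK; SingleInstance is a hypothetical helper (it is not part of the transactions library), and the supplier passed to it stands in for a call such as Transactions.create(cluster).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Hypothetical helper: creates a value at most once and shares it across threads.
 * The factory is an assumption standing in for Transactions.create(cluster).
 */
final class SingleInstance<T> {
    private final Supplier<T> factory;
    private volatile T value;

    SingleInstance(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, invoking the factory only on first use. */
    T get() {
        T local = value;
        if (local == null) {
            synchronized (this) {
                local = value;
                if (local == null) {
                    // Only the first caller reaches this point
                    local = factory.get();
                    value = local;
                }
            }
        }
        return local;
    }
}
```

    A holder built as new SingleInstance<>(() -> Transactions.create(cluster)) could then be shared application-wide, so that every caller receives the same Transactions object.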

    Configuration

    Transactions can optionally be configured at the point of creating the Transactions object:

    Transactions transactions = Transactions.create(cluster,
            TransactionConfigBuilder.create()
                    .durabilityLevel(TransactionDurabilityLevel.PERSIST_TO_MAJORITY)
                    .logOnFailure(true, Event.Severity.WARN)
                .build());

    As with the Couchbase Java Client, you can use the library in either synchronous mode (the exceptions will be explained later in [Error Handling]):

    try {
        transactions.run((ctx) -> {
            // 'ctx' is an AttemptContext, which permits getting, inserting,
            // removing and replacing documents, along with committing and
            // rolling back the transaction.
    
            // ... Your transaction logic here ...
    
            // This call is optional - if you leave it off, the transaction
            // will be committed anyway.
            ctx.commit();
        });
    } catch (TransactionCommitAmbiguous e) {
        // The application will of course want to use its own logging rather
        // than System.err
        System.err.println("Transaction possibly committed");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    } catch (TransactionFailed e) {
        System.err.println("Transaction did not reach commit point");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }

    or asynchronous mode, using the Project Reactor reactive library:

    Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
        // 'ctx' is an AttemptContextReactive, providing asynchronous versions of the AttemptContext methods
    
        return
    
                // Your transaction logic here: as an example, get and remove a doc
                ctx.get(collection.reactive(), "document-id")
                        .flatMap(doc -> ctx.remove(doc))
    
                        // The commit call is optional - if you leave it off,
                        // the transaction will be committed anyway.
                        .then(ctx.commit());
    }).doOnError(err -> {
        if (err instanceof TransactionCommitAmbiguous) {
            System.err.println("Transaction possibly committed: ");
        }
        else {
            System.err.println("Transaction failed: ");
        }
    
        for (LogDefer e : ((TransactionFailed) err).result().log().logs()) {
            // System.err is used as an example; log failures to your own logging system
            System.err.println(e.toString());
        }
    });
    
    
    // Normally you will chain this result further and ultimately subscribe.  For simplicity, here we just block
    // on the result.
    TransactionResult finalResult = result.block();

    The synchronous mode is the easiest to write and understand. The asynchronous API allows you to build your application in a reactive style, without large thread pools, which can help you scale with excellent efficiency. Those new to reactive programming may want to check out the Project Reactor site for more details on this powerful paradigm.

    Some AttemptContextReactive methods, notably remove, return Mono<Void>. Be careful to use then rather than flatMap or similar on these, as Mono<Void> will only trigger a completion event, and not the next event, so many methods including flatMap will not work as expected.

    Examples

    A code example is worth a thousand words, so here is a quick summary of the main transaction operations. They are described in more detail below.

    With the synchronous API
    try {
        TransactionResult result = transactions.run((ctx) -> {
            // Inserting a doc:
            ctx.insert(collection, "doc-a", JsonObject.create());
    
            // Getting documents:
            // Use ctx.getOptional if the document may or may not exist
            Optional<TransactionGetResult> docOpt =
                    ctx.getOptional(collection, "doc-a");
    
            // Use ctx.get if the document should exist, and the transaction
            // will fail if it does not
            TransactionGetResult docA = ctx.get(collection, "doc-a");
    
            // Replacing a doc:
            TransactionGetResult docB = ctx.get(collection, "doc-b");
            JsonObject content = docB.contentAs(JsonObject.class);
            content.put("transactions", "are awesome");
            ctx.replace(docB, content);
    
            // Removing a doc:
            TransactionGetResult docC = ctx.get(collection, "doc-c");
            ctx.remove(docC);
    
            ctx.commit();
        });
    } catch (TransactionCommitAmbiguous e) {
        System.err.println("Transaction possibly committed");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    } catch (TransactionFailed e) {
        System.err.println("Transaction did not reach commit point");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }
    With the asynchronous API
    Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
        return
                // Inserting a doc:
                ctx.insert(collection.reactive(), "doc-a", JsonObject.create())
    
                        // Getting and replacing a doc:
                        .then(ctx.get(collection.reactive(), "doc-b"))
                        .flatMap(docB -> {
                            JsonObject content = docB.contentAs(JsonObject.class);
                            content.put("transactions", "are awesome");
                            return ctx.replace(docB, content);
                        })
    
                        // Getting and removing a doc:
                        .then(ctx.get(collection.reactive(), "doc-c"))
                        .flatMap(doc -> ctx.remove(doc))
    
                        // Committing:
                        .then(ctx.commit());
    
    }).doOnError(err -> {
        if (err instanceof TransactionCommitAmbiguous) {
            System.err.println("Transaction possibly committed: ");
        }
        else {
            System.err.println("Transaction failed: ");
        }
    
        for (LogDefer e : ((TransactionFailed) err).result().log().logs()) {
            // System.err is used as an example; log failures to your own logging system
            System.err.println(e.toString());
        }
    });
    
    // Normally you will chain this result further and ultimately subscribe.
    // For simplicity, here we just block on the result.
    result.block();

    Mutating Documents

    Replacing

    Replacing a document requires a ctx.get() call first. This is necessary to ensure that the document is not involved in another transaction. (If it is, then the transaction will handle this, generally by rolling back what has been done so far, and retrying the lambda.)

    With the synchronous API:
    transactions.run((ctx) -> {
        TransactionGetResult anotherDoc = ctx.get(collection, "anotherDoc");
        JsonObject content = anotherDoc.contentAs(JsonObject.class);
        content.put("transactions", "are awesome");
        ctx.replace(anotherDoc, content);
    });
    With the asynchronous API:
    transactions.reactive().run((ctx) -> {
        return ctx.get(collection.reactive(), "anotherDoc")
                .flatMap(doc -> {
                    JsonObject content = doc.contentAs(JsonObject.class);
                    content.put("transactions", "are awesome");
                    return ctx.replace(doc, content);
                })
                .then(ctx.commit());
    });

    Removing

    As with replaces, removing a document requires a ctx.get() call first.

    With the asynchronous API:
    transactions.reactive().run((ctx) -> {
        return ctx.get(collection.reactive(), "anotherDoc")
                .flatMap(doc -> ctx.remove(doc));
    });
    With the synchronous API:
    transactions.run((ctx) -> {
        TransactionGetResult anotherDoc = ctx.get(collection, "anotherDoc");
        ctx.remove(anotherDoc);
    });

    Inserting

    With the asynchronous API:
    transactions.reactive().run((ctx) -> {
        return ctx.insert(collection.reactive(), "docId", JsonObject.create()).then();
    }).block();
    With the synchronous API:
    transactions.run((ctx) -> {
        String docId = "docId";
    
        ctx.insert(collection, docId, JsonObject.create());
    
    });

    Getting Documents

    There are two ways to get a document, get and getOptional:

    transactions.run((ctx) -> {
        String docId = "a-doc";
    
        Optional<TransactionGetResult> docOpt = ctx.getOptional(collection, docId);
        TransactionGetResult doc = ctx.get(collection, docId);
    });

    If the document does not exist, get will cause the transaction to fail with TransactionFailed (after rolling back any changes, of course). It is provided as a convenience method so the developer does not have to check the Optional if the document must exist for the transaction to succeed.

    Gets will 'read your own writes', e.g. this will succeed:

    transactions.run((ctx) -> {
        String docId = "docId";
    
        ctx.insert(collection, docId, JsonObject.create());
    
        Optional<TransactionGetResult> doc = ctx.getOptional(collection, docId);
    
        assert (doc.isPresent());
    });

    N1QL Queries

    Couchbase Server 7.0 beta and the 1.1.3 release of this library add support for using N1QL queries inside the transaction lambda.

    Support for queries in 1.1.3 is at beta level and is marked as @Stability.Volatile, which means the API may need to change as we gather feedback during the Couchbase Server 7.0 beta period. The feature should not be used in any production deployment while it is at beta level. Please take this opportunity to provide feedback on this feature, via either support or our forums.

    If you have used N1QL queries from the regular Java SDK, then you already know how to use queries inside the lambda, as the syntax is almost identical:

    transactions.run((ctx) -> {
        ctx.query("INSERT INTO `default` VALUES ('doc', {'hello':'world'})");
    
        String st = "SELECT `default`.* FROM `default` WHERE META().id = 'doc'";
        QueryResult qr = ctx.query(st);
        qr.rowsAsObject().forEach(row -> {
            System.out.println(row);
        });
    });

    Note that we are performing queries on the ctx object. This is very important: queries performed on a Cluster or Scope object inside the lambda will just act like a normal query and will not be part of the transaction.

    Key-Value operations and queries can be freely intermixed, and will 'see' each other as you would expect:

    transactions.run((ctx) -> {
        ctx.insert(collection, "doc", JsonObject.create().put("hello", "world"));
    
        String st = "SELECT `default`.* FROM `default` WHERE META().id = 'doc'";
        QueryResult qr = ctx.query(st);
        qr.rowsAsObject().forEach(row -> {
            System.out.println(row);
        });
    });

    For performance reasons it’s recommended to put Key-Value operations before the first query in the lambda if possible, as after that point they incur a small translation cost.

    Query options can be provided via TransactionQueryOptions, which provides a subset of the options in the Java SDK’s QueryOptions:

    transactions.run((ctx) -> {
        ctx.query("INSERT INTO `default` VALUES ('doc', {'hello':'world'})",
                TransactionQueryOptions.queryOptions().profile(QueryProfile.TIMINGS));
    });

    Committing

    Committing is automatic: if there is no explicit call to ctx.commit() at the end of the transaction logic callback, and no exception is thrown, it will be committed.

    With the asynchronous API, if you leave off the explicit call to commit then you may need to call .then() on the result of the chain to convert it to the required Mono<Void> return type:

    Mono<TransactionResult> result = transactions.reactive().run((ctx) -> {
        return ctx.get(collection.reactive(), "anotherDoc")
                .flatMap(doc -> {
                    JsonObject content = doc.contentAs(JsonObject.class);
                    content.put("transactions", "are awesome");
                    return ctx.replace(doc, content);
                })
                .then();
    });

    As described above, as soon as the transaction is committed, all its changes will be atomically visible to reads from other transactions. The changes will also be committed (or "unstaged") so they are visible to non-transactional actors, in an eventually consistent fashion.

    Commit is final: after the transaction is committed, it cannot be rolled back, and no further operations are allowed on it.

    An asynchronous cleanup process ensures that once the transaction reaches the commit point, it will be fully committed - even if the application crashes.

    A complete version of this example is available on our GitHub transactions examples page.

    public void playerHitsMonster(int damage, String playerId, String monsterId) {
        Transactions transactions = getTransactions();
    
        try {
            transactions.run((ctx) -> {
                TransactionGetResult monsterDoc = ctx.get(collection, monsterId);
                TransactionGetResult playerDoc = ctx.get(collection, playerId);
    
                int monsterHitpoints = monsterDoc.contentAs(JsonObject.class).getInt("hitpoints");
                int monsterNewHitpoints = monsterHitpoints - damage;
    
                if (monsterNewHitpoints <= 0) {
                    // Monster is killed.  The remove is just for demoing, and a more realistic example would set a
                    // "dead" flag or similar.
                    ctx.remove(monsterDoc);
    
                    // The player earns experience for killing the monster
                    int experienceForKillingMonster = monsterDoc.contentAs(JsonObject.class).getInt(
                            "experienceWhenKilled");
                    int playerExperience = playerDoc.contentAs(JsonObject.class).getInt("experience");
                    int playerNewExperience = playerExperience + experienceForKillingMonster;
                    int playerNewLevel = calculateLevelForExperience(playerNewExperience);
    
                    JsonObject playerContent = playerDoc.contentAs(JsonObject.class);
    
                    playerContent.put("experience", playerNewExperience);
                    playerContent.put("level", playerNewLevel);
    
                    ctx.replace(playerDoc, playerContent);
                } else {
                    // Monster is damaged but still alive
                    JsonObject monsterContent = monsterDoc.contentAs(JsonObject.class);
                    monsterContent.put("hitpoints", monsterNewHitpoints);
    
                    ctx.replace(monsterDoc, monsterContent);
                }
            });
        } catch (TransactionFailed e) {
            // The operation failed.   Both the monster and the player will be untouched.
    
            // Situations that can cause this would include either the monster
            // or player not existing (as get is used), or a persistent
            // failure to be able to commit the transaction, for example on
            // prolonged node failure.
        }
    }

    cluster.environment().eventBus().subscribe(event -> {
        if (event instanceof IllegalDocumentState) {
            // log this event for review
        }
    });

    These events will be raised in the event of a non-transactional write being detected and overridden. The event contains the key of the document involved, to aid the application with debugging.

    Rollback

    If an exception is thrown, either by the application from the lambda, or by the transactions library, then that attempt is rolled back. The transaction logic may or may not be retried, depending on the exception.

    If the transaction is not retried then it will throw a TransactionFailed exception, and its getCause method can be used for more details on the failure.

    The application can use this to signal why it triggered a rollback, like so:

    class BalanceInsufficient extends RuntimeException {}
    
    try {
        transactions.run((ctx) -> {
            TransactionGetResult customer = ctx.get(collection, "customer-name");
    
            if (customer.contentAsObject().getInt("balance") < costOfItem) {
                throw new BalanceInsufficient();
            }
            // else continue transaction
        });
    } catch (TransactionCommitAmbiguous e) {
        // This exception can only be thrown at the commit point, after the
        // BalanceInsufficient logic has been passed, so there is no need to
        // check getCause here.
        System.err.println("Transaction possibly committed");
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    } catch (TransactionFailed e) {
        if (e.getCause() instanceof BalanceInsufficient) {
            // Re-raise the error
            throw (RuntimeException) e.getCause();
        }
        else {
            System.err.println("Transaction did not reach commit point");
    
            for (LogDefer err : e.result().log().logs()) {
                System.err.println(err.toString());
            }
        }
    }

    The transaction can also be explicitly rolled back:

    transactions.run((ctx) -> {
        TransactionGetResult customer = ctx.get(collection, "customer-name");
    
        if (customer.contentAsObject().getInt("balance") < costOfItem) {
            ctx.rollback();
        }
        // else continue transaction
    });

    In this case, if ctx.rollback() is reached, then the transaction will be regarded as successfully rolled back and no TransactionFailed will be thrown.

    After a transaction is rolled back, it cannot be committed, no further operations are allowed on it, and the library will not try to automatically commit it at the end of the code block.

    Transactions transactions = Transactions.create(cluster, TransactionConfigBuilder.create()
            .expirationTime(Duration.ofSeconds(120))
            .build());

    Full Error Handling Example

    Pulling all of the above together, this is the suggested best practice for error handling:

    try {
        TransactionResult result = transactions.run((ctx) -> {
            // ... transactional code here ...
        });
    
        // The transaction definitely reached the commit point. Unstaging
        // the individual documents may or may not have completed
    
        if (result.unstagingComplete()) {
            // Operations with non-transactional actors will want
            // unstagingComplete() to be true.
            cluster.query(" ... N1QL ... ",
                    QueryOptions.queryOptions()
                            .consistentWith(result.mutationState()));
    
            String documentKey = "a document key involved in the transaction";
            GetResult getResult = collection.get(documentKey);
        }
        else {
            // This step is completely application-dependent.  It may
            // need to throw its own exception, if it is crucial that
            // result.unstagingComplete() is true at this point.
            // (Recall that the asynchronous cleanup process will
            // complete the unstaging later on).
        }
    }
    catch (TransactionCommitAmbiguous err) {
        // The transaction may or may not have reached commit point
        System.err.println("Transaction returned TransactionCommitAmbiguous and" +
                " may have succeeded, logs:");
    
        // Of course, the application will want to use its own logging rather
        // than System.err
        err.result().log().logs().forEach(log -> System.err.println(log.toString()));
    }
    catch (TransactionFailed err) {
        // The transaction definitely did not reach commit point
        System.err.println("Transaction failed with TransactionFailed, logs:");
        err.result().log().logs().forEach(log -> System.err.println(log.toString()));
    }

    Monitoring Cleanup

    If the application wishes to monitor cleanup it may subscribe to these events:

    cluster.environment().eventBus().subscribe(event -> {
        if (event instanceof TransactionCleanupAttempt
                || event instanceof TransactionCleanupEndRunEvent) {
            // log this event
        }
    });

    TransactionCleanupEndRunEvent is raised whenever a current 'run' is finished, and contains statistics from the run. (A run is typically around every 60 seconds, with default configuration.)

    A TransactionCleanupAttempt event is raised when an expired transaction was found by this process, and a cleanup attempt was made. It contains whether that attempt was successful, along with any logs relevant to the attempt.

    In addition, if cleanup fails to clean up a transaction that is more than two hours past expiry, it will raise the TransactionCleanupAttempt event at WARN level (rather than the default DEBUG). With most default configurations of the event bus (see Logging below), this will cause that event to be logged somewhere visible to the application. If there is no good reason for the cleanup to fail (such as a downed node that has not yet been failed over), then the user is encouraged to report the issue.

    Logging

    To aid troubleshooting, each transaction maintains a list of log entries, which can be logged on failure like this:

    } catch (TransactionCommitAmbiguous e) {
        // The application will of course want to use its own logging rather
        // than System.err
        System.err.println("Transaction possibly committed");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    } catch (TransactionFailed e) {
        System.err.println("Transaction did not reach commit point");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }

    or for the asynchronous API:

    }).doOnError(err -> {
        if (err instanceof TransactionCommitAmbiguous) {
            System.err.println("Transaction possibly committed: ");
        }
        else {
            System.err.println("Transaction failed: ");
        }
    
        for (LogDefer e : ((TransactionFailed) err).result().log().logs()) {
            // System.err is used as an example; log failures to your own logging system
            System.err.println(e.toString());
        }
    });

    A failed transaction can involve dozens, even hundreds, of lines of logging, so the application may prefer to write failed transactions into a separate file.
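
    As a sketch of that approach, the java.util.logging setup below sends failure logs to their own file. The class name TransactionFailureLog, the logger name "transactions.failures", and both helper methods are hypothetical and chosen only for illustration; the list of strings stands in for the result of calling toString() on each LogDefer entry.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.List;
import java.util.logging.FileHandler;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

/** Hypothetical helper that keeps transaction failure logs in a dedicated file. */
final class TransactionFailureLog {

    /** Returns a logger that appends to the given file and nowhere else. */
    static Logger create(Path file) {
        try {
            Logger logger = Logger.getLogger("transactions.failures");
            // Note: FileHandler treats '%' in the path as a pattern character
            FileHandler handler = new FileHandler(file.toString(), true);
            handler.setFormatter(new SimpleFormatter());
            logger.addHandler(handler);
            // Keep these (potentially long) logs out of the parent handlers
            logger.setUseParentHandlers(false);
            return logger;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes a one-line summary followed by every log line of the failed transaction. */
    static void logFailure(Logger logger, String summary, List<String> logLines) {
        logger.warning(summary);
        for (String line : logLines) {
            logger.warning(line);
        }
    }
}
```

    Inside a catch (TransactionFailed e) block, the application could collect the strings from e.result().log().logs() and pass them to logFailure, keeping the main application log free of this detail.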

    For convenience there is also a config option that will automatically write this programmatic log to the standard Couchbase Java logging configuration inherited from the SDK if a transaction fails. This will log all lines of any failed transactions, to WARN level:

    .logOnFailure(true, Event.Severity.WARN)

    By default the Couchbase Java logging event bus is set up to look for and use SLF4J/logback, log4j1, and log4j2 on the classpath, and to fall back to java.util.logging.

    Please see the Java SDK logging documentation for details.

    Most applications will have their own preferred Java logging solution in place already. For those starting from scratch, here is a complete example using the basic java.util.logging:

    final Logger LOGGER = Logger.getLogger("transactions");
    
    try {
        TransactionResult result = transactions.run((ctx) -> {
            // ... transactional code here ...
        });
    }
    catch (TransactionCommitAmbiguous err) {
        // The transaction may or may not have reached commit point
        LOGGER.info("Transaction returned TransactionCommitAmbiguous and" +
                " may have succeeded, logs:");
        err.result().log().logs().forEach(log -> LOGGER.info(log.toString()));
    }
    catch (TransactionFailed err) {
        // The transaction definitely did not reach commit point
        LOGGER.info("Transaction failed with TransactionFailed, logs:");
        err.result().log().logs().forEach(log -> LOGGER.info(log.toString()));
    }

    Concurrent Operations with the Async API

    The async API allows operations to be performed concurrently inside a transaction, which can assist performance. There are two rules the application needs to follow:

    • The first mutation must be performed alone, in serial. This is because the first mutation also triggers the creation of metadata for the transaction.

    • All concurrent operations must be allowed to complete fully, so the transaction library can track which operations need to be rolled back in the event of failure. This means the application must 'swallow' the error, but record that an error occurred, and then at the end of the concurrent operations, if an error occurred, throw an error to cause the transaction to retry.

    These rules are demonstrated here:

    List<String> docIds = Arrays.asList("doc1", "doc2", "doc3", "doc4", "doc5");
    
    ReactiveCollection coll = collection.reactive();
    
    TransactionResult result = transactions.reactive((ctx) -> {
    
        // Tracks whether all operations were successful
        AtomicBoolean allOpsSucceeded = new AtomicBoolean(true);
    
        // The first mutation must be done in serial, as it also creates a metadata entry
        return ctx.get(coll, docIds.get(0))
                .flatMap(doc -> {
                    JsonObject content = doc.contentAsObject();
                    content.put("value", "updated");
                    return ctx.replace(doc, content);
                })
    
                // Do all other docs in parallel
                .thenMany(Flux.fromIterable(docIds.subList(1, docIds.size()))
                        .flatMap(docId -> ctx.get(coll, docId)
                                        .flatMap(doc -> {
                                            JsonObject content = doc.contentAsObject();
                                            content.put("value", "updated");
                                            return ctx.replace(doc, content);
                                        })
                                        .onErrorResume(err -> {
                                            allOpsSucceeded.set(false);
                                            // App should replace this with logging
                                            err.printStackTrace();
    
                                            // Allow other ops to finish
                                            return Mono.empty();
                                        }),
    
                                // Run these in parallel
                                docIds.size())
    
                // The commit or rollback must also be done in serial
                ).then(Mono.defer(() -> {
                    // Commit iff all ops succeeded
                    if (allOpsSucceeded.get()) {
                        return ctx.commit();
                    } else {
                        throw new RuntimeException("Retry the transaction");
                    }
                }));
    }).block();
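
    The error-handling shape of the second rule can be easier to see without the transactions API around it. The following is a minimal sketch using only java.util.concurrent, not the Couchbase library: ConcurrentOpsSketch, runAll and the simulated failure are hypothetical names made up for illustration. It runs the first operation alone, starts the rest concurrently, records failures instead of propagating them, and only reports the outcome once every operation has finished.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Illustrative only: plain java.util.concurrent, not the Couchbase transactions API.
class ConcurrentOpsSketch {

    // Runs `op` on the first id alone, then on the remaining ids concurrently.
    // Returns true only if every operation succeeded.
    // An exception from the first, serial operation propagates to the caller.
    static boolean runAll(List<String> ids, Consumer<String> op) {
        AtomicBoolean allOpsSucceeded = new AtomicBoolean(true);

        // Rule 1: the first operation runs alone, before anything else is started
        op.accept(ids.get(0));

        // Rule 2: start the rest concurrently; each failure is recorded, not propagated
        List<CompletableFuture<Void>> futures = ids.subList(1, ids.size()).stream()
                .map(id -> CompletableFuture.runAsync(() -> op.accept(id))
                        .exceptionally(err -> {
                            allOpsSucceeded.set(false);
                            return null;
                        }))
                .collect(Collectors.toList());

        // Wait for every operation to finish, including the ones that failed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        return allOpsSucceeded.get();
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("doc1", "doc2", "doc3");
        boolean ok = runAll(ids, id -> {
            if (id.equals("doc2")) {
                throw new IllegalStateException("simulated failure on " + id);
            }
        });
        // Inside a transaction lambda, this is the point at which to throw so the attempt is retried
        System.out.println(ok ? "commit" : "retry");
    }
}
```

    In the reactive example above, onErrorResume plays the role of exceptionally here, and the final Mono.defer plays the role of the check on the returned flag.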

    Custom Metadata Collections

    As described above, transactions create and use metadata documents, which are created in the bucket of the first mutated document in the transaction.

    With Couchbase Server 7.0 beta comes support for scopes and collections, providing a more granular way of organising documents.

    You can now optionally use a collection to store the metadata documents, using a new configuration parameter:

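    // Assumed placeholder names; open whichever collection should hold the metadata
    Collection metadataCollection = cluster.bucket("bucket-name").scope("scope-name").collection("collection-name");
    Transactions transactions = Transactions.create(cluster,
            TransactionConfigBuilder.create()
                    .metadataCollection(metadataCollection));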

    When specified:

    • Any transactions created from this Transactions object will create and use metadata in that collection.

    • The asynchronous cleanup started by this Transactions object looks for expired transactions only in this collection.

    Most users will not need to use custom metadata collections, and can continue to use the existing 'out-of-the-box' behaviour. They are provided for these use-cases:

    • The metadata documents contain, for documents involved in each transaction, the document’s key and the name of the bucket, scope and collection it exists on. Some deployments may not want to potentially 'leak' this data across buckets, scopes and collections. In that case, a metadata collection can be created, with appropriate RBAC permissions to control visibility.

    • If the user wants to remove the default collection entirely.

    Support for custom metadata collections in 1.1.3 is at beta level and is marked as @Stability.Volatile, which means the API may need to change as we gather feedback during the Couchbase Server 7.0 beta period. The feature should not be used in any production deployment while it is at beta level. Please take this opportunity to provide feedback on this feature, via either support or our forums.

    Deferred Commits

    The deferred commit feature is currently in alpha, and the API may change.

    Deferred commits allow a transaction to be paused just before the commit point. Optionally, everything required to finish the transaction can then be bundled up into a context that may be serialized into a String or byte array, and deserialized elsewhere (for example, in another process). The transaction can then be committed, or rolled back.

    The intention behind this feature is to allow multiple transactions, potentially spanning multiple databases, to be brought to just before the commit point, and then all committed together.

    Here’s an example of deferring the initial commit and serializing the transaction:

    try {
        TransactionResult result = transactions.run((ctx) -> {
            JsonObject initial = JsonObject.create().put("val", 1);
            ctx.insert(collection, "a-doc-id", initial);
    
            // Defer means don't do a commit right now.  `serialized` in the result will be present.
            ctx.defer();
        });
    
        // Available because ctx.defer() was called
        assert (result.serialized().isPresent());
    
        TransactionSerializedContext serialized = result.serialized().get();
    
        // This is going to store a serialized form of the transaction to pass around
        byte[] encoded = serialized.encodeAsBytes();
    
    } catch (TransactionFailed e) {
        // System.err is used for this example; log failures to your own logging system
        System.err.println("Transaction did not reach commit point");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }

    And then committing the transaction later:

    TransactionSerializedContext serialized = TransactionSerializedContext.createFrom(encoded);
    
    try {
        TransactionResult result = transactions.commit(serialized);
    
    } catch (TransactionFailed e) {
        // System.err is used for this example; log failures to your own logging system
        System.err.println("Transaction did not reach commit point");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }

    Alternatively the transaction can be rolled back:

    TransactionSerializedContext serialized = TransactionSerializedContext.createFrom(encoded);
    
    try {
        TransactionResult result = transactions.rollback(serialized);
    
    } catch (TransactionFailed e) {
        // System.err is used for this example; log failures to your own logging system
        System.err.println("Transaction could not be rolled back");
    
        for (LogDefer err : e.result().log().logs()) {
            System.err.println(err.toString());
        }
    }
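
    The commit and rollback calls above are the building blocks for the "all committed together" intention described earlier. As a rough sketch of that coordination, and not something the transactions library provides, the following plain-Java example prepares every participant, commits them all only if every one was prepared, and otherwise rolls back those that were. DeferredWork, commitTogether and printing are hypothetical names; a real participant would wrap, for instance, a serialized deferred transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: DeferredWork is a hypothetical stand-in for a transaction that can be
// brought to just before its commit point and then either committed or rolled back.
class DeferredCoordinatorSketch {

    interface DeferredWork {
        void prepare() throws Exception; // run up to, but not including, the commit
        void commit();
        void rollback();
    }

    // Prepares every participant; commits all only if every prepare succeeded,
    // otherwise rolls back the ones that were already prepared. Returns true on commit.
    static boolean commitTogether(List<DeferredWork> participants) {
        List<DeferredWork> prepared = new ArrayList<>();
        for (DeferredWork work : participants) {
            try {
                work.prepare();
                prepared.add(work);
            } catch (Exception e) {
                for (DeferredWork p : prepared) {
                    p.rollback();
                }
                return false;
            }
        }
        for (DeferredWork work : prepared) {
            work.commit();
        }
        return true;
    }

    // A trivial participant for the usage example; it only prints what happens to it
    static DeferredWork printing(String name, boolean failOnPrepare) {
        return new DeferredWork() {
            public void prepare() throws Exception {
                if (failOnPrepare) throw new Exception(name + " could not be prepared");
                System.out.println(name + " prepared");
            }
            public void commit() { System.out.println(name + " committed"); }
            public void rollback() { System.out.println(name + " rolled back"); }
        };
    }

    public static void main(String[] args) {
        boolean committed = commitTogether(Arrays.asList(printing("A", false), printing("B", true)));
        System.out.println("committed = " + committed);
    }
}
```

    This sketch deliberately ignores failures during the commit phase itself; an application that needs to handle a participant failing after others have already committed has to decide on its own recovery strategy.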

    The transaction expiry timer (which is configurable) will begin ticking once the transaction starts, and is not paused while the transaction is in a deferred state.
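
    Because the timer keeps running while a transaction is deferred, it can help to carry the start time along with the serialized context and check how much time is left before attempting the commit. The sketch below shows one way to do that bookkeeping in plain Java. DeferredDeadlineSketch and its methods are hypothetical and not part of the transactions library, and the 15-second expiry and 2-second margin in main are example values only; use whatever expiry your configuration actually sets.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: application-side bookkeeping, not part of the transactions library.
class DeferredDeadlineSketch {
    private final Instant startedAt; // when the transaction was started
    private final Duration expiry;   // assumed to match the configured transaction expiry

    DeferredDeadlineSketch(Instant startedAt, Duration expiry) {
        this.startedAt = startedAt;
        this.expiry = expiry;
    }

    // Time left before expiry, never negative
    Duration remaining(Instant now) {
        Duration left = expiry.minus(Duration.between(startedAt, now));
        return left.isNegative() ? Duration.ZERO : left;
    }

    // True if more than `safetyMargin` is left, so a commit attempt is still worthwhile
    boolean worthCommitting(Instant now, Duration safetyMargin) {
        return remaining(now).compareTo(safetyMargin) > 0;
    }

    public static void main(String[] args) {
        // Example values only
        DeferredDeadlineSketch deadline =
                new DeferredDeadlineSketch(Instant.now(), Duration.ofSeconds(15));
        Instant now = Instant.now();
        System.out.println("Remaining: " + deadline.remaining(now));
        System.out.println("Commit now? " + deadline.worthCommitting(now, Duration.ofSeconds(2)));
    }
}
```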
