Choosing an API

    The Couchbase Python SDK offers both asyncio and Twisted APIs for async operation.

    Asyncio

    To use the Python SDK with asyncio, use the acouchbase module. Unlike the synchronous SDK methods, which wait for completion and return Result objects, the acouchbase methods return a Future. You can await the Future to learn whether the operation succeeded or failed; any function that awaits it must be declared with the async keyword. The Result object is obtained by awaiting the Future, as seen in the examples below.
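
    The relationship between a Future and its result can be sketched with the standard library alone. In this minimal sketch, fake_get is a hypothetical stand-in for an SDK call (it is not part of acouchbase). It only illustrates that a scheduled operation is a Future, that awaiting it yields the result, and that a failure surfaces as an exception at the await.

```python
import asyncio


async def fake_get(key):
    # Hypothetical stand-in for an SDK call; it is NOT part of acouchbase.
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    if key == "not-a-key":
        raise KeyError(key)
    return {"id": key}


async def main():
    # Scheduling the coroutine gives back a Future-like Task, not a result.
    pending = asyncio.ensure_future(fake_get("hotel_10025"))
    is_future = asyncio.isfuture(pending)

    # Awaiting suspends this coroutine until the Future resolves.
    result = await pending

    # A failed Future raises its exception at the await point.
    try:
        await asyncio.ensure_future(fake_get("not-a-key"))
        failed = False
    except KeyError:
        failed = True
    return is_future, result, failed


outcome = asyncio.run(main())
print(outcome)
```

    Only main needs to be started from synchronous code; everything it awaits runs on the same event loop.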

    Create an acouchbase Cluster
    # needed for cluster creation
    from acouchbase.cluster import Cluster
    from couchbase.cluster import ClusterOptions
    from couchbase.auth import PasswordAuthenticator
    
    
    async def get_couchbase():
        cluster = Cluster(
            "couchbase://localhost",
            ClusterOptions(PasswordAuthenticator("Administrator", "password")))
        bucket = cluster.bucket("travel-sample")
        await bucket.on_connect()
    
        return cluster, bucket

    Asyncio Document (KV) Operations

    Key-value and sub-document operations return Future objects that can be awaited. Awaiting a Future yields the Result object for the operation performed, or raises the exception if the operation failed.

    KV - Operations
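    # needed for the exception handling below
    from couchbase.exceptions import DocumentNotFoundException


    async def kv_operations(collection):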
        key = "hotel_10025"
        res = await collection.get(key)
        hotel = res.content_as[dict]
        print("Hotel: {}".format(hotel))
    
        hotel["alias"] = "Hostel"
        res = await collection.upsert(key, hotel)
        print("CAS: {}".format(res.cas))
    
        # handle exception
        try:
            res = await collection.get("not-a-key")
        except DocumentNotFoundException:
            print("Document not found exception caught!")

    Sub-document Operations
    # needed for the sub-document specs
    import couchbase.subdocument as SD


    async def sub_doc_operations(collection):
        key = "hotel_10025"
        res = await collection.lookup_in(key,
                                         [SD.get("reviews[0].ratings")])
    
        print("Review ratings: {}".format(res.content_as[dict](0)))
    
        res = await collection.mutate_in(key,
                                         [SD.replace("reviews[0].ratings.Rooms", 3.5)])
        print("CAS: {}".format(res.cas))
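
    Because each operation is awaitable, several lookups can be started together and awaited as a group. The sketch below assumes the SDK's Futures behave like ordinary asyncio awaitables. fake_get and get_many are hypothetical names, and fake_get stands in for collection.get so that the example runs without a cluster.

```python
import asyncio


async def fake_get(key):
    # Hypothetical stand-in for collection.get; not an SDK function.
    await asyncio.sleep(0)
    if key == "not-a-key":
        raise LookupError("document not found: " + key)
    return {"id": key}


async def get_many(keys):
    # Start every lookup before awaiting any of them.
    # return_exceptions=True keeps one failure from discarding the rest.
    results = await asyncio.gather(
        *(fake_get(k) for k in keys), return_exceptions=True)

    found = {}
    missing = []
    # gather() returns results in the same order as its arguments.
    for key, res in zip(keys, results):
        if isinstance(res, Exception):
            missing.append(key)
        else:
            found[key] = res
    return found, missing


found, missing = asyncio.run(
    get_many(["hotel_10025", "not-a-key", "hotel_10026"]))
print(found, missing)
```

    Collecting failures instead of raising on the first one is a design choice; drop return_exceptions=True if a single missing document should abort the whole batch.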

    Asyncio N1QL Operations

    The API for issuing N1QL queries is almost identical to the synchronous API. The call to query() itself is not awaited; the notable difference is that the rows are iterated with async for rather than for.

    N1QL Query
    async def n1ql_query(cluster):
        try:
            result = cluster.query(
                "SELECT h.* FROM `travel-sample`.inventory.hotel h WHERE h.country=$country LIMIT 10",
                QueryOptions(named_parameters={"country": "United Kingdom"}))
    
            async for row in result:
                print("Found row: {}".format(row))
        except ParsingFailedException as ex:
            print(ex)
        except CouchbaseException as ex:
            print(ex)
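
    The async for pattern can be tried without a cluster. In the sketch below, fake_rows is a hypothetical async generator standing in for a streaming query result (it is not an SDK object), and collect is an illustrative helper. It shows rows being consumed one at a time, stopping early, and draining a result with an async comprehension.

```python
import asyncio


async def fake_rows():
    # Hypothetical stand-in for a streaming query result; not an SDK object.
    for name in ("Hotel A", "Hotel B", "Hotel C"):
        await asyncio.sleep(0)  # pretend each row arrives from the network
        yield {"name": name}


async def collect(limit):
    rows = []
    # `async for` awaits each row in turn, so other tasks can run in between.
    async for row in fake_rows():
        rows.append(row)
        if len(rows) == limit:
            break  # stop early once enough rows have been read
    return rows


async def main():
    first_two = await collect(2)
    # An async comprehension is a compact way to drain a whole result.
    names = [row["name"] async for row in fake_rows()]
    return first_two, names


first_two, names = asyncio.run(main())
print(first_two, names)
```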

    Asyncio Search Operations

    The API for issuing full-text search queries is almost identical to the synchronous API. The notable difference is the use of async for rather than for when iterating over the results.

    Search Query
    async def search_query(cluster):
        try:
            result = cluster.search_query(
                "travel-sample-index", search.QueryStringQuery("swanky"))
    
            async for row in result:
                print("Found row: {}".format(row))
            print("Reported total rows: {}".format(
                result.metadata().metrics.total_rows))
    
        except CouchbaseException as ex:
            print(ex)

    Asyncio Analytics Operations

    The API for issuing analytics queries is almost identical to the synchronous API. The notable difference is the use of async for rather than for when iterating over the results.

    Analytics Query
    async def analytics_query(cluster):
        try:
            result = cluster.analytics_query(
                "SELECT a.* FROM `travel-sample`.inventory.airports a WHERE a.country = $country LIMIT 10",
                AnalyticsOptions(named_parameters={"country": "France"}))
            async for row in result:
                print("Found row: {}".format(row))
        except CouchbaseException as ex:
            print(ex)

    Twisted

    To use the Python SDK with the Twisted framework, use the txcouchbase module. Unlike the synchronous SDK methods, which wait for completion and return Result objects, the txcouchbase methods return a Twisted Deferred. You can attach callback and errback handlers to the Deferred. On success, the Result object is passed to the callback, as seen in the examples below.

    Create a txcouchbase Cluster
    # create a cluster object
    cluster = TxCluster(
        connection_string='couchbase://localhost',
        authenticator=PasswordAuthenticator(
            'Administrator',
            'password'))
    
    # create a bucket object
    bucket = cluster.bucket('travel-sample')

    Twisted Document (KV) Operations

    Key-value and sub-document operations return Deferred objects, to which callback and errback handlers can be attached. The Deferred’s result is always the Result object for the operation performed.

    KV - Operations
    def on_kv_ok(result):
        if isinstance(result, GetResult):
            print("Document: {}".format(result.content_as[dict]))
        else:
            print("CAS: {}".format(result.cas))
    
    
    def on_kv_error(error):
        print("Operation had an error.\nError: {}".format(error))
    
    
    def kv_operations(collection):
        key = "hotel_10025"
        collection.get(key).addCallback(on_kv_ok).addErrback(on_kv_error)
    
        new_hotel = {
            "title": "Couchbase",
            "name": "The Couchbase Hotel",
            "address": "Pennington Street",
            "directions": None,
            "phone": "+44 1457 855449",
        }
    
        collection.upsert("hotel_98765", new_hotel).addCallback(
            on_kv_ok).addErrback(on_kv_error)
    
        collection.get("not-a-key").addCallback(on_kv_ok).addErrback(on_kv_error)

    Sub-document Operations
    def on_sd_ok(result, idx=0):
        if isinstance(result, LookupInResult):
            print("Sub-document: {}".format(result.content_as[dict](idx)))
        else:
            print("CAS: {}".format(result.cas))
    
    
    def on_sd_error(error):
        print("Operation had an error.\nError: {}".format(error))
    
    
    def sub_doc_operations(collection):
        key = "hotel_10025"
    
        collection.lookup_in(key, [SD.get("reviews[0].ratings")]).addCallback(
            on_sd_ok).addErrback(on_sd_error)
    
        collection.mutate_in(key, [SD.replace("reviews[0].ratings.Rooms", 3.5)]).addCallback(
            on_sd_ok).addErrback(on_sd_error)

    Twisted N1QL Operations

    The API for issuing N1QL queries is almost identical to the synchronous API. The notable difference is that the results are iterated inside a callback attached to the returned Deferred.

    N1QL Query
    def handle_n1ql_results(result):
        for r in result.rows():
            print("Query row: {}".format(r))
    
    
    def n1ql_query(cluster):
        cluster.query(
            "SELECT h.* FROM `travel-sample`.inventory.hotel h WHERE h.country=$country LIMIT 2",
            QueryOptions(named_parameters={"country": "United Kingdom"})).addCallback(handle_n1ql_results)
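
    The example above attaches only a callback, so a failed query would go unhandled. A minimal sketch of pairing the row handler with an errback follows, mirroring the addCallback/addErrback chaining used in the KV examples. handle_rows, handle_failure, and FakeResult are hypothetical names; FakeResult is a stub exposing only the rows() method, so the handlers can be exercised without Twisted or a cluster.

```python
collected = []
errors = []


def handle_rows(result):
    # Callback: copy the rows out of the result object.
    collected.extend(result.rows())
    return result  # pass the result on to any later callback in the chain


def handle_failure(failure):
    # Errback: record the failure instead of letting it go unhandled.
    errors.append(str(failure))


class FakeResult:
    # Hypothetical stub exposing only the rows() method used above.
    def __init__(self, rows):
        self._rows = rows

    def rows(self):
        return list(self._rows)


# With a real Deferred the handlers would be attached like this:
#   cluster.query(statement).addCallback(handle_rows).addErrback(handle_failure)
# Here they are called directly so the sketch runs on its own.
handle_rows(FakeResult([{"name": "Hotel A"}, {"name": "Hotel B"}]))
handle_failure(RuntimeError("query failed"))
print(collected, errors)
```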

    Twisted Search Operations

    The API for issuing full-text search queries is almost identical to the synchronous API. The notable difference is that the results are iterated inside a callback attached to the returned Deferred.

    Search Query
    def handle_search_results(result):
        for r in result.rows():
            print("Search row: {}".format(r))
    
    
    def search_query(cluster):
        cluster.search_query(
            "travel-sample-index", search.QueryStringQuery("swanky")).addCallback(handle_search_results)

    Twisted Analytics Operations

    The API for issuing analytics queries is almost identical to the synchronous API. The notable difference is that the results are iterated inside a callback attached to the returned Deferred.

    Analytics Query
    def handle_analytics_results(result):
        for r in result.rows():
            print("Analytics row: {}".format(r))
    
    
    def analytics_query(cluster):
        cluster.analytics_query(
            "SELECT id, country FROM `travel-sample`.inventory.airports a WHERE a.country = $country LIMIT 10",
            AnalyticsOptions(named_parameters={"country": "France"})).addCallback(handle_analytics_results)