Choosing an API
The Couchbase Python SDK offers both asyncio and Twisted APIs for async operation.
Asyncio
To use the Python SDK with asyncio, use the acouchbase module.
As opposed to the synchronous SDK methods, which wait for completion and return `Result` objects, the acouchbase methods return a `Future`.
You may `await` the success or failure of the `Future`.
If a `Future` is awaited, the method awaiting the task must have the `async` keyword in its signature.
`Result` objects are obtained after `await`ing the `Future`, as seen in the examples below.
# needed for cluster creation
from acouchbase.cluster import Cluster
from couchbase.cluster import ClusterOptions
from couchbase.auth import PasswordAuthenticator
async def get_couchbase():
    """Connect to the local cluster and open the ``travel-sample`` bucket.

    Returns:
        A ``(cluster, bucket)`` tuple, handed back only after
        ``bucket.on_connect()`` has been awaited.
    """
    credentials = PasswordAuthenticator("Administrator", "password")
    options = ClusterOptions(credentials)
    cluster = Cluster("couchbase://localhost", options)

    bucket = cluster.bucket("travel-sample")
    # Wait for the bucket connection before returning the handles.
    await bucket.on_connect()
    return cluster, bucket
Asyncio Document (KV) Operations
Key-value and sub-document operations return `Future` objects, which can then be awaited.
The `Future`'s result will always be the relevant `Result` object for the operation performed.
async def kv_operations(collection):
    """Run a get, an upsert and a get for a missing key against ``collection``.

    Args:
        collection: acouchbase collection whose KV methods return awaitables.

    The fetched document is printed, given an ``alias`` field and written
    back; the CAS of the upsert is printed. The final get is only there to
    show how ``DocumentNotFoundException`` is handled.
    """
    key = "hotel_10025"
    res = await collection.get(key)
    hotel = res.content_as[dict]
    print("Hotel: {}".format(hotel))

    hotel["alias"] = "Hostel"
    res = await collection.upsert(key, hotel)
    print("CAS: {}".format(res.cas))

    # handle exception
    try:
        # The result is never used; only the failure path is demonstrated,
        # so neither the result nor the exception is bound to a name.
        await collection.get("not-a-key")
    except DocumentNotFoundException:
        print("Document not found exception caught!")
async def sub_doc_operations(collection):
    """Read, then replace, part of the first review of ``hotel_10025``.

    Args:
        collection: acouchbase collection whose sub-document methods return
            awaitables.
    """
    doc_id = "hotel_10025"

    lookup_specs = [SD.get("reviews[0].ratings")]
    lookup = await collection.lookup_in(doc_id, lookup_specs)
    # 0 is the index handed to content_as; only one spec was submitted.
    ratings = lookup.content_as[dict](0)
    print("Review ratings: {}".format(ratings))

    mutation_specs = [SD.replace("reviews[0].ratings.Rooms", 3.5)]
    mutation = await collection.mutate_in(doc_id, mutation_specs)
    print("CAS: {}".format(mutation.cas))
Asyncio N1QL Operations
The API for issuing N1QL queries is almost identical to the synchronous API.
The notable difference is the use of `async for` rather than `for` when iterating over the results.
async def n1ql_query(cluster):
    """Print up to ten hotels whose country matches a named query parameter.

    Args:
        cluster: acouchbase cluster; its query result is consumed with
            ``async for``.

    Parsing failures and any other Couchbase error are printed, not raised.
    """
    statement = "SELECT h.* FROM `travel-sample`.inventory.hotel h WHERE h.country=$country LIMIT 10"
    try:
        options = QueryOptions(named_parameters={"country": "United Kingdom"})
        rows = cluster.query(statement, options)
        async for row in rows:
            print("Found row: {}".format(row))
    except ParsingFailedException as err:
        print(err)
    except CouchbaseException as err:
        print(err)
Asyncio Search Operations
The API for issuing full text search queries is almost identical to the synchronous API.
The notable difference is the use of `async for` rather than `for` when iterating over the results.
async def search_query(cluster):
    """Print each hit for the query string "swanky", then the total row count.

    Args:
        cluster: acouchbase cluster; its search result is consumed with
            ``async for``.

    Any ``CouchbaseException`` is printed, not raised.
    """
    try:
        query = search.QueryStringQuery("swanky")
        hits = cluster.search_query("travel-sample-index", query)
        async for hit in hits:
            print("Found row: {}".format(hit))
        # Metadata is read only after the rows have been iterated.
        total_rows = hits.metadata().metrics.total_rows
        print("Reported total rows: {}".format(total_rows))
    except CouchbaseException as err:
        print(err)
Asyncio Analytics Operations
The API for issuing analytics queries is almost identical to the synchronous API.
The notable difference is the use of `async for` rather than `for` when iterating over the results.
async def analytics_query(cluster):
    """Print up to ten airports whose country matches a named parameter.

    Args:
        cluster: acouchbase cluster; its analytics result is consumed with
            ``async for``.

    Any ``CouchbaseException`` is printed, not raised.
    """
    statement = "SELECT a.* FROM `travel-sample`.inventory.airports a WHERE a.country = $country LIMIT 10"
    try:
        options = AnalyticsOptions(named_parameters={"country": "France"})
        rows = cluster.analytics_query(statement, options)
        async for row in rows:
            print("Found row: {}".format(row))
    except CouchbaseException as err:
        print(err)
Twisted
To use the Python SDK with the "Twisted" framework, use the txcouchbase module.
As opposed to the synchronous SDK methods, which wait for completion and return `Result` objects, the txcouchbase methods return a Twisted `Deferred`.
You may configure the `Deferred` with callback and errback handlers.
`Result` objects are propagated to the callback, as seen in the examples below.
# Build the Twisted-flavoured cluster handle for the local node.
cluster = TxCluster(
    connection_string="couchbase://localhost",
    authenticator=PasswordAuthenticator("Administrator", "password"),
)

# Open the travel-sample bucket on that cluster.
bucket = cluster.bucket("travel-sample")
Twisted Document (KV) Operations
Key-value and sub-document operations return `Deferred` objects, which can then be configured with callback and errback handlers.
The `Deferred`'s result will always be the relevant `Result` object for the operation performed.
def on_kv_ok(result):
    """Callback for KV operations.

    Prints the document body when ``result`` is a ``GetResult``; for any
    other result it prints the CAS value instead.
    """
    if not isinstance(result, GetResult):
        print("CAS: {}".format(result.cas))
        return
    print("Document: {}".format(result.content_as[dict]))
def on_kv_error(error):
    """Errback for KV operations: print the error it was given."""
    message = "Operation had an error.\nError: {}".format(error)
    print(message)
def kv_operations(collection):
    """Issue a get, an upsert and a get for "not-a-key" on ``collection``.

    Each call returns a Deferred, which is wired to ``on_kv_ok`` as callback
    and ``on_kv_error`` as errback. Nothing is returned.
    """
    key = "hotel_10025"
    fetch = collection.get(key)
    fetch.addCallback(on_kv_ok).addErrback(on_kv_error)

    new_hotel = {
        "title": "Couchbase",
        "name": "The Couchbase Hotel",
        "address": "Pennington Street",
        "directions": None,
        "phone": "+44 1457 855449",
    }
    store = collection.upsert("hotel_98765", new_hotel)
    store.addCallback(on_kv_ok).addErrback(on_kv_error)

    missing = collection.get("not-a-key")
    missing.addCallback(on_kv_ok).addErrback(on_kv_error)
def on_sd_ok(result, idx=0):
    """Callback for sub-document operations.

    Args:
        result: value delivered by the Deferred.
        idx: index passed to ``content_as[dict]`` when ``result`` is a
            ``LookupInResult``; defaults to 0.

    Prints the looked-up content for a ``LookupInResult``; for any other
    result it prints the CAS value instead.
    """
    if not isinstance(result, LookupInResult):
        print("CAS: {}".format(result.cas))
        return
    print("Sub-document: {}".format(result.content_as[dict](idx)))
def on_sd_error(error):
    """Errback for sub-document operations: print the error it was given."""
    message = "Operation had an error.\nError: {}".format(error)
    print(message)
def sub_doc_operations(collection):
    """Issue a sub-document lookup and a sub-document replace on ``hotel_10025``.

    Each call returns a Deferred, which is wired to ``on_sd_ok`` as callback
    and ``on_sd_error`` as errback. Nothing is returned.
    """
    key = "hotel_10025"

    lookup = collection.lookup_in(key, [SD.get("reviews[0].ratings")])
    lookup.addCallback(on_sd_ok).addErrback(on_sd_error)

    mutation = collection.mutate_in(
        key, [SD.replace("reviews[0].ratings.Rooms", 3.5)])
    mutation.addCallback(on_sd_ok).addErrback(on_sd_error)
Twisted N1QL Operations
The API for issuing N1QL queries is almost identical to the synchronous API. The notable difference is the use of a callback in order to handle iterating over results.
def handle_n1ql_results(result):
    """Callback: print every row yielded by ``result.rows()``."""
    for row in result.rows():
        line = "Query row: {}".format(row)
        print(line)
def n1ql_query(cluster):
    """Issue a N1QL query for up to two hotels matching a named parameter.

    The Deferred returned by ``cluster.query`` gets ``handle_n1ql_results``
    as its callback; no errback is attached and nothing is returned.
    """
    statement = "SELECT h.* FROM `travel-sample`.inventory.hotel h WHERE h.country=$country LIMIT 2"
    options = QueryOptions(named_parameters={"country": "United Kingdom"})
    pending = cluster.query(statement, options)
    pending.addCallback(handle_n1ql_results)
Twisted Search Operations
The API for issuing full text search queries is almost identical to the synchronous API. The notable difference is the use of a callback in order to handle iterating over results.
def handle_search_results(result):
    """Callback: print every row yielded by ``result.rows()``."""
    for row in result.rows():
        line = "Search row: {}".format(row)
        print(line)
def search_query(cluster):
    """Run the query string "swanky" against ``travel-sample-index``.

    The Deferred returned by ``cluster.search_query`` gets
    ``handle_search_results`` as its callback; no errback is attached and
    nothing is returned.
    """
    query = search.QueryStringQuery("swanky")
    pending = cluster.search_query("travel-sample-index", query)
    pending.addCallback(handle_search_results)
Twisted Analytics Operations
The API for issuing analytics queries is almost identical to the synchronous API. The notable difference is the use of a callback in order to handle iterating over results.
def handle_analytics_results(result):
    """Callback: print every row yielded by ``result.rows()``."""
    for row in result.rows():
        line = "Analytics row: {}".format(row)
        print(line)
def analytics_query(cluster):
    """Issue an analytics query for up to ten airports matching a named parameter.

    The Deferred returned by ``cluster.analytics_query`` gets
    ``handle_analytics_results`` as its callback; no errback is attached and
    nothing is returned.
    """
    statement = "SELECT id, country FROM `travel-sample`.inventory.airports a WHERE a.country = $country LIMIT 10"
    options = AnalyticsOptions(named_parameters={"country": "France"})
    pending = cluster.analytics_query(statement, options)
    pending.addCallback(handle_analytics_results)