The driver exposes an asynchronous API that lets you write programs in a fully non-blocking manner. Asynchronous methods return instances of Guava's ListenableFuture, which can be conveniently chained and composed.
Here is a short example that opens a session and runs a query asynchronously:
import com.datastax.driver.core.*;
import com.google.common.base.Function;
import com.google.common.util.concurrent.*;
ListenableFuture<Session> session = cluster.connectAsync();
// Use transform with an AsyncFunction to chain an async operation after another:
ListenableFuture<ResultSet> resultSet = Futures.transform(session,
new AsyncFunction<Session, ResultSet>() {
public ListenableFuture<ResultSet> apply(Session session) throws Exception {
return session.executeAsync("select release_version from system.local");
}
});
// Use transform with a simple Function to apply a synchronous computation on the result:
ListenableFuture<String> version = Futures.transform(resultSet,
new Function<ResultSet, String>() {
public String apply(ResultSet rs) {
return rs.one().getString("release_version");
}
});
// Use a callback to perform an action once the future is complete:
Futures.addCallback(version, new FutureCallback<String>() {
public void onSuccess(String version) {
System.out.printf("Cassandra version: %s%n", version);
}
public void onFailure(Throwable t) {
System.out.printf("Failed to retrieve the version: %s%n",
t.getMessage());
}
});
If you consume a ResultSet in a callback, be aware that iterating the rows will trigger synchronous queries as you page through the results. To avoid this, use getAvailableWithoutFetching to limit the iteration to the current page, and fetchMoreResults to get a future to the next page (see also the section on paging).
Here is a full example:
Statement statement = new SimpleStatement("select * from foo").setFetchSize(20);
ListenableFuture<Void> future = Futures.transform(
session.executeAsync(statement),
iterateFirst());
// Unfortunately we have to special-case for the first page because the signatures of the
// futures differ.
// In 3.0, fetchMoreResults() will return a Future<ResultSet> to avoid this.
private static AsyncFunction<ResultSet, Void> iterateFirst() {
return new AsyncFunction<ResultSet, Void>() {
@Override
public ListenableFuture<Void> apply(ResultSet rs) throws Exception {
return iterate(rs, 1).apply(null);
}
};
}
private static AsyncFunction<Void, Void> iterate(final ResultSet rs, final int page) {
return new AsyncFunction<Void, Void>() {
@Override
public ListenableFuture<Void> apply(Void v) throws Exception {
// How far we can go without triggering the blocking fetch:
int remainingInPage = rs.getAvailableWithoutFetching();
System.out.printf("Starting page %d (%d rows)%n", page, remainingInPage);
for (Row row : rs) {
System.out.printf("[page %d - %d] row = %s%n", page, remainingInPage, row);
if (--remainingInPage == 0)
break;
}
System.out.printf("Done page %d%n", page);
boolean wasLastPage = rs.getExecutionInfo().getPagingState() == null;
if (wasLastPage) {
System.out.println("Done iterating");
return Futures.immediateFuture(null);
} else {
ListenableFuture<Void> future = rs.fetchMoreResults();
return Futures.transform(future, iterate(rs, page + 1));
}
}
};
}
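The shape of that recursion (consume the current page, then chain the fetch of the next one instead of blocking) can also be sketched without the driver. The following is a minimal, JDK-only illustration that swaps in java.util.concurrent.CompletableFuture for Guava's ListenableFuture; the class AsyncPagingSketch, the in-memory PAGES data and the fetchPage helper are hypothetical stand-ins, not part of the driver API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncPagingSketch {
    // Hypothetical stand-in for a paged result: three pages of rows.
    static final List<List<String>> PAGES = Arrays.asList(
            Arrays.asList("a", "b"),
            Arrays.asList("c", "d"),
            Arrays.asList("e"));

    // Hypothetical asynchronous fetch of a single page.
    static CompletableFuture<List<String>> fetchPage(int index) {
        return CompletableFuture.supplyAsync(() -> PAGES.get(index));
    }

    // Consume page `index`, then chain the next page without blocking.
    static CompletableFuture<Void> iterate(int index, List<String> sink) {
        return fetchPage(index).thenCompose(rows -> {
            sink.addAll(rows); // only touches rows that are already available
            boolean wasLastPage = index == PAGES.size() - 1;
            return wasLastPage
                    ? CompletableFuture.<Void>completedFuture(null)
                    : iterate(index + 1, sink);
        });
    }

    // Blocks only here, at the very end, to hand the rows back to the caller.
    static List<String> collectAll() {
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        iterate(0, sink).join();
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(collectAll());
    }
}
```

Because every page arrives through the same future type in this sketch, a single recursive function is enough and no special case is needed for the first page.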
If your callback is slow, consider providing a separate executor. Otherwise the callback might run on one of the driver's I/O threads, blocking I/O operations for other requests while it is running:
ListenableFuture<String> result = Futures.transform(resultSet,
new Function<ResultSet, String>() {
public String apply(ResultSet rs) {
return someVeryLongComputation(rs);
}
}, myCustomExecutor);
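To see the effect of passing an executor, here is a minimal JDK-only sketch that uses CompletableFuture in place of Guava's ListenableFuture. The thread names "fake-io-thread" and "callback-thread" and the class CallbackExecutorSketch are illustrative assumptions; the first executor merely stands in for one of the driver's I/O threads.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackExecutorSketch {
    // Returns the name of the thread that actually ran the callback.
    static String callbackThreadName() {
        // Stand-in for a driver I/O thread that completes the future.
        ExecutorService ioThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io-thread"));
        // Dedicated executor for slow callbacks.
        ExecutorService callbackPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-thread"));
        try {
            CompletableFuture<String> result = CompletableFuture
                    .supplyAsync(() -> "some result", ioThread)
                    // The callback is handed to callbackPool, not run on ioThread.
                    .thenApplyAsync(v -> Thread.currentThread().getName(), callbackPool);
            return result.join();
        } finally {
            ioThread.shutdown();
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Callback ran on: " + callbackThreadName());
    }
}
```

The callback reports "callback-thread", so the thread that completed the future stays free to serve other requests while the callback runs.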
Avoid blocking operations in callbacks, especially if you don't provide a separate executor. This could easily lead to deadlock if the thread that's supposed to complete the blocking call is also the thread that's waiting on it:
ListenableFuture<ResultSet> resultSet = Futures.transform(session,
new Function<Session, ResultSet>() {
public ResultSet apply(Session session) {
// Synchronous operation in a callback.
// DON'T DO THIS! It might deadlock.
return session.execute("select release_version from system.local");
}
});
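The mechanism behind that deadlock can be reproduced with plain JDK executors. The sketch below is an illustration under stated assumptions, not driver code: a single-threaded executor stands in for an I/O thread, and the class BlockingCallbackSketch is hypothetical. A timeout is added only so that the demonstration terminates; a real blocking call without one would wait forever.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingCallbackSketch {
    // Returns true if the blocking call made inside the callback could not complete.
    static boolean blockingCallbackStalls() {
        // Stand-in for a single I/O thread.
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> callback = ioThread.submit(() -> {
                // This "query" can only be completed by the same thread,
                // which is currently busy running this callback.
                Future<String> query = ioThread.submit(() -> "result");
                try {
                    query.get(200, TimeUnit.MILLISECONDS); // synchronous wait
                    return false;
                } catch (TimeoutException e) {
                    return true; // the wait could never have succeeded
                }
            });
            return callback.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            ioThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Blocking call stalled: " + blockingCallbackStalls());
    }
}
```

The inner task stays queued behind the callback that is waiting for it, which is the same situation as calling session.execute from a callback running on the thread that must deliver the response.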
There are still a few places where the driver will block internally (mainly for historical reasons):
- Cluster#init performs blocking I/O operations. To avoid issues, you should create your Cluster instances while bootstrapping your application, and call init immediately. If you need to create new instances at runtime, make sure this does not happen on an I/O thread.
- If a connection pool is busy, the driver will block until a connection becomes available. To avoid this, set PoolingOptions.poolTimeoutMillis to 0; the driver will then not block and will move to the next host immediately.
- If the session is set to a specific keyspace (either at startup or by issuing a USE statement on a running session), the keyspace needs to be propagated to any newly created connection. This is done when the connection is first borrowed from the connection pool, and currently blocks. To avoid any issue, only use a session with no keyspace set (i.e. created by Cluster#connect()). This will be addressed in JAVA-893.
- Trying to read fields from a query trace will block if the trace hasn't been fetched already.