2121SC@SDUSC
一、实现类、接口实现:
分析见注释
(1)class BatchAsyncResultHandler
public class BatchAsyncResultHandler implements AsyncResultHandler> { private ConcurrentlinkedQueue
completed; private ExecutionResultHandler handler;
创建一个新的{@link BatchAsyncResultHandler}实例
public BatchAsyncResultHandler(ExecutionResultHandler handler) {
this.handler = handler;
this.completed = new ConcurrentlinkedQueue<>();
}
该方法负责指定的输入失败。
默认方法不做操作。
@Override public void failure(Throwable t, Listinput) { completed.offer(new ExecutionResultCollector.FailedCollector(input, t)); } @Override public void success(List input) { completed.offer(new ExecutionResultCollector.SucceedCollector(input)); } @Override public void flush(final OutputCollector collector) { ExecutionResultCollector poll; while ((poll = completed.poll()) != null) { poll.handle(collector, handler); } } }
(2) class SingleAsyncResultHandler
public class SingleAsyncResultHandler implements AsyncResultHandler{ private ConcurrentlinkedQueue completed; private ExecutionResultHandler handler;
创建一个新的{@link SingleAsyncResultHandler}实例。
public SingleAsyncResultHandler(ExecutionResultHandler handler) {
this.handler = handler;
this.completed = new ConcurrentlinkedQueue<>();
}
该方法对指定的输入失败负责。
默认方法不做任何操作。
@Override
public void failure(Throwable t, Tuple input) {
completed.offer(new ExecutionResultCollector.FailedCollector(input, t));
}
@Override
public void success(Tuple input) {
completed.offer(new ExecutionResultCollector.SucceedCollector(input));
}
@Override
public void flush(final OutputCollector collector) {
ExecutionResultCollector poll;
while ((poll = completed.poll()) != null) {
poll.handle(collector, handler);
}
}
}
二、服务异步执行语句:
异步执行与指定输入关联的所有语句。
一旦所有查询都成功,输入将被传递给 handler#success;
如果其中任何一个查询失败,输入将被传递给 handler#failure。
public List> execAsync(List statements, final T input) { List > settableFutures = new ArrayList<>(statements.size()); for (Statement s : statements) { settableFutures.add(execAsync(s, input, AsyncResultHandler.NO_OP_HANDLER)); } ListenableFuture > allAsList = Futures.allAsList(settableFutures); Futures.addCallback(allAsList, new FutureCallback
>() { @Override public void onSuccess(List
inputs) { handler.success(input); } @Override public void onFailure(Throwable t) { handler.failure(t, input); } }, executorService); return settableFutures; }
异步执行指定的批处理语句;一旦查询成功或失败,输入将被传递给 {@link #handler}。
public SettableFutureexecAsync(final Statement statement, final T inputs) { return execAsync(statement, inputs, handler); } // Acquire a slot获得一个槽 if (asyncContext.acquire()) { try { pending.incrementAndGet(); final T input = inputs.get(i); final Statement statement = statements.get(i); ResultSetFuture future = session.executeAsync(statement); Futures.addCallback(future, new FutureCallback () { @Override public void onSuccess(ResultSet result) { try { handler.success(input, result); } catch (Throwable throwable) { asyncContext.exception(throwable); } finally { pending.decrementAndGet(); asyncContext.release(); } } @Override public void onFailure(Throwable throwable) { try { handler.failure(throwable, input); } catch (Throwable throwable2) { asyncContext.exception(throwable2); } finally { asyncContext .exception(throwable) .release(); pending.decrementAndGet(); LOG.error(String.format("Failed to execute statement '%s' ", statement), throwable); } } }, executorService); } catch (Throwable throwable) { asyncContext.exception(throwable) .release(); pending.decrementAndGet(); break; } } } return settableFuture; }
返回当前正在执行但尚未完成的任务数量。
/**
 * Returns the current value of the {@code pending} counter as an {@code int}.
 *
 * @return the value of {@code pending} at the time of the call
 */
public int getPendingTasksSize() {
    final int inFlight = pending.intValue();
    return inFlight;
}
/**
 * Shuts down the executor service via {@code shutdownNow()} unless it has
 * already been shut down, logging the action first.
 */
public void shutdown() {
    // Nothing to do when the executor was already shut down.
    if (executorService.isShutdown()) {
        return;
    }
    LOG.info("shutting down async handler executor");
    executorService.shutdownNow();
}
private static class AsyncContext {
private final List inputs;
private final SettableFuture> future;
private final AtomicInteger latch;
private final List exceptions;
private final Semaphore throttle;
AsyncContext(List inputs, Semaphore throttle, SettableFuture> settableFuture) {
this.inputs = inputs;
this.latch = new AtomicInteger(inputs.size());
this.throttle = throttle;
this.exceptions = Collections.synchronizedList(new ArrayList());
this.future = settableFuture;
}
public boolean acquire() {
throttle.acquireUninterruptibly();
如果有异常,不要启动新的请求
if (exceptions.size() > 0) {
latch.decrementAndGet();
throttle.release();
return false;
}
return true;
}
public AsyncContext release() {
int remaining = latch.decrementAndGet();
if (remaining == 0) {
if (exceptions.size() == 0) {
future.set(inputs);
} else {
future.setException(new MultiFailedException(exceptions));
}
}
throttle.release();
return this;
}
public AsyncContext exception(Throwable throwable) {
this.exceptions.add(throwable);
return this;
}
}



