storm-executor-impl

2121SC@SDUSC

I. Implementation classes (interface implementations):

The analysis is given in the comments.
(1) class BatchAsyncResultHandler

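public class BatchAsyncResultHandler implements AsyncResultHandler<List<Tuple>> {

    // Results reported by asynchronous callbacks, waiting to be flushed.
    private ConcurrentLinkedQueue<ExecutionResultCollector> completed;

    private ExecutionResultHandler handler;

    /**
     * Creates a new {@link BatchAsyncResultHandler} instance.
     */
    public BatchAsyncResultHandler(ExecutionResultHandler handler) {
        this.handler = handler;
        this.completed = new ConcurrentLinkedQueue<>();
    }

    /**
     * Responsible for failing the specified inputs.
     * The default method does nothing; here a FailedCollector is queued
     * and processed on the next flush.
     */
    @Override
    public void failure(Throwable t, List<Tuple> input) {
        completed.offer(new ExecutionResultCollector.FailedCollector(input, t));
    }

    /** Queues a SucceedCollector for the specified inputs. */
    @Override
    public void success(List<Tuple> input) {
        completed.offer(new ExecutionResultCollector.SucceedCollector(input));
    }

    /** Drains the queue, passing each completed result to the handler. */
    @Override
    public void flush(final OutputCollector collector) {
        ExecutionResultCollector poll;
        while ((poll = completed.poll()) != null) {
            poll.handle(collector, handler);
        }
    }
}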
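The class above follows a queue-then-flush pattern: success and failure only enqueue a result, and flush drains the queue later, presumably on the thread that owns the OutputCollector. A minimal sketch of that pattern using only the JDK is shown here; QueueingResultHandler, Completed, and the string output are hypothetical names for illustration and are not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for the handler above: callbacks enqueue, flush drains. */
class QueueingResultHandler<T> {

    /** One completed result: the input plus its error (null on success). */
    static final class Completed<T> {
        final T input;
        final Throwable error;

        Completed(T input, Throwable error) {
            this.input = input;
            this.error = error;
        }
    }

    // Thread-safe, non-blocking queue: callback threads offer, one thread polls.
    private final ConcurrentLinkedQueue<Completed<T>> completed = new ConcurrentLinkedQueue<>();

    void success(T input) {
        completed.offer(new Completed<>(input, null));
    }

    void failure(Throwable t, T input) {
        completed.offer(new Completed<>(input, t));
    }

    /** Drains everything queued so far, in arrival order, and describes each result. */
    List<String> flush() {
        List<String> out = new ArrayList<>();
        Completed<T> next;
        while ((next = completed.poll()) != null) {
            if (next.error == null) {
                out.add("ack " + next.input);
            } else {
                out.add("fail " + next.input + ": " + next.error.getMessage());
            }
        }
        return out;
    }
}
```

Because poll() removes each element, a second flush with no new results returns an empty list, so the same result is never reported twice.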

(2) class SingleAsyncResultHandler

public class SingleAsyncResultHandler implements AsyncResultHandler<Tuple> {

    // Results reported by asynchronous callbacks, waiting to be flushed.
    private ConcurrentLinkedQueue<ExecutionResultCollector> completed;

    private ExecutionResultHandler handler;

    /**
     * Creates a new {@link SingleAsyncResultHandler} instance.
     */
    public SingleAsyncResultHandler(ExecutionResultHandler handler) {
        this.handler = handler;
        this.completed = new ConcurrentLinkedQueue<>();
    }

    /**
     * Responsible for failing the specified input.
     * The default method does nothing; here a FailedCollector is queued
     * and processed on the next flush.
     */
    @Override
    public void failure(Throwable t, Tuple input) {
        completed.offer(new ExecutionResultCollector.FailedCollector(input, t));
    }

    /** Queues a SucceedCollector for the specified input. */
    @Override
    public void success(Tuple input) {
        completed.offer(new ExecutionResultCollector.SucceedCollector(input));
    }

    /** Drains the queue, passing each completed result to the handler. */
    @Override
    public void flush(final OutputCollector collector) {
        ExecutionResultCollector poll;
        while ((poll = completed.poll()) != null) {
            poll.handle(collector, handler);
        }
    }
}
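What poll.handle(collector, handler) does is not shown in this article. One plausible reading, assumed here and not taken from the Storm source, is that each queued result knows how to report itself, so flush never needs to check whether a result succeeded or failed. The sketch below illustrates that kind of dispatch; RecordingCollector, ResultCollector, Succeeded, and Failed are hypothetical names, and tuples are reduced to strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sink standing in for an output collector; it only records calls. */
class RecordingCollector {
    final List<String> events = new ArrayList<>();

    void ack(String tuple) {
        events.add("ack:" + tuple);
    }

    void fail(String tuple) {
        events.add("fail:" + tuple);
    }
}

/** Hypothetical result type: each variant reports itself to the collector. */
interface ResultCollector {
    void handle(RecordingCollector collector);
}

final class Succeeded implements ResultCollector {
    private final List<String> inputs;

    Succeeded(List<String> inputs) {
        this.inputs = inputs;
    }

    @Override
    public void handle(RecordingCollector collector) {
        // Every input of a successful execution is acknowledged.
        for (String tuple : inputs) {
            collector.ack(tuple);
        }
    }
}

final class Failed implements ResultCollector {
    private final List<String> inputs;
    private final Throwable cause;

    Failed(List<String> inputs, Throwable cause) {
        this.inputs = inputs;
        this.cause = cause;
    }

    Throwable cause() {
        return cause;
    }

    @Override
    public void handle(RecordingCollector collector) {
        // Every input of a failed execution is failed so it can be replayed.
        for (String tuple : inputs) {
            collector.fail(tuple);
        }
    }
}
```

With this shape, the single-tuple and batch handlers can share the same drain loop and differ only in how many inputs each queued result carries.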

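II. Service for executing statements asynchronously:

Asynchronously executes all statements associated with the specified input.
Once all queries succeed, the input is passed to handler#onSuccess;
if any one of them fails, the input is passed to handler#onFailure
(in the code these are the calls handler.success(input) and handler.failure(t, input)).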

public List<SettableFuture<T>> execAsync(List<Statement> statements, final T input) {

    List<SettableFuture<T>> settableFutures = new ArrayList<>(statements.size());

    for (Statement s : statements) {
        settableFutures.add(execAsync(s, input, AsyncResultHandler.NO_OP_HANDLER));
    }

    // Succeeds only if every statement succeeds; fails if any one of them fails.
    ListenableFuture<List<T>> allAsList = Futures.allAsList(settableFutures);
    Futures.addCallback(allAsList, new FutureCallback<List<T>>() {
        @Override
        public void onSuccess(List<T> inputs) {
            handler.success(input);
        }

        @Override
        public void onFailure(Throwable t) {
            handler.failure(t, input);
        }
    }, executorService);
    return settableFutures;
}
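The all-or-nothing notification above can be sketched with the JDK alone. This version swaps Guava's Futures.allAsList for CompletableFuture.allOf, so it is an analogy and not the code Storm uses; AllOrFail and whenAll are hypothetical names. One behavioral difference to keep in mind: allOf waits for every future to finish before reporting a failure, whereas Guava's allAsList fails as soon as any input fails.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class AllOrFail {

    /**
     * Calls onSuccess(input) once every future has completed normally,
     * or onFailure(error, input) if at least one completed exceptionally.
     */
    static <T> CompletableFuture<Void> whenAll(List<CompletableFuture<?>> futures,
                                               T input,
                                               Consumer<T> onSuccess,
                                               BiConsumer<Throwable, T> onFailure) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
        return all.whenComplete((ignored, error) -> {
            if (error == null) {
                onSuccess.accept(input);
            } else {
                // error is a CompletionException wrapping the first failure's cause
                onFailure.accept(error, input);
            }
        });
    }
}
```

As in the original method, the per-statement futures carry no handler of their own; only the combined future notifies the caller, so the input is reported exactly once.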

Asynchronously executes the specified batch statement. The inputs are passed to the {@link #handler} once the query succeeds or fails.

public SettableFuture<T> execAsync(final Statement statement, final T inputs) {
    return execAsync(statement, inputs, handler);
}






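The fragment below is the per-statement loop body of an execAsync overload that works on lists: statements and inputs are indexed by i, asyncContext tracks the whole batch, and settableFuture is returned at the end.

        // Acquire a slot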
        if (asyncContext.acquire()) {
            try {
                pending.incrementAndGet();
                final T input = inputs.get(i);
                final Statement statement = statements.get(i);
                ResultSetFuture future = session.executeAsync(statement);
                Futures.addCallback(future, new FutureCallback<ResultSet>() {
                    @Override
                    public void onSuccess(ResultSet result) {
                        try {
                            handler.success(input, result);
                        } catch (Throwable throwable) {
                            asyncContext.exception(throwable);
                        } finally {
                            pending.decrementAndGet();
                            asyncContext.release();
                        }
                    }

                    @Override
                    public void onFailure(Throwable throwable) {
                        try {
                            handler.failure(throwable, input);
                        } catch (Throwable throwable2) {
                            asyncContext.exception(throwable2);
                        } finally {
                            asyncContext
                                .exception(throwable)
                                .release();
                            pending.decrementAndGet();
                            LOG.error(String.format("Failed to execute statement '%s' ", statement), throwable);
                        }
                    }
                }, executorService);
            } catch (Throwable throwable) {
                asyncContext.exception(throwable)
                            .release();
                pending.decrementAndGet();
                break;
            }
        }
    }
    return settableFuture;
}
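The slot handling in the fragment above is a semaphore throttle: a permit is taken before each request starts and returned when its callback runs, so the number of in-flight requests never exceeds the number of permits. A minimal JDK-only sketch of that idea follows; ThrottledSubmitter and its methods are hypothetical names, and CompletableFuture stands in for the driver's ResultSetFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class ThrottledSubmitter {
    private final Semaphore throttle;
    private final AtomicInteger pending = new AtomicInteger();

    ThrottledSubmitter(int maxInFlight) {
        this.throttle = new Semaphore(maxInFlight);
    }

    /** Blocks until a permit is free, starts the task, and frees the permit on completion. */
    <R> CompletableFuture<R> submit(Supplier<CompletableFuture<R>> task) {
        throttle.acquireUninterruptibly();
        pending.incrementAndGet();
        CompletableFuture<R> started;
        try {
            started = task.get();
        } catch (RuntimeException e) {
            // The request never started, so undo the bookkeeping before rethrowing.
            pending.decrementAndGet();
            throttle.release();
            throw e;
        }
        // Runs on success and on failure, like the finally blocks in the callbacks above.
        return started.whenComplete((result, error) -> {
            pending.decrementAndGet();
            throttle.release();
        });
    }

    int pendingTasks() {
        return pending.get();
    }

    int freeSlots() {
        return throttle.availablePermits();
    }
}
```

Releasing the permit on both outcomes matters: if a failed request kept its permit, the throttle would gradually lose capacity and later submissions would block forever.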

Returns the number of tasks that have been started but have not yet completed.

public int getPendingTasksSize() {
    return this.pending.intValue();
}

public void shutdown() {
    if (!executorService.isShutdown()) {
        LOG.info("shutting down async handler executor");
        this.executorService.shutdownNow();
    }
}

private static class AsyncContext<T> {
    private final List<T> inputs;
    private final SettableFuture<List<T>> future;
    // Number of requests that have not finished yet.
    private final AtomicInteger latch;
    private final List<Throwable> exceptions;
    // Limits how many requests may be in flight at once.
    private final Semaphore throttle;

    AsyncContext(List<T> inputs, Semaphore throttle, SettableFuture<List<T>> settableFuture) {
        this.inputs = inputs;
        this.latch = new AtomicInteger(inputs.size());
        this.throttle = throttle;
        this.exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
        this.future = settableFuture;
    }

    public boolean acquire() {
        throttle.acquireUninterruptibly();

        // Do not start new requests if an exception has already been recorded.

        if (exceptions.size() > 0) {
            latch.decrementAndGet();
            throttle.release();
            return false;
        }
        return true;
    }

    public AsyncContext release() {
        int remaining = latch.decrementAndGet();
        if (remaining == 0) {
            if (exceptions.size() == 0) {
                future.set(inputs);
            } else {
                future.setException(new MultiFailedException(exceptions));
            }

        }
        throttle.release();
        return this;
    }

    public AsyncContext exception(Throwable throwable) {
        this.exceptions.add(throwable);
        return this;
    }
}
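AsyncContext combines a countdown with error collection: each finished request decrements the counter, and whichever request brings it to zero settles the shared future, successfully if no exception was recorded and exceptionally otherwise. The sketch below shows that mechanism without the throttle, using only the JDK. CompletionTracker is a hypothetical name, CompletableFuture replaces Guava's SettableFuture, and a RuntimeException with suppressed causes replaces MultiFailedException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class CompletionTracker<T> {
    private final List<T> inputs;
    private final AtomicInteger remaining;
    private final List<Throwable> errors =
        Collections.synchronizedList(new ArrayList<Throwable>());
    private final CompletableFuture<List<T>> result = new CompletableFuture<>();

    CompletionTracker(List<T> inputs) {
        this.inputs = inputs;
        this.remaining = new AtomicInteger(inputs.size());
    }

    /** Records a failure; it is reported once the last request finishes. */
    CompletionTracker<T> error(Throwable t) {
        errors.add(t);
        return this;
    }

    /** Marks one request as finished; the last one to finish settles the result. */
    CompletionTracker<T> done() {
        if (remaining.decrementAndGet() == 0) {
            if (errors.isEmpty()) {
                result.complete(inputs);
            } else {
                RuntimeException all = new RuntimeException(errors.size() + " request(s) failed");
                // Iterating a synchronized list requires holding its lock.
                synchronized (errors) {
                    for (Throwable t : errors) {
                        all.addSuppressed(t);
                    }
                }
                result.completeExceptionally(all);
            }
        }
        return this;
    }

    CompletableFuture<List<T>> result() {
        return result;
    }
}
```

Returning this from error and done allows the chained call style seen in the callbacks, for example tracker.error(t).done().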