在使用了C 一段时间之后,我相信您应该只将批处理用于保持多个表同步。如果您不需要该 功能 ,则完全不要使用批处理,因为这 会* 导致性能下降。
将数据加载到C *的正确方法是异步写入,如果集群无法跟上接收速度,则可以使用可选的反压。您应该用以下方法替换“自定义”批处理方法:
- 执行异步写入
- 控制下有多少机上写
- 写超时时执行一些重试。
要执行异步写入,请使用
.executeAsync方法,该方法将返回一个
ResultSetFuture对象。
为了控制下,有多少个运行中查询只是
ResultSetFuture将从
.executeAsync方法中检索到的对象收集在一个列表中,并且如果列表得到了(此处的计算值),则说1k个元素,然后等待所有这些元素完成后再发出更多写操作。或者,您可以等待第一个完成后再发出更多写操作,以保持列表完整。
最后,您可以在等待操作完成时检查写入失败。在这种情况下,您可以:
- 用相同的超时值再次写入
- 以增加的超时值再次写入
- 等待一段时间,然后使用相同的超时值再次写入
- 等待一段时间,然后以增加的超时值再次写入
从1到4,背压 强度 增加。选择最适合您的情况的一种。
问题更新后编辑
您的插入逻辑对我来说似乎有点混乱:
- 我看不到任何 重试 逻辑
- 如果失败,则不要删除列表中的项目
- 您
while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit)
是错误的,因为仅当发出的查询数>时您才会进入睡眠状态concurrentInsertLimit
,并且由于2.您的线程将仅停留在该位置。 - 你永远不会设置为假
concurrentInsertErrorOccured
我通常会保留(失败的)查询列表,以便稍后重试。这使我可以对查询进行有力的控制,并且当失败的查询开始累积时,我会睡一会儿,然后继续重试它们(最多X次,然后出现严重失败…)。
该列表应该非常动态,例如,当查询失败时,您可以在其中添加项目,而在执行重试时,则可以删除项目。现在,您可以了解群集的限制,并
concurrentInsertLimit根据例如最近一秒内失败查询的平均数量进行调整,或者使用更简单的方法“
如果重试列表中有项目则暂停 ”等。
注释后编辑2
由于您不需要任何重试逻辑,因此我将以这种方式更改代码:
private List<ResultSetFuture> runningInsertList;private static int concurrentInsertLimit = 1000;private static int concurrentInsertSleepTime = 500;...@Overridepublic void executeBatch(Statement statement) throws InterruptedException { if (this.runningInsertList == null) { this.runningInsertList = new ArrayList<>(); } ResultSetFuture future = this.executeAsync(statement); this.runningInsertList.add(future); Futures.addCallback(future, new FutureCallback<ResultSet>() { @Override public void onSuccess(ResultSet result) { runningInsertList.remove(future); } @Override public void onFailure(Throwable t) { runningInsertList.remove(future); concurrentInsertErrorOccured = true; } }, MoreExecutors.sameThreadExecutor()); //Sleep while the currently processing number of inserts is too high while (runningInsertList.size() >= concurrentInsertLimit) { Thread.sleep(concurrentInsertSleepTime); } if (!concurrentInsertErrorOccured) { // Increase your ingestion rate if no query failed so far concurrentInsertLimit += 10; } else { // Decrease your ingestion rate because at least one query failed concurrentInsertErrorOccured = false; concurrentInsertLimit = Max(1, concurrentInsertLimit - 50); while (runningInsertList.size() >= concurrentInsertLimit) { Thread.sleep(concurrentInsertSleepTime); } } return;}您还可以通过用
List<ResultSetFuture>计数器代替来优化过程。
希望能有所帮助。



