栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

与MongoDB相比,使用Java驱动程序进行Cassandra批量写入的性能非常差

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

与MongoDB相比,使用Java驱动程序进行Cassandra批量写入的性能非常差

在使用了C 一段时间之后,我相信您应该只将批处理用于保持多个表同步。如果您不需要该 功能 ,则完全不要使用批处理,因为这 * 导致性能下降。

将数据加载到C *的正确方法是异步写入,如果集群无法跟上接收速度,则可以使用可选的反压。您应该用以下方法替换“自定义”批处理方法:

  • 执行异步写入
  • 控制下有多少机上写
  • 写超时时执行一些重试。

要执行异步写入,请使用

.executeAsync
方法,该方法将返回一个
ResultSetFuture
对象。

为了控制下,有多少个运行中查询只是

ResultSetFuture
将从
.executeAsync
方法中检索到的对象收集在一个列表中,并且如果列表得到了(此处的计算值),则说1k个元素,然后等待所有这些元素完成后再发出更多写操作。或者,您可以等待第一个完成后再发出更多写操作,以保持列表完整。

最后,您可以在等待操作完成时检查写入失败。在这种情况下,您可以:

  1. 用相同的超时值再次写入
  2. 以增加的超时值再次写入
  3. 等待一段时间,然后使用相同的超时值再次写入
  4. 等待一段时间,然后以增加的超时值再次写入

从1到4,背压 强度 增加。选择最适合您的情况的一种。


问题更新后编辑

您的插入逻辑对我来说似乎有点混乱:

  1. 我看不到任何 重试 逻辑
  2. 如果失败,则不要删除列表中的项目
  3. while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit)
    是错误的,因为仅当发出的查询数>时您才会进入睡眠状态
    concurrentInsertLimit
    ,并且由于2.您的线程将仅停留在该位置。
  4. 你永远不会设置为假
    concurrentInsertErrorOccured

我通常会保留(失败的)查询列表,以便稍后重试。这使我可以对查询进行有力的控制,并且当失败的查询开始累积时,我会睡一会儿,然后继续重试它们(最多X次,然后出现严重失败…)。

该列表应该非常动态,例如,当查询失败时,您可以在其中添加项目,而在执行重试时,则可以删除项目。现在,您可以了解群集的限制,并

concurrentInsertLimit
根据例如最近一秒内失败查询的平均数量进行调整,或者使用更简单的方法“
如果重试列表中有项目则暂停 ”等。


注释后编辑2

由于您不需要任何重试逻辑,因此我将以这种方式更改代码:

private List<ResultSetFuture> runningInsertList;private static int concurrentInsertLimit = 1000;private static int concurrentInsertSleepTime = 500;...@Overridepublic void executeBatch(Statement statement) throws InterruptedException {    if (this.runningInsertList == null) {        this.runningInsertList = new ArrayList<>();    }    ResultSetFuture future = this.executeAsync(statement);    this.runningInsertList.add(future);    Futures.addCallback(future, new FutureCallback<ResultSet>() {        @Override        public void onSuccess(ResultSet result) { runningInsertList.remove(future);        }        @Override        public void onFailure(Throwable t) { runningInsertList.remove(future); concurrentInsertErrorOccured = true;        }    }, MoreExecutors.sameThreadExecutor());    //Sleep while the currently processing number of inserts is too high    while (runningInsertList.size() >= concurrentInsertLimit) {        Thread.sleep(concurrentInsertSleepTime);    }    if (!concurrentInsertErrorOccured) {        // Increase your ingestion rate if no query failed so far        concurrentInsertLimit += 10;    } else {        // Decrease your ingestion rate because at least one query failed        concurrentInsertErrorOccured = false;        concurrentInsertLimit = Max(1, concurrentInsertLimit - 50);        while (runningInsertList.size() >= concurrentInsertLimit) { Thread.sleep(concurrentInsertSleepTime);        }    }    return;}

您还可以通过用

List<ResultSetFuture>
计数器代替来优化过程。

希望能有所帮助。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/454739.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号