栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

关于ES只更新少量数据,不批量处理

关于ES只更新少量数据,不批量处理

之前没有用过es直接上手的  如若有错误请指正

背景:新增数据界面可以单个录入,也可以批量录入。同事图简单,数据录入后,无论单个录入还是批量录入都使用BulkProcessor 进行批量处理

BulkProcessor bulkProcessor = esProcessor.getBulkProcessor();

bulkProcessor.add(new IndexRequest(esname_EN).source(JSONObject.toJSonString(data), XContentType.JSON));

。该方法确实没问题,但是代码执行后会有几秒的反应时间,用户体验不是很好。于是更改了需求,数据插入更新,界面需要实时显示。

开始:

插入更新使用同一个方法,所以一开始查看了官网的方法。

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());

new 部分可以封装成:

 XContentBuilder builder = XContentFactory.jsonBuilder().startObject();

//document是新增或者修改该条数据的全量数据

Map document = esIndexApiService.getdocument(arcTableName, id);

//for循环将所有字段加入builder

for (String key : document .keySet()) {
       builder.field(key, document .get(key));
}

builder.endObject();

// index指esdb,type指_doc,1指_id,官网写法

//IndexRequest indexRequest = new IndexRequest("index", "type", "1"); 

//但是在我的项目中实际是IndexRequest indexRequest = new IndexRequest("esdb", "_doc", "ev2_K30BoZQ27hu-lMdk");

//id这里有坑,后面再说

IndexRequest indexRequest = new IndexRequest("index", "type", "1");

indexRequest .source(builder);

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1");

updateRequest .doc(builder).upsert(indexRequest);

//发送请求

restHighLevelClient.update(updateRequest , RequestOptions.DEFAULT);

该方法确实可以立马更新数据,但是无法自由自动新增!

IndexRequest indexRequest = new IndexRequest("index", "type", "ev2_K30BoZQ27hu-lMdk");

第三个参数id必须指定!这就很不科学了,索引id一般自动生成,也可以自己定义id自增规则,但是除了特殊要求,一般用自带的就行了。

因为无法自由自动新增就舍弃了这个方法。

换为:先删除,再新增:代码如下:

ids用于查询数据库数据,arcTableName是该条数据的表名。

restHighLevelClient框架自动配置好了

public AjaxResult updateByIdsAndTablename2(List ids, String arcTableName) {
        AjaxResult res = new AjaxResult();
        res.put("message", "更新成功!");
        res.put("code", 0);
        List> datas = new ArrayList<>();
        List> document = null;
        List> maps = null;
        if (ids != null && ids.size() > 0) {
            for (String id : ids) {
                // 获取数据库数据
                String sql1 = "select * from " + arcTableName + " where id=" + id;
                maps = jdbcTemplate.queryForList(sql1);
                document = esIndexApiService.getdocument(arcTableName, id);
                if (maps != null && maps.size() > 0) {
                    datas.add(maps.get(0));
                }
            }
        }
        if (datas.size() > 0) {
            if (document.size() != 0) {
                //如果该条数据在索引存在,则先删除
                DeleteRequest deleteRequest = new DeleteRequest(esname_EN, document.get(0).get("_id").toString());
                try {
                    DeleteResponse delete = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    res.put("message", "更新失败!");
                    res.put("code", -1);
                    res.put("data", "数据库中未找到任何数据!");
                    e.printStackTrace();
                }
            }
            for (Map data: datas) {
                data.put("table_name", arcTableName);
                IndexRequest source = new IndexRequest(esname_EN).source(JSONObject.toJSonString(data), XContentType.JSON);
                try {
                    IndexResponse index = restHighLevelClient.index(source, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    res.put("message", "更新失败!");
                    res.put("code", -1);
                    res.put("data", "索引新增失败!");
                    e.printStackTrace();
                }
            }
        }
        return res;
    }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/583265.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号