我用
scripted_metric弹性的解决了。而且,的
index状态已从初始状态更改。
剧本:
{ "size": 0, "aggs": { "intentPathsCountAgg": { "scripted_metric": { "init_script": "state.messagesList = new ArrayList();", "map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);", "combine_script": "return state", "reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet()) " } } }}格式化脚本(为了提高可读性-使用.ts):
scripted_metric: { init_script: 'state.messagesList = new ArrayList();', map_script: ` long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = [ 'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value ]; state.messagesList.add(currentMessage);`, combine_script: 'return state', reduce_script: ` List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); boolean[] ifElseWorkaround = new boolean[1]; for (state in states) { messages.addAll(state.messagesList); } messages.stream().forEach(message -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = true; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else { intentsMap.put(conversation.getValue().intentsPath, 1L); } }); return intentsMap.entrySet().stream().map(intentPath -> [ 'path': intentPath.getKey().toString(), 'count': intentPath.getValue() ]).collect(Collectors.toSet())`答案:
{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 11, "relation": "eq" }, "max_score": null, "hits": [] }, "aggregations": { "intentPathsCountAgg": { "value": [ { "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2 }, { "smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3 -> smallTalk.greet4": 1 }, { "smallTalk.greet -> smallTalk.greet2": 1 } ] } }}


