创建索引、Logstash同步
主文档同步任务脚本、嵌套文档同步任务脚本、主文档作为条件查询、嵌套文档作为条件查询
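创建索引:PUT http://localhost:9200/索引名称
请求体参数(一对多文档定义:properties 中是主文档的属性,subProperties 是一对多关系中的嵌套(nested)文档)。settings 中自定义了两个 pattern 分词器:dot_split 按逗号 "," 切分,dian_split 按点号切分。pattern 的值是 Java 正则表达式,点号必须写成转义形式 "\\.",否则 "." 会匹配任意字符,导致分不出任何词: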
{
  "settings": {
    "analysis": {
      "analyzer": {
        "dot_split": {
          "type": "pattern",
          "pattern": ","
        },
        "dian_split": {
          "type": "pattern",
          "pattern": "\\."
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword"
      },
      "mainCode": {
        "type": "keyword"
      },
      "catalogCode": {
        "type": "keyword"
      },
      "path": {
        "type": "text",
        "analyzer": "dian_split",
        "search_analyzer": "dian_split"
      },
      "shortName": {
        "type": "keyword"
      },
      "desc": {
        "type": "text"
      },
      "createTime": {
        "type": "date"
      },
      "updateTime": {
        "type": "date"
      },
      "subProperties": {
        "type": "nested",
        "properties": {
          "orgCode": {
            "type": "keyword"
          },
          "groupCode": {
            "type": "text",
            "analyzer": "dot_split",
            "search_analyzer": "dot_split"
          }
        }
      }
    }
  }
}
通过 Java API 从 classpath 下的索引参数文件(内容即上面的 JSON)读取索引定义并创建索引:
// Load the index definition (settings + mappings JSON) from the classpath.
// try-with-resources closes the reader, and with it the underlying stream, even when reading
// throws; the charset is pinned to UTF-8 instead of relying on the platform default.
Resource resource = resourceLoader.getResource("classpath:索引参数文件.json");
StringBuilder sourceBuf = new StringBuilder();
try (BufferedReader br = new BufferedReader(
        new InputStreamReader(resource.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
    String data;
    while ((data = br.readLine()) != null) {
        // Keep the line break so parse errors reported by Elasticsearch point at a real line.
        sourceBuf.append(data).append('\n');
    }
}
// Create the index from the loaded definition and report whether the cluster acknowledged it.
CreateIndexRequest index = new CreateIndexRequest(IndexName);
index.source(sourceBuf.toString(), XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(index, RequestOptions.DEFAULT);
return createIndexResponse.isAcknowledged();
Logstash同步
主文档同步任务脚本
input {
  jdbc {
    # Oracle JDBC connection string
    jdbc_connection_string => "jdbc:oracle:thin:@127.0.0.1:1521:orcl"
    # Database user name and password
    jdbc_user => "username"
    jdbc_password => "password"
    # Path to the JDBC driver jar (forward slashes also work on Windows)
    jdbc_driver_library => "D:/soft/repository/com/oracle/database/jdbc/ojdbc8/21.4.0.0.1/ojdbc8-21.4.0.0.1.jar"
    # Driver class name
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    # Fetch the result set in pages of 50000 rows
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
    # Incremental sync: only rows whose updateTime is later than the value persisted by the
    # previous run (:sql_last_value) are fetched.
    # - Oracle reports unquoted identifiers in upper case (MAINCODE, UPDATETIME). Because
    #   lowercase_column_names is false, each column is aliased with a quoted camelCase name so
    #   it matches tracking_column, %{mainCode} below and the field names in the index mapping.
    # - ORDER BY keeps paging deterministic and makes the last row, whose updateTime becomes
    #   the next sql_last_value, the newest one.
    # NOTE(review): the index mapping has a desc field that the search queries, but it is not
    # selected here; if main_table has such a column, add it (DESC is an SQL keyword, so it
    # needs quoting) — TODO confirm the column name.
    statement => 'SELECT id AS "id", mainCode AS "mainCode", catalogCode AS "catalogCode",
                         path AS "path", shortName AS "shortName",
                         createTime AS "createTime", updateTime AS "updateTime"
                  FROM main_table
                  WHERE updateTime > :sql_last_value
                  ORDER BY updateTime, id'
    # Track progress with the value of a result column instead of the last run time
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    # Cron-style schedule (minute hour day-of-month month day-of-week); all * = every minute
    schedule => "* * * * *"
    # Adds a type field with this value to every event (not an Elasticsearch mapping type)
    type => "main_table"
    # File in which sql_last_value is persisted between runs (relative path — presumably
    # resolved against the Logstash working directory; confirm for your deployment)
    last_run_metadata_path => "main_table"
    # Keep the column-name case as returned by the query (see the aliases above)
    lowercase_column_names => false
    # Time zone used to interpret the database's timestamp values
    jdbc_default_timezone => "Asia/Shanghai"
  }
}
output {
  # Write the rows into Elasticsearch
  elasticsearch {
    # Elasticsearch host and port
    hosts => "127.0.0.1:9200"
    # Target index; must be the index created above and the one the nested pipeline writes to
    index => "索引名称"
    # Use the business key mainCode as the document id so both pipelines update the same doc
    document_id => "%{mainCode}"
    # Partial update of the document; create it if it does not exist yet
    action => "update"
    doc_as_upsert => true
  }
}
嵌套文档同步任务脚本
input {
  jdbc {
    # Oracle JDBC connection string
    jdbc_connection_string => "jdbc:oracle:thin:@127.0.0.1:1521:orcl"
    # Database user name and password
    jdbc_user => "username"
    jdbc_password => "password"
    # Path to the JDBC driver jar (forward slashes also work on Windows)
    jdbc_driver_library => "D:/soft/repository/com/oracle/database/jdbc/ojdbc8/21.4.0.0.1/ojdbc8-21.4.0.0.1.jar"
    # Driver class name
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    # Fetch the result set in pages of 50000 rows
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
    # The update below REPLACES the whole subProperties array of a main document, so every
    # sub row of each mainCode that has at least one changed row is fetched, not just the
    # changed rows; otherwise unchanged nested entries would be dropped from the document.
    # - Rows MUST be ordered by mainCode: the aggregate filter emits a group as soon as the
    #   next mainCode appears. orgCode is a secondary key so paging is deterministic
    #   (assumes (mainCode, orgCode) is unique — TODO confirm).
    # - Because of that ordering, the last row's updateTime is not the newest one, so progress
    #   is tracked with syncTime = MAX(updateTime) over the whole result.
    # - Columns are aliased with quoted camelCase names because Oracle reports unquoted
    #   identifiers in upper case and lowercase_column_names is false.
    statement => 'SELECT s.mainCode AS "mainCode", s.groupCode AS "groupCode", s.orgCode AS "orgCode",
                         s.createTime AS "createTime", s.updateTime AS "updateTime",
                         MAX(s.updateTime) OVER () AS "syncTime"
                  FROM sub_table s
                  WHERE s.mainCode IN (SELECT c.mainCode FROM sub_table c
                                       WHERE c.updateTime > :sql_last_value)
                  ORDER BY s.mainCode, s.orgCode'
    # Track progress with the value of a result column instead of the last run time
    use_column_value => true
    tracking_column => "syncTime"
    tracking_column_type => "timestamp"
    # Cron-style schedule (minute hour day-of-month month day-of-week); all * = every minute
    schedule => "* * * * *"
    # Adds a type field with this value to every event (not an Elasticsearch mapping type)
    type => "sub_table"
    # File in which sql_last_value is persisted between runs (relative path — presumably
    # resolved against the Logstash working directory; confirm for your deployment)
    last_run_metadata_path => "sub_table"
    # Keep the column-name case as returned by the query (see the aliases above)
    lowercase_column_names => false
    # Time zone used to interpret the database's timestamp values
    jdbc_default_timezone => "Asia/Shanghai"
  }
}
filter {
  # The aggregate filter requires all rows of one mainCode to arrive consecutively at a single
  # filter instance, so run this pipeline with one worker (pipeline.workers: 1 or -w 1).
  aggregate {
    task_id => "%{mainCode}"
    code => "
      # Key of the main document this group of sub rows belongs to
      map['mainCode'] = event.get('mainCode')
      map['subProperties'] ||= []
      org_code = event.get('orgCode')
      # orgCode identifies a nested entry: keep only the first row seen for each orgCode.
      # Checking the collected entries directly avoids storing a helper list in the map,
      # which would otherwise be indexed as an extra field of the document.
      if !org_code.nil? && map['subProperties'].none? { |sub| sub['orgCode'] == org_code }
        map['subProperties'] << {
          'orgCode' => org_code,
          'groupCode' => event.get('groupCode')
        }
      end
      # Drop the per-row event; only the aggregated map is sent to Elasticsearch
      event.cancel()
    "
    # When a new mainCode arrives, emit the previous group's map as a new event
    push_previous_map_as_event => true
    # A task expires 5 seconds after its first event and its map is then pushed as well;
    # this is how the last group of a run gets emitted
    timeout => 5
  }
}
output {
  # Write the aggregated nested documents into Elasticsearch
  elasticsearch {
    # Elasticsearch host and port
    hosts => "127.0.0.1:9200"
    # Must be the same index as the main-document pipeline, so that subProperties is merged
    # into the main document with the same id
    index => "索引名称"
    # Same document id as the main-document pipeline
    document_id => "%{mainCode}"
    # Partial update of the document; create it if the main row has not been synced yet
    action => "update"
    doc_as_upsert => true
  }
}
主文档作为条件查询
// Phrase match of the search keyword against the main document's desc field, boosted by 5.
MatchPhraseQueryBuilder shortHandQueryBuilder = QueryBuilders
        .matchPhraseQuery("desc", queryMaterialDTO.getSearchKeyword())
        .boost(5f);
嵌套文档作为条件查询
// Condition on the nested documents: match the groupCode of any subProperties entry.
// The field name must agree with the nested mapping of subProperties and with the
// 'groupCode' key written by the nested-document Logstash pipeline.
MatchQueryBuilder groupIdQuery = QueryBuilders.matchQuery("subProperties.groupCode", queryMaterialDTO.getGroupCode());
// Nested fields can only be queried through a nested query on their path. The clause is a
// filter, where relevance scores are discarded, so ScoreMode.None states that explicitly.
boolQueryBuilder.filter(QueryBuilders.nestedQuery("subProperties", groupIdQuery, ScoreMode.None));
参考文章:https://blog.csdn.net/menglinjie/article/details/102984845



