栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

logstash从数据库同步数据到elasticsearch(包含一对多关系)

logstash从数据库同步数据到elasticsearch(包含一对多关系)

目录

- 创建索引
- Logstash同步
  - 主文档同步任务脚本
  - 嵌套文档同步任务脚本
- 主文档作为条件查询
- 嵌套文档作为条件查询

创建索引

PUT http://localhost:9200/索引名称
请求体参数(一对多文档定义:properties 中定义主文档的属性,其中类型为 nested 的 subProperties 用来存放一对多关系中的嵌套文档):

{
    "settings": {
        "analysis": {
            "analyzer": {
                "dot_split": {
                    "type": "pattern",
                    "pattern": ","
                },
                "dian_split": {
                    "type": "pattern",
                    "pattern": "\\."
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "id": {
                "type": "keyword"
            },
            "mainCode": {
                "type": "keyword"
            },
            "catalogCode": {
                "type": "keyword"
            },
            "path":{
                "type": "text",
                "analyzer": "dian_split",
                "search_analyzer": "dian_split"
            },
            "shortName": {
                "type": "keyword"
            },
            "desc": {
                "type": "text"
            },
            "createTime": {
                "type": "date"
            },
            "updateTime": {
                "type": "date"
            },
            "subProperties": {
                "type": "nested",
                "properties": {
                    "orgCode": {
                        "type": "keyword"
                    },
                    "groupCode": {
                        "type": "text",
                        "analyzer": "dot_split",
                        "search_analyzer": "dot_split"
                    }
                }
            }
        }
    }
}

也可以通过 Java API 从 classpath 下的文件中读取上述索引定义并创建索引:

// Load the index definition (settings + mappings JSON) from the classpath and create the index.
// try-with-resources closes the reader chain even when reading fails, and the charset is set
// explicitly so non-ASCII content in the JSON does not depend on the platform default encoding.
Resource resource = resourceLoader.getResource("classpath:索引参数文件.json");
StringBuilder sourceBuf = new StringBuilder();
try (BufferedReader br = new BufferedReader(
		new InputStreamReader(resource.getInputStream(), java.nio.charset.StandardCharsets.UTF_8))) {
	String data;
	while ((data = br.readLine()) != null) {
		// Keep line breaks so the JSON text is passed through unchanged.
		sourceBuf.append(data).append('\n');
	}
}
// Create the index from the loaded JSON source.
CreateIndexRequest index = new CreateIndexRequest(IndexName);
index.source(sourceBuf.toString(), XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(index, RequestOptions.DEFAULT);
// true when the cluster acknowledged the index creation.
return createIndexResponse.isAcknowledged();
Logstash同步

主文档同步任务脚本
input {
	jdbc {
		# Oracle JDBC connection string (this is an Oracle thin URL, not MySQL)
		jdbc_connection_string => "jdbc:oracle:thin:@127.0.0.1:1521:orcl"
		# Database user name and password
		jdbc_user => "username"
		jdbc_password => "password"
		# Path to the Oracle JDBC driver jar. The path separators were lost in the original;
		# adjust to the local location of ojdbc8 (forward slashes also work on Windows).
		jdbc_driver_library => "D:/soft/repository/com/oracle/database/jdbc/ojdbc8/21.4.0.0.1/ojdbc8-21.4.0.0.1.jar"
		# Driver class name
		jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
		# Fetch the result set in pages of jdbc_page_size rows
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# Incremental query: only rows whose updateTime is newer than :sql_last_value, the value
		# persisted from the previous run. ORDER BY updateTime gives paging a stable order and
		# makes the last row processed the newest one, so that is the value recorded for the next run.
		# NOTE(review): the index maps a `desc` field and the search example queries it, but this
		# SELECT does not fetch it. DESC is reserved in Oracle, so add the real column (aliased or
		# quoted) here if it should be indexed.
		statement => "SELECT id,mainCode,catalogCode,path,shortName,createTime,updateTime
		FROM main_table
		WHERE updateTime > :sql_last_value
		ORDER BY updateTime"
		# Track progress with a column value instead of the last run time
		use_column_value => true
		tracking_column => "updateTime"
		tracking_column_type => "timestamp"
		# Cron-style schedule (minute hour day-of-month month day-of-week); all * = every minute
		schedule => "* * * * *"
		# Value of the `type` field added to each event
		type => "main_table"
		# File where the plugin persists sql_last_value between runs
		last_run_metadata_path => "main_table"
		# Keep column names as returned by the database so mainCode/updateTime match the names used here.
		# NOTE(review): Oracle returns unquoted identifiers in upper case - confirm the returned names.
		lowercase_column_names => false
		jdbc_default_timezone => "Asia/Shanghai"
	}
}
output {
	# Write the database rows into Elasticsearch
	elasticsearch {
		# Elasticsearch host and port
		hosts => "127.0.0.1:9200"
		# Target index (the index created above)
		index => "索引名称"
		# Use the business key mainCode as the document id (not an auto-generated id), so the
		# nested-document pipeline updates the same document
		document_id => "%{mainCode}"
		# Create the document if it does not exist yet
		doc_as_upsert => true
		# Partial update rather than full replacement; this event carries no subProperties, so the
		# nested entries written by the other pipeline are kept
		action => "update"
	}
}

嵌套文档同步任务脚本(aggregate 过滤器要求同一 mainCode 的记录按顺序、由单线程处理,因此该管道需以单个 worker 运行,例如 `bin/logstash -f 配置文件 -w 1`)
input {
	jdbc {
		# Oracle JDBC connection string (this is an Oracle thin URL, not MySQL)
		jdbc_connection_string => "jdbc:oracle:thin:@127.0.0.1:1521:orcl"
		# Database user name and password
		jdbc_user => "username"
		jdbc_password => "password"
		# Path to the Oracle JDBC driver jar. The path separators were lost in the original;
		# adjust to the local location of ojdbc8 (forward slashes also work on Windows).
		jdbc_driver_library => "D:/soft/repository/com/oracle/database/jdbc/ojdbc8/21.4.0.0.1/ojdbc8-21.4.0.0.1.jar"
		# Driver class name
		jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
		# Fetch the result set in pages of jdbc_page_size rows
		jdbc_paging_enabled => "true"
		jdbc_page_size => "50000"
		# Inline SQL statement. The rows MUST be ordered by the parent key (mainCode) for the
		# aggregate filter below.
		# The output does a partial update, and Elasticsearch replaces the whole subProperties array.
		# So every sub row of each mainCode that changed since the last run is re-selected, not only
		# the changed rows; otherwise unchanged nested entries would be dropped.
		# syncTime is the newest updateTime in the batch and is used as the tracking column, because
		# ordering by mainCode means the last row is not necessarily the newest one.
		# orgCode is a tiebreaker so that paging sees a stable order.
		# NOTE(review): deleting every sub row of a mainCode does not trigger a resync.
		statement => "SELECT s.mainCode, s.groupCode, s.orgCode, s.createTime, s.updateTime,
			MAX(s.updateTime) OVER () AS syncTime
		FROM sub_table s
		WHERE s.mainCode IN (
			SELECT c.mainCode FROM sub_table c WHERE c.updateTime > :sql_last_value
		)
		ORDER BY s.mainCode, s.orgCode"
		# Track progress with a column value instead of the last run time
		use_column_value => true
		tracking_column => "syncTime"
		tracking_column_type => "timestamp"
		# Cron-style schedule (minute hour day-of-month month day-of-week); all * = every minute
		schedule => "* * * * *"
		# Value of the `type` field added to each event
		type => "sub_table"
		# File where the plugin persists sql_last_value between runs
		last_run_metadata_path => "sub_table"
		# Keep column names as returned by the database so mainCode/orgCode/groupCode match the names used here.
		# NOTE(review): Oracle returns unquoted identifiers in upper case - confirm the returned names.
		lowercase_column_names => false
		jdbc_default_timezone => "Asia/Shanghai"
	}
}
filter {
	# Collapse all rows of one mainCode into a single event that carries a subProperties array.
	# The aggregate filter requires in-order, single-threaded processing: run this pipeline
	# with one worker (-w 1 / pipeline.workers: 1).
	aggregate {
		task_id => "%{mainCode}"
		code => "
			# Unique key of the parent document
			map['mainCode'] = event.get('mainCode')
			map['subProperties'] ||= []
			org_code = event.get('orgCode')
			# orgCode identifies a nested entry: skip rows without one and duplicates
			if org_code != nil && map['subProperties'].none? { |sub| sub['orgCode'] == org_code }
				map['subProperties'] << {
					'orgCode' => org_code,
					'groupCode' => event.get('groupCode')
				}
			end
			# Drop the raw row; only the aggregated map is emitted
			event.cancel()
		"
		# Emit the previous map as an event as soon as a new mainCode starts
		push_previous_map_as_event => true
		# Seconds after a task's first event before its map is pushed; this also flushes the last mainCode
		timeout => 5
	}
}
output {
	# Write the aggregated nested entries into Elasticsearch
	elasticsearch {
		# Elasticsearch host and port
		hosts => "127.0.0.1:9200"
		# Must be the same index that the main-document pipeline writes to, so the nested
		# entries land on the parent documents
		index => "索引名称"
		# One document per mainCode, the same id that the main-document pipeline uses
		document_id => "%{mainCode}"
		# Create the document if it does not exist yet
		doc_as_upsert => true
		# Partial update: subProperties is replaced; fields written by the main pipeline are kept
		action => "update"
	}
}

主文档作为条件查询
MatchPhraseQueryBuilder shortHandQueryBuilder = QueryBuilders.matchPhraseQuery("desc", queryMaterialDTO.getSearchKeyword()).boost(5f);
嵌套文档作为条件查询
// Match the requested group code against the groupCode of each nested subProperties entry.
MatchQueryBuilder groupIdQuery =
		QueryBuilders.matchQuery("subProperties.groupCode", queryMaterialDTO.getGroupCode());
// Wrap it in a nested query: a main document passes when at least one of its subProperties
// entries matches. It is added as a filter clause, so it does not contribute to the score.
boolQueryBuilder.filter(
		QueryBuilders.nestedQuery("subProperties", groupIdQuery, ScoreMode.Avg));

参考文章:https://blog.csdn.net/menglinjie/article/details/102984845

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/712568.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号