- 通过curl命令
curl -H "Content-Type: application/json" -X PUT ':9200/index1' -d ' { "settings" : { "index" : { # 配置分片数,分片数多时查询速度快,一般分片数与节点数一致 # 分片数只能在索引创建时配置,创建后不可修改 "number_of_shards" : 5, "number_of_replicas" : 1 } }, "mappings": { "properties": { "col1": { "type": "text" }, "col2": { "type": "text" }, "col3": { "type": "date" }, "col4": { "type": "text" } } } }'
- 通过logstash:采用logstash将mysql中的数据库迁至es的某一index时,若该index不存在,则自动创建,但是推荐采用curl命令创建index,可以自行定义index的配置
两种语法均可
curl -XDELETe -u elastic:changeme http://1.3 查看索引配置:9200/index1 curl -X DELETE 'http:// :9200/index1'
由于都是get方法,可以直接在浏览器中打开
- 查看索引中总记录数
http://:9200/index1/_count
- 查看索引的配置、分片数等
http://1.4 修改索引配置:9200/index1/_settings
- 修改 max_result_window:该参数控制了一次ES查询的最大返回数量,默认为10000,很多场景下期望的查询返回超出该值,可通过如下语法进行修改。
curl -H "Content-Type: application/json" -X PUT http://:9200/_settings -d '{ "index" : { "max_result_window" : 100000000}}'
当只想修改某一index的max_result_window,需要在请求中指定index的名称
curl -XPUT http://:9200/index1/_settings -H 'Content-Type: application/json' -d '{"index": {"max_result_window": 1000000}}'
- 修改 max_buckets:该参数控制了一次ES分组查询的最大桶数,当某些应用场景需要使用ES的AGG统计功能时,某些列的分组可能会超过默认的桶数(10000),可通过如下语法进行修改。
curl -XPUT http://:9200/_cluster/settings -H 'Content-Type: application/json' -d '{"persistent": {"search.max_buckets": 300000}}'
- 修改字段属性:当某一场景需要统计某一列分组后的组内数量时,该列必须为fielddata属性才可以进行统计,可采用如下语法进行修改。当然也可以在query的查询语法中进行临时指定。
curl -XPUT http://2. ES客户端:9200/index1/_mapping -H 'Content-Type: application/json' -d '{"properties": {"bill_no": {"type": "text", "fielddata": true}}}'
es客户端提供了类似于一般sql的查询功能,对于不熟悉ES的query语法的用户而言,借助该客户端工具,可以更快的进行学习。当然,该客户端并未实现所有的sql语法,存在一定的局限性。
- 启动elasticsearch-sql-cli客户端,出现下图表明客户端启动成功
/opt/elasticsearch/bin/elasticsearch-sql-cli
2. 输入待查询的sql语句(与mysql的查询方法一致)
index建立后,需要向index中迁移数据,根据来源数据库的不同,这里有2种方法进行迁移。
3.1 从hive中迁移采用hive的jar包,可以方便的将hive中的数据迁移到es中,语法如下,ES的数据是存在hive表中的。
hive -e "ADD JAR /opt/install/elasticsearch-hadoop-7.6.2/dist/elasticsearch-hadoop-7.6.2.jar;
ADD JAR /usr/hdp/current/hive-client/lib/commons-httpclient-3.0.1.jar;
drop table es_data.index1;
CREATE EXTERNAL TABLE es_data.index1(
col1 string,
col2 string,
col3 string,
col4 string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource'='index1',
'es.query'='?q=col3:*',
'es.nodes'='','es.port'='9200',
'es.date.format'='yyyy-MM-dd HH:mm:ss',
'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
);
insert overwrite table es_data.index1 select col1, col2, col3, col4 from data_platform.index1;"
3.2 从mysql中迁移
从mysql中迁移需要借助logstash工具、mysql的jdbc的jar包,这里对于logstash的安装配置不作叙述。
执行命令如下,参数–path.data,当一台服务器中需执行多张表的迁移,且没有配置到一个conf文件中时,此时执行多个logstash命令时需要指定
/usr/local/logstash/bin/logstash -f /opt/merge/merge_index1_to_es.conf --path.data=/opt/index1/
merge_index1_to_es.conf的配置如下,当迁移多表时,可在input中编写多个jdbc配置:
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://:3306/?characterEncoding=UTF8&serverTimezone=UTC"
jdbc_user => "<>"
jdbc_password => "<>"
jdbc_driver_library => "/opt/merge_index1/mysql-connector-java-6.0.6.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
codec => plain { charset => "UTF-8"}
jdbc_default_timezone => "Asia/Shanghai"
statement_filepath => "/opt/merge_index1/sql_to_es_index1.sql"
# 定时配置,分、时...,如下为每2分钟执行一次
schedule => "*/2 * * * *"
type => "jdbc"
record_last_run => true
tracking_column => import_time
last_run_metadata_path => "/opt/merge_index1/index1_last_run.txt"
clean_run => false
}
}
sql_to_es_index1.sql的配置如下,sql_last_value为上一次merge成功的时间:
select * from tender_info_report where import_time>=:sql_last_value
由于logstash的定时最快为每分钟迁移一次,如果mysql源数据刷新间隔更快,可能会遗漏数据,此时可以对sql语句稍作修改,下列语法的含义为:当sql_last_value=2021-11-12 04:59:00.386511000时,执行下一次迁移时,将从mysql中取出import_time>='2021-11-12 04:00:00’的数据。
select * from tender_info where import_time>= str_to_date((:sql_last_value), '%Y-%m-%d %H')
index1_last_run.txt的配置如下:第一次执行时,提供一个空的txt即可,后续每次执行logstash会自动更新时间,形式如下
--- !ruby/object:DateTime '2021-11-12 04:59:00.386511000 Z'4. ES查询
es的高效查询是其被广泛应用的重要原因,特别是非结构化文本查询,mysql这样的数据库在大数据量时基本无法应用like查询,而ES却可以进行相对快速的查询。
4.1 简单查询当查询语法较少,且不使用ES的统计功能时,可以通过url进行查询,使用案例如下
import request # 一条url语法只能执行同一种逻辑运算:and 或者 or,不能既有and条件又有or条件 # 需要注意的是由于ES的分词器的原因,url条件中的双引号、单引号的含义是有区别的 # col2:"百度"——>查询文本中包含百度的记录 # col2:'百度'——>查询文本中包含百、度、百度的记录 url = 'http://4.2 query语法:9200/index1/_search?q=col1:[2021-11-01 TO 2021-11-07] AND col2:"百度"&size=10000&pretty' req = requests.get(url) req_json = req.json()['hits']['hits']
ES的查询语法说明官方文档:https://www.elastic.co/guide/cn/elasticsearch/guide/current/_most_important_queries.html
当某一场景的查询条件为如下的sql语句:
"select count(BILL_NO) from abnormal_result where (( MAIN_G_NAME like '%耳钉%' and PACK_NO='4')) and whole_risk!= 0 limit 10,5"
采用es的语句进行查询时的代码如下:
es = Elasticsearch(["5. GC管理" + ':' + "9200"]) body = { # from, size参数实现了limit 10,5的语法 "from": 10, "size": 5, "query": { # 条件联查,联合查询需要都返回TRUE # 此处bool将实现 (( MAIN_G_NAME like '%耳钉%' and PACK_NO='4')) and whole_risk!= 0 中第二个and的功能 # 其实更容易理解的方式就是有几个and、or就套几层bool,可以一直嵌套下去 "bool": { # should:or查询, must: and查询 # 此处 must语法内将实现( MAIN_G_NAME like '%耳钉%' and PACK_NO='4')中的and功能 "must": [ { # 模糊匹配,以短语形式,若使用"match"的语法,则会返回“耳环”、“钉珠”等结果 # 使用"match_phrase",则返回完全符合“耳钉”的结果 "match_phrase": { "main_g_name": '耳钉' } }, { # 精确匹配,term适用于数字,查字符串时可能无返回值 "term": { "pack_no": '4' } } ], # 不等于 # 此处实现whole_risk!= 0的语法 "must_not": [ { "match": { "whole_risk": "0" } } ] } }, # 统计符合query要求的单据的数量 # cardinality: 去重计数,value_count: 计数 # 此处bill_no需要指定为keyword类型 "aggs": { "count": { "cardinality": { "field": "bill_no.keyword" } } } } result = es.search(index="abnormal_result", body=body)
某次分组统计查询时,返回数据量较大,报错:[parent] Data too large
处理方法:增大内存
vim /opt/elasticsearch/config/jvm.options
# 可根据服务器内存自行配置,默认为1g -Xms2g -Xmx2g
es中jvm的gc管理参考:
https://blog.csdn.net/wang_zhenwei/article/details/50385720
采用如下语法在浏览器中打开查看索引中的fielddata内存情况:可以发现memory_size_in_bytes(占用内存),evictions(驱逐)为0
http://10.30.239.196:9200/abnormal_result/_stats/fielddata?fields=*&pretty
在elasticsearch.yml中配置断路器相关的参数
# 控制cache加载,默认为60%,这个设置高点会不会更好 # 当缓存区大小到达断路器所配置的大小时:会返回Data too large异常 indices.breaker.fielddata.limit: 60% # 配置fieldData的Cache大小,cache到达约定的内存大小时会自动清理 indices.fielddata.cache.size:20%



