Logstash
Logstash 基于java开发,是一个数据抽取转化工具。一般工作方式为c/s架构,client端安装在需要收集信息的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch或其他组件上去。
Beats
Beats 平台集合了多种单一用途数据采集器。它们从成百上千或成千上万台机器和系统向 Logstash 或 Elasticsearch 发送数据。
Beats由如下组成:
Packetbeat:轻量型网络数据采集器,用于深挖网线上传输的数据,了解应用程序动态。Packetbeat 是一款轻量型网络数据包分析器,能够将数据发送至 Logstash 或 Elasticsearch。其支 持ICMP (v4 and v6)、DNS、HTTP、Mysql、PostgreSQL、Redis、MongoDB、Memcache等协议。
Filebeat:轻量型日志采集器。当您要面对成百上千、甚至成千上万的服务器、虚拟机和容器生成的日志时,请告别 SSH 吧。Filebeat 将为您提供一种轻量型方法,用于转发和汇总日志与文件,让简单的事情不再繁杂。
Metricbeat :轻量型指标采集器。Metricbeat 能够以一种轻量型的方式,输送各种系统和服务统计数据,从 CPU 到内存,从 Redis 到 Nginx,不一而足。可定期获取外部系统的监控指标信息,其可以监控、收集 Apache http、HAProxy、MongoDB、MySQL、Nginx、PostgreSQL、Redis、System、Zookeeper等服务。
Winlogbeat:轻量型 Windows 事件日志采集器。用于密切监控基于 Windows 的基础设施上发生的事件。Winlogbeat 能够以一种轻量型的方式,将 Windows 事件日志实时地流式传输至 Elasticsearch 和 Logstash。
Auditbeat:轻量型审计日志采集器。收集您 Linux 审计框架的数据,监控文件完整性。Auditbeat 实时采集这些事件,然后发送到 Elastic Stack 其他部分做进一步分析。
Heartbeat:面向运行状态监测的轻量型采集器。通过主动探测来监测服务的可用性。通过给定 URL 列表,Heartbeat 仅仅询问:网站运行正常吗?Heartbeat 会将此信息和响应时间发送至 Elastic 的其他部分,以进行进一步分析。
Functionbeat:面向云端数据的无服务器采集器。在作为一项功能部署在云服务提供商的功能即服务 (FaaS) 平台上后,Functionbeat 即能收集、传送并监测来自您的云服务的相关数据。
Elastic cloud
基于 Elasticsearch 的软件即服务(SaaS)解决方案。通过 Elastic 的官方合作伙伴使用托管的 Elasticsearch 服务。
2.简单的集群管理
快速检查集群的健康状态
GET /_cat/health?v
快速查看集群中有哪些索引
GET /_cat/indices?v
3.查询
3.1 定制返回字段
就像sql不要select *,而要select name,price from book …一样。
GET /book/_doc/1?__source_includes=name,price
3.2 强制创建
为防止覆盖原有数据,我们在新增时,设置为强制创建,不会覆盖原有文档。
语法:PUT /index/ doc/id/create
PUT /test_index/_doc/1/_create
{
"test_field": "test"
}
3.3 全量替换
执行两次,返回结果中版本号(_version)在不断上升。此过程为全量替换。
PUT /test_index/_doc/1
{
"test_field": "test"
}
实质:旧文档的内容不会立即删除,只是标记为deleted。适当的时机,集群会将这些文档删除。
3.4 删除
DELETE /index/_doc/id
DELETE /test_index/_doc/1/
实质:旧文档的内容不会立即删除,只是标记为deleted。适当的时机,集群会将这些文档删除。
lazy delete
3.5 partial update局部更新
3.5.1 局部更新步骤
-
es内部获取旧文档
-
将传来的文档field更新到旧数据(内存)
-
将旧文档标记为delete,版本+1
-
创建新文档
3.5.2 局部更新相较于全量替换的有点
-
减少网络请求次数 (全量替换 需要客户端与es服务端三次IO)
-
减少网络开销
-
减少了并发冲突
3.6 图解悲观锁与乐观锁机制
-
es内部获取旧文档
-
将传来的文档field更新到旧数据(内存)
-
将旧文档标记为delete,版本+1
-
创建新文档
3.5.2 局部更新相较于全量替换的有点
-
减少网络请求次数 (全量替换 需要客户端与es服务端三次IO)
-
减少网络开销
-
减少了并发冲突
3.6 图解悲观锁与乐观锁机制
减少网络请求次数 (全量替换 需要客户端与es服务端三次IO)
减少网络开销
减少了并发冲突
为控制并发问题,我们通常采用锁机制。分为悲观锁和乐观锁两种机制。
悲观锁:很悲观,所有情况都上锁。此时只有一个线程可以操作数据。具体例子为数据库中的行级锁、表级锁、读锁、写锁等。
特点:优点是方便,直接加锁,对程序透明。缺点是效率低。
乐观锁:很乐观,对数据本身不加锁。提交数据时,通过一种机制验证是否存在冲突,如es中通过版本号验证。
特点:优点是并发能力高。缺点是操作繁琐,在提交数据时,可能反复重试多次。
3.7 图解es内部基于_version乐观锁控制
实验基于_version的版本控制
对于相同id的文档,不管进行增还是删,改都会使版本+1
3.8 es内部并发控制
es后台,主从同步时异步多线程的.所以,多个请求时乱序的.
es内部的主从同步,也是基于版本号的,线程1先到,线程2后到,副本数据是没有问题的,线程2先到,线程1后到 : 副本分片先把数据改为test3,version=3.线程1请求到了,副本分片,判断请求的version=1太旧了,就会丢弃这个请求,这样就可以保持主从同步了,有点暴力.....
4.java High Client 4.1 异步执行所有的由esClient发起的请求都可以是异步的,这里就不再赘述.
同步执行 :
//同步查询 SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT);
异步执行
//listener ActionListenerlistener = new ActionListener () { //成功ack @Override public void onResponse(SearchResponse searchResponse) { } //失败ack @Override public void onFailure(Exception e) { } }; //异步查询 esClient.searchAsync(searchRequest,RequestOptions.DEFAULT,listener);
//match
@Test
void match() throws IOException {
SearchRequest searchRequest = new SearchRequest("hotel");
searchRequest.source().query(QueryBuilders.matchQuery("name", "7天"));
//listener
ActionListener listener = new ActionListener() {
//成功ack
@Override
public void onResponse(SearchResponse searchResponse) {
}
//失败ack
@Override
public void onFailure(Exception e) {
}
};
//异步查询
esClient.searchAsync(searchRequest,RequestOptions.DEFAULT,listener);
//条数
long totalCount = response.getHits().getTotalHits().value;
System.err.println("查询出总条数为" + totalCount);
//查询结果
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
}
}
4.2 新增文档结果校验
//新增文档
@Test
void createdocument() throws IOException {
User user = new User();
user.setId(1L);
user.setName("wuweinan");
IndexRequest indexRequest = new IndexRequest("user").id("1");
indexRequest.source(JSON.toJSonString(user), XContentType.JSON);
IndexResponse indexResponse = esClient.index(indexRequest, RequestOptions.DEFAULT);
if(indexResponse.getResult() == DocWriteResponse.Result.CREATED){
DocWriteResponse.Result result = indexResponse.getResult();
System.out.println("CREATE"+result);
}else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED){
DocWriteResponse.Result result = indexResponse.getResult();
System.out.println("UPDATE"+result);
}
}
5.es天生的集群特性
对于分片,全都是针对索引定制的
5.1 es分布式基础
5.1.1 es对复杂分布式机制的透明隐藏特性
-
分布式机制:分布式数据存储及共享。
-
分片机制:数据存储到哪个分片,副本数据写入。
-
集群发现机制:cluster discovery。新启动es实例,自动加入集群。
-
shard负载均衡:大量数据写入及查询,es会将数据平均分配。
-
shard副本:新增副本数,分片重分配。
-
分布式机制:分布式数据存储及共享。
-
分片机制:数据存储到哪个分片,副本数据写入。
-
集群发现机制:cluster discovery。新启动es实例,自动加入集群。
-
shard负载均衡:大量数据写入及查询,es会将数据平均分配。
-
shard副本:新增副本数,分片重分配。
5.1.2 Elasticsearch的垂直扩容与水平扩容
垂直扩容:使用更加强大的服务器替代老服务器。但单机存储及运算能力有上线。且成本直线上升。如10t服务器1万。单个10T服务器可能20万。
水平扩容:采购更多服务器,加入集群。大数据。(推荐)
5.1.3 增减或减少节点时的数据rebalance
新增或减少es实例时,es集群会将数据重新分配。
5.1.4 master节点
功能:
-
创建删除节点
-
创建删除索引
5.1.5 节点对等的分布式架构
-
节点对等,每个节点都能接收所有的请求
-
自动请求路由
-
响应收集
节点对等,每个节点都能接收所有的请求
自动请求路由
响应收集
这个和之前有点出入,默认的节点并不是协调节点
5.1.6 横向扩容
-
分片自动负载均衡,分片向空闲机器转移。
-
每个节点存储更少分片,系统资源给与每个分片的资源更多,整体集群性能提高。
-
扩容极限:节点数大于整体分片数,则必有空闲机器。
-
超出扩容极限时,可以增加副本数,如设置副本数为2,总共3*3=9个分片。9台机器同时运行,存储和搜索性能更强。容错性更好。
-
容错性:只要一个索引的所有主分片在,集群就就可以运行。
分片自动负载均衡,分片向空闲机器转移。
每个节点存储更少分片,系统资源给与每个分片的资源更多,整体集群性能提高。
扩容极限:节点数大于整体分片数,则必有空闲机器。
超出扩容极限时,可以增加副本数,如设置副本数为2,总共3*3=9个分片。9台机器同时运行,存储和搜索性能更强。容错性更好。
容错性:只要一个索引的所有主分片在,集群就就可以运行。
3个主分片,3个副本分片,两台机器
3个主分片,3分副本分片,3台机器
3个主分片,3分副本分片,6台机器
3个主分片,3分副本分片,7台机器 : 一主一副本的分片形式,第七台机器会置空,所以此时想让此时第7台以上的机器工作,可以增加主分片的副本分片数量
3个主分片,6分副本分片,9台机器 :
5.1.7 es容错机制 master选举,replica容错,数据恢复
以3分片,2副本数,3节点为例介绍。
-
master node宕机,自动master选举,集群为red
-
replica容错:新master将replica提升为primary shard,yellow
-
重启宕机node,master copy replica到该node,使用原有的shard并同步宕机后的修改,green
情景 :
node1宕机,P0shard没有了,所有主分片不是全active,集群状态是red
容错第一步 :
重新选举master节点.承担master相关功能
容错第二部:
1.新master将丢弃的主分片的某个副本分片提升为主分片
2.集群状态为yellow
3.现在缺少副本分片
容错第三步:
1.重启故障node
2.新master会感知到新节点的加入,进行部分f赋值
5.2 存储机制
5.2.1 数据路由
一个文档,最终会落在主分片的一个分片上,到底应该在哪一个分片?这就是数据路由。
路由算法 :
shard = hash(routing) % number_of_primary_shards
哈希值对主分片数取模。
举例:
对一个文档经行crud时,都会带一个路由值 routing number。默认为文档_id(可能是手动指定,也可能是自动生成)。
存储1号文档,经过哈希计算,哈希值为2,此索引有3个主分片,那么计算2%3=2,就算出此文档在P2分片上。
决定一个document在哪个shard上,最重要的一个值就是routing值,默认是_id,也可以手动指定,相同的routing值,每次过来,从hash函数中,产出的hash值一定是相同的
无论hash值是几,无论是什么数字,对number_of_primary_shards求余数,结果一定是在0~number_of_primary_shards-1之间这个范围内的。0,1,2。
手动指定 routing number
PUT /test_index/_doc/15?routing=num
{
"num": 0,
"tags": []
}
场景:在程序中,架构师可以手动指定已有数据的一个属性为路由值,好处是可以定制一类文档数据存储到一个分片中。缺点是设计不好,会造成数据倾斜。
所以,不同文档尽量放到不同的索引中。剩下的事情交给es集群自己处理。
主分片数量不可变
涉及到以往数据的查询搜索,所以一旦建立索引,主分片数不可变。
5.3 文档的增删改内部机制
增删改可以看做update,都是对数据的改动。一个改动请求发送到es集群,经历以下四个步骤:
(1)客户端选择一个node发送请求过去,这个node就是coordinating node(协调节点)
(2)coordinating node,对document进行路由,将请求转发给对应的node(有primary shard)
(3)实际的node上的primary shard处理请求,然后将数据同步到replica node。
(4)coordinating node,如果发现primary node和所有replica node都搞定之后,就返回响应结果给客户端。
5.4 文档的查询内部机制
1、客户端发送请求到任意一个node,成为coordinate node
2、coordinate node对document进行路由,将请求转发到对应的node,此时会使用round-robin随机轮询算法,在primary shard以及其所有replica中随机选择一个,让读请求负载均衡
3、接收请求的node返回汇总document给coordinate node
4、coordinate node返回document给客户端
5、特殊情况:document如果还在建立索引过程中,可能只有primary shard有,任何一个replica shard都没有,此时可能会导致无法读取到document,但是document完成索引建立之后,primary shard和replica shard就都有了。
5.5 复杂数据类型
5.5.1 multivalue field
{ "tags": [ "tag1", "tag2" ]}
建立索引时与string是一样的,数据类型不能混
5.5.2 empty field
基本类型 : null
数组 : [] 或 [null]
5.5.3 object field
PUT /company/_doc/1
{
"address": {
"country": "china",
"province": "guangdong",
"city": "guangzhou"
},
"name": "jack",
"age": 27,
"join_date": "2019-01-01"
}
address:object类型
查询映射
GET /company/_mapping
{
"company" : {
"mappings" : {
"properties" : {
"address" : {
"properties" : {
"city" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"country" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"province" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"age" : {
"type" : "long"
},
"join_date" : {
"type" : "date"
},
"name" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}
object
{
"address": {
"country": "china",
"province": "guangdong",
"city": "guangzhou"
},
"name": "jack",
"age": 27,
"join_date": "2017-01-01"
}
底层存储格式
{
"name": [jack],
"age": [27],
"join_date": [2017-01-01],
"address.country": [china],
"address.province": [guangdong],
"address.city": [guangzhou]
}
对象数组:
{
"authors": [
{ "age": 26, "name": "Jack White"},
{ "age": 55, "name": "Tom Jones"},
{ "age": 39, "name": "Kitty Smith"}
]
}
存储格式:
{
"authors.age": [26, 55, 39],
"authors.name": [jack, white, tom, jones, kitty, smith]
}
6.索引
6.1 删除索引
为了安全起见,防止恶意删除索引,在配置文件中配置,删除时必须指定索引名:
elasticsearch.yml
action.destructive_requires_name:true
6.2 dynamic mapping
在创建映射的时候可以设置dynamic 属性,如下
true:遇到陌生字段,就进行dynamic mapping
false:新检测到的字段将被忽略。这些字段将不会被索引,因此将无法搜索,但仍将出现在返回点击的源字段中。这些字段不会添加到映射中,必须显式添加新字段。
strict:遇到陌生字段,就报错
我自己觉得应该设置为strict,严格一点,存储结构怎么能随便改变呢.



