Log collection pipeline in this article:
- Filebeat runs in Kubernetes as a DaemonSet;
- Filebeat ships logs to Kafka;
- Logstash consumes the logs from Kafka;
- Logstash sends the logs to ES;
- Kibana pulls the logs from ES and displays them.
Except for Filebeat, which is deployed inside the Kubernetes cluster, all components run directly on a host or in Docker.
1. Deploy Kafka
Reference: "Dokcer 搭建 kafka" (我的喵叫初六的博客, CSDN): https://blog.csdn.net/weixin_38367535/article/details/121103419
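The referenced article boils down to the commands below. In the original, the kafka run command is cut off after the port flag, so everything past "-p 9092" here is an assumption based on typical wurstmeister/kafka usage; adjust the addresses to your own host IP.

```shell
# Pull the images
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

# ZooKeeper, with the resource limits used in the reference
docker run -d --name zookeeper -p 2181:2181 --memory=1024m --cpus=1 -t wurstmeister/zookeeper

# Kafka; the environment variables are assumptions (typical wurstmeister settings),
# pointing the broker at the ZooKeeper above and advertising the host address
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=172.16.105.148:2181 \
  -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.105.148:9092 \
  wurstmeister/kafka
```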
2. Deploy Logstash, ES, and Kibana
Reference: "ELK + Filebeat 7.13" (我的喵叫初六的博客, CSDN): https://blog.csdn.net/weixin_38367535/article/details/119183688
3. Deploy Filebeat
Download the Filebeat manifest:
curl -L -O https://raw.githubusercontent.com/elastic/beats/master/deploy/kubernetes/filebeat-kubernetes.yaml
File contents:
# cat filebeat-kubernetes.yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.inputs:
    - type: container
      paths:
        - /var/log/containers/*.log
      processors:
        - add_kubernetes_metadata:
            host: ${NODE_NAME}
            matchers:
            - logs_path:
                logs_path: "/var/log/containers/"

    # To enable hints based autodiscover, remove `filebeat.inputs` configuration and uncomment this:
    #filebeat.autodiscover:
    #  providers:
    #    - type: kubernetes
    #      node: ${NODE_NAME}
    #      hints.enabled: true
    #      hints.default_config:
    #        type: container
    #        paths:
    #          - /var/log/containers/*${data.kubernetes.container.id}.log

    processors:
      - add_cloud_metadata:
      - add_host_metadata:

    cloud.id: ${ELASTIC_CLOUD_ID}
    cloud.auth: ${ELASTIC_CLOUD_AUTH}

    output.kafka:
      enabled: true
      hosts: ["172.16.105.148:9092"]
      topic: 'k8s-uat-log'
      max_message_bytes: 5242880
      partition.round_robin:
        reachable_only: true
      keep_alive: 120
      required_acks: 1
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: filebeat
  namespace: kube-system
  labels:
    k8s-app: filebeat
spec:
  selector:
    matchLabels:
      k8s-app: filebeat
  template:
    metadata:
      labels:
        k8s-app: filebeat
    spec:
      serviceAccountName: filebeat
      terminationGracePeriodSeconds: 30
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
      - name: filebeat
        image: docker.elastic.co/beats/filebeat:7.4.2
        args: [
          "-c", "/etc/filebeat.yml",
          "-e",
        ]
        env:
        - name: ELASTICSEARCH_HOST
          value: elasticsearch
        - name: ELASTICSEARCH_PORT
          value: "9200"
        - name: ELASTICSEARCH_USERNAME
          value: elastic
        - name: ELASTICSEARCH_PASSWORD
          value: changeme
        - name: ELASTIC_CLOUD_ID
          value:
        - name: ELASTIC_CLOUD_AUTH
          value:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        securityContext:
          runAsUser: 0
          # If using Red Hat OpenShift uncomment this:
          #privileged: true
        resources:
          limits:
            memory: 200Mi
          requests:
            cpu: 100m
            memory: 100Mi
        volumeMounts:
        - name: config
          mountPath: /etc/filebeat.yml
          readOnly: true
          subPath: filebeat.yml
        - name: data
          mountPath: /usr/share/filebeat/data
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: varlog
          mountPath: /var/log
          readOnly: true
      volumes:
      - name: config
        configMap:
          defaultMode: 0640
          name: filebeat-config
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: varlog
        hostPath:
          path: /var/log
      # data folder stores a registry of read status for all files, so we don't send everything again on a Filebeat pod restart
      - name: data
        hostPath:
          # When filebeat runs as non-root user, this directory needs to be writable by group (g+w).
          path: /var/lib/filebeat-data
          type: DirectoryOrCreate
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: filebeat
subjects:
- kind: ServiceAccount
  name: filebeat
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: filebeat
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: filebeat
  namespace: kube-system
subjects:
- kind: ServiceAccount
  name: filebeat
  namespace: kube-system
roleRef:
  kind: Role
  name: filebeat
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: filebeat-kubeadm-config
  namespace: kube-system
subjects:
- kind: ServiceAccount
  name: filebeat
  namespace: kube-system
roleRef:
  kind: Role
  name: filebeat-kubeadm-config
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: filebeat
  labels:
    k8s-app: filebeat
rules:
- apiGroups: [""] # "" indicates the core API group
  resources:
  - namespaces
  - pods
  - nodes
  verbs:
  - get
  - watch
  - list
- apiGroups: ["apps"]
  resources:
  - replicasets
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: filebeat
  # should be the namespace where filebeat is running
  namespace: kube-system
  labels:
    k8s-app: filebeat
rules:
- apiGroups:
  - coordination.k8s.io
  resources:
  - leases
  verbs: ["get", "create", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: filebeat-kubeadm-config
  namespace: kube-system
  labels:
    k8s-app: filebeat
rules:
- apiGroups: [""]
  resources:
  - configmaps
  resourceNames:
  - kubeadm-config
  verbs: ["get"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: filebeat
  namespace: kube-system
  labels:
    k8s-app: filebeat
---
I did not change much in this file. The main change is the image version: when I downloaded the manifest it referenced Filebeat 8.0, which failed to pull, so I switched to 7.4.2.
I also recommend adding a CPU limit to the pod; otherwise an error condition can drive CPU usage very high.
The other change is the output section: I removed the original Elasticsearch output and replaced it with Kafka:
output.kafka:
  enabled: true
  hosts: ["172.16.105.148:9092"]
  topic: 'k8s-uat-log'
  max_message_bytes: 5242880
  partition.round_robin:
    reachable_only: true
  keep_alive: 120
  required_acks: 1
Note that this topic must be created in Kafka beforehand.
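Creating the topic could look like this on the Kafka server; the partition and replication counts are illustrative values, not from the original. Use more than one partition if several Logstash instances will consume the topic.

```shell
# Run inside the kafka container or anywhere the Kafka CLI tools are installed.
# On very old Kafka versions, replace --bootstrap-server with --zookeeper localhost:2181.
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic k8s-uat-log \
  --partitions 3 \
  --replication-factor 1
```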
Start the pods: kubectl apply -f filebeat-kubernetes.yaml
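To verify the DaemonSet came up on every node, generic kubectl commands (not from the original) such as these work:

```shell
# One filebeat pod should be Running on each node
kubectl -n kube-system get pods -l k8s-app=filebeat -o wide

# Tail the logs of the filebeat pods via their label
kubectl -n kube-system logs -l k8s-app=filebeat --tail=50
```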
Once the pods are running normally, check their logs. Output like 2021-11-03T03:47:39.954Z INFO [monitoring] log/log.go:145 Non-zero metrics in the last 30s {"monitoring":xxxxxxxxx means Filebeat is shipping logs to Kafka.
Now verify on the Kafka side:
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic k8s-uat-log --from-beginning
Run this command on the Kafka server.
If log lines scroll past quickly, Kafka is receiving logs from Filebeat. Be quick with Ctrl-C...
4. Configure Logstash
cat config/k8s-uat-log.conf
input {
  kafka {
    bootstrap_servers => ["172.16.105.148:9092"]
    client_id => "k8s-uat-log1"
    group_id => "host_log"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => ["k8s-uat-log"]
    type => "k8s-uat-log"
    codec => json
  }
}
filter {
  mutate {
    # remove fields we do not need
    remove_field => ["@metadata","ecs","stream","input","log","pod-template-hash","uid","architecture","containerized","hostname","os","agent"]
  }
}
output {
  if [type] == "k8s-uat-log" {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "k8s-uat-log-%{+YYYY.MM.dd}"
      timeout => 300
    }
  }
}
The input block reads from Kafka:
- client_id: any name you like.
- group_id: used to load-balance across Logstash instances. If you want to run a second Logstash collecting the same logs, it must use the same group_id but a different client_id.
- When multiple Logstash instances consume the same topic, the topic must have more than one partition, and the number of Logstash instances must not exceed the number of partitions.
- auto_offset_reset => "latest": consume from the last committed offset, so a restarted Logstash does not re-read the topic from the beginning, wasting resources and duplicating data.
- topics: the Kafka topic(s) to subscribe to.
- type: used for matching in the output block; give different inputs different types so the output can route them separately.
- codec: the messages are JSON, so use the json codec.
The filter block removes fields; drop or adjust it to keep whatever fields you need.
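To check, or later increase, the topic's partition count before scaling out Logstash, a sketch using the standard Kafka CLI (run on the Kafka server; the target count of 3 is an example):

```shell
# Show the current partition count and replica assignment
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic k8s-uat-log

# Raise the partition count (it can only be increased, never decreased)
kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic k8s-uat-log --partitions 3
```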
Start Logstash.
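Assuming a standard Logstash directory layout (the install path is not given in the original), starting it with the conf file above looks like:

```shell
# From the Logstash installation directory
bin/logstash -f config/k8s-uat-log.conf

# Or keep it running in the background:
nohup bin/logstash -f config/k8s-uat-log.conf > /dev/null 2>&1 &
```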
When the Logstash log shows something like to the committed offset FetchPosition{offset=104940, startup succeeded; this offset is what the auto_offset_reset => "latest" setting above relates to.
5. Check the result
Open Kibana and add an index pattern. If typing k8s-uat auto-completes the dated indices, Logstash has delivered the logs to ES. Once the index pattern is configured, the log collection pipeline is complete.
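You can also confirm the indices directly against ES before opening Kibana (assuming ES listens on 127.0.0.1:9200, as in the Logstash output above):

```shell
# List the daily indices created by Logstash
curl -s '127.0.0.1:9200/_cat/indices/k8s-uat-log-*?v'

# Peek at a single document
curl -s '127.0.0.1:9200/k8s-uat-log-*/_search?size=1&pretty'
```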



