栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

go实战之日志收集1

go实战之日志收集1

文章目录
  • 日志收集
  • 一、项目背景
  • 二、解决方案
  • 三、面临问题
  • 四、业界方案
    • 4.1 ELK
    • 4.2 存在问题:
  • 五、架构设计
    • 5.1 组件介绍
    • 5.2 将学到的技能
  • 六、队列消息的通信模型
    • 6.1 点对点模式(queue)
    • 6.2 发布/订阅(topic)
  • 七、Kafka
    • 7.1 介绍
    • 7.2 工作流程
    • 7.2 选择Partition原则
    • 7.3 ACK应答机制
    • 7.4 Topic和数据日志
    • 7.5 Patition结构
    • 7.6 消费数据
    • 7.7 使用场景
      • 7.7.1 消息队列(MQ)
      • 7.7.2 追踪网站活动
      • 7.7.3 Metrics
      • 7.7.4 日志聚合
      • 7.7.5 下载安装
  • 8 ZooKeeper
    • 8.1 下载安装
    • 8.2 配置
    • 8.3 启动
  • 9 准备kafka和zookeeper环境
    • 9.1 配置
    • 9.2 启动
    • 9.3 发送消息到kafka
    • 9.4 tailf包使用

日志收集 一、项目背景

每个业务系统都有日志,当系统出现问题时,需要通过日志信息定位和解决问题。当系统机器规模很小时,登陆到服务器上查看即可满足,当系统机器规模很大时,登陆到机器上查看几乎不可能(分布式系统,一个系统部署在十几台机器上)

二、解决方案

把机器上的日志实时收集,存储到一个中心系统。再对这些日志建立索引,通过搜索可快速找到对应的日志,通过提供一个界面友好的web页面实现日志检索和展示。

三、面临问题

实时日志量非常大,每天处理几十亿条。日志准实时收集,延迟控制在分钟级别,能够支持水平扩展。

四、业界方案 4.1 ELK

4.2 存在问题:
  • 运维成本高,每增加一个日志收集项,都需要手动修改配置
  • 监控缺失,无法准确获取Logstash状态
  • 无法做到定制化开发和维护
五、架构设计

5.1 组件介绍
  • LogAgent:日志收集客户端,用来收集服务器上的日志
  • Kafka:高吞吐量的分布式队列(linkin开发,apache顶级开源项目)
  • ElasticSearch:开源的搜索引擎,提供介于HTTP RESTful的web接口
  • Kibana:开源的ES数据分析和可视化工具。
  • Hadoop:分布式计算框架,能够对大量数据进行分布式处理的平台。
  • Storm:一个免费并开源的分布式实时计算系统
5.2 将学到的技能
  • 服务端agent的开发
  • 后端服务组件的开发
  • Kafka和zookeeper的使用
  • ES和Kibana的使用
  • etcd的使用
六、队列消息的通信模型 6.1 点对点模式(queue)

消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息,一条消息被消费以后,queue中就没有了,不存在重复消费

6.2 发布/订阅(topic)

消息生产者(发布)将消息发送到topic中,同时有多个消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费(类似关注了微信公众号的人都能收到推送的文章)。

补充:发布订阅模式下,当发布者消息量很大时,显然单个订阅者的处理能力时不足的。实际上现实场景中是多个订阅者节点组成一个订阅组负载均衡消费topic消息即分组订阅,这样订阅者容易实现消费能力线性扩展。可以看成是一个topic下有多个queue,每个queue是点对点的方式,queue之间是发布订阅方式。

七、Kafka 7.1 介绍

Apache Kafka由著名职业社交公司linkedin开发,最初是被设计用来解决linkedin公司内部海量日志传输等问题。Kafka使用Scala语言编写,与2011年开源并进入Apache孵化器,2012年10月正式毕业,现为Apache顶级项目。

Kafka是一个分布式数据流平台,可以运行在单台服务器上,也可以在多台服务器上部署成集群。它提供了发布和订阅功能,使用者可以发送数据到Kafka中,也可以从Kafka中读取数据(以便进行后续处理)。Kafka具有高吞吐、低延迟、高容错等特点。


  • Producer:生产者,消息的产生者,消息的入口
  • kafka cluster:kafka集群,一台或多台服务器组成
    1. Broker:指部署了kafka实例的服务器节点。每个服务器上有一个或多个Kafka实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0,broker-1等。。
    2. Topic:消息的主题。可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以建多个topic。实际应用中通常是一个业务线建一个topic。
    3. Partition:同一个topic在不同的分区的数据是不重复的。partition的表现形式就是一个一个的文件夹!
    4. Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10,且副本的数量不能大于broker的数量,follower和leader绝对是在不同的及其,同一及其对同一分区也只能存放一个副本(包括自己)。
  • Consumer:消费者,即消息的消费方,是消息的出口。
    • Consumer Group:多个消费者组成一个消费者组,在kafka设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组可以消费同一个Topic的不同分区的数据,这也是为了提高kafka的吞吐量。
7.2 工作流程

  1. 生产者从kafka集群获取分区Leader的消息
  2. 生产者将消息发送给Leader
  3. Leader将信息写入本地磁盘
  4. Follower从Leader中拉取消息
  5. Follower将消息写入本地磁盘并向Leader发送ACK
  6. Leader收到所有Follower的ACK后向生产者发送ACK
7.2 选择Partition原则

在kafka中,如果某个topic有多个partition,那么producer如何知道该将数据发送给哪个partition?

  1. 可以指定需要写入的partition
  2. 如果没有指定,可以设置数据的key,根据key的值Hash出一个partition
  3. 如果既没有指定也没有key,那么会采用轮询的方式,即每次取一小段时间的数据写入partition,下一时间段写入下一个partition
7.3 ACK应答机制

Producer在向kafka写入数据时,可以设置参数来确定是否确认kafka接收到数据,这个参数可以设置为0,1,all。

  • 0:代表producer往集群发送数据时,不需要等到集群的返回,不确保消息发送成功。安全性最低但效率最高。
  • 1:代表Producer往集群发送数据时,只需等到Leader应答即可发送下一条。
  • all:代表所有follower都完成从Leader的同步才发送下一条。确保leader和follower都完成备份。安全性最高但效率最低。

最后要注意的是,如果往不存在的topic中写入数据,kafka会自动创建topic,partition和replication的数量默认配置都是1。

7.4 Topic和数据日志


每个partition都是一个有序并且不可变的消息集合。当新消息写入时, 就被追加到partition的末尾。在每个partition中,每条消息都会被分配一个顺序的唯一标识,这个标识被称为offset,即偏移量。注意,kafka只保证在同一个partition内部消息是有序的,在不同的partition之间,并不能保证消息的有序。
kafka可以配置一个保存期限,用来标识日志在kafka集群中保留多长时间,kafka集群会保留在保留期限内所有被发布的消息,不管这些消息是否被消费过。比如设置保留两天,那么数据发布到kafka集群中两天以内,这些消息都可以被消费,超过两天后,这些数据将会被清空,以便给后续数据腾出空间。由于Kafka会将数据持久化存储在磁盘上,所以保留的数据大小可以设置为一个比较大的值。

7.5 Patition结构

Partition在服务器上的表现形式就是一个个文件夹;每个文件夹下有多组Segment文件,每组Segment文件中包好.index文件、.log文件和.timeindex文件。

  • log文件:实际存储message的地方
  • index文件和timeindex文件:索引文件,用于检索信息。
7.6 消费数据
  • 多个消费者可以组成消费者组,并用一个标签来标识这个消费者组。一个消费者组中的不同消费者可以运行在不同进程甚至不同机器上
  • 如果多有消费者在同一个消费者组,那么消息记录可以被很好的均衡的发送到每个消费者实例
  • 如果所有消费者在不同的消费者组,那么消息记录会被广播到每个消费者实例。

举个例子,如上图所示,一个有两个节点的kafka集群,拥有一个由四个partition组成的topic,有两个消费者组在消费这个topic的消息。消费者组A有两个消费者实例,消费者组B有四个消费者实例。从图中我们可以看到:

  1. 在同一消费者组中,每个消费者实例可以消费多个分区,但每个分区只能被消费者组中的一个实例消费。
  2. 如果在消费者组中动态的上线和下线消费者,kafka集群会自动调整分区与消费者实例之间的对应关系。
7.7 使用场景 7.7.1 消息队列(MQ)

在系统架构设计中,经常会使用到消息队列(Message Queue)。MQ是一种跨进程的通信机制,用于上下游的消息传递,使用MQ可以使上下游解耦,消息发送上游只需要依赖MQ,逻辑上和物理上都不需要依赖其他下有服务。MQ的常见使用场景如流量削峰,数据驱动的任务依赖等。在MQ领域,除了Kafka还有传统的消息队列ActiveMQ和RabbitMQ等。

7.7.2 追踪网站活动

Kafka最初就是被设计用来进行网站活动(比如PV, UV, 搜索记录等)的追踪。可以将不同的活动放入不同的Topic,供后续的计算,实时监控等程序的应用,也可以将数据导入到数据仓库进行后续的离线处理和生成报表等。

7.7.3 Metrics

Kafka经常被用来传输监控数据。主要用来聚合分布式应用程序的统计数据,将数据集中后统一分析和展示等。

7.7.4 日志聚合

日志聚合通常指将不同服务器上的日志收集起来,并放入一个日志中心,比如一台文件服务器或HDFS的一个目录,供后续分析处理。相比与Flume和Scribe等日志聚合工具,Kafka具有更出色的性能。

7.7.5 下载安装

下载地址:https://kafka.apache.org/downloads
下载: kafka_2.12-2.3.0.tgz
将压缩包解压到本地即可。

8 ZooKeeper

ZooKeeper是一种分布式协调服务,用于管理大型主机。在分布式环境中协调和管理服务是一个复杂的过程。ZooKeeper通过其简单的架构和API解决了这个问题。 ZooKeeper允许开发人员专注于核心应用程序逻辑,而不必担心应用程序的分布式特性。

ZooKeeper框架最初是在“Yahoo!”上构建的,用于以简单而稳健的方式访问他们的应用程序。后来,Apache ZooKeeper成为Hadoop,Hbase和其他分布式框架使用的有组织服务的标准

  • 自动注册发现

  • 分布式锁

8.1 下载安装

地址:https://downloads.apache.org/zookeeper/

解压即可,无需安装。

8.2 配置
  1. 将confzoo_sample.cfg复制一份,并重命名为zoo.cfg
  2. 打开上一步得到的zoo.cfg文件,修改dataDir=E:softwaresapache-zookeeper-3.6.3-bindata(保存数据的地址)
8.3 启动

在Zookeeper目录中执行E:softwaresapache-zookeeper-3.6.3-binbinzkServer.sh

9 准备kafka和zookeeper环境 9.1 配置
  1. 修改kafka_2.12-2.3.0configserver.properties中的(日志保存地址)
    # A comma separated list of directories under which to store log files
    log.dirs=E:/文件/Go/src/github.com/logCollector/kafka_logs
    
  2. 修改kafka_2.12-2.3.0configzookeeper.properties中的(zookeepers数据保存位置)
    # the directory where the snapshot is stored.
    dataDir=E:/文件/Go/src/github.com/logCollector/zkdatas
    
9.2 启动

先启动zookeeper,后启动kafka

  1. binwindowszookeeper-server-start.bat configzookeeper.properties
  2. binwindowskafka-server-start.bat configserver.properties
9.3 发送消息到kafka

下载:go get github.com/Shopify/sarama
如果是windows,要使用github.com/Shopify/sarama v1.19之前的(包括1.19)

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)

func main() {

	// 1. 生产者配置
	config := sarama.NewConfig()	// 获得config对象
	config.Producer.RequiredAcks = sarama.WaitForAll	// ACK机制
	config.Producer.Partitioner = sarama.NewRandomPartitioner	// 随机分区
	config.Producer.Return.Successes = true		// 确认

	// 2. 连接kafka
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil{
		fmt.Println("Producer closed, err: ", err)
		return
	}
	defer client.Close()

	// 3. 封装消息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "shopping"
	msg.Value = sarama.StringEncoder("双十一大促销")

	// 4. 发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil{
		fmt.Println("Send message failed, err: ", err)
		return
	}
	fmt.Printf("pid: %v  offset: %vn", pid, offset)
}
9.4 tailf包使用

功能:从kafka中读取文件
下载:go get github.com/hpcloud/tail

package main

import (
	"fmt"
	"github.com/hpcloud/tail"
	"time"
)

func main() {
	filename := "./xx.log"
	config := tail.Config{
		Location: &tail.SeekInfo{
			Offset: 0,
			Whence: 2,
		},
		ReOpen:      true,
		MustExist:   false,
		Poll:        true,
		Follow:      true,
	}

	// 打开文件读取数据
	tails, err := tail.TailFile(filename, config)
	if err != nil{
		fmt.Printf("Tail %s failed, err %vn", filename, err)
		return
	}

	// 开始读数据
	var (
		msg *tail.Line
		ok bool
	)
	for {
		msg, ok =  <- tails.Lines
		if !ok{
			fmt.Printf("tail file close reopen, filename:%s", filename)
			time.Sleep(time.Second)
			continue
		}
		fmt.Println("msg:", msg.Text)
	}
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671230.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号