Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力……
分布式系统,易于向外扩展;同时为发布和订阅提供高吞吐量;支持多订阅者,当失败时能自动平衡消费者;将消息持久化到磁盘,可用于批量消费;
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hBSMojUf-1645255556442)(C:UsersluffyDesktopluffynote大数据申请书知识点.assetsimage-20210829191209280.png)]
Producer:Producer即生产者,消息的产生者,是消息的入口。
kafka cluster:
Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message:每一条发送的消息主体。
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性
2. 基于Ubuntu18.04下Kafka的安装和部署 2.1 安装Java1.安装openjdk-8-jdk
sudo apt-get update sudo apt-get install openjdk-8-jdk
2.查看java版本
java -version
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DhEcyUfk-1645255556443)(C:UsersluffyDesktopluffynote大数据申请书知识点.assetsimage-20210829193511128.png)]
2.2 安装Zookeeper1.从 https://zookeeper.apache.org/releases.html 下载ZooKeeper目前最新的稳定版本,当前使用3.6.3版本
2.解压apache-zookeeper-3.6.3
tar -xzvf apache-zookeeper-3.6.3.tar.gz
3.要将zookeeper运行起来,需要将样例配置zoo_sample.cfg重命名为zoo.cfg,打开可以看到一些默认配置
cd apache-zookeeper-3.6.3/conf/ mv zoo_sample.cfg zoo.cfg cat zoo.cfg
tickTime :
时长单位为毫秒,为zk使用的基本时间度量单位。例如,1 * tickTime是客户端与zk服务端的心跳时间,2 * tickTime是客户端会话的超时时间。
tickTime的默认值为2000毫秒,更低的tickTime值可以更快地发现超时问题,但也会导致更高的网络流量(心跳消息)和更高的CPU使用率(会话的跟踪处理)。clientPort :
zk服务进程监听的TCP端口,默认情况下,服务端会监听2181端口。dataDir :
无默认配置,必须配置,用于配置存储快照文件的目录。如果没有配置dataLogDir,那么事务日志也会存储在此目录。server:zookeeper服务同讯配置
4.修改zoo.cfg配置
# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/bigdata/zookeeper/zookeeperData dataDir=/bigdata/zookeeper/zookeeperLog # the port at which the clients will connect clientPort=2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1 ## Metrics Providers # # https://prometheus.io Metrics Exporter #metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider #metricsProvider.httpPort=7000 #metricsProvider.exportJvmInfo=true server.1=localhost:2888:3888
2.3 安装Kafka重点:
dataDir=/bigdata/zookeeper/zookeeperData
dataDir=/bigdata/zookeeper/zookeeperLogserver.1=localhost:2888:3888
1.下载地址:https://kafka.apache.org/downloads,ubuntu下可以直接使用wget下载
wget https://artfiles.org/apache.org/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3.tar.gz
2.解压apache-zookeeper-3.6.3.tar.gz
tar -zxvf apache-zookeeper-3.6.3.tar.gz
3.在自己的kafka目录下创建日志目录
cd kafka/ mkdir logs-1
4.进入解压的kafka目录,修改kafka-server的配置文件
vim config/server.properties
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR ConDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. broker.id=1 ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 listeners=PLAINTEXT://localhost:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://localhost:9092 # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 # The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files log.dirs=/bigdata/kafka/logs-1 # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1
重点:
listeners=PLAINTEXT://localhost:9092
log.dirs=/bigdata/kafka/logs-1
5.启动zookeeper
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
查看是否启动成功
ps -ef | grep zoo
6.启动Kafka服务,使用kafka-server-start.sh,启动kafka服务
./bin/kafka-server-start.sh config/server.properties
7.创建topic,使用 kafka-topics.sh 创建单分区单副本的 topic test
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic列表
./bin/kafka-topics.sh --list --zookeeper localhost:2181
8.创建消息产生者,产生消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
9.创建消息消费者,消费消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
10.测试在生产消息的窗口输入内容,在消费窗口就能打印出来
11.查看topic消息
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3. 基于Netty到Kafka的GPS数据采集系统架构介绍 3.1 GPS数据采集
GPS设备每隔一定时间发送数据给采集端,采集端通过netty接收并解析数据后发送给kafka,服务端通过topic批量消费kafka数据,计算分析数据后分发到数据库。
3.1.1 Netty的Reactor线程模型 目前高性能网络通信服务大多是基于 epoll 机制和多线程模型组合的实现。而 Netty 可依据用户自定义的程序启动参数调整其运行期间的线程模型。etty 官方推荐使用主从 Reactor 多线程模型。其主要特点是拥有多个线程池,其中主线程池是处理新的客户端连接,处理完新连接后将新建的Socket 绑定到从线程池中的某个线程中;从线程池将负责后续对这个 Socket 的读写、编解码、业务处理工作。设计主从 Reactor 多线程模型的目的是将监听端口服务与处理数据功能剥离开来,从而提高处理数据的能力。在实际应用中,Netty 支持添加多个从线程池,可按照业务特性将不同的业务分配到不同的从线程池处理,或若干个特性相似的业务分配到同一个从线程池。
3.1.2 Kafka流式消息处理系统 Kafka 的 commit log 队列是 Kafka 消息队列概念的具体实现。生产者向 commit log 队列中发送流式消息,其他消费者可以在毫秒级延时处理这些日志的最新信息。每个数据消费者在 commit log 中有一个自己的指针,并独立移动,从而促使消费者们在分布式环境下能可靠、顺序的处理队列中的消息。commit log 可以被多个生产者和消费者所共享,并覆盖集群中的多台机器,为集群中机器提供容错保障。Kafka 作为一个现代的分布式系统还可以便捷地水平扩张和缩小。此外,Kafka 的消息代理(broker)能支持 TB 级消息的持久化。
3.2 系统架构设计 采集终端由GPS设备配合区域性的采集服务器组成。消息收集端暴露 IP 地址和端口,供采集终端连接,当有新的 TCP 连接或者新的消息发送时,都将触发消息收集端的网络通信处理程序。对于需要进一步处理的消息将由消息收集端通过异步方式推送到 Kafka 集群中。之后,再由不同的 Kafka consumer 进程按照不同的业务需求来处理被推送到 Kafka 集群中的消息。这些消息或被持久化到数据库,或进行其它实时计算。此外,Zookeeper 用来监测 Kafka 集群的运行状态,协调管理 Kafka 集群;同时 Zookeeper 还可预留作为协调管理收集端服务水平扩展业务的服务软件。
4. 交通大数据平台前端展示 4. Flink


