事件流是人体中枢神经系统的数字等价物。 它是“永远在线”世界的技术基础,在这个世界中,企业越来越多地由软件定义和自动化,软件的用户更多的是软件。 从技术上讲,事件流是从数据库、传感器、移动设备、云服务和软件应用程序等事件源以事件流的形式实时捕获数据的做法; 持久地存储这些事件流以供以后检索; 实时和回顾性地操作、处理和响应事件流; 并根据需要将事件流路由到不同的目标技术。 因此,事件流可确保数据的连续流动和解释,以便正确的信息在正确的时间出现在正确的位置。
我可以使用事件流做什么?事件流应用于众多行业和组织的各种用例。 它的许多示例包括: 实时处理支付和金融交易,例如在证券交易所、银行和保险中。 实时跟踪和监控汽车、卡车、车队和货物,例如物流和汽车行业。 持续捕获和分析来自 IoT 设备或其他设备(例如工厂和风电场)的传感器数据。 收集客户互动和订单并立即做出反应,例如在零售、酒店和旅游行业以及移动应用程序中。 监测住院病人并预测病情变化,以确保在紧急情况下得到及时治疗。 连接、存储和提供公司不同部门产生的数据。 作为数据平台、事件驱动架构和微服务的基础。
Apache Kafka® 是一个事件流平台。 这意味着什么?Kafka 结合了三个关键功能,因此您可以使用一个久经考验的解决方案来实现端到端的事件流用例:
- 发布(写入)和订阅(读取)事件流,包括从其他系统持续导入/导出数据。
- 根据需要持久可靠地存储事件流。
- 在事件发生时或追溯性地处理事件流。
所有这些功能都是以分布式、高度可扩展、弹性、容错和安全的方式提供的。 Kafka 可以部署在裸机硬件、虚拟机和容器上,也可以部署在本地和云端。 您可以在自行管理 Kafka 环境和使用各种供应商提供的完全托管服务之间进行选择。
Kafka 是一个分布式系统,由通过高性能 TCP 网络协议进行通信的服务器和客户端组成。它可以部署在内部和云环境中的裸机硬件、虚拟机和容器上。服务器:Kafka 作为一个或多个服务器的集群运行,这些服务器可以跨越多个数据中心或云区域。其中一些服务器形成存储层,称为代理。其他服务器运行 Kafka Connect 以将数据作为事件流持续导入和导出,从而将 Kafka 与您现有的系统(例如关系数据库以及其他 Kafka 集群)集成。为了让您实现关键任务用例,Kafka 集群具有高度可扩展性和容错性:如果其中任何一个服务器出现故障,其他服务器将接管它们的工作,以确保连续运行而不会丢失任何数据。客户端:它们允许您编写分布式应用程序和微服务,即使在网络问题或机器故障的情况下,它们也可以并行、大规模和容错方式读取、写入和处理事件流。 Kafka 附带了一些这样的客户端,这些客户端通过 Kafka 社区提供的数十个客户端进行了增强:客户端可用于 Java 和 Scala,包括更高级别的 Kafka Streams 库,用于 Go、Python、C/C++ 以及其他的编程语言和Rest APIs
1. 运行环境配置本文主要参考文档,kafka官方文档:
http://kafka.apache.org/documentation/#quickstart
1.1 下载安装 1.1.1 官网下载http://kafka.apache.org/downloads
选择喜欢的版本下载
1.1.2 上传到服务器,然后解压缩遇到的问题:
解压后按照手册start zookeeper,遇到问题
猜想原因是因为下载的包是source,需要下载的是binary,所以重新下载上传
按照官方文档,下载了Scala 2.13 - kafka_2.13-3.0.0.tgz
后面再上传,解压
tar -xzf kafka_2.13-3.0.0.tgz cd kafka_2.13-3.0.01.2 项目启动 1.2.1 运行zookeeper
cd到kafka的目录下,运行zookeeper
# Start the ZooKeeper service # Note: Soon, ZooKeeper will no longer be required by Apache Kafka. bin/zookeeper-server-start.sh config/zookeeper.properties1.2.2 运行kafka
新开一个窗口,到kafka安装目录,运行指令启动kafka
bin/kafka-server-start.sh config/server.properties
以上步骤都完成后,kafka的运行环境搭建完成,接下来可以进行测试了
2. 创建主题测试Kafka 是一个分布式事件流平台,可让您在多台机器上读取、写入、存储和处理事件(在文档中也称为记录或消息)。 示例事件包括支付交易、来自手机的地理定位更新、运输订单、来自物联网设备或医疗设备的传感器测量等等。 这些事件被组织并存储在主题中。 非常简单,主题类似于文件系统中的文件夹,事件就是该文件夹中的文件。 因此,在编写第一个事件之前,您必须创建一个主题。
2.1 创建topic打开另一个终端会话并运行:
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
但是很神奇,这里报错了,不过报错给出了提示
Missing required argument "[partitions]"
按照报错提示,逐条修改,该加的加,该改的改,最终的,命令更改成了
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
topic创建成功,输入指令查看相关信息:
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:90922.2 向topic写入事件
Kafka 客户端通过网络与 Kafka 代理通信以写入(或读取)事件。 一旦收到,代理将以持久和容错的方式存储事件,只要您需要(甚至永远)。 运行控制台生产者客户端将一些事件写入您的主题。 默认情况下,您输入的每一行都会导致将单独的事件写入主题。
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 This is my first event This is my second message2.3 读取topic中的事件
再打开一个会话窗口,输入指令接收消息
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
如下,可以接收到前面我们写入的事件
您可以随时使用 Ctrl-C 停止消费者客户端。
您可以随意测试:例如,切换回您的生产者终端(上一步)以编写其他事件,并查看事件如何立即显示在您的消费者终端中。 因为事件持久地存储在 Kafka 中,所以它们可以被任意数量的消费者读取任意多次。 您可以通过打开另一个终端会话并再次重新运行上一个命令来轻松验证这一点。
附 小插曲:由于修改了etc/profile,可能是改错了,然后source etc/profile 后系统的指令都不可以使用了,解决方案是引入临时的系统变量,改回profile然后重新载入
export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin



