下载地址:Apache Kafka
# 创建应用目录
mkdir -p /storage
# 下载kafka安装包
wget -P /storage https://archive.apache.org/dist/kafka/2.3.0/kafka_2.11-2.3.0.tgz
# 解压kafka应用包并重命名目录
cd /storage
tar -zxf kafka_2.11-2.3.0.tgz
mv kafka_2.11-2.3.0 kafka
根据实际情况创建需要的目录
修改配置文件:cd /storage/kafka/config
[root@test config]# cat zookeeper.properties
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/storage/zookeeper/data/zookeeper
dataLogDir=/storage/zookeeper/logs/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
#maxClientCnxns=0
#为zk的基本时间单元,毫秒
tickTime=2000
#Leader-Follower初始通信时限 tickTime*10
initLimit=10
#Leader-Follower同步通信时限 tickTime*5
syncLimit=5
#设置broker Id的服务地址
server.1=192.168.19.128:2888:3888
根据server.id,在zookeeper的dataDir目录下创建myid文件(必须与上面dataDir配置一致)
echo 1 > /storage/zookeeper/data/zookeeper/myid
[root@test config]# cat server.properties
# broker 的全局唯一编号,不能重复,每台机器递增设置
broker.id=1
#服务器监听地址
listeners=SASL_PLAINTEXT://192.168.19.128:9092,client2://192.168.19.128:9093
#外部访问监听地址,根据服务器监听地址映射端口
advertised.listeners=SASL_PLAINTEXT://192.168.19.128:9092,client2://192.168.19.128:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,client2:SASL_PLAINTEXT,client3:SASL_PLAINTEXT
# 使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。
#allow.everyone.if.no.acl.found=true
super.users=User:admin
# 处理网络请求的线程数量,默认
num.network.threads=3
# 用来处理磁盘IO的线程数量,默认
num.io.threads=8
# 发送,接受, 请求套接字的缓冲区大小,默认
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# kafka 运行日志存放路径
log.dirs=/storage/kafka/logs/kafka
# 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=1
# 用来恢复和清理data下数据的线程数量,默认
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# segment文件保留的最长时间,超时将被删除,默认
log.retention.hours=168
# 滚动生成新的segment文件的最大时间,默认
log.roll.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#服务zookeeper连接地址
zookeeper.connect=192.168.19.128:2181
zookeeper.connection.timeout.ms=6000
#group.initial.rebalance.delay.ms=0
添加用户密码认证服务端与客户端
[root@test config]# cat kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="yjk"
password="Rmm@138,ab";
};
[root@test config]# cat kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="Rmm@138,ab"
user_admin="Rmm@138,ab"
user_yjk="Rmm@138,ab";
};
在KafkaServer部分,username和password是本broker作为客户端连接其他broker(即broker间通讯)
时使用的帐密;user_<用户名>="<密码>"则定义了所有允许连接到本broker的用户及其密码,包括
其他broker。注意:必须定义user_admin,且其密码与上面的password一致,否则broker间认证会
失败报错。
上面配置中,admin用户用于broker间的内部通讯;yjk是为外部客户端提供的登录帐密。
在KafkaClient部分,username和password是客户端连接broker时使用的用户帐密,必须与服务端
KafkaServer中某个user_<用户名>条目匹配;在上面配置中,客户端使用yjk用户连接到broker。
consumer和producer的配置文件consumer.properties和producer.properties
[root@test config]# cat consumer.properties |egrep -v "^$|^#"
bootstrap.servers=localhost:9092
group.id=test-consumer-group
#新增
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
[root@test config]# cat producer.properties |grep -vE "^$|^#"
bootstrap.servers=localhost:9092
compression.type=none
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
修改kafka启动脚本,在以下文件中增加JAAS配置(注意:客户端脚本用client配置,服务端脚本用server配置):
kafka-console-consumer.sh、kafka-console-producer.sh 中增加:
export KAFKA_OPTS="-Djava.security.auth.login.config=/storage/kafka/config/kafka_client_jaas.conf"
kafka-server-start.sh 中增加:
export KAFKA_OPTS="-Djava.security.auth.login.config=/storage/kafka/config/kafka_server_jaas.conf"
启动脚本:cd /storage/kafka/bin
./zookeeper-server-start.sh /storage/kafka/config/zookeeper.properties &
./kafka-server-start.sh /storage/kafka/config/server.properties &
先启动zookeeper,再启kafka。停止则相反,先停kafka再停zookeeper
[root@test shell]# netstat -nulpt|grep java
tcp6 0 0 :::44725 :::* LISTEN 6741/java
tcp6 0 0 :::42875 :::* LISTEN 6433/java
tcp6 0 0 192.168.19.128:9092 :::* LISTEN 6741/java
tcp6 0 0 192.168.19.128:9093 :::* LISTEN 6741/java
tcp6 0 0 :::2181 :::* LISTEN 6433/java
启动生产者
./kafka-console-producer.sh --broker-list 192.168.19.128:9092 --topic test --producer.config ../config/producer.properties
启动消费者
./kafka-console-consumer.sh --bootstrap-server 192.168.19.128:9092 --topic test --from-beginning --consumer.config ../config/consumer.properties
创建topic
[root@test bin]# ./kafka-topics.sh --create --zookeeper 192.168.19.128:2181 --replication-factor 1 --partitions 1 --topic hello
Created topic hello.
查看topic list
[root@test bin]# ./kafka-topics.sh --list --zookeeper 192.168.19.128:2181
__consumer_offsets
hello
test
查看分片情况
[root@test bin]# ./kafka-topics.sh --describe --zookeeper 192.168.19.128:2181
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic:hello PartitionCount:1 ReplicationFactor:1 Configs:
Topic: hello Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
对应topic 存储数据
启动生产者:
sh kafka-console-producer.sh --broker-list 192.168.19.153:9092 --topic cloud
查看消费者:
sh kafka-console-consumer.sh --bootstrap-server 192.168.19.153:9092 --from-beginning --topic cloud
集群:
生产
sh kafka-console-producer.sh --broker-list 192.168.19.153:9092,192.168.19.142:9092,192.168.19.151:9092 --topic newcloud
消费
sh kafka-console-consumer.sh --bootstrap-server 192.168.19.153:9092,192.168.19.142:9092,192.168.19.151:9092 --topic newcloud --from-beginning
最新命令示例
创建topic
sh kafka-topics.sh --create --bootstrap-server 192.168.19.153:9092 --replication-factor 1 --partitions 1 --topic cloud
查看所有topic
sh kafka-topics.sh --bootstrap-server 192.168.19.153:9092 --list
查看指定topic详情
sh kafka-topics.sh --bootstrap-server 192.168.19.153:9092 --describe --topic cloud



