栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

kafka 安装配置

kafka 安装配置

下载地址:Apache Kafka

#创建应用目录 mkdir -p /storage
# wget -P /storage https://archive.apache.org/dist/kafka/2.3.0/kafka_2.11-2.3.0.tgz
#解压kafka应用包 tar -zxf kafka_2.11-2.3.0.tgz 修改目录名 mv kafka_2.11-2.3.0 kafka
根据实际情况创建需要的目录

修改配置文件:cd /storage/kafka/config

[root@test config]# cat zookeeper.properties
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/storage/zookeeper/data/zookeeper
dataLogDir=/storage/zookeeper/logs/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
#maxClientCnxns=0
#为zk的基本时间单元,毫秒
tickTime=2000
#Leader-Follower初始通信时限 tickTime*10
initLimit=10
#Leader-Follower同步通信时限 tickTime*5
syncLimit=5

#设置broker Id的服务地址
server.1=192.168.19.128:2888:3888


根据server.id,创建zookeeper的myid
echo 1 > /storage/kafka/data/zookeeper/myid

[root@test config]# cat server.properties
# broker 的全局唯一编号,不能重复,每台机器递增设置
broker.id=1
#服务器监听地址
listeners=SASL_PLAINTEXT://192.168.19.128:9092,client2://192.168.19.128:9093
#外部访问监听地址,根据服务器监听地址映射端口
advertised.listeners=SASL_PLAINTEXT://192.168.19.128:9092,client2://192.168.19.128:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,client2:SASL_PLAINTEXT,client3:SASL_PLAINTEXT

# 使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT 

#SASL机制 
sasl.enabled.mechanisms=PLAIN 
sasl.mechanism.inter.broker.protocol=PLAIN 

# 如果没有找到ACL(访问控制列表)配置,则允许任何操作。 
#allow.everyone.if.no.acl.found=true 
super.users=User:admin

# 处理网络请求的线程数量,默认
num.network.threads=3

# 用来处理磁盘IO的线程数量,默认
num.io.threads=8


# 发送,接受, 请求套接字的缓冲区大小,默认
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# kafka 运行日志存放路径
log.dirs=/storage/kafka/logs/kafka
# 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=1
# 用来恢复和清理data下数据的线程数量,默认
num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# segment文件保留的最长时间,超时将被删除,默认

log.retention.hours=168
# 滚动生成新的segment文件的最大时间,默认
log.roll.hours=168

log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#服务zookeeper连接地址
zookeeper.connect=192.168.19.128:2181
zookeeper.connection.timeout.ms=6000
#group.initial.rebalance.delay.ms=0
添加用户密码认证服务端与客户端

[root@test config]# cat kafka_client_jaas.conf 
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required 
username="test" 
password="Rmm@138,ab"; 
};
[root@test config]# cat kafka_server_jaas.conf 
KafkaServer { 
org.apache.kafka.common.security.plain.PlainLoginModule required 
username="admin" 
password="Rmm@138,ab" 
user_admin="admin-user" 
user_yjk="Rmm@138,ab"; 
};
在KafkaServer部分,username和password是broker用于初始化连接到其他的broker,在上面配
置中,admin用户为broker间的通讯,user_userName定义了所有连接到broker和broker验证的
所有的客户端连接包括其他broker的用户密码,user_userName必须配置admin用户,否则报
错。
username和password用于内部通讯的帐密
admin用于内部通讯,alice设置自己的登录帐密,提供给连接者

在KafkaClient部分,username和password是客户端用来配置客户端连接broker的用户,在上面配置
中,客户端使用admin用户连接到broker。

consuer和producer的配置文件consumer.properties和producer.properties

[root@test config]# cat consumer.properties |egrep -v "^$|^#"
bootstrap.servers=localhost:9092
group.id=test-consumer-group
#新增
security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN

[root@test config]# cat consumer.properties |egrep -v "^$|^#"
bootstrap.servers=localhost:9092
group.id=test-consumer-group
#新增
security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN
[root@test config]# cat producer.properties |grep -vE "^$|^#"
bootstrap.servers=localhost:9092
compression.type=none
security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN

修改kafka启动脚本以下文件中增加:

kafka-console-consumer.sh

kafka-console-producer.sh

kafka-server-start.sh

export KAFKA_OPTS="-Djava.security.auth.login.config=/storage/kafka/config/kafka_client_jaas.conf"
启动脚本:cd /storage/kafka/bin

./zookeeper-server-start.sh  /storage/kafka/config/zookeeper.properties  &
./kafka-server-start.sh  /storage/kafka/config/server.properties  & 
先启动zookeeper,再启kafka。停止则相反,先停kafka再停zookeeper

[root@test shell]# netstat -nulpt|grep java
tcp6       0      0 :::44725                :::*                    LISTEN      6741/java           
tcp6       0      0 :::42875                :::*                    LISTEN      6433/java           
tcp6       0      0 192.168.19.128:9092     :::*                    LISTEN      6741/java           
tcp6       0      0 192.168.19.128:9093     :::*                    LISTEN      6741/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      6433/java 


启动生产者

./kafka-console-producer.sh --broker-list 192.168.19.128:9092 --topic test --producer.config ../config/producer.properties

启动消费者

./kafka-console-consumer.sh --bootstrap-server 192.168.19.128:9092 --topic test --from-beginning --consumer.config ../config/consumer.properties

创建topic

[root@test bin]# ./kafka-topics.sh --create --zookeeper 192.168.19.128:2181 --replication-factor 1 --partitions 1 --topic hello

Created topic hello.

查看topic list

[root@test bin]# ./kafka-topics.sh --list --zookeeper 192.168.19.128:2181

__consumer_offsets

hello

test

查看分片情况

[root@test bin]# ./kafka-topics.sh --describe --zookeeper 192.168.19.128:2181

Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

Topic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1 Isr: 1

Topic: __consumer_offsets Partition: 1 Leader: 1 Replicas: 1 Isr: 1

Topic:hello PartitionCount:1 ReplicationFactor:1 Configs:

Topic: hello Partition: 0 Leader: 1 Replicas: 1 Isr: 1

Topic:test PartitionCount:1 ReplicationFactor:1 Configs:

Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1

对应topic 存储数据

启动生产者:

sh kafka-console-producer.sh --broker-list 192.168.19.153:9092 --topic cloud

查看消费者:

sh kafka-console-consumer.sh --bootstrap-server 192.168.19.153:9092 --from-beginning --topic cloud

集群:

生产

sh kafka-console-producer.sh --broker-list 192.168.19.153:9092,192.168.19.142:9092,192.168.19.151:9092 --topic newcloud

消费

sh kafka-console-consumer.sh --bootstrap-server 192.168.19.153:9092,192.168.19.142:9092,192.168.19.151:9092 --topic newcloud --from-beginning

最新命令示例

创建topic

sh kafka-topics.sh --create --bootstrap-server 192.168.19.153:9092 --replication-factor 1 --partitions 1 --topic cloud

查看所有topic

sh kafka-topics.sh --bootstrap-server 192.168.19.153:9092 --list

查看指定topic详情

sh kafka-topics.sh --bootstrap-server 192.168.19.153:9092 --describe cloud

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354663.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号