Kafka Cluster Deployment and Java Client Testing: Problem Log

Kafka cluster deployment test log
    • Reference tutorial
    • Problem log
      • Background
      • Virtual machines
        • No internet access
        • No network between VMs
          • /etc/ssh/ssh_config: line 69: Bad configuration option: permitrootlogin
          • No route to host
      • Cluster
        • Starting the cluster
        • Sending messages
        • Errors on cluster restart
        • Bundled stop script doesn't work
        • Unknown name or service
      • Java tests
        • Startup errors
        • Can't resolve address: host1:9094
    • Miscellaneous
      • Test results
      • Terminal commands

Reference tutorial

The tutorial covers a single-node setup.

Problem log: Background

I set up three local virtual machines to deploy Kafka in cluster mode, while the tutorial only covers a single node. The problems I hit were mostly VM networking issues and configuration files that were not fully updated for cluster use. On the Java side, I built a small Java project to test producing and consuming.

Virtual machines: No internet access

Ticking the appropriate connection mode in the VM's network settings enables internet access.

Configuration files for the static IPs:

[root@host1 network-scripts]# ifconfig
enp0s3: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 192.168.56.3  netmask 255.255.255.0  broadcast 192.168.56.255
        inet6 fe80::5393:d075:c1ae:c2dd  prefixlen 64  scopeid 0x20<link>
        ether 08:00:27:1e:a0:f6  txqueuelen 1000  (Ethernet)
        RX packets 469178  bytes 48974376 (46.7 MiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 716775  bytes 847224356 (807.9 MiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

enp0s8: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 10.0.3.15  netmask 255.255.255.0  broadcast 10.0.3.255
        inet6 fe80::f417:7229:7076:9a13  prefixlen 64  scopeid 0x20<link>
        ether 08:00:27:4b:f5:68  txqueuelen 1000  (Ethernet)
        RX packets 766  bytes 86713 (84.6 KiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 851  bytes 61124 (59.6 KiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

enp0s9: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 192.168.56.11  netmask 255.255.255.0  broadcast 192.168.56.255
        inet6 fe80::e408:cce2:5def:e1ef  prefixlen 64  scopeid 0x20<link>
        ether 08:00:27:7c:85:21  txqueuelen 1000  (Ethernet)
        RX packets 99265  bytes 9309002 (8.8 MiB)
        RX errors 0  dropped 146  overruns 0  frame 0
        TX packets 19  bytes 1502 (1.4 KiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

lo: flags=73<UP,LOOPBACK,RUNNING>  mtu 65536
        inet 127.0.0.1  netmask 255.0.0.0
        inet6 ::1  prefixlen 128  scopeid 0x10<host>
        loop  txqueuelen 1  (Local Loopback)
        RX packets 519103  bytes 42694799 (40.7 MiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 519103  bytes 42694799 (40.7 MiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

[root@host1 network-scripts]# cat ifcfg-enp0s3|grep -v "#" |grep -v "^$"
TYPE=Ethernet
PROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=static
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=enp0s3
UUID=89bd6619-1e20-423d-96cc-c224d5d6d15a
DEVICE=enp0s3
ONBOOT=yes
IPADDR=192.168.56.11
NETMASK=255.255.255.0
GATEWAY=192.168.56.1
PREFIX=24
DNS1=114.114.114.114
[root@host1 network-scripts]# cat ifcfg-enp0s9|grep -v "#" |grep -v "^$"
TYPE=Ethernet
PROXY_METHOD=none
BROWSER_ONLY=no
BOOTPROTO=static
DEFROUTE=yes
IPV4_FAILURE_FATAL=no
IPV6INIT=yes
IPV6_AUTOCONF=yes
IPV6_DEFROUTE=yes
IPV6_FAILURE_FATAL=no
IPV6_ADDR_GEN_MODE=stable-privacy
NAME=enp0s9
UUID=93d13955-e9e2-a6bd-df73-12e3c747f122
DEVICE=enp0s9
ONBOOT=yes
IPADDR=192.168.56.11
NETMASK=255.255.255.0
GATEWAY=192.168.56.1
PREFIX=24
DNS1=114.114.114.114
[root@host1 network-scripts]# pwd
/etc/sysconfig/network-scripts
[root@host1 network-scripts]# 

No network between VMs: /etc/ssh/ssh_config: line 69: Bad configuration option: permitrootlogin

scp fails:

[root@host1 ~]# scp -r soft/* 192.168.56.12:/root/soft/
/etc/ssh/ssh_config: line 69: Bad configuration option: permitrootlogin
/etc/ssh/ssh_config: line 72: Bad configuration option: permitrootlogin

Fix
PermitRootLogin is a server-side option that belongs in sshd_config; it is not valid in the client-side /etc/ssh/ssh_config, so commenting out the permitrootlogin lines there clears the error.

[root@host1 network-scripts]# grep -i permitrootlogin /etc/ssh/ssh_config 
#PermitRootLogin yes
[root@host1 network-scripts]# 
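A minimal sketch of commenting the option out in one step, run here against a scratch copy rather than the live file (the file contents are illustrative; the `I` flag for case-insensitive matching is GNU sed):

```shell
# Scratch copy standing in for /etc/ssh/ssh_config
printf 'Host *\npermitrootlogin yes\n' > /tmp/ssh_config.test

# Comment out any permitrootlogin line, case-insensitively (GNU sed)
sed -i 's/^[[:space:]]*permitrootlogin/#&/I' /tmp/ssh_config.test

grep -i permitrootlogin /tmp/ssh_config.test
# -> #permitrootlogin yes
```

On a real system, point the commands at /etc/ssh/ssh_config itself after backing it up.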
No route to host
[root@host1 ~]# scp -r soft/* 192.168.56.12:/root/soft/
ssh: connect to host 192.168.56.12 port 22: No route to host

The host was unreachable even though the firewall was completely disabled. The cause was that the VMs' network adapters did not match: one VM had two adapters, one of them with an extra "#2" suffix. The adapter selection must be kept consistent across the VMs, otherwise they cannot ping each other.

Cluster: Starting the cluster

The install directory was copied straight from the first machine, so startup failed because the broker id was already registered:

[2021-12-22 09:38:05,289] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering.
	at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:295)
	at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:281)
	at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:64)
	at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:45)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:231)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
	at kafka.Kafka$.main(Kafka.scala:67)
	at kafka.Kafka.main(Kafka.scala)
[2021-12-22 09:38:05,295] INFO [Kafka Server 0], shutting down (kafka.server.KafkaServer)

Fix: edit the corresponding server.properties on each node.
broker.id must be unique across the cluster.
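When cloning the install directory like this, one way to keep the ids unique is to derive broker.id from the host number. This is only a sketch under the host1/host2/host3 naming used in this cluster, run against a scratch copy of the config:

```shell
# Derive a unique broker.id from the host number: host1 -> 0, host2 -> 1, ...
host=host2                          # on a real node: host=$(hostname)
id=$(( ${host#host} - 1 ))

# Scratch copy standing in for config/server.properties
printf 'broker.id=0\nnum.partitions=1\n' > /tmp/server.properties.test
sed -i "s/^broker\.id=.*/broker.id=$id/" /tmp/server.properties.test

grep '^broker.id' /tmp/server.properties.test
# -> broker.id=1
```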

The corresponding configuration file:

[root@host2 config]# cat server.properties |grep -v "#"|grep -v "^$"
broker.id=1
listeners=PLAINTEXT://192.168.56.12:9092
advertised.host.name=192.168.56.12
advertised.listeners=PLAINTEXT://192.168.56.12:9092
num.network.threads=3
 
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
zookeeper.connection.timeout.ms=6000
[root@host2 config]# 

Running it again still failed:

kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn't match stored brokerId 0 in meta.properties
	at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:630)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:175)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
	at kafka.Kafka$.main(Kafka.scala:67)
	at kafka.Kafka.main(Kafka.scala)
[2021-12-22 09:40:11,536] INFO shutting down (kafka.server.KafkaServer)

Locate meta.properties and change its broker.id to match server.properties:

[root@host3 kafka_2.11-0.9.0.0]# grep tmp * -r
config/connect-standalone.properties:offset.storage.file.filename=/tmp/connect.offsets
config/server.properties:log.dirs=/tmp/kafka-logs
logs/server.log:	log.dir = /tmp/kafka-logs
logs/server.log:	log.dirs = /tmp/kafka-logs
logs/server.log:[2021-12-22 09:38:03,339] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
logs/server.log:[2021-12-22 09:38:03,395] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
logs/server.log:	log.dir = /tmp/kafka-logs
logs/server.log:	log.dirs = /tmp/kafka-logs
[root@host3 kafka_2.11-0.9.0.0]# cd /tmp/kafka-logs
[root@host3 kafka-logs]# ll
total 4
-rw-r--r--. 1 root root 54 Dec 22 09:38 meta.properties
-rw-r--r--. 1 root root  0 Dec 22 09:38 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root  0 Dec 22 09:38 replication-offset-checkpoint
[root@host3 kafka-logs]# vi meta.properties 
[root@host2 kafka-logs]# cat meta.properties |grep  -v "#"|grep -v "^$"
version=0
broker.id=1
[root@host2 kafka-logs]# 
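Editing the file by hand works; an equivalent one-liner, shown here on a scratch copy (the real file lives under log.dirs, /tmp/kafka-logs in this setup):

```shell
# Scratch copy of meta.properties as Kafka 0.9 writes it
printf 'version=0\nbroker.id=0\n' > /tmp/meta.properties.test

# Make broker.id match the one in server.properties (1 on host2)
sed -i 's/^broker\.id=.*/broker.id=1/' /tmp/meta.properties.test

cat /tmp/meta.properties.test
```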

Sending messages

Producing failed with LEADER_NOT_AVAILABLE, and then with a batch-expired error:

[root@host2 kafka_2.11-0.9.0.0]# bin/kafka-console-producer.sh --broker-list 192.168.56.12:9092 --topic Hello-Kafka
你好     
[2021-12-22 10:07:37,399] WARN Error while fetching metadata with correlation id 0 : {Hello-Kafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2021-12-22 10:07:37,498] WARN Error while fetching metadata with correlation id 1 : {Hello-Kafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2021-12-22 10:07:37,604] WARN Error while fetching metadata with correlation id 2 : {Hello-Kafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2021-12-22 10:07:37,709] WARN Error while fetching metadata with correlation id 3 : {Hello-Kafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)


[root@host1 kafka_2.11-0.9.0.0]# bin/kafka-console-producer.sh --broker-list 192.168.56.11:9092 --topic Hello-Kafka
你好      
[2021-12-22 10:13:09,255] ERROR Error when sending message to topic Hello-Kafka with key: null, value: 6 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

Fix: make sure every node in the cluster is running and correct the configuration files; in the end, restarting ZooKeeper and Kafka on all nodes resolved it.

Check whether any broker in the cluster is not running:

[root@host1 bin]# zookeeper-shell.sh host1:2181,host2:2181,host3:2181
Connecting to host1:2181,host2:2181,host3:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /brokers/ids
[0, 1, 2]

The corresponding configuration:

[root@host1 config]# cat server.properties |grep -v "#" |grep -v "^$"
broker.id=0
listeners=PLAINTEXT://192.168.56.11:9092
advertised.host.name=192.168.56.11
advertised.listeners=PLAINTEXT://192.168.56.11:9092
num.network.threads=3
 
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181
zookeeper.connection.timeout.ms=6000
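The restart order matters: stop the brokers first, then ZooKeeper, then bring ZooKeeper back before the brokers so they can re-register. A sketch of that sequence for one node (KAFKA_HOME is an assumed path; with DRY_RUN=1, the default here, the commands are only recorded to a plan file instead of being executed):

```shell
KAFKA_HOME=${KAFKA_HOME:-/root/soft/kafka_2.11-0.9.0.0}
DRY_RUN=${DRY_RUN:-1}
PLAN=/tmp/restart_plan.txt
: > "$PLAN"

# In dry-run mode, log the command instead of running it
run() {
  if [ "$DRY_RUN" = 1 ]; then echo "$*" >> "$PLAN"; else "$@"; fi
}

# Stop the broker first, then ZooKeeper
run "$KAFKA_HOME/bin/kafka-server-stop.sh"
run "$KAFKA_HOME/bin/zookeeper-server-stop.sh"
# Start ZooKeeper first so brokers can register, then the broker
run "$KAFKA_HOME/bin/zookeeper-server-start.sh" -daemon "$KAFKA_HOME/config/zookeeper.properties"
run "$KAFKA_HOME/bin/kafka-server-start.sh" -daemon "$KAFKA_HOME/config/server.properties"

cat "$PLAN"
```

Repeat the stop phase on all three hosts before beginning the start phase anywhere.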

Errors on cluster restart
kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.
	at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:98)
	at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:95)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at kafka.log.LogManager.lockLogDirs(LogManager.scala:95)
	at kafka.log.LogManager.<init>(LogManager.scala:57)
	at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:589)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:171)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
	at kafka.Kafka$.main(Kafka.scala:67)
	at kafka.Kafka.main(Kafka.scala)
[2021-12-22 10:36:34,680] INFO shutting down (kafka.server.KafkaServer)

[root@host1 bin]# cd /tmp/kafka-logs/
[root@host1 kafka-logs]# ll -a
total 4
drwxr-xr-x.  2 root root 119 Dec 22 10:32 .
drwxrwxrwt. 11 root root 230 Dec 22 10:33 ..
-rw-r--r--.  1 root root   0 Dec 22 09:36 .lock
-rw-r--r--.  1 root root  54 Dec 22 09:36 meta.properties
-rw-r--r--.  1 root root   0 Dec 22 09:36 recovery-point-offset-checkpoint
-rw-r--r--.  1 root root   0 Dec 22 09:36 replication-offset-checkpoint
[root@host1 kafka-logs]# rm -f .lock 
[root@host1 kafka-logs]# 

Deleting .lock fixes it, but only do so after confirming (e.g. with jps) that no Kafka process is still running; the lock exists to keep two brokers from sharing one log directory.
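Both this lock problem and the earlier meta.properties surprises are aggravated by log.dirs living under /tmp, which the OS may clean up on reboot. A sketch of relocating it (the target path is an assumption; shown on a scratch copy of the config):

```shell
# Scratch copy standing in for config/server.properties
printf 'log.dirs=/tmp/kafka-logs\n' > /tmp/server.properties.test

# Point log.dirs at a directory that survives reboots, e.g. /var/kafka-logs
sed -i 's|^log\.dirs=.*|log.dirs=/var/kafka-logs|' /tmp/server.properties.test

grep '^log.dirs' /tmp/server.properties.test
# -> log.dirs=/var/kafka-logs
```

On a real node, also create the new directory and restart the broker afterwards.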

Bundled stop script doesn't work

Change it to the following:

[root@host1 bin]# cat kafka-server-stop.sh |grep -v "#"
PIDS=$(jps -lm | grep -i 'Kafka' | awk '{print $1}')

if [ -z "$PIDS" ]; then
  echo "No kafka server to stop"
  exit 1
else 
  kill -9 $PIDS
fi
[root@host1 bin]# 

Unknown name or service
[root@host1 bin]# less nohup.out 
[root@host1 bin]# zookeeper-shell.sh host1:2181,host2:2181,host3:2181
Connecting to host1:2181,host2:2181,host3:2181
Exception in thread "main" java.net.UnknownHostException: host2: Name or service not known

/etc/hosts controls name resolution on each machine, so the other cluster nodes must be added there as well, effectively as aliases. The Kafka configuration files can use those aliases directly, but the Java test later failed to resolve them, so the configs had to be switched to concrete IPs.

[root@host1 bin]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
#::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.56.11   host1
192.168.56.12   host2
192.168.56.13   host3
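A quick way to sanity-check that every node is mapped, sketched here against a scratch copy of the file rather than the live /etc/hosts:

```shell
# Scratch copy with the same entries as the cluster's /etc/hosts
cat > /tmp/hosts.test <<'EOF'
127.0.0.1   localhost
192.168.56.11   host1
192.168.56.12   host2
192.168.56.13   host3
EOF

# Print each cluster node's mapping, flagging any that are missing
for h in host1 host2 host3; do
  ip=$(awk -v h="$h" '$2 == h {print $1}' /tmp/hosts.test)
  echo "$h -> ${ip:-MISSING}"
done
```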
Java tests: Startup errors

The project was missing the log4j jars and a log4j configuration file; importing the jars fixed the startup:

Can't resolve address: host1:9094
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:386) - No node found. Trying previously-seen node with ID 0
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:386) - No node found. Trying previously-seen node with ID 1
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:386) - No node found. Trying previously-seen node with ID 4
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:386) - No node found. Trying previously-seen node with ID 2
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:386) - No node found. Trying previously-seen node with ID 5
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:656) - Give up sending metadata request since no node is available
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:512) - Initiating connection to node 0 at host1:9092.
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:523) - Error connecting to node 0 at host1:9092:
java.io.IOException: Can't resolve address: host1:9092
	at org.apache.kafka.common.network.Selector.connect(Selector.java:156)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:514)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:169)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:180)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:141)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.UnresolvedAddressException
	at sun.nio.ch.Net.checkAddress(Net.java:101)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
	at org.apache.kafka.common.network.Selector.connect(Selector.java:153)
	... 5 more
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:512) - Initiating connection to node 0 at host1:9092.
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:523) - Error connecting to node 0 at host1:9092:
java.io.IOException: Can't resolve address: host1:9092
	at org.apache.kafka.common.network.Selector.connect(Selector.java:156)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:514)
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:169)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:180)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:141)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.UnresolvedAddressException
	at sun.nio.ch.Net.checkAddress(Net.java:101)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
	at org.apache.kafka.common.network.Selector.connect(Selector.java:153)
	... 5 more
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:671) - Initialize connection to node 5 for sending metadata request
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:512) - Initiating connection to node 5 at host1:9094.
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:523) - Error connecting to node 5 at host1:9094:
java.io.IOException: Can't resolve address: host1:9094
	at org.apache.kafka.common.network.Selector.connect(Selector.java:156)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:514)
	at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:49)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:672)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:568)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:268)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:141)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.UnresolvedAddressException
	at sun.nio.ch.Net.checkAddress(Net.java:101)
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
	at org.apache.kafka.common.network.Selector.connect(Selector.java:153)
	... 8 more
DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name connections-closed:client-id-producer-1
DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name connections-created:client-id-producer-1
DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name bytes-sent-received:client-id-producer-1
DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name bytes-received:client-id-producer-1
DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name bytes-sent:client-id-producer-1
DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name select-time:client-id-producer-1
DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name io-time:client-id-producer-1
DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name node--2.bytes-sent
DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name node--2.bytes-received
DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name node--2.latency
DEBUG [kafka-producer-network-thread | producer-1] (Sender.java:157) - Shutdown of Kafka producer I/O thread has completed.
DEBUG [main] (KafkaProducer.java:656) - The Kafka producer has closed.

Replacing the hostnames with concrete IPs fixed it.
The serverX.properties files are the multi-broker configuration; see the tutorial.

[root@host1 config]# sed -i "s/host1/192.168.56.11/g" server.properties 
[root@host1 config]# sed -i "s/host1/192.168.56.11/g" server1.properties 
[root@host1 config]# sed -i "s/host1/192.168.56.11/g" server2.properties 

The code that finally ran successfully:
Consumer

package kafka.com.java;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {

	@SuppressWarnings({ "unchecked", "rawtypes", "resource" })
	public static void main(String[] args) throws Exception {
		if (args.length == 0) {
			System.out.println("Enter topic name");
			return;
		}
		// Kafka consumer configuration settings
		String topicName = args[0].toString();
		Properties props = new Properties();

		props.put("bootstrap.servers",
				"192.168.56.11:9092,192.168.56.11:9093,192.168.56.11:9094,192.168.56.12:9092,192.168.56.13:9092");
		props.put("group.id", "test-consumer-group");
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("session.timeout.ms", "30000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer consumer = new KafkaConsumer(props);
		// Kafka Consumer subscribes list of topics here.
		consumer.subscribe(Arrays.asList(topicName));
		// print the topic name
		System.out.println("Subscribed to topic " + topicName);
		while (true) {
			ConsumerRecords records = consumer.poll(100);
			Iterator i = records.records(topicName).iterator();
			while (i.hasNext()) {
				ConsumerRecord record = (ConsumerRecord)i.next();
			System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
						record.value());
			}
		}
	}
}

Producer

package kafka.com.java;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {

	@SuppressWarnings({ "rawtypes", "unchecked" })
	public static void main(String[] args) throws Exception {

		// Check arguments length value
		if (args.length == 0) {
			System.out.println("Enter topic name");
			return;
		}

		// Assign topicName to string variable
		String topicName = args[0].toString();

		// create instance for properties to access producer configs
		Properties props = new Properties();

		// Assign localhost id
		props.put("bootstrap.servers",
				"192.168.56.11:9092,192.168.56.11:9093,192.168.56.11:9094,192.168.56.12:9092,192.168.56.13:9092");

		// Set acknowledgements for producer requests.
		props.put("acks", "all");

		// retries is 0, so a failed request is not retried automatically
		props.put("retries", 0);

		// Specify buffer size in config
		props.put("batch.size", 16384);

		// Wait up to 1 ms so small records can be batched into fewer requests
		props.put("linger.ms", 1);

		// The buffer.memory controls the total amount of memory available to the
		// producer for buffering.
		props.put("buffer.memory", 33554432);

		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		Producer producer = new KafkaProducer(props);

		for (int i = 0; i < 10; i++) {
			producer.send(new ProducerRecord(topicName, Integer.toString(i), "test java-" + i));
		}
		System.out.println("Message sent successfully");
		producer.close();
	}
}

Miscellaneous: Test results

Console (screenshot)

Java
Producing (screenshot)

Consuming (screenshot)
Terminal commands

List topics

[root@host1 bin]# kafka-topics.sh --list --zookeeper host1:2181,host2:2181,host3:2181
Hello-Kafka
Hello-Kafka1
Multibrokerapplication

Alter a topic (add partitions)

[root@host1 bin]# kafka-topics.sh --zookeeper host1:2181,host2:2181,host3:2181 --alter --topic Hello-Kafka --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@host1 bin]# 

Delete a topic

[root@host1 bin]# kafka-topics.sh --zookeeper host1:2181,host2:2181,host3:2181 --delete --topic Hello-Kafka1
Topic Hello-Kafka1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

Produce messages

[root@host1 bin]# ./kafka-console-producer.sh --broker-list 192.168.56.11:9092 --topic Hello-Kafka
hello
word
is ok?   
maybe

Consume messages

[root@host1 bin]# ./kafka-console-consumer.sh --zookeeper host1:2181 --topic Hello-Kafka --from-beginning
hello
word
is ok?
maybe