- 参考教程网址
- 问题记录
- 背景
- 虚拟机
- 无法联网
- 虚拟机之间网络不通
- /etc/ssh/ssh_config: line 69: Bad configuration option: permitrootlogin
- No route to host
- 集群
- 启动集群
- 发送消息
- 集群重启报错
- 自带停止应用脚本无效
- 未知名服务
- JAVA 测试
- 启动报错
- Can't resolve address: host1:9094
- 其它
- 测试结果
- 终端命令
该教程为单机版
问题记录 背景本地建立了三台虚拟机,准备部署集群模式,但是教程部署的是单机版,在此过程遇到的问题主要是虚拟机相关问题以及集群使用的时候,配置文件没改全等,关于Java调用,则是建了JAVA工程来测试
虚拟机 无法联网勾选该连接方式可联网
固定IP对应的配置文件
[root@host1 network-scripts]# [root@host1 network-scripts]# [root@host1 network-scripts]# [root@host1 network-scripts]# ifconfig enp0s3: flags=4163虚拟机之间网络不通 /etc/ssh/ssh_config: line 69: Bad configuration option: permitrootloginmtu 1500 inet 192.168.56.3 netmask 255.255.255.0 broadcast 192.168.56.255 inet6 fe80::5393:d075:c1ae:c2dd prefixlen 64 scopeid 0x20 ether 08:00:27:1e:a0:f6 txqueuelen 1000 (Ethernet) RX packets 469178 bytes 48974376 (46.7 MiB) RX errors 0 dropped 0 overruns 0 frame 0 TX packets 716775 bytes 847224356 (807.9 MiB) TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0 enp0s8: flags=4163 mtu 1500 inet 10.0.3.15 netmask 255.255.255.0 broadcast 10.0.3.255 inet6 fe80::f417:7229:7076:9a13 prefixlen 64 scopeid 0x20 ether 08:00:27:4b:f5:68 txqueuelen 1000 (Ethernet) RX packets 766 bytes 86713 (84.6 KiB) RX errors 0 dropped 0 overruns 0 frame 0 TX packets 851 bytes 61124 (59.6 KiB) TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0 enp0s9: flags=4163 mtu 1500 inet 192.168.56.11 netmask 255.255.255.0 broadcast 192.168.56.255 inet6 fe80::e408:cce2:5def:e1ef prefixlen 64 scopeid 0x20 ether 08:00:27:7c:85:21 txqueuelen 1000 (Ethernet) RX packets 99265 bytes 9309002 (8.8 MiB) RX errors 0 dropped 146 overruns 0 frame 0 TX packets 19 bytes 1502 (1.4 KiB) TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0 lo: flags=73 mtu 65536 inet 127.0.0.1 netmask 255.0.0.0 inet6 ::1 prefixlen 128 scopeid 0x10 loop txqueuelen 1 (Local Loopback) RX packets 519103 bytes 42694799 (40.7 MiB) RX errors 0 dropped 0 overruns 0 frame 0 TX packets 519103 bytes 42694799 (40.7 MiB) TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0 [root@host1 network-scripts]# cat ifcfg-enp0s3|grep -v "#" |grep -v "^$" TYPE=Ethernet PROXY_METHOD=none BROWSER_onLY=no BOOTPROTO=static DEFROUTE=yes IPV4_FAILURE_FATAL=no IPV6INIT=yes IPV6_AUTOCONF=yes IPV6_DEFROUTE=yes IPV6_FAILURE_FATAL=no IPV6_ADDR_GEN_MODE=stable-privacy NAME=enp0s3 UUID=89bd6619-1e20-423d-96cc-c224d5d6d15a DEVICE=enp0s3 onBOOT=yes IPADDR=192.168.56.11 NETMASK=255.255.255.0 GATWAY=192.168.56.1 PREFIX=24 DNS1=114.114.114.114 [root@host1 network-scripts]# cat ifcfg-enp0s9|grep -v "#" |grep -v "^$" TYPE=Ethernet PROXY_METHOD=none BROWSER_onLY=no BOOTPROTO=static DEFROUTE=yes IPV4_FAILURE_FATAL=no IPV6INIT=yes IPV6_AUTOCONF=yes IPV6_DEFROUTE=yes IPV6_FAILURE_FATAL=no IPV6_ADDR_GEN_MODE=stable-privacy NAME=enp0s9 UUID=93d13955-e9e2-a6bd-df73-12e3c747f122 DEVICE=enp0s9 onBOOT=yes IPADDR=192.168.56.11 NETMASK=255.255.255.0 GATWAY=192.168.56.1 PREFIX=24 DNS1=114.114.114.114 [root@host1 network-scripts]# pwd /etc/sysconfig/network-scripts [root@host1 network-scripts]#
scp 报错
[root@host1 ~]# scp -r soft/* 192.168.56.12:/root/soft/ /etc/ssh/ssh_config: line 69: Bad configuration option: permitrootlogin /etc/ssh/ssh_config: line 72: Bad configuration option: permitrootlogin
解决办法
注释掉/etc/ssh/ssh_config中的permitrootlogin参数,错误解除
[root@host1 network-scripts]# grep -i permitrootlogin /etc/ssh/ssh_config #PermitRootLogin yes [root@host1 network-scripts]#No route to host
[root@host1 ~]# scp -r soft/* 192.168.56.12:/root/soft/ ssh: connect to host 192.168.56.12 port 22: No route to host
找不到主机,防火墙已经全部关闭,原因是虚拟机网卡不一致,有两个 其中一个多了#2,要保持虚拟机选择的网卡一致,否则无法ping通
直接从第一台机器复制的安装包,因此报了id已注册问题
[2021-12-22 09:38:05,289] FATAL Fatal error during KafkaServerStartable startup. Prepare to shutdown (kafka.server.KafkaServerStartable) java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering. at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:295) at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:281) at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:64) at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:45) at kafka.server.KafkaServer.startup(KafkaServer.scala:231) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) at kafka.Kafka$.main(Kafka.scala:67) at kafka.Kafka.main(Kafka.scala) [2021-12-22 09:38:05,295] INFO [Kafka Server 0], shutting down (kafka.server.KafkaServer)
解决办法:修改对应配置文件server.properties
brokerid不能重复
对应配置文件
[root@host2 config]# cat server.properties |grep -v "#"|grep -v "^$" broker.id=1 listeners=PLAINTEXT://192.168.56.12:9092 advertised.host.name=192.168.56.12 dvertised.listeners=PLAINTEXT://192.168.56.12:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 zookeeper.connection.timeout.ms=6000 [root@host2 config]#
再次执行还是报错,如下:
kafka.common.InconsistentBrokerIdException: Configured brokerId 2 doesn't match stored brokerId 0 in meta.properties at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:630) at kafka.server.KafkaServer.startup(KafkaServer.scala:175) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) at kafka.Kafka$.main(Kafka.scala:67) at kafka.Kafka.main(Kafka.scala) [2021-12-22 09:40:11,536] INFO shutting down (kafka.server.KafkaServer)
找到该文件,修改brokerid和server.properties中一致
[root@host3 kafka_2.11-0.9.0.0]# grep tmp * -r config/connect-standalone.properties:offset.storage.file.filename=/tmp/connect.offsets config/server.properties:log.dirs=/tmp/kafka-logs logs/server.log: log.dir = /tmp/kafka-logs logs/server.log: log.dirs = /tmp/kafka-logs logs/server.log:[2021-12-22 09:38:03,339] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager) logs/server.log:[2021-12-22 09:38:03,395] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokermetadataCheckpoint) logs/server.log: log.dir = /tmp/kafka-logs logs/server.log: log.dirs = /tmp/kafka-logs [root@host3 kafka_2.11-0.9.0.0]# cd /tmp/kafka-logs [root@host3 kafka-logs]# ll 总用量 4 -rw-r--r--. 1 root root 54 12月 22 09:38 meta.properties -rw-r--r--. 1 root root 0 12月 22 09:38 recovery-point-offset-checkpoint -rw-r--r--. 1 root root 0 12月 22 09:38 replication-offset-checkpoint [root@host3 kafka-logs]# vi meta.properties [root@host2 kafka-logs]# cat meta.properties |grep -v "#"|grep -v "^$" version=0 broker.id=1 [root@host2 kafka-logs]#发送消息
发送消息报错:LEADER_NOT_AVAILABLE及过期错
root@host2 kafka_2.11-0.9.0.0]# bin/kafka-console-producer.sh --broker-list 192.168.56.12:9092 --topic Hello-Kafka
你好
[2021-12-22 10:07:37,399] WARN Error while fetching metadata with correlation id 0 : {Hello-Kafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2021-12-22 10:07:37,498] WARN Error while fetching metadata with correlation id 1 : {Hello-Kafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2021-12-22 10:07:37,604] WARN Error while fetching metadata with correlation id 2 : {Hello-Kafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2021-12-22 10:07:37,709] WARN Error while fetching metadata with correlation id 3 : {Hello-Kafka=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[root@host1 kafka_2.11-0.9.0.0]# bin/kafka-console-producer.sh --broker-list 192.168.56.11:9092 --topic Hello-Kafka
你好
[2021-12-22 10:13:09,255] ERROR Error when sending message to topic Hello-Kafka with key: null, value: 6 bytes with error: Batch Expired (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
解决办法:保证集群节点已启动,修改配置文件,最终zookeeper、kafka全部重启解决
查看集群是否有broker没有运行
[root@host1 bin]# zookeeper-shell.sh host1:2181,host2:2181,host3:2181 Connecting to host1:2181,host2:2181,host3:2181 Welcome to ZooKeeper! JLine support is disabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null ls /brokers/ids [0, 1, 2]
对应配置:
[root@host1 config]# cat server.properties |grep -v "#" |grep -v "^$" broker.id=0 listeners=PLAINTEXT://192.168.56.11:9092 advertised.host.name=192.168.56.11 advertised.listeners=PLAINTEXT://192.168.56.11:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=192.168.56.11:2181,192.168.56.12:2181,192.168.56.13:2181 zookeeper.connection.timeout.ms=6000集群重启报错
kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory. at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:98) at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:95) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at kafka.log.LogManager.lockLogDirs(LogManager.scala:95) at kafka.log.LogManager.(LogManager.scala:57) at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:589) at kafka.server.KafkaServer.startup(KafkaServer.scala:171) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) at kafka.Kafka$.main(Kafka.scala:67) at kafka.Kafka.main(Kafka.scala) [2021-12-22 10:36:34,680] INFO shutting down (kafka.server.KafkaServer) [root@host1 bin]# cd /tmp/kafka-logs/ [root@host1 kafka-logs]# ll -a 总用量 4 drwxr-xr-x. 2 root root 119 12月 22 10:32 . drwxrwxrwt. 11 root root 230 12月 22 10:33 .. -rw-r--r--. 1 root root 0 12月 22 09:36 .lock -rw-r--r--. 1 root root 54 12月 22 09:36 meta.properties -rw-r--r--. 1 root root 0 12月 22 09:36 recovery-point-offset-checkpoint -rw-r--r--. 1 root root 0 12月 22 09:36 replication-offset-checkpoint [root@host1 kafka-logs]# rm -f .lock [root@host1 kafka-logs]#
删除 .lock 即可
自带停止应用脚本无效改成如下即可
[root@host1 bin]# cat kafka-server-stop.sh |grep -v "#"
PIDS=$(jps -lm | grep -i 'Kafka' | awk '{print $1}')
if [ -z "$PIDS" ]; then
echo "No kafka server to stop"
exit 1
else
kill -9 $PIDS
fi
[root@host1 bin]#
未知名服务
[root@host1 bin]# less nohup.out [root@host1 bin]# zookeeper-shell.sh host1:2181,host2:2181,host3:2181 Connecting to host1:2181,host2:2181,host3:2181 Exception in thread "main" java.net.UnknownHostException: host2: 未知的名称或服务
hosts文件支配着本台机器,需要把另外的集群也配置过来,相当于别名,配置文件可以直接配置别名,但是后面测试java报错,无法识别,需要改成具体IP
[root@host1 bin]# cat /etc/hosts 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4 #::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.56.11 host1 192.168.56.12 host2 192.168.56.13 host3JAVA 测试 启动报错
缺少log4j相关包以及没有logj配置文件,导入相关包:
DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:386) - No node found. Trying previously-seen node with ID 0 DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:386) - No node found. Trying previously-seen node with ID 1 DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:386) - No node found. Trying previously-seen node with ID 4 DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:386) - No node found. Trying previously-seen node with ID 2 DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:386) - No node found. Trying previously-seen node with ID 5 DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:656) - Give up sending metadata request since no node is available DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:512) - Initiating connection to node 0 at host1:9092. DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:523) - Error connecting to node 0 at host1:9092: java.io.IOException: Can't resolve address: host1:9092 at org.apache.kafka.common.network.Selector.connect(Selector.java:156) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:514) at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:169) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:180) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:141) at java.lang.Thread.run(Thread.java:745) Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:101) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) at org.apache.kafka.common.network.Selector.connect(Selector.java:153) ... 5 more DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:512) - Initiating connection to node 0 at host1:9092. DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:523) - Error connecting to node 0 at host1:9092: java.io.IOException: Can't resolve address: host1:9092 at org.apache.kafka.common.network.Selector.connect(Selector.java:156) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:514) at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:169) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:180) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:141) at java.lang.Thread.run(Thread.java:745) Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:101) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) at org.apache.kafka.common.network.Selector.connect(Selector.java:153) ... 5 more DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:671) - Initialize connection to node 5 for sending metadata request DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:512) - Initiating connection to node 5 at host1:9094. DEBUG [kafka-producer-network-thread | producer-1] (NetworkClient.java:523) - Error connecting to node 5 at host1:9094: java.io.IOException: Can't resolve address: host1:9094 at org.apache.kafka.common.network.Selector.connect(Selector.java:156) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:514) at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:49) at org.apache.kafka.clients.NetworkClient$DefaultmetadataUpdater.maybeUpdate(NetworkClient.java:672) at org.apache.kafka.clients.NetworkClient$DefaultmetadataUpdater.maybeUpdate(NetworkClient.java:568) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:268) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:141) at java.lang.Thread.run(Thread.java:745) Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:101) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) at org.apache.kafka.common.network.Selector.connect(Selector.java:153) ... 8 more DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name connections-closed:client-id-producer-1 DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name connections-created:client-id-producer-1 DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name bytes-sent-received:client-id-producer-1 DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name bytes-received:client-id-producer-1 DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name bytes-sent:client-id-producer-1 DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name select-time:client-id-producer-1 DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name io-time:client-id-producer-1 DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name node--2.bytes-sent DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name node--2.bytes-received DEBUG [kafka-producer-network-thread | producer-1] (Metrics.java:220) - Removed sensor with name node--2.latency DEBUG [kafka-producer-network-thread | producer-1] (Sender.java:157) - Shutdown of Kafka producer I/O thread has completed. DEBUG [main] (KafkaProducer.java:656) - The Kafka producer has closed.
替换为具体IP即可
serverX.properties 为多代理配置,详见教程
[root@host1 config]# sed -i "s/host1/192.168.56.11/g" server.properties [root@host1 config]# sed -i "s/host1/192.168.56.11/g" server1.properties [root@host1 config]# sed -i "s/host1/192.168.56.11/g" server2.properties
最终成功执行的源码:
消费者
package kafka.com.java;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SimpleConsumer {
@SuppressWarnings({ "unchecked", "rawtypes", "resource" })
public static void main(String[] args) throws Exception {
if (args.length == 0) {
System.out.println("Enter topic name");
return;
}
// Kafka consumer configuration settings
String topicName = args[0].toString();
Properties props = new Properties();
props.put("bootstrap.servers",
"192.168.56.11:9092,192.168.56.11:9093,192.168.56.11:9094,192.168.56.12:9092,192.168.56.13:9092");
props.put("group.id", "test-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer(props);
// Kafka Consumer subscribes list of topics here.
consumer.subscribe(Arrays.asList(topicName));
// print the topic name
System.out.println("Subscribed to topic " + topicName);
while (true) {
ConsumerRecords records = consumer.poll(100);
Iterator i = records.records(topicName).iterator();
while (i.hasNext()) {
ConsumerRecord record = (ConsumerRecord)i.next();
System.out.printf("offset = %d, key = %s, value = %s n", record.offset(), record.key(),
record.value());
}
}
}
}
生产者
package kafka.com.java;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class SimpleProducer {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void main(String[] args) throws Exception {
// Check arguments length value
if (args.length == 0) {
System.out.println("Enter topic name");
return;
}
// Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
// Assign localhost id
props.put("bootstrap.servers",
"192.168.56.11:9092,192.168.56.11:9093,192.168.56.11:9094,192.168.56.12:9092,192.168.56.13:9092");
// Set acknowledgements for producer requests.
props.put("acks", "all");
// If the request fails, the producer can automatically retry,
props.put("retries", 0);
// Specify buffer size in config
props.put("batch.size", 16384);
// Reduce the no of requests less than 0
props.put("linger.ms", 1);
// The buffer.memory controls the total amount of memory available to the
// producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer producer = new KafkaProducer(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord(topicName, Integer.toString(i), "test java-" + i));
}
System.out.println("Message sent successfully");
producer.close();
}
}
其它
测试结果
终端
JAVA
生产
消费
查看topic
[root@host1 bin]# kafka-topics.sh --list --zookeeper host1:2181,host2:2181,host3:2181 Hello-Kafka Hello-Kafka1 Multibrokerapplication
修改topic
[root@host1 bin]# kafka-topics.sh --zookeeper host1:2181,host2:2181,host3:2181 --alter --topic Hello-Kafka --partitions 2 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! [root@host1 bin]#
删除topic
[root@host1 bin]# kafka-topics.sh --zookeeper host1:2181,host2:2181,host3:2181 --delete --topic Hello-Kafka1 Topic Hello-Kafka1 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
发送消息
[root@host1 bin]# ./kafka-console-producer.sh --broker-list 192.168.56.11:9092 --topic Hello-Kafka hello word is ok? maybe
接收消息
[root@host1 bin]# ./kafka-console-consumer.sh --zookeeper host1:2181 --topic Hello-Kafka --from-beginning hello word is ok? maybe



