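// Imports required by main (place them at the top of the enclosing class file)
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public static void main(String[] args) throws ExecutionException, InterruptedException {
    Map<String, Object> configs = new HashMap<>();
    // Broker address used for the initial connection
    configs.put("bootstrap.servers", "192.168.0.103:9092");
    // Serializer class for the key
    configs.put("key.serializer", IntegerSerializer.class);
    // Serializer class for the value
    configs.put("value.serializer", StringSerializer.class);
    // configs.put("acks", "all");
    // configs.put("retries", "3");
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
    // User-defined message header fields
    List<Header> headers = new ArrayList<>();
    headers.add(new RecordHeader("biz.name", "producer.demo".getBytes(StandardCharsets.UTF_8)));
    ProducerRecord<Integer, String> record = new ProducerRecord<>(
            "topic_1",      // topic
            0,              // partition
            0,              // key
            "hello hqk 0",  // value
            headers
    );
    // Synchronous confirmation: get() blocks until the send has completed
    final Future<RecordMetadata> future = producer.send(record);
    final RecordMetadata metadata = future.get();
    System.out.println("Message topic: " + metadata.topic());
    System.out.println("Message partition: " + metadata.partition());
    System.out.println("Message offset: " + metadata.offset());
    // Close the producer
    producer.close();
}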
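The synchronous confirmation above relies on `future.get()`, which blocks the calling thread until the send has completed. When the broker cannot be reached, a bounded wait with `get(timeout, unit)` may be easier to debug. The following is only a minimal sketch of that idea: the class `BoundedWait`, the method `getOrFallback`, and the string `"topic_1-0@8"` are hypothetical, and a plain `CompletableFuture` stands in for the `Future` returned by `producer.send(record)` so that the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWait {
    /** Waits at most timeoutMs for the result; returns fallback if the wait times out. */
    static <T> T getOrFallback(Future<T> future, long timeoutMs, T fallback)
            throws ExecutionException, InterruptedException {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-ins (assumption) for the Future returned by producer.send(record)
        CompletableFuture<String> acked = CompletableFuture.completedFuture("topic_1-0@8");
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed

        System.out.println(getOrFallback(acked, 100, "timed out"));   // prints topic_1-0@8
        System.out.println(getOrFallback(pending, 100, "timed out")); // prints timed out
    }
}
```

With a real producer, the same helper could be called with the `Future<RecordMetadata>` from `producer.send(record)` and a `null` fallback, treating `null` as "no confirmation within the timeout".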
WARN Client session timed out, have not heard from server in 30000ms for sessionid 0x0 (org.apache.zookeeper.ClientCnxn)
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
	at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:262)
	at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:119)
	at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1881)
	at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:376)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)
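After some searching, I found these possible causes:
1. The ZooKeeper address configured in server.properties (under Kafka's config directory) is wrong.
2. Port 2181 or 9092 is already in use.
3. The firewall needs to be turned off.
4. The ZooKeeper port to connect to should be 2181.
5. Newer Kafka versions take --bootstrap-server rather than --zookeeper when creating a topic.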
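Items 2 to 4 in this list largely come down to whether the two ports can be reached from the client machine. One quick way to check this is a plain TCP connect with a timeout. The sketch below is only an illustration: the class `PortCheck` and the method `canConnect` are hypothetical names, the default host is the IP used in this post, and a successful connect only shows that something is listening on the port, not that it is ZooKeeper or Kafka.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Host defaults to the broker IP used in this post; pass another one as the first argument
        String host = args.length > 0 ? args[0] : "192.168.0.103";
        for (int port : new int[] {2181, 9092}) {
            System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
        }
    }
}
```

If both ports report as reachable and the timeout persists, the cause is more likely in the broker configuration than in the network.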
In the end, none of these turned out to be the cause.
The fix was to append one line, host.name=<your server's IP>, to the end of Kafka's config/server.properties:
############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
host.name=192.168.0.103
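Since the file name and key are easy to mistype, it can help to confirm that the line really ended up in the file as an active (uncommented) entry. Here is a minimal sketch using `java.util.Properties`, which understands the same `key=value` and `#` comment syntax. The class `ServerPropsCheck` and the method `read` are hypothetical names, and the inline sample only mirrors the tail of the file shown above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

class ServerPropsCheck {
    /** Parses properties-format text and returns the value of key, or null if it is absent. */
    static String read(Reader source, String key) throws IOException {
        Properties props = new Properties();
        props.load(source); // lines starting with '#' are treated as comments
        return props.getProperty(key);
    }

    public static void main(String[] args) throws IOException {
        // With a path argument, read that file; otherwise fall back to an inline sample
        String sample = "# The default value for this is 3 seconds.\n"
                + "group.initial.rebalance.delay.ms=0\n"
                + "host.name=192.168.0.103\n";
        try (Reader in = args.length > 0
                ? Files.newBufferedReader(Paths.get(args[0]))
                : new StringReader(sample)) {
            System.out.println("host.name = " + read(in, "host.name"));
        }
    }
}
```

Running it with the path to the real server.properties as the first argument prints the value the file actually contains, or `null` if the key is missing or commented out.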
Then restart Kafka and connect again:
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
	acks = 1
	batch.size = 16384
	bootstrap.servers = [192.168.0.103:9092]
	buffer.memory = 33554432
	client.id =
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = null
	key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.0.2
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 2a121f7b1d402825
Message topic: topic_1
Message partition: 0
Message offset: 8
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Process finished with exit code 0
The message was sent successfully.
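The ProducerConfig dump shows `acks = 1` and `retries = 0`, because the two corresponding lines in the producer code are commented out. For reference, here is a hedged sketch of a config map with both settings enabled. The class `ProducerConfigs` and the method `withAcksAndRetries` are hypothetical names, and the values `all` and `3` are simply the ones from the commented-out lines. The serializers are given as class-name strings, which the Kafka client also accepts, so that the sketch compiles without the Kafka jar on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

class ProducerConfigs {
    /** Builds a producer config map that waits for all in-sync replicas and retries failed sends. */
    static Map<String, Object> withAcksAndRetries(String bootstrapServers) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", bootstrapServers);
        // Serializers as fully qualified class names instead of Class objects
        configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "all": the leader waits for the full set of in-sync replicas to acknowledge
        configs.put("acks", "all");
        // Number of times a failed send is retried (note the spelling: "retries")
        configs.put("retries", 3);
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = withAcksAndRetries("192.168.0.103:9092");
        System.out.println("acks = " + configs.get("acks"));
        System.out.println("retries = " + configs.get("retries"));
    }
}
```

The returned map could be passed to `new KafkaProducer<>(configs)` in place of the hand-built one. A misspelled key such as `reties` would not take effect as the retry setting, which is one reason to check the logged `retries` value after changing it.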



