1、生产者客户端案例2、生产者客户端案例
pom.xml
1、生产者客户端案例4.0.0 com.kafkaspace kafkaWorkspace 1.0-SNAPSHOT src/main/scala src/test/scala net.alchim31.maven scala-maven-plugin 3.2.2 compile testCompile -dependencyfile ${project.build.directory}/.scala_dependencies org.apache.maven.plugins maven-shade-plugin 2.4.3 package shade *:* meta-INF/*.SF meta-INF/*.DSA meta-INF/*.RSA org.apache.maven.plugins maven-compiler-plugin6 6 2.11.8 2.7.4 2.3.2 org.scala-lang scala-library${scala.version} org.apache.spark spark-core_2.11${spark.version} org.apache.hadoop hadoop-client${hadoop.version} org.apache.spark spark-sql_2.112.3.2 mysql mysql-connector-java5.1.46 org.apache.kafka kafka-clients2.0.0 org.apache.kafka kafka-streams2.0.0
KafkaProducerTest.java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerTest {
public static void main(String[] args) {
Properties props = new Properties();
//1.指定Kafaka集群的ip地址和端口号
props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
//2.等待所有副本节点的应答
props.put("acks", "all");
//3.消息发送最大尝试次数
props.put("retries", 0);
//4.指定一批消息处理次数
props.put("batch.size", 16384);
//5.指定请求延时
props.put("linger.ms", 1);
//6.指定缓存区内存大小
props.put("buffer.memory", 33554432);
//7.设置key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//8.设置value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 9、生产数据
KafkaProducer producer = new KafkaProducer(props);
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord("itcasttopic", Integer.toString(i), "hello kafka-" + i));
}
producer.close();
}
}
打开所有节点的zookeeper和kafka集群,并启动hadoop02为消费者终端去监听此主题
[root@hadoop02 ~]# kafka-console-consumer.sh --from-beginning --topic itcasttopic --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:90922、生产者客户端案例
KafkaConsumerTest.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerTest {
public static void main(String[] args) {
//1、准备配置文件
Properties props = new Properties();
//2、指定kafka集群主机名和端口号
props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
//3、指定消费者组id,在同一时刻同一消费组中只有一个线程可以
//去消费一个分区消息,不同的消费组可以去消费同一个分区消息
props.put("group.id", "itcasttopic");
//4、自动提交偏移量
props.put("enable.auto.commit", "true");
//5、自动提交时间间隔,每秒提交一次
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer kafkaConsumer = new KafkaConsumer(props);
//6、订阅消息,这里的topic可以是多个
kafkaConsumer.subscribe(Arrays.asList("itcasttopic"));
//7、获取消息
while (true) {
//每隔10s拉取一次
ConsumerRecords records = kafkaConsumer.poll(100);
for (ConsumerRecord record : records) {
System.out.printf("topic=%s,offset=%d,key=%s,value=%s%n", record.topic(), record.offset(), record.key(), record.value());
}
}
}
}
同样打开所有节点zookeeper和kafka集群
先运行KafkaConsumerTest.java,再运行KafkaProducerTest.java



