Kafka Study Notes (6): The Kafka Consumer API and Development Examples

1. The Kafka Consumer API

1.1 Consumer

Consumer is the client-side interface for reading records from Kafka topics.

1.2 KafkaConsumer

KafkaConsumer is the standard implementation of Consumer. It is not thread-safe, so each instance should be driven from a single thread.

1.3 ConsumerRecords

ConsumerRecords is the container returned by poll(); it groups the fetched records by topic partition and can be iterated directly.

1.4 ConsumerRecord

ConsumerRecord represents a single record and carries its topic, partition, offset, key, and value.

1.5 KafkaConsumer in Practice
package demo02;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SimpleConsumer {
    public static void main(String[] args) {
        String topic = "test_02_02";
        String group = "test_group";
        Map<String, Object> kafkaProperties = new HashMap<>();

        kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
        kafkaProperties.put("group.id", group);
        kafkaProperties.put("enable.auto.commit","true");

        kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Generic parameters match the String deserializers configured above.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            // poll() blocks for up to 100 ms and returns whatever records have been fetched
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

Result: the console prints one "offset = ..., key = ..., value = ..." line for every record consumed.
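A note on offsets: with enable.auto.commit set to "true", the consumer commits offsets in the background, so a crash between a commit and the actual processing can skip or repeat records. A common variant is to disable auto-commit and commit manually after each batch. This is a minimal sketch, not part of the original example; process() is a hypothetical placeholder for your own logic:

        kafkaProperties.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                process(record);   // hypothetical processing step
            }
            consumer.commitSync(); // commit offsets only after the batch is fully processed
        }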

2. Producer & Consumer Integration in Practice
  • 1. Design a utility class that returns random strings

WordUtil.java

package demo03;

import java.util.Random;

public class WordUtil {
    // Word pool: a passage about Kafka's consumer-group design, split on spaces.
    public static final String[] WORDS = "A traditional queue retains records in-order on the server, and if multiple consumers consume from the queue then the server hands out records in the order they are stored. However, although the server hands out records in order, the records are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the records is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of \"exclusive consumer\" that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing. Kafka does it better. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances in a consumer group than partitions.".split(" ");

    static Random random = new Random();

    // Pick a random word; the key is the word's index in WORDS.
    public static KV generateRandom(){
        int index = random.nextInt(WORDS.length);
        return new KV(String.valueOf(index), WORDS[index]);
    }

    public static void main(String[] args) {
        for(int i=0;i<10;i++){
            KV kv = generateRandom();
            System.out.printf("key: %s, value: %sn",kv.getK(),kv.getV());
        }
    }
}
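A small design note on generateRandom(): java.util.Random is thread-safe but synchronizes internally, so if the utility were ever called from several producer threads at once, java.util.concurrent.ThreadLocalRandom would avoid that contention. A sketch of the variant (the method name generateRandomConcurrent is made up for illustration):

    import java.util.concurrent.ThreadLocalRandom;

    public static KV generateRandomConcurrent() {
        // ThreadLocalRandom keeps one generator per thread, so there is no shared state
        int index = ThreadLocalRandom.current().nextInt(WORDS.length);
        return new KV(String.valueOf(index), WORDS[index]);
    }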

KV.java

package demo03;

public class KV {
    // Simple mutable key/value pair passed from WordUtil to the producer.
    private String k;
    private String v;
    public KV(String k, String v) {
        this.k = k;
        this.v = v;
    }

    public String getK() {
        return k;
    }

    public void setK(String k) {
        this.k = k;
    }

    public String getV() {
        return v;
    }

    public void setV(String v) {
        this.v = v;
    }
}

Result of running WordUtil.java: ten "key: index, value: word" lines are printed.
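KV itself is just a data carrier. If the project can use Java 16 or newer (an assumption; the original does not state a Java version), the whole class collapses to a record, with accessors k() and v() in place of getK()/getV():

    package demo03;

    // Immutable key/value pair; the constructor, k(), v(), equals,
    // hashCode and toString are all generated by the compiler.
    public record KV(String k, String v) {}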

  • 2. Design a Producer that sends one record per second

TimerProducer.java

package demo03;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;

public class TimerProducer {
    public static void main(String[] args) throws InterruptedException {
        String topic = "test_02_02";
        Map<String, Object> kafkaProperties = new HashMap<>();

        kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
        kafkaProperties.put("acks", "all");

        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Generic parameters match the String serializers configured above.
        Producer<String, String> producer = new KafkaProducer<>(kafkaProperties);
        int size = 60;
        for (int i = 0; i < size; i++) {
            Thread.sleep(1000L);  // one record per second
            KV kv = WordUtil.generateRandom();
            producer.send(new ProducerRecord<>(topic, kv.getK(), kv.getV()));
        }
        producer.close();  // flushes any buffered records before exiting
    }
}
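One thing to keep in mind: KafkaProducer.send() is asynchronous, and the loop above never learns whether a record actually reached the broker. The send(record, callback) overload of the same API reports the outcome per record; a sketch (the logging is illustrative):

        producer.send(new ProducerRecord<>(topic, kv.getK(), kv.getV()), (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();  // delivery failed after retries
            } else {
                System.out.printf("sent to partition %d at offset %d%n",
                        metadata.partition(), metadata.offset());
            }
        });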

TimerConsumer.java

package demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class TimerConsumer {
    public static void main(String[] args) {
        String topic = "test_02_02";
        String group = "test_group";
        Map<String, Object> kafkaProperties = new HashMap<>();

        kafkaProperties.put("bootstrap.servers", "node100:9092,node101:9092,node102:9092");
        kafkaProperties.put("group.id", group);
        kafkaProperties.put("enable.auto.commit", "true");

        kafkaProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Generic parameters match the String deserializers configured above.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProperties);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            // poll() waits up to 3 seconds for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
            System.out.printf("%nTime: %s%n", new Date());
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

Result: every three seconds the consumer prints the current time followed by any records received in that poll.
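Both consumers in this article loop forever and never call close(). The usual shutdown pattern is to call consumer.wakeup() from another thread, such as a JVM shutdown hook, which makes the blocked poll() throw org.apache.kafka.common.errors.WakeupException; a minimal sketch of that pattern:

        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();                 // forces the blocked poll() to throw WakeupException
            try { mainThread.join(); } catch (InterruptedException ignored) { }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
                // ... process records as above ...
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // expected during shutdown
        } finally {
            consumer.close();                  // leaves the group and commits final offsets
        }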
