Window11 安装 wsl Ubuntu 18.04 LTS
2 安装Java 3 下载kafka安装包下载地址:https://kafka.apache.org/downloads
选择想要下载的二进制安装包
我这里使用的是kafka2.11(最好选用scala 2.12)
wsl中可以在/mnt文件加下直接访问windows文件,例如我windows的路径D:hadoopkafka_2.12-2.1.0,对应的wsl路径/mnt/hadoop/kafka_2.12-2.1.0
这时kafa已经可以直接使用了,但是如果要在windows系统中消费需要修改config/server.properties
增加如下两个配置
listeners=PLAINTEXT://localhost:9092 advertised.listeners=PLAINTEXT://localhost:90925 wsl中测试
进入kafka目录
cd /mnt/d/hadoop/kafka_2.12-2.1.05.1 启动zookeeper
kafka自带zookeeper,可直接启动
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties5.2 启动kafka
bin/kafka-server-start.sh -daemon config/server.properties5.3 创建一个主题
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test5.4 发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test5.5 接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning5.6 flink中测试
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// kafka读取数据
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
DataStream dataStream = env.addSource(new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties));
// // 使用KafkaSource读取数据
// KafkaSource source = KafkaSource.builder()
// .setBootstrapServers("localhost:9092")
// .setTopics("test")
// .setGroupId("consumer-group")
// .setValueonlyDeserializer(new SimpleStringSchema())
// .build();
// DataStreamSource dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// 打印输出
dataStream.print();
env.execute();
}
6 每次重启kafka徐删除 /tmp/kafka-logs 文件夹


