pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kafkaspace</groupId>
    <artifactId>kafkaWorkspace</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.3.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
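With the shade plugin bound to the package phase, a runnable fat jar can be built with the standard Maven command:
mvn clean package
Of these dependencies, only kafka-streams 2.0.0 (which pulls in kafka-clients) is needed for the example below; the Scala, Spark, Hadoop, and MySQL entries appear to be carry-overs from the same workspace.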
LogProcessor.java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

import java.util.HashMap;

public class LogProcessor implements Processor<byte[], byte[]> {

    private ProcessorContext processorContext;

    @Override
    public void init(ProcessorContext processorContext) {
        // Keep the context so results can be forwarded downstream
        this.processorContext = processorContext;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        String inputOri = new String(value);
        HashMap<String, Integer> map = new HashMap<>();
        int times = 1;
        if (inputOri.contains(" ")) {
            // Split the line into words on spaces and count each word
            String[] words = inputOri.split(" ");
            for (String word : words) {
                if (map.containsKey(word)) {
                    map.put(word, map.get(word) + 1);
                } else {
                    map.put(word, times);
                }
            }
        }
        // Forward the counts, e.g. "{hello=2, kafka=1}", to the downstream node
        inputOri = map.toString();
        processorContext.forward(key, inputOri.getBytes());
    }

    @Override
    public void close() {
    }
}
App.java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import java.util.Properties;
public class App {
    public static void main(String[] args) {
        // Source topic to read from
        String fromTopic = "testStreams1";
        // Target topic to write the results to
        String toTopic = "testStreams2";
        // Kafka Streams configuration
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Wrap the properties in a StreamsConfig object
        StreamsConfig config = new StreamsConfig(props);
        // Build the processing topology
        Topology topology = new Topology();
        // Add the source node: give it a name and the topic it subscribes to
        topology.addSource("SOURCE", fromTopic)
                // Add the custom processor node: supply the processor and name its parent node
                .addProcessor("PROCESSOR", new ProcessorSupplier<byte[], byte[]>() {
                    @Override
                    public Processor<byte[], byte[]> get() {
                        return new LogProcessor();
                    }
                }, "SOURCE")
                // Add the sink node: name it, set its output topic, and name its parent node
                .addSink("SINK", toTopic, "PROCESSOR");
        // Create and start the KafkaStreams instance
        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();
    }
}
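Before deploying to the cluster, the topology can be exercised in-process with Kafka's TopologyTestDriver. A minimal sketch, assuming the org.apache.kafka:kafka-streams-test-utils:2.0.0 artifact has been added to the pom (it is not listed above); the class name AppSmokeTest is only illustrative:
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class AppSmokeTest {
    public static void main(String[] args) {
        // Rebuild the same topology as App, but drive it without any brokers
        Topology topology = new Topology();
        topology.addSource("SOURCE", "testStreams1")
                .addProcessor("PROCESSOR", LogProcessor::new, "SOURCE")
                .addSink("SINK", "testStreams2", "PROCESSOR");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
        TopologyTestDriver driver = new TopologyTestDriver(topology, props);

        // Pipe one record into the source topic and read the result back from the sink topic
        ConsumerRecordFactory<byte[], byte[]> factory =
                new ConsumerRecordFactory<>("testStreams1", new ByteArraySerializer(), new ByteArraySerializer());
        driver.pipeInput(factory.create("hello kafka hello".getBytes()));
        ProducerRecord<byte[], byte[]> output =
                driver.readOutput("testStreams2", new ByteArrayDeserializer(), new ByteArrayDeserializer());
        System.out.println(new String(output.value())); // prints something like {hello=2, kafka=1}
        driver.close();
    }
}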
Start the ZooKeeper and Kafka clusters on every node.
Create the two topics on hadoop01:
kafka-topics.sh --create --topic testStreams1 --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
kafka-topics.sh --create --topic testStreams2 --partitions 3 --replication-factor 2 --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
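To confirm that both topics were created, they can be listed with:
kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181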
Start a console producer on hadoop01:
kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testStreams1
Start a console consumer on hadoop02:
kafka-console-consumer.sh --from-beginning --topic testStreams2 --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092
Run App.java, type some text into the producer console, and the word counts will appear in the consumer console.
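For example, entering this line in the producer console:
hello kafka hello
produces output like the following in the consumer console (the key order comes from HashMap.toString() and is not guaranteed):
{hello=2, kafka=1}
Note that a line containing no space is forwarded as the empty map {}, since the processor only splits lines that contain a space.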