My tested environment: CentOS 7.3 + ZooKeeper + Kafka (install JDK 1.8 on your own). Enough talk, let's get straight to it.
1. ZooKeeper installation and deployment (attachment: zookeeper-3.4.11.tar.gz)
# Download ZooKeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.11.tar.gz
# Extract
tar -zxvf zookeeper-3.4.11.tar.gz
# Move to /usr/local/
mv zookeeper-3.4.11/ /usr/local/
# Rename the directory
mv /usr/local/zookeeper-3.4.11/ /usr/local/zookeeper/
# Create the data directory
mkdir /usr/local/zookeeper/data/
# Create the config file from the sample
cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
Edit the config file /usr/local/zookeeper/conf/zoo.cfg and point dataDir at the directory created above (dataDir=/usr/local/zookeeper/data). The full configuration is:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
Start ZooKeeper and check its status with the following commands.
# Start ZooKeeper
/usr/local/zookeeper/bin/zkServer.sh start
# Check status
/usr/local/zookeeper/bin/zkServer.sh status
After a successful start, the status check reports the following.
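On a single-node setup like this one, ZooKeeper runs in standalone mode, so the status output should look roughly like this (the config path reflects your install location):
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: standalone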
2. Kafka installation and deployment (attachment: kafka_2.10-0.9.0.0.tgz)
# Download Kafka
wget https://archive.apache.org/dist/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz
# Extract Kafka
tar -zxvf kafka_2.10-0.9.0.0.tgz
# Copy to /usr/local
cp -R kafka_2.10-0.9.0.0 /usr/local
# Rename
mv /usr/local/kafka_2.10-0.9.0.0/ /usr/local/kafka/
Edit the /usr/local/kafka/config/server.properties file. Leave the other settings unchanged and make sure the following parameters are present:
broker.id=0
host.name=<local IP>
listeners=PLAINTEXT://:9092
# makes Kafka reachable from remote clients
advertised.listeners=PLAINTEXT://<local IP>:9092
advertised.host.name=localhost
zookeeper.connect=localhost:2181
Edit /usr/local/kafka/bin/kafka-server-start.sh and change the following setting (if your VM has enough memory, this step is unnecessary).
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
Start Kafka, create a topic, and run a test.
# Start Kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# Create a topic named "test"
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# Send data (producer)
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# Receive data (consumer)
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test
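With the producer and consumer running in two separate terminals, any line typed into the producer should show up in the consumer. You can also double-check that the topic was created:
# List the topics registered in ZooKeeper
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181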
3. Java code (Flink reads Kafka data in real time and periodically writes aggregated batches to MySQL; source code included in the attachment)
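The code below depends on Flink's streaming API and its Kafka 0.9 connector, fastjson for JSON parsing, Guava (for Lists), commons-dbcp2 for the connection pool, and the MySQL JDBC driver. As a reference, here is a Maven dependency sketch; the versions are my assumptions, so match them to your own Flink and Scala setup:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.4.2</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
  <version>1.4.2</version>
</dependency>
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.47</version>
</dependency>
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>18.0</version>
</dependency>
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-dbcp2</artifactId>
  <version>2.1.1</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.39</version>
</dependency>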
(1) The Student entity class.
public class Student {
public int id;
public String name;
public String password;
public int age;
public Student() {
}
public Student(int id, String name, String password, int age) {
this.id = id;
this.name = name;
this.password = password;
this.age = age;
}
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", password='" + password + '\'' +
", age=" + age +
'}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
(2) The main class Main2 (replace the parameters marked in the comments with your own values).
// Imports assume a Flink 1.4.x classpath; in newer Flink versions,
// SimpleStringSchema lives in org.apache.flink.api.common.serialization.
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
public class Main2 {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "<server IP>:9092");
props.put("zookeeper.connect", "<server IP>:2181");
// You can list the existing consumer groups with:
// ./kafka-consumer-groups.sh --zookeeper localhost:2181 --list
props.put("group.id", "console-consumer-91899"); // replace with your own group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer09<>(
"test",
new SimpleStringSchema(),
props)).setParallelism(1)
.map(string -> JSON.parseObject(string, Student.class));
// Consume from Kafka and aggregate the records over 1-minute windows
student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
ArrayList<Student> students = Lists.newArrayList(values);
if (students.size() > 0) {
System.out.println("Number of student records collected within 1 minute: " + students.size());
out.collect(students);
}
}
}).addSink(new SinkToMySQL());
env.execute("Flink add sink");
}
}
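One note on the window: timeWindowAll uses the environment's default time characteristic, which is processing time, so the one-minute batches are cut by wall-clock time, and as a non-keyed all-window it runs with parallelism 1. If you wanted event-time windows instead, you would additionally have to assign timestamps and watermarks on the source; the switch itself, in the Flink 1.x API, is just:
// requires: import org.apache.flink.streaming.api.TimeCharacteristic;
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);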
(3) The SinkToMySQL class (writes the batched data to MySQL; replace the parameters marked in the comments with your own values).
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
public class SinkToMySQL extends RichSinkFunction<List<Student>> {
PreparedStatement ps;
BasicDataSource dataSource;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
dataSource = new BasicDataSource();
connection = getConnection(dataSource);
String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
ps = this.connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
// Close the connection and release resources
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
@Override
public void invoke(List<Student> value, Context context) throws Exception {
// Iterate over the batched records
for (Student student : value) {
ps.setInt(1, student.getId());
ps.setString(2, student.getName());
ps.setString(3, student.getPassword());
ps.setInt(4, student.getAge());
ps.addBatch();
}
int[] count = ps.executeBatch(); // execute the whole batch at once
System.out.println("Successfully inserted " + count.length + " rows");
}
private static Connection getConnection(BasicDataSource dataSource) {
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
// Note: replace with your own MySQL address, username, and password
dataSource.setUrl("jdbc:mysql://localhost:3306/<database name>");
dataSource.setUsername("<database username>");
dataSource.setPassword("<database password>");
// Connection pool settings
dataSource.setInitialSize(10);
dataSource.setMaxTotal(50);
dataSource.setMinIdle(2);
Connection con = null;
try {
con = dataSource.getConnection();
System.out.println("Created connection pool: " + con);
} catch (Exception e) {
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
}
return con;
}
}
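Note that the sink assumes a Student table already exists in the target database. A minimal DDL sketch, with column types inferred from the entity class (adjust lengths and charset as needed):
CREATE TABLE Student (
id INT PRIMARY KEY,
name VARCHAR(50),
password VARCHAR(50),
age INT
);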
(4) The KafkaDataMonitor class (simulates sending data to Kafka; replace the parameters marked in the comments with your own values).
import java.util.Properties;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaDataMonitor {
public static final String broker_list = "<server IP>:9092";
public static final String topic = "test"; // replace with your Kafka topic
public static void main(String[] args) throws InterruptedException {
writeToKafka();
}
public static void writeToKafka() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", broker_list);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 1; i <= 20; i++) {
Student student = new Student(i, "test" + i, "password" + i, 18 + i);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));
producer.send(record);
System.out.println("send data: " + JSON.toJSONString(student));
}
producer.flush();
}
}
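Each run pushes 20 Student records into the topic as JSON strings; start Main2 first so the messages fall inside an open one-minute window. The producer console echoes each payload as it goes out; the first record, for example, serializes roughly as (fastjson orders fields alphabetically by default):
send data: {"age":19,"id":1,"name":"test1","password":"password1"}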
(5) Start the main method of the main class (Main2) and wait for it to come up. Then run the main method of KafkaDataMonitor to simulate writing data into Kafka. The printed log looks like this:
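Based on the println statements in the code above, the log should contain lines along these lines (timing and exact ordering will vary):
Created connection pool: ...
Number of student records collected within 1 minute: 20
Successfully inserted 20 rows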
Connecting to the MySQL database with Navicat, we can see that all 20 simulated records written to Kafka within the one-minute window have been successfully inserted into MySQL.
Attachment: source code + matching environment deployment packages.