栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

98.StreamSets实时采集Kafka

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

98.StreamSets实时采集Kafka

98.1 演示环境介绍

已安装Kafka并正常运行未启用KerberosRedHat版本:7.4CM和CDH版本:cdh5.13.3kafka版本:3.0.0(0.11.0)Kudu版本:1.5.0 98.2 操作演示

1.准备测试环境

创建测试topic

kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic kafka2kudu_topic

通过Hue使用Impala创建一个Kudu表:

CREATE TABLE ods_deal_daily_kudu (
  id STRING COMPRESSION snappy,
  name STRING COMPRESSION snappy,
  sex STRING COMPRESSION snappy,
  city STRING COMPRESSION snappy,
  occupation STRING COMPRESSION snappy,
  mobile_phone_num STRING COMPRESSION snappy,
  fix_phone_num STRING COMPRESSION snappy,
  bank_name STRING COMPRESSION snappy,
  address STRING COMPRESSION snappy,
  marriage STRING COMPRESSION snappy,
  child_num INT COMPRESSION snappy,
  PRIMARY KEY (id)
)
  PARTITION BY HASH PARTITIONS 16
STORED AS KUDU
  TBLPROPERTIES ('kudu.master_addresses'='master.gzyh.com'
);

2.生产Kafka消息

创建Maven工程,工程的pom.xml文件:


    
        cdh-project
        com.cloudera
        1.0-SNAPSHOT
    
    4.0.0
    kafka-demo
    jar
    kafka-demo
    http://maven.apache.org
    
        UTF-8
    
    
        
            org.apache.kafka
            kafka-clients
            0.10.2.0
        
        
            net.sf.json-lib
            json-lib
            2.4
            jdk15
        
    

编写ReadFileToKafka.java文件:

package com.cloudera.nokerberos;
import net.sf.json.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ReadFileToKafka {
    public static String confPath = System.getProperty("user.dir") + File.separator + "conf";
    public static void main(String[] args) {
        if(args.length < 1) {
            System.out.print("缺少输入参数,请指定要处理的text文件");
            System.exit(1);
        }
        String filePath = args[0];
        BufferedReader reader = null;
        try {
            Properties appProperties = new Properties();
            appProperties.load(new FileInputStream(new File(confPath + File.separator + "app.properties")));
            String brokerlist = String.valueOf(appProperties.get("bootstrap.servers"));
            String topic_name = String.valueOf(appProperties.get("topic.name"));
            Properties props = getKafkaProps();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerlist);
            Producer producer = new KafkaProducer(props);
            reader = new BufferedReader(new FileReader(filePath));
            String tempString = null;
            int line = 1;
            // 一次读入一行,直到读入null为文件结束
            while ((tempString = reader.readLine()) != null) {
                String detailJson = parseJSON(tempString);
                ProducerRecord record = new ProducerRecord(topic_name, detailJson);
                producer.send(record);
                line++;
            }
            reader.close();
            producer.flush();
            producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e1) {
                }
            }
        }
    }
    
    private static String parseJSON(String tempString) {
        if(tempString != null && tempString.length() > 0) {
            Map resultMap = null;
            String[] detail = tempString.split("01");
            resultMap = new HashMap<>();
            resultMap.put("id", detail[0]);
            resultMap.put("name", detail[1]);
            resultMap.put("sex", detail[2]);
            resultMap.put("city", detail[3]);
            resultMap.put("occupation", detail[4]);
            resultMap.put("mobile_phone_num", detail[5]);
            resultMap.put("fix_phone_num", detail[6]);
            resultMap.put("bank_name", detail[7]);
            resultMap.put("address", detail[8]);
            resultMap.put("marriage", detail[9]);
            resultMap.put("child_num", detail[10]);
            return JSONObject.fromObject(resultMap).toString();
        }
        return null;
    }
    
    private static Properties getKafkaProps() {
        try{
            Properties props = new Properties();
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000); //批量发送消息
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return props;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

将编写好的代码使用mvn命令打包

在工程目录使用mvn cleanpackage命令进行编译打包 编写脚本run.sh脚本运行jar包

run.sh脚本:

[root@master kafka-run]# vim run.sh 
#!/bin/bash
#########################################
# 创建Topic
# kafka-topics --create --zookeeper master.gzyh.com:2181,cdh01.gzyh.com:2181,cdh02.gzyh.com:2181 --replication-factor 3 --partitions 3 --topic ods_deal_daily_topic
########################################
JAVA_HOME=/usr/java/jdk1.8.0_131
#要读取的文件
read_file=$1
for file in `ls lib/*jar`
do
    CLASSPATH=$CLASSPATH:$file
done
export CLASSPATH
${JAVA_HOME}/bin/java -Xms1024m -Xmx2048m com.cloudera.nokerberos.ReadFileToKafka $read_file

conf目录下的配置文件app.properties:

[root@master kafka-run]# vim conf/app.properties 
bootstrap.servers=cdh01.gzyh.com:9092,cdh02.gzyh.com:9092,cdh03.gzyh.com:9092
topic.name=ods_deal_daily_topic

依赖包可以在命令行使用mvn命令导出:

mvn dependency:copy-dependencies -DoutputDirectory=/tmp/lib

3.创建Pipline

在StreamSets创建一个kafka2kudu的PiplinePipline流程中添加Kafka Consumer作为源并配置Kafka基础信息
配置Kafka相关信息,如Broker、ZK及Topic
配置数据格式化方式,写入Kafka的数据为JSON格式,所以这里选择JSON
添加Kudu模块及配置基本信息
配置Kudu的Master、Table、Operation等

Kudu Masters:可以配置多个,多个地址以“,”分割Table Name:如果使用Impala创建的Kudu表则需要添加impala::前缀Field to ColumnMapping:配置Json中key与Kudu表的column的映射关系,如果字段名称一致则不需要配置。DefaultOpertation:设置操作类型如:insert、upsert、delete
Kudu模块高级配置使用默认配置

4.验证启动kafka2kudu的Pipline
运行run.sh脚本向Kafka发送消息

[root@master kafka-run]# sh run.sh ods_user_600.txt 

向Kafka发送消息
查看监控信息
查看Kudu的ods_deal_daily_kudu表内容
入库的数据总条数

可以看到ods_deal_daily_kudu表的总条数与准备的测试数据量一致

大数据视频推荐:
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/731681.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号