
Building a CDR Data Warehouse (1): Data Warehouse Concepts and Data Collection

1 Data Warehouse Concepts

A data warehouse (Data Warehouse, abbreviated DW or DWH) is a strategic collection of data from all of an enterprise's systems, built to support every decision-making process in the business.
Analyzing the data in a data warehouse helps an enterprise improve business processes, control costs, raise product quality, and so on.
The data warehouse is not the final destination of the data; rather, it prepares the data for its final destination. This preparation includes cleaning, transformation, classification, reorganization, merging, splitting, aggregation, and similar operations.

2 Project Requirements

2.1 Requirements Analysis

1) Collect user-behavior data from tracking points in real time
2) Build the data warehouse in layers
3) Import business data on a daily schedule
4) Produce report analyses from the data in the warehouse

2.2 Project Framework

2.2.1 Technology Selection

Data collection and transport: Flume, Kafka, Logstash, DataX, Sqoop
Data storage: Hive, MySQL, HDFS, HBase, S3
Data computation: Spark, Hive, Tez, Flink, Storm
Data querying: Presto, Impala, Kylin

2.2.2 Typical System Architecture Design

2.2.3 Typical System Data Flow Design

2.2.4 Framework Version Selection

Software         Version
Hadoop           2.7.2
Flume            1.7.0
Kafka            0.11.0.2
Kafka Manager    1.3.3.22
Hive             1.2.1
Sqoop            1.4.6
MySQL            5.7
Azkaban          2.5.0
Java             1.8
Zookeeper        3.4.10

Note: when choosing framework versions, avoid the very latest release; pick a stable version released roughly half a year before the latest one.
2.2.5 Cluster Resource Planning

                         Server 1                Server 2                        Server 3
HDFS                     NameNode, DataNode      DataNode                        DataNode
Yarn                     NodeManager             ResourceManager, NodeManager    NodeManager
Zookeeper                Zookeeper               Zookeeper                       Zookeeper
Flume (log collection)   Flume                   Flume
Kafka                    Kafka                   Kafka                           Kafka
Flume (Kafka consumer)                                                           Flume
Hive                     Hive
MySQL                    MySQL
3 Data Generation Module

3.1 CDR Data Fields
private String sysId;               // platform code
private String serviceName;         // interface/service name
private String homeProvinceCode;    // home province code
private String visitProvinceCode;   // visited province code
private String channelCode;         // channel code
private String serviceCode;         // business serial number
private String cdrGenTime;          // start time
private String duration;            // duration
private String recordType;          // CDR type
private String imsi;                // 15-digit code starting with 46000
private String msisdn;              // usually the phone number
private String dataUplinkVolume;    // uplink traffic
private String dataDownlinkVolume;  // downlink traffic
private String charge;              // charge
private String resultCode;          // result code
3.2 CDR Data Format:
{"cdrGenTime":"20211206165309844","channelCode":"wRFUC","charge":"62.28","dataDownlinkVolume":"2197","dataUplinkVolume":"467","duration":"1678","homeProvinceCode":"551","imsi":"460007237312954","msisdn":"15636420864","recordType":"gprs","resultCode":"000000","serviceCode":"398814910321850","serviceName":"a9icp8xcNy","sysId":"U1ob6","visitProvinceCode":"451"}
3.3 Simulated CDR Data Generation

See the project cdr_gen_app for details.
Package the project, deploy it to the hadoop101 machine, and run the command below to verify that data is generated.

[hadoop@hadoop101 ~]$ head datas/call.log 
{"cdrGenTime":"20211206173630762","channelCode":"QEHVc","charge":"78.59","dataDownlinkVolume":"3007","dataUplinkVolume":"3399","duration":"1545","homeProvinceCode":"571","imsi":"460002441249614","msisdn":"18981080935","recordType":"gprs","resultCode":"000000","serviceCode":"408688053609568","serviceName":"8lr23kV4C2","sysId":"j4FbV","visitProvinceCode":"931"}
{"cdrGenTime":"20211206173630884","channelCode":"TY4On","charge":"31.17","dataDownlinkVolume":"1853","dataUplinkVolume":"2802","duration":"42","homeProvinceCode":"891","imsi":"460008474876800","msisdn":"15998955319","recordType":"gprs","resultCode":"000000","serviceCode":"739375515156215","serviceName":"6p87h0OHdC","sysId":"UnI9V","visitProvinceCode":"991"}
...
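The generator project itself is not reproduced here. As a rough, hypothetical sketch only (the class name, field values, helper methods, and output file name are assumptions, not the actual cdr_gen_app code), records in the format above could be produced like this:

import java.io.FileWriter;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;

// Hypothetical generator sketch; the real cdr_gen_app project may differ.
public class CdrGenSketch {

    private static final Random RAND = new Random();

    public static void main(String[] args) throws IOException {
        // append 10 random records to call.log, one JSON object per line
        try (FileWriter out = new FileWriter("call.log", true)) {
            for (int i = 0; i < 10; i++) {
                out.write(randomRecord() + "\n");
            }
        }
    }

    // build one JSON line with the fields listed in section 3.1
    private static String randomRecord() {
        String genTime = LocalDateTime.now()
                .format(DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"));
        return "{\"cdrGenTime\":\"" + genTime + "\""
                + ",\"channelCode\":\"" + randomDigits(5) + "\""
                + ",\"charge\":\"" + String.format("%.2f", RAND.nextDouble() * 100) + "\""
                + ",\"dataDownlinkVolume\":\"" + RAND.nextInt(4000) + "\""
                + ",\"dataUplinkVolume\":\"" + RAND.nextInt(4000) + "\""
                + ",\"duration\":\"" + RAND.nextInt(3600) + "\""
                + ",\"homeProvinceCode\":\"" + (100 + RAND.nextInt(900)) + "\""
                + ",\"imsi\":\"46000" + randomDigits(10) + "\""
                + ",\"msisdn\":\"1" + randomDigits(10) + "\""
                + ",\"recordType\":\"" + (RAND.nextBoolean() ? "volte" : "gprs") + "\""
                + ",\"resultCode\":\"000000\""
                + ",\"serviceCode\":\"" + randomDigits(15) + "\""
                + ",\"serviceName\":\"svc" + randomDigits(7) + "\""
                + ",\"sysId\":\"sys" + randomDigits(2) + "\""
                + ",\"visitProvinceCode\":\"" + (100 + RAND.nextInt(900)) + "\"}";
    }

    private static String randomDigits(int n) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < n; i++) {
            sb.append(RAND.nextInt(10));
        }
        return sb.toString();
    }
}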
4 Data Collection

4.1 Hadoop Environment Preparation
            Hadoop101               Hadoop102                       Hadoop103
HDFS        NameNode, DataNode      DataNode                        DataNode
Yarn        NodeManager             ResourceManager, NodeManager    NodeManager
4.1.1 Add the LZO Support Package

1) Download the hadoop-lzo project
https://github.com/twitter/hadoop-lzo/archive/master.zip
2) The downloaded file is hadoop-lzo-master, a zip archive. Unzip it, then build it with Maven to produce hadoop-lzo-0.4.20.
3) Copy the built hadoop-lzo-0.4.20.jar into hadoop-2.7.2/share/hadoop/common/

[hadoop@hadoop101 common]$ pwd
/opt/modules/hadoop-2.7.2/share/hadoop/common
[hadoop@hadoop101 common]$ ls
hadoop-lzo-0.4.20.jar

4) Sync hadoop-lzo-0.4.20.jar to hadoop102 and hadoop103

[hadoop@hadoop101 common]$ xsync hadoop-lzo-0.4.20.jar
4.1.2 Add Configuration

1) Add LZO compression support to core-site.xml

    
<property>
    <name>io.compression.codecs</name>
    <value>
        org.apache.hadoop.io.compress.GzipCodec,
        org.apache.hadoop.io.compress.DefaultCodec,
        org.apache.hadoop.io.compress.BZip2Codec,
        org.apache.hadoop.io.compress.SnappyCodec,
        com.hadoop.compression.lzo.LzoCodec,
        com.hadoop.compression.lzo.LzopCodec
    </value>
</property>
<property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

2) Sync core-site.xml to hadoop102 and hadoop103

[hadoop@hadoop101 hadoop]$ xsync core-site.xml
4.1.3 Start the Cluster
[hadoop@hadoop101 hadoop-2.7.2]$ sbin/start-dfs.sh
[hadoop@hadoop102 hadoop-2.7.2]$ sbin/start-yarn.sh
4.1.4 Verification

1) Check the web UI and processes

  • Web UI: http://hadoop101:50070
  • Processes: run jps on each node to check its status.
4.2 Zookeeper Environment Preparation

            Hadoop101    Hadoop102    Hadoop103
Zookeeper   Zookeeper    Zookeeper    Zookeeper

4.2.2 ZK Cluster Start/Stop Script

1) Create the script in the /home/hadoop/bin directory on hadoop101

[hadoop@hadoop101 bin]$ vim zk.sh

Write the following content into the script:

#! /bin/bash

case $1 in
"start"){
    for i in hadoop101 hadoop102 hadoop103
    do
        ssh $i "/opt/modules/zookeeper-3.4.10/bin/zkServer.sh start"
    done
    };;
"stop"){
    for i in hadoop101 hadoop102 hadoop103
    do
        ssh $i "/opt/modules/zookeeper-3.4.10/bin/zkServer.sh stop"
    done
    };;
esac

2) Make the script executable

[hadoop@hadoop101 bin]$ chmod +x zk.sh

3) Start the Zookeeper cluster

[hadoop@hadoop101 modules]$ zk.sh start

4) Stop the Zookeeper cluster

[hadoop@hadoop101 modules]$ zk.sh stop
4.3 Flume Environment Preparation

                    Hadoop101    Hadoop102    Hadoop103
Flume (collection)  Flume

Flume configuration analysis

Flume configuration
(1) Create the flume1.conf file in the /opt/datas/calllogs/flume1/ directory
[hadoop@hadoop101 conf]$ vim flume1.conf
Write the following content into the file:

a1.sources=r1
a1.channels=c1 c2 
a1.sinks=k1 k2 

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/datas/calllogs/flume/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/datas/calllogs/records/calllogs.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.cmcc.jackyan.flume.interceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.cmcc.jackyan.flume.interceptor.LogTypeInterceptor$Builder

# selector
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = recordType
a1.sources.r1.selector.mapping.volte= c1
a1.sources.r1.selector.mapping.cdr= c2

# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20

a1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20

# configure sink
# start-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = topic-volte
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1

# event-sink
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topic-cdr
a1.sinks.k2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k2.kafka.flumeBatchSize = 2000
a1.sinks.k2.kafka.producer.acks = 1
a1.sinks.k2.channel = c2

Note: com.cmcc.jackyan.flume.interceptor.LogETLInterceptor and com.cmcc.jackyan.flume.interceptor.LogTypeInterceptor are the fully qualified class names of the custom interceptors. Adjust them to match your own interceptor classes.

4.3.3 Custom Flume Interceptors

Two custom interceptors are defined here: an ETL interceptor and a log-type interceptor.
The ETL interceptor filters out records whose timestamp is invalid or whose JSON data is incomplete.
The log-type interceptor separates volte CDRs from other CDRs so that they can be sent to different Kafka topics.
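As an illustration only, the kind of check described for the ETL interceptor might look like the sketch below (this helper is hypothetical and is not part of the project code shown later; the real filtering logic may differ):

// Hypothetical validation sketch: drop records whose cdrGenTime is not a
// 17-digit timestamp or whose body is not a complete JSON object.
public class LogValidatorSketch {

    public static boolean isValid(String body) {
        if (body == null) {
            return false;
        }
        String trimmed = body.trim();
        // a complete JSON object must start with '{' and end with '}'
        if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            return false;
        }
        // cdrGenTime is expected to be a 17-digit yyyyMMddHHmmssSSS string
        return java.util.regex.Pattern
                .compile("\"cdrGenTime\":\"\\d{17}\"")
                .matcher(trimmed)
                .find();
    }
}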
1) Create a Maven module named flume-interceptor
2) Create the package com.cmcc.jackyan.flume.interceptor
3) Add the following configuration to pom.xml


        
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.cmcc.jackyan.appclient.AppMain</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4) Create the LogETLInterceptor class in the com.cmcc.jackyan.flume.interceptor package
Flume ETL interceptor LogETLInterceptor:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {

    // logger for debugging
    private static final Logger logger = LoggerFactory.getLogger(LogETLInterceptor.class);

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        String body = new String(event.getBody(), Charset.forName("UTF-8"));
//        logger.info("Before:" + body);

        event.setBody(LogUtils.addTimeStamp(body).getBytes());

        body = new String(event.getBody(), Charset.forName("UTF-8"));

//        logger.info("After:" + body);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> intercepts = new ArrayList<>();

        // iterate over all events and drop the invalid ones
        for (Event event : events) {
            Event interceptEvent = intercept(event);
            if (interceptEvent != null) {
                intercepts.add(interceptEvent);
            }
        }
        return intercepts;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

5) Flume log utility class LogUtils

public class LogUtils {

    private static Logger logger = LoggerFactory.getLogger(LogUtils.class);

    
    public static String addTimeStamp(String log) {

        //{"cdrGenTime":"20211207091838913","channelCode":"k82Ce","charge":"86.14","dataDownlinkVolume":"3083","dataUplinkVolume":"2311","duration":"133","homeProvinceCode":"210","imsi":"460004320498004","msisdn":"17268221069","recordType":"mms","resultCode":"000000","serviceCode":"711869363795868","serviceName":"H79cKXuJO4","sysId":"aGIL4","visitProvinceCode":"220"}
        long t = System.currentTimeMillis();

        return t + "|" + log;
    }
}
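For reference, addTimeStamp simply prefixes the current epoch milliseconds, so downstream consumers see lines of the form currentMillis|{json}. A quick check (hypothetical test code, not part of the project):

public class LogUtilsDemo {
    public static void main(String[] args) {
        String log = "{\"recordType\":\"gprs\",\"msisdn\":\"15636420864\"}";
        // prints something like: 1638867890123|{"recordType":"gprs","msisdn":"15636420864"}
        System.out.println(LogUtils.addTimeStamp(log));
    }
}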

6) Flume log-type interceptor LogTypeInterceptor

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {

    // logger for debugging
    private static final Logger logger = LoggerFactory.getLogger(LogTypeInterceptor.class);

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        // 1. get the headers of the Flume event
        Map<String, String> headers = event.getHeaders();

        // 2. get the JSON body received by Flume as a byte array

        byte[] json = event.getBody();
        // convert the byte array to a string
        String jsonStr = new String(json);

        String recordType = "" ;

        // volte
        if (jsonStr.contains("volte")) {
            recordType = "volte";
        }
        // cdr
        else {
            recordType = "cdr";
        }

        // 3. store the record type in the Flume event headers
        headers.put("recordType", recordType);

//        logger.info("recordType:" + recordType);

        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        ArrayList<Event> interceptors = new ArrayList<>();

        for (Event event : events) {
            Event interceptEvent = intercept(event);

            interceptors.add(interceptEvent);
        }

        return interceptors;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
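To see how this interceptor feeds the multiplexing selector in flume1.conf, the following snippet (illustrative test code, not part of the project; it assumes Flume's EventBuilder is on the classpath) builds an event and inspects the recordType header that the interceptor sets:

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

// Illustrative check: the header written here is what the multiplexing
// selector in flume1.conf matches on to pick channel c1 or c2.
public class LogTypeInterceptorDemo {
    public static void main(String[] args) {
        LogTypeInterceptor interceptor = new LogTypeInterceptor();
        Event event = EventBuilder.withBody(
                "{\"recordType\":\"volte\",\"msisdn\":\"15636420864\"}".getBytes());
        interceptor.intercept(event);
        // prints "volte", so the selector would route this event to channel c1
        System.out.println(event.getHeaders().get("recordType"));
    }
}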

7) Package the interceptors
After packaging, only the interceptor jar itself is needed; the dependency jars do not have to be uploaded. The jar must be placed in Flume's lib directory.
8) First copy the packaged jar into the /opt/modules/flume/lib directory on hadoop101.

[hadoop@hadoop101 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT.jar

9) Start Flume

[hadoop@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/datas/calllogs/flume1/flume1.conf &

4.3.4 Log-Collection Flume Start/Stop Script
1) Create the script calllog-flume1.sh in the /home/hadoop/bin directory

[hadoop@hadoop101 bin]$ vim calllog-flume1.sh

Write the following content into the script:

#! /bin/bash

case $1 in
"start"){
        for i in hadoop101
        do
                echo " -------- starting flume1 (collection) on $i -------"
                ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --conf /opt/modules/flume/conf/ --conf-file /opt/datas/calllogs/flume1/flume1.conf &"
        done
};;
"stop"){
        for i in hadoop101
        do
                echo " -------- stopping flume1 (collection) on $i -------"
                ssh $i "ps -ef | grep flume1 | grep -v grep | awk '{print \$2}' | xargs kill -9"
        done

};;
esac

Note: nohup keeps the process running after you log out or close the terminal; nohup means "no hang up".
2) Make the script executable

[hadoop@hadoop101 bin]$ chmod +x calllog-flume1.sh

3) Start the flume1 agent

[hadoop@hadoop101 modules]$ calllog-flume1.sh start

4) Stop the flume1 agent

[hadoop@hadoop101 modules]$ calllog-flume1.sh stop
4.4 Kafka Environment Preparation

         Hadoop101    Hadoop102    Hadoop103
Kafka    Kafka        Kafka        Kafka

4.4.1 Kafka Cluster Start/Stop Script

1) Create the script kafka.sh in the /home/hadoop/bin directory

[hadoop@hadoop101 bin]$ vim kafka.sh

Write the following content into the script:

#! /bin/bash

case $1 in
"start"){
        for i in hadoop101 hadoop102 hadoop103
        do
                echo " -------- starting kafka on $i -------"
                # JMX port for KafkaManager monitoring

                ssh $i "export JMX_PORT=9988 && /opt/modules/kafka/bin/kafka-server-start.sh -daemon /opt/modules/kafka/config/server.properties "
        done
};;
"stop"){
        for i in hadoop101 hadoop102 hadoop103
        do
                echo " -------- stopping kafka on $i -------"
                ssh $i "ps -ef | grep server.properties | grep -v grep| awk '{print \$2}' | xargs kill >/dev/null 2>&1 &"
        done
};;
esac

Note: the JMX port is opened when starting Kafka so that KafkaManager can monitor the brokers later.
2) Make the script executable

[hadoop@hadoop101 bin]$ chmod +x kafka.sh

3) Start the Kafka cluster

[hadoop@hadoop101 modules]$ kafka.sh start

4) Stop the Kafka cluster

[hadoop@hadoop101 modules]$ kafka.sh stop
4.4.2 List All Kafka Topics
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list
4.4.3 Create Kafka Topics

Go to the /opt/modules/kafka/ directory and create the volte topic and the cdr topic.
1) Create the volte topic

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181  --create --replication-factor 2 --partitions 3 --topic topic-volte  

2) Create the cdr topic

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181  --create --replication-factor 2 --partitions 3 --topic topic-cdr
4.4.4 Delete Kafka Topics
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic-volte

[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic-cdr
4.4.5 Produce Messages
[hadoop@hadoop101 kafka]$ bin/kafka-console-producer.sh \
--broker-list hadoop101:9092 --topic topic-volte
>hello world
>hadoop  hadoop
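The same smoke test can also be done from Java (a minimal sketch, assuming the kafka-clients dependency is available; broker addresses as configured above):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Minimal test producer for topic-volte (sketch; assumes kafka-clients on the classpath).
public class TopicVolteProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topic-volte", "hello world"));
        }
    }
}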
4.4.6 Consume Messages
[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop101:9092 --from-beginning --topic topic-volte

--from-beginning: reads all existing data in the topic from the beginning. Add this option or not depending on your use case.
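A matching consumer sketch (again assuming kafka-clients; auto.offset.reset=earliest plays the same role as --from-beginning for a new consumer group):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Minimal test consumer for topic-volte (sketch; assumes kafka-clients on the classpath).
public class TopicVolteConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");
        props.put("group.id", "test-consumer");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-volte"));
            // poll once and print whatever has arrived so far
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}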

4.4.7 Describe a Topic
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 \
--describe --topic topic-volte
4.5 Flume Consumes Kafka Data and Writes to HDFS

                        Hadoop101    Hadoop102    Hadoop103
Flume (Kafka consumer)                            Flume

Configuration analysis

Flume configuration
(1) Create the flume2.conf file in the /opt/datas/calllogs/flume2/ directory on hadoop103

[hadoop@hadoop103 conf]$ vim flume2.conf

Write the following content into the file:

## components
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.zookeeperConnect = hadoop101:2181,hadoop102:2181,hadoop103:2181
a1.sources.r1.kafka.topics=topic-volte

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sources.r2.kafka.zookeeperConnect = hadoop101:2181,hadoop102:2181,hadoop103:2181
a1.sources.r2.kafka.topics=topic-cdr

## channel1
a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=10000

## channel2
a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=10000

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/calllogs/records/topic-volte/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = volte-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 30
a1.sinks.k1.hdfs.roundUnit = second

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/calllogs/records/topic-cdr/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = cdr-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 30
a1.sinks.k2.hdfs.roundUnit = second

## avoid generating many small files
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0

## output files are LZO-compressed streams
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.fileType = CompressedStream 

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## wiring: bind sources and sinks to channels
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

4.5.2 Flume Exception Handling
1) Problem: starting the consuming Flume agent may throw the following exception

ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded

2) Solution:
(1) Add the following configuration to the /opt/modules/flume/conf/flume-env.sh file on the hadoop101 server

export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

(2) Sync the configuration to the hadoop102 and hadoop103 servers

[hadoop@hadoop101 conf]$ xsync flume-env.sh

4.5.3 Log-Consumption Flume Start/Stop Script
1) Create the script calllog-flume2.sh in the /home/hadoop/bin directory

[hadoop@hadoop101 bin]$ vim calllog-flume2.sh

Write the following content into the script:

#! /bin/bash

case $1 in
"start"){
        for i in hadoop103
        do
                echo " -------- starting flume2 (consumer) on $i -------"
                ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --conf /opt/modules/flume/conf/ --conf-file /opt/datas/calllogs/flume2/flume2.conf &"
        done
};;
"stop"){
        for i in hadoop103
        do
                echo " -------- stopping flume2 (consumer) on $i -------"
                ssh $i "ps -ef | grep flume2 | grep -v grep | awk '{print \$2}' | xargs kill -9"
        done

};;
esac

2) Make the script executable

[hadoop@hadoop101 bin]$ chmod +x calllog-flume2.sh

3) Start the flume2 agent

[hadoop@hadoop101 modules]$ calllog-flume2.sh start

4) Stop the flume2 agent

[hadoop@hadoop101 modules]$ calllog-flume2.sh stop
4.6 Collection Pipeline Start/Stop Script

1) Create the script cluster.sh in the /opt/datas/calllogs directory

[hadoop@hadoop101 calllogs]$ vim cluster.sh

Write the following content into the script:

#! /bin/bash

case $1 in
"start"){
        echo " -------- starting the cluster -------"

        echo " -------- starting the hadoop cluster -------"
        /opt/modules/hadoop-2.7.2/sbin/start-dfs.sh 
        ssh hadoop102 "/opt/modules/hadoop-2.7.2/sbin/start-yarn.sh"

        # start the Zookeeper cluster
        zk.sh start

        # start the Flume collection agent
        calllog-flume1.sh start

        # start the Kafka cluster
        kafka.sh start

        sleep 4s;

        # start the Flume consumer agent
        calllog-flume2.sh start
};;
"stop"){
        echo " -------- stopping the cluster -------"

        # stop the Flume consumer agent
        calllog-flume2.sh stop

        # stop the Kafka cluster
        kafka.sh stop

        sleep 4s;

        # stop the Flume collection agent
        calllog-flume1.sh stop

        # stop the Zookeeper cluster
        zk.sh stop

        echo " -------- stopping the hadoop cluster -------"
        ssh hadoop102 "/opt/modules/hadoop-2.7.2/sbin/stop-yarn.sh"
        /opt/modules/hadoop-2.7.2/sbin/stop-dfs.sh 
};;
esac

2) Make the script executable

[hadoop@hadoop101 calllogs]$ chmod +x cluster.sh

3) Start the whole pipeline

[hadoop@hadoop101 calllogs]$ ./cluster.sh start

4) Stop the whole pipeline

[hadoop@hadoop101 calllogs]$ ./cluster.sh stop