数据仓库( Data Warehouse ),可简写为DW或DWH。数据仓库,是为企业所有决策制定过程,提供所有系统数据支持的战略集合。
通过对数据仓库中数据的分析,可以帮助企业,改进业务流程、控制成本、提高产品质量等。
数据仓库,并不是数据的最终目的地,而是为数据最终的目的地做好准备。这些准备包括对数据的:清洗,转义,分类,重组,合并,拆分,统计等等。
1、实时采集埋点的用户行为数据
2、实现数据仓库的分层搭建
3、每天定时导入业务数据
4、根据数据仓库中的数据进行报表分析
数据采集传输:Flume,Kafka,Logstash,DataX,Sqoop
数据存储:Hive,MySql,HDFS,Hbase,S3
数据计算:Spark,Hive,Tez,Flink,Storm
数据查询:Presto,Impala,Kylin
| 软件产品 | 版本 |
|---|---|
| Hadoop | 2.7.2 |
| Flume | 1.7.0 |
| Kafka | 0.11.0.2 |
| Kafka Manager | 1.3.3.22 |
| Hive | 1.2.1 |
| Sqoop | 1.4.6 |
| Mysql | 5.7 |
| Azkaban | 2.5.0 |
| Java | 1.8 |
| Zookeeper | 3.4.10 |
注意事项:框架选型尽量不要选择最新的框架,选择最新框架半年前左右的稳定版。
2.2.5 集群资源规划设计
| 服务器1 | 服务器2 | 服务器3 | |
|---|---|---|---|
| HDFS | NameNode,DataNode | DataNode | DataNode |
| Yarn | NodeManager | Resourcemanager,NodeManager | NodeManager |
| Zookeeper | Zookeeper | Zookeeper | Zookeeper |
| Flume(采集日志) | Flume | Flume | |
| Kafka | Kafka | Kafka | Kafka |
| Flume(消费Kafka) | Flume | ||
| Hive | Hive | ||
| Mysql | Mysql |
private String sysId; // 平台编码 private String serviceName; // 接口服务名称 private String homeProvinceCode; // 归属省 private String visitProvinceCode; // 访问省 private String channelCode; // 渠道编码 private String serviceCode; // 业务流水号 private String cdrGenTime; // 开始时间 private String duration; // 时长 private String recordType; // 话单类型 private String imsi; // 46000开头15位编码 private String msisdn; // 通常指手机号码 private String dataUplinkVolume; // 上行流量 private String dataDownlinkVolume; // 下行流量 private String charge; // 费用 private String resultCode; // 结果编码3.2 话单数据格式:
{"cdrGenTime":"20211206165309844","channelCode":"wRFUC","charge":"62.28","dataDownlinkVolume":"2197","dataUplinkVolume":"467","duration":"1678","homeProvinceCode":"551","imsi":"460007237312954","msisdn":"15636420864","recordType":"gprs","resultCode":"000000","serviceCode":"398814910321850","serviceName":"a9icp8xcNy","sysId":"U1ob6","visitProvinceCode":"451"}
3.3 编写项目模拟生成话单数据
详见项目:cdr_gen_app
对项目进行打包并部署到Hadoop101机器上,执行命令测试数据生成
[hadoop@hadoop101 ~]$ head datas/call.log
{"cdrGenTime":"20211206173630762","channelCode":"QEHVc","charge":"78.59","dataDownlinkVolume":"3007","dataUplinkVolume":"3399","duration":"1545","homeProvinceCode":"571","imsi":"460002441249614","msisdn":"18981080935","recordType":"gprs","resultCode":"000000","serviceCode":"408688053609568","serviceName":"8lr23kV4C2","sysId":"j4FbV","visitProvinceCode":"931"}
{"cdrGenTime":"20211206173630884","channelCode":"TY4On","charge":"31.17","dataDownlinkVolume":"1853","dataUplinkVolume":"2802","duration":"42","homeProvinceCode":"891","imsi":"460008474876800","msisdn":"15998955319","recordType":"gprs","resultCode":"000000","serviceCode":"739375515156215","serviceName":"6p87h0OHdC","sysId":"UnI9V","visitProvinceCode":"991"}
...
4 数据采集
4.1 Hadoop环境准备
| Hadoop101 | Hadoop102 | Hadoop103 | |
|---|---|---|---|
| HDFD | NameNode, DataNode | DataNode | DataNode |
| Yarn | NodeManager | ResourceManager,NodeManager | NodeManager |
1)先下载lzo的jar项目
https://github.com/twitter/hadoop-lzo/archive/master.zip">https://github.com/twitter/hadoop-lzo/archive/master.zip
2)下载后的文件名是hadoop-lzo-master,它是一个zip格式的压缩包,先进行解压,然后用maven编译。生成hadoop-lzo-0.4.20。
3)将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-2.7.2/share/hadoop/common/
[hadoop@hadoop101 common]$ pwd /opt/modules/hadoop-2.7.2/share/hadoop/common [hadoop@hadoop101 common]$ ls hadoop-lzo-0.4.20.jar
4)同步hadoop-lzo-0.4.20.jar到hadoop102、hadoop103
[hadoop@hadoop101 common]$ xsync hadoop-lzo-0.4.20.jar4.1.2 添加配置
1)core-site.xml增加配置支持LZO压缩
io.compression.codecs org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.SnappyCodec, com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec io.compression.codec.lzo.class com.hadoop.compression.lzo.LzoCodec
2)同步core-site.xml到hadoop102、hadoop103
[hadoop@hadoop101 hadoop]$ xsync core-site.xml4.1.2 启动集群
[hadoop@hadoop101 hadoop-2.7.2]$ sbin/start-dfs.sh [hadoop@hadoop102 hadoop-2.7.2]$ sbin/start-yarn.sh4.1.3 验证
1)web和进程查看
- Web查看:http://hadoop101:50070
- 进程查看:jps查看各个节点状态。
| Hadoop101 | Hadoop102 | Hadoop103 | |
|---|---|---|---|
| Zookeeper | Zookeeper | Zookeeper | Zookeeper |
1)在hadoop101的/home/hadoop/bin目录下创建脚本
[hadoop@hadoop101 bin]$ vim zk.sh
在脚本中编写如下内容
#! /bin/bash
case $1 in
"start"){
for i in hadoop101 hadoop102 hadoop103
do
ssh $i "/opt/modules/zookeeper-3.4.10/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop101 hadoop102 hadoop103
do
ssh $i "/opt/modules/zookeeper-3.4.10/bin/zkServer.sh stop"
done
};;
esac
2)增加脚本执行权限
[hadoop@hadoop101 bin]$ chmod +x zk.sh
3)Zookeeper集群启动脚本
[hadoop@hadoop101 modules]$ zk.sh start
4)Zookeeper集群停止脚本
[hadoop@hadoop101 modules]$ zk.sh stop4.3 Flume环境准备
| Hadoop101 | Hadoop102 | Hadoop103 | |
|---|---|---|---|
| Flume(采集) | Flume |
flume配置分析
Flume的具体配置
(1)在/opt/datas/calllogs/flume1/目录下创建flume1.conf文件
[hadoop@hadoop101 conf]$ vim flume1.conf
在文件配置如下内容
a1.sources=r1 a1.channels=c1 c2 a1.sinks=k1 k2 # configure source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/datas/calllogs/flume/log_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /opt/datas/calllogs/records/calllogs.+ a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 c2 #interceptor a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = com.cmcc.jackyan.flume.interceptor.LogETLInterceptor$Builder a1.sources.r1.interceptors.i2.type = com.cmcc.jackyan.flume.interceptor.LogTypeInterceptor$Builder # selector a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = recordType a1.sources.r1.selector.mapping.volte= c1 a1.sources.r1.selector.mapping.cdr= c2 # configure channel a1.channels.c1.type = memory a1.channels.c1.capacity=10000 a1.channels.c1.byteCapacityBufferPercentage=20 a1.channels.c2.type = memory a1.channels.c2.capacity=10000 a1.channels.c2.byteCapacityBufferPercentage=20 # configure sink # start-sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = topic_volte a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092 a1.sinks.k1.kafka.flumeBatchSize = 2000 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.channel = c1 # event-sink a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k2.kafka.topic = topic_cdr a1.sinks.k2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092 a1.sinks.k2.kafka.flumeBatchSize = 2000 a1.sinks.k2.kafka.producer.acks = 1 a1.sinks.k2.channel = c2
注意:com.cmcc.jackyan.flume.interceptor.LogETLInterceptor和com.jackyan.flume.interceptor.LogTypeInterceptor是自定义的拦截器的全类名。需要根据用户自定义的拦截器做相应修改。
4.3.3 自定义Flume拦截器这里自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。
ETL拦截器主要用于,过滤时间戳不合法和json数据不完整的日志
日志类型区分拦截器主要用于,将volte话单和其他话单区分开来,方便发往kafka的不同topic。
1)创建maven模块flume-interceptor
2)创建包名:com.cmcc.jackyan.flume.interceptor
3)在pom.xml文件中添加如下配置
org.apache.flume flume-ng-core1.7.0 maven-compiler-plugin 2.3.2 1.8 1.8 maven-assembly-plugin jar-with-dependencies com.cmcc.jackyan.appclient.AppMain make-assembly package single
4)在com.cmcc.jackyan.flume.interceptor包下创建LogETLInterceptor类名
Flume ETL拦截器LogETLInterceptor
public class LogETLInterceptor implements Interceptor {
//打印日志,用于调试
private static final Logger logger = LoggerFactory.getLogger(LogETLInterceptor.class);
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
String body = new String(event.getBody(), Charset.forName("UTF-8"));
// logger.info("Before:" + body);
event.setBody(LogUtils.addTimeStamp(body).getBytes());
body = new String(event.getBody(), Charset.forName("UTF-8"));
// logger.info("After:" + body);
return event;
}
@Override
public List intercept(List events) {
ArrayList intercepts = new ArrayList<>();
// 遍历所有的events,过滤掉不合法的
for (Event event : events) {
Event interceptEvent = intercept(event);
if (interceptEvent != null) {
intercepts.add(event);
}
}
return intercepts;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new LogETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
4)Flume日志处理工具类
public class LogUtils {
private static Logger logger = LoggerFactory.getLogger(LogUtils.class);
public static String addTimeStamp(String log) {
//{"cdrGenTime":"20211207091838913","channelCode":"k82Ce","charge":"86.14","dataDownlinkVolume":"3083","dataUplinkVolume":"2311","duration":"133","homeProvinceCode":"210","imsi":"460004320498004","msisdn":"17268221069","recordType":"mms","resultCode":"000000","serviceCode":"711869363795868","serviceName":"H79cKXuJO4","sysId":"aGIL4","visitProvinceCode":"220"}
long t = System.currentTimeMillis();
return t + "|" + log;
}
}
5)Flume日志类型区分拦截器LogTypeInterceptor
public class LogTypeInterceptor implements Interceptor {
//打印日志,用于调试
private static final Logger logger = LoggerFactory.getLogger(LogTypeInterceptor.class);
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
// 1获取flume接收消息头
Map headers = event.getHeaders();
// 2获取flume接收的json数据数组
byte[] json = event.getBody();
// 将json数组转换为字符串
String jsonStr = new String(json);
String recordType = "" ;
// volte
if (jsonStr.contains("volte")) {
recordType = "volte";
}
// cdr
else {
recordType = "cdr";
}
// 3将日志类型存储到flume头中
headers.put("recordType", recordType);
// logger.info("recordType:" + recordType);
return event;
}
@Override
public List intercept(List events) {
ArrayList interceptors = new ArrayList<>();
for (Event event : events) {
Event interceptEvent = intercept(event);
interceptors.add(interceptEvent);
}
return interceptors;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
public Interceptor build() {
return new LogTypeInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
6)打包
拦截器打包之后,只需要单独包,不需要将依赖的包上传。打包之后要放入flume的lib文件夹下面。
7)需要先将打好的包放入到hadoop101的/opt/modules/flume/lib文件夹下面。
[hadoop@hadoop101 lib]$ ls | grep interceptor flume-interceptor-1.0-SNAPSHOT.jar
8)启动flume
[hadoop@hadoop101 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file /opt/datas/calllogs/flume1/flume1.conf &
4.3.4 日志采集Flume启动停止脚本
1)在/home/hadoop/bin目录下创建脚本calllog-flume1.sh
[hadoop@hadoop101 bin]$ vim calllog-flume1.sh
在脚本中填写如下内容
#! /bin/bash
case $1 in
"start"){
for i in hadoop101
do
echo " --------启动 $i 消费flume-------"
ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --conf /opt/modules/flume/conf/ --conf-file /opt/datas/calllogs/flume1/flume1.conf &"
done
};;
"stop"){
for i in hadoop101
do
echo " --------停止 $i 消费flume-------"
ssh $i "ps -ef | grep flume1 | grep -v grep | awk '{print $2}' | xargs kill -9"
done
};;
esac
说明:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思。
2)增加脚本执行权限
[hadoop@hadoop101 bin]$ chmod +x calllog-flume1.sh
3)flume1集群启动脚本
[hadoop@hadoop101 modules]$ calllog-flume1.sh start
4)flume1集群停止脚本
[hadoop@hadoop101 modules]$ calllog-flume1.sh stop4.4 Kafka环境准备
| Hadoop101 | Hadoop102 | Hadoop103 | |
|---|---|---|---|
| Kafka | Kafka | Kafka | Kafka |
1)在/home/hadoop/bin目录下创建脚本kafka.sh
[hadoop@hadoop101 bin]$ vim kafka.sh
在脚本中填写如下内容
#! /bin/bash
case $1 in
"start"){
for i in hadoop101 hadoop102 hadoop103
do
echo " --------启动 $i kafka-------"
# 用于KafkaManager监控
ssh $i "export JMX_PORT=9988 && /opt/modules/kafka/bin/kafka-server-start.sh -daemon /opt/modules/kafka/config/server.properties "
done
};;
"stop"){
for i in hadoop101 hadoop102 hadoop103
do
echo " --------停止 $i kafka-------"
ssh $i "ps -ef | grep server.properties | grep -v grep| awk '{print $2}' | xargs kill >/dev/null 2>&1 &"
done
};;
esac
注意:启动Kafka时要先开启JMX端口,是用于后续KafkaManager监控。
2)增加脚本执行权限
[hadoop@hadoop101 bin]$ chmod +x kafka.sh
3)kf集群启动脚本
[hadoop@hadoop101 modules]$ kafka.sh start
4)kf集群停止脚本
[hadoop@hadoop101 modules]$ kafka.sh stop4.4.2 查看所有Kafka topic
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list4.4.3 创建 Kafka topic
进入到/opt/modules/kafka/目录下分别创建:启动日志主题、事件日志主题。
1)创建volte主题
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --create --replication-factor 2 --partitions 3 --topic topic-volte
2)创建cdr主题
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --create --replication-factor 2 --partitions 3 --topic topic-cdr4.4.4 删除 Kafka topic
[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic-volte [hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop101:2181,hadoop102:2181,hadoop103:2181 --topic topic-cdr4.4.5 生产消息
[hadoop@hadoop101 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop101:9092 --topic topic-volte >hello world >hadoop hadoop4.4.6 消费消息
[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --from-beginning --topic topic-volte
--from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
4.4.7 查看某个Topic的详情[hadoop@hadoop101 kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic topic-volte4.5 Flume消费Kafka数据写到HDFS
| Hadoop101 | Hadoop102 | Hadoop103 | |
|---|---|---|---|
| Flume(消费kafka) | Flume |
配置分析
Flume的具体配置
(1)在hadoop103的/opt/datas/calllogs/flume2/目录下创建flume2.conf文件
[hadoop@hadoop101 conf]$ vim flume2.conf
在文件配置如下内容
## 组件 a1.sources=r1 r2 a1.channels=c1 c2 a1.sinks=k1 k2 ## source1 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize = 5000 a1.sources.r1.batchDurationMillis = 2000 a1.sources.r1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092 a1.sources.r1.kafka.zookeeperConnect = hadoop101:2181,hadoop102:2181,hadoop103:2181 a1.sources.r1.kafka.topics=topic-volte ## source2 a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource a1.sources.r2.batchSize = 5000 a1.sources.r2.batchDurationMillis = 2000 a1.sources.r2.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092 a1.sources.r2.kafka.zookeeperConnect = hadoop101:2181,hadoop102:2181,hadoop103:2181 a1.sources.r2.kafka.topics=topic-cdr ## channel1 a1.channels.c1.type=memory a1.channels.c1.capacity=100000 a1.channels.c1.transactionCapacity=10000 ## channel2 a1.channels.c2.type=memory a1.channels.c2.capacity=100000 a1.channels.c2.transactionCapacity=10000 ## sink1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /origin_data/calllogs/records/topic-volte/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix = volte- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 30 a1.sinks.k1.hdfs.roundUnit = second ##sink2 a1.sinks.k2.type = hdfs a1.sinks.k2.hdfs.path = /origin_data/calllogs/records/topic-cdr/%Y-%m-%d a1.sinks.k2.hdfs.filePrefix = cdr- a1.sinks.k2.hdfs.round = true a1.sinks.k2.hdfs.roundValue = 30 a1.sinks.k2.hdfs.roundUnit = second ## 不要产生大量小文件 a1.sinks.k1.hdfs.rollInterval = 30 a1.sinks.k1.hdfs.rollSize = 0 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k2.hdfs.rollInterval = 30 a1.sinks.k2.hdfs.rollSize = 0 a1.sinks.k2.hdfs.rollCount = 0 ## 控制输出文件是原生文件。 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k2.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = lzop a1.sinks.k2.hdfs.codeC = lzop ## 拼装 a1.sources.r1.channels = c1 a1.sinks.k1.channel= c1 a1.sources.r2.channels = c2 a1.sinks.k2.channel= c2
4.5.2 Flume异常处理
1)问题描述:如果启动消费Flume抛出如下异常
ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded
2)解决方案步骤:
(1)在hadoop101服务器的/opt/modules/flume/conf/flume-env.sh文件中增加如下配置
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
(2)同步配置到hadoop102、hadoop103服务器
[hadoop@hadoop101 conf]$ xsync flume-env.sh
4.5.2 日志消费Flume启动停止脚本
1)在/home/hadoop/bin目录下创建脚本calllog-flume2.sh
[hadoop@hadoop101 bin]$ vim calllog-flume2.sh
在脚本中填写如下内容
#! /bin/bash
case $1 in
"start"){
for i in hadoop103
do
echo " --------启动 $i 消费flume-------"
ssh $i "nohup /opt/modules/flume/bin/flume-ng agent --conf /opt/modules/flume/conf/ --conf-file /opt/datas/calllogs/flume2/flume2.conf &"
done
};;
"stop"){
for i in hadoop103
do
echo " --------停止 $i 消费flume-------"
ssh $i "ps -ef | grep flume2 | grep -v grep | awk '{print $2}' | xargs kill -9"
done
};;
esac
2)增加脚本执行权限
[hadoop@hadoop101 bin]$ chmod +x calllog-flume2.sh
3)f2集群启动脚本
[hadoop@hadoop101 modules]$ calllog-flume2.sh start
4)f2集群停止脚本
[hadoop@hadoop101 modules]$ calllog-flume2.sh stop4.6 采集通道启动/停止脚本
1)在/opt/datas/calllogs目录下创建脚本cluster.sh
[hadoop@hadoop101 calllogs]$ vim cluster.sh
在脚本中填写如下内容
#! /bin/bash
case $1 in
"start"){
echo " -------- 启动 集群 -------"
echo " -------- 启动 hadoop集群 -------"
/opt/modules/hadoop-2.7.2/sbin/start-dfs.sh
ssh hadoop102 "/opt/modules/hadoop-2.7.2/sbin/start-yarn.sh"
#启动 Zookeeper集群
zk.sh start
#启动 Flume采集集群
calllog-flume1.sh start
#启动 Kafka采集集群
kafka.sh start
sleep 4s;
#启动 Flume消费集群
calllog-flume2.sh start
};;
"stop"){
echo " -------- 停止 集群 -------"
#停止 Flume消费集群
calllog-flume2.sh stop
#停止 Kafka采集集群
kafka.sh stop
sleep 4s;
#停止 Flume采集集群
calllog-flume1.sh stop
#停止 Zookeeper集群
zk.sh stop
echo " -------- 停止 hadoop集群 -------"
ssh hadoop102 "/opt/modules/hadoop-2.7.2/sbin/stop-yarn.sh"
/opt/modules/hadoop-2.7.2/sbin/stop-dfs.sh
};;
esac
2)增加脚本执行权限
[hadoop@hadoop101 calllogs]$ chmod +x cluster.sh
3)cluster集群启动脚本
[hadoop@hadoop101 calllogs]$ ./cluster.sh start
4)cluster集群停止脚本
[hadoop@hadoop101 calllogs]$ ./cluster.sh stop



