- 5.3 Spark SQL: Exploratory Analysis of Legal-Service Website Data
- 1. Install MySQL
- 2. Start MySQL
- 3. Configure Hive
  - hive-config.sh (/hive/conf/): use the instructor's copy
  - hive-site.xml (/hive/conf/): use the instructor's copy
  - hive-env.sh (/hive/sbin): use the instructor's copy
- MySQL connector
- 4. Configure Spark
- 5. Connect to the database
- 6. Hive operations
- 7. Run the analysis in Spark
- Troubleshooting
This experiment runs on the 云创大数据实验平台 (cloud big-data lab platform).
All required files (mysql-8.0.25-linux-glibc2.17-x86_64-minimal.tar.xz, mysql-connector-java-8.0.25.jar, hive-env.sh, hive-config.sh, hive-site.xml, law_utf8.csv) are available from Baidu Cloud:
Link: https://pan.baidu.com/s/1xRDDnwtR8pgCrb-AGgJzpg
Extraction code: 0t29
Paths:

| Service | Path |
|---|---|
| Hadoop | /usr/cstor/hadoop |
| Spark | /usr/cstor/spark |
| Hive | /usr/cstor/hive |
| MySQL | /usr/cstor/mysql (must be installed manually) |
1. Install MySQL

Required file: mysql-8.0.25-linux-glibc2.17-x86_64-minimal.tar.xz

Unpack the MySQL archive, then rename the extracted directory:

```shell
mv mysql-8.0.25-linux-glibc2.17-x86_64-minimal mysql
```
2. Start MySQL

In the MySQL directory, run:

```shell
[201805110159 root@master mysql]# mysql -u root -p
Enter password:    # press Enter here -- do not type a password
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 1
Server version: 5.5.64-MariaDB MariaDB Server

MariaDB [(none)]> use mysql;
Database changed
MariaDB [mysql]> update user set password=password("root") where user='root';
Query OK, 4 rows affected (0.00 sec)
MariaDB [mysql]> flush privileges;
Query OK, 0 rows affected (0.00 sec)
MariaDB [mysql]> exit;
Bye
```
3. Configure Hive

hive-config.sh (/hive/conf/): use the instructor's copy.

```shell
this="$0"
while [ -h "$this" ]; do
  ls=`ls -ld "$this"`
  link=`expr "$ls" : '.*-> \(.*\)$'`
  if expr "$link" : '.*/.*' > /dev/null; then
    this="$link"
  else
    this=`dirname "$this"`/"$link"
  fi
done

# convert relative path to absolute path
bin=`dirname "$this"`
script=`basename "$this"`
bin=`cd "$bin"; pwd`
this="$bin/$script"

# the root of the Hive installation
if [[ -z $HIVE_HOME ]] ; then
  export HIVE_HOME=`dirname "$bin"`
fi

# check to see if the conf dir is given as an optional argument
while [ $# -gt 0 ]; do    # Until you run out of parameters . . .
  case "$1" in
    --config)
      shift
      confdir=$1
      shift
      HIVE_CONF_DIR=$confdir
      ;;
    --auxpath)
      shift
      HIVE_AUX_JARS_PATH=$1
      shift
      ;;
    *)
      break
      ;;
  esac
done

# Allow alternate conf dir location.
HIVE_CONF_DIR="${HIVE_CONF_DIR:-$HIVE_HOME/conf}"
export HIVE_CONF_DIR=$HIVE_CONF_DIR
export HIVE_AUX_JARS_PATH=$HIVE_AUX_JARS_PATH

# Default to use 256MB
export HADOOP_HEAPSIZE=${HADOOP_HEAPSIZE:-256}

export JAVA_HOME=/usr/local/jdk1.8.0_161
export HADOOP_HOME=/usr/cstor/hadoop
export HIVE_HOME=/usr/cstor/hive
```
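The option loop in hive-config.sh consumes `--config DIR` and `--auxpath PATH` pairs from the front of the argument list and stops at the first unrecognized argument. A minimal Python sketch of that parsing logic (illustration only, function name hypothetical):

```python
def parse_hive_args(args):
    """Mimic hive-config.sh: pull --config/--auxpath off the front,
    stop at the first unrecognized argument (the *) break ;; branch)."""
    conf_dir, aux_jars = None, None
    rest = list(args)
    while rest:
        if rest[0] == "--config":
            conf_dir = rest[1]
            rest = rest[2:]          # two shifts, like the shell script
        elif rest[0] == "--auxpath":
            aux_jars = rest[1]
            rest = rest[2:]
        else:
            break
    return conf_dir, aux_jars, rest
```

When `--config` is absent the script later falls back to `$HIVE_HOME/conf` via the `${HIVE_CONF_DIR:-...}` default.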
hive-site.xml (/hive/conf/): use the instructor's copy.

The full file is too large to reproduce here; get it from the Baidu Cloud link above. The connection properties below are site-specific and can be changed to match your own setup:

```xml
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>123456</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
```

hive-env.sh (/hive/sbin): use the instructor's copy.

```shell
HADOOP_HOME=/usr/cstor/hadoop
```

After editing, apply it and set the environment variables:

```shell
source hive-env.sh
vim /etc/profile
# append at the end:
export HIVE_HOME=/usr/cstor/hive
export PATH=$PATH:$HIVE_HOME/bin
```

MySQL connector

The MySQL connector JAR must be copied into /spark/lib and /hive/lib.
4. Configure Spark

Required file: mysql-connector-java-8.0.25.jar. This section assumes Spark is already installed (the cluster used here is 云创 lab 17, sparksql).

```shell
# point Spark at the correct JDK: edit the end of spark-config.sh
[201805110159 root@master ~]# cat /usr/cstor/spark/sbin/spark-config.sh
export JAVA_HOME=/usr/local/jdk1.8.0_161

# lower the log level to cut down console noise
[201805110159 root@master conf]# cd /usr/cstor/spark/conf
[201805110159 root@master conf]# cp log4j.properties.template log4j.properties
[201805110159 root@master conf]# vim log4j.properties
# change INFO to ERROR on this line:
[201805110159 root@master ~]# cat /usr/cstor/spark/conf/log4j.properties | grep ERROR
log4j.rootCategory=ERROR, console
```

Copy the MySQL connector JAR into /spark/lib and /hive/lib, and copy /hive/conf/hive-site.xml into /spark/conf/:

```shell
cp /usr/cstor/hive/conf/hive-site.xml /usr/cstor/spark/conf/
```

Push the configured Spark directory to the other two cluster nodes to finish the setup:

```shell
[201805110159 root@master cstor]# scp -r /usr/cstor/spark/ root@slave1:/usr/cstor/
[201805110159 root@master cstor]# scp -r /usr/cstor/spark/ root@slave2:/usr/cstor/
```

Start Spark, then check the daemons on each node with jps.
5. Connect to the database

Initialize the Hive metastore schema in MySQL:

```shell
schematool -initSchema -dbType mysql
```
6. Hive operations

First upload law_utf8.csv to /user/root/ on HDFS:

```shell
hadoop fs -mkdir -p /user/root/
hadoop fs -put law_utf8.csv /user/root/
```

Create the law table and import the data:

```sql
-- create the law database and table, then load the data
create database law;
use law;
CREATE TABLE law (
  ip bigint, area int, ie_proxy string, ie_type string,
  userid string, clientid string, time_stamp bigint, time_format string,
  pagepath string, ymd int, visiturl string, page_type string,
  host string, page_title string, page_title_type int,
  page_title_name string, title_keyword string, in_port string,
  in_url string, search_keyword string, source string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
load data inpath '/user/root/law_utf8.csv' overwrite into table law;
```

7. Run the analysis in Spark
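As an aside on the data format: the law table is declared as comma-delimited plain text, so Hive's TEXTFILE serde maps each CSV line onto the columns by a plain split (no quoting rules). A Python sketch of that mapping (illustration only, sample line hypothetical):

```python
# column names exactly as declared in the CREATE TABLE statement
LAW_COLUMNS = [
    "ip", "area", "ie_proxy", "ie_type", "userid", "clientid",
    "time_stamp", "time_format", "pagepath", "ymd", "visiturl",
    "page_type", "host", "page_title", "page_title_type",
    "page_title_name", "title_keyword", "in_port", "in_url",
    "search_keyword", "source",
]

def parse_law_line(line):
    """FIELDS TERMINATED BY ',' -- a plain comma split, like Hive's
    delimited text serde (which does not handle quoted commas)."""
    values = line.rstrip("\n").split(",")
    return dict(zip(LAW_COLUMNS, values))
```

This also shows why the source file must be UTF-8 and must not contain embedded commas inside fields.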
Start the Hive metastore service:

```shell
[201805110159 root@master ~]# cd /usr/cstor/hive/bin
[201805110159 root@master bin]# ./hive --service metastore &
```

Then open a second terminal and continue there.

If starting `hive --service metastore &` fails with
`Could not create ServerSocket on address 0.0.0.0/0.0.0.0:9083`,
run jps, find the leftover RunJar process, and kill it:

```shell
kill -9 <pid>
```
[201805110159 root@master data]# spark-shell --master local[2] --jars /usr/cstor/spark/lib/mysql-connector-java-8.0.25.jar
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> val sparkContext=new org.apache.spark.sql.SQLContext(sc)
scala> val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
# list the Hive databases
scala> hiveContext.sql("show databases").show()
# switch to the law database
scala> hiveContext.sql("use law")
# 2. Page-type analysis
# (1) Page-type counts
scala> val pageType=hiveContext.sql("select substring(page_type,1,3) as page_type,count(*) as count_num,round((count(*)/837450.0)*100,4) as weights from law group by substring(page_type,1,3)")
pageType: org.apache.spark.sql.DataFrame = [page_type: string, count_num: bigint, weights: double]
scala> pageType.orderBy(-pageType("count_num")).show()
+---------+---------+-------+
|page_type|count_num|weights|
+---------+---------+-------+
| 101| 411665| 49.157|
| 199| 201399|24.0491|
| 107| 182900|21.8401|
| 301| 18430| 2.2007|
| 102| 17357| 2.0726|
| 106| 3957| 0.4725|
| 103| 1715| 0.2048|
| "ht| 14| 0.0017|
| 201| 12| 0.0014|
| cfr| 1| 1.0E-4|
+---------+---------+-------+
scala> pageType.repartition(1).save("/user/root/sparkSql/pageType.json","json",SaveMode.Overwrite)
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
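The pageType query groups on the first three characters of page_type (SQL `substring(page_type,1,3)`) and reports each group's share of the record total. The same logic in plain Python (illustration only, sample data hypothetical):

```python
from collections import Counter

def page_type_weights(page_types, total=None):
    """Group by the 3-char prefix and report (count, percentage),
    rounded to 4 places like round(...,4) in the HiveQL."""
    total = total if total is not None else len(page_types)
    counts = Counter(p[:3] for p in page_types)
    return {
        prefix: (n, round(n / total * 100, 4))
        for prefix, n in counts.items()
    }
```

The HiveQL divides by the hard-coded total 837450.0; passing `total` explicitly mirrors that choice.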
# (2) Page-category counts
scala> val pageLevel=hiveContext.sql("select substring(page_type,1,7) as page_type,count(*) as count_num from law where visiturl like '%faguizt%' and substring(page_type,1,7) like '%199%' group by page_type")
pageLevel: org.apache.spark.sql.DataFrame = [page_type: string, count_num: bigint]
scala> pageLevel.show()
+---------+---------+
|page_type|count_num|
+---------+---------+
| 1999001| 47407|
+---------+---------+
scala> pageLevel.repartition(1).save("/user/root/sparkSql/pageLevel.json","json",SaveMode.Overwrite)
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
# (3) Consultation-internal breakdown
scala> val consultCount=hiveContext.sql("select substring(page_type,1,6) as page_type,count(*) as count_num,round((count(*)/411665.0)*100,4) as weights from law where substring(page_type,1,3)=101 group by substring(page_type,1,6)")
consultCount: org.apache.spark.sql.DataFrame = [page_type: string, count_num: bigint, weights: double]
scala> consultCount.orderBy(-consultCount("count_num")).show()
+---------+---------+-------+
|page_type|count_num|weights|
+---------+---------+-------+
| 101003| 396612|96.3434|
| 101002| 7776| 1.8889|
| 101001| 5603| 1.3611|
| 101009| 854| 0.2075|
| 101008| 378| 0.0918|
| 101007| 147| 0.0357|
| 101004| 125| 0.0304|
| 101006| 107| 0.026|
| 101005| 63| 0.0153|
+---------+---------+-------+
scala> consultCount.repartition(1).save("/user/root/sparkSql/consultCount.json","json",SaveMode.Overwrite)
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
# (4) Pages whose URL contains a "?"
scala> val pageWith=hiveContext.sql("select substring(page_type,1,7) as page_type,count(*) as count_num,round((count(*)*100)/65477.0,4) as weights from law where visiturl like '%?%' group by substring(page_type,1,7)")
pageWith: org.apache.spark.sql.DataFrame = [page_type: string, count_num: bigint, weights: double]
scala> pageWith.orderBy(-pageWith("weights")).show()
+---------+---------+-------+
|page_type|count_num|weights|
+---------+---------+-------+
| 1999001| 64691|98.7996|
| 301001| 356| 0.5437|
| 107001| 346| 0.5284|
| 101003| 47| 0.0718|
| 102002| 25| 0.0382|
| 2015020| 5| 0.0076|
| 2015042| 3| 0.0046|
| 2015021| 2| 0.0031|
| 2015031| 2| 0.0031|
+---------+---------+-------+
scala> pageWith.repartition(1).save("/user/root/sparkSql/pageWith.json","json",SaveMode.Overwrite)
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
# (5) Internal breakdown of the other-type (1999001) pages
scala> val otherPage=hiveContext.sql("select count(*) as count_num,round((count(*)/64691.0)*100,4) as weights,page_title from law where visiturl like '%?%' and substring(page_type,1,7)=1999001 group by page_title")
otherPage: org.apache.spark.sql.DataFrame = [count_num: bigint, weights: double, page_title: string]
scala> otherPage.orderBy(-otherPage("count_num")).limit(5).show()
+---------+-------+--------------------+
|count_num|weights| page_title|
+---------+-------+--------------------+
| 49894|77.1266| 法律快车-律师助手|
| 6166| 9.5315| 免费发布法律咨询 - 法律快车法律咨询|
| 4455| 6.8866| 咨询发布成功|
| 765| 1.1825| 咨询发布成功 - 法律快车|
| 342| 0.5287|法律快搜-中国法律搜索第一品牌(s...|
+---------+-------+--------------------+
scala> otherPage.orderBy(-otherPage("count_num")).limit(5).save("/user/root/sparkSql/otherPage.json","json",SaveMode.Overwrite)
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
# (6) Page types clicked by casual-browsing users
scala> val streel=hiveContext.sql("select count(*) as count_num,substring(page_type,1,3) as page_type from law where visiturl not like '%.html' group by substring(page_type,1,3)")
streel: org.apache.spark.sql.DataFrame = [count_num: bigint, page_type: string]
scala> streel.orderBy(-streel("count_num")).limit(6).show()
+---------+---------+
|count_num|page_type|
+---------+---------+
| 118011| 199|
| 18175| 107|
| 17357| 102|
| 7130| 101|
| 3957| 106|
| 1024| 301|
+---------+---------+
scala> streel.orderBy(-streel("count_num")).limit(6).save("/user/root/sparkSql/streel.json","json",SaveMode.Overwrite)
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
# 3. Click-count analysis
# (1) User click-count distribution
scala> val clickCount=hiveContext.sql("select click_num,count(click_num) as count,round(count(click_num)*100/350090.0,2),round((count(click_num)*click_num)*100/837450.0,2) from (select count(userid) as click_num from law group by userid)tmp_table group by click_num order by count desc")
clickCount: org.apache.spark.sql.DataFrame = [click_num: bigint, count: bigint, _c2: double, _c3: double]
scala> clickCount.limit(7).show()
+---------+------+-----+-----+
|click_num| count| _c2| _c3|
+---------+------+-----+-----+
| 1|229365|65.52|27.39|
| 2| 63605|18.17|15.19|
| 3| 20992| 6.0| 7.52|
| 4| 12079| 3.45| 5.77|
| 5| 6177| 1.76| 3.69|
| 6| 4181| 1.19| 3.0|
| 7| 2556| 0.73| 2.14|
+---------+------+-----+-----+
scala> clickCount.limit(7).save("/user/root/sparkSql/clickCount.json","json",SaveMode.Overwrite)
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
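The clickCount query is a two-level aggregation: the inner subquery counts records per userid, and the outer query counts how many users made exactly k clicks, plus their share of all users and of all records. A plain-Python sketch of the same logic (illustration only):

```python
from collections import Counter

def click_distribution(userids, total_users=None, total_records=None):
    """Inner query: clicks per user; outer query: users per click
    count, with percentages rounded to 2 places as in the HiveQL."""
    per_user = Counter(userids)            # userid -> click_num
    dist = Counter(per_user.values())      # click_num -> number of users
    total_users = total_users if total_users is not None else len(per_user)
    total_records = total_records if total_records is not None else len(userids)
    return {
        k: (n,
            round(n * 100 / total_users, 2),      # share of users (_c2)
            round(n * k * 100 / total_records, 2))  # share of records (_c3)
        for k, n in dist.items()
    }
```

The HiveQL hard-codes 350090.0 users and 837450.0 records; the optional parameters mirror that.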
# (2) Analysis of single-visit users
scala> val onceScan=hiveContext.sql("select page_type,count(page_type) as count,round((count(page_type)*100)/229365.0,4) from (select substring(a.page_type,1,7) as page_type from law a,(select userid from law group by userid having(count(userid)=1))b where a.userid=b.userid)c group by page_type order by count desc")
onceScan: org.apache.spark.sql.DataFrame = [page_type: string, count: bigint, _c2: double]
scala> onceScan.limit(5).show()
+---------+------+-------+
|page_type| count| _c2|
+---------+------+-------+
| 101003|171804|74.9042|
| 107001| 36915|16.0944|
| 1999001| 18581| 8.1011|
| 301001| 1314| 0.5729|
| 102001| 173| 0.0754|
+---------+------+-------+
scala> onceScan.limit(5).save("/user/root/sparkSql/onceScan.json","json",SaveMode.Overwrite)
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
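The onceScan query self-joins law against the set of users with exactly one record (the `having(count(userid)=1)` subquery), then tallies 7-char page_type prefixes among their rows. The equivalent in plain Python (illustration only):

```python
from collections import Counter

def once_scan(records):
    """records: list of (userid, page_type) pairs.
    Keep only users that appear exactly once, then count the
    7-char page_type prefixes (substring(page_type,1,7)) among them."""
    per_user = Counter(uid for uid, _ in records)
    once = {uid for uid, n in per_user.items() if n == 1}
    return Counter(pt[:7] for uid, pt in records if uid in once)
```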
# (3) Top URLs visited by single-click users
scala> val urlRank=hiveContext.sql("select a.visiturl,count(*) as count from law a,(select userid from law group by userid having(count(userid)=1))b where a.userid=b.userid group by a.visiturl")
urlRank: org.apache.spark.sql.DataFrame = [visiturl: string, count: bigint]
scala> urlRank.orderBy(-urlRank("count")).limit(7).show(false)
+---------------------------------------------------------------+-----+
|visiturl |count|
+---------------------------------------------------------------+-----+
|http://www.lawtime.cn/info/shuifa/slb/2012111978933.html |2130 |
|http://www.lawtime.cn/ask/exp/13655.html |859 |
|http://www.lawtime.cn/info/hunyin/lhlawlhxy/20110707137693.html|804 |
|http://www.lawtime.cn/info/shuifa/slb/2012111978933_2.html |684 |
|http://www.lawtime.cn/ask/question_925675.html |682 |
|http://www.lawtime.cn/ask/exp/8495.html |534 |
|http://www.lawtime.cn/guangzhou |375 |
+---------------------------------------------------------------+-----+
scala> urlRank.orderBy(-urlRank("count")).limit(7).save("/user/root/sparkSql/urlRank.json","json",SaveMode.Overwrite)
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
# (4) Click counts for pages with an .html extension in the raw data
scala> val clickHtml=hiveContext.sql("select a.visiturl,count(*) as count from law a where a.visiturl like '%.html%' group by a.visiturl")
clickHtml: org.apache.spark.sql.DataFrame = [visiturl: string, count: bigint]
scala> clickHtml.orderBy(-clickHtml("count")).limit(10).show(false)
+-----------------------------------------------------------------+-----+
|visiturl |count|
+-----------------------------------------------------------------+-----+
|http://www.lawtime.cn/faguizt/23.html |6503 |
|http://www.lawtime.cn/info/hunyin/lhlawlhxy/20110707137693.html |4938 |
|http://www.lawtime.cn/faguizt/9.html |4562 |
|http://www.lawtime.cn/info/shuifa/slb/2012111978933.html |4495 |
|http://www.lawtime.cn/faguizt/11.html |3976 |
|http://www.lawtime.cn/info/hunyin/lhlawlhxy/20110707137693_2.html|3305 |
|http://www.lawtime.cn/faguizt/43.html |3251 |
|http://www.lawtime.cn/faguizt/15.html |2718 |
|http://www.lawtime.cn/faguizt/117.html |2670 |
|http://www.lawtime.cn/faguizt/41.html |2455 |
+-----------------------------------------------------------------+-----+
scala> clickHtml.orderBy(-clickHtml("count")).limit(10).save("/user/root/sparkSql/clickHtml.json","json",SaveMode.Overwrite)
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
# (5) Pagination page counts
scala> hiveContext.sql("select count(*) from law where visiturl like 'http://www.%.cn/info/gongsi/slbgzcdj/201312312876742.html'").show()
+---+
|_c0|
+---+
|221|
+---+
scala> hiveContext.sql("select count(*) from law where visiturl like 'http://www.%.cn/info/gongsi/slbgzcdj/201312312876742_2.html'").show()
+---+
|_c0|
+---+
|141|
+---+
scala> hiveContext.sql("select count(*) from law where visiturl like 'http://www.%.cn/info/hetong/ldht/201311152872128.html'").show()
+---+
|_c0|
+---+
|144|
+---+
scala> hiveContext.sql("select count(*) from law where visiturl like 'http://www.%.cn/info/hetong/ldht/201311152872128_2.html'").show()
+---+
|_c0|
+---+
|377|
+---+
scala> hiveContext.sql("select count(*) from law where visiturl like 'http://www.%.cn/info/hetong/ldht/201311152872128_3.html'").show()
+---+
|_c0|
+---+
|218|
+---+
scala> hiveContext.sql("select count(*) from law where visiturl like 'http://www.%.cn/info/hetong/ldht/201311152872128_4.html'").show()
+---+
|_c0|
+---+
|146|
+---+
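The six queries above compare an article's base URL with its `_2`, `_3`, ... pagination variants one LIKE pattern at a time. The same grouping can be sketched generically in Python (illustration only; the `_N`-before-`.html` suffix convention is an assumption drawn from the URLs above):

```python
import re
from collections import defaultdict

# base URL, optionally followed by _<page number>, then .html
PAGE_RE = re.compile(r"^(?P<base>.+?)(?:_(?P<page>\d+))?\.html$")

def group_pagination(url_counts):
    """Map each article base URL to {page_number: hits};
    page 1 is the URL without a _N suffix."""
    groups = defaultdict(dict)
    for url, hits in url_counts:
        m = PAGE_RE.match(url)
        if not m:
            continue
        page = int(m.group("page") or 1)
        groups[m.group("base")][page] = hits
    return dict(groups)
```

Grouping this way turns the six separate count(*) queries into a single pass over (visiturl, count) pairs.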
```shell
[201805110159 root@master ~]# hadoop fs -ls /user/root/sparkSql
Found 9 items
drwxr-xr-x   - root supergroup          0 2021-06-26 22:47 /user/root/sparkSql/clickCount.json
drwxr-xr-x   - root supergroup          0 2021-06-26 22:50 /user/root/sparkSql/clickHtml.json
drwxr-xr-x   - root supergroup          0 2021-06-26 22:38 /user/root/sparkSql/consultCount.json
drwxr-xr-x   - root supergroup          0 2021-06-26 22:49 /user/root/sparkSql/onceScan.json
drwxr-xr-x   - root supergroup          0 2021-06-26 22:38 /user/root/sparkSql/pageLevel.json
drwxr-xr-x   - root supergroup          0 2021-06-26 22:37 /user/root/sparkSql/pageType.json
drwxr-xr-x   - root supergroup          0 2021-06-26 22:39 /user/root/sparkSql/pageWith.json
drwxr-xr-x   - root supergroup          0 2021-06-26 22:46 /user/root/sparkSql/streel.json
drwxr-xr-x   - root supergroup          0 2021-06-26 23:49 /user/root/sparkSql/urlRank.json
```

Troubleshooting
Error: Spark reports `Table not found: law; line 1 pos 116`
Error: `Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient`

Both point to Spark not reaching the Hive metastore. Start the metastore, then relaunch spark-shell:

```shell
cd /usr/cstor/hive/bin
./hive --service metastore &
# in a second terminal:
spark-shell --master local[2] --jars lib/mysql-connector-java-8.0.25.jar
```

```scala
val sparkContext = new org.apache.spark.sql.SQLContext(sc)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
// list the Hive databases
hiveContext.sql("show databases").show()
// switch to the law database
hiveContext.sql("use law")
```

If starting `hive --service metastore &` itself fails with
`Could not create ServerSocket on address 0.0.0.0/0.0.0.0:9083`,
run jps, find the leftover RunJar process, and kill it:

```shell
kill -9 <pid>
```



