The line that throws the error is:
val address = IpParse.evaluate(dataMapIp)
It calls the following code in a utility class; the file cannot be found when the object is constructed with new:
val url = "/root/spark/mydatavipday2.ipdb"
db = new City(url);
Main code:
ssc.foreachRDD(rdd => {
  // Work on each partition of the RDD so that connections are created per partition, not per record
  val newrdd: Unit = rdd.foreachPartition(it => {
    // Get the Redis cluster client from the RedisClusterUtil utility class
    val client: JedisCluster = RedisClusterUtil.getJedis
    //val start = System.currentTimeMillis
    println("Program is running...")
    val client_md: Jedis = RedisUtil.getJedisClient
    // Open the MySQL connection
    val conn: Connection = MysqlUtil.getConnection
    it.foreach(line => {
      // Resolve country/province/city from the IP and add them to dataMap
      println("dataMapIp",dataMapIp)
      if(dataMapIp != "null" && dataMapIp != ""){
        val address = IpParse.evaluate(dataMapIp)
        println("address",address)
        if (address != null){
          dataMap.put("$country",address(0))
          dataMap.put("$province",address(1))
          dataMap.put("$city",address(2))
        }
      }
Utility class that resolves an IP to country, province, and city:
package com.tcl.realtime.util
import java.sql.SQLException
import net.ipip.ipdb.City
import net.ipip.ipdb.IPFormatException
import java.util.Objects
object IpParse { // intended to read the ipip database file only once
  def main(args: Array[String]): Unit = {
    val address = IpParse.evaluate("14.29.77.234")
    println(address(0),address(1),address(2))
  }
  def evaluate(ip: String): Array[String] = {
    var db: City = null
    var cns: Array[String] = null
    //try {
    //val url = this.getClass().getClassLoader().getResource("mydatavipday2.ipdb").getPath()
    val url = "/root/spark/mydatavipday2.ipdb"
    //val url = "/origin_data/ip/mydatavipday2vim.ipdb"
    //println(url)
    // This constructor opens the file; it is the call that fails on the executors
    db = new City(url);
    println(db)
    //db = new City("D:\code\sparkstream_kudu\src\main\resources\mydatavipday2.ipdb");
    if (Objects.isNull(ip) || "" == ip.trim) return null
    // todo: the ip field contains dirty data such as "1.174.142.24, 52.46.57.139" that needs filtering
    val split = ip.split(",")
    cns = db.find(split(0), "CN")
    //} catch {
    // case e: Exception =>
    // e.printStackTrace();
    //}
    cns
  }
}
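The comment on IpParse says the database file should be read only once, but evaluate as written constructs a new City on every call, so the file is reopened for every record. One way to get a single load per JVM (on Spark, once per executor process) is a lazy val. The following is a minimal sketch of that idea, not the author's code: LoadOnce and LoadOnceDemo are hypothetical names, and a plain Map with placeholder values stands in for City so the example runs without the ipdb library.

```scala
// Hypothetical helper: caches the result of an expensive loader once per JVM.
final class LoadOnce[A](loader: () => A) {
  // Scala initializes a lazy val at most once, and does so thread-safely.
  lazy val value: A = loader()
}

object LoadOnceDemo {
  def main(args: Array[String]): Unit = {
    var loads = 0
    // Stand-in for `new City("/root/spark/mydatavipday2.ipdb")`;
    // the Map values are placeholders, not real lookup results.
    val db = new LoadOnce(() => {
      loads += 1
      Map("14.29.77.234" -> Array("country", "province", "city"))
    })
    // Two lookups, but the loader body runs only on the first access.
    Seq("14.29.77.234", "14.29.77.234").foreach(ip => db.value.get(ip))
    println(s"loader ran $loads time(s)") // prints "loader ran 1 time(s)"
  }
}
```

Applied to IpParse, the same effect would come from declaring the database as a lazy val field of the object and having evaluate only call db.find. Because a Scala object is instantiated separately in each executor JVM, the file would still be opened on each executor, so the path must be readable there.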
2. Cause of the error
When the job is submitted in local mode, it runs normally and reports no error:
spark-submit --class com.tcl.realtime.app.Events --master local[4] --executor-memory 2g --driver-memory 2g --num-executors 3 --executor-cores 1 /root/spark/sparkstream_kudu-1.0-SNAPSHOT-jar-with-dependencies.jar
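When the job is submitted to YARN, it fails. The new City logic actually runs on each executor, so mydatavipday2.ipdb was placed in the /root/spark/ directory on every machine, and both the spark directory and mydatavipday2.ipdb were set to 777. Permission denied was still reported. The jar is submitted by the root user, and only /root itself still had restrictive permissions, dr-xr-xr-x, with no write permission. Opening a file requires access to every directory in its path, not only to the file and its immediate parent, and on YARN the executor process may be running as a different OS user (often the NodeManager's service account) from the one that ran spark-submit, so /root was the remaining place where access could be refused.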
spark-submit --class com.tcl.realtime.app.Events --master yarn --deploy-mode client --executor-memory 2g --driver-memory 2g --num-executors 3 --executor-cores 1 /root/spark/sparkstream_kudu-1.0-SNAPSHOT-jar-with-dependencies.jar
3. Solution
After changing the permissions of the /root directory on every node to 777, the program runs normally.
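To see which part of the path is actually refusing access before widening permissions, it can help to walk the path from the root down and test each component as the user the process really runs as. The sketch below uses only java.nio.file from the JDK; PathAccessCheck, ancestors, and firstBlocked are hypothetical names introduced for illustration, and the default path is the one from this post.

```scala
import java.nio.file.{Files, Path, Paths}

object PathAccessCheck {
  /** The chain of paths from the filesystem root down to `target`. */
  def ancestors(target: Path): List[Path] = {
    val abs = target.toAbsolutePath.normalize
    // getParent returns null once the root is passed, which ends the walk.
    Iterator.iterate(abs)(_.getParent).takeWhile(_ != null).toList.reverse
  }

  /** First component the current OS user cannot use, if any.
    * A directory counts as blocked when it is missing or not searchable
    * (no execute bit for this user); the file itself when it is missing
    * or not readable. */
  def firstBlocked(target: Path): Option[Path] = {
    val chain = ancestors(target)
    chain.init.find(dir => !Files.isExecutable(dir))
      .orElse(Some(chain.last).filterNot(Files.isReadable(_)))
  }

  def main(args: Array[String]): Unit = {
    val target = Paths.get(args.headOption.getOrElse("/root/spark/mydatavipday2.ipdb"))
    println(s"running as OS user: ${System.getProperty("user.name")}")
    firstBlocked(target) match {
      case Some(p) => println(s"blocked at: $p")
      case None    => println(s"$target is readable")
    }
  }
}
```

The result is only meaningful when the check runs in the same place as the failing code, for example by calling firstBlocked inside foreachPartition and printing the result to the executor log. Run from the driver as root, it would report the file as readable even though the executors cannot open it.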
4. Error details
java.io.FileNotFoundException: /root/spark/mydatavipday2.ipdb (Permission denied)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at net.ipip.ipdb.Reader.<init>(Reader.java:21)
    at net.ipip.ipdb.City.<init>(City.java:17)
    at com.tcl.realtime.util.IpParse$.evaluate(IpParse.scala:25)
    at com.tcl.realtime.app.Events$$anonfun$doSomething$1$$anonfun$1$$anonfun$apply$3.apply(Events.scala:206)
    at com.tcl.realtime.app.Events$$anonfun$doSomething$1$$anonfun$1$$anonfun$apply$3.apply(Events.scala:46)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at com.tcl.realtime.app.Events$$anonfun$doSomething$1$$anonfun$1.apply(Events.scala:46)
    at com.tcl.realtime.app.Events$$anonfun$doSomething$1$$anonfun$1.apply(Events.scala:36)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2121)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)



