import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{FilterList, SingleColumnValueFilter}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.sql.SparkSession
// Register Kryo serialization BEFORE the SparkSession is created so the
// setting is picked up when the context starts.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Build (or reuse) a Hive-enabled session. A `val` replaces the original
// `var ss = null` + reassignment — the session never changes afterwards.
val ss: SparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
val sc = ss.sparkContext

// Columns to fetch, in HBase "family:qualifier" form.
val hbaseColumns = Array("c:tt", "c:url")
// TableInputFormat.SCAN_COLUMNS expects a space-separated list; the original
// `map(c => c)` was an identity map and has been dropped.
val queryColumns = hbaseColumns.mkString(" ")

// NOTE: the HBase class is spelled HBaseConfiguration (capital B).
val hbaseConfiguration = HBaseConfiguration.create()
hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, "bidword_ad_pps_daily") // source table in HBase
hbaseConfiguration.set(TableInputFormat.SCAN_COLUMNS, queryColumns)
// Build the scan filter: MUST_PASS_ONE gives OR semantics — a row is kept
// when its c:campaign_source equals EITHER campaign value below.
val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
val scan = new Scan()
// Bytes.toBytes is used instead of String.getBytes to avoid depending on the
// platform-default charset (and to match the rest of this script).
val filter1 = new SingleColumnValueFilter(Bytes.toBytes("c"), Bytes.toBytes("campaign_source"),
  CompareOp.EQUAL, Bytes.toBytes("PPS-SHOPPING"))
val filter2 = new SingleColumnValueFilter(Bytes.toBytes("c"), Bytes.toBytes("campaign_source"),
  CompareOp.EQUAL, Bytes.toBytes("PPS-APP"))
filterList.addFilter(filter1)
filterList.addFilter(filter2)
scan.setFilter(filterList)
// Serialize the Scan into the job config. The HBase helper class is Base64
// (capital B) — the original lowercase `base64` does not resolve.
hbaseConfiguration.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))
val sqlContext = ss.sqlContext
import sqlContext.implicits._

// Pull the table through TableInputFormat; repartition to spread the rows
// across executors (HBase region splits may be few/uneven).
val hbaseRDD = sc.newAPIHadoopRDD(
  hbaseConfiguration,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
).repartition(40)

// Decode each HBase Result into (row key, url, title).
// NOTE(review): Bytes.toString yields null when a cell is absent — confirm
// downstream consumers tolerate null url/title.
val dataRDD = hbaseRDD.map { case (_, result) =>
  val itemId = Bytes.toString(result.getRow)
  val url = Bytes.toString(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("url")))
  val title = Bytes.toString(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("tt")))
  (itemId, url, title)
}

// Name the DataFrame columns instead of the default _1/_2/_3.
dataRDD.toDF("item_id", "url", "title").show(20)
spark-shell --name "fetch_hbase_test" --master yarn --deploy-mode client --num-executors 4 --executor-cores 2 --executor-memory 3G --driver-memory 5G --conf spark.driver.maxResultSize=10g --conf spark.yarn.executor.memoryOverhead=10000 --conf spark.serializer="org.apache.spark.serializer.KryoSerializer" --conf spark.shuffle.memoryFraction=0.3 --conf spark.sql.shuffle.partitions=1000 --conf spark.default.parallelism=1000 --jars $path/hbase-client-1.0.2.jar --conf spark.hbase.obtainToken.enabled=true --conf spark.yarn.security.credentials.hbase.enabled=true --files $path/conf/hdfs-site.xml
Usually referencing hbase-client-1.0.2.jar alone is sufficient, but in some environments class-resolution fails for unclear reasons; in that case also reference the following jars:
--jars $path/spark-hbase_2.11-2.3.2.jar,$path/hbase-hadoop-compat-1.3.1.jar,$path/hbase-common-1.3.1.jar,$path/hbase-client-1.3.1-6407.jar (note: spark-shell accepts a single comma-separated --jars list; repeating the flag overrides earlier values)



