CM版本:5.14.3CDH版本:5.14.2Apache Kafka版本:0.10.2SPARK版本:2.2.0Redhat版本:7.3已启用Kerberos,用root用户进行操作 102.2 操作演示
1.准备环境
导出keytab文件
[root@cdh01 ~]# kadmin.local Authenticating as principal hbase/admin@FAYSON.COM with password. kadmin.local: xst -norandkey -k fayson.keytab fayson@FAYSON.COM
检查导出的keytab文件是否正确
[root@cdh01 ~]# klist -ek fayson.keytab
jaas.cof文件:
把fayson.keytab和jaas.conf文件拷贝至集群的所有节点统一的/data/disk1/0286-kafka-shell/conf目录下
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"
principal="fayson@FAYSON.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"
principal="fayson@FAYSON.COM";
};
根据需求将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:
{
"occupation": "劳动者、运输工作和部分体力生产工作",
"address": "山东东三路18号-6-6",
"city": "长江",
"marriage": "1",
"sex": "1",
"name": "魏淑芬",
"mobile_phone_num": "13508268580",
"bank_name": "广发银行32",
"id": "510105198906185189",
"child_num": "1",
"fix_phone_num": "16004180180"
}
把spark_kafka_version的kafka版本修改为0.10通过CM下载Hbase客户端配置文件
把Spark2访问Hbase的依赖包添加到集群的/opt/cloudera/parcels/SPARK2/lib/spark2/jars目录下,依赖的jar包如下:
需要将依赖包拷贝至集群所有节点
hbase-client-1.2.0-cdh5.14.2.jar hbase-common-1.2.0-cdh5.14.2.jar hbase-protocol-1.2.0-cdh5.14.2.jar htrace-core-3.2.0-incubating.jar
2.开发示例
pom.xml依赖如下
在maven创建scala语言的spark2demo
org.apache.hbase hbase-client1.2.0-cdh5.14.2
添加访问Hbase的集群配置信息hdfs-site.xml/core-stie.xml/hbase-site.xml文件在resources下创建0289.properties配置文件:
kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092 kafka.topics=kafka_hbase_topic principal.account=fayson@FAYSON.COM keytab.filepath=/data/disk1/spark2streaming-kafka-hbase/conf/fayson.keytab
创建HbaseUtils.scala类,主要用于创建Hbase的Connection
package utils
import java.io.File
import java.security.PrivilegedAction
import org.apache.hadoop.hbase.{HbaseConfiguration}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.security.UserGroupInformation
object HbaseUtil {
def getHbaseConn(confPath: String, principal: String, keytabPath: String): Connection = {
val configuration = HbaseConfiguration.create
val coreFile = new File(confPath + File.separator + "core-site.xml")
if(!coreFile.exists()) {
val in = HbaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/core-site.xml")
configuration.addResource(in)
}
val hdfsFile = new File(confPath + File.separator + "hdfs-site.xml")
if(!hdfsFile.exists()) {
val in = HbaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/hdfs-site.xml")
configuration.addResource(in)
}
val hbaseFile = new File(confPath + File.separator + "hbase-site.xml")
if(!hbaseFile.exists()) {
val in = HbaseUtil.getClass.getClassLoader.getResourceAsStream("hbase-conf/hbase-site.xml")
configuration.addResource(in)
}
UserGroupInformation.setConfiguration(configuration)
UserGroupInformation.loginUserFromKeytab(principal, keytabPath)
val loginUser = UserGroupInformation.getLoginUser
loginUser.doAs(new PrivilegedAction[Connection] {
override def run(): Connection = ConnectionFactory.createConnection(configuration)
})
}
}
创建Kafka2Spark2Hbase.scala文件:
package com.cloudera.streaming
import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.HbaseUtil
import scala.util.Try
import scala.util.parsing.json.JSON
object Kafka2Spark2Hbase {
Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别
var confPath: String = System.getProperty("user.dir") + File.separator + "conf"
def main(args: Array[String]): Unit = {
//加载配置文件
val properties = new Properties()
val file = new File(confPath + File.separator + "0288.properties")
if(!file.exists()) {
val in = Kafka2Spark2Kudu.getClass.getClassLoader.getResourceAsStream("0289.properties")
properties.load(in);
} else {
properties.load(new FileInputStream(confPath))
}
val brokers = properties.getProperty("kafka.brokers")
val topics = properties.getProperty("kafka.topics")
val principal = properties.getProperty("principal.account")
val keytabFilePath = properties.getProperty("keytab.filepath")
println("kafka.brokers:" + brokers)
println("kafka.topics:" + topics)
if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics) || StringUtils.isEmpty(principal) || StringUtils.isEmpty(keytabFilePath)) {
println("未配置Kafka和Kerberos信息")
System.exit(0)
}
val topicsSet = topics.split(",").toSet
val spark = SparkSession.builder().appName("Kafka2Spark2Hbase-kerberos").config(new SparkConf()).getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次
val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers
, "auto.offset.reset" -> "latest"
, "security.protocol" -> "SASL_PLAINTEXT"
, "sasl.kerberos.service.name" -> "kafka"
, "key.deserializer" -> classOf[StringDeserializer]
, "value.deserializer" -> classOf[StringDeserializer]
, "group.id" -> "testgroup"
)
val dStream = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
dStream.foreachRDD(rdd => {
rdd.foreachPartition(partitionRecords => {
val connection = HbaseUtil.getHbaseConn(confPath, principal, keytabFilePath) // 获取Hbase连接
partitionRecords.foreach(line => {
//将Kafka的每一条消息解析为JSON格式数据
val jsonObj = JSON.parseFull(line.value())
println(line.value())
val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]
val rowkey = map.get("id").get.asInstanceOf[String]
val name = map.get("name").get.asInstanceOf[String]
val sex = map.get("sex").get.asInstanceOf[String]
val city = map.get("city").get.asInstanceOf[String]
val occupation = map.get("occupation").get.asInstanceOf[String]
val mobile_phone_num = map.get("mobile_phone_num").get.asInstanceOf[String]
val fix_phone_num = map.get("fix_phone_num").get.asInstanceOf[String]
val bank_name = map.get("bank_name").get.asInstanceOf[String]
val address = map.get("address").get.asInstanceOf[String]
val marriage = map.get("marriage").get.asInstanceOf[String]
val child_num = map.get("child_num").get.asInstanceOf[String]
val tableName = TableName.valueOf("user_info")
val table = connection.getTable(tableName)
val put = new Put(Bytes.toBytes(rowkey))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("sex"), Bytes.toBytes(sex))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes(city))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("occupation"), Bytes.toBytes(occupation))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("mobile_phone_num"), Bytes.toBytes(mobile_phone_num))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("fix_phone_num"), Bytes.toBytes(fix_phone_num))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("bank_name"), Bytes.toBytes(bank_name))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("address"), Bytes.toBytes(address))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("marriage"), Bytes.toBytes(marriage))
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("child_num"), Bytes.toBytes(child_num))
Try(table.put(put)).getOrElse(table.close())//将数据写入Hbase,若出错关闭table
table.close()//分区数据写入Hbase后关闭连接
})
connection.close()
})
})
ssc.start()
ssc.awaitTermination()
}
}
使用mvn命令编译工程
scala工程编译时mvn命令要加scala:compile
mvn clean scala:compile package7.将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务器
编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务器
Spark2应用的配置文件放在conf目录下 spark2streaming-kafka-hbase目录拷贝至集群的所有节点
3.运行示例
spark2-submit命令向集群提交Spark2Streaming作业
spark2-submit --class com.cloudera.streaming.Kafka2Spark2Hbase --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default --principal fayson@FAYSON.COM --keytab /data/disk1/spark2streaming-kafka-hbase/conf/fayson.keytab --files "/data/disk1/spark2streaming-kafka-hbase/conf/jaas.conf#jaas.conf" --driver-java-options "-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hbase/conf/jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hbase/conf/jaas.conf" spark2-demo-1.0-SNAPSHOT.jar
运行脚本向Kafka的Kafka_hbase_topic生产消息用hbase shell命令查看数据是否入库成功
4.总结
在访问Kerberos环境的Hbase,需要加载Hbase的客户端配置文件,为了方便直接将三个配置文件加载在0289.properties配置文件中,如果指定的为相对路径可能会出现Kerberos认证失败,所以要指定了keytab文件的绝对路径Spark2默认的kafka版本为0.9需要通过CM将默认的Kafka版本修改为0.10jaas.conf文件及keytab需要在集群的所有节点存在,因为Driver和Executor是随机在集群的节点上启动的在选择依赖包时需要注意与Spark版本的兼容性问题
大数据视频推荐:
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通



