import java.io.IOException
import java.security.PrivilegedExceptionAction
import java.util
import com.alibaba.fastjson.JSonObject
import com.chinaoly.ssbkStreaming.config.HbaseConfig
import com.chinaoly.ssbkStreaming.utils.PropertiesUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.{Cell, CellUtil, HbaseConfiguration, HColumnDescriptor, HConstants, HTableDescriptor, NamespaceDescriptor, TableName}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.slf4j.LoggerFactory
import scala.collection.mutable.ArrayBuffer
object HbaseHelper {
private val logger = LoggerFactory.getLogger(getClass)
val QUORUM: String = PropertiesUtils.getString("hbase.zookeeper.quorum")
val PORT: String = PropertiesUtils.getString("hbase.zookeeper.port")
val ZNODE: String = PropertiesUtils.getString("hbase.zookeeper.zNode")
val IS_KERBEROS: Boolean = PropertiesUtils.getBoolean("hbase.kerberos")
val KRB5_CONF_PATH: String = PropertiesUtils.getString("java.security.krb5.conf")
val KEYTAB_PATH: String = PropertiesUtils.getString("kerberos.keytab.path")
val KERBEROS_USER: String = PropertiesUtils.getString("kerberos.user")
val Hbase_SITE_FILE: String = PropertiesUtils.getString("hbase.site.file")
val CORE_SITE_FILE: String = PropertiesUtils.getString("core.site.file")
val HDFS_SITE_FILE: String = PropertiesUtils.getString("hdfs.site.file")
var connection: Connection = _
var ugi: UserGroupInformation = _
var conf: Configuration = _
def login(): UserGroupInformation = {
if (ugi == null) {
if (conf == null) {
conf = HbaseConfiguration.create()
}
//kerberos
try {
System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH)
conf.set("hadoop.security.authentication", "Kerberos")
conf.set("keytab.file", KEYTAB_PATH)
conf.set("kerberos.principal", KERBEROS_USER)
UserGroupInformation.setConfiguration(conf)
ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(KERBEROS_USER, KEYTAB_PATH)
} catch {
case e: IOException =>
logger.error(s"login hbase from keytab error,Cause:$e")
}
}
ugi
}
def getConnection: Connection = {
if (connection == null) {
if (conf == null) {
conf = HbaseConfiguration.create()
}
if (IS_KERBEROS) {
conf.addResource(new Path(CORE_SITE_FILE))
conf.addResource(new Path(HDFS_SITE_FILE))
conf.addResource(new Path(Hbase_SITE_FILE))
} else {
conf.set(HConstants.ZOOKEEPER_QUORUM, QUORUM)
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, PORT)
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ZNODE)
}
connection = ConnectionFactory.createConnection(conf)
}
connection
}
def createNamespace(namespace: String): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = createHbaseNamespace(namespace)
})
} else {
createHbaseNamespace(namespace)
}
}
def createHbaseNamespace(namespace: String): Boolean = {
try {
getConnection.getAdmin.createNamespace(NamespaceDescriptor.create(namespace).build())
true
} catch {
case e: Exception =>
logger.error(s"create hbase namespace $namespace error: $e")
false
}
}
def existsNamespace(namespace: String): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = existsHbaseNamespace(namespace)
})
} else {
existsHbaseNamespace(namespace)
}
}
def existsHbaseNamespace(namespace: String): Boolean = {
try {
getConnection.getAdmin.getNamespaceDescriptor(namespace)
true
} catch {
case e: Exception =>
logger.error(s"hbase namespace $namespace is not exists")
false
}
}
def dropNamespace(namespace: String): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = dropHbaseNamespace(namespace)
})
} else {
dropHbaseNamespace(namespace)
}
}
def dropHbaseNamespace(namespace: String): Boolean = {
try {
getConnection.getAdmin.deleteNamespace(namespace)
true
} catch {
case e: Exception =>
logger.error(s"drop hbase namespace $namespace error: $e")
false
}
}
def checkAndCreateNameSpace(namespace: String): Unit = {
if (!existsNamespace(namespace)) {
createNamespace(namespace)
}
}
def createTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = createHbaseTable(tableName, columnFamilyName, versions, timeToLive: Int)
})
} else {
createHbaseTable(tableName, columnFamilyName, versions, timeToLive: Int)
}
}
def createHbaseTable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Boolean = {
try {
val hTable = new HTableDescriptor(TableName.valueOf(tableName))
val hColumn = new HColumnDescriptor(columnFamilyName)
hColumn.setMaxVersions(versions)
hTable.addFamily(hColumn)
//设置过期时间,-1为永久
if(timeToLive > -1){
hColumn.setTimeToLive(timeToLive)
}
getConnection.getAdmin.createTable(hTable)
true
} catch {
case e: Exception =>
logger.info(s"create hbase table $tableName error: $e")
false
}
}
def existsTable(tableName: String): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = existsHbaseTable(tableName)
})
} else {
existsHbaseTable(tableName)
}
}
def existsHbaseTable(tableName: String): Boolean = {
try {
val bool: Boolean = getConnection.getAdmin.tableExists(TableName.valueOf(tableName))
bool
} catch {
case e: Exception =>
logger.info(s"hbase table $tableName is not exists")
false
}
}
def checkAndCreateNametable(tableName: String, columnFamilyName: String, versions: Int, timeToLive: Int): Unit = {
if (!existsTable(tableName)) {
createTable(tableName, columnFamilyName, versions, timeToLive: Int)
}
}
def getTable(connection: Connection, tableName: String): Option[Table] = {
try {
Some(connection.getTable(TableName.valueOf(tableName)))
} catch {
case e: Exception =>
logger.error(s"hbase getTable error,Cause:$e")
None
}
}
def getData(tableName: String, get: Get): Option[JSONObject] = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Option[JSONObject]] {
override def run(): Option[JSONObject] = getHbaseData(tableName, get)
})
} else {
getHbaseData(tableName, get)
}
}
private def getHbaseData(tableName: String, get: Get): Option[JSONObject] = {
try {
val connection: Connection = getConnection
val result: Result = getTable(connection, tableName).get.get(get)
var obj: JSonObject = null
if (!result.isEmpty) {
val cellArray: Array[Cell] = result.rawCells()
obj = new JSonObject
cellArray.foreach(cell => {
val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
val cellValue: String = Bytes.toString(CellUtil.clonevalue(cell))
obj.put(cellName, cellValue)
})
Some(obj)
} else {
None
}
} catch {
case e: Exception =>
logger.error(s"hbase getData error,Cause:$e")
None
}
}
def getOneVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[ArrayBuffer[JSONObject]] {
override def run(): ArrayBuffer[JSONObject] = getHbaseOneVersionData(tableName, getList)
})
} else {
getHbaseOneVersionData(tableName, getList)
}
}
def getHbaseOneVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
val arrayBuffer: ArrayBuffer[JSONObject] = ArrayBuffer.apply[JSONObject]()
try {
val connection: Connection = getConnection
val resultArray: Array[Result] = getTable(connection, tableName).get.get(getList)
resultArray.foreach(result => {
if (!result.isEmpty) {
val cellArray: Array[Cell] = result.rawCells()
val obj = new JSonObject
cellArray.foreach(cell => {
val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
val cellValue: String = Bytes.toString(CellUtil.clonevalue(cell))
obj.put(cellName, cellValue)
})
arrayBuffer += obj
}
})
} catch {
case e: Exception =>
logger.error(s"hbase getoneVersionData error,Cause:$e")
None
}
arrayBuffer
}
def getMultiVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[ArrayBuffer[JSONObject]] {
override def run(): ArrayBuffer[JSONObject] = getHbaseMultiVersionData(tableName, getList)
})
} else {
getHbaseMultiVersionData(tableName, getList)
}
}
def getHbaseMultiVersionData(tableName: String, getList: java.util.List[Get]): ArrayBuffer[JSONObject] = {
val arrayBuffer: ArrayBuffer[JSONObject] = ArrayBuffer.apply[JSONObject]()
try {
val connection: Connection = getConnection
val searchArray: Array[Result] = getTable(connection, tableName).get.get(getList)
searchArray.foreach(bk => {
val groupMap: Map[Long, Array[Cell]] = bk.rawCells().groupBy(_.getTimestamp)
for (key <- groupMap.keySet) {
val cellArray: Array[Cell] = groupMap.apply(key)
val obj = new JSonObject
obj.put("timestamp", key)
cellArray.foreach(cell => {
val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
val cellValue: String = Bytes.toString(CellUtil.clonevalue(cell))
obj.put(cellName, cellValue)
})
arrayBuffer += obj
}
})
} catch {
case e: Exception =>
logger.error(s"Hbase getMultiVersionData error:$e")
}
arrayBuffer
}
def scanOneVersionData(tableName: String, scan: Scan): ArrayBuffer[JSONObject] = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[ArrayBuffer[JSONObject]] {
override def run(): ArrayBuffer[JSONObject] = scanHbaseOneVersionData(tableName, scan)
})
} else {
scanHbaseOneVersionData(tableName, scan)
}
}
def scanHbaseOneVersionData(tableName: String, scan: Scan): ArrayBuffer[JSONObject] = {
val arrayBuffer: ArrayBuffer[JSONObject] = ArrayBuffer.apply[JSONObject]()
try {
val connection: Connection = getConnection
val resultScanner: ResultScanner = getTable(connection, tableName).get.getScanner(scan)
val it: util.Iterator[Result] = resultScanner.iterator()
while (it.hasNext) {
val bk: Result = it.next()
val cellArray: Array[Cell] = bk.rawCells()
val obj = new JSonObject
obj.put("rowKey", Bytes.toString(bk.getRow))
cellArray.foreach(cell => {
val cellName: String = Bytes.toString(CellUtil.cloneQualifier(cell))
val cellValue: String = Bytes.toString(CellUtil.clonevalue(cell))
obj.put(cellName, cellValue)
})
arrayBuffer += obj
}
} catch {
case e: Exception =>
logger.error(s"Hbase getMultiVersionData error:$e")
}
arrayBuffer
}
def putData(tableName: String, put: Put): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = putHbaseData(tableName, put)
})
} else {
putHbaseData(tableName, put)
}
}
def putHbaseData(tableName: String, put: Put): Boolean = {
var result = false
try {
val connection: Connection = getConnection
val table: Table = getTable(connection, tableName).get
table.put(put)
result = true
} catch {
case e: Exception =>
logger.error(s"Hbase putData error:$e")
result = false
}
result
}
def putDataList(tableName: String, putList: java.util.List[Put]): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = putHbaseDataList(tableName, putList)
})
} else {
putHbaseDataList(tableName, putList)
}
}
def putHbaseDataList(tableName: String, putList: java.util.List[Put]): Boolean = {
var result = false
try {
val connection: Connection = getConnection
val table: Table = getTable(connection, tableName).get
table.put(putList)
result = true
} catch {
case e: Exception =>
logger.error(s"Hbase putDataList error:$e")
result = false
}
result
}
def delData(tableName: String, delete: Delete): Boolean = {
if (IS_KERBEROS) {
val ugi: UserGroupInformation = login()
ugi.doAs(new PrivilegedExceptionAction[Boolean] {
override def run(): Boolean = delHbaseData(tableName, delete)
})
} else {
delHbaseData(tableName, delete)
}
}
def delHbaseData(tableName: String, delete: Delete): Boolean = {
var result = false
try {
val connection: Connection = getConnection
val table: Table = getTable(connection, tableName).get
table.delete(delete)
result = true
} catch {
case e: Exception =>
logger.error(s"Hbase delData error:$e")
result = false
}
result
}
def buildScan(rowKeyPrefix: Array[Byte]): Scan = {
val scan = new Scan()
val prefixFilter = new PrefixFilter(rowKeyPrefix)
scan.setFilter(prefixFilter)
scan.addFamily(HbaseConfig.FAMILY_NAME.getBytes())
scan
}
def buildGet(rowKey: Array[Byte], versions: Int): Get = {
val get = new Get(rowKey)
get.addFamily(HbaseConfig.FAMILY_NAME.getBytes())
get.setMaxVersions(versions)
get
}
def buildPut(rowKey: Array[Byte], gjValue: JSONObject): Put = {
import scala.collection.JavaConversions._
val put = new Put(rowKey)
for (key <- gjValue.keySet()) {
put.addColumn(HbaseConfig.FAMILY_NAME.getBytes, key.getBytes, String.valueOf(gjValue.get(key)).getBytes)
}
put
}
}