cd /etc/init.d/
-- Start the master
[hadoop@hadoop000 init.d]$ sudo ./kudu-master start
Started Kudu Master Server (kudu-master): [ OK ]
-- Start the tserver
[hadoop@hadoop000 init.d]$ sudo ./kudu-tserver start
Started Kudu Tablet Server (kudu-tserver): [ OK ]
(
// Startup can fail if the node's clock is not synchronized; fix it with the following commands:
sudo yum install ntp
service ntpd start
service ntpd status
)
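If the servers still refuse to start, it is worth confirming the clock is actually synchronized before retrying (ntpq ships with the ntp package installed above):
ntpq -p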
-- Check the web UIs in a browser (the tablet server listens on port 8050 by default; the master UI is on 8051)
http://hadoop000:8050/
---- Add the Kudu client dependency in IDEA (Maven)
<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-client</artifactId>
  <version>1.7.0</version>
</dependency>
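The Spark sections below also use the kudu-spark data source (org.apache.kudu.spark.kudu). Assuming Spark 2.x with Scala 2.11, the matching artifact would be:
<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-spark2_2.11</artifactId>
  <version>1.7.0</version>
</dependency>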
import java.util
import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.kudu.client.{CreateTableOptions, KuduClient}
object kudu_API {
  // Create a table with a string key column "word" and an int column "cnt",
  // hash-partitioned on the key into 3 buckets, with a single replica.
  def createTable(client: KuduClient, tableName: String): Unit = {
    import scala.collection.JavaConverters._
    val columns = List(
      new ColumnSchema.ColumnSchemaBuilder("word", Type.STRING).key(true).build(),
      new ColumnSchema.ColumnSchemaBuilder("cnt", Type.INT32).build()
    ).asJava
    val schema = new Schema(columns)
    val options = new CreateTableOptions()
    options.setNumReplicas(1)
    val parcols = new util.LinkedList[String]()
    parcols.add("word")
    options.addHashPartitions(parcols, 3)
    client.createTable(tableName, schema, options)
  }
  // Insert 30 rows; the default session flush mode is AUTO_FLUSH_SYNC,
  // so each apply() is sent to the server immediately.
  def insertRows(client: KuduClient, tableName: String): Unit = {
    val table = client.openTable(tableName)
    val session = client.newSession()
    for (i <- 1 to 30) {
      val insert = table.newInsert()
      val row = insert.getRow
      row.addString("word", s"lj+$i")
      row.addInt("cnt", i * i)
      session.apply(insert)
    }
    session.close()
  }
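  // A hedged sketch, not part of the original: for larger batches,
  // MANUAL_FLUSH buffers operations client-side and ships them in one
  // round trip instead of one RPC per apply(). The method name and row
  // values here are illustrative.
  def insertRowsBatched(client: KuduClient, tableName: String): Unit = {
    import org.apache.kudu.client.SessionConfiguration
    val table = client.openTable(tableName)
    val session = client.newSession()
    session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)
    for (i <- 1 to 30) {
      val insert = table.newInsert()
      val row = insert.getRow
      row.addString("word", s"batch+$i")
      row.addInt("cnt", i)
      session.apply(insert)
    }
    session.flush() // send everything buffered so far
    session.close()
  }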
  // Drop the table.
  def deltable(client: KuduClient, tableName: String): Unit = {
    client.deleteTable(tableName)
  }
  // Full-table scan: print every (word, cnt) pair.
  def querytable(client: KuduClient, tableName: String): Unit = {
    val table = client.openTable(tableName)
    val scanner = client.newScannerBuilder(table).build()
    while (scanner.hasMoreRows) {
      val iterator = scanner.nextRows()
      while (iterator.hasNext) {
        val result = iterator.next()
        println(result.getString("word") + "==>" + result.getInt("cnt"))
      }
    }
  }
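  // A hedged sketch, not part of the original: the same scan can be
  // filtered server-side with a KuduPredicate instead of returning every
  // row. The column names match the table created above; the method name
  // and threshold are illustrative.
  def querytableFiltered(client: KuduClient, tableName: String): Unit = {
    import org.apache.kudu.client.KuduPredicate
    val table = client.openTable(tableName)
    val cntCol = table.getSchema.getColumn("cnt")
    // only return rows where cnt > 100
    val pred = KuduPredicate.newComparisonPredicate(cntCol, KuduPredicate.ComparisonOp.GREATER, 100)
    val scanner = client.newScannerBuilder(table).addPredicate(pred).build()
    while (scanner.hasMoreRows) {
      val iterator = scanner.nextRows()
      while (iterator.hasNext) {
        val result = iterator.next()
        println(result.getString("word") + "==>" + result.getInt("cnt"))
      }
    }
  }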
  // Update one row: the key column ("word") identifies the row,
  // the remaining columns carry the new values.
  def updaterow(client: KuduClient, tableName: String): Unit = {
    val table = client.openTable(tableName)
    val session = client.newSession()
    val update = table.newUpdate()
    val row = update.getRow
    row.addString("word", "lj+9")
    row.addInt("cnt", 19191919)
    session.apply(update)
    session.close()
  }
  def main(args: Array[String]): Unit = {
    val kudu_master = "hadoop000"
    val client = new KuduClient.KuduClientBuilder(kudu_master).build()
    // table to operate on
    val tableName = "ljj-1226"
    // create the table
    // createTable(client, tableName)
    // insert rows
    // insertRows(client, tableName)
    // drop the table
    // deltable(client, tableName)
    // query the table
    querytable(client, tableName)
    // update a row, then query again to verify
    updaterow(client, tableName)
    querytable(client, tableName)
    client.close()
  }
}
-- Reading and writing Kudu from Spark
import Util_ip.{new_schema, sink2kudu}
import org.apache.kudu.Schema
import org.apache.spark.sql.{DataFrame, SparkSession}
import trai.data_etl_process
object LogETL_1 extends data_etl_process {
  override def process(session: SparkSession): Unit = {
    val kudu_master = "hadoop000"
    import session.implicits._
    // source table to read from Kudu
    val tableName = "ods_20181007"
    val df = session.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.table", tableName)
      .option("kudu.master", kudu_master)
      .option("kudu.operation.timeout.ms", "100000").load
    // sample 1% of the rows and keep three columns for the target table
    val frame = df.sample(0.01)
    val frame1 = frame.select($"ip", $"adplatformkey", $"provincename")
    frame1.printSchema()
    println("=========== writing to the Kudu table ==============")
    val table_name_new = "renew_20220115"
    val mainkey = "ip"
    sink2kudu.write2kudu(kudu_master, table_name_new, new_schema.renew_20220115_schema, frame1, mainkey)
    println("=========== verifying that the write landed ===========")
    val df_check = session.read.format("org.apache.kudu.spark.kudu")
      .option("kudu.table", table_name_new)
      .option("kudu.master", kudu_master)
      .option("kudu.operation.timeout.ms", "100000").load
    df_check.show()
    println("all good")
  }
}
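The trai.data_etl_process trait is referenced above but not shown. A minimal sketch of what it might look like, assuming it just owns the SparkSession lifecycle (the master setting and body are guesses, not the original code):

package trai

import org.apache.spark.sql.SparkSession

trait data_etl_process {
  // each ETL job implements its own logic here
  def process(session: SparkSession): Unit

  // shared driver: build the session, run the job, shut down
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName(getClass.getSimpleName)
      .master("local[2]") // assumption: local run for testing
      .getOrCreate()
    process(session)
    session.stop()
  }
}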
-- Utility object: writing a DataFrame to Kudu
package Util_ip
import java.util
import org.apache.kudu.Schema
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.client.KuduClient.KuduClientBuilder
import org.apache.spark.sql.DataFrame
object sink2kudu {
  // Drop-and-recreate the target table, then append the DataFrame to it.
  def write2kudu(
    kudu_master: String, table_name_new: String, schema: Schema, frame: DataFrame, mainkey: String
  ): Unit = {
    // open a Kudu client
    val client = new KuduClientBuilder(kudu_master).build()
    if (client.tableExists(table_name_new)) {
      client.deleteTable(table_name_new)
      println("dropped the existing table...")
    } else {
      println(table_name_new + " does not exist; creating it")
    }
    val options = new CreateTableOptions()
    options.setNumReplicas(1)
    val parcols = new util.LinkedList[String]()
    parcols.add(mainkey)
    options.addHashPartitions(parcols, 3)
    client.createTable(table_name_new, schema, options)
    println(table_name_new + " created")
    frame.write
      .options(Map("kudu.master" -> kudu_master, "kudu.table" -> table_name_new))
      .mode("append")
      .format("org.apache.kudu.spark.kudu").save()
    println("write complete")
    client.close()
  }
}
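A hedged alternative: the kudu-spark package ships a KuduContext that can replace most of the hand-rolled client logic above. This sketch (the object name is hypothetical) derives the Kudu schema from the DataFrame itself instead of a separate Schema object:

package Util_ip

import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.DataFrame

object sink2kudu_ctx {
  def write2kudu(kudu_master: String, table_name_new: String, frame: DataFrame, mainkey: String): Unit = {
    import scala.collection.JavaConverters._
    val kuduContext = new KuduContext(kudu_master, frame.sparkSession.sparkContext)
    // drop-and-recreate, mirroring write2kudu above
    if (kuduContext.tableExists(table_name_new)) kuduContext.deleteTable(table_name_new)
    val options = new CreateTableOptions()
      .setNumReplicas(1)
      .addHashPartitions(List(mainkey).asJava, 3)
    kuduContext.createTable(table_name_new, frame.schema, Seq(mainkey), options)
    kuduContext.insertRows(frame, table_name_new)
  }
}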
-- Schema utility object
package Util_ip
import org.apache.kudu.{ColumnSchema, Schema, Type}
object new_schema {
  // Schema for renew_20220115: columns match
  // frame.select($"ip", $"adplatformkey", $"provincename"), keyed on ip.
  def renew_20220115_schema: Schema = {
    import scala.collection.JavaConverters._
    val columns = List(
      new ColumnSchema.ColumnSchemaBuilder("ip", Type.STRING).key(true).build(),
      new ColumnSchema.ColumnSchemaBuilder("adplatformkey", Type.STRING).build(),
      new ColumnSchema.ColumnSchemaBuilder("provincename", Type.STRING).build()
    ).asJava
    new Schema(columns)
  }
  // Schema for a city/age aggregate table, keyed on cityname.
  def ctye_age_schema: Schema = {
    import scala.collection.JavaConverters._
    val columns = List(
      new ColumnSchema.ColumnSchemaBuilder("cityname", Type.STRING).key(true).build(),
      new ColumnSchema.ColumnSchemaBuilder("provincename", Type.STRING).build(),
      new ColumnSchema.ColumnSchemaBuilder("appname", Type.STRING).build(),
      new ColumnSchema.ColumnSchemaBuilder("age_sum", Type.DOUBLE).build()
    ).asJava
    new Schema(columns)
  }
  // Schema for an sdkversion/appname aggregate table, keyed on sdkversion.
  def sdkversion_appname_schema: Schema = {
    import scala.collection.JavaConverters._
    val columns = List(
      new ColumnSchema.ColumnSchemaBuilder("sdkversion", Type.STRING).key(true).build(),
      new ColumnSchema.ColumnSchemaBuilder("appname", Type.STRING).build(),
      new ColumnSchema.ColumnSchemaBuilder("month", Type.INT32).build(),
      new ColumnSchema.ColumnSchemaBuilder("age_sum", Type.DOUBLE).build()
    ).asJava
    new Schema(columns)
  }
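  // Hedged sketch, not in the original: the three builders above repeat the
  // same pattern, so a small helper (hypothetical name, assumes a STRING key
  // column) could fold them up:
  def simpleSchema(keyCol: String, cols: (String, Type)*): Schema = {
    import scala.collection.JavaConverters._
    val key = new ColumnSchema.ColumnSchemaBuilder(keyCol, Type.STRING).key(true).build()
    val rest = cols.map { case (name, t) => new ColumnSchema.ColumnSchemaBuilder(name, t).build() }
    new Schema((key +: rest).asJava)
  }
  // e.g. simpleSchema("ip", "adplatformkey" -> Type.STRING, "provincename" -> Type.STRING)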
}