- 一、废话不多说,直接上代码
- 二、把项目打成jar包,并和spark集成
- 三、本人对该项目的改造
- 1.项目结构
- 四、最后的清洗结果
package com.roundyuan.sparkagent
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeserializeToObject, Distinct, Filter, Generate, GlobalLimit, InsertIntoTable, Join, LocalLimit, LocalRelation, LogicalPlan, MapElements, MapPartitions, Project, Repartition, RepartitionByExpression, SerializeFromObject, SubqueryAlias, TypedFilter, Union, Window}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
import org.apache.spark.sql.util.QueryExecutionListener
import scala.collection.mutable
import scala.collection.mutable.{ListBuffer, Map}
import scala.util.control.NonFatal

/**
 * QueryExecutionListener that extracts column-level lineage
 * (source "db.table.column" -> target "db.table.column") from the analyzed
 * logical plan of every successfully executed Spark SQL statement.
 *
 * FIX: the original text used `Namedexpression`, `RepartitionByexpression`,
 * `tablemeta` and `aggregateexpressions`, which do not exist in the Spark API
 * (capitalization lost) and fail to compile; corrected to `NamedExpression`,
 * `RepartitionByExpression`, `tableMeta` and `aggregateExpressions`.
 */
class FlQueryExecutionListener extends QueryExecutionListener with Logging {
  // Target-table columns: exprId -> full column name (normally one target table).
  private val targetTable: Map[Long, String] = Map()
  // Source-table columns: exprId -> full column name (possibly several tables).
  private val sourceTables: Map[Long, String] = Map()
  // Alias transformations: output exprId -> exprIds of the columns it derives from.
  private val fieldProcess: Map[Long, mutable.Set[Long]] = Map()
  // Compressed lineage: target column -> set of source columns.
  private val fieldLineage: Map[String, mutable.Set[String]] = mutable.Map()
  // SQL statement type (insert-select, create-as, ...); reserved for future use.
  private var processType: String = ""

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {
    // Only successfully executed plans are parsed for lineage.
    lineageParser(qe)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {
    // Nothing to record for failed executions.
  }

  // Shields Spark from any error thrown while parsing lineage (log instead of rethrow).
  private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
    try
      body
    catch {
      case NonFatal(e) =>
        val ctx = qe.sparkSession.sparkContext
        logError(s"Unexpected error occurred during lineage processing for application: ${ctx.appName} #${ctx.applicationId}", e)
    }
  }

  /** Entry point: collect tables/fields from the analyzed plan, then link them. */
  def lineageParser(qe: QueryExecution): Unit = {
    logInfo("----------- field lineage parse start --------")
    val analyzedLogicPlan = qe.analyzed
    resolveLogicV2(analyzedLogicPlan)
    connectSourceFieldAndTargetField()
    println(fieldLineage)
  }

  /**
   * Walks the plan: source columns come from LogicalRelation / HiveTableRelation,
   * target columns from InsertIntoHiveTable / InsertIntoHadoopFsRelationCommand /
   * CreateHiveTableAsSelectCommand, and transformations from Aggregate / Project.
   */
  def resolveLogicV2(plan: LogicalPlan): Unit = {
    plan.collect {
      case relation: LogicalRelation =>
        val catalogTable = relation.catalogTable.get
        val tableName = catalogTable.database + "." + catalogTable.identifier.table
        relation.output.foreach { columnAttribute =>
          sourceTables += (columnAttribute.exprId.id -> (tableName + "." + columnAttribute.name))
        }
      case relation: HiveTableRelation =>
        val tableName = relation.tableMeta.database + "." + relation.tableMeta.identifier.table
        relation.output.foreach { columnAttribute =>
          sourceTables += (columnAttribute.exprId.id -> (tableName + "." + columnAttribute.name))
        }
      case insert: InsertIntoHiveTable =>
        val tableName = insert.table.database + "." + insert.table.identifier.table
        extTargetTable(tableName, insert.query)
      case insert: InsertIntoHadoopFsRelationCommand =>
        val catalogTable: CatalogTable = insert.catalogTable.get
        val tableName = catalogTable.database + "." + catalogTable.identifier.table
        extTargetTable(tableName, insert.query)
      case create: CreateHiveTableAsSelectCommand =>
        val tableName = create.tableDesc.database + "." + create.tableDesc.identifier.table
        extTargetTable(tableName, create.query)
      case agg: Aggregate =>
        agg.aggregateExpressions.foreach(extFieldProcess)
      case project: Project =>
        project.projectList.foreach(extFieldProcess)
    }
  }

  /** Records which columns an alias expression is derived from. */
  def extFieldProcess(namedexpression: NamedExpression): Unit = {
    // Only aliases introduce a transformation; plain attributes map to themselves.
    if ("alias".equals(namedexpression.prettyName)) {
      val aliasId = namedexpression.exprId.id
      val referencedIds: mutable.Set[Long] = fieldProcess.getOrElse(aliasId, mutable.Set.empty)
      namedexpression.references.foreach(attribute => referencedIds += attribute.exprId.id)
      fieldProcess += (aliasId -> referencedIds)
    }
  }

  /** Records the output columns of an INSERT/CTAS query as target columns. */
  def extTargetTable(tableName: String, plan: LogicalPlan): Unit = {
    logInfo("start ext target table")
    plan.output.foreach { columnAttribute =>
      targetTable += (columnAttribute.exprId.id -> (tableName + "." + columnAttribute.name))
    }
  }

  /** Links every target column to its source columns, resolving alias chains. */
  def connectSourceFieldAndTargetField(): Unit = {
    targetTable.keySet.foreach { fieldId =>
      val resTargetFieldName = targetTable(fieldId)
      val resSourceFieldSet: mutable.Set[String] = mutable.Set.empty[String]
      if (sourceTables.contains(fieldId)) {
        // Directly selected column: source and target share the same exprId.
        resSourceFieldSet += sourceTables(fieldId)
      } else {
        // Derived column: follow the transformation chain back to its sources.
        resSourceFieldSet ++= findSourceField(fieldId)
      }
      fieldLineage += (resTargetFieldName -> resSourceFieldSet)
    }
  }

  /** Recursively resolves a derived column id to the source columns it came from. */
  def findSourceField(fieldId: Long): mutable.Set[String] = {
    val resSourceFieldSet: mutable.Set[String] = mutable.Set.empty[String]
    if (fieldProcess.contains(fieldId)) {
      fieldProcess(fieldId).foreach { refId =>
        if (sourceTables.contains(refId)) {
          resSourceFieldSet += sourceTables(refId)
        } else {
          resSourceFieldSet ++= findSourceField(refId)
        }
      }
    }
    resSourceFieldSet
  }
}
测试代码
package com.roundyuan.sparkagent
import org.apache.spark.sql.SparkSession

/** Small driver: runs one SQL statement with the lineage listener attached. */
object TestHive {
  def main(args: Array[String]): Unit = {
    // Connect to the Hive warehouse through the local metastore.
    val spark = SparkSession.builder()
      .config("hive.metastore.uris", "thrift://localhost:9083")
      .appName("HiveCaseJob")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // From now on every query run on this session is parsed for column lineage.
    val lineageListener = new FlQueryExecutionListener()
    spark.listenerManager.register(lineageListener)

    //spark.sql("show databases").show()
    spark.sql("insert into test.test_orc select id,count(name) as num from test.test02 group by id").show()
    //val user_log = spark.sql("select * from dbtaobao.user_log").collect()
    //val test = user_log.map(row => "user_id"+row(0))
    //test.map(row => println(row))
  }
}
pom.xml文件
二、把项目打成jar包,并和spark集成4.0.0 com.roundyuan roundyuan-spark-lineage 1.0-SNAPSHOT 8 8 2.11.12 2.11 2.4.7 org.apache.spark spark-core_${scala.binary.version} ${spark.version} org.apache.spark spark-sql_${scala.binary.version} ${spark.version} org.codehaus.janino commons-compiler 3.0.16 org.apache.spark spark-hive_${scala.binary.version} ${spark.version} org.scala-lang scala-library ${scala.version} org.scala-tools maven-scala-plugin 2.15.2 compile testCompile
1.先打包 mvn clean install
2.将jar包放到 spark的jar下
3.在spark-default.conf 配置 spark.sql.execution.arrow.enabled true spark.sql.queryExecutionListeners com.roundyuan.sparkagent.FlQueryExecutionListener
4.执行sql会打印出 字段的依赖关系
<1>、resources中主要包含五个文件(core-site.xml、druid.properties、hdfs-site.xml、hive-site.xml、yarn-site.xml);
<2>、entity是我的实体类
package com.roundyuan.sparkagent.entity;
import lombok.Data;
import org.joda.time.DateTime;
/**
 * One column-level lineage record: which source column feeds which target
 * column, plus the time the record was last written.
 * Getters/setters/equals/hashCode/toString are generated by Lombok's {@code @Data}.
 */
@Data
public class TableLineageInfo{
// Source (upstream) side of the lineage edge: table, database, column.
private String SourceTableNme;
private String SourceDatabaseNme;
private String SourceColNme;
// Target (downstream) side of the lineage edge: table, database, column.
private String TargetTableNme;
private String TargetDatabaseNme;
private String TargetColNme;
// Timestamp of the last insert/update of this record (Joda-Time).
private DateTime updateDate;
}
<3>、utils工具类包
代码如下:
package com.roundyuan.sparkagent.utils;
import java.sql.*;

/**
 * Persists one column-level lineage record to PostgreSQL: inserts a new row,
 * or delegates to {@link DruidUpdate} when the target column already has one
 * (according to {@link DruidSelect#select}).
 */
public class DruidInsert {
    public static void insert (String SourceTableNme,String SourceDatabaseNme,String SourceColNme,String TargetTableNme,String TargetDatabaseNme,String TargetColNme){
        if (DruidSelect.select(TargetTableNme, TargetColNme)) {
            // A record already exists for this target column: update it in place.
            DruidUpdate.update(SourceTableNme, SourceDatabaseNme, SourceColNme, TargetTableNme, TargetDatabaseNme, TargetColNme);
            return;
        }
        Connection conn = null;
        PreparedStatement pst = null;
        // NOTE(review): connection settings are hard-coded; move them to configuration.
        String url = "隐私,需要保密";
        String user = "postgres";
        String password = "123456";
        try {
            Class.forName("org.postgresql.Driver");
            conn = DriverManager.getConnection(url, user, password);
            String sql = "insert into TableLineageInfo(SourceTableNme,SourceDatabaseNme,SourceColNme,TargetTableNme,TargetDatabaseNme,TargetColNme,updateDate) values(?,?,?,?,?,?,?)";
            pst = conn.prepareStatement(sql);
            pst.setString(1, SourceTableNme);
            pst.setString(2, SourceDatabaseNme);
            pst.setString(3, SourceColNme);
            pst.setString(4, TargetTableNme);
            pst.setString(5, TargetDatabaseNme);
            pst.setString(6, TargetColNme);
            pst.setTimestamp(7, new Timestamp(System.currentTimeMillis()));
            pst.executeUpdate();
        } catch (ClassNotFoundException e) {
            // BUG FIX: surface driver-loading failures instead of continuing with conn == null.
            e.printStackTrace();
        } catch (SQLException e) {
            // BUG FIX: the original silently swallowed SQL errors (printStackTrace was commented out).
            e.printStackTrace();
        } finally {
            // BUG FIX: close the statement before the connection and guard against null;
            // the original dereferenced conn/pst even when getConnection had failed (NPE).
            if (pst != null) {
                try { pst.close(); } catch (SQLException e) { e.printStackTrace(); }
            }
            if (conn != null) {
                try { conn.close(); } catch (SQLException e) { e.printStackTrace(); }
            }
        }
    }
}
package com.roundyuan.sparkagent.utils;
import java.sql.*;

/**
 * Checks whether a lineage record already exists in PostgreSQL for the given
 * target table / target column combination.
 */
public class DruidSelect {
    public static Boolean select (String TargetTableNme,String TargetColNme){
        Connection conn = null;
        PreparedStatement pst = null;
        ResultSet rs = null;
        boolean exists = false;
        // NOTE(review): connection settings are hard-coded; move them to configuration.
        String url = "隐私,需要保密";
        String user = "postgres";
        String password = "123456";
        try {
            Class.forName("org.postgresql.Driver");
            conn = DriverManager.getConnection(url, user, password);
            String sql = "select * from TableLineageInfo where TargetTableNme=? and TargetColNme=?";
            pst = conn.prepareStatement(sql);
            pst.setString(1, TargetTableNme);
            pst.setString(2, TargetColNme);
            // BUG FIX: the original called pst.execute(), ignored the result and
            // unconditionally returned false — so DruidInsert always inserted
            // duplicates and DruidUpdate was never reached. Report whether a
            // matching row actually exists.
            rs = pst.executeQuery();
            exists = rs.next();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // BUG FIX: close in reverse order of acquisition and null-guard each
            // resource (the original NPE'd when getConnection failed).
            if (rs != null) {
                try { rs.close(); } catch (SQLException e) { e.printStackTrace(); }
            }
            if (pst != null) {
                try { pst.close(); } catch (SQLException e) { e.printStackTrace(); }
            }
            if (conn != null) {
                try { conn.close(); } catch (SQLException e) { e.printStackTrace(); }
            }
        }
        return exists;
    }
}
package com.roundyuan.sparkagent.utils;
import java.sql.*;

/**
 * Overwrites the existing lineage record in PostgreSQL for a given target
 * table / target database with new source-column information.
 */
public class DruidUpdate {
    public static void update (String SourceTableNme,String SourceDatabaseNme,String SourceColNme,String TargetTableNme,String TargetDatabaseNme,String TargetColNme){
        Connection conn = null;
        PreparedStatement pst = null;
        // NOTE(review): connection settings are hard-coded; move them to configuration.
        String url = "隐私,需要保密";
        String user = "postgres";
        String password = "123456";
        // BUG FIX: the WHERE clause joined its conditions with a comma
        // ("WHERe TargetTableNme=?,TargetDatabaseNme=?"), which is not valid SQL;
        // conditions must be combined with AND.
        // NOTE(review): DruidSelect matches on TargetTableNme + TargetColNme while
        // this update matches on TargetTableNme + TargetDatabaseNme — confirm which
        // key is intended.
        String sql = "UPDATE TableLineageInfo SET SourceTableNme = ?,SourceDatabaseNme=?,SourceColNme=?,TargetTableNme=?,TargetDatabaseNme=?,TargetColNme=?,updateDate=? WHERE TargetTableNme=? AND TargetDatabaseNme=?";
        try {
            Class.forName("org.postgresql.Driver");
            conn = DriverManager.getConnection(url, user, password);
            pst = conn.prepareStatement(sql);
            pst.setString(1, SourceTableNme);
            pst.setString(2, SourceDatabaseNme);
            pst.setString(3, SourceColNme);
            pst.setString(4, TargetTableNme);
            pst.setString(5, TargetDatabaseNme);
            pst.setString(6, TargetColNme);
            pst.setTimestamp(7, new Timestamp(new java.util.Date().getTime()));
            pst.setString(8, TargetTableNme);
            pst.setString(9, TargetDatabaseNme);
            System.out.println("***************更新了*****************");
            pst.execute();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // BUG FIX: close the statement before the connection and null-guard both
            // (the original NPE'd when getConnection failed).
            if (pst != null) {
                try { pst.close(); } catch (SQLException e) { e.printStackTrace(); }
            }
            if (conn != null) {
                try { conn.close(); } catch (SQLException e) { e.printStackTrace(); }
            }
        }
    }
}
<4>、FlQueryExecutionListener 是我的核心类。其效果相当于手动执行 explain extended 拿到解析后的逻辑执行计划;该类通过遍历逻辑执行计划,提取出字段之间的血缘关系,并将结果写入数据库。
package com.roundyuan.sparkagent
import com.roundyuan.sparkagent.utils.DruidInsert
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
import org.apache.spark.sql.util.QueryExecutionListener
import scala.collection.mutable
import scala.collection.mutable.Map
import scala.util.control.NonFatal

/**
 * QueryExecutionListener that extracts column-level lineage from the analyzed
 * logical plan of each successful Spark SQL statement and persists every
 * (source column -> target column) pair to PostgreSQL via DruidInsert.
 *
 * FIXES vs the original text:
 *  - `Namedexpression` / `tablemeta` / `aggregateexpressions` corrected to the
 *    real Spark API names (the originals do not compile);
 *  - `split("\.")` corrected to `split("\\.")` — `"\."` is an invalid escape;
 *  - lineage rows are produced by iterating the map directly instead of parsing
 *    `fieldLineage.toString`, which broke whenever a target column had more
 *    than one source column;
 *  - removed the unused `conn` / `pst` null fields.
 */
class FlQueryExecutionListener extends QueryExecutionListener with Logging {
  // Target-table columns: exprId -> full column name (normally one target table).
  private val targetTable: Map[Long, String] = Map()
  // Source-table columns: exprId -> full column name (possibly several tables).
  private val sourceTables: Map[Long, String] = Map()
  // Alias transformations: output exprId -> exprIds of the columns it derives from.
  private val fieldProcess: Map[Long, mutable.Set[Long]] = Map()
  // Compressed lineage: target column -> set of source columns.
  private val fieldLineage: Map[String, mutable.Set[String]] = mutable.Map()
  // SQL statement type (insert-select, create-as, ...); reserved for future use.
  private var processType: String = ""

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {
    // Only successfully executed plans are parsed for lineage.
    lineageParser(qe)
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {
    // Nothing to record for failed executions.
  }

  // Shields Spark from any error thrown while parsing lineage (log instead of rethrow).
  private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
    try
      body
    catch {
      case NonFatal(e) =>
        val ctx = qe.sparkSession.sparkContext
        logError(s"Unexpected error occurred during lineage processing for application: ${ctx.appName} #${ctx.applicationId}", e)
    }
  }

  /** Entry point: parse the plan, link sources to targets, persist each pair. */
  def lineageParser(qe: QueryExecution): Unit = {
    logInfo("----------- field lineage parse start --------")
    val analyzedLogicPlan = qe.analyzed
    resolveLogicV2(analyzedLogicPlan)
    connectSourceFieldAndTargetField()
    // Persist every (source column -> target column) edge. Entries with an
    // empty source set, or names that are not "db.table.column", are skipped —
    // this replaces the original's fragile length-based check on toString.
    fieldLineage.foreach { case (targetField, sourceFields) =>
      val targetParts = targetField.toLowerCase.split("\\.")
      if (targetParts.length == 3) {
        val tdb = targetParts(0)
        val ttb = targetParts(1)
        val tfd = targetParts(2)
        sourceFields.foreach { sourceField =>
          val sourceParts = sourceField.toLowerCase.split("\\.")
          if (sourceParts.length == 3) {
            // DruidInsert expects (srcTable, srcDb, srcCol, tgtTable, tgtDb, tgtCol).
            DruidInsert.insert(sourceParts(1), sourceParts(0), sourceParts(2), ttb, tdb, tfd)
          }
        }
      }
    }
  }

  /**
   * Walks the plan: source columns come from LogicalRelation / HiveTableRelation,
   * target columns from InsertIntoHiveTable / InsertIntoHadoopFsRelationCommand /
   * CreateHiveTableAsSelectCommand, and transformations from Aggregate / Project.
   */
  def resolveLogicV2(plan: LogicalPlan): Unit = {
    plan.collect {
      case relation: LogicalRelation =>
        val catalogTable = relation.catalogTable.get
        val tableName = catalogTable.database + "." + catalogTable.identifier.table
        relation.output.foreach { columnAttribute =>
          sourceTables += (columnAttribute.exprId.id -> (tableName + "." + columnAttribute.name))
        }
      case relation: HiveTableRelation =>
        val tableName = relation.tableMeta.database + "." + relation.tableMeta.identifier.table
        relation.output.foreach { columnAttribute =>
          sourceTables += (columnAttribute.exprId.id -> (tableName + "." + columnAttribute.name))
        }
      case insert: InsertIntoHiveTable =>
        val tableName = insert.table.database + "." + insert.table.identifier.table
        extTargetTable(tableName, insert.query)
      case insert: InsertIntoHadoopFsRelationCommand =>
        val catalogTable: CatalogTable = insert.catalogTable.get
        val tableName = catalogTable.database + "." + catalogTable.identifier.table
        extTargetTable(tableName, insert.query)
      case create: CreateHiveTableAsSelectCommand =>
        val tableName = create.tableDesc.database + "." + create.tableDesc.identifier.table
        extTargetTable(tableName, create.query)
      case agg: Aggregate =>
        agg.aggregateExpressions.foreach(extFieldProcess)
      case project: Project =>
        project.projectList.foreach(extFieldProcess)
    }
  }

  /** Records which columns an alias expression is derived from. */
  def extFieldProcess(namedexpression: NamedExpression): Unit = {
    // Only aliases introduce a transformation; plain attributes map to themselves.
    if ("alias".equals(namedexpression.prettyName)) {
      val aliasId = namedexpression.exprId.id
      val referencedIds: mutable.Set[Long] = fieldProcess.getOrElse(aliasId, mutable.Set.empty)
      namedexpression.references.foreach(attribute => referencedIds += attribute.exprId.id)
      fieldProcess += (aliasId -> referencedIds)
    }
  }

  /** Records the output columns of an INSERT/CTAS query as target columns. */
  def extTargetTable(tableName: String, plan: LogicalPlan): Unit = {
    logInfo("start ext target table")
    plan.output.foreach { columnAttribute =>
      targetTable += (columnAttribute.exprId.id -> (tableName + "." + columnAttribute.name))
    }
  }

  /** Links every target column to its source columns, resolving alias chains. */
  def connectSourceFieldAndTargetField(): Unit = {
    targetTable.keySet.foreach { fieldId =>
      val resTargetFieldName = targetTable(fieldId)
      val resSourceFieldSet: mutable.Set[String] = mutable.Set.empty[String]
      if (sourceTables.contains(fieldId)) {
        // Directly selected column: source and target share the same exprId.
        resSourceFieldSet += sourceTables(fieldId)
      } else {
        // Derived column: follow the transformation chain back to its sources.
        resSourceFieldSet ++= findSourceField(fieldId)
      }
      fieldLineage += (resTargetFieldName -> resSourceFieldSet)
    }
  }

  /** Recursively resolves a derived column id to the source columns it came from. */
  def findSourceField(fieldId: Long): mutable.Set[String] = {
    val resSourceFieldSet: mutable.Set[String] = mutable.Set.empty[String]
    if (fieldProcess.contains(fieldId)) {
      fieldProcess(fieldId).foreach { refId =>
        if (sourceTables.contains(refId)) {
          resSourceFieldSet += sourceTables(refId)
        } else {
          resSourceFieldSet ++= findSourceField(refId)
        }
      }
    }
    resSourceFieldSet
  }
}
<5>、TestHive测试类
package com.roundyuan.sparkagent
import org.apache.spark.sql.SparkSession

/** Driver for the cluster setup: HA HDFS plus two Hive metastore instances. */
object TestHive {
  def main(args: Array[String]): Unit = {
    // Build a session wired to the HA Hive/HDFS cluster.
    val spark = SparkSession.builder()
      .config("hive.metastore.uris", "thrift://master1:9083,thrift://slave1:9083")
      .config("HiveDb", "ods.db,dcl.db,dw.db,mdp.db,ads.db")
      .config("HdfsPath", "hdfs://nameservice1:8020/user/hive/warehouse/")
      .config("Hdfs.dfs.nameservices", "nameservice1")
      .config("Hdfs.dfs.ha.namenodes.nameservice1", "namenode424,namenode442")
      .config("Hdfs.dfs.namenode.rpc-address.nameservice1.namenode424", "master1:8020")
      .config("Hdfs.namenode.rpc-address.nameservice1.namenode442", "master2:8020")
      .config("spark.sql.warehouse.dir", "hdfs://nameservice1/user/hive/warehouse")
      .appName("HiveCaseJob")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // Attach the lineage listener before running any SQL.
    val lineageListener = new FlQueryExecutionListener()
    spark.listenerManager.register(lineageListener)

    //spark.sql("show databases").show()
    spark.sql("drop table tmp.t7")
    spark.sql("create table tmp.t8 As select * from mdp.mdp_cusdm_social limit 10").show()
    // Earlier experiments, kept for reference:
    // val a_1 = spark.sql("select * from tmp.t3 limit 10").show()
    // val a_2 = spark.sql("select * from tmp.t4").toDF()
    // val a_3 = a_1.join(a_2,a_1("client_oneid")===a_2("client_oneid"),"left")
    // a_3.createOrReplaceTempView("tt")
    // spark.sql(
    //   """
    //     |create table tmp.tt_1 as select * from tt
    //   """.stripMargin).show()
    // val user_log = spark.sql("select * from mdp.mdp_cusdm_nature limit 100").collect()
    // val test = user_log.map(row => "user_id"+row(0))
    // test.map(row => println(row))
  }
}
<6>、pom.xml文件
四、最后的清洗结果4.0.0 com.roundyuan roundyuan-spark-lineage 1.0-SNAPSHOT 8 8 2.11.12 2.11 2.4.0 org.apache.spark spark-core_${scala.binary.version} ${spark.version} hadoop-client org.apache.hadoop org.apache.spark spark-sql_${scala.binary.version} ${spark.version} scala-reflect org.scala-lang org.codehaus.janino commons-compiler 3.0.16 org.apache.spark spark-hive_${scala.binary.version} ${spark.version} org.scala-lang scala-library ${scala.version} org.projectlombok lombok 1.16.10 com.alibaba druid 1.2.6 mysql mysql-connector-java 8.0.16 org.postgresql postgresql 42.1.1 log4j log4j 1.2.17 org.apache.maven.plugins maven-surefire-plugin 2.12.4 true org.apache.maven.plugins maven-compiler-plugin 3.8.1 1.8 1.8 UTF-8 net.alchim31.maven scala-maven-plugin 3.2.2 eclipse-add-source add-source scala-compile-first compile scala-test-compile-first testCompile org.apache.maven.plugins maven-shade-plugin 3.2.1 package shade true true dep *:* *:* meta-INF/*.SF meta-INF/*.DSA meta-INF/*.RSA reference.conf
我的清洗结果表主要有7个字段(SourceTableNme->源表名,SourceDatabaseNme->源库名,SourceColNme->源列名,TargetTableNme->目标表名,TargetDatabaseNme->目标库名,TargetColNme->目标列名,UpdateDate->更新时间)
创作不易,如果对你有用,请为我点赞、关注,后续将会推出更加优质的内容,你的点赞和关注将是我创作最大的动力之所在!!



