栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark血缘字段解析

Spark血缘字段解析

Spark血缘字段解析
  • 一、废话不多说,直接上代码
  • 二、把项目打成jar包,并和spark集成
  • 三、本人对该项目的改造
      • 1.项目结构
  • 四、最后的清洗结果

一、废话不多说,直接上代码
package com.roundyuan.sparkagent

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.Namedexpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, DeserializeToObject, Distinct, Filter, Generate, GlobalLimit, InsertIntoTable, Join, LocalLimit, LocalRelation, LogicalPlan, MapElements, MapPartitions, Project, Repartition, RepartitionByexpression, SerializeFromObject, SubqueryAlias, TypedFilter, Union, Window}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
import org.apache.spark.sql.util.QueryExecutionListener

import scala.collection.mutable
import scala.collection.mutable.{ListBuffer, Map}
import scala.util.control.NonFatal



/**
 * Spark `QueryExecutionListener` that derives column-level lineage from the analyzed
 * logical plan of each successfully executed query and prints it to stdout.
 *
 * Columns are tracked by the numeric id of their `exprId`:
 *  - relations read by the query register their output columns as sources,
 *  - write commands register the output columns of their query as targets,
 *  - `Alias` expressions in `Project`/`Aggregate` nodes link a derived column to its inputs.
 *
 * NOTE(review): the lookup tables are plain mutable maps owned by the listener instance and
 * nothing here synchronises access — confirm callbacks are never delivered concurrently.
 */
class FlQueryExecutionListener extends QueryExecutionListener with Logging {
  // Imported locally so the class does not depend on the file-level import list.
  import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}

  // exprId -> "db.table.column" of the columns written by the query (normally one target table).
  private val targetTable: mutable.Map[Long, String] = mutable.Map.empty
  // exprId -> "db.table.column" of every column exposed by a relation the query reads.
  private val sourceTables: mutable.Map[Long, String] = mutable.Map.empty
  // exprId of an aliased (derived) column -> exprIds of the columns it is computed from.
  private val fieldProcess: mutable.Map[Long, mutable.Set[Long]] = mutable.Map.empty
  // Collapsed result: target column name -> names of the source-table columns feeding it.
  private val fieldLineage: mutable.Map[String, mutable.Set[String]] = mutable.Map.empty

  /** Runs the lineage extraction for a query that completed successfully. */
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {
    lineageParser(qe)
  }

  /** Failed queries produce no lineage; nothing is recorded. */
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {

  }

  /** Evaluates `body` and logs (instead of propagating) any non-fatal error it throws. */
  private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
    try
      body
    catch {
      case NonFatal(e) =>
        val ctx = qe.sparkSession.sparkContext
        logError(s"Unexpected error occurred during lineage processing for application: ${ctx.appName} #${ctx.applicationId}", e)
    }
  }

  /**
   * Extracts the lineage of one query from its analyzed plan and prints the resulting
   * target-column -> source-columns map when it is not empty.
   *
   * @param qe execution whose `analyzed` plan is inspected
   */
  def lineageParser(qe: QueryExecution): Unit = {
    logInfo("----------- field lineage parse start --------")
    // One listener instance sees every query; without this reset the columns collected
    // for earlier queries would be reported again for this one.
    resetState()
    resolveLogicV2(qe.analyzed)
    connectSourceFieldAndTargetField()
    // Plans without a write target (show, collect, DDL) yield nothing worth printing.
    if (fieldLineage.nonEmpty) println(fieldLineage)
  }

  /** Empties every lookup table so the next plan starts from a clean slate. */
  private def resetState(): Unit = {
    targetTable.clear()
    sourceTables.clear()
    fieldProcess.clear()
    fieldLineage.clear()
  }

  /** Formats a catalog table as "database.table". */
  private def qualifiedName(table: CatalogTable): String =
    s"${table.database}.${table.identifier.table}"

  /** Records "tableName.column" under the exprId of each given column. */
  private def registerColumns(into: mutable.Map[Long, String], tableName: String, columns: Seq[Attribute]): Unit =
    columns.foreach(column => into += (column.exprId.id -> s"$tableName.${column.name}"))

  /**
   * Walks every node of `plan` and fills the lookup tables: source columns from
   * `LogicalRelation`/`HiveTableRelation`, target columns from the insert / create-as-select
   * commands, and derivation edges from `Aggregate` and `Project`.
   *
   * @param plan analyzed logical plan of the query
   */
  def resolveLogicV2(plan: LogicalPlan): Unit = {
    plan.foreach {
      case relation: LogicalRelation =>
        // catalogTable is empty for relations that are not backed by a catalog table;
        // those have no table name to report, so they are skipped instead of failing.
        relation.catalogTable.foreach { table =>
          registerColumns(sourceTables, qualifiedName(table), relation.output)
        }
      case relation: HiveTableRelation =>
        registerColumns(sourceTables, qualifiedName(relation.tableMeta), relation.output)
      case insert: InsertIntoHiveTable =>
        extTargetTable(qualifiedName(insert.table), insert.query)
      case insert: InsertIntoHadoopFsRelationCommand =>
        // Writes without a catalog table have no target table name; skip them.
        insert.catalogTable.foreach(table => extTargetTable(qualifiedName(table), insert.query))
      case ctas: CreateHiveTableAsSelectCommand =>
        extTargetTable(qualifiedName(ctas.tableDesc), ctas.query)
      case aggregate: Aggregate =>
        aggregate.aggregateExpressions.foreach(extFieldProcess)
      case project: Project =>
        project.projectList.foreach(extFieldProcess)
      case _ => // no table or alias information is taken from other node types
    }
  }

  /**
   * Records which columns an aliased expression is computed from. Expressions that are
   * not an `Alias` are ignored.
   *
   * @param namedexpression one entry of a projection or aggregation list
   */
  def extFieldProcess(namedexpression: NamedExpression): Unit = namedexpression match {
    case alias: Alias =>
      val inputIds = fieldProcess.getOrElseUpdate(alias.exprId.id, mutable.Set.empty[Long])
      alias.references.foreach(attribute => inputIds += attribute.exprId.id)
    case _ => // not an alias: no derivation edge to record
  }

  /**
   * Registers the output columns of `plan` as columns of the target table.
   *
   * @param tableName "database.table" of the table being written
   * @param plan      query whose output is written to that table
   */
  def extTargetTable(tableName: String, plan: LogicalPlan): Unit = {
    logInfo("start ext target table")
    registerColumns(targetTable, tableName, plan.output)
  }

  /**
   * Builds `fieldLineage`: each target column is mapped either to the source column that
   * shares its exprId or, failing that, to the source columns reached through the
   * recorded alias derivations.
   */
  def connectSourceFieldAndTargetField(): Unit =
    targetTable.foreach { case (fieldId, targetName) =>
      val sources: mutable.Set[String] = sourceTables.get(fieldId) match {
        case Some(directSource) => mutable.Set(directSource)
        case None               => findSourceField(fieldId)
      }
      fieldLineage += (targetName -> sources)
    }

  /**
   * Follows the alias derivations starting at `fieldId` until source-table columns are reached.
   *
   * @param fieldId exprId of a derived column
   * @return names of all source columns the field depends on; empty when none is found
   */
  def findSourceField(fieldId: Long): mutable.Set[String] = {
    val resolved = mutable.Set.empty[String]
    // Guards against revisiting an id: keeps the walk finite and avoids repeated work
    // when several derivations share an input.
    val visited = mutable.Set.empty[Long]

    def walk(id: Long): Unit =
      if (visited.add(id)) {
        fieldProcess.getOrElse(id, mutable.Set.empty[Long]).foreach { inputId =>
          sourceTables.get(inputId) match {
            case Some(sourceName) => resolved += sourceName
            case None             => walk(inputId)
          }
        }
      }

    walk(fieldId)
    resolved
  }
}

测试代码

package com.roundyuan.sparkagent

import org.apache.spark.sql.SparkSession

/**
 * Manual smoke test: starts a local Hive-enabled session, registers the lineage
 * listener and runs one `insert ... select` so the listener has a plan to analyse.
 */
object TestHive {
  def main(args: Array[String]): Unit = {
    // Connect to the Hive warehouse through its metastore.
    val sparkSession = SparkSession.builder()
      .config("hive.metastore.uris", "thrift://localhost:9083")
      .appName("HiveCaseJob")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    try {
      val listenV3 = new FlQueryExecutionListener()
      sparkSession.listenerManager.register(listenV3)
      //sparkSession.sql("show databases").show()
      sparkSession.sql("insert into test.test_orc select id,count(name) as num from test.test02 group by id").show()
      //val user_log = sparkSession.sql("select * from dbtaobao.user_log").collect()
      //val test = user_log.map(row => "user_id"+row(0))
      //test.map(row => println(row))
    } finally {
      // Release the session even when the query fails.
      sparkSession.stop()
    }
  }
}

pom.xml文件



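<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.roundyuan</groupId>
    <artifactId>roundyuan-spark-lineage</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.11.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <spark.version>2.4.7</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>3.0.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>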

二、把项目打成jar包,并和spark集成

1.先打包 mvn clean install
2.将jar包放到 spark的jar下
3.在 spark-defaults.conf 中添加以下两行配置:
spark.sql.execution.arrow.enabled true
spark.sql.queryExecutionListeners com.roundyuan.sparkagent.FlQueryExecutionListener
4.执行sql会打印出 字段的依赖关系

三、本人对该项目的改造

1.项目结构


<1>、resources中主要包含五个文件(core-site.xml、druid.properties、hdfs-site.xml、hive-site.xml、yarn-site.xml);
<2>、entity是我的实体类

package com.roundyuan.sparkagent.entity;

import lombok.Data;
import org.joda.time.DateTime;

/**
 * One column-level lineage record: a source column and the target column it feeds.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class TableLineageInfo{
    /** Name of the source table. */
    private String SourceTableNme;
    /** Name of the database containing the source table. */
    private String SourceDatabaseNme;
    /** Name of the source column. */
    private String SourceColNme;
    /** Name of the target table. */
    private String TargetTableNme;
    /** Name of the database containing the target table. */
    private String TargetDatabaseNme;
    /** Name of the target column. */
    private String TargetColNme;
    /** Time the record was last updated. */
    private DateTime updateDate;
}    

<3>、utils工具类包
代码如下:

package com.roundyuan.sparkagent.utils;

import java.sql.*;

/**
 * Writes one column-level lineage record to the {@code TableLineageInfo} table,
 * delegating to {@link DruidUpdate} when {@link DruidSelect} reports that a record
 * for the target column already exists.
 */
public class DruidInsert {
    /**
     * Inserts the lineage record, or updates it when one already exists for the target column.
     * SQL failures are printed to stderr and not propagated.
     *
     * @param SourceTableNme    source table name
     * @param SourceDatabaseNme source database name
     * @param SourceColNme      source column name
     * @param TargetTableNme    target table name
     * @param TargetDatabaseNme target database name
     * @param TargetColNme      target column name
     */
    public static void insert (String SourceTableNme,String SourceDatabaseNme,String SourceColNme,String TargetTableNme,String TargetDatabaseNme,String TargetColNme){
        if (DruidSelect.select(TargetTableNme, TargetColNme)) {
            DruidUpdate.update(SourceTableNme,SourceDatabaseNme,SourceColNme,TargetTableNme,TargetDatabaseNme,TargetColNme);
            return;
        }

        // Placeholder: replace with the JDBC URL (host and port) of your own PostgreSQL instance.
        String url = "隐私,需要保密";
        // NOTE(review): credentials are hard-coded; consider loading them from configuration.
        String user = "postgres";
        String password = "123456";
        try {
            Class.forName("org.postgresql.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        String sql = "insert into TableLineageInfo(SourceTableNme,SourceDatabaseNme,SourceColNme,TargetTableNme,TargetDatabaseNme,TargetColNme,updateDate) values(?,?,?,?,?,?,?)";
        // try-with-resources closes the statement and then the connection, and is safe
        // when opening the connection itself fails (nothing to close in that case).
        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement pst = conn.prepareStatement(sql)) {
            pst.setString(1, SourceTableNme);
            pst.setString(2, SourceDatabaseNme);
            pst.setString(3, SourceColNme);
            pst.setString(4, TargetTableNme);
            pst.setString(5, TargetDatabaseNme);
            pst.setString(6, TargetColNme);
            pst.setTimestamp(7, new Timestamp(new java.util.Date().getTime()));
            pst.executeUpdate();
        } catch (SQLException e) {
            // Report the failure instead of dropping it silently; the caller is not interrupted.
            e.printStackTrace();
        }
    }
}
package com.roundyuan.sparkagent.utils;

import java.sql.*;

/**
 * Existence check against the {@code TableLineageInfo} table.
 */
public class DruidSelect {
    /**
     * Tells whether a lineage record already exists for the given target table and column.
     *
     * @param TargetTableNme target table name
     * @param TargetColNme   target column name
     * @return {@code true} when the query returns at least one row; {@code false} when it
     *         returns none or when the lookup fails (the failure is printed to stderr)
     */
    public static Boolean select (String TargetTableNme,String TargetColNme){
        // Placeholder: replace with the JDBC URL (host and port) of your own PostgreSQL instance.
        String url = "隐私,需要保密";
        // NOTE(review): credentials are hard-coded; consider loading them from configuration.
        String user = "postgres";
        String password = "123456";
        try {
            Class.forName("org.postgresql.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        String sql ="select * from TableLineageInfo where TargetTableNme=? and TargetColNme=?";
        // try-with-resources closes result set, statement and connection in that order and
        // does not dereference anything that was never opened.
        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement pst = conn.prepareStatement(sql)) {
            pst.setString(1,TargetTableNme);
            pst.setString(2,TargetColNme);
            try (ResultSet rs = pst.executeQuery()) {
                // A first row is enough to know the record exists.
                return rs.next();
            }
        } catch (SQLException e) {
            e.printStackTrace();
            return false;
        }
    }
}
package com.roundyuan.sparkagent.utils;

import java.sql.*;
/**
 * Updates an existing lineage record in the {@code TableLineageInfo} table.
 */
public class DruidUpdate {
    /**
     * Overwrites the source side and the timestamp of the record identified by the
     * target table, target database and target column. SQL failures are printed to
     * stderr and not propagated.
     *
     * @param SourceTableNme    source table name
     * @param SourceDatabaseNme source database name
     * @param SourceColNme      source column name
     * @param TargetTableNme    target table name
     * @param TargetDatabaseNme target database name
     * @param TargetColNme      target column name
     */
    public static void update (String SourceTableNme,String SourceDatabaseNme,String SourceColNme,String TargetTableNme,String TargetDatabaseNme,String TargetColNme){
        // Placeholder: replace with the JDBC URL (host and port) of your own PostgreSQL instance.
        String url = "隐私,需要保密";
        // NOTE(review): credentials are hard-coded; consider loading them from configuration.
        String user = "postgres";
        String password = "123456";
        try {
            Class.forName("org.postgresql.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        // Predicates must be joined with AND (a comma is a syntax error). The target column
        // is part of the key so that only the matching record is rewritten rather than
        // every row of the target table.
        String sql ="UPDATE TableLineageInfo SET SourceTableNme = ?,SourceDatabaseNme=?,SourceColNme=?,TargetTableNme=?,TargetDatabaseNme=?,TargetColNme=?,updateDate=? WHERE TargetTableNme=? AND TargetDatabaseNme=? AND TargetColNme=?";
        // try-with-resources closes the statement and then the connection, and is safe
        // when opening the connection itself fails.
        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement pst = conn.prepareStatement(sql)) {
            pst.setString(1, SourceTableNme);
            pst.setString(2, SourceDatabaseNme);
            pst.setString(3, SourceColNme);
            pst.setString(4, TargetTableNme);
            pst.setString(5, TargetDatabaseNme);
            pst.setString(6, TargetColNme);
            pst.setTimestamp(7, new Timestamp(new java.util.Date().getTime()));
            pst.setString(8, TargetTableNme);
            pst.setString(9, TargetDatabaseNme);
            pst.setString(10, TargetColNme);
            System.out.println("***************更新了*****************");
            pst.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

<4>、FlQueryExecutionListener 是核心类:它通过解析 Spark 的逻辑执行计划来获取字段间的血缘关系。如果想手动查看该执行计划,可以使用 explain extended。

package com.roundyuan.sparkagent

import com.roundyuan.sparkagent.utils.DruidInsert
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions.Namedexpression
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveTable}
import org.apache.spark.sql.util.QueryExecutionListener

import scala.collection.mutable
import scala.collection.mutable.Map
import scala.util.control.NonFatal



/**
 * Spark `QueryExecutionListener` that derives column-level lineage from the analyzed
 * logical plan of each successfully executed query and hands every
 * (source column, target column) pair to `DruidInsert.insert`.
 *
 * Columns are tracked by the numeric id of their `exprId`:
 *  - relations read by the query register their output columns as sources,
 *  - write commands register the output columns of their query as targets,
 *  - `Alias` expressions in `Project`/`Aggregate` nodes link a derived column to its inputs.
 *
 * NOTE(review): the lookup tables are plain mutable maps owned by the listener instance and
 * nothing here synchronises access — confirm callbacks are never delivered concurrently.
 */
class FlQueryExecutionListener extends QueryExecutionListener with Logging {
  // Imported locally so the class does not depend on the file-level import list.
  import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}

  // exprId -> "db.table.column" of the columns written by the query (normally one target table).
  private val targetTable: mutable.Map[Long, String] = mutable.Map.empty
  // exprId -> "db.table.column" of every column exposed by a relation the query reads.
  private val sourceTables: mutable.Map[Long, String] = mutable.Map.empty
  // exprId of an aliased (derived) column -> exprIds of the columns it is computed from.
  private val fieldProcess: mutable.Map[Long, mutable.Set[Long]] = mutable.Map.empty
  // Collapsed result: target column name -> names of the source-table columns feeding it.
  private val fieldLineage: mutable.Map[String, mutable.Set[String]] = mutable.Map.empty

  /** Runs the lineage extraction for a query that completed successfully. */
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = withErrorHandling(qe) {
    lineageParser(qe)
  }

  /** Failed queries produce no lineage; nothing is recorded. */
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = withErrorHandling(qe) {

  }

  /** Evaluates `body` and logs (instead of propagating) any non-fatal error it throws. */
  private def withErrorHandling(qe: QueryExecution)(body: => Unit): Unit = {
    try
      body
    catch {
      case NonFatal(e) =>
        val ctx = qe.sparkSession.sparkContext
        logError(s"Unexpected error occurred during lineage processing for application: ${ctx.appName} #${ctx.applicationId}", e)
    }
  }

  /**
   * Extracts the lineage of one query from its analyzed plan and stores it: for every
   * target column, each of its source columns is passed to `DruidInsert.insert` with
   * all names lower-cased. Target columns without any source are skipped.
   *
   * @param qe execution whose `analyzed` plan is inspected
   */
  def lineageParser(qe: QueryExecution): Unit = {
    logInfo("----------- field lineage parse start --------")
    // One listener instance sees every query; without this reset the columns collected
    // for earlier queries would be stored again for this one.
    resetState()
    resolveLogicV2(qe.analyzed)
    connectSourceFieldAndTargetField()
    // Work on the map entries directly rather than on their toString rendering, so a
    // column with several sources yields one well-formed record per source.
    fieldLineage.foreach { case (targetName, sourceNames) =>
      splitQualifiedName(targetName).foreach { case (targetDb, targetTbl, targetCol) =>
        sourceNames.foreach { sourceName =>
          splitQualifiedName(sourceName).foreach { case (sourceDb, sourceTbl, sourceCol) =>
            DruidInsert.insert(sourceTbl, sourceDb, sourceCol, targetTbl, targetDb, targetCol)
          }
        }
      }
    }
  }

  /**
   * Splits a lower-cased "db.table.column" name into its three parts.
   *
   * @return the (database, table, column) triple, or `None` when the name has fewer than
   *         three dot-separated parts; dots after the second one stay in the column part
   */
  private def splitQualifiedName(name: String): Option[(String, String, String)] =
    name.toLowerCase(java.util.Locale.ROOT).split("\\.", 3) match {
      case Array(database, table, column) => Some((database, table, column))
      case _                              => None
    }

  /** Empties every lookup table so the next plan starts from a clean slate. */
  private def resetState(): Unit = {
    targetTable.clear()
    sourceTables.clear()
    fieldProcess.clear()
    fieldLineage.clear()
  }

  /** Formats a catalog table as "database.table". */
  private def qualifiedName(table: CatalogTable): String =
    s"${table.database}.${table.identifier.table}"

  /** Records "tableName.column" under the exprId of each given column. */
  private def registerColumns(into: mutable.Map[Long, String], tableName: String, columns: Seq[Attribute]): Unit =
    columns.foreach(column => into += (column.exprId.id -> s"$tableName.${column.name}"))

  /**
   * Walks every node of `plan` and fills the lookup tables: source columns from
   * `LogicalRelation`/`HiveTableRelation`, target columns from the insert / create-as-select
   * commands, and derivation edges from `Aggregate` and `Project`.
   *
   * @param plan analyzed logical plan of the query
   */
  def resolveLogicV2(plan: LogicalPlan): Unit = {
    plan.foreach {
      case relation: LogicalRelation =>
        // catalogTable is empty for relations that are not backed by a catalog table;
        // those have no table name to report, so they are skipped instead of failing.
        relation.catalogTable.foreach { table =>
          registerColumns(sourceTables, qualifiedName(table), relation.output)
        }
      case relation: HiveTableRelation =>
        registerColumns(sourceTables, qualifiedName(relation.tableMeta), relation.output)
      case insert: InsertIntoHiveTable =>
        extTargetTable(qualifiedName(insert.table), insert.query)
      case insert: InsertIntoHadoopFsRelationCommand =>
        // Writes without a catalog table have no target table name; skip them.
        insert.catalogTable.foreach(table => extTargetTable(qualifiedName(table), insert.query))
      case ctas: CreateHiveTableAsSelectCommand =>
        extTargetTable(qualifiedName(ctas.tableDesc), ctas.query)
      case aggregate: Aggregate =>
        aggregate.aggregateExpressions.foreach(extFieldProcess)
      case project: Project =>
        project.projectList.foreach(extFieldProcess)
      case _ => // no table or alias information is taken from other node types
    }
  }

  /**
   * Records which columns an aliased expression is computed from. Expressions that are
   * not an `Alias` are ignored.
   *
   * @param namedexpression one entry of a projection or aggregation list
   */
  def extFieldProcess(namedexpression: NamedExpression): Unit = namedexpression match {
    case alias: Alias =>
      val inputIds = fieldProcess.getOrElseUpdate(alias.exprId.id, mutable.Set.empty[Long])
      alias.references.foreach(attribute => inputIds += attribute.exprId.id)
    case _ => // not an alias: no derivation edge to record
  }

  /**
   * Registers the output columns of `plan` as columns of the target table.
   *
   * @param tableName "database.table" of the table being written
   * @param plan      query whose output is written to that table
   */
  def extTargetTable(tableName: String, plan: LogicalPlan): Unit = {
    logInfo("start ext target table")
    registerColumns(targetTable, tableName, plan.output)
  }

  /**
   * Builds `fieldLineage`: each target column is mapped either to the source column that
   * shares its exprId or, failing that, to the source columns reached through the
   * recorded alias derivations.
   */
  def connectSourceFieldAndTargetField(): Unit =
    targetTable.foreach { case (fieldId, targetName) =>
      val sources: mutable.Set[String] = sourceTables.get(fieldId) match {
        case Some(directSource) => mutable.Set(directSource)
        case None               => findSourceField(fieldId)
      }
      fieldLineage += (targetName -> sources)
    }

  /**
   * Follows the alias derivations starting at `fieldId` until source-table columns are reached.
   *
   * @param fieldId exprId of a derived column
   * @return names of all source columns the field depends on; empty when none is found
   */
  def findSourceField(fieldId: Long): mutable.Set[String] = {
    val resolved = mutable.Set.empty[String]
    // Guards against revisiting an id: keeps the walk finite and avoids repeated work
    // when several derivations share an input.
    val visited = mutable.Set.empty[Long]

    def walk(id: Long): Unit =
      if (visited.add(id)) {
        fieldProcess.getOrElse(id, mutable.Set.empty[Long]).foreach { inputId =>
          sourceTables.get(inputId) match {
            case Some(sourceName) => resolved += sourceName
            case None             => walk(inputId)
          }
        }
      }

    walk(fieldId)
    resolved
  }
}

<5>、TestHive测试类

package com.roundyuan.sparkagent

import org.apache.spark.sql.SparkSession

/**
 * Manual smoke test: starts a local Hive-enabled session against the cluster metastore,
 * registers the lineage listener and runs a create-table-as-select so the listener has a
 * plan to analyse.
 */
object TestHive {
  def main(args: Array[String]): Unit = {
    // Connect to the Hive warehouse through its metastore.
    val sparkSession = SparkSession.builder()
      .config("hive.metastore.uris", "thrift://master1:9083,thrift://slave1:9083")
      .config("HiveDb","ods.db,dcl.db,dw.db,mdp.db,ads.db")
      .config("HdfsPath","hdfs://nameservice1:8020/user/hive/warehouse/")
      .config("Hdfs.dfs.nameservices","nameservice1")
      .config("Hdfs.dfs.ha.namenodes.nameservice1","namenode424,namenode442")
      .config("Hdfs.dfs.namenode.rpc-address.nameservice1.namenode424","master1:8020")
      // NOTE(review): the key previously lacked the "dfs." segment used by the namenode424
      // entry above; confirm against whatever reads these custom keys.
      .config("Hdfs.dfs.namenode.rpc-address.nameservice1.namenode442","master2:8020")
      .config("spark.sql.warehouse.dir","hdfs://nameservice1/user/hive/warehouse")
      .appName("HiveCaseJob")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    try {
      val listenV3 = new FlQueryExecutionListener()
      sparkSession.listenerManager.register(listenV3)
      //sparkSession.sql("show databases").show()
      // "if exists" keeps a missing table from aborting the run.
      sparkSession.sql("drop table if exists tmp.t7")
      // Drop the table created below as well, so the test can be run repeatedly.
      sparkSession.sql("drop table if exists tmp.t8")
      sparkSession.sql("create table tmp.t8 As select * from mdp.mdp_cusdm_social limit 10").show()
//    val a_1 = sparkSession.sql("select * from tmp.t3 limit 10").show()
//    val a_2 = sparkSession.sql("select * from tmp.t4").toDF()
//    val a_3 = a_1.join(a_2,a_1("client_oneid")===a_2("client_oneid"),"left")
//    a_3.createOrReplaceTempView("tt")
//    sparkSession.sql(
//      """
//        |create table tmp.tt_1 as select * from tt
//      """.stripMargin).show()

//    val user_log = sparkSession.sql("select * from mdp.mdp_cusdm_nature limit 100").collect()
//    val test = user_log.map(row => "user_id"+row(0))
//    test.map(row => println(row))
    } finally {
      // Release the session even when a statement fails.
      sparkSession.stop()
    }
  }
}

<6>、pom.xml文件



    4.0.0

    com.roundyuan
    roundyuan-spark-lineage
    1.0-SNAPSHOT

    
        8
        8
        2.11.12
        2.11
        2.4.0
    

    
        
            org.apache.spark
            spark-core_${scala.binary.version}
            ${spark.version}
            
                
                    hadoop-client
                    org.apache.hadoop
                
            
        
        
            org.apache.spark
            spark-sql_${scala.binary.version}
            ${spark.version}
            
                
                    scala-reflect
                    org.scala-lang
                
            
        
        
            org.codehaus.janino
            commons-compiler
            3.0.16

        
        
            org.apache.spark
            spark-hive_${scala.binary.version}
            ${spark.version}

        
        
            org.scala-lang
            scala-library
            ${scala.version}
        
        
            org.projectlombok
            lombok
            1.16.10
        
        
        
            com.alibaba
            druid
            1.2.6
        
        
        
            mysql
            mysql-connector-java
            8.0.16
        
        
            org.postgresql
            postgresql
            42.1.1
        
        
        
            
            
            
        
        
            log4j
            log4j
            1.2.17
        
    
    
        
            
                org.apache.maven.plugins
                maven-surefire-plugin
                2.12.4
                
                    true
                
            
            
            
                org.apache.maven.plugins
                maven-compiler-plugin
                3.8.1
                
                    1.8
                    1.8
                    UTF-8
                
            
            
            
                net.alchim31.maven
                scala-maven-plugin
                3.2.2
                
                    
                        eclipse-add-source
                        
                            add-source
                        
                    
                    
                        scala-compile-first
                        
                            compile
                        
                    
                    
                        scala-test-compile-first
                        
                            testCompile
                        
                    
                
            
            
            
                org.apache.maven.plugins
                maven-shade-plugin
                3.2.1
                
                    
                        package
                        
                            shade
                        
                        
                            
                            true
                            true
                            
                            dep
                            
                                
                                    
                                    *:*
                                
                            
                            
                                
                                    *:*
                                    
                                        META-INF/*.SF
                                        META-INF/*.DSA
                                        META-INF/*.RSA
                                    
                                
                            
                            
                                
                                    reference.conf
                                
                            
                        
                    
                
            
        
    

四、最后的清洗结果


我的清洗结果表主要有7个字段(SourceTableNme->源表名,SourceDatabaseNme->源库名,SourceColNme->源列名,TargetTableNme->目标表名,TargetDatabaseNme->目标库名,TargetColNme->目标列名,UpdateDate->更新时间)

创作不易,如果对你有用,请为我点赞、关注,后续将会推出更加优质的内容,你的点赞和关注将是我创作最大的动力之所在!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/450106.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号