栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

PDI(kettle) Java代码组件应用案例

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

PDI(kettle) Java代码组件应用案例

1 概述

Java代码步骤,位于Kettle转换的核心对象/脚本类别中,属于典型的需要编程基础才能掌控的步骤类型。而Java代码步骤,适用于熟悉Java语言的开发人员,用好这个步骤,需要对类、接口、多线程等语言相关知识有所掌握,并且需要对Kettle的基础框架有所理解。

2 主要方法说明 2.1 初始化

PDI转换在执行前,会有一个各步骤的初始化动作,为步骤执行前的准备工作创造机会。为提高初始化的性能,Kettle为每个步骤启用一个初始化线程,从而并行完成所有步骤的初始化。初始化的主要内容就是调用一次步骤的以下方法:

public boolean init( StepmetaInterface meta, StepDataInterfacedata)

此方法包含两个参数。其中,meta为元数据,data为数据。如果返回true,那么代表初始化成功,否则代表初始化失败。任何一个步骤初始化失败,都会导致整个转换停止执行。

2.2 执行

执行阶段是每一个步骤实现特定价值的时候。为提高效率,Kettle为每一个步骤单独启动一个线程来执行任务。线程会一直紧密循环调用步骤的processRow()方法,该方法是一步的心脏,将持续到返回 false 为止。
方法申明如下:

public boolean processRow( StepmetaInterface meta,StepDataInterfacedata ) throws KettleException;

从输入行集中取数据可以调用getRow方法。如果getRow方法返回值不为null,则步骤应将该行数据进行处理,并调用putRow方法将处理结果存入输出行集,然后返回true,以继续为下一行输入数据处理提供机会。如果getRow方法返回null,代表输入行集已经处理完毕,这时可以调用setOutputDone,标识本步骤执行完毕,并返回false,以结束本工作线程的执行。

2.3 释放资源

不管工作线程是正常执行完毕还是异常执行完毕,最终会调用dispose方法。该方法声明如下:

public void dispose( StepmetaInterface meta, StepDataInterfacedata);

一般情况下重写processRow方法即可满足需求,如果用到了一些重量级的资源,最好在init方法中初始化,并在dispose方法中释放。

3 案例分享 3.1 使用Java获取数据库中表的数据


第一个组件:增加常量,我的这里一个表面,其他的都写在了Java代码里面

第二个组件:Java代码

import java.sql.*;
long c = 0l;
String a = "";
String b = "";
public boolean processRow(StepmetaInterface smi, StepDataInterface sdi) throws KettleException
{
	Object[] r = getRow();
	if (r == null) {
		setOutputDone();
		return false;
	}
	if (first)
	{
		first = false;
	}
	r = createOutputRow(r, data.outputRowmeta.size());
	//数据库信息,简单做个例子直接写死了
	String urlString = "jdbc:oracle:thin:@ip:port:sid";
	String driverName = "oracle.jdbc.driver.OracleDriver";
	String usernameString = "username";
	String passwString = "pwd";
	PreparedStatement pst = null;
	ResultSet rs = null;

	String table = get(Fields.In, "table").getString(r);
	try {
		Class.forName(driverName);
		Connection dbconConnection  = DriverManager.getConnection(urlString,usernameString,passwString);
		String sqlString = "SELECt * FROM " + table;
		pst = dbconConnection.prepareStatement(sqlString);
		rs = pst.executeQuery();
		while (rs.next()) {
			String aa = rs.getString("a");
			String bb = rs.getString("b");
			get(Fields.Out, "a").setValue(r, aa);
			get(Fields.Out, "b").setValue(r, bb);
			get(Fields.Out, "c").setValue(r, ++ c);
			putRow(data.outputRowmeta, r);
		}

	} catch (Exception e) {
		e.printStackTrace();
	} finally {
		if(rs != null) {
				try {
					rs.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
	}
	if(pst != null) {
				try {
					pst.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
	if(myConnection != null ) {
		try {
			myConnection.close();
		} catch (SQLException e) {
			e.printStackTrace();
		}
	}
	return true;
}

第三个组件:表输出, 输出到一个数据库表,表结构可以通过获取字段直接拿到

大致内容就是这样。

3.2 Java代码组件内使用外部jar
有时候代码比较多, 逻辑比较复杂,直接在PDI里面写的话很不方便,而且调试也麻烦。这时候我们可以直接在Java开发工具里面写好,导出成一个jar,然后放到PDI的安装目录的lib下面,就可以直接引用了。

Java组件内代码

import com.controller.CheckIdcard;
import com.dao.ErrorResult;

public boolean processRow(StepmetaInterface smi, StepDataInterface sdi) throws KettleException {
  if (first) {
    first = false;
  }
  Object[] r = getRow();
  if (r == null) {
    setOutputDone();
    return false;
  }
  r = createOutputRow(r, data.outputRowmeta.size());
String idCard = get(Fields.In, "a").getString(r);

ErrorResult checkInfo = CheckIdcard.isNormal(idCard, "a");
if("身份证校验合格!".equals(checkInfo.getErrorDescribe())) {
	putRow(data.outputRowmeta, r);
} else {
	//错误处理的代码,Java步骤连接下一步骤的时候需要选择错误处理时需要用到
	putError(data.outputRowmeta, r, checkInfo.getErrorNum(), checkInfo.getErrorDescribe(), checkInfo.getErrorColumn(), checkInfo.getErrorCode());
}
  return true;
}

Java外部代码:
ErrorResult.java

package com.dao;

public class ErrorResult {
	//错误列
	private String errorColumn;
	//错误代码
	private String errorCode;
	//错误行数
	private int errorNum;
	//错误描述
	private String errorDescribe;
	public ErrorResult() {};
	public ErrorResult(String errorColumn, String errorCode, int errorNum, String errorDescribe) {
		this.errorColumn = errorColumn;
		this.errorCode = errorCode;
		this.errorNum = errorNum;
		this.errorDescribe = errorDescribe;
	}
	public String toString () {
		return "errorColumn:" + this.errorColumn + ",errorCode:" + this.errorCode + ",errorNum:" + this.errorNum + ",errorDescribe:" + this.errorDescribe;
	}
	public String getErrorColumn() {
		return errorColumn;
	}
	public void setErrorColumn(String errorColumn) {
		this.errorColumn = errorColumn;
	}
	public String getErrorCode() {
		return errorCode;
	}
	public void setErrorCode(String errorCode) {
		this.errorCode = errorCode;
	}
	public int getErrorNum() {
		return errorNum;
	}
	public void setErrorNum(int errorNum) {
		this.errorNum = errorNum;
	}
	public String getErrorDescribe() {
		return errorDescribe;
	}
	public void setErrorDescribe(String errorDescribe) {
		this.errorDescribe = errorDescribe;
	}
}

CheckIdcard.java

package com.controller;

import com.dao.ErrorResult;


public class CheckIdcard {
	public static ErrorResult isNormal(String idCard, String column) {
		int errorNum = 0;
		ErrorResult er = new ErrorResult();
		if (idCard == null || "".equals(idCard)) {
			er.setErrorCode("NOT_ALLOW_IDCARD");
			er.setErrorColumn(column);
			er.setErrorDescribe("身份证号码为空!");
			er.setErrorNum(++ errorNum);
			return er;
		}
		 // 定义判别用户身份证号的正则表达式(15位或者18位,最后一位可以为字母)
		String reg = "(^[1-9]\d{5}(19|([23]\d))\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\d{3}[0-9Xx]$)|(^[1-9]\d{5}\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\d{2}$)";
		//判断是否符合正则校验
        boolean isOK = idCard.matches(reg);
        if(isOK) {
        	if (idCard.length() == 18) {
                try {
                    char[] charArray = idCard.toCharArray();
                    // 前十七位加权因子
                    int[] idCardWi = {7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2};
                    // 这是除以11后,可能产生的11位余数对应的验证码
                    String[] idCardY = {"1", "0", "X", "9", "8", "7", "6", "5", "4", "3", "2"};
                    int sum = 0;
                    for (int i = 0; i < idCardWi.length; i++) {
                        int current = Integer.parseInt(String.valueOf(charArray[i]));
                        int count = current * idCardWi[i];
                        sum += count;
                    }
                    char idCardLast = charArray[17];
                    int idCardMod = sum % 11;
                    if (idCardY[idCardMod].toUpperCase().equals(String.valueOf(idCardLast).toUpperCase())) {
                    	er.setErrorCode("");
            			er.setErrorColumn(column);
            			er.setErrorDescribe("身份证校验合格!");
            			er.setErrorNum(errorNum);
            			return er;
                    } else {
//                        System.out.println("身份证最后一位:" + String.valueOf(idCardLast).toUpperCase()
//                                + "错误,正确的应该是:" + idCardY[idCardMod].toUpperCase());
                    	er.setErrorCode("NOT_ALLOW_IDCARD");
            			er.setErrorColumn(column);
            			er.setErrorDescribe("身份证号码校验失败,请检查!");
            			er.setErrorNum(++ errorNum);
            			return er;
                    }
 
                } catch (Exception e) {
                    e.printStackTrace();
                    er.setErrorCode("NOT_ALLOW_IDCARD");
        			er.setErrorColumn(column);
        			er.setErrorDescribe("身份证号码校验失败,请检查!");
        			er.setErrorNum(++ errorNum);
        			return er;
                }
            } else if(idCard.length() == 15) {
            	er.setErrorCode("");
    			er.setErrorColumn(column);
    			er.setErrorDescribe("身份证校验合格!");
    			er.setErrorNum(errorNum);
    			return er;
            } else {
            	er.setErrorCode("NOT_ALLOW_IDCARD");
    			er.setErrorColumn(column);
    			er.setErrorDescribe("身份证号码校验失败,请检查!");
    			er.setErrorNum(++ errorNum);
    			return er;
            }

        } else {
        	er.setErrorCode("NOT_ALLOW_IDCARD");
			er.setErrorColumn(column);
			er.setErrorDescribe("身份证号码校验失败,请检查!");
			er.setErrorNum(++ errorNum);
			return er;
        }
		
	}
	
	public static void main(String[] args) {
		ErrorResult er = isNormal("36243088052422", "c");
		System.out.println(er.toString());
	}

}

3.3 Java组件调用本转换已有的数据库连接

Java代码组件内容大致如下:

import java.sql.*;
import org.pentaho.di.core.database.*;

public boolean processRow(StepmetaInterface smi, StepDataInterface sdi) throws KettleException
{
	Object[] r = getRow();
	if (r == null) {
		setOutputDone();
		return false;
	}

	//获取数据库名和表名
	String table = get(Fields.In, "table").getString(r);
	String dbName = get(Fields.In, "dbName").getString(r);
	//数据库连接
	Database database=null;
	Databasemeta databasemeta=null;
	try {
		databasemeta = getTransmeta().findDatabase(dbName);
		if (databasemeta == null) {
			logError("A connection with name "+dbName+" could not be found!");
			setErrors(1);
			return false;
		}
		database = new Database(getTrans(), databasemeta);
		database.connect();
	} catch(Exception e) {
		logError("Connecting to database "+dbName+" failed.", e);
		setErrors(1);
		return false;
	}

	//查询表数据
	String sql="select a, b from " + table;
	ResultSet resultSet;
	try {
		resultSet = database.openQuery(sql);
		Object[] idxRow = database.getRow(resultSet);
		RowmetaInterface idxRowmeta =null;
		if(idxRow!=null){
			idxRowmeta=database.getReturnRowmeta();
		}
		int i=0;
		while(idxRow!=null){
			r = createOutputRow(r, data.outputRowmeta.size());
			int index = getInputRowmeta().size();
			r[index++] = idxRowmeta.getString(idxRow, "a", null);
			r[index++] = idxRowmeta.getString(idxRow, "b", null);
			putRow(data.outputRowmeta, r);
			idxRow = database.getRow(resultSet);
			i++;
		}
	} catch(Exception e) {
		throw new KettleException(e);
	} 
	//释放连接, 这个应该可以不写, 每个组件最后都会自动执行dispose释放资源
	if (database!=null) {
		database.disconnect();
		database.closeQuery(resultSet);
	}
	return true;
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/644580.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号