Contents
I. Process
II. Code
I. Process
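Before extending Kettle's classes, first look at the scripting steps in Spoon, which let you process data with Java code, JavaScript, and similar.
Once the flow is configured, it looks as shown below.
Double-clicking main opens a method; this is the method that processes each row of data.
It comes with reference examples, including one that sets a value, as shown in the figure below.
After running it, the result is as shown in the figure below: the data has indeed been processed by the Java code.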
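II. Code
In code you can define a Java step that executes the corresponding source. That source is the same processRow method shown in the GUI tool, which means row data can be processed through processRow.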
import java.util.ArrayList;
import java.util.List;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;

private StepMeta getJavaStep(TransMeta transMeta, PluginRegistry registry) {
    UserDefinedJavaClassMeta javaClassMeta = new UserDefinedJavaClassMeta();
    // Java source that the step compiles and runs against each row
    String sourceCode = "public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {\n" +
            "  if (first) {\n" +
            "    first = false;\n" +
            "\n" +
            "  }\n" +
            "\n" +
            "  Object[] r = getRow();\n" +
            "\n" +
            "  if (r == null) {\n" +
            "    setOutputDone();\n" +
            "    return false;\n" +
            "  }\n" +
            "\n" +
            "  // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large\n" +
            "  // enough to handle any new fields you are creating in this step.\n" +
            "  r = createOutputRow(r, data.outputRowMeta.size());\n" +
            "\n" +
            "\tString name = get(Fields.In,\"name\").getString(r);\n" +
            "\tif(null!=name){\n" +
            "\t\tname = name+\"_new\";\n" +
            "\t}\n" +
            "\tget(Fields.Out,\"new_name\").setValue(r,name);\n" +
            "\n" +
            "  // Send the row on to the next step.\n" +
            "  putRow(data.outputRowMeta, r);\n" +
            "\n" +
            "  return true;\n" +
            "}";
    UserDefinedJavaClassDef classDef = new UserDefinedJavaClassDef(
            UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", sourceCode);
    List<UserDefinedJavaClassDef> classDefs = new ArrayList<>();
    classDefs.add(classDef);
    // Attach the Java source to the step
    javaClassMeta.replaceDefinitions(classDefs);
    List<UserDefinedJavaClassMeta.FieldInfo> fields = new ArrayList<>();
    // Declare the output field that the source writes to
    UserDefinedJavaClassMeta.FieldInfo fieldInfo =
            new UserDefinedJavaClassMeta.FieldInfo("new_name", ValueMetaInterface.TYPE_STRING, -1, -1);
    fields.add(fieldInfo);
    javaClassMeta.setFieldInfo(fields);
    String javaClassPluginId = registry.getPluginId(StepPluginType.class, javaClassMeta);
    StepMeta javaClassStep = new StepMeta(javaClassPluginId, "Java 代码", (StepMetaInterface) javaClassMeta);
    // Draw the step on the canvas at a fixed position
    javaClassStep.setDraw(true);
    javaClassStep.setLocation(560, 304);
    transMeta.addStep(javaClassStep);
    return javaClassStep;
}
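To make the per-row behavior of the embedded processRow easier to follow without starting Kettle, here is a minimal plain-Java sketch of the same loop. Everything in it is a stand-in written for illustration: ProcessRowSketch and its getRow, putRow, and run methods are hypothetical and are not Kettle classes or Kettle API. It also assumes the name field sits at index 0 of each row.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a step; none of these names come from Kettle.
final class ProcessRowSketch {
    private final Queue<Object[]> input = new ArrayDeque<>();
    private final List<Object[]> output = new ArrayList<>();

    ProcessRowSketch(List<Object[]> rows) {
        input.addAll(rows);
    }

    // Stand-in for getRow(): the next input row, or null when input is exhausted.
    private Object[] getRow() {
        return input.poll();
    }

    // Stand-in for putRow(): hands the row to the next step.
    private void putRow(Object[] row) {
        output.add(row);
    }

    // Handles one row per call; returns false once there is nothing left to read.
    boolean processRow() {
        Object[] r = getRow();
        if (r == null) {
            return false;
        }
        // Grow the row by one slot for new_name (the role createOutputRow plays).
        Object[] out = Arrays.copyOf(r, r.length + 1);
        String name = (String) r[0]; // assumption: "name" is the first field
        if (name != null) {
            name = name + "_new";
        }
        out[r.length] = name;
        putRow(out);
        return true;
    }

    // Calls processRow until it reports that it is done.
    static List<Object[]> run(List<Object[]> rows) {
        ProcessRowSketch step = new ProcessRowSketch(rows);
        while (step.processRow()) {
            // keep pulling rows
        }
        return step.output;
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {"alice"});
        rows.add(new Object[] {null});
        for (Object[] row : run(rows)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Running main prints [alice, alice_new] and then [null, null]: a non-null name gets the _new suffix in the added field, and a null name is passed through unchanged.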
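Let's start with TableInput and TableOutput, two of the most commonly used steps in Kettle.
Opening the source of both shows that each has a processRow method, which means the data handling for table input and table output both happens there.
So can we extend TableInput and TableOutput and override processRow to define our own handling?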
TableInput
public class TableInput extends BaseStep implements StepInterface {
    private TableInputMeta meta;
    private TableInputData data;

    public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
        // ... (abridged excerpt)
        // Run the table query
        boolean success = doQuery( parametersMeta, parameters );
        // ... (abridged excerpt)
        // Pass the row on to the next step
        putRow( data.rowMeta, data.thisrow );
        // ... (abridged excerpt)
    }

    private boolean doQuery( RowMetaInterface parametersMeta, Object[] parameters ) throws KettleDatabaseException {
        // ... (body omitted)
    }
}
TableOutput
public class TableOutput extends BaseStep implements StepInterface {
    private TableOutputMeta meta;
    private TableOutputData data;

    public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
        meta = (TableOutputMeta) smi;
        data = (TableOutputData) sdi;
        // Read the next incoming row
        Object[] r = getRow();
        // ... (abridged excerpt)
        try {
            // Write the row to the table
            Object[] outputRowData = writeToTable( getInputRowMeta(), r );
            if ( outputRowData != null ) {
                putRow( data.outputRowMeta, outputRowData ); // in case we want it go further...
            }
        } catch ( KettleException e ) {
            // ... (error handling omitted)
        }
        // ... (abridged excerpt)
    }
}
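The idea of extending a step and overriding processRow can be sketched in plain Java as follows. This is only an illustration of the pattern: StandInOutputStep, TrimmingOutputStep, and OverrideSketch are hypothetical names, the row is simplified to a single String, and nothing here is Kettle API. Whether the real TableInput and TableOutput expose enough to a subclass is a separate question that this sketch does not answer.

```java
import java.util.ArrayList;
import java.util.List;

// Driver for the sketch; all class names in this block are hypothetical.
final class OverrideSketch {
    // Feeds rows to the custom step until it reports that it is done.
    static List<String> demo(List<String> rows) {
        TrimmingOutputStep step = new TrimmingOutputStep();
        for (String row : rows) {
            if (!step.processRow(row)) {
                break;
            }
        }
        return step.written;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        rows.add("  alice ");
        rows.add("bob");
        System.out.println(demo(rows));
    }
}

// Stand-in for a built-in step such as TableOutput (not the real class).
class StandInOutputStep {
    protected final List<String> written = new ArrayList<>();

    // Stand-in for the write to the target table.
    protected void writeToTable(String row) {
        written.add(row);
    }

    // Default handling: write the row; a null row means there is no more input.
    public boolean processRow(String row) {
        if (row == null) {
            return false;
        }
        writeToTable(row);
        return true;
    }
}

// Custom step: runs its own logic first, then reuses the inherited handling.
class TrimmingOutputStep extends StandInOutputStep {
    @Override
    public boolean processRow(String row) {
        String cleaned = (row == null) ? null : row.trim();
        return super.processRow(cleaned);
    }
}
```

Running main prints [alice, bob]. Delegating to super.processRow keeps the inherited write path intact, so the subclass only has to add what is different.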
The following parts use these two steps as examples to write our own handling.



