- 动态加载Jar
- 动态Jar调用方式
对应的参数:
- path:Jar 的存放路径。
- env:Flink 的环境实例。
- classPath:动态 Jar 中待加载类的全限定类名(会传给 Class.forName)。
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ygsoft.dataprocess.exception.LoadFunctionException;
public class LoadFunction {
public static Class> loadJar(final String path, final StreamExecutionEnvironment env, final String classPath) throws LoadFunctionException {
final String newPath = "file:///" + path;
try {
loadJar(new URL(newPath));
Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration) configuration.get(env);
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
@SuppressWarnings("unchecked")
Map temp = (Map) confData.get(o);
List jarList = new ArrayList<>();
jarList.add(newPath);
temp.put("pipeline.classpaths", jarList);
Class> result = Class.forName(classPath);
return result;
} catch(Exception e) {
throw new LoadFunctionException("加载动态jar包异常");
}
}
// 动态加载Jar
private static void loadJar(final URL jarUrl) {
// 从URLClassLoader类加载器中获取类的addURL方法
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// 获取方法的访问权限
boolean accessible = method.isAccessible();
try {
// 修改访问权限为可写
if (accessible == false) {
method.setAccessible(true);
}
// 获取系统类加载器
URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
// jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}
}
动态Jar调用方式
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ygsoft.dataprocess.exception.LoadFunctionException;
import com.ygsoft.dataprocess.load.LoadFunction;
import com.ygsoft.dataprocess.sdk.domain.NodeParam;
import com.ygsoft.dataprocess.sdk.flow.IFlow;
import com.ygsoft.dataprocess.vo.flow.CustomCondition;
public class CustomConditionFlow implements IFlow {
private StreamExecutionEnvironment env;
public CustomConditionFlow(StreamExecutionEnvironment env) {
this.env = env;
}
@Override
public DataStream


