栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 动态加载 Jar 包,实现自定义算子加载执行

Flink 动态加载 Jar 包,实现自定义算子加载执行

Flink 动态加载 Jar 包,实现自定义算子加载执行
  • 动态加载Jar
  • 动态Jar调用方式

动态加载Jar

对应的参数:

  • path:Jar 的存放路径。
  • env:Flink 的环境实例。
  • classPath:动态Jar的类路径。
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ygsoft.dataprocess.exception.LoadFunctionException;


public class LoadFunction {

	public static Class loadJar(final String path, final StreamExecutionEnvironment env, final String classPath) throws LoadFunctionException {
		
		final String newPath = "file:///" + path;
		
		try {
			loadJar(new URL(newPath));

			Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
			configuration.setAccessible(true);
			Configuration o = (Configuration) configuration.get(env);

			Field confData = Configuration.class.getDeclaredField("confData");
			confData.setAccessible(true);
			
			@SuppressWarnings("unchecked")
			Map temp = (Map) confData.get(o);
			List jarList = new ArrayList<>();
			jarList.add(newPath);
			temp.put("pipeline.classpaths", jarList);
			
			Class result = Class.forName(classPath);
			
			return result;
		} catch(Exception e) {
			throw new LoadFunctionException("加载动态jar包异常");
		}
	}

	// 动态加载Jar
	private static void loadJar(final URL jarUrl) {
		// 从URLClassLoader类加载器中获取类的addURL方法
		Method method = null;
		try {
			method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
		} catch (NoSuchMethodException | SecurityException e1) {
			e1.printStackTrace();
		}
		// 获取方法的访问权限
		boolean accessible = method.isAccessible();
		try {
			// 修改访问权限为可写
			if (accessible == false) {
				method.setAccessible(true);
			}
			// 获取系统类加载器
			URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
			// jar路径加入到系统url路径里
			method.invoke(classLoader, jarUrl);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			method.setAccessible(accessible);
		}
	}
}

动态Jar调用方式
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ygsoft.dataprocess.exception.LoadFunctionException;
import com.ygsoft.dataprocess.load.LoadFunction;
import com.ygsoft.dataprocess.sdk.domain.NodeParam;
import com.ygsoft.dataprocess.sdk.flow.IFlow;
import com.ygsoft.dataprocess.vo.flow.CustomCondition;


public class CustomConditionFlow implements IFlow {

	private StreamExecutionEnvironment env;
	
	public CustomConditionFlow(StreamExecutionEnvironment env) {
		this.env = env;
	}
	
	@Override
	public DataStream> execute(final DataStream> input, final NodeParam param) {
		final Object nodeParams = param.getParams();
		final CustomCondition condition = new CustomCondition(nodeParams);
		
		final String jarPath = condition.getJarPath();
		final String classPath = condition.getClassPath();
		final String methodName = condition.getMethod();
		
		try {
			Class c = LoadFunction.loadJar(jarPath, env, classPath);
			
			Object obj = c.newInstance();
			
			// 获取方法对象
			Method method = obj.getClass().getMethod(methodName, DataStream.class, param.getClass());
			
			// 执行方法
			@SuppressWarnings("unchecked")
			DataStream> result = (DataStream>) method.invoke(obj, input, param);
			
			return result;
		} catch (LoadFunctionException e) {
			e.printStackTrace();
			throw new RuntimeException(e.getMessage());
		} catch (InstantiationException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		} catch (IllegalAccessException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		} catch (NoSuchMethodException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		} catch (SecurityException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		} catch (InvocationTargetException e) {
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/696382.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号