// Launch command: flink run -d -c class.main -p 3 xxx.jar xxx.properties
/**
 * Job entry point: loads the configuration, prepares the stream execution environment,
 * and calls {@code execute} on it.
 *
 * @param args program arguments, passed unchanged to {@code ConfigInitialization.initConfig}
 * @throws Exception anything raised while configuring or executing the job
 */
public static void main(String[] args) throws Exception {
    // Parse the program arguments into a configuration object.
    final org.apache.commons.configuration2.Configuration jobConf =
            ConfigInitialization.initConfig(args);

    final StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // The parallelism is read from the loaded configuration and applied to the environment.
    final int parallelism = jobConf.getInt(MsmqConfigurationConsts.Flink_PARALLELISM);
    streamEnv.setParallelism(parallelism);

    // Register the global job parameters.
    streamEnv.getConfig()
            .setGlobalJobParameters(StaticFunctionCreator.clickhouseCommonConfig(jobConf));

    // NOTE(review): no sources or sinks are registered in the visible code before this call;
    // presumably the pipeline definition was elided from this snippet.
    streamEnv.execute("flink test");
}
import java.util.Map;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.ConfigurationMap;
import org.apache.commons.configuration2.FilebasedConfiguration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.builder.FilebasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.fluent.Parameters;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Loads the job's properties-file configuration.
 *
 * <p>Static utility holder; the private constructor prevents instantiation.
 */
public final class ConfigInitialization {
    private static final Logger logger = LoggerFactory.getLogger(ConfigInitialization.class);

    /** File name used when no path is supplied in the program arguments. */
    private static final String DEFAULT_CONFIG_FILE = "develop_config.properties";

    private ConfigInitialization() {}

    /**
     * Builds a {@link Configuration} from a properties file.
     *
     * @param args program arguments; when non-empty, {@code args[0]} is used as the file name,
     *     otherwise {@code develop_config.properties} is used. {@code null} is treated like an
     *     empty array.
     * @return the loaded configuration. If loading fails, the error is logged and the JVM is
     *     terminated with {@code System.exit(-1)}.
     */
    public static Configuration initConfig(String[] args) {
        String filePath = DEFAULT_CONFIG_FILE;
        // Null-safe check so a programmatic caller passing null falls back to the default file
        // instead of failing with a NullPointerException.
        if (args != null && args.length > 0) {
            logger.info("The specified input parameter is {}", args[0]);
            filePath = args[0];
        }

        // Parameterized (not raw) so getConfiguration() is typed as PropertiesConfiguration.
        // NOTE(review): Apache Commons Configuration 2 ships this builder as
        // "FileBasedConfigurationBuilder" (capital 'B'); the spelling used here follows this
        // file's import statement -- confirm that it resolves.
        FilebasedConfigurationBuilder<PropertiesConfiguration> builder =
                new FilebasedConfigurationBuilder<PropertiesConfiguration>(PropertiesConfiguration.class)
                        .configure(new Parameters().properties().setFileName(filePath));

        try {
            return builder.getConfiguration();
        } catch (ConfigurationException e) {
            logger.error("Could not load configuration file|path:{}", filePath, e);
            // The job cannot run without its configuration, so the process is stopped here.
            System.exit(-1);
            // Only reached if System.exit returns; required to satisfy the compiler.
            return null;
        }
    }
}



