main:
package com;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson.JSONObject;
import com.process.MyProcessFunction;
import com.sink.LogSink;
import com.source.DataSourceGet;
import com.utils.PropertiesUtils;
/**
 * Entry point of the streaming job: parses {@code key=value} command-line
 * arguments, loads the properties file, and wires source -> process -> sink.
 */
public class IotMain {
    /**
     * Builds the pipeline and submits it under the job name {@code "iot test"}.
     *
     * @param args command-line arguments in {@code key=value} form; the
     *             {@code prop.path} entry is passed to
     *             {@link PropertiesUtils#initPropertiesByKey(String)}
     * @throws Exception if job construction or execution fails
     */
    public static void main(final String[] args) throws Exception {
        // 0. Obtain the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 5000 ms.
        env.enableCheckpointing(5000);
        // 1. Parse the arguments and load the configuration.
        final JSONObject argsObject = getFormArgs(args);
        PropertiesUtils.initPropertiesByKey(argsObject.getString("prop.path"));
        System.out.println(argsObject.toJSONString());
        // 2. Source -> process -> sink.
        // NOTE(review): the element type Tuple2<Integer, Integer> is inferred from
        // MyProcessFunction, which reads f0/f1 as Integer; confirm against FibonacciSource.
        DataStreamSource<Tuple2<Integer, Integer>> dataStreamSource = env
                .addSource(new DataSourceGet().getFibonacciSource(argsObject));
        SingleOutputStreamOperator<String> process = dataStreamSource.process(new MyProcessFunction());
        process.addSink(new LogSink());
        env.execute("iot test");
    }

    /**
     * Converts {@code key=value} arguments into a JSON object.
     *
     * <p>Each argument is split on the FIRST {@code '='} only, so a value that
     * itself contains {@code '='} (for example a URL with query parameters) is
     * kept intact instead of being dropped. Arguments without {@code '='} or
     * with an empty value are ignored.
     *
     * @param args raw command-line arguments
     * @return an object holding one entry per well-formed argument
     */
    private static JSONObject getFormArgs(final String[] args) {
        final JSONObject argsObject = new JSONObject();
        for (final String arg : args) {
            // Limit 2: everything after the first '=' belongs to the value.
            final String[] keyValue = arg.split("=", 2);
            if (keyValue.length == 2 && !keyValue[1].isEmpty()) {
                argsObject.put(keyValue[0], keyValue[1]);
            }
        }
        return argsObject;
    }
}
DataSourceGet:
package com.source;
import com.alibaba.fastjson.JSONObject;
import com.utils.KafkaProduceUtils;
/**
 * Factory for the job's source function.
 */
public class DataSourceGet {
    private final KafkaProduceUtils kafkaProduceUtils = new KafkaProduceUtils();

    /**
     * Sends the value {@code "123"} to the Kafka topic {@code "dh_test_dh"} and
     * then creates the source from the given arguments.
     *
     * @param argsObject parsed job arguments, handed to the {@code FibonacciSource} constructor
     * @return a new {@code FibonacciSource}
     */
    public FibonacciSource getFibonacciSource(final JSONObject argsObject) {
        // The Kafka message is sent while the job graph is being built, before the source exists.
        kafkaProduceUtils.producer("dh_test_dh", "123");
        return new FibonacciSource(argsObject);
    }
}
MyProcessFunction:
package com.process;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;
import com.utils.JedisUtil;
public class MyProcessFunction extends ProcessFunction, String> {
private static final long serialVersionUID = 665249935064432746L;
private static Logger logger = Logger.getLogger(MyProcessFunction.class);
private JedisUtil jedisUtil = new JedisUtil();
@Override
public void processElement(final Tuple2 data, //
final ProcessFunction, String>.Context context, //
final Collector out) throws Exception {
final Integer key = data.f0;
final Integer value = data.f1;
// 从redis获取规则
String redisValue = jedisUtil.hget("afcs", "event_47_post");
if (logger.isInfoEnabled()) {
logger.info("##### redisValue为:" + redisValue + "#####");
}
out.collect("key:" + key + ",value:" + value);
}
}
LogSink:
package com.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.log4j.Logger;
/**
 * Sink that writes every incoming string to the log at INFO level.
 */
public class LogSink extends RichSinkFunction<String> {
    private static final long serialVersionUID = -8897600652941199311L;
    private static final Logger logger = Logger.getLogger(LogSink.class);

    /** Delegates to the superclass; no extra initialization is done. */
    @Override
    public void open(final Configuration parameters) throws Exception {
        super.open(parameters);
    }

    /**
     * Logs the received value.
     *
     * @param value   the element to log
     * @param context sink context (unused)
     */
    @Override
    public void invoke(final String value, final Context context) throws Exception {
        // NOTE(review): the literal previously read "rn…rn", which looks like "\r\n"
        // with the backslashes lost; restored so each value is surrounded by line breaks.
        logger.info("\r\n##### value:" + value + " #####\r\n");
    }

    /** Delegates to the superclass; no extra cleanup is done. */
    @Override
    public void close() throws Exception {
        super.close();
    }
}
JedisUtil:
package com.utils;
import java.io.Serializable;
import java.util.Map;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.util.SafeEncoder;
/**
 * Thin helper around a lazily created, JVM-wide {@link JedisPool}. Every
 * operation borrows a connection, runs one command, and gives the connection
 * back in a {@code finally} block.
 */
public class JedisUtil implements Serializable {
    private static final long serialVersionUID = 2712033148155101965L;
    private static volatile JedisPool jedisPool = null;

    /**
     * Returns the shared pool, creating it on first use from the
     * {@code redis.*} / {@code redis2.database} properties.
     *
     * @return the shared pool
     * @throws NumberFormatException if a numeric property is missing or malformed
     */
    public JedisPool getJedisPool() {
        if (jedisPool == null) {
            // The pool is static, so the lock must be class-wide; locking on
            // "this" would let two instances each build their own pool.
            synchronized (JedisUtil.class) {
                if (jedisPool == null) {
                    String redisHost = PropertiesUtils.getPropertiesByKey("redis.host");
                    String redisPort = PropertiesUtils.getPropertiesByKey("redis.port");
                    String redisPass = PropertiesUtils.getPropertiesByKey("redis.password");
                    String database = PropertiesUtils.getPropertiesByKey("redis2.database");
                    String redisPoolMaxIdle = PropertiesUtils.getPropertiesByKey("redis.jedis.pool.max-idle");
                    JedisPoolConfig config = new JedisPoolConfig();
                    // Upper bound on the number of Jedis instances the pool hands out
                    // via getResource().
                    config.setMaxTotal(500);
                    // Upper bound on the number of idle Jedis instances kept in the pool.
                    config.setMaxIdle(Integer.parseInt(redisPoolMaxIdle));
                    // Longest time (ms) a borrow waits for a free instance before failing.
                    config.setMaxWaitMillis(1000 * 100);
                    // Validate an instance when it is borrowed so callers get a usable one.
                    config.setTestOnBorrow(true);
                    // Pool with password and database index; 10000 is the timeout argument.
                    jedisPool = new JedisPool(config, redisHost, Integer.parseInt(redisPort), 10000, redisPass,
                            Integer.parseInt(database));
                }
            }
        }
        return jedisPool;
    }

    /**
     * Borrows a connection from the pool. Callers must hand it back with
     * {@link #returnJedis(Jedis)}.
     *
     * @return a borrowed connection
     */
    public Jedis getJedis() {
        return getJedisPool().getResource();
    }

    /**
     * Closes the given connection; a {@code null} argument is ignored.
     *
     * @param jedis connection obtained from {@link #getJedis()}, may be {@code null}
     */
    public void returnJedis(final Jedis jedis) {
        if (null != jedis) {
            jedis.close();
        }
    }

    /**
     * Runs {@code DEL} on the given keys.
     *
     * @param keys keys to delete
     * @return the value reported by {@code Jedis.del}
     */
    public long del(final String... keys) {
        Jedis jedis = getJedis();
        try {
            return jedis.del(keys);
        } finally {
            returnJedis(jedis);
        }
    }

    /**
     * Runs {@code HDEL} for one field of a hash.
     *
     * @param key   hash key
     * @param fieid field name
     * @return the value reported by {@code Jedis.hdel}
     */
    public long hdel(final String key, final String fieid) {
        Jedis jedis = getJedis();
        try {
            return jedis.hdel(key, fieid);
        } finally {
            returnJedis(jedis);
        }
    }

    /**
     * Runs {@code HSET} for one field of a hash.
     *
     * @param key   hash key
     * @param fieid field name
     * @param value value to store
     * @return the value reported by {@code Jedis.hset}
     */
    public long hset(final String key, final String fieid, final String value) {
        Jedis jedis = getJedis();
        try {
            return jedis.hset(key, fieid, value);
        } finally {
            returnJedis(jedis);
        }
    }

    /**
     * Runs {@code HGET} for one field of a hash.
     *
     * @param key   hash key
     * @param fieid field name
     * @return the value returned by {@code Jedis.hget}
     */
    public String hget(final String key, final String fieid) {
        Jedis jedis = getJedis();
        try {
            return jedis.hget(key, fieid);
        } finally {
            returnJedis(jedis);
        }
    }

    /**
     * Runs {@code HGETALL} on a hash.
     *
     * @param key hash key
     * @return the field/value map returned by {@code Jedis.hgetAll}
     */
    public Map<String, String> hgetAll(final String key) {
        Jedis jedis = getJedis();
        try {
            return jedis.hgetAll(key);
        } finally {
            returnJedis(jedis);
        }
    }

    /**
     * Runs {@code GET} on a key.
     *
     * @param key key to read
     * @return the value returned by {@code Jedis.get}
     */
    public String get(final String key) {
        Jedis jedis = getJedis();
        try {
            return jedis.get(key);
        } finally {
            returnJedis(jedis);
        }
    }

    /**
     * Encodes both arguments with {@code SafeEncoder} and delegates to
     * {@link #set(byte[], byte[])}.
     *
     * @param key   key to write
     * @param value value to store
     * @return the status string returned by {@code Jedis.set}
     */
    public String set(final String key, final String value) {
        return set(SafeEncoder.encode(key), SafeEncoder.encode(value));
    }

    /**
     * Runs {@code SET} with binary key and value.
     *
     * @param key   key bytes
     * @param value value bytes
     * @return the status string returned by {@code Jedis.set}
     */
    public String set(final byte[] key, final byte[] value) {
        Jedis jedis = getJedis();
        try {
            return jedis.set(key, value);
        } finally {
            returnJedis(jedis);
        }
    }
}
KafkaProduceUtils:
package com.utils;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;
/**
 * Helper that lazily creates one JVM-wide {@link KafkaProducer} from the
 * {@code kafka.second.*} properties and sends string messages with it.
 */
public class KafkaProduceUtils implements Serializable {
    private static final long serialVersionUID = 3442009340575361234L;
    private static final Logger logger = Logger.getLogger(KafkaProduceUtils.class);
    private static volatile KafkaProducer<String, String> kafkaProducer;

    /**
     * Returns the shared producer, creating it on first use.
     *
     * @return the shared producer
     * @throws NumberFormatException if {@code kafka.second.producer.retries} is missing or malformed
     */
    public KafkaProducer<String, String> getkafkaProducer() {
        if (kafkaProducer == null) {
            // The producer is static, so the lock must be class-wide; locking on
            // "this" would let two instances each create their own producer.
            synchronized (KafkaProduceUtils.class) {
                if (kafkaProducer == null) {
                    logger.info("##### new KafkaProducer 开始#####");
                    String bootstrap = PropertiesUtils.getPropertiesByKey("kafka.second.bootstrap-servers");
                    String retry = PropertiesUtils.getPropertiesByKey("kafka.second.producer.retries");
                    String acks = PropertiesUtils.getPropertiesByKey("kafka.second.producer.acks");
                    final Map<String, Object> props = new HashMap<String, Object>();
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
                    props.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retry));
                    props.put(ProducerConfig.ACKS_CONFIG, acks);
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                    kafkaProducer = new KafkaProducer<String, String>(props);
                    logger.info("##### new KafkaProducer 结束#####");
                }
            }
        }
        return kafkaProducer;
    }

    /**
     * Hands a key-less record to the shared producer. The result returned by
     * {@code send} is not inspected.
     *
     * @param topic destination topic
     * @param value message payload
     */
    public void producer(final String topic, final String value) {
        logger.info("##### kafkaProduceUtils开始,topic=" + topic + ",value=" + value);
        // Send the message.
        getkafkaProducer().send(new ProducerRecord<String, String>(topic, value));
        logger.info("##### kafkaProduceUtils结束 #####");
    }
}
PropertiesUtils:
package com.utils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
/**
 * Loads {@code environment.properties} once per JVM and serves lookups from it.
 */
public class PropertiesUtils implements Serializable {
    private static final long serialVersionUID = 315601047419047701L;
    private static final Logger logger = Logger.getLogger(PropertiesUtils.class);
    private static volatile Properties properties = null;

    /**
     * (Re)loads the properties.
     *
     * <p>If {@code propPath} is non-empty and
     * {@code propPath/environment.properties} exists, that file is read.
     * Otherwise {@code environment.properties} is read from the classpath.
     * I/O failures are logged and leave the properties empty.
     *
     * @param propPath directory that may contain {@code environment.properties};
     *                 may be {@code null} or empty
     */
    public static void initPropertiesByKey(final String propPath) {
        final Properties loaded = new Properties();
        try {
            if (StringUtils.isNotEmpty(propPath)) {
                // Prefer the file under the supplied directory.
                String confPath = propPath + File.separator + "environment.properties";
                final File file = new File(confPath);
                if (file.exists()) {
                    logger.info("配置文件路径---->>" + confPath);
                    System.out.println("配置文件路径---->>" + confPath);
                    // try-with-resources so the file handle is always released.
                    try (InputStream in = new FileInputStream(file)) {
                        loaded.load(in);
                    }
                } else {
                    // File not found under the supplied directory: fall back to the classpath.
                    logger.info("项目路径confPath[" + confPath + "]下无连接信息,默认从classpath路径下加载properties文件");
                    System.out.println("项目路径confPath[" + confPath + "]下无连接信息,默认从classpath路径下加载properties文件");
                    loadFromClasspath(loaded);
                }
            } else {
                // No directory supplied: read from the classpath.
                logger.info("项目路径propPath没有传入,默认从classpath路径下加载properties文件");
                System.out.println("项目路径propPath没有传入,默认从classpath路径下加载properties文件");
                loadFromClasspath(loaded);
            }
        } catch (IOException e) {
            // Pass the throwable separately so the stack trace is logged too.
            logger.error("Failed to load environment.properties", e);
        }
        // Publish only after loading so readers never see a half-filled instance.
        properties = loaded;
    }

    /**
     * Looks up a property.
     *
     * <p>If the properties were never initialized in this JVM, they are loaded
     * from the classpath first instead of failing with a
     * {@code NullPointerException}.
     * NOTE(review): this matters when the caller runs in a different JVM from
     * the one that called {@link #initPropertiesByKey(String)} (for example a
     * remote worker); confirm the classpath copy is the desired fallback there.
     *
     * @param key property name
     * @return the property value, or an empty string if the key is absent
     */
    public static String getPropertiesByKey(final String key) {
        Properties current = properties;
        if (current == null) {
            synchronized (PropertiesUtils.class) {
                if (properties == null) {
                    initPropertiesByKey(null);
                }
                current = properties;
            }
        }
        String value = current.getProperty(key);
        return value == null ? "" : value;
    }

    /** Reads {@code environment.properties} from the classpath into {@code target}, if present. */
    private static void loadFromClasspath(final Properties target) throws IOException {
        try (InputStream in = PropertiesUtils.class.getClassLoader()
                .getResourceAsStream("environment.properties")) {
            if (in == null) {
                // getResourceAsStream returns null for a missing resource; load(null) would throw.
                logger.error("environment.properties was not found on the classpath");
                return;
            }
            target.load(in);
        }
    }
}
environment.properties:
#kafka
kafka.bootstrap-servers=10.11.11.11:9092
kafka.consumer.group-id=aa
# kafka (second cluster; read by KafkaProduceUtils)
kafka.second.bootstrap-servers=10.11.11.12:9092
kafka.second.producer.acks=1
kafka.second.producer.retries=0
# redis (read by JedisUtil)
redis.host=10.11.11.11
redis.port=6379
redis.password=111111
redis2.database=1
redis.jedis.pool.max-idle=8
#rabbitmq.useflag
rabbitmq.queue=test
rabbitmq.host=10.11.11.11
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.vhost=/vadmin_host
# jdbc
jdbc.url=jdbc:mysql://10.11.11.11:3306/aa_aa?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull
jdbc.username=root
jdbc.password=111111