
Flink demo

IotMain:
package com;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.alibaba.fastjson.JSONObject;
import com.process.MyProcessFunction;
import com.sink.LogSink;
import com.source.DataSourceGet;
import com.utils.PropertiesUtils;


public class IotMain {

	
	public static void main(final String[] args) throws Exception {
		// 0. Get the streaming execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Checkpoint every 5000 ms
		env.enableCheckpointing(5000);
		// 1. Parse the key=value command-line arguments
		final JSONObject argsObject = getFormArgs(args);
		PropertiesUtils.initPropertiesByKey(argsObject.getString("prop.path"));
		System.out.println(argsObject.toJSONString());
		// 2. Build the source stream of Fibonacci pairs
		DataStreamSource<Tuple2<Integer, Integer>> dataStreamSource = env
				.addSource(new DataSourceGet().getFibonacciSource(argsObject));

		SingleOutputStreamOperator<String> process = dataStreamSource.process(new MyProcessFunction());

		process.addSink(new LogSink());
		env.execute("iot test");
	}

	
	private static JSONObject getFormArgs(final String[] args) {
		// Collect the arguments into a JSON object
		JSONObject argsObject = new JSONObject();
		// Loop over the key=value pairs
		for (int i = 0; i < args.length; i++) {
			String[] args_is = args[i].split("=");
			if (args_is.length == 2) {
				argsObject.put(args_is[0], args_is[1]);
			}
		}
		return argsObject;
	}

}
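
The job is launched with key=value style arguments: getFormArgs splits each argument on "=" and silently ignores anything that does not split into exactly two parts. A typical invocation would pass something like prop.path=/opt/conf (the path here is illustrative), which initPropertiesByKey then uses to locate environment.properties.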
DataSourceGet:
package com.source;

import com.alibaba.fastjson.JSONObject;
import com.utils.KafkaProduceUtils;

public class DataSourceGet {

	private KafkaProduceUtils kafkaProduceUtils = new KafkaProduceUtils();

	public FibonacciSource getFibonacciSource(final JSONObject argsObject) {

		// Send a test message to Kafka on startup
		kafkaProduceUtils.producer("dh_test_dh", "123");

		// Build the Fibonacci source with the parsed arguments
		FibonacciSource fibonacciSource = new FibonacciSource(argsObject);
		return fibonacciSource;
	}

}
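
FibonacciSource itself is not shown in this post. Below is a minimal sketch of what it could look like, assuming it emits successive Fibonacci number pairs as Tuple2<Integer, Integer> (the element type MyProcessFunction consumes); the one-second emission interval and the cancellation flag are assumptions:

package com.source;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.alibaba.fastjson.JSONObject;

public class FibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {

	private static final long serialVersionUID = 1L;

	// Kept for parity with the constructor call in DataSourceGet
	private final JSONObject argsObject;
	private volatile boolean running = true;

	public FibonacciSource(final JSONObject argsObject) {
		this.argsObject = argsObject;
	}

	@Override
	public void run(final SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
		int prev = 0;
		int curr = 1;
		while (running) {
			// Emit the current Fibonacci pair
			ctx.collect(Tuple2.of(prev, curr));
			int next = prev + curr;
			prev = curr;
			curr = next;
			Thread.sleep(1000); // emission rate is an assumption
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}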
MyProcessFunction:
package com.process;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

import com.utils.JedisUtil;


public class MyProcessFunction extends ProcessFunction<Tuple2<Integer, Integer>, String> {

	private static final long serialVersionUID = 665249935064432746L;

	private static Logger logger = Logger.getLogger(MyProcessFunction.class);

	private JedisUtil jedisUtil = new JedisUtil();

	@Override
	public void processElement(final Tuple2<Integer, Integer> data, //
			final ProcessFunction<Tuple2<Integer, Integer>, String>.Context context, //
			final Collector<String> out) throws Exception {

		final Integer key = data.f0;
		final Integer value = data.f1;

		// Fetch the rule from Redis
		String redisValue = jedisUtil.hget("afcs", "event_47_post");
		if (logger.isInfoEnabled()) {
			logger.info("##### redisValue: " + redisValue + " #####");
		}

		out.collect("key:" + key + ",value:" + value);
	}

}
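
Note that hget runs once per element, i.e. one Redis round trip per record. ProcessFunction extends AbstractRichFunction, so for higher-volume streams the rule could instead be loaded once in open() and refreshed periodically, rather than fetched on every element.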
LogSink:
package com.sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.log4j.Logger;


public class LogSink extends RichSinkFunction<String> {

	private static final long serialVersionUID = -8897600652941199311L;
	private static Logger logger = Logger.getLogger(LogSink.class);

	
	@Override
	public void open(final Configuration parameters) throws Exception {
		super.open(parameters);
	}

	
	@Override
	public void invoke(final String value, final Context context) throws Exception {
		logger.info("rn##### value:" + value + " #####rn");
	}

	
	@Override
	public void close() throws Exception {
		super.close();
	}
}
JedisUtil:
package com.utils;

import java.io.Serializable;
import java.util.Map;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.util.SafeEncoder;


public class JedisUtil implements Serializable {

	private static final long serialVersionUID = 2712033148155101965L;

	private static volatile JedisPool jedisPool = null;

	
	public JedisPool getJedisPool() {
		if (jedisPool == null) {
			synchronized (JedisUtil.class) {
				if (jedisPool == null) {
					String redisHost = PropertiesUtils.getPropertiesByKey("redis.host");
					String redisPort = PropertiesUtils.getPropertiesByKey("redis.port");
					String redisPass = PropertiesUtils.getPropertiesByKey("redis.password");
					String database = PropertiesUtils.getPropertiesByKey("redis2.database");
					String redisPoolMaxIdle = PropertiesUtils.getPropertiesByKey("redis.jedis.pool.max-idle");

					JedisPoolConfig config = new JedisPoolConfig();
					// Maximum number of jedis instances the pool may allocate
					// (obtained via pool.getResource()); -1 means unlimited. Once
					// maxTotal instances are out, the pool state is exhausted.
					config.setMaxTotal(500);
					// Maximum number of idle jedis instances kept in the pool.
					config.setMaxIdle(Integer.parseInt(redisPoolMaxIdle));
					// Maximum wait time (ms) when borrowing an instance; on timeout
					// a JedisConnectionException is thrown.
					config.setMaxWaitMillis(1000 * 100);
					// Validate instances on borrow; if true, every borrowed
					// instance is known to be usable.
					config.setTestOnBorrow(true);
					// Connect with the configured password:
					jedisPool = new JedisPool(config, redisHost, Integer.parseInt(redisPort), 10000, redisPass,
							Integer.parseInt(database));
				}
			}
		}
		return jedisPool;
	}

	
	public Jedis getJedis() {
		return getJedisPool().getResource();
	}

	
	public void returnJedis(final Jedis jedis) {
		if (null != jedis) {
			jedis.close();
		}
	}

	
	public long del(final String... keys) {
		Jedis jedis = getJedis();
		try {
			long count = jedis.del(keys);
			return count;
		} finally {
			returnJedis(jedis);
		}
	}

	
	public long hdel(final String key, final String field) {
		Jedis jedis = getJedis();
		try {
			long s = jedis.hdel(key, field);
			return s;
		} finally {
			returnJedis(jedis);
		}
	}

	
	public long hset(final String key, final String field, final String value) {
		Jedis jedis = getJedis();
		try {
			long s = jedis.hset(key, field, value);
			return s;
		} finally {
			returnJedis(jedis);
		}
	}

	
	public String hget(final String key, final String field) {
		Jedis jedis = getJedis();
		try {
			String s = jedis.hget(key, field);
			return s;
		} finally {
			returnJedis(jedis);
		}
	}

	
	public Map<String, String> hgetAll(final String key) {
		Jedis jedis = getJedis();
		try {
			Map<String, String> map = jedis.hgetAll(key);
			return map;
		} finally {
			returnJedis(jedis);
		}
	}

	
	public String get(final String key) {
		Jedis jedis = getJedis();
		try {
			String value = jedis.get(key);
			return value;
		} finally {
			returnJedis(jedis);
		}
	}

	
	public String set(final String key, final String value) {
		return set(SafeEncoder.encode(key), SafeEncoder.encode(value));
	}

	
	public String set(final byte[] key, final byte[] value) {
		Jedis jedis = getJedis();
		try {
			String status = jedis.set(key, value);
			return status;
		} finally {
			returnJedis(jedis);
		}
	}

}
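
A quick usage sketch of JedisUtil, writing and reading the hash field that MyProcessFunction looks up (the stored value is made up for illustration):

	JedisUtil jedisUtil = new JedisUtil();
	jedisUtil.hset("afcs", "event_47_post", "{\"rule\":\"demo\"}"); // illustrative value
	String rule = jedisUtil.hget("afcs", "event_47_post");
	System.out.println(rule);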
KafkaProduceUtils:
package com.utils;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Logger;


public class KafkaProduceUtils implements Serializable {

	private static final long serialVersionUID = 3442009340575361234L;

	private static Logger logger = Logger.getLogger(KafkaProduceUtils.class);

	private static volatile KafkaProducer<String, String> kafkaProducer;

	
	public KafkaProducer<String, String> getKafkaProducer() {
		if (kafkaProducer == null) {
			synchronized (KafkaProduceUtils.class) {
				if (kafkaProducer == null) {
					logger.info("##### new KafkaProducer start #####");
					String bootstrap = PropertiesUtils.getPropertiesByKey("kafka.second.bootstrap-servers");
					String retry = PropertiesUtils.getPropertiesByKey("kafka.second.producer.retries");
					String acks = PropertiesUtils.getPropertiesByKey("kafka.second.producer.acks");
					final Map<String, Object> props = new HashMap<>();
					props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
					props.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retry));
					props.put(ProducerConfig.ACKS_CONFIG, acks);
					props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
					props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
					kafkaProducer = new KafkaProducer<>(props);
					logger.info("##### new KafkaProducer done #####");
				}
			}
		}
		return kafkaProducer;
	}

	
	public void producer(final String topic, final String value) {
		logger.info("##### kafkaProduceUtils start, topic=" + topic + ", value=" + value);
		// Send the message (asynchronously)
		getKafkaProducer().send(new ProducerRecord<>(topic, value));
		logger.info("##### kafkaProduceUtils done #####");
	}

}
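
Note that KafkaProducer.send is asynchronous and this class never flushes or closes the producer. Since DataSourceGet sends only a single startup message, a defensive variant of producer would flush after sending:

	public void producer(final String topic, final String value) {
		getKafkaProducer().send(new ProducerRecord<>(topic, value));
		getKafkaProducer().flush(); // block until buffered records are sent
	}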
PropertiesUtils:
package com.utils;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;


public class PropertiesUtils implements Serializable {

	private static final long serialVersionUID = 315601047419047701L;

	private static Logger logger = Logger.getLogger(PropertiesUtils.class);

	private static Properties properties = null;

	
	public static void initPropertiesByKey(final String propPath) {
		properties = new Properties();
		try {
			if (StringUtils.isNotEmpty(propPath)) {
				// Prefer loading connection info from the supplied path
				// String confPath = System.getProperty("user.dir");
				String confPath = propPath + File.separator + "environment.properties";
				final File file = new File(confPath);
				if (file.exists()) {
					logger.info("配置文件路径---->>" + confPath);
					System.out.println("配置文件路径---->>" + confPath);
					final InputStream in = new FileInputStream(new File(confPath));
					properties.load(in);
				} else {// File not found at confPath: fall back to the classpath
					logger.info("No config at confPath[" + confPath + "]; loading environment.properties from the classpath instead");
					System.out.println("No config at confPath[" + confPath + "]; loading environment.properties from the classpath instead");
					
					final InputStream in = PropertiesUtils.class.getClassLoader()
							.getResourceAsStream("environment.properties");
					properties.load(in);
				}
			} else {// propPath was not supplied
				logger.info("propPath not supplied; loading environment.properties from the classpath");
				System.out.println("propPath not supplied; loading environment.properties from the classpath");
				
				final InputStream in = PropertiesUtils.class.getClassLoader()
						.getResourceAsStream("environment.properties");
				properties.load(in);
			}

		} catch (IOException e) {
			logger.error(e);
		}
	}

	
	public static String getPropertiesByKey(final String key) {
		String value = properties.getProperty(key);
		return value == null ? "" : value;
	}

}
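
One caveat: getPropertiesByKey returns an empty string for missing keys, so the Integer.parseInt calls in JedisUtil and KafkaProduceUtils throw NumberFormatException rather than a clearer error if a key such as redis.port is absent from environment.properties.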
environment.properties:
#kafka
kafka.bootstrap-servers=10.11.11.11:9092
kafka.consumer.group-id=aa

#kafka amqp
kafka.second.bootstrap-servers=10.11.11.12:9092
kafka.second.producer.acks=1
kafka.second.producer.retries=0

#redis.host
redis.host=10.11.11.11
redis.port=6379
redis.password=111111
redis2.database=1
redis.jedis.pool.max-idle=8

#rabbitmq.useflag
rabbitmq.queue=test
rabbitmq.host=10.11.11.11
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.vhost=/vadmin_host

#jdbc
jdbc.url=jdbc:mysql://10.11.11.11:3306/aa_aa?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&zeroDateTimeBehavior=convertToNull
jdbc.username=root
jdbc.password=111111

