和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分
1. Environment:Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就是当前的Flink运行环境。只有获取了环境信息,才能将task调度到不同的taskManager执行。这个环境对象的获取方式相对比较简单
2.SourceFlink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源.
3. Transform:算子,对数据进行转换操作,后面会详细介绍
4.Sink在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作
之前我们一直在使用的print方法其实就是一种Sink。
简单书写一个flink的应用工程
这里新建一个maven工程项目myflink刚开始学习的话我现在本地window下玩flink flink有多种写法 可以用java书写 也可以用scala书写 相比较java,scala书写对于flink来说更符合
//flink包org.apache.flink flink-scala_2.111.7.2 //flink streaming包 org.apache.flink flink-streaming-scala_2.111.7.2 //flink 连接hadoop包 //flink 连接kafka包org.apache.flink flink-shaded-hadoop-2-uber2.7.5-9.0 org.apache.flink flink-connector-kafka_2.111.10.1
import org.apache.flink.streaming.api.scala._
/** A single water-sensor reading.
  *
  * @param id   sensor identifier, e.g. "ws_001"
  * @param ts   reading timestamp (epoch seconds judging by the sample data — TODO confirm unit)
  * @param high measured value for this reading
  */
final case class WaterSensor(id: String, ts: Long, high: Double)
/** Minimal Flink job: read a text file from HDFS and print each line. */
object MyExp1 {
  def main(args: Array[String]): Unit = {
    // Streaming execution environment — the entry point of every Flink job.
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    // For a quick local check one can instead build a stream from an
    // in-memory collection of WaterSensor records via env.fromCollection(...).

    // Read the CSV from HDFS line by line. Switch the URI to a local path
    // for local runs; the HDFS connector jar must be on the classpath,
    // otherwise this call fails.
    val lines = environment.readTextFile("hdfs://192.168.80.181:9000/data/userbehavior/UserBehavior.csv")

    lines.print()
    environment.execute("one")
  }
}
文件就全部读取到了
第二步:连接 Kafka 读取数据
启动kafka kafka-server-start.sh /opt/soft/kafka200/config/server.properties
创建一个名为 demo 的主题(topic)
kafka-topics.sh --zookeeper 192.168.80.181:2181 --create --topic demo --partitions 1 --replication-factor 1
开启生产者(向 demo 主题发送消息)
kafka-console-producer.sh --broker-list 192.168.80.181:9092 --topic demo
开启监听
kafka-console-consumer.sh --bootstrap-server 192.168.80.181:9092 --topic demo --from-beginning
新建一个myexp2
输入代码
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
/** Flink job that consumes the Kafka topic "demo" and prints every record. */
object MyExp2 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Consumer configuration for the local Kafka broker.
    val stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"
    val kafkaProps = new Properties()
    kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.181:9092")
    kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer)
    kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer)
    kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "cm")

    // Attach the Kafka source and print whatever arrives.
    val kafka: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("demo", new SimpleStringSchema(), kafkaProps)
    )
    kafka.print()

    env.execute("sensor")
  }
}
3.自定义数据源
import java.util.Date
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
/** Custom streaming source that emits one record roughly every second.
  *
  * Record format: "<random 0-99>:<epoch millis>:hehe".
  * Runs until [[cancel]] is invoked by the framework.
  */
class MySource extends SourceFunction[String] {
  // @volatile is required: cancel() is called from a different thread than
  // run(), and without it the running loop may never observe flag == false.
  @volatile var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (flag) {
      ctx.collect(Random.nextInt(100) + ":" + new Date().getTime + ":" + "hehe")
      Thread.sleep(1000)
    }
  }

  // Called by Flink on job cancellation; stops the emit loop.
  override def cancel(): Unit = {
    flag = false
  }
}
import org.apache.flink.streaming.api.scala._
/** Flink job driving the custom MySource and printing its output. */
object MyExp3 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // MySource loops forever, so records keep flowing. If the job appears
    // to emit only a few records, pin the parallelism (one source instance
    // is otherwise started per available core):
    // env.setParallelism(1)

    val stream = env.addSource(new MySource)
    stream.print()

    env.execute("one")
  }
}
3.map
package com.hc.myflinkone.exp
import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.scala._
/** Demonstrates a RichMapFunction reading global job parameters in open(). */
object MyExp5 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Publish per-sensor offsets as global job parameters so every task
    // can read them in open(). Format: "id:offset,id:offset,...".
    val cfg = new Configuration()
    cfg.setString("testParam", "ws_001:50,ws_002:60,ws_003:70")
    env.getConfig.setGlobalJobParameters(cfg)

    val ds = env.fromElements(
      WaterSensor("ws_001", 1577844001, 45.0),
      WaterSensor("ws_002", 1577844015, 43.0),
      WaterSensor("ws_003", 1577844020, 42.0)
    )
    ds.map(new MyRich()).print()
    env.execute()
  }

  /** Adds the configured per-sensor offset to each reading, emitting "id:ts:value". */
  class MyRich extends RichMapFunction[WaterSensor, String] {
    // sensor id -> offset, populated from the global job parameters in open()
    var paramMap = Map[String, Double]()

    override def open(parameters: Configuration): Unit = {
      // NOTE: relies on the job parameters actually being a Configuration
      // (set in main above); the cast fails for other GlobalJobParameters impls.
      val conf = getRuntimeContext.getExecutionConfig
        .getGlobalJobParameters.asInstanceOf[Configuration]
      // Build the lookup table in one expression instead of mutating inside
      // a map() used for side effects. Entries without a ':' are skipped so
      // an empty or malformed config string can no longer throw here.
      paramMap = conf.getString("testParam", "")
        .split(",")
        .filter(_.contains(":"))
        .map { entry =>
          val ps = entry.split(":")
          ps(0) -> ps(1).toDouble
        }
        .toMap
    }

    override def map(in: WaterSensor): String = {
      // Unknown sensors fall back to a zero offset.
      val offset = paramMap.getOrElse(in.id, 0.0)
      in.id + ":" + in.ts.toString + ":" + (offset + in.high).toString
    }
  }
}
4.连接redis
package com.hc.myflinkone.exp
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import redis.clients.jedis.Jedis
//连接redis
/** A raw user-activity event: user id, action type (e.g. buy/collect/browse/order), epoch-second timestamp. */
final case class Message(id: String, acttype: String, ts: Long)
/** A Message enriched with the user's Redis profile; an empty name with age 0 marks a missing profile. */
final case class Users(id: String, name: String, age: Int, gender: String, act: String, ts: Long)
/** Flink job that enriches in-memory click events with user profiles held in Redis. */
object MyExp6 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Sample click-stream messages (id, action type, timestamp).
    val ds = env.fromCollection(Seq(
      Message("1", "buy", 1521357896),
      Message("2", "collect", 1521357896),
      Message("3", "browse", 1521357896),
      Message("4", "order", 1521357896),
      // fixed sample-data typo: was "borwse"
      Message("5", "browse", 1521357896)
    ))
    // Look each user up in Redis and print the enriched record.
    ds.map(new MyRich).print()
    env.execute("mytrans")
  }
}
/** Enriches a Message with the user's profile stored in the Redis hash "users".
  *
  * Hash layout (as read below): field = user id, value = "name,age,gender".
  * Users absent from the hash produce a placeholder profile (empty name, age 0).
  */
class MyRich extends RichMapFunction[Message, Users] {
  // One Jedis connection per task, created in open() and released in close().
  var jedis: Jedis = null

  override def open(parameters: Configuration): Unit = {
    jedis = new Jedis("192.168.80.181", 6379)
  }

  // Fix: the connection was previously leaked — release it on task shutdown.
  override def close(): Unit = {
    if (jedis != null) jedis.close()
  }

  override def map(in: Message): Users = {
    val lst = jedis.hmget("users", in.id)
    if (lst.get(0) == null) {
      // Unknown user: keep the activity but emit an empty profile.
      Users(in.id, "", 0, "", in.acttype, in.ts)
    } else {
      val fields = lst.get(0).split(",")
      Users(in.id, fields(0), fields(1).toInt, fields(2), in.acttype, in.ts)
    }
  }
}
5.连接mongodb
package com.hc.myflinkone.exp
import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
/** Standalone Casbah example: dump (id, name, age) from mydemo.userinfos. */
object MyExp7 {
  def main(args: Array[String]): Unit = {
    val mc = MongoClient("192.168.80.181", 27017)
    try {
      val db = mc("mydemo") // select database
      val tab = db("userinfos")
      // Example query object for a lookup such as tab.findOne(...):
      // val where = MongoDBObject("name" -> "zhangsan")
      // Print a projection of every document (foreach returns Unit, so the
      // former `val rdd = ...` binding was misleading and has been dropped).
      tab.map(x => (x.get("id"), x.get("name"), x.get("age"))).foreach(println)
      // tab.filter(_.get("name").toString.startsWith("zhang")).foreach(println)
      // tab.insert(MongoDBObject("id" -> "4", "name" -> "zhaoyun", "age" -> 1300))
    } finally {
      // Fix: the client was previously leaked — always release the connection pool.
      mc.close()
    }
  }
}
6.flink做词频统计
package com.hc.myflinkone.exp
import org.apache.flink.streaming.api.scala._
/** Classic word count over an in-memory collection of sentences. */
object MyExp8 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val sentences = env.fromCollection(Seq(
      "hello world", "hello hadoop", "hello flink"
    ))
    env.setParallelism(2)

    // Tokenize, pair each word with 1, then keep a running count per word.
    val counts = sentences
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .keyBy(0)
      .reduce((left, right) => (left._1, left._2 + right._2))

    counts.shuffle.print()
    env.execute()
  }
}
7.flink 检测机器温度报警(简单书写)
package com.hc.myflinkone.exp
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig
/** Reads machine-temperature records from Kafka topic "mac" and prints alerts.
  *
  * Record layout: machineId,partId,timestamp,temperature
  * e.g. 1234,4,153264221,34.5 — temperatures >= 50 are routed to "warning".
  */
object MyExp9 {
  def main(args: Array[String]): Unit = {
    import scala.util.Try

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer settings.
    val prop = new Properties()
    prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.181:9092")
    prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    prop.put(ConsumerConfig.GROUP_ID_CONFIG, "cm")

    val ds = env.addSource(
      new FlinkKafkaConsumer[String]("mac", new SimpleStringSchema(), prop)
    )

    val spstream = ds.split(line => {
      val info = line.split(",", -1)
      // Robustness fix: a record with fewer than 4 fields or a non-numeric
      // temperature used to throw and kill the whole job; such records are
      // now routed to the "normal" output instead.
      val temp = if (info.length > 3) Try(info(3).toDouble).toOption else None
      if (temp.exists(_ >= 50)) Seq("warning") else Seq("normal")
    })

    // Only alert records are printed.
    spstream.select("warning").print()
    env.execute("sensor")
  }
}
8.flink 连接kafka读取数据 对比数据到mongo数据库中查询报警
package com.hc.myflinkone.exp
import java.util.Properties
import com.mongodb.casbah.imports.MongoClient
import com.mongodb.casbah.MongoCollection
import com.mongodb.casbah.commons.MongoDBObject
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig
/** A machine-part temperature reading joined with its MongoDB template.
  *
  * @param maxtemp alarm threshold taken from the template document
  * @param cTemp   current measured temperature from the Kafka record
  * @param cTime   measurement timestamp from the Kafka record
  */
final case class MacTemplate(id: String, macname: String,
                             partid: String, partname: String,
                             maxtemp: Double, cTemp: Double, cTime: Long)
/** Kafka -> MongoDB-enrichment -> over-temperature alert pipeline. */
object MyExp10 {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer configuration for the "mac" topic.
    val stringDeser = "org.apache.kafka.common.serialization.StringDeserializer"
    val consumerProps = new Properties()
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.80.181:9092")
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeser)
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeser)
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "cm")

    val raw: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer[String]("mac", new SimpleStringSchema(), consumerProps)
    )

    // Enrich each record with its MongoDB template, then print only the
    // readings whose current temperature reaches the configured maximum.
    raw.map(new xptemp)
      .split(m => if (m.cTemp >= m.maxtemp) Seq("warning") else Seq("normal"))
      .select("warning")
      .print()

    env.execute()
  }
}
/** Joins a raw "machineId,partId,timestamp,temperature" record with its
  * template document from MongoDB collection mac.mac_temp.
  */
class xptemp extends RichMapFunction[String, MacTemplate] {
  var tab: MongoCollection = _
  // Keep the client so close() can release it (it was previously leaked).
  private var mc: MongoClient = _

  override def open(parameters: Configuration): Unit = {
    mc = MongoClient("192.168.80.181", 27017)
    val database = mc("mac")
    tab = database("mac_temp")
  }

  override def close(): Unit = {
    if (mc != null) mc.close()
  }

  override def map(in: String): MacTemplate = {
    val info = in.split(",")
    val query = MongoDBObject("id" -> info(0), "partid" -> info(1))
    // Fix: findOne returns an Option; the old `.get` chain crashed with an
    // opaque NoSuchElementException when no template matched. Pattern match
    // and fail with an explanatory message instead.
    tab.findOne(query) match {
      case Some(doc) =>
        MacTemplate(
          doc.get("id").toString,
          doc.get("macname").toString,
          doc.get("partid").toString,
          doc.get("partname").toString,
          doc.get("maxtemp").toString.toDouble,
          info(3).toDouble,
          info(2).toLong
        )
      case None =>
        throw new NoSuchElementException(
          s"no mac_temp template for machine ${info(0)}, part ${info(1)}")
    }
  }
}



