
Flink Sinks

Contents

file sink

KafkaSink

RedisSink


file sink

First, let's write data to a file.

import org.apache.flink.streaming.api.scala._

object FileSink {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Set the parallelism to 1; otherwise one output file is generated per partition
    env.setParallelism(1)

    val fileDS = env.readTextFile("data/students.txt")

    val clazzDS = fileDS.map(_.split(",")(4))
      .map((_, 1))
      .keyBy(_._1)
      .reduce((x, y) => (x._1, x._2 + y._2))

    clazzDS.writeAsCsv("data/out.txt")

    clazzDS.print()

    env.execute()
  }
}


However, writeAsCsv and the related write methods are now marked as deprecated.

So we need to learn the more involved approach: writing to files through the addSink method.

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object FileSink {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val fileDS = env.readTextFile("data/students.txt")

    val stuDS: DataStream[Student] = fileDS.map(line => {
      val stuarr: Array[String] = line.split(",")
      Student(stuarr(0), stuarr(1), stuarr(2).toInt, stuarr(3), stuarr(4))
    })

    // writeAsCsv is deprecated
    // clazzDS.writeAsCsv("data/out.txt")

    // forRowFormat: plain files, stored row by row
    stuDS.addSink(StreamingFileSink.forRowFormat(
      new Path("data/out.txt"),
      new SimpleStringEncoder[Student]()
    ).build()
    )

    fileDS.print()

    env.execute()
  }
}

case class Student(id: String, name: String, age: Int, gender: String, clazz: String)

Output:
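Note that StreamingFileSink treats the path passed to it as a base directory: records land in time-bucketed subdirectories as part files, and a rolling policy decides when a new part file is started. A minimal sketch of configuring that policy, reusing the Student case class above (the interval and size values are illustrative choices of mine, not from the original code):

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

val sink: StreamingFileSink[Student] = StreamingFileSink
  .forRowFormat(new Path("data/out.txt"), new SimpleStringEncoder[Student]())
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))  // start a new part file at least every 15 min
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) // or after 5 min without new records
      .withMaxPartSize(128 * 1024 * 1024)                   // or once a part file reaches 128 MB
      .build())
  .build()

The resulting sink is passed to stuDS.addSink(sink) exactly as before.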

KafkaSink

Add the dependency:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.2</version>
</dependency>

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

object KafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val fileDS = env.readTextFile("data/students.txt")

    val stuDS: DataStream[String] = fileDS.map(line => {
      val stuarr: Array[String] = line.split(",")
      Student(stuarr(0), stuarr(1), stuarr(2).toInt, stuarr(3), stuarr(4)).toString
    })

    // Keeping the case class would make serialization cumbersome;
    // calling toString and sending plain Strings is more convenient
    stuDS.addSink(new FlinkKafkaProducer[String]("doker:9092",
      "test_topic1",
      new SimpleStringSchema()
    ))

    env.execute()
  }
}
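For reference, if you did want to ship the case class itself instead of a pre-built String, the 1.11 connector also accepts a KafkaSerializationSchema. A sketch of that route, using a hypothetical StudentSerializationSchema of my own (not from the original post):

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical helper: turns a Student into a Kafka record,
// keyed by id so all records for one student land in one partition.
class StudentSerializationSchema(topic: String) extends KafkaSerializationSchema[Student] {
  override def serialize(s: Student, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] = {
    val value = s"${s.id},${s.name},${s.age},${s.gender},${s.clazz}"
    new ProducerRecord(topic,
      s.id.getBytes(StandardCharsets.UTF_8),
      value.getBytes(StandardCharsets.UTF_8))
  }
}

val props = new Properties()
props.setProperty("bootstrap.servers", "doker:9092")

val producer = new FlinkKafkaProducer[Student](
  "test_topic1",
  new StudentSerializationSchema("test_topic1"),
  props,
  FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

This is noticeably more code than the toString approach above, which is exactly why plain Strings are more convenient here.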

First, start ZooKeeper and Kafka.
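Assuming the ZooKeeper that ships with the same Kafka install (the path below mirrors the Kafka path used next), it can be started like this:

zookeeper-server-start.sh -daemon /usr/local/soft/kafka_2.11-1.0.0/config/zookeeper.properties

Then start Kafka: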

kafka-server-start.sh -daemon  /usr/local/soft/kafka_2.11-1.0.0/config/server.properties

Next, create the topic (I'm using a pseudo-distributed setup):

kafka-topics.sh --create --zookeeper doker:2181 --replication-factor 1 --partitions 3 --topic test_topic1
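To confirm the topic exists, you can optionally list the topics:

kafka-topics.sh --list --zookeeper doker:2181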

Start a console consumer to watch the topic:

kafka-console-consumer.sh --bootstrap-server doker:9092 --topic test_topic1 --from-beginning

Then run the Flink job.

RedisSink

Add the dependency:


<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

FlinkJedisConfigBase: supplies the Jedis connection configuration

RedisMapper: defines the data to write to Redis and the command used to write it

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val fileDS = env.readTextFile("data/students.txt")

    val stuDS: DataStream[Student] = fileDS.map(line => {
      val stuarr: Array[String] = line.split(",")
      Student(stuarr(0), stuarr(1), stuarr(2).toInt, stuarr(3), stuarr(4))
    })

    // Define the FlinkJedisConfigBase (here a FlinkJedisPoolConfig)
    val conf = new FlinkJedisPoolConfig.Builder()
      .setHost("doker")
      .setPort(6379)
      .build()

    // Pass in the config (the Redis connection) and the mapper
    // (the Redis command to use and the data to write)
    stuDS.addSink(new RedisSink[Student](conf, new MyRedisMapper))

    env.execute()
  }
}

class MyRedisMapper extends RedisMapper[Student] {
  // Define the Redis write command: HSET <hash name> key value
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "student_message")
  }
  // Use the student's id as the hash field (key)
  override def getKeyFromData(data: Student): String = data.id
  // Use the student's name as the value
  override def getValueFromData(data: Student): String = data.name
}

Successfully written to Redis:
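To double-check from the command line, assuming redis-cli can reach the same host, dump the hash:

redis-cli -h doker -p 6379 HGETALL student_message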

 
