# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console
# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
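For example, to quiet one of the libraries listed above you would edit its line in this file; lowering the Kafka client to WARN is an illustrative change, not part of the default file:
log4j.logger.org.apache.kafka=WARN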
1.4、Add Scala support
Create a new scala directory under main and mark it as a source root.
Add Scala framework support to the project.
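The project also needs the Flink Scala APIs on the classpath. A minimal sketch of the Maven dependencies, assuming a Maven project with Scala 2.11 artifacts and a Flink 1.x version (adjust both to match your environment):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>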
二、Case study: Flink word count running locally
Code:
package demo

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val host = "node110"
    val port = 9999
    val windowSeconds = 5
    // get the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // create the data source: a text stream read from a socket
    val source = env.socketTextStream(host, port)
    // split lines into words and count per word within tumbling 5-second windows
    val counts = source
      .flatMap { line => line.toLowerCase.split("\\W+").filter(word => word.nonEmpty) }
      .map { word => (word, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(windowSeconds))
      .sum(1)
    // add sink
    counts.print()
    // execute
    env.execute("Window Stream Word Count")
  }
}
Test: on Linux, run the command: nc -lk 9999
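Then type some words into the nc session. As an illustration (not from the original article), entering the line below within a single 5-second window should make the job print one count per word; with a parallelism greater than one, each printed line is prefixed with the printing subtask's index:
hello flink hello world
(hello,2)
(flink,1)
(world,1)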
三、Running on a Flink cluster
3.1、Code changes
Replace the hardcoded host, port and window length with values read from the command-line arguments:
if (args.length != 3) {
  println("Usage: WindowWordCount <host> <port> <windowSeconds>")
  System.exit(1)
}
val host = args(0)
val port = args(1).toInt
val windowSeconds = args(2).toInt
3.2、Package the program & upload it
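A sketch of this step, assuming a Maven build and that the jar is copied to the node110 machine (the jar name and target directory are placeholders):
mvn clean package
scp target/flink_test-1.0-SNAPSHOT.jar node110:/opt/flink-jobs/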
3.3、Run
First open a session (be sure to open it before running the job, otherwise the execution will dump a pile of errors).
Then open another session.
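Assuming the first session runs nc -lk 9999 as before and the job is submitted from the second one, a sketch of the submit command (the jar path is a placeholder; host, port and window length are the three program arguments):
./bin/flink run -c demo.WindowWordCount /opt/flink-jobs/flink_test-1.0-SNAPSHOT.jar node110 9999 5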
Enter some input data:
Check the result in the Flink web UI:
四、DataSet API implementation (word count)
Data file input.txt:
I am a student
I love the world
Code:
package demo

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // create the data source: read the input text file
    val source = env.readTextFile("D:\\java test\\flink_test\\src\\main\\resources\\input.txt")
    // split lines into words and count occurrences per word
    val counts = source
      .flatMap { line => line.toLowerCase.split("\\W+").filter(word => word.nonEmpty) }
      .map { word => (word, 1) }
      .groupBy(0)
      .sum(1)
    // add sink
    counts.writeAsText("D:\\java test\\flink_test\\target\\output00")
    // execute
    env.execute("Batch Word Count")
  }
}
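With the two input lines above, the result is one (word, count) tuple per distinct word. Roughly the following ends up under the output00 path (ordering, and whether it is a single file or a directory of part files, depends on the parallelism):
(i,2)
(am,1)
(a,1)
(student,1)
(love,1)
(the,1)
(world,1)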