挂起函数可以异步返回单个值,那如何异步多次返回多个值呢?
使用flow,flow的特点:
flow{…}块中的代码可以挂起使用flow,suspend修饰符可以省略流使用emit函数发射值流使用collect的函数收集值flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。流的连续性:流收集都是按顺序收集的flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行
//使用flow,suspend修饰符可以省略
fun doflow() = flow {
for (i in 1..5) {
//这里是挂起,不是阻塞
delay(500)
emit(i)
}
}.flowOn(Dispatchers.IO)
//调用
runBlocking {
doflow().collect {
log("value=$it")
}
}
打印(多次返回多个值)
com.z.zjetpack V/zx: value=1
com.z.zjetpack V/zx: value=2
com.z.zjetpack V/zx: value=3
com.z.zjetpack V/zx: value=4
com.z.zjetpack V/zx: value=5
flow的应用场景
文件下载场景
//正在下载(文件总大小为5)
fun doflow() = flow {
for (i in 1..5) {
delay(500)
emit(i.toDouble())
}
//flowOn来指定在IO线程中下载
}.flowOn(Dispatchers.IO)
//读取进度
runBlocking {
doflow().collect {
log("当前下载=${it / 5 * 100}%")
}
}
打印:
com.z.zjetpack V/zx: 当前下载=20.0%
com.z.zjetpack V/zx: 当前下载=40.0%
com.z.zjetpack V/zx: 当前下载=60.0%
com.z.zjetpack V/zx: 当前下载=80.0%
com.z.zjetpack V/zx: 当前下载=100.0%
流构建器
flowof 和asflow
runBlocking {
flowOf(1, 2, 3)
.onEach { delay(500) }
.collect {
log("value = $it")
}
(5..8).asFlow()
.onEach { delay(500) }
.collect {
log("value = $it")
}
}
使用launchin替换collect在单独的协程中启动收集流。
fun event() = (1..3)
.asFlow()
.onEach {
delay(500)
}.flowOn(Dispatchers.IO)
//调用
runBlocking {
val job = event().onEach {
log("value = $it")
}.launchIn(CoroutineScope(Dispatchers.IO))
//主线程可用this
//.launchIn(this)
job.join()
}
流的取消
超时的时候取消
fun cancelFlow() = flow{ for (i in 1..5) { delay(1000) emit(i) } } //调用 runBlocking { //超时的时候取消流 withTimeoutOrNull(2500) { cancelFlow().collect { log("value = $it") } } } 打印:在2.5秒的时候超时了,取消了 com.z.zjetpack V/zx: value = 1 com.z.zjetpack V/zx: value = 2
直接取消
runBlocking {
cancelFlow().collect {
log("value = $it")
if(it == 3){
cancel()
}
}
}
繁忙的任务是不能直接取消的,需要检测取消(cancellable)
runBlocking {
(1..5).asFlow().cancellable().collect {
if(it == 3) {
cancel()
}
}
}
背压:生产者效率 > 消费者效率
使用缓冲和flowon来处理背压
buffer():并发运行流中发射元素的代码
conflate():合并发射项,不对每个值处理
collectLatest():取消并重新发送最后一个值
模拟背压代码:
fun preFlow() = flow操作符{ for (i in 1..5) { delay(100) emit(i) log("发送$i") } } //调用 //100ms发送一次,300ms接收一次就产生了背压 runBlocking { val time = measureTimeMillis { preFlow() //buffer可以增加缓冲,提高效率 //.buffer(100) //flowOn自带缓冲功能 //.flowOn(Dispatchers.IO) //conflate不对每个值处理 //.conflate() //.collect //取消并重新发送最后一个值 .collectLatest { delay(300) log("接收到:$it") } } log("总耗时 $time") } 打印: com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 发送1 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 发送2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 发送3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 发送4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 发送5 com.z.zjetpack V/zx: 总耗时 2033 使用buffer后 com.z.zjetpack V/zx: 发送1 com.z.zjetpack V/zx: 发送2 com.z.zjetpack V/zx: 发送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 发送4 com.z.zjetpack V/zx: 发送5 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 总耗时 1634 使用flowOn后 com.z.zjetpack V/zx: 发送1 com.z.zjetpack V/zx: 发送2 com.z.zjetpack V/zx: 发送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 发送4 com.z.zjetpack V/zx: 发送5 com.z.zjetpack V/zx: 接收到:2 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:4 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 总耗时 1639 使用conflate后 com.z.zjetpack V/zx: 发送1 com.z.zjetpack V/zx: 发送2 com.z.zjetpack V/zx: 发送3 com.z.zjetpack V/zx: 接收到:1 com.z.zjetpack V/zx: 发送4 com.z.zjetpack V/zx: 发送5 com.z.zjetpack V/zx: 接收到:3 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 总耗时 1034 使用collectLatest后 com.z.zjetpack V/zx: 发送1 com.z.zjetpack V/zx: 发送2 com.z.zjetpack V/zx: 发送3 com.z.zjetpack V/zx: 发送4 com.z.zjetpack V/zx: 发送5 com.z.zjetpack V/zx: 接收到:5 com.z.zjetpack V/zx: 总耗时 843
转换操作符:map ,transform
限长操作符:取指定数量,take
末端操作符:末端操作符用于启动流收集的挂起函数,collect,tolist,toset,reduce,fold
组合操作符:zip
展平操作符:flatMapConcat(连接),flatMapMerge(合并),flatMapLatest(最新)
suspend fun perRequest(req: Int): String {
delay(1000)
return "转换 $req"
}
runBlocking {
(1..3).asFlow().map {
perRequest(it)
}.collect {
log(it)
}
}
打印:
com.z.zjetpack V/zx: 转换 1
com.z.zjetpack V/zx: 转换 2
com.z.zjetpack V/zx: 转换 3
transform
runBlocking {
(5..6).asFlow().transform {
emit("s $it")
emit(perRequest(it))
emit("e $it")
}
//.take(4)
.collect {
log(it)
}
}
打印:
com.z.zjetpack V/zx: s 5
com.z.zjetpack V/zx: 转换 5
com.z.zjetpack V/zx: e 5
com.z.zjetpack V/zx: s 6
com.z.zjetpack V/zx: 转换 6
com.z.zjetpack V/zx: e 6
take
加上take之后 com.z.zjetpack V/zx: s 5 com.z.zjetpack V/zx: 转换 5 com.z.zjetpack V/zx: e 5 com.z.zjetpack V/zx: s 6末端操作符:collect,tolist,toset,reduce,fold
runBlocking {
val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b }
log("sum = $sum")
val nList = (1..5).asFlow().toList()
log("nList = $nList")
val nSet = listOf(1, 2, 2, 3, 3, 5).asFlow().toSet()
log("nSet = $nSet")
}
打印:
com.z.zjetpack V/zx: sum = 55
com.z.zjetpack V/zx: nList = [1, 2, 3, 4, 5]
com.z.zjetpack V/zx: nSet = [1, 2, 3, 5]
展平操作符
只使用map的时候
//返回值是一个flow
fun reqFlow(i: Int) = flow {
emit("start $i")
delay(500)
emit("end $i")
}
runBlocking {
(0..1).asFlow().map {
reqFlow(it)
}.collect {
log("首次collect = $it")
it.collect {
log("二次 = $it")
}
}
}
打印:由于返回是flow所以需要collect 两次才能拿到值,Flow>
com.z.zjetpack V/zx: 首次collect = kotlinx.coroutines.flow.SafeFlow@63db1bf
com.z.zjetpack V/zx: 二次 = start 0
com.z.zjetpack V/zx: 二次 = end 0
com.z.zjetpack V/zx: 首次collect = kotlinx.coroutines.flow.SafeFlow@d27108c
com.z.zjetpack V/zx: 二次 = start 1
com.z.zjetpack V/zx: 二次 = end 1
flatMapConcat
runBlocking {
(0..1).asFlow().flatMapConcat {
reqFlow(it)
}.collect {
log("首次collect = $it")
}
}
打印:直接展开了
com.z.zjetpack V/zx: 首次collect = start 0
com.z.zjetpack V/zx: 首次collect = end 0
com.z.zjetpack V/zx: 首次collect = start 1
com.z.zjetpack V/zx: 首次collect = end 1
runBlocking {
(0..1).asFlow().flatMapMerge {
reqFlow(it)
}.collect {
log("首次collect = $it")
}
}
打印:
com.z.zjetpack V/zx: 首次collect = start 0
com.z.zjetpack V/zx: 首次collect = start 1
com.z.zjetpack V/zx: 首次collect = end 0
com.z.zjetpack V/zx: 首次collect = end 1
flatMapLatest
runBlocking {
(0..1).asFlow().flatMapLatest {
reqFlow(it)
}.collect {
log("首次collect = $it")
}
}
打印:
com.z.zjetpack V/zx: 首次collect = start 0
com.z.zjetpack V/zx: 首次collect = start 1
com.z.zjetpack V/zx: 首次collect = end 1
流的异常处理
catch函数 和 try catch
flow {
emit(1)
throw NullPointerException()
//catch函数只捕获上游的异常
}.catch {
log("exception $it")
//在异常后恢复
emit(20)
}.flowOn(Dispatchers.IO)
.collect {
log("msg $it")
}
打印:
com.z.zjetpack V/zx: exception java.lang.NullPointerException
com.z.zjetpack V/zx: msg 1
com.z.zjetpack V/zx: msg 20
//不建议通过这种方式捕获上游的异常,违反了flow原则,这种适合捕获下游的异常
try {
(1..3).asFlow().collect {
check(it > 2) {
"ex $it"
}
}
} catch (e: Exception) {
log("异常 $e")
}
打印:
com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1
流的完成
finally 和 onCompletion
try {
(1..3).asFlow().collect {
check(it > 2) {
"ex $it"
}
}
} catch (e: Exception) {
log("异常 $e")
} finally {
log("流已完成")
}
//发生异常onCompletion可以拿到异常信息,但不会捕获
try {
(1..3).asFlow().onCompletion {
log("onCompletion $it")
}.collect {
check(it > 2) {
"ex $it"
}
}
} catch (e: Exception) {
log("异常 $e")
}
打印:
com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1
com.z.zjetpack V/zx: 流已完成
com.z.zjetpack V/zx: onCompletion java.lang.IllegalStateException: ex 1
com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1
完



