栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Kotlin 之 协程(三)Flow异步流

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Kotlin 之 协程(三)Flow异步流

flow介绍

挂起函数可以异步返回单个值,那如何异步多次返回多个值呢?
使用flow,flow的特点:

flow{…}块中的代码可以挂起使用flow,suspend修饰符可以省略流使用emit函数发射值流使用collect的函数收集值flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。流的连续性:流收集都是按顺序收集的flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行

   //使用flow,suspend修饰符可以省略
    fun doflow() = flow {
        for (i in 1..5) {
            //这里是挂起,不是阻塞
            delay(500)
            emit(i)
        }
    }.flowOn(Dispatchers.IO)
//调用
  runBlocking {
            doflow().collect {
                log("value=$it")
            }
        }
打印(多次返回多个值)
com.z.zjetpack V/zx: value=1
com.z.zjetpack V/zx: value=2
com.z.zjetpack V/zx: value=3
com.z.zjetpack V/zx: value=4
com.z.zjetpack V/zx: value=5
flow的应用场景

文件下载场景

    //正在下载(文件总大小为5)
     fun doflow() = flow {
        for (i in 1..5) {
            delay(500)
            emit(i.toDouble())
        }
     //flowOn来指定在IO线程中下载
    }.flowOn(Dispatchers.IO)
//读取进度
 runBlocking {
            doflow().collect {
                log("当前下载=${it / 5 * 100}%")
            }
        }

打印:
com.z.zjetpack V/zx: 当前下载=20.0%
com.z.zjetpack V/zx: 当前下载=40.0%
com.z.zjetpack V/zx: 当前下载=60.0%
com.z.zjetpack V/zx: 当前下载=80.0%
com.z.zjetpack V/zx: 当前下载=100.0%
流构建器

flowof 和asflow

 runBlocking {
            flowOf(1, 2, 3)
                .onEach { delay(500) }
                .collect {
                    log("value = $it")
                }



            (5..8).asFlow()
                .onEach { delay(500) }
                .collect {
                    log("value = $it")
                }
        }

使用launchin替换collect在单独的协程中启动收集流。

        fun event() = (1..3)
        .asFlow()
        .onEach {
            delay(500)
        }.flowOn(Dispatchers.IO)
        
//调用

        runBlocking {
            val job =   event().onEach {
                log("value = $it")
            }.launchIn(CoroutineScope(Dispatchers.IO))
            //主线程可用this
            //.launchIn(this)

            job.join()
        }
流的取消

超时的时候取消

    fun cancelFlow() = flow {
        for (i in 1..5) {
            delay(1000)
            emit(i)
        }
    }

//调用
        runBlocking {
            //超时的时候取消流
            withTimeoutOrNull(2500) {
                cancelFlow().collect {
                    log("value = $it")
                }
            }
        }

打印:在2.5秒的时候超时了,取消了
com.z.zjetpack V/zx: value = 1
com.z.zjetpack V/zx: value = 2

直接取消

        runBlocking {
                cancelFlow().collect {
                    log("value = $it")
                    if(it == 3){
                        cancel()
                    }

                }
        }

繁忙的任务是不能直接取消的,需要检测取消(cancellable)

        runBlocking {
                (1..5).asFlow().cancellable().collect {
                    if(it == 3) {
                        cancel()
                    }
                }
        }

背压:生产者效率 > 消费者效率

使用缓冲和flowon来处理背压

buffer():并发运行流中发射元素的代码
conflate():合并发射项,不对每个值处理
collectLatest():取消并重新发送最后一个值

模拟背压代码:

    fun preFlow() = flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
            log("发送$i")
        }
    }

//调用
        //100ms发送一次,300ms接收一次就产生了背压
        runBlocking {
            val time = measureTimeMillis {
                preFlow()
                    //buffer可以增加缓冲,提高效率
                    //.buffer(100)
                    //flowOn自带缓冲功能
                    //.flowOn(Dispatchers.IO)
                    //conflate不对每个值处理
                    //.conflate()
                    //.collect
                    //取消并重新发送最后一个值
                    .collectLatest {
                        delay(300)
                        log("接收到:$it")
                    }
            }
            log("总耗时 $time")

        }
打印:
com.z.zjetpack V/zx: 接收到:1
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 接收到:2
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 接收到:3
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接收到:4
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 接收到:5
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 总耗时 2033

使用buffer后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接收到:1
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接收到:2
com.z.zjetpack V/zx: 接收到:3
com.z.zjetpack V/zx: 接收到:4
com.z.zjetpack V/zx: 接收到:5
com.z.zjetpack V/zx: 总耗时 1634

使用flowOn后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接收到:1
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接收到:2
com.z.zjetpack V/zx: 接收到:3
com.z.zjetpack V/zx: 接收到:4
com.z.zjetpack V/zx: 接收到:5
com.z.zjetpack V/zx: 总耗时 1639

使用conflate后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 接收到:1
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接收到:3
com.z.zjetpack V/zx: 接收到:5
com.z.zjetpack V/zx: 总耗时 1034

使用collectLatest后
com.z.zjetpack V/zx: 发送1
com.z.zjetpack V/zx: 发送2
com.z.zjetpack V/zx: 发送3
com.z.zjetpack V/zx: 发送4
com.z.zjetpack V/zx: 发送5
com.z.zjetpack V/zx: 接收到:5
com.z.zjetpack V/zx: 总耗时 843
操作符

转换操作符:map ,transform
限长操作符:取指定数量,take
末端操作符:末端操作符用于启动流收集的挂起函数,collect,tolist,toset,reduce,fold
组合操作符:zip
展平操作符:flatMapConcat(连接),flatMapMerge(合并),flatMapLatest(最新)

map
    suspend fun perRequest(req: Int): String {
        delay(1000)
        return "转换 $req"
    }

        runBlocking {
            (1..3).asFlow().map {
                perRequest(it)
            }.collect {
                log(it)
            }
      }
打印:
com.z.zjetpack V/zx: 转换 1
com.z.zjetpack V/zx: 转换 2
com.z.zjetpack V/zx: 转换 3
transform
 runBlocking {
(5..6).asFlow().transform {
                emit("s $it")
                emit(perRequest(it))
                emit("e $it")
            }
                //.take(4)
                .collect {
                    log(it)
                }
      }
打印:
com.z.zjetpack V/zx: s 5
com.z.zjetpack V/zx: 转换 5
com.z.zjetpack V/zx: e 5
com.z.zjetpack V/zx: s 6
com.z.zjetpack V/zx: 转换 6
com.z.zjetpack V/zx: e 6
take
加上take之后
com.z.zjetpack V/zx: s 5
com.z.zjetpack V/zx: 转换 5
com.z.zjetpack V/zx: e 5
com.z.zjetpack V/zx: s 6
末端操作符:collect,tolist,toset,reduce,fold
 runBlocking {
            val sum = (1..5).asFlow().map { it * it }.reduce { a, b -> a + b }
            log("sum = $sum")
            val nList = (1..5).asFlow().toList()
            log("nList = $nList")
            val nSet = listOf(1, 2, 2, 3, 3, 5).asFlow().toSet()
            log("nSet = $nSet")
       }
打印:
com.z.zjetpack V/zx: sum = 55
com.z.zjetpack V/zx: nList = [1, 2, 3, 4, 5]
com.z.zjetpack V/zx: nSet = [1, 2, 3, 5]
展平操作符

只使用map的时候

	//返回值是一个flow
    fun reqFlow(i: Int) = flow {
        emit("start $i")
        delay(500)
        emit("end $i")
    }
    
 runBlocking {
     (0..1).asFlow().map {
                reqFlow(it)
            }.collect {
                log("首次collect = $it")
                it.collect {
                    log("二次 = $it")
                }
            }
   }
打印:由于返回是flow所以需要collect 两次才能拿到值,Flow>
com.z.zjetpack V/zx: 首次collect = kotlinx.coroutines.flow.SafeFlow@63db1bf
com.z.zjetpack V/zx: 二次 = start 0
com.z.zjetpack V/zx: 二次 = end 0
com.z.zjetpack V/zx: 首次collect = kotlinx.coroutines.flow.SafeFlow@d27108c
com.z.zjetpack V/zx: 二次 = start 1
com.z.zjetpack V/zx: 二次 = end 1
flatMapConcat
 runBlocking {
    (0..1).asFlow().flatMapConcat {
                reqFlow(it)
            }.collect {
                log("首次collect = $it")
            }
      }
打印:直接展开了
com.z.zjetpack V/zx: 首次collect = start 0
com.z.zjetpack V/zx: 首次collect = end 0
com.z.zjetpack V/zx: 首次collect = start 1
com.z.zjetpack V/zx: 首次collect = end 1
 runBlocking {
  (0..1).asFlow().flatMapMerge {
                reqFlow(it)
            }.collect {
                log("首次collect = $it")
            }
      }
打印:
com.z.zjetpack V/zx: 首次collect = start 0
com.z.zjetpack V/zx: 首次collect = start 1
com.z.zjetpack V/zx: 首次collect = end 0
com.z.zjetpack V/zx: 首次collect = end 1
flatMapLatest
 runBlocking {
            (0..1).asFlow().flatMapLatest {
                reqFlow(it)
            }.collect {
                log("首次collect = $it")
            }
    }

打印:
com.z.zjetpack V/zx: 首次collect = start 0
com.z.zjetpack V/zx: 首次collect = start 1
com.z.zjetpack V/zx: 首次collect = end 1
流的异常处理

catch函数 和 try catch

            flow {
                emit(1)
                throw NullPointerException()
                //catch函数只捕获上游的异常
            }.catch {
                log("exception $it")
                //在异常后恢复
                emit(20)
            }.flowOn(Dispatchers.IO)
                .collect {
                    log("msg $it")
                }
打印:
com.z.zjetpack V/zx: exception java.lang.NullPointerException
com.z.zjetpack V/zx: msg 1
com.z.zjetpack V/zx: msg 20
            //不建议通过这种方式捕获上游的异常,违反了flow原则,这种适合捕获下游的异常
            try {
                (1..3).asFlow().collect {
                    check(it > 2) {
                        "ex $it"
                    }
                }
            } catch (e: Exception) {
                log("异常 $e")
            }
打印:
com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1
流的完成

finally 和 onCompletion

try {
                (1..3).asFlow().collect {
                    check(it > 2) {
                        "ex $it"
                    }
                }
            } catch (e: Exception) {
                log("异常 $e")
            } finally {
                log("流已完成")
            }

            //发生异常onCompletion可以拿到异常信息,但不会捕获
            try {
                (1..3).asFlow().onCompletion {
                    log("onCompletion $it")
                }.collect {
                    check(it > 2) {
                        "ex $it"
                    }
                }
            } catch (e: Exception) {
                log("异常 $e")
            }
打印:
com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1
com.z.zjetpack V/zx: 流已完成
com.z.zjetpack V/zx: onCompletion java.lang.IllegalStateException: ex 1
com.z.zjetpack V/zx: 异常 java.lang.IllegalStateException: ex 1

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/726597.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号