栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

kotlin之冷流转热流

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

kotlin之冷流转热流

stateflow和sharedflow以及channel都属于热流,但是如何将一个cold flow转成hotflow? kotlin提供了一个shareIn和stateIn的方法,看一下方法的定义:

public fun  Flow.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow
public fun  Flow.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow 

shareIn方法将流转换为SharedFlow

stateIn方法将流转换为StateFLow

参数说明:

scope:flow需要在给定的conrutinue里面进行

started:开始流的方式,这里有三种:

​ 1.SharingStarted.Eagerly: 无论当前有没有订阅者,流都会开始,订阅者只能接收到replay个缓冲区的值

​ 2.SharingStarted.Lazily:当有第一个订阅者时,流才会开始,后面的订阅者只能接收到replay个缓冲区的值,当没有订阅者时流还是活跃的

​ 3.SharingStarted.WhileSubscribed

定义如下:

public fun WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
)

这个参数的功能类似livedata的生命周期感知的功能。

源码分析:

我们主要分析shareIn方法:

public fun  Flow.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow {
    val config = configureSharing(replay)
    val shared = MutableSharedFlow(
        replay = replay,
        extraBufferCapacity = config.extraBufferCapacity,
        onBufferOverflow = config.onBufferOverflow
    )
    @Suppress("UNCHECKED_CAST")
    val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
    return ReadonlySharedFlow(shared, job)
}

这个方法主要就是生成了一个SharedFlow,并且在指定的协程中去启动collect。

private fun  CoroutineScope.launchSharing(
    context: CoroutineContext,
    upstream: Flow,
    shared: MutableSharedFlow,
    started: SharingStarted,
    initialValue: T
): Job =
    launch(context) { // the single coroutine to rule the sharing
        // Optimize common built-in started strategies
        when {
            started === SharingStarted.Eagerly -> {
                // collect immediately & forever
                upstream.collect(shared)
            }
            started === SharingStarted.Lazily -> {
                // start collecting on the first subscriber - wait for it first
                shared.subscriptionCount.first { it > 0 }
                upstream.collect(shared)
            }
            else -> {
                // other & custom strategies
                started.command(shared.subscriptionCount)
                    .distinctUntilChanged() // only changes in command have effect
                    .collectLatest { // cancels block on new emission
                        when (it) {
                            SharingCommand.START -> upstream.collect(shared) // can be cancelled
                            SharingCommand.STOP -> {  }
                            SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> {
                                if (initialValue === NO_VALUE) {
                                    shared.resetReplayCache() // regular shared flow -> reset cache
                                } else {
                                    shared.tryEmit(initialValue) // state flow -> reset to initial value
                                }
                            }
                        }
                    }
            }
        }
    }

这里面就是根据启动的策略去进行collect。Eagerly和Lazily都好很好理解,主要就是WhileSubscribed的方式的处理。

initialValue:转为stateflow的初始值

replay:转换为SharedFlow之后,当有新的订阅者的时候发送缓存中值的个数

@Test
fun cold2hotflow()= runBlocking {
    val flow = flowOf(1,2,3,4,5).shareIn(GlobalScope,SharingStarted.Lazily)
    val job = GlobalScope.launch {
    println("start collect")
        flow.collect {
            println("job---$it")
        }

    }
    job.join()
}

执行结果:

start collect
job---1
job---2
job---3
job---4
job---5

当使用SharingStarted.Eagerly时的结果:

start collect

当使用SharingStarted.WhileSubscribed时

val flow = flowOf(1,2,3,4,5).shareIn(GlobalScope,SharingStarted.WhileSubscribed())
start collect
job---1
job---2
job---3
job---4
job---5
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/770264.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号