stateflow和sharedflow以及channel都属于热流,但是如何将一个cold flow转成hotflow? kotlin提供了一个shareIn和stateIn的方法,看一下方法的定义:
public funFlow .stateIn( scope: CoroutineScope, started: SharingStarted, initialValue: T ): StateFlow
public funFlow .shareIn( scope: CoroutineScope, started: SharingStarted, replay: Int = 0 ): SharedFlow
shareIn方法将流转换为SharedFlow
stateIn方法将流转换为StateFLow
参数说明:
scope:flow需要在给定的conrutinue里面进行
started:开始流的方式,这里有三种:
1.SharingStarted.Eagerly: 无论当前有没有订阅者,流都会开始,订阅者只能接收到replay个缓冲区的值
2.SharingStarted.Lazily:当有第一个订阅者时,流才会开始,后面的订阅者只能接收到replay个缓冲区的值,当没有订阅者时流还是活跃的
3.SharingStarted.WhileSubscribed
定义如下:
public fun WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
)
这个参数的功能类似livedata的生命周期感知的功能。
源码分析:
我们主要分析shareIn方法:
public funFlow .shareIn( scope: CoroutineScope, started: SharingStarted, replay: Int = 0 ): SharedFlow { val config = configureSharing(replay) val shared = MutableSharedFlow ( replay = replay, extraBufferCapacity = config.extraBufferCapacity, onBufferOverflow = config.onBufferOverflow ) @Suppress("UNCHECKED_CAST") val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T) return ReadonlySharedFlow(shared, job) }
这个方法主要就是生成了一个SharedFlow,并且在指定的协程中去启动collect。
private funCoroutineScope.launchSharing( context: CoroutineContext, upstream: Flow , shared: MutableSharedFlow , started: SharingStarted, initialValue: T ): Job = launch(context) { // the single coroutine to rule the sharing // Optimize common built-in started strategies when { started === SharingStarted.Eagerly -> { // collect immediately & forever upstream.collect(shared) } started === SharingStarted.Lazily -> { // start collecting on the first subscriber - wait for it first shared.subscriptionCount.first { it > 0 } upstream.collect(shared) } else -> { // other & custom strategies started.command(shared.subscriptionCount) .distinctUntilChanged() // only changes in command have effect .collectLatest { // cancels block on new emission when (it) { SharingCommand.START -> upstream.collect(shared) // can be cancelled SharingCommand.STOP -> { } SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> { if (initialValue === NO_VALUE) { shared.resetReplayCache() // regular shared flow -> reset cache } else { shared.tryEmit(initialValue) // state flow -> reset to initial value } } } } } } }
这里面就是根据启动的策略去进行collect。Eagerly和Lazily都好很好理解,主要就是WhileSubscribed的方式的处理。
initialValue:转为stateflow的初始值
replay:转换为SharedFlow之后,当有新的订阅者的时候发送缓存中值的个数
@Test
fun cold2hotflow()= runBlocking {
val flow = flowOf(1,2,3,4,5).shareIn(GlobalScope,SharingStarted.Lazily)
val job = GlobalScope.launch {
println("start collect")
flow.collect {
println("job---$it")
}
}
job.join()
}
执行结果:
start collect job---1 job---2 job---3 job---4 job---5
当使用SharingStarted.Eagerly时的结果:
start collect
当使用SharingStarted.WhileSubscribed时
val flow = flowOf(1,2,3,4,5).shareIn(GlobalScope,SharingStarted.WhileSubscribed())
start collect job---1 job---2 job---3 job---4 job---5



