Flow被称为是冷流,既然有冷流,那么就有热流,管道Channel就是热流。Channel是一个并发安全的队列,主要用于处理协程之间的通信
发送者和接受者是两个不同的协程,两者之间发送传递数据就是通过这个管道,一个send,一个receive
val channel = Channel{ } viewModelScope.launch { channel.send("123") } viewModelScope.launch { val receive = channel.receive() if(receive == "123"){ Log.e(TAG,"收到了") } }
Channel创建了一个通道,用来发送字符串的,那么协程1发送了1个数据之后,在协程2中接收到了这个参数,从而继续执行后续的操作
管道 Channel1 Channel的容量2 Channel的迭代3 Channel的构建器
3.1 produce3.2 actor 4 Channel的关闭5 await多路复用
5.1 传统方式5.2 select5.3 多Channel复用5.4 SelectClause类型解析 6 Flow的多路复用7 协程的并发安全问题
1 Channel的容量既然Channel是一个队列,那么就会有容量,默认情况下capacity = 1
public funChannel( capacity: Int = RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null ): Channel = when (capacity) { RENDEZVOUS -> { if (onBufferOverflow == BufferOverflow.SUSPEND) RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel else ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel }
同时,还有一个缓存区BufferOverflow用来存储接收的数据,当缓存区满了之后,send就会挂起,直到缓存区有了新空间;同样,如果缓存区为空,receive就会挂起
2 Channel的迭代Channel是有迭代器的,通过拿到管道的迭代器,可以拿到收取的数据,可以代替receive
viewModelScope.launch {
(1..3).asFlow().collect {
channel.send("123")
}
}
viewModelScope.launch {
val iterator = channel.iterator()
while (iterator.hasNext()){
Log.e(TAG,"收到了 ${iterator.next()}")
}
}
3 Channel的构建器
在前言中,有使用到一种构建器就是Channel可以直接构建一个管道,下面还有2种构建生产者或者消费者协程管道方式
3.1 produceproduce用于生成一个ReceiveChannel,这个管道可用于接收数据,相当于创建了一个生产者,这同样是CorotinueScope的一个扩展函数,相当于创建一个协程,同时又创建了一个管道
val channel= viewModelScope.produce3.2 actor{ send("123") } viewModelScope.launch { val receive = channel.receive() if(receive == "123"){ Log.e(TAG,"收到了") } }
与produce相反,actor创建的是一个SendChannel,用于发送数据的通道,在协程作用域中可以接收数据
val channel = viewModelScope.actor4 Channel的关闭{ val receive = receive() if (receive == "123") { Log.e(TAG, "收到了") } } viewModelScope.launch { channel.send("123") }
因为数据的发送和接收都是在协程中执行的,当协程执行完毕之后,Channel也就关闭了,这是自然的关闭
如果 主动调用Channel的close方法,Channel就会立即停止发送新的元素,Channel的属性 isClosedForSend 立即返回true
val channel = viewModelScope.actor(Dispatchers.Main) { while (true){ val receive = receive() if (receive == "123") { Log.e(TAG, "收到了") } } } viewModelScope.launch { (1..5).asFlow().collect { delay(500) channel.send("123") } } delay(1000) Log.e(TAG, "准备关闭管道") channel.close()
当1s钟之后,关闭了通道,这个时候,将会立刻停止发送数据,接收端抛出异常 ClosedReceiveChannelException 提示当前通道已经关闭了没法接收数据
5 await多路复用多路复用是网络中的一个概念,传输媒体的带宽或者容量往往大于单一传输信号的需求,希望在一个信道同时传输多路信号
在Channel中,就只有一个管道,那么我希望在接收方能够接收多种数据来源,而不是单一的发送一个String数据,就接收一个String数据
上面就是一个场景,在请求数据的时候,一般会有2种数据来源,从云端或者本地,但是两者获取数据的速度是不一致的,Select函数就是来做选择,看哪一端的数据来的最快,那么就拿谁的数据来做渲染
fun getLocal() = viewModelScope.async {
delay(2000)
"这是本地的数据"
}
fun getNet() = viewModelScope.async {
delay(1000)
"这是网络的数据"
}
5.1 传统方式
如果要根据哪个api返回的数据快慢来决定渲染哪个数据,只是通过一个协程是办不到的,getLocal和getNet是异步的,那么在拿数据的时候,只有等到最慢的返回之后,才能拿到全部的数据,所以只能通过开2个协程,单独监听各自的数据返回
5.2 select那么在select函数中,就不需要开辟两个协程就能达到2个协程的效果
viewModelScope.launch {
val result = select> {
getLocal().onAwait{Response(it)}
getNet().onAwait{Response(it)}
}
Log.e("TAG","${result.t}")
}
看这里没有使用到await,而是onAwait函数,相当于在select中注册了一个回调,不管哪个await先返回数据,就将这个数据回调出去
class Response(var t:T)
这样就使用了一个协程,完成了2个协程能够做完的事情
5.3 多Channel复用类似于await复用,有多个通道发送数据,检测到哪个通道发送到了数据,就将数据打印
fun CoroutineScope.channel1() = produce{ delay(200) send("123") } fun CoroutineScope.channel2() = produce { delay(400) send("123456") }
viewModelScope.launch {
val result = select> {
channel1().onReceive { Response(it) }
channel2().onReceive { Response(it) }
}
Log.e("TAG", "${result.t}")
}
这里还是跟await有点儿类似,没有使用receive,而是使用了onReceive回调
5.4 SelectClause类型解析既然上面都用到了select函数,那么到底哪些事件能够被select呢?
从SelectBuilder中可以看到,SelectClause0、SelectClause1、SelectClause2类型的事件都可以被select
public interface SelectBuilder{ public operator fun SelectClause0.invoke(block: suspend () -> R) public operator fun SelectClause1.invoke(block: suspend (Q) -> R) public operator funSelectClause2
.invoke(param: P, block: suspend (Q) -> R) public operator fun
SelectClause2
.invoke(block: suspend (Q) -> R): Unit = invoke(null, block) @ExperimentalCoroutinesApi public fun onTimeout(timeMillis: Long, block: suspend () -> R) }
SelectClause0事件:没有返回值的事件。例如join,对应的onJoin就是SelectClause0事件,是无参构造函数
fun CoroutineScope.co1() = launch {
delay(100)
Log.e(TAG,"co1 finish")
}
fun CoroutineScope.co2() = launch {
delay(10)
Log.e(TAG,"co2 finish")
}
没有返回值,就是Unit类型
select{ co1().onJoin{ print("co1 finish")} co2().onJoin{ print("co2 finish")} }
SelectClause1事件:那么像await,receive就是有返回值,那么就是对应的onAwait和onReceive,通过回调的方式拿到返回值
SelectClause2事件:对应的事件有返回值,而且需要一个额外的参数,例如Channel的send函数,需要发送一个数据,对应的onSend就需要发送一个数据
select6 Flow的多路复用{ launch { delay(200) channel1().onSend("2000"){ print("send in $it") } } launch { delay(100) channel2().onSend("1000"){ print("send in $it") } } }
例如之前的例子,从本地和网络拿数据,需要开启两个协程取数据,那么其实还可以通过merge操作符,将两个流合并为一个流输出
viewModelScope.launch {
listOf(::getLocal,::getNet)
.map { kFunction0 ->
//执行了函数
kFunction0.invoke()
}.map { function ->
flow {
emit(function.await())
}
}.merge().collect {
Log.e(TAG,"结果 :$it")
}
}
7 协程的并发安全问题
和Java的多线程一样,多协程的并发也是会带来安全问题
解决的方式:Channel、Mutex(加锁)、信号量等,和Java中的处理方式一致



