@Test
fun `test know channel`() = runBlocking {
val channel = Channel()
//生产者
val producer = GlobalScope.launch {
var i = 0
while (true) {
delay(1000)
channel.send(++i)
println("send $i")
}
}
//消费者
val consumer = GlobalScope.launch {
while (true) {
val element = channel.receive()
println("receive $element")
}
}
joinAll(producer, consumer)
}
channel的容量
channel是一个队列,队列就有大小,其大小就是缓冲区的大小。‘’
@Test
fun `test know channel2`() = runBlocking {
val channel = Channel()
//生产者,发完后会等着,消费完后再生产,在继续发
val producer = GlobalScope.launch {
var i = 0
while (true) {
delay(1000)
channel.send(++i)
println("send $i")
}
}
//消费者
val consumer = GlobalScope.launch {
while (true) {
delay(2000)
val element = channel.receive()
println("receive $element")
}
}
joinAll(producer, consumer)
}
生产效率大于消费效率,缓冲区(默认大小是0)满的时候会挂起
迭代chanel @Test
fun `test iterate channel`() = runBlocking {
//缓冲区大小为Channel.UNLIMITED
val channel = Channel(Channel.UNLIMITED)
//生产者,快速发
val producer = GlobalScope.launch {
for (x in 1..5) {
channel.send(x * x)
println("send ${x * x}")
}
}
//消费者,慢慢消费
val consumer = GlobalScope.launch {
val iterator = channel.iterator()
while (iterator.hasNext()){
val element = iterator.next()
println("receive $element")
delay(2000)//每隔两面取出一个元素
}
}
joinAll(producer, consumer)
}
for in的写法
@Test
fun `test iterate channel`() = runBlocking {
//缓冲区大小为Channel.UNLIMITED
val channel = Channel(Channel.UNLIMITED)
//生产者,快速发
val producer = GlobalScope.launch {
for (x in 1..5) {
channel.send(x * x)
println("send ${x * x}")
}
}
//消费者,慢慢消费
val consumer = GlobalScope.launch {
for (element in channel) {
println("receive $element")
delay(2000)
}
}
joinAll(producer, consumer)
}
produce 与actor
@Test
fun `test fast producer channel`() = runBlocking {
val receiveChannel: ReceiveChannel = GlobalScope.produce {
repeat(100) {
delay(1000)
send(it)
}
}
val consumer = GlobalScope.launch {
for (i in receiveChannel) {
println("received: $i")
}
}
consumer.join()
}
@Test
fun `test fast consumer channel`() = runBlocking {
val sendChannel: SendChannel = GlobalScope.actor {
while (true) {
val element = receive()
println(element)
}
}
val producer = GlobalScope.launch {
for (i in 0..3) {
sendChannel.send(i)
}
}
producer.join()
}
channel的关闭
@Test
fun `test close channel`() = runBlocking {
val channel = Channel(3)
//生产者
val producer = GlobalScope.launch {
List(3) {
channel.send(it)
println("send $it")
}
//发完就关闭掉
channel.close()
println("""close channel.
| - ClosedForSend: ${channel.isClosedForSend}
| - ClosedForReceive: ${channel.isClosedForReceive}""".trimMargin())
}
//消费者
val consumer = GlobalScope.launch {
for (element in channel){
println("receive $element")
delay(1000)//每隔一秒消费一次
}
println("""After Consuming.
| - ClosedForSend: ${channel.isClosedForSend}
| - ClosedForReceive: ${channel.isClosedForReceive}""".trimMargin())
}
joinAll(producer, consumer)
}
BroadCastChannel
@Test
fun `test broadcast`() = runBlocking {
val broadcastChannel = BroadcastChannel(Channel.BUFFERED)
val producer = GlobalScope.launch {
List(3){
delay(100)
broadcastChannel.send(it)
}
broadcastChannel.close()
}
List(3){ index ->//启动三个协程
GlobalScope.launch {
val receiveChannel = broadcastChannel.openSubscription()
for (i in receiveChannel){
println("[#$index] received: $i")
}
}
}.joinAll()
}
channel 与broadcastchannel的转换
@Test
fun `test broadcast`() = runBlocking {
//val broadcastChannel = BroadcastChannel(Channel.BUFFERED)
val channel = Channel()
val broadcastChannel = channel.broadcast(3)
val producer = GlobalScope.launch {
List(3){
delay(100)
broadcastChannel.send(it)
}
broadcastChannel.close()
}
List(3){ index ->//启动三个协程
GlobalScope.launch {
val receiveChannel = broadcastChannel.openSubscription()
for (i in receiveChannel){
println("[#$index] received: $i")
}
}
}.joinAll()
}
await 多路复用
package com.dongnaoedu.kotlincoroutinechannel import com.dongnaoedu.kotlincoroutinechannel.api.User import com.dongnaoedu.kotlincoroutinechannel.api.userServiceApi import com.google.gson.Gson import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.merge import kotlinx.coroutines.selects.select import org.junit.Test import java.io.File private val cachePath = "E://coroutine.cache" private val gson = Gson() data class Response复用多个channel(val value: T, val isLocal: Boolean) //变成CoroutineScope的扩展函数,这样可以使用async(){} fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) { //delay(1000) //故意的延迟 File(cachePath).readText().let { gson.fromJson(it, User::class.java) } } fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) { userServiceApi.getUser(name) } class CoroutineTest02 { @Test fun `test select await`() = runBlocking { GlobalScope.launch { val localRequest = getUserFromLocal("xxx") val remoteRequest = getUserFromRemote("yyy") val userResponse = select > { localRequest.onAwait { Response(it, true) } remoteRequest.onAwait { Response(it, false) } } userResponse.value?.let { println(it) } }.join() }
@Test
fun `test select channel`() = runBlocking {
val channels = listOf(Channel(), Channel())
GlobalScope.launch {
delay(100)
channels[0].send(200)
}
GlobalScope.launch {
delay(50)
channels[1].send(100)
}
val result = select {
channels.forEach { channel ->
channel.onReceive { it }
}
}
println(result)
}



