栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

王学岗Kotlin协程(五)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

王学岗Kotlin协程(五)

使用 Channel 进行通信
 @Test
    fun `test know channel`() = runBlocking {
        // Channel with the default capacity, carrying Int values. The element type has to be
        // spelled out: a bare `Channel()` gives the compiler nothing to infer it from.
        val channel = Channel<Int>()
        // Producer: sends an increasing counter once per second.
        val producer = GlobalScope.launch {
            var i = 0
            while (true) {
                delay(1000)
                channel.send(++i)
                println("send $i")
            }
        }

        // Consumer: receives and prints every element as soon as one is available.
        val consumer = GlobalScope.launch {
            while (true) {
                val element = channel.receive()
                println("receive $element")
            }
        }
        // Both loops are infinite, so this join only returns if the coroutines are cancelled.
        joinAll(producer, consumer)

    }


channel的容量

Channel 是一个队列,队列就有大小,其大小就是缓冲区的大小。

@Test
    fun `test know channel2`() = runBlocking {
        // Channel with the default capacity, carrying Int values. The element type has to be
        // spelled out: a bare `Channel()` gives the compiler nothing to infer it from.
        val channel = Channel<Int>()
        // Producer: tries to send once per second. With no buffer, send() suspends until the
        // consumer takes the element, and only then does the producer continue.
        val producer = GlobalScope.launch {
            var i = 0
            while (true) {
                delay(1000)
                channel.send(++i)
                println("send $i")
            }
        }

        // Consumer: deliberately slower than the producer (one element every two seconds).
        val consumer = GlobalScope.launch {
            while (true) {
                delay(2000)
                val element = channel.receive()
                println("receive $element")
            }
        }
        // Both loops are infinite, so this join only returns if the coroutines are cancelled.
        joinAll(producer, consumer)

    }

生产效率大于消费效率时,缓冲区(默认容量为 0)一旦满了,生产者的 send 调用就会挂起,直到消费者取走元素。

迭代 Channel

   @Test
    fun `test iterate channel`() = runBlocking {
        // Unlimited buffer: send() never suspends, so the producer can run ahead of the consumer.
        // The element type must be explicit; a bare `Channel(...)` cannot be inferred.
        val channel = Channel<Int>(Channel.UNLIMITED)
        // Producer: emits the squares of 1..5 as fast as possible.
        val producer = GlobalScope.launch {
            for (x in 1..5) {
                channel.send(x * x)
                println("send ${x * x}")
            }
            // Close once everything has been sent. Buffered elements are still delivered, and
            // afterwards hasNext() returns false instead of suspending forever.
            channel.close()
        }

        // Consumer: drains the channel slowly through an explicit iterator.
        val consumer = GlobalScope.launch {
            val iterator = channel.iterator()
            while (iterator.hasNext()) {
                val element = iterator.next()
                println("receive $element")
                delay(2000) // take one element every two seconds
            }
        }

        joinAll(producer, consumer)

    }

for in的写法

   @Test
    fun `test iterate channel`() = runBlocking {
        // Unlimited buffer: send() never suspends, so the producer can run ahead of the consumer.
        // The element type must be explicit; a bare `Channel(...)` cannot be inferred.
        val channel = Channel<Int>(Channel.UNLIMITED)
        // Producer: emits the squares of 1..5 as fast as possible.
        val producer = GlobalScope.launch {
            for (x in 1..5) {
                channel.send(x * x)
                println("send ${x * x}")
            }
            // Close once everything has been sent. Buffered elements are still delivered, and
            // the consumer's for-in loop then terminates instead of suspending forever.
            channel.close()
        }

        // Consumer: drains the channel slowly with the for-in form of channel iteration.
        val consumer = GlobalScope.launch {
            for (element in channel) {
                println("receive $element")
                delay(2000)
            }
        }

        joinAll(producer, consumer)

    }
produce 与actor

@Test
    fun `test fast producer channel`() = runBlocking {
        // produce { } launches a coroutine and hands back the receiving side of its channel.
        // ReceiveChannel is generic, so the element type has to be declared.
        val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
            repeat(100) {
                delay(1000)
                send(it)
            }
        }

        // The for-in loop ends when the produce coroutine completes and its channel is closed.
        val consumer = GlobalScope.launch {
            for (i in receiveChannel) {
                println("received: $i")
            }
        }
        consumer.join()
    }
 @Test
    fun `test fast consumer channel`() = runBlocking {
        // actor { } launches a coroutine and hands back the sending side of its mailbox channel.
        // SendChannel is generic, so the element type has to be declared.
        val sendChannel: SendChannel<Int> = GlobalScope.actor<Int> {
            while (true) {
                val element = receive()
                println(element)
            }
        }

        // Producer: pushes 0..3 into the actor's mailbox.
        val producer = GlobalScope.launch {
            for (i in 0..3) {
                sendChannel.send(i)
            }
        }

        // Only the producer is awaited; the actor keeps waiting in receive() afterwards.
        producer.join()
    }
channel的关闭

 @Test
    fun `test close channel`() = runBlocking {
        // Buffered channel with room for three Ints. The element type must be explicit.
        val channel = Channel<Int>(3)
        // Producer: sends 0, 1, 2 and then closes the channel.
        val producer = GlobalScope.launch {
            // repeat() instead of List(3) { }: the loop is run only for its side effects.
            repeat(3) {
                channel.send(it)
                println("send $it")
            }
            // Close as soon as everything has been sent.
            channel.close()
            println("""close channel. 
                |  - ClosedForSend: ${channel.isClosedForSend}
                |  - ClosedForReceive: ${channel.isClosedForReceive}""".trimMargin())
        }

        // Consumer: takes one element per second; the loop ends once the closed channel is drained.
        val consumer = GlobalScope.launch {
            for (element in channel) {
                println("receive $element")
                delay(1000) // consume once per second
            }
            println("""After Consuming. 
                |   - ClosedForSend: ${channel.isClosedForSend} 
                |   - ClosedForReceive: ${channel.isClosedForReceive}""".trimMargin())
        }

        joinAll(producer, consumer)
    }


BroadcastChannel

@Test
    fun `test broadcast`() = runBlocking {
        // BroadcastChannel is generic; the element type must be given explicitly.
        val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
        // Producer: broadcasts 0, 1, 2 at 100 ms intervals, then closes the channel.
        val producer = GlobalScope.launch {
            // repeat() instead of List(3) { }: the loop is run only for its side effects.
            repeat(3) {
                delay(100)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }

        // Start three subscriber coroutines and wait for all of them to finish.
        // NOTE(review): a subscriber presumably only sees elements sent after it subscribed;
        // the 100 ms delay in the producer is what gives the subscribers time to register.
        List(3) { index ->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (i in receiveChannel) {
                    println("[#$index] received: $i")
                }
            }
        }.joinAll()
    }

Channel 与 BroadcastChannel 的转换

 @Test
    fun `test broadcast`() = runBlocking {
        // Instead of constructing a BroadcastChannel directly, convert an ordinary channel
        // with broadcast(capacity). The element type must be given explicitly.
        val channel = Channel<Int>()
        val broadcastChannel = channel.broadcast(3)
        // Producer: broadcasts 0, 1, 2 at 100 ms intervals, then closes the channel.
        val producer = GlobalScope.launch {
            // repeat() instead of List(3) { }: the loop is run only for its side effects.
            repeat(3) {
                delay(100)
                broadcastChannel.send(it)
            }
            broadcastChannel.close()
        }

        // Start three subscriber coroutines and wait for all of them to finish.
        List(3) { index ->
            GlobalScope.launch {
                val receiveChannel = broadcastChannel.openSubscription()
                for (i in receiveChannel) {
                    println("[#$index] received: $i")
                }
            }
        }.joinAll()
    }
await 多路复用

package com.dongnaoedu.kotlincoroutinechannel

import com.dongnaoedu.kotlincoroutinechannel.api.User
import com.dongnaoedu.kotlincoroutinechannel.api.userServiceApi
import com.google.gson.Gson
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.selects.select
import org.junit.Test
import java.io.File


// Path of the local cache file that getUserFromLocal reads.
private val cachePath = "E://coroutine.cache"
// Shared Gson instance used to deserialize the cached JSON text.
private val gson = Gson()

/**
 * Wraps a loaded [value] together with a flag telling where it came from.
 *
 * The type parameter has to be declared on the class; without `<T>` the property type `T`
 * is an unresolved reference.
 *
 * @param T type of the wrapped value
 * @property value the loaded value
 * @property isLocal `true` when the value came from the local source, `false` when remote
 */
data class Response<T>(val value: T, val isLocal: Boolean)
// Declared as a CoroutineScope extension so that async { } can be called directly.
// Reads the cache file on Dispatchers.IO and deserializes its text into a User.
// The `name` argument is not used by this implementation.
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
    //delay(1000) // deliberate delay
    val cachedJson = File(cachePath).readText()
    gson.fromJson(cachedJson, User::class.java)
}

// Starts an async coroutine on Dispatchers.IO that asks userServiceApi for the user `name`.
// NOTE(review): userServiceApi is defined outside this file; presumably a network call — confirm.
fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
    userServiceApi.getUser(name)
}


class CoroutineTest02 {

    @Test
    fun `test select await`() = runBlocking {
        // Requests the user from the local cache and the remote API at the same time and
        // prints whichever answer arrives first.
        GlobalScope.launch {
            val localRequest = getUserFromLocal("xxx")
            val remoteRequest = getUserFromRemote("yyy")

            // select needs its result type spelled out; the garbled `select>` does not parse.
            // It resumes with the clause of the first Deferred to complete.
            val userResponse = select<Response<User>> {
                localRequest.onAwait { Response(it, true) }
                remoteRequest.onAwait { Response(it, false) }
            }

            userResponse.value?.let { println(it) }
        }.join()
    }
复用多个channel

 @Test
    fun `test select channel`() = runBlocking {
        // Two Int channels with the default capacity. The element type must be explicit.
        val channels = listOf(Channel<Int>(), Channel<Int>())
        // First sender: delivers 200 after 100 ms.
        GlobalScope.launch {
            delay(100)
            channels[0].send(200)
        }

        // Second sender: delivers 100 after 50 ms, so it is expected to win the select.
        GlobalScope.launch {
            delay(50)
            channels[1].send(100)
        }

        // Register an onReceive clause for every channel; select returns the value of
        // the first clause that becomes available. The result type is given explicitly.
        val result = select<Int> {
            channels.forEach { channel ->
                channel.onReceive { it }
            }
        }
        println(result)
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/584357.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号