栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

协程进阶:CoroutineContext

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

协程进阶:CoroutineContext

前言

CoroutineContext是协程中的上下文,通过它可以控制协程在哪个线程中执行,可以设置协程的名字,可以用它来捕获协程抛出的异常等。

通过 CoroutineScope.launch 方法或者CoroutineScope.async 方法可以启动一个协程。

// launch 源码 
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    ...
}

//  async 源码
public fun  CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred {
    ...
}

从源码中可以看出,这些方法第一个参数的类型就是 CoroutineContext。默认值是 EmptyCoroutineContext 单例对象。

CoroutineContext 类图

CoroutineContext 是一个包含了用户定义的一些各种不同元素的Element对象集合。其中主要元素是Job、协程调度器CoroutineDispatcher、处理协程异常CoroutineExceptionHandler、拦截器ContinuationInterceptor、协程名CoroutineName等。

根据类图结构可知:

CoroutineContext 是所有上下文相关类的父接口。CoroutineContext 直接子类是CombinedContext、Element、EmptyCoroutineContext。Element 直接子类是AbstractCoroutineContextElement、Job、CoroutineExceptionHandler、CoroutineInterceptor。AbstractCoroutineContextElement直接子类是CoroutineName、CoroutineDispatcher。 CoroutineContext 接口数据结构

public interface CoroutineContext {
    
    public operator fun  get(key: Key): E?

    
    public fun  fold(initial: R, operation: (R, Element) -> R): R

    
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    
    public fun minusKey(key: Key<*>): CoroutineContext

    
    public interface Key

    
    public interface Element : CoroutineContext {
        
        public val key: Key<*>

        public override operator fun  get(key: Key): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun  fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

CoroutineContext是element的set和map的混合体,没有重复类型的element对象。集合中的每个element都有唯一的Key,Key可以用来检索元素。

Key 接口
public interface CoroutineContext {
    
    public interface Key
    
    public interface Element : CoroutineContext {
        
        public val key: Key<*>
    }
}

Key 是一个接口定义在 CoroutineContext中的接口,作为接口它没有声明任何的方法,它只是用来检索。同时它的泛型类型是Element 接口。具体看下,在协程中如何使用Key 接口

// Job 
public interface Job : CoroutineContext.Element {
    public companion object Key : CoroutineContext.Key {
        //省略...
    }
}

// CoroutineDispatcher
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
      public companion object Key : AbstractCoroutineContextKey(
        ContinuationInterceptor,
        { it as? CoroutineDispatcher })
}

// CoroutineExceptionHandler
public interface CoroutineExceptionHandler : CoroutineContext.Element {
    public companion object Key : CoroutineContext.Key
}

// ContinuationInterceptor
public interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key
}

// CoroutineName
public data class CoroutineName(
    val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
    public companion object Key : CoroutineContext.Key
}

从上面的源码来看,发现 Element 的子类都必须重写 Key 这个属性,而且Key 的泛型类型必须和类名相同。以 CoroutineName 例,Key 是一个伴生对象,同时Key 的泛型类型也是 CoroutineName。

get() 方法
public interface CoroutineContext {
    
    public operator fun  get(key: Key): E?
    
    public interface Element : CoroutineContext {

        public override operator fun  get(key: Key): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null
    }
}

// CombinedContext
internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {

    override fun  get(key: Key): E? {
        var cur = this
        while (true) {
            cur.element[key]?.let { return it }
            val next = cur.left
            if (next is CombinedContext) {
                cur = next
            } else {
                return next[key]
            }
        }
    }
}

// EmptyCoroutineContext
public object EmptyCoroutineContext : CoroutineContext, Serializable {
    public override fun  get(key: Key): E? = null
}

Element 接口中的get方法:只要 Key 与当前 Element 的 Key 匹配上了,返回该 Element 否则返回 null。

CombinedContext get 方法: 遍历链表,查询与 Key 相等的 Element,如果没找到返回 null。

使用方式:

val coroutineContext1 = Job() + CoroutineName("这是第一个上下文")
// get 方法使用
println("--->${coroutineContext1[CoroutineName]}")
println("--->${coroutineContext1[Job]}")
plus() 方法
public interface CoroutineContext {
       public operator fun plus(context: CoroutineContext): CoroutineContext =
        //如果要相加的CoroutineContext为空,那么不做任何处理,直接返回
        if (context === EmptyCoroutineContext) this else
           //如果要相加的CoroutineContext不为空,那么对它进行fold操作
            context.fold(this) { acc, element -> // 注释1
                // 注释2
                val removed = acc.minusKey(element.key)
                // 注释3
                if (removed === EmptyCoroutineContext) element else {
                  //注释4
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                       
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
}

注释1

我们可以把acc理解成+号左边的CoroutineContext,element理解成+号右边的CoroutineContext的某一个element

注释2

从左边CoroutineContext中删除右边的这个element

注释3

如果removed为空,说明左边CoroutineContext删除了和element相同的元素后为空,那么返回右边的element即可

注释4

如果removed不为空,说明左边CoroutineContext删除了和element相同的元素后还有其他元素,那么构造一个新的CombinedContext返回

plus方法 有个关键字operator表示这是一个运算符重载的方法,可以通过+号来返回一个包含原始集合和第二个操作数中的元素的结果。「加号运算符」把两个CoroutineContext相加后,返回一个CombinedContext,在组合成CombinedContext时,+号右边的CoroutineContext中的元素会「覆盖」+号左边的CoroutineContext中的含有相同key的元素。如下:

(Dispatchers.Main, "name") + (Dispatchers.IO) = (Dispatchers.IO, "name")

这个覆盖操作就在fold方法的参数「operation代码块」中完成,通过「minusKey」方法删除掉重复元素。

该方法源码中ContinuationInterceptor处理的目的是让ContinuationInterceptor在每次相加后都能变成CoroutineContext中的「最后」一个元素。这样协程在查找上下文的element时,总能最快找到拦截器,避免了递归查找,从而让拦截行为前置执行。
使用方式:

val coroutineContext1 = Job() + CoroutineName("这是第一个上下文")
    println("--->${coroutineContext1}")
    val  coroutineContext2 = coroutineContext1 + Dispatchers.Default + CoroutineName("这是第二个上下文")
    println("--->${coroutineContext2}")
    val coroutineContext3 = coroutineContext2 + Dispatchers.Main + CoroutineName("这是第三个上下文")
    println("--->${coroutineContext3}")

执行结果:
--->[JobImpl{Active}@51081592, CoroutineName(这是第一个上下文)]
--->[JobImpl{Active}@51081592, CoroutineName(这是第二个上下文), Dispatchers.Default]
--->[JobImpl{Active}@51081592, CoroutineName(这是第三个上下文), Dispatchers.Main[missing, cause=java.lang.RuntimeException: Stub!]]

将两个 CoroutineContext 组合成一个 CoroutineContext,如果是两个类型相同的 Element 会返回一个新的 Element。如果是两个不同类型的Element会返回一个CombinedContext。如果是多个不同类型的 Element 会返回一条 CombinedContext 链表。

CoroutineContext通过这种方式来添加元素的好处是,添加元素后,生成的CombinedContext,它也继承自CoroutineContext,从而在使用协程构造器函数,例如launch时,可以传入单个的CoroutineContext,也可以通过「+」来传入多个CoroutineContext的组合。

CombinedContext 分析

CombinedContext 是 CoroutineContext 的子类,left 也是 CoroutineContext 类型的,所以它的数据结构是链表。

get() 方法的分析
internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
    
    override fun  get(key: Key): E? {
        var cur = this
        while (true) {
            // 注释 1
            cur.element[key]?.let { return it }
            val next = cur.left
            if (next is CombinedContext) {// 注释 2
                cur = next
            } else {// 注释 3
                return next[key]
            }
        }
    }
}

注释1:

先看element是否是匹配,如果匹配,那么element就是需要找的元素,返回element,否则说明要找的元素在left中,继续从left开始找,根据left是CombinedContext还是Element转到2或3

注释 2

如果left又是一个CombinedContext,那么重复1

注释 3

如果left是Element,那么调用它的get方法返回

fold() 方法的分析
internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
    
    public override fun  fold(initial: R, operation: (R, Element) -> R): R =
        operation(left.fold(initial, operation), element)
}

从left到right到访问所有的element,即会从left到right的把每一个element传入operation方法中,作为operation方法的第二个参数,而operation方法第一个参数acc的初始值为fold方法传入的initial值,然后它会不断的更新,每次更新的值为上一次调用operation方法的返回值。

minusKey() 方法的分析
internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
    
   public override fun minusKey(key: Key<*>): CoroutineContext {
        // 注释1
        element[key]?.let { return left }
        val newLeft = left.minusKey(key)
        return when {
            newLeft === left -> this // 注释2
            newLeft === EmptyCoroutineContext -> element// 注释3
            else -> CombinedContext(newLeft, element)// 注释4
        }
    }
}

注释1:

先看element是否是匹配,如果匹配,那么element就是需要删除的元素,返回left,否则说明要删除的元素在left中,继续从left中删除对应的元素,根据left是否删除了要删除的元素转到2或3或4

注释 2

如果left中不存在要删除的元素,那么当前CombinedContext就不存在要删除的元素,直接返回当前CombinedContext实例就行

注释 3

如果left中存在要删除的元素,删除了这个元素后,left变为了空,那么直接返回当前CombinedContext的element就行

注释 4

如果left中存在要删除的元素,删除了这个元素后,left不为空,那么组合一个新的CombinedContext返回

可以发现CombinedContext中的get、fold、minusKey操作都是递归形式的操作,递归的终点就是当这个left是一个Element。

结合Element 接口分析
public interface Element : CoroutineContext {

    public val key: Key<*>

    //如果key和自己的key匹配,那么自己就是要找的Element,返回自己,否则返回null
    public override operator fun  get(key: Key): E? =
    if (this.key == key) this as E else null

    //对传入的initial和自己做operation操作
    public override fun  fold(initial: R, operation: (R, Element) -> R): R =
    operation(initial, this)

    //如果key和自己的key匹配,那么自己就是要删除的Element,返回EmptyCoroutineContext(表示删除了自己),否则说明自己不需要被删除,返回自己
    public override fun minusKey(key: Key<*>): CoroutineContext =
    if (this.key == key) EmptyCoroutineContext else this
}

从CombinedContext和Element结合来看,CombinedContext的整体结构如下:

有点像是一个链表,left就是指向下一个结点的指针,有了这个图我们再从整体看当调用CombinedContext的get、fold、minusKey操作时的访问顺序:get、minusKey操作大体逻辑都是先访问当前element,不满足,再访问left的element,顺序都是从right到left,而fold的操作大体逻辑是先访问left,直到递归到最后的element,然后再从left到right的返回,从而访问了所有的element。

CombinedContext 子类分析 Job

通过CoroutineScope的扩展方法launch启动一个协程后,它会返回一个Job对象,它是协程的唯一标识,控制协程的生命周期。

public interface Job : CoroutineContext.Element {
    public companion object Key : CoroutineContext.Key {
        init {
            CoroutineExceptionHandler
        }
    }
    
    public val isActive: Boolean
    
    public val isCompleted: Boolean
    
    public val isCancelled: Boolean
    
    public fun getCancellationException(): CancellationException
    
    public fun start(): Boolean
    
    public fun cancel(cause: CancellationException? = null)
    
    public fun cancel(): Unit = cancel(null)
    
    public fun cancel(cause: Throwable? = null): Boolean
    
    public val children: Sequence
    
    public fun attachChild(child: ChildJob): ChildHandle
    
    public suspend fun join()
    
    ...
}

协程的生命周期共有6个状态,分别为New、Active、Completing、Completed、cancelling、cancelled,其中New、Cancelling、Completing状态,外部是无法感知这三个状态的。Job只提供了isActive、isCancelled、isCompleted属性来供外部判断协程是否处于Active、Cancelled、Completed状态。

当协程处于Active状态时,isActive为true,isCancelled和isCompleted为false当协程处于Cancelled状态时,isCancelled和isCompleted为true,isActive为false当协程处于Completed状态时,isCompleted为true,isActive和isCancelled为false

协程有一个父子的概念,例如启动一个协程1,在协程中继续启动协程2、协程3;那么协程1就是协程2、协程3的父协程,协程2、协程3就是协程1的子协程,每个协程都会有一个对应的Job。协程之间的父子关系是通过Job对象维持的,如下图所示:


通常启动协程时没有特意地通过CoroutineContext指定一个Job,那么使用launch/async方法启动协程时返回的Job它会产生异常传播。

异常传播:Job因为除了CancellationException以外的异常而失败时,那么父Job就会感知到并抛出异常,在抛出异常之前,父Job会取消所有子Job的运行。

抑制这种异常传播的行为需要调用SupervisorJob方法去处理:

public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)

SupervisorJob方法会返回CompletableJob类型,CompletableJob是Job的一个子接口。

public interface CompletableJob : Job {
    public fun complete(): Boolean
    
    public fun completeExceptionally(exception: Throwable): Boolean
}

CompletableJob比Job接口多了一个complete方法,这意味着它可以调用complete方法让协程任务进入完成状态。说明这个Job它不会产生异常传播,每个Job可以单独被管理,当SupervisorJob因为除了CancellationException以外的异常而失败时,并不会影响到父Job和其他子Job。 SupervisorJob应用如下:

fun main(){
     val parentJob = GlobalScope.launch {
        //childJob是一个SupervisorJob
        val childJob = launch(SupervisorJob()){
            println("childJob complete")
            throw NullPointerException()
        }
        childJob.join()
        println("parent complete")
    }
    Thread.sleep(1000)
}

执行结果:
childJob complete
Exception in thread "DefaultDispatcher-worker-2" java.lang.NullPointerException
...
parent complete

childJob抛出异常并不会影响parentJob的运行,parentJob会继续运行并输出parent complete。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/736468.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号