记录一下学习kotlin协程的过程

Mon Jun 03 2024

suspend关键字

首先写一个suspend函数

suspend fun noSuspend() = suspendCoroutine<Int> { continuation ->
    continuation.resume(10)
}

在java种调用这个函数, 发现需要传一个参数Continuation, 有一个Object类型的返回值。

        Object result = CoroutineTestKt.noSuspend(new Continuation<Integer>() {
            @NotNull
            @Override
            public CoroutineContext getContext() {
                return null;
            }
 
            @Override
            public void resumeWith(@NotNull Object o) {
 
            }
        });

在suspend函数中, 隐含了一个Continuation的对象。 所以在普通函数中不能调用suspend函数, 因为不能提供这个Continuation对象。

这个返回值可能是COROUTINE_SUSPENDED标记, 表示协程被挂起。 也可能是函数的返回值。

协程构造函数

        val continuation = suspend {
            "AAA"
        }.createCoroutine(object : Continuation<String> {
            override val context: CoroutineContext
                get() = EmptyCoroutineContext
 
            override fun resumeWith(result: Result<String>) {
                println("result $result")
            }
        })
        continuation.resume(Unit)

Debug发现,createCoroutine调用了这个kotlin stdlib的函数, 并且走到了*分支处。

疑问1:这里this是Reciver的(suspend () -> T), 然而他被认为是BaseContinuationImpl类。。

kotlin-stdlib-1.9.21-sources.jar!/jvmMain/kotlin/coroutines/intrinsics/IntrinsicsJvm.kt

public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(probeCompletion)  *
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

疑问2:单步进入create函数,调用了SuspendLambda的init方法。BaseContinuationImpl实现类在哪?

internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)
 
    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) // this is lambda
        else
            super.toString() // this is continuation
}

协程的挂起

不是所有的suspend函数都会挂起协程。 只有函数调用栈不同时, 才会挂起函数, 比如切线程, 或用事件循环(例如Android的Handler), 或者把continuation实例当成变量记住, 在将来某一时刻调用。

直接看一个例子: foo是一个suspend函数, 在suspendCoroutine的参数的block里面,在子线程里面调用了cont.resume("1")

首先构造一个协程,在suspend lambda中调用foo函数。

    @Test
    fun coroutine_constructor_test() {
        suspend fun foo() = suspendCoroutine { cont ->
            thread {
                println("thread current thread ${Thread.currentThread().name}")
                cont.resume("1")
            }
        }
 
        val continuation = suspend {
            foo()
            "result1"
        }.createCoroutine(object : Continuation<String> {
            override val context: CoroutineContext
                get() = EmptyCoroutineContext
 
            override fun resumeWith(result: Result<String>) {
                println("result $result")
            }
        })
        continuation.resume(Unit)
    }

用debug追踪: 首先这里continuation.resume(Unit)启动协程。 ①处调回到我们写的suspend lambda中。 然后调用foo函数。

foo函数的处理是在suspendCoroutine的block中, 它把当前的continuation对象又包了一层SafeContinuation,调用block后又调了getOrThrow。

在我们的例子中block中只是切了一个线程, 并没有立即调cont.resume, 所以在getOrThrow函数中,result还是初始的UNDECIDED, 所以④处result被赋值COROUTINE_SUSPENDED,并且返回COROUTINE_SUSPENDED, 这个返回值也是suspendCoroutineUninterceptedOrReturn 的参数block的返回值。

suspend函数的不是表面看着的那样,当suspendCoroutineUninterceptedOrReturn返回COROUTINE_SUSPENDED时, foo函数以下的处理并不会执行。 像是在foo函数底下塞了一个return。 然后在②处会跳出这个循环,当前函数所有的流程都走完了。

直到线程被启动, cont.resume("1")执行时, ③处判断当前result为COROUTINE_SUSPENDED, 再次调到①处的invokeSuspend。 再次回到我们写的suspend lambda中, 但是我不能理解的是, 这个调用直接调到了我们写的suspend lambdafoo函数之后的地方, 这肯定是理想的情况, 但是如何做到的, 我想不通。

benny huo 深入理解协程的状态转移 介绍了suspend lambda的真是面貌 在resumeWith->invokeSuspend过程,会管理一个flag, 并在恢复执行时根据这个flag选择恢复的位置。 这件事在代码中看不到, 可能在编译器代码里能找到吧。。。

KotlinConf 2017 - Deep Dive into Coroutines on JVM by Roman Elizarov 7:40 也提到了这件事。

kotlin-stdlib-1.9.21-sources.jar!/commonMain/kotlin/coroutines/Continuation.kt

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
    return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }
}

kotlin-stdlib-1.9.21-sources.jar!/jvmMain/kotlin/coroutines/SafeContinuationJvm.kt

    internal actual fun getOrThrow(): Any? {
        var result = this.result // atomic read
        if (result === UNDECIDED) {
            if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED ④
            result = this.result // reread volatile var
        }
        return when {
            result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
            result is Result.Failure -> throw result.exception
            else -> result // either COROUTINE_SUSPENDED or data
        }
    }

kotlin-stdlib-1.9.21-sources.jar!/jvmMain/kotlin/coroutines/SafeContinuationJvm.kt

    public actual override fun resumeWith(result: Result<T>) {
        while (true) { // lock-free loop
            val cur = this.result // atomic read
            when {
                cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
                cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
                    delegate.resumeWith(result)         ③
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

/kotlin-stdlib-1.9.21-sources.jar!/jvmMain/kotlin/coroutines/jvm/internal/ContinuationImpl.kt

    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)              ①
                        if (outcome === COROUTINE_SUSPENDED) return     ②
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

协程的拦截器

拦截器在Okhttp中很常用, 可以获取当前的请求或这结果, 做一些额外的处理, 协程的拦截器也是同样的。

KotlinConf 2017 - Deep Dive into Coroutines on JVM by Roman Elizarov 20:40 提到了在当初设计协程时时没有CoroutineContext的。为了解决协程恢复执行时, 在哪个线程执行的问题, 加了这个CoroutineContext的设计。

ContinuationImpl中有一个intercepted方法,他从CoroutineContext取得ContinuationInterceptor, 调用interceptContinuation来获取有拦截处理的Continuation

kotlin-stdlib-1.9.21-sources.jar!/jvmMain/kotlin/coroutines/jvm/internal/ContinuationImpl.kt

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

在协程创建时, SafeContinuation的代理对象就是拦截后的Continuation

kotlin-stdlib-1.9.21-sources.jar!/commonMain/kotlin/coroutines/Continuation.kt

public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

suspendCoroutine函数中, SafeContinuation的代理对象也是拦截后的Continuation。 kotlin-stdlib-1.9.21-sources.jar!/commonMain/kotlin/coroutines/Continuation.kt

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
    contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
    return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }
}

需要注意的是,不是每个suspend函数都会触发拦截, 只有挂起的情况下, 走resumeWith时才会调用delegate.resumeWith(result) 走到拦截器的地方。

    public actual override fun resumeWith(result: Result<T>) {
        while (true) { // lock-free loop
            val cur = this.result // atomic read
            when {
                cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
                cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
                    delegate.resumeWith(result) *
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }

← Back to home