没有合适的资源?快使用搜索试试~ 我知道了~
首页Kotlin协程分析(一)——协程的创建过程和执行过程.pdf
Kotlin协程分析(一)——协程的创建过程和执行过程.pdf
需积分: 0 19 下载量 7 浏览量
更新于2023-03-16
评论 1
收藏 521KB PDF 举报
Kotlin协程分析(一)——协程的创建过程和执行过程,资料参考Kotlin v1.4.0 源码及stdlib
资源详情
资源评论
资源推荐
/**
 * Creates a coroutine for this suspending function without starting it.
 *
 * The unintercepted coroutine is built first, then passed through `intercepted()`,
 * and finally wrapped in a [SafeContinuation] whose initial result is
 * `COROUTINE_SUSPENDED`.
 *
 * @param completion continuation handed to `createCoroutineUnintercepted`.
 * @return the [SafeContinuation] wrapper around the intercepted coroutine.
 */
public fun <T> (suspend () -> T).createCoroutine(completion: Continuation<T>): Continuation<Unit> {
    val interceptedCoroutine = createCoroutineUnintercepted(completion).intercepted()
    return SafeContinuation(interceptedCoroutine, COROUTINE_SUSPENDED)
}
/**
 * Continuation wrapper that forwards to [delegate] while recording, in the atomically
 * updated [result] field, whether a resumption arrived before or after [getOrThrow] ran.
 *
 * The field starts as the `initialResult` passed to the constructor and is afterwards
 * moved between the markers UNDECIDED, COROUTINE_SUSPENDED and RESUMED, or replaced by
 * the value carried by the [Result] given to [resumeWith].
 */
internal actual class SafeContinuation<in T>
internal actual constructor(
private val delegate: Continuation<T>,
initialResult: Any?
) : Continuation<T>, CoroutineStackFrame {
// Secondary constructor: same as the primary one with UNDECIDED as the initial result.
@PublishedApi
internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)
// The context is always taken from the wrapped continuation.
public actual override val context: CoroutineContext
get() = delegate.context
// Current state marker or stored result; written only through RESULT.compareAndSet below.
@Volatile
private var result: Any? = initialResult
private companion object {
// Field updater used for compare-and-set operations on the "result" field.
@Suppress("UNCHECKED_CAST")
@JvmStatic
private val RESULT = AtomicReferenceFieldUpdater.newUpdater<SafeContinuation<*>, Any?>(
SafeContinuation::class.java, Any::class.java as Class<Any?>, "result"
)
}
/**
 * Records or forwards a resumption, depending on the current state:
 * - UNDECIDED: stores `result.value` in the field and returns without calling [delegate];
 * - COROUTINE_SUSPENDED: switches the field to RESUMED and forwards [result] to [delegate];
 * - anything else: throws [IllegalStateException] ("Already resumed").
 *
 * When a compareAndSet fails the loop re-reads the field and tries again.
 */
public actual override fun resumeWith(result: Result<T>) {
while (true) { // lock-free loop
val cur = this.result // atomic read
when {
cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
delegate.resumeWith(result)
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
/**
 * Returns the stored outcome, or COROUTINE_SUSPENDED when none is available yet.
 *
 * If the field is still UNDECIDED it is switched to COROUTINE_SUSPENDED and that marker is
 * returned. Otherwise: RESUMED is reported as COROUTINE_SUSPENDED, a stored
 * `Result.Failure` has its exception thrown, and any other stored value is returned as is.
 */
@PublishedApi
internal actual fun getOrThrow(): Any? {
var result = this.result // atomic read
if (result === UNDECIDED) {
if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) return COROUTINE_SUSPENDED
result = this.result // reread volatile var
}
return when {
result === RESUMED -> COROUTINE_SUSPENDED // already called continuation, indicate COROUTINE_SUSPENDED upstream
result is Result.Failure -> throw result.exception
else -> result // either COROUTINE_SUSPENDED or data
}
}
// --- CoroutineStackFrame implementation
// The caller frame is the delegate when it is itself a CoroutineStackFrame, otherwise null.
public override val callerFrame: CoroutineStackFrame?
get() = delegate as? CoroutineStackFrame
// This wrapper contributes no stack trace element of its own.
override fun getStackTraceElement(): StackTraceElement? =
null
override fun toString(): String =
"SafeContinuation for $delegate"
}
/**
 * Creates the coroutine for this suspending function without applying interception.
 *
 * @param completion continuation to resume on completion; in the walkthrough sample this
 *   is the anonymous `Continuation` object created by the outermost caller.
 * @return the result of `create(...)` when the receiver is a [BaseContinuationImpl],
 *   otherwise the continuation built by `createCoroutineFromSuspendFunction`.
 */
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    // Appears to be a debugging hook; for this walkthrough treat it as returning `completion`.
    val probed = probeCoroutineCreated(completion)
    // NOTE(review): the original annotation assumed this check is false for a suspend lambda.
    // Compiler-generated suspend lambdas are expected to extend BaseContinuationImpl (through
    // SuspendLambda), in which case the first branch is taken -- confirm against the bytecode.
    return when (this) {
        is BaseContinuationImpl -> create(probed)
        else -> createCoroutineFromSuspendFunction(probed) { continuation ->
            (this as Function1<Continuation<T>, Any?>).invoke(continuation)
        }
    }
}
/**
 * Builds a continuation object whose `invokeSuspend` runs [block] on first resumption.
 *
 * A [RestrictedContinuationImpl] is created when the completion's context is
 * `EmptyCoroutineContext`; otherwise a [ContinuationImpl] carrying that context is created.
 * Both variants share the same three-step `label` protocol:
 * - 0: not started yet; rethrows a failed start result, then runs [block];
 * - 1: [block] was started; returns (or rethrows) the result it was resumed with;
 * - anything else: fails with "This coroutine had already completed".
 *
 * @param completion continuation stored as the completion of the created object.
 * @param block code to run, receiving the created continuation.
 */
private inline fun <T> createCoroutineFromSuspendFunction(
    completion: Continuation<T>,
    crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
    // Context supplied by the caller; in the walkthrough sample it is EmptyCoroutineContext.
    val completionContext = completion.context
    // label == 0 when coroutine is not started yet (initially) or label == 1 when it was
    return when {
        completionContext === EmptyCoroutineContext ->
            object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
                private var label = 0

                override fun invokeSuspend(result: Result<Any?>): Any? {
                    if (label == 0) {
                        label = 1
                        // Rethrow exception if trying to start with exception
                        // (will be caught by BaseContinuationImpl.resumeWith)
                        result.getOrThrow()
                        return block(this) // run the block, may return or suspend
                    }
                    if (label == 1) {
                        label = 2
                        return result.getOrThrow() // this is the result if the block had suspended
                    }
                    error("This coroutine had already completed")
                }
            }
        else ->
            object : ContinuationImpl(completion as Continuation<Any?>, completionContext) {
                private var label = 0

                override fun invokeSuspend(result: Result<Any?>): Any? {
                    if (label == 0) {
                        label = 1
                        // Rethrow exception if trying to start with exception
                        // (will be caught by BaseContinuationImpl.resumeWith)
                        result.getOrThrow()
                        return block(this) // run the block, may return or suspend
                    }
                    if (label == 1) {
                        label = 2
                        return result.getOrThrow() // this is the result if the block had suspended
                    }
                    error("This coroutine had already completed")
                }
            }
    }
}
SourceCode
// Walkthrough sample: build a coroutine from a suspend lambda and start it by hand.
val suspendFun = suspend {
    println("作些什么")
    "Hello"
}

// The anonymous Continuation is the outermost completion; it receives the final outcome.
val coroutine = suspendFun.createCoroutine(object : Continuation<String> {
    override val context: CoroutineContext = EmptyCoroutineContext

    override fun resumeWith(result: Result<String>) {
        // getOrNull() would report a failed coroutine as "Resume:null" and hide the exception,
        // so success and failure are handled separately. Success output is unchanged.
        result.fold(
            onSuccess = { value -> println("Resume:$value") },
            onFailure = { cause -> println("Resume failed: $cause") }
        )
    }
})

// createCoroutine only creates the coroutine; resuming it with Unit starts execution.
coroutine.resume(Unit)
/**
 * Base continuation whose context is always `EmptyCoroutineContext`.
 *
 * Construction fails with [IllegalArgumentException] when a non-null completion is
 * supplied whose context is not `EmptyCoroutineContext`.
 */
internal abstract class RestrictedContinuationImpl(
    completion: Continuation<Any?>?
) : BaseContinuationImpl(completion) {
    init {
        // A null completion is accepted without any check.
        if (completion != null) {
            require(completion.context === EmptyCoroutineContext) {
                "Coroutines with restricted suspension must have EmptyCoroutineContext"
            }
        }
    }

    public override val context: CoroutineContext
        get() = EmptyCoroutineContext
}
/**
 * Common base of the continuation implementations in this file.
 *
 * Its final [resumeWith] runs [invokeSuspend] and then passes the outcome to [completion],
 * walking a chain of [BaseContinuationImpl] completions in a loop instead of recursing.
 */
@SinceKotlin("1.3")
internal abstract class BaseContinuationImpl(
// This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
// it has a public getter (since even untrusted code is allowed to inspect its call stack).
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
/**
 * Resumes this continuation with [result].
 *
 * Each iteration calls `invokeSuspend` on the current continuation. If it returns
 * COROUTINE_SUSPENDED the method returns immediately. Otherwise the value, or any
 * thrown exception wrapped as a failure, becomes the outcome given to the completion:
 * the loop continues with it when it is a [BaseContinuationImpl], otherwise its
 * `resumeWith` is called and the method returns.
 */
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result // in the walkthrough sample, result = Result.success(Unit)
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current) // debug-related as well, so it can be ignored in this walkthrough
with(current) { // in the walkthrough sample, completion is the outermost anonymous class we defined
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted () // this state machine instance is terminating
if (completion is BaseContinuationImpl) { // following the walkthrough's earlier reasoning, this is false
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome) // calls back into the outermost anonymous class we created ourselves
return
}
}
}
}
// One step of the state machine; returning COROUTINE_SUSPENDED makes resumeWith return early.
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
protected open fun releaseIntercepted() {
// does nothing here, overridden in ContinuationImpl
}
// Default implementation always throws; subclasses that support creation override it.
public open fun create(completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
// Default implementation always throws; subclasses that support creation override it.
public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
}
// Uses the stack trace element when available, otherwise the class name.
public override fun toString(): String =
"Continuation at ${getStackTraceElement() ?: this::class.java.name}"
// --- CoroutineStackFrame implementation
// The caller frame is the completion when it is itself a CoroutineStackFrame, otherwise null.
public override val callerFrame: CoroutineStackFrame?
get() = completion as? CoroutineStackFrame
public override fun getStackTraceElement(): StackTraceElement? =
getStackTraceElementImpl()
}
/**
 * Returns the intercepted form of this continuation.
 *
 * Only a [ContinuationImpl] receiver is asked for its `intercepted()` result; every other
 * continuation is returned unchanged.
 */
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> {
    val impl = this as? ContinuationImpl
    return impl?.intercepted() ?: this
}
/**
 * Continuation base class that carries a coroutine context and caches the continuation
 * produced by the context's `ContinuationInterceptor`, if there is one.
 */
@SinceKotlin("1.3")
// State machines for named suspend functions extend from this class
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    /** Uses the completion's own context (null when there is no completion). */
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    /** The context given at construction; accessing it fails when that context is null. */
    public override val context: CoroutineContext
        get() = _context!!

    // Cached result of intercepted(); null until the first call.
    @Transient
    private var intercepted: Continuation<Any?>? = null

    /**
     * Returns the cached intercepted continuation, computing it on first use: the context's
     * `ContinuationInterceptor` wraps this continuation, or this continuation itself is used
     * when the context has no interceptor.
     */
    public fun intercepted(): Continuation<Any?> {
        intercepted?.let { return it }
        val resolved = context[ContinuationInterceptor]?.interceptContinuation(this) ?: this
        intercepted = resolved
        return resolved
    }

    /**
     * Hands a cached wrapper (anything other than this continuation itself) back to the
     * interceptor, then replaces the cache with `CompletedContinuation`.
     */
    protected override fun releaseIntercepted() {
        val cached = intercepted
        if (cached != null && cached !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(cached)
        }
        intercepted = CompletedContinuation // just in case
    }
}
按上文的分析,createCoroutineUnintercepted() 返回的是 RestrictedContinuationImpl;它不是 ContinuationImpl,`this as? ContinuationImpl` 的结果为 null,所以 intercepted() 直接返回自己(this)。
/**
 * Resumes this continuation with [value] wrapped as a successful [Result].
 *
 * @param value the value to deliver through `resumeWith`.
 */
public inline fun <T> Continuation<T>.resume(value: T): Unit {
    val success = Result.success(value)
    resumeWith(success)
}
调用
调用
resumeWith
/**
 * Returns the encapsulated value, or throws the encapsulated exception when this
 * [Result] holds a failure.
 */
public inline fun <T> Result<T>.getOrThrow(): T {
    // Failures are rethrown first, so the cast below only runs for a non-failure value.
    throwOnFailure()
    val payload = value
    return payload as T
}
/**
 * Throws the stored exception when this [Result] wraps a `Result.Failure`;
 * does nothing otherwise.
 */
internal fun Result<*>.throwOnFailure() {
    val failure = value as? Result.Failure ?: return
    throw failure.exception
}
此时的 value = Result.success(Unit),所以不会抛出异常
就是执行suspend函数体内的代码
真实调用
catzifeng
- 粉丝: 179
- 资源: 3
上传资源 快速赚钱
- 我的内容管理 收起
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
会员权益专享
最新资源
- stc12c5a60s2 例程
- Android通过全局变量传递数据
- c++校园超市商品信息管理系统课程设计说明书(含源代码) (2).pdf
- 建筑供配电系统相关课件.pptx
- 企业管理规章制度及管理模式.doc
- vb打开摄像头.doc
- 云计算-可信计算中认证协议改进方案.pdf
- [详细完整版]单片机编程4.ppt
- c语言常用算法.pdf
- c++经典程序代码大全.pdf
- 单片机数字时钟资料.doc
- 11项目管理前沿1.0.pptx
- 基于ssm的“魅力”繁峙宣传网站的设计与实现论文.doc
- 智慧交通综合解决方案.pptx
- 建筑防潮设计-PowerPointPresentati.pptx
- SPC统计过程控制程序.pptx
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
评论0