上述就是android开发分享Android Dispatchers.IO线程池深入刨析的全部内容,如果对大家有所用处且需要了解更多关于Android学习教程,希望大家多多关注—计算机技术网(www.ctvol.com)!
一. dispatchers.io
1.dispatchers.io
在协程中,当需要执行io任务时,会在上下文中指定dispatchers.io来进行线程的切换调度。 而io实际上是coroutinedispatcher类型的对象,实际的值为defaultscheduler类的常量对象io,代码如下:
public actual object dispatchers { ... @jvmstatic public val io: coroutinedispatcher = defaultscheduler.io }
2.defaultscheduler类
defaultscheduler类继承自experimentalcoroutinedispatcher类,内部提供了类型为limitingdispatcher的io对象,代码如下:
// 系统配置变量 public const val io_parallelism_property_name: string = "kotlinx.coroutines.io.parallelism" ... // 表示不会阻塞的任务,纯cpu任务 internal const val task_non_blocking = 0 // 表示执行过程中可能会阻塞的任务,非纯cpu任务 internal const val task_probably_blocking = 1 ... // 默认线程池名称 internal const val default_dispatcher_name = "dispatchers.default" ... internal object defaultscheduler : experimentalcoroutinedispatcher() { // 创建名为dispatchers.io的线程池 // 最大并发数量为kotlinx.coroutines.io.parallelism指定的值,默认为64与cpu数量中的较大者 // 默认的执行的任务类型为task_probably_blocking val io: coroutinedispatcher = limitingdispatcher( this, systemprop(io_parallelism_property_name, 64.coerceatleast(available_processors)), "dispatchers.io", task_probably_blocking ) override fun close() { throw unsupportedoperationexception("$default_dispatcher_name cannot be closed") } // 可以看出io和default共用一个线程池 override fun tostring(): string = default_dispatcher_name @internalcoroutinesapi @suppress("unused") public fun todebugstring(): string = super.tostring() }
3.limitingdispatcher类
limitingdispatcher类继承自executorcoroutinedispatcher类,实现了taskcontext接口和executor接口。
limitingdispatcher类的核心是构造方法中类型为experimentalcoroutinedispatcher的dispatcher对象。
limitingdispatcher类看起来是一个标准的线程池,但实际上limitingdispatcher类只对类参数中传入的dispatcher进行包装和功能扩展。如同免费精选名字大全中的litmit一样,limitingdispatcher类主要用于对任务执行数量进行限制,代码如下:
// dispatcher参数传入了defaultscheduler对象 // parallelism表示并发执行的任务数量 // name表示线程池的免费精选名字大全 // taskmode表示任务模式,taskcontext接口中的常量 private class limitingdispatcher( private val dispatcher: experimentalcoroutinedispatcher, private val parallelism: int, private val name: string?, override val taskmode: int ) : executorcoroutinedispatcher(), taskcontext, executor { // 用于保存任务的队列 private val queue = concurrentlinkedqueue<runnable>() // 用于记录当前正在执行的任务的数量 private val inflighttasks = atomic(0) // 获取当前线程池 override val executor: executor get() = this // executor接口的实现,线程池的核心方法,通过dispatch实现 override fun execute(command: runnable) = dispatch(command, false) override fun close(): unit = error("close cannot be invoked on limitingblockingdispatcher") // coroutinedispatcher接口的实现 override fun dispatch(context: coroutinecontext, block: runnable) = dispatch(block, false) // 任务分发的核心方法 private fun dispatch(block: runnable, taildispatch: boolean) { // 获取当前要执行的任务 var tasktoschedule = block // 死循环 while (true) { // 当前执行的任务数加一,也可理解生成生成当前要执行的任务的编号 val inflight = inflighttasks.incrementandget() // 如果当前需要执行的任务数小于允许的并发执行任务数量,说明可以执行, if (inflight <= parallelism) { // 调用参数中的dispatcher对象,执行任务 dispatcher.dispatchwithcontext(tasktoschedule, this, taildispatch) // 返回,退出循环 return } // 如果达到的最大并发数的限制,则将任务加入到队列中 queue.add(tasktoschedule) // 下面的代码防止线程竞争导致任务卡在队列里不被执行,case如下: // 线程1:inflighttasks = 1 ,执行任务 // 线程2:inflighttasks = 2,当前达到了parallelism限制, // 线程1:执行结束,inflighttasks = 1 // 线程2:将任务添加到队列里,执行结束,inflighttasks = 0 // 由于未执行,因此这里当前执行的任务数先减一 // 减一后如果仍然大于等于在大并发数,则直接返回,退出循环 if (inflighttasks.decrementandget() >= parallelism) { return } // 如果减一后,发现可以执行任务,则从队首获取任务,进行下一次循环 // 如果队列为空,说明没有任务,则返回,退出循环 tasktoschedule = queue.poll() ?: return } } // coroutinedispatcher接口的实现,用于yield挂起协程时的调度处理 override fun dispatchyield(context: coroutinecontext, block: runnable) { // 也是通过dispatch方法实现,注意这里taildispatch参数为true dispatch(block, taildispatch = true) } override fun tostring(): string { return name ?: "${super.tostring()}[dispatcher = $dispatcher]" } // taskcontext接口的实现,用于在一个任务执行完进行回调 override fun aftertask() { // 从队首获取一个任务 var next = queue.poll() // 若可以获取到 if (next != null) { // 则执行任务,注意这里taildispatch参数为true dispatcher.dispatchwithcontext(next, this, true) // 返回 return } // 任务执行完毕,当前执行的任务数量减一 inflighttasks.decrementandget() // 下面的代码防止线程竞争导致任务卡在队列里不被执行,case如下: // 线程1:inflighttasks = 1 ,执行任务 // 线程2:inflighttasks = 2 // 线程1:执行结束,执行aftertask方法,发现队列为空,此时inflighttasks = 2 // 线程2:inflighttasks当前达到了parallelism限制, // 将任务加入到队列中,执行结束,inflighttasks = 1 // 线程1:inflighttasks=1,执行结束 // 从队列中取出任务,队列为空则返回 next = queue.poll() ?: return // 执行任务,注意这里taildispatch参数为true dispatch(next, true) } }
dispatcher的dispatch方法定义在experimentalcoroutinedispatcher类中。
4.experimentalcoroutinedispatcher类
experimentalcoroutinedispatcher类继承自executorcoroutinedispatcher类,代码如下:
// corepoolsize线程池核心线程数 // maxpoolsize表示线程池最大线程数 // schedulername表示内部协程调度器的免费精选名字大全 // idleworkerkeepalivens表示空闲的线程存活时间 @internalcoroutinesapi public open class experimentalcoroutinedispatcher( private val corepoolsize: int, private val maxpoolsize: int, private val idleworkerkeepalivens: long, private val schedulername: string = "coroutinescheduler" ) : executorcoroutinedispatcher() { // 我们在defaultscheduler类中就是通过默认的构造方法, // 创建的父类experimentalcoroutinedispatcher对象 public constructor( corepoolsize: int = core_pool_size, maxpoolsize: int = max_pool_size, schedulername: string = default_scheduler_name ) : this(corepoolsize, maxpoolsize, idle_worker_keep_alive_ns, schedulername) ... // 创建coroutinescheduler对象 private var coroutinescheduler = createscheduler() // 核心的分发方法 override fun dispatch(context: coroutinecontext, block: runnable): unit = try { // 调用coroutinescheduler对象的dispatch方法 coroutinescheduler.dispatch(block) } catch (e: rejectedexecutionexception) { // 只有当coroutinescheduler正在关闭时,才会拒绝执行,抛出异常 defaultexecutor.dispatch(context, block) } ... private fun createscheduler() = coroutinescheduler(corepoolsize, maxpoolsize, idleworkerkeepalivens, schedulername) ... } // 核心线程数 @jvmfield internal val core_pool_size = systemprop( "kotlinx.coroutines.scheduler.core.pool.size", available_processors.coerceatleast(2), // !!! at least two here minvalue = coroutinescheduler.min_supported_pool_size ) // 最大线程数 @jvmfield internal val max_pool_size = systemprop( "kotlinx.coroutines.scheduler.max.pool.size", (available_processors * 128).coercein( core_pool_size, coroutinescheduler.max_supported_pool_size ), maxvalue = coroutinescheduler.max_supported_pool_size ) // 空闲线程的存活时间 @jvmfield internal val idle_worker_keep_alive_ns = timeunit.seconds.tonanos( systemprop("kotlinx.coroutines.scheduler.keep.alive.sec", 60l) )
在experimentalcoroutinedispatcher类的dispatch方法内部,通过调用类型为coroutinescheduler的对象的dispatch方法实现。
二.coroutinescheduler类
1.coroutinescheduler类的继承关系
在对coroutinescheduler类的dispatch方法分析之前,首先分析一下coroutinescheduler类的继承关系,代码如下:
// 实现了executor和closeable接口 // corepoolsize线程池核心线程数 // maxpoolsize表示线程池最大线程数 // schedulername表示内部协程调度器的免费精选名字大全 // idleworkerkeepalivens表示空闲的线程存活时间 internal class coroutinescheduler( @jvmfield val corepoolsize: int, @jvmfield val maxpoolsize: int, @jvmfield val idleworkerkeepalivens: long = idle_worker_keep_alive_ns, @jvmfield val schedulername: string = default_scheduler_name ) : executor, closeable { init { // 核心线程数量必须大于等于min_supported_pool_size require(corepoolsize >= min_supported_pool_size) { "core pool size $corepoolsize should be at least $min_supported_pool_size" } // 最大线程数量必须大于等于核心线程数量 require(maxpoolsize >= corepoolsize) { "max pool size $maxpoolsize should be greater than or equals to core pool size $corepoolsize" } // 最大线程数量必须小于等于max_supported_pool_size require(maxpoolsize <= max_supported_pool_size) { "max pool size $maxpoolsize should not exceed maximal supported number of threads $max_supported_pool_size" } // 空闲的线程存活时间必须大于0 require(idleworkerkeepalivens > 0) { "idle worker keep alive time $idleworkerkeepalivens must be positive" } } ... // executor接口中的实现,通过dispatch方法实现 override fun execute(command: runnable) = dispatch(command) // closeable接口中的实现,通过shutdown方法实现 override fun close() = shutdown(10_000l) ... }
2.coroutinescheduler类的全局变量
接下来对coroutinescheduler类中重要的全局变量进行分析,代码如下:
// 用于存储全局的纯cpu(不阻塞)任务 @jvmfield val globalcpuqueue = globalqueue() // 用于存储全局的执行非纯cpu(可能阻塞)任务 @jvmfield val globalblockingqueue = globalqueue() ... // 用于记录当前处于parked状态(一段时间后自动终止)的线程的数量 private val parkedworkersstack = atomic(0l) ... // 用于保存当前线程池中的线程 // workers[0]永远为null,作为哨兵位 // index从1到maxpoolsize为有效线程 @jvmfield val workers = atomicreferencearray<worker?>(maxpoolsize + 1) ... // 控制状态 private val controlstate = atomic(corepoolsize.tolong() shl cpu_permits_shift) // 表示已经创建的线程的数量 private val createdworkers: int inline get() = (controlstate.value and created_mask).toint() // 表示可以获取的cpu令牌数量,初始值为线程池核心线程数量 private val availablecpupermits: int inline get() = availablecpupermits(controlstate.value) // 获取指定的状态的已经创建的线程的数量 private inline fun createdworkers(state: long): int = (state and created_mask).toint() // 获取指定的状态的执行阻塞任务的数量 private inline fun blockingtasks(state: long): int = (state and blocking_mask shr blocking_shift).toint() // 获取指定的状态的cpu令牌数量 public inline fun availablecpupermits(state: long): int = (state and cpu_permits_mask shr cpu_permits_shift).toint() // 当前已经创建的线程数量加1 private inline fun incrementcreatedworkers(): int = createdworkers(controlstate.incrementandget()) // 当前已经创建的线程数量减1 private inline fun decrementcreatedworkers(): int = createdworkers(controlstate.getanddecrement()) // 当前执行阻塞任务的线程数量加1 private inline fun incrementblockingtasks() = controlstate.addandget(1l shl blocking_shift) // 当前执行阻塞任务的线程数量减1 private inline fun decrementblockingtasks() { controlstate.addandget(-(1l shl blocking_shift)) } // 尝试获取cpu令牌 private inline fun tryacquirecpupermit(): boolean = controlstate.loop { state -> val available = availablecpupermits(state) if (available == 0) return false val update = state - (1l shl cpu_permits_shift) if (controlstate.compareandset(state, update)) return true } // 释放cpu令牌 private inline fun releasecpupermit() = controlstate.addandget(1l shl cpu_permits_shift) // 表示当前线程池是否关闭 private val _isterminated = atomic(false) val isterminated: boolean get() = _isterminated.value companion object { // 用于标记一个线程是否在parkedworkersstack中(处于parked状态) @jvmfield val not_in_stack = symbol("not_in_stack") // 线程的三个状态 // claimed表示线程可以执行任务 // parked表示线程暂停执行任务,一段时间后会自动进入终止状态 // terminated表示线程处于终止状态 private const val parked = -1 private const val claimed = 0 private const val terminated = 1 // 以下五个常量为掩码 private const val blocking_shift = 21 // 2x1024x1024 // 1-21位 private const val created_mask: long = (1l shl blocking_shift) - 1 // 22-42位 private const val blocking_mask: long = created_mask shl blocking_shift // 42 private const val cpu_permits_shift = blocking_shift * 2 // 43-63位 private const val cpu_permits_mask = created_mask shl cpu_permits_shift // 以下两个常量用于require中参数判断 internal const val min_supported_pool_size = 1 // 2x1024x1024-2 internal const val max_supported_pool_size = (1 shl blocking_shift) - 2 // parkedworkersstack的掩码 private const val parked_index_mask = created_mask // inv表示01反转 private const val parked_version_mask = created_mask.inv() private const val parked_version_inc = 1l shl blocking_shift }
coroutinescheduler类中对线程的状态与权限控制:
availablecpupermits的初始值为参数中核心线程数corepoolsize的值,表示coroutinescheduler类中最多只有corepoolsize个核心线程。执行纯cpu任务的线程每次执行任务之前需要在availablecpupermits中进行记录与申请。blockingtasks表示执行非纯cpu任务的数量。这部分线程在执行时不需要cpu令牌。createdworkers表示当前线程池中所有线程的数量,每个线程在创建或终止时都需要通过在这里进行记录。这些变量的具体关系如下:
createdworkers = blockingtasks + corepoolsize – availablecpupermits
cpu令牌是线程池自定义的概念,不代表时间片,只是为了保证核心线程的数量。
三.worker类与workerstate类
在分析coroutinescheduler类的dispatch方法之前,还需要分析一下coroutinescheduler类中的两个重要的内部类worker类以及其对应的状态类workerstate类。
worker是一个线程池中任务的核心执行者,几乎在所有的线程池中都存在worker的概念。
1.workerstate类
首先分析一下workerstate类,代码如下:
// 一个枚举类,表示worker的状态 enum class workerstate { // 拥有了cpu令牌,可以执行纯cpu任务,也可以执行非纯cpu任务 cpu_acquired, // 可以执行非纯cpu任务 blocking, // 当前已经暂停,一段时间后将终止,也有可能被再次使用 parking, // 休眠状态,用于初始状态,只能执行自己本地任务 dormant, // 终止状态,将不再被使用 terminated }
2.worker类的继承关系与全局变量
接下来对worker类的继承关系以及其中重要的全局变量进行分析,代码如下:
// 继承自thread类 // 私有化无参的构造方法 internal inner class worker private constructor() : thread() { init { // 标记为守护线程 isdaemon = true } // 当前线程在存储线程池线程的数组workers中的索引位置 @volatile var indexinarray = 0 set(index) { // 设置线程名 name = "$schedulername-worker-${if (index == 0) "terminated" else index.tostring()}" field = index } // 构造方法 constructor(index: int) : this() { indexinarray = index } // 获取当前线程的调度器 inline val scheduler get() = this@coroutinescheduler // 线程存储任务的本地队列 @jvmfield val localqueue: workqueue = workqueue() // 线程的状态 (内部转换) @jvmfield var state = workerstate.dormant // 线程的控制状态(外部赋予) val workerctl = atomic(claimed) // 终止截止时间,表示处于parking状态的线程,在terminationdeadline毫秒后终止 private var terminationdeadline = 0l // 表示当线程处于parking状态,进入parkedworkersstack后, // 下一个处于parking状态并进入parkedworkersstack的线程的引用 @volatile var nextparkedworker: any? = not_in_stack // 偷取其他线程的本地队列的任务的冷却时间,后面会解释 private var mindelayuntilstealabletaskns = 0l // 生成随机数,配合算法,用于任务寻找 private var rngstate = random.nextint() ... // 表示当前线程的本地队列是否有任务 @jvmfield var mayhavelocaltasks = false ... }
3.worker类的run方法
接下来分析worker类的核心方法——run方法的实现,代码入下:
override fun run() = runworker() private fun runworker() { // 用于配合mindelayuntilstealabletaskns自旋 var rescanned = false // 线程池未关闭,线程没有终止,则循环 while (!isterminated && state != workerstate.terminated) { // 寻找并获取任务 val task = findtask(mayhavelocaltasks) // 如果找到了任务 if (task != null) { // 重制两个变量 rescanned = false mindelayuntilstealabletaskns = 0l // 执行任务 executetask(task) // 继续循环 continue } else { // 如果没有找到任务,说明本地队列肯定没有任务,因为本地队列优先查找 // 设置标志位 mayhavelocaltasks = false } // 走到这里,说明没有找到任务 // 如果偷取任务的冷却时间不为0,说明之前偷到过任务 if (mindelayuntilstealabletaskns != 0l) { // 这里通过rescanned,首次mindelayuntilstealabletaskns不为0, // 不会立刻进入parking状态,而是再次去寻找任务 // 因为当过多的线程进入parking状态,再次唤起大量的线程很难控制 if (!rescanned) { rescanned = true } else {// 再次扫描,仍然没有找到任务 // 置位 rescanned = false // 尝试释放cpu令牌,并进入workerstate.parking状态 tryreleasecpu(workerstate.parking) // 清除中断标志位 interrupted() // 阻塞mindelayuntilstealabletaskns毫秒 locksupport.parknanos(mindelayuntilstealabletaskns) // 清零 mindelayuntilstealabletaskns = 0l } // 阻塞完成后继续执行 continue } // 走到这里,说明线程可能很长时间都没有执行任务了,则对其进行暂停处理 // trypark比tryreleasecpu要严格的多,会被线程会被计入到parkedworkersstack, // 同时会修改workerctl状态 trypark() } // 退出循环 // 尝试释放cpu令牌,并进入终止状态 tryreleasecpu(workerstate.terminated) }
4.worker类的任务寻找机制
接下来分析worker线程如何寻找任务,代码如下:
// 寻找任务 fun findtask(scanlocalqueue: boolean): task? { // 尝试获取cpu令牌,如果获取到了,则调用findanytask方法,寻找任务 if (tryacquirecpupermit()) return findanytask(scanlocalqueue) // 如果没有获取到cpu令牌,只能去找非纯cpu任务了 // 如果允许扫描本地的任务队列,则优先在本地队列中寻找, // 找不到则在全局队列中寻找,从队首中获取 val task = if (scanlocalqueue) { localqueue.poll() ?: globalblockingqueue.removefirstornull() } else { globalblockingqueue.removefirstornull() } // 如果在本地队列和全局队列中都找不到,则尝试去其他线程的队列里偷一个任务 return task ?: trysteal(blockingonly = true) } // 寻找cpu任务 private fun findanytask(scanlocalqueue: boolean): task? { // 如果允许扫描本地的任务队列,则在本地队列和全局队列中随机二选一, // 找不到则在全局队列中寻找,从队首中获取 if (scanlocalqueue) { // 随机确定本地队列和全局队列的优先顺序 val globalfirst = nextint(2 * corepoolsize) == 0 // 获取任务 if (globalfirst) pollglobalqueues()?.let { return it } localqueue.poll()?.let { return it } if (!globalfirst) pollglobalqueues()?.let { return it } } else { // 只能从全局获取 pollglobalqueues()?.let { return it } } // 走到这里,说明本地队列和全局队列中都找不到 // 那么就尝试去其他线程的队列里偷一个任务 return trysteal(blockingonly = false) } // 从全局队列获取任务 private fun pollglobalqueues(): task? { // 随机获取cpu任务或者非cpu任务 if (nextint(2) == 0) { // 优先获取cpu任务 globalcpuqueue.removefirstornull()?.let { return it } return globalblockingqueue.removefirstornull() } else { // 优先获取非cpu任务 globalblockingqueue.removefirstornull()?.let { return it } return globalcpuqueue.removefirstornull() } } // 偷取其他线程的本地队列的任务 // blockingonly表示是否只偷取阻塞任务 private fun trysteal(blockingonly: boolean): task? { // 只有当前线程的本地队列为空的时候,才能偷其他线程的本地队列 assert { localqueue.size == 0 } // 获取已经存在的线程的数量 val created = createdworkers // 如果线程总数为0或1,则不偷取,直接返回 // 0:需要等待初始化 // 1:避免在单线程机器上过度偷取 if (created < 2) { return null } // 随机生成一个存在的线程索引 var currentindex = nextint(created) // 默认的偷取冷却时间 var mindelay = long.max_value // 循环遍历 repeat(created) { // 每次循环索引自增,带着下一行代码表示,从位置currentindex开始偷 ++currentindex // 如果超出了,则从头继续 if (currentindex > created) currentindex = 1 // 从数组中获取线程 val worker = workers[currentindex] // 如果线程不为空,并且不是自己 if (worker !== null && worker !== this) { assert { localqueue.size == 0 } // 根据偷取的类型进行偷取 val stealresult = if (blockingonly) { // 偷取非cpu任务到本地队列中 localqueue.trystealblockingfrom(victim = worker.localqueue) } else { // 偷取任务到本地队列中 localqueue.trystealfrom(victim = worker.localqueue) } // 如果返回值为task_stolen,说明偷到了 // 如果返回值为nothing_to_steal,说明要偷的线程的本地队列是空的 if (stealresult == task_stolen) { // 从队列的队首拿出来返回 return localqueue.poll() // 如果返回值大于零,表示偷取的冷却时间,说明没有偷到 } else if (stealresult > 0) { // 说明至少还要等待stealresult时间才能偷取这个任务 // 计算偷取冷却时间 mindelay = min(mindelay, stealresult) } } } // 设置偷取等待时间 mindelayuntilstealabletaskns = if (mindelay != long.max_value) mindelay else 0 // 返回空 return null } // 基于marsaglia xorshift rng算法 // 用于在2^32-1范围内计算偷取目标 internal fun nextint(upperbound: int): int { var r = rngstate r = r xor (r shl 13) r = r xor (r shr 17) r = r xor (r shl 5) rngstate = r val mask = upperbound - 1 // fast path for power of two bound if (mask and upperbound == 0) { return r and mask } return (r and int.max_value) % upperbound }
通过对这部分代码的分析,可以知道线程在寻找任务时,首先会尝试获取cpu令牌,成为核心线程。如果线程成为了核心线程,则随机从本地或全局的两个队列中获取一个任务,获取不到则去随机偷取一个任务。如果没有获取到cpu令牌,则优先在本地获取任务,获取不到则在全局非cpu任务队列中获取任务,获取不到则去偷取一个非cpu任务。
如果偷取的任务没有达到最小的可偷取时间,则返回需要等待的时间。如果偷取任务成功,则直接加入到本地队列中。偷取的核心过程,会在后面进行分析。
5.worker类的任务执行机制
接下来分析任务被获取到后如何被执行,代码如下:
// 执行任务 private fun executetask(task: task) { // 获取任务类型,类型为纯cpu或可能阻塞 val taskmode = task.mode // 重置线程闲置状态 idlereset(taskmode) // 任务执行前 beforetask(taskmode) // 执行任务 runsafely(task) // 任务执行后 aftertask(taskmode) } // 重置线程闲置状态 private fun idlereset(mode: int) { // 重置从parking状态到terminated状态的时间 terminationdeadline = 0l // 如果当前状态为parking,说明寻找任务时没有获取到cpu令牌 if (state == workerstate.parking) { assert { mode == task_probably_blocking } // 设置状态为blocking state = workerstate.blocking } } // 任务执行前 private fun beforetask(taskmode: int) { // 如果执行的任务为纯cpu任务,说明当前线程获取到了cpu令牌,是核心线程,直接返回 if (taskmode == task_non_blocking) return // 走到这里,说明线程执行的是非纯cpu任务, // 没有cpu令牌也可以执行,因此尝试释放cpu令牌,进入workerstate.blocking if (tryreleasecpu(workerstate.blocking)) { // 如果释放cpu令牌成功,则唤起一个线程去申请cpu令牌 signalcpuwork() } } // 执行任务 fun runsafely(task: task) { try { task.run() } catch (e: throwable) { // 异常发生时,通知当前线程的异常处理handler val thread = thread.currentthread() thread.uncaughtexceptionhandler.uncaughtexception(thread, e) } finally { untracktask() } } // 任务执行后 private fun aftertask(taskmode: int) { // 如果执行的任务为纯cpu任务,说明当前线程获取到了cpu令牌,是核心线程,直接返回 if (taskmode == task_non_blocking) return // 如果执行的是非cpu任务 // 当前执行的非cpu任务数量减一 decrementblockingtasks() // 获取当前线程状态 val currentstate = state // 如果线程当前不是终止状态 if (currentstate !== workerstate.terminated) { assert { currentstate == workerstate.blocking } // 设置为休眠状态 state = workerstate.dormant } }
四.coroutinescheduler类的dispatch方法
了解worker类的工作机制后,接下来分析coroutinescheduler类的dispatch方法,代码如下:
// block表示要执行的任务 // taskcontext表示任务执行的上下文,里面包含任务的类型,和执行完成后的回调 // taildispatch表示当前任务是否进行队列尾部调度, // 当taildispatch为true时,当前block会在当前线程的本地队列里的任务全部执行完后再执行 fun dispatch(block: runnable, taskcontext: taskcontext = nonblockingcontext, taildispatch: boolean = false) { // 上报时间,timesource相关,无需关注 tracktask() // 创建任务 val task = createtask(block, taskcontext) // 获取当前的worker,可能获取不到 val currentworker = currentworker() // 将当前的任务添加到当前线程的本地队列中 val notadded = currentworker.submittolocalqueue(task, taildispatch) // 不为空,说明没有添加进去,说明当前的线程不是worker if (notadded != null) { // 将任务添加到全局队列中,如果添加失败了 if (!addtoglobalqueue(notadded)) { // 说明线程池正在关闭,抛出异常 throw rejectedexecutionexception("$schedulername was terminated") } } // skipunpark表示是否跳过唤起状态,取决于这下面两个参数 val skipunpark = taildispatch && currentworker != null // 如果当前类型为纯cpu任务 if (task.mode == task_non_blocking) { // 如果跳过唤醒,则直接返回 if (skipunpark) return // 唤醒一个执行纯cpu任务的线程 signalcpuwork() } else { // 唤醒一个执行非cpu任务的线程 signalblockingwork(skipunpark = skipunpark) } } // 创建任务 internal fun createtask(block: runnable, taskcontext: taskcontext): task { // 获取当前时间 val nanotime = schedulertimesource.nanotime() // 如果当前的block是task类型的 if (block is task) { // 重新设置提交时间和任务上下文 block.submissiontime = nanotime block.taskcontext = taskcontext // 返回 return block } // 封装成taskimpl,返回 return taskimpl(block, nanotime, taskcontext) } // 任务模型 // block表示执行的任务 // submissiontime表示任务提交时间 // taskcontext表示任务执行的上下文 internal class taskimpl( @jvmfield val block: runnable, submissiontime: long, taskcontext: taskcontext ) : task(submissiontime, taskcontext) { override fun run() { try { block.run() } finally { // 任务执行完毕后,会在同一个worker线程中回调aftertask方法 taskcontext.aftertask() } } override fun tostring(): string = "task[${block.classsimplename}@${block.hexaddress}, $submissiontime, $taskcontext]" } // 将任务添加到本地队列 private fun worker?.submittolocalqueue(task: task, taildispatch: boolean): task? { // 如果当前线程为空,则返回任务 if (this == null) return task // 如果线程处于终止状态,则返回任务 if (state === workerstate.terminated) return task // 如果任务为纯cpu任务,但是线程没有cpu令牌 if (task.mode == task_non_blocking && state === workerstate.blocking) { // 则返回任务 return task } // 标记本地队列有任务 mayhavelocaltasks = true // 添加到队列 return localqueue.add(task, fair = taildispatch) } // 添加到全局队列 private fun addtoglobalqueue(task: task): boolean { // 根据任务的类型,添加到全局队列的队尾 return if (task.isblocking) { globalblockingqueue.addlast(task) } else { globalcpuqueue.addlast(task) } } // 对当前线程进行强制转换,如果调度器也是当前的调度器则返回worker对象 private fun currentworker(): worker? = (thread.currentthread() as? worker)?.takeif { it.scheduler == this } // 唤起一个执行非纯cpu任务的线程 private fun signalblockingwork(skipunpark: boolean) { // 当前执行阻塞任务的线程数量加1,并获取当前的控制状态 val statesnapshot = incrementblockingtasks() // 如果跳过唤起,则返回 if (skipunpark) return // 尝试唤起,唤起成功,则返回 if (tryunpark()) return // 唤起失败,则根据当前的控制状态,尝试创建新线程,成功则返回 if (trycreateworker(statesnapshot)) return // 再次尝试唤起,防止多线程竞争情况下,上面的tryunpark方法正好卡在线程释放cpu令牌与进入parking状态之间 // 因为线程先释放cpu令牌,后进入parking状态 tryunpark() } // 唤起一个执行纯cpu任务的线程 internal fun signalcpuwork() { // 尝试唤起,唤起成功,则返回 if (tryunpark()) return // 唤起失败,则尝试创建新线程,成功则返回 if (trycreateworker()) return // 再次尝试唤起,防止多线程竞争情况下,上面的tryunpark方法正好卡在线程释放cpu令牌与进入parking状态之间 // 因为线程先释放cpu令牌,后进入parking状态 tryunpark() }
通过对上面的代码进行分析,可以知道coroutinescheduler类的dispatch方法,首先会对任务进行封装。正常情况下,任务都会根据类型添加到全局队列中,接着根据任务类型,随机唤起一个执行对应类型任务的线程去执行任务。
当任务执行完毕后,会回调任务中自带的aftertask方法。根据之前对limitingdispatcher的分析,可以知道,此时taildispatch参数为true,同时当前的线程也是worker线程,因此会被直接添加到线程的本地队列中,由于任务有对应的线程执行,因此跳过了唤起其他线程执行任务的阶段。这里我们可以称这个机制为尾调机制。
为什么coroutinescheduler类中要设计一个尾调机制呢?
在传统的线程池的线程充足情况下,一个任务到来时,会被分配一个线程。假设前后两个任务a与b有依赖关系,需要在执行a再执行b,这时如果两个任务同时到来,执行a任务的线程会直接执行,而执行b线程的任务可能需要被阻塞。而一旦线程阻塞会造成线程资源的浪费。而协程本质上就是多个小段程序的相互协作,因此这种场景会非常多,通过这种机制可以保证任务的执行顺序,同时减少资源浪费,而且可以最大限度的保证一个连续的任务执行在同一个线程中。
至此,dispatchers.io线程池的工作原理全部分析完毕。
五.浅谈workqueue类
1.add方法
接下来分析一些更加细节的过程。首先分析一下worker线程本地队列调用的add方法是如何添加任务的,代码如下:
// 本地队列中存储最后一次尾调的任务 private val lastscheduledtask = atomic<task?>(null) // fair表示是否公平的执行任务,fifo,默认为false fun add(task: task, fair: boolean = false): task? { // fair为true,则添加到队尾 if (fair) return addlast(task) // 如果fair为false,则从lastscheduledtask中取出上一个尾调的任务, // 并把这次的新尾调任务保存到lastscheduledtask val previous = lastscheduledtask.getandset(task) ?: return null // 如果获取上一次的尾调任务不为空,则添加到队尾 return addlast(previous) }
2.任务偷取机制
根据之前对worker类的分析,任务偷取的核心代码锁定在了workqueue类的两个方法上:一个是偷取非纯cpu任务的trystealblockingfrom方法,另一个可以偷所有类型任务的trystealfrom方法,代码如下:
internal const val buffer_capacity_base = 7 internal const val buffer_capacity = 1 shl buffer_capacity_base // 1000 0000 internal const val mask = buffer_capacity - 1 // 0111 1111 // 存储任务的数组,最多存储128 private val buffer: atomicreferencearray<task?> = atomicreferencearray(buffer_capacity) // producerindex表示上一次向任务数组中添加任务的索引 // consumerindex表示上一次消费的任务索引 // producerindex永远大于等于consumerindex // 二者差值就是当前任务数组中任务的数量 private val producerindex = atomic(0) private val consumerindex = atomic(0) // buffer中非纯cpu任务的数量(避免遍历扫描) private val blockingtasksinbuffer = atomic(0) // 偷所有类型任务 fun trystealfrom(victim: workqueue): long { assert { buffersize == 0 } // 从要偷取线程的本地队列中轮训获取一个任务 val task = victim.pollbuffer() // 如果获取到了任务 if (task != null) { // 将它添加到自己的本地队列中 val notadded = add(task) assert { notadded == null } // 返回偷取成功的标识 return task_stolen } // 如果偷取失败,尝试偷取指定线程的尾调任务 return trysteallastscheduled(victim, blockingonly = false) } // 轮训获取任务 private fun pollbuffer(): task? { // 死循环 while (true) { // 获取上一次消费的任务索引 val taillocal = consumerindex.value // 如果当前任务数组中没有多处的任务,则返回空 if (taillocal - producerindex.value == 0) return null // 计算偷取位置,防止数组过界 val index = taillocal and mask // 通过cas方式,将consumerindex加一,表示下一次要从taillocal + 1处开始偷取 if (consumerindex.compareandset(taillocal, taillocal + 1)) { // 从偷取位置初取出任务,如果偷取的任务为空,则继续循环 val value = buffer.getandset(index, null) ?: continue // 偷取成功 // 若任务为阻塞任务,blockingtasksinbuffer的值减一 value.decrementifblocking() // 返回任务 return value } } } // 偷取非纯cpu任务 fun trystealblockingfrom(victim: workqueue): long { assert { buffersize == 0 } // 从consumerindex位置开始偷 var start = victim.consumerindex.value // 偷到producerindex处截止 val end = victim.producerindex.value // 获取任务数组 val buffer = victim.buffer // 循环偷取 while (start != end) { // 计算偷取位置,防止数组过界 val index = start and mask // 如果非纯cpu任务数为0,则直接退出循环 if (victim.blockingtasksinbuffer.value == 0) break // 获取index处的任务 val value = buffer[index] // 如果任务存在,而且是非纯cpu任务,同时成功的通过cas设置为空 if (value != null && value.isblocking && buffer.compareandset(index, value, null)) { // blockingtasksinbuffer的值减一 victim.blockingtasksinbuffer.decrementandget() // 将偷取的任务添加到当前线程的本地队列中 add(value) // 返回偷取成功标识 return task_stolen } else { // 如果偷取失败,自增再次循环,从下一个位置开始偷 ++start } } // 如果从任务数组中偷取失败,尝试偷取指定线程的尾调任务 return trysteallastscheduled(victim, blockingonly = true) } // 偷取指定线程的尾调任务 private fun trysteallastscheduled(victim: workqueue, blockingonly: boolean): long { // 死循环 while (true) { // 获取指定线程的尾调任务,如果任务不存在,则返回偷取失败标识符 val lastscheduled = victim.lastscheduledtask.value ?: return nothing_to_steal // 如果要偷取的是非纯cpu任务,但是任务类型为纯cpu任务,说明只有核心线程才能偷 // 返回偷取失败标识符 if (blockingonly && !lastscheduled.isblocking) return nothing_to_steal // 获取当前时间 val time = schedulertimesource.nanotime() //计算任务从添加开始到现在经过的时长 val staleness = time - lastscheduled.submissiontime // 如果时长小于偷取冷却时间 if (staleness < work_stealing_time_resolution_ns) { // 返回当前线程需要等待的时间 return work_stealing_time_resolution_ns - staleness } // 通过cas,将lastscheduledtask设置为空,防止被其他线程执行 if (victim.lastscheduledtask.compareandset(lastscheduled, null)) { // 偷取成功,加入到当前线程的队列中 add(lastscheduled) // 返回偷取成功表示 return task_stolen } // 继续循环 continue } } // 偷取冷却时间,尾调任务从添加开始, // 最少经过work_stealing_time_resolution_ns时间才可以被偷 @jvmfield internal val work_stealing_time_resolution_ns = systemprop( "kotlinx.coroutines.scheduler.resolution.ns", 100000l )
六.总结
1.两个线程池
coroutinescheduler类是核心的线程池,用于任务的执行。limitingdispatcher类对coroutinescheduler类进行代理,是coroutinescheduler类尾调机制的使用者,对任务进行初步排队。
2.四种队列
limitingdispatcher类中的任务队列。coroutinescheduler类中的两个全局队列。worker类中的本地队列。
3.尾调机制
一个任务执行完,可以通过回调,在同一个worker线程中再存储一个待执行任务,该任务将在worker线程本地队列目前已存在的任务,执行完毕后再执行。
4.任务分类与权限控制
所有任务分成纯cpu任务和非纯cpu任务两种,对应着核心线程和非核心线程。
所有线程在执行前都先尝试成为核心线程,核心线程可以从两种任务中任意选择执行,非核心线程只能执行非纯cpu任务。核心线程如果选择执行非纯cpu任务会变成非核心线程
5.任务偷取机制
workqueue类根据随机算法提供任务偷取机制,一个worker线程可以从其他worker线程的本地队列中偷取任务。
6.执行梳理图
到此这篇关于android dispatchers.io线程池深入刨析的文章就介绍到这了,更多相关android dispatchers.io内容请搜索<计算机技术网(www.ctvol.com)!!>以前的文章或继续浏览下面的相关文章希望大家以后多多支持<计算机技术网(www.ctvol.com)!!>!
本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。
ctvol管理联系方式QQ:251552304
本文章地址:https://www.ctvol.com/addevelopment/1209175.html