android开发分享Android Dispatchers.IO线程池深入刨析

一. dispatchers.io1.dispatchers.io在协程中,当需要执行io任务时,会在上下文中指定dispatchers.io来进行线程的切换调度。 而io实际上是coroutined

上述就是android开发分享Android Dispatchers.IO线程池深入刨析的全部内容,如果对大家有所用处且需要了解更多关于Android学习教程,希望大家多多关注—计算机技术网(www.ctvol.com)!

一. dispatchers.io

1.dispatchers.io

在协程中,当需要执行io任务时,会在上下文中指定dispatchers.io来进行线程的切换调度。 而io实际上是coroutinedispatcher类型的对象,实际的值为defaultscheduler类的常量对象io,代码如下:

public actual object dispatchers {      ...      @jvmstatic      public val io: coroutinedispatcher = defaultscheduler.io  }  

2.defaultscheduler类

defaultscheduler类继承自experimentalcoroutinedispatcher类,内部提供了类型为limitingdispatcher的io对象,代码如下:

// 系统配置变量  public const val io_parallelism_property_name: string = "kotlinx.coroutines.io.parallelism"  ...  // 表示不会阻塞的任务,纯cpu任务  internal const val task_non_blocking = 0  // 表示执行过程中可能会阻塞的任务,非纯cpu任务  internal const val task_probably_blocking = 1  ...  // 默认线程池名称  internal const val default_dispatcher_name = "dispatchers.default"  ...  internal object defaultscheduler : experimentalcoroutinedispatcher() {      // 创建名为dispatchers.io的线程池      // 最大并发数量为kotlinx.coroutines.io.parallelism指定的值,默认为64与cpu数量中的较大者      // 默认的执行的任务类型为task_probably_blocking      val io: coroutinedispatcher = limitingdispatcher(          this,          systemprop(io_parallelism_property_name, 64.coerceatleast(available_processors)),          "dispatchers.io",          task_probably_blocking      )      override fun close() {          throw unsupportedoperationexception("$default_dispatcher_name cannot be closed")      }      // 可以看出io和default共用一个线程池      override fun tostring(): string = default_dispatcher_name      @internalcoroutinesapi      @suppress("unused")      public fun todebugstring(): string = super.tostring()  }

3.limitingdispatcher类

limitingdispatcher类继承自executorcoroutinedispatcher类,实现了taskcontext接口和executor接口。

limitingdispatcher类的核心是构造方法中类型为experimentalcoroutinedispatcher的dispatcher对象。

limitingdispatcher类看起来是一个标准的线程池,但实际上limitingdispatcher类只对类参数中传入的dispatcher进行包装和功能扩展。如同免费精选名字大全中的litmit一样,limitingdispatcher类主要用于对任务执行数量进行限制,代码如下:

// dispatcher参数传入了defaultscheduler对象  // parallelism表示并发执行的任务数量  // name表示线程池的免费精选名字大全  // taskmode表示任务模式,taskcontext接口中的常量  private class limitingdispatcher(      private val dispatcher: experimentalcoroutinedispatcher,      private val parallelism: int,      private val name: string?,      override val taskmode: int  ) : executorcoroutinedispatcher(), taskcontext, executor {      // 用于保存任务的队列      private val queue = concurrentlinkedqueue<runnable>()      // 用于记录当前正在执行的任务的数量      private val inflighttasks = atomic(0)      // 获取当前线程池      override val executor: executor          get() = this      // executor接口的实现,线程池的核心方法,通过dispatch实现      override fun execute(command: runnable) = dispatch(command, false)      override fun close(): unit = error("close cannot be invoked on limitingblockingdispatcher")      // coroutinedispatcher接口的实现      override fun dispatch(context: coroutinecontext, block: runnable) = dispatch(block, false)      // 任务分发的核心方法      private fun dispatch(block: runnable, taildispatch: boolean) {          // 获取当前要执行的任务          var tasktoschedule = block          // 死循环          while (true) {              // 当前执行的任务数加一,也可理解生成生成当前要执行的任务的编号              val inflight = inflighttasks.incrementandget()              // 如果当前需要执行的任务数小于允许的并发执行任务数量,说明可以执行,              if (inflight <= parallelism) {                  // 调用参数中的dispatcher对象,执行任务                  dispatcher.dispatchwithcontext(tasktoschedule, this, taildispatch)                  // 返回,退出循环                  return              }              // 如果达到的最大并发数的限制,则将任务加入到队列中              queue.add(tasktoschedule)              // 下面的代码防止线程竞争导致任务卡在队列里不被执行,case如下:              // 线程1:inflighttasks = 1 ,执行任务              // 线程2:inflighttasks = 2,当前达到了parallelism限制,              // 线程1:执行结束,inflighttasks = 1              // 线程2:将任务添加到队列里,执行结束,inflighttasks = 0              // 由于未执行,因此这里当前执行的任务数先减一              // 减一后如果仍然大于等于在大并发数,则直接返回,退出循环              if (inflighttasks.decrementandget() >= parallelism) {                  return              }              // 如果减一后,发现可以执行任务,则从队首获取任务,进行下一次循环              // 如果队列为空,说明没有任务,则返回,退出循环              tasktoschedule = queue.poll() ?: return          }      }      // coroutinedispatcher接口的实现,用于yield挂起协程时的调度处理      override fun dispatchyield(context: coroutinecontext, block: runnable) {          // 也是通过dispatch方法实现,注意这里taildispatch参数为true          dispatch(block, taildispatch = true)      }      override fun tostring(): string {          return name ?: "${super.tostring()}[dispatcher = $dispatcher]"      }      // taskcontext接口的实现,用于在一个任务执行完进行回调      override fun aftertask() {          // 从队首获取一个任务          var next = queue.poll()          // 若可以获取到          if (next != null) {              // 则执行任务,注意这里taildispatch参数为true              dispatcher.dispatchwithcontext(next, this, true)              // 返回              return          }          // 任务执行完毕,当前执行的任务数量减一          inflighttasks.decrementandget()          // 下面的代码防止线程竞争导致任务卡在队列里不被执行,case如下:          // 线程1:inflighttasks = 1 ,执行任务          // 线程2:inflighttasks = 2          // 线程1:执行结束,执行aftertask方法,发现队列为空,此时inflighttasks = 2          // 线程2:inflighttasks当前达到了parallelism限制,          //      将任务加入到队列中,执行结束,inflighttasks = 1          // 线程1:inflighttasks=1,执行结束          // 从队列中取出任务,队列为空则返回          next = queue.poll() ?: return          // 执行任务,注意这里taildispatch参数为true          dispatch(next, true)      }  }

dispatcher的dispatch方法定义在experimentalcoroutinedispatcher类中。

4.experimentalcoroutinedispatcher类

experimentalcoroutinedispatcher类继承自executorcoroutinedispatcher类,代码如下:

// corepoolsize线程池核心线程数  // maxpoolsize表示线程池最大线程数  // schedulername表示内部协程调度器的免费精选名字大全  // idleworkerkeepalivens表示空闲的线程存活时间  @internalcoroutinesapi  public open class experimentalcoroutinedispatcher(      private val corepoolsize: int,      private val maxpoolsize: int,      private val idleworkerkeepalivens: long,      private val schedulername: string = "coroutinescheduler"  ) : executorcoroutinedispatcher() {      // 我们在defaultscheduler类中就是通过默认的构造方法,      // 创建的父类experimentalcoroutinedispatcher对象      public constructor(          corepoolsize: int = core_pool_size,          maxpoolsize: int = max_pool_size,          schedulername: string = default_scheduler_name      ) : this(corepoolsize, maxpoolsize, idle_worker_keep_alive_ns, schedulername)      ...      // 创建coroutinescheduler对象      private var coroutinescheduler = createscheduler()      // 核心的分发方法      override fun dispatch(context: coroutinecontext, block: runnable): unit =          try {              // 调用coroutinescheduler对象的dispatch方法              coroutinescheduler.dispatch(block)          } catch (e: rejectedexecutionexception) {              // 只有当coroutinescheduler正在关闭时,才会拒绝执行,抛出异常              defaultexecutor.dispatch(context, block)          }      ...      private fun createscheduler() = coroutinescheduler(corepoolsize, maxpoolsize, idleworkerkeepalivens, schedulername)      ...  }  // 核心线程数  @jvmfield  internal val core_pool_size = systemprop(      "kotlinx.coroutines.scheduler.core.pool.size",      available_processors.coerceatleast(2), // !!! at least two here      minvalue = coroutinescheduler.min_supported_pool_size  )  // 最大线程数  @jvmfield  internal val max_pool_size = systemprop(      "kotlinx.coroutines.scheduler.max.pool.size",      (available_processors * 128).coercein(          core_pool_size,          coroutinescheduler.max_supported_pool_size      ),      maxvalue = coroutinescheduler.max_supported_pool_size  )  // 空闲线程的存活时间  @jvmfield  internal val idle_worker_keep_alive_ns = timeunit.seconds.tonanos(      systemprop("kotlinx.coroutines.scheduler.keep.alive.sec", 60l)  )

在experimentalcoroutinedispatcher类的dispatch方法内部,通过调用类型为coroutinescheduler的对象的dispatch方法实现。

二.coroutinescheduler类

1.coroutinescheduler类的继承关系

在对coroutinescheduler类的dispatch方法分析之前,首先分析一下coroutinescheduler类的继承关系,代码如下:

// 实现了executor和closeable接口  // corepoolsize线程池核心线程数  // maxpoolsize表示线程池最大线程数  // schedulername表示内部协程调度器的免费精选名字大全  // idleworkerkeepalivens表示空闲的线程存活时间  internal class coroutinescheduler(      @jvmfield val corepoolsize: int,      @jvmfield val maxpoolsize: int,      @jvmfield val idleworkerkeepalivens: long = idle_worker_keep_alive_ns,      @jvmfield val schedulername: string = default_scheduler_name  ) : executor, closeable {      init {          // 核心线程数量必须大于等于min_supported_pool_size          require(corepoolsize >= min_supported_pool_size) {              "core pool size $corepoolsize should be at least $min_supported_pool_size"          }          // 最大线程数量必须大于等于核心线程数量          require(maxpoolsize >= corepoolsize) {              "max pool size $maxpoolsize should be greater than or equals to core pool size $corepoolsize"          }          // 最大线程数量必须小于等于max_supported_pool_size          require(maxpoolsize <= max_supported_pool_size) {              "max pool size $maxpoolsize should not exceed maximal supported number of threads $max_supported_pool_size"          }          // 空闲的线程存活时间必须大于0          require(idleworkerkeepalivens > 0) {              "idle worker keep alive time $idleworkerkeepalivens must be positive"          }      }      ...      // executor接口中的实现,通过dispatch方法实现      override fun execute(command: runnable) = dispatch(command)      // closeable接口中的实现,通过shutdown方法实现      override fun close() = shutdown(10_000l)      ...  }

2.coroutinescheduler类的全局变量

接下来对coroutinescheduler类中重要的全局变量进行分析,代码如下:

// 用于存储全局的纯cpu(不阻塞)任务  @jvmfield  val globalcpuqueue = globalqueue()  // 用于存储全局的执行非纯cpu(可能阻塞)任务  @jvmfield  val globalblockingqueue = globalqueue()  ...  // 用于记录当前处于parked状态(一段时间后自动终止)的线程的数量  private val parkedworkersstack = atomic(0l)  ...  // 用于保存当前线程池中的线程  // workers[0]永远为null,作为哨兵位  // index从1到maxpoolsize为有效线程  @jvmfield  val workers = atomicreferencearray<worker?>(maxpoolsize + 1)  ...  // 控制状态  private val controlstate = atomic(corepoolsize.tolong() shl cpu_permits_shift)  // 表示已经创建的线程的数量  private val createdworkers: int inline get() = (controlstate.value and created_mask).toint()  // 表示可以获取的cpu令牌数量,初始值为线程池核心线程数量  private val availablecpupermits: int inline get() = availablecpupermits(controlstate.value)  // 获取指定的状态的已经创建的线程的数量  private inline fun createdworkers(state: long): int = (state and created_mask).toint()  // 获取指定的状态的执行阻塞任务的数量  private inline fun blockingtasks(state: long): int = (state and blocking_mask shr blocking_shift).toint()  // 获取指定的状态的cpu令牌数量  public inline fun availablecpupermits(state: long): int = (state and cpu_permits_mask shr cpu_permits_shift).toint()  // 当前已经创建的线程数量加1  private inline fun incrementcreatedworkers(): int = createdworkers(controlstate.incrementandget())  // 当前已经创建的线程数量减1  private inline fun decrementcreatedworkers(): int = createdworkers(controlstate.getanddecrement())  // 当前执行阻塞任务的线程数量加1  private inline fun incrementblockingtasks() = controlstate.addandget(1l shl blocking_shift)  // 当前执行阻塞任务的线程数量减1  private inline fun decrementblockingtasks() {      controlstate.addandget(-(1l shl blocking_shift))  }  // 尝试获取cpu令牌  private inline fun tryacquirecpupermit(): boolean = controlstate.loop { state ->      val available = availablecpupermits(state)      if (available == 0) return false      val update = state - (1l shl cpu_permits_shift)      if (controlstate.compareandset(state, update)) return true  }  // 释放cpu令牌  private inline fun releasecpupermit() = controlstate.addandget(1l shl cpu_permits_shift)  // 表示当前线程池是否关闭  private val _isterminated = atomic(false)  val isterminated: boolean get() = _isterminated.value  companion object {      // 用于标记一个线程是否在parkedworkersstack中(处于parked状态)      @jvmfield      val not_in_stack = symbol("not_in_stack")      // 线程的三个状态      // claimed表示线程可以执行任务      // parked表示线程暂停执行任务,一段时间后会自动进入终止状态      // terminated表示线程处于终止状态      private const val parked = -1      private const val claimed = 0      private const val terminated = 1      // 以下五个常量为掩码      private const val blocking_shift = 21 // 2x1024x1024      // 1-21位      private const val created_mask: long = (1l shl blocking_shift) - 1      // 22-42位      private const val blocking_mask: long = created_mask shl blocking_shift      // 42      private const val cpu_permits_shift = blocking_shift * 2      // 43-63位      private const val cpu_permits_mask = created_mask shl cpu_permits_shift      // 以下两个常量用于require中参数判断      internal const val min_supported_pool_size = 1      // 2x1024x1024-2      internal const val max_supported_pool_size = (1 shl blocking_shift) - 2      // parkedworkersstack的掩码      private const val parked_index_mask = created_mask      // inv表示01反转      private const val parked_version_mask = created_mask.inv()      private const val parked_version_inc = 1l shl blocking_shift  }

coroutinescheduler类中对线程的状态与权限控制:

Android Dispatchers.IO线程池深入刨析

availablecpupermits的初始值为参数中核心线程数corepoolsize的值,表示coroutinescheduler类中最多只有corepoolsize个核心线程。执行纯cpu任务的线程每次执行任务之前需要在availablecpupermits中进行记录与申请。blockingtasks表示执行非纯cpu任务的数量。这部分线程在执行时不需要cpu令牌。createdworkers表示当前线程池中所有线程的数量,每个线程在创建或终止时都需要通过在这里进行记录。这些变量的具体关系如下:

createdworkers = blockingtasks + corepoolsize – availablecpupermits

cpu令牌是线程池自定义的概念,不代表时间片,只是为了保证核心线程的数量。

三.worker类与workerstate类

在分析coroutinescheduler类的dispatch方法之前,还需要分析一下coroutinescheduler类中的两个重要的内部类worker类以及其对应的状态类workerstate类。

worker是一个线程池中任务的核心执行者,几乎在所有的线程池中都存在worker的概念。

1.workerstate类

首先分析一下workerstate类,代码如下:

// 一个枚举类,表示worker的状态  enum class workerstate {      // 拥有了cpu令牌,可以执行纯cpu任务,也可以执行非纯cpu任务      cpu_acquired,      // 可以执行非纯cpu任务      blocking,      // 当前已经暂停,一段时间后将终止,也有可能被再次使用      parking,      // 休眠状态,用于初始状态,只能执行自己本地任务      dormant,      // 终止状态,将不再被使用      terminated  }

2.worker类的继承关系与全局变量

接下来对worker类的继承关系以及其中重要的全局变量进行分析,代码如下:

// 继承自thread类  // 私有化无参的构造方法  internal inner class worker private constructor() : thread() {      init {          // 标记为守护线程          isdaemon = true      }      // 当前线程在存储线程池线程的数组workers中的索引位置      @volatile      var indexinarray = 0          set(index) {              // 设置线程名              name = "$schedulername-worker-${if (index == 0) "terminated" else index.tostring()}"              field = index          }      // 构造方法      constructor(index: int) : this() {          indexinarray = index      }      // 获取当前线程的调度器      inline val scheduler get() = this@coroutinescheduler      // 线程存储任务的本地队列      @jvmfield      val localqueue: workqueue = workqueue()      // 线程的状态 (内部转换)      @jvmfield      var state = workerstate.dormant      // 线程的控制状态(外部赋予)      val workerctl = atomic(claimed)      // 终止截止时间,表示处于parking状态的线程,在terminationdeadline毫秒后终止      private var terminationdeadline = 0l      // 表示当线程处于parking状态,进入parkedworkersstack后,      // 下一个处于parking状态并进入parkedworkersstack的线程的引用      @volatile      var nextparkedworker: any? = not_in_stack      // 偷取其他线程的本地队列的任务的冷却时间,后面会解释      private var mindelayuntilstealabletaskns = 0l      // 生成随机数,配合算法,用于任务寻找      private var rngstate = random.nextint()      ...      // 表示当前线程的本地队列是否有任务      @jvmfield      var mayhavelocaltasks = false      ...  }

3.worker类的run方法

接下来分析worker类的核心方法——run方法的实现,代码入下:

override fun run() = runworker()  private fun runworker() {      // 用于配合mindelayuntilstealabletaskns自旋      var rescanned = false      // 线程池未关闭,线程没有终止,则循环      while (!isterminated && state != workerstate.terminated) {          // 寻找并获取任务          val task = findtask(mayhavelocaltasks)          // 如果找到了任务          if (task != null) {              // 重制两个变量              rescanned = false              mindelayuntilstealabletaskns = 0l              // 执行任务              executetask(task)              // 继续循环              continue          } else { // 如果没有找到任务,说明本地队列肯定没有任务,因为本地队列优先查找              // 设置标志位              mayhavelocaltasks = false          }          // 走到这里,说明没有找到任务          // 如果偷取任务的冷却时间不为0,说明之前偷到过任务          if (mindelayuntilstealabletaskns != 0l) {              // 这里通过rescanned,首次mindelayuntilstealabletaskns不为0,              // 不会立刻进入parking状态,而是再次去寻找任务              // 因为当过多的线程进入parking状态,再次唤起大量的线程很难控制              if (!rescanned) {                  rescanned = true              } else {// 再次扫描,仍然没有找到任务                  // 置位                  rescanned = false                  // 尝试释放cpu令牌,并进入workerstate.parking状态                  tryreleasecpu(workerstate.parking)                  // 清除中断标志位                  interrupted()                  // 阻塞mindelayuntilstealabletaskns毫秒                  locksupport.parknanos(mindelayuntilstealabletaskns)                  // 清零                  mindelayuntilstealabletaskns = 0l              }              // 阻塞完成后继续执行              continue          }          // 走到这里,说明线程可能很长时间都没有执行任务了,则对其进行暂停处理          // trypark比tryreleasecpu要严格的多,会被线程会被计入到parkedworkersstack,          // 同时会修改workerctl状态          trypark()      }      // 退出循环      // 尝试释放cpu令牌,并进入终止状态      tryreleasecpu(workerstate.terminated)  }

4.worker类的任务寻找机制

接下来分析worker线程如何寻找任务,代码如下:

// 寻找任务  fun findtask(scanlocalqueue: boolean): task? {      // 尝试获取cpu令牌,如果获取到了,则调用findanytask方法,寻找任务      if (tryacquirecpupermit()) return findanytask(scanlocalqueue)      // 如果没有获取到cpu令牌,只能去找非纯cpu任务了      // 如果允许扫描本地的任务队列,则优先在本地队列中寻找,      // 找不到则在全局队列中寻找,从队首中获取      val task = if (scanlocalqueue) {          localqueue.poll() ?: globalblockingqueue.removefirstornull()      } else {          globalblockingqueue.removefirstornull()      }      // 如果在本地队列和全局队列中都找不到,则尝试去其他线程的队列里偷一个任务      return task ?: trysteal(blockingonly = true)  }  // 寻找cpu任务  private fun findanytask(scanlocalqueue: boolean): task? {      // 如果允许扫描本地的任务队列,则在本地队列和全局队列中随机二选一,      // 找不到则在全局队列中寻找,从队首中获取      if (scanlocalqueue) {          // 随机确定本地队列和全局队列的优先顺序          val globalfirst = nextint(2 * corepoolsize) == 0          // 获取任务          if (globalfirst) pollglobalqueues()?.let { return it }          localqueue.poll()?.let { return it }          if (!globalfirst) pollglobalqueues()?.let { return it }      } else {          // 只能从全局获取          pollglobalqueues()?.let { return it }      }      // 走到这里,说明本地队列和全局队列中都找不到      // 那么就尝试去其他线程的队列里偷一个任务      return trysteal(blockingonly = false)  }  // 从全局队列获取任务  private fun pollglobalqueues(): task? {      // 随机获取cpu任务或者非cpu任务      if (nextint(2) == 0) {          // 优先获取cpu任务          globalcpuqueue.removefirstornull()?.let { return it }          return globalblockingqueue.removefirstornull()      } else {          // 优先获取非cpu任务          globalblockingqueue.removefirstornull()?.let { return it }          return globalcpuqueue.removefirstornull()      }  }  // 偷取其他线程的本地队列的任务  // blockingonly表示是否只偷取阻塞任务  private fun trysteal(blockingonly: boolean): task? {      // 只有当前线程的本地队列为空的时候,才能偷其他线程的本地队列      assert { localqueue.size == 0 }      // 获取已经存在的线程的数量      val created = createdworkers      // 如果线程总数为0或1,则不偷取,直接返回      // 0:需要等待初始化      // 1:避免在单线程机器上过度偷取      if (created < 2) {          return null      }      // 随机生成一个存在的线程索引      var currentindex = nextint(created)      // 默认的偷取冷却时间      var mindelay = long.max_value      // 循环遍历      repeat(created) {          // 每次循环索引自增,带着下一行代码表示,从位置currentindex开始偷          ++currentindex          // 如果超出了,则从头继续          if (currentindex > created) currentindex = 1          // 从数组中获取线程          val worker = workers[currentindex]          // 如果线程不为空,并且不是自己          if (worker !== null && worker !== this) {                 assert { localqueue.size == 0 }              // 根据偷取的类型进行偷取              val stealresult = if (blockingonly) {                  // 偷取非cpu任务到本地队列中                  localqueue.trystealblockingfrom(victim = worker.localqueue)              } else {                  // 偷取任务到本地队列中                  localqueue.trystealfrom(victim = worker.localqueue)              }              // 如果返回值为task_stolen,说明偷到了              // 如果返回值为nothing_to_steal,说明要偷的线程的本地队列是空的              if (stealresult == task_stolen) {                  // 从队列的队首拿出来返回                  return localqueue.poll()              // 如果返回值大于零,表示偷取的冷却时间,说明没有偷到                } else if (stealresult > 0) { // 说明至少还要等待stealresult时间才能偷取这个任务                  // 计算偷取冷却时间                  mindelay = min(mindelay, stealresult)              }          }      }      // 设置偷取等待时间      mindelayuntilstealabletaskns = if (mindelay != long.max_value) mindelay else 0      // 返回空      return null  }  // 基于marsaglia xorshift rng算法  // 用于在2^32-1范围内计算偷取目标  internal fun nextint(upperbound: int): int {      var r = rngstate      r = r xor (r shl 13)      r = r xor (r shr 17)      r = r xor (r shl 5)      rngstate = r      val mask = upperbound - 1      // fast path for power of two bound      if (mask and upperbound == 0) {          return r and mask      }      return (r and int.max_value) % upperbound  }

通过对这部分代码的分析,可以知道线程在寻找任务时,首先会尝试获取cpu令牌,成为核心线程。如果线程成为了核心线程,则随机从本地或全局的两个队列中获取一个任务,获取不到则去随机偷取一个任务。如果没有获取到cpu令牌,则优先在本地获取任务,获取不到则在全局非cpu任务队列中获取任务,获取不到则去偷取一个非cpu任务。

如果偷取的任务没有达到最小的可偷取时间,则返回需要等待的时间。如果偷取任务成功,则直接加入到本地队列中。偷取的核心过程,会在后面进行分析。

5.worker类的任务执行机制

接下来分析任务被获取到后如何被执行,代码如下:

// 执行任务  private fun executetask(task: task) {      // 获取任务类型,类型为纯cpu或可能阻塞      val taskmode = task.mode      // 重置线程闲置状态      idlereset(taskmode)      // 任务执行前      beforetask(taskmode)      // 执行任务      runsafely(task)      // 任务执行后      aftertask(taskmode)  }  // 重置线程闲置状态  private fun idlereset(mode: int) {      // 重置从parking状态到terminated状态的时间      terminationdeadline = 0l      // 如果当前状态为parking,说明寻找任务时没有获取到cpu令牌      if (state == workerstate.parking) {          assert { mode == task_probably_blocking }          // 设置状态为blocking          state = workerstate.blocking      }  }  // 任务执行前  private fun beforetask(taskmode: int) {      // 如果执行的任务为纯cpu任务,说明当前线程获取到了cpu令牌,是核心线程,直接返回      if (taskmode == task_non_blocking) return      // 走到这里,说明线程执行的是非纯cpu任务,      // 没有cpu令牌也可以执行,因此尝试释放cpu令牌,进入workerstate.blocking      if (tryreleasecpu(workerstate.blocking)) {          // 如果释放cpu令牌成功,则唤起一个线程去申请cpu令牌          signalcpuwork()      }  }  // 执行任务  fun runsafely(task: task) {      try {          task.run()      } catch (e: throwable) {          // 异常发生时,通知当前线程的异常处理handler          val thread = thread.currentthread()          thread.uncaughtexceptionhandler.uncaughtexception(thread, e)      } finally {          untracktask()      }  }  // 任务执行后  private fun aftertask(taskmode: int) {      // 如果执行的任务为纯cpu任务,说明当前线程获取到了cpu令牌,是核心线程,直接返回      if (taskmode == task_non_blocking) return      // 如果执行的是非cpu任务      // 当前执行的非cpu任务数量减一      decrementblockingtasks()      // 获取当前线程状态      val currentstate = state      // 如果线程当前不是终止状态      if (currentstate !== workerstate.terminated) {          assert { currentstate == workerstate.blocking }          // 设置为休眠状态          state = workerstate.dormant      }  }

四.coroutinescheduler类的dispatch方法

了解worker类的工作机制后,接下来分析coroutinescheduler类的dispatch方法,代码如下:

// block表示要执行的任务  // taskcontext表示任务执行的上下文,里面包含任务的类型,和执行完成后的回调  // taildispatch表示当前任务是否进行队列尾部调度,  // 当taildispatch为true时,当前block会在当前线程的本地队列里的任务全部执行完后再执行  fun dispatch(block: runnable, taskcontext: taskcontext = nonblockingcontext, taildispatch: boolean = false) {      // 上报时间,timesource相关,无需关注      tracktask()      // 创建任务      val task = createtask(block, taskcontext)      // 获取当前的worker,可能获取不到      val currentworker = currentworker()      // 将当前的任务添加到当前线程的本地队列中      val notadded = currentworker.submittolocalqueue(task, taildispatch)      // 不为空,说明没有添加进去,说明当前的线程不是worker      if (notadded != null) {           // 将任务添加到全局队列中,如果添加失败了          if (!addtoglobalqueue(notadded)) {              // 说明线程池正在关闭,抛出异常              throw rejectedexecutionexception("$schedulername was terminated")          }      }      // skipunpark表示是否跳过唤起状态,取决于这下面两个参数      val skipunpark = taildispatch && currentworker != null      // 如果当前类型为纯cpu任务      if (task.mode == task_non_blocking) {          // 如果跳过唤醒,则直接返回          if (skipunpark) return          // 唤醒一个执行纯cpu任务的线程          signalcpuwork()      } else {          // 唤醒一个执行非cpu任务的线程          signalblockingwork(skipunpark = skipunpark)      }  }  // 创建任务  internal fun createtask(block: runnable, taskcontext: taskcontext): task {      // 获取当前时间      val nanotime = schedulertimesource.nanotime()      // 如果当前的block是task类型的      if (block is task) {          // 重新设置提交时间和任务上下文          block.submissiontime = nanotime          block.taskcontext = taskcontext          // 返回          return block      }      // 封装成taskimpl,返回      return taskimpl(block, nanotime, taskcontext)  }  // 任务模型  // block表示执行的任务  // submissiontime表示任务提交时间  // taskcontext表示任务执行的上下文  internal class taskimpl(      @jvmfield val block: runnable,      submissiontime: long,      taskcontext: taskcontext  ) : task(submissiontime, taskcontext) {      override fun run() {          try {              block.run()          } finally {              // 任务执行完毕后,会在同一个worker线程中回调aftertask方法              taskcontext.aftertask()          }      }      override fun tostring(): string =          "task[${block.classsimplename}@${block.hexaddress}, $submissiontime, $taskcontext]"  }  // 将任务添加到本地队列  private fun worker?.submittolocalqueue(task: task, taildispatch: boolean): task? {      // 如果当前线程为空,则返回任务      if (this == null) return task      // 如果线程处于终止状态,则返回任务      if (state === workerstate.terminated) return task      // 如果任务为纯cpu任务,但是线程没有cpu令牌      if (task.mode == task_non_blocking && state === workerstate.blocking) {          // 则返回任务          return task      }      // 标记本地队列有任务      mayhavelocaltasks = true      // 添加到队列      return localqueue.add(task, fair = taildispatch)  }  // 添加到全局队列  private fun addtoglobalqueue(task: task): boolean {      // 根据任务的类型,添加到全局队列的队尾      return if (task.isblocking) {          globalblockingqueue.addlast(task)      } else {          globalcpuqueue.addlast(task)      }  }  // 对当前线程进行强制转换,如果调度器也是当前的调度器则返回worker对象  private fun currentworker(): worker? = (thread.currentthread() as? worker)?.takeif { it.scheduler == this }  // 唤起一个执行非纯cpu任务的线程  private fun signalblockingwork(skipunpark: boolean) {      // 当前执行阻塞任务的线程数量加1,并获取当前的控制状态      val statesnapshot = incrementblockingtasks()      // 如果跳过唤起,则返回      if (skipunpark) return      // 尝试唤起,唤起成功,则返回      if (tryunpark()) return      // 唤起失败,则根据当前的控制状态,尝试创建新线程,成功则返回      if (trycreateworker(statesnapshot)) return      // 再次尝试唤起,防止多线程竞争情况下,上面的tryunpark方法正好卡在线程释放cpu令牌与进入parking状态之间      // 因为线程先释放cpu令牌,后进入parking状态      tryunpark()  }  // 唤起一个执行纯cpu任务的线程  internal fun signalcpuwork() {      // 尝试唤起,唤起成功,则返回      if (tryunpark()) return      // 唤起失败,则尝试创建新线程,成功则返回      if (trycreateworker()) return      // 再次尝试唤起,防止多线程竞争情况下,上面的tryunpark方法正好卡在线程释放cpu令牌与进入parking状态之间      // 因为线程先释放cpu令牌,后进入parking状态      tryunpark()  }

通过对上面的代码进行分析,可以知道coroutinescheduler类的dispatch方法,首先会对任务进行封装。正常情况下,任务都会根据类型添加到全局队列中,接着根据任务类型,随机唤起一个执行对应类型任务的线程去执行任务。

当任务执行完毕后,会回调任务中自带的aftertask方法。根据之前对limitingdispatcher的分析,可以知道,此时taildispatch参数为true,同时当前的线程也是worker线程,因此会被直接添加到线程的本地队列中,由于任务有对应的线程执行,因此跳过了唤起其他线程执行任务的阶段。这里我们可以称这个机制为尾调机制。

为什么coroutinescheduler类中要设计一个尾调机制呢?

在传统的线程池的线程充足情况下,一个任务到来时,会被分配一个线程。假设前后两个任务a与b有依赖关系,需要在执行a再执行b,这时如果两个任务同时到来,执行a任务的线程会直接执行,而执行b线程的任务可能需要被阻塞。而一旦线程阻塞会造成线程资源的浪费。而协程本质上就是多个小段程序的相互协作,因此这种场景会非常多,通过这种机制可以保证任务的执行顺序,同时减少资源浪费,而且可以最大限度的保证一个连续的任务执行在同一个线程中。

至此,dispatchers.io线程池的工作原理全部分析完毕。

五.浅谈workqueue类

1.add方法

接下来分析一些更加细节的过程。首先分析一下worker线程本地队列调用的add方法是如何添加任务的,代码如下:

// 本地队列中存储最后一次尾调的任务  private val lastscheduledtask = atomic<task?>(null)  // fair表示是否公平的执行任务,fifo,默认为false  fun add(task: task, fair: boolean = false): task? {      // fair为true,则添加到队尾      if (fair) return addlast(task)      // 如果fair为false,则从lastscheduledtask中取出上一个尾调的任务,      // 并把这次的新尾调任务保存到lastscheduledtask      val previous = lastscheduledtask.getandset(task) ?: return null      // 如果获取上一次的尾调任务不为空,则添加到队尾      return addlast(previous)  }

2.任务偷取机制

根据之前对worker类的分析,任务偷取的核心代码锁定在了workqueue类的两个方法上:一个是偷取非纯cpu任务的trystealblockingfrom方法,另一个可以偷所有类型任务的trystealfrom方法,代码如下:

internal const val buffer_capacity_base = 7  internal const val buffer_capacity = 1 shl buffer_capacity_base // 1000 0000  internal const val mask = buffer_capacity - 1 // 0111 1111  // 存储任务的数组,最多存储128  private val buffer: atomicreferencearray<task?> = atomicreferencearray(buffer_capacity)  // producerindex表示上一次向任务数组中添加任务的索引  // consumerindex表示上一次消费的任务索引  // producerindex永远大于等于consumerindex  // 二者差值就是当前任务数组中任务的数量  private val producerindex = atomic(0)  private val consumerindex = atomic(0)  // buffer中非纯cpu任务的数量(避免遍历扫描)  private val blockingtasksinbuffer = atomic(0)  // 偷所有类型任务  fun trystealfrom(victim: workqueue): long {      assert { buffersize == 0 }      // 从要偷取线程的本地队列中轮训获取一个任务      val task  = victim.pollbuffer()      // 如果获取到了任务      if (task != null) {          // 将它添加到自己的本地队列中          val notadded = add(task)          assert { notadded == null }          // 返回偷取成功的标识          return task_stolen      }      // 如果偷取失败,尝试偷取指定线程的尾调任务      return trysteallastscheduled(victim, blockingonly = false)  }  // 轮训获取任务  private fun pollbuffer(): task? {      // 死循环      while (true) {          // 获取上一次消费的任务索引          val taillocal = consumerindex.value          // 如果当前任务数组中没有多处的任务,则返回空          if (taillocal - producerindex.value == 0) return null          // 计算偷取位置,防止数组过界          val index = taillocal and mask          // 通过cas方式,将consumerindex加一,表示下一次要从taillocal + 1处开始偷取          if (consumerindex.compareandset(taillocal, taillocal + 1)) {              // 从偷取位置初取出任务,如果偷取的任务为空,则继续循环              val value = buffer.getandset(index, null) ?: continue              // 偷取成功              // 若任务为阻塞任务,blockingtasksinbuffer的值减一              value.decrementifblocking()              // 返回任务              return value          }      }  }  // 偷取非纯cpu任务  fun trystealblockingfrom(victim: workqueue): long {      assert { buffersize == 0 }      // 从consumerindex位置开始偷      var start = victim.consumerindex.value      // 偷到producerindex处截止      val end = victim.producerindex.value      // 获取任务数组      val buffer = victim.buffer      // 循环偷取      while (start != end) {          // 计算偷取位置,防止数组过界          val index = start and mask          // 如果非纯cpu任务数为0,则直接退出循环          if (victim.blockingtasksinbuffer.value == 0) break          // 获取index处的任务          val value = buffer[index]          // 如果任务存在,而且是非纯cpu任务,同时成功的通过cas设置为空          if (value != null && value.isblocking && buffer.compareandset(index, value, null)) {              // blockingtasksinbuffer的值减一              victim.blockingtasksinbuffer.decrementandget()              // 将偷取的任务添加到当前线程的本地队列中              add(value)              // 返回偷取成功标识              return task_stolen          } else {              // 如果偷取失败,自增再次循环,从下一个位置开始偷              ++start          }      }      // 如果从任务数组中偷取失败,尝试偷取指定线程的尾调任务      return trysteallastscheduled(victim, blockingonly = true)  }  // 偷取指定线程的尾调任务  private fun trysteallastscheduled(victim: workqueue, blockingonly: boolean): long {      // 死循环      while (true) {          // 获取指定线程的尾调任务,如果任务不存在,则返回偷取失败标识符          val lastscheduled = victim.lastscheduledtask.value ?: return nothing_to_steal          // 如果要偷取的是非纯cpu任务,但是任务类型为纯cpu任务,说明只有核心线程才能偷          // 返回偷取失败标识符          if (blockingonly && !lastscheduled.isblocking) return nothing_to_steal          // 获取当前时间          val time = schedulertimesource.nanotime()          //计算任务从添加开始到现在经过的时长          val staleness = time - lastscheduled.submissiontime          // 如果时长小于偷取冷却时间          if (staleness < work_stealing_time_resolution_ns) {              // 返回当前线程需要等待的时间              return work_stealing_time_resolution_ns - staleness          }          // 通过cas,将lastscheduledtask设置为空,防止被其他线程执行          if (victim.lastscheduledtask.compareandset(lastscheduled, null)) {              // 偷取成功,加入到当前线程的队列中              add(lastscheduled)              // 返回偷取成功表示              return task_stolen          }          // 继续循环          continue      }  }  // 偷取冷却时间,尾调任务从添加开始,  // 最少经过work_stealing_time_resolution_ns时间才可以被偷  @jvmfield  internal val work_stealing_time_resolution_ns = systemprop(      "kotlinx.coroutines.scheduler.resolution.ns", 100000l  )

六.总结

1.两个线程池

coroutinescheduler类是核心的线程池,用于任务的执行。limitingdispatcher类对coroutinescheduler类进行代理,是coroutinescheduler类尾调机制的使用者,对任务进行初步排队。

2.四种队列

limitingdispatcher类中的任务队列。coroutinescheduler类中的两个全局队列。worker类中的本地队列。

3.尾调机制

一个任务执行完,可以通过回调,在同一个worker线程中再存储一个待执行任务,该任务将在worker线程本地队列目前已存在的任务,执行完毕后再执行。

4.任务分类与权限控制

所有任务分成纯cpu任务和非纯cpu任务两种,对应着核心线程和非核心线程。

所有线程在执行前都先尝试成为核心线程,核心线程可以从两种任务中任意选择执行,非核心线程只能执行非纯cpu任务。核心线程如果选择执行非纯cpu任务会变成非核心线程

5.任务偷取机制

workqueue类根据随机算法提供任务偷取机制,一个worker线程可以从其他worker线程的本地队列中偷取任务。

6.执行梳理图

Android Dispatchers.IO线程池深入刨析

到此这篇关于android dispatchers.io线程池深入刨析的文章就介绍到这了,更多相关android dispatchers.io内容请搜索<计算机技术网(www.ctvol.com)!!>以前的文章或继续浏览下面的相关文章希望大家以后多多支持<计算机技术网(www.ctvol.com)!!>!

本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。

ctvol管理联系方式QQ:251552304

本文章地址:https://www.ctvol.com/addevelopment/1209175.html

(0)
上一篇 2022年8月30日
下一篇 2022年8月30日

精彩推荐