线程安全——线程池

概述

现在 CPU 都是有多个核心,并行已经成为事实,一方面我们希望最大限度利用机器性能(利用多线程提高吞吐率),另一方面机器的硬件资源是有限的,我们也不能无限制的去申请,这时候我们就需要线程池。它帮我们管理线程,避免频繁创建线程和销毁线程的资源损耗。

幸运的是,JDK 已经为我们提供了 ExecutorService 的实现,还提供了 Executors 工厂类方便我们生成模板线程池。

线程池的创建

在 JDK 1.5 之后推出了相关的 api,常见的创建线程池方式有以下几种:

  • Executors.newCachedThreadPool():无限线程池。
  • Executors.newFixedThreadPool(nThreads):创建固定大小的线程池。
  • Executors.newSingleThreadExecutor():创建单个线程的线程池。

其实看这三种方式创建的源码就会发现:

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

实际上还是利用 ThreadPoolExecutor 类实现的。

ThreadPoolExecutor

参数:

  • corePoolSize:
    指定线程池核心线程的数量

  • maximumPoolSize:
    指定线程池中线程的最大数量

  • keepAliveTime:
    当线程池线程的数量超过 corePoolSize 的时候,多余的空闲线程存活的时间,如果超过了 corePoolSize,在 keepAliveTime 的时间之后,销毁线程

  • unit:
    keepAliveTime 的单位

  • workQueue:
    工作队列,将被提交但尚未执行的任务缓存起来

  • threadFactory:
    线程工厂,用于创建线程,不指定为默认线程工厂 DefaultThreadFactory

  • handler:
    拒绝策略

执行过程:

  1. 如果核心线程还没满,则直接起线程;

  2. 如果核心线程已满而队列没满则直接入队;

  3. 如果队列满了但最大线程不够则再起线程达到最大线程;

  4. 如果队列多了则按抛弃策略来抛弃;

这就是线程池的一个基本运行过程

成员变量 clt

1
2
//CAS,无锁并发
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

成员变量 ctl 是由 AtomicInteger 这个类定义的,可以通过 CAS 达到无锁并发,效率比较高。

这个变量有双重身份,它的高三位表示线程池的状态,低 29 位表示线程池中现有的线程数,这也是 Doug Lea 一个天才的设计,用最少的变量来减少锁竞争,提高并发效率。

线程池状态

关于线程池的状态,有 5 种,

  1. RUNNING: 运行状态,值也是最小的,刚创建的线程池就是此状态。
  2. SHUTDOWN: 停工状态,不再接收新任务,已经接收的会继续执行
  3. STOP: 停止状态,不再接收新任务,已经接收正在执行的,也会中断
  4. TIDYING: 清空状态,所有任务都停止了,工作的线程也全部结束了
  5. TERMINATED: 终止状态,线程池已销毁

用图表示:

JAVA并发——线程池_2020-03-27-19-34-26.png

运行原理

数据结构

  1. Worker: 本身实现了 Runnable 接口,自然也实现了 run()方法,我们提交的任务实际上也是交给 Worker 来执行。

  2. Workers: 是一个 HashSet 结构的容器,用来存放要执行的线程。

  3. WorkQueue: 有界阻塞队列,核心线程已满,队列未满新的任务添加进队列

execute

  1. 当我们提交任务到线程池:通过 workerCountOf(c) 提取 Workers 里的 Worker 的数量。如果小于核心线程数,则会尝试进行 addWorker 操作,core为true,否则执行步骤 2。

  2. 如果 worker 数量大于等于核心线程数根据 c(c 就是 ctl 的值)判断线程池是否还在运行,并且尝试添加任务到队列中。成功则执行 3,失败则执行 4。

  3. 再次检查线程池的状态,如果线程池没有 RUNNING,且成功从阻塞队列中移除任务,则执行 reject 方法处理任务;

  4. 那么如果把任务放入阻塞队列失败,即队列已满呢(workQueue.offer(command)返回 false),这时候,直接尝试增加一个 worker,core为false,如果失败,则执行拒绝策略处理该条任务。

addWorker

  1. 判断线程池的状态,如果线程池的状态值大于或等 SHUTDOWN,则不处理提交的任务,直接返回;

  2. 通过参数 core 判断是否需要创建线程,ture与核心池最大数比较,flase与最大线程数比较,成功则添加Worker。

Worker

  1. 继承了 AQS 类,可以方便的实现工作线程的中止操作;

  2. 实现了 Runnable 接口,可以将自身作为一个任务在工作线程中执行;

  3. 当前提交的任务 firstTask 作为参数传入 Worker 的构造方法;

JAVA并发——线程池_2020-03-27-20-42-34.png

注意:这里有一个 Worker 的参数 firsttask,这里为什么第一个任务呢,因为一个 worker 创建出来,一开始会有个任务,这个任务执行完了呢,就会执行 getTask 方法,从队列中去获取任务。

runWorker

JAVA并发——线程池_2020-03-27-20-42-47.png

runWorker 方法是线程池的核心:

  1. 线程启动之后,通过 unlock 方法释放锁,设置 AQS 的 state 为 0,表示运行中断;

  2. 获取第一个任务 firstTask,执行任务的 run 方法,不过在执行任务之前,会进行加锁操作,任务执行完会释放锁;

  3. 在执行任务的前后,可以根据业务场景自定义 beforeExecute 和 afterExecute 方法;

  4. firstTask 执行完成之后,通过 getTask 方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask 方法会被阻塞并挂起,不会占用 cpu 资源;

getTask

JAVA并发——线程池_2020-03-27-20-44-24.png

整个 getTask 操作在自旋下完成:

  1. workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take 方法返回任务,并执行;

  2. workQueue.poll:如果在 keepAliveTime 时间内,阻塞队列还是没有任务,则返回 null;

所以,线程池中实现的线程可以一直执行由用户提交的任务。

淘汰策略

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:

  1. AbortPolicy:直接抛出异常,默认策略;

  2. CallerRunsPolicy:用调用者所在的线程来执行任务;

  3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

  4. DiscardPolicy:直接丢弃任务; 当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

疑问

  1. 可以看到 execute 方法中没有用到重量级锁,ctl 虽然可以保证本身变化的原子性,但是不能保证方法内部的代码块的原子性,是否会有并发问题?

    execute 方法虽然没有加锁,但是在 addWorker 方法内部,加锁了,这样可以保证不会创建超过我们预期的线程数,大师在设计的时候,做到了在最小的范围内加锁,尽量减少锁竞争,

  1. 上面提到过,addWorker 方法可以添加工作线程(核心或者非核心),线程本身没有核心或者非核心的标识,core 参数只是用来确定当前线程数的比较对象是线程池设置的核心线程数还是最大线程数,真实情况是不是这样?

    可以看到,core 参数,只是用来判断当前线程数是否超量的时候跟 corePoolSize 还是 maxPoolSize 比较,Worker 本身无核心或者非核心的概念。

  2. 线程池的线程是如何做到复用的?

    线程池中的线程在循环中尝试取任务执行,这一步会被阻塞,就是任务在并不只执行创建时指定的 firstTask 第一任务,还会从任务队列的中自己主动取任务执行,而且是有/无时间限定的阻塞等待,保证线程的存活;

    如果设置了 allowCoreThreadTimeOut 为 true,则线程池中的所有线程都会在 keepAliveTime 时间超时后还未取到任务而退出。或者线程池已经 STOP,那么所有线程都会被中断,然后退出。

  3. 线程池是如何做到高效并发的?

    • 线程池状态和工作线程数量的变更。这个由一个 AtomicInteger 变量 ctl 来解决原子性问题。
    • 向工作 Worker 容器 workers 中添加新的 Worker 的时候。这个线程池本身已经加锁了。
    • 工作线程 Worker 从等待队列中取任务的时候。这个由工作队列本身来保证线程安全,比如 LinkedBlockingQueue 等。

总结

  • 线程池对于线程的复用很重要,避免了频繁创建线程造成的内存和 CPU 调度的消耗。
  • 线程池、队列大小要设计的合理,尽量的让任务从队列中获取执行。
  • 如果任务多,线程执行时间短可以调大 keepalive 值,使得线程尽量不被回收从而可以复用线程。
文章目录
  1. 1. 概述
  2. 2. 线程池的创建
  3. 3. ThreadPoolExecutor
    1. 3.1. 参数:
    2. 3.2. 执行过程:
    3. 3.3. 成员变量 clt
    4. 3.4. 线程池状态
    5. 3.5. 运行原理
      1. 3.5.1. 数据结构
      2. 3.5.2. execute
      3. 3.5.3. addWorker
      4. 3.5.4. Worker
      5. 3.5.5. runWorker
      6. 3.5.6. getTask
    6. 3.6. 淘汰策略
    7. 3.7. 疑问
  4. 4. 总结
|