一次线上线程池任务问题处理历程
[作者简介] 王日华,小米信息技术部订单组研发工程师,目前主要负责小米订单中台业务。
一、前言
在一次新功能上线过程中,出现线程池提交任务抛出 RejectedExecutionException 异常,即任务提交执行了拒绝策略的操作。查看业务情况和线程池配置,发现并行执行的任务数是小于线程池最大线程数的,为此展开了一次线程池问题排查历程。
二、业务情景
2.1. 任务描述
每次执行一组任务,一组任务最多有 15 个,多线程执行,每个线程处理一个任务;每次执行完一组任务后,再执行下一组,不存在上一组的任务和下一组一起执行的情况。
2.2. 任务提交流程
2.3. 线程池配置
1 2 3 4 5
| <bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="14"/> <property name="maxPoolSize" value="30"/> <property name="queueCapacity" value="1"/> </bean>
|
三、出现问题
执行过程中出现 RejectedExecutionException 异常,由于是采用的是默认拒绝策略 AbortPolicy,因此,可以明确知道任务是提交到线程池后,线程池资源已满,导致任务被拒绝。
四、问题排查
4.1. 检查线程池配置
任务最多 15 个一组,核心线程有 14 个,阻塞队列是 1,最大线程 30,理论上 14 个核心线程+1 个阻塞队列即可完成一组任务,连非核心线程都无需使用,为什么会出现线程被占满的情况?
4.2. 检查业务代码
检查是否存在线程池被多处使用,或者有多批任务被同时执行的情况,并没有发现错误;
4.3. 线下重现
1 2 3 4 5
| <bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="14"/> <property name="maxPoolSize" value="30"/> <property name="queueCapacity" value="1"/> </bean>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| @RunWith(SpringRunner.class) @SpringBootTest public class SpringBootStartApplicationTests {
@Resource private ThreadPoolTaskExecutor executor;
@Test public void contextLoads() throws Exception { for(int i = 0; i < 10; i++) { doOnceTasks(); System.out.println("---------------------------------------" + i); } }
private void doOnceTasks(){ List<Future> futureList = Lists.newArrayListWithCapacity(15); for(int i = 0; i < 15; ++i){ Future future = executor.submit(()->{ int sec = new Double(Math.random() * 5).intValue(); LockSupport.parkNanos(sec * 1000 * 1000 * 1000); System.out.println(Thread.currentThread().getName() + " end"); }); futureList.add(future); }
for(Future future : futureList){ try { future.get(); } catch (Exception e) { e.printStackTrace(); } } } }
|
五、线程池源码阅读
5.1. 线程池执行任务流程
- 当工作线程数 < corePoolSize 时,新创建一个新线程执行新提交任务,即使此时线程池中存在空闲线程;
- 当工作线程数 == corePoolSize 时,新提交任务将被放入 workQueue 中;
- 当 workQueue 已满,且工作线程数 < maximumPoolSize 时,新提交任务会创建新的非核心线程执行任务;
- 当 workQueue 已满,且 工作线程数==maximumPoolSize 时,新提交任务由 RejectedExecutionHandler 处理;
5.2. execute 线程池提交任务源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| class ThreadPoolExecutor{ public void execute(Runnable command) { if (command == null) throw new NullPointerException();
int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return;
c = ctl.get(); }
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command);
else if (workerCountOf(recheck) == 0) addWorker(null, false); }
else if (!addWorker(command, false)) reject(command); } }
|
5.3. addWorker 添加 worker 线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
| class ThreadPoolExecutor{
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
for (;;) { int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
if (compareAndIncrementWorkerCount(c)) break retry;
c = ctl.get(); if (runStateOf(c) != rs) continue retry; } }
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException();
workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); }
if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } }
|
六、问题定位
6.1. 定位执行拒绝策略入口
执行拒绝策略的位置只有这两个地方,在这两个地方打上断点,执行 demo,结果发现拒绝策略是在第二处执行的;
6.2. 定位执行拒绝策略原因
进入 addWorker 方法,只有这两个地方返回 false,创建线程失败,打断点,执行 demo,发现是在第二处返回 false 的;
七、问题确认
确实是创建的 worker 线程已经达到最大线程数,无法再创建,然后执行拒绝策略的,为什么会被创建到最大呢,每组任务最大只有 15 个,为什么会用到非核心线程?
八、定位原因
8.1. 分析 execute 方法
在添加非核心线程前,先尝试将任务放到阻塞队列中,如果阻塞队列已满,则尝试添加非核心线程,也就是说,创建非核心线程时:workQueue.offer(command) == false,即阻塞队列已满;
8.2. 猜测原因
因为我们阻塞队列只有 1,会不会提交任务的速度比线程从阻塞队列取任务的速度快,进而导致创建非核心线程执行任务,最终的结果就是:在多批任务之后,再无非核心线程可创建,导致执行拒绝策略。
8.3. 原因验证
8.3.1 阻塞队列选择
查看 Spring 的 ThreadPoolTaskExecutor 源码,发现如果阻塞队列数量>0,则使用 LinkedBlockingQueue,否则使用 SynchronousQueue。
8.3.2 LinkedBlockingQueue
- 查看 LinkedBlockingQueue#take 方法,如果队列已空,则所有取元素的线程会阻塞在一个 Lock 的 notEmpty 等待条件上,等有元素入队时,只会调用 signal 方法唤醒一个线程取元素,而不是所有线程。
1 2 3 4 5 6 7 8 9 10 11 12 13
| class LinkedBlockingQueue{ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } }
|
- 因为一个线程从唤醒到执行是有一段时间间隔的,阻塞被唤醒后,还要等待获取 cpu 时间片,而主线程一直在发布任务,此时就会造成队列中的元素来不及消费,只能创建非核心线程消费的现象。
九、解决方式
9.1. 使用 SynchronousQueue
使用 SynchronousQueue,即阻塞队列大小设置为 0,原因在于:SynchronousQueue 和 LinkedBlockingQueue 维度不一致,SynchronousQueue 是根据是否有等待线程而决定是否入队成功,而 LinkedBlockingQueue 是根据缓冲区,而不管是否已经有等待线程。
9.2. 根据业务情况配置阻塞队列
对于我们的业务情况,因为任务最多只有 15 个,将阻塞队列大小设置为 15,这样就保证了不会出现任务被拒绝。
作者
王日华,小米信息技术部订单组
招聘
信息部是小米公司整体系统规划建设的核心部门,支撑公司国内外的线上线下销售服务体系、供应链体系、ERP 体系、内网 OA 体系、数据决策体系等精细化管控的执行落地工作,服务小米内部所有的业务部门以及 40 家生态链公司。
同时部门承担大数据基础平台研发和微服务体系建设落,语言涉及 Java、Go,长年虚位以待对大数据处理、大型电商后端系统、微服务落地有深入理解和实践的各路英雄。
欢迎投递简历:jin.zhang(a)xiaomi.com(武汉)