看到好玩的、想了解的,记录一下
从execute 方法的注释清晰得知,传统线程加入线程池执行过程分3步
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
优先offer到queue,queue满后再扩充线程到maxThread,如果已经到了maxThread就reject 比较适合于CPU密集型应用(比如runnable内部执行的操作都在JVM内部,memory copy, or compute等等)
java.util.concurrent.ThreadPoolExecutor#execute
corePoolSize和maximumPoolSize是ThreadPoolExecutor核心的两个参数,分别表示了核心线程数和最大线程数,corePoolSize一般设置为cpu核数或者核数的两倍,达到最大化利用cpu的效果,maximumPoolSize则通过设置最大线程数防止线程数暴涨影响系统运行。这个两个参数非常的重要,根据上面的理解,在结合阿里规约了解一下为啥禁止直接使用Executors,这里面最核心的问题就出在Executors对workQueue的处理上,在常用的newFixedThreadPool 和 newCachedThreadPool 两个创建方式上,newFixedThreadPool采用了一个不限容量的队列(new LinkedBlockingQueue()),而newCachedThreadPool采用了一个阻塞队列,结果就是在突发流量的请求下,newFixedThreadPool的线程数会保持corePoolSize大小,将来不及处理的请求压入等待队列,而newCachedThreadPool则选择创建不受限制的新线程去执行突发的请求。从系统安全性考虑,这两种方式在面对大量突发请求的情况下都将出现系统资源占用甚至耗尽问题,而从业务指标上来讲,排队和线程切换等问题的存在,这两种机制在吞吐和延迟上都无法作出有效的优化。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
核心线程数和最大线程数的使用规则?为啥JDK不是直接使用corePoolSize满了直接增加maximumPoolSize 然后在入队呢?ThreadPoolExecutor 比较适合于CPU密集型应用(memory copy, or compute),这个也是设计的初衷,但是副作用就是会牺牲系统的吞吐和延迟(ThreadPoolExecutor的实现的话,我们就可以看到ThreadPoolExecutor实现逻辑是先排队再扩线程,如果队列是个不限容量队列,那么超过corePoolSize的请求都将直接排队,maximumPoolSize的设置也将完全不起作用。这样设计的理由是尽量不去创建新线程复用现有线程,副作用就是会牺牲系统的吞吐和延迟。)
Tomcat的核心线程池实现类是StandardThreadExecutor,该类封装了对ThreadPoolExecutor(tomcat其实自己基于jdk的ThreadPoolExecutor封装了自己的ThreadPoolExecutor实现,StandardThreadExecutor最大的特点是实现了一个继承自LinkedBlockingQueue的定长队列TaskQueue,巧妙的使用了Queue的特性,通过重载LinkedBlockingQueue的offer方法,在触发排队的情况下,如果线程池中线程数小于最大线程数限制,则直接通过返回false的方式拒绝请求,强制线程池创建新的线程去处理请求,而只有线程数达到最大线程数限制之后才接受排队请求。
org.apache.catalina.core.StandardThreadExecutor#startInternal
// TaskQueue extends LinkedBlockingQueue<Runnable>
taskqueue = new TaskQueue(maxQueueSize);
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
executor.setThreadRenewalDelay(threadRenewalDelay);
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
taskqueue.setParent(executor);
org.apache.tomcat.util.threads.TaskQueue 结合当前线程池的数量进行控制,在执行java.util.concurrent.ThreadPoolExecutor#execute 插入队列失败,导致强制进入创建线程池处理,这样破了JDK默认的策略。
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
StandardThreadExecutor 优先扩充线程到maxThread,再offer到queue,如果满了就reject,比较适合于业务处理需要远程资源的场景(请求处理时间存在限制,但同时希望保证可控的响应延迟,那么Tomcat的模式似乎更值得考虑一下)
如何合理的设置线程池ThreadPoolExecutor的大小
尽量使用较小的线程池,一般Cpu核心数+1 。
因为CPU密集型任务CPU的使用率很高,若开过多的线程,只能增加线程上下文的切换次数,带来额外的开销。
方法一:可以使用较大的线程池,一般CPU核心数 * 2
IO密集型CPU使用率不高,可以让CPU等待IO的时候处理别的任务,充分利用cpu时间。
方法二:线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
下面举个例子:
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)8=32。这个公式进一步转化为:
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1) CPU数目
线程池的初衷就是为了更好的性能,通过分析业务的使用场景,选择合适的线程池就显得非常重要了,在方案的选择上,除了要考虑核心线程数和最大线程数的设置,线程池的调度逻辑也是需要考虑的因素之一。一般情况下使用JDK默认的基本满足需求啦,对于响应要求很高的,可以考虑一下Tomact线程池的设计,突破常规。
参考
原网址: 访问
创建于: 2021-01-29 09:58:00
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论