线程池



获取多线程的方法,我们都知道有三种,还有一种是实现Callable接口

  • 实现Runnable接口
  • 实现Callable接口
  • 实例化Thread类
  • 使用线程池获取

Callable接口

Callable接口,是一种让线程执行完成后,能够返回结果的

在说到Callable接口的时候,我们不得不提到Runnable接口

1
2
3
4
5
6
7
8
9
10
/**
* 实现Runnable接口
*/
class MyThread implements Runnable {

@Override
public void run() {

}
}

我们知道,实现Runnable接口的时候,需要重写run方法,也就是线程在启动的时候,会自动调用的方法

同理,我们实现Callable接口,也需要实现call方法,但是这个时候我们还需要有返回值,这个Callable接口的应用场景一般就在于批处理业务,比如转账的时候,需要给一会返回结果的状态码回来,代表本次操作成功还是失败

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Callable有返回值
* 批量处理的时候,需要带返回值的接口(例如支付失败的时候,需要返回错误状态)
*
*/
class MyThread2 implements Callable<Integer> {

@Override
public Integer call() throws Exception {
System.out.println("come in Callable");
return 1024;
}
}

最后我们需要做的就是通过Thread线程, 将MyThread2实现Callable接口的类包装起来

这里需要用到的是FutureTask类,他实现了Runnable接口,并且还需要传递一个实现Callable接口的类作为构造函数

1
2
3
// FutureTask:实现了Runnable接口,构造函数又需要传入 Callable接口
// 这里通过了FutureTask接触了Callable接口
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2());

然后在用Thread进行实例化,传入实现Runnabnle接口的FutureTask的类

1
2
Thread t1 = new Thread(futureTask, "aaa");
t1.start();

最后通过 utureTask.get() 获取到返回值

1
2
// 输出FutureTask的返回值
System.out.println("result FutureTask " + futureTask.get());

这就相当于原来我们的方式是main方法一条龙之心,后面在引入Callable后,对于执行比较久的线程,可以单独新开一个线程进行执行,最后在进行汇总输出

最后需要注意的是 要求获得Callable线程的计算结果,如果没有计算完成就要去强求,会导致阻塞,直到计算完成

image-20200317152541284

也就是说 futureTask.get() 需要放在最后执行,这样不会导致主线程阻塞

也可以使用下面算法,使用类似于自旋锁的方式来进行判断是否运行完毕

1
2
3
4
// 判断futureTask是否计算完成
while(!futureTask.isDone()) {

}

注意

多个线程执行 一个FutureTask的时候,只会计算一次

1
2
3
4
5
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2());

// 开启两个线程计算futureTask
new Thread(futureTask, "AAA").start();
new Thread(futureTask, "BBB").start();

如果我们要两个线程同时计算任务的话,那么需要这样写,需要定义两个futureTask

1
2
3
4
5
6
7
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread2());
FutureTask<Integer> futureTask2 = new FutureTask<>(new MyThread2());

// 开启两个线程计算futureTask
new Thread(futureTask, "AAA").start();

new Thread(futureTask2, "BBB").start();

ThreadPoolExecutor

为什么用线程池

线程池做的主要工作就是控制运行的线程的数量,处理过程中,将任务放入到队列中,然后线程创建后,启动这些任务,如果线程数量超过了最大数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:线程复用、控制最大并发数、管理线程

线程池中的任务是放入到阻塞队列中的

线程池的好处

多核处理的好处是:省略的上下文的切换开销

原来我们实例化对象的时候,是使用 new关键字进行创建,到了Spring后,我们学了IOC依赖注入,发现Spring帮我们将对象已经加载到了Spring容器中,只需要通过@Autowrite注解,就能够自动注入,从而使用

因此使用多线程有下列的好处

  • 降低资源消耗。通过重复利用已创建的线程,降低线程创建和销毁造成的消耗
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就立即执行
  • 提高线程的可管理性。线程是稀缺资源,如果无线创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

架构说明

Java中线程池是通过Executor框架实现的,该框架中用到了Executor,Executors(代表工具类),ExecutorService,ThreadPoolExecutor这几个类。

image-20200317175202647

image-20200317175241007

创建线程池

  • Executors.newFixedThreadPool(int i) :创建一个拥有 i 个线程的线程池
    • 执行长期的任务,性能好很多
    • 创建一个定长线程池,可控制线程数最大并发数,超出的线程会在队列中等待
  • Executors.newSingleThreadExecutor():创建一个只有1个线程的 单线程池
    • 一个任务一个任务执行的场景
    • 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行
  • Executors.newCachedThreadPool(); 创建一个可扩容的线程池
    • 执行很多短期异步的小程序或者负载教轻的服务器
    • 当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程
    • 当有任务来时,又智能的添加新线程来执行。
  • Executors.newScheduledThreadPool(int corePoolSize):大小无限制的线程池,线程池支持定时以及周期性执行任务,创建一个corePoolSize为传入参数,最大线程数为整形的最大数的线程池

具体使用,首先我们需要使用Executors工具类,进行创建线程池,这里创建了一个拥有5个线程的线程池

1
2
3
4
5
6
7
8
// 一池5个处理线程(用池化技术,一定要记得关闭)
ExecutorService threadPool = Executors.newFixedThreadPool(5);

// 创建一个只有一个线程的线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();

// 创建一个拥有N个线程的线程池,根据调度创建合适的线程
ExecutorService threadPool = Executors.newCacheThreadPool();

然后我们执行下面的的应用场景

1
模拟10个用户来办理业务,每个用户就是一个来自外部请求线程

我们需要使用 threadPool.execute执行业务,execute需要传入一个实现了Runnable接口的线程

1
2
3
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 给用户办理业务");
});

然后我们使用完毕后关闭线程池

1
threadPool.shutdown();

完整代码为:采用 Executors.newFixedThreadPool(int i)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 使用 Java多线程的方式,通过线程池
* @author: coderblue
*/
public class MyThreadPoolDemo {
public static void main(String[] args) {

// Array Arrays(辅助工具类)
// Collection Collections(辅助工具类)
// Executor Executors(辅助工具类)


// 一池5个处理线程(用池化技术,一定要记得关闭),不推荐这种
ExecutorService threadPool = Executors.newFixedThreadPool(5);

// 模拟10个用户来办理业务,每个用户就是一个来自外部请求线程
try {

// 循环十次,模拟业务办理,让5个线程处理这10个请求
for (int i = 0; i < 10; i++) {
final int tempInt = i;
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 给用户:" + tempInt + " 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}

}
}

最后结果:

1
2
3
4
5
6
7
8
9
10
pool-1-thread-1	 给用户:0 办理业务
pool-1-thread-5 给用户:4 办理业务
pool-1-thread-1 给用户:5 办理业务
pool-1-thread-4 给用户:3 办理业务
pool-1-thread-2 给用户:1 办理业务
pool-1-thread-3 给用户:2 办理业务
pool-1-thread-2 给用户:9 办理业务
pool-1-thread-4 给用户:8 办理业务
pool-1-thread-1 给用户:7 办理业务
pool-1-thread-5 给用户:6 办理业务

我们能够看到,一共有5个线程,在给10个用户办理业务

线程池中 submit() 和 execute()方法有什么区别?

execute() 参数 Runnable ;submit() 参数 (Runnable) 或 (Runnable 和 结果 T) 或 (Callable)
execute() 没有返回值;而 submit() 有返回值
submit() 的返回值 Future 调用get方法时,可以捕获处理异常

创建周期性执行任务的线程池

Executors.newScheduledThreadPool(int corePoolSize):

线程池支持定时以及周期性执行任务,创建一个corePoolSize为传入参数,最大线程数为整形的最大数的线程池

底层使用 ScheduledThreadPoolExecutor 来实现 ScheduledThreadPoolExecutor 为ThreadPoolExecutor子类

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

执行方法

1
2
3
4
5
6
7
8
9
10
  /**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* command:执行的任务 Callable或Runnable接口实现类
* delay:延时执行任务的时间
* unit:延迟时间单位
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
* command:执行的任务 Callable或Runnable接口实现类
* initialDelay 第一次执行任务延迟时间
* period 连续执行任务之间的周期,从上一个任务开始执行时计算延迟多少开始执行下一个任务,但是还会等上一个任务结束之后。
* unit:延迟时间单位
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
   /**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
* command:执行的任务 Callable或Runnable接口实现类
* initialDelay 第一次执行任务延迟时间
* delay:连续执行任务之间的周期,从上一个任务全部执行完成时计算延迟多少开始执行下一个任务
* unit:延迟时间单位
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit)

底层实现

我们通过查看源码,点击了Executors.newSingleThreadExecutor 和 Executors.newFixedThreadPool能够发现底层都是使用了ThreadPoolExecutor

image-20200317182004293

我们可以看到线程池的内部,还使用到了LinkedBlockingQueue 链表阻塞队列

同时在查看Executors.newCachedThreadPool 看到底层用的是 SynchronousBlockingQueue阻塞队列

最后查看一下,完整的三个创建线程的方法

image-20200317183202992

线程池的重要参数

image-20200317183600957

线程池在创建的时候,一共有7大参数

  • corePoolSize:核心线程数,线程池中的常驻核心线程数
    • 在创建线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近似理解为今日当值线程
    • 当线程池中的线程数目达到corePoolSize后,就会把到达的队列放到缓存队列中
  • maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值必须大于等于1、
    • 相当有扩容后的线程数,这个线程池能容纳的最多线程数
  • keepAliveTime:多余的空闲线程存活时间
    • 当线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余的空闲线程会被销毁,直到只剩下corePoolSize个线程为止
    • 默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用
  • unit:keepAliveTime的单位
  • workQueue:任务队列,被提交的但未被执行的任务(类似于银行里面的候客区)
    • LinkedBlockingQueue:链表阻塞队列
    • SynchronousBlockingQueue:同步阻塞队列
  • threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程池 一般用默认即可
  • handler:拒绝策略,表示当队列满了并且工作线程大于线程池的最大线程数(maximumPoolSize3)时,如何来拒绝请求执行的Runnable的策略

新提交一个任务时的处理流程很明显

1、如果当前线程池的线程数还没有达到基本大小(poolSize < corePoolSize),无论是否有空闲的线程新增一个线程处理新提交的任务

2、如果当前线程池的线程数大于或等于基本大小(poolSize >= corePoolSize) 且任务队列未满时,就将新提交的任务提交到阻塞队列排队,等候处理workQueue.offer(command);

3、如果当前线程池的线程数大于或等于基本大小(poolSize >= corePoolSize),且任务队列满时

3.1、当前poolSize<maximumPoolSize,那么就新增线程来处理任务;

3.2、当前poolSize=maximumPoolSize,那么意味着线程池的处理能力已经达到了极限,此时需要拒绝新增加的任务。至于如何拒绝处理新增的任务,取决于线程池的饱和策略RejectedExecutionHandler。

参考-理解ThreadPoolExecutor线程池

当营业窗口和阻塞队列中都满了时候,就需要设置拒绝策略

image-20200317201150197

拒绝策略

以下所有拒绝策略都实现了RejectedExecutionHandler接口

  • AbortPolicy:默认,直接抛出RejectedExcutionException异常,阻止系统正常运行
  • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果运行任务丢失,这是一种好方案
  • CallerRunsPolicy:该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务

线程池底层工作原理

线程池运行架构图

image-20200318154414717

文字说明

  1. 在创建了线程池后,等待提交过来的任务请求
  2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断
    1. 如果正在运行的线程池数量小于corePoolSize,那么马上创建线程运行这个任务
    2. 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列
    3. 如果这时候队列满了,并且正在运行的线程数量还小于maximumPoolSize,那么还是创建非核心线程like运行这个任务;
    4. 如果队列满了并且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
  4. 当一个线程无事可做操作一定的时间(keepAliveTime)时,线程池会判断:
    1. 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉
    2. 所以线程池的所有任务完成后,它会最终收缩到corePoolSize的大小

以顾客去银行办理业务为例,谈谈线程池的底层工作原理

  1. 最开始假设来了两个顾客,因为corePoolSize为2,因此这两个顾客直接能够去窗口办理
  2. 后面又来了三个顾客,因为corePool已经被顾客占用了,因此只有去候客区,也就是阻塞队列中等待
  3. 后面的人又陆陆续续来了,候客区可能不够用了,因此需要申请增加处理请求的窗口,这里的窗口指的是线程池中的线程数,以此来解决线程不够用的问题
  4. 假设受理窗口已经达到最大数,并且请求数还是不断递增,此时候客区和线程池都已经满了,为了防止大量请求冲垮线程池,已经需要开启拒绝策略
  5. 临时增加的线程会因为超过了最大存活时间,就会销毁,最后从最大数削减到核心数

为什么不用默认创建的线程池?

线程池创建的方法有:固定数的,单一的,可变的,那么在实际开发中,应该使用哪个?

我们一个都不用,在生产环境中是使用自己自定义的

为什么不用Executors中JDK提供的?

根据阿里巴巴手册:并发控制这章

  • 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程
    • 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题,如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题
  • 线程池不允许使用Executors去创建,而是通过ThreadToolExecutors的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
    • Executors返回的线程池对象弊端如下:
      • FixedThreadPool和SingleThreadPool:
        • 运行的请求队列长度为:Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM
      • CacheThreadPool和ScheduledThreadPool
        • 运行的请求队列长度为:Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM

手写线程池

采用默认拒绝策略

从上面我们知道,因为默认的Executors创建的线程池,底层都是使用LinkBlockingQueue作为阻塞队列的,而LinkBlockingQueue虽然是有界的,但是它的界限是 Integer.MAX_VALUE 大概有20多亿,可以相当是无界的了,因此我们要使用ThreadPoolExecutor自己手动创建线程池,然后指定阻塞队列的大小

下面我们创建了一个 核心线程数为2,最大线程数为5,并且阻塞队列数为3的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 手写线程池
final Integer corePoolSize = 2;
final Integer maximumPoolSize = 5;
final Long keepAliveTime = 1L;

// 自定义线程池,只改变了LinkBlockingQueue的队列大小
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

然后使用for循环,模拟10个用户来进行请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 模拟10个用户来办理业务,每个用户就是一个来自外部请求线程
try {

// 循环十次,模拟业务办理,让5个线程处理这10个请求
for (int i = 0; i < 10; i++) {
final int tempInt = i;
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 给用户:" + tempInt + " 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}

但是在用户执行到第九个的时候,触发了异常,程序中断

1
2
3
4
5
6
7
8
9
10
11
12
13
pool-1-thread-1	 给用户:0 办理业务
pool-1-thread-4 给用户:6 办理业务
pool-1-thread-3 给用户:5 办理业务
pool-1-thread-2 给用户:1 办理业务
pool-1-thread-2 给用户:4 办理业务
pool-1-thread-5 给用户:7 办理业务
pool-1-thread-4 给用户:2 办理业务
pool-1-thread-3 给用户:3 办理业务
java.util.concurrent.RejectedExecutionException: Task com.moxi.interview.study.thread.MyThreadPoolDemo$$Lambda$1/1747585824@4dd8dc3 rejected from java.util.concurrent.ThreadPoolExecutor@6d03e736[Running, pool size = 5, active threads = 3, queued tasks = 0, completed tasks = 5]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.moxi.interview.study.thread.MyThreadPoolDemo.main(MyThreadPoolDemo.java:34)

这是因为触发了拒绝策略,而我们设置的拒绝策略是默认的AbortPolicy,也就是抛异常的

触发条件是,请求的线程大于 阻塞队列大小 + 最大线程数 = 8 的时候,也就是说第9个线程来获取线程池中的线程时,就会抛出异常从而报错退出。

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.concurrent.*;

/**
* 手写线程池
* @author coderblue
*/
public class ThreadPoolExecutorDemo {

public static void main(String[] args) {
// 手写线程池
final Integer corePoolSize = 2;
final Integer maximumPoolSize = 5;
final Long keepAliveTime = 1L;

// 自定义线程池,只改变了LinkBlockingQueue的队列大小,其中new ThreadPoolExecutor.DiscardPolicy()是代表线程策略
ExecutorService executorService = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());

// 模拟10个用户来办理业务,每个用户就是一个来自外部请求线程
try {
// 循环十次,模拟业务办理,让5个线程处理这10个请求
for (int i = 0; i < 10; i++) {
final int tempInt = i;
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 给用户:" + tempInt + " 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
executorService.shutdown();
}
}
}

采用CallerRunsPolicy拒绝策略

当我们更好其它的拒绝策略时,采用CallerRunsPolicy拒绝策略,也称为回退策略,就是把任务丢回原来的请求开启线程着,我们看运行结果

1
2
3
4
5
6
7
8
9
10
pool-1-thread-1	 给用户:0 办理业务
pool-1-thread-5 给用户:7 办理业务
pool-1-thread-4 给用户:6 办理业务
main 给用户:8 办理业务
pool-1-thread-3 给用户:5 办理业务
pool-1-thread-2 给用户:1 办理业务
pool-1-thread-3 给用户:9 办理业务
pool-1-thread-4 给用户:4 办理业务
pool-1-thread-5 给用户:3 办理业务
pool-1-thread-1 给用户:2 办理业务

我们发现,输出的结果里面出现了main线程,因为线程池出发了拒绝策略,把任务回退到main线程,然后main线程对任务进行处理

采用 DiscardPolicy 拒绝策略

1
2
3
4
5
6
7
8
pool-1-thread-1	 给用户:0 办理业务
pool-1-thread-3 给用户:5 办理业务
pool-1-thread-1 给用户:2 办理业务
pool-1-thread-2 给用户:1 办理业务
pool-1-thread-1 给用户:4 办理业务
pool-1-thread-5 给用户:7 办理业务
pool-1-thread-4 给用户:6 办理业务
pool-1-thread-3 给用户:3 办理业务

采用DiscardPolicy拒绝策略会,线程池会自动把后面的任务都直接丢弃,也不报异常,当任务无关紧要的时候,可以采用这个方式

采用DiscardOldestPolicy拒绝策略

1
2
3
4
5
6
7
8
pool-1-thread-1	 给用户:0 办理业务
pool-1-thread-4 给用户:6 办理业务
pool-1-thread-1 给用户:4 办理业务
pool-1-thread-3 给用户:5 办理业务
pool-1-thread-2 给用户:1 办理业务
pool-1-thread-1 给用户:9 办理业务
pool-1-thread-4 给用户:8 办理业务
pool-1-thread-5 给用户:7 办理业务

这个策略和刚刚差不多,会把最久的队列中的任务替换掉

线程池的四种使用方式

1.newCachedThreadPool

创建一个线程池,如果线程池中的线程数量过大,它可以有效的回收多余的线程,如果线程数不足,那么它可以创建新的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void method() throws Exception {

ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 5; i++) {

final int index = i;

Thread.sleep(1000);

executor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " " + index);
}
});
}
}

执行结果

以明显的看出,现在就需要几条线程来交替执行。

不足:这种方式虽然可以根据业务场景自动的扩展线程数来处理我们的业务,但是最多需要多少个线程同时处理缺是我们无法控制的;

优点:如果当第二个任务开始,第一个任务已经执行结束,那么第二个任务会复用第一个任务创建的线程,并 不会重新创建新的线程,提高了线程的复用率;

2.newFixedThreadPool

这种方式可以 指定线程池中的线程数。举个栗子,如果一间澡堂子最大只能容纳20个人同时洗澡,那么后面来的人只能在外面排队等待。如果硬往里冲,那么只会出现一种情景,摩擦摩擦…

首先测试一下最大容量为一个线程,那么会不会是我们预测的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void method_01() throws InterruptedException {

ExecutorService executor = Executors.newFixedThreadPool(1);

for (int i = 0; i < 10; i++) {

Thread.sleep(1000);
final int index = i;

executor.execute(() -> {
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + index);
});
}
executor.shutdown();
}

输出结果

如果是三个线程数,那么就会有pool——1-thread-1到3了

优点:两个结果综合说明,newFixedThreadPool的 线程数是可以进行控制的,因此我们可以通过控制最大线程来使我们的服务器打到最大的使用率,同事又可以保证及时流量突然增大也不会占用服务器过多的资源。

3.newScheduledThreadPool

该线程池支持定时,以及周期性的任务执行,我们可以延迟任务的执行时间,也可以设置一个周期性的时间让任务重复执行。 该线程池中有以下两种延迟的方法。

  • scheduleAtFixedRate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** 固定执行周期
* * @param1 命令任务执行
* * @param2 initialDelay 第一次执行的延迟时间
* * @param3 period 从上一次任务开始到下一次开始前的执行周期:
* * -- 如果执行周期时间大于任务的执行时间,任务不受执行时间的影响
* * -- 如果执行周期时间小于等于任务的执行时间,那么任务执行结束之后,会立马执行,至此间隔时间就会被打乱。
* * @param4 initialDelay and delay的单位,比如TimeUnit.SECONDS是秒
* public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
* long initialDelay,
* long period,
* TimeUnit unit);
*
* @author coderblue
*/
public class ScheduleAtFixedRate {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);

/**
* scheduleAtFixedRate 开始执行时间:9:41:50
* scheduleAtFixedRate 执行花费时间=2m
* scheduleAtFixedRate 执行完成时间:9:41:52
* ======================================
* scheduleAtFixedRate 开始执行时间:9:41:52
*/
executor.scheduleAtFixedRate(() -> ScheduleAtFixedRate.commonMethod(), 1, 2, TimeUnit.SECONDS);

/**
* scheduleAtFixedRate 开始执行时间:9:47:10
* scheduleAtFixedRate 执行花费时间=2m
* scheduleAtFixedRate 执行完成时间:9:47:12
* ======================================
* scheduleAtFixedRate 开始执行时间:9:47:15
*/
executor.scheduleAtFixedRate(() -> ScheduleAtFixedRate.commonMethod(), 1, 5, TimeUnit.SECONDS);
}

private static void commonMethod() {
long start = System.currentTimeMillis();
System.out.println("scheduleAtFixedRate 开始执行时间:" +
DateFormat.getTimeInstance().format(new Date()));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("scheduleAtFixedRate 执行花费时间=" + (end - start) / 1000 + "m");
System.out.println("scheduleAtFixedRate 执行完成时间:" + DateFormat.getTimeInstance().format(new Date()));
System.out.println("======================================");
}
}
  • scheduleWithFixedDelay
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
* 固定延迟时间:间隔时间不会受任务执行时间长短的影响
* * @param1 命令任务执行
* * @param2 initialDelay 第一次执行的延迟时间
* * @param3 delay 在两次执行间的间隔时间
* * @param4 initialDelay and delay的单位,比如TimeUnit.SECONDS是秒
* public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
* long initialDelay,
* long delay,
* TimeUnit unit);
* @author coderblue
*/
public class ScheduleWithFixedDelay {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

/**
* 方式一
* scheduleWithFixedDelay 开始执行时间:9:56:51
* scheduleWithFixedDelay执行花费时间=1m
* scheduleWithFixedDelay执行完成时间:9:56:52
* ======================================
* scheduleWithFixedDelay 开始执行时间:9:56:56
*/
executor.scheduleWithFixedDelay(() -> {
ScheduleWithFixedDelay.commonMethod();
}, 1, 4, TimeUnit.SECONDS);

/**
* 方式二
* 等待十秒再运行第一次,然后每次任务间隔4秒
*/
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
ScheduleWithFixedDelay.commonMethod();
}
}, 10, 4, TimeUnit.SECONDS);
}

private static void commonMethod() {
long start = System.currentTimeMillis();
System.out.println("scheduleWithFixedDelay 开始执行时间:" +
DateFormat.getTimeInstance().format(new Date()));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("scheduleWithFixedDelay执行花费时间=" + (end - start) / 1000 + "m");
System.out.println("scheduleWithFixedDelay执行完成时间:"
+ DateFormat.getTimeInstance().format(new Date()));
System.out.println("======================================");
}
}

4.newSingleThreadExecutor

这是一个单线程池,至始至终都由一个线程来执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* pool-1-thread-1 0
* pool-1-thread-1 1
* pool-1-thread-1 2
*
* 单线程池,至始至终都由一个线程来执行
* @author coderblue
*/
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();

for (int i = 0; i < 5; i++) {
final int index = i;
executor.execute(() -> {
try {
Thread.sleep(1 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + index);
});
}
executor.shutdown();
}
}

指定调用哪个线程池

如果我们如下配置了多个线程池,如果不指定的话就会出现如图提示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package cn.coderblue.studyasync.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
* @author Blue
* @date 2020/12/2
**/
@Configuration
public class AsyncConfig implements AsyncConfigurer {

private static final int MAX_POOL_SIZE = 20;

private static final int CORE_POOL_SIZE = 2;

@Bean("asyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(CORE_POOL_SIZE);
// 设置最大线程数
executor.setMaxPoolSize(MAX_POOL_SIZE);
// 设置默认线程名称
executor.setThreadNamePrefix("thread-AsyncExecutor-");
// 设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 设置非核心线程超时回收时间
executor.setKeepAliveSeconds(60);
executor.setAllowCoreThreadTimeOut(true);
executor.initialize();
return executor;
}

@Bean("asyncTaskExecutor1")
public AsyncTaskExecutor asyncTaskExecutor1() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(CORE_POOL_SIZE);
// 设置最大线程数
executor.setMaxPoolSize(MAX_POOL_SIZE);
// 设置默认线程名称
executor.setThreadNamePrefix("thread-AsyncExecutor123-");
// 设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 设置非核心线程超时回收时间
executor.setKeepAliveSeconds(60);
executor.setAllowCoreThreadTimeOut(true);
executor.initialize();
return executor;
}
}

具体提示信息为:More than one TaskExecutor bean found within the context, and none is named ‘taskExecutor’. Mark one of them as primary or name it ‘taskExecutor’ (possibly as an alias) in order to use it for async processing: [asyncTaskExecutor, asyncTaskExecutor1]

因为 Spring 检测到全局存在多个Executor,就会降级使用默认的SimpleAsyncTaskExecutor

解决办法,我们可以通过注入的对应名称来调用,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 指定调用的线程池
@Async("asyncTaskExecutor")
@Override
public Future<String> asyncTask(String s) {
long startTime = System.currentTimeMillis();
try {
//模拟耗时
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":Future<String> asyncTask(String s),耗时:" + (endTime - startTime));
return AsyncResult.forValue(s);
}

线程池的合理参数

生产环境中如何配置 corePoolSize 和 maximumPoolSize

这个是根据具体业务来配置的,分为CPU密集型和IO密集型

  • CPU密集型

CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行

CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程)

而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些

CPU密集型任务配置尽可能少的线程数量:

一般公式:CPU核数 + 1个线程数

  • IO密集型

由于IO密集型任务线程并不是一直在执行任务,则可能多的线程,如 CPU核数 * 2

IO密集型,即该任务需要大量的IO操作,即大量的阻塞

在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力花费在等待上

所以IO密集型任务中使用多线程可以大大的加速程序的运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。

IO密集时,大部分线程都被阻塞,故需要多配置线程数:

参考公式:CPU核数 / (1 - 阻塞系数) 阻塞系数在0.8 ~ 0.9左右

例如:8核CPU:8/ (1 - 0.9) = 80个线程数

打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  1. © 2020-2021 Lauy    湘ICP备20003709号

请我喝杯咖啡吧~

支付宝
微信