Java并发编程-线程池

线程池简单来说就是一个存放空闲线程的容器,是为了避免系统频繁创建和销毁线程提出的一个概念,类似数据库连接池。有了线程池之后线程的创建和销毁就变成了从线程池获取空闲线程和归还线程,很大程度节省了创建和销毁线程的系统开销。

一、JDK内置线程池

JDK本身提供了内置的线程池支持,即Executor框架。

Executors工厂类中声明了几种内置的线程池支持,以下是Executors中几种线程池的代码和注释

1、newFixedThreadPool

创建一个固定线程数量的线程池,线程池中线程数量固定不变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

2、newSingleThreadExecutor

创建一个只有一个线程的线程池,如果多个提交多个任务,则会直接保存到阻塞队列中,待线程空闲根据先进先出顺序执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent {@code newFixedThreadPool(1, threadFactory)} the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*
* @param threadFactory the factory to use when creating new
* threads
*
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

3、newCachedThreadPool

创建一个可以根据实际情况调整线程数量的线程池,线程池的线程数量不确定,如果当前时刻线程池中有空闲线程可以使用,则新提交的任务优先使用空闲线程执行,否则,新提交的线程将直接进入阻塞队列等待空闲线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

4、newSingleThreadScheduledExecutor

创建一个ScheduledExecutorService对象,线程池大小为1,ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某个任务的功能,即定时任务单线程线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically.
* (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newScheduledThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
* @return the newly created scheduled executor
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

/**
* Creates a single-threaded executor that can schedule commands
* to run after a given delay, or to execute periodically. (Note
* however that if this single thread terminates due to a failure
* during execution prior to shutdown, a new one will take its
* place if needed to execute subsequent tasks.) Tasks are
* guaranteed to execute sequentially, and no more than one task
* will be active at any given time. Unlike the otherwise
* equivalent {@code newScheduledThreadPool(1, threadFactory)}
* the returned executor is guaranteed not to be reconfigurable to
* use additional threads.
* @param threadFactory the factory to use when creating new
* threads
* @return a newly created scheduled executor
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}

5、newScheduledThreadPool

创建一个指定数量的ScheduledExecutorService线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

二、核心线程池内部实现

对于几个内置线程池来说,虽然具备不同个功能,但是其内部实现还是使用了ThreadPoolExecutor来实现,接下来研究一下ThreadPoolExecutor类,先看构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造函数的参数含义:
corePoolSize:指定了线程池中线程的数量。
maximumPoolSize:指定了线程池中最大线程数量。
keepAliveTime:当线程池中线程数量超过corePoolSize时,多余的空闲线程的存活时间。
unit:keepAliveTime的单位。
workQueue:任务队列,被提交但尚未被执行的任务将进入任务队列。
threadFactory:线程工厂,用于创建线程。
handler:拒绝策略,当线程池任务太多,超出处理能力,采取何种方式拒绝任务。

三、任务队列

参数wordQueue队列存放被提交但是未被执行的任务,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象,任务队列又分为以下几种:

1、直接提交队列:直接提交任务队列由SynchronousQueue对象提供,SynchronousQueue没有容量,每一个插入操作,都要对应一个删除操作,如果使用SynchronousQueue,提交的任务不会被保存而是直接提交给线程执行,如果没有空闲线程,则创建线程执行,如果线程数量达到maximumPoolSize,则执行拒绝策略,所以如果使用SynchronousQueue作为任务队列,maximumPoolSize通常要设置很大,否则很容易被拒绝。

2、有界任务队列:ArrayBlockingQueue是有界任务队列的实现代表,ArrayBlockingQueue的构造函数必须有一个定义最大容量的参数,在使用有界任务队列时,当一个新任务提交,如果线程池的线程数小于corePoolSize数量,则直接创建线程执行任务,如果线程数量大于corePoolSize数量,则将任务放入任务队列中,如果持续新任务提交导致任务队列已满,无法加入,此时如果线程池中线程数小于maximumPoolSize,则创建新线程执行任务,如果线程数大于maximumPoolSize,则直接执行拒绝策略。

3、无界任务队列:LinkedBlockingQueue是无界任务队列的实现代表,无界任务队列除非系统资源耗尽,否则不会出现队列已满,无法加入队列的情况,在使用无界任务队列时,当一个新任务提交,如果线程池的线程数小于corePoolSize数量,则直接创建线程执行任务,但当线程数量达到corePoolSize数量时,因为无界队列没有容量界限,所以在这种情况线后续提交的任务都将进入任务队列,如果任务提交速度远大于任务处理速度,这可能导致任务队列快速增长,直到耗尽内存。

4、优先任务队列:PriorityBlockingQueue是优先任务队列的实现代表,优先任务队列是带有优先级的任务队列,可以控制执行顺序,是一个特殊的无界队列,ArrayBlockingQueue和LinkedBlockingQueue都是按照先进先出的策略执行任务,PriorityBlockingQueue可以根据任务优先级控制任务的执行顺序。

对于JDK内置的几个线程池来说,newFixedThreadPool和newSingleThreadExecutor使用的是LinkedBlockingQueue无界队列,所以就存在任务提交速度快于任务执行速度,导致任务队列快速膨胀,最终导致内存耗尽的风险。

newCachedThreadPool是corePoolSize为0,maximumPoolSize无穷大的线程池,这就是说,没有任务提交时,线程池中没有线程,当任务提交时,如果线程池中没有空闲线程,则线程池就要创建新的线程执行任务,当线程执行完成任务后,由于corePoolSize为0,所以线程将在60s内被回收,同样的,如果同时大量任务持续提交,就会创建大量线程执行任务,如果任务执行速度又很慢,那么大量任务可能很快耗尽系统资源。

四、拒绝策略

拒绝策略就是当线程池中任务队列已满且线程数超过maximumPoolSize时,这是线程池已经无法再接纳新的任务,所以要拒绝新的任务,具体如何拒绝新任务,就靠拒绝策略的实现。

JDK内置四种拒绝策略:

  • AbortPolicy:直接拒绝,抛出异常,阻止系统执行。
  • CallerRunsPolicy:只要线程池未关闭,被拒绝的任务将直接在调用者的线程中执行。
  • DiscardOledestPolicy:丢弃最老的请求,然后再次尝试提交任务。
  • DiscardPolicy:直接丢弃无法提交的任务。

以上策略实现了RejectedExecutionHandler接口,实际应用过程中可以自己实现定制化的拒绝策略。

坚持原创技术分享,您的支持将鼓励我继续创作!