java并发编程-线程池

以下是本文的目录大纲:

     一.Java中的Executor接口

     一.Java中的ExecutorService接口

   三.Java中的ThreadPoolExecutor类

   四.深入剖析线程池实现原理

   五.使用示例

   六.如何合理配置线程池的大小 

Executor接口

 JUC包中除了一系列的同步类之外,就是Executor执行框架相关的类。对于一个执行框架来说,可以分为两部分   

   1. 任务的提交

   2. 任务的执行

 这是一个生产者消费者模式,提交任务的操作是生产者,执行任务的线程相当于消费者。

 Excutor 整体结构如下:

blob.png

 Executor 接口定义了最基本的 execute 方法,用于接收用户提交任务。

 ExecutorService 接口继承Executor, 定义了线程池终止和创建及提交 futureTask 任务支持的方法。

 AbstractExecutorService 是抽象类,继承ExecutorService,主要实现了 ExecutorService 和 futureTask 相关的一些任务创建和提交的方法。

 ThreadPoolExecutor 是最核心的一个类,是线程池的内部实现。线程池的功能都在这里实现了,平时用的最多的基本就是这个了。

 ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基础上提供了支持定时调度的功能。线程任务可以在一定延时时间后才被触发执行。

  

  Executor接口设计的目的是专注于任务的执行,和任务的提交解耦。任务的提交由任务的创建者处理。Executor接口封装了任务执行的细节,比如如何使用线程,是否定时执行等等。Executor接口很简单,就一个execute方法。

public interface Executor {  
    void execute(Runnable command);  
}

 如果不使用Executor接口,直接用Thread显式地来执行一个Runnable,代码是这样的

new Thread(new RunnableTask()).start()

 

而使用Executor接口执行Runnable的代码是这样的

Executor executor = xxxExecutor;  
executor.execute(new RunnableTask());

可以看到Executor接口的一个是屏蔽了任务执行的细节。它完全可以直接使用Executor所在的线程直接同步执行任务

class DirectExecutor implements  Executor {  
    public void execute(Runnable r) {  
       r.run();  
    }  
}

也可以使用单独的线程来执行任务,从而异步的执行任务。

class ThreadPerTaskExecutor implements Executor {  
   public void execute(Runnable r) {  
        new Thread(r).start();  
     }  
}

ExecutorService接口的设计

public interface ExecutorService extends Executor {  
  
    void shutdown();  
  
    List<Runnable> shutdownNow();  
      
    boolean isShutdown();  
  
    boolean isTerminated();  
    boolean awaitTermination(long timeout, TimeUnit unit)  
        throws InterruptedException;  
    <T> Future<T> submit(Callable<T> task);  
      
    <T> Future<T> submit(Runnable task, T result);  
     
    Future<?> submit(Runnable task);  
    
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  
        throws InterruptedException;  
      
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,  
                                  long timeout, TimeUnit unit)  
        throws InterruptedException;  
      
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)  
        throws InterruptedException, ExecutionException;  
   
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,  
                    long timeout, TimeUnit unit)  
        throws InterruptedException, ExecutionException, TimeoutException;  
}

  ExecutorService负责执行任务。它的生命周期有3个:运行,关闭和已终止在运行的任何时刻,有些 任务可能已经完成,有些可能正在运行,有些可能正在队列中等待执行。所以如果要关闭Executor的话,就有多种方式,比如优雅平滑的关闭,当执行关闭时就不在接受新的任务请求,并且等到已运行的任务执行完成再关闭Executor;比如最粗暴的关闭,直接关闭Executor,不管在运行的任务是否执行完毕。

      1. shutdown()方法是优雅关闭的方式

      2. shutdownNow()是粗暴关闭的方式,它返回一个还未开始执行的任务的列表。对于正在运行的任务,它采用Thread.interrupt()的方式来取消任务。如果任务不响应线程的中断,那么这个任务可能会一直执行。

      3. awaitTermination()方法会等待一段时间再来终止执行的任务

      4、isTerminated() 如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。

         while(true){  
         if(exe.isTerminated()){  
            System.out.println("所有的子线程都结束了!");  
            break;  
         }  
         Thread.sleep(1000);    
    }

   除了关闭Executor和检查Executor状态的方法外,ExecutorService还定义了一系列的submit()和invoke()来支持任务异步执行和批量执行。

    submit()方法支持Callable接口和普通的Runnable接口,它的目的是返回一个Future对象来支持任务的异步执行。调用者可以通过Future对象来检查任务的执行状态。后面会具体介绍Future相关的内容。 invokeAll()和invokeAny()提供了批量执行任务的接口。

ThreadPoolExecutor介绍:

   从 Java 5 开始,Java 提供了自己的线程池。线程池就是一个线程的容器,每次只执行额定数量的线程。ThreadPoolExecutor是Executor执行框架最重要的一个实现类,提供了线程池管理和任务管理是两个最基本的能力。这篇通过分析ThreadPoolExecutor的源码来看看如何设计和实现一个基于生产者消费者模型的执行器。

aaaaa.png

在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

   从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

  

  下面解释下一下构造器中各个参数的含义:

     corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

    maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;如果workQueue采用的是LinkedBlockingQueue无界队列,maximumPoolSize这个值将会无效,因为线程池将会无限的创建新线程至缓存队列中。

    keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有 currentPoolSize>corePoolSize时,keepAliveTime才会起作用. 即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize

    但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

   unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    TimeUnit.DAYS;               //天

TimeUnit.HOURS;             //小时

TimeUnit.MINUTES;           //分钟

TimeUnit.SECONDS;           //秒

TimeUnit.MILLISECONDS;      //毫秒

TimeUnit.MICROSECONDS;      //微妙

TimeUnit.NANOSECONDS;       //纳秒

 

   workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

   ArrayBlockingQueue;

   LinkedBlockingQueue;

   SynchronousQueue;

   1. 采用(直接提交)SynchronousQueue直接将任务传递给空闲的线程执行,不额外存储任务。这种方式需要无限制的MaximumPoolSize,可以创建无限制的工作线程来处理提交的任务。这种方式的好处是任务可以很快被执行,适用于任务到达时间大于任务处理时间的情况。缺点是当任务量很大时,会占用大量线程。newCachedThreadPool采用的便是这种策略。

   2. (无界队列) 使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。这种方式好处是使用的线程数量是稳定的,当内存足够大时,可以处理足够多的请求。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。

   缺点是如果任务直接有依赖,很有可能形成死锁,因为当工作线程被消耗完时,不会创建新的工作现场,只会把任务加入工作队列。并且可能由于内存耗尽引发内存溢出OOM

   3. 采用有界的工作队列AraayBlockingQueue。这种情况下对于内存资源是可控的,但是需要合理调节MaximumPoolSize和工作队列的长度,这两个值是相互影响的。当工作队列长度比较小的时,必定会创建更多的线程。而更多的线程会引起上下文切换等额外的消耗。当工作队列大,MaximumPoolSize小的时候,会影响吞吐量,并且会触发拒绝机制。

上述简单的介绍,接下来将细致分析每一个Queue。

 

  threadFactory:线程工厂,主要用来创建线程;

     采用threadFactory创建新的线程

  handler:表示当拒绝处理任务时的策略,有以下四种取值:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

   ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系:

    Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  然后ThreadPoolExecutor继承了类AbstractExecutorService。

线程池中的线程初始化

  默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

  在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

     prestartCoreThread():初始化一个核心线程;

     prestartAllCoreThreads():初始化所有核心线程

  线程池的关闭

  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

    shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

    shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务.

 

  线程池容量的动态调整

  ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

    setCorePoolSize:设置核心池大小

    setMaximumPoolSize:设置线程池最大能创建的线程数目大小

  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

线程池的工作过程如下:

   1、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。 你也可以预制线程。

  2、当调用 execute() 方法添加一个任务时,线程池会做如下判断:

      a)    如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;

      b)    如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);

      c)    如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;

      d)    如果线程池中的线程数量等于了maximumPoolSize,有4种才处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式)。

总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。

  3、当一个线程完成任务时,它会从队列中取下一个任务来执行。

   4、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

 

线程池案例分析:

比如:假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。

  因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;

  当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;

  如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;

  然后就将任务也分配给这4个临时工人做;

  如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。

  当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

  这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。

线程池的例子:

public class ThreadPoolTest {  
    //线程池维护线程的最少数量    
    private static final int COREPOOLSIZE = 2;    
    //线程池维护线程的最大数量    
    private static final int MAXINUMPOOLSIZE = 5;    
    //线程池维护线程所允许的空闲时间    
    private static final long KEEPALIVETIME = 4;    
    //线程池维护线程所允许的空闲时间的单位    
    private static final TimeUnit UNIT = TimeUnit.SECONDS;    
    //线程池所使用的缓冲队列,这里队列大小为3    
    private static final BlockingQueue<Runnable> WORKQUEUE = new ArrayBlockingQueue<Runnable>(3);    
    //线程池对拒绝任务的处理策略:AbortPolicy为抛出异常;CallerRunsPolicy为重试添加当前的任务,他会自动重复调用execute()方法;DiscardOldestPolicy为抛弃旧的任务,DiscardPolicy为抛弃当前的任务    
    private static final AbortPolicy HANDLER = new ThreadPoolExecutor.AbortPolicy();    
    
    public static void main(String[] args) {    
        // TODO 初始化线程池    
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(COREPOOLSIZE, MAXINUMPOOLSIZE, KEEPALIVETIME, UNIT, WORKQUEUE, HANDLER);    
        for (int i = 1; i < 11; i++) {    
            String task = "task@"+i;    
            System.out.println("put->"+task);    
            //一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法    
            //处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务    
            //设此时线程池中的数量为currentPoolSize,若currentPoolSize>corePoolSize,则创建新的线程执行被添加的任务,    
            //当corePoolSize+workQueue>currentPoolSize>=corePoolSize,新增任务被放入缓冲队列,    
            //当maximumPoolSize>currentPoolSize>=corePoolSize+workQueue,建新线程来处理被添加的任务,    
            //当currentPoolSize>=maximumPoolSize,通过 handler所指定的策略来处理新添加的任务    
            //本例中可以同时可以被处理的任务最多为maximumPoolSize+WORKQUEUE=8个,其中最多5个在线程中正在处理,3个在缓冲队列中等待被处理    
            //当currentPoolSize>corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数    
            threadPool.execute(new ThreadPoolTask(task));    
            try {    
                Thread.sleep(1000);    
            } catch (InterruptedException e) {    
                // TODO Auto-generated catch block    
                e.printStackTrace();    
            }    
        }    
        threadPool.shutdown();//关闭主线程,但线程池会继续运行,直到所有任务执行完才会停止。若不调用该方法线程池会一直保持下去,以便随时添加新的任务    
    }    
}  
import java.io.Serializable;  
public class ThreadPoolTask implements Runnable,Serializable{  
     private static final long serialVersionUID = -8568367025140842876L;    
        
        private Object threadPoolTaskData;    
        private static int produceTaskSleepTime = 10000;    
            
        public ThreadPoolTask(Object threadPoolTaskData) {    
            super();    
            this.threadPoolTaskData = threadPoolTaskData;    
        }    
        
        public void run() {    
            // TODO Auto-generated method stub    
            System.out.println("start..."+threadPoolTaskData);    
            try {    
                //模拟线程正在执行任务    
                Thread.sleep(produceTaskSleepTime);    
            } catch (InterruptedException e) {    
                // TODO Auto-generated catch block    
                e.printStackTrace();    
            }    
            System.out.println("stop..."+threadPoolTaskData);    
            threadPoolTaskData = null;    
        }    
            
        public Object getTask(){    
            return this.threadPoolTaskData;    
        }    
}

参考链接:

 ExecutorService生命周期

 Java并发编程:线程池的使用

 

发表评论