java并发编程-线程关闭与取消

以下是本文目录大纲:

  线程取消与关闭

  Futre Cancel方法;

  线程中断

  Thread interrupt方法;

  线程关闭及线程池关闭;

线程取消与关闭:

     要使任务和线程能安全、快速、可靠地停止下来,并不是一件容易的事。Java没有提供任何机制来安全地终止线程(虽然Thread.stop和suspend方法提供了这样的机制,但由于存在缺陷,因此应该避免使用)。但它提供了中断,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。

这种协作式的方法是必要的,我们很少希望某个任务、线程或服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态。相反,在编写任务和服务时可以使用一种协作的方式:当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。这提供了更好的灵活性,因为任务本身的代码比发出取消请求的代码更清楚如何执行清除工作。

生命周期结束的问题会使任务、服务以及程序的设计和实现等过程变得复杂,而这个在程序设计中非常重要的要素却经常被忽略。一个在行为良好的软件与勉强运行的软件之间最主要的区别就是,行为良好的软件能很完善地处理失败、关闭和取消等过程。

通过协作标识来取消任务执行:

     其中一种协作机制能设置某个“已请求取消”标志,而任务将定期地查看该标志。如果设置了这个标志,那么任务将提前结束。下面程序就使用了这项技术,其中PrimeGenerator持续地枚举素数,直到它被取消。cancel方法将设置canceled标志,并且主循环在搜索下一个素数之前会首先检查这个标志。(为了使这个过程能可靠地工作,标志canceled必须为volatile类型。)

public class PrimeGenerator implements Runnable {
    private static ExecutorService exec = Executors.newCachedThreadPool();
    @GuardedBy("this") private final List<BigInteger> primes = new ArrayList<BigInteger>();
    private volatile boolean cancelled;
    public void run() {
        BigInteger p = BigInteger.ONE;
        while (!cancelled) {
            p = p.nextProbablePrime();
            synchronized (this) {
                primes.add(p);
            }
        }
    }
    public void cancel() {
        cancelled = true;
    }
    public synchronized List<BigInteger> get() {
        return new ArrayList<BigInteger>(primes);
    }
    static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
        PrimeGenerator generator = new PrimeGenerator();
        exec.execute(generator);
        try {
            SECONDS.sleep(1);
        } finally {
            generator.cancel();
        }
        return generator.get();
    }
}

     程序给出了这个类的使用示例,即让素数生成器运行1秒后取消。素数生成器通常不会刚好在运行1秒钟后停止,因为在请求取消的时刻和run方法中循环执行下一次检查之间可能存在延迟。cancel方法由finally块调用,从而确保即使在调用sleep时被中断也能取消素数生成器的执行。如果cancel没有被调用,那么搜索素数的线程将永远执行下去,不断消耗CPU的时钟周期,并使JVM不能正常退出。

线程中断:

     PrimeGenerator中的取消机制最终会使地搜索素数的任务退出,但并不是立刻发生的,需要花费一定的时间。然而,如果使用这种方法的任务调用了一个阻塞方法,例如BlockingQueue.put,那么可能会产生一个更严重的问题——任务可能永远不会检查取消标志,因此永远不会取消。 

    如下程序说明了这一点,生产者线程生成素数,并将它们放入一个阻塞队列。如果生产者的速度超过了消费者的处理速度,队列将被填满,put方法也会阻塞。当生产者在put方法中阻塞时,如果消费者希望取消生产者任务,那么将发生什么情况?它可以调用cancel方法来设置canceled标志,但此时生产者却永远不能检查这个标志,因为它无法从阻塞的put方法中恢复过来(因为消费者此时已经停止从队列中取出素数,所以put方法将一直保持阻塞状态)。

class BrokenPrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;
    private volatile boolean cancelled = false;
    BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }
    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!cancelled)
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
        }
    }
    public void cancel() {
        cancelled = true;
    }
}

      一些特殊的阻塞库的方法支持中断。线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的工作。

Java提供了改变线程的中断状态的方法:

      在JDK1.0中,可以用stop方法来终止,但是现在这种方法已经被禁用了,改用interrupt方法。Thread.interrupt()方法不会中断一个正在运行的线程。它的作用是,在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。更确切的说,如果线程被Object.wait, Thread.join和Thread.sleep三种方法之一阻塞,那么,它将接收到一个中断异常(InterruptedException),从而提早地终结被阻塞状态。

在Java提供的线程支持类Thread中,有三个用于线程中断的方法: 

      public void interrupt(); 中断线程。其实是改变中断状态而已

      public static boolean interrupted(); 是一个静态方法,用于测试当前线程是否已经中断,并将线程的中断状态 清除。所以如果线程已经中断,调用两次interrupted,第二次时会返回false,因为第一次返回true后会清除中断状态。 

      public boolean isInterrupted(); 测试线程是否已经中断,并未消除中断状态

要想中断离开线程的两种常用方式:

  抛出InterruptedException和用Thread.interrupted()检查是否发生中断;详细这可以查看

      http://www.dczou.com/viemall/284.html

  所以使用中断而不是boolean标志来请求取消可以很好地解决BrokenPrimeProducer中的问题,如下程序所示:

public class PrimeProducer extends Thread {
    private final BlockingQueue<BigInteger> queue;
    PrimeProducer(BlockingQueue<BigInteger> queue) {
        this.queue = queue;
    }
    public void run() {
        try {
            BigInteger p = BigInteger.ONE;
            while (!Thread.currentThread().isInterrupted())
                queue.put(p = p.nextProbablePrime());
        } catch (InterruptedException consumed) {
            /* Allow thread to exit */
        }
    }
    public void cancel() {
        interrupt();
    }
}

通过Future来实现取消

  可以通过Future Cancel方法取消任务执行:

   cancel方法用来取消任务:

        当你想要取消你已提交给执行者的任务,使用Future接口的cancel()方法。根据cancel()方法参数和任务的状态不同,这个方法的行为将不同:

如果这个任务已经完成或之前的已被取消或由于其他原因不能被取消,那么这个方法将会返回false并且这个任务不会被取消。

如果这个任务正在等待执行者获取执行它的线程,那么这个任务将被取消而且不会开始它的执行。如果这个任务已经正在运行,则视方法的参数情况而定。 cancel()方法接收一个Boolean值参数(mayInterruptIfRunning)。如果参数为true并且任务正在运行,那么这个任务将被取消。如果参数为false并且任务正在运行,那么这个任务将不会被取消。 

   isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

   isDone方法表示任务是否已经完成,若任务完成,则返回true;

如:任务超时取消任务执行:

Future<?> task = taskExec.submit(r);//提交任务
    try {
        task.get(timeout, unit);//指定任务超时时间
    } catch (TimeoutException e) {
        // 接下来任务将被取消
        // 在finally调用cancel
    } catch (ExecutionException e) {
        // 如果在任务中抛出了异常,那么重新抛出该异常
        throw launderThrowable(e.getCause());
    } finally {
        //取消那些不在需要结果的任务
        //如果任务已经结束,那么执行取消操作也不会带来任何影响
        task.cancel(true);  // 如果任务正在运行,那么将被中断
}

停止基于线程的服务

      应用程序通常会创建基于线程的服务,如线程池。这些服务的时间一般比创建它的方法更长。如果应用程序完成退出,这些服务线程也要结果。由于无法通过抢占式的方法来停止线程,因此它们需要自行结束。在ExecutorService中提供了shutdown和shutdownNow等方法。同样,在其他拥有线程的服务中也应该提供类似的关闭机制。

     1. shutdown()方法是优雅关闭的方式

    2. shutdownNow()是粗暴关闭的方式,它返回一个还未开始执行的任务的列表。对于正在运行的任务,它采用Thread.interrupt()的方式来取消任务。如果任务不响应线程的中断,那么这个任务可能会一直执行。

    3. awaitTermination()方法会等待一段时间再来终止执行的任务

    4、isTerminated() 如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用 shutdown 或 shutdownNow,否则 isTerminated 永不为 true。

    while(true){  
         if(exe.isTerminated()){  
            System.out.println("所有的子线程都结束了!");  
            break;  
         }  
         Thread.sleep(1000);    
         }

如果ExecutorService已关闭,再向它提交任务时会抛RejectedExecutionException异常。

处理不可中断的阻塞

     在Java库中,许多可阻塞的方法都是通过提前返回或者抛出InterruptedException来响应中断请求的,从而使开发人员更容易构建出能响应取消请求的任务。然而,并非所有的可阻塞方法或者阻塞机制都能响应中断:如果一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。

    对于那些由于执行不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求我们必须知道线程阻塞的原因。

         Java.io包中的同步Socket I/O。在服务器应用程序中,最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException。

        Java.io包中的同步I/O。当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptedException)并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel。

      Selector的异步I/O。如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close或wakeup方法会使线程抛出ClosedSelectorException并提前返回。

      获取某个锁。如果一个线程由于等待某个内置锁而被阻塞,那么将无法响应中断,因为线程认为它肯定获得锁,所以将不会理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。(尝试获取一个内部锁的操作(进入一个 synchronized 块)是不能被中断的)

下面ReaderThread展示了一项用来封装非标准取消任务的技术(非标准中断技术,标准的中断是调用线程的interrupt方法来抛出InterruptException异常?),通过重写Thread的interrupt来封装:

public class ReaderThread extends Thread {
    private final Socket socket;
    private final InputStream in;
    public ReaderThread(Socket socket) throws IOException {
        this.socket = socket;
        this.in = socket.getInputStream();
    }
    //重写Thread的interrupt,即支持标准的中断,也关闭了底层的socket
    public void  interrupt()  {//
        try {
            socket.close();//要中断socket的阻塞方法则需要关闭socket
        }
        catch (IOException ignored) { }
        finally {
            super.interrupt();//不要忘了调用标准中断interrupt
        }
    }
    public void run() {
        try {
            byte[] buf = new byte[BUFSZ];
            while (true) {
                int count = in.read(buf);//这里可能会阻塞,如果在阻塞时socket被关闭,则会抛出异常,从而可以跳出阻塞状态
                if (count < 0)
                    break;
                else if (count > 0)
                    processBuffer(buf, count);
            }
        } catch (IOException e) { /*  允许线程退出  */  }
    }
}

JVM的关闭;

 JVM既可通过正常手段来关闭,也可强行关闭。

    正常关闭:当最后一个“正常(非守护)”线程结束时、当有人调用了System.exit时、或者通过其他特定于平台的方法关闭时(例如发送了SIGINT信号或键入Ctrl-C)。

    强行关闭:杀死JVM操作系统进程 kill

   监听JVM关闭,可以用于实现服务或应用程序清理工作,例如删除临时文件,或者清除无法由操作系统自动清除的资源。如下所示,在start方法中注册一个关闭钩子,从而确保在退出时关闭日志文件:

public void start()
{
    Runnable.getRuntime().addShutdownHook(new Thread(){
        public void run()
        {
            try{LogService.this.stop();}
            catch(InterruptedException ignored){}
        }
    });
}

     由于关闭钩子将并发执行,因此在关闭日志文件时可能导致其他需要日志服务的关闭钩子产生问题。为了避免这种情况,关闭钩子不应该依赖那些可能被应用程序或其他关闭钩子关闭的服务。实现这种功能的一种方式是对所有服务使用同一个关闭钩子(而不是每个服务使用一个不同的关闭钩子),并且在该关闭钩子中执行一系列的关闭操作。这确保了关闭操作在单个线程中串行执行,从而避免了在关闭操作之间出现竞态条件或死锁问题。

参考资料<<Java并发编程>>7章;

发表评论