本文的小例子主要是说明如何安全中断阻塞队列中的任务,避免使用interrupt()中断线程,造成堵塞队列中没有被消费的任务都被忽略。
package com.thread;
import java.io.PrintWriter;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 单个线程的安全取消
*
* 这里模拟的场景是单个线程,处理阻塞队列中要处理的日志信息
*
* 分析:由于使用了阻塞队列,而阻塞队列的take操作响应中断;如果日志线程被修改为捕获InterruptedException就退出,那么
* 中断日志就能够停止了。
* 但是,这样使用Interrupt来中断线程,突然的关闭会忽略等待中需要被记录的日志。
*
* 解决方案:添加“已请求关闭”标志,和队列中要处理的日志的数量。
* 当发起Interrupt来中断线程时,并不会直接中断线程,只是改变“已请求关闭”标志为ture,
* 只有当标志为ture并且队列中已请求进来的要处理的日志都处理完了才会中断线程。
*
* @author hadoop
*
*/
public class SafeCancellThread {
class LogService{
private final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(50);
private final LoggerThread loggerThread = new LoggerThread();
private PrintWriter writer ;
private boolean isShutdown;
private int reservations;
public LogService(PrintWriter writer) {
this.writer = writer;
}
public void start(){
loggerThread.start();
}
public void stop(){
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException{
synchronized (this) {
if(isShutdown){
throw new IllegalStateException();
}
++reservations;
}
System.out.println("向队列中添加:"+msg);
queue.put(msg);//向阻塞队列中添加任务
}
class LoggerThread extends Thread{
public void run(){
try {
while(true){
try {
synchronized(LogService.this){
if(isShutdown && reservations == 0){
System.out.println("队列中的任务处理完成");
break;
}
}
//消费阻塞队列
String msg = queue.take();
synchronized(LogService.this){
--reservations;
}
Thread.currentThread().sleep(100);
System.out.println("处理"+msg);
writer.println(msg);
} catch (InterruptedException e) {
System.out.println("接到中断请求,重试知道队列中的任务消费完");
}
}
} finally{
writer.close();
}
}
}
}
public static void main(String[] args) throws Exception {
SafeCancellThread sct = new SafeCancellThread();
PrintWriter writer = new PrintWriter("C:\\Users\\Administrator\\Desktop\\input\\temp.txt");
writer.write("开始");
final LogService logService = sct.new LogService(writer);
new Thread(new Runnable() {
@Override
public void run() {
int incre = 1;
while(!logService.isShutdown){//如果服务停止,就不继续生产日志任务了。默认为false
try {
Thread.sleep(2*100);//每200ms产生一条日志写入任务
logService.log("日志"+incre);//添加日志时会出现堵塞
incre++;
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}).start();
Thread.sleep(6*1000);
System.out.println("开始处理任务");
logService.start();//开始处理
//等待4s后执行取消任务的请求
Thread.sleep(4*1000);
System.out.println("发起取消请求");
logService.stop();
}
}
执行结果:
可以发现,调用了中断请求,停止了向阻塞队列中继续加入任务,并且没有立刻停止线程,而是继续将已添加到队列中的任务执行完,才停止线程。避免了加入阻塞队列中的任务由于线程中断而被忽略。
还可以使用基于服务的消息中断,管理消息的生命周期
package com.thread;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* 使用Executor的日志服务
* @author hadoop
*
*/
public class LogService {
private final ExecutorService exec = Executors.newSingleThreadExecutor();
private static final int TIMEOUT = 5;
private static final TimeUnit UNIT = TimeUnit.SECONDS;
private final PrintWriter writer ;
public void start(){
}
public LogService(PrintWriter writer) {
this.writer = writer;
}
public void stop() throws InterruptedException{
try {
System.out.println("停止");
exec.shutdown();//会等待已提交到线程池中的任务执行完毕后,在中断线程
exec.awaitTermination(TIMEOUT, UNIT);//
}finally{
writer.close();
}
}
class WriteTask implements Runnable{
private String msg;
public WriteTask(String msg) {
this.msg = msg;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("处理"+msg);
writer.write(msg);
}
}
public void log(String msg){
try {
exec.execute(new WriteTask(msg));
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws FileNotFoundException {
PrintWriter writer = new PrintWriter("C:\\Users\\Administrator\\Desktop\\input\\temp.txt");
LogService logService = new LogService(writer);
for (int i = 0; i < 100; i++) {
System.out.println("日志" + i);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
logService.log("日志"+i);
}
try {
logService.stop();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println("线程任务被中断");
}
}
}