Java并发编程
转自:
第一个例子(没有阻塞主线程,会先输出over):
1 package javathreaddemo; 2 3 import java.util.concurrent.ArrayBlockingQueue; 4 import java.util.concurrent.ThreadPoolExecutor; 5 import java.util.concurrent.TimeUnit; 6 7 public class ThreadPoolExecutorDemo { 8 public static void main(String[] args) { 9 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5));10 int length = 15;11 for (int i = 0; i < length; i++) {12 MyTask myTask = new MyTask(i);13 executor.execute(myTask);14 System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size() + ",已执行玩别的任务数目:" + executor.getCompletedTaskCount());15 }16 executor.shutdown();17 System.out.println("over");18 }19 }20 21 class MyTask implements Runnable {22 private int taskNum;23 24 public MyTask(int num) {25 this.taskNum = num;26 }27 28 public void run() {29 System.out.println("正在执行task " + taskNum);30 try {31 Thread.sleep(4000);32 } catch (InterruptedException e) {33 e.printStackTrace();34 }35 System.out.println("task " + taskNum + "执行完毕");36 }37 }
运行结果:
正在执行task 0线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0正在执行task 1线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0正在执行task 2线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0正在执行task 3线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0正在执行task 4线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 10线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 11线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 12线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 13线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 14overtask 4执行完毕task 2执行完毕正在执行task 5task 1执行完毕task 3执行完毕正在执行task 7task 0执行完毕正在执行task 6task 10执行完毕task 11执行完毕task 12执行完毕正在执行task 9正在执行task 8task 14执行完毕task 13执行完毕task 5执行完毕task 7执行完毕task 8执行完毕task 6执行完毕task 9执行完毕
如果第10行,改成 length > 15的值,程序运行就会报错。
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task javathreaddemo.MyTask@2cc7d960 rejected from java.util.concurrent.ThreadPoolExecutor@74904497[Running, pool size = 10, active threads = 10, queued tasks = 5, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372) at javathreaddemo.ThreadPoolExecutorDemo.main(ThreadPoolExecutorDemo.java:17)
原因是任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
第9行的代码创建线程池对象代码为:new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5));查看源码它里面使用的是AbortPolicy这个拒绝策略。 本以为这样就结束了,其实后来我发现真正的原因是因为使用了ArrayBlockingQueue (5)限制了队列里容量只能存放5个长度,如果将5改成10,和第二参数(maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程)一样就可以了,如果该值低于第二个参数值也会出现上面的错误。 总结一下:这种方式需要首先判断如果 (第一个参数 + 第二个参数) >= length,ArrayBlockingQueue (capacity)的capacity设置为第一个参数就可以了;如果 (第一个参数 + 第二个参数) < length , ArrayBlockingQueue (capacity)的capacity设置为第二个参数就可以了。 还有一种方式就是将new ArrayBlockingQueue (capacity)换成new LinkedBlockingQueue ()对象就可以了,因为LinkedBlockingQueue有无参的构造函数,这样就不用管capacity和length到底应该设置成什么了。 第二个例子(增加阻塞主线程,等程序执行完成之后在输出over):
1 package javathreaddemo; 2 3 import java.util.concurrent.ArrayBlockingQueue; 4 import java.util.concurrent.CountDownLatch; 5 import java.util.concurrent.ThreadPoolExecutor; 6 import java.util.concurrent.TimeUnit; 7 8 public class ThreadPoolExecutorCountDownLatchDemo { 9 public static void main(String[] args) throws InterruptedException {10 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5));11 int length = 15;12 final CountDownLatch latch = new CountDownLatch(length);13 for (int i = 0; i < length; i++) {14 final int index = i;15 executor.execute(new Runnable() {16 public void run() {17 System.out.println("正在执行task " + index);18 try {19 Thread.sleep(4000);20 } catch (InterruptedException e) {21 e.printStackTrace();22 } finally {23 System.out.println("task " + index + "执行完毕");24 latch.countDown();25 }26 }27 });28 System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size() + ",已执行玩别的任务数目:" + executor.getCompletedTaskCount());29 }30 // 使用CountDownLatch确保所有线程结束后才往下走31 latch.await();32 executor.shutdown();33 System.out.println("over");34 }35 }
运行结果:
正在执行task 0线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0正在执行task 1正在执行task 2线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0正在执行task 3线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 4线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 10线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 11线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 12线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 13线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0正在执行task 14task 0执行完毕正在执行task 5task 2执行完毕task 1执行完毕task 10执行完毕task 4执行完毕正在执行task 9正在执行task 8正在执行task 7task 3执行完毕正在执行task 6task 11执行完毕task 12执行完毕task 14执行完毕task 13执行完毕task 5执行完毕task 7执行完毕task 8执行完毕task 9执行完毕task 6执行完毕over
不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:
Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUEExecutors.newSingleThreadExecutor(); //创建容量为1的缓冲池Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池
下面是这三个静态方法的具体实现;
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue ()));}public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue ());}
从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。 newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue; newSingleThreadExecutor将corePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue; newCachedThreadPool将corePoolSize设置为0,将maximumPoolSize设置为 Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。 实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。
使用Executors创建线程池:
1 package javathreaddemo; 2 3 import java.util.concurrent.CountDownLatch; 4 import java.util.concurrent.Executors; 5 import java.util.concurrent.ThreadPoolExecutor; 6 7 public class ExecutorsDemo { 8 public static void main(String[] args) throws InterruptedException { 9 int count = 5;10 ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(count);11 int length = 15;12 final CountDownLatch latch = new CountDownLatch(length);13 for (int i = 0; i < length; i++) {14 final int index = i;15 executor.execute(new Runnable() {16 public void run() {17 System.out.println("正在执行task " + index);18 try {19 Thread.sleep(4000);20 } catch (InterruptedException e) {21 e.printStackTrace();22 } finally {23 System.out.println("task " + index + "执行完毕");24 latch.countDown();25 }26 }27 });28 System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size() + ",已执行玩别的任务数目:" + executor.getCompletedTaskCount());29 }30 // 使用CountDownLatch确保所有线程结束后才往下走31 latch.await();32 executor.shutdown();33 System.out.println("over");34 }35 }
运行结果:
正在执行task 0线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0正在执行task 1线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0正在执行task 2线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0正在执行task 3线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0正在执行task 4线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:6,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:7,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:8,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:9,已执行玩别的任务数目:0线程池中线程数目:5,队列中等待执行的任务数目:10,已执行玩别的任务数目:0task 2执行完毕task 0执行完毕task 3执行完毕task 1执行完毕正在执行task 7正在执行task 6正在执行task 5task 4执行完毕正在执行task 8正在执行task 9task 9执行完毕task 6执行完毕task 8执行完毕task 5执行完毕task 7执行完毕正在执行task 13正在执行task 12正在执行task 11正在执行task 10正在执行task 14task 13执行完毕task 10执行完毕task 11执行完毕task 12执行完毕task 14执行完毕over
使用Executors还有一个好处就是你不需要考虑count和length到底应该设置什么才不会出错,因为Executors.newFixedThreadPool里面使用的是LinkedBlockingQueue这个对象。