1.BlockingQueue定义的常用方法如下:
抛出异常 | 特殊值 | 阻塞 | 超时 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | 不可用 | 不可用 |
1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则招聘异常
2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.
3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null
5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
其中:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值
1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则招聘异常
2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.
3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null
5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
其中:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值
2.简要概述BlockingQueue常用的四个实现类
1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.
2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的
3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.
4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.
其中LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.
3.实现场景为单线程,并行执行完多线程后继续单线程完成接下来的任务。(也可以使用Future,及Future.get()进行堵塞,当线程执行完了继续执行)
BlockingQueue blockingQueue=new LinkedBlockingDeque();
for(int i=0;i<20000;i++)
{
try {
blockingQueue.put(new User("小猪佩奇"+i,i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long start=System.currentTimeMillis();
int batch=5;
CountDownLatch count=new CountDownLatch(batch);
ExecutorService service= Executors.newFixedThreadPool(batch);
Runnable run=() ->{
while(blockingQueue.size()!=0){
try {
User user= (User) blockingQueue.take();
System.out.println(user.toString());
if(blockingQueue.size()==0){
count.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i=0;i<batch;i++){
service.submit(run);
}
try {
count.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
long end=System.currentTimeMillis();
System.out.println("共用了:"+(end-start));
2.使用Future(单子线程稳定,多子线程不稳定)
BlockingQueue blockingQueue=new LinkedBlockingDeque();
for(Integer i=0;i<20000;i++)
{
try {
blockingQueue.put(new User("小猪佩奇"+i,i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long start=System.currentTimeMillis();
int batch=5;
CountDownLatch count=new CountDownLatch(batch);
ExecutorService service= Executors.newFixedThreadPool(batch);
Runnable run=() ->{
while(blockingQueue.size()!=0){
try {
User user= (User) blockingQueue.take();
System.out.println(user.toString());
// if(blockingQueue.size()==0){
// count.countDown();
// }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Future<?> submit = null;
for (int i=0;i<batch;i++){
submit=service.submit(run);
}
// try {
// count.await();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
try {
submit.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
//是否执行完成
System.out.println(submit.isDone());
service.shutdown();
long end=System.currentTimeMillis();
System.out.println("共用了:"+(end-start));
3.多子线程使用Future(稳定,建议使用)
public static void main(String[] args) {
BlockingQueue blockingQueue=new LinkedBlockingDeque();
for(Integer i=0;i<20000;i++)
{
try {
blockingQueue.put(new User("小猪佩奇"+i,i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long start=System.currentTimeMillis();
int batch=10;
CountDownLatch count=new CountDownLatch(batch);
ExecutorService service= Executors.newFixedThreadPool(batch);
Runnable run=() ->{
while(blockingQueue.size()!=0){
try {
User user= (User) blockingQueue.take();
System.out.println(Thread.currentThread().getName()+user.toString());
// if(blockingQueue.size()==0){
// count.countDown();
// }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
ArrayList<Future<?>> futures = new ArrayList<Future<?>>();
Future<?> submit = null;
for (int i=0;i<batch;i++){
submit=service.submit(run);
futures.add(submit);
}
// try {
// count.await();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
System.out.println("获取结果中...");
for (Future<?> f : futures) {
try {
// if(f.isDone())
System.out.println(f.get());
System.out.println(f.isDone());
} catch (Exception e) {
e.printStackTrace();
}
}
//是否执行完成
service.shutdown();
long end=System.currentTimeMillis();
System.out.println("共用了:"+(end-start));
}
评论区