侧边栏壁纸
博主头像
会飞的大象博主等级

爱运动的程序猿

  • 累计撰写 126 篇文章
  • 累计创建 158 个标签
  • 累计收到 0 条评论
标签搜索

目 录CONTENT

文章目录

多线程案例一(使用阻塞队列,线程池,计算器控制

会飞的大象
2021-04-25 / 0 评论 / 0 点赞 / 925 阅读 / 1,293 字

1.BlockingQueue定义的常用方法如下:

抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用

1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则招聘异常

2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.

3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

其中:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值
1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则招聘异常

2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.

3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null

5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

其中:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值

2.简要概述BlockingQueue常用的四个实现类

1)ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的.

2)LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的

3)PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序.

4)SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的.

其中LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.  

3.实现场景为单线程,并行执行完多线程后继续单线程完成接下来的任务。(也可以使用Future,及Future.get()进行堵塞,当线程执行完了继续执行)

BlockingQueue blockingQueue=new LinkedBlockingDeque();
        for(int i=0;i<20000;i++)
        {
            try {
                blockingQueue.put(new User("小猪佩奇"+i,i));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        long start=System.currentTimeMillis();
        int batch=5;
        CountDownLatch count=new CountDownLatch(batch);
        ExecutorService service= Executors.newFixedThreadPool(batch);
        Runnable run=() ->{
         while(blockingQueue.size()!=0){
                try {
                    User user= (User) blockingQueue.take();
                    System.out.println(user.toString());
                    if(blockingQueue.size()==0){
                        count.countDown();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i=0;i<batch;i++){
            service.submit(run);
        }
        try {
            count.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        service.shutdown();
        long end=System.currentTimeMillis();
        System.out.println("共用了:"+(end-start));

2.使用Future(单子线程稳定,多子线程不稳定)

BlockingQueue blockingQueue=new LinkedBlockingDeque();
       for(Integer i=0;i<20000;i++)
       {
           try {
               blockingQueue.put(new User("小猪佩奇"+i,i));
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }


       long start=System.currentTimeMillis();
       int batch=5;
       CountDownLatch count=new CountDownLatch(batch);
       ExecutorService service= Executors.newFixedThreadPool(batch);
       Runnable run=() ->{
           while(blockingQueue.size()!=0){
               try {
                   User user= (User) blockingQueue.take();
                   System.out.println(user.toString());
//                    if(blockingQueue.size()==0){
//                        count.countDown();
//                    }
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       };
       Future<?> submit = null;
       for (int i=0;i<batch;i++){
           submit=service.submit(run);
       }
//        try {
//            count.await();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
       try {
           submit.get();
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (ExecutionException e) {
           e.printStackTrace();
       }
       //是否执行完成
       System.out.println(submit.isDone());
       service.shutdown();
       long end=System.currentTimeMillis();
       System.out.println("共用了:"+(end-start));

3.多子线程使用Future(稳定,建议使用)

public static void main(String[] args) {
       BlockingQueue blockingQueue=new LinkedBlockingDeque();
       for(Integer i=0;i<20000;i++)
       {
           try {
               blockingQueue.put(new User("小猪佩奇"+i,i));
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }


       long start=System.currentTimeMillis();
       int batch=10;
       CountDownLatch count=new CountDownLatch(batch);
       ExecutorService service= Executors.newFixedThreadPool(batch);
       Runnable run=() ->{
           while(blockingQueue.size()!=0){
               try {
                   User user= (User) blockingQueue.take();
                   System.out.println(Thread.currentThread().getName()+user.toString());
//                    if(blockingQueue.size()==0){
//                        count.countDown();
//                    }
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
       };

       ArrayList<Future<?>> futures = new ArrayList<Future<?>>();
       Future<?> submit = null;
       for (int i=0;i<batch;i++){
           submit=service.submit(run);
           futures.add(submit);
       }
//        try {
//            count.await();
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
       System.out.println("获取结果中...");
       for (Future<?> f : futures) {
           try {
//                 if(f.isDone())
               System.out.println(f.get());
		System.out.println(f.isDone());
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
       //是否执行完成
       service.shutdown();
       long end=System.currentTimeMillis();
       System.out.println("共用了:"+(end-start));
   }

0

评论区