欢迎访问www.allbetgaming.com!

首页科技正文

tm商标:壅闭行列BlockingQueue

admin2023-05-05294

壅闭行列

观点

行列

行列就可以想成是一个数组,从一头进入,一头出去,排队买饭

壅闭行列

BlockingQueue 壅闭行列,排队拥堵,首先它是一个行列,而一个壅闭行列在数据结构中所起的作用大致如下图所示:

线程1往壅闭行列中添加元素,而线程2从壅闭行列中移除元素

  • 当壅闭行列是空时,从行列中获取元素的操作将会被壅闭

    • 当蛋糕店的柜子空的时刻,无法从柜子内里获取蛋糕
  • 当壅闭行列是满时,从行列中添加元素的操作将会被壅闭

    • 当蛋糕店的柜子满的时刻,无法继续向柜子内里添加蛋糕了

也就是说 试图从空的壅闭行列中获取元素的线程将会被壅闭,直到其它线程往空的行列插入新的元素

同理,试图往已经满的壅闭行列中添加新元素的线程,直到其它线程往满的行列中移除一个或多个元素,或者完全清空行列后,使行列重新变得空闲起来,并后续新增

为什么要用?

去海底捞用饭,大厅满了,需要进候厅守候,然则这些守候的客户能够对商家带来利润,因此我们异常迎接他们壅闭

在多线程领域:所谓的壅闭,在某些清空下会挂起线程(即壅闭),一旦条件知足,被挂起的线程又会自动叫醒

为什么需要BlockingQueue

利益是我们不需要体贴什么时刻需要壅闭线程,什么时刻需要叫醒线程,由于这一切BlockingQueue都帮你一手包办了

在concurrent包公布以前,在多线程环境下,我们每个程序员都必须自己取控制这些细节,尤其还要兼顾效率和线程平安,而这会给我们的程序带来不小的复杂度。

架构

// 你用过List聚集类

// ArrayList聚集类熟悉么?

// 还用过 CopyOnWriteList  和 BlockingQueue

BlockingQueue壅闭行列是属于一个接口,底下有七个实现类

  • ArrayBlockQueue:由数组结构组成的有界壅闭行列
  • LinkedBlockingQueue:由链表结构组成的有界(然则默认巨细 Integer.MAX_VALUE)的壅闭行列
    • 有界,然则界线异常大,相当于无界,可以当成无界
  • PriorityBlockQueue:支持优先级排序的无界壅闭行列
  • DelayQueue:使用优先级行列实现的延迟无界壅闭行列
  • SynchronousQueue:不存储元素的壅闭行列,也即单个元素的行列
    • 生产一个,消费一个,不存储元素,不消费不生产
  • LinkedTransferQueue:由链表结构组成的无界壅闭行列
  • LinkedBlockingDeque:由链表结构组成的双向壅闭行列

这里需要掌握的是:ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue

BlockingQueue焦点方式

抛出异常 当壅闭行列满时:在往行列中add插入元素会抛出 IIIegalStateException:Queue full 当壅闭行列空时:再往行列中remove移除元素,会抛出NoSuchException
特殊性 插入方式,乐成true,失败false 移除方式:乐成返回出行列元素,行列没有就返回空
一直壅闭 当壅闭行列满时,生产者继续往行列里put元素,行列会一直壅闭生产线程直到put数据or响应中止退出, 当壅闭行列空时,消费者线程试图从行列里take元素,行列会一直壅闭消费者线程直到行列可用。
超时退出 当壅闭行列满时,队里会壅闭生产者线程一定时间,跨越限时后生产者线程会退出

抛出异常组

但执行add方式,向已经满的ArrayBlockingQueue中添加元素时刻,会抛出异常

// 壅闭行列,需要填入默认值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);

System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));

System.out.println(blockingQueue.add("XXX"));

运行后:

true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
	at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:25)

同时若是我们多取出元素的时刻,也会抛出异常,我们假设只存储了3个值,然则取的时刻,取了四次

// 壅闭行列,需要填入默认值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));

System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());

那么泛起异常

true
true
true
a
b
c
Exception in thread "main" java.util.NoSuchElementException
	at java.util.AbstractQueue.remove(AbstractQueue.java:117)
	at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:30)

布尔类型组

我们使用 offer的方式,添加元素时刻,若是壅闭行列满了后,会返回false,否者返回true

同时在取的时刻,若是行列已空,那么会返回null

BlockingQueue blockingQueue = new ArrayBlockingQueue(3);

System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));

System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());

运行效果

true
true
true
false
a
b
c
null

壅闭行列组

我们使用 put的方式,添加元素时刻,若是壅闭行列满了后,添加新闻的线程,会一直壅闭,直到行列元素削减,会被清空,才会叫醒

一样平常在新闻中间件,好比RabbitMQ中会使用到,由于需要保证新闻百分百不丢失,因此只有让它壅闭

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println("================");

blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();

同时使用take取新闻的时刻,若是内容不存在的时刻,也会被壅闭

不见不散组

offer( ) , poll 加时间

使用offer插入的时刻,需要指定时间,若是2秒还没有插入,那么就放弃插入

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));

同时取的时刻也举行判断

System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));

若是2秒内取不出来,那么就返回null

SynchronousQueue

SynchronousQueue没有容量,与其他BlockingQueue差别,SynchronousQueue是一个不存储的BlockingQueue,每一个put操作必须守候一个take操作,否者不能继续添加元素

下面我们测试SynchronousQueue添加元素的历程

首先我们创建了两个线程,一个线程用于生产,一个线程用于消费

生产的线程划分put了 A、B、C这三个字段

BlockingQueue<String> blockingQueue = new SynchronousQueue<>();

new Thread(() -> {
    try {       
        System.out.println(Thread.currentThread().getName() + "\t put A ");
        blockingQueue.put("A");
       
        System.out.println(Thread.currentThread().getName() + "\t put B ");
        blockingQueue.put("B");        
        
        System.out.println(Thread.currentThread().getName() + "\t put C ");
        blockingQueue.put("C");        
        
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}, "t1").start();

消费线程使用take,消费壅闭行列中的内容,而且每次消费前,都守候5秒

        new Thread(() -> {
            try {

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take A ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take B ");

                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                blockingQueue.take();
                System.out.println(Thread.currentThread().getName() + "\t take C ");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2").start();

最后效果输出为:

t1	 put A 
t2	 take A 

5秒后...

t1	 put B 
t2	 take B 

5秒后...

t1	 put C 
t2	 take C 

我们从最后的运行效果可以看出,每次t1线程向行列中添加壅闭行列添加元素后,t1输入线程就会守候 t2消费线程,t2消费后,t2处于挂起状态,守候t1在存入,从而周而复始,形成 一存一取的状态

壅闭行列的用处

生产者消费者模式

一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮

关于多线程的操作,我们需要记着下面几句

  • 线程 操作 资源类
  • 判断 干活 通知
  • 防止虚伪叫醒机制

我们下面实现一个简朴的生产者消费者模式,首先有资源类ShareData

/**
 * 资源类
 */
class ShareData {

    private int number = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    public void increment() throws Exception{
        // 同步代码块,加锁
        lock.lock();
        try {
            // 判断
            while(number != 0) {
                // 守候不能生产
                condition.await();
            }

            // 干活
            number++;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // 通知 叫醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() throws Exception{
        // 同步代码块,加锁
        lock.lock();
        try {
            // 判断
            while(number == 0) {
                // 守候不能消费
                condition.await();
            }

            // 干活
            number--;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // 通知 叫醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

内里有一个number变量,同时提供了increment 和 decrement的方式,划分让number 加1和减1

然则我们在举行判断的时刻,为了防止泛起虚伪叫醒机制,不能使用if来举行判断,而应该使用while

// 判断
while(number != 0) {
    // 守候不能生产
    condition.await();
}

不能使用 if判断

// 判断
if(number != 0) {
    // 守候不能生产
    condition.await();
}

完整代码

/**
 * 生产者消费者 传统版
 * 问题:一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮
 */
/**
 * 线程 操作 资源类
 * 判断 干活 通知
 * 防止虚伪叫醒机制
 */

/**
 * 资源类
 */
class ShareData {

    private int number = 0;

    private Lock lock = new ReentrantLock();

    private Condition condition = lock.newCondition();

    public void increment() throws Exception{
        // 同步代码块,加锁
        lock.lock();
        try {
            // 判断
            while(number != 0) {
                // 守候不能生产
                condition.await();
            }

            // 干活
            number++;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // 通知 叫醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() throws Exception{
        // 同步代码块,加锁
        lock.lock();
        try {
            // 判断
            while(number == 0) {
                // 守候不能消费
                condition.await();
            }

            // 干活
            number--;

            System.out.println(Thread.currentThread().getName() + "\t " + number);

            // 通知 叫醒
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}
public class ProdConsumerTraditionDemo {

    public static void main(String[] args) {

        // 高内聚,低耦合    内聚指的是,一个空调,自身带有调治温度崎岖的方式
        ShareData shareData = new ShareData();

        // t1线程,生产
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "t1").start();

        // t2线程,消费
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "t2").start();
    }
}

最后运行乐成后,我们一个举行生产,一个举行消费

t1	 1
t2	 0
t1	 1
t2	 0
t1	 1
t2	 0
t1	 1
t2	 0
t1	 1
t2	 0

天生者和消费者3.0

在concurrent包公布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程平安,则这会给我们的程序带来不小的时间复杂度

现在我们使用新版的壅闭行列版生产者和消费者,使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用

/**
 * 生产者消费者  壅闭行列版
 * 使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用
 *
 */

class MyResource {
    // 默认开启,举行生产消费
    // 这里用到了volatile是为了保持数据的可见性,也就是当TLAG修改时,要马上通知其它线程举行修改
    private volatile boolean FLAG = true;

    // 使用原子包装类,而不用number++
    private AtomicInteger atomicInteger = new AtomicInteger();

    // 这里不能为了知足条件,而实例化一个详细的SynchronousBlockingQueue
    BlockingQueue<String> blockingQueue = null;

    // 而应该接纳依赖注入内里的,组织注入方式传入
    public MyResource(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
        // 查询出传入的class是什么
        System.out.println(blockingQueue.getClass().getName());
    }

    /**
     * 生产
     * @throws Exception
     */
    public void myProd() throws Exception{
        String data = null;
        boolean retValue;
        // 多线程环境的判断,一定要使用while举行,防止泛起虚伪叫醒
        // 当FLAG为true的时刻,最先生产
        while(FLAG) {
            data = atomicInteger.incrementAndGet() + "";

            // 2秒存入1个data
            retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
            if(retValue) {
                System.out.println(Thread.currentThread().getName() + "\t 插入行列:" + data  + "乐成" );
            } else {
                System.out.println(Thread.currentThread().getName() + "\t 插入行列:" + data  + "失败" );
            }

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println(Thread.currentThread().getName() + "\t 住手生产,示意FLAG=false,生产先容");
    }

    /**
     * 消费
     * @throws Exception
     */
    public void myConsumer() throws Exception{
        String retValue;
        // 多线程环境的判断,一定要使用while举行,防止泛起虚伪叫醒
        // 当FLAG为true的时刻,最先生产
        while(FLAG) {
            // 2秒存入1个data
            retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
            if(retValue != null && retValue != "") {
                System.out.println(Thread.currentThread().getName() + "\t 消费行列:" + retValue  + "乐成" );
            } else {
                FLAG = false;
                System.out.println(Thread.currentThread().getName() + "\t 消费失败,行列中已为空,退出" );

                // 退出消费行列
                return;
            }
        }
    }

    /**
     * 住手生产的判断
     */
    public void stop() {
        this.FLAG = false;
    }

}
public class ProdConsumerBlockingQueueDemo {

    public static void main(String[] args) {
        // 传入详细的实现类, ArrayBlockingQueue
        MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));

        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
            System.out.println("");
            System.out.println("");
            try {
                myResource.myProd();
                System.out.println("");
                System.out.println("");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "prod").start();


        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");

            try {
                myResource.myConsumer();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "consumer").start();

        // 5秒后,住手生产和消费
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("");
        System.out.println("");
        System.out.println("5秒中后,生产和消费线程住手,线程竣事");
        myResource.stop();
    }
}

最后运行效果

java.util.concurrent.ArrayBlockingQueue
prod	 生产线程启动


consumer	 消费线程启动
prod	 插入行列:1乐成
consumer	 消费行列:1乐成
prod	 插入行列:2乐成
consumer	 消费行列:2乐成
prod	 插入行列:3乐成
consumer	 消费行列:3乐成
prod	 插入行列:4乐成
consumer	 消费行列:4乐成
prod	 插入行列:5乐成
consumer	 消费行列:5乐成


5秒中后,生产和消费线程住手,线程竣事
prod	 住手生产,示意FLAG=false,生产先容
,

申博sunbet

申博sunbet www.sunbet88.us是Sunbet指定的Sunbet官网,Sunbet提供Sunbet(Sunbet)、Sunbet、申博代理合作等业务。

转载声明:本站发布文章及版权归原作者所有,转载本站文章请注明文章来源!

本文链接:https://www.guangxianchuangan.com/post/743.html

网友评论

2条评论
  • 2023-05-05 00:00:18

    四、做好疫苗接种是疫情防控的重要手段,请适龄、无禁忌症人群尽快接种新冠病毒疫苗,做到“应接尽接”;符合加强免疫接种条件的市民,应尽快接种“加强针”。下笔如有神

最新评论

热门标签