壅闭行列
观点
行列
行列就可以想成是一个数组,从一头进入,一头出去,排队买饭
壅闭行列
BlockingQueue 壅闭行列,排队拥堵,首先它是一个行列,而一个壅闭行列在数据结构中所起的作用大致如下图所示:
线程1往壅闭行列中添加元素,而线程2从壅闭行列中移除元素
-
当壅闭行列是空时,从行列中获取元素的操作将会被壅闭
- 当蛋糕店的柜子空的时刻,无法从柜子内里获取蛋糕
-
当壅闭行列是满时,从行列中添加元素的操作将会被壅闭
- 当蛋糕店的柜子满的时刻,无法继续向柜子内里添加蛋糕了
也就是说 试图从空的壅闭行列中获取元素的线程将会被壅闭,直到其它线程往空的行列插入新的元素
同理,试图往已经满的壅闭行列中添加新元素的线程,直到其它线程往满的行列中移除一个或多个元素,或者完全清空行列后,使行列重新变得空闲起来,并后续新增
为什么要用?
去海底捞用饭,大厅满了,需要进候厅守候,然则这些守候的客户能够对商家带来利润,因此我们异常迎接他们壅闭
在多线程领域:所谓的壅闭,在某些清空下会挂起线程(即壅闭),一旦条件知足,被挂起的线程又会自动叫醒
为什么需要BlockingQueue
利益是我们不需要体贴什么时刻需要壅闭线程,什么时刻需要叫醒线程,由于这一切BlockingQueue都帮你一手包办了
在concurrent包公布以前,在多线程环境下,我们每个程序员都必须自己取控制这些细节,尤其还要兼顾效率和线程平安,而这会给我们的程序带来不小的复杂度。
架构
// 你用过List聚集类
// ArrayList聚集类熟悉么?
// 还用过 CopyOnWriteList 和 BlockingQueue
BlockingQueue壅闭行列是属于一个接口,底下有七个实现类
- ArrayBlockQueue:由数组结构组成的有界壅闭行列
- LinkedBlockingQueue:由链表结构组成的有界(然则默认巨细 Integer.MAX_VALUE)的壅闭行列
- 有界,然则界线异常大,相当于无界,可以当成无界
- PriorityBlockQueue:支持优先级排序的无界壅闭行列
- DelayQueue:使用优先级行列实现的延迟无界壅闭行列
- SynchronousQueue:不存储元素的壅闭行列,也即单个元素的行列
- 生产一个,消费一个,不存储元素,不消费不生产
- LinkedTransferQueue:由链表结构组成的无界壅闭行列
- LinkedBlockingDeque:由链表结构组成的双向壅闭行列
这里需要掌握的是:ArrayBlockQueue、LinkedBlockingQueue、SynchronousQueue
BlockingQueue焦点方式
抛出异常 | 当壅闭行列满时:在往行列中add插入元素会抛出 IIIegalStateException:Queue full 当壅闭行列空时:再往行列中remove移除元素,会抛出NoSuchException |
---|---|
特殊性 | 插入方式,乐成true,失败false 移除方式:乐成返回出行列元素,行列没有就返回空 |
一直壅闭 | 当壅闭行列满时,生产者继续往行列里put元素,行列会一直壅闭生产线程直到put数据or响应中止退出, 当壅闭行列空时,消费者线程试图从行列里take元素,行列会一直壅闭消费者线程直到行列可用。 |
超时退出 | 当壅闭行列满时,队里会壅闭生产者线程一定时间,跨越限时后生产者线程会退出 |
抛出异常组
但执行add方式,向已经满的ArrayBlockingQueue中添加元素时刻,会抛出异常
// 壅闭行列,需要填入默认值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.add("XXX"));
运行后:
true
true
true
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at java.util.concurrent.ArrayBlockingQueue.add(ArrayBlockingQueue.java:312)
at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:25)
同时若是我们多取出元素的时刻,也会抛出异常,我们假设只存储了3个值,然则取的时刻,取了四次
// 壅闭行列,需要填入默认值
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
那么泛起异常
true
true
true
a
b
c
Exception in thread "main" java.util.NoSuchElementException
at java.util.AbstractQueue.remove(AbstractQueue.java:117)
at com.moxi.interview.study.queue.BlockingQueueDemo.main(BlockingQueueDemo.java:30)
布尔类型组
我们使用 offer的方式,添加元素时刻,若是壅闭行列满了后,会返回false,否者返回true
同时在取的时刻,若是行列已空,那么会返回null
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("d"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
运行效果
true
true
true
false
a
b
c
null
壅闭行列组
我们使用 put的方式,添加元素时刻,若是壅闭行列满了后,添加新闻的线程,会一直壅闭,直到行列元素削减,会被清空,才会叫醒
一样平常在新闻中间件,好比RabbitMQ中会使用到,由于需要保证新闻百分百不丢失,因此只有让它壅闭
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
System.out.println("================");
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
同时使用take取新闻的时刻,若是内容不存在的时刻,也会被壅闭
不见不散组
offer( ) , poll 加时间
使用offer插入的时刻,需要指定时间,若是2秒还没有插入,那么就放弃插入
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS));
同时取的时刻也举行判断
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS));
若是2秒内取不出来,那么就返回null
SynchronousQueue
SynchronousQueue没有容量,与其他BlockingQueue差别,SynchronousQueue是一个不存储的BlockingQueue,每一个put操作必须守候一个take操作,否者不能继续添加元素
下面我们测试SynchronousQueue添加元素的历程
首先我们创建了两个线程,一个线程用于生产,一个线程用于消费
生产的线程划分put了 A、B、C这三个字段
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put A ");
blockingQueue.put("A");
System.out.println(Thread.currentThread().getName() + "\t put B ");
blockingQueue.put("B");
System.out.println(Thread.currentThread().getName() + "\t put C ");
blockingQueue.put("C");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
消费线程使用take,消费壅闭行列中的内容,而且每次消费前,都守候5秒
new Thread(() -> {
try {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take A ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take B ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "\t take C ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
最后效果输出为:
t1 put A
t2 take A
5秒后...
t1 put B
t2 take B
5秒后...
t1 put C
t2 take C
我们从最后的运行效果可以看出,每次t1线程向行列中添加壅闭行列添加元素后,t1输入线程就会守候 t2消费线程,t2消费后,t2处于挂起状态,守候t1在存入,从而周而复始,形成 一存一取的状态
壅闭行列的用处
生产者消费者模式
一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮
关于多线程的操作,我们需要记着下面几句
- 线程 操作 资源类
- 判断 干活 通知
- 防止虚伪叫醒机制
我们下面实现一个简朴的生产者消费者模式,首先有资源类ShareData
/**
* 资源类
*/
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断
while(number != 0) {
// 守候不能生产
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 叫醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断
while(number == 0) {
// 守候不能消费
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 叫醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
内里有一个number变量,同时提供了increment 和 decrement的方式,划分让number 加1和减1
然则我们在举行判断的时刻,为了防止泛起虚伪叫醒机制,不能使用if来举行判断,而应该使用while
// 判断
while(number != 0) {
// 守候不能生产
condition.await();
}
不能使用 if判断
// 判断
if(number != 0) {
// 守候不能生产
condition.await();
}
完整代码
/**
* 生产者消费者 传统版
* 问题:一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮
*/
/**
* 线程 操作 资源类
* 判断 干活 通知
* 防止虚伪叫醒机制
*/
/**
* 资源类
*/
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断
while(number != 0) {
// 守候不能生产
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 叫醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decrement() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断
while(number == 0) {
// 守候不能消费
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 叫醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ProdConsumerTraditionDemo {
public static void main(String[] args) {
// 高内聚,低耦合 内聚指的是,一个空调,自身带有调治温度崎岖的方式
ShareData shareData = new ShareData();
// t1线程,生产
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t1").start();
// t2线程,消费
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t2").start();
}
}
最后运行乐成后,我们一个举行生产,一个举行消费
t1 1
t2 0
t1 1
t2 0
t1 1
t2 0
t1 1
t2 0
t1 1
t2 0
天生者和消费者3.0
在concurrent包公布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程平安,则这会给我们的程序带来不小的时间复杂度
现在我们使用新版的壅闭行列版生产者和消费者,使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用
/**
* 生产者消费者 壅闭行列版
* 使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用
*
*/
class MyResource {
// 默认开启,举行生产消费
// 这里用到了volatile是为了保持数据的可见性,也就是当TLAG修改时,要马上通知其它线程举行修改
private volatile boolean FLAG = true;
// 使用原子包装类,而不用number++
private AtomicInteger atomicInteger = new AtomicInteger();
// 这里不能为了知足条件,而实例化一个详细的SynchronousBlockingQueue
BlockingQueue<String> blockingQueue = null;
// 而应该接纳依赖注入内里的,组织注入方式传入
public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
// 查询出传入的class是什么
System.out.println(blockingQueue.getClass().getName());
}
/**
* 生产
* @throws Exception
*/
public void myProd() throws Exception{
String data = null;
boolean retValue;
// 多线程环境的判断,一定要使用while举行,防止泛起虚伪叫醒
// 当FLAG为true的时刻,最先生产
while(FLAG) {
data = atomicInteger.incrementAndGet() + "";
// 2秒存入1个data
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if(retValue) {
System.out.println(Thread.currentThread().getName() + "\t 插入行列:" + data + "乐成" );
} else {
System.out.println(Thread.currentThread().getName() + "\t 插入行列:" + data + "失败" );
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "\t 住手生产,示意FLAG=false,生产先容");
}
/**
* 消费
* @throws Exception
*/
public void myConsumer() throws Exception{
String retValue;
// 多线程环境的判断,一定要使用while举行,防止泛起虚伪叫醒
// 当FLAG为true的时刻,最先生产
while(FLAG) {
// 2秒存入1个data
retValue = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(retValue != null && retValue != "") {
System.out.println(Thread.currentThread().getName() + "\t 消费行列:" + retValue + "乐成" );
} else {
FLAG = false;
System.out.println(Thread.currentThread().getName() + "\t 消费失败,行列中已为空,退出" );
// 退出消费行列
return;
}
}
}
/**
* 住手生产的判断
*/
public void stop() {
this.FLAG = false;
}
}
public class ProdConsumerBlockingQueueDemo {
public static void main(String[] args) {
// 传入详细的实现类, ArrayBlockingQueue
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 生产线程启动");
System.out.println("");
System.out.println("");
try {
myResource.myProd();
System.out.println("");
System.out.println("");
} catch (Exception e) {
e.printStackTrace();
}
}, "prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t 消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "consumer").start();
// 5秒后,住手生产和消费
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("");
System.out.println("");
System.out.println("5秒中后,生产和消费线程住手,线程竣事");
myResource.stop();
}
}
最后运行效果
java.util.concurrent.ArrayBlockingQueue
prod 生产线程启动
consumer 消费线程启动
prod 插入行列:1乐成
consumer 消费行列:1乐成
prod 插入行列:2乐成
consumer 消费行列:2乐成
prod 插入行列:3乐成
consumer 消费行列:3乐成
prod 插入行列:4乐成
consumer 消费行列:4乐成
prod 插入行列:5乐成
consumer 消费行列:5乐成
5秒中后,生产和消费线程住手,线程竣事
prod 住手生产,示意FLAG=false,生产先容
,申博sunbet www.sunbet88.us是Sunbet指定的Sunbet官网,Sunbet提供Sunbet(Sunbet)、Sunbet、申博代理合作等业务。
转载声明:本站发布文章及版权归原作者所有,转载本站文章请注明文章来源!
网友评论
2条评论澳洲5官网(a55555.net)
回复soicầuxsmbvip
回复四、做好疫苗接种是疫情防控的重要手段,请适龄、无禁忌症人群尽快接种新冠病毒疫苗,做到“应接尽接”;符合加强免疫接种条件的市民,应尽快接种“加强针”。下笔如有神