博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
第九章 LinkedBlockingQueue源码解析
阅读量:4616 次
发布时间:2019-06-09

本文共 11965 字,大约阅读时间需要 39 分钟。

1、对于LinkedBlockingQueue需要掌握以下几点

  • 创建
  • 入队(添加元素)
  • 出队(删除元素)

2、创建

Node节点内部类与LinkedBlockingQueue的一些属性

static class Node
{ E item;//节点封装的数据 /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node
next;//下一个节点 Node(E x) { item = x; } } /** 指定链表容量 */ private final int capacity; /** 当前的元素个数 */ private final AtomicInteger count = new AtomicInteger(0); /** 链表头节点 */ private transient Node
head; /** 链表尾节点 */ private transient Node
last; /** 出队锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 出队等待条件 */ private final Condition notEmpty = takeLock.newCondition(); /** 入队锁 */ private final ReentrantLock putLock = new ReentrantLock(); /** 入队等待条件 */ private final Condition notFull = putLock.newCondition();
View Code

2.1、public LinkedBlockingQueue(int capacity)

使用方法:

Queue<String> abq = new LinkedBlockingQueue<String>(1000);

源代码:

/**     * 创建一个 LinkedBlockingQueue,容量为指定容量     */    public LinkedBlockingQueue(int capacity) {        if (capacity <= 0) throw new IllegalArgumentException();        this.capacity = capacity;        last = head = new Node
(null);//初始化头节点和尾节点,均为封装了null数据的节点 }
View Code

注意点:

  • LinkedBlockingQueue的组成一个链表+两把锁+两个条件

 

2.2、public LinkedBlockingQueue()

使用方法:

Queue<String> abq = new LinkedBlockingQueue<String>();

源代码:

/**     * 创建一个LinkedBlockingQueue,容量为整数最大值     */    public LinkedBlockingQueue() {        this(Integer.MAX_VALUE);    }
View Code

注意点:默认容量为整数最大值,可以看做没有容量限制

 

3、入队:

3.1、public boolean offer(E e)

原理:

  • 在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false

使用方法:

  • abq.offer("hello1");

源代码:

/**     * 在队尾插入一个元素, 容量没满,可以立即插入,返回true; 队列满了,直接返回false     * 注:如果使用了限制了容量的队列,这个方法比add()好,因为add()插入失败就会抛出异常     */    public boolean offer(E e) {        if (e == null)            throw new NullPointerException();        final AtomicInteger count = this.count;// 获取队列中的元素个数        if (count.get() == capacity)// 队列满了            return false;        int c = -1;        final ReentrantLock putLock = this.putLock;        putLock.lock();// 获取入队锁        try {            if (count.get() < capacity) {
// 容量没满 enqueue(e);// 入队 c = count.getAndIncrement();// 容量+1,返回旧值(注意) if (c + 1 < capacity)// 如果添加元素后的容量,还小于指定容量(说明在插入当前元素后,至少还可以再插一个元素) notFull.signal();// 唤醒等待notFull条件的其中一个线程 } } finally { putLock.unlock();// 释放入队锁 } if (c == 0)// 如果c==0,这是什么情况?一开始如果是个空队列,就会是这样的值,要注意的是,上边的c返回的是旧值 signalNotEmpty(); return c >= 0; }
View Code
/**     * 创建一个节点,并加入链表尾部     * @param x     */    private void enqueue(E x) {        /*         * 封装新节点,并赋给当前的最后一个节点的下一个节点,然后在将这个节点设为最后一个节点         */        last = last.next = new Node
(x); }
View Code
private void signalNotEmpty() {        final ReentrantLock takeLock = this.takeLock;        takeLock.lock();//获取出队锁        try {            notEmpty.signal();//唤醒等待notEmpty条件的线程中的一个        } finally {            takeLock.unlock();//释放出队锁        }    }
View Code

如果,入队逻辑不懂,查看最后总结部分入队逻辑的图,代码非常简单,流程看注释即可。

 

3.2、public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

使用方法:

try {            abq.offer("hello2",1000,TimeUnit.MILLISECONDS);        } catch (InterruptedException e) {            e.printStackTrace();        }
View Code

源代码:

/**     * 在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:      * 1、被唤醒      * 2、等待时间超时      * 3、当前线程被中断     */    public boolean offer(E e, long timeout, TimeUnit unit)            throws InterruptedException {        if (e == null)            throw new NullPointerException();        long nanos = unit.toNanos(timeout);// 转换为纳秒        int c = -1;        final ReentrantLock putLock = this.putLock;// 入队锁        final AtomicInteger count = this.count;// 总数量        putLock.lockInterruptibly();        try {            while (count.get() == capacity) {
// 容量已满 if (nanos <= 0)// 已经超时 return false; /* * 进行等待: 在这个过程中可能发生三件事: * 1、被唤醒-->继续当前这个while循环 * 2、超时-->继续当前这个while循环 * 3、被中断-->抛出中断异常InterruptedException */ nanos = notFull.awaitNanos(nanos); } enqueue(e);// 入队 c = count.getAndIncrement();// 入队元素数量+1 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
View Code

注意:

  • awaitNanos(nanos)是AQS中的一个方法,这里就不详细说了,有兴趣的自己去查看AQS的源代码。

 

3.3、public void put(E e) throws InterruptedException

原理:

  • 在队尾插入一个元素,如果队列满了,一直阻塞,直到队列不满了或者线程被中断

使用方法:

try {            abq.put("hello1");        } catch (InterruptedException e) {            e.printStackTrace();        }
View Code

源代码:

/**     * 在队尾插一个元素     * 如果队列满了,一直阻塞,直到队列不满了或者线程被中断     */    public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        int c = -1;        final ReentrantLock putLock = this.putLock;//入队锁        final AtomicInteger count = this.count;//当前队列中的元素个数        putLock.lockInterruptibly();//加锁        try {            while (count.get() == capacity) {
//如果队列满了 /* * 加入notFull等待队列,直到队列元素不满了, * 被其他线程使用notFull.signal()唤醒 */ notFull.await(); } enqueue(e);//入队 c = count.getAndIncrement();//入队数量+1 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
View Code

 

4、出队

4.1、public E poll()

原理:

  • 如果没有元素,直接返回null;如果有元素,出队

使用方法:

abq.poll();

源代码:

/**     * 出队:      * 1、如果没有元素,直接返回null      * 2、如果有元素,出队     */    public E poll() {        final AtomicInteger count = this.count;// 获取元素数量        if (count.get() == 0)// 没有元素            return null;        E x = null;        int c = -1;        final ReentrantLock takeLock = this.takeLock;        takeLock.lock();// 获取出队锁        try {            if (count.get() > 0) {
// 有元素 x = dequeue();// 出队 // 元素个数-1(注意:该方法是一个无限循环,直到减1成功为止,且返回旧值) c = count.getAndDecrement(); if (c > 1)// 还有元素(如果旧值c==1的话,那么通过上边的操作之后,队列就空了) notEmpty.signal();// 唤醒等待在notEmpty队列中的其中一条线程 } } finally { takeLock.unlock();// 释放出队锁 } if (c == capacity)// c == capacity是怎么发生的?如果队列是一个满队列,注意:上边的c返回的是旧值 signalNotFull(); return x; }
View Code
/**     * 从队列头部移除一个节点     */    private E dequeue() {        Node
h = head;//获取头节点:x==null Node
first = h.next;//将头节点的下一个节点赋值给first h.next = h; // 将当前将要出队的节点置null(为了使其做head节点做准备) head = first;//将当前将要出队的节点作为了头节点 E x = first.item;//获取出队节点的值 first.item = null;//将出队节点的值置空 return x; }
View Code
private void signalNotFull() {        final ReentrantLock putLock = this.putLock;        putLock.lock();        try {            notFull.signal();        } finally {            putLock.unlock();        }    }
View Code

注意:出队逻辑如果不懂,查看最后总结部分的图

 

4.2、public E poll(long timeout, TimeUnit unit) throws InterruptedException

原理:

  • 从队头删除一个元素,如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

使用方法:

try {            abq.poll(1000, TimeUnit.MILLISECONDS);        } catch (InterruptedException e) {            e.printStackTrace();        }
View Code

源代码:

/**     * 从队列头部删除一个元素,     * 如果队列不空,出队;     * 如果队列已空,判断时间是否超时,如果已经超时,返回null     * 如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:     * 1、被唤醒     * 2、等待时间超时     * 3、当前线程被中断     */    public E poll(long timeout, TimeUnit unit) throws InterruptedException {        E x = null;        int c = -1;        long nanos = unit.toNanos(timeout);        final AtomicInteger count = this.count;        final ReentrantLock takeLock = this.takeLock;        takeLock.lockInterruptibly();        try {            while (count.get() == 0) {
//如果队列没有元素 if (nanos <= 0)//已经超时 return null; /* * 进行等待: * 在这个过程中可能发生三件事: * 1、被唤醒-->继续当前这个while循环 * 2、超时-->继续当前这个while循环 * 3、被中断-->抛出异常 */ nanos = notEmpty.awaitNanos(nanos); } x = dequeue();//出队 c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
View Code

 

4.3、public E take() throws InterruptedException

原理:

  • 将队头元素出队,如果队列空了,一直阻塞,直到队列不为空或者线程被中断

使用方法:

try {            abq.take();        } catch (InterruptedException e) {            e.printStackTrace();        }
View Code

源代码:

/**     * 出队:     * 如果队列空了,一直阻塞,直到队列不为空或者线程被中断     */    public E take() throws InterruptedException {        E x;        int c = -1;        final AtomicInteger count = this.count;//获取队列中的元素总量        final ReentrantLock takeLock = this.takeLock;        takeLock.lockInterruptibly();//获取出队锁        try {            while (count.get() == 0) {
//如果没有元素,一直阻塞 /* * 加入等待队列, 一直等待条件notEmpty(即被其他线程唤醒) * (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal()唤醒其他等待这个条件的线程,同时队列也不空了) */ notEmpty.await(); } x = dequeue();//出队 c = count.getAndDecrement();//元素数量-1 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
View Code

 

总结:

1、具体入队与出队的原理图

图中每一个节点前半部分表示封装的数据x,后边的表示指向的下一个引用。

1.1、初始化

 初始化之后,初始化一个数据为null,且head和last节点都是这个节点。

1.2、入队两个元素过后

这个可以根据入队方法enqueue(E x)来看,源代码再贴一遍:

/**     * 创建一个节点,并加入链表尾部     *      * @param x     */    private void enqueue(E x) {        /*         * 封装新节点,并赋给当前的最后一个节点的下一个节点,然后在将这个节点设为最后一个节点         */        last = last.next = new Node
(x); }
View Code

其实这我们就可以发现其实真正意义上出队的头节点是Head节点的下一个节点。(这也就是Node这个内部类中对next的注释,我没有翻译)

1.3、出队一个元素后

表面上看,只是将头节点的next指针指向了要删除的x1.next,事实上这样我觉的就完全可以,但是jdk实际上是将原来的head节点删除了,而上边看到的这个head节点,正是刚刚出队的x1节点,只是其值被置空了。

这一块对应着源代码来看:dequeue()

/**     * 从队列头部移除一个节点     */    private E dequeue() {        Node
h = head;// 获取头节点:x==null Node
first = h.next;// 将头节点的下一个节点赋值给first h.next = h; // 将当前将要出队的节点置null(为了使其做head节点做准备) head = first;// 将当前将要出队的节点作为了头节点 E x = first.item;// 获取出队节点的值 first.item = null;// 将出队节点的值置空 return x; }
View Code

 

2、三种入队对比:

  • offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞
  • put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞
  • offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:-->阻塞
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

 

3、三种出队对比:

  • poll():如果没有元素,直接返回null;如果有元素,出队
  • take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞
  • poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待,直到出现以下三种情况:
    • 被唤醒
    • 等待时间超时
    • 当前线程被中断

 

4、ArrayBlockingQueue与LinkedBlockingQueue对比

  • ArrayBlockingQueue:
    • 一个对象数组+一把锁+两个条件
    • 入队与出队都用同一把锁
    • 在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高
    • 采用了数组,必须指定大小,即容量有限
  • LinkedBlockingQueue:
    • 一个单向链表+两把锁+两个条件
    • 两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。
    • 在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多
    • 采用了链表,最大容量为整数最大值,可看做容量无限

 两个疑问:

  • 入队时:c==0是怎样出现的?
  • 出队时:c==capcity是怎样出现的?

这两个疑问,都是基于对于AtomicInteger的不熟,不明白LinkedBlockingQueue引用的这两个方法(getAndIncrement和getAndDecrement)先返回旧值还是新值,关于AtomicInteger的源码介绍,请查看《》,具体链接如下:

转载于:https://www.cnblogs.com/java-zhao/p/5135958.html

你可能感兴趣的文章
gui编程实践(3)--记事本界面 JMenuBar JMenu
查看>>
黑马程序员--抽象类与接口
查看>>
IaaS,PaaS,SaaS 的区别
查看>>
Python复习基础篇
查看>>
关于Cocos2d-x中背景音乐和音效的添加
查看>>
checkbox和文字对齐
查看>>
%s的用法
查看>>
java中==和equals
查看>>
CCActionPageTurn3D
查看>>
python random
查看>>
esp32-智能语音-cli(调试交互命令)
查看>>
netty与MQ使用心得
查看>>
关于dl dt dd 文字过长换行在移动端显示对齐的探讨总结
查看>>
swoolefy PHP的异步、并行、高性能网络通信引擎内置了Http/WebSocket服务器端/客户端...
查看>>
Python学习笔记
查看>>
unshift()与shift()
查看>>
使用 NPOI 、aspose实现execl模板公式计算
查看>>
行为型模式:中介者模式
查看>>
How to Notify Command to evaluate in mvvmlight
查看>>
33. Search in Rotated Sorted Array
查看>>