Java条件变量之阻塞队列实现
条件变量概念
条件变量,不需要代码层面盲目轮询一个条件,线程可以等待直到有其他线程唤醒。
条件变量使用
条件变量的使用,通常是基于一个互斥锁,并在while循环条件中使用
条件变量wait 分两步:
- 释放锁+该线程放入等待队列;
- 被唤醒后重新加锁;
条件变量demo
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Test {
final ReentrantLock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
final AtomicInteger total = new AtomicInteger(0);
public static void main(String[] args) {
Test test = new Test();
new Thread(() -> test.get(), "getThread").start();
new Thread(() -> test.set(), "setThread").start();
}
public void get() {
lock.lock();
try {
while (total.get() < 10) {
System.out.println(Thread.currentThread().getName() + " before await " + total.get());
// 条件变量等待唤醒(1. 释放锁+该线程放入等待队列;2. 被唤醒后重新加锁)
condition.await();
System.out.println(Thread.currentThread().getName() + " after await " + total.get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
System.out.println(Thread.currentThread().getName() + " total: " +total.get());
}
public void set() {
lock.lock();
try {
while (total.get() != 10) {
total.getAndIncrement();
}
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " signal it");
// 唤醒条件变量上等待的线程
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
阻塞队列
JUC包中的LinkedBlockingQueue实现。
阻塞队列是线程安全的,应用场景在于可能有多个线程同时来读写队列。当队列是空的时候,多个读线程就要等待,当有元素时要被唤醒;同样当队列是满的时候,多个写线程也要等待,当队列不满的时候,也需要被唤醒。
这种等待与唤醒的线程同步关系,就适合用条件变量来实现。
两个条件变量
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
这个队列在读写上分了两把锁,也就对应了两个条件变量。
- 读线程上遇到空队列,需要通知写线程
- 写线程遇到满队列,需要通知读线程
以下是阻塞队列的take()和put()源码:
take()
读遇到空队列,通知写线程
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
// 获取并减少,返回值是-1前的值
c = count.getAndDecrement();
// 这里为什么还会通知notEmpty条件变量呢?
// A线程take()发现空队列阻塞,await释放锁;B线程take()进入,此时C线程已经put(),因此B线程能进入到这,可能c>1,触发通知A线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果刚是满的,上边-1了,这里就会在notFull条件变量上通知 put()阻塞的线程 可以put了
if (c == capacity)
signalNotFull();
return x;
}
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
put()
写遇到满队列,通知读线程
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果刚是空的,这里会在notEmpty条件变量上通知 take()阻塞的线程
if (c == 0)
signalNotEmpty();
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
阻塞队列实现类
阻塞队列 | 备注 |
---|---|
ArrayBlockingQueue | 由数组结构组成的有界阻塞队列 |
LinkedBlockingQueue | 由链表结构组成的有界阻塞队列 |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 |
DelayQueue | 使用优先级队列实现的无界阻塞队列 |
SynchronousQueue | 不存储元素的阻塞队列 |
LinkedTransferQueue | 由链表结构组成的无界阻塞队列 |
LinkedBlockingDeque | 由链表结构组成的双向阻塞队列 |