JUC 并发工具包
CountDownLatch
结合前面的知识,我们知道Thread.join()
可以实现一个线程等待另一个线程结束
再执行,但是有时候我们可能并不需要等到另一个线程结束,只需要等待特定的操作结束后就可以,可能有人会说可以通过wait/notify
模型来实现,但是其实我们可以采用更加方便的工具类也就是 java.util.concurrent.CountDownLatch
。
Constructor and Description |
---|
CountDownLatch(int count) 构造一个以给定计数 CountDownLatch CountDownLatch。 |
案例
public class CountDownLatchTest {
private static Random random=new Random(System.currentTimeMillis());
private static ExecutorService executor=Executors.newFixedThreadPool(2);
private static CountDownLatch latch;
public static void main(String[] args) throws InterruptedException {
int []data=query();
//构造器的参数代表需要执行的先决条件的数量
latch=new CountDownLatch(data.length);
for (int i = 0; i <data.length ; i++) {
executor.execute(new SimpleRunnable(data,i,latch));
}
latch.await();
//异步关闭
executor.shutdown();
System.out.println("all of works done ");
}
static class SimpleRunnable implements Runnable{
private final int []data;
private final int index;
private final CountDownLatch latch;
public SimpleRunnable(int[] data, int index, CountDownLatch latch) {
this.data = data;
this.index = index;
this.latch=latch;
}
@Override
public void run() {
try {
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
int value=data[index];
if (value%2==0) {
data[index]=2*value;
}else{
data[index]=10*value;
}
System.out.println(Thread.currentThread().getName()+" finished");
latch.countDown();
}
}
private static int[] query(){
return new int[]{1,2,3,4,5,6,7,8,9,10};
}
}
CountDownLatch
内部维护了一个未执行操作的计数器 count
这个 count 需要通过构造器传入,CountDownLatch 实例的 countdown 方法每执行一次count
就会减一 ,在count
减为 0 之前await
方法的执行线程会被暂停,
直到count
减为 0 的时候才会被唤醒继续执行。
/**
* @author imlgw.top
* @date 2019/7/24 15:09
*/
public class CountDownLatchTest2 {
private static int data;
public static void main(String[] args) throws InterruptedException {
final CountDownLatch latch=new CountDownLatch(4);
new Thread(() -> {
for (int i = 1; i <10; i++) {
data=i;
latch.countDown();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
latch.await();
System.out.println("data:"+data);
}
}
CountDownLatch 传入的 count 为 4,循环执行了 10 次,结果是多少?当然是 4 了,当减为 0 的时候 await 调用就会返回不会再等待,后面即使 count 已经减为 0 再减也没有什么意义。
API
Modifier and Type | Method and Description |
---|---|
void |
await() 导致当前线程等到锁存器计数到零,除非线程是 interrupted 。 |
boolean |
await(long timeout, TimeUnit unit) 使当前线程等待直到锁存器计数到零为止,除非线程为 interrupted 或指定的等待时间过去。 |
void |
countDown() 减少锁存器的计数,如果计数达到零,释放所有等待的线程。 |
long |
getCount() 返回当前计数。 |
String |
toString() 返回一个标识此锁存器的字符串及其状态。 |
CyclicBarrier
有时候多个线程可能需要相互等待对方执行到某一步然后再执行下一步。生活中就有很多这样的情形?比如相约爬山:大家约定好集合的地点然后等所有人到达指定的集合地点后再一起去,先到达的人需要等待后到达的人,只有所有人到齐后才会出发去爬山。
/**
* @author imlgw.top
* @date 2019/7/24 15:54
*/
public class CyclicbarrierTest1 {
public static void main(String[] args){
CyclicBarrier cyclicBarrier=new CyclicBarrier(2);
new Thread(()->{
try {
//do something
Thread.sleep(2000);
System.out.println("t1 finished");
cyclicBarrier.await();
System.out.println("other thread is done too");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
new Thread(()->{
try {
//do something
Thread.sleep(6000);
System.out.println("t2 finished");
cyclicBarrier.await();
System.out.println("other thread is done too");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
上面的代码就体现了这种情况,两个线程开始先分别做各自的事情,然后到达一个点之后,需要相互等待,先到达的等待后到达的,上面的例子中很显然 t1
先到达它需要等待t2
到达后才能继续运行。
同时,如果外界是想要知道这个 CyclicBarrier 的任务执行情况可以在构造函数中加入一个回调的Runnable
CyclicBarrier cyclicBarrier=new CyclicBarrier(2,()->{
System.out.println("t1,t2 both arrived");
});
这样的方式还是很优雅的。
如果细心的话其实你还会发现相比上面的 CountDown 多了一个 BrokenBarrierException,我们来看看 doc
如果有线程正在 waiting 而 Barrier 已经被 reset 了
If the barrier is {@link #reset} while any thread is waiting
or if the barrier {@linkplain #isBroken is broken} when
{@code await} is invoked, or while any thread is waiting, then
{@link BrokenBarrierException} is thrown.
如果某一个线程在 waiting 的时候被打断了,那么其他的 waiting 线程将会抛出这个异常
If any thread is {@linkplain Thread#interrupt interrupted} while waiting
then all other waiting threads will throw
{@link BrokenBarrierException} and the barrier is placed in the broken
state.
源码解析
CyclicBarrier
并没有借助 AQS 而是利用Condition
条件变量来实现的,关于Condition
后面会介绍。
doAwait()
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); //加锁
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count; //和 countDownLatch 类似
if (index == 0) { // tripped 所有线程都抵达了
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); //执行构造器中传入的 CallBack
ranAction = true;
nextGeneration(); //下一个轮回
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
nextGeneration()
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); //唤醒这个 barrier 上所有的等待线程,trip 是一个 condition
// set up next generation
count = parties; //count 复位
generation = new Generation();
}
其实也可以用 CountDownLatch 实现和 CyclicBarrier 类似的功能,这里就不再演示。
API
Modifier and Type | Method and Description |
---|---|
int |
await() 等待所有 parties 到达屏障,并且在这个障碍上调用await 。 |
int |
await(long timeout, TimeUnit unit) 等待所有 parties 已经在此屏障上调用 await ,或指定的等待时间过去。 |
int |
getNumberWaiting() 返回目前正在等待的线程的数量。 |
int |
getParties() 返回 parties |
boolean |
isBroken() 查询这个障碍是否处于 Broken |
void |
reset() 将屏障重置为初始状态。 |
区别
-
CountDownLatch
不能reset
(递减到 0 后不能复原),CyclicBarrier
正如其名是可以循环使用的。 -
CountDownLatch
工作线程之间互不关心,CyclicBarrier
所有线程必须到达一个共同的点才会继续执行
Exchanger
JDK1.5 引入的,看名字就猜得到大概是干嘛的,主要就是用于两个线程
之间的数据交换,其实也就相当于只有两个参与方的CyclicBarrier
当两个线程都达到exchanger point
(集合点) 就会进行数据的交换,一手交钱,一手交货
所以在使用的时候也要注意两个线程能否正确的到达exchanger point
如果有一方无法到达则另一方就会陷入等待,当然你可以加上 timeout。另外这个只适用与两个线程,如果有 2 个以上的线程参与将会造成数据传输混乱,无法控制。
/**
* @author imlgw.top
* @date 2019/7/25 13:54
*/
public class ExchangerTest1 {
public static void main(String[] args) {
final Exchanger<String> exchanger=new Exchanger<>();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" start");
try {
String res = exchanger.exchange("i am A",10,TimeUnit.SECONDS);
System.out.println("back msg=" +res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"done");
}).start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" start");
try {
String res = exchanger.exchange("i am B");
System.out.println("back msg=" +res);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"done");
});
}
}
线程安全问题
确实 Exchanger 如果使用不当会造成很多问题,所以在使用的时候一定要谨慎,首先它只适用与两个线程之间传输数据,其次这里传输的对象是同一个
,也就是说发送端和接收端的对象内存地址是一样的是一个对象。
/**
* @author imlgw.top
* @date 2019/7/25 15:14
*/
public class ExchangerTest2 {
public static void main(String[] args) {
final Exchanger<Simple> exchanger=new Exchanger<>();
new Thread(()->{
Simple simple = new Simple(1);
System.out.println(Thread.currentThread().getName()+" start. send to B"+simple);
try {
Simple res = exchanger.exchange(simple);
//休眠 10s
TimeUnit.SECONDS.sleep(3);
System.out.println("A receive from B obj:"+res +" data:"+res.a);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" done");
},"A").start();
new Thread(()->{
Simple simple = new Simple(2);
System.out.println(Thread.currentThread().getName()+" start. send to A:"+simple);
try {
Simple res = exchanger.exchange(simple);
//修改发送出去的 obj
simple.setA(100000);
System.out.println("B receive from A obj:"+res +" data:"+res.a);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" done");
},"B").start();
}
static class Simple{
private int a;
public Simple(int a) {
this.a = a;
}
public int getA() {
return a;
}
public void setA(int a) {
this.a = a;
}
}
}
结果
A start. send to Bjuc_study.tools.Exchanger.ExchangerTest2$Simple@72d6db17
B start. send to A:juc_study.tools.Exchanger.ExchangerTest2$Simple@4295c176
B receive from A obj:juc_study.tools.Exchanger.ExchangerTest2$Simple@72d6db17 data:1
B done
A receive from B obj:juc_study.tools.Exchanger.ExchangerTest2$Simple@4295c176 data:100000
A done
两个线程拿到的是同一个对象并不是拷贝,两个线程同时的去操作这个对象这其实是很危险的很有可能就会产生一些线程安全问题,使用的时候一定要注意
Semaphore
信号量,相信学过操作系统的同学肯定对这个很熟悉了,熟悉 PV 操作的话这个一看就懂了。
Constructor and Description |
---|
Semaphore(int permits) 创建一个 Semaphore 与给定数量的许可证,默认非公平锁 |
Semaphore(int permits, boolean fair) 创建一个 Semaphore 与给定数量的许可证和是否公平 |
Semaphore 实现一个显示锁
/**
* @author imlgw.top
* @date 2019/7/25 15:48
*/
public class SemaphoreTest1 {
public static void main(String[] args) {
final SemaphoreLock semaphoreLock=new SemaphoreLock();
new Thread(()->{
try {
semaphoreLock.lock();
System.out.println(Thread.currentThread().getName()+" get the lock");
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphoreLock.unlock();
System.out.println(Thread.currentThread().getName()+"release the lock");
}
}).start();
new Thread(()->{
try {
semaphoreLock.lock();
System.out.println(Thread.currentThread().getName()+" get the lock");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphoreLock.unlock();
System.out.println(Thread.currentThread().getName()+"release the lock");
}
}).start();
}
static class SemaphoreLock{
private final Semaphore semaphore =new Semaphore(1);
private void lock() throws InterruptedException {
semaphore.acquire();
}
public void unlock(){
semaphore.release();
}
}
}
实现生产者消费者模型
生产者消费者模型其实也挺重要的,有时候面试会让你手写一个生产者消费者模型,在之前的文章中其实有实现过一个但是那个其实还是很简单的一个,那个是没有 Buffer 的,下面这个用信号量实现的就是有 Buffer 的,后面会单独整理出所有的生产者消费者模型的实现方法。
/**
* @author imlgw.top
* @date 2019/7/25 16:43
*/
public class ProduceConsumerV4 {
public static void main(String[] args) {
ProduceConsumerV4 pc = new ProduceConsumerV4();
Stream.of("Produce1", "Produce2", "Produce3", "Produce4").forEach(n -> {
new Thread(() -> {
while (true) {
pc.produce();
}
}, n).start();
});
Stream.of("Consumer1", "Consumer2", "Consumer3", "Consumer4").forEach(n -> {
new Thread(() -> {
while (true) {
pc.consumer();
}
}, n).start();
});
}
//buffer 载体
private LinkedList<Object> buffer=new LinkedList<>();
private final Semaphore full = new Semaphore(0);
//buffer 最大值
private final Semaphore empty = new Semaphore(3);
//互斥锁
private final Semaphore mutex= new Semaphore(1);
public void produce() {
//已经生产了
try {
empty.acquire();
//不能和上面的信号量交换
mutex.acquire();
buffer.add(new Object());
System.out.println(Thread.currentThread().getName() + " produce a obj , current list size:" +buffer.size());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.release();
full.release();
}
}
public void consumer() {
try {
full.acquire();
mutex.acquire();
//移除最后一个
buffer.removeLast();
System.out.println(Thread.currentThread().getName() + " consumer a obj, current size: " + buffer.size());//consumer
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.release();
empty.release();
}
}
}
API
Modifier and Type | Method and Description |
---|---|
void |
acquire() 从该信号量获取许可证,阻塞直到获取到,可以被 interrupt |
void |
acquire(int permits) 从该信号量获取给定数量的许可证,阻塞直到获取到可以被 interrupt |
void |
acquireUninterruptibly() 从该信号量获取许可证,阻塞直到获取到,无视 interrupt,不会被 interrupt |
void |
acquireUninterruptibly(int permits) 从该信号量获取给定数量的许可证,阻止直到获取到,不可被 interrupt。 |
int |
availablePermits() 返回此信号量中当前可用的许可数。 |
int |
drainPermits() 获取并返回所有可立即获得的许可证。 |
protected Collection<Thread> |
getQueuedThreads() 返回一个包含可能 正在等待获取的线程的集合。 |
int |
getQueueLength() 返回等待获取的线程数的估计值。 |
boolean |
hasQueuedThreads() 查询是否有线程等待获取。 |
boolean |
isFair() 是不是公平锁 |
protected void |
reducePermits(int reduction) 缩小可用许可证的数量。 |
void |
release() 释放许可证,将其返回到信号量。(0 的时候也可以 release) |
void |
release(int permits) 释放给定数量的许可证,将其返回到信号量。 |
String |
toString() 返回一个标识此信号量的字符串及其状态。 |
boolean |
tryAcquire() 从这个信号量获得许可证,立即返回,不会阻塞。 |
boolean |
tryAcquire(int permits) 从这个信号量获取给定数量的许可证,不会阻塞 |
boolean |
tryAcquire(int permits, long timeout, TimeUnit unit) 从该信号量获取给定数量的许可证,如果在给定的等待时间获取到,就返回 true,可以被打断 |
boolean |
tryAcquire(long timeout, TimeUnit unit) 从该信号量获取许可证,如果在给定的等待时间获取到,就返回 true,可以被打断 |
Lock
通常被称为显式锁,与之对应的synchroized
则被称为内部锁,显式锁是 jdk1.5 引入的锁,他的作用与内部锁相同,但是它的功能比内部锁更加强大,但是并不是内部锁的替代品,其实在之前的 Java 多线程基础 一文中就手写过一个带有限时等待
的锁 ,那个其实就是模仿的Lock
接口
ReentrantLock
ReentrantLock 是 Lock 接口的一个实现类也是用的最多的一个实现类,Reentrant
本意就是 可重入的,可再入的, 表示当一个线程试图获取一个它已经获取的锁时,这个获取动作就自动成功,synchnorized
也是可重入的。
Constructor and Description |
---|
ReentrantLock() 创建一个 ReentrantLock 的实例。 |
ReentrantLock(boolean fair) 根据给定的公平策略创建一个 ReentrantLock 的实例。 |
/**
* @author imlgw.top
* @date 2019/7/25 20:07
*/
public class ReentrantLockTest1 {
private static final ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread thread0 = new Thread(() -> needLock() );
Thread thread1 = new Thread(() -> needLock() );
thread0.start();
thread1.start();
TimeUnit.SECONDS.sleep(3);
thread1.interrupt();
}
public static void needLock() {
try {
//可打断的获取锁
lock.lockInterruptibly();
System.out.println(Thread.currentThread().getName()+" is get the lock");
while (true){
//空转
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock(); //确保锁的释放
}
}
}
测试结果
Thread-0 is get the lock
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at juc_study.tools.Lock.ReentrantLockTest1.needLock(ReentrantLockTest1.java:26)
at juc_study.tools.Lock.ReentrantLockTest1.lambda$main$1(ReentrantLockTest1.java:16)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
at juc_study.tools.Lock.ReentrantLockTest1.needLock(ReentrantLockTest1.java:34)
at juc_study.tools.Lock.ReentrantLockTest1.lambda$main$1(ReentrantLockTest1.java:16)
at java.lang.Thread.run(Thread.java:748)
API
Modifier and Type | Method and Description |
---|---|
int |
getHoldCount() 当前线程调用 lock() 的次数 (重入的次数) |
protected Thread |
getOwner() 返回当前拥有此锁的线程,如果没有,返回 null 。 |
protected Collection<Thread> |
getQueuedThreads() 返回包含可能正在等待获取此锁的线程的集合。 |
int |
getQueueLength() 返回等待获取此锁的线程数的估计值。 |
protected Collection<Thread> |
getWaitingThreads(Condition condition) 返回包含可能在与此锁相关联的给定条件下等待的线程的集合。 |
int |
getWaitQueueLength(Condition condition) 返回与此锁相关联的给定条件等待的线程数的估计。 |
boolean |
hasQueuedThread(Thread thread) 查询给定线程是否等待获取此锁。 |
boolean |
hasQueuedThreads() 查询是否有线程正在等待获取此锁。 |
boolean |
hasWaiters(Condition condition) 查询任何线程是否等待与此锁相关联的给定条件。 |
boolean |
isFair() 如果此锁的公平设置为 true,则返回 true 。 |
boolean |
isHeldByCurrentThread() 查询此锁是否由当前线程持有。 |
boolean |
isLocked() 查询此锁是否由任何线程持有。 |
void |
lock() 获得锁,不能被打断 |
void |
lockInterruptibly() 获得锁,可以被打断 |
Condition |
newCondition() 返回Condition 用于这种用途实例Lock 实例。 |
String |
toString() 返回一个标识此锁的字符串以及其锁定状态。 |
boolean |
tryLock() 尝试获取该锁,不会阻塞,直接返回, |
boolean |
tryLock(long timeout, TimeUnit unit) 尝试在给定时间内获取锁,可以被打断 |
void |
unlock() 释放锁。 |
ReadWriteLock
看名字就知道是干啥的了,之前我们在 多线程设计模式 中也提到了读写锁,并且实现了一个简易的读写锁。
读写锁案例
/**
* @author imlgw.top
* @date 2019/7/26 12:15
*/
public class ReadWriteLockTest1 {
private final static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(false);
private final static Lock readLock=readWriteLock.readLock();
private final static Lock writeLock=readWriteLock.writeLock();
private static final List<Long> data=new ArrayList<>();
public static void main(String[] args) {
Thread thread0 = new Thread(() -> write());
thread0.start();
Thread thread1 = new Thread(() -> read());
thread1.start();
Thread thread2 = new Thread(() -> read());
thread2.start();
}
public static void write(){
try {
writeLock.lock();
data.add(System.currentTimeMillis());
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
public static void read(){
try {
readLock.lock();
data.forEach(System.out::println);
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+" ====");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
}
显式锁对比内部锁
显式锁和内部锁都各有优缺点,谁也不能替代谁(这里的显式锁主要指ReentrantLock
)。
特性上
-
显式锁支持公平锁(显式锁,内部锁默认都是非公平的)
-
显式锁可以
tryLock()
无需等待,内部锁只能一直等待锁释放 -
显式锁有带超时的
tryLock(long timeout, TimeUnit unit)
。 -
显式锁可以响应中断请求
lockInterruptibly()
-
显式锁提供了一系列的方法对锁的相关信息监控,内部锁则没有
-
………
性能上
其实很多人会认为synchronized
性能很差,不如显式锁
- jdk1.5 中,在高争用的情况下,确实显式锁要优于内部锁
- jdk1.5 之后, 对内部锁进行了一些优化,包括 锁消除,锁粗化,偏向锁,和适应性锁 (这些优化后面的文章会再做解释) 。这些优化使得内部锁的性能提升了很多,甚至在低争用情况下性能还要优于 显式锁。
使用上
-
Lock 不是 Java 语言内置的,synchronized 是 Java 语言的关键字,因此是内置特性。Lock 是一个类,通过这个类可以实现同步访问。
-
Lock 和 synchronized 有一点非常大的不同,采用 synchronized 不需要用户去手动释放锁,当 synchronized 方法或者 synchronized 代码块执行完之后,系统会自动让线程释放对锁的占用,而 Lock 则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。
其实很明显内部锁相比显式锁使用起来要简单易用,所以保守一点默认优先使用内部锁,在需要显式锁的特性的时候再选用显式锁。
Condition
Condition 对应的其实就是显式锁的通信方法,Lock.newCondition()
返回的就是一个 Condition 实例
Condition
接口的await()/signal()
其实就对应了 synchronize
的 wait()/notify()
,但是相对于 wait()/notify()
Condition 接口解决了 过早唤醒 问题以及 Object.wait(time)
无法区分是否是由于等待超时还是被唤醒的问题。
Object.wait()/notify()
要求执行线程持有所属对象的内部锁,同样Condition.await()/notify()
也需要线程持有创建该 Condition 的显式锁,每个 Condition
实例内部都维护了一个等待队列,不同的 Condition 之间不会相互影响,这样一来就解决了过早唤醒的问题,对于生产者消费者问题我们可以分别给消费者和生产者创建一个 Condition,生产者通知消费者时候只会唤醒消费者,消费者通知生产者的时候也只会通知生产者。
解决过早唤醒
/**
* @author imlgw.top
* @date 2019/7/26 15:09
*/
public class ConditionTest1 {
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Condition pCondition = LOCK.newCondition();
private static final Condition cCondition = LOCK.newCondition();
private static LinkedList<Long> buffer = new LinkedList<>();
private static final Integer MAX_BUFFER = 5;
public static void main(String[] args) {
ConditionTest1 conditionTest1 = new ConditionTest1();
Stream.of("p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9", "p10", "p11").forEach(name -> {
new Thread(() -> conditionTest1.produce(), name).start();
});
Stream.of("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11").forEach(name -> {
new Thread(() -> conditionTest1.consumer(), name).start();
});
}
public void produce() {
try {
LOCK.lock();
while (buffer.size() >= MAX_BUFFER) {
pCondition.await();
}
long l = System.currentTimeMillis();
buffer.add(l);
System.out.println(Thread.currentThread().getName() + " produced " + l + " ,current buffer size " + buffer.size());
//通知消费者
cCondition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
public void consumer() {
try {
LOCK.lock();
while (buffer.size() == 0) {
cCondition.await();
}
Long aLong = buffer.removeLast();
System.out.println(Thread.currentThread().getName() + " consumer " + aLong + " ,current buffer size " + buffer.size());
//通知生产者
pCondition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
}
解决 Object.wait() 无法区分其返回原因
/**
* @author imlgw.top
* @date 2019/7/26 17:48
*/
public class ConditionTest2 {
private static final Object obj = new Object();
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Condition condition = LOCK.newCondition();
private static boolean isReady = false;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
//3s+
Date date = new Date(System.currentTimeMillis() + 3000);
try {
LOCK.lock();
while (!isReady) {
boolean b = condition.awaitUntil(date);
if (!b) {
System.out.println("Fucking t0!!!!!, i am timeout");
return;
}
System.out.println("oh i get the lock!!!");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}, "t1").start();
TimeUnit.SECONDS.sleep(5);
new Thread(() -> {
try {
LOCK.lock();
isReady = true;
condition.signal();
} finally {
LOCK.unlock();
}
}, "t0").start();
}
}
可见** awaitUntil()** 是有一个返回值的,返回 true 则表示在等待时间内获取到了锁,反之则是因为超时。
写这个 Demo 的时候是拿之前的改的,然后有一个地方的 notify 忘了改成 signal,然后一直抛异常。因为 condition 里面也有 wait/notify 方法所以使用的时候一定要注意不要调错了。
API
Modifier and Type | Method and Description |
---|---|
void |
await() 使当前线程等待直到发出信号或中断 |
boolean |
await(long time, TimeUnit unit) 使当前线程等待直到发出信号或中断,或指定的等待时间过去。 |
long |
awaitNanos(long nanosTimeout) 使当前线程等待直到发出信号或中断,或指定的等待时间过去。 |
void |
awaitUninterruptibly() 使当前线程等待直到发出信号。 |
boolean |
awaitUntil(Date deadline) 使当前线程等待直到发出信号或中断,或者指定的最后期限到达。 |
void |
signal() 唤醒一个等待线程。 |
void |
signalAll() 唤醒所有等待线程。 |
StampedLock
StampedLock
是Java8
引入的一种新的锁机制,是对读写锁ReentrantReadWriteLock
的增强,读写锁虽然分离了读和写的功能,使得读与读之间不互斥,但是读和写之间依然是互斥的,本质上仍然是悲观锁,如果有大量的读线程就会引起写线程的饥饿,而StampedLock
则提供了一种乐观的读策略,这种乐观策略的锁非常类似于无锁的操作,使得乐观锁完全不会写线程
悲观锁策略
/**
* @author imlgw.top
* @date 2019/7/28 13:47
*/
public class StampedLockTest1 {
private static final StampedLock stampedLock = new StampedLock();
private final static List<Long> shareData = new ArrayList<>();
public static void main(String[] args) {
final ExecutorService executorService = Executors.newFixedThreadPool(10);
Runnable readRunnable = () -> {
while (true) {
read();
}
};
Runnable writeRunnable = () -> {
while (true) {
write();
}
};
executorService.submit(readRunnable);
executorService.submit(writeRunnable);
}
private static void read() {
long stamped = -1;
try {
stamped = stampedLock.readLock();
System.out.println(Thread.currentThread().getName() + " read " + shareData);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlockRead(stamped);
}
}
private static void write() {
long stamp = -1;
try {
stamp = stampedLock.writeLock();
shareData.add(System.currentTimeMillis());
System.out.println(Thread.currentThread().getName() + " Write ");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlockWrite(stamp);
}
}
}
其实这种方式就和ReadWriteLock
是等价的了。
乐观读策略
相对上面的方式,其实就是改变了读的策略
private static void read() {
long stamp = stampedLock.tryOptimisticRead(); //非阻塞
//暂存 res
String res=Thread.currentThread().getName() + " read " + shareData;
if (!stampedLock.validate(stamp)) {
try {
//验证失败,说明有线程进行了写操作,可能造成数据不一致
//进行锁升级,获取共享读锁
stamp = stampedLock.readLock();
//覆盖 res
res=Thread.currentThread().getName() + " read " + shareData;
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlockRead(stamp);
}
}
System.out.println(res);
}
先尝试获取乐观读锁(非阻塞)返回一个戳,然后进行读操作,缓存读的结果,然后根据前面返回的戳验证在此过程中是否有写线程被占用过,如果被占用过就表示数据可能不一致了,就需要转换成普通的共享读锁
,再次读取数据刷新结果,保证数据的一致性,最后释放锁。
这里一开始被视频里面讲的搞懵了他写的是这样的
private static void read() {
long stamp = stampedLock.tryOptimisticRead(); //非阻塞
if (stampedLock.validate(stamp)) {
try {
stamp = stampedLock.readLock();
System.out.println(Thread.currentThread().getName() + " read " + shareData);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlockRead(stamp);
}
}
}
然后就说这样比 ReadWriteLock 效率高很多,我越看越觉得不对劲,然后自己查了下,看了下官方的 Demo 发现他写的确实是错的。
API 什么的就不多说了,这里如果想了解更多可以看看 这篇文章
另外如果想看看性能对比的可以看看 这篇文章
这里还要存个疑,最后的释放读锁之后到最后一步的时候不是也有可能有写线程进入么,那样不是也会造成数据不一致的情况么? 因为源码上注释的 Demo 也是这样写的所以这里还是有点疑问的。
需要注意的地方
-
所有获取锁的方法,都返回一个邮戳(Stamp),Stamp 为 0 表示获取失败,其余都表示成功;
-
所有释放锁的方法,都需要一个邮戳(Stamp),这个 Stamp 必须是和成功获取锁时得到的 Stamp 一致;
-
StampedLock 是不可重入的 (如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁),所以这就要我们开发人员不要去修改返回的戳或者让它逃逸出去
-
StampedLock 不支持 Condition
ForkJoin
ForkJoin 是 Java7 提供的原生多线程并行处理框架,其基本思想是将大任务分割成小任务,最后将小任务聚合起来得到结果。fork 是分解的意思,join 是收集的意思。它非常类似于 HADOOP 提供的 MapReduce 框架,只是 MapReduce 的任务可以针对集群内的所有计算节点,可以充分利用集群的能力完成计算任务。ForkJoin 更加类似于单机版的 MapReduce。
RecursiveTask
计算 sum 和
/**
* @author imlgw.top
* @date 2019/7/30 11:58
*/
public class RecursiveTest1 {
//这个值代表一个线程可以处理的最大数据,不能太小也不能太大
private final static int MAX_THRESHOLD=3;
public static void main(String[] args) {
final ForkJoinPool forkJoinPool=new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(new CaculateRecursiveTask(0, 100));
try {
Integer integer = submit.get();
System.out.println(integer);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private static class CaculateRecursiveTask extends RecursiveTask<Integer>{
private final int start;
private final int end;
private CaculateRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if(end-start<=MAX_THRESHOLD){
return IntStream.rangeClosed(start,end).sum();
}else{
int mid=(start+end)/2;
CaculateRecursiveTask leftTask=new CaculateRecursiveTask(start,mid);
CaculateRecursiveTask rightTask=new CaculateRecursiveTask(mid+1,end);
//阻塞
leftTask.fork();
rightTask.fork();
//返回结果
return leftTask.join()+rightTask.join();
}
}
}
}
RecursiveAction
计算 sum 和
/**
* @author imlgw.top
* @date 2019/7/30 13:08
*/
public class RecursiveActionTest1 {
private final static int MAX_THRESHOLD=3;
private static AtomicInteger SUM=new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
final ForkJoinPool forkJoinPool=new ForkJoinPool();
forkJoinPool.submit(new CalculateRecursiveAction(0, 100));
TimeUnit.SECONDS.sleep(3);
System.out.println(SUM.get());
}
private static class CalculateRecursiveAction extends RecursiveAction{
private final int start;
private final int end;
private CalculateRecursiveAction(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if(end-start<=MAX_THRESHOLD){
SUM.addAndGet(IntStream.rangeClosed(start,end).sum());
}else{
int mid=(start+end)/2;
CalculateRecursiveAction leftTask=new CalculateRecursiveAction(start,mid);
CalculateRecursiveAction rightTask=new CalculateRecursiveAction(mid+1,end);
//阻塞
leftTask.fork();
rightTask.fork();
}
}
}
}
Phaser
CountDownLatch 和 CyclicBarrier 都是 JDK 1.5 引入的,而 Phaser 是 JDK 1.7 引入的。Phaser 的功能与 CountDownLatch 和 CyclicBarrier 有部分重叠,同时也提供了更丰富的语义和更灵活的用法。
Phaser 顾名思义,与阶段相关。Phaser 比较适合这样一种场景,一种任务可以分为多个阶段,现希望多个线程去处理该批任务,对于每个阶段,多个线程可以并发进行,但是希望保证只有前面一个阶段的任务完成之后才能开始后面的任务。这种场景可以使用多个 CyclicBarrier 来实现,每个 CyclicBarrier 负责等待一个阶段的任务全部完成。但是使用 CyclicBarrier 的缺点在于,需要明确知道总共有多少个阶段,同时并行的任务数需要提前预定义好,且无法动态修改。而 Phaser 可同时解决这两个问题。
/**
* @author imlgw.top
* @date 2019/7/30 13:44
*/
public class PhaserTest1 {
public static void main(String[] args) {
final Phaser phaser = new Phaser();
Stream.of("t1", "t2", "t3", "t4", "t5").forEach(name -> new Thread(new Task(phaser), name).start());
//注册 main 线程
phaser.register();
phaser.arriveAndAwaitAdvance();
System.out.println("all of work finished");
}
static class Task implements Runnable {
private final Phaser phaser;
Task(Phaser phaser) {
this.phaser = phaser;
//动态增加
this.phaser.register();
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is working");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//
phaser.arriveAndAwaitAdvance();
}
}
}
API
Modifier and Type | Method and Description |
---|---|
int |
arrive() 抵达这个移相器,而不用等待别人到达。 |
int |
arriveAndAwaitAdvance() 到达这个移相器,等待其他人。 |
int |
arriveAndDeregister() 到达这个移相器并从其中注销,而无需等待别人到达。 |
int |
awaitAdvance(int phase) 等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。 |
int |
awaitAdvanceInterruptibly(int phase) 等待该移相器的阶段从给定的相位值推进,如果在等待时 InterruptedException 则抛出 InterruptedException ,或者如果当前相位不等于给定的相位值或者该相位器被终止,则立即返回。 |
int |
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 等待该移相器的阶段从给定的相位值或给定的超时时间 InterruptedException 到等待时抛出 InterruptedException ,如果当前相位不等于给定的相位值,则立即返回,或者该相位器被终止。 |
int |
bulkRegister(int parties) 增加给定数量的新的有争议的派对到这个移相器。 |
void |
forceTermination() 强制此移相器进入终止状态。 |
int |
getArrivedParties() 返回在此移相器的当前阶段到达的已注册方的数量。 |
Phaser |
getParent() 返回此移相器的父级,如果没有,则返回 null 。 |
int |
getPhase() 返回当前相位数。 |
int |
getRegisteredParties() 返回在此移动设备上注册的各方数量。 |
Phaser |
getRoot() 返回此移相器的根祖先,如果它没有父代,则与该移相器相同。 |
int |
getUnarrivedParties() 返回尚未到达此移相器当前阶段的已注册方的数量。 |
boolean |
isTerminated() 返回 true 如果移相器已被终止。 |
protected boolean |
onAdvance(int phase, int registeredParties) 在即将进行的相位提前执行动作的可覆盖方法,并控制终止。 |
int |
register() 添加一个新的 unririved party 到这个移相器。 |
String |
toString() 返回一个标识此移相器的字符串及其状态。 |
总结
大致归纳了一些常见的并发工具,当然只是浅显的记录了一下怎样使用,原理部分还没有深入的了解,后续肯定会去研究底层实现源码包括AQS
和LockSupport
等等,其实越往后学就越想知道到底层是怎样个过程,到底是如何加锁如何解锁?CAS 又起到了什么作用?Synchnorized 底层又是如何实现?… 学无止境啊。JUC 除了这些并发工具外还有线程池 ,阻塞队列 还没介绍,后面会继续介绍。