CountDownLatch

结合前面的知识,我们知道Thread.join()可以实现一个线程等待另一个线程结束再执行,但是有时候我们可能并不需要等到另一个线程结束,只需要等待特定的操作结束后就可以,可能有人会说可以通过wait/notify模型来实现,但是其实我们可以采用更加方便的工具类也就是 java.util.concurrent.CountDownLatch

Constructor and Description
CountDownLatch(int count) 构造一个以给定计数 CountDownLatch CountDownLatch。

案例

public class CountDownLatchTest {

private static Random random=new Random(System.currentTimeMillis());

private static ExecutorService executor=Executors.newFixedThreadPool(2);

private static CountDownLatch latch;


public static void main(String[] args) throws InterruptedException {
int []data=query();
//构造器的参数代表需要执行的先决条件的数量
latch=new CountDownLatch(data.length);
for (int i = 0; i <data.length ; i++) {
executor.execute(new SimpleRunnable(data,i,latch));
}
latch.await();
//异步关闭
executor.shutdown();
System.out.println("all of works done ");

}

static class SimpleRunnable implements Runnable{
private final int []data;
private final int index;
private final CountDownLatch latch;


public SimpleRunnable(int[] data, int index, CountDownLatch latch) {
this.data = data;
this.index = index;
this.latch=latch;
}

@Override
public void run() {
try {
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
int value=data[index];
if (value%2==0) {
data[index]=2*value;
}else{
data[index]=10*value;
}
System.out.println(Thread.currentThread().getName()+" finished");
latch.countDown();
}
}

private static int[] query(){
return new int[]{1,2,3,4,5,6,7,8,9,10};
}
}

CountDownLatch内部维护了一个未执行操作的计数器 count这个count需要通过构造器传入,CountDownLatch 实例的countdown方法每执行一次count 就会减一 ,在count 减为0之前await 方法的执行线程会被暂停,

直到count减为0的时候才会被唤醒继续执行。

/**
* @author imlgw.top
* @date 2019/7/24 15:09
*/
public class CountDownLatchTest2 {
private static int data;

public static void main(String[] args) throws InterruptedException {
final CountDownLatch latch=new CountDownLatch(4);

new Thread(() -> {
for (int i = 1; i <10; i++) {
data=i;
latch.countDown();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
latch.await();
System.out.println("data:"+data);
}
}

CountDownLatch传入的count为4,循环执行了10次,结果是多少?当然是4了,当减为0的时候await调用就会返回不会再等待,后面即使count已经减为0再减也没有什么意义。

API

Modifier and Type Method and Description
void await() 导致当前线程等到锁存器计数到零,除非线程是 interrupted 。
boolean await(long timeout, TimeUnit unit) 使当前线程等待直到锁存器计数到零为止,除非线程为 interrupted或指定的等待时间过去。
void countDown() 减少锁存器的计数,如果计数达到零,释放所有等待的线程。
long getCount() 返回当前计数。
String toString() 返回一个标识此锁存器的字符串及其状态。

CyclicBarrier

有时候多个线程可能需要相互等待对方执行到某一步然后再执行下一步。生活中就有很多这样的情形?比如相约爬山:大家约定好集合的地点然后等所有人到达指定的集合地点后再一起去,先到达的人需要等待后到达的人,只有所有人到齐后才会出发去爬山。

/**
* @author imlgw.top
* @date 2019/7/24 15:54
*/
public class CyclicbarrierTest1 {
public static void main(String[] args){
CyclicBarrier cyclicBarrier=new CyclicBarrier(2);
new Thread(()->{
try {
//do something
Thread.sleep(2000);
System.out.println("t1 finished");
cyclicBarrier.await();
System.out.println("other thread is done too");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();

new Thread(()->{
try {
//do something
Thread.sleep(6000);
System.out.println("t2 finished");
cyclicBarrier.await();
System.out.println("other thread is done too");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}

上面的代码就体现了这种情况,两个线程开始先分别做各自的事情,然后到达一个点之后,需要相互等待,先到达的等待后到达的,上面的例子中很显然 t1 先到达它需要等待t2 到达后才能继续运行。

同时,如果外界是想要知道这个CyclicBarrier的任务执行情况可以在构造函数中加入一个回调的Runnable

CyclicBarrier cyclicBarrier=new CyclicBarrier(2,()->{
System.out.println("t1,t2 both arrived");
});

这样的方式还是很优雅的。

如果细心的话其实你还会发现相比上面的CountDown多了一个BrokenBarrierException,我们来看看doc

如果有线程正在waiting而 Barrier已经被reset了

If the barrier is {@link #reset} while any thread is waiting

or if the barrier {@linkplain #isBroken is broken} when

{@code await} is invoked, or while any thread is waiting, then

{@link BrokenBarrierException} is thrown.

如果某一个线程在waiting的时候被打断了,那么其他的waiting线程将会抛出这个异常

If any thread is {@linkplain Thread#interrupt interrupted} while waiting

then all other waiting threads will throw

{@link BrokenBarrierException} and the barrier is placed in the broken

state.

源码解析

CyclicBarrier并没有借助AQS而是利用Condition条件变量来实现的,关于Condition后面会介绍。

doAwait()

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); //加锁
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count; //和countDownLatch类似
if (index == 0) { // tripped 所有线程都抵达了
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); //执行构造器中传入的CallBack
ranAction = true;
nextGeneration(); //下一个轮回
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

nextGeneration()

private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); //唤醒这个barrier上所有的等待线程,trip是一个condition
// set up next generation
count = parties; //count复位
generation = new Generation();
}

其实也可以用CountDownLatch实现和CyclicBarrier类似的功能,这里就不再演示。

API

Modifier and Type Method and Description
int await() 等待所有 parties到达屏障,并且在这个障碍上调用await
int await(long timeout, TimeUnit unit) 等待所有 parties已经在此屏障上调用 await ,或指定的等待时间过去。
int getNumberWaiting() 返回目前正在等待的线程的数量。
int getParties() 返回parties
boolean isBroken() 查询这个障碍是否处于Broken
void reset() 将屏障重置为初始状态。

区别

  • CountDownLatch不能reset(递减到0后不能复原),CyclicBarrier正如其名是可以循环使用的。

  • CountDownLatch工作线程之间互不关心,CyclicBarrier所有线程必须到达一个共同的点才会继续执行

Exchanger

JDK1.5引入的,看名字就猜得到大概是干嘛的,主要就是用于两个线程 之间的数据交换,其实也就相当于只有两个参与方的CyclicBarrier 当两个线程都达到exchanger point (集合点) 就会进行数据的交换,一手交钱,一手交货 所以在使用的时候也要注意两个线程能否正确的到达exchanger point 如果有一方无法到达则另一方就会陷入等待,当然你可以加上timeout。另外这个只适用与两个线程,如果有2个以上的线程参与将会造成数据传输混乱,无法控制。

/**
* @author imlgw.top
* @date 2019/7/25 13:54
*/
public class ExchangerTest1 {
public static void main(String[] args) {
final Exchanger<String> exchanger=new Exchanger<>();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+" start");
try {
String res = exchanger.exchange("i am A",10,TimeUnit.SECONDS);
System.out.println("back msg=" +res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"done");
}).start();


new Thread(()->{
System.out.println(Thread.currentThread().getName()+" start");
try {
String res = exchanger.exchange("i am B");
System.out.println("back msg=" +res);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"done");
});
}
}

线程安全问题

确实Exchanger如果使用不当会造成很多问题,所以在使用的时候一定要谨慎,首先它只适用与两个线程之间传输数据,其次这里传输的对象是同一个 ,也就是说发送端和接收端的对象内存地址是一样的是一个对象。

/**
* @author imlgw.top
* @date 2019/7/25 15:14
*/
public class ExchangerTest2 {
public static void main(String[] args) {
final Exchanger<Simple> exchanger=new Exchanger<>();
new Thread(()->{
Simple simple = new Simple(1);
System.out.println(Thread.currentThread().getName()+" start. send to B"+simple);
try {
Simple res = exchanger.exchange(simple);
//休眠10s
TimeUnit.SECONDS.sleep(3);
System.out.println("A receive from B obj:"+res +" data:"+res.a);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" done");
},"A").start();

new Thread(()->{
Simple simple = new Simple(2);
System.out.println(Thread.currentThread().getName()+" start. send to A:"+simple);
try {
Simple res = exchanger.exchange(simple);
//修改发送出去的obj
simple.setA(100000);
System.out.println("B receive from A obj:"+res +" data:"+res.a);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" done");
},"B").start();
}

static class Simple{
private int a;

public Simple(int a) {
this.a = a;
}

public int getA() {
return a;
}

public void setA(int a) {
this.a = a;
}
}
}

结果

A start. send to Bjuc_study.tools.Exchanger.ExchangerTest2$Simple@72d6db17
B start. send to A:juc_study.tools.Exchanger.ExchangerTest2$Simple@4295c176
B receive from A obj:juc_study.tools.Exchanger.ExchangerTest2$Simple@72d6db17 data:1
B done
A receive from B obj:juc_study.tools.Exchanger.ExchangerTest2$Simple@4295c176 data:100000
A done

两个线程拿到的是同一个对象并不是拷贝,两个线程同时的去操作这个对象这其实是很危险的很有可能就会产生一些线程安全问题,使用的时候一定要注意

Semaphore

信号量,相信学过操作系统的同学肯定对这个很熟悉了,熟悉PV操作的话这个一看就懂了。

Constructor and Description
Semaphore(int permits) 创建一个 Semaphore与给定数量的许可证,默认非公平锁
Semaphore(int permits, boolean fair) 创建一个 Semaphore与给定数量的许可证和是否公平

Semaphore实现一个显示锁

/**
* @author imlgw.top
* @date 2019/7/25 15:48
*/
public class SemaphoreTest1 {
public static void main(String[] args) {
final SemaphoreLock semaphoreLock=new SemaphoreLock();

new Thread(()->{
try {
semaphoreLock.lock();
System.out.println(Thread.currentThread().getName()+" get the lock");
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphoreLock.unlock();
System.out.println(Thread.currentThread().getName()+"release the lock");
}

}).start();

new Thread(()->{
try {
semaphoreLock.lock();
System.out.println(Thread.currentThread().getName()+" get the lock");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphoreLock.unlock();
System.out.println(Thread.currentThread().getName()+"release the lock");
}

}).start();
}

static class SemaphoreLock{
private final Semaphore semaphore =new Semaphore(1);

private void lock() throws InterruptedException {
semaphore.acquire();
}

public void unlock(){
semaphore.release();
}
}
}

实现生产者消费者模型

生产者消费者模型其实也挺重要的,有时候面试会让你手写一个生产者消费者模型,在之前的文章中其实有实现过一个但是那个其实还是很简单的一个,那个是没有Buffer的,下面这个用信号量实现的就是有Buffer的,后面会单独整理出所有的生产者消费者模型的实现方法。

/**
* @author imlgw.top
* @date 2019/7/25 16:43
*/
public class ProduceConsumerV4 {
public static void main(String[] args) {
ProduceConsumerV4 pc = new ProduceConsumerV4();
Stream.of("Produce1", "Produce2", "Produce3", "Produce4").forEach(n -> {
new Thread(() -> {
while (true) {
pc.produce();
}
}, n).start();
});
Stream.of("Consumer1", "Consumer2", "Consumer3", "Consumer4").forEach(n -> {
new Thread(() -> {
while (true) {
pc.consumer();
}
}, n).start();
});
}

//buffer载体
private LinkedList<Object> buffer=new LinkedList<>();

private final Semaphore full = new Semaphore(0);

//buffer最大值
private final Semaphore empty = new Semaphore(3);

//互斥锁
private final Semaphore mutex= new Semaphore(1);

public void produce() {
//已经生产了
try {
empty.acquire();
//不能和上面的信号量交换
mutex.acquire();
buffer.add(new Object());
System.out.println(Thread.currentThread().getName() + " produce a obj , current list size:" +buffer.size());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.release();
full.release();
}
}

public void consumer() {
try {
full.acquire();
mutex.acquire();
//移除最后一个
buffer.removeLast();
System.out.println(Thread.currentThread().getName() + " consumer a obj, current size: " + buffer.size());//consumer
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.release();
empty.release();
}
}
}

API

Modifier and Type Method and Description
void acquire() 从该信号量获取许可证,阻塞直到获取到,可以被interrupt
void acquire(int permits) 从该信号量获取给定数量的许可证,阻塞直到获取到可以被interrupt
void acquireUninterruptibly() 从该信号量获取许可证,阻塞直到获取到,无视interrupt,不会被interrupt
void acquireUninterruptibly(int permits) 从该信号量获取给定数量的许可证,阻止直到获取到,不可被interrupt。
int availablePermits() 返回此信号量中当前可用的许可数。
int drainPermits() 获取并返回所有可立即获得的许可证。
protected Collection<Thread> getQueuedThreads() 返回一个包含可能正在等待获取的线程的集合。
int getQueueLength() 返回等待获取的线程数的估计值。
boolean hasQueuedThreads() 查询是否有线程等待获取。
boolean isFair() 是不是公平锁
protected void reducePermits(int reduction) 缩小可用许可证的数量。
void release() 释放许可证,将其返回到信号量。(0的时候也可以release)
void release(int permits) 释放给定数量的许可证,将其返回到信号量。
String toString() 返回一个标识此信号量的字符串及其状态。
boolean tryAcquire() 从这个信号量获得许可证,立即返回,不会阻塞。
boolean tryAcquire(int permits) 从这个信号量获取给定数量的许可证,不会阻塞
boolean tryAcquire(int permits, long timeout, TimeUnit unit) 从该信号量获取给定数量的许可证,如果在给定的等待时间获取到,就返回true,可以被打断
boolean tryAcquire(long timeout, TimeUnit unit) 从该信号量获取许可证,如果在给定的等待时间获取到,就返回true,可以被打断

Lock

通常被称为显式锁,与之对应的synchroized 则被称为内部锁,显式锁是jdk1.5引入的锁,他的作用与内部锁相同,但是它的功能比内部锁更加强大,但是并不是内部锁的替代品,其实在之前的 Java多线程基础 一文中就手写过一个带有限时等待的锁 ,那个其实就是模仿的Lock 接口

ReentrantLock

ReentrantLock是Lock接口的一个实现类也是用的最多的一个实现类,Reentrant 本意就是 可重入的,可再入的, 表示当一个线程试图获取一个它已经获取的锁时,这个获取动作就自动成功,synchnorized也是可重入的。

Constructor and Description
ReentrantLock() 创建一个 ReentrantLock的实例。
ReentrantLock(boolean fair) 根据给定的公平策略创建一个 ReentrantLock的实例。
/**
* @author imlgw.top
* @date 2019/7/25 20:07
*/
public class ReentrantLockTest1 {

private static final ReentrantLock lock = new ReentrantLock();

public static void main(String[] args) throws InterruptedException {
Thread thread0 = new Thread(() -> needLock() );
Thread thread1 = new Thread(() -> needLock() );
thread0.start();
thread1.start();
TimeUnit.SECONDS.sleep(3);
thread1.interrupt();
}

public static void needLock() {
try {
//可打断的获取锁
lock.lockInterruptibly();
System.out.println(Thread.currentThread().getName()+" is get the lock");
while (true){
//空转
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock(); //确保锁的释放
}
}
}

测试结果

Thread-0 is get the lock
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at juc_study.tools.Lock.ReentrantLockTest1.needLock(ReentrantLockTest1.java:26)
at juc_study.tools.Lock.ReentrantLockTest1.lambda$main$1(ReentrantLockTest1.java:16)
at java.lang.Thread.run(Thread.java:748)
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
at juc_study.tools.Lock.ReentrantLockTest1.needLock(ReentrantLockTest1.java:34)
at juc_study.tools.Lock.ReentrantLockTest1.lambda$main$1(ReentrantLockTest1.java:16)
at java.lang.Thread.run(Thread.java:748)

API

Modifier and Type Method and Description
int getHoldCount() 当前线程调用lock()的次数 (重入的次数)
protected Thread getOwner() 返回当前拥有此锁的线程,如果没有,返回 null
protected Collection<Thread> getQueuedThreads() 返回包含可能正在等待获取此锁的线程的集合。
int getQueueLength() 返回等待获取此锁的线程数的估计值。
protected Collection<Thread> getWaitingThreads(Condition condition) 返回包含可能在与此锁相关联的给定条件下等待的线程的集合。
int getWaitQueueLength(Condition condition) 返回与此锁相关联的给定条件等待的线程数的估计。
boolean hasQueuedThread(Thread thread) 查询给定线程是否等待获取此锁。
boolean hasQueuedThreads() 查询是否有线程正在等待获取此锁。
boolean hasWaiters(Condition condition) 查询任何线程是否等待与此锁相关联的给定条件。
boolean isFair() 如果此锁的公平设置为true,则返回 true
boolean isHeldByCurrentThread() 查询此锁是否由当前线程持有。
boolean isLocked() 查询此锁是否由任何线程持有。
void lock() 获得锁,不能被打断
void lockInterruptibly() 获得锁,可以被打断
Condition newCondition() 返回Condition用于这种用途实例Lock实例。
String toString() 返回一个标识此锁的字符串以及其锁定状态。
boolean tryLock() 尝试获取该锁,不会阻塞,直接返回,
boolean tryLock(long timeout, TimeUnit unit) 尝试在给定时间内获取锁,可以被打断
void unlock() 释放锁。

ReadWriteLock

看名字就知道是干啥的了,之前我们在 多线程设计模式 中也提到了读写锁,并且实现了一个简易的读写锁。

读写锁案例

/**
* @author imlgw.top
* @date 2019/7/26 12:15
*/
public class ReadWriteLockTest1 {

private final static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(false);

private final static Lock readLock=readWriteLock.readLock();

private final static Lock writeLock=readWriteLock.writeLock();

private static final List<Long> data=new ArrayList<>();

public static void main(String[] args) {
Thread thread0 = new Thread(() -> write());
thread0.start();

Thread thread1 = new Thread(() -> read());
thread1.start();

Thread thread2 = new Thread(() -> read());
thread2.start();
}

public static void write(){
try {
writeLock.lock();
data.add(System.currentTimeMillis());
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}

public static void read(){
try {
readLock.lock();
data.forEach(System.out::println);
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+" ====");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
}

显式锁对比内部锁

显式锁和内部锁都各有优缺点,谁也不能替代谁(这里的显式锁主要指ReentrantLock)。

特性上

  • 显式锁支持公平锁(显式锁,内部锁默认都是非公平的)

  • 显式锁可以tryLock()无需等待,内部锁只能一直等待锁释放

  • 显式锁有带超时的tryLock(long timeout, TimeUnit unit)

  • 显式锁可以响应中断请求 lockInterruptibly()

  • 显式锁提供了一系列的方法对锁的相关信息监控,内部锁则没有

  • ………

性能上

其实很多人会认为synchronized性能很差,不如显式锁

  • jdk1.5中,在高争用的情况下,确实显式锁要优于内部锁
  • jdk1.5之后, 对内部锁进行了一些优化,包括 锁消除锁粗化偏向锁,和适应性锁 (这些优化后面的文章会再做解释) 。这些优化使得内部锁的性能提升了很多,甚至在低争用情况下性能还要优于 显式锁。

使用上

  • Lock不是Java语言内置的,synchronized是Java语言的关键字,因此是内置特性。Lock是一个类,通过这个类可以实现同步访问。

  • Lock和synchronized有一点非常大的不同,采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用,而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。

其实很明显内部锁相比显式锁使用起来要简单易用,所以保守一点默认优先使用内部锁,在需要显式锁的特性的时候再选用显式锁。

Condition

Condition对应的其实就是显式锁的通信方法,Lock.newCondition() 返回的就是一个Condition实例

Condition接口的await()/signal() 其实就对应了 synchronizewait()/notify() ,但是相对于 wait()/notify() Condition接口解决了 过早唤醒 问题以及 Object.wait(time) 无法区分是否是由于等待超时还是被唤醒的问题。

Object.wait()/notify()要求执行线程持有所属对象的内部锁,同样Condition.await()/notify()也需要线程持有创建该Condition的显式锁,每个Condition实例内部都维护了一个等待队列,不同的Condition之间不会相互影响,这样一来就解决了过早唤醒的问题,对于生产者消费者问题我们可以分别给消费者和生产者创建一个Condition,生产者通知消费者时候只会唤醒消费者,消费者通知生产者的时候也只会通知生产者。

解决过早唤醒

/**
* @author imlgw.top
* @date 2019/7/26 15:09
*/
public class ConditionTest1 {

private static final ReentrantLock LOCK = new ReentrantLock();

private static final Condition pCondition = LOCK.newCondition();

private static final Condition cCondition = LOCK.newCondition();

private static LinkedList<Long> buffer = new LinkedList<>();

private static final Integer MAX_BUFFER = 5;

public static void main(String[] args) {
ConditionTest1 conditionTest1 = new ConditionTest1();
Stream.of("p1", "p2", "p3", "p4", "p5", "p6", "p7", "p8", "p9", "p10", "p11").forEach(name -> {
new Thread(() -> conditionTest1.produce(), name).start();
});

Stream.of("c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10", "c11").forEach(name -> {
new Thread(() -> conditionTest1.consumer(), name).start();
});
}

public void produce() {
try {
LOCK.lock();
while (buffer.size() >= MAX_BUFFER) {
pCondition.await();
}
long l = System.currentTimeMillis();
buffer.add(l);
System.out.println(Thread.currentThread().getName() + " produced " + l + " ,current buffer size " + buffer.size());
//通知消费者
cCondition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}

public void consumer() {
try {
LOCK.lock();
while (buffer.size() == 0) {
cCondition.await();
}
Long aLong = buffer.removeLast();
System.out.println(Thread.currentThread().getName() + " consumer " + aLong + " ,current buffer size " + buffer.size());
//通知生产者
pCondition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}
}

解决Object.wait()无法区分其返回原因

/**
* @author imlgw.top
* @date 2019/7/26 17:48
*/
public class ConditionTest2 {
private static final Object obj = new Object();

private static final ReentrantLock LOCK = new ReentrantLock();

private static final Condition condition = LOCK.newCondition();

private static boolean isReady = false;

public static void main(String[] args) throws InterruptedException {

new Thread(() -> {
//3s+
Date date = new Date(System.currentTimeMillis() + 3000);
try {
LOCK.lock();
while (!isReady) {
boolean b = condition.awaitUntil(date);
if (!b) {
System.out.println("Fucking t0!!!!!, i am timeout");
return;
}
System.out.println("oh i get the lock!!!");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
}, "t1").start();

TimeUnit.SECONDS.sleep(5);

new Thread(() -> {
try {
LOCK.lock();
isReady = true;
condition.signal();
} finally {
LOCK.unlock();
}
}, "t0").start();
}
}

可见awaitUntil() 是有一个返回值的,返回true则表示在等待时间内获取到了锁,反之则是因为超时.

写这个Demo的时候是拿之前的改的,然后有一个地方的notify忘了改成signal,然后一直抛异常。。。因为condition里面也有wait/notify方法所以使用的时候一定要注意不要调错了。

API

Modifier and Type Method and Description
void await() 使当前线程等待直到发出信号或中断
boolean await(long time, TimeUnit unit) 使当前线程等待直到发出信号或中断,或指定的等待时间过去。
long awaitNanos(long nanosTimeout) 使当前线程等待直到发出信号或中断,或指定的等待时间过去。
void awaitUninterruptibly() 使当前线程等待直到发出信号。
boolean awaitUntil(Date deadline) 使当前线程等待直到发出信号或中断,或者指定的最后期限到达。
void signal() 唤醒一个等待线程。
void signalAll() 唤醒所有等待线程。

StampedLock

StampedLockJava8引入的一种新的锁机制,是对读写锁ReentrantReadWriteLock的增强,读写锁虽然分离了读和写的功能,使得读与读之间不互斥,但是读和写之间依然是互斥的,本质上仍然是悲观锁,如果有大量的读线程就会引起写线程的饥饿,而StampedLock则提供了一种乐观的读策略,这种乐观策略的锁非常类似于无锁的操作,使得乐观锁完全不会写线程

悲观锁策略

/**
* @author imlgw.top
* @date 2019/7/28 13:47
*/
public class StampedLockTest1 {

private static final StampedLock stampedLock = new StampedLock();

private final static List<Long> shareData = new ArrayList<>();

public static void main(String[] args) {
final ExecutorService executorService = Executors.newFixedThreadPool(10);
Runnable readRunnable = () -> {
while (true) {
read();
}
};

Runnable writeRunnable = () -> {
while (true) {
write();
}
};
executorService.submit(readRunnable);
executorService.submit(readRunnable);
executorService.submit(readRunnable);
executorService.submit(readRunnable);
executorService.submit(readRunnable);
executorService.submit(readRunnable);
executorService.submit(readRunnable);
executorService.submit(readRunnable);
executorService.submit(readRunnable);

executorService.submit(writeRunnable);

}

private static void read() {
long stamped = -1;
try {
stamped = stampedLock.readLock();
System.out.println(Thread.currentThread().getName() + " read " + shareData);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlockRead(stamped);
}
}

private static void write() {
long stamp = -1;
try {
stamp = stampedLock.writeLock();
shareData.add(System.currentTimeMillis());
System.out.println(Thread.currentThread().getName() + " Write ");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlockWrite(stamp);
}
}
}

其实这种方式就和ReadWriteLock 是等价的了。

乐观读策略

相对上面的方式,其实就是改变了读的策略

private static void read() {
long stamp = stampedLock.tryOptimisticRead(); //非阻塞
//暂存res
String res=Thread.currentThread().getName() + " read " + shareData;
if (!stampedLock.validate(stamp)) {
try {
//验证失败,说明有线程进行了写操作,可能造成数据不一致
//进行锁升级,获取共享读锁
stamp = stampedLock.readLock();
//覆盖res
res=Thread.currentThread().getName() + " read " + shareData;
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlockRead(stamp);
}
}
System.out.println(res);
}

先尝试获取乐观读锁(非阻塞)返回一个戳,然后进行读操作,缓存读的结果,然后根据前面返回的戳验证在此过程中是否有写线程被占用过,如果被占用过就表示数据可能不一致了,就需要转换成普通的共享读锁 ,再次读取数据刷新结果,保证数据的一致性,最后释放锁。

这里一开始被视频里面讲的搞懵了他写的是这样的

private static void read() {
long stamp = stampedLock.tryOptimisticRead(); //非阻塞
if (stampedLock.validate(stamp)) {
try {
stamp = stampedLock.readLock();
System.out.println(Thread.currentThread().getName() + " read " + shareData);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
stampedLock.unlockRead(stamp);
}
}
}

然后就说这样比ReadWriteLock效率高很多,我越看越觉得不对劲,然后自己查了下,看了下官方的Demo发现他写的确实是错的。。。

API什么的就不多说了,这里如果想了解更多可以看看这篇文章

另外如果想看看性能对比的可以看看 这篇文章

这里还要存个疑,最后的释放读锁之后到最后一步的时候不是也有可能有写线程进入么,那样不是也会造成数据不一致的情况么? 因为源码上注释的Demo也是这样写的所以这里还是有点疑问的。

需要注意的地方

  • 所有获取锁的方法,都返回一个邮戳(Stamp),Stamp为0表示获取失败,其余都表示成功;

  • 所有释放锁的方法,都需要一个邮戳(Stamp),这个Stamp必须是和成功获取锁时得到的Stamp一致;

  • StampedLock是不可重入的 (如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁),所以这就要我们开发人员不要去修改返回的戳或者让它逃逸出去

  • StampedLock 不支持Condition

ForkJoin

ForkJoin是Java7提供的原生多线程并行处理框架,其基本思想是将大任务分割成小任务,最后将小任务聚合起来得到结果。fork是分解的意思, join是收集的意思. 它非常类似于HADOOP提供的MapReduce框架,只是MapReduce的任务可以针对集群内的所有计算节点,可以充分利用集群的能力完成计算任务。ForkJoin更加类似于单机版的MapReduce。

RecursiveTask

计算sum和

/**
* @author imlgw.top
* @date 2019/7/30 11:58
*/
public class RecursiveTest1 {

//这个值代表一个线程可以处理的最大数据,不能太小也不能太大
private final static int MAX_THRESHOLD=3;

public static void main(String[] args) {
final ForkJoinPool forkJoinPool=new ForkJoinPool();
ForkJoinTask<Integer> submit = forkJoinPool.submit(new CaculateRecursiveTask(0, 100));
try {
Integer integer = submit.get();
System.out.println(integer);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

private static class CaculateRecursiveTask extends RecursiveTask<Integer>{

private final int start;
private final int end;

private CaculateRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if(end-start<=MAX_THRESHOLD){
return IntStream.rangeClosed(start,end).sum();
}else{
int mid=(start+end)/2;
CaculateRecursiveTask leftTask=new CaculateRecursiveTask(start,mid);
CaculateRecursiveTask rightTask=new CaculateRecursiveTask(mid+1,end);
//阻塞
leftTask.fork();
rightTask.fork();
//返回结果
return leftTask.join()+rightTask.join();
}
}
}
}

RecursiveAction

计算sum和

/**
* @author imlgw.top
* @date 2019/7/30 13:08
*/
public class RecursiveActionTest1 {

private final static int MAX_THRESHOLD=3;

private static AtomicInteger SUM=new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {
final ForkJoinPool forkJoinPool=new ForkJoinPool();
forkJoinPool.submit(new CalculateRecursiveAction(0, 100));
TimeUnit.SECONDS.sleep(3);
System.out.println(SUM.get());
}

private static class CalculateRecursiveAction extends RecursiveAction{

private final int start;
private final int end;

private CalculateRecursiveAction(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected void compute() {

if(end-start<=MAX_THRESHOLD){
SUM.addAndGet(IntStream.rangeClosed(start,end).sum());
}else{
int mid=(start+end)/2;
CalculateRecursiveAction leftTask=new CalculateRecursiveAction(start,mid);
CalculateRecursiveAction rightTask=new CalculateRecursiveAction(mid+1,end);
//阻塞
leftTask.fork();
rightTask.fork();
}
}
}
}

Phaser

CountDownLatch和CyclicBarrier都是JDK 1.5引入的,而Phaser是JDK 1.7引入的。Phaser的功能与CountDownLatch和CyclicBarrier有部分重叠,同时也提供了更丰富的语义和更灵活的用法。

Phaser顾名思义,与阶段相关。Phaser比较适合这样一种场景,一种任务可以分为多个阶段,现希望多个线程去处理该批任务,对于每个阶段,多个线程可以并发进行,但是希望保证只有前面一个阶段的任务完成之后才能开始后面的任务。这种场景可以使用多个CyclicBarrier来实现,每个CyclicBarrier负责等待一个阶段的任务全部完成。但是使用CyclicBarrier的缺点在于,需要明确知道总共有多少个阶段,同时并行的任务数需要提前预定义好,且无法动态修改。而Phaser可同时解决这两个问题。

/**
* @author imlgw.top
* @date 2019/7/30 13:44
*/
public class PhaserTest1 {
public static void main(String[] args) {
final Phaser phaser = new Phaser();

Stream.of("t1", "t2", "t3", "t4", "t5").forEach(name -> new Thread(new Task(phaser), name).start());
//注册main线程
phaser.register();
phaser.arriveAndAwaitAdvance();
System.out.println("all of work finished");
}

static class Task implements Runnable {
private final Phaser phaser;

Task(Phaser phaser) {
this.phaser = phaser;
//动态增加
this.phaser.register();
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " is working");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//
phaser.arriveAndAwaitAdvance();
}
}
}

API

Modifier and Type Method and Description
int arrive() 抵达这个移相器,而不用等待别人到达。
int arriveAndAwaitAdvance() 到达这个移相器,等待其他人。
int arriveAndDeregister() 到达这个移相器并从其中注销,而无需等待别人到达。
int awaitAdvance(int phase) 等待该相位器的相位从给定相位值前进,如果当前相位不等于给定相位值,则立即返回,或者该相位器被终止。
int awaitAdvanceInterruptibly(int phase) 等待该移相器的阶段从给定的相位值推进,如果在等待时 InterruptedException则抛出 InterruptedException ,或者如果当前相位不等于给定的相位值或者该相位器被终止,则立即返回。
int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) 等待该移相器的阶段从给定的相位值或给定的超时时间 InterruptedException到等待时抛出 InterruptedException ,如果当前相位不等于给定的相位值,则立即返回,或者该相位器被终止。
int bulkRegister(int parties) 增加给定数量的新的有争议的派对到这个移相器。
void forceTermination() 强制此移相器进入终止状态。
int getArrivedParties() 返回在此移相器的当前阶段到达的已注册方的数量。
Phaser getParent() 返回此移相器的父级,如果没有,则返回 null
int getPhase() 返回当前相位数。
int getRegisteredParties() 返回在此移动设备上注册的各方数量。
Phaser getRoot() 返回此移相器的根祖先,如果它没有父代,则与该移相器相同。
int getUnarrivedParties() 返回尚未到达此移相器当前阶段的已注册方的数量。
boolean isTerminated() 返回 true如果移相器已被终止。
protected boolean onAdvance(int phase, int registeredParties) 在即将进行的相位提前执行动作的可覆盖方法,并控制终止。
int register() 添加一个新的unririved party到这个移相器。
String toString() 返回一个标识此移相器的字符串及其状态。

总结

大致归纳了一些常见的并发工具,当然只是浅显的记录了一下怎样使用,原理部分还没有深入的了解,后续肯定会去研究底层实现源码包括AQSLockSupport等等,其实越往后学就越想知道到底层是怎样个过程,到底是如何加锁如何解锁?CAS又起到了什么作用?Synchnorized底层又是如何实现?…学无止境啊. JUC除了这些并发工具外还有线程池阻塞队列 还没介绍,后面会继续介绍。