本文主要是根据华黎的《大型网站系统与Java中间件实践》和 Jason 的几篇博客,对 Java 并发方面的内容做的一些总结,会着重讲述并发方面一些常见的类、接口和方法。

多线程编程

对于多线程编程,线程安全是我们首先要考虑的问题,关于线程安全有三个核心概念:原子性、可见性和顺序性,这三个概念需要先理解清楚。

三个核心概念

原子性

与数据库中事务的原子性概念相似,即对于一个操作(有可能包含有多个子操作)要么全部执行,要么全部都不执行。

关于原子性,最经典的例子就是银行转账问题:比如A和B同时向C转账10万元。如果转账操作不具有原子性,A在向C转账时,读取了C的余额为20万,然后加上转账的10万,计算出此时应该有30万,但还未来及将30万写回C的账户,此时B的转账请求过来了,B发现C的余额为20万,然后将其加10万并写回。然后A的转账操作技术——将30万写回C的余额。这种情况下C的最终余额为30万,而非预期的40万。

可见性

可见性是指,当多个线程并发访问共享变量时,一个线程对共享变量的修改,其它线程能够立即看到。可见性问题是好多人忽略或者理解错误的一点。

CPU从主内存中读数据的效率相对来说不高,现在主流的计算机中,都有几级缓存。每个线程读取共享变量时,都会将该变量加载进其对应CPU的高速缓存里,修改该变量后,CPU会立即更新该缓存,但并不一定会立即将其写回主内存(实际上写回主内存的时间不可预期)。此时其它线程(尤其是不在同一个CPU上执行的线程)访问该变量时,从主内存中读到的就是旧的数据,而非第一个线程更新后的数据。

这一点是操作系统或者说是硬件层面的机制,所以很多应用开发人员经常会忽略。

顺序性

顺序性指的是,程序执行的顺序按照代码的先后顺序执行。

以下面这段代码为例

1
2
3
4
boolean started = false; // 语句1
long counter = 0L; // 语句2
counter = 1; // 语句3
started = true; // 语句4

从代码顺序上看,上面四条语句应该依次执行,但实际上JVM真正在执行这段代码时,并不保证它们一定完全按照此顺序执行。

处理器为了提高程序整体的执行效率,可能会对代码进行优化,其中的一项优化方式就是调整代码顺序,按照更高效的顺序执行代码。

讲到这里,有人要着急了——什么,CPU不按照我的代码顺序执行代码,那怎么保证得到我们想要的效果呢?实际上,大家大可放心,CPU虽然并不保证完全按照代码顺序执行,但它会保证程序最终的执行结果和代码顺序执行时的结果一致。

Java如何解决多线程并发问题

上面已经提出了这三个核心的概念,在 Java 多线程中,我们会经常遇到这三个概念引发的多线程并发问题,下面讲述一下 Java 如果解决这些问题。

Java如何保证原子性

锁和同步

常用的保证Java操作原子性的工具是同步方法(或者同步代码块)。使用锁,可以保证同一时间只有一个线程能拿到锁,也就保证了同一时间只有一个线程能执行申请锁和释放锁之间的代码。

1
2
3
4
5
6
7
8
9
public void testLock () {
lock.lock();
try{
int j = i;
i = j + 1;
} finally {
lock.unlock();
}
}

与锁类似的是同步方法或者同步代码块。使用非静态同步方法时,锁住的是当前实例;使用静态同步方法时,锁住的是该类的Class对象;使用静态代码块时,锁住的是synchronized关键字后面括号内的对象。下面是同步代码块示例

1
2
3
4
5
6
public void testLock () {
synchronized (anyObject){
int j = i;
i = j + 1;
}
}

无论使用锁还是synchronized,本质都是一样,通过锁来实现资源的排它性,从而实际目标代码段同一时间只会被一个线程执行,进而保证了目标代码段的原子性。这是一种以牺牲性能为代价的方法。(这一部分会后面详细讲述)

CAS(compare and swap)

基础类型变量自增(i++)是一种常被新手误以为是原子操作而实际不是的操作。Java中提供了对应的原子操作类来实现该操作,并保证原子性,其本质是利用了CPU级别的CAS指令。由于是CPU级别的指令,其开销比需要操作系统参与的锁的开销小。AtomicInteger使用方法如下。

1
2
3
4
5
6
7
8
AtomicInteger atomicInteger = new AtomicInteger();
for(int b = 0; b < numThreads; b++) {
new Thread(() -> {
for(int a = 0; a < iteration; a++) {
atomicInteger.incrementAndGet();
}
}).start();
}

Java如何保证可见性

Java提供了volatile关键字来保证可见性。当使用volatile修饰某个变量时,它会保证对该变量的修改会立即被更新到内存中,并且将其它缓存中对该变量的缓存设置成无效,因此其它线程需要读取该值时必须从主内存中读取,从而得到最新的值。

Java如何保证顺序性

上文讲过编译器和处理器对指令进行重新排序时,会保证重新排序后的执行结果和代码顺序执行的结果一致,所以重新排序过程并不会影响单线程程序的执行,却可能影响多线程程序并发执行的正确性。

Java中可通过volatile在一定程序上保证顺序性,另外还可以通过synchronized和锁来保证顺序性。

synchronized和锁保证顺序性的原理和保证原子性一样,都是通过保证同一时间只会有一个线程执行目标代码段来实现的。

除了从应用层面保证目标代码段执行的顺序性外,JVM还通过被称为happens-before原则隐式的保证顺序性。两个操作的执行顺序只要可以通过happens-before推导出来,则JVM会保证其顺序性,反之JVM对其顺序性不作任何保证,可对其进行任意必要的重新排序以获取高效率。

happens-before原则(先行发生原则)

  • 传递规则:如果操作1在操作2前面,而操作2在操作3前面,则操作1肯定会在操作3前发生。该规则说明了happens-before原则具有传递性
  • 锁定规则:一个unlock操作肯定会在后面对同一个锁的lock操作前发生。这个很好理解,锁只有被释放了才会被再次获取
  • volatile变量规则:对一个被volatile修饰的写操作先发生于后面对该变量的读操作
  • 程序次序规则:一个线程内,按照代码顺序执行
  • 线程启动规则:Thread对象的start()方法先发生于此线程的其它动作
  • 线程终结原则:线程的终止检测后发生于线程中其它的所有操作
  • 线程中断规则: 对线程interrupt()方法的调用先发生于对该中断异常的获取
  • 对象终结规则:一个对象构造先于它的finalize发生

线程池

多核时代,面向多核编程就非常重要了,基于 java 的并发和多线程开发非常重要。与其每次需要时都创建线程相比,线程池可以降低创建线程的开销,线程池在线程执行结束后进行的是回收操作,而不是真正的销毁线程。

线程池的好处:

  1. 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
  2. 提高响应速度,当任务到达时,任务可以不需要等到线程创建就能立即执行;
  3. 提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Created by matt on 16/8/8.
*/
public class ThreadPoolExecutorDemo {
public static void threadPoolTest(int count) {
long startTime = System.currentTimeMillis();
final List<Integer> list = new LinkedList<Integer>();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(count));
final Random random = new Random();
for (int i = 0; i < count; i++) {
threadPoolExecutor.execute(new Runnable() {
public void run() {
list.add(random.nextInt());
}
});
}
threadPoolExecutor.shutdown();
try {
threadPoolExecutor.awaitTermination(1,TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println("ThreadPool demo runs "+count+ " times, the total time of spending is: "+(System.currentTimeMillis()-startTime));
System.out.println(list.size());
}

public static void threadTest(int count) {
long startTime = System.currentTimeMillis();
final List<Integer> list = new LinkedList<Integer>();
final Random random = new Random();
for (int i = 0; i < count; i++) {
Thread thread=new Thread(){
@Override
public void run(){
list.add(random.nextInt());
}
};
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Thread demo runs "+count+ " times, the total time of spending is: "+(System.currentTimeMillis()-startTime));
System.out.println(list.size());
}


public static void main(String[] args) {
int count=10000;
threadPoolTest(count);
threadTest(count);
}
}

输出结果:

1
2
3
4
ThreadPool demo runs 10000 times, the total time of spending is: 66
10000
Thread demo runs 10000 times, the total time of spending is: 1333
10000

从例子中,可以直接地看到,使用线程池能极大地提高程序的运行速度。

两种方式差别在于,使用线程池的方式是复用线程的,而不使用线程池的方式是每次都要创建线程的。不使用线程时消耗时间过多,主要是由于创建线程的开销占整个时间的比例比较大。还有另外两种线程池:

  • newFixedThreadPool创建一个指定工作线程数量的线程池(固定数量的线程 )。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
  • newCachedThreadPool创建一个可缓存的线程池(线程数量根据任务数量动态变化 )。这种类型的线程池特点是:
    • 工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
    • 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
    • 该方法返回的线程池是没有线程上限的,因为没有办法去控制总体的线程数量,而每个线程都是消耗内存的,这可能会导致过多的内存被占用。
  • newSingleThreadExecutor创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另一个取代它,保证顺序执行(我觉得这点是它的特色)。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的 。
  • newScheduleThreadPool创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer。(这种线程池原理暂还没完全了解透彻)

关于线程池内部原理部门可以看一下这两篇文章,未来也会把主要内容总结到博客里面

synchronized

每个Java对象都可以用做一个实现同步的互斥锁,这些锁被称为内置锁。线程进入同步代码块或方法时自动获得内置锁,退出同步代码块或方法时自动释放该内置锁。进入同步代码块或者同步方法是获得内置锁的唯一途径。

实例同步方法

synchronized用于修饰实例方法(非静态方法)时,执行该方法需要获得的是该类实例对象的内置锁(同一个类的不同实例拥有不同的内置锁)。如果多个实例方法都被synchronized修饰,则当多个线程调用同一实例的不同同步方法(或者同一方法)时,需要竞争锁。但当调用的是不同实例的方法时,并不需要竞争锁。

1
2
3
4
5
6
public class SynchronizedDemo1{
public synchronized void foo1(){
}
public synchronized void foo2(){
}
}

foo1()foo2()SynchronizedDemo1 的两个成员方法,在多线程编程中,调用同一个对象的 foo1() 或者 foo2()是互斥的,这是针对同一个对象的多线程方法调用互斥。

静态同步方法

synchronized用于修饰静态方法时,执行该方法需要获得的是该类的class对象的内置锁(一个类只有唯一一个class对象)。调用同一个类的不同静态同步方法时会产生锁竞争。

1
2
3
4
5
6
public class SynchronizedDemo2{
public static synchronized void foo3(){
}
public static synchronized void foo4(){
}
}

foo3()foo4()SynchronizedDemo2 类的两个静态方法。在不同的线程中,这两个方法的调用是互斥的,不仅它们之间,任何两个不同线程之间的调用也是互斥的。

同步代码块

synchronized用于修饰代码块时,进入同步代码块需要获得synchronized关键字后面括号内的对象(可以是实例对象也可以是class对象)的内置锁。

1
2
3
4
5
6
7
8
9
10
public class SynchronizedDemo3{
public void foo5(){
synchronized(this){
}
}
public void foo6(){
synchronized(SynchronizedDemo3.class){
}
}
}

在这个例子中,synchronized(this)SynchronizedDemo3中加synchronized的成员方法是互斥的,而synchronized(SynchronizedDemo3.class)SynchronizedDemo3synchronized的静态方法是互斥的。

synchronized用于修饰代码块会更加灵活,因为其后的参数可以是任意对象。

synchronized使用总结

锁的使用是为了操作临界资源的正确性,而往往一个方法中并非所有的代码都操作临界资源。换句话说,方法中的代码往往并不都需要同步。此时建议不使用同步方法,而使用同步代码块,只对操作临界资源的代码,也即需要同步的代码加锁。这样做的好处是,当一个线程在执行同步代码块时,其它线程仍然可以执行该方法内同步代码块以外的部分,充分发挥多线程并发的优势,从而相较于同步整个方法而言提升性能。

释放Java内置锁的唯一方式是synchronized方法或者代码块执行结束。若某一线程在synchronized方法或代码块内发生死锁,则对应的内置锁无法释放,其它线程也无法获取该内置锁(即进入跟该内置锁相关的synchronized方法或者代码块)。

ReentrantLock

ReentrantLock是java.util.concurrent.locks中的一个类,是从 JDK5开始加入的,与 synchronized 用法类似,不过它需要显式地进行 unlock。Java中的重入锁(即ReentrantLock)与Java内置锁一样,是一种排它锁。使用synchronized的地方一定可以用ReentrantLock代替。

重入锁需要显示请求获取锁,并显示释放锁。为了避免获得锁后,没有释放锁,而造成其它线程无法获得锁而造成死锁,一般建议将释放锁操作放在finally块里,如下所示。

1
2
3
4
5
6
try{
renentrantLock.lock();
// 用户操作
} finally {
renentrantLock.unlock();
}

如果重入锁已经被其它线程持有,则当前线程的lock操作会被阻塞。除了lock()方法之外,重入锁(或者说锁接口)还提供了其它获取锁的方法以实现不同的效果。

  1. lockInterruptibly():该方法尝试获取锁,若获取成功立即返回;若获取不成功则阻塞等待。与lock方法不同的是,在阻塞期间,如果当前线程被打断(interrupt)则该方法抛出InterruptedException。该方法提供了一种解除死锁的途径。
  2. tryLock():该方法试图获取锁,若该锁当前可用,则该方法立即获得锁并立即返回true;若锁当前不可用,则立即返回false。该方法不会阻塞,并提供给用户对于成功获利锁与获取锁失败进行不同操作的可能性。
  3. tryLock(long time, TimeUnit unit):该方法试图获得锁,若该锁当前可用,则立即获得锁并立即返回true。若锁当前不可用,则等待相应的时间(由该方法的两个参数决定):1)若该时间内锁可用,则获得锁,并返回true;2)若等待期间当前线程被打断,则抛出InterruptedException;3)若等待时间结束仍未获得锁,则返回false。

重入锁可定义为公平锁或非公平锁,默认实现为非公平锁。

  1. 公平锁是指多个线程获取锁被阻塞的情况下,锁变为可用时,最新申请锁的线程获得锁。可通过在重入锁(RenentrantLock)的构造方法中传入true构建公平锁,如Lock lock = new RenentrantLock(true)
  2. 非公平锁是指多个线程等待锁的情况下,锁变为可用状态时,哪个线程获得锁是随机的。synchonized相当于非公平锁。可通过在重入锁的构造方法中传入false或者使用无参构造方法构建非公平锁。效率相对高一点。

ReentrantReadWriteLock 读写锁

这个主要用于读多写少并且读不需要互斥的场景,这样场景使用读写锁会比使用全部互斥的锁性能高出很多,ReentrantReadWriteLock通过readLock()writeLock()两个方法获取读锁和写锁。

实际上,ReadWriteLock接口并非继承自Lock接口,ReentrantReadWriteLock也只实现了ReadWriteLock接口而未实现Lock接口。ReadLock()WriteLock(),是ReentrantReadWriteLock类的静态内部类,它们实现了Lock接口。

一个ReentrantReadWriteLock实例包含一个ReentrantReadWriteLock.ReadLock实例和一个ReentrantReadWriteLock.WriteLock实例。通过readLock()writeLock()方法可分别获得读锁实例和写锁实例,并通过Lock接口提供的获取锁方法获得对应的锁。

读写锁的锁定规则如下:

  • 获得读锁后,其它线程可获得读锁而不能获取写锁
  • 获得写锁后,其它线程既不能获得读锁也不能获得写锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.test.thread;

import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDemo {

public static void main(String[] args) {
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

new Thread(() -> {
readWriteLock.readLock().lock();
try {
System.out.println(new Date() + "\tThread 1 started with read lock");
try {
Thread.sleep(2000);
} catch (Exception ex) {
}
System.out.println(new Date() + "\tThread 1 ended");
} finally {
readWriteLock.readLock().unlock();
}
}).start();

new Thread(() -> {
readWriteLock.readLock().lock();
try {
System.out.println(new Date() + "\tThread 2 started with read lock");
try {
Thread.sleep(2000);
} catch (Exception ex) {
}
System.out.println(new Date() + "\tThread 2 ended");
} finally {
readWriteLock.readLock().unlock();
}
}).start();

new Thread(() -> {
Lock lock = readWriteLock.writeLock();
lock.lock();
try {
System.out.println(new Date() + "\tThread 3 started with write lock");
try {
Thread.sleep(2000);
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println(new Date() + "\tThread 3 ended");
} finally {
lock.unlock();
}
}).start();
}
}

输出:

1
2
3
4
5
6
Sat Jun 18 21:33:46 CST 2016  Thread 1 started with read lock
Sat Jun 18 21:33:46 CST 2016 Thread 2 started with read lock
Sat Jun 18 21:33:48 CST 2016 Thread 2 ended
Sat Jun 18 21:33:48 CST 2016 Thread 1 ended
Sat Jun 18 21:33:48 CST 2016 Thread 3 started with write lock
Sat Jun 18 21:33:50 CST 2016 Thread 3 ended

从上面的执行结果可见,thread 1和thread 2都只需获得读锁,因此它们可以并行执行。而thread 3因为需要获取写锁,必须等到thread 1和thread 2释放锁后才能获得锁。

volatitle

synchronized保证了一个线程中变量的可见性,而volatile则是保证了所修饰变量的可见性(可见性可以参考前面所述)。volatile是轻量级的实现变量可见性的方法,其具体使用也很简单。

对于同一个变量线程间的可见性与多个线程中操作互斥是两件事情,操作互斥是提供了操作整体的原子性,下面通过一个例子来看。

对于读操作来说,示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int i1;
public int getI1(){
return i1;
}

volatile int i2;
public int getI2(){
return i2;
}

int i3;
public synchronized int getI3(){
return i3;
}

分析一下这三种情况:

  1. getI1():该方法调用获取的是当前线程中的副本,这个值不一定是最新的值;
  2. getI2():因为 i2 是被volatile修饰,因此对于 JVM 来说,这个变量不会又线程的本地副本,只会放在主存中,所以得到的值一定是最新的;
  3. getI3():因为有synchronized关键字修饰,保证了线程的本地副本与主存的同步,所以也会得到最新的值。

再对比一下它们的写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int i1;
public void setI1(int i) {
i1 = i;
}

volatile int i2;
public void setI2(int i) {
i2 = i;
}

int i3;
public synchronized void setI3(int i) {
i3 = 1;
}

分析一下这三种情况:

  1. setI1():当前线程调用之后会得到最新的 i1 值,而另外的线程获取不一定可以立刻看到最新而值;
  2. setI2():可以立刻在其他线程看到新的值,因为volatile保证了只有一份主存中的数据;
  3. setI3():调用后必须在synchronized修饰的方法或代码中读取 i3 的值才可以看到最新值,因为synchronized不仅会把当前线程修改的本地副本同步给主存,还会从主存读取数据更新本地副本。

volatile适用场景

因为volatile只是保证了同一个变量在多线程中的可见性,所以它更多是用于修饰作为开关状态的变量。

volatile适用于不需要保证原子性,但却需要保证可见性的场景。一种典型的使用场景是用它修饰用于停止线程的状态标记。如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
boolean isRunning = true;

public void start () {
new Thread( () -> {
while(isRunning) {
someOperation();
}
}).start();
}

public void stop () {
isRunning = false;
}

在这种实现方式下,即使其它线程通过调用stop()方法将isRunning设置为false,循环也不一定会立即结束。可以通过volatile关键字,保证while循环及时得到isRunning最新的状态从而及时停止循环,结束线程。

Atomics

在 JDK5 中增加了java.util.concurrent.atomic包,这个包是一些以Atomic开头的类,这些类主要提供一些相关的原子操作。

AtomicInteger为例来看一个多线程计数器的场景,场景很简单,就是让多个线程都对计数器进行加1操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Counter1 {
private int counter = 0;

public int increase() {
synchronized (this) {
counter = counter + 1;
return counter;
}
}

public int decrease() {
synchronized (this) {
counter = counter - 1;
return counter;
}
}
}

在采用了AtomicInteger之后,代码就会变成下面这个样子:

1
2
3
4
5
6
7
8
9
10
11
public class Counter2 {
private AtomicInteger counter = new AtomicInteger();

public int increase() {
return counter.incrementAndGet();
}

public int decrease() {
return counter.decrementAndGet();
}
}

采用AtomicInteger之后代码变得简洁了,更重要的是性能得到了提升,而且还比较明显的提升,原因是AtomicInteger内部通过 JNI 的方式使用了硬件支持的 CAS 指令。

wait、notify 和 notifyAll

wait、notify 和 notifyAll 是 java Object 对象上的三个方法,也就是所有的Java类都可以调用这三个方法。

在多线程情况下,可以把某个对象作为事件对象,通过这个对象的 wait、notify 和 notifyAll方法来完成线程间的状态通知,三个方法的作用如下:

  • wait:是当前线程进行等待;
  • notify:是唤醒同一个对象 wait 方法的线程,但是只是唤醒一个等待线程;
  • notifyAll:是唤醒同一个对象 wait 方法的线程,唤醒所有的等待线程。

注意:

wait方法需要释放锁,前提条件是它已经持有锁。所以wait和notify(或者notifyAll)方法都必须被包裹在synchronized语句块中,并且synchronized后锁的对象应该与调用wait方法的对象一样。否则抛出IllegalMonitorStateException.

wait 与 sleep 的区别

  • wait:它是在当前线程持有 wait 对象锁的情况下,暂时放弃锁,并让出 CPU 资源,并积极等待其它线程调用同一对象的 notify 或者 notifyAll 方法。换言之,即使notify被调用,但只要锁没有被释放,原等待线程因为未获得锁仍然无法继续执行。
  • sleep:它告诉操作系统至少指定时间内不需为线程调度器为该线程分配执行时间片,并不释放锁(如果当前已经持有锁)。

线程间通信

CountDownLatch

CountDownLatchjava.util.concurrent包中的一个类,CountDownLatch主要提供的机制是当多个线程达到了预期状态或完成预期工作时触发事件,其他线程可以等待这个事件来触发自己后续的工作。需要注意的是,等待线程可以是多个,即 CountDownLatch 是可以唤醒多个等待的线程的。达到自己预期状态的线程会调用CountDownLatchcountDown方法,而等待线程会调用CountDownLatchwait方法。

如果CountDownLatch初始化的 count 值为1,那么这就变成了单一事件了,即由一个线程来通知其他线程,效果等同于对象的waitnotifyAll。count 值大于1是常用的方式,目的是让多个线程达到各自的预期状态,变为一个事件进行通知,线程则继续自己的行为。

CountDownLatch适用场景

Java多线程编程中经常会碰到这样一种场景——某个线程需要等待一个或多个线程操作结束(或达到某种状态)才开始执行。比如开发一个并发测试工具时,主线程需要等到所有测试线程均执行完成再开始统计总共耗费的时间,此时可以通过CountDownLatch轻松实现。

CountDownLatch实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package countdownlatch;

import java.util.Date;
import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int totalThread = 3;
long start = System.currentTimeMillis();
final CountDownLatch countDown = new CountDownLatch(totalThread);
for (int i = 0; i < totalThread; i++) {
final String threadName = "Thread " + i;
Thread thread=new Thread() {
public void run() {
System.out.println(String.format("%s\t%s %s", new Date(), threadName, "started"));
try {
Thread.sleep(1000);
} catch (Exception ex) {
ex.printStackTrace();
}
countDown.countDown();
System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
}
};
thread.start();
}
countDown.await();
long stop = System.currentTimeMillis();
System.out.println(String.format("Total time : %sms", (stop - start)));
}
}

执行结果

1
2
3
4
5
6
7
Tue Aug 09 14:44:19 CST 2016	Thread 0 started
Tue Aug 09 14:44:19 CST 2016 Thread 2 started
Tue Aug 09 14:44:19 CST 2016 Thread 1 started
Tue Aug 09 14:44:20 CST 2016 Thread 2 ended
Tue Aug 09 14:44:20 CST 2016 Thread 1 ended
Tue Aug 09 14:44:20 CST 2016 Thread 0 ended
Total time : 1029ms

可以看到,主线程等待所有3个线程都执行结束后才开始执行。

CountDownLatch主要接口分析

CountDownLatch工作原理相对简单,可以简单看成一个倒计时器,在构造方法中指定初始值,每次调用countDown()方法时讲计数器减1,而await()会等待计数器变为0。CountDownLatch关键接口如下

  • countDown() 如果当前计数器的值大于1,则将其减1;若当前值为1,则将其置为0并唤醒所有通过await等待的线程;若当前值为0,则什么也不做直接返回。
  • await() 等待计数器的值为0,若计数器的值为0则该方法返回;若等待期间该线程被中断,则抛出InterruptedException并清除该线程的中断状态。
  • await(long timeout, TimeUnit unit) 在指定的时间内等待计数器的值为0,若在指定时间内计数器的值变为0,则该方法返回true;若指定时间内计数器的值仍未变为0,则返回false;若指定时间内计数器的值变为0之前当前线程被中断,则抛出InterruptedException并清除该线程的中断状态。
  • getCount() 读取当前计数器的值,一般用于调试或者测试。

CyclicBarrier

CyclicBarrier适用场景

CyclicBarrier,从字面理解是指循环屏障,CyclicBarrier可以在构造时指定需要在屏障前执行await的个数,所有对await的调用都会等待,只到调用await的次数达到预定指,所有等待都会立即被唤醒。

从使用场景上来说,CyclicBarrier是让多个线程互相等待某一事件的发生,然后同时被唤醒。而上文讲的CountDownLatch是让某一线程等待多个线程的状态,然后该线程被唤醒。

CyclicBarrier实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package cyclicbarrier;

import java.util.Date;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {

public static void main(String[] args) {
int totalThread = 5;
final CyclicBarrier barrier = new CyclicBarrier(totalThread);

for (int i = 0; i < totalThread; i++) {
final String threadName = "Thread " + i;
Thread thread=new Thread(){
@Override
public void run() {
System.out.println(String.format("%s\t%s %s", new Date(), threadName, " is waiting"));
try {
barrier.await();
} catch (Exception ex) {
ex.printStackTrace();
}
System.out.println(String.format("%s\t%s %s", new Date(), threadName, "ended"));
}
};
thread.start();
}
}
}

执行结果如下

1
2
3
4
5
6
7
8
9
10
Tue Aug 09 18:54:39 CST 2016	Thread 4  is waiting
Tue Aug 09 18:54:39 CST 2016 Thread 0 is waiting
Tue Aug 09 18:54:39 CST 2016 Thread 3 is waiting
Tue Aug 09 18:54:39 CST 2016 Thread 2 is waiting
Tue Aug 09 18:54:39 CST 2016 Thread 1 is waiting
Tue Aug 09 18:54:39 CST 2016 Thread 4 ended
Tue Aug 09 18:54:39 CST 2016 Thread 1 ended
Tue Aug 09 18:54:39 CST 2016 Thread 2 ended
Tue Aug 09 18:54:39 CST 2016 Thread 0 ended
Tue Aug 09 18:54:39 CST 2016 Thread 3 ended

从执行结果可以看到,每个线程都不会在其它所有线程执行await()方法前继续执行,而等所有线程都执行await()方法后所有线程的等待都被唤醒从而继续执行。

CyclicBarrier主要接口分析

CyclicBarrier提供的关键方法如下

  • await():等待其它参与方的到来(调用await())。如果当前调用是最后一个调用,则唤醒所有其它的线程的等待并且如果在构造CyclicBarrier时指定了action,当前线程会去执行该action,然后该方法返回该线程调用await的次序(getParties()-1说明该线程是第一个调用await的,0说明该线程是最后一个执行await的),接着该线程继续执行await后的代码;如果该调用不是最后一个调用,则阻塞等待;如果等待过程中,当前线程被中断,则抛出InterruptedException;如果等待过程中,其它等待的线程被中断,或者其它线程等待超时,或者该barrier被reset,或者当前线程在执行barrier构造时注册的action时因为抛出异常而失败,则抛出BrokenBarrierException。
  • await(long timeout, TimeUnit unit):与await()唯一的不同点在于设置了等待超时时间,等待超时时会抛出TimeoutException。
  • reset():该方法会将该barrier重置为它的初始状态,并使得所有对该barrier的await调用抛出BrokenBarrierException。

CountDownLatch 与 CyclicBarrier

CountDownLatchCyclicBarrier 都是用于多个线程间的协调,二者的一个差别是:

  1. CountDownLatch:它是在多个线程都进行了latch.countDown后才会触发事件,唤醒await在 latch 上的线程,而执行countDown的线程,执行完countDown后继续进行自己的工作,也就是说,countDown的线程会继续执行,而唤醒的是await的线程;
  2. CyclicBarrier:它是一个栅栏,用于同步所有调用await方法的线程,并且等待所有线程都到了await方法时,这些线程才一起返回继续各自的工作,因为使用CyclicBarrier的线程都会阻塞在await方法上,所以在线程池中使用CyclicBarrier时要特别小心,如果线程池的线程数过少,那么很容易发生死锁。

Phaser

Phaser适用场景

CountDownLatchCyclicBarrier都是JDK 1.5引入的,而Phaser是JDK 1.7引入的。Phaser的功能与CountDownLatchCyclicBarrier有部分重叠,同时也提供了更丰富的语义和更灵活的用法。

Phaser顾名思义,与阶段相关。Phaser比较适合这样一种场景,一种任务可以分为多个阶段,现希望多个线程去处理该批任务,对于每个阶段,多个线程可以并发进行,但是希望保证只有前面一个阶段的任务完成之后才能开始后面的任务。这种场景可以使用多个CyclicBarrier来实现,每个CyclicBarrier负责等待一个阶段的任务全部完成。但是使用CyclicBarrier的缺点在于,需要明确知道总共有多少个阶段,同时并行的任务数需要提前预定义好,且无法动态修改。而Phaser可同时解决这两个问题。

Phaser实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package phaser;

import java.io.IOException;
import java.util.concurrent.Phaser;

public class PhaserDemo {

public static void main(String[] args) throws IOException {
int parties = 3;
final int phases = 4;
final Phaser phaser = new Phaser(parties) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("====== Phase : " + phase + " ======");
return registeredParties == 0;
}
};

for (int i = 0; i < parties; i++) {
final int threadId = i;
final Thread thread = new Thread() {
public void run() {
for (int phase = 0; phase < phases; phase++) {
System.out.println(String.format("Thread %s, phase %s", threadId, phase));
phaser.arriveAndAwaitAdvance();
}
}

};
thread.start();
}
}
}

执行结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Thread 1, phase 0
Thread 2, phase 0
Thread 0, phase 0
====== Phase : 0 ======
Thread 0, phase 1
Thread 2, phase 1
Thread 1, phase 1
====== Phase : 1 ======
Thread 1, phase 2
Thread 2, phase 2
Thread 0, phase 2
====== Phase : 2 ======
Thread 1, phase 3
Thread 2, phase 3
Thread 0, phase 3
====== Phase : 3 ======

从上面的结果可以看到,多个线程必须等到其它线程的同一阶段的任务全部完成才能进行到下一个阶段,并且每当完成某一阶段任务时,Phaser都会执行其onAdvance方法。

Phaser主要接口分析

Phaser主要接口如下

  • arriveAndAwaitAdvance():当前线程当前阶段执行完毕,等待其它线程完成当前阶段。如果当前线程是该阶段最后一个未到达的,则该方法直接返回下一个阶段的序号(阶段序号从0开始),同时其它线程的该方法也返回下一个阶段的序号。
  • arriveAndDeregister():该方法立即返回下一阶段的序号,并且其它线程需要等待的个数减一,并且把当前线程从之后需要等待的成员中移除。如果该Phaser是另外一个Phaser的子Phaser(层次化Phaser会在后文中讲到),并且该操作导致当前Phaser的成员数为0,则该操作也会将当前Phaser从其父Phaser中移除。
  • arrive():该方法不作任何等待,直接返回下一阶段的序号。
  • awaitAdvance(int phase):该方法等待某一阶段执行完毕。如果当前阶段不等于指定的阶段或者该Phaser已经被终止,则立即返回。该阶段数一般由arrive()方法或者arriveAndDeregister()方法返回。返回下一阶段的序号,或者返回参数指定的值(如果该参数为负数),或者直接返回当前阶段序号(如果当前Phaser已经被终止)。
  • awaitAdvanceInterruptibly(int phase):效果与awaitAdvance(int phase)相当,唯一的不同在于若该线程在该方法等待时被中断,则该方法抛出InterruptedException。
  • awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit):效果与awaitAdvanceInterruptibly(int phase)相当,区别在于如果超时则抛出TimeoutException。
  • bulkRegister(int parties):注册多个party。如果当前phaser已经被终止,则该方法无效,并返回负数。如果调用该方法时,onAdvance方法正在执行,则该方法等待其执行完毕。如果该Phaser有父Phaser则指定的party数大于0,且之前该Phaser的party数为0,那么该Phaser会被注册到其父Phaser中。
  • forceTermination():强制让该Phaser进入终止状态。已经注册的party数不受影响。如果该Phaser有子Phaser,则其所有的子Phaser均进入终止状态。如果该Phaser已经处于终止状态,该方法调用不造成任何影响。

信号量 Semaphore

信号量维护一个许可集,构造时需要传入参数,总数就是控制并发的数量,在执行可通过acquire()获取许可(如果acquire 成功返回,Semaphore 可用的信号量就会减少一个,若无可用许可acquire 就会阻塞,等待有 release 释放信号后,acquire 才会得到信号并返回),通过release()释放许可,从而可能唤醒一个阻塞等待许可的线程。

与互斥锁类似,信号量限制了同一时间访问临界资源的线程的个数,并且信号量也分公平信号量非公平信号量。而不同的是,互斥锁保证同一时间只会有一个线程访问临界资源,而信号量可以允许同一时间多个线程访问特定资源。所以信号量并不能保证原子性。

信号量的一个典型使用场景是限制系统访问量。每个请求进来后,处理之前都通过acquire获取许可,若获取许可成功则处理该请求,若获取失败则等待处理或者直接不处理该请求。

信号量的使用方法

  • acquire(int permits):申请permits(必须为非负数)个许可,若获取成功,则该方法返回并且当前可用许可数减permits;若当前可用许可数少于permits指定的个数,则继续等待可用许可数大于等于permits;若等待过程中当前线程被中断,则抛出InterruptedException;
  • acquire():等价于acquire(1);
  • acquireUninterruptibly(int permits):申请permits(必须为非负数)个许可,若获取成功,则该方法返回并且当前可用许可数减permits;若当前许可数少于permits,则继续等待可用许可数大于等于permits;若等待过程中当前线程被中断,继续等待可用许可数大于等于permits,并且获取成功后设置线程中断状态;
  • acquireUninterruptibly():等价于acquireUninterruptibly(1);
  • drainPermits():获取所有可用许可,并返回获取到的许可个数,该方法不阻塞;
  • tryAcquire(int permits):尝试获取permits个可用许可,如果当前许可个数大于等于permits,则返回true并且可用许可数减permits;否则返回false并且可用许可数不变;
  • tryAcquire():等价于tryAcquire(1);
  • tryAcquire(int permits, long timeout, TimeUnit unit):尝试获取permits(必须为非负数)个许可,若在指定时间内获取成功则返回true并且可用许可数减permits;若指定时间内当前线程被中断,则抛出InterruptedException;若指定时间内可用许可数均小于permits,则返回false;
  • tryAcquire(long timeout, TimeUnit unit):等价于tryAcquire(1, long timeout, TimeUnit unit);
  • release(int permits):释放permits个许可,该方法不阻塞并且某线程调用release方法前并不需要先调用acquire方法;
  • release():等价于release(1)。

注意:与wait/notifyawait/signal不同,acquire/release完全与锁无关,因此acquire等待过程中,可用许可满足要求时acquire可立即返回,而不用像锁的wait和条件变量的await那样重新获取锁才能返回。或者可以理解成,只要可用许可满足需求,就已经获得了锁。

如果Semaphore管理的信号量只有1个,那么就是互斥锁了;如果多于1个信号量,则主要用于控制并发数。

Exchanger

Exchanger从名字上来看,就是交换的意思,Exchanger用于在两个线程之间进行数据交换,线程会阻塞在Exchanger的exchange方法上,直到另外一个线程也到了同一个Exchanger的exchange方法时,二者进行交换,然后两个线程会继续执行自身相关的代码。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package exchanger;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

public class ExchangerDemo {
public static void main(String[] args) {
final Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
new Thread() {
@Override
public void run() {
List<Integer> list = new ArrayList<Integer>(2);
list.add(1);
list.add(2);
try {
list = exchanger.exchange(list);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread1" + list);
}
}.start();
new Thread(){
@Override
public void run(){
List<Integer> list=new ArrayList<Integer>(2);
list.add(4);
list.add(5);
try {
list=exchanger.exchange(list);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread2" + list);
}
}.start();
}
}

运行结果

1
2
Thread2[1, 2]
Thread1[4, 5]

Future 和 Future Task

Future是一个接口,Future Task是一个具体实现类。

在实际开发的环境中,我们经常会遇到这样一种场景中,在一个函数中我们调用了一个函数,正常情况下,程序会在理阻塞,知道调用函数返回结果,而很多情况下返回的结果我们并不会马上使用,这样的话就浪费很多时间。我们期待的情况是:调用函数后马上返回,然后继续向下执行,等需要用数据时再来用,或者说再来等待这个数据,具体的实现方式有两种方式,一个是用Future,一个是用回调函数。

1
2
3
Future<HashMap> future = getDataFromRemote2();
// do something
HashMap data = (HashMap) future.get();

可以看到,我们调用的方式返回的是一个 Future 对象,然后接着进行自己的处理,后面通过future.get()来获得真正的返回值。也就说,在调用了getDataFromRemote2后,就已经启动了对远程计算结果的获取,同时自己的线程还在继续处理,直到需要时再获取数据。我们先看一下getDataFromRemote2的实现:

1
2
3
4
5
6
7
private Future<HashMap> getDataFromRemote2(){
return threadPool.submit(new Callback<HashMap>(){
public HashMap call() throws Exception{
returngetDataFromRemote();
}
});
}

getDataFromRemote()方法是从远程获取一些计算结果

1
HashMap getDataFromRemote();

getDataFromRemote2中使用了getDataFromRemote来完成具体操作,并且使用到了线程池,把任务添加到线程池中,把 Future 对象返回出去。我们调用了getDataFromRemote2的线程,然后回来继续下面的执行,而背后是另外的线程在进行远程调用及等待的工作。

回调函数

参考回调函数(callback)是什么?。调用回调函数的函数这里称作中间函数,而调用中间函数的函数我们成为起始函数。回调函数是作为函数的参数传入到中间函数中,中间函数在运行时,在需要调用这个函数的地方就调用回调函数,并将结果返回给中间函数,中间函数再把处理后的结果返回给起始函数。

回调实际上有两种:阻塞式回调和延迟式回调。

  • 阻塞式回调里,回调函数的调用一定发生在起始函数返回之前;
  • 延迟式回调里,回调函数的调用有可能是在起始函数返回之后。

一般使用的回调函数都是阻塞式回调,而延迟式回调通常牵扯到多线程。


参考: