并发编程: 共享模型

张天宇 on 2020-06-24

Java并发编程第三篇,共享模型剩余部分内容。

1. Java内存模型

Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着CPU寄存器、缓存、硬件内存、CPU指令优化等。

  • 原子性 - 保证指令不会受到线程上下文切换的影响
  • 可见性 - 保证指令不会受CPU缓存的影响
  • 有序性 - 保证指令不会受CPU指令并行优化的影响

2. 可见性

问题

1
2
3
4
5
6
7
8
9
10
11
static boolean run = true;
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()->{
while(run){
// ....
}
});
t.start();
sleep(1);
run = false; // 线程t不会如预想的停下来
}
  • 初始状态,t线程刚开始从主内存读取了run的值到工作内存
  • 因为t线程要频繁从主内存中读取run的值,JIT编译器会将run的值缓存到自己的工作内存的高速缓存中,减少对主存中run的访问,提高效率
  • 1秒后,main线程修改了run的值,并同步至内存,而t是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值

解决

1
volatile static boolean run = true;	// 必须去主存得值

volatile易变关键字,它可以用来修饰成员变量和静态成员变量,可以避免线程从自己得工作缓存中查找变量的值,必须到主存中获取它的值,线程操作volatile变量都是直接操作主存。

可见性VS原子性

synchronized语句块既可以保证代码块的原子性,也同时保证代码内变量的可见性,但缺点是属于重量级操作,性能相对较低。

终止模式之两阶段终止模式

这次利用volatile实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 停止标记用 volatile 是为了保证该变量在多个线程之间的可见性
// 我们的例子中,即主线程把它修改为 true 对 t1 线程可见
class TPTVolatile {
private Thread thread;
private volatile boolean stop = false;
public void start(){
thread = new Thread(() -> {
while(true) {
Thread current = Thread.currentThread();
if(stop) {
log.debug("料理后事");
break;
}
try {
Thread.sleep(1000);
log.debug("将结果保存");
} catch (InterruptedException e) {
}
// 执行监控操作
}
},"监控线程");
thread.start();
}
public void stop() {
stop = true;
thread.interrupt(); // 不让它停止一秒,直接出去
}
}
// 测试
TPTVolatile t = new TPTVolatile();
t.start();
Thread.sleep(3500);
log.debug("stop");
t.stop();
/**
11:54:52.003 c.TPTVolatile [监控线程] - 将结果保存
11:54:53.006 c.TPTVolatile [监控线程] - 将结果保存
11:54:54.007 c.TPTVolatile [监控线程] - 将结果保存
11:54:54.502 c.TestTwoPhaseTermination [main] - stop
11:54:54.502 c.TPTVolatile [监控线程] - 料理后事
**/

同步模式之Balking模式

Balking犹豫模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class MonitorService {
// 用来表示是否已经有线程已经在执行启动了
private volatile boolean starting;
public void start() {
log.info("尝试启动监控线程...");
synchronized (this) {
if (starting) {
return;
}
starting = true;
}
// 真正启动监控线程...
}
}

它还经常用来实现线程安全的单例。

1
2
3
4
5
6
7
8
9
10
11
12
public final class Singleton {
private Singleton() {
}
private static Singleton INSTANCE = null;
public static synchronized Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}

保护性暂停模式是用在一个线程等待另一个线程的执行结果,当条件不满足时,线程等待。

3. 有序性

多线程下指令重排会影响正确性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 该程序因为线程时间片的原因出现多种结果
int num = 0;
boolean ready = false;
// 线程1 执行此方法
public void actor1(I_Result r) {
if(ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
// 线程2 执行此方法
public void actor2(I_Result r) {
num = 2;
ready = true;
}

禁用指令重排。

1
volatile boolean ready = false;	// 加在ready即可,写屏障可以防止之前的代码重排序

volatile原理

底层实现是内存屏障的技术,Memory Barrier(Memory Fence)。对volatile变量的写指令后会加入写屏障,对volatile变量的读指令前会加入读屏障。

保证可见性
  • 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中。

    1
    2
    3
    4
    5
    public void actor(I_Result r){
    num = 2;
    ready = true; // ready是volatile赋值带写屏障
    // 写屏障
    }
  • 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新的数据。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public void actor(I_Result r){
    //读屏障
    // ready是volatile读取值带读屏障
    if (ready){
    r.r1 = num + num;
    } else {
    r.r1 = 1;
    }
    }
保证有序性
  • 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后。

    1
    2
    3
    4
    5
    public void actor(I_Result r){
    num = 2;
    ready = true; // ready是volatile赋值带写屏障
    // 写屏障
    }
  • 读屏障会确保指令重排序时候,不会将读屏障之后的代码排在读屏障之前

  • 但是不能解决指令交错

    • 写屏障只是保证之后的读能读到最新的结果,但不能保证读跑到他前面去
    • 有序性的保证也只是保证了本线程的相关代码不被重排序
double-checked locking问题

以double-cheaked locking为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final class Singleton{
private Singleton{}
private static Singleton INSTANCE = null;
public static Singleton getInstance(){
if (INSTANCE == null){
synchronized(Singleton.class){
if (INSTANCE == null){
INSTANCE = new Singleton();
}
}
}
return INSTANCE;
}
}

以上实现的特点是:

  • 懒惰实例化
  • 首次使用getInstance()才会使用synchronized加锁,后续使用时候无需加锁
  • 隐含的一点是,第一个if使用了INSTANCE变量,是在同步块之外的

问题出现:

INSTANCE = new Singleton()不是一个原子操作,他可能会出现,先赋值再调用构造方法的可能。因此如果要在构造方法中执行很多初始化操作,那么t2拿到的将是一个未初始化完毕的单例。

解决:

1
private static volatile Singleton INSTANCE = null;
happens-before规则

可参见虚拟机部分博客

习题

balking模式

以下代码想让doInit()仅被调用一次,是否有问题?

1
2
3
4
5
6
7
8
9
10
11
12
public class TestVolatile {
volatile boolean initialized = false;
void init() {
if (initialized) {
return;
}
doInit();
initialized = true;
}
private void doInit() {
}
}

问题:

涉及对共享变量的读写,init()不是原子操作,容易导致:线程1进入doInit()操作时,由于整个函数并不互斥,这时线程2进行判断发现未初始化,接下来也会进入doInit()操作。

volatile主要适用于,一个线程写,其他线程读的情况;double-checked locking时,保证synchronized代码块外的指令重排序问题。

解决:

1
synchronized void init() {

线程安全单例

单例模式有很多实现方法,懒汉、饿汉、静态内部类、枚举类

  • 饿汉式:类加载就会导致该单实例对象被创建
  • 懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建
实现1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 问题1:为什么加 final
// 担心将来它有子类,而子类中一些方法可能会破坏单例实现
// 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例
// 增加public Object readResolve()方法,在反序列化创建对象时候,发现readResolve后就会使用函数返回的对象
public final class Singleton implements Serializable {
// 问题3:为什么设置为私有? 是否能防止反射创建新的实例?
// 防止别的类继续创建对象,不能防止反射创建新的实例
private Singleton() {}
// 问题4:这样初始化是否能保证单例对象创建时的线程安全?
// 静态变量赋值在类加载阶段完成,由JVM保证线程安全
private static final Singleton INSTANCE = new Singleton();
// 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由
// 提供更好的封装性,比如提供懒惰的初始化,对单例对象有更多的控制,还可以提供泛型的支持
public static Singleton getInstance() {
return INSTANCE;
}
public Object readResolve() {
return INSTANCE;
}
}
实现2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 问题1:枚举单例是如何限制实例个数的
// 实例相当于枚举类的静态成员变量
// 问题2:枚举单例在创建时是否有并发问题
// 没有,静态成员变量线程安全性是在类加载阶段完成的
// 问题3:枚举单例能否被反射破坏单例
// 不能
// 问题4:枚举单例能否被反序列化破坏单例
// Enum实现了序列化接口,但可以避免反序列破坏单例
// 问题5:枚举单例属于懒汉式还是饿汉式
// 饿汉式
// 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做
// 加一个构造方法即可
enum Singleton {
INSTANCE;
}
实现3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 懒汉式,犹豫模式
public final class Singleton {
private Singleton() { }
private static Singleton INSTANCE = null;
// 分析这里的线程安全, 并说明有什么缺点
// 保证了多线程下的线程安全,但是锁的范围有点大,每次调用都会加锁,导致性能比较低
public static synchronized Singleton getInstance() {
if( INSTANCE != null ){
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
实现4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// dcl,对实现3的改进
public final class Singleton {
private Singleton() { }
// 问题1:解释为什么要加 volatile?
// 因为synchronized中的指令还是会被重排序的(如果变量完全被synchronized管理可能就不会),防止拿到不完整的对象
private static volatile Singleton INSTANCE = null;
// 问题2:对比实现3, 说出这样做的意义
// 后续调用时候不涉及锁,性能比较优越一些
public static Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (Singleton.class) {
// 问题3:为什么还要在这里加为空判断, 之前不是判断过了吗
// 为了防止首次创建的时候的多个线程并发问题,比如t1执行到下面的构造方法时,t2来到上面的if会判断为空,等待t1释放后再进入,此时t1完成的初始化赋值,因此结果不准了需要再次判断
if (INSTANCE != null) {
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
}
实现5
1
2
3
4
5
6
7
8
9
10
11
12
13
public final class Singleton {
private Singleton() { }
// 问题1:属于懒汉式还是饿汉式
// 因为类加载是懒惰的,所以属于懒汉式
private static class LazyHolder {
static final Singleton INSTANCE = new Singleton();
}
// 问题2:在创建时是否有并发问题
// JVM管理,不存在,是一个比较推荐的单例实现模式
public static Singleton getInstance() {
return LazyHolder.INSTANCE;
}
}

4. 共享模型之无锁

问题

账户转账类,会在多线程执行时,指令交错从而导致结果不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
interface Account {
// 获取余额
Integer getBalance();
// 取款
void withdraw(Integer amount);
/**
* 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
* 如果初始余额为 10000 那么正确的结果应当是 0
*/
static void demo(Account account) {
List<Thread> ts = new ArrayList<>();
long start = System.nanoTime();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(10);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(account.getBalance()
+ " cost: " + (end-start)/1000_000 + " ms");
}
}
class AccountUnsafe implements Account {
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
@Override
public Integer getBalance() {
return balance;
}
@Override
public void withdraw(Integer amount) {
balance -= amount;
}
}
// 测试代码
public static void main(String[] args) {
Account.demo(new AccountUnsafe(10000));
}

解决思路

  • 使用锁解决,给Account加锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    class AccountUnsafe implements Account {
    private Integer balance;
    public AccountUnsafe(Integer balance) {
    this.balance = balance;
    }
    @Override
    public synchronized Integer getBalance() {
    return balance;
    }
    @Override
    public synchronized void withdraw(Integer amount) {
    balance -= amount;
    }
    }
  • 无锁CAS解决

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    class AccountSafe implements Account {
    private AtomicInteger balance; // 原子类
    public AccountSafe(Integer balance) {
    this.balance = new AtomicInteger(balance);
    }
    @Override
    public Integer getBalance() {
    return balance.get();
    }
    @Override
    public void withdraw(Integer amount) {
    while (true) {
    int prev = balance.get();
    int next = prev - amount;
    if (balance.compareAndSet(prev, next)) { // 等待prev和当前值一致
    break;
    }
    }
    }
    }
    // 执行测试方法
    public static void main(String[] args) {
    Account.demo(new AccountSafe(10000));
    }

CAS与Volatile

Volatile

获取共享变量时,为了保证该变量的可见性,需要Volatile修饰。

它可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作Volatile变量都是直接操作主存,即一个线程对Volatile变量的修改,对另一个线程可见。

Volatile仅仅保证了共享变量的可见性,让其他线程能够看到最新值,但不能解决指令交错问题,即不能保证原子性。

CAS必须借助Volatile才能读取到共享变量的最新值来实现比较并交换的结果。

为什么CAS比synchronized效率高
  • 在无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而synchronized会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。
  • 但是在无锁情况下,因为线程要保持运行,需要额外的CPU的支持,没有额外的CPU,线程想要高速运行也无从谈起,虽然不会阻塞,但由于没有时间片,仍然会进入可运行状态,还是会导致上下文切换。
CAS的特点

结合CAS和Volatile可以实现无锁并发,适用于线程数少、多核CPU的场景下:

  • CAS是基于乐观锁的思想,最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,重试即可。
  • synchronized是基于悲观锁的思想,最悲观的估计,防着其他线程前来修改共享变量,上锁之后谁都不想改,改完再解锁。
  • CAS体现的是无锁并发、无阻塞并发
    • 因为没有使用synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
    • 如果竞争激励,重试必然频繁发生,反而效率会受到影响

原子整数

J.U.C并发包提供了:

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

以AtomicInteger为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

原子引用

  • AtomicReference
  • AtomicMarkableReference
  • AtomicStampedReferenced
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public interface DecimalAccount {
// 获取余额
BigDecimal getBalance();
// 取款
void withdraw(BigDecimal amount);
/**
* 方法内会启动 1000 个线程,每个线程做 -10 元 的操作
* 如果初始余额为 10000 那么正确的结果应当是 0
*/
static void demo(DecimalAccount account) {
List<Thread> ts = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
ts.add(new Thread(() -> {
account.withdraw(BigDecimal.TEN);
}));
}
ts.forEach(Thread::start);
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(account.getBalance());
}
}

锁实现安全取款操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class DecimalAccountSafeLock implements DecimalAccount {
private final Object lock = new Object();
BigDecimal balance;
public DecimalAccountSafeLock(BigDecimal balance) {
this.balance = balance;
}
@Override
public BigDecimal getBalance() {
return balance;
}
@Override
public void withdraw(BigDecimal amount) {
synchronized (lock) {
BigDecimal balance = this.getBalance();
this.balance = balance.subtract(amount);
}
}
}
AtomicReference

CAS实现安全取款(事实上1000个线程不太符合CAS的应用场景:线程较少,多核CPU)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class DecimalAccountSafeCas implements DecimalAccount {
AtomicReference<BigDecimal> ref;
public DecimalAccountSafeCas(BigDecimal balance) {
ref = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return ref.get();
}
@Override
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = ref.get();
BigDecimal next = prev.subtract(amount);
if (ref.compareAndSet(prev, next)) {
break;
}
}
}
}

ABA问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
// 这个共享变量被它线程修改过?
String prev = ref.get();
other();
sleep(1);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}
private static void other() {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
}, "t1").start();
sleep(0.5);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
}, "t2").start();
}
/**
11:29:52.325 c.Test36 [main] - main start...
11:29:52.379 c.Test36 [t1] - change A->B true
11:29:52.879 c.Test36 [t2] - change B->A true
11:29:53.880 c.Test36 [main] - change A->C true
**/
AtomicStampedReference

主线程仅能判断出共享变量的值与最初值A是否相同,不能感知到这种从A改为B又改回A的情况。如果主线程希望,只要有其他线程动过了共享变量,那么自己的CAS就算失败,这时候仅比较值是不够的,还需要加一个版本号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
String prev = ref.getReference();
// 获取版本号
int stamp = ref.getStamp();
log.debug("版本 {}", stamp);
// 如果中间有其它线程干扰,发生了 ABA 现象
other();
sleep(1);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
private static void other() {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",
ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t1").start();
sleep(0.5);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",
ref.getStamp(), ref.getStamp() + 1));
log.debug("更新版本为 {}", ref.getStamp());
}, "t2").start();
}
/**
15:41:34.891 c.Test36 [main] - main start...
15:41:34.894 c.Test36 [main] - 版本 0
15:41:34.956 c.Test36 [t1] - change A->B true
15:41:34.956 c.Test36 [t1] - 更新版本为 1
15:41:35.457 c.Test36 [t2] - change B->A true
15:41:35.457 c.Test36 [t2] - 更新版本为 2 # 与保存的不一致了
15:41:36.457 c.Test36 [main] - change A->C false
**/
AtomicMarkableReference

有时候不需要知道版本号,只需要知道有没有被修改过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class GarbageBag {
String desc;
public GarbageBag(String desc) {
this.desc = desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return super.toString() + " " + desc;
}
}
@Slf4j
public class TestABAAtomicMarkableReference {
public static void main(String[] args) throws InterruptedException {
GarbageBag bag = new GarbageBag("装满了垃圾");
// 参数2 mark 可以看作一个标记,表示垃圾袋满了
AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);
log.debug("主线程 start...");
GarbageBag prev = ref.getReference();
log.debug(prev.toString());
new Thread(() -> {
log.debug("打扫卫生的线程 start...");
bag.setDesc("空垃圾袋");
while (!ref.compareAndSet(bag, bag, true, false)) {}
log.debug(bag.toString());
}).start();
Thread.sleep(1000);
log.debug("主线程想换一只新垃圾袋?");
boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);
log.debug("换了么?" + success);
log.debug(ref.getReference().toString());
}
}
/**
2019-10-13 15:30:09.264 [main] 主线程 start...
2019-10-13 15:30:09.270 [main] cn.itcast.GarbageBag@5f0fd5a0 装满了垃圾
2019-10-13 15:30:09.293 [Thread-1] 打扫卫生的线程 start...
2019-10-13 15:30:09.294 [Thread-1] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
2019-10-13 15:30:10.294 [main] 主线程想换一只新垃圾袋?
2019-10-13 15:30:10.294 [main] 换了么?false
2019-10-13 15:30:10.294 [main] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
**/

原子数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray
1
new AtomicIntegerArray(10);

字段更新器

  • AtomicReferenceFieldUpdater // 域 字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常。

1
Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Test5 {
private volatile int field;
public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");
Test5 test5 = new Test5();
fieldUpdater.compareAndSet(test5, 0, 10);
// 修改成功 field = 10
System.out.println(test5.field);
// 修改成功 field = 20
fieldUpdater.compareAndSet(test5, 10, 20);
System.out.println(test5.field);
// 修改失败 field = 20
fieldUpdater.compareAndSet(test5, 10, 30);
System.out.println(test5.field);
}
}
/**
10
20
20
**/

原子类加器

性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

LongAdder类关键域
1
2
3
4
5
6
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;
CAS锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 不要用于实践!!!
public class LockCas {
private AtomicInteger state = new AtomicInteger(0);
public void lock() {
while (true) {
if (state.compareAndSet(0, 1)) {
break;
}
}
}
public void unlock() {
log.debug("unlock...");
state.set(0);
}
}
伪共享
1
2
3
4
5
6
7
8
9
10
11
12
// 防止缓存行伪共享
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// 省略不重要代码
}
//@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效

Unsafe

Unsafe对象提供了非常底层的,操作内存、线程的方法,Unsafe对象不能直接调用,只能通过反射获得。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class UnsafeAccessor {
static Unsafe unsafe;
static {
try {
Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new Error(e);
}
}
static Unsafe getUnsafe() {
return unsafe;
}
}
CAS操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Student {
volatile int id;
volatile String name;
}

Unsafe unsafe = UnsafeAccessor.getUnsafe();
Field id = Student.class.getDeclaredField("id");
Field name = Student.class.getDeclaredField("name");
// 获得成员变量的偏移量
long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);
long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);
Student student = new Student();
// 使用 cas 方法替换成员变量的值
UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 true
UnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 true
System.out.println(student);

5. 共享模型之不可变

日期转换问题

问题提出

下面的代码在运行时,由于SimpleDateFormat不是线程安全的,有很大几率出现java.lang.NumberFormatException或者出现不正确的日期解析结果。

1
2
3
4
5
6
7
8
9
10
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
log.debug("{}", sdf.parse("1951-04-21"));
} catch (Exception e) {
log.error("{}", e);
}
}).start();
}
1
2
3
4
19:10:40.859 [Thread-2] c.TestDateParse - {}
java.lang.NumberFormatException: For input string: ""
19:10:40.857 [Thread-4] c.TestDateParse - Sat Apr 21 00:00:00 CST 1951
19:10:40.857 [Thread-5] c.TestDateParse - Mon Apr 21 00:00:00 CST 178960645
同步锁解决

虽然能解决问题,但带来的是性能上的缺失。

1
2
3
4
5
6
7
8
9
10
11
12
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
for (int i = 0; i < 50; i++) {
new Thread(() -> {
synchronized (sdf) {
try {
log.debug("{}", sdf.parse("1951-04-21"));
} catch (Exception e) {
log.error("{}", e);
}
}
}).start();
}
不可变解决

如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改.这样的对象在Java中有很多,例如在Java 8后,提供了一个新的日期格式化类:

1
2
3
4
5
6
7
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {
new Thread(() -> {
LocalDate date = dtf.parse("2018-10-01", LocalDate::from);
log.debug("{}", date);
}).start();
}

DateTimeFormatter的文档:

1
2
@implSpec
This class is immutable and thread-safe.

不可变对象,实际是另一种避免竞争的方式。

不可变设计

Stirng类就是不可变的,以String类为例,看不可变设计的要素。

1
2
3
4
5
6
7
8
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];
/** Cache the hash code for the string */
private int hash; // Default to 0
// ...
}
final的使用

该类、类中所有属性都是final的

  • 属性用final修饰符保证了该属性是只读的,不能修改
  • 类用final修饰符保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
保护性拷贝
1
2
3
4
5
6
7
8
9
10
public String substring(int beginIndex) {
if (beginIndex < 0) {
throw new StringIndexOutOfBoundsException(beginIndex);
}
int subLen = value.length - beginIndex;
if (subLen < 0) {
throw new StringIndexOutOfBoundsException(subLen);
}
return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);
}

其内部是调用String的构造方法创建了一个新字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public String(char value[], int offset, int count) {
if (offset < 0) {
throw new StringIndexOutOfBoundsException(offset);
}
if (count <= 0) {
if (count < 0) {
throw new StringIndexOutOfBoundsException(count);
}
if (offset <= value.length) {
this.value = "".value;
return;
}
}
if (offset > value.length - count) {
throw new StringIndexOutOfBoundsException(offset + count);
}
this.value = Arrays.copyOfRange(value, offset, offset+count);
}

该构造函数也未对final char[] value做出修改,构造新字符串对象时,会生成新的char[] value,对内容进行复制,这种通过创建副本对象来避免共享得手段称之为【保护性拷贝defensive copy】。

模式之享元

用于减少创建对象的数量,以减少内存占用和提高性能。

体现
  1. 包装类

    在JDK中Boolean,Byte,Short,Integer,Long,Character等包装类提供了valueOf方法,例如Long的valueOf会缓存-128~127之间的Long对象,在这个范围之间会重用对象,大于这个范围,才会新建Long对象:

1
2
3
4
5
6
7
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
  • Byte, Short, Long缓存的范围都是-128~127
  • Character缓存的范围是0~127
  • Integer的默认范围是-128~127
    • 最小值不能变
    • 但最大值可以通过调整虚拟机参数 -Djava.lang.Integer.IntegerCache.high来改变
  • Boolean缓存了TRUE和FALSE
  1. String串池
  2. BigDecimal BigInteger
享元模式实现数据库连接池

预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i+1));
}
}
// 5. 借连接
public Connection borrow() {
while(true) {
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 如果没有空闲连接,当前线程进入等待
synchronized (this) {
try {
log.debug("wait...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
synchronized (this) {
log.debug("free {}", conn);
this.notifyAll();
}
break;
}
}
}
}
class MockConnection implements Connection {
// 实现略
}
1
2
3
4
5
6
7
8
9
10
11
12
13
// 测试
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection conn = pool.borrow();
try {
Thread.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(conn);
}).start();
}

以上实现没有考虑:

  • 连接的动态增长与收缩
  • 连接保活(可用性检测)
  • 等待超时处理
  • 分布式Hash
final的原理
设置final的原理
1
2
3
public class TestFinal {
final int a = 20;
}

字节码

1
2
3
4
5
6
7
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: aload_0
5: bipush 20
7: putfield #2 // Field a:I
<-- 写屏障
10: return

final 变量的赋值也会通过putfield指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为0的情况。

获取final的原理

数字比较小直接在栈内存,数字超过短整型最大值就在常量池,但是如果不加final就在堆中,效率低。

无状态

6. 共享模型之工具

线程池

自定义线程池
  1. 自定义任务队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    @Slf4j
    class BlockingQueue<T> {
    // 任务队列
    private Deque<T> queue = new ArrayDeque<>();
    // 锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量,添加线程,满的时候等待
    private Condition fullWaitSet = lock.newCondition();
    // 消费者条件变量,执行线程,空的时候等待
    private Condition emptyWaitSet = lock.newCondition();
    // 容量
    private int capcity;

    public BlockingQueue(int capcity) {
    this.capcity = capcity;
    }
    // 阻塞获取
    public T task() {
    lock.lock();
    try {
    while (queue.isEmpty()) {
    try {
    emptyWaitSet.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    T t = queue.removeFirst();
    fullWaitSet.signal();
    return t;
    } finally {
    lock.unlock();
    }
    }
    // 带超时的阻塞获取
    public T poll(long timeout, TimeUnit unit) {
    lock.lock();
    try {
    long nanos = unit.toNanos(timeout);
    while (queue.isEmpty()) {
    try {
    if (nanos <= 0) {
    return null;
    }
    nanos = emptyWaitSet.awaitNanos(nanos);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    T t = queue.removeFirst();
    return t;
    } finally {
    lock.unlock();
    }
    }
    // 阻塞添加
    public void put(T task) {
    lock.lock();
    try {
    while (queue.size() == capcity) {
    try {
    fullWaitSet.await();
    log.debug("等待任务加入队列{}", task);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    log.debug("加入任务队列{}", task);
    queue.addLast(task);
    emptyWaitSet.signal();
    } finally {
    lock.unlock();
    }
    }
    // 带超时的阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
    lock.lock();
    try {
    long nanos = timeUnit.toNanos(timeout);
    while (queue.size() == capcity) {
    try {
    if (nanos <= 0) {
    return false;
    }
    log.debug("等待加入队列{}", task);
    nanos = fullWaitSet.awaitNanos(nanos);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    log.debug("加入任务队列{}", task);
    queue.addLast(task);
    emptyWaitSet.signal();
    return true;
    } finally {
    lock.unlock();
    }
    }
    // 返回队列长度
    public int size() {
    lock.lock();
    try {
    return queue.size();
    } finally {
    lock.unlock();
    }
    }
    }
  2. 自定义线程池

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    @Slf4j
    class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;
    // 线程集合
    private HashSet<Worker> workers = new HashSet<>();
    // 核心线程数
    private int coreSize;
    // 获取任务时的超时时间
    private long timeout;
    private TimeUnit timeUnit;
    private RejectPolicy<Runnable> rejectPolicy;

    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, RejectPolicy<Runnable> rejectPolicy, int queueCapcity) {
    this.coreSize = coreSize;
    this.timeout = timeout;
    this.timeUnit = timeUnit;
    this.rejectPolicy = rejectPolicy;
    this.taskQueue = new BlockingQueue<>(queueCapcity);
    }
    // 执行任务
    public void execute(Runnable task) {
    synchronized (workers) {
    if (workers.size() < coreSize) {
    Worker worker = new Worker(task);
    log.debug("新增worker{}", worker);
    workers.add(worker);
    worker.start();
    } else {
    taskQueue.tryPut(rejectPolicy, task);
    }
    }
    }

    class Worker extends Thread {
    private Runnable task;
    public Worker(Runnable task) {
    this.task = task;
    }
    @Override
    public void run() {
    // 当task不为空,执行任务;当task执行完毕,从任务队列中获取任务并执行
    while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
    try {
    log.debug("正在执行{}", task);
    task.run();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    task = null;
    }
    }
    synchronized (workers) {
    log.debug("worker被移除{}", this);
    workers.remove(this);
    }
    }
    }
    }
  3. 追加一个拒绝策略

    1
    2
    3
    4
    @FunctionalInterface
    interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
    }
  4. 针对拒绝策略增加函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 带拒绝策略的添加
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
    lock.lock();
    try {
    if (queue.size() == capcity) {
    rejectPolicy.reject(this, task);
    } else {
    log.debug("加入任务队列{}", task);
    queue.addLast(task);
    emptyWaitSet.signal();
    }
    } finally {
    lock.unlock();
    }
    }
  5. 测试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    ThreadPool threadPool = new ThreadPool(1,
    1000, TimeUnit.MILLISECONDS, (queue, task) -> {
    // 1. 死等
    // queue.put(task);
    // 2) 带超时等待
    queue.offer(task, 1500, TimeUnit.MILLISECONDS);
    // 3) 让调用者放弃任务执行
    // log.debug("放弃{}", task);
    // 4) 让调用者抛出异常
    // throw new RuntimeException("任务执行失败 " + task);
    // 5) 让调用者自己执行任务
    // task.run();
    }, 1);
    for (int i = 0; i < 4; i++) {
    int j = i;
    threadPool.execute(() -> {
    try {
    Thread.sleep(1000L);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    log.debug("{}", j);
    });
    }
ThreadPoolExecutor
线程池状态

ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量。

状态名 高3位 接收新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余
任务
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列
任务
TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入
终结
TERMINATED 011 - - 终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,这里因为高三位,第一位是int的符号位,因此RUNNING是负数,最小。

这些信息存储在一个原子变量clt中,目的是将线程池状态与线程个数合二为一,这样就可以用一次cas原子操作进行赋值。

1
2
3
4
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
构造方法
1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize,核心线程数目(最多保留的线程数)
  • maximumPoolSize,最大线程数目
  • keepAliveTime,生存时间,针对救急线程
  • unit,时间单位,针对救急线程
  • workQueue,阻塞队列
  • threadFactory,线程工厂,可以为线程创建时候起个好名字
  • handler,拒绝策略

工作方式:

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
  • 当线程数达到corePoolSize并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的进程
  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建maximumPoolSize - corePoolSixze数目的线程来救急
  • 如果线程到达maximumPoolSize仍然有新任务这时候会执行拒绝策略,拒绝策略JDK提供了4种实现,其他著名框架也提供了实现
    • CallerRunsPolicy,让调用者运行任务
    • DiscardPolicy,放弃本次任务
    • DiscardOldestPolicy,放弃队列中最早的任务,本任务取而代之
    • Dubbo的实现,在抛出RejectedExecutionException异常之前会记录日志,并dump线程栈信息,方便定位问题
    • Netty的实现,是创建一个新线程来执行任务
    • ActiveMQ的实现,带超时等待(60s)尝试放入队列,类似上文实例中的自定义的拒绝策略
    • PinPoint的实现,使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  • 当高峰过去之后,超过corePoolSize的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime和unit来控制

使用这个构造方法,JDK Executors类提供了众多工厂方法来创建各种用途的线程池

newFixedThreadPool

固定大小线程池

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 核心线程数=最大线程数,因此没有救急线程被创建,因此也不需要超时时间
  • 阻塞队列是无界的,可以放任意数量的任务
  • 用于任务量已知,相对耗时的任务
newCachedThreadPool

带缓冲线程池

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • 核心线程是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着:

    • 全部都是救济线程(60s后可以回收)
    • 救急线程可以无限创建
  • 队列采用了SynchronousQueue,实现特点是,他没有容量,没有线程来取是放不进去的(一手交钱,一手交货)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    SynchronousQueue<Integer> integers = new SynchronousQueue<>();
    new Thread(() -> {
    try {
    log.debug("putting {} ", 1);
    integers.put(1);
    log.debug("{} putted...", 1);
    log.debug("putting...{} ", 2);
    integers.put(2);
    log.debug("{} putted...", 2);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    },"t1").start();
    sleep(1);
    new Thread(() -> {
    try {
    log.debug("taking {}", 1);
    integers.take();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    },"t2").start();
    sleep(1);
    new Thread(() -> {
    try {
    log.debug("taking {}", 2);
    integers.take();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    },"t3").start();
    /**
    1:48:15.500 c.TestSynchronousQueue [t1] - putting 1
    11:48:16.500 c.TestSynchronousQueue [t2] - taking 1
    11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted... 有人取走put才算真正执行完成
    11:48:16.500 c.TestSynchronousQueue [t1] - putting...2
    11:48:17.502 c.TestSynchronousQueue [t3] - taking 2
    11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...
    **/
    • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程
    • 适合任务数比较密集,但每个任务执行时间较短的情况
newSingleThreadExecutor
1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  • 希望多个任务排队执行,线程数固定为1,任务数多于1时,会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放。
  • 区别
    • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
    • Executors.newSingleThreadExecutor()的线程个数始终为1,不能修改
      • FinalizableDelegatedExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中特有的方法
    • Executors.newFixedThreadPool(1)初始时为1,以后还可以修改
      • 对外暴露的是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改
提交任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 执行任务
void execute(Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
关闭线程池

######shutdown

1
2
3
4
5
6
7
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}

######shutdownNow

1
2
3
4
5
6
7
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
return tasks;
}

######其他方法

1
2
3
4
5
6
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
异步模式之工作线程

######定义

让有限的工作线程来轮流异步处理无限多的任务,也可以将其归类为分工模式,他的典型实现就是线程池,也体现了经典设计模式中的享元模式。

注意,不同任务类型应该使用不同的线程池,这样能避免饥饿,并能提升效率。

######饥饿

固定大小线程池会有饥饿现象,例如:

  • 两个工人是同一个线程池中的两个线程
  • 他们要做的事情是,为客人点餐和到后厨做菜,这是两个阶段的工作
    • 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
    • 后厨做菜
  • 比如工人A处理了点餐任务,接下来他要等着工人B把菜做好,然后上菜
  • 但现在同时来了两个客人,这个时候工人A和工人B都去处理点餐了,这时候没人作饭了,产生饥饿现象了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class TestDeadLock {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
log.debug("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get()); // f.get()会等线程出结果
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
executorService.execute(() -> {
log.debug("处理点餐...");
Future<String> f = executorService.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
/**
可能的输出
17:08:41.339 c.TestDeadLock [pool-1-thread-2] - 处理点餐...
17:08:41.339 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
**/

解决办法可以增加线程池的大小,不过不是根本解决方案,不同的任务类型,采用不同的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class TestDeadLock {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}

######创建多少线程合适

  • 过小会导致程序不能充分利用系统资源,容易导致饥饿
  • 过大导致更多的线程上下文切换,占更多的内存
  1. CPU密集型运算

    通常采用CPU核心数+1能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,额外的这个线程能够顶上去,保证CPU的周期不被浪费。

  2. I/O密集型运算

    CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当执行IO操作、远程RPC调用时候,包括进行数据库操作时,这时候CPU闲下来了,可以利用多线程提高它的利用率。

    经验公式如下:

    线程数 = 核数 * 期望CPU利用率 * 总时间(CPU计算时间+等待时间) / CPU计算时间

    例如4核CPU计算时间是50%,其他等待时间是50%,期望CPU被百分百利用,套用公式:

    4 * 100% * 100% / 50% = 8

任务调度线程池
Timer(已过期)

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task 1");
sleep(2);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task 2");
}
};
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
/**
20:46:09.444 c.TestTimer [main] - start...
20:46:10.447 c.TestTimer [Timer-0] - task 1
20:46:12.448 c.TestTimer [Timer-0] - task 2
**/

######ScheduledExecutorService改写

1
2
3
4
5
6
7
8
9
10
11
12
13
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {
System.out.println("任务1,执行时间:" + new Date());
try { Thread.sleep(2000); } catch (InterruptedException e) { }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);
/**
任务1,执行时间:Thu Jan 03 12:45:17 CST 2019
任务2,执行时间:Thu Jan 03 12:45:17 CST 2019
**/

######scheduleAtFixedRate例子

1
2
3
4
5
6
7
8
9
10
11
12
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
}, 1, 1, TimeUnit.SECONDS);
/**
21:45:43.167 c.TestTimer [main] - start...
21:45:44.215 c.TestTimer [pool-1-thread-1] - running...
21:45:45.215 c.TestTimer [pool-1-thread-1] - running...
21:45:46.215 c.TestTimer [pool-1-thread-1] - running...
21:45:47.215 c.TestTimer [pool-1-thread-1] - running...
**/

当任务执行时间超过了延时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {
log.debug("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
/**
一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s。
21:44:30.311 c.TestTimer [main] - start...
21:44:31.360 c.TestTimer [pool-1-thread-1] - running...
21:44:33.361 c.TestTimer [pool-1-thread-1] - running...
21:44:35.362 c.TestTimer [pool-1-thread-1] - running...
21:44:37.362 c.TestTimer [pool-1-thread-1] - running...
**/

######scheduleWithFixedDelay例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(()-> {
log.debug("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);
/**
一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始 所以间隔都是 3s
21:40:55.078 c.TestTimer [main] - start...
21:40:56.140 c.TestTimer [pool-1-thread-1] - running...
21:40:59.143 c.TestTimer [pool-1-thread-1] - running...
21:41:02.145 c.TestTimer [pool-1-thread-1] - running...
21:41:05.147 c.TestTimer [pool-1-thread-1] - running...
**/
正确处理执行任务异常
  • 主动捕捉异常

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    ExecutorService pool = Executors.newFixedThreadPool(1);
    pool.submit(() -> {
    try {
    log.debug("task1");
    int i = 1 / 0;
    } catch (Exception e) {
    log.error("error:", e);
    }
    });
    /**
    21:59:04.558 c.TestTimer [pool-1-thread-1] - task1
    21:59:04.562 c.TestTimer [pool-1-thread-1] - error:
    java.lang.ArithmeticException: / by zero
    at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    **/
  • 使用Future

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<Boolean> f = pool.submit(() -> {
    log.debug("task1");
    int i = 1 / 0;
    return true;
    });
    log.debug("result:{}", f.get());
    /**
    21:54:58.208 c.TestTimer [pool-1-thread-1] - task1
    Exception in thread "main" java.util.concurrent.ExecutionException:
    java.lang.ArithmeticException: / by zero
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at cn.itcast.n8.TestTimer.main(TestTimer.java:31)
    Caused by: java.lang.ArithmeticException: / by zero
    at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    **/
线程池的应用之定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 完成在周三的晚上八点十分开始每隔一秒执行一次
// 获取当前时间
LocalDateTime now = LocalDateTime.now();
System.out.println(now);
// 获取周三晚上八点
LocalDateTime time = now.withHour(20).withMinute(10).withSecond(0).withNano(0).with(DayOfWeek.WEDNESDAY);
// 一般获取的是当周,如果已经过期了需要加上一周去下一周
if (now.compareTo(time) > 0) {
time = time.plusWeeks(1);
}
System.out.println(time);
// initialDelay代表当前时间和指定时间点的时间差
// preiod 一圈的间隔时间
long initialDelay = Duration.between(now, time).toMillis();
long period = 1000;
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
pool.scheduleAtFixedRate(() -> {
System.out.println("running");
}, initialDelay, period, TimeUnit.MILLISECONDS);
Fork/Join
概念

Fork / Join是JDK 1.7加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的CPU密集型运算。

所谓的任务拆分就是将一个大任务拆分为算法上的小任务,直到不能拆分可以直接求解。跟递归有关的一些计算,如归并排序、斐波那契数列,都可以用分治思想进行求解。

Fork / Join在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。

Fork / Join默认会创建与CPU核心数大小相同的线程池。

使用

提交给Fork / Join线程池的任务需要继承RecursiveTask(有返回值)或RecursiveActiopn(没有返回值),例如下面是一个对1-n的整数求和的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Slf4j
class AddTask1 extends RecursiveTask<Integer> {
int n;
public AddTask1(int n) {
this.n = n;
}
@Override
public String toString() {
return "{" +
"n=" + n +
'}';
}
@Override
protected Integer compute() {
// 如果n已经为1,可以求得结果了
if (n == 1) {
log.debug("join(){}~{}", Thread.currentThread().getId(), n);
return n;
}
// 将任务进行拆分,Fork
AddTask1 t1 = new AddTask1(n - 1);
t1.fork();
log.debug("fork(){}~{}+{}", Thread.currentThread().getId(), n, t1);
// 合并join结果
int result = n + t1.join();
log.debug("join(){}~{}+{}={}", Thread.currentThread().getId(), n, t1, result);
return result;
}
}
// 使用ForkJoinPool执行
@Slf4j
public class TestPool {
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new AddTask1(5)));
}
}
/**
2020-07-01 21:12:11.556 DEBUG AddTask1 - fork()15~3+{n=2}
2020-07-01 21:12:11.556 DEBUG AddTask1 - fork()14~4+{n=3}
2020-07-01 21:12:11.556 DEBUG AddTask1 - fork()13~5+{n=4}
2020-07-01 21:12:11.556 DEBUG AddTask1 - fork()16~2+{n=1}
2020-07-01 21:12:11.561 DEBUG AddTask1 - join()15~1
2020-07-01 21:12:11.561 DEBUG AddTask1 - join()16~2+{n=1}=3
2020-07-01 21:12:11.562 DEBUG AddTask1 - join()15~3+{n=2}=6
2020-07-01 21:12:11.562 DEBUG AddTask1 - join()14~4+{n=3}=10
2020-07-01 21:12:11.562 DEBUG AddTask1 - join()13~5+{n=4}=15
15
调用多个线程同时执行并等待结果
**/

改进分治策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Slf4j
class AddTask1 extends RecursiveTask<Integer> {
int begin, end;

public AddTask1(int begin, int end) {
this.begin = begin;
this.end = end;
}

@Override
protected Integer compute() {
if (begin == end) {
log.debug("join(){}", begin);
return begin;
}
if (end - begin == 1) {
log.debug("join(){} + {} = {}", begin, end, end + begin);
return begin;
}
int mid = (end + begin) / 2;
AddTask1 t1 = new AddTask1(begin, mid);
t1.fork();
AddTask1 t2 = new AddTask1(mid + 1, end);
t2.fork();
log.debug("fork(){} + {} = ?", t1, t2);
int result = t1.join() + t2.join();
log.debug("join(){} + {} = {}", t1, t2, result);
return result;
}

@Override
public String toString() {
return "{" + begin +
", " + end +
'}';
}
}
/**
2020-07-01 21:18:55.230 DEBUG AddTask1 - fork(){0, 1} + {2, 2} = ?
2020-07-01 21:18:55.230 DEBUG AddTask1 - fork(){0, 2} + {3, 5} = ?
2020-07-01 21:18:55.230 DEBUG AddTask1 - join()0 + 1 = 1
2020-07-01 21:18:55.230 DEBUG AddTask1 - fork(){3, 4} + {5, 5} = ?
2020-07-01 21:18:55.236 DEBUG AddTask1 - join()2
2020-07-01 21:18:55.237 DEBUG AddTask1 - join()3 + 4 = 7
2020-07-01 21:18:55.238 DEBUG AddTask1 - join(){0, 1} + {2, 2} = 2
2020-07-01 21:18:55.238 DEBUG AddTask1 - join()5
2020-07-01 21:18:55.238 DEBUG AddTask1 - join(){3, 4} + {5, 5} = 8
2020-07-01 21:18:55.238 DEBUG AddTask1 - join(){0, 2} + {3, 5} = 10
10
**/

线程池工作过程

  1. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
  2. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
    1. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    2. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
    3. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
    4. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常 RejectExecutionException。
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行。
  4. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

Java中的阻塞队列

  1. ArrayBlockingQueue :由数组结构组成的有界阻塞队列。

    用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列:

    1
    ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
  2. LinkedBlockingQueue :由链表结构组成的有界阻塞队列。

    基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
    LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE)。

  3. PriorityBlockingQueue :支持优先级排序的无界阻塞队列。

    是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现compareTo()方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

  4. DelayQueue:使用优先级队列实现的无界阻塞队列。

    是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

  5. SynchronousQueue:不存储元素的阻塞队列。

    是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另 外 一 个 线 程 使 用 , SynchronousQueue 的 吞 吐 量 高 于 LinkedBlockingQueue 和 ArrayBlockingQueue。

  6. LinkedTransferQueue:由链表结构组成的无界阻塞队列。

    是 一 个 由 链 表 结 构 组 成 的 无 界 阻 塞 TransferQueue 队 列 。 相 对 于 其 他 阻 塞 队 列 ,
    LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

  7. LinkedBlockingDeque:由链表结构组成的双向阻塞队列。