Java并发编程第三篇,共享模型剩余部分内容。
1. Java内存模型
Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着CPU寄存器、缓存、硬件内存、CPU指令优化等。
- 原子性 - 保证指令不会受到线程上下文切换的影响
- 可见性 - 保证指令不会受CPU缓存的影响
- 有序性 - 保证指令不会受CPU指令并行优化的影响
2. 可见性
问题
1 | static boolean run = true; |
- 初始状态,t线程刚开始从主内存读取了run的值到工作内存
- 因为t线程要频繁从主内存中读取run的值,JIT编译器会将run的值缓存到自己的工作内存的高速缓存中,减少对主存中run的访问,提高效率
- 1秒后,main线程修改了run的值,并同步至内存,而t是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值
解决
1 | volatile static boolean run = true; // 必须去主存得值 |
volatile易变关键字,它可以用来修饰成员变量和静态成员变量,可以避免线程从自己得工作缓存中查找变量的值,必须到主存中获取它的值,线程操作volatile变量都是直接操作主存。
可见性VS原子性
synchronized语句块既可以保证代码块的原子性,也同时保证代码内变量的可见性,但缺点是属于重量级操作,性能相对较低。
终止模式之两阶段终止模式
这次利用volatile实现
1 | // 停止标记用 volatile 是为了保证该变量在多个线程之间的可见性 |
同步模式之Balking模式
Balking犹豫模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做了,直接结束返回。
1 | public class MonitorService { |
它还经常用来实现线程安全的单例。
1 | public final class Singleton { |
保护性暂停模式是用在一个线程等待另一个线程的执行结果,当条件不满足时,线程等待。
3. 有序性
多线程下指令重排会影响正确性。
1 | // 该程序因为线程时间片的原因出现多种结果 |
禁用指令重排。
1 | volatile boolean ready = false; // 加在ready即可,写屏障可以防止之前的代码重排序 |
volatile原理
底层实现是内存屏障的技术,Memory Barrier(Memory Fence)。对volatile变量的写指令后会加入写屏障,对volatile变量的读指令前会加入读屏障。
保证可见性
-
写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中。
1
2
3
4
5public void actor(I_Result r){
num = 2;
ready = true; // ready是volatile赋值带写屏障
// 写屏障
} -
而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新的数据。
1
2
3
4
5
6
7
8
9public void actor(I_Result r){
//读屏障
// ready是volatile读取值带读屏障
if (ready){
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
保证有序性
-
写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后。
1
2
3
4
5public void actor(I_Result r){
num = 2;
ready = true; // ready是volatile赋值带写屏障
// 写屏障
} -
读屏障会确保指令重排序时候,不会将读屏障之后的代码排在读屏障之前
-
但是不能解决指令交错
- 写屏障只是保证之后的读能读到最新的结果,但不能保证读跑到他前面去
- 有序性的保证也只是保证了本线程的相关代码不被重排序
double-checked locking问题
以double-cheaked locking为例
1 | public final class Singleton{ |
以上实现的特点是:
- 懒惰实例化
- 首次使用getInstance()才会使用synchronized加锁,后续使用时候无需加锁
- 隐含的一点是,第一个if使用了INSTANCE变量,是在同步块之外的
问题出现:
INSTANCE = new Singleton()
不是一个原子操作,他可能会出现,先赋值再调用构造方法的可能。因此如果要在构造方法中执行很多初始化操作,那么t2拿到的将是一个未初始化完毕的单例。
解决:
1 | private static volatile Singleton INSTANCE = null; |
happens-before规则
可参见虚拟机部分博客
习题
balking模式
以下代码想让doInit()仅被调用一次,是否有问题?
1 | public class TestVolatile { |
问题:
涉及对共享变量的读写,init()不是原子操作,容易导致:线程1进入doInit()操作时,由于整个函数并不互斥,这时线程2进行判断发现未初始化,接下来也会进入doInit()操作。
volatile主要适用于,一个线程写,其他线程读的情况;double-checked locking时,保证synchronized代码块外的指令重排序问题。
解决:
1 | synchronized void init() { |
线程安全单例
单例模式有很多实现方法,懒汉、饿汉、静态内部类、枚举类
- 饿汉式:类加载就会导致该单实例对象被创建
- 懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建
实现1
1 | // 问题1:为什么加 final |
实现2
1 | // 问题1:枚举单例是如何限制实例个数的 |
实现3
1 | // 懒汉式,犹豫模式 |
实现4
1 | // dcl,对实现3的改进 |
实现5
1 | public final class Singleton { |
4. 共享模型之无锁
问题
账户转账类,会在多线程执行时,指令交错从而导致结果不同。
1 | interface Account { |
解决思路
-
使用锁解决,给Account加锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14class AccountUnsafe implements Account {
private Integer balance;
public AccountUnsafe(Integer balance) {
this.balance = balance;
}
public synchronized Integer getBalance() {
return balance;
}
public synchronized void withdraw(Integer amount) {
balance -= amount;
}
} -
无锁CAS解决
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24class AccountSafe implements Account {
private AtomicInteger balance; // 原子类
public AccountSafe(Integer balance) {
this.balance = new AtomicInteger(balance);
}
public Integer getBalance() {
return balance.get();
}
public void withdraw(Integer amount) {
while (true) {
int prev = balance.get();
int next = prev - amount;
if (balance.compareAndSet(prev, next)) { // 等待prev和当前值一致
break;
}
}
}
}
// 执行测试方法
public static void main(String[] args) {
Account.demo(new AccountSafe(10000));
}
CAS与Volatile
Volatile
获取共享变量时,为了保证该变量的可见性,需要Volatile修饰。
它可以用来修饰成员变量和静态成员变量,它可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作Volatile变量都是直接操作主存,即一个线程对Volatile变量的修改,对另一个线程可见。
Volatile仅仅保证了共享变量的可见性,让其他线程能够看到最新值,但不能解决指令交错问题,即不能保证原子性。
CAS必须借助Volatile才能读取到共享变量的最新值来实现比较并交换的结果。
为什么CAS比synchronized效率高
- 在无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而synchronized会让线程在没有获得锁的时候,发生上下文切换,进入阻塞。
- 但是在无锁情况下,因为线程要保持运行,需要额外的CPU的支持,没有额外的CPU,线程想要高速运行也无从谈起,虽然不会阻塞,但由于没有时间片,仍然会进入可运行状态,还是会导致上下文切换。
CAS的特点
结合CAS和Volatile可以实现无锁并发,适用于线程数少、多核CPU的场景下:
- CAS是基于乐观锁的思想,最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,重试即可。
- synchronized是基于悲观锁的思想,最悲观的估计,防着其他线程前来修改共享变量,上锁之后谁都不想改,改完再解锁。
- CAS体现的是无锁并发、无阻塞并发
- 因为没有使用synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
- 如果竞争激励,重试必然频繁发生,反而效率会受到影响
原子整数
J.U.C并发包提供了:
- AtomicBoolean
- AtomicInteger
- AtomicLong
以AtomicInteger为例
1 | AtomicInteger i = new AtomicInteger(0); |
原子引用
- AtomicReference
- AtomicMarkableReference
- AtomicStampedReferenced
1 | public interface DecimalAccount { |
锁实现安全取款操作
1 | class DecimalAccountSafeLock implements DecimalAccount { |
AtomicReference
CAS实现安全取款(事实上1000个线程不太符合CAS的应用场景:线程较少,多核CPU)
1 | class DecimalAccountSafeCas implements DecimalAccount { |
ABA问题
1 | static AtomicReference<String> ref = new AtomicReference<>("A"); |
AtomicStampedReference
主线程仅能判断出共享变量的值与最初值A是否相同,不能感知到这种从A改为B又改回A的情况。如果主线程希望,只要有其他线程动过了共享变量,那么自己的CAS就算失败,这时候仅比较值是不够的,还需要加一个版本号。
1 | static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0); |
AtomicMarkableReference
有时候不需要知道版本号,只需要知道有没有被修改过。
1 | class GarbageBag { |
原子数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
1 | new AtomicIntegerArray(10); |
字段更新器
- AtomicReferenceFieldUpdater // 域 字段
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常。
1 | Exception in thread "main" java.lang.IllegalArgumentException: Must be volatile type |
1 | public class Test5 { |
原子类加器
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
LongAdder类关键域
1 | // 累加单元数组, 懒惰初始化 |
CAS锁
1 | // 不要用于实践!!! |
伪共享
1 | // 防止缓存行伪共享 |
Unsafe
Unsafe对象提供了非常底层的,操作内存、线程的方法,Unsafe对象不能直接调用,只能通过反射获得。
1 | public class UnsafeAccessor { |
CAS操作
1 | class Student { |
5. 共享模型之不可变
日期转换问题
问题提出
下面的代码在运行时,由于SimpleDateFormat不是线程安全的,有很大几率出现java.lang.NumberFormatException或者出现不正确的日期解析结果。
1 | SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); |
1 | 19:10:40.859 [Thread-2] c.TestDateParse - {} |
同步锁解决
虽然能解决问题,但带来的是性能上的缺失。
1 | SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); |
不可变解决
如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改.这样的对象在Java中有很多,例如在Java 8后,提供了一个新的日期格式化类:
1 | DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd"); |
DateTimeFormatter的文档:
1 |
|
不可变对象,实际是另一种避免竞争的方式。
不可变设计
Stirng类就是不可变的,以String类为例,看不可变设计的要素。
1 | public final class String |
final的使用
该类、类中所有属性都是final的
- 属性用final修饰符保证了该属性是只读的,不能修改
- 类用final修饰符保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
保护性拷贝
1 | public String substring(int beginIndex) { |
其内部是调用String的构造方法创建了一个新字符串
1 | public String(char value[], int offset, int count) { |
该构造函数也未对final char[] value做出修改,构造新字符串对象时,会生成新的char[] value,对内容进行复制,这种通过创建副本对象来避免共享得手段称之为【保护性拷贝defensive copy】。
模式之享元
用于减少创建对象的数量,以减少内存占用和提高性能。
体现
-
包装类
在JDK中Boolean,Byte,Short,Integer,Long,Character等包装类提供了valueOf方法,例如Long的valueOf会缓存-128~127之间的Long对象,在这个范围之间会重用对象,大于这个范围,才会新建Long对象:
1 | public static Long valueOf(long l) { |
- Byte, Short, Long缓存的范围都是-128~127
- Character缓存的范围是0~127
- Integer的默认范围是-128~127
- 最小值不能变
- 但最大值可以通过调整虚拟机参数
-Djava.lang.Integer.IntegerCache.high
来改变
- Boolean缓存了TRUE和FALSE
- String串池
- BigDecimal BigInteger
享元模式实现数据库连接池
预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。
1 | class Pool { |
1 | // 测试 |
以上实现没有考虑:
- 连接的动态增长与收缩
- 连接保活(可用性检测)
- 等待超时处理
- 分布式Hash
final的原理
设置final的原理
1 | public class TestFinal { |
字节码
1 | 0: aload_0 |
final 变量的赋值也会通过putfield指令来完成,同样在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为0的情况。
获取final的原理
数字比较小直接在栈内存,数字超过短整型最大值就在常量池,但是如果不加final就在堆中,效率低。
无状态
6. 共享模型之工具
线程池
自定义线程池
-
自定义任务队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
class BlockingQueue<T> {
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 生产者条件变量,添加线程,满的时候等待
private Condition fullWaitSet = lock.newCondition();
// 消费者条件变量,执行线程,空的时候等待
private Condition emptyWaitSet = lock.newCondition();
// 容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 阻塞获取
public T task() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
return t;
} finally {
lock.unlock();
}
}
// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
fullWaitSet.await();
log.debug("等待任务加入队列{}", task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}", task);
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 带超时的阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if (nanos <= 0) {
return false;
}
log.debug("等待加入队列{}", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列{}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
// 返回队列长度
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
} -
自定义线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务时的超时时间
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, RejectPolicy<Runnable> rejectPolicy, int queueCapcity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.rejectPolicy = rejectPolicy;
this.taskQueue = new BlockingQueue<>(queueCapcity);
}
// 执行任务
public void execute(Runnable task) {
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增worker{}", worker);
workers.add(worker);
worker.start();
} else {
taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
public void run() {
// 当task不为空,执行任务;当task执行完毕,从任务队列中获取任务并执行
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("正在执行{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker被移除{}", this);
workers.remove(this);
}
}
}
} -
追加一个拒绝策略
1
2
3
4
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
} -
针对拒绝策略增加函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// 带拒绝策略的添加
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capcity) {
rejectPolicy.reject(this, task);
} else {
log.debug("加入任务队列{}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
} -
测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24ThreadPool threadPool = new ThreadPool(1,
1000, TimeUnit.MILLISECONDS, (queue, task) -> {
// 1. 死等
// queue.put(task);
// 2) 带超时等待
queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
// log.debug("放弃{}", task);
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
// task.run();
}, 1);
for (int i = 0; i < 4; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}", j);
});
}
ThreadPoolExecutor
线程池状态
ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量。
状态名 | 高3位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余 任务 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列 任务 |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入 终结 |
TERMINATED | 011 | - | - | 终结状态 |
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,这里因为高三位,第一位是int的符号位,因此RUNNING是负数,最小。
这些信息存储在一个原子变量clt中,目的是将线程池状态与线程个数合二为一,这样就可以用一次cas原子操作进行赋值。
1 | // c 为旧值, ctlOf 返回结果为新值 |
构造方法
1 | public ThreadPoolExecutor(int corePoolSize, |
- corePoolSize,核心线程数目(最多保留的线程数)
- maximumPoolSize,最大线程数目
- keepAliveTime,生存时间,针对救急线程
- unit,时间单位,针对救急线程
- workQueue,阻塞队列
- threadFactory,线程工厂,可以为线程创建时候起个好名字
- handler,拒绝策略
工作方式:
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务
- 当线程数达到corePoolSize并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue队列排队,直到有空闲的进程
- 如果队列选择了有界队列,那么任务超过了队列大小时,会创建maximumPoolSize - corePoolSixze数目的线程来救急
- 如果线程到达maximumPoolSize仍然有新任务这时候会执行拒绝策略,拒绝策略JDK提供了4种实现,其他著名框架也提供了实现
- CallerRunsPolicy,让调用者运行任务
- DiscardPolicy,放弃本次任务
- DiscardOldestPolicy,放弃队列中最早的任务,本任务取而代之
- Dubbo的实现,在抛出RejectedExecutionException异常之前会记录日志,并dump线程栈信息,方便定位问题
- Netty的实现,是创建一个新线程来执行任务
- ActiveMQ的实现,带超时等待(60s)尝试放入队列,类似上文实例中的自定义的拒绝策略
- PinPoint的实现,使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
- 当高峰过去之后,超过corePoolSize的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime和unit来控制
使用这个构造方法,JDK Executors类提供了众多工厂方法来创建各种用途的线程池
newFixedThreadPool
固定大小线程池
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
- 核心线程数=最大线程数,因此没有救急线程被创建,因此也不需要超时时间
- 阻塞队列是无界的,可以放任意数量的任务
- 用于任务量已知,相对耗时的任务
newCachedThreadPool
带缓冲线程池
1 | public static ExecutorService newCachedThreadPool() { |
-
核心线程是0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s,意味着:
- 全部都是救济线程(60s后可以回收)
- 救急线程可以无限创建
-
队列采用了SynchronousQueue,实现特点是,他没有容量,没有线程来取是放不进去的(一手交钱,一手交货)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39SynchronousQueue<Integer> integers = new SynchronousQueue<>();
new Thread(() -> {
try {
log.debug("putting {} ", 1);
integers.put(1);
log.debug("{} putted...", 1);
log.debug("putting...{} ", 2);
integers.put(2);
log.debug("{} putted...", 2);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t1").start();
sleep(1);
new Thread(() -> {
try {
log.debug("taking {}", 1);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t2").start();
sleep(1);
new Thread(() -> {
try {
log.debug("taking {}", 2);
integers.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"t3").start();
/**
1:48:15.500 c.TestSynchronousQueue [t1] - putting 1
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted... 有人取走put才算真正执行完成
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...
**/- 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲1分钟后释放线程
- 适合任务数比较密集,但每个任务执行时间较短的情况
newSingleThreadExecutor
1 | public static ExecutorService newSingleThreadExecutor() { |
- 希望多个任务排队执行,线程数固定为1,任务数多于1时,会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放。
- 区别
- 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
- Executors.newSingleThreadExecutor()的线程个数始终为1,不能修改
- FinalizableDelegatedExecutorService应用的是装饰器模式,只对外暴露了ExecutorService接口,因此不能调用ThreadPoolExecutor中特有的方法
- Executors.newFixedThreadPool(1)初始时为1,以后还可以修改
- 对外暴露的是ThreadPoolExecutor对象,可以强转后调用setCorePoolSize等方法进行修改
提交任务
1 | // 执行任务 |
关闭线程池
######shutdown
1 | /* |
1 | public void shutdown() { |
######shutdownNow
1 | /* |
1 | public List<Runnable> shutdownNow() { |
######其他方法
1 | // 不在 RUNNING 状态的线程池,此方法就返回 true |
异步模式之工作线程
######定义
让有限的工作线程来轮流异步处理无限多的任务,也可以将其归类为分工模式,他的典型实现就是线程池,也体现了经典设计模式中的享元模式。
注意,不同任务类型应该使用不同的线程池,这样能避免饥饿,并能提升效率。
######饥饿
固定大小线程池会有饥饿现象,例如:
- 两个工人是同一个线程池中的两个线程
- 他们要做的事情是,为客人点餐和到后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
- 后厨做菜
- 比如工人A处理了点餐任务,接下来他要等着工人B把菜做好,然后上菜
- 但现在同时来了两个客人,这个时候工人A和工人B都去处理点餐了,这时候没人作饭了,产生饥饿现象了。
1 | public class TestDeadLock { |
解决办法可以增加线程池的大小,不过不是根本解决方案,不同的任务类型,采用不同的线程。
1 | public class TestDeadLock { |
######创建多少线程合适
- 过小会导致程序不能充分利用系统资源,容易导致饥饿
- 过大导致更多的线程上下文切换,占更多的内存
-
CPU密集型运算
通常采用CPU核心数+1能够实现最优的CPU利用率,+1是保证当线程由于页缺失故障(操作系统)或其他原因导致暂停时,额外的这个线程能够顶上去,保证CPU的周期不被浪费。
-
I/O密集型运算
CPU不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用CPU资源,但当执行IO操作、远程RPC调用时候,包括进行数据库操作时,这时候CPU闲下来了,可以利用多线程提高它的利用率。
经验公式如下:
线程数 = 核数 * 期望CPU利用率 * 总时间(CPU计算时间+等待时间) / CPU计算时间
例如4核CPU计算时间是50%,其他等待时间是50%,期望CPU被百分百利用,套用公式:
4 * 100% * 100% / 50% = 8
任务调度线程池
Timer(已过期)
在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
1 | public static void main(String[] args) { |
######ScheduledExecutorService改写
1 | ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); |
######scheduleAtFixedRate例子
1 | ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); |
当任务执行时间超过了延时时间
1 | ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); |
######scheduleWithFixedDelay例子
1 | ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); |
正确处理执行任务异常
-
主动捕捉异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
log.debug("task1");
int i = 1 / 0;
} catch (Exception e) {
log.error("error:", e);
}
});
/**
21:59:04.558 c.TestTimer [pool-1-thread-1] - task1
21:59:04.562 c.TestTimer [pool-1-thread-1] - error:
java.lang.ArithmeticException: / by zero
at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
**/ -
使用Future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
return true;
});
log.debug("result:{}", f.get());
/**
21:54:58.208 c.TestTimer [pool-1-thread-1] - task1
Exception in thread "main" java.util.concurrent.ExecutionException:
java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at cn.itcast.n8.TestTimer.main(TestTimer.java:31)
Caused by: java.lang.ArithmeticException: / by zero
at cn.itcast.n8.TestTimer.lambda$main$0(TestTimer.java:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
**/
线程池的应用之定时任务
1 | // 完成在周三的晚上八点十分开始每隔一秒执行一次 |
Fork/Join
概念
Fork / Join是JDK 1.7加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的CPU密集型运算。
所谓的任务拆分就是将一个大任务拆分为算法上的小任务,直到不能拆分可以直接求解。跟递归有关的一些计算,如归并排序、斐波那契数列,都可以用分治思想进行求解。
Fork / Join在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。
Fork / Join默认会创建与CPU核心数大小相同的线程池。
使用
提交给Fork / Join线程池的任务需要继承RecursiveTask(有返回值)或RecursiveActiopn(没有返回值),例如下面是一个对1-n的整数求和的任务:
1 |
|
改进分治策略:
1 |
|
线程池工作过程
- 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
- 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
- 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
- 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
- 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
- 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常 RejectExecutionException。
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
Java中的阻塞队列
-
ArrayBlockingQueue :由数组结构组成的有界阻塞队列。
用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列:
1
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
-
LinkedBlockingQueue :由链表结构组成的有界阻塞队列。
基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出(FIFO)的原则对元素进行排序。而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
LinkedBlockingQueue 会默认一个类似无限大小的容量(Integer.MAX_VALUE)。 -
PriorityBlockingQueue :支持优先级排序的无界阻塞队列。
是一个支持优先级的无界队列。默认情况下元素采取自然顺序升序排列。可以自定义实现compareTo()方法来指定元素进行排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
-
DelayQueue:使用优先级队列实现的无界阻塞队列。
是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
-
SynchronousQueue:不存储元素的阻塞队列。
是一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另 外 一 个 线 程 使 用 , SynchronousQueue 的 吞 吐 量 高 于 LinkedBlockingQueue 和 ArrayBlockingQueue。
-
LinkedTransferQueue:由链表结构组成的无界阻塞队列。
是 一 个 由 链 表 结 构 组 成 的 无 界 阻 塞 TransferQueue 队 列 。 相 对 于 其 他 阻 塞 队 列 ,
LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。 -
LinkedBlockingDeque:由链表结构组成的双向阻塞队列。