Java并发编程第二篇,共享模型之管程,各种锁。
1. 共享带来的问题
临界区 Critical Section
- 一个程序运行多个线程本身是没有问题的
- 问题出在多个线程访问共享资源
- 多个线程访问共享资源其实也没有问题
- 在多个线程对共享资源读写操作时发生指令交错,就会出现问题
- 一段代码块内如果存在对共享资源的多线程读写操作,称这段代码为临界区
1 2 3 4 5 6 7 8 9 10
| static int counter = 0; static void increment(){ counter++; } static void decrement(){ counter--; }
|
竞态条件 Race Condition
多个线程在临界区执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件。
为了避免临界区的竞态条件发生,有多种手段可以达到目的。
- 阻塞式的解决方案,synchronized,Lock
- 非阻塞式的解决方案:原子变量
2. synchronized解决方案
俗称对象锁,它采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其他线程再想获取这个对象锁时就会被阻塞住,这样就能保证拥有锁的线程可以安全的执行临界区的代码,不用担心上下文的切换。
虽然Java中互斥和同步都可以采用synchronized关键字来完成,但他们还是有区别的:
- 互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
- 同步是由于线程执行的先后、顺序不同,需要一个线程等待其他线程运行到某个点
Synchonized是基于进入和退出Monitor对象来实现方法同步和代码块同步,但两者的实现细节不一样。Synchronized 用在方法上时,在字节码中是通过方法的 ACC_SYNCHRONIZED 标志来实现的。而代码块同步则是使用monitorenter和monitorexit指令实现的。
monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处和异常处,JVM要保证每个monitorenter必须有对应的monitorexit与之配对。任何对象都有一个monitor与之关联,当且一个monitor被持有后,它将处于锁定状态。线程执行到monitorenter指令时,将会尝试获取对象所对应的monitor的所有权,即尝试获得对象的锁,当获得对象的monitor以后,monitor内部的计数器就会自增(初始为0),当同一个线程再次获得monitor的时候,计数器会再次自增。当同一个线程执行monitorexit指令的时候,计数器会进行自减,当计数器为0的时候,monitor就会被释放,其他线程便可以获得monitor。
语法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| static int counter = 0; static final Object room = new Object(); public static void main(String[] args) throws InterruptedException{ Thread t1 = new Thread(() -> { for (int i = 0; i <5000; i++){ synchronized (room){ counter++; } } }); Thread t2 = new Thread(() -> { for (int i = 0; i <5000; i++){ synchronized (room){ counter--; } } }); t1.start(); t2.start(); t1.join(); t2/join(); }
|
synchronized实际上是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切断所打断。
面向对象改进
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Slf4j public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { Room room = new Room(); Thread thread = new Thread(() -> { for (int i = 0; i < 5000; i++) room.increment(); }); Thread thread1 = new Thread(() -> { for (int i = 0; i < 5000; i++) room.decrement(); }); thread.start(); thread1.start(); thread.join(); thread1.join(); log.debug("主线程结束{}", room.getCounter()); } } class Room { private int counter = 0; public void increment() { synchronized (this) { counter++; } } public void decrement() { synchronized (this) { counter--; } } public int getCounter() { synchronized (this) { return counter; } } }
|
3. 方法上的synchronized
1 2 3 4 5 6 7 8 9 10 11
| class test { public synchronized void test() { } }
class test { public void test() { synchronized(this) { } } }
|
1 2 3 4 5 6 7 8 9 10 11
| class test { public synchronized static void test() { } }
class test { public static void test() { synchronized(test.class) { } } }
|
synchronized 关键字修饰方法和代码的区别
- synchronized修饰方法:同步是隐式的,给方法添加ACC_SYNCHRONIZED的访问修饰符。当某个线程要访问某个方法的时候,会检查是否有ACC_SYNCHRONIZED,如有,则需要先获取当前对象的监视器锁,然后才开始执行方法。方法执行之后再释放监视器锁。
- 同步代码块:采用
monitorenter
、monitorexit
两个指令来实现同步。在执行monitorenter指令时,首先要尝试获取对象的锁。如果这个对象没被锁定,或者当前线程已经拥有了那个对象的锁,把锁的计数器加1,相应的,在执行monitorexit指令时会将锁计数器减1,当计数器为0的时候,锁就会被释放。如果获取对象锁失败,那当前线程就要阻塞等待。直到对象锁被另外一个线程释放为止。
练习:线程八锁
情况1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Slf4j public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); new Thread(() -> { log.debug("begin"); n1.a(); }).start(); new Thread(() -> { log.debug("begin"); n1.b(); }).start(); } } @Slf4j class Number { public synchronized void a() { log.debug("1"); }
public synchronized void b() { log.debug("2"); } }
|
情况2,修改Number类如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Slf4j class Number { public synchronized void a() { Thred.sleep(1000); log.debug("1"); }
public synchronized void b() { log.debug("2"); } }
|
情况3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| @Slf4j public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); new Thread(() -> { log.debug("begin"); n1.a(); }).start(); new Thread(() -> { log.debug("begin"); n1.b(); }).start(); new Thread(() -> { log.debug("begin"); n1.c(); }).start(); } } @Slf4j class Number { public synchronized void a() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1"); } public synchronized void b() { log.debug("2"); } public void c() { log.debug("3"); } }
|
情况4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Slf4j public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); Number n2 = new Number(); new Thread(() -> { log.debug("begin"); n1.a(); }).start(); new Thread(() -> { log.debug("begin"); n2.b(); }).start(); } } @Slf4j class Number { public synchronized void a() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1"); } public synchronized void b() { log.debug("2"); } }
|
情况5
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Slf4j public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); new Thread(() -> { log.debug("begin"); n1.a(); }).start(); new Thread(() -> { log.debug("begin"); n1.b(); }).start(); } } @Slf4j class Number { public static synchronized void a() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1"); } public synchronized void b() { log.debug("2"); } }
|
情况6
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Slf4j public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); Number n2 = new Number(); new Thread(() -> { log.debug("begin"); n1.a(); }).start(); new Thread(() -> { log.debug("begin"); n1.b(); }).start(); } } @Slf4j class Number { public static synchronized void a() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1"); } public static synchronized void b() { log.debug("2"); } }
|
情况7
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Slf4j public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); Number n2 = new Number(); new Thread(() -> { log.debug("begin"); n1.a(); }).start(); new Thread(() -> { log.debug("begin"); n2.b(); }).start(); } } @Slf4j class Number { public static synchronized void a() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1"); } public synchronized void b() { log.debug("2"); } }
|
情况8
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Slf4j public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { Number n1 = new Number(); Number n2 = new Number(); new Thread(() -> { log.debug("begin"); n1.a(); }).start(); new Thread(() -> { log.debug("begin"); n2.b(); }).start(); } } @Slf4j class Number { public static synchronized void a() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("1"); } public static synchronized void b() { log.debug("2"); } }
|
4. 线程安全分析
成员变量和静态变量是否线程安全
- 如果它们没有共享,则线程安全
- 如果它们被共享了,根据它们的状态是否能被改变,又分两种情况:
- 如果只有读操作,则线程安全
- 如果只有写操作,则这段代码属于临界区,需要考虑线程安全
局部变量是否线程安全
- 局部变量是线程安全的
- 但局部变量引用的对象未必
- 如果该对象没有逃离方法的作用域访问,它是线程安全的
- 如果该对象逃离方法的作用范围,需要考虑线程安全
局部变量线程安全分析
1 2 3 4
| public static void text(){ int i = 10; i++; }
|
每个线程调用test()方法时局部变量i,会在每个线程的栈帧内存中被创建多份,因此不存在共享。
但局部变量的引用稍有不同。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Slf4j public class Test { static final int THREAD_NUMBER = 2; static final int LOOP_NUMBER = 200; public static void main(String[] args) { ThreadUnsafe test = new ThreadUnsafe(); for (int i = 0; i < THREAD_NUMBER; i++) { new Thread(() -> { test.method1(LOOP_NUMBER); }, "Thread" + i).start(); } } } class ThreadUnsafe { ArrayList<String> list = new ArrayList<>(); public void method1(int loopNumber) { for (int i = 0; i < loopNumber; i++) { method2(); method3(); } } private void method2() { list.add("1"); } private void method3() { list.remove(0); } }
|
无论哪个线程中的method2引用的都是同一个对象中的list成员变量,method3与method2分析相同。
修改为局部变量,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Slf4j public class Test { static final int THREAD_NUMBER = 2; static final int LOOP_NUMBER = 200;
public static void main(String[] args) { ThreadSafe test = new ThreadSafe(); for (int i = 0; i < THREAD_NUMBER; i++) { new Thread(() -> { test.method1(LOOP_NUMBER); }, "Thread" + i).start(); } } } class ThreadSafe { public final void method1(int loopNumber) { ArrayList<String> list = new ArrayList<>(); for (int i = 0; i < loopNumber; i++) { method2(list); method3(list); } } private void method2(ArrayList<String> list) { list.add("1"); } private void method3(ArrayList<String> list) { list.remove(0); } }
|
- list是局部变量,每个线程调用时会创建其不同实例,没有共享
- 而method2的参数是由method1中传递过来的,与method1引用同一个对象
- method3的参数分析与method2相同
常见线程安全类
- String
- Integer
- StringBuffer
- Random
- Vector
- Hashtable
- java.util.concurrent下的类
这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法,是线程安全的,也可以理解为:
- 它们的每个方法是原子的
- 但是它们多个方法组合不是原子的
不可变线程安全性
String、Integer等都是不可变类,因为其内部的状态不可改变,因此它们的方法都是线程安全的。
实例分析
例1
1 2 3 4 5 6 7 8 9 10 11 12
| public class MyServlet extends HttpServlet { Map<String, Object> map = new HashMap<>(); String S1 = "..."; final String S2 = "..."; Date Da = new Date(); final Date D2 = new Date(); }
|
例2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| ublic class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl(); public void doGet(HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { private int count = 0; public void update() { count++; } }
|
例3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Aspect @Component public class MyAspect { private long start = 0L; @Before("execution(* *(..))") public void before() { start = System.nanoTime(); } @After("execution(* *(..))") public void after() { long end = System.nanoTime(); System.out.println("cost time:" + (end-start)); } }
|
例4
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl(); public void doGet(HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { private UserDao userDao = new UserDaoImpl(); public void update() { userDao.update(); } } public class UserDaoImpl implements UserDao { public void update() { String sql = "update user set password = ? where username = ?"; try (Connection conn = DriverManager.getConnection("","","")){ } catch (Exception e) { } } }
|
例5
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl(); public void doGet(HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { private UserDao userDao = new UserDaoImpl(); public void update() { userDao.update(); } } public class UserDaoImpl implements UserDao { private Connection conn = null; public void update() throws SQLException { String sql = "update user set password = ? where username = ?"; conn = DriverManager.getConnection("","",""); conn.close(); } }
|
例6
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl(); public void doGet(HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { public void update() { UserDao userDao = new UserDaoImpl(); userDao.update(); } } public class UserDaoImpl implements UserDao { private Connection = null; public void update() throws SQLException { String sql = "update user set password = ? where username = ?"; conn = DriverManager.getConnection("","",""); conn.close(); } }
|
例7
1 2 3 4 5 6 7 8 9 10 11
| public abstract class Test { public void bar() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); foo(sdf); } public abstract foo(SimpleDateFormat sdf); public static void main(String[] args) { new Test().bar(); } }
|
其中foo的行为是不确定的,可能导致不安全的发生,被称之为外星方法。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void foo(SimpleDateFormat sdf) { String dateStr = "1999-10-11 00:00:00"; for (int i = 0; i < 20; i++) { new Thread(() -> { try { sdf.parse(dateStr); } catch (ParseException e) { e.printStackTrace(); } }).start(); } }
|
6. Monitor概念
被翻译为监视器或者管程
每个Java对象都可以关联一个Monitor对象,如果使用synchronized给对象上锁(重量级)之后,该对象头的Mark Word中就被设置指向Monitor对象的指针。
- 刚开始Monitor中Owner为null
- 当Thread-2执行synchronized(obj)就会将Monitor的所有者Owner置为Thread-2,Monitor中只能有一个Owner
- 在Thread-2上锁的过程中,如果Thread-3、Thread-4、Thread-5也来执行synchronized(obj),就会进入EntryList BLOCKED
- Thread-2执行完同步代码块的内容,然后唤醒EntryList中等待的线程来竞争锁,竞争的时候是非公平的
注意
- synchronized必须是进入同一个对象的monitor才有上述的效果
- 不加synchronized的对象不会监视器,不遵从以上规则
synchronized进阶
1 2 3 4 5 6 7 8 9 10 11 12
| static final Object obj = new Object(); public static void method1() { synchronized( obj ) { method2(); } } public static void method2() { synchronized( obj ) { } }
|
-
创建锁记录(LockRecord)对象,每个线程的栈帧都会包括一个锁记录的结构,内部可以存储锁定对象的MarkWord
-
让锁记录中Object Reference指向锁对象,并尝试用CAS(Compare and swap)替换Object的MarkWord,将MarkWord的值存入锁记录
-
如果CAS替换成功,对象头存储了锁记录地址和状态00,表示由线程给该对象加锁,这时图示如下
-
如果CAS失败,有两种情况
-
当退出synchronized代码块(解锁时)如果有取值为null的锁记录,表示有重入,这时重置锁记录,表示重入计数减一
-
当退出synchronized代码块(解锁时)锁记录的值不为null,这时使用cas将MarkWord的值恢复给对象头
- 成功,则解锁成功
- 失败,说明轻量级锁进行了锁膨胀或已经升级为重量锁,进入重量级锁解锁流程
锁膨胀
如果在尝试加轻量锁的过程中,CAS操作无法成功,这时一种情况就是有其他线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。
1 2 3 4 5 6
| static Object obj = new Object(); public static void method1() { synchronized( obj ) { } }
|
-
当Thread-1进行轻量级加锁时,Thread-0已经对该对象加了轻量级锁
-
这时Thread-1加轻量级锁失败,进入锁膨胀流程
-
当Thread-0退出同步代码块解锁时,使用CAS将MarkWord的值恢复给对象头,失败,这时会进入重量级解锁流程,即按照Monitor地址找到Monitor对象,设置Owner为null,唤醒EntryList中BLOCKED线程
自旋优化
重量级锁竞争的时候,还可以使用自旋来优化,如果当前线程自旋成功(即这时候锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。
-
失败
偏向锁
轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行CAS操作。
Java6中引入了偏向锁来做进一步优化,只有第一次使用CAS将线程ID设置到对象的MarkWord头,之后发现这个线程ID是自己的就表示没有竞争,不用重新CAS,以后只要不发生竞争,这个对象就归该线程所有。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| static final Object obj = new Object(); public static void m1() { synchronized( obj ) { m2(); } } public static void m2() { synchronized( obj ) { m3(); } } public static void m3() { synchronized( obj ) { } }
|
偏向状态
一个对象创建时:
- 如果开启了偏向锁(默认开启),那么对象创建后,markword值为0x05即后三位为101,这时他的thread、epoch、age都为0
- 偏向锁默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加JVM参数
-XX:biasedLockingStartupDelay=0
来禁用延迟
- 如果没有开启偏向锁,那么对象创建后,markword值为0x01即后三位为001,这时它的hashcode、age都为0,第一次用到hashcode时候才会赋值
- -XX:-UseBiasedLocking,禁用偏向锁
- 当一个可偏向的对象,调用了自己的hashcode之后,会撤销他的偏向状态
撤销-调用hashcode
调用了对象的hashcode,但偏向锁的对象markword中存储的是线程id,如果调用hashCode会导致偏向锁被撤销。
- 轻量级锁会在锁记录中记录hashCode
- 重量级锁会在Monitor中记录hashCode
在调用hashCode后使用偏向锁,记得去掉-XX:-UseBiasedLocking
。
撤销-其他线程使用对象
当有其他线程使用偏向锁对象时,会将偏向锁升级为轻量级锁,由可偏向变为不可偏向。
撤销-调用wait/notify
因为wait/notify只有重量级锁才有,因此调用了就会升级为重量级锁。
批量重偏向
如果对象虽然被多个线程访问,但没有竞争,这时偏向了线程T1的对象仍然有机会重新偏向T2,重偏向会重置对象的ThreadID。
当撤销偏向锁阈值超过二十次以后,JVM会觉得自己是不是偏向错了,于是会给这些对象加锁时重新偏向至加锁线程。
批量撤销
当撤销偏向锁阈值超过40次后,JVM会觉得自己确实偏向错了,根本就不该偏向,于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的。
锁消除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Fork(1) @BenchmarkMode(Mode.AverageTime) @Warmup(iterations=3) @Measurement(iterations=5) @OutputTimeUnit(TimeUnit.NANOSECONDS) public class MyBenchmark { static int x = 0; @Benchmark public void a() throws Exception { x++; } @Benchmark public void b() throws Exception { Object o = new Object(); synchronized (o) { x++; } } }
|
1 2 3 4
| # java -jar benchmarks.jar 热点代码优化,性能相差无几 Benchmark Mode Samples Score Score error Units c.i.MyBenchmark.a avgt 5 1.542 0.056 ns/op c.i.MyBenchmark.b avgt 5 1.518 0.091 ns/op
|
1 2 3 4
| # java -XX:-EliminateLocks -jar benchmarks.jar 关闭锁消除优化 Benchmark Mode Samples Score Score error Units c.i.MyBenchmark.a avgt 5 1.507 0.108 ns/op c.i.MyBenchmark.b avgt 5 16.976 1.572 ns/op
|
锁粗化:对相同对象多次加锁,导致线程发生多次重入,可以使用锁粗化方式来优化,这不同于之前讲的细分锁的粒度。
7. wait / notify
- Owner线程发现条件不满足,调用wait方法,即可进入WaitSet变为Waiting状态
- BLOCKED和WAITING的线程都处于阻塞状态,不占用CPU时间片
- BLOCKED线程会在Owner线程释放锁时唤醒
- WAITING线程会在Owner线程调用notify或notufyAll时候唤醒, 但唤醒后并不意味着立刻获得锁,仍需进入EntryList重新竞争
API
- obj.wait()让进入object监视器的线程到waitSet等待
- obj.notify()在object上正在waitSet等待的线程中挑一个唤醒
- obj.notifyAll()让object上正在waitSet等待的线程全部唤醒
它们都是线程之间进行协作的手段,都属于Object对象的方法,必须获得此对象的锁,才能调用这几个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| final static Object obj = new Object(); public static void main(String[] args) { new Thread(() -> { synchronized (obj) { log.debug("执行...."); try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码...."); } }).start(); new Thread(() -> { synchronized (obj) { log.debug("执行...."); try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("其它代码...."); } }).start(); sleep(2); log.debug("唤醒 obj 上其它线程"); synchronized (obj) { obj.notify(); } }
|
1 2 3 4 5
| # notify的一种结果 20:00:53.096 [Thread-0] c.TestWaitNotify - 执行.... 20:00:53.099 [Thread-1] c.TestWaitNotify - 执行.... 20:00:55.096 [main] c.TestWaitNotify - 唤醒 obj 上其它线程 20:00:55.096 [Thread-0] c.TestWaitNotify - 其它代码....
|
1 2 3 4 5 6
| # notifyAll的结果 19:58:15.457 [Thread-0] c.TestWaitNotify - 执行.... 19:58:15.460 [Thread-1] c.TestWaitNotify - 执行.... 19:58:17.456 [main] c.TestWaitNotify - 唤醒 obj 上其它线程 19:58:17.456 [Thread-1] c.TestWaitNotify - 其它代码.... 19:58:17.456 [Thread-0] c.TestWaitNotify - 其它代码....
|
wait()方法会释放对象的锁,进入WaitSet等待区,从而让其他线程有机会获得对象的锁。无限制等待直到notify为止。
wait(long n)有时限的等待,到n毫秒后结束等待,或被notify,wait(long timeout, int nanos)纳秒。
8. wait/notify的正确姿势
sleep和notify的区别
- sleep是Thread方法,而wait是Object的方法
- sleep不需要强制和synchronized配合使用,但wait需要和synchronized使用
- sleep在睡眠的同时,不会释放对象锁的,但wait在等待的时候会释放对象锁
- 它们的状态都是TIMED_WAITING
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| new Thread(() -> { synchronized (room) { log.debug("有烟没?[{}]", hasCigarette); while (!hasCigarette) { log.debug("没烟,先歇会!"); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("有烟没?[{}]", hasCigarette); if (hasCigarette) { log.debug("可以开始干活了"); } else { log.debug("没干成活..."); } } }, "小南").start(); new Thread(() -> { synchronized (room) { Thread thread = Thread.currentThread(); log.debug("外卖送到没?[{}]", hasTakeout); if (!hasTakeout) { log.debug("没外卖,先歇会!"); try { room.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("外卖送到没?[{}]", hasTakeout); if (hasTakeout) { log.debug("可以开始干活了"); } else { log.debug("没干成活..."); } } }, "小女").start(); sleep(1); new Thread(() -> { synchronized (room) { hasTakeout = true; log.debug("外卖到了噢!"); room.notifyAll(); } }, "送外卖的").start();
|
套路总结
1 2 3 4 5 6 7 8 9 10
| synchronized(lock) { while(条件不成立) { lock.wait(); } }
synchronized(lock) { lock.notifyAll(); }
|
同步模式之保护性暂停
Guarded Suspension,用在一个线程等待另一个线程的执行结果。
- 有一个结果需要从一个线程传递到另一个线程,让它们关联同一个GuardedObject
- 如果有结果不断从一个线程到另一个线程,可以使用消息队列
- JDK中,join的实现、Future的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| class GuardedObject { private Object response; private final Object lock = new Object(); public Object get() { synchronized (lock) { while (response == null) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } public void complete(Object response) { synchronized (lock) { this.response = response; lock.notifyAll(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { try { List<String> response = download(); log.debug("download complete..."); guardedObject.complete(response); } catch (IOException e) { e.printStackTrace(); } }).start(); log.debug("waiting..."); Object response = guardedObject.get(); log.debug("get response: [{}] lines", ((List<String>) response).size()); }
|
增加超时效果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| class GuardedObjectV { private Object response; private final Object lock = new Object(); public Object get(long millis) { synchronized (lock) { long begin = System.currentTimeMillis(); long timePassed = 0; while (response == null) { long waitTime = millis - timePassed; log.debug("waitTime: {}", waitTime); if (waitTime <= 0) { log.debug("break..."); break; } try { lock.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } timePassed = System.currentTimeMillis() - begin; log.debug("timePassed: {}, object is null {}", timePassed, response == null); } return response; } } public void complete(Object response) { synchronized (lock) { this.response = response; log.debug("notify..."); lock.notifyAll(); } } }
|
join原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public final void join() throws InterruptedException { join(0); } public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0;
if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); }
if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
|
解耦
以居民邮箱为例,在多个类之间进行参数传递,设计一个用来解耦的中间类,解耦结果等待者和结果生产者。
生产线程和结果使用线程是一一对应的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
| @Slf4j public class Test { public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 3; i++) { new People().start(); } Thread.sleep(1000); for (Integer id : MailBoxes.getIds()) { new Postman(id, "内容" + id).start(); } } }
@Slf4j class People extends Thread { @Override public void run() { GuardedObject guardedObject = MailBoxes.createGuardedObject(); log.debug("开始收信 id:{}", guardedObject.getId()); Object mail = guardedObject.get(5000); log.debug("收到信 id:{} 内容:{}", guardedObject.getId(), mail); } } @Slf4j class Postman extends Thread { private int id; private String mail;
public Postman(int id, String mall) { this.id = id; this.mail = mall; } @Override public void run() { GuardedObject guardedObject = MailBoxes.getGuardedObject(id); log.debug("送信 id:{}, 内容:{}", id, mail); guardedObject.complete(mail); } }
class MailBoxes { public static Map<Integer, GuardedObject> boxes = new Hashtable<>(); private static int id = 1; private static synchronized int generateId() { return id++; } public static GuardedObject getGuardedObject(int id) { return boxes.remove(id); } public static GuardedObject createGuardedObject() { GuardedObject go = new GuardedObject(generateId()); boxes.put(go.getId(), go); return go; } public static Set<Integer> getIds() { return boxes.keySet(); } } class GuardedObject { private int id; private Object response; public GuardedObject(int id) { this.id = id; } public int getId() { return id; } public Object get(long timeout) { synchronized (this) { long begin = System.currentTimeMillis(); long passedTime = 0; while (response == null) { long waitTime = timeout - passedTime; if (timeout - passedTime <= 0) { break; } try { this.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } passedTime = System.currentTimeMillis() - begin; } return response; } } public void complete(Object response) { synchronized (this) { this.response = response; this.notifyAll(); } } }
|
异步模式之生产者消费者
- 与保护性暂停中的GuardedObject不同,不需要产生结果和消费结果的线程一一对应
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
- JDK中各种阻塞队列,采用的就是这种模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| @Slf4j public class Test { public static void main(String[] args) throws InterruptedException { MessageQueue messageQueue = new MessageQueue(2); for (int i = 0; i < 3; i++) { int id = i; new Thread(() -> { messageQueue.put(new Message(id, "值" + id)); }, "生产者" + i).start(); } new Thread(() -> { while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } Message message = messageQueue.take(); } }, "消费者").start(); } } @Slf4j class MessageQueue { private LinkedList<Message> queue; private int capacity;
MessageQueue(int capacity) { this.capacity = capacity; queue = new LinkedList<>(); } public Message take() { synchronized (queue) { while (queue.isEmpty()) { log.debug("没货了,wait"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message message = queue.removeFirst(); log.debug("已经消费了一个消息"); queue.notifyAll(); return message; } } public void put(Message message) { synchronized (queue) { while (queue.size() == capacity) { log.debug("库存已经达到上限,wait"); try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(message); log.debug("已经生产了一个消息"); queue.notifyAll(); } } } final class Message { private int id; private Object message; Message(int id, Object message) { this.id = id; this.message = message; } public int getId() { return id; } public Object getMessage() { return message; } }
|
9. Park & Unpark
基本使用
1 2 3 4 5
|
LockSupport.park();
LockSupport.unpark(暂停线程的对象);
|
1 2 3 4 5 6 7 8 9 10 11
| Thread t1 = new Thread(() -> { log.debug("start..."); sleep(1); log.debug("park..."); LockSupport.park(); log.debug("resume..."); },"t1"); t1.start(); sleep(2); log.debug("unpark..."); LockSupport.unpark(t1);
|
特点
- wait、notify和notifyAll必须配合Object Monitor一起使用,而park、unpark不必
- park & unpark以线程为单位来阻塞和唤醒线程,而notify只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,不是那么精确
- park & unpark可以先unpark,而wait & notify不能先notify
原理
每个线程都有自己的一个Parker对象,由三部分组成,_counter,_cond和*_mutex*,打个比喻。
- 线程就像一个旅人,Parker就像他随身携带的背包,条件变量就好比背包中的帐篷,_counter就好比背包中的备用干粮(0为耗尽,1为充足)
- 调用park就是要看需不需要停下来歇息
- 如果备用干粮耗尽,那么钻进帐篷休息
- 如果备用干粮充足,那么不需停留,继续前进
- 调用unpark,就好比令干粮充足
- 如果这时线程还在帐篷,就唤醒让他继续前进
- 如果这时线程还在运行,那么他下次调用park时,仅是消耗备用干粮,不需停留继续前进
- 因为背包空间有限,多次调用unpark仅会补充一份备用干粮
10. 重新理解线程状态转换
11. 多把锁
一间屋子有学习和睡觉两个功能,如果A要学习,B要睡觉,但是如果只用一个屋子即一个对象锁的话,并发度很低,解决办法就是转杯多个房间即多个对象锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class BigRoom { public void sleep() { synchronized (this) { log.debug("sleeping 2 小时"); Sleeper.sleep(2); } } public void study() { synchronized (this) { log.debug("study 1 小时"); Sleeper.sleep(1); } } }
|
改进
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class BigRoom { private final Object studyRoom = new Object(); private final Object bedRoom = new Object(); public void sleep() { synchronized (bedRoom) { log.debug("sleeping 2 小时"); Sleeper.sleep(2); } } public void study() { synchronized (studyRoom) { log.debug("study 1 小时"); Sleeper.sleep(1); } } }
|
将锁的细粒度细分
- 好处:增强并发度
- 坏处:如果一个线程需要同时获得多把锁,容易发生死锁
12. 活跃性
死锁
一个线程需要同时获得多把锁,这时候就容易发生死锁。
- T1线程获得A对象锁,接下来想获得B对象锁
- T2线程获得B对象锁,接下来想获得A对象锁
使用顺序加锁的方式解决该问题,即都按照AB的顺序获得锁。
定位死锁
- 检测死锁可以使用jsconsole工具,或者使用jps定位进程id,再用jstact定位死锁
- 避免死锁要注意加锁顺序
- 如果某个进程进入了死循环,导致其它线程一直等待,对于这种情况,Linux下可以通过top先定位到CPU占用高的Java进程,再利用
top -Hp 进程id
来定位哪个进程,最后再用jstack排查
活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class TestLiveLock { static volatile int count = 10; static final Object lock = new Object(); public static void main(String[] args) { new Thread(() -> { while (count > 0) { sleep(0.2); count--; log.debug("count: {}", count); } }, "t1").start(); new Thread(() -> { while (count < 20) { sleep(0.2); count++; log.debug("count: {}", count); } }, "t2").start(); } }
|
饥饿
一个线程由于优先级太低,始终得不到CPU调度执行,也不能结束。
13. ReentrantLock
可重入锁
- 可中断
- 可以设置超时时间
- 可以设置公平锁
- 支持多个条件变量
与synchronized一样,都支持可重入。
1 2 3 4 5 6 7 8 9
|
reentrantLock.lock(); try{ } finally { reentrantLock.unlock(); }
|
可重入
可重入是指同一个线程如果首次获得了这把锁,那么是因为它是这把锁的拥有者,因此有权利再次获取这把锁。
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| static ReentrantLock lock = new ReentrantLock(); public static void main(String[] args) { method1(); } public static void method1() { lock.lock(); try { log.debug("execute method1"); method2(); } finally { lock.unlock(); } } public static void method2() { lock.lock(); try { log.debug("execute method2"); method3(); } finally { lock.unlock(); } } public static void method3() { lock.lock(); try { log.debug("execute method3"); } finally { lock.unlock(); } }
|
可打断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("启动..."); try { lock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); log.debug("等锁的过程中被打断"); return; } try { log.debug("获得了锁"); } finally { lock.unlock(); } }, "t1"); lock.lock(); log.debug("获得了锁"); t1.start(); try { sleep(1); t1.interrupt(); log.debug("执行打断"); } finally { lock.unlock(); }
|
注意如果是不可中断模式lock()
,那么即使使用了interrupt也不会让等待中断。
锁超时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| ReentrantLock lock = new ReentrantLock(); Thread t1 = new Thread(() -> { log.debug("启动..."); try { if (!lock.tryLock(1, TimeUnit.SECONDS)) { log.debug("获取等待 1s 后失败,返回"); return; } } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("获得了锁"); } finally { lock.unlock(); } }, "t1"); lock.lock(); log.debug("获得了锁"); t1.start(); try { sleep(2); } finally { lock.unlock(); }
|
解决哲学家就餐问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| class Chopstick { String name; public Chopstick(String name) { this.name = name; } @Override public String toString() { return "筷子{" + name + '}'; } } class Philosopher extends Thread { Chopstick left; Chopstick right; public Philosopher(String name, Chopstick left, Chopstick right) { super(name); this.left = left; this.right = right; } private void eat() { log.debug("eating..."); Sleeper.sleep(1); } @Override public void run() { while (true) { synchronized (left) { synchronized (right) { eat(); } } } } }
Chopstick c1 = new Chopstick("1"); Chopstick c2 = new Chopstick("2"); Chopstick c3 = new Chopstick("3"); Chopstick c4 = new Chopstick("4"); Chopstick c5 = new Chopstick("5"); new Philosopher("苏格拉底", c1, c2).start(); new Philosopher("柏拉图", c2, c3).start(); new Philosopher("亚里士多德", c3, c4).start(); new Philosopher("赫拉克利特", c4, c5).start(); new Philosopher("阿基米德", c5, c1).start();
|
-
调换顺序使方向相同,但是会产生饥饿问题
1
| new Philosopher("阿基米德", c1, c5).start();
|
-
使用可重入锁解决问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| class Chopstick extends ReentrantLock {}
@Override public void run() { while (true) { if (left.tryLock()) { try { if (right.tryLock()) { try { eat(); } finally { right.unlock(); } } } finally { left.unlock(); } } } }
|
公平锁
ReentrantLock默认是不公平的,即构造参数中默认不写为false
,设置公平锁ReentrantLock lock = new ReentrantLock(true);
。
公平锁一般没有必要,会降低并发度。
条件变量
synchronized中也有条件变量,即WaitSet休息室。而ReentrantLock的条件变量要比synchronized强大,它支持多个条件变量。
- synchronized让不满足条件的线程都在一间休息室等消息
- ReentrantLock支持多间休息室,有专门等烟的休息室、等早餐的休息室,唤醒时也是按照休息室来唤醒
使用流程
- await之前需要先获得锁
- await执行后,会释放锁,进入conditionObject等待
- await的线程被唤醒(或打断、或超时)去重新竞争lock锁
- 竞争lock锁成功后,从await后继续执行
以送烟送外卖为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| static ReentrantLock lock = new ReentrantLock(); static Condition waitCigaretteQueue = lock.newCondition(); static Condition waitbreakfastQueue = lock.newCondition(); static volatile boolean hasCigrette = false; static volatile boolean hasBreakfast = false; public static void main(String[] args) { new Thread(() -> { try { lock.lock(); while (!hasCigrette) { try { waitCigaretteQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("等到了它的烟"); } finally { lock.unlock(); } }).start(); new Thread(() -> { try { lock.lock(); while (!hasBreakfast) { try { waitbreakfastQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("等到了它的早餐"); } finally { lock.unlock(); } }).start(); sleep(1); sendBreakfast(); sleep(1); sendCigarette(); } private static void sendCigarette() { lock.lock(); try { log.debug("送烟来了"); hasCigrette = true; waitCigaretteQueue.signal(); } finally { lock.unlock(); } } private static void sendBreakfast() { lock.lock(); try { log.debug("送早餐来了"); hasBreakfast = true; waitbreakfastQueue.signal(); } finally { lock.unlock(); } }
|
同步模式之顺序控制
-
控制运行顺序
-
wait/notify版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| static Object obj = new Object();
static boolean t2runed = false; public static void main(String[] args) { Thread t1 = new Thread(() -> { synchronized (obj) { while (!t2runed) { try { obj.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println(1); }); Thread t2 = new Thread(() -> { System.out.println(2); synchronized (obj) { t2runed = true; obj.notifyAll(); } }); t1.start(); t2.start(); }
|
-
park/unpark版
-
首先,需要保证先wait再notify,否则wait线程永远得不到唤醒。因此使用了『运行标记』来判断该不该wait
-
第二,如果有些干扰线程错误地notify了wait线程,条件不满足时还要重新等待,使用了while循环来解决
此问题
-
最后,唤醒对象上的wait线程需要使用notifyAll,因为『同步对象』上的等待线程可能不止一个
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| Thread t1 = new Thread(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { } LockSupport.park(); System.out.println("1"); }); Thread t2 = new Thread(() -> { System.out.println("2"); LockSupport.unpark(t1); }); t1.start(); t2.start();
|
-
交替输出
线程1输出a5次,线程2输出b5次,线程3输出c5次,要求输出abcabcabcabcabc。
-
wait/notify版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| class SyncWaitNotify { private int flag; private int loopNumber; public SyncWaitNotify(int flag, int loopNumber) { this.flag = flag; this.loopNumber = loopNumber; } public void print(int waitFlag, int nextFlag, String str) { for (int i = 0; i < loopNumber; i++) { synchronized (this) { while (this.flag != waitFlag) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(str); flag = nextFlag; this.notifyAll(); } } } }
SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5); new Thread(() -> { syncWaitNotify.print(1, 2, "a"); }).start(); new Thread(() -> { syncWaitNotify.print(2, 3, "b"); }).start(); new Thread(() -> { syncWaitNotify.print(3, 1, "c"); }).start();
|
-
ReentrantLock版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| class AwaitSignal extends ReentrantLock { public void start(Condition first) { this.lock(); try { log.debug("start"); first.signal(); } finally { this.unlock(); } } public void print(String str, Condition current, Condition next) { for (int i = 0; i < loopNumber; i++) { this.lock(); try { current.await(); log.debug(str); next.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { this.unlock(); } } } private int loopNumber; public AwaitSignal(int loopNumber) { this.loopNumber = loopNumber; } }
AwaitSignal as = new AwaitSignal(5); Condition aWaitSet = as.newCondition(); Condition bWaitSet = as.newCondition(); Condition cWaitSet = as.newCondition(); new Thread(() -> { as.print("a", aWaitSet, bWaitSet); }).start(); new Thread(() -> { as.print("b", bWaitSet, cWaitSet); }).start(); new Thread(() -> { as.print("c", cWaitSet, aWaitSet); }).start(); as.start(aWaitSet);
|
-
park/unpark版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| class SyncPark { private int loopNumber; private Thread[] threads; public SyncPark(int loopNumber) { this.loopNumber = loopNumber; } public void setThreads(Thread... threads) { this.threads = threads; } public void print(String str) { for (int i = 0; i < loopNumber; i++) { LockSupport.park(); System.out.print(str); LockSupport.unpark(nextThread()); } } private Thread nextThread() { Thread current = Thread.currentThread(); int index = 0; for (int i = 0; i < threads.length; i++) { if(threads[i] == current) { index = i; break; } } if(index < threads.length - 1) { return threads[index+1]; } else { return threads[0]; } } public void start() { for (Thread thread : threads) { thread.start(); } LockSupport.unpark(threads[0]); } }
SyncPark syncPark = new SyncPark(5); Thread t1 = new Thread(() -> { syncPark.print("a"); }); Thread t2 = new Thread(() -> { syncPark.print("b"); }); Thread t3 = new Thread(() -> { syncPark.print("c\n"); }); syncPark.setThreads(t1, t2, t3); syncPark.start();
|
场景
交替打印ABABAABA
可重入锁配合 Condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class ABA { private static int flag = 0; private static ReentrantLock lock = new ReentrantLock(); private static Condition aCondition = lock.newCondition(); private static Condition bCondition = lock.newCondition();
static class PrintA implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { lock.lock(); try { while (flag % 2 != 0) { try { aCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } flag++; System.out.print("A"); bCondition.signal(); } finally { lock.unlock(); } } } }
static class PrintB implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { lock.lock(); try { while (flag % 2 == 0) { try { bCondition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } flag++; System.out.print("B"); aCondition.signal(); } finally { lock.unlock(); } } } }
public static void main(String[] args) { new Thread(new PrintA()).start(); new Thread(new PrintB()).start(); } }
|
可重入锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class ABA { private static int flag = 0; private static Lock lock = new ReentrantLock();
static class PrintA implements Runnable { @Override public void run() { for (int i = 0; i < 10; ) { lock.lock(); try { while (flag % 2 == 0) { System.out.print("A"); flag++; i++; } } finally { lock.unlock(); } } } }
static class PrintB implements Runnable { @Override public void run() { for (int i = 0; i < 10; ) { lock.lock(); try { while (flag % 2 != 0) { System.out.print("B"); flag++; i++; } } finally { lock.unlock(); } } } }
public static void main(String[] args) { new Thread(new PrintA()).start(); new Thread(new PrintB()).start(); } }
|
wait() 和 notify()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| public class ABA { private static int flag = 0; private Object lock;
private synchronized void printA() { while (flag % 2 != 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } flag++; System.out.print("A"); notify(); }
private synchronized void printB() { while (flag % 2 == 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } flag++; System.out.print("B"); notify(); }
class PrintA implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { printA(); } } }
class PrintB implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { printB(); } } }
public static void main(String[] args) { ABA aba = new ABA(); PrintA printA = aba.new PrintA(); PrintB printB = aba.new PrintB(); new Thread(printA).start(); new Thread(printB).start(); } }
|
信号量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import java.util.concurrent.Semaphore; public class ABA { private static Semaphore pa = new Semaphore(1); private static Semaphore pb = new Semaphore(0); static class PrintA implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { pa.acquire(); System.out.print("A"); pb.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } static class PrintB implements Runnable { @Override public void run() { for (int i = 0; i < 10; i++) { try { pb.acquire(); System.out.print("B"); pa.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { new Thread(new PrintA()).start(); new Thread(new PrintB()).start(); } }
|
i++ 线程安全性
i++ 是不安全的,它是一个复合操作,可分为三个阶段:
- 读值,从内存到寄存器
- +1,寄存器自增
- 写值,写回内存
这三步之间都可能会有 CPU 调度,造成值被修改,造成脏读和脏写。如果是方法内定义的,一定是线程安全的,因为每个方法栈是线程私有的;如果是类的静态成员变量,则不是线程安全的,因为线程共享栈区,不共享堆区和全局区。
用 Volatile 虽然能保证可见性,但是不能保证原子性,如果想要保证多线程下的安全性,可以使用原子变量(AtomicInteger)、synchronized 和 Lock锁实现。AtomicInteger 和各种 Lock 都可以确保线程安全,AtomicInteger 的效率高是因为它的互斥区非常小,而 Lock 的互斥区是拿到锁到放锁之间的区域,至少三条指令。
Java 线程同步的几种方法
- synchronized
- wait() 和 notify()
- volatile
- reentrantlock
- ThreadLocal 局部变量
- 使用阻塞队列BlockingQueue
- 使用原子变量 AtomicInteger