临界区

临界资源:一次仅允许一个进程使用的资源成为临界资源

临界区:访问临界资源的代码块

竞态条件:多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件

一个程序运行多个线程是没有问题,多个线程读共享资源也没有问题,在多个线程对共享资源读写操作时发生指令交错,就会出现问题

为了避免临界区的竞态条件发生(解决线程安全问题):

  • 阻塞式的解决方案:synchronized,lock
  • 非阻塞式的解决方案:原子变量

管程(monitor):由局部于自己的若干公共变量和所有访问这些公共变量的过程所组成的软件模块,保证同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现)

synchronized:对象锁,保证了临界区内代码的原子性,采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其它线程获取这个对象锁时会阻塞,保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换

互斥和同步都可以采用 synchronized 关键字来完成,区别:

  • 互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
  • 同步是由于线程执行的先后、顺序不同、需要一个线程等待其它线程运行到某个点

性能:

  • 线程安全,性能差
  • 线程不安全性能好,假如开发中不会存在多线程安全问题,建议使用线程不安全的设计类

syn-ed

使用锁

同步块

锁对象:理论上可以是任意的唯一对象

synchronized 是可重入、不公平的重量级锁

原则上:

  • 锁对象建议使用共享资源
  • 在实例方法中使用 this 作为锁对象,锁住的 this 正好是共享资源
  • 在静态方法中使用类名 .class 字节码作为锁对象,因为静态成员属于类,被所有实例对象共享,所以需要锁住类

同步代码块格式:

1
2
3
synchronized(锁对象){
// 访问共享资源的核心代码
}

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class demo {
static int counter = 0;
//static修饰,则元素是属于类本身的,不属于对象 ,与类一起加载一次,只有一个
static final Object room = new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (room) {
counter++;
}
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (room) {
counter--;
}
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(counter);
}
}

同步方法

把出现线程安全问题的核心方法锁起来,每次只能一个线程进入访问

synchronized 修饰的方法的不具备继承性,所以子类是线程不安全的,如果子类的方法也被 synchronized 修饰,两个锁对象其实是一把锁,而且是子类对象作为锁

用法:直接给方法加上一个修饰符 synchronized

1
2
3
4
5
6
7
8
//同步方法
修饰符 synchronized 返回值类型 方法名(方法参数) {
方法体;
}
//同步静态方法
修饰符 static synchronized 返回值类型 方法名(方法参数) {
方法体;
}

同步方法底层也是有锁对象的:

  • 如果方法是实例方法:同步方法默认用 this 作为的锁对象

    1
    2
    3
    4
    public synchronized void test() {} //等价于
    public void test() {
    synchronized(this) {}
    }
  • 如果方法是静态方法:同步方法默认用类名 .class 作为的锁对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    class Test{
    public synchronized static void test() {}
    }
    //等价于
    class Test{
    public void test() {
    synchronized(Test.class) {}
    }
    }

线程八锁

线程八锁就是考察 synchronized 锁住的是哪个对象,直接百度搜索相关的实例

说明:主要关注锁住的对象是不是同一个

  • 锁住类对象,所有类的实例的方法都是安全的,类的所有实例都相当于同一把锁
  • 锁住 this 对象,只有在当前实例对象的线程内是安全的,如果有多个实例就不安全

线程不安全:因为锁住的不是同一个对象,线程 1 调用 a 方法锁住的类对象,线程 2 调用 b 方法锁住的 n2 对象,不是同一个对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Number{
public static synchronized void a(){
Thread.sleep(1000);
System.out.println("1");
}
public synchronized void b() {
System.out.println("2");
}
}
public static void main(String[] args) {
Number n1 = new Number();
Number n2 = new Number();
new Thread(()->{ n1.a(); }).start();
new Thread(()->{ n2.b(); }).start();
}

线程安全:因为 n1 调用 a() 方法,锁住的是类对象,n2 调用 b() 方法,锁住的也是类对象,所以线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Number{
public static synchronized void a(){
Thread.sleep(1000);
System.out.println("1");
}
public static synchronized void b() {
System.out.println("2");
}
}
public static void main(String[] args) {
Number n1 = new Number();
Number n2 = new Number();
new Thread(()->{ n1.a(); }).start();
new Thread(()->{ n2.b(); }).start();
}

锁原理

Monitor

Monitor 被翻译为监视器或管程

每个 Java 对象都可以关联一个 Monitor 对象,Monitor 也是 class,其实例存储在堆中,如果使用 synchronized 给对象上锁(重量级)之后,该对象头的 Mark Word 中就被设置指向 Monitor 对象的指针,这就是重量级锁

  • Mark Word 结构:最后两位是锁标志位

  • 64 位虚拟机 Mark Word:

工作流程:

  • 开始时 Monitor 中 Owner 为 null
  • 当 Thread-2 执行 synchronized(obj) 就会将 Monitor 的所有者 Owner 置为 Thread-2,Monitor 中只能有一个 Owner,obj 对象的 Mark Word 指向 Monitor,把对象原有的 MarkWord 存入线程栈中的锁记录中(轻量级锁部分详解)
  • 在 Thread-2 上锁的过程,Thread-3、Thread-4、Thread-5 也执行 synchronized(obj),就会进入 EntryList BLOCKED(双向链表)
  • Thread-2 执行完同步代码块的内容,根据 obj 对象头中 Monitor 地址寻找,设置 Owner 为空,把线程栈的锁记录中的对象头的值设置回 MarkWord
  • 唤醒 EntryList 中等待的线程来竞争锁,竞争是非公平的,如果这时有新的线程想要获取锁,可能直接就抢占到了,阻塞队列的线程就会继续阻塞
  • WaitSet 中的 Thread-0,是以前获得过锁,但条件不满足进入 WAITING 状态的线程(wait-notify 机制)

注意:

  • synchronized 必须是进入同一个对象的 Monitor 才有上述的效果
  • 不加 synchronized 的对象不会关联监视器,不遵从以上规则

字节码

代码:

1
2
3
4
5
6
public static void main(String[] args) {
Object lock = new Object();
synchronized (lock) {
System.out.println("ok");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
0: 	new				#2		// new Object
3: dup
4: invokespecial #1 // invokespecial <init>:()V,非虚方法
7: astore_1 // lock引用 -> lock
8: aload_1 // lock (synchronized开始)
9: dup // 一份用来初始化,一份用来引用
10: astore_2 // lock引用 -> slot 2
11: monitorenter // 【将 lock对象 MarkWord 置为 Monitor 指针】
12: getstatic #3 // System.out
15: ldc #4 // "ok"
17: invokevirtual #5 // invokevirtual println:(Ljava/lang/String;)V
20: aload_2 // slot 2(lock引用)
21: monitorexit // 【将 lock对象 MarkWord 重置, 唤醒 EntryList】
22: goto 30
25: astore_3 // any -> slot 3
26: aload_2 // slot 2(lock引用)
27: monitorexit // 【将 lock对象 MarkWord 重置, 唤醒 EntryList】
28: aload_3
29: athrow
30: return
Exception table:
from to target type
12 22 25 any
25 28 25 any
LineNumberTable: ...
LocalVariableTable:
Start Length Slot Name Signature
0 31 0 args [Ljava/lang/String;
8 23 1 lock Ljava/lang/Object;

说明:

  • 通过异常 try-catch 机制,确保一定会被解锁
  • 方法级别的 synchronized 不会在字节码指令中有所体现

锁升级

升级过程

synchronized 是可重入、不公平的重量级锁,所以可以对其进行优化

1
无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁	// 随着竞争的增加,只能锁升级,不能降级


偏向锁

偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程之后重新获取该锁不再需要同步操作:

  • 当锁对象第一次被线程获得的时候进入偏向状态,标记为 101,同时使用 CAS 操作将线程 ID 记录到 Mark Word。如果 CAS 操作成功,这个线程以后进入这个锁相关的同步块,查看这个线程 ID 是自己的就表示没有竞争,就不需要再进行任何同步操作

  • 当有另外一个线程去尝试获取这个锁对象时,偏向状态就宣告结束,此时撤销偏向(Revoke Bias)后恢复到未锁定或轻量级锁状态

一个对象创建时:

  • 如果开启了偏向锁(默认开启),那么对象创建后,MarkWord 值为 0x05 即最后 3 位为 101,thread、epoch、age 都为 0

  • 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数 -XX:BiasedLockingStartupDelay=0 来禁用延迟。JDK 8 延迟 4s 开启偏向锁原因:在刚开始执行代码时,会有好多线程来抢锁,如果开偏向锁效率反而降低

  • 当一个对象已经计算过 hashCode,就再也无法进入偏向状态了

  • 添加 VM 参数 -XX:-UseBiasedLocking 禁用偏向锁

撤销偏向锁的状态:

  • 调用对象的 hashCode:偏向锁的对象 MarkWord 中存储的是线程 id,调用 hashCode 导致偏向锁被撤销
  • 当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁
  • 调用 wait/notify,需要申请 Monitor,进入 WaitSet

批量撤销:如果对象被多个线程访问,但没有竞争,这时偏向了线程 T1 的对象仍有机会重新偏向 T2,重偏向会重置对象的 Thread ID

  • 批量重偏向:当撤销偏向锁阈值超过 20 次后,JVM 会觉得是不是偏向错了,于是在给这些对象加锁时重新偏向至加锁线程

  • 批量撤销:当撤销偏向锁阈值超过 40 次后,JVM 会觉得自己确实偏向错了,根本就不该偏向,于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的


轻量级锁

一个对象有多个线程要加锁,但加锁的时间是错开的(没有竞争),可以使用轻量级锁来优化,轻量级锁对使用者是透明的(不可见)

可重入锁:线程可以进入任何一个它已经拥有的锁所同步着的代码块,可重入锁最大的作用是避免死锁

轻量级锁在没有竞争时(锁重入时),每次重入仍然需要执行 CAS 操作,Java 6 才引入的偏向锁来优化

锁重入实例:

1
2
3
4
5
6
7
8
9
10
11
12
static final Object obj = new Object();
public static void method1() {
synchronized( obj ) {
// 同步块 A
method2();
}
}
public static void method2() {
synchronized( obj ) {
// 同步块 B
}
}
  • 创建锁记录(Lock Record)对象,每个线程的栈帧都会包含一个锁记录的结构,存储锁定对象的 Mark Word

  • 让锁记录中 Object reference 指向锁住的对象,并尝试用 CAS 替换 Object 的 Mark Word,将 Mark Word 的值存入锁记录

  • 如果 CAS 替换成功,对象头中存储了锁记录地址和状态 00(轻量级锁) ,表示由该线程给对象加锁

  • 如果 CAS 失败,有两种情况:

    • 如果是其它线程已经持有了该 Object 的轻量级锁,这时表明有竞争,进入锁膨胀过程
    • 如果是线程自己执行了 synchronized 锁重入,就添加一条 Lock Record 作为重入的计数

  • 当退出 synchronized 代码块(解锁时)

    • 如果有取值为 null 的锁记录,表示有重入,这时重置锁记录,表示重入计数减 1
    • 如果锁记录的值不为 null,这时使用 CAS 将 Mark Word 的值恢复给对象头
      • 成功,则解锁成功
      • 失败,说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

锁膨胀

在尝试加轻量级锁的过程中,CAS 操作无法成功,可能是其它线程为此对象加上了轻量级锁(有竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁

  • 当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁

  • Thread-1 加轻量级锁失败,进入锁膨胀流程:为 Object 对象申请 Monitor 锁,通过 Object 对象头获取到持锁线程,将 Monitor 的 Owner 置为 Thread-0,将 Object 的对象头指向重量级锁地址,然后自己进入 Monitor 的 EntryList BLOCKED

  • 当 Thread-0 退出同步块解锁时,使用 CAS 将 Mark Word 的值恢复给对象头失败,这时进入重量级解锁流程,即按照 Monitor 地址找到 Monitor 对象,设置 Owner 为 null,唤醒 EntryList 中 BLOCKED 线程


锁优化

自旋锁

重量级锁竞争时,尝试获取锁的线程不会立即阻塞,可以使用自旋(默认 10 次)来进行优化,采用循环的方式去尝试获取锁

注意:

  • 自旋占用 CPU 时间,单核 CPU 自旋就是浪费时间,因为同一时刻只能运行一个线程,多核 CPU 自旋才能发挥优势
  • 自旋失败的线程会进入阻塞状态

优点:不会进入阻塞状态,减少线程上下文切换的消耗

缺点:当自旋的线程越来越多时,会不断的消耗 CPU 资源

自旋锁情况:

  • 自旋成功的情况:

    <img src="https://seazean.oss-cn-beijing.aliyuncs.com/img/Java/JUC-自旋成功.png" style="zoom: 80%;" />
    
  • 自旋失败的情况:

自旋锁说明:

  • 在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,比较智能
  • Java 7 之后不能控制是否开启自旋功能,由 JVM 控制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//手写自旋锁
public class SpinLock {
// 泛型装的是Thread,原子引用线程
AtomicReference<Thread> atomicReference = new AtomicReference<>();

public void lock() {
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + " come in");

//开始自旋,期望值为null,更新值是当前线程
while (!atomicReference.compareAndSet(null, thread)) {
Thread.sleep(1000);
System.out.println(thread.getName() + " 正在自旋");
}
System.out.println(thread.getName() + " 自旋成功");
}

public void unlock() {
Thread thread = Thread.currentThread();

//线程使用完锁把引用变为null
atomicReference.compareAndSet(thread, null);
System.out.println(thread.getName() + " invoke unlock");
}

public static void main(String[] args) throws InterruptedException {
SpinLock lock = new SpinLock();
new Thread(() -> {
//占有锁
lock.lock();
Thread.sleep(10000);

//释放锁
lock.unlock();
},"t1").start();

// 让main线程暂停1秒,使得t1线程,先执行
Thread.sleep(1000);

new Thread(() -> {
lock.lock();
lock.unlock();
},"t2").start();
}
}

锁消除

锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除,这是 JVM 即时编译器的优化

锁消除主要是通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们的锁进行消除(同步消除:JVM 逃逸分析)


锁粗化

对相同对象多次加锁,导致线程发生多次重入,频繁的加锁操作就会导致性能损耗,可以使用锁粗化方式优化

如果虚拟机探测到一串的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部

  • 一些看起来没有加锁的代码,其实隐式的加了很多锁:

    1
    2
    3
    public static String concatString(String s1, String s2, String s3) {
    return s1 + s2 + s3;
    }
  • String 是一个不可变的类,编译器会对 String 的拼接自动优化。在 JDK 1.5 之前,转化为 StringBuffer 对象的连续 append() 操作,每个 append() 方法中都有一个同步块

    1
    2
    3
    4
    5
    6
    7
    public static String concatString(String s1, String s2, String s3) {
    StringBuffer sb = new StringBuffer();
    sb.append(s1);
    sb.append(s2);
    sb.append(s3);
    return sb.toString();
    }

扩展到第一个 append() 操作之前直至最后一个 append() 操作之后,只需要加锁一次就可以


多把锁

多把不相干的锁:一间大屋子有两个功能睡觉、学习,互不相干。现在一人要学习,一人要睡觉,如果只用一间屋子(一个对象锁)的话,那么并发度很低

将锁的粒度细分:

  • 好处,是可以增强并发度
  • 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁

解决方法:准备多个对象锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) {
BigRoom bigRoom = new BigRoom();
new Thread(() -> { bigRoom.study(); }).start();
new Thread(() -> { bigRoom.sleep(); }).start();
}
class BigRoom {
private final Object studyRoom = new Object();
private final Object sleepRoom = new Object();

public void sleep() throws InterruptedException {
synchronized (sleepRoom) {
System.out.println("sleeping 2 小时");
Thread.sleep(2000);
}
}

public void study() throws InterruptedException {
synchronized (studyRoom) {
System.out.println("study 1 小时");
Thread.sleep(1000);
}
}
}

活跃性

死锁
形成

死锁:多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放,由于线程被无限期地阻塞,因此程序不可能正常终止

Java 死锁产生的四个必要条件:

  1. 互斥条件,即当资源被一个线程使用(占有)时,别的线程不能使用
  2. 不可剥夺条件,资源请求者不能强制从资源占有者手中夺取资源,资源只能由资源占有者主动释放
  3. 请求和保持条件,即当资源请求者在请求其他的资源的同时保持对原有资源的占有
  4. 循环等待条件,即存在一个等待循环队列:p1 要 p2 的资源,p2 要 p1 的资源,形成了一个等待环路

四个条件都成立的时候,便形成死锁。死锁情况下打破上述任何一个条件,便可让死锁消失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Dead {
public static Object resources1 = new Object();
public static Object resources2 = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 线程1:占用资源1 ,请求资源2
synchronized(resources1){
System.out.println("线程1已经占用了资源1,开始请求资源2");
Thread.sleep(2000);//休息两秒,防止线程1直接运行完成。
//2秒内线程2肯定可以锁住资源2
synchronized (resources2){
System.out.println("线程1已经占用了资源2");
}
}).start();
new Thread(() -> {
// 线程2:占用资源2 ,请求资源1
synchronized(resources2){
System.out.println("线程2已经占用了资源2,开始请求资源1");
Thread.sleep(2000);
synchronized (resources1){
System.out.println("线程2已经占用了资源1");
}
}}
}).start();
}
}

定位

定位死锁的方法:

  • 使用 jps 定位进程 id,再用 jstack id 定位死锁,找到死锁的线程去查看源码,解决优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    "Thread-1" #12 prio=5 os_prio=0 tid=0x000000001eb69000 nid=0xd40 waiting formonitor entry [0x000000001f54f000]
    java.lang.Thread.State: BLOCKED (on object monitor)
    #省略
    "Thread-1" #12 prio=5 os_prio=0 tid=0x000000001eb69000 nid=0xd40 waiting for monitor entry [0x000000001f54f000]
    java.lang.Thread.State: BLOCKED (on object monitor)
    #省略

    Found one Java-level deadlock:
    ===================================================
    "Thread-1":
    waiting to lock monitor 0x000000000361d378 (object 0x000000076b5bf1c0, a java.lang.Object),
    which is held by "Thread-0"
    "Thread-0":
    waiting to lock monitor 0x000000000361e768 (object 0x000000076b5bf1d0, a java.lang.Object),
    which is held by "Thread-1"

    Java stack information for the threads listed above:
    ===================================================
    "Thread-1":
    at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
    - waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
    - locked <0x000000076b5bf1d0> (a java.lang.Object)
    at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)
    "Thread-0":
    at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
    - waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
    - locked <0x000000076b5bf1c0> (a java.lang.Object)
    at thread.TestDeadLock$$Lambda$1/495053715
  • Linux 下可以通过 top 先定位到 CPU 占用高的 Java 进程,再利用 top -Hp 进程id 来定位是哪个线程,最后再用 jstack 的输出来看各个线程栈

  • 避免死锁:避免死锁要注意加锁顺序

  • 可以使用 jconsole 工具,在 jdk\bin 目录下


活锁

活锁:指的是任务或者执行者没有被阻塞,由于某些条件没有满足,导致一直重复尝试—失败—尝试—失败的过程

两个线程互相改变对方的结束条件,最后谁也无法结束:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
Thread.sleep(200);
count--;
System.out.println("线程一count:" + count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
Thread.sleep(200);
count++;
System.out.println("线程二count:"+ count);
}
}, "t2").start();
}
}

饥饿

饥饿:一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束


wait-ify

基本使用

需要获取对象锁后才可以调用 锁对象.wait(),notify 随机唤醒一个线程,notifyAll 唤醒所有线程去竞争 CPU

Object 类 API:

1
2
3
4
public final void notify():唤醒正在等待对象监视器的单个线程。
public final void notifyAll():唤醒正在等待对象监视器的所有线程。
public final void wait():导致当前线程等待,直到另一个线程调用该对象的notify()方法或 notifyAll()方法。
public final native void wait(long timeout):有时限的等待, 到n毫秒后结束等待,或是被唤醒

说明:wait 是挂起线程,需要唤醒的都是挂起操作,阻塞线程可以自己去争抢锁,挂起的线程需要唤醒后去争抢锁

对比 sleep():

  • 原理不同:sleep() 方法是属于 Thread 类,是线程用来控制自身流程的,使此线程暂停执行一段时间而把执行机会让给其他线程;wait() 方法属于 Object 类,用于线程间通信
  • 锁的处理机制不同:调用 sleep() 方法的过程中,线程不会释放对象锁,当调用 wait() 方法的时候,线程会放弃对象锁,进入等待此对象的等待锁定池(不释放锁其他线程怎么抢占到锁执行唤醒操作),但是都会释放 CPU
  • 使用区域不同:wait() 方法必须放在同步控制方法和同步代码块(先获取锁)中使用,sleep() 方法则可以放在任何地方使用

底层原理:

  • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
  • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
  • BLOCKED 线程会在 Owner 线程释放锁时唤醒
  • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,唤醒后并不意味者立刻获得锁,需要进入 EntryList 重新竞争


代码优化

虚假唤醒:notify 只能随机唤醒一个 WaitSet 中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程

解决方法:采用 notifyAll

notifyAll 仅解决某个线程的唤醒问题,使用 if + wait 判断仅有一次机会,一旦条件不成立,无法重新判断

解决方法:用 while + wait,当条件不成立,再次 wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Slf4j(topic = "c.demo")
public class demo {
static final Object room = new Object();
static boolean hasCigarette = false; //有没有烟
static boolean hasTakeout = false;

public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
synchronized (room) {
log.debug("有烟没?[{}]", hasCigarette);
while (!hasCigarette) {//while防止虚假唤醒
log.debug("没烟,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没?[{}]", hasCigarette);
if (hasCigarette) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小南").start();

new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("外卖送到没?[{}]", hasTakeout);
if (!hasTakeout) {
log.debug("没外卖,先歇会!");
try {
room.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没?[{}]", hasTakeout);
if (hasTakeout) {
log.debug("可以开始干活了");
} else {
log.debug("没干成活...");
}
}
}, "小女").start();


Thread.sleep(1000);
new Thread(() -> {
// 这里能不能加 synchronized (room)?
synchronized (room) {
hasTakeout = true;
//log.debug("烟到了噢!");
log.debug("外卖到了噢!");
room.notifyAll();
}
}, "送外卖的").start();
}
}

park-un

LockSupport 是用来创建锁和其他同步类的线程原语

LockSupport 类方法:

  • LockSupport.park():暂停当前线程,挂起原语
  • LockSupport.unpark(暂停的线程对象):恢复某个线程的运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println("start..."); //1
Thread.sleep(1000);// Thread.sleep(3000)
// 先 park 再 unpark 和先 unpark 再 park 效果一样,都会直接恢复线程的运行
System.out.println("park..."); //2
LockSupport.park();
System.out.println("resume...");//4
},"t1");
t1.start();
Thread.sleep(2000);
System.out.println("unpark..."); //3
LockSupport.unpark(t1);
}

LockSupport 出现就是为了增强 wait & notify 的功能:

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park、unpark 不需要
  • park & unpark 以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify。类比生产消费,先消费发现有产品就消费,没有就等待;先生产就直接产生商品,然后线程直接消费
  • wait 会释放锁资源进入等待队列,park 不会释放锁资源,只负责阻塞当前线程,会释放 CPU

原理:类似生产者消费者

  • 先 park:
    1. 当前线程调用 Unsafe.park() 方法
    2. 检查 _counter ,本情况为 0,这时获得 _mutex 互斥锁
    3. 线程进入 _cond 条件变量挂起
    4. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
    5. 唤醒 _cond 条件变量中的 Thread_0,Thread_0 恢复运行,设置 _counter 为 0

  • 先 unpark:

    1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
    2. 当前线程调用 Unsafe.park() 方法
    3. 检查 _counter ,本情况为 1,这时线程无需挂起,继续运行,设置 _counter 为 0


安全分析

成员变量和静态变量:

  • 如果它们没有共享,则线程安全
  • 如果它们被共享了,根据它们的状态是否能够改变,分两种情况:
    • 如果只有读操作,则线程安全
    • 如果有读写操作,则这段代码是临界区,需要考虑线程安全问题

局部变量:

  • 局部变量是线程安全的
  • 局部变量引用的对象不一定线程安全(逃逸分析):
    • 如果该对象没有逃离方法的作用访问,它是线程安全的(每一个方法有一个栈帧)
    • 如果该对象逃离方法的作用范围,需要考虑线程安全问题(暴露引用)

常见线程安全类:String、Integer、StringBuffer、Random、Vector、Hashtable、java.util.concurrent 包

  • 线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的

  • 每个方法是原子的,但多个方法的组合不是原子的,只能保证调用的方法内部安全:

    1
    2
    3
    4
    5
    Hashtable table = new Hashtable();
    // 线程1,线程2
    if(table.get("key") == null) {
    table.put("key", value);
    }

无状态类线程安全,就是没有成员变量的类

不可变类线程安全:String、Integer 等都是不可变类,内部的状态不可以改变,所以方法是线程安全

  • replace 等方法底层是新建一个对象,复制过去

    1
    2
    3
    4
    5
    Map<String,Object> map = new HashMap<>();	// 线程不安全
    String S1 = "..."; // 线程安全
    final String S2 = "..."; // 线程安全
    Date D1 = new Date(); // 线程不安全
    final Date D2 = new Date(); // 线程不安全,final让D2引用的对象不能变,但对象的内容可以变

抽象方法如果有参数,被重写后行为不确定可能造成线程不安全,被称之为外星方法:public abstract foo(Student s);


同步模式

保护性暂停

单任务版

Guarded Suspension,用在一个线程等待另一个线程的执行结果

  • 有一个结果需要从一个线程传递到另一个线程,让它们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public static void main(String[] args) {
GuardedObject object = new GuardedObjectV2();
new Thread(() -> {
sleep(1);
object.complete(Arrays.asList("a", "b", "c"));
}).start();

Object response = object.get(2500);
if (response != null) {
log.debug("get response: [{}] lines", ((List<String>) response).size());
} else {
log.debug("can't get response");
}
}

class GuardedObject {
private Object response;
private final Object lock = new Object();

//获取结果
//timeout :最大等待时间
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
log.debug("waitTime: {}", waitTime);
//经历时间超过最大等待时间退出循环
if (waitTime <= 0) {
log.debug("break...");
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - begin;
log.debug("timePassed: {}, object is null {}",
timePassed, response == null);
}
return response;
}
}

//产生结果
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
log.debug("notify...");
lock.notifyAll();
}
}
}
多任务版

多任务版保护性暂停:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
Thread.sleep(1000);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, id + "号快递到了").start();
}
}

@Slf4j(topic = "c.People")
class People extends Thread{
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信i d:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信id:{},内容:{}", guardedObject.getId(),mail);
}
}

class Postman extends Thread{
private int id;
private String mail;
//构造方法
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("开始送信i d:{},内容:{}", guardedObject.getId(),mail);
guardedObject.complete(mail);
}
}

class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;

//产生唯一的id
private static synchronized int generateId() {
return id++;
}

public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}

public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}

public static Set<Integer> getIds() {
return boxes.keySet();
}
}
class GuardedObject {
//标识,Guarded Object
private int id;//添加get set方法
}

顺序输出

顺序输出 2 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
while (true) {
//try { Thread.sleep(1000); } catch (InterruptedException e) { }
// 当没有许可时,当前线程暂停运行;有许可时,用掉这个许可,当前线程恢复运行
LockSupport.park();
System.out.println("1");
}
});
Thread t2 = new Thread(() -> {
while (true) {
System.out.println("2");
// 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
LockSupport.unpark(t1);
try { Thread.sleep(500); } catch (InterruptedException e) { }
}
});
t1.start();
t2.start();
}

交替输出

连续输出 5 次 abc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class day2_14 {
public static void main(String[] args) throws InterruptedException {
AwaitSignal awaitSignal = new AwaitSignal(5);
Condition a = awaitSignal.newCondition();
Condition b = awaitSignal.newCondition();
Condition c = awaitSignal.newCondition();
new Thread(() -> {
awaitSignal.print("a", a, b);
}).start();
new Thread(() -> {
awaitSignal.print("b", b, c);
}).start();
new Thread(() -> {
awaitSignal.print("c", c, a);
}).start();

Thread.sleep(1000);
awaitSignal.lock();
try {
a.signal();
} finally {
awaitSignal.unlock();
}
}
}

class AwaitSignal extends ReentrantLock {
private int loopNumber;

public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
//参数1:打印内容 参数二:条件变量 参数二:唤醒下一个
public void print(String str, Condition condition, Condition next) {
for (int i = 0; i < loopNumber; i++) {
lock();
try {
condition.await();
System.out.print(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
unlock();
}
}
}
}

异步模式

传统版

异步模式之生产者/消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

public void increment() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断 防止虚假唤醒
while(number != 0) {
// 等待不能生产
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void decrement() throws Exception{
// 同步代码块,加锁
lock.lock();
try {
// 判断 防止虚假唤醒
while(number == 0) {
// 等待不能消费
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "\t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

public class TraditionalProducerConsumer {
public static void main(String[] args) {
ShareData shareData = new ShareData();
// t1线程,生产
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.increment();
}
}, "t1").start();

// t2线程,消费
new Thread(() -> {
for (int i = 0; i < 5; i++) {
shareData.decrement();
}
}, "t2").start();
}
}

改进版

异步模式之生产者/消费者:

  • 消费队列可以用来平衡生产和消费的线程资源,不需要产生结果和消费结果的线程一一对应
  • 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  • 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  • JDK 中各种阻塞队列,采用的就是这种模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class demo {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id,"值"+id));
}, "生产者" + i).start();
}

new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
Message message = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"消费者").start();
}
}

//消息队列类,Java间线程之间通信
class MessageQueue {
private LinkedList<Message> list = new LinkedList<>();//消息的队列集合
private int capacity;//队列容量
public MessageQueue(int capacity) {
this.capacity = capacity;
}

//获取消息
public Message take() {
//检查队列是否为空
synchronized (list) {
while (list.isEmpty()) {
try {
sout(Thread.currentThread().getName() + ":队列为空,消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//从队列的头部获取消息返回
Message message = list.removeFirst();
sout(Thread.currentThread().getName() + ":已消费消息--" + message);
list.notifyAll();
return message;
}
}

//存入消息
public void put(Message message) {
synchronized (list) {
//检查队列是否满
while (list.size() == capacity) {
try {
sout(Thread.currentThread().getName()+":队列为已满,生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//将消息加入队列尾部
list.addLast(message);
sout(Thread.currentThread().getName() + ":已生产消息--" + message);
list.notifyAll();
}
}
}

final class Message {
private int id;
private Object value;
//get set
}

阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) {
ExecutorService consumer = Executors.newFixedThreadPool(1);
ExecutorService producer = Executors.newFixedThreadPool(1);
BlockingQueue<Integer> queue = new SynchronousQueue<>();
producer.submit(() -> {
try {
System.out.println("生产...");
Thread.sleep(1000);
queue.put(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
consumer.submit(() -> {
try {
System.out.println("等待消费...");
Integer result = queue.take();
System.out.println("结果为:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}