无锁
CAS
原理
无锁编程:Lock Free
CAS 的全称是 Compare-And-Swap,是 CPU 并发原语
- CAS 并发原语体现在 Java 语言中就是 sun.misc.Unsafe 类的各个方法,调用 UnSafe 类中的 CAS 方法,JVM 会实现出 CAS 汇编指令,这是一种完全依赖于硬件的功能,实现了原子操作
- CAS 是一种系统原语,原语属于操作系统范畴,是由若干条指令组成 ,用于完成某个功能的一个过程,并且原语的执行必须是连续的,执行过程中不允许被中断,所以 CAS 是一条 CPU 的原子指令,不会造成数据不一致的问题,是线程安全的
底层原理:CAS 的底层是 lock cmpxchg
指令(X86 架构),在单核和多核 CPU 下都能够保证比较交换的原子性
程序是在单核处理器上运行,会省略 lock 前缀,单处理器自身会维护处理器内的顺序一致性,不需要 lock 前缀的内存屏障效果
程序是在多核处理器上运行,会为 cmpxchg 指令加上 lock 前缀。当某个核执行到带 lock 的指令时,CPU 会执行总线锁定或缓存锁定,将修改的变量写入到主存,这个过程不会被线程的调度机制所打断,保证了多个线程对内存操作的原子性
作用:比较当前工作内存中的值和主物理内存中的值,如果相同则执行规定操作,否则继续比较直到主内存和工作内存的值一致为止
CAS 特点:
- CAS 体现的是无锁并发、无阻塞并发,线程不会陷入阻塞,线程不需要频繁切换状态(上下文切换,系统调用)
- CAS 是基于乐观锁的思想
CAS 缺点:
- 执行的是循环操作,如果比较不成功一直在循环,最差的情况某个线程一直取到的值和预期值都不一样,就会无限循环导致饥饿,使用 CAS 线程数不要超过 CPU 的核心数,采用分段 CAS 和自动迁移机制
- 只能保证一个共享变量的原子操作
- 对于一个共享变量执行操作时,可以通过循环 CAS 的方式来保证原子操作
- 对于多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候只能用锁来保证原子性
- 引出来 ABA 问题
乐观锁
CAS 与 synchronized 总结:
- synchronized 是从悲观的角度出发:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程),因此 synchronized 也称之为悲观锁,ReentrantLock 也是一种悲观锁,性能较差
- CAS 是从乐观的角度出发:总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据。如果别人修改过,则获取现在最新的值,如果别人没修改过,直接修改共享数据的值,CAS 这种机制也称之为乐观锁,综合性能较好
Atomic
常用API
常见原子类:AtomicInteger、AtomicBoolean、AtomicLong
构造方法:
public AtomicInteger()
:初始化一个默认值为 0 的原子型 Integerpublic AtomicInteger(int initialValue)
:初始化一个指定值的原子型 Integer
常用API:
方法 | 作用 |
---|---|
public final int get() | 获取 AtomicInteger 的值 |
public final int getAndIncrement() | 以原子方式将当前值加 1,返回的是自增前的值 |
public final int incrementAndGet() | 以原子方式将当前值加 1,返回的是自增后的值 |
public final int getAndSet(int value) | 以原子方式设置为 newValue 的值,返回旧值 |
public final int addAndGet(int data) | 以原子方式将输入的数值与实例中的值相加并返回 实例:AtomicInteger 里的 value |
原理分析
AtomicInteger 原理:自旋锁 + CAS 算法
CAS 算法:有 3 个操作数(内存值 V, 旧的预期值 A,要修改的值 B)
- 当旧的预期值 A == 内存值 V 此时可以修改,将 V 改为 B
- 当旧的预期值 A != 内存值 V 此时不能修改,并重新获取现在的最新值,重新获取的动作就是自旋
分析 getAndSet 方法:
AtomicInteger:
1
2
3
4
5
6
7public final int getAndSet(int newValue) {
/**
* this: 当前对象
* valueOffset: 内存偏移量,内存地址
*/
return unsafe.getAndSetInt(this, valueOffset, newValue);
}valueOffset:偏移量表示该变量值相对于当前对象地址的偏移,Unsafe 就是根据内存偏移地址获取数据
1
2
3
4valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
//调用本地方法 -->
public native long objectFieldOffset(Field var1);unsafe 类:
1
2
3
4
5
6
7
8
9
10// val1: AtomicInteger对象本身,var2: 该对象值得引用地址,var4: 需要变动的数
public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
// var5: 用 var1 和 var2 找到的内存中的真实值
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var4));
return var5;
}var5:从主内存中拷贝到工作内存中的值(每次都要从主内存拿到最新的值到本地内存),然后执行
compareAndSwapInt()
再和主内存的值进行比较,假设方法返回 false,那么就一直执行 while 方法,直到期望的值和真实值一样,修改数据变量 value 用 volatile 修饰,保证了多线程之间的内存可见性,避免线程从工作缓存中获取失效的变量
1
private volatile int value
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现比较并交换的效果
分析 getAndUpdate 方法:
getAndUpdate:
1
2
3
4
5
6
7
8public final int getAndUpdate(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get(); //当前值,cas的期望值
next = updateFunction.applyAsInt(prev);//期望值更新到该值
} while (!compareAndSet(prev, next));//自旋
return prev;
}函数式接口:可以自定义操作逻辑
1
2AtomicInteger a = new AtomicInteger();
a.getAndUpdate(i -> i + 10);compareAndSet:
1
2
3
4
5
6
7
8
9public final boolean compareAndSet(int expect, int update) {
/**
* this: 当前对象
* valueOffset: 内存偏移量,内存地址
* expect: 期望的值
* update: 更新的值
*/
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
原子引用
原子引用:对 Object 进行原子操作,提供一种读和写都是原子性的对象引用变量
原子引用类:AtomicReference、AtomicStampedReference、AtomicMarkableReference
AtomicReference 类:
构造方法:
AtomicReference<T> atomicReference = new AtomicReference<T>()
常用 API:
public final boolean compareAndSet(V expectedValue, V newValue)
:CAS 操作public final void set(V newValue)
:将值设置为 newValuepublic final V get()
:返回当前值
1 | public class AtomicReferenceDemo { |
原子数组
原子数组类:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
AtomicIntegerArray 类方法:
1 | /** |
原子更新器
原子更新器类:AtomicReferenceFieldUpdater、AtomicIntegerFieldUpdater、AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常 IllegalArgumentException: Must be volatile type
常用 API:
static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> c, String fieldName)
:构造方法abstract boolean compareAndSet(T obj, int expect, int update)
:CAS
1 | public class UpdateDemo { |
原子累加器
原子累加器类:LongAdder、DoubleAdder、LongAccumulator、DoubleAccumulator
LongAdder 和 LongAccumulator 区别:
相同点:
- LongAddr 与 LongAccumulator 类都是使用非阻塞算法 CAS 实现的
- LongAddr 类是 LongAccumulator 类的一个特例,只是 LongAccumulator 提供了更强大的功能,可以自定义累加规则,当accumulatorFunction 为 null 时就等价于 LongAddr
不同点:
调用 casBase 时,LongAccumulator 使用 function.applyAsLong(b = base, x) 来计算,LongAddr 使用 casBase(b = base, b + x)
LongAccumulator 类功能更加强大,构造方法参数中
- accumulatorFunction 是一个双目运算器接口,可以指定累加规则,比如累加或者相乘,其根据输入的两个参数返回一个计算值,LongAdder 内置累加规则
- identity 则是 LongAccumulator 累加器的初始值,LongAccumulator 可以为累加器提供非0的初始值,而 LongAdder 只能提供默认的 0
Adder
优化机制
LongAdder 是 Java8 提供的类,跟 AtomicLong 有相同的效果,但对 CAS 机制进行了优化,尝试使用分段 CAS 以及自动分段迁移的方式来大幅度提升多线程高并发执行 CAS 操作的性能
CAS 底层实现是在一个循环中不断地尝试修改目标值,直到修改成功。如果竞争不激烈修改成功率很高,否则失败率很高,失败后这些重复的原子性操作会耗费性能(导致大量线程空循环,自旋转)
优化核心思想:数据分离,将 AtomicLong 的单点的更新压力分担到各个节点,空间换时间,在低并发的时候直接更新,可以保障和 AtomicLong 的性能基本一致,而在高并发的时候通过分散减少竞争,提高了性能
分段 CAS 机制:
- 在发生竞争时,创建 Cell 数组用于将不同线程的操作离散(通过 hash 等算法映射)到不同的节点上
- 设置多个累加单元(会根据需要扩容,最大为 CPU 核数),Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1] 等,最后将结果汇总
- 在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能
自动分段迁移机制:某个 Cell 的 value 执行 CAS 失败,就会自动寻找另一个 Cell 分段内的 value 值进行 CAS 操作
伪共享
Cell 为累加单元:数组访问索引是通过 Thread 里的 threadLocalRandomProbe 域取模实现的,这个域是 ThreadLocalRandom 更新的
1 | // Striped64.Cell |
Cell 是数组形式,在内存中是连续存储的,64 位系统中,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),每一个 cache line 为 64 字节,因此缓存行可以存下 2 个的 Cell 对象,当 Core-0 要修改 Cell[0]、Core-1 要修改 Cell[1],无论谁修改成功都会导致当前缓存行失效,从而导致对方的数据失效,需要重新去主存获取,影响效率
@sun.misc.Contended:防止缓存行伪共享,在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,使用 2 倍于大多数硬件缓存行让 CPU 将对象预读至缓存时占用不同的缓存行,这样就不会造成对方缓存行的失效
源码解析
Striped64 类成员属性:
1 | // 表示当前计算机CPU数量 |
工作流程:
cells 占用内存是相对比较大的,是惰性加载的,在无竞争或者其他线程正在初始化 cells 数组的情况下,直接更新 base 域
在第一次发生竞争时(casBase 失败)会创建一个大小为 2 的 cells 数组,将当前累加的值包装为 Cell 对象,放入映射的槽位上
分段累加的过程中,如果当前线程对应的 cells 槽位为空,就会新建 Cell 填充,如果出现竞争,就会重新计算线程对应的槽位,继续自旋尝试修改
分段迁移后还出现竞争就会扩容 cells 数组长度为原来的两倍,然后 rehash,数组长度总是 2 的 n 次幂,默认最大为 CPU 核数,但是可以超过,如果核数是 6 核,数组最长是 8
方法分析:
LongAdder#add:累加方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public void add(long x) {
// as 为累加单元数组的引用,b 为基础值,v 表示期望值
// m 表示 cells 数组的长度 - 1,a 表示当前线程命中的 cell 单元格
Cell[] as; long b, v; int m; Cell a;
// cells 不为空说明 cells 已经被初始化,线程发生了竞争,去更新对应的 cell 槽位
// 进入 || 后的逻辑去更新 base 域,更新失败表示发生竞争进入条件
if ((as = cells) != null || !casBase(b = base, b + x)) {
// uncontended 为 true 表示 cell 没有竞争
boolean uncontended = true;
// 条件一: true 说明 cells 未初始化,多线程写 base 发生竞争需要进行初始化 cells 数组
// fasle 说明 cells 已经初始化,进行下一个条件寻找自己的 cell 去累加
// 条件二: getProbe() 获取 hash 值,& m 的逻辑和 HashMap 的逻辑相同,保证散列的均匀性
// true 说明当前线程对应下标的 cell 为空,需要创建 cell
// false 说明当前线程对应的 cell 不为空,进行下一个条件【将 x 值累加到对应的 cell 中】
// 条件三: 有取反符号,false 说明 cas 成功,直接返回,true 说明失败,当前线程对应的 cell 有竞争
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
// 【uncontended 在对应的 cell 上累加失败的时候才为 false,其余情况均为 true】
}
}Striped64#longAccumulate:cell 数组创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113// x null false | true
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
int h;
// 当前线程还没有对应的 cell, 需要随机生成一个 hash 值用来将当前线程绑定到 cell
if ((h = getProbe()) == 0) {
// 初始化 probe,获取 hash 值
ThreadLocalRandom.current();
h = getProbe();
// 默认情况下 当前线程肯定是写入到了 cells[0] 位置,不把它当做一次真正的竞争
wasUncontended = true;
}
// 表示【扩容意向】,false 一定不会扩容,true 可能会扩容
boolean collide = false;
//自旋
for (;;) {
// as 表示cells引用,a 表示当前线程命中的 cell,n 表示 cells 数组长度,v 表示 期望值
Cell[] as; Cell a; int n; long v;
// 【CASE1】: 表示 cells 已经初始化了,当前线程应该将数据写入到对应的 cell 中
if ((as = cells) != null && (n = as.length) > 0) {
// CASE1.1: true 表示当前线程对应的索引下标的 Cell 为 null,需要创建 new Cell
if ((a = as[(n - 1) & h]) == null) {
// 判断 cellsBusy 是否被锁
if (cellsBusy == 0) {
// 创建 cell, 初始累加值为 x
Cell r = new Cell(x);
// 加锁
if (cellsBusy == 0 && casCellsBusy()) {
// 创建成功标记,进入【创建 cell 逻辑】
boolean created = false;
try {
Cell[] rs; int m, j;
// 把当前 cells 数组赋值给 rs,并且不为 null
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
// 再次判断防止其它线程初始化过该位置,当前线程再次初始化该位置会造成数据丢失
// 因为这里是线程安全的判断,进行的逻辑不会被其他线程影响
rs[j = (m - 1) & h] == null) {
// 把新创建的 cell 填充至当前位置
rs[j] = r;
created = true; // 表示创建完成
}
} finally {
cellsBusy = 0; // 解锁
}
if (created) // true 表示创建完成,可以推出循环了
break;
continue;
}
}
collide = false;
}
// CASE1.2: 条件成立说明线程对应的 cell 有竞争, 改变线程对应的 cell 来重试 cas
else if (!wasUncontended)
wasUncontended = true;
// CASE 1.3: 当前线程 rehash 过,如果新命中的 cell 不为空,就尝试累加,false 说明新命中也有竞争
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
// CASE 1.4: cells 长度已经超过了最大长度 CPU 内核的数量或者已经扩容
else if (n >= NCPU || cells != as)
collide = false; // 扩容意向改为false,【表示不能扩容了】
// CASE 1.5: 更改扩容意向,如果 n >= NCPU,这里就永远不会执行到,case1.4 永远先于 1.5 执行
else if (!collide)
collide = true;
// CASE 1.6: 【扩容逻辑】,进行加锁
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 线程安全的检查,防止期间被其他线程扩容了
if (cells == as) {
// 扩容为以前的 2 倍
Cell[] rs = new Cell[n << 1];
// 遍历移动值
for (int i = 0; i < n; ++i)
rs[i] = as[i];
// 把扩容后的引用给 cells
cells = rs;
}
} finally {
cellsBusy = 0; // 解锁
}
collide = false; // 扩容意向改为 false,表示不扩容了
continue;
}
// 重置当前线程 Hash 值,这就是【分段迁移机制】
h = advanceProbe(h);
}
// 【CASE2】: 运行到这说明 cells 还未初始化,as 为null
// 判断是否没有加锁,没有加锁就用 CAS 加锁
// 条件二判断是否其它线程在当前线程给 as 赋值之后修改了 cells,这里不是线程安全的判断
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// 初始化标志,开始 【初始化 cells 数组】
boolean init = false;
try {
// 再次判断 cells == as 防止其它线程已经提前初始化了,当前线程再次初始化导致丢失数据
// 因为这里是【线程安全的,重新检查,经典 DCL】
if (cells == as) {
Cell[] rs = new Cell[2]; // 初始化数组大小为2
rs[h & 1] = new Cell(x); // 填充线程对应的cell
cells = rs;
init = true; // 初始化成功,标记置为 true
}
} finally {
cellsBusy = 0; // 解锁啊
}
if (init)
break; // 初始化成功直接跳出自旋
}
// 【CASE3】: 运行到这说明其他线程在初始化 cells,当前线程将值累加到 base,累加成功直接结束自旋
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
}
}sum:获取最终结果通过 sum 整合,保证最终一致性,不保证强一致性
1
2
3
4
5
6
7
8
9
10
11
12public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
// 遍历 累加
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
ABA
ABA 问题:当进行获取主内存值时,该内存值在写入主内存时已经被修改了 N 次,但是最终又改成原来的值
其他线程先把 A 改成 B 又改回 A,主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况,这时 CAS 虽然成功,但是过程存在问题
构造方法:
public AtomicStampedReference(V initialRef, int initialStamp)
:初始值和初始版本号
常用API:
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)
:期望引用和期望版本号都一致才进行 CAS 修改数据public void set(V newReference, int newStamp)
:设置值和版本号public V getReference()
:返回引用的值public int getStamp()
:返回当前版本号
1 | public static void main(String[] args) { |
Unsafe
Unsafe 是 CAS 的核心类,由于 Java 无法直接访问底层系统,需要通过本地(Native)方法来访问
Unsafe 类存在 sun.misc 包,其中所有方法都是 native 修饰的,都是直接调用操作系统底层资源执行相应的任务,基于该类可以直接操作特定的内存数据,其内部方法操作类似 C 的指针
模拟实现原子整数:
1 | public static void main(String[] args) { |
final
原理
1 | public class TestFinal { |
字节码:
1 | 0: aload_0 |
final 变量的赋值通过 putfield 指令来完成,在这条指令之后也会加入写屏障,保证在其它线程读到它的值时不会出现为 0 的情况
其他线程访问 final 修饰的变量会复制一份放入栈中,效率更高
不可变
不可变:如果一个对象不能够修改其内部状态(属性),那么就是不可变对象
不可变对象线程安全的,不存在并发修改和可见性问题,是另一种避免竞争的方式
String 类也是不可变的,该类和类中所有属性都是 final 的
类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
无写入方法(set)确保外部不能对内部属性进行修改
属性用 final 修饰保证了该属性是只读的,不能修改
1
2
3
4
5
6public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];
//....
}更改 String 类数据时,会构造新字符串对象,生成新的 char[] value,通过创建副本对象来避免共享的方式称之为保护性拷贝
State
无状态:成员变量保存的数据也可以称为状态信息,无状态就是没有成员变量
Servlet 为了保证其线程安全,一般不为 Servlet 设置成员变量,这种没有任何成员变量的类是线程安全的
Local
基本介绍
ThreadLocal 类用来提供线程内部的局部变量,这种变量在多线程环境下访问(通过 get 和 set 方法访问)时能保证各个线程的变量相对独立于其他线程内的变量,分配在堆内的 TLAB 中
ThreadLocal 实例通常来说都是 private static
类型的,属于一个线程的本地变量,用于关联线程和线程上下文。每个线程都会在 ThreadLocal 中保存一份该线程独有的数据,所以是线程安全的
ThreadLocal 作用:
线程并发:应用在多线程并发的场景下
传递数据:通过 ThreadLocal 实现在同一线程不同函数或组件中传递公共变量,减少传递复杂度
线程隔离:每个线程的变量都是独立的,不会互相影响
对比 synchronized:
synchronized | ThreadLocal | |
---|---|---|
原理 | 同步机制采用以时间换空间的方式,只提供了一份变量,让不同的线程排队访问 | ThreadLocal 采用以空间换时间的方式,为每个线程都提供了一份变量的副本,从而实现同时访问而相不干扰 |
侧重点 | 多个线程之间访问资源的同步 | 多线程中让每个线程之间的数据相互隔离 |
基本使用
常用方法
方法 | 描述 |
---|---|
ThreadLocal<>() | 创建 ThreadLocal 对象 |
protected T initialValue() | 返回当前线程局部变量的初始值 |
public void set( T value) | 设置当前线程绑定的局部变量 |
public T get() | 获取当前线程绑定的局部变量 |
public void remove() | 移除当前线程绑定的局部变量 |
1 | public class MyDemo { |
应用场景
ThreadLocal 适用于下面两种场景:
- 每个线程需要有自己单独的实例
- 实例需要在多个方法中共享,但不希望被多线程共享
ThreadLocal 方案有两个突出的优势:
- 传递数据:保存每个线程绑定的数据,在需要的地方可以直接获取,避免参数直接传递带来的代码耦合问题
- 线程隔离:各线程之间的数据相互隔离却又具备并发性,避免同步方式带来的性能损失
ThreadLocal 用于数据连接的事务管理:
1 | public class JdbcUtils { |
用 ThreadLocal 使 SimpleDateFormat 从独享变量变成单个线程变量:
1 | public class ThreadLocalDateUtil { |
实现原理
底层结构
JDK8 以前:每个 ThreadLocal 都创建一个 Map,然后用线程作为 Map 的 key,要存储的局部变量作为 Map 的 value,达到各个线程的局部变量隔离的效果。这种结构会造成 Map 结构过大和内存泄露,因为 Thread 停止后无法通过 key 删除对应的数据
JDK8 以后:每个 Thread 维护一个 ThreadLocalMap,这个 Map 的 key 是 ThreadLocal 实例本身,value 是真正要存储的值
- 每个 Thread 线程内部都有一个 Map (ThreadLocalMap)
- Map 里面存储 ThreadLocal 对象(key)和线程的私有变量(value)
- Thread 内部的 Map 是由 ThreadLocal 维护的,由 ThreadLocal 负责向 map 获取和设置线程的变量值
- 对于不同的线程,每次获取副本值时,别的线程并不能获取到当前线程的副本值,形成副本的隔离,互不干扰
JDK8 前后对比:
- 每个 Map 存储的 Entry 数量会变少,因为之前的存储数量由 Thread 的数量决定,现在由 ThreadLocal 的数量决定,在实际编程当中,往往 ThreadLocal 的数量要少于 Thread 的数量
- 当 Thread 销毁之后,对应的 ThreadLocalMap 也会随之销毁,能减少内存的使用,防止内存泄露
成员变量
Thread 类的相关属性:每一个线程持有一个 ThreadLocalMap 对象,存放由 ThreadLocal 和数据组成的 Entry 键值对
1
ThreadLocal.ThreadLocalMap threadLocals = null
计算 ThreadLocal 对象的哈希值:
1
private final int threadLocalHashCode = nextHashCode()
使用
threadLocalHashCode & (table.length - 1)
计算当前 entry 需要存放的位置每创建一个 ThreadLocal 对象就会使用 nextHashCode 分配一个 hash 值给这个对象:
1
private static AtomicInteger nextHashCode = new AtomicInteger()
斐波那契数也叫黄金分割数,hash 的增量就是这个数字,带来的好处是 hash 分布非常均匀:
1
private static final int HASH_INCREMENT = 0x61c88647
成员方法
方法都是线程安全的,因为 ThreadLocal 属于一个线程的,ThreadLocal 中的方法,逻辑都是获取当前线程维护的 ThreadLocalMap 对象,然后进行数据的增删改查,没有指定初始值的 threadlcoal 对象默认赋值为 null
initialValue():返回该线程局部变量的初始值
- 延迟调用的方法,在执行 get 方法时才执行
- 该方法缺省(默认)实现直接返回一个 null
- 如果想要一个初始值,可以重写此方法, 该方法是一个
protected
的方法,为了让子类覆盖而设计的
1
2
3protected T initialValue() {
return null;
}nextHashCode():计算哈希值,ThreadLocal 的散列方式称之为斐波那契散列,每次获取哈希值都会加上 HASH_INCREMENT,这样做可以尽量避免 hash 冲突,让哈希值能均匀的分布在 2 的 n 次方的数组中
1
2
3
4private static int nextHashCode() {
// 哈希值自增一个 HASH_INCREMENT 数值
return nextHashCode.getAndAdd(HASH_INCREMENT);
}set():修改当前线程与当前 threadlocal 对象相关联的线程局部变量
1
2
3
4
5
6
7
8
9
10
11
12
13public void set(T value) {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的 ThreadLocalMap 对象
ThreadLocalMap map = getMap(t);
// 判断 map 是否存在
if (map != null)
// 调用 threadLocalMap.set 方法进行重写或者添加
map.set(this, value);
else
// map 为空,调用 createMap 进行 ThreadLocalMap 对象的初始化。参数1是当前线程,参数2是局部变量
createMap(t, value);
}1
2
3
4
5
6
7
8
9// 获取当前线程 Thread 对应维护的 ThreadLocalMap
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
// 创建当前线程Thread对应维护的ThreadLocalMap
void createMap(Thread t, T firstValue) {
// 【这里的 this 是调用此方法的 threadLocal】,创建一个新的 Map 并设置第一个数据
t.threadLocals = new ThreadLocalMap(this, firstValue);
}get():获取当前线程与当前 ThreadLocal 对象相关联的线程局部变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
// 如果此map存在
if (map != null) {
// 以当前的 ThreadLocal 为 key,调用 getEntry 获取对应的存储实体 e
ThreadLocalMap.Entry e = map.getEntry(this);
// 对 e 进行判空
if (e != null) {
// 获取存储实体 e 对应的 value值
T result = (T)e.value;
return result;
}
}
/*有两种情况有执行当前代码
第一种情况: map 不存在,表示此线程没有维护的 ThreadLocalMap 对象
第二种情况: map 存在, 但是【没有与当前 ThreadLocal 关联的 entry】,就会设置为默认值 */
// 初始化当前线程与当前 threadLocal 对象相关联的 value
return setInitialValue();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private T setInitialValue() {
// 调用initialValue获取初始化的值,此方法可以被子类重写, 如果不重写默认返回 null
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
// 判断 map 是否初始化过
if (map != null)
// 存在则调用 map.set 设置此实体 entry,value 是默认的值
map.set(this, value);
else
// 调用 createMap 进行 ThreadLocalMap 对象的初始化中
createMap(t, value);
// 返回线程与当前 threadLocal 关联的局部变量
return value;
}remove():移除当前线程与当前 threadLocal 对象相关联的线程局部变量
1
2
3
4
5
6
7public void remove() {
// 获取当前线程对象中维护的 ThreadLocalMap 对象
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
// map 存在则调用 map.remove,this时当前ThreadLocal,以this为key删除对应的实体
m.remove(this);
}
LocalMap
成员属性
ThreadLocalMap 是 ThreadLocal 的内部类,没有实现 Map 接口,用独立的方式实现了 Map 的功能,其内部 Entry 也是独立实现
1 | // 初始化当前 map 内部散列表数组的初始长度 16 |
存储结构 Entry:
- Entry 继承 WeakReference,key 是弱引用,目的是将 ThreadLocal 对象的生命周期和线程生命周期解绑
- Entry 限制只能用 ThreadLocal 作为 key,key 为 null (entry.get() == null) 意味着 key 不再被引用,entry 也可以从 table 中清除
1 | static class Entry extends WeakReference<ThreadLocal<?>> { |
构造方法:延迟初始化的,线程第一次存储 threadLocal - value 时才会创建 threadLocalMap 对象
1 | ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) { |
成员方法
set():添加数据,ThreadLocalMap 使用线性探测法来解决哈希冲突
该方法会一直探测下一个地址,直到有空的地址后插入,若插入后 Map 数量超过阈值,数组会扩容为原来的 2 倍
假设当前 table 长度为16,计算出来 key 的 hash 值为 14,如果 table[14] 上已经有值,并且其 key 与当前 key 不一致,那么就发生了 hash 冲突,这个时候将 14 加 1 得到 15,取 table[15] 进行判断,如果还是冲突会回到 0,取 table[0],以此类推,直到可以插入,可以把 Entry[] table 看成一个环形数组
线性探测法会出现堆积问题,可以采取平方探测法解决
在探测过程中 ThreadLocal 会复用 key 为 null 的脏 Entry 对象,并进行垃圾清理,防止出现内存泄漏
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35private void set(ThreadLocal<?> key, Object value) {
// 获取散列表
ThreadLocal.ThreadLocalMap.Entry[] tab = table;
int len = tab.length;
// 哈希寻址
int i = key.threadLocalHashCode & (len-1);
// 使用线性探测法向后查找元素,碰到 entry 为空时停止探测
for (ThreadLocal.ThreadLocalMap.Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
// 获取当前元素 key
ThreadLocal<?> k = e.get();
// ThreadLocal 对应的 key 存在,【直接覆盖之前的值】
if (k == key) {
e.value = value;
return;
}
// 【这两个条件谁先成立不一定,所以 replaceStaleEntry 中还需要判断 k == key 的情况】
// key 为 null,但是值不为 null,说明之前的 ThreadLocal 对象已经被回收了,当前是【过期数据】
if (k == null) {
// 【碰到一个过期的 slot,当前数据复用该槽位,替换过期数据】
// 这个方法还进行了垃圾清理动作,防止内存泄漏
replaceStaleEntry(key, value, i);
return;
}
}
// 逻辑到这说明碰到 slot == null 的位置,则在空元素的位置创建一个新的 Entry
tab[i] = new Entry(key, value);
// 数量 + 1
int sz = ++size;
// 【做一次启发式清理】,如果没有清除任何 entry 并且【当前使用量达到了负载因子所定义,那么进行 rehash
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 扩容
rehash();
}1
2
3
4
5// 获取【环形数组】的下一个索引
private static int nextIndex(int i, int len) {
// 索引越界后从 0 开始继续获取
return ((i + 1 < len) ? i + 1 : 0);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50// 在指定位置插入指定的数据
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
// 获取散列表
Entry[] tab = table;
int len = tab.length;
Entry e;
// 探测式清理的开始下标,默认从当前 staleSlot 开始
int slotToExpunge = staleSlot;
// 以当前 staleSlot 开始【向前迭代查找】,找到索引靠前过期数据,找到以后替换 slotToExpunge 值
// 【保证在一个区间段内,从最前面的过期数据开始清理】
for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
// 以 staleSlot 【向后去查找】,直到碰到 null 为止,还是线性探测
for (int i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
// 获取当前节点的 key
ThreadLocal<?> k = e.get();
// 条件成立说明是【替换逻辑】
if (k == key) {
e.value = value;
// 因为本来要在 staleSlot 索引处插入该数据,现在找到了i索引处的key与数据一致
// 但是 i 位置距离正确的位置更远,因为是向后查找,所以还是要在 staleSlot 位置插入当前 entry
// 然后将 table[staleSlot] 这个过期数据放到当前循环到的 table[i] 这个位置,
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 条件成立说明向前查找过期数据并未找到过期的 entry,但 staleSlot 位置已经不是过期数据了,i 位置才是
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 【清理过期数据,expungeStaleEntry 探测式清理,cleanSomeSlots 启发式清理】
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// 条件成立说明当前遍历的 entry 是一个过期数据,并且该位置前面也没有过期数据
if (k == null && slotToExpunge == staleSlot)
// 探测式清理过期数据的开始下标修改为当前循环的 index,因为 staleSlot 会放入要添加的数据
slotToExpunge = i;
}
// 向后查找过程中并未发现 k == key 的 entry,说明当前是一个【取代过期数据逻辑】
// 删除原有的数据引用,防止内存泄露
tab[staleSlot].value = null;
// staleSlot 位置添加数据,【上面的所有逻辑都不会更改 staleSlot 的值】
tab[staleSlot] = new Entry(key, value);
// 条件成立说明除了 staleSlot 以外,还发现其它的过期 slot,所以要【开启清理数据的逻辑】
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}1
2
3
4private static int prevIndex(int i, int len) {
// 形成一个环绕式的访问,头索引越界后置为尾索引
return ((i - 1 >= 0) ? i - 1 : len - 1);
}getEntry():ThreadLocal 的 get 方法以当前的 ThreadLocal 为 key,调用 getEntry 获取对应的存储实体 e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38private Entry getEntry(ThreadLocal<?> key) {
// 哈希寻址
int i = key.threadLocalHashCode & (table.length - 1);
// 访问散列表中指定指定位置的 slot
Entry e = table[i];
// 条件成立,说明 slot 有值并且 key 就是要寻找的 key,直接返回
if (e != null && e.get() == key)
return e;
else
// 进行线性探测
return getEntryAfterMiss(key, i, e);
}
// 线性探测寻址
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
// 获取散列表
Entry[] tab = table;
int len = tab.length;
// 开始遍历,碰到 slot == null 的情况,搜索结束
while (e != null) {
// 获取当前 slot 中 entry 对象的 key
ThreadLocal<?> k = e.get();
// 条件成立说明找到了,直接返回
if (k == key)
return e;
if (k == null)
// 过期数据,【探测式过期数据回收】
expungeStaleEntry(i);
else
// 更新 index 继续向后走
i = nextIndex(i, len);
// 获取下一个槽位中的 entry
e = tab[i];
}
// 说明当前区段没有找到相应数据
// 【因为存放数据是线性的向后寻找槽位,都是紧挨着的,不可能越过一个 空槽位 在后面放】,可以减少遍历的次数
return null;
}rehash():触发一次全量清理,如果数组长度大于等于长度的
2/3 * 3/4 = 1/2
,则进行 resize1
2
3
4
5
6
7
8private void rehash() {
// 清楚当前散列表内的【所有】过期的数据
expungeStaleEntries();
// threshold = len * 2 / 3,就是 2/3 * (1 - 1/4)
if (size >= threshold - threshold / 4)
resize();
}1
2
3
4
5
6
7
8
9
10private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
// 【遍历所有的槽位,清理过期数据】
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}Entry 数组为扩容为原来的 2 倍 ,重新计算 key 的散列值,如果遇到 key 为 null 的情况,会将其 value 也置为 null,帮助 GC
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 新数组的长度是老数组的二倍
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
// 统计新table中的entry数量
int count = 0;
// 遍历老表,进行【数据迁移】
for (int j = 0; j < oldLen; ++j) {
// 访问老表的指定位置的 entry
Entry e = oldTab[j];
// 条件成立说明老表中该位置有数据,可能是过期数据也可能不是
if (e != null) {
ThreadLocal<?> k = e.get();
// 过期数据
if (k == null) {
e.value = null; // Help the GC
} else {
// 非过期数据,在新表中进行哈希寻址
int h = k.threadLocalHashCode & (newLen - 1);
// 【线程探测】
while (newTab[h] != null)
h = nextIndex(h, newLen);
// 将数据存放到新表合适的 slot 中
newTab[h] = e;
count++;
}
}
}
// 设置下一次触发扩容的指标:threshold = len * 2 / 3;
setThreshold(newLen);
size = count;
// 将扩容后的新表赋值给 threadLocalMap 内部散列表数组引用
table = newTab;
}remove():删除 Entry
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
// 哈希寻址
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) {
// 找到了对应的 key
if (e.get() == key) {
// 设置 key 为 null
e.clear();
// 探测式清理
expungeStaleEntry(i);
return;
}
}
}
清理方法
探测式清理:沿着开始位置向后探测清理过期数据,沿途中碰到未过期数据则将此数据 rehash 在 table 数组中的定位,重定位后的元素理论上更接近
i = entry.key & (table.length - 1)
,让数据的排列更紧凑,会优化整个散列表查询性能1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42// table[staleSlot] 是一个过期数据,以这个位置开始继续向后查找过期数据
private int expungeStaleEntry(int staleSlot) {
// 获取散列表和数组长度
Entry[] tab = table;
int len = tab.length;
// help gc,先把当前过期的 entry 置空,在取消对 entry 的引用
tab[staleSlot].value = null;
tab[staleSlot] = null;
// 数量-1
size--;
Entry e;
int i;
// 从 staleSlot 开始向后遍历,直到碰到 slot == null 结束,【区间内清理过期数据】
for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 当前 entry 是过期数据
if (k == null) {
// help gc
e.value = null;
tab[i] = null;
size--;
} else {
// 当前 entry 不是过期数据的逻辑,【rehash】
// 重新计算当前 entry 对应的 index
int h = k.threadLocalHashCode & (len - 1);
// 条件成立说明当前 entry 存储时发生过 hash 冲突,向后偏移过了
if (h != i) {
// 当前位置置空
tab[i] = null;
// 以正确位置 h 开始,向后查找第一个可以存放 entry 的位置
while (tab[h] != null)
h = nextIndex(h, len);
// 将当前元素放入到【距离正确位置更近的位置,有可能就是正确位置】
tab[h] = e;
}
}
}
// 返回 slot = null 的槽位索引,图例是 7,这个索引代表【索引前面的区间已经清理完成垃圾了】
return i;
}启发式清理:向后循环扫描过期数据,发现过期数据调用探测式清理方法,如果连续几次的循环都没有发现过期数据,就停止扫描
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// i 表示启发式清理工作开始位置,一般是空 slot,n 一般传递的是 table.length
private boolean cleanSomeSlots(int i, int n) {
// 表示启发式清理工作是否清除了过期数据
boolean removed = false;
// 获取当前 map 的散列表引用
Entry[] tab = table;
int len = tab.length;
do {
// 获取下一个索引,因为探测式返回的 slot 为 null
i = nextIndex(i, len);
Entry e = tab[i];
// 条件成立说明是过期的数据,key 被 gc 了
if (e != null && e.get() == null) {
// 【发现过期数据重置 n 为数组的长度】
n = len;
// 表示清理过过期数据
removed = true;
// 以当前过期的 slot 为开始节点 做一次探测式清理工作
i = expungeStaleEntry(i);
}
// 假设 table 长度为 16
// 16 >>> 1 ==> 8,8 >>> 1 ==> 4,4 >>> 1 ==> 2,2 >>> 1 ==> 1,1 >>> 1 ==> 0
// 连续经过这么多次循环【没有扫描到过期数据】,就停止循环,扫描到空 slot 不算,因为不是过期数据
} while ((n >>>= 1) != 0);
// 返回清除标记
return removed;
}
参考视频:https://space.bilibili.com/457326371/
内存泄漏
Memory leak:内存泄漏是指程序中动态分配的堆内存由于某种原因未释放或无法释放,造成系统内存的浪费,导致程序运行速度减慢甚至系统崩溃等严重后果,内存泄漏的堆积终将导致内存溢出
如果 key 使用强引用:使用完 ThreadLocal ,threadLocal Ref 被回收,但是 threadLocalMap 的 Entry 强引用了 threadLocal,造成 threadLocal 无法被回收,无法完全避免内存泄漏
如果 key 使用弱引用:使用完 ThreadLocal ,threadLocal Ref 被回收,ThreadLocalMap 只持有 ThreadLocal 的弱引用,所以threadlocal 也可以被回收,此时 Entry 中的 key = null。但没有手动删除这个 Entry 或者 CurrentThread 依然运行,依然存在强引用链,value 不会被回收,而这块 value 永远不会被访问到,也会导致 value 内存泄漏
两个主要原因:
- 没有手动删除这个 Entry
- CurrentThread 依然运行
根本原因:ThreadLocalMap 是 Thread的一个属性,生命周期跟 Thread 一样长,如果没有手动删除对应 Entry 就会导致内存泄漏
解决方法:使用完 ThreadLocal 中存储的内容后将它 remove 掉就可以
ThreadLocal 内部解决方法:在 ThreadLocalMap 中的 set/getEntry 方法中,通过线性探测法对 key 进行判断,如果 key 为 null(ThreadLocal 为 null)会对 Entry 进行垃圾回收。所以使用弱引用比强引用多一层保障,就算不调用 remove,也有机会进行 GC
变量传递
基本使用
父子线程:创建子线程的线程是父线程,比如实例中的 main 线程就是父线程
ThreadLocal 中存储的是线程的局部变量,如果想实现线程间局部变量传递可以使用 InheritableThreadLocal 类
1 | public static void main(String[] args) { |
实现原理
InheritableThreadLocal 源码:
1 | public class InheritableThreadLocal<T> extends ThreadLocal<T> { |
实现父子线程间的局部变量共享需要追溯到 Thread 对象的构造方法:
1 | private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, |
1 | private ThreadLocalMap(ThreadLocalMap parentMap) { |
参考文章:https://blog.csdn.net/feichitianxia/article/details/110495764