高并发、多线程探索-2-线程安全性-原子性-Atomic-CAS

线程安全性

线程安全?

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。线程安全性?

线程安全性

主要体现在三个方面:原子性、可见性、有序性

  • 原子性:提供了互斥访问,同一时刻只能有一个线程来对它进行操作
  • 可见性:一个线程对主内存的修改可以及时的被其他线程观察到
  • 有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序。

基础代码:以下代码用于描述下方的知识点,所有代码均在此代码基础上进行修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package cn.northpark.concurrency.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import cn.northpark.concurrency.annotaion.ThreadSafe;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@ThreadSafe
public class TestAtomicInteger {

// 请求总数
static int totalCount = 5000;

// 同时并发执行的线程数
static int threadTotal = 200;

// 计数器
public static AtomicInteger count = new AtomicInteger(0);

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
// 限制同时执行的线程数
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(totalCount);

for (int i = 0; i < totalCount; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
countDownLatch.countDown();

});
}

countDownLatch.await();
executorService.shutdown();
log.info("count{}", count);
}

public static void add() {
count.incrementAndGet();
// count.getAndIncrement();
}
}

原子性

说到原子性,一共有两个方面需要学习一下,一个是JDK中已经提供好的Atomic包,他们均使用了CAS完成线程的原子性操作,另一个是使用锁的机制来处理线程之间的原子性。锁包括:synchronized、Lock

Atomic包中的类与CAS:

1

我们从最简单的AtomicInteger类来了解什么是CAS

AtomicInteger

上边的示例代码就是通过AtomicInteger类保证了线程的原子性。
那么它是如何保证原子性的呢?我们接下来分析一下它的源码。示例中,对count变量的+1操作,采用的是incrementAndGet方法,此方法的源码中调用了一个名为==unsafe.getAndAddInt==的方法

1
2
3
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

而getAndAddInt方法的具体实现为:

1
2
3
4
5
6
7
8
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);

} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}

在此方法中,方法参数为要操作的对象Object
var1、期望底层当前的数值为var2、要修改的数值var4。定义的var5为真正从底层取出来的值。采用do..while循环的方式去获取底层数值并与期望值进行比较,比较成功才将值进行修改。而这个比较再进行修改的方法就是compareAndSwapInt就是我们所说的CAS,它是一系列的接口,比如下面罗列的几个接口。使用native修饰,是底层的方法。CAS取的是==compareAndSwap==三个单词的首字母.

另外,示例代码中的count可以理解为JMM中的==工作内存==,而这里的底层数值即为==主内存==,如果看过我上一篇文章的盆友就能把这一块的知识点串联起来了。

1
2
3
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

AtomicLong 与 LongAdder

LongAdder是java8为我们提供的新的类,跟AtomicLong有相同的效果。首先看一下代码实现:

AtomicLong:

1
2
3
4
5
6
//变量声明
public static AtomicLong count = new AtomicLong(0);
//变量操作
count.incrementAndGet();
//变量取值
count.get();

LongAdder:

1
2
3
4
5
6
//变量声明
public static LongAdder count = new LongAdder();
//变量操作
count.increment();
//变量取值
count

那么问题来了,为什么有了AtomicLong还要新增一个LongAdder呢?
原因是:CAS底层实现是在一个死循环中不断地尝试修改目标值,直到修改成功。如果竞争不激烈的时候,修改成功率很高,否则失败率很高。在失败的时候,这些重复的原子性操作会耗费性能

知识点: 对于普通类型的long、double变量,JVM允许将64位的读操作或写操作拆成两个32位的操作。

1
LongAdder类的实现核心是将热点数据分离,比如说它可以将AtomicLong内部的内部核心数据value分离成一个数组,每个线程访问时,通过hash等算法映射到其中一个数字进行计数,而最终的计数结果则为这个数组的求和累加,其中热点数据value会被分离成多个单元的cell,每个cell独自维护内部的值。当前对象的实际值由所有的cell累计合成,这样热点就进行了有效地分离,并提高了并行度。这相当于将AtomicLong的单点的更新压力分担到各个节点上。在低并发的时候通过对base的直接更新,可以保障和AtomicLong的性能基本一致。而在高并发的时候通过分散提高了性能。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void increment() {
add(1L);
}
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}

缺点:如果在统计的时候,如果有并发更新,可能会有统计数据有误差。实际使用中在处理高并发计数的时候优先使用==LongAdder[并发]==,而不是AtomicLong在线程竞争很低的时候,使用AtomicLong会简单效率更高一些。比如==序列号生成(准确性)==

AtomicBoolean

这个类中值得一提的是它包含了一个名为==compareAndSet==的方法,这个方法可以做到的是控制一个boolean变量在一件事情执行之前为false,事情执行之后变为true。或者也可以理解为可以控制某一件事只让一个线程执行,并仅能执行一次。

他的源码如下:

1
2
3
4
5
public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}

举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package cn.northpark.concurrency.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

import cn.northpark.concurrency.annotaion.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@ThreadSafe
public class TestAtomicBoolean {

/**
* AtomicBoolean 可以应用某些行为只允许执行一次的场景
*/
private static AtomicBoolean isHappened = new AtomicBoolean(false);

// 请求总数
public static int clientTotal = 5000;

// 同时并发执行的线程数
public static int threadTotal = 200;

public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
executorService.execute(() -> {
try {
semaphore.acquire();
test();
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("isHappened:{}", isHappened.get());
}

private static void test() {
if (isHappened.compareAndSet(false, true)) {
log.info("execute");
}
}
}

结果:(log只打印一次)

1
2
11:27:32.761 [pool-1-thread-1] INFO cn.northpark.concurrency.atomic.TestAtomicBoolean - execute
11:27:32.801 [main] INFO cn.northpark.concurrency.atomic.TestAtomicBoolean - isHappened:true

AtomicIntegerFieldUpdater

这个类的核心作用是要更新一个指定的类的某一个字段的值。并且这个字段一定要用==volatile修饰==同时还==不能是static==的。

举例说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package cn.northpark.concurrency.atomic;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import cn.northpark.concurrency.annotaion.ThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@ThreadSafe
public class TestAtomicReferenceFieldUpdater {

private static AtomicIntegerFieldUpdater<TestAtomicReferenceFieldUpdater> updater = AtomicIntegerFieldUpdater
.newUpdater(TestAtomicReferenceFieldUpdater.class, "count");

// AtomicReferenceFieldUpdater 主要用来更新某个类的volatile标识、非static 的字段
@Getter
public volatile int count = 100;

public static void main(String[] args) {
TestAtomicReferenceFieldUpdater testAtomicReferenceFieldUpdater = new TestAtomicReferenceFieldUpdater();
if (updater.compareAndSet(testAtomicReferenceFieldUpdater, 100, 120)) {
log.info("update success ,{}", testAtomicReferenceFieldUpdater.getCount());
}
if (updater.compareAndSet(testAtomicReferenceFieldUpdater, 100, 120)) {
log.info("update success 2,{}", testAtomicReferenceFieldUpdater.getCount());
} else {
log.info("update fail ");
}
}
}

此方法输出的结果为:

11:57:31.507 [main] INFO
cn.northpark.concurrency.atomic.TestAtomicReferenceFieldUpdater -
update success ,120 11:57:31.511 [main] INFO
cn.northpark.concurrency.atomic.TestAtomicReferenceFieldUpdater -
update fail
由此可见,count的值只修改了一次。

AtomicStampReference与CAS的ABA问题

什么是ABA问题?
CAS操作的时候,其他线程将变量的值A改成了B,但是随后又改成了A,本线程在CAS方法中使用期望值A与当前变量进行比较的时候,发现变量的值未发生改变,于是CAS就将变量的值进行了交换操作。但是实际上变量的值已经被其他的变量改变过,这与设计思想是不符合的。所以就有了AtomicStampReference。

源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}

private volatile Pair<V> pair;

private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}

public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference && //排除新的引用和新的版本号与底层的值相同的情况
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

AtomicStampReference的处理思想是,每次变量更新的时候,将变量的版本号+1,之前的ABA问题中,变量经过两次操作以后,变量的版本号就会由1变成3,也就是说只要线程对变量进行过操作,变量的版本号就会发生更改。从而解决了ABA问题。

解释一下上边的源码:
类中维护了一个volatile修饰的Pair类型变量current,Pair是一个私有的静态类,current可以理解为底层数值。
compareAndSet方法的参数部分分别为期望的引用、新的引用、期望的版本号、新的版本号。
return的逻辑为判断了期望的引用和版本号是否与底层的引用和版本号相符,并且排除了新的引用和新的版本号与底层的值相同的情况(即不需要修改)的情况(return代码部分3、4行)。条件成立,执行casPair方法,调用CAS操作。

AtomicLongArray

这个类实际上维护了一个Array数组,我们在对数值进行更新的时候,会多一个索引值让我们更新。

原子性,提供了互斥访问,同一时刻只能有一个线程来对它进行操作。那么在java里,保证同一时刻只有一个线程对它进行操作的,除了==Atomic包==之外,还有==锁的机制==。JDK提供锁主要分为两种:==synchronized==和==Lock==。接下来我们了解一下synchronized。

synchronized

依赖于JVM去实现锁,因此在这个关键字作用对象的作用范围内,都是同一时刻只能有一个线程对其进行操作的。
synchronized是java中的一个关键字,是一种同步锁。它可以修饰的对象主要有四种:

  • 修饰代码块:大括号括起来的代码,作用于调用的对象
  • 修饰方法:整个方法,作用于调用的对象

———————————————————————–

  • 修饰静态方法:整个静态方法,作用于所有对象
  • 修饰类:括号括起来的部分,作用于所有对象

synchronized 修饰一个代码块

被修饰的代码称为同步语句块,作用的范围是大括号括起来的部分。作用的对象是调用这段代码的对象。
验证:

1
2
3
4
5
6
7
8
public class SynchronizedExample {
public void test(int j){
synchronized (this){
for (int i = 0; i < 10; i++) {
log.info("test - {} - {}",j,i);
}
}
}

//使用线程池方法进行测试:

1
2
3
4
5
6
7
8
public static void main(String[] args) {
SynchronizedExample example1 = new SynchronizedExample();
SynchronizedExample example2 = new SynchronizedExample();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(()-> example1.test(1));
executorService.execute(()-> example2.test(2));
}
}

结果:不同对象之间的操作互不影响

synchronized 修饰一个方法

被修饰的方法称为同步方法,作用的范围是大括号括起来的部分,作用的对象是调用这段代码的对象
验证:

public class SynchronizedExample
public synchronized void test(int j){
for (int i = 0; i < 10; i++) {
log.info(“test - {} - {}”,j,i);
}
}
//验证方法与上面相同

}

结果:不同对象之间的操作互不影响

==TIPS:
如果当前类是一个父类,子类调用父类的被synchronized修饰的方法,不会携带synchronized属性,因为synchronized不属于方法声明的一部分synchronized 修饰一个静态方法==

作用的范围是synchronized 大括号括起来的部分,作用的对象是这个类的所有对象。
验证:

1
2
3
4
5
6
7
8
9
public class SynchronizedExample{
public static synchronized void test(int j){
for (int i = 0; i < 10; i++) {
log.info("test - {} - {}",j,i);
}
}
//验证方法与上面相同
...
}

结果:同一时间只有一个线程可以执行

synchronized 修饰一个类

验证:

1
2
3
4
5
6
7
8
9
10
11
public class SynchronizedExample{
public static void test(int j){
synchronized (SynchronizedExample.class){
for (int i = 0; i < 10; i++) {
log.info("test - {}-{}",j,i);
}
}
}
//验证方法与上面相同
...
}

结果:同一时间只有一个线程可以执行

原子性操作各方法间的对比

synchronized:不可中断锁,==适合竞争不激烈==,可读性好
Lock:可中断锁,多样化同步,==竞争激烈时能维持常态==
Atomic:==竞争激烈时能维持常态==,==比Lock性能好==,==每次只能同步一个值==

生活不止苟且,还有我喜爱的海岸.