cas 无锁编程

Last updated on 2 months ago

引言

最近面试 经常问到多线程的问题,如多个线程操作一个数据,如何保证数据同步一致,如果多线程加锁后,如何避免加锁?传统的锁机制虽然能解决大部分同步问题,但也有很多问题?如死锁,性能下降;所以最后问题导向都会朝着无锁问,无锁也有很多种无锁算法,本文就主要介绍 CAS 无锁算法,CAS不仅避免了传统锁的许多问题,还提高了系统的整体性能。本文将详细介绍CAS的基本原理、C++中的实现、应用场景、注意事项以及总结。

CAS的基本原理

什么是CAS?

CAS是一种原子操作,通常由硬件直接支持。它涉及三个参数:

  • **内存位置 (V)**:需要进行操作的内存地址。
  • **预期值 (A)**:期望内存位置V当前存储的值。
  • **新值 (B)**:如果内存位置V的值等于预期值A,则将V的值设置为新值B。

CAS的工作方式

  1. 比较:检查内存位置V的值是否等于预期值A。

  2. 交换:如果相等,则将V的值设置为新值B。

  3. 返回结果:返回操作是否成功的标志。

    以上的三个步骤都是一个整体,都是原子性的,不会被其他线程中断。

C++中的实现 && 和 mutex 差距

10个线程 对一个整数做累加

互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <chrono>
int counter = 0;
std::mutex mtx;
void increment() {
for (int i = 0; i < 10000000; ++i) {
// 锁定互斥锁
std::lock_guard<std::mutex> lock(mtx);
// 增加计数器
++counter;
}
}

int main() {
const int num_threads = 10;
std::vector<std::thread> threads;
// 记录开始时间
auto start = std::chrono::high_resolution_clock::now();

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment);
}

for (auto& thread : threads) {
thread.join();
}

// 记录结束时间
auto end = std::chrono::high_resolution_clock::now();

// 计算总时间
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "Final counter value: " << counter << std::endl;
std::cout << "Time taken: " << duration << " milliseconds" << std::endl;
return 0;
}

//结果
//Final counter value: 100000000
//Time taken: 6941 milliseconds

CAS

在C++11及更高版本中,std::atomic类模板提供了对CAS的支持

在这个例子中,compare_exchange_weak函数尝试将counterexpected更新为expected + 1。如果counter的当前值不是expected,则compare_exchange_weak会失败,并将counter的实际值写回到expected中,循环继续直到成功为止。

其实这种方式 就是类似于自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <iostream>
#include <atomic>
#include <thread>
#include <vector>
#include <chrono>

std::atomic<int> counter(0);

void increment() {
for (int i = 0; i < 10000000; ++i) {
int expected;
//这总累加场景 ,建议用 atomic_fetch_add 来实现会更快
do {
//先读取 count 值
expected = counter.load();
//用count 的值 和 expected,如果还是一样的,expected +1 ,不是返回false,继续重试
// 多线程场景中,这种累加方式 重试的次数会很多, 会有大量计算资源浪费到轮询本身上;
} while (!counter.compare_exchange_weak(expected, expected + 1));
}
}

int main() {
const int num_threads = 10;
std::vector<std::thread> threads;

// 记录开始时间
auto start = std::chrono::high_resolution_clock::now();

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back(increment);
}

for (auto& thread : threads) {
thread.join();
}

// 记录结束时间
auto end = std::chrono::high_resolution_clock::now();

// 计算总时间
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();

std::cout << "Final counter value: " << counter.load() << std::endl;
std::cout << "Time taken: " << duration << " milliseconds" << std::endl;

return 0;
}

// compare_exchange_weak cas方式
//hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
//Final counter value: 100000000
//Time taken: 16143 milliseconds
//hrp@ubuntu:~$


//atomic_fetch_add 实现
//hrp@ubuntu:~$ ./cas
//Final counter value: 100000000
//Time taken: 1491 milliseconds


耗时统计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
mutex
//Final counter value: 100000000
//Time taken: 6941 milliseconds

compare_exchange_weak cas方式
//hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
//Final counter value: 100000000
//Time taken: 16143 milliseconds
//hrp@ubuntu:~$


atomic_fetch_add 实现
//hrp@ubuntu:~$ ./cas
//Final counter value: 100000000
//Time taken: 1491 milliseconds

可以看到 cas 耗时更多,为什么呢?

cas 遇到竞争的解决方式是不是等待,而是不断地重是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
      do {
//先读取 count 值
expected = counter.load();
//用count 的值 和 expected,如果还是一样的,expected +1 ,不是返回false,继续重试
// 多线程场景中,这种累加方式 重试的次数会很多
} while (!counter.compare_exchange_weak(expected, expected + 1));

// 用 每次读取期待值后,打印count ,计算count 打印次数,
for (int i = 0; i < 10000000; ++i) {
int expected = 0;
do {
expected = counter.load();
std::cout << "count" << std::endl;
} while (!counter.compare_exchange_weak(expected, expected + 1));
}

//可以看到 重试了 3612149次
hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas |wc -l
103612149
hrp@ubuntu:~$

系统调用次数,cas方式,上下文切换比较少,耗时都是在用户态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
hrp@ubuntu:~$ sudo perf stat -e context-switches ./cas
Final counter value: 100000000
Time taken: 16679 milliseconds

Performance counter stats for './cas':

3,413 context-switches

16.681459973 seconds time elapsed

131.787816000 seconds user
0.027979000 seconds sys


hrp@ubuntu:~$ sudo perf stat -e context-switches ./mutex
Final counter value: 100000000
Time taken: 7010 milliseconds

Performance counter stats for './mutex':

739,273 context-switches

7.012276162 seconds time elapsed

11.464533000 seconds user
15.450764000 seconds sys

atomic 中的两个接口

atomic 库,提供了两个接口 用来实现 CAS

  • **compare_exchange_weak**:
    • 可能因为硬件中断或其他原因失败,即使预期值与当前值匹配。
    • 通常更高效,因为可以在某些架构上生成更轻量级的指令。
    • 需要在循环中使用,因为可能需要多次尝试。
  • **compare_exchange_strong**:
    • 只有当预期值与当前值不匹配时才会失败。
    • 通常生成更复杂的指令,以确保强保证。
    • 无需在循环中使用,但可能在高竞争环境下性能较差。

两者耗时比较(差距很小)

1
2
3
4
5
6
7
8
9
10
11
#compare_exchange_strong
hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
Final counter value: 100000000
Time taken: 17137 milliseconds

#compare_exchange_weak
hrp@ubuntu:~$ vi cas1.cpp
hrp@ubuntu:~$ g++ cas1.cpp -o cas -pthread && ./cas
Final counter value: 100000000
Time taken: 17074 milliseconds

注意事项

ABA问题

ABA问题是CAS的一个常见问题。当一个值从A变为B再变回A时,CAS可能会错误地认为值没有改变过。解决这个问题的一种方法是使用带有版本号的CAS,或者使用std::atomic_flag等其他机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <atomic>

struct ABANode {
int value;
std::atomic<int> version;
ABANode(int val) : value(val), version(0) {}
};

bool cas_with_version(ABANode* node, int expected_value, int new_value, int expected_version) {
if (node->value == expected_value && node->version.load() == expected_version) {
node->value = new_value;
node->version.fetch_add(1);
return true;
}
return false;
}