互斥量

互斥量解决多线程数据共享问题

当多线程去共享同一个数据的时候,会造成争夺

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <thread>

int a = 0;
void func() {
for (int i = 0; i < 10000; i++) {
a += 1;
}
}

int main() {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
system("pause");
return 0;
}

运行上述代码发现,a的值没有如我们预期所认为是20000,而是随机的数字。这是因为线程在读取同一个数据的时候发生了争夺。
解决办法:当一个线程拿了数据,其他线程禁止拿,也就是互斥锁:线程访问共享资源前,先加锁(lock),用完后解锁(unlock)。

1
2
3
4
5
6
7
8
9
10
#include <mutex>
std::mutex mtx;

void func() {
for (int i = 0; i < 10000; i++) {
mtx.lock(); //在这个线程即将访问数据的时候上锁
a += 1;
mtx.unlock(); //在访问完成后解锁
}
}

多线程安全:如果多线程程序每一次的运行结果和单线程运行的结果是一样的,那么你的线程就是安全的。

互斥量死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <thread>
#include <mutex>

std::mutex m1,m2;
void func_1()
{
for (int i = 0; i < 50; i++) {
m1.lock();
m2.lock();
m1.unlock();
m2.unlock();
}
}

void func_2()
{
for (int i = 0; i < 50; i++) {
m2.lock();
m1.lock();
m1.unlock();
m2.unlock();
}

}

int main() {
std::thread t1(func_1);
std::thread t2(func_2);
t1.join();
t2.join();
std::cout << "over" << std::endl;
system("pause");
return 0;
}

解决方法,当某个线程获取到了m1,那就让他获取m2,按照这样的规则,所有的线程都得先有m1才有m2,那其他线程拿不到m1自然也拿不到m2,所以调换func_2的m1、m2顺序即可。

lock_guard与unique_lock

std::lock_guard是C++标准库中的一种互斥量封装类,用于保护共享数据,防止多个线程同时访问同一资源而导致的数据竞争问题。

  • 当构造函数被调用时,该互斥量会被自动锁定
  • 当析构函数被调用时,该互斥量会被自动解锁
  • std::lock_guard对象不能复制或移动,因此它只能在局部作用域中使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <iostream>
#include <mutex>
#include <thread>

int shared_data = 0;

std::mutex mtx;
void func() {
for (int i = 0; i < 10000; i++) {
std::lock_guard<std::mutex>lg(mtx);
shared_data++;
}
}

int main()
{
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();

std::cout << shared_data << std::endl;

return 0;

}

lock_guardlg(mtx)的作用就相当于mtx.lock() 且 mtx.unlock().
有五个用法:

标准用法

1
2
3
4
5
6
std::mutex mtx;
void func() {
std::unique_lock<std::mutex> lock(mtx); // 加锁
// 临界区代码
} // 作用域结束,自动解锁

手动解锁

1
2
3
4
5
6
7
8
std::mutex mtx;
void func() {
std::unique_lock<std::mutex> lock(mtx); // 加锁
// 访问共享资源
lock.unlock(); // 提前解锁
// 此处不再受互斥锁保护
}

延迟加锁

1
2
3
4
5
6
7
std::mutex mtx;
void func() {
std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 不自动加锁
// ... 其他代码
lock.lock(); // 需要时再手动加锁
}

尝试加锁

1
2
3
4
5
6
7
8
9
std::mutex mtx;
void func() {
std::unique_lock<std::mutex> lock(mtx, std::try_to_lock);
if (lock.owns_lock()) { // 判断是否加锁成功
// 临界区代码
} else {
// 锁未获取成功,执行其他操作
}
}

互斥锁转移

1
2
3
4
5
6
std::mutex mtx;
void func() {
std::unique_lock<std::mutex> lock1(mtx); // 获取锁
std::unique_lock<std::mutex> lock2 = std::move(lock1); // lock1 转移给 lock2
}

条件变量

生产者与消费者模型

生产者与消费者
生产者与消费者模型可以这样比喻:生产者是小鸡,任务队列是鸡蛋篮子,消费者是饲养员。有源源不断的任务从生产者发出,由消费者解除,也类似于银行排队系统。
当任务队列为的时候,消费者无法去取任务,因此会进入等待的状态。那此时老板会下发任务,如何让消费者知道有任务?需要通知,让消费者知道我该往里面取任务了。
condition_variable有两种

  • notify_one 唤醒消费者中的一个线程来干活
  • notify-all 唤醒消费者中的所有线程来干活
    区别在于:通知只发一次,但是对象不同,唤醒一条线程和所有线程的区别
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <iostream>
#include <mutex>
#include <thread>
#include <queue>
#include <condition_variable>

std::queue<int>g_queue;
std::condition_variable g_cv;
std::mutex mtx;

void Producer()
{
for (int i = 0; i < 10; i++)
{
std::unique_lock<std::mutex> lock(mtx);
g_queue.push(i);
g_cv.notify_one();
std::cout << "Producer : " << i << std::endl;
}
std::this_thread::sleep_for(std::chrono::microseconds(100));
}

void Consumer()
{
while (1)
{
std::unique_lock<std::mutex> lock(mtx);
g_cv.wait(lock, []() {return !g_queue.empty();
});
int value = g_queue.front();
g_queue.pop();

std::cout << "Consumer : " << value << std::endl;
}
}

int main()
{
std::thread t1(Producer);
std::thread t2(Consumer);
t1.join();
t2.join();
return 0;
}

wait函数g_cv.wait(lock, predicate)的作用:

  • 当前线程进入等待状态,直到 predicate 返回 true,也就是说第二个判断条件是true就往下执行
  • lock 是一个 std::unique_lockstd::mutex,用于保护临界区资源。
  • predicate 是一个 Lambda 表达式,返回 true 时线程继续执行,否则会一直等待。
    两个代码是等价的
1
g_cv.wait(lock, []() { return !g_queue.empty(); });
1
2
3
4
5
while (!g_queue.empty()) {
// 条件不满足,释放 lock 并进入等待状态
g_cv.wait(lock);
}
// 条件满足,继续执行后续代码
  • 当 g_queue 为空时,线程会阻塞(等待)。
  • 当 g_queue 非空时,线程继续执行,不会进入等待状态。

原子操作

除了可以用互斥锁来维护共享变量外,还可以通过原子操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <thread>
#include <atomic>
std::atomic <int> a(0);
void func() {
for (int i = 0; i < 1000000; i++) {
a += 1;
}
}

int main() {
std::thread t1(func);
std::thread t2(func);
t1.join();
t2.join();
std::cout << a << std::endl;
system("pause");
return 0;
}

注意,在初始化的时候,不允许使用std::atomic<int> a = 0,因为原子操作不允许拷贝复制,应该用默认的构造函数std::atomic<int> a(0)或std::atomic<int> a{0}
把共享的数据设置为原子变量,更好地维护线程安全,还可以提升运行速度。

小班演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

std::mutex mtx;
std::condition_variable g_cv;
std::queue<int> g_queue; //容器要包含类型
int flag = 0;

void Producer()
{
for (int i = 0; i < 100; i++)
{
std::unique_lock<std::mutex> lock(mtx);
g_queue.push(i);
g_cv.notify_one(); //每次加任务的时候通知一下
std::cout << "Producer : " << i << std::endl;
}
std::this_thread::sleep_for(std::chrono::microseconds(1000));
}

void Comsumer()
{
while (1)
{
if (flag == 100) break;
std::unique_lock<std::mutex> lock(mtx);
g_cv.wait(lock, []() {return !g_queue.empty();
});
flag++;
int value = g_queue.front();
g_queue.pop();
std::cout << "Comsumer : " << value << std::endl;

}
}

int main()
{
std::thread t1(Producer);
std::thread t2(Comsumer);
t1.join();
t2.join();
return 0;
}

C++11跨平台线程池