1. 引言
在多线程环境中,线程经常需要等待某个条件达成后,再继续执行。
例如:
等待队列非空再消费
等待缓存刷新完成再继续
仅用互斥量 std::mutex 会导致忙等待(busy-waiting),浪费 CPU。
条件变量(condition variable,下文简称 CV)允许线程主动挂起,直到被其他线程显式唤醒,从而优雅地解决生产者-消费者读写同步等问题。
CV = 互斥锁 + 等待队列 + 原子唤醒。
2. 基本概念与背景
谓词:一个布尔表达式,用于判断等待条件是否满足(防止虚假唤醒)。
虚假唤醒:操作系统可能在没有 notify_* 的情况下唤醒线程,必须用谓词二次检查。
2.1 互斥量 std::mutex 的局限
std::mutex mtx;
bool ready = false;
// 线程1:忙等待
void Thread1() {
while (true) {
std::lock_guard<std::mutex> lk(mtx);
if (ready) break; // 条件满足
} // 解锁
// 仍然需要循环检查,浪费CPU
}2.2 轮询 vs. 阻塞等待
3. std::condition_variable
3.1 基本用法:wait/notify_one/notify_all
代码示例:
// g++ -std=c++17 cv_basic.cpp -pthread
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
std::mutex mtx;
std::condition_variable cv;
bool ready = false; // 受保护的条件
void Worker() {
std::unique_lock<std::mutex> lk(mtx);
// 等待 ready 变为 true;防止虚假唤醒
cv.wait(lk, [] { return ready; });
std::cout << "Worker: condition satisfied!\n";
}
void Boss() {
{
std::lock_guard<std::mutex> lk(mtx);
ready = true;
}
cv.notify_one(); // 通知一个等待线程
}
int main() {
std::thread t(Worker);
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时
Boss();
t.join();
return 0;
}
wait:
内部会自动解锁 lk,阻塞当前线程;被唤醒后再次加锁并检查谓词。必须使用 std::unique_lock 而非 lock_guard,因为 wait 需要在阻塞时释放锁。
notify_one:
唤醒一个正在等待该条件变量的线程(如果有)。被唤醒的线程会重新尝试获取锁,检查谓词是否满足,若不满足则可能继续等待。
notify_all:
唤醒所有正在等待该条件变量的线程。所有被唤醒的线程会竞争锁,随后检查谓词,只有谓词为 true 的线程会继续执行,其余线程继续等待。
notify_one适用于单消费者场景(如任务队列),避免不必要的线程唤醒;notify_all适用于多消费者或状态变化需广播的场景(如读写锁的写完成通知)。
3.2 虚假唤醒(Spurious Wakeup)
虚假唤醒是操作系统运行时偶尔在没有任何线程调用 notify_one/notify_all 的情况下,把某个正在 cv.wait() 的线程唤醒的现象。
它不是 bug,而是所有主流平台(Linux futex、Windows condition variable、BSD 等)都允许出现的实现细节。
操作系统可能无理由唤醒等待线程,因此必须:
使用谓词(如上例的 lambda)
或手动循环检查条件
wait 重载
在 C++ 标准库里,std::condition_variable 提供了两个重载的 wait:
不带谓词的版本:必须自己写循环检查条件,防止虚假唤醒。
void wait(std::unique_lock<std::mutex>& lock);用法示例:
while (!pred()) cv.wait(lock);带谓词的版本:标准库实现了循环,内部仍然是“先解锁→阻塞→醒来再加锁→再检查谓词”。
template <class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);等价于:
while (!pred()) wait(lock);4. 现代 C++ 的改进
4.1 std::condition_variable_any
支持任意可锁类型(如 std::shared_lock),而不仅限于 std::unique_lock。
#include <shared_mutex>
std::shared_mutex smtx;
std::condition_variable_any cv_any;
void SharedWait() {
std::shared_lock<std::shared_mutex> lk(smtx);
cv_any.wait(lk, [] { return /* condition */; });
}4.2 std::stop_token + std::jthread
C++20 起,std::jthread 支持协作式取消,可与条件变量结合:
void Worker(std::stop_token st, std::queue<int>& q,
std::mutex& mtx, std::condition_variable& cv) {
while (!st.stop_requested()) {
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, st, [&q] { return !q.empty(); }); // C++20 支持 stop_token
if (st.stop_requested()) return;
// 处理 q.front()
}
}5. 生产者-消费者完整示例
现代 C++17 风格,支持多生产者/多消费者、优雅退出。
// g++ -std=c++17 -pthread blocking_queue.cpp -o demo
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#include <chrono>
#include <iostream>
template <typename T>
class BlockingQueue {
public:
explicit BlockingQueue(size_t capacity) : capacity_(capacity) {}
// 生产者:阻塞插入
void Push(T value) {
std::unique_lock<std::mutex> lk(mtx_);
// 等待队列非满
cv_producer_.wait(lk, [this] { return queue_.size() < capacity_; });
queue_.push(std::move(value));
lk.unlock();
cv_consumer_.notify_one(); // 通知消费者
}
// 消费者:阻塞弹出
T Pop() {
std::unique_lock<std::mutex> lk(mtx_);
// 等待队列非空
cv_consumer_.wait(lk, [this] { return !queue_.empty() || done_; });
if (done_) {
throw std::runtime_error("Queue is closed");
}
T value = std::move(queue_.front());
queue_.pop();
lk.unlock();
cv_producer_.notify_one(); // 通知生产者
return value;
}
// 关闭队列(所有等待中的消费者将退出)
void Shutdown() {
{
std::lock_guard<std::mutex> lk(mtx_);
done_ = true;
}
cv_consumer_.notify_all();
cv_producer_.notify_all();
}
~BlockingQueue() { Shutdown(); }
private:
std::mutex mtx_;
std::condition_variable cv_consumer_;
std::condition_variable cv_producer_;
std::queue<T> queue_;
size_t capacity_;
bool done_ = false;
};
int main() {
BlockingQueue<int> q(4);
// 生产者线程
std::thread producer([&q] {
for (int i = 0; i < 10; ++i) {
q.Push(i);
std::cout << "Produced: " << i << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
});
// 消费者线程
std::thread consumer([&q] {
try {
for (int i = 0; i < 10; ++i) {
int val = q.Pop();
std::cout << "Consumed: " << val << '\n';
}
} catch (const std::exception& e) {
std::cerr << e.what() << '\n';
}
});
producer.join();
consumer.join();
return 0;
}关键代码讲解
双条件变量:
cv_consumer_:等待队列非空cv_producer_:等待队列非满
异常安全:
Shutdown()设置done_,消费者通过异常退出循环。锁粒度:解锁后通知,避免唤醒线程立即再次阻塞(减少上下文切换)。
6. 性能优化与常见误区
6.1 避免通知丢失
始终在修改条件后通知
使用
notify_all()仅当多个等待者可能满足条件(如广播场景)
6.2 条件变量 + 超时等待
cv.wait_for(lk, std::chrono::seconds(1), [] { return ready; });6.3 避免死锁
确保
wait的谓词不依赖其他线程已持有的锁使用
std::scoped_lock管理多锁(C++17)
评论区