0 目标
使用 C++ 手搓一个“生产者-消费者”模型,涉及 std::thread、std::queue、std::mutex、std::condition_variable。
1 简单的 生产者-消费者 模型
生产者生产数据,存放在队列中,如果队列满了就阻塞住;消费者消费数据,从队列中取出,如果队列为空则阻塞住。
生产者线程和消费者线程,在访问队列时,需要保证线程安全,即访问时需要加锁。
“队列满了就阻塞、队列为空就阻塞”,这是由
std::condition_variable来保证的;“访问队列要加锁“,这是由
std::mutex来保证的(我们使用std::unique_lock锁管理器,因为只有这个锁管理器可以配合条件变量使用)。
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
std::queue<int> g_queue;
std::mutex g_mutex;
std::condition_variable g_producer_cv;
std::condition_variable g_consumer_cv;
constexpr int MAX_QUEUE_SIZE = 5;
void producer(uint32_t id) {
for (int i = 0; i < 10; ++i) {
// 模拟生产时间
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// 生产数据
int item = id * 1000 + i;
std::unique_lock<std::mutex> lock(g_mutex);
// 等待队列不满
g_producer_cv.wait(lock, [] { return g_queue.size() < MAX_QUEUE_SIZE; });
// 将数据放入队列
g_queue.push(item);
std::cout << "Producer " << id << " product: " << item << std::endl;
// 解锁并通知消费者
lock.unlock();
g_consumer_cv.notify_one();
}
std::cout << "Producer " << id << " finished." << std::endl;
}
void consumer(uint32_t id) {
for (int i = 0; i < 10; ++i) {
// 加锁保护共享队列
std::unique_lock<std::mutex> lock(g_mutex);
// 等待队列不空
g_consumer_cv.wait(lock, [] { return !g_queue.empty(); });
// 从队列取出项目
int item = g_queue.front();
g_queue.pop();
std::cout << "Consumer " << id << " consumed: " << item << std::endl;
// 解锁并通知生产者
lock.unlock();
g_producer_cv.notify_one();
// 模拟消费时间
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
std::cout << "Consumer " << id << " finished." << std::endl;
}
int main() {
std::vector<std::thread> pro, con;
for (int i = 0; i < 4; ++i) {
pro.emplace_back(producer, i + 1);
con.emplace_back(consumer, i + 1);
}
for (int i = 0; i < 4; ++i) {
pro[i].join();
con[i].join();
}
std::cout << "All producers and consumers finished." << std::endl;
return 0;
}2 优雅的 生产者-消费者 模型
生产者-消费者模型,看起来就是生产者线程不断生产数据,消费者线程不断消费数据。上一节的代码实现中,对于这两类线程,借助 std::mutex 保护了访问的冲突域(队列),借助 std::condition_variable 控制了线程的行为(不能无脑生产或消费,该阻塞就要阻塞,时候到了就要唤醒)。
本质上,是由于 C++ 的 队列 std::queue 不是线程安全的,从而要求生产者线程和消费者线程实现线程安全。
至此,有没有一种冲动,让冲突域(队列)变得线程安全不就万事大吉了嘛!如果有一个线程安全的队列,生产者线程和消费者线程就可以闭着眼睛做自己的事情就好了,根本不用考虑要不要加锁、要不要阻塞、要不要唤醒,就直接隐退成普通函数就好了。
代码实现
实现一个线程安全的队列,考虑其可扩展性,实现成了模板类,其中对队列的 push 操作,优化成了移动语义。
这里的 类名为ThreadSafeBuffer,本质上生产者线程和消费者线程共用一块缓冲区,只不过我们采用的是队列这个数据结构。
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
template <typename T>
class ThreadSafeBuffer {
public:
explicit ThreadSafeBuffer(uint32_t max_size) : max_size_(max_size) {}
void thread_safe_push(T item) {
std::unique_lock<std::mutex> lock(buffer_mtx_);
not_full_.wait(lock, [this] { return buffer_.size() < max_size_; });
buffer_.push(std::move(item));
lock.unlock();
not_empty_.notify_one();
}
T thread_safe_front_and_pop() {
std::unique_lock<std::mutex> lock(buffer_mtx_);
not_empty_.wait(lock, [this] { return !buffer_.empty(); });
T item = buffer_.front();
buffer_.pop();
lock.unlock();
not_full_.notify_one();
return item;
}
private:
std::queue<T> buffer_;
const uint32_t max_size_;
std::mutex buffer_mtx_;
std::condition_variable not_full_;
std::condition_variable not_empty_;
};
ThreadSafeBuffer<int> g_buffer(5);
void Producer(int id) {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
int item = id * 100 + 1;
g_buffer.thread_safe_push(item);
std::cout << "Producer " << id << " product: " << item << std::endl;
}
std::cout << "Producer " << id << " finished." << std::endl;
}
void Consumer(int id) {
for (int i = 0; i < 10; ++i) {
int item = g_buffer.thread_safe_front_and_pop();
std::cout << "Consumer " << id << " consumed: " << item << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(150));
}
std::cout << "Consumer " << id << " finished." << std::endl;
}
int main() {
std::vector<std::thread> pro, con;
for (int i = 0; i < 4; ++i) {
pro.emplace_back(Producer, i);
con.emplace_back(Consumer, i);
}
for (int i = 0; i < 4; ++i) {
pro[i].join();
con[i].join();
}
std::cout << "All producers and consumers finished." << std::endl;
return 0;
}3 小小瑕疵
如果你真的跑了这个代码,就会发现终端打印的字符会乱序,这是因为输出缓冲区也是冲突域,而我们没有对 std::cout进行加锁管理。
在实际工程中,我们可能不会打印,而是记录日志。如果是记录到同一个文件中,也是需要手动管理这个冲突域的,保证任意时刻最多只能有一个线程在访问冲突域。
评论区