目 录CONTENT

文章目录

C++ 实现 生产者-消费者 模型

TalentQ
2025-09-10 / 0 评论 / 0 点赞 / 2 阅读 / 0 字

0 目标

使用 C++ 手搓一个“生产者-消费者”模型,涉及 std::threadstd::queuestd::mutexstd::condition_variable

1 简单的 生产者-消费者 模型

生产者生产数据,存放在队列中,如果队列满了就阻塞住;消费者消费数据,从队列中取出,如果队列为空则阻塞住。

生产者线程和消费者线程,在访问队列时,需要保证线程安全,即访问时需要加锁。

  • “队列满了就阻塞、队列为空就阻塞”,这是由 std::condition_variable 来保证的;

  • “访问队列要加锁“,这是由 std::mutex 来保证的(我们使用 std::unique_lock 锁管理器,因为只有这个锁管理器可以配合条件变量使用)。

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

std::queue<int> g_queue;
std::mutex g_mutex;
std::condition_variable g_producer_cv;
std::condition_variable g_consumer_cv;
constexpr int MAX_QUEUE_SIZE = 5;

void producer(uint32_t id) {
  for (int i = 0; i < 10; ++i) {
    // 模拟生产时间
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // 生产数据
    int item = id * 1000 + i;

    std::unique_lock<std::mutex> lock(g_mutex);

    // 等待队列不满
    g_producer_cv.wait(lock, [] { return g_queue.size() < MAX_QUEUE_SIZE; });

    // 将数据放入队列
    g_queue.push(item);
    std::cout << "Producer " << id << " product: " << item << std::endl;

    // 解锁并通知消费者
    lock.unlock();
    g_consumer_cv.notify_one();
  }
  std::cout << "Producer " << id << " finished." << std::endl;
}

void consumer(uint32_t id) {
  for (int i = 0; i < 10; ++i) {
    // 加锁保护共享队列
    std::unique_lock<std::mutex> lock(g_mutex);

    // 等待队列不空
    g_consumer_cv.wait(lock, [] { return !g_queue.empty(); });

    // 从队列取出项目
    int item = g_queue.front();
    g_queue.pop();
    std::cout << "Consumer " << id << " consumed: " << item << std::endl;

    // 解锁并通知生产者
    lock.unlock();
    g_producer_cv.notify_one();

    // 模拟消费时间
    std::this_thread::sleep_for(std::chrono::milliseconds(150));
  }

  std::cout << "Consumer " << id << " finished." << std::endl;
}

int main() {
  std::vector<std::thread> pro, con;

  for (int i = 0; i < 4; ++i) {
    pro.emplace_back(producer, i + 1);
    con.emplace_back(consumer, i + 1);
  }

  for (int i = 0; i < 4; ++i) {
    pro[i].join();
    con[i].join();
  }

  std::cout << "All producers and consumers finished." << std::endl;

  return 0;
}

2 优雅的 生产者-消费者 模型

生产者-消费者模型,看起来就是生产者线程不断生产数据,消费者线程不断消费数据。上一节的代码实现中,对于这两类线程,借助 std::mutex 保护了访问的冲突域(队列),借助 std::condition_variable 控制了线程的行为(不能无脑生产或消费,该阻塞就要阻塞,时候到了就要唤醒)。

本质上,是由于 C++ 的 队列 std::queue 不是线程安全的,从而要求生产者线程和消费者线程实现线程安全。

至此,有没有一种冲动,让冲突域(队列)变得线程安全不就万事大吉了嘛!如果有一个线程安全的队列,生产者线程和消费者线程就可以闭着眼睛做自己的事情就好了,根本不用考虑要不要加锁、要不要阻塞、要不要唤醒,就直接隐退成普通函数就好了。

代码实现

实现一个线程安全的队列,考虑其可扩展性,实现成了模板类,其中对队列的 push 操作,优化成了移动语义。

这里的 类名为ThreadSafeBuffer,本质上生产者线程和消费者线程共用一块缓冲区,只不过我们采用的是队列这个数据结构。

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

template <typename T>
class ThreadSafeBuffer {
 public:
  explicit ThreadSafeBuffer(uint32_t max_size) : max_size_(max_size) {}

  void thread_safe_push(T item) {
    std::unique_lock<std::mutex> lock(buffer_mtx_);
    not_full_.wait(lock, [this] { return buffer_.size() < max_size_; });
    buffer_.push(std::move(item));
    lock.unlock();
    not_empty_.notify_one();
  }

  T thread_safe_front_and_pop() {
    std::unique_lock<std::mutex> lock(buffer_mtx_);
    not_empty_.wait(lock, [this] { return !buffer_.empty(); });
    T item = buffer_.front();
    buffer_.pop();
    lock.unlock();
    not_full_.notify_one();
    return item;
  }

 private:
  std::queue<T> buffer_;
  const uint32_t max_size_;
  std::mutex buffer_mtx_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
};

ThreadSafeBuffer<int> g_buffer(5);

void Producer(int id) {
  for (int i = 0; i < 10; ++i) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    int item = id * 100 + 1;
    g_buffer.thread_safe_push(item);
    std::cout << "Producer " << id << " product: " << item << std::endl;
  }
  std::cout << "Producer " << id << " finished." << std::endl;
}

void Consumer(int id) {
  for (int i = 0; i < 10; ++i) {
    int item = g_buffer.thread_safe_front_and_pop();
    std::cout << "Consumer " << id << " consumed: " << item << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(150));
  }
  std::cout << "Consumer " << id << " finished." << std::endl;
}

int main() {
  std::vector<std::thread> pro, con;

  for (int i = 0; i < 4; ++i) {
    pro.emplace_back(Producer, i);
    con.emplace_back(Consumer, i);
  }

  for (int i = 0; i < 4; ++i) {
    pro[i].join();
    con[i].join();
  }

  std::cout << "All producers and consumers finished." << std::endl;

  return 0;
}

3 小小瑕疵

如果你真的跑了这个代码,就会发现终端打印的字符会乱序,这是因为输出缓冲区也是冲突域,而我们没有对 std::cout进行加锁管理。

在实际工程中,我们可能不会打印,而是记录日志。如果是记录到同一个文件中,也是需要手动管理这个冲突域的,保证任意时刻最多只能有一个线程在访问冲突域。

0

评论区