智慧团建网站进不去网站在线制作
C++并发编程:构建线程安全队列(第一部分:粗粒度锁)
引言
在多线程编程中,线程之间的数据共享和通信是一个非常重要的问题。在这篇博客中,我们将讨论如何用C++实现一个基础但非常实用的线程安全队列。这个队列使用粗粒度的互斥锁和条件变量来实现。
线程安全队列的基础实现
下面是基础代码结构:
template <typename T>
class threadsafe_queue
{
private:mutable std::mutex mut;std::queue<std::shared_ptr<T>> data_queue;std::condition_variable data_cond;// ...(省略其余代码)
};
互斥锁和条件变量
std::mutex mut
: 用于确保队列操作的线程安全。std::condition_variable data_cond
: 用于阻塞和唤醒等待队列操作的线程。
push方法
void push(T new_value)
{std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));std::unique_lock lk(mut);data_queue.push(data);data_cond.notify_one();
}
这里使用 std::unique_lock
来获取互斥锁,确保数据的线程安全。然后使用 data_cond.notify_one()
来唤醒可能正在等待队列变为非空的线程。
pop方法
对于 pop
,我们有两个版本:
wait_and_pop
:等待直到队列非空。try_pop
:尝试弹出,如果队列为空则立即返回。
void wait_and_pop(T& value)
{std::unique_lock lk(mut);data_cond.wait(lk, [this] { return !data_queue.empty(); });value = std::move(*data_queue.front());data_queue.pop();
}bool try_pop(T& value)
{std::unique_lock lk(mut);if (data_queue.empty()) return false;value = std::move(*data_queue.front());data_queue.pop();return true;
}
在 wait_and_pop
中,我们使用 data_cond.wait()
来阻塞当前线程,直到队列变为非空。
测试
我们使用了一个生产者线程和两个消费者线程进行测试。
// 测试函数
void test_threadsafe_queue()
{threadsafe_queue<int> tsq;// 创建一个生产者线程std::thread producer([&](){for (int i = 0; i < 10; ++i){std::cout << "Pushing " << i << std::endl;tsq.push(i);}});// 创建两个消费者线程std::thread consumer1([&](){for (int i = 0; i < 5; ++i){int value;tsq.wait_and_pop(value);std::cout << "Consumer 1 popped " << value << std::endl;}});std::thread consumer2([&](){for (int i = 0; i < 5; ++i){int value;tsq.wait_and_pop(value);std::cout << "Consumer 2 popped " << value << std::endl;}});// 等待所有线程完成producer.join();consumer1.join();consumer2.join();
}
完整代码
template <typename T>
class threadsafe_queue
{
private:mutable std::mutex mut;std::queue<std::shared_ptr<T>> data_queue;std::condition_variable data_cond;public:threadsafe_queue() = default;void wait_and_pop(T& value){std::unique_lock lk(mut);data_cond.wait(lk, [this] { return !data_queue.empty(); });value = std::move(*data_queue.front());data_queue.pop();}bool try_pop(T& value){std::unique_lock lk(mut);if (data_queue.empty()) return false;value = std::move(*data_queue.front());data_queue.pop();return true;}std::shared_ptr<T> wait_and_pop(){std::unique_lock lk(mut);data_cond.wait(lk, [this] { return !data_queue.empty(); });std::shared_ptr<T> res = data_queue.front();data_queue.pop();return res;}std::shared_ptr<T> try_pop(){std::unique_lock lk(mut);if (data_queue.empty()) return std::make_shared<T>();std::shared_ptr<T> res = data_queue.front();data_queue.pop();return res;}void push(T new_value){std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));std::unique_lock lk(mut);data_queue.push(data);data_cond.notify_one();}bool empty(){std::unique_lock lk(mut);return data_queue.empty();}
};// 测试函数
void test_threadsafe_queue()
{threadsafe_queue<int> tsq;// 创建一个生产者线程std::thread producer([&](){for (int i = 0; i < 10; ++i){std::cout << "Pushing " << i << std::endl;tsq.push(i);}});// 创建两个消费者线程std::thread consumer1([&](){for (int i = 0; i < 5; ++i){int value;tsq.wait_and_pop(value);std::cout << "Consumer 1 popped " << value << std::endl;}});std::thread consumer2([&](){for (int i = 0; i < 5; ++i){int value;tsq.wait_and_pop(value);std::cout << "Consumer 2 popped " << value << std::endl;}});// 等待所有线程完成producer.join();consumer1.join();consumer2.join();
}int main()
{test_threadsafe_queue();return 0;
}
总结
这篇博客中,我们简要介绍了如何使用C++的标准库来实现一个基础的线程安全队列。虽然我们使用了粗粒度的互斥锁,但这个实现是非常实用和直观的。在下一篇博客中,我们将讨论如何进行优化,以提高性能和效率。