C++ 实现线程安全的任务队列
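C++ 实现线程安全的任务队列 flyfish 2015-3-6 一、三个接口函数说明:1 add 新增任务;2 get_nonblock 非阻塞获取任务或者空任务;3 get_block 阻塞获取任务。头文件:#pragma once #include <deque> #include <boost/thread/mutex.hpp> #include <boost/thread/locks.hpp> #include <boost/thread/condition_variable.hpp> //任务 网络发送任务使用的结构,通常有...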
·
C++ 实现线程安全的任务队列
flyfish 2015-3-6
本文包括使用Boost库实现和使用标准C++11实现
使用Boost库实现
一、三个接口函数说明
1 add 新增任务
2 get_nonblock 非阻塞获取任务或者空任务
3 get_block 阻塞获取任务
头文件
#pragma once
#include <deque>
#include <tuple>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
/// A single network-send task: a fixed-size send buffer plus the number of
/// bytes in that buffer that should actually be transmitted.
class task
{
public:
    unsigned char data[2048];  ///< Send buffer.
    unsigned int len = 0;      ///< Number of bytes of `data` to send; starts at 0.

    // Declared without a `task::` prefix: qualifying a member name inside its
    // own class body is ill-formed in standard C++ and is rejected by GCC and
    // Clang.
    task();
    ~task();
};
/// FIFO queue of `task` objects whose operations all take an internal mutex,
/// so it can be shared between producer and consumer threads.
class task_queue
{
private:
    std::deque<task> tasks;        // pending tasks, oldest at the front
    boost::mutex tasks_mutex;      // guards every access to `tasks`
    boost::condition_variable cv;  // signalled by add(), waited on by get_block()

public:
    // Declared without a `task_queue::` prefix: qualifying a member name
    // inside its own class body is ill-formed in standard C++ and is rejected
    // by GCC and Clang.
    task_queue();
    ~task_queue();

    /// Appends a copy of `task` to the back of the queue.
    void add(const task& task);

    /// Non-blocking pop: (true, oldest task) when the queue has one,
    /// otherwise (false, default-constructed task).
    std::tuple<bool,task> get_nonblock();

    /// Blocking pop: waits until a task is available, then removes and
    /// returns the oldest one.
    task get_block();
};
实现文件
#include "task_queue.h"
//task
/// Creates an empty task: the send buffer is zero-filled and the length is 0.
/// The original constructor zeroed only `data` and left `len` uninitialized,
/// so a default-constructed task carried an indeterminate length.
task::task()
    : data()   // value-initialization zeroes all 2048 bytes
    , len(0)
{
}
/// Nothing to release: the task only holds plain data members.
task::~task() = default;
// task queue
/// Starts with an empty queue; every member is default-constructed.
task_queue::task_queue() = default;
/// No explicit cleanup; the members destroy themselves.
task_queue::~task_queue() = default;
void task_queue::add(const task& task)
{
boost::unique_lock<boost::mutex> lock(tasks_mutex);//不允许其他线程执行
tasks.push_back(task);
lock.unlock();
cv.notify_one();//通知其他线程继续
}
std::tuple<bool,task> task_queue::get_nonblock()
{
boost::lock_guard<boost::mutex> lock(tasks_mutex);
std::tuple<bool,task> ret;
if (!tasks.empty())
{
ret=std::make_tuple(true,tasks.front());
tasks.pop_front();
}
else
{
task tmp;
ret=std::make_tuple(false,tmp);
}
return ret;
}
/// Removes and returns the oldest task, waiting on the condition variable
/// for as long as the queue is empty.
task task_queue::get_block()
{
    boost::unique_lock<boost::mutex> guard(tasks_mutex);

    // The predicate overload re-checks emptiness after every wake-up, exactly
    // like an explicit `while (empty) wait` loop.
    cv.wait(guard, [this] { return !tasks.empty(); });

    task head = tasks.front();
    tasks.pop_front();
    return head;
}
二 解释
1 notify_one用于唤醒一个等待该条件(condition)发生的线程
2 可以使用boost::mutex::scoped_lock(它就是boost::unique_lock<boost::mutex>的typedef)代替手动调用lock()/unlock(),可以避免遗漏unlock
例如
// Illustrative fragment: locking and unlocking the mutex by hand.
// Every exit path has to call unlock() itself.
boost::mutex mutex_;
try
{
mutex_.lock();
//do something
// Normal path: release the mutex once the work is done.
mutex_.unlock();
}
catch(...)
{
// Exception path: without this call the mutex would stay locked.
mutex_.unlock();
return 0;
}
使用boost::mutex::scoped_lock就可以避免catch遗漏unlock
3 boost::unique_lock仅仅比boost::lock_guard附加一些功能
4 如果任务在获取之后不删除,就可以使用多读一写方式,就要实现读写锁
读操作发生时: 写线程停止操作,允许多个线程同时读操作
写操作发生时: 同一时刻只允许一个线程执行写操作,其他线程无论读写全部等待。
代码类似
// Reader side takes the shared_mutex in shared mode, writer side in exclusive mode.
using r_lock = boost::shared_lock<boost::shared_mutex>;
using w_lock = boost::unique_lock<boost::shared_mutex>;

// The one mutex shared by read() and write().
boost::shared_mutex mutex_;
/// Reader: holds `mutex_` in shared mode for the duration of the call.
void read()
{
    r_lock shared_guard(mutex_);
    // do something
}
/// Writer: holds `mutex_` in exclusive mode for the duration of the call.
void write()
{
    w_lock exclusive_guard(mutex_);
    // do something
}
模板方式
#pragma once
#include <tuple>
#include <deque>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/condition_variable.hpp>
template <typename T>
class task_queue
{
private:
std::deque<T> tasks;
boost::mutex tasks_mutex;
boost::condition_variable cv;
public:
void add(const T& t)
{
boost::unique_lock<boost::mutex> lock(tasks_mutex);
tasks.push_back(t);
lock.unlock();
cv.notify_one();
}
std::tuple<bool, T> get_nonblock()
{
boost::lock_guard<boost::mutex> lock(tasks_mutex);
std::tuple<bool,T> ret;
if (!tasks.empty())
{
ret=std::make_tuple(true,tasks.front());
tasks.pop_front();
}
else
{
T tmp;
ret=std::make_tuple(false,tmp);
}
return ret;
}
T get_block()
{
boost::unique_lock<boost::mutex> lock(tasks_mutex);
while (tasks.empty())
{
cv.wait(lock);
}
T ret=tasks.front();
tasks.pop_front();
return ret;
}
};
使用C++11实现
#pragma once
#include <condition_variable>
#include <mutex>
#include <queue>
/// FIFO queue built on std::queue whose operations all lock an internal
/// std::mutex. Copying is disabled.
template <typename T>
class ThreadSafeQueue
{
public:
    ThreadSafeQueue() = default;
    ThreadSafeQueue(const ThreadSafeQueue& other) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue& other) = delete;

    /// Appends a copy of `new_value` and notifies one waiting thread.
    void Push(const T& new_value);

    /// Non-blocking pop; returns false (leaving `value` untouched) when empty.
    bool TryPop(T& value);

    /// Blocking pop; waits until an element is available.
    void WaitAndPop(T& value);

    /// Pop that waits at most `rel_time`; returns false on timeout.
    bool WaitAndTryPop(T& value, const std::chrono::microseconds rel_time);

    /// @return true when the queue held no elements at the moment of the call.
    /// Now `const`, so it can be called through a const reference.
    bool Empty() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return data_.empty();
    }

    /// @return Number of queued elements at the moment of the call.
    /// Now `const`; the size_t -> uint32_t narrowing is made explicit.
    uint32_t Size() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return static_cast<uint32_t>(data_.size());
    }

private:
    mutable std::mutex mutex_;    // mutable so the const observers can lock it
    std::queue<T> data_;          // guarded by mutex_
    std::condition_variable cv_;  // notified by Push()
};
/// Removes the oldest element without waiting.
/// @param value Receives the removed element; left untouched when the queue
///              is empty.
/// @return true if an element was removed, false if the queue was empty.
template <typename T>
bool ThreadSafeQueue<T>::TryPop(T& value)
{
    std::lock_guard<std::mutex> guard(mutex_);

    const bool has_item = !data_.empty();
    if (has_item) {
        value = data_.front();
        data_.pop();
    }
    return has_item;
}
/// Removes the oldest element, waiting on the condition variable for as long
/// as the queue is empty.
/// @param value Receives the removed element.
template <typename T>
void ThreadSafeQueue<T>::WaitAndPop(T& value)
{
    std::unique_lock<std::mutex> guard(mutex_);

    // Explicit form of the predicate wait: re-check after every wake-up.
    while (data_.empty()) {
        cv_.wait(guard);
    }

    value = data_.front();
    data_.pop();
}
/// Removes the oldest element, waiting at most `rel_time` for one to arrive.
/// @param value    Receives the removed element; untouched on timeout.
/// @param rel_time Maximum time to wait, relative to the call.
/// @return true if an element was removed, false if the timeout expired with
///         the queue still empty.
template <typename T>
bool ThreadSafeQueue<T>::WaitAndTryPop(T& value, const std::chrono::microseconds rel_time)
{
    std::unique_lock<std::mutex> guard(mutex_);

    const auto has_item = [&] { return !data_.empty(); };
    if (!cv_.wait_for(guard, rel_time, has_item)) {
        return false;  // the relative timeout expired
    }

    value = data_.front();
    data_.pop();
    return true;
}
/// Appends a copy of `new_value` to the back of the queue and notifies one
/// thread waiting on the condition variable.
template <typename T>
void ThreadSafeQueue<T>::Push(const T& new_value)
{
    {
        std::unique_lock<std::mutex> guard(mutex_);
        data_.push(new_value);
    }  // mutex is released here, before the notification

    cv_.notify_one();
}
测试
#include <string>
#include <sstream>
#include <vector>
#include <queue>
#include <functional>
#include <algorithm>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <memory>
#include <thread>
#include "ThreadSafeQueue.hpp"
// Locked by each worker function below around its std::cout output;
// ThreadFuncPush additionally holds it across its Push call.
std::mutex data_mutex_;
/// Worker: pushes `data` onto the queue and prints it.
/// data_mutex_ is held across both the push and the print.
void ThreadFuncPush(ThreadSafeQueue<int>* thread_safe_queue, int data)
{
    std::lock_guard<std::mutex> print_guard(data_mutex_);

    ThreadSafeQueue<int>& queue = *thread_safe_queue;
    queue.Push(data);

    std::cout << "Push:" << data << std::endl;
}
/// Worker: attempts one non-blocking pop and prints the value if it got one.
void ThreadFuncTryPop(ThreadSafeQueue<int>* thread_safe_queue)
{
    int popped = -1;
    const bool got_item = thread_safe_queue->TryPop(popped);

    std::lock_guard<std::mutex> print_guard(data_mutex_);
    if (!got_item)
    {
        return;
    }
    std::cout << "TryPop:" << popped << std::endl;
}
/// Worker: blocks until the queue yields an element, then prints it.
void ThreadFuncWaitAndPop(ThreadSafeQueue<int>* thread_safe_queue)
{
    int popped = -1;
    ThreadSafeQueue<int>& queue = *thread_safe_queue;
    queue.WaitAndPop(popped);

    std::lock_guard<std::mutex> print_guard(data_mutex_);
    std::cout << "WaitAndPop:" << popped << std::endl;
}
/// Worker: waits up to 50 microseconds for an element and prints it if one
/// arrived in time.
void ThreadFuncWaitAndTryPop(ThreadSafeQueue<int>* thread_safe_queue)
{
    int popped = -1;
    const std::chrono::microseconds timeout(50);
    const bool got_item = thread_safe_queue->WaitAndTryPop(popped, timeout);

    std::lock_guard<std::mutex> print_guard(data_mutex_);
    if (!got_item)
    {
        return;
    }
    std::cout << "WaitAndTryPop 50:" << popped << std::endl;
}
/// Exercises ThreadSafeQueue<int> from 100 threads: the first 50 are
/// producers, the remaining 50 are each randomly a TryPop, WaitAndPop,
/// WaitAndTryPop or another producer. All threads are joined before exit.
///
/// Changes from the original:
///  - threads live in a std::vector instead of leaked `new std::thread`s;
///  - the POSIX-only rand_r() (whose seed made the srand() call pointless) is
///    replaced by std::srand()/std::rand(), called from this thread only;
///  - the join loop covers exactly the threads that were started, with no
///    hard-coded 100.
int main()
{
    const int total_count = 100;
    const int producer_count = 50;

    ThreadSafeQueue<int> thread_safe_queue;

    std::vector<std::thread> threads;
    threads.reserve(total_count);

    std::srand(static_cast<unsigned int>(std::time(nullptr)));

    // Value pushed by the thread in slot i (same values as the original table).
    const auto value_for = [](int slot) { return slot + 100; };

    for (int i = 0; i < producer_count; ++i)
    {
        threads.emplace_back(ThreadFuncPush, &thread_safe_queue, value_for(i));
    }

    // There are at least 50 pushes and at most 50 consumers, each popping at
    // most once, so every blocking WaitAndPop eventually finds an element.
    for (int i = producer_count; i < total_count; ++i)
    {
        switch (std::rand() % 4)
        {
        case 0:
            threads.emplace_back(ThreadFuncTryPop, &thread_safe_queue);
            break;
        case 1:
            threads.emplace_back(ThreadFuncWaitAndPop, &thread_safe_queue);
            break;
        case 2:
            threads.emplace_back(ThreadFuncWaitAndTryPop, &thread_safe_queue);
            break;
        default:  // case 3; also guarantees every slot starts a thread
            threads.emplace_back(ThreadFuncPush, &thread_safe_queue, value_for(i));
            break;
        }
    }

    for (std::thread& worker : threads)
    {
        worker.join();
    }
    return 0;
}
更多推荐
所有评论(0)