Line data Source code
#ifndef _SINGLEDISH_PTHREAD_UTILS_H_
#define _SINGLEDISH_PTHREAD_UTILS_H_

#include <pthread.h>
#include <unistd.h>

#include <cassert>
#include <iostream>
#include <sstream>
#include <stdexcept>

// NOTE(review): a using-directive at header scope leaks into every file that
// includes this header. It is kept only because existing includers may rely
// on it; nothing in this header depends on it any more.
using namespace std;

// Throw std::runtime_error(msg) when `condition` evaluates to true.
#define THROW_IF(condition, msg) \
  do { \
    if ((condition)) { \
      throw std::runtime_error((msg)); \
    } \
  } while (false)

namespace casa { //# NAMESPACE CASA - BEGIN
namespace sdfiller { //# NAMESPACE SDFILLER - BEGIN

// Thin owning wrapper around a pthread_mutex_t.
//
// The mutex is created with default attributes in the constructor and
// destroyed in the destructor. Failures of the underlying pthread calls are
// reported as std::runtime_error (except for try_lock(), which returns the
// raw pthread result code).
class Mutex {

public:
  // Initialize the mutex with default attributes.
  // The member is deliberately NOT pre-initialized with
  // PTHREAD_MUTEX_INITIALIZER: initializing an already initialized mutex is
  // undefined by POSIX, and pthread_mutex_init() alone is sufficient.
  // Throws std::runtime_error if pthread_mutex_init() fails.
  Mutex() {
    int ret = pthread_mutex_init(&mutex_, NULL);
    THROW_IF(ret != 0, "Mutex::Mutex() failed to initalize mutex");
  }

  // Destroy the mutex.
  // Throws std::runtime_error if pthread_mutex_destroy() fails (for example
  // when the mutex is still locked). NOTE(review): a destructor that throws
  // while another exception is propagating terminates the program.
  ~Mutex() noexcept(false) {
    int ret = pthread_mutex_destroy(&mutex_);
    THROW_IF(ret != 0, "Mutex::~Mutex() failed to destroy mutex");
  }

  // A pthread mutex must not be copied: only the original object may be
  // used for synchronization, and a copy would be destroyed twice.
  Mutex(Mutex const &) = delete;
  Mutex &operator=(Mutex const &) = delete;

  // Block until the mutex is acquired.
  // Returns 0; throws std::runtime_error if pthread_mutex_lock() fails.
  int lock() {
    int ret = pthread_mutex_lock(&mutex_);
    THROW_IF(ret != 0, "Mutex::lock() failed to lock mutex");
    return ret;
  }

  // Release the mutex.
  // Returns 0; throws std::runtime_error if pthread_mutex_unlock() fails.
  int unlock() {
    int ret = pthread_mutex_unlock(&mutex_);
    THROW_IF(ret != 0, "Mutex::unlock() failed to unlock mutex");
    return ret;
  }

  // Try to acquire the mutex without blocking.
  // Returns the raw result of pthread_mutex_trylock() (0 on success);
  // never throws.
  int try_lock() {
    return pthread_mutex_trylock(&mutex_);
  }

protected:
  pthread_mutex_t mutex_;

  // PCondition needs the raw handle to pass it to pthread_cond_wait().
  friend class PCondition;
};

// Condition variable bound to the pthread mutex owned by a Mutex object.
//
// The PCondition only stores a pointer to the mutex handle, so the Mutex
// passed to the constructor must outlive the PCondition.
class PCondition {
public:
  // Bind to `mutex` and initialize the condition variable with default
  // attributes.
  // Throws std::runtime_error if `mutex` is null or if pthread_cond_init()
  // fails.
  PCondition(Mutex *mutex) :
      mutex_(mutex != nullptr ? &(mutex->mutex_) : nullptr) {
    THROW_IF(mutex_ == nullptr,
        "PCondition::PCondition() requires a non-null Mutex");
    int ret = pthread_cond_init(&cond_, NULL);
    THROW_IF(ret != 0,
        "PCondition::PCondition() failed to initialize pthread_cond_t");
  }

  // Destroy the condition variable.
  // Throws std::runtime_error if pthread_cond_destroy() fails.
  // NOTE(review): a destructor that throws while another exception is
  // propagating terminates the program.
  virtual ~PCondition() noexcept(false) {
    int ret = pthread_cond_destroy(&cond_);
    THROW_IF(ret != 0,
        "PCondition::~PCondition() failed to destroy pthread_cond_t");
  }

  // A pthread condition variable must not be copied (see Mutex).
  PCondition(PCondition const &) = delete;
  PCondition &operator=(PCondition const &) = delete;

  // Lock the associated mutex.
  // Returns the raw result of pthread_mutex_lock(); unlike Mutex::lock()
  // this does not throw on failure.
  int lock() {
    return pthread_mutex_lock(mutex_);
  }

  // Unlock the associated mutex.
  // Returns the raw result of pthread_mutex_unlock(); does not throw.
  int unlock() {
    return pthread_mutex_unlock(mutex_);
  }

  // Wait on the condition variable. The associated mutex must be held by
  // the caller; pthread_cond_wait() releases it while blocked and
  // re-acquires it before returning.
  // Returns 0; throws std::runtime_error if pthread_cond_wait() fails.
  int wait() {
    int ret = pthread_cond_wait(&cond_, mutex_);
    THROW_IF(ret != 0, "PCondition::wait() failed to block pthread_cond_t");
    return ret;
  }

  // Wake up one thread waiting on the condition variable.
  // Returns 0; throws std::runtime_error if pthread_cond_signal() fails.
  int signal() {
    int ret = pthread_cond_signal(&cond_);
    THROW_IF(ret != 0, "PCondition::signal() failed to release pthread_cond_t");
    return ret;
  }
private:
  pthread_mutex_t *mutex_;
  pthread_cond_t cond_;
};

// implementation of producer consumer model
//
// A fixed-capacity ring buffer of DataType guarded by one mutex and two
// condition variables (one the producer waits on, one the consumer waits
// on). A designated terminator value, given at construction, marks the end
// of production.
//
// DataType must be default-constructible, copy-assignable and comparable
// with operator!=. BufferSize is the ring buffer capacity and must be
// positive.
template<class DataType, ssize_t BufferSize>
class ProducerConsumerModelContext {
  static_assert(BufferSize > 0,
      "ProducerConsumerModelContext requires a positive BufferSize");

public:
  typedef ProducerConsumerModelContext<DataType, BufferSize> _Context;

  // production function
  // Blocks while the buffer is full, then stores `item` and signals the
  // consumer. Propagates std::runtime_error from the underlying pthread
  // wrappers; the context mutex is released in that case as well.
  static void produce(_Context *context, DataType item) {
    ScopedLock guard(context);

    // wait until buffer becomes available for production
    while (context->buffer_is_full()) {
      context->producer_wait();
    }

    assert(!context->buffer_is_full());

    context->push_product(item);

    context->producer_next();

    // send a signal to consumer since something is produced
    context->consumer_signal();

    guard.release();
  }

  // consumption function
  // Blocks while the buffer is empty, then copies the oldest product into
  // *item and signals the producer.
  // return false if the consumed product equals the terminator (no more
  // products available), otherwise return true. The terminator itself is
  // removed from the buffer and written to *item.
  static bool consume(_Context *context, DataType *item) {
    ScopedLock guard(context);

    // wait until something is produced
    while (context->buffer_is_empty()) {
      context->consumer_wait();
    }

    assert(!context->buffer_is_empty());

    context->pop_product(item);
    bool more_products = (*item != context->end_of_production_);

    context->consumer_next();

    // send a signal to producer since there is an available slot in buffer
    context->producer_signal();

    guard.release();

    return more_products;
  }

  // it should be called when production complete
  // Enqueues the terminator value as an ordinary product, so it may block
  // like produce() while the buffer is full.
  static void complete_production(_Context *context) {
    produce(context, context->end_of_production_);
  }

  // constructor
  // `terminator` is the value that marks the end of production.
  ProducerConsumerModelContext(DataType const terminator) :
      end_of_production_(terminator), num_product_in_buffer_(0),
      producer_index_(0), consumer_index_(0), mutex_(),
      consumer_condition_(&mutex_), producer_condition_(&mutex_) {
  }

  // destructor
  // Members are destroyed in reverse declaration order, i.e. both condition
  // variables before the mutex they refer to.
  ~ProducerConsumerModelContext() {
  }

  // utility
  // Print `msg` followed by a newline to std::cout while holding the
  // context mutex, so that output is serialized with produce()/consume().
  template<class T>
  static void locked_print(T msg, _Context *context) {
    ScopedLock guard(context);
    std::cout << msg << std::endl;
    guard.release();
  }

private:
  // Holds the context mutex for the duration of a scope.
  //
  // On the normal path the owner calls release(), which unlocks and lets an
  // unlock failure propagate as an exception. If the scope is left by an
  // exception before release(), the destructor unlocks on a best-effort
  // basis so that the other thread is not left blocked forever.
  class ScopedLock {
  public:
    explicit ScopedLock(_Context *context) :
        context_(context), held_(false) {
      context_->lock();
      held_ = true;
    }

    ~ScopedLock() {
      if (held_) {
        // Only reached during stack unwinding; a second exception here
        // would terminate the program, so unlock errors are dropped.
        try {
          context_->unlock();
        } catch (...) {
        }
      }
    }

    // Unlock now. Throws std::runtime_error if unlocking fails.
    void release() {
      // Clear the flag first so the destructor never unlocks twice.
      held_ = false;
      context_->unlock();
    }

  private:
    ScopedLock(ScopedLock const &) = delete;
    ScopedLock &operator=(ScopedLock const &) = delete;

    _Context *context_;
    bool held_;
  };

  // --- thin forwarding helpers around the mutex and condition variables ---

  int lock() {
    return mutex_.lock();
  }

  int unlock() {
    return mutex_.unlock();
  }

  int try_lock() {
    return mutex_.try_lock();
  }

  int producer_wait() {
    return producer_condition_.wait();
  }

  int producer_signal() {
    return producer_condition_.signal();
  }

  int consumer_wait() {
    return consumer_condition_.wait();
  }

  int consumer_signal() {
    return consumer_condition_.signal();
  }

  // --- ring buffer bookkeeping; callers must hold the context mutex ---

  bool buffer_is_full() {
    return num_product_in_buffer_ >= BufferSize;
  }

  bool buffer_is_empty() {
    return num_product_in_buffer_ <= 0;
  }

  // Advance the write position (wrapping at BufferSize) and count the new
  // product.
  void producer_next() {
    producer_index_++;
    producer_index_ %= BufferSize;
    num_product_in_buffer_++;
  }

  // Advance the read position (wrapping at BufferSize) and discount the
  // consumed product.
  void consumer_next() {
    consumer_index_++;
    consumer_index_ %= BufferSize;
    num_product_in_buffer_--;
  }

  // Store `item` at the current write position (does not advance it).
  void push_product(DataType item) {
    buffer_[producer_index_] = item;
  }

  // Copy the product at the current read position into *item (does not
  // advance the read position).
  void pop_product(DataType *item) {
    *item = buffer_[consumer_index_];
  }

  // terminator data
  // (product == end_of_production_) indicates that production
  // is completed.
  DataType const end_of_production_;
  DataType buffer_[BufferSize];
  ssize_t num_product_in_buffer_;
  ssize_t producer_index_;
  ssize_t consumer_index_;
  // mutex_ must be declared before the conditions: they capture its address
  // at construction and must be destroyed before it.
  Mutex mutex_;
  PCondition consumer_condition_;
  PCondition producer_condition_;
};

// Thread helpers; defined elsewhere (not visible in this header).
void create_thread(pthread_t *tid, pthread_attr_t *attr, void *(*func)(void *),
    void *param);
void join_thread(pthread_t *tid, void **status);

} //# NAMESPACE SDFILLER - END
} //# NAMESPACE CASA - END

#endif /* _SINGLEDISH_PTHREAD_UTILS_H_ */