casa  5.7.0-16
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
PThreadUtil.h
Go to the documentation of this file.
1 #ifndef _SINGLEDISH_PTHREAD_UTILS_H_
2 #define _SINGLEDISH_PTHREAD_UTILS_H_
3 
#include <pthread.h>
#include <unistd.h>

#include <cassert>
#include <iostream>
#include <sstream>
#include <stdexcept>
10 
11 using namespace std;
12 
// Throws std::runtime_error carrying `msg` when `condition` evaluates to true;
// does nothing otherwise.
//
// The exception type is fully qualified so the macro also works in translation
// units that have no `using namespace std;` in effect. The do/while(false)
// wrapper makes the expansion a single statement, so it is safe inside an
// unbraced if/else.
#define THROW_IF(condition, msg) \
  do { \
    if ((condition)) { \
      throw ::std::runtime_error((msg)); \
    } \
  } while (false)
19 
20 namespace casa { //# NAMESPACE CASA - BEGIN
21 namespace sdfiller { //# NAMESPACE SDFILLER - BEGIN
22 
23 class Mutex {
24 
25 public:
26  Mutex() :
27  mutex_(PTHREAD_MUTEX_INITIALIZER) {
28 // cout << "Mutex::Mutex()" << endl;
29  int ret = pthread_mutex_init(&mutex_, NULL);
30  THROW_IF(ret != 0, "Mutex::Mutex() failed to initalize mutex");
31  }
32  ~Mutex() noexcept(false) {
33 // cout << "Mutex::~Mutex()" << endl;
34  int ret = pthread_mutex_destroy(&mutex_);
35  THROW_IF(ret != 0, "Mutex::~Mutex() failed to destroy mutex");
36  }
37  int lock() {
38 // cout << "Mutex::lock()" << endl;
39  int ret = pthread_mutex_lock(&mutex_);
40  THROW_IF(ret != 0, "Mutex::lock() failed to lock mutex");
41  return ret;
42  }
43  int unlock() {
44 // cout << "Mutex::unlock()" << endl;
45  int ret = pthread_mutex_unlock(&mutex_);
46  THROW_IF(ret != 0, "Mutex::unlock() failed to unlock mutex");
47  return ret;
48  }
49  int try_lock() {
50 // cout << "Mutex::try_lock()" << endl;
51  return pthread_mutex_trylock(&mutex_);
52  }
53 
54 protected:
55  pthread_mutex_t mutex_;
56 
57  friend class PCondition;
58 };
59 
60 class PCondition {
61 public:
62  PCondition(Mutex *mutex) :
63  mutex_(&(mutex->mutex_)) {
64  int ret = pthread_cond_init(&cond_, NULL);
65  THROW_IF(ret != 0,
66  "PCondition::PCondition() failed to initialize pthread_cond_t");
67  }
68 
69  virtual ~PCondition() noexcept(false) {
70  int ret = pthread_cond_destroy(&cond_);
71  THROW_IF(ret != 0,
72  "PCondition::~PCondition() failed to destroy pthread_cond_t");
73  }
74  int lock() {
75 // cout << "PCondition::lock()" << endl;
76  return pthread_mutex_lock(mutex_);
77  }
78 
79  int unlock() {
80 // cout << "PCondition::unlock()" << endl;
81  return pthread_mutex_unlock(mutex_);
82  }
83 
84  int wait() {
85 // cout << "PCondition::wait()" << endl;
86  int ret = pthread_cond_wait(&cond_, mutex_);
87  THROW_IF(ret != 0, "PCondition::wait() failed to block pthread_cond_t");
88  return ret;
89  }
90 
91  int signal() {
92 // cout << "PCondition::signal()" << endl;
93  int ret = pthread_cond_signal(&cond_);
94  THROW_IF(ret != 0, "PCondition::signal() failed to release pthread_cond_t");
95  return ret;
96  }
97 private:
98  pthread_mutex_t *mutex_;
99  pthread_cond_t cond_;
100 };
101 
102 // implementation of producer consumer model
103 template<class DataType, ssize_t BufferSize>
105 public:
107 
108  // production function
109  static void produce(_Context *context, DataType item) {
110  context->lock();
111 
112  // wait until buffer becomes available for production
113  while (context->buffer_is_full()) {
114  context->producer_wait();
115  }
116 
117  assert(!context->buffer_is_full());
118 
119  context->push_product(item);
120 
121  context->producer_next();
122 
123  // send a signal to consumer since something is produced
124  context->consumer_signal();
125 
126  context->unlock();
127  }
128 
129  // consumption function
130  // return false if no more products available
131  // otherwise return true
132  static bool consume(_Context *context, DataType *item) {
133  context->lock();
134 
135  // wait until something is produced
136  while (context->buffer_is_empty()) {
137  context->consumer_wait();
138  }
139 
140  assert(!context->buffer_is_empty());
141 
142  context->pop_product(item);
143  bool more_products = (*item != context->end_of_production_);
144 
145  context->consumer_next();
146 
147  // send a signal to consumer since there are available slot in buffer
148  context->producer_signal();
149 
150  context->unlock();
151 
152  return more_products;
153  }
154 
155  // it should be called when production complete
156  static void complete_production(_Context *context) {
157  produce(context, context->end_of_production_);
158  }
159 
160  // constructor
161  ProducerConsumerModelContext(DataType const terminator) :
162  end_of_production_(terminator), num_product_in_buffer_(0),
163  producer_index_(0), consumer_index_(0), mutex_(),
164  consumer_condition_(&mutex_), producer_condition_(&mutex_) {
165  //std::cout << "end_of_production = " << end_of_production_ << std::endl;
166  }
167 
168  // destructor
170  }
171 
172  // utility
173  template<class T>
174  static void locked_print(T msg, _Context *context) {
175  context->lock();
176  cout << msg << endl;
177  context->unlock();
178  }
179 
180 private:
181  int lock() {
182  return mutex_.lock();
183  }
184 
185  int unlock() {
186  return mutex_.unlock();
187  }
188 
189  int try_lock() {
190  return mutex_.try_lock();
191  }
192 
194  return producer_condition_.wait();
195  }
196 
198  return producer_condition_.signal();
199  }
200 
202  return consumer_condition_.wait();
203  }
204 
206  return consumer_condition_.signal();
207  }
208 
209  bool buffer_is_full() {
210  return num_product_in_buffer_ >= BufferSize;
211  }
212 
214  return num_product_in_buffer_ <= 0;
215  }
216 
217  void producer_next() {
218  producer_index_++;
219  producer_index_ %= BufferSize;
220  num_product_in_buffer_++;
221  }
222 
223  void consumer_next() {
224  consumer_index_++;
225  consumer_index_ %= BufferSize;
226  num_product_in_buffer_--;
227  }
228 
229  void push_product(DataType item) {
230  buffer_[producer_index_] = item;
231  }
232 
233  void pop_product(DataType *item) {
234  *item = buffer_[consumer_index_];
235  }
236 
237  // terminator data
238  // (product == end_of_production_) indicates that production
239  // is completed.
240  DataType const end_of_production_;
241  DataType buffer_[BufferSize];
248 };
249 
// Thread helpers; declarations only, the definitions are not part of this
// header.
//
// create_thread: NOTE(review): the parameter list mirrors pthread_create
// (thread id out-parameter, attributes, start routine, start-routine
// argument), so this is presumably a wrapper around it. Confirm error
// handling against the definition.
250 void create_thread(pthread_t *tid, pthread_attr_t *attr, void *(*func)(void *),
251  void *param);
// join_thread: NOTE(review): presumably a wrapper around pthread_join that
// stores the thread's exit value in *status. Confirm against the definition.
252 void join_thread(pthread_t *tid, void **status);
253 
254 } //# NAMESPACE SDFILLER - END
255 } //# NAMESPACE CASA - END
256 
257 #endif /* _SINGLEDISH_PTHREAD_UTILS_H_ */
static void locked_print(T msg, _Context *context)
utility
Definition: PThreadUtil.h:174
#define THROW_IF(condition, msg)
Definition: PThreadUtil.h:13
virtual ~PCondition() noexcept(false)
Definition: PThreadUtil.h:69
void create_thread(pthread_t *tid, pthread_attr_t *attr, void *(*func)(void *), void *param)
pthread_mutex_t mutex_
Definition: PThreadUtil.h:55
static void complete_production(_Context *context)
it should be called when production complete
Definition: PThreadUtil.h:156
ProducerConsumerModelContext(DataType const terminator)
constructor
Definition: PThreadUtil.h:161
PCondition(Mutex *mutex)
Definition: PThreadUtil.h:62
ProducerConsumerModelContext< DataType, BufferSize > _Context
Definition: PThreadUtil.h:106
DataType const end_of_production_
terminator data (product == end_of_production_) indicates that production is completed.
Definition: PThreadUtil.h:240
implementation of producer consumer model
Definition: PThreadUtil.h:104
static void produce(_Context *context, DataType item)
production function
Definition: PThreadUtil.h:109
static bool consume(_Context *context, DataType *item)
consumption function return false if no more products available otherwise return true ...
Definition: PThreadUtil.h:132
~Mutex() noexcept(false)
Definition: PThreadUtil.h:32
pthread_mutex_t * mutex_
Definition: PThreadUtil.h:98
void join_thread(pthread_t *tid, void **status)