LCOV - code coverage report
Current view: top level - singledishfiller/Filler - PThreadUtil.h (source / functions) Hit Total Coverage
Test: ctest_coverage.info Lines: 0 100 0.0 %
Date: 2023-11-06 10:06:49 Functions: 0 28 0.0 %

          Line data    Source code
       1             : #ifndef _SINGLEDISH_PTHREAD_UTILS_H_
       2             : #define _SINGLEDISH_PTHREAD_UTILS_H_
       3             : 
       4             : #include <pthread.h>
       5             : 
       6             : #include <iostream>
       7             : #include <sstream>
       8             : #include <stdexcept>
       9             : #include <unistd.h>
      10             : 
      11             : using namespace std;
      12             : 
      // THROW_IF(condition, msg)
      //   Throws std::runtime_error constructed from `msg` when `condition`
      //   evaluates to true; otherwise does nothing.
      //   The exception type is spelled with its std:: qualifier so that the
      //   macro expands correctly at any call site, independent of whether a
      //   `using namespace std;` directive is visible there.
      //   The do { ... } while (false) wrapper makes the expansion behave as a
      //   single statement (safe inside un-braced if/else).
      #define THROW_IF(condition, msg) \
        do { \
          if ((condition)) { \
            throw std::runtime_error((msg)); \
          } \
        } while (false)
      19             : 
      20             : namespace casa { //# NAMESPACE CASA - BEGIN
      21             : namespace sdfiller { //# NAMESPACE SDFILLER - BEGIN
      22             : 
      /**
       * Wrapper that owns one pthread_mutex_t.
       *
       * The mutex is created with default attributes by pthread_mutex_init()
       * in the constructor and released by pthread_mutex_destroy() in the
       * destructor. Failures of init/destroy/lock/unlock are reported by
       * throwing std::runtime_error.
       *
       * The object is non-copyable: a pthread_mutex_t must not be duplicated
       * by value.
       */
      class Mutex {

      public:
        /// Initializes the mutex with default attributes.
        /// @throws std::runtime_error if pthread_mutex_init() fails.
        Mutex() {
          // mutex_ is deliberately NOT pre-set with PTHREAD_MUTEX_INITIALIZER:
          // calling pthread_mutex_init() on an already initialized mutex is
          // undefined by POSIX.
          throw_on_error(pthread_mutex_init(&mutex_, nullptr),
              "Mutex::Mutex() failed to initialize mutex");
        }

        Mutex(Mutex const &) = delete;
        Mutex &operator=(Mutex const &) = delete;

        /// Destroys the mutex.
        /// @throws std::runtime_error if pthread_mutex_destroy() fails.
        /// NOTE(review): this destructor may throw (noexcept(false)); if that
        /// happens while another exception is propagating, std::terminate is
        /// called. Kept as-is to preserve the existing contract.
        ~Mutex() noexcept(false) {
          throw_on_error(pthread_mutex_destroy(&mutex_),
              "Mutex::~Mutex() failed to destroy mutex");
        }

        /// Blocks until the mutex is acquired.
        /// @return 0 (any non-zero pthread result is converted to an exception)
        /// @throws std::runtime_error if pthread_mutex_lock() fails.
        int lock() {
          int const ret = pthread_mutex_lock(&mutex_);
          throw_on_error(ret, "Mutex::lock() failed to lock mutex");
          return ret;
        }

        /// Releases the mutex.
        /// @return 0 (any non-zero pthread result is converted to an exception)
        /// @throws std::runtime_error if pthread_mutex_unlock() fails.
        int unlock() {
          int const ret = pthread_mutex_unlock(&mutex_);
          throw_on_error(ret, "Mutex::unlock() failed to unlock mutex");
          return ret;
        }

        /// Attempts to acquire the mutex without blocking.
        /// @return the raw pthread_mutex_trylock() result (0 on success,
        ///         non-zero otherwise); never throws.
        int try_lock() {
          return pthread_mutex_trylock(&mutex_);
        }

      protected:
        pthread_mutex_t mutex_;

        // PCondition needs the raw pthread_mutex_t for pthread_cond_wait().
        friend class PCondition;

      private:
        // Converts a non-zero pthread return code into std::runtime_error(msg).
        static void throw_on_error(int ret, char const *msg) {
          if (ret != 0) {
            throw std::runtime_error(msg);
          }
        }
      };
      59             : 
      60             : class PCondition {
      61             : public:
      62           0 :   PCondition(Mutex *mutex) :
      63           0 :       mutex_(&(mutex->mutex_)) {
      64           0 :     int ret = pthread_cond_init(&cond_, NULL);
      65           0 :     THROW_IF(ret != 0,
      66             :         "PCondition::PCondition() failed to initialize pthread_cond_t");
      67           0 :   }
      68             : 
      69           0 :   virtual ~PCondition() noexcept(false) {
      70           0 :     int ret = pthread_cond_destroy(&cond_);
      71           0 :     THROW_IF(ret != 0,
      72             :         "PCondition::~PCondition() failed to destroy pthread_cond_t");
      73           0 :   }
      74             :   int lock() {
      75             : //    cout << "PCondition::lock()" << endl;
      76             :     return pthread_mutex_lock(mutex_);
      77             :   }
      78             : 
      79             :   int unlock() {
      80             : //    cout << "PCondition::unlock()" << endl;
      81             :     return pthread_mutex_unlock(mutex_);
      82             :   }
      83             : 
      84           0 :   int wait() {
      85             : //    cout << "PCondition::wait()" << endl;
      86           0 :     int ret = pthread_cond_wait(&cond_, mutex_);
      87           0 :     THROW_IF(ret != 0, "PCondition::wait() failed to block pthread_cond_t");
      88           0 :     return ret;
      89             :   }
      90             : 
      91           0 :   int signal() {
      92             : //    cout << "PCondition::signal()" << endl;
      93           0 :     int ret = pthread_cond_signal(&cond_);
      94           0 :     THROW_IF(ret != 0, "PCondition::signal() failed to release pthread_cond_t");
      95           0 :     return ret;
      96             :   }
      97             : private:
      98             :   pthread_mutex_t *mutex_;
      99             :   pthread_cond_t cond_;
     100             : };
     101             : 
     102             : // implementation of producer consumer model
     103             : template<class DataType, ssize_t BufferSize>
     104             : class ProducerConsumerModelContext {
     105             : public:
     106             :   typedef ProducerConsumerModelContext<DataType, BufferSize> _Context;
     107             : 
     108             :   // production function
     109           0 :   static void produce(_Context *context, DataType item) {
     110           0 :     context->lock();
     111             : 
     112             :     // wait until buffer becomes available for production
     113           0 :     while (context->buffer_is_full()) {
     114           0 :       context->producer_wait();
     115             :     }
     116             : 
     117           0 :     assert(!context->buffer_is_full());
     118             : 
     119           0 :     context->push_product(item);
     120             : 
     121           0 :     context->producer_next();
     122             : 
     123             :     // send a signal to consumer since something is produced
     124           0 :     context->consumer_signal();
     125             : 
     126           0 :     context->unlock();
     127           0 :   }
     128             : 
     129             :   // consumption function
     130             :   // return false if no more products available
     131             :   // otherwise return true
     132           0 :   static bool consume(_Context *context, DataType *item) {
     133           0 :     context->lock();
     134             : 
     135             :     // wait until something is produced
     136           0 :     while (context->buffer_is_empty()) {
     137           0 :       context->consumer_wait();
     138             :     }
     139             : 
     140           0 :     assert(!context->buffer_is_empty());
     141             : 
     142           0 :     context->pop_product(item);
     143           0 :     bool more_products = (*item != context->end_of_production_);
     144             : 
     145           0 :     context->consumer_next();
     146             : 
     147             :     // send a signal to consumer since there are available slot in buffer
     148           0 :     context->producer_signal();
     149             : 
     150           0 :     context->unlock();
     151             : 
     152           0 :     return more_products;
     153             :   }
     154             : 
     155             :   // it should be called when production complete
     156           0 :   static void complete_production(_Context *context) {
     157           0 :     produce(context, context->end_of_production_);
     158           0 :   }
     159             : 
     160             :   // constructor
     161           0 :   ProducerConsumerModelContext(DataType const terminator) :
     162             :       end_of_production_(terminator), num_product_in_buffer_(0),
     163             :       producer_index_(0), consumer_index_(0), mutex_(),
     164           0 :       consumer_condition_(&mutex_), producer_condition_(&mutex_) {
     165             :     //std::cout << "end_of_production = " << end_of_production_ << std::endl;
     166           0 :   }
     167             : 
     168             :   // destructor
     169           0 :   ~ProducerConsumerModelContext() {
     170           0 :   }
     171             : 
     172             :   // utility
     173             :   template<class T>
     174           0 :   static void locked_print(T msg, _Context *context) {
     175           0 :     context->lock();
     176           0 :     cout << msg << endl;
     177           0 :     context->unlock();
     178           0 :   }
     179             : 
     180             : private:
     181           0 :   int lock() {
     182           0 :     return mutex_.lock();
     183             :   }
     184             : 
     185           0 :   int unlock() {
     186           0 :     return mutex_.unlock();
     187             :   }
     188             : 
     189             :   int try_lock() {
     190             :     return mutex_.try_lock();
     191             :   }
     192             : 
     193           0 :   int producer_wait() {
     194           0 :     return producer_condition_.wait();
     195             :   }
     196             : 
     197           0 :   int producer_signal() {
     198           0 :     return producer_condition_.signal();
     199             :   }
     200             : 
     201           0 :   int consumer_wait() {
     202           0 :     return consumer_condition_.wait();
     203             :   }
     204             : 
     205           0 :   int consumer_signal() {
     206           0 :     return consumer_condition_.signal();
     207             :   }
     208             : 
     209           0 :   bool buffer_is_full() {
     210           0 :     return num_product_in_buffer_ >= BufferSize;
     211             :   }
     212             : 
     213           0 :   bool buffer_is_empty() {
     214           0 :     return num_product_in_buffer_ <= 0;
     215             :   }
     216             : 
     217           0 :   void producer_next() {
     218           0 :     producer_index_++;
     219           0 :     producer_index_ %= BufferSize;
     220           0 :     num_product_in_buffer_++;
     221           0 :   }
     222             : 
     223           0 :   void consumer_next() {
     224           0 :     consumer_index_++;
     225           0 :     consumer_index_ %= BufferSize;
     226           0 :     num_product_in_buffer_--;
     227           0 :   }
     228             : 
     229           0 :   void push_product(DataType item) {
     230           0 :     buffer_[producer_index_] = item;
     231           0 :   }
     232             : 
     233           0 :   void pop_product(DataType *item) {
     234           0 :     *item = buffer_[consumer_index_];
     235           0 :   }
     236             : 
     237             :   // terminator data
     238             :   // (product == end_of_production_) indicates that production
     239             :   // is completed.
     240             :   DataType const end_of_production_;
     241             :   DataType buffer_[BufferSize];
     242             :   ssize_t num_product_in_buffer_;
     243             :   ssize_t producer_index_;
     244             :   ssize_t consumer_index_;
     245             :   Mutex mutex_;
     246             :   PCondition consumer_condition_;
     247             :   PCondition producer_condition_;
     248             : };
     249             : 
     250             : void create_thread(pthread_t *tid, pthread_attr_t *attr, void *(*func)(void *),
     251             :     void *param);
     252             : void join_thread(pthread_t *tid, void **status);
     253             : 
     254             : } //# NAMESPACE SDFILLER - END
     255             : } //# NAMESPACE CASA - END
     256             : 
     257             : #endif /* _SINGLEDISH_PTHREAD_UTILS_H_ */

Generated by: LCOV version 1.16