casa
$Rev:20696$
|
00001 // -*- C++ -*- 00002 //# ThreadCoordinator.h: Definition of the ThreadCoordinator class 00003 //# Copyright (C) 1997,1998,1999,2000,2001,2002,2003 00004 //# Associated Universities, Inc. Washington DC, USA. 00005 //# 00006 //# This library is free software; you can redistribute it and/or modify it 00007 //# under the terms of the GNU Library General Public License as published by 00008 //# the Free Software Foundation; either version 2 of the License, or (at your 00009 //# option) any later version. 00010 //# 00011 //# This library is distributed in the hope that it will be useful, but WITHOUT 00012 //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 00013 //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public 00014 //# License for more details. 00015 //# 00016 //# You should have received a copy of the GNU Library General Public License 00017 //# along with this library; if not, write to the Free Software Foundation, 00018 //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA. 00019 //# 00020 //# Correspondence concerning AIPS++ should be addressed as follows: 00021 //# Internet email: aips2-request@nrao.edu. 00022 //# Postal address: AIPS++ Project Office 00023 //# National Radio Astronomy Observatory 00024 //# 520 Edgemont Road 00025 //# Charlottesville, VA 22903-2475 USA 00026 //# 00027 //# $Id$ 00028 00029 #include <casa/aips.h> 00030 #include <casa/aipstype.h> 00031 #include <cstddef> 00032 00033 //<example> 00034 // 00035 // Code in the master thread: 00036 // 00037 // ThreadCoordinator threadCoordinator (nThreads); 00038 // 00039 // // Create nThread threads and pass them a pointer to the thread coordinator 00040 // // object. Then enter the work loop 00041 // 00042 // for (int i = 0; i < nBuffers; i++){ 00043 // 00044 // // Provide access to the i-th buffer 00045 // 00046 // threadCoordinator->getToWork(& vb); // tell workers to hop to it! 00047 // // blocks if workers still busy 00048 // 00049 // } 00050 // 00051 // Code in each worker thread: 00052 // 00053 // while (True){ 00054 // 00055 // VisBuffer * workingBuffer = threadCoordinator_p->waitForWork (this); 00056 // if (workingBuffer == NULL) 00057 // break; 00058 // 00059 // doSomeWork(workingBuffer); 00060 // } 00061 // </example> 00062 00063 #ifndef SYNTHESIS_THREADCOORDINATOR_H 00064 #define SYNTHESIS_THREADCOORDINATOR_H 00065 00066 namespace boost { 00067 class barrier; 00068 }; 00069 00070 namespace casa { 00071 00072 namespace async { 00073 class Condition; 00074 class Mutex; 00075 class Thread; 00076 } 00077 00078 class String; 00079 class VisBuffer; 00080 00081 class ThreadCoordinatorBase { 00082 00083 public: 00084 00085 virtual ~ThreadCoordinatorBase (); 00086 00087 void waitForWorkersToFinishTask (); 00088 00089 protected: 00090 00091 ThreadCoordinatorBase (Int nThreads, bool logStates); 00092 00093 00094 void dispatchWork (); 00095 void getToWork (); 00096 virtual void installWorkInfo () = 0; 00097 bool waitForWork (const async::Thread * thisThread); 00098 void waitForWorkersToReport (); 00099 Int nThreads_p; 00100 00101 00102 private: 00103 00104 boost::barrier * barrier_p; 00105 bool logStates_p; 00106 async::Mutex * mutex_p; 00107 volatile Int nThreadsAtBarrier_p; 00108 volatile Int nThreadsDispatched_p; 00109 volatile Bool readyForWork_p; 00110 async::Condition * stateChanged_p; 00111 const VisBuffer * vb_p; 00112 volatile bool workCompleted_p; 00113 volatile bool workToBeDone_p; 00114 00115 void logState (const String & tag) const; 00116 00117 }; 00118 00119 template <typename T> 00120 class ThreadCoordinator : public ThreadCoordinatorBase { 00121 00122 public: 00123 00124 ThreadCoordinator (Int nThreads, Bool logStates = False) : ThreadCoordinatorBase (nThreads, logStates) {} 00125 00126 void 00127 giveWorkToWorkers (T * workInfo) 00128 { 00129 workInfoInWaiting_p = workInfo; 00130 waitForWorkersToReport (); 00131 dispatchWork (); 00132 } 00133 00134 void 00135 getToWork (T * workInfo) 00136 { 00137 workInfoInWaiting_p = workInfo; 00138 ThreadCoordinatorBase::getToWork (); 00139 } 00140 00141 T * 00142 waitForWork (const async::Thread * thisThread) 00143 { 00144 bool ok = ThreadCoordinatorBase::waitForWork (thisThread); 00145 T * result = ok ? workInfo_p : NULL; 00146 00147 return result; 00148 } 00149 00150 void setNThreads(Int n) {nThreads_p=n;}; 00151 Int nThreads() {return nThreads_p;}; 00152 protected: 00153 00154 void 00155 installWorkInfo () 00156 { 00157 workInfo_p = workInfoInWaiting_p; 00158 workInfoInWaiting_p = NULL; 00159 } 00160 00161 00162 private: 00163 00164 T * workInfoInWaiting_p; 00165 T * workInfo_p; 00166 }; 00167 00168 } // end namespace casa 00169 #endif //