casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines
ThreadCoordinator.h
Go to the documentation of this file.
00001 // -*- C++ -*-
00002 //# ThreadCoordinator.h: Definition of the ThreadCoordinator class
00003 //# Copyright (C) 1997,1998,1999,2000,2001,2002,2003
00004 //# Associated Universities, Inc. Washington DC, USA.
00005 //#
00006 //# This library is free software; you can redistribute it and/or modify it
00007 //# under the terms of the GNU Library General Public License as published by
00008 //# the Free Software Foundation; either version 2 of the License, or (at your
00009 //# option) any later version.
00010 //#
00011 //# This library is distributed in the hope that it will be useful, but WITHOUT
00012 //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00013 //# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00014 //# License for more details.
00015 //#
00016 //# You should have received a copy of the GNU Library General Public License
00017 //# along with this library; if not, write to the Free Software Foundation,
00018 //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
00019 //#
00020 //# Correspondence concerning AIPS++ should be addressed as follows:
00021 //#        Internet email: aips2-request@nrao.edu.
00022 //#        Postal address: AIPS++ Project Office
00023 //#                        National Radio Astronomy Observatory
00024 //#                        520 Edgemont Road
00025 //#                        Charlottesville, VA 22903-2475 USA
00026 //#
00027 //# $Id$
00028 
00029 #include <casa/aips.h>
00030 #include <casa/aipstype.h>
00031 #include <cstddef>
00032 
00033 //<example>
00034 //
00035 // Code in the master thread:
00036 //
00037 //    ThreadCoordinator<VisBuffer> threadCoordinator (nThreads);
00038 //
00039 //    // Create nThreads threads and pass them a pointer to the thread coordinator
00040 //    // object.  Then enter the work loop
00041 //    
00042 //    for (int i = 0; i < nBuffers; i++){
00043 //    
00044 //      // Provide access to the i-th buffer
00045 //    
00046 //      threadCoordinator.getToWork (& vb); // tell workers to hop to it!
00047 //                                          // blocks if workers still busy
00048 //    
00049 //     }
00050 //    
00051 // Code in each worker thread:
00052 //    
00053 //    while (True){
00054 //    
00055 //      VisBuffer * workingBuffer = threadCoordinator_p->waitForWork (this);
00056 //      if (workingBuffer == NULL)
00057 //        break;
00058 //    
00059 //      doSomeWork(workingBuffer);
00060 //     }
00061 // </example>
00062 
00063 #ifndef SYNTHESIS_THREADCOORDINATOR_H
00064 #define SYNTHESIS_THREADCOORDINATOR_H
00065 
// Forward declaration only: lets this header hold a boost::barrier pointer
// without pulling in the Boost.Thread headers.
namespace boost {
    class barrier;
}
00069 
00070 namespace casa {
00071 
00072 namespace async {
00073   class Condition;
00074   class Mutex;
00075   class Thread;
00076 }
00077 
00078 class String;
00079 class VisBuffer;
00080 
00081 class ThreadCoordinatorBase {
00082 
00083 public:
00084 
00085   virtual ~ThreadCoordinatorBase ();
00086 
00087   void waitForWorkersToFinishTask ();
00088 
00089 protected:
00090 
00091   ThreadCoordinatorBase (Int nThreads, bool logStates);
00092 
00093 
00094   void dispatchWork ();
00095   void getToWork ();
00096   virtual void installWorkInfo () = 0;
00097   bool waitForWork (const async::Thread * thisThread);
00098   void waitForWorkersToReport ();
00099   Int nThreads_p;
00100 
00101 
00102 private:
00103 
00104   boost::barrier * barrier_p;
00105   bool logStates_p;
00106   async::Mutex * mutex_p;
00107   volatile Int nThreadsAtBarrier_p;
00108   volatile Int nThreadsDispatched_p;
00109   volatile Bool readyForWork_p;
00110   async::Condition * stateChanged_p;
00111   const VisBuffer * vb_p;
00112   volatile bool workCompleted_p;
00113   volatile bool workToBeDone_p;
00114 
00115   void logState (const String & tag) const;
00116 
00117 };
00118 
00119 template <typename T>
00120 class ThreadCoordinator : public ThreadCoordinatorBase {
00121 
00122 public:
00123 
00124     ThreadCoordinator (Int nThreads, Bool logStates = False) : ThreadCoordinatorBase (nThreads, logStates) {}
00125 
00126     void
00127     giveWorkToWorkers (T * workInfo)
00128     {
00129         workInfoInWaiting_p = workInfo;
00130         waitForWorkersToReport ();
00131         dispatchWork ();
00132     }
00133 
00134     void
00135     getToWork (T * workInfo)
00136     {
00137         workInfoInWaiting_p = workInfo;
00138         ThreadCoordinatorBase::getToWork ();
00139     }
00140 
00141     T *
00142     waitForWork (const async::Thread * thisThread)
00143     {
00144         bool ok = ThreadCoordinatorBase::waitForWork (thisThread);
00145         T * result = ok ? workInfo_p : NULL;
00146 
00147         return result;
00148     }
00149 
00150    void setNThreads(Int n) {nThreads_p=n;};
00151    Int nThreads() {return nThreads_p;};
00152 protected:
00153 
00154     void
00155     installWorkInfo ()
00156     {
00157         workInfo_p = workInfoInWaiting_p;
00158         workInfoInWaiting_p = NULL;
00159     }
00160 
00161 
00162 private:
00163 
00164     T * workInfoInWaiting_p;
00165     T * workInfo_p;
00166 };
00167 
00168 } // end namespace casa
00169 #endif // SYNTHESIS_THREADCOORDINATOR_H