// -*- C++ -*-
//# ThreadCoordinator.cc: Implementation of the ThreadCoordinator class
//# Copyright (C) 1997,1998,1999,2000,2001,2002,2003
//# Associated Universities, Inc. Washington DC, USA.
//#
//# This library is free software; you can redistribute it and/or modify it
//# under the terms of the GNU Library General Public License as published by
//# the Free Software Foundation; either version 2 of the License, or (at your
//# option) any later version.
//#
//# This library is distributed in the hope that it will be useful, but WITHOUT
//# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
//# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
//# License for more details.
//#
//# You should have received a copy of the GNU Library General Public License
//# along with this library; if not, write to the Free Software Foundation,
//# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
//#
//# Correspondence concerning AIPS++ should be addressed as follows:
//#        Internet email: aips2-request@nrao.edu.
//#        Postal address: AIPS++ Project Office
//#                        National Radio Astronomy Observatory
//#                        520 Edgemont Road
//#                        Charlottesville, VA 22903-2475 USA
//#
//# $Id$

#include <synthesis/Utilities/ThreadCoordinator.h>
#include <stdcasa/thread/AsynchronousTools.h>
#include <cassert>
#include <stdcasa/thread/Barrier.h>


#define Log(level, ...) \
    {Logger::get()->log (__VA_ARGS__);};

using namespace casacore;
using namespace casa::async;

using namespace casacore;
namespace casa {

ThreadCoordinatorBase::ThreadCoordinatorBase (Int nThreads, bool logStates)
: nThreads_p (nThreads),
  barrier_p (new Barrier (nThreads)),
  logStates_p (logStates),
  mutex_p (new async::Mutex ()),
  nThreadsAtBarrier_p (0),
  nThreadsDispatched_p (0),
  readyForWork_p (false),
  stateChanged_p (new Condition ()),
  workCompleted_p (false),
  workToBeDone_p (false)
{}

ThreadCoordinatorBase::~ThreadCoordinatorBase ()
{
    delete mutex_p;
    delete stateChanged_p;
}

void
ThreadCoordinatorBase::dispatchWork ()
{
    MutexLocker ml (* mutex_p);

    installWorkInfo (); // have subclass install work info for distribution

    workToBeDone_p = true;
    workCompleted_p = false;
    stateChanged_p->notify_all ();
}

void
ThreadCoordinatorBase::getToWork ()
{

    waitForWorkersToReport ();

    // Direct the workers to do another round of work

    dispatchWork ();

    // Wait for them to complete the current round of work

    waitForWorkersToFinishTask ();
}

void
ThreadCoordinatorBase::logState (const String & tag) const
{
    if (logStates_p){
        Log (1, "%s:\tready=%s\tworkToBeDone=%s\tworkCompleted=%s\n",
             tag.c_str(),
             readyForWork_p ? "T" : "F",
             workToBeDone_p ? "T" : "F",
             workCompleted_p ? "T" : "F");
    }
}

Bool
ThreadCoordinatorBase::waitForWork (const Thread * thisThread)
{

    // Report this thread's arrival.  When the last worker thread checks in,
    // mark the current round of work as completed and signal a state change.

    // logState ("waitForWork (at barrier)");

    // Wait until all of the worker threads are done.  This prevents an
    // eager beaver thread from trying to do two passes through the work
    // cycle while another has not been awoken.

    {
        MutexLocker ml (* mutex_p);
        ++ nThreadsAtBarrier_p;
        if (nThreadsAtBarrier_p == nThreads_p){
            workCompleted_p = true;
            nThreadsAtBarrier_p = 0;
            stateChanged_p->notify_all ();
        }
    }

    barrier_p->wait ();

    UniqueLock uniqueLock (* mutex_p);

    if (! readyForWork_p){
        readyForWork_p = true;
        stateChanged_p->notify_all ();
    }

    // logState ("waitForWork (past barrier)");

    // Wait for the next bit of work

    while (! workToBeDone_p && ! thisThread->isTerminationRequested()){
        stateChanged_p->wait (uniqueLock);
    }

    ++ nThreadsDispatched_p;
    if (nThreadsDispatched_p == nThreads_p){
        nThreadsDispatched_p = 0;
        workToBeDone_p = false;
    }

    readyForWork_p = false;

    // logState ("waitForWork (end wait)");

    return ! thisThread->isTerminationRequested();
}

void
ThreadCoordinatorBase::waitForWorkersToFinishTask ()
{
    UniqueLock uniqueLock (* mutex_p);

    // Wait for them to complete the current round of work

    // logState ("Waiting for workers to finish task...");

    while (! workCompleted_p){
        stateChanged_p->wait (uniqueLock);
    }

    // logState ("... workers have finished task.");
}

void
ThreadCoordinatorBase::waitForWorkersToReport ()
{
    UniqueLock uniqueLock (* mutex_p);

    // Wait for all of the worker threads to complete
    // the previous work.

    // logState ("Waiting for workers to report ...");

    while (! readyForWork_p || workToBeDone_p){
        stateChanged_p->wait (uniqueLock);
    }

    // logState ("... workers have reported");
}

using namespace casacore;
} // end namespace casa
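
// Usage sketch (illustration only, kept in comments so it is not built).
// It assumes installWorkInfo() is a virtual hook that a subclass overrides
// and that getToWork() and waitForWork() are callable from the coordinating
// thread and the worker threads respectively; the subclass name, the chunk
// members, workerLoop(), and processChunk() are hypothetical.
//
//   class ExampleCoordinator : public ThreadCoordinatorBase {
//   public:
//       ExampleCoordinator (Int nThreads) : ThreadCoordinatorBase (nThreads, false) {}
//       void setNextChunk (int chunk) { nextChunk_p = chunk; }
//       int currentChunk () const { return currentChunk_p; }
//   protected:
//       // Called from dispatchWork() with the coordinator's mutex held;
//       // publishes the next round's work to the worker threads.
//       void installWorkInfo () { currentChunk_p = nextChunk_p; }
//   private:
//       int nextChunk_p = 0;
//       int currentChunk_p = 0;
//   };
//
//   // Worker thread body: block until a round is dispatched, process it,
//   // and exit once thread termination has been requested.
//   void workerLoop (ExampleCoordinator * tc, const Thread * thisThread)
//   {
//       while (tc->waitForWork (thisThread)){
//           processChunk (tc->currentChunk ());
//       }
//   }
//
//   // Coordinator side, once per round of work:
//   //
//   //   tc->setNextChunk (i);
//   //   tc->getToWork ();   // waits for workers to report, dispatches the
//   //                       // round, then waits for them to finish it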