LCOV - code coverage report
Current view: top level - synthesis/Utilities - ThreadCoordinator.cc (source / functions) Hit Total Coverage
Test: ctest_coverage.info Lines: 0 56 0.0 %
Date: 2023-11-06 10:06:49 Functions: 0 9 0.0 %

          Line data    Source code
       1             : // -*- C++ -*-
       2             : //# ThreadCoordinator.cc: Implementation of the ThreadCoordinator class
       3             : //# Copyright (C) 1997,1998,1999,2000,2001,2002,2003
       4             : //# Associated Universities, Inc. Washington DC, USA.
       5             : //#
       6             : //# This library is free software; you can redistribute it and/or modify it
       7             : //# under the terms of the GNU Library General Public License as published by
       8             : //# the Free Software Foundation; either version 2 of the License, or (at your
       9             : //# option) any later version.
      10             : //#
      11             : //# This library is distributed in the hope that it will be useful, but WITHOUT
      12             : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      13             : //# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
      14             : //# License for more details.
      15             : //#
      16             : //# You should have received a copy of the GNU Library General Public License
      17             : //# along with this library; if not, write to the Free Software Foundation,
      18             : //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
      19             : //#
      20             : //# Correspondence concerning AIPS++ should be addressed as follows:
      21             : //#        Internet email: aips2-request@nrao.edu.
      22             : //#        Postal address: AIPS++ Project Office
      23             : //#                        National Radio Astronomy Observatory
      24             : //#                        520 Edgemont Road
      25             : //#                        Charlottesville, VA 22903-2475 USA
      26             : //#
      27             : //# $Id$
      28             : 
      29             : #include <synthesis/Utilities/ThreadCoordinator.h>
      30             : #include <stdcasa/thread/AsynchronousTools.h>
      31             : #include <cassert>
      32             : #include <stdcasa/thread/Barrier.h>
      33             : 
      34             : 
      35             : #define Log(level, ...) \
      36             :     {Logger::get()->log (__VA_ARGS__);};
      37             : 
      38             : using namespace casacore;
      39             : using namespace casa::async;
      40             : 
      41             : using namespace casacore;
      42             : namespace casa {
      43             : 
      44           0 : ThreadCoordinatorBase::ThreadCoordinatorBase (Int nThreads, bool logStates)
      45             :   : nThreads_p (nThreads),
      46           0 :     barrier_p (new Barrier(nThreads)),
      47             :     logStates_p (logStates),
      48           0 :     mutex_p (new async::Mutex()),
      49             :     nThreadsAtBarrier_p (0),
      50             :     nThreadsDispatched_p (0),
      51             :     readyForWork_p (false),
      52           0 :     stateChanged_p (new Condition ()),
      53             :     workCompleted_p (false),
      54           0 :     workToBeDone_p (false)
      55           0 : {}
      56             : 
      57           0 : ThreadCoordinatorBase::~ThreadCoordinatorBase ()
      58             : {
      59           0 :     delete mutex_p;
      60           0 :     delete stateChanged_p;
      61           0 : }
      62             : 
      63             : void
      64           0 : ThreadCoordinatorBase::dispatchWork ()
      65             : {
      66           0 :   MutexLocker ml (* mutex_p);
      67             : 
      68           0 :   installWorkInfo (); // have subclass install work info for distribution
      69             : 
      70           0 :   workToBeDone_p = true;
      71           0 :   workCompleted_p = false;
      72           0 :   stateChanged_p->notify_all ();
      73           0 : }
      74             : 
      75             : void
      76           0 : ThreadCoordinatorBase::getToWork ()
      77             : {
      78             : 
      79           0 :   waitForWorkersToReport ();
      80             : 
      81             :   // Direct the workers to do another round of work
      82             : 
      83           0 :   dispatchWork ();
      84             : 
      85             :   // Wait for them to complete the current round of work
      86             : 
      87           0 :   waitForWorkersToFinishTask ();
      88           0 : }
      89             : 
      90             : void
      91           0 : ThreadCoordinatorBase::logState (const String & tag) const
      92             : {
      93           0 :     if (logStates_p){
      94           0 :         Log (1, "%s:\tready=%s\tworkToBeDone=%s\tworkCompleted=%s\n",
      95             :              tag.c_str(),
      96             :              readyForWork_p ? "T" : "F",
      97             :              workToBeDone_p ? "T" : "F",
      98             :              workCompleted_p ? "T" : "F");
      99             :     }
     100           0 : }
     101             : 
     102             : Bool
     103           0 : ThreadCoordinatorBase::waitForWork (const Thread * thisThread)
     104             : {
     105             : 
     106             :   // Decrement the working thread counter.  If it's now zero then
     107             :   // clear the workToBeDone flag and signal a stateChange.
     108             : 
     109             :   //  logState ("waitForWork (at barrier)");
     110             : 
     111             :   // Wait until all of the worker threads are done. This prevents an
     112             :   // eager beaver thread from trying to do two passes through the work
     113             :   // cycle while another has not been awoken.
     114             : 
     115             :   {
     116           0 :       MutexLocker ml (* mutex_p);
     117           0 :       ++ nThreadsAtBarrier_p;
     118           0 :       if (nThreadsAtBarrier_p == nThreads_p){
     119           0 :           workCompleted_p = true;
     120           0 :           nThreadsAtBarrier_p = 0;
     121           0 :           stateChanged_p->notify_all ();
     122             :       }
     123             :   }
     124             : 
     125           0 :   barrier_p->wait ();
     126             : 
     127           0 :   UniqueLock uniqueLock (* mutex_p);
     128             : 
     129           0 :   if (! readyForWork_p){
     130           0 :       readyForWork_p = true;
     131           0 :       stateChanged_p->notify_all ();
     132             :   }
     133             : 
     134             :   //  logState ("waitForWork (past barrier)");
     135             : 
     136             :   // Wait for the next bit of work
     137             : 
     138           0 :   while (! workToBeDone_p && ! thisThread->isTerminationRequested()){
     139           0 :     stateChanged_p->wait (uniqueLock);
     140             :   }
     141             : 
     142           0 :   ++ nThreadsDispatched_p;
     143           0 :   if (nThreadsDispatched_p == nThreads_p){
     144           0 :       nThreadsDispatched_p = 0;
     145           0 :       workToBeDone_p = false;
     146             :   }
     147             : 
     148           0 :   readyForWork_p = false;
     149             : 
     150             :   //  logState ("waitForWork (end wait)");
     151             : 
     152           0 :   return ! thisThread->isTerminationRequested();
     153             : }
     154             : 
     155             : void
     156           0 : ThreadCoordinatorBase::waitForWorkersToFinishTask ()
     157             : {
     158           0 :   UniqueLock uniqueLock (* mutex_p);
     159             : 
     160             :   // Wait for them to complete the current round of work
     161             : 
     162             :   //  logState ("Waiting for workers to finish task...");
     163             : 
     164           0 :   while (! workCompleted_p){
     165           0 :     stateChanged_p->wait (uniqueLock);
     166             :   }
     167             : 
     168             :   //  logState ("... workers have finished task.");
     169             : 
     170           0 : }
     171             : 
     172             : void
     173           0 : ThreadCoordinatorBase::waitForWorkersToReport ()
     174             : {
     175           0 :   UniqueLock uniqueLock (* mutex_p);
     176             : 
     177             :   // Wait for all of the worker threads to complete
     178             :   // the previous work.
     179             : 
     180             :   //  logState ("Waiting for workers to report ...");
     181             : 
     182           0 :   while (! readyForWork_p || workToBeDone_p){
     183           0 :     stateChanged_p->wait (uniqueLock);
     184             :   }
     185             :   //  logState ("... workers have reported");
     186           0 : }
     187             : 
     188             : using namespace casacore;
     189             : } // end namespace casa

Generated by: LCOV version 1.16