LCOV - code coverage report
Current view: top level - synthesis/Parallel - Applicator.cc (source / functions) Hit Total Coverage
Test: ctest_coverage.info Lines: 133 207 64.3 %
Date: 2023-11-06 10:06:49 Functions: 14 18 77.8 %

          Line data    Source code
       1             : //# Applicator.cc: Implementation of Applicator.h
       2             : //# Copyright (C) 1999,2000,2002
       3             : //# Associated Universities, Inc. Washington DC, USA.
       4             : //#
       5             : //# This library is free software; you can redistribute it and/or modify it
       6             : //# under the terms of the GNU Library General Public License as published by
       7             : //# the Free Software Foundation; either version 2 of the License, or (at your
       8             : //# option) any later version.
       9             : //#
      10             : //# This library is distributed in the hope that it will be useful, but WITHOUT
      11             : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
      12             : //# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
      13             : //# License for more details.
      14             : //#
      15             : //# You should have received a copy of the GNU Library General Public License
      16             : //# along with this library; if not, write to the Free Software Foundation,
      17             : //# Inc., 675 Massachusetts Ave, Cambridge, MA 02139, USA.
      18             : //#
      19             : //# Correspondence concerning AIPS++ should be addressed as follows:
      20             : //#        Internet email: aips2-request@nrao.edu.
      21             : //#        Postal address: AIPS++ Project Office
      22             : //#                        National Radio Astronomy Observatory
      23             : //#                        520 Edgemont Road
      24             : //#                        Charlottesville, VA 22903-2475 USA
      25             : //#
      26             : //# $Id$
      27             : 
      28             : #include <casacore/casa/Utilities/Assert.h>
      29             : 
      30             : #include <synthesis/Parallel/Applicator.h>
      31             : #include <synthesis/Parallel/MPITransport.h>
      32             : #include <synthesis/Parallel/SerialTransport.h>
      33             : #include <synthesis/Parallel/Algorithm.h>
      34             : #include <synthesis/MeasurementComponents/ClarkCleanAlgorithm.h>
      35             : #include <synthesis/MeasurementComponents/ReadMSAlgorithm.h>
      36             : #include <synthesis/MeasurementComponents/MakeApproxPSFAlgorithm.h>
      37             : #include <synthesis/MeasurementComponents/PredictAlgorithm.h>
      38             : #include <synthesis/MeasurementComponents/ResidualAlgorithm.h>
      39             : #include <synthesis/ImagerObjects/CubeMajorCycleAlgorithm.h>
      40             : #include <synthesis/ImagerObjects/CubeMakeImageAlgorithm.h>
      41             : #include <synthesis/ImagerObjects/CubeMinorCycleAlgorithm.h>
      42             : #include <synthesis/Parallel/MPIError.h>
      43             : 
      44             : using namespace casacore;
      45             : using namespace std;
      46             : namespace casa { //# NAMESPACE CASA - BEGIN
      47             : 
      48           1 : Applicator::Applicator() : comm(0), algorithmIds( ),
      49             :   knownAlgorithms( ), LastID(101), usedAllThreads(false),
      50           1 :                            serial(true), nProcs(0), procStatus(0), initialized_p(false)
      51             : {
      52             : // Default constructor; requires later init().
      53           1 : }
      54             : 
      55           1 : Applicator::~Applicator()
      56             : {
      57             : // Default destructor
      58             : //  
      59           1 :   if (comm) {
      60             :     // If controller, then stop all worker processes
      61           1 :     if (isController() && !(comm->isFinalized())) {
      62           0 :       comm->setTag(STOP);
      63           0 :       for (Int i=0; i<nProcs; i++) {
      64           0 :         if (i != comm->controllerRank()) {
      65           0 :           comm->connect(i);
      66           0 :           put(STOP);
      67             :         }
      68             :       }
      69             :     }
      70           1 :     delete comm;
      71             :   }
      72             : 
      73           9 :   for (auto &algo : knownAlgorithms) {
      74           8 :       delete algo.second;
      75             :   }
      76           1 : }
      77             : 
      78           1 : void Applicator::initThreads(Int argc, Char *argv[]){
      79             : 
      80           1 :   Int numprocs=0;
      81             :  
      82             :    // A no-op if not using MPI
      83             : #ifdef HAVE_MPI
      84             :   //if (debug_p) {
      85             : 
      86           1 :   if(initialized_p) return;
      87             :   
      88             :   //If detecting only  1 proc is offered to OpenMPI but compiling with MPI
      89           1 :   if (!getenv("OMPI_COMM_WORLD_LOCAL_SIZE") ||  (String::toInt(getenv("OMPI_COMM_WORLD_LOCAL_SIZE")) <2) ) {
      90             :     //go serial
      91           1 :     initThreads();
      92             :   } 
      93             :   else {
      94             :     //cerr << "In initThreads. argc: " << argc << ", argv: " << argv << '\n';
      95           0 :     int flag=0;
      96           0 :     MPI_Initialized(&flag);
      97             :     //cerr << "FLAG " << flag << endl;
      98           0 :     if(flag || MPI_Init(&argc, &argv)==MPI_SUCCESS){
      99           0 :       Int numproc=0;
     100           0 :       MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
     101           0 :       if(numprocs < 2){
     102           0 :         initThreads();
     103           0 :         MPI_Finalize();
     104           0 :         return;
     105             :       }
     106             :     }
     107             :  
     108             :     //  cerr << "In initThreads. argc: " << argc << ", argv: " << argv << '\n';
     109             :     // Initialize the MPI transport layer
     110             :     try {
     111           0 :       comm = new MPITransport(argc, argv);
     112             : 
     113             :        // Initialize the process status list
     114           0 :        setupProcStatus();
     115             : 
     116             :        // If controller then exit, else loop, waiting for an assigned task
     117           0 :        if (isWorker()) {
     118           0 :          loop();
     119             :        }
     120             : 
     121           0 :     } catch (MPIError x) {
     122           0 :       cerr << x.getMesg() << " doing serial "<< endl;
     123           0 :       initThreads();
     124             :     }
     125             :   } 
     126             : 
     127             : #else
     128             :   (void)argc;
     129             :   (void)argv;
     130             :   cerr << " doing serial "<< endl;
     131             :   initThreads();
     132             : #endif
     133             : }
     134             : 
     135             :    // Serial transport all around.
     136           1 : void Applicator::initThreads(){
     137             :      // Initialize a serial transport layer
     138           1 :   comm = new SerialTransport();
     139             :      // Initialize the process status list
     140           1 :   setupProcStatus();
     141           1 : }
     142           0 : void Applicator::destroyThreads(){
     143           0 :   if(initialized_p){
     144           0 :     if (comm) {
     145             :     // If controller, then stop all worker processes
     146           0 :       if (isController() && !isSerial() && !(comm->isFinalized())) {
     147             :               //comm->setTag(STOP);
     148           0 :         for (Int i=0; i<nProcs; i++) {
     149           0 :           if (i != comm->controllerRank()) {
     150           0 :             comm->connect(i);
     151           0 :             comm->setTag(STOP);
     152           0 :             put(STOP);
     153             : 
     154             :           }
     155             :         }
     156             :       }
     157             :       //delete comm; ///leaking this for now as if initialized from python..it brings down the whole house
     158             :       //comm=nullptr;
     159             :     }
     160             : 
     161             :   }
     162             : 
     163           0 : }
     164        1064 : void Applicator::init(Int argc, Char *argv[])
     165             : {
     166             : // Initialize the process and parallel transport layer
     167             : //
     168             :   //cerr <<"Applicatorinit " << initialized_p << endl;
     169        1064 :   if(comm){
     170             :     //if worker  was released from loop...want it back now
     171        1063 :     if(comm && isWorker() && !isSerial())
     172           0 :       loop();
     173        1063 :     return;
     174             :   }
     175             :   // Fill the map of known algorithms
     176             :   //cerr << "APPINIT defining algorithms " << endl;
     177           1 :   defineAlgorithms();
     178             : 
     179             : #ifdef HAVE_MPI
     180           1 :   if (debug_p) {
     181           0 :      cerr << "In init threads, HAVE_MPI...\n";
     182             :   }
     183           1 :   initThreads(argc, argv);
     184             : #else
     185             :   if (debug_p) {
     186             :       cerr << "In init threads, not HAVE_MPI...\n";
     187             :   }
     188             :   (void)argc;
     189             :   (void)argv;
     190             :   initThreads();
     191             : #endif
     192           1 :   initialized_p=true;
     193           1 :   return;
     194             : }
     195             : 
     196        2129 : Bool Applicator::isController()
     197             : {
     198             : // Return T if the current process is the controller
     199             : //
     200             :   Bool result;
     201        2129 :   if (comm) {
     202        2129 :     result = comm->isController();
     203             :   } else {
     204           0 :     throw(AipsError("Parallel transport layer not initialized"));
     205             :   }
     206        2129 :   return result;
     207             : }
     208             : 
     209        3191 : Bool Applicator::isWorker()
     210             : {
     211             : // Return T if the current process is a worker process
     212             : //
     213             :   Bool result;
     214        3191 :   if (comm) {
     215        3191 :     result = comm->isWorker();
     216             :   } else {
     217           0 :     throw(AipsError("Parallel transport layer not initialized"));
     218             :   }
     219        3191 :   return result;
     220             : }
     221             : 
     222           0 : void Applicator::loop()
     223             : {
     224             : // Loop, if a worker process, waiting for an assigned task
     225             : //
     226           0 :   Bool die(false);
     227             :   Int what;
     228             :   // Wait for a message from the controller with any Algorithm tag
     229           0 :   while(!die){
     230           0 :     comm->connectToController();
     231           0 :     comm->setAnyTag();
     232             :     //cerr << "in loop get" << endl;
     233           0 :     comm->get(what);
     234           0 :     if (debug_p) {
     235           0 :         cerr << "worker, got what (algID/stop): " << what << endl;
     236             :     }
     237           0 :     switch(what){
     238           0 :     case STOP :
     239           0 :       die = true;
     240           0 :       break;
     241           0 :     default :
     242             :       // In this case, an Algorithm tag is expected.
     243             :       // First check that it is known.
     244           0 :       if (knownAlgorithms.find(what) != knownAlgorithms.end( )) {
     245             :         // Identified algorithm tag; set for subsequent communication
     246           0 :         comm->setTag(what);
     247             :         // Execute (apply) the algorithm
     248           0 :         knownAlgorithms.at(what)->apply();
     249             :       } else {
     250           0 :         throw(AipsError("Unidentified parallel algorithm code"));
     251             :       }
     252           0 :       break;
     253             :     }
     254             :   }
     255             :   //cerr <<"getting out of loop " <<endl;
     256           0 :   return;
     257             : }
     258             : 
     259        1064 : Bool Applicator::nextAvailProcess(Algorithm &a, Int &rank)
     260             : {
     261             : // Assign the next available process for the specified Algorithm
     262             : //  
     263             :   // Must be the controller to request a worker process
     264        1064 :   Bool assigned=False;
     265        1064 :   if (isWorker()) {
     266           0 :     throw(AipsError("Must be the controller to assign a worker process"));
     267             :   } else {
     268        1064 :     if (!usedAllThreads) {
     269             :       // Connect to the next available process in the list
     270             :       Bool lastOne;
     271        1064 :       rank = findFreeProc(lastOne);
     272        1064 :       AlwaysAssert(rank >= 0, AipsError);
     273        1064 :       if (lastOne) usedAllThreads = true;
     274        1064 :       Int tag = algorithmIds.find(a.name()) == algorithmIds.end( ) ? 0 : algorithmIds.at(a.name());
     275             :       
     276             :       // Send wake-up message (containing the Algorithm tag) to
     277             :       // the assigned worker process to activate it (see loop()).
     278        1064 :       comm->connect(rank);
     279        1064 :       comm->setTag(tag);
     280             :       //cerr << "nextAvailproc settag " << tag << " rank " << rank << " name " << a.name() << endl;
     281        1064 :       put(tag);
     282             :       /*
     283             :       if (not isWorker() and numProcs() <= 1){
     284             :       // the first int, algID, is consumed in the loop for the workers when running
     285             :       // in multiprocess mode and there are at least 2 processes. When not multiprocess or a
     286             :       // single process, we need to consume it:
     287             :       // TODO - it could be consumed up here, right after the put()
     288             :       int algID;
     289             :       comm->get(algID);
     290             :     if (debug_p) {
     291             :       cerr << "nextAvailproc controller, got algID: " << algID << " assigned " << assigned << " donesig " << donesig_p<<  endl;
     292             :      }
     293             :       }
     294             :       */
     295        1064 :       assigned = true;
     296        1064 :       procStatus(rank) = ASSIGNED;
     297             :     } else {
     298           0 :       assigned = false;
     299             :     }
     300             :   }
     301             :   //cerr << "nextAvailproc controller assigned " << assigned << endl;
     302             :   
     303        1064 :   if ((!isWorker()) && (numProcs() <= 1) && assigned){
     304             :       // the first int, algID, is consumed in the loop for the workers when running
     305             :       // in multiprocess mode and there are at least 2 processes. When not multiprocess or a
     306             :       // single process, we need to consume it:
     307             :       // TODO - it could be consumed up here, right after the put()
     308             :       Int algID;
     309             :       //comm->get(algID);
     310        1064 :       get(algID);
     311        1064 :     if (debug_p) {
     312           0 :       cerr << "nextAvailproc controller, got algID: " << algID << " assigned " << assigned << " donesig " << donesig_p<<  endl;
     313             :      }
     314             :   }
     315             :   
     316        1064 :   return assigned;
     317             : }
     318             : 
     319           0 : bool Applicator::initialized(){
     320             : #ifdef HAVE_MPI
     321           0 :   return initialized_p;  
     322             : #endif  
     323             :   
     324             :   return false;
     325             : }
     326        2128 : Int Applicator::nextProcessDone(Algorithm &a, Bool &allDone)
     327             : {
     328             : // Return the rank of the next process to complete the specified algorithm
     329             : //
     330        2128 :   Int rank = -1;
     331        2128 :   allDone = true;
     332             :   //cerr << "nextprocess done procstatus " << procStatus << endl;
     333        4256 :   for (uInt i=0; i<procStatus.nelements(); i++) {
     334        2128 :     if (procStatus(i) == ASSIGNED) {
     335        1064 :       if (isSerial()) {
     336             :         // In the serial case, the controller can be assigned
     337        1064 :         allDone = false;
     338             :       } else {
     339             :         // In the parallel case, the controller is not assigned
     340           0 :         if (i != static_cast<uInt>(comm->controllerRank())) {
     341           0 :           allDone = false;
     342             :         }
     343             :       }
     344             :     }
     345             :   }
     346        2128 :   if (!allDone) {
     347             :     // Wait for a process to finish with the correct algorithm tag
     348        1064 :     comm->connectAnySource();
     349        1064 :     Int tag = algorithmIds.find(a.name()) == algorithmIds.end( ) ? 0 : algorithmIds.at(a.name());
     350             :     //cerr <<"procdone name" << a.name() << " id " << tag << endl;
     351        1064 :     comm->setTag(tag);
     352             :     Int doneSignal;
     353        1064 :     rank = get(doneSignal);
     354             :     //cerr <<" procdone rank " << rank << " donesig " << doneSignal << endl;
     355             :     // Consistency check; should return a DONE signal to contoller
     356             :     // on completion.
     357        1064 :     if (doneSignal != DONE) {
     358           0 :       throw(AipsError("Worker process terminated unexpectedly"));
     359             :     } else {
     360             :       // Set source in parallel transport layer
     361        1064 :       comm->connect(rank);
     362             :       // Mark process as free
     363        1064 :       procStatus(rank) = FREE;
     364             :       //cerr << "NEXTProcDone connect rank" << rank << " procstat " << procStatus << endl; 
     365        1064 :       usedAllThreads = false;
     366             :     }
     367             :   }
     368        2128 :   return rank;
     369             : }
     370             : 
     371        1064 : void Applicator::done()
     372             : {
     373             : // Signal that a worker process is done
     374             : //
     375        1064 :   donesig_p=DONE;
     376        1064 :   Int donesig=DONE;
     377        1064 :   if(isSerial())
     378        1064 :     put(donesig_p);
     379             :   else
     380           0 :     put(donesig);
     381        2128 :   return;
     382             : }
     383             : 
     384        1064 : void Applicator::apply(Algorithm &a)
     385             : {
     386             : // Execute an algorithm directly
     387             : //
     388             :   // Null operation unless serial, in which case the 
     389             :   // controller needs to execute the algorithm directly.
     390             :   // In the parallel case, the algorithm applies are
     391             :   // performed in workers processes' applicator.init().
     392        1064 :   donesig_p=10000;
     393        1064 :   if (isSerial() && isController()) {
     394        1064 :     a.apply();
     395             :   }
     396        1064 :   return;
     397             : }
     398             : 
     399           0 : void Applicator::defineAlgorithm(Algorithm *a)
     400             : {
     401             :   //no need to add if it is already defined
     402             :   //  if(algorithmIds.count(a->name()) <1){
     403             :   //knownAlgorithms.insert( std::pair<casacore::Int,Algorithm*>(LastID, a) );
     404             :   // algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a->name(), LastID) );
     405           0 :   Int theid=LastID;
     406           0 :   if(algorithmIds.count(a->name()) >0){
     407           0 :     theid=algorithmIds[a->name()];
     408             :   }
     409             :   else{
     410           0 :     theid=LastID;
     411           0 :     algorithmIds[a->name()]=LastID;
     412           0 :     ++LastID;
     413             :   }
     414           0 :   knownAlgorithms[theid]=a;
     415             :    // }
     416           0 :    return;
     417             : }
     418             : 
     419           1 : void Applicator::defineAlgorithms()
     420             : {
     421             : // Fill the algorithm map
     422             : //
     423             :   // Clark CLEAN parallel deconvolution
     424           1 :   Algorithm *a1 = new ClarkCleanAlgorithm;
     425           1 :   knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a1) );
     426           1 :   algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a1->name(), LastID) );
     427           1 :   LastID++;
     428           1 :   Algorithm *a2 = new ReadMSAlgorithm;
     429           1 :   knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a2) );
     430           1 :   algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a2->name(), LastID) );
     431           1 :   LastID++;
     432           1 :   Algorithm *a3 = new MakeApproxPSFAlgorithm;
     433           1 :   knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a3) );
     434           1 :   algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a3->name(), LastID) );
     435           1 :   LastID++;
     436           1 :   Algorithm *a4 = new PredictAlgorithm;
     437           1 :   knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a4) );
     438           1 :   algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a4->name(), LastID) );
     439           1 :   LastID++;
     440           1 :   Algorithm *a5 = new ResidualAlgorithm;
     441           1 :   knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a5) );
     442           1 :   algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a5->name(), LastID) );
     443           1 :   LastID++;
     444           1 :   Algorithm *a6 = new CubeMajorCycleAlgorithm;
     445           1 :   knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a6) );
     446           1 :   algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a6->name(), LastID) );
     447           1 :   LastID++;
     448           1 :   Algorithm *a7 = new CubeMakeImageAlgorithm;
     449           1 :   knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a7) );
     450           1 :   algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a7->name(), LastID) );
     451           1 :   LastID++;
     452           1 :   Algorithm *a8 = new CubeMinorCycleAlgorithm;
     453           1 :   knownAlgorithms.insert( std::pair<casacore::Int, Algorithm*>(LastID, a8) );
     454           1 :   algorithmIds.insert( std::pair<casacore::String, casacore::Int>(a8->name(), LastID) );
     455           1 :   LastID++;
     456           2 :   return;
     457             : }
     458             : 
     459           1 : void Applicator::setupProcStatus()
     460             : {
     461             : // Set up the process status list
     462             : //
     463           1 :   nProcs = comm->numThreads();
     464           1 :   if (nProcs <= 1) {
     465           1 :     serial = true;
     466             :   } else {
     467           0 :     serial = false;
     468             :   }
     469             :   // Resize the process list, and mark as unassigned (except for controller)
     470           1 :   usedAllThreads = false;
     471           1 :   procStatus.resize(max(nProcs,1));
     472           1 :   procStatus = FREE;
     473             :   // In the parallel case, the controller is never assigned
     474           1 :   if (!isSerial())
     475           0 :       procStatus(comm->controllerRank()) = ASSIGNED;
     476           1 : }
     477             : 
     478        1064 : Int Applicator::findFreeProc(Bool &lastOne)
     479             : {
     480             : // Search the process status list for the next free process
     481             : // 
     482        1064 :   Int freeProc = -1;
     483        1064 :   Int nfree = 0;
     484             :  
     485        2128 :   for (uInt i=0; i<procStatus.nelements(); i++) {
     486        1064 :     if (procStatus(i) == FREE) {
     487        1064 :       nfree++;
     488        1064 :       if (freeProc < 0) freeProc = i;
     489             :     }
     490             :   }
     491        1064 :   lastOne = (nfree==1);
     492             :   //cerr <<"FreeProc procstat "<< procStatus << " nfree " << nfree << endl; 
     493        1064 :   return freeProc;
     494             : }
     495             : 
     496             : // The applicator is ominpresent.
     497             : // Moved here for shared libraries.
     498             : Applicator applicator;
     499             : 
     500             : 
     501             : } //# NAMESPACE CASA - END
     502             : 

Generated by: LCOV version 1.16