LCOV - code coverage report
Current view: top level - msvis/MSVis - VisibilityProcessing.cc (source / functions) Hit Total Coverage
Test: ctest_coverage.info Lines: 0 595 0.0 %
Date: 2023-11-06 10:06:49 Functions: 0 103 0.0 %

          Line data    Source code
       1             :  /*
       2             :  * VisibilityProcessing.cc
       3             :  *
       4             :  *  Created on: Apr 20, 2011
       5             :  *      Author: jjacobs
       6             :  */
       7             : 
       8             : #include "VisibilityProcessing.h"
       9             : #include "VisBufferAsync.h"
      10             : #include "VisibilityIteratorImplAsync.h"
      11             : #include <stdcasa/UtilJ.h>
      12             : 
      13             : #include <tuple>
      14             : 
      15             : #include <casacore/casa/System/AipsrcValue.h>
      16             : 
      17             : #include <algorithm>
      18             : #include <list>
      19             : #include <stdarg.h>
      20             : #include <limits>
      21             : #include <memory>
      22             : #include <numeric>
      23             : 
      24             : using namespace casacore;
      25             : using namespace casa;
      26             : using namespace casacore;
      27             : using namespace casa::asyncio;
      28             : using namespace casacore;
      29             : using namespace casa::utilj;
      30             : using namespace std;
      31             : using std::shared_ptr;
      32             : 
      33             : using namespace casacore;
      34             : namespace casa {
      35             : 
      36             : namespace vpf {
      37             : 
      38             : #define Log(level, ...) \
      39             :     {if (level <= VpEngine::getLogLevel()) \
      40             :          VpEngine::log (__VA_ARGS__);}
      41             : 
      42             : 
      43             : //SimpleVp::SimpleVp (const String & name, const String & input, const String & output)
      44             : //: VisibilityProcessor (name,
      45             : //                       vector<String> (1, input),
      46             : //                       output.empty() ? vector<String> () : vector<String> (1, output))
      47             : //{}
      48             : //
      49             : //SimpleVp::~SimpleVp ()
      50             : //{}
      51             : //
      52             : //void
      53             : //SimpleVp::validateImpl ()
      54             : //{
      55             : //    throwIfAnyPortsUnconnected ();
      56             : //}
      57             : 
      58             : 
      59           0 : SplitterVp::SplitterVp (const String & name,
      60             :                         const String & inputName,
      61           0 :                         const vector<String> & outputNames)
      62           0 : : VisibilityProcessor (name, vector<String> (1, inputName), outputNames)
      63             : {
      64           0 :     ThrowIf (inputName.size() != 1, "Exactly one input is required.");
      65           0 :     ThrowIf (outputNames.size () < 1, "At least one output is required.");
      66           0 : }
      67             : 
      68             : 
      69             : 
      70             : VisibilityProcessor::ProcessingResult
      71           0 : SplitterVp::doProcessingImpl (ProcessingType processingType ,
      72             :                               VpData & inputData,
      73             :                               const SubchunkIndex & /*subChunkIndex*/)
      74             : {
      75           0 :     if (processingType == VisibilityProcessor::EndOfChunk ||
      76             :         processingType == VisibilityProcessor::EndOfData){
      77           0 :         return ProcessingResult();
      78             :     }
      79             : 
      80           0 :     VpPort inputPort = getInputs () [0];
      81           0 :     VpPorts outputs = getOutputs();
      82           0 :     VbPtr inputVbPtr = inputData [inputPort];
      83           0 :     VpData vpData;
      84             : 
      85             :     // Handle the first, required output by reusing the input VB
      86             : 
      87           0 :     vpData [outputs [0]] = inputVbPtr; // reuse in input
      88             : 
      89             :     // Handle any additional outputs.
      90             : 
      91           0 :     for (int i = 1; i < (int) outputs.size(); i++){
      92             : 
      93           0 :         VpPort port = outputs [i];
      94             : 
      95             :         // Make a copy of the input and add it as an output
      96             : 
      97           0 :         vpData [port] = VbPtr (inputVbPtr->clone());
      98             : 
      99             :     }
     100             : 
     101           0 :     ProcessingResult processingResult (Normal, vpData);
     102             : 
     103           0 :     return processingResult;
     104             : }
     105             : 
     106             : void
     107           0 : SplitterVp::validateImpl ()
     108             : {
     109           0 :     throwIfAnyInputsUnconnected ();
     110           0 :     ThrowIf (getOutputs (true).empty(),
     111             :              String::format ("SplitterVp %s has no outputs connected.", getFullName().c_str()));
     112           0 : }
     113             : 
     114             : 
     115           0 : SubchunkIndex::SubchunkIndex (Int chunkNumber, Int subChunkNumber, Int iteration)
     116           0 : : chunkNumber_p (chunkNumber), iteration_p (iteration), subChunkNumber_p (subChunkNumber)
     117           0 : {}
     118             : 
     119             : Bool
     120           0 : SubchunkIndex::operator< (const SubchunkIndex & other) const
     121             : {
     122           0 :     Bool result = std::make_tuple (chunkNumber_p, subChunkNumber_p, iteration_p) <
     123           0 :                   std::make_tuple (other.chunkNumber_p, other.subChunkNumber_p, other.iteration_p);
     124             : 
     125           0 :     return result;
     126             : }
     127             : 
     128             : Int
     129           0 : SubchunkIndex::getChunkNumber () const
     130             : {
     131           0 :     return chunkNumber_p;
     132             : }
     133             : 
     134             : Int
     135           0 : SubchunkIndex::getIteration () const
     136             : {
     137           0 :     return iteration_p;
     138             : }
     139             : 
     140             : Int
     141           0 : SubchunkIndex::getSubchunkNumber () const
     142             : {
     143           0 :     return subChunkNumber_p;
     144             : }
     145             : 
     146             : String
     147           0 : SubchunkIndex::toString() const
     148             : {
     149           0 :     return String::format ("(%d,%d,%d)", chunkNumber_p, subChunkNumber_p, iteration_p);
     150             : }
     151             : 
     152             : VisibilityProcessorStub::ProcessingResult
     153           0 : VisibilityProcessorStub::doProcessingImpl (ProcessingType /*processingType*/,
     154             :                                            VpData & /*inputData*/,
     155             :                                            const SubchunkIndex & /*subChunkIndex*/)
     156             : {
     157           0 :     Throw ("Stub does not permit processing.");
     158             : }
     159             : 
     160             : void
     161           0 : VisibilityProcessorStub::validateImpl ()
     162             : {
     163           0 :     Throw ("Stub does not permit validation.");
     164             : }
     165             : 
     166             : 
     167             : //VisBuffer *
     168             : //VbPtr::get ()
     169             : //{
     170             : //    return vb_p; // use with caution
     171             : //}
     172             : //
     173             : 
     174             : //VisBuffer *
     175             : //VbPtr::release ()
     176             : //{
     177             : //    freeOnDelete_p = false;
     178             : //    VisBuffer * vb = vb_p;
     179             : //    vb_p = NULL;
     180             : //
     181             : //    return vb;
     182             : //}
     183             : 
     184           0 : VisibilityProcessor::VisibilityProcessor ()
     185             : : container_p (NULL),
     186             :   nSubchunks_p (0),
     187             :   nSubchunksUnique_p (0),
     188           0 :   vpEngine_p (0)
     189           0 : {}
     190             : 
     191             : 
     192           0 : VisibilityProcessor::VisibilityProcessor (const String & name,
     193             :                                           const vector<String> & inputNames,
     194             :                                           const vector<String> & outputNames,
     195           0 :                                           Bool makeIoPorts)
     196             : : container_p (NULL),
     197             :   name_p (name),
     198             :   nSubchunks_p (0),
     199             :   nSubchunksUnique_p (0),
     200           0 :   vpEngine_p (0)
     201             : 
     202             : {
     203           0 :     VpPort::Type portType = makeIoPorts ? VpPort::InOut : VpPort::Input;
     204           0 :     vpInputs_p = definePorts (inputNames, portType, "input");
     205           0 :     portType = makeIoPorts ? VpPort::InOut : VpPort::Output;
     206           0 :     vpOutputs_p = definePorts (outputNames, portType, "output");
     207           0 : }
     208             : 
     209             : void
     210           0 : VisibilityProcessor::throwIfAnyInputsUnconnected (const vector<String> & exceptThese) const
     211             : {
     212           0 :     VpPorts inputs = getInputs ();
     213             : 
     214           0 :     VpPorts unconnectedPorts = portsUnconnected (inputs, & VpPort::isConnectedInput, exceptThese);
     215             : 
     216           0 :     ThrowIf (! unconnectedPorts.empty(),
     217             :              String::format ("Vp '%s' has unconnected inputs: %s",
     218             :                      getName().c_str(),
     219             :                      unconnectedPorts.toString().c_str()));
     220           0 : }
     221             : 
     222             : void
     223           0 : VisibilityProcessor::throwIfAnyInputsUnconnectedExcept (const String & name) const
     224             : {
     225           0 :     throwIfAnyInputsUnconnected (vector<String> (1, name));
     226           0 : }
     227             : 
     228             : 
     229             : void
     230           0 : VisibilityProcessor::throwIfAnyOutputsUnconnected (const vector<String> & exceptThese) const
     231             : {
     232           0 :     VpPorts outputs = getOutputs ();
     233             : 
     234           0 :     VpPorts unconnectedPorts = portsUnconnected (outputs, & VpPort::isConnectedOutput, exceptThese);
     235             : 
     236           0 :     ThrowIf (! unconnectedPorts.empty(),
     237             :              String::format ("Vp '%s' has unconnected outputs: %s",
     238             :                      getName().c_str(),
     239             :                      unconnectedPorts.toString().c_str()));
     240           0 : }
     241             : 
     242             : void
     243           0 : VisibilityProcessor::throwIfAnyOutputsUnconnectedExcept (const String & name) const
     244             : {
     245           0 :     throwIfAnyOutputsUnconnected (vector<String> (1, name));
     246           0 : }
     247             : 
     248             : 
     249             : void
     250           0 : VisibilityProcessor::throwIfAnyPortsUnconnected () const
     251             : {
     252           0 :     VpPorts inputs = getInputs ();
     253             : 
     254           0 :     VpPorts unconnectedInputs = portsUnconnected (inputs, & VpPort::isConnectedInput);
     255             : 
     256           0 :     VpPorts outputs = getOutputs ();
     257             : 
     258           0 :     VpPorts unconnectedOutputs = portsUnconnected (outputs, & VpPort::isConnectedOutput);
     259             : 
     260           0 :     if (! unconnectedInputs.empty () || ! unconnectedOutputs.empty()){
     261             : 
     262           0 :         String message = String::format ("Vp '%s' has");
     263             : 
     264             : 
     265           0 :         if (! unconnectedInputs.empty()){
     266             : 
     267             :             // String::format up the inputs portion of the message, if applicable
     268             : 
     269           0 :             message += String::format (" unconnected inputs: %s",
     270           0 :                                 unconnectedInputs.toString().c_str());
     271             :         }
     272             : 
     273           0 :         if (! unconnectedInputs.empty()){
     274             : 
     275             :             // String::format up the inputs portion of the message, if applicable.
     276             :             // Determine the appropriate text to join up the previous text.
     277             : 
     278           0 :             string conjunction = (unconnectedInputs.empty()) ? ""
     279           0 :                                                              : "\n and";
     280             : 
     281           0 :              message += String::format ("%s has unconnected outputs: %s",
     282             :                                 conjunction.c_str (),
     283           0 :                                 unconnectedOutputs.toString().c_str());
     284             :         }
     285             : 
     286           0 :         message += ".";
     287             : 
     288           0 :         ThrowIf (true, message);
     289             : 
     290             :     }
     291             : 
     292           0 : }
     293             : 
     294             : 
     295             : VpPorts
     296           0 : VisibilityProcessor::portsUnconnected (const VpPorts & ports, Bool (VpPort::* isConnected) () const,
     297             :                                        const vector<String> & except) const
     298             : {
     299           0 :     VpPorts unconnectedPorts;
     300             : 
     301           0 :     for (VpPorts::const_iterator port = ports.begin(); port != ports.end(); port ++){
     302             : 
     303           0 :         Bool connected = ((* port).*isConnected)();
     304             : 
     305           0 :         if (! connected){
     306             : 
     307             :             // See if it's in the list of exceptions
     308             : 
     309           0 :             if (! except.empty() && find (except.begin(), except.end(), port->getName()) == except.end()){
     310           0 :                 unconnectedPorts.push_back (* port);
     311             :             }
     312             :         }
     313             : 
     314             :     }
     315             : 
     316           0 :     return unconnectedPorts;
     317             : }
     318             : 
     319             : void
     320           0 : VisibilityProcessor::chunkStart (const SubchunkIndex & sci)
     321             : {
     322           0 :     chunkStartImpl (sci);
     323           0 : }
     324             : 
     325             : 
     326             : VpPorts
     327           0 : VisibilityProcessor::definePorts (const vector<String> & portNames, VpPort::Type type, const String & typeName)
     328             : {
     329             : 
     330           0 :     VpPorts vpPorts;
     331             : 
     332           0 :     for (vector<String>::const_iterator portName = portNames.begin();
     333           0 :          portName != portNames.end();
     334           0 :          portName ++){
     335             : 
     336           0 :         ThrowIf (vpPorts.contains (* portName),
     337             :                  String::format ("VisibilityProcessor %s already has an %s port '%s'",
     338             :                          getName().c_str(), typeName.c_str(), portName->c_str()));
     339             : 
     340           0 :         vpPorts.push_back (VpPort (this, * portName, type));
     341             :     }
     342             : 
     343           0 :     return vpPorts;
     344             : }
     345             : 
     346             : VisibilityProcessor::ProcessingResult
     347           0 : VisibilityProcessor::doProcessing (ProcessingType processingType,
     348             :                                    VpData & inputData,
     349             :                                    VpEngine * vpEngine,
     350             :                                    const SubchunkIndex & subchunkIndex)
     351             : {
     352             : 
     353           0 :     vpEngine_p = vpEngine;
     354           0 :     pair<Int,Int> originalViPosition = getVi()->getSubchunkId ();
     355             : 
     356           0 :     if (processingType == Subchunk && subchunkIndex != SubchunkIndex::Invalid){
     357           0 :         nSubchunks_p ++;
     358           0 :         if (subchunkIndex.getIteration() == 0){
     359           0 :             nSubchunksUnique_p ++;
     360             :         }
     361             :     }
     362             : 
     363           0 :     ProcessingResult result;
     364             : 
     365             :     try {
     366           0 :         result = doProcessingImpl (processingType, inputData, subchunkIndex);
     367             : 
     368             :     }
     369           0 :     catch (AipsError & e){
     370             : 
     371           0 :         vpEngine_p = 0;
     372             : 
     373           0 :         Rethrow (e, String::format ("Error in doProcessing of VP '%s': %s on %s", getName().c_str(),
     374             :                             toString (processingType).c_str(), subchunkIndex.toString().c_str()));
     375             :     }
     376             : 
     377           0 :     pair<Int,Int> currentViPosition = getVi()->getSubchunkId ();
     378             : 
     379           0 :     ThrowIf (currentViPosition != originalViPosition,
     380             :              String::format ("VisibilityIterator moved during processing in VP '%s'", getName().c_str()));
     381             : 
     382           0 :     vpEngine_p = 0;
     383             : 
     384           0 :     return result;
     385             : 
     386             : }
     387             : 
     388             : 
     389             : String
     390           0 : VisibilityProcessor::getFullName () const
     391             : {
     392             : 
     393           0 :     list<String> names;
     394           0 :     names.push_front (getName());
     395             : 
     396           0 :     const VisibilityProcessor * parent = getContainer();
     397             : 
     398           0 :     while (parent != NULL){
     399             : 
     400           0 :         names.push_front (parent->getName());
     401           0 :         parent = parent->getContainer();
     402             :     }
     403             : 
     404           0 :     String fullName = utilj::join (names, ".");
     405             : 
     406           0 :     return fullName;
     407             : }
     408             : 
     409             : VpPorts
     410           0 : VisibilityProcessor::getInputs (Bool connectedOnly) const
     411             : {
     412           0 :     VpPorts result;
     413             : 
     414           0 :     if (connectedOnly){
     415             : 
     416             :         // Copy over the inputs that are input connected.
     417             :         // Need to negate predicate to get the right results --STL is strange sometimes.
     418             : 
     419             :         remove_copy_if (vpInputs_p.begin(), vpInputs_p.end(), back_inserter (result),
     420           0 :                         not1 (mem_fun_ref (& VpPort::isConnectedInput)));
     421             :     }
     422             :     else{
     423           0 :         result = vpInputs_p;
     424             :     }
     425             : 
     426           0 :     return result;
     427             : }
     428             : 
     429             : VpPort
     430           0 : VisibilityProcessor::getInput (const String & name) const
     431             : {
     432           0 :     ThrowIf (! vpInputs_p.contains (name),
     433             :              String::format ("Vp '%s' has no input port '%s'", getName().c_str(), name.c_str()));
     434             : 
     435           0 :     return vpInputs_p.get (name);
     436             : }
     437             : 
     438             : VpPort &
     439           0 : VisibilityProcessor::getInputRef (const String & name)
     440             : {
     441           0 :     ThrowIf (! vpInputs_p.contains (name),
     442             :              String::format ("Vp '%s' has no input port '%s'", getName().c_str(), name.c_str()));
     443             : 
     444           0 :     return vpInputs_p.getRef (name);
     445             : }
     446             : 
     447             : String
     448           0 : VisibilityProcessor::getName () const
     449             : {
     450           0 :     return name_p;
     451             : }
     452             : 
     453             : Int
     454           0 : VisibilityProcessor::getNSubchunksProcessed () const
     455             : {
     456           0 :     return nSubchunks_p;
     457             : }
     458             : 
     459             : Int
     460           0 : VisibilityProcessor::getNSubchunksUniqueProcessed () const
     461             : {
     462           0 :     return nSubchunksUnique_p;
     463             : }
     464             : 
     465             : 
     466             : VpPort
     467           0 : VisibilityProcessor::getOutput (const String & name) const
     468             : {
     469           0 :     ThrowIf (! vpOutputs_p.contains (name),
     470             :              String::format ("Vp '%s' has no output port '%s'", getName().c_str(), name.c_str()));
     471             : 
     472           0 :     return vpOutputs_p.get (name);
     473             : }
     474             : 
     475             : VpPort &
     476           0 : VisibilityProcessor::getOutputRef (const String & name)
     477             : {
     478           0 :     ThrowIf (! vpOutputs_p.contains (name),
     479             :              String::format ("Vp '%s' has no output port '%s'", getName().c_str(), name.c_str()));
     480             : 
     481           0 :     return vpOutputs_p.getRef (name);
     482             : }
     483             : 
     484             : VpPorts
     485           0 : VisibilityProcessor::getOutputs (Bool connectedOnly) const
     486             : {
     487           0 :     VpPorts result;
     488             : 
     489           0 :     if (connectedOnly){
     490             : 
     491             :         // Copy over the outputs that are output connected.
     492             :         // Need to negate predicate to get the right results --STL is strange sometimes.
     493             : 
     494             :         remove_copy_if (vpOutputs_p.begin(), vpOutputs_p.end(), back_inserter (result),
     495           0 :                         not1 (mem_fun_ref (& VpPort::isConnectedOutput)));
     496             :     }
     497             :     else{
     498           0 :         result = vpOutputs_p;
     499             :     }
     500             : 
     501           0 :     return result;
     502             : }
     503             : 
     504             : PrefetchColumns
     505           0 : VisibilityProcessor::getPrefetchColumns () const
     506             : {
     507           0 :     return PrefetchColumns ();
     508             : }
     509             : 
     510             : ROVisibilityIterator *
     511           0 : VisibilityProcessor::getVi ()
     512             : {
     513           0 :     return vpEngine_p->getVi();
     514             : }
     515             : 
     516             : VpEngine *
     517           0 : VisibilityProcessor::getVpEngine ()
     518             : {
     519           0 :     return vpEngine_p;
     520             : }
     521             : 
     522             : void
     523           0 : VisibilityProcessor::processingStart ()
     524             : {
     525           0 :     nSubchunks_p = 0;
     526           0 :     nSubchunksUnique_p = 0;
     527             : 
     528           0 :     validate();
     529             : 
     530           0 :     processingStartImpl ();
     531           0 : }
     532             : 
     533             : void
     534           0 : VisibilityProcessor::setContainer (const VpContainer * container)
     535             : {
     536           0 :     assert (container != NULL);
     537             : 
     538           0 :     ThrowIf (container_p != NULL,
     539             :              String::format ("Attempting to add VisibiltyProcessor '%s' into '%s'; previously added to '%s'",
     540             :                      getName().c_str(), container->getFullName().c_str(), container_p->getFullName().c_str()));
     541             : 
     542           0 :     container_p = container;
     543           0 : }
     544             : 
     545             : String
     546           0 : toString (VisibilityProcessor::ProcessingType p)
     547             : {
     548             :     static const char * names [] = {"Subchunk", "EndOfChunk", "EndOfData"};
     549             : 
     550           0 :     return names [p];
     551             : }
     552             : 
     553             : void
     554           0 : VisibilityProcessor::validate ()
     555             : {
     556           0 :     validateImpl ();
     557           0 : }
     558             : 
     559           0 : VpContainer::VpContainer (const String & name, const vector<String> & inputs, const vector<String> & outputs)
     560           0 : : VisibilityProcessor (name, inputs, outputs, true)
     561           0 : {}
     562             : 
     563             : void
     564           0 : VpContainer::add (VisibilityProcessor * vp)
     565             : {
     566           0 :     ThrowIf (contains (vp),
     567             :              String::format ("Visibility processor %s already in container %s",
     568             :                             vp->getName().c_str(), getName().c_str()));
     569             : 
     570           0 :     vp->setContainer (this);
     571           0 :     vps_p.push_back (vp);
     572           0 : }
     573             : 
     574             : void
     575           0 : VpContainer::chunkStart (const SubchunkIndex & sci)
     576             : {
     577           0 :     iterator i;
     578             :     try{
     579           0 :         for (i = begin(); i != end(); i++){
     580           0 :             (*i)->chunkStart (sci);
     581             :         }
     582             :     }
     583           0 :     catch (AipsError & e){
     584           0 :         Rethrow (e, String::format ("Error during chunkStart for container '%s' VP '%s'",
     585             :                             getName().c_str(), (*i)->getName().c_str()));
     586             :     }
     587           0 : }
     588             : 
     589             : void
     590           0 : VpContainer::connect (const String &  sourcePortName,
     591             :                       VisibilityProcessor * sinkVp, const String &  sinkPortName)
     592             : {
     593           0 :     connect (this, sourcePortName, sinkVp, sinkPortName);
     594           0 : }
     595             : 
     596             : void
     597           0 : VpContainer::connect (VisibilityProcessor * sourceVp, const String &  sourcePortName,
     598             :                       const String &  sinkPortName)
     599             : {
     600           0 :     connect (sourceVp, sourcePortName, this, sinkPortName);
     601           0 : }
     602             : 
     603             : pair<VpPort, VpPort>
     604           0 : VpContainer::validateConnectionPorts (VisibilityProcessor * sourceVp,
     605             :                                       const String &  sourcePortName,
     606             :                                       VisibilityProcessor * sinkVp,
     607             :                                       const String &  sinkPortName)
     608             : {
     609             :     // Does the owning VP really support these ports?
     610             : 
     611           0 :     ThrowIf (sourceVp != this && ! sourceVp->getOutputs ().contains (sourcePortName),
     612             :              String::format ("Visibility processor %s in %s does not have output %s",
     613             :                      sourceVp->getName().c_str(), getName().c_str(),
     614             :                      sourcePortName.c_str()));
     615             : 
     616           0 :     ThrowIf (sourceVp == this && ! getInputs().contains (sourcePortName),
     617             :              String::format ("Visibility processor container %s in %s does not have input %s",
     618             :                      sourceVp->getName().c_str(), getName().c_str(),
     619             :                      sourcePortName.c_str()));
     620             : 
     621           0 :     ThrowIf (sinkVp != this && ! sinkVp->getInputs ().contains (sinkPortName),
     622             :              String::format ("Visibility processor %s in %s does not have input %s",
     623             :                      sinkVp->getName().c_str(), getName().c_str(),
     624             :                      sinkPortName.c_str()));
     625             : 
     626           0 :     ThrowIf (sinkVp == this && ! getOutputs().contains (sinkPortName),
     627             :              String::format ("Visibility processor container %s in %s does not have output %s",
     628             :                      sinkVp->getName().c_str(), getName().c_str(),
     629             :                      sinkPortName.c_str()));
     630             : 
     631             :     // Are the ports already in use?
     632             : 
     633           0 :     VpPort sink = (sinkVp == this) ? sinkVp->getOutput (sinkPortName)
     634           0 :                                    : sinkVp->getInput (sinkPortName);
     635           0 :     VpPort source = (sourceVp == this) ? sourceVp->getInput (sourcePortName)
     636           0 :                                        : sourceVp->getOutput (sourcePortName);
     637             : 
     638           0 :     ThrowIf (utilj::containsKey (source, network_p),
     639             :              String::format ("Output %s already in use for visibility processor %s in %s",
     640             :                      source.getName().c_str(), sourceVp->getName().c_str(), getName().c_str()));
     641             : 
     642           0 :     ThrowIf (utilj::containsKey (sink, networkReverse_p),
     643             :              String::format ("Input %s already in use for visibility processor %s in %s",
     644             :                      sink.getName().c_str(), sinkVp->getName().c_str(), getName().c_str()));
     645             : 
     646           0 :     return make_pair (source, sink);
     647             : }
     648             : 
     649             : void
     650           0 : VpContainer::connect (VisibilityProcessor * sourceVp, const String &  sourcePortName,
     651             :                       VisibilityProcessor * sinkVp, const String &  sinkPortName)
     652             : {
     653             :     // Validate the requested connection
     654             :     // =================================
     655             : 
     656             :     // Do they refer to a VP in this container?
     657             : 
     658           0 :     ThrowIf (! contains (sourceVp) && sourceVp != this,
     659             :              String::format ("No such visibility processor %s in %s.",
     660             :                      sourceVp->getName().c_str(), getName().c_str()));
     661           0 :     ThrowIf (! contains (sinkVp) && sinkVp != this,
     662             :              String::format ("No such visibility processor %s in %s.",
     663             :                      sinkVp->getName().c_str(), getName().c_str()));
     664             : 
     665           0 :     VpPort sink, source;
     666           0 :     std::tie (source, sink) = validateConnectionPorts (sourceVp, sourcePortName, sinkVp, sinkPortName);
     667             : 
     668             :     // See if this is a connection to the container inputs or outputs or
     669             :     // a normal connection between VPs
     670             : 
     671           0 :     Bool containerConnect = (source.getType() == sink.getType()) &&
     672           0 :                             ((source.isType(VpPort::Input) && sourceVp == this) ||
     673           0 :                              (sink.isType(VpPort::Output) && sinkVp == this));
     674             : 
     675           0 :     Bool normalConnect = source.isType (VpPort::Output) && sink.isType (VpPort::Input);
     676             : 
     677           0 :     Bool selfConnect = sourceVp == sinkVp; // detects loop back
     678             : 
     679           0 :     ThrowIf (! (normalConnect ||  containerConnect) || selfConnect,
     680             :              String::format ("Cannot connect %s:%s to %s:%s in %s", sourceVp->getName ().c_str(),
     681             :                      source.getName ().c_str (), sinkVp->getName().c_str (),
     682             :                      sink.getName ().c_str (), getName().c_str()));
     683             : 
     684             :     // The validation is over, so actually do the connection.
     685             : 
     686           0 :     network_p [source] = sink;
     687           0 :     networkReverse_p.insert (sink);
     688             : 
     689             :     // Inform the real ports (i.e., not the copies) that they are connected
     690             :     // N.B.: Container ports are in/out and are intended to be doubly connected,
     691             :     //       from the inside and from the outside of the container.
     692             : 
     693           0 :     if (source.isType (VpPort::Input)){
     694           0 :         sourceVp->getInputRef (source.getName()).setConnectedOutput ();
     695             :     }
     696             :     else{
     697           0 :         sourceVp->getOutputRef (source.getName()).setConnectedOutput ();
     698             :     }
     699             : 
     700           0 :     if (sink.isType (VpPort::Output)){
     701           0 :         sinkVp->getOutputRef (sink.getName()).setConnectedInput ();
     702             :     }
     703             :     else{
     704           0 :         sinkVp->getInputRef (sink.getName()).setConnectedInput ();
     705             :     }
     706           0 : }
     707             : 
     708             : VpContainer::iterator
     709           0 : VpContainer::begin()
     710             : {
     711           0 :     return iterator (vps_p.begin());
     712             : }
     713             : 
     714             : VpContainer::const_iterator
     715           0 : VpContainer::begin() const
     716             : {
     717           0 :     return const_iterator (vps_p.begin());
     718             : }
     719             : 
     720             : 
     721             : 
     722             : Bool
     723           0 : VpContainer::contains (const VisibilityProcessor * vp) const
     724             : {
     725           0 :     Bool foundIt = find (vps_p.begin(), vps_p.end(), vp) != vps_p.end();
     726             : 
     727           0 :     return foundIt;
     728             : }
     729             : 
     730             : VisibilityProcessor::ProcessingResult
     731           0 : VpContainer::doProcessingImpl (ProcessingType processingType, VpData & data, const SubchunkIndex & sci)
     732             : {
     733             : 
     734           0 :     VpSet vpsWaiting (vps_p.begin(), vps_p.end()); // Set of pending VPs
     735           0 :     ChunkCode overallChunkCode = Normal;           // container result for this data
     736             :     VisibilityProcessor * vp;                      // Currently executing VP
     737           0 :     VpData vpInputs;                               // Inputs to be fed to current Vp
     738             : 
     739           0 :     remapPorts (data, this);
     740             : 
     741           0 :     Log (3, "VpContainer::doProcessing: '%s' starting execution with inputs {%s}.\n",
     742             :          getName().c_str(), data.getNames().c_str());
     743             : 
     744             :     try{
     745             : 
     746           0 :         do {
     747             : 
     748             :             // Find a VP which can compute given the current set of inputs
     749             : 
     750           0 :             Bool flushing = processingType != Subchunk;
     751           0 :             std::tie (vp, vpInputs) = findReadyVp (vpsWaiting, data, flushing);
     752             : 
     753           0 :             if (vp != NULL){
     754             : 
     755           0 :                 Log (3, "VpContainer::doProcessing: '%s' starting execution of %s.\n",
     756             :                      getName().c_str(), vp->getName().c_str());
     757             : 
     758             :                 // Have the ready VP process its inputs and
     759             :                 // potentially produce more outputs
     760             : 
     761             :                 ChunkCode chunkCode;
     762           0 :                 VpData outputs;
     763             : 
     764           0 :                 std::tie (chunkCode, outputs) =
     765           0 :                         vp->doProcessing (processingType, vpInputs, getVpEngine(), sci);
     766             : 
     767           0 :                 Log (3, "VpContainer::doProcessing: execution of %s output {%s}.\n",
     768             :                      vp->getName().c_str(), outputs.getNames().c_str());
     769             : 
     770           0 :                 if (processingType == EndOfChunk && chunkCode == RepeatChunk){
     771             : 
     772             :                     // If any VP in this iteration requests a chunk repeat,
     773             :                     // then that's the overall result.
     774             : 
     775           0 :                     overallChunkCode = RepeatChunk;
     776             :                 }
     777             : 
     778             :                 // Remove the VP from the set of pending VPs, remove
     779             :                 // the data this VP consumed as inputs and add any outputs
     780             :                 // it produced to the set of available data.
     781             : 
     782           0 :                 vpsWaiting.erase (vp);
     783           0 :                 for (VpData::const_iterator i = vpInputs.begin(); i != vpInputs.end(); i++){
     784           0 :                     data.erase (i->first);
     785             :                 }
     786           0 :                 remapPorts (outputs, vp);
     787           0 :                 data.insert (outputs.begin(), outputs.end());
     788             : 
     789             :             }
     790             : 
     791           0 :         } while (vp != NULL);
     792             : 
     793             :     }
     794           0 :     catch (AipsError & e){
     795           0 :         Rethrow (e, String::format ("Error while container '%s' processing VP '%s'",
     796             :                             getName().c_str(), (vp != NULL) ? vp->getName().c_str() : "NULL"));
     797             :     }
     798             : 
     799           0 :     if (vpsWaiting.empty()){
     800           0 :         Log (3, "VpContainer::doProcessing: '%s' executed all VPs.\n", getName().c_str());
     801             :     }
     802             :     else{
     803           0 :         Log (3, "VpContainer::doProcessing: '%s' did not execute VPs: {%s}.\n",
     804             :              getName().c_str(), vpsWaiting.getNames().c_str());
     805             :     }
     806             : 
     807           0 :     return ProcessingResult (overallChunkCode, data);
     808             : }
     809             : 
     810             : Bool
     811           0 : VpContainer::empty () const
     812             : {
     813           0 :     return vps_p.empty();
     814             : }
     815             : 
     816             : VpContainer::iterator
     817           0 : VpContainer::end()
     818             : {
     819           0 :     return iterator (vps_p.end());
     820             : }
     821             : 
     822             : VpContainer::const_iterator
     823           0 : VpContainer::end() const
     824             : {
     825           0 :     return const_iterator (vps_p.end());
     826             : }
     827             : 
     828             : void
     829           0 : VpContainer::fillWithSequence (VisibilityProcessor * first, ...)
     830             : {
     831           0 :     ThrowIf (! vps_p.empty (),
     832             :              String::format ("fillWithSequence performed on non-empty container %s", getName().c_str()));
     833             : 
     834             :         va_list vaList;
     835             : 
     836           0 :         VisibilityProcessor * vp = first;
     837             : 
     838           0 :         va_start (vaList, first);
     839             : 
     840           0 :         while (vp != NULL){
     841             : 
     842           0 :             add (vp);
     843             : 
     844           0 :             vp = va_arg (vaList, VisibilityProcessor *);
     845             :         }
     846             : 
     847           0 :         va_end (vaList);
     848             : 
     849           0 :         for (VPs::iterator vp = vps_p.begin ();
     850           0 :              vp != vps_p.end();
     851           0 :              vp ++){
     852             : 
     853           0 :             VPs::iterator vp2 = vp + 1;
     854           0 :             if (vp2 == vps_p.end()){
     855           0 :                 break;
     856             :             }
     857             : 
     858           0 :             ThrowIf ((* vp)->getOutputs().empty(),
     859             :                      String::format ("Visibility processor %s has no outputs.", (* vp)->getName().c_str()));
     860           0 :             ThrowIf ((* vp2)->getInputs().empty(),
     861             :                      String::format ("Visibility processor %s has no inputs.", (* vp2)->getName().c_str()));
     862             : 
     863           0 :             connect (* vp, (* vp)->getOutputs().front().getName(),
     864           0 :                      * vp2, (* vp2)->getInputs().front().getName());
     865             : 
     866             :         }
     867             : 
     868             :         // Connect up containers input to the input of the first VP
     869             : 
     870           0 :         ThrowIf (vps_p.front()->getInputs().empty(),
     871             :                  String::format ("First node in sequence, %s, has no inputs",
     872             :                          vps_p.front()->getName().c_str()));
     873             : 
     874           0 :         connect (getInputs().front().getName(),
     875           0 :                  vps_p.front(), vps_p.front()->getInputs().front().getName());
     876             : 
     877           0 :         if (! getOutputs().empty()  && ! vps_p.back()->getOutputs().empty()){
     878             : 
     879             :             // Connect up output of last node with output of container
     880             : 
     881           0 :             connect (vps_p.back(), vps_p.back()->getInputs().front().getName(),
     882           0 :                      getOutputs().front().getName());
     883             : 
     884             :         }
     885           0 : }
     886             : 
     887             : VpContainer::ReadyVpAndData
     888           0 : VpContainer::findReadyVp (VpSet & vps, VpData & data, Bool flushing) const
     889             : {
     890           0 :     if (flushing){
     891           0 :         return findReadyVpFlushing (vps, data);
     892             :     }
     893             :     else{
     894           0 :         return findReadyVpNormal (vps, data);
     895             :     }
     896             : }
     897             : 
     898             : VpContainer::ReadyVpAndData
     899           0 : VpContainer::findReadyVpFlushing (VpSet & vpsWaiting, VpData & data) const
     900             : {
     901             : 
     902           0 :     VisibilityProcessor * readyVp = NULL;
     903             : 
     904             :     // The first vp in the ordered list to also be in the waiting set is the
     905             :     // one we want.
     906             : 
     907           0 :     for (VPs::const_iterator vp = vps_p.begin(); vp != vps_p.end(); vp ++){
     908           0 :         if (vpsWaiting.find (* vp) != vpsWaiting.end()){
     909           0 :             readyVp = * vp;
     910           0 :             break;
     911             :         }
     912             :     }
     913             : 
     914           0 :     if (readyVp == NULL){
     915             : 
     916           0 :         ThrowIf (! vpsWaiting.empty(), "Could not find ready VP during flush (bug)");
     917             : 
     918           0 :         return ReadyVpAndData (NULL, VpData());
     919             :     }
     920             : 
     921             :     // The set of input data will be all of the inputs desired by the node that
     922             :     // are available.  They may not be present if an upstream node didn't have any
     923             :     // data to flush out.
     924             : 
     925           0 :     VpPorts connectedInputList = readyVp -> getInputs (true);
     926             : 
     927           0 :     ReadyVpAndData result = ReadyVpAndData (readyVp, data.getSelection (connectedInputList, true));
     928             : 
     929           0 :     return result;
     930             : }
     931             : 
     932             : VpContainer::ReadyVpAndData
     933           0 : VpContainer::findReadyVpNormal (VpSet & vps, VpData & data) const
     934             : {
     935           0 :     ReadyVpAndData result (NULL, VpData());
     936             : 
     937           0 :     set<VpPort> dataPorts;
     938           0 :     for (VpData::const_iterator d = data.begin(); d != data.end(); d++){
     939           0 :         dataPorts.insert (d->first);
     940             :     }
     941             : 
     942           0 :     for (VpSet::const_iterator vp = vps.begin(); vp != vps.end(); vp ++){
     943             : 
     944           0 :         VpPorts connectedInputList = (* vp)->getInputs (true);
     945             : 
     946           0 :         set<VpPort> connectedInputSet (connectedInputList.begin(), connectedInputList.end());
     947             : 
     948             :         // Subtract from the needed input ports, the set of available data ports.
     949             :         // When the comes up empty then the VP can execute.
     950             : 
     951           0 :         VpPorts diff;
     952             :         set_difference (connectedInputSet.begin(), connectedInputSet.end(),
     953             :                         dataPorts.begin(), dataPorts.end(),
     954           0 :                         back_inserter (diff));
     955             : 
     956           0 :         if (diff.empty()){
     957             : 
     958           0 :             result = ReadyVpAndData (* vp, data.getSelection (connectedInputList));
     959             : 
     960           0 :             break;
     961             :         }
     962             : 
     963             :     }
     964             : 
     965           0 :     return result;
     966             : }
     967             : 
     968             : bool
     969           0 : VpContainer::follows (const VisibilityProcessor * a, const VisibilityProcessor * b) const
     970             : {
     971             :     // Go through the interconnection network and see if one of processor b's outputs go to
     972             :     // processor a's inputs
     973             : 
     974           0 :     Bool result = false;
     975           0 :     for (Network::const_iterator arc = network_p.begin(); arc != network_p.end(); arc ++){
     976           0 :         if (arc->first.getVp() == b && arc->second.getVp() == a){
     977           0 :             result = true;
     978           0 :             break;
     979             :         }
     980             :     }
     981             : 
     982           0 :     return result;
     983             : }
     984             : 
     985             : bool
     986           0 : VpContainer::followsSet (const VisibilityProcessor * a, const VpSet & vpSet) const
     987             : {
     988           0 :     Bool result = false;
     989             : 
     990           0 :     for (VpSet::const_iterator vp = vpSet.begin(); vp != vpSet.end(); vp++){
     991           0 :         if (follows (a, * vp)){
     992           0 :             result = true;
     993           0 :             break;
     994             :         }
     995             :     }
     996             : 
     997           0 :     return result;
     998             : }
     999             : 
    1000             : casa::asyncio::PrefetchColumns
    1001           0 : VpContainer::getPrefetchColumns () const
    1002             : {
    1003           0 :     PrefetchColumns result;
    1004             : 
    1005           0 :     for (VPs::const_iterator vp = vps_p.begin(); vp != vps_p.end(); vp ++){
    1006             : 
    1007           0 :         result = result + (* vp)->getPrefetchColumns();
    1008             : 
    1009             :     }
    1010             : 
    1011           0 :     return result;
    1012             : }
    1013             : 
    1014             : 
    1015             : void
    1016           0 : VpContainer::orderContents ()
    1017             : {
    1018             :     // Order the VPs in this container using their dependencies.  Nodes that are only dependent
    1019             :     // on the container will be first, then nodes dependent on the first set of nodes, etc.
    1020             : 
    1021           0 :     VpSet unorderedVps (vps_p.begin(), vps_p.end()); // VPs not assigned an order as of yet
    1022           0 :     VPs orderedVps; // sorted list of VPs
    1023             : 
    1024           0 :     while (! unorderedVps.empty()){
    1025             : 
    1026           0 :         VPs nextClass;
    1027             : 
    1028             :         // Create the next class of VPs which are only dependent on previous classes of VPs.
    1029             :         // These will be VPs which are not dependent on any of the currently unordered nodes.
    1030             : 
    1031           0 :         for (VpSet::const_iterator vp = unorderedVps.begin(); vp != unorderedVps.end(); vp ++){
    1032           0 :             if (! followsSet (* vp, unorderedVps)){
    1033           0 :                 nextClass.push_back (* vp);
    1034           0 :                 orderedVps.push_back (* vp);
    1035             :             }
    1036             :         }
    1037             : 
    1038             :         // Remove the VPs that are
    1039             : 
    1040           0 :         for (VPs::const_iterator vp = nextClass.begin(); vp != nextClass.end(); vp++){
    1041           0 :             unorderedVps.erase (* vp);
    1042             :         }
    1043             : 
    1044             :         // If no nodes were found then there must be a cycle and the loop will never
    1045             :         // terminate!
    1046             : 
    1047           0 :         ThrowIf (nextClass.size() == 0, String::format ("VpContainer %s contains a cycle", getName().c_str()));
    1048             : 
    1049             :     }
    1050             : 
    1051           0 :     vps_p = orderedVps;
    1052           0 : }
    1053             : 
    1054             : void
    1055           0 : VpContainer::processingStartImpl ()
    1056             : {
    1057           0 :     iterator i;
    1058             :     try{
    1059           0 :         for (i = begin(); i != end(); i++){
    1060           0 :             (*i)->processingStart ();
    1061             :         }
    1062             :     }
    1063           0 :     catch (AipsError & e){
    1064           0 :         Rethrow (e, String::format ("Error during processingStart for container '%s' VP '%s'",
    1065             :                             getName().c_str(), (*i)->getName().c_str()));
    1066             :     }
    1067           0 : }
    1068             : 
    1069             : void
    1070           0 : VpContainer::remapPorts (VpData & data, const VisibilityProcessor * vp)
    1071             : {
    1072           0 :     vector<VpPort> oldPorts = mapKeys (data);
    1073             : 
    1074           0 :     for (vector<VpPort>::const_iterator oldPort = oldPorts.begin();
    1075           0 :          oldPort != oldPorts.end();
    1076           0 :          oldPort ++){
    1077             : 
    1078           0 :         Network::const_iterator newPortItr = network_p.find (* oldPort);
    1079             : 
    1080           0 :         if (newPortItr != network_p.end()){
    1081             : 
    1082           0 :             VpPort newPort = newPortItr->second;
    1083             : 
    1084           0 :             assert (! utilj::containsKey (newPort, data));
    1085             : 
    1086           0 :             data [newPort] = data [* oldPort];
    1087           0 :             data.erase (* oldPort);
    1088             : 
    1089             :         }
    1090             :         else{
    1091           0 :             ThrowIf (true,
    1092             :                      String::format ("Vp '%s' produced unused output '%s'",
    1093             :                              vp->getFullName().c_str(), oldPort->getName().c_str()));
    1094             :         }
    1095             :     }
    1096           0 : }
    1097             : 
    1098             : size_t
    1099           0 : VpContainer::size() const
    1100             : {
    1101           0 :     return vps_p.size();
    1102             : }
    1103             : 
    1104             : 
    1105             : 
    1106             : void
    1107           0 : VpContainer::validateImpl()
    1108             : {
    1109           0 :     iterator i;
    1110             :     try{
    1111           0 :         for (i = begin(); i != end(); i++){
    1112           0 :             (*i)->validate ();
    1113             :         }
    1114             :     }
    1115           0 :     catch (AipsError & e){
    1116           0 :         Rethrow (e, String::format ("Error during validate for container '%s' VP '%s'",
    1117             :                             getName().c_str(), (*i)->getName().c_str()));
    1118             :     }
    1119             : 
    1120           0 :     orderContents(); // put vps_p into dependency order
    1121           0 : }
    1122             : 
    1123             : String
    1124           0 : VpContainer::VpSet::getNames () const
    1125             : {
    1126           0 :     String nameList = utilj::join (begin(), end(), mem_fun (& VisibilityProcessor::getName), ",");
    1127             : 
    1128           0 :     return nameList;
    1129             : }
    1130             : 
    1131           0 : VpData::VpData ()
    1132           0 : {}
    1133             : 
    1134           0 : VpData::VpData (const VpPort & port, VbPtr vb)
    1135             : {
    1136           0 :     add (port, vb);
    1137           0 : }
    1138             : 
    1139             : void
    1140           0 : VpData::add (const VpPort & port, VbPtr vb)
    1141             : {
    1142           0 :     ThrowIf (utilj::containsKey (port, * this),
    1143             :             String::format ("VpData::add: data already present for port %s.", port.getFullName ().c_str()));
    1144             : 
    1145           0 :     (* this) [port] = vb;
    1146           0 : }
    1147             : 
    1148             : String
    1149           0 : VpData::getNames () const
    1150             : {
    1151           0 :     string names = join (begin(), end(),
    1152             :                      compose (mem_fun_ref (& VpPort::getName),
    1153           0 :                               casa::utilj::firstFunctor<VpPort, VbPtr>()),
    1154           0 :                      ",");
    1155             : 
    1156           0 :     return names;
    1157             : }
    1158             : 
    1159             : VpData
    1160           0 : VpData::getSelection (const VpPorts & ports, bool missingIsOk) const
    1161             : {
    1162           0 :     VpData result;
    1163             : 
    1164           0 :     for (VpPorts::const_iterator port = ports.begin(); port != ports.end(); port ++){
    1165             : 
    1166           0 :         const_iterator data = find (* port);
    1167             : 
    1168           0 :         if (data != end()){
    1169           0 :             result [* port] = data->second;
    1170             :         }
    1171             :         else{
    1172           0 :             assert (missingIsOk);
    1173             :             UnusedVariable (missingIsOk);
    1174             :         }
    1175             :     }
    1176             : 
    1177           0 :     return result;
    1178             : }
    1179             : 
    1180             : 
    1181             : Int VpEngine::logLevel_p = std::numeric_limits<int>::min();
    1182             : LogIO * VpEngine::logIo_p = NULL;
    1183             : LogSink * VpEngine::logSink_p = NULL;
    1184             : Bool VpEngine::loggingInitialized_p = false;
    1185             : 
    1186             : Bool
    1187           0 : VpEngine::initializeLogging()
    1188             : {
    1189             : 
    1190           0 :     AipsrcValue<Int>::find (logLevel_p, getAipsRcBase () + ".debug.logLevel",
    1191           0 :                             std::numeric_limits<int>::min());
    1192             : 
    1193           0 :     if (logLevel_p >= 0){
    1194             : 
    1195           0 :         if (logSink_p == 0){
    1196           0 :             logSink_p = new LogSink(LogMessage::NORMAL, false);
    1197             :         }
    1198             : 
    1199           0 :         logIo_p = new LogIO (LogOrigin ("VisibilityProcessing"));
    1200           0 :         * logIo_p << "VisibilityProcessing logging enabled; level=" << logLevel_p << endl << LogIO::POST;
    1201             : 
    1202             :     }
    1203             : 
    1204           0 :     loggingInitialized_p = true;
    1205             : 
    1206           0 :     return true;
    1207             : }
    1208             : 
    1209             : String
    1210           0 : VpEngine::getAipsRcBase ()
    1211             : {
    1212           0 :     return "VpFramework";
    1213             : }
    1214             : 
    1215             : Int
    1216           0 : VpEngine::getLogLevel ()
    1217             : {
    1218           0 :     return logLevel_p;
    1219             : }
    1220             : 
    1221             : ROVisibilityIterator *
    1222           0 : VpEngine::getVi ()
    1223             : {
    1224           0 :     return vi_p;
    1225             : }
    1226             : 
    1227             : void
    1228           0 : VpEngine::log (const String & formatString, ...)
    1229             : {
    1230           0 :     if (! loggingInitialized_p){
    1231           0 :         initializeLogging ();
    1232             :     }
    1233             : 
    1234             :     va_list vaList;
    1235             : 
    1236           0 :     va_start (vaList, formatString);
    1237             : 
    1238           0 :     String result = formatV (formatString.c_str(), vaList);
    1239             : 
    1240           0 :     va_end (vaList);
    1241             : 
    1242           0 :     (* logIo_p) << result << endl << LogIO::POST;
    1243           0 : }
    1244             : 
    1245             : void
    1246           0 : VpEngine::process (VisibilityProcessor & processor,
    1247             :                    ROVisibilityIterator & vi,
    1248             :                    const String & inputPortName)
    1249             : {
    1250           0 :     ThrowIf (! processor.getInputs ().contains (inputPortName),
    1251             :              String::format ("VisibilityProcessor %s does not have an input port '%s'",
    1252             :                             processor.getName().c_str(), inputPortName.c_str()));
    1253             : 
    1254           0 :     process (processor, vi, processor.getInput (inputPortName));
    1255           0 : }
    1256             : 
    1257             : void
    1258           0 : VpEngine::process (VisibilityProcessor & processor,
    1259             :                    ROVisibilityIterator & vi,
    1260             :                    const VpPort & inputPortProvided)
    1261             : {
    1262           0 :     Log (1, "VpEngine::process starting on processor '%s'", processor.getName().c_str());
    1263             : 
    1264           0 :     vi_p = & vi;
    1265             : 
    1266           0 :     VisBufferAutoPtr vbTemp (vi);
    1267           0 :     VbPtr vb (vbTemp.release());
    1268             : 
    1269           0 :     VpPort inputPort = inputPortProvided;
    1270             : 
    1271           0 :     if (inputPort.empty()){ // Take single input to VP as default if not specified
    1272             : 
    1273           0 :         VpPorts inputs = processor.getInputs ();
    1274             : 
    1275           0 :         ThrowIf (inputs.size() != 1,
    1276             :                  String::format ("Vp '%s' must have exactly one input or an input must be specified explicitly",
    1277             :                          processor.getName().c_str()));
    1278             : 
    1279           0 :         inputPort = inputs.front();
    1280             :     }
    1281             : 
    1282             :     // connect up input and then validate
    1283             : 
    1284           0 :     processor.processingStart ();
    1285             : 
    1286           0 :     Int chunkNumber = 0;
    1287           0 :     Int subchunkNumber = 0;
    1288             : 
    1289             :     try {
    1290             : 
    1291           0 :         for (vi.originChunks ();
    1292           0 :                 vi.moreChunks();
    1293           0 :                 vi.nextChunk (), chunkNumber ++){
    1294             : 
    1295           0 :             Int iteration = 0;
    1296           0 :             VisibilityProcessor::ChunkCode chunkCode = VisibilityProcessor::Normal;
    1297             : 
    1298           0 :             do {  // The VP can request repeating a chunk
    1299             : 
    1300           0 :                 Log (2, "VpEngine::process: Starting chunk %d (iteration=%d)\n",
    1301             :                      chunkNumber, iteration);
    1302             : 
    1303           0 :                 processor.chunkStart (SubchunkIndex (chunkNumber, SubchunkIndex::Invalid, iteration));
    1304             : 
    1305           0 :                 subchunkNumber = 0;
    1306             : 
    1307           0 :                 for (vi.origin (); vi.more (); ++ vi, subchunkNumber ++){
    1308             : 
    1309           0 :                     vb->dirtyComponentsClear();
    1310           0 :                     VisibilityProcessor::ProcessingResult ignored;
    1311             : 
    1312           0 :                     SubchunkIndex sci (chunkNumber, subchunkNumber, iteration);
    1313             : 
    1314           0 :                     Log (2, "VpEngine::process: Starting Subchunk %s \n",
    1315             :                          sci.toString ().c_str());
    1316             : 
    1317           0 :                     VpData data (inputPort, vb);
    1318           0 :                     ignored = processor.doProcessing (VisibilityProcessor::Subchunk,
    1319             :                                                       data,
    1320             :                                                       this,
    1321           0 :                                                       sci);
    1322             :                 }
    1323             : 
    1324           0 :                 VpData noData;
    1325           0 :                 VpData ignored;
    1326           0 :                 std::tie (chunkCode, ignored) =
    1327           0 :                         processor.doProcessing (VisibilityProcessor::EndOfChunk,
    1328             :                                                 noData,
    1329             :                                                 this,
    1330           0 :                                                 SubchunkIndex (chunkNumber, SubchunkIndex::Invalid, iteration));
    1331             : 
    1332           0 :                 iteration ++;
    1333             : 
    1334           0 :             } while (chunkCode == VisibilityProcessor::RepeatChunk);
    1335             : 
    1336             :         }
    1337             :     }
    1338           0 :     catch (AipsError & e){
    1339             :     }
    1340             : 
    1341           0 :     VisibilityProcessor::ProcessingResult ignored;
    1342           0 :     VpData noData;
    1343             : 
    1344           0 :     ignored = processor.doProcessing (VisibilityProcessor::EndOfData,
    1345             :                                       noData,
    1346             :                                       this,
    1347           0 :                                       SubchunkIndex ());
    1348             : 
    1349           0 :     Log (1, "VpEngine::process completed for processor '%s'", processor.getName().c_str());
    1350             : 
    1351           0 : }
    1352             : 
    1353           0 : VpPort::VpPort()
    1354             : : connectedInput_p (false),
    1355             :   connectedOutput_p (false),
    1356             :   name_p (""),
    1357             :   visibilityProcessor_p (NULL),
    1358           0 :   type_p (Unknown)
    1359           0 : {}
    1360             : 
    1361             : 
    1362           0 : VpPort::VpPort (VisibilityProcessor * vp, const String & name, VpPort::Type type)
    1363             : : connectedInput_p (false),
    1364             :   connectedOutput_p (false),
    1365             :   name_p (name),
    1366             :   visibilityProcessor_p (vp),
    1367           0 :   type_p (type)
    1368           0 : {}
    1369             : 
    1370             : Bool
    1371           0 : VpPort::operator== (const VpPort & other) const
    1372             : {
    1373           0 :     Bool result = other.getVp() == getVp() && other.getName() == getName();
    1374             : 
    1375           0 :     return result;
    1376             : }
    1377             : 
    1378             : Bool
    1379           0 : VpPort::operator< (const VpPort & other) const
    1380             : {
    1381           0 :     Bool result = other.getVp() < getVp() ||
    1382           0 :                   (other.getVp() == getVp() && other.getName() < getName());
    1383             : 
    1384           0 :     return result;
    1385             : }
    1386             : 
    1387             : 
    1388             : Bool
    1389           0 : VpPort::empty () const
    1390             : {
    1391           0 :     return visibilityProcessor_p == NULL;
    1392             : }
    1393             : 
    1394             : String
    1395           0 : VpPort::getFullName () const
    1396             : {
    1397           0 :     String vpName = "*NULL*";
    1398           0 :     if (getVp() != NULL){
    1399           0 :         vpName = getVp()->getFullName();
    1400             :     }
    1401             : 
    1402           0 :     return String::format ("%s:%s", vpName.c_str(), getName().c_str());
    1403             : }
    1404             : 
    1405             : String
    1406           0 : VpPort::getName () const
    1407             : {
    1408           0 :     return name_p;
    1409             : }
    1410             : 
    1411             : VpPort::Type
    1412           0 : VpPort::getType () const
    1413             : {
    1414           0 :     return type_p;
    1415             : }
    1416             : 
    1417             : VisibilityProcessor *
    1418           0 : VpPort::getVp ()
    1419             : {
    1420           0 :     return visibilityProcessor_p;
    1421             : }
    1422             : 
    1423             : const VisibilityProcessor *
    1424           0 : VpPort::getVp () const
    1425             : {
    1426           0 :     return visibilityProcessor_p;
    1427             : }
    1428             : 
    1429             : Bool
    1430           0 : VpPort::isConnectedInput () const
    1431             : {
    1432           0 :     return connectedInput_p;
    1433             : }
    1434             : 
    1435             : Bool
    1436           0 : VpPort::isConnectedOutput () const
    1437             : {
    1438           0 :     return connectedOutput_p;
    1439             : }
    1440             : 
    1441             : Bool
    1442           0 : VpPort::isType (Type t) const
    1443             : {
    1444           0 :     return (type_p & t) != 0;
    1445             : }
    1446             : 
    1447             : void
    1448           0 : VpPort::setConnectedInput ()
    1449             : {
    1450           0 :     AssertAlways (! empty() && ! connectedInput_p && isType (Input));
    1451             : 
    1452           0 :     connectedInput_p = true;
    1453           0 : }
    1454             : 
    1455             : void
    1456           0 : VpPort::setConnectedOutput ()
    1457             : {
    1458           0 :     AssertAlways (! empty() && ! connectedOutput_p && isType (Output));
    1459             : 
    1460           0 :     connectedOutput_p = true;
    1461           0 : }
    1462             : 
    1463             : Bool
    1464           0 : VpPorts::contains (const String & name) const
    1465             : {
    1466           0 :     Bool foundIt = find (name, begin(), end()) != end();
    1467             : 
    1468           0 :     return foundIt;
    1469             : }
    1470             : 
    1471             : Bool
    1472           0 : VpPorts::contains (const VpPort & port) const
    1473             : {
    1474           0 :     Bool foundIt = std::find (begin(), end(), port) != end();
    1475             : 
    1476           0 :     return foundIt;
    1477             : }
    1478             : 
    1479             : VpPort
    1480           0 : VpPorts::get (const String & name) const
    1481             : {
    1482           0 :     const_iterator i = find (name, begin(), end());
    1483           0 :     ThrowIf (i == end(), "No such port '" + name + "'");
    1484             : 
    1485           0 :     return * i;
    1486             : }
    1487             : 
    1488             : VpPort &
    1489           0 : VpPorts::getRef (const String & name)
    1490             : {
    1491           0 :     iterator i = find (name, begin(), end());
    1492           0 :     ThrowIf (i == end(), "No such port '" + name + "'");
    1493             : 
    1494           0 :     return * i;
    1495             : }
    1496             : 
    1497             : String
    1498           0 : VpPorts::toString () const
    1499             : {
    1500             : 
    1501           0 :     String result;
    1502             : 
    1503           0 :     result = utilj::containerToString (begin(), end(), & VpPort::getName);
    1504             : 
    1505           0 :     return result;
    1506             : }
    1507             : 
    1508             : 
    1509           0 : WriterVp::WriterVp (const String & name,
    1510             :                     VisibilityIterator * vi,
    1511             :                     Bool advanceVi,
    1512             :                     const String & input,
    1513           0 :                     const String & output)
    1514           0 : : VisibilityProcessor (name, vector<String> (1, input), vector<String> (1, output)),
    1515             :   advanceVi_p (advanceVi),
    1516             :   disableOutput_p (false),
    1517           0 :   vi_p (vi)
    1518             : {
    1519           0 :     ThrowIf (advanceVi_p && vi == NULL,
    1520             :              String::format ("Parameter advanceVi can only be true if a VI is provided for WriterVp '%s',",
    1521             :                      name.c_str()));
    1522           0 : }
    1523             : 
    1524             : VisibilityProcessor::ProcessingResult
    1525           0 : WriterVp::doProcessingImpl (ProcessingType /*processingType*/,
    1526             :                             VpData & inputData,
    1527             :                             const SubchunkIndex & /*subChunkIndex*/)
    1528             : {
    1529           0 :     if (inputData.empty()){
    1530           0 :         return ProcessingResult();  // Nothing to write
    1531             :     }
    1532             : 
    1533           0 :     VpPort inputPort = getInputs () [0];
    1534             : 
    1535           0 :     ThrowIf (! utilj::containsKey (inputPort, inputData),
    1536             :              String::format ("Input data not found for port '%s' in VP '%s'",
    1537             :                      inputPort.getName().c_str(),
    1538             :                      getName().c_str()));
    1539             : 
    1540             :     // Get the (writable) VisibilityIterator
    1541             : 
    1542           0 :     VisibilityIterator * vi = vi_p;
    1543             : 
    1544           0 :     if (vi == NULL){
    1545             : 
    1546             :         // If a VI wasn't provided, then use the one being swept by the VI engine
    1547             : 
    1548           0 :         vi = dynamic_cast <VisibilityIterator *> (getVi());
    1549             :     }
    1550             : 
    1551             : 
    1552           0 :     ThrowIf (vi == NULL, String::format ("No writable VI found in VP '%s'", getName().c_str()));
    1553             : 
    1554             :     // Write out the data to the VI
    1555             : 
    1556             : 
    1557             : 
    1558             :     try{
    1559           0 :         if (! disableOutput_p){
    1560           0 :             vi->writeBack (inputData [inputPort].get());
    1561             : 
    1562           0 :             if (advanceVi_p){
    1563             : 
    1564             :                 // Advance VI to the next position.  If the current chunk is exhausted then
    1565             :                 // advance the chunk and reset to the chunk's origin.
    1566             : 
    1567           0 :                 (* vi) ++;
    1568           0 :                 if (! vi->more()){
    1569           0 :                     vi->nextChunk();
    1570           0 :                     if (vi->moreChunks()){
    1571           0 :                         vi->origin();
    1572             :                     }
    1573             :                 }
    1574             :             }
    1575             :         }
    1576             :     }
    1577           0 :     catch (AipsError & e){
    1578           0 :         Rethrow (e, String::format ("While '%s' writing VB to disk", getName().c_str()));
    1579             :     }
    1580             : 
    1581           0 :     inputData [inputPort] -> dirtyComponentsClear();
    1582             : 
    1583             :     // Output the data if the output of this VP is connected.
    1584             : 
    1585           0 :     VpData outputData; // Start out with empty outputs
    1586             : 
    1587           0 :     VpPorts outputs = getOutputs(true); // get connected outputs
    1588             : 
    1589           0 :     if (! outputs.empty()){
    1590             : 
    1591             :         // An output was connected.
    1592             : 
    1593           0 :         outputData [outputs [0]] = inputData [inputPort];
    1594             :     }
    1595             : 
    1596           0 :     ProcessingResult processingResult (Normal, outputData);
    1597             : 
    1598           0 :     return processingResult;
    1599             : }
    1600             : 
    1601             : Bool
    1602           0 : WriterVp::setDisableOutput (Bool disableIt)
    1603             : {
    1604           0 :     Bool old = disableOutput_p;
    1605           0 :     disableOutput_p = disableIt;
    1606             : 
    1607           0 :     return old;
    1608             : }
    1609             : 
    1610             : void
    1611           0 : WriterVp::validateImpl()
    1612             : {
    1613           0 :     throwIfAnyInputsUnconnected ();
    1614           0 : }
    1615             : 
    1616             : 
    1617             : 
    1618             : ostream &
    1619           0 : operator<< (ostream & os, const VisibilityProcessor::ProcessingType & processingType)
    1620             : {
    1621           0 :     os << toString (processingType);
    1622             : 
    1623           0 :     return os;
    1624             : }
    1625             : 
    1626             : 
    1627             : } // end namespace vpu
    1628             : 
    1629             : using namespace casacore;
    1630             : } // end namespace casa

Generated by: LCOV version 1.16