casa
$Rev:20696$
|
00001 /* 00002 * VisibilityProcessing.h 00003 * 00004 * Created on: Feb 8, 2011 00005 * Author: jjacobs 00006 */ 00007 00008 #ifndef VISIBILITYPROCESSING_H_ 00009 #define VISIBILITYPROCESSING_H_ 00010 00011 #include <casa/aips.h> 00012 #include <casa/BasicSL/String.h> 00013 #include <casa/Exceptions/Error.h> 00014 #include "VisBuffer.h" 00015 #include "VisibilityIterator.h" 00016 #include "UtilJ.h" 00017 00018 #include <boost/iterator/indirect_iterator.hpp> 00019 #include <boost/shared_ptr.hpp> 00020 #include <boost/tuple/tuple.hpp> 00021 #include <boost/utility.hpp> 00022 #include <map> 00023 #include <set> 00024 #include <vector> 00025 00026 00027 /* 00028 00029 Visibility Processing Framework Class Summary 00030 ============================================= 00031 00032 SubchunkIndex - Index of a subchunk. Consists of the chunk number, 00033 the subchunk number and the iteration number. All three 00034 are zero-based. The interation number is nonzero if a chunk 00035 is reprocessed. A subchunk is used to identify a VisBuffer 00036 relative to the VisibilityIterator managed by the VpEngine. 00037 VbPtr - Smart pointer of a VisBuffer. 00038 VisibilityProcessor - A visibility processing node in data flow graph 00039 VisibilityProcessorStub - A do-nothing node used for unit testing 00040 VpContainer - A VP which contains a graph of VPs. It handles moving 00041 data between its input and output ports to the appropriate 00042 input and output ports of the data flow graph it contains. 00043 SplitterVp - Has a single input port and outputs identical copies of it 00044 through its output ports. 00045 WriterVp - Takes an input and writes it out to the VisibilityIterator provided 00046 when it was constructed. Optionally passes the input data to its 00047 output port. 00048 VpEngine - Object that executes a data flow graph of VisibilityProcessors on data 00049 accessed via a VisibilityIterator. 00050 00051 VpPort - A data port into or out of (or both) a VisibiltyProcessor 00052 VpPorts - A collection of VpPort objects 00053 VpData - A collection of visibility data; it works like an associative array 00054 pairing a VpPort with a VisBuffer. 00055 00056 */ 00057 00058 namespace casa { 00059 00060 namespace asyncio { 00061 class PrefetchColumns; 00062 }; 00063 00064 namespace vpf { 00065 00066 class VisibilityProcessor; 00067 class VpContainer; 00068 class VpEngine; 00069 00070 class SubchunkIndex { 00071 00072 friend class SubchunkIndex_Test; 00073 00074 public: 00075 00076 enum {Invalid = -1}; 00077 00078 SubchunkIndex (Int chunkNumber = Invalid, Int subChunkNumber = Invalid, Int iteration = Invalid); 00079 00080 // Comparison Operators 00081 // 00082 // Comparison is in lexicographic order by chunk, subchunk and iteration. 00083 00084 Bool operator< (const SubchunkIndex & other) const; 00085 Bool operator== (const SubchunkIndex & other) const { return ! (* this < other || other < * this);} 00086 Bool operator!= (const SubchunkIndex & other) const { return ! (* this == other);} 00087 00088 Int getChunkNumber () const; 00089 Int getIteration () const; 00090 Int getSubchunkNumber () const; 00091 00092 String toString () const; 00093 00094 private: 00095 00096 Int chunkNumber_p; // -1 for invalid 00097 Int iteration_p; // -1 for invalid 00098 Int subChunkNumber_p; 00099 }; 00100 00101 class VbPtr : public boost::shared_ptr<casa::VisBuffer> { 00102 00103 public: 00104 00105 VbPtr () : boost::shared_ptr<casa::VisBuffer> () {} 00106 explicit VbPtr (casa::VisBuffer * vb) : boost::shared_ptr<casa::VisBuffer> (vb) {} 00107 00108 // Assignment operator setting VbPtr to a normal pointer. Ownership is passed to the 00109 // VbPtr so caller must ensure that delete is not called on the VisBuffer. 00110 00111 VbPtr & operator= (casa::VisBuffer * vb) 00112 { 00113 boost::shared_ptr<casa::VisBuffer>::operator= (VbPtr (vb)); 00114 return * this; 00115 } 00116 }; 00117 00118 class VpPort { 00119 00120 friend class VpContainer; 00121 friend class VpPort_Test; 00122 00123 public: 00124 00125 // Normally ports are either input or output ports. However, the ports 00126 // of a VpContainer do double duty serving as an input to the container and 00127 // an outputted to the input of a contained VP, or vice versa. 00128 00129 typedef enum {Unknown, Input = 1, Output = 2, InOut = Input | Output} Type; 00130 00131 VpPort (); 00132 VpPort (VisibilityProcessor * vp, const String & name, Type type); 00133 ~VpPort () {} 00134 00135 Bool operator< (const VpPort & other) const; 00136 Bool operator== (const VpPort & other) const; 00137 00138 Bool empty () const; 00139 String getFullName () const; // returns Vp0.Vp1...VpN.portName 00140 String getName () const; // returns portName 00141 Type getType () const; // Returns the port's type as something from the Type enum 00142 Bool isConnectedInput () const; // True if port has been connected up as an input 00143 Bool isConnectedOutput () const; // True if port has been connected up as an output 00144 00145 // Used to check the type of the port as defined in the Type enum. InOut ports 00146 // return true for both Input and Output types. 00147 00148 bool isType (Type t) const; 00149 00150 //String toString() const; 00151 00152 protected: 00153 00154 const VisibilityProcessor * getVp () const; 00155 VisibilityProcessor * getVp (); 00156 void setConnectedInput (); 00157 void setConnectedOutput (); 00158 00159 private: 00160 00161 Bool connectedInput_p; 00162 Bool connectedOutput_p; 00163 String name_p; 00164 VisibilityProcessor * visibilityProcessor_p; // [use] 00165 Type type_p; 00166 00167 }; 00168 00169 class VpPorts : public std::vector<VpPort> { 00170 00171 friend class VisibilityProcessor; 00172 friend class VpPorts_Test; 00173 00174 public: 00175 00176 Bool contains (const String & name) const; 00177 Bool contains (const VpPort & port) const; 00178 VpPort get (const String & name) const; 00179 String toString () const; 00180 00181 protected: 00182 00183 VpPort & getRef (const String & name); 00184 00185 template <typename Itr> 00186 static 00187 Itr 00188 find(const String & name, Itr begin, Itr end) 00189 { 00190 Itr i; 00191 00192 for (i = begin; i != end; i++){ 00193 if (i->getName() == name){ 00194 break; 00195 } 00196 } 00197 00198 return i; 00199 } 00200 00201 }; 00202 00203 namespace asyncio { 00204 class PrefetchColumns; 00205 } 00206 00207 00208 class VpData: public std::map<VpPort, VbPtr> { 00209 00210 friend class VpData_Test; 00211 00212 public: 00213 00214 VpData (); 00215 VpData (const VpPort & port, VbPtr); 00216 00217 void add (const VpPort & port, VbPtr); // Adds a (port,VbPtr) to the collection 00218 00219 // Returns the (port,VbPtr) pairs for the requested set of ports. An execption 00220 // is thrown if a requested port is not present unless missingIsOk is set to True. 00221 00222 VpData getSelection (const VpPorts &, bool missingIsOk = False) const; 00223 String getNames () const; // Returns a comma-separated list of the port names. 00224 }; 00225 00226 00227 class VisibilityProcessor : boost::noncopyable { 00228 00229 friend class VpContainer; 00230 friend class WriterVp; 00231 00232 public: 00233 00234 typedef enum { 00235 Normal, 00236 RepeatChunk 00237 } ChunkCode; 00238 00239 typedef enum { 00240 Subchunk, // Normal processing of a subchunk 00241 EndOfChunk, // Called after all subchunks of a chunk have been processed 00242 EndOfData // Called after all chunks have been processed 00243 } ProcessingType; 00244 00245 typedef boost::tuple <ChunkCode, VpData> ProcessingResult; 00246 00247 VisibilityProcessor (); 00248 VisibilityProcessor (const String & name, 00249 const vector<String> & inputNames, 00250 const vector<String> & outputNames = vector<String>(), 00251 Bool makeIoPorts = False); 00252 virtual ~VisibilityProcessor () {} 00253 00254 // chunkStart is called to inform the VP that a new chunk is starting. 00255 00256 void chunkStart (const SubchunkIndex &); 00257 00258 // Called to cause the VP to process the provided inputs. It will be called 00259 // in three different contexts as indicated by the ProcessingType. 00260 00261 ProcessingResult doProcessing (ProcessingType processingType, 00262 VpData & inputData, 00263 VpEngine * vpEngine, 00264 const SubchunkIndex & subChunkIndex); 00265 00266 // Returns a pointer to the containing VP or NULL if this VP is top-level. 00267 00268 const VpContainer * getContainer () const { return NULL;} 00269 00270 // The full name of a VP is a dotted list of the names of all the containing 00271 // VPs ending with the name of this VP (e.g., vp0.vp1...vpN.thisVp). 00272 00273 String getFullName () const; 00274 00275 // Returns the input port having the specified name. Exception if port is undefined. 00276 00277 VpPort getInput (const String & name) const; 00278 00279 // Returns a collection of the input ports for this VP; optionally only the 00280 // connected ports are returned. 00281 00282 VpPorts getInputs (Bool connectedOnly = False) const; 00283 00284 // Returns the name of this VP 00285 00286 String getName () const; 00287 00288 // Returns the number of Subchunks processed (mainly for testing) 00289 00290 Int getNSubchunksProcessed () const; 00291 00292 // Returns the number of unique Subchunks (i.e., iteration ignored) processed. 00293 // (mainly for testing) 00294 00295 Int getNSubchunksUniqueProcessed () const; 00296 00297 // Returns the output port having the specified name. Exception if port is undefined. 00298 00299 VpPort getOutput (const String & name) const; 00300 00301 // Returns a collection of the output ports for this VP; optionally only the 00302 // connected ports are returned. 00303 00304 VpPorts getOutputs (Bool connectedOnly = False) const; 00305 00306 // Returns the collection of columns that need to be prefetched if this node 00307 // is used with async I/O. 00308 00309 virtual casa::asyncio::PrefetchColumns getPrefetchColumns () const; 00310 00311 // Called by the framework when the processing is about to begin (i.e., prior 00312 // to the first VisBuffer being fed into the graph. 00313 00314 void processingStart (); 00315 00316 // Called to ask the VP to check its validity (i.e., are all needed inputs connected, 00317 // etc.). 00318 00319 void validate (); 00320 00321 protected: 00322 00323 // The public API contains many methods that are not virtual. However, where subclass-specific 00324 // behavior is potentially useful, a corresponding xxxImpl method is provided. This allows the 00325 // framework to perform certain required housekeeping options while allowing the subclass 00326 // to perform custom operations. 00327 00328 // Called on the object when a new chunk is about to be started. 00329 00330 virtual void chunkStartImpl (const SubchunkIndex &) {} 00331 00332 00333 // Defines the set of possible input ports for this VP 00334 00335 VpPorts definePorts (const vector<String> & portNames, VpPort::Type type, const String & typeName); 00336 00337 // Requests processing of the provided (possibly empty) input data. This is called on each 00338 // subchunk (then inputData will be nonempty) and at the end of a chunk and the end of the 00339 // entire data set. These last two call types allow the VP to output any data that it might have 00340 // been accumulating across multiple subchunks, etc. 00341 00342 virtual ProcessingResult doProcessingImpl (ProcessingType processingType, 00343 VpData & inputData, 00344 const SubchunkIndex & subChunkIndex) = 0; 00345 00346 // Returns a collection of the ports that are not connected using the provided connection 00347 // method; some ports may also be excluded from this list by name. 00348 00349 VpPorts portsUnconnected (const VpPorts & ports, Bool (VpPort::* isConnected) () const, 00350 const vector<String> & except = vector<String> ()) const; 00351 00352 // Called when data processing is about to beging; this allows the VP to perform any 00353 // initialization that it desires now that it is completely connected into the graph. 00354 00355 virtual void processingStartImpl () {} 00356 00357 // Methods to ease the validation process. 00358 00359 void throwIfAnyInputsUnconnected (const vector<String> & exceptThese = vector<String> ()) const; 00360 void throwIfAnyInputsUnconnectedExcept (const String & exceptThisOne) const; 00361 void throwIfAnyOutputsUnconnected (const vector<String> & exceptThese = vector<String> ()) const; 00362 void throwIfAnyOutputsUnconnectedExcept (const String & exceptThisOne) const; 00363 void throwIfAnyPortsUnconnected () const; 00364 00365 // Called to allow the node to validate its initial state. An AipsError should be thrown if 00366 // this node decides that it is invalid. 00367 00368 virtual void validateImpl () = 0; 00369 00370 private: 00371 00372 VpPort & getInputRef (const String & name); 00373 VpPort & getOutputRef (const String & name); 00374 void setContainer (const VpContainer *); 00375 00376 // Prevent copying of existing objects 00377 00378 VisibilityProcessor (const VisibilityProcessor & other); // do not define 00379 VisibilityProcessor & operator=(const VisibilityProcessor & other); // do not define 00380 00381 ROVisibilityIterator * getVi (); // returns the VI used for this data set 00382 VpEngine * getVpEngine(); // returns the engine executing this VP 00383 00384 const VpContainer * container_p; // [use] 00385 String name_p; // name of this VP 00386 Int nSubchunks_p; // number of subchunks processed 00387 Int nSubchunksUnique_p; // number of unique subchunks processed 00388 VpEngine * vpEngine_p; // pointer to VpEngine processing this VP (can be null) 00389 VpPorts vpInputs_p; // collection of input ports 00390 VpPorts vpOutputs_p; // collection of output ports 00391 }; 00392 00393 ostream & operator<< (ostream & os, const VisibilityProcessor::ProcessingType & processingType); 00394 String toString (VisibilityProcessor::ProcessingType p); 00395 00396 class VisibilityProcessorStub : public VisibilityProcessor { 00397 00398 // Used to allow definition of a VP variable for use in testing. 00399 // Should never be actually operated on. 00400 00401 public: 00402 00403 VisibilityProcessorStub (const String & name) 00404 : VisibilityProcessor (name, utilj::Strings(), utilj::Strings()) 00405 {} 00406 00407 ProcessingResult doProcessingImpl (ProcessingType /*processingType*/, 00408 VpData & /*inputData*/, 00409 const SubchunkIndex & /*subChunkIndex*/); 00410 00411 void validateImpl (); 00412 00413 00414 }; 00415 00416 //class SimpleVp: public VisibilityProcessor { 00417 // 00418 //public: 00419 // 00420 // SimpleVp (const String & name, const String & input = "In", const String & output = ""); 00421 // virtual ~SimpleVp (); 00422 // 00423 //protected: 00424 // 00425 // class SimpleResult : public boost::tuple<ChunkCode, VisBuffer *> {}; 00426 // 00427 // virtual ProcessingResult doProcessingImpl (ProcessingType processingType, 00428 // VpData & inputData, 00429 // const SubchunkIndex & subChunkIndex); 00430 // virtual void validateImpl (); 00431 // 00432 //private: 00433 // 00434 //}; 00435 00436 class SplitterVp : public VisibilityProcessor { 00437 00438 public: 00439 00440 SplitterVp (const String & name, 00441 const String & inputName, 00442 const vector<String> & outputNames); 00443 00444 ~SplitterVp () {} 00445 00446 protected: 00447 00448 ProcessingResult doProcessingImpl (ProcessingType processingType , 00449 VpData & inputData, 00450 const SubchunkIndex & subChunkIndex); 00451 00452 void validateImpl (); 00453 }; 00454 00455 class WriterVp: public VisibilityProcessor { 00456 00457 public: 00458 00459 // Creates a WriterVp node. If the vi argument is NULL then the 00460 // flow graph's VI is used. The advanceVi argument is used to 00461 // direct the node to advance the VI after each write (i.e., perform 00462 // a vi++ operation); advancing the flow graph's VI will cause a 00463 // run time exception. 00464 00465 WriterVp (const String & name, 00466 VisibilityIterator * vi = NULL, 00467 Bool advanceVi = False, 00468 const String & input = "In", 00469 const String & output = "Out"); 00470 00471 // This paradoxical method allows the user to create a single data flow graph 00472 // and then programmatically decide at run time whether data should be actually 00473 // output on this particular run. 00474 00475 Bool setDisableOutput (Bool disableIt); 00476 00477 protected: 00478 00479 ProcessingResult doProcessingImpl (ProcessingType processingType, 00480 VpData & inputData, 00481 const SubchunkIndex & subChunkIndex); 00482 00483 void validateImpl (); 00484 00485 private: 00486 00487 Bool advanceVi_p; // true is VI is to be advanced after each write. 00488 // N.B., advancing the flow graphs VI is prohibited 00489 Bool disableOutput_p; // true if output is disabled. 00490 VisibilityIterator * vi_p; // VI to use for output. 00491 }; 00492 00493 class VpContainer : public VisibilityProcessor { 00494 00495 friend class VisibilityProcessing; 00496 00497 public: 00498 00499 // Creates a VpContainer object providing the specified inputs and outputs. 00500 // These inputs and outputs will potentially be connected to the inputs and 00501 // outputs of the VPs that are contained in the container. 00502 00503 VpContainer (const String & name, 00504 const vector<String> & inputs = vector<String> (1, "In"), 00505 const vector<String> & outputs = vector<String> ()); 00506 00507 virtual ~VpContainer () {} 00508 00509 // Adds a VP to the container. Exception if VP is already in the container. 00510 00511 virtual void add (VisibilityProcessor * processor); 00512 00513 // Connects the specified output to the specified input. The VP pointer may be 00514 // omitted if the port belongs to the container. 00515 00516 virtual void connect (VisibilityProcessor * sourceVp, const String & sourcePortName, 00517 VisibilityProcessor * sinkVp, const String & sinkPortName); 00518 virtual void connect (const String & sourcePortName, 00519 VisibilityProcessor * sinkVp, const String & sinkPortName); 00520 virtual void connect (VisibilityProcessor * sourceVp, const String & sourcePortName, 00521 const String & sinkPortName); 00522 00523 virtual void chunkStart (const SubchunkIndex & sci); 00524 00525 // Fills the container with the specified set of VPs. The container must be 00526 // empty prior to this call. 00527 00528 virtual void fillWithSequence (VisibilityProcessor * first, ...); // Last one NULL 00529 00530 // Returns the columns that are required to be prefetched if async I/O is used. 00531 00532 virtual casa::asyncio::PrefetchColumns getPrefetchColumns () const; 00533 00534 protected: 00535 00536 typedef vector<VisibilityProcessor *> VPs; // VPs are used (not owned) 00537 typedef boost::indirect_iterator <VPs::const_iterator> const_iterator; 00538 typedef boost::indirect_iterator <VPs::iterator> iterator; 00539 00540 iterator begin(); 00541 const_iterator begin() const; 00542 00543 Bool contains (const VisibilityProcessor *) const; 00544 virtual ProcessingResult doProcessingImpl (ProcessingType processingType, 00545 VpData & inputData, 00546 const SubchunkIndex & subChunkIndex); 00547 Bool empty () const; 00548 iterator end(); 00549 const_iterator end() const; 00550 virtual void processingStartImpl (); 00551 size_t size() const; 00552 virtual void validateImpl (); 00553 00554 private: 00555 00556 typedef std::map<VpPort, VpPort> Network; 00557 typedef std::set<VpPort> NetworkReverse; 00558 typedef boost::tuple<VisibilityProcessor *, VpData> ReadyVpAndData; 00559 00560 class VpSet : public std::set<VisibilityProcessor *> { 00561 public: 00562 00563 template <typename In> 00564 VpSet (In begin, In end) : std::set<VisibilityProcessor *> (begin, end) {} 00565 String getNames () const; 00566 }; 00567 00568 Network network_p; // connections between the ports of the connected nodes 00569 NetworkReverse networkReverse_p; // connections of contets except indexed in 00570 // backwards order. 00571 VPs vps_p; // the VPs contained by this container. 00572 00573 ReadyVpAndData findReadyVp (VpSet & vpsWaiting, VpData & inputs, bool flushing) const; 00574 ReadyVpAndData findReadyVpFlushing (VpSet & vpsWaiting, VpData & inputs) const; 00575 ReadyVpAndData findReadyVpNormal (VpSet & vpsWaiting, VpData & inputs) const; 00576 bool follows (const VisibilityProcessor * a, const VisibilityProcessor * b) const; 00577 bool followsSet (const VisibilityProcessor * a, const VpSet & vpSet) const; 00578 void orderContents (); 00579 void remapPorts (VpData & data, const VisibilityProcessor *); 00580 pair<VpPort,VpPort> validateConnectionPorts (VisibilityProcessor * sourceVp, 00581 const String & sourcePortName, 00582 VisibilityProcessor * sinkVp, 00583 const String & sinkPortName); 00584 }; 00585 00586 class VpEngine { 00587 00588 friend class VisibilityProcessor; 00589 00590 public: 00591 00592 VpEngine () : vi_p (NULL) {} 00593 00594 // Process the data set swept by the VisibilityIterator using the 00595 // VisibilityProcessor provided with the optionally specified port 00596 // as the input. 00597 00598 void process (VisibilityProcessor & processor, 00599 ROVisibilityIterator & vi, 00600 const String & inputPortName); 00601 00602 void process (VisibilityProcessor & processor, 00603 ROVisibilityIterator & vi, 00604 const VpPort & inputPort = VpPort ()); 00605 00606 static Int getLogLevel (); 00607 static void log (const String & format, ...); 00608 static String getAipsRcBase (); 00609 00610 private: 00611 00612 ROVisibilityIterator * vi_p; // [use] 00613 00614 static Int logLevel_p; 00615 static LogIO * logIo_p; 00616 static Bool loggingInitialized_p; 00617 static LogSink * logSink_p; 00618 00619 static Bool initializeLogging (); 00620 00621 ROVisibilityIterator * getVi (); 00622 00623 }; 00624 00625 } // end namespace vpu 00626 00627 } // end namespace casa 00628 00629 00630 /* 00631 00632 VisibilityProcessor vp1; 00633 VisibilityProcessor vp2; 00634 VpuContainer vpc1; 00635 00636 vpc1.add (vp1); 00637 vpc1.add (vp2); 00638 00639 vpc1.connect (vp1.getOutput (Out), vp2.getInput (In)); 00640 vpc1.connect (vpc1.getInput (In), vp1.getInput (In)); 00641 vpc1.connect (vp2.getOutput (Out), vpc1.getOutput (Out)); 00642 00643 VpuContainer vpc2; 00644 VpuContainer vpc0; 00645 00646 vpc0.add (vpc1, vpc2); 00647 vpc0.connect (vpc1.getOutput (Out), vpc2.getOutput (In)); 00648 vpc0.connect (vpc0.getOutput (In), vpc1.getInput (In)); 00649 vpc0.connect (vpc1.getOutput (Out), vpc0.getOutput (Out)); 00650 00651 vpc0.validate (); 00652 00653 */ 00654 00655 00656 00657 #endif /* VISIBILITYPROCESSING_H_ */