casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines
VisibilityProcessing.h
Go to the documentation of this file.
00001 /*
00002  * VisibilityProcessing.h
00003  *
00004  *  Created on: Feb 8, 2011
00005  *      Author: jjacobs
00006  */
00007 
00008 #ifndef VISIBILITYPROCESSING_H_
00009 #define VISIBILITYPROCESSING_H_
00010 
00011 #include <casa/aips.h>
00012 #include <casa/BasicSL/String.h>
00013 #include <casa/Exceptions/Error.h>
00014 #include "VisBuffer.h"
00015 #include "VisibilityIterator.h"
00016 #include "UtilJ.h"
00017 
00018 #include <boost/iterator/indirect_iterator.hpp>
00019 #include <boost/shared_ptr.hpp>
00020 #include <boost/tuple/tuple.hpp>
00021 #include <boost/utility.hpp>
00022 #include <map>
00023 #include <set>
00024 #include <vector>
00025 
00026 
00027 /*
00028 
00029 Visibility Processing Framework Class Summary
00030 =============================================
00031 
00032 SubchunkIndex - Index of a subchunk.  Consists of the chunk number,
00033                 the subchunk number and the iteration number.  All three
00034                 are zero-based.  The interation number is nonzero if a chunk
00035                 is reprocessed. A subchunk is used to identify a VisBuffer
00036                 relative to the VisibilityIterator managed by the VpEngine.
00037 VbPtr - Smart pointer of a VisBuffer.
00038 VisibilityProcessor - A visibility processing node in data flow graph
00039     VisibilityProcessorStub - A do-nothing node used for unit testing
00040     VpContainer - A VP which contains a graph of VPs.  It handles moving
00041                   data between its input and output ports to the appropriate
00042                   input and output ports of the data flow graph it contains.
00043     SplitterVp - Has a single input port and outputs identical copies of it
00044                  through its output ports.
00045     WriterVp - Takes an input and writes it out to the VisibilityIterator provided
00046                when it was constructed.  Optionally passes the input data to its
00047                output port.
00048 VpEngine - Object that executes a data flow graph of VisibilityProcessors on data
00049            accessed via a VisibilityIterator.
00050 
00051 VpPort - A data port into or out of (or both) a VisibiltyProcessor
00052 VpPorts - A collection of VpPort objects
00053 VpData - A collection of visibility data; it works like an associative array
00054          pairing a VpPort with a VisBuffer.
00055 
00056 */
00057 
00058 namespace casa {
00059 
00060 namespace asyncio {
00061     class PrefetchColumns;
00062 };
00063 
00064 namespace vpf {
00065 
00066 class VisibilityProcessor;
00067 class VpContainer;
00068 class VpEngine;
00069 
00070 class SubchunkIndex {
00071 
00072     friend class SubchunkIndex_Test;
00073 
00074 public:
00075 
00076     enum {Invalid = -1};
00077 
00078     SubchunkIndex (Int chunkNumber = Invalid, Int subChunkNumber = Invalid, Int iteration = Invalid);
00079 
00080     // Comparison Operators
00081     //
00082     // Comparison is in lexicographic order by chunk, subchunk and iteration.
00083 
00084     Bool operator< (const SubchunkIndex & other) const;
00085     Bool operator== (const SubchunkIndex & other) const { return ! (* this < other || other < * this);}
00086     Bool operator!= (const SubchunkIndex & other) const { return ! (* this == other);}
00087 
00088     Int getChunkNumber () const;
00089     Int getIteration () const;
00090     Int getSubchunkNumber () const;
00091 
00092     String toString () const;
00093 
00094 private:
00095 
00096     Int chunkNumber_p;        // -1 for invalid
00097     Int iteration_p;          // -1 for invalid
00098     Int subChunkNumber_p;
00099 };
00100 
00101 class VbPtr : public boost::shared_ptr<casa::VisBuffer> {
00102 
00103 public:
00104 
00105     VbPtr () : boost::shared_ptr<casa::VisBuffer> () {}
00106     explicit VbPtr (casa::VisBuffer * vb) : boost::shared_ptr<casa::VisBuffer> (vb) {}
00107 
00108     // Assignment operator setting VbPtr to a normal pointer.  Ownership is passed to the
00109     // VbPtr so caller must ensure that delete is not called on the VisBuffer.
00110 
00111     VbPtr & operator= (casa::VisBuffer * vb)
00112     {
00113         boost::shared_ptr<casa::VisBuffer>::operator= (VbPtr (vb));
00114         return * this;
00115     }
00116 };
00117 
00118 class VpPort {
00119 
00120     friend class VpContainer;
00121     friend class VpPort_Test;
00122 
00123 public:
00124 
00125     // Normally ports are either input or output ports.  However, the ports
00126     // of a VpContainer do double duty serving as an input to the container and
00127     // an outputted to the input of a contained VP, or vice versa.
00128 
00129     typedef enum {Unknown, Input = 1, Output = 2, InOut = Input | Output} Type;
00130 
00131     VpPort ();
00132     VpPort (VisibilityProcessor * vp, const String & name, Type type);
00133     ~VpPort () {}
00134 
00135     Bool operator< (const VpPort & other) const;
00136     Bool operator== (const VpPort & other) const;
00137 
00138     Bool empty () const;
00139     String getFullName () const; // returns Vp0.Vp1...VpN.portName
00140     String getName () const; // returns portName
00141     Type getType () const; // Returns the port's type as something from the Type enum
00142     Bool isConnectedInput () const; // True if port has been connected up as an input
00143     Bool isConnectedOutput () const; // True if port has been connected up as an output
00144 
00145     // Used to check the type of the port as defined in the Type enum.  InOut ports
00146     // return true for both Input and Output types.
00147 
00148     bool isType (Type t) const;
00149 
00150     //String toString() const;
00151 
00152 protected:
00153 
00154     const VisibilityProcessor * getVp () const;
00155     VisibilityProcessor * getVp ();
00156     void setConnectedInput ();
00157     void setConnectedOutput ();
00158 
00159 private:
00160 
00161     Bool connectedInput_p;
00162     Bool connectedOutput_p;
00163     String name_p;
00164     VisibilityProcessor * visibilityProcessor_p; // [use]
00165     Type type_p;
00166 
00167 };
00168 
00169 class VpPorts : public std::vector<VpPort> {
00170 
00171     friend class VisibilityProcessor;
00172     friend class VpPorts_Test;
00173 
00174 public:
00175 
00176     Bool contains (const String & name) const;
00177     Bool contains (const VpPort & port) const;
00178     VpPort get (const String & name) const;
00179     String toString () const;
00180 
00181 protected:
00182 
00183     VpPort & getRef (const String & name);
00184 
00185     template <typename Itr>
00186     static
00187     Itr
00188     find(const String & name, Itr begin, Itr end)
00189     {
00190         Itr i;
00191 
00192         for (i = begin; i != end; i++){
00193             if (i->getName() == name){
00194                 break;
00195             }
00196         }
00197 
00198         return i;
00199     }
00200 
00201 };
00202 
00203 namespace asyncio {
00204     class PrefetchColumns;
00205 }
00206 
00207 
00208 class VpData: public std::map<VpPort, VbPtr> {
00209 
00210     friend class VpData_Test;
00211 
00212 public:
00213 
00214     VpData ();
00215     VpData (const VpPort & port, VbPtr);
00216 
00217     void add (const VpPort & port, VbPtr); // Adds a (port,VbPtr) to the collection
00218 
00219     // Returns the (port,VbPtr) pairs for the requested set of ports.  An execption
00220     // is thrown if a requested port is not present unless missingIsOk is set to True.
00221 
00222     VpData getSelection (const VpPorts &, bool missingIsOk = False) const;
00223     String getNames () const; // Returns a comma-separated list of the port names.
00224 };
00225 
00226 
00227 class VisibilityProcessor : boost::noncopyable {
00228 
00229     friend class VpContainer;
00230     friend class WriterVp;
00231 
00232 public:
00233 
00234     typedef enum {
00235         Normal,
00236         RepeatChunk
00237     } ChunkCode;
00238 
00239     typedef enum {
00240         Subchunk,    // Normal processing of a subchunk
00241         EndOfChunk,  // Called after all subchunks of a chunk have been processed
00242         EndOfData    // Called after all chunks have been processed
00243     } ProcessingType;
00244 
00245     typedef boost::tuple <ChunkCode, VpData> ProcessingResult;
00246 
00247     VisibilityProcessor ();
00248     VisibilityProcessor (const String & name,
00249                          const vector<String> & inputNames,
00250                          const vector<String> & outputNames = vector<String>(),
00251                          Bool makeIoPorts = False);
00252     virtual ~VisibilityProcessor () {}
00253 
00254     // chunkStart is called to inform the VP that a new chunk is starting.
00255 
00256     void chunkStart (const SubchunkIndex &);
00257 
00258     // Called to cause the VP to process the provided inputs.  It will be called
00259     // in three different contexts as indicated by the ProcessingType.
00260 
00261     ProcessingResult doProcessing (ProcessingType processingType,
00262                                    VpData & inputData,
00263                                    VpEngine * vpEngine,
00264                                    const SubchunkIndex & subChunkIndex);
00265 
00266     // Returns a pointer to the containing VP or NULL if this VP is top-level.
00267 
00268     const VpContainer * getContainer () const { return NULL;}
00269 
00270     // The full name of a VP is a dotted list of the names of all the containing
00271     // VPs ending with the name of this VP (e.g., vp0.vp1...vpN.thisVp).
00272 
00273     String getFullName () const;
00274 
00275     // Returns the input port having the specified name.  Exception if port is undefined.
00276 
00277     VpPort getInput (const String & name) const;
00278 
00279     // Returns a collection of the input ports for this VP; optionally only the
00280     // connected ports are returned.
00281 
00282     VpPorts getInputs (Bool connectedOnly = False) const;
00283 
00284     // Returns the name of this VP
00285 
00286     String getName () const;
00287 
00288     // Returns the number of Subchunks processed (mainly for testing)
00289 
00290     Int getNSubchunksProcessed () const;
00291 
00292     // Returns the number of unique Subchunks (i.e., iteration ignored) processed.
00293     // (mainly for testing)
00294 
00295     Int getNSubchunksUniqueProcessed () const;
00296 
00297     // Returns the output port having the specified name.  Exception if port is undefined.
00298 
00299     VpPort getOutput (const String & name) const;
00300 
00301     // Returns a collection of the output ports for this VP; optionally only the
00302     // connected ports are returned.
00303 
00304     VpPorts getOutputs (Bool connectedOnly = False) const;
00305 
00306     // Returns the collection of columns that need to be prefetched if this node
00307     // is used with async I/O.
00308 
00309     virtual casa::asyncio::PrefetchColumns getPrefetchColumns () const;
00310 
00311     // Called by the framework when the processing is about to begin (i.e., prior
00312     // to the first VisBuffer being fed into the graph.
00313 
00314     void processingStart ();
00315 
00316     // Called to ask the VP to check its validity (i.e., are all needed inputs connected,
00317     // etc.).
00318 
00319     void validate ();
00320 
00321 protected:
00322 
00323     // The public API contains many methods that are not virtual.  However, where subclass-specific
00324     // behavior is potentially useful, a corresponding xxxImpl method is provided.  This allows the
00325     // framework to perform certain required housekeeping options while allowing the subclass
00326     // to perform custom operations.
00327 
00328     // Called on the object when a new chunk is about to be started.
00329 
00330     virtual void chunkStartImpl (const SubchunkIndex &) {}
00331 
00332 
00333     // Defines the set of possible input ports for this VP
00334 
00335     VpPorts definePorts (const vector<String> & portNames, VpPort::Type type, const String & typeName);
00336 
00337     // Requests processing of the provided (possibly empty) input data.  This is called on each
00338     // subchunk (then inputData will be nonempty) and at the end of a chunk and the end of the
00339     // entire data set.  These last two call types allow the VP to output any data that it might have
00340     // been accumulating across multiple subchunks, etc.
00341 
00342     virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
00343                                                VpData & inputData,
00344                                                const SubchunkIndex & subChunkIndex) = 0;
00345 
00346     // Returns a collection of the ports that are not connected using the provided connection
00347     // method; some ports may also be excluded from this list by name.
00348 
00349     VpPorts portsUnconnected (const VpPorts & ports, Bool (VpPort::* isConnected) () const,
00350                               const vector<String> & except = vector<String> ()) const;
00351 
00352     // Called when data processing is about to beging; this allows the VP to perform any
00353     // initialization that it desires now that it is completely connected into the graph.
00354 
00355     virtual void processingStartImpl () {}
00356 
00357     // Methods to ease the validation process.
00358 
00359     void throwIfAnyInputsUnconnected (const vector<String> & exceptThese = vector<String> ()) const;
00360     void throwIfAnyInputsUnconnectedExcept (const String & exceptThisOne) const;
00361     void throwIfAnyOutputsUnconnected (const vector<String> & exceptThese = vector<String> ()) const;
00362     void throwIfAnyOutputsUnconnectedExcept (const String & exceptThisOne) const;
00363     void throwIfAnyPortsUnconnected () const;
00364 
00365     // Called to allow the node to validate its initial state.  An AipsError should be thrown if
00366     // this node decides that it is invalid.
00367 
00368     virtual void validateImpl () = 0;
00369 
00370 private:
00371 
00372     VpPort & getInputRef (const String & name);
00373     VpPort & getOutputRef (const String & name);
00374     void setContainer (const VpContainer *);
00375 
00376     // Prevent copying of existing objects
00377 
00378     VisibilityProcessor (const VisibilityProcessor & other); // do not define
00379     VisibilityProcessor & operator=(const VisibilityProcessor & other); // do not define
00380 
00381     ROVisibilityIterator * getVi (); // returns the VI used for this data set
00382     VpEngine * getVpEngine(); // returns the engine executing this VP
00383 
00384     const VpContainer * container_p; // [use]
00385     String name_p; // name of this VP
00386     Int nSubchunks_p; // number of subchunks processed
00387     Int nSubchunksUnique_p; // number of unique subchunks processed
00388     VpEngine * vpEngine_p; // pointer to VpEngine processing this VP (can be null)
00389     VpPorts vpInputs_p; // collection of input ports
00390     VpPorts vpOutputs_p; // collection of output ports
00391 };
00392 
00393 ostream & operator<< (ostream & os, const VisibilityProcessor::ProcessingType & processingType);
00394 String toString (VisibilityProcessor::ProcessingType p);
00395 
00396 class VisibilityProcessorStub : public VisibilityProcessor {
00397 
00398     // Used to allow definition of a VP variable for use in testing.
00399     // Should never be actually operated on.
00400 
00401 public:
00402 
00403     VisibilityProcessorStub (const String & name)
00404     : VisibilityProcessor (name, utilj::Strings(), utilj::Strings())
00405     {}
00406 
00407     ProcessingResult doProcessingImpl (ProcessingType /*processingType*/,
00408                                        VpData & /*inputData*/,
00409                                        const SubchunkIndex & /*subChunkIndex*/);
00410 
00411     void validateImpl ();
00412 
00413 
00414 };
00415 
00416 //class SimpleVp: public VisibilityProcessor {
00417 //
00418 //public:
00419 //
00420 //    SimpleVp (const String & name, const String & input = "In", const String & output = "");
00421 //    virtual ~SimpleVp ();
00422 //
00423 //protected:
00424 //
00425 //    class SimpleResult : public boost::tuple<ChunkCode, VisBuffer *> {};
00426 //
00427 //    virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
00428 //                                               VpData & inputData,
00429 //                                               const SubchunkIndex & subChunkIndex);
00430 //    virtual void validateImpl ();
00431 //
00432 //private:
00433 //
00434 //};
00435 
00436 class SplitterVp : public VisibilityProcessor {
00437 
00438 public:
00439 
00440     SplitterVp (const String & name,
00441                 const String & inputName,
00442                 const vector<String> & outputNames);
00443 
00444     ~SplitterVp () {}
00445 
00446 protected:
00447 
00448     ProcessingResult doProcessingImpl (ProcessingType processingType ,
00449                                        VpData & inputData,
00450                                        const SubchunkIndex & subChunkIndex);
00451 
00452     void validateImpl ();
00453 };
00454 
00455 class WriterVp: public VisibilityProcessor {
00456 
00457 public:
00458 
00459     // Creates a WriterVp node.  If the vi argument is NULL then the
00460     // flow graph's VI is used.  The advanceVi argument is used to
00461     // direct the node to advance the VI after each write (i.e., perform
00462     // a vi++ operation); advancing the flow graph's VI will cause a
00463     // run time exception.
00464 
00465     WriterVp (const String & name,
00466               VisibilityIterator * vi = NULL,
00467               Bool advanceVi = False,
00468               const String & input = "In",
00469               const String & output = "Out");
00470 
00471     // This paradoxical method allows the user to create a single data flow graph
00472     // and then programmatically decide at run time whether data should be actually
00473     // output on this particular run.
00474 
00475     Bool setDisableOutput (Bool disableIt);
00476 
00477 protected:
00478 
00479     ProcessingResult doProcessingImpl (ProcessingType processingType,
00480                                        VpData & inputData,
00481                                        const SubchunkIndex & subChunkIndex);
00482 
00483     void validateImpl ();
00484 
00485 private:
00486 
00487     Bool advanceVi_p; // true is VI is to be advanced after each write.
00488                       // N.B., advancing the flow graphs VI is prohibited
00489     Bool disableOutput_p; // true if output is disabled.
00490     VisibilityIterator * vi_p; // VI to use for output.
00491 };
00492 
00493 class VpContainer : public VisibilityProcessor {
00494 
00495     friend class VisibilityProcessing;
00496 
00497 public:
00498 
00499     // Creates a VpContainer object providing the specified inputs and outputs.
00500     // These inputs and outputs will potentially be connected to the inputs and
00501     // outputs of the VPs that are contained in the container.
00502 
00503     VpContainer (const String & name,
00504                  const vector<String> & inputs = vector<String> (1, "In"),
00505                  const vector<String> & outputs = vector<String> ());
00506 
00507     virtual ~VpContainer () {}
00508 
00509     // Adds a VP to the container.  Exception if VP is already in the container.
00510 
00511     virtual void add (VisibilityProcessor * processor);
00512 
00513     // Connects the specified output to the specified input.  The VP pointer may be
00514     // omitted if the port belongs to the container.
00515 
00516     virtual void connect (VisibilityProcessor * sourceVp, const String &  sourcePortName,
00517                           VisibilityProcessor * sinkVp, const String &  sinkPortName);
00518     virtual void connect (const String &  sourcePortName,
00519                           VisibilityProcessor * sinkVp, const String &  sinkPortName);
00520     virtual void connect (VisibilityProcessor * sourceVp, const String &  sourcePortName,
00521                           const String &  sinkPortName);
00522 
00523     virtual void chunkStart (const SubchunkIndex & sci);
00524 
00525     // Fills the container with the specified set of VPs.  The container must be
00526     // empty prior to this call.
00527 
00528     virtual void fillWithSequence (VisibilityProcessor * first, ...); // Last one NULL
00529 
00530     // Returns the columns that are required to be prefetched if async I/O is used.
00531 
00532     virtual casa::asyncio::PrefetchColumns getPrefetchColumns () const;
00533 
00534 protected:
00535 
00536     typedef vector<VisibilityProcessor *> VPs; // VPs are used (not owned)
00537     typedef boost::indirect_iterator <VPs::const_iterator> const_iterator;
00538     typedef boost::indirect_iterator <VPs::iterator> iterator;
00539 
00540     iterator begin();
00541     const_iterator begin() const;
00542 
00543     Bool contains (const VisibilityProcessor *) const;
00544     virtual ProcessingResult doProcessingImpl (ProcessingType processingType,
00545                                                VpData & inputData,
00546                                                const SubchunkIndex & subChunkIndex);
00547     Bool empty () const;
00548     iterator end();
00549     const_iterator end() const;
00550     virtual void processingStartImpl ();
00551     size_t size() const;
00552     virtual void validateImpl ();
00553 
00554 private:
00555 
00556     typedef std::map<VpPort, VpPort> Network;
00557     typedef std::set<VpPort> NetworkReverse;
00558     typedef boost::tuple<VisibilityProcessor *, VpData> ReadyVpAndData;
00559 
00560     class VpSet : public std::set<VisibilityProcessor *> {
00561     public:
00562 
00563         template <typename In>
00564         VpSet (In begin, In end) : std::set<VisibilityProcessor *> (begin, end) {}
00565         String getNames () const;
00566     };
00567 
00568     Network network_p; // connections between the ports of the connected nodes
00569     NetworkReverse networkReverse_p; // connections of contets except indexed in
00570                                      // backwards order.
00571     VPs vps_p; // the VPs contained by this container.
00572 
00573     ReadyVpAndData findReadyVp (VpSet & vpsWaiting, VpData & inputs, bool flushing) const;
00574     ReadyVpAndData findReadyVpFlushing (VpSet & vpsWaiting, VpData & inputs) const;
00575     ReadyVpAndData findReadyVpNormal (VpSet & vpsWaiting, VpData & inputs) const;
00576     bool follows (const VisibilityProcessor * a, const VisibilityProcessor * b) const;
00577     bool followsSet (const VisibilityProcessor * a, const VpSet & vpSet) const;
00578     void orderContents ();
00579     void remapPorts (VpData & data, const VisibilityProcessor *);
00580     pair<VpPort,VpPort> validateConnectionPorts (VisibilityProcessor * sourceVp,
00581                                                  const String &  sourcePortName,
00582                                                  VisibilityProcessor * sinkVp,
00583                                                  const String &  sinkPortName);
00584 };
00585 
00586 class VpEngine {
00587 
00588     friend class VisibilityProcessor;
00589 
00590 public:
00591 
00592     VpEngine () : vi_p (NULL) {}
00593 
00594     // Process the data set swept by the VisibilityIterator using the
00595     // VisibilityProcessor provided with the optionally specified port
00596     // as the input.
00597 
00598     void process (VisibilityProcessor & processor,
00599                   ROVisibilityIterator & vi,
00600                   const String & inputPortName);
00601 
00602     void process (VisibilityProcessor & processor,
00603                   ROVisibilityIterator & vi,
00604                   const VpPort & inputPort = VpPort ());
00605 
00606     static Int getLogLevel ();
00607     static void log (const String & format, ...);
00608     static String getAipsRcBase ();
00609 
00610 private:
00611 
00612     ROVisibilityIterator * vi_p; // [use]
00613 
00614     static Int logLevel_p;
00615     static LogIO * logIo_p;
00616     static Bool loggingInitialized_p;
00617     static LogSink * logSink_p;
00618 
00619     static Bool initializeLogging ();
00620 
00621     ROVisibilityIterator * getVi ();
00622 
00623 };
00624 
00625 } // end namespace vpu
00626 
00627 } // end namespace casa
00628 
00629 
00630 /*
00631 
00632     VisibilityProcessor vp1;
00633     VisibilityProcessor vp2;
00634     VpuContainer vpc1;
00635 
00636     vpc1.add (vp1);
00637     vpc1.add (vp2);
00638 
00639     vpc1.connect (vp1.getOutput (Out), vp2.getInput (In));
00640     vpc1.connect (vpc1.getInput (In), vp1.getInput (In));
00641     vpc1.connect (vp2.getOutput (Out), vpc1.getOutput (Out));
00642 
00643     VpuContainer vpc2;
00644     VpuContainer vpc0;
00645 
00646     vpc0.add (vpc1, vpc2);
00647     vpc0.connect (vpc1.getOutput (Out), vpc2.getOutput (In));
00648     vpc0.connect (vpc0.getOutput (In), vpc1.getInput (In));
00649     vpc0.connect (vpc1.getOutput (Out), vpc0.getOutput (Out));
00650 
00651     vpc0.validate ();
00652 
00653  */
00654 
00655 
00656 
00657 #endif /* VISIBILITYPROCESSING_H_ */