casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines
AsynchronousInterface.h
Go to the documentation of this file.
00001 /*
00002  * AsynchronousInterface.h
00003  *
00004  *  Created on: Sep 21, 2011
00005  *      Author: jjacobs
00006  */
00007 
00008 #ifndef ASYNCHRONOUS_INTERFACE_H_
00009 #define ASYNCHRONOUS_INTERFACE_H_
00010 
00011 #include "AsynchronousTools.h"
00012 #include "UtilJ.h"
00013 
00014 using casa::utilj::ThreadTimes;
00015 using casa::utilj::DeltaThreadTimes;
00016 
00017 #include <casa/Arrays/Cube.h>
00018 #include <casa/Arrays/Matrix.h>
00019 #include <casa/Arrays/Vector.h>
00020 #include <casa/Containers/Block.h>
00021 #include <casa/Quanta/MVRadialVelocity.h>
00022 #include <measures/Measures/MRadialVelocity.h>
00023 #include <measures/Measures/MDoppler.h>
00024 #include <synthesis/MSVis/VisBufferAsync.h>
00025 #include <synthesis/MSVis/VisibilityIterator.h>
00026 #include <synthesis/MSVis/VisibilityIteratorImpl.h>
00027 
00028 #include <boost/noncopyable.hpp>
00031 #include <boost/thread/condition_variable.hpp>
00032 #include <boost/thread/locks.hpp>
00033 #include <boost/thread/recursive_mutex.hpp>
00035 #include <memory>
00036 #include <queue>
00037 #include <vector>
00038 
00039 namespace casa {
00040 
00041 class ROVisibilityIterator;
00042 
00043 namespace asyncio {
00044 
00045 class RoviaModifier {
00046 public:
00047 
00048     friend std::ostream & operator<< (std::ostream & o, const RoviaModifier & m);
00049 
00050     virtual ~RoviaModifier () {}
00051     virtual void apply (ROVisibilityIterator *) const = 0;
00052 
00053 private:
00054 
00055     virtual void print (std::ostream & o) const = 0;
00056 
00057 };
00058 
00059 class ChannelSelection {
00060 
00061 public:
00062 
00063     ChannelSelection () {}
00064 
00065     ChannelSelection (const Block< Vector<Int> > & blockNGroup,
00066                       const Block< Vector<Int> > & blockStart,
00067                       const Block< Vector<Int> > & blockWidth,
00068                       const Block< Vector<Int> > & blockIncr,
00069                       const Block< Vector<Int> > & blockSpw);
00070 
00071     ChannelSelection (const ChannelSelection & other);
00072     ChannelSelection & operator= (const ChannelSelection & other);
00073 
00074 
00075     void
00076     get (Block< Vector<Int> > & blockNGroup,
00077          Block< Vector<Int> > & blockStart,
00078          Block< Vector<Int> > & blockWidth,
00079          Block< Vector<Int> > & blockIncr,
00080          Block< Vector<Int> > & blockSpw) const;
00081 
00082 protected:
00083 
00084     void copyBlock (const Block <Vector<Int> > & src,
00085                     Block <Vector<Int> > & to) const;
00086 
00087 private:
00088 
00089     Block< Vector<Int> > blockNGroup_p;
00090     Block< Vector<Int> > blockStart_p;
00091     Block< Vector<Int> > blockWidth_p;
00092     Block< Vector<Int> > blockIncr_p;
00093     Block< Vector<Int> > blockSpw_p;
00094 };
00095 
00096 
00097 class SelectChannelModifier : public RoviaModifier {
00098 
00099 public:
00100 
00101     SelectChannelModifier (Int nGroup, Int start, Int width, Int increment, Int spectralWindow);
00102     SelectChannelModifier (const Block< Vector<Int> > & blockNGroup,
00103                            const Block< Vector<Int> > & blockStart,
00104                            const Block< Vector<Int> > & blockWidth,
00105                            const Block< Vector<Int> > & blockIncr,
00106                            const Block< Vector<Int> > & blockSpw);
00107 
00108     void apply (ROVisibilityIterator *) const;
00109 
00110 private:
00111 
00112     Bool channelBlocks_p;
00113     ChannelSelection channelSelection_p;
00114     Int increment_p;
00115     Int nGroup_p;
00116     Int spectralWindow_p;
00117     Int start_p;
00118     Int width_p;
00119 
00120     void print (std::ostream & o) const;
00121     String toCsv (const Block< Vector<Int> > & bv) const;
00122     String toCsv (const Vector<Int> & v) const;
00123 
00124 };
00125 
00126 class SetIntervalModifier : public RoviaModifier {
00127 
00128 public:
00129 
00130     SetIntervalModifier  (Double timeInterval);
00131     void apply (ROVisibilityIterator *) const;
00132 
00133 private:
00134 
00135     Double timeInterval_p;
00136 
00137     void print (std::ostream & o) const;
00138 };
00139 
00140 
00141 class SetRowBlockingModifier : public RoviaModifier {
00142 
00143 public:
00144 
00145     SetRowBlockingModifier (Int nRows);
00146     void apply (ROVisibilityIterator *) const;
00147 
00148 private:
00149 
00150     Int nRows_p;
00151     Int nGroup_p;
00152     Int spectralWindow_p;
00153     Int start_p;
00154     Int width_p;
00155 
00156     void print (std::ostream & o) const;
00157 };
00158 
00159 class RoviaModifiers {
00160 
00161 public:
00162 
00163     ~RoviaModifiers ();
00164 
00165     void add (RoviaModifier *);
00166     void apply (ROVisibilityIterator *);
00167     void clearAndFree ();
00168     RoviaModifiers transferModifiers ();
00169 
00170 private:
00171 
00172     typedef std::vector<RoviaModifier *> Data;
00173     Data data_p;
00174 
00175 };
00176 
00177 class SelectVelocityModifier : public RoviaModifier {
00178 
00179 public:
00180 
00181     SelectVelocityModifier (Int nChan, const MVRadialVelocity& vStart, const MVRadialVelocity& vInc,
00182                             MRadialVelocity::Types rvType, MDoppler::Types dType, Bool precise);
00183     void apply (ROVisibilityIterator *) const;
00184 
00185 private:
00186 
00187     MDoppler::Types dType_p;
00188     Int nChan_p;
00189     Bool precise_p;
00190     MRadialVelocity::Types rvType_p;
00191     MVRadialVelocity vInc_p;
00192     MVRadialVelocity vStart_p;
00193 
00194     virtual void print (std::ostream & o) const;
00195 
00196 };
00197 
00198 
00199 // <summary>
00200 //    VlaDatum is a single element in the VlaDatum buffer ring used to support the
00201 //    ROVisibilityIteratorAsync.
00202 // </summary>
00203 
00204 // <use visibility=local>
00205 
00206 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
00207 // </reviewed>
00208 
00209 // <prerequisite>
00210 //   <li> VisBuffer
00211 //   <li> VisBufferAsync
00212 //   <li> ROVisibilityIteratorAsync
00213 //   <li> VlaData
00214 //   <li> VLAT
00215 // </prerequisite>
00216 //
00217 // <etymology>
00218 //    VlaDatum is the quantum of data associated with a single position of the visibility
00219 //    look-ahead scheme.
00220 // </etymology>
00221 //
00222 // <synopsis>
00223 //    VlaDatum is a single buffer for data produced by the VLAT thread and consumed by the
00224 //    main thread.  A collection of VlaDatum objects is organized as a buffer ring in a
00225 //    VlaData object.
00226 //
00227 //    A VlaDatum object is responsible for maintaining its state as well as containing the set
00228 //    of data accessed from a single position of a ROVisibilityIterator.  It contains a
00229 //    VisibilityBufferAsync object to hold the data that will be used by the main thread; other
00230 //    data is maintained in member variables.
00231 //
00232 //    VlaDatum has no concurrency mechanisms built in it; that is handled by the VlaData object.
00233 //    It does support a set of states that indicate its current use:
00234 //        Empty -> Filling -> Full -> Reading -> Empty.
00235 //    Changing state is accomplished by the methods fillStart, fillComplete, readStart and readComplete.
00236 // </synopsis>
00237 //
00238 // <example>
00239 // </example>
00240 //
00241 // <motivation>
00242 // </motivation>
00243 //
00244 // <thrown>
00245 //    <li>AipsError for unhandleable errors
00246 // </thrown>
00247 //
00248 // <todo asof="yyyy/mm/dd">
00249 // </todo>
00250 
00251 
00252 class VlaDatum {
00253 
00254 public:
00255 
00256     typedef enum {Empty, Filling, Full, Reading} State;
00257 
00258     VlaDatum (SubChunkPair);
00259     ~VlaDatum ();
00260 
00261     SubChunkPair  getSubChunkPair () const;
00262     VisBufferAsync * getVisBuffer ();
00263     //const VisBufferAsync * getVisBuffer () const;
00264     Bool isSubChunk (SubChunkPair) const;
00265 
00266     VisBufferAsync * releaseVisBufferAsync ();
00267     void reset ();
00268 
00269 protected:
00270 
00271 private:
00272 
00273     SubChunkPair     subchunk_p;
00274     VisBufferAsync * visBuffer_p;
00275 
00276     // Illegal operations
00277 
00278     VlaDatum & operator= (const VlaDatum & other);
00279 
00280 };
00281 
00282 class VLAT;
00283 
00284 // <summary>
00285 //    The VlaData class is a buffer ring used to support the communication between
00286 //    the visibility lookahead thread (VLAT) and the main application thread.  It
00287 //    implements the required concurrency control to support this communication.
00288 // </summary>
00289 
00290 // <use visibility=local>
00291 
00292 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
00293 // </reviewed>
00294 
00295 // <prerequisite>
00296 //   <li> VisBuffer
00297 //   <li> VisBufferAsync
00298 //   <li> ROVisibilityIteratorAsync
00299 //   <li> VlaData
00300 //   <li> VLAT
00301 // </prerequisite>
00302 //
00303 // <etymology>
00304 //    VlaData is the entire collection of visibility look-ahead data currently (or potentially)
00305 //    shared between the lookahead and main threads.
00306 // </etymology>
00307 //
00308 // <synopsis>
00309 //    The VlaData object supports the sharing of information between the VLAT look ahead thread and
00310 //    the main thread.  It contains a buffer ring of VlaDatum objects which each hold all of the
00311 //    data that is normally accessed by a position of a ROVisibilityIterator.  Other data that is shared
00312 //    or communicated between the two threads is also managed by VlaData.
00313 //
00314 //    A single mutex (member variable mutex_p) is used to protect data that is shared between the
00315 //    two threads.  In addition there is a single PThreads condition variable, vlaDataChanged_p used
00316 //    to allow either thread to wait for the other to change the state of VlaData object.
00317 //
00318 //    Buffer ring concurrency has two levels: the mutex protecting VlaData and the state of the
00319 //    VlaDatum object.  Whenever a free buffer (VlaDatum object) is available the VLAT thread will
00320 //    fill it with the data from the next position of the ROVisibilityIterator; a buffer is free for
00321 //    filling when it is in the Empty state.  Before the VLAT fills a buffer it must call fillStart;
00322 //    this method will block the caller until the next buffer in the ring becomes free; as a side effect
00323 //    the buffer's state becomes Filling.  After fillStart is complete, the VLAT owns the buffer.
00324 //    When the VLAT is done with the buffer it calls fillComplete to relinquish the buffer; this causes
00325 //    the buffer state to change from Filling to Full.
00326 //
00327 //    The main thread calls readStart to get the next filled buffer; the main thread is blocked until
00328 //    a filled buffer is available.  When the full buffer is ready its state is changed to Reading
00329 //    and readStart returns.  The VLAT thread will not access the buffer while the main thread is reading.
00330 //    The read operation is terminated by calling readComplete; this changes the buffer state to Empty and
00331 //    does not block the main thread except potentially to acquire the mutex.
00332 //
00333 //    The concurrency scheme is fairly sound except for the possibility of low-level data sharing through
00334 //    CASA data structures.  Some of the CASA containers (e.g., Array<T>) can potentially share storage
00335 //    although it appears that in normal operation they do not.  Some problems have been encountered with
00336 //    objects that share data via reference-counted pointers.  For instance, objects derived from
00337 //    MeasBase<T,U> (e.g., MDirection, MPosition, MEpoch, etc.) share the object that serves as the
00338 //    frame of reference for the measurement; only by converting the object to text and back again can
00339 //    a user easily obtain a copy which does not share values with another.  It is possible that other
00340 //    objects deep many layers down a complex object may still be waiting to trip up VlaData's
00341 //    concurrency scheme.
00342 //
00343 //    One unusual interaction mediated by VlaData occurs when it is necessary to reset the visibility
00344 //    iterator back to the start of a MeasurementSet.  This usually happens either at the start of the MS
00345 //    sweep (e.g., to reset the row blocking factor of the iterator) or at the end (e.g., to make an
00346 //    additional pass through the MS).  The main thread requests a reset of the VI and then is blocked
00347 //    until the VI is reset.  The sweepTerminationRequested_p variable is set to true; when the VLAT
00348 //    discovers that this variable is true it resets the buffer ring, repositions its VI to the start
00349 //    of the MS and then informs the blocked main thread by setting viResetComplete to true and
00350 //    signalling vlaDataChanged_p.
00351 // </synopsis>
00352 //
00353 // <example>
00354 // </example>
00355 //
00356 // <motivation>
00357 // </motivation>
00358 //
00359 // <thrown>
00360 //    <li> AipsError
00361 // </thrown>
00362 //
00363 // <todo asof="yyyy/mm/dd">
00364 // </todo>
00365 
00366 class AsynchronousInterface;
00367 class InterfaceController;
00368 
00369 class VlaData {
00370 
00371 public:
00372 
00373     VlaData (Int maxNBuffers, async::Mutex & mutex);
00374     ~VlaData ();
00375 
00376     Bool fillCanStart () const;
00377     void fillComplete (VlaDatum * datum);
00378     VlaDatum * fillStart (SubChunkPair, const ThreadTimes & fillStartTime);
00379     asyncio::ChannelSelection getChannelSelection () const;
00380     void initialize (const AsynchronousInterface *);
00381     void insertValidChunk (Int chunkNumber);
00382     void insertValidSubChunk (SubChunkPair);
00383     Bool isValidChunk (Int chunkNumber) const;
00384     Bool isValidSubChunk (SubChunkPair) const;
00385     void readComplete (SubChunkPair);
00386     VisBufferAsync * readStart (SubChunkPair);
00387     void resetBufferData ();
00388     void setNoMoreData ();
00389     void storeChannelSelection (const asyncio::ChannelSelection & channelSelection);
00390 
00391     //static void debugBlock ();
00392     //static void debugUnblock ();
00393     //static Bool logThis (Int level);
00394 
00395     //static Bool loggingInitialized_p;
00396     //static Int logLevel_p;
00397 
00398 protected:
00399 
00400 private:
00401 
00402     typedef std::queue<VlaDatum *> Data;
00403     typedef std::queue<Int> ValidChunks;
00404     typedef std::queue<SubChunkPair> ValidSubChunks;
00405 
00406     class Timing {
00407     public:
00408         ThreadTimes      fill1_p;
00409         ThreadTimes      fill2_p;
00410         ThreadTimes      fill3_p;
00411         DeltaThreadTimes fillCycle_p;
00412         DeltaThreadTimes fillOperate_p;
00413         DeltaThreadTimes fillWait_p;
00414         ThreadTimes      read1_p;
00415         ThreadTimes      read2_p;
00416         ThreadTimes      read3_p;
00417         DeltaThreadTimes readCycle_p;
00418         DeltaThreadTimes readOperate_p;
00419         DeltaThreadTimes readWait_p;
00420         ThreadTimes      timeStart_p;
00421         ThreadTimes      timeStop_p;
00422     };
00423 
00424     asyncio::ChannelSelection     channelSelection_p; // last channels selected for the VI in use
00425     Data                          data_p;             // Buffer queue
00426     const AsynchronousInterface * interface_p;
00427     const Int                     MaxNBuffers_p;
00428     async::Mutex &                mutex_p; // provided by Asynchronous interface
00429     Timing                        timing_p;
00430     mutable ValidChunks           validChunks_p;       // Queue of valid chunk numbers
00431     mutable ValidSubChunks        validSubChunks_p; // Queue of valid subchunk pairs
00432 
00433 
00434     Int clock (Int arg, Int base);
00435     String makeReport ();
00436 
00437     Bool statsEnabled () const;
00438     void terminateSweep ();
00439 
00441 
00442     static Bool initializeLogging ();
00443 
00444     // Illegal operations
00445 
00446     VlaData (const VlaData & other);
00447     VlaData & operator= (const VlaData & other);
00448 };
00449 
00450 class WriteData {
00451 
00452 public:
00453 
00454     WriteData (const SubChunkPair & subchunkPair) : subchunkPair_p (subchunkPair) {}
00455 
00456     virtual ~WriteData () {}
00457 
00458     SubChunkPair getSubChunkPair () const { return subchunkPair_p;}
00459     virtual void write (VisibilityIterator * vi) = 0;
00460 
00461 private:
00462 
00463     SubChunkPair subchunkPair_p;
00464 
00465 };
00466 
00467 template <typename Data>
00468 class WriteDataImpl : public WriteData {
00469 public:
00470 
00471     typedef void (VisibilityIterator::* Setter) (const Data &);
00472 
00473     WriteDataImpl (const SubChunkPair & subchunkPair,
00474                    const Data & data,
00475                    Setter setter)
00476     : WriteData (subchunkPair),
00477       data_p (),
00478       setter_p (setter)
00479     {
00480         data_p.assign (data); // Make a pure copy
00481     }
00482 
00483     void
00484     write (VisibilityIterator * vi)
00485     {
00486         (vi ->* setter_p) (data_p);
00487     }
00488 
00489 private:
00490 
00491     Data   data_p;
00492     Setter setter_p;
00493 
00494 };
00495 
00496 template <typename Data>
00497 WriteData *
00498 createWriteData (const SubChunkPair & subchunkPair,
00499                  const Data & data,
00500                  void (VisibilityIterator::* setter) (const Data &))
00501 {
00502     return new WriteDataImpl<Data> (subchunkPair, data, setter);
00503 }
00504 
00505 template <typename Data>
00506 class WriteDataImpl2 : public WriteData {
00507 public:
00508 
00509     typedef ROVisibilityIterator::DataColumn DataColumn;
00510     typedef void (VisibilityIterator::* Setter) (const Data &, DataColumn);
00511 
00512     WriteDataImpl2 (const SubChunkPair & subchunkPair,
00513                     const Data & data,
00514                     DataColumn dataColumn,
00515                     Setter setter)
00516     : WriteData (subchunkPair),
00517       data_p (),
00518       dataColumn_p (dataColumn),
00519       setter_p (setter)
00520     {
00521         data_p.assign (data); // Make a pure copy
00522     }
00523 
00524     void
00525     write (VisibilityIterator * vi)
00526     {
00527         (vi ->* setter_p) (data_p, dataColumn_p);
00528     }
00529 
00530 private:
00531 
00532     Data       data_p;
00533     DataColumn dataColumn_p;
00534     Setter     setter_p;
00535 };
00536 
00537 template <typename Data>
00538 WriteData *
00539 createWriteData (const SubChunkPair & subchunkPair,
00540                  const Data & data,
00541                  ROVisibilityIterator::DataColumn dataColumn,
00542                  void (VisibilityIterator::* setter) (const Data &, ROVisibilityIterator::DataColumn))
00543 {
00544     return new WriteDataImpl2 <Data> (subchunkPair, data, dataColumn, setter);
00545 }
00546 
00547 class AsynchronousInterface;
00548 
00549 class WriteQueue {
00550 
00551 public:
00552 
00553     WriteQueue ();
00554     ~WriteQueue ();
00555 
00556     WriteData * dequeue ();
00557     Bool empty (Bool alreadyLocked = False);
00558     void enqueue (WriteData * writeData);
00559 
00560     void initialize (const AsynchronousInterface *);
00561 
00562     void write (VisibilityIterator * vi);
00563 
00564 private:
00565 
00566     const AsynchronousInterface * interface_p;
00567     async::Mutex mutex_p;
00568     std::queue<WriteData *> queue_p;
00569 };
00570 
00571 
00572 class AsynchronousInterface : private boost::noncopyable {
00573 
00574     //friend class InterfaceController;
00575 
00576 public:
00577 
00578     AsynchronousInterface (int maxNBuffers);
00579     ~AsynchronousInterface ();
00580 
00581     void addModifier (asyncio::RoviaModifier * modifier);
00582     async::Mutex & getMutex () const;
00583     //async::LockGuard getLockGuard () const;
00584     VlaData * getVlaData ();
00585     VLAT * getVlat ();
00586     WriteQueue & getWriteQueue ();
00587     void initialize ();
00588     Bool isSweepTerminationRequested () const;
00589     Bool isLookaheadTerminationRequested () const;
00590     void notifyAllInterfaceChanged () const;
00591     void requestViReset ();
00592     pair<Bool, RoviaModifiers> resetVi ();
00593     void terminate ();
00594     void terminateLookahead ();
00595     void terminateSweep ();
00596     RoviaModifiers transferRoviaModifiers ();
00597     void viResetComplete ();
00598     Bool viResetRequested ();
00599     void waitForInterfaceChange (async::UniqueLock & uniqueLock) const;
00600 
00601     static Bool initializeLogging ();
00602     static Bool logThis (Int level);
00603 
00604 private:
00605 
00606     mutable async::Condition  interfaceDataChanged_p; // Signals interface data has changed
00607                                                       // o VisBuffer consumed
00608                                                       // o Write data queued
00609                                                       // o Sweep or thread termination requested
00610     volatile Bool             lookaheadTerminationRequested_p; // True to request thread termination
00611     mutable async::Mutex      mutex_p;                // Mutex protecting access to concurrent data
00612     asyncio::RoviaModifiers   roviaModifiers_p;
00613     volatile Bool             sweepTerminationRequested_p;     // True to request sweep termination
00614                                                                // (e.g., prior to rewinding
00615     volatile Bool             viResetComplete_p; // VI reset process has completed
00616     volatile Bool             viResetRequested_p; // VI reset has been requested
00617     VlaData                   vlaData_p;          // Lookahead data
00618     VLAT *                    vlat_p;             // Lookahead thread
00619     WriteQueue                writeQueue_p;       // Data to be written (writable VIs only)
00620 
00621     static Bool loggingInitialized_p;
00622     static Int logLevel_p;
00623 };
00624 
00625 } // end namespace asyncio
00626 
00627 } // end namespace casa
00628 
00629 #endif /* ASYNCHRONOUS_INTERFACE_H_ */