casa
$Rev:20696$
|
00001 /* 00002 * VlaData.h 00003 * 00004 * Created on: Sep 21, 2011 00005 * Author: jjacobs 00006 */ 00007 00008 #ifndef ASYNCHRONOUS_INTERFACE_H_ 00009 #define ASYNCHRONOUS_INTERFACE_H_ 00010 00011 #include "AsynchronousTools.h" 00012 #include "UtilJ.h" 00013 00014 using casa::utilj::ThreadTimes; 00015 using casa::utilj::DeltaThreadTimes; 00016 00017 #include <casa/Arrays/Cube.h> 00018 #include <casa/Arrays/Matrix.h> 00019 #include <casa/Arrays/Vector.h> 00020 #include <casa/Containers/Block.h> 00021 #include <casa/Quanta/MVRadialVelocity.h> 00022 #include <measures/Measures/MRadialVelocity.h> 00023 #include <measures/Measures/MDoppler.h> 00024 #include <synthesis/MSVis/VisBufferAsync.h> 00025 #include <synthesis/MSVis/VisibilityIterator.h> 00026 #include <synthesis/MSVis/VisibilityIteratorImpl.h> 00027 00028 #include <boost/noncopyable.hpp> 00031 #include <boost/thread/condition_variable.hpp> 00032 #include <boost/thread/locks.hpp> 00033 #include <boost/thread/recursive_mutex.hpp> 00035 #include <memory> 00036 #include <queue> 00037 #include <vector> 00038 00039 namespace casa { 00040 00041 class ROVisibilityIterator; 00042 00043 namespace asyncio { 00044 00045 class RoviaModifier { 00046 public: 00047 00048 friend std::ostream & operator<< (std::ostream & o, const RoviaModifier & m); 00049 00050 virtual ~RoviaModifier () {} 00051 virtual void apply (ROVisibilityIterator *) const = 0; 00052 00053 private: 00054 00055 virtual void print (std::ostream & o) const = 0; 00056 00057 }; 00058 00059 class ChannelSelection { 00060 00061 public: 00062 00063 ChannelSelection () {} 00064 00065 ChannelSelection (const Block< Vector<Int> > & blockNGroup, 00066 const Block< Vector<Int> > & blockStart, 00067 const Block< Vector<Int> > & blockWidth, 00068 const Block< Vector<Int> > & blockIncr, 00069 const Block< Vector<Int> > & blockSpw); 00070 00071 ChannelSelection (const ChannelSelection & other); 00072 ChannelSelection & operator= (const ChannelSelection & other); 00073 00074 00075 void 00076 get (Block< Vector<Int> > & blockNGroup, 00077 Block< Vector<Int> > & blockStart, 00078 Block< Vector<Int> > & blockWidth, 00079 Block< Vector<Int> > & blockIncr, 00080 Block< Vector<Int> > & blockSpw) const; 00081 00082 protected: 00083 00084 void copyBlock (const Block <Vector<Int> > & src, 00085 Block <Vector<Int> > & to) const; 00086 00087 private: 00088 00089 Block< Vector<Int> > blockNGroup_p; 00090 Block< Vector<Int> > blockStart_p; 00091 Block< Vector<Int> > blockWidth_p; 00092 Block< Vector<Int> > blockIncr_p; 00093 Block< Vector<Int> > blockSpw_p; 00094 }; 00095 00096 00097 class SelectChannelModifier : public RoviaModifier { 00098 00099 public: 00100 00101 SelectChannelModifier (Int nGroup, Int start, Int width, Int increment, Int spectralWindow); 00102 SelectChannelModifier (const Block< Vector<Int> > & blockNGroup, 00103 const Block< Vector<Int> > & blockStart, 00104 const Block< Vector<Int> > & blockWidth, 00105 const Block< Vector<Int> > & blockIncr, 00106 const Block< Vector<Int> > & blockSpw); 00107 00108 void apply (ROVisibilityIterator *) const; 00109 00110 private: 00111 00112 Bool channelBlocks_p; 00113 ChannelSelection channelSelection_p; 00114 Int increment_p; 00115 Int nGroup_p; 00116 Int spectralWindow_p; 00117 Int start_p; 00118 Int width_p; 00119 00120 void print (std::ostream & o) const; 00121 String toCsv (const Block< Vector<Int> > & bv) const; 00122 String toCsv (const Vector<Int> & v) const; 00123 00124 }; 00125 00126 class SetIntervalModifier : public RoviaModifier { 00127 00128 public: 00129 00130 SetIntervalModifier (Double timeInterval); 00131 void apply (ROVisibilityIterator *) const; 00132 00133 private: 00134 00135 Double timeInterval_p; 00136 00137 void print (std::ostream & o) const; 00138 }; 00139 00140 00141 class SetRowBlockingModifier : public RoviaModifier { 00142 00143 public: 00144 00145 SetRowBlockingModifier (Int nRows); 00146 void apply (ROVisibilityIterator *) const; 00147 00148 private: 00149 00150 Int nRows_p; 00151 Int nGroup_p; 00152 Int spectralWindow_p; 00153 Int start_p; 00154 Int width_p; 00155 00156 void print (std::ostream & o) const; 00157 }; 00158 00159 class RoviaModifiers { 00160 00161 public: 00162 00163 ~RoviaModifiers (); 00164 00165 void add (RoviaModifier *); 00166 void apply (ROVisibilityIterator *); 00167 void clearAndFree (); 00168 RoviaModifiers transferModifiers (); 00169 00170 private: 00171 00172 typedef std::vector<RoviaModifier *> Data; 00173 Data data_p; 00174 00175 }; 00176 00177 class SelectVelocityModifier : public RoviaModifier { 00178 00179 public: 00180 00181 SelectVelocityModifier (Int nChan, const MVRadialVelocity& vStart, const MVRadialVelocity& vInc, 00182 MRadialVelocity::Types rvType, MDoppler::Types dType, Bool precise); 00183 void apply (ROVisibilityIterator *) const; 00184 00185 private: 00186 00187 MDoppler::Types dType_p; 00188 Int nChan_p; 00189 Bool precise_p; 00190 MRadialVelocity::Types rvType_p; 00191 MVRadialVelocity vInc_p; 00192 MVRadialVelocity vStart_p; 00193 00194 virtual void print (std::ostream & o) const; 00195 00196 }; 00197 00198 00199 // <summary> 00200 // VlaDatum is a single elemement in the VlaDatum buffer ring used to support the 00201 // ROVisibilityIteratorAsync. 00202 // </summary> 00203 00204 // <use visibility=local> 00205 00206 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos=""> 00207 // </reviewed> 00208 00209 // <prerequisite> 00210 // <li> VisBuffer 00211 // <li> VisBufferAsync 00212 // <li> ROVisibilityIteratorAsync 00213 // <li> VlaData 00214 // <li> VLAT 00215 // </prerequisite> 00216 // 00217 // <etymology> 00218 // VlaDatum is the quantum of data associated with a single position of the visibility 00219 // look-ahead scheme. 00220 // </etymology> 00221 // 00222 // <synopsis> 00223 // VlaDatum is a single buffer for data produced by the VLAT thread and consumed by the 00224 // main thread. A collection of VlaDatum objects is organized as a buffer ring in a 00225 // VlaData object. 00226 // 00227 // A VlaDatum object is responsible for maintaining its state as well as containing the set 00228 // of data accessed from a single position of a ROVisibilityIterator. It contains a 00229 // VisibilityBufferAsync object to hold the data that will be used by the main thread; other 00230 // data is maintained in member variables. 00231 // 00232 // VlaDatum has no concurrency mechanisms built in it; that is handled by the VlaData object. 00233 // It does support a set of states that indicate its current use: 00234 // Empty -> Filling -> Full -> Reading -> Empty. 00235 // Changing state is accomplished by the methods fillStart, fillComplete, readStart and readComplete. 00236 // </synopsis> 00237 // 00238 // <example> 00239 // </example> 00240 // 00241 // <motivation> 00242 // </motivation> 00243 // 00244 // <thrown> 00245 // <li>AipsError for unhandleable errors 00246 // </thrown> 00247 // 00248 // <todo asof="yyyy/mm/dd"> 00249 // </todo> 00250 00251 00252 class VlaDatum { 00253 00254 public: 00255 00256 typedef enum {Empty, Filling, Full, Reading} State; 00257 00258 VlaDatum (SubChunkPair); 00259 ~VlaDatum (); 00260 00261 SubChunkPair getSubChunkPair () const; 00262 VisBufferAsync * getVisBuffer (); 00263 //const VisBufferAsync * getVisBuffer () const; 00264 Bool isSubChunk (SubChunkPair) const; 00265 00266 VisBufferAsync * releaseVisBufferAsync (); 00267 void reset (); 00268 00269 protected: 00270 00271 private: 00272 00273 SubChunkPair subchunk_p; 00274 VisBufferAsync * visBuffer_p; 00275 00276 // Illegal operations 00277 00278 VlaDatum & operator= (const VlaDatum & other); 00279 00280 }; 00281 00282 class VLAT; 00283 00284 // <summary> 00285 // The VlaData class is a buffer ring used to support the communication between 00286 // the visiblity lookahead thread (VLAT) and the main application thread. It 00287 // implements the required concurrency control to support this communication. 00288 // </summary> 00289 00290 // <use visibility=local> 00291 00292 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos=""> 00293 // </reviewed> 00294 00295 // <prerequisite> 00296 // <li> VisBuffer 00297 // <li> VisBufferAsync 00298 // <li> ROVisibilityIteratorAsync 00299 // <li> VlaData 00300 // <li> VLAT 00301 // </prerequisite> 00302 // 00303 // <etymology> 00304 // VlaData is the entire collection of visibility look-ahead data currently (or potentially) 00305 // shared between the lookahead and main threads. 00306 // </etymology> 00307 // 00308 // <synopsis> 00309 // The VlaData object supports the sharing of information between the VLAT look ahead thread and 00310 // the main thread. It contains a buffer ring of VlaDatum objects which each hold all of the 00311 // data that is normally access by a position of a ROVisibiltyIterator. Other data that is shared 00312 // or communicated between the two threads is also managed by VlaData. 00313 // 00314 // A single mutex (member variable mutex_p) is used to protect data that is shared between the 00315 // two threads. In addition there is a single PThreads condition variable, vlaDataChanged_p used 00316 // to allow either thread to wait for the other to change the state of VlaData object. 00317 // 00318 // Buffer ring concurrency has two levels: the mutex protecting VlaData and the state of the 00319 // VlaDatum object. Whenever a free buffer (VlaDatum object) is available the VLAT thread will 00320 // fill it with the data from the next position of the ROVisibilityIterator; a buffer is free for 00321 // filling when it is in the Empty state. Before the VLAT fills a buffer it must call fillStart; 00322 // this method will block the caller until the next buffer in the ring becomes free; as a side effect 00323 // the buffer's state becomes Filling. After fillStart is complete, the VLAT owns the buffer. 00324 // When the VLAT is done with the buffer it calls fillComplete to relinquish the buffer; this causes 00325 // the buffer state to change from Filling to Full. 00326 // 00327 // The main thread calls readStart to get the next filled buffer; the main thread is blocked until 00328 // the a filled buffer is available. When the full buffer is ready its state is changed to Reading 00329 // and readStart returns. The VLAT thread will not access the buffer while the main thread is reading. 00330 // The read operation is terminated by calling readComplete; this changes the buffer state to Empty and 00331 // does not block the main thread except potentially to acquire the mutex. 00332 // 00333 // The concurrency scheme is fairly sound except for the possibility of low-level data sharing through 00334 // CASA data structures. Some of the CASA containers (e.g., Array<T>) can potentially share storage 00335 // although it appears that normal operation they do not. Some problems have been encountered with 00336 // objects that share data via reference-counted pointers. For instance, objects derived from 00337 // MeasBase<T,U> (e.g., MDirection, MPosition, MEpoch, etc.) share the object that serves as the 00338 // frame of reference for the measurement; only by converting the object to text and back again can 00339 // a user easily obtain a copy which does not share values with another. It is possible that other 00340 // objects deep many layers down a complex object may still be waiting to trip up VlaData's 00341 // concurrency scheme. 00342 // 00343 // On unusual interaction mediated by VlaData occurs when it is necessary to reset the visibility 00344 // iterator back to the start of a MeasurementSet. This usually happens either at the start of the MS 00345 // sweep (e.g., to reset the row blocking factor of the iterator) or at the end (e.g., to make an 00346 // additional pass through the MS). The main thread requests a reset of the VI and then is blocked 00347 // until the VI is reset. The sweepTerminationRequested_p variable is set to true; when the VLAT 00348 // discovers that this variable is true it resets the buffer ring, repositions its VI to the start 00349 // of the MS and then informs the blocked main thread by setting viResetComplete to true and 00350 // signalling vlaDataChanged_p. 00351 // </synopsis> 00352 // 00353 // <example> 00354 // </example> 00355 // 00356 // <motivation> 00357 // </motivation> 00358 // 00359 // <thrown> 00360 // <li> AipsError 00361 // </thrown> 00362 // 00363 // <todo asof="yyyy/mm/dd"> 00364 // </todo> 00365 00366 class AsynchronousInterface; 00367 class InterfaceController; 00368 00369 class VlaData { 00370 00371 public: 00372 00373 VlaData (Int maxNBuffers, async::Mutex & mutex); 00374 ~VlaData (); 00375 00376 Bool fillCanStart () const; 00377 void fillComplete (VlaDatum * datum); 00378 VlaDatum * fillStart (SubChunkPair, const ThreadTimes & fillStartTime); 00379 asyncio::ChannelSelection getChannelSelection () const; 00380 void initialize (const AsynchronousInterface *); 00381 void insertValidChunk (Int chunkNumber); 00382 void insertValidSubChunk (SubChunkPair); 00383 Bool isValidChunk (Int chunkNumber) const; 00384 Bool isValidSubChunk (SubChunkPair) const; 00385 void readComplete (SubChunkPair); 00386 VisBufferAsync * readStart (SubChunkPair); 00387 void resetBufferData (); 00388 void setNoMoreData (); 00389 void storeChannelSelection (const asyncio::ChannelSelection & channelSelection); 00390 00391 //static void debugBlock (); 00392 //static void debugUnblock (); 00393 //static Bool logThis (Int level); 00394 00395 //static Bool loggingInitialized_p; 00396 //static Int logLevel_p; 00397 00398 protected: 00399 00400 private: 00401 00402 typedef std::queue<VlaDatum *> Data; 00403 typedef std::queue<Int> ValidChunks; 00404 typedef std::queue<SubChunkPair> ValidSubChunks; 00405 00406 class Timing { 00407 public: 00408 ThreadTimes fill1_p; 00409 ThreadTimes fill2_p; 00410 ThreadTimes fill3_p; 00411 DeltaThreadTimes fillCycle_p; 00412 DeltaThreadTimes fillOperate_p; 00413 DeltaThreadTimes fillWait_p; 00414 ThreadTimes read1_p; 00415 ThreadTimes read2_p; 00416 ThreadTimes read3_p; 00417 DeltaThreadTimes readCycle_p; 00418 DeltaThreadTimes readOperate_p; 00419 DeltaThreadTimes readWait_p; 00420 ThreadTimes timeStart_p; 00421 ThreadTimes timeStop_p; 00422 }; 00423 00424 asyncio::ChannelSelection channelSelection_p; // last channels selected for the VI in use 00425 Data data_p; // Buffer queue 00426 const AsynchronousInterface * interface_p; 00427 const Int MaxNBuffers_p; 00428 async::Mutex & mutex_p; // provided by Asynchronous interface 00429 Timing timing_p; 00430 mutable ValidChunks validChunks_p; // Queue of valid chunk numbers 00431 mutable ValidSubChunks validSubChunks_p; // Queue of valid subchunk pairs 00432 00433 00434 Int clock (Int arg, Int base); 00435 String makeReport (); 00436 00437 Bool statsEnabled () const; 00438 void terminateSweep (); 00439 00441 00442 static Bool initializeLogging (); 00443 00444 // Illegal operations 00445 00446 VlaData (const VlaData & other); 00447 VlaData & operator= (const VlaData & other); 00448 }; 00449 00450 class WriteData { 00451 00452 public: 00453 00454 WriteData (const SubChunkPair & subchunkPair) : subchunkPair_p (subchunkPair) {} 00455 00456 virtual ~WriteData () {} 00457 00458 SubChunkPair getSubChunkPair () const { return subchunkPair_p;} 00459 virtual void write (VisibilityIterator * vi) = 0; 00460 00461 private: 00462 00463 SubChunkPair subchunkPair_p; 00464 00465 }; 00466 00467 template <typename Data> 00468 class WriteDataImpl : public WriteData { 00469 public: 00470 00471 typedef void (VisibilityIterator::* Setter) (const Data &); 00472 00473 WriteDataImpl (const SubChunkPair & subchunkPair, 00474 const Data & data, 00475 Setter setter) 00476 : WriteData (subchunkPair), 00477 data_p (), 00478 setter_p (setter) 00479 { 00480 data_p.assign (data); // Make a pure copy 00481 } 00482 00483 void 00484 write (VisibilityIterator * vi) 00485 { 00486 (vi ->* setter_p) (data_p); 00487 } 00488 00489 private: 00490 00491 Data data_p; 00492 Setter setter_p; 00493 00494 }; 00495 00496 template <typename Data> 00497 WriteData * 00498 createWriteData (const SubChunkPair & subchunkPair, 00499 const Data & data, 00500 void (VisibilityIterator::* setter) (const Data &)) 00501 { 00502 return new WriteDataImpl<Data> (subchunkPair, data, setter); 00503 } 00504 00505 template <typename Data> 00506 class WriteDataImpl2 : public WriteData { 00507 public: 00508 00509 typedef ROVisibilityIterator::DataColumn DataColumn; 00510 typedef void (VisibilityIterator::* Setter) (const Data &, DataColumn); 00511 00512 WriteDataImpl2 (const SubChunkPair & subchunkPair, 00513 const Data & data, 00514 DataColumn dataColumn, 00515 Setter setter) 00516 : WriteData (subchunkPair), 00517 data_p (), 00518 dataColumn_p (dataColumn), 00519 setter_p (setter) 00520 { 00521 data_p.assign (data); // Make a pure copy 00522 } 00523 00524 void 00525 write (VisibilityIterator * vi) 00526 { 00527 (vi ->* setter_p) (data_p, dataColumn_p); 00528 } 00529 00530 private: 00531 00532 Data data_p; 00533 DataColumn dataColumn_p; 00534 Setter setter_p; 00535 }; 00536 00537 template <typename Data> 00538 WriteData * 00539 createWriteData (const SubChunkPair & subchunkPair, 00540 const Data & data, 00541 ROVisibilityIterator::DataColumn dataColumn, 00542 void (VisibilityIterator::* setter) (const Data &, ROVisibilityIterator::DataColumn)) 00543 { 00544 return new WriteDataImpl2 <Data> (subchunkPair, data, dataColumn, setter); 00545 } 00546 00547 class AsynchronousInterface; 00548 00549 class WriteQueue { 00550 00551 public: 00552 00553 WriteQueue (); 00554 ~WriteQueue (); 00555 00556 WriteData * dequeue (); 00557 Bool empty (Bool alreadyLocked = False); 00558 void enqueue (WriteData * writeData); 00559 00560 void initialize (const AsynchronousInterface *); 00561 00562 void write (VisibilityIterator * vi); 00563 00564 private: 00565 00566 const AsynchronousInterface * interface_p; 00567 async::Mutex mutex_p; 00568 std::queue<WriteData *> queue_p; 00569 }; 00570 00571 00572 class AsynchronousInterface : private boost::noncopyable { 00573 00574 //friend class InterfaceController; 00575 00576 public: 00577 00578 AsynchronousInterface (int maxNBuffers); 00579 ~AsynchronousInterface (); 00580 00581 void addModifier (asyncio::RoviaModifier * modifier); 00582 async::Mutex & getMutex () const; 00583 //async::LockGuard getLockGuard () const; 00584 VlaData * getVlaData (); 00585 VLAT * getVlat (); 00586 WriteQueue & getWriteQueue (); 00587 void initialize (); 00588 Bool isSweepTerminationRequested () const; 00589 Bool isLookaheadTerminationRequested () const; 00590 void notifyAllInterfaceChanged () const; 00591 void requestViReset (); 00592 pair<Bool, RoviaModifiers> resetVi (); 00593 void terminate (); 00594 void terminateLookahead (); 00595 void terminateSweep (); 00596 RoviaModifiers transferRoviaModifiers (); 00597 void viResetComplete (); 00598 Bool viResetRequested (); 00599 void waitForInterfaceChange (async::UniqueLock & uniqueLock) const; 00600 00601 static Bool initializeLogging (); 00602 static Bool logThis (Int level); 00603 00604 private: 00605 00606 mutable async::Condition interfaceDataChanged_p; // Signals interface data has changed 00607 // o VisBuffer consumed 00608 // o Write data queued 00609 // o Sweep or thread termination requested 00610 volatile Bool lookaheadTerminationRequested_p; // True to request thread termination 00611 mutable async::Mutex mutex_p; // Mutex protecting access to concurrent data 00612 asyncio::RoviaModifiers roviaModifiers_p; 00613 volatile Bool sweepTerminationRequested_p; // True to request sweep termination 00614 // (e.g., prior to rewinding 00615 volatile Bool viResetComplete_p; // VI reset process has completed 00616 volatile Bool viResetRequested_p; // VI reset has been requested 00617 VlaData vlaData_p; // Lookahead data 00618 VLAT * vlat_p; // Lookahead thread 00619 WriteQueue writeQueue_p; // Data to be written (writable VIs only) 00620 00621 static Bool loggingInitialized_p; 00622 static Int logLevel_p; 00623 }; 00624 00625 } // end namespace asyncio 00626 00627 } // end namespace casa 00628 00629 #endif /* ASYNCHRONOUS_INTERFACE_H_ */