casa  5.7.0-16
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AsynchronousInterface2.h
Go to the documentation of this file.
1 /*
2  * AsynchronousInterface2.h
3  *
4  * Created on: Sep 21, 2011
5  * Author: jjacobs
6  */
7 
8 #ifndef ASYNCHRONOUS_INTERFACE2_H_
9 #define ASYNCHRONOUS_INTERFACE2_H_
10 
11 #include "AsynchronousTools.h"
12 #include "UtilJ.h"
13 
16 
17 #include <casa/Arrays/Cube.h>
18 #include <casa/Arrays/Matrix.h>
19 #include <casa/Arrays/Vector.h>
20 #include <casa/Containers/Block.h>
27 
29 #include <memory>
30 #include <queue>
31 #include <vector>
32 #include <sstream>
33 
34 namespace casa {
35 
36 namespace vi {
37 
38 class VisibilityIterator2;
39 
41 public:
42 
43  virtual ~RoviaModifier () {}
44  virtual void apply (VisibilityIterator2 *) const = 0;
45  inline operator std::string( ) const {
46  std::stringstream ss;
47  print(ss);
48  return ss.str( );
49  }
50 
51 private:
52 
53  virtual void print (std::ostream & o) const = 0;
54 
55 };
56 
58 
59 public:
60 
62 
68 
69  ChannelSelection (const ChannelSelection & other);
71 
72 
73  void
79 
80 protected:
81 
84 
85 private:
86 
92 };
93 
94 
96 
97 public:
98 
99  SelectChannelModifier (casacore::Int nGroup, casacore::Int start, casacore::Int width, casacore::Int increment, casacore::Int spectralWindow);
101  const casacore::Block< casacore::Vector<casacore::Int> > & blockStart,
102  const casacore::Block< casacore::Vector<casacore::Int> > & blockWidth,
105 
106  void apply (VisibilityIterator2 *) const;
107 
108 private:
109 
117 
118  void print (std::ostream & o) const;
121 
122 };
123 
125 
126 public:
127 
128  SetIntervalModifier (casacore::Double timeInterval);
129  void apply (VisibilityIterator2 *) const;
130 
131 private:
132 
134 
135  void print (std::ostream & o) const;
136 };
137 
138 
140 
141 public:
142 
144  void apply (VisibilityIterator2 *) const;
145 
146 private:
147 
153 
154  void print (std::ostream & o) const;
155 };
156 
158 
159 public:
160 
161  ~RoviaModifiers ();
162 
163  void add (RoviaModifier *);
164  void apply (VisibilityIterator2 *);
165  void clearAndFree ();
167 
168 private:
169 
170  typedef std::vector<RoviaModifier *> casacore::Data;
171  casacore::Data data_p;
172 
173 };
174 
176 
177 public:
178 
181  void apply (VisibilityIterator2 *) const;
182 
183 private:
184 
191 
192  virtual void print (std::ostream & o) const;
193 
194 };
195 
196 
197 // <summary>
198 // VlaDatum is a single element in the VlaDatum buffer ring used to support the
199 // VisibilityIterator2Async.
200 // </summary>
201 
202 // <use visibility=local>
203 
204 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
205 // </reviewed>
206 
207 // <prerequisite>
208 // <li> VisBuffer
209 // <li> VisBufferAsync2
210 // <li> VisibilityIterator2Async
211 // <li> VlaData
212 // <li> VLAT
213 // </prerequisite>
214 //
215 // <etymology>
216 // VlaDatum is the quantum of data associated with a single position of the visibility
217 // look-ahead scheme.
218 // </etymology>
219 //
220 // <synopsis>
221 // VlaDatum is a single buffer for data produced by the VLAT thread and consumed by the
222 // main thread. A collection of VlaDatum objects is organized as a buffer ring in a
223 // VlaData object.
224 //
225 // A VlaDatum object is responsible for maintaining its state as well as containing the set
226 // of data accessed from a single position of a VisibilityIterator2. It contains a
227 // VisibilityBufferAsync object to hold the data that will be used by the main thread; other
228 // data is maintained in member variables.
229 //
230 // VlaDatum has no concurrency mechanisms built in it; that is handled by the VlaData object.
231 // It does support a set of states that indicate its current use:
232 // Empty -> Filling -> Full -> Reading -> Empty.
233 // Changing state is accomplished by the methods fillStart, fillComplete, readStart and readComplete.
234 // </synopsis>
235 //
236 // <example>
237 // </example>
238 //
239 // <motivation>
240 // </motivation>
241 //
242 // <thrown>
243 // <li>casacore::AipsError for unhandleable errors
244 // </thrown>
245 //
246 // <todo asof="yyyy/mm/dd">
247 // </todo>
248 
249 
// One buffer slot of the look-ahead ring, tagged with the Subchunk it was
// created for.  Per the class synopsis, locking is the responsibility of the
// owning VlaData; this class has no concurrency mechanisms of its own.
250 class VlaDatum {
251 
252 public:
253 
    // Usage states; the documented cycle is
    // Empty -> Filling -> Full -> Reading -> Empty.
254  typedef enum {Empty, Filling, Full, Reading} State;
255 
    // Constructed from the Subchunk this datum is associated with.
256  VlaDatum (Subchunk);
257  ~VlaDatum ();
258 
    // Returns a Subchunk by value (presumably subchunk_p -- body not shown here).
259  Subchunk getSubchunk () const;
261  //const VisBufferAsync2 * getVisBuffer () const;
    // Presumably tests whether the argument matches this datum's subchunk;
    // TODO confirm against the implementation.
262  casacore::Bool isSubchunk (Subchunk) const;
263 
265  void reset ();
266 
267 protected:
268 
269 private:
270 
271  Subchunk subchunk_p;
273 
274  // Illegal operations
275 
    // Copy assignment is declared private so that client code cannot assign
    // one VlaDatum to another.
276  VlaDatum & operator= (const VlaDatum & other);
277 
278 };
279 
280 class VLAT;
281 
282 // <summary>
283 // The VlaData class is a buffer ring used to support the communication between
284 // the visibility lookahead thread (VLAT) and the main application thread. It
285 // implements the required concurrency control to support this communication.
286 // </summary>
287 
288 // <use visibility=local>
289 
290 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
291 // </reviewed>
292 
293 // <prerequisite>
294 // <li> VisBuffer
295 // <li> VisBufferImplAsync2
296 // <li> VisibilityIterator2Async
297 // <li> VlaData
298 // <li> VLAT
299 // </prerequisite>
300 //
301 // <etymology>
302 // VlaData is the entire collection of visibility look-ahead data currently (or potentially)
303 // shared between the lookahead and main threads.
304 // </etymology>
305 //
306 // <synopsis>
307 // The VlaData object supports the sharing of information between the VLAT look ahead thread and
308 // the main thread. It contains a buffer ring of VlaDatum objects which each hold all of the
309 // data that is normally accessed by a position of a VisibilityIterator. Other data that is shared
310 // or communicated between the two threads is also managed by VlaData.
311 //
312 // A single mutex (member variable mutex_p) is used to protect data that is shared between the
313 // two threads. In addition there is a single PThreads condition variable, vlaDataChanged_p used
314 // to allow either thread to wait for the other to change the state of VlaData object.
315 //
316 // Buffer ring concurrency has two levels: the mutex protecting VlaData and the state of the
317 // VlaDatum object. Whenever a free buffer (VlaDatum object) is available the VLAT thread will
318 // fill it with the data from the next position of the VisibilityIterator2; a buffer is free for
319 // filling when it is in the Empty state. Before the VLAT fills a buffer it must call fillStart;
320 // this method will block the caller until the next buffer in the ring becomes free; as a side effect
321 // the buffer's state becomes Filling. After fillStart is complete, the VLAT owns the buffer.
322 // When the VLAT is done with the buffer it calls fillComplete to relinquish the buffer; this causes
323 // the buffer state to change from Filling to Full.
324 //
325 // The main thread calls readStart to get the next filled buffer; the main thread is blocked until
326 // a filled buffer is available. When the full buffer is ready its state is changed to Reading
327 // and readStart returns. The VLAT thread will not access the buffer while the main thread is reading.
328 // The read operation is terminated by calling readComplete; this changes the buffer state to Empty and
329 // does not block the main thread except potentially to acquire the mutex.
330 //
331 // The concurrency scheme is fairly sound except for the possibility of low-level data sharing through
332 // CASA data structures. Some of the CASA containers (e.g., casacore::Array<T>) can potentially share storage
333 // although it appears that in normal operation they do not. Some problems have been encountered with
334 // objects that share data via reference-counted pointers. For instance, objects derived from
335 // casacore::MeasBase<T,U> (e.g., casacore::MDirection, casacore::MPosition, casacore::MEpoch, etc.) share the object that serves as the
336 // frame of reference for the measurement; only by converting the object to text and back again can
337 // a user easily obtain a copy which does not share values with another. It is possible that other
338 // objects deep many layers down a complex object may still be waiting to trip up VlaData's
339 // concurrency scheme.
340 //
341 // One unusual interaction mediated by VlaData occurs when it is necessary to reset the visibility
342 // iterator back to the start of a MeasurementSet. This usually happens either at the start of the MS
343 // sweep (e.g., to reset the row blocking factor of the iterator) or at the end (e.g., to make an
344 // additional pass through the casacore::MS). The main thread requests a reset of the VI and then is blocked
345 // until the VI is reset. The sweepTerminationRequested_p variable is set to true; when the VLAT
346 // discovers that this variable is true it resets the buffer ring, repositions its VI to the start
347 // of the casacore::MS and then informs the blocked main thread by setting viResetComplete to true and
348 // signalling vlaDataChanged_p.
349 // </synopsis>
350 //
351 // <example>
352 // </example>
353 //
354 // <motivation>
355 // </motivation>
356 //
357 // <thrown>
358 // <li> AipsError
359 // </thrown>
360 //
361 // <todo asof="yyyy/mm/dd">
362 // </todo>
363 
365 class InterfaceController;
366 
// Buffer queue of VlaDatum objects shared between the look-ahead thread and
// the main thread.  The mutex is not owned here: it is passed to the
// constructor by reference and kept as mutex_p.
367 class VlaData {
368 
369 public:
370 
371  VlaData (casacore::Int maxNBuffers, async::Mutex & mutex);
372  ~VlaData ();
373 
374  casacore::Bool fillCanStart () const;
    // Filler side (per the synopsis): fillStart blocks until the next buffer
    // is free and marks it Filling; fillComplete moves it from Filling to Full.
375  void fillComplete (VlaDatum * datum);
376  VlaDatum * fillStart (Subchunk, const ThreadTimes & fillStartTime);
378  void initialize (const AsynchronousInterface *);
379  void insertValidChunk (casacore::Int chunkNumber);
380  void insertValidSubChunk (Subchunk);
381  casacore::Bool isValidChunk (casacore::Int chunkNumber) const;
382  casacore::Bool isValidSubChunk (Subchunk) const;
    // Reader side (per the synopsis): readStart blocks until a filled buffer
    // is available and marks it Reading; readComplete returns it to Empty.
383  void readComplete (Subchunk);
384  VisBufferImplAsync2 * readStart (Subchunk);
385  void resetBufferData ();
386  void setNoMoreData ();
387  void storeChannelSelection (const ChannelSelection & channelSelection);
388 
389  //static void debugBlock ();
390  //static void debugUnblock ();
391  //static casacore::Bool logThis (casacore::Int level);
392 
393  //static casacore::Bool loggingInitialized_p;
394  //static casacore::Int logLevel_p;
395 
396 protected:
397 
398 private:
399 
    // NOTE(review): a typedef cannot declare a qualified name; "casacore::Data"
    // is presumably an artifact of automated namespace qualification and the
    // alias was likely plain "Data" -- confirm against the upstream header.
400  typedef std::queue<VlaDatum *> casacore::Data;
401  typedef std::queue<casacore::Int> ValidChunks;
402  typedef std::queue<Subchunk> ValidSubChunks;
403 
404  class Timing {
405  public:
420  };
421 
422  ChannelSelection channelSelection_p; // last channels selected for the VI in use
423  casacore::Data data_p; // Buffer queue
426  async::Mutex & mutex_p; // provided by Asynchronous interface
428  mutable ValidChunks validChunks_p; // Queue (std::queue) of valid chunk numbers
429  mutable ValidSubChunks validSubChunks_p; // Queue (std::queue) of valid subchunk pairs
430 
431 
434 
435  casacore::Bool statsEnabled () const;
436  void terminateSweep ();
437 
439 
441 
442  // Illegal operations: copy construction and copy assignment are declared
    // private so that client code cannot copy a VlaData.
443 
444  VlaData (const VlaData & other);
445  VlaData & operator= (const VlaData & other);
446 };
447 
// Abstract base class for one pending write, tagged with the Subchunk it
// belongs to.  Subclasses supply the actual write() behaviour.
448 class WriteData {
449 
450 public:
451 
    // Stores a copy of subchunkPair in subchunkPair_p.
452  WriteData (const Subchunk & subchunkPair) : subchunkPair_p (subchunkPair) {}
453 
    // Virtual so that objects can be destroyed through a WriteData pointer.
454  virtual ~WriteData () {}
455 
    // Returns, by value, the Subchunk given at construction.
456  Subchunk getsubchunk () const { return subchunkPair_p;}
    // Pure virtual: the derived class performs the write using vi.
457  virtual void write (VisibilityIterator2 * vi) = 0;
458 
459 private:
460 
    // Subchunk this write is associated with; set once in the constructor.
461  Subchunk subchunkPair_p;
462 
463 };
464 
465 template <typename casacore::Data>
466 class WriteDataImpl : public WriteData {
467 public:
468 
469  typedef void (VisibilityIterator2::* Setter) (const casacore::Data &);
470 
471  WriteDataImpl (const Subchunk & subchunkPair,
472  const casacore::Data & data,
473  Setter setter)
474  : WriteData (subchunkPair),
475  data_p (),
476  setter_p (setter)
477  {
478  data_p.assign (data); // Make a pure copy
479  }
480 
481  void
483  {
484  (vi ->* setter_p) (data_p);
485  }
486 
487 private:
488 
489  casacore::Data data_p;
491 
492 };
493 
// Factory for the single-argument setter form: allocates a WriteDataImpl with
// `new`, passing it subchunkPair, data and setter, and returns it as a
// WriteData pointer.  The result is a raw pointer; who deletes it is not
// visible in this function -- presumably the consumer of the write queue,
// TODO confirm.
// NOTE(review): the qualified template parameter name "casacore::Data" is
// presumably an artifact of automated namespace qualification (the original
// likely read "typename Data") -- confirm against the upstream header.
494 template <typename casacore::Data>
495 WriteData *
496 createWriteData (const Subchunk & subchunkPair,
497  const casacore::Data & data,
498  void (VisibilityIterator2::* setter) (const casacore::Data &))
499 {
500  return new WriteDataImpl<casacore::Data> (subchunkPair, data, setter);
501 }
502 
503 template <typename casacore::Data>
504 class WriteDataImpl2 : public WriteData {
505 public:
506 
508  typedef void (VisibilityIterator2::* Setter) (const casacore::Data &, DataColumn);
509 
510  WriteDataImpl2 (const Subchunk & subchunkPair,
511  const casacore::Data & data,
512  DataColumn dataColumn,
513  Setter setter)
514  : WriteData (subchunkPair),
515  data_p (),
516  dataColumn_p (dataColumn),
517  setter_p (setter)
518  {
519  data_p.assign (data); // Make a pure copy
520  }
521 
522  void
524  {
525  (vi ->* setter_p) (data_p, dataColumn_p);
526  }
527 
528 private:
529 
530  casacore::Data data_p;
533 };
534 
535 template <typename casacore::Data>
536 WriteData *
537 createWriteData (const Subchunk & subchunkPair,
538  const casacore::Data & data,
540  void (VisibilityIterator2::* setter) (const casacore::Data &, VisibilityIterator2::DataColumn))
541 {
542  return new WriteDataImpl2 <casacore::Data> (subchunkPair, data, dataColumn, setter);
543 }
544 
546 
// Queue of pending WriteData objects, held as raw pointers in a std::queue.
// Only the declarations are visible here; locking and ownership rules live in
// the implementation file.
547 class WriteQueue {
548 
549 public:
550 
551  WriteQueue ();
552  ~WriteQueue ();
553 
    // Returns a WriteData pointer (presumably the front element, removed from
    // the queue -- TODO confirm against the implementation).
554  WriteData * dequeue ();
    // alreadyLocked defaults to false; presumably true means the caller
    // already holds the interface mutex -- TODO confirm.
555  casacore::Bool empty (casacore::Bool alreadyLocked = false);
556  void enqueue (WriteData * writeData);
557 
558  void initialize (const AsynchronousInterface *);
559 
560  void write (VisibilityIterator2 * vi);
561 
562 private:
563 
    // Pending writes; elements are raw pointers, so their lifetime is managed
    // outside the container itself.
566  std::queue<WriteData *> queue_p;
567 };
568 
569 
571 
572  //friend class InterfaceController;
573 
574 public:
575 
576  AsynchronousInterface( const AsynchronousInterface& ) = delete;
578  AsynchronousInterface (int maxNBuffers);
580 
581  void addModifier (RoviaModifier * modifier);
582  async::Mutex & getMutex () const;
583  //async::LockGuard getLockGuard () const;
584  VlaData * getVlaData ();
585  VLAT * getVlat ();
587  void initialize ();
590  void notifyAllInterfaceChanged () const;
591  void requestViReset ();
592  pair<casacore::Bool, RoviaModifiers> resetVi ();
593  void terminate ();
594  void terminateLookahead ();
595  void terminateSweep ();
597  void viResetComplete ();
599  void waitForInterfaceChange (async::UniqueLock & uniqueLock) const;
600 
602  static casacore::Bool logThis (casacore::Int level);
603 
604 private:
605 
606  mutable async::Condition interfaceDataChanged_p; // Signals interface data has changed
607  // o VisBuffer consumed
608  // o Write data queued
609  // o Sweep or thread termination requested
610  volatile casacore::Bool lookaheadTerminationRequested_p; // true to request thread termination
611  mutable async::Mutex mutex_p; // Mutex protecting access to concurrent data
613  volatile casacore::Bool sweepTerminationRequested_p; // true to request sweep termination
614  // (e.g., prior to rewinding
615  volatile casacore::Bool viResetComplete_p; // VI reset process has completed
616  volatile casacore::Bool viResetRequested_p; // VI reset has been requested
617  VlaData vlaData_p; // Lookahead data
618  VLAT * vlat_p; // Lookahead thread
619  WriteQueue writeQueue_p; // Data to be written (writable VIs only)
620 
623 };
624 
625 } // end namespace vi
626 
627 } // end namespace casa
628 
629 #endif /* ASYNCHRONOUS_INTERFACE2_H_ */
casacore::Block< casacore::Vector< casacore::Int > > blockWidth_p
*static casacore::Bool initializeLogging()
void apply(VisibilityIterator2 *)
int Int
Definition: aipstype.h:50
void storeChannelSelection(const ChannelSelection &channelSelection)
virtual void write(VisibilityIterator2 *vi)=0
SetRowBlockingModifier(casacore::Int nRows)
VisBufferImplAsync2 * visBuffer_p
ChannelSelection & operator=(const ChannelSelection &other)
casacore::String toCsv(const casacore::Block< casacore::Vector< casacore::Int > > &bv) const
void initialize(const AsynchronousInterface *)
LatticeExprNode arg(const LatticeExprNode &expr)
SelectChannelModifier(casacore::Int nGroup, casacore::Int start, casacore::Int width, casacore::Int increment, casacore::Int spectralWindow)
WriteData(const Subchunk &subchunkPair)
static casacore::Bool logThis(casacore::Int level)
void insertValidSubChunk(Subchunk)
VisibilityIterator2 iterates through one or more readonly MeasurementSets.
void(VisibilityIterator2::* Setter)(const casacore::Data &, DataColumn)
void apply(VisibilityIterator2 *) const
const casacore::Int MaxNBuffers_p
void addModifier(RoviaModifier *modifier)
void waitForInterfaceChange(async::UniqueLock &uniqueLock) const
VlaData & operator=(const VlaData &other)
AsynchronousInterface & operator=(const AsynchronousInterface &)=delete
VlaDatum & operator=(const VlaDatum &other)
Illegal operations.
ChannelSelection channelSelection_p
ChannelSelection getChannelSelection() const
VisBufferImplAsync2 * getVisBuffer()
casacore::Block< casacore::Vector< casacore::Int > > blockNGroup_p
void copyBlock(const casacore::Block< casacore::Vector< casacore::Int > > &src, casacore::Block< casacore::Vector< casacore::Int > > &to) const
const AsynchronousInterface * interface_p
async::Mutex & getMutex() const
virtual void print(std::ostream &o) const
casacore::String makeReport()
casacore::Bool isValidSubChunk(Subchunk) const
casacore::Bool isSweepTerminationRequested() const
void notifyAllInterfaceChanged() const
RoviaModifiers transferModifiers()
casacore::Bool isLookaheadTerminationRequested() const
WriteDataImpl(const Subchunk &subchunkPair, const casacore::Data &data, Setter setter)
ABSTRACT CLASSES Deliberately vague to be general enough to allow for many different types of data
Definition: PlotData.h:48
void apply(VisibilityIterator2 *) const
Internal value for MRadialVelocity.
casacore::Bool statsEnabled() const
void add(RoviaModifier *)
void insertValidChunk(casacore::Int chunkNumber)
casacore::Int clock(casacore::Int arg, casacore::Int base)
void write(VisibilityIterator2 *vi)
void initialize(const AsynchronousInterface *)
virtual void apply(VisibilityIterator2 *) const =0
void enqueue(WriteData *writeData)
Subchunk getSubchunk() const
void readComplete(Subchunk)
casacore::Block< casacore::Vector< casacore::Int > > blockSpw_p
WriteDataImpl2(const Subchunk &subchunkPair, const casacore::Data &data, DataColumn dataColumn, Setter setter)
double Double
Definition: aipstype.h:55
VisBufferImplAsync2 * releaseVisBufferAsync2()
WriteData * dequeue()
casacore::Bool isValidChunk(casacore::Int chunkNumber) const
void write(VisibilityIterator2 *vi)
volatile casacore::Bool sweepTerminationRequested_p
void print(std::ostream &o) const
std::queue< casacore::Int > ValidChunks
Types
Types of known MRadialVelocity Warning: The order defines the order in the translation matrix FromTo...
bool Bool
Define the standard types used by Casacore.
Definition: aipstype.h:42
volatile casacore::Bool viResetComplete_p
(e.g., prior to rewinding
void fillComplete(VlaDatum *datum)
Types
Types of known MDopplers Warning: The order defines the order in the translation matrix FromTo in th...
Definition: MDoppler.h:149
void(VisibilityIterator2::* Setter)(const casacore::Data &)
std::queue< Subchunk > ValidSubChunks
virtual void print(std::ostream &o) const =0
std::queue< WriteData * > queue_p
casacore::Bool isSubchunk(Subchunk) const
const VisBufferAsync2 * getVisBuffer () const;
static casacore::Bool initializeLogging()
static casacore::Bool loggingInitialized_p
void apply(VisibilityIterator2 *) const
simple 1-D array
Subchunk getsubchunk() const
void print(std::ostream &o) const
WriteData * createWriteData(const Subchunk &subchunkPair, const casacore::Data &data, void(VisibilityIterator2::*setter)(const casacore::Data &))
VlaDatum * fillStart(Subchunk, const ThreadTimes &fillStartTime)
String: the storage and methods of handling collections of characters.
Definition: String.h:223
VlaData(casacore::Int maxNBuffers, async::Mutex &mutex)
volatile casacore::Bool viResetRequested_p
VisibilityIterator2::DataColumn DataColumn
RoviaModifiers transferRoviaModifiers()
casacore::Block< casacore::Vector< casacore::Int > > blockStart_p
casacore::MRadialVelocity::Types rvType_p
casacore::Bool viResetRequested()
AsynchronousInterface(const AsynchronousInterface &)=delete
friend class InterfaceController;
const AsynchronousInterface * interface_p
void write(VisibilityIterator2 *vi)
void apply(VisibilityIterator2 *) const
VisBufferImplAsync2 * readStart(Subchunk)
volatile casacore::Bool lookaheadTerminationRequested_p
o VisBuffer consumed o Write data queued o Sweep or thread termination requested
SetIntervalModifier(casacore::Double timeInterval)
casacore::Block< casacore::Vector< casacore::Int > > blockIncr_p
VlaData * getVlaData()
async::LockGuard getLockGuard () const;
ValidSubChunks validSubChunks_p
casacore::Bool empty(casacore::Bool alreadyLocked=false)
VlaDatum is a single elemement in the VlaDatum buffer ring used to support the VisibilityIterator2Asy...
casacore::MVRadialVelocity vStart_p
SelectVelocityModifier(casacore::Int nChan, const casacore::MVRadialVelocity &vStart, const casacore::MVRadialVelocity &vInc, casacore::MRadialVelocity::Types rvType, casacore::MDoppler::Types dType, casacore::Bool precise)
casacore::Bool fillCanStart() const
void print(std::ostream &o) const
pair< casacore::Bool, RoviaModifiers > resetVi()