casa  5.7.0-16
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
AsynchronousInterface.h
Go to the documentation of this file.
1 /*
2  * AsynchronousInterface.h
3  *
4  * Created on: Sep 21, 2011
5  * Author: jjacobs
6  */
7 
8 #ifndef ASYNCHRONOUS_INTERFACE_H_
9 #define ASYNCHRONOUS_INTERFACE_H_
10 
11 #include "AsynchronousTools.h"
12 #include "UtilJ.h"
13 
16 
17 #include <casa/Arrays/Cube.h>
18 #include <casa/Arrays/Matrix.h>
19 #include <casa/Arrays/Vector.h>
20 #include <casa/Containers/Block.h>
27 
29 #include <memory>
30 #include <queue>
31 #include <vector>
32 
33 namespace casa {
34 
35 class ROVisibilityIterator;
36 
37 namespace asyncio {
38 
40 public:
41 
42  friend std::ostream & operator<< (std::ostream & o, const RoviaModifier & m);
43 
44  virtual ~RoviaModifier () {}
45  virtual void apply (ROVisibilityIterator *) const = 0;
46 
47  inline operator std::string( ) const {
48  std::stringstream ss;
49  print(ss);
50  return ss.str( );
51  }
52 
53 private:
54 
55  virtual void print (std::ostream & o) const = 0;
56 
57 };
58 
60 
61 public:
62 
64 
70 
71  ChannelSelection (const ChannelSelection & other);
73 
74 
75  void
81 
82 protected:
83 
86 
87 private:
88 
94 };
95 
96 
98 
99 public:
100 
101  SelectChannelModifier (casacore::Int nGroup, casacore::Int start, casacore::Int width, casacore::Int increment, casacore::Int spectralWindow);
103  const casacore::Block< casacore::Vector<casacore::Int> > & blockStart,
104  const casacore::Block< casacore::Vector<casacore::Int> > & blockWidth,
107 
108  void apply (ROVisibilityIterator *) const;
109 
110 private:
111 
119 
120  void print (std::ostream & o) const;
123 
124 };
125 
127 
128 public:
129 
130  SetIntervalModifier (casacore::Double timeInterval);
131  void apply (ROVisibilityIterator *) const;
132 
133 private:
134 
136 
137  void print (std::ostream & o) const;
138 };
139 
140 
142 
143 public:
144 
146  void apply (ROVisibilityIterator *) const;
147 
148 private:
149 
155 
156  void print (std::ostream & o) const;
157 };
158 
160 
161 public:
162 
163  ~RoviaModifiers ();
164 
165  void add (RoviaModifier *);
166  void apply (ROVisibilityIterator *);
167  void clearAndFree ();
169 
170 private:
171 
172  typedef std::vector<RoviaModifier *> Data;
174 
175 };
176 
178 
179 public:
180 
183  void apply (ROVisibilityIterator *) const;
184 
185 private:
186 
193 
194  virtual void print (std::ostream & o) const;
195 
196 };
197 
198 
199 // <summary>
200 // VlaDatum is a single element in the VlaDatum buffer ring used to support the
201 // ROVisibilityIteratorAsync.
202 // </summary>
203 
204 // <use visibility=local>
205 
206 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
207 // </reviewed>
208 
209 // <prerequisite>
210 // <li> VisBuffer
211 // <li> VisBufferAsync
212 // <li> ROVisibilityIteratorAsync
213 // <li> VlaData
214 // <li> VLAT
215 // </prerequisite>
216 //
217 // <etymology>
218 // VlaDatum is the quantum of data associated with a single position of the visibility
219 // look-ahead scheme.
220 // </etymology>
221 //
222 // <synopsis>
223 // VlaDatum is a single buffer for data produced by the VLAT thread and consumed by the
224 // main thread. A collection of VlaDatum objects is organized as a buffer ring in a
225 // VlaData object.
226 //
227 // A VlaDatum object is responsible for maintaining its state as well as containing the set
228 // of data accessed from a single position of a ROVisibilityIterator. It contains a
229 // VisibilityBufferAsync object to hold the data that will be used by the main thread; other
230 // data is maintained in member variables.
231 //
232 // VlaDatum has no concurrency mechanisms built in it; that is handled by the VlaData object.
233 // It does support a set of states that indicate its current use:
234 // Empty -> Filling -> Full -> Reading -> Empty.
235 // Changing state is accomplished by the methods fillStart, fillComplete, readStart and readComplete.
236 // </synopsis>
237 //
238 // <example>
239 // </example>
240 //
241 // <motivation>
242 // </motivation>
243 //
244 // <thrown>
245 // <li>casacore::AipsError for unhandleable errors
246 // </thrown>
247 //
248 // <todo asof="yyyy/mm/dd">
249 // </todo>
250 
251 
252 class VlaDatum {
253 
254 public:
255 
256  typedef enum {Empty, Filling, Full, Reading} State;
257 
259  ~VlaDatum ();
260 
261  SubChunkPair getSubChunkPair () const;
263  //const VisBufferAsync * getVisBuffer () const;
265 
267  void reset ();
268 
269 protected:
270 
271 private:
272 
275 
276  // Illegal operations
277 
278  VlaDatum & operator= (const VlaDatum & other);
279 
280 };
281 
282 class VLAT;
283 
284 // <summary>
285 // The VlaData class is a buffer ring used to support the communication between
286 // the visibility lookahead thread (VLAT) and the main application thread. It
287 // implements the required concurrency control to support this communication.
288 // </summary>
289 
290 // <use visibility=local>
291 
292 // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
293 // </reviewed>
294 
295 // <prerequisite>
296 // <li> VisBuffer
297 // <li> VisBufferAsync
298 // <li> ROVisibilityIteratorAsync
299 // <li> VlaData
300 // <li> VLAT
301 // </prerequisite>
302 //
303 // <etymology>
304 // VlaData is the entire collection of visibility look-ahead data currently (or potentially)
305 // shared between the lookahead and main threads.
306 // </etymology>
307 //
308 // <synopsis>
309 // The VlaData object supports the sharing of information between the VLAT look ahead thread and
310 // the main thread. It contains a buffer ring of VlaDatum objects which each hold all of the
311 // data that is normally accessed by a position of a ROVisibilityIterator. Other data that is shared
312 // or communicated between the two threads is also managed by VlaData.
313 //
314 // A single mutex (member variable mutex_p) is used to protect data that is shared between the
315 // two threads. In addition there is a single PThreads condition variable, vlaDataChanged_p used
316 // to allow either thread to wait for the other to change the state of VlaData object.
317 //
318 // Buffer ring concurrency has two levels: the mutex protecting VlaData and the state of the
319 // VlaDatum object. Whenever a free buffer (VlaDatum object) is available the VLAT thread will
320 // fill it with the data from the next position of the ROVisibilityIterator; a buffer is free for
321 // filling when it is in the Empty state. Before the VLAT fills a buffer it must call fillStart;
322 // this method will block the caller until the next buffer in the ring becomes free; as a side effect
323 // the buffer's state becomes Filling. After fillStart is complete, the VLAT owns the buffer.
324 // When the VLAT is done with the buffer it calls fillComplete to relinquish the buffer; this causes
325 // the buffer state to change from Filling to Full.
326 //
327 // The main thread calls readStart to get the next filled buffer; the main thread is blocked until
328 // a filled buffer is available. When the full buffer is ready its state is changed to Reading
329 // and readStart returns. The VLAT thread will not access the buffer while the main thread is reading.
330 // The read operation is terminated by calling readComplete; this changes the buffer state to Empty and
331 // does not block the main thread except potentially to acquire the mutex.
332 //
333 // The concurrency scheme is fairly sound except for the possibility of low-level data sharing through
334 // CASA data structures. Some of the CASA containers (e.g., casacore::Array<T>) can potentially share storage
335 // although it appears that in normal operation they do not. Some problems have been encountered with
336 // objects that share data via reference-counted pointers. For instance, objects derived from
337 // casacore::MeasBase<T,U> (e.g., casacore::MDirection, casacore::MPosition, casacore::MEpoch, etc.) share the object that serves as the
338 // frame of reference for the measurement; only by converting the object to text and back again can
339 // a user easily obtain a copy which does not share values with another. It is possible that other
340 // objects deep many layers down a complex object may still be waiting to trip up VlaData's
341 // concurrency scheme.
342 //
343 // One unusual interaction mediated by VlaData occurs when it is necessary to reset the visibility
344 // iterator back to the start of a MeasurementSet. This usually happens either at the start of the MS
345 // sweep (e.g., to reset the row blocking factor of the iterator) or at the end (e.g., to make an
346 // additional pass through the casacore::MS). The main thread requests a reset of the VI and then is blocked
347 // until the VI is reset. The sweepTerminationRequested_p variable is set to true; when the VLAT
348 // discovers that this variable is true it resets the buffer ring, repositions its VI to the start
349 // of the casacore::MS and then informs the blocked main thread by setting viResetComplete to true and
350 // signalling vlaDataChanged_p.
351 // </synopsis>
352 //
353 // <example>
354 // </example>
355 //
356 // <motivation>
357 // </motivation>
358 //
359 // <thrown>
360 // <li> AipsError
361 // </thrown>
362 //
363 // <todo asof="yyyy/mm/dd">
364 // </todo>
365 
367 class InterfaceController;
368 
369 class VlaData {
370 
371 public:
372 
373  VlaData (casacore::Int maxNBuffers, async::Mutex & mutex);
374  ~VlaData ();
375 
376  casacore::Bool fillCanStart () const;
377  void fillComplete (VlaDatum * datum);
378  VlaDatum * fillStart (SubChunkPair, const ThreadTimes & fillStartTime);
380  void initialize (const AsynchronousInterface *);
381  void insertValidChunk (casacore::Int chunkNumber);
383  casacore::Bool isValidChunk (casacore::Int chunkNumber) const;
385  void readComplete (SubChunkPair);
387  void resetBufferData ();
388  void setNoMoreData ();
389  void storeChannelSelection (const asyncio::ChannelSelection & channelSelection);
390 
391  //static void debugBlock ();
392  //static void debugUnblock ();
393  //static casacore::Bool logThis (casacore::Int level);
394 
395  //static casacore::Bool loggingInitialized_p;
396  //static casacore::Int logLevel_p;
397 
398 protected:
399 
400 private:
401 
402  typedef std::queue<VlaDatum *> Data;
403  typedef std::queue<casacore::Int> ValidChunks;
404  typedef std::queue<SubChunkPair> ValidSubChunks;
405 
406  class Timing {
407  public:
422  };
423 
424  asyncio::ChannelSelection channelSelection_p; // last channels selected for the VI in use
425  Data data_p; // Buffer queue
428  async::Mutex & mutex_p; // provided by Asynchronous interface
430  mutable ValidChunks validChunks_p; // casacore::Queue of valid chunk numbers
431  mutable ValidSubChunks validSubChunks_p; // casacore::Queue of valid subchunk pairs
432 
433 
436 
437  casacore::Bool statsEnabled () const;
438  void terminateSweep ();
439 
441 
443 
444  // Illegal operations
445 
446  VlaData (const VlaData & other);
447  VlaData & operator= (const VlaData & other);
448 };
449 
450 class WriteData {
451 
452 public:
453 
454  WriteData (const SubChunkPair & subchunkPair) : subchunkPair_p (subchunkPair) {}
455 
456  virtual ~WriteData () {}
457 
459  virtual void write (VisibilityIterator * vi) = 0;
460 
461 private:
462 
464 
465 };
466 
467 template <typename Data>
468 class WriteDataImpl : public WriteData {
469 public:
470 
471  typedef void (VisibilityIterator::* Setter) (const Data &);
472 
473  WriteDataImpl (const SubChunkPair & subchunkPair,
474  const Data & data,
475  Setter setter)
476  : WriteData (subchunkPair),
477  data_p (),
478  setter_p (setter)
479  {
480  data_p.assign (data); // Make a pure copy
481  }
482 
483  void
485  {
486  (vi ->* setter_p) (data_p);
487  }
488 
489 private:
490 
491  Data data_p;
493 
494 };
495 
// Factory for a deferred write that uses a single-argument VisibilityIterator setter.
//
// subchunkPair - forwarded to the WriteDataImpl<Data> constructor, which passes it
//                on to the WriteData base class.
// data         - value to be written; the WriteDataImpl<Data> constructor copies it
//                with data_p.assign (data) (commented there as "a pure copy").
// setter       - pointer to a VisibilityIterator member function taking const Data &;
//                WriteDataImpl<Data>::write invokes it as (vi ->* setter_p) (data_p).
//
// Returns the new WriteDataImpl<Data> through a pointer to its WriteData base class.
// NOTE(review): the object is allocated with new and no owner is visible in this
// listing; presumably whoever receives the pointer must delete it -- confirm against
// the implementation.
496 template <typename Data>
497 WriteData *
498 createWriteData (const SubChunkPair & subchunkPair,
499  const Data & data,
500  void (VisibilityIterator::* setter) (const Data &))
501 {
502  return new WriteDataImpl<Data> (subchunkPair, data, setter);
503 }
504 
505 template <typename Data>
506 class WriteDataImpl2 : public WriteData {
507 public:
508 
510  typedef void (VisibilityIterator::* Setter) (const Data &, DataColumn);
511 
512  WriteDataImpl2 (const SubChunkPair & subchunkPair,
513  const Data & data,
514  DataColumn dataColumn,
515  Setter setter)
516  : WriteData (subchunkPair),
517  data_p (),
518  dataColumn_p (dataColumn),
519  setter_p (setter)
520  {
521  data_p.assign (data); // Make a pure copy
522  }
523 
524  void
526  {
527  (vi ->* setter_p) (data_p, dataColumn_p);
528  }
529 
530 private:
531 
532  Data data_p;
535 };
536 
537 template <typename Data>
538 WriteData *
539 createWriteData (const SubChunkPair & subchunkPair,
540  const Data & data,
542  void (VisibilityIterator::* setter) (const Data &, ROVisibilityIterator::DataColumn))
543 {
544  return new WriteDataImpl2 <Data> (subchunkPair, data, dataColumn, setter);
545 }
546 
548 
549 class WriteQueue {
550 
551 public:
552 
553  WriteQueue ();
554  ~WriteQueue ();
555 
556  WriteData * dequeue ();
557  casacore::Bool empty (casacore::Bool alreadyLocked = false);
558  void enqueue (WriteData * writeData);
559 
560  void initialize (const AsynchronousInterface *);
561 
562  void write (VisibilityIterator * vi);
563 
564 private:
565 
568  std::queue<WriteData *> queue_p;
569 };
570 
571 
573 
574  //friend class InterfaceController;
575 
576 public:
577 
578  // make noncopyable...
579  AsynchronousInterface( const AsynchronousInterface& ) = delete;
581 
582  AsynchronousInterface (int maxNBuffers);
584 
585  void addModifier (asyncio::RoviaModifier * modifier);
586  async::Mutex & getMutex () const;
587  //async::LockGuard getLockGuard () const;
588  VlaData * getVlaData ();
589  VLAT * getVlat ();
591  void initialize ();
594  void notifyAllInterfaceChanged () const;
595  void requestViReset ();
596  std::pair<casacore::Bool, RoviaModifiers> resetVi ();
597  void terminate ();
598  void terminateLookahead ();
599  void terminateSweep ();
601  void viResetComplete ();
603  void waitForInterfaceChange (async::UniqueLock & uniqueLock) const;
604 
606  static casacore::Bool logThis (casacore::Int level);
607 
608 private:
609 
610  mutable async::Condition interfaceDataChanged_p; // Signals interface data has changed
611  // o VisBuffer consumed
612  // o Write data queued
613  // o Sweep or thread termination requested
614  volatile casacore::Bool lookaheadTerminationRequested_p; // true to request thread termination
615  mutable async::Mutex mutex_p; // casacore::Mutex protecting access to concurrent data
617  volatile casacore::Bool sweepTerminationRequested_p; // true to request sweep termination
618  // (e.g., prior to rewinding
619  volatile casacore::Bool viResetComplete_p; // VI reset process has completed
620  volatile casacore::Bool viResetRequested_p; // VI reset has been requested
621  VlaData vlaData_p; // Lookahead data
622  VLAT * vlat_p; // Lookahead thread
623  WriteQueue writeQueue_p; // casacore::Data to be written (writable VIs only)
624 
627 };
628 
629 } // end namespace asyncio
630 
631 } // end namespace casa
632 
633 #endif /* ASYNCHRONOUS_INTERFACE_H_ */
void insertValidSubChunk(SubChunkPair)
SetIntervalModifier(casacore::Double timeInterval)
casacore::Bool empty(casacore::Bool alreadyLocked=false)
void print(std::ostream &o) const
int Int
Definition: aipstype.h:50
VisBufferAsync * getVisBuffer()
std::queue< casacore::Int > ValidChunks
RoviaModifiers transferModifiers()
void apply(ROVisibilityIterator *) const
LatticeExprNode arg(const LatticeExprNode &expr)
VlaData * getVlaData()
async::LockGuard getLockGuard () const;
VlaDatum(SubChunkPair)
VlaDatum is a single element in the VlaDatum buffer ring used to support the ROVisibilityIteratorAs...
casacore::Bool isLookaheadTerminationRequested() const
WriteData(const SubChunkPair &subchunkPair)
const casacore::Int MaxNBuffers_p
void apply(ROVisibilityIterator *) const
ROVisibilityIterator::DataColumn DataColumn
void(VisibilityIterator::* Setter)(const Data &)
WriteDataImpl(const SubChunkPair &subchunkPair, const Data &data, Setter setter)
VLAT is the Visibility LookAhead Thread. This thread advances a visibility iterator and fills the dat...
Definition: VLAT.h:219
VisBufferAsync * readStart(SubChunkPair)
virtual void print(std::ostream &o) const =0
void print(std::ostream &o) const
casacore::Block< casacore::Vector< casacore::Int > > blockWidth_p
casacore::Bool fillCanStart() const
const AsynchronousInterface * interface_p
casacore::Bool isValidChunk(casacore::Int chunkNumber) const
void initialize(const AsynchronousInterface *)
static casacore::Bool initializeLogging()
std::queue< VlaDatum * > Data
static void debugBlock (); static void debugUnblock (); static casacore::Bool logThis (casacore::Int ...
std::pair< casacore::Bool, RoviaModifiers > resetVi()
void initialize(const AsynchronousInterface *)
void add(RoviaModifier *)
volatile casacore::Bool viResetComplete_p
(e.g., prior to rewinding
virtual void apply(ROVisibilityIterator *) const =0
casacore::Block< casacore::Vector< casacore::Int > > blockIncr_p
casacore::MRadialVelocity::Types rvType_p
ABSTRACT CLASSES Deliberately vague to be general enough to allow for many different types of data
Definition: PlotData.h:48
Internal value for MRadialVelocity.
void storeChannelSelection(const asyncio::ChannelSelection &channelSelection)
void waitForInterfaceChange(async::UniqueLock &uniqueLock) const
casacore::Bool isSubChunk(SubChunkPair) const
const VisBufferAsync * getVisBuffer () const;
void insertValidChunk(casacore::Int chunkNumber)
SelectChannelModifier(casacore::Int nGroup, casacore::Int start, casacore::Int width, casacore::Int increment, casacore::Int spectralWindow)
void copyBlock(const casacore::Block< casacore::Vector< casacore::Int > > &src, casacore::Block< casacore::Vector< casacore::Int > > &to) const
SetRowBlockingModifier(casacore::Int nRows)
volatile casacore::Bool sweepTerminationRequested_p
WriteDataImpl2(const SubChunkPair &subchunkPair, const Data &data, DataColumn dataColumn, Setter setter)
casacore::Int clock(casacore::Int arg, casacore::Int base)
casacore::String makeReport()
double Double
Definition: aipstype.h:55
const AsynchronousInterface * interface_p
void print(std::ostream &o) const
AsynchronousInterface & operator=(const AsynchronousInterface &)=delete
void write(VisibilityIterator *vi)
static casacore::Bool logThis(casacore::Int level)
Types
Types of known MRadialVelocity Warning: The order defines the order in the translation matrix FromTo...
bool Bool
Define the standard types used by Casacore.
Definition: aipstype.h:42
void apply(ROVisibilityIterator *)
Types
Types of known MDopplers Warning: The order defines the order in the translation matrix FromTo in th...
Definition: MDoppler.h:149
VlaData(casacore::Int maxNBuffers, async::Mutex &mutex)
VlaDatum * fillStart(SubChunkPair, const ThreadTimes &fillStartTime)
casacore::Block< casacore::Vector< casacore::Int > > blockSpw_p
AsynchronousInterface(const AsynchronousInterface &)=delete
friend class InterfaceController;
casacore::Block< casacore::Vector< casacore::Int > > blockStart_p
VisBufferAsync * releaseVisBufferAsync()
async::Mutex & getMutex() const
std::queue< SubChunkPair > ValidSubChunks
void enqueue(WriteData *writeData)
simple 1-D array
SelectVelocityModifier(casacore::Int nChan, const casacore::MVRadialVelocity &vStart, const casacore::MVRadialVelocity &vInc, casacore::MRadialVelocity::Types rvType, casacore::MDoppler::Types dType, casacore::Bool precise)
casacore::Bool isValidSubChunk(SubChunkPair) const
virtual void write(VisibilityIterator *vi)=0
void apply(ROVisibilityIterator *) const
VlaDatum & operator=(const VlaDatum &other)
Illegal operations.
WriteData * createWriteData(const SubChunkPair &subchunkPair, const Data &data, void(VisibilityIterator::*setter)(const Data &))
SubChunkPair getSubChunkPair() const
casacore::String toCsv(const casacore::Block< casacore::Vector< casacore::Int > > &bv) const
void write(VisibilityIterator *vi)
asyncio::ChannelSelection getChannelSelection() const
String: the storage and methods of handling collections of characters.
Definition: String.h:223
*static casacore::Bool initializeLogging()
void write(VisibilityIterator *vi)
void fillComplete(VlaDatum *datum)
VisibilityIterator iterates through one or more writable MeasurementSets.
void apply(ROVisibilityIterator *) const
casacore::Block< casacore::Vector< casacore::Int > > blockNGroup_p
asyncio::ChannelSelection channelSelection_p
virtual void print(std::ostream &o) const
volatile casacore::Bool lookaheadTerminationRequested_p
o VisBuffer consumed o Write data queued o Sweep or thread termination requested
ROVisibilityIterator iterates through one or more readonly MeasurementSets.
SubChunkPair getSubChunkPair() const
casacore::Bool statsEnabled() const
std::vector< RoviaModifier * > Data
ChannelSelection & operator=(const ChannelSelection &other)
void(VisibilityIterator::* Setter)(const Data &, DataColumn)
void addModifier(asyncio::RoviaModifier *modifier)
casacore::Bool isSweepTerminationRequested() const
std::queue< WriteData * > queue_p
void readComplete(SubChunkPair)
VlaData & operator=(const VlaData &other)
friend std::ostream & operator<<(std::ostream &o, const RoviaModifier &m)