Line data Source code
1 : /*
2 : * VlaData.h
3 : *
4 : * Created on: Sep 21, 2011
5 : * Author: jjacobs
6 : */
7 :
8 : #ifndef ASYNCHRONOUS_INTERFACE_H_
9 : #define ASYNCHRONOUS_INTERFACE_H_
10 :
11 : #include <stdcasa/thread/AsynchronousTools.h>
12 : #include <stdcasa/UtilJ.h>
13 :
14 : using casa::utilj::ThreadTimes;
15 : using casa::utilj::DeltaThreadTimes;
16 :
17 : #include <casacore/casa/Arrays/Cube.h>
18 : #include <casacore/casa/Arrays/Matrix.h>
19 : #include <casacore/casa/Arrays/Vector.h>
20 : #include <casacore/casa/Containers/Block.h>
21 : #include <casacore/casa/Quanta/MVRadialVelocity.h>
22 : #include <casacore/measures/Measures/MRadialVelocity.h>
23 : #include <casacore/measures/Measures/MDoppler.h>
24 : #include "VisBufferAsync.h"
25 : #include "VisibilityIterator.h"
26 : #include "VisibilityIteratorImpl.h"
27 :
28 : ///#pragma GCC diagnostic warning "-Wno-missing-field-initializers"
29 : #include <memory>
30 : #include <queue>
31 : #include <vector>
32 :
33 : namespace casa {
34 :
35 : class ROVisibilityIterator;
36 :
37 : namespace asyncio {
38 :
39 : class RoviaModifier {
40 : public:
41 :
42 : friend std::ostream & operator<< (std::ostream & o, const RoviaModifier & m);
43 :
44 0 : virtual ~RoviaModifier () {}
45 : virtual void apply (ROVisibilityIterator *) const = 0;
46 :
47 0 : inline operator std::string( ) const {
48 0 : std::stringstream ss;
49 0 : print(ss);
50 0 : return ss.str( );
51 : }
52 :
53 : private:
54 :
55 : virtual void print (std::ostream & o) const = 0;
56 :
57 : };
58 :
59 : class ChannelSelection {
60 :
61 : public:
62 :
63 0 : ChannelSelection () {}
64 :
65 : ChannelSelection (const casacore::Block< casacore::Vector<casacore::Int> > & blockNGroup,
66 : const casacore::Block< casacore::Vector<casacore::Int> > & blockStart,
67 : const casacore::Block< casacore::Vector<casacore::Int> > & blockWidth,
68 : const casacore::Block< casacore::Vector<casacore::Int> > & blockIncr,
69 : const casacore::Block< casacore::Vector<casacore::Int> > & blockSpw);
70 :
71 : ChannelSelection (const ChannelSelection & other);
72 : ChannelSelection & operator= (const ChannelSelection & other);
73 :
74 :
75 : void
76 : get (casacore::Block< casacore::Vector<casacore::Int> > & blockNGroup,
77 : casacore::Block< casacore::Vector<casacore::Int> > & blockStart,
78 : casacore::Block< casacore::Vector<casacore::Int> > & blockWidth,
79 : casacore::Block< casacore::Vector<casacore::Int> > & blockIncr,
80 : casacore::Block< casacore::Vector<casacore::Int> > & blockSpw) const;
81 :
82 : protected:
83 :
84 : void copyBlock (const casacore::Block <casacore::Vector<casacore::Int> > & src,
85 : casacore::Block <casacore::Vector<casacore::Int> > & to) const;
86 :
87 : private:
88 :
89 : casacore::Block< casacore::Vector<casacore::Int> > blockNGroup_p;
90 : casacore::Block< casacore::Vector<casacore::Int> > blockStart_p;
91 : casacore::Block< casacore::Vector<casacore::Int> > blockWidth_p;
92 : casacore::Block< casacore::Vector<casacore::Int> > blockIncr_p;
93 : casacore::Block< casacore::Vector<casacore::Int> > blockSpw_p;
94 : };
95 :
96 :
97 : class SelectChannelModifier : public RoviaModifier {
98 :
99 : public:
100 :
101 : SelectChannelModifier (casacore::Int nGroup, casacore::Int start, casacore::Int width, casacore::Int increment, casacore::Int spectralWindow);
102 : SelectChannelModifier (const casacore::Block< casacore::Vector<casacore::Int> > & blockNGroup,
103 : const casacore::Block< casacore::Vector<casacore::Int> > & blockStart,
104 : const casacore::Block< casacore::Vector<casacore::Int> > & blockWidth,
105 : const casacore::Block< casacore::Vector<casacore::Int> > & blockIncr,
106 : const casacore::Block< casacore::Vector<casacore::Int> > & blockSpw);
107 :
108 : void apply (ROVisibilityIterator *) const;
109 :
110 : private:
111 :
112 : casacore::Bool channelBlocks_p;
113 : ChannelSelection channelSelection_p;
114 : casacore::Int increment_p;
115 : casacore::Int nGroup_p;
116 : casacore::Int spectralWindow_p;
117 : casacore::Int start_p;
118 : casacore::Int width_p;
119 :
120 : void print (std::ostream & o) const;
121 : casacore::String toCsv (const casacore::Block< casacore::Vector<casacore::Int> > & bv) const;
122 : casacore::String toCsv (const casacore::Vector<casacore::Int> & v) const;
123 :
124 : };
125 :
126 : class SetIntervalModifier : public RoviaModifier {
127 :
128 : public:
129 :
130 : SetIntervalModifier (casacore::Double timeInterval);
131 : void apply (ROVisibilityIterator *) const;
132 :
133 : private:
134 :
135 : casacore::Double timeInterval_p;
136 :
137 : void print (std::ostream & o) const;
138 : };
139 :
140 :
141 : class SetRowBlockingModifier : public RoviaModifier {
142 :
143 : public:
144 :
145 : SetRowBlockingModifier (casacore::Int nRows);
146 : void apply (ROVisibilityIterator *) const;
147 :
148 : private:
149 :
150 : casacore::Int nRows_p;
151 : casacore::Int nGroup_p;
152 : casacore::Int spectralWindow_p;
153 : casacore::Int start_p;
154 : casacore::Int width_p;
155 :
156 : void print (std::ostream & o) const;
157 : };
158 :
159 : class RoviaModifiers {
160 :
161 : public:
162 :
163 : ~RoviaModifiers ();
164 :
165 : void add (RoviaModifier *);
166 : void apply (ROVisibilityIterator *);
167 : void clearAndFree ();
168 : RoviaModifiers transferModifiers ();
169 :
170 : private:
171 :
172 : typedef std::vector<RoviaModifier *> Data;
173 : Data data_p;
174 :
175 : };
176 :
177 : class SelectVelocityModifier : public RoviaModifier {
178 :
179 : public:
180 :
181 : SelectVelocityModifier (casacore::Int nChan, const casacore::MVRadialVelocity& vStart, const casacore::MVRadialVelocity& vInc,
182 : casacore::MRadialVelocity::Types rvType, casacore::MDoppler::Types dType, casacore::Bool precise);
183 : void apply (ROVisibilityIterator *) const;
184 :
185 : private:
186 :
187 : casacore::MDoppler::Types dType_p;
188 : casacore::Int nChan_p;
189 : casacore::Bool precise_p;
190 : casacore::MRadialVelocity::Types rvType_p;
191 : casacore::MVRadialVelocity vInc_p;
192 : casacore::MVRadialVelocity vStart_p;
193 :
194 : virtual void print (std::ostream & o) const;
195 :
196 : };
197 :
198 :
199 : // <summary>
200 : // VlaDatum is a single elemement in the VlaDatum buffer ring used to support the
201 : // ROVisibilityIteratorAsync.
202 : // </summary>
203 :
204 : // <use visibility=local>
205 :
206 : // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
207 : // </reviewed>
208 :
209 : // <prerequisite>
210 : // <li> VisBuffer
211 : // <li> VisBufferAsync
212 : // <li> ROVisibilityIteratorAsync
213 : // <li> VlaData
214 : // <li> VLAT
215 : // </prerequisite>
216 : //
217 : // <etymology>
218 : // VlaDatum is the quantum of data associated with a single position of the visibility
219 : // look-ahead scheme.
220 : // </etymology>
221 : //
222 : // <synopsis>
223 : // VlaDatum is a single buffer for data produced by the VLAT thread and consumed by the
224 : // main thread. A collection of VlaDatum objects is organized as a buffer ring in a
225 : // VlaData object.
226 : //
227 : // A VlaDatum object is responsible for maintaining its state as well as containing the set
228 : // of data accessed from a single position of a ROVisibilityIterator. It contains a
229 : // VisibilityBufferAsync object to hold the data that will be used by the main thread; other
230 : // data is maintained in member variables.
231 : //
232 : // VlaDatum has no concurrency mechanisms built in it; that is handled by the VlaData object.
233 : // It does support a set of states that indicate its current use:
234 : // Empty -> Filling -> Full -> Reading -> Empty.
235 : // Changing state is accomplished by the methods fillStart, fillComplete, readStart and readComplete.
236 : // </synopsis>
237 : //
238 : // <example>
239 : // </example>
240 : //
241 : // <motivation>
242 : // </motivation>
243 : //
244 : // <thrown>
245 : // <li>casacore::AipsError for unhandleable errors
246 : // </thrown>
247 : //
248 : // <todo asof="yyyy/mm/dd">
249 : // </todo>
250 :
251 :
252 : class VlaDatum {
253 :
254 : public:
255 :
256 : typedef enum {Empty, Filling, Full, Reading} State;
257 :
258 : VlaDatum (SubChunkPair);
259 : ~VlaDatum ();
260 :
261 : SubChunkPair getSubChunkPair () const;
262 : VisBufferAsync * getVisBuffer ();
263 : //const VisBufferAsync * getVisBuffer () const;
264 : casacore::Bool isSubChunk (SubChunkPair) const;
265 :
266 : VisBufferAsync * releaseVisBufferAsync ();
267 : void reset ();
268 :
269 : protected:
270 :
271 : private:
272 :
273 : SubChunkPair subchunk_p;
274 : VisBufferAsync * visBuffer_p;
275 :
276 : // Illegal operations
277 :
278 : VlaDatum & operator= (const VlaDatum & other);
279 :
280 : };
281 :
282 : class VLAT;
283 :
284 : // <summary>
285 : // The VlaData class is a buffer ring used to support the communication between
286 : // the visiblity lookahead thread (VLAT) and the main application thread. It
287 : // implements the required concurrency control to support this communication.
288 : // </summary>
289 :
290 : // <use visibility=local>
291 :
292 : // <reviewed reviewer="" date="yyyy/mm/dd" tests="" demos="">
293 : // </reviewed>
294 :
295 : // <prerequisite>
296 : // <li> VisBuffer
297 : // <li> VisBufferAsync
298 : // <li> ROVisibilityIteratorAsync
299 : // <li> VlaData
300 : // <li> VLAT
301 : // </prerequisite>
302 : //
303 : // <etymology>
304 : // VlaData is the entire collection of visibility look-ahead data currently (or potentially)
305 : // shared between the lookahead and main threads.
306 : // </etymology>
307 : //
308 : // <synopsis>
309 : // The VlaData object supports the sharing of information between the VLAT look ahead thread and
310 : // the main thread. It contains a buffer ring of VlaDatum objects which each hold all of the
311 : // data that is normally access by a position of a ROVisibiltyIterator. Other data that is shared
312 : // or communicated between the two threads is also managed by VlaData.
313 : //
314 : // A single mutex (member variable mutex_p) is used to protect data that is shared between the
315 : // two threads. In addition there is a single PThreads condition variable, vlaDataChanged_p used
316 : // to allow either thread to wait for the other to change the state of VlaData object.
317 : //
318 : // Buffer ring concurrency has two levels: the mutex protecting VlaData and the state of the
319 : // VlaDatum object. Whenever a free buffer (VlaDatum object) is available the VLAT thread will
320 : // fill it with the data from the next position of the ROVisibilityIterator; a buffer is free for
321 : // filling when it is in the Empty state. Before the VLAT fills a buffer it must call fillStart;
322 : // this method will block the caller until the next buffer in the ring becomes free; as a side effect
323 : // the buffer's state becomes Filling. After fillStart is complete, the VLAT owns the buffer.
324 : // When the VLAT is done with the buffer it calls fillComplete to relinquish the buffer; this causes
325 : // the buffer state to change from Filling to Full.
326 : //
327 : // The main thread calls readStart to get the next filled buffer; the main thread is blocked until
328 : // the a filled buffer is available. When the full buffer is ready its state is changed to Reading
329 : // and readStart returns. The VLAT thread will not access the buffer while the main thread is reading.
330 : // The read operation is terminated by calling readComplete; this changes the buffer state to Empty and
331 : // does not block the main thread except potentially to acquire the mutex.
332 : //
333 : // The concurrency scheme is fairly sound except for the possibility of low-level data sharing through
334 : // CASA data structures. Some of the CASA containers (e.g., casacore::Array<T>) can potentially share storage
335 : // although it appears that normal operation they do not. Some problems have been encountered with
336 : // objects that share data via reference-counted pointers. For instance, objects derived from
337 : // casacore::MeasBase<T,U> (e.g., casacore::MDirection, casacore::MPosition, casacore::MEpoch, etc.) share the object that serves as the
338 : // frame of reference for the measurement; only by converting the object to text and back again can
339 : // a user easily obtain a copy which does not share values with another. It is possible that other
340 : // objects deep many layers down a complex object may still be waiting to trip up VlaData's
341 : // concurrency scheme.
342 : //
343 : // On unusual interaction mediated by VlaData occurs when it is necessary to reset the visibility
344 : // iterator back to the start of a MeasurementSet. This usually happens either at the start of the MS
345 : // sweep (e.g., to reset the row blocking factor of the iterator) or at the end (e.g., to make an
346 : // additional pass through the casacore::MS). The main thread requests a reset of the VI and then is blocked
347 : // until the VI is reset. The sweepTerminationRequested_p variable is set to true; when the VLAT
348 : // discovers that this variable is true it resets the buffer ring, repositions its VI to the start
349 : // of the casacore::MS and then informs the blocked main thread by setting viResetComplete to true and
350 : // signalling vlaDataChanged_p.
351 : // </synopsis>
352 : //
353 : // <example>
354 : // </example>
355 : //
356 : // <motivation>
357 : // </motivation>
358 : //
359 : // <thrown>
360 : // <li> AipsError
361 : // </thrown>
362 : //
363 : // <todo asof="yyyy/mm/dd">
364 : // </todo>
365 :
366 : class AsynchronousInterface;
367 : class InterfaceController;
368 :
369 : class VlaData {
370 :
371 : public:
372 :
373 : VlaData (casacore::Int maxNBuffers, async::Mutex & mutex);
374 : ~VlaData ();
375 :
376 : casacore::Bool fillCanStart () const;
377 : void fillComplete (VlaDatum * datum);
378 : VlaDatum * fillStart (SubChunkPair, const ThreadTimes & fillStartTime);
379 : asyncio::ChannelSelection getChannelSelection () const;
380 : void initialize (const AsynchronousInterface *);
381 : void insertValidChunk (casacore::Int chunkNumber);
382 : void insertValidSubChunk (SubChunkPair);
383 : casacore::Bool isValidChunk (casacore::Int chunkNumber) const;
384 : casacore::Bool isValidSubChunk (SubChunkPair) const;
385 : void readComplete (SubChunkPair);
386 : VisBufferAsync * readStart (SubChunkPair);
387 : void resetBufferData ();
388 : void setNoMoreData ();
389 : void storeChannelSelection (const asyncio::ChannelSelection & channelSelection);
390 :
391 : //static void debugBlock ();
392 : //static void debugUnblock ();
393 : //static casacore::Bool logThis (casacore::Int level);
394 :
395 : //static casacore::Bool loggingInitialized_p;
396 : //static casacore::Int logLevel_p;
397 :
398 : protected:
399 :
400 : private:
401 :
402 : typedef std::queue<VlaDatum *> Data;
403 : typedef std::queue<casacore::Int> ValidChunks;
404 : typedef std::queue<SubChunkPair> ValidSubChunks;
405 :
406 : class Timing {
407 : public:
408 : ThreadTimes fill1_p;
409 : ThreadTimes fill2_p;
410 : ThreadTimes fill3_p;
411 : DeltaThreadTimes fillCycle_p;
412 : DeltaThreadTimes fillOperate_p;
413 : DeltaThreadTimes fillWait_p;
414 : ThreadTimes read1_p;
415 : ThreadTimes read2_p;
416 : ThreadTimes read3_p;
417 : DeltaThreadTimes readCycle_p;
418 : DeltaThreadTimes readOperate_p;
419 : DeltaThreadTimes readWait_p;
420 : ThreadTimes timeStart_p;
421 : ThreadTimes timeStop_p;
422 : };
423 :
424 : asyncio::ChannelSelection channelSelection_p; // last channels selected for the VI in use
425 : Data data_p; // Buffer queue
426 : const AsynchronousInterface * interface_p;
427 : const casacore::Int MaxNBuffers_p;
428 : async::Mutex & mutex_p; // provided by Asynchronous interface
429 : Timing timing_p;
430 : mutable ValidChunks validChunks_p; // casacore::Queue of valid chunk numbers
431 : mutable ValidSubChunks validSubChunks_p; // casacore::Queue of valid subchunk pairs
432 :
433 :
434 : casacore::Int clock (casacore::Int arg, casacore::Int base);
435 : casacore::String makeReport ();
436 :
437 : casacore::Bool statsEnabled () const;
438 : void terminateSweep ();
439 :
440 : //// static Semaphore debugBlockSemaphore_p; // used to block a thread for debugging
441 :
442 : static casacore::Bool initializeLogging ();
443 :
444 : // Illegal operations
445 :
446 : VlaData (const VlaData & other);
447 : VlaData & operator= (const VlaData & other);
448 : };
449 :
450 : class WriteData {
451 :
452 : public:
453 :
454 0 : WriteData (const SubChunkPair & subchunkPair) : subchunkPair_p (subchunkPair) {}
455 :
456 0 : virtual ~WriteData () {}
457 :
458 0 : SubChunkPair getSubChunkPair () const { return subchunkPair_p;}
459 : virtual void write (VisibilityIterator * vi) = 0;
460 :
461 : private:
462 :
463 : SubChunkPair subchunkPair_p;
464 :
465 : };
466 :
467 : template <typename Data>
468 : class WriteDataImpl : public WriteData {
469 : public:
470 :
471 : typedef void (VisibilityIterator::* Setter) (const Data &);
472 :
473 0 : WriteDataImpl (const SubChunkPair & subchunkPair,
474 : const Data & data,
475 : Setter setter)
476 : : WriteData (subchunkPair),
477 : data_p (),
478 0 : setter_p (setter)
479 : {
480 0 : data_p.assign (data); // Make a pure copy
481 0 : }
482 :
483 : void
484 0 : write (VisibilityIterator * vi)
485 : {
486 0 : (vi ->* setter_p) (data_p);
487 0 : }
488 :
489 : private:
490 :
491 : Data data_p;
492 : Setter setter_p;
493 :
494 : };
495 :
496 : template <typename Data>
497 : WriteData *
498 0 : createWriteData (const SubChunkPair & subchunkPair,
499 : const Data & data,
500 : void (VisibilityIterator::* setter) (const Data &))
501 : {
502 0 : return new WriteDataImpl<Data> (subchunkPair, data, setter);
503 : }
504 :
505 : template <typename Data>
506 : class WriteDataImpl2 : public WriteData {
507 : public:
508 :
509 : typedef ROVisibilityIterator::DataColumn DataColumn;
510 : typedef void (VisibilityIterator::* Setter) (const Data &, DataColumn);
511 :
512 0 : WriteDataImpl2 (const SubChunkPair & subchunkPair,
513 : const Data & data,
514 : DataColumn dataColumn,
515 : Setter setter)
516 : : WriteData (subchunkPair),
517 : data_p (),
518 : dataColumn_p (dataColumn),
519 0 : setter_p (setter)
520 : {
521 0 : data_p.assign (data); // Make a pure copy
522 0 : }
523 :
524 : void
525 0 : write (VisibilityIterator * vi)
526 : {
527 0 : (vi ->* setter_p) (data_p, dataColumn_p);
528 0 : }
529 :
530 : private:
531 :
532 : Data data_p;
533 : DataColumn dataColumn_p;
534 : Setter setter_p;
535 : };
536 :
537 : template <typename Data>
538 : WriteData *
539 0 : createWriteData (const SubChunkPair & subchunkPair,
540 : const Data & data,
541 : ROVisibilityIterator::DataColumn dataColumn,
542 : void (VisibilityIterator::* setter) (const Data &, ROVisibilityIterator::DataColumn))
543 : {
544 0 : return new WriteDataImpl2 <Data> (subchunkPair, data, dataColumn, setter);
545 : }
546 :
547 : class AsynchronousInterface;
548 :
549 : class WriteQueue {
550 :
551 : public:
552 :
553 : WriteQueue ();
554 : ~WriteQueue ();
555 :
556 : WriteData * dequeue ();
557 : casacore::Bool empty (casacore::Bool alreadyLocked = false);
558 : void enqueue (WriteData * writeData);
559 :
560 : void initialize (const AsynchronousInterface *);
561 :
562 : void write (VisibilityIterator * vi);
563 :
564 : private:
565 :
566 : const AsynchronousInterface * interface_p;
567 : async::Mutex mutex_p;
568 : std::queue<WriteData *> queue_p;
569 : };
570 :
571 :
572 : class AsynchronousInterface {
573 :
574 : //friend class InterfaceController;
575 :
576 : public:
577 :
578 : // make noncopyable...
579 : AsynchronousInterface( const AsynchronousInterface& ) = delete;
580 : AsynchronousInterface& operator=( const AsynchronousInterface& ) = delete;
581 :
582 : AsynchronousInterface (int maxNBuffers);
583 : ~AsynchronousInterface ();
584 :
585 : void addModifier (asyncio::RoviaModifier * modifier);
586 : async::Mutex & getMutex () const;
587 : //async::LockGuard getLockGuard () const;
588 : VlaData * getVlaData ();
589 : VLAT * getVlat ();
590 : WriteQueue & getWriteQueue ();
591 : void initialize ();
592 : casacore::Bool isSweepTerminationRequested () const;
593 : casacore::Bool isLookaheadTerminationRequested () const;
594 : void notifyAllInterfaceChanged () const;
595 : void requestViReset ();
596 : std::pair<casacore::Bool, RoviaModifiers> resetVi ();
597 : void terminate ();
598 : void terminateLookahead ();
599 : void terminateSweep ();
600 : RoviaModifiers transferRoviaModifiers ();
601 : void viResetComplete ();
602 : casacore::Bool viResetRequested ();
603 : void waitForInterfaceChange (async::UniqueLock & uniqueLock) const;
604 :
605 : static casacore::Bool initializeLogging ();
606 : static casacore::Bool logThis (casacore::Int level);
607 :
608 : private:
609 :
610 : mutable async::Condition interfaceDataChanged_p; // Signals interface data has changed
611 : // o VisBuffer consumed
612 : // o Write data queued
613 : // o Sweep or thread termination requested
614 : volatile casacore::Bool lookaheadTerminationRequested_p; // true to request thread termination
615 : mutable async::Mutex mutex_p; // casacore::Mutex protecting access to concurrent data
616 : asyncio::RoviaModifiers roviaModifiers_p;
617 : volatile casacore::Bool sweepTerminationRequested_p; // true to request sweep termination
618 : // (e.g., prior to rewinding
619 : volatile casacore::Bool viResetComplete_p; // VI reset process has completed
620 : volatile casacore::Bool viResetRequested_p; // VI reset has been requested
621 : VlaData vlaData_p; // Lookahead data
622 : VLAT * vlat_p; // Lookahead thread
623 : WriteQueue writeQueue_p; // casacore::Data to be written (writable VIs only)
624 :
625 : static casacore::Bool loggingInitialized_p;
626 : static casacore::Int logLevel_p;
627 : };
628 :
629 : } // end namespace asyncio
630 :
631 : } // end namespace casa
632 :
633 : #endif /* ASYNCHRONOUS_INTERFACE_H_ */
|