Line data Source code
1 : #include "AsynchronousInterface.h"
2 : #include "VLAT.h"
3 :
4 : #include <stdcasa/thread/AsynchronousTools.h>
5 : #include <stdcasa/UtilJ.h>
6 : #include <casacore/casa/System/AipsrcValue.h>
7 : #include <msvis/MSVis/VisBufferAsync.h>
8 : #include <msvis/MSVis/VisibilityIteratorImplAsync.h>
9 :
10 : #include <ostream>
11 : #include <utility>
12 :
13 : using namespace casacore;
14 : using namespace casa::async;
15 : using namespace casacore;
16 : using namespace casa::utilj;
17 : using namespace std;
18 :
19 : #define Log(level, ...) \
20 : {if (AsynchronousInterface::logThis (level)) \
21 : Logger::get()->log (__VA_ARGS__);};
22 :
23 : using casa::async::Mutex;
24 :
25 : using namespace casacore;
26 : namespace casa {
27 :
28 : namespace asyncio {
29 :
30 : Bool AsynchronousInterface::loggingInitialized_p = false;
31 : Int AsynchronousInterface::logLevel_p = -1;
32 :
33 0 : AsynchronousInterface::AsynchronousInterface (int maxNBuffers)
34 : : lookaheadTerminationRequested_p (false),
35 : sweepTerminationRequested_p (false),
36 : viResetComplete_p (false),
37 : viResetRequested_p (false),
38 0 : vlaData_p (maxNBuffers, mutex_p),
39 : vlat_p (NULL),
40 0 : writeQueue_p ()
41 0 : {}
42 :
43 0 : AsynchronousInterface::~AsynchronousInterface ()
44 0 : {}
45 :
46 : void
47 0 : AsynchronousInterface::addModifier (RoviaModifier * modifier)
48 : {
49 0 : Log (1, "AsynchronousInterface::addModifier: {%s}\n", string(*modifier).c_str());
50 :
51 0 : LockGuard lg (mutex_p);
52 :
53 0 : roviaModifiers_p.add (modifier);
54 0 : }
55 :
56 : async::Mutex &
57 0 : AsynchronousInterface::getMutex () const
58 : {
59 0 : return mutex_p;
60 : }
61 :
62 : VlaData *
63 0 : AsynchronousInterface::getVlaData ()
64 : {
65 0 : return & vlaData_p;
66 : }
67 :
68 : VLAT *
69 0 : AsynchronousInterface::getVlat ()
70 : {
71 0 : return vlat_p;
72 : }
73 :
74 : WriteQueue &
75 0 : AsynchronousInterface::getWriteQueue ()
76 : {
77 0 : return writeQueue_p;
78 : }
79 :
80 :
81 :
82 : void
83 0 : AsynchronousInterface::initialize ()
84 : {
85 0 : initializeLogging ();
86 :
87 0 : vlaData_p.initialize (this);
88 :
89 0 : writeQueue_p.initialize (this);
90 :
91 0 : vlat_p = new VLAT (this);
92 0 : }
93 :
94 : Bool
95 0 : AsynchronousInterface::initializeLogging ()
96 : {
97 0 : if (loggingInitialized_p){
98 0 : return true;
99 : }
100 :
101 0 : loggingInitialized_p = true;
102 :
103 : // If the log file variable is defined then start
104 : // up the logger
105 :
106 0 : const String logFileVariable = "Casa_VIA_LogFile";
107 0 : const String logLevelVariable = "Casa_VIA_LogLevel";
108 :
109 0 : String logFilename;
110 0 : Bool logFileFound = AipsrcValue<String>::find (logFilename,
111 0 : ROVisibilityIterator::getAsyncRcBase () + ".debug.logFile",
112 : "");
113 :
114 0 : if (logFileFound &&
115 0 : ! logFilename.empty() &&
116 0 : downcase (logFilename) != "null" &&
117 0 : downcase (logFilename) != "none"){
118 :
119 0 : Logger::get()->start (logFilename.c_str());
120 0 : AipsrcValue<Int>::find (logLevel_p, ROVisibilityIterator::getAsyncRcBase () + ".debug.logLevel", 1);
121 0 : Logger::get()->log ("VlaData log-level is %d; async I/O: %s; nBuffers=%d\n",
122 : logLevel_p,
123 0 : ROVisibilityIterator::isAsynchronousIoEnabled() ? "enabled" : "disabled",
124 : ViReadImplAsync::getDefaultNBuffers() );
125 :
126 0 : return true;
127 :
128 : }
129 :
130 0 : return false;
131 : }
132 :
133 : Bool
134 0 : AsynchronousInterface::isLookaheadTerminationRequested () const
135 : {
136 0 : return lookaheadTerminationRequested_p;
137 : }
138 :
139 :
140 : Bool
141 0 : AsynchronousInterface::isSweepTerminationRequested () const
142 : {
143 0 : return sweepTerminationRequested_p;
144 : }
145 :
146 : Bool
147 0 : AsynchronousInterface::logThis (Int level)
148 : {
149 0 : return loggingInitialized_p && level <= logLevel_p;
150 : }
151 :
152 : void
153 0 : AsynchronousInterface::notifyAllInterfaceChanged () const
154 : {
155 0 : interfaceDataChanged_p.notify_all();
156 0 : }
157 :
158 : void
159 0 : AsynchronousInterface::requestViReset ()
160 : {
161 : // Called by main thread to request that the VI reset to the
162 : // start of the MS.
163 :
164 0 : UniqueLock uniqueLock (mutex_p); // enter critical section
165 :
166 0 : Log (1, "Requesting VI reset\n");
167 :
168 0 : viResetRequested_p = true; // officially request the reset
169 0 : viResetComplete_p = false; // clear any previous completions
170 :
171 0 : terminateSweep ();
172 :
173 : // Wait for the request to be completed.
174 :
175 0 : Log (1, "Waiting for requesting VI reset\n");
176 :
177 0 : while (! viResetComplete_p){
178 0 : interfaceDataChanged_p.wait (uniqueLock);
179 : }
180 :
181 0 : Log (1, "Notified that VI reset has completed\n");
182 :
183 : // The VI was reset
184 0 : }
185 :
186 :
187 :
188 :
189 : void
190 0 : AsynchronousInterface::terminate ()
191 : {
192 : // Destroy the VLAT
193 :
194 0 : vlat_p->terminate(); // request termination
195 0 : vlat_p->join(); // wait for it to terminate
196 0 : delete vlat_p; // free its storage
197 0 : }
198 :
199 : void
200 0 : AsynchronousInterface::terminateLookahead ()
201 : {
202 : // Called by main thread to stop the VLAT, etc.
203 :
204 0 : LockGuard lg (& mutex_p);
205 :
206 0 : lookaheadTerminationRequested_p = true;
207 :
208 0 : terminateSweep();
209 0 : }
210 :
211 : void
212 0 : AsynchronousInterface::terminateSweep ()
213 : {
214 : // Called internally to terminate VI sweeping.
215 :
216 0 : sweepTerminationRequested_p = true; // stop filling
217 :
218 0 : notifyAllInterfaceChanged();
219 0 : }
220 :
221 : RoviaModifiers
222 0 : AsynchronousInterface::transferRoviaModifiers ()
223 : {
224 0 : return roviaModifiers_p.transferModifiers();
225 : }
226 :
227 : void
228 0 : AsynchronousInterface::viResetComplete ()
229 : {
230 : ////Assert (mutex_p.isLockedByThisThread());
231 :
232 0 : viResetRequested_p = false;
233 0 : sweepTerminationRequested_p = false;
234 0 : viResetComplete_p = true;
235 :
236 0 : notifyAllInterfaceChanged();
237 0 : }
238 :
239 : Bool
240 0 : AsynchronousInterface::viResetRequested ()
241 : {
242 : ////Assert (mutex_p.isLockedByThisThread());
243 :
244 0 : return viResetRequested_p;
245 : }
246 :
247 : void
248 0 : AsynchronousInterface::waitForInterfaceChange (async::UniqueLock & uniqueLock) const
249 : {
250 0 : interfaceDataChanged_p.wait (uniqueLock);
251 0 : }
252 :
253 0 : ChannelSelection::ChannelSelection (const Block< Vector<Int> > & blockNGroup,
254 : const Block< Vector<Int> > & blockStart,
255 : const Block< Vector<Int> > & blockWidth,
256 : const Block< Vector<Int> > & blockIncr,
257 0 : const Block< Vector<Int> > & blockSpw)
258 : {
259 0 : blockNGroup_p = blockNGroup;
260 0 : blockStart_p = blockStart;
261 0 : blockWidth_p = blockWidth;
262 0 : blockIncr_p = blockIncr;
263 0 : blockSpw_p = blockSpw;
264 0 : }
265 :
266 0 : ChannelSelection::ChannelSelection (const ChannelSelection & other)
267 : {
268 0 : * this = other;
269 0 : }
270 :
271 : ChannelSelection &
272 0 : ChannelSelection::operator= (const ChannelSelection & other)
273 : {
274 0 : if (this != & other){
275 :
276 0 : copyBlock (other.blockNGroup_p, blockNGroup_p);
277 0 : copyBlock (other.blockStart_p, blockStart_p);
278 0 : copyBlock (other.blockWidth_p, blockWidth_p);
279 0 : copyBlock (other.blockIncr_p, blockIncr_p);
280 0 : copyBlock (other.blockSpw_p, blockSpw_p);
281 :
282 : }
283 :
284 0 : return * this;
285 : }
286 :
287 : void
288 0 : ChannelSelection::copyBlock (const Block <Vector<Int> > & src,
289 : Block <Vector<Int> > & to) const
290 : {
291 : // Since this is a Block of Vector, we need to wipe out
292 : // the original contents of "to"; otherwise the semantics
293 : // of Vector::operator= will generate an exception if there
294 : // is a difference in length of any of the vector elements.
295 :
296 0 : to.resize (0, true);
297 0 : to = src;
298 0 : }
299 :
300 :
301 : void
302 0 : ChannelSelection::get (Block< Vector<Int> > & blockNGroup,
303 : Block< Vector<Int> > & blockStart,
304 : Block< Vector<Int> > & blockWidth,
305 : Block< Vector<Int> > & blockIncr,
306 : Block< Vector<Int> > & blockSpw) const
307 : {
308 0 : copyBlock (blockNGroup_p, blockNGroup);
309 0 : copyBlock (blockStart_p, blockStart);
310 0 : copyBlock (blockWidth_p, blockWidth);
311 0 : copyBlock (blockIncr_p, blockIncr);
312 0 : copyBlock (blockSpw_p, blockSpw);
313 0 : }
314 :
315 : std::ostream &
316 0 : operator<< (std::ostream & o, const RoviaModifier & m)
317 : {
318 0 : m.print (o);
319 :
320 0 : return o;
321 : }
322 :
323 0 : RoviaModifiers::~RoviaModifiers ()
324 : {
325 : // // Free the objects owned by the vector
326 : //
327 : // for (Data::iterator i = data_p.begin(); i != data_p.end(); i++){
328 : // delete (* i);
329 : // }
330 0 : }
331 :
332 : void
333 0 : RoviaModifiers::add (RoviaModifier * modifier)
334 : {
335 0 : data_p.push_back (modifier);
336 0 : }
337 :
338 : void
339 0 : RoviaModifiers::apply (ROVisibilityIterator * rovi)
340 : {
341 : // Free the objects owned by the vector
342 :
343 0 : for (Data::iterator i = data_p.begin(); i != data_p.end(); i++){
344 0 : Log (1, "Applying vi modifier: %s\n", string(** i).c_str());
345 0 : (* i) -> apply (rovi);
346 : }
347 :
348 0 : }
349 :
350 : void
351 0 : RoviaModifiers::clearAndFree ()
352 : {
353 0 : for (Data::iterator i = data_p.begin(); i != data_p.end(); i++){
354 0 : delete (* i);
355 : }
356 :
357 0 : data_p.clear();
358 0 : }
359 :
360 : RoviaModifiers
361 0 : RoviaModifiers::transferModifiers ()
362 : {
363 0 : RoviaModifiers result;
364 :
365 0 : result.data_p.assign (data_p.begin(), data_p.end());
366 :
367 0 : data_p.clear(); // remove them from the other object but do not destroy them
368 :
369 0 : return result;
370 : }
371 :
372 0 : SelectChannelModifier::SelectChannelModifier (Int nGroup, Int start, Int width, Int increment, Int spectralWindow)
373 : : channelBlocks_p (false),
374 : increment_p (increment),
375 : nGroup_p (nGroup),
376 : spectralWindow_p (spectralWindow),
377 : start_p (start),
378 0 : width_p (width)
379 0 : {}
380 :
381 0 : SelectChannelModifier::SelectChannelModifier (const Block< Vector<Int> > & blockNGroup,
382 : const Block< Vector<Int> > & blockStart,
383 : const Block< Vector<Int> > & blockWidth,
384 : const Block< Vector<Int> > & blockIncr,
385 0 : const Block< Vector<Int> > & blockSpw)
386 : : channelBlocks_p (true),
387 0 : channelSelection_p (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw)
388 0 : {}
389 :
390 : void
391 0 : SelectChannelModifier::apply (ROVisibilityIterator * rovi) const
392 : {
393 0 : if (! channelBlocks_p){
394 0 : rovi->selectChannel (nGroup_p, start_p, width_p, increment_p, spectralWindow_p);
395 : }
396 : else{
397 0 : Block< Vector<Int> > blockNGroup;
398 0 : Block< Vector<Int> > blockStart;
399 0 : Block< Vector<Int> > blockWidth;
400 0 : Block< Vector<Int> > blockIncr;
401 0 : Block< Vector<Int> > blockSpw;
402 :
403 0 : channelSelection_p.get (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
404 0 : rovi->selectChannel (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
405 : }
406 0 : }
407 :
408 : void
409 0 : SelectChannelModifier::print (ostream & os) const
410 : {
411 0 : os << "SelectChannel::{";
412 :
413 0 : if (channelBlocks_p){
414 0 : Block< Vector<Int> > blockNGroup;
415 0 : Block< Vector<Int> > blockStart;
416 0 : Block< Vector<Int> > blockWidth;
417 0 : Block< Vector<Int> > blockIncr;
418 0 : Block< Vector<Int> > blockSpw;
419 :
420 0 : channelSelection_p.get (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
421 :
422 0 : os << "nGroup=" << toCsv (blockNGroup)
423 0 : << ", start=" << toCsv (blockStart)
424 0 : << ", width=" << toCsv (blockWidth)
425 0 : << ", increment=" << toCsv (blockIncr)
426 0 : << ", spw=" << toCsv (blockSpw);
427 : }
428 : else {
429 0 : os << "nGroup=" << nGroup_p
430 0 : << ", start=" << start_p
431 0 : << ", width=" << width_p
432 0 : << ", increment=" << increment_p
433 0 : << ", spw=" << spectralWindow_p;
434 : }
435 0 : os << "}";
436 0 : }
437 :
438 : String
439 0 : SelectChannelModifier::toCsv (const Block< Vector<Int> > & bv) const
440 : {
441 0 : String result = "{";
442 :
443 0 : for (Block<Vector<Int> >::const_iterator v = bv.begin(); v != bv.end(); ++ v){
444 0 : if (result.size() != 1)
445 0 : result += ",";
446 :
447 0 : result += "{" + toCsv (* v) + "}";
448 :
449 : }
450 :
451 0 : result += "}";
452 :
453 0 : return result;
454 :
455 : }
456 :
457 : String
458 0 : SelectChannelModifier::toCsv (const Vector<Int> & v) const
459 : {
460 0 : String result = "";
461 0 : for (Vector<Int>::const_iterator i = v.begin(); i != v.end(); ++ i){
462 0 : if (! result.empty())
463 0 : result += ",";
464 0 : result += String::toString (* i);
465 : }
466 :
467 0 : return result;
468 : }
469 :
470 :
471 0 : SelectVelocityModifier::SelectVelocityModifier (Int nChan, const MVRadialVelocity& vStart, const MVRadialVelocity& vInc,
472 0 : MRadialVelocity::Types rvType, MDoppler::Types dType, Bool precise)
473 :
474 : : dType_p (dType),
475 : nChan_p (nChan),
476 : precise_p (precise),
477 : rvType_p (rvType),
478 : vInc_p (vInc),
479 0 : vStart_p (vStart)
480 0 : {}
481 :
482 : void
483 0 : SelectVelocityModifier::apply (ROVisibilityIterator * rovi) const
484 : {
485 0 : rovi-> selectVelocity (nChan_p, vStart_p, vInc_p, rvType_p, dType_p, precise_p);
486 0 : }
487 :
488 : void
489 0 : SelectVelocityModifier::print (std::ostream & os) const
490 : {
491 : os << "SelectVelocity::{"
492 :
493 0 : << "dType=" << dType_p
494 0 : << ",nChan=" << nChan_p
495 0 : << ",precise=" << precise_p
496 0 : << ",rvType=" << rvType_p
497 0 : << ",vInc=" << vInc_p
498 0 : << ",vStart=" << vStart_p
499 0 : << "}";
500 0 : }
501 :
502 0 : SetIntervalModifier::SetIntervalModifier (Double timeInterval)
503 0 : : timeInterval_p (timeInterval)
504 0 : {}
505 :
506 : void
507 0 : SetIntervalModifier::apply (ROVisibilityIterator * rovi) const
508 : {
509 0 : rovi -> setInterval (timeInterval_p);
510 0 : }
511 :
512 : void
513 0 : SetIntervalModifier::print (std::ostream & os) const
514 : {
515 0 : os << "SetInterval::{" << timeInterval_p << "}";
516 0 : }
517 :
518 :
519 :
520 0 : SetRowBlockingModifier::SetRowBlockingModifier (Int nRows)
521 0 : : nRows_p (nRows)
522 0 : {}
523 :
524 : void
525 0 : SetRowBlockingModifier::apply (ROVisibilityIterator * rovi) const
526 : {
527 0 : rovi->setRowBlocking (nRows_p);
528 0 : }
529 :
530 : void
531 0 : SetRowBlockingModifier::print (std::ostream & os) const
532 : {
533 : os << "SetRowBlocking::{"
534 0 : << "nRows=" << nRows_p
535 0 : << ",nGroup=" << nGroup_p
536 0 : << ",spectralWindow=" << spectralWindow_p
537 0 : << ",start=" << start_p
538 0 : << ",width=" << width_p
539 0 : << "}";
540 0 : }
541 :
542 :
543 : // **************************
544 : // * *
545 : // * VlaData Implementation *
546 : // * *
547 : // **************************
548 :
549 : //Semaphore VlaData::debugBlockSemaphore_p (0); // used to block a thread for debugging
550 :
551 0 : VlaData::VlaData (Int maxNBuffers, async::Mutex & mutex)
552 : : MaxNBuffers_p (maxNBuffers),
553 0 : mutex_p (mutex)
554 : {
555 0 : timing_p.fillCycle_p = DeltaThreadTimes (true);
556 0 : timing_p.fillOperate_p = DeltaThreadTimes (true);
557 0 : timing_p.fillWait_p = DeltaThreadTimes (true);
558 0 : timing_p.readCycle_p = DeltaThreadTimes (true);
559 0 : timing_p.readOperate_p = DeltaThreadTimes (true);
560 0 : timing_p.readWait_p = DeltaThreadTimes (true);
561 0 : timing_p.timeStart_p = ThreadTimes();
562 0 : }
563 :
564 0 : VlaData::~VlaData ()
565 : {
566 0 : timing_p.timeStop_p = ThreadTimes();
567 :
568 0 : if (statsEnabled()){
569 0 : Log (1, "VlaData stats:\n%s", makeReport ().c_str());
570 : }
571 :
572 0 : resetBufferData ();
573 0 : }
574 :
575 :
576 : Int
577 0 : VlaData::clock (Int arg, Int base)
578 : {
579 0 : Int r = arg % base;
580 :
581 0 : if (r < 0){
582 0 : r += base;
583 : }
584 :
585 0 : return r;
586 : }
587 :
588 : //void
589 : //VlaData::debugBlock ()
590 : //{
591 : // // Log (1, "VlaData::debugBlock(): Blocked\n");
592 : // //
593 : // // debugBlockSemaphore_p.wait ();
594 : // //
595 : // // Log (1, "VlaData::debugBlock(): Unblocked\n");
596 : //}
597 :
598 : //void
599 : //VlaData::debugUnblock ()
600 : //{
601 : // // int v = debugBlockSemaphore_p.getValue();
602 : // //
603 : // // if (v == 0){
604 : // // Log (1, "VlaData::debugUnblock()\n");
605 : // // debugBlockSemaphore_p.post ();
606 : // // }
607 : // // else
608 : // // Log (1, "VlaData::debugUnblock(): already unblocked; v=%d\n", v);
609 : //}
610 :
611 :
612 : void
613 0 : VlaData::fillComplete (VlaDatum * datum)
614 : {
615 0 : LockGuard lg (mutex_p);
616 :
617 0 : if (statsEnabled()){
618 0 : timing_p.fill3_p = ThreadTimes();
619 0 : timing_p.fillWait_p += timing_p.fill2_p - timing_p.fill1_p;
620 0 : timing_p.fillOperate_p += timing_p.fill3_p - timing_p.fill2_p;
621 0 : timing_p.fillCycle_p += timing_p.fill3_p - timing_p.fill1_p;
622 : }
623 :
624 0 : data_p.push (datum);
625 :
626 0 : Log (2, "VlaData::fillComplete on %s\n", datum->getSubChunkPair ().toString().c_str());
627 :
628 0 : assert ((Int)data_p.size() <= MaxNBuffers_p);
629 :
630 0 : interface_p->notifyAllInterfaceChanged();
631 0 : }
632 :
633 : Bool
634 0 : VlaData::fillCanStart () const
635 : {
636 : // Caller must lock
637 :
638 0 : Bool canStart = (int) data_p.size() < MaxNBuffers_p;
639 :
640 0 : return canStart;
641 : }
642 :
643 :
644 : VlaDatum *
645 0 : VlaData::fillStart (SubChunkPair subchunk, const ThreadTimes & fillStartTime)
646 : {
647 0 : LockGuard lg (mutex_p);
648 :
649 0 : statsEnabled () && (timing_p.fill1_p = fillStartTime, true);
650 :
651 0 : Assert ((int) data_p.size() < MaxNBuffers_p);
652 :
653 0 : VlaDatum * datum = new VlaDatum (subchunk);
654 :
655 0 : Log (2, "VlaData::fillStart on %s\n", datum->getSubChunkPair().toString().c_str());
656 :
657 0 : if (validChunks_p.empty() || validChunks_p.back() != subchunk.chunk ())
658 0 : insertValidChunk (subchunk.chunk ());
659 :
660 0 : insertValidSubChunk (subchunk);
661 :
662 0 : statsEnabled () && (timing_p.fill2_p = ThreadTimes(), true);
663 :
664 0 : if (interface_p->isSweepTerminationRequested()){
665 0 : delete datum;
666 0 : datum = NULL; // datum may not be ready to fill and shouldn't be anyway
667 : }
668 :
669 0 : return datum;
670 : }
671 :
672 : asyncio::ChannelSelection
673 0 : VlaData::getChannelSelection () const
674 : {
675 0 : LockGuard lg (mutex_p);
676 :
677 0 : return channelSelection_p;
678 : }
679 :
680 : void
681 0 : VlaData::initialize (const AsynchronousInterface * interface)
682 : {
683 0 : interface_p = interface;
684 :
685 0 : LockGuard lg (mutex_p);
686 :
687 0 : resetBufferData ();
688 0 : }
689 :
690 :
691 : void
692 0 : VlaData::insertValidChunk (Int chunkNumber)
693 : {
694 : ////Assert (mutex_p.isLockedByThisThread ()); // Caller locks mutex.
695 :
696 0 : validChunks_p.push (chunkNumber);
697 :
698 0 : interface_p->notifyAllInterfaceChanged();
699 0 : }
700 :
701 : void
702 0 : VlaData::insertValidSubChunk (SubChunkPair subchunk)
703 : {
704 : ////Assert (mutex_p.isLockedByThisThread ()); // Caller locks mutex.
705 :
706 0 : validSubChunks_p.push (subchunk);
707 :
708 0 : interface_p->notifyAllInterfaceChanged();
709 0 : }
710 :
711 : //Bool
712 : //VlaData::isSweepTerminationRequested () const
713 : //{
714 : // return sweepTerminationRequested_p;
715 : //}
716 :
717 : Bool
718 0 : VlaData::isValidChunk (Int chunkNumber) const
719 : {
720 0 : bool validChunk = false;
721 :
722 : // Check to see if this is a valid chunk. If the data structure is empty
723 : // then sleep for a tiny bit to allow the VLAT thread to either make more
724 : // chunks available for insert the sentinel value INT_MAX into the data
725 : // structure.
726 :
727 0 : UniqueLock uniqueLock (mutex_p);
728 :
729 0 : do {
730 :
731 0 : while (validChunks_p.empty()){
732 0 : interface_p->waitForInterfaceChange (uniqueLock);
733 : }
734 :
735 0 : while (! validChunks_p.empty() && validChunks_p.front() < chunkNumber){
736 0 : validChunks_p.pop();
737 : }
738 :
739 0 : if (! validChunks_p.empty())
740 0 : validChunk = validChunks_p.front() == chunkNumber;
741 :
742 0 : } while (validChunks_p.empty());
743 :
744 0 : Log (3, "isValidChunk (%d) --> %s\n", chunkNumber, validChunk ? "true" : "false");
745 :
746 0 : return validChunk;
747 : }
748 :
749 : Bool
750 0 : VlaData::isValidSubChunk (SubChunkPair subchunk) const
751 : {
752 0 : SubChunkPair s;
753 :
754 0 : bool validSubChunk = false;
755 :
756 : // Check to see if this is a valid subchunk. If the data structure is empty
757 : // then sleep for a tiny bit to allow the VLAT thread to either make more
758 : // subchunks available for insert the sentinel value (INT_MAX, INT_MAX) into the data
759 : // structure.
760 :
761 0 : UniqueLock uniqueLock (mutex_p);
762 :
763 0 : do {
764 :
765 0 : while (validSubChunks_p.empty()){
766 0 : interface_p->waitForInterfaceChange (uniqueLock);
767 : }
768 :
769 0 : while (! validSubChunks_p.empty() && validSubChunks_p.front() < subchunk){
770 0 : validSubChunks_p.pop();
771 : }
772 :
773 0 : if (! validSubChunks_p.empty())
774 0 : validSubChunk = validSubChunks_p.front() == subchunk;
775 :
776 0 : } while (validSubChunks_p.empty());
777 :
778 0 : Log (3, "isValidSubChunk %s --> %s\n", subchunk.toString().c_str(), validSubChunk ? "true" : "false");
779 :
780 0 : return validSubChunk;
781 : }
782 :
783 : String
784 0 : VlaData::makeReport ()
785 : {
786 0 : String report;
787 :
788 0 : DeltaThreadTimes duration = (timing_p.timeStop_p - timing_p.timeStart_p); // seconds
789 0 : report += String::format ("\nLookahead Stats: nCycles=%d, duration=%.3f sec\n...\n",
790 0 : timing_p.readWait_p.n(), duration.elapsed());
791 0 : report += "...ReadWait: " + timing_p.readWait_p.formatAverage () + "\n";
792 0 : report += "...ReadOperate: " + timing_p.readOperate_p.formatAverage() + "\n";
793 0 : report += "...ReadCycle: " + timing_p.readCycle_p.formatAverage() + "\n";
794 :
795 0 : report += "...FillWait: " + timing_p.fillWait_p.formatAverage() + "\n";
796 0 : report += "...FillOperate: " + timing_p.fillOperate_p.formatAverage () + "\n";
797 0 : report += "...FillCycle: " + timing_p.fillCycle_p.formatAverage () + "\n";
798 :
799 0 : Double syncCycle = timing_p.fillOperate_p.elapsedAvg() + timing_p.readOperate_p.elapsedAvg();
800 0 : Double asyncCycle = max (timing_p.fillCycle_p.elapsedAvg(), timing_p.readCycle_p.elapsedAvg());
801 0 : report += String::format ("...Sync cycle would be %6.1f ms\n", syncCycle * 1000);
802 0 : report += String::format ("...Speedup is %5.1f%%\n", (syncCycle / asyncCycle - 1) * 100);
803 0 : report += String::format ("...Total time savings estimate is %7.3f seconds\n",
804 0 : (syncCycle - asyncCycle) * timing_p.readWait_p.n());
805 :
806 0 : return report;
807 :
808 : }
809 :
810 :
811 : void
812 0 : VlaData::readComplete (SubChunkPair subchunk)
813 : {
814 0 : LockGuard lg (mutex_p);
815 :
816 0 : if (statsEnabled()){
817 0 : timing_p.read3_p = ThreadTimes();
818 0 : timing_p.readWait_p += timing_p.read2_p - timing_p.read1_p;
819 0 : timing_p.readOperate_p += timing_p.read3_p - timing_p.read2_p;
820 0 : timing_p.readCycle_p += timing_p.read3_p - timing_p.read1_p;
821 : }
822 :
823 0 : Log (2, "VlaData::readComplete on %s\n", subchunk.toString().c_str());
824 0 : }
825 :
826 : VisBufferAsync *
827 0 : VlaData::readStart (SubChunkPair subchunk)
828 : {
829 : // Called by main thread
830 :
831 0 : UniqueLock uniqueLock (mutex_p);
832 :
833 0 : statsEnabled () && (timing_p.read1_p = ThreadTimes(), true);
834 :
835 : // Wait for a subchunk's worth of data to be available.
836 :
837 0 : while (data_p.empty()){
838 0 : interface_p->waitForInterfaceChange (uniqueLock);
839 : }
840 :
841 : // Get the data off the queue and notify world of change in VlaData.
842 :
843 0 : VlaDatum * datum = data_p.front();
844 0 : data_p.pop ();
845 0 : interface_p->notifyAllInterfaceChanged();
846 :
847 0 : ThrowIf (! datum->isSubChunk (subchunk),
848 : String::format ("Reader wanted subchunk %s while next subchunk is %s",
849 : subchunk.toString().c_str(), datum->getSubChunkPair().toString().c_str()));
850 :
851 0 : Log (2, "VlaData::readStart on %s\n", subchunk.toString().c_str());
852 :
853 0 : statsEnabled () && (timing_p.read2_p = ThreadTimes(), true);
854 :
855 : // Extract the VisBufferAsync enclosed in the datum for return to caller,
856 : // then destroy the rest of the datum object
857 :
858 0 : VisBufferAsync * vba = datum->releaseVisBufferAsync ();
859 0 : delete datum;
860 0 : return vba;
861 : }
862 :
863 : void
864 0 : VlaData::resetBufferData ()
865 : {
866 : ////Assert (mutex_p.isLockedByThisThread ()); // Caller locks mutex.
867 :
868 : // Flush any accumulated buffers
869 :
870 0 : while (! data_p.empty()){
871 0 : VlaDatum * datum = data_p.front();
872 0 : data_p.pop ();
873 0 : delete datum;
874 : }
875 :
876 : // Flush the chunk and subchunk indices
877 :
878 0 : while (! validChunks_p.empty())
879 0 : validChunks_p.pop();
880 :
881 0 : while (! validSubChunks_p.empty())
882 0 : validSubChunks_p.pop();
883 0 : }
884 :
885 : void
886 0 : VlaData::setNoMoreData ()
887 : {
888 0 : LockGuard lg (mutex_p);
889 :
890 0 : insertValidChunk (INT_MAX);
891 0 : insertValidSubChunk (SubChunkPair::noMoreData ());
892 0 : }
893 :
894 : Bool
895 0 : VlaData::statsEnabled () const
896 : {
897 : // Determines whether asynchronous I/O is enabled by looking for the
898 : // expected AipsRc value. If not found then async i/o is disabled.
899 :
900 : Bool doStats;
901 0 : AipsrcValue<Bool>::find (doStats, ROVisibilityIterator::getAsyncRcBase () + ".doStats", false);
902 :
903 0 : return doStats;
904 : }
905 :
906 : void
907 0 : VlaData::storeChannelSelection (const asyncio::ChannelSelection & channelSelection)
908 : {
909 0 : LockGuard lg (mutex_p);
910 :
911 0 : channelSelection_p = channelSelection;
912 0 : }
913 :
914 :
915 :
916 : // ***************************
917 : // * *
918 : // * VlaDatum Implementation *
919 : // * *
920 : // ***************************
921 :
922 0 : VlaDatum::VlaDatum (SubChunkPair subchunk)
923 : : subchunk_p (subchunk),
924 0 : visBuffer_p (new VisBufferAsync ())
925 0 : {}
926 :
927 0 : VlaDatum::~VlaDatum()
928 : {
929 0 : delete visBuffer_p;
930 0 : }
931 :
932 : SubChunkPair
933 0 : VlaDatum::getSubChunkPair () const
934 : {
935 0 : return subchunk_p;
936 : }
937 :
938 : VisBufferAsync *
939 0 : VlaDatum::getVisBuffer ()
940 : {
941 0 : return visBuffer_p;
942 : }
943 :
944 : //const VisBufferAsync *
945 : //VlaDatum::getVisBuffer () const
946 : //{
947 : // assert (state_p == Filling || state_p == Reading);
948 : //
949 : // return visBuffer_p;
950 : //}
951 :
952 : Bool
953 0 : VlaDatum::isSubChunk (SubChunkPair subchunk) const
954 : {
955 0 : return subchunk == subchunk_p;
956 : }
957 :
958 : VisBufferAsync *
959 0 : VlaDatum::releaseVisBufferAsync ()
960 : {
961 0 : VisBufferAsync * vba = visBuffer_p;
962 0 : visBuffer_p = NULL;
963 :
964 0 : return vba;
965 : }
966 :
967 0 : WriteQueue::WriteQueue ()
968 0 : : interface_p (NULL)
969 0 : {}
970 :
971 0 : WriteQueue::~WriteQueue ()
972 : {
973 0 : Assert (queue_p.empty());
974 0 : }
975 :
976 : WriteData *
977 0 : WriteQueue::dequeue ()
978 : {
979 0 : LockGuard lg (mutex_p);
980 :
981 0 : WriteData * result = NULL;
982 :
983 0 : if (! empty (true)){
984 :
985 0 : result = queue_p.front(); // get the first value
986 0 : queue_p.pop(); // remove it from the queue
987 : }
988 :
989 0 : return result;
990 : }
991 :
992 : Bool
993 0 : WriteQueue::empty (Bool alreadyLocked)
994 : {
995 : Bool isEmpty;
996 :
997 0 : if (alreadyLocked){
998 0 : isEmpty = queue_p.empty();
999 : }
1000 : else {
1001 0 : LockGuard lg (mutex_p);
1002 0 : isEmpty = queue_p.empty();
1003 : }
1004 :
1005 0 : return isEmpty;
1006 : }
1007 :
1008 : void
1009 0 : WriteQueue::enqueue (WriteData * writeData)
1010 : {
1011 0 : Assert (writeData != NULL);
1012 :
1013 0 : LockGuard lg (mutex_p);
1014 :
1015 0 : queue_p.push (writeData);
1016 :
1017 0 : interface_p->notifyAllInterfaceChanged ();
1018 0 : }
1019 :
1020 : void
1021 0 : WriteQueue::initialize (const AsynchronousInterface * interface)
1022 : {
1023 0 : interface_p = interface;
1024 0 : }
1025 :
1026 : } // end namespace asyncio
1027 :
1028 : using namespace casacore;
1029 : } // end namespace casa
|