Line data Source code
1 : /*
2 : * VisibilityIteratorAsync.cc
3 : *
4 : * Created on: Nov 2, 2010
5 : * Author: jjacobs
6 : */
7 :
8 : #include <msvis/MSVis/VisibilityIterator.h>
9 : #include <msvis/MSVis/VisibilityIteratorImplAsync.h>
10 : #include <msvis/MSVis/VisBufferAsync.h>
11 : #include <msvis/MSVis/VisBufferAsyncWrapper.h>
12 : #include <msvis/MSVis/VLAT.h>
13 : #include <msvis/MSVis/AsynchronousInterface.h>
14 : #include <stdcasa/UtilJ.h>
15 : #include <casacore/casa/Utilities/GenSort.h>
16 :
17 : #include <casacore/ms/MeasurementSets/MSColumns.h>
18 : #include <casacore/casa/System/AipsrcValue.h>
19 :
20 : #include <algorithm>
21 : #include <cstdarg>
22 : #include <ostream>
23 :
24 : using namespace std;
25 :
26 : using namespace casacore;
27 : using namespace casa::utilj;
28 :
29 : using namespace casacore;
30 : using namespace casa::asyncio;
31 :
32 : #define Log(level, ...) \
33 : {if (VlaData::loggingInitialized_p && level <= VlaData::logLevel_p) \
34 : Logger::get()->log (__VA_ARGS__);};
35 :
36 : #define NotImplemented Throw ("VisibilityIteratorReadImplAsync: Method not implemented!");
37 :
38 : #define VIWIA_NotImplemented() \
39 : Throw ("ViWriteImplAsync --> Operation not permitted!\n" \
40 : "Modify VisBuffer, mark components as dirty and use VisibilityIterator::writeBack.");
41 :
42 : #define ThrowIfNoVbaAttached() \
43 : ThrowIf (visBufferAsync_p == 0, "No VisBufferAsync attached to VI; try doing vi.origin() first.");
44 :
45 :
46 : using namespace casacore;
47 : namespace casa {
48 :
49 :
50 0 : ViReadImplAsync::ViReadImplAsync (const Block<MeasurementSet> & mss,
51 : const PrefetchColumns & prefetchColumns,
52 : const Block<Int> & sortColumns,
53 : const Bool addDefaultSortCols,
54 : Double timeInterval,
55 0 : Bool writable)
56 : : visBufferAsync_p (NULL),
57 : vlaData_p (NULL),
58 0 : vlat_p (NULL)
59 : {
60 0 : construct (mss, prefetchColumns, sortColumns, addDefaultSortCols,
61 : timeInterval, writable);
62 0 : }
63 :
64 0 : ViReadImplAsync::ViReadImplAsync (const PrefetchColumns & prefetchColumns,
65 : const VisibilityIteratorReadImpl & other,
66 0 : Bool writable)
67 : : visBufferAsync_p (NULL),
68 : vlaData_p (NULL),
69 0 : vlat_p (NULL)
70 : {
71 0 : Block<MeasurementSet> mss (other.measurementSets_p.size());
72 :
73 0 : for (int i = 0; i < (Int) other.measurementSets_p.size(); i++){
74 0 : mss [i] = other.measurementSets_p [i];
75 : }
76 :
77 0 : construct (mss, prefetchColumns, other.sortColumns_p,
78 0 : other.addDefaultSort_p, other.timeInterval_p, writable);
79 :
80 0 : imwgt_p = other.imwgt_p;
81 :
82 : // Pass over to the VLAT some of the VI modifiers if they are
83 : // not at the default value.
84 :
85 0 : Bool needViReset = false;
86 :
87 0 : if (other.timeInterval_p != 0){
88 0 : setInterval (other.timeInterval_p);
89 0 : needViReset = true;
90 : }
91 :
92 0 : if (other.nRowBlocking_p != 0){
93 0 : setRowBlocking (other.nRowBlocking_p);
94 0 : needViReset = true;
95 : }
96 :
97 0 : if (other.msChannels_p.nGroups_p.nelements() != 0){
98 0 : selectChannel (other.msChannels_p.nGroups_p,
99 0 : other.msChannels_p.start_p,
100 0 : other.msChannels_p.width_p,
101 0 : other.msChannels_p.inc_p,
102 0 : other.msChannels_p.spw_p);
103 0 : needViReset = true;
104 : }
105 :
106 0 : if (needViReset){
107 0 : originChunks ();
108 : }
109 0 : }
110 :
111 :
112 0 : ViReadImplAsync::~ViReadImplAsync ()
113 : {
114 0 : if (! vbaWrapperStack_p.empty()){
115 : VisBufferAsync * vba =
116 0 : vbaWrapperStack_p.top()->releaseVba ();
117 0 : assert (vba == visBufferAsync_p);
118 : UnusedVariable (vba);
119 0 : vba = NULL; // prevent warning when in non425debug build
120 0 : delete visBufferAsync_p;
121 : }
122 0 : else if (visBufferAsync_p != NULL){
123 0 : delete visBufferAsync_p;
124 : }
125 :
126 0 : interface_p->terminate ();
127 0 : delete interface_p;
128 0 : }
129 :
130 : void
131 0 : ViReadImplAsync::advance ()
132 : {
133 : //readComplete (); // complete any pending read
134 :
135 0 : subchunk_p.incrementSubChunk ();
136 :
137 0 : fillVisBuffer ();
138 0 : }
139 :
140 : Bool
141 0 : ViReadImplAsync::allBeamOffsetsZero () const
142 : {
143 0 : ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");
144 :
145 0 : return visBufferAsync_p -> getAllBeamOffsetsZero ();
146 : }
147 :
148 : const Vector<String> &
149 0 : ViReadImplAsync::antennaMounts () const
150 : {
151 0 : ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");
152 :
153 0 : return visBufferAsync_p -> getAntennaMounts ();
154 : }
155 :
156 :
157 : void
158 0 : ViReadImplAsync::attachVisBuffer (VisBuffer & vb0)
159 : {
160 0 : VisBufferAsyncWrapper * vb = dynamic_cast<VisBufferAsyncWrapper *> (& vb0);
161 0 : ThrowIf (vb == NULL, "Attempt to attach other than VisBufferAsyncWrapper");
162 :
163 0 : if (! vbaWrapperStack_p.empty()){
164 0 : vbaWrapperStack_p.top () -> releaseVba ();
165 : }
166 :
167 0 : vbaWrapperStack_p.push (vb);
168 :
169 0 : if (visBufferAsync_p != NULL){
170 0 : vb->wrap (visBufferAsync_p);
171 : }
172 0 : }
173 :
174 : ViReadImplAsync::PrefetchColumns
175 0 : ViReadImplAsync::augmentPrefetchColumns (const PrefetchColumns & prefetchColumnsBase)
176 : {
177 : // Augment the list of prefetch columns with any that are implied
178 : // by the others. N.B., be wary of reordering these.
179 :
180 0 : PrefetchColumns prefetchColumns = prefetchColumnsBase;
181 :
182 0 : if (contains (VisBufferComponents::Direction1, prefetchColumns)){
183 0 : prefetchColumns.insert (VisBufferComponents::AllBeamOffsetsZero);
184 0 : prefetchColumns.insert (VisBufferComponents::AntennaMounts);
185 0 : prefetchColumns.insert (VisBufferComponents::Feed1_pa);
186 0 : prefetchColumns.erase (VisBufferComponents::Direction1);
187 : }
188 :
189 0 : if (contains (VisBufferComponents::Direction2, prefetchColumns)){
190 0 : prefetchColumns.insert (VisBufferComponents::AllBeamOffsetsZero);
191 0 : prefetchColumns.insert (VisBufferComponents::AntennaMounts);
192 0 : prefetchColumns.insert (VisBufferComponents::Feed2_pa);
193 0 : prefetchColumns.erase (VisBufferComponents::Direction2);
194 : }
195 :
196 0 : if (contains (VisBufferComponents::Feed1_pa, prefetchColumns)){
197 0 : prefetchColumns.insert (VisBufferComponents::Ant1);
198 0 : prefetchColumns.insert (VisBufferComponents::Feed1);
199 0 : prefetchColumns.insert (VisBufferComponents::PhaseCenter);
200 0 : prefetchColumns.insert (VisBufferComponents::ReceptorAngles);
201 0 : prefetchColumns.insert (VisBufferComponents::Time);
202 0 : prefetchColumns.insert (VisBufferComponents::TimeInterval);
203 0 : prefetchColumns.erase (VisBufferComponents::Feed1_pa);
204 : }
205 :
206 0 : if (contains (VisBufferComponents::Feed2_pa, prefetchColumns)){
207 0 : prefetchColumns.insert (VisBufferComponents::Ant2);
208 0 : prefetchColumns.insert (VisBufferComponents::Feed2);
209 0 : prefetchColumns.insert (VisBufferComponents::PhaseCenter);
210 0 : prefetchColumns.insert (VisBufferComponents::ReceptorAngles);
211 0 : prefetchColumns.insert (VisBufferComponents::Time);
212 0 : prefetchColumns.insert (VisBufferComponents::TimeInterval);
213 0 : prefetchColumns.erase (VisBufferComponents::Feed2_pa);
214 : }
215 :
216 0 : return prefetchColumns;
217 : }
218 :
219 : const Cube<RigidVector<Double, 2> >&
220 0 : ViReadImplAsync::getBeamOffsets () const
221 : {
222 0 : ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");
223 :
224 0 : return visBufferAsync_p -> getBeamOffsets ();
225 : }
226 :
227 :
228 :
229 : VisibilityIteratorReadImpl *
230 0 : ViReadImplAsync::clone () const
231 : {
232 0 : Throw ("ViReadImplAsync: cloning not permitted!!!");
233 : }
234 :
235 : void
236 0 : ViReadImplAsync::construct(const Block<MeasurementSet> & mss,
237 : const PrefetchColumns & prefetchColumns,
238 : const Block<Int> & sortColumns,
239 : const Bool addDefaultSortCols,
240 : Double timeInterval,
241 : Bool writable)
242 : {
243 0 : AsynchronousInterface::initializeLogging();
244 :
245 0 : casa::async::Logger::get()->registerName ("Main");
246 :
247 0 : visBufferAsync_p = NULL;
248 :
249 0 : for (uint i = 0; i < mss.size(); i++){
250 0 : measurementSets_p.push_back (mss [i]);
251 : }
252 :
253 0 : msId_p = -1;
254 :
255 : // Create and initialize the Asynchronous Interface
256 :
257 0 : Int nReadAheadBuffers = getDefaultNBuffers ();
258 :
259 0 : interface_p = new AsynchronousInterface (nReadAheadBuffers);
260 0 : interface_p->initialize();
261 :
262 0 : vlaData_p = interface_p->getVlaData ();
263 0 : vlat_p = interface_p->getVlat ();
264 :
265 : // Augment the list of prefetch columns with any that are implied
266 : // by the others. N.B., be wary of reordering these.
267 :
268 0 : prefetchColumns_p = augmentPrefetchColumns (prefetchColumns);
269 :
270 : // Get the VLAT going
271 :
272 0 : vlat_p->setPrefetchColumns (prefetchColumns_p);
273 :
274 : // If this is a writable VI then let the write implementation handle the
275 : // initialization of the VLAT.
276 :
277 0 : vlat_p->initialize (mss, sortColumns, addDefaultSortCols, timeInterval, writable);
278 0 : vlat_p->startThread ();
279 0 : }
280 :
281 : void
282 0 : ViReadImplAsync::detachVisBuffer (VisBuffer & vb0)
283 : {
284 0 : VisBufferAsyncWrapper * vb = dynamic_cast<VisBufferAsyncWrapper *> (& vb0);
285 :
286 0 : ThrowIf (vb == NULL, "Attempt to detach other than a VisBufferAsyncWrapper");
287 :
288 0 : if (vb == vbaWrapperStack_p.top()){
289 :
290 : // Get rid of the old buffer
291 :
292 0 : VisBufferAsync * vba = vb->releaseVba ();
293 0 : Assert (vba == visBufferAsync_p);
294 : UnusedVariable (vba); // prevent release build warning
295 :
296 0 : vbaWrapperStack_p.pop ();
297 :
298 : // If there is still a VB attached either fill it with the
299 : // current values for the VI position or clear it.
300 :
301 0 : if (! vbaWrapperStack_p.empty()){
302 :
303 0 : if (visBufferAsync_p != NULL){
304 0 : vbaWrapperStack_p.top() -> wrap (visBufferAsync_p);
305 : }
306 : }
307 :
308 : } else {
309 0 : Throw ("ViReadImplAsync::detachVisBuffer: VisBufferAsync not attached ");
310 : }
311 0 : }
312 :
313 : void
314 0 : ViReadImplAsync::dumpPrefetchColumns () const
315 : {
316 0 : int i = 0;
317 0 : for (PrefetchColumns::const_iterator c = prefetchColumns_p.begin();
318 0 : c != prefetchColumns_p.end();
319 0 : c ++){
320 :
321 0 : cerr << PrefetchColumns::columnName (*c) << ", ";
322 0 : i ++;
323 0 : if (i == 10){
324 0 : cerr << endl;
325 0 : i = 0;
326 : }
327 :
328 : }
329 :
330 0 : if (i != 0)
331 0 : cerr << endl;
332 :
333 0 : }
334 :
335 : void
336 0 : ViReadImplAsync::fillVisBuffer()
337 : {
338 : // Get the next buffer from the lookahead buffer ring.
339 : // This could block if the next buffer is not ready.
340 : // Before doing the fill check to see that there's more data.
341 :
342 0 : if (more()){
343 :
344 0 : readComplete ();
345 :
346 0 : visBufferAsync_p = vlaData_p->readStart (subchunk_p);
347 :
348 0 : Assert (visBufferAsync_p != NULL);
349 :
350 0 : msId_p = visBufferAsync_p->msId ();
351 :
352 : // If a VisBufferAsync is attached, then copy the prefetched VisBuffer into it.
353 :
354 0 : if (! vbaWrapperStack_p.empty ()){
355 :
356 0 : vbaWrapperStack_p.top() -> wrap (visBufferAsync_p);
357 : }
358 : }
359 0 : }
360 :
361 : //Vector<MDirection>
362 : //ViReadImplAsync::fillAzel(Double time) const
363 : //{
364 : // NotImplemented;
365 : //}
366 :
367 :
368 :
369 :
370 : //void
371 : //ViReadImplAsync::fillVisBuffer ()
372 : //{
373 : // vlaDatum_p = impl_p->getVlaData()->getDatum (subchunk_p);
374 : //
375 : // ThrowIf (datum == NULL, format("Failed to get datum for subchunk (%d, %d)", subchunk_p));
376 : //
377 : // if (visBuffer_p != NULL){
378 : // visBuffer_p->setRealBuffer (vlaDatum_p->getVisBuffer());
379 : // }
380 : //}
381 :
382 : void
383 0 : ViReadImplAsync::getChannelSelection(Block< Vector<Int> >& blockNGroup,
384 : Block< Vector<Int> >& blockStart,
385 : Block< Vector<Int> >& blockWidth,
386 : Block< Vector<Int> >& blockIncr,
387 : Block< Vector<Int> >& blockSpw){
388 :
389 0 : asyncio::ChannelSelection channelSelection = vlaData_p->getChannelSelection ();
390 0 : channelSelection.get (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
391 0 : }
392 :
393 : int
394 0 : ViReadImplAsync::getDefaultNBuffers ()
395 : {
396 : int nBuffers;
397 0 : casacore::AipsrcValue<Int>::find (nBuffers, ROVisibilityIterator::getAsyncRcBase () + ".nBuffers", 2);
398 :
399 0 : return nBuffers;
400 : }
401 :
402 : MEpoch
403 0 : ViReadImplAsync::getEpoch () const
404 : {
405 0 : ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");
406 :
407 0 : return visBufferAsync_p -> mEpoch_p;
408 : }
409 :
410 : //ViReadImplAsync::PrefetchColumns
411 : //ViReadImplAsync::getPrefetchColumns () const
412 : //{
413 : // return prefetchColumns_p;
414 : //}
415 : const MeasurementSet &
416 0 : ViReadImplAsync::getMs() const
417 : {
418 0 : ThrowIfNoVbaAttached ();
419 :
420 0 : return visBufferAsync_p->getMs ();
421 : }
422 :
423 :
424 : VisBuffer *
425 0 : ViReadImplAsync::getVisBuffer ()
426 : {
427 : // Returns the currently attached VisBufferAsync or NULL if none attached
428 :
429 0 : VisBuffer * vb = (! vbaWrapperStack_p.empty()) ? vbaWrapperStack_p.top() : NULL;
430 0 : return vb;
431 : }
432 :
433 : //void
434 : //ViReadImplAsync::linkWithRovi (VisibilityIteratorReadImpl * rovi)
435 : //{
436 : // linkedVisibilityIterator_p = rovi;
437 : //}
438 :
439 :
440 : Bool
441 0 : ViReadImplAsync::more () const
442 : {
443 : // Returns true if the lookahead data structure has the next subchunk.
444 :
445 0 : Bool b = vlaData_p->isValidSubChunk (subchunk_p);
446 :
447 0 : return b;
448 : }
449 :
450 : Bool
451 0 : ViReadImplAsync::moreChunks () const
452 : {
453 : // Returns true if the looahead data structure has the first subchunk of the
454 : // next chunk.
455 :
456 0 : Bool b = vlaData_p->isValidChunk (subchunk_p.chunk());
457 :
458 0 : return b;
459 : }
460 :
461 : const MSColumns &
462 0 : ViReadImplAsync::msColumns() const
463 : {
464 0 : ThrowIfNoVbaAttached ();
465 :
466 0 : return visBufferAsync_p->msColumns();
467 : }
468 :
469 :
470 : Int
471 0 : ViReadImplAsync::msId() const
472 : {
473 0 : ThrowIf (msId_p < 0, "MS ID value not currently known.");
474 :
475 0 : return msId_p;
476 : }
477 :
478 :
479 : ViReadImplAsync &
480 0 : ViReadImplAsync::nextChunk ()
481 : {
482 : // Terminates the current read and advances the state of this
483 : // object to expect to access the first subchunk of the next
484 : // chunk
485 :
486 0 : subchunk_p.incrementChunk ();
487 :
488 0 : if (moreChunks()){
489 :
490 0 : readComplete (); // complete any pending read
491 :
492 : }
493 :
494 0 : return * this;
495 : }
496 :
497 : Int
498 0 : ViReadImplAsync::nRowChunk() const
499 : {
500 0 : ThrowIfNoVbaAttached ();
501 :
502 0 : return visBufferAsync_p->nRowChunk ();
503 : }
504 :
505 :
506 : Int
507 0 : ViReadImplAsync::numberSpw()
508 : {
509 0 : ThrowIfNoVbaAttached ();
510 :
511 0 : return visBufferAsync_p->getNSpw ();
512 : }
513 :
514 :
515 : void
516 0 : ViReadImplAsync::origin ()
517 : {
518 : // Terminates the current read and
519 :
520 0 : readComplete (); // complete any pending read
521 :
522 0 : subchunk_p.resetSubChunk();
523 :
524 0 : fillVisBuffer ();
525 :
526 0 : updateMsd ();
527 0 : }
528 :
529 : void
530 0 : ViReadImplAsync::originChunks ()
531 : {
532 0 : readComplete (); // complete any pending read
533 :
534 0 : subchunk_p.resetToOrigin ();
535 :
536 0 : interface_p->requestViReset ();
537 0 : }
538 :
539 : void
540 0 : ViReadImplAsync::readComplete()
541 : {
542 0 : if (visBufferAsync_p != NULL){
543 :
544 : // A buffer in the buffer ring was in use: clean up
545 :
546 0 : if (! vbaWrapperStack_p.empty()){
547 :
548 : // Break connection between our VisBufferAsync and
549 : // the shared data
550 :
551 0 : vbaWrapperStack_p.top()->releaseVba ();
552 : }
553 :
554 : // Clear the pointer to the shared buffer to indicate
555 : // internally that the read is complete
556 :
557 0 : delete visBufferAsync_p;
558 0 : visBufferAsync_p = NULL;
559 :
560 : // Inform the buffer ring that the read is complete.
561 :
562 0 : vlaData_p->readComplete (subchunk_p);
563 : }
564 0 : }
565 :
566 : void
567 0 : ViReadImplAsync::updateMsd ()
568 : {
569 0 : ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");
570 :
571 0 : msd_p.setAntennas (visBufferAsync_p->msColumns().antenna());
572 :
573 0 : MDirection phaseCenter = visBufferAsync_p -> getPhaseCenter();
574 0 : msd_p.setFieldCenter (phaseCenter);
575 0 : }
576 :
577 :
578 : const Cube<Double>&
579 0 : ViReadImplAsync::receptorAngles() const
580 : {
581 0 : ThrowIf (visBufferAsync_p == NULL, "No attached VisBufferAsync");
582 :
583 0 : return visBufferAsync_p -> getReceptorAngles ();
584 : }
585 :
586 :
587 : void
588 0 : ViReadImplAsync::saveMss (const Block<MeasurementSet> & mss)
589 : {
590 0 : measurementSets_p.clear();
591 0 : for (uint i = 0; i < mss.size(); i++){
592 0 : measurementSets_p.push_back (mss [i]);
593 : }
594 0 : }
595 :
596 : void
597 0 : ViReadImplAsync::saveMss (const MeasurementSet & ms)
598 : {
599 0 : measurementSets_p.clear();
600 0 : measurementSets_p.push_back (ms);
601 0 : }
602 :
603 : VisibilityIteratorReadImpl&
604 0 : ViReadImplAsync::selectVelocity (Int nChan,
605 : const MVRadialVelocity& vStart,
606 : const MVRadialVelocity& vInc,
607 : MRadialVelocity::Types rvType,
608 : MDoppler::Types dType,
609 : Bool precise)
610 : {
611 0 : SelectVelocityModifier * svm = new SelectVelocityModifier (nChan, vStart, vInc, rvType, dType, precise);
612 :
613 0 : interface_p->addModifier (svm); // ownership is transferred by this call
614 :
615 0 : originChunks ();
616 :
617 0 : return * this;
618 : }
619 :
620 : VisibilityIteratorReadImpl&
621 0 : ViReadImplAsync::selectChannel(Int nGroup,
622 : Int start,
623 : Int width,
624 : Int increment,
625 : Int spectralWindow)
626 : {
627 0 : SelectChannelModifier * scm = new SelectChannelModifier (nGroup, start, width, increment, spectralWindow);
628 :
629 0 : interface_p->addModifier (scm);
630 : // ownership is transferred by this call
631 :
632 0 : return * this;
633 : }
634 :
635 : VisibilityIteratorReadImpl&
636 0 : ViReadImplAsync::selectChannel(const Block< Vector<Int> >& blockNGroup,
637 : const Block< Vector<Int> >& blockStart,
638 : const Block< Vector<Int> >& blockWidth,
639 : const Block< Vector<Int> >& blockIncr,
640 : const Block< Vector<Int> >& blockSpw)
641 : {
642 : SelectChannelModifier * scm = new SelectChannelModifier (blockNGroup, blockStart, blockWidth,
643 0 : blockIncr, blockSpw);
644 :
645 0 : interface_p->addModifier (scm);
646 : // ownership is transferred by this call
647 :
648 0 : msChannels_p.nGroups_p = blockNGroup;
649 0 : msChannels_p.start_p = blockStart;
650 0 : msChannels_p.width_p = blockWidth;
651 0 : msChannels_p.inc_p = blockIncr;
652 0 : msChannels_p.spw_p = blockSpw;
653 :
654 0 : return * this;
655 : }
656 :
657 : void
658 0 : ViReadImplAsync::setInterval (Double timeInterval)
659 : {
660 0 : SetIntervalModifier * sim = new SetIntervalModifier (timeInterval);
661 :
662 0 : interface_p->addModifier (sim);
663 0 : }
664 :
665 :
666 : void
667 0 : ViReadImplAsync::setRowBlocking(Int nRow)
668 : {
669 0 : SetRowBlockingModifier * srbm = new SetRowBlockingModifier (nRow);
670 :
671 0 : interface_p->addModifier (srbm);
672 0 : }
673 :
674 : //void
675 : //ViReadImplAsync::startVlat ()
676 : //{
677 : // vlat_p->startThread ();
678 : //}
679 :
680 : namespace asyncio {
681 :
682 : PrefetchColumns
683 0 : PrefetchColumns::operator+ (const PrefetchColumns & other)
684 : {
685 : // Form and return the union
686 :
687 0 : PrefetchColumns result;
688 0 : result.insert (begin(), end());
689 :
690 0 : for (const_iterator o = other.begin(); o != other.end(); o++){
691 :
692 0 : result.insert (* o);
693 : }
694 :
695 0 : return result;
696 : }
697 :
698 : } // end namespace asyncio
699 :
700 : //void
701 : //ViReadImplAsync::setPrefetchColumns (const PrefetchColumns & columns) const
702 : //{
703 : //
704 : //}
705 :
706 :
707 : //VisibilityIteratorReadImpl&
708 : //ViReadImplAsync::selectChannel(Int nGroup=1,
709 : // Int start=0,
710 : // Int width=0,
711 : // Int increment=1,
712 : // Int spectralWindow=-1)
713 : //{
714 : // VisibilityIteratorReadImpl & rovi = VisibilityIterator::selectChannel (nGroup, start, width, increment, spectralWindow);
715 : //
716 : // ThrowIf (visBuffer_p == NULL, "No VisBufferAsync attached");
717 : //
718 : // visBuffer_p->setBlockSpw (blockSpw_p);
719 : //
720 : // return rovi;
721 : //}
722 :
723 : //VisibilityIteratorReadImpl&
724 : //ViReadImplAsync::selectChannel(Block< Vector<Int> >& blockNGroup,
725 : // Block< Vector<Int> >& blockStart,
726 : // Block< Vector<Int> >& blockWidth,
727 : // Block< Vector<Int> >& blockIncr,
728 : // Block< Vector<Int> >& blockSpw)
729 : //{
730 : // VisibilityIteratorReadImpl & rovi = VisibilityIterator::selectChannel (blockNGroup, blockStart, blockWidth, blockIncr, blockSpw);
731 : //
732 : // ThrowIf (visBuffer_p == NULL, "No VisBufferAsync attached");
733 : //
734 : // visBuffer_p->setBlockSpw (blockSpw_p);
735 : //
736 : // return rovi;
737 : //}
738 :
739 0 : ViWriteImplAsync::ViWriteImplAsync (VisibilityIterator * vi)
740 0 : : VisibilityIteratorWriteImpl (vi)
741 : {
742 0 : }
743 :
744 : //ViWriteImplAsync::ViWriteImplAsync (VisibilityIterator * vi)
745 : // : VisibilityIteratorWriteImpl (vi)
746 : //{
747 : //}
748 :
749 0 : ViWriteImplAsync::ViWriteImplAsync (const PrefetchColumns & /*prefetchColumns*/,
750 : const VisibilityIteratorWriteImpl & /*other*/,
751 0 : VisibilityIterator * vi)
752 0 : : VisibilityIteratorWriteImpl (vi)
753 0 : {}
754 :
755 :
756 0 : ViWriteImplAsync::~ViWriteImplAsync ()
757 0 : {}
758 :
759 : VisibilityIteratorWriteImpl *
760 0 : ViWriteImplAsync::clone () const
761 : {
762 0 : Throw ("ViWriteImplAsync: cloning not permitted!!!");
763 : }
764 :
765 : ViReadImplAsync *
766 0 : ViWriteImplAsync::getReadImpl()
767 : {
768 0 : return dynamic_cast <ViReadImplAsync *> (VisibilityIteratorWriteImpl::getReadImpl());
769 : }
770 :
771 : void
772 0 : ViWriteImplAsync::putModel(const RecordInterface& rec, Bool iscomponentlist, Bool incremental)
773 : {
774 : //visBufferAsync_p;
775 : //Throw ("ViWriteImplAsync::putModel not implemented!!!");
776 :
777 0 : Vector<Int> fields = getReadImpl()->msColumns().fieldId().getColumn();
778 0 : const Int option = Sort::HeapSort | Sort::NoDuplicates;
779 0 : const Sort::Order order = Sort::Ascending;
780 :
781 0 : Int nfields = GenSort<Int>::sort (fields, order, option);
782 :
783 : // Make sure we have the right size
784 :
785 0 : fields.resize(nfields, true);
786 0 : Int msid = getReadImpl()->msId();
787 :
788 0 : Vector<Int> spws = getReadImpl()->msChannels_p.spw_p[msid];
789 0 : Vector<Int> starts = getReadImpl()->msChannels_p.start_p[msid];
790 0 : Vector<Int> nchan = getReadImpl()->msChannels_p.width_p[msid];
791 0 : Vector<Int> incr = getReadImpl()->msChannels_p.inc_p[msid];
792 :
793 0 : CountedPtr<VisModelDataI> visModelData = VisModelDataI::create();
794 :
795 0 : visModelData->putModelI (getReadImpl()->getMs (), rec, fields, spws, starts, nchan, incr,
796 0 : iscomponentlist, incremental);
797 :
798 0 : }
799 :
800 :
801 :
802 : void
803 0 : ViWriteImplAsync::setFlag(const Matrix<Bool>& flag)
804 : {
805 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
806 0 : WriteQueue & writeQueue = interface->getWriteQueue();
807 0 : SubChunkPair subchunk = getReadImpl()->getSubchunkId ();
808 :
809 0 : writeQueue.enqueue (createWriteData (subchunk, flag, & VisibilityIterator::setFlag));
810 0 : }
811 :
812 : void
813 0 : ViWriteImplAsync::setFlag(const Cube<Bool>& flag)
814 : {
815 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
816 0 : WriteQueue & writeQueue = interface->getWriteQueue();
817 :
818 0 : writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
819 : flag,
820 : & VisibilityIterator::setFlag));
821 0 : }
822 :
823 : void
824 0 : ViWriteImplAsync::setFlagCategory (const Array<Bool> & flagCategory)
825 : {
826 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
827 0 : WriteQueue & writeQueue = interface->getWriteQueue();
828 :
829 0 : writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
830 : flagCategory,
831 : & VisibilityIterator::setFlagCategory));
832 0 : }
833 :
834 :
835 : void
836 0 : ViWriteImplAsync::setFlagRow(const Vector<Bool>& rowflags)
837 : {
838 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
839 0 : WriteQueue & writeQueue = interface->getWriteQueue();
840 :
841 0 : writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
842 : rowflags,
843 : & VisibilityIterator::setFlagRow));
844 0 : }
845 :
846 : void
847 0 : ViWriteImplAsync::setVis(const Matrix<CStokesVector>& vis, DataColumn whichOne)
848 : {
849 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
850 0 : WriteQueue & writeQueue = interface->getWriteQueue();
851 :
852 0 : writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
853 : vis,
854 : whichOne,
855 : & VisibilityIterator::setVis));
856 0 : }
857 :
858 : void
859 0 : ViWriteImplAsync::setVis(const Cube<Complex>& vis, DataColumn whichOne)
860 : {
861 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
862 0 : WriteQueue & writeQueue = interface->getWriteQueue();
863 :
864 0 : writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
865 : vis,
866 : whichOne,
867 : & VisibilityIterator::setVis));
868 0 : }
869 :
870 : void
871 0 : ViWriteImplAsync::setVisAndFlag(const Cube<Complex>& vis, const Cube<Bool>& flag,
872 : DataColumn whichOne)
873 : {
874 0 : setVis (vis, whichOne);
875 0 : setFlag (flag);
876 0 : }
877 :
878 : void
879 0 : ViWriteImplAsync::setWeight(const Vector<Float>& wt)
880 : {
881 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
882 0 : WriteQueue & writeQueue = interface->getWriteQueue();
883 0 : SubChunkPair subchunk = getReadImpl()->getSubchunkId ();
884 :
885 0 : writeQueue.enqueue (createWriteData (subchunk, wt, & VisibilityIterator::setWeight));
886 0 : }
887 :
888 : void
889 0 : ViWriteImplAsync::setWeightMat(const Matrix<Float>& wtmat)
890 : {
891 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
892 0 : WriteQueue & writeQueue = interface->getWriteQueue();
893 :
894 0 : writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
895 : wtmat,
896 : & VisibilityIterator::setWeightMat));
897 0 : }
898 :
899 : void
900 0 : ViWriteImplAsync::setWeightSpectrum(const Cube<Float>& wtsp)
901 : {
902 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
903 0 : WriteQueue & writeQueue = interface->getWriteQueue();
904 :
905 0 : writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
906 : wtsp,
907 : & VisibilityIterator::setWeightSpectrum));
908 0 : }
909 :
910 : void
911 0 : ViWriteImplAsync::setSigma(const Vector<Float>& sig)
912 : {
913 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
914 0 : WriteQueue & writeQueue = interface->getWriteQueue();
915 :
916 0 : writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
917 : sig,
918 : & VisibilityIterator::setSigma));
919 0 : }
920 :
921 : void
922 0 : ViWriteImplAsync::setSigmaMat(const Matrix<Float>& sigmat)
923 : {
924 0 : AsynchronousInterface * interface = getReadImpl()->interface_p;
925 0 : WriteQueue & writeQueue = interface->getWriteQueue();
926 :
927 0 : writeQueue.enqueue (createWriteData (getReadImpl()->getSubchunkId (),
928 : sigmat,
929 : & VisibilityIterator::setSigmaMat));
930 0 : }
931 :
932 : using namespace casacore;
933 : } // end namespace casa
|