Line data Source code
1 : /*
2 : * VisibilityProcessing.cc
3 : *
4 : * Created on: Apr 20, 2011
5 : * Author: jjacobs
6 : */
7 :
8 : #include "VisibilityProcessing.h"
9 : #include "VisBufferAsync.h"
10 : #include "VisibilityIteratorImplAsync.h"
11 : #include <stdcasa/UtilJ.h>
12 :
13 : #include <tuple>
14 :
15 : #include <casacore/casa/System/AipsrcValue.h>
16 :
17 : #include <algorithm>
18 : #include <list>
19 : #include <stdarg.h>
20 : #include <limits>
21 : #include <memory>
22 : #include <numeric>
23 :
24 : using namespace casacore;
25 : using namespace casa;
26 : using namespace casacore;
27 : using namespace casa::asyncio;
28 : using namespace casacore;
29 : using namespace casa::utilj;
30 : using namespace std;
31 : using std::shared_ptr;
32 :
33 : using namespace casacore;
34 : namespace casa {
35 :
36 : namespace vpf {
37 :
38 : #define Log(level, ...) \
39 : {if (level <= VpEngine::getLogLevel()) \
40 : VpEngine::log (__VA_ARGS__);}
41 :
42 :
43 : //SimpleVp::SimpleVp (const String & name, const String & input, const String & output)
44 : //: VisibilityProcessor (name,
45 : // vector<String> (1, input),
46 : // output.empty() ? vector<String> () : vector<String> (1, output))
47 : //{}
48 : //
49 : //SimpleVp::~SimpleVp ()
50 : //{}
51 : //
52 : //void
53 : //SimpleVp::validateImpl ()
54 : //{
55 : // throwIfAnyPortsUnconnected ();
56 : //}
57 :
58 :
59 0 : SplitterVp::SplitterVp (const String & name,
60 : const String & inputName,
61 0 : const vector<String> & outputNames)
62 0 : : VisibilityProcessor (name, vector<String> (1, inputName), outputNames)
63 : {
64 0 : ThrowIf (inputName.size() != 1, "Exactly one input is required.");
65 0 : ThrowIf (outputNames.size () < 1, "At least one output is required.");
66 0 : }
67 :
68 :
69 :
70 : VisibilityProcessor::ProcessingResult
71 0 : SplitterVp::doProcessingImpl (ProcessingType processingType ,
72 : VpData & inputData,
73 : const SubchunkIndex & /*subChunkIndex*/)
74 : {
75 0 : if (processingType == VisibilityProcessor::EndOfChunk ||
76 : processingType == VisibilityProcessor::EndOfData){
77 0 : return ProcessingResult();
78 : }
79 :
80 0 : VpPort inputPort = getInputs () [0];
81 0 : VpPorts outputs = getOutputs();
82 0 : VbPtr inputVbPtr = inputData [inputPort];
83 0 : VpData vpData;
84 :
85 : // Handle the first, required output by reusing the input VB
86 :
87 0 : vpData [outputs [0]] = inputVbPtr; // reuse in input
88 :
89 : // Handle any additional outputs.
90 :
91 0 : for (int i = 1; i < (int) outputs.size(); i++){
92 :
93 0 : VpPort port = outputs [i];
94 :
95 : // Make a copy of the input and add it as an output
96 :
97 0 : vpData [port] = VbPtr (inputVbPtr->clone());
98 :
99 : }
100 :
101 0 : ProcessingResult processingResult (Normal, vpData);
102 :
103 0 : return processingResult;
104 : }
105 :
106 : void
107 0 : SplitterVp::validateImpl ()
108 : {
109 0 : throwIfAnyInputsUnconnected ();
110 0 : ThrowIf (getOutputs (true).empty(),
111 : String::format ("SplitterVp %s has no outputs connected.", getFullName().c_str()));
112 0 : }
113 :
114 :
115 0 : SubchunkIndex::SubchunkIndex (Int chunkNumber, Int subChunkNumber, Int iteration)
116 0 : : chunkNumber_p (chunkNumber), iteration_p (iteration), subChunkNumber_p (subChunkNumber)
117 0 : {}
118 :
119 : Bool
120 0 : SubchunkIndex::operator< (const SubchunkIndex & other) const
121 : {
122 0 : Bool result = std::make_tuple (chunkNumber_p, subChunkNumber_p, iteration_p) <
123 0 : std::make_tuple (other.chunkNumber_p, other.subChunkNumber_p, other.iteration_p);
124 :
125 0 : return result;
126 : }
127 :
128 : Int
129 0 : SubchunkIndex::getChunkNumber () const
130 : {
131 0 : return chunkNumber_p;
132 : }
133 :
134 : Int
135 0 : SubchunkIndex::getIteration () const
136 : {
137 0 : return iteration_p;
138 : }
139 :
140 : Int
141 0 : SubchunkIndex::getSubchunkNumber () const
142 : {
143 0 : return subChunkNumber_p;
144 : }
145 :
146 : String
147 0 : SubchunkIndex::toString() const
148 : {
149 0 : return String::format ("(%d,%d,%d)", chunkNumber_p, subChunkNumber_p, iteration_p);
150 : }
151 :
152 : VisibilityProcessorStub::ProcessingResult
153 0 : VisibilityProcessorStub::doProcessingImpl (ProcessingType /*processingType*/,
154 : VpData & /*inputData*/,
155 : const SubchunkIndex & /*subChunkIndex*/)
156 : {
157 0 : Throw ("Stub does not permit processing.");
158 : }
159 :
160 : void
161 0 : VisibilityProcessorStub::validateImpl ()
162 : {
163 0 : Throw ("Stub does not permit validation.");
164 : }
165 :
166 :
167 : //VisBuffer *
168 : //VbPtr::get ()
169 : //{
170 : // return vb_p; // use with caution
171 : //}
172 : //
173 :
174 : //VisBuffer *
175 : //VbPtr::release ()
176 : //{
177 : // freeOnDelete_p = false;
178 : // VisBuffer * vb = vb_p;
179 : // vb_p = NULL;
180 : //
181 : // return vb;
182 : //}
183 :
184 0 : VisibilityProcessor::VisibilityProcessor ()
185 : : container_p (NULL),
186 : nSubchunks_p (0),
187 : nSubchunksUnique_p (0),
188 0 : vpEngine_p (0)
189 0 : {}
190 :
191 :
192 0 : VisibilityProcessor::VisibilityProcessor (const String & name,
193 : const vector<String> & inputNames,
194 : const vector<String> & outputNames,
195 0 : Bool makeIoPorts)
196 : : container_p (NULL),
197 : name_p (name),
198 : nSubchunks_p (0),
199 : nSubchunksUnique_p (0),
200 0 : vpEngine_p (0)
201 :
202 : {
203 0 : VpPort::Type portType = makeIoPorts ? VpPort::InOut : VpPort::Input;
204 0 : vpInputs_p = definePorts (inputNames, portType, "input");
205 0 : portType = makeIoPorts ? VpPort::InOut : VpPort::Output;
206 0 : vpOutputs_p = definePorts (outputNames, portType, "output");
207 0 : }
208 :
209 : void
210 0 : VisibilityProcessor::throwIfAnyInputsUnconnected (const vector<String> & exceptThese) const
211 : {
212 0 : VpPorts inputs = getInputs ();
213 :
214 0 : VpPorts unconnectedPorts = portsUnconnected (inputs, & VpPort::isConnectedInput, exceptThese);
215 :
216 0 : ThrowIf (! unconnectedPorts.empty(),
217 : String::format ("Vp '%s' has unconnected inputs: %s",
218 : getName().c_str(),
219 : unconnectedPorts.toString().c_str()));
220 0 : }
221 :
222 : void
223 0 : VisibilityProcessor::throwIfAnyInputsUnconnectedExcept (const String & name) const
224 : {
225 0 : throwIfAnyInputsUnconnected (vector<String> (1, name));
226 0 : }
227 :
228 :
229 : void
230 0 : VisibilityProcessor::throwIfAnyOutputsUnconnected (const vector<String> & exceptThese) const
231 : {
232 0 : VpPorts outputs = getOutputs ();
233 :
234 0 : VpPorts unconnectedPorts = portsUnconnected (outputs, & VpPort::isConnectedOutput, exceptThese);
235 :
236 0 : ThrowIf (! unconnectedPorts.empty(),
237 : String::format ("Vp '%s' has unconnected outputs: %s",
238 : getName().c_str(),
239 : unconnectedPorts.toString().c_str()));
240 0 : }
241 :
242 : void
243 0 : VisibilityProcessor::throwIfAnyOutputsUnconnectedExcept (const String & name) const
244 : {
245 0 : throwIfAnyOutputsUnconnected (vector<String> (1, name));
246 0 : }
247 :
248 :
249 : void
250 0 : VisibilityProcessor::throwIfAnyPortsUnconnected () const
251 : {
252 0 : VpPorts inputs = getInputs ();
253 :
254 0 : VpPorts unconnectedInputs = portsUnconnected (inputs, & VpPort::isConnectedInput);
255 :
256 0 : VpPorts outputs = getOutputs ();
257 :
258 0 : VpPorts unconnectedOutputs = portsUnconnected (outputs, & VpPort::isConnectedOutput);
259 :
260 0 : if (! unconnectedInputs.empty () || ! unconnectedOutputs.empty()){
261 :
262 0 : String message = String::format ("Vp '%s' has");
263 :
264 :
265 0 : if (! unconnectedInputs.empty()){
266 :
267 : // String::format up the inputs portion of the message, if applicable
268 :
269 0 : message += String::format (" unconnected inputs: %s",
270 0 : unconnectedInputs.toString().c_str());
271 : }
272 :
273 0 : if (! unconnectedInputs.empty()){
274 :
275 : // String::format up the inputs portion of the message, if applicable.
276 : // Determine the appropriate text to join up the previous text.
277 :
278 0 : string conjunction = (unconnectedInputs.empty()) ? ""
279 0 : : "\n and";
280 :
281 0 : message += String::format ("%s has unconnected outputs: %s",
282 : conjunction.c_str (),
283 0 : unconnectedOutputs.toString().c_str());
284 : }
285 :
286 0 : message += ".";
287 :
288 0 : ThrowIf (true, message);
289 :
290 : }
291 :
292 0 : }
293 :
294 :
295 : VpPorts
296 0 : VisibilityProcessor::portsUnconnected (const VpPorts & ports, Bool (VpPort::* isConnected) () const,
297 : const vector<String> & except) const
298 : {
299 0 : VpPorts unconnectedPorts;
300 :
301 0 : for (VpPorts::const_iterator port = ports.begin(); port != ports.end(); port ++){
302 :
303 0 : Bool connected = ((* port).*isConnected)();
304 :
305 0 : if (! connected){
306 :
307 : // See if it's in the list of exceptions
308 :
309 0 : if (! except.empty() && find (except.begin(), except.end(), port->getName()) == except.end()){
310 0 : unconnectedPorts.push_back (* port);
311 : }
312 : }
313 :
314 : }
315 :
316 0 : return unconnectedPorts;
317 : }
318 :
319 : void
320 0 : VisibilityProcessor::chunkStart (const SubchunkIndex & sci)
321 : {
322 0 : chunkStartImpl (sci);
323 0 : }
324 :
325 :
326 : VpPorts
327 0 : VisibilityProcessor::definePorts (const vector<String> & portNames, VpPort::Type type, const String & typeName)
328 : {
329 :
330 0 : VpPorts vpPorts;
331 :
332 0 : for (vector<String>::const_iterator portName = portNames.begin();
333 0 : portName != portNames.end();
334 0 : portName ++){
335 :
336 0 : ThrowIf (vpPorts.contains (* portName),
337 : String::format ("VisibilityProcessor %s already has an %s port '%s'",
338 : getName().c_str(), typeName.c_str(), portName->c_str()));
339 :
340 0 : vpPorts.push_back (VpPort (this, * portName, type));
341 : }
342 :
343 0 : return vpPorts;
344 : }
345 :
346 : VisibilityProcessor::ProcessingResult
347 0 : VisibilityProcessor::doProcessing (ProcessingType processingType,
348 : VpData & inputData,
349 : VpEngine * vpEngine,
350 : const SubchunkIndex & subchunkIndex)
351 : {
352 :
353 0 : vpEngine_p = vpEngine;
354 0 : pair<Int,Int> originalViPosition = getVi()->getSubchunkId ();
355 :
356 0 : if (processingType == Subchunk && subchunkIndex != SubchunkIndex::Invalid){
357 0 : nSubchunks_p ++;
358 0 : if (subchunkIndex.getIteration() == 0){
359 0 : nSubchunksUnique_p ++;
360 : }
361 : }
362 :
363 0 : ProcessingResult result;
364 :
365 : try {
366 0 : result = doProcessingImpl (processingType, inputData, subchunkIndex);
367 :
368 : }
369 0 : catch (AipsError & e){
370 :
371 0 : vpEngine_p = 0;
372 :
373 0 : Rethrow (e, String::format ("Error in doProcessing of VP '%s': %s on %s", getName().c_str(),
374 : toString (processingType).c_str(), subchunkIndex.toString().c_str()));
375 : }
376 :
377 0 : pair<Int,Int> currentViPosition = getVi()->getSubchunkId ();
378 :
379 0 : ThrowIf (currentViPosition != originalViPosition,
380 : String::format ("VisibilityIterator moved during processing in VP '%s'", getName().c_str()));
381 :
382 0 : vpEngine_p = 0;
383 :
384 0 : return result;
385 :
386 : }
387 :
388 :
389 : String
390 0 : VisibilityProcessor::getFullName () const
391 : {
392 :
393 0 : list<String> names;
394 0 : names.push_front (getName());
395 :
396 0 : const VisibilityProcessor * parent = getContainer();
397 :
398 0 : while (parent != NULL){
399 :
400 0 : names.push_front (parent->getName());
401 0 : parent = parent->getContainer();
402 : }
403 :
404 0 : String fullName = utilj::join (names, ".");
405 :
406 0 : return fullName;
407 : }
408 :
409 : VpPorts
410 0 : VisibilityProcessor::getInputs (Bool connectedOnly) const
411 : {
412 0 : VpPorts result;
413 :
414 0 : if (connectedOnly){
415 :
416 : // Copy over the inputs that are input connected.
417 : // Need to negate predicate to get the right results --STL is strange sometimes.
418 :
419 : remove_copy_if (vpInputs_p.begin(), vpInputs_p.end(), back_inserter (result),
420 0 : not1 (mem_fun_ref (& VpPort::isConnectedInput)));
421 : }
422 : else{
423 0 : result = vpInputs_p;
424 : }
425 :
426 0 : return result;
427 : }
428 :
429 : VpPort
430 0 : VisibilityProcessor::getInput (const String & name) const
431 : {
432 0 : ThrowIf (! vpInputs_p.contains (name),
433 : String::format ("Vp '%s' has no input port '%s'", getName().c_str(), name.c_str()));
434 :
435 0 : return vpInputs_p.get (name);
436 : }
437 :
438 : VpPort &
439 0 : VisibilityProcessor::getInputRef (const String & name)
440 : {
441 0 : ThrowIf (! vpInputs_p.contains (name),
442 : String::format ("Vp '%s' has no input port '%s'", getName().c_str(), name.c_str()));
443 :
444 0 : return vpInputs_p.getRef (name);
445 : }
446 :
447 : String
448 0 : VisibilityProcessor::getName () const
449 : {
450 0 : return name_p;
451 : }
452 :
453 : Int
454 0 : VisibilityProcessor::getNSubchunksProcessed () const
455 : {
456 0 : return nSubchunks_p;
457 : }
458 :
459 : Int
460 0 : VisibilityProcessor::getNSubchunksUniqueProcessed () const
461 : {
462 0 : return nSubchunksUnique_p;
463 : }
464 :
465 :
466 : VpPort
467 0 : VisibilityProcessor::getOutput (const String & name) const
468 : {
469 0 : ThrowIf (! vpOutputs_p.contains (name),
470 : String::format ("Vp '%s' has no output port '%s'", getName().c_str(), name.c_str()));
471 :
472 0 : return vpOutputs_p.get (name);
473 : }
474 :
475 : VpPort &
476 0 : VisibilityProcessor::getOutputRef (const String & name)
477 : {
478 0 : ThrowIf (! vpOutputs_p.contains (name),
479 : String::format ("Vp '%s' has no output port '%s'", getName().c_str(), name.c_str()));
480 :
481 0 : return vpOutputs_p.getRef (name);
482 : }
483 :
484 : VpPorts
485 0 : VisibilityProcessor::getOutputs (Bool connectedOnly) const
486 : {
487 0 : VpPorts result;
488 :
489 0 : if (connectedOnly){
490 :
491 : // Copy over the outputs that are output connected.
492 : // Need to negate predicate to get the right results --STL is strange sometimes.
493 :
494 : remove_copy_if (vpOutputs_p.begin(), vpOutputs_p.end(), back_inserter (result),
495 0 : not1 (mem_fun_ref (& VpPort::isConnectedOutput)));
496 : }
497 : else{
498 0 : result = vpOutputs_p;
499 : }
500 :
501 0 : return result;
502 : }
503 :
504 : PrefetchColumns
505 0 : VisibilityProcessor::getPrefetchColumns () const
506 : {
507 0 : return PrefetchColumns ();
508 : }
509 :
510 : ROVisibilityIterator *
511 0 : VisibilityProcessor::getVi ()
512 : {
513 0 : return vpEngine_p->getVi();
514 : }
515 :
516 : VpEngine *
517 0 : VisibilityProcessor::getVpEngine ()
518 : {
519 0 : return vpEngine_p;
520 : }
521 :
522 : void
523 0 : VisibilityProcessor::processingStart ()
524 : {
525 0 : nSubchunks_p = 0;
526 0 : nSubchunksUnique_p = 0;
527 :
528 0 : validate();
529 :
530 0 : processingStartImpl ();
531 0 : }
532 :
533 : void
534 0 : VisibilityProcessor::setContainer (const VpContainer * container)
535 : {
536 0 : assert (container != NULL);
537 :
538 0 : ThrowIf (container_p != NULL,
539 : String::format ("Attempting to add VisibiltyProcessor '%s' into '%s'; previously added to '%s'",
540 : getName().c_str(), container->getFullName().c_str(), container_p->getFullName().c_str()));
541 :
542 0 : container_p = container;
543 0 : }
544 :
545 : String
546 0 : toString (VisibilityProcessor::ProcessingType p)
547 : {
548 : static const char * names [] = {"Subchunk", "EndOfChunk", "EndOfData"};
549 :
550 0 : return names [p];
551 : }
552 :
553 : void
554 0 : VisibilityProcessor::validate ()
555 : {
556 0 : validateImpl ();
557 0 : }
558 :
559 0 : VpContainer::VpContainer (const String & name, const vector<String> & inputs, const vector<String> & outputs)
560 0 : : VisibilityProcessor (name, inputs, outputs, true)
561 0 : {}
562 :
563 : void
564 0 : VpContainer::add (VisibilityProcessor * vp)
565 : {
566 0 : ThrowIf (contains (vp),
567 : String::format ("Visibility processor %s already in container %s",
568 : vp->getName().c_str(), getName().c_str()));
569 :
570 0 : vp->setContainer (this);
571 0 : vps_p.push_back (vp);
572 0 : }
573 :
574 : void
575 0 : VpContainer::chunkStart (const SubchunkIndex & sci)
576 : {
577 0 : iterator i;
578 : try{
579 0 : for (i = begin(); i != end(); i++){
580 0 : (*i)->chunkStart (sci);
581 : }
582 : }
583 0 : catch (AipsError & e){
584 0 : Rethrow (e, String::format ("Error during chunkStart for container '%s' VP '%s'",
585 : getName().c_str(), (*i)->getName().c_str()));
586 : }
587 0 : }
588 :
589 : void
590 0 : VpContainer::connect (const String & sourcePortName,
591 : VisibilityProcessor * sinkVp, const String & sinkPortName)
592 : {
593 0 : connect (this, sourcePortName, sinkVp, sinkPortName);
594 0 : }
595 :
596 : void
597 0 : VpContainer::connect (VisibilityProcessor * sourceVp, const String & sourcePortName,
598 : const String & sinkPortName)
599 : {
600 0 : connect (sourceVp, sourcePortName, this, sinkPortName);
601 0 : }
602 :
603 : pair<VpPort, VpPort>
604 0 : VpContainer::validateConnectionPorts (VisibilityProcessor * sourceVp,
605 : const String & sourcePortName,
606 : VisibilityProcessor * sinkVp,
607 : const String & sinkPortName)
608 : {
609 : // Does the owning VP really support these ports?
610 :
611 0 : ThrowIf (sourceVp != this && ! sourceVp->getOutputs ().contains (sourcePortName),
612 : String::format ("Visibility processor %s in %s does not have output %s",
613 : sourceVp->getName().c_str(), getName().c_str(),
614 : sourcePortName.c_str()));
615 :
616 0 : ThrowIf (sourceVp == this && ! getInputs().contains (sourcePortName),
617 : String::format ("Visibility processor container %s in %s does not have input %s",
618 : sourceVp->getName().c_str(), getName().c_str(),
619 : sourcePortName.c_str()));
620 :
621 0 : ThrowIf (sinkVp != this && ! sinkVp->getInputs ().contains (sinkPortName),
622 : String::format ("Visibility processor %s in %s does not have input %s",
623 : sinkVp->getName().c_str(), getName().c_str(),
624 : sinkPortName.c_str()));
625 :
626 0 : ThrowIf (sinkVp == this && ! getOutputs().contains (sinkPortName),
627 : String::format ("Visibility processor container %s in %s does not have output %s",
628 : sinkVp->getName().c_str(), getName().c_str(),
629 : sinkPortName.c_str()));
630 :
631 : // Are the ports already in use?
632 :
633 0 : VpPort sink = (sinkVp == this) ? sinkVp->getOutput (sinkPortName)
634 0 : : sinkVp->getInput (sinkPortName);
635 0 : VpPort source = (sourceVp == this) ? sourceVp->getInput (sourcePortName)
636 0 : : sourceVp->getOutput (sourcePortName);
637 :
638 0 : ThrowIf (utilj::containsKey (source, network_p),
639 : String::format ("Output %s already in use for visibility processor %s in %s",
640 : source.getName().c_str(), sourceVp->getName().c_str(), getName().c_str()));
641 :
642 0 : ThrowIf (utilj::containsKey (sink, networkReverse_p),
643 : String::format ("Input %s already in use for visibility processor %s in %s",
644 : sink.getName().c_str(), sinkVp->getName().c_str(), getName().c_str()));
645 :
646 0 : return make_pair (source, sink);
647 : }
648 :
649 : void
650 0 : VpContainer::connect (VisibilityProcessor * sourceVp, const String & sourcePortName,
651 : VisibilityProcessor * sinkVp, const String & sinkPortName)
652 : {
653 : // Validate the requested connection
654 : // =================================
655 :
656 : // Do they refer to a VP in this container?
657 :
658 0 : ThrowIf (! contains (sourceVp) && sourceVp != this,
659 : String::format ("No such visibility processor %s in %s.",
660 : sourceVp->getName().c_str(), getName().c_str()));
661 0 : ThrowIf (! contains (sinkVp) && sinkVp != this,
662 : String::format ("No such visibility processor %s in %s.",
663 : sinkVp->getName().c_str(), getName().c_str()));
664 :
665 0 : VpPort sink, source;
666 0 : std::tie (source, sink) = validateConnectionPorts (sourceVp, sourcePortName, sinkVp, sinkPortName);
667 :
668 : // See if this is a connection to the container inputs or outputs or
669 : // a normal connection between VPs
670 :
671 0 : Bool containerConnect = (source.getType() == sink.getType()) &&
672 0 : ((source.isType(VpPort::Input) && sourceVp == this) ||
673 0 : (sink.isType(VpPort::Output) && sinkVp == this));
674 :
675 0 : Bool normalConnect = source.isType (VpPort::Output) && sink.isType (VpPort::Input);
676 :
677 0 : Bool selfConnect = sourceVp == sinkVp; // detects loop back
678 :
679 0 : ThrowIf (! (normalConnect || containerConnect) || selfConnect,
680 : String::format ("Cannot connect %s:%s to %s:%s in %s", sourceVp->getName ().c_str(),
681 : source.getName ().c_str (), sinkVp->getName().c_str (),
682 : sink.getName ().c_str (), getName().c_str()));
683 :
684 : // The validation is over, so actually do the connection.
685 :
686 0 : network_p [source] = sink;
687 0 : networkReverse_p.insert (sink);
688 :
689 : // Inform the real ports (i.e., not the copies) that they are connected
690 : // N.B.: Container ports are in/out and are intended to be doubly connected,
691 : // from the inside and from the outside of the container.
692 :
693 0 : if (source.isType (VpPort::Input)){
694 0 : sourceVp->getInputRef (source.getName()).setConnectedOutput ();
695 : }
696 : else{
697 0 : sourceVp->getOutputRef (source.getName()).setConnectedOutput ();
698 : }
699 :
700 0 : if (sink.isType (VpPort::Output)){
701 0 : sinkVp->getOutputRef (sink.getName()).setConnectedInput ();
702 : }
703 : else{
704 0 : sinkVp->getInputRef (sink.getName()).setConnectedInput ();
705 : }
706 0 : }
707 :
708 : VpContainer::iterator
709 0 : VpContainer::begin()
710 : {
711 0 : return iterator (vps_p.begin());
712 : }
713 :
714 : VpContainer::const_iterator
715 0 : VpContainer::begin() const
716 : {
717 0 : return const_iterator (vps_p.begin());
718 : }
719 :
720 :
721 :
722 : Bool
723 0 : VpContainer::contains (const VisibilityProcessor * vp) const
724 : {
725 0 : Bool foundIt = find (vps_p.begin(), vps_p.end(), vp) != vps_p.end();
726 :
727 0 : return foundIt;
728 : }
729 :
730 : VisibilityProcessor::ProcessingResult
731 0 : VpContainer::doProcessingImpl (ProcessingType processingType, VpData & data, const SubchunkIndex & sci)
732 : {
733 :
734 0 : VpSet vpsWaiting (vps_p.begin(), vps_p.end()); // Set of pending VPs
735 0 : ChunkCode overallChunkCode = Normal; // container result for this data
736 : VisibilityProcessor * vp; // Currently executing VP
737 0 : VpData vpInputs; // Inputs to be fed to current Vp
738 :
739 0 : remapPorts (data, this);
740 :
741 0 : Log (3, "VpContainer::doProcessing: '%s' starting execution with inputs {%s}.\n",
742 : getName().c_str(), data.getNames().c_str());
743 :
744 : try{
745 :
746 0 : do {
747 :
748 : // Find a VP which can compute given the current set of inputs
749 :
750 0 : Bool flushing = processingType != Subchunk;
751 0 : std::tie (vp, vpInputs) = findReadyVp (vpsWaiting, data, flushing);
752 :
753 0 : if (vp != NULL){
754 :
755 0 : Log (3, "VpContainer::doProcessing: '%s' starting execution of %s.\n",
756 : getName().c_str(), vp->getName().c_str());
757 :
758 : // Have the ready VP process its inputs and
759 : // potentially produce more outputs
760 :
761 : ChunkCode chunkCode;
762 0 : VpData outputs;
763 :
764 0 : std::tie (chunkCode, outputs) =
765 0 : vp->doProcessing (processingType, vpInputs, getVpEngine(), sci);
766 :
767 0 : Log (3, "VpContainer::doProcessing: execution of %s output {%s}.\n",
768 : vp->getName().c_str(), outputs.getNames().c_str());
769 :
770 0 : if (processingType == EndOfChunk && chunkCode == RepeatChunk){
771 :
772 : // If any VP in this iteration requests a chunk repeat,
773 : // then that's the overall result.
774 :
775 0 : overallChunkCode = RepeatChunk;
776 : }
777 :
778 : // Remove the VP from the set of pending VPs, remove
779 : // the data this VP consumed as inputs and add any outputs
780 : // it produced to the set of available data.
781 :
782 0 : vpsWaiting.erase (vp);
783 0 : for (VpData::const_iterator i = vpInputs.begin(); i != vpInputs.end(); i++){
784 0 : data.erase (i->first);
785 : }
786 0 : remapPorts (outputs, vp);
787 0 : data.insert (outputs.begin(), outputs.end());
788 :
789 : }
790 :
791 0 : } while (vp != NULL);
792 :
793 : }
794 0 : catch (AipsError & e){
795 0 : Rethrow (e, String::format ("Error while container '%s' processing VP '%s'",
796 : getName().c_str(), (vp != NULL) ? vp->getName().c_str() : "NULL"));
797 : }
798 :
799 0 : if (vpsWaiting.empty()){
800 0 : Log (3, "VpContainer::doProcessing: '%s' executed all VPs.\n", getName().c_str());
801 : }
802 : else{
803 0 : Log (3, "VpContainer::doProcessing: '%s' did not execute VPs: {%s}.\n",
804 : getName().c_str(), vpsWaiting.getNames().c_str());
805 : }
806 :
807 0 : return ProcessingResult (overallChunkCode, data);
808 : }
809 :
810 : Bool
811 0 : VpContainer::empty () const
812 : {
813 0 : return vps_p.empty();
814 : }
815 :
816 : VpContainer::iterator
817 0 : VpContainer::end()
818 : {
819 0 : return iterator (vps_p.end());
820 : }
821 :
822 : VpContainer::const_iterator
823 0 : VpContainer::end() const
824 : {
825 0 : return const_iterator (vps_p.end());
826 : }
827 :
828 : void
829 0 : VpContainer::fillWithSequence (VisibilityProcessor * first, ...)
830 : {
831 0 : ThrowIf (! vps_p.empty (),
832 : String::format ("fillWithSequence performed on non-empty container %s", getName().c_str()));
833 :
834 : va_list vaList;
835 :
836 0 : VisibilityProcessor * vp = first;
837 :
838 0 : va_start (vaList, first);
839 :
840 0 : while (vp != NULL){
841 :
842 0 : add (vp);
843 :
844 0 : vp = va_arg (vaList, VisibilityProcessor *);
845 : }
846 :
847 0 : va_end (vaList);
848 :
849 0 : for (VPs::iterator vp = vps_p.begin ();
850 0 : vp != vps_p.end();
851 0 : vp ++){
852 :
853 0 : VPs::iterator vp2 = vp + 1;
854 0 : if (vp2 == vps_p.end()){
855 0 : break;
856 : }
857 :
858 0 : ThrowIf ((* vp)->getOutputs().empty(),
859 : String::format ("Visibility processor %s has no outputs.", (* vp)->getName().c_str()));
860 0 : ThrowIf ((* vp2)->getInputs().empty(),
861 : String::format ("Visibility processor %s has no inputs.", (* vp2)->getName().c_str()));
862 :
863 0 : connect (* vp, (* vp)->getOutputs().front().getName(),
864 0 : * vp2, (* vp2)->getInputs().front().getName());
865 :
866 : }
867 :
868 : // Connect up containers input to the input of the first VP
869 :
870 0 : ThrowIf (vps_p.front()->getInputs().empty(),
871 : String::format ("First node in sequence, %s, has no inputs",
872 : vps_p.front()->getName().c_str()));
873 :
874 0 : connect (getInputs().front().getName(),
875 0 : vps_p.front(), vps_p.front()->getInputs().front().getName());
876 :
877 0 : if (! getOutputs().empty() && ! vps_p.back()->getOutputs().empty()){
878 :
879 : // Connect up output of last node with output of container
880 :
881 0 : connect (vps_p.back(), vps_p.back()->getInputs().front().getName(),
882 0 : getOutputs().front().getName());
883 :
884 : }
885 0 : }
886 :
887 : VpContainer::ReadyVpAndData
888 0 : VpContainer::findReadyVp (VpSet & vps, VpData & data, Bool flushing) const
889 : {
890 0 : if (flushing){
891 0 : return findReadyVpFlushing (vps, data);
892 : }
893 : else{
894 0 : return findReadyVpNormal (vps, data);
895 : }
896 : }
897 :
898 : VpContainer::ReadyVpAndData
899 0 : VpContainer::findReadyVpFlushing (VpSet & vpsWaiting, VpData & data) const
900 : {
901 :
902 0 : VisibilityProcessor * readyVp = NULL;
903 :
904 : // The first vp in the ordered list to also be in the waiting set is the
905 : // one we want.
906 :
907 0 : for (VPs::const_iterator vp = vps_p.begin(); vp != vps_p.end(); vp ++){
908 0 : if (vpsWaiting.find (* vp) != vpsWaiting.end()){
909 0 : readyVp = * vp;
910 0 : break;
911 : }
912 : }
913 :
914 0 : if (readyVp == NULL){
915 :
916 0 : ThrowIf (! vpsWaiting.empty(), "Could not find ready VP during flush (bug)");
917 :
918 0 : return ReadyVpAndData (NULL, VpData());
919 : }
920 :
921 : // The set of input data will be all of the inputs desired by the node that
922 : // are available. They may not be present if an upstream node didn't have any
923 : // data to flush out.
924 :
925 0 : VpPorts connectedInputList = readyVp -> getInputs (true);
926 :
927 0 : ReadyVpAndData result = ReadyVpAndData (readyVp, data.getSelection (connectedInputList, true));
928 :
929 0 : return result;
930 : }
931 :
932 : VpContainer::ReadyVpAndData
933 0 : VpContainer::findReadyVpNormal (VpSet & vps, VpData & data) const
934 : {
935 0 : ReadyVpAndData result (NULL, VpData());
936 :
937 0 : set<VpPort> dataPorts;
938 0 : for (VpData::const_iterator d = data.begin(); d != data.end(); d++){
939 0 : dataPorts.insert (d->first);
940 : }
941 :
942 0 : for (VpSet::const_iterator vp = vps.begin(); vp != vps.end(); vp ++){
943 :
944 0 : VpPorts connectedInputList = (* vp)->getInputs (true);
945 :
946 0 : set<VpPort> connectedInputSet (connectedInputList.begin(), connectedInputList.end());
947 :
948 : // Subtract from the needed input ports, the set of available data ports.
949 : // When the comes up empty then the VP can execute.
950 :
951 0 : VpPorts diff;
952 : set_difference (connectedInputSet.begin(), connectedInputSet.end(),
953 : dataPorts.begin(), dataPorts.end(),
954 0 : back_inserter (diff));
955 :
956 0 : if (diff.empty()){
957 :
958 0 : result = ReadyVpAndData (* vp, data.getSelection (connectedInputList));
959 :
960 0 : break;
961 : }
962 :
963 : }
964 :
965 0 : return result;
966 : }
967 :
968 : bool
969 0 : VpContainer::follows (const VisibilityProcessor * a, const VisibilityProcessor * b) const
970 : {
971 : // Go through the interconnection network and see if one of processor b's outputs go to
972 : // processor a's inputs
973 :
974 0 : Bool result = false;
975 0 : for (Network::const_iterator arc = network_p.begin(); arc != network_p.end(); arc ++){
976 0 : if (arc->first.getVp() == b && arc->second.getVp() == a){
977 0 : result = true;
978 0 : break;
979 : }
980 : }
981 :
982 0 : return result;
983 : }
984 :
985 : bool
986 0 : VpContainer::followsSet (const VisibilityProcessor * a, const VpSet & vpSet) const
987 : {
988 0 : Bool result = false;
989 :
990 0 : for (VpSet::const_iterator vp = vpSet.begin(); vp != vpSet.end(); vp++){
991 0 : if (follows (a, * vp)){
992 0 : result = true;
993 0 : break;
994 : }
995 : }
996 :
997 0 : return result;
998 : }
999 :
1000 : casa::asyncio::PrefetchColumns
1001 0 : VpContainer::getPrefetchColumns () const
1002 : {
1003 0 : PrefetchColumns result;
1004 :
1005 0 : for (VPs::const_iterator vp = vps_p.begin(); vp != vps_p.end(); vp ++){
1006 :
1007 0 : result = result + (* vp)->getPrefetchColumns();
1008 :
1009 : }
1010 :
1011 0 : return result;
1012 : }
1013 :
1014 :
1015 : void
1016 0 : VpContainer::orderContents ()
1017 : {
1018 : // Order the VPs in this container using their dependencies. Nodes that are only dependent
1019 : // on the container will be first, then nodes dependent on the first set of nodes, etc.
1020 :
1021 0 : VpSet unorderedVps (vps_p.begin(), vps_p.end()); // VPs not assigned an order as of yet
1022 0 : VPs orderedVps; // sorted list of VPs
1023 :
1024 0 : while (! unorderedVps.empty()){
1025 :
1026 0 : VPs nextClass;
1027 :
1028 : // Create the next class of VPs which are only dependent on previous classes of VPs.
1029 : // These will be VPs which are not dependent on any of the currently unordered nodes.
1030 :
1031 0 : for (VpSet::const_iterator vp = unorderedVps.begin(); vp != unorderedVps.end(); vp ++){
1032 0 : if (! followsSet (* vp, unorderedVps)){
1033 0 : nextClass.push_back (* vp);
1034 0 : orderedVps.push_back (* vp);
1035 : }
1036 : }
1037 :
1038 : // Remove the VPs that are
1039 :
1040 0 : for (VPs::const_iterator vp = nextClass.begin(); vp != nextClass.end(); vp++){
1041 0 : unorderedVps.erase (* vp);
1042 : }
1043 :
1044 : // If no nodes were found then there must be a cycle and the loop will never
1045 : // terminate!
1046 :
1047 0 : ThrowIf (nextClass.size() == 0, String::format ("VpContainer %s contains a cycle", getName().c_str()));
1048 :
1049 : }
1050 :
1051 0 : vps_p = orderedVps;
1052 0 : }
1053 :
1054 : void
1055 0 : VpContainer::processingStartImpl ()
1056 : {
1057 0 : iterator i;
1058 : try{
1059 0 : for (i = begin(); i != end(); i++){
1060 0 : (*i)->processingStart ();
1061 : }
1062 : }
1063 0 : catch (AipsError & e){
1064 0 : Rethrow (e, String::format ("Error during processingStart for container '%s' VP '%s'",
1065 : getName().c_str(), (*i)->getName().c_str()));
1066 : }
1067 0 : }
1068 :
1069 : void
1070 0 : VpContainer::remapPorts (VpData & data, const VisibilityProcessor * vp)
1071 : {
1072 0 : vector<VpPort> oldPorts = mapKeys (data);
1073 :
1074 0 : for (vector<VpPort>::const_iterator oldPort = oldPorts.begin();
1075 0 : oldPort != oldPorts.end();
1076 0 : oldPort ++){
1077 :
1078 0 : Network::const_iterator newPortItr = network_p.find (* oldPort);
1079 :
1080 0 : if (newPortItr != network_p.end()){
1081 :
1082 0 : VpPort newPort = newPortItr->second;
1083 :
1084 0 : assert (! utilj::containsKey (newPort, data));
1085 :
1086 0 : data [newPort] = data [* oldPort];
1087 0 : data.erase (* oldPort);
1088 :
1089 : }
1090 : else{
1091 0 : ThrowIf (true,
1092 : String::format ("Vp '%s' produced unused output '%s'",
1093 : vp->getFullName().c_str(), oldPort->getName().c_str()));
1094 : }
1095 : }
1096 0 : }
1097 :
1098 : size_t
1099 0 : VpContainer::size() const
1100 : {
1101 0 : return vps_p.size();
1102 : }
1103 :
1104 :
1105 :
1106 : void
1107 0 : VpContainer::validateImpl()
1108 : {
1109 0 : iterator i;
1110 : try{
1111 0 : for (i = begin(); i != end(); i++){
1112 0 : (*i)->validate ();
1113 : }
1114 : }
1115 0 : catch (AipsError & e){
1116 0 : Rethrow (e, String::format ("Error during validate for container '%s' VP '%s'",
1117 : getName().c_str(), (*i)->getName().c_str()));
1118 : }
1119 :
1120 0 : orderContents(); // put vps_p into dependency order
1121 0 : }
1122 :
1123 : String
1124 0 : VpContainer::VpSet::getNames () const
1125 : {
1126 0 : String nameList = utilj::join (begin(), end(), mem_fun (& VisibilityProcessor::getName), ",");
1127 :
1128 0 : return nameList;
1129 : }
1130 :
1131 0 : VpData::VpData ()
1132 0 : {}
1133 :
1134 0 : VpData::VpData (const VpPort & port, VbPtr vb)
1135 : {
1136 0 : add (port, vb);
1137 0 : }
1138 :
1139 : void
1140 0 : VpData::add (const VpPort & port, VbPtr vb)
1141 : {
1142 0 : ThrowIf (utilj::containsKey (port, * this),
1143 : String::format ("VpData::add: data already present for port %s.", port.getFullName ().c_str()));
1144 :
1145 0 : (* this) [port] = vb;
1146 0 : }
1147 :
1148 : String
1149 0 : VpData::getNames () const
1150 : {
1151 0 : string names = join (begin(), end(),
1152 : compose (mem_fun_ref (& VpPort::getName),
1153 0 : casa::utilj::firstFunctor<VpPort, VbPtr>()),
1154 0 : ",");
1155 :
1156 0 : return names;
1157 : }
1158 :
1159 : VpData
1160 0 : VpData::getSelection (const VpPorts & ports, bool missingIsOk) const
1161 : {
1162 0 : VpData result;
1163 :
1164 0 : for (VpPorts::const_iterator port = ports.begin(); port != ports.end(); port ++){
1165 :
1166 0 : const_iterator data = find (* port);
1167 :
1168 0 : if (data != end()){
1169 0 : result [* port] = data->second;
1170 : }
1171 : else{
1172 0 : assert (missingIsOk);
1173 : UnusedVariable (missingIsOk);
1174 : }
1175 : }
1176 :
1177 0 : return result;
1178 : }
1179 :
1180 :
1181 : Int VpEngine::logLevel_p = std::numeric_limits<int>::min();
1182 : LogIO * VpEngine::logIo_p = NULL;
1183 : LogSink * VpEngine::logSink_p = NULL;
1184 : Bool VpEngine::loggingInitialized_p = false;
1185 :
1186 : Bool
1187 0 : VpEngine::initializeLogging()
1188 : {
1189 :
1190 0 : AipsrcValue<Int>::find (logLevel_p, getAipsRcBase () + ".debug.logLevel",
1191 0 : std::numeric_limits<int>::min());
1192 :
1193 0 : if (logLevel_p >= 0){
1194 :
1195 0 : if (logSink_p == 0){
1196 0 : logSink_p = new LogSink(LogMessage::NORMAL, false);
1197 : }
1198 :
1199 0 : logIo_p = new LogIO (LogOrigin ("VisibilityProcessing"));
1200 0 : * logIo_p << "VisibilityProcessing logging enabled; level=" << logLevel_p << endl << LogIO::POST;
1201 :
1202 : }
1203 :
1204 0 : loggingInitialized_p = true;
1205 :
1206 0 : return true;
1207 : }
1208 :
1209 : String
1210 0 : VpEngine::getAipsRcBase ()
1211 : {
1212 0 : return "VpFramework";
1213 : }
1214 :
1215 : Int
1216 0 : VpEngine::getLogLevel ()
1217 : {
1218 0 : return logLevel_p;
1219 : }
1220 :
1221 : ROVisibilityIterator *
1222 0 : VpEngine::getVi ()
1223 : {
1224 0 : return vi_p;
1225 : }
1226 :
1227 : void
1228 0 : VpEngine::log (const String & formatString, ...)
1229 : {
1230 0 : if (! loggingInitialized_p){
1231 0 : initializeLogging ();
1232 : }
1233 :
1234 : va_list vaList;
1235 :
1236 0 : va_start (vaList, formatString);
1237 :
1238 0 : String result = formatV (formatString.c_str(), vaList);
1239 :
1240 0 : va_end (vaList);
1241 :
1242 0 : (* logIo_p) << result << endl << LogIO::POST;
1243 0 : }
1244 :
1245 : void
1246 0 : VpEngine::process (VisibilityProcessor & processor,
1247 : ROVisibilityIterator & vi,
1248 : const String & inputPortName)
1249 : {
1250 0 : ThrowIf (! processor.getInputs ().contains (inputPortName),
1251 : String::format ("VisibilityProcessor %s does not have an input port '%s'",
1252 : processor.getName().c_str(), inputPortName.c_str()));
1253 :
1254 0 : process (processor, vi, processor.getInput (inputPortName));
1255 0 : }
1256 :
1257 : void
1258 0 : VpEngine::process (VisibilityProcessor & processor,
1259 : ROVisibilityIterator & vi,
1260 : const VpPort & inputPortProvided)
1261 : {
1262 0 : Log (1, "VpEngine::process starting on processor '%s'", processor.getName().c_str());
1263 :
1264 0 : vi_p = & vi;
1265 :
1266 0 : VisBufferAutoPtr vbTemp (vi);
1267 0 : VbPtr vb (vbTemp.release());
1268 :
1269 0 : VpPort inputPort = inputPortProvided;
1270 :
1271 0 : if (inputPort.empty()){ // Take single input to VP as default if not specified
1272 :
1273 0 : VpPorts inputs = processor.getInputs ();
1274 :
1275 0 : ThrowIf (inputs.size() != 1,
1276 : String::format ("Vp '%s' must have exactly one input or an input must be specified explicitly",
1277 : processor.getName().c_str()));
1278 :
1279 0 : inputPort = inputs.front();
1280 : }
1281 :
1282 : // connect up input and then validate
1283 :
1284 0 : processor.processingStart ();
1285 :
1286 0 : Int chunkNumber = 0;
1287 0 : Int subchunkNumber = 0;
1288 :
1289 : try {
1290 :
1291 0 : for (vi.originChunks ();
1292 0 : vi.moreChunks();
1293 0 : vi.nextChunk (), chunkNumber ++){
1294 :
1295 0 : Int iteration = 0;
1296 0 : VisibilityProcessor::ChunkCode chunkCode = VisibilityProcessor::Normal;
1297 :
1298 0 : do { // The VP can request repeating a chunk
1299 :
1300 0 : Log (2, "VpEngine::process: Starting chunk %d (iteration=%d)\n",
1301 : chunkNumber, iteration);
1302 :
1303 0 : processor.chunkStart (SubchunkIndex (chunkNumber, SubchunkIndex::Invalid, iteration));
1304 :
1305 0 : subchunkNumber = 0;
1306 :
1307 0 : for (vi.origin (); vi.more (); ++ vi, subchunkNumber ++){
1308 :
1309 0 : vb->dirtyComponentsClear();
1310 0 : VisibilityProcessor::ProcessingResult ignored;
1311 :
1312 0 : SubchunkIndex sci (chunkNumber, subchunkNumber, iteration);
1313 :
1314 0 : Log (2, "VpEngine::process: Starting Subchunk %s \n",
1315 : sci.toString ().c_str());
1316 :
1317 0 : VpData data (inputPort, vb);
1318 0 : ignored = processor.doProcessing (VisibilityProcessor::Subchunk,
1319 : data,
1320 : this,
1321 0 : sci);
1322 : }
1323 :
1324 0 : VpData noData;
1325 0 : VpData ignored;
1326 0 : std::tie (chunkCode, ignored) =
1327 0 : processor.doProcessing (VisibilityProcessor::EndOfChunk,
1328 : noData,
1329 : this,
1330 0 : SubchunkIndex (chunkNumber, SubchunkIndex::Invalid, iteration));
1331 :
1332 0 : iteration ++;
1333 :
1334 0 : } while (chunkCode == VisibilityProcessor::RepeatChunk);
1335 :
1336 : }
1337 : }
1338 0 : catch (AipsError & e){
1339 : }
1340 :
1341 0 : VisibilityProcessor::ProcessingResult ignored;
1342 0 : VpData noData;
1343 :
1344 0 : ignored = processor.doProcessing (VisibilityProcessor::EndOfData,
1345 : noData,
1346 : this,
1347 0 : SubchunkIndex ());
1348 :
1349 0 : Log (1, "VpEngine::process completed for processor '%s'", processor.getName().c_str());
1350 :
1351 0 : }
1352 :
1353 0 : VpPort::VpPort()
1354 : : connectedInput_p (false),
1355 : connectedOutput_p (false),
1356 : name_p (""),
1357 : visibilityProcessor_p (NULL),
1358 0 : type_p (Unknown)
1359 0 : {}
1360 :
1361 :
1362 0 : VpPort::VpPort (VisibilityProcessor * vp, const String & name, VpPort::Type type)
1363 : : connectedInput_p (false),
1364 : connectedOutput_p (false),
1365 : name_p (name),
1366 : visibilityProcessor_p (vp),
1367 0 : type_p (type)
1368 0 : {}
1369 :
1370 : Bool
1371 0 : VpPort::operator== (const VpPort & other) const
1372 : {
1373 0 : Bool result = other.getVp() == getVp() && other.getName() == getName();
1374 :
1375 0 : return result;
1376 : }
1377 :
1378 : Bool
1379 0 : VpPort::operator< (const VpPort & other) const
1380 : {
1381 0 : Bool result = other.getVp() < getVp() ||
1382 0 : (other.getVp() == getVp() && other.getName() < getName());
1383 :
1384 0 : return result;
1385 : }
1386 :
1387 :
1388 : Bool
1389 0 : VpPort::empty () const
1390 : {
1391 0 : return visibilityProcessor_p == NULL;
1392 : }
1393 :
1394 : String
1395 0 : VpPort::getFullName () const
1396 : {
1397 0 : String vpName = "*NULL*";
1398 0 : if (getVp() != NULL){
1399 0 : vpName = getVp()->getFullName();
1400 : }
1401 :
1402 0 : return String::format ("%s:%s", vpName.c_str(), getName().c_str());
1403 : }
1404 :
1405 : String
1406 0 : VpPort::getName () const
1407 : {
1408 0 : return name_p;
1409 : }
1410 :
1411 : VpPort::Type
1412 0 : VpPort::getType () const
1413 : {
1414 0 : return type_p;
1415 : }
1416 :
1417 : VisibilityProcessor *
1418 0 : VpPort::getVp ()
1419 : {
1420 0 : return visibilityProcessor_p;
1421 : }
1422 :
1423 : const VisibilityProcessor *
1424 0 : VpPort::getVp () const
1425 : {
1426 0 : return visibilityProcessor_p;
1427 : }
1428 :
1429 : Bool
1430 0 : VpPort::isConnectedInput () const
1431 : {
1432 0 : return connectedInput_p;
1433 : }
1434 :
1435 : Bool
1436 0 : VpPort::isConnectedOutput () const
1437 : {
1438 0 : return connectedOutput_p;
1439 : }
1440 :
1441 : Bool
1442 0 : VpPort::isType (Type t) const
1443 : {
1444 0 : return (type_p & t) != 0;
1445 : }
1446 :
1447 : void
1448 0 : VpPort::setConnectedInput ()
1449 : {
1450 0 : AssertAlways (! empty() && ! connectedInput_p && isType (Input));
1451 :
1452 0 : connectedInput_p = true;
1453 0 : }
1454 :
1455 : void
1456 0 : VpPort::setConnectedOutput ()
1457 : {
1458 0 : AssertAlways (! empty() && ! connectedOutput_p && isType (Output));
1459 :
1460 0 : connectedOutput_p = true;
1461 0 : }
1462 :
1463 : Bool
1464 0 : VpPorts::contains (const String & name) const
1465 : {
1466 0 : Bool foundIt = find (name, begin(), end()) != end();
1467 :
1468 0 : return foundIt;
1469 : }
1470 :
1471 : Bool
1472 0 : VpPorts::contains (const VpPort & port) const
1473 : {
1474 0 : Bool foundIt = std::find (begin(), end(), port) != end();
1475 :
1476 0 : return foundIt;
1477 : }
1478 :
1479 : VpPort
1480 0 : VpPorts::get (const String & name) const
1481 : {
1482 0 : const_iterator i = find (name, begin(), end());
1483 0 : ThrowIf (i == end(), "No such port '" + name + "'");
1484 :
1485 0 : return * i;
1486 : }
1487 :
1488 : VpPort &
1489 0 : VpPorts::getRef (const String & name)
1490 : {
1491 0 : iterator i = find (name, begin(), end());
1492 0 : ThrowIf (i == end(), "No such port '" + name + "'");
1493 :
1494 0 : return * i;
1495 : }
1496 :
1497 : String
1498 0 : VpPorts::toString () const
1499 : {
1500 :
1501 0 : String result;
1502 :
1503 0 : result = utilj::containerToString (begin(), end(), & VpPort::getName);
1504 :
1505 0 : return result;
1506 : }
1507 :
1508 :
1509 0 : WriterVp::WriterVp (const String & name,
1510 : VisibilityIterator * vi,
1511 : Bool advanceVi,
1512 : const String & input,
1513 0 : const String & output)
1514 0 : : VisibilityProcessor (name, vector<String> (1, input), vector<String> (1, output)),
1515 : advanceVi_p (advanceVi),
1516 : disableOutput_p (false),
1517 0 : vi_p (vi)
1518 : {
1519 0 : ThrowIf (advanceVi_p && vi == NULL,
1520 : String::format ("Parameter advanceVi can only be true if a VI is provided for WriterVp '%s',",
1521 : name.c_str()));
1522 0 : }
1523 :
1524 : VisibilityProcessor::ProcessingResult
1525 0 : WriterVp::doProcessingImpl (ProcessingType /*processingType*/,
1526 : VpData & inputData,
1527 : const SubchunkIndex & /*subChunkIndex*/)
1528 : {
1529 0 : if (inputData.empty()){
1530 0 : return ProcessingResult(); // Nothing to write
1531 : }
1532 :
1533 0 : VpPort inputPort = getInputs () [0];
1534 :
1535 0 : ThrowIf (! utilj::containsKey (inputPort, inputData),
1536 : String::format ("Input data not found for port '%s' in VP '%s'",
1537 : inputPort.getName().c_str(),
1538 : getName().c_str()));
1539 :
1540 : // Get the (writable) VisibilityIterator
1541 :
1542 0 : VisibilityIterator * vi = vi_p;
1543 :
1544 0 : if (vi == NULL){
1545 :
1546 : // If a VI wasn't provided, then use the one being swept by the VI engine
1547 :
1548 0 : vi = dynamic_cast <VisibilityIterator *> (getVi());
1549 : }
1550 :
1551 :
1552 0 : ThrowIf (vi == NULL, String::format ("No writable VI found in VP '%s'", getName().c_str()));
1553 :
1554 : // Write out the data to the VI
1555 :
1556 :
1557 :
1558 : try{
1559 0 : if (! disableOutput_p){
1560 0 : vi->writeBack (inputData [inputPort].get());
1561 :
1562 0 : if (advanceVi_p){
1563 :
1564 : // Advance VI to the next position. If the current chunk is exhausted then
1565 : // advance the chunk and reset to the chunk's origin.
1566 :
1567 0 : (* vi) ++;
1568 0 : if (! vi->more()){
1569 0 : vi->nextChunk();
1570 0 : if (vi->moreChunks()){
1571 0 : vi->origin();
1572 : }
1573 : }
1574 : }
1575 : }
1576 : }
1577 0 : catch (AipsError & e){
1578 0 : Rethrow (e, String::format ("While '%s' writing VB to disk", getName().c_str()));
1579 : }
1580 :
1581 0 : inputData [inputPort] -> dirtyComponentsClear();
1582 :
1583 : // Output the data if the output of this VP is connected.
1584 :
1585 0 : VpData outputData; // Start out with empty outputs
1586 :
1587 0 : VpPorts outputs = getOutputs(true); // get connected outputs
1588 :
1589 0 : if (! outputs.empty()){
1590 :
1591 : // An output was connected.
1592 :
1593 0 : outputData [outputs [0]] = inputData [inputPort];
1594 : }
1595 :
1596 0 : ProcessingResult processingResult (Normal, outputData);
1597 :
1598 0 : return processingResult;
1599 : }
1600 :
1601 : Bool
1602 0 : WriterVp::setDisableOutput (Bool disableIt)
1603 : {
1604 0 : Bool old = disableOutput_p;
1605 0 : disableOutput_p = disableIt;
1606 :
1607 0 : return old;
1608 : }
1609 :
1610 : void
1611 0 : WriterVp::validateImpl()
1612 : {
1613 0 : throwIfAnyInputsUnconnected ();
1614 0 : }
1615 :
1616 :
1617 :
1618 : ostream &
1619 0 : operator<< (ostream & os, const VisibilityProcessor::ProcessingType & processingType)
1620 : {
1621 0 : os << toString (processingType);
1622 :
1623 0 : return os;
1624 : }
1625 :
1626 :
1627 : } // end namespace vpu
1628 :
1629 : using namespace casacore;
1630 : } // end namespace casa
|