Line data Source code
1 : #include <synthesis/ImagerObjects/grpcInteractiveClean.h>
2 : #include <synthesis/ImagerObjects/SIMinorCycleController.h>
3 : #include <casatools/Config/State.h>
4 : #include <casacore/casa/Logging/LogIO.h>
5 : #include <casacore/images/Images/PagedImage.h>
6 : #include <stdcasa/StdCasa/CasacSupport.h>
7 : #include <string.h>
8 : #include <stdlib.h>
9 : #include <sys/types.h>
10 : #include <sys/wait.h>
11 : #include <iostream>
12 :
13 : #include <sys/types.h>
14 : #include <sys/stat.h>
15 : #include <unistd.h>
16 : #include <array>
17 : #include <regex>
18 : #include <string>
19 :
20 : #include <algorithm>
21 : #include <cctype>
22 : #include <locale>
23 :
24 : #include <grpc++/grpc++.h>
25 : #include "shutdown.grpc.pb.h"
26 : #include "img.grpc.pb.h"
27 : #include "ping.grpc.pb.h"
28 :
29 : #include <stdcasa/StdCasa/CasacSupport.h>
30 :
31 : #ifdef __APPLE__
32 : extern "C" char **environ;
33 : #include <unistd.h>
34 : #endif
35 :
36 : using namespace casacore;
37 :
38 : // https://stackoverflow.com/questions/216823/whats-the-best-way-to-trim-stdstring
39 : // C++ is so ridiculous... trim from start (in place)
40 0 : static inline void ltrim(std::string &s) {
41 0 : s.erase(s.begin(), std::find_if(s.begin(), s.end(), [](int ch) {
42 0 : return !std::isspace(ch);
43 0 : }));
44 0 : }
45 :
46 : // trim from end (in place)
47 0 : static inline void rtrim(std::string &s) {
48 0 : s.erase(std::find_if(s.rbegin(), s.rend(), [](int ch) {
49 0 : return !std::isspace(ch);
50 0 : }).base(), s.end());
51 0 : }
52 :
53 : // trim from both ends (in place)
54 0 : static inline void trim(std::string &s) {
55 0 : ltrim(s);
56 0 : rtrim(s);
57 0 : }
58 :
59 : // Executes the given program, with the given arguments and the given environment.
60 : // The stdout from the program is collected and returned in output, up to outputlen characters.
61 : // @param envp To get around the MPI issue from CAS-13252, this should probably come from getenv_sansmpi().
62 0 : static void execve_getstdout(char *pathname, char *argv[], char *envp[], char *output, ssize_t outputlen)
63 : {
64 : // We use execve here instead of popen to get around issues related to using MPI.
65 : // MPI crashes when starting a process that calls MPI_Init in a process spawned using popen or exec.
66 : // We can trick MPI into behaving itself by removing all the MPI environmental variables for
67 : // the child precess (thus getenv_sansmpi and execve).
68 :
69 : int filedes[2];
70 0 : if (pipe(filedes) == -1) {
71 0 : std::cerr << "pipe error" << std::endl;
72 0 : exit(1);
73 : }
74 :
75 0 : pid_t pid = fork();
76 0 : if (pid == -1) {
77 0 : std::cerr << "fork error" << std::endl;
78 0 : exit(1);
79 0 : } else if (pid == 0) { // child
80 : // close stdout and connect it to the input of the pipe
81 0 : while ((dup2(filedes[1], STDOUT_FILENO) == -1) && (errno == EINTR)) {}
82 0 : close(filedes[1]);
83 0 : close(filedes[0]);
84 : // exec on the child process
85 0 : execve(pathname, argv, envp);
86 0 : exit(1);
87 : } else { // parent
88 : // don't care about the input end of the pipe
89 0 : close(filedes[1]);
90 :
91 0 : const ssize_t tmplen = 128;
92 : char tmp[tmplen];
93 0 : ssize_t total = 0;
94 : while (1) {
95 0 : ssize_t count = read(filedes[0], tmp, tmplen);
96 0 : if (count == -1) {
97 0 : if (errno == EINTR) {
98 0 : continue;
99 : } else {
100 0 : std::cerr << "read error" << std::endl;
101 0 : exit(1);
102 : }
103 0 : } else if (count == 0) {
104 0 : break;
105 : } else {
106 0 : ssize_t remaining = outputlen - total;
107 0 : ssize_t cpysize = (count < remaining) ? count : remaining;
108 0 : memcpy(output+total, tmp, cpysize);
109 0 : total += cpysize;
110 0 : output[total] = '\0';
111 : }
112 0 : }
113 :
114 0 : close(filedes[0]);
115 : }
116 0 : }
117 :
118 : // Get all environment parameters (as from the "environ" posix variable),
119 : // but don't include any environment parameters that match "*MPI*".
120 : // @return A malloc'ed set of environment parameters. Should call free after use.
121 0 : static char **getenv_sansmpi()
122 : {
123 0 : int nvars = 0, nvars_sansmpi = 0;
124 0 : for (nvars = 0; environ[nvars] != NULL; nvars++) {
125 : // printf("%s\n", environ[nvars]);
126 0 : std::string envvar = environ[nvars];
127 0 : if (envvar.find("MPI") == std::string::npos) {
128 0 : nvars_sansmpi++;
129 : }
130 : }
131 :
132 0 : char **ret = (char**)malloc(sizeof(char*) * (nvars_sansmpi+1));
133 0 : int retidx = 0;
134 0 : for (int i = 0; environ[i] != NULL; i++) {
135 0 : std::string envvar = environ[i];
136 0 : if (envvar.find("MPI") == std::string::npos) {
137 0 : ret[retidx] = environ[i];
138 0 : retidx++;
139 : }
140 : }
141 0 : ret[nvars_sansmpi] = NULL;
142 :
143 0 : return ret;
144 : }
145 :
146 : namespace casa { //# NAMESPACE CASA - BEGIN
147 :
148 629 : grpcInteractiveCleanManager &grpcInteractiveClean::getManager( ) {
149 629 : static grpcInteractiveCleanManager mgr;
150 629 : return mgr;
151 : }
152 :
153 3346 : void grpcInteractiveCleanManager::pushDetails() {
154 3346 : }
155 :
156 1 : grpcInteractiveCleanState::grpcInteractiveCleanState( ) : SummaryMinor(casacore::IPosition(2,
157 : SIMinorCycleController::nSummaryFields, // temporary CAS-13683 workaround
158 : // SIMinorCycleController::useSmallSummaryminor() ? 6 : SIMinorCycleController::nSummaryFields, // temporary CAS-13683 workaround
159 : 0)),
160 2 : SummaryMajor(casacore::IPosition(1,0)) {
161 3 : LogIO os( LogOrigin("grpcInteractiveCleanState",__FUNCTION__,WHERE) );
162 1 : reset( );
163 1 : }
164 :
165 1259 : void grpcInteractiveCleanState::reset( ) {
166 1259 : Niter = 0;
167 1259 : MajorDone = 0;
168 1259 : CycleNiter = 0;
169 1259 : InteractiveNiter = 0;
170 1259 : Threshold = 0;
171 1259 : CycleThreshold = 0;
172 1259 : InteractiveThreshold = 0.0;
173 1259 : IsCycleThresholdAuto = true;
174 1259 : IsCycleThresholdMutable = true;
175 1259 : IsThresholdAuto = false;
176 1259 : CycleFactor = 1.0;
177 1259 : LoopGain = 0.1;
178 1259 : StopFlag = false;
179 1259 : PauseFlag = false;
180 1259 : InteractiveMode = false;
181 1259 : UpdatedModelFlag = false;
182 1259 : InteractiveIterDone = 0;
183 1259 : IterDone = 0;
184 1259 : StopCode = 0;
185 1259 : Nsigma = 0.0;
186 1259 : MaxPsfSidelobe = 0.0;
187 1259 : MinPsfFraction = 0.05;
188 1259 : MaxPsfFraction = 0.8;
189 1259 : PeakResidual = 0.0;
190 1259 : MinorCyclePeakResidual = 0.0;
191 1259 : PrevPeakResidual = -1.0;
192 1259 : NsigmaThreshold = 0.0;
193 1259 : PrevMajorCycleCount = 0;
194 1259 : PeakResidualNoMask = 0.0;
195 1259 : PrevPeakResidualNoMask = -1.0;
196 1259 : MinPeakResidualNoMask = 1e+9;
197 1259 : MinPeakResidual = 1e+9;
198 1259 : MaskSum = -1.0;
199 1259 : MadRMS = 0.0;
200 : //int nSummaryFields = SIMinorCycleController::useSmallSummaryminor() ? 6 : SIMinorCycleController::nSummaryFields; // temporary CAS-13683 workaround
201 1259 : int nSummaryFields = SIMinorCycleController::nSummaryFields; // temporary CAS-13683 workaround
202 : //int nSummaryFields = !FullSummary ? 6 : SIMinorCycleController::nSummaryFields; // temporary CAS-13683 workaround
203 1259 : SummaryMinor.reformOrResize(casacore::IPosition(2, nSummaryFields ,0));
204 1259 : SummaryMajor.reformOrResize(casacore::IPosition(1,0));
205 1259 : SummaryMinor = 0;
206 1259 : SummaryMajor = 0;
207 1259 : }
208 :
209 0 : void grpcInteractiveCleanManager::setControls( int niter, int ncycle, float threshold ) {
210 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager", __FUNCTION__, WHERE) );
211 0 : static const auto debug = getenv("GRPC_DEBUG");
212 0 : if ( debug ) std::cerr << "setting clean controls:";
213 0 : access( (void*) 0,
214 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
215 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
216 :
217 0 : state.Niter = niter;
218 0 : if ( debug ) std::cerr << " niter=" << state.Niter;
219 0 : state.CycleNiter = ncycle;
220 0 : if ( debug ) std::cerr << " cycleniter=" << state.CycleNiter;
221 0 : state.Threshold = threshold;
222 0 : if ( debug ) std::cerr << " threshold=" << state.Threshold;
223 0 : return dummy;
224 :
225 : } ) );
226 :
227 0 : if ( debug ) {
228 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
229 0 : std::this_thread::get_id() << ")" << std::endl;
230 0 : fflush(stderr);
231 : }
232 :
233 0 : }
234 629 : void grpcInteractiveCleanManager::setControlsFromRecord(const casac::record &iterpars) {
235 1887 : LogIO os( LogOrigin("grpcInteractiveCleanManager", __FUNCTION__, WHERE) );
236 629 : static const auto debug = getenv("GRPC_DEBUG");
237 :
238 629 : if ( debug ) std::cerr << "initializing clean controls:";
239 :
240 629 : access( (void*) 0,
241 1258 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
242 629 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
243 :
244 629 : auto oldNiter = state.Niter;
245 629 : auto oldCycleNiter = state.CycleNiter;
246 629 : auto oldThreshold = state.Threshold;
247 629 : auto oldCycleThreshold = state.CycleThreshold;
248 :
249 629 : state.reset( );
250 :
251 : /* Note it is important that niter get set first as we catch
252 : negative values in the cycleniter, and set it equal to niter */
253 629 : auto niter = iterpars.find("niter");
254 629 : if ( niter != iterpars.end( ) ) {
255 629 : state.Niter = niter->second.toInt( );
256 629 : if ( debug ) std::cerr << " niter=" << state.Niter;
257 : }
258 629 : auto newNiter = state.Niter;
259 :
260 629 : auto cycleniter = iterpars.find("cycleniter");
261 629 : if ( cycleniter != iterpars.end( ) ) {
262 629 : int val = cycleniter->second.toInt( );
263 629 : if ( val <= 0 )
264 5 : state.CycleNiter = state.Niter;
265 : else
266 624 : state.CycleNiter = val;
267 629 : if ( debug ) std::cerr << " cycleniter=" << state.CycleNiter;
268 : }
269 629 : auto newCycleNiter = state.CycleNiter;
270 :
271 629 : auto interactiveniter = iterpars.find("interactiveniter");
272 629 : if ( interactiveniter != iterpars.end( ) ) {
273 0 : state.InteractiveNiter = interactiveniter->second.toInt( );
274 0 : if ( debug ) std::cerr << " interactiveniter=" << state.InteractiveNiter;
275 : }
276 :
277 629 : auto threshold = iterpars.find("threshold");
278 629 : if ( threshold != iterpars.end( ) ) {
279 1258 : auto quant = casaQuantity(threshold->second);
280 629 : if ( quant.getUnit( ) == "" ) quant.setUnit("Jy");
281 629 : auto val = quant.getValue(Unit("Jy"));
282 629 : if ( val == -1.0 ) {
283 0 : state.Threshold = 0.0;
284 0 : state.IsThresholdAuto = true;
285 : } else {
286 629 : state.Threshold = (float) val;
287 629 : state.IsThresholdAuto = false;
288 : }
289 629 : if ( debug ) {
290 0 : std::cerr << " threshold=" << state.Threshold;
291 : std::cerr << " isthresholdauto=" <<
292 0 : (state.IsThresholdAuto ? "true" : "false");
293 : }
294 : }
295 629 : auto newThreshold = state.Threshold;
296 :
297 629 : auto cyclethreshold = iterpars.find("cyclethreshold");
298 629 : if ( cyclethreshold != iterpars.end( ) ) {
299 176 : state.CycleThreshold = casaQuantity(cyclethreshold->second).getValue(Unit("Jy"));
300 176 : state.IsCycleThresholdAuto = false;
301 176 : if ( debug ) {
302 0 : std::cerr << " cyclethreshold=" << state.CycleThreshold;
303 : std::cerr << " iscyclethresholdauto=" <<
304 0 : (state.IsCycleThresholdAuto ? "true" : "false");
305 0 : fflush(stderr);
306 : }
307 : }
308 629 : auto newCycleThreshold = state.CycleThreshold;
309 :
310 629 : auto cyclethresholdismutable = iterpars.find("cyclethresholdismutable");
311 629 : if ( cyclethresholdismutable != iterpars.end( ) ) {
312 176 : state.IsCycleThresholdMutable = cyclethresholdismutable->second.toBool( );
313 176 : state.IsCycleThresholdAuto = state.IsCycleThresholdAuto && state.IsCycleThresholdMutable;
314 176 : if ( debug ) {
315 0 : std::cerr << " iscyclethresholdmutable=" << (state.IsCycleThresholdMutable ? "true" : "false");
316 0 : std::cerr << " iscyclethresholdauto=" << (state.IsCycleThresholdAuto ? "true" : "false");
317 0 : fflush(stderr);
318 : }
319 : }
320 :
321 629 : auto interactivethreshold = iterpars.find("interactivethreshold");
322 629 : if ( interactivethreshold != iterpars.end( ) ) {
323 0 : state.InteractiveThreshold = casaQuantity(interactivethreshold->second).getValue(Unit("Jy"));
324 0 : if ( debug ) std::cerr << " interactivethreshold=" << state.InteractiveThreshold;
325 : }
326 :
327 629 : auto loopgain = iterpars.find("loopgain");
328 629 : if ( loopgain != iterpars.end( ) ) {
329 629 : state.LoopGain = (float) loopgain->second.toDouble( );
330 629 : if ( debug ) std::cerr << " loopgain=" << state.LoopGain;
331 : }
332 629 : auto cyclefactor = iterpars.find("cyclefactor");
333 629 : if ( cyclefactor != iterpars.end( ) ) {
334 629 : state.CycleFactor = (float) cyclefactor->second.toDouble( );
335 629 : if ( debug ) std::cerr << " cyclefactor=" << state.CycleFactor;
336 : }
337 :
338 629 : auto interactivemode = iterpars.find("interactive");
339 629 : if ( interactivemode != iterpars.end( ) ) {
340 629 : state.InteractiveMode = interactivemode->second.toBool( );
341 629 : if ( debug ) std::cerr << " interactive=" <<
342 0 : (state.InteractiveMode ? "true" : "false");
343 : }
344 :
345 629 : auto minpsffraction = iterpars.find("minpsffraction");
346 629 : if ( minpsffraction != iterpars.end( ) ) {
347 629 : state.MinPsfFraction = (float) minpsffraction->second.toDouble( );
348 629 : if ( debug ) std::cerr << " minpsffraction=" << state.MinPsfFraction;
349 : }
350 :
351 629 : auto maxpsffraction = iterpars.find("maxpsffraction");
352 629 : if ( maxpsffraction != iterpars.end( ) ) {
353 629 : state.MaxPsfFraction = (float) maxpsffraction->second.toDouble( );
354 629 : if ( debug ) std::cerr << " maxpsffraction=" << state.MaxPsfFraction;
355 : }
356 :
357 629 : auto nsigma = iterpars.find("nsigma");
358 629 : if ( nsigma != iterpars.end( ) ) {
359 629 : state.Nsigma = (float) nsigma->second.toDouble( );
360 629 : if ( debug ) std::cerr << " nsigma=" << state.Nsigma;
361 : }
362 629 : auto fullsummary = iterpars.find("fullsummary");
363 629 : if ( fullsummary != iterpars.end() ) {
364 629 : state.FullSummary = fullsummary->second.toBool();
365 629 : if ( debug ) std::cerr << " fullsummry=" << state.FullSummary;
366 : }
367 629 : if ( debug ) std::cerr << std::endl;
368 :
369 629 : if ( debug ) {
370 0 : std::cerr << "-------------------------------------------" << this << " / " << &state << std::endl;
371 0 : std::cerr << " exported python state: " << std::endl;
372 0 : std::cerr << "-------------------------------------------" << std::endl;
373 0 : std::cerr << " Niter " << oldNiter <<
374 0 : " ---> " << newNiter << std::endl;
375 0 : std::cerr << " CycleNiter " << oldCycleNiter <<
376 0 : " ---> " << newCycleNiter << std::endl;
377 0 : std::cerr << " Threshold " << oldThreshold <<
378 0 : " ---> " << newThreshold << std::endl;
379 0 : std::cerr << " CycleThreshold " << oldCycleThreshold <<
380 0 : " ---> " << newCycleThreshold << std::endl;
381 0 : std::cerr << " IsCycleThresholdAuto " << state.IsCycleThresholdAuto << std::endl;
382 0 : std::cerr << "-------------------------------------------" << std::endl;
383 :
384 :
385 : }
386 629 : return dummy;
387 :
388 : } ) );
389 :
390 629 : if ( debug ) {
391 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
392 0 : std::this_thread::get_id() << ")" << std::endl;
393 0 : fflush(stderr);
394 : }
395 629 : }
396 : /*
397 : Float SIIterBot_state::readThreshold( Record recordIn, String id ) {
398 : LogIO os( LogOrigin("SIIterBot_state",__FUNCTION__,WHERE) );
399 : std::lock_guard<std::recursive_mutex> guard(recordMutex);
400 : // Threshold can be a variant, either Float or String(with units).
401 : Float fthresh=0.0;
402 : // If a number, treat it as a number in units of Jy.
403 : if( recordIn.dataType(id) == TpFloat ||
404 : recordIn.dataType(id) == TpDouble ||
405 : recordIn.dataType(id) == TpInt )
406 : { fthresh = recordIn.asFloat( RecordFieldId(id)); }
407 : // If a string, try to convert to a Quantity
408 : else if( recordIn.dataType(id) == TpString )
409 : {
410 : Quantity thresh;
411 : // If it cannot be converted to a Quantity.... complain, and use zero.
412 : if( ! casacore::Quantity::read( thresh, recordIn.asString( RecordFieldId(id) ) ) )
413 : {os << LogIO::WARN << "Cannot parse threshold value. Setting to zero." << LogIO::POST;
414 : fthresh=0.0;}
415 : // If converted to Quantity, get value in Jy.
416 : // ( Note : This does not check for wrong units, e.g. if the user says '100m' ! )
417 : else { fthresh = thresh.getValue(Unit("Jy")); }
418 : }
419 : // If neither valid datatype, print a warning and use zero.
420 : else {os << LogIO::WARN << id << " is neither a number nor a string Quantity. Setting to zero." << LogIO::POST;
421 : fthresh=0.0; }
422 :
423 : return fthresh;
424 : }
425 : */
426 629 : void grpcInteractiveCleanManager::setIterationDetails(const casac::record &iterpars) {
427 1887 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
428 629 : static const auto debug = getenv("GRPC_DEBUG");
429 :
430 : // Setup interactive masking : list of image names.
431 629 : if ( clean_images.size( ) == 0 ) {
432 : try {
433 : ////////////////////////////////////////////////////////////////////////////////////////////////////////
434 : ///// START : code to get a list of image names for interactive masking
435 :
436 629 : auto allimages = iterpars.find("allimages");
437 629 : if ( allimages != iterpars.end( ) ) {
438 1258 : auto rec = allimages->second.getRecord( );
439 1269 : for ( auto it = rec.begin( ); it != rec.end( ); ++it ) {
440 1280 : auto oneimg = it->second.getRecord( );
441 640 : auto img_name = oneimg.find("imagename");
442 640 : auto img_multiterm = oneimg.find("multiterm");
443 640 : if ( img_name != oneimg.end( ) && img_multiterm != oneimg.end( ) ) {
444 640 : clean_images.push_back( std::make_tuple( img_name->second.getString(),
445 1280 : img_multiterm->second.toBool( ), false, 0) );
446 : }
447 : }
448 : } else {
449 0 : throw( AipsError("Need image names and nterms in iteration parameter list") );
450 : }
451 629 : if ( clean_images.size( ) <= 0 ) {
452 0 : throw( AipsError("Need image names for iteration") );
453 : }
454 :
455 629 : if ( debug ) {
456 0 : std::cerr << "clean images specified: ";
457 0 : for ( auto it = clean_images.begin( ); it != clean_images.end( ); ++it ) {
458 0 : if ( it != clean_images.begin( ) ) std::cerr << ", ";
459 0 : std::cerr << std::get<0>(*it) << " [" << (std::get<1>(*it) ? "true" : "false") << "]";
460 : }
461 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
462 0 : std::this_thread::get_id() << ")" << std::endl;
463 0 : fflush(stderr);
464 : }
465 :
466 : ///// END : code to get a list of image names for interactive masking
467 : ////////////////////////////////////////////////////////////////////////////////////////////////////////
468 :
469 629 : setControlsFromRecord( iterpars );
470 629 : Int nSummaryFields = !state.FullSummary ? 6 : SIMinorCycleController::nSummaryFields;
471 629 : state.SummaryMinor.reformOrResize(casacore::IPosition(2, nSummaryFields ,0));
472 :
473 0 : } catch( AipsError &x ) {
474 0 : throw( AipsError("Error in updating iteration parameters : " + x.getMesg()) );
475 : }
476 : }
477 629 : }
478 :
479 3346 : void grpcInteractiveCleanManager::updateCycleThreshold( grpcInteractiveCleanState &state ) {
480 3346 : static const auto debug = getenv("GRPC_DEBUG");
481 :
482 3346 : Float psffraction = state.MaxPsfSidelobe * state.CycleFactor;
483 :
484 3346 : psffraction = max(psffraction, state.MinPsfFraction);
485 3346 : psffraction = min(psffraction, state.MaxPsfFraction);
486 :
487 3346 : if ( debug ) {
488 0 : std::cerr << "------------------------------------------- " << this << " / " << &state << std::endl;
489 0 : std::cerr << " algorithmic update of cycle threshold: " << std::endl;
490 0 : std::cerr << " CycleThreshold " << state.CycleThreshold <<
491 0 : " ---> " << (state.PeakResidual * psffraction) << std::endl;
492 0 : std::cerr << " IsCycleThresholdAuto " << state.IsCycleThresholdAuto << std::endl;
493 0 : std::cerr << "-------------------------------------------" << std::endl;
494 : }
495 :
496 3346 : state.CycleThreshold = state.PeakResidual * psffraction;
497 3346 : pushDetails();
498 3346 : }
499 :
500 1150 : void grpcInteractiveCleanManager::addSummaryMajor( ) {
501 1150 : access( (void*) 0,
502 2300 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
503 1150 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
504 1150 : IPosition shp = state.SummaryMajor.shape();
505 1150 : if( shp.nelements() != 1 )
506 0 : throw(AipsError("Internal error in shape of major-cycle summary record"));
507 :
508 1150 : state.SummaryMajor.resize( IPosition( 1, shp[0]+1 ) , true );
509 1150 : state.SummaryMajor( IPosition(1, shp[0] ) ) = state.IterDone;
510 2300 : return dummy; } ) );
511 1150 : }
512 :
513 1243 : casacore::Record grpcInteractiveCleanManager::getDetailsRecord( bool includeSummary ) {
514 3729 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
515 :
516 2486 : Record returnRecord;
517 :
518 : Record result = access( returnRecord,
519 1243 : std::function< casacore::Record ( casacore::Record, grpcInteractiveCleanState & )>(
520 1243 : [&]( casacore::Record rec, grpcInteractiveCleanState &state )->casacore::Record {
521 : //*** Control Variables **************************************************
522 1243 : rec.define( RecordFieldId("niter"), state.Niter );
523 1243 : rec.define( RecordFieldId("cycleniter"), state.CycleNiter );
524 1243 : rec.define( RecordFieldId("interactiveniter"), state.InteractiveNiter );
525 :
526 1243 : rec.define( RecordFieldId("threshold"), state.Threshold );
527 1243 : rec.define( RecordFieldId("nsigma"), state.Nsigma );
528 :
529 1243 : if( state.IsCycleThresholdAuto == true ) updateCycleThreshold(state);
530 1243 : state.IsCycleThresholdAuto = true && state.IsCycleThresholdMutable; // Reset this, for the next round
531 :
532 1243 : rec.define( RecordFieldId("cyclethreshold"), state.CycleThreshold );
533 1243 : rec.define( RecordFieldId("interactivethreshold"), state.InteractiveThreshold );
534 :
535 1243 : rec.define( RecordFieldId("loopgain"), state.LoopGain );
536 1243 : rec.define( RecordFieldId("cyclefactor"), state.CycleFactor );
537 :
538 : //*** Status Reporting Variables *****************************************
539 1243 : rec.define( RecordFieldId("iterdone"), state.IterDone );
540 1243 : rec.define( RecordFieldId("cycleiterdone"), state.MaxCycleIterDone );
541 1243 : rec.define( RecordFieldId("interactiveiterdone"),
542 1243 : state.InteractiveIterDone + state.MaxCycleIterDone);
543 :
544 1243 : rec.define( RecordFieldId("nmajordone"), state.MajorDone );
545 1243 : rec.define( RecordFieldId("maxpsfsidelobe"), state.MaxPsfSidelobe );
546 1243 : rec.define( RecordFieldId("maxpsffraction"), state.MaxPsfFraction );
547 1243 : rec.define( RecordFieldId("minpsffraction"), state.MinPsfFraction );
548 1243 : rec.define( RecordFieldId("interactivemode"), state.InteractiveMode );
549 :
550 1243 : rec.define( RecordFieldId("stopcode"), state.StopCode );
551 :
552 : //*** report clean's state ***********************************************
553 1243 : rec.define( RecordFieldId("cleanstate"),
554 1243 : state.StopFlag ? "stopped" : state.PauseFlag ? "paused" : "running" );
555 :
556 1243 : if ( includeSummary ) {
557 1243 : rec.define( RecordFieldId("summaryminor"), state.SummaryMinor );
558 1243 : rec.define( RecordFieldId("summarymajor"), state.SummaryMajor );
559 : }
560 :
561 3729 : return rec; }) );
562 :
563 2486 : return result;
564 :
565 :
566 : /* return access( returnRecord,
567 : std::function< casacore::Record ( casacore::Record, grpcInteractiveCleanState & )>(
568 : [&]( casacore::Record rec, grpcInteractiveCleanState &state )->casacore::Record {
569 : //*** Control Variables **************************************************
570 : rec.define( RecordFieldId("niter"), state.Niter );
571 : rec.define( RecordFieldId("cycleniter"), state.CycleNiter );
572 : rec.define( RecordFieldId("interactiveniter"), state.InteractiveNiter );
573 :
574 : rec.define( RecordFieldId("threshold"), state.Threshold );
575 : rec.define( RecordFieldId("nsigma"), state.Nsigma );
576 : if( state.IsCycleThresholdAuto == true ) updateCycleThreshold(state);
577 : state.IsCycleThresholdAuto = true && state.IsCycleThresholdMutable; // Reset this, for the next round
578 :
579 : rec.define( RecordFieldId("cyclethreshold"), state.CycleThreshold );
580 : rec.define( RecordFieldId("interactivethreshold"), state.InteractiveThreshold );
581 :
582 : rec.define( RecordFieldId("loopgain"), state.LoopGain );
583 : rec.define( RecordFieldId("cyclefactor"), state.CycleFactor );
584 :
585 : //*** Status Reporting Variables *****************************************
586 : rec.define( RecordFieldId("iterdone"), state.IterDone );
587 : rec.define( RecordFieldId("cycleiterdone"), state.MaxCycleIterDone );
588 : rec.define( RecordFieldId("interactiveiterdone"),
589 : state.InteractiveIterDone + state.MaxCycleIterDone);
590 :
591 : rec.define( RecordFieldId("nmajordone"), state.MajorDone );
592 : rec.define( RecordFieldId("maxpsfsidelobe"), state.MaxPsfSidelobe );
593 : rec.define( RecordFieldId("maxpsffraction"), state.MaxPsfFraction );
594 : rec.define( RecordFieldId("minpsffraction"), state.MinPsfFraction );
595 : rec.define( RecordFieldId("interactivemode"), state.InteractiveMode );
596 :
597 : rec.define( RecordFieldId("stopcode"), state.StopCode );
598 :
599 : //*** report clean's state ***********************************************
600 : rec.define( RecordFieldId("cleanstate"),
601 : state.StopFlag ? "stopped" : state.PauseFlag ? "paused" : "running" );
602 :
603 : if ( includeSummary ) {
604 : rec.define( RecordFieldId("summaryminor"), state.SummaryMinor );
605 : rec.define( RecordFieldId("summarymajor"), state.SummaryMajor );
606 : }
607 :
608 : return rec; }) );
609 : */
610 : }
611 :
612 :
613 909 : Record grpcInteractiveCleanManager::getMinorCycleControls( ){
614 2727 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
615 :
616 : /* This returns a record suitable for initializing the minor cycle controls. */
617 909 : Record returnRecord;
618 :
619 : return access( returnRecord,
620 1818 : std::function< casacore::Record ( casacore::Record, grpcInteractiveCleanState & )>(
621 909 : [&]( casacore::Record rec, grpcInteractiveCleanState &state )->casacore::Record {
622 :
623 : /* If autocalc, compute cyclethresh from peak res, cyclefactor and psf sidelobe
624 : Otherwise, the user has explicitly set it (interactively) for this minor cycle */
625 909 : if( state.IsCycleThresholdAuto == true ) { updateCycleThreshold(state); }
626 909 : state.IsCycleThresholdAuto = true && state.IsCycleThresholdMutable; /* Reset this, for the next round */
627 :
628 : /* The minor cycle will stop based on the cycle parameters. */
629 909 : int maxCycleIterations = state.CycleNiter;
630 909 : float cycleThreshold = state.CycleThreshold;
631 909 : maxCycleIterations = min(maxCycleIterations, state.Niter - state.IterDone);
632 909 : cycleThreshold = max(cycleThreshold, state.Threshold);
633 909 : bool thresholdReached = (cycleThreshold==state.Threshold)? True : False;
634 :
635 909 : rec.define( RecordFieldId("cycleniter"), maxCycleIterations);
636 909 : rec.define( RecordFieldId("cyclethreshold"), cycleThreshold);
637 909 : rec.define( RecordFieldId("loopgain"), state.LoopGain);
638 909 : rec.define( RecordFieldId("thresholdreached"), thresholdReached);
639 909 : rec.define( RecordFieldId("nsigma"), state.Nsigma);
640 :
641 2727 : return rec; }) );
642 : }
643 :
644 3284 : int grpcInteractiveCleanManager::cleanComplete( bool lastcyclecheck, bool reachedMajorLimit ){
645 6568 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
646 :
647 3284 : int stopCode=0;
648 :
649 3284 : return access( stopCode,
650 6568 : std::function< int ( int, grpcInteractiveCleanState & )>(
651 3284 : [&]( int stop_code, grpcInteractiveCleanState &state ) -> int {
652 :
653 : float usePeakRes;
654 :
655 3284 : if( lastcyclecheck==True ) { usePeakRes = state.MinorCyclePeakResidual; }
656 2134 : else { usePeakRes = state.PeakResidual; }
657 :
658 : // for debugging, remove it later
659 3284 : os<<LogIO::DEBUG1<<"cleanComplete-- CycleThreshold without Threshold limit="<<state.CycleThreshold<<LogIO::POST;
660 :
661 5408 : if( state.PeakResidual > 0 && state.PrevPeakResidual>0 &&
662 2124 : fabs( state.PeakResidual - state.PrevPeakResidual)/fabs(state.PrevPeakResidual) > 2.0 ) {
663 8 : os << "[WARN] Peak residual (within the mask) increased from " << state.PrevPeakResidual << " to " << state.PeakResidual << LogIO::POST;
664 : }
665 : // for debugging, remove it later
666 3284 : os <<LogIO::DEBUG1<<"Threshold="<<state.Threshold<<" itsNsigmaThreshold===="<<state.NsigmaThreshold<<LogIO::POST;
667 3284 : os <<LogIO::DEBUG1<<"usePeakRes="<<usePeakRes<<" itsPeakResidual="<<state.PeakResidual<<" itsPrevPeakRes="<<state.PrevPeakResidual<<LogIO::POST;
668 3284 : os <<LogIO::DEBUG1<<"itsIterDone="<<state.IterDone<<" itsNiter="<<state.Niter<<LogIO::POST;
669 3284 : os <<LogIO::DEBUG1<<"FullSummary="<<state.FullSummary<<LogIO::POST;
670 :
671 : /// This may interfere with some other criterion... check.
672 3284 : float tol = 0.01; // threshold test torelance (CAS-11278)
673 3284 : if ( state.MajorDone==0 && state.IterDone==0 && state.MaskSum==0.0) {
674 1 : stopCode=7; // if zero mask is detected it should exit right away
675 8858 : } else if ( state.IterDone >= state.Niter ||
676 2292 : usePeakRes <= state.Threshold ||
677 1832 : state.PeakResidual <= state.NsigmaThreshold ||
678 1529 : fabs(usePeakRes-state.Threshold)/state.Threshold < tol ||
679 7103 : fabs(state.PeakResidual - state.NsigmaThreshold)/state.NsigmaThreshold < tol ||
680 1528 : state.StopFlag ) {
681 : // os << "Reached global stopping criteria : ";
682 :
683 1755 : if ( state.IterDone >= state.Niter ) { stopCode=1; }
684 : //os << "Numer of iterations. "; // (" << state.IterDone << ") >= limit (" << state.Niter << ")" ;
685 1755 : if( usePeakRes <= state.Threshold || (usePeakRes-state.Threshold)/state.Threshold < tol) {stopCode=2; }
686 1289 : else if ( usePeakRes <= state.NsigmaThreshold || (state.PeakResidual - state.NsigmaThreshold)/state.NsigmaThreshold < tol ) {
687 47 : if (state.NsigmaThreshold!=0.0) { stopCode=8; } // for nsigma=0.0 this mode is turned off
688 : }
689 :
690 : //os << "Peak residual (" << state.PeakResidual << ") <= threshold(" << state.Threshold << ")";
691 1755 : if( state.StopFlag ) {stopCode=3;}
692 : //os << "Forced stop. ";
693 : // os << LogIO::POST;
694 :
695 : //return true;
696 :
697 : } else { // not converged yet... but....if nothing has changed in this round... also stop
698 :
699 1528 : if (state.MaskSum==0.0) {
700 : //cout << "(7) Mask is all zero.Stopping" << endl;
701 2 : stopCode = 7;
702 : }
703 : // Nothing has changed across the last set of minor cycle iterations and major cycle.
704 1826 : else if( state.IterDone>0 && (state.MajorDone>state.PrevMajorCycleCount) &&
705 300 : fabs(state.PrevPeakResidual - state.PeakResidual)<1e-10)
706 0 : {stopCode = 4;}
707 :
708 : // another non-convergent condition: diverging (relative increase is more than 3 times across one major cycle)
709 1844 : else if ( state.IterDone > 0 &&
710 318 : fabs(state.PeakResidualNoMask-state.PrevPeakResidualNoMask)/fabs(state.PrevPeakResidualNoMask) > 3.0) {
711 : //cout << "(5) Peak res (no mask) : " << state.PeakResidualNoMask
712 : // << " Dev from prev peak res " << state.PrevPeakResidualNoMask << endl;
713 0 : stopCode = 5;}
714 :
715 : // divergence check, 3 times increase from the minimum peak residual so far (across all previous major cycles).
716 1844 : else if ( state.IterDone > 0 &&
717 318 : (fabs(state.PeakResidualNoMask)-state.MinPeakResidualNoMask)/state.MinPeakResidualNoMask > 3.0 )
718 : {
719 : //cout << "(6) Peak res (no mask): " << state.PeakResidualNoMask
720 : // << " Dev from min peak res " << state.MinPeakResidualNoMask << endl;
721 1 : stopCode = 6;
722 : }
723 :
724 : }
725 :
726 3284 : if (stopCode == 0 && reachedMajorLimit) {
727 6 : stopCode = 9;
728 : }
729 :
730 : /*
731 : if( lastcyclecheck==False)
732 : {
733 : cout << "*****" << endl;
734 : cout << "Peak residual : " << state.PeakResidual << " No Mask : " << state.PeakResidualNoMask << endl;
735 : cout << "Prev Peak residual : " << state.PrevPeakResidual << " No Mask : " << state.PrevPeakResidualNoMask << endl;
736 : cout << "Min Peak residual : " << state.MinPeakResidual << " No Mask : " << state.MinPeakResidualNoMask << endl;
737 : }
738 : */
739 :
740 : // os << "Peak residual : " << state.PeakResidual << " and " << state.IterDone << " iterations."<< LogIO::POST;
741 : //cout << "cleancomp : stopcode : " << stopCode << endl;
742 :
743 : //cout << "peak res : " << state.PeakResidual << " state.minPR : " << state.MinPeakResidual << endl;
744 :
745 3284 : if( lastcyclecheck==False)
746 : {
747 2134 : if( fabs(state.PeakResidual) < state.MinPeakResidual )
748 1532 : {state.MinPeakResidual = fabs(state.PeakResidual);}
749 :
750 2134 : state.PrevPeakResidual = state.PeakResidual;
751 :
752 :
753 2134 : if( fabs(state.PeakResidualNoMask) < state.MinPeakResidualNoMask )
754 1509 : {state.MinPeakResidualNoMask = fabs(state.PeakResidualNoMask);}
755 :
756 2134 : state.PrevPeakResidualNoMask = state.PeakResidualNoMask;
757 :
758 2134 : state.PrevMajorCycleCount = state.MajorDone;
759 :
760 : }
761 :
762 3284 : state.StopCode=stopCode;
763 9852 : return stopCode; } ) );
764 : }
765 :
766 4199 : void grpcInteractiveCleanManager::resetMinorCycleInitInfo( grpcInteractiveCleanState &state ) {
767 : /* Get ready to do the minor cycle */
768 4199 : state.PeakResidual = 0;
769 4199 : state.PeakResidualNoMask = 0;
770 4199 : state.MaxPsfSidelobe = 0;
771 4199 : state.MaxCycleIterDone = 0;
772 4199 : state.MaskSum = -1.0;
773 4199 : }
774 :
775 3049 : void grpcInteractiveCleanManager::resetMinorCycleInitInfo( ) {
776 3049 : access( (void*) 0,
777 6098 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
778 3049 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
779 3049 : resetMinorCycleInitInfo(state);
780 3049 : return dummy; } ) );
781 3049 : }
782 :
783 1150 : void grpcInteractiveCleanManager::incrementMajorCycleCount( ) {
784 :
785 1150 : access( (void*) 0,
786 2300 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
787 1150 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
788 1150 : state.PrevMajorCycleCount = state.MajorDone;
789 1150 : state.MajorDone++;
790 :
791 : /* Interactive iteractions update */
792 1150 : state.InteractiveIterDone += state.MaxCycleIterDone;
793 :
794 1150 : resetMinorCycleInitInfo(state);
795 1150 : return dummy; } ) );
796 1150 : }
797 :
798 2167 : void grpcInteractiveCleanManager::mergeCycleInitializationRecord( Record &initRecord ){
799 4334 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
800 :
801 2167 : access( (void*) 0,
802 4334 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
803 2167 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
804 :
805 2167 : state.PeakResidual = max(state.PeakResidual, initRecord.asFloat(RecordFieldId("peakresidual")));
806 2167 : state.MaxPsfSidelobe = max(state.MaxPsfSidelobe, initRecord.asFloat(RecordFieldId("maxpsfsidelobe")));
807 :
808 2167 : state.PeakResidualNoMask = max( state.PeakResidualNoMask, initRecord.asFloat(RecordFieldId("peakresidualnomask")));
809 2167 : state.MadRMS = max(state.MadRMS, initRecord.asFloat(RecordFieldId("madrms")));
810 2167 : state.NsigmaThreshold = initRecord.asFloat(RecordFieldId("nsigmathreshold"));
811 :
812 : /*
813 : It has been reset to -1.0.
814 : If no masks have changed, it should remain at -1.0
815 : If any mask has changed, the sum will come in, and should be added to this.
816 : */
817 2167 : float thismasksum = initRecord.asFloat(RecordFieldId("masksum"));
818 2167 : if( thismasksum != -1.0 ) {
819 643 : if ( state.MaskSum == -1.0 ) state.MaskSum = thismasksum;
820 11 : else state.MaskSum += thismasksum;
821 : }
822 :
823 2167 : if ( state.PrevPeakResidual == -1.0 ) state.PrevPeakResidual = state.PeakResidual;
824 2167 : if ( state.PrevPeakResidualNoMask == -1.0 ) state.PrevPeakResidualNoMask = state.PeakResidualNoMask;
825 2167 : if( state.IsCycleThresholdAuto == true ) updateCycleThreshold(state);
826 :
827 2167 : return dummy; } ) );
828 2167 : }
829 :
830 :
831 912 : void grpcInteractiveCleanManager::mergeMinorCycleSummary( const Array<Double> &summary, grpcInteractiveCleanState &state, Int immod ){
832 1824 : IPosition cShp = state.SummaryMinor.shape();
833 1824 : IPosition nShp = summary.shape();
834 :
835 : //bool uss = SIMinorCycleController::useSmallSummaryminor(); // temporary CAS-13683 workaround
836 : //int nSummaryFields = uss ? 6 : SIMinorCycleController::nSummaryFields;
837 912 : int nSummaryFields = !state.FullSummary ? 6 : SIMinorCycleController::nSummaryFields;
838 :
839 1824 : if( cShp.nelements() != 2 || cShp[0] != nSummaryFields ||
840 1824 : nShp.nelements() != 2 || nShp[0] != nSummaryFields )
841 0 : throw(AipsError("Internal error in shape of global minor-cycle summary record"));
842 :
843 912 : state.SummaryMinor.resize( IPosition( 2, nSummaryFields, cShp[1]+nShp[1] ) ,true );
844 :
845 4053 : for (unsigned int row = 0; row < nShp[1]; row++) {
846 : // iterations done
847 3141 : state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = summary(IPosition(2,0,row));
848 : //if (state.FullSummary){
849 : // state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = state.IterDone + summary(IPosition(2,0,row));
850 : //}
851 : //else{
852 : // state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = summary(IPosition(2,0,row));
853 : //}
854 : //state.SummaryMinor( IPosition(2,0,cShp[1]+row) ) = summary(IPosition(2,0,row));
855 : // peak residual
856 3141 : state.SummaryMinor( IPosition(2,1,cShp[1]+row) ) = summary(IPosition(2,1,row));
857 : // model flux
858 3141 : state.SummaryMinor( IPosition(2,2,cShp[1]+row) ) = summary(IPosition(2,2,row));
859 : // cycle threshold
860 3141 : state.SummaryMinor( IPosition(2,3,cShp[1]+row) ) = summary(IPosition(2,3,row));
861 : //if (uss) { // temporary CAS-13683 workaround
862 3141 : if (!state.FullSummary) { // temporary CAS-13683 workaround
863 : // swap out mapper id with multifield id
864 3093 : state.SummaryMinor( IPosition(2,4,cShp[1]+row) ) = immod;
865 : // chunk id (channel/stokes)
866 3093 : state.SummaryMinor( IPosition(2,5,cShp[1]+row) ) = summary(IPosition(2,5,row));
867 : } else {
868 : // mapper id
869 48 : state.SummaryMinor( IPosition(2,4,cShp[1]+row) ) = summary(IPosition(2,4,row));
870 : // channel id
871 48 : state.SummaryMinor( IPosition(2,5,cShp[1]+row) ) = summary(IPosition(2,5,row));
872 : // polarity id
873 48 : state.SummaryMinor( IPosition(2,6,cShp[1]+row) ) = summary(IPosition(2,6,row));
874 : // cycle start iterations done
875 48 : state.SummaryMinor( IPosition(2,7,cShp[1]+row) ) = state.IterDone + summary(IPosition(2,7,row));
876 : // starting iterations done
877 48 : state.SummaryMinor( IPosition(2,8,cShp[1]+row) ) = state.IterDone + summary(IPosition(2,8,row));
878 : // starting peak residual
879 48 : state.SummaryMinor( IPosition(2,9,cShp[1]+row) ) = summary(IPosition(2,9,row));
880 : // starting model flux
881 48 : state.SummaryMinor( IPosition(2,10,cShp[1]+row) ) = summary(IPosition(2,10,row));
882 : // starting peak residual, not limited to the user's mask
883 48 : state.SummaryMinor( IPosition(2,11,cShp[1]+row) ) = summary(IPosition(2,11,row));
884 : // peak residual, not limited to the user's mask
885 48 : state.SummaryMinor( IPosition(2,12,cShp[1]+row) ) = summary(IPosition(2,12,row));
886 : // number of pixels in the mask
887 48 : state.SummaryMinor( IPosition(2,13,cShp[1]+row) ) = summary(IPosition(2,13,row));
888 : // mpi server
889 48 : state.SummaryMinor( IPosition(2,14,cShp[1]+row) ) = summary(IPosition(2,14,row));
890 : // outlier field id
891 48 : state.SummaryMinor( IPosition(2,15,cShp[1]+row) ) = immod;
892 : // stopcode
893 48 : state.SummaryMinor( IPosition(2,16,cShp[1]+row) ) = summary(IPosition(2,16,row));
894 : }
895 :
896 : }
897 912 : }
898 :
899 912 : void grpcInteractiveCleanManager::mergeCycleExecutionRecord( Record& execRecord, Int immod ){
900 1824 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
901 :
902 912 : access( (void*) 0,
903 1824 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
904 912 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
905 912 : mergeMinorCycleSummary( execRecord.asArrayDouble( RecordFieldId("summaryminor")), state, immod );
906 :
907 912 : state.IterDone += execRecord.asInt(RecordFieldId("iterdone"));
908 :
909 912 : state.MaxCycleIterDone = max( state.MaxCycleIterDone, execRecord.asInt(RecordFieldId("maxcycleiterdone")) );
910 :
911 912 : state.MinorCyclePeakResidual = max( state.PeakResidual, execRecord.asFloat(RecordFieldId("peakresidual")) );
912 :
913 912 : state.UpdatedModelFlag |=execRecord.asBool( RecordFieldId("updatedmodelflag") );
914 :
915 912 : os << "Completed " << state.IterDone << " iterations." << LogIO::POST;
916 : //with peak residual "<< state.PeakResidual << LogIO::POST;
917 912 : return dummy; } ) );
918 912 : }
919 :
920 0 : void grpcInteractiveCleanManager::changeStopFlag( bool stopEnabled ) {
921 0 : access( (void*) 0,
922 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
923 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
924 0 : state.StopFlag = stopEnabled;
925 0 : return dummy;
926 : } ) );
927 0 : }
928 :
929 : //====================================================================================================
930 :
931 0 : static bool isdir( const char *path ) {
932 : struct stat statbuf;
933 0 : int err = stat(path, &statbuf);
934 0 : if ( err == -1 ) return false;
935 0 : if ( S_ISDIR(statbuf.st_mode) ) return true;
936 0 : return false;
937 : }
938 :
939 0 : static std::string trim_trailing_slash( const char *str ) {
940 0 : char *temp = strdup(str);
941 0 : for ( int off = strlen(str) - 1; off >= 0; --off ) {
942 0 : if ( temp[off] == '/' ) temp[off] = '\0';
943 0 : else break;
944 : }
945 0 : std::string result = temp;
946 0 : free(temp);
947 0 : return result;
948 : }
949 :
950 1 : grpcInteractiveCleanGui::grpcInteractiveCleanGui( ) : viewer_pid(0), viewer_started(false) { }
951 1 : grpcInteractiveCleanGui::~grpcInteractiveCleanGui( ) {
952 1 : static const auto debug = getenv("GRPC_DEBUG");
953 :
954 1 : if ( ! viewer_started ) {
955 1 : if ( debug ) {
956 0 : std::cerr << "viewer shutdown required (" << viewer_uri << ")" <<
957 0 : " (process " << getpid( ) << ", thread " <<
958 0 : std::this_thread::get_id() << ")" << std::endl;
959 0 : fflush(stderr);
960 : }
961 : } else {
962 0 : if ( debug ) {
963 0 : std::cerr << "sending shutdown message to viewer (" << viewer_uri << ")" <<
964 0 : " (process " << getpid( ) << ", thread " <<
965 0 : std::this_thread::get_id() << ")" << std::endl;
966 0 : fflush(stderr);
967 : }
968 :
969 0 : bool stopped = stop_viewer( );
970 :
971 0 : if ( debug ) {
972 0 : if ( stopped ) {
973 0 : std::cerr << "viewer shutdown successful (" << viewer_uri << ")" <<
974 0 : " (process " << getpid( ) << ", thread " <<
975 0 : std::this_thread::get_id() << ")" << std::endl;
976 : } else {
977 0 : std::cerr << "viewer shutdown failed (" << viewer_uri << ")" <<
978 0 : " (process " << getpid( ) << ", thread " <<
979 0 : std::this_thread::get_id() << ")" << std::endl;
980 : }
981 0 : fflush(stderr);
982 : }
983 : }
984 1 : }
985 :
986 :
987 0 : bool grpcInteractiveCleanGui::alive( ) {
988 0 : static const auto debug = getenv("GRPC_DEBUG");
989 0 : if ( debug ) {
990 0 : std::cerr << "pinging viewer (" << viewer_uri << ")" <<
991 0 : " (process " << getpid( ) << ", thread " <<
992 0 : std::this_thread::get_id() << ")" << std::endl;
993 0 : fflush(stderr);
994 : }
995 0 : grpc::ClientContext context;
996 0 : ::google::protobuf::Empty resp;
997 0 : ::google::protobuf::Empty msg;
998 0 : auto ping = casatools::rpc::Ping::NewStub( grpc::CreateChannel( viewer_uri, grpc::InsecureChannelCredentials( ) ) );
999 0 : ::grpc::Status status = ping->now( &context, msg, &resp );
1000 0 : bool ping_result = status.ok( );
1001 0 : if ( debug ) {
1002 : std::cerr << "ping result: " << (ping_result ? "OK" : "FAIL")<<
1003 0 : " (process " << getpid( ) << ", thread " <<
1004 0 : std::this_thread::get_id() << ")" << std::endl;
1005 0 : fflush(stderr);
1006 : }
1007 0 : if ( ping_result == false ) {
1008 : int proc_status;
1009 0 : waitpid( viewer_pid, &proc_status, WUNTRACED | WCONTINUED | WNOHANG );
1010 0 : viewer_pid = 0;
1011 0 : viewer_proxy.release( );
1012 0 : viewer_started = false;
1013 0 : if ( debug ) {
1014 : std::cerr << "ping failed resetting state" <<
1015 0 : " (process " << getpid( ) << ", thread " <<
1016 0 : std::this_thread::get_id() << ")" << std::endl;
1017 0 : fflush(stderr);
1018 : }
1019 : }
1020 0 : return ping_result;
1021 : }
1022 :
1023 0 : bool grpcInteractiveCleanGui::launch( ) {
1024 0 : static const auto debug = getenv("GRPC_DEBUG");
1025 0 : if ( viewer_started == false ) {
1026 : // start the viewer process if it is not already running...
1027 0 : if ( debug ) {
1028 : std::cerr << "spawning viewer process" <<
1029 0 : " (process " << getpid( ) << ", thread " <<
1030 0 : std::this_thread::get_id() << ")" << std::endl;
1031 0 : fflush(stderr);
1032 : }
1033 0 : return spawn_viewer( );
1034 : } else {
1035 0 : if ( alive( ) ) {
1036 0 : if ( debug ) {
1037 : std::cerr << "viewer process available" <<
1038 0 : " (process " << getpid( ) << ", thread " <<
1039 0 : std::this_thread::get_id() << ")" << std::endl;
1040 0 : fflush(stderr);
1041 : }
1042 0 : return true;
1043 : } else {
1044 0 : if ( debug ) {
1045 : std::cerr << "re-spawning viewer process" <<
1046 0 : " (process " << getpid( ) << ", thread " <<
1047 0 : std::this_thread::get_id() << ")" << std::endl;
1048 0 : fflush(stderr);
1049 : }
1050 0 : return launch( );
1051 : }
1052 : }
1053 : return false;
1054 : }
1055 :
1056 629 : void grpcInteractiveCleanGui::close_panel( int id ) {
1057 629 : static const auto debug = getenv("GRPC_DEBUG");
1058 629 : if ( debug ) {
1059 0 : std::cerr << "close_panel(" << id << ")" <<
1060 0 : " (process " << getpid( ) << ", thread " <<
1061 0 : std::this_thread::get_id() << ")" << std::endl;
1062 0 : fflush(stderr);
1063 : }
1064 629 : if ( id != -1 && alive( ) ) {
1065 0 : if ( debug ) {
1066 0 : std::cerr << "close_panel(" << id << ") -- closing panel" <<
1067 0 : " (process " << getpid( ) << ", thread " <<
1068 0 : std::this_thread::get_id() << ")" << std::endl;
1069 0 : fflush(stderr);
1070 : }
1071 : {
1072 : // unload panel's images
1073 0 : rpc::img::Id panel;
1074 0 : grpc::ClientContext context;
1075 0 : ::google::protobuf::Empty resp;
1076 0 : panel.set_id(id);
1077 0 : viewer_proxy->unload( &context, panel, &resp );
1078 : }
1079 : {
1080 : // close panel
1081 0 : rpc::img::Id panel;
1082 0 : grpc::ClientContext context;
1083 0 : ::google::protobuf::Empty resp;
1084 0 : panel.set_id(id);
1085 0 : viewer_proxy->close( &context, panel, &resp );
1086 : }
1087 : }
1088 629 : }
1089 :
1090 0 : int grpcInteractiveCleanGui::open_panel( std::list<std::tuple<std::string,bool,bool,int>> images ) {
1091 0 : static const auto debug = getenv("GRPC_DEBUG");
1092 0 : if ( viewer_started == false ) {
1093 0 : if ( launch( ) == false ) return -1;
1094 : }
1095 0 : if ( debug ) {
1096 : std::cerr << "opening viewer panel" <<
1097 0 : " (process " << getpid( ) << ", thread " <<
1098 0 : std::this_thread::get_id() << ")" << std::endl;
1099 0 : fflush(stderr);
1100 : }
1101 0 : grpc::ClientContext context;
1102 0 : ::rpc::img::NewPanel np;
1103 0 : rpc::img::Id resp;
1104 0 : np.set_type("clean2");
1105 0 : np.set_hidden(false);
1106 0 : viewer_proxy->panel( &context, np, &resp );
1107 0 : int result = resp.id( );
1108 :
1109 0 : if ( debug ) {
1110 0 : std::cerr << "opened viewer panel " << result <<
1111 0 : " (process " << getpid( ) << ", thread " <<
1112 0 : std::this_thread::get_id() << ")" << std::endl;
1113 0 : fflush(stderr);
1114 : }
1115 :
1116 : // state for interactive masking in the new viewer panel
1117 0 : clean_state.insert( std::pair<int,CleanState>(result, CleanState( )) );
1118 :
1119 0 : if ( debug ) {
1120 0 : std::cerr << "created panel " << result <<
1121 0 : " (process " << getpid( ) << ", thread " <<
1122 0 : std::this_thread::get_id() << ")" << std::endl;
1123 0 : fflush(stderr);
1124 : }
1125 0 : return result;
1126 : }
1127 :
1128 0 : void grpcInteractiveCleanGui::unload( int id ) {
1129 0 : grpc::ClientContext context;
1130 0 : ::rpc::img::Id data;
1131 0 : ::google::protobuf::Empty resp;
1132 0 : data.set_id(id);
1133 0 : viewer_proxy->unload( &context, data, &resp );
1134 0 : }
1135 :
1136 0 : bool grpcInteractiveCleanGui::clone( const std::string &imageName, const std::string &newImageName ) {
1137 0 : LogIO os(LogOrigin("grpcInteractiveCleanGui", __FUNCTION__, WHERE));
1138 :
1139 : try {
1140 0 : PagedImage<Float> oldImage( imageName );
1141 0 : PagedImage<Float> newImage( TiledShape( oldImage.shape(), oldImage.niceCursorShape()),
1142 0 : oldImage.coordinates(), newImageName );
1143 0 : newImage.set(0.0);
1144 0 : newImage.table().flush(true, true);
1145 0 : } catch (AipsError x) {
1146 0 : os << LogIO::SEVERE << "Exception: " << x.getMesg() << LogIO::POST;
1147 0 : return false;
1148 : }
1149 0 : return true;
1150 : }
1151 :
1152 0 : float grpcInteractiveCleanGui::maskSum(const std::string &maskname) {
1153 :
1154 0 : PagedImage<Float> mask( maskname );
1155 :
1156 0 : LatticeExprNode msum( sum( mask ) );
1157 0 : float maskSum = msum.getFloat( );
1158 :
1159 0 : mask.unlock();
1160 0 : mask.tempClose();
1161 :
1162 0 : return maskSum;
1163 : }
1164 :
1165 0 : int grpcInteractiveCleanGui::interactivemask( int panel, const std::string &image, const std::string &mask,
1166 : int &niter, int &cycleniter, std::string &thresh,
1167 : std::string &cyclethresh, const bool forceReload ) {
1168 :
1169 0 : static const auto debug = getenv("GRPC_DEBUG");
1170 0 : LogIO os( LogOrigin("grpcInteractiveCleanGui",__FUNCTION__,WHERE) );
1171 :
1172 0 : if ( debug ) {
1173 0 : std::cerr << "starting interactivemask( " <<
1174 0 : panel << ", " << image << ", " << mask << ", " <<
1175 0 : niter << ", " << cycleniter << ", " << thresh << ", " <<
1176 : cyclethresh << ", " << (forceReload ? "true" : "false") << ")" <<
1177 0 : " (process " << getpid( ) << ", thread " <<
1178 0 : std::this_thread::get_id() << ")" << std::endl;
1179 0 : fflush(stderr);
1180 : }
1181 :
1182 0 : if ( viewer_started == false ) {
1183 : // viewer should be started before calling interactivemask(...)
1184 0 : os << LogIO::WARN << "Viewer GUI Not Available" << LogIO::POST;
1185 0 : return 0;
1186 : }
1187 :
1188 0 : auto state = clean_state.find(panel);
1189 0 : if ( state == clean_state.end( ) ) {
1190 0 : os << LogIO::WARN << "Invalid clean panel id used for interactive masking" << LogIO::POST;
1191 0 : return 0;
1192 : }
1193 :
1194 0 : if( Table::isReadable(mask) ) {
1195 0 : if ( ! Table::isWritable(mask) ) {
1196 0 : os << LogIO::WARN << "Mask image is not modifiable " << LogIO::POST;
1197 0 : return 0;
1198 : }
1199 : // we should regrid here if image and mask do not match
1200 : } else {
1201 0 : clone(image, mask);
1202 : }
1203 :
1204 0 : double startmask = maskSum(mask);
1205 :
1206 0 : if ( state->second.image_id == 0 || state->second.mask_id == 0 || forceReload ) {
1207 :
1208 : //Make sure image left after a "no more" is pressed is cleared
1209 0 : if ( forceReload && state->second.image_id !=0 )
1210 0 : state->second.prev_image_id = state->second.image_id;
1211 0 : if ( forceReload && state->second.mask_id !=0 )
1212 0 : state->second.prev_mask_id = state->second.mask_id;
1213 :
1214 0 : if ( state->second.prev_image_id ){
1215 0 : if ( debug ) {
1216 0 : std::cerr << "preparing to unload prev_image_id " <<
1217 0 : state->second.prev_image_id << " (panel " << panel << ")" <<
1218 0 : " (process " << getpid( ) << ", thread " <<
1219 0 : std::this_thread::get_id() << ")" << std::endl;
1220 0 : fflush(stderr);
1221 : }
1222 0 : unload( state->second.prev_image_id );
1223 : }
1224 0 : if ( state->second.prev_mask_id ) {
1225 0 : if ( debug ) {
1226 0 : std::cerr << "preparing to unload prev_mask_id " <<
1227 0 : state->second.prev_mask_id << " (panel " << panel << ")" <<
1228 0 : " (process " << getpid( ) << ", thread " <<
1229 0 : std::this_thread::get_id() << ")" << std::endl;
1230 0 : fflush(stderr);
1231 : }
1232 0 : unload( state->second.prev_mask_id );
1233 : }
1234 :
1235 0 : state->second.prev_image_id = 0;
1236 0 : state->second.prev_mask_id = 0;
1237 :
1238 : {
1239 0 : grpc::ClientContext context;
1240 0 : ::rpc::img::NewData nd;
1241 0 : rpc::img::Id resp;
1242 0 : nd.mutable_panel( )->set_id(panel);
1243 0 : nd.set_path(image);
1244 0 : nd.set_type("raster");
1245 0 : nd.set_scale(0);
1246 0 : viewer_proxy->load( &context, nd, &resp );
1247 0 : state->second.image_id = resp.id( );
1248 : }
1249 : {
1250 0 : grpc::ClientContext context;
1251 0 : ::rpc::img::NewData nd;
1252 0 : rpc::img::Id resp;
1253 0 : nd.mutable_panel( )->set_id(panel);
1254 0 : nd.set_path(mask);
1255 0 : nd.set_type("contour");
1256 0 : nd.set_scale(0);
1257 0 : viewer_proxy->load( &context, nd, &resp );
1258 0 : state->second.mask_id = resp.id( );
1259 : }
1260 :
1261 : } else {
1262 0 : grpc::ClientContext context;
1263 0 : ::rpc::img::Id id;
1264 0 : ::google::protobuf::Empty resp;
1265 0 : id.set_id(state->second.image_id);
1266 0 : viewer_proxy->reload( &context, id, &resp );
1267 0 : id.set_id(state->second.mask_id);
1268 0 : viewer_proxy->reload( &context, id, &resp );
1269 : }
1270 :
1271 0 : grpc::ClientContext context;
1272 0 : ::rpc::img::InteractiveMaskOptions options;
1273 0 : options.mutable_panel( )->set_id(state->first);
1274 0 : options.set_niter(niter);
1275 0 : options.set_cycleniter(cycleniter);
1276 0 : options.set_threshold(thresh);
1277 0 : options.set_cyclethreshold(cyclethresh);
1278 0 : ::rpc::img::InteractiveMaskResult imresult;
1279 0 : ::grpc::Status s = viewer_proxy->interactivemask( &context, options, &imresult );
1280 :
1281 0 : if ( ! s.ok( ) ) {
1282 0 : std::cerr << "interactive mask failed: " << s.error_details( ) << std::endl;
1283 0 : fflush(stderr);
1284 : }
1285 :
1286 0 : niter = imresult.state( ).niter( );
1287 0 : cycleniter = imresult.state( ).cycleniter( );
1288 0 : thresh = imresult.state( ).threshold( );
1289 0 : cyclethresh = imresult.state( ).cyclethreshold( );
1290 0 : int result = 1;
1291 0 : std::string action = imresult.action( );
1292 :
1293 0 : if ( debug ) {
1294 0 : std::cerr << "-------------------------------------------" << std::endl;
1295 0 : std::cerr << " gui state from interactive masking" << std::endl;
1296 0 : std::cerr << "-------------------------------------------" << std::endl;
1297 0 : std::cerr << " action: " << action << std::endl;
1298 0 : std::cerr << " niter: " << niter << std::endl;
1299 0 : std::cerr << " cycle niter: " << cycleniter << std::endl;
1300 0 : std::cerr << " threshold: " << thresh << std::endl;
1301 0 : std::cerr << " cycle threshold: " << cyclethresh << std::endl;
1302 0 : std::cerr << "-------------------------------------------" << std::endl;
1303 : }
1304 :
1305 0 : if ( action == "stop" ) result = 3;
1306 0 : else if ( action == "no more" ) result = 2;
1307 0 : else if ( action == "continue" ) result = 1;
1308 : else {
1309 0 : os << "ill-formed action result (" << action << ")" << LogIO::WARN << LogIO::POST;
1310 0 : return 0;
1311 : }
1312 :
1313 0 : state->second.prev_image_id = state->second.image_id;
1314 0 : state->second.prev_mask_id = state->second.mask_id;
1315 :
1316 0 : state->second.image_id = 0;
1317 0 : state->second.mask_id = 0;
1318 :
1319 0 : if ( debug ) {
1320 0 : std::cerr << "set prev_image_id to " << state->second.prev_image_id << " (panel " << panel << ")" <<
1321 0 : " (process " << getpid( ) << ", thread " <<
1322 0 : std::this_thread::get_id() << ")" << std::endl;
1323 0 : std::cerr << "set prev_mask_id to " << state->second.prev_mask_id << " (panel " << panel << ")" <<
1324 0 : " (process " << getpid( ) << ", thread " <<
1325 0 : std::this_thread::get_id() << ")" << std::endl;
1326 0 : fflush(stderr);
1327 : }
1328 :
1329 0 : double endmask = maskSum(mask);
1330 :
1331 0 : if( startmask != endmask ) {
1332 0 : result = -1 * result;
1333 0 : LogIO os( LogOrigin("grpcInteractiveCleanGui",__FUNCTION__,WHERE) );
1334 0 : os << "[" << mask << "] Mask modified from " << startmask << " pixels to " << endmask << " pixels " << LogIO::POST;
1335 : }
1336 :
1337 0 : return result;
1338 : }
1339 :
1340 0 : bool grpcInteractiveCleanGui::stop_viewer( ) {
1341 : // viewer is not running...
1342 0 : if ( ! viewer_started ) return false;
1343 0 : static const auto debug = getenv("GRPC_DEBUG");
1344 0 : if ( debug ) {
1345 0 : std::cerr << "sending shutdown message to viewer (" << viewer_uri << ")" <<
1346 0 : " (process " << getpid( ) << ", thread " <<
1347 0 : std::this_thread::get_id() << ")" << std::endl;
1348 0 : fflush(stderr);
1349 : }
1350 :
1351 : // send shutdown message to viewer...
1352 0 : grpc::ClientContext context;
1353 0 : ::google::protobuf::Empty req;
1354 0 : ::google::protobuf::Empty resp;
1355 0 : auto shutdown = casatools::rpc::Shutdown::NewStub( grpc::CreateChannel( viewer_uri,
1356 0 : grpc::InsecureChannelCredentials( ) ) );
1357 0 : shutdown->now( &context, req, &resp );
1358 :
1359 : // wait on viewer (appimage) to exit...
1360 : int status;
1361 0 : pid_t w = waitpid( viewer_pid, &status, WUNTRACED | WCONTINUED );
1362 0 : if ( w == -1 ){
1363 0 : if ( debug ) {
1364 : std::cerr << "viewer process waitpid failed " <<
1365 0 : " (process " << getpid( ) << ", thread " <<
1366 0 : std::this_thread::get_id() << ")" << std::endl;
1367 0 : fflush(stderr);
1368 : }
1369 : // waitpid failed
1370 0 : return false;
1371 0 : } else if ( w == 0 ) {
1372 0 : if ( debug ) {
1373 : std::cerr << "viewer process not found " <<
1374 0 : " (process " << getpid( ) << ", thread " <<
1375 0 : std::this_thread::get_id() << ")" << std::endl;
1376 0 : fflush(stderr);
1377 : }
1378 0 : return false;
1379 : } else {
1380 0 : if ( debug ) {
1381 : std::cerr << "viewer process exited, status fetched " <<
1382 0 : " (process " << getpid( ) << ", thread " <<
1383 0 : std::this_thread::get_id() << ")" << std::endl;
1384 0 : fflush(stderr);
1385 : }
1386 0 : return true;
1387 : }
1388 :
1389 : viewer_pid = 0;
1390 : viewer_proxy.release( );
1391 : viewer_started = false;
1392 : return true;
1393 : }
1394 :
1395 0 : bool grpcInteractiveCleanGui::spawn_viewer( ) {
1396 0 : static const auto debug = getenv("GRPC_DEBUG");
1397 :
1398 0 : std::string viewer_path = get_viewer_path( );
1399 0 : if ( viewer_path.size( ) == 0 ) return false;
1400 :
1401 : // To minimize package size for distribution via pypi.org, the
1402 : // data repo has been moved out of the viewer appImage/app and
1403 : // into a separate package. The path to this needs to be specified
1404 : // when starting the viewer now...
1405 0 : std::string distro_data_path_arg = get_distro_data_path( );
1406 :
1407 : // sanity check on viewer path...
1408 : struct stat statbuf;
1409 0 : if ( stat( viewer_path.c_str( ), &statbuf ) < 0 ) {
1410 : // file (or dir) does not exist... e.g.
1411 : // >>>>>>registry available at 0.0.0.0:40939
1412 : // stopping registry<<<<<<
1413 0 : return false;
1414 : }
1415 :
1416 0 : std::string fifo = get_fifo( );
1417 0 : if ( fifo.size( ) == 0 ) return false;
1418 :
1419 : // here we start the viewer in a very basic manner... we do not bother
1420 : // with all of the theatrics needed to daemonize the launched process
1421 : // (see https://stackoverflow.com/questions/17954432/creating-a-daemon-in-linux)
1422 : // it could be that this should be done in the future, but for now we
1423 : // will adopt the simple...
1424 :
1425 0 : const int maxargc = 5;
1426 : char *arguments[maxargc];
1427 0 : for (int i = 0; i < maxargc; i++) { arguments[i] = (char*)""; };
1428 :
1429 0 : arguments[0] = strdup(viewer_path.c_str( ));
1430 0 : arguments[1] = (char*) malloc(sizeof(char) * (fifo.size( ) + 12));
1431 0 : sprintf( arguments[1], "--server=%s", fifo.c_str( ) );
1432 0 : arguments[2] = strdup("--oldregions");
1433 0 : int argc =3;
1434 0 : if ( distro_data_path_arg.size( ) > 0 ) {
1435 0 : distro_data_path_arg = std::string("--datapath=") + distro_data_path_arg;
1436 0 : arguments[argc] = strdup(distro_data_path_arg.c_str( ));
1437 0 : argc++;
1438 : }
1439 0 : std::string log_path = casatools::get_state( ).logPath( );
1440 0 : if ( log_path.size( ) > 0 ) {
1441 0 : arguments[argc] = (char*) malloc(sizeof(char) * (log_path.size( ) + 17));
1442 0 : sprintf( arguments[argc], "--casalogfile=%s", log_path.c_str( ) );
1443 0 : argc++;
1444 : }
1445 :
1446 0 : if ( debug ) {
1447 0 : std::cerr << "forking viewer process: ";
1448 0 : for (int i=0; i < argc; ++i) std::cout << arguments[i] << " ";
1449 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
1450 0 : std::this_thread::get_id() << ")" << std::endl;
1451 0 : fflush(stderr);
1452 : }
1453 0 : pid_t pid = fork( );
1454 :
1455 0 : if ( pid == 0 ) {
1456 0 : if ( debug ) {
1457 0 : std::cerr << "execing viewer process: ";
1458 0 : for (int i=0; i < argc; ++i) std::cout << arguments[i] << " ";
1459 0 : std::cerr << " (process " << getpid( ) << ", thread " <<
1460 0 : std::this_thread::get_id() << ")" << std::endl;
1461 0 : fflush(stderr);
1462 : }
1463 0 : char **envp = getenv_sansmpi(); // bugfix: run the viewer without MPI CAS-13252
1464 0 : execle( arguments[0], arguments[0], arguments[1], arguments[2], arguments[3], arguments[4], NULL, envp );
1465 0 : perror( "grpcInteractiveCleanGui::launch(...) child process exec failed" );
1466 0 : exit(1);
1467 : }
1468 :
1469 0 : for ( int i=0; i < argc; ++i ) free(arguments[i]);
1470 :
1471 0 : if ( pid == -1 ) {
1472 0 : perror( "grpcInteractiveCleanGui::launch(...) child process fork failed" );
1473 0 : return false;
1474 : }
1475 :
1476 : // perform a health check, after a delay...
1477 : int status;
1478 0 : sleep(2);
1479 0 : pid_t w = waitpid( pid, &status, WUNTRACED | WCONTINUED | WNOHANG );
1480 0 : if ( w == -1 ){
1481 0 : if ( debug ) {
1482 : std::cerr << "viewer process failed " <<
1483 0 : " (process " << getpid( ) << ", thread " <<
1484 0 : std::this_thread::get_id() << ")" << std::endl;
1485 0 : fflush(stderr);
1486 : }
1487 : // waitpid failed
1488 0 : return false;
1489 0 : } else if ( w != 0 ) {
1490 0 : if ( debug ) {
1491 : std::cerr << "viewer process died " <<
1492 0 : " (process " << getpid( ) << ", thread " <<
1493 0 : std::this_thread::get_id() << ")" << std::endl;
1494 0 : fflush(stderr);
1495 : }
1496 : // process exited
1497 0 : if ( WIFEXITED(status) ) {
1498 0 : printf("exited, status=%d\n", WEXITSTATUS(status));
1499 0 : } else if (WIFSIGNALED(status)) {
1500 0 : printf("killed by signal %d\n", WTERMSIG(status));
1501 0 : } else if (WIFSTOPPED(status)) {
1502 0 : printf("stopped by signal %d\n", WSTOPSIG(status));
1503 : }
1504 0 : return false;
1505 : }
1506 :
1507 0 : if ( debug ) {
1508 : std::cerr << "fetching viewer uri from " << fifo <<
1509 0 : " (process " << getpid( ) << ", thread " <<
1510 0 : std::this_thread::get_id() << ")" << std::endl;
1511 0 : fflush(stderr);
1512 : }
1513 : char buffer[512];
1514 0 : std::string uri_buffer;
1515 0 : FILE *fp = fopen(fifo.c_str( ), "r");
1516 0 : while ( fgets( buffer, sizeof(buffer), fp ) ) { uri_buffer = uri_buffer + buffer; }
1517 0 : fclose(fp);
1518 0 : trim(uri_buffer);
1519 :
1520 : // validate viewer uri...
1521 0 : if ( ! std::regex_match( uri_buffer, std::regex("^([0-9]+\\.){3}[0-9]+:[0-9]+$") ) ) {
1522 : //rework of regex required for IPv6...
1523 0 : if ( debug ) {
1524 : std::cerr << "bad viewer uri " << uri_buffer <<
1525 0 : " (process " << getpid( ) << ", thread " <<
1526 0 : std::this_thread::get_id() << ")" << std::endl;
1527 0 : fflush(stderr);
1528 : }
1529 0 : return false;
1530 : }
1531 :
1532 0 : if ( debug ) {
1533 : std::cerr << "received viewer uri: " << uri_buffer <<
1534 0 : " (process " << getpid( ) << ", thread " <<
1535 0 : std::this_thread::get_id() << ")" << std::endl;
1536 0 : fflush(stderr);
1537 : }
1538 :
1539 0 : viewer_uri = uri_buffer;
1540 0 : viewer_pid = pid;
1541 0 : viewer_proxy = rpc::img::view::NewStub( grpc::CreateChannel( viewer_uri,
1542 0 : grpc::InsecureChannelCredentials( ) ) );
1543 0 : viewer_started = true;
1544 :
1545 0 : return true;
1546 : }
1547 :
1548 0 : std::string grpcInteractiveCleanGui::get_python_path( ) {
1549 0 : std::string ret = casatools::get_state( ).pythonPath( );
1550 0 : return ret;
1551 : }
1552 :
1553 0 : std::string grpcInteractiveCleanGui::get_distro_data_path( ) {
1554 : static bool initialized = false;
1555 0 : static std::string result;
1556 0 : if ( initialized == false ) {
1557 0 : initialized = true;
1558 0 : result = casatools::get_state( ).distroDataPath( );
1559 : struct stat statbuf;
1560 0 : if ( stat( result.c_str( ), &statbuf ) < 0 ) {
1561 : // file (or dir) does not exist...
1562 0 : result = "";
1563 : }
1564 : }
1565 0 : return result;
1566 : }
1567 :
1568 0 : std::string grpcInteractiveCleanGui::get_viewer_path( ) {
1569 : // Get the path to the casaviewer Qt application, to be called in spawn_viewer()
1570 0 : std::string python_path = get_python_path( );
1571 0 : if ( python_path.size( ) == 0 ) return std::string( );
1572 :
1573 : //*** python3 -m casaviewer --app-path
1574 : char buffer[1024];
1575 0 : std::string result;
1576 0 : char **envp = getenv_sansmpi(); // bugfix: run the viewer without MPI CAS-13252
1577 0 : char *python_args[] = { (char*)python_path.c_str(), (char*)"-m", (char*)"casaviewer", (char*)"--app-path", NULL };
1578 0 : execve_getstdout((char*)python_path.c_str(), python_args, envp, buffer, 1024);
1579 0 : result = buffer;
1580 0 : free(envp);
1581 :
1582 0 : trim(result);
1583 0 : if ( result.size( ) == 0 ) return std::string( );
1584 0 : return result;
1585 : }
1586 :
1587 0 : std::string grpcInteractiveCleanGui::get_fifo( ) {
1588 0 : static const char *env_tmpdir = getenv("TMPDIR");
1589 0 : static std::string fifo_template = trim_trailing_slash(env_tmpdir && isdir(env_tmpdir) ? env_tmpdir : P_tmpdir) + "/vwr-XXXXXXXXXX";
1590 0 : static int fifo_template_size = fifo_template.size( );
1591 0 : char fifo_path[fifo_template_size+1];
1592 0 : strncpy( fifo_path, fifo_template.c_str( ), fifo_template_size );
1593 0 : fifo_path[fifo_template_size] = '\0';
1594 0 : int fd = mkstemp(fifo_path);
1595 0 : if ( fd == -1 ) throw std::runtime_error("mkstemp failed...");
1596 0 : close( fd );
1597 0 : unlink(fifo_path);
1598 0 : mkfifo( fifo_path, 0666 );
1599 0 : return fifo_path;
1600 : }
1601 :
1602 0 : casacore::Record grpcInteractiveCleanManager::pauseForUserInteraction( ) {
1603 0 : LogIO os( LogOrigin("grpcInteractiveCleanManager",__FUNCTION__,WHERE) );
1604 0 : static const auto debug = getenv("GRPC_DEBUG");
1605 :
1606 0 : if ( clean_images.size( ) == 0 ) {
1607 : // cannot open clean panel in viewer if not images are available...
1608 0 : if ( debug ) {
1609 : std::cerr << "no clean images available" <<
1610 0 : " (process " << getpid( ) << ", thread " <<
1611 0 : std::this_thread::get_id() << ")" << std::endl;
1612 0 : fflush(stderr);
1613 : }
1614 0 : return Record( );
1615 : }
1616 :
1617 0 : if ( clean_panel_id == -1 || ! gui.alive( ) ) {
1618 : // open panel if it is not already open...
1619 0 : clean_panel_id = gui.open_panel( clean_images );
1620 : }
1621 :
1622 0 : int niter=0,cycleniter=0,iterdone;
1623 0 : float threshold=0.0, cyclethreshold=0.0;
1624 0 : access( (void*) 0,
1625 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1626 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1627 0 : niter = state.Niter;
1628 0 : cycleniter = state.CycleNiter;
1629 0 : threshold = state.Threshold;
1630 0 : cyclethreshold = state.CycleThreshold;
1631 0 : iterdone = state.IterDone;
1632 0 : return dummy;
1633 : } ) );
1634 :
1635 0 : std::string strthresh = std::to_string(threshold)+"Jy";
1636 0 : std::string strcycthresh = std::to_string(cyclethreshold)+"Jy";
1637 :
1638 0 : int iterleft = niter - iterdone;
1639 0 : if( iterleft<0 ) iterleft=0;
1640 :
1641 0 : casacore::Vector<int> itsActionCodes(clean_images.size( ));
1642 0 : itsActionCodes = 1.0;
1643 :
1644 0 : unsigned ind = 0;
1645 0 : for ( auto it = clean_images.begin( ); it != clean_images.end( ); ++it, ++ind ) {
1646 0 : if ( std::get<2>(*it) ) {
1647 0 : itsActionCodes[ind] = std::get<3>(*it);
1648 0 : continue;
1649 : }
1650 0 : if ( fabs(itsActionCodes[ind]) == 1.0 ) {
1651 0 : std::string imageName = std::get<0>(*it) + ".residual" + ( std::get<1>(*it) ? ".tt0" : "" );
1652 0 : std::string maskName = std::get<0>(*it) + ".mask";
1653 0 : std::string last_strcycthresh = strcycthresh;
1654 0 : itsActionCodes[ind] = gui.interactivemask( clean_panel_id, imageName, maskName, iterleft,
1655 : cycleniter, strthresh, strcycthresh );
1656 :
1657 0 : if ( strcycthresh != last_strcycthresh ) {
1658 0 : access( (void*) 0,
1659 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1660 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1661 : // if this is not set to false, the users cyclethreshold
1662 : // change are recomputed...
1663 0 : state.IsCycleThresholdAuto = false;
1664 0 : return dummy;
1665 : } ) );
1666 : }
1667 :
1668 0 : if( itsActionCodes[ind] < 0 ) os << "[" << std::get<0>(*it) <<"] Mask changed interactively." << LogIO::POST;
1669 0 : if( fabs(itsActionCodes[ind])==3 || fabs(itsActionCodes[ind])==2 ) {
1670 : // fabs(itsActionCodes[ind])==3 --> stop
1671 : // fabs(itsActionCodes[ind])==2 --> no more
1672 0 : std::get<2>(*it) = true;
1673 0 : std::get<3>(*it) = fabs(itsActionCodes[ind]);
1674 : }
1675 : }
1676 : }
1677 :
1678 :
1679 0 : Quantity qa;
1680 0 : casacore::Quantity::read(qa,strthresh);
1681 0 : threshold = qa.getValue(Unit("Jy"));
1682 :
1683 :
1684 0 : float oldcyclethreshold = cyclethreshold;
1685 0 : Quantity qb;
1686 0 : casacore::Quantity::read(qb,strcycthresh);
1687 0 : cyclethreshold = qb.getValue(Unit("Jy"));
1688 :
1689 0 : access( (void*) 0,
1690 0 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1691 0 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1692 0 : if ( debug ) {
1693 0 : std::cerr << "-------------------------------------------" << std::endl;
1694 0 : std::cerr << " exporting gui state: " << std::endl;
1695 0 : std::cerr << "-------------------------------------------" << std::endl;
1696 0 : std::cerr << " Niter " << state.Niter <<
1697 0 : " ---> " << iterdone+iterleft << std::endl;
1698 0 : std::cerr << " CycleNiter " << state.CycleNiter <<
1699 0 : " ---> " << cycleniter << std::endl;
1700 0 : std::cerr << " Threshold " << state.Threshold <<
1701 0 : " ---> " << threshold << std::endl;
1702 0 : std::cerr << " CycleThreshold " << oldcyclethreshold <<
1703 0 : ( fabs( cyclethreshold - oldcyclethreshold ) > 1e-06 &&
1704 0 : cyclethreshold != 0 && oldcyclethreshold != 0 ?
1705 0 : " ---> " : " -x-> ") << cyclethreshold << std::endl;
1706 0 : std::cerr << "-------------------------------------------" << std::endl;
1707 : }
1708 :
1709 0 : state.Niter = iterdone+iterleft;
1710 0 : state.CycleNiter = cycleniter;
1711 0 : state.Threshold = threshold;
1712 0 : if ( cyclethreshold != 0 && oldcyclethreshold != 0 &&
1713 0 : fabs( cyclethreshold - oldcyclethreshold ) > 1e-06 )
1714 0 : state.CycleThreshold = cyclethreshold;
1715 :
1716 0 : return dummy;
1717 : } ) );
1718 :
1719 0 : Bool alldone=true;
1720 0 : for ( ind = 0; ind < clean_images.size( ); ++ind ) {
1721 0 : alldone = alldone & ( fabs(itsActionCodes[ind])==3 );
1722 : }
1723 0 : if( alldone==true ) changeStopFlag( true );
1724 :
1725 0 : Record returnRec;
1726 0 : for( ind = 0; ind < clean_images.size( ); ind++ ){
1727 0 : returnRec.define( RecordFieldId( String::toString(ind)), itsActionCodes[ind] );
1728 : }
1729 :
1730 0 : return returnRec;
1731 : }
1732 :
1733 629 : void grpcInteractiveCleanManager::closePanel( ) {
1734 629 : gui.close_panel(clean_panel_id);
1735 629 : clean_panel_id = -1;
1736 629 : clean_images.clear( );
1737 629 : access( (void*) 0,
1738 1258 : std::function< void* ( void*, grpcInteractiveCleanState& )>(
1739 629 : [&]( void *dummy, grpcInteractiveCleanState &state ) -> void* {
1740 629 : state.reset( );
1741 629 : return dummy; } ) );
1742 629 : }
1743 :
1744 : } //# NAMESPACE CASA - END
|