Line data Source code
1 : //# CubeMajorCycleAlgorithm.cc: implementation of class to grid and degrid (and write model vis when necessary) in parallel/serial
2 : //# Copyright (C) 2019
3 : //# Associated Universities, Inc. Washington DC, USA.
4 : //#
5 : //# This library is free software; you can redistribute it and/or modify it
6 : //# under the terms of the GNU General Public License as published by
7 : //# the Free Software Foundation; either version 3 of the License, or (at your
8 : //# option) any later version.
9 : //#
10 : //# This library is distributed in the hope that it will be useful, but WITHOUT
11 : //# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 : //# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public
13 : //# License for more details.
14 : //#
15 : //# https://www.gnu.org/licenses/
16 : //#
17 : //# Queries concerning CASA should be submitted at
18 : //# https://help.nrao.edu
19 : //#
20 : //# Postal address: CASA Project Manager
21 : //# National Radio Astronomy Observatory
22 : //# 520 Edgemont Road
23 : //# Charlottesville, VA 22903-2475 USA
24 : //#
25 : //#
26 : //# $Id$
27 : #include <casacore/lattices/Lattices/LatticeLocker.h>
28 : #include <synthesis/ImagerObjects/CubeMajorCycleAlgorithm.h>
29 : #include <synthesis/ImagerObjects/SynthesisImagerVi2.h>
30 : #include <synthesis/ImagerObjects/SynthesisNormalizer.h>
31 : #include <casacore/casa/Containers/Record.h>
32 : #include <synthesis/ImagerObjects/SimpleSIImageStore.h>
33 : #include <imageanalysis/Utilities/SpectralImageUtil.h>
34 : #include <casacore/casa/OS/Timer.h>
35 : #include <chrono>
36 : #include <thread>
37 : using namespace casacore;
38 : namespace casa { //# NAMESPACE CASA - BEGIN
39 : extern Applicator applicator;
40 :
41 800 : CubeMajorCycleAlgorithm::CubeMajorCycleAlgorithm() : myName_p("CubeMajorCycleAlgorithm"), ftmRec_p(0), iftmRec_p(0), polRep_p(0),startmodel_p(0), residualNames_p(0), psfNames_p(0), sumwtNames_p(0), weightNames_p(0), pbNames_p(0), movingSource_p(""),status_p(False), retuning_p(True), doPB_p(False), nterms_p(0){
42 :
43 800 : }
44 801 : CubeMajorCycleAlgorithm::~CubeMajorCycleAlgorithm() {
45 :
46 801 : }
47 :
48 799 : void CubeMajorCycleAlgorithm::get() {
49 799 : reset();
50 : //cerr << "in get for child process " << applicator.isWorker() << endl;
51 1598 : Record vecImParsRec;
52 1598 : Record vecSelParsRec;
53 1598 : Record vecGridParsRec;
54 : // get data sel params #1
55 799 : applicator.get(vecSelParsRec);
56 : // get image sel params #2
57 799 : applicator.get(vecImParsRec);
58 : // get gridders params #3
59 799 : applicator.get(vecGridParsRec);
60 : // get which channel to process #4
61 799 : applicator.get(chanRange_p);
62 : //cerr <<"GET chanRange " << chanRange_p << endl;
63 : // psf or residual CubeMajorCycleAlgorithm #5
64 799 : applicator.get(dopsf_p);
65 : // store modelvis and other controls #6
66 799 : applicator.get(controlRecord_p);
67 : // weight params
68 799 : applicator.get(weightParams_p);
69 : //Somewhere before this we have to make sure that vecSelPars has more than 0 fields)
70 799 : dataSel_p.resize(vecSelParsRec.nfields());
71 : /// Fill the private variables
72 1627 : for (Int k=0; k < Int(dataSel_p.nelements()); ++k){
73 828 : (dataSel_p[k]).fromRecord(vecSelParsRec.asRecord(String::toString(k)));
74 : //cerr << k << "datasel " << vecSelParsRec.asRecord(String::toString(k)) << endl;
75 : }
76 : //imsel and gridsel should be the same numbers (number of image fields)
77 799 : Int nmajorcycles=0;
78 799 : if(controlRecord_p.isDefined("nmajorcycles"))
79 524 : controlRecord_p.get("nmajorcycles",nmajorcycles);
80 799 : imSel_p.resize(vecImParsRec.nfields());
81 799 : gridSel_p.resize(vecImParsRec.nfields());
82 799 : ftmRec_p.resize(vecImParsRec.nfields());
83 799 : iftmRec_p.resize(vecImParsRec.nfields());
84 799 : polRep_p.resize(vecImParsRec.nfields());
85 799 : polRep_p.set(-1);
86 799 : startmodel_p.resize(vecImParsRec.nfields());
87 799 : startmodel_p.set(Vector<String>(0));
88 : //TESTOO
89 : //Int CPUID;
90 : //MPI_Comm_rank(MPI_COMM_WORLD, &CPUID);
91 : //////////////////
92 1613 : for (uInt k=0; k < imSel_p.nelements(); ++k){
93 2442 : Record imSelRec=vecImParsRec.asRecord(String::toString(k));
94 : //cerr << k << " imsel " << imSelRec << endl;
95 814 : if(imSelRec.isDefined("polrep"))
96 814 : imSelRec.get("polrep", polRep_p[k]);
97 : ///Get moving source name
98 814 : if(imSelRec.isDefined("movingsource") && imSelRec.asString("movingsource") != ""){
99 4 : imSelRec.get("movingsource", movingSource_p);
100 : }
101 : //cerr << CPUID << " POLREP " << polRep_p << endl;
102 : //Only first major cycle we need to reset model
103 814 : if(nmajorcycles==1)
104 285 : imSelRec.get("startmodel", startmodel_p[k]);
105 : //have to do that as fromRecord check does not like to have both model and startmodel on disk !
106 814 : imSelRec.define("startmodel", Vector<String>(0));
107 814 : (imSel_p[k]).fromRecord(imSelRec);
108 2442 : Record recGridSel=vecGridParsRec.asRecord(String::toString(k));
109 814 : (gridSel_p[k]).fromRecord(recGridSel);
110 814 : if(!recGridSel.isDefined("ftmachine")){
111 814 : ftmRec_p.resize();
112 814 : iftmRec_p.resize();
113 : }
114 814 : if(ftmRec_p.nelements() >0){
115 0 : ftmRec_p[k]=recGridSel.asRecord("ftmachine");
116 0 : iftmRec_p[k]=recGridSel.asRecord("iftmachine");
117 : }
118 :
119 : }
120 :
121 :
122 799 : }
123 799 : void CubeMajorCycleAlgorithm::put() {
124 :
125 : //if(applicator.isSerial()){
126 : // serialBug_p=Applicator::DONE;
127 : //applicator.put(serialBug_p);
128 :
129 : // }
130 : //cerr << "in put " << status_p << endl;
131 799 : applicator.put(returnRec_p);
132 799 : applicator.put(status_p);
133 :
134 799 : }
135 :
136 799 : void CubeMajorCycleAlgorithm::task(){
137 2397 : LogIO logger(LogOrigin("CubeMajorCycleAlgorithm", "task", WHERE));
138 799 : status_p = True;
139 : try{
140 : //Timer tim;
141 : //tim.mark();
142 : //SynthesisImagerVi2 imgr;
143 : //imgr.selectData(dataSel_p);
144 : // We do not use chanchunking in this model
145 1613 : for (uInt k=0; k < gridSel_p.nelements(); ++k)
146 814 : gridSel_p[k].chanchunks = 1;
147 :
148 : //imgr.defineImage(imSel_p,gridSel_p);
149 : // need to find how many subfields/outliers have been set
150 : //CountedPtr<SIImageStore> imstor =imgr.imageStore(0);
151 : //CountedPtr<ImageInterface<Float> > resid=imstor->residual();
152 : //Int nchan = resid->shape()(3);
153 : //std::shared_ptr<SIImageStore> subImStor=imstor->getSubImageStore(0, 1, chanId_p, nchan, 0,1);
154 :
155 1598 : SynthesisImagerVi2 subImgr;
156 799 : subImgr.setCubeGridding(False);
157 1627 : for (Int k=0; k < Int(dataSel_p.nelements()); ++k){
158 : //The original SynthesisImager would have cleared the model if it was requested
159 828 : dataSel_p[k].incrmodel=True;
160 828 : dataSel_p[k].freqbeg="";
161 828 : subImgr.selectData(dataSel_p[k]);
162 : }
163 : //cerr <<"***Time for select data " << tim.real() << endl;
164 : //tim.mark();
165 799 : subImgr.setMovingSource(movingSource_p);
166 1598 : Vector<CountedPtr<SIImageStore> > subImStor(imSel_p.nelements());
167 : //copy as shared_ptr as we mix the usage of countedptr and sharedptr
168 1598 : Vector<std::shared_ptr<SIImageStore> > subImStorShared(imSel_p.nelements());
169 800 : Vector<SIImageStore *> p(imSel_p.nelements());
170 : //Do multifield in one process only for now
171 : //TODO if all fields have same nchan then partition all on all subcalls
172 799 : if(chanRange_p[0]==0){
173 1613 : for (uInt k=0; k < imSel_p.nelements(); ++k){
174 814 : subImStorShared[k]=subImageStore(k);
175 814 : subImStor[k]=CountedPtr<SIImageStore>(subImStorShared[k]);
176 :
177 814 : if(ftmRec_p.nelements()>0){
178 0 : subImgr.defineImage(subImStor[k], ftmRec_p[k], iftmRec_p[k]);
179 : }else{
180 814 : subImgr.defineImage(subImStor[k], imSel_p[k], gridSel_p[k]);
181 : }
182 : }
183 : }else{
184 0 : subImStor.resize(1);
185 0 : subImStorShared[0]=subImageStore(0);
186 0 : subImStor[0]=CountedPtr<SIImageStore>(subImStorShared[0]);
187 :
188 0 : if(ftmRec_p.nelements()>0){
189 0 : subImgr.defineImage(subImStor[0], ftmRec_p[0], iftmRec_p[0]);
190 : }else{
191 0 : subImgr.defineImage(subImStor[0], imSel_p[0], gridSel_p[0]);
192 : }
193 : }
194 799 : subImgr.setCubeGridding(False);
195 : // TO DO get weight param and set weight
196 : // Modified to fix CAS-13660 bug for natural weight setting with uvtaper
197 799 : if(!weightParams_p.isDefined("type") ){
198 0 : subImgr.weight("natural");
199 : }
200 : else{
201 799 : if(controlRecord_p.isDefined("weightdensity")){
202 4 : String densName=controlRecord_p.asString("weightdensity");
203 : //cerr << "Loading weightdensity " << densName << endl;
204 2 : if(Table::isReadable(densName))
205 2 : subImgr.setWeightDensity(densName);
206 : }
207 : else{
208 797 : subImgr.weight(weightParams_p);
209 : }
210 : }
211 : ///Now do the selection tuning if needed
212 799 : if(imSel_p[0].mode !="cubedata"){
213 : //cerr << "IN RETUNING " << endl;
214 793 : if(retuning_p)
215 793 : subImgr.tuneSelectData();
216 : }
217 :
218 : //cerr << "***Time for all other setting " << tim.real() << endl;
219 : //tim.mark();
220 799 : if (!dopsf_p){
221 : ///In case weightimages for mosaicft is done load it...we can get rid of this if we are using fromrecord ftm
222 : ///doing it for non-psf only ...psf divides it by sumwt for some reason in
223 : ///SIImageStore ..so restart with psf creates havoc
224 524 : subImgr.loadMosaicSensitivity();
225 1048 : Record outrec=subImgr.executeMajorCycle(controlRecord_p);
226 524 : if(outrec.isDefined("tempfilenames")){
227 508 : returnRec_p.define("tempfilenames", outrec.asArrayString("tempfilenames"));
228 : }
229 1058 : for(uInt k=0; k < subImStor.nelements(); ++k){
230 534 : if(controlRecord_p.isDefined("dividebyweight") && controlRecord_p.asBool("dividebyweight")){
231 : {
232 534 : if(controlRecord_p.isDefined("normpars")){
233 1068 : Record normpars=controlRecord_p.asRecord("normpars");
234 1068 : SynthesisNormalizer norm;
235 534 : if(nterms_p[k] > 0){
236 2 : if(!normpars.isDefined("nterms"))
237 0 : normpars.define("nterms", uInt(nterms_p[k]));
238 2 : normpars.define("deconvolver", "mtmfs");
239 : }
240 :
241 534 : norm.setupNormalizer(normpars);
242 534 : norm.setImageStore(subImStorShared[k]);
243 534 : norm.divideResidualByWeight();
244 : }
245 : else{
246 0 : LatticeLocker lock1 (*(subImStor[k]->residual()), FileLocker::Write);
247 0 : LatticeLocker lock2 (*(subImStor[k]->sumwt()), FileLocker::Read);
248 0 : subImStor[k]->divideResidualByWeight();
249 : //subImStor[k]->residual()->flush();
250 : }
251 : }
252 534 : subImStor[k]->residual()->unlock();
253 534 : if(subImStor[k]->hasModel())
254 278 : subImStor[k]->model()->unlock();
255 : }
256 534 : Int chanBeg=0; Int chanEnd=0;
257 534 : if(k==0){
258 524 : chanBeg=chanRange_p[0];
259 524 : chanEnd=chanRange_p[1];
260 : }
261 : else{
262 10 : chanBeg=0;
263 10 : chanEnd=subImStor[k]->sumwt()->shape()[3]-1;
264 : }
265 534 : if(subImStor[k]->getType() != "multiterm"){
266 532 : writeBackToFullImage(residualNames_p[k], chanBeg, chanEnd, (subImStor[k]->residual()));
267 532 : writeBackToFullImage(sumwtNames_p[k], chanBeg, chanEnd, (subImStor[k]->sumwt()));
268 : }
269 :
270 : }
271 :
272 :
273 : }
274 : else{
275 549 : Record&& outrec=subImgr.makePSF();
276 274 : if(outrec.isDefined("tempfilenames")){
277 274 : returnRec_p.define("tempfilenames", outrec.asArrayString("tempfilenames"));
278 : }
279 : ////tclean expects a PB to be always there...
280 : //so for standard make it
281 274 : subImgr.makePB();
282 553 : for(uInt k=0; k < subImStor.nelements(); ++k){
283 279 : if(controlRecord_p.isDefined("dividebyweight") && controlRecord_p.asBool("dividebyweight"))
284 : {
285 279 : if(controlRecord_p.isDefined("normpars")){
286 558 : Record normpars=controlRecord_p.asRecord("normpars");
287 279 : if(nterms_p[k] > 0){
288 1 : if(!normpars.isDefined("nterms"))
289 0 : normpars.define("nterms", uInt(nterms_p[k]));
290 1 : normpars.define("deconvolver", "mtmfs");
291 : }
292 : // cerr << k << " " << nterms_p[k] <<" NORMPARS " << normpars << endl;
293 279 : SynthesisNormalizer norm;
294 279 : norm.setupNormalizer(normpars);
295 279 : norm.setImageStore(subImStorShared[k]);
296 279 : norm.dividePSFByWeight();
297 279 : copyBeamSet(*(subImStorShared[k]->psf()), k);
298 : }
299 : else{
300 0 : LatticeLocker lock1 (*(subImStor[k]->psf()), FileLocker::Write);
301 0 : LatticeLocker lock2 (*(subImStor[k]->sumwt()), FileLocker::Read);
302 0 : subImStor[k]->dividePSFByWeight();
303 0 : copyBeamSet(*(subImStor[k]->psf()), k);
304 : //subImStor[k]->psf()->flush();
305 : }
306 : }
307 279 : Int chanBeg=0; Int chanEnd=0;
308 279 : if(k==0){
309 274 : chanBeg=chanRange_p[0];
310 274 : chanEnd=chanRange_p[1];
311 : }
312 : else{
313 5 : chanBeg=0;
314 5 : chanEnd=subImStor[k]->sumwt()->shape()[3]-1;
315 : }
316 279 : if(subImStor[k]->getType() != "multiterm"){
317 278 : writeBackToFullImage(psfNames_p[k], chanBeg, chanEnd, (subImStor[k]->psf()));
318 278 : writeBackToFullImage(sumwtNames_p[k], chanBeg, chanEnd, (subImStor[k]->sumwt()));
319 278 : if((subImStor[k]->hasSensitivity()) && Table::isWritable(weightNames_p[k])){
320 57 : writeBackToFullImage(weightNames_p[k], chanBeg, chanEnd, (subImStor[k]->weight()));
321 :
322 : }
323 278 : if( doPB_p && Table::isWritable(pbNames_p[k])){
324 221 : writeBackToFullImage(pbNames_p[k], chanBeg, chanEnd, (subImStor[k]->pb()));
325 :
326 : }
327 :
328 : }
329 279 : subImStor[k]->psf()->unlock();
330 :
331 : }
332 : }
333 : //cerr << "***Time gridding/ffting " << tim.real() << endl;
334 798 : status_p = True;
335 : }
336 2 : catch (AipsError x) {
337 :
338 2 : LogIO os( LogOrigin("SynthesisImagerVi2","CubeMajorCycle",WHERE) );
339 1 : os << LogIO::WARN << "Exception for chan range " << chanRange_p << " --- "<< x.getMesg() << LogIO::POST;
340 1 : cerr << "##################################\n#############################\nException: " << x.getMesg() << endl;
341 :
342 1 : status_p=false;
343 : }
344 0 : catch(std::exception& exc) {
345 0 : logger << LogIO::SEVERE << "Exception (std): " << exc.what() << LogIO::POST;
346 0 : returnRec_p=Record();
347 : }
348 0 : catch(...){
349 0 : cerr << "###################################\n#######################3##\nUnknown exception " << endl;
350 0 : std::exception_ptr eptr=std::current_exception();
351 : try{
352 0 : if(eptr)
353 0 : std::rethrow_exception(eptr);
354 : }
355 0 : catch(const std::exception& a){
356 0 : LogIO os( LogOrigin("SynthesisImagerVi2","CubeMajorCycle",WHERE) );
357 0 : os << LogIO::WARN << "Unknown Exception for chan range " << chanRange_p << " --- "<< a.what() << LogIO::POST;
358 : }
359 :
360 0 : logger << LogIO::SEVERE << "Unknown exception " << LogIO::POST;
361 0 : status_p=False;
362 : }
363 799 : }
364 3197 : String& CubeMajorCycleAlgorithm::name(){
365 3197 : return myName_p;
366 : }
367 :
368 814 : shared_ptr<SIImageStore> CubeMajorCycleAlgorithm::subImageStore(const int imId){
369 : //For some reason multiterm deconvolver is allowed with cubes !
370 1628 : String isMTdeconv="";
371 814 : nterms_p.resize(imId+1, True);
372 814 : nterms_p[imId]=-1;
373 814 : if(imId==0 && imSel_p[imId].deconvolver=="mtmfs"){
374 0 : isMTdeconv=".tt0";
375 0 : nterms_p[0]=1;
376 : }
377 :
378 1628 : String residname=imSel_p[imId].imageName+".residual"+isMTdeconv;
379 1628 : String psfname=imSel_p[imId].imageName+".psf"+isMTdeconv;
380 1628 : String sumwgtname=imSel_p[imId].imageName+".sumwt"+isMTdeconv;
381 814 : if(imId > 0 && !Table::isReadable(sumwgtname)){
382 3 : return multiTermImageStore(imId);
383 : }
384 1622 : shared_ptr<ImageInterface<Float> >subpsf=nullptr;
385 1622 : shared_ptr<ImageInterface<Float> >subresid=nullptr;
386 1622 : shared_ptr<ImageInterface<Float> >submodel=nullptr;
387 1622 : shared_ptr<ImageInterface<Float> > subweight=nullptr;
388 1622 : shared_ptr<ImageInterface<Float> > subpb=nullptr;
389 : //cerr << imId << " sumwt name " << sumwgtname << endl;
390 1622 : String workingdir="";
391 811 : if(controlRecord_p.isDefined("workingdirectory"))
392 811 : controlRecord_p.get("workingdirectory", workingdir);
393 : //cerr <<"WORKINGDIR " << workingdir << endl;
394 811 : if(Table::isReadable(workingdir+"/"+sumwgtname)){
395 811 : workingdir=workingdir+"/";
396 811 : residname=workingdir+residname;
397 811 : psfname=workingdir+psfname;
398 811 : sumwgtname=workingdir+sumwgtname;
399 :
400 : }
401 0 : else if(!Table::isReadable(sumwgtname) )
402 0 : throw(AipsError("Programmer error: sumwt disk image is non existant"));
403 : else
404 0 : workingdir="";
405 : ///Use user locking to make sure locks are compliant
406 1622 : PagedImage<Float> sumwt(sumwgtname, TableLock::UserLocking);
407 811 : sumwt.lock(FileLocker::Write, 200);
408 : //PagedImage<Float> sumwt(sumwgtname, TableLock::AutoNoReadLocking);
409 811 : if(sumwtNames_p.nelements() <= uInt(imId)){
410 811 : sumwtNames_p.resize(imId+1, True);
411 811 : psfNames_p.resize(imId+1, True);
412 811 : residualNames_p.resize(imId+1, True);
413 811 : sumwtNames_p[imId]=sumwgtname;
414 811 : residualNames_p[imId]=residname;
415 811 : psfNames_p[imId]=psfname;
416 : }
417 811 : Int nchannels=sumwt.shape()[3];
418 :
419 :
420 :
421 : //Should be partitioning for main image only
422 : //chanRange
423 811 : Int chanBeg=0;
424 811 : Int chanEnd=0;
425 811 : if(imId==0){
426 799 : chanBeg=chanRange_p[0];
427 799 : chanEnd=chanRange_p[1];
428 : }
429 : else{
430 12 : chanBeg=0;
431 12 : chanEnd=sumwt.shape()[3]-1;
432 : }
433 811 : sumwt.tempClose();
434 : ////For some small channel ms's retuning trigger a vi2/vb2 bug in nChannels
435 : ///avoid retuning for small images
436 : ////Skipping this here..
437 : //overloaded SynthesisImagerVi2::retune to do this check
438 : //if(nchannels < 30 && imId==0 && ((chanEnd-chanBeg) < 10)){
439 : // retuning_p=False;
440 : //}
441 : //cerr << "chanBeg " << chanBeg << " chanEnd " << chanEnd << " imId " << imId << endl;
442 1622 : Vector<String> weightnams(controlRecord_p.asArrayString("weightnames"));
443 1622 : Vector<String> pbnams(controlRecord_p.asArrayString("pbnames"));
444 811 : pbNames_p.resize();
445 811 : pbNames_p=pbnams;
446 811 : weightNames_p.resize();
447 811 : weightNames_p=weightnams;
448 :
449 811 : if(imId >= int(weightNames_p.nelements()))
450 0 : throw(AipsError("Number of weight images does not match number of image fields defined"));
451 811 : if(Table::isReadable(workingdir+weightNames_p[imId])){
452 0 : weightNames_p[imId]=workingdir+weightNames_p[imId];
453 : }
454 811 : if(Table::isReadable(workingdir+pbNames_p[imId])){
455 0 : pbNames_p[imId]=workingdir+pbNames_p[imId];
456 : }
457 :
458 811 : if(dopsf_p){
459 : //PagedImage<Float> psf(psfname, TableLock::UserNoReadLocking);
460 : //subpsf.reset(SpectralImageUtil::getChannel(psf, chanBeg, chanEnd, true));
461 279 : getSubImage(subpsf, chanBeg, chanEnd, psfname, False);
462 : //cerr << "PBNAMES " << pbNames_p << endl;
463 279 : if(Table::isReadable(pbNames_p[imId])){
464 :
465 222 : doPB_p=True;
466 222 : getSubImage(subpb, chanBeg, chanEnd, pbNames_p[imId], False);
467 : }
468 : }
469 : else{
470 : //need to loop over all fields somewhere
471 : //PagedImage<Float> resid(residname, TableLock::UserNoReadLocking);
472 : //subresid.reset(SpectralImageUtil::getChannel(resid, chanBeg, chanEnd, true));
473 532 : getSubImage(subresid, chanBeg, chanEnd, residname, False);
474 :
475 532 : if(controlRecord_p.isDefined("modelnames")){
476 554 : Vector<String> modelnames(controlRecord_p.asArrayString("modelnames"));
477 277 : if(imId >= int(modelnames.nelements()))
478 0 : throw(AipsError("Number of model images does not match number of image fields defined"));
479 277 : if(Table::isReadable(workingdir+modelnames[imId])){
480 277 : modelnames[imId]=workingdir+modelnames[imId];
481 : }
482 277 : if(Table::isReadable(modelnames[imId])){
483 :
484 :
485 : ///Pass some extra channels for interpolation while degridding..should match or be less than in SynthesisImager::tuneSelect
486 277 : Int startmodchan=(chanBeg-2) >0 ? chanBeg-2 : 0;
487 277 : Int endmodchan=(chanEnd+2) < nchannels ? chanEnd+2 : nchannels-1 ;
488 : //cerr << "START END mod " << startmodchan << " " << endmodchan << endl;
489 : //Darn has to lock it as writable because overlap in SIMapperCollection code
490 : //wants that...though we are not really modifying it here
491 : //Bool writeisneeded=(imSel_p.nelements()!=1 || startmodel_p[imId].nelements() >0);
492 : //PagedImage<Float> model(modelnames[imId], TableLock::UserNoReadLocking);
493 : //submodel.reset(SpectralImageUtil::getChannel(model, startmodchan, endmodchan, writeisneeded));
494 277 : getSubImage(submodel, startmodchan, endmodchan, modelnames[imId], True);
495 : //
496 277 : if(Table::isReadable(weightNames_p[imId])){
497 66 : divideModelByWeight(imSel_p[imId].imageName, submodel, startmodchan, endmodchan, weightNames_p[imId]);
498 : }
499 : //ImageInterface<Float>* modim=new PagedImage<Float>(modelnames[imId], TableLock::UserNoReadLocking);
500 : //submodel.reset(modim);
501 :
502 : }
503 :
504 : }
505 :
506 : }
507 :
508 811 : if(Table::isReadable(weightNames_p[imId])){
509 : //PagedImage<Float> weight(weightnames[imId], TableLock::UserNoReadLocking);
510 : //subweight.reset(SpectralImageUtil::getChannel(weight, chanBeg, chanEnd, true));
511 168 : getSubImage(subweight, chanBeg, chanEnd, weightNames_p[imId], True);
512 : }
513 1622 : shared_ptr<ImageInterface<Float> >subsumwt=nullptr;
514 : // subsumwt.reset(SpectralImageUtil::getChannel(sumwt, chanBeg, chanEnd, true));
515 811 : getSubImage(subsumwt, chanBeg, chanEnd, sumwgtname, True);
516 811 : bool useweightimage=(subweight) ? true : false;
517 2433 : shared_ptr<SIImageStore> subimstor(new SimpleSIImageStore(imSel_p[imId].imageName, submodel, subresid, subpsf, subweight, nullptr, nullptr, subsumwt, nullptr, subpb, nullptr, nullptr, useweightimage));
518 811 : if(polRep_p[imId]< 0)
519 0 : throw(AipsError("data polarization type is not defined"));
520 811 : StokesImageUtil::PolRep polrep=(StokesImageUtil::PolRep)polRep_p[imId];
521 811 : subimstor->setDataPolFrame(polrep);
522 811 : if(startmodel_p[imId].nelements() >0){
523 6 : LatticeLocker lock1 (*(subimstor->model()), FileLocker::Write);
524 3 : subimstor->setModelImage(startmodel_p[imId]);
525 : }
526 : //cerr << "subimagestor TYPE" << subimstor->getType() << endl;
527 811 : return subimstor;
528 : }
529 :
530 :
531 3 : std::shared_ptr<SIImageStore> CubeMajorCycleAlgorithm::multiTermImageStore(const Int imId){
532 3 : uInt nterms=0;
533 9 : String sumwgtname=imSel_p[imId].imageName+".sumwt.tt"+String::toString(nterms);
534 12 : while (Table::isReadable(sumwgtname)){
535 9 : ++nterms;
536 9 : sumwgtname=imSel_p[imId].imageName+".sumwt.tt"+String::toString(nterms);
537 : }
538 3 : if(nterms==0){
539 0 : throw(AipsError("outlier "+String::toString(imId)+" field weight image is not defined"));
540 : }
541 3 : nterms=(nterms+1)/2;
542 3 : nterms_p[imId]=nterms;
543 3 : shared_ptr<SIImageStore> subimstor(new SIImageStoreMultiTerm(imSel_p[imId].imageName, nterms, True));
544 3 : if(polRep_p[imId]< 0)
545 0 : throw(AipsError("data polarization type is not defined"));
546 3 : StokesImageUtil::PolRep polrep=(StokesImageUtil::PolRep)polRep_p[imId];
547 3 : subimstor->setDataPolFrame(polrep);
548 6 : return subimstor;
549 : }
550 66 : void CubeMajorCycleAlgorithm::divideModelByWeight(casacore::String imageName, shared_ptr<ImageInterface<Float> >&submodel, const Int startchan, const Int endchan, const String weightname){
551 66 : if(controlRecord_p.isDefined("dividebyweight") && controlRecord_p.asBool("dividebyweight")){
552 66 : if(controlRecord_p.isDefined("normpars")){
553 132 : Record normpars=controlRecord_p.asRecord("normpars");
554 132 : SynthesisNormalizer norm;
555 66 : norm.setupNormalizer(normpars);
556 132 : shared_ptr<ImageInterface<Float> > subweight=nullptr;
557 66 : getSubImage(subweight, startchan, endchan, weightname, True);
558 132 : LatticeLocker lock1(*(subweight), FileLocker::Read);
559 132 : LatticeLocker lock2(*(submodel), FileLocker::Read);
560 198 : std::shared_ptr<SIImageStore> subimstor(new SimpleSIImageStore(imageName, submodel, nullptr, nullptr, subweight, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, nullptr, True));
561 66 : norm.setImageStore(subimstor);
562 66 : norm.divideModelByWeight();
563 :
564 : }
565 : }
566 66 : }
567 :
568 :
569 :
570 :
571 799 : void CubeMajorCycleAlgorithm::reset(){
572 :
573 799 : dataSel_p.resize();
574 799 : imSel_p.resize();
575 799 : gridSel_p.resize();
576 799 : ftmRec_p.resize();
577 799 : iftmRec_p.resize();
578 799 : polRep_p.resize();
579 799 : chanRange_p.resize();
580 799 : dopsf_p=False;
581 799 : controlRecord_p=Record();
582 799 : weightParams_p=Record();
583 799 : returnRec_p=Record();
584 799 : startmodel_p.resize();
585 799 : status_p=False;
586 799 : residualNames_p.resize();
587 799 : psfNames_p.resize();
588 799 : sumwtNames_p.resize();
589 799 : weightNames_p.resize();
590 799 : pbNames_p.resize();
591 799 : movingSource_p="";
592 799 : retuning_p=True;
593 799 : doPB_p=False;
594 799 : nterms_p.resize();
595 :
596 :
597 :
598 799 : }
599 :
600 2355 : void CubeMajorCycleAlgorithm::getSubImage(std::shared_ptr<ImageInterface<Float> >& subimptr, const Int chanBeg, const Int chanEnd, const String imagename, const Bool copy){
601 7065 : LogIO os( LogOrigin("CubeMajorCycle","getSubImage",WHERE) );
602 4710 : CountedPtr<PagedImage<Float> > im=nullptr;
603 : {///work around
604 2355 : uInt exceptCounter=0;
605 : while(true){
606 : try{
607 2355 : im = new PagedImage<Float> (imagename, TableLock::UserLocking);
608 : //PagedImage<Float> im(imagename, TableLock::AutoNoReadLocking);
609 2355 : im->lock(FileLocker::Read, 1000);
610 2355 : SubImage<Float> *tmpptr=nullptr;
611 2355 : tmpptr=SpectralImageUtil::getChannel(*im, chanBeg, chanEnd, false);
612 2355 : subimptr.reset(new TempImage<Float>(TiledShape(tmpptr->shape(), tmpptr->niceCursorShape()), tmpptr->coordinates()));
613 :
614 2355 : if(copy)
615 1322 : subimptr->copyData(*tmpptr);
616 : //subimptr->flush();
617 2355 : im->unlock();
618 2355 : delete tmpptr;
619 2355 : break; ///get out of while loop
620 : }
621 0 : catch(AipsError &x){
622 0 : im=nullptr;
623 0 : String mes=x.getMesg();
624 0 : if(mes.contains("FilebufIO::readBlock") ){
625 0 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
626 0 : os << LogIO::WARN << "#####CATCHING a sleep because "<< mes<< LogIO::POST;
627 : }
628 : else
629 0 : throw(AipsError("Error in readimage "+imagename+" : "+mes));
630 :
631 0 : if(exceptCounter > 100){
632 0 : throw(AipsError("Error in readimage got 100 of this exeception: "+mes));
633 :
634 : }
635 :
636 : }
637 0 : ++exceptCounter;
638 0 : }
639 : }
640 :
641 2355 : }
642 :
643 1898 : void CubeMajorCycleAlgorithm::writeBackToFullImage(const String imagename, const Int chanBeg, const Int chanEnd, std::shared_ptr<ImageInterface<Float> > subimptr){
644 5694 : LogIO os( LogOrigin("CubeMajorCycle","writebacktofullimage",WHERE) );
645 3796 : CountedPtr<PagedImage<Float> > im=nullptr;
646 : {///work around
647 1898 : uInt exceptCounter=0;
648 : while(true){
649 : try{
650 1898 : im=new PagedImage<Float> (imagename, TableLock::UserLocking);
651 :
652 :
653 : //PagedImage<Float> im(imagename, TableLock::AutoLocking);
654 1898 : SubImage<Float> *tmpptr=nullptr;
655 : {
656 1898 : tmpptr=SpectralImageUtil::getChannel(*im, chanBeg, chanEnd, true);
657 3796 : LatticeLocker lock1 (*(tmpptr), FileLocker::Write);
658 1898 : tmpptr->set(0.0);
659 1898 : tmpptr->copyData(*(subimptr));
660 : }
661 1898 : im->flush();
662 1898 : im->unlock();
663 1898 : delete tmpptr;
664 1898 : break; //get out of while loop
665 : }
666 0 : catch(AipsError &x){
667 :
668 0 : String mes=x.getMesg();
669 0 : if(mes.contains("FilebufIO::readBlock") ){
670 0 : std::this_thread::sleep_for(std::chrono::milliseconds(50));
671 0 : os << LogIO::WARN << "#####CATCHING a sleep because "<< mes<< LogIO::POST;
672 : }
673 : else
674 0 : throw(AipsError("Error in writing back image "+imagename+" : "+mes));
675 :
676 0 : if(exceptCounter > 100){
677 0 : throw(AipsError("Error in writeimage got 100 of this exeception: "+mes));
678 :
679 : }
680 :
681 : }
682 0 : ++exceptCounter;
683 0 : }
684 : }//End of work around for table disappearing bug
685 :
686 :
687 1898 : }
688 279 : void CubeMajorCycleAlgorithm::copyBeamSet(ImageInterface<Float>& subpsf, const Int imageid){
689 : //For now lets do for the main image only
690 279 : if(imageid != 0)
691 5 : return;
692 548 : ImageBeamSet iibeamset=subpsf.imageInfo().getBeamSet();
693 274 : uInt nchan=chanRange_p[1]-chanRange_p[0]+1;
694 274 : if(nchan ==iibeamset.nchan()){
695 548 : Matrix<GaussianBeam> gbs=iibeamset.getBeams();
696 274 : Cube<Float> beams(nchan, iibeamset.nstokes(),3);
697 :
698 2491 : for (uInt k=0; k < nchan; ++k){
699 4605 : for (uInt j=0; j < iibeamset.nstokes(); ++j){
700 2388 : beams(k,j, 0)=gbs(k, 0).getMajor("arcsec");
701 2388 : beams(k,j, 1)=gbs(k, 0).getMinor("arcsec");
702 2388 : beams(k,j, 2)=gbs(k, 0).getPA("deg", True);
703 : }
704 : }
705 274 : returnRec_p.define("imageid", imageid);
706 274 : returnRec_p.define("chanrange", chanRange_p);
707 274 : returnRec_p.define("beams", beams);
708 :
709 :
710 : }
711 :
712 :
713 :
714 : }
715 :
716 :
717 :
718 : } //# NAMESPACE CASA - END
|