00001 from IPython.kernel import client
00002 PARALLEL_CASAPY_ROOT_DIR = "/home/casa-dev-02/casa_ccore/linux_64b/";
00003 PARALLEL_CONFIGFILES_DIR = "/users/sbhatnag/Scripts/ClusterConfigFiles/";
00004 PARALLEL_WORK_DIR = "/sanjay/PTEST/";
00005 PARALLEL_BASE_ROOT_DIR = "/home/casa-dev-";
00006
00007
00008
00009
00010
00011
00012 def mkname(node,rootdir,workdir,basename,ext,regex=false):
00013 format=str('%2.2d');
00014 nodenumber=(format%node);
00015 if regex:
00016 imgnodenumber="*";
00017 else:
00018 imgnodenumber=nodenumber;
00019 targetname=rootdir+nodenumber+workdir+basename+imgnodenumber+ext;
00020 return targetname;
00021
00022
00023
00024
00025
00026 def rcmd(var,op,value):
00027 tt=var+"=\""+value+"\"";
00028 tt=var+op+"\""+value+"\"";
00029 return tt;
00030
00031
00032
00033
00034
00035 def pinit(message="Hello CASA Cluster"):
00036 casalog.post("Setting up the connection to the remote nodes...",origin="PDeconv::pinit");
00037 mec = client.MultiEngineClient();
00038 ids=mec.get_ids();
00039 print "Connected to IDs ",ids;
00040
00041 mec.activate();
00042
00043 return mec;
00044
00045
00046
00047
00048
00049 def startcasapy(rmec,root=PARALLEL_CASAPY_ROOT_DIR):
00050 casalog.post("Starting remote casapys...",origin="PDeconv::startcasapy");
00051 rmec.execute("import sys");
00052 rmec.execute(rcmd("root","=",root));
00053 cmd="sys.path.append("+rcmd("root","+","/python/2.5/")+")";
00054 rmec.execute(cmd);
00055 cmd = "execfile("+rcmd("root","+","/python/2.5/casa_in_py.py")+")";
00056 rmec.execute(cmd);
00057 cmd = "import os,shutil";
00058 rmec.execute(cmd);
00059
00060
00061
00062
00063
00064 def mkContResImgWorkaround(rmec,nodes,iatool,
00065 rootdir=PARALLEL_BASE_ROOT_DIR,
00066 workdir=PARALLEL_WORK_DIR,
00067 imagebasename="ptest.cont.im.",
00068 extension=".residual",
00069 outimagename="dirtyavg.im"):
00070 n=len(nodes);
00071 imagenames=[];
00072 refval=[];
00073 for i in range(n):
00074 imagename=mkname(nodes[i],rootdir,workdir,imagebasename,extension);
00075 iatool.open(imagename);
00076 shp=iatool.shape();
00077 tt="tempimage."+str(nodes[i])+".cont"
00078 iatool.rebin(outfile=tt,bin=[1,1,1,shp[3]],overwrite=true);
00079 iatool.open(tt);
00080 cs=iatool.coordsys();
00081 refval.append(ia.coordsys().referencevalue(type='spectral')['numeric'][0]);
00082 iatool.close();
00083
00084 refval0=sum(refval)/len(refval);
00085 width=100*refval[len(refval)-1]-refval[0];
00086 tmpimagename=outimagename+".sum";
00087 cmd = "rm -rf "+tmpimagename;
00088 os.system(cmd);
00089 cmd = "cp -r tempimage."+str(nodes[0])+".cont "+outimagename;
00090 os.system(cmd);
00091 iatool.open(outimagename);
00092 d0=iatool.getchunk();
00093 d0=d0*0;
00094
00095 cs=iatool.coordsys();
00096 cs.setincrement(value=width,type='spectral');
00097 cs.setreferencepixel(value=0,type='spectral');
00098 cs.setreferencevalue(value=refval0,type='spectral');
00099 iatool.setcoordsys(cs.torecord());
00100 iatool.close()
00101
00102
00103 for i in range(n):
00104 tt="tempimage."+str(nodes[i])+".cont";
00105 iatool.open(tt);
00106 dd=ia.getchunk();
00107 d0=dd+d0;
00108 iatool.close();
00109 imagenames.append(tt);
00110 iatool.open(outimagename);
00111 iatool.putchunk(d0/n);
00112 iatool.close();
00113
00114
00115
00116
00117
00118
00119
00120 def mkContResImgNotInUse(rmec,nodes,iatool,
00121 rootdir=PARALLEL_BASE_ROOT_DIR,
00122 workdir=PARALLEL_WORK_DIR,
00123 imagebasename="ptest.cont.im.",
00124 extension=".residual",
00125 outimagename="dirtyavg.im"):
00126 n=len(nodes);
00127 imagenames=[];
00128 for i in range(n):
00129 imagename=mkname(nodes[i],rootdir,workdir,imagebasename,extension);
00130 imagenames.append(imagename);
00131 print imagenames;
00132 tmpimagename=outimagename+".sum";
00133 iatool.imageconcat(outfile=tmpimagename,infiles=imagenames,overwrite=true,relax=true);
00134 iatool.open(tmpimagename);
00135 shp=iatool.shape();
00136 print "Averaging ",shp[3]," channels of '",tmpimagename,"'";
00137 iatool.rebin(outfile=outimagename,bin=[1,1,1,shp[3]],overwrite=true);
00138 iatool.close();
00139
00140
00141
00142 def mksyscmdstr(cmd):
00143 return "os.system(\""+cmd+"\")";
00144
00145
00146
00147
00148
00149 def cpmodelimg(rmec,nodes,modelimage,
00150 rootdir=PARALLEL_BASE_ROOT_DIR,
00151 workdir=PARALLEL_WORK_DIR,
00152 imagebasename="ptest.model."):
00153 n=len(nodes);
00154 for i in range(n):
00155 toimagename=mkname(nodes[i],rootdir,workdir,imagebasename,".image");
00156 cmd = "shutil.copytree('"+modelimage+"','"+toimagename+"')";
00157 print cmd;
00158
00159
00160
00161
00162 def rmimage(mec,imagename,doit=false):
00163 cmd = "rm -rf "+imagename;
00164 cmd = "os.rmdir("+imagename+")";
00165
00166
00167
00168
00169
00170
00171
00172 def mkResOnly(rmec,nodes,
00173 configfile=PARALLEL_CONFIGFILES_DIR+"clean_mfs_major.last",
00174 rootdir=PARALLEL_BASE_ROOT_DIR,
00175 workdir=PARALLEL_WORK_DIR,
00176 msbasename="pevla2_1h.16.ms-",
00177 imagebasename="ptest.cont.im."):
00178
00179
00180 n=len(nodes);
00181
00182
00183
00184 setUpClean(rmec,1,0,nodes,configfile,rootdir,workdir,msbasename,imagebasename)
00185
00186
00187
00188 def setModelImgCoordsys(modelimage, sourceimage,iatool=ia):
00189 iatool.open(sourceimage);
00190 cs=iatool.coordsys();
00191 iatool.close();
00192 iatool.open(modelimage);
00193 iatool.setcoordsys(cs.torecord());
00194 iatool.close();
00195
00196
00197
00198
00199 def accumulateModel(iModel, modelimage,iatool=ia):
00200
00201
00202
00203 if (os.path.exists(modelimage)):
00204 iatool.open(iModel); d0=iatool.getchunk();
00205 iatool.open(modelimage); d1=iatool.getchunk();
00206 d1 = d1 + d0;
00207 iatool.putchunk(d1);
00208 iatool.close();
00209 else:
00210 cmd = "cp -r " + iModel + " " + modelimage;
00211 os.system(cmd);
00212
00213
00214
00215 cmd = "rm -rf " + iModel;
00216 os.system(cmd);
00217
00218
00219
00220 def setUpFlagger(rmec,nodes,mode="quack",
00221 configfile=PARALLEL_CONFIGFILES_DIR+"pquack.last",
00222 rootdir=PARALLEL_BASE_ROOT_DIR,
00223 workdir=PARALLEL_WORK_DIR,
00224 msbasename="pevla2_1h.ms-"):
00225
00226
00227
00228 rmec.execute(rcmd("pquackconfig","=", configfile));
00229 cmd="execfile(pquackconfig)";
00230 rmec.execute(cmd);
00231
00232
00233
00234 if (mode != ""):
00235 cmd="mode="+"'"+mode+"'";
00236 rmec.execute(cmd);
00237 for i in range(len(nodes)):
00238 msname=mkname(nodes[i],rootdir,workdir,msbasename,"");
00239 mec.execute(rcmd("vis","=",msname),i);
00240
00241
00242
00243 def setUpGaincal(rmec,nodes, caltable='s0_0.gcal',
00244 field='0', spw='0', solint='10s',
00245 configfile=PARALLEL_CONFIGFILES_DIR+"pgaincal.last",
00246 rootdir=PARALLEL_BASE_ROOT_DIR,
00247 workdir=PARALLEL_WORK_DIR,
00248 msbasename="pevla2_1h.ms-"):
00249
00250
00251
00252 rmec.execute(rcmd("pgaincalconfig","=",configfile));
00253 cmd="execfile(pgaincalconfig)";
00254 rmec.execute(cmd);
00255
00256
00257
00258 if (caltable != ""):
00259 rmec.execute(rcmd("caltable", "=", caltable));
00260 if (field != ""):
00261 rmec.execute(rcmd("field", "=", field));
00262 if (spw != ""):
00263 rmec.execute(rcmd("spw", "=", spw));
00264 if (solint != ""):
00265 rmec.execute(rcmd("solint", "=", solint));
00266
00267
00268
00269 for i in range(len(nodes)):
00270 msname=mkname(nodes[i],rootdir,workdir,msbasename,"");
00271 mec.execute(rcmd("vis","=",msname),i);
00272
00273
00274
00275 def setUpBandpass(rmec,nodes,caltable='BJones.tab', field='0',
00276 spw='0', solint='inf', gaintable='',
00277 interp=['nearest'],
00278 configfile=PARALLEL_CONFIGFILES_DIR+"pbandpass.last",
00279 rootdir=PARALLEL_BASE_ROOT_DIR,
00280 workdir=PARALLEL_WORK_DIR,
00281 msbasename="pevla2_1h.ms-"):
00282
00283
00284
00285 rmec.execute(rcmd("pbandpassconfig","=",configfile));
00286 cmd="execfile(pbandpassconfig)";
00287 rmec.execute(cmd);
00288
00289
00290
00291 if (caltable != ""):
00292 rmec.execute(rcmd("caltable", "=", caltable));
00293 if (field != ""):
00294 rmec.execute(rcmd("field", "=", field));
00295 if (spw != ""):
00296 rmec.execute(rcmd("spw", "=", spw));
00297 if (solint != ""):
00298 rmec.execute(rcmd("solint", "=", solint));
00299 if ( gaintable != ""):
00300 rmec.execute(rcmd("gaintable","=", gaintable));
00301
00302
00303
00304 for i in range(len(nodes)):
00305 msname=mkname(nodes[i],rootdir,workdir,msbasename,"");
00306 mec.execute(rcmd("vis","=",msname),i);
00307
00308
00309
00310 def setUpApplycal(rmec,field="", spw="", gaintable=[''],
00311 interp=[''],
00312 rootdir=PARALLEL_BASE_ROOT_DIR,
00313 workdir=PARALLEL_WORK_DIR,
00314 msbasename="pevla2_1h.ms-",
00315 imagebasename="ptest.im."):
00316
00317
00318
00319 rmec.execute(rcmd("papplycalconfig","=",configfile));
00320 cmd="execfile(papplycalconfig)";
00321 rmec.execute(cmd);
00322
00323
00324
00325 if (field != ""):
00326 rmec.execute(rcmd("field", "=", field));
00327 if (spw != ""):
00328 rmec.execute(rcmd("spw", "=", spw));
00329
00330
00331
00332
00333
00334
00335
00336 for i in range(len(nodes)):
00337 msname=mkname(nodes[i],rootdir,workdir,msbasename,"");
00338 mec.execute(rcmd("vis","=",msname),i);
00339
00340
00341
00342 def setUpClean(rmec,nchan,niter=1000, nodes=[1,2,3,4],
00343 configfile=PARALLEL_CONFIGFILES_DIR+"clean.last",
00344 rootdir=PARALLEL_BASE_ROOT_DIR,
00345 workdir=PARALLEL_WORK_DIR,
00346 msbasename="pevla2_1h.ms-",
00347 imagebasename="ptest.im."):
00348
00349
00350
00351 rmec.execute(rcmd("pcleanconfig","=",configfile));
00352 cmd="execfile(pcleanconfig)";
00353 rmec.execute(cmd);
00354
00355
00356
00357 if (nchan > 0):
00358 cmd="nchan="+str(nchan);
00359 rmec.execute(cmd);
00360 if (niter > -1):
00361 cmd="niter="+str(niter);
00362 rmec.execute(cmd);
00363 for i in range(len(nodes)):
00364 msname=mkname(nodes[i],rootdir,workdir,msbasename,"");
00365 imgname=mkname(nodes[i],rootdir,workdir,imagebasename,"");
00366
00367 mec.execute(rcmd("vis","=",msname),i);
00368 mec.execute(rcmd("imagename","=",imgname),i);
00369
00370
00371
00372 def pspectralline(rmec,nodes,nchan,niter=1000,
00373 msbasename="pevla2_1h.16.ms-",
00374 imagebasename="ptest.cont.im.",
00375 configfile=PARALLEL_CONFIGFILES_DIR+"clean.last",
00376 rootdir=PARALLEL_BASE_ROOT_DIR,
00377 workdir=PARALLEL_WORK_DIR):
00378 setUpClean(rmec,nchan,niter,nodes,configfile,
00379 rootdir,workdir,msbasename,
00380 imagebasename);
00381 cmd="go('clean')"
00382 mec.execute(cmd);
00383
00384
00385
00386 def pcontinuum(mec,iatool,deconvtool,nodes,
00387 minoriter=1000,minorpermajor=200,majoriter=5,
00388 majorconfigfile=PARALLEL_CONFIGFILES_DIR+"clean_mfs_major.last",
00389 minorconfigfile="",
00390 msbasename="pevla2_1h.16.ms-",
00391 imagebasename="ptest.cont.im.",
00392 rootdir=PARALLEL_BASE_ROOT_DIR,
00393 workdir=PARALLEL_WORK_DIR):
00394 n = len(nodes);
00395 dirtyimagename = "dirtyavg.im.sum";
00396 avgpsfname = "psfavg.im.sum";
00397 modelimagename = "avgcomp.im";
00398 tmpModelImageName = "tmp.avgcomp.im";
00399 minorIterRemaining = minoriter;
00400 minorIterPerMajor = minorpermajor;
00401 casalog.origin("PDeconv::pcontinuum");
00402 for major in range(majoriter):
00403
00404
00405
00406 casalog.post("====================================================================");
00407 mesg="Making residual images for major cycle no. " + str(major) + " (the parallel operation)";
00408 casalog.post(mesg);
00409 mkResOnly(mec,nodes,configfile=majorconfigfile,rootdir=rootdir,workdir=workdir,
00410 msbasename=msbasename,imagebasename=imagebasename);
00411 cmd="go('clean')"
00412 mec.execute(cmd);
00413
00414
00415
00416 casalog.post("Making continuum residual image (the gather operation)");
00417 casalog.post(" Making average residual image...");
00418 mkContResImgWorkaround(mec,nodes,iatool,imagebasename=imagebasename,
00419 outimagename=dirtyimagename);
00420
00421
00422
00423
00424 casalog.post(" Making average PSF image...");
00425 mkContResImgWorkaround(mec,nodes,iatool,imagebasename=imagebasename,
00426 extension=".psf",outimagename=avgpsfname);
00427
00428
00429
00430 casalog.post("Doing the minor cycle (the serial operation)");
00431 if (minorIterRemaining > minorIterPerMajor):
00432 minorIterRemaining = minorIterPerMajor;
00433 if (minorIterRemaining > 0):
00434 casalog.post("Doing "+str(minorIterRemaining)+ " minor cycle iterations...");
00435 deconvtool.open(dirtyimagename, avgpsfname);
00436 deconvtool.clarkclean(niter=minorIterRemaining,model=tmpModelImageName);
00437 accumulateModel(tmpModelImageName,modelimagename,iatool);
00438 minorIterRemaining = minoriter - (major+1)*minorIterPerMajor;
00439
00440
00441
00442 casalog.post("Scattering the model image");
00443 for i in range(n):
00444 remotemodelimage=mkname(nodes[i],rootdir,workdir,imagebasename,".model");
00445 cmd = "rm -rf " + remotemodelimage;
00446 os.system(cmd);
00447 cmd = "cp -r " + modelimagename + " " + remotemodelimage;
00448 os.system(cmd);
00449 remoteresidualimage=mkname(nodes[i],rootdir,workdir,imagebasename,".residual");
00450 setModelImgCoordsys(remotemodelimage, remoteresidualimage);
00451
00452
00453
00454
00455
00456
00457
00458
00459 def pcontinuumdebugger(mec,iatool,deconvtool,nodes,
00460 minorconfigfile,
00461 majorconfigfile,
00462 dowhat,
00463 rootdir=PARALLEL_BASE_ROOT_DIR,
00464 workdir=PARALLEL_WORK_DIR,
00465 msbasename="pevla2_1h.16.ms-",
00466 imagebasename="ptest.cont.im.",
00467 minoriter=100,majoriter=10):
00468 n = len(nodes);
00469 dirtyimagename="dirtyavg.im.sum";
00470 avgpsfname = "psfavg.im.sum";
00471 modelimagename="avgcomp.im";
00472 tmpModelImageName="tmp.avgcomp.im";
00473 minorIterRemaining=minoriter;
00474 minorIterPerMajor = 100;
00475
00476 major=0;
00477 if (dowhat == 1):
00478 print "Making residual images for major cycle no. ", major, "(the scatter operation)";
00479 mkResOnly(mec,nodes,configfile=majorconfigfile,rootdir=rootdir,workdir=workdir,
00480 msbasename=msbasename,imagebasename=imagebasename);
00481 cmd="go('clean')"
00482 mec.execute(cmd);
00483
00484 if (dowhat == 2):
00485 print "Making continuum dirt image (the gather operation)"
00486 print " Making average dirty image...";
00487 mkContResImgWorkaround(mec,nodes,iatool,imagebasename=imagebasename,
00488 outimagename=dirtyimagename);
00489 print " Making average PSF image...";
00490 mkContResImgWorkaround(mec,nodes,iatool,imagebasename=imagebasename,
00491 extension=".psf",outimagename=avgpsfname);
00492
00493 if (dowhat == 3):
00494 print minorIterRemaining, minorIterPerMajor;
00495 print "Doing the minor cycle (the serial operation)"
00496 if (minorIterRemaining > minorIterPerMajor):
00497 minorIterRemaining = minorIterPerMajor;
00498 if (minorIterRemaining > 0):
00499 print "Doing ", minorIterRemaining, " minor cycle iterations...";
00500 print dirtyimagename, avgpsfname;
00501 deconvtool.open(dirtyimagename, avgpsfname);
00502 deconvtool.clarkclean(niter=minorIterRemaining,model=tmpModelImageName);
00503 deconvtool.done();
00504 accumulateModel(tmpModelImageName, modelimagename,iatool);
00505 minorIterRemaining = minoriter - (major+1)*minorIterPerMajor;
00506
00507 if (dowhat == 4):
00508 print "Scattering the model image"
00509 for i in range(n):
00510 remotemodelimage=mkname(nodes[i],rootdir,workdir,imagebasename,".model");
00511 cmd = "rm -rf " + remotemodelimage;
00512 os.system(cmd);
00513 cmd = "cp -r " + modelimagename + " " + remotemodelimage;
00514 os.system(cmd);
00515 remoteresidualimage=mkname(nodes[i],rootdir,workdir,imagebasename,".residual");
00516 setModelImgCoordsys(remotemodelimage, remoteresidualimage);
00517
00518