casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables
cluster.py
Go to the documentation of this file.
00001 from IPython.kernel import client
00002 PARALLEL_CASAPY_ROOT_DIR = "/home/casa-dev-02/casa_ccore/linux_64b/";
00003 PARALLEL_CONFIGFILES_DIR = "/users/sbhatnag/Scripts/ClusterConfigFiles/";
00004 PARALLEL_WORK_DIR        = "/sanjay/PTEST/";
00005 PARALLEL_BASE_ROOT_DIR   = "/home/casa-dev-";
00006 #
00007 #===============================================================
00008 # Make up a name containting the node number info.
00009 # (This function has the policy used to identify pieces of the
00010 #  distributed database)
00011 #
00012 def mkname(node,rootdir,workdir,basename,ext,regex=false):
00013     format=str('%2.2d');
00014     nodenumber=(format%node);
00015     if regex:
00016         imgnodenumber="*";
00017     else:
00018         imgnodenumber=nodenumber;
00019     targetname=rootdir+nodenumber+workdir+basename+imgnodenumber+ext;
00020     return targetname;
00021 #
00022 #===============================================================
00023 # Return a string as <var>+<op>+"<value>".  Often required
00024 # functionality when set up remote variables.
00025 #
00026 def rcmd(var,op,value):
00027     tt=var+"=\""+value+"\"";
00028     tt=var+op+"\""+value+"\"";
00029     return tt;
00030 #
00031 #===============================================================
00032 # Set up and return the MultiEngineClient.  This is required
00033 # when doing any communication of the nodes.
00034 #
00035 def pinit(message="Hello CASA Cluster"):
00036     casalog.post("Setting up the connection to the remote nodes...",origin="PDeconv::pinit");
00037     mec = client.MultiEngineClient();
00038     ids=mec.get_ids();
00039     print "Connected to IDs ",ids;
00040 #        tt='print '+"'"+message+"'";
00041     mec.activate();
00042 #       mec.execute(tt);
00043     return mec;
00044 #
00045 #===============================================================
00046 # Start remote, non-interactive casapy
00047 #
00048 #def setpath(rmec,root='/home/casa-dev-01/hye/gnuactive/linux_64b'):
00049 def startcasapy(rmec,root=PARALLEL_CASAPY_ROOT_DIR):
00050     casalog.post("Starting remote casapys...",origin="PDeconv::startcasapy");
00051     rmec.execute("import sys");
00052     rmec.execute(rcmd("root","=",root));
00053     cmd="sys.path.append("+rcmd("root","+","/python/2.5/")+")";
00054     rmec.execute(cmd);
00055     cmd = "execfile("+rcmd("root","+","/python/2.5/casa_in_py.py")+")";
00056     rmec.execute(cmd);
00057     cmd = "import os,shutil";
00058     rmec.execute(cmd);
00059 #
00060 #===============================================================
00061 # Make a continuum image from the images on various nodes.
00062 # (This is a work around a bug in image analysis tool (CAS-1229))
00063 #
00064 def mkContResImgWorkaround(rmec,nodes,iatool,
00065                            rootdir=PARALLEL_BASE_ROOT_DIR,
00066                            workdir=PARALLEL_WORK_DIR,
00067                            imagebasename="ptest.cont.im.",
00068                            extension=".residual",
00069                            outimagename="dirtyavg.im"):
00070     n=len(nodes);
00071     imagenames=[];
00072     refval=[];
00073     for i in range(n):
00074         imagename=mkname(nodes[i],rootdir,workdir,imagebasename,extension);
00075         iatool.open(imagename);
00076         shp=iatool.shape();
00077         tt="tempimage."+str(nodes[i])+".cont"
00078         iatool.rebin(outfile=tt,bin=[1,1,1,shp[3]],overwrite=true);
00079         iatool.open(tt);
00080         cs=iatool.coordsys();
00081         refval.append(ia.coordsys().referencevalue(type='spectral')['numeric'][0]);
00082         iatool.close();
00083 #    print refval;
00084     refval0=sum(refval)/len(refval);
00085     width=100*refval[len(refval)-1]-refval[0];
00086     tmpimagename=outimagename+".sum";
00087     cmd = "rm -rf "+tmpimagename;
00088     os.system(cmd);
00089     cmd = "cp -r tempimage."+str(nodes[0])+".cont "+outimagename;
00090     os.system(cmd);
00091     iatool.open(outimagename);
00092     d0=iatool.getchunk();
00093     d0=d0*0;
00094 #    iatool.open(outimagename);
00095     cs=iatool.coordsys();
00096     cs.setincrement(value=width,type='spectral');
00097     cs.setreferencepixel(value=0,type='spectral');
00098     cs.setreferencevalue(value=refval0,type='spectral');
00099     iatool.setcoordsys(cs.torecord());
00100     iatool.close()
00101     
00102 #    print "Averaging...";
00103     for i in range(n):
00104         tt="tempimage."+str(nodes[i])+".cont";
00105         iatool.open(tt);
00106         dd=ia.getchunk();
00107         d0=dd+d0;
00108         iatool.close();
00109         imagenames.append(tt);
00110     iatool.open(outimagename);
00111     iatool.putchunk(d0/n);
00112     iatool.close();
00113 #
00114 #===============================================================
00115 # Gather operation
00116 # (keep this serial for now - but make this parallel (btree averager))
00117 # (This is not in use yet because of an bug in ia tool related to
00118 #  Tabulated Axis in the image)
00119 #
00120 def mkContResImgNotInUse(rmec,nodes,iatool,
00121                          rootdir=PARALLEL_BASE_ROOT_DIR,
00122                          workdir=PARALLEL_WORK_DIR,
00123                          imagebasename="ptest.cont.im.",
00124                          extension=".residual",
00125                          outimagename="dirtyavg.im"):
00126     n=len(nodes);
00127     imagenames=[];
00128     for i in range(n):
00129         imagename=mkname(nodes[i],rootdir,workdir,imagebasename,extension);
00130         imagenames.append(imagename);
00131     print imagenames;
00132     tmpimagename=outimagename+".sum";
00133     iatool.imageconcat(outfile=tmpimagename,infiles=imagenames,overwrite=true,relax=true);
00134     iatool.open(tmpimagename);
00135     shp=iatool.shape();
00136     print "Averaging ",shp[3]," channels of '",tmpimagename,"'";
00137     iatool.rebin(outfile=outimagename,bin=[1,1,1,shp[3]],overwrite=true);
00138     iatool.close();
00139 #
00140 #===============================================================
00141 #
00142 def mksyscmdstr(cmd):
00143     return "os.system(\""+cmd+"\")";
00144 #
00145 #===============================================================
00146 # Scatter operation
00147 # (keep this serial for now - but make this non-block/parallel)
00148 #
00149 def cpmodelimg(rmec,nodes,modelimage,
00150                rootdir=PARALLEL_BASE_ROOT_DIR,
00151                workdir=PARALLEL_WORK_DIR,
00152                imagebasename="ptest.model."):
00153     n=len(nodes);
00154     for i in range(n):
00155         toimagename=mkname(nodes[i],rootdir,workdir,imagebasename,".image");
00156         cmd = "shutil.copytree('"+modelimage+"','"+toimagename+"')";
00157         print cmd;
00158 #        mec.execute(cmd,i);
00159 #
00160 #===============================================================
00161 #
00162 def rmimage(mec,imagename,doit=false):
00163     cmd = "rm -rf "+imagename;
00164     cmd = "os.rmdir("+imagename+")";
00165 #    cmd = mksyscmdstr(cmd);
00166 #
00167 #===============================================================
00168 # Make only residual images using distributed database.
00169 # This is used as a the parallel-major cycle of a parallel
00170 # deconvolution run.
00171 #
00172 def mkResOnly(rmec,nodes,
00173               configfile=PARALLEL_CONFIGFILES_DIR+"clean_mfs_major.last",
00174               rootdir=PARALLEL_BASE_ROOT_DIR,
00175               workdir=PARALLEL_WORK_DIR,
00176               msbasename="pevla2_1h.16.ms-",
00177               imagebasename="ptest.cont.im."):
00178     # Just set up imager parameter to stop making PSFs
00179     # Fireup imager to do "makeimage("residual")
00180     n=len(nodes);
00181 #     for i in range(n):
00182 #         imgname=mkname(nodes[i],rootdir,workdir,imagebasename,".residual");
00183 #         rmimage(imgname);
00184     setUpClean(rmec,1,0,nodes,configfile,rootdir,workdir,msbasename,imagebasename)
00185 #
00186 #===============================================================
00187 #
00188 def setModelImgCoordsys(modelimage, sourceimage,iatool=ia):
00189     iatool.open(sourceimage);
00190     cs=iatool.coordsys();
00191     iatool.close();
00192     iatool.open(modelimage);
00193     iatool.setcoordsys(cs.torecord());
00194     iatool.close();
00195 #
00196 #===============================================================
00197 # Accumulate model image from each minor cycle iteration
00198 #
00199 def accumulateModel(iModel, modelimage,iatool=ia):
00200     #
00201     # Accumulate the model image by hand too!
00202     #
00203     if (os.path.exists(modelimage)):
00204         iatool.open(iModel);      d0=iatool.getchunk();
00205         iatool.open(modelimage);  d1=iatool.getchunk();
00206         d1 = d1 + d0;
00207         iatool.putchunk(d1);
00208         iatool.close();
00209     else:
00210         cmd = "cp -r " + iModel + " " + modelimage;
00211         os.system(cmd);
00212     #
00213     # Remove the incremental model
00214     #
00215     cmd = "rm -rf " + iModel;
00216     os.system(cmd);
00217 #
00218 #===============================================================
00219 #
00220 def setUpFlagger(rmec,nodes,mode="quack",
00221                  configfile=PARALLEL_CONFIGFILES_DIR+"pquack.last",
00222                  rootdir=PARALLEL_BASE_ROOT_DIR,
00223                  workdir=PARALLEL_WORK_DIR,
00224                  msbasename="pevla2_1h.ms-"):
00225     #
00226     # Load the defaults from a user defined configfile
00227     #
00228     rmec.execute(rcmd("pquackconfig","=", configfile));
00229     cmd="execfile(pquackconfig)";
00230     rmec.execute(cmd);
00231     #
00232     # Override the parameters if required
00233     #
00234     if (mode != ""):
00235         cmd="mode="+"'"+mode+"'";
00236         rmec.execute(cmd);
00237     for i in range(len(nodes)):
00238         msname=mkname(nodes[i],rootdir,workdir,msbasename,"");
00239         mec.execute(rcmd("vis","=",msname),i);
00240 #
00241 #===============================================================
00242 #
00243 def setUpGaincal(rmec,nodes, caltable='s0_0.gcal',
00244                  field='0', spw='0', solint='10s',
00245                  configfile=PARALLEL_CONFIGFILES_DIR+"pgaincal.last",
00246                  rootdir=PARALLEL_BASE_ROOT_DIR,
00247                  workdir=PARALLEL_WORK_DIR,
00248                  msbasename="pevla2_1h.ms-"):
00249     #
00250     # Load the defaults from a user defined configfile
00251     #
00252     rmec.execute(rcmd("pgaincalconfig","=",configfile));
00253     cmd="execfile(pgaincalconfig)";
00254     rmec.execute(cmd);
00255     #
00256     # Override the parameters if required
00257     #
00258     if (caltable != ""):
00259         rmec.execute(rcmd("caltable", "=", caltable));
00260     if (field != ""):
00261         rmec.execute(rcmd("field",    "=", field));
00262     if (spw != ""):
00263         rmec.execute(rcmd("spw",      "=", spw));
00264     if (solint != ""):
00265         rmec.execute(rcmd("solint",   "=", solint));
00266     #
00267     # This is the "distributed" parameter (i.e., different at each node)
00268     #
00269     for i in range(len(nodes)):
00270         msname=mkname(nodes[i],rootdir,workdir,msbasename,"");
00271         mec.execute(rcmd("vis","=",msname),i);
00272 #
00273 #===============================================================
00274 #
00275 def setUpBandpass(rmec,nodes,caltable='BJones.tab', field='0',
00276                   spw='0', solint='inf', gaintable='',
00277                   interp=['nearest'],
00278                   configfile=PARALLEL_CONFIGFILES_DIR+"pbandpass.last",
00279                   rootdir=PARALLEL_BASE_ROOT_DIR,
00280                   workdir=PARALLEL_WORK_DIR,
00281                   msbasename="pevla2_1h.ms-"):
00282     #
00283     # Load the defaults from a user defined configfile
00284     #
00285     rmec.execute(rcmd("pbandpassconfig","=",configfile));
00286     cmd="execfile(pbandpassconfig)";
00287     rmec.execute(cmd);
00288     #
00289     # Override the parameters if required
00290     #
00291     if (caltable != ""):
00292         rmec.execute(rcmd("caltable", "=", caltable));
00293     if (field != ""):
00294         rmec.execute(rcmd("field",    "=", field));
00295     if (spw != ""):
00296         rmec.execute(rcmd("spw",      "=", spw));
00297     if (solint != ""):
00298         rmec.execute(rcmd("solint",   "=", solint));
00299     if ( gaintable != ""):
00300         rmec.execute(rcmd("gaintable","=", gaintable));
00301     #
00302     # This is the "distributed" parameter (i.e., different at each node)
00303     #
00304     for i in range(len(nodes)):
00305         msname=mkname(nodes[i],rootdir,workdir,msbasename,"");
00306         mec.execute(rcmd("vis","=",msname),i);
00307 #
00308 #===============================================================
00309 #
00310 def setUpApplycal(rmec,field="", spw="", gaintable=[''],
00311                   interp=[''],
00312                   rootdir=PARALLEL_BASE_ROOT_DIR,
00313                   workdir=PARALLEL_WORK_DIR,
00314                   msbasename="pevla2_1h.ms-",
00315                   imagebasename="ptest.im."):
00316     #
00317     # Load the defaults from a user defined configfile
00318     #
00319     rmec.execute(rcmd("papplycalconfig","=",configfile));
00320     cmd="execfile(papplycalconfig)";
00321     rmec.execute(cmd);
00322     #
00323     # Override the parameters if required
00324     #
00325     if (field != ""):
00326         rmec.execute(rcmd("field", "=", field));
00327     if (spw != ""):
00328         rmec.execute(rcmd("spw", "=", spw));
00329     #
00330     # set gaintable - this is a list of strings
00331     # set interp    - this is a list of strings
00332     #
00333     #
00334     # This is the "distributed" parameter (i.e., different at each node)
00335     #
00336     for i in range(len(nodes)):
00337         msname=mkname(nodes[i],rootdir,workdir,msbasename,"");
00338         mec.execute(rcmd("vis","=",msname),i);
00339 #
00340 #===============================================================
00341 #
00342 def setUpClean(rmec,nchan,niter=1000, nodes=[1,2,3,4],
00343                configfile=PARALLEL_CONFIGFILES_DIR+"clean.last",
00344                rootdir=PARALLEL_BASE_ROOT_DIR,
00345                workdir=PARALLEL_WORK_DIR,
00346                msbasename="pevla2_1h.ms-",
00347                imagebasename="ptest.im."):
00348     #
00349     # Load the defaults from a user defined configfile
00350     #
00351     rmec.execute(rcmd("pcleanconfig","=",configfile));
00352     cmd="execfile(pcleanconfig)";
00353     rmec.execute(cmd);
00354     #
00355     # Override the parameters if required
00356     #
00357     if (nchan > 0):
00358         cmd="nchan="+str(nchan);
00359         rmec.execute(cmd);
00360     if (niter > -1):
00361         cmd="niter="+str(niter);
00362         rmec.execute(cmd);
00363     for i in range(len(nodes)):
00364         msname=mkname(nodes[i],rootdir,workdir,msbasename,"");
00365         imgname=mkname(nodes[i],rootdir,workdir,imagebasename,"");
00366 #         print "Setting vis=",msname;
00367         mec.execute(rcmd("vis","=",msname),i);
00368         mec.execute(rcmd("imagename","=",imgname),i);
00369 #
00370 #===============================================================
00371 # Do a spectral line deconvolution.
00372 def pspectralline(rmec,nodes,nchan,niter=1000,
00373                   msbasename="pevla2_1h.16.ms-",
00374                   imagebasename="ptest.cont.im.",
00375                   configfile=PARALLEL_CONFIGFILES_DIR+"clean.last",
00376                   rootdir=PARALLEL_BASE_ROOT_DIR,
00377                   workdir=PARALLEL_WORK_DIR):
00378     setUpClean(rmec,nchan,niter,nodes,configfile,
00379                rootdir,workdir,msbasename,
00380                imagebasename);
00381     cmd="go('clean')"
00382     mec.execute(cmd);
00383 #
00384 #===============================================================
00385 # Do a continuum deconvolution.
00386 def pcontinuum(mec,iatool,deconvtool,nodes,
00387                minoriter=1000,minorpermajor=200,majoriter=5,
00388                majorconfigfile=PARALLEL_CONFIGFILES_DIR+"clean_mfs_major.last",
00389                minorconfigfile="",
00390                msbasename="pevla2_1h.16.ms-",
00391                imagebasename="ptest.cont.im.",
00392                rootdir=PARALLEL_BASE_ROOT_DIR,
00393                workdir=PARALLEL_WORK_DIR):
00394     n = len(nodes);
00395     dirtyimagename     = "dirtyavg.im.sum";
00396     avgpsfname         = "psfavg.im.sum";
00397     modelimagename     = "avgcomp.im";
00398     tmpModelImageName  = "tmp.avgcomp.im";
00399     minorIterRemaining = minoriter;
00400     minorIterPerMajor  = minorpermajor;
00401     casalog.origin("PDeconv::pcontinuum");
00402     for major in range(majoriter):
00403         #
00404         #--------------------Parallel Major Cycle------------------
00405         #
00406         casalog.post("====================================================================");
00407         mesg="Making residual images for major cycle no. " + str(major) + " (the parallel operation)";
00408         casalog.post(mesg);
00409         mkResOnly(mec,nodes,configfile=majorconfigfile,rootdir=rootdir,workdir=workdir,
00410                   msbasename=msbasename,imagebasename=imagebasename);
00411         cmd="go('clean')"
00412         mec.execute(cmd);
00413         #
00414         #--------------------Gather residuals----------------------
00415         #
00416         casalog.post("Making continuum residual image (the gather operation)");
00417         casalog.post("   Making average residual image...");
00418         mkContResImgWorkaround(mec,nodes,iatool,imagebasename=imagebasename,
00419                                outimagename=dirtyimagename);
00420         #
00421         # The PSF needs to be computed only once...being lazy, for now I am
00422         # letting it compute in every major cycle.
00423         #
00424         casalog.post("   Making average PSF image...");
00425         mkContResImgWorkaround(mec,nodes,iatool,imagebasename=imagebasename,
00426                                extension=".psf",outimagename=avgpsfname);
00427         #
00428         #--------------------Serial minor cycle--------------------
00429         #
00430         casalog.post("Doing the minor cycle (the serial operation)");
00431         if (minorIterRemaining > minorIterPerMajor):
00432             minorIterRemaining = minorIterPerMajor;
00433         if (minorIterRemaining > 0):
00434             casalog.post("Doing "+str(minorIterRemaining)+ " minor cycle iterations...");
00435             deconvtool.open(dirtyimagename, avgpsfname);
00436             deconvtool.clarkclean(niter=minorIterRemaining,model=tmpModelImageName);
00437             accumulateModel(tmpModelImageName,modelimagename,iatool);
00438         minorIterRemaining = minoriter - (major+1)*minorIterPerMajor;
00439         #
00440         #--------------------Scatter/update model------------------
00441         #
00442         casalog.post("Scattering the model image");
00443         for i in range(n):
00444             remotemodelimage=mkname(nodes[i],rootdir,workdir,imagebasename,".model");
00445             cmd = "rm -rf " + remotemodelimage;
00446             os.system(cmd);
00447             cmd = "cp -r " + modelimagename + " " + remotemodelimage;
00448             os.system(cmd);
00449             remoteresidualimage=mkname(nodes[i],rootdir,workdir,imagebasename,".residual");
00450             setModelImgCoordsys(remotemodelimage, remoteresidualimage);
00451 #         casalog.post("Synchornizing by sleeping for 10s....zzz.z.z...zzz....");
00452 #         os.system("sleep 10");
00453 #
00454 #===============================================================
00455 # A routine used for debugging continuum deconvolution.
00456 # The "dowhat" variable allows doing each step of the major-minor
00457 # cycle iteration at a time.
00458 #
00459 def pcontinuumdebugger(mec,iatool,deconvtool,nodes,
00460                        minorconfigfile,
00461                        majorconfigfile,
00462                        dowhat,
00463                        rootdir=PARALLEL_BASE_ROOT_DIR,
00464                        workdir=PARALLEL_WORK_DIR,
00465                        msbasename="pevla2_1h.16.ms-",
00466                        imagebasename="ptest.cont.im.",
00467                        minoriter=100,majoriter=10):
00468     n = len(nodes);
00469     dirtyimagename="dirtyavg.im.sum";
00470     avgpsfname = "psfavg.im.sum";
00471     modelimagename="avgcomp.im";
00472     tmpModelImageName="tmp.avgcomp.im";
00473     minorIterRemaining=minoriter;
00474     minorIterPerMajor = 100;
00475     
00476     major=0;
00477     if (dowhat == 1):
00478         print "Making residual images for major cycle no. ", major, "(the scatter operation)";
00479         mkResOnly(mec,nodes,configfile=majorconfigfile,rootdir=rootdir,workdir=workdir,
00480                   msbasename=msbasename,imagebasename=imagebasename);
00481         cmd="go('clean')"
00482         mec.execute(cmd);
00483 
00484     if (dowhat == 2):
00485         print "Making continuum dirt image (the gather operation)"
00486         print "   Making average dirty image...";
00487         mkContResImgWorkaround(mec,nodes,iatool,imagebasename=imagebasename,
00488                                outimagename=dirtyimagename);
00489         print "   Making average PSF image...";
00490         mkContResImgWorkaround(mec,nodes,iatool,imagebasename=imagebasename,
00491                                extension=".psf",outimagename=avgpsfname);
00492 
00493     if (dowhat == 3):
00494         print minorIterRemaining, minorIterPerMajor;
00495         print "Doing the minor cycle (the serial operation)"
00496         if (minorIterRemaining > minorIterPerMajor):
00497             minorIterRemaining = minorIterPerMajor;
00498         if (minorIterRemaining > 0):
00499             print "Doing ", minorIterRemaining, " minor cycle iterations...";
00500             print dirtyimagename, avgpsfname;
00501             deconvtool.open(dirtyimagename, avgpsfname);
00502             deconvtool.clarkclean(niter=minorIterRemaining,model=tmpModelImageName);
00503             deconvtool.done();
00504             accumulateModel(tmpModelImageName, modelimagename,iatool);
00505         minorIterRemaining = minoriter - (major+1)*minorIterPerMajor;
00506 
00507     if (dowhat == 4):
00508         print "Scattering the model image"
00509         for i in range(n):
00510             remotemodelimage=mkname(nodes[i],rootdir,workdir,imagebasename,".model");
00511             cmd = "rm -rf " + remotemodelimage;
00512             os.system(cmd);
00513             cmd = "cp -r " + modelimagename + " " + remotemodelimage;
00514             os.system(cmd);
00515             remoteresidualimage=mkname(nodes[i],rootdir,workdir,imagebasename,".residual");
00516             setModelImgCoordsys(remotemodelimage, remoteresidualimage);
00517 #         print "Synchornizing by sleeping for 10s....zzz.z.z...zzz....";
00518 #         os.system("sleep 10");