casa
$Rev:20696$
|
00001 import os 00002 import re 00003 import shutil 00004 import time 00005 import tempfile 00006 00007 # shutil.copytree is useless with directories created by tempfile 00008 # (or any directories that already exist). 00009 from distutils.dir_util import copy_tree 00010 00011 from taskinit import * 00012 from update_spw import * 00013 00014 from parallel.parallel_task_helper import ParallelTaskHelper 00015 from virtualconcat_cli import virtualconcat_cli as virtualconcat 00016 00017 mycb, myms, mytb = gentools(['cb', 'ms', 'tb']) 00018 00019 def uvcontsub(vis, field, fitspw, combine, solint, fitorder, spw, want_cont): 00020 00021 if ParallelTaskHelper.isParallelMS(vis): 00022 helper = ParallelTaskHelper('uvcontsub', locals()) 00023 helper._consolidateOutput = False 00024 retVar = helper.go() 00025 00026 # Gather the list of continuum subtraction-SubMSs 00027 cont_subMS_list = [] 00028 contsub_subMS_list = [] 00029 for subMS in retVar: 00030 if retVar[subMS]: 00031 cont_subMS_list.append(subMS + ".cont") 00032 contsub_subMS_list.append(subMS + ".contsub") 00033 00034 # We have to sort the list because otherwise it 00035 # depends on the time the engines dispatches their sub-MSs 00036 cont_subMS_list.sort() 00037 contsub_subMS_list.sort() 00038 00039 # deal with the pointing table 00040 auxfile = "uvcontsub_aux2_"+str(time.time()) 00041 pnrows = 0 00042 try: 00043 mytb.open(vis+'/POINTING') 00044 pnrows = mytb.nrows() 00045 mytb.close() 00046 if(pnrows>0): 00047 shutil.copytree(os.path.realpath(vis+'/POINTING'), auxfile) 00048 except Exception, instance: 00049 casalog.post("Error handling POINTING table %s: %s" % 00050 (vis+'/POINTING',str(instance)),'SEVERE') 00051 00052 if want_cont: 00053 try: 00054 virtualconcat(concatvis=helper._arg['vis'] + ".cont",vis=cont_subMS_list, 00055 copypointing=False) 00056 except Exception, instance: 00057 casalog.post("Error concatenating continuum sub-MSs %s: %s" % 00058 (str(cont_subMS_list),str(instance)),'SEVERE') 00059 try: 00060 virtualconcat(concatvis=helper._arg['vis'] + ".contsub",vis=contsub_subMS_list, 00061 copypointing=False) 00062 except Exception, instance: 00063 casalog.post("Error concatenating continuum-subtracted sub-MSs %s: %s" % 00064 (str(contsub_subMS_list),str(instance)),'SEVERE') 00065 00066 # Remove continuum subtraction-SubMSs 00067 if want_cont: 00068 for subMS in cont_subMS_list: 00069 if (os.system("rm -rf " + subMS ) != 0): 00070 casalog.post("Problem removing continuum sub-MS %s into working directory" % (subMS),'SEVERE') 00071 for subMS in contsub_subMS_list: 00072 if (os.system("rm -rf " + subMS ) != 0): 00073 casalog.post("Problem removing continuum-subtracted sub-MS %s into working directory" % (subMS),'SEVERE') 00074 00075 if(pnrows>0): # put back the POINTING table 00076 casalog.post("Restoring the POINTING table ...",'NORMAL') 00077 try: 00078 if want_cont: 00079 theptab = os.path.realpath(helper._arg['vis'] + ".cont/POINTING") 00080 os.system('rm -rf '+theptab) 00081 shutil.copytree(auxfile, theptab) 00082 theptab = os.path.realpath(helper._arg['vis'] + ".contsub/POINTING") 00083 os.system('rm -rf '+theptab) 00084 os.system('mv '+auxfile+' '+theptab) 00085 except Exception, instance: 00086 casalog.post("Error restoring pointing table from %s: %s" % 00087 (auxfile,str(instance)),'SEVERE') 00088 00089 # Restore origin (otherwise gcwrap shows virtualconcat) 00090 casalog.origin('uvcontsub') 00091 00092 return True 00093 00094 # Run normal code 00095 try: 00096 casalog.origin('uvcontsub') 00097 00098 # Get these checks done and out of the way. 00099 # This one is redundant - it is already checked at the XML level. 00100 if not os.path.isdir(vis): 00101 raise Exception, 'Visibility data set not found - please verify the name' 00102 00103 00104 mytb.open(vis + '/SPECTRAL_WINDOW') 00105 allspw = '0~' + str(mytb.nrows() - 1) 00106 mytb.close() 00107 if 'spw' not in combine: 00108 spwmfitspw = subtract_spws(spw, fitspw) 00109 if spwmfitspw == 'UNKNOWN': 00110 spwmfitspw = subtract_spws(allspw, fitspw) 00111 if spwmfitspw: 00112 raise Exception, "combine must include 'spw' when the fit is being applied to spws outside fitspw." 00113 00114 # cb will put the continuum-subtracted data in CORRECTED_DATA, so 00115 # protect vis by splitting out its relevant parts to a working copy. 00116 csvis = vis.rstrip('/') + '.contsub' 00117 00118 # The working copy will need all of the channels in fitspw + spw, so we 00119 # may or may not need to filter it down to spw at the end. 00120 myfitspw = fitspw 00121 myspw = spw 00122 00123 # We only need the spws in the union of fitspw and spw, but keep all 00124 # the channels or the weights (as used by calibrater) will be messed up. 00125 tempspw = re.sub(r':[^,]+', '', join_spws(fitspw, spw)) 00126 if tempspw == allspw: 00127 tempspw = '' 00128 if tempspw: 00129 # The split will reindex the spws. Update spw and fitspw. 00130 # Do fitspw first because the spws in spw are supposed to be 00131 # a subset of the ones in fitspw. 00132 casalog.post('split is being run internally, and the selected spws') 00133 casalog.post('will be renumbered to start from 0 in the output!') 00134 00135 # Now get myfitspw. 00136 myfitspw = update_spwchan(vis, tempspw, fitspw) 00137 myspw = update_spwchan(vis, tempspw, spw) 00138 00139 final_csvis = csvis 00140 workingdir = os.path.abspath(os.path.dirname(vis.rstrip('/'))) 00141 csvis = tempfile.mkdtemp(prefix=csvis.split('/')[-1], dir=workingdir) 00142 00143 # ms does not have a colnames method, so open vis with tb even though 00144 # it is already open with myms. Note that both use nomodify=True, 00145 # however, and no problem was revealed in testing. 00146 mytb.open(vis, nomodify=True) 00147 if 'CORRECTED_DATA' in mytb.colnames(): 00148 whichcol = 'CORRECTED_DATA' 00149 else: 00150 # DON'T remind the user that split before uvcontsub wastes time - 00151 # scratch columns will eventually go away. 00152 whichcol = 'DATA' 00153 mytb.close() 00154 00155 casalog.post('Preparing to add scratch columns.') 00156 myfield = field 00157 if field == '': 00158 myfield = '*' 00159 if myfield != '*' and set(ms.msseltoindex(vis, 00160 field=myfield)['field']) == set(ms.msseltoindex(vis, 00161 field='*')['field']): 00162 myfield = '*' 00163 if whichcol != 'DATA' or tempspw != '' or myfield != '*': 00164 casalog.post('splitting to ' + csvis + ' with spw="' 00165 + tempspw + '"') 00166 myms.open(vis, nomodify=True) 00167 split_result = myms.split(csvis, spw=tempspw, field=myfield, whichcol=whichcol) 00168 myms.close() 00169 # jagonzal (CAS-4113): Take care of the trivial parallelization 00170 if not split_result: 00171 casalog.post("NullSelection: %s does not have data for spw=%s, field=%s, bailing out!" % (vis,tempspw,field),'SEVERE') 00172 shutil.rmtree(csvis) 00173 return False 00174 else: 00175 # This takes almost 30s/GB. (lustre, 8/2011) 00176 casalog.post('Copying ' + vis + ' to ' + csvis + ' with cp.') 00177 copy_tree(vis, csvis) 00178 00179 # It is less confusing if we write the history now that the "root" MS 00180 # is made, but before cb adds its messages. 00181 # 00182 # Not a dict, because we want to maintain the order. 00183 param_names = uvcontsub.func_code.co_varnames[:uvcontsub.func_code.co_argcount] 00184 param_vals = [eval(p) for p in param_names] 00185 00186 write_history(myms, csvis, 'uvcontsub', param_names, param_vals, 00187 casalog) 00188 00189 if (type(csvis) == str) and os.path.isdir(csvis): 00190 mycb.open(csvis) 00191 else: 00192 raise Exception, 'Visibility data set not found - please verify the name' 00193 00194 # select the data for continuum subtraction 00195 mycb.reset() 00196 mycb.selectvis(spw=myfitspw) 00197 00198 # Set up the solve 00199 amuellertab = tempfile.mkdtemp(prefix='Temp_contsub.tab', 00200 dir=workingdir) 00201 00202 mycb.setsolve(type='A', t=solint, table=amuellertab, combine=combine, 00203 fitorder=fitorder) 00204 00205 # solve for the continuum 00206 mycb.solve() 00207 00208 # subtract the continuum 00209 mycb.selectvis(spw=myspw) 00210 aspwmap=-1 00211 # if we combined on spw in solve, fanout the result with spwmap=-999; 00212 if 'spw' in combine: 00213 aspwmap = -999 00214 mycb.setapply(table=amuellertab, spwmap=aspwmap) 00215 00216 # Generate CORRECTED_DATA without continuum 00217 mycb.correct() 00218 00219 if want_cont: 00220 # Generate MODEL_DATA with only the continuum model 00221 mycb.corrupt() 00222 00223 mycb.close() 00224 00225 # Delete the temporary caltable 00226 shutil.rmtree(amuellertab) 00227 00228 # Move the line data from CORRECTED_DATA to DATA, and do any 00229 # final filtering by spw. 00230 myms.open(csvis) 00231 # Using ^ in spw is untested here! 00232 myms.split(final_csvis, spw=myspw, whichcol='corrected') 00233 myms.close() 00234 00235 #casalog.post("\"want_cont\" = \"%s\"" % want_cont, 'WARN') 00236 #casalog.post("\"csvis\" = \"%s\"" % csvis, 'WARN') 00237 if want_cont: 00238 myms.open(csvis) 00239 # No channel averaging (== skipping for fitorder == 0) is done 00240 # here, but it would be a reasonable option. The user can always 00241 # do it by running split again. 00242 myms.split(final_csvis[:-3], # .contsub -> .cont 00243 whichcol='MODEL_DATA', 00244 spw=myspw) 00245 myms.close() 00246 00247 #casalog.post("rming \"%s\"" % csvis, 'WARN') 00248 shutil.rmtree(csvis) 00249 00250 # jagonzal (CAS-4113): We have to return a boolean so that we can identify 00251 # the sub-MS that produce a continuum sub-MS to concatenate at the MMS level 00252 return True 00253 00254 except Exception, instance: 00255 casalog.post('Error in uvcontsub: ' + str(instance), 'SEVERE') 00256 mycb.close() # Harmless if cb is closed. 00257 raise Exception