casa
$Rev:20696$
|
00001 import os, re 00002 import string 00003 import time 00004 import shutil 00005 from taskinit import casalog, mstool, qa, tbtool, write_history 00006 from update_spw import update_spwchan 00007 from parallel.parallel_task_helper import ParallelTaskHelper 00008 import partitionhelper as ph 00009 00010 def split(vis, outputvis, datacolumn, field, spw, width, antenna, 00011 timebin, timerange, scan, intent, array, uvrange, 00012 correlation, observation, combine, keepflags, keepmms): 00013 """Create a visibility subset from an existing visibility set: 00014 00015 Keyword arguments: 00016 vis -- Name of input visibility file (MS) 00017 default: none; example: vis='ngc5921.ms' 00018 outputvis -- Name of output visibility file (MS) 00019 default: none; example: outputvis='ngc5921_src.ms' 00020 datacolumn -- Which data column to split out 00021 default='corrected'; example: datacolumn='data' 00022 Options: 'data', 'corrected', 'model', 'all', 00023 'float_data', 'lag_data', 'float_data,data', and 00024 'lag_data,data'. 00025 note: 'all' = whichever of the above that are present. 00026 field -- Field name 00027 default: field = '' means use all sources 00028 field = 1 # will get field_id=1 (if you give it an 00029 integer, it will retrieve the source with that index) 00030 field = '1328+307' specifies source '1328+307'. 00031 Minimum match can be used, egs field = '13*' will 00032 retrieve '1328+307' if it is unique or exists. 00033 Source names with imbedded blanks cannot be included. 00034 spw -- Spectral window index identifier 00035 default=-1 (all); example: spw=1 00036 antenna -- antenna names 00037 default '' (all), 00038 antenna = '3 & 7' gives one baseline with antennaid = 3,7. 00039 timebin -- Interval width for time averaging. 00040 default: '0s' or '-1s' (no averaging) 00041 example: timebin='30s' 00042 timerange -- Time range 00043 default='' means all times. examples: 00044 timerange = 'YYYY/MM/DD/hh:mm:ss~YYYY/MM/DD/hh:mm:ss' 00045 timerange='< YYYY/MM/DD/HH:MM:SS.sss' 00046 timerange='> YYYY/MM/DD/HH:MM:SS.sss' 00047 timerange='< ddd/HH:MM:SS.sss' 00048 timerange='> ddd/HH:MM:SS.sss' 00049 scan -- Scan numbers to select. 00050 default '' (all). 00051 intent -- Scan intents to select. 00052 default '' (all). 00053 array -- (Sub)array IDs to select. 00054 default '' (all). 00055 uvrange -- uv distance range to select. 00056 default '' (all). 00057 correlation -- Select correlations, e.g. 'rr, ll' or ['XY', 'YX']. 00058 default '' (all). 00059 observation -- Select by observation ID(s). 00060 default '' (all). 00061 combine -- Data descriptors that time averaging can ignore: 00062 scan, and/or state 00063 Default '' (none) 00064 keepflags -- Keep flagged data, if possible 00065 Default True 00066 00067 keepmms -- If the input is a multi-MS, make the output one, too. (experimental) 00068 Default: False 00069 00070 """ 00071 00072 casalog.origin('split') 00073 mylocals = locals() 00074 rval = True 00075 try: 00076 00077 if (keepmms and ParallelTaskHelper.isParallelMS(vis)): 00078 if (timebin!='0s' and timebin!='-1s'): 00079 casalog.post('Averaging over time with keepmms=True may lead to results different\n' 00080 +' from those obtained with keepmms=False due to different binning.', 'WARN') 00081 00082 myms = mstool() 00083 myms.open(vis) 00084 mses = myms.getreferencedtables() 00085 myms.close() 00086 mses.sort() 00087 00088 nfail = 0 00089 if os.path.exists(outputvis): 00090 raise ValueError, "Output MS %s already exists - will not overwrite." % outputvis 00091 tempout = outputvis+str(time.time()) 00092 os.mkdir(tempout) 00093 successfulmses = [] 00094 mastersubms = '' 00095 masterptab = '' 00096 emptyptab = tempout+'/EMPTY_POINTING' 00097 nochangeinpointing = (str(antenna)+str(timerange)=='') 00098 00099 if nochangeinpointing: 00100 # resulting pointing table is the same for all 00101 # -> replace by empty table if it is a link and won't be modified anyway 00102 # and put back original into the master after split 00103 00104 # find the master 00105 for m in mses: 00106 theptab = m+'/POINTING' 00107 if not os.path.islink(theptab): 00108 #print "is master ", theptab 00109 mastersubms = m 00110 masterptab = m+'/POINTING' 00111 # save time by not copying the POINTING table len(mses) times 00112 myttb = tbtool() 00113 myttb.open(masterptab) 00114 tmpp = myttb.copy(newtablename=emptyptab, norows=True) 00115 myttb.close() 00116 del myttb 00117 tmpp.close() 00118 del tmpp 00119 break 00120 00121 mytb = tbtool() 00122 00123 # prepare the input MMS for processing 00124 replaced = [] 00125 outputviss = [] 00126 theptabs = [] 00127 00128 for m in mses: 00129 00130 # make sure the SORTED_TABLE keywords are disabled 00131 mytb.open(m, nomodify=False) 00132 if 'SORTED_TABLE' in mytb.keywordnames(): 00133 tobedel = mytb.getkeyword('SORTED_TABLE').split(' ')[1] 00134 mytb.removekeyword('SORTED_TABLE') 00135 os.system('rm -rf '+tobedel) 00136 mytb.close() 00137 00138 # deal with the POINTING table 00139 theptab = m+'/POINTING' 00140 theptabs.append(theptab) 00141 00142 if nochangeinpointing and os.path.islink(theptab): 00143 #print "is link ", theptab 00144 os.remove(theptab) 00145 shutil.copytree(emptyptab, theptab) 00146 replaced.append(True) 00147 else: 00148 replaced.append(False) 00149 00150 # run split 00151 outputviss.append(os.path.abspath(tempout+'/'+os.path.basename(m))) 00152 # end for 00153 00154 # send off the jobs 00155 print 'Running split_core ... ' 00156 helper = ParallelTaskHelper('split', mylocals) 00157 helper.override_arg('outputvis',outputviss) 00158 helper._consolidateOutput = False 00159 goretval = helper.go() 00160 00161 for i in xrange(len(mses)): 00162 m = mses[i] 00163 00164 # deal with the POINTING table 00165 if replaced[i]: 00166 # restore link 00167 shutil.rmtree(theptabs[i], ignore_errors=True) 00168 os.symlink('../'+os.path.basename(mastersubms)+'/POINTING', theptabs[i]) 00169 # (link in target will be created my makeMMS) 00170 00171 # accumulate list of successful splits 00172 if not goretval[m]: 00173 nfail+=1 00174 else: 00175 successfulmses.append(outputviss[i]) 00176 00177 if nfail>0: # there were unsuccessful splits 00178 if len(successfulmses)==0: 00179 casalog.post('Split failed in all subMSs.', 'WARN') 00180 rval=False 00181 else: 00182 casalog.post('*** Summary: there were failures in '+str(nfail)+' SUBMSs:', 'WARN') 00183 casalog.post('*** (these are harmless if they are caused by selection):', 'WARN') 00184 for m in mses: 00185 if not goretval[m]: 00186 casalog.post(os.path.basename(m)+': '+str(goretval[m]), 'WARN') 00187 else: 00188 casalog.post(os.path.basename(m)+': '+str(goretval[m]), 'NORMAL') 00189 00190 casalog.post('Will construct MMS from subMSs with successful selection ...', 'NORMAL') 00191 00192 if nochangeinpointing: # need to take care of POINTING table 00193 # in case the master subms did not make it 00194 if not (tempout+'/'+os.path.basename(mastersubms) in successfulmses): 00195 # old master subms was not selected. 00196 # copy the original masterptab into the new master 00197 shutil.rmtree(successfulmses[0]+'/POINTING') 00198 shutil.copytree(masterptab, successfulmses[0]+'/POINTING') 00199 00200 if rval: # construct new MMS from the output 00201 if(width==1 and str(field)+str(spw)+str(antenna)+str(timerange)+str(scan)+str(intent)\ 00202 +str(array)+str(uvrange)+str(correlation)+str(observation)==''): 00203 ph.makeMMS(outputvis, successfulmses) 00204 else: 00205 myms.open(successfulmses[0], nomodify=False) 00206 auxfile = "split_aux_"+str(time.time()) 00207 for i in xrange(1,len(successfulmses)): 00208 myms.virtconcatenate(successfulmses[i], auxfile, '1Hz', '10mas', True) 00209 myms.close() 00210 os.remove(auxfile) 00211 ph.makeMMS(outputvis, successfulmses, True, ['POINTING']) 00212 00213 00214 shutil.rmtree(tempout, ignore_errors=True) 00215 00216 00217 00218 else: # do not output an MMS 00219 00220 rval = split_core(vis, outputvis, datacolumn, field, spw, width, antenna, 00221 timebin, timerange, scan, intent, array, uvrange, 00222 correlation, observation, combine, keepflags) 00223 00224 except Exception, instance: 00225 casalog.post("*** Error: %s" % (instance), 'SEVERE') 00226 rval = False 00227 00228 00229 return rval 00230 00231 def split_core(vis, outputvis, datacolumn, field, spw, width, antenna, 00232 timebin, timerange, scan, intent, array, uvrange, 00233 correlation, observation, combine, keepflags): 00234 00235 retval = True 00236 00237 if not outputvis or outputvis.isspace(): 00238 raise ValueError, 'Please specify outputvis' 00239 00240 myms = mstool() 00241 mytb = None 00242 if ((type(vis)==str) & (os.path.exists(vis))): 00243 myms.open(vis, nomodify=True) 00244 else: 00245 raise ValueError, 'Visibility data set not found - please verify the name' 00246 if os.path.exists(outputvis): 00247 myms.close() 00248 raise ValueError, "Output MS %s already exists - will not overwrite." % outputvis 00249 00250 # No longer needed. When did it get put in? Note that the default 00251 # spw='*' in myms.split ends up as '' since the default type for a variant 00252 # is BOOLVEC. (Of course!) Therefore both split and myms.split must 00253 # work properly when spw=''. 00254 #if(spw == ''): 00255 # spw = '*' 00256 00257 if(type(antenna) == list): 00258 antenna = ', '.join([str(ant) for ant in antenna]) 00259 00260 ## Accept digits without units ...assume seconds 00261 timebin = qa.convert(qa.quantity(timebin), 's')['value'] 00262 timebin = str(timebin) + 's' 00263 00264 if timebin == '0s': 00265 timebin = '-1s' 00266 00267 # MSStateGram is picky ('CALIBRATE_WVR.REFERENCE, OBSERVE_TARGET_ON_SOURCE' 00268 # doesn't work, but 'CALIBRATE_WVR.REFERENCE,OBSERVE_TARGET_ON_SOURCE' 00269 # does), and I don't want to mess with bison now. A .upper() might be a 00270 # good idea too, but the MS def'n v.2 does not say whether OBS_MODE should 00271 # be case-insensitive. 00272 intent = intent.replace(', ', ',') 00273 00274 if '^' in spw: 00275 casalog.post("The interpretation of ^n in split's spw strings has changed from 'average n' to 'skip n' channels!", 'WARN') 00276 casalog.post("Watch for Slicer errors", 'WARN') 00277 00278 if type(width) == str: 00279 try: 00280 if(width.isdigit()): 00281 width=[string.atoi(width)] 00282 elif(width.count('[') == 1 and width.count(']') == 1): 00283 width = width.replace('[', '') 00284 width = width.replace(']', '') 00285 splitwidth = width.split(',') 00286 width = [] 00287 for ws in splitwidth: 00288 if(ws.isdigit()): 00289 width.append(string.atoi(ws)) 00290 else: 00291 width = [1] 00292 except: 00293 raise TypeError, 'parameter width is invalid...using 1' 00294 00295 if type(correlation) == list: 00296 correlation = ', '.join(correlation) 00297 correlation = correlation.upper() 00298 00299 if hasattr(combine, '__iter__'): 00300 combine = ', '.join(combine) 00301 00302 if type(spw) == list: 00303 spw = ','.join([str(s) for s in spw]) 00304 elif type(spw) == int: 00305 spw = str(spw) 00306 do_chan_mod = spw.find('^') > -1 # '0:2~11^1' would be pointless. 00307 if not do_chan_mod: # ...look in width. 00308 if type(width) == int and width > 1: 00309 do_chan_mod = True 00310 elif hasattr(width, '__iter__'): 00311 for w in width: 00312 if w > 1: 00313 do_chan_mod = True 00314 break 00315 00316 do_both_chan_and_time_mod = (do_chan_mod and 00317 string.atof(timebin[:-1]) > 0.0) 00318 if do_both_chan_and_time_mod: 00319 # Do channel averaging first because it might be included in the spw 00320 # string. 00321 import tempfile 00322 # We want the directory outputvis is in, not /tmp, because /tmp 00323 # might not have enough space. 00324 # outputvis is itself a directory, so strip off a trailing slash if 00325 # it is present. 00326 # I don't know if giving tempfile an absolute directory is necessary - 00327 # dir='' is effectively '.' in Ubuntu. 00328 workingdir = os.path.abspath(os.path.dirname(outputvis.rstrip('/'))) 00329 cavms = tempfile.mkdtemp(suffix=outputvis, dir=workingdir) 00330 00331 casalog.post('Channel averaging to ' + cavms) 00332 if not myms.split(outputms=cavms, field=field, 00333 spw=spw, step=width, 00334 baseline=antenna, subarray=array, 00335 timebin='', time=timerange, 00336 whichcol=datacolumn, 00337 scan=scan, uvrange=uvrange, 00338 combine=combine, 00339 correlation=correlation, intent=intent, 00340 obs=str(observation)): 00341 myms.close() 00342 if os.path.isdir(cavms): 00343 import shutil 00344 shutil.rmtree(cavms) 00345 return False 00346 00347 # The selection was already made, so blank them before time averaging. 00348 field = '' 00349 spw = '' 00350 width = [1] 00351 antenna = '' 00352 array = '' 00353 timerange = '' 00354 datacolumn = 'all' 00355 scan = '' 00356 intent = '' 00357 uvrange = '' 00358 observation = '' 00359 00360 myms.close() 00361 myms.open(cavms) 00362 casalog.post('Starting time averaging') 00363 00364 if keepflags: 00365 taqlstr = '' 00366 else: 00367 taqlstr = 'NOT (FLAG_ROW OR ALL(FLAG))' 00368 00369 if not myms.split(outputms=outputvis, field=field, 00370 spw=spw, step=width, 00371 baseline=antenna, subarray=array, 00372 timebin=timebin, time=timerange, 00373 whichcol=datacolumn, 00374 scan=scan, uvrange=uvrange, 00375 combine=combine, 00376 correlation=correlation, 00377 taql=taqlstr, intent=intent, 00378 obs=str(observation)): 00379 myms.close() 00380 return False 00381 myms.close() 00382 00383 if do_both_chan_and_time_mod: 00384 import shutil 00385 shutil.rmtree(cavms) 00386 00387 # Write history to output MS, not the input ms. 00388 try: 00389 param_names = split_core.func_code.co_varnames[:split_core.func_code.co_argcount] 00390 param_vals = [eval(p) for p in param_names] 00391 retval &= write_history(myms, outputvis, 'split', param_names, param_vals, 00392 casalog) 00393 except Exception, instance: 00394 casalog.post("*** Error \'%s\' updating HISTORY" % (instance), 00395 'WARN') 00396 00397 # Update FLAG_CMD if necessary. 00398 if ((spw != '') and (spw != '*')) or do_chan_mod: 00399 isopen = False 00400 mytb = tbtool() 00401 try: 00402 mytb.open(outputvis + '/FLAG_CMD', nomodify=False) 00403 isopen = True 00404 nflgcmds = mytb.nrows() 00405 00406 if nflgcmds > 0: 00407 mademod = False 00408 cmds = mytb.getcol('COMMAND') 00409 widths = {} 00410 #print "width =", width 00411 if hasattr(width, 'has_key'): 00412 widths = width 00413 else: 00414 if hasattr(width, '__iter__') and len(width) > 1: 00415 for i in xrange(len(width)): 00416 widths[i] = width[i] 00417 elif width != 1: 00418 #print 'using myms.msseltoindex + a scalar width' 00419 nspw = len(myms.msseltoindex(vis=vis, 00420 spw='*')['spw']) 00421 if hasattr(width, '__iter__'): 00422 w = width[0] 00423 else: 00424 w = width 00425 for i in xrange(nspw): 00426 widths[i] = w 00427 #print 'widths =', widths 00428 for rownum in xrange(nflgcmds): 00429 # Matches a bare number or a string quoted any way. 00430 spwmatch = re.search(r'spw\s*=\s*(\S+)', cmds[rownum]) 00431 if spwmatch: 00432 sch1 = spwmatch.groups()[0] 00433 sch1 = re.sub(r"[\'\"]", '', sch1) # Dequote 00434 # Provide a default in case the split selection excludes 00435 # cmds[rownum]. update_spwchan() will throw an exception 00436 # in that case. 00437 cmd = '' 00438 try: 00439 #print 'sch1 =', sch1 00440 sch2 = update_spwchan(vis, spw, sch1, truncate=True, 00441 widths=widths) 00442 #print 'sch2 =', sch2 00443 ##print 'spwmatch.group() =', spwmatch.group() 00444 if sch2: 00445 repl = '' 00446 if sch2 != '*': 00447 repl = "spw='" + sch2 + "'" 00448 cmd = cmds[rownum].replace(spwmatch.group(), repl) 00449 #except: # cmd[rownum] no longer applies. 00450 except Exception, e: 00451 casalog.post( 00452 "Error %s updating row %d of FLAG_CMD" % (e, 00453 rownum), 00454 'WARN') 00455 casalog.post('sch1 = ' + sch1, 'DEBUG1') 00456 casalog.post('cmd = ' + cmd, 'DEBUG1') 00457 if cmd != cmds[rownum]: 00458 mademod = True 00459 cmds[rownum] = cmd 00460 if mademod: 00461 casalog.post('Updating FLAG_CMD', 'INFO') 00462 mytb.putcol('COMMAND', cmds) 00463 00464 mytb.close() 00465 00466 00467 except Exception, instance: 00468 if isopen: 00469 mytb.close() 00470 myms = None 00471 mytb = None 00472 casalog.post("*** Error \'%s\' updating FLAG_CMD" % (instance), 00473 'SEVERE') 00474 return False 00475 00476 myms = None 00477 mytb = None 00478 00479 return retval