casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables
task_split.py
Go to the documentation of this file.
00001 import os, re
00002 import string
00003 import time
00004 import shutil
00005 from taskinit import casalog, mstool, qa, tbtool, write_history
00006 from update_spw import update_spwchan
00007 from parallel.parallel_task_helper import ParallelTaskHelper
00008 import partitionhelper as ph
00009 
00010 def split(vis, outputvis, datacolumn, field, spw, width, antenna,
00011           timebin, timerange, scan, intent, array, uvrange,
00012           correlation, observation, combine, keepflags, keepmms):
00013     """Create a visibility subset from an existing visibility set:
00014 
00015     Keyword arguments:
00016     vis -- Name of input visibility file (MS)
00017             default: none; example: vis='ngc5921.ms'
00018     outputvis -- Name of output visibility file (MS)
00019                   default: none; example: outputvis='ngc5921_src.ms'
00020     datacolumn -- Which data column to split out
00021                   default='corrected'; example: datacolumn='data'
00022                   Options: 'data', 'corrected', 'model', 'all',
00023                   'float_data', 'lag_data', 'float_data,data', and
00024                   'lag_data,data'.
00025                   note: 'all' = whichever of the above that are present.
00026     field -- Field name
00027               default: field = '' means  use all sources
00028               field = 1 # will get field_id=1 (if you give it an
00029                           integer, it will retrieve the source with that index)
00030               field = '1328+307' specifies source '1328+307'.
00031                  Minimum match can be used, egs  field = '13*' will
00032                  retrieve '1328+307' if it is unique or exists.
00033                  Source names with imbedded blanks cannot be included.
00034     spw -- Spectral window index identifier
00035             default=-1 (all); example: spw=1
00036     antenna -- antenna names
00037                default '' (all),
00038                antenna = '3 & 7' gives one baseline with antennaid = 3,7.
00039     timebin -- Interval width for time averaging.
00040                default: '0s' or '-1s' (no averaging)
00041                example: timebin='30s'
00042     timerange -- Time range
00043                  default='' means all times.  examples:
00044                  timerange = 'YYYY/MM/DD/hh:mm:ss~YYYY/MM/DD/hh:mm:ss'
00045                  timerange='< YYYY/MM/DD/HH:MM:SS.sss'
00046                  timerange='> YYYY/MM/DD/HH:MM:SS.sss'
00047                  timerange='< ddd/HH:MM:SS.sss'
00048                  timerange='> ddd/HH:MM:SS.sss'
00049     scan -- Scan numbers to select.
00050             default '' (all).
00051     intent -- Scan intents to select.
00052             default '' (all).
00053     array -- (Sub)array IDs to select.     
00054              default '' (all).
00055     uvrange -- uv distance range to select.
00056                default '' (all).
00057     correlation -- Select correlations, e.g. 'rr, ll' or ['XY', 'YX'].
00058                    default '' (all).
00059     observation -- Select by observation ID(s).
00060                    default '' (all).
00061     combine -- Data descriptors that time averaging can ignore:
00062                   scan, and/or state
00063                   Default '' (none)
00064     keepflags -- Keep flagged data, if possible
00065                  Default True
00066 
00067     keepmms -- If the input is a multi-MS, make the output one, too. (experimental)
00068                Default: False
00069                  
00070     """
00071 
00072     casalog.origin('split')
00073     mylocals = locals()
00074     rval = True
00075     try:
00076 
00077         if (keepmms and ParallelTaskHelper.isParallelMS(vis)): 
00078             if (timebin!='0s' and timebin!='-1s'): 
00079                 casalog.post('Averaging over time with keepmms=True may lead to results different\n'
00080                              +'  from those obtained with keepmms=False due to different binning.', 'WARN')
00081                             
00082             myms = mstool()
00083             myms.open(vis)
00084             mses = myms.getreferencedtables()
00085             myms.close() 
00086             mses.sort()
00087 
00088             nfail = 0
00089             if os.path.exists(outputvis):
00090                 raise ValueError, "Output MS %s already exists - will not overwrite." % outputvis
00091             tempout = outputvis+str(time.time())
00092             os.mkdir(tempout)
00093             successfulmses = []
00094             mastersubms = ''
00095             masterptab = ''
00096             emptyptab = tempout+'/EMPTY_POINTING'
00097             nochangeinpointing = (str(antenna)+str(timerange)=='')
00098 
00099             if nochangeinpointing:    
00100                 # resulting pointing table is the same for all
00101                 #  -> replace by empty table if it is a link and won't be modified anyway
00102                 #     and put back original into the master after split
00103 
00104                 # find the master
00105                 for m in mses:
00106                     theptab = m+'/POINTING'
00107                     if not os.path.islink(theptab):
00108                         #print "is master ", theptab
00109                         mastersubms = m
00110                         masterptab = m+'/POINTING'
00111                         # save time by not copying the POINTING table len(mses) times
00112                         myttb = tbtool()
00113                         myttb.open(masterptab)
00114                         tmpp = myttb.copy(newtablename=emptyptab, norows=True)
00115                         myttb.close()
00116                         del myttb
00117                         tmpp.close()
00118                         del tmpp
00119                         break
00120 
00121             mytb = tbtool()
00122 
00123             # prepare the input MMS for processing
00124             replaced = []
00125             outputviss = []
00126             theptabs = []
00127             
00128             for m in mses:
00129 
00130                 # make sure the SORTED_TABLE keywords are disabled
00131                 mytb.open(m, nomodify=False)
00132                 if 'SORTED_TABLE' in mytb.keywordnames():
00133                     tobedel = mytb.getkeyword('SORTED_TABLE').split(' ')[1]
00134                     mytb.removekeyword('SORTED_TABLE')
00135                     os.system('rm -rf '+tobedel)
00136                 mytb.close()
00137 
00138                 # deal with the POINTING table
00139                 theptab = m+'/POINTING'
00140                 theptabs.append(theptab)
00141 
00142                 if nochangeinpointing and os.path.islink(theptab):
00143                     #print "is link ", theptab
00144                     os.remove(theptab)
00145                     shutil.copytree(emptyptab, theptab)
00146                     replaced.append(True)
00147                 else:
00148                     replaced.append(False)
00149 
00150                 # run split
00151                 outputviss.append(os.path.abspath(tempout+'/'+os.path.basename(m)))
00152             # end for
00153 
00154             # send off the jobs
00155             print 'Running split_core ... '
00156             helper = ParallelTaskHelper('split', mylocals)
00157             helper.override_arg('outputvis',outputviss)
00158             helper._consolidateOutput = False
00159             goretval = helper.go()
00160 
00161             for i in xrange(len(mses)):
00162                 m = mses[i]
00163 
00164                 # deal with the POINTING table
00165                 if replaced[i]:
00166                     # restore link
00167                     shutil.rmtree(theptabs[i], ignore_errors=True)
00168                     os.symlink('../'+os.path.basename(mastersubms)+'/POINTING', theptabs[i])
00169                     # (link in target will be created my makeMMS)
00170 
00171                 # accumulate list of successful splits
00172                 if not goretval[m]:
00173                     nfail+=1
00174                 else:
00175                     successfulmses.append(outputviss[i])
00176 
00177             if nfail>0: # there were unsuccessful splits
00178                 if len(successfulmses)==0:
00179                     casalog.post('Split failed in all subMSs.', 'WARN')
00180                     rval=False
00181                 else:
00182                     casalog.post('*** Summary: there were failures in '+str(nfail)+' SUBMSs:', 'WARN')
00183                     casalog.post('*** (these are harmless if they are caused by selection):', 'WARN')
00184                     for m in mses:
00185                         if not goretval[m]:
00186                             casalog.post(os.path.basename(m)+': '+str(goretval[m]), 'WARN')
00187                         else:
00188                             casalog.post(os.path.basename(m)+': '+str(goretval[m]), 'NORMAL') 
00189 
00190                     casalog.post('Will construct MMS from subMSs with successful selection ...', 'NORMAL')
00191 
00192                     if nochangeinpointing: # need to take care of POINTING table
00193                         # in case the master subms did not make it
00194                         if not (tempout+'/'+os.path.basename(mastersubms) in successfulmses):
00195                             # old master subms was not selected.
00196                             # copy the original masterptab into the new master
00197                             shutil.rmtree(successfulmses[0]+'/POINTING')
00198                             shutil.copytree(masterptab, successfulmses[0]+'/POINTING')
00199                     
00200             if rval: # construct new MMS from the output
00201                 if(width==1 and str(field)+str(spw)+str(antenna)+str(timerange)+str(scan)+str(intent)\
00202                    +str(array)+str(uvrange)+str(correlation)+str(observation)==''):
00203                     ph.makeMMS(outputvis, successfulmses)
00204                 else:
00205                     myms.open(successfulmses[0], nomodify=False)
00206                     auxfile = "split_aux_"+str(time.time())
00207                     for i in xrange(1,len(successfulmses)):
00208                         myms.virtconcatenate(successfulmses[i], auxfile, '1Hz', '10mas', True)
00209                     myms.close()
00210                     os.remove(auxfile)
00211                     ph.makeMMS(outputvis, successfulmses, True, ['POINTING']) 
00212 
00213 
00214             shutil.rmtree(tempout, ignore_errors=True)
00215 
00216 
00217 
00218         else: # do not output an MMS
00219 
00220             rval = split_core(vis, outputvis, datacolumn, field, spw, width, antenna,
00221                               timebin, timerange, scan, intent, array, uvrange,
00222                               correlation, observation, combine, keepflags)
00223 
00224     except Exception, instance:
00225             casalog.post("*** Error: %s" % (instance), 'SEVERE')
00226             rval = False
00227        
00228 
00229     return rval
00230 
00231 def split_core(vis, outputvis, datacolumn, field, spw, width, antenna,
00232                timebin, timerange, scan, intent, array, uvrange,
00233                correlation, observation, combine, keepflags):
00234 
00235     retval = True
00236 
00237     if not outputvis or outputvis.isspace():
00238         raise ValueError, 'Please specify outputvis'
00239 
00240     myms = mstool()
00241     mytb = None
00242     if ((type(vis)==str) & (os.path.exists(vis))):
00243         myms.open(vis, nomodify=True)
00244     else:
00245         raise ValueError, 'Visibility data set not found - please verify the name'
00246     if os.path.exists(outputvis):
00247         myms.close()
00248         raise ValueError, "Output MS %s already exists - will not overwrite." % outputvis
00249 
00250     # No longer needed.  When did it get put in?  Note that the default
00251     # spw='*' in myms.split ends up as '' since the default type for a variant
00252     # is BOOLVEC.  (Of course!)  Therefore both split and myms.split must
00253     # work properly when spw=''.
00254     #if(spw == ''):
00255     #    spw = '*'
00256     
00257     if(type(antenna) == list):
00258         antenna = ', '.join([str(ant) for ant in antenna])
00259 
00260     ## Accept digits without units ...assume seconds
00261     timebin = qa.convert(qa.quantity(timebin), 's')['value']
00262     timebin = str(timebin) + 's'
00263     
00264     if timebin == '0s':
00265         timebin = '-1s'
00266 
00267     # MSStateGram is picky ('CALIBRATE_WVR.REFERENCE, OBSERVE_TARGET_ON_SOURCE'
00268     # doesn't work, but 'CALIBRATE_WVR.REFERENCE,OBSERVE_TARGET_ON_SOURCE'
00269     # does), and I don't want to mess with bison now.  A .upper() might be a
00270     # good idea too, but the MS def'n v.2 does not say whether OBS_MODE should
00271     # be case-insensitive.
00272     intent = intent.replace(', ', ',')
00273 
00274     if '^' in spw:
00275         casalog.post("The interpretation of ^n in split's spw strings has changed from 'average n' to 'skip n' channels!", 'WARN')
00276         casalog.post("Watch for Slicer errors", 'WARN')
00277         
00278     if type(width) == str:
00279         try:
00280             if(width.isdigit()):
00281                 width=[string.atoi(width)]
00282             elif(width.count('[') == 1 and width.count(']') == 1):
00283                 width = width.replace('[', '')
00284                 width = width.replace(']', '')
00285                 splitwidth = width.split(',')
00286                 width = []
00287                 for ws in splitwidth:
00288                     if(ws.isdigit()):
00289                         width.append(string.atoi(ws)) 
00290             else:
00291                 width = [1]
00292         except:
00293             raise TypeError, 'parameter width is invalid...using 1'
00294 
00295     if type(correlation) == list:
00296         correlation = ', '.join(correlation)
00297     correlation = correlation.upper()
00298 
00299     if hasattr(combine, '__iter__'):
00300         combine = ', '.join(combine)
00301 
00302     if type(spw) == list:
00303         spw = ','.join([str(s) for s in spw])
00304     elif type(spw) == int:
00305         spw = str(spw)
00306     do_chan_mod = spw.find('^') > -1     # '0:2~11^1' would be pointless.
00307     if not do_chan_mod:                  # ...look in width.
00308         if type(width) == int and width > 1:
00309             do_chan_mod = True
00310         elif hasattr(width, '__iter__'):
00311             for w in width:
00312                 if w > 1:
00313                     do_chan_mod = True
00314                     break
00315 
00316     do_both_chan_and_time_mod = (do_chan_mod and
00317                                  string.atof(timebin[:-1]) > 0.0)
00318     if do_both_chan_and_time_mod:
00319         # Do channel averaging first because it might be included in the spw
00320         # string.
00321         import tempfile
00322         # We want the directory outputvis is in, not /tmp, because /tmp
00323         # might not have enough space.
00324         # outputvis is itself a directory, so strip off a trailing slash if
00325         # it is present.
00326         # I don't know if giving tempfile an absolute directory is necessary -
00327         # dir='' is effectively '.' in Ubuntu.
00328         workingdir = os.path.abspath(os.path.dirname(outputvis.rstrip('/')))
00329         cavms = tempfile.mkdtemp(suffix=outputvis, dir=workingdir)
00330 
00331         casalog.post('Channel averaging to ' + cavms)
00332         if not myms.split(outputms=cavms,     field=field,
00333                           spw=spw,            step=width,
00334                           baseline=antenna,   subarray=array,
00335                           timebin='',         time=timerange,
00336                           whichcol=datacolumn,
00337                           scan=scan,          uvrange=uvrange,
00338                           combine=combine,
00339                           correlation=correlation, intent=intent,
00340                           obs=str(observation)):
00341             myms.close()
00342             if os.path.isdir(cavms):
00343                 import shutil
00344                 shutil.rmtree(cavms)
00345             return False
00346         
00347         # The selection was already made, so blank them before time averaging.
00348         field = ''
00349         spw = ''
00350         width = [1]
00351         antenna = ''
00352         array = ''
00353         timerange = ''
00354         datacolumn = 'all'
00355         scan = ''
00356         intent = ''
00357         uvrange = ''
00358         observation = ''
00359 
00360         myms.close()
00361         myms.open(cavms)
00362         casalog.post('Starting time averaging')
00363 
00364     if keepflags:
00365         taqlstr = ''
00366     else:
00367         taqlstr = 'NOT (FLAG_ROW OR ALL(FLAG))'
00368 
00369     if not myms.split(outputms=outputvis,  field=field,
00370                       spw=spw,             step=width,
00371                       baseline=antenna,    subarray=array,
00372                       timebin=timebin,     time=timerange,
00373                       whichcol=datacolumn,
00374                       scan=scan,           uvrange=uvrange,
00375                       combine=combine,
00376                       correlation=correlation,
00377                       taql=taqlstr, intent=intent,
00378                       obs=str(observation)):
00379         myms.close()
00380         return False
00381     myms.close()
00382 
00383     if do_both_chan_and_time_mod:
00384         import shutil
00385         shutil.rmtree(cavms)
00386 
00387     # Write history to output MS, not the input ms.
00388     try:
00389         param_names = split_core.func_code.co_varnames[:split_core.func_code.co_argcount]
00390         param_vals = [eval(p) for p in param_names]   
00391         retval &= write_history(myms, outputvis, 'split', param_names, param_vals,
00392                                 casalog)
00393     except Exception, instance:
00394         casalog.post("*** Error \'%s\' updating HISTORY" % (instance),
00395                      'WARN')
00396 
00397     # Update FLAG_CMD if necessary.
00398     if ((spw != '') and (spw != '*')) or do_chan_mod:
00399         isopen = False
00400         mytb = tbtool()
00401         try:
00402             mytb.open(outputvis + '/FLAG_CMD', nomodify=False)
00403             isopen = True
00404             nflgcmds = mytb.nrows()
00405             
00406             if nflgcmds > 0:
00407                 mademod = False
00408                 cmds = mytb.getcol('COMMAND')
00409                 widths = {}
00410                 #print "width =", width
00411                 if hasattr(width, 'has_key'):
00412                     widths = width
00413                 else:
00414                     if hasattr(width, '__iter__') and len(width) > 1:
00415                         for i in xrange(len(width)):
00416                             widths[i] = width[i]
00417                     elif width != 1:
00418                         #print 'using myms.msseltoindex + a scalar width'
00419                         nspw = len(myms.msseltoindex(vis=vis,
00420                                                      spw='*')['spw'])
00421                         if hasattr(width, '__iter__'):
00422                             w = width[0]
00423                         else:
00424                             w = width
00425                         for i in xrange(nspw):
00426                             widths[i] = w
00427                 #print 'widths =', widths 
00428                 for rownum in xrange(nflgcmds):
00429                     # Matches a bare number or a string quoted any way.
00430                     spwmatch = re.search(r'spw\s*=\s*(\S+)', cmds[rownum])
00431                     if spwmatch:
00432                         sch1 = spwmatch.groups()[0]
00433                         sch1 = re.sub(r"[\'\"]", '', sch1)  # Dequote
00434                         # Provide a default in case the split selection excludes
00435                         # cmds[rownum].  update_spwchan() will throw an exception
00436                         # in that case.
00437                         cmd = ''
00438                         try:
00439                             #print 'sch1 =', sch1
00440                             sch2 = update_spwchan(vis, spw, sch1, truncate=True,
00441                                                   widths=widths)
00442                             #print 'sch2 =', sch2
00443                             ##print 'spwmatch.group() =', spwmatch.group()
00444                             if sch2:
00445                                 repl = ''
00446                                 if sch2 != '*':
00447                                     repl = "spw='" + sch2 + "'"
00448                                 cmd = cmds[rownum].replace(spwmatch.group(), repl)
00449                         #except: # cmd[rownum] no longer applies.
00450                         except Exception, e:
00451                             casalog.post(
00452                                 "Error %s updating row %d of FLAG_CMD" % (e,
00453                                                                           rownum),
00454                                          'WARN')
00455                             casalog.post('sch1 = ' + sch1, 'DEBUG1')
00456                             casalog.post('cmd = ' + cmd, 'DEBUG1')
00457                         if cmd != cmds[rownum]:
00458                             mademod = True
00459                             cmds[rownum] = cmd
00460                 if mademod:
00461                     casalog.post('Updating FLAG_CMD', 'INFO')
00462                     mytb.putcol('COMMAND', cmds)
00463 
00464             mytb.close()
00465 
00466             
00467         except Exception, instance:
00468             if isopen:
00469                 mytb.close()
00470             myms = None
00471             mytb = None
00472             casalog.post("*** Error \'%s\' updating FLAG_CMD" % (instance),
00473                          'SEVERE')
00474             return False
00475 
00476     myms = None
00477     mytb = None
00478 
00479     return retval