casa
$Rev:20696$
|
00001 # 00002 # This file was generated using xslt from its XML file 00003 # 00004 # Copyright 2008, Associated Universities Inc., Washington DC 00005 # 00006 import sys 00007 import os 00008 #from casac import * 00009 import casac 00010 import string 00011 import time 00012 import inspect 00013 import gc 00014 import numpy 00015 from odict import odict 00016 from taskmanager import tm 00017 from task_partition import partition 00018 class partition_cli_: 00019 __name__ = "partition" 00020 __async__ = {} 00021 rkey = None 00022 i_am_a_casapy_task = None 00023 # The existence of the i_am_a_casapy_task attribute allows help() 00024 # (and other) to treat casapy tasks as a special case. 00025 00026 def __init__(self) : 00027 self.__bases__ = (partition_cli_,) 00028 self.__doc__ = self.__call__.__doc__ 00029 00030 self.parameters={'vis':None, 'outputvis':None, 'createmms':None, 'separationaxis':None, 'numsubms':None, 'datacolumn':None, 'calmsselection':None, 'calmsname':None, 'calfield':None, 'calscan':None, 'calintent':None, 'field':None, 'spw':None, 'antenna':None, 'timebin':None, 'combine':None, 'timerange':None, 'scan':None, 'scanintent':None, 'array':None, 'uvrange':None, 'observation':None, 'async':None} 00031 00032 00033 def result(self, key=None): 00034 #### here we will scan the task-ids in __async__ 00035 #### and add any that have completed... 00036 if key is not None and self.__async__.has_key(key) and self.__async__[key] is not None: 00037 ret = tm.retrieve(self.__async__[key]) 00038 if ret['state'] == "done" : 00039 self.__async__[key] = None 00040 elif ret['state'] == 'crashed' : 00041 self.__async__[key] = None 00042 return ret 00043 return None 00044 00045 00046 def __call__(self, vis=None, outputvis=None, createmms=None, separationaxis=None, numsubms=None, datacolumn=None, calmsselection=None, calmsname=None, calfield=None, calscan=None, calintent=None, field=None, spw=None, antenna=None, timebin=None, combine=None, timerange=None, scan=None, scanintent=None, array=None, uvrange=None, observation=None, async=None): 00047 00048 """Experimental task to produce multi-MSs using parallelism 00049 Experimental version for use with parallelization framework. 00050 Do not use this for standard analysis... yet. 00051 00052 Partition is a task to create a multi-MS out of an MS. General selection 00053 parameters are included, and one or all of the various data columns 00054 (DATA, LAG_DATA and/or FLOAT_DATA, and possibly MODEL_DATA and/or 00055 CORRECTED_DATA) can be selected. It can also be used to create a normal 00056 MS, split based on the given data selection parameters. 00057 00058 The partition task creates a multi-MS using the parallelization framework. 00059 If a cluster is not present, it will create a default cluster based on the 00060 resources of the system. One can create a simple_cluster prior to running 00061 partition by doing the following. 00062 00063 from simple_cluster import * 00064 sc = simple_cluster() 00065 sc.init_cluster('cluster-config.txt', 'test') 00066 00067 The file 'cluster-config.txt' contains information on the machine that will 00068 be used for the cluster. Please see the help of simple_cluster for more information. 00069 00070 A multi-MS is structured to have a reference MS on the top directory and a 00071 sub-directory called SUBMSS, which contain each partitioned sub-MS. The 00072 reference MS contains links to the sub-tables of the first sub-MS. The other 00073 sub-MSs contain a copy of the sub-tables each. A multi-MS looks like this in disk. 00074 00075 ls ngc5921.mms 00076 ANTENNA FLAG_CMD POLARIZATION SPECTRAL_WINDOW table.dat 00077 DATA_DESCRIPTION HISTORY PROCESSOR STATE table.info 00078 FEED OBSERVATION SORTED_TABLE SUBMSS WEATHER 00079 FIELD POINTING SOURCE SYSCAL 00080 00081 ls ngc5921.mms/SUBMSS/ 00082 ngc5921.0000.ms/ ngc5921t.0002.ms/ ngc5921.0004.ms/ ngc5921.0006.ms/ 00083 ngc5921.0001.ms/ ngc5921.0003.ms/ ngc5921.0005.ms/ 00084 00085 Inside casapy, one can use the task listpartition to list the information 00086 from a multi-MS. 00087 00088 00089 Keyword arguments: 00090 vis -- Name of input visibility file 00091 default: none; example: vis='ngc5921.ms' 00092 00093 outputvis -- Name of output visibility file 00094 default: none; example: outputvis='ngc5921.mms' 00095 00096 createmms -- Create a multi-MS as the output. 00097 default: True 00098 If False, it will work like the split task and create a 00099 normal MS, split according to the given data selection parameters. 00100 Note that, when this parameter is set to False, a simple_cluster 00101 will not be created. 00102 00103 separationaxis -- Axis to do parallelization across. 00104 default: 'scan' 00105 Options: 'scan', 'spw', 'both' 00106 The 'both' option will try to partition in both scan and spw axes. 00107 00108 numsubms -- The number of sub-MSs to create. 00109 default: 64 00110 00111 00112 datacolumn -- Which data column to use when partitioning. 00113 default='data'; example: datacolumn='data' 00114 Options: 'data', 'model', 'corrected', 'all', 00115 'float_data', 'lag_data', 'float_data,data', and 00116 'lag_data,data'. 00117 N.B.: 'all' = whichever of the above that are present. 00118 Otherwise the selected column will go to DATA (or 00119 FLOAT_DATA) in the output. 00120 00121 calmsselection -- Method by which the calibration scans will be identified 00122 when creating a separate calibration MS. 00123 default:'none' 00124 Options: 'none', 'auto', 'manual'. 00125 00126 'auto' -- 00127 calmsname -- Name of output measurement set. 00128 default = ' ' 00129 00130 'manual' 00131 calmsname -- Name of output measurement set. 00132 default: '' 00133 calfield -- Field selection for calibration MS. 00134 default: '' 00135 calscans -- Scan selection for calibration MS. 00136 default: '' 00137 calintent -- Scan intent selection for calibration MS. 00138 default: '' 00139 00140 --- Data selection parameters (see help par.selectdata for more detailed 00141 information) 00142 00143 field -- Select field using field id(s) or field name(s). 00144 [run listobs to obtain the list iof d's or names] 00145 default: ''=all fields If field string is a non-negative 00146 integer, it is assumed to be a field index 00147 otherwise, it is assumed to be a field name 00148 field='0~2'; field ids 0,1,2 00149 field='0,4,5~7'; field ids 0,4,5,6,7 00150 field='3C286,3C295'; fields named 3C286 and 3C295 00151 field = '3,4C*'; field id 3, all names starting with 4C 00152 00153 spw -- Select spectral window/channels 00154 default: ''=all spectral windows and channels 00155 spw='0~2,4'; spectral windows 0,1,2,4 (all channels) 00156 spw='<2'; spectral windows less than 2 (i.e. 0,1) 00157 spw='0:5~61'; spw 0, channels 5 to 61 00158 spw='0,10,3:3~45'; spw 0,10 all channels, spw 3 - chans 3 to 45. 00159 spw='0~2:2~6'; spw 0,1,2 with channels 2 through 6 in each. 00160 spw = '*:3~64' channels 3 through 64 for all sp id's 00161 spw = ' :3~64' will NOT work. 00162 spw = '*:0;60~63' channel 0 and channels 60 to 63 for all IFs 00163 ';' needed to separate different channel ranges in one spw 00164 spw='0:0~10;15~60'; spectral window 0 with channels 0-10,15-60 00165 spw='0:0~10,1:20~30,2:1;2;4'; spw 0, channels 0-10, 00166 spw 1, channels 20-30, and spw 2, channels, 1, 2 and 4 00167 00168 antenna -- Select data based on antenna/baseline 00169 default: '' (all) 00170 Non-negative integers are assumed to be antenna indices, and 00171 anything else is taken as an antenna name. 00172 00173 Examples: 00174 antenna='5&6': baseline between antenna index 5 and index 6. 00175 antenna='VA05&VA06': baseline between VLA antenna 5 and 6. 00176 antenna='5&6;7&8': baselines 5-6 and 7-8 00177 antenna='5': all baselines with antenna 5 00178 antenna='5,6,10': all baselines including antennas 5, 6, or 10 00179 antenna='5,6,10&': all baselines with *only* antennas 5, 6, or 00180 10. (cross-correlations only. Use && 00181 to include autocorrelations, and &&& 00182 to get only autocorrelations.) 00183 antenna='!ea03,ea12,ea17': all baselines except those that 00184 include EVLA antennas ea03, ea12, or 00185 ea17. 00186 00187 timebin -- Interval width for time averaging. 00188 default: '0s' or '-1s' (no averaging) 00189 Example: timebin='30s' 00190 '10' means '10s' 00191 00192 combine -- Let time bins span changes in scan and/or state. 00193 default = '' (separate time bins by both of the above) 00194 combine = 'scan': Can be useful when the scan number 00195 goes up with each integration, 00196 as in many WSRT MSes. 00197 combine = ['scan', 'state']: disregard scan and state 00198 numbers when time averaging. 00199 combine = 'state,scan': Same as above. 00200 00201 timerange -- Select data based on time range: 00202 default = '' (all); examples, 00203 timerange = 'YYYY/MM/DD/hh:mm:ss~YYYY/MM/DD/hh:mm:ss' 00204 Note: if YYYY/MM/DD is missing date, timerange defaults to the 00205 first day in the dataset 00206 timerange='09:14:0~09:54:0' picks 40 min on first day 00207 timerange='25:00:00~27:30:00' picks 1 hr to 3 hr 30min 00208 on next day 00209 timerange='09:44:00' data within one integration of time 00210 timerange='>10:24:00' data after this time 00211 00212 array -- (Sub)array number range 00213 default: ''=all 00214 00215 uvrange -- Select data within uvrange (default units meters) 00216 default: ''=all; example: 00217 uvrange='0~1000klambda'; uvrange from 0-1000 kilo-lambda 00218 uvrange='>4klambda';uvranges greater than 4 kilo-lambda 00219 uvrange='0~1000km'; uvrange in kilometers 00220 00221 scan -- Scan number range 00222 default: ''=all 00223 00224 observation -- Select by observation ID(s) 00225 default: ''=all 00226 00227 00228 00229 """ 00230 if not hasattr(self, "__globals__") or self.__globals__ == None : 00231 self.__globals__=sys._getframe(len(inspect.stack())-1).f_globals 00232 #casac = self.__globals__['casac'] 00233 casalog = self.__globals__['casalog'] 00234 #casalog = casac.casac.logsink() 00235 self.__globals__['__last_task'] = 'partition' 00236 self.__globals__['taskname'] = 'partition' 00237 ### 00238 self.__globals__['update_params'](func=self.__globals__['taskname'],printtext=False,ipython_globals=self.__globals__) 00239 ### 00240 ### 00241 #Handle globals or user over-ride of arguments 00242 # 00243 function_signature_defaults=dict(zip(self.__call__.func_code.co_varnames,self.__call__.func_defaults)) 00244 useLocalDefaults = False 00245 00246 for item in function_signature_defaults.iteritems(): 00247 key,val = item 00248 keyVal = eval(key) 00249 if (keyVal == None): 00250 #user hasn't set it - use global/default 00251 pass 00252 else: 00253 #user has set it - use over-ride 00254 if (key != 'self') : 00255 useLocalDefaults = True 00256 00257 myparams = {} 00258 if useLocalDefaults : 00259 for item in function_signature_defaults.iteritems(): 00260 key,val = item 00261 keyVal = eval(key) 00262 exec('myparams[key] = keyVal') 00263 self.parameters[key] = keyVal 00264 if (keyVal == None): 00265 exec('myparams[key] = '+ key + ' = self.itsdefault(key)') 00266 keyVal = eval(key) 00267 if(type(keyVal) == dict) : 00268 if len(keyVal) > 0 : 00269 exec('myparams[key] = ' + key + ' = keyVal[len(keyVal)-1][\'value\']') 00270 else : 00271 exec('myparams[key] = ' + key + ' = {}') 00272 00273 else : 00274 async = self.parameters['async'] 00275 myparams['vis'] = vis = self.parameters['vis'] 00276 myparams['outputvis'] = outputvis = self.parameters['outputvis'] 00277 myparams['createmms'] = createmms = self.parameters['createmms'] 00278 myparams['separationaxis'] = separationaxis = self.parameters['separationaxis'] 00279 myparams['numsubms'] = numsubms = self.parameters['numsubms'] 00280 myparams['datacolumn'] = datacolumn = self.parameters['datacolumn'] 00281 myparams['calmsselection'] = calmsselection = self.parameters['calmsselection'] 00282 myparams['calmsname'] = calmsname = self.parameters['calmsname'] 00283 myparams['calfield'] = calfield = self.parameters['calfield'] 00284 myparams['calscan'] = calscan = self.parameters['calscan'] 00285 myparams['calintent'] = calintent = self.parameters['calintent'] 00286 myparams['field'] = field = self.parameters['field'] 00287 myparams['spw'] = spw = self.parameters['spw'] 00288 myparams['antenna'] = antenna = self.parameters['antenna'] 00289 myparams['timebin'] = timebin = self.parameters['timebin'] 00290 myparams['combine'] = combine = self.parameters['combine'] 00291 myparams['timerange'] = timerange = self.parameters['timerange'] 00292 myparams['scan'] = scan = self.parameters['scan'] 00293 myparams['scanintent'] = scanintent = self.parameters['scanintent'] 00294 myparams['array'] = array = self.parameters['array'] 00295 myparams['uvrange'] = uvrange = self.parameters['uvrange'] 00296 myparams['observation'] = observation = self.parameters['observation'] 00297 00298 00299 result = None 00300 00301 # 00302 # The following is work around to avoid a bug with current python translation 00303 # 00304 mytmp = {} 00305 00306 mytmp['vis'] = vis 00307 mytmp['outputvis'] = outputvis 00308 mytmp['createmms'] = createmms 00309 mytmp['separationaxis'] = separationaxis 00310 mytmp['numsubms'] = numsubms 00311 mytmp['datacolumn'] = datacolumn 00312 mytmp['calmsselection'] = calmsselection 00313 mytmp['calmsname'] = calmsname 00314 mytmp['calfield'] = calfield 00315 mytmp['calscan'] = calscan 00316 mytmp['calintent'] = calintent 00317 mytmp['field'] = field 00318 mytmp['spw'] = spw 00319 mytmp['antenna'] = antenna 00320 mytmp['timebin'] = timebin 00321 mytmp['combine'] = combine 00322 mytmp['timerange'] = timerange 00323 mytmp['scan'] = scan 00324 mytmp['scanintent'] = scanintent 00325 mytmp['array'] = array 00326 mytmp['uvrange'] = uvrange 00327 mytmp['observation'] = observation 00328 pathname='file:///'+os.environ.get('CASAPATH').split()[0]+'/share/xml/' 00329 trec = casac.casac.utils().torecord(pathname+'partition.xml') 00330 00331 casalog.origin('partition') 00332 try : 00333 #if not trec.has_key('partition') or not casac.casac.utils().verify(mytmp, trec['partition']) : 00334 #return False 00335 00336 casac.casac.utils().verify(mytmp, trec['partition'], True) 00337 scriptstr=[''] 00338 saveinputs = self.__globals__['saveinputs'] 00339 saveinputs('partition', 'partition.last', myparams, self.__globals__,scriptstr=scriptstr) 00340 if async : 00341 count = 0 00342 keybase = time.strftime("%y%m%d.%H%M%S") 00343 key = keybase + "_" + str(count) 00344 while self.__async__.has_key(key) : 00345 count += 1 00346 key = keybase + "_" + str(count) 00347 result = tm.execute('partition', vis, outputvis, createmms, separationaxis, numsubms, datacolumn, calmsselection, calmsname, calfield, calscan, calintent, field, spw, antenna, timebin, combine, timerange, scan, scanintent, array, uvrange, observation) 00348 print "Use: " 00349 print " tm.retrieve(return_value) # to retrieve the status" 00350 print 00351 self.rkey = key 00352 self.__async__[key] = result 00353 else : 00354 tname = 'partition' 00355 spaces = ' '*(18-len(tname)) 00356 casalog.post('\n##########################################'+ 00357 '\n##### Begin Task: ' + tname + spaces + ' #####') 00358 casalog.post(scriptstr[1][1:]+'\n', 'INFO') 00359 result = partition(vis, outputvis, createmms, separationaxis, numsubms, datacolumn, calmsselection, calmsname, calfield, calscan, calintent, field, spw, antenna, timebin, combine, timerange, scan, scanintent, array, uvrange, observation) 00360 casalog.post('##### End Task: ' + tname + ' ' + spaces + ' #####'+ 00361 '\n##########################################') 00362 00363 except Exception, instance: 00364 if(self.__globals__.has_key('__rethrow_casa_exceptions') and self.__globals__['__rethrow_casa_exceptions']) : 00365 raise 00366 else : 00367 #print '**** Error **** ',instance 00368 tname = 'partition' 00369 casalog.post('An error occurred running task '+tname+'.', 'ERROR') 00370 pass 00371 00372 gc.collect() 00373 return result 00374 # 00375 # 00376 # 00377 def paramgui(self, useGlobals=True, ipython_globals=None): 00378 """ 00379 Opens a parameter GUI for this task. If useGlobals is true, then any relevant global parameter settings are used. 00380 """ 00381 import paramgui 00382 if not hasattr(self, "__globals__") or self.__globals__ == None : 00383 self.__globals__=sys._getframe(len(inspect.stack())-1).f_globals 00384 00385 if useGlobals: 00386 if ipython_globals == None: 00387 myf=self.__globals__ 00388 else: 00389 myf=ipython_globals 00390 00391 paramgui.setGlobals(myf) 00392 else: 00393 paramgui.setGlobals({}) 00394 00395 paramgui.runTask('partition', myf['_ip']) 00396 paramgui.setGlobals({}) 00397 00398 # 00399 # 00400 # 00401 def defaults(self, param=None, ipython_globals=None, paramvalue=None, subparam=None): 00402 if not hasattr(self, "__globals__") or self.__globals__ == None : 00403 self.__globals__=sys._getframe(len(inspect.stack())-1).f_globals 00404 if ipython_globals == None: 00405 myf=self.__globals__ 00406 else: 00407 myf=ipython_globals 00408 00409 a = odict() 00410 a['vis'] = '' 00411 a['outputvis'] = '' 00412 a['createmms'] = True 00413 a['datacolumn'] = 'data' 00414 a['calmsselection'] = 'none' 00415 a['field'] = '' 00416 a['spw'] = '' 00417 a['antenna'] = '' 00418 a['timebin'] = '0s' 00419 a['timerange'] = '' 00420 a['scan'] = '' 00421 a['scanintent'] = '' 00422 a['array'] = '' 00423 a['uvrange'] = '' 00424 a['observation'] = '' 00425 00426 a['async']=False 00427 a['timebin'] = { 00428 0:odict([{'notvalue':'0s'}, {'combine':''}])} 00429 a['createmms'] = { 00430 0:odict([{'value':True}, {'separationaxis':'scan'}, {'numsubms':64}]), 00431 1:{'value':False}} 00432 a['calmsselection'] = { 00433 0:{'value':'none'}, 00434 1:odict([{'value':'auto'}, {'calmsname':''}]), 00435 2:odict([{'value':'manual'}, {'calmsname':''}, {'calfield':''}, {'calscan':''}, {'calintent':''}])} 00436 00437 ### This function sets the default values but also will return the list of 00438 ### parameters or the default value of a given parameter 00439 if(param == None): 00440 myf['__set_default_parameters'](a) 00441 elif(param == 'paramkeys'): 00442 return a.keys() 00443 else: 00444 if(paramvalue==None and subparam==None): 00445 if(a.has_key(param)): 00446 return a[param] 00447 else: 00448 return self.itsdefault(param) 00449 else: 00450 retval=a[param] 00451 if(type(a[param])==dict): 00452 for k in range(len(a[param])): 00453 valornotval='value' 00454 if(a[param][k].has_key('notvalue')): 00455 valornotval='notvalue' 00456 if((a[param][k][valornotval])==paramvalue): 00457 retval=a[param][k].copy() 00458 retval.pop(valornotval) 00459 if(subparam != None): 00460 if(retval.has_key(subparam)): 00461 retval=retval[subparam] 00462 else: 00463 retval=self.itsdefault(subparam) 00464 else: 00465 retval=self.itsdefault(subparam) 00466 return retval 00467 00468 00469 # 00470 # 00471 def check_params(self, param=None, value=None, ipython_globals=None): 00472 if ipython_globals == None: 00473 myf=self.__globals__ 00474 else: 00475 myf=ipython_globals 00476 # print 'param:', param, 'value:', value 00477 try : 00478 if str(type(value)) != "<type 'instance'>" : 00479 value0 = value 00480 value = myf['cu'].expandparam(param, value) 00481 matchtype = False 00482 if(type(value) == numpy.ndarray): 00483 if(type(value) == type(value0)): 00484 myf[param] = value.tolist() 00485 else: 00486 #print 'value:', value, 'value0:', value0 00487 #print 'type(value):', type(value), 'type(value0):', type(value0) 00488 myf[param] = value0 00489 if type(value0) != list : 00490 matchtype = True 00491 else : 00492 myf[param] = value 00493 value = myf['cu'].verifyparam({param:value}) 00494 if matchtype: 00495 value = False 00496 except Exception, instance: 00497 #ignore the exception and just return it unchecked 00498 myf[param] = value 00499 return value 00500 # 00501 # 00502 def description(self, key='partition', subkey=None): 00503 desc={'partition': 'Experimental task to produce multi-MSs using parallelism', 00504 'vis': 'Name of input measurement set', 00505 'outputvis': 'Name of output measurement set', 00506 'createmms': 'Should this create a multi-MS output', 00507 'separationaxis': 'Axis to do parallelization across(scan,spw,both)', 00508 'numsubms': 'The number of SubMSs to create', 00509 'datacolumn': 'Which data column(s) to split out', 00510 'calmsselection': 'Cal Data Selection (\'none\', \'auto\', \'manual\')', 00511 'calmsname': 'Name of output measurement set', 00512 'calfield': 'Field Selection for calibration ms', 00513 'calscan': 'Select data by scan numbers', 00514 'calintent': 'Select data by scan intent', 00515 'field': 'Select field using ID(s) or name(s)', 00516 'spw': 'Select spectral window/channels', 00517 'antenna': 'Select data based on antenna/baseline', 00518 'timebin': 'Bin width for time averaging', 00519 'combine': 'Let time bins span changes in scan and/or state', 00520 'timerange': 'Select data by time range', 00521 'scan': 'Select data by scan numbers', 00522 'scanintent': 'Select data by scan intent', 00523 'array': 'Select (sub)array(s) by array ID number', 00524 'uvrange': 'Select data by baseline length', 00525 'observation': 'Select by observation ID(s)', 00526 00527 'async': 'If true the taskname must be started using partition(...)' 00528 } 00529 00530 # 00531 # Set subfields defaults if needed 00532 # 00533 00534 if(desc.has_key(key)) : 00535 return desc[key] 00536 00537 def itsdefault(self, paramname) : 00538 a = {} 00539 a['vis'] = '' 00540 a['outputvis'] = '' 00541 a['createmms'] = True 00542 a['separationaxis'] = 'scan' 00543 a['numsubms'] = 64 00544 a['datacolumn'] = 'data' 00545 a['calmsselection'] = 'none' 00546 a['calmsname'] = '' 00547 a['calfield'] = '' 00548 a['calscan'] = '' 00549 a['calintent'] = '' 00550 a['field'] = '' 00551 a['spw'] = '' 00552 a['antenna'] = '' 00553 a['timebin'] = '0s' 00554 a['combine'] = '' 00555 a['timerange'] = '' 00556 a['scan'] = '' 00557 a['scanintent'] = '' 00558 a['array'] = '' 00559 a['uvrange'] = '' 00560 a['observation'] = '' 00561 00562 #a = sys._getframe(len(inspect.stack())-1).f_globals 00563 00564 if self.parameters['timebin'] != '0s': 00565 a['combine'] = '' 00566 00567 if self.parameters['createmms'] == True: 00568 a['separationaxis'] = 'scan' 00569 a['numsubms'] = 64 00570 00571 if self.parameters['calmsselection'] == 'auto': 00572 a['calmsname'] = '' 00573 00574 if self.parameters['calmsselection'] == 'manual': 00575 a['calmsname'] = '' 00576 a['calfield'] = '' 00577 a['calscan'] = '' 00578 a['calintent'] = '' 00579 00580 if a.has_key(paramname) : 00581 return a[paramname] 00582 partition_cli = partition_cli_()