00001
00002 from taskinit import *
00003 import os
00004 import copy
00005 import shutil
00006 import simple_cluster
00007 import partitionhelper as ph
00008
00009
00010 import traceback
00011
class ParallelTaskHelper:
    """
    Extension of the TaskHelper that runs a task over the sub-MSs of a
    Multi-MS (MMS), one job per sub-MS.

    For simple tasks all that should be required to make a task parallel
    is to use this helper rather than the TaskHelper: build it with the
    task name and the task arguments and call go().

    go() runs the following steps in order; each one can be overridden:
    initialize(), generateJobs(), executeJobs() and postExecution().
    """

    # Processing mode shared by every helper (see bypassParallelProcessing):
    #   0 => dispatch the jobs through the cluster
    #   1 => process each sub-MS sequentially in this process
    #   2 => process the MMS as a normal MS (isParallelMS() answers False)
    __bypass_parallel_processing = 0

    def __init__(self, task_name, args=None):
        """
        Keyword arguments:
        task_name -- name of the task to run on every sub-MS
        args      -- dictionary with the task parameters. It is kept by
                     reference, so initialize() also updates the 'vis'
                     entry of the dictionary owned by the caller.
        """
        # Build a fresh dictionary per instance: a literal {} default is
        # created once and would be shared (and modified) by all helpers.
        if args is None:
            args = {}

        self._arg = args
        self._arguser = {}
        self._taskName = task_name
        self._executionList = []
        self._jobQueue = None

        # Shallow snapshot, so that it really keeps the parameters as they
        # were given even after initialize() has rewritten self._arg
        self.__originalParams = copy.copy(args)

        # The cluster is only needed when the jobs are really dispatched
        if self.__bypass_parallel_processing == 0:
            self._cluster = simple_cluster.simple_cluster.getCluster()
        else:
            self._cluster = None

        # When True postExecution() merges the per sub-MS results
        self._consolidateOutput = True

        # Results of the sequential mode, indexed by outputvis (or vis)
        self._sequential_return_list = {}

    def override_arg(self, arg, value):
        """
        Register per sub-MS values for the parameter arg.

        generateJobs() reads value[i] for the i-th sub-MS, so value has to
        be a sequence with one entry per sub-MS.
        """
        self._arguser[arg] = value

    def initialize(self):
        """
        This is the setup portion.
        Currently it:
           * Finds the full path for the input vis.
        """
        self._arg['vis'] = os.path.abspath(self._arg['vis'])

    def generateJobs(self):
        """
        This is the method which generates all of the actual jobs to be
        done. The default is to assume the input vis is a reference ms and
        build one job for each referenced ms.

        Returns True when the jobs were added to the execution list and
        False (after posting a warning) when anything went wrong; in that
        case the execution list is left untouched.
        """
        casalog.origin("ParallelTaskHelper")

        # Bound before the try so that the cleanup below never trips over
        # an undefined name when the tool itself cannot be created
        msTool = None
        try:
            msTool = mstool()
            if not msTool.open(self._arg['vis']):
                raise ValueError("Unable to open MS %s," % self._arg['vis'])
            if not msTool.ismultims():
                raise ValueError(
                    "MS is not a MultiMS, simple parallelization failed")

            subMSs = msTool.getreferencedtables()
            if isinstance(subMSs, str):
                # A single name must not be iterated character by character
                subMSs = [subMSs]

            jobs = []
            for subMs_idx, subMS in enumerate(subMSs):
                localArgs = copy.copy(self._arg)
                localArgs['vis'] = subMS

                # Apply the per sub-MS values set through override_arg()
                for key in self._arguser:
                    localArgs[key] = self._arguser[key][subMs_idx]

                jobs.append(simple_cluster.JobData(self._taskName, localArgs))

            # Only publish the jobs once all of them could be built
            self._executionList.extend(jobs)
            return True
        except Exception as instance:
            casalog.post("Error handling MMS %s: %s" % (self._arg['vis'],instance),"WARN","generateJobs")
            return False
        finally:
            if msTool is not None:
                msTool.close()

    def executeJobs(self):
        """
        Run the jobs of the execution list.

        In sequential mode (bypass switch 1) every job is executed in this
        process, its result is stored in the sequential return list and
        the execution list is emptied. Otherwise the jobs are handed over
        to a simple_cluster.JobQueueManager built on the cluster.
        """
        casalog.origin("ParallelTaskHelper")

        if self.__bypass_parallel_processing == 1:
            for job in self._executionList:
                parameters = job.getCommandArguments()
                try:
                    # NOTE(review): the command line is expected to bind
                    # returnVar0 -- confirm against simple_cluster.JobData.
                    # It is executed in the current scope on purpose, as
                    # the result is read back as a plain name below; the
                    # text must never come from untrusted input.
                    exec("from taskinit import *; from tasks import *; " + job.getCommandLine())

                    if 'outputvis' in parameters:
                        self._sequential_return_list[parameters['outputvis']] = returnVar0
                    else:
                        self._sequential_return_list[parameters['vis']] = returnVar0
                except Exception as instance:
                    str_instance = str(instance)
                    # Only NullSelection errors are tolerated silently,
                    # anything else is reported with its traceback
                    if "NullSelection" in str_instance:
                        casalog.post("Ignoring NullSelection error from %s" % (parameters['vis']),"INFO","executeJobs")
                    else:
                        casalog.post("Error running task sequentially %s: %s" % (job.getCommandLine(),str_instance),"WARN","executeJobs")
                        traceback.print_tb(sys.exc_info()[2])
            self._executionList = []
        else:
            self._jobQueue = simple_cluster.JobQueueManager(self._cluster)
            self._jobQueue.addJob(self._executionList)
            self._jobQueue.executeQueue()

    def postExecution(self):
        """
        Collect the results of the jobs and, if the output has to be
        consolidated, merge them in a single return value:
           * booleans     -> True only if every sub-MS returned True
           * dictionaries -> merged with sum_dictionaries()
           * None         -> None
        Any other kind of result, or a helper that does not consolidate
        its output, gets the dictionary of results indexed by sub-MS.

        Returns None when there is neither a sequential run nor a cluster.
        Raises IndexError when no job produced a result; go() turns that
        into a warning and a False return value.
        """
        casalog.origin("ParallelTaskHelper")

        if self.__bypass_parallel_processing == 1:
            ret_list = self._sequential_return_list
            self._sequential_return_list = {}
        elif self._cluster is not None:
            ret_list = self._cluster.get_return_list()
        else:
            return None

        if not ret_list:
            # Same exception type as the bare [0] lookup used to raise,
            # but with a message that explains what happened
            raise IndexError("No results were returned by the sub-MS jobs")

        if not self._consolidateOutput:
            return ret_list

        # The type of one result decides how all of them are merged
        first_result = list(ret_list.values())[0]

        if isinstance(first_result, bool):
            retval = True
            for subMs in ret_list:
                if not ret_list[subMs]:
                    casalog.post("%s failed for sub-MS %s" % (self._taskName,subMs),"WARN","postExecution")
                    retval = False
            return retval

        if isinstance(first_result, dict):
            ret_dict = {}
            for subMs in ret_list:
                dict_i = ret_list[subMs]

                # Results which are not dictionaries are skipped
                if isinstance(dict_i, dict):
                    try:
                        ret_dict = self.sum_dictionaries(dict_i, ret_dict)
                    except Exception as instance:
                        casalog.post("Error post processing MMS results %s: %s" % (subMs,instance),"WARN","postExecution")
            return ret_dict

        if first_result is None:
            return None

        return ret_list

    def sum_dictionaries(self, dict_list, ret_dict):
        """
        Add the contents of the dictionary dict_list to the accumulator
        ret_dict and return the accumulator.

        Nested dictionaries are merged recursively. A key seen for the
        first time is stored as a shallow copy, so that later additions
        do not modify the dictionary it came from. A key already present
        is combined with "+=", except when the accumulated value is a
        string, which is kept as it is.
        """
        for key in dict_list:
            item = dict_list[key]
            if isinstance(item, dict):
                ret_dict[key] = self.sum_dictionaries(item, ret_dict.get(key, {}))
            elif key not in ret_dict:
                ret_dict[key] = copy.copy(item)
            elif not isinstance(ret_dict[key], str):
                ret_dict[key] += item
        return ret_dict

    def go(self):
        """
        Run the whole sequence: initialize(), generateJobs(),
        executeJobs() and postExecution().

        Returns the value of postExecution(), or False when the jobs
        could not be generated or the post processing failed. The log
        origin is set back to the task name in every case.
        """
        casalog.origin("ParallelTaskHelper")

        try:
            self.initialize()
            if self.generateJobs():
                self.executeJobs()
                try:
                    retVar = self.postExecution()
                except Exception as instance:
                    casalog.post("Error post processing MMS results %s: %s" % (self._arg['vis'],instance),"WARN","go")
                    traceback.print_tb(sys.exc_info()[2])
                    retVar = False
            else:
                retVar = False
        finally:
            # Restore the origin also on the failure paths
            casalog.origin(self._taskName)

        return retVar

    @staticmethod
    def getReferencedMSs(vis):
        """
        Return the list of the tables referenced by the MMS vis.

        Raises ValueError if vis cannot be opened or is not a MultiMS.
        """
        msTool = mstool()
        if not msTool.open(vis):
            raise ValueError("Unable to open MS %s." % vis)

        try:
            if not msTool.ismultims():
                raise ValueError("MS %s is not a reference MS." % vis)
            rtnValue = msTool.getreferencedtables()
        finally:
            # Do not leave the MS open when it turns out not to be an MMS
            msTool.close()

        if not isinstance(rtnValue, list):
            rtnValue = [rtnValue]
        return rtnValue

    @staticmethod
    def restoreSubtableAgreement(vis, mastersubms='', subtables=[]):
        """
        Tidy up the MMS vis by replacing the subtables of all SubMSs
        by the subtables from the SubMS given by "mastersubms".
        If specified, only the subtables in the list "subtables"
        are replaced, otherwise all.
        If "mastersubms" is not given, the first SubMS of the MMS
        will be used as master.

        Subtables which are symbolic links are pointed to the master,
        the other ones are replaced by a copy of the master subtable.
        Raises ValueError if a requested subtable does not exist in the
        master. Returns True.
        """
        # The default list of "subtables" is only read, never modified

        msTool = mstool()
        msTool.open(vis)
        try:
            theSubMSs = msTool.getreferencedtables()
        finally:
            msTool.close()

        if mastersubms == '':
            # Take as master the SubMS named by the ANTENNA keyword
            tbTool = tbtool()
            tbTool.open(vis)
            try:
                myKeyw = tbTool.getkeywords()
            finally:
                tbTool.close()
            mastersubms = os.path.dirname(myKeyw['ANTENNA'].split(' ')[1])

        mastersubms = os.path.abspath(mastersubms)

        theSubTables = ph.getSubtables(mastersubms)

        if subtables == []:
            subtables = theSubTables
        else:
            for s in subtables:
                if s not in theSubTables:
                    raise ValueError(s+' is not a subtable of '+ mastersubms)

        masterbase = os.path.basename(mastersubms)

        for r in theSubMSs:
            if os.path.basename(r) == masterbase:
                # The master itself needs no repair
                continue

            for s in subtables:
                theSubTab = r+'/'+s
                if os.path.islink(theSubTab):
                    linkedSubMS = os.path.basename(os.path.dirname(os.path.realpath(theSubTab)))
                    if linkedSubMS != masterbase:
                        # The link points to another SubMS: make a new one.
                        # rmtree refuses to work on symbolic links, so the
                        # old link has to be removed as a plain file; the
                        # new one is created through its full path, which
                        # needs no change of the working directory.
                        os.remove(theSubTab)
                        os.symlink('../'+masterbase+'/'+s, theSubTab)
                else:
                    shutil.rmtree(theSubTab, ignore_errors=True)
                    shutil.copytree(mastersubms+'/'+s, theSubTab)

        return True

    @staticmethod
    def bypassParallelProcessing(switch=1):
        """
        # jagonzal (CAS-4287): Add a cluster-less mode to by-pass parallel processing for MMSs as requested
        switch=1 => Process each sub-Ms sequentially
        switch=2 => Process the MMS as a normal MS
        """
        ParallelTaskHelper.__bypass_parallel_processing = switch

    @staticmethod
    def getBypassParallelProcessing():
        """
        # jagonzal (CAS-4287): Add a cluster-less mode to by-pass parallel processing for MMSs as requested
        switch=1 => Process each sub-Ms sequentially
        switch=2 => Process the MMS as a normal MS
        """
        return ParallelTaskHelper.__bypass_parallel_processing

    @staticmethod
    def isParallelMS(vis):
        """
        This method will let us know if we can do the simple form
        of parallelization by invoking on many referenced mss.

        Always False when the bypass switch is 2. Raises ValueError if
        vis cannot be opened.
        """
        if ParallelTaskHelper.__bypass_parallel_processing == 2:
            return False

        msTool = mstool()
        if not msTool.open(vis):
            raise ValueError("Unable to open MS %s," % vis)

        try:
            rtnVal = msTool.ismultims() and \
                     isinstance(msTool.getreferencedtables(), list)
        finally:
            msTool.close()
        return rtnVal

    @staticmethod
    def findAbsPath(input):
        """
        Return the absolute path of a file name, or the list of the
        absolute paths for a list of names. Anything else is returned
        unchanged.
        """
        if isinstance(input, str):
            return os.path.abspath(input)

        if isinstance(input, list):
            return [os.path.abspath(file_i) for file_i in input]

        return input

    @staticmethod
    def listToCasaString(inputList):
        """
        This Method will take a list of integers and try to express them as a
        compact set using the CASA notation, e.g. [1,2,3,7] gives '1~3,7'.

        The input list is left untouched; None or an empty list give ''.
        """
        if inputList is None or len(inputList) == 0:
            return ''

        def selectionString(rangeStart, rangeEnd):
            if rangeStart == rangeEnd:
                return str(rangeStart)
            return "%d~%d" % (rangeStart, rangeEnd)

        # Work on a sorted copy instead of reordering the list of the caller
        values = sorted(inputList)
        compactStrings = []
        rangeStart = values[0]
        lastValue = values[0]
        for val in values[1:]:
            if val > lastValue + 1:
                # Gap found: close the current range and start a new one
                compactStrings.append(selectionString(rangeStart, lastValue))
                rangeStart = val
            lastValue = val
        compactStrings.append(selectionString(rangeStart, lastValue))

        return ','.join(compactStrings)