casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables
parallel_task_helper.py
Go to the documentation of this file.
00001 #!/usr/bin/env python
00002 from taskinit import *
00003 import os
00004 import copy
00005 import shutil
00006 import simple_cluster
00007 import partitionhelper as ph
00008 
00009 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00010 import traceback
00011 
class ParallelTaskHelper:
    """
    Run a task once per sub-MS of a Multi-MS (MMS) and merge the results.

    The usual entry point is go(), which chains initialize(),
    generateJobs(), executeJobs() and postExecution().  Jobs are either
    handed to the simple_cluster framework or, when the class-wide
    bypass switch is set to 1, executed one after another in this
    process.

    NOTE(review): casalog, mstool and tbtool are not imported explicitly
    in this file; presumably they come from ``from taskinit import *``.
    """

    # Class-wide processing mode (see bypassParallelProcessing):
    #   0 => hand the jobs to the cluster
    #   1 => run each sub-MS job sequentially in this process
    #   2 => isParallelMS() reports False, so the MMS is not split up
    __bypass_parallel_processing = 0

    def __init__(self, task_name, args=None):
        """
        Store the task name and its arguments.

        task_name -- name of the task to run on every sub-MS
        args      -- dictionary of task arguments; it is used as-is (not
                     copied), so initialize() updates the caller's
                     dictionary.  A fresh dictionary is created when
                     omitted, so instances never share a default.
        """
        if args is None:
            args = {}
        self._arg = args
        # Per-sub-MS overrides registered through override_arg()
        self._arguser = {}
        self._taskName = task_name
        self._executionList = []
        self._jobQueue = None
        # Cache the initial inputs (shallow snapshot, so later changes to
        # self._arg do not alter it)
        self.__originalParams = copy.copy(args)
        # jagonzal: Add reference to cluster object
        if self.__bypass_parallel_processing == 0:
            self._cluster = simple_cluster.simple_cluster.getCluster()
        else:
            self._cluster = None
        # jagonzal: To inhibit return values consolidation
        self._consolidateOutput = True
        # jagonzal (CAS-4287): results collected in cluster-less mode,
        # keyed by the 'outputvis' (or 'vis') of each job
        self._sequential_return_list = {}

    def override_arg(self, arg, value):
        """
        Register per-sub-MS values for the argument "arg".

        generateJobs() indexes "value" with the position of each sub-MS,
        so it must hold one entry per sub-MS.
        """
        self._arguser[arg] = value

    def initialize(self):
        """
        This is the setup portion.
        Currently it:
           * Finds the full path for the input vis.
        """
        self._arg['vis'] = os.path.abspath(self._arg['vis'])

    def generateJobs(self):
        """
        Build one job per sub-MS referenced by the input vis.

        Each job gets a shallow copy of the task arguments with 'vis'
        replaced by the sub-MS and with the override_arg() values for
        that sub-MS applied.

        Returns True on success.  Any error is logged as a warning and
        reported by returning False.
        """
        casalog.origin("ParallelTaskHelper")

        # Defined before the try so that the cleanup below cannot fail
        # with a NameError when the tool could not even be created.
        msTool = None
        try:
            msTool = mstool()
            if not msTool.open(self._arg['vis']):
                raise ValueError("Unable to open MS %s," % self._arg['vis'])
            if not msTool.ismultims():
                raise ValueError(
                    "MS is not a MultiMS, simple parallelization failed")

            subMSList = msTool.getreferencedtables()
            # Same normalisation as getReferencedMSs(): a non-list result
            # is treated as a single sub-MS instead of being iterated
            # element by element.
            if not isinstance(subMSList, list):
                subMSList = [subMSList]

            for subMs_idx, subMS in enumerate(subMSList):
                localArgs = copy.copy(self._arg)
                localArgs['vis'] = subMS

                for key in self._arguser:
                    localArgs[key] = self._arguser[key][subMs_idx]

                self._executionList.append(
                    simple_cluster.JobData(self._taskName, localArgs))

            return True
        except Exception as instance:
            casalog.post("Error handling MMS %s: %s" % (self._arg['vis'],instance),"WARN","generateJobs")
            return False
        finally:
            if msTool is not None:
                msTool.close()

    def executeJobs(self):
        """
        Execute the jobs prepared by generateJobs().

        In sequential mode (bypass switch == 1) every job command line
        is executed in this process and its result is stored in
        self._sequential_return_list; errors are logged and the
        remaining jobs still run.  Otherwise the jobs are queued on the
        cluster through simple_cluster.JobQueueManager.
        """
        casalog.origin("ParallelTaskHelper")

        # jagonzal (CAS-4287): Add a cluster-less mode to by-pass parallel processing for MMSs as requested
        if self.__bypass_parallel_processing == 1:
            import sys

            for job in self._executionList:
                parameters = job.getCommandArguments()
                try:
                    # Run the command in an explicit namespace (seeded
                    # with the module globals) and read the result back
                    # from it, rather than relying on exec injecting a
                    # local variable into this function.
                    # NOTE(review): assumes the command line assigns its
                    # result to 'returnVar0' and needs no names local to
                    # this method -- confirm against simple_cluster.JobData.
                    namespace = dict(globals())
                    exec("from taskinit import *; from tasks import *; " + job.getCommandLine(), namespace)
                    returnVar0 = namespace['returnVar0']
                    # jagonzal: Special case for partition
                    if 'outputvis' in parameters:
                        self._sequential_return_list[parameters['outputvis']] = returnVar0
                    else:
                        self._sequential_return_list[parameters['vis']] = returnVar0
                except Exception as instance:
                    str_instance = str(instance)
                    # Only NullSelection errors are ignorable; everything
                    # else is a real failure (the previous test was
                    # inverted and hid real failures).
                    if "NullSelection" in str_instance:
                        casalog.post("Ignoring NullSelection error from %s" % (parameters['vis']),"INFO","executeJobs")
                    else:
                        casalog.post("Error running task sequentially %s: %s" % (job.getCommandLine(),str_instance),"WARN","executeJobs")
                        traceback.print_tb(sys.exc_info()[2])
            self._executionList = []
        else:
            self._jobQueue = simple_cluster.JobQueueManager(self._cluster)
            self._jobQueue.addJob(self._executionList)
            self._jobQueue.executeQueue()

    def postExecution(self):
        """
        Collect the per-sub-MS return values and consolidate them.

        Returns
          * None when there is neither a sequential result list nor a
            cluster to ask,
          * the raw {sub-MS: value} dictionary when consolidation is
            disabled (self._consolidateOutput is False) or the values
            are of a type that is not consolidated,
          * a single bool (True only if every sub-MS returned a true
            value) when the first value is a bool,
          * one summed dictionary (see sum_dictionaries) when the first
            value is a dict,
          * None when the first value is None.

        Raises ValueError when consolidation is requested but no sub-MS
        returned anything.
        """
        casalog.origin("ParallelTaskHelper")

        if self.__bypass_parallel_processing == 1:
            ret_list = self._sequential_return_list
            self._sequential_return_list = {}
        elif self._cluster is not None:
            ret_list = self._cluster.get_return_list()
        else:
            return None

        if not self._consolidateOutput:
            return ret_list

        if not ret_list:
            # Explicit error instead of an opaque "list index out of
            # range"; go() turns it into a warning and returns False.
            raise ValueError("No return values available from %s" % self._taskName)

        # The type of the first value decides how to consolidate.
        first_value = list(ret_list.values())[0]

        if isinstance(first_value, bool):
            retval = True
            for subMs in ret_list:
                if not ret_list[subMs]:
                    casalog.post("%s failed for sub-MS %s" % (self._taskName,subMs),"WARN","postExecution")
                    retval = False
            return retval

        if isinstance(first_value, dict):
            ret_dict = {}
            for subMs in ret_list:
                dict_i = ret_list[subMs]
                # jagonzal (CAS-4119): Neglectable NullSelection errors may cause flagdata to return None
                if isinstance(dict_i, dict):
                    try:
                        ret_dict = self.sum_dictionaries(dict_i, ret_dict)
                    except Exception as instance:
                        casalog.post("Error post processing MMS results %s: %s" % (subMs,instance),"WARN","postExecution")
            return ret_dict

        if first_value is None:
            return None

        return ret_list

    # jagonzal (CAS-4376): Consolidate list of return variables from the different engines into one single value
    def sum_dictionaries(self, dict_list, ret_dict):
        """
        Add the contents of "dict_list" into the accumulator "ret_dict".

        Nested dictionaries are merged recursively.  For other values, a
        key that is new to the accumulator is stored as-is; an existing
        value has the new one added to it unless it is a string, in
        which case the first value is kept.  The addition builds a new
        object, so values coming from the input dictionaries are never
        modified in place.

        Returns "ret_dict" (which is updated in place).
        """
        for key in dict_list:
            item = dict_list[key]
            if isinstance(item, dict):
                if key in ret_dict:
                    ret_dict[key] = self.sum_dictionaries(item, ret_dict[key])
                else:
                    ret_dict[key] = self.sum_dictionaries(item, {})
            elif key not in ret_dict:
                ret_dict[key] = item
            elif not isinstance(ret_dict[key], str):
                ret_dict[key] = ret_dict[key] + item
        return ret_dict

    def go(self):
        """
        Run the complete sequence: initialize, generate the jobs,
        execute them and post-process the results.

        Returns the value of postExecution(), or False when the jobs
        could not be generated or the post-processing failed.  The
        casalog origin is set back to the task name on every exit path.
        """
        import sys

        casalog.origin("ParallelTaskHelper")

        try:
            self.initialize()
            if self.generateJobs():
                self.executeJobs()
                try:
                    retVar = self.postExecution()
                except Exception as instance:
                    casalog.post("Error post processing MMS results %s: %s" % (self._arg['vis'],instance),"WARN","go")
                    traceback.print_tb(sys.exc_info()[2])
                    retVar = False
            else:
                retVar = False
        finally:
            # Restore casalog origin
            casalog.origin(self._taskName)

        return retVar

    @staticmethod
    def getReferencedMSs(vis):
        """
        Return the list of tables referenced by the MMS "vis".

        Raises ValueError when vis cannot be opened or is not a
        Multi-MS.  A non-list answer from the ms tool is wrapped in a
        one-element list.
        """
        msTool = mstool()
        try:
            if not msTool.open(vis):
                raise ValueError("Unable to open MS %s." % vis)

            if not msTool.ismultims():
                raise ValueError("MS %s is not a reference MS." % vis)

            rtnValue = msTool.getreferencedtables()
            if not isinstance(rtnValue, list):
                rtnValue = [rtnValue]
        finally:
            # Release the tool on the error paths as well
            msTool.close()

        return rtnValue

    @staticmethod
    def restoreSubtableAgreement(vis, mastersubms='', subtables=None):
        """
        Tidy up the MMS vis by replacing the subtables of all SubMSs
        by the subtables from the SubMS given by "mastersubms".
        If specified, only the subtables in the list "subtables"
        are replaced, otherwise all (None and [] both mean "all").
        If "mastersubms" is not given, the first SubMS of the MMS
        will be used as master.

        Subtables that are symbolic links are re-pointed to the master
        when they resolve somewhere else; real directories are replaced
        by a copy of the master's subtable.

        Raises ValueError when a requested subtable does not exist in
        the master SubMS.  Returns True.
        """
        msTool = mstool()
        try:
            msTool.open(vis)
            theSubMSs = msTool.getreferencedtables()
        finally:
            msTool.close()

        if mastersubms == '':
            tbTool = tbtool()
            try:
                tbTool.open(vis)
                myKeyw = tbTool.getkeywords()
            finally:
                tbTool.close()
            mastersubms = os.path.dirname(myKeyw['ANTENNA'].split(' ')[1]) #assume ANTENNA is present

        mastersubms = os.path.abspath(mastersubms)

        theSubTables = ph.getSubtables(mastersubms)

        if subtables is None or subtables == []:
            subtables = theSubTables
        else:
            for s in subtables:
                if s not in theSubTables:
                    raise ValueError(s+' is not a subtable of '+ mastersubms)

        masterbase = os.path.basename(mastersubms)

        for r in theSubMSs:
            if os.path.basename(r) == masterbase:
                continue
            for s in subtables:
                theSubTab = os.path.join(r, s)
                if os.path.islink(theSubTab): # don't copy over links
                    if os.path.basename(os.path.dirname(os.path.realpath(theSubTab))) != masterbase:
                        # the mastersubms has changed: make new link.
                        # shutil.rmtree refuses symbolic links, so the
                        # stale link has to be unlinked explicitly.  The
                        # relative target is resolved against the link's
                        # own directory, hence no chdir is needed.
                        os.remove(theSubTab)
                        os.symlink('../'+masterbase+'/'+s, theSubTab)
                else:
                    shutil.rmtree(theSubTab, ignore_errors=True)
                    shutil.copytree(mastersubms+'/'+s, theSubTab)

        return True

    @staticmethod
    def bypassParallelProcessing(switch=1):
        """
        jagonzal (CAS-4287): Add a cluster-less mode to by-pass parallel
        processing for MMSs as requested.

        Sets the class-wide processing mode:
        switch=0 => Use the cluster (initial value)
        switch=1 => Process each sub-Ms sequentially
        switch=2 => Process the MMS as a normal MS
        """
        ParallelTaskHelper.__bypass_parallel_processing = switch

    @staticmethod
    def getBypassParallelProcessing():
        """
        jagonzal (CAS-4287): Return the class-wide processing mode set
        by bypassParallelProcessing():
        switch=1 => Process each sub-Ms sequentially
        switch=2 => Process the MMS as a normal MS
        """
        return ParallelTaskHelper.__bypass_parallel_processing

    @staticmethod
    def isParallelMS(vis):
        """
        This method will let us know if we can do the simple form
        of parallelization by invoking on many referenced mss.

        Always False when the bypass switch is 2.  Raises ValueError
        when vis cannot be opened.
        """
        # jagonzal (CAS-4287): Add a cluster-less mode to by-pass parallel processing for MMSs as requested
        if ParallelTaskHelper.__bypass_parallel_processing == 2:
            return False

        msTool = mstool()
        try:
            if not msTool.open(vis):
                raise ValueError("Unable to open MS %s," % vis)
            rtnVal = msTool.ismultims() and \
                     isinstance(msTool.getreferencedtables(), list)
        finally:
            # Release the tool on the error path as well
            msTool.close()

        return rtnVal

    @staticmethod
    def findAbsPath(input):
        """
        Return the absolute path of a string, or the list of absolute
        paths for a list of strings.  Any other kind of value is
        returned unchanged.
        """
        if isinstance(input, str):
            return os.path.abspath(input)

        if isinstance(input, list):
            return [os.path.abspath(file_i) for file_i in input]

        # You're on your own, don't know what to do
        return input

    @staticmethod
    def listToCasaString(inputList):
        """
        This Method will take a list of integers and try to express them as a
        compact set using the CASA notation, e.g. [1, 2, 3, 5] -> '1~3,5'.

        None or an empty list gives ''.  The input is not modified: a
        sorted copy is used.
        """
        if inputList is None or len(inputList) == 0:
            return ''

        def selectionString(rangeStart, rangeEnd):
            if rangeStart == rangeEnd:
                return str(rangeStart)
            return "%d~%d" % (rangeStart, rangeEnd)

        orderedValues = sorted(inputList)
        compactStrings = []
        rangeStart = orderedValues[0]
        lastValue = orderedValues[0]
        for val in orderedValues[1:]:
            # A gap closes the current range and starts a new one
            if val > lastValue + 1:
                compactStrings.append(selectionString(rangeStart, lastValue))
                rangeStart = val
            lastValue = val
        compactStrings.append(selectionString(rangeStart, lastValue))

        return ','.join(compactStrings)