simple_cluster.py
00001 import os
00002 import math
00003 import time
00004 import thread
00005 import commands
00006 import numpy as np
00007 from taskinit import *
00008 from tasksinfo import *
00009 from parallel_go import *
00010 
00011 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00012 import traceback

# The following modules are also used below (atexit, socket, string, inspect, sys).
# They may already be provided by the wildcard imports above, but are listed
# explicitly here for clarity.
import atexit
import socket
import string
import inspect
import sys
00013 
00014 class simple_cluster:
00015     """The simple_cluster creates and maintains an ipcluster environment
00016     for controlling parallel execution of casa tasks (tools and scripts)
00017     """
00018     
00019     __default_mem_per_engine = 512
00020     __default_mem_fraction = 0.9
00021     __default_cpu_fraction = 0.9
00022     
00023     
00024     def __init__(self,monitoringFile='monitoring.log',verbose=False):
00025         
00026         self._project=""
00027         self._hosts=[]
00028         self._jobs={}
00029         self._rsrc={}
00030         self._job_title=1
00031         self._monitor_on=True
00032         self._monitor_running=False
00033         self._monitoringFile=monitoringFile
00034         self._verbose=verbose
00035         self._resource_on=True
00036         self._resource_running=False
00037         self._configdone=False
00038         self._cluster=cluster()
00039         self._JobQueueManager=None
00040         self._enginesRWoffsets={}
00041         self.__localCluster = False
00042         self.__running = False
00043         # jagonzal: This is basically the destructor (i.e. for graceful finalization)
00044         atexit.register(simple_cluster.stop_cluster,self)
00045 
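    # Illustrative usage sketch (not part of the original file): constructing a
    # simple_cluster directly, with a custom monitoring file and verbose output,
    # and then initializing it from a cluster configuration file.  The file name
    # 'cluster-config.txt' and project name 'test-rhel' are example values taken
    # from the check_resource docstring further below; init_cluster is defined
    # elsewhere in this module.
    #
    #   from simple_cluster import simple_cluster
    #   sc = simple_cluster(monitoringFile='mycustomfile.log', verbose=True)
    #   sc.init_cluster('cluster-config.txt', 'test-rhel')
    #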
00046     # jagonzal (CAS-4372): By-pass ssh redirection when deploying engines in localhost 
00047     def shell(self, hostname):
00048       """Creates the command-line prefix used to execute a command on the given host.
00049       If and only if the host is not localhost, ssh is used."""
00050 
00051       if self.uniqueIP(hostname) == self.uniqueIP("localhost"):
00052          return "eval "
00053       else:
00054          # f: Requests ssh to go to background just before command execution.
00055          # q: Quiet mode.  Causes all warning and diagnostic messages to be suppressed.
00056          # x: Disables X11 forwarding.
00057          return "ssh -fqx " + hostname
00058         
00059     def uniqueIP(self, hostname):
00060       """Returns a unique IP address of the given hostname,
00061       i.e. not 127.0.0.1 for localhost but localhost's global IP"""
00062       
00063       ip = socket.gethostbyname(hostname)
00064       
00065       if ip == "127.0.0.1":
00066          ip = socket.gethostbyname(socket.getfqdn())
00067          
00068       return ip
00069   
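    # Illustrative sketch (not part of the original file): shell() and uniqueIP()
    # are combined to build remote command lines, as done in check_host_resources
    # and check_resource below.  'some-node' is a hypothetical hostname.
    #
    #   sc = simple_cluster()
    #   cmd = sc.shell('some-node') + " 'uname -s'"   # "eval " locally, "ssh -fqx ..." remotely
    #   print commands.getoutput(cmd)
    #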
00070     ####################################################################
00071     # Static method that returns whatever the current definition of a
00072     # cluster is.  If none is defined the default cluster is created,
00073     # initialized and returned.
00074     ####################################################################
00075     @staticmethod
00076     def getCluster():
00077         # This method will check for a cluster existing at the global
00078         # scope.  If it does not exist then a default cluster will be
00079         # created.
00080         # The cluster object is returned.
00081         myf = sys._getframe(len(inspect.stack())-1).f_globals
00082         if not 'procCluster' in myf.keys():
00083             sc = simple_cluster()
00084             if (sc.init_cluster()):
00085                 return myf['procCluster']
00086             else:
00087                 return None
00088         else:
00089             sc = myf['procCluster']
00090             if not sc.isClusterRunning():
00091                 if (sc.init_cluster()):
00092                     return myf['procCluster']
00093                 else:
00094                     return None
00095             else:
00096                 return myf['procCluster']
00097     
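    # Illustrative sketch (not part of the original file): the usual way to obtain
    # the shared cluster from an interactive CASA session, matching the examples
    # in the check_resource docstring further below.
    #
    #   from simple_cluster import simple_cluster
    #   sc = simple_cluster.getCluster()
    #   if sc is not None:
    #       print sc.get_hosts()
    #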
00098     @staticmethod 
00099     def setDefaults(default_mem_per_engine=512,default_mem_fraction=0.9,default_cpu_fraction=0.9):
00100         
00101         simple_cluster.__default_mem_per_engine = default_mem_per_engine
00102         simple_cluster.__default_mem_fraction = default_mem_fraction
00103         simple_cluster.__default_cpu_fraction = default_cpu_fraction
00104         
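    # Illustrative sketch (not part of the original file): tightening the class-wide
    # defaults before any cluster is created, e.g. reserving more memory per engine
    # and a smaller fraction of the host resources.  The numbers are only examples.
    #
    #   simple_cluster.setDefaults(default_mem_per_engine=1024,
    #                              default_mem_fraction=0.8,
    #                              default_cpu_fraction=0.8)
    #   sc = simple_cluster.getCluster()
    #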
00105     def isClusterRunning(self):
00106         return self.__running
00107     
00108     ###########################################################################
00109     ###   cluster verification
00110     ###########################################################################
00111     def config_cluster(self, cfg, force=False):
00112         """Read the configuration file and validate cluster definitions.  
00113 
00114         Keyword arguments:
00115         cfg -- the name of cluster configuration file
00116         force -- whether or not to reconfigure if a configured cluster exists
00117 
00118         A configuration file is an ASCII text file. Each line defines a node
00119         (also called host) with one line per host to be used, and the following 
00120         format: 
00121         
00122         - <hostname>, <number of engines>, <work directory>
00123         - <hostname>, <number of engines>, <work directory>, <fraction of total RAM>
00124         - <hostname>, <number of engines>, <work directory>, <fraction of total RAM>, <RAM per engine>
00125         
00126         where the interpretation of the parameters is as follows: 
00127         
00128         - hostname: Hostname of the target node where the cluster is deployed 
00129         
00130           NOTE: The hostname has to be provided w/o quotes
00131         
00132         - number of engines: Supports 3 different formats 
00133         
00134             * If provided as an integer >1: It is interpreted as 
00135               the actual user-specified maximum number of engines
00136               
00137             * If provided as an integer =0: It will deploy as many 
00138               engines as possible, according to the idle CPU capacity 
00139               available at the target node
00140               
00141             * If provided as a float between 0 and 1: It is interpreted 
00142               as the percentage of idle CPU capacity that the cluster 
00143               can use in total at the target node
00144 
00145         - work directory: Area in which the cluster will put intermediate 
00146           files such as log files, configuration files, and monitoring files
00147         
00148           NOTE1: This area has to be accessible from the controller (user) machine, 
00149                  and mounted at the same filesystem path 
00150                  
00151           NOTE2: The path name has to be provided w/o quotes
00152         
00153         - fraction of total RAM: Supports 3 different formats:
00154         
00155             * If provided as an integer >1: It is interpreted as the actual 
00156               user-specified maximum amount of RAM to be used in total at 
00157               the target node
00158               
00159             * If provided as an integer =0: It will deploy as many engines 
00160               as possible, according to the free RAM available at the target node
00161               
00162             * If provided as a float between 0 and 1: It is interpreted as 
00163               the percentage of free RAM that the cluster can use in total 
00164               at the target node
00165             
00166         - RAM per engine: Integer, which is interpreted as the required memory 
00167           per engine in MB (default is 512MB) 
00168           
00169         It is also possible to add comments, by using the # character at the 
00170         beginning of the line. Example:
00171         
00172         #####################################################
00173         
00174         # CASA cluster configuration file for expert user
00175         orion, 10, /home/jdoe/test/myclusterhome1
00176         m42, 4, /home/jdoe/test/myclusterhome2, 0.6, 1024
00177         antares, 0.6, /home/jdoe/test/myclusterhome3, 0, 2048
00178         
00179         #####################################################
00180         
00181         - At host ``orion'': It will deploy up to 10 engines, with working 
00182           directory /home/jdoe/test/myclusterhome1, and using as much of the 
00183           free RAM available as possible (up to 90% by default), taking into 
00184           account that each engine can use up to 512 MB (the default and minimum)
00185           
00186         - At host ``m42'': It will deploy up to 4 engines, with working directory 
00187           /home/jdoe/test/myclusterhome2, and using at the most 60% of the free RAM 
00188           available, taking into account that each engine can use up to 1024 MB.   
00189           
00190         - At host ``antares'': It will deploy as many engines as possible, with 
00191           working directory /home/jdoe/test/myclusterhome3, using up to 60% of the 
00192           idle CPU capacity / cores, and as much of the free RAM available as possible 
00193           (up to 90% by default), taking into account that each engine can use up 
00194           to 2048 MB.  
00195  
00196         Normally, one does not call this function directly. 
00197         The init_cluster function will trigger this function.
00198 
00199         """
00200         
00201         casalog.origin("simple_cluster")
00202         
00203         if (len(self._hosts)>0 or self._configdone) and not force:
00204             casalog.post("Cluster already configured","WARN","config_cluster")
00205             return
00206         self._hosts=[]
00207         self._jobs={}
00208         configfile=open(cfg,'r')
00209         s=configfile.readlines()
00210         for line in s:
00211             sLine=line.rstrip()
00212             if str.find(sLine, '#')==0:
00213                 continue
00214             xyzd=str.split(sLine, ',')
00215             if len(xyzd)<3:
00216                 casalog.post("Node config should be at least: 'hostname, numengine, workdir'","WARN","config_cluster")
00217                 casalog.post("The following entry will be ignored: %s" % sLine,"WARN","config_cluster")
00218                 continue
00219             # jagonzal (CAS-4276): (Max) number of engines is not an integer any more
00220             # try:
00221             #     a=int(xyzd[1])
00222             # except:
00223             #     print 'the numofengines should be a integer instead of:', xyzd[1]
00224             #     print 'this entry will be ignored'
00225             #     continue
00226             [a, b, c]=[str.strip(xyzd[0]), float(xyzd[1]), str.strip(xyzd[2])]
00227             if len(a)<1:
00228                 casalog.post("Hostname can not be empty","WARN","config_cluster")
00229                 casalog.post("The following entry will be ignored: %s" % sLine,"WARN","config_cluster")
00230                 continue
00231             if len(c)<1:
00232                 casalog.post("Workdir can not be empty","WARN","config_cluster")
00233                 casalog.post("The following entry will be ignored: %s" % sLine,"WARN","config_cluster")
00234                 continue
00235             
00236             ### jagonzal (CAS-4276): New cluster specification file ###
00237              
00238             # Retrieve available resources to cap number of engines deployed 
00239             hostname = str(xyzd[0])
00240             (ncores_available,memory_available,cpu_available) = self.check_host_resources(hostname)
00241 
00242             # Determine maximum number of engines that can be deployed at target node
00243             max_engines_user = b
00244             if (max_engines_user<=0):
00245                 max_engines = int(self.__default_cpu_fraction*(cpu_available/100.0)*ncores_available)
00246                 if (max_engines < 2):
00247                     casalog.post("CPU free capacity available %s of %s cores would not support cluster mode at node %s, starting only 1 engine" 
00248                                  %(str(cpu_available),str(ncores_available),hostname),"WARN","config_cluster")
00249                     max_engines = 1                
00250             elif (max_engines_user<=1):
00251                 max_engines = int(max_engines_user*ncores_available)
00252             else:
00253                 max_engines = int(max_engines_user)
00254             
00255             casalog.post("Will deploy up to %s engines at node %s" % (str(max_engines),hostname), "INFO","config_cluster")
00256             
00257             # Determine maximum memory that can be used at target node
00258             if len(xyzd)>=4:
00259                 max_mem_user = float(xyzd[3])
00260                 if (max_mem_user<=0):
00261                     max_mem = int(round(self.__default_mem_fraction*memory_available))
00262                 elif (max_mem_user<=1):
00263                     max_mem = int(round(max_mem_user*memory_available))
00264                 else:
00265                     max_mem = int(max_mem_user)
00266             else:
00267                 max_mem = int(round(self.__default_mem_fraction*memory_available))
00268                     
00269                     
00270             casalog.post("Will use up to %sMB of memory at node %s" % (str(max_mem),hostname), "INFO","config_cluster")
00271             
00272             # User can provide an estimation of the amount of RAM memory necessary per engine        
00273             mem_per_engine = self.__default_mem_per_engine
00274             if len(xyzd)>=5:
00275                 mem_per_engine_user = float(xyzd[4])
00276                 if (mem_per_engine_user>self.__default_mem_per_engine):
00277                     mem_per_engine = mem_per_engine_user
00278             
00279             # Apply heuristics: If memory limits number of engines then we can increase the number of openMP threads
00280             nengines=int(max_mem/mem_per_engine)
00281             if (nengines < 2):
00282                 casalog.post("Free memory available %sMB would not support cluster mode at node %s, starting only 1 engine"
00283                              % (str(max_mem),hostname), "WARN","config_cluster")
00284                 nengines=1
00285             else:
00286                 casalog.post("Required memory per engine %sMB allows deploying up to %s engines at node %s " 
00287                              % (str(mem_per_engine),str(nengines),hostname), "INFO","config_cluster")
00288         
00289             nengines = int(nengines)
00290             if (nengines>max_engines): 
00291                 casalog.post("Cap number of engines deployed at node %s from %s to %s in order to meet maximum number of engines constraint" 
00292                              % (hostname,str(nengines),str(max_engines)), "INFO","config_cluster")
00293                 nengines = max_engines       
00294             
00295             omp_num_nthreads = 1
00296             while (nengines*omp_num_nthreads*2 <= max_engines):
00297                 omp_num_nthreads *= 2
00298                 
00299             if (omp_num_nthreads>1):
00300                 casalog.post("Enabling openMP to %s threads per engine at node %s" % (str(omp_num_nthreads),hostname), "INFO","config_cluster")
00301                 
00302             self._hosts.append([a, nengines, c, omp_num_nthreads])
00303     
00304         self._configdone=self.validate_hosts()
00305 
00306         if not self._configdone:
00307             casalog.post("Failed to configure the cluster", "WARN","config_cluster")
00308 
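    # Illustrative trace of the deployment heuristic above (hypothetical numbers,
    # not from the original file).  For a configuration line
    #
    #   orion, 0, /home/jdoe/test/myclusterhome1, 0, 2048
    #
    # on a node where check_host_resources() reports 16 cores, 8192 MB of free
    # memory and 90% idle CPU:
    #
    #   max_engines    = int(0.9 * (90/100.) * 16)   -> 12
    #   max_mem        = int(round(0.9 * 8192))      -> 7373 MB
    #   mem_per_engine = 2048 MB (user value, above the 512 MB default)
    #   nengines       = int(7373 / 2048)            -> 3
    #   OMP threads    : 1 -> 2 -> 4   (doubled while 3*threads*2 <= 12)
    #
    # so the entry appended to self._hosts would be
    #   ['orion', 3, '/home/jdoe/test/myclusterhome1', 4]
    # i.e. [hostname, number of engines, work directory, OMP_NUM_THREADS].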
00309     def check_host_resources(self,hostname):
00310         """
00311         jagonzal (CAS-4276 - New cluster specification file): Retrieve resources available
00312         at target node in order to dynamically deploy the engines to fit the idle capacity
00313         """
00314         
00315         casalog.origin("simple_cluster")
00316         
00317         ncores = 0
00318         memory = 0.
        cpu = 0.  # ensure cpu is defined for the final log line even if parsing fails below
00319         cmd_os = self.shell(hostname) + " 'uname -s'"
00320         os_name = commands.getoutput(cmd_os)  # avoid shadowing the os module
00321         if (os_name == "Linux"):
00322             cmd_ncores = self.shell(hostname) + " 'cat /proc/cpuinfo' "           
00323             res_ncores = commands.getoutput(cmd_ncores)
00324             str_ncores = res_ncores.count("processor")
00325             
00326             try:
00327                 ncores = int(str_ncores)
00328             except:
00329                 casalog.post("Problem converting number of cores into numerical format at node %s: %s" % (hostname,str_ncores),"WARN","check_host_resources")
00330                 pass
00331             
00332             memory=0.0
00333             for memtype in ['MemFree',
00334                             'Buffers',
00335                             'Cached' # "Cached" has the problem that there can also be "SwapCached"
00336                             ]:
00337                 cmd_memory = self.shell(hostname) + " 'cat /proc/meminfo | grep -v SwapCached | grep "+memtype+"'"
00338                 str_memory = commands.getoutput(cmd_memory)
00339                 str_memory = string.replace(str_memory,memtype+':','')
00340                 str_memory = string.replace(str_memory,"kB","")
00341                 str_memory = string.replace(str_memory," ","")
00342             
00343                 try:
00344                     memory += float(str_memory)/1024.
00345                 except:
00346                     casalog.post("Problem converting memory into numerical format at node %s: %s" % (hostname,str_memory),"WARN","check_host_resources")
00347                     break
00348             
00349             cmd_cpu = self.shell(hostname) + " 'top -b -n 1 | grep Cpu' "
00350             str_cpu = commands.getoutput(cmd_cpu)
00351             list_cpu = string.split(str_cpu,',')
00352             str_cpu = "100"
00353             for item in list_cpu:
00354                 if item.count("%id")>0:
00355                     str_cpu = string.replace(item,"%id","")
00356                     str_cpu = string.replace(str_cpu," ","")
00357             
00358             try:
00359                 cpu = float(str_cpu)
00360             except:
00361                 casalog.post("Problem converting available cpu into numerical format at node %s: %s" % (hostname,str_cpu),"WARN","check_host_resources")
00362             
00363         else: # Mac OSX
00364             cmd_ncores = self.shell(hostname) + " '/usr/sbin/sysctl -n hw.ncpu'"
00365             res_ncores = commands.getoutput(cmd_ncores)
00366             str_ncores = res_ncores
00367             
00368             try:
00369                 ncores = int(str_ncores)
00370             except:
00371                 casalog.post("Problem converting number of cores into numerical format at node %s: %s" % (hostname,str_ncores),"WARN","check_host_resources")
00372                 pass            
00373             
00374             cmd_memory = self.shell(hostname) + " 'top -l 1 | grep PhysMem: | cut -d , -f5 ' "
00375             str_memory = commands.getoutput(cmd_memory)
00376             str_memory = string.replace(str_memory,"M free.","")
00377             str_memory = string.replace(str_memory," ","")
00378 
00379             try:
00380                 memory = float(str_memory)
00381             except:
00382                 casalog.post("Problem converting memory into numerical format at node %s: %s" % (hostname,str_memory),"WARN","check_host_resources")
00383                 pass
00384             
00385             cmd_cpu = self.shell(hostname) + " 'top -l1 | grep usage' "
00386             str_cpu = commands.getoutput(cmd_cpu)
00387             list_cpu = string.split(str_cpu,',')
00388             str_cpu = "100"
00389             for item in list_cpu:
00390                 if item.count("% idle")>0:
00391                     str_cpu = string.replace(item,"% idle","")
00392                     str_cpu = string.replace(str_cpu," ","")
00393             
00394             try:
00395                 cpu = float(str_cpu)
00396             except:
00397                 casalog.post("Problem converting available cpu into numerical format at node %s: %s" % (hostname,str_cpu),"WARN","check_host_resources")            
00398         
00399         ncores = int(ncores)
00400         memory = int(round(memory))
00401         casalog.post("Host %s, Number of cores %s, Memory available %sMB, CPU capacity available %s%%" % (hostname,str(ncores),str(memory),str(cpu)), "INFO","check_host_resources")
00402         
00403         return (ncores,memory,cpu)
00404             
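    # Illustrative sketch (not part of the original file): the tuple returned by
    # check_host_resources() is (number of cores, free memory in MB, idle CPU in %),
    # as consumed by config_cluster() above.  'some-node' is a hypothetical hostname.
    #
    #   sc = simple_cluster()
    #   (ncores, free_mem_mb, idle_cpu_pct) = sc.check_host_resources('some-node')
    #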
00405     
00406     def validate_hosts(self):
00407         """Validate the cluster specification.
00408 
00409         This function is normally called internally by the config_cluster
00410         function. 
00411 
00412         """
00413         
00414         casalog.origin("simple_cluster")
00415         
00416         # jagonzal (CAS-4287): Add a cluster-less mode to by-pass parallel processing for MMSs as requested 
00417         if (len(self._hosts)==1):
00418             if (self._hosts[0][1] == 1):
00419                 if (self.uniqueIP(self._hosts[0][0]) == self.uniqueIP("localhost")):
00420                     casalog.post("Only one engine can be deployed at localhost, disabling cluster mode","WARN","validate_hosts")
00421                     from parallel.parallel_task_helper import ParallelTaskHelper
00422                     ParallelTaskHelper.bypassParallelProcessing(1)
00423                     return False
00424         
00425         uhost=set()
00426         for i in range(len(self._hosts)):
00427             uhost.add(self._hosts[i][0])
00428         if len(uhost)==0:
00429             casalog.post("Configuration table is empty","WARN","validate_hosts")
00430             return False
00431         if len(uhost)<len(self._hosts):
00432             casalog.post("Configuration table contains repeated node name","WARN","validate_hosts")
00433             return False
00434         for i in range(len(self._hosts)):
00435             if type(self._hosts[i][1])!=int:
00436                 casalog.post("Number of engines must be an integer","WARN","validate_hosts")
00437                 return False
00438         for i in range(len(self._hosts)):
00439             if not os.path.exists(self._hosts[i][2]):
00440                 casalog.post("Directory %s does not exist" % self._hosts[i][2],"WARN","validate_hosts")
00441                 return False
00442         for i in range(len(self._hosts)):
00443             try:
00444                 tfile=self._hosts[i][2]+'/nosuchfail'
00445                 fid = open(tfile, 'w')
00446                 fid.close()
00447                 os.remove(tfile)
00448             except IOError:
00449                 casalog.post("No write permission in directory %s" % self._hosts[i][2],"WARN","validate_hosts")
00450                 return False
00451         return True
00452     
00453     
00454     ###########################################################################
00455     ###   project management
00456     ###########################################################################
00457     def create_project(self, proj=""):
00458         """Create a project. 
00459 
00460         Keyword arguments:
00461         proj -- the name of project (default: 'proj'+timestamp).  
00462 
00463         A project maintains a subdirectory under each node's work_dir. All 
00464         output files of an engine hosted on that node will by default be stored
00465         under that subdirectory.
00466 
00467         This function is normally called internally by init_cluster function. 
00468 
00469         Example:
00470         CASA <33>: sl.list_projects
00471         host: casa-dev-07 ------------------------>>>>
00472         bProj  bsplit  csplit  my_project
00473         host: casa-dev-08 ------------------------>>>>
00474         bProj  bsplit  csplit  my_project
00475         host: casa-dev-10 ------------------------>>>>
00476         bProj  bsplit  csplit
00477 
00478         CASA <34>: sl.create_project('dflag')
00479         output directory:
00480         /home/casa-dev-07/hye/ptest/dflag
00481         /home/casa-dev-08/hye/ptest/dflag
00482         /home/casa-dev-10/hye/ptest/dflag
00483 
00484         CASA <36>: sl.list_projects
00485         host: casa-dev-07 ------------------------>>>>
00486         bProj  bsplit  csplit  dflag  my_project
00487         host: casa-dev-08 ------------------------>>>>
00488         bProj  bsplit  csplit  dflag  my_project
00489         host: casa-dev-10 ------------------------>>>>
00490         bProj  bsplit  csplit  dflag
00491 
00492         """
00493         
00494         casalog.origin("simple_cluster")
00495         
00496         if not self._configdone:
00497             return
00498         if type(proj)!=str:
00499             casalog.post("Project name must be string","WARN","create_project")
00500             return
00501     
00502         self._project=proj.strip()
00503         if self._project=="":
00504             tm=time.strftime("%Y%b%d-%Hh%Mm%Ss", time.localtime())
00505             self._project='proj'+tm
00506     
00507         for i in range(len(self._hosts)):
00508             if not os.path.exists(self._hosts[i][2]+'/'+self._project):
00509                 cmd='mkdir '+self._hosts[i][2]+'/'+self._project
00510                 os.system(cmd)
00511     
00512     def do_project(self, proj):
00513         """Use a project previously created. 
00514 
00515         Keyword arguments:
00516         proj -- the name of project.
00517 
00518         A project maintains a subdirectory under each node's work_dir. All 
00519         output files of an engine hosted on that node will by default be stored
00520         under that subdirectory.
00521 
00522         Example:
00523         CASA <38>: sl._project
00524         Out[38]: 'dflag'
00525 
00526         CASA <39>: sl.do_project('csplit')
00527         output directory:
00528         /home/casa-dev-07/hye/ptest/csplit
00529         /home/casa-dev-08/hye/ptest/csplit
00530         /home/casa-dev-10/hye/ptest/csplit
00531 
00532         CASA <40>: sl._project
00533         Out[40]: 'csplit'
00534 
00535         """
00536         
00537         casalog.origin("simple_cluster")
00538         
00539         if not self._configdone:
00540             return
00541         if type(proj)!=str or proj.strip()=="":
00542             casalog.post("Project name must be a nonempty string","WARN","do_project")
00543             return
00544     
00545         projexist=True
00546         for i in range(len(self._hosts)):
00547             if not os.path.exists(self._hosts[i][2]+'/'+proj.strip()):
00548                 casalog.post("No project directory found on %s" % self._hosts[i][0],"WARN","do_project")
00549                 projexist=False
00550         if projexist:
00551             self._project=proj.strip()
00552         casalog.post("Output directory:","INFO","do_project")
00553         for i in range(len(self._hosts)):
00554             casalog.post("Output directory: %s/%s" % (self._hosts[i][2],self._project),"INFO","do_project")
00555     
00556     def erase_project(self, proj):
00557         """Erase files and dirs of a project. 
00558 
00559         Keyword arguments:
00560         proj -- the name of project.
00561 
00562         A project maintains a subdirectory under each node's work_dir. All 
00563         output files of an engine hosted on that node will by default be stored
00564         under that subdirectory.
00565 
00566         Example:
00567         CASA <30>: sl.list_projects
00568         host: casa-dev-07 ------------------------>>>>
00569         aNew  bProj  bsplit  csplit  my_project
00570         host: casa-dev-08 ------------------------>>>>
00571         aNew  bProj  bsplit  csplit  my_project
00572         host: casa-dev-10 ------------------------>>>>
00573         bProj  bsplit  csplit
00574 
00575         CASA <31>: sl.erase_project('aNew')
00576 
00577         CASA <32>: sl.list_projects
00578         host: casa-dev-07 ------------------------>>>>
00579         bProj  bsplit  csplit  my_project
00580         host: casa-dev-08 ------------------------>>>>
00581         bProj  bsplit  csplit  my_project
00582         host: casa-dev-10 ------------------------>>>>
00583         bProj  bsplit  csplit
00584 
00585         """
00586         
00587         casalog.origin("simple_cluster")
00588         
00589         if not self._configdone:
00590             return
00591         if type(proj)!=str or proj.strip()=="":
00592             casalog.post("Project name must be a nonempty string","WARN","erase_project")
00593             return
00594         for i in range(len(self._hosts)):
00595             cmd='rm -rf '+self._hosts[i][2]+'/'+proj
00596             os.system(cmd)
00597         if proj==self._project:
00598             self._project=""
00599         if self._project=="":
00600             pass
00601     
00602     def clear_project(self, proj):
00603         """Remove all previous results of the proj
00604 
00605         Keyword arguments:
00606         proj -- the name of project.
00607 
00608         A project maintains a subdirectory under each node's work_dir. All 
00609         output files of an engine hosted on that node will by default be stored
00610         under that subdirectory.
00611 
00612         Example:
00613         CASA <27>: sl.list_project('my_project')
00614         host: casa-dev-07 ------------------------>>>>
00615         casapy-20101122165601-5.log  sim.alma.csv.mid-f0-s0-b640-e768.flux
00616         host: casa-dev-08 ------------------------>>>>
00617         casapy-20101122165601-6.log  sim.alma.csv.mid-f0-s0-b768-e896.flux
00618         host: casa-dev-10 ------------------------>>>>
00619         casapy-20101122165601-7.log  sim.alma.csv.mid-f0-s0-b320-e640.flux
00620 
00621         CASA <28>: sl.clear_project('my_project')
00622 
00623         CASA <29>: sl.list_project('my_project')
00624         host: casa-dev-07 ------------------------>>>>
00625         host: casa-dev-08 ------------------------>>>>
00626         host: casa-dev-10 ------------------------>>>>
00627 
00628         """
00629         
00630         casalog.origin("simple_cluster")
00631         
00632         # This can be a slow operation, it is better to do it parallel
00633         if not self._configdone:
00634             return
00635         if type(proj)!=str or proj.strip()=="":
00636             casalog.post("Project name must be a nonempty string","WARN","clear_project")
00637             return
00638         for i in range(len(self._hosts)):
00639             cmd='rm -rf '+self._hosts[i][2]+'/'+proj.strip()+'/*'
00640             os.system(cmd)
00641     
00642     def list_project(self, proj):
00643         """List previous results of the proj
00644 
00645         Keyword arguments:
00646         proj -- the name of project.
00647 
00648         A project maintains a subdirectory under each node's work_dir. All 
00649         output files of an engine hosted on that node will by default be stored
00650         under that subdirectory.
00651 
00652         Example:
00653         CASA <19>: sl.list_project('bsplit')
00654         host: casa-dev-07 ------------------------>>>>
00655         test_regression_TDEM0003-f0-s11.ms   test_regression_TDEM0003-f0-s13.ms
00656         host: casa-dev-08 ------------------------>>>>
00657         test_regression_TDEM0003-f0-s10.ms  test_regression_TDEM0003-f3-s9.ms
00658         host: casa-dev-10 ------------------------>>>>
00659         test_regression_TDEM0003-f0-s0.ms   test_regression_TDEM0003-f4-s11.ms
00660 
00661         """
00662         
00663         casalog.origin("simple_cluster")
00664         
00665         if not self._configdone:
00666             return
00667         if type(proj)!=str or proj.strip()=="":
00668             casalog.post("Project name must be a nonempty string","WARN","list_project")
00669             return
00670         for i in range(len(self._hosts)):
00671             casalog.post("Host: %s ------------------------>>>>" % self._hosts[i][0] ,"INFO","list_project")
00672             cmd='ls '+self._hosts[i][2]+'/'+proj
00673             os.system(cmd)
00674     
00675     def erase_projects(self):
00676         """Erase all previous results of all projects
00677 
00678         A project maintains a subdirectory under each node's work_dir. All 
00679         output files of an engine hosted on that node will by default be stored
00680         under that subdirectory.
00681 
00682         """
00683         if not self._configdone:
00684             return
00685         for i in range(len(self._hosts)):
00686             cmd='rm -rf '+self._hosts[i][2]+'/*'
00687             os.system(cmd)
00688         self._project=""
00689     
00690     def list_projects(self):
00691         """List all previous projects
00692 
00693         A project maintains a subdirectory under each node's work_dir. All 
00694         output files of an engine hosted on that node will by default be stored
00695         under that subdirectory.
00696 
00697         Example:
00698         <CASA 16:>sl.list_projects
00699         host: casa-dev-07 ------------------------>>>>
00700         aNew  bProj  bsplit  csplit  my_project
00701         host: casa-dev-08 ------------------------>>>>
00702         aNew  bProj  bsplit  csplit  my_project
00703         host: casa-dev-10 ------------------------>>>>
00704         bProj  bsplit  csplit
00705 
00706         """
00707         
00708         casalog.origin("simple_cluster")
00709         
00710         if not self._configdone:
00711             return
00712         for i in range(len(self._hosts)):
00713             casalog.post("Host: %s ------------------------>>>>" % self._hosts[i][0] ,"INFO","list_projects")
00714             cmd='ls '+self._hosts[i][2]+'/' 
00715             os.system(cmd)
00716     
00717     def reset_project(self):
00718         """Erase previous results and reset the status of the current project.
00719 
00720         A project maintains a subdirectory under each node's work_dir. All 
00721         output files of an engine hosted on that node will by default be stored
00722         under that subdirectory.
00723 
00724         Example:
00725         CASA <43>: sl.list_project('bProj')
00726         ....too many here...
00727 
00728         CASA <44>: sl.do_project('bProj')
00729         output directory:
00730         /home/casa-dev-07/hye/ptest/bProj
00731         /home/casa-dev-08/hye/ptest/bProj
00732         /home/casa-dev-10/hye/ptest/bProj
00733 
00734         CASA <45>: sl.list_project('bProj')
00735         host: casa-dev-07 ------------------------>>>>
00736         host: casa-dev-08 ------------------------>>>>
00737         host: casa-dev-10 ------------------------>>>>
00738 
00739         """
00740         if not self._configdone:
00741             return
00742         self.stop_monitor()
00743         self.clear_project(self._project)
00744         self._jobs={}
00745         self._job_title=1
00746         self._monitor_on=True
00747     
00748     def get_hosts(self):
00749         """List current cluster.
00750 
00751         CASA <48>: sl.get_hosts
00752         Out[48]:
00753         [['casa-dev-07', 4, '/home/casa-dev-07/hye/ptest'],
00754          ['casa-dev-08', 4, '/home/casa-dev-08/hye/ptest'],
00755          ['casa-dev-10', 4, '/home/casa-dev-10/hye/ptest']]
00756 
00757         """
00758         if not self._configdone:
00759             return
00760         return self._hosts 
00761     
00762     ###########################################################################
00763     ###   cluster management
00764     ###########################################################################
00765     
00766     # jagonzal (CAS-4292): This method is deprecated, because I'd like to 
00767     # avoid brute-force methods like killall which just hide state errors
00768     def cold_start(self):
00769         """Kill all engines on all hosts and shut down the current cluster.
00770         
00771         This is used if a complete restart of the cluster is needed. One can
00772         rerun init_cluster after this. This also kills possible leftover
00773         engines from previous sessions.
00774 
00775         """
00776         if not self._configdone:
00777             return
00778         # jagonzal (CAS-4292): Stop the cluster via parallel_go before using brute-force killall
00779         self.stop_cluster()
00780         for i in range(len(self._hosts)):
00781             hostname = self._hosts[i][0]
00782             cmd=self.shell(hostname) + ' "killall -9 ipengine"'
00783             os.system(cmd)
00784             
00785     #   jagonzal (CAS-4292): Method for graceful finalization (e.g. when closing casa)
00786     def stop_cluster(self):
00787         """ Destructor method to shut down the cluster gracefully """
00788         # Stop cluster and thread services
00789         self.stop_monitor()
00790         # Now we can stop the cluster w/o problems
00791         # jagonzal (CAS-4292): Stop the cluster w/o using brute-force killall as in cold_start
00792         self._cluster.stop_cluster()            
00793         self.__running = False
00794     
00795     def stop_nodes(self):
00796         """Stop all engines on all hosts of current cluster.
00797         
00798         After running this, the cluster contains no engines.
00799 
00800         """
00801         if not self._configdone:
00802             return
00803         for i in self._cluster.get_nodes():
00804             self._cluster.stop_node(i)
00805     
00806     def start_cluster(self):
00807         """Start a cluster with current configuration.
00808         
00809         Normally, one does not need to run this function directly. The 
00810         init_cluster will call this internally.
00811 
00812         """
00813         
00814         casalog.origin("simple_cluster")
00815         
00816         if not self._configdone:
00817             return
00818         for i in range(len(self._hosts)):
00819             self._cluster.start_engine(self._hosts[i][0], self._hosts[i][1], 
00820                                    self._hosts[i][2]+'/'+self._project,self._hosts[i][3])
00821             if (self._hosts[i][3]>1):
00822                 omp_num_threads = self._cluster.execute("print os.environ['OMP_NUM_THREADS']",i)[0]['stdout']
00823                 if (omp_num_threads.count(str(self._hosts[i][3]))>0):
00824                     casalog.post("Open MP enabled at host %s,  OMP_NUM_THREADS=%s" % (self._hosts[i][0],str(self._hosts[i][3])), "INFO","start_cluster")
00825                 else:
00826                     casalog.post("Problem enabling Open MP at host %s: %s" % (self._hosts[i][0],omp_num_threads),"WARN","start_cluster")
00827                     
00828         self.start_logger()
00829         self.start_monitor()
00830         # jagonzal (CAS-4324): It is better to update the resources in the
00831         # check_status method, after updating the status of the jobs
00832         # self.start_resource()
00833         self._rsrc = self.show_resource(True)
00834     
00835     def get_host(self, id):
00836         """Find out the name of the node that hosts this engine.
00837 
00838         Keyword arguments:
00839         id -- the engine id
00840 
00841         Example:
00842         CASA <50>: sl.get_host(8)
00843         Out[50]: 'casa-dev-10'
00844 
00845         """
00846         
00847         casalog.origin("simple_cluster")
00848         
00849         if not self._configdone:
00850             return
00851         ids=self._cluster.get_ids()
00852         if type(id)!=int:
00853             casalog.post("The argument must be an engine id (int)","WARN","get_host")
00854             return ''
00855         if ids.count(id)!=1:
00856             casalog.post("Engine %d does not exist" % id,"WARN","get_host")
00857             return ''
00858         e=self._cluster.get_engines()
00859         for i in range(len(e)):
00860             if e[i][0]==id:
00861                 return e[i][1] 
00862     
00863     def get_engine_store(self, id):
00864         """Get the root path where an engine writes out results
00865 
00866         Keyword arguments:
00867         id -- the engine id
00868 
00869         Example:
00870         CASA <52>: sl.get_engine_store(8)
00871         Out[52]: '/home/casa-dev-10/hye/ptest/bProj/'
00872 
00873         """
00874         if not self._configdone:
00875             return
00876         hst=self.get_host(id)
00877         for i in range(len(self._hosts)):
00878             if self._hosts[i][0]==hst:
00879                 pth=self._hosts[i][2]
00880                 sl=''
00881                 if not pth.endswith('/'):
00882                     sl='/' 
00883                 return pth+sl+self._project+'/'
00884     
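    # Illustrative sketch (not part of the original file): composing a per-engine
    # output path from the engine's project store, following the docstring example
    # above.  The engine id 8 and the file name 'output.ms' are only examples.
    #
    #   outpath = sl.get_engine_store(8) + 'output.ms'
    #   # e.g. '/home/casa-dev-10/hye/ptest/bProj/output.ms'
    #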
00885     ###########################################################################
00886     ###   log management
00887     ###########################################################################
00888     def start_logger(self):
00889         """Link all engine logs to the current directory. 
00890 
00891         After running this, the current directory contains links to each of
00892         the engine logs with file name 'engine-[id].log' such that one can 
00893         conveniently browse engine logs with the casa logviewer.
00894 
00895         """
00896         if not self._configdone:
00897             return
00898         lg=self._cluster.get_casalogs()
00899         os.system('rm -f engine-*.log')
00900         for i in lg:
00901             eng='engine'+i[str.rfind(i,'-'):]
00902             os.symlink(i, eng)
00903     
00904     ###########################################################################
00905     ###   resource management
00906     ###########################################################################   
00907     def start_resource(self): 
00908         """Start monitoring resource usage.
00909 
00910         Four critical resource usage indicators (for parallel execution), 
00911         namely, %cpu, %iowait, %mem and %memswap on all hosts are continuously
00912         checked. This information can be used to tune the parallel performance.
00913 
00914         Normally, one does not call this function directly. The init_cluster
00915         will call this function.
00916 
00917         """
00918         
00919         if not self._configdone:
00920             return
00921         self._resource_on=True 
00922         self._rsrc={}
00923         return thread.start_new_thread(self.update_resource, ())
00924     
00925     def update_resource(self):
00926         """Set up repeated resource checking.
00927 
00928         Four critical resource usage indicators (for parallel execution), 
00929         namely, %cpu, %iowait, %mem and %memswap on all hosts are continuously
00930         checked. This information can be used to tune the parallel performance.
00931 
00932         Normally, one does not call this function directly. The init_cluster
00933         will call this function.
00934 
00935         """
00936                     
00937         self._resource_running=True
00938         if not self._configdone:
00939             return
00940         while self._resource_on:
00941             if ((len(self._jobs.keys())>0) or (len(self._rsrc.keys())==0)):
00942                 self.check_resource()
00943             time.sleep(5)
00944         self._resource_running=False
00945     
00946     def stop_resource(self): 
00947         """Stop monitoring resource usage.
00948 
00949         Four critical resource usage indicators (for parallel execution), 
00950         namely, %cpu, %iowait, %mem and %memswap on all hosts are continuously
00951         checked. This information can be used to tune the parallel performance.
00952 
00953         Normally, one does not call this function directly. The init_cluster
00954         will call this function.
00955 
00956         """
00957         if not self._configdone:
00958             return
00959         self._resource_on=False 
00960         while (self._resource_running):
00961             time.sleep(1)
00962         self._rsrc={}
00963     
00964     def show_resource(self, long=False):
00965         """
00966         jagonzal (CAS-4372): Old resource monitoring functions were causing crashes in NRAO cluster
00967         """
00968         if not self._configdone:
00969             return
00970         if long:
00971             return self.check_resource(False)
00972         else:
00973             self.check_resource(True)
00974     
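    # Illustrative sketch (not part of the original file): show_resource() dumps the
    # monitoring table when called without arguments, and returns the per-host and
    # per-engine statistics dictionary produced by check_resource() when long=True.
    #
    #   sc = simple_cluster.getCluster()
    #   sc.show_resource()               # dump the table (verbose)
    #   stats = sc.show_resource(True)   # get the stats dictionary as well
    #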
00975     def check_resource(self, verbose_local=False):
00976         """
00977         jagonzal 29/05/12 (CAS-4137) - HPC project (CAS-4106)
00978         ========================================================================
00979         Advanced monitoring function that provides CPU, memory, I/O and job queue
00980         stats per node and total per host. 
00981 
00982         There are 2 usage modes:
00983         - Called from the monitoring thread in regular time intervals 
00984           (i.e. within the check_status method), to only dump the stats 
00985           into a file or the terminal.
00986         - Called from the command line, to return a dictionary in addition 
00987           to dumping the stats into a file or into the terminal.
00988 
00989         In both cases logging can be controlled in the following way:
00990         - User can provide a file-name when creating the simple_cluster 
00991           object, through the string parameter "monitoringFile", to specify 
00992           the location of the monitoring file, otherwise it defaults to 
00993           'monitoring.log' in the working directory. 
00994           object, through the boolean parameter "verbose", to have the monitoring 
00995           object, trough the boolean parameter "verbose", to have the monitoring 
00996           info dumped into the terminal, otherwise it defaults to False and it 
00997           only dumps into the monitoring file. 
00998         - User can even use the stand-alone method show_state, specifying 
00999           verbosity only for that particular call via the "verbose_local" 
01000           parameter, and the method returns a dictionary with all the stats 
01001           per node and total per host. 
01002 
01003         Examples:
01004         - Stand-Alone usage from terminal to return a dictionary
01005           from simple_cluster import *
01006           sc = simple_cluster.getCluster()
01007           stats = sc.show_state(False)
01008         - Stand-Alone usage from terminal to print the stats into the terminal
01009           from simple_cluster import *
01010           sc = simple_cluster.getCluster()
01011           sc.show_state(True)
01012         - Service-Mode usage to specify a custom file for dumping the stats
01013           from simple_cluster import *
01014           sc = simple_cluster('mycustomfile.log')
01015           sc.init_cluster('cluster-config.txt','test-rhel')
01016         - Service-Mode usage to specify a custom file for dumping the stats
01017           and verbose mode to additionally dump the stats into the terminal
01018           from simple_cluster import *
01019           sc = simple_cluster('mycustomfile.log',True)
01020           sc.init_cluster('cluster-config.txt','test-rhel')
01021         """
01022 
01023         # Determine verbose mode
01024         verbose = self._verbose or verbose_local
01025 
01026         # Open monitoring file
01027         fid = open(self._monitoringFile + ".tmp", 'w')
01028                 
01029         # Print header
01030         print >> fid, "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ( "Host","Engine","Status","CPU[%]","Memory[%]","Time[s]",
01031                                                                              "Read[MB]","Write[MB]","Read[MB/s]","Write[MB/s]","Job","Sub-MS")
01032         if (verbose):
01033             print "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ("Host","Engine","Status","CPU[%]","Memory[%]","Time[s]",
01034                                                                         "Read[MB]","Write[MB]","Read[MB/s]","Write[MB/s]","Job","Sub-MS")
01035 
01036         result = {}
01037         engines_list = self._cluster.get_engines()
01038         for engine in engines_list:
01039             hostname = str(engine[1])            
01040             # First of all get operating system
01041             cmd_os = self.shell(hostname) + " 'uname -s'"
01042             os_name = commands.getoutput(cmd_os)  # avoid shadowing the os module
01043             # Get read/write activity
01044             read_bytes = 0.0
01045             write_bytes = 0.0
01046             if (os_name == "Linux"):
01047                 # Get read activity
01048                 cmd_read_bytes = self.shell(hostname) + " 'cat /proc/" + str(engine[2]) + "/io | grep read_bytes'"
01049                 read_bytes=commands.getoutput(cmd_read_bytes)
01050                 read_bytes = read_bytes.split(":")
01051                 try:
01052                     read_bytes = float(read_bytes[1])
01053                 except:
01054                     if (verbose):
01055                         print "Problem converting read_bytes into float for engine " + str(engine[0]) + " running in host " + str(engine[1])
01056                         print "read_bytes: [" +  str(read_bytes) + "]"
01057                     read_bytes = 0
01058                 # Get write activity
01059                 cmd_write_bytes = self.shell(hostname) + " 'cat /proc/" + str(engine[2]) + "/io | grep write_bytes | head -1'"
01060                 write_bytes=commands.getoutput(cmd_write_bytes)
01061                 write_bytes = write_bytes.split(":")
01062                 try:
01063                     write_bytes = float(write_bytes[1])
01064                 except:
01065                     if (verbose):
01066                         print "Problem converting write_bytes into float for engine " + str(engine[0]) + " running in host " + str(engine[1])
01067                         print "write_bytes: [" +  str(write_bytes) + "]"
01068                     write_bytes = 0.0
01069             # Get resources usage (cpu, mem, elapsed time since start)
01070             cmd_resources = self.shell(hostname) + " 'ps -p " + str(engine[2]) + " -o %cpu,%mem,etime' | tail -1"
01071             resources=commands.getoutput(cmd_resources)
01072             resources = resources.split(" ")
01073             while resources.count('')>0:
01074                 resources.remove('')
01075             # Convert CPU into number
01076             cpu = 0
01077             try:
01078                 cpu = round(float(resources[0]))
01079             except:
01080                 if (verbose):
01081                         print "Problem converting CPU into float for engine " + str(engine[0]) + " running in host " + str(engine[1])
01082                         print "CPU: [" +  resources[0] + "]"
01083             # Convert Memory into number
01084             memory = 0
01085             try:
01086                 memory = round(float(resources[1]))
01087             except:
01088                 if (verbose):
01089                         print "Problem converting memory into float for engine " + str(engine[0]) + " running in host " + str(engine[1])
01090                         print "Memory: [" +  resources[1] + "]"
01091             # Initialize engine RW offsets map
01092             if not self._enginesRWoffsets.has_key(engine[0]):
01093                 self._enginesRWoffsets[engine[0]] = {}
01094             # Store engine RW offsets values
01095             self._enginesRWoffsets[engine[0]]['read_offset'] = read_bytes
01096             self._enginesRWoffsets[engine[0]]['write_offset'] = write_bytes
01097             # Initialize host map
01098             if not result.has_key(engine[1]):
01099                 result[engine[1]] = {}
01100                 result[engine[1]]["CPU"] = 0.0
01101                 result[engine[1]]["Memory"] = 0.0
01102                 result[engine[1]]["Read"] = 0.0
01103                 result[engine[1]]["Write"] = 0.0
01104                 result[engine[1]]["ReadRate"] = 0.0
01105                 result[engine[1]]["WriteRate"] = 0.0
01106             # Initialize engine map
01107             if not result[engine[1]].has_key(engine[0]):
01108                 result[engine[1]][engine[0]] = {}
01109             # Store default engine values
01110             result[engine[1]][engine[0]]["CPU"] = cpu
01111             result[engine[1]][engine[0]]["Memory"] = memory
01112             result[engine[1]][engine[0]]["Read"] = 0
01113             result[engine[1]][engine[0]]["Write"] = 0
01114             result[engine[1]][engine[0]]["ReadRate"] = 0
01115             result[engine[1]][engine[0]]["WriteRate"] = 0
01116             result[engine[1]][engine[0]]["Sub-MS"] = ""
01117             result[engine[1]][engine[0]]["Status"] = "Idle"
01118             result[engine[1]][engine[0]]["Time"] = 0
01119             result[engine[1]][engine[0]]["Job"] = ""
01120             # Retrieve job status information from job structure
01121             for job in self._jobs.keys():
01122                 jobEngine = self._jobs[job]['engine']
01123                 if (jobEngine == engine[0]):           
01124                     result[engine[1]][engine[0]]["Sub-MS"] = self._jobs[job]['subms']
01125                     result[engine[1]][engine[0]]["Status"] = self._jobs[job]['status']
01126                     result[engine[1]][engine[0]]["Time"] = round(self._jobs[job]['time'])
01127                     result[engine[1]][engine[0]]["Job"] = self._jobs[job]['short'].split('=').pop().replace(' ','')
01128                     result[engine[1]][engine[0]]["Read"] = float(read_bytes - self._jobs[job]['read_offset'])/(1024*1024)
01129                     result[engine[1]][engine[0]]["Write"] = float(write_bytes - self._jobs[job]['write_offset'])/(1024*1024)                      
01130                     # Compute data rates
01131                     if (result[engine[1]][engine[0]]["Time"] > 0):
01132                         result[engine[1]][engine[0]]["ReadRate"] = result[engine[1]][engine[0]]["Read"] / result[engine[1]][engine[0]]["Time"]
01133                         result[engine[1]][engine[0]]["WriteRate"] = result[engine[1]][engine[0]]["Write"] / result[engine[1]][engine[0]]["Time"]
01134             # Accumulate host values
01135             result[engine[1]]["CPU"] += result[engine[1]][engine[0]]["CPU"]
01136             result[engine[1]]["Memory"] += result[engine[1]][engine[0]]["Memory"]
01137             result[engine[1]]["Read"] += result[engine[1]][engine[0]]["Read"]
01138             result[engine[1]]["Write"] += result[engine[1]][engine[0]]["Write"]
01139             result[engine[1]]["ReadRate"] += result[engine[1]][engine[0]]["ReadRate"]
01140             result[engine[1]]["WriteRate"] += result[engine[1]][engine[0]]["WriteRate"]
01141             # Print nodes info
01142             print >> fid, "%20s%10d%10s%10d%10d%10d%10d%10d%15d%15d%20s%30s" % ( engine[1],engine[0],
01143                                                                                  result[engine[1]][engine[0]]["Status"],
01144                                                                                  result[engine[1]][engine[0]]["CPU"],
01145                                                                                  result[engine[1]][engine[0]]["Memory"],
01146                                                                                  result[engine[1]][engine[0]]["Time"],
01147                                                                                  result[engine[1]][engine[0]]["Read"],
01148                                                                                  result[engine[1]][engine[0]]["Write"],
01149                                                                                  result[engine[1]][engine[0]]["ReadRate"],
01150                                                                                  result[engine[1]][engine[0]]["WriteRate"],
01151                                                                                  result[engine[1]][engine[0]]["Job"],
01152                                                                                  result[engine[1]][engine[0]]["Sub-MS"])
01153             if (verbose):
01154                 print "%20s%10d%10s%10d%10d%10d%10d%10d%15d%15d%20s%30s" % ( engine[1],engine[0],
01155                                                                              result[engine[1]][engine[0]]["Status"],
01156                                                                              result[engine[1]][engine[0]]["CPU"],
01157                                                                              result[engine[1]][engine[0]]["Memory"],
01158                                                                              result[engine[1]][engine[0]]["Time"],
01159                                                                              result[engine[1]][engine[0]]["Read"],
01160                                                                              result[engine[1]][engine[0]]["Write"],
01161                                                                              result[engine[1]][engine[0]]["ReadRate"],
01162                                                                              result[engine[1]][engine[0]]["WriteRate"],
01163                                                                              result[engine[1]][engine[0]]["Job"],
01164                                                                              result[engine[1]][engine[0]]["Sub-MS"])
01165 
01166         # Print separation between nodes and hosts info
01167         print >> fid, "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ( "====================",
01168                                                                              "==========",
01169                                                                              "==========",
01170                                                                              "==========",
01171                                                                              "==========",
01172                                                                              "==========",
01173                                                                              "==========",
01174                                                                              "==========",
01175                                                                              "===============",
01176                                                                              "===============",
01177                                                                              "====================",
01178                                                                              "==============================")
01179 
01180         if (verbose):
01181             print "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ( "====================",
01182                                                                          "==========",
01183                                                                          "==========",
01184                                                                          "==========",
01185                                                                          "==========",
01186                                                                          "==========",
01187                                                                          "==========",
01188                                                                          "==========",
01189                                                                          "===============",
01190                                                                          "===============",
01191                                                                          "====================",
01192                                                                          "==============================")
01193 
01194         # Print hosts info
01195         for host in result:
01196             print >> fid, "%20s%10s%10s%10d%10d%10s%10d%10d%15d%15d%20s%30s" % ( host,"Total","",
01197                                                                                  result[host]["CPU"],
01198                                                                                  result[host]["Memory"],
01199                                                                                  "",
01200                                                                                  result[host]["Read"],
01201                                                                                  result[host]["Write"],
01202                                                                                  result[host]["ReadRate"],
01203                                                                                  result[host]["WriteRate"],
01204                                                                                  "","")
01205         if (verbose):
01206             for host in result:
01207                 print "%20s%10s%10s%10d%10d%10s%10d%10d%15d%15d%20s%30s" % ( host,"Total","",
01208                                                                              result[host]["CPU"],
01209                                                                              result[host]["Memory"],
01210                                                                              "",
01211                                                                              result[host]["Read"],
01212                                                                              result[host]["Write"],
01213                                                                              result[host]["ReadRate"],
01214                                                                              result[host]["WriteRate"],
01215                                                                              "","")
01216 
01217         # Print final separator
01218         print >> fid, "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ( "====================",
01219                                                                              "==========",
01220                                                                              "==========",
01221                                                                              "==========",
01222                                                                              "==========",
01223                                                                              "==========",
01224                                                                              "==========",
01225                                                                              "==========",
01226                                                                              "===============",
01227                                                                              "===============",
01228                                                                              "====================",
01229                                                                              "==============================")
01230         if (verbose):
01231             print "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ( "====================",
01232                                                                          "==========",
01233                                                                          "==========",
01234                                                                          "==========",
01235                                                                          "==========",
01236                                                                          "==========",
01237                                                                          "==========",
01238                                                                          "==========",
01239                                                                          "===============",
01240                                                                          "===============",
01241                                                                          "====================",
01242                                                                          "==============================")
01243 
01244         # Close monitoring file
01245         fid.close()
01246 
01247         # Rename monitoring file
01248         commands.getstatusoutput("mv " + self._monitoringFile + ".tmp" + " " + self._monitoringFile)
01249 
01250         # Update resource member
01251         self._rsrc = result
01252         
01253         return result
01254 
01255     def get_return_list(self):
01256         """
01257         jagonzal (CAS-4376): Gather return variables from the different engines back to the main CASA controller instance
01258         """
01259
01260         return_list = {}
01261         jobQueue = self._JobQueueManager.getOutputJobs()
01262         for job in jobQueue:
01263             return_list[job.getCommandArguments()['vis']]=job.getReturnValues()
01264
01265         return return_list
01266 
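    # Illustrative usage sketch (assumes a cluster has already been initialized and
    # jobs executed through the JobQueueManager; the loop variable names are hypothetical):
    #
    #   sc = simple_cluster.getCluster()
    #   returns = sc.get_return_list()
    #   for vis, ret in returns.items():
    #       print vis, ret      # sub-MS name -> return value of the task run on it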
01267 
01268     ###########################################################################
01269     ###   execution status management
01270     ###########################################################################
01271     def check_job(self):
01272         """Check the execution status of current noblock jobs  on all engines.
01273 
01274         This function can be used to block the terminal until all submitted
01275         jobs finish. 
01276 
01277         Example:
01278         CASA <2>: from simple_cluster import simple_cluster
01279         CASA <3>: sl=simple_cluster()
01280         CASA <4>: sl.init_cluster("my_cluster", "csplit")
01281         CASA <5>: sl.simple_split('/lustre/casa-store/hye/10B-209a_5s.ms/', '')
01282         CASA <6>: sl.check_job()
01283 
01284         """
01285 
01286         if not self._configdone:
01287             return
01288         done=False
01289         while(not done):
01290             time.sleep(5)
01291             done=True
01292             for i in self._jobs.keys():
01293                 if not self._cluster.check_job(i):
01294                     done=False
01295     
01296     def check_status(self, notify=False):
01297         """Check the execution status of submitted no-block jobs
01298 
01299         Keyword arguments:
01300         notify -- whether or not to post messages about job state changes to the log
01301
01302         Normally, one does not call this function directly; start_monitor
01303         calls it internally.
01304 
01305         """
01306         
01307         casalog.origin("simple_cluster")
01308 
01309         self._monitor_running = True
01310 
01311         if not self._configdone:
01312             return
01313         while self._monitor_on:
01314             time.sleep(5)
01315             curr={}
01316             try:
01317                 pend=self._cluster.queue_status()
01318                 for i in xrange(len(pend)):
01319                     (a, b)=pend[i]
01320                     curr[a]=b['pending']
01321             except:
01322                 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
01323                 # traceback.print_tb(sys.exc_info()[2])
01324                 pass
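            # Walk the job registry and update each job's state: a job moves from
            # 'scheduled' to 'running' once its command shows up in the engine's
            # pending queue, and to 'done' (or 'broken' on an error/exception)
            # once get_result() yields a result.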
01325             for job in self._jobs.keys():
01326                 if type(job)==type(None):
01327                     self._jobs[job]['status']="unknown"
01328                 else:
01329                     eng=self._jobs[job]['engine']
01330                     sht=self._jobs[job]['short']
01331                     try:
01332                         x=1
01333                         try:
01334                             x=job.get_result(block=False)
01335                         except client.CompositeError, exception:
01336                             if notify and self._jobs[job]['status']=="scheduled":
01337                                 casalog.post("Error retrieving result of job %s from engine %s: %s, backtrace:" % (sht,str(eng),str(exception)),"SEVERE","check_status")
01338                                 exception.print_tracebacks()
01339                             self._jobs[job]['status']="broken"
01340                         except:
01341                             casalog.post("Error retrieving result of job %s from engine %s, backtrace:" % (sht,str(eng)),"SEVERE","check_status")
01342                             traceback.print_tb(sys.exc_info()[2])
01343 
01344                         if x==None:
01345                             cmd=self._jobs[job]['command']
01346                             if curr.has_key(eng):
01347                                 wk=eval(curr[eng].lstrip('execute(').rstrip(')'))
01348                                 if wk==cmd:
01349                                     self._jobs[job]['status']="running"
01350                                     if self._jobs[job]['start']=='':
01351                                         if notify:
01352                                             casalog.post("Engine %d job %s started" %(eng,sht),"INFO","check_status")
01353                                         self._jobs[job]['start']=time.time()
01354                                     self._jobs[job]['time']=time.time()-self._jobs[job]['start'] 
01355                                 else:
01356                                     pass
01357                         else:
01358                             if self._jobs[job]['status']=="running":
01359                                 if notify:
01360                                     casalog.post("Engine %d job %s finished" %(eng,sht),"INFO","check_status")
01361                                 self._jobs[job]['status']="done"
01362                             if self._jobs[job]['status']=="scheduled":
01363                                 if isinstance(x, int):
01364                                     if notify:
01365                                         casalog.post("Engine %d job %s broken" %(eng,sht),"SEVERE","check_status")
01366                                     self._jobs[job]['status']="broken"
01367                                 else:
01368                                     if notify:
01369                                         casalog.post("Engine %d job %s finished" %(eng,sht),"INFO","check_status")
01370                                     self._jobs[job]['status']="done"
01371                     except:
01372                         if notify and self._jobs[job]['status']=="running":
01373                             casalog.post("Engine %d job %s broken" %(eng,sht),"SEVERE","check_status")
01374                         self._jobs[job]['status']="broken"
01375                 
01376             # jagonzal (CAS-4324): This method consumes lots of resources, and the user terminal
01377             # is not very responsive while it's running, so we execute it only when there are jobs
01378             # being processed.
01379             if (len(self._jobs.keys())>0):
01380                 try:
01381                     self.check_resource()
01382                 except:
01383                     pass
01384 
01385             gr=set()
01386             for val in self._jobs.values():
01387                 gr.add(val['jobgroup'])
01388             for i in list(gr):
01389                 eml=''
01390                 jobname='None'
01391                 if i.count(':')>0:
01392                     pos=i.find(':')
01393                     jobname=i[pos+1:].strip()
01394                     eml=i[:pos].strip()
01395                 elif i.count('@')>0:
01396                     eml=i.strip()
01397                 else:
01398                     jobname=i.strip()
01399                 
01400                 finish=True
01401                 for val in self._jobs.values():
01402                     if val['jobgroup']==i and not (val['status']=='done' or 
01403                                                 val['status']=='broken'):
01404                         finish=False
01405                 if finish:
01406                     tm=time.ctime()
01407                     jobname=jobname[0:30]
01408                     msg='\n#### '+jobname+' '+ \
01409                           '#'*(68-len(jobname)-len(tm))+' '+tm+' ####'
01410                     msg+='\nengine    status  time(s)  command\n'
01411                     rmv=[]
01412                     for job in self._jobs.keys():
01413                         if self._jobs[job]['jobgroup']==i:
01414                             msg+="%6d%10s%9d%2s%s\n" % (self._jobs[job]['engine'], 
01415                                   self._jobs[job]['status'],
01416                                   int(self._jobs[job]['time']), 
01417                                   '  ',
01418                                   self._jobs[job]['command'])
01419                             rmv.append(self._jobs[job]['jobname'])
01420                     if i.strip()!='':
01421     
01422                         # Append to project result file
01423                         f=open(self._project+'.result', 'a')
01424                         f.write(msg)
01425                         f.write('\n')
01426                         f.close()
01427     
01428                         if eml!='' and eml.count(' ')==0:
01429                             #send email
01430                             #import smtplib
01431                             #from email.mime.text import MIMEText
01432                             #mal=MIMEText(msg)
01433                             #mal['Subject']='your parallel job finished'
01434                             #me=os.environ['USER']
01435                             #mal['From']=me
01436                             #mal['To']=i 
01437                             #s=smtplib.SMTP()
01438                             #s.connect()
01439                             #s.sendmail(me, [i], mal.as_string())
01440                             #s.close()
01441         
01442                             msgf='/tmp/emailmsg.txt'
01443                             f=open(msgf, 'w')
01444                             f.write(msg)
01445                             f.write('\n')
01446                             f.close()
01447                             cmd='/bin/mail -s "parallel job \''+jobname+\
01448                                 '\' finished" '+eml+' < '+msgf
01449                             os.system(cmd)
01450                              
01451                         for j in rmv:
01452                             self.remove_record(j)
01453 
01454         self._monitor_running = False
01455                             
01456     
01457     def start_monitor(self): 
01458         """Start monitoring  execution status of submitted no-block jobs
01459 
01460         Normally, one does not call this function directly. The init_cluster
01461         will call this function.
01462 
01463         """
01464         if not self._configdone:
01465             return
01466         self._monitor_on=True 
01467         return thread.start_new_thread(self.check_status, (True,))
01468     
01469     def stop_monitor(self): 
01470         """Stop monitoring execution status of submitted no-block jobs
01471 
01472         Normally, one does not call this function directly. 
01473 
01474         """
01475         if not self._configdone:
01476             return
01477         self._monitor_on=False
01478         while (self._monitor_running):
01479             time.sleep(1)
01480     
01481     def show_queue(self):
01482         """Display job queue.
01483 
01484         Example:
01485         CASA <2>: from simple_cluster import simple_cluster
01486         CASA <3>: sl=simple_cluster()
01487         CASA <4>: sl.init_cluster("my_cluster", "csplit")
01488         CASA <5>: sl.simple_split('/lustre/casa-store/hye/10B-209a_5s.ms/', 
01489                                   'you@nrao.edu:3rd split')
01490         CASA <6>: sl.show_queue()
01491 
01492         """
01493         if not self._configdone:
01494             return
01495         return self._cluster.queue_status()
01496     
01497     def get_status(self, long=False):
01498         """Display job execution status.
01499 
01500         Keyword arguments:
01501         long -- if True, return the full job record dictionary instead of the formatted table
01502 
01503         Example:
01504         CASA <2>: from simple_cluster import simple_cluster
01505         CASA <3>: sl=simple_cluster()
01506         CASA <4>: sl.init_cluster("my_cluster", "csplit")
01507         CASA <5>: sl.simple_split('/lustre/casa-store/hye/10B-209a_5s.ms/', 
01508                                   'you@nrao.edu:3rd split')
01509         CASA <6>: sl.get_status()
01510         engine    status  time(s)     start  command   title
01511              0      done       31  16:41:56    split      15
01512              2 scheduled        0              split      78
01513              7      done       41  16:42:38    split      16
01514              9   running       51  16:42:59    split      17
01515              1      done       36  16:41:56    split      18
01516 
01517         """
01518 
01519         if not self._configdone:
01520             return
01521         if long:
01522             return self._jobs
01523         else:
01524             if len(self._jobs.keys())==0:
01525                 return self._jobs
01526             else:
01527                 print 'engine    status  time(s)     start  command   title'
01528                 for job in self._jobs.keys():
01529                     print "%6d%10s%9d%10s%9s%8d" % (self._jobs[job]['engine'], 
01530                            self._jobs[job]['status'],
01531                            int(self._jobs[job]['time']), 
01532                            '' if type(self._jobs[job]['start'])==str 
01533                               else
01534                                  time.strftime("%H:%M:%S", 
01535                                         time.localtime(self._jobs[job]['start'])),
01536                            self._jobs[job]['short'].strip()[:9],
01537                            self._jobs[job]['jobname'])
01538 
01539     def get_jobId(self, status):
01540         """Get a list of jobs of the given status
01541 
01542         Keyword arguments:
01543         status -- the job status to match; the job titles (jobnames) of all jobs with that status are returned
01544 
01545         Example:
01546         CASA <2>: from simple_cluster import simple_cluster
01547         CASA <3>: sl=simple_cluster()
01548         CASA <4>: sl.init_cluster("my_cluster", "csplit")
01549         CASA <5>: sl.simple_split('/lustre/casa-store/hye/10B-209a_5s.ms/', 
01550                                   'you@nrao.edu:3rd split')
01551         CASA <6>: sl.get_jobId('done')
01552 
01553         """
01554 
01555         if not self._configdone:
01556             return []
01557         if len(self._jobs.keys())==0:
01558             return [] 
01559         else:
01560             jobId=[] 
01561             for job in self._jobs.keys():
01562                 if self._jobs[job]['status']==status:
01563                     jobId.append(self._jobs[job]['jobname'])
01564             return jobId
01565 
01566     def remove_record(self, jobname=None):
01567         """Remove job execution status of a job.
01568 
01569         Keyword arguments:
01570         jobname -- the jobname or status of job(s) to be removed from display
01571 
01572         if jobName is not specified or is None all jobs are removed.
01573         """
01574 
01575         if not self._configdone:
01576             return
01577         
01578         for job in self._jobs.keys():
01579             if jobname is None:
01580                 del self._jobs[job]
01581             elif type(jobname)==int and self._jobs[job]['jobname']==jobname:
01582                del self._jobs[job]
01583             elif type(jobname)==str and self._jobs[job]['status']==jobname:
01584                del self._jobs[job]
01585     
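    # Illustrative usage sketch (assumes records exist from earlier do_and_record calls):
    #
    #   sc.remove_record('done')    # drop all records whose status is 'done'
    #   sc.remove_record(3)         # drop the record whose job title is 3
    #   sc.remove_record()          # drop every record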
01586     ###########################################################################
01587     ###   job distribution functions
01588     ###########################################################################
01589     def make_call(self, func, param):
01590         """Make a function call string with function name and parameters.
01591 
01592         Keyword arguments:
01593         func -- the name of the function
01594         param -- the dictionary of parameters and values
01595 
01596         Example:
01597         CASA <12>: param=dict()
01598         CASA <13>: param['vis']='NGC5921.ms'
01599         CASA <14>: param['spw']='4'
01600         CASA <15>: sl.make_call('flagdata', param)
01601           Out[15]: 'flagdata(vis="NGC5921.ms", spw="4")'
01602 
01603         """
01604         
01605         casalog.origin("simple_cluster")
01606         
01607         if type(func)!=str or type(param)!=dict:
01608             casalog.post("Func must be a str and param must be a dictionary","WARN","make_call")
01609             return None 
01610         cmd=func+'('
01611         for (k, v) in param.iteritems():
01612             cmd+=k+'='
01613             if type(v)==str:
01614                 cmd+='"'+v+'"'
01615             elif type(v)==np.ndarray:
01616                 cmd+=repr(v)
01617             else:
01618                 cmd+=str(v)
01619             cmd+=', '
01620         cmd=cmd[0:-2]
01621         cmd+=')'
01622         return cmd
01623     
01624     def do_and_record(self, cmd, id, group='', subMS=''):
01625         """Submit a function call to an engine and record its execution status.
01626 
01627         Keyword arguments:
01628         cmd -- the function call string
01629         id -- the id of the engine to be assigned 
01630         group -- the group this cmd belongs to and the recipient of the notification.
01631                  group can be an email address, a label for the job, or both
01632                  separated by a ':'. Once all the jobs that have the same label
01633                  finish, an email notification will be sent.
01634 
01635         Example:
01636         CASA <12>: param=dict()
01637         CASA <13>: param['vis']='NGC5921.ms'
01638         CASA <14>: param['spw']='4'
01639         CASA <15>: cmd=sl.make_call('flagdata', param)
01640         CASA <17>: sl.do_and_record(cmd, 7, 'you@nrao.edu:flag ngc5921')
01641 
01642         """
01643         if not self._configdone:
01644             return
01645         job=self._cluster.odo(cmd, id)
01646         self._jobs[job]={}
01647         self._jobs[job]['start']=''
01648         self._jobs[job]['time']=0
01649         if self._enginesRWoffsets.has_key(id):
01650             self._jobs[job]['read_offset']=self._enginesRWoffsets[id]['read_offset']
01651             self._jobs[job]['write_offset']=self._enginesRWoffsets[id]['write_offset']
01652         else:
01653             self._jobs[job]['read_offset']=0
01654             self._jobs[job]['write_offset']=0
01655         self._jobs[job]['command']=cmd
01656         if len(cmd)<9:
01657             self._jobs[job]['short']=cmd
01658         else:
01659             self._jobs[job]['short']=cmd[:str.find(cmd, '(')]
01660         self._jobs[job]['subms']=subMS
01661         self._jobs[job]['status']="scheduled"
01662         self._jobs[job]['engine']=id
01663         self._jobs[job]['jobname']=self._job_title
01664         self._jobs[job]['jobgroup']=group
01665         self._job_title+=1
01666         return self._job_title-1
01667     
01668     ###########################################################################
01669     ###   result processing functions
01670     ###########################################################################
01671     
01672     def list_result(self):
01673         """read the project.result file and write out all labels
01674 
01675         Example:
01676         CASA <33>: sl.list_result
01677         Out[33]:
01678         ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####',
01679          '#### flag ngc5921 #################### Wed Mar 16 10:43:12 2011 ####']
01680 
01681         """
01682         if not self._configdone:
01683             return
01684         f=open(self._project+'.result', 'r')
01685         s=f.readlines()
01686         vec=[]
01687         for line in s:
01688             sLine=line.rstrip()
01689             if str.find(sLine, '#### ')==0:
01690                 vec.append(sLine.strip())
01691             else:
01692                 continue
01693         f.close()
01694         return vec
01695     
01696     def get_result(self, tm):
01697         """read the project.result file and write out result for a label
01698 
01699         Keyword arguments:
01700         tm -- the result label
01701 
01702         Example:
01703         CASA <33>: sl.list_result
01704         Out[33]:
01705         ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####',
01706          '#### flag ngc5921 #################### Wed Mar 16 10:43:12 2011 ####']
01707         CASA <34>: sl.get_result('new split')
01708         Out[34]:
01709         ... omit ...
01710 
01711         """
01712 
01713         if not self._configdone:
01714             return
01715         f=open(self._project+'.result', 'r')
01716         s=f.readlines()
01717         reach=False
01718         vec=[]
01719         for line in s:
01720             sLine=line.strip()
01721             if str.find(sLine, '#### ')==0:
01722                 if str.count(sLine, ' '+tm+' ')>0 and not reach:
01723                     reach=True
01724                 else:
01725                     reach=False
01726             else:
01727                 if reach and sLine!='' and not sLine.startswith('engine'):
01728                     vec.append(sLine)
01729         f.close()
01730         return vec
01731 
01732     def erase_result(self, tm):
01733         """read the project.result file and erase result for a label
01734 
01735         Keyword arguments:
01736         tm -- the result label
01737 
01738         Example:
01739         CASA <33>: sl.list_result
01740         Out[33]:
01741         ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####',
01742          '#### flag ngc5921 #################### Wed Mar 16 10:43:12 2011 ####']
01743         CASA <34>: sl.erase_result('flag ngc5921')
01744         CASA <35>: sl.list_result
01745         Out[35]:
01746         ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####']
01747 
01748         """
01749         if not self._configdone:
01750             return
01751         if type(tm)!=str or len(tm)==tm.count('#'):
01752             return 
01753         f=open(self._project+'.result', 'r')
01754         s=f.readlines()
01755         f.close()
01756         a=-1
01757         b=-1
01758         for line in xrange(len(s)):
01759             sLine=s[line].strip()
01760             if str.find(sLine, '#### ')==0:
01761                 if str.count(sLine, ' '+tm+' ')>0 and a==-1:
01762                     a=line
01763                 else:
01764                     b=line
01765         if a>-1 and b>a:
01766             f=open(self._project+'.result', 'w')
01767             f.writelines(s[:a])
01768             f.writelines(s[b:])
01769             f.close()
01770         return
01771     
01772     def get_output(self, result, item, **kwargs):
01773         """pick from result list the item that meets condistion in kwargs
01774 
01775         Keyword arguments:
01776         result -- the result label or the result from running get_result 
01777         item -- the result item to get
01778         kwargs -- the conditions to limit the result
01779 
01780         Example:
01781         CASA <33>: sl.list_result
01782           Out[33]:
01783         ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####',
01784          '#### flag ngc5921 #################### Wed Mar 16 10:43:12 2011 ####']
01785         CASA <34>: sl.get_result('new split')
01786           Out[34]:
01787         ['10      done       30  split(vis="/lustre/casa-store/hye/10B-209a_5s.ms", outputvis="/home/casa-dev-10/hye/ptest/csplit/10B-209a_5s-f5-s10.ms", spw="10", datacolumn="DATA", field="J0738+1742")',
01788          '1      done       40  split(vis="/lustre/casa-store/hye/10B-209a_5s.ms", outputvis="/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f3-s2.ms", spw="2", datacolumn="DATA", field="J0738+1742")',
01789          '2      done       75  split(vis="/lustre/casa-store/hye/10B-209a_5s.ms", outputvis="/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f4-s10.ms", spw="10", datacolumn="DATA", field="2MJ0746")',
01790          ..... many other entries ...]
01791         CASA <35>: sl.get_output('new split', 'outputvis', field='3C84')
01792           Out[35]:
01793         ['/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f1-s8.ms',
01794          '/home/casa-dev-10/hye/ptest/csplit/10B-209a_5s-f1-s14.ms',
01795          '/home/casa-dev-08/hye/ptest/csplit/10B-209a_5s-f0-s0.ms']
01796 
01797         """
01798         
01799         casalog.origin("simple_cluster")
01800         
01801         if not self._configdone:
01802             return
01803         if type(result)==str:
01804             result=self.get_result(result)
01805         if type(result)!=list:
01806             return []
01807         if len(result)==0:
01808             return []
01809         if type(item)!=str:
01810             casalog.post("The item name must be a string","WARN","get_output")
01811             return []
01812     
01813         vals=[]
01814         for key in kwargs:
01815             v=kwargs[key]
01816             if type(v)==str:
01817                 vals.append(str(key)+'="'+v+'"')
01818             elif type(v)==np.ndarray:
01819                 vals.append(str(key)+'='+repr(v))
01820             else:
01821                 vals.append(str(key)+'='+str(v))
01822     
01823         vec=[]
01824         for i in result:
01825             pick=True
01826             for j in vals:
01827                 if str.count(i, j)==0:
01828                     pick=False
01829             if pick:
01830                 a=str.find(i, item)
01831                 a=str.find(i, '=', a)
01832                 b=str.find(i, ',', a+1)
01833                 if a>=0 and b>=0:
01834                     vec.append(i[a+2:b-1])
01835         return vec
01836 
01837     def getVariables(self, varList, engine):
01838         """
01839         This method will return a list corresponding to all variables
01840         in the varList for the specified engine. This is a
01841         very thin wrapper around the pull method in the cluster.
01842         """
01843         if not isinstance(varList, list):
01844             varList = [varList]
01845 
01846         rtnVal = []
01847         for var in varList:
01848             pulled = self._cluster.pull(var, engine)
01849             rtnVal.append(pulled[engine])
01850 
01851         return rtnVal
01852         
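    # Illustrative usage sketch (the variable names 'vis' and 'spw' are hypothetical
    # and must already be defined in the target engine's namespace):
    #
    #   vis_val, spw_val = sc.getVariables(['vis', 'spw'], 0)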
01853     
01854     ###########################################################################
01855     ###   engine selection functions
01856     ###########################################################################
01857 
01858     def use_paths(self, dir_list=[]):
01859         """use engines that most close to the dirs (or ms)
01860 
01861         Keyword arguments:
01862         dir_list -- the result label
01863 
01864         Example:
01865         CASA <33>: sl.list_result
01866         Out[33]:
01867         ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####']
01868         CASA <34>: sl.get_output('new split', 'outputvis', field='3C84')
01869           Out[34]:
01870         ['/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f1-s8.ms',
01871          '/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f0-s1.ms',
01872          '/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f1-s9.ms']
01873         CASA <35>: sl.use_paths(
01874                      '/home/casa-dev-10/hye/ptest/csplit/10B-209a_5s-f1-s14.ms')
01875           Out[35]: [8]
01876 
01877         """
01878         
01879         casalog.origin("simple_cluster")
01880         
01881         if not self._configdone:
01882             return
01883         if len(dir_list)==0:
01884             casalog.post("dir_list can not be empty","WARN","use_paths")
01885             return []
01886         a=[]
01887         if type(dir_list)!=list:
01888             a.append(dir_list)
01889             dir_list=a
01890         if len(dir_list)>0:
01891             int_ok=True
01892             for i in dir_list:
01893                 if type(i)!=str:
01894                     int_ok=False
01895             if not int_ok:
01896                 casalog.post("path name in dir_list must be string","WARN","use_paths")
01897                 return []
01898     
01899         a=[]
01900         hst=self.get_hosts()
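        # A path is matched to a host when that host's work directory (hst[j][2])
        # appears as a substring of the path; the matching host names are collected
        # in 'a' and fed to the per-host engine round-robin below.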
01901         for i in dir_list:
01902             int_ok=False
01903             for j in range(len(hst)):
01904                 if i.count(hst[j][2])>0:
01905                     a.append(hst[j][0])
01906                     int_ok=True
01907             if not int_ok:
01908                 casalog.post("Could not find a host for %s" % str(i),"WARN","use_paths")
01909                 return []
01910         
01911         e=dict()
01912         for k in xrange(len(hst)):
01913             h=hst[k][0]
01914             e[h]=[]
01915             for i in self._cluster.get_engines():
01916                 if i[1]==h: 
01917                     e[h].append(i[0])
01918         val=e.values()
01919         key=e.keys()
01920         lenth=[]
01921         pos=[]
01922         for i in xrange(len(val)):
01923             lenth.append(len(val[i]))
01924             pos.append(0)
01925         vec=[]
01926         for k in a:
01927             for s in xrange(len(e)):
01928                 if k==key[s]:
01929                     if pos[s]==lenth[s]:
01930                        pos[s]=0
01931                     vec.append(val[s][pos[s]])
01932                     pos[s]+=1
01933         return vec
01934 
01935     def use_hosts(self, host_list=[], engines_each=0):
01936         """use engines on the given nodes
01937 
01938         Keyword arguments:
01939         host_list -- the list of hosts
01940         engines_each -- number of engines to use on each host
01941 
01942         Example:
01943         CASA <45>: sl.get_hosts
01944           Out[45]:
01945                   [['casa-dev-07', 4, '/home/casa-dev-07/hye/ptest'],
01946                    ['casa-dev-08', 4, '/home/casa-dev-08/hye/ptest'],
01947                    ['casa-dev-10', 4, '/home/casa-dev-10/hye/ptest']]
01948         CASA <46>: sl.use_hosts(['casa-dev-07', 'casa-dev-10'], 2)
01949           Out[46]: [8, 9, 0, 1]
01950 
01951         """
01952         
01953         casalog.origin("simple_cluster")
01954 
01955         if not self._configdone:
01956             return
01957         if len(host_list)==0 and  engines_each==0:
01958             return self._cluster.get_ids()
01959 
01960         hst=self.get_hosts()
01961         a=[]
01962         if type(host_list)!=list:
01963             a.append(host_list)
01964             host_list=a
01965         if len(host_list)>0:
01966             int_ok=True
01967             for i in host_list:
01968                 if type(i)!=str:
01969                     int_ok=False
01970             if not int_ok:
01971                 casalog.post("host name in host_list must be string","WARN","use_hosts")
01972                 return []
01973         else:
01974             host_list=[]
01975             for j in range(len(hst)):
01976                 host_list.append(hst[j][0])
01977 
01978         for i in host_list:
01979             host_ok=False
01980             for j in range(len(hst)):
01981                 if hst[j][0]==i:
01982                     host_ok=True
01983             if not host_ok:
01984                 casalog.post("There is no host with name %s" % str(i),"WARN","use_hosts")
01985                 return []
01986 
01987        
01988         e=dict()
01989         for k in host_list:
01990             e[k]=[]
01991             for i in self._cluster.get_engines():
01992                 if i[1]==k: 
01993                     e[k].append(i[0])
01994         val=e.values()
01995         vec=[]
01996         if engines_each==0:
01997             engines_each=100
01998         for i in xrange(len(val)):
01999             j=0
02000             while j<min(engines_each, len(val[i])):
02001                 vec.append(val[i][j])
02002                 j+=1
02003         return vec
02004 
02005     def use_engines(self, use_id=[], spreadhost=1):
02006         """use engines on from a given list
02007 
02008         Keyword arguments:
02009         use_id -- the list of engine ids
02010         spreadhost -- whether to apply the host-first (round-robin across hosts) policy
02011 
02012         Example:
02013         CASA <52>: sl._cluster.get_ids()
02014           Out[52]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
02015         CASA <54>: sl.use_engines([0, 1, 2, 9])
02016           Out[54]: [0, 1, 2, 9]
02017         CASA <55>: sl.use_engines()
02018           Out[55]: [4, 8, 0, 5, 9, 1, 6, 10, 2, 7, 11, 3]
02019 
02020         """
02021         
02022         casalog.origin("simple_cluster")
02023         
02024         if not self._configdone:
02025             return
02026         if len(use_id)>0:
02027             int_ok=True
02028             for i in use_id:
02029                 if type(i)!=int:
02030                     int_ok=False
02031             if int_ok:
02032                 return use_id
02033             else:
02034                 casalog.post("Engine id in use_id must be integer","WARN","use_engines")
02035                 return []
02036         elif spreadhost==0:
02037             return self._cluster.get_ids()
02038         else:
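            # Host-first policy: group engine ids by host, then interleave them
            # round-robin across hosts so that consecutive entries in the returned
            # list land on different hosts whenever possible.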
02039             e=dict()
02040             hst=self.get_hosts()
02041             for i in range(len(hst)):
02042                 e[hst[i][0]]=[]
02043             for i in self._cluster.get_engines():
02044                 e[i[1]].append(i[0])
02045             val=e.values()
02046             lenth=[]
02047             pos=[]
02048             for i in xrange(len(val)):
02049                 lenth.append(len(val[i]))
02050                 pos.append(0)
02051             vec=[]
02052             while len(vec)<len(self._cluster.get_ids()):
02053                 for i in xrange(len(val)):
02054                    if pos[i]<lenth[i]:
02055                        vec.append(val[i][pos[i]])
02056                        pos[i]+=1
02057             return vec
02058     
02059     ###########################################################################
02060     ###   ms knowledge functions
02061     ###########################################################################
02062     def get_msname(self, vis):
02063         """get the ms name of given vis
02064 
02065         Keyword arguments:
02066         vis -- the path+name of visibility data
02067 
02068         Example:
02069         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02070         CASA <16>: sl.get_msname(vis)
02071           Out[18]: '10B-209a_5s'
02072 
02073         """
02074 
02075         vs=os.path.abspath(vis)
02076         msname=vs[str.rfind(vs,'/')+1:]
02077         if msname.endswith('.ms'):
02078             msname=msname[:str.rfind(msname, '.ms')]
02079         return msname
02080     
02081     def get_antenna_diam(self, vis):
02082         """get the diameter of antennas
02083 
02084         Keyword arguments:
02085         vis -- the path+name of visibility data
02086 
02087         Example:
02088         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02089         CASA <19>: sl.get_antenna_diam(vis)
02090           Out[19]: 25.0
02091 
02092         """
02093 
02094         tb.open(vis+'/ANTENNA')
02095         diams=tb.getcol('DISH_DIAMETER')
02096         diam=np.min(diams)
02097         if diam==0:
02098             diam=np.max(diams)
02099         tb.done()
02100         return diam
02101     
02102     def get_mean_reff(self, vis):
02103         """get the mean reference frequency
02104 
02105         Keyword arguments:
02106         vis -- the path+name of visibility data
02107 
02108         Example:
02109         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02110         CASA <20>: sl.get_mean_reff(vis)
02111           Out[20]: 6298222222.2222223
02112 
02113         """
02114 
02115         tb.open(vis+'/SPECTRAL_WINDOW')
02116         reff=tb.getcol('REF_FREQUENCY')
02117         tb.done()
02118         return reff.mean()
02119     
02120     def get_spw_reff(self, vis, spw=0):
02121         """get the reference frequency of spw
02122 
02123         Keyword arguments:
02124         vis -- the path+name of visibility data
02125         spw -- the spectral window id
02126 
02127         Example:
02128         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02129         CASA <21>: sl.get_spw_reff(vis, 8)
02130           Out[21]: 5056000000.0
02131 
02132         """
02133         
02134         casalog.origin("simple_cluster")
02135 
02136         tb.open(vis+'/SPECTRAL_WINDOW')
02137         if spw<0 or spw>=tb.nrows():
02138             casalog.post("Spectral window not available: %s" % str(spw),"WARN","get_spw_reff")
02139             return
02140         spw_reff=tb.getcell('REF_FREQUENCY', spw)
02141         tb.done()
02142         return spw_reff
02143     
02144     def get_spw_chan(self, vis, spw=0):
02145         """get the number of channels of spw
02146 
02147         Keyword arguments:
02148         vis -- the path+name of visibility data
02149         spw -- the spectral window id
02150 
02151         Example:
02152         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02153         CASA <24>: sl.get_spw_chan(vis, 8)
02154           Out[24]: 64
02155 
02156         """
02157         
02158         casalog.origin("simple_cluster")
02159         
02160         tb.open(vis+'/SPECTRAL_WINDOW')
02161         if spw<0 or spw>=tb.nrows():
02162             casalog.post("Spectral window not available: %s" % str(spw),"WARN","get_spw_chan")
02163             return
02164         spw_chan=tb.getcell('NUM_CHAN', spw)
02165         tb.done()
02166         return spw_chan
02167     
02168     def get_pol_corr(self, vis, pol=0):
02169         """get the number of coorelation of polarization 
02170 
02171         Keyword arguments:
02172         vis -- the path+name of visibility data
02173         pol -- the polarization id
02174 
02175         Example:
02176         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02177         CASA <31>: sl.get_pol_corr(vis, 0)
02178           Out[31]: 4
02179 
02180         """
02181         
02182         casalog.origin("simple_cluster")
02183         
02184         tb.open(vis+'/POLARIZATION')
02185         if pol<0 or pol>=tb.nrows():
02186             casalog.post("Polarization not available: %s" % str(pol),"WARN","get_pol_corr")
02187             return
02188         pol_corr=tb.getcell('NUM_CORR', pol)
02189         tb.done()
02190         return pol_corr
02191     
02192     def get_num_field(self, vis):
02193         """get the number of fields 
02194 
02195         Keyword arguments:
02196         vis -- the path+name of visibility data
02197 
02198         Example:
02199         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02200         CASA <32>: sl.get_num_field(vis)
02201           Out[32]: 6L
02202 
02203         """
02204 
02205         tb.open(vis+'/FIELD')
02206         num_field=tb.nrows()
02207         tb.done()
02208         return num_field
02209     
02210     def get_field_name(self, vis, id):
02211         """get the name of a field 
02212 
02213         Keyword arguments:
02214         vis -- the path+name of visibility data
02215         id -- the field id
02216 
02217         Example:
02218         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02219         CASA <35>: sl.get_field_name(vis, 5)
02220           Out[35]: 'J0738+1742'
02221 
02222         """
02223         
02224         casalog.origin("simple_cluster")
02225         
02226         tb.open(vis+'/FIELD')
02227         if id<0 or id>=tb.nrows():
02228             casalog.post("Field not available: %s" % str(id),"WARN","get_field_name") 
02229             return
02230         fn=tb.getcell('NAME', id)
02231         tb.done()
02232         return fn
02233     
02234     def get_num_spw(self, vis):
02235         """get the number of spectral windows 
02236 
02237         Keyword arguments:
02238         vis -- the path+name of visibility data
02239 
02240         Example:
02241         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02242         CASA <36>: sl.get_num_spw(vis)
02243           Out[36]: 18L
02244 
02245         """
02246 
02247         tb.open(vis+'/SPECTRAL_WINDOW')
02248         num_spw=tb.nrows()
02249         tb.done()
02250         return num_spw
02251     
02252     def get_num_desc(self, vis):
02253         """get number of data descriptions
02254 
02255         Keyword arguments:
02256         vis -- the path+name of visibility data
02257 
02258         Example:
02259         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02260         CASA <37>: sl.get_num_desc(vis)
02261           Out[37]: 18L
02262 
02263         """
02264 
02265         tb.open(vis+'/DATA_DESCRIPTION')
02266         num_desc=tb.nrows()
02267         tb.done()
02268         return num_desc
02269     
02270     def get_spw_id(self, vis, desc=0):
02271         """get spectral window id for desc
02272 
02273         Keyword arguments:
02274         vis -- the path+name of visibility data
02275         desc -- the data description id
02276 
02277         Example:
02278         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02279         CASA <38>: sl.get_spw_id(vis, 17)
02280           Out[38]: 17
02281 
02282         """
02283         
02284         casalog.origin("simple_cluster")
02285 
02286         tb.open(vis+'/DATA_DESCRIPTION')
02287         if desc<0 or desc>=tb.nrows():
02288             casalog.post("DDI not available: %s" % str(desc),"WARN","get_spw_id") 
02289             return
02290         spw_id=tb.getcell('SPECTRAL_WINDOW_ID', desc)
02291         tb.done()
02292         return spw_id
02293     
02294     def get_pol_id(self, vis, desc=0):
02295         """get polarization id for desc
02296 
02297         Keyword arguments:
02298         vis -- the path+name of visibility data
02299         desc -- the data description id
02300 
02301         Example:
02302         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02303         CASA <39>: sl.get_pol_id(vis, 17)
02304           Out[39]: 0
02305 
02306         """
02307         
02308         casalog.origin("simple_cluster")
02309 
02310         tb.open(vis+'/DATA_DESCRIPTION')
02311         if desc<0 or desc>=tb.nrows():
02312             casalog.post("DDI not available: %s" % str(desc),"WARN","get_pol_id") 
02313             return
02314         pol_id=tb.getcell('POLARIZATION_ID', desc)
02315         tb.done()
02316         return pol_id
02317     
02318     def get_field_desc(self, vis):
02319         """get source
02320 
02321         Keyword arguments:
02322         vis -- the path+name of visibility data
02323 
02324         Example:
02325         CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
02326         CASA <40>: sl.get_field_desc(vis)
02327           Out[40]:
02328         {(0, 0): {'cost': [0, 82],
02329                   'desc': 0,
02330                   'field': 0,
02331                   'nchan': 64,
02332                   'ncorr': 4,
02333                   'nrows': 16848,
02334                   'pol': 0,
02335                   'spw': 0},
02336          (0, 1): {'cost': [0, 82],
02337                   'desc': 1,
02338                   'field': 0,
02339                   'nchan': 64,
02340                   'ncorr': 4,
02341                   'nrows': 16848,
02342                   'pol': 0,
02343                   'spw': 1}
02344          ... omit ...
02345         }
02346 
02347         """
02348 
02349         tb.open(vis)
02350         nrows=tb.nrows() 
02351         tb.done()
02352         k=nrows/1000
02353         t=nrows%1000
02354         if t>0:
02355            k+=1
02356         b={}
02357         tb.open(vis)
02358         for i in xrange(k):
02359             field=tb.getcol('FIELD_ID', i*1000, 1000)
02360             desc=tb.getcol('DATA_DESC_ID', i*1000, 1000)
02361             fd=zip(field, desc)
02362             newset=set(fd)
02363             for j in newset:
02364                 if b.has_key(j):
02365                    b[j]['nrows']=b[j]['nrows']+fd.count(j)
02366                 else:
02367                    a={}
02368                    a['field']=j[0]
02369                    a['desc']=j[1]
02370                    a['nrows']=fd.count(j)
02371                    b[j]=a
02372         tb.done()
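        # Attach per-(field, data description) metadata and estimate a relative cost:
        # the raw cost is nchan * ncorr * nrows, which is then normalized by the mean
        # cost and stored as [relative_cost, number_of_pairs].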
02373         d=0
02374         for i in b.keys():
02375             b[i]['pol']=self.get_pol_id(vis, b[i]['desc'])
02376             b[i]['spw']=self.get_spw_id(vis, b[i]['desc'])
02377             b[i]['ncorr']=self.get_pol_corr(vis, b[i]['pol'])
02378             b[i]['nchan']=self.get_spw_chan(vis, b[i]['spw'])
02379             b[i]['cost']=b[i]['nchan']*b[i]['ncorr']*b[i]['nrows']
02380             d+=b[i]['cost']
02381         d/=len(b.keys()) 
02382         for i in b.keys():
02383             b[i]['cost']=[b[i]['cost']/d, len(b.keys())]
02384         return b
02385     
02386     
02387     ###########################################################################
02388     ###   setup -  
02389     ###########################################################################
02390     def init_cluster(self, clusterfile='', project=''):
02391         """Setup the cluster
02392 
02393         Keyword arguments:
02394         clusterfile -- the cluster definition file
02395         project -- the name of the project (default: 'cluster_project').
02396 
02397         A configuration file is an ASCII text file. Each line defines a node
02398         (also called host) to be used, with one of the following
02399         formats:
02400         
02401         - <hostname>, <number of engines>, <work directory>
02402         - <hostname>, <number of engines>, <work directory>, <fraction of total RAM>
02403         - <hostname>, <number of engines>, <work directory>, <fraction of total RAM>, <RAM per engine>
02404         
02405         where the interpretation of the parameters is as follows: 
02406         
02407         - hostname: Hostname of the target node where the cluster is deployed 
02408         
02409           NOTE: The hostname has to be provided w/o quotes
02410         
02411         - number of engines: Supports 3 different formats
02412         
02413             * If provided as an integer >1: It is interpreted as 
02414               the actual user-specified maximum number of engines
02415               
02416             * If provided as an integer =0: It will deploy as many
02417               engines as possible, according to the idle CPU capacity 
02418               available at the target node
02419               
02420             * If provided as a float between 0 and 1: It is interpreted 
02421               as the percentage of idle CPU capacity that the cluster 
02422               can use in total at the target node
02423 
02424         - work directory: Area in which the cluster will put intermediate 
02425           files such as log files, configuration files, and monitoring files
02426         
02427           NOTE1: This area has to be accessible from the controller (user) machine, 
02428                  and mounted at the same filesystem path
02429                  
02430           NOTE2: The path name has to be provided w/o quotes
02431         
02432         - fraction of total RAM: Supports 3 different formats:
02433         
02434             * If provided as an integer >1: It is interpreted as the actual 
02435               user-specified maximum amount of RAM to be used in total at 
02436               the target node
02437               
02438             * If provided as an integer =0: It will deploy as many engines 
02439               as possible, according to the free RAM available at the target node
02440               
02441             * If provided as a float between 0 and 1: It is interpreted as 
02442               the percentage of free RAM that the cluster can use in total 
02443               at the target node
02444             
02445         - RAM per engine: Integer, which is interpreted as the required memory 
02446           per engine in MB (default is 512MB) 
02447           
02448         It is also possible to add comments by using the # character at the 
02449         beginning of the line. Example:
02450         
02451         #####################################################
02452         
02453         # CASA cluster configuration file for expert user
02454         orion, 10, /home/jdoe/test/myclusterhome1
02455         m42, 4, /home/jdoe/test/myclusterhome2, 0.6, 1024
02456         antares, 0.6, /home/jdoe/test/myclusterhome3, 0, 2048
02457         
02458         #####################################################
02459         
02460         - At host ``orion'': It will deploy up to 10 engines, with working 
02461           directory /home/jdoe/test/myclusterhome1, and using as much of the free 
02462           RAM available as possible (up to 90% by default), taking into 
02463           account that each engine can use up to 512 MB (the default and minimum)
02464           
02465         - At host ``m42'': It will deploy up to 4 engines, with working directory 
02466           /home/jdoe/test/myclusterhome2, and using at most 60% of the free RAM 
02467           available, taking into account that each engine can use up to 1024 MB.   
02468           
02469         - At host ``antares'': It will deploy as many engines as possible, with 
02470           working directory /home/jdoe/test/myclusterhome3, using up to 60% of the 
02471           idle CPU capacity / cores, and as much of the free RAM available as possible 
02472           (up to 90% by default), taking into account that each engine can use up 
02473           to 2048 MB.  
02474 
02475         Example:
02476         CASA <15>: from simple_cluster import *
02477         CASA <16>: sl=simple_cluster()
02478         CASA <17>: sl.init_cluster('cluster-config.txt', 'ProjectName')
02479 
02480         """
02481         
02482         casalog.origin("simple_cluster")
02483 
02484         if project==None or type(project)!=str or project.strip()=="":
02485             # Project name must be a non-empty string, otherwise set default
02486             project='cluster_project'
02487             casalog.post("No project specified using default project: " +\
02488                          "cluster_project", "WARN","init_cluster")
02489     
02490         if clusterfile==None or type(clusterfile)!=str or clusterfile.strip()=="":
02491             # Cluster file name must be a non-empty string, otherwise generate a default clusterfile
02492             # The default cluster should have:
02493             #    * One engine for each core on the current system
02494             #    * The working directory should be the cwd
02495             (sysname, nodename, release, version, machine)=os.uname()
02496             msg=nodename+', '+str(0)+', '+os.getcwd()
02497             # jagonzal (CAS-4293): Write default cluster config file in the 
02498             # current directory to avoid problems with writing permissions
02499             clusterfile='default_cluster'
02500             f=open(clusterfile, 'w')
02501             f.write(msg)
02502             f.close()
02503             self.__localCluster = True
02504     
02505         self.config_cluster(clusterfile, True)
02506         if not self._configdone:
02507             self.__running = False
02508             return False
02509         
02510         self.create_project(project)
02511         
02512         self.start_cluster()
02513 
02514         # Put the cluster object into the global namespace
02515         sys._getframe(len(inspect.stack())-1).f_globals['procCluster'] = self
02516         
02517         # Set running status
02518         self.__running = True
02519         
02520         return True
02521         
02522     
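    # -----------------------------------------------------------------------
    # Illustrative sketch (not part of the original source): one way to drive
    # init_cluster from a plain script instead of the interactive CASA prompt
    # shown in the docstring above.  The hostnames, paths, engine counts and
    # RAM settings below are hypothetical examples following the documented
    # configuration-file format.
    #
    #   from simple_cluster import *
    #
    #   # Write a two-node configuration file
    #   with open('cluster-config.txt', 'w') as f:
    #       f.write('node01, 8, /scratch/jdoe/cluster_wd1\n')
    #       f.write('node02, 0.5, /scratch/jdoe/cluster_wd2, 0.7, 1024\n')
    #
    #   sc = simple_cluster()
    #   if not sc.init_cluster('cluster-config.txt', 'MyProject'):
    #       raise RuntimeError('cluster initialisation failed')
    # -----------------------------------------------------------------------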
02523     ###########################################################################
02524     ###   example to distribute clean task over engines
02525     ###########################################################################
02526     def simple_clean(self, vs, nx, ny, mode='channel', email=''):
02527         """Make images with a simple cluster
02528 
02529         Keyword arguments:
02530         vs -- the visibility data
02531         nx, ny -- the size (pixels) of the image
02532         mode -- either 'channel' or 'continuum'
02533         email -- the email address to notify upon completion
02534 
02535         Example:
02536         CASA <15>: from simple_cluster import *
02537         CASA <16>: sl=simple_cluster()
02538         CASA <17>: sl.init_cluster('my_cluster', 'aProj')
02539         CASA <18>: sl.simple_clean(
02540             vs='/home/casa-dev-09/hye/ptest/sim.alma.csv.mid.ms', 
02541             nx=256, ny=256, mode='channel')
02542 
02543         """
02544         
02545         vis=os.path.abspath(vs)
02546         tb.clearlocks(vis)
02547     
02548         # Determine the cell size
02549         diam=self.get_antenna_diam(vis)
02550         freqmean=self.get_mean_reff(vis)
02551         casalog.post("vis: %s Diameter: %s FreqMean: %s" % (vis,str(diam),str(freqmean)),"INFO","simple_clean")
02552         fv=(3.0e8/freqmean/diam)*180*60*60/math.pi
02553         cell=[str(fv/nx)+'arcsec', str(fv/ny)+'arcsec']
02554     
02555         fdspw=self.get_field_desc(vis)
02556         ids=self._cluster.get_ids()
02557         msname=self.get_msname(vis)
02558     
02559         if len(fdspw)>len(ids):
02560             # More job chunks than engines, simply distribute by field and spw
02561             i=0
02562             for k in fdspw.values():
02563                 id_i=ids[i]
02564                 s={}
02565                 s['vis']=vis
02566                 fd=str(k['field'])
02567                 spw=str(k['spw'])
02568                 s['imagename']=self.get_engine_store(id_i)+msname+'-f'+fd+'-s'+spw
02569                 s['field']=fd
02570                 s['spw']=spw
02571                 s['mode']=mode
02572                 s['niter']=20000
02573                 s['threshold']='0.001mJy'
02574                 s['psfmode']='hogbom'
02575                 s['imagermode']='csclean'
02576                 s['imsize']=[nx, ny]
02577                 s['cell']=cell
02578                 s['calready']=False
02579                 cmd=self.make_call('clean', s) 
02580                 self.do_and_record(cmd, id_i, email)
02581                 i+=1
02582                 if i==len(ids):
02583                    i=0
02584         else:
02585             # Fewer job chunks than engines, need to further divide by channel
02586             i=0
02587             for k in fdspw.values():
02588                 spwchan=self.get_spw_chan(vis, k['spw'])
02589         
02590                 nengs=len(ids)
02591                 nchan=1
02592                 try:
02593                     nchan=int(math.ceil(abs(float(spwchan))/nengs))
02594                 except:
02595                     # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
02596                     # traceback.print_tb(sys.exc_info()[2])
02597                     pass
02598         
02599                 for j in xrange(len(ids)):
02600                     id_i=ids[i]
02601                     start=j*nchan
02602                     s={}
02603                     s['vis']=vis
02604                     fd=str(k['field'])
02605                     spw=str(k['spw'])
02606                     s['imagename']=(self.get_engine_store(id_i)+msname+'-f'+fd+
02607                              '-s'+spw+'-b'+str(start)+'-e'+str(start+nchan))
02608                     s['field']=fd
02609                     s['spw']=spw
02610                     s['mode']='channel'
02611                     s['niter']=20000
02612                     s['start']=start
02613                     s['nchan']=nchan
02614                     s['threshold']='0.001mJy'
02615                     s['psfmode']='hogbom'
02616                     s['imagermode']='csclean'
02617                     s['imsize']=[nx, ny]
02618                     s['cell']=cell
02619                     s['calready']=False
02620                     cmd=self.make_call('clean', s) 
02621                     self.do_and_record(cmd, id_i, email)
02622                     i+=1
02623                     if i==len(ids):
02624                        i=0
02625         
02626         self.get_status()
02627     
02628     def simple_split(self, vs, email):
02629         """split by source (field, spw) with parallel engines
02630 
02631         Keyword arguments:
02632         vs -- the visibility data
02633         email -- the email address to notify upon completion
02634 
02635         Example:
02636         CASA <15>: from simple_cluster import *
02637         CASA <16>: sl=simple_cluster()
02638         CASA <17>: sl.init_cluster('my_cluster', 'aProj')
02639         CASA <18>: vis='/home/casa-dev-09/hye/ptest/sim.alma.csv.mid.ms'
02640         CASA <19>: sl.simple_split(vis, '')
02641 
02642         """
02643         vis=os.path.abspath(vs)
02644         tb.clearlocks(vis)
02645        
02646         casalog.post("vis: %s" % vis,"INFO","simple_split")
02647         fdspw=self.get_field_desc(vis)
02648         ids=self.use_engines()
02649         msname=self.get_msname(vis)
02650     
02651         i=0
02652         for k in fdspw.values():
02653             id_i=ids[i]
02654             s={}
02655             s['vis']=vis
02656             fd=str(k['field'])
02657             spw=str(k['spw'])
02658             s['outputvis']=self.get_engine_store(id_i)+msname+'-f'+ \
02659                                fd+'-s'+spw+'.ms'
02660             if os.path.exists(s['outputvis']):
02661                 os.system('rm -rf '+s['outputvis'])
02662             s['field']=self.get_field_name(vis, k['field']) 
02663             s['spw']=spw
02664             s['datacolumn']='DATA'
02665             cmd=self.make_call('split', s) 
02666             self.do_and_record(cmd, id_i, email)
02667             i+=1
02668             if i==len(ids):
02669                i=0
02670         self.get_status()
02671     
02672 #if __name__ == "__main__":
02673     #from simple_cluster import *
02674     #sl=simple_cluster()
02675     #sl.init_cluster('my_cluster', 'aProj')
02676     #vis='/home/casa-dev-09/hye/ptest/sim.alma.csv.mid.ms', 
02677     #simple_split(vis)
02678 
02679     #get_status()
02680 
02681     #init_cluster('my_cluster', 'my_project')
02682     #simple_clean(vis='/home/casa-dev-09/hye/ptest/sim.alma.csv.mid.ms', 
02683     #             nx=256, ny=256, mode='channel')
02684   
02685     #simple_clean(vis='/home/casa-dev-09/hye/para/sim.alma.csv.mid.ms', 
02686     #             nx=256, ny=256, mode='channel')
02687       
02688     #simple_clean(vis='/home/casa-dev-09/hye/pclean/sim100g_4chan15kRows.ms',
02689     #             nx=256, ny=256, mode='channel')
02690         
02691 
02692 
02693 ###########################################################################
02694 ###   job management classes
02695 ###########################################################################
02696 
02697 class JobData:
02698     """
02699     This class encapsulates a single job.  The commandName is the name
02700     of the task to be executed.  The commandInfo is a dictionary of all
02701     task parameters that need to be handled.
02702     """
02703     class CommandInfo:
02704 
02705         def __init__(self, commandName, commandInfo, returnVariable):
02706             self.commandName = commandName
02707             self.commandInfo = commandInfo
02708             self.returnVariable = returnVariable
02709 
02710         def getReturnVariable(self):
02711             return self.returnVariable
02712         
02713         def getCommandLine(self):
02714             firstArgument = True
02715             output = "%s = %s(" % (self.returnVariable, self.commandName)
02716             for (arg,value) in self.commandInfo.items():
02717                 if firstArgument:
02718                     firstArgument = False
02719                 else:
02720                     output += ', '
02721                 if isinstance(value, str):
02722                     output += ("%s = '%s'" % (arg, value))
02723                 else:
02724                     output += ("%s = " % arg) + str(value)
02725             output += ')'
02726             return output
02727     
02728     
02729     def __init__(self, commandName, commandInfo = {}):
02730         self._commandList = []
02731         self.status  = 'new'
02732         self.addCommand(commandName, commandInfo)
02733         self._returnValues = None
02734             
02735 
02736     def addCommand(self, commandName, commandInfo):
02737         """
02738         Add an additional command to this Job, to be executed after
02739         the previously added commands.
02740         """
02741         rtnVar = "returnVar%d" % len(self._commandList)
02742         self._commandList.append(JobData.CommandInfo(commandName,
02743                                                      commandInfo,
02744                                                      rtnVar))
02745     def getCommandLine(self):
02746         """
02747         This method will return the command line(s) to be executed on the
02748         remote engine.  It is usually only needed for debugging or for
02749         the JobQueueManager.
02750         """
02751         output = ''
02752         for idx in xrange(len(self._commandList)):
02753             if idx > 0:
02754                 output += '; '
02755             output += self._commandList[idx].getCommandLine()
02756         return output
02757 
02758     def getCommandNames(self):
02759         """
02760         This method will return a list of command names that are associated
02761         with this job.
02762         """
02763         return [command.commandName for command in self._commandList]
02764     
02765 
02766     def getCommandArguments(self, commandName = None):
02767         """
02768         This method will return the command arguments associated with a
02769         particular job.
02770            * If commandName is not None, the arguments for the command with
02771              that name are returned.
02772            * Otherwise a dictionary (with keys being the commandName and
02773              the value being the dictionary of arguments) is returned.
02774            * If there is only a single command the arguments for that
02775              command are returned as a dictionary.
02776         """
02777         returnValue = {}
02778         for command in self._commandList:
02779             if commandName is None or commandName == command.commandName:
02780                 returnValue[command.commandName] = command.commandInfo
02781                                                    
02782         if len(returnValue) == 1:
02783             return returnValue.values()[0]
02784         return returnValue
02785     
02786     def getReturnVariableList(self):
02787         return [ci.returnVariable for ci in self._commandList]
02788 
02789     def setReturnValues(self, valueList):
02790         self._returnValues = valueList
02791 
02792     def getReturnValues(self):
02793         if self._returnValues is not None:
02794             if len(self._returnValues) == 1:
02795                 return self._returnValues[0]
02796         return self._returnValues
02797 
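# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original source): building a JobData
# object by hand and inspecting it.  The task names and arguments are
# hypothetical; any CASA task and argument dictionary could be used instead.
#
#   job = JobData('flagdata', {'vis': 'some.ms', 'mode': 'summary'})
#   job.addCommand('split', {'vis': 'some.ms', 'outputvis': 'some.split.ms'})
#
#   print job.getCommandLine()
#   # -> "returnVar0 = flagdata(vis = 'some.ms', mode = 'summary'); "
#   #    "returnVar1 = split(vis = 'some.ms', outputvis = 'some.split.ms')"
#   # (argument order within each call may vary, since it follows dict order)
#
#   print job.getCommandNames()              # ['flagdata', 'split']
#   print job.getCommandArguments('split')   # {'vis': ..., 'outputvis': ...}
# ---------------------------------------------------------------------------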
02798 class JobQueueManager:
02799     def __init__(self, cluster = None):
02800         self.__cluster=cluster
02801         if self.__cluster is None:
02802             self.__cluster = simple_cluster.getCluster()
02803         # jagonzal: Create a cross reference for the simple_cluster using this JobQueue
02804         self.__cluster._JobQueueManager = self
02805         self.__inputQueue = []
02806         self.__outputQueue = {}
02807 
02808     def addJob(self, jobData):
02809         """
02810         Add another JobData object to the queue of jobs to be executed.
02811         """
02812         # TODO Check the type of jobData
02813         if not isinstance(jobData,list):
02814             jobData = [jobData]
02815         for job in jobData:
02816             job.status='pending'
02817             self.__inputQueue.append(job)
02818 
02819     def clearJobs(self):
02820         """
02821         Remove all jobs from the queue; this is usually a good idea
02822         before reusing a JobQueueManager.
02823         """
02824         self.__inputQueue = []
02825         self.__outputQueue = {}
02826         
02827     def getOutputJobs(self, status = None):
02828         """
02829         This returns all jobs in the output queue which
02830         match the specified status.  If no status is specified
02831         the entire list of output jobs is returned.
02832         """
02833         if status is None:
02834             return self.__outputQueue.values()
02835 
02836         returnList = []
02837         for job in self.__outputQueue.values():
02838             if job.status == status:
02839                 returnList.append(job)
02840         return returnList
02841         
02842     def getAllJobs(self):
02843         return self.__outputQueue.values()
02844 
02845     def executeQueue(self):
02846         """
02847         This method causes all jobs to be executed on the available engines.
02848         It will block until the jobs are complete.
02849         """
02850         
02851         casalog.origin("simple_cluster")
02852         
02853         # Clear the queue of any existing results
02854         self.__cluster.remove_record()
02855         
02856         engineList = self.__cluster.use_engines()
02857 
02858         # jagonzal (CAS-): When there are 0 engines available an error must have happened
02859         if (len(engineList) < 1):
02860             casalog.post("There are 0 engines available, check the status of the cluster","WARN","executeQueue")
02861             return
02862 
02863         casalog.post("Executing %d jobs on %d engines" %
02864                      (len(self.__inputQueue), len(engineList)), "INFO","executeQueue")
02865 
02866         while len(self.__inputQueue) > 0:
02867             self._checkForCompletedJobs(engineList)
02868 
02869             for job in self.__inputQueue[:len(engineList)]:
02870                 self.__inputQueue.remove(job)
02871                 # This stores each job in the output queue indexed by its JobID (name)
02872                 self.__outputQueue[self.__cluster.do_and_record\
02873                                    (job.getCommandLine(), 
02874                                     engineList.pop(),
02875                                     subMS=job.getCommandArguments()['vis'].split('/').pop())
02876                                    ]=job
02877 
02878             # Wait for a bit then try again
02879             time.sleep(1)
02880 
02881         # Now we need to wait for the rest of the jobs to complete
02882         while self._checkForCompletedJobs(engineList):
02883             time.sleep(1)
02884 
02885 
02886     def _checkForCompletedJobs(self, engineList):
02887         """ This method will look at all jobs in the status, if they are
02888         marked as done it will:
02889            * update the outputQueue
02890            * add the engine back to the engine list
02891            * remove the report from the status
02892         """
02893         statusReport = self.__cluster.get_status(True).values()
02894         if len(statusReport) == 0:
02895             # Nothing running
02896             return False
02897 
02898         # jagonzal: This seems to be a hook between the cluster queue and the JobManager queue
02899         for job in self.__cluster.get_status(True).values():
02900             # Update the status of the Job
02901             self.__outputQueue[job['jobname']].status = job['status']
02902             
02903             if job['status'] == 'done' or job['status'] == 'broken':
02904                 if job['status'] == 'done':
02905                     # Get the return values if we're successful
02906                     self.__outputQueue[job['jobname']].setReturnValues \
02907                        (self.__cluster.getVariables\
02908                         (self.__outputQueue[job['jobname']].\
02909                          getReturnVariableList(),job['engine']))
02910                     
02911                 engineList.append(job['engine'])
02912                 self.__cluster.remove_record(job['jobname'])
02913         return True
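# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original source): queueing several jobs
# and running them on an already-initialised cluster.  The MS names and task
# arguments are hypothetical; executeQueue() blocks until every job finishes
# as 'done' or 'broken'.  Each job includes a 'vis' argument, which
# executeQueue uses to label the job record.
#
#   sc = simple_cluster.getCluster()
#   queue = JobQueueManager(sc)
#   queue.clearJobs()
#
#   for subms in ['part1.ms', 'part2.ms', 'part3.ms']:
#       queue.addJob(JobData('flagdata', {'vis': subms, 'mode': 'summary'}))
#
#   queue.executeQueue()
#
#   for job in queue.getOutputJobs('done'):
#       print job.getCommandArguments()['vis'], job.getReturnValues()
#   for job in queue.getOutputJobs('broken'):
#       print 'failed:', job.getCommandLine()
# ---------------------------------------------------------------------------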