# casa
# $Rev:20696$
00001 import os 00002 import math 00003 import time 00004 import thread 00005 import commands 00006 import numpy as np 00007 from taskinit import * 00008 from tasksinfo import * 00009 from parallel_go import * 00010 00011 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00012 import traceback 00013 00014 class simple_cluster: 00015 """The simple_cluster creates and maintains an ipcluster environment 00016 for controlling parallel execution of casa tasks (tools and scripts) 00017 """ 00018 00019 __default_mem_per_engine = 512 00020 __default_mem_fraction = 0.9 00021 __default_cpu_fraction = 0.9 00022 00023 00024 def __init__(self,monitoringFile='monitoring.log',verbose=False): 00025 00026 self._project="" 00027 self._hosts=[] 00028 self._jobs={} 00029 self._rsrc={} 00030 self._job_title=1 00031 self._monitor_on=True 00032 self._monitor_running=False 00033 self._monitoringFile=monitoringFile 00034 self._verbose=verbose 00035 self._resource_on=True 00036 self._resource_running=False 00037 self._configdone=False 00038 self._cluster=cluster() 00039 self._JobQueueManager=None 00040 self._enginesRWoffsets={} 00041 self.__localCluster = False 00042 self.__running = False 00043 # jagonzal: This is basically the destructor (i.e. for graceful finalization) 00044 atexit.register(simple_cluster.stop_cluster,self) 00045 00046 # jagonzal (CAS-4372): By-pass ssh redirection when deploying engines in localhost 00047 def shell(self, hostname): 00048 """Creates the command line to execute the give command on the given host. 00049 If and only if the host is not localhost, ssh is used.""" 00050 00051 if self.uniqueIP(hostname) == self.uniqueIP("localhost"): 00052 return "eval " 00053 else: 00054 # f: Requests ssh to go to background just before command execution. 00055 # q: Quiet mode. Causes all warning and diagnostic messages to be suppressed. 00056 # x: Disables X11 forwarding. 
00057 return "ssh -fqx " + hostname 00058 00059 def uniqueIP(self, hostname): 00060 """Returns a unique IP address of the given hostname, 00061 i.e. not 127.0.0.1 for localhost but localhost's global IP""" 00062 00063 ip = socket.gethostbyname(hostname) 00064 00065 if ip == "127.0.0.1": 00066 ip = socket.gethostbyname(socket.getfqdn()) 00067 00068 return ip 00069 00070 #################################################################### 00071 # Static method that returns whatever the current definition of a 00072 # cluster is. If none is defined the default cluster is created, 00073 # initialized and returned. 00074 #################################################################### 00075 @staticmethod 00076 def getCluster(): 00077 # This method will check for a cluster existing at the global 00078 # scope. If it does not exist then a default cluster will be 00079 # created. 00080 # The cluster object is returned. 00081 myf = sys._getframe(len(inspect.stack())-1).f_globals 00082 if not 'procCluster' in myf.keys(): 00083 sc = simple_cluster() 00084 if (sc.init_cluster()): 00085 return myf['procCluster'] 00086 else: 00087 return None 00088 else: 00089 sc = myf['procCluster'] 00090 if not sc.isClusterRunning(): 00091 if (sc.init_cluster()): 00092 return myf['procCluster'] 00093 else: 00094 return None 00095 else: 00096 return myf['procCluster'] 00097 00098 @staticmethod 00099 def setDefaults(default_mem_per_engine=512,default_mem_fraction=0.9,default_cpu_fraction=0.9): 00100 00101 simple_cluster.__default_mem_per_engine = default_mem_per_engine 00102 simple_cluster.__default_mem_fraction = default_mem_fraction 00103 simple_cluster.__default_cpu_fraction = default_cpu_fraction 00104 00105 def isClusterRunning(self): 00106 return self.__running 00107 00108 ########################################################################### 00109 ### cluster verifiction 00110 ########################################################################### 00111 def 
config_cluster(self, cfg, force=False): 00112 """Read the configuration file and validate cluster definitions. 00113 00114 Keyword arguments: 00115 cfg -- the name of cluster configuration file 00116 force -- whether or not to reconfigure if a configured cluster exists 00117 00118 A configuration file is an ASCII text file. Each line defines a node 00119 (also called host) with one line per host to be used, and the following 00120 format: 00121 00122 - <hostname>, <number of engines>, <work directory> 00123 - <hostname>, <number of engines>, <work directory>, <fraction of total RAM> 00124 - <hostname>, <number of engines>, <work directory>, <fraction of total RAM>, <RAM per engine> 00125 00126 where the interpretation of the parameters is as follows: 00127 00128 - hostname: Hostname of the target node where the cluster is deployed 00129 00130 NOTE: The hostname has to be provided w/o quotes 00131 00132 - number of engines: Supports in turns 3 different formats 00133 00134 * If provided as an integer >1: It is interpreted as 00135 the actual user-specified maximum number of engines 00136 00137 * If provided as an integer =0: It will deploy as maximum 00138 engines as possible, according to the idle CPU capacity 00139 available at the target node 00140 00141 * If provided as a float between 0 and 1: It is interpreted 00142 as the percentage of idle CPU capacity that the cluster 00143 can use in total at the target node 00144 00145 - work directory: Area in which the cluster will put intermediate 00146 files such as log files, configuration files, and monitoring files 00147 00148 NOTE1: This area has to be accessible from the controller (user) machine, 00149 and mounted in the same path of the filesystem 00150 00151 NOTE2: The path name has to be provided w/o quotes 00152 00153 - fraction of total RAM: Supports in turns 3 different formats: 00154 00155 * If provided as an integer >1: It is interpreted as the actual 00156 user-specified maximum amount of RAM to be used 
in total at 00157 the target node 00158 00159 * If provided as an integer =0: It will deploy as maximum engines 00160 as possible, according to the free RAM available at target node 00161 00162 * If provided as a float between 0 and 1: It is interpreted as 00163 the percentage of free RAM that the cluster can use in total 00164 at the target node 00165 00166 - RAM per engine: Integer, which is interpreted as the required memory 00167 per engine in MB (default is 512MB) 00168 00169 It is also possible to add comments, by using the # character at the 00170 beginning of the line. Example: 00171 00172 ##################################################### 00173 00174 # CASA cluster configuration file for expert user 00175 orion, 10, /home/jdoe/test/myclusterhome1 00176 m42, 4, /home/jdoe/test/myclusterhome2, 0.6, 1024 00177 antares, 0.6, /home/jdoe/test/myclusterhome3, 0, 2048 00178 00179 ##################################################### 00180 00181 - At host ``orion'': It will deploy up to 10 engines, with working 00182 directory /home/jdoe/test/myclusterhome1, and using as much free 00183 RAM available as possible (up to 90% by default), taking into 00184 account that each engine can use up to 512 MB (the default and minimum) 00185 00186 - At host ``m42'': It will deploy up to 4 engines, with working directory 00187 /home/jdoe/test/myclusterhome2, and using at the most 60% of the free RAM 00188 available, taking into account that each engine can use up to 1024 MB. 00189 00190 - At host ``antares'': It will deploy as many engines as possible, with 00191 working directory /home/jdoe/test/myclusterhome3, using up to 60% of the 00192 idle CPU capacity / cores, and as much free RAM available as possible 00193 (up to 90% by default), taking into account that each engine can use up 00194 to 2048 MB. 00195 00196 Normally, one does not call this function directly. 00197 The init_cluster function will trigger this function. 
00198 00199 """ 00200 00201 casalog.origin("simple_cluster") 00202 00203 if (len(self._hosts)>0 or self._configdone) and not force: 00204 casalog.post("Cluster already configured","WARN","config_cluster") 00205 return 00206 self._hosts=[] 00207 self._jobs={} 00208 configfile=open(cfg,'r') 00209 s=configfile.readlines() 00210 for line in s: 00211 sLine=line.rstrip() 00212 if str.find(sLine, '#')==0: 00213 continue 00214 xyzd=str.split(sLine, ',') 00215 if len(xyzd)<3: 00216 casalog.post("Node config should be at least: 'hostname, numengine, workdir'","WARN","config_cluster") 00217 casalog.post("The following entry will be ignored: %s" % sLine,"WARN","config_cluster") 00218 continue 00219 # jagonzal (CAS-4276): (Max) number of engines is not an integer any more 00220 # try: 00221 # a=int(xyzd[1]) 00222 # except: 00223 # print 'the numofengines should be a integer instead of:', xyzd[1] 00224 # print 'this entry will be ignored' 00225 # continue 00226 [a, b, c]=[str.strip(xyzd[0]), float(xyzd[1]), str.strip(xyzd[2])] 00227 if len(a)<1: 00228 casalog.post("Hostname can not be empty","WARN","config_cluster") 00229 casalog.post("The following entry will be ignored: %s" % sLine,"WARN","config_cluster") 00230 continue 00231 if len(c)<1: 00232 casalog.post("Workdir can not be empty","WARN","config_cluster") 00233 casalog.post("The following entry will be ignored: %s" % sLine,"WARN","config_cluster") 00234 continue 00235 00236 ### jagonzal (CAS-4276): New cluster specification file ### 00237 00238 # Retrieve available resources to cap number of engines deployed 00239 hostname = str(xyzd[0]) 00240 (ncores_available,memory_available,cpu_available) = self.check_host_resources(hostname) 00241 00242 # Determine maximum number of engines that can be deployed at target node 00243 max_engines_user = b 00244 if (max_engines_user<=0): 00245 max_engines = int(self.__default_cpu_fraction*(cpu_available/100.0)*ncores_available) 00246 if (max_engines < 2): 00247 casalog.post("CPU free 
capacity available %s of %s cores would not support cluster mode at node %s, starting only 1 engine" 00248 %(str(cpu_available),str(ncores_available),hostname),"WARN","config_cluster") 00249 max_engines = 1 00250 elif (max_engines_user<=1): 00251 max_engines = int(max_engines_user*ncores_available) 00252 else: 00253 max_engines = int(max_engines_user) 00254 00255 casalog.post("Will deploy up to %s engines at node %s" % (str(max_engines),hostname), "INFO","config_cluster") 00256 00257 # Determine maximum memory that can be used at target node 00258 if len(xyzd)>=4: 00259 max_mem_user = float(xyzd[3]) 00260 if (max_mem_user<=0): 00261 max_mem = int(round(self.__default_mem_fraction*memory_available)) 00262 elif (max_mem_user<=1): 00263 max_mem = int(round(max_mem_user*memory_available)) 00264 else: 00265 max_mem = int(max_mem_user) 00266 else: 00267 max_mem = int(round(self.__default_mem_fraction*memory_available)) 00268 00269 00270 casalog.post("Will use up to %sMB of memory at node %s" % (str(max_mem),hostname), "INFO","config_cluster") 00271 00272 # User can provide an estimation of the amount of RAM memory necessary per engine 00273 mem_per_engine = self.__default_mem_per_engine 00274 if len(xyzd)>=5: 00275 mem_per_engine_user = float(xyzd[4]) 00276 if (mem_per_engine_user>self.__default_mem_per_engine): 00277 mem_per_engine = mem_per_engine_user 00278 00279 # Apply heuristics: If memory limits number of engines then we can increase the number of openMP threads 00280 nengines=int(max_mem/mem_per_engine) 00281 if (nengines < 2): 00282 casalog.post("Free memory available %sMB would not support cluster mode at node %s, starting only 1 engine" 00283 % (str(max_mem),hostname), "WARN","config_cluster") 00284 nengines=1 00285 else: 00286 casalog.post("Required memory per engine %sMB allows to deploy up to %s engines at node %s " 00287 % (str(mem_per_engine),str(nengines),hostname), "INFO","config_cluster") 00288 00289 nengines = int(nengines) 00290 if 
(nengines>max_engines): 00291 casalog.post("Cap number of engines deployed at node %s from %s to %s in order to meet maximum number of engines constraint" 00292 % (hostname,str(nengines),str(max_engines)), "INFO","config_cluster") 00293 nengines = max_engines 00294 00295 omp_num_nthreads = 1 00296 while (nengines*omp_num_nthreads*2 <= max_engines): 00297 omp_num_nthreads *= 2 00298 00299 if (omp_num_nthreads>1): 00300 casalog.post("Enabling openMP to %s threads per engine at node %s" % (str(omp_num_nthreads),hostname), "INFO","config_cluster") 00301 00302 self._hosts.append([a, nengines, c, omp_num_nthreads]) 00303 00304 self._configdone=self.validate_hosts() 00305 00306 if not self._configdone: 00307 casalog.post("Failed to configure the cluster", "WARN","config_cluster") 00308 00309 def check_host_resources(self,hostname): 00310 """ 00311 jagonzal (CAS-4276 - New cluster specification file): Retrieve resources available 00312 at target node in order to dynamically deploy the engines to fit the idle capacity 00313 """ 00314 00315 casalog.origin("simple_cluster") 00316 00317 ncores = 0 00318 memory = 0. 
00319 cmd_os = self.shell(hostname) + " 'uname -s'" 00320 os = commands.getoutput(cmd_os) 00321 if (os == "Linux"): 00322 cmd_ncores = self.shell(hostname) + " 'cat /proc/cpuinfo' " 00323 res_ncores = commands.getoutput(cmd_ncores) 00324 str_ncores = res_ncores.count("processor") 00325 00326 try: 00327 ncores = int(str_ncores) 00328 except: 00329 casalog.post("Problem converting number of cores into numerical format at node %s: %s" % (hostname,str_ncores),"WARN","check_host_resources") 00330 pass 00331 00332 memory=0.0 00333 for memtype in ['MemFree', 00334 'Buffers', 00335 'Cached' # "Cached" has a the problem that there can also be "SwapCached" 00336 ]: 00337 cmd_memory = self.shell(hostname) + " 'cat /proc/meminfo | grep -v SwapCached | grep "+memtype+"'" 00338 str_memory = commands.getoutput(cmd_memory) 00339 str_memory = string.replace(str_memory,memtype+':','') 00340 str_memory = string.replace(str_memory,"kB","") 00341 str_memory = string.replace(str_memory," ","") 00342 00343 try: 00344 memory += float(str_memory)/1024. 
00345 except: 00346 casalog.post("Problem converting memory into numerical format at node %s: %s" % (hostname,str_memory),"WARN","check_host_resources") 00347 break 00348 00349 cmd_cpu = self.shell(hostname) + " 'top -b -n 1 | grep Cpu' " 00350 str_cpu = commands.getoutput(cmd_cpu) 00351 list_cpu = string.split(str_cpu,',') 00352 str_cpu = "100" 00353 for item in list_cpu: 00354 if item.count("%id")>0: 00355 str_cpu = string.replace(item,"%id","") 00356 str_cpu = string.replace(str_cpu," ","") 00357 00358 try: 00359 cpu = float(str_cpu) 00360 except: 00361 casalog.post("Problem converting available cpu into numerical format at node %s: %s" % (hostname,str_cpu),"WARN","check_host_resources") 00362 00363 else: # Mac OSX 00364 cmd_ncores = self.shell(hostname) + " '/usr/sbin/sysctl -n hw.ncpu'" 00365 res_ncores = commands.getoutput(cmd_ncores) 00366 str_ncores = res_ncores 00367 00368 try: 00369 ncores = int(str_ncores) 00370 except: 00371 casalog.post("Problem converting number of cores into numerical format at node %s: %s" % (hostname,str_ncores),"WARN","check_host_resources") 00372 pass 00373 00374 cmd_memory = self.shell(hostname) + " 'top -l 1 | grep PhysMem: | cut -d , -f5 ' " 00375 str_memory = commands.getoutput(cmd_memory) 00376 str_memory = string.replace(str_memory,"M free.","") 00377 str_memory = string.replace(str_memory," ","") 00378 00379 try: 00380 memory = float(str_memory) 00381 except: 00382 casalog.post("Problem converting memory into numerical format at node %s: %s" % (hostname,str_memory),"WARN","check_host_resources") 00383 pass 00384 00385 cmd_cpu = self.shell(hostname) + " 'top -l1 | grep usage' " 00386 str_cpu = commands.getoutput(cmd_cpu) 00387 list_cpu = string.split(str_cpu,',') 00388 str_cpu = "100" 00389 for item in list_cpu: 00390 if item.count("% idle")>0: 00391 str_cpu = string.replace(item,"% idle","") 00392 str_cpu = string.replace(str_cpu," ","") 00393 00394 try: 00395 cpu = float(str_cpu) 00396 except: 00397 casalog.post("Problem 
converting available cpu into numerical format at node %s: %s" % (hostname,str_cpu),"WARN","check_host_resources") 00398 00399 ncores = int(ncores) 00400 memory = int(round(memory)) 00401 casalog.post("Host %s, Number of cores %s, Memory available %sMB, CPU capacity available %s%%" % (hostname,str(ncores),str(memory),str(cpu)), "INFO","check_host_resources") 00402 00403 return (ncores,memory,cpu) 00404 00405 00406 def validate_hosts(self): 00407 """Validate the cluster specification. 00408 00409 This function is normally called internally by configure_cluster 00410 function. 00411 00412 """ 00413 00414 casalog.origin("simple_cluster") 00415 00416 # jagonzal (CAS-4287): Add a cluster-less mode to by-pass parallel processing for MMSs as requested 00417 if (len(self._hosts)==1): 00418 if (self._hosts[0][1] == 1): 00419 if (self.uniqueIP(self._hosts[0][0]) == self.uniqueIP("localhost")): 00420 casalog.post("Only one engine can be deployed at localhost, disabling cluster mode","WARN","validate_hosts") 00421 from parallel.parallel_task_helper import ParallelTaskHelper 00422 ParallelTaskHelper.bypassParallelProcessing(1) 00423 return False 00424 00425 uhost=set() 00426 for i in range(len(self._hosts)): 00427 uhost.add(self._hosts[i][0]) 00428 if len(uhost)==0: 00429 casalog.post("Configuration table is empty","WARN","validate_hosts") 00430 return False 00431 if len(uhost)<len(self._hosts): 00432 casalog.post("Configuration table contains repeated node name","WARN","validate_hosts") 00433 return False 00434 for i in range(len(self._hosts)): 00435 if type(self._hosts[i][1])!=int: 00436 casalog.post("Number of engines must be an integer","WARN","validate_hosts") 00437 return False 00438 for i in range(len(self._hosts)): 00439 if not os.path.exists(self._hosts[i][2]): 00440 casalog.post("Directory %s does not exist" % self._hosts[i][2],"WARN","validate_hosts") 00441 return False 00442 for i in range(len(self._hosts)): 00443 try: 00444 tfile=self._hosts[i][2]+'/nosuchfail' 
00445 fid = open(tfile, 'w') 00446 fid.close() 00447 os.remove(tfile) 00448 except IOError: 00449 casalog.post("Failed write permission to directory %s" % self._hosts[i][2],"WARN","validate_hosts") 00450 return False 00451 return True 00452 00453 00454 ########################################################################### 00455 ### project management 00456 ########################################################################### 00457 def create_project(self, proj=""): 00458 """Create a project. 00459 00460 Keyword arguments: 00461 proj -- the name of project (default: 'proj'+timestamp). 00462 00463 A project maintains a subdirectory under each node's work_dir. All 00464 output files of an engine hosted on that node will by default store 00465 under the subdirectory. 00466 00467 This function is normally called internally by init_cluster function. 00468 00469 Example: 00470 CASA <33>: sl.list_projects 00471 host: casa-dev-07 ------------------------>>>> 00472 bProj bsplit csplit my_project 00473 host: casa-dev-08 ------------------------>>>> 00474 bProj bsplit csplit my_project 00475 host: casa-dev-10 ------------------------>>>> 00476 bProj bsplit csplit 00477 00478 CASA <34>: sl.create_project('dflag') 00479 output directory: 00480 /home/casa-dev-07/hye/ptest/dflag 00481 /home/casa-dev-08/hye/ptest/dflag 00482 /home/casa-dev-10/hye/ptest/dflag 00483 00484 CASA <36>: sl.list_projects 00485 host: casa-dev-07 ------------------------>>>> 00486 bProj bsplit csplit dflag my_project 00487 host: casa-dev-08 ------------------------>>>> 00488 bProj bsplit csplit dflag my_project 00489 host: casa-dev-10 ------------------------>>>> 00490 bProj bsplit csplit dflag 00491 00492 """ 00493 00494 casalog.origin("simple_cluster") 00495 00496 if not self._configdone: 00497 return 00498 if type(proj)!=str: 00499 casalog.post("Project name must be string","WARN","create_project") 00500 return 00501 00502 self._project=proj.strip() 00503 if self._project=="": 00504 
tm=time.strftime("%Y%b%d-%Hh%Mm%Ss", time.localtime()) 00505 self._project='proj'+tm 00506 00507 for i in range(len(self._hosts)): 00508 if not os.path.exists(self._hosts[i][2]+'/'+proj.strip()): 00509 cmd='mkdir '+self._hosts[i][2]+'/'+self._project 00510 os.system(cmd) 00511 00512 def do_project(self, proj): 00513 """Use a project previously created. 00514 00515 Keyword arguments: 00516 proj -- the name of project. 00517 00518 A project maintains a subdirectory under each node's work_dir. All 00519 output files of an engine hosted on that node will by default store 00520 under the subdirectory. 00521 00522 Example: 00523 CASA <38>: sl._project 00524 Out[38]: 'dflag' 00525 00526 CASA <39>: sl.do_project('csplit') 00527 output directory: 00528 /home/casa-dev-07/hye/ptest/csplit 00529 /home/casa-dev-08/hye/ptest/csplit 00530 /home/casa-dev-10/hye/ptest/csplit 00531 00532 CASA <40>: sl._project 00533 Out[40]: 'csplit' 00534 00535 """ 00536 00537 casalog.origin("simple_cluster") 00538 00539 if not self._configdone: 00540 return 00541 if type(proj)!=str or proj.strip()=="": 00542 casalog.post("Project name must be a nonempty string","WARN","do_project") 00543 return 00544 00545 projexist=True 00546 for i in range(len(self._hosts)): 00547 if not os.path.exists(self._hosts[i][2]+'/'+proj.strip()): 00548 casalog.post("No project directory found on %s" % self._hosts[i][0],"WARN","do_project") 00549 projexist=False 00550 if projexist: 00551 self._project=proj.strip() 00552 casalog.post("Output directory:","INFO","do_project") 00553 for i in range(len(self._hosts)): 00554 casalog.post("Output directory: %s/%s" % (self._hosts[i][2],self._project),"INFO","do_project") 00555 00556 def erase_project(self, proj): 00557 """Erase files and dirs of a project. 00558 00559 Keyword arguments: 00560 proj -- the name of project. 00561 00562 A project maintains a subdirectory under each node's work_dir. 
All 00563 output files of an engine hosted on that node will by default store 00564 under the subdirectory. 00565 00566 Example: 00567 CASA <30>: sl.list_projects 00568 host: casa-dev-07 ------------------------>>>> 00569 aNew bProj bsplit csplit my_project 00570 host: casa-dev-08 ------------------------>>>> 00571 aNew bProj bsplit csplit my_project 00572 host: casa-dev-10 ------------------------>>>> 00573 bProj bsplit csplit 00574 00575 CASA <31>: sl.erase_project('aNew') 00576 00577 CASA <32>: sl.list_projects 00578 host: casa-dev-07 ------------------------>>>> 00579 bProj bsplit csplit my_project 00580 host: casa-dev-08 ------------------------>>>> 00581 bProj bsplit csplit my_project 00582 host: casa-dev-10 ------------------------>>>> 00583 bProj bsplit csplit 00584 00585 """ 00586 00587 casalog.origin("simple_cluster") 00588 00589 if not self._configdone: 00590 return 00591 if type(proj)!=str or proj.strip()=="": 00592 casalog.post("Project name must be a nonempty string","WARN","erase_project") 00593 return 00594 for i in range(len(self._hosts)): 00595 cmd='rm -rf '+self._hosts[i][2]+'/'+proj 00596 os.system(cmd) 00597 if proj==self._project: 00598 self._project="" 00599 if self._project=="": 00600 pass 00601 00602 def clear_project(self, proj): 00603 """Remove all previous results of the proj 00604 00605 Keyword arguments: 00606 proj -- the name of project. 00607 00608 A project maintains a subdirectory under each node's work_dir. All 00609 output files of an engine hosted on that node will by default store 00610 under the subdirectory. 
00611 00612 Example: 00613 CASA <27>: sl.list_project('my_project') 00614 host: casa-dev-07 ------------------------>>>> 00615 casapy-20101122165601-5.log sim.alma.csv.mid-f0-s0-b640-e768.flux 00616 host: casa-dev-08 ------------------------>>>> 00617 casapy-20101122165601-6.log sim.alma.csv.mid-f0-s0-b768-e896.flux 00618 host: casa-dev-10 ------------------------>>>> 00619 casapy-20101122165601-7.log sim.alma.csv.mid-f0-s0-b320-e640.flux 00620 00621 CASA <28>: sl.clear_project('my_project') 00622 00623 CASA <29>: sl.list_project('my_project') 00624 host: casa-dev-07 ------------------------>>>> 00625 host: casa-dev-08 ------------------------>>>> 00626 host: casa-dev-10 ------------------------>>>> 00627 00628 """ 00629 00630 casalog.origin("simple_cluster") 00631 00632 # This can be a slow operation, it is better to do it parallel 00633 if not self._configdone: 00634 return 00635 if type(proj)!=str or proj.strip()=="": 00636 casalog.post("Project name must be a nonempty string","WARN","clear_project") 00637 return 00638 for i in range(len(self._hosts)): 00639 cmd='rm -rf '+self._hosts[i][2]+'/'+proj.strip()+'/*' 00640 os.system(cmd) 00641 00642 def list_project(self, proj): 00643 """List previous results of the proj 00644 00645 Keyword arguments: 00646 proj -- the name of project. 00647 00648 A project maintains a subdirectory under each node's work_dir. All 00649 output files of an engine hosted on that node will by default store 00650 under the subdirectory. 
00651 00652 Example: 00653 CASA <19>: sl.list_project('bsplit') 00654 host: casa-dev-07 ------------------------>>>> 00655 test_regression_TDEM0003-f0-s11.ms test_regression_TDEM0003-f0-s13.ms 00656 host: casa-dev-08 ------------------------>>>> 00657 test_regression_TDEM0003-f0-s10.ms test_regression_TDEM0003-f3-s9.ms 00658 host: casa-dev-10 ------------------------>>>> 00659 test_regression_TDEM0003-f0-s0.ms test_regression_TDEM0003-f4-s11.ms 00660 00661 """ 00662 00663 casalog.origin("simple_cluster") 00664 00665 if not self._configdone: 00666 return 00667 if type(proj)!=str or proj.strip()=="": 00668 casalog.post("Project name must be a nonempty string","WARN","list_project") 00669 return 00670 for i in range(len(self._hosts)): 00671 casalog.post("Host: %s ------------------------>>>>" % self._hosts[i][0] ,"INFO","list_project") 00672 cmd='ls '+self._hosts[i][2]+'/'+proj 00673 os.system(cmd) 00674 00675 def erase_projects(self): 00676 """Erase all previous results of all projects 00677 00678 A project maintains a subdirectory under each node's work_dir. All 00679 output files of an engine hosted on that node will by default store 00680 under the subdirectory. 00681 00682 """ 00683 if not self._configdone: 00684 return 00685 for i in range(len(self._hosts)): 00686 cmd='rm -rf '+self._hosts[i][2]+'/*' 00687 os.system(cmd) 00688 self._project="" 00689 00690 def list_projects(self): 00691 """List all previous projects 00692 00693 A project maintains a subdirectory under each node's work_dir. All 00694 output files of an engine hosted on that node will by default store 00695 under the subdirectory. 
00696 00697 Example: 00698 <CASA 16:>sl.list_projects 00699 host: casa-dev-07 ------------------------>>>> 00700 aNew bProj bsplit csplit my_project 00701 host: casa-dev-08 ------------------------>>>> 00702 aNew bProj bsplit csplit my_project 00703 host: casa-dev-10 ------------------------>>>> 00704 bProj bsplit csplit 00705 00706 """ 00707 00708 casalog.origin("simple_cluster") 00709 00710 if not self._configdone: 00711 return 00712 for i in range(len(self._hosts)): 00713 casalog.post("Host: %s ------------------------>>>>" % self._hosts[i][0] ,"INFO","list_projects") 00714 cmd='ls '+self._hosts[i][2]+'/' 00715 os.system(cmd) 00716 00717 def reset_project(self): 00718 """Erase previous result and reset the status current project. 00719 00720 A project maintains a subdirectory under each node's work_dir. All 00721 output files of an engine hosted on that node will by default store 00722 under the subdirectory. 00723 00724 Example: 00725 CASA <43>: sl.list_project('bProj') 00726 ....too many here... 00727 00728 CASA <44>: sl.do_project('bProj') 00729 output directory: 00730 /home/casa-dev-07/hye/ptest/bProj 00731 /home/casa-dev-08/hye/ptest/bProj 00732 /home/casa-dev-10/hye/ptest/bProj 00733 00734 CASA <45>: sl.list_project('bProj') 00735 host: casa-dev-07 ------------------------>>>> 00736 host: casa-dev-08 ------------------------>>>> 00737 host: casa-dev-10 ------------------------>>>> 00738 00739 """ 00740 if not self._configdone: 00741 return 00742 self.stop_monitor() 00743 self.clear_project(self._project) 00744 self._jobs={} 00745 self._job_title=1 00746 self._monitor_on=True 00747 00748 def get_hosts(self): 00749 """List current cluster. 
00750 00751 CASA <48>: sl.get_hosts 00752 Out[48]: 00753 [['casa-dev-07', 4, '/home/casa-dev-07/hye/ptest'], 00754 ['casa-dev-08', 4, '/home/casa-dev-08/hye/ptest'], 00755 ['casa-dev-10', 4, '/home/casa-dev-10/hye/ptest']] 00756 00757 """ 00758 if not self._configdone: 00759 return 00760 return self._hosts 00761 00762 ########################################################################### 00763 ### cluster management 00764 ########################################################################### 00765 00766 # jagonzal (CAS-4292): This method is deprecated, because I'd like to 00767 # avoid brute-force methods like killall which just hide state errors 00768 def cold_start(self): 00769 """kill all engines on all hosts. Shutdown current cluster. 00770 00771 This is used if a complete restart of the cluster is needed. One can 00772 rerun init_cluster after this. This also kills possible leftover 00773 engines from previous sessions. 00774 00775 """ 00776 if not self._configdone: 00777 return 00778 # jagonzal (CAS-4292): Stop the cluster via parallel_go before using brute-force killall 00779 self.stop_cluster() 00780 for i in range(len(self._hosts)): 00781 hostname = self._hosts[i][0] 00782 cmd=self.shell(hostname) + ' "killall -9 ipengine"' 00783 os.system(cmd) 00784 00785 # jagonzal (CAS-4292): Method for gracefull finalization (e.g.: when closing casa) 00786 def stop_cluster(self): 00787 """ Destructor method to shut down the cluster gracefully """ 00788 # Stop cluster and thread services 00789 self.stop_monitor() 00790 # Now we can stop the cluster w/o problems 00791 # jagonzal (CAS-4292): Stop the cluster w/o using brute-force killall as in cold_start 00792 self._cluster.stop_cluster() 00793 self.__running = False 00794 00795 def stop_nodes(self): 00796 """Stop all engines on all hosts of current cluster. 00797 00798 After running this, the cluster contains no engines. 
00799 00800 """ 00801 if not self._configdone: 00802 return 00803 for i in self._cluster.get_nodes(): 00804 self._cluster.stop_node(i) 00805 00806 def start_cluster(self): 00807 """Start a cluster with current configuration. 00808 00809 Normally, one does not need to run this function directly. The 00810 init_cluster will call this internally. 00811 00812 """ 00813 00814 casalog.origin("simple_cluster") 00815 00816 if not self._configdone: 00817 return 00818 for i in range(len(self._hosts)): 00819 self._cluster.start_engine(self._hosts[i][0], self._hosts[i][1], 00820 self._hosts[i][2]+'/'+self._project,self._hosts[i][3]) 00821 if (self._hosts[i][3]>1): 00822 omp_num_threads = self._cluster.execute("print os.environ['OMP_NUM_THREADS']",i)[0]['stdout'] 00823 if (omp_num_threads.count(str(self._hosts[i][3]))>0): 00824 casalog.post("Open MP enabled at host %s, OMP_NUM_THREADS=%s" % (self._hosts[i][0],str(self._hosts[i][3])), "INFO","start_cluster") 00825 else: 00826 casalog.post("Problem enabling Open MP at host %s: %s" % (self._hosts[i][0],omp_num_threads),"WARN","start_cluster") 00827 00828 self.start_logger() 00829 self.start_monitor() 00830 # jagonzal (CAS-4324): Is better to update the resources in the 00831 # check_status method, after updating the status of the jobs 00832 # self.start_resource() 00833 self._rsrc = self.show_resource(True) 00834 00835 def get_host(self, id): 00836 """Find out the name of the node that hosts this engine. 
00837 00838 Keyword arguments: 00839 id -- the engine id 00840 00841 Example: 00842 CASA <50>: sl.get_host(8) 00843 Out[50]: 'casa-dev-10' 00844 00845 """ 00846 00847 casalog.origin("simple_cluster") 00848 00849 if not self._configdone: 00850 return 00851 ids=self._cluster.get_ids() 00852 if type(id)!=int: 00853 casalog.post("The argument must be an engine id (int)","WARN","get_host") 00854 return '' 00855 if ids.count(id)!=1: 00856 casalog.post("Engine %d does not exist" % id,"WARN","get_host") 00857 return '' 00858 e=self._cluster.get_engines() 00859 for i in range(len(e)): 00860 if e[i][0]==id: 00861 return e[i][1] 00862 00863 def get_engine_store(self, id): 00864 """Get the root path where an engine writes out result 00865 00866 Keyword arguments: 00867 id -- the engine id 00868 00869 Example: 00870 CASA <52>: sl.get_engine_store(8) 00871 Out[52]: '/home/casa-dev-10/hye/ptest/bProj/' 00872 00873 """ 00874 if not self._configdone: 00875 return 00876 hst=self.get_host(id) 00877 for i in range(len(self._hosts)): 00878 if self._hosts[i][0]==hst: 00879 pth=self._hosts[i][2] 00880 sl='' 00881 if not pth.endswith('/'): 00882 sl='/' 00883 return pth+sl+self._project+'/' 00884 00885 ########################################################################### 00886 ### log management 00887 ########################################################################### 00888 def start_logger(self): 00889 """Link all engine logs to the current directory. 00890 00891 After running this, the current directory contains links to each of 00892 the engine logs with file name 'engine-[id].log such that one can 00893 conveniently browse engine logs with casa logviewer. 
        """
        if not self._configdone:
            return
        lg=self._cluster.get_casalogs()
        os.system('rm -f engine-*.log')
        for i in lg:
            # Link name is 'engine' + everything from the last '-' of the
            # remote log path, i.e. 'engine-<id>.log'.
            eng='engine'+i[str.rfind(i,'-'):]
            os.symlink(i, eng)

    ###########################################################################
    ### resource management
    ###########################################################################
    def start_resource(self):
        """Start monitoring resource usage.

        Four critical resource usage indicators (for parallel execution),
        namely, %cpu, %iowait, %mem and %memswap on all hosts are continuously
        checked. This infomation can be used to tune the parallel performance.

        Normally, one does not call this function directly. The init_cluster
        will call this function.

        """

        if not self._configdone:
            return
        self._resource_on=True
        self._rsrc={}
        # Run update_resource in a background thread (Python 2 'thread' module).
        return thread.start_new_thread(self.update_resource, ())

    def update_resource(self):
        """Set up repeated resource checking.

        Four critical resource usage indicators (for parallel execution),
        namely, %cpu, %iowait, %mem and %memswap on all hosts are continuously
        checked. This infomation can be used to tune the parallel performance.

        Normally, one does not call this function directly. The init_cluster
        will call this function.

        """

        self._resource_running=True
        if not self._configdone:
            return
        while self._resource_on:
            # Poll only while jobs are queued, or once when no stats have
            # been collected yet.
            if ((len(self._jobs.keys())>0) or (len(self._rsrc.keys())==0)):
                self.check_resource()
            time.sleep(5)
        self._resource_running=False

    def stop_resource(self):
        """Stop monitoring resource usage.

        Four critical resource usage indicators (for parallel execution),
        namely, %cpu, %iowait, %mem and %memswap on all hosts are continuously
        checked. This infomation can be used to tune the parallel performance.

        Normally, one does not call this function directly. The init_cluster
        will call this function.

        """
        if not self._configdone:
            return
        self._resource_on=False
        # Block until the update_resource thread notices the flag and exits.
        while (self._resource_running):
            time.sleep(1)
        self._rsrc={}

    def show_resource(self, long=False):
        """
        jagonzal (CAS-4372): Old resource monitoring functions were causing crashes in NRAO cluster
        """
        # NOTE: parameter 'long' shadows the Python 2 builtin of that name.
        # long=True  -> return the stats dictionary (no terminal output).
        # long=False -> dump the stats to the terminal, return None.
        if not self._configdone:
            return
        if long:
            return self.check_resource(False)
        else:
            self.check_resource(True)

    def check_resource(self, verbose_local=False):
        """
        jagonzal 29/05/12 (CAS-4137) - HPC project (CAS-4106)
        ========================================================================
        Advanced monitoring function that provides CPU,Memory,I/O and job queue
        stats per node and total per host.

        There are 2 usage modes:
        - Called from the monitoring thread in regular time intervals
          (i.e. within the check_status method), to only dump the stats
          into a file or the terminal.
        - Called from the command line, to return a dictionary in addition
          of dumping the stats into a file or into the terminal.

        In both cases logging can be controlled in the following way:
        - User can provide a file-name when creating the simple_cluster
          object, trough the string parameter "monitoringFile", to specify
          the location of the monitoring file, otherwise it defaults to
          'monitoring.log' in the working directory.
        - User can also switch verbose mode when creating the simple_cluster
          object, trough the boolean parameter "verbose", to have the monitoring
          info dumped into the terminal, otherwise it defaults to False and it
          only dumps into the monitoring file.
        - User can even use the stand alone method show_state specifying
          verbosity only for that particular call via the "verbose_local"
          parameter, and the method returns a dictionary with all the stats
          per node and total per host.

        Examples:
        - Stand-Alone usage from terminal to return a dictionary
          from simple_cluster import *
          sc = simple_cluster.getCluster()
          stats = sc.show_state(False)
        - Stand-Alone usage from terminal to print the stats into the terminal
          from simple_cluster import *
          sc = simple_cluster.getCluster()
          sc.show_state(True)
        - Service-Mode usage to specify a custom file for dumping the stats
          from simple_cluster import *
          sc = simple_cluster('mycustomfile.log')
          sc.init_cluster('cluster-config.txt','test-rhel')
        - Service-Mode usage to specify a custom file for dumping the stats
          and verbose mode to additionally dump the stats into the terminal
          from simple_cluster import *
          sc = simple_cluster('mycustomfile.log',True)
          sc.init_cluster('cluster-config.txt','test-rhel')
        """

        # Determine verbose mode
        verbose = self._verbose or verbose_local

        # Open monitoring file (written to a .tmp and renamed at the end so
        # readers never see a half-written file)
        fid = open(self._monitoringFile + ".tmp", 'w')

        # Print header
        print >> fid, "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ( "Host","Engine","Status","CPU[%]","Memory[%]","Time[s]",
                      "Read[MB]","Write[MB]","Read[MB/s]","Write[MB/s]","Job","Sub-MS")
        if (verbose):
            print "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ("Host","Engine","Status","CPU[%]","Memory[%]","Time[s]",
                  "Read[MB]","Write[MB]","Read[MB/s]","Write[MB/s]","Job","Sub-MS")

        result = {}
        engines_list = self._cluster.get_engines()
        # Engine tuples: [0]=engine id, [1]=hostname, [2]=remote pid
        # (inferred from the /proc and ps usage below -- confirm upstream).
        for engine in engines_list:
            hostname = str(engine[1])
            # First of all get operating system
            cms_os = self.shell(hostname) + " 'uname -s'"
            # NOTE(review): this local 'os' shadows the imported os module for
            # the rest of this method -- works here, but fragile.
            os = commands.getoutput(cms_os)
            # Get read/write activity
            read_bytes = 0.0
            write_bytes = 0.0
            if (os == "Linux"):
                # Get read activity (/proc/<pid>/io is Linux-only)
                cmd_read_bytes = self.shell(hostname) + " 'cat /proc/" + str(engine[2]) + "/io | grep read_bytes'"
                read_bytes=commands.getoutput(cmd_read_bytes)
                read_bytes = read_bytes.split(":")
                try:
                    read_bytes = float(read_bytes[1])
                except:
                    if (verbose):
                        print "Problem converting read_bytes into float for engine " + str(engine[0]) + " running in host " + str(engine[1])
                        print "read_bytes: [" + str(read_bytes) + "]"
                    read_bytes = 0
                # Get write activity
                cmd_write_bytes = self.shell(hostname) + " 'cat /proc/" + str(engine[2]) + "/io | grep write_bytes | head -1'"
                write_bytes=commands.getoutput(cmd_write_bytes)
                write_bytes = write_bytes.split(":")
                try:
                    write_bytes = float(write_bytes[1])
                except:
                    if (verbose):
                        print "Problem converting write_bytes into float for engine " + str(engine[0]) + " running in host " + str(engine[1])
                        print "write_bytes: [" + str(write_bytes) + "]"
                    write_bytes = 0.0
            # Get resources usage (cpu, mem, elapsed time since start)
            cmd_resources = self.shell(hostname) + " 'ps -p " + str(engine[2]) + " -o %cpu,%mem,etime' | tail -1"
            resources=commands.getoutput(cmd_resources)
            resources = resources.split(" ")
            # Drop the empty tokens produced by multiple spaces in ps output
            while resources.count('')>0:
                resources.remove('')
            # Convert CPU into number
            cpu = 0
            try:
                cpu = round(float(resources[0]))
            except:
                if (verbose):
                    print "Problem converting CPU into float for engine " + str(engine[0]) + " running in host " + str(engine[1])
                    print "CPU: [" + resources[0] + "]"
            # Convert Memory into number
            memory = 0
            try:
                memory = round(float(resources[1]))
            except:
                if (verbose):
                    print "Problem converting memory into float for engine " + str(engine[0]) + " running in host " + str(engine[1])
                    print "Memory: [" + resources[1] + "]"
            # Initialize engine RW offsets map
            if not self._enginesRWoffsets.has_key(engine[0]):
                self._enginesRWoffsets[engine[0]] = {}
            # Store engine RW offsets values (used as the I/O baseline when a
            # job is later submitted to this engine, see do_and_record)
            self._enginesRWoffsets[engine[0]]['read_offset'] = read_bytes
            self._enginesRWoffsets[engine[0]]['write_offset'] = write_bytes
            # Initialize host map (per-host accumulators, created once)
            if not result.has_key(engine[1]):
                result[engine[1]] = {}
                result[engine[1]]["CPU"] = 0.0
                result[engine[1]]["Memory"] = 0.0
                result[engine[1]]["Read"] = 0.0
                result[engine[1]]["Write"] = 0.0
                result[engine[1]]["ReadRate"] = 0.0
                result[engine[1]]["WriteRate"] = 0.0
            # Initialize engine map
            if not result[engine[1]].has_key(engine[0]):
                result[engine[1]][engine[0]] = {}
            # Store default engine values
            result[engine[1]][engine[0]]["CPU"] = cpu
            result[engine[1]][engine[0]]["Memory"] = memory
            result[engine[1]][engine[0]]["Read"] = 0
            result[engine[1]][engine[0]]["Write"] = 0
            result[engine[1]][engine[0]]["ReadRate"] = 0
            result[engine[1]][engine[0]]["WriteRate"] = 0
            result[engine[1]][engine[0]]["Sub-MS"] = ""
            result[engine[1]][engine[0]]["Status"] = "Idle"
            result[engine[1]][engine[0]]["Time"] = 0
            result[engine[1]][engine[0]]["Job"] = ""
            # Retrieve job status information from job structure
            for job in self._jobs.keys():
                jobEngine = self._jobs[job]['engine']
                if (jobEngine == engine[0]):
                    result[engine[1]][engine[0]]["Sub-MS"] = self._jobs[job]['subms']
                    result[engine[1]][engine[0]]["Status"] = self._jobs[job]['status']
                    result[engine[1]][engine[0]]["Time"] = round(self._jobs[job]['time'])
                    result[engine[1]][engine[0]]["Job"] = self._jobs[job]['short'].split('=').pop().replace(' ','')
                    # I/O attributed to the job = current counters minus the
                    # offsets captured at submission time, in MB
                    result[engine[1]][engine[0]]["Read"] = float(read_bytes - self._jobs[job]['read_offset'])/(1024*1024)
                    result[engine[1]][engine[0]]["Write"] = float(write_bytes - self._jobs[job]['write_offset'])/(1024*1024)
                    # Compute data rates (MB/s over the job's elapsed time)
                    if (result[engine[1]][engine[0]]["Time"] > 0):
                        result[engine[1]][engine[0]]["ReadRate"] = result[engine[1]][engine[0]]["Read"] / result[engine[1]][engine[0]]["Time"]
                        result[engine[1]][engine[0]]["WriteRate"] = result[engine[1]][engine[0]]["Write"] / result[engine[1]][engine[0]]["Time"]
            # Accumulate host values
            result[engine[1]]["CPU"] += result[engine[1]][engine[0]]["CPU"]
            result[engine[1]]["Memory"] += result[engine[1]][engine[0]]["Memory"]
            result[engine[1]]["Read"] += result[engine[1]][engine[0]]["Read"]
            result[engine[1]]["Write"] += result[engine[1]][engine[0]]["Write"]
            result[engine[1]]["ReadRate"] += result[engine[1]][engine[0]]["ReadRate"]
            result[engine[1]]["WriteRate"] += result[engine[1]][engine[0]]["WriteRate"]
            # Print nodes info
            print >> fid, "%20s%10d%10s%10d%10d%10d%10d%10d%15d%15d%20s%30s" % ( engine[1],engine[0],
                          result[engine[1]][engine[0]]["Status"],
                          result[engine[1]][engine[0]]["CPU"],
                          result[engine[1]][engine[0]]["Memory"],
                          result[engine[1]][engine[0]]["Time"],
                          result[engine[1]][engine[0]]["Read"],
                          result[engine[1]][engine[0]]["Write"],
                          result[engine[1]][engine[0]]["ReadRate"],
                          result[engine[1]][engine[0]]["WriteRate"],
                          result[engine[1]][engine[0]]["Job"],
                          result[engine[1]][engine[0]]["Sub-MS"])
            if (verbose):
                print "%20s%10d%10s%10d%10d%10d%10d%10d%15d%15d%20s%30s" % ( engine[1],engine[0],
                      result[engine[1]][engine[0]]["Status"],
                      result[engine[1]][engine[0]]["CPU"],
                      result[engine[1]][engine[0]]["Memory"],
                      result[engine[1]][engine[0]]["Time"],
                      result[engine[1]][engine[0]]["Read"],
                      result[engine[1]][engine[0]]["Write"],
                      result[engine[1]][engine[0]]["ReadRate"],
                      result[engine[1]][engine[0]]["WriteRate"],
                      result[engine[1]][engine[0]]["Job"],
                      result[engine[1]][engine[0]]["Sub-MS"])

        # Print separation between nodes and hosts info
        print >> fid, "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ( "====================",
                      "==========",
                      "==========",
                      "==========",
                      "==========",
                      "==========",
                      "==========",
                      "==========",
                      "===============",
                      "===============",
                      "====================",
                      "==============================")

        if (verbose):
            print "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ( "====================",
                  "==========",
                  "==========",
                  "==========",
                  "==========",
                  "==========",
                  "==========",
                  "==========",
                  "===============",
                  "===============",
                  "====================",
                  "==============================")

        # Print hosts info (the per-host totals accumulated above)
        for host in result:
            print >> fid, "%20s%10s%10s%10d%10d%10s%10d%10d%15d%15d%20s%30s" % ( host,"Total","",
                          result[host]["CPU"],
                          result[host]["Memory"],
                          "",
                          result[host]["Read"],
                          result[host]["Write"],
                          result[host]["ReadRate"],
                          result[host]["WriteRate"],
                          "","")
        if (verbose):
            for host in result:
                print "%20s%10s%10s%10d%10d%10s%10d%10d%15d%15d%20s%30s" % ( host,"Total","",
                      result[host]["CPU"],
                      result[host]["Memory"],
                      "",
                      result[host]["Read"],
                      result[host]["Write"],
                      result[host]["ReadRate"],
                      result[host]["WriteRate"],
                      "","")

        # Print final separator
        print >> fid, "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ( "====================",
                      "==========",
                      "==========",
                      "==========",
                      "==========",
                      "==========",
                      "==========",
                      "==========",
                      "===============",
                      "===============",
                      "====================",
                      "==============================")
        if (verbose):
            print "%20s%10s%10s%10s%10s%10s%10s%10s%15s%15s%20s%30s" % ( "====================",
                  "==========",
                  "==========",
                  "==========",
                  "==========",
                  "==========",
                  "==========",
                  "==========",
                  "===============",
                  "===============",
                  "====================",
                  "==============================")

        # Close monitoring file
        fid.close()

        # Rename monitoring file (atomic replace of the previous snapshot)
        commands.getstatusoutput("mv " + self._monitoringFile + ".tmp" + " " + self._monitoringFile)

        # Update resource member
        self._rsrc = result

        return result

    def get_return_list(self):
        """
        jagonzal (CAS-4376): Gather return variables from the different engines back to the main CASA controller instance
        """

        # Map each job's 'vis' argument to the values it returned.
        return_list = {}
        jobQueue = self._JobQueueManager.getOutputJobs()
        for job in jobQueue:
            return_list[job.getCommandArguments()['vis']]=job.getReturnValues()

        return return_list


    ###########################################################################
    ### execution status management
    ###########################################################################
    def check_job(self):
        """Check the execution status of current noblock jobs on all engines.

        This function can be used to block the terminal until all submitted
        jobs finish.
01276 01277 Example: 01278 CASA <2>: from simple_cluster import simple_cluster 01279 CASA <3>: sl=simple_cluster() 01280 CASA <4>: sl.init_cluster("my_cluster", "csplit") 01281 CASA <5>: sl.simple_split('/lustre/casa-store/hye/10B-209a_5s.ms/', '') 01282 CASA <6>: sl.check_job() 01283 01284 """ 01285 01286 if not self._configdone: 01287 return 01288 done=False 01289 while(not done): 01290 time.sleep(5) 01291 done=True 01292 for i in sl._jobs.keys(): 01293 if not self._cluster.check_job(i): 01294 done=False 01295 01296 def check_status(self, notify=False): 01297 """Check the execution status of submitted no-block jobs 01298 01299 Keyword arguments: 01300 notify -- whether or not to display detailed resource usage info 01301 01302 Normally, one does not call this function directly. The start_monitor 01303 will call this function internally. 01304 01305 """ 01306 01307 casalog.origin("simple_cluster") 01308 01309 self._monitor_running = True 01310 01311 if not self._configdone: 01312 return 01313 while self._monitor_on: 01314 time.sleep(5) 01315 curr={} 01316 try: 01317 pend=self._cluster.queue_status() 01318 for i in xrange(len(pend)): 01319 (a, b)=pend[i] 01320 curr[a]=b['pending'] 01321 except: 01322 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 01323 # traceback.print_tb(sys.exc_info()[2]) 01324 pass 01325 for job in self._jobs.keys(): 01326 if type(job)==type(None): 01327 self._jobs[job]['status']="unknown" 01328 else: 01329 eng=self._jobs[job]['engine'] 01330 sht=self._jobs[job]['short'] 01331 try: 01332 x=1 01333 try: 01334 x=job.get_result(block=False) 01335 except client.CompositeError, exception: 01336 if notify and self._jobs[job]['status']=="scheduled": 01337 casalog.post("Error retrieving result of job %s from engine %s: %s, backtrace:" % (sht,str(eng),str(exception)),"SEVERE","check_status") 01338 exception.print_tracebacks() 01339 self._jobs[job]['status']="broken" 01340 except: 01341 casalog.post("Error 
retrieving result of job %s from engine %s, backtrace:" % (sht,str(eng)),"SEVERE","check_status") 01342 traceback.print_tb(sys.exc_info()[2]) 01343 01344 if x==None: 01345 cmd=self._jobs[job]['command'] 01346 if curr.has_key(eng): 01347 wk=eval(curr[eng].lstrip('execute(').rstrip(')')) 01348 if wk==cmd: 01349 self._jobs[job]['status']="running" 01350 if self._jobs[job]['start']=='': 01351 if notify: 01352 casalog.post("Engine %d job %s started" %(eng,sht),"INFO","check_status") 01353 self._jobs[job]['start']=time.time() 01354 self._jobs[job]['time']=time.time()-self._jobs[job]['start'] 01355 else: 01356 pass 01357 else: 01358 if self._jobs[job]['status']=="running": 01359 if notify: 01360 casalog.post("Engine %d job %s finished" %(eng,sht),"INFO","check_status") 01361 self._jobs[job]['status']="done" 01362 if self._jobs[job]['status']=="scheduled": 01363 if isinstance(x, int): 01364 if notify: 01365 casalog.post("Engine %d job %s broken" %(eng,sht),"SEVERE","check_status") 01366 self._jobs[job]['status']="broken" 01367 else: 01368 if notify: 01369 casalog.post("Engine %d job %s finished" %(eng,sht),"INFO","check_status") 01370 self._jobs[job]['status']="done" 01371 except: 01372 if notify and self._jobs[job]['status']=="running": 01373 casalog.post("Engine %d job %s broken" %(eng,sht),"SEVERE","check_status") 01374 self._jobs[job]['status']="broken" 01375 01376 # jagonzal (CAS-4324): This method consumes lots of resources, and the user terminal 01377 # is not very responsive while it's running, so we execute it only when there are jobs 01378 # beign processed. 
01379 if (len(self._jobs.keys())>0): 01380 try: 01381 self.check_resource() 01382 except: 01383 pass 01384 01385 gr=set() 01386 for val in self._jobs.values(): 01387 gr.add(val['jobgroup']) 01388 for i in list(gr): 01389 eml='' 01390 jobname='None' 01391 if i.count(':')>0: 01392 pos=i.find(':') 01393 jobname=i[pos+1:].strip() 01394 eml=i[:pos].strip() 01395 elif i.count('@')>0: 01396 eml=i.strip() 01397 else: 01398 jobname=i.strip() 01399 01400 finish=True 01401 for val in self._jobs.values(): 01402 if val['jobgroup']==i and not (val['status']=='done' or 01403 val['status']=='broken'): 01404 finish=False 01405 if finish: 01406 tm=time.ctime() 01407 jobname=jobname[0:30] 01408 msg='\n#### '+jobname+' '+ \ 01409 '#'*(68-len(jobname)-len(tm))+' '+tm+' ####' 01410 msg+='\nengine status time(s) command\n' 01411 rmv=[] 01412 for job in self._jobs.keys(): 01413 if self._jobs[job]['jobgroup']==i: 01414 msg+="%6d%10s%9d%2s%s\n" % (self._jobs[job]['engine'], 01415 self._jobs[job]['status'], 01416 int(self._jobs[job]['time']), 01417 ' ', 01418 self._jobs[job]['command']) 01419 rmv.append(self._jobs[job]['jobname']) 01420 if i.strip()!='': 01421 01422 # Append to project result file 01423 f=open(self._project+'.result', 'a') 01424 f.write(msg) 01425 f.write('\n') 01426 f.close() 01427 01428 if eml!='' and eml.count(' ')==0: 01429 #send email 01430 #import smtplib 01431 #from email.mime.text import MIMEText 01432 #mal=MIMEText(msg) 01433 #mal['Subject']='your parallel job finished' 01434 #me=os.environ['USER'] 01435 #mal['From']=me 01436 #mal['To']=i 01437 #s=smtplib.SMTP() 01438 #s.connect() 01439 #s.sendmail(me, [i], mal.as_string()) 01440 #s.close() 01441 01442 msgf='/tmp/emailmsg.txt' 01443 f=open(msgf, 'w') 01444 f.write(msg) 01445 f.write('\n') 01446 f.close() 01447 cmd='/bin/mail -s "parallel job \''+jobname+\ 01448 '\' finished" '+eml+' < '+msgf 01449 os.system(cmd) 01450 01451 for j in rmv: 01452 self.remove_record(j) 01453 01454 self._monitor_running = False 01455 

    def start_monitor(self):
        """Start monitoring execution status of submitted no-block jobs

        Normally, one does not call this function directly. The init_cluster
        will call this function.

        """
        if not self._configdone:
            return
        self._monitor_on=True
        # Run check_status(notify=True) in a background thread.
        return thread.start_new_thread(self.check_status, (True,))

    def stop_monitor(self):
        """Stop monitoring execution status of submitted no-block jobs

        Normally, one does not call this function directly.

        """
        if not self._configdone:
            return
        self._monitor_on=False
        # Block until the check_status thread notices the flag and exits.
        while (self._monitor_running):
            time.sleep(1)

    def show_queue(self):
        """Display job queue.

        Example:
        CASA <2>: from simple_cluster import simple_cluster
        CASA <3>: sl=simple_cluster()
        CASA <4>: sl.init_cluster("my_cluster", "csplit")
        CASA <5>: sl.simple_split('/lustre/casa-store/hye/10B-209a_5s.ms/',
                                  'you@nrao.edu:3rd split')
        CASA <6>: sl.show_queue()

        """
        if not self._configdone:
            return
        return self._cluster.queue_status()

    def get_status(self, long=False):
        """Display job execution status.

        Keyword arguments:
        long -- whether or not to display detailed execution status info

        Example:
        CASA <2>: from simple_cluster import simple_cluster
        CASA <3>: sl=simple_cluster()
        CASA <4>: sl.init_cluster("my_cluster", "csplit")
        CASA <5>: sl.simple_split('/lustre/casa-store/hye/10B-209a_5s.ms/',
                                  'you@nrao.edu:3rd split')
        CASA <6>: sl.get_status()
        engine    status time(s)     start  command   title
             0      done      31  16:41:56    split      15
             2 scheduled       0               split      78
             7      done      41  16:42:38    split      16
             9   running      51  16:42:59    split      17
             1      done      36  16:41:56    split      18

        """

        # NOTE: parameter 'long' shadows the Python 2 builtin of that name.
        if not self._configdone:
            return
        if long:
            # Detailed mode: hand back the raw job-status dictionary.
            return self._jobs
        else:
            if len(self._jobs.keys())==0:
                return self._jobs
            else:
                print 'engine status time(s) start command title'
                for job in self._jobs.keys():
                    print "%6d%10s%9d%10s%9s%8d" % (self._jobs[job]['engine'],
                          self._jobs[job]['status'],
                          int(self._jobs[job]['time']),
                          '' if type(self._jobs[job]['start'])==str
                             else
                          time.strftime("%H:%M:%S",
                                        time.localtime(self._jobs[job]['start'])),
                          self._jobs[job]['short'].strip()[:9],
                          self._jobs[job]['jobname'])

    def get_jobId(self, status):
        """Get a list of jobs of the given status

        Keyword arguments:
        status -- the job status or the job title

        Example:
        CASA <2>: from simple_cluster import simple_cluster
        CASA <3>: sl=simple_cluster()
        CASA <4>: sl.init_cluster("my_cluster", "csplit")
        CASA <5>: sl.simple_split('/lustre/casa-store/hye/10B-209a_5s.ms/',
                                  'you@nrao.edu:3rd split')
        CASA <6>: sl.get_jobId('done')

        """

        if not self._configdone:
            return []
        if len(self._jobs.keys())==0:
            return []
        else:
            jobId=[]
            for job in self._jobs.keys():
                if self._jobs[job]['status']==status:
                    jobId.append(self._jobs[job]['jobname'])
            return jobId

    def remove_record(self, jobname=None):
        """Remove job execution status of a job.

        Keyword arguments:
        jobname -- the jobname or status of job(s) to be removed from display

        if jobName is not specified or is None all jobs are removed.
        """

        if not self._configdone:
            return

        # NOTE(review): relies on Python 2 dict.keys() returning a list copy,
        # which makes deleting entries during iteration safe here.
        for job in self._jobs.keys():
            if jobname is None:
                del self._jobs[job]
            elif type(jobname)==int and self._jobs[job]['jobname']==jobname:
                del self._jobs[job]
            elif type(jobname)==str and self._jobs[job]['status']==jobname:
                del self._jobs[job]

    ###########################################################################
    ### job distribution functions
    ###########################################################################
    def make_call(self, func, param):
        """Make a function call string with function name and parameters.

        Keyword arguments:
        func -- the name of the function
        param -- the dictionary of parameters and values

        Example:
        CASA <12>: param=dict()
        CASA <13>: param['vis']='NGC5921.ms'
        CASA <14>: param['spw']='4'
        CASA <15>: sl.make_call('flagdata', param)
        Out[15]: 'flagdata(vis="NGC5921.ms", spw="4")'

        """

        casalog.origin("simple_cluster")

        if type(func)!=str or type(param)!=dict:
            casalog.post("Func must be a str and param must be a dictionary","WARN","make_call")
            return None
        cmd=func+'('
        for (k, v) in param.iteritems():
            cmd+=k+'='
            # Quote strings, repr numpy arrays, str() everything else.
            if type(v)==str:
                cmd+='"'+v+'"'
            elif type(v)==np.ndarray:
                cmd+=repr(v)
            else:
                cmd+=str(v)
            cmd+=', '
        # Drop the trailing ', ' before closing the call.
        cmd=cmd[0:-2]
        cmd+=')'
        return cmd

    def do_and_record(self, cmd, id, group='', subMS=''):
        """Submit a function call to an engine and record its execution status.

        Keyword arguments:
        cmd -- the function call string
        id -- the id of the engine to be assigned
        group -- the group this cmd belongs to and the receipt of notification
                 group can be an email address or a label of the job or both
                 separated by a ':'. Once all the jobs that has the same label
                 finish, an email notification will be sent.

        Example:
        CASA <12>: param=dict()
        CASA <13>: param['vis']='NGC5921.ms'
        CASA <14>: param['spw']='4'
        CASA <15>: cmd=sl.make_call('flagdata', param)
        CASA <17>: sl.do_and_record(cmd, 7, 'you@nrao.edu:flag ngc5921')

        """
        if not self._configdone:
            return
        # Submit the command (non-blocking) and use the returned handle as
        # the key of the job-status table.
        job=self._cluster.odo(cmd, id)
        self._jobs[job]={}
        self._jobs[job]['start']=''
        self._jobs[job]['time']=0
        # Baseline I/O counters for the engine, captured by check_resource,
        # so per-job Read/Write can be computed later.
        if self._enginesRWoffsets.has_key(id):
            self._jobs[job]['read_offset']=self._enginesRWoffsets[id]['read_offset']
            self._jobs[job]['write_offset']=self._enginesRWoffsets[id]['write_offset']
        else:
            self._jobs[job]['read_offset']=0
            self._jobs[job]['write_offset']=0
        self._jobs[job]['command']=cmd
        # 'short' is the function name (text before '('), or the whole
        # command when it is very short.
        if len(cmd)<9:
            self._jobs[job]['short']=cmd
        else:
            self._jobs[job]['short']=cmd[:str.find(cmd, '(')]
        self._jobs[job]['subms']=subMS
        self._jobs[job]['status']="scheduled"
        self._jobs[job]['engine']=id
        self._jobs[job]['jobname']=self._job_title
        self._jobs[job]['jobgroup']=group
        self._job_title+=1
        # Return the title (integer id) assigned to this job.
        return self._job_title-1

    ###########################################################################
    ### result processing functions
    ###########################################################################

    def list_result(self):
        """read the project.result file and write out all labels

        Example:
        CASA <33>: sl.list_result
        Out[33]:
        ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####',
         '#### flag ngc5921 #################### Wed Mar 16 10:43:12 2011 ####']

        """
        if not self._configdone:
            return
        f=open(self._project+'.result', 'r')
        s=f.readlines()
        vec=[]
        for line in s:
            sLine=line.rstrip()
            # Section header lines start with '#### '
            if str.find(sLine, '#### ')==0:
                vec.append(sLine.strip())
            else:
                continue
        f.close()
        return vec

    def get_result(self, tm):
        """read the project.result file and write out result for a label

        Keyword arguments:
        tm -- the result label

        Example:
        CASA <33>: sl.list_result
        Out[33]:
        ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####',
         '#### flag ngc5921 #################### Wed Mar 16 10:43:12 2011 ####']
        CASA <34>: sl.get_result('new split')
        Out[34]:
        ....ommit...

        """

        if not self._configdone:
            return
        f=open(self._project+'.result', 'r')
        s=f.readlines()
        # 'reach' is True while scanning the section whose header contains
        # the requested label.
        reach=False
        vec=[]
        for line in s:
            sLine=line.strip()
            if str.find(sLine, '#### ')==0:
                if str.count(sLine, ' '+tm+' ')>0 and not reach:
                    reach=True
                else:
                    reach=False
            else:
                # Collect content lines, skipping blanks and the column header
                if reach and sLine!='' and not sLine.startswith('engine'):
                    vec.append(sLine)
        f.close()
        return vec

    def erase_result(self, tm):
        """read the project.result file and erase result for a label

        Keyword arguments:
        tm -- the result label

        Example:
        CASA <33>: sl.list_result
        Out[33]:
        ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####',
         '#### flag ngc5921 #################### Wed Mar 16 10:43:12 2011 ####']
        CASA <34>: sl.erase_result('flag ngc5921')
        CASA <35>: sl.list_result
        Out[35]:
        ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####']

        """
        if not self._configdone:
            return
        # Reject non-strings and labels made only of '#' (would match the
        # decoration of every section header).
        if type(tm)!=str or len(tm)==tm.count('#'):
            return
        f=open(self._project+'.result', 'r')
        s=f.readlines()
        f.close()
        # a = line index of the matching header, b = index of the following
        # header; the lines in between are removed.
        a=-1
        b=-1
        for line in xrange(len(s)):
            sLine=s[line].strip()
            if str.find(sLine, '#### ')==0:
                if str.count(sLine, ' '+tm+' ')>0 and a==-1:
                    a=line
                else:
                    b=line
        if a>-1 and b>a:
            f=open(self._project+'.result', 'w')
            f.writelines(s[:a])
            f.writelines(s[b:])
            f.close()
        return

    def get_output(self, result, item, **kwargs):
        """pick from result list the item that meets condistion in kwargs

        Keyword arguments:
        result -- the result label or the result from running get_result
        item -- the result item to get
        kwargs -- the conditions to limit the result

        Example:
        CASA <33>: sl.list_result
        Out[33]:
        ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####',
         '#### flag ngc5921 #################### Wed Mar 16 10:43:12 2011 ####']
        CASA <34>: sl.get_result('new split')
        Out[34]:
        ['10 done 30 split(vis="/lustre/casa-store/hye/10B-209a_5s.ms", outputvis="/home/casa-dev-10/hye/ptest/csplit/10B-209a_5s-f5-s10.ms", spw="10", datacolumn="DATA", field="J0738+1742")',
         '1 done 40 split(vis="/lustre/casa-store/hye/10B-209a_5s.ms", outputvis="/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f3-s2.ms", spw="2", datacolumn="DATA", field="J0738+1742")',
         '2 done 75 split(vis="/lustre/casa-store/hye/10B-209a_5s.ms", outputvis="/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f4-s10.ms", spw="10", datacolumn="DATA", field="2MJ0746")',
         ..... many other entries ...]
        CASA <35>: sl.get_output('new split', 'outputvis', field='3C84')
        Out[35]:
        ['/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f1-s8.ms',
         '/home/casa-dev-10/hye/ptest/csplit/10B-209a_5s-f1-s14.ms',
         '/home/casa-dev-08/hye/ptest/csplit/10B-209a_5s-f0-s0.ms']

        """

        casalog.origin("simple_cluster")

        if not self._configdone:
            return
        # A string is treated as a result label and resolved first.
        if type(result)==str:
            result=self.get_result(result)
        if type(result)!=list:
            return []
        if len(result)==0:
            return []
        if type(item)!=str:
            casalog.post("The item name must be a string","WARN","get_output")
            return []

        # Render each kwarg as the 'key=value' text it would have in the
        # recorded command string (same quoting rules as make_call).
        vals=[]
        for key in kwargs:
            v=kwargs[key]
            if type(v)==str:
                vals.append(str(key)+'="'+v+'"')
            elif type(v)==np.ndarray:
                vals.append(str(key)+'='+repr(v))
            else:
                vals.append(str(key)+'='+str(v))

        vec=[]
        for i in result:
            # Keep only entries that contain every rendered condition.
            pick=True
            for j in vals:
                if str.count(i, j)==0:
                    pick=False
            if pick:
                # Extract the text between 'item="' and the next comma,
                # stripping the surrounding quotes.
                a=str.find(i, item)
                a=str.find(i, '=', a)
                b=str.find(i, ',', a+1)
                if a>=0 and b>=0:
                    vec.append(i[a+2:b-1])
        return vec

    def getVariables(self, varList, engine):
        """
        This method will return a list corresponding to all variables
        in the varList for the specified engine. This is a
        very thin wrapper around the pull method in the cluster.
        """
        # Accept a single variable name as well as a list of names.
        if not isinstance(varList, list):
            varList = [varList]

        rtnVal = []
        for var in varList:
            pulled = self._cluster.pull(var, engine)
            rtnVal.append(pulled[engine])

        return rtnVal


    ###########################################################################
    ### engine selection functions
    ###########################################################################

    def use_paths(self, dir_list=[]):
        """use engines that most close to the dirs (or ms)

        Keyword arguments:
        dir_list -- the result label

        Example:
        CASA <33>: sl.list_result
        Out[33]:
        ['#### new split ####################### Mon Mar 14 14:48:08 2011 ####',
        CASA <34>: sl.get_output('new split', 'outputvis', field='3C84')
        Out[34]:
        ['/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f1-s8.ms',
         '/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f0-s1.ms',
         '/home/casa-dev-07/hye/ptest/csplit/10B-209a_5s-f1-s9.ms']
        CASA <35>: sl.use_paths(
                   '/home/casa-dev-10/hye/ptest/csplit/10B-209a_5s-f1-s14.ms')
        Out[35]: [8]

        """

        casalog.origin("simple_cluster")

        # NOTE: mutable default argument dir_list=[] -- never mutated here,
        # but callers should still pass their own list.
        if not self._configdone:
            return
        if len(dir_list)==0:
            casalog.post("dir_list can not be empty","WARN","use_paths")
            return []
        # Normalize a single path into a one-element list.
        a=[]
        if type(dir_list)!=list:
            a.append(dir_list)
            dir_list=a
        if len(dir_list)>0:
            int_ok=True
            for i in dir_list:
                if type(i)!=str:
                    int_ok=False
            if not int_ok:
                casalog.post("path name in dir_list must be string","WARN","use_paths")
                return []

        # Map each path to the host whose work directory is a prefix of it.
        a=[]
        hst=self.get_hosts()
        for i in dir_list:
            int_ok=False
            for j in range(len(hst)):
                if i.count(hst[j][2])>0:
                    a.append(hst[j][0])
                    int_ok=True
            if not int_ok:
                casalog.post("Could not find a host for %s" % str(i),"WARN","use_paths")
                return []

        # Build host -> [engine ids] map, then hand out one engine per matched
        # path, cycling round-robin within each host.
        e=dict()
        for k in xrange(len(hst)):
            h=hst[k][0]
            e[h]=[]
            for i in self._cluster.get_engines():
                if i[1]==h:
                    e[h].append(i[0])
        # NOTE(review): relies on values() and keys() enumerating in the same
        # order (true in CPython 2 for an unmodified dict).
        val=e.values()
        key=e.keys()
        lenth=[]
        pos=[]
        for i in xrange(len(val)):
            lenth.append(len(val[i]))
            pos.append(0)
        vec=[]
        for k in a:
            for s in xrange(len(e)):
                if k==key[s]:
                    # Wrap around when all of this host's engines were used.
                    if pos[s]==lenth[s]:
                        pos[s]=0
                    vec.append(val[s][pos[s]])
                    pos[s]+=1
        return vec

    def use_hosts(self, host_list=[], engines_each=0):
        """use engines on the given nodes

        Keyword arguments:
        host_list -- the list of hosts
        engines_each -- number of engines to use on each host

        Example:
        CASA <45>: sl.get_hosts
        Out[45]:
        [['casa-dev-07', 4, '/home/casa-dev-07/hye/ptest'],
         ['casa-dev-08', 4, '/home/casa-dev-08/hye/ptest'],
         ['casa-dev-10', 4, '/home/casa-dev-10/hye/ptest']]
        CASA <46>: sl.use_hosts(['casa-dev-07', 'casa-dev-10'], 2)
        Out[46]: [8, 9, 0, 1]

        """

        casalog.origin("simple_cluster")

        if not self._configdone:
            return
        # No constraints at all -> every engine in the cluster.
        if len(host_list)==0 and engines_each==0:
            return self._cluster.get_ids()

        hst=self.get_hosts()
        # Accept a single host name (string) by wrapping it into a list.
        a=[]
        if type(host_list)!=list:
            a.append(host_list)
            host_list=a
        if len(host_list)>0:
            int_ok=True
            for i in host_list:
                if type(i)!=str:
                    int_ok=False
            if not int_ok:
                casalog.post("host name in host_list must be string","WARN","use_hosts")
                return []
        else:
            # Empty host_list (but engines_each set): use every known host.
            host_list=[]
            for j in range(len(hst)):
                host_list.append(hst[j][0])

        # Reject any host name that is not in the configured host table.
        for i in host_list:
            host_ok=False
            for j in range(len(hst)):
                if hst[j][0]==i:
                    host_ok=True
            if not host_ok:
                casalog.post("There is no host with name %s" % str(i),"WARN","use_hosts")
                return []


        # Collect engine ids per requested host, then take up to engines_each
        # from each host's list.
        e=dict()
        for k in host_list:
            e[k]=[]
            for i in self._cluster.get_engines():
                if i[1]==k:
                    e[k].append(i[0])
        val=e.values()
        vec=[]
        if engines_each==0:
            # 0 means "no per-host limit"; 100 acts as a practical upper bound.
            engines_each=100
        for i in xrange(len(val)):
            j=0
            while j<min(engines_each, len(val[i])):
                vec.append(val[i][j])
                j+=1
        return vec

    def use_engines(self, use_id=[], spreadhost=1):
        """use engines on from a given list

        Keyword arguments:
        use_id -- the list of engine ids
        spreadhost -- whether to apply host first policy

        Example:
        CASA <52>: sl._cluster.get_ids()
        Out[52]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
        CASA <54>: sl.use_engines([0, 1, 2, 9])
        Out[54]: [0, 1, 2, 9]
        CASA <55>: sl.use_engines()
        Out[55]: [4, 8, 0, 5, 9, 1, 6, 10, 2, 7, 11, 3]

        """

        casalog.origin("simple_cluster")

        if not self._configdone:
            return
        if len(use_id)>0:
            # Explicit list: validate that every id is an integer and pass through.
            int_ok=True
            for i in use_id:
                if type(i)!=int:
                    int_ok=False
            if int_ok:
                return use_id
            else:
                casalog.post("Engine id in use_id must be integer","WARN","use_engines")
                return []
        elif spreadhost==0:
            return self._cluster.get_ids()
        else:
            # Host-first policy: interleave engines by taking one per host in
            # turn until all engine ids are consumed.
            e=dict()
            hst=self.get_hosts()
            for i in range(len(hst)):
                e[hst[i][0]]=[]
            for i in self._cluster.get_engines():
                e[i[1]].append(i[0])
            val=e.values()
            lenth=[]
            pos=[]
            for i in xrange(len(val)):
                lenth.append(len(val[i]))
                pos.append(0)
            vec=[]
            while len(vec)<len(self._cluster.get_ids()):
                for i in xrange(len(val)):
                    if pos[i]<lenth[i]:
                        vec.append(val[i][pos[i]])
                        pos[i]+=1
            return vec

    ###########################################################################
    ### ms knowledge functions
    ###########################################################################
    def get_msname(self, vis):
        """get the ms name of given vis (basename with any '.ms' suffix removed)

        Keyword arguments:
        vis -- the path+name of visibility data

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <16>: sl.get_msname(vis)
        Out[18]: '10B-209a_5s'

        """

        vs=os.path.abspath(vis)
        # Strip the directory part, then the trailing '.ms' extension if present.
        msname=vs[str.rfind(vs,'/')+1:]
        if msname.endswith('.ms'):
            msname=msname[:str.rfind(msname, '.ms')]
        return msname

    def get_antenna_diam(self, vis):
        """get the diameter of antennas

        Keyword arguments:
        vis -- the path+name of visibility data

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <19>: sl.get_antenna_diam(vis)
        Out[19]: 25.0

        """

        tb.open(vis+'/ANTENNA')
        diams=tb.getcol('DISH_DIAMETER')
        # Prefer the smallest dish; fall back to the largest when the minimum
        # is recorded as 0 (unknown).
        diam=np.min(diams)
        if diam==0:
            diam=np.max(diams)
        tb.done()
        return diam

    def get_mean_reff(self, vis):
        """get the mean reference frequency (over all spectral windows)

        Keyword arguments:
        vis -- the path+name of visibility data

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <20>: sl.get_mean_reff(vis)
        Out[20]: 6298222222.2222223

        """

        tb.open(vis+'/SPECTRAL_WINDOW')
        reff=tb.getcol('REF_FREQUENCY')
        tb.done()
        return reff.mean()

    def get_spw_reff(self, vis, spw=0):
        """get the reference frequency of spw

        Keyword arguments:
        vis -- the path+name of visibility data
        spw -- the spectral window id

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <21>: sl.get_spw_reff(vis, 8)
        Out[21]: 5056000000.0

        """

        casalog.origin("simple_cluster")

        tb.open(vis+'/SPECTRAL_WINDOW')
        if spw<0 or spw>=tb.nrows():
            # NOTE(review): this early return (None) leaves the table open --
            # confirm tb.open/tb.done pairing is acceptable for the tb tool.
            casalog.post("Spectral window not available: %s" % str(spw),"WARN","get_spw_reff")
            return
        spw_reff=tb.getcell('REF_FREQUENCY', spw)
        tb.done()
        return spw_reff

    def get_spw_chan(self, vis, spw=0):
        """get the number of channels of spw

        Keyword arguments:
        vis -- the path+name of visibility data
        spw -- the spectral window id

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <24>: sl.get_spw_chan(vis, 8)
        Out[24]: 64

        """

        casalog.origin("simple_cluster")

        tb.open(vis+'/SPECTRAL_WINDOW')
        if spw<0 or spw>=tb.nrows():
            # Same early-return-with-open-table pattern as get_spw_reff.
            casalog.post("Spectral window not available: %s" % str(spw),"WARN","get_spw_chan")
            return
        spw_chan=tb.getcell('NUM_CHAN', spw)
        tb.done()
        return spw_chan

    def get_pol_corr(self, vis, pol=0):
        """get the number of correlations of polarization

        Keyword arguments:
        vis -- the path+name of visibility data
        pol -- the polarization id

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <31>: sl.get_pol_corr(vis, 0)
        Out[31]: 4

        """

        casalog.origin("simple_cluster")

        tb.open(vis+'/POLARIZATION')
        if pol<0 or pol>=tb.nrows():
            casalog.post("Polarization not available: %s" % str(pol),"WARN","get_pol_corr")
            return
        pol_corr=tb.getcell('NUM_CORR', pol)
        tb.done()
        return pol_corr

    def get_num_field(self, vis):
        """get the number of fields

        Keyword arguments:
        vis -- the path+name of visibility data

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <32>: sl.get_num_field(vis)
        Out[32]: 6L

        """

        tb.open(vis+'/FIELD')
        num_field=tb.nrows()
        tb.done()
        return num_field
    def get_field_name(self, vis, id):
        """get the name of a field

        Keyword arguments:
        vis -- the path+name of visibility data
        id -- the field id

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <35>: sl.get_field_name(vis, 5)
        Out[35]: 'J0738+1742'

        """

        casalog.origin("simple_cluster")

        tb.open(vis+'/FIELD')
        if id<0 or id>=tb.nrows():
            # NOTE(review): early return (None) leaves the table open -- same
            # pattern as the other sub-table getters; confirm acceptable.
            casalog.post("Field not available: %s" % str(id),"WARN","get_field_name")
            return
        fn=tb.getcell('NAME', id)
        tb.done()
        return fn

    def get_num_spw(self, vis):
        """get the number of spectral windows

        Keyword arguments:
        vis -- the path+name of visibility data

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <36>: sl.get_num_spw(vis)
        Out[36]: 18L

        """

        tb.open(vis+'/SPECTRAL_WINDOW')
        num_spw=tb.nrows()
        tb.done()
        return num_spw

    def get_num_desc(self, vis):
        """get number of data descriptions

        Keyword arguments:
        vis -- the path+name of visibility data

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <37>: sl.get_num_desc(vis)
        Out[37]: 18L

        """

        tb.open(vis+'/DATA_DESCRIPTION')
        num_desc=tb.nrows()
        tb.done()
        return num_desc

    def get_spw_id(self, vis, desc=0):
        """get spectral window id for desc

        Keyword arguments:
        vis -- the path+name of visibility data
        desc -- the data description id

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <38>: sl.get_spw_id(vis, 17)
        Out[38]: 17

        """

        casalog.origin("simple_cluster")

        tb.open(vis+'/DATA_DESCRIPTION')
        if desc<0 or desc>=tb.nrows():
            casalog.post("DDI not available: %s" % str(desc),"WARN","get_spw_id")
            return
        spw_id=tb.getcell('SPECTRAL_WINDOW_ID', desc)
        tb.done()
        return spw_id

    def get_pol_id(self, vis, desc=0):
        """get polarization id for desc

        Keyword arguments:
        vis -- the path+name of visibility data
        desc -- the data description id

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <39>: sl.get_pol_id(vis, 17)
        Out[39]: 0

        """

        casalog.origin("simple_cluster")

        tb.open(vis+'/DATA_DESCRIPTION')
        if desc<0 or desc>=tb.nrows():
            casalog.post("DDI not available: %s" % str(desc),"WARN","get_pol_id")
            return
        pol_id=tb.getcell('POLARIZATION_ID', desc)
        tb.done()
        return pol_id

    def get_field_desc(self, vis):
        """get source (field, desc) summary with row counts and relative cost

        Keyword arguments:
        vis -- the path+name of visibility data

        Example:
        CASA <15>: vis="/lustre/casa-store/hye/10B-209a_5s.ms"
        CASA <40>: sl.get_field_desc(vis)
        Out[40]:
        {(0, 0): {'cost': [0, 82],
                  'desc': 0,
                  'field': 0,
                  'nchan': 64,
                  'ncorr': 4,
                  'nrows': 16848,
                  'pol': 0,
                  'spw': 0},
         (0, 1): {'cost': [0, 82],
                  'desc': 1,
                  'field': 0,
                  'nchan': 64,
                  'ncorr': 4,
                  'nrows': 16848,
                  'pol': 0,
                  'spw': 1}
         ... ommit ...
        }

        """

        tb.open(vis)
        nrows=tb.nrows()
        tb.done()
        # Scan the main table in chunks of 1000 rows; k is the chunk count
        # (Python 2 integer division, rounded up below).
        k=nrows/1000
        t=nrows%1000
        if t>0:
            k+=1
        b={}
        tb.open(vis)
        for i in xrange(k):
            field=tb.getcol('FIELD_ID', i*1000, 1000)
            desc=tb.getcol('DATA_DESC_ID', i*1000, 1000)
            # Count rows per unique (field, desc) pair within this chunk.
            fd=zip(field, desc)
            newset=set(fd)
            for j in newset:
                if b.has_key(j):
                    b[j]['nrows']=b[j]['nrows']+fd.count(j)
                else:
                    a={}
                    a['field']=j[0]
                    a['desc']=j[1]
                    a['nrows']=fd.count(j)
                    b[j]=a
        tb.done()
        # Attach shape info and compute each pair's cost relative to the mean.
        d=0
        for i in b.keys():
            b[i]['pol']=self.get_pol_id(vis, b[i]['desc'])
            b[i]['spw']=self.get_spw_id(vis, b[i]['desc'])
            b[i]['ncorr']=self.get_pol_corr(vis, b[i]['pol'])
            b[i]['nchan']=self.get_spw_chan(vis, b[i]['spw'])
            b[i]['cost']=b[i]['nchan']*b[i]['ncorr']*b[i]['nrows']
            d+=b[i]['cost']
        d/=len(b.keys())
        for i in b.keys():
            # cost becomes [relative cost, number of (field,desc) pairs].
            b[i]['cost']=[b[i]['cost']/d, len(b.keys())]
        return b


    ###########################################################################
    ### setup -
    ###########################################################################
    def init_cluster(self, clusterfile='', project=''):
        """Setup the cluster

        Keyword arguments:
        clusterfile -- the cluster definition file
        project -- the name of project (default: 'proj'+timestamp).

        A configuration file is an ASCII text file. Each line defines a node
        (also called host) with one line per host to be used, and the following
        format:

        - <hostname>, <number of engines>, <work directory>
        - <hostname>, <number of engines>, <work directory>, <fraction of total RAM>
        - <hostname>, <number of engines>, <work directory>, <fraction of total RAM>, <RAM per engine>

        where the interpretation of the parameters is as follows:

        - hostname: Hostname of the target node where the cluster is deployed

          NOTE: The hostname has to be provided w/o quotes

        - number of engines: Supports in turns 3 different formats

            * If provided as an integer >1: It is interpreted as
              the actual user-specified maximum number of engines

            * If provided as an integer =0: It will deploy as maximum
              engines as possible, according to the idle CPU capacity
              available at the target node

            * If provided as a float between 0 and 1: It is interpreted
              as the percentage of idle CPU capacity that the cluster
              can use in total at the target node

        - work directory: Area in which the cluster will put intermediate
          files such as log files, configuration files, and monitoring files

          NOTE1: This area has to be accessible from the controller (user) machine,
                 and mounted in the same path of the filesystem

          NOTE2: The path name has to be provided w/o quotes

        - fraction of total RAM: Supports in turns 3 different formats:

            * If provided as an integer >1: It is interpreted as the actual
              user-specified maximum amount of RAM to be used in total at
              the target node

            * If provided as an integer =0: It will deploy as maximum engines
              as possible, according to the free RAM available at target node

            * If provided as a float between 0 and 1: It is interpreted as
              the percentage of free RAM that the cluster can use in total
              at the target node

        - RAM per engine: Integer, which is interpreted as the required memory
          per engine in MB (default is 512MB)

        It is also possible to add comments, by using the # character at the
        beginning of the line. Example:

        #####################################################

        # CASA cluster configuration file for expert user
        orion, 10, /home/jdoe/test/myclusterhome1
        m42, 4, /home/jdoe/test/myclusterhome2, 0.6, 1024
        antares, 0.6, /home/jdoe/test/myclusterhome3, 0, 2048

        #####################################################

        - At host ``orion'': It will deploy up to 10 engines, with working
          directory /home/jdoe/test/myclusterhome1, and using as much free
          RAM available as possible (up to 90% by default), taking into
          account that each engine can use up to 512 MB (the default and minimum)

        - At host ``m42'': It will deploy up to 4 engines, with working directory
          /home/jdoe/test/myclusterhome2, and using at the most 60% of the free RAM
          available, taking into account that each engine can use up to 1024 MB.

        - At host ``antares'': It will deploy as many engines as possible, with
          working directory /home/jdoe/test/myclusterhome3, using up to 60% of the
          idle CPU capacity / cores, and as much free RAM available as possible
          (up to 90% by default), taking into account that each engine can use up
          to 2048 MB.
02474 02475 Example: 02476 CASA <15>: from simple_cluster import * 02477 CASA <16>: sl=simple_cluster() 02478 CASA <17>: sl.init_cluster('cluster-config.txt', 'ProjectName') 02479 02480 """ 02481 02482 casalog.origin("simple_cluster") 02483 02484 if project==None or type(project)!=str or project.strip()=="": 02485 # Project name must be a non-empty string, otherwise set default 02486 project='cluster_project' 02487 casalog.post("No project specified using default project: " +\ 02488 "cluster_project", "WARN","init_cluster") 02489 02490 if clusterfile==None or type(clusterfile)!=str or clusterfile.strip()=="": 02491 # Cluster file name must be a non-empty string, otherwise generate a default clusterfile 02492 # The default cluster should have: 02493 # * One engine for each core on the current system 02494 # * The working directory should be the cwd 02495 (sysname, nodename, release, version, machine)=os.uname() 02496 msg=nodename+', '+str(0)+', '+os.getcwd() 02497 # jagonzal (CAS-4293): Write default cluster config file in the 02498 # current directory to avoid problems with writing permissions 02499 clusterfile='default_cluster' 02500 f=open(clusterfile, 'w') 02501 f.write(msg) 02502 f.close() 02503 self.__localCluster = True 02504 02505 self.config_cluster(clusterfile, True) 02506 if not self._configdone: 02507 self.__running = False 02508 return False 02509 02510 self.create_project(project) 02511 02512 self.start_cluster() 02513 02514 # Put the cluster object into the global namespace 02515 sys._getframe(len(inspect.stack())-1).f_globals['procCluster'] = self 02516 02517 # Set running status 02518 self.__running = True 02519 02520 return True 02521 02522 02523 ########################################################################### 02524 ### example to distribute clean task over engines 02525 ########################################################################### 02526 def simple_clean(self, vs, nx, ny, mode='channel', email=''): 02527 """Make images with 
a simple cluster 02528 02529 Keyword arguments: 02530 vs -- the visibility data 02531 nx, ny -- the size (pixels) of the image 02532 mode -- either 'channel' or 'continuum' 02533 email -- the email address to notify the completion 02534 02535 Example: 02536 CASA <15>: from simple_cluster import * 02537 CASA <16>: sl=simple_cluster() 02538 CASA <17>: sl.init_cluster('my_cluster', 'aProj') 02539 CASA <18>: simple_clean( 02540 vis='/home/casa-dev-09/hye/ptest/sim.alma.csv.mid.ms', 02541 nx=256, ny=256, mode='channel') 02542 02543 """ 02544 02545 vis=os.path.abspath(vs) 02546 tb.clearlocks(vis) 02547 02548 # Determine the cell size 02549 diam=self.get_antenna_diam(vis) 02550 freqmean=self.get_mean_reff(vis) 02551 casalog.post("vis: %s Diameter: %s FreqMean: %s" % (vis,str(diam),str(freqmean)),"INFO","simple_clean") 02552 fv=(3.0e8/freqmean/diam)*180*60*60/math.pi 02553 cell=[str(fv/nx)+'arcsec', str(fv/ny)+'arcsec'] 02554 02555 fdspw=self.get_field_desc(vis) 02556 ids=self._cluster.get_ids() 02557 msname=self.get_msname(vis) 02558 02559 if len(fdspw)>len(ids): 02560 # More job chunks than engines, simply distribute by field and spw 02561 i=0 02562 for k in fdspw.values(): 02563 id_i=ids[i] 02564 s={} 02565 s['vis']=vis 02566 fd=str(k['field']) 02567 spw=str(k['spw']) 02568 s['imagename']=self.get_engine_store(id_i)+msname+'-f'+fd+'-s'+spw 02569 s['field']=fd 02570 s['spw']=spw 02571 s['mode']=mode 02572 s['niter']=20000 02573 s['threshold']='0.001mJy' 02574 s['psfmode']='hogbom' 02575 s['imagermode']='csclean' 02576 s['imsize']=[nx, ny] 02577 s['cell']=cell 02578 s['calready']=False 02579 cmd=self.make_call('clean', s) 02580 self.do_and_record(cmd, id_i, email) 02581 i+=1 02582 if i==len(ids): 02583 i=0 02584 else: 02585 # Less job chanks than engines, need further devide by channel 02586 i=0 02587 for k in fdspw.values(): 02588 spwchan=self.get_spw_chan(vis, k['spw']) 02589 02590 nengs=len(ids) 02591 nchan=1 02592 try: 02593 nchan=int(ceil(abs(float(spwchan))/nengs)) 
02594 except: 02595 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 02596 # traceback.print_tb(sys.exc_info()[2]) 02597 pass 02598 02599 for j in xrange(len(ids)): 02600 id_i=ids[i] 02601 start=j*nchan 02602 s={} 02603 s['vis']=vis 02604 fd=str(k['field']) 02605 spw=str(k['spw']) 02606 s['imagename']=(self.get_engine_store(id_i)+msname+'-f'+fd+ 02607 '-s'+spw+'-b'+str(start)+'-e'+str(start+nchan)) 02608 s['field']=fd 02609 s['spw']=spw 02610 s['mode']='channel' 02611 s['niter']=20000 02612 s['start']=start 02613 s['nchan']=nchan 02614 s['threshold']='0.001mJy' 02615 s['psfmode']='hogbom' 02616 s['imagermode']='csclean' 02617 s['imsize']=[nx, ny] 02618 s['cell']=cell 02619 s['calready']=False 02620 cmd=self.make_call('clean', s) 02621 self.do_and_record(cmd, id_i, email) 02622 i+=1 02623 if i==len(ids): 02624 i=0 02625 02626 self.get_status() 02627 02628 def simple_split(self, vs, email): 02629 """split by source (field, spw) with parallel engines 02630 02631 Keyword arguments: 02632 vs -- the visibility data 02633 email -- the email address to notify the completion 02634 02635 Example: 02636 CASA <15>: from simple_cluster import * 02637 CASA <16>: sl=simple_cluster() 02638 CASA <17>: sl.init_cluster('my_cluster', 'aProj') 02639 CASA <18): vis='/home/casa-dev-09/hye/ptest/sim.alma.csv.mid.ms', 02640 CASA <18>: simple_split(vis) 02641 02642 """ 02643 vis=os.path.abspath(vs) 02644 tb.clearlocks(vis) 02645 02646 casalog.post("vis: %s" % vis,"INFO","simple_split") 02647 fdspw=self.get_field_desc(vis) 02648 ids=self.use_engines() 02649 msname=self.get_msname(vis) 02650 02651 i=0 02652 for k in fdspw.values(): 02653 id_i=ids[i] 02654 s={} 02655 s['vis']=vis 02656 fd=str(k['field']) 02657 spw=str(k['spw']) 02658 s['outputvis']=self.get_engine_store(id_i)+msname+'-f'+ \ 02659 fd+'-s'+spw+'.ms' 02660 if os.path.exists(s['outputvis']): 02661 os.system('rm -rf '+s['outputvis']) 02662 s['field']=self.get_field_name(vis, k['field']) 
            s['spw']=spw
            s['datacolumn']='DATA'
            cmd=self.make_call('split', s)
            self.do_and_record(cmd, id_i, email)
            i+=1
            # Wrap around the engine list.
            if i==len(ids):
                i=0
        self.get_status()

#if __name__ == "__main__":
    #from simple_cluster import *
    #sl=simple_cluster()
    #sl.init_cluster('my_cluster', 'aProj')
    #vis='/home/casa-dev-09/hye/ptest/sim.alma.csv.mid.ms',
    #simple_split(vis)

    #get_status()

    #init_cluster('my_cluster', 'my_project')
    #simple_clean(vis='/home/casa-dev-09/hye/ptest/sim.alma.csv.mid.ms',
    #             nx=256, ny=256, mode='channel')

    #simple_clean(vis='/home/casa-dev-09/hye/para/sim.alma.csv.mid.ms',
    #             nx=256, ny=256, mode='channel')

    #simple_clean(vis='/home/casa-dev-09/hye/pclean/sim100g_4chan15kRows.ms',
    #             nx=256, ny=256, mode='channel')



###########################################################################
###   job management classes
###########################################################################

class JobData:
    """
    This class encapsulates a single job. The commandName is the name
    of the task to be executed. The jobInfo is a dictionary of all
    parameters that need to be handled.
    """
    class CommandInfo:
        # One task invocation: name, argument dict, and the name of the
        # engine-side variable that will hold its return value.

        def __init__(self, commandName, commandInfo, returnVariable):
            self.commandName = commandName
            self.commandInfo = commandInfo
            self.returnVariable = returnVariable

        def getReturnVariable(self):
            return self.returnVariable

        def getCommandLine(self):
            # Render "retVar = task(arg1 = v1, arg2 = v2, ...)"; strings are
            # single-quoted, everything else uses str().
            firstArgument = True
            output = "%s = %s(" % (self.returnVariable, self.commandName)
            for (arg,value) in self.commandInfo.items():
                if firstArgument:
                    firstArgument = False
                else:
                    output += ', '
                if isinstance(value, str):
                    output += ("%s = '%s'" % (arg, value))
                else:
                    output += ("%s = " % arg) + str(value)
            output += ')'
            return output


    def __init__(self, commandName, commandInfo = {}):
        # NOTE(review): mutable default commandInfo={} -- the dict reference is
        # stored in CommandInfo; confirm callers never mutate a shared default.
        self._commandList = []
        self.status = 'new'
        self.addCommand(commandName, commandInfo)
        self._returnValues = None


    def addCommand(self, commandName, commandInfo):
        """
        Add an additional command to this Job to be executed after
        previous Jobs.
        """
        # Each command gets a unique engine-side return variable name.
        rtnVar = "returnVar%d" % len(self._commandList)
        self._commandList.append(JobData.CommandInfo(commandName,
                                                     commandInfo,
                                                     rtnVar))
    def getCommandLine(self):
        """
        This method will return the command line(s) to be executed on the
        remote engine. It is usually only needed for debugging or for
        the JobQueueManager.
        """
        output = ''
        for idx in xrange(len(self._commandList)):
            if idx > 0:
                output += '; '
            output += self._commandList[idx].getCommandLine()
        return output

    def getCommandNames(self):
        """
        This method will return a list of command names that are associated
        with this job.
        """
        return [command.commandName for command in self._commandList]


    def getCommandArguments(self, commandName = None):
        """
        This method will return the command arguments associated with a
        particular job.
           * If commandName is not none the arguments for the command with
             that name are returned.
           * Otherwise a dictionary (with keys being the commandName and
             the value being the dictionary of arguments) is returned.
           * If there is only a single command the arguments for that
             command are returned as a dictionary.
        """
        returnValue = {}
        for command in self._commandList:
            if commandName is None or commandName == command.commandName:
                returnValue[command.commandName] = command.commandInfo

        if len(returnValue) == 1:
            return returnValue.values()[0]
        return returnValue

    def getReturnVariableList(self):
        return [ci.returnVariable for ci in self._commandList]

    def setReturnValues(self, valueList):
        self._returnValues = valueList

    def getReturnValues(self):
        # Unwrap a single-element list; returns None when no values were set.
        if self._returnValues is not None:
            if len(self._returnValues) == 1:
                return self._returnValues[0]
        return self._returnValues

class JobQueueManager:
    def __init__(self, cluster = None):
        self.__cluster=cluster
        if self.__cluster is None:
            self.__cluster = simple_cluster.getCluster()
        # jagonzal: Create a cross reference for the simple_cluster using this JobQueue
        self.__cluster._JobQueueManager = self
        self.__inputQueue = []
        self.__outputQueue = {}

    def addJob(self, jobData):
        """
        Add another JobData object to the queue of jobs to be executed.
02811 """ 02812 # TODO Check the type of jobData 02813 if not isinstance(jobData,list): 02814 jobData = [jobData] 02815 for job in jobData: 02816 job.status='pending' 02817 self.__inputQueue.append(job) 02818 02819 def clearJobs(self): 02820 """ 02821 Remove all jobs from the queue, this is usually a good idea 02822 before reusing a JobQueueManager. 02823 """ 02824 self.__inputQueue = [] 02825 self.__outputQueue = {} 02826 02827 def getOutputJobs(self, status = None): 02828 """ 02829 This returns all jobs in the output queue which 02830 match the specified status. If no status is specified 02831 the entire list of output jobs is returned. 02832 """ 02833 if status is None: 02834 return self.__outputQueue.values() 02835 02836 returnList = [] 02837 for job in self.__outputQueue.values(): 02838 if job.status == status: 02839 returnList.append(job) 02840 return returnList 02841 02842 def getAllJobs(self): 02843 return self.__outputQueue.values() 02844 02845 def executeQueue(self): 02846 """ 02847 This method causes all jobs to be executed on the available engines 02848 It will block until the jobs are complete. 
02849 """ 02850 02851 casalog.origin("simple_cluster") 02852 02853 # Clear the queue of any existing results 02854 self.__cluster.remove_record() 02855 02856 engineList = self.__cluster.use_engines() 02857 02858 # jagonzal (CAS-): When there are 0 engines available an error must have happened 02859 if (len(engineList) < 1): 02860 casalog.post("There are 0 engines available, check the status of the cluster","WARN","executeQueue") 02861 return 02862 02863 casalog.post("Executing %d jobs on %d engines" % 02864 (len(self.__inputQueue), len(engineList)), "INFO","executeQueue") 02865 02866 while len(self.__inputQueue) > 0: 02867 self._checkForCompletedJobs(engineList) 02868 02869 for job in self.__inputQueue[:len(engineList)]: 02870 self.__inputQueue.remove(job) 02871 # This stores each job in the output queue indexed by it's JobID (name) 02872 self.__outputQueue[self.__cluster.do_and_record\ 02873 (job.getCommandLine(), 02874 engineList.pop(), 02875 subMS=job.getCommandArguments()['vis'].split('/').pop()) 02876 ]=job 02877 02878 # Wait for a bit then try again 02879 time.sleep(1) 02880 02881 # Now we need to wait for the rest of the jobs to complete 02882 while self._checkForCompletedJobs(engineList): 02883 time.sleep(1) 02884 02885 02886 def _checkForCompletedJobs(self, engineList): 02887 """ This method will look at all jobs in the status, if they are 02888 marked as done it will: 02889 * update the outputQueue 02890 * add the engine back to the engine list 02891 * remove the report from the status 02892 """ 02893 statusReport = self.__cluster.get_status(True).values() 02894 if len(statusReport) == 0: 02895 # Nothing running 02896 return False 02897 02898 # jagonzal: This seems to be a hook between the cluster queue ang the JobManager queue 02899 for job in self.__cluster.get_status(True).values(): 02900 # Update the status of the Job 02901 self.__outputQueue[job['jobname']].status = job['status'] 02902 02903 if job['status'] == 'done' or job['status'] == 'broken': 
02904 if job['status'] == 'done': 02905 # Get the return values is we're successful 02906 self.__outputQueue[job['jobname']].setReturnValues \ 02907 (self.__cluster.getVariables\ 02908 (self.__outputQueue[job['jobname']].\ 02909 getReturnVariableList(),job['engine'])) 02910 02911 engineList.append(job['engine']) 02912 self.__cluster.remove_record(job['jobname']) 02913 return True