casa
$Rev:20696$
00001 from IPython.kernel import client 00002 from subprocess import * 00003 import os 00004 import sys 00005 import commands 00006 import string 00007 import atexit 00008 import time 00009 import socket 00010 import types 00011 import inspect 00012 import casadef 00013 import numpy as np 00014 from math import * 00015 from get_user import get_user 00016 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00017 import traceback 00018 # jagonzal (CAS-4372): Introduce CASA logging system into cluster infrastructure 00019 from casac import * 00020 casalog = casac.logsink() 00021 casalog.setglobal(True) 00022 00023 a=inspect.stack() 00024 stacklevel=0 00025 for k in range(len(a)): 00026 if a[k][1] == "<string>" or (string.find(a[k][1], 'ipython console') > 0 or string.find(a[k][1],"casapy.py") > 0): 00027 stacklevel=k 00028 00029 myf=sys._getframe(stacklevel).f_globals 00030 00031 if myf.has_key('casa') : 00032 casa = myf['casa'] 00033 else: 00034 casa = { } 00035 00036 class cluster(object): 00037 00038 "control cluster engines for parallel tasks" 00039 00040 _instance = None 00041 00042 __client=None 00043 __controller=None 00044 __timestamp=None 00045 __engines=[] 00046 __ipythondir=os.environ['PWD']+'/ipython' 00047 if(os.environ.has_key('IPYTHONDIR')): 00048 __ipythondir=os.environ['IPYTHONDIR'] 00049 __homepath=os.environ['HOME'] 00050 __start_controller_file='start_controller.sh' 00051 __start_engine_file='start_engine.sh' 00052 __stop_node_file='stop_node.sh' 00053 __stop_engine_file='stop_engine.sh' 00054 __stop_controller_file='stop_controller.sh' 00055 __cluster_rc_file='clusterrc.sh' 00056 __user = get_user() 00057 __prefix = '/tmp/' + __user + '-' 00058 __init_now=True 00059 __new_engs=[] 00060 00061 def __new__(cls, *args, **kwargs): 00062 if not cls._instance: 00063 cls._instance = super(cluster, cls).__new__( 00064 cls, *args, **kwargs) 00065 return cls._instance 00066 00067 def __call__(self): 00068 00069 # If there is already a controller, use it 00070 return self 00071 00072 def __init__(self): 00073 """Initialize a Cluster. 00074 00075 A Cluster enables parallel and distributed execution of CASA tasks and tools on a set of networked computers. A culster consists of one controller and one or more engines. Each engine is an independent Python instance that takes Python commands over a network connection. The controller provides an interface for working with a set of engines. A user uses casapy console to command the controller. A password-less ssh access to the computers that hosts engines is required for the communication between controller and engines. 00076 00077 """ 00078 00079 self.__client=None 00080 self.__controller=None 00081 self.__timestamp=None 00082 self.__engines=[] 00083 self.__ipythondir=os.environ['PWD']+'/ipython' 00084 if(os.environ.has_key('IPYTHONDIR')): 00085 self.__ipythondir=os.environ['IPYTHONDIR'] 00086 else: 00087 os.environ['IPYTHONDIR']=self.__ipythondir 00088 self.__homepath=os.environ['HOME'] 00089 if (self.__ipythondir==None or self.__ipythondir==''): 00090 os.environ["IPYTHONDIR"]=os.environ['HOME']+'.casa/ipython' 00091 self.__ipythondir=os.environ['IPYTHONDIR'] 00092 self.__init_now=True 00093 self.__new_engs=[] 00094 atexit.register(cluster.stop_cluster,self) 00095 00096 def _ip(self, host): 00097 """Returns a unique IP address of the given hostname, 00098 i.e. 
not 127.0.0.1 for localhost but localhost's global IP."""
00099 
00100         ip = socket.gethostbyname(host)
00101 
00102         if ip == "127.0.0.1":
00103             ip = socket.gethostbyname(socket.getfqdn())
00104 
00105         return ip
00106 
00107     def _cp(self, source, host, destination):
00108         """Creates the command to copy the source file to the destination file on the destination host,
00109         using either scp, or cp for localhost. This avoids requiring password-less ssh
00110         in a single-host environment."""
00111 
00112         if self._ip(host) == self._ip("localhost"):
00113             cmd = ['cp', source, destination]
00114         else:
00115             cmd = ['scp', source, host + ":" + destination]
00116 
00117         return cmd
00118 
00119     def _do(self, host, cmd):
00120         """Creates the command line to execute the given command on the given host.
00121         ssh is used if and only if the host is not localhost."""
00122 
00123         if self._ip(host) == self._ip("localhost"):
00124             return cmd.split(" ")
00125         else:
00126             return ['ssh', '-f', '-q', '-x', host, cmd]
00127 
00128     def start_engine(self, node_name, num_engine, work_dir=None, omp_num_nthreads=1):
00129         """Start engines on the given node.
00130         @param node_name The name of the computer that will host the engines.
00131         @param num_engine The number of engines to start in this run.
00132         @param work_dir The working directory where outputs and logs from the engines will be stored. If work_dir is not supplied or does not exist, the user's home directory is used.
00133         Running this command multiple times on the same node is allowed; the total number of engines on the node increases with each run.
00134         Every engine has a unique integer id. The id is the key used to send instructions to the engine. The available engine ids can be obtained by calling get_ids() or get_engines().
00135         """
00136 
00137         casalog.origin("parallel_go")
00138 
00139         # Start controller
00140         if not self.__start_controller():
00141             casalog.post("The controller is not started","WARN","start_engine")
00142             return False
00143 
00144         # Start the engine
00145         out=open('/dev/null', 'w')
00146         err=open('/dev/null', 'w')
00147         cmd = self._cp(self.__ipythondir+'/'+self.__cluster_rc_file,
00148                        node_name,
00149                        self.__prefix+self.__cluster_rc_file)
00150         p=Popen(cmd, stdout=out, stderr=err)
00151         sts = os.waitpid(p.pid, 0)
00152         if sts[1] != 0:
00153             casalog.post("Command failed: %s" % string.join(cmd),"WARN","start_engine")
00154 
00155         cmd = self._cp(self.__ipythondir+'/'+self.__start_engine_file,
00156                        node_name,
00157                        self.__prefix+self.__start_engine_file)
00158 
00159         p=Popen(cmd, stdout=out, stderr=err)
00160         sts = os.waitpid(p.pid, 0)
00161         if sts[1] != 0:
00162             casalog.post("Command failed: %s" % string.join(cmd),"WARN","start_engine")
00163         for i in range(1, num_engine+1):
00164             args='bash '+self.__prefix+self.__start_engine_file
00165             cmd = self._do(node_name, args)
00166             q=Popen(cmd)
00167             sts = os.waitpid(q.pid, 0)
00168             if sts[1] != 0:
00169                 casalog.post("Command failed: %s" % string.join(cmd),"WARN","start_engine")
00170             casalog.post("start engine %s on %s" % (i, node_name),"INFO","start_engine")
00171         self.__engines=self.__update_cluster_info(num_engine, work_dir,omp_num_nthreads)
00172 
00173         out.close()
00174         err.close()
00175 
00176     # jagonzal (CAS-4292): This method crashes when initializing the nodes via __init_nodes,
00177     # so it is deprecated. Instead, use the start_engine method directly,
00178     # which not only starts the engine but also initializes it using the scripts.
00179     def start_cluster(self, cl_file):
00180         """Start engines listed in a file.
00181 
00182         @param cl_file The name of the file that defines the engines.
00183         The cl_file is a text file. Each line contains 3 columns, separated by spaces: node name, number of engines and work directory. A line starting with # is ignored. Example:
00184         #-----------------------------------------
00185         #node_name num_of_engines work_dir
00186         casa-dev-01 4 /home/casa-dev-01/hye/cluster
00187         #casa-dev-02 3 /home/casa-dev-02/hye/cluster
00188         subzero 1 /home/subzero/hye/test
00189         #olddog 2 /home/olddog/hye
00190         #-----------------------------------------
00191 
00192         start_cluster and start_engine can be used multiple times.
00193 
00194         """
00195 
00196         casalog.origin("parallel_go")
00197 
00198         # Start controller
00199         if not self.__start_controller():
00200             casalog.post("The controller is not started","WARN","start_cluster")
00201             return False
00202 
00203         # Process the file
00204         try:
00205             clf=open(cl_file, 'r')
00206             lines = clf.readlines()
00207             for line in lines:
00208                 if line.startswith('#'):
00209                     continue
00210                 words = string.split(line)
00211                 if len(words) < 3:
00212                     casalog.post("The node definition is invalid: %s" % line,"WARN","start_cluster")
00213                     continue
00214                 try:
00215                     int(words[1])
00216                 except:
00217                     # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00218                     # traceback.print_tb(sys.exc_info()[2])
00219                     continue
00220 
00221                 # Start all nodes
00222                 self.__init_now=False
00223                 casalog.post("start_engine(%s,%s,%s)" % (str(words[0]),str(words[1]),str(words[2])),"INFO","start_cluster")
00224                 self.start_engine(words[0], int(words[1]), words[2])
00225 
00226             clf.close()
00227         except IOError:
00228             casalog.post("Cluster file '%s' doesn't exist" % cl_file,"SEVERE","start_cluster")
00229 
00230         if len(self.__new_engs)>0:
00231             self.__init_nodes(self.__new_engs)
00232         self.__engines=self.__client.pull(['id', 'host', 'pid', 'inited'])
00233         self.__new_engs=[]
00234         self.__init_now=True
00235 
00236     def __start_controller(self):
00237         """(Internal) Start the controller.
00238 
00239         A user does not need to call this function directly. When a user runs either start_cluster or start_engine, the existence of a valid controller is checked; if the controller does not exist, this function is called automatically. All engines connect to the valid controller.
00240 00241 """ 00242 00243 casalog.origin("parallel_go") 00244 00245 # If there is already a controller, use it 00246 if (self.__controller!=None): 00247 return True 00248 00249 # First of all write bashrc file which is needed by other cluster files 00250 self.__write_bashrc() 00251 00252 # Generate time stamp and write start controller file 00253 from time import strftime 00254 timestamp=strftime("%Y%m%d%H%M%S") 00255 self.__write_start_controller(timestamp) 00256 00257 # Start controller in a detached terminal 00258 cmd = 'bash ' + self.__ipythondir + '/' + self.__start_controller_file 00259 self.__controller=Popen(cmd,shell=True).pid 00260 if (self.__controller==None): 00261 return False 00262 self.__timestamp=timestamp 00263 casalog.post("Controller %s started" % self.__controller ,"INFO","start_controller") 00264 00265 # Now write the rest of the cluster files 00266 self.__write_start_engine() 00267 self.__write_stop_controller() 00268 self.__write_stop_node() 00269 00270 # Wait for controller files to exist 00271 info=self.__ipythondir+'/log/casacontroller-'+str(self.__timestamp)+'-'+str(self.__controller)+'.log' 00272 meng=self.__ipythondir+'/security/casacontroller-mec-'+self.__timestamp+'.furl' 00273 00274 for i in range(1, 15): 00275 if os.path.exists(info): 00276 break 00277 time.sleep(1) 00278 00279 for i in range(1, 15): 00280 if os.path.exists(meng): 00281 break 00282 time.sleep(1) 00283 00284 # Start-up client 00285 self.__client=client.MultiEngineClient(meng) 00286 00287 return True 00288 00289 def __write_start_engine(self): 00290 """(Internal) Create script for starting engines. 00291 00292 The created script will be stored in the user's $IPYTHONDIR. The start_cluster and start_engine will upload the script to the node and execute it in the proper shell. 00293 00294 """ 00295 00296 ef=open(self.__ipythondir+'/'+self.__start_engine_file, 'w') 00297 bash=commands.getoutput("which bash") 00298 ef.write('#!%s\n' % bash) 00299 ef.write('. %s%s\n' % (self.__prefix, self.__cluster_rc_file)) 00300 cmd=commands.getoutput("which ipengine") 00301 ef.write('export contrid=%s\n' % self.__controller) 00302 ef.write('export stamp=%s\n' % self.__timestamp) 00303 ef.write(cmd+' --furl-file='+self.__ipythondir+'/security/casacontroller-engine-'+self.__timestamp+'.furl --logfile='+self.__ipythondir+'/log/casaengine-'+self.__timestamp+'-'+str(self.__controller)+'- 2>&1 | grep -v NullSelection &\n') 00304 ef.close() 00305 00306 def __write_start_controller(self,timestamp): 00307 """ 00308 00309 """ 00310 00311 ef=open(self.__ipythondir+'/'+self.__start_controller_file, 'w') 00312 bash=commands.getoutput("which bash") 00313 ef.write('#!%s\n' % bash) 00314 ef.write('. %s/%s\n' % (self.__ipythondir, self.__cluster_rc_file)) 00315 lfile=self.__ipythondir+'/log/casacontroller-'+timestamp+'-' 00316 ffile=self.__ipythondir+'/security/casacontroller-engine-'+timestamp+'.furl' 00317 efile=self.__ipythondir+'/security/casacontroller-mec-'+timestamp+'.furl' 00318 tfile=self.__ipythondir+'/security/casacontroller-tc-'+timestamp+'.furl' 00319 cmd = commands.getoutput("which ipcontroller") 00320 cmd += ' --engine-furl-file=' + ffile 00321 cmd += ' --multiengine-furl-file=' + efile 00322 cmd += ' --task-furl-file=' + tfile 00323 cmd += ' --logfile=' + lfile 00324 cmd += ' &\n' 00325 ef.write(cmd) 00326 ef.close() 00327 00328 def __write_stop_node(self): 00329 """(Internal) Create script for stoping a node. 00330 00331 The created script will be stored in the user's $IPYTHONDIR. 
The stop_cluster and stop_engine will upload the script to the node and execute it in the proper shell.
00332 
00333         """
00334         ef=open(self.__ipythondir+'/'+self.__stop_node_file, 'w')
00335         bash=commands.getoutput("which bash")
00336         ef.write('#!%s\n' % bash)
00337         # Stop all engines started by the current controller
00338         ef.write("ps -fu `whoami` | grep ipengine | grep -v grep | grep "+self.__timestamp+" | awk '{print $2}' | xargs kill -TERM>/dev/null")
00339         ef.close()
00340 
00341     def __write_stop_controller(self):
00342         """(Internal) Create script for stopping the controller.
00343 
00344         The created script will be stored in the user's $IPYTHONDIR. The stop_cluster will execute it in the proper shell.
00345 
00346         """
00347         ef=open(self.__ipythondir+'/'+self.__stop_controller_file, 'w')
00348         bash=commands.getoutput("which bash")
00349         ef.write('#!%s\n' % bash)
00350         ef.write("ps -ef | grep `whoami` | grep ipcontroller | grep -v grep | awk '{print $2}' | xargs kill -TERM >/dev/null")
00351         ef.close()
00352 
00353     def __write_bashrc(self):
00354         """(Internal) Create a file containing bash startup instructions for the engine host.
00355 
00356         When the controller starts up, the environment needed to run the cluster is extracted from the user's current shell (the one running this casapy session) and written to an rc file. The created script will be stored in the user's $IPYTHONDIR. The start_cluster and start_engine will upload the rc file to the nodes and establish the engine environment.
00357 
00358         """
00359         bashrc=open(self.__ipythondir+'/'+self.__cluster_rc_file, 'w')
00360         bash=commands.getoutput("which bash")
00361         bashrc.write("#!%s\n" % bash)
00362         envList=['PATH', 'LD_LIBRARY_PATH', 'IPYTHONDIR',
00363                  'CASAPATH', 'CASAARCH',
00364                  'PYTHONHOME', '__CASAPY_PYTHONDIR',
00365                  'PGPLOT_DEV', 'PGPLOT_DIR', 'PGPLOT_FONT']
00366         for param in envList:
00367             try:
00368                 bashrc.write('export %s="%s"\n' % (param,os.environ[param]))
00369             except:
00370                 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00371                 # traceback.print_tb(sys.exc_info()[2])
00372                 pass
00373 
00374         bashrc.write("export HOSTNAME=`uname -n`")
00375         bashrc.close()
00376 
00377     def stop_engine(self, engine_id):
00378         """Stop an engine.
00379 
00380         @param engine_id The id of the engine to be stopped.
00381         If an engine with the given id is in the current cluster, running this function will stop the engine and remove it from the engine list.
00382 00383 """ 00384 00385 casalog.origin("parallel_go") 00386 00387 if type(engine_id).__name__ != 'int': 00388 casalog.post("engine id must be an integer","WARN","stop_engine") 00389 return None 00390 00391 node_name='' 00392 procid=None 00393 for i in self.__engines: 00394 if (i[0]==engine_id): 00395 node_name=i[1] 00396 procid=i[2] 00397 00398 if (node_name=='' or procid is None): 00399 casalog.post("Could not find engine %d" % engine_id,"WARN","stop_engine") 00400 return 00401 00402 ef=open(self.__ipythondir+'/'+self.__stop_engine_file, 'w') 00403 bash=commands.getoutput("which bash") 00404 ef.write('#!%s\n' % bash) 00405 ef.write("kill -9 %d" % procid) 00406 ef.close() 00407 00408 out=open('/dev/null', 'w') 00409 err=open('/dev/null', 'w') 00410 cmd = self._cp(self.__ipythondir+'/'+self.__stop_engine_file, 00411 node_name, 00412 self.__prefix+self.__stop_engine_file) 00413 00414 p=Popen(cmd, stdout=out, stderr=err) 00415 out.close() 00416 err.close() 00417 sts = os.waitpid(p.pid, 0) 00418 args='bash '+self.__prefix+self.__stop_engine_file 00419 Popen(self._do(node_name, args)) 00420 casalog.post("stop engine %d on %s" % (engine_id, node_name),"INFO","stop_engine") 00421 self.__engines=self.__update_cluster_info(-1) 00422 00423 00424 def stop_node(self, node_name): 00425 """Stop a node (a engine-host computer) 00426 00427 @param node_node The node to be stopped. 00428 If a computer with the given name is in the current cluster, running this function will stop all the engines currently running on that node and remove the node and engines from the engine list. This function will not shutdown the computer. 00429 00430 """ 00431 00432 casalog.origin("parallel_go") 00433 00434 if type(node_name).__name__ != 'str': 00435 casalog.post("node_name must be a string","WARN","stop_node") 00436 return None 00437 00438 if self.get_nodes().count(node_name) == 0: 00439 casalog.post("There is no host with name %s" % node_name,"WARN","stop_node") 00440 return None 00441 00442 out=open('/dev/null', 'w') 00443 err=open('/dev/null', 'w') 00444 cmd = self._cp(self.__ipythondir+'/'+self.__stop_node_file, 00445 node_name, 00446 self.__prefix+self.__stop_node_file) 00447 p=Popen(cmd, stdout=out, stderr=err) 00448 out.close() 00449 err.close() 00450 sts = os.waitpid(p.pid, 0) 00451 args='bash '+self.__prefix+self.__stop_node_file 00452 Popen(self._do(node_name, args)) 00453 casalog.post("stop engines on %s" % node_name,"INFO","stop_node") 00454 00455 num_engine=0 00456 for i in self.__engines: 00457 if i[1]==node_name: 00458 num_engine=num_engine-1 00459 00460 self.__engines=self.__update_cluster_info(num_engine) 00461 00462 def __stop_controller(self): 00463 """(Internal) Stop the controller. 00464 00465 This is the last thing for quiting the cluster gracely. 
00466 00467 """ 00468 00469 casalog.origin("parallel_go") 00470 00471 # If it is already down 00472 if (self.__controller==None): 00473 return True 00474 00475 import commands 00476 node_name=commands.getoutput("uname -n") 00477 out=open('/dev/null', 'w') 00478 err=open('/dev/null', 'w') 00479 cmd = self._cp(self.__ipythondir+'/'+self.__stop_controller_file, 00480 node_name, 00481 self.__prefix+self.__stop_controller_file) 00482 p=Popen(cmd, stdout=out, stderr=err) 00483 out.close() 00484 err.close() 00485 sts = os.waitpid(p.pid, 0) 00486 args='bash '+self.__prefix+self.__stop_controller_file 00487 Popen(self._do(node_name, args)) 00488 00489 try: 00490 os.remove(self.__ipythondir+'/'+self.__cluster_rc_file) 00491 os.remove(self.__ipythondir+'/'+self.__start_engine_file) 00492 os.remove(self.__ipythondir+'/'+self.__stop_node_file) 00493 os.remove(self.__ipythondir+'/'+self.__stop_controller_file) 00494 os.remove(self.__ipythondir+'/'+self.__stop_engine_file) 00495 except: 00496 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00497 # traceback.print_tb(sys.exc_info()[2]) 00498 pass 00499 00500 try: 00501 self.__controller=None 00502 except: 00503 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00504 # traceback.print_tb(sys.exc_info()[2]) 00505 pass 00506 00507 casalog.post("Controller stopped","INFO","stop_controller") 00508 return True 00509 00510 def stop_cluster(self): 00511 """Stop the cluster 00512 00513 This function stops all the running engines and the controller. 00514 00515 """ 00516 00517 # jagonzal (CAS-4292): We have to check the controller instance directly because the method 00518 # start_cluster does not work properly (crashes when initializing the nodes via __init_nodes). 
00519 # Actually start_cluster is deprecated, and it is necessary to use directly the start_engine 00520 # method which does not only start the engine, but also initializes it using scripts 00521 if ((self.__controller==None) or (self.__client==None)): 00522 return 00523 # jagonzal (CAS-CHANGE): Do not use brute-force kill to schut down cluster 00524 else: 00525 # Kill the engines and controller using kernel.multiengineclient interface 00526 try: 00527 self.__client.kill(True,self.__engines,False) 00528 del self.__client 00529 except: 00530 traceback.print_exception((sys.exc_info()[0]), (sys.exc_info()[1]), (sys.exc_info()[2])) 00531 # Reset state before doing anything else, otherwise we may try to use one method from the client object 00532 self.__client=None 00533 self.__controller=None 00534 # Update cluster info 00535 self.__engines=[] 00536 # Remove initialization/shut-down scripts 00537 try: 00538 os.remove(self.__ipythondir+'/'+self.__start_controller_file) 00539 os.remove(self.__ipythondir+'/'+self.__cluster_rc_file) 00540 os.remove(self.__ipythondir+'/'+self.__start_engine_file) 00541 os.remove(self.__ipythondir+'/'+self.__stop_node_file) 00542 os.remove(self.__ipythondir+'/'+self.__stop_controller_file) 00543 00544 os.remove(self.__prefix+self.__cluster_rc_file) 00545 os.remove(self.__prefix+self.__start_engine_file) 00546 except: 00547 traceback.print_exception((sys.exc_info()[0]), (sys.exc_info()[1]), (sys.exc_info()[2])) 00548 # jagonzal (CAS-4370): Remove all the ipcontroller/ipengine files because 00549 # otherwise it might confuse future cluster/MultiEngineClient instances 00550 self.wash_logs() 00551 return 00552 00553 ### jagonzal (CAS-4292): Code below is deprecated ### 00554 00555 # shutdown all engines 00556 elist=[] 00557 for i in self.__engines: 00558 elist.append(i[1]) 00559 fruit=set(elist) 00560 00561 for i in fruit: 00562 try: 00563 self.stop_node(i) 00564 except: 00565 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00566 traceback.print_exception((sys.exc_info()[0]), (sys.exc_info()[1]), (sys.exc_info()[2])) 00567 continue 00568 00569 # shutdone controller 00570 try: 00571 self.__stop_controller() 00572 except: 00573 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00574 traceback.print_exception((sys.exc_info()[0]), (sys.exc_info()[1]), (sys.exc_info()[2])) 00575 pass 00576 00577 try: 00578 # jagonzal (CAS-4106): We have to shut down the client, not activate it 00579 # besides, the activate method only enables parallel magic commands 00580 self.__client=None 00581 except: 00582 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00583 traceback.print_exception((sys.exc_info()[0]), (sys.exc_info()[1]), (sys.exc_info()[2])) 00584 pass 00585 00586 def wash_logs(self): 00587 """Clean up the cluster log files. 00588 00589 A set of logs containing controller-engine information will be created every time a cluster is created. This function deletes all cluster log files that cumulated in the user's $IPYTHONDIR, if there is no active cluster running. (The files will be removed only before starting any engine of after stoping the whole cluster. 
00590 00591 """ 00592 # do this only if no controller running 00593 if (self.__controller!=None): 00594 return True 00595 00596 # jagonzal (CAS-4370): Remove all the ipcontroller/ipengine files because 00597 # otherwise it might confuse future cluster/MultiEngineClient instances 00598 os.system("rm -rf %s/log/*" % self.__ipythondir) 00599 os.system("rm -rf %s/security/*" % self.__ipythondir) 00600 00601 def __init_nodes(self, i): 00602 """(Internal) Initialize engines 00603 00604 @param i The list of the engine ids 00605 An engine is a Python interpreter. To make an engine capable of running CASA tasks and tools, we must setup the environment and import necessary modules. This function effectively make every engine a running CASA instance (except that it is a non-interactive CASA running in Python, in contrast the casapy that is an interactive CASA running in IPython). 00606 00607 """ 00608 00609 casalog.origin("parallel_go") 00610 casalog.post("Initialize engines %s" %str(i),"INFO","init_nodes") 00611 00612 self.__client.push({'casa': casa }) 00613 self.__client.execute('import os', i) 00614 self.__client.execute('if os.path.isdir(work_dir):os.chdir(work_dir)\nelse:work_dir=os.environ["HOME"]', i) 00615 phome='' 00616 try: 00617 phome=os.environ["PYTHONHOME"] 00618 except: 00619 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00620 # traceback.print_tb(sys.exc_info()[2]) 00621 pass 00622 00623 if phome=='': 00624 try: 00625 v=str.split(os.environ["CASAPATH"], ' ') 00626 phome=v[0]+'/'+v[1] 00627 except: 00628 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00629 # traceback.print_tb(sys.exc_info()[2]) 00630 pass 00631 00632 dhome='' 00633 try: 00634 dhome=os.environ["CASAARCH"] 00635 except: 00636 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00637 # traceback.print_tb(sys.exc_info()[2]) 00638 pass 00639 00640 if phome=='': 00641 casalog.post("could not locate casa_in_py.py","SEVERE","init_nodes") 00642 return None 00643 00644 if (dhome!=phome): 00645 phome=dhome 00646 00647 sdir = casadef.python_library_directory + '/' 00648 self.__client.push(dict(phome=phome), i) 00649 self.__client.execute('import sys', i) 00650 self.__client.push(dict(sdir=sdir), i) 00651 self.__client.execute('scriptdir=sdir', i) 00652 00653 self.__client.execute('sys.path.insert(2, scriptdir)', i) 00654 try: 00655 self.__client.execute("execfile(scriptdir+'casa_in_py.py')", i) 00656 self.__client.execute('inited=True', i) 00657 except client.CompositeError, exception: 00658 casalog.post("Error initializing engine %s: %s" % (str(i), str(exception)),"SEVERE","init_nodes") 00659 exception.print_tracebacks() 00660 except: 00661 casalog.post("Error initializing engine %s" % str(i),"SEVERE","init_nodes") 00662 traceback.print_tb(sys.exc_info()[2]) 00663 00664 00665 def reset_cluster(self): 00666 """Re-initialize the engines. 00667 00668 This function reset the running environment for all the available engines. 
00669 00670 """ 00671 00672 casalog.origin("parallel_go") 00673 00674 if self.__client is None: 00675 casalog.post("Multiengineclient is not initialized","WARN","reset_cluster") 00676 return None 00677 00678 try: 00679 tobeinit=self.__client.pull('id') 00680 except: 00681 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00682 # traceback.print_tb(sys.exc_info()[2]) 00683 return None 00684 00685 00686 if len(tobeinit)>0: 00687 self.__init_nodes(tobeinit) 00688 self.__engines=self.__client.pull(['id', 'host', 'pid', 'inited']) 00689 00690 def __update_cluster_info(self, num_engine, work_dir=None,omp_num_nthreads=1): 00691 """(Internal) Construct the list of engines. 00692 00693 @param num_engine The number of new engines 00694 @param work_dir The initial working directory 00695 This function appends num_engine engines to the engine list and setup initial Python environment on them. Before further initialization, an engine can only run Python programs (it can not run CASA tasks or tools). 00696 00697 """ 00698 00699 casalog.origin("parallel_go") 00700 00701 if self.__client is None : 00702 casalog.post("Controller is not initialized","WARN","update_cluster_info") 00703 return [] 00704 00705 engs=len(self.__engines)+num_engine 00706 if engs<0: 00707 engs=0 00708 i=0 00709 idlist=self.__client.get_ids() 00710 while (len(idlist)!=engs and i<10): 00711 idlist=self.__client.get_ids() 00712 time.sleep(1) 00713 i=i+1 00714 00715 # Here we only take care of the quick-init-abel items 00716 # The init of real casa_in_py will be done in parallel 00717 tobeinit=[] 00718 for i in idlist: 00719 inited=False 00720 try: 00721 inited=self.__client.pull('inited', i) 00722 except: 00723 00724 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00725 # traceback.print_tb(sys.exc_info()[2]) 00726 00727 tobeinit.append(i) 00728 self.__client.execute('id=%d'%i, i) 00729 self.__client.execute('import os', i) 00730 self.__client.execute('import socket', i) 00731 self.__client.execute('host=socket.gethostname()', i) 00732 self.__client.execute('pid=os.getpid()', i) 00733 self.__client.execute('job=None', i) 00734 self.__client.execute('import signal', i) 00735 self.__client.execute('original_sigint_handler = signal.signal(signal.SIGINT,signal.SIG_IGN)', i) 00736 # jagonzal (CAS-4276): New cluster specification file allows to automatically set the number of open MP threads 00737 self.__client.execute("os.environ['OMP_NUM_THREADS']='"+str(omp_num_nthreads)+"'", i) 00738 00739 if work_dir!=None and os.path.isdir(work_dir): 00740 self.__client.push(dict(work_dir=work_dir), i) 00741 else: 00742 self.__client.execute('work_dir=os.environ["HOME"]', i) 00743 00744 # These are environment variabls set for each node at startup. 00745 # It may be better to set as global in this module then pass to each engine when update_cluster_info 00746 00747 self.__client.execute('contrid=os.environ["contrid"]', i) 00748 self.__client.execute('stamp=os.environ["stamp"]', i) 00749 self.__client.execute('inited=False', i) 00750 00751 self.__new_engs.extend(tobeinit) 00752 00753 if self.__init_now: 00754 if len(self.__new_engs)>0: 00755 self.__init_nodes(self.__new_engs) 00756 00757 self.__init_now=True 00758 self.__new_engs=[] 00759 00760 if len(idlist)>0: 00761 return self.__client.pull(['id', 'host', 'pid', 'inited']) 00762 else: 00763 return [] 00764 00765 def get_casalogs(self): 00766 """Get a list of the casa logs for all the current cluster engines. 
00767 00768 Each working engine is a CASA instance and saves its own log. This function retrun the list of logs with their full path. One can view the log contents with casalogviewer. 00769 00770 """ 00771 try: 00772 self.__client.execute('tmp=work_dir+"/"+thelogfile') 00773 return self.__client.pull('tmp') 00774 except: 00775 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00776 # traceback.print_tb(sys.exc_info()[2]) 00777 return None 00778 00779 def read_casalogs(self): 00780 """Read the casa log files. 00781 00782 The current implementation of this function is only a prototype. A multi-log viewer needs to be developed. 00783 00784 """ 00785 00786 casalog.origin("parallel_go") 00787 00788 import os 00789 import string 00790 _logs = self.get_casalogs() 00791 if _logs != None: 00792 files = string.join(_logs, ' ') 00793 os.system("emacs "+files+ "&") 00794 else: 00795 casalog.post("Cannot read casalogs","WARN","read_casalogs") 00796 00797 def pad_task_id(self, b='', task_id=[]): 00798 """Generate a dictionary of id-padded variables 00799 00800 @param b The base name to be padded 00801 @param task_id A list of integers to pad the base name 00802 One way of distributing varaibles to a set of engnines is through python a dictionary. This is a convenience function for quick generating a dictionary of padded names. Example: 00803 x=c.pad_task_id('basename', [3, 5, 8]) 00804 x 00805 {3: 'basename-3', 5: 'basename-5', 8: 'basename-8'} 00806 x=c.pad_task_id([1,3],[0,1,2,3]) 00807 x 00808 {0: '1-0', 1: '3-1', 2: '3-2', 3: '3-3'} 00809 x=c.pad_task_id(['a', 'b','c','d','e'],[0,1,2,3]) 00810 x 00811 {0: 'a-0', 1: 'b-1', 2: 'c-2', 3: 'd-3'} 00812 y=c.pad_task_id(x) 00813 y 00814 {0: 'a-0-0', 1: 'b-1-1', 2: 'c-2-2', 3: 'd-3-3'} 00815 00816 """ 00817 00818 casalog.origin("parallel_go") 00819 00820 base={} 00821 00822 int_id=True 00823 for j in task_id: 00824 if type(j)!=types.IntType or j<0: 00825 casalog.post("Task id %s must be a positive integer" % str(j),"WARN","pad_task_id") 00826 int_id=False 00827 break 00828 if not int_id: 00829 return base 00830 00831 if type(b)==list: 00832 for j in range(len(b)): 00833 if type(b[j])!=str: 00834 b[j]=str(b[j]) 00835 00836 if len(task_id)==0: 00837 task_id=list(xrange(0, len(self.__engines))) 00838 if type(b)==str: 00839 for j in task_id: 00840 base[j]=b+'-'+str(j) 00841 if type(b)==list: 00842 k=len(b) 00843 m=len(task_id) 00844 if m<=k: 00845 for j in range(m): 00846 base[task_id[j]]=b[j]+'-'+str(task_id[j]) 00847 else: 00848 for j in range(k): 00849 base[task_id[j]]=b[j]+'-'+str(task_id[j]) 00850 for j in range(k,m): 00851 base[task_id[j]]=b[k-1]+'-'+str(task_id[j]) 00852 00853 if type(b)==dict: 00854 for i in b.keys(): 00855 base[i]=b[i]+'-'+str(i) 00856 00857 return base 00858 00859 def one_to_n(self, arg, task_id=[]): 00860 """Genrate a dictionary of one variable for n keys 00861 00862 @param arg The variable to be distributed 00863 @param task_id The list of integer ids 00864 One way of distributing varaibles to a set of engnines is through python a dictionary. This is a convenience function for quick generating a dictionary of same variable for n keys. 
Example: 00865 x=c.one_to_n('basename', [1, 2, 7]) 00866 x 00867 {1: 'basename', 2: 'basename', 7: 'basename'} 00868 00869 """ 00870 00871 casalog.origin("parallel_go") 00872 00873 # assign 1 value to n targets 00874 base={} 00875 int_id=True 00876 for j in task_id: 00877 if type(j)!=types.IntType or j<0: 00878 casalog.post("Task id %s must be a positive integer" % str(j),"WARN","one_to_n") 00879 int_id=False 00880 break 00881 if not int_id: 00882 return base 00883 00884 if len(task_id)==0: 00885 task_id=list(xrange(0, len(self.__engines))) 00886 for j in task_id: 00887 base[j]=arg 00888 return base 00889 00890 def n_to_n(self, args=[], task_id=[]): 00891 """Generate a dictionary of n varables 00892 00893 @param arags A list of n variables 00894 @param task_id A list of n integer ids 00895 One way of distributing varaibles to a set of engnines is through python a dictionary. This is a convenience function for quick generating a dictionary of a set of n variables for n keys. Example: 00896 x=c.n_to_n(['a', 'b', 'c'], [3, 6, 7]) 00897 x 00898 {3: 'a', 6: 'b', 7: 'c'} 00899 00900 """ 00901 00902 casalog.origin("parallel_go") 00903 00904 # Assign n value to n targets 00905 base={} 00906 00907 if len(args)==0: 00908 return base 00909 00910 int_id=True 00911 for j in task_id: 00912 if type(j)!=types.IntType or j<0: 00913 casalog.post("Task id %s must be a positive integer" % str(j),"WARN","n_to_n") 00914 int_id=False 00915 break 00916 if not int_id: 00917 return base 00918 00919 if len(task_id)==0: 00920 task_id=list(xrange(0, len(self.__engines))) 00921 00922 i=-1 00923 for j in task_id: 00924 i=i+1 00925 if i==len(args): 00926 break 00927 base[j]=args[i] 00928 return base 00929 00930 def split_int(self, start, end, task_id=[]): 00931 """Generate a dictionary to distribute the spectral windows 00932 00933 @param start The start integer value 00934 @param end The end integer value 00935 @param task_id The list of integer ids 00936 This is a convenience function for quick generating a dictionary of integer start points. 
Example: 00937 x=c.split_int(9, 127, [2,3,4]) 00938 x 00939 {2: 9, 3: 49, 4: 89 } 00940 00941 """ 00942 00943 casalog.origin("parallel_go") 00944 00945 base={} 00946 if len(task_id)==0: 00947 task_id=list(xrange(0, len(self.__engines))) 00948 00949 if len(task_id)==0: 00950 casalog.post("There are no engines available","WARN","split_int") 00951 return base 00952 00953 if type(start)!=int or type(end)!=int: 00954 casalog.post("start and end point must be integer","WARN","split_int") 00955 return base 00956 00957 if start<0: 00958 casalog.post("start point must be greater than 0","WARN","split_int") 00959 return base 00960 00961 if start>=end: 00962 casalog.post("end point must be greate than start point","WARN","split_int") 00963 return base 00964 00965 nx=1 00966 try: 00967 nx=int(ceil(abs(float(end - start))/len(task_id))) 00968 except: 00969 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 00970 # traceback.print_tb(sys.exc_info()[2]) 00971 pass 00972 00973 i=-1 00974 for j in task_id: 00975 i=i+1 00976 if i>=len(task_id): 00977 break 00978 st=i*nx 00979 base[j]=st+start 00980 return base 00981 00982 def split_channel(self, spw, nchan, task_id=[]): 00983 """Generate a dictionary to distribute the spectral windows 00984 00985 @param spw The spectral window 00986 @param nchan The number of channels to split 00987 @param task_id The list of integer ids 00988 One way of distributing a spectral windows to a set of engnines is through python a dictionary. This is a convenience function for quick generating a dictionary of spw expressions. Example: 00989 x=c.split_channel(1, 127, [2,3,4]) 00990 x 00991 {0: '1:0~42', 1: '1:43~85', 2: '1:86~128'} 00992 00993 """ 00994 00995 casalog.origin("parallel_go") 00996 00997 base={} 00998 if len(task_id)==0: 00999 task_id=list(xrange(0, len(self.__engines))) 01000 01001 if len(task_id)==0: 01002 casalog.post("There are no engines available","WARN","split_channel") 01003 return base 01004 01005 if nchan<len(task_id): 01006 casalog.post("There are no enough channels to split","WARN","split_channel") 01007 return base 01008 01009 nx=1 01010 try: 01011 nx=int(ceil(abs(float(nchan))/len(task_id))) 01012 except: 01013 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 01014 # traceback.print_tb(sys.exc_info()[2]) 01015 pass 01016 01017 i=-1 01018 for j in task_id: 01019 i=i+1 01020 if i==len(task_id): 01021 break 01022 st=i*nx 01023 se=st+nx-1 01024 base[j]=str(spw)+":"+str(st)+"~"+str(se) 01025 return base 01026 01027 def pgc(self,*args,**kwargs): 01028 """Parallel execution of commands and/or dictionary 01029 of commands 01030 01031 @param *args any number of commands or dictionary of 01032 commands (where the key of the dictionary is the 01033 engine id) 01034 @param **kwargs available options are 01035 job=<str> or jobname=<str> 01036 block=<True/False> 01037 01038 Example: 01039 c.pgc({0:'ya=3',1:'ya="b"'}) 01040 c.pull('ya') 01041 {0: 3, 1: 'b'} 01042 01043 c.pgc('xa=-1') 01044 c.pull('xa') 01045 {0: -1, 1: -1, 2: -1, 3: -1} 01046 c.pull('job') 01047 Out[23]: {0:'xa=-1', 1:'xa=-1', 2:'xa=-1', 3:'xa=-1'} 01048 01049 """ 01050 01051 casalog.origin("parallel_go") 01052 01053 tasks={} 01054 for j in self.__client.get_ids(): 01055 tasks[j]=[] 01056 01057 for i in args: 01058 if type(i)==types.DictType: 01059 for j in i.keys(): 01060 if type(j)!=types.IntType or j<0: 01061 casalog.post("task id %s must be a positive integer" % str(j),"WARN","pgc") 01062 pass 01063 else: 01064 
st='' 01065 if type(i[j])==types.StringType: 01066 st=i[j] 01067 else: 01068 pass 01069 if st!='': 01070 tasks[j].append(st) 01071 elif type(i)==types.StringType: 01072 # For all tasks 01073 for j in xrange(0, len(self.__engines)): 01074 tasks[j].append(i) 01075 else: 01076 casalog.post("command %s must be a string or a dictionary" % str(i),"WARN","pgc") 01077 01078 # May be better to use non-block mode and catch the result 01079 01080 # How to give name, say 'cmd_name', to a set of commands a name such 01081 # that cluster.pull('cmd_name') returns the script is excuteded? 01082 01083 keys = kwargs.keys() 01084 job='NoName' 01085 block=True 01086 for kw in keys: 01087 if kw.lower()=='job' or kw.lower()=='jobname': 01088 job=kwargs[kw] 01089 if kw.lower()=='block': 01090 block=kwargs[kw] 01091 01092 for i in tasks.keys(): 01093 cmd=string.join(tasks[i], '\n') 01094 self.__client.push(dict(job=cmd), i) 01095 01096 return self.__client.execute('exec(job)', 01097 block=block,targets=tasks.keys()) 01098 01099 def parallel_go_commands(self,*args,**kwargs): 01100 """Parallel execution of commands and/or dictionary 01101 of commands 01102 01103 01104 """ 01105 self.pgc(*args,**kwargs) 01106 01107 def pgk(self, **kwargs): 01108 """Parallel execution to set keywords 01109 01110 @param **kwargs keyword args 01111 01112 Example: 01113 x=np.zeros((3,3)) 01114 c.pgk(c={1:x},d=6,t='b',s={0:'y'}) 01115 c.pull('c') 01116 {1: array([[ 0., 0., 0.], 01117 [ 0., 0., 0.], 01118 [ 0., 0., 0.]])} 01119 c.pull('d') 01120 {0: 6, 1: 6} 01121 c.pull('s') 01122 {0: 'y'} 01123 c.pull('t') 01124 {0: 'b', 1: 'b'} 01125 """ 01126 01127 casalog.origin("parallel_go") 01128 01129 tasks={} 01130 for j in self.__client.get_ids(): 01131 tasks[j]=dict() 01132 01133 keys = kwargs.keys() 01134 01135 for kw in keys: 01136 vals=kwargs[kw] 01137 if type(vals)==types.DictType: 01138 for j in vals.keys(): 01139 if type(j)!=types.IntType or j<0: 01140 casalog.post("task id %s must be a positive integer" % str(j),"WARN","pgk") 01141 pass 01142 else: 01143 tasks[j][kw]=vals[j] 01144 else: 01145 for j in tasks.keys(): 01146 tasks[j][kw]=vals 01147 01148 for i in tasks.keys(): 01149 self.__client.push(tasks[i], i, True) 01150 01151 return tasks 01152 01153 def make_command(self, func, **kwargs): 01154 """Make command strings to be distributed to engines 01155 01156 @func function name 01157 @kwargs **kwargs available 01158 01159 Example: 01160 x=np.ones((3,3)) 01161 c.make_command(func=None,c={1:x},d=6,t='b',s={0:'y'}) 01162 {0: 's="y"; t="b"; d=6', 01163 1: 'c=array([[ 1., 1., 1.],\n[ 1., 1., 1.],\n[ 1., 1., 1.]]); t="b"; d=6'} 01164 c.make_command(func='g',c={1:x},d=6,t='b',s={0:'y'}) 01165 {0: 'g(s="y", t="b", d=6)', 01166 1: 'g(c=array([[1., 1., 1.],\n[1., 1., 1.],\n[1., 1., 1.]]), t="b", d=6)'} 01167 """ 01168 01169 casalog.origin("parallel_go") 01170 01171 tasks=self.pgk(**kwargs) 01172 01173 if func!=None and type(func)!=str: 01174 casalog.post("func must be a str","WARN","make_command") 01175 return None 01176 01177 if len(tasks)==0: 01178 casalog.post("Parameters not specified","WARN","make_command") 01179 return None 01180 01181 if func==None or len(str.strip(func))==0: 01182 func='' 01183 01184 func=str.strip(func) 01185 01186 cmds=dict() 01187 for i in tasks.keys(): 01188 cmd='' 01189 for (k, v) in tasks[i].iteritems(): 01190 cmd+=k+'=' 01191 if type(v)==str: 01192 cmd+='"'+v+'"' 01193 elif type(v)==np.ndarray: 01194 cmd+=repr(v) 01195 else: 01196 cmd+=str(v) 01197 if func=='': 01198 cmd+='; ' 01199 else: 01200 cmd+=', ' 01201 
cmd=cmd[0:-2] 01202 if func!='': 01203 cmd=func+'('+cmd+')' 01204 cmds[i]=cmd 01205 01206 return cmds 01207 01208 01209 def parallel_go_keywords(self, **kwargs): 01210 """Parallel execution to set keywords 01211 01212 01213 """ 01214 self.pgk(**kwargs) 01215 01216 def hello(self): 01217 """Parallel execution to print 'hello' message from all engines 01218 01219 01220 """ 01221 01222 casalog.origin("parallel_go") 01223 01224 casalog.post("Hello CASA Controller","INFO","hello") 01225 01226 if self.get_engines() != []: 01227 return self.__client.execute('casalog.origin("parallel_go");casalog.post("Hello CASA Controller","INFO","hello")') 01228 else: 01229 return None 01230 01231 def __set_cwds(self, clusterdir): 01232 """Set current working dir for all engines 01233 01234 01235 """ 01236 # This is not very useful because dirs are generally different cross nodes 01237 self.__client.execute('import os') 01238 self.__client.push(dict(clusterdir=clusterdir)) 01239 self.__client.execute('os.chdir(clusterdir)') 01240 self.__client.execute("user=self.__user") 01241 self.__client.execute('print user') 01242 self.__client.execute('import socket') 01243 self.__client.execute('host=socket.gethostname()') 01244 self.__client.execute('print host') 01245 self.__client.execute('print os.getcwd()') 01246 01247 def get_ids(self): 01248 """get ids for all available engines 01249 01250 01251 """ 01252 try: 01253 return self.__client.get_ids() 01254 except: 01255 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 01256 # traceback.print_tb(sys.exc_info()[2]) 01257 return [] 01258 01259 def get_nodes(self): 01260 """get hostnames for all available engines 01261 01262 01263 """ 01264 from sets import Set 01265 elist=[] 01266 for i in self.__engines: 01267 elist.append(i[1]) 01268 return list(Set(elist)) 01269 01270 def get_engines(self): 01271 """get current status of the engines 01272 01273 01274 """ 01275 return self.__engines 01276 01277 def get_stdout(self,cmd): 01278 """get the standard output from all engines for execting a comment 01279 01280 01281 """ 01282 return commands.getstatusoutput(cmd) 01283 01284 def pdo(self,job): 01285 """parallel execution of a job 01286 01287 01288 """ 01289 return self.__client.execute(job) 01290 01291 def odo(self,job,nodes): 01292 """execute a job on a subset of engines 01293 01294 01295 """ 01296 return self.__client.execute(job,block=False,targets=nodes) 01297 01298 def execute(self,job,nodes): 01299 """execute a job on a subset of engines in blocking mode 01300 01301 01302 """ 01303 return self.__client.execute(job,block=True,targets=nodes) 01304 01305 def queue_status(self): 01306 """query to queue status 01307 01308 01309 """ 01310 return self.__client.queue_status() 01311 01312 def clear_queue(self): 01313 """remove all jobs from the queue 01314 01315 01316 """ 01317 return self.__client.clear_queue() 01318 01319 def get_timer(self, timer=''): 01320 """get the eleapsed time for a timer 01321 01322 """ 01323 01324 casalog.origin("parallel_go") 01325 01326 base={} 01327 prop=self.__client.get_properties() 01328 for i in self.get_ids(): 01329 try: 01330 ky=prop[i]['timertype'] 01331 if ky=='proc': 01332 end=time.clock() 01333 else: 01334 end=time.time() 01335 base[i]='%.2f sec' % (end-prop[i][timer]) 01336 except: 01337 pass 01338 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 01339 # traceback.print_tb(sys.exc_info()[2]) 01340 01341 casalog.post("Timer: %s" % 
str(base),"INFO","get_timer") 01342 return 01343 01344 def set_timer(self,timer='timer',type='proc', 01345 targets=None,block=None): 01346 """set a timer 01347 01348 01349 """ 01350 01351 if self.__client==None: 01352 return 01353 01354 properties={} 01355 if type=='proc': 01356 properties[timer]=time.clock() 01357 else: 01358 properties[timer]=time.time() 01359 01360 properties['timertype']=type 01361 01362 self.__client.set_properties(properties, 01363 targets, block) 01364 01365 def del_timer(self, timer=['']): 01366 """delete a timer 01367 01368 """ 01369 01370 casalog.origin("parallel_go") 01371 01372 for i in self.get_ids(): 01373 self.__client.del_properties(timer, i) 01374 casalog.post("Delete timer %s %s" % (str(timer),str(i)),"INFO","del_timer") 01375 01376 return 01377 01378 def get_properties(self): 01379 """get the set properties from all engines 01380 01381 01382 """ 01383 return self.__client.get_properties() 01384 01385 def set_properties(self, properties, 01386 targets=None, block=None): 01387 """set properties for target engines 01388 01389 @param properties a dictionary its keys are 01390 01391 01392 """ 01393 self.__client.set_properties( 01394 properties, targets, block) 01395 01396 01397 def keys(self): 01398 """get all keys from all engines 01399 01400 01401 """ 01402 return self.__client.keys() 01403 01404 def push(self, **kwargs): 01405 """set values to the engines 01406 @param kekword value to distribute 01407 @param targets, the engines of interest 01408 By default, this function set the keyword values to all engines. 01409 To set values on a subset of engines, use kekword parameter targets, 01410 whick takes integer or array of integer of engine ids. 01411 You can also use function pgk to set values onto the engines. 01412 Example: 01413 c.push(a=[1,3,7.1]) 01414 c.pull('a') 01415 {0: [1, 3, 7.0999999999999996], 1: [1, 3, 7.0999999999999996]} 01416 c.push(b=[1.2,3.7], targets=1) 01417 c.pull('b',[1]) 01418 {1: [1.2, 3.7000000000000002]} 01419 c.pull('b') 01420 {1: [1.2, 3.7000000000000002]} 01421 01422 """ 01423 01424 casalog.origin("parallel_go") 01425 01426 keys = kwargs.keys() 01427 #keys.sort() 01428 if len(keys)==0: 01429 return False 01430 01431 tgt=[] 01432 targets=None 01433 for kw in keys: 01434 if kw.lower()=='targets': 01435 targets=kwargs[kw] 01436 break 01437 01438 if targets=='all' or targets==None or \ 01439 type(targets)==list and len(targets)==0: 01440 tgt=list(xrange(0, len(self.__engines))) 01441 elif type(targets)==list: 01442 for j in targets: 01443 if type(j)==types.IntType and j>=0: 01444 tgt.append(j) 01445 elif type(targets)==int and targets>=0: 01446 tgt.append(targets) 01447 01448 if len(tgt)==0: 01449 casalog.post("There are no target engines","WARN","push") 01450 return False 01451 01452 ok=True 01453 for i in tgt: 01454 try: 01455 self.__client.push(dict(kwargs),i) 01456 except: 01457 pass 01458 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 01459 # traceback.print_tb(sys.exc_info()[2]) 01460 01461 return ok 01462 01463 def pull(self, key, targets='all'): 01464 """get the value of a key 01465 @param key the var of interest 01466 @param targets, the engines of interest 01467 Example: 01468 c.pgc({0:'ya=3',1:'ya="b"'}) 01469 c.pull('ya') 01470 {0: 3, 1: 'b'} 01471 c.pull('ya',[1]) 01472 {1: 'b'} 01473 c.pull('ya',1) 01474 {1: 'b'} 01475 01476 """ 01477 01478 casalog.origin("parallel_go") 01479 01480 base={} 01481 tgt=[] 01482 if targets=='all' or \ 01483 type(targets)==list and 
len(targets)==0: 01484 tgt=list(xrange(0, len(self.__engines))) 01485 elif type(targets)==list: 01486 for j in targets: 01487 if type(j)==types.IntType and j>=0: 01488 tgt.append(j) 01489 elif type(targets)==int and targets>=0: 01490 tgt.append(targets) 01491 01492 if len(tgt)==0: 01493 casalog.post("There are no target engines","WARN","push") 01494 return base 01495 01496 for i in tgt: 01497 rslt=None 01498 try: 01499 rslt=self.__client.pull(key,i) 01500 except: 01501 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 01502 # traceback.print_tb(sys.exc_info()[2]) 01503 pass 01504 if rslt!=None: 01505 base[i]=rslt[0] 01506 01507 return base 01508 01509 def get_result(self, i): 01510 """get the result of previous execution 01511 01512 01513 """ 01514 01515 casalog.origin("parallel_go") 01516 01517 # jagonzal (CAS-4375): We have to capture the engine's exceptions at this level 01518 try: 01519 res = self.__client.get_result()[i] 01520 except client.CompositeError, exception: 01521 casalog.post("Error retrieving result from engine %s: %s" % (str(i),str(exception)),"SEVERE","get_result") 01522 exception.print_tracebacks() 01523 res = None 01524 except: 01525 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 01526 casalog.post("Error retrieving result from engine %s" % (str(i)),"SEVERE","get_result") 01527 traceback.print_tb(sys.exc_info()[2]) 01528 01529 return res 01530 01531 def activate(self): 01532 """set the cluster to parallel execution mode 01533 01534 01535 """ 01536 return self.__client.activate() 01537 01538 01539 def parallel_go_task(self,taskname=None,outfile='', 01540 target=[],ipython_globals=None): 01541 """ Make parallel tasks using current input values 01542 01543 01544 """ 01545 self.pgt(taskname,outfile,target,ipython_globals) 01546 01547 def pgt(self, taskname=None,outfile='', 01548 target=[],ipython_globals=None): 01549 """ Make parallel tasks using current input values 01550 01551 taskname -- Name of task 01552 default: None = current active task; 01553 example: taskname='bandpass' 01554 <Options: type tasklist() for the complete list> 01555 outfile -- Output file for the task inputs 01556 default: '' = taskname.parallel; 01557 example: outfile=taskname.orion 01558 target -- List of integer parallel engine ids 01559 default: [] = all current active engines; 01560 example: target=[0,2,4] 01561 01562 """ 01563 01564 casalog.origin("parallel_go") 01565 01566 base={} 01567 for j in target: 01568 if type(j)!=types.IntType or j<0: 01569 casalog.post("engine id %s must be a positive integer" % str(j),"WARN","pgt") 01570 return base 01571 01572 if len(target)==0: 01573 target=list(xrange(0, len(self.__engines))) 01574 if len(target)==0: 01575 casalog.post("There are no target engines","WARN","pgt") 01576 return base 01577 01578 try: 01579 if ipython_globals == None: 01580 t=len(inspect.stack())-1 01581 myf=sys._getframe(t).f_globals 01582 else: 01583 myf=ipython_globals 01584 01585 if taskname==None or taskname=='' or \ 01586 type(taskname)!=str: 01587 taskname=myf['taskname'] 01588 01589 if outfile=='' or outfile==None or \ 01590 type(outfile)!=str: 01591 outfile=taskname+'.parallel' 01592 01593 tname=myf[taskname] 01594 if not myf.has_key(taskname) and \ 01595 str(type(tname))!="<type 'instance'>" and \ 01596 not hasattr(tname,"defaults"): 01597 raise TypeError("task %s is not defined " % 01598 taskname) 01599 else: 01600 myf['taskname']=taskname 01601 
myf['update_params'](func=myf['taskname'], 01602 printtext=False, ipython_globals=myf) 01603 01604 for j in target: 01605 script=taskname+'(' 01606 for k in myf[taskname].parameters: 01607 par=myf[taskname].parameters[k] 01608 if type(par)==dict: 01609 val=par 01610 for v in par.keys(): 01611 if type(v)==types.IntType and j==v: 01612 val=par[v] 01613 break 01614 elif type(v)==str: 01615 a=-1 01616 try: 01617 a=int(v) 01618 except: 01619 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 01620 # traceback.print_tb(sys.exc_info()[2]) 01621 pass 01622 if a!=-1 and a==j: 01623 val=par[v] 01624 break 01625 if type(val)==str: 01626 script=script+k+"='"+val+"'," 01627 else: 01628 script=script+k+"="+str(val)+"," 01629 elif type(par)==str: 01630 script=script+k+"='"+par+"'," 01631 else: 01632 script=script+k+"="+str(par)+"," 01633 script=script.rstrip(',') 01634 script=script+')' 01635 base[j]=script 01636 return base 01637 except TypeError, e: 01638 casalog.post("TypeError: %s" % str(e),"SEVERE","pgt") 01639 01640 01641 def check_job(self, job, verbose=True): 01642 """check the status of an asynch job 01643 01644 01645 """ 01646 01647 casalog.origin("parallel_go") 01648 01649 if type(job)==type(None): 01650 print "job None has no status" 01651 return True 01652 try: 01653 x=job.get_result(block=False) 01654 if x==None: 01655 if verbose: 01656 casalog.post("job '%s' has not finished yet, result is pending" % job,"INFO","check_job") 01657 return False 01658 else: 01659 if verbose: 01660 casalog.post("job '%s' done" % job,"INFO","check_job") 01661 return True 01662 except: 01663 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework 01664 # traceback.print_tb(sys.exc_info()[2]) 01665 return False 01666 01667 def howto(self): 01668 print """A simple example for use the cluster 01669 from parallel_go import * 01670 c=cluster() 01671 c.start_engine('casa-dev-08',2,'/home/casa-dev-08/hye/cluster') 01672 #can use tb.clearlocks() to remove leftover 01673 c.pgc('default("split")') 01674 c.pgc('inp("split")') 01675 c.pgk(mspath='/home/casa-dev-01/hye/test/') 01676 c.pgk(msfile='ngc5921.ms') 01677 c.pgc('vis=mspath+msfile') 01678 c.pull('vis') 01679 tb.clearlocks('/home/casa-dev-01/hye/test/5921.ms') 01680 c.pgc('outputvis=work_dir+"/"+msfile+"-"+str(id)') 01681 #alternatively 01682 #for i in c.get_ids(): 01683 # p=c.pull('work_dir')[i] 01684 # f=c.pull('msfile')[i] 01685 # o=p+"/"+f+"--"+str(i) 01686 # c.pgk(outputvis={i: o}) 01687 c.pull('outputvis') 01688 c.pgc('field="2"') 01689 spw=c.split_channel(0, 64) 01690 spw[0]='0:6~15' 01691 spw[3]='0:48~55' 01692 c.pgk(spw=spw) 01693 c.pgc('inp("split")') 01694 c.pgc('go("split")') 01695 c.read_casalogs() 01696 c.pgc('inp("clean")') 01697 c.pgc('vis=outputvis') 01698 c.pgk(imagetag='.clean') 01699 c.pgc('imagename=vis+imagetag') 01700 c.pgc('inp("clean")') 01701 c.pgc('go("clean")') 01702 c.pgc('import commands') 01703 c.pgc('a=commands.getstatusoutput("ls ")') 01704 """ 01705 01706 def use_often(self): 01707 print """Frequently used commands 01708 from parallel_go import * 01709 c=cluster() 01710 c.hello() 01711 c.get_ids() 01712 c.get_nodes() 01713 c.activate() 01714 px print "cluster activated" 01715 c.pdo 'print "parallel"' 01716 c.odo('print "node 0 only"', 0) 01717 c.odo('print "node 0 and 1"', [0,1]) 01718 c.odo 'print "node 1 only"', 1 01719 c.queue_status() 01720 c.clear_queue() 01721 c.get_properties() 01722 c.keys() 01723 c.pull('mpi') 01724 c.get_result(1) 01725 01726 
#pg.activate() 01727 #px 'from casa_in_py import *' 01728 """ 01729 01730 def example(self): 01731 print """example: run clean on 4 engines 01732 01733 from parallel_go import * 01734 c=cluster() 01735 c.start_engine('casa-dev-08', 4, 01736 '/home/casa-dev-08/hye/cluster') 01737 default clean 01738 mspath='/home/casa-dev-09/hye/test/ngc6503/ngc6503_output/' 01739 msname='ngc6503.ms.contsub' 01740 vis=mspath+msname 01741 wpath=[] 01742 for i in c.pull('work_dir'): 01743 wpath.append(i+'/'+msname+'.clean') 01744 01745 imagename=c.pad_task_id(wpath) 01746 mode='channel' 01747 start=c.split_int(9, 127) 01748 nchan=40 01749 width=1 01750 calready=False 01751 gain=0.1 01752 msize=[370,250] 01753 psfmode='clark' 01754 cell=[4.,4.] 01755 niter=10000 01756 threshold=0.0003 01757 taskghting='briggs' 01758 #rmode = 'norm' 01759 robust=0.5 01760 mask = '' 01761 s=c.pgt() 01762 job=[] 01763 for i in c.get_ids(): 01764 job.append(c.odo(s[i], i)) 01765 01766 c.check_job(job[0]) 01767 01768 c.get_result(0) 01769 01770 """ 01771 01772 01773 cluster=cluster() 01774 01775 01776 """ 01777 for i in 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16; 01778 do ssh casa-dev-$i "ps -ef | grep hye" ; done 01779 """ 01780 """ 01781 c.pgc('import time', {0: 'time.sleep(10); x=5; y="y is y"', 1: 'time.sleep(12);a=7;b="b is not y"'},block=False,job="wakeup") 01782 01783 c.pull('x',0) 01784 c.pull('y',0) 01785 c.pull('a',1) 01786 c.pull('b',1) 01787 c.odo('print x', 0) 01788 c.odo('print x', 0).r 01789 01790 """
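# A minimal end-to-end usage sketch, following the same pattern as the howto()
# and example() helpers above. The host name 'node01' and the work directory
# '/scratch/cluster' are hypothetical placeholders; password-less ssh to the
# engine host is assumed, as described in the cluster class docstring.
#
#   from parallel_go import cluster
#   c = cluster()                                     # module exposes a singleton instance; __call__ returns it
#   c.start_engine('node01', 2, '/scratch/cluster')   # start 2 engines on host 'node01'
#   c.pgc('xa=-1')                                    # run a command on every engine
#   print c.pull('xa')                                # -> {0: -1, 1: -1}
#   c.stop_cluster()                                  # shut down engines and controller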