casa  $Rev:20696$
parallel_go.py
00001 from IPython.kernel import client 
00002 from subprocess import *
00003 import os
00004 import sys
00005 import commands
00006 import string
00007 import atexit
00008 import time
00009 import socket
00010 import types
00011 import inspect
00012 import casadef
00013 import numpy as np
00014 from math import *
00015 from get_user import get_user
00016 # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00017 import traceback
00018 # jagonzal (CAS-4372): Introduce CASA logging system into cluster infrastructure
00019 from casac import *
00020 casalog = casac.logsink()
00021 casalog.setglobal(True)
00022 
00023 a=inspect.stack()
00024 stacklevel=0
00025 for k in range(len(a)):
00026    if a[k][1] == "<string>" or (string.find(a[k][1], 'ipython console') > 0 or string.find(a[k][1],"casapy.py") > 0):
00027       stacklevel=k
00028 
00029 myf=sys._getframe(stacklevel).f_globals
00030 
00031 if myf.has_key('casa') :
00032    casa = myf['casa']
00033 else:
00034    casa = { }
00035 
00036 class cluster(object):
00037 
00038    "control cluster engines for parallel tasks"
00039 
00040    _instance = None
00041    
00042    __client=None
00043    __controller=None
00044    __timestamp=None
00045    __engines=[]
00046    __ipythondir=os.environ['PWD']+'/ipython'
00047    if(os.environ.has_key('IPYTHONDIR')):
00048       __ipythondir=os.environ['IPYTHONDIR']
00049    __homepath=os.environ['HOME'] 
00050    __start_controller_file='start_controller.sh'   
00051    __start_engine_file='start_engine.sh'
00052    __stop_node_file='stop_node.sh'
00053    __stop_engine_file='stop_engine.sh'
00054    __stop_controller_file='stop_controller.sh'
00055    __cluster_rc_file='clusterrc.sh'
00056    __user = get_user()
00057    __prefix = '/tmp/' + __user + '-'
00058    __init_now=True
00059    __new_engs=[]
00060 
00061    def __new__(cls, *args, **kwargs):
00062        if not cls._instance:
00063            cls._instance = super(cluster, cls).__new__(
00064                                 cls, *args, **kwargs)
00065        return cls._instance
00066 
00067    def __call__(self):
00068        
00069        # If there is already a controller, use it
00070        return self
00071 
00072    def __init__(self):
00073       """Initialize a Cluster.
00074 
00075       A Cluster enables parallel and distributed execution of CASA tasks and tools on a set of networked computers. A cluster consists of one controller and one or more engines. Each engine is an independent Python instance that takes Python commands over a network connection. The controller provides an interface for working with a set of engines. The user commands the controller from the casapy console. Password-less ssh access to the computers that host the engines is required for communication between the controller and the engines.
00076 
00077       """
00078 
00079       self.__client=None
00080       self.__controller=None
00081       self.__timestamp=None
00082       self.__engines=[]
00083       self.__ipythondir=os.environ['PWD']+'/ipython'
00084       if(os.environ.has_key('IPYTHONDIR')):
00085          self.__ipythondir=os.environ['IPYTHONDIR']
00086       else:
00087          os.environ['IPYTHONDIR']=self.__ipythondir
00088       self.__homepath=os.environ['HOME'] 
00089       if (self.__ipythondir==None or self.__ipythondir==''):
00090          os.environ["IPYTHONDIR"]=os.environ['HOME']+'/.casa/ipython'
00091       self.__ipythondir=os.environ['IPYTHONDIR']
00092       self.__init_now=True
00093       self.__new_engs=[]
00094       atexit.register(cluster.stop_cluster,self)
00095 
00096    def _ip(self, host):
00097       """Returns a unique IP address of the given hostname,
00098       i.e. not 127.0.0.1 for localhost but localhost's global IP"""
00099       
00100       ip = socket.gethostbyname(host)
00101       
00102       if ip == "127.0.0.1":
00103          ip = socket.gethostbyname(socket.getfqdn())
00104          
00105       return ip
00106 
00107    def _cp(self, source, host, destination):
00108       """Creates the command to copy the source file to the destination file and destination host,
00109       using either scp or cp for the localhost. This is to avoid the requirement of password-less ssh
00110       in a single host environment."""
00111 
00112       if self._ip(host) == self._ip("localhost"):
00113          cmd = ['cp', source, destination]
00114       else:
00115          cmd = ['scp', source, host + ":" + destination]
00116 
00117       return cmd
00118 
00119    def _do(self, host, cmd):
00120       """Creates the command line to execute the given command on the given host.
00121       If and only if the host is not localhost, ssh is used."""
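      # A minimal sketch of the commands this helper builds (the remote host name below is hypothetical):
      #   self._do('localhost', 'ls -l')  returns  ['ls', '-l']
      #   self._do('remote01', 'ls -l')   returns  ['ssh', '-f', '-q', '-x', 'remote01', 'ls -l']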
00122 
00123       if self._ip(host) == self._ip("localhost"):
00124          return cmd.split(" ")
00125       else:
00126          return ['ssh', '-f', '-q', '-x', host, cmd]
00127 
00128    def start_engine(self, node_name, num_engine, work_dir=None, omp_num_nthreads=1):
00129       """Start engines on the given node.
00130       @param node_name The name of the computer to host the engines.
00131       @param num_engine The number of the engines to initialize for this run. 
00132       @param work_dir The working directory where outputs and logs from the engines will be stored. If work_dir is not supplied or does not exist, the user's home directory will be used. 
00133       Running this command multiple times on the same node is allowed; the total number of engines on the node increases with each run.
00134       Every engine has a unique integer id, which is the key used to send instructions to that engine. The available engine ids can be obtained by calling get_ids() or get_engines().
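      A minimal usage sketch (the host name and work directory below are hypothetical):
      c = cluster()
      c.start_engine('casa-dev-01', 2, '/home/user/cluster_work')
      c.get_ids()
      [0, 1]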
00135       """
00136       
00137       casalog.origin("parallel_go")
00138       
00139       # Start controller
00140       if not self.__start_controller():
00141          casalog.post("The controller is not started","WARN","start_engine")
00142          return False
00143 
00144       # Start the engine 
00145       out=open('/dev/null', 'w')
00146       err=open('/dev/null', 'w')
00147       cmd = self._cp(self.__ipythondir+'/'+self.__cluster_rc_file,
00148                 node_name,
00149                 self.__prefix+self.__cluster_rc_file)
00150       p=Popen(cmd, stdout=out, stderr=err)
00151       sts = os.waitpid(p.pid, 0)
00152       if sts[1] != 0:
00153           casalog.post("Command failed: %s" % " ".join(cmd),"WARN","start_engine")
00154       
00155       cmd = self._cp(self.__ipythondir+'/'+self.__start_engine_file,
00156                 node_name,
00157                 self.__prefix+self.__start_engine_file)
00158 
00159       p=Popen(cmd, stdout=out, stderr=err)
00160       sts = os.waitpid(p.pid, 0)
00161       if sts[1] != 0:
00162          casalog.post("Command failed: %s" % " ".join(cmd),"WARN","start_engine")
00163       for i in range(1, num_engine+1):
00164          args='bash '+self.__prefix+self.__start_engine_file
00165          cmd = self._do(node_name, args)
00166          q=Popen(cmd)
00167          sts = os.waitpid(q.pid, 0)
00168          if sts[1] != 0:
00169             casalog.post("Command failed: %s" % " ".join(cmd),"WARN","start_engine")
00170          casalog.post("start engine %s on %s" % (i, node_name),"INFO","start_engine")
00171       self.__engines=self.__update_cluster_info(num_engine, work_dir,omp_num_nthreads)
00172       
00173       out.close()
00174       err.close()
00175 
00176    # jagonzal (CAS-4292): This method crashes when initializing the nodes via __init_nodes,
00177    # so it is deprecated. Instead it is necessary to use the start_engine method directly,
00178    # which not only starts the engine, but also initializes it using scripts
00179    def start_cluster(self, cl_file):
00180       """Start engines that are listed in a file
00181 
00182       @param cl_file The name of the file that defines the engines.
00183       The cl_file is a text file. Each line contains 3 columns: node name, number of engines, and work directory, separated by spaces. A line starting with # will be ignored. Example:
00184       #-----------------------------------------
00185       #node_name num_of_engines work_dir
00186       casa-dev-01 4 /home/casa-dev-01/hye/cluster
00187       #casa-dev-02 3 /home/casa-dev-02/hye/cluster
00188       subzero 1 /home/subzero/hye/test
00189       #olddog  2 /home/olddog/hye
00190       #-----------------------------------------
00191 
00192       start_cluster and start_engine can be used multiple times. 
00193 
00194       """
00195       
00196       casalog.origin("parallel_go")
00197 
00198       # Start controller
00199       if not self.__start_controller():
00200          casalog.post("The controller is not started","WARN","start_cluster")
00201          return False
00202  
00203       # Process the file
00204       try:
00205          clf=open(cl_file, 'r')
00206          lines = clf.readlines()
00207          for line in lines:
00208             if line.startswith('#'):
00209                continue
00210             words = string.split(line)
00211             if len(words) < 3:
00212                 casalog.post("The node definition is invalid: %s" % line,"WARN","start_cluster")
00213                 continue
00214             try:
00215                int(words[1])
00216             except:
00217                # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00218                # traceback.print_tb(sys.exc_info()[2])
00219                continue
00220                
00221             # Start all nodes
00222             self.__init_now=False
00223             casalog.post("start_engine(%s,%s,%s)" % (str(words[0]),str(words[1]),str(words[2])),"INFO","start_cluster")
00224             self.start_engine(words[0], int(words[1]), words[2])
00225 
00226          clf.close()
00227       except IOError:
00228          casalog.post("Cluster file '%s' doesn't exist" % cl_file,"SEVERE","start_cluster")
00229 
00230       if len(self.__new_engs)>0:
00231          self.__init_nodes(self.__new_engs)
00232          self.__engines=self.__client.pull(['id', 'host', 'pid', 'inited'])
00233       self.__new_engs=[]
00234       self.__init_now=True
00235   
00236    def __start_controller(self):
00237       """(Internal) Start the controller.
00238       
00239       A user does not need to call this function directly. When a user runs either start_cluster or start_engine, the existence of a valid controller is checked. If the controller does not exist, this function will be called automatically. All engines will connect to the valid controller. 
00240 
00241       """
00242       
00243       casalog.origin("parallel_go")
00244        
00245       # If there is already a controller, use it
00246       if (self.__controller!=None):
00247          return True
00248      
00249       # First of all write bashrc file which is needed by other cluster files
00250       self.__write_bashrc()     
00251 
00252       # Generate time stamp and write start controller file
00253       from time import strftime
00254       timestamp=strftime("%Y%m%d%H%M%S")
00255       self.__write_start_controller(timestamp) 
00256       
00257       # Start controller in a detached terminal
00258       cmd = 'bash ' + self.__ipythondir + '/' + self.__start_controller_file
00259       self.__controller=Popen(cmd,shell=True).pid
00260       if (self.__controller==None):
00261          return False
00262       self.__timestamp=timestamp
00263       casalog.post("Controller %s started" % self.__controller  ,"INFO","start_controller")
00264       
00265       # Now write the rest of the cluster files
00266       self.__write_start_engine()
00267       self.__write_stop_controller()
00268       self.__write_stop_node()          
00269       
00270       # Wait for controller files to exist
00271       info=self.__ipythondir+'/log/casacontroller-'+str(self.__timestamp)+'-'+str(self.__controller)+'.log'
00272       meng=self.__ipythondir+'/security/casacontroller-mec-'+self.__timestamp+'.furl'
00273 
00274       for i in range(1, 15):
00275          if os.path.exists(info):
00276             break
00277          time.sleep(1)
00278 
00279       for i in range(1, 15):
00280          if os.path.exists(meng):
00281             break
00282          time.sleep(1)
00283 
00284       # Start-up client
00285       self.__client=client.MultiEngineClient(meng)
00286 
00287       return True
00288 
00289    def __write_start_engine(self):
00290       """(Internal) Create script for starting engines.
00291 
00292       The created script will be stored in the user's $IPYTHONDIR. The start_cluster and start_engine will upload the script to the node and execute it in the proper shell.
00293 
00294       """
00295 
00296       ef=open(self.__ipythondir+'/'+self.__start_engine_file, 'w')
00297       bash=commands.getoutput("which bash")
00298       ef.write('#!%s\n' % bash)
00299       ef.write('. %s%s\n' % (self.__prefix, self.__cluster_rc_file))
00300       cmd=commands.getoutput("which ipengine")
00301       ef.write('export contrid=%s\n' % self.__controller)
00302       ef.write('export stamp=%s\n' % self.__timestamp)
00303       ef.write(cmd+' --furl-file='+self.__ipythondir+'/security/casacontroller-engine-'+self.__timestamp+'.furl --logfile='+self.__ipythondir+'/log/casaengine-'+self.__timestamp+'-'+str(self.__controller)+'- 2>&1 | grep -v NullSelection &\n')
00304       ef.close()
00305       
00306    def __write_start_controller(self,timestamp):
00307       """
00308 
00309       """
00310 
00311       ef=open(self.__ipythondir+'/'+self.__start_controller_file, 'w')
00312       bash=commands.getoutput("which bash")
00313       ef.write('#!%s\n' % bash)
00314       ef.write('. %s/%s\n' % (self.__ipythondir, self.__cluster_rc_file))
00315       lfile=self.__ipythondir+'/log/casacontroller-'+timestamp+'-'
00316       ffile=self.__ipythondir+'/security/casacontroller-engine-'+timestamp+'.furl'
00317       efile=self.__ipythondir+'/security/casacontroller-mec-'+timestamp+'.furl'
00318       tfile=self.__ipythondir+'/security/casacontroller-tc-'+timestamp+'.furl'
00319       cmd = commands.getoutput("which ipcontroller")
00320       cmd += ' --engine-furl-file=' + ffile
00321       cmd += ' --multiengine-furl-file=' + efile
00322       cmd += ' --task-furl-file=' + tfile
00323       cmd += ' --logfile=' + lfile
00324       cmd += ' &\n'
00325       ef.write(cmd)
00326       ef.close()
00327 
00328    def __write_stop_node(self):
00329       """(Internal) Create script for stopping a node.
00330 
00331       The created script will be stored in the user's $IPYTHONDIR. The stop_cluster and stop_engine will upload the script to the node and execute it in the proper shell.
00332 
00333       """
00334       ef=open(self.__ipythondir+'/'+self.__stop_node_file, 'w')
00335       bash=commands.getoutput("which bash")
00336       ef.write('#!%s\n' % bash)
00337       # Stop all engines started by the current controller
00338       ef.write("ps -fu `whoami` | grep ipengine | grep -v grep | grep "+self.__timestamp+" | awk '{print $2}' | xargs kill -TERM>/dev/null")
00339       ef.close()
00340 
00341    def __write_stop_controller(self):
00342       """(Internal) Create script for stopping the controller.
00343 
00344       The created script will be stored in the user's $IPYTHONDIR. The stop_cluster will execute it in the proper shell.
00345 
00346       """
00347       ef=open(self.__ipythondir+'/'+self.__stop_controller_file, 'w')
00348       bash=commands.getoutput("which bash")
00349       ef.write('#!%s\n' % bash)
00350       ef.write("ps -ef | grep `whoami` | grep ipcontroller | grep -v grep | awk '{print $2}' | xargs kill -TERM >/dev/null")
00351       ef.close()
00352 
00353    def __write_bashrc(self):
00354       """(Internal) Create a file containing bash startup instructions for the engine host.
00355 
00356       When the controller starts up, the environment information needed to run the cluster is extracted from the user's current shell (the one running this casapy session) and written to an rc file. The created script will be stored in the user's $IPYTHONDIR. The start_cluster and start_engine will upload the rc file to the nodes and establish the engine environment.
00357 
00358       """
00359       bashrc=open(self.__ipythondir+'/'+self.__cluster_rc_file, 'w')
00360       bash=commands.getoutput("which bash")
00361       bashrc.write("#!%s\n" % bash)
00362       envList=['PATH', 'LD_LIBRARY_PATH', 'IPYTHONDIR', 
00363                'CASAPATH', 'CASAARCH',
00364                'PYTHONHOME', '__CASAPY_PYTHONDIR',
00365                'PGPLOT_DEV', 'PGPLOT_DIR', 'PGPLOT_FONT'] 
00366       for param in envList:
00367          try:
00368             bashrc.write('export %s="%s"\n' % (param,os.environ[param]))
00369          except:
00370             # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00371             # traceback.print_tb(sys.exc_info()[2])
00372             pass
00373 
00374       bashrc.write("export HOSTNAME=`uname -n`")
00375       bashrc.close()
00376 
00377    def stop_engine(self, engine_id):
00378       """Stop an engine.
00379 
00380       @param engine_id The id of the engine to be stopped.
00381       If an engine with the given id is in the current cluster, running this function will stop the engine and remove it from the engine list.
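      A minimal usage sketch (assuming an engine with id 0 exists):
      c.stop_engine(0)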
00382       
00383       """
00384       
00385       casalog.origin("parallel_go")
00386        
00387       if type(engine_id).__name__ != 'int':
00388           casalog.post("engine id must be an integer","WARN","stop_engine")
00389           return None 
00390 
00391       node_name=''
00392       procid=None
00393       for i in self.__engines:
00394          if (i[0]==engine_id):
00395             node_name=i[1]
00396             procid=i[2]
00397 
00398       if (node_name=='' or procid is None):
00399           casalog.post("Could not find engine %d" % engine_id,"WARN","stop_engine")
00400           return
00401 
00402       ef=open(self.__ipythondir+'/'+self.__stop_engine_file, 'w')
00403       bash=commands.getoutput("which bash")
00404       ef.write('#!%s\n' % bash)
00405       ef.write("kill -9 %d" % procid)
00406       ef.close()
00407 
00408       out=open('/dev/null', 'w')
00409       err=open('/dev/null', 'w')
00410       cmd = self._cp(self.__ipythondir+'/'+self.__stop_engine_file,
00411                 node_name,
00412                 self.__prefix+self.__stop_engine_file)
00413 
00414       p=Popen(cmd, stdout=out, stderr=err)
00415       out.close()
00416       err.close()
00417       sts = os.waitpid(p.pid, 0)
00418       args='bash '+self.__prefix+self.__stop_engine_file
00419       Popen(self._do(node_name, args))
00420       casalog.post("stop engine %d on %s" % (engine_id, node_name),"INFO","stop_engine")
00421       self.__engines=self.__update_cluster_info(-1)
00422 
00423 
00424    def stop_node(self, node_name):
00425       """Stop a node (an engine-host computer)
00426 
00427       @param node_name The node to be stopped.
00428       If a computer with the given name is in the current cluster, running this function will stop all the engines currently running on that node and remove the node and its engines from the engine list. This function will not shut down the computer.
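      A minimal usage sketch (the host name below is hypothetical):
      c.stop_node('casa-dev-01')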
00429      
00430       """
00431       
00432       casalog.origin("parallel_go")
00433       
00434       if type(node_name).__name__ != 'str':
00435          casalog.post("node_name must be a string","WARN","stop_node")
00436          return None
00437 
00438       if self.get_nodes().count(node_name) == 0:
00439           casalog.post("There is no host with name %s" % node_name,"WARN","stop_node")
00440           return None
00441 
00442       out=open('/dev/null', 'w')
00443       err=open('/dev/null', 'w')
00444       cmd = self._cp(self.__ipythondir+'/'+self.__stop_node_file,
00445                 node_name,
00446                 self.__prefix+self.__stop_node_file)
00447       p=Popen(cmd, stdout=out, stderr=err)
00448       out.close()
00449       err.close()
00450       sts = os.waitpid(p.pid, 0)
00451       args='bash '+self.__prefix+self.__stop_node_file
00452       Popen(self._do(node_name, args))
00453       casalog.post("stop engines on %s" % node_name,"INFO","stop_node")
00454 
00455       num_engine=0
00456       for i in self.__engines:
00457          if i[1]==node_name:
00458             num_engine=num_engine-1
00459 
00460       self.__engines=self.__update_cluster_info(num_engine)
00461 
00462    def __stop_controller(self):
00463       """(Internal) Stop the controller.
00464 
00465       This is the last step in shutting down the cluster gracefully. 
00466 
00467       """
00468       
00469       casalog.origin("parallel_go")
00470       
00471       # If it is already down
00472       if (self.__controller==None):
00473          return True
00474 
00475       import commands
00476       node_name=commands.getoutput("uname -n")
00477       out=open('/dev/null', 'w')
00478       err=open('/dev/null', 'w')
00479       cmd = self._cp(self.__ipythondir+'/'+self.__stop_controller_file,
00480                 node_name,
00481                 self.__prefix+self.__stop_controller_file)
00482       p=Popen(cmd, stdout=out, stderr=err)
00483       out.close()
00484       err.close()
00485       sts = os.waitpid(p.pid, 0)
00486       args='bash '+self.__prefix+self.__stop_controller_file
00487       Popen(self._do(node_name, args))
00488 
00489       try:
00490          os.remove(self.__ipythondir+'/'+self.__cluster_rc_file)
00491          os.remove(self.__ipythondir+'/'+self.__start_engine_file)
00492          os.remove(self.__ipythondir+'/'+self.__stop_node_file)
00493          os.remove(self.__ipythondir+'/'+self.__stop_controller_file)
00494          os.remove(self.__ipythondir+'/'+self.__stop_engine_file)
00495       except:
00496          # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00497          # traceback.print_tb(sys.exc_info()[2])
00498          pass
00499 
00500       try:
00501          self.__controller=None
00502       except:
00503          # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00504          # traceback.print_tb(sys.exc_info()[2])
00505          pass
00506 
00507       casalog.post("Controller stopped","INFO","stop_controller")
00508       return True
00509 
00510    def stop_cluster(self):
00511       """Stop the cluster
00512 
00513       This function stops all the running engines and the controller.
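      A minimal usage sketch:
      c.stop_cluster()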
00514 
00515       """
00516 
00517       # jagonzal (CAS-4292): We have to check the controller instance directly because the method
00518       # start_cluster does not work properly (crashes when initializing the nodes via __init_nodes).
00519       # Actually start_cluster is deprecated, and it is necessary to use the start_engine 
00520       # method directly, which not only starts the engine, but also initializes it using scripts
00521       if ((self.__controller==None) or (self.__client==None)):
00522          return
00523       # jagonzal (CAS-CHANGE): Do not use brute-force kill to shut down the cluster
00524       else:
00525          # Kill the engines and controller using kernel.multiengineclient interface
00526          try:
00527              self.__client.kill(True,self.__engines,False)
00528              del self.__client
00529          except:
00530              traceback.print_exception((sys.exc_info()[0]), (sys.exc_info()[1]), (sys.exc_info()[2]))
00531          # Reset state before doing anything else, otherwise we may try to use one method from the client object
00532          self.__client=None
00533          self.__controller=None             
00534          # Update cluster info
00535          self.__engines=[]
00536          # Remove initialization/shut-down scripts
00537          try:
00538              os.remove(self.__ipythondir+'/'+self.__start_controller_file)             
00539              os.remove(self.__ipythondir+'/'+self.__cluster_rc_file)
00540              os.remove(self.__ipythondir+'/'+self.__start_engine_file)
00541              os.remove(self.__ipythondir+'/'+self.__stop_node_file)
00542              os.remove(self.__ipythondir+'/'+self.__stop_controller_file)
00543              
00544              os.remove(self.__prefix+self.__cluster_rc_file)
00545              os.remove(self.__prefix+self.__start_engine_file)            
00546          except:
00547              traceback.print_exception((sys.exc_info()[0]), (sys.exc_info()[1]), (sys.exc_info()[2]))             
00548          # jagonzal (CAS-4370): Remove all the ipcontroller/ipengine files because
00549          # otherwise it might confuse future cluster/MultiEngineClient instances
00550          self.wash_logs()
00551          return
00552 
00553       ### jagonzal (CAS-4292): Code below is deprecated ###
00554       
00555       # shutdown all engines
00556       elist=[]
00557       for i in self.__engines:
00558          elist.append(i[1])
00559       fruit=set(elist)
00560 
00561       for i in fruit:
00562          try:
00563             self.stop_node(i)
00564          except:
00565             # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00566             traceback.print_exception((sys.exc_info()[0]), (sys.exc_info()[1]), (sys.exc_info()[2]))
00567             continue
00568 
00569       # shutdown controller
00570       try:
00571          self.__stop_controller()
00572       except:
00573          # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00574          traceback.print_exception((sys.exc_info()[0]), (sys.exc_info()[1]), (sys.exc_info()[2]))
00575          pass
00576 
00577       try:
00578          # jagonzal (CAS-4106): We have to shut down the client, not activate it
00579          # besides, the activate method only enables parallel magic commands
00580          self.__client=None
00581       except:
00582          # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00583          traceback.print_exception((sys.exc_info()[0]), (sys.exc_info()[1]), (sys.exc_info()[2]))
00584          pass
00585 
00586    def wash_logs(self):
00587       """Clean up the cluster log files.
00588 
00589       A set of logs containing controller-engine information is created every time a cluster is created. This function deletes all cluster log files that have accumulated in the user's $IPYTHONDIR, provided there is no active cluster running. (The files will be removed only before starting any engine or after stopping the whole cluster.)
00590 
00591       """
00592       # do this only if no controller running
00593       if (self.__controller!=None):
00594          return True
00595 
00596       # jagonzal (CAS-4370): Remove all the ipcontroller/ipengine files because
00597       # otherwise it might confuse future cluster/MultiEngineClient instances
00598       os.system("rm -rf %s/log/*" % self.__ipythondir)
00599       os.system("rm -rf %s/security/*" % self.__ipythondir)
00600 
00601    def __init_nodes(self, i):
00602      """(Internal) Initialize engines
00603 
00604      @param i The list of the engine ids
00605      An engine is a Python interpreter. To make an engine capable of running CASA tasks and tools, we must set up the environment and import the necessary modules. This function effectively makes every engine a running CASA instance (except that it is a non-interactive CASA running in Python, in contrast to casapy, which is an interactive CASA running in IPython).
00606 
00607      """
00608      
00609      casalog.origin("parallel_go")
00610      casalog.post("Initialize engines %s" %str(i),"INFO","init_nodes")
00611      
00612      self.__client.push({'casa': casa })
00613      self.__client.execute('import os', i)
00614      self.__client.execute('if os.path.isdir(work_dir):os.chdir(work_dir)\nelse:work_dir=os.environ["HOME"]', i)
00615      phome=''
00616      try:
00617        phome=os.environ["PYTHONHOME"]
00618      except:
00619        # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00620        # traceback.print_tb(sys.exc_info()[2])
00621        pass
00622      
00623      if phome=='':
00624         try:
00625            v=str.split(os.environ["CASAPATH"], ' ')
00626            phome=v[0]+'/'+v[1]
00627         except:
00628            # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00629            # traceback.print_tb(sys.exc_info()[2])
00630            pass
00631 
00632      dhome=''
00633      try:
00634        dhome=os.environ["CASAARCH"]
00635      except:
00636        # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00637        # traceback.print_tb(sys.exc_info()[2])
00638        pass
00639 
00640      if phome=='':
00641          casalog.post("could not locate casa_in_py.py","SEVERE","init_nodes")
00642          return None
00643 
00644      if (dhome!=phome):
00645         phome=dhome
00646 
00647      sdir = casadef.python_library_directory + '/'
00648      self.__client.push(dict(phome=phome), i)
00649      self.__client.execute('import sys', i)
00650      self.__client.push(dict(sdir=sdir), i)
00651      self.__client.execute('scriptdir=sdir', i)
00652 
00653      self.__client.execute('sys.path.insert(2, scriptdir)', i)
00654      try:
00655         self.__client.execute("execfile(scriptdir+'casa_in_py.py')", i)
00656         self.__client.execute('inited=True', i)
00657      except client.CompositeError, exception:
00658          casalog.post("Error initializing engine %s: %s" % (str(i), str(exception)),"SEVERE","init_nodes")
00659          exception.print_tracebacks()
00660      except:
00661          casalog.post("Error initializing engine %s" % str(i),"SEVERE","init_nodes")
00662          traceback.print_tb(sys.exc_info()[2])     
00663     
00664 
00665    def reset_cluster(self):
00666       """Re-initialize the engines.
00667 
00668       This function resets the running environment for all the available engines.
00669       
00670       """
00671       
00672       casalog.origin("parallel_go")
00673       
00674       if self.__client is None:
00675          casalog.post("Multiengineclient is not initialized","WARN","reset_cluster")
00676          return None
00677 
00678       try:
00679          tobeinit=self.__client.pull('id')
00680       except:
00681          # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00682          # traceback.print_tb(sys.exc_info()[2])
00683          return None
00684          
00685 
00686       if len(tobeinit)>0:
00687          self.__init_nodes(tobeinit)
00688          self.__engines=self.__client.pull(['id', 'host', 'pid', 'inited'])
00689 
00690    def __update_cluster_info(self, num_engine, work_dir=None,omp_num_nthreads=1):
00691       """(Internal) Construct the list of engines.
00692 
00693       @param num_engine The number of new engines 
00694       @param work_dir The initial working directory 
00695       This function appends num_engine engines to the engine list and sets up the initial Python environment on them. Before further initialization, an engine can only run Python programs (it cannot run CASA tasks or tools).
00696 
00697       """
00698       
00699       casalog.origin("parallel_go")
00700 
00701       if self.__client is None :
00702          casalog.post("Controller is not initialized","WARN","update_cluster_info")
00703          return [] 
00704 
00705       engs=len(self.__engines)+num_engine
00706       if engs<0:
00707          engs=0
00708       i=0
00709       idlist=self.__client.get_ids()
00710       while (len(idlist)!=engs and i<10):
00711          idlist=self.__client.get_ids()
00712          time.sleep(1)
00713          i=i+1
00714          
00715       # Here we only take care of the items that can be initialized quickly.
00716       # The real casa_in_py initialization will be done in parallel 
00717       tobeinit=[]
00718       for i in idlist:
00719          inited=False
00720          try:
00721             inited=self.__client.pull('inited', i)
00722          except:
00723 
00724             # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00725             # traceback.print_tb(sys.exc_info()[2])
00726 
00727             tobeinit.append(i)
00728             self.__client.execute('id=%d'%i, i)
00729             self.__client.execute('import os', i)
00730             self.__client.execute('import socket', i)
00731             self.__client.execute('host=socket.gethostname()', i)
00732             self.__client.execute('pid=os.getpid()', i)
00733             self.__client.execute('job=None', i)
00734             self.__client.execute('import signal', i)
00735             self.__client.execute('original_sigint_handler = signal.signal(signal.SIGINT,signal.SIG_IGN)', i)
00736             # jagonzal (CAS-4276): New cluster specification file allows to automatically set the number of open MP threads
00737             self.__client.execute("os.environ['OMP_NUM_THREADS']='"+str(omp_num_nthreads)+"'", i)
00738 
00739             if work_dir!=None and os.path.isdir(work_dir):
00740                self.__client.push(dict(work_dir=work_dir), i)
00741             else:
00742                self.__client.execute('work_dir=os.environ["HOME"]', i)
00743 
00744             # These are environment variables set for each node at startup. 
00745             # It may be better to set them as globals in this module and then pass them to each engine in update_cluster_info
00746 
00747             self.__client.execute('contrid=os.environ["contrid"]', i)
00748             self.__client.execute('stamp=os.environ["stamp"]', i)
00749             self.__client.execute('inited=False', i)
00750 
00751       self.__new_engs.extend(tobeinit)
00752 
00753       if self.__init_now:
00754          if len(self.__new_engs)>0:
00755             self.__init_nodes(self.__new_engs)
00756 
00757          self.__init_now=True
00758          self.__new_engs=[]
00759 
00760       if len(idlist)>0:
00761          return self.__client.pull(['id', 'host', 'pid', 'inited'])
00762       else:
00763          return []
00764 
00765    def get_casalogs(self):
00766       """Get a list of the casa logs for all the current cluster engines.
00767 
00768       Each working engine is a CASA instance and saves its own log. This function returns the list of logs with their full paths. One can view the log contents with casalogviewer.
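      A minimal usage sketch (the paths shown are hypothetical):
      c.get_casalogs()
      ['/home/user/cluster_work/engine-0.log', '/home/user/cluster_work/engine-1.log']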
00769 
00770       """
00771       try:
00772          self.__client.execute('tmp=work_dir+"/"+thelogfile')
00773          return self.__client.pull('tmp')
00774       except:
00775          # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00776          # traceback.print_tb(sys.exc_info()[2])
00777          return None
00778 
00779    def read_casalogs(self):
00780       """Read the casa log files.
00781 
00782       The current implementation of this function is only a prototype. A multi-log viewer needs to be developed.
00783 
00784       """
00785       
00786       casalog.origin("parallel_go")
00787       
00788       import os
00789       import string
00790       _logs = self.get_casalogs()
00791       if _logs != None:
00792          files = string.join(_logs, ' ')
00793          os.system("emacs "+files+ "&")
00794       else:
00795          casalog.post("Cannot read casalogs","WARN","read_casalogs")
00796 
00797    def pad_task_id(self, b='', task_id=[]):
00798       """Generate a dictionary of id-padded variables 
00799 
00800       @param b The base name to be padded
00801       @param task_id A list of integers to pad the base name 
00802       One way of distributing variables to a set of engines is through a Python dictionary. This is a convenience function for quickly generating a dictionary of padded names. Example:
00803       x=c.pad_task_id('basename', [3, 5, 8])
00804       x
00805       {3: 'basename-3', 5: 'basename-5', 8: 'basename-8'}
00806       x=c.pad_task_id([1,3],[0,1,2,3])
00807       x
00808       {0: '1-0', 1: '3-1', 2: '3-2', 3: '3-3'}
00809       x=c.pad_task_id(['a', 'b','c','d','e'],[0,1,2,3])
00810       x
00811       {0: 'a-0', 1: 'b-1', 2: 'c-2', 3: 'd-3'}
00812       y=c.pad_task_id(x)
00813       y
00814       {0: 'a-0-0', 1: 'b-1-1', 2: 'c-2-2', 3: 'd-3-3'}
00815      
00816       """
00817       
00818       casalog.origin("parallel_go")
00819 
00820       base={} 
00821 
00822       int_id=True
00823       for j in task_id:
00824          if type(j)!=types.IntType or j<0:
00825             casalog.post("Task id %s must be a non-negative integer" % str(j),"WARN","pad_task_id")
00826             int_id=False
00827             break
00828       if not int_id:
00829          return base
00830 
00831       if type(b)==list:
00832          for j in range(len(b)):
00833             if type(b[j])!=str:
00834                b[j]=str(b[j])
00835 
00836       if len(task_id)==0:
00837          task_id=list(xrange(0, len(self.__engines))) 
00838       if type(b)==str:
00839          for j in task_id:
00840             base[j]=b+'-'+str(j)
00841       if type(b)==list:
00842          k=len(b)
00843          m=len(task_id)
00844          if m<=k:
00845             for j in range(m):
00846                base[task_id[j]]=b[j]+'-'+str(task_id[j])
00847          else:
00848             for j in range(k):
00849                base[task_id[j]]=b[j]+'-'+str(task_id[j])
00850             for j in range(k,m):
00851                base[task_id[j]]=b[k-1]+'-'+str(task_id[j])
00852 
00853       if type(b)==dict:
00854           for i in b.keys():
00855              base[i]=b[i]+'-'+str(i)
00856 
00857       return base
00858        
00859    def one_to_n(self, arg, task_id=[]):
00860       """Generate a dictionary of one variable for n keys
00861 
00862       @param arg The variable to be distributed
00863       @param task_id The list of integer ids
00864       One way of distributing variables to a set of engines is through a Python dictionary. This is a convenience function for quickly generating a dictionary that maps the same variable to n keys. Example:
00865       x=c.one_to_n('basename', [1, 2, 7])
00866       x
00867       {1: 'basename', 2: 'basename', 7: 'basename'}
00868 
00869       """
00870       
00871       casalog.origin("parallel_go")
00872       
00873       # assign 1 value to n targets
00874       base={} 
00875       int_id=True
00876       for j in task_id:
00877          if type(j)!=types.IntType or j<0:
00878             casalog.post("Task id %s must be a non-negative integer" % str(j),"WARN","one_to_n")
00879             int_id=False
00880             break
00881       if not int_id:
00882          return base
00883 
00884       if len(task_id)==0:
00885          task_id=list(xrange(0, len(self.__engines))) 
00886       for j in task_id:
00887          base[j]=arg
00888       return base
00889        
00890    def n_to_n(self, args=[], task_id=[]):
00891       """Generate a dictionary of n variables
00892 
00893       @param args A list of n variables
00894       @param task_id A list of n integer ids
00895       One way of distributing variables to a set of engines is through a Python dictionary. This is a convenience function for quickly generating a dictionary of n variables for n keys. Example:
00896       x=c.n_to_n(['a', 'b', 'c'], [3, 6, 7])
00897       x
00898       {3: 'a', 6: 'b', 7: 'c'}
00899 
00900       """
00901       
00902       casalog.origin("parallel_go")
00903       
00904       # Assign n value to n targets
00905       base={} 
00906 
00907       if len(args)==0:
00908          return base
00909 
00910       int_id=True
00911       for j in task_id:
00912          if type(j)!=types.IntType or j<0:
00913             casalog.post("Task id %s must be a non-negative integer" % str(j),"WARN","n_to_n")
00914             int_id=False
00915             break
00916       if not int_id:
00917          return base
00918 
00919       if len(task_id)==0:
00920          task_id=list(xrange(0, len(self.__engines))) 
00921 
00922       i=-1
00923       for j in task_id:
00924          i=i+1
00925          if i==len(args):
00926             break
00927          base[j]=args[i]
00928       return base
00929 
00930    def split_int(self, start, end, task_id=[]):
00931       """Generate a dictionary of integer start points to distribute an integer range
00932 
00933       @param start The start integer value
00934       @param end The end integer value
00935       @param task_id The list of integer ids
00936       This is a convenience function for quickly generating a dictionary of integer start points. Example:
00937       x=c.split_int(9, 127, [2,3,4])
00938       x
00939       {2: 9, 3: 49, 4: 89 }
00940 
00941       """
00942       
00943       casalog.origin("parallel_go")
00944       
00945       base={}
00946       if len(task_id)==0:
00947          task_id=list(xrange(0, len(self.__engines))) 
00948 
00949       if len(task_id)==0:
00950          casalog.post("There are no engines available","WARN","split_int")
00951          return base
00952 
00953       if type(start)!=int or type(end)!=int:
00954          casalog.post("start and end points must be integers","WARN","split_int")
00955          return base
00956 
00957       if start<0:
00958          casalog.post("start point must not be negative","WARN","split_int")
00959          return base
00960 
00961       if start>=end:
00962          casalog.post("end point must be greater than start point","WARN","split_int")
00963          return base
00964 
00965       nx=1
00966       try:
00967          nx=int(ceil(abs(float(end - start))/len(task_id)))
00968       except:
00969          # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
00970          # traceback.print_tb(sys.exc_info()[2])
00971          pass
00972 
00973       i=-1
00974       for j in task_id:
00975          i=i+1
00976          if i>=len(task_id):
00977             break
00978          st=i*nx
00979          base[j]=st+start
00980       return base
00981        
00982    def split_channel(self, spw, nchan, task_id=[]):
00983       """Generate a dictionary to distribute the spectral windows
00984 
00985       @param spw The spectral window
00986       @param nchan The number of channels to split
00987       @param task_id The list of integer ids
00988       One way of distributing a spectral window to a set of engines is through a Python dictionary. This is a convenience function for quickly generating a dictionary of spw selection expressions. Example:
00989       x=c.split_channel(1, 127, [2,3,4])
00990       x
00991       {2: '1:0~42', 3: '1:43~85', 4: '1:86~128'}
00992 
00993       """
00994       
00995       casalog.origin("parallel_go")
00996       
00997       base={}
00998       if len(task_id)==0:
00999          task_id=list(xrange(0, len(self.__engines))) 
01000 
01001       if len(task_id)==0:
01002          casalog.post("There are no engines available","WARN","split_channel")
01003          return base
01004 
01005       if nchan<len(task_id):
01006          casalog.post("There are not enough channels to split","WARN","split_channel")
01007          return base
01008 
01009       nx=1
01010       try:
01011          nx=int(ceil(abs(float(nchan))/len(task_id)))
01012       except:
01013          # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
01014          # traceback.print_tb(sys.exc_info()[2])
01015          pass
01016 
01017       i=-1
01018       for j in task_id:
01019          i=i+1
01020          if i==len(task_id):
01021             break
01022          st=i*nx
01023          se=st+nx-1
01024          base[j]=str(spw)+":"+str(st)+"~"+str(se)
01025       return base
01026        
01027    def pgc(self,*args,**kwargs):
01028       """Parallel execution of commands and/or dictionary 
01029          of commands
01030 
01031       @param *args any number of commands or dictionary of
01032              commands (where the key of the dictionary is the
01033              engine id)
01034       @param **kwargs available options are
01035              job=<str> or jobname=<str>
01036              block=<True/False>
01037 
01038       Example:
01039       c.pgc({0:'ya=3',1:'ya="b"'})
01040       c.pull('ya')
01041       {0: 3, 1: 'b'}
01042 
01043       c.pgc('xa=-1')
01044       c.pull('xa')
01045       {0: -1, 1: -1, 2: -1, 3: -1}
01046       c.pull('job')
01047       Out[23]: {0:'xa=-1', 1:'xa=-1', 2:'xa=-1', 3:'xa=-1'}
01048 
01049       """
01050       
01051       casalog.origin("parallel_go")
01052 
01053       tasks={}
01054       for j in self.__client.get_ids():
01055          tasks[j]=[]
01056       
01057       for i in args:
01058          if type(i)==types.DictType:
01059             for j in i.keys():
01060                if type(j)!=types.IntType or j<0:
01061                   casalog.post("task id %s must be a non-negative integer" % str(j),"WARN","pgc")
01062                   pass
01063                else:
01064                   st=''
01065                   if type(i[j])==types.StringType:
01066                      st=i[j]
01067                   else:
01068                      pass
01069                   if st!='':
01070                      tasks[j].append(st)
01071          elif type(i)==types.StringType:
01072             # For all tasks
01073             for j in xrange(0, len(self.__engines)): 
01074                tasks[j].append(i)
01075          else:
01076             casalog.post("command %s must be a string or a dictionary" % str(i),"WARN","pgc")
01077 
01078       # May be better to use non-block mode and catch the result
01079 
01080       # How to give a set of commands a name, say 'cmd_name', such that
01081       # cluster.pull('cmd_name') returns the script that was executed?
01082      
01083       keys = kwargs.keys()
01084       job='NoName'
01085       block=True
01086       for kw in keys:
01087          if kw.lower()=='job' or kw.lower()=='jobname':
01088             job=kwargs[kw]
01089          if kw.lower()=='block':
01090             block=kwargs[kw]
01091  
01092       for i in tasks.keys():      
01093          cmd=string.join(tasks[i], '\n')
01094          self.__client.push(dict(job=cmd), i)
01095 
01096       return self.__client.execute('exec(job)',
01097               block=block,targets=tasks.keys())
01098 
01099    def parallel_go_commands(self,*args,**kwargs):
01100       """Parallel execution of commands and/or dictionary 
01101          of commands
01102 
01103 
01104       """
01105       self.pgc(*args,**kwargs)
01106  
01107    def pgk(self, **kwargs):
01108       """Parallel execution to set keywords
01109 
01110       @param **kwargs keyword args 
01111 
01112       Example:
01113       x=np.zeros((3,3))
01114       c.pgk(c={1:x},d=6,t='b',s={0:'y'})
01115       c.pull('c')
01116       {1: array([[ 0.,  0.,  0.],
01117                  [ 0.,  0.,  0.],
01118                  [ 0.,  0.,  0.]])}
01119       c.pull('d')
01120       {0: 6, 1: 6}
01121       c.pull('s')
01122       {0: 'y'}
01123       c.pull('t')
01124       {0: 'b', 1: 'b'}
01125       """
01126       
01127       casalog.origin("parallel_go")
01128 
01129       tasks={}
01130       for j in self.__client.get_ids():
01131          tasks[j]=dict()
01132       
01133       keys = kwargs.keys()
01134 
01135       for kw in keys: 
01136          vals=kwargs[kw]
01137          if type(vals)==types.DictType:
01138             for j in vals.keys():
01139                if type(j)!=types.IntType or j<0:
01140                   casalog.post("task id %s must be a non-negative integer" % str(j),"WARN","pgk")
01141                   pass
01142                else:
01143                   tasks[j][kw]=vals[j]
01144          else:
01145             for j in tasks.keys():
01146                tasks[j][kw]=vals
01147 
01148       for i in tasks.keys():      
01149          self.__client.push(tasks[i], i, True)
01150          
01151       return tasks
01152 
01153    def make_command(self, func, **kwargs):
01154       """Make command strings to be distributed to engines
01155 
01156       @func function name 
01157       @kwargs **kwargs available 
01158 
01159       Example:
01160       x=np.ones((3,3))
01161       c.make_command(func=None,c={1:x},d=6,t='b',s={0:'y'})
01162       {0: 's="y"; t="b"; d=6',
01163        1: 'c=array([[ 1., 1., 1.],\n[ 1., 1., 1.],\n[ 1., 1., 1.]]); t="b"; d=6'}
01164       c.make_command(func='g',c={1:x},d=6,t='b',s={0:'y'})
01165       {0: 'g(s="y", t="b", d=6)',
01166        1: 'g(c=array([[1., 1., 1.],\n[1., 1., 1.],\n[1., 1., 1.]]), t="b", d=6)'}
01167       """
01168       
01169       casalog.origin("parallel_go")
01170 
01171       tasks=self.pgk(**kwargs)
01172 
01173       if func!=None and type(func)!=str:
01174         casalog.post("func must be a str","WARN","make_command")
01175         return None
01176 
01177       if len(tasks)==0:
01178         casalog.post("Parameters not specified","WARN","make_command")
01179         return None
01180 
01181       if func==None or len(str.strip(func))==0:
01182          func=''
01183 
01184       func=str.strip(func)
01185 
01186       cmds=dict()
01187       for i in tasks.keys(): 
01188         cmd=''
01189         for (k, v) in tasks[i].iteritems():
01190           cmd+=k+'='
01191           if type(v)==str:
01192               cmd+='"'+v+'"'
01193           elif type(v)==np.ndarray:
01194               cmd+=repr(v)
01195           else:
01196               cmd+=str(v)
01197           if func=='':
01198             cmd+='; '
01199           else:
01200             cmd+=', '
01201         cmd=cmd[0:-2]
01202         if func!='':
01203            cmd=func+'('+cmd+')'
01204         cmds[i]=cmd
01205 
01206       return cmds
01207   
01208                
01209    def parallel_go_keywords(self, **kwargs):
01210       """Parallel execution to set keywords
01211 
01212 
01213       """
01214       self.pgk(**kwargs)
01215 
01216    def hello(self):
01217       """Parallel execution to print 'hello' message from all engines
01218 
01219 
01220       """
01221       
01222       casalog.origin("parallel_go")
01223       
01224       casalog.post("Hello CASA Controller","INFO","hello")
01225       
01226       if self.get_engines() != []:
01227          return self.__client.execute('casalog.origin("parallel_go");casalog.post("Hello CASA Controller","INFO","hello")')
01228       else:
01229          return None
01230 
01231    def __set_cwds(self, clusterdir):
01232       """Set current working dir for all engines
01233 
01234 
01235       """
01236       # This is not very useful because dirs are generally different across nodes 
01237       self.__client.execute('import os')
01238       self.__client.push(dict(clusterdir=clusterdir))
01239       self.__client.execute('os.chdir(clusterdir)')
01240       self.__client.execute("user=self.__user")
01241       self.__client.execute('print user')
01242       self.__client.execute('import socket')
01243       self.__client.execute('host=socket.gethostname()')
01244       self.__client.execute('print host')
01245       self.__client.execute('print os.getcwd()')
01246 
01247    def get_ids(self):
01248       """get ids for all available engines
01249 
01250 
01251       """
01252       try:
01253          return self.__client.get_ids()
01254       except:
01255          # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
01256          # traceback.print_tb(sys.exc_info()[2])
01257          return []
01258 
01259    def get_nodes(self):
01260       """get hostnames for all available engines
01261 
01262 
01263       """
01264       from sets import Set 
01265       elist=[]
01266       for i in self.__engines:
01267          elist.append(i[1])
01268       return list(Set(elist))
01269 
01270    def get_engines(self):
01271       """get current status of the engines
01272 
01273 
01274       """
01275       return self.__engines
01276 
01277    def get_stdout(self,cmd):
01278       """get the status and standard output of executing a command
01279 
01280 
01281       """
01282       return commands.getstatusoutput(cmd)      
01283 
01284    def pdo(self,job):
01285       """parallel execution of a job
01286 
01287 
01288       """
01289       return self.__client.execute(job)
01290 
01291    def odo(self,job,nodes):
01292       """execute a job on a subset of engines
01293 
01294 
01295       """
01296       return self.__client.execute(job,block=False,targets=nodes)
01297   
01298    def execute(self,job,nodes):
01299       """execute a job on a subset of engines in blocking mode
01300 
01301 
01302       """
01303       return self.__client.execute(job,block=True,targets=nodes)  
01304   
01305    def queue_status(self):
01306       """query the queue status
01307 
01308 
01309       """
01310       return self.__client.queue_status()
01311 
01312    def clear_queue(self):
01313       """remove all jobs from the queue
01314 
01315 
01316       """
01317       return self.__client.clear_queue()
01318 
01319    def get_timer(self, timer=''):
01320       """get the elapsed time for a timer
01321 
01322       """
01323       
01324       casalog.origin("parallel_go")
01325 
01326       base={}
01327       prop=self.__client.get_properties() 
01328       for i in self.get_ids():
01329           try:
01330              ky=prop[i]['timertype']
01331              if ky=='proc':
01332                 end=time.clock()
01333              else:
01334                 end=time.time()
01335              base[i]='%.2f sec' % (end-prop[i][timer])
01336           except:
01337              pass
01338              # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
01339              # traceback.print_tb(sys.exc_info()[2])
01340          
01341       casalog.post("Timer: %s" % str(base),"INFO","get_timer")
01342       return
01343 
01344    def set_timer(self,timer='timer',type='proc',
01345                  targets=None,block=None):
01346       """set a timer 
01347 
01348 
01349       """
01350       
01351       if self.__client==None:
01352           return
01353 
01354       properties={}
01355       if type=='proc':
01356           properties[timer]=time.clock()
01357       else:
01358           properties[timer]=time.time()
01359 
01360       properties['timertype']=type
01361 
01362       self.__client.set_properties(properties,
01363                targets, block)
01364 
01365    def del_timer(self, timer=['']):
01366       """delete a timer
01367 
01368       """
01369       
01370       casalog.origin("parallel_go")
01371 
01372       for i in self.get_ids():
01373           self.__client.del_properties(timer, i)
01374           casalog.post("Delete timer %s %s" % (str(timer),str(i)),"INFO","del_timer")
01375 
01376       return
01377 
01378    def get_properties(self):
01379       """get the set properties from all engines
01380 
01381 
01382       """
01383       return self.__client.get_properties()
01384 
01385    def set_properties(self, properties, 
01386                       targets=None, block=None):
01387       """set properties for target engines
01388 
01389       @param properties a dictionary whose keys are the property names to set
01390 
01391 
01392       """
01393       self.__client.set_properties(
01394                 properties, targets, block)
01395 
01396 
01397    def keys(self):
01398       """get all keys from all engines
01399 
01400 
01401       """
01402       return self.__client.keys()
01403 
01404    def push(self, **kwargs):
01405       """set values to the engines
01406       @param kwargs keyword values to distribute
01407       @param targets the engines of interest
01408       By default, this function sets the keyword values on all engines.
01409       To set values on a subset of engines, use the keyword parameter targets,
01410       which takes an integer or a list of integers of engine ids.
01411       You can also use the function pgk to set values on the engines. 
01412       Example:
01413       c.push(a=[1,3,7.1])
01414       c.pull('a')
01415       {0: [1, 3, 7.0999999999999996], 1: [1, 3, 7.0999999999999996]}
01416       c.push(b=[1.2,3.7], targets=1)
01417       c.pull('b',[1])
01418       {1: [1.2, 3.7000000000000002]}
01419       c.pull('b')
01420       {1: [1.2, 3.7000000000000002]}
01421 
01422       """
01423       
01424       casalog.origin("parallel_go")
01425 
01426       keys = kwargs.keys()
01427       #keys.sort()
01428       if len(keys)==0:
01429           return False
01430 
01431       tgt=[]
01432       targets=None
01433       for kw in keys:
01434          if kw.lower()=='targets':
01435             targets=kwargs[kw]
01436             break
01437 
01438       if targets=='all' or targets==None or \
01439          type(targets)==list and len(targets)==0:
01440           tgt=list(xrange(0, len(self.__engines))) 
01441       elif type(targets)==list:
01442           for j in targets:
01443               if type(j)==types.IntType and j>=0:
01444                   tgt.append(j)
01445       elif type(targets)==int and targets>=0:
01446           tgt.append(targets)
01447            
01448       if len(tgt)==0:
01449            casalog.post("There are no target engines","WARN","push")
01450            return False
01451 
01452       ok=True
01453       for i in tgt: 
01454           try:
01455               self.__client.push(dict(kwargs),i)
01456           except:
01457               ok=False
01458               # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
01459               # traceback.print_tb(sys.exc_info()[2])
01460 
01461       return ok
01462 
01463    def pull(self, key, targets='all'):
01464       """get the value of a key
01465       @param key the var of interest
01466       @param targets the engines of interest
01467       Example:
01468       c.pgc({0:'ya=3',1:'ya="b"'})
01469       c.pull('ya')
01470       {0: 3, 1: 'b'}
01471       c.pull('ya',[1])
01472       {1: 'b'}
01473       c.pull('ya',1)
01474       {1: 'b'}
01475 
01476       """
01477       
01478       casalog.origin("parallel_go")
01479       
01480       base={} 
01481       tgt=[]
01482       if targets=='all' or \
01483          type(targets)==list and len(targets)==0:
01484           tgt=list(xrange(0, len(self.__engines))) 
01485       elif type(targets)==list:
01486           for j in targets:
01487               if type(j)==types.IntType and j>=0:
01488                   tgt.append(j)
01489       elif type(targets)==int and targets>=0:
01490           tgt.append(targets)
01491            
01492       if len(tgt)==0:
01493            casalog.post("There are no target engines","WARN","pull")
01494            return base
01495 
01496       for i in tgt: 
01497           rslt=None
01498           try:
01499               rslt=self.__client.pull(key,i)
01500           except:
01501               # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
01502               # traceback.print_tb(sys.exc_info()[2])
01503               pass 
01504           if rslt!=None:
01505               base[i]=rslt[0]
01506 
01507       return base
01508 
01509    def get_result(self, i):
01510       """get the result of previous execution
01511 
01512 
01513       """
01514       
01515       casalog.origin("parallel_go")
01516       
01517       res = None # ensure 'res' is defined even when retrieval fails; jagonzal (CAS-4375): We have to capture the engine's exceptions at this level
01518       try:
01519            res = self.__client.get_result()[i]
01520       except client.CompositeError, exception:
01521            casalog.post("Error retrieving result from engine %s: %s" % (str(i),str(exception)),"SEVERE","get_result")
01522            exception.print_tracebacks()
01523            res = None
01524       except:
01525            # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
01526            casalog.post("Error retrieving result from engine %s" % (str(i)),"SEVERE","get_result")
01527            traceback.print_tb(sys.exc_info()[2])
01528 
01529       return res
01530 
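   # Editorial sketch (not part of the original source): fetching a result after
   # running a command on a single engine; engine id 0 is an arbitrary choice and
   # the command may still be pending when odo() returns, since odo() is asynchronous.
   #
   #   c.odo('y = 3 * 7', 0)   # execute on engine 0
   #   c.get_result(0)         # fetch engine 0's latest result once it has finished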
01531    def activate(self):
01532       """set the cluster to parallel execution mode
01533 
01534   
01535       """
01536       return self.__client.activate()
01537 
01538 
01539    def parallel_go_task(self,taskname=None,outfile='',
01540                    target=[],ipython_globals=None):
01541        """ Make parallel tasks using current input values 
01542 
01543 
01544        """
01545        self.pgt(taskname,outfile,target,ipython_globals)
01546 
01547    def pgt(self, taskname=None,outfile='',
01548                     target=[],ipython_globals=None):
01549        """ Make parallel tasks using current input values 
01550    
01551        taskname -- Name of task
01552            default: None = current active task; 
01553            example: taskname='bandpass'
01554            <Options: type tasklist() for the complete list>
01555        outfile -- Output file for the task inputs
01556            default: '' = taskname.parallel;
01557            example: outfile=taskname.orion
01558        target -- List of integer parallel engine ids
01559            default: [] = all current active engines;
01560            example: target=[0,2,4]
01561    
01562        """
01563        
01564        casalog.origin("parallel_go")
01565        
01566        base={} 
01567        for j in target:
01568            if type(j)!=types.IntType or j<0:
01569                casalog.post("engine id %s must be a positive integer" % str(j),"WARN","pgt")
01570                return base
01571 
01572        if len(target)==0:
01573            target=list(xrange(0, len(self.__engines))) 
01574        if len(target)==0:
01575            casalog.post("There are no target engines","WARN","pgt")
01576            return base
01577 
01578        try:
01579            if ipython_globals == None:
01580                t=len(inspect.stack())-1 
01581                myf=sys._getframe(t).f_globals
01582            else:
01583                myf=ipython_globals
01584    
01585            if taskname==None or taskname=='' or \
01586               type(taskname)!=str:
01587                taskname=myf['taskname']
01588    
01589            if outfile=='' or outfile==None or \
01590               type(outfile)!=str:
01591                outfile=taskname+'.parallel'
01592    
01593            tname=myf.get(taskname)
01594            if not myf.has_key(taskname) or \
01595               (str(type(tname))!="<type 'instance'>" and \
01596               not hasattr(tname,"defaults")):
01597                raise TypeError("task %s is not defined " %
01598                                taskname)
01599            else:
01600                myf['taskname']=taskname
01601                myf['update_params'](func=myf['taskname'],
01602                      printtext=False, ipython_globals=myf)
01603            
01604            for j in target:
01605                script=taskname+'('
01606                for k in myf[taskname].parameters:
01607                    par=myf[taskname].parameters[k]
01608                    if type(par)==dict:  
01609                        val=par
01610                        for v in par.keys():
01611                            if type(v)==types.IntType and j==v:
01612                                val=par[v]
01613                                break
01614                            elif type(v)==str:
01615                                a=-1
01616                                try:
01617                                    a=int(v)
01618                                except:
01619                                    # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
01620                                    # traceback.print_tb(sys.exc_info()[2])
01621                                    pass
01622                                if a!=-1 and a==j:
01623                                    val=par[v]
01624                                    break
01625                        if type(val)==str:
01626                            script=script+k+"='"+val+"',"
01627                        else:
01628                            script=script+k+"="+str(val)+","
01629                    elif type(par)==str:
01630                        script=script+k+"='"+par+"',"
01631                    else:
01632                        script=script+k+"="+str(par)+","
01633                script=script.rstrip(',')
01634                script=script+')'
01635                base[j]=script
01636            return base
01637        except TypeError, e:
01638            casalog.post("TypeError: %s" % str(e),"SEVERE","pgt")
01639 
01640 
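   # Editorial sketch (not part of the original source): using pgt() to fan a
   # configured task out over all engines, as the example() method below also
   # shows; it assumes a task and its per-engine parameters are already set up
   # in the casapy session.
   #
   #   s = c.pgt()                                    # {engine_id: 'task(par=..., ...)'}
   #   jobs = [c.odo(s[i], i) for i in c.get_ids()]   # one asynchronous run per engine
   #   [c.check_job(j) for j in jobs]                 # poll each job's status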
01641    def check_job(self, job, verbose=True):
01642       """check the status of an asynch job
01643 
01644   
01645       """
01646       
01647       casalog.origin("parallel_go")
01648       
01649       if type(job)==type(None):
01650           print "job None has no status"
01651           return True
01652       try:
01653          x=job.get_result(block=False)
01654          if x==None:
01655             if verbose:
01656                casalog.post("job '%s' has not finished yet, result is pending" % job,"INFO","check_job")
01657             return False
01658          else:
01659             if verbose:
01660                casalog.post("job '%s' done" % job,"INFO","check_job")
01661             return True
01662       except:
01663          # jagonzal (CAS-4106): Properly report all the exceptions and errors in the cluster framework
01664          # traceback.print_tb(sys.exc_info()[2])
01665          return False
01666 
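   # Editorial sketch (not part of the original source): polling an asynchronous
   # job with check_job(); engine id 0 and the one-second poll interval are
   # arbitrary, and 'time' is already imported at module level.
   #
   #   job = c.odo('x = 2 + 2', 0)                  # run asynchronously on engine 0
   #   while not c.check_job(job, verbose=False):
   #       time.sleep(1)                            # wait until the job has finished
   #   c.get_result(0)                              # then look at engine 0's result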
01667    def howto(self):
01668       print """A simple example for use the cluster
01669 from parallel_go import *
01670 c=cluster()
01671 c.start_engine('casa-dev-08',2,'/home/casa-dev-08/hye/cluster')
01672 #can use tb.clearlocks() to remove leftover
01673 c.pgc('default("split")')
01674 c.pgc('inp("split")')
01675 c.pgk(mspath='/home/casa-dev-01/hye/test/')
01676 c.pgk(msfile='ngc5921.ms')
01677 c.pgc('vis=mspath+msfile')
01678 c.pull('vis')
01679 tb.clearlocks('/home/casa-dev-01/hye/test/5921.ms')
01680 c.pgc('outputvis=work_dir+"/"+msfile+"-"+str(id)')
01681 #alternatively 
01682 #for i in c.get_ids():
01683 #    p=c.pull('work_dir')[i]
01684 #    f=c.pull('msfile')[i]
01685 #    o=p+"/"+f+"--"+str(i)
01686 #    c.pgk(outputvis={i: o})
01687 c.pull('outputvis')
01688 c.pgc('field="2"')
01689 spw=c.split_channel(0, 64)
01690 spw[0]='0:6~15'
01691 spw[3]='0:48~55'
01692 c.pgk(spw=spw)
01693 c.pgc('inp("split")')
01694 c.pgc('go("split")')
01695 c.read_casalogs()
01696 c.pgc('inp("clean")')
01697 c.pgc('vis=outputvis')
01698 c.pgk(imagetag='.clean')
01699 c.pgc('imagename=vis+imagetag')
01700 c.pgc('inp("clean")')
01701 c.pgc('go("clean")')
01702 c.pgc('import commands')
01703 c.pgc('a=commands.getstatusoutput("ls ")')
01704 """
01705 
01706    def use_often(self):
01707       print """Frequently used commands
01708 from parallel_go import *
01709 c=cluster()
01710 c.hello()
01711 c.get_ids()
01712 c.get_nodes()
01713 c.activate()
01714 px print "cluster activated"
01715 c.pdo 'print "parallel"'
01716 c.odo('print "node 0 only"', 0)
01717 c.odo('print "node 0 and 1"', [0,1])
01718 c.odo 'print "node 1 only"', 1
01719 c.queue_status()
01720 c.clear_queue()
01721 c.get_properties()
01722 c.keys()
01723 c.pull('mpi')
01724 c.get_result(1)
01725 
01726 #pg.activate()
01727 #px 'from casa_in_py import *'
01728 """
01729 
01730    def example(self):
01731       print """example: run clean on 4 engines
01732 
01733 from parallel_go import *
01734 c=cluster()
01735 c.start_engine('casa-dev-08', 4,
01736                '/home/casa-dev-08/hye/cluster')
01737 default clean
01738 mspath='/home/casa-dev-09/hye/test/ngc6503/ngc6503_output/'
01739 msname='ngc6503.ms.contsub'
01740 vis=mspath+msname
01741 wpath=[]
01742 for i in c.pull('work_dir').values():
01743    wpath.append(i+'/'+msname+'.clean')
01744 
01745 imagename=c.pad_task_id(wpath)
01746 mode='channel'
01747 start=c.split_int(9, 127)
01748 nchan=40
01749 width=1
01750 calready=False
01751 gain=0.1
01752 imsize=[370,250]
01753 psfmode='clark'
01754 cell=[4.,4.]
01755 niter=10000
01756 threshold=0.0003
01757 weighting='briggs'
01758 #rmode = 'norm'
01759 robust=0.5
01760 mask = ''
01761 s=c.pgt()
01762 job=[]
01763 for i in c.get_ids():
01764     job.append(c.odo(s[i], i))
01765 
01766 c.check_job(job[0])
01767 
01768 c.get_result(0)
01769 
01770 """
01771 
01772 
01773 cluster=cluster()
01774 
01775 
01776 """
01777 for i in 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16;
01778 do ssh casa-dev-$i "ps -ef | grep hye" ; done
01779 """
01780 """
01781 c.pgc('import time', {0: 'time.sleep(10); x=5; y="y is y"', 1: 'time.sleep(12);a=7;b="b is not y"'},block=False,job="wakeup")
01782 
01783 c.pull('x',0)
01784 c.pull('y',0)
01785 c.pull('a',1)
01786 c.pull('b',1)
01787 c.odo('print x', 0)
01788 c.odo('print x', 0).r
01789 
01790 """