casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables
pCASA.py
Go to the documentation of this file.
00001 """
00002 This module adds multiMS support to CASA tasks.
00003 
00004 Example usage:
00005 
00006     1) Create the multiMS on disk. This will eventually be done by a
00007     parallel filler.
00008 
00009       pCASA.create("my.ms")
00010       pCASA.add("my.ms", "spw0.ms", "some_hostname")
00011       pCASA.add("my.ms", "spw1.ms", "some_hostname")
00012       pCASA.add("my.ms", "spw2.ms", "some_other_hostname")
00013       pCASA.add("my.ms", "spw3.ms")
00014 
00015    When no hostname is given (spw3.ms), the MS is assumed to be available
00016    from localhost. When finished, the contents of the multiMS can be shown
00017    with
00018 
00019       pCASA.list("my.ms")
00020 
00021    Named subMS entries can be removed with pCASA.remove().
00022 
00023    2) Now the following
00024 
00025       flagdata("my.ms", <parameters>)
00026 
00027    will have the effect of running
00028 
00029       flagdata("spw0.ms", <parameters>)
00030       flagdata("spw1.ms", <parameters>)
00031       flagdata("spw2.ms", <parameters>)
00032       flagdata("spw3.ms", <parameters>)
00033 
00034    on the given hosts, using parallel_go. This will also work in a single-
00035    machine (multi-core) environment. In a multi-host environment, notice
00036    that parallel_go requires password-less ssh in order to connect to the
00037    remote hosts.
00038 
00039    The user does not have to explicitly define the available hosts
00040    or the number of engines per host. The number of engines per host is
00041    determined (automatically) from the number of CPU cores on the
00042    local host. The available hosts are determined from the contents of
00043    the multiMS.
00044 
00045    It is possible (and a recommended way to avoid scheduling bottlenecks)
00046    to wrap more subMSs into one multiMS than the number of parallel engines
00047    (for example 20 subMSs on a single 8-core machine). The parallel engines
00048    are assigned and reassigned to individual subMSs on the fly, as they
00049    become idle.
00050 """
00051 import parallel_go
00052 import pickle
00053 import sets
00054 import socket
00055 import time
00056 import os
00057 import sys
00058 
# Module-wide switch: when True, the functions below print extra
# diagnostic/scheduling messages.
debug = False
00060 
class pCASA:
    """Allows user to manipulate parallel configuration,
    e.g. number of engines

    Attributes:
        engines_per_host: number of engines started on each host; defaults
            to the number of online CPU cores of the local host (1 if that
            cannot be determined).
        hosts: set of host names registered with register_host().
        cluster: the parallel_go.cluster() object engines are started on.
    """
    def __init__(self):
        # default to setting up N engines on each host,
        # where N is the number of cores on the local host
        self.engines_per_host = self._cpu_count()

        # builtin set instead of the deprecated sets.Set
        self.hosts = set()
        self.cluster = parallel_go.cluster()

    @staticmethod
    def _cpu_count():
        """Return the number of online CPU cores of the local host,
        or 1 if it cannot be determined."""
        # Python 2.6 would offer multiprocessing.cpu_count(); use the
        # POSIX sysconf() query instead.
        try:
            count = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        except (AttributeError, ValueError, OSError):
            # AttributeError: no os.sysconf (non-POSIX platform).
            # ValueError: the name is unknown to this Python.
            # OSError: the name is not supported by the host system.
            return 1
        # sysconf() may report -1 when the value is indeterminate.
        return max(count, 1)

    def register_host(self, hostname):
        """Add hostname to the set of hosts that start() will cover."""
        self.hosts.add(hostname)

    def start(self):
        """Start engines_per_host engines on every registered host that
        does not already have engines running.

        Hosts are compared by the IP address returned by _ip(), so that
        different names for the same machine (e.g. "localhost" and the
        local host name) only get one batch of engines.
        """
        # Convert to IP addresses, for reasons of uniqueness
        running_ips = set()
        for node in self.cluster.get_nodes():
            running_ips.add(_ip(node))

        for host in self.hosts:
            host_ip = _ip(host)

            if debug:
                print("new host:  %s %s" % (host, host_ip))
                print("existing hosts: %s" % (running_ips,))
            if host_ip in running_ips:
                continue

            if debug:
                print("Start engines on host %s" % host)
            self.cluster.start_engine(host,
                                      self.engines_per_host,
                                      os.getcwd())
            # Remember the IP, so that another registered name resolving
            # to the same machine does not start a second batch.
            running_ips.add(host_ip)

            if debug:
                print("ipengines started on host %s" % host)
00107 
00108 def _load(mms_name):
00109     """Returns the multiMS object, or throws
00110     an exception if the file could not be loaded as
00111     a multiMS"""
00112 
00113     if not os.path.lexists(mms_name):
00114         raise Exception("%s does not exist" % mms_name)
00115     
00116     try:
00117         f = open(mms_name, "r")
00118         mms = pickle.load(f)
00119         f.close()
00120     except:
00121         raise Exception("Could not load %s" % mms_name)
00122 
00123     if not isinstance(mms, multiMS):
00124         raise Exception("%s is not a multiMS. Its python type is %s" % \
00125                         (mms_name, type(mms)))
00126         
00127     if mms.version != multiMS.code_version:
00128         raise Exception(mms_name + " file version " + str(mms.version) +
00129                         " is incompatible with this code version: " +
00130                         str(multiMS.code_version))
00131     return mms
00132 
def is_mms(mms_name):
    """Returns true if and only if the file with the given name can be
    loaded by _load(), i.e. it contains a multiMS object of the current
    code version.
    """
    try:
        _load(mms_name)
    except Exception:
        # Not a bare except: KeyboardInterrupt / SystemExit propagate.
        return False
    return True
00142 
def list(mms_name):
    """Print the multiMS header line, then one "host ms" line per subMS,
    or "  <empty>" when the multiMS holds no subMS.
    """
    mms = _load(mms_name)

    print("%s (multiMS v%s):" % (mms_name, mms.version))

    if not mms.sub_mss:
        print("  <empty>")
    else:
        for entry in mms.sub_mss:
            print("  %s %s" % (entry.host, entry.ms))
00154 
def create(mms_name):
    """Create an empty multiMS file named mms_name.

    Raises:
        Exception: if a file (or symlink) with that name already exists.
    """
    if os.path.lexists(mms_name):
        raise Exception("%s already exists" % mms_name)

    mms = multiMS()

    f = open(mms_name, "w")
    try:
        pickle.dump(mms, f)
    finally:
        # Close even if pickling fails, so the handle is not leaked.
        f.close()
00166 
def add(mms_name, subms_name, hostname = "localhost"):
    """Add a subMS entry to a multiMS

    mms_name: existing multiMS file (see create()); rewritten in place.
    subms_name: name of the MS to add.
    hostname: host on which the MS is available; defaults to "localhost".
    """
    mms = _load(mms_name)

    mms.add(subMS(subms_name, hostname))

    # Overwrite existing file; close it even if pickling fails.
    f = open(mms_name, "w")
    try:
        pickle.dump(mms, f)
    finally:
        f.close()
00179 
def remove(mms_name, subms_name, hostname = ""):
    """Remove subMS entries from the multiMS

    If the hostname is not given, all subMSs with the given name will
    be removed. Otherwise only matching subMSs on the given host are
    removed. If nothing matches, a message is printed and the file is
    left untouched; otherwise the file is rewritten.
    """
    mms = _load(mms_name)
    if not mms.remove(subms_name, hostname):
        hoststr = ""
        if hostname != "":
            hoststr = " on host " + str(hostname)
        print(("%s does not contain a subMS with name %s" %
               (mms_name, subms_name)) + hoststr)
    else:
        # Overwrite existing file; close it even if pickling fails.
        f = open(mms_name, "w")
        try:
            pickle.dump(mms, f)
        finally:
            f.close()

        if debug:
            print("Removed %s from %s" % (subms_name, mms_name))
00200        
00201 def _ip(host):
00202     """Returns a unique IP address of the given hostname,
00203     i.e. not 127.0.0.1 for localhost but localhost's global IP
00204     """
00205 
00206     try:
00207         ip = socket.gethostbyname(host)
00208     
00209         if ip == "127.0.0.1":
00210             ip = socket.gethostbyname(socket.gethostname())
00211 
00212         return ip
00213     except Exception, e:
00214         print "Could not get IP address of host '%s': %s" %  (host, str(e))
00215         raise
00216                         
00217 
00218 
def _launch(engine, taskname, ms, parameters):
    """Launches a job: runs taskname on the subMS ms on the given engine.

    The call is sent as a string of Python source,
    "taskname(p1 = v1, p2 = v2, ...)", built from parameters with the
    'vis' parameter replaced by ms, and submitted with pc.cluster.odo().
    The returned job handle is stored in engine['job'] (its result is
    polled later by _poor_mans_wait()) and the engine is marked busy.

    engine: engine record as built by execute() (reads 'id' and 'host';
        sets 'job', 'ms' and 'idle').
    """
    print("%s engine %s: %s(\"%s\", ...)" %
          (engine['host'], engine['id'], taskname, ms))

    str_type = type("")
    unicode_type = type(u"")   # same as str_type on Python 3

    def literal(val):
        # Render val as Python source for the remote engine. repr()
        # escapes quotes and backslashes (plain '...' quoting broke on
        # e.g. "it's") and keeps every digit of a float (Python 2's
        # str() keeps only 12 significant digits). Converting to the
        # exact builtin type first avoids subclass reprs that are not
        # valid source. Other values keep their previous str() form.
        if isinstance(val, str_type):
            return repr(str_type(val))
        if isinstance(val, unicode_type):
            return repr(unicode_type(val))
        if isinstance(val, float):
            return repr(float(val))
        return str(val)

    args = []
    for (p, val) in parameters.items():
        if p == "vis":
            # Each job processes its own subMS instead of the multiMS
            val = ms
        args.append(p + " = " + literal(val))

    cmd = taskname + "(" + ", ".join(args) + ")"
    if debug:
        print(cmd)
    engine['job'] = pc.cluster.odo(cmd, engine['id'])
    engine['ms'] = ms
    engine['idle'] = False
00241 
00242 def _poor_mans_wait(engines, taskname):
00243     """Returns engine id and any exception thrown by the job.
00244     The return value of the job is not made available.
00245     
00246     Each pending job is polled once per second; this is inefficient
00247     and should be replaced with a 'wait' call that blocks
00248     until any job terminates and returns the ID of the job that
00249     terminated. But that 'wait' function does not seem to exist."""
00250 
00251     while True:
00252         for engine in engines.values():
00253             if not engine['idle']:
00254                 ex = None
00255                 try:
00256                     # get_result() will
00257                     #   return None: if the job is not done
00258                     #   return ResultList: if the job is done
00259                     #   rethrow the job's exception if the job threw
00260                     r = engine['job'].get_result(block = False)
00261                 except Exception, e:
00262                     ex = e
00263 
00264                 if ex != None or r != None:
00265                     if ex == None:
00266                         state = "success"
00267                     else:
00268                         state = "fail"
00269                     print "%s engine %s: %s(\"%s\", ...) %s" % \
00270                       (engine['host'], engine['id'], taskname, engine['ms'],
00271                        state)
00272                     engine['idle'] = True
00273                     return (engine['id'], ex)
00274 
00275         time.sleep(1)
00276 
def execute(taskname, parameters):
    """Runs the given task on the given multiMS.

    parameters['vis'] names the multiMS. Engines are started (if needed)
    on every host holding a subMS. Each subMS is then processed by running
    taskname with the same parameters, except that 'vis' is replaced by
    the subMS name. The job runs on an idle engine whose host has the
    same IP address as the subMS host. Engines are (re)assigned on the
    fly as they become idle.

    If any of the jobs throw an exception, no further jobs are launched,
    the jobs already running are waited for, and the first exception that
    happened is rethrown by this function.

    Raises:
        Exception: if the multiMS cannot be loaded, or if every engine is
            idle but none has access to any of the remaining subMSs.
        The first exception thrown by a job, if any.
    """
    mms_name = parameters['vis']
    mms = _load(mms_name)

    for s in mms.sub_mss:
        pc.register_host(s.host)

    pc.start()

    # _ip() performs name lookups, and the scheduling loop below compares
    # engine and subMS hosts over and over, so resolve each name once.
    ip_cache = {}

    def host_ip(host):
        if host not in ip_cache:
            ip_cache[host] = _ip(host)
        return ip_cache[host]

    # Convert engines to a more handy format (from lists to dictionaries),
    # and add idleness as an engine property
    engines = {}
    for e in pc.cluster.get_engines():
        engines[e[0]] = {'id': e[0],
                         'host': e[1],
                         'pid': e[2],
                         'idle': True,
                         'job': None}

    def any_busy():
        return any(not engine['idle'] for engine in engines.values())

    print("Process %s using %s engines" % (mms_name, len(engines)))

    non_processed_submss = mms.sub_mss[:]

    status = None   # None: success; else (engine id, first exception)

    #  The available engines will be assigned to the subMSs
    #  on the fly, as engines become idle
    #
    #  Pseudo code:
    #
    #  while still_data_to_process:
    #      search for an idle engine with access to the data
    #      if found:
    #          launch job
    #      else:
    #          if there are running engines:
    #              wait next idle engines
    #          else:
    #              give up
    #  while there are running engines:
    #      wait
    #
    while non_processed_submss and status is None:
        if debug:
            print("Still to process = %s" % (non_processed_submss,))
            print("Engines = %s" % (engines,))

        launched = False

        for engine in engines.values():
            if not engine['idle']:
                continue
            if debug:
                print("idle engine %s is %s %s" %
                      (engine['id'], engine['host'], host_ip(engine['host'])))

            engine_ip = host_ip(engine['host'])
            for subms in non_processed_submss:
                if debug:
                    print("subMS %s is %s %s" %
                          (subms.ms, subms.host, host_ip(subms.host)))

                if engine_ip == host_ip(subms.host):
                    _launch(engine, taskname, subms.ms, parameters)
                    # Safe despite iterating: we break right away.
                    non_processed_submss.remove(subms)
                    launched = True
                    break

            if launched:
                break

        if not launched:
            if not any_busy():
                raise Exception("All " + str(len(engines)) +
                                " engine(s) are idle but no engine has "
                                "access to any subMS.")

            (engine_id, ex) = _poor_mans_wait(engines, taskname)

            # If an exception was thrown, bail out ASAP
            if ex is not None and status is None:
                status = (engine_id, ex)

        if debug:
            print("Still to process = %s" % (non_processed_submss,))
            print("casalogs = %s" % (pc.cluster.get_casalogs(),))
            print("ids = %s" % (pc.cluster.get_ids(),))
            print("properties = %s" % (pc.cluster.get_properties(),))
            print("engines = %s" % (pc.cluster.get_engines(),))
            print("nodes =  %s" % (pc.cluster.get_nodes(),))

    # Let the jobs that are still running finish.
    while any_busy():
        (engine_id, ex) = _poor_mans_wait(engines, taskname)

        if ex is not None and status is None:
            status = (engine_id, ex)

    if status is not None:
        sys.stderr.write("Engine %s error while processing %s\n" %
                         (status[0], engines[status[0]]['ms']))

        raise status[1]
00404 
00405 
# A subset of a multi MS
class subMS:
    """One multiMS entry: an MS name and the host it is available on.

    Kept an old-style class with the same attributes, so multiMS pickles
    written earlier still load.
    """
    def __init__(self, ms_name, host):
        self.ms = ms_name
        self.host = host

    def __repr__(self):
        # Makes debug dumps of subMS lists readable.
        return "subMS(%r, %r)" % (self.ms, self.host)
00411 
# A collection of subMSs
class multiMS:
    """A collection of subMS entries, persisted to disk by pickling.

    Attributes:
        sub_mss: list of subMS entries, in the order they were added.
        version: code_version of the class that created the instance.
    """

    # Should be incremented every time this class changes (incompatibly
    # wrt its pickle representation), in order to fail cleanly if we
    # try to load a multiMS pickle file which was written with a previous
    # (incompatible) version of this class.
    code_version = "0.2"

    def __init__(self):
        self.sub_mss = []
        self.version = multiMS.code_version

    def add(self, sub_ms):
        """Append sub_ms to the collection."""
        self.sub_mss.append(sub_ms)

    def remove(self, subms_name, hostname = ""):
        """Removes all subMSs with the given MS name.
        The hostname must match if it is provided.
        Returns true if and only if a matching subMS existed"""

        # Do not compare host IP addresses,
        # Looking up the given hostname may not be possible
        def matches(s):
            return s.ms == subms_name and \
                   (hostname == "" or s.host == hostname)

        # Single pass instead of rescanning after every removal.
        kept = [s for s in self.sub_mss if not matches(s)]
        found = len(kept) != len(self.sub_mss)

        # Update in place, so existing references to the list see it.
        self.sub_mss[:] = kept
        return found
00456 
00457 
# Module-wide pCASA instance. execute() registers hosts on it and starts
# engines through it, and _launch() submits jobs via pc.cluster.
# Creating it at import time calls parallel_go.cluster().
pc = pCASA()