casa
$Rev:20696$
|
"""
This module adds multiMS support to CASA tasks.

Example usage:

    1) Create the multiMS on disk. This will eventually be done by a
    parallel filler.

        pCASA.create("my.ms")
        pCASA.add("my.ms", "spw0.ms", "some_hostname")
        pCASA.add("my.ms", "spw1.ms", "some_hostname")
        pCASA.add("my.ms", "spw2.ms", "some_other_hostname")
        pCASA.add("my.ms", "spw3.ms")

    When no hostname is given (spw3.ms), the MS is assumed to be available
    from localhost. When finished, the contents of the multiMS can be shown
    with

        pCASA.list("my.ms")

    Named subMS entries can be removed with pCASA.remove()

    2) Now the following

        flagdata("my.ms", <parameters>)

    will have the effect of running

        flagdata("spw0.ms", <parameters>)
        flagdata("spw1.ms", <parameters>)
        flagdata("spw2.ms", <parameters>)
        flagdata("spw3.ms", <parameters>)

    on the given hosts, using parallel_go. This will also work in a single-
    machine (multi-core) environment. In a multi-host environment, notice
    that parallel_go requires password-less ssh in order to connect to the
    remote hosts.

    The user does not have to explicitly define the available hosts
    or the number of engines per host. The number of engines per host is
    determined (automatically) from the number of CPU cores on the
    local host. The available hosts are determined from the contents of
    the multiMS.

    It is possible (and a recommended way to avoid scheduling bottlenecks)
    to wrap more subMSs into one multiMS than the number of parallel engines
    (for example 20 subMSs on a single 8-core machine). The parallel engines
    are assigned and reassigned to individual subMSs on the fly, as they
    become idle.
"""
import parallel_go
import pickle
import sets
import socket
import time
import os
import sys

# Set to True to get verbose tracing of host/engine/scheduling decisions.
debug = False


class pCASA:
    """Allows user to manipulate parallel configuration,
    e.g. number of engines

    Attributes:
      engines_per_host -- number of engines started on every registered host
      hosts            -- set of host names registered via register_host()
      cluster          -- the parallel_go.cluster() object used to start
                          engines and submit jobs
    """

    def __init__(self):
        # default to setting up N engines on each host,
        # where N is the number of cores on the local host

        # Python 2.6: self.engines_per_host = multiprocessing.cpu_count()

        # POSIX
        try:
            self.engines_per_host = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        except (AttributeError, ValueError, OSError):
            # As fallback (no os.sysconf, unknown name, or the call failed)
            self.engines_per_host = 1

        self.hosts = sets.Set()
        self.cluster = parallel_go.cluster()

    def register_host(self, hostname):
        """Remember hostname as a host that engines must be started on."""
        self.hosts.add(hostname)

    def start(self):
        """Start engines on every registered host that has none yet.

        Hosts are compared by IP address (see _ip()), so that different
        names of the same machine are recognised as one host.
        """
        # Convert to IP addresses, for reasons of uniqueness
        running_ips = sets.Set()
        for node in self.cluster.get_nodes():
            running_ips.add(_ip(node))

        for host in self.hosts:
            host_ip = _ip(host)

            if debug:
                print("new host:  %s %s" % (host, host_ip))
                print("existing hosts: %s" % (running_ips,))

            if host_ip not in running_ips:
                if debug:
                    print("Start engines on host %s" % (host,))
                self.cluster.start_engine(host,
                                          self.engines_per_host,
                                          os.getcwd())

                # Record the address, so that another registered name
                # resolving to the same machine does not get a second
                # set of engines.
                running_ips.add(host_ip)

                if debug:
                    print("ipengines started on host %s" % (host,))


def _load(mms_name):
    """Returns the multiMS object, or throws
    an exception if the file could not be loaded as
    a multiMS

    Raises Exception if the file does not exist, cannot be unpickled,
    does not contain a multiMS, or was written by an incompatible
    version of the multiMS class.
    """

    if not os.path.lexists(mms_name):
        raise Exception("%s does not exist" % mms_name)

    # NOTE(security): unpickling executes code chosen by whoever wrote the
    # file; only load multiMS files from trusted locations.
    try:
        f = open(mms_name, "r")
        try:
            mms = pickle.load(f)
        finally:
            # Close the file also when unpickling fails
            f.close()
    except Exception:
        # sys.exc_info() is used instead of "except ..., e" so that the
        # statement parses under both Python 2 and Python 3.
        cause = sys.exc_info()[1]
        raise Exception("Could not load %s: %s" % (mms_name, cause))

    if not isinstance(mms, multiMS):
        raise Exception("%s is not a multiMS. Its python type is %s" %
                        (mms_name, type(mms)))

    # An object without a version attribute is reported as incompatible
    # instead of failing with an AttributeError.
    version = getattr(mms, "version", None)
    if version != multiMS.code_version:
        raise Exception(mms_name + " file version " + str(version) +
                        " is incompatible with this code version: " +
                        str(multiMS.code_version))
    return mms


def _save(mms, mms_name):
    """Pickles the multiMS object to the named file, overwriting it."""
    f = open(mms_name, "w")
    try:
        pickle.dump(mms, f)
    finally:
        f.close()


def is_mms(mms_name):
    """Returns true if and only if
    the file with the given name contains a multiMS object
    """
    try:
        _load(mms_name)
        return True
    except Exception:
        return False


def list(mms_name):
    """Prints the contents of the given multiMS

    NOTE: this public name shadows the builtin list() inside this module;
    do not call the builtin from code in this file.
    """
    mms = _load(mms_name)

    print("%s (multiMS v%s):" % (mms_name, mms.version))

    for s in mms.sub_mss:
        print(" %s %s" % (s.host, s.ms))
    if len(mms.sub_mss) == 0:
        print(" <empty>")


def create(mms_name):
    """Create an empty multiMS

    Raises Exception if something with that name already exists.
    """

    if os.path.lexists(mms_name):
        raise Exception("%s already exists" % mms_name)

    _save(multiMS(), mms_name)


def add(mms_name, subms_name, hostname = "localhost"):
    """Add a subMS entry to a multiMS

    mms_name   -- file name of an existing multiMS
    subms_name -- name of the MS to add
    hostname   -- host on which the subMS is available
    """

    mms = _load(mms_name)
    mms.add(subMS(subms_name, hostname))

    # Overwrite existing file
    _save(mms, mms_name)


def remove(mms_name, subms_name, hostname = ""):
    """Remove subMS entries from the multiMS

    If the hostname is not given, all subMSs with the given name will
    be removed. Otherwise only matching subMSs on the given host are
    removed.

    If nothing matches, a message is printed and the file is left
    untouched.
    """

    mms = _load(mms_name)
    if not mms.remove(subms_name, hostname):
        hoststr = ""
        if hostname != "":
            hoststr = " on host " + str(hostname)
        print(("%s does not contain a subMS with name %s" %
               (mms_name, subms_name)) + hoststr)
    else:
        _save(mms, mms_name)

        if debug:
            print("Removed %s from %s" % (subms_name, mms_name))


def _ip(host):
    """Returns a unique IP address of the given hostname,
    i.e. not 127.0.0.1 for localhost but localhost's global IP

    Any lookup failure is reported on stdout and re-raised.
    """

    try:
        ip = socket.gethostbyname(host)

        if ip == "127.0.0.1":
            ip = socket.gethostbyname(socket.gethostname())

        return ip
    except Exception:
        print("Could not get IP address of host '%s': %s" %
              (host, str(sys.exc_info()[1])))
        raise


def _launch(engine, taskname, ms, parameters):
    """Launches a job

    Builds the command  taskname(p1 = v1, p2 = v2, ...)  from the given
    parameters, with the 'vis' parameter replaced by ms, and submits it
    to the given engine. The engine dictionary is updated with the job
    handle ('job'), the name of the MS being processed ('ms'), and is
    marked as busy ('idle' = False).
    """

    print("%s engine %s: %s(\"%s\", ...)" %
          (engine['host'], engine['id'], taskname, ms))

    args = []
    for (p, val) in parameters.items():
        if p == "vis":
            val = ms
        # repr() yields a literal that evaluates back to the same value:
        # quotes and backslashes inside strings are escaped, and floats
        # keep their full precision.
        args.append(p + " = " + repr(val))

    cmd = taskname + "(" + ", ".join(args) + ")"
    if debug:
        print(cmd)
    engine['job'] = pc.cluster.odo(cmd, engine['id'])
    engine['ms'] = ms
    engine['idle'] = False


def _poor_mans_wait(engines, taskname):
    """Returns engine id and any exception thrown by the job.
    The return value of the job is not made available.

    Each pending job is polled once per second; this is inefficient
    and should be replaced with a 'wait' call that blocks
    until any job terminates and returns the ID of the job that
    terminated. But that 'wait' function does not seem to exist.

    The engine whose job terminated is marked as idle. The caller must
    make sure that at least one engine is busy; otherwise this function
    never returns.
    """

    while True:
        for engine in engines.values():
            if engine['idle']:
                continue

            ex = None
            r = None
            try:
                # get_result() will
                #    return None: if the job is not done
                #    return ResultList: if the job is done
                #    rethrow the job's exception if the job threw
                r = engine['job'].get_result(block = False)
            except Exception:
                ex = sys.exc_info()[1]

            if ex is None and r is None:
                # Job still running, look at the next engine
                continue

            if ex is None:
                state = "success"
            else:
                state = "fail"
            print("%s engine %s: %s(\"%s\", ...) %s" %
                  (engine['host'], engine['id'], taskname, engine['ms'],
                   state))
            engine['idle'] = True
            return (engine['id'], ex)

        time.sleep(1)


def _any_busy(engines):
    """Returns true if and only if at least one engine is running a job."""
    for engine in engines.values():
        if not engine['idle']:
            return True
    return False


def execute(taskname, parameters):
    """Runs the given task on the given multiMS.
    If any of the jobs throw an exception, execution stops, and the
    first exception that happened is rethrown by this function

    taskname   -- name of the task to run on the engines
    parameters -- dictionary of task parameters; parameters['vis'] is the
                  name of the multiMS
    """

    mms_name = parameters['vis']
    mms = _load(mms_name)

    for s in mms.sub_mss:
        pc.register_host(s.host)

    pc.start()

    # Convert engines to a more handy format (from lists to dictionaries),
    # and add idleness as an engine property
    engines = {}
    for e in pc.cluster.get_engines():
        engines[e[0]] = {'id': e[0],
                         'host': e[1],
                         'pid': e[2],
                         'idle': True,
                         'job': None}

    print("Process %s using %s engines" % (mms_name, len(engines)))

    non_processed_submss = mms.sub_mss[:]

    status = None   # None: success

    # Host name lookups are repeated for every engine/subMS pair in the
    # scheduling loop below; remember the answers for this call.
    ip_cache = {}

    def cached_ip(host):
        if host not in ip_cache:
            ip_cache[host] = _ip(host)
        return ip_cache[host]

    # The available engines will be assigned to the subMSs
    # on the fly, as engines become idle
    #
    # Pseudo code:
    #
    #   while still_data_to_process:
    #      search for an idle engine with access to the data
    #      if found:
    #          launch job
    #      else:
    #          if there are running engines:
    #              wait next idle engines
    #          else:
    #              give up
    #   while there are running engines:
    #       wait
    #
    while len(non_processed_submss) > 0 and status is None:
        if debug:
            print("Still to process = %s" % (non_processed_submss,))
            print("Engines = %s" % (engines,))

        found = False

        for engine in engines.values():
            if not engine['idle']:
                continue

            if debug:
                print("idle engine %s is %s %s" %
                      (engine['id'], engine['host'],
                       cached_ip(engine['host'])))

            for subms in non_processed_submss:
                if debug:
                    print("subMS %s is %s %s" %
                          (subms.ms, subms.host, cached_ip(subms.host)))

                if cached_ip(engine['host']) == cached_ip(subms.host):

                    _launch(engine, taskname, subms.ms, parameters)

                    # Safe although we are iterating over the list,
                    # because the iteration stops right here
                    non_processed_submss.remove(subms)

                    found = True
                    break

            if found:
                break

        if not found:
            if _any_busy(engines):
                (engine_id, s) = _poor_mans_wait(engines, taskname)

                # If an exception was thrown, bail out ASAP
                if s is not None and status is None:
                    status = (engine_id, s)
            else:
                raise Exception("All " + str(len(engines)) +
                                " engine(s) are idle but no engine has"
                                " access to any subMS.")

    if debug:
        print("Still to process = %s" % (non_processed_submss,))
        print("casalogs = %s" % (pc.cluster.get_casalogs(),))
        print("ids = %s" % (pc.cluster.get_ids(),))
        print("properties = %s" % (pc.cluster.get_properties(),))
        print("engines = %s" % (pc.cluster.get_engines(),))
        print("nodes =  %s" % (pc.cluster.get_nodes(),))

    # Wait for the jobs which are still running, also after a failure
    while _any_busy(engines):
        (engine_id, s) = _poor_mans_wait(engines, taskname)

        if s is not None and status is None:
            status = (engine_id, s)

    if status is not None:
        sys.stderr.write("Engine %s error while processing %s\n" %
                         (status[0], engines[status[0]]['ms']))

        raise status[1]


# A subset of a multi MS
#
# NOTE(review): instances of this class are stored in pickled multiMS
# files. It is deliberately left without a base class; changing that may
# break loading of existing files -- confirm before changing.
class subMS:
    def __init__(self, ms_name, host):
        self.ms = ms_name     # name of the MS
        self.host = host      # host on which the MS is available


# A collection of subMSs
#
# NOTE(review): instances of this class are what multiMS files contain
# (pickled). It is deliberately left without a base class; changing that
# may break loading of existing files -- confirm before changing.
class multiMS:

    code_version = "0.2"
    # Should be incremented every time this class changes (incompatibly
    # wrt to its pickle representation), in order to fail cleanly if
    # we try to load a multiMS pickle file which was written with a previous
    # (incompatible) version of this class

    def __init__(self):
        self.sub_mss = []
        self.version = multiMS.code_version

    def add(self, sub_ms):
        """Appends the given subMS to this multiMS."""
        self.sub_mss.append(sub_ms)

    def remove(self, subms_name, hostname = ""):
        """Removes all subMSs with the given MS name.
        The hostname must match if it is provided.
        Returns true if and only if a matching subMS existed"""

        # Do not compare host IP addresses,
        # Looking up the given hostname may not be possible
        kept = [s for s in self.sub_mss
                if not (s.ms == subms_name and
                        (hostname == "" or s.host == hostname))]

        found = len(kept) != len(self.sub_mss)

        # Update the existing list object in place
        self.sub_mss[:] = kept

        return found


# The module-wide parallel configuration, used by execute() and _launch()
pc = pCASA()