# casa
# $Rev:20696$
#
from IPython.kernel import client
from IPython.kernel.multiengineclient import PendingResult
from IPython.kernel import error as iperror
from mindpipes import mindpipes
import subprocess
import inspect
import signal
import string
import atexit
import time
import sets
import sys
import os
import re
import casadef

from casac import *
from IPython.Release import version
casalog = casac.logsink()


def _ipython_accepts_ipythondir():
    """Return True when the running IPython is older than 0.10.

    ipcontroller/ipengine from IPython 0.10.x no longer accept
    ``--ipythondir`` (they read the IPYTHONDIR environment variable instead),
    so the command lines built by taskmanager only pass that flag to older
    releases.
    """
    return int(version.split('.')[1]) < 10


def log_message(state, file, lines):
    """Post lines captured from an engine's stdout/stderr to the CASA logger.

    state -- dict with 'out' ('stdout' or 'stderr') and 'engine' (an engine
             record built by taskmanager).  If the engine record has no
             'current task' yet, it is set to 'taskmanager'.
    file  -- not used here; retrieve() passes 1 or 2, and the pipe minder
             presumably passes the descriptor it read from (TODO confirm).
    lines -- iterable of text lines; lines from stderr are posted as WARN,
             all others as INFO, with the engine's current task as origin.
    """
    engine = state['engine']
    if 'current task' not in engine:
        engine['current task'] = 'taskmanager'
    casalog.origin(str(engine['current task']))
    priority = "WARN" if state['out'] == "stderr" else "INFO"
    for line in lines:
        casalog.post(line, priority)


class taskmanager(object):
    """Manage IPython.kernel engines that run CASA tasks asynchronously.

    The ipcontroller process (the "hub") is started at construction time
    unless CASA_ENGINE is set in the environment; ipengine processes are
    started lazily by execute().  execute() returns an integer receipt that
    can be polled with retrieve() or waited on with asyncwait().

    NOTE: all state lives in name-mangled *class* attributes, so every
    instance shares the same hub, engines and result map.
    """

    # Filesystem layout: 'home', 'rc', 'furl', 'log root', 'session log root'.
    __dir = {'home': os.environ['HOME'] + "/.casa/tm"}
    # Certificate file paths handed to ipcontroller ('client', 'engine').
    __cert = {}
    # FURL file paths that ipcontroller writes ('engine', 'mec', 'tc').
    __furl = {}
    # Shared runtime state: controller process, engine records, the
    # MultiEngineClient ('mec'), receipt -> result map, pipe minder, ...
    __hub = {'init atend': False, 'proc': None, 'engines': [], 'engine hosts': {},
             'mec': None, 'tasks initialized': [], 'task path': [], 'initialized': False,
             'log root': None, 'execute count': 0, 'result map': {}, 'pipe minder': None}
    # Executables used to launch the controller/engines; may be overridden
    # from casa['helpers'] by __initialize_helpers().
    __helpers = {'ipcontroller': 'ipcontroller', 'ipengine': 'ipengine'}

    def retrieve(self, receipt):
        """Poll the task identified by *receipt* without blocking.

        Returns a dict with keys 'result' and 'status'.  'status' is
        'pending' while the task runs, 'done' once its return value has been
        pulled from the engine, 'died' if the engine disappeared
        (InvalidEngineID), or 'failed' if anything else went wrong.  The first
        time completion is observed, the task's captured stdout/stderr are
        forwarded to the logger through log_message().

        Raises Exception("invalid receipt: ...") for an unknown receipt.
        """
        result_map = self.__hub['result map']
        try:
            entry = result_map[receipt]
            if isinstance(entry['result'], PendingResult):
                output = entry['result'].get_result(block=False)
                if output is None:
                    return {'result': None, 'status': 'pending'}
                # execute() stored the task's return value on the engine
                # under result_NNNN; fetch it from that engine.
                result_name = "result_%04d" % receipt
                target = entry['engine']['index']
                value = self.__hub['mec'].pull(result_name, targets=[target])[0]
                entry['result'] = {'result': value, 'status': 'done'}
                entry['result output'] = output[0]
                # Bind the engine once so the stderr branch also works when
                # the task produced no stdout.
                engine = entry['engine']
                captured = entry['result output']
                if 'stdout' in captured:
                    log_message({'out': 'stdout', 'engine': engine}, 1,
                                captured['stdout'].splitlines())
                if 'stderr' in captured:
                    log_message({'out': 'stderr', 'engine': engine}, 2,
                                captured['stderr'].splitlines())
                return entry['result']
        except iperror.InvalidEngineID:
            if receipt in result_map:
                result_map[receipt]['result'] = {'result': None, 'status': 'died'}
        except Exception:
            # Unknown receipts (KeyError) land here too and are reported below.
            if receipt in result_map:
                result_map[receipt]['result'] = {'result': None, 'status': 'failed'}

        # Already-resolved entries (a plain dict, not a PendingResult) and
        # the error cases above are answered from the map.
        if receipt in result_map:
            return result_map[receipt]['result']
        raise Exception("invalid receipt: " + str(receipt))

    def execute(self, taskname, *args, **kwargs):
        """Launch ``taskname(*args, **kwargs)`` on an idle engine, non-blocking.

        On first use per engine, the task is imported there via
        ``from task_<taskname> import <taskname>``.  The task runs in this
        process's current working directory and the engine returns to its
        startup directory afterwards.  Returns an integer receipt for
        retrieve()/asyncwait().
        """
        if not self.__hub['initialized']:
            self.__initialize()

        engine = self.__find_engine()
        targets = [engine['index']]

        if self.__hub['mec'] is None:
            self.__setup_mec()

        if engine['setup'] is not True:
            self.__setup_engine(engine)

        if taskname not in engine['loaded tasks']:
            self.__hub['mec'].execute("from task_" + taskname + " import " + taskname,
                                      block=False, targets=targets)
            engine['loaded tasks'][taskname] = True

        count = self.__hub['execute count']
        self.__hub['execute count'] = count + 1
        self.__hub['mec'].push({'args': args, 'kwargs': kwargs}, targets=targets)
        result_name = "result_%04d" % count
        engine['current task'] = taskname

        banner = " ########################"
        command = "; ".join([
            "casalog.origin('taskmanager')",
            "casalog.post('##### async task launch: " + taskname + banner + "')",
            # repr() yields a correctly quoted/escaped literal for any path.
            "os.chdir(" + repr(os.getcwd()) + ")",
            result_name + " = " + taskname + "(*args,**kwargs)",
            "os.chdir( _startup_dir_ )",
            "casalog.origin('taskmanager')",
            "casalog.post('##### async task completion: " + taskname + banner + "')",
        ])
        self.__hub['result map'][count] = {
            'engine': engine,
            'result': self.__hub['mec'].execute(command, block=False, targets=targets),
        }
        ## not sure if any pause is necessary or not...
        time.sleep(0.25)
        return count

    def asyncwait(self, handles=None):
        """Block until none of the receipts in *handles* is still pending.

        handles -- list of receipts returned by execute(); anything that is
                   not a non-empty list returns immediately.

        Status is re-polled every 10 seconds.  Receipts that end 'failed' or
        'died' also end the wait, so a crashed task cannot hang the caller,
        and no extra sleep happens once everything has finished.
        """
        if not isinstance(handles, list) or not handles:
            return
        while any(self.retrieve(h)['status'] == 'pending' for h in handles):
            time.sleep(10)

    def __finalize(self):
        """Best-effort shutdown: stop controller/engines, archive session logs.

        The session log directory is moved to '<log root>/last', replacing
        any previous one.
        """
        if self.__hub['mec'] is not None:
            try:
                self.__hub['mec'].kill(controller=True)
            except Exception:
                # Controller may already be gone; keep cleaning up regardless.
                pass
        for e in self.__hub['engines']:
            try:
                if e['proc'].pid is not None:
                    os.kill(e['proc'].pid, signal.SIGKILL)
            except OSError:
                pass

        try:
            if self.__hub['proc'] is not None and self.__hub['proc'].pid is not None:
                os.kill(self.__hub['proc'].pid, signal.SIGINT)
        except OSError:
            pass

        self.__clean_furls()
        last = self.__dir['log root'] + '/last'
        if os.path.exists(last):
            self.__rmdir(last)
        try:
            os.rename(self.__dir['session log root'], last)
        except OSError:
            print("could not rename " + self.__dir['session log root'] + " to " + last + " ...")

    def __engine_is_idle(self, engine):
        """Return True if *engine* is active and has nothing but get_properties queued."""
        index = engine['index']
        prop = self.__hub['mec'].get_properties(targets=[index], block=False)
        status = prop.client.queue_status()[index][1]['pending']
        return engine['active'] and (status == 'None' or
                                     re.match('^get_properties', status) is not None)

    def __find_engine(self):
        """Return an idle engine record, starting new engines if none is free.

        Tries up to 5 new engines, polling all engines 5 times (0.25 s apart)
        after each start.  Raises Exception("no engines available") otherwise.
        """
        for engine in self.__hub['engines']:
            if self.__engine_is_idle(engine):
                return engine
        for attempt in range(5):
            self.__start_engine()
            for poll in range(5):
                for engine in self.__hub['engines']:
                    if self.__engine_is_idle(engine):
                        return engine
                time.sleep(0.25)
        raise Exception("no engines available")

    def __live_engine_ids(self):
        """Return the set of engine ids currently registered with the controller."""
        ids = self.__hub['mec'].get_ids()
        if ids is None:
            ids = []
        return set(ids)

    def __start_engine(self, host='localhost', num=1):
        """Start *num* local ipengine processes and register them.

        Each engine's stdout/stderr go through os.pipe() pairs watched by the
        pipe minder, which hands output to log_message().  Returns the index
        in the engine list of the last engine started.

        Raises Exception for non-local hosts, num <= 0, or when a new engine
        id does not appear within about 10 seconds.
        """
        if host != 'localhost':
            raise Exception("remote hosts not currently supported...\nplease report as JIRA ticket so that we know someone is interested in remote tasks")
        if num <= 0:
            raise Exception("__start_engine called with num == 0")

        log_root = self.__dir['session log root']
        if not os.path.exists(log_root):
            self.__mkdir(log_root)

        # Wait up to ~10 s for the controller to write the engine FURL file.
        for i in range(20):
            if os.path.exists(self.__furl['engine']):
                break
            time.sleep(0.5)

        for i in range(num):
            engine = {'host': host, 'active': True}

            if self.__hub['mec'] is None:
                self.__setup_mec()

            preset = self.__live_engine_ids()
            engine['stdout'] = os.pipe()
            engine['stderr'] = os.pipe()
            # NOTE(review): the parent keeps its copies of the pipe write
            # ends open, so readers never see EOF when an engine exits;
            # confirm whether mindpipes relies on that before closing them.
            minder = self.__hub['pipe minder']
            minder.watch(engine['stdout'][0], log_message, {'out': 'stdout', 'engine': engine})
            minder.watch(engine['stderr'][0], log_message, {'out': 'stderr', 'engine': engine})

            cmd = [self.__helpers['ipengine'], '--furl-file=' + self.__furl['engine']]
            if _ipython_accepts_ipythondir():
                cmd.append('--ipythondir=' + self.__dir['rc'])
            cmd.append('--logfile=' + log_root + "/engine.")
            engine['proc'] = subprocess.Popen(cmd, stdout=engine['stdout'][1],
                                              stderr=engine['stderr'][1])

            # The new engine is identified as the id that was not there before.
            postset = self.__live_engine_ids()
            try_count = 0
            while not (postset - preset):
                postset = self.__live_engine_ids()
                if try_count > 20:
                    raise Exception("could not start ipengine")
                time.sleep(0.5)
                try_count += 1

            engine['index'] = (postset - preset).pop()
            engine['loaded tasks'] = {}
            engine['setup'] = False
            engine['log'] = log_root + '/engine.' + str(engine['proc'].pid) + ".log"
            engine['active'] = True
            self.__hub['engines'].append(engine)

        return len(self.__hub['engines']) - 1

    def __initialize_helpers(self):
        """Pick up ipcontroller/ipengine paths from casa['helpers'], if set.

        Walks the call stack to find the outermost interactive/casapy frame
        ('<string>', 'ipython console' or 'casapy.py') and reads its global
        'casa' dict (populated in casapy.py).
        """
        frames = inspect.stack()
        stacklevel = 0
        for k, record in enumerate(frames):
            fname = record[1]
            if fname == "<string>" or fname.find('ipython console') > 0 or fname.find("casapy.py") > 0:
                stacklevel = k

        myf = sys._getframe(stacklevel).f_globals

        casa = myf.get('casa') if isinstance(myf, dict) else None
        if isinstance(casa, dict) and isinstance(casa.get('helpers'), dict):
            helpers = casa['helpers']
            for name in ('ipcontroller', 'ipengine'):
                if name in helpers:
                    self.__helpers[name] = helpers[name]

    def __initialize(self):
        """Start the configured number of engines on every configured host."""
        for host, count in self.__hub['engine hosts'].items():
            if count > 0:
                self.__start_engine(host, count)
        self.__hub['initialized'] = True

    def __start_hub(self):
        """Launch the ipcontroller process for this session."""
        log_root = self.__dir['session log root']
        self.__mkdir(log_root)
        cmd = [self.__helpers['ipcontroller'], '-xy',
               '--client-cert-file=' + self.__cert['client'],
               '--engine-cert-file=' + self.__cert['engine'],
               '--engine-furl-file=' + self.__furl['engine'],
               '--multiengine-furl-file=' + self.__furl['mec'],
               '--task-furl-file=' + self.__furl['tc']]
        if _ipython_accepts_ipythondir():
            cmd.append('--ipythondir=' + self.__dir['rc'])
        cmd.append('--logfile=' + log_root + "/controller.")
        # NOTE(review): stdout is a PIPE that nothing in this module reads;
        # a chatty controller could block once the pipe buffer fills.
        self.__hub['proc'] = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                              stderr=subprocess.STDOUT)
        self.__hub['log'] = log_root + '/controller.' + str(self.__hub['proc'].pid) + ".log"

    def __setup_mec(self):
        """Connect the MultiEngineClient once the controller's mec FURL exists (waits up to ~5 s)."""
        for i in range(10):
            if os.path.exists(self.__furl['mec']):
                break
            time.sleep(0.5)
        time.sleep(0.25)
        self.__hub['mec'] = client.MultiEngineClient(self.__furl['mec'])

    def __setup_engine(self, engine):
        """Prepare a fresh engine's namespace (casa state, sys.path, cwd, SIGINT).

        Pushes the global 'casa' dict from the interactive frame, prepends
        the task path to sys.path, records the startup directory, ignores
        SIGINT and imports casalog.  No-op once engine['setup'] is True.
        """
        if engine['setup'] is True:
            return

        # Needed to allow pushing of the global 'casa' state dictionary.
        frames = inspect.stack()
        stacklevel = 0
        for k, record in enumerate(frames):
            if record[1].find('ipython console') > 0:
                stacklevel = k
        myf = sys._getframe(stacklevel).f_globals

        block = True
        targets = [engine['index']]
        mec = self.__hub['mec']
        mec.push({'casa': myf['casa']}, targets=targets)

        mec.execute('import sys', targets=targets)
        # Insert in reverse so sys.path ends up in task-path order; repr()
        # gives a correctly quoted literal for any path.
        for p in reversed(self.__hub['task path']):
            mec.execute("sys.path.insert(0," + repr(p) + ")", block=block, targets=targets)

        mec.execute('import os', block=block, targets=targets)
        mec.execute('_startup_dir_ = os.getcwd( )', block=block, targets=targets)
        mec.execute('import signal', block=block, targets=targets)
        mec.execute("original_sigint_handler = signal.signal(signal.SIGINT,signal.SIG_IGN)",
                    block=block, targets=targets)
        mec.execute('from taskinit import casalog', targets=targets)
        engine['setup'] = True

    def __init__(self, task_path=None, engines=None):
        """Configure directories/FURLs and start the controller.

        task_path -- directories prepended to each engine's sys.path
                     (default ['']).
        engines   -- {host: count} engines to start on first execute()
                     (default {'localhost': 1}).
        """
        if task_path is None:
            task_path = ['']
        if engines is None:
            engines = {'localhost': 1}

        self.__initialize_helpers()

        if '__CASARCDIR__' in os.environ:
            self.__dir['home'] = os.environ['__CASARCDIR__'] + "/tm"

        self.__dir['rc'] = self.__dir['home'] + '/rc'
        self.__dir['furl'] = self.__dir['home'] + '/furl'
        self.__dir['log root'] = self.__dir['home'] + '/log'

        self.__dir['session log root'] = self.__engine_log_root()

        self.__hub['engine hosts'] = engines
        self.__hub['task path'] = task_path

        # Per-process file names so concurrent sessions do not collide.
        pid = str(os.getpid())
        self.__furl['engine'] = self.__dir['furl'] + '/ipcontroller-engine.' + pid + '.furl'
        self.__furl['mec'] = self.__dir['furl'] + '/ipcontroller-mec.' + pid + '.furl'
        self.__furl['tc'] = self.__dir['furl'] + '/ipcontroller-tc.' + pid + '.furl'

        self.__cert['client'] = self.__dir['furl'] + '/ipcontroller-client.' + pid + '.pem'
        self.__cert['engine'] = self.__dir['furl'] + '/ipcontroller-engine.' + pid + '.pem'

        # NOTE(review): 'init atend' starts out False, so as written
        # __finalize is never registered with atexit; confirm this is
        # intentional before changing it.
        if self.__hub['init atend']:
            atexit.register(taskmanager.__finalize, self)
            self.__hub['init atend'] = False

        if self.__hub['pipe minder'] is None:
            self.__hub['pipe minder'] = mindpipes()
            self.__hub['pipe minder'].start()

        self.__mkdir(self.__dir['rc'])
        self.__mkdir(self.__dir['furl'])
        self.__mkdir(self.__dir['log root'])

        self.__clean_furls()
        # jagonzal (CAS-4322): Don't load task manager at the engine level
        if 'CASA_ENGINE' not in os.environ:
            self.__start_hub()

    def __clean_furls(self):
        """Remove this process's FURL files if they exist."""
        for key in ('engine', 'mec', 'tc'):
            if os.path.exists(self.__furl[key]):
                os.unlink(self.__furl[key])

    def __engine_log_root(self):
        """Return the per-process session log directory under the log root."""
        return self.__dir['log root'] + '/' + str(os.getpid())

    def __mkdir(self, p):
        """Create every component of *p* (treated as absolute), like ``mkdir -p``."""
        path = ""
        for part in p.split('/'):
            if part:
                path = path + "/" + part
                self.__mkdir1(path)

    def __mkdir1(self, p):
        """Ensure *p* is a directory; a non-directory in the way is renamed to p.N."""
        if os.path.exists(p):
            if not os.path.isdir(p):
                cnt = 1
                tmp = p + "."
                while os.path.exists(tmp + str(cnt)):
                    cnt += 1
                # Move the blocking file to the first free p.N name.
                os.rename(p, tmp + str(cnt))
                os.mkdir(p)
        else:
            os.mkdir(p)

    def __rmdir(self, p):
        """Recursively delete directory *p*, best effort.

        Failures are reported on stdout.  If *p* itself cannot be removed,
        it is renamed to p.nfs-<pid>.
        """
        if os.path.isdir(p):
            for f in os.listdir(p):
                child = p + '/' + f
                if os.path.isfile(child) or os.path.islink(child):
                    try:
                        os.remove(child)
                    except OSError:
                        print("could not remove: " + child)
                elif os.path.isdir(child):
                    self.__rmdir(child)
            try:
                os.rmdir(p)
            except OSError:
                print("could not remove: " + p)
                print("renaming to:  " + p + ".nfs-" + str(os.getpid()))
                try:
                    os.rename(p, p + ".nfs-" + str(os.getpid()))
                except OSError:
                    print(" ...renaming failed!!!")


# Module-level singleton; task code elsewhere uses `tm`.
if '__CASAPY_PYTHONDIR' in os.environ:
    tm = taskmanager(task_path=['', os.environ['__CASAPY_PYTHONDIR']])
else:
    tm = taskmanager(task_path=['', casadef.python_library_directory])