casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables
taskmanager.py
Go to the documentation of this file.
00001 from IPython.kernel import client
00002 from IPython.kernel.multiengineclient import PendingResult
00003 from IPython.kernel import error as iperror
00004 from mindpipes import mindpipes
00005 import subprocess
00006 import inspect
00007 import signal
00008 import string
00009 import atexit
00010 import time
00011 import sets
00012 import sys
00013 import os
00014 import re
00015 import casadef
00016 
00017 from  casac import *
00018 from IPython.Release import version
casalog = casac.logsink()

def log_message(state, file, lines):
    """Post captured engine output lines to the CASA log sink.

    state -- dict with key 'out' ("stdout" or "stderr") and key 'engine',
             the engine record.  The engine's 'current task' entry is used as
             the log origin; it is set to 'taskmanager' when missing.
    file  -- not used by this function (callers pass a stream identifier).
    lines -- iterable of text lines; each one is posted separately.

    Lines from stderr are posted with priority "WARN", everything else with
    priority "INFO".
    """
    engine_record = state['engine']
    # remember a default origin on the engine record itself (same as before)
    engine_record.setdefault('current task', 'taskmanager')
    casalog.origin(str(engine_record['current task']))

    priority = "WARN" if state['out'] == "stderr" else "INFO"
    for text in lines:
        casalog.post(text, priority)
00031 
00032 #def log_message( state, file, lines ) :
00033 #    for line in lines:
00034 #        print "[" + str(state['engine']['current task']) + "/" + str(state['out']) + "]: " + line
00035 
class taskmanager(object):
    """Manage IPython engines that run CASA tasks asynchronously.

    All bookkeeping lives in class-level dictionaries (``__dir``, ``__cert``,
    ``__furl``, ``__hub``, ``__helpers``), so every instance shares the same
    controller process, engine list and result map.  ``execute`` launches a
    task on an idle engine and returns an integer receipt, ``retrieve`` polls
    a receipt, and ``asyncwait`` blocks until a list of receipts has finished.
    """
    # expanduser('~') returns $HOME when it is set (as before) and falls back
    # to the password database instead of raising KeyError when it is not.
    __dir = {'home': os.path.expanduser('~') + "/.casa/tm"}
    __cert = {}
    __furl = {}
    __hub = {'init atend': False, 'proc': None, 'engines': [], 'engine hosts': {},
             'mec': None, 'tasks initialized': [], 'task path': [], 'initialized': False,
             'log root': None, 'execute count': 0, 'result map': {}, 'pipe minder': None}
    __helpers = {'ipcontroller': 'ipcontroller', 'ipengine': 'ipengine'}

    # Statuses after which retrieve() never reports anything new.
    __terminal_states = ('done', 'failed', 'died')

    def retrieve(self, receipt):
        """Poll the task identified by ``receipt`` (as returned by ``execute``).

        Returns a dict with keys 'result' and 'status'.  'status' is one of
        'pending' (still running), 'done' (finished; 'result' holds the task's
        return value pulled from the engine), 'died' (IPython reported an
        invalid engine id) or 'failed' (any other error while collecting).
        Once the task has finished, its captured stdout/stderr is forwarded to
        the log via ``log_message``.

        Raises Exception("invalid receipt: ...") for unknown receipts.
        """
        result_map = self.__hub['result map']
        try:
            entry = result_map[receipt]
            if isinstance(entry['result'], PendingResult):
                output = entry['result'].get_result(block=False)
                if output is None:
                    return {'result': None, 'status': 'pending'}

                # bound before either stream is logged (previously only the
                # stdout branch bound it, so stderr-only output hit NameError)
                engine = entry['engine']
                result_name = "result_%04d" % receipt
                pulled = self.__hub['mec'].pull(result_name, targets=[engine['index']])[0]
                entry['result'] = {'result': pulled, 'status': 'done'}
                entry['result output'] = output[0]
                captured = entry['result output']
                for stream, fd in (('stdout', 1), ('stderr', 2)):
                    if stream in captured:
                        log_message({'out': stream, 'engine': engine}, fd,
                                    captured[stream].splitlines())
                return entry['result']

        except iperror.InvalidEngineID:
            if receipt in result_map:
                result_map[receipt]['result'] = {'result': None, 'status': 'died'}

        except Exception:
            # Exception (not a bare except) so that Ctrl-C while polling is
            # not recorded as a permanent task failure.
            if receipt in result_map:
                result_map[receipt]['result'] = {'result': None, 'status': 'failed'}

        if receipt in result_map:
            return result_map[receipt]['result']
        raise Exception("invalid receipt: " + str(receipt))

    def execute(self, taskname, *args, **kwargs):
        """Queue ``taskname(*args, **kwargs)`` on an idle engine without blocking.

        Starts the configured engines on first use, imports ``task_<taskname>``
        on the chosen engine if this engine has not imported it yet, pushes the
        arguments and queues the call.  The task runs in this process's current
        working directory; the engine changes back to its startup directory
        afterwards.

        Returns the integer receipt to pass to ``retrieve``/``asyncwait``.
        """
        if not self.__hub['initialized']:
            self.__initialize()

        engine = self.__find_engine()
        targets = [engine['index']]

        if self.__hub['mec'] is None:
            self.__setup_mec()

        if engine['setup'] is not True:
            self.__setup_engine(engine)

        mec = self.__hub['mec']

        if taskname not in engine['loaded tasks']:
            mec.execute("from task_" + taskname + " import " + taskname, block=False, targets=targets)
            engine['loaded tasks'][taskname] = True

        count = self.__hub['execute count']
        self.__hub['execute count'] = count + 1
        mec.push({'args': args, 'kwargs': kwargs}, targets=targets)
        result_name = "result_%04d" % count
        engine['current task'] = taskname
        # repr() yields a correctly quoted literal even when the directory
        # name contains quotes or backslashes (plain paths are unchanged).
        command = ("casalog.origin('taskmanager'); " +
                   "casalog.post('##### async task launch:     " + taskname + " ########################'); " +
                   "os.chdir( " + repr(os.getcwd()) + " ); " +
                   result_name + " = " + taskname + "(*args,**kwargs); " +
                   "os.chdir( _startup_dir_ ); " +
                   "casalog.origin('taskmanager'); " +
                   "casalog.post('##### async task completion: " + taskname + " ########################')")
        self.__hub['result map'][count] = {'engine': engine,
                                           'result': mec.execute(command, block=False, targets=targets)}
        ## not sure if any pause is necessary or not...
        time.sleep(0.25)
        return count

    def asyncwait(self, handles=None, poll_interval=10):
        """Block until every receipt in ``handles`` has reached a final state.

        A receipt is final once ``retrieve`` reports 'done', 'failed' or
        'died'.  Previously only 'done' ended the wait, so a failed or dead task
        made this loop forever.  Returns immediately when ``handles`` is empty
        or not a list/tuple.  ``poll_interval`` is the number of seconds slept
        between polls; there is no sleep once everything has finished.
        """
        if not isinstance(handles, (list, tuple)) or not handles:
            return
        while True:
            if all(self.retrieve(handle)['status'] in self.__terminal_states
                   for handle in handles):
                return
            time.sleep(poll_interval)

    def __finalize(self):
        """Best-effort shutdown: stop the controller and engines, then archive logs.

        The session log directory is renamed to '<log root>/last', replacing
        any previous one.
        """
        if self.__hub['mec'] is not None:
            try:
                self.__hub['mec'].kill(controller=True)
            except Exception:
                # keep going: the engines and log cleanup below still matter
                pass
            for e in self.__hub['engines']:
                try:
                    if e['proc'].pid is not None:
                        os.kill(e['proc'].pid, signal.SIGKILL)
                except Exception:
                    pass

        try:
            if self.__hub['proc'] is not None and self.__hub['proc'].pid is not None:
                os.kill(self.__hub['proc'].pid, signal.SIGINT)
        except Exception:
            pass

        self.__clean_furls()
        last = self.__dir['log root'] + '/last'
        if os.path.exists(last):
            self.__rmdir(last)
        try:
            os.rename(self.__dir['session log root'], last)
        except OSError:
            print("could not rename " + self.__dir['session log root'] + " to " + last + " ...")

    def __engine_available(self, engine):
        """Return True if ``engine`` is active and has nothing but get_properties pending.

        Always issues a non-blocking get_properties request first, so that its
        queue status can be inspected.
        """
        prop = self.__hub['mec'].get_properties(targets=[engine['index']], block=False)
        status = prop.client.queue_status()[engine['index']][1]['pending']
        return engine['active'] and (status == 'None' or re.match('^get_properties', status) is not None)

    def __find_engine(self):
        """Return an idle engine, starting new engines if none is free.

        Makes up to 5 attempts to start an engine.  After each start, the
        engines are polled 5 times, 0.25 s apart.  Raises
        Exception("no engines available") if no engine becomes idle.
        """
        for engine in self.__hub['engines']:
            if self.__engine_available(engine):
                return engine
        for _attempt in range(5):
            self.__start_engine()
            for _poll in range(5):
                for engine in self.__hub['engines']:
                    if self.__engine_available(engine):
                        return engine
                time.sleep(0.25)
        raise Exception("no engines available")

    def __engine_ids(self):
        """Return the set of engine ids currently known to the controller."""
        ids = self.__hub['mec'].get_ids()
        return set(ids if ids is not None else [])

    def __ipython_minor(self):
        """Return the second component of the IPython version string (e.g. 10 for '0.10.2')."""
        return int(version.split('.')[1])

    def __start_engine(self, host='localhost', num=1):
        """Launch ``num`` local ipengine processes and register them.

        Each engine's stdout/stderr is sent through OS pipes watched by the
        pipe minder, which forwards the output to ``log_message``.  Raises
        Exception if ``host`` is not 'localhost', if ``num`` <= 0, or if a new
        engine id does not appear within about 10 s.

        Returns the index (in the engine list) of the last engine added.
        """
        if host != 'localhost':
            raise Exception("remote hosts not currently supported...\nplease report as JIRA ticket so that we know someone is interested in remote tasks")
        if num <= 0:
            raise Exception("__start_engine called with num == 0")

        log_root = self.__dir['session log root']
        if not os.path.exists(log_root):
            self.__mkdir(log_root)

        # wait up to ~10 s for the controller to write the engine furl file
        for _ in range(20):
            if os.path.exists(self.__furl['engine']):
                break
            time.sleep(0.5)

        for _ in range(num):
            engine = {'host': host, 'active': True}

            if self.__hub['mec'] is None:
                self.__setup_mec()

            preset = self.__engine_ids()
            # NOTE(review): the parent never closes the pipe write ends, so
            # the readers never see EOF and two descriptors leak per engine.
            # Left as is because how mindpipes reacts to EOF is not visible here.
            engine['stdout'] = os.pipe()
            engine['stderr'] = os.pipe()
            minder = self.__hub['pipe minder']
            minder.watch(engine['stdout'][0], log_message, {'out': 'stdout', 'engine': engine})
            minder.watch(engine['stderr'][0], log_message, {'out': 'stderr', 'engine': engine})

            # IPython 0.10.x dropped --ipythondir (it reads IPYTHONDIR instead)
            command = [self.__helpers['ipengine'], '--furl-file=' + self.__furl['engine']]
            if self.__ipython_minor() < 10:
                command.append('--ipythondir=' + self.__dir['rc'])
            command.append('--logfile=' + log_root + "/engine.")
            engine['proc'] = subprocess.Popen(command, stdout=engine['stdout'][1], stderr=engine['stderr'][1])

            # the new engine is identified by the id that was not there before
            postset = self.__engine_ids()
            try_count = 0
            while not (postset - preset):
                postset = self.__engine_ids()
                if try_count > 20:
                    raise Exception("could not start ipengine")
                time.sleep(0.5)
                try_count += 1

            engine['index'] = (postset - preset).pop()
            engine['loaded tasks'] = {}
            engine['setup'] = False
            engine['log'] = log_root + '/engine.' + str(engine['proc'].pid) + ".log"
            engine['active'] = True
            self.__hub['engines'].append(engine)

        return len(self.__hub['engines']) - 1

    def __interactive_globals(self, markers, exact_names=()):
        """Return the globals of the outermost stack frame from an interactive source.

        A frame matches when its filename is in ``exact_names`` or contains one
        of ``markers`` at an index > 0.  The last (outermost) match wins.  When
        nothing matches, the globals of this module are returned.
        """
        frames = inspect.stack()
        chosen = frames[0][0]
        for record in frames:
            filename = record[1]
            if filename in exact_names or any(filename.find(m) > 0 for m in markers):
                chosen = record[0]
        found = chosen.f_globals
        # drop frame references promptly to avoid reference cycles
        del frames, chosen, record
        return found

    def __initialize_helpers(self):
        """Pick up helper executables configured in the session's ``casa['helpers']``.

        When the interactive session's global ``casa`` dict (set up by
        casapy.py) has a 'helpers' dict, its 'ipcontroller' and 'ipengine'
        entries replace the default command names.
        """
        myf = self.__interactive_globals(('ipython console', 'casapy.py'), exact_names=("<string>",))
        casa = myf.get('casa') if isinstance(myf, dict) else None
        if isinstance(casa, dict) and isinstance(casa.get('helpers'), dict):
            helpers = casa['helpers']
            for name in ('ipcontroller', 'ipengine'):
                if name in helpers:
                    self.__helpers[name] = helpers[name]

    def __initialize(self):
        """Start the requested number of engines on each host, then mark the hub initialized."""
        for host, count in list(self.__hub['engine hosts'].items()):
            if count > 0:
                self.__start_engine(host, count)
        self.__hub['initialized'] = True

    def __start_hub(self):
        """Launch the ipcontroller process that engines and clients connect to."""
        log_root = self.__dir['session log root']
        self.__mkdir(log_root)
        # IPython 0.10.x dropped --ipythondir (it reads IPYTHONDIR instead)
        command = [self.__helpers['ipcontroller'], '-xy',
                   '--client-cert-file=' + self.__cert['client'],
                   '--engine-cert-file=' + self.__cert['engine'],
                   '--engine-furl-file=' + self.__furl['engine'],
                   '--multiengine-furl-file=' + self.__furl['mec'],
                   '--task-furl-file=' + self.__furl['tc']]
        if self.__ipython_minor() < 10:
            command.append('--ipythondir=' + self.__dir['rc'])
        command.append('--logfile=' + log_root + "/controller.")
        # NOTE(review): nothing in this class reads the controller's stdout
        # pipe; a chatty controller could block once the pipe buffer fills.
        self.__hub['proc'] = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

        self.__hub['log'] = log_root + '/controller.' + str(self.__hub['proc'].pid) + ".log"

    def __setup_mec(self):
        """Connect the MultiEngineClient, waiting up to ~5 s for its furl file."""
        for _ in range(10):
            if os.path.exists(self.__furl['mec']):
                break
            time.sleep(0.5)
        time.sleep(0.25)
        self.__hub['mec'] = client.MultiEngineClient(self.__furl['mec'])

    def __setup_engine(self, engine):
        """Prepare an engine for running tasks; no-op if already done.

        Pushes the interactive session's global ``casa`` dict, puts the task
        path at the front of the engine's ``sys.path``, records the engine's
        startup directory in ``_startup_dir_``, makes the engine ignore SIGINT
        and imports ``casalog`` there.
        """
        if engine['setup'] is True:
            return

        # needed to allow pushing of the global 'casa' state dictionary
        myf = self.__interactive_globals(('ipython console',))

        mec = self.__hub['mec']
        block = True
        targets = [engine['index']]
        mec.push({'casa': myf['casa']}, targets=targets)

        mec.execute('import sys', targets=targets)
        # insert in reverse so the engine's sys.path ends up in task-path order;
        # repr() quotes each path safely
        for p in reversed(self.__hub['task path']):
            mec.execute("sys.path.insert(0," + repr(p) + ")", block=block, targets=targets)

        mec.execute('import os', block=block, targets=targets)
        mec.execute('_startup_dir_ = os.getcwd( )', block=block, targets=targets)
        mec.execute('import signal', block=block, targets=targets)
        mec.execute("original_sigint_handler = signal.signal(signal.SIGINT,signal.SIG_IGN)", block=block, targets=targets)
        mec.execute('from taskinit import casalog', targets=targets)
        engine['setup'] = True

    def __init__(self, task_path=None, engines=None):
        """Prepare working directories and start the IPython controller.

        task_path -- directories put at the front of every engine's sys.path
                     (default ['']).
        engines   -- mapping of host name to the number of engines started on
                     first use (default {'localhost': 1}); only 'localhost'
                     is accepted by __start_engine.

        The controller is not started when CASA_ENGINE is set in the
        environment.
        """
        if task_path is None:
            task_path = ['']
        if engines is None:
            engines = {'localhost': 1}

        self.__initialize_helpers()

        if '__CASARCDIR__' in os.environ:
            self.__dir['home'] = os.environ['__CASARCDIR__'] + "/tm"

        self.__dir['rc'] = self.__dir['home'] + '/rc'
        self.__dir['furl'] = self.__dir['home'] + '/furl'
        self.__dir['log root'] = self.__dir['home'] + '/log'

        self.__dir['session log root'] = self.__engine_log_root()

        self.__hub['engine hosts'] = engines
        self.__hub['task path'] = task_path

        # file names are made unique per CASA process via the pid
        pid = str(os.getpid())
        self.__furl['engine'] = self.__dir['furl'] + '/ipcontroller-engine.' + pid + '.furl'
        self.__furl['mec'] = self.__dir['furl'] + '/ipcontroller-mec.' + pid + '.furl'
        self.__furl['tc'] = self.__dir['furl'] + '/ipcontroller-tc.' + pid + '.furl'

        self.__cert['client'] = self.__dir['furl'] + '/ipcontroller-client.' + pid + '.pem'
        self.__cert['engine'] = self.__dir['furl'] + '/ipcontroller-engine.' + pid + '.pem'

        # NOTE(review): 'init atend' starts out False, so this branch never
        # runs and __finalize is never registered with atexit.  The test looks
        # inverted (probably meant "register once"), but enabling it changes
        # interpreter-exit behaviour, so it is left as found -- confirm intent.
        if self.__hub['init atend']:
            atexit.register(taskmanager.__finalize, self)
            self.__hub['init atend'] = False

        if self.__hub['pipe minder'] is None:
            self.__hub['pipe minder'] = mindpipes()
            self.__hub['pipe minder'].start()

        self.__mkdir(self.__dir['rc'])
        self.__mkdir(self.__dir['furl'])
        self.__mkdir(self.__dir['log root'])

        self.__clean_furls()
        # jagonzal (CAS-4322): Don't load task manager at the engine level
        if 'CASA_ENGINE' not in os.environ:
            self.__start_hub()

    def __clean_furls(self):
        """Remove this process's engine/mec/tc furl files if they exist."""
        for key in ('engine', 'mec', 'tc'):
            path = self.__furl[key]
            if os.path.exists(path):
                os.unlink(path)

    def __engine_log_root(self):
        """Return the per-process log directory '<log root>/<pid>'."""
        return self.__dir['log root'] + '/' + str(os.getpid())

    def __mkdir(self, p):
        """Create directory ``p`` and any missing parents.

        Each prefix is passed to __mkdir1, which moves aside non-directories
        that are in the way.  Relative paths are now created relative to the
        cwd (previously they were silently rooted at '/').
        """
        dirsep = '/'
        components = [c for c in p.split(dirsep) if c]
        root = dirsep if p.startswith(dirsep) else ""
        for depth in range(1, len(components) + 1):
            self.__mkdir1(root + dirsep.join(components[:depth]))

    def __mkdir1(self, p):
        """Ensure ``p`` is a directory.

        If ``p`` exists but is not a directory, it is renamed to the first
        unused name 'p.<n>' (n >= 1) and a directory is created in its place.
        """
        if os.path.exists(p):
            if not os.path.isdir(p):
                cnt = 1
                while os.path.exists(p + "." + str(cnt)):
                    cnt += 1
                # rename to the free name just found (previously the counter
                # was ignored and the file went to "p.", clobbering it)
                os.rename(p, p + "." + str(cnt))
                os.mkdir(p)
        else:
            try:
                os.mkdir(p)
            except OSError:
                # another process may have created it in the meantime
                if not os.path.isdir(p):
                    raise

    def __rmdir(self, p):
        """Recursively delete directory ``p``, reporting failures instead of raising.

        If the final rmdir fails, the directory is renamed to
        'p.nfs-<pid>' instead.
        """
        dirsep = '/'
        if not os.path.isdir(p):
            return
        for name in os.listdir(p):
            child = p + dirsep + name
            if os.path.isfile(child) or os.path.islink(child):
                try:
                    os.remove(child)
                except OSError:
                    print("could not remove: " + child)
            elif os.path.isdir(child):
                self.__rmdir(child)
        try:
            os.rmdir(p)
        except OSError:
            fallback = p + ".nfs-" + str(os.getpid())
            print("could not remove: " + p)
            print("renaming to:      " + fallback)
            try:
                os.rename(p, fallback)
            except OSError:
                print("               ...renaming failed!!!")
# Module-level task manager shared by the CASA session.  Engines search the
# current directory first, then the CASA python library directory
# ($__CASAPY_PYTHONDIR when set, otherwise casadef.python_library_directory).
tm = taskmanager(task_path=['',
                            os.environ['__CASAPY_PYTHONDIR']
                            if '__CASAPY_PYTHONDIR' in os.environ
                            else casadef.python_library_directory])