casa
$Rev:20696$
|
import threading
import select
import time
import base64
import uuid
import os
import posix
import fcntl


class mindpipes(threading.Thread):
    """Daemon thread that reads watched pipe descriptors and reports whole lines.

    Descriptors are registered with watch().  Once the thread is started,
    run() waits on them with select(), reads whatever is available, splits
    it into lines and hands the complete lines to the callback registered
    for that descriptor as ``callback(data, fd, lines)``.  A trailing
    partial line is held back until the rest of it arrives (or until end of
    file, when it is delivered as-is).

    The lines are whatever ``os.read`` returns split with ``splitlines()``:
    ``str`` on Python 2, ``bytes`` on Python 3.
    """

    class nub:
        """Handle returned by watch() for one registered descriptor."""

        def __init__(self, watcher, id):
            self.__watcher = watcher
            self.__id = str(id)

        def id(self):
            """Return the registration id as a string."""
            return self.__id

        def unwatch(self):
            """Stop watching the descriptor this handle was issued for."""
            # remove() calls .id() on its argument, so it must be given the
            # handle itself rather than the bare id string.
            self.__watcher.remove(self)

    def __init__(self):
        threading.Thread.__init__(self)
        # State is per instance: sharing it at class level made every
        # instance use the same queue and the same locks.
        self.__queue_lock = threading.Lock()   # guards self.__queue
        self.__queue = {}                      # id -> registration entry
        # Set whenever the set of descriptors to select on has changed, so
        # that run() rebuilds its descriptor list.
        self.__changed = threading.Event()
        self.daemon = True

    @staticmethod
    def __new_id():
        """Return a random 10 character id using the alphabet A-Z a-z 0-9 '.'."""
        raw = base64.b64encode(uuid.uuid4().bytes, b'..')[0:10]
        return str(raw.decode('ascii'))

    def watch(self, fd, callback, data=None):
        """Register *fd* and return a nub handle for the registration.

        fd       -- integer file descriptor; it is switched to non-blocking
                    mode (its other status flags are preserved).
        callback -- called as callback(data, fd, lines) from the watcher
                    thread for each batch of complete lines.
        data     -- opaque value passed back as the first callback argument.
        """
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | posix.O_NDELAY)
        with self.__queue_lock:
            key = self.__new_id()
            while key in self.__queue:
                key = self.__new_id()
            self.__queue[key] = {'pipe': fd, 'callback': callback,
                                 'buffer': b'', 'data': data, 'eof': False}
        self.__changed.set()
        return self.nub(self, key)

    def remove(self, nubbin):
        """Unregister the descriptor identified by the handle *nubbin*.

        Returns the registration as a dict with the keys 'pipe', 'callback'
        and 'data', or None when the handle is not (or no longer)
        registered.
        """
        with self.__queue_lock:
            entry = self.__queue.pop(nubbin.id(), None)
        if entry is None:
            return None
        # Drop the bookkeeping fields; callers only get what they passed in.
        entry.pop('buffer')
        entry.pop('eof', None)
        self.__changed.set()
        return entry

    def run(self):
        """Thread body: select on the registered descriptors forever."""
        while True:
            # Clear before taking the snapshot: a registration made after
            # this point sets the event again and forces another rebuild.
            self.__changed.clear()
            with self.__queue_lock:
                read_fds = [entry['pipe'] for entry in self.__queue.values()
                            if not entry['eof']]
            if not read_fds:
                # Nothing to select on; sleep until watch()/remove() signals.
                self.__changed.wait()
                continue
            while not self.__changed.is_set():
                self.__select(read_fds)

    def __select(self, read_fds):
        """Wait up to 3 seconds for input on *read_fds* and dispatch it."""
        timeout = 3
        readable, _, _ = select.select(read_fds, [], [], timeout)
        for fd in readable:
            chunk = os.read(fd, 9216)
            pending = None
            with self.__queue_lock:
                for entry in self.__queue.values():
                    if entry['pipe'] != fd:
                        continue
                    lines = chunk.splitlines()
                    if not chunk:
                        # End of file: deliver any held-back partial line and
                        # stop selecting on this descriptor, which would
                        # otherwise be reported readable on every call.
                        if entry['buffer']:
                            lines = [entry['buffer']]
                        entry['buffer'] = b''
                        entry['eof'] = True
                        self.__changed.set()
                    else:
                        if entry['buffer']:
                            lines[0] = entry['buffer'] + lines[0]
                            entry['buffer'] = b''
                        if not chunk.endswith(b'\n'):
                            # Last line is incomplete; keep it for next time.
                            entry['buffer'] = lines.pop()
                    if lines:
                        pending = (entry['callback'], entry['data'], lines)
                    break
            # Invoke the callback outside the lock so it may safely call
            # watch() or unwatch() itself.
            if pending is not None:
                callback, data, lines = pending
                callback(data, fd, lines)