casa  $Rev:20696$
 All Classes Namespaces Files Functions Variables
mindpipes.py
Go to the documentation of this file.
00001 import threading
00002 import select
00003 import time
00004 import base64
00005 import uuid
00006 import os
00007 import posix
00008 import fcntl
00009 
00010 class mindpipes (threading.Thread) :
00011     __change = threading.Lock( )
00012     __queue_lock = threading.Lock( )
00013     __queue = { }
00014 
00015     class nub :
00016         __watcher = None
00017         __id = None
00018         def __init__(self,watcher,id):
00019             self.__watcher = watcher
00020             self.__id = str(id)
00021         def id ( self ) :
00022             return self.__id;
00023         def unwatch ( self ) :
00024             self.__watcher.remove(self.__id)
00025 
00026     def __init__(self):
00027         threading.Thread.__init__(self)
00028         self.__change.acquire( )
00029         self.setDaemon(True)
00030 
00031     def watch ( self, fd, callback, data=None ):
00032         fcntl.fcntl(fd, fcntl.F_SETFL, posix.O_NDELAY)
00033         id = base64.b64encode(uuid.uuid4().bytes,'..')[0:10]
00034         self.__queue_lock.acquire( )
00035         while self.__queue.has_key(id):
00036             id = base64.b64encode(uuid.uuid4().bytes,'..')[0:10]
00037         self.__queue[id] = { 'pipe': fd, 'callback': callback, 'buffer': '', 'data': data }
00038         self.__queue_lock.release( )
00039         while not self.__change.locked( ) :
00040             time.sleep(0.1)
00041         self.__change.release( )
00042         return self.nub(self,id)
00043 
00044     def remove ( self, nubbin ):
00045         self.__queue_lock.acquire( )
00046         if self.__queue.has_key(nubbin.id( )):
00047             x = self.queue.pop(nubbin.id( ))
00048             x.pop('buffer')
00049             self.__queue_lock.release( )
00050             while not self.__change.locked( ) :
00051                 time.sleep(0.1)
00052             self.__change.release( )
00053             return x
00054         else:
00055             self.__queue_lock.release( )
00056             return None
00057 
00058 
00059     def run( self ) :
00060         while ( 1 ) :
00061             self.__queue_lock.acquire( )
00062             read_fds = [ ]
00063             for key in self.__queue.keys( ) :
00064                 read_fds.append(self.__queue[key]['pipe'])
00065             self.__queue_lock.release( )
00066             if not self.__change.locked( ):
00067                 self.__change.acquire( )
00068             while self.__change.locked( ) :
00069                 if len(read_fds) > 0 :
00070                     self.__select(read_fds)
00071                 else:
00072                     self.__change.acquire( )
00073                     break
00074 
00075     def __select( self, read_fds ) :
00076         timeout = 3
00077         (read, write, excep) = select.select(read_fds, [ ], [ ], timeout)
00078 #       print "select ended: (" + str(len(read)) + "," + str(len(write)) + "," + str(len(excep)) + ")"
00079         for file in read:
00080             line = os.read(file,9216)
00081             lines = line.splitlines( )
00082             self.__queue_lock.acquire( )
00083             for key in self.__queue.keys( ) :
00084                 if self.__queue[key]['pipe'] == file:
00085                     if self.__queue[key]['buffer'] !=  '':
00086                         lines[0] = self.__queue[key]['buffer'] + lines[0]
00087                         self.__queue[key]['buffer'] = ''
00088                     if line[len(line)-1] != '\n':
00089                         self.__queue[key]['buffer'] = lines.pop()
00090                     if len(lines) > 0:
00091                         self.__queue[key]['callback'](self.__queue[key]['data'],file,lines)
00092                     break
00093             self.__queue_lock.release( )