diff options
Diffstat (limited to 'totem/plugin/threadpool.py')
-rw-r--r-- | totem/plugin/threadpool.py | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/totem/plugin/threadpool.py b/totem/plugin/threadpool.py new file mode 100644 index 0000000..31ac724 --- /dev/null +++ b/totem/plugin/threadpool.py @@ -0,0 +1,240 @@ +## {{{ http://code.activestate.com/recipes/203871/ (r3) +import threading +from time import sleep + +# Ensure booleans exist (not needed for Python 2.2.1 or higher) +try: + True +except NameError: + False = 0 + True = not False + +class Task: + def __init__(self, task, taskCallback, *args, **kwargs): + self.task = task + self.taskCallback = taskCallback + self.args = args + self.kwargs = kwargs + +class ThreadPool: + + """Flexible thread pool class. Creates a pool of threads, then + accepts tasks that will be dispatched to the next available + thread.""" + + def __init__(self, numThreads): + + """Initialize the thread pool with numThreads workers.""" + + self.__threads = [] + self.__resizeLock = threading.Condition(threading.Lock()) + self.__taskLock = threading.Condition(threading.Lock()) + self.__tasks = [] + self.__isJoining = False + self.setThreadCount(numThreads) + + def setThreadCount(self, newNumThreads): + + """ External method to set the current pool size. Acquires + the resizing lock, then calls the internal version to do real + work.""" + + # Can't change the thread count if we're shutting down the pool! + if self.__isJoining: + return False + + self.__resizeLock.acquire() + try: + self.__setThreadCountNolock(newNumThreads) + finally: + self.__resizeLock.release() + return True + + def __setThreadCountNolock(self, newNumThreads): + + """Set the current pool size, spawning or terminating threads + if necessary. Internal use only; assumes the resizing lock is + held.""" + + # If we need to grow the pool, do so + while newNumThreads > len(self.__threads): + newThread = ThreadPoolThread(self) + self.__threads.append(newThread) + newThread.start() + # If we need to shrink the pool, do so + while newNumThreads < len(self.__threads): + self.__threads[0].goAway() + del self.__threads[0] + + def getThreadCount(self): + + """Return the number of threads in the pool.""" + + self.__resizeLock.acquire() + try: + return len(self.__threads) + finally: + self.__resizeLock.release() + + def queueTask(self, task, taskCallback, *args, **kwargs): + + """Insert a task into the queue. task must be callable; + args and taskCallback can be None.""" + + if self.__isJoining == True: + return False + if not callable(task): + return False + + self.__taskLock.acquire() + try: + self.__tasks.append(Task(task, taskCallback, *args, **kwargs)) + return True + finally: + self.__taskLock.release() + + def getNextTask(self): + + """ Retrieve the next task from the task queue. For use + only by ThreadPoolThread objects contained in the pool.""" + + self.__taskLock.acquire() + try: + if self.__tasks == []: + return None + else: + return self.__tasks.pop(0) + finally: + self.__taskLock.release() + + def joinAll(self, waitForTasks = True, waitForThreads = True): + """ Clear the task queue and terminate all pooled threads, + optionally allowing the tasks and threads to finish.""" + + # Mark the pool as joining to prevent any more task queueing + self.__isJoining = True + + # Wait for tasks to finish + if waitForTasks: + while self.__tasks != []: + sleep(0.1) + + # Tell all the threads to quit + self.__resizeLock.acquire() + try: + # Wait until all threads have exited + if waitForThreads: + for t in self.__threads: + t.goAway() + for t in self.__threads: + t.join() + # print t,"joined" + del t + self.__setThreadCountNolock(0) + self.__isJoining = True + + # Reset the pool for potential reuse + self.__isJoining = False + finally: + self.__resizeLock.release() + + def add_task(self, task, *args, **kwargs): + self.queueTask(task, None, *args, **kwargs) + + def wait_completion(self): + self.joinAll() + + def get_num_tasks(self): + + """ Retrieve the number of tasks queued in the thread pool.""" + + self.__taskLock.acquire() + num = len(self.__tasks) + self.__taskLock.release() + return num + + def is_busy(self): + return self.get_num_tasks() != 0 + +class ThreadPoolThread(threading.Thread): + + """ Pooled thread class. """ + + threadSleepTime = 0.1 + + def __init__(self, pool): + + """ Initialize the thread and remember the pool. """ + + threading.Thread.__init__(self) + self.__pool = pool + self.__isDying = False + + def run(self): + + """ Until told to quit, retrieve the next task and execute + it, calling the callback if any. """ + + while self.__isDying == False: + task = self.__pool.getNextTask() + # If there's nothing to do, just sleep a bit + if task is None: + sleep(ThreadPoolThread.threadSleepTime) + elif task.taskCallback is None: + task.task(*task.args, **task.kwargs) + else: + task.taskCallback(task.task(*task.args, **task.kwargs)) + + def goAway(self): + + """ Exit the run loop next time through.""" + + self.__isDying = True + +# Usage example +if __name__ == "__main__": + + from random import randrange + + # Sample task 1: given a start and end value, shuffle integers, + # then sort them + + def sortTask(data): + print "SortTask starting for ", data + numbers = range(data[0], data[1]) + for a in numbers: + rnd = randrange(0, len(numbers) - 1) + a, numbers[rnd] = numbers[rnd], a + print "SortTask sorting for ", data + numbers.sort() + print "SortTask done for ", data + return "Sorter ", data + + # Sample task 2: just sleep for a number of seconds. + + def waitTask(data): + print "WaitTask starting for ", data + print "WaitTask sleeping for %d seconds" % data + sleep(data) + return "Waiter", data + + # Both tasks use the same callback + + def taskCallback(data): + print "Callback called for", data + + # Create a pool with three worker threads + + pool = ThreadPool(3) + + # Insert tasks into the queue and let them run + pool.queueTask(sortTask, taskCallback, (1000, 100000)) + pool.queueTask(waitTask, taskCallback, 5) + pool.queueTask(sortTask, taskCallback, (200, 200000)) + pool.queueTask(waitTask, taskCallback, 2) + pool.queueTask(sortTask, taskCallback, (3, 30000)) + pool.queueTask(waitTask, taskCallback, 7) + + # When all tasks are finished, allow the threads to terminate + pool.joinAll() +## end of http://code.activestate.com/recipes/203871/ }}} |