summaryrefslogtreecommitdiff
path: root/totem/plugin/threadpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'totem/plugin/threadpool.py')
-rw-r--r--totem/plugin/threadpool.py240
1 files changed, 240 insertions, 0 deletions
diff --git a/totem/plugin/threadpool.py b/totem/plugin/threadpool.py
new file mode 100644
index 0000000..31ac724
--- /dev/null
+++ b/totem/plugin/threadpool.py
@@ -0,0 +1,240 @@
+## {{{ http://code.activestate.com/recipes/203871/ (r3)
+import threading
+from time import sleep
+
+# Ensure booleans exist (not needed for Python 2.2.1 or higher)
+try:
+ True
+except NameError:
+ False = 0
+ True = not False
+
+class Task:
+ def __init__(self, task, taskCallback, *args, **kwargs):
+ self.task = task
+ self.taskCallback = taskCallback
+ self.args = args
+ self.kwargs = kwargs
+
+class ThreadPool:
+
+ """Flexible thread pool class. Creates a pool of threads, then
+ accepts tasks that will be dispatched to the next available
+ thread."""
+
+ def __init__(self, numThreads):
+
+ """Initialize the thread pool with numThreads workers."""
+
+ self.__threads = []
+ self.__resizeLock = threading.Condition(threading.Lock())
+ self.__taskLock = threading.Condition(threading.Lock())
+ self.__tasks = []
+ self.__isJoining = False
+ self.setThreadCount(numThreads)
+
+ def setThreadCount(self, newNumThreads):
+
+ """ External method to set the current pool size. Acquires
+ the resizing lock, then calls the internal version to do real
+ work."""
+
+ # Can't change the thread count if we're shutting down the pool!
+ if self.__isJoining:
+ return False
+
+ self.__resizeLock.acquire()
+ try:
+ self.__setThreadCountNolock(newNumThreads)
+ finally:
+ self.__resizeLock.release()
+ return True
+
+ def __setThreadCountNolock(self, newNumThreads):
+
+ """Set the current pool size, spawning or terminating threads
+ if necessary. Internal use only; assumes the resizing lock is
+ held."""
+
+ # If we need to grow the pool, do so
+ while newNumThreads > len(self.__threads):
+ newThread = ThreadPoolThread(self)
+ self.__threads.append(newThread)
+ newThread.start()
+ # If we need to shrink the pool, do so
+ while newNumThreads < len(self.__threads):
+ self.__threads[0].goAway()
+ del self.__threads[0]
+
+ def getThreadCount(self):
+
+ """Return the number of threads in the pool."""
+
+ self.__resizeLock.acquire()
+ try:
+ return len(self.__threads)
+ finally:
+ self.__resizeLock.release()
+
+ def queueTask(self, task, taskCallback, *args, **kwargs):
+
+ """Insert a task into the queue. task must be callable;
+ args and taskCallback can be None."""
+
+ if self.__isJoining == True:
+ return False
+ if not callable(task):
+ return False
+
+ self.__taskLock.acquire()
+ try:
+ self.__tasks.append(Task(task, taskCallback, *args, **kwargs))
+ return True
+ finally:
+ self.__taskLock.release()
+
+ def getNextTask(self):
+
+ """ Retrieve the next task from the task queue. For use
+ only by ThreadPoolThread objects contained in the pool."""
+
+ self.__taskLock.acquire()
+ try:
+ if self.__tasks == []:
+ return None
+ else:
+ return self.__tasks.pop(0)
+ finally:
+ self.__taskLock.release()
+
+ def joinAll(self, waitForTasks = True, waitForThreads = True):
+ """ Clear the task queue and terminate all pooled threads,
+ optionally allowing the tasks and threads to finish."""
+
+ # Mark the pool as joining to prevent any more task queueing
+ self.__isJoining = True
+
+ # Wait for tasks to finish
+ if waitForTasks:
+ while self.__tasks != []:
+ sleep(0.1)
+
+ # Tell all the threads to quit
+ self.__resizeLock.acquire()
+ try:
+ # Wait until all threads have exited
+ if waitForThreads:
+ for t in self.__threads:
+ t.goAway()
+ for t in self.__threads:
+ t.join()
+ # print t,"joined"
+ del t
+ self.__setThreadCountNolock(0)
+ self.__isJoining = True
+
+ # Reset the pool for potential reuse
+ self.__isJoining = False
+ finally:
+ self.__resizeLock.release()
+
+ def add_task(self, task, *args, **kwargs):
+ self.queueTask(task, None, *args, **kwargs)
+
+ def wait_completion(self):
+ self.joinAll()
+
+ def get_num_tasks(self):
+
+ """ Retrieve the number of tasks queued in the thread pool."""
+
+ self.__taskLock.acquire()
+ num = len(self.__tasks)
+ self.__taskLock.release()
+ return num
+
+ def is_busy(self):
+ return self.get_num_tasks() != 0
+
+class ThreadPoolThread(threading.Thread):
+
+ """ Pooled thread class. """
+
+ threadSleepTime = 0.1
+
+ def __init__(self, pool):
+
+ """ Initialize the thread and remember the pool. """
+
+ threading.Thread.__init__(self)
+ self.__pool = pool
+ self.__isDying = False
+
+ def run(self):
+
+ """ Until told to quit, retrieve the next task and execute
+ it, calling the callback if any. """
+
+ while self.__isDying == False:
+ task = self.__pool.getNextTask()
+ # If there's nothing to do, just sleep a bit
+ if task is None:
+ sleep(ThreadPoolThread.threadSleepTime)
+ elif task.taskCallback is None:
+ task.task(*task.args, **task.kwargs)
+ else:
+ task.taskCallback(task.task(*task.args, **task.kwargs))
+
+ def goAway(self):
+
+ """ Exit the run loop next time through."""
+
+ self.__isDying = True
+
+# Usage example
+if __name__ == "__main__":
+
+ from random import randrange
+
+ # Sample task 1: given a start and end value, shuffle integers,
+ # then sort them
+
+ def sortTask(data):
+ print "SortTask starting for ", data
+ numbers = range(data[0], data[1])
+ for a in numbers:
+ rnd = randrange(0, len(numbers) - 1)
+ a, numbers[rnd] = numbers[rnd], a
+ print "SortTask sorting for ", data
+ numbers.sort()
+ print "SortTask done for ", data
+ return "Sorter ", data
+
+ # Sample task 2: just sleep for a number of seconds.
+
+ def waitTask(data):
+ print "WaitTask starting for ", data
+ print "WaitTask sleeping for %d seconds" % data
+ sleep(data)
+ return "Waiter", data
+
+ # Both tasks use the same callback
+
+ def taskCallback(data):
+ print "Callback called for", data
+
+ # Create a pool with three worker threads
+
+ pool = ThreadPool(3)
+
+ # Insert tasks into the queue and let them run
+ pool.queueTask(sortTask, taskCallback, (1000, 100000))
+ pool.queueTask(waitTask, taskCallback, 5)
+ pool.queueTask(sortTask, taskCallback, (200, 200000))
+ pool.queueTask(waitTask, taskCallback, 2)
+ pool.queueTask(sortTask, taskCallback, (3, 30000))
+ pool.queueTask(waitTask, taskCallback, 7)
+
+ # When all tasks are finished, allow the threads to terminate
+ pool.joinAll()
+## end of http://code.activestate.com/recipes/203871/ }}}