diff options
author | Thibault Saunier <thibault.saunier@collabora.com> | 2012-06-18 18:06:30 -0400 |
---|---|---|
committer | Thibault Saunier <thibault.saunier@collabora.com> | 2012-06-21 10:00:09 -0400 |
commit | a9fcd19d525371c371b5c087dd818e017eeb61f0 (patch) | |
tree | 55af8229560ee1abf04bf15b6609995b37e93599 | |
parent | 479d889f9f380a40338f247ef95370c1be813f14 (diff) |
insanity: Implement a class to redirect subprocess sterr and stdout to a file
This needs to be done into separate threads to avoid deadlocks as per Python
documentation
-rw-r--r-- | insanity/threads.py | 146 |
1 files changed, 145 insertions, 1 deletions
diff --git a/insanity/threads.py b/insanity/threads.py index 0dd8006..f822e9f 100644 --- a/insanity/threads.py +++ b/insanity/threads.py @@ -25,6 +25,7 @@ Convenience methods and classes for multi-threading # code from pitivi/threads.py +import Queue import threading import gobject import traceback @@ -106,7 +107,7 @@ class ActionQueueThread(threading.Thread): def run(self): # do something - debug("Starting in proces...") + debug("Starting in process...") self._lock.acquire() while True: debug("queue:%d _exit:%r _abort:%r", @@ -187,6 +188,149 @@ class ActionQueueThread(threading.Thread): debug("lock released, result:%r", res) return res +class FileReadingThread(threading.Thread): + """ + Helper class to implement asynchronous reading of a file + in a separate thread. Pushes read lines on a queue to + be consumed in another thread. + """ + + def __init__(self, fd, queue): + threading.Thread.__init__(self) + self._fd = fd + self._queue = queue + + def setQueue(self, queue): + old_q = self._queue + old_q.mutex.acquire() + self._queue = queue + old_q.mutex.release() + + def run(self): + '''The body of the tread: read lines and put them on the queue.''' + while True: + line = self._fd.readline() + self._queue.put(line) + if not line: + break + +class RedirectTerminalOuputThread(threading.Thread): + """ + Class implementing terminal stderr/stdout redirection to a file + allowing the redirection to be changed during the lifetime of the + subprocess. + + This needs to be done in separate threads to avoid deadlocks + """ + + def __init__(self, process, outfile_path, errfile_path): + threading.Thread.__init__(self) + self._lock = threading.Lock() + # if set to True, the thread will exit even though + # there are remaining actions + self._abort = False + # if set to True, the thread will exit + self._exit = False + # list of callables with arguments/kwargs + self._process = process + + # The only way to avoid deadlocks is to have one thread for each of + # stdout and stderr (Queue is thread safe) + self.stdout_queue = Queue.Queue() + self.stdout_reader = FileReadingThread(process.stdout, self.stdout_queue) + self.stdout_reader.start() + self.stderr_queue = Queue.Queue() + self.stderr_reader = FileReadingThread(process.stderr, self.stderr_queue) + self.stderr_reader.start() + + # We are working with the paths here because the file descriptor + # are not shared between the various threads + self.setStdoutFile(outfile_path) + self.setStderrFile(outfile_path) + + def setStderrFile(self, errfile_path): + self._lock.acquire() + + # We change the reader queue as we want to empty the current one + n_queue = Queue.Queue() + self.stderr_reader.setQueue(n_queue) + self._emptyErrQueue() + self.stderr_queue = n_queue + self._stderrfile = open(errfile_path, "a") + self._lock.release() + + def setStdoutFile(self, outfile_path): + self._lock.acquire() + + # We change the reader queue as we want to empty the current one + n_queue = Queue.Queue() + self.stdout_reader.setQueue(n_queue) + self._emptyOutQueue() + self.stdout_queue = n_queue + + self._stdoutfile = open(outfile_path, "a") + self._lock.release() + + def _emptyErrQueue(self): + while not self.stderr_queue.empty(): + self._write_err(self.stderr_queue.get()) + + def _emptyOutQueue(self): + while not self.stdout_queue.empty(): + self._write_out(self.stdout_queue.get()) + + def _finalize(self): + #Empty all the queue and make sure the reader thread are done + self._emptyErrQueue() + self._emptyOutQueue() + self.stdout_reader.join() + self.stderr_reader.join() + + + def run(self): + debug("Starting in process...") + + while True: + self._lock.acquire() + if self._exit: + self._lock.release() + + debug("exiting %s", self.stdout_queue) + self._finalize() + return + + if not self.stderr_queue.empty(): + self._write_err(self.stderr_queue.get()) + + if not self.stdout_queue.empty(): + self._write_out(self.stdout_queue.get()) + + if self._abort: + self._lock.release() + debug("aborting") + self._finalize() + return + + self._lock.release() + + def _write_err(self, data): + if data and self._stderrfile: + self._stderrfile.writelines(data) + + def _write_out(self, data): + if data and self._stderrfile: + self._stdoutfile.writelines(data) + + def exit(self): + self._lock.acquire() + self._exit = True + self._lock.release() + + def abort(self): + self._lock.acquire() + self._abort = True + self._lock.release() + class ThreadMaster(gobject.GObject): """ |