summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorThibault Saunier <thibault.saunier@collabora.com>2012-06-18 18:06:30 -0400
committerThibault Saunier <thibault.saunier@collabora.com>2012-06-21 10:00:09 -0400
commita9fcd19d525371c371b5c087dd818e017eeb61f0 (patch)
tree55af8229560ee1abf04bf15b6609995b37e93599
parent479d889f9f380a40338f247ef95370c1be813f14 (diff)
insanity: Implement a class to redirect subprocess sterr and stdout to a file
This needs to be done into separate threads to avoid deadlocks as per Python documentation
-rw-r--r--insanity/threads.py146
1 files changed, 145 insertions, 1 deletions
diff --git a/insanity/threads.py b/insanity/threads.py
index 0dd8006..f822e9f 100644
--- a/insanity/threads.py
+++ b/insanity/threads.py
@@ -25,6 +25,7 @@ Convenience methods and classes for multi-threading
# code from pitivi/threads.py
+import Queue
import threading
import gobject
import traceback
@@ -106,7 +107,7 @@ class ActionQueueThread(threading.Thread):
def run(self):
# do something
- debug("Starting in proces...")
+ debug("Starting in process...")
self._lock.acquire()
while True:
debug("queue:%d _exit:%r _abort:%r",
@@ -187,6 +188,149 @@ class ActionQueueThread(threading.Thread):
debug("lock released, result:%r", res)
return res
+class FileReadingThread(threading.Thread):
+ """
+ Helper class to implement asynchronous reading of a file
+ in a separate thread. Pushes read lines on a queue to
+ be consumed in another thread.
+ """
+
+ def __init__(self, fd, queue):
+ threading.Thread.__init__(self)
+ self._fd = fd
+ self._queue = queue
+
+ def setQueue(self, queue):
+ old_q = self._queue
+ old_q.mutex.acquire()
+ self._queue = queue
+ old_q.mutex.release()
+
+ def run(self):
+ '''The body of the tread: read lines and put them on the queue.'''
+ while True:
+ line = self._fd.readline()
+ self._queue.put(line)
+ if not line:
+ break
+
+class RedirectTerminalOuputThread(threading.Thread):
+ """
+ Class implementing terminal stderr/stdout redirection to a file
+ allowing the redirection to be changed during the lifetime of the
+ subprocess.
+
+ This needs to be done in separate threads to avoid deadlocks
+ """
+
+ def __init__(self, process, outfile_path, errfile_path):
+ threading.Thread.__init__(self)
+ self._lock = threading.Lock()
+ # if set to True, the thread will exit even though
+ # there are remaining actions
+ self._abort = False
+ # if set to True, the thread will exit
+ self._exit = False
+ # list of callables with arguments/kwargs
+ self._process = process
+
+ # The only way to avoid deadlocks is to have one thread for each of
+ # stdout and stderr (Queue is thread safe)
+ self.stdout_queue = Queue.Queue()
+ self.stdout_reader = FileReadingThread(process.stdout, self.stdout_queue)
+ self.stdout_reader.start()
+ self.stderr_queue = Queue.Queue()
+ self.stderr_reader = FileReadingThread(process.stderr, self.stderr_queue)
+ self.stderr_reader.start()
+
+ # We are working with the paths here because the file descriptor
+ # are not shared between the various threads
+ self.setStdoutFile(outfile_path)
+ self.setStderrFile(outfile_path)
+
+ def setStderrFile(self, errfile_path):
+ self._lock.acquire()
+
+ # We change the reader queue as we want to empty the current one
+ n_queue = Queue.Queue()
+ self.stderr_reader.setQueue(n_queue)
+ self._emptyErrQueue()
+ self.stderr_queue = n_queue
+ self._stderrfile = open(errfile_path, "a")
+ self._lock.release()
+
+ def setStdoutFile(self, outfile_path):
+ self._lock.acquire()
+
+ # We change the reader queue as we want to empty the current one
+ n_queue = Queue.Queue()
+ self.stdout_reader.setQueue(n_queue)
+ self._emptyOutQueue()
+ self.stdout_queue = n_queue
+
+ self._stdoutfile = open(outfile_path, "a")
+ self._lock.release()
+
+ def _emptyErrQueue(self):
+ while not self.stderr_queue.empty():
+ self._write_err(self.stderr_queue.get())
+
+ def _emptyOutQueue(self):
+ while not self.stdout_queue.empty():
+ self._write_out(self.stdout_queue.get())
+
+ def _finalize(self):
+ #Empty all the queue and make sure the reader thread are done
+ self._emptyErrQueue()
+ self._emptyOutQueue()
+ self.stdout_reader.join()
+ self.stderr_reader.join()
+
+
+ def run(self):
+ debug("Starting in process...")
+
+ while True:
+ self._lock.acquire()
+ if self._exit:
+ self._lock.release()
+
+ debug("exiting %s", self.stdout_queue)
+ self._finalize()
+ return
+
+ if not self.stderr_queue.empty():
+ self._write_err(self.stderr_queue.get())
+
+ if not self.stdout_queue.empty():
+ self._write_out(self.stdout_queue.get())
+
+ if self._abort:
+ self._lock.release()
+ debug("aborting")
+ self._finalize()
+ return
+
+ self._lock.release()
+
+ def _write_err(self, data):
+ if data and self._stderrfile:
+ self._stderrfile.writelines(data)
+
+ def _write_out(self, data):
+ if data and self._stderrfile:
+ self._stdoutfile.writelines(data)
+
+ def exit(self):
+ self._lock.acquire()
+ self._exit = True
+ self._lock.release()
+
+ def abort(self):
+ self._lock.acquire()
+ self._abort = True
+ self._lock.release()
+
class ThreadMaster(gobject.GObject):
"""