summaryrefslogtreecommitdiff
path: root/insanity/threads.py
blob: 32d3056870c86a50df4cec989be2d0d93464381b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# GStreamer QA system
#
#       threads.py
#
# Copyright (c) 2008, Edward Hervey <bilboed@bilboed.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
# Boston, MA 02110-1301, USA.

"""
Convenience methods and classes for multi-threading
"""

# code from pitivi/threads.py

import Queue
import threading
import gobject
import traceback
from insanity.log import error, warning, debug

class Thread(threading.Thread, gobject.GObject):
    """
    GObject-powered thread
    """

    __gsignals__ = {
        "done" : ( gobject.SIGNAL_RUN_LAST,
                   gobject.TYPE_NONE,
                   ( ))
        }

    def __init__(self):
        threading.Thread.__init__(self)
        gobject.GObject.__init__(self)

    def stop(self):
        """ stop the thread, do not override """
        self.abort()
        self.emit("done")

    def run(self):
        """ thread processing """
        self.process()
        gobject.idle_add(self.emit, "done")

    def process(self):
        """ Implement this in subclasses """
        raise NotImplementedError

    def abort(self):
        """ Abort the thread. Subclass have to implement this method ! """
        pass

gobject.type_register(Thread)

class CallbackThread(Thread):

    def __init__(self, callback, *args, **kwargs):
        Thread.__init__(self)
        self.callback = callback
        self.args = args
        self.kwargs = kwargs

    def process(self):
        self.callback(*self.args, **self.kwargs)

gobject.type_register(CallbackThread)

class ActionQueueThread(threading.Thread):
    """
    Thread for serializing actions.

    Actions can be added in queueAction()

    If you no longer wish to use this thread, add a
    notifier callback by using queueFinalAction().
    The thread will exit after calling that final action.

    If you wish to abort the thread, just call abort() and
    the Thread will return as soon as possible.
    """

    def __init__(self):
        threading.Thread.__init__(self)
        self._lock = threading.Condition()
        # if set to True, the thread will exit even though
        # there are remaining actions
        self._abort = False
        # if set to True, the thread will exit when there's
        # no longer any actions in the queue.
        self._exit = False
        # list of callables with arguments/kwargs
        self._queue = []

    def run(self):
        # do something
        debug("Starting in process...")
        self._lock.acquire()
        while True:
            debug("queue:%d _exit:%r _abort:%r",
                    len(self._queue), self._exit,
                    self._abort)
            if self._abort:
                debug("aborting")
                self._lock.release()
                return

            while len(self._queue) == 0:
                debug("queue:%d _exit:%r _abort:%r",
                        len(self._queue), self._exit,
                        self._abort)
                if self._exit:
                    self._lock.release()
                    return
                debug("waiting for cond")
                self._lock.wait()
                debug("cond was triggered")
                if self._abort:
                    self._lock.release()
                    return
            method, args, kwargs = self._queue.pop(0)
            self._lock.release()
            try:
                debug("about to call %r", method)
                method(*args, **kwargs)
            except:
                error("There was a problem calling %r", method)
                error(traceback.format_exc())
            finally:
                debug("Finished calling %r, re-acquiring lock",
                      method)
            self._lock.acquire()

    def abort(self):
        self._lock.acquire()
        self._abort = True
        self._lock.notify()
        self._lock.release()

    def queueAction(self, method, *args, **kwargs):
        """
        Queue an action.
        Returns True if the action was queued, else False.
        """
        res = False
        debug("about to queue %r", method)
        self._lock.acquire()
        debug("Got lock to queue, _abort:%r, _exit:%r",
                self._abort, self._exit)
        if not self._abort and not self._exit:
            self._queue.append((method, args, kwargs))
            self._lock.notify()
            res = True
        debug("about to release lock")
        self._lock.release()
        debug("lock released, result:%r", res)
        return res

    def queueFinalAction(self, method, *args, **kwargs):
        """
        Set a last action to be called.
        """
        res = False
        debug("about to queue %r", method)
        self._lock.acquire()
        debug("Got lock to queue, _abort:%r, _exit:%r",
                self._abort, self._exit)
        if not self._abort and not self._exit:
            self._queue.append((method, args, kwargs))
            res = True
        self._exit = True
        self._lock.notify()
        debug("about to release lock")
        self._lock.release()
        debug("lock released, result:%r", res)
        return res

class FileReadingThread(threading.Thread):
    """
    Helper class to implement asynchronous reading of a file
    in a separate thread. Pushes read lines on a queue to
    be consumed in another thread.
    """

    def __init__(self, fd, queue):
        threading.Thread.__init__(self)
        self._fd = fd
        self._queue = queue
        self._lock = threading.Condition()
        self._exit = False
        self._ignore = False

    def setQueue(self, queue):
        old_q = self._queue
        old_q.mutex.acquire()
        self._queue = queue
        old_q.mutex.release()

    def run(self):
        '''The body of the tread: read lines and put them on the queue.'''
        self._exit = False
        while True:
            line =  self._fd.readline()
            self._lock.acquire()
            if not self._ignore:
                self._queue.put(line)

            if not line:
                self._lock.release()
                break

            if self._exit:
                self._lock.release()
                return

            self._lock.release()

    def setIgnore(self, ignore):
        self._lock.acquire()
        self._ignore = ignore
        self._lock.release()

    def exit(self):
        self._lock.acquire()
        self._exit = True
        self._lock.notify()
        self._lock.release()

class RedirectTerminalOuputThread(threading.Thread):
    """
    Class implementing terminal stderr/stdout redirection to a file
    allowing the redirection to be changed during the lifetime of the
    subprocess.

    This needs to be done in separate threads to avoid deadlocks
    """

    def __init__(self, process, outfile_path, errfile_path):
        threading.Thread.__init__(self)
        self._lock = threading.Condition()
        # if set to True, the thread will exit even though
        # there are remaining actions
        self._abort = False
        # if set to True, the thread will exit
        self._exit = False
        # list of callables with arguments/kwargs
        self._process = process

        # The only way to avoid deadlocks is to have one thread for each of
        # stdout and stderr (Queue is thread safe)
        self.stdout_queue = Queue.Queue()
        self.stdout_reader = FileReadingThread(process.stdout, self.stdout_queue)
        self.stdout_reader.start()
        self.stderr_queue = Queue.Queue()
        self.stderr_reader = FileReadingThread(process.stderr, self.stderr_queue)
        self.stderr_reader.start()

        # We are working with the paths here because the file descriptor
        # are not shared between the various threads
        self.setStdoutFile(outfile_path)
        self.setStderrFile(errfile_path)

    def setStderrFile(self, errfile_path):
        self._lock.acquire()

        # We change the reader queue as we want to empty the current one
        n_queue = Queue.Queue()
        self.stderr_reader.setQueue(n_queue)
        self._emptyErrQueue()
        self.stderr_queue = n_queue
        if errfile_path:
            self._stderrfile = open(errfile_path, "a")
            self.stdout_reader.setIgnore(False)
            self._lock.notify()
        else:
            self.stdout_reader.setIgnore(True)
            self._stderrfile = None
        self._lock.release()

    def setStdoutFile(self, outfile_path):
        self._lock.acquire()

        # We change the reader queue as we want to empty the current one
        n_queue = Queue.Queue()
        self.stdout_reader.setQueue(n_queue)
        self._emptyOutQueue()
        self.stdout_queue = n_queue

        if outfile_path:
            self._stdoutfile = open(outfile_path, "a")
            self.stdout_reader.setIgnore(False)
            self._lock.notify()
        else:
            self.stdout_reader.setIgnore(True)
            self._stdoutfile = None

        self._lock.release()

    def _emptyErrQueue(self):
        while not self.stderr_queue.empty():
            self._write_err(self.stderr_queue.get())

    def _emptyOutQueue(self):
        while not self.stdout_queue.empty():
            self._write_out(self.stdout_queue.get())

    def _finalize(self):
        #Empty all the queue and make sure the reader thread are done
        self._emptyErrQueue()
        self._emptyOutQueue()
        self.stdout_reader.exit()
        self.stderr_reader.exit()


    def run(self):
        debug("Starting in process...")

        while True:
            self._lock.acquire()
            if self._stderrfile is None and self._stdoutfile is None:
                self._lock.wait()

            if self._exit:
                self._lock.release()

                debug("exiting %s", self.stdout_queue)
                self._finalize()
                return

            if not self.stderr_queue.empty():
                self._write_err(self.stderr_queue.get())

            if not self.stdout_queue.empty():
                self._write_out(self.stdout_queue.get())

            if self._abort:
                self._lock.release()
                debug("aborting")
                self._finalize()
                return

            self._lock.release()

    def _write_err(self, data):
        if data and self._stderrfile:
            self._stderrfile.writelines(data)

    def _write_out(self, data):
        if data and self._stderrfile:
            self._stdoutfile.writelines(data)

    def exit(self):
        self._lock.acquire()
        self._exit = True
        self._lock.notify()
        self._lock.release()

    def abort(self):
        self._lock.acquire()
        self._abort = True
        self._lock.notify()
        self._lock.release()


class ThreadMaster(gobject.GObject):
    """
    Controls all thread
    """

    def __init__(self):
        gobject.GObject.__init__(self)
        self.threads = []

    def addThread(self, threadclass, *args):
        # IDEA : We might need a limit of concurrent threads ?
        # ... or some priorities ?
        # FIXME : we should only accept subclasses of our Thread class
        debug("Adding thread of type %r" % threadclass)
        thread = threadclass(*args)
        thread.connect("done", self._threadDoneCb)
        self.threads.append(thread)
        debug("starting it...")
        thread.start()
        debug("started !")

    def _threadDoneCb(self, thread):
        debug("thread %r is done" % thread)
        self.threads.remove(thread)

    def stopAllThreads(self):
        debug("stopping all threads")
        joinedthreads = 0
        while(joinedthreads < len(self.threads)):
            for thread in self.threads:
                debug("Trying to stop thread %r" % thread)
                try:
                    thread.join()
                    joinedthreads += 1
                except:
                    warning("what happened ??")