diff options
author | Kenneth Graunke <kenneth@whitecape.org> | 2012-04-07 11:31:46 -0700 |
---|---|---|
committer | Kenneth Graunke <kenneth@whitecape.org> | 2012-04-07 11:31:46 -0700 |
commit | 9cf240c30ae6b7d318166a86055f3f44e988dea1 (patch) | |
tree | 55ee951220fd5fcec4329c2049f0e467722c43c4 /framework |
Initial commit lol :(
Diffstat (limited to 'framework')
-rw-r--r-- | framework/__init__.py | 0 | ||||
-rwxr-xr-x | framework/process.py | 49 | ||||
-rwxr-xr-x | framework/threading.py | 106 |
3 files changed, 155 insertions, 0 deletions
diff --git a/framework/__init__.py b/framework/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/framework/__init__.py diff --git a/framework/process.py b/framework/process.py new file mode 100755 index 0000000..fd8df4b --- /dev/null +++ b/framework/process.py @@ -0,0 +1,49 @@ +# +# Copyright © 2012 Intel Corporation +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice (including the next +# paragraph) shall be included in all copies or substantial portions of the +# Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +import sys +import time +from threading import Thread, Timer +import subprocess + +__all__ = ['runProgram'] + +def runProgram(cmd, timeout): + """Run a program with a timeout in case it never terminates. + + cmd -- List of command and arguments. See subprocess.Popen. + timeout -- Fractional number of seconds to wait before killing the program. + + Returns a 4-tuple: (stdout, stderr, return code, <finished before timeout>) + """ + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + timeoutExpired = [] # A bool would make sense, but it needs to be mutable. + def timerCallback(): + timeoutExpired.append(True) + proc.kill() + t = Timer(timeout, timerCallback) #### XXX: USE CONFIGURABLE TIMEOUT. + t.start() + + out, err = proc.communicate() + t.cancel() + + return (out, err, proc.returncode, not timeoutExpired) diff --git a/framework/threading.py b/framework/threading.py new file mode 100755 index 0000000..29e55a0 --- /dev/null +++ b/framework/threading.py @@ -0,0 +1,106 @@ +# +# Copyright © 2012 Intel Corporation +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice (including the next +# paragraph) shall be included in all copies or substantial portions of the +# Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +import sys +import threading +import time +from threading import Thread, Timer +from multiprocessing import cpu_count +from collections import deque +import subprocess + +__all__ = ['processQueue'] + +class Worker(Thread): + def __init__(self, queue, handleItem): + Thread.__init__(self) + self.queue = queue + self.handle = handleItem + self.seqno = 0 + self.start() + + def run(self): + while True: + self.seqno += 1 + try: + item = self.queue.pop() + except IndexError: + break + self.handle(item) + +def processQueue(items, process, num_threads=0): + """ + Concurrently processes the given queue of items using a thread pool. + + The main thread will block until all threads are complete or until + SIGINT/KeyboardInterrupt is received. If interrupted, the main thread + will empty the queue and wait for any outstanding tasks to complete + before continuing. It does not rethrow KeyboardInterrupt. + + items -- the queue to process. Will be turned into a deque. + process -- callback for items. void function taking one argument. + num_threads -- number of worker threads to create. If 0 or None, + use the current number of CPUs. + """ + if not num_threads: + num_threads = cpu_count() + + queue = deque(items) + + workers = [Worker(queue, process) for i in range(num_threads)] + try: + for w in workers: + w.join() + except KeyboardInterrupt: + print("Interrupted. Discarding remaining tasks.") + queue.clear() + print("Waiting for threads to quiesce...") + while threading.activeCount() > 1: + pass + +def runProgram(cmd): + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + timeoutExpired = [] + def timerCallback(): + timeoutExpired.append(True) + proc.kill() + t = Timer(3.5, timerCallback) #### XXX: USE CONFIGURABLE TIMEOUT. + t.start() + + out, err = proc.communicate() + t.cancel() + + return (out, err, timeoutExpired) + + +def process(item): + out, err, timedOut = runProgram(item) + if timedOut: + print("expired") + else: + print("success", err) + +def main(): + processAsynchronously([['/home/kwg/Projects/piglit/bin/glsl-max-varyings', '-auto'] for i in range(3)], process) + +if __name__ == "__main__": + main() |