diff options
Diffstat (limited to 'build/tap-driver')
-rwxr-xr-x | build/tap-driver | 284 |
1 files changed, 284 insertions, 0 deletions
diff --git a/build/tap-driver b/build/tap-driver new file mode 100755 index 0000000..eb22462 --- /dev/null +++ b/build/tap-driver @@ -0,0 +1,284 @@ +#!/usr/bin/python + +# Copyright (C) 2013 Red Hat, Inc. +# +# Cockpit is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2.1 of the License, or +# (at your option) any later version. +# +# Cockpit is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Cockpit; If not, see <http://www.gnu.org/licenses/>. + +# +# This is a TAP driver for automake +# +# In particular it leaves stderr untouched, and is cleaner than the +# one implemented in shell that is making the rounds. +# +# This implements the automake "Custom Test Driver" protocol: +# https://www.gnu.org/software/automake/manual/html_node/Custom-Test-Drivers.html +# +# This consumes the Test Anything Protocol (ie: TAP) +# https://metacpan.org/pod/release/PETDANCE/Test-Harness-2.64/lib/Test/Harness/TAP.pod +# + +import argparse +import os +import select +import subprocess +import sys + +class Driver: + def __init__(self, command, args): + self.argv = command + self.output = "" + self.test_name = args.test_name + self.log = open(args.log_file, "w") + self.trs = open(args.trs_file, "w") + self.color_tests = args.color_tests + self.expect_failure = args.expect_failure + self.reported = { } + self.test_plan = None + self.late_plan = False + self.errored = False + self.bail_out = False + + def report(self, code, num, *args): + CODES = { + "XPASS": '\x1b[0;31m', # red + "FAIL": '\x1b[0;31m', # red + "PASS": '\x1b[0;32m', # grn + "XFAIL": '\x1b[1;32m', # lgn + "SKIP": '\x1b[1;34m', # blu + "ERROR": '\x1b[0;35m', # mgn + } + + # Print out to console + if self.color_tests: + if code in CODES: + sys.stdout.write(CODES[code]) + sys.stdout.write(code) + if self.color_tests: + sys.stdout.write('\x1b[m') + sys.stdout.write(": ") + sys.stdout.write(self.test_name) + sys.stdout.write(" ") + if num: + sys.stdout.write(str(num)) + sys.stdout.write(" ") + for arg in args: + sys.stdout.write(str(arg)) + sys.stdout.write("\n") + sys.stdout.flush() + + # Book keeping + if code in CODES: + if num != None: + self.reported[num] = code + self.trs.write(":test-result: %s\n" % code) + if code == "ERROR": + self.errored = True + + def result_pass(self, num, description): + if self.expect_failure: + self.report("XPASS", num, description) + else: + self.report("PASS", num, description) + + def result_fail(self, num, description): + if self.expect_failure: + self.report("XFAIL", num, description) + else: + self.report("FAIL", num, description) + + def result_skip(self, num, description, ok): + if self.expect_failure: + self.report("XFAIL", num, description) + else: + self.report("SKIP", num, description) + + def report_error(self, problem): + self.report("ERROR", None, problem) + + def consume_test_line(self, ok, data): + # It's an error if the caller sends a test plan in the middle of tests + if self.late_plan: + self.report_error("Got tests after late TAP test plan") + self.late_plan = False + + # Parse out a number and then description + (num, unused, description) = data.partition(" ") + try: + num = int(num) + except ValueError: + self.report_error("Invalid test number: %s" % data) + return + description = description.lstrip() + + # Special case if description starts with this, then skip + if description.lower().startswith("# skip"): + self.result_skip(num, description, ok) + elif ok: + self.result_pass(num, description) + else: + self.result_fail(num, description) + + def consume_test_plan(self, first, last): + # Only one test plan is supported + if self.test_plan: + self.report_error("Get a second TAP test plan") + return + + try: + first = int(first) + last = int(last) + except ValueError: + self.report_error("Invalid test plan: %s..%s" % (first, last)) + return + + self.test_plan = (first, last) + self.late_plan = self.reported and True or False + + def consume_bail_out(self, line): + self.bail_out = True + self.report("SKIP", 0, line) + + def drain(self): + (ready, unused, self.output) = self.output.rpartition("\n") + for line in ready.split("\n"): + self.log.write(line) + self.log.write("\n") + + if line.startswith("ok "): + self.consume_test_line(True, line[3:]) + elif line.startswith("not ok "): + self.consume_test_line(False, line[7:]) + elif line and line[0].isdigit() and ".." in line: + (first, unused, last) = line.partition("..") + self.consume_test_plan(first, last) + elif line.lower().startswith("bail out!"): + self.consume_bail_out(line) + + def execute(self): + try: + proc = subprocess.Popen(self.argv, close_fds=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except OSError, ex: + self.report_error("Couldn't run %s: %s" % (self.argv[0], str(ex))) + return + + outf = proc.stdout.fileno() + errf = proc.stderr.fileno() + rset = [outf, errf] + while len(rset) > 0: + ret = select.select(rset, [], [], 10) + if outf in ret[0]: + data = os.read(outf, 1024) + if data == "": + if self.output: + self.output += "\n" + rset.remove(outf) + else: + self.output += data + self.drain() + if errf in ret[0]: + data = os.read(errf, 1024) + if data == "": + rset.remove(errf) + self.log.write(data) + sys.stderr.write(data) + + proc.wait() + self.returncode = proc.returncode + + def run(self): + self.execute() + + failed = False + skipped = True + + # Basic collation of results + for (num, code) in self.reported.items(): + if code == "ERROR": + self.errored = True + elif code == "FAIL" or code == "XPASS": + failed = True + if code != "SKIP": + skipped = False + + # Check the plan + if not self.errored: + if not self.test_plan: + if not self.bail_out: + self.report_error("Didn't receive a TAP test plan") + else: + for i in range(self.test_plan[0], self.test_plan[1] + 1): + if i not in self.reported: + if self.bail_out: + self.report("SKIP", i, "- bailed out") + else: + self.report("ERROR", i, "- missing test") + skipped = False + self.errored = True + + if self.errored: + self.trs.write(":global-test-result: ERROR\n") + self.trs.write(":test-global-result: ERROR\n") + self.trs.write(":recheck: no\n") + elif failed: + self.trs.write(":global-test-result: FAIL\n") + self.trs.write(":test-global-result: FAIL\n") + self.trs.write(":recheck: yes\n") + elif skipped: + self.trs.write(":global-test-result: SKIP\n") + self.trs.write(":test-global-result: SKIP\n") + self.trs.write(":recheck: no\n") + if self.errored or failed: + self.trs.write(":copy-in-global-log: yes\n") + + # Process result code + return self.errored and 1 or 0 + +class YesNoAction(argparse.Action): + def __init__(self, option_strings, dest, **kwargs): + argparse.Action.__init__(self, option_strings, dest, **kwargs) + self.metavar = "[yes|no]" + def __call__(self, parser, namespace, values, option_string=None): + if not values or "yes" in values: + setattr(namespace, self.dest, True) + else: + setattr(namespace, self.dest, False) + +def main(argv): + parser = argparse.ArgumentParser(description='Automake TAP driver') + parser.add_argument('--test-name', metavar='NAME', + help='The name of the test') + parser.add_argument('--log-file', metavar='PATH.log', required=True, + help='The .log file the driver creates') + parser.add_argument('--trs-file', metavar='PATH.trs', required=True, + help='The .trs file the driver creates') + parser.add_argument('--color-tests', default=True, action=YesNoAction, + help='Whether the console output should be colorized or not') + parser.add_argument('--expect-failure', default=False, action=YesNoAction, + help="Whether the tested program is expected to fail") + parser.add_argument('--enable-hard-errors', default=False, action=YesNoAction, + help="Whether hard errors in the tested program are treated differently") + parser.add_argument('command', nargs='+', + help="A test command line to run") + args = parser.parse_args(argv[1:]) + + if not args.test_name: + args.test_name = os.path.basename(args.command[0]) + + driver = Driver(args.command, args) + return driver.run() + +if __name__ == "__main__": + sys.exit(main(sys.argv)) |