1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
#
# Copyright 2008 Google Inc. All Rights Reserved.
"""Test for cli."""
import unittest, os, sys, tempfile, StringIO
import common
from autotest_lib.cli import atest, topic_common, rpc
from autotest_lib.frontend.afe.json_rpc import proxy
from autotest_lib.client.common_lib.test_utils import mock
# Debugging switches for this test module.  When BOTH flags are true,
# cli_unittest.run_cmd() does not call mock_io() before invoking atest.main().
CLI_USING_PDB = False
# Passed as the `debug` argument of mock.mock_god() in cli_unittest.setUp().
CLI_UT_DEBUG = False
def create_file(content):
    """Write `content` to a new temporary file and return the file's path.

    The file is created with tempfile.mkstemp(text=True).  It is NOT removed
    by this function; the caller owns the returned path.

    Args:
        content: the data handed to os.write() for the new file.

    Returns:
        The path of the temporary file that was created.
    """
    (fp, filename) = tempfile.mkstemp(text=True)
    try:
        os.write(fp, content)
    finally:
        # Always release the descriptor, even if the write fails, so a bad
        # `content` value does not leak an open file descriptor.
        os.close(fp)
    return filename
class ExitException(Exception):
    """Marker exception the stubbed sys.exit is told to raise in run_cmd()."""
class cli_unittest(unittest.TestCase):
    """Base TestCase providing helpers for exercising the atest CLI.

    setUp() creates a mock_god and uses it to stub rpc.afe_comm.run and
    sys.exit; tearDown() removes those stubs again.  The helper methods let a
    test script the expected RPCs, run a command line through atest.main()
    and check the text written to stdout/stderr.

    The list defaults used in the signatures below are only ever read, never
    mutated, so sharing them between calls is harmless.
    """

    def setUp(self):
        """Create the mock_god and stub out the RPC layer and sys.exit."""
        super(cli_unittest, self).setUp()
        self.god = mock.mock_god(debug=CLI_UT_DEBUG)
        self.god.stub_class_method(rpc.afe_comm, 'run')
        self.god.stub_function(sys, 'exit')

    def tearDown(self):
        """Undo every stub installed through self.god."""
        super(cli_unittest, self).tearDown()
        self.god.unstub_all()

    def assertEqualNoOrder(self, x, y, message=None):
        """Assert that x and y hold the same elements when compared as sets.

        Ordering and duplicate entries are ignored because both arguments
        are converted to sets before the comparison.
        """
        self.assertEqual(set(x), set(y), message)

    def assertWords(self, string, to_find=[], not_in=[]):
        """Assert which substrings do and do not occur in `string`.

        Args:
            string: the text to inspect.
            to_find: substrings that must each occur in `string`.
            not_in: substrings that must not occur in `string`.
        """
        # assertTrue replaces the deprecated assert_ alias, which newer
        # unittest versions no longer provide.
        for word in to_find:
            self.assertTrue(word in string,
                            "Could not find '%s' in: %s" % (word, string))
        for word in not_in:
            self.assertTrue(word not in string,
                            "Found (and shouldn't have) '%s' in: %s" %
                            (word, string))

    def _check_output(self, out='', out_words_ok=[], out_words_no=[],
                      err='', err_words_ok=[], err_words_no=[]):
        """Check captured stdout/stderr text against the expected words.

        For each stream: if any expected or forbidden words are given, they
        are checked with assertWords(); otherwise the stream must be empty.
        """
        if out_words_ok or out_words_no:
            self.assertWords(out, out_words_ok, out_words_no)
        else:
            self.assertEqual('', out)

        if err_words_ok or err_words_no:
            self.assertWords(err, err_words_ok, err_words_no)
        else:
            self.assertEqual('', err)

    def assertOutput(self, obj, results,
                     out_words_ok=[], out_words_no=[],
                     err_words_ok=[], err_words_no=[]):
        """Call obj.output(results) and obj.show_all_failures(), then check
        what they wrote to stdout/stderr with _check_output().
        """
        self.god.mock_io()
        try:
            obj.output(results)
            obj.show_all_failures()
        finally:
            # Restore the real streams even when obj raises, so the failure
            # is reported on the real stdout/stderr and later tests are not
            # left with mocked streams.
            (out, err) = self.god.unmock_io()
        self._check_output(out, out_words_ok, out_words_no,
                           err, err_words_ok, err_words_no)

    def mock_rpcs(self, rpcs):
        """Record the RPC calls the command under test is expected to make.

        rpcs is a list of tuples, each representing one RPC:
            (op, dargs, success, expected)
        where dargs is a dict passed as keyword arguments of the expected
        call.  If success is true the stubbed call returns `expected`;
        otherwise it raises proxy.JSONRPCException(expected).
        """
        comm = rpc.afe_comm.run
        for (op, dargs, success, expected) in rpcs:
            call = comm.expect_call(op, **dargs)
            if success:
                call.and_return(expected)
            else:
                call.and_raises(proxy.JSONRPCException(expected))

    def run_cmd(self, argv, rpcs=[], exit_code=None,
                out_words_ok=[], out_words_no=[],
                err_words_ok=[], err_words_no=[]):
        """Run atest.main() with sys.argv set to argv and check its output.

        Args:
            argv: the list assigned to sys.argv before atest.main() runs.
                It is not restored afterwards.
            rpcs: list of (op, dargs, success, expected) tuples describing
                the expected RPCs, see mock_rpcs().
            exit_code: set this if you expect the command to call sys.exit();
                the stubbed sys.exit is then expected to be called with this
                value.  Leave as None when no exit is expected.
            out_words_ok, out_words_no: words that must / must not appear on
                stdout.
            err_words_ok, err_words_no: words that must / must not appear on
                stderr.

        Returns:
            The (out, err) tuple returned by self.god.unmock_io().
        """
        sys.argv = argv

        self.mock_rpcs(rpcs)

        # NOTE(review): when both debug flags are set mock_io() is skipped,
        # yet unmock_io() is still called below (as in the original code).
        # Confirm that mock_god.unmock_io() tolerates that.
        if not (CLI_USING_PDB and CLI_UT_DEBUG):
            self.god.mock_io()
        try:
            if exit_code is not None:
                sys.exit.expect_call(exit_code).and_raises(ExitException)
                self.assertRaises(ExitException, atest.main)
            else:
                atest.main()
        finally:
            # Always hand the streams back, otherwise an unexpected error in
            # atest.main() would leave stdout/stderr mocked.
            (out, err) = self.god.unmock_io()
        self.god.check_playback()
        self._check_output(out, out_words_ok, out_words_no,
                           err, err_words_ok, err_words_no)
        return (out, err)
|