summaryrefslogtreecommitdiff
path: root/server/frontend_unittest.py
blob: a80089cdc4307cf874d2bda47f6a305996bbbfe5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/usr/bin/python
#
# Copyright Gregory P. Smith, Google Inc 2008
# Released under the GPL v2

"""Tests for server.frontend."""

from cStringIO import StringIO
import os, sys, unittest
import common
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.frontend.afe import rpc_client_lib
from autotest_lib.server import frontend

# Short alias for the module-level global_config object; the tests below use
# it to override the SERVER/hostname value before building RPC clients.
GLOBAL_CONFIG = global_config.global_config


class BaseRpcClientTest(unittest.TestCase):
    """Common fixture for the RPC client test cases in this module.

    Every test gets a fresh mock god in self.god, with rpc_client_lib mocked
    up and utils.send_email replaced by a stub function.
    """

    def setUp(self):
        """Create the mock god and install the stubs the tests rely on."""
        self.god = fixture = mock.mock_god()
        fixture.mock_up(rpc_client_lib, 'rpc_client_lib')
        fixture.stub_function(utils, 'send_email')


    def tearDown(self):
        """Release the stubs by calling unstub_all() on the mock god."""
        fixture = self.god
        fixture.unstub_all()


class RpcClientTest(BaseRpcClientTest):
    """Tests for constructing frontend.RpcClient."""

    def test_init(self):
        """Check the proxy URL and AUTHORIZATION header used by RpcClient.

        With SERVER/hostname overridden to 'test-host' and LOGNAME set to
        'unittest-user', building an RpcClient for '/path' is expected to
        request a proxy for 'http://test-host/path' with that user name in
        the AUTHORIZATION header.
        """
        # LOGNAME is process-wide state: remember the current value so the
        # override below does not leak into tests that run after this one.
        saved_logname = os.environ.get('LOGNAME')
        os.environ['LOGNAME'] = 'unittest-user'
        try:
            GLOBAL_CONFIG.override_config_value('SERVER', 'hostname',
                                                'test-host')
            rpc_client_lib.get_proxy.expect_call(
                    'http://test-host/path',
                    headers={'AUTHORIZATION': 'unittest-user'})
            frontend.RpcClient('/path', None, None, None, None, None)
            self.god.check_playback()
        finally:
            # Put the environment back exactly as it was found, including
            # the case where LOGNAME was not set at all.
            if saved_logname is None:
                del os.environ['LOGNAME']
            else:
                os.environ['LOGNAME'] = saved_logname


class AFETest(BaseRpcClientTest):
    """Tests for frontend.AFE."""

    def test_result_notify(self):
        """Check what AFE.result_notify() prints for a passing fake job.

        The test expects one utils.send_email call and verifies that the
        text written to stdout contains the pass status, the platform name
        and the pieces of the TKO query URL listed below.
        """
        class fake_job(object):
            # Minimal stand-in carrying only the attributes set here; the
            # class itself (not an instance) is passed to result_notify().
            result = True
            name = 'nameFoo'
            id = 'idFoo'
            results_platform_map = {'NORAD' : {'Seeking_Joshua': ['WOPR']}}
        GLOBAL_CONFIG.override_config_value('SERVER', 'hostname', 'chess')
        rpc_client_lib.get_proxy.expect_call(
                'http://chess/afe/server/noauth/rpc/',
                headers={'AUTHORIZATION': 'david'})
        # utils.send_email is already stubbed by BaseRpcClientTest.setUp(),
        # so only the expectation has to be recorded here.
        utils.send_email.expect_any_call()

        my_afe = frontend.AFE(user='david')

        # Capture stdout for the duration of the call only; the finally
        # clause restores it even when result_notify() raises.
        captured = StringIO()
        self.god.stub_with(sys, 'stdout', captured)
        try:
            my_afe.result_notify(fake_job, 'userA', 'userB')
        finally:
            self.god.unstub(sys, 'stdout')
        output = captured.getvalue()

        self.god.check_playback()

        expected_fragments = (
            'PASSED',
            'WOPR',
            'http://chess/tko/compose_query.cgi?',
            'columns=test',
            'rows=machine_group',
            "condition=tag~'idFoo-%25'",
            'title=Report',
            'Sending email',
        )
        for fragment in expected_fragments:
            # Include the captured text so a failure shows what was printed.
            self.assertTrue(fragment in output,
                            '%r not found in output:\n%s' % (fragment, output))



# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()