1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
#!/usr/bin/python
import unittest
import common
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.database import migrate, db_utils
class UtilsTest(unittest.TestCase):
    """Tests for db_utils, run against a mocked MigrationManager.

    Each test records the calls it expects on the mocked manager, invokes
    the db_utils function under test, and then asks the mock god to verify
    that the recorded calls were played back.
    """

    # Template for the existence query; the first substitution picks the
    # information_schema table, the escaped %%s is left as a placeholder.
    EXISTS_QUERY_BASE = ('SELECT table_name FROM information_schema.%s '
                         'WHERE table_schema = %%s')
    DB_NAME = 'test_db'

    def setUp(self):
        """Create the mock god and a mocked manager with stubbed methods."""
        self.god = mock.mock_god()
        self.manager = self.god.create_mock_class(migrate.MigrationManager,
                                                  'manager')
        for method_name in ('execute', 'get_db_name'):
            self.god.stub_function(self.manager, method_name)

    def tearDown(self):
        """Undo every stub installed through the mock god."""
        self.god.unstub_all()

    def test_check_exists(self):
        """check_exists succeeds with all views present, raises otherwise."""
        expected_views = ('view1', 'view2')

        def run_check():
            db_utils.check_exists(self.manager, expected_views,
                                  db_utils.VIEW_TYPE)

        # The mocked query reports both views: no exception expected.
        self._setup_exists_expects(expected_views, 'VIEWS')
        run_check()
        self.god.check_playback()

        # The mocked query reports only the first view: must raise.
        self._setup_exists_expects(expected_views[:1], 'VIEWS')
        self.assertRaises(Exception, run_check)
        self.god.check_playback()

    def test_drop_views(self):
        """drop_views checks existence, then issues one DROP VIEW per view."""
        view_names = ('view1', 'view2')
        self._setup_exists_expects(view_names, 'VIEWS')
        for view_name in view_names:
            drop_statement = 'DROP VIEW `%s`' % view_name
            self.manager.execute.expect_call(drop_statement)

        db_utils.drop_views(self.manager, view_names)
        self.god.check_playback()

    def test_rename(self):
        """rename checks existence, then issues one RENAME TABLE per entry."""
        mapping = {
            'table1': 'new_table1',
            'table2': 'new_table2',
        }
        old_names = (old_name for old_name, _ in mapping.iteritems())
        self._setup_exists_expects(old_names, 'TABLES')
        for old_name, new_name in mapping.iteritems():
            rename_statement = ('RENAME TABLE `%s` TO `%s`'
                                % (old_name, new_name))
            self.manager.execute.expect_call(rename_statement)

        db_utils.rename(self.manager, mapping)
        self.god.check_playback()

    def _setup_exists_expects(self, names, table):
        """Record the manager calls expected from one existence check.

        @param names: names the mocked existence query should report.
        @param table: information_schema table name substituted into the
                query, e.g. 'VIEWS' or 'TABLES'.
        """
        exists_query = self.EXISTS_QUERY_BASE % table
        query_rows = self._create_exists_query_result(names)
        self.manager.get_db_name.expect_call().and_return(self.DB_NAME)
        self.manager.execute.expect_call(
                exists_query, self.DB_NAME).and_return(query_rows)

    def _create_exists_query_result(self, names):
        """Return a generator of (name, None) row tuples, one per name."""
        return ((table_name, None) for table_name in names)
if __name__ == '__main__':
    # Run the tests in this module when the file is executed as a script.
    unittest.main()
|