diff options
author | showard <showard@592f7852-d20e-0410-864c-8624ca9c26a4> | 2010-01-15 00:22:22 +0000 |
---|---|---|
committer | showard <showard@592f7852-d20e-0410-864c-8624ca9c26a4> | 2010-01-15 00:22:22 +0000 |
commit | f8d915d2afac89985a6cd9213bf96a0e95a89a00 (patch) | |
tree | a29d17171ff34e285247d02e70ec78c3f6df977e | |
parent | 1590c1a328831f8bf7ef67ec7febf81be8bba58a (diff) |
Support for job keyvals
* can be passed as an argument to create_job, stored in AFE DB
* scheduler reads them from the AFE DB and writes them to the job-level keyval file before the job starts
* parser reads them from the keyval file and writes them to the TKO DB in a new table
Since the field name "key" happens to be a MySQL keyword, I went ahead and made db.py support proper quoting of field names. Evetually it'd be really nice to deprecate db.py and use Django models exclusively, but that is a far-off dream.
Still lacking support in the AFE and TKO web clients and CLIs, at least the TKO part will be coming soon
Signed-off-by: Steve Howard <showard@google.com>
git-svn-id: svn://test.kernel.org/autotest/trunk@4123 592f7852-d20e-0410-864c-8624ca9c26a4
-rw-r--r-- | frontend/afe/models.py | 22 | ||||
-rw-r--r-- | frontend/afe/rpc_interface.py | 10 | ||||
-rwxr-xr-x | frontend/afe/rpc_interface_unittest.py | 12 | ||||
-rw-r--r-- | frontend/migrations/047_job_keyvals.py | 28 | ||||
-rw-r--r-- | frontend/tko/models.py | 10 | ||||
-rwxr-xr-x | scheduler/monitor_db.py | 14 | ||||
-rwxr-xr-x | scheduler/monitor_db_functional_test.py | 11 | ||||
-rw-r--r-- | tko/db.py | 96 | ||||
-rw-r--r-- | tko/frontend.py | 4 | ||||
-rw-r--r-- | tko/models.py | 3 | ||||
-rw-r--r-- | tko/parsers/version_0.py | 2 |
11 files changed, 165 insertions, 47 deletions
diff --git a/frontend/afe/models.py b/frontend/afe/models.py index 07f1b407..41c39c01 100644 --- a/frontend/afe/models.py +++ b/frontend/afe/models.py @@ -709,6 +709,11 @@ class Job(dbmodels.Model, model_logic.ModelExtensions): created_on=datetime.now()) job.dependency_labels = options['dependencies'] + + if options['keyvals'] is not None: + for key, value in options['keyvals'].iteritems(): + JobKeyval.objects.create(job=job, key=key, value=value) + return job @@ -755,6 +760,11 @@ class Job(dbmodels.Model, model_logic.ModelExtensions): return '%s-%s' % (self.id, self.owner) + def keyval_dict(self): + return dict((keyval.key, keyval.value) + for keyval in self.jobkeyval_set.all()) + + class Meta: db_table = 'afe_jobs' @@ -762,6 +772,18 @@ class Job(dbmodels.Model, model_logic.ModelExtensions): return u'%s (%s-%s)' % (self.name, self.id, self.owner) +class JobKeyval(dbmodels.Model, model_logic.ModelExtensions): + """Keyvals associated with jobs""" + job = dbmodels.ForeignKey(Job) + key = dbmodels.CharField(max_length=90) + value = dbmodels.CharField(max_length=300) + + objects = model_logic.ExtendedManager() + + class Meta: + db_table = 'afe_job_keyvals' + + class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions): job = dbmodels.ForeignKey(Job) host = dbmodels.ForeignKey(Host) diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py index dd366023..fe56cab9 100644 --- a/frontend/afe/rpc_interface.py +++ b/frontend/afe/rpc_interface.py @@ -402,7 +402,8 @@ def create_job(name, priority, control_file, control_type, atomic_group_name=None, synch_count=None, is_template=False, timeout=None, max_runtime_hrs=None, run_verify=True, email_list='', dependencies=(), reboot_before=None, - reboot_after=None, parse_failed_repair=None, hostless=False): + reboot_after=None, parse_failed_repair=None, hostless=False, + keyvals=None): """\ Create and enqueue a job. @@ -424,6 +425,7 @@ def create_job(name, priority, control_file, control_type, @param parse_failed_repair if true, results of failed repairs launched by this job will be parsed as part of the job. @param hostless if true, create a hostless job + @param keyvals dict of keyvals to associate with the job @param hosts List of hosts to run job on. @param meta_hosts List where each entry is a label name, and for each entry @@ -531,7 +533,8 @@ def create_job(name, priority, control_file, control_type, dependencies=dependencies, reboot_before=reboot_before, reboot_after=reboot_after, - parse_failed_repair=parse_failed_repair) + parse_failed_repair=parse_failed_repair, + keyvals=keyvals) return rpc_utils.create_new_job(owner=owner, options=options, host_objects=host_objects, @@ -584,10 +587,13 @@ def get_jobs(not_yet_run=False, running=False, finished=False, **filter_data): jobs = list(models.Job.query_objects(filter_data)) models.Job.objects.populate_relationships(jobs, models.Label, 'dependencies') + models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals') for job in jobs: job_dict = job.get_object_dict() job_dict['dependencies'] = ','.join(label.name for label in job.dependencies) + job_dict['keyvals'] = dict((keyval.key, keyval.value) + for keyval in job.keyvals) job_dicts.append(job_dict) return rpc_utils.prepare_for_serialization(job_dicts) diff --git a/frontend/afe/rpc_interface_unittest.py b/frontend/afe/rpc_interface_unittest.py index 822ec1fb..c84dacfa 100755 --- a/frontend/afe/rpc_interface_unittest.py +++ b/frontend/afe/rpc_interface_unittest.py @@ -93,6 +93,18 @@ class RpcInterfaceTest(unittest.TestCase, self._check_hostnames(hosts, ['host2']) + def test_job_keyvals(self): + keyval_dict = {'mykey': 'myvalue'} + job_id = rpc_interface.create_job(name='test', priority='Medium', + control_file='foo', + control_type='Client', + hosts=['host1'], + keyvals=keyval_dict) + jobs = rpc_interface.get_jobs(id=job_id) + self.assertEquals(len(jobs), 1) + self.assertEquals(jobs[0]['keyvals'], keyval_dict) + + def test_get_jobs_summary(self): job = self._create_job(hosts=xrange(1, 4)) entries = list(job.hostqueueentry_set.all()) diff --git a/frontend/migrations/047_job_keyvals.py b/frontend/migrations/047_job_keyvals.py new file mode 100644 index 00000000..d587fd84 --- /dev/null +++ b/frontend/migrations/047_job_keyvals.py @@ -0,0 +1,28 @@ +UP_SQL = """ +CREATE TABLE `afe_job_keyvals` ( + `id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY, + `job_id` integer NOT NULL, + INDEX `afe_job_keyvals_job_id` (`job_id`), + FOREIGN KEY (`job_id`) REFERENCES `afe_jobs` (`id`) ON DELETE NO ACTION, + `key` varchar(90) NOT NULL, + INDEX `afe_job_keyvals_key` (`key`), + `value` varchar(300) NOT NULL +) ENGINE=InnoDB; + +CREATE TABLE `tko_job_keyvals` ( + `id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY, + `job_id` int(10) unsigned NOT NULL, + INDEX `tko_job_keyvals_job_id` (`job_id`), + FOREIGN KEY (`job_id`) REFERENCES `tko_jobs` (`job_idx`) + ON DELETE NO ACTION, + `key` varchar(90) NOT NULL, + INDEX `tko_job_keyvals_key` (`key`), + `value` varchar(300) NOT NULL +) ENGINE=InnoDB; +""" + + +DOWN_SQL = """ +DROP TABLE afe_job_keyvals; +DROP TABLE tko_job_keyvals; +""" diff --git a/frontend/tko/models.py b/frontend/tko/models.py index 61b24af5..429473d3 100644 --- a/frontend/tko/models.py +++ b/frontend/tko/models.py @@ -155,6 +155,16 @@ class Job(dbmodels.Model): db_table = 'tko_jobs' +class JobKeyval(dbmodels.Model): + job = dbmodels.ForeignKey(Job) + key = dbmodels.CharField(max_length=90) + value = dbmodels.CharField(blank=True, max_length=300) + + + class Meta: + db_table = 'tko_job_keyvals' + + class Test(dbmodels.Model, model_logic.ModelExtensions, model_logic.ModelWithAttributes): test_idx = dbmodels.AutoField(primary_key=True) diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py index f4cb8047..41647325 100755 --- a/scheduler/monitor_db.py +++ b/scheduler/monitor_db.py @@ -2097,7 +2097,8 @@ class AbstractQueueTask(AgentTask, TaskWithJobKeyvals): def prolog(self): queued_key, queued_time = self._job_queued_keyval(self.job) - keyval_dict = {queued_key: queued_time} + keyval_dict = self.job.keyval_dict() + keyval_dict[queued_key] = queued_time group_name = self.queue_entries[0].get_group_name() if group_name: keyval_dict['host_group_name'] = group_name @@ -3214,6 +3215,10 @@ class Job(DBObject): self._owner_model = None # caches model instance of owner + def model(self): + return models.Job.objects.get(id=self.id) + + def owner_model(self): # work around the fact that the Job owner field is a string, not a # foreign key @@ -3250,6 +3255,10 @@ class Job(DBObject): queue_entry.set_status(status) + def keyval_dict(self): + return self.model().keyval_dict() + + def _atomic_and_has_started(self): """ @returns True if any of the HostQueueEntries associated with this job @@ -3579,8 +3588,7 @@ class Job(DBObject): def request_abort(self): """Request that this Job be aborted on the next scheduler cycle.""" - self_model = models.Job.objects.get(id=self.id) - self_model.abort() + self.model().abort() def schedule_delayed_callback_task(self, queue_entry): diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py index 5ee0fee8..f0e1e103 100755 --- a/scheduler/monitor_db_functional_test.py +++ b/scheduler/monitor_db_functional_test.py @@ -1019,7 +1019,15 @@ class SchedulerFunctionalTest(unittest.TestCase, def test_pre_job_keyvals(self): - self.test_simple_job() + job = self._create_job(hosts=[1]) + job.run_verify = False + job.reboot_before = models.RebootBefore.NEVER + job.save() + models.JobKeyval.objects.create(job=job, key='mykey', value='myvalue') + + self._run_dispatcher() + self._finish_job(job.hostqueueentry_set.all()[0]) + attached_files = self.mock_drone_manager.attached_files( '1-my_user/host1') job_keyval_path = '1-my_user/host1/keyval' @@ -1028,6 +1036,7 @@ class SchedulerFunctionalTest(unittest.TestCase, keyval_dict = dict(line.strip().split('=', 1) for line in keyval_contents.splitlines()) self.assert_('job_queued' in keyval_dict, keyval_dict) + self.assertEquals(keyval_dict['mykey'], 'myvalue') if __name__ == '__main__': @@ -139,8 +139,42 @@ class db_sql(object): return self.cur.fetchall()[0][0] - def select(self, fields, table, where, wherein={}, - distinct = False, group_by = None, max_rows = None): + def _quote(self, field): + return '`%s`' % field + + + def _where_clause(self, where): + if not where: + return '', [] + + if isinstance(where, dict): + # key/value pairs (which should be equal, or None for null) + keys, values = [], [] + for field, value in where.iteritems(): + quoted_field = self._quote(field) + if value is None: + keys.append(quoted_field + ' is null') + else: + keys.append(quoted_field + '=%s') + values.append(value) + where_clause = ' and '.join(keys) + elif isinstance(where, basestring): + # the exact string + where_clause = where + values = [] + elif isinstance(where, tuple): + # preformatted where clause + values + where_clause, values = where + assert where_clause + else: + raise ValueError('Invalid "where" value: %r' % where) + + return ' WHERE ' + where_clause, values + + + + def select(self, fields, table, where, distinct=False, group_by=None, + max_rows=None): """\ This selects all the fields requested from a specific table with a particular where clause. @@ -162,31 +196,8 @@ class db_sql(object): cmd.append('distinct') cmd += [fields, 'from', table] - values = [] - if where and isinstance(where, types.DictionaryType): - # key/value pairs (which should be equal, or None for null) - keys, values = [], [] - for field, value in where.iteritems(): - if value is None: - keys.append(field + ' is null') - else: - keys.append(field + '=%s') - values.append(value) - cmd.append(' where ' + ' and '.join(keys)) - elif where and isinstance(where, types.StringTypes): - # the exact string - cmd.append(' where ' + where) - elif where and isinstance(where, types.TupleType): - # preformatted where clause + values - (sql, vals) = where - values = vals - cmd.append(' where (%s) ' % sql) - - # TODO: this assumes there's a where clause...bad - if wherein and isinstance(wherein, types.DictionaryType): - keys_in = ["%s in (%s) " % (field, ','.join(where)) - for field, where in wherein.iteritems()] - cmd.append(' and '+' and '.join(keys_in)) + where_clause, values = self._where_clause(where) + cmd.append(where_clause) if group_by: cmd.append(' GROUP BY ' + group_by) @@ -252,8 +263,9 @@ class db_sql(object): fields = data.keys() refs = ['%s' for field in fields] values = [data[field] for field in fields] - cmd = 'insert into %s (%s) values (%s)' % \ - (table, ','.join(fields), ','.join(refs)) + cmd = ('insert into %s (%s) values (%s)' % + (table, ','.join(self._quote(field) for field in fields), + ','.join(refs))) self.dprint('%s %s' % (cmd, values)) self._exec_sql_with_commit(cmd, values, commit) @@ -263,10 +275,8 @@ class db_sql(object): cmd = ['delete from', table] if commit is None: commit = self.autocommit - if where and isinstance(where, types.DictionaryType): - keys = [field + '=%s' for field in where.keys()] - values = [where[field] for field in where.keys()] - cmd += ['where', ' and '.join(keys)] + where_clause, values = self._where_clause(where) + cmd.append(where_clause) sql = ' '.join(cmd) self.dprint('%s %s' % (sql, values)) @@ -284,13 +294,12 @@ class db_sql(object): commit = self.autocommit cmd = 'update %s ' % table fields = data.keys() - data_refs = [field + '=%s' for field in fields] + data_refs = [self._quote(field) + '=%s' for field in fields] data_values = [data[field] for field in fields] cmd += ' set ' + ', '.join(data_refs) - where_keys = [field + '=%s' for field in where.keys()] - where_values = [where[field] for field in where.keys()] - cmd += ' where ' + ' and '.join(where_keys) + where_clause, where_values = self._where_clause(where) + cmd += where_clause values = data_values + where_values self.dprint('%s %s' % (cmd, values)) @@ -338,10 +347,23 @@ class db_sql(object): else: self.insert('tko_jobs', data, commit=commit) job.index = self.get_last_autonumber_value() + self.update_job_keyvals(job, commit=commit) for test in job.tests: self.insert_test(job, test, commit=commit) + def update_job_keyvals(self, job, commit=None): + for key, value in job.keyval_dict.iteritems(): + where = {'job_id': job.index, 'key': key} + data = dict(where, value=value) + exists = self.select('id', 'tko_job_keyvals', where=where) + + if exists: + self.update('tko_job_keyvals', data, where=where, commit=commit) + else: + self.insert('tko_job_keyvals', data, commit=commit) + + def insert_test(self, job, test, commit = None): kver = self.insert_kernel(test.kernel, commit=commit) data = {'job_idx':job.index, 'test':test.testname, diff --git a/tko/frontend.py b/tko/frontend.py index cbc328de..9033c20a 100644 --- a/tko/frontend.py +++ b/tko/frontend.py @@ -200,12 +200,12 @@ class kernel: class test: @classmethod - def select(klass, db, where = {}, wherein = {}, distinct = False): + def select(klass, db, where={}, distinct=False): fields = ['test_idx', 'job_idx', 'test', 'subdir', 'kernel_idx', 'status', 'reason', 'machine_idx'] tests = [] for row in db.select(','.join(fields), 'tko_tests', where, - wherein,distinct): + distinct): tests.append(klass(db, *row)) return tests diff --git a/tko/models.py b/tko/models.py index 0842e18a..bc70074b 100644 --- a/tko/models.py +++ b/tko/models.py @@ -7,7 +7,7 @@ from autotest_lib.tko import utils as tko_utils class job(object): def __init__(self, dir, user, label, machine, queued_time, started_time, finished_time, machine_owner, machine_group, aborted_by, - aborted_on): + aborted_on, keyval_dict): self.dir = dir self.tests = [] self.user = user @@ -20,6 +20,7 @@ class job(object): self.machine_group = machine_group self.aborted_by = aborted_by self.aborted_on = aborted_on + self.keyval_dict = keyval_dict @staticmethod diff --git a/tko/parsers/version_0.py b/tko/parsers/version_0.py index f1be7b48..10daf19c 100644 --- a/tko/parsers/version_0.py +++ b/tko/parsers/version_0.py @@ -36,7 +36,7 @@ class job(models.job): "queued_time": queued_time, "started_time": started_time, "finished_time": finished_time, "machine_owner": machine_owner, "machine_group": machine_group, "aborted_by": aborted_by, - "aborted_on": aborted_at} + "aborted_on": aborted_at, "keyval_dict": keyval} @classmethod |