summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorshoward <showard@592f7852-d20e-0410-864c-8624ca9c26a4>2010-01-15 00:22:22 +0000
committershoward <showard@592f7852-d20e-0410-864c-8624ca9c26a4>2010-01-15 00:22:22 +0000
commitf8d915d2afac89985a6cd9213bf96a0e95a89a00 (patch)
treea29d17171ff34e285247d02e70ec78c3f6df977e
parent1590c1a328831f8bf7ef67ec7febf81be8bba58a (diff)
Support for job keyvals
* can be passed as an argument to create_job, stored in AFE DB * scheduler reads them from the AFE DB and writes them to the job-level keyval file before the job starts * parser reads them from the keyval file and writes them to the TKO DB in a new table Since the field name "key" happens to be a MySQL keyword, I went ahead and made db.py support proper quoting of field names. Evetually it'd be really nice to deprecate db.py and use Django models exclusively, but that is a far-off dream. Still lacking support in the AFE and TKO web clients and CLIs, at least the TKO part will be coming soon Signed-off-by: Steve Howard <showard@google.com> git-svn-id: svn://test.kernel.org/autotest/trunk@4123 592f7852-d20e-0410-864c-8624ca9c26a4
-rw-r--r--frontend/afe/models.py22
-rw-r--r--frontend/afe/rpc_interface.py10
-rwxr-xr-xfrontend/afe/rpc_interface_unittest.py12
-rw-r--r--frontend/migrations/047_job_keyvals.py28
-rw-r--r--frontend/tko/models.py10
-rwxr-xr-xscheduler/monitor_db.py14
-rwxr-xr-xscheduler/monitor_db_functional_test.py11
-rw-r--r--tko/db.py96
-rw-r--r--tko/frontend.py4
-rw-r--r--tko/models.py3
-rw-r--r--tko/parsers/version_0.py2
11 files changed, 165 insertions, 47 deletions
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 07f1b407..41c39c01 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -709,6 +709,11 @@ class Job(dbmodels.Model, model_logic.ModelExtensions):
created_on=datetime.now())
job.dependency_labels = options['dependencies']
+
+ if options['keyvals'] is not None:
+ for key, value in options['keyvals'].iteritems():
+ JobKeyval.objects.create(job=job, key=key, value=value)
+
return job
@@ -755,6 +760,11 @@ class Job(dbmodels.Model, model_logic.ModelExtensions):
return '%s-%s' % (self.id, self.owner)
+ def keyval_dict(self):
+ return dict((keyval.key, keyval.value)
+ for keyval in self.jobkeyval_set.all())
+
+
class Meta:
db_table = 'afe_jobs'
@@ -762,6 +772,18 @@ class Job(dbmodels.Model, model_logic.ModelExtensions):
return u'%s (%s-%s)' % (self.name, self.id, self.owner)
+class JobKeyval(dbmodels.Model, model_logic.ModelExtensions):
+ """Keyvals associated with jobs"""
+ job = dbmodels.ForeignKey(Job)
+ key = dbmodels.CharField(max_length=90)
+ value = dbmodels.CharField(max_length=300)
+
+ objects = model_logic.ExtendedManager()
+
+ class Meta:
+ db_table = 'afe_job_keyvals'
+
+
class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):
job = dbmodels.ForeignKey(Job)
host = dbmodels.ForeignKey(Host)
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index dd366023..fe56cab9 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -402,7 +402,8 @@ def create_job(name, priority, control_file, control_type,
atomic_group_name=None, synch_count=None, is_template=False,
timeout=None, max_runtime_hrs=None, run_verify=True,
email_list='', dependencies=(), reboot_before=None,
- reboot_after=None, parse_failed_repair=None, hostless=False):
+ reboot_after=None, parse_failed_repair=None, hostless=False,
+ keyvals=None):
"""\
Create and enqueue a job.
@@ -424,6 +425,7 @@ def create_job(name, priority, control_file, control_type,
@param parse_failed_repair if true, results of failed repairs launched by
this job will be parsed as part of the job.
@param hostless if true, create a hostless job
+ @param keyvals dict of keyvals to associate with the job
@param hosts List of hosts to run job on.
@param meta_hosts List where each entry is a label name, and for each entry
@@ -531,7 +533,8 @@ def create_job(name, priority, control_file, control_type,
dependencies=dependencies,
reboot_before=reboot_before,
reboot_after=reboot_after,
- parse_failed_repair=parse_failed_repair)
+ parse_failed_repair=parse_failed_repair,
+ keyvals=keyvals)
return rpc_utils.create_new_job(owner=owner,
options=options,
host_objects=host_objects,
@@ -584,10 +587,13 @@ def get_jobs(not_yet_run=False, running=False, finished=False, **filter_data):
jobs = list(models.Job.query_objects(filter_data))
models.Job.objects.populate_relationships(jobs, models.Label,
'dependencies')
+ models.Job.objects.populate_relationships(jobs, models.JobKeyval, 'keyvals')
for job in jobs:
job_dict = job.get_object_dict()
job_dict['dependencies'] = ','.join(label.name
for label in job.dependencies)
+ job_dict['keyvals'] = dict((keyval.key, keyval.value)
+ for keyval in job.keyvals)
job_dicts.append(job_dict)
return rpc_utils.prepare_for_serialization(job_dicts)
diff --git a/frontend/afe/rpc_interface_unittest.py b/frontend/afe/rpc_interface_unittest.py
index 822ec1fb..c84dacfa 100755
--- a/frontend/afe/rpc_interface_unittest.py
+++ b/frontend/afe/rpc_interface_unittest.py
@@ -93,6 +93,18 @@ class RpcInterfaceTest(unittest.TestCase,
self._check_hostnames(hosts, ['host2'])
+ def test_job_keyvals(self):
+ keyval_dict = {'mykey': 'myvalue'}
+ job_id = rpc_interface.create_job(name='test', priority='Medium',
+ control_file='foo',
+ control_type='Client',
+ hosts=['host1'],
+ keyvals=keyval_dict)
+ jobs = rpc_interface.get_jobs(id=job_id)
+ self.assertEquals(len(jobs), 1)
+ self.assertEquals(jobs[0]['keyvals'], keyval_dict)
+
+
def test_get_jobs_summary(self):
job = self._create_job(hosts=xrange(1, 4))
entries = list(job.hostqueueentry_set.all())
diff --git a/frontend/migrations/047_job_keyvals.py b/frontend/migrations/047_job_keyvals.py
new file mode 100644
index 00000000..d587fd84
--- /dev/null
+++ b/frontend/migrations/047_job_keyvals.py
@@ -0,0 +1,28 @@
+UP_SQL = """
+CREATE TABLE `afe_job_keyvals` (
+ `id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY,
+ `job_id` integer NOT NULL,
+ INDEX `afe_job_keyvals_job_id` (`job_id`),
+ FOREIGN KEY (`job_id`) REFERENCES `afe_jobs` (`id`) ON DELETE NO ACTION,
+ `key` varchar(90) NOT NULL,
+ INDEX `afe_job_keyvals_key` (`key`),
+ `value` varchar(300) NOT NULL
+) ENGINE=InnoDB;
+
+CREATE TABLE `tko_job_keyvals` (
+ `id` integer AUTO_INCREMENT NOT NULL PRIMARY KEY,
+ `job_id` int(10) unsigned NOT NULL,
+ INDEX `tko_job_keyvals_job_id` (`job_id`),
+ FOREIGN KEY (`job_id`) REFERENCES `tko_jobs` (`job_idx`)
+ ON DELETE NO ACTION,
+ `key` varchar(90) NOT NULL,
+ INDEX `tko_job_keyvals_key` (`key`),
+ `value` varchar(300) NOT NULL
+) ENGINE=InnoDB;
+"""
+
+
+DOWN_SQL = """
+DROP TABLE afe_job_keyvals;
+DROP TABLE tko_job_keyvals;
+"""
diff --git a/frontend/tko/models.py b/frontend/tko/models.py
index 61b24af5..429473d3 100644
--- a/frontend/tko/models.py
+++ b/frontend/tko/models.py
@@ -155,6 +155,16 @@ class Job(dbmodels.Model):
db_table = 'tko_jobs'
+class JobKeyval(dbmodels.Model):
+ job = dbmodels.ForeignKey(Job)
+ key = dbmodels.CharField(max_length=90)
+ value = dbmodels.CharField(blank=True, max_length=300)
+
+
+ class Meta:
+ db_table = 'tko_job_keyvals'
+
+
class Test(dbmodels.Model, model_logic.ModelExtensions,
model_logic.ModelWithAttributes):
test_idx = dbmodels.AutoField(primary_key=True)
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index f4cb8047..41647325 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -2097,7 +2097,8 @@ class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
def prolog(self):
queued_key, queued_time = self._job_queued_keyval(self.job)
- keyval_dict = {queued_key: queued_time}
+ keyval_dict = self.job.keyval_dict()
+ keyval_dict[queued_key] = queued_time
group_name = self.queue_entries[0].get_group_name()
if group_name:
keyval_dict['host_group_name'] = group_name
@@ -3214,6 +3215,10 @@ class Job(DBObject):
self._owner_model = None # caches model instance of owner
+ def model(self):
+ return models.Job.objects.get(id=self.id)
+
+
def owner_model(self):
# work around the fact that the Job owner field is a string, not a
# foreign key
@@ -3250,6 +3255,10 @@ class Job(DBObject):
queue_entry.set_status(status)
+ def keyval_dict(self):
+ return self.model().keyval_dict()
+
+
def _atomic_and_has_started(self):
"""
@returns True if any of the HostQueueEntries associated with this job
@@ -3579,8 +3588,7 @@ class Job(DBObject):
def request_abort(self):
"""Request that this Job be aborted on the next scheduler cycle."""
- self_model = models.Job.objects.get(id=self.id)
- self_model.abort()
+ self.model().abort()
def schedule_delayed_callback_task(self, queue_entry):
diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py
index 5ee0fee8..f0e1e103 100755
--- a/scheduler/monitor_db_functional_test.py
+++ b/scheduler/monitor_db_functional_test.py
@@ -1019,7 +1019,15 @@ class SchedulerFunctionalTest(unittest.TestCase,
def test_pre_job_keyvals(self):
- self.test_simple_job()
+ job = self._create_job(hosts=[1])
+ job.run_verify = False
+ job.reboot_before = models.RebootBefore.NEVER
+ job.save()
+ models.JobKeyval.objects.create(job=job, key='mykey', value='myvalue')
+
+ self._run_dispatcher()
+ self._finish_job(job.hostqueueentry_set.all()[0])
+
attached_files = self.mock_drone_manager.attached_files(
'1-my_user/host1')
job_keyval_path = '1-my_user/host1/keyval'
@@ -1028,6 +1036,7 @@ class SchedulerFunctionalTest(unittest.TestCase,
keyval_dict = dict(line.strip().split('=', 1)
for line in keyval_contents.splitlines())
self.assert_('job_queued' in keyval_dict, keyval_dict)
+ self.assertEquals(keyval_dict['mykey'], 'myvalue')
if __name__ == '__main__':
diff --git a/tko/db.py b/tko/db.py
index c170fe9a..9b9f456d 100644
--- a/tko/db.py
+++ b/tko/db.py
@@ -139,8 +139,42 @@ class db_sql(object):
return self.cur.fetchall()[0][0]
- def select(self, fields, table, where, wherein={},
- distinct = False, group_by = None, max_rows = None):
+ def _quote(self, field):
+ return '`%s`' % field
+
+
+ def _where_clause(self, where):
+ if not where:
+ return '', []
+
+ if isinstance(where, dict):
+ # key/value pairs (which should be equal, or None for null)
+ keys, values = [], []
+ for field, value in where.iteritems():
+ quoted_field = self._quote(field)
+ if value is None:
+ keys.append(quoted_field + ' is null')
+ else:
+ keys.append(quoted_field + '=%s')
+ values.append(value)
+ where_clause = ' and '.join(keys)
+ elif isinstance(where, basestring):
+ # the exact string
+ where_clause = where
+ values = []
+ elif isinstance(where, tuple):
+ # preformatted where clause + values
+ where_clause, values = where
+ assert where_clause
+ else:
+ raise ValueError('Invalid "where" value: %r' % where)
+
+ return ' WHERE ' + where_clause, values
+
+
+
+ def select(self, fields, table, where, distinct=False, group_by=None,
+ max_rows=None):
"""\
This selects all the fields requested from a
specific table with a particular where clause.
@@ -162,31 +196,8 @@ class db_sql(object):
cmd.append('distinct')
cmd += [fields, 'from', table]
- values = []
- if where and isinstance(where, types.DictionaryType):
- # key/value pairs (which should be equal, or None for null)
- keys, values = [], []
- for field, value in where.iteritems():
- if value is None:
- keys.append(field + ' is null')
- else:
- keys.append(field + '=%s')
- values.append(value)
- cmd.append(' where ' + ' and '.join(keys))
- elif where and isinstance(where, types.StringTypes):
- # the exact string
- cmd.append(' where ' + where)
- elif where and isinstance(where, types.TupleType):
- # preformatted where clause + values
- (sql, vals) = where
- values = vals
- cmd.append(' where (%s) ' % sql)
-
- # TODO: this assumes there's a where clause...bad
- if wherein and isinstance(wherein, types.DictionaryType):
- keys_in = ["%s in (%s) " % (field, ','.join(where))
- for field, where in wherein.iteritems()]
- cmd.append(' and '+' and '.join(keys_in))
+ where_clause, values = self._where_clause(where)
+ cmd.append(where_clause)
if group_by:
cmd.append(' GROUP BY ' + group_by)
@@ -252,8 +263,9 @@ class db_sql(object):
fields = data.keys()
refs = ['%s' for field in fields]
values = [data[field] for field in fields]
- cmd = 'insert into %s (%s) values (%s)' % \
- (table, ','.join(fields), ','.join(refs))
+ cmd = ('insert into %s (%s) values (%s)' %
+ (table, ','.join(self._quote(field) for field in fields),
+ ','.join(refs)))
self.dprint('%s %s' % (cmd, values))
self._exec_sql_with_commit(cmd, values, commit)
@@ -263,10 +275,8 @@ class db_sql(object):
cmd = ['delete from', table]
if commit is None:
commit = self.autocommit
- if where and isinstance(where, types.DictionaryType):
- keys = [field + '=%s' for field in where.keys()]
- values = [where[field] for field in where.keys()]
- cmd += ['where', ' and '.join(keys)]
+ where_clause, values = self._where_clause(where)
+ cmd.append(where_clause)
sql = ' '.join(cmd)
self.dprint('%s %s' % (sql, values))
@@ -284,13 +294,12 @@ class db_sql(object):
commit = self.autocommit
cmd = 'update %s ' % table
fields = data.keys()
- data_refs = [field + '=%s' for field in fields]
+ data_refs = [self._quote(field) + '=%s' for field in fields]
data_values = [data[field] for field in fields]
cmd += ' set ' + ', '.join(data_refs)
- where_keys = [field + '=%s' for field in where.keys()]
- where_values = [where[field] for field in where.keys()]
- cmd += ' where ' + ' and '.join(where_keys)
+ where_clause, where_values = self._where_clause(where)
+ cmd += where_clause
values = data_values + where_values
self.dprint('%s %s' % (cmd, values))
@@ -338,10 +347,23 @@ class db_sql(object):
else:
self.insert('tko_jobs', data, commit=commit)
job.index = self.get_last_autonumber_value()
+ self.update_job_keyvals(job, commit=commit)
for test in job.tests:
self.insert_test(job, test, commit=commit)
+ def update_job_keyvals(self, job, commit=None):
+ for key, value in job.keyval_dict.iteritems():
+ where = {'job_id': job.index, 'key': key}
+ data = dict(where, value=value)
+ exists = self.select('id', 'tko_job_keyvals', where=where)
+
+ if exists:
+ self.update('tko_job_keyvals', data, where=where, commit=commit)
+ else:
+ self.insert('tko_job_keyvals', data, commit=commit)
+
+
def insert_test(self, job, test, commit = None):
kver = self.insert_kernel(test.kernel, commit=commit)
data = {'job_idx':job.index, 'test':test.testname,
diff --git a/tko/frontend.py b/tko/frontend.py
index cbc328de..9033c20a 100644
--- a/tko/frontend.py
+++ b/tko/frontend.py
@@ -200,12 +200,12 @@ class kernel:
class test:
@classmethod
- def select(klass, db, where = {}, wherein = {}, distinct = False):
+ def select(klass, db, where={}, distinct=False):
fields = ['test_idx', 'job_idx', 'test', 'subdir',
'kernel_idx', 'status', 'reason', 'machine_idx']
tests = []
for row in db.select(','.join(fields), 'tko_tests', where,
- wherein,distinct):
+ distinct):
tests.append(klass(db, *row))
return tests
diff --git a/tko/models.py b/tko/models.py
index 0842e18a..bc70074b 100644
--- a/tko/models.py
+++ b/tko/models.py
@@ -7,7 +7,7 @@ from autotest_lib.tko import utils as tko_utils
class job(object):
def __init__(self, dir, user, label, machine, queued_time, started_time,
finished_time, machine_owner, machine_group, aborted_by,
- aborted_on):
+ aborted_on, keyval_dict):
self.dir = dir
self.tests = []
self.user = user
@@ -20,6 +20,7 @@ class job(object):
self.machine_group = machine_group
self.aborted_by = aborted_by
self.aborted_on = aborted_on
+ self.keyval_dict = keyval_dict
@staticmethod
diff --git a/tko/parsers/version_0.py b/tko/parsers/version_0.py
index f1be7b48..10daf19c 100644
--- a/tko/parsers/version_0.py
+++ b/tko/parsers/version_0.py
@@ -36,7 +36,7 @@ class job(models.job):
"queued_time": queued_time, "started_time": started_time,
"finished_time": finished_time, "machine_owner": machine_owner,
"machine_group": machine_group, "aborted_by": aborted_by,
- "aborted_on": aborted_at}
+ "aborted_on": aborted_at, "keyval_dict": keyval}
@classmethod