author     showard <showard@592f7852-d20e-0410-864c-8624ca9c26a4>  2009-05-11 19:26:02 +0000
committer  showard <showard@592f7852-d20e-0410-864c-8624ca9c26a4>  2009-05-11 19:26:02 +0000
commit     ebf29ddb26411000a751040a7b835905d0448a78 (patch)
tree       26521ce85ef274cda7b71c572a48ebb864b601f6 /scheduler
parent     2480be184bd082d63fc890d3f28a65b7d47de649 (diff)
When parsing results, represent a group of machines in the results database with
either the atomic group label name, if a specific label was used, or the atomic
group name.

Adds an optional host_group_name= entry to the server-side group job keyval file.
The scheduler chooses the most appropriate name and adds it to the group keyvals
file. Changes the TKO results parser to use host_group_name= as the machine name
instead of hostname= when hostname= is a comma-separated list of hostnames rather
than a single name.

Also fixes atomic group scheduling to be able to use up to the atomic group's
max_number_of_machines when launching the job; this is still unlikely to happen,
as the code still launches the job as soon as at least synch_count entries have
exited Verify.

Signed-off-by: Gregory Smith <gps@google.com>

git-svn-id: svn://test.kernel.org/autotest/trunk@3103 592f7852-d20e-0410-864c-8624ca9c26a4
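
The parser-side behaviour described above can be illustrated with a small standalone
sketch. This is not the TKO parser's actual code; the helper functions below and their
names are made up for illustration, assuming the usual flat key=value keyval format:

    # Illustrative sketch only -- not the real TKO parser implementation.
    def read_keyvals(path):
        """Parse a flat key=value keyval file into a dict of strings."""
        keyvals = {}
        with open(path) as keyval_file:
            for line in keyval_file:
                line = line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                key, value = line.split('=', 1)
                keyvals[key.strip()] = value.strip()
        return keyvals

    def machine_name_for_results(keyvals):
        """Choose the machine name to record in the results database."""
        hostname = keyvals.get('hostname', '')
        if ',' in hostname and 'host_group_name' in keyvals:
            # hostname= is a comma-separated list of hosts, so prefer the
            # group name supplied by the scheduler.
            return keyvals['host_group_name']
        return hostname

For a group keyval file containing hostname=host1,host2 and host_group_name=label4,
the group of machines would be recorded as 'label4'.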
Diffstat (limited to 'scheduler')
-rwxr-xr-x  scheduler/monitor_db.py           122
-rw-r--r--  scheduler/monitor_db_unittest.py   92
2 files changed, 191 insertions, 23 deletions
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 43ecd366..678aec10 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -224,6 +224,18 @@ def enable_logging(logfile):
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
queue_entry=None):
+ """
+ @returns The autoserv command line as a list of executable + parameters.
+
+ @param machines - string - A machine or comma separated list of machines
+ for the (-m) flag.
+ @param results_dir - string - Where the results will be written (-r).
+ @param extra_args - list - Additional arguments to pass to autoserv.
+ @param job - Job object - If supplied, -u owner and -l name parameters
+ will be added.
+ @param queue_entry - A HostQueueEntry object - If supplied and no Job
+ object was supplied, this will be used to lookup the Job object.
+ """
autoserv_argv = [_autoserv_path, '-p', '-m', machines,
'-r', _drone_manager.absolute_path(results_dir)]
if job or queue_entry:
@@ -1521,9 +1533,10 @@ class VerifyTask(PreJobTask):
class QueueTask(AgentTask, TaskWithJobKeyvals):
- def __init__(self, job, queue_entries, cmd):
+ def __init__(self, job, queue_entries, cmd, group_name=''):
self.job = job
self.queue_entries = queue_entries
+ self.group_name = group_name
super(QueueTask, self).__init__(cmd, self._execution_tag())
self._set_ids(queue_entries=queue_entries)
@@ -1560,7 +1573,10 @@ class QueueTask(AgentTask, TaskWithJobKeyvals):
def prolog(self):
queued_key, queued_time = self._job_queued_keyval(self.job)
- self._write_keyvals_before_job({queued_key : queued_time})
+ keyval_dict = {queued_key: queued_time}
+ if self.group_name:
+ keyval_dict['host_group_name'] = self.group_name
+ self._write_keyvals_before_job(keyval_dict)
for queue_entry in self.queue_entries:
self._write_host_keyvals(queue_entry.host)
queue_entry.set_status('Running')
@@ -2256,6 +2272,24 @@ class HostQueueEntry(DBObject):
return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
+ def get_labels(self):
+ """
+ Get all labels associated with this host queue entry (either via the
+ meta_host or as a job dependency label). The labels yielded are not
+ guaranteed to be unique.
+
+ @yields Label instances associated with this host_queue_entry.
+ """
+ if self.meta_host:
+ yield Label(id=self.meta_host, always_query=False)
+ labels = Label.fetch(
+ joins="JOIN jobs_dependency_labels AS deps "
+ "ON (labels.id = deps.label_id)",
+ where="deps.job_id = %d" % self.job.id)
+ for label in labels:
+ yield label
+
+
def set_host(self, host):
if host:
self.queue_log_record('Assigning host ' + host.hostname)
@@ -2576,17 +2610,26 @@ class Job(DBObject):
_drone_manager.write_lines_to_file(file_path, [hostname])
- def _next_group_name(self):
+ def _next_group_name(self, group_name=''):
+ """@returns a directory name to use for the next host group results."""
+ if group_name:
+ # Sanitize for use as a pathname.
+ group_name = group_name.replace(os.path.sep, '_')
+ if group_name.startswith('.'):
+ group_name = '_' + group_name[1:]
+ # Add a separator between the group name and 'group%d'.
+ group_name += '.'
+ group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
query = models.HostQueueEntry.objects.filter(
job=self.id).values('execution_subdir').distinct()
subdirs = (entry['execution_subdir'] for entry in query)
- groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
- ids = [int(match.group(1)) for match in groups if match]
+ group_matches = (group_count_re.match(subdir) for subdir in subdirs)
+ ids = [int(match.group(1)) for match in group_matches if match]
if ids:
next_id = max(ids) + 1
else:
next_id = 0
- return "group%d" % next_id
+ return '%sgroup%d' % (group_name, next_id)
def _write_control_file(self, execution_tag):
@@ -2647,31 +2690,64 @@ class Job(DBObject):
return tasks
- def _assign_new_group(self, queue_entries):
+ def _assign_new_group(self, queue_entries, group_name=''):
if len(queue_entries) == 1:
- group_name = queue_entries[0].get_host().hostname
+ group_subdir_name = queue_entries[0].get_host().hostname
else:
- group_name = self._next_group_name()
+ group_subdir_name = self._next_group_name(group_name)
logging.info('Running synchronous job %d hosts %s as %s',
self.id, [entry.host.hostname for entry in queue_entries],
- group_name)
+ group_subdir_name)
for queue_entry in queue_entries:
- queue_entry.set_execution_subdir(group_name)
+ queue_entry.set_execution_subdir(group_subdir_name)
def _choose_group_to_run(self, include_queue_entry):
+ """
+ @returns A tuple containing a list of HostQueueEntry instances to be
+ used to run this Job, a string group name to suggest giving
+ to this job in the results database.
+ """
+ if include_queue_entry.atomic_group_id:
+ atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
+ always_query=False)
+ else:
+ atomic_group = None
+
chosen_entries = [include_queue_entry]
+ if atomic_group:
+ num_entries_wanted = atomic_group.max_number_of_machines
+ else:
+ num_entries_wanted = self.synch_count
+ num_entries_wanted -= len(chosen_entries)
- num_entries_needed = self.synch_count - 1
- if num_entries_needed > 0:
+ if num_entries_wanted > 0:
+ where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
pending_entries = HostQueueEntry.fetch(
- where='job_id = %s AND status = "Pending" AND id != %s',
- params=(self.id, include_queue_entry.id))
- chosen_entries += list(pending_entries)[:num_entries_needed]
+ where=where_clause,
+ params=(self.id, include_queue_entry.id))
+ # TODO(gps): sort these by hostname before slicing.
+ chosen_entries += list(pending_entries)[:num_entries_wanted]
+
+ # Sanity check. We'll only ever be called if this can be met.
+ assert len(chosen_entries) >= self.synch_count
+
+ if atomic_group:
+ # Look at any meta_host and dependency labels and pick the first
+ # one that also specifies this atomic group. Use that label name
+ # as the group name if possible (it is more specific).
+ group_name = atomic_group.name
+ for label in include_queue_entry.get_labels():
+ if label.atomic_group_id:
+ assert label.atomic_group_id == atomic_group.id
+ group_name = label.name
+ break
+ else:
+ group_name = ''
- self._assign_new_group(chosen_entries)
- return chosen_entries
+ self._assign_new_group(chosen_entries, group_name=group_name)
+ return chosen_entries, group_name
def run(self, queue_entry):
@@ -2679,17 +2755,17 @@ class Job(DBObject):
queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
return Agent(self._get_pre_job_tasks(queue_entry))
- queue_entries = self._choose_group_to_run(queue_entry)
- return self._finish_run(queue_entries)
+ queue_entries, group_name = self._choose_group_to_run(queue_entry)
+ return self._finish_run(queue_entries, group_name)
- def _finish_run(self, queue_entries, initial_tasks=[]):
+ def _finish_run(self, queue_entries, group_name):
for queue_entry in queue_entries:
queue_entry.set_status('Starting')
params = self._get_autoserv_params(queue_entries)
queue_task = QueueTask(job=self, queue_entries=queue_entries,
- cmd=params)
- tasks = initial_tasks + [queue_task]
+ cmd=params, group_name=group_name)
+ tasks = [queue_task]
entry_ids = [entry.id for entry in queue_entries]
return Agent(tasks, num_processes=len(queue_entries))
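
As a worked example of the new results-directory naming, here is a minimal standalone
sketch of the logic added in Job._next_group_name() above, with a hard-coded list of
existing execution_subdir values standing in for the database query (the helper name
is made up for illustration):

    import os
    import re

    def next_group_subdir(existing_subdirs, group_name=''):
        # Mirror the sanitization in Job._next_group_name(): make the label
        # name safe to use as a path component, then separate it from 'group%d'.
        if group_name:
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        matches = (group_count_re.match(subdir) for subdir in existing_subdirs)
        ids = [int(match.group(1)) for match in matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)

This reproduces the behaviour exercised by test_next_group_name below:
next_group_subdir([], 'my/rack') returns 'my_rack.group0', and
next_group_subdir(['my_rack.group0'], 'my/rack') returns 'my_rack.group1'.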
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 15172078..f2176342 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -1764,6 +1764,35 @@ class AgentTasksTest(unittest.TestCase):
self._test_cleanup_task_helper(False, True)
+class HostQueueEntryTest(BaseSchedulerTest):
+ def _create_hqe(self, dependency_labels=(), **create_job_kwargs):
+ job = self._create_job(**create_job_kwargs)
+ for label in dependency_labels:
+ job.dependency_labels.add(label)
+ hqes = list(monitor_db.HostQueueEntry.fetch(where='job_id=%d' % job.id))
+ self.assertEqual(1, len(hqes))
+ return hqes[0]
+
+ def _check_hqe_labels(self, hqe, expected_labels):
+ expected_labels = set(expected_labels)
+ label_names = set(label.name for label in hqe.get_labels())
+ self.assertEqual(expected_labels, label_names)
+
+ def test_get_labels_empty(self):
+ hqe = self._create_hqe(hosts=[1])
+ labels = list(hqe.get_labels())
+ self.assertEqual([], labels)
+
+ def test_get_labels_metahost(self):
+ hqe = self._create_hqe(metahosts=[2])
+ self._check_hqe_labels(hqe, ['label2'])
+
+ def test_get_labels_dependancies(self):
+ hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),
+ metahosts=[1])
+ self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])
+
+
class JobTest(BaseSchedulerTest):
def setUp(self):
super(JobTest, self).setUp()
@@ -1774,6 +1803,7 @@ class JobTest(BaseSchedulerTest):
def _setup_directory_expects(self, execution_subdir):
+ # XXX(gps): um... this function does -nothing-
job_path = os.path.join('.', '1-my_user')
results_dir = os.path.join(job_path, execution_subdir)
@@ -1872,6 +1902,35 @@ class JobTest(BaseSchedulerTest):
self.assertEquals(hqe_ids, [1, 2])
+ def test_run_synchronous_atomic_group_ready(self):
+ self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
+ self._update_hqe("status='Pending', execution_subdir=''")
+
+ tasks = self._test_run_helper(expect_starting=True)
+ self.assertEquals(len(tasks), 1)
+ queue_task = tasks[0]
+
+ self.assert_(isinstance(queue_task, monitor_db.QueueTask))
+ # Atomic group jobs that do not specify a label in the atomic group
+ # will use the atomic group name as their group name.
+ self.assertEquals(queue_task.group_name, 'atomic1')
+
+
+ def test_run_synchronous_atomic_group_with_label_ready(self):
+ job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
+ job.dependency_labels.add(self.label4)
+ self._update_hqe("status='Pending', execution_subdir=''")
+
+ tasks = self._test_run_helper(expect_starting=True)
+ self.assertEquals(len(tasks), 1)
+ queue_task = tasks[0]
+
+ self.assert_(isinstance(queue_task, monitor_db.QueueTask))
+ # Atomic group jobs that also specify a label in the atomic group
+ # will use the label name as their group name.
+ self.assertEquals(queue_task.group_name, 'label4')
+
+
def test_reboot_before_always(self):
job = self._create_job(hosts=[1])
job.reboot_before = models.RebootBefore.ALWAYS
@@ -1906,6 +1965,39 @@ class JobTest(BaseSchedulerTest):
self._test_reboot_before_if_dirty_helper(False)
+ def test_next_group_name(self):
+ django_job = self._create_job(metahosts=[1])
+ job = monitor_db.Job(id=django_job.id)
+ self.assertEqual('group0', job._next_group_name())
+
+ for hqe in django_job.hostqueueentry_set.filter():
+ hqe.execution_subdir = 'my_rack.group0'
+ hqe.save()
+ self.assertEqual('my_rack.group1', job._next_group_name('my/rack'))
+
+
+class TopLevelFunctionsTest(unittest.TestCase):
+ def test_autoserv_command_line(self):
+ machines = 'abcd12,efgh34'
+ results_dir = '/fake/path'
+ extra_args = ['-Z', 'hello']
+ expected_command_line = [monitor_db._autoserv_path, '-p',
+ '-m', machines, '-r', results_dir]
+
+ command_line = monitor_db._autoserv_command_line(
+ machines, results_dir, extra_args)
+ self.assertEqual(expected_command_line + extra_args, command_line)
+
+ class FakeJob(object):
+ owner = 'Bob'
+ name = 'fake job name'
+
+ command_line = monitor_db._autoserv_command_line(
+ machines, results_dir, extra_args=[], job=FakeJob())
+ self.assertEqual(expected_command_line +
+ ['-u', FakeJob.owner, '-l', FakeJob.name],
+ command_line)
+
if __name__ == '__main__':
unittest.main()
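
Taken together, for the test_run_synchronous_atomic_group_with_label_ready case above,
the group keyval file written by QueueTask.prolog() would now gain a host_group_name=
entry alongside the existing queued-time keyval. The job_queued key name and its value
below are assumed from _job_queued_keyval() and shown only for illustration:

    job_queued=1242069962
    host_group_name=label4

The TKO parser would then record this two-host group under the name 'label4' instead
of the comma-separated hostname list.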