Diffstat (limited to 'scheduler')
-rwxr-xr-x  scheduler/monitor_db.py           | 122
-rw-r--r--  scheduler/monitor_db_unittest.py  |  92
2 files changed, 191 insertions(+), 23 deletions(-)
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 43ecd366..678aec10 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -224,6 +224,18 @@ def enable_logging(logfile):
 
 def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                            queue_entry=None):
+    """
+    @returns The autoserv command line as a list of executable + parameters.
+
+    @param machines - string - A machine or comma separated list of machines
+            for the (-m) flag.
+    @param results_dir - string - Where the results will be written (-r).
+    @param extra_args - list - Additional arguments to pass to autoserv.
+    @param job - Job object - If supplied, -u owner and -l name parameters
+            will be added.
+    @param queue_entry - A HostQueueEntry object - If supplied and no Job
+            object was supplied, this will be used to look up the Job object.
+    """
     autoserv_argv = [_autoserv_path, '-p', '-m', machines,
                      '-r', _drone_manager.absolute_path(results_dir)]
     if job or queue_entry:
@@ -1521,9 +1533,10 @@ class VerifyTask(PreJobTask):
 
 
 class QueueTask(AgentTask, TaskWithJobKeyvals):
-    def __init__(self, job, queue_entries, cmd):
+    def __init__(self, job, queue_entries, cmd, group_name=''):
         self.job = job
         self.queue_entries = queue_entries
+        self.group_name = group_name
         super(QueueTask, self).__init__(cmd, self._execution_tag())
         self._set_ids(queue_entries=queue_entries)
 
@@ -1560,7 +1573,10 @@ class QueueTask(AgentTask, TaskWithJobKeyvals):
 
     def prolog(self):
         queued_key, queued_time = self._job_queued_keyval(self.job)
-        self._write_keyvals_before_job({queued_key : queued_time})
+        keyval_dict = {queued_key: queued_time}
+        if self.group_name:
+            keyval_dict['host_group_name'] = self.group_name
+        self._write_keyvals_before_job(keyval_dict)
         for queue_entry in self.queue_entries:
             self._write_host_keyvals(queue_entry.host)
             queue_entry.set_status('Running')
@@ -2256,6 +2272,24 @@ class HostQueueEntry(DBObject):
         return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
 
 
+    def get_labels(self):
+        """
+        Get all labels associated with this host queue entry (either via the
+        meta_host or as a job dependency label).  The labels yielded are not
+        guaranteed to be unique.
+
+        @yields Label instances associated with this host_queue_entry.
+        """
+        if self.meta_host:
+            yield Label(id=self.meta_host, always_query=False)
+        labels = Label.fetch(
+                joins="JOIN jobs_dependency_labels AS deps "
+                      "ON (labels.id = deps.label_id)",
+                where="deps.job_id = %d" % self.job.id)
+        for label in labels:
+            yield label
+
+
     def set_host(self, host):
         if host:
             self.queue_log_record('Assigning host ' + host.hostname)
@@ -2576,17 +2610,26 @@ class Job(DBObject):
         _drone_manager.write_lines_to_file(file_path, [hostname])
 
 
-    def _next_group_name(self):
+    def _next_group_name(self, group_name=''):
+        """@returns a directory name to use for the next host group results."""
+        if group_name:
+            # Sanitize for use as a pathname.
+            group_name = group_name.replace(os.path.sep, '_')
+            if group_name.startswith('.'):
+                group_name = '_' + group_name[1:]
+            # Add a separator between the group name and 'group%d'.
+            group_name += '.'
+        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
         query = models.HostQueueEntry.objects.filter(
             job=self.id).values('execution_subdir').distinct()
         subdirs = (entry['execution_subdir'] for entry in query)
-        groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
-        ids = [int(match.group(1)) for match in groups if match]
+        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
+        ids = [int(match.group(1)) for match in group_matches if match]
         if ids:
             next_id = max(ids) + 1
         else:
             next_id = 0
-        return "group%d" % next_id
+        return '%sgroup%d' % (group_name, next_id)
 
 
     def _write_control_file(self, execution_tag):
@@ -2647,31 +2690,64 @@ class Job(DBObject):
         return tasks
 
 
-    def _assign_new_group(self, queue_entries):
+    def _assign_new_group(self, queue_entries, group_name=''):
         if len(queue_entries) == 1:
-            group_name = queue_entries[0].get_host().hostname
+            group_subdir_name = queue_entries[0].get_host().hostname
         else:
-            group_name = self._next_group_name()
+            group_subdir_name = self._next_group_name(group_name)
 
         logging.info('Running synchronous job %d hosts %s as %s',
                      self.id,
                      [entry.host.hostname for entry in queue_entries],
-                     group_name)
+                     group_subdir_name)
 
         for queue_entry in queue_entries:
-            queue_entry.set_execution_subdir(group_name)
+            queue_entry.set_execution_subdir(group_subdir_name)
 
 
     def _choose_group_to_run(self, include_queue_entry):
+        """
+        @returns A tuple containing a list of HostQueueEntry instances to be
+                used to run this Job, and a string group name to suggest
+                giving to this job in the results database.
+        """
+        if include_queue_entry.atomic_group_id:
+            atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
+                                       always_query=False)
+        else:
+            atomic_group = None
+
         chosen_entries = [include_queue_entry]
+        if atomic_group:
+            num_entries_wanted = atomic_group.max_number_of_machines
+        else:
+            num_entries_wanted = self.synch_count
+        num_entries_wanted -= len(chosen_entries)
 
-        num_entries_needed = self.synch_count - 1
-        if num_entries_needed > 0:
+        if num_entries_wanted > 0:
+            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
             pending_entries = HostQueueEntry.fetch(
-                     where='job_id = %s AND status = "Pending" AND id != %s',
-                     params=(self.id, include_queue_entry.id))
-            chosen_entries += list(pending_entries)[:num_entries_needed]
+                     where=where_clause,
+                     params=(self.id, include_queue_entry.id))
+            # TODO(gps): sort these by hostname before slicing.
+            chosen_entries += list(pending_entries)[:num_entries_wanted]
+
+        # Sanity check.  We'll only ever be called if this can be met.
+        assert len(chosen_entries) >= self.synch_count
+
+        if atomic_group:
+            # Look at any meta_host and dependency labels and pick the first
+            # one that also specifies this atomic group.  Use that label name
+            # as the group name if possible (it is more specific).
+            group_name = atomic_group.name
+            for label in include_queue_entry.get_labels():
+                if label.atomic_group_id:
+                    assert label.atomic_group_id == atomic_group.id
+                    group_name = label.name
+                    break
+        else:
+            group_name = ''
 
-        self._assign_new_group(chosen_entries)
-        return chosen_entries
+        self._assign_new_group(chosen_entries, group_name=group_name)
+        return chosen_entries, group_name
 
 
     def run(self, queue_entry):
@@ -2679,17 +2755,17 @@ class Job(DBObject):
             queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
             return Agent(self._get_pre_job_tasks(queue_entry))
 
-        queue_entries = self._choose_group_to_run(queue_entry)
-        return self._finish_run(queue_entries)
+        queue_entries, group_name = self._choose_group_to_run(queue_entry)
+        return self._finish_run(queue_entries, group_name)
 
 
-    def _finish_run(self, queue_entries, initial_tasks=[]):
+    def _finish_run(self, queue_entries, group_name):
         for queue_entry in queue_entries:
             queue_entry.set_status('Starting')
         params = self._get_autoserv_params(queue_entries)
         queue_task = QueueTask(job=self, queue_entries=queue_entries,
-                               cmd=params)
-        tasks = initial_tasks + [queue_task]
+                               cmd=params, group_name=group_name)
+        tasks = [queue_task]
         entry_ids = [entry.id for entry in queue_entries]
         return Agent(tasks, num_processes=len(queue_entries))
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 15172078..f2176342 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -1764,6 +1764,35 @@ class AgentTasksTest(unittest.TestCase):
         self._test_cleanup_task_helper(False, True)
 
 
+class HostQueueEntryTest(BaseSchedulerTest):
+    def _create_hqe(self, dependency_labels=(), **create_job_kwargs):
+        job = self._create_job(**create_job_kwargs)
+        for label in dependency_labels:
+            job.dependency_labels.add(label)
+        hqes = list(monitor_db.HostQueueEntry.fetch(where='job_id=%d' % job.id))
+        self.assertEqual(1, len(hqes))
+        return hqes[0]
+
+    def _check_hqe_labels(self, hqe, expected_labels):
+        expected_labels = set(expected_labels)
+        label_names = set(label.name for label in hqe.get_labels())
+        self.assertEqual(expected_labels, label_names)
+
+    def test_get_labels_empty(self):
+        hqe = self._create_hqe(hosts=[1])
+        labels = list(hqe.get_labels())
+        self.assertEqual([], labels)
+
+    def test_get_labels_metahost(self):
+        hqe = self._create_hqe(metahosts=[2])
+        self._check_hqe_labels(hqe, ['label2'])
+
+    def test_get_labels_dependancies(self):
+        hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),
+                               metahosts=[1])
+        self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])
+
+
 class JobTest(BaseSchedulerTest):
     def setUp(self):
         super(JobTest, self).setUp()
@@ -1774,6 +1803,7 @@ class JobTest(BaseSchedulerTest):
 
 
     def _setup_directory_expects(self, execution_subdir):
+        # XXX(gps): um... this function does -nothing-
         job_path = os.path.join('.', '1-my_user')
         results_dir = os.path.join(job_path, execution_subdir)
 
@@ -1872,6 +1902,35 @@ class JobTest(BaseSchedulerTest):
         self.assertEquals(hqe_ids, [1, 2])
 
 
+    def test_run_synchronous_atomic_group_ready(self):
+        self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
+        self._update_hqe("status='Pending', execution_subdir=''")
+
+        tasks = self._test_run_helper(expect_starting=True)
+        self.assertEquals(len(tasks), 1)
+        queue_task = tasks[0]
+
+        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
+        # Atomic group jobs that do not specify a label in the atomic group
+        # will use the atomic group name as their group name.
+        self.assertEquals(queue_task.group_name, 'atomic1')
+
+
+    def test_run_synchronous_atomic_group_with_label_ready(self):
+        job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
+        job.dependency_labels.add(self.label4)
+        self._update_hqe("status='Pending', execution_subdir=''")
+
+        tasks = self._test_run_helper(expect_starting=True)
+        self.assertEquals(len(tasks), 1)
+        queue_task = tasks[0]
+
+        self.assert_(isinstance(queue_task, monitor_db.QueueTask))
+        # Atomic group jobs that also specify a label in the atomic group
+        # will use the label name as their group name.
+        self.assertEquals(queue_task.group_name, 'label4')
+
+
     def test_reboot_before_always(self):
         job = self._create_job(hosts=[1])
         job.reboot_before = models.RebootBefore.ALWAYS
@@ -1906,6 +1965,39 @@ class JobTest(BaseSchedulerTest):
         self._test_reboot_before_if_dirty_helper(False)
 
 
+    def test_next_group_name(self):
+        django_job = self._create_job(metahosts=[1])
+        job = monitor_db.Job(id=django_job.id)
+        self.assertEqual('group0', job._next_group_name())
+
+        for hqe in django_job.hostqueueentry_set.filter():
+            hqe.execution_subdir = 'my_rack.group0'
+            hqe.save()
+        self.assertEqual('my_rack.group1', job._next_group_name('my/rack'))
+
+
+class TopLevelFunctionsTest(unittest.TestCase):
+    def test_autoserv_command_line(self):
+        machines = 'abcd12,efgh34'
+        results_dir = '/fake/path'
+        extra_args = ['-Z', 'hello']
+        expected_command_line = [monitor_db._autoserv_path, '-p',
+                                 '-m', machines, '-r', results_dir]
+
+        command_line = monitor_db._autoserv_command_line(
+                machines, results_dir, extra_args)
+        self.assertEqual(expected_command_line + extra_args, command_line)
+
+        class FakeJob(object):
+            owner = 'Bob'
+            name = 'fake job name'
+
+        command_line = monitor_db._autoserv_command_line(
+                machines, results_dir, extra_args=[], job=FakeJob())
+        self.assertEqual(expected_command_line +
+                         ['-u', FakeJob.owner, '-l', FakeJob.name],
+                         command_line)
+
+
 if __name__ == '__main__':
     unittest.main()
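
The flag assembly described by the new _autoserv_command_line docstring can be exercised standalone. The following is a minimal sketch matching the expectations in TopLevelFunctionsTest, not the scheduler's actual function: '/usr/bin/autoserv' stands in for _autoserv_path, and the drone manager's absolute_path() translation of results_dir is omitted.

def autoserv_command_line(machines, results_dir, extra_args, job=None):
    # Base invocation: machine list (-m) and results directory (-r),
    # per the docstring in the diff above.
    argv = ['/usr/bin/autoserv', '-p', '-m', machines, '-r', results_dir]
    if job:
        # Owner (-u) and job name (-l) are only added when a Job is known.
        argv += ['-u', job.owner, '-l', job.name]
    return argv + list(extra_args)

class FakeJob(object):
    owner = 'Bob'
    name = 'fake job name'

assert autoserv_command_line('abcd12,efgh34', '/fake/path',
                             ['-Z', 'hello']) == [
        '/usr/bin/autoserv', '-p', '-m', 'abcd12,efgh34',
        '-r', '/fake/path', '-Z', 'hello']
assert autoserv_command_line('abcd12', '/fake/path', [],
                             job=FakeJob())[-4:] == [
        '-u', 'Bob', '-l', 'fake job name']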
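The prolog() change writes a 'host_group_name' keyval only when a group name was chosen. A sketch of just that dictionary-building step, where build_job_keyvals is a hypothetical helper rather than a scheduler method:

def build_job_keyvals(queued_key, queued_time, group_name=''):
    keyval_dict = {queued_key: queued_time}
    if group_name:
        # Only jobs that ran with a named host group carry this keyval.
        keyval_dict['host_group_name'] = group_name
    return keyval_dict

assert build_job_keyvals('job_queued', 100) == {'job_queued': 100}
assert build_job_keyvals('job_queued', 100, 'label4') == {
        'job_queued': 100, 'host_group_name': 'label4'}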
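The directory-naming scheme in _next_group_name is also easy to check in isolation. This sketch inlines the sanitizing and counting logic from the diff, with existing_subdirs standing in for the execution_subdir values the real method queries from the HostQueueEntry table:

import os
import re

def next_group_name(existing_subdirs, group_name=''):
    if group_name:
        # Sanitize for use as a pathname: path separators become '_'
        # and a leading '.' is masked.
        group_name = group_name.replace(os.path.sep, '_')
        if group_name.startswith('.'):
            group_name = '_' + group_name[1:]
        group_name += '.'  # separator before the 'group%d' suffix
    group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
    matches = (group_count_re.match(subdir) for subdir in existing_subdirs)
    ids = [int(match.group(1)) for match in matches if match]
    next_id = max(ids) + 1 if ids else 0
    return '%sgroup%d' % (group_name, next_id)

assert next_group_name([]) == 'group0'
assert next_group_name(['group0', 'group1']) == 'group2'
# Matches test_next_group_name: 'my/rack' sanitizes to the 'my_rack.' prefix.
assert next_group_name(['my_rack.group0'], 'my/rack') == 'my_rack.group1'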
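Finally, the group-name preference in _choose_group_to_run (a label tied to the atomic group beats the atomic group's own name, because it is more specific) reduces to a few lines. Here Label and AtomicGroup are simple namedtuple stand-ins for the scheduler's DBObject classes:

from collections import namedtuple

Label = namedtuple('Label', 'name atomic_group_id')
AtomicGroup = namedtuple('AtomicGroup', 'id name')

def choose_group_name(atomic_group, entry_labels):
    if atomic_group is None:
        return ''  # non-atomic jobs get no group name
    group_name = atomic_group.name
    for label in entry_labels:
        if label.atomic_group_id:
            # A label that specifies this atomic group is more specific.
            assert label.atomic_group_id == atomic_group.id
            group_name = label.name
            break
    return group_name

atomic1 = AtomicGroup(id=1, name='atomic1')
assert choose_group_name(None, []) == ''
assert choose_group_name(atomic1, [Label('label2', None)]) == 'atomic1'
assert choose_group_name(atomic1, [Label('label4', 1)]) == 'label4'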