Diffstat (limited to 'scheduler')
-rwxr-xr-x  scheduler/monitor_db.py           122
-rw-r--r--  scheduler/monitor_db_unittest.py   92
2 files changed, 191 insertions(+), 23 deletions(-)
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 43ecd366..678aec10 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -224,6 +224,18 @@ def enable_logging(logfile):
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
queue_entry=None):
+ """
+ @returns The autoserv command line as a list of executable + parameters.
+
+ @param machines - string - A machine or comma separated list of machines
+ for the (-m) flag.
+ @param results_dir - string - Where the results will be written (-r).
+ @param extra_args - list - Additional arguments to pass to autoserv.
+ @param job - Job object - If supplied, -u owner and -l name parameters
+ will be added.
+ @param queue_entry - A HostQueueEntry object - If supplied and no Job
+ object was supplied, this will be used to look up the Job object.
+ """
autoserv_argv = [_autoserv_path, '-p', '-m', machines,
'-r', _drone_manager.absolute_path(results_dir)]
if job or queue_entry:
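
For reference, a minimal standalone sketch of the argv this builds (the
autoserv path and results root below are hypothetical stand-ins for
_autoserv_path and _drone_manager.absolute_path(), not values from a live
scheduler; the ordering of -u/-l versus extra_args is also an assumption):

    # Hypothetical illustration of _autoserv_command_line's output.
    autoserv_path = '/usr/local/autotest/server/autoserv'  # assumed path
    results_dir = '/usr/local/autotest/results/1-owner'    # assumed path
    argv = [autoserv_path, '-p', '-m', 'host1,host2', '-r', results_dir]
    argv += ['-u', 'owner', '-l', 'job name']  # added when a job is given
    argv += ['--verbose']                      # extra_args come last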
@@ -1521,9 +1533,10 @@ class VerifyTask(PreJobTask):
class QueueTask(AgentTask, TaskWithJobKeyvals):
- def __init__(self, job, queue_entries, cmd):
+ def __init__(self, job, queue_entries, cmd, group_name=''):
self.job = job
self.queue_entries = queue_entries
+ self.group_name = group_name
super(QueueTask, self).__init__(cmd, self._execution_tag())
self._set_ids(queue_entries=queue_entries)
@@ -1560,7 +1573,10 @@ class QueueTask(AgentTask, TaskWithJobKeyvals):
def prolog(self):
queued_key, queued_time = self._job_queued_keyval(self.job)
- self._write_keyvals_before_job({queued_key : queued_time})
+ keyval_dict = {queued_key: queued_time}
+ if self.group_name:
+ keyval_dict['host_group_name'] = self.group_name
+ self._write_keyvals_before_job(keyval_dict)
for queue_entry in self.queue_entries:
self._write_host_keyvals(queue_entry.host)
queue_entry.set_status('Running')
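
The net effect of the prolog change is one extra keyval when a group name
is present; a rough sketch of the dict written before the job starts (the
queued-time key and value here are illustrative, the real pair comes from
_job_queued_keyval()):

    # Illustrative only: the queued key/value are assumed.
    keyval_dict = {'job_queued': 1234567890}
    group_name = 'atomic1'
    if group_name:
        keyval_dict['host_group_name'] = group_name
    # keyval_dict is then handed to _write_keyvals_before_job().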
@@ -2256,6 +2272,24 @@ class HostQueueEntry(DBObject):
return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
+ def get_labels(self):
+ """
+ Get all labels associated with this host queue entry (either via the
+ meta_host or as a job dependency label). The labels yielded are not
+ guaranteed to be unique.
+
+ @yields Label instances associated with this host_queue_entry.
+ """
+ if self.meta_host:
+ yield Label(id=self.meta_host, always_query=False)
+ labels = Label.fetch(
+ joins="JOIN jobs_dependency_labels AS deps "
+ "ON (labels.id = deps.label_id)",
+ where="deps.job_id = %d" % self.job.id)
+ for label in labels:
+ yield label
+
+
def set_host(self, host):
if host:
self.queue_log_record('Assigning host ' + host.hostname)
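
Since get_labels() is a generator and may yield duplicates, callers that
need a unique set should de-duplicate; a hypothetical caller might do:

    # Hypothetical caller code: collect unique label names for an entry.
    label_names = set(label.name for label in queue_entry.get_labels())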
@@ -2576,17 +2610,26 @@ class Job(DBObject):
_drone_manager.write_lines_to_file(file_path, [hostname])
- def _next_group_name(self):
+ def _next_group_name(self, group_name=''):
+ """@returns a directory name to use for the next host group results."""
+ if group_name:
+ # Sanitize for use as a pathname.
+ group_name = group_name.replace(os.path.sep, '_')
+ if group_name.startswith('.'):
+ group_name = '_' + group_name[1:]
+ # Add a separator between the group name and 'group%d'.
+ group_name += '.'
+ group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
query = models.HostQueueEntry.objects.filter(
job=self.id).values('execution_subdir').distinct()
subdirs = (entry['execution_subdir'] for entry in query)
- groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
- ids = [int(match.group(1)) for match in groups if match]
+ group_matches = (group_count_re.match(subdir) for subdir in subdirs)
+ ids = [int(match.group(1)) for match in group_matches if match]
if ids:
next_id = max(ids) + 1
else:
next_id = 0
- return "group%d" % next_id
+ return '%sgroup%d' % (group_name, next_id)
def _write_control_file(self, execution_tag):
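
The sanitizing rules are easiest to see in isolation; a minimal standalone
sketch of the same transformations (assuming a POSIX os.path.sep):

    import os

    def sanitize_group_name(group_name):
        # Mirror of the sanitizing above: path separators become
        # underscores and a leading '.' is replaced so the name cannot
        # nest into, or hide within, the results directory.
        group_name = group_name.replace(os.path.sep, '_')
        if group_name.startswith('.'):
            group_name = '_' + group_name[1:]
        return group_name

    assert sanitize_group_name('my/rack') == 'my_rack'
    assert sanitize_group_name('.hidden') == '_hidden'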
@@ -2647,31 +2690,64 @@ class Job(DBObject):
return tasks
- def _assign_new_group(self, queue_entries):
+ def _assign_new_group(self, queue_entries, group_name=''):
if len(queue_entries) == 1:
- group_name = queue_entries[0].get_host().hostname
+ group_subdir_name = queue_entries[0].get_host().hostname
else:
- group_name = self._next_group_name()
+ group_subdir_name = self._next_group_name(group_name)
logging.info('Running synchronous job %d hosts %s as %s',
self.id, [entry.host.hostname for entry in queue_entries],
- group_name)
+ group_subdir_name)
for queue_entry in queue_entries:
- queue_entry.set_execution_subdir(group_name)
+ queue_entry.set_execution_subdir(group_subdir_name)
def _choose_group_to_run(self, include_queue_entry):
+ """
+ @returns A tuple containing a list of HostQueueEntry instances to be
+ used to run this Job and a string group name to suggest giving
+ to this job in the results database.
+ """
+ if include_queue_entry.atomic_group_id:
+ atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
+ always_query=False)
+ else:
+ atomic_group = None
+
chosen_entries = [include_queue_entry]
+ if atomic_group:
+ num_entries_wanted = atomic_group.max_number_of_machines
+ else:
+ num_entries_wanted = self.synch_count
+ num_entries_wanted -= len(chosen_entries)
- num_entries_needed = self.synch_count - 1
- if num_entries_needed > 0:
+ if num_entries_wanted > 0:
+ where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
pending_entries = HostQueueEntry.fetch(
- where='job_id = %s AND status = "Pending" AND id != %s',
- params=(self.id, include_queue_entry.id))
- chosen_entries += list(pending_entries)[:num_entries_needed]
+ where=where_clause,
+ params=(self.id, include_queue_entry.id))
+ # TODO(gps): sort these by hostname before slicing.
+ chosen_entries += list(pending_entries)[:num_entries_wanted]
+
+ # Sanity check. We'll only ever be called if this can be met.
+ assert len(chosen_entries) >= self.synch_count
+
+ if atomic_group:
+ # Look at any meta_host and dependency labels and pick the first
+ # one that also specifies this atomic group. Use that label name
+ # as the group name if possible (it is more specific).
+ group_name = atomic_group.name
+ for label in include_queue_entry.get_labels():
+ if label.atomic_group_id:
+ assert label.atomic_group_id == atomic_group.id
+ group_name = label.name
+ break
+ else:
+ group_name = ''
- self._assign_new_group(chosen_entries)
- return chosen_entries
+ self._assign_new_group(chosen_entries, group_name=group_name)
+ return chosen_entries, group_name
def run(self, queue_entry):
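
Condensed, the group-name choice above boils down to the following sketch
(a hypothetical helper, not scheduler code): prefer the first meta_host or
dependency label that belongs to the atomic group, otherwise fall back to
the atomic group's own name, and use '' when there is no atomic group.

    def choose_group_name(atomic_group, labels):
        # Sketch of the selection logic in _choose_group_to_run().
        if atomic_group is None:
            return ''
        for label in labels:
            if label.atomic_group_id == atomic_group.id:
                return label.name
        return atomic_group.name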
@@ -2679,17 +2755,17 @@ class Job(DBObject):
queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
return Agent(self._get_pre_job_tasks(queue_entry))
- queue_entries = self._choose_group_to_run(queue_entry)
- return self._finish_run(queue_entries)
+ queue_entries, group_name = self._choose_group_to_run(queue_entry)
+ return self._finish_run(queue_entries, group_name)
- def _finish_run(self, queue_entries, initial_tasks=[]):
+ def _finish_run(self, queue_entries, group_name):
for queue_entry in queue_entries:
queue_entry.set_status('Starting')
params = self._get_autoserv_params(queue_entries)
queue_task = QueueTask(job=self, queue_entries=queue_entries,
- cmd=params)
- tasks = initial_tasks + [queue_task]
+ cmd=params, group_name=group_name)
+ tasks = [queue_task]
entry_ids = [entry.id for entry in queue_entries]
return Agent(tasks, num_processes=len(queue_entries))
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 15172078..f2176342 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -1764,6 +1764,35 @@ class AgentTasksTest(unittest.TestCase):
self._test_cleanup_task_helper(False, True)
+class HostQueueEntryTest(BaseSchedulerTest):
+ def _create_hqe(self, dependency_labels=(), **create_job_kwargs):
+ job = self._create_job(**create_job_kwargs)
+ for label in dependency_labels:
+ job.dependency_labels.add(label)
+ hqes = list(monitor_db.HostQueueEntry.fetch(where='job_id=%d' % job.id))
+ self.assertEqual(1, len(hqes))
+ return hqes[0]
+
+ def _check_hqe_labels(self, hqe, expected_labels):
+ expected_labels = set(expected_labels)
+ label_names = set(label.name for label in hqe.get_labels())
+ self.assertEqual(expected_labels, label_names)
+
+ def test_get_labels_empty(self):
+ hqe = self._create_hqe(hosts=[1])
+ labels = list(hqe.get_labels())
+ self.assertEqual([], labels)
+
+ def test_get_labels_metahost(self):
+ hqe = self._create_hqe(metahosts=[2])
+ self._check_hqe_labels(hqe, ['label2'])
+
+ def test_get_labels_dependencies(self):
+ hqe = self._create_hqe(dependency_labels=(self.label3, self.label4),
+ metahosts=[1])
+ self._check_hqe_labels(hqe, ['label1', 'label3', 'label4'])
+
+
class JobTest(BaseSchedulerTest):
def setUp(self):
super(JobTest, self).setUp()
@@ -1774,6 +1803,7 @@ class JobTest(BaseSchedulerTest):
def _setup_directory_expects(self, execution_subdir):
+ # XXX(gps): um... this function does -nothing-
job_path = os.path.join('.', '1-my_user')
results_dir = os.path.join(job_path, execution_subdir)
@@ -1872,6 +1902,35 @@ class JobTest(BaseSchedulerTest):
self.assertEquals(hqe_ids, [1, 2])
+ def test_run_synchronous_atomic_group_ready(self):
+ self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
+ self._update_hqe("status='Pending', execution_subdir=''")
+
+ tasks = self._test_run_helper(expect_starting=True)
+ self.assertEquals(len(tasks), 1)
+ queue_task = tasks[0]
+
+ self.assert_(isinstance(queue_task, monitor_db.QueueTask))
+ # Atomic group jobs that do not have a specific label in the atomic group
+ # will use the atomic group name as their group name.
+ self.assertEquals(queue_task.group_name, 'atomic1')
+
+
+ def test_run_synchronous_atomic_group_with_label_ready(self):
+ job = self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
+ job.dependency_labels.add(self.label4)
+ self._update_hqe("status='Pending', execution_subdir=''")
+
+ tasks = self._test_run_helper(expect_starting=True)
+ self.assertEquals(len(tasks), 1)
+ queue_task = tasks[0]
+
+ self.assert_(isinstance(queue_task, monitor_db.QueueTask))
+ # Atomic group jobs that also specify a label in the atomic group
+ # will use the label name as their group name.
+ self.assertEquals(queue_task.group_name, 'label4')
+
+
def test_reboot_before_always(self):
job = self._create_job(hosts=[1])
job.reboot_before = models.RebootBefore.ALWAYS
@@ -1906,6 +1965,39 @@ class JobTest(BaseSchedulerTest):
self._test_reboot_before_if_dirty_helper(False)
+ def test_next_group_name(self):
+ django_job = self._create_job(metahosts=[1])
+ job = monitor_db.Job(id=django_job.id)
+ self.assertEqual('group0', job._next_group_name())
+
+ for hqe in django_job.hostqueueentry_set.filter():
+ hqe.execution_subdir = 'my_rack.group0'
+ hqe.save()
+ self.assertEqual('my_rack.group1', job._next_group_name('my/rack'))
+
+
+class TopLevelFunctionsTest(unittest.TestCase):
+ def test_autoserv_command_line(self):
+ machines = 'abcd12,efgh34'
+ results_dir = '/fake/path'
+ extra_args = ['-Z', 'hello']
+ expected_command_line = [monitor_db._autoserv_path, '-p',
+ '-m', machines, '-r', results_dir]
+
+ command_line = monitor_db._autoserv_command_line(
+ machines, results_dir, extra_args)
+ self.assertEqual(expected_command_line + extra_args, command_line)
+
+ class FakeJob(object):
+ owner = 'Bob'
+ name = 'fake job name'
+
+ command_line = monitor_db._autoserv_command_line(
+ machines, results_dir, extra_args=[], job=FakeJob())
+ self.assertEqual(expected_command_line +
+ ['-u', FakeJob.owner, '-l', FakeJob.name],
+ command_line)
+
if __name__ == '__main__':
unittest.main()