diff options
Diffstat (limited to 'scheduler/monitor_db_unittest.py')
-rw-r--r-- | scheduler/monitor_db_unittest.py | 393 |
1 files changed, 369 insertions, 24 deletions
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py index 503ccc9c..e37cf788 100644 --- a/scheduler/monitor_db_unittest.py +++ b/scheduler/monitor_db_unittest.py @@ -96,21 +96,48 @@ class BaseSchedulerTest(unittest.TestCase): def _fill_in_test_data(self): + """Populate the test database with some hosts and labels.""" user = models.User.objects.create(login='my_user') acl_group = models.AclGroup.objects.create(name='my_acl') acl_group.users.add(user) hosts = [models.Host.objects.create(hostname=hostname) for hostname in - ('host1', 'host2', 'host3', 'host4')] + ('host1', 'host2', 'host3', 'host4', 'host5', 'host6', + 'host7', 'host8', 'host9')] + acl_group.hosts = hosts models.AclGroup.smart_get('Everyone').hosts = [] labels = [models.Label.objects.create(name=name) for name in - ('label1', 'label2', 'label3')] - labels[2].only_if_needed = True - labels[2].save() - hosts[0].labels.add(labels[0]) - hosts[1].labels.add(labels[1]) + ('label1', 'label2', 'label3', 'label4', 'label5', 'label6', + 'label7')] + + atomic_group1 = models.AtomicGroup.objects.create( + name='atomic1', max_number_of_machines=2) + atomic_group2 = models.AtomicGroup.objects.create( + name='atomic2', max_number_of_machines=2) + + self.label3 = labels[2] + self.label3.only_if_needed = True + self.label3.save() + self.label4 = labels[3] + self.label4.atomic_group = atomic_group1 + self.label4.save() + self.label5 = labels[4] + self.label5.atomic_group = atomic_group1 + self.label5.save() + hosts[0].labels.add(labels[0]) # label1 + hosts[1].labels.add(labels[1]) # label2 + self.label6 = labels[5] + self.label7 = labels[6] + for hostnum in xrange(4,7): # host5..host7 + hosts[hostnum].labels.add(self.label4) # an atomic group label + hosts[hostnum].labels.add(self.label6) # a normal label + hosts[6].labels.add(self.label7) + for hostnum in xrange(7,9): # host8..host9 + hosts[hostnum].labels.add(self.label5) # an atomic group label + hosts[hostnum].labels.add(self.label6) # 
a normal label + hosts[hostnum].labels.add(self.label7) def _setup_dummy_user(self): @@ -133,7 +160,24 @@ class BaseSchedulerTest(unittest.TestCase): def _create_job(self, hosts=[], metahosts=[], priority=0, active=False, - synchronous=False): + synchronous=False, atomic_group=None): + """ + Create a job row in the test database. + + @param hosts - A list of explicit host ids for this job to be + scheduled on. + @param metahosts - A list of label ids for each host that this job + should be scheduled on (meta host scheduling). + @param priority - The job priority (integer). + @param active - bool, mark this job as running or not in the database? + @param synchronous - bool, if True use synch_count=2 otherwise use + synch_count=1. + @param atomic_group - An atomic group id for this job to schedule on + or None if atomic scheduling is not required. Each metahost + becomes a request to schedule an entire atomic group. + This does not support creating an active atomic group job. + """ + assert not (atomic_group and active) # TODO(gps): support this synch_count = synchronous and 2 or 1 created_on = datetime.datetime(2008, 1, 1) status = models.HostQueueEntry.Status.QUEUED @@ -145,17 +189,25 @@ class BaseSchedulerTest(unittest.TestCase): reboot_before=models.RebootBefore.NEVER) for host_id in hosts: models.HostQueueEntry.objects.create(job=job, host_id=host_id, - status=status) + status=status, + atomic_group_id=atomic_group) models.IneligibleHostQueue.objects.create(job=job, host_id=host_id) for label_id in metahosts: models.HostQueueEntry.objects.create(job=job, meta_host_id=label_id, - status=status) + status=status, + atomic_group_id=atomic_group) + if atomic_group and not (metahosts or hosts): + # Create a single HQE to request the atomic group of hosts even if + # no metahosts or hosts are supplied. 
+ models.HostQueueEntry.objects.create(job=job, + status=status, + atomic_group_id=atomic_group) return job def _create_job_simple(self, hosts, use_metahost=False, priority=0, active=False): - 'An alternative interface to _create_job' + """An alternative interface to _create_job""" args = {'hosts' : [], 'metahosts' : []} if use_metahost: args['metahosts'] = hosts @@ -218,17 +270,27 @@ class DBObjectTest(BaseSchedulerTest): class DispatcherSchedulingTest(BaseSchedulerTest): _jobs_scheduled = [] + + def tearDown(self): + super(DispatcherSchedulingTest, self).tearDown() + + def _set_monitor_stubs(self): super(DispatcherSchedulingTest, self)._set_monitor_stubs() - def run_stub(hqe_self, assigned_host=None): - hqe_self.set_status('Starting') - if hqe_self.meta_host: - host = assigned_host - else: - host = hqe_self.host - self._record_job_scheduled(hqe_self.job.id, host.id) + + def job_run_stub(job_self, queue_entry): + """Return a dummy for testing. Called by HostQueueEntry.run().""" + self._record_job_scheduled(job_self.id, queue_entry.host.id) + queue_entry.set_status('Starting') return DummyAgent() - monitor_db.HostQueueEntry.run = run_stub + + self.god.stub_with(monitor_db.Job, 'run', job_run_stub) + + def hqe_queue_log_record_stub(self, log_line): + """No-Op to avoid calls down to the _drone_manager during tests.""" + + self.god.stub_with(monitor_db.HostQueueEntry, 'queue_log_record', + hqe_queue_log_record_stub) def _record_job_scheduled(self, job_id, host_id): @@ -248,6 +310,22 @@ class DispatcherSchedulingTest(BaseSchedulerTest): self._jobs_scheduled.remove(record) + def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number): + """Assert job was scheduled on exactly number hosts out of a set.""" + found = [] + for host_id in host_ids: + record = (job_id, host_id) + if record in self._jobs_scheduled: + found.append(record) + self._jobs_scheduled.remove(record) + if len(found) < number: + self.fail('Job %d scheduled on fewer than %d hosts in %s.\n' + 
'Jobs scheduled: %s' % (job_id, number, host_ids, found)) + elif len(found) > number: + self.fail('Job %d scheduled on more than %d hosts in %s.\n' + 'Jobs scheduled: %s' % (job_id, number, host_ids, found)) + + def _check_for_extra_schedulings(self): if len(self._jobs_scheduled) != 0: self.fail('Extra jobs scheduled: ' + @@ -383,22 +461,30 @@ class DispatcherSchedulingTest(BaseSchedulerTest): self._test_obey_ACLs_helper(True) - def test_only_if_needed_labels(self): + def _setup_test_only_if_needed_labels(self): # apply only_if_needed label3 to host1 - label3 = models.Label.smart_get('label3') - models.Host.smart_get('host1').labels.add(label3) + models.Host.smart_get('host1').labels.add(self.label3) + return self._create_job_simple([1], use_metahost=True) + - job = self._create_job_simple([1], use_metahost=True) + def test_only_if_needed_labels_avoids_host(self): + job = self._setup_test_only_if_needed_labels() # if the job doesn't depend on label3, there should be no scheduling self._dispatcher._schedule_new_jobs() self._check_for_extra_schedulings() - # now make the job depend on label3 - job.dependency_labels.add(label3) + + def test_only_if_needed_labels_schedules(self): + job = self._setup_test_only_if_needed_labels() + job.dependency_labels.add(self.label3) self._dispatcher._schedule_new_jobs() self._assert_job_scheduled_on(1, 1) self._check_for_extra_schedulings() + + def test_only_if_needed_labels_via_metahost(self): + job = self._setup_test_only_if_needed_labels() + job.dependency_labels.add(self.label3) # should also work if the metahost is the only_if_needed label self._do_query('DELETE FROM jobs_dependency_labels') self._create_job(metahosts=[3]) @@ -432,6 +518,265 @@ class DispatcherSchedulingTest(BaseSchedulerTest): self._check_for_extra_schedulings() + # TODO(gps): These should probably live in their own TestCase class + # specific to testing HostScheduler methods directly. 
It was convenient + # to put it here for now to share existing test environment setup code. + def test_HostScheduler_check_atomic_group_labels(self): + normal_job = self._create_job(metahosts=[0]) + atomic_job = self._create_job(atomic_group=1) + # Indirectly initialize the internal state of the host scheduler. + self._dispatcher._refresh_pending_queue_entries() + + atomic_hqe = monitor_db.HostQueueEntry(id=atomic_job.id) + normal_hqe = monitor_db.HostQueueEntry(id=normal_job.id) + + host_scheduler = self._dispatcher._host_scheduler + self.assertTrue(host_scheduler._check_atomic_group_labels( + [self.label4.id], atomic_hqe)) + self.assertFalse(host_scheduler._check_atomic_group_labels( + [self.label4.id], normal_hqe)) + self.assertFalse(host_scheduler._check_atomic_group_labels( + [self.label5.id, self.label6.id, self.label7.id], normal_hqe)) + self.assertTrue(host_scheduler._check_atomic_group_labels( + [self.label4.id, self.label6.id], atomic_hqe)) + self.assertRaises(monitor_db.SchedulerError, + host_scheduler._check_atomic_group_labels, + [self.label4.id, self.label5.id], + atomic_hqe) + + + def test_HostScheduler_get_host_atomic_group_id(self): + self._create_job(metahosts=[self.label6.id]) + # Indirectly initialize the internal state of the host scheduler. 
+ self._dispatcher._refresh_pending_queue_entries() + + # Test the host scheduler + host_scheduler = self._dispatcher._host_scheduler + self.assertRaises(monitor_db.SchedulerError, + host_scheduler._get_host_atomic_group_id, + [self.label4.id, self.label5.id]) + self.assertEqual(None, host_scheduler._get_host_atomic_group_id([])) + self.assertEqual(None, host_scheduler._get_host_atomic_group_id( + [self.label3.id, self.label7.id, self.label6.id])) + self.assertEqual(1, host_scheduler._get_host_atomic_group_id( + [self.label4.id, self.label7.id, self.label6.id])) + self.assertEqual(1, host_scheduler._get_host_atomic_group_id( + [self.label7.id, self.label5.id])) + + + def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self): + # Create a job scheduled to run on label6. + self._create_job(metahosts=[self.label6.id]) + self._dispatcher._schedule_new_jobs() + # label6 only has hosts that are in atomic groups associated with it, + # there should be no scheduling. + self._check_for_extra_schedulings() + + + def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self): + # Create a job scheduled to run on label5. This is an atomic group + # label but this job does not request atomic group scheduling. + self._create_job(metahosts=[self.label5.id]) + self._dispatcher._schedule_new_jobs() + # label5 is an atomic group label and this job is not atomic, + # there should be no scheduling. + self._check_for_extra_schedulings() + + + def test_atomic_group_scheduling_basics(self): + # Create jobs scheduled to run on an atomic group. + job_a = self._create_job(synchronous=True, metahosts=[self.label4.id], + atomic_group=1) + job_b = self._create_job(synchronous=True, metahosts=[self.label5.id], + atomic_group=1) + self._dispatcher._schedule_new_jobs() + # atomic_group.max_number_of_machines was 2 so we should run on 2. 
+ self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2) + self._assert_job_scheduled_on(job_b.id, 8) # label5 + self._assert_job_scheduled_on(job_b.id, 9) # label5 + self._check_for_extra_schedulings() + + # The three host label4 atomic group still has one host available. + # That means a job with a synch_count of 1 asking to be scheduled on + # the atomic group can still use the final machine. + # + # This may seem like a somewhat odd use case. It allows the use of an + # atomic group as a set of machines to run smaller jobs within (a set + # of hosts configured for use in network tests with each other perhaps?) + onehost_job = self._create_job(atomic_group=1) + self._dispatcher._schedule_new_jobs() + self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1) + self._check_for_extra_schedulings() + + # No more atomic groups have hosts available, no more jobs should + # be scheduled. + self._create_job(atomic_group=1) + self._dispatcher._schedule_new_jobs() + self._check_for_extra_schedulings() + + + def test_atomic_group_scheduling_obeys_acls(self): + # Request scheduling on a specific atomic label but be denied by ACLs. + self._do_query('DELETE FROM acl_groups_hosts WHERE host_id in (8,9)') + job = self._create_job(metahosts=[self.label5.id], atomic_group=1) + self._dispatcher._schedule_new_jobs() + self._check_for_extra_schedulings() + + + def test_atomic_group_scheduling_dependency_label_exclude(self): + # A dependency label that matches no hosts in the atomic group. + job_a = self._create_job(atomic_group=1) + job_a.dependency_labels.add(self.label3) + self._dispatcher._schedule_new_jobs() + self._check_for_extra_schedulings() + + + def test_atomic_group_scheduling_metahost_dependency_label_exclude(self): + # A metahost and dependency label that excludes too many hosts. 
+ job_b = self._create_job(synchronous=True, metahosts=[self.label4.id], + atomic_group=1) + job_b.dependency_labels.add(self.label7) + self._dispatcher._schedule_new_jobs() + self._check_for_extra_schedulings() + + + def test_atomic_group_scheduling_dependency_label_match(self): + # A dependency label that exists on enough atomic group hosts in only + # one of the two atomic group labels. + job_c = self._create_job(synchronous=True, atomic_group=1) + job_c.dependency_labels.add(self.label7) + self._dispatcher._schedule_new_jobs() + self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2) + self._check_for_extra_schedulings() + + + def test_atomic_group_scheduling_no_metahost(self): + # Force it to schedule on the other group for a reliable test. + self._do_query('UPDATE hosts SET invalid=1 WHERE id=9') + # An atomic job without a metahost. + job = self._create_job(synchronous=True, atomic_group=1) + self._dispatcher._schedule_new_jobs() + self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2) + self._check_for_extra_schedulings() + + + def test_atomic_group_scheduling_partial_group(self): + # Make one host in labels[3] unavailable so that there are only two + # hosts left in the group. + self._do_query('UPDATE hosts SET status="Repair Failed" WHERE id=5') + job = self._create_job(synchronous=True, metahosts=[self.label4.id], + atomic_group=1) + self._dispatcher._schedule_new_jobs() + # Verify that it was scheduled on the 2 ready hosts in that group. + self._assert_job_scheduled_on(job.id, 6) + self._assert_job_scheduled_on(job.id, 7) + self._check_for_extra_schedulings() + + + def test_atomic_group_scheduling_not_enough_available(self): + # Mark some hosts in each atomic group label as not usable. + # One host running, another invalid in the first group label. + self._do_query('UPDATE hosts SET status="Running" WHERE id=5') + self._do_query('UPDATE hosts SET invalid=1 WHERE id=6') + # One host invalid in the second group label. 
+ self._do_query('UPDATE hosts SET invalid=1 WHERE id=9') + # Nothing to schedule when no group label has enough (2) good hosts. + self._create_job(atomic_group=1, synchronous=True) + self._dispatcher._schedule_new_jobs() + # There are not enough hosts in either atomic group, + # no more scheduling should occur. + self._check_for_extra_schedulings() + + # Now create an atomic job that has a synch count of 1. It should + # schedule on exactly one of the hosts. + onehost_job = self._create_job(atomic_group=1) + self._dispatcher._schedule_new_jobs() + self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1) + + + def test_atomic_group_scheduling_no_valid_hosts(self): + self._do_query('UPDATE hosts SET invalid=1 WHERE id in (8,9)') + self._create_job(synchronous=True, metahosts=[self.label5.id], + atomic_group=1) + self._dispatcher._schedule_new_jobs() + # no hosts in the selected group and label are valid. no schedulings. + self._check_for_extra_schedulings() + + + def test_atomic_group_scheduling_metahost_works(self): + # Test that atomic group scheduling also obeys metahosts. + self._create_job(metahosts=[0], atomic_group=1) + self._dispatcher._schedule_new_jobs() + # There are no atomic group hosts that also have that metahost. + self._check_for_extra_schedulings() + + job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1) + self._dispatcher._schedule_new_jobs() + self._assert_job_scheduled_on(job_b.id, 8) + self._assert_job_scheduled_on(job_b.id, 9) + self._check_for_extra_schedulings() + + + def test_atomic_group_skips_ineligible_hosts(self): + # Test hosts marked ineligible for this job are not eligible. + # How would this ever happen anyways? 
+ job = self._create_job(metahosts=[self.label4.id], atomic_group=1) + models.IneligibleHostQueue.objects.create(job=job, host_id=5) + models.IneligibleHostQueue.objects.create(job=job, host_id=6) + models.IneligibleHostQueue.objects.create(job=job, host_id=7) + self._dispatcher._schedule_new_jobs() + # No scheduling should occur as all desired hosts were ineligible. + self._check_for_extra_schedulings() + + + def test_atomic_group_scheduling_fail(self): + # If synch_count is > the atomic group number of machines, the job + # should be aborted immediately. + model_job = self._create_job(synchronous=True, atomic_group=1) + model_job.synch_count = 4 + model_job.save() + job = monitor_db.Job(id=model_job.id) + self._dispatcher._schedule_new_jobs() + self._check_for_extra_schedulings() + queue_entries = job.get_host_queue_entries() + self.assertEqual(1, len(queue_entries)) + self.assertEqual(queue_entries[0].status, + models.HostQueueEntry.Status.ABORTED) + + + def test_schedule_directly_on_atomic_group_host_fail(self): + # Scheduling a job directly on hosts in an atomic group must + # fail to avoid users inadvertently holding up the use of an + # entire atomic group by using the machines individually. + job = self._create_job(hosts=[5]) + self._dispatcher._schedule_new_jobs() + self._check_for_extra_schedulings() + + + def test_schedule_directly_on_atomic_group_host(self): + # Scheduling a job directly on one host in an atomic group will + # work when the atomic group is listed on the HQE in addition + # to the host (assuming the sync count is 1). 
+ job = self._create_job(hosts=[5], atomic_group=1) + self._dispatcher._schedule_new_jobs() + self._assert_job_scheduled_on(job.id, 5) + self._check_for_extra_schedulings() + + + def test_schedule_directly_on_atomic_group_hosts_sync2(self): + job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True) + self._dispatcher._schedule_new_jobs() + self._assert_job_scheduled_on(job.id, 5) + self._assert_job_scheduled_on(job.id, 8) + self._check_for_extra_schedulings() + + + def test_schedule_directly_on_atomic_group_hosts_wrong_group(self): + job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True) + self._dispatcher._schedule_new_jobs() + self._check_for_extra_schedulings() + + def test_only_schedule_queued_entries(self): self._create_job(metahosts=[1]) self._update_hqe(set='active=1, host_id=2') |