summaryrefslogtreecommitdiff
path: root/scheduler/monitor_db_unittest.py
diff options
context:
space:
mode:
Diffstat (limited to 'scheduler/monitor_db_unittest.py')
-rw-r--r--scheduler/monitor_db_unittest.py393
1 files changed, 369 insertions, 24 deletions
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 503ccc9c..e37cf788 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -96,21 +96,48 @@ class BaseSchedulerTest(unittest.TestCase):
def _fill_in_test_data(self):
+ """Populate the test database with some hosts and labels."""
user = models.User.objects.create(login='my_user')
acl_group = models.AclGroup.objects.create(name='my_acl')
acl_group.users.add(user)
hosts = [models.Host.objects.create(hostname=hostname) for hostname in
- ('host1', 'host2', 'host3', 'host4')]
+ ('host1', 'host2', 'host3', 'host4', 'host5', 'host6',
+ 'host7', 'host8', 'host9')]
+
acl_group.hosts = hosts
models.AclGroup.smart_get('Everyone').hosts = []
labels = [models.Label.objects.create(name=name) for name in
- ('label1', 'label2', 'label3')]
- labels[2].only_if_needed = True
- labels[2].save()
- hosts[0].labels.add(labels[0])
- hosts[1].labels.add(labels[1])
+ ('label1', 'label2', 'label3', 'label4', 'label5', 'label6',
+ 'label7')]
+
+ atomic_group1 = models.AtomicGroup.objects.create(
+ name='atomic1', max_number_of_machines=2)
+ atomic_group2 = models.AtomicGroup.objects.create(
+ name='atomic2', max_number_of_machines=2)
+
+ self.label3 = labels[2]
+ self.label3.only_if_needed = True
+ self.label3.save()
+ self.label4 = labels[3]
+ self.label4.atomic_group = atomic_group1
+ self.label4.save()
+ self.label5 = labels[4]
+ self.label5.atomic_group = atomic_group1
+ self.label5.save()
+ hosts[0].labels.add(labels[0]) # label1
+ hosts[1].labels.add(labels[1]) # label2
+ self.label6 = labels[5]
+ self.label7 = labels[6]
+ for hostnum in xrange(4,7): # host5..host7
+ hosts[hostnum].labels.add(self.label4) # an atomic group lavel
+ hosts[hostnum].labels.add(self.label6) # a normal label
+ hosts[6].labels.add(self.label7)
+ for hostnum in xrange(7,9): # host8..host9
+ hosts[hostnum].labels.add(self.label5) # an atomic group lavel
+ hosts[hostnum].labels.add(self.label6) # a normal label
+ hosts[hostnum].labels.add(self.label7)
def _setup_dummy_user(self):
@@ -133,7 +160,24 @@ class BaseSchedulerTest(unittest.TestCase):
def _create_job(self, hosts=[], metahosts=[], priority=0, active=False,
- synchronous=False):
+ synchronous=False, atomic_group=None):
+ """
+ Create a job row in the test database.
+
+ @param hosts - A list of explicit host ids for this job to be
+ scheduled on.
+ @param metahosts - A list of label ids for each host that this job
+ should be scheduled on (meta host scheduling).
+ @param priority - The job priority (integer).
+ @param active - bool, mark this job as running or not in the database?
+ @param synchronous - bool, if True use synch_count=2 otherwise use
+ synch_count=1.
+ @param atomic_group - An atomic group id for this job to schedule on
+ or None if atomic scheduling is not required. Each metahost
+ becomes a request to schedule an entire atomic group.
+ This does not support creating an active atomic group job.
+ """
+ assert not (atomic_group and active) # TODO(gps): support this
synch_count = synchronous and 2 or 1
created_on = datetime.datetime(2008, 1, 1)
status = models.HostQueueEntry.Status.QUEUED
@@ -145,17 +189,25 @@ class BaseSchedulerTest(unittest.TestCase):
reboot_before=models.RebootBefore.NEVER)
for host_id in hosts:
models.HostQueueEntry.objects.create(job=job, host_id=host_id,
- status=status)
+ status=status,
+ atomic_group_id=atomic_group)
models.IneligibleHostQueue.objects.create(job=job, host_id=host_id)
for label_id in metahosts:
models.HostQueueEntry.objects.create(job=job, meta_host_id=label_id,
- status=status)
+ status=status,
+ atomic_group_id=atomic_group)
+ if atomic_group and not (metahosts or hosts):
+ # Create a single HQE to request the atomic group of hosts even if
+ # no metahosts or hosts are supplied.
+ models.HostQueueEntry.objects.create(job=job,
+ status=status,
+ atomic_group_id=atomic_group)
return job
def _create_job_simple(self, hosts, use_metahost=False,
priority=0, active=False):
- 'An alternative interface to _create_job'
+ """An alternative interface to _create_job"""
args = {'hosts' : [], 'metahosts' : []}
if use_metahost:
args['metahosts'] = hosts
@@ -218,17 +270,27 @@ class DBObjectTest(BaseSchedulerTest):
class DispatcherSchedulingTest(BaseSchedulerTest):
_jobs_scheduled = []
+
+ def tearDown(self):
+ super(DispatcherSchedulingTest, self).tearDown()
+
+
def _set_monitor_stubs(self):
super(DispatcherSchedulingTest, self)._set_monitor_stubs()
- def run_stub(hqe_self, assigned_host=None):
- hqe_self.set_status('Starting')
- if hqe_self.meta_host:
- host = assigned_host
- else:
- host = hqe_self.host
- self._record_job_scheduled(hqe_self.job.id, host.id)
+
+ def job_run_stub(job_self, queue_entry):
+ """Return a dummy for testing. Called by HostQueueEntry.run()."""
+ self._record_job_scheduled(job_self.id, queue_entry.host.id)
+ queue_entry.set_status('Starting')
return DummyAgent()
- monitor_db.HostQueueEntry.run = run_stub
+
+ self.god.stub_with(monitor_db.Job, 'run', job_run_stub)
+
+ def hqe_queue_log_record_stub(self, log_line):
+ """No-Op to avoid calls down to the _drone_manager during tests."""
+
+ self.god.stub_with(monitor_db.HostQueueEntry, 'queue_log_record',
+ hqe_queue_log_record_stub)
def _record_job_scheduled(self, job_id, host_id):
@@ -248,6 +310,22 @@ class DispatcherSchedulingTest(BaseSchedulerTest):
self._jobs_scheduled.remove(record)
+ def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
+ """Assert job was scheduled on exactly number hosts out of a set."""
+ found = []
+ for host_id in host_ids:
+ record = (job_id, host_id)
+ if record in self._jobs_scheduled:
+ found.append(record)
+ self._jobs_scheduled.remove(record)
+ if len(found) < number:
+ self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
+ 'Jobs scheduled: %s' % (job_id, number, host_ids, found))
+ elif len(found) > number:
+ self.fail('Job %d scheduled on more than %d hosts in %s.\n'
+ 'Jobs scheduled: %s' % (job_id, number, host_ids, found))
+
+
def _check_for_extra_schedulings(self):
if len(self._jobs_scheduled) != 0:
self.fail('Extra jobs scheduled: ' +
@@ -383,22 +461,30 @@ class DispatcherSchedulingTest(BaseSchedulerTest):
self._test_obey_ACLs_helper(True)
- def test_only_if_needed_labels(self):
+ def _setup_test_only_if_needed_labels(self):
# apply only_if_needed label3 to host1
- label3 = models.Label.smart_get('label3')
- models.Host.smart_get('host1').labels.add(label3)
+ models.Host.smart_get('host1').labels.add(self.label3)
+ return self._create_job_simple([1], use_metahost=True)
+
- job = self._create_job_simple([1], use_metahost=True)
+ def test_only_if_needed_labels_avoids_host(self):
+ job = self._setup_test_only_if_needed_labels()
# if the job doesn't depend on label3, there should be no scheduling
self._dispatcher._schedule_new_jobs()
self._check_for_extra_schedulings()
- # now make the job depend on label3
- job.dependency_labels.add(label3)
+
+ def test_only_if_needed_labels_schedules(self):
+ job = self._setup_test_only_if_needed_labels()
+ job.dependency_labels.add(self.label3)
self._dispatcher._schedule_new_jobs()
self._assert_job_scheduled_on(1, 1)
self._check_for_extra_schedulings()
+
+ def test_only_if_needed_labels_via_metahost(self):
+ job = self._setup_test_only_if_needed_labels()
+ job.dependency_labels.add(self.label3)
# should also work if the metahost is the only_if_needed label
self._do_query('DELETE FROM jobs_dependency_labels')
self._create_job(metahosts=[3])
@@ -432,6 +518,265 @@ class DispatcherSchedulingTest(BaseSchedulerTest):
self._check_for_extra_schedulings()
+ # TODO(gps): These should probably live in their own TestCase class
+ # specific to testing HostScheduler methods directly. It was convenient
+ # to put it here for now to share existing test environment setup code.
+ def test_HostScheduler_check_atomic_group_labels(self):
+ normal_job = self._create_job(metahosts=[0])
+ atomic_job = self._create_job(atomic_group=1)
+ # Indirectly initialize the internal state of the host scheduler.
+ self._dispatcher._refresh_pending_queue_entries()
+
+ atomic_hqe = monitor_db.HostQueueEntry(id=atomic_job.id)
+ normal_hqe = monitor_db.HostQueueEntry(id=normal_job.id)
+
+ host_scheduler = self._dispatcher._host_scheduler
+ self.assertTrue(host_scheduler._check_atomic_group_labels(
+ [self.label4.id], atomic_hqe))
+ self.assertFalse(host_scheduler._check_atomic_group_labels(
+ [self.label4.id], normal_hqe))
+ self.assertFalse(host_scheduler._check_atomic_group_labels(
+ [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
+ self.assertTrue(host_scheduler._check_atomic_group_labels(
+ [self.label4.id, self.label6.id], atomic_hqe))
+ self.assertRaises(monitor_db.SchedulerError,
+ host_scheduler._check_atomic_group_labels,
+ [self.label4.id, self.label5.id],
+ atomic_hqe)
+
+
+ def test_HostScheduler_get_host_atomic_group_id(self):
+ self._create_job(metahosts=[self.label6.id])
+ # Indirectly initialize the internal state of the host scheduler.
+ self._dispatcher._refresh_pending_queue_entries()
+
+ # Test the host scheduler
+ host_scheduler = self._dispatcher._host_scheduler
+ self.assertRaises(monitor_db.SchedulerError,
+ host_scheduler._get_host_atomic_group_id,
+ [self.label4.id, self.label5.id])
+ self.assertEqual(None, host_scheduler._get_host_atomic_group_id([]))
+ self.assertEqual(None, host_scheduler._get_host_atomic_group_id(
+ [self.label3.id, self.label7.id, self.label6.id]))
+ self.assertEqual(1, host_scheduler._get_host_atomic_group_id(
+ [self.label4.id, self.label7.id, self.label6.id]))
+ self.assertEqual(1, host_scheduler._get_host_atomic_group_id(
+ [self.label7.id, self.label5.id]))
+
+
+ def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
+ # Create a job scheduled to run on label6.
+ self._create_job(metahosts=[self.label6.id])
+ self._dispatcher._schedule_new_jobs()
+ # label6 only has hosts that are in atomic groups associated with it,
+ # there should be no scheduling.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
+ # Create a job scheduled to run on label5. This is an atomic group
+ # label but this job does not request atomic group scheduling.
+ self._create_job(metahosts=[self.label5.id])
+ self._dispatcher._schedule_new_jobs()
+ # label6 only has hosts that are in atomic groups associated with it,
+ # there should be no scheduling.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_basics(self):
+ # Create jobs scheduled to run on an atomic group.
+ job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
+ atomic_group=1)
+ job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
+ atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ # atomic_group.max_number_of_machines was 2 so we should run on 2.
+ self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
+ self._assert_job_scheduled_on(job_b.id, 8) # label5
+ self._assert_job_scheduled_on(job_b.id, 9) # label5
+ self._check_for_extra_schedulings()
+
+ # The three host label4 atomic group still has one host available.
+ # That means a job with a synch_count of 1 asking to be scheduled on
+ # the atomic group can still use the final machine.
+ #
+ # This may seem like a somewhat odd use case. It allows the use of an
+ # atomic group as a set of machines to run smaller jobs within (a set
+ # of hosts configured for use in network tests with eachother perhaps?)
+ onehost_job = self._create_job(atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
+ self._check_for_extra_schedulings()
+
+ # No more atomic groups have hosts available, no more jobs should
+ # be scheduled.
+ self._create_job(atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_obeys_acls(self):
+ # Request scheduling on a specific atomic label but be denied by ACLs.
+ self._do_query('DELETE FROM acl_groups_hosts WHERE host_id in (8,9)')
+ job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_dependency_label_exclude(self):
+ # A dependency label that matches no hosts in the atomic group.
+ job_a = self._create_job(atomic_group=1)
+ job_a.dependency_labels.add(self.label3)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
+ # A metahost and dependency label that excludes too many hosts.
+ job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
+ atomic_group=1)
+ job_b.dependency_labels.add(self.label7)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_dependency_label_match(self):
+ # A dependency label that exists on enough atomic group hosts in only
+ # one of the two atomic group labels.
+ job_c = self._create_job(synchronous=True, atomic_group=1)
+ job_c.dependency_labels.add(self.label7)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_no_metahost(self):
+ # Force it to schedule on the other group for a reliable test.
+ self._do_query('UPDATE hosts SET invalid=1 WHERE id=9')
+ # An atomic job without a metahost.
+ job = self._create_job(synchronous=True, atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_partial_group(self):
+ # Make one host in labels[3] unavailable so that there are only two
+ # hosts left in the group.
+ self._do_query('UPDATE hosts SET status="Repair Failed" WHERE id=5')
+ job = self._create_job(synchronous=True, metahosts=[self.label4.id],
+ atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ # Verify that it was scheduled on the 2 ready hosts in that group.
+ self._assert_job_scheduled_on(job.id, 6)
+ self._assert_job_scheduled_on(job.id, 7)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_not_enough_available(self):
+ # Mark some hosts in each atomic group label as not usable.
+ # One host running, another invalid in the first group label.
+ self._do_query('UPDATE hosts SET status="Running" WHERE id=5')
+ self._do_query('UPDATE hosts SET invalid=1 WHERE id=6')
+ # One host invalid in the second group label.
+ self._do_query('UPDATE hosts SET invalid=1 WHERE id=9')
+ # Nothing to schedule when no group label has enough (2) good hosts..
+ self._create_job(atomic_group=1, synchronous=True)
+ self._dispatcher._schedule_new_jobs()
+ # There are not enough hosts in either atomic group,
+ # No more scheduling should occur.
+ self._check_for_extra_schedulings()
+
+ # Now create an atomic job that has a synch count of 1. It should
+ # schedule on exactly one of the hosts.
+ onehost_job = self._create_job(atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)
+
+
+ def test_atomic_group_scheduling_no_valid_hosts(self):
+ self._do_query('UPDATE hosts SET invalid=1 WHERE id in (8,9)')
+ self._create_job(synchronous=True, metahosts=[self.label5.id],
+ atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ # no hosts in the selected group and label are valid. no schedulings.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_metahost_works(self):
+ # Test that atomic group scheduling also obeys metahosts.
+ self._create_job(metahosts=[0], atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ # There are no atomic group hosts that also have that metahost.
+ self._check_for_extra_schedulings()
+
+ job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on(job_b.id, 8)
+ self._assert_job_scheduled_on(job_b.id, 9)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_skips_ineligible_hosts(self):
+ # Test hosts marked ineligible for this job are not eligible.
+ # How would this ever happen anyways?
+ job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
+ models.IneligibleHostQueue.objects.create(job=job, host_id=5)
+ models.IneligibleHostQueue.objects.create(job=job, host_id=6)
+ models.IneligibleHostQueue.objects.create(job=job, host_id=7)
+ self._dispatcher._schedule_new_jobs()
+ # No scheduling should occur as all desired hosts were ineligible.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_fail(self):
+ # If synch_count is > the atomic group number of machines, the job
+ # should be aborted immediately.
+ model_job = self._create_job(synchronous=True, atomic_group=1)
+ model_job.synch_count = 4
+ model_job.save()
+ job = monitor_db.Job(id=model_job.id)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+ queue_entries = job.get_host_queue_entries()
+ self.assertEqual(1, len(queue_entries))
+ self.assertEqual(queue_entries[0].status,
+ models.HostQueueEntry.Status.ABORTED)
+
+
+ def test_schedule_directly_on_atomic_group_host_fail(self):
+ # Scheduling a job directly on hosts in an atomic group must
+ # fail to avoid users inadvertently holding up the use of an
+ # entire atomic group by using the machines individually.
+ job = self._create_job(hosts=[5])
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
+ def test_schedule_directly_on_atomic_group_host(self):
+ # Scheduling a job directly on one host in an atomic group will
+ # work when the atomic group is listed on the HQE in addition
+ # to the host (assuming the sync count is 1).
+ job = self._create_job(hosts=[5], atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on(job.id, 5)
+ self._check_for_extra_schedulings()
+
+
+ def test_schedule_directly_on_atomic_group_hosts_sync2(self):
+ job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on(job.id, 5)
+ self._assert_job_scheduled_on(job.id, 8)
+ self._check_for_extra_schedulings()
+
+
+ def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
+ job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
def test_only_schedule_queued_entries(self):
self._create_job(metahosts=[1])
self._update_hqe(set='active=1, host_id=2')