summaryrefslogtreecommitdiff
path: root/scheduler
diff options
context:
space:
mode:
authorshoward <showard@592f7852-d20e-0410-864c-8624ca9c26a4>2009-03-12 20:39:13 +0000
committershoward <showard@592f7852-d20e-0410-864c-8624ca9c26a4>2009-03-12 20:39:13 +0000
commitedcfc3fc638b38c3a7b6214a7eaa752441cdb02a (patch)
treed2d36ebd674a3d88d27e855e58452249b79e8eec /scheduler
parentca0fa5bf2aff8b045b92a408937e52cfd709888c (diff)
Add the concept of an Atomic Group to the scheduler and database.
Scheduling a job on an atomic group means that all of the Ready machines (up to a maximum specified in the atomic group) in a single label associated with that atomic group will be used to run the job. The job synch_count becomes a minimum when scheduling on an atomic group. Both HostQueueEntrys and Labels may have an AtomicGroup associated with them: * A HostQueueEntry with an AtomicGroup acts to schedule a job on all Ready machines of a single Label associated with that AtomicGroup. * A Label with an AtomicGroup means that any Hosts bearing that Label may only be scheduled together as a group with other hosts of that Label to satisify a Job's HostQueueEntry bearing the same AtomicGroup. Such Hosts will never be scheduled as normal metahosts. Future patches are coming that will add the ability to schedule jobs using this feature to the RPC interface, CLI and GUI. Signed-off-by: Gregory Smith <gps@google.com> git-svn-id: svn://test.kernel.org/autotest/trunk@2878 592f7852-d20e-0410-864c-8624ca9c26a4
Diffstat (limited to 'scheduler')
-rw-r--r--scheduler/monitor_db.py269
-rw-r--r--scheduler/monitor_db_unittest.py393
2 files changed, 615 insertions, 47 deletions
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 02356d66..388dfe54 100644
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -5,7 +5,7 @@ Autotest scheduler
"""
-import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
+import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref
import common
@@ -193,6 +193,10 @@ def queue_entries_to_abort():
return qe
+class SchedulerError(Exception):
+ """Raised by HostScheduler when an inconsistent state occurs."""
+
+
class HostScheduler(object):
def _get_ready_hosts(self):
# avoid any host with a currently active queue entry against it
@@ -224,7 +228,7 @@ class HostScheduler(object):
def _process_many2many_dict(rows, flip=False):
result = {}
for row in rows:
- left_id, right_id = long(row[0]), long(row[1])
+ left_id, right_id = int(row[0]), int(row[1])
if flip:
left_id, right_id = right_id, left_id
result.setdefault(left_id, set()).add(right_id)
@@ -317,7 +321,7 @@ class HostScheduler(object):
def _check_job_dependencies(self, job_dependencies, host_labels):
missing = job_dependencies - host_labels
- return len(job_dependencies - host_labels) == 0
+ return len(missing) == 0
def _check_only_if_needed_labels(self, job_dependencies, host_labels,
@@ -339,15 +343,79 @@ class HostScheduler(object):
return True
+ def _check_atomic_group_labels(self, host_labels, queue_entry):
+ """
+ Determine if the given HostQueueEntry's atomic group settings are okay
+ to schedule on a host with the given labels.
+
+ @param host_labels - A list of label ids that the host has.
+ @param queue_entry - The HostQueueEntry being considered for the host.
+
+ @returns True if atomic group settings are okay, False otherwise.
+ """
+ return (self._get_host_atomic_group_id(host_labels) ==
+ queue_entry.atomic_group_id)
+
+
+ def _get_host_atomic_group_id(self, host_labels):
+ """
+ Return the atomic group label id for a host with the given set of
+ labels if any, or None otherwise. Raises an exception if more than
+ one atomic group are found in the set of labels.
+
+ @param host_labels - A list of label ids that the host has.
+
+ @returns The id of the atomic group found on a label in host_labels
+ or None if no atomic group label is found.
+ @raises SchedulerError - If more than one atomic group label is found.
+ """
+ atomic_ids = [self._labels[label_id].atomic_group_id
+ for label_id in host_labels
+ if self._labels[label_id].atomic_group_id is not None]
+ if not atomic_ids:
+ return None
+ if len(atomic_ids) > 1:
+ raise SchedulerError('More than one atomic label on host.')
+ return atomic_ids[0]
+
+
+ def _get_atomic_group_labels(self, atomic_group_id):
+ """
+ Lookup the label ids that an atomic_group is associated with.
+
+ @param atomic_group_id - The id of the AtomicGroup to look up.
+
+ @returns A generator yeilding Label ids for this atomic group.
+ """
+ return (id for id, label in self._labels.iteritems()
+ if label.atomic_group_id == atomic_group_id
+ and not label.invalid)
+
+
+ def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
+ """
+ @param group_hosts - A sequence of Host ids to test for usability
+ and eligibility against the Job associated with queue_entry.
+ @param queue_entry - The HostQueueEntry that these hosts are being
+ tested for eligibility against.
+
+ @returns A subset of group_hosts Host ids that are eligible for the
+ supplied queue_entry.
+ """
+ return set(host_id for host_id in group_hosts
+ if self._is_host_usable(host_id)
+ and self._is_host_eligible_for_job(host_id, queue_entry))
+
+
def _is_host_eligible_for_job(self, host_id, queue_entry):
job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
host_labels = self._host_labels.get(host_id, set())
- acl = self._is_acl_accessible(host_id, queue_entry)
- deps = self._check_job_dependencies(job_dependencies, host_labels)
- only_if = self._check_only_if_needed_labels(job_dependencies,
- host_labels, queue_entry)
- return acl and deps and only_if
+ return (self._is_acl_accessible(host_id, queue_entry) and
+ self._check_job_dependencies(job_dependencies, host_labels) and
+ self._check_only_if_needed_labels(
+ job_dependencies, host_labels, queue_entry) and
+ self._check_atomic_group_labels(host_labels, queue_entry))
def _schedule_non_metahost(self, queue_entry):
@@ -383,6 +451,8 @@ class HostScheduler(object):
if not self._is_host_eligible_for_job(host_id, queue_entry):
continue
+ # Remove the host from our cached internal state before returning
+ # the host object.
hosts_in_label.remove(host_id)
return self._hosts_available.pop(host_id)
return None
@@ -390,10 +460,88 @@ class HostScheduler(object):
def find_eligible_host(self, queue_entry):
if not queue_entry.meta_host:
+ assert queue_entry.host_id is not None
return self._schedule_non_metahost(queue_entry)
+ assert queue_entry.atomic_group_id is None
return self._schedule_metahost(queue_entry)
+ def find_eligible_atomic_group(self, queue_entry):
+ """
+ Given an atomic group host queue entry, locate an appropriate group
+ of hosts for the associated job to run on.
+
+ The caller is responsible for creating new HQEs for the additional
+ hosts returned in order to run the actual job on them.
+
+ @returns A list of Host instances in a ready state to satisfy this
+ atomic group scheduling. Hosts will all belong to the same
+ atomic group label as specified by the queue_entry.
+ An empty list will be returned if no suitable atomic
+ group could be found.
+
+ TODO(gps): what is responsible for kicking off any attempted repairs on
+ a group of hosts? not this function, but something needs to. We do
+ not communicate that reason for returning [] outside of here...
+ For now, we'll just be unschedulable if enough hosts within one group
+ enter Repair Failed state.
+ """
+ assert queue_entry.atomic_group_id is not None
+ job = queue_entry.job
+ assert job.synch_count and job.synch_count > 0
+ atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
+ if job.synch_count > atomic_group.max_number_of_machines:
+ # Such a Job and HostQueueEntry should never be possible to
+ # create using the frontend. Regardless, we can't process it.
+ # Abort it immediately and log an error on the scheduler.
+ queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
+ bprint('Error: job %d synch_count=%d > requested atomic_group %d '
+ 'max_number_of_machines=%d. Aborted host_queue_entry %d.' %
+ (job.id, job.synch_count, atomic_group.id,
+ atomic_group.max_number_of_machines, queue_entry.id))
+ return []
+ hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
+ ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
+ set())
+
+ # Look in each label associated with atomic_group until we find one with
+ # enough hosts to satisfy the job.
+ for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
+ group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
+ if queue_entry.meta_host is not None:
+ # If we have a metahost label, only allow its hosts.
+ group_hosts.intersection_update(hosts_in_label)
+ group_hosts -= ineligible_host_ids
+ eligible_hosts_in_group = self._get_eligible_hosts_in_group(
+ group_hosts, queue_entry)
+
+ # Job.synch_count is treated as "minimum synch count" when
+ # scheduling for an atomic group of hosts. The atomic group
+ # number of machines is the maximum to pick out of a single
+ # atomic group label for scheduling at one time.
+ min_hosts = job.synch_count
+ max_hosts = atomic_group.max_number_of_machines
+
+ if len(eligible_hosts_in_group) < min_hosts:
+ # Not enough eligible hosts in this atomic group label.
+ continue
+
+ # Limit ourselves to scheduling the atomic group size.
+ if len(eligible_hosts_in_group) > max_hosts:
+ eligible_hosts_in_group = random.sample(
+ eligible_hosts_in_group, max_hosts)
+
+ # Remove the selected hosts from our cached internal state
+ # of available hosts in order to return the Host objects.
+ host_list = []
+ for host_id in eligible_hosts_in_group:
+ hosts_in_label.discard(host_id)
+ host_list.append(self._hosts_available.pop(host_id))
+ return host_list
+
+ return []
+
+
class Dispatcher(object):
def __init__(self):
self._agents = []
@@ -677,18 +825,64 @@ class Dispatcher(object):
order_by='jobs.priority DESC, meta_host, job_id'))
- def _schedule_new_jobs(self):
+ def _refresh_pending_queue_entries(self):
+ """
+ Lookup the pending HostQueueEntries and call our HostScheduler
+ refresh() method given that list. Return the list.
+
+ @returns A list of pending HostQueueEntries sorted in priority order.
+ """
queue_entries = self._get_pending_queue_entries()
if not queue_entries:
- return
+ return []
self._host_scheduler.refresh(queue_entries)
+ return queue_entries
+
+
+ def _schedule_atomic_group(self, queue_entry):
+ """
+ Schedule the given queue_entry on an atomic group of hosts.
+
+ Returns immediately if there are insufficient available hosts.
+
+ Creates new HostQueueEntries based off of queue_entry for the
+ scheduled hosts and starts them all running.
+ """
+ # This is a virtual host queue entry representing an entire
+ # atomic group, find a group and schedule their hosts.
+ group_hosts = self._host_scheduler.find_eligible_atomic_group(
+ queue_entry)
+ if not group_hosts:
+ return
+ # The first assigned host uses the original HostQueueEntry
+ group_queue_entries = [queue_entry]
+ for assigned_host in group_hosts[1:]:
+ # Create a new HQE for every additional assigned_host.
+ new_hqe = HostQueueEntry.clone(queue_entry)
+ new_hqe.save()
+ group_queue_entries.append(new_hqe)
+ assert len(group_queue_entries) == len(group_hosts)
+ for queue_entry, host in itertools.izip(group_queue_entries,
+ group_hosts):
+ self._run_queue_entry(queue_entry, host)
+
+
+ def _schedule_new_jobs(self):
+ queue_entries = self._refresh_pending_queue_entries()
+ if not queue_entries:
+ return
+
for queue_entry in queue_entries:
- assigned_host = self._host_scheduler.find_eligible_host(queue_entry)
- if not assigned_host:
- continue
- self._run_queue_entry(queue_entry, assigned_host)
+ if (queue_entry.atomic_group_id is None or
+ queue_entry.host_id is not None):
+ assigned_host = self._host_scheduler.find_eligible_host(
+ queue_entry)
+ if assigned_host:
+ self._run_queue_entry(queue_entry, assigned_host)
+ else:
+ self._schedule_atomic_group(queue_entry)
def _run_queue_entry(self, queue_entry, host):
@@ -1707,10 +1901,12 @@ class DBObject(object):
keys = self._fields[1:] # avoid id
columns = ','.join([str(key) for key in keys])
values = ['"%s"' % self.__dict__[key] for key in keys]
- values = ','.join(values)
- query = """INSERT INTO %s (%s) VALUES (%s)""" % \
- (self.__table, columns, values)
+ values_str = ','.join(values)
+ query = ('INSERT INTO %s (%s) VALUES (%s)' %
+ (self.__table, columns, values_str))
_db.execute(query)
+ # Update our id to the one the database just assigned to us.
+ self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
def delete(self):
@@ -1730,6 +1926,11 @@ class DBObject(object):
@classmethod
def fetch(cls, where='', params=(), joins='', order_by=''):
+ """
+ Construct instances of our class based on the given database query.
+
+ @yields One class instance for each row fetched.
+ """
order_by = cls._prefix_with(order_by, 'ORDER BY ')
where = cls._prefix_with(where, 'WHERE ')
query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
@@ -1747,10 +1948,15 @@ class IneligibleHostQueue(DBObject):
_fields = ('id', 'job_id', 'host_id')
+class AtomicGroup(DBObject):
+ _table_name = 'atomic_groups'
+ _fields = ('id', 'name', 'description', 'max_number_of_machines')
+
+
class Label(DBObject):
_table_name = 'labels'
_fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
- 'only_if_needed')
+ 'only_if_needed', 'atomic_group_id')
class Host(DBObject):
@@ -1814,7 +2020,8 @@ class Host(DBObject):
class HostQueueEntry(DBObject):
_table_name = 'host_queue_entries'
_fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
- 'active', 'complete', 'deleted', 'execution_subdir')
+ 'active', 'complete', 'deleted', 'execution_subdir',
+ 'atomic_group_id')
def __init__(self, id=None, row=None, **kwargs):
@@ -1831,6 +2038,21 @@ class HostQueueEntry(DBObject):
'queue.log.' + str(self.id))
+ @classmethod
+ def clone(cls, template):
+ """
+ Creates a new row using the values from a template instance.
+
+ The new instance will not exist in the database or have a valid
+ id attribute until its save() method is called.
+ """
+ assert isinstance(template, cls)
+ new_row = [getattr(template, field) for field in cls._fields]
+ clone = cls(row=new_row, new_record=True)
+ clone.id = None
+ return clone
+
+
def _view_job_url(self):
return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
@@ -1959,14 +2181,15 @@ class HostQueueEntry(DBObject):
email_manager.manager.send_email(self.job.email_list, subject, body)
- def run(self,assigned_host=None):
- if self.meta_host:
+ def run(self, assigned_host=None):
+ if self.meta_host is not None or self.atomic_group_id is not None:
assert assigned_host
# ensure results dir exists for the queue log
self.set_host(assigned_host)
- print "%s/%s scheduled on %s, status=%s" % (self.job.name,
- self.meta_host, self.host.hostname, self.status)
+ print "%s/%s/%s scheduled on %s, status=%s" % (
+ self.job.name, self.meta_host, self.atomic_group_id,
+ self.host.hostname, self.status)
return self.job.run(queue_entry=self)
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index 503ccc9c..e37cf788 100644
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -96,21 +96,48 @@ class BaseSchedulerTest(unittest.TestCase):
def _fill_in_test_data(self):
+ """Populate the test database with some hosts and labels."""
user = models.User.objects.create(login='my_user')
acl_group = models.AclGroup.objects.create(name='my_acl')
acl_group.users.add(user)
hosts = [models.Host.objects.create(hostname=hostname) for hostname in
- ('host1', 'host2', 'host3', 'host4')]
+ ('host1', 'host2', 'host3', 'host4', 'host5', 'host6',
+ 'host7', 'host8', 'host9')]
+
acl_group.hosts = hosts
models.AclGroup.smart_get('Everyone').hosts = []
labels = [models.Label.objects.create(name=name) for name in
- ('label1', 'label2', 'label3')]
- labels[2].only_if_needed = True
- labels[2].save()
- hosts[0].labels.add(labels[0])
- hosts[1].labels.add(labels[1])
+ ('label1', 'label2', 'label3', 'label4', 'label5', 'label6',
+ 'label7')]
+
+ atomic_group1 = models.AtomicGroup.objects.create(
+ name='atomic1', max_number_of_machines=2)
+ atomic_group2 = models.AtomicGroup.objects.create(
+ name='atomic2', max_number_of_machines=2)
+
+ self.label3 = labels[2]
+ self.label3.only_if_needed = True
+ self.label3.save()
+ self.label4 = labels[3]
+ self.label4.atomic_group = atomic_group1
+ self.label4.save()
+ self.label5 = labels[4]
+ self.label5.atomic_group = atomic_group1
+ self.label5.save()
+ hosts[0].labels.add(labels[0]) # label1
+ hosts[1].labels.add(labels[1]) # label2
+ self.label6 = labels[5]
+ self.label7 = labels[6]
+ for hostnum in xrange(4,7): # host5..host7
+ hosts[hostnum].labels.add(self.label4) # an atomic group lavel
+ hosts[hostnum].labels.add(self.label6) # a normal label
+ hosts[6].labels.add(self.label7)
+ for hostnum in xrange(7,9): # host8..host9
+ hosts[hostnum].labels.add(self.label5) # an atomic group lavel
+ hosts[hostnum].labels.add(self.label6) # a normal label
+ hosts[hostnum].labels.add(self.label7)
def _setup_dummy_user(self):
@@ -133,7 +160,24 @@ class BaseSchedulerTest(unittest.TestCase):
def _create_job(self, hosts=[], metahosts=[], priority=0, active=False,
- synchronous=False):
+ synchronous=False, atomic_group=None):
+ """
+ Create a job row in the test database.
+
+ @param hosts - A list of explicit host ids for this job to be
+ scheduled on.
+ @param metahosts - A list of label ids for each host that this job
+ should be scheduled on (meta host scheduling).
+ @param priority - The job priority (integer).
+ @param active - bool, mark this job as running or not in the database?
+ @param synchronous - bool, if True use synch_count=2 otherwise use
+ synch_count=1.
+ @param atomic_group - An atomic group id for this job to schedule on
+ or None if atomic scheduling is not required. Each metahost
+ becomes a request to schedule an entire atomic group.
+ This does not support creating an active atomic group job.
+ """
+ assert not (atomic_group and active) # TODO(gps): support this
synch_count = synchronous and 2 or 1
created_on = datetime.datetime(2008, 1, 1)
status = models.HostQueueEntry.Status.QUEUED
@@ -145,17 +189,25 @@ class BaseSchedulerTest(unittest.TestCase):
reboot_before=models.RebootBefore.NEVER)
for host_id in hosts:
models.HostQueueEntry.objects.create(job=job, host_id=host_id,
- status=status)
+ status=status,
+ atomic_group_id=atomic_group)
models.IneligibleHostQueue.objects.create(job=job, host_id=host_id)
for label_id in metahosts:
models.HostQueueEntry.objects.create(job=job, meta_host_id=label_id,
- status=status)
+ status=status,
+ atomic_group_id=atomic_group)
+ if atomic_group and not (metahosts or hosts):
+ # Create a single HQE to request the atomic group of hosts even if
+ # no metahosts or hosts are supplied.
+ models.HostQueueEntry.objects.create(job=job,
+ status=status,
+ atomic_group_id=atomic_group)
return job
def _create_job_simple(self, hosts, use_metahost=False,
priority=0, active=False):
- 'An alternative interface to _create_job'
+ """An alternative interface to _create_job"""
args = {'hosts' : [], 'metahosts' : []}
if use_metahost:
args['metahosts'] = hosts
@@ -218,17 +270,27 @@ class DBObjectTest(BaseSchedulerTest):
class DispatcherSchedulingTest(BaseSchedulerTest):
_jobs_scheduled = []
+
+ def tearDown(self):
+ super(DispatcherSchedulingTest, self).tearDown()
+
+
def _set_monitor_stubs(self):
super(DispatcherSchedulingTest, self)._set_monitor_stubs()
- def run_stub(hqe_self, assigned_host=None):
- hqe_self.set_status('Starting')
- if hqe_self.meta_host:
- host = assigned_host
- else:
- host = hqe_self.host
- self._record_job_scheduled(hqe_self.job.id, host.id)
+
+ def job_run_stub(job_self, queue_entry):
+ """Return a dummy for testing. Called by HostQueueEntry.run()."""
+ self._record_job_scheduled(job_self.id, queue_entry.host.id)
+ queue_entry.set_status('Starting')
return DummyAgent()
- monitor_db.HostQueueEntry.run = run_stub
+
+ self.god.stub_with(monitor_db.Job, 'run', job_run_stub)
+
+ def hqe_queue_log_record_stub(self, log_line):
+ """No-Op to avoid calls down to the _drone_manager during tests."""
+
+ self.god.stub_with(monitor_db.HostQueueEntry, 'queue_log_record',
+ hqe_queue_log_record_stub)
def _record_job_scheduled(self, job_id, host_id):
@@ -248,6 +310,22 @@ class DispatcherSchedulingTest(BaseSchedulerTest):
self._jobs_scheduled.remove(record)
+ def _assert_job_scheduled_on_number_of(self, job_id, host_ids, number):
+ """Assert job was scheduled on exactly number hosts out of a set."""
+ found = []
+ for host_id in host_ids:
+ record = (job_id, host_id)
+ if record in self._jobs_scheduled:
+ found.append(record)
+ self._jobs_scheduled.remove(record)
+ if len(found) < number:
+ self.fail('Job %d scheduled on fewer than %d hosts in %s.\n'
+ 'Jobs scheduled: %s' % (job_id, number, host_ids, found))
+ elif len(found) > number:
+ self.fail('Job %d scheduled on more than %d hosts in %s.\n'
+ 'Jobs scheduled: %s' % (job_id, number, host_ids, found))
+
+
def _check_for_extra_schedulings(self):
if len(self._jobs_scheduled) != 0:
self.fail('Extra jobs scheduled: ' +
@@ -383,22 +461,30 @@ class DispatcherSchedulingTest(BaseSchedulerTest):
self._test_obey_ACLs_helper(True)
- def test_only_if_needed_labels(self):
+ def _setup_test_only_if_needed_labels(self):
# apply only_if_needed label3 to host1
- label3 = models.Label.smart_get('label3')
- models.Host.smart_get('host1').labels.add(label3)
+ models.Host.smart_get('host1').labels.add(self.label3)
+ return self._create_job_simple([1], use_metahost=True)
+
- job = self._create_job_simple([1], use_metahost=True)
+ def test_only_if_needed_labels_avoids_host(self):
+ job = self._setup_test_only_if_needed_labels()
# if the job doesn't depend on label3, there should be no scheduling
self._dispatcher._schedule_new_jobs()
self._check_for_extra_schedulings()
- # now make the job depend on label3
- job.dependency_labels.add(label3)
+
+ def test_only_if_needed_labels_schedules(self):
+ job = self._setup_test_only_if_needed_labels()
+ job.dependency_labels.add(self.label3)
self._dispatcher._schedule_new_jobs()
self._assert_job_scheduled_on(1, 1)
self._check_for_extra_schedulings()
+
+ def test_only_if_needed_labels_via_metahost(self):
+ job = self._setup_test_only_if_needed_labels()
+ job.dependency_labels.add(self.label3)
# should also work if the metahost is the only_if_needed label
self._do_query('DELETE FROM jobs_dependency_labels')
self._create_job(metahosts=[3])
@@ -432,6 +518,265 @@ class DispatcherSchedulingTest(BaseSchedulerTest):
self._check_for_extra_schedulings()
+ # TODO(gps): These should probably live in their own TestCase class
+ # specific to testing HostScheduler methods directly. It was convenient
+ # to put it here for now to share existing test environment setup code.
+ def test_HostScheduler_check_atomic_group_labels(self):
+ normal_job = self._create_job(metahosts=[0])
+ atomic_job = self._create_job(atomic_group=1)
+ # Indirectly initialize the internal state of the host scheduler.
+ self._dispatcher._refresh_pending_queue_entries()
+
+ atomic_hqe = monitor_db.HostQueueEntry(id=atomic_job.id)
+ normal_hqe = monitor_db.HostQueueEntry(id=normal_job.id)
+
+ host_scheduler = self._dispatcher._host_scheduler
+ self.assertTrue(host_scheduler._check_atomic_group_labels(
+ [self.label4.id], atomic_hqe))
+ self.assertFalse(host_scheduler._check_atomic_group_labels(
+ [self.label4.id], normal_hqe))
+ self.assertFalse(host_scheduler._check_atomic_group_labels(
+ [self.label5.id, self.label6.id, self.label7.id], normal_hqe))
+ self.assertTrue(host_scheduler._check_atomic_group_labels(
+ [self.label4.id, self.label6.id], atomic_hqe))
+ self.assertRaises(monitor_db.SchedulerError,
+ host_scheduler._check_atomic_group_labels,
+ [self.label4.id, self.label5.id],
+ atomic_hqe)
+
+
+ def test_HostScheduler_get_host_atomic_group_id(self):
+ self._create_job(metahosts=[self.label6.id])
+ # Indirectly initialize the internal state of the host scheduler.
+ self._dispatcher._refresh_pending_queue_entries()
+
+ # Test the host scheduler
+ host_scheduler = self._dispatcher._host_scheduler
+ self.assertRaises(monitor_db.SchedulerError,
+ host_scheduler._get_host_atomic_group_id,
+ [self.label4.id, self.label5.id])
+ self.assertEqual(None, host_scheduler._get_host_atomic_group_id([]))
+ self.assertEqual(None, host_scheduler._get_host_atomic_group_id(
+ [self.label3.id, self.label7.id, self.label6.id]))
+ self.assertEqual(1, host_scheduler._get_host_atomic_group_id(
+ [self.label4.id, self.label7.id, self.label6.id]))
+ self.assertEqual(1, host_scheduler._get_host_atomic_group_id(
+ [self.label7.id, self.label5.id]))
+
+
+ def test_atomic_group_hosts_blocked_from_non_atomic_jobs(self):
+ # Create a job scheduled to run on label6.
+ self._create_job(metahosts=[self.label6.id])
+ self._dispatcher._schedule_new_jobs()
+ # label6 only has hosts that are in atomic groups associated with it,
+ # there should be no scheduling.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_hosts_blocked_from_non_atomic_jobs_explicit(self):
+ # Create a job scheduled to run on label5. This is an atomic group
+ # label but this job does not request atomic group scheduling.
+ self._create_job(metahosts=[self.label5.id])
+ self._dispatcher._schedule_new_jobs()
+ # label6 only has hosts that are in atomic groups associated with it,
+ # there should be no scheduling.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_basics(self):
+ # Create jobs scheduled to run on an atomic group.
+ job_a = self._create_job(synchronous=True, metahosts=[self.label4.id],
+ atomic_group=1)
+ job_b = self._create_job(synchronous=True, metahosts=[self.label5.id],
+ atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ # atomic_group.max_number_of_machines was 2 so we should run on 2.
+ self._assert_job_scheduled_on_number_of(job_a.id, (5, 6, 7), 2)
+ self._assert_job_scheduled_on(job_b.id, 8) # label5
+ self._assert_job_scheduled_on(job_b.id, 9) # label5
+ self._check_for_extra_schedulings()
+
+ # The three host label4 atomic group still has one host available.
+ # That means a job with a synch_count of 1 asking to be scheduled on
+ # the atomic group can still use the final machine.
+ #
+ # This may seem like a somewhat odd use case. It allows the use of an
+ # atomic group as a set of machines to run smaller jobs within (a set
+ # of hosts configured for use in network tests with eachother perhaps?)
+ onehost_job = self._create_job(atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on_number_of(onehost_job.id, (5, 6, 7), 1)
+ self._check_for_extra_schedulings()
+
+ # No more atomic groups have hosts available, no more jobs should
+ # be scheduled.
+ self._create_job(atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_obeys_acls(self):
+ # Request scheduling on a specific atomic label but be denied by ACLs.
+ self._do_query('DELETE FROM acl_groups_hosts WHERE host_id in (8,9)')
+ job = self._create_job(metahosts=[self.label5.id], atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_dependency_label_exclude(self):
+ # A dependency label that matches no hosts in the atomic group.
+ job_a = self._create_job(atomic_group=1)
+ job_a.dependency_labels.add(self.label3)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_metahost_dependency_label_exclude(self):
+ # A metahost and dependency label that excludes too many hosts.
+ job_b = self._create_job(synchronous=True, metahosts=[self.label4.id],
+ atomic_group=1)
+ job_b.dependency_labels.add(self.label7)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_dependency_label_match(self):
+ # A dependency label that exists on enough atomic group hosts in only
+ # one of the two atomic group labels.
+ job_c = self._create_job(synchronous=True, atomic_group=1)
+ job_c.dependency_labels.add(self.label7)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on_number_of(job_c.id, (8, 9), 2)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_no_metahost(self):
+ # Force it to schedule on the other group for a reliable test.
+ self._do_query('UPDATE hosts SET invalid=1 WHERE id=9')
+ # An atomic job without a metahost.
+ job = self._create_job(synchronous=True, atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on_number_of(job.id, (5, 6, 7), 2)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_partial_group(self):
+ # Make one host in labels[3] unavailable so that there are only two
+ # hosts left in the group.
+ self._do_query('UPDATE hosts SET status="Repair Failed" WHERE id=5')
+ job = self._create_job(synchronous=True, metahosts=[self.label4.id],
+ atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ # Verify that it was scheduled on the 2 ready hosts in that group.
+ self._assert_job_scheduled_on(job.id, 6)
+ self._assert_job_scheduled_on(job.id, 7)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_not_enough_available(self):
+ # Mark some hosts in each atomic group label as not usable.
+ # One host running, another invalid in the first group label.
+ self._do_query('UPDATE hosts SET status="Running" WHERE id=5')
+ self._do_query('UPDATE hosts SET invalid=1 WHERE id=6')
+ # One host invalid in the second group label.
+ self._do_query('UPDATE hosts SET invalid=1 WHERE id=9')
+ # Nothing to schedule when no group label has enough (2) good hosts..
+ self._create_job(atomic_group=1, synchronous=True)
+ self._dispatcher._schedule_new_jobs()
+ # There are not enough hosts in either atomic group,
+ # No more scheduling should occur.
+ self._check_for_extra_schedulings()
+
+ # Now create an atomic job that has a synch count of 1. It should
+ # schedule on exactly one of the hosts.
+ onehost_job = self._create_job(atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on_number_of(onehost_job.id, (7, 8), 1)
+
+
+ def test_atomic_group_scheduling_no_valid_hosts(self):
+ self._do_query('UPDATE hosts SET invalid=1 WHERE id in (8,9)')
+ self._create_job(synchronous=True, metahosts=[self.label5.id],
+ atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ # no hosts in the selected group and label are valid. no schedulings.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_metahost_works(self):
+ # Test that atomic group scheduling also obeys metahosts.
+ self._create_job(metahosts=[0], atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ # There are no atomic group hosts that also have that metahost.
+ self._check_for_extra_schedulings()
+
+ job_b = self._create_job(metahosts=[self.label5.id], atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on(job_b.id, 8)
+ self._assert_job_scheduled_on(job_b.id, 9)
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_skips_ineligible_hosts(self):
+ # Test hosts marked ineligible for this job are not eligible.
+ # How would this ever happen anyways?
+ job = self._create_job(metahosts=[self.label4.id], atomic_group=1)
+ models.IneligibleHostQueue.objects.create(job=job, host_id=5)
+ models.IneligibleHostQueue.objects.create(job=job, host_id=6)
+ models.IneligibleHostQueue.objects.create(job=job, host_id=7)
+ self._dispatcher._schedule_new_jobs()
+ # No scheduling should occur as all desired hosts were ineligible.
+ self._check_for_extra_schedulings()
+
+
+ def test_atomic_group_scheduling_fail(self):
+ # If synch_count is > the atomic group number of machines, the job
+ # should be aborted immediately.
+ model_job = self._create_job(synchronous=True, atomic_group=1)
+ model_job.synch_count = 4
+ model_job.save()
+ job = monitor_db.Job(id=model_job.id)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+ queue_entries = job.get_host_queue_entries()
+ self.assertEqual(1, len(queue_entries))
+ self.assertEqual(queue_entries[0].status,
+ models.HostQueueEntry.Status.ABORTED)
+
+
+ def test_schedule_directly_on_atomic_group_host_fail(self):
+ # Scheduling a job directly on hosts in an atomic group must
+ # fail to avoid users inadvertently holding up the use of an
+ # entire atomic group by using the machines individually.
+ job = self._create_job(hosts=[5])
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
+ def test_schedule_directly_on_atomic_group_host(self):
+ # Scheduling a job directly on one host in an atomic group will
+ # work when the atomic group is listed on the HQE in addition
+ # to the host (assuming the sync count is 1).
+ job = self._create_job(hosts=[5], atomic_group=1)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on(job.id, 5)
+ self._check_for_extra_schedulings()
+
+
+ def test_schedule_directly_on_atomic_group_hosts_sync2(self):
+ job = self._create_job(hosts=[5,8], atomic_group=1, synchronous=True)
+ self._dispatcher._schedule_new_jobs()
+ self._assert_job_scheduled_on(job.id, 5)
+ self._assert_job_scheduled_on(job.id, 8)
+ self._check_for_extra_schedulings()
+
+
+ def test_schedule_directly_on_atomic_group_hosts_wrong_group(self):
+ job = self._create_job(hosts=[5,8], atomic_group=2, synchronous=True)
+ self._dispatcher._schedule_new_jobs()
+ self._check_for_extra_schedulings()
+
+
def test_only_schedule_queued_entries(self):
self._create_job(metahosts=[1])
self._update_hqe(set='active=1, host_id=2')