diff options
author | ericli <ericli@592f7852-d20e-0410-864c-8624ca9c26a4> | 2011-03-02 19:37:27 +0000 |
---|---|---|
committer | ericli <ericli@592f7852-d20e-0410-864c-8624ca9c26a4> | 2011-03-02 19:37:27 +0000 |
commit | 3c2b821a3d5271f5d1dfc9e9605a3211cfccef19 (patch) | |
tree | 2ac0e85c3a0108c636512c583bbf4d32bcf6bc69 /scheduler | |
parent | 95395da47bd9f83ae7a1acc901bb0bbd9c59acd0 (diff) |
Host scheduler refactoring. Move HostScheduler out of monitor_db.
In order to facilitate site extensibility of HostScheduler we need to factor out the dependence on global variables in monitor_db. I modeled this refactoring off of monitor_db_cleanup.
The main changes I've made are as follows:
1. Move BaseHostScheduler, site import, and SchedulerError out of monitor_db. SchedulerError must be moved to prevent a cyclical dependency.
2. Convert staticmethod/classmethods in BaseHostScheduler to normal methods.
3. Fix unit tests and monitor_db to import SchedulerError from host_scheduler.
Reviewable at: http://codereview.chromium.org/6597047/
Signed-off-by: Dale Curtis <dalecurtis@google.com>
git-svn-id: svn://test.kernel.org/autotest/trunk@5275 592f7852-d20e-0410-864c-8624ca9c26a4
Diffstat (limited to 'scheduler')
-rw-r--r-- | scheduler/host_scheduler.py | 424 | ||||
-rwxr-xr-x | scheduler/monitor_db.py | 455 | ||||
-rwxr-xr-x | scheduler/monitor_db_functional_test.py | 6 | ||||
-rwxr-xr-x | scheduler/monitor_db_unittest.py | 4 |
4 files changed, 447 insertions, 442 deletions
diff --git a/scheduler/host_scheduler.py b/scheduler/host_scheduler.py new file mode 100644 index 00000000..8b56ee4b --- /dev/null +++ b/scheduler/host_scheduler.py @@ -0,0 +1,424 @@ +""" +Autotest scheduling utility. +""" + + +import logging + +from autotest_lib.client.common_lib import global_config, utils +from autotest_lib.frontend.afe import models +from autotest_lib.scheduler import metahost_scheduler, scheduler_config +from autotest_lib.scheduler import scheduler_models + + +get_site_metahost_schedulers = utils.import_site_function( + __file__, 'autotest_lib.scheduler.site_metahost_scheduler', + 'get_metahost_schedulers', lambda : ()) + + +class SchedulerError(Exception): + """Raised by HostScheduler when an inconsistent state occurs.""" + + +class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility): + """Handles the logic for choosing when to run jobs and on which hosts. + + This class makes several queries to the database on each tick, building up + some auxiliary data structures and using them to determine which hosts are + eligible to run which jobs, taking into account all the various factors that + affect that. + + In the past this was done with one or two very large, complex database + queries. It has proven much simpler and faster to build these auxiliary + data structures and perform the logic in Python. + """ + def __init__(self, db): + self._db = db + self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers() + + # load site-specific scheduler selected in global_config + site_schedulers_str = global_config.global_config.get_config_value( + scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers', + default='') + site_schedulers = set(site_schedulers_str.split(',')) + for scheduler in get_site_metahost_schedulers(): + if type(scheduler).__name__ in site_schedulers: + # always prepend, so site schedulers take precedence + self._metahost_schedulers = ( + [scheduler] + self._metahost_schedulers) + logging.info('Metahost schedulers: %s', + ', '.join(type(scheduler).__name__ for scheduler + in self._metahost_schedulers)) + + + def _get_ready_hosts(self): + # avoid any host with a currently active queue entry against it + hosts = scheduler_models.Host.fetch( + joins='LEFT JOIN afe_host_queue_entries AS active_hqe ' + 'ON (afe_hosts.id = active_hqe.host_id AND ' + 'active_hqe.active)', + where="active_hqe.host_id IS NULL " + "AND NOT afe_hosts.locked " + "AND (afe_hosts.status IS NULL " + "OR afe_hosts.status = 'Ready')") + return dict((host.id, host) for host in hosts) + + + def _get_sql_id_list(self, id_list): + return ','.join(str(item_id) for item_id in id_list) + + + def _get_many2many_dict(self, query, id_list, flip=False): + if not id_list: + return {} + query %= self._get_sql_id_list(id_list) + rows = self._db.execute(query) + return self._process_many2many_dict(rows, flip) + + + def _process_many2many_dict(self, rows, flip=False): + result = {} + for row in rows: + left_id, right_id = int(row[0]), int(row[1]) + if flip: + left_id, right_id = right_id, left_id + result.setdefault(left_id, set()).add(right_id) + return result + + + def _get_job_acl_groups(self, job_ids): + query = """ + SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id + FROM afe_jobs + INNER JOIN afe_users ON afe_users.login = afe_jobs.owner + INNER JOIN afe_acl_groups_users ON + afe_acl_groups_users.user_id = afe_users.id + WHERE afe_jobs.id IN (%s) + """ + return self._get_many2many_dict(query, job_ids) + + + def _get_job_ineligible_hosts(self, job_ids): + query = """ + SELECT job_id, host_id + FROM afe_ineligible_host_queues + WHERE job_id IN (%s) + """ + return self._get_many2many_dict(query, job_ids) + + + def _get_job_dependencies(self, job_ids): + query = """ + SELECT job_id, label_id + FROM afe_jobs_dependency_labels + WHERE job_id IN (%s) + """ + return self._get_many2many_dict(query, job_ids) + + + def _get_host_acls(self, host_ids): + query = """ + SELECT host_id, aclgroup_id + FROM afe_acl_groups_hosts + WHERE host_id IN (%s) + """ + return self._get_many2many_dict(query, host_ids) + + + def _get_label_hosts(self, host_ids): + if not host_ids: + return {}, {} + query = """ + SELECT label_id, host_id + FROM afe_hosts_labels + WHERE host_id IN (%s) + """ % self._get_sql_id_list(host_ids) + rows = self._db.execute(query) + labels_to_hosts = self._process_many2many_dict(rows) + hosts_to_labels = self._process_many2many_dict(rows, flip=True) + return labels_to_hosts, hosts_to_labels + + + def _get_labels(self): + return dict((label.id, label) for label + in scheduler_models.Label.fetch()) + + + def recovery_on_startup(self): + for metahost_scheduler in self._metahost_schedulers: + metahost_scheduler.recovery_on_startup() + + + def refresh(self, pending_queue_entries): + self._hosts_available = self._get_ready_hosts() + + relevant_jobs = [queue_entry.job_id + for queue_entry in pending_queue_entries] + self._job_acls = self._get_job_acl_groups(relevant_jobs) + self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs) + self._job_dependencies = self._get_job_dependencies(relevant_jobs) + + host_ids = self._hosts_available.keys() + self._host_acls = self._get_host_acls(host_ids) + self._label_hosts, self._host_labels = self._get_label_hosts(host_ids) + + self._labels = self._get_labels() + + + def tick(self): + for metahost_scheduler in self._metahost_schedulers: + metahost_scheduler.tick() + + + def hosts_in_label(self, label_id): + return set(self._label_hosts.get(label_id, ())) + + + def remove_host_from_label(self, host_id, label_id): + self._label_hosts[label_id].remove(host_id) + + + def pop_host(self, host_id): + return self._hosts_available.pop(host_id) + + + def ineligible_hosts_for_entry(self, queue_entry): + return set(self._ineligible_hosts.get(queue_entry.job_id, ())) + + + def _is_acl_accessible(self, host_id, queue_entry): + job_acls = self._job_acls.get(queue_entry.job_id, set()) + host_acls = self._host_acls.get(host_id, set()) + return len(host_acls.intersection(job_acls)) > 0 + + + def _check_job_dependencies(self, job_dependencies, host_labels): + missing = job_dependencies - host_labels + return len(missing) == 0 + + + def _check_only_if_needed_labels(self, job_dependencies, host_labels, + queue_entry): + if not queue_entry.meta_host: + # bypass only_if_needed labels when a specific host is selected + return True + + for label_id in host_labels: + label = self._labels[label_id] + if not label.only_if_needed: + # we don't care about non-only_if_needed labels + continue + if queue_entry.meta_host == label_id: + # if the label was requested in a metahost it's OK + continue + if label_id not in job_dependencies: + return False + return True + + + def _check_atomic_group_labels(self, host_labels, queue_entry): + """ + Determine if the given HostQueueEntry's atomic group settings are okay + to schedule on a host with the given labels. + + @param host_labels: A list of label ids that the host has. + @param queue_entry: The HostQueueEntry being considered for the host. + + @returns True if atomic group settings are okay, False otherwise. + """ + return (self._get_host_atomic_group_id(host_labels, queue_entry) == + queue_entry.atomic_group_id) + + + def _get_host_atomic_group_id(self, host_labels, queue_entry=None): + """ + Return the atomic group label id for a host with the given set of + labels if any, or None otherwise. Raises an exception if more than + one atomic group are found in the set of labels. + + @param host_labels: A list of label ids that the host has. + @param queue_entry: The HostQueueEntry we're testing. Only used for + extra info in a potential logged error message. + + @returns The id of the atomic group found on a label in host_labels + or None if no atomic group label is found. + """ + atomic_labels = [self._labels[label_id] for label_id in host_labels + if self._labels[label_id].atomic_group_id is not None] + atomic_ids = set(label.atomic_group_id for label in atomic_labels) + if not atomic_ids: + return None + if len(atomic_ids) > 1: + logging.error('More than one Atomic Group on HQE "%s" via: %r', + queue_entry, atomic_labels) + return atomic_ids.pop() + + + def _get_atomic_group_labels(self, atomic_group_id): + """ + Lookup the label ids that an atomic_group is associated with. + + @param atomic_group_id - The id of the AtomicGroup to look up. + + @returns A generator yeilding Label ids for this atomic group. + """ + return (id for id, label in self._labels.iteritems() + if label.atomic_group_id == atomic_group_id + and not label.invalid) + + + def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry): + """ + @param group_hosts - A sequence of Host ids to test for usability + and eligibility against the Job associated with queue_entry. + @param queue_entry - The HostQueueEntry that these hosts are being + tested for eligibility against. + + @returns A subset of group_hosts Host ids that are eligible for the + supplied queue_entry. + """ + return set(host_id for host_id in group_hosts + if self.is_host_usable(host_id) + and self.is_host_eligible_for_job(host_id, queue_entry)) + + + def is_host_eligible_for_job(self, host_id, queue_entry): + if self._is_host_invalid(host_id): + # if an invalid host is scheduled for a job, it's a one-time host + # and it therefore bypasses eligibility checks. note this can only + # happen for non-metahosts, because invalid hosts have their label + # relationships cleared. + return True + + job_dependencies = self._job_dependencies.get(queue_entry.job_id, set()) + host_labels = self._host_labels.get(host_id, set()) + + return (self._is_acl_accessible(host_id, queue_entry) and + self._check_job_dependencies(job_dependencies, host_labels) and + self._check_only_if_needed_labels( + job_dependencies, host_labels, queue_entry) and + self._check_atomic_group_labels(host_labels, queue_entry)) + + + def _is_host_invalid(self, host_id): + host_object = self._hosts_available.get(host_id, None) + return host_object and host_object.invalid + + + def _schedule_non_metahost(self, queue_entry): + if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry): + return None + return self._hosts_available.pop(queue_entry.host_id, None) + + + def is_host_usable(self, host_id): + if host_id not in self._hosts_available: + # host was already used during this scheduling cycle + return False + if self._hosts_available[host_id].invalid: + # Invalid hosts cannot be used for metahosts. They're included in + # the original query because they can be used by non-metahosts. + return False + return True + + + def schedule_entry(self, queue_entry): + if queue_entry.host_id is not None: + return self._schedule_non_metahost(queue_entry) + + for scheduler in self._metahost_schedulers: + if scheduler.can_schedule_metahost(queue_entry): + scheduler.schedule_metahost(queue_entry, self) + return None + + raise SchedulerError('No metahost scheduler to handle %s' % queue_entry) + + + def find_eligible_atomic_group(self, queue_entry): + """ + Given an atomic group host queue entry, locate an appropriate group + of hosts for the associated job to run on. + + The caller is responsible for creating new HQEs for the additional + hosts returned in order to run the actual job on them. + + @returns A list of Host instances in a ready state to satisfy this + atomic group scheduling. Hosts will all belong to the same + atomic group label as specified by the queue_entry. + An empty list will be returned if no suitable atomic + group could be found. + + TODO(gps): what is responsible for kicking off any attempted repairs on + a group of hosts? not this function, but something needs to. We do + not communicate that reason for returning [] outside of here... + For now, we'll just be unschedulable if enough hosts within one group + enter Repair Failed state. + """ + assert queue_entry.atomic_group_id is not None + job = queue_entry.job + assert job.synch_count and job.synch_count > 0 + atomic_group = queue_entry.atomic_group + if job.synch_count > atomic_group.max_number_of_machines: + # Such a Job and HostQueueEntry should never be possible to + # create using the frontend. Regardless, we can't process it. + # Abort it immediately and log an error on the scheduler. + queue_entry.set_status(models.HostQueueEntry.Status.ABORTED) + logging.error( + 'Error: job %d synch_count=%d > requested atomic_group %d ' + 'max_number_of_machines=%d. Aborted host_queue_entry %d.', + job.id, job.synch_count, atomic_group.id, + atomic_group.max_number_of_machines, queue_entry.id) + return [] + hosts_in_label = self.hosts_in_label(queue_entry.meta_host) + ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry) + + # Look in each label associated with atomic_group until we find one with + # enough hosts to satisfy the job. + for atomic_label_id in self._get_atomic_group_labels(atomic_group.id): + group_hosts = set(self.hosts_in_label(atomic_label_id)) + if queue_entry.meta_host is not None: + # If we have a metahost label, only allow its hosts. + group_hosts.intersection_update(hosts_in_label) + group_hosts -= ineligible_host_ids + eligible_host_ids_in_group = self._get_eligible_host_ids_in_group( + group_hosts, queue_entry) + + # Job.synch_count is treated as "minimum synch count" when + # scheduling for an atomic group of hosts. The atomic group + # number of machines is the maximum to pick out of a single + # atomic group label for scheduling at one time. + min_hosts = job.synch_count + max_hosts = atomic_group.max_number_of_machines + + if len(eligible_host_ids_in_group) < min_hosts: + # Not enough eligible hosts in this atomic group label. + continue + + eligible_hosts_in_group = [self._hosts_available[id] + for id in eligible_host_ids_in_group] + # So that they show up in a sane order when viewing the job. + eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort) + + # Limit ourselves to scheduling the atomic group size. + if len(eligible_hosts_in_group) > max_hosts: + eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts] + + # Remove the selected hosts from our cached internal state + # of available hosts in order to return the Host objects. + host_list = [] + for host in eligible_hosts_in_group: + hosts_in_label.discard(host.id) + self._hosts_available.pop(host.id) + host_list.append(host) + return host_list + + return [] + + +site_host_scheduler = utils.import_site_class( + __file__, 'autotest_lib.scheduler.site_host_scheduler', + 'site_host_scheduler', BaseHostScheduler) + + +class HostScheduler(site_host_scheduler): + pass diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py index f8ecf344..6be4af2d 100755 --- a/scheduler/monitor_db.py +++ b/scheduler/monitor_db.py @@ -23,9 +23,8 @@ from autotest_lib.database import database_connection from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection from autotest_lib.frontend.afe import model_attributes from autotest_lib.scheduler import drone_manager, drones, email_manager -from autotest_lib.scheduler import monitor_db_cleanup +from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup from autotest_lib.scheduler import status_server, scheduler_config -from autotest_lib.scheduler import gc_stats, metahost_scheduler from autotest_lib.scheduler import scheduler_models BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter' PID_FILE_PREFIX = 'monitor_db' @@ -75,15 +74,11 @@ def _site_init_monitor_db_dummy(): return {} -get_site_metahost_schedulers = utils.import_site_function( - __file__, 'autotest_lib.scheduler.site_metahost_scheduler', - 'get_metahost_schedulers', lambda : ()) - - def _verify_default_drone_set_exists(): if (models.DroneSet.drone_sets_enabled() and not models.DroneSet.default_drone_set_name()): - raise SchedulerError('Drone sets are enabled, but no default is set') + raise host_scheduler.SchedulerError( + 'Drone sets are enabled, but no default is set') def _sanity_check(): @@ -256,427 +251,11 @@ def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None, return autoserv_argv + extra_args -class SchedulerError(Exception): - """Raised by HostScheduler when an inconsistent state occurs.""" - - -class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility): - """Handles the logic for choosing when to run jobs and on which hosts. - - This class makes several queries to the database on each tick, building up - some auxiliary data structures and using them to determine which hosts are - eligible to run which jobs, taking into account all the various factors that - affect that. - - In the past this was done with one or two very large, complex database - queries. It has proven much simpler and faster to build these auxiliary - data structures and perform the logic in Python. - """ - def __init__(self): - self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers() - - # load site-specific scheduler selected in global_config - site_schedulers_str = global_config.global_config.get_config_value( - scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers', - default='') - site_schedulers = set(site_schedulers_str.split(',')) - for scheduler in get_site_metahost_schedulers(): - if type(scheduler).__name__ in site_schedulers: - # always prepend, so site schedulers take precedence - self._metahost_schedulers = ( - [scheduler] + self._metahost_schedulers) - logging.info('Metahost schedulers: %s', - ', '.join(type(scheduler).__name__ for scheduler - in self._metahost_schedulers)) - - - def _get_ready_hosts(self): - # avoid any host with a currently active queue entry against it - hosts = scheduler_models.Host.fetch( - joins='LEFT JOIN afe_host_queue_entries AS active_hqe ' - 'ON (afe_hosts.id = active_hqe.host_id AND ' - 'active_hqe.active)', - where="active_hqe.host_id IS NULL " - "AND NOT afe_hosts.locked " - "AND (afe_hosts.status IS NULL " - "OR afe_hosts.status = 'Ready')") - return dict((host.id, host) for host in hosts) - - - @staticmethod - def _get_sql_id_list(id_list): - return ','.join(str(item_id) for item_id in id_list) - - - @classmethod - def _get_many2many_dict(cls, query, id_list, flip=False): - if not id_list: - return {} - query %= cls._get_sql_id_list(id_list) - rows = _db.execute(query) - return cls._process_many2many_dict(rows, flip) - - - @staticmethod - def _process_many2many_dict(rows, flip=False): - result = {} - for row in rows: - left_id, right_id = int(row[0]), int(row[1]) - if flip: - left_id, right_id = right_id, left_id - result.setdefault(left_id, set()).add(right_id) - return result - - - @classmethod - def _get_job_acl_groups(cls, job_ids): - query = """ - SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id - FROM afe_jobs - INNER JOIN afe_users ON afe_users.login = afe_jobs.owner - INNER JOIN afe_acl_groups_users ON - afe_acl_groups_users.user_id = afe_users.id - WHERE afe_jobs.id IN (%s) - """ - return cls._get_many2many_dict(query, job_ids) - - - @classmethod - def _get_job_ineligible_hosts(cls, job_ids): - query = """ - SELECT job_id, host_id - FROM afe_ineligible_host_queues - WHERE job_id IN (%s) - """ - return cls._get_many2many_dict(query, job_ids) - - - @classmethod - def _get_job_dependencies(cls, job_ids): - query = """ - SELECT job_id, label_id - FROM afe_jobs_dependency_labels - WHERE job_id IN (%s) - """ - return cls._get_many2many_dict(query, job_ids) - - - @classmethod - def _get_host_acls(cls, host_ids): - query = """ - SELECT host_id, aclgroup_id - FROM afe_acl_groups_hosts - WHERE host_id IN (%s) - """ - return cls._get_many2many_dict(query, host_ids) - - - @classmethod - def _get_label_hosts(cls, host_ids): - if not host_ids: - return {}, {} - query = """ - SELECT label_id, host_id - FROM afe_hosts_labels - WHERE host_id IN (%s) - """ % cls._get_sql_id_list(host_ids) - rows = _db.execute(query) - labels_to_hosts = cls._process_many2many_dict(rows) - hosts_to_labels = cls._process_many2many_dict(rows, flip=True) - return labels_to_hosts, hosts_to_labels - - - @classmethod - def _get_labels(cls): - return dict((label.id, label) for label - in scheduler_models.Label.fetch()) - - - def recovery_on_startup(self): - for metahost_scheduler in self._metahost_schedulers: - metahost_scheduler.recovery_on_startup() - - - def refresh(self, pending_queue_entries): - self._hosts_available = self._get_ready_hosts() - - relevant_jobs = [queue_entry.job_id - for queue_entry in pending_queue_entries] - self._job_acls = self._get_job_acl_groups(relevant_jobs) - self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs) - self._job_dependencies = self._get_job_dependencies(relevant_jobs) - - host_ids = self._hosts_available.keys() - self._host_acls = self._get_host_acls(host_ids) - self._label_hosts, self._host_labels = self._get_label_hosts(host_ids) - - self._labels = self._get_labels() - - - def tick(self): - for metahost_scheduler in self._metahost_schedulers: - metahost_scheduler.tick() - - - def hosts_in_label(self, label_id): - return set(self._label_hosts.get(label_id, ())) - - - def remove_host_from_label(self, host_id, label_id): - self._label_hosts[label_id].remove(host_id) - - - def pop_host(self, host_id): - return self._hosts_available.pop(host_id) - - - def ineligible_hosts_for_entry(self, queue_entry): - return set(self._ineligible_hosts.get(queue_entry.job_id, ())) - - - def _is_acl_accessible(self, host_id, queue_entry): - job_acls = self._job_acls.get(queue_entry.job_id, set()) - host_acls = self._host_acls.get(host_id, set()) - return len(host_acls.intersection(job_acls)) > 0 - - - def _check_job_dependencies(self, job_dependencies, host_labels): - missing = job_dependencies - host_labels - return len(missing) == 0 - - - def _check_only_if_needed_labels(self, job_dependencies, host_labels, - queue_entry): - if not queue_entry.meta_host: - # bypass only_if_needed labels when a specific host is selected - return True - - for label_id in host_labels: - label = self._labels[label_id] - if not label.only_if_needed: - # we don't care about non-only_if_needed labels - continue - if queue_entry.meta_host == label_id: - # if the label was requested in a metahost it's OK - continue - if label_id not in job_dependencies: - return False - return True - - - def _check_atomic_group_labels(self, host_labels, queue_entry): - """ - Determine if the given HostQueueEntry's atomic group settings are okay - to schedule on a host with the given labels. - - @param host_labels: A list of label ids that the host has. - @param queue_entry: The HostQueueEntry being considered for the host. - - @returns True if atomic group settings are okay, False otherwise. - """ - return (self._get_host_atomic_group_id(host_labels, queue_entry) == - queue_entry.atomic_group_id) - - - def _get_host_atomic_group_id(self, host_labels, queue_entry=None): - """ - Return the atomic group label id for a host with the given set of - labels if any, or None otherwise. Raises an exception if more than - one atomic group are found in the set of labels. - - @param host_labels: A list of label ids that the host has. - @param queue_entry: The HostQueueEntry we're testing. Only used for - extra info in a potential logged error message. - - @returns The id of the atomic group found on a label in host_labels - or None if no atomic group label is found. - """ - atomic_labels = [self._labels[label_id] for label_id in host_labels - if self._labels[label_id].atomic_group_id is not None] - atomic_ids = set(label.atomic_group_id for label in atomic_labels) - if not atomic_ids: - return None - if len(atomic_ids) > 1: - logging.error('More than one Atomic Group on HQE "%s" via: %r', - queue_entry, atomic_labels) - return atomic_ids.pop() - - - def _get_atomic_group_labels(self, atomic_group_id): - """ - Lookup the label ids that an atomic_group is associated with. - - @param atomic_group_id - The id of the AtomicGroup to look up. - - @returns A generator yeilding Label ids for this atomic group. - """ - return (id for id, label in self._labels.iteritems() - if label.atomic_group_id == atomic_group_id - and not label.invalid) - - - def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry): - """ - @param group_hosts - A sequence of Host ids to test for usability - and eligibility against the Job associated with queue_entry. - @param queue_entry - The HostQueueEntry that these hosts are being - tested for eligibility against. - - @returns A subset of group_hosts Host ids that are eligible for the - supplied queue_entry. - """ - return set(host_id for host_id in group_hosts - if self.is_host_usable(host_id) - and self.is_host_eligible_for_job(host_id, queue_entry)) - - - def is_host_eligible_for_job(self, host_id, queue_entry): - if self._is_host_invalid(host_id): - # if an invalid host is scheduled for a job, it's a one-time host - # and it therefore bypasses eligibility checks. note this can only - # happen for non-metahosts, because invalid hosts have their label - # relationships cleared. - return True - - job_dependencies = self._job_dependencies.get(queue_entry.job_id, set()) - host_labels = self._host_labels.get(host_id, set()) - - return (self._is_acl_accessible(host_id, queue_entry) and - self._check_job_dependencies(job_dependencies, host_labels) and - self._check_only_if_needed_labels( - job_dependencies, host_labels, queue_entry) and - self._check_atomic_group_labels(host_labels, queue_entry)) - - - def _is_host_invalid(self, host_id): - host_object = self._hosts_available.get(host_id, None) - return host_object and host_object.invalid - - - def _schedule_non_metahost(self, queue_entry): - if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry): - return None - return self._hosts_available.pop(queue_entry.host_id, None) - - - def is_host_usable(self, host_id): - if host_id not in self._hosts_available: - # host was already used during this scheduling cycle - return False - if self._hosts_available[host_id].invalid: - # Invalid hosts cannot be used for metahosts. They're included in - # the original query because they can be used by non-metahosts. - return False - return True - - - def schedule_entry(self, queue_entry): - if queue_entry.host_id is not None: - return self._schedule_non_metahost(queue_entry) - - for scheduler in self._metahost_schedulers: - if scheduler.can_schedule_metahost(queue_entry): - scheduler.schedule_metahost(queue_entry, self) - return None - - raise SchedulerError('No metahost scheduler to handle %s' % queue_entry) - - - def find_eligible_atomic_group(self, queue_entry): - """ - Given an atomic group host queue entry, locate an appropriate group - of hosts for the associated job to run on. - - The caller is responsible for creating new HQEs for the additional - hosts returned in order to run the actual job on them. - - @returns A list of Host instances in a ready state to satisfy this - atomic group scheduling. Hosts will all belong to the same - atomic group label as specified by the queue_entry. - An empty list will be returned if no suitable atomic - group could be found. - - TODO(gps): what is responsible for kicking off any attempted repairs on - a group of hosts? not this function, but something needs to. We do - not communicate that reason for returning [] outside of here... - For now, we'll just be unschedulable if enough hosts within one group - enter Repair Failed state. - """ - assert queue_entry.atomic_group_id is not None - job = queue_entry.job - assert job.synch_count and job.synch_count > 0 - atomic_group = queue_entry.atomic_group - if job.synch_count > atomic_group.max_number_of_machines: - # Such a Job and HostQueueEntry should never be possible to - # create using the frontend. Regardless, we can't process it. - # Abort it immediately and log an error on the scheduler. - queue_entry.set_status(models.HostQueueEntry.Status.ABORTED) - logging.error( - 'Error: job %d synch_count=%d > requested atomic_group %d ' - 'max_number_of_machines=%d. Aborted host_queue_entry %d.', - job.id, job.synch_count, atomic_group.id, - atomic_group.max_number_of_machines, queue_entry.id) - return [] - hosts_in_label = self.hosts_in_label(queue_entry.meta_host) - ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry) - - # Look in each label associated with atomic_group until we find one with - # enough hosts to satisfy the job. - for atomic_label_id in self._get_atomic_group_labels(atomic_group.id): - group_hosts = set(self.hosts_in_label(atomic_label_id)) - if queue_entry.meta_host is not None: - # If we have a metahost label, only allow its hosts. - group_hosts.intersection_update(hosts_in_label) - group_hosts -= ineligible_host_ids - eligible_host_ids_in_group = self._get_eligible_host_ids_in_group( - group_hosts, queue_entry) - - # Job.synch_count is treated as "minimum synch count" when - # scheduling for an atomic group of hosts. The atomic group - # number of machines is the maximum to pick out of a single - # atomic group label for scheduling at one time. - min_hosts = job.synch_count - max_hosts = atomic_group.max_number_of_machines - - if len(eligible_host_ids_in_group) < min_hosts: - # Not enough eligible hosts in this atomic group label. - continue - - eligible_hosts_in_group = [self._hosts_available[id] - for id in eligible_host_ids_in_group] - # So that they show up in a sane order when viewing the job. - eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort) - - # Limit ourselves to scheduling the atomic group size. - if len(eligible_hosts_in_group) > max_hosts: - eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts] - - # Remove the selected hosts from our cached internal state - # of available hosts in order to return the Host objects. - host_list = [] - for host in eligible_hosts_in_group: - hosts_in_label.discard(host.id) - self._hosts_available.pop(host.id) - host_list.append(host) - return host_list - - return [] - - -site_host_scheduler = utils.import_site_class(__file__, - "autotest_lib.scheduler.site_host_scheduler", - "site_host_scheduler", BaseHostScheduler) - - -class HostScheduler(site_host_scheduler): - pass - - class Dispatcher(object): def __init__(self): self._agents = [] self._last_clean_time = time.time() - self._host_scheduler = HostScheduler() + self._host_scheduler = host_scheduler.HostScheduler(_db) user_cleanup_time = scheduler_config.config.clean_interval self._periodic_cleanup = monitor_db_cleanup.UserCleanup( _db, user_cleanup_time) @@ -865,9 +444,9 @@ class Dispatcher(object): if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING: return ArchiveResultsTask(queue_entries=task_entries) - raise SchedulerError('_get_agent_task_for_queue_entry got entry with ' - 'invalid status %s: %s' - % (queue_entry.status, queue_entry)) + raise host_scheduler.SchedulerError( + '_get_agent_task_for_queue_entry got entry with ' + 'invalid status %s: %s' % (queue_entry.status, queue_entry)) def _check_for_duplicate_host_entries(self, task_entries): @@ -886,7 +465,7 @@ class Dispatcher(object): """ if self.host_has_agent(entry.host): agent = tuple(self._host_agents.get(entry.host.id))[0] - raise SchedulerError( + raise host_scheduler.SchedulerError( 'While scheduling %s, host %s already has a host agent %s' % (entry, entry.host, agent.task)) @@ -905,7 +484,8 @@ class Dispatcher(object): if agent_task_class.TASK_TYPE == special_task.task: return agent_task_class(task=special_task) - raise SchedulerError('No AgentTask class for task', str(special_task)) + raise host_scheduler.SchedulerError( + 'No AgentTask class for task', str(special_task)) def _register_pidfiles(self, agent_tasks): @@ -970,7 +550,7 @@ class Dispatcher(object): if unrecovered_hqes: message = '\n'.join(str(hqe) for hqe in unrecovered_hqes) - raise SchedulerError( + raise host_scheduler.SchedulerError( '%d unrecovered verifying host queue entries:\n%s' % (len(unrecovered_hqes), message)) @@ -1779,16 +1359,17 @@ class AgentTask(object): class_name = self.__class__.__name__ for entry in queue_entries: if entry.status not in allowed_hqe_statuses: - raise SchedulerError('%s attempting to start ' - 'entry with invalid status %s: %s' - % (class_name, entry.status, entry)) + raise host_scheduler.SchedulerError( + '%s attempting to start entry with invalid status %s: ' + '%s' % (class_name, entry.status, entry)) invalid_host_status = ( allowed_host_statuses is not None and entry.host.status not in allowed_host_statuses) if invalid_host_status: - raise SchedulerError('%s attempting to start on queue ' - 'entry with invalid host status %s: %s' - % (class_name, entry.host.status, entry)) + raise host_scheduler.SchedulerError( + '%s attempting to start on queue entry with invalid ' + 'host status %s: %s' + % (class_name, entry.host.status, entry)) class TaskWithJobKeyvals(object): diff --git a/scheduler/monitor_db_functional_test.py b/scheduler/monitor_db_functional_test.py index e1d74074..54cda8ed 100755 --- a/scheduler/monitor_db_functional_test.py +++ b/scheduler/monitor_db_functional_test.py @@ -7,8 +7,8 @@ from autotest_lib.database import database_connection from autotest_lib.frontend import setup_django_environment from autotest_lib.frontend.afe import frontend_test_utils, models from autotest_lib.frontend.afe import model_attributes -from autotest_lib.scheduler import drone_manager, email_manager, monitor_db -from autotest_lib.scheduler import scheduler_models +from autotest_lib.scheduler import drone_manager, email_manager, host_scheduler +from autotest_lib.scheduler import monitor_db, scheduler_models # translations necessary for scheduler queries to work with SQLite _re_translator = database_connection.TranslatingDatabase.make_regexp_translator @@ -731,7 +731,7 @@ class SchedulerFunctionalTest(unittest.TestCase, is_complete=True, requested_by=models.User.current_user()) - self.assertRaises(monitor_db.SchedulerError, self._initialize_test) + self.assertRaises(host_scheduler.SchedulerError, self._initialize_test) def _test_recover_verifying_hqe_helper(self, task, pidfile_type): diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py index 38ea84dd..cc655736 100755 --- a/scheduler/monitor_db_unittest.py +++ b/scheduler/monitor_db_unittest.py @@ -9,7 +9,7 @@ from autotest_lib.client.common_lib.test_utils import unittest from autotest_lib.database import database_connection from autotest_lib.frontend.afe import models from autotest_lib.scheduler import monitor_db, drone_manager, email_manager -from autotest_lib.scheduler import scheduler_config, gc_stats +from autotest_lib.scheduler import scheduler_config, gc_stats, host_scheduler from autotest_lib.scheduler import monitor_db_functional_test from autotest_lib.scheduler import scheduler_models @@ -1298,7 +1298,7 @@ class JobSchedulingTest(BaseSchedulerTest): dummy_test_agent) # Attempted to schedule on a host that already has an agent. - self.assertRaises(monitor_db.SchedulerError, + self.assertRaises(host_scheduler.SchedulerError, self._dispatcher._schedule_running_host_queue_entries) |