| author | showard <showard@592f7852-d20e-0410-864c-8624ca9c26a4> | 2009-09-08 16:26:33 +0000 |
|---|---|---|
| committer | showard <showard@592f7852-d20e-0410-864c-8624ca9c26a4> | 2009-09-08 16:26:33 +0000 |
| commit | 0c8996bc5170274e90f6ea7730dc6f7fef84984c (patch) | |
| tree | fcd062c853852f0aea40fc5f5a1f4f7c7317f39b | |
| parent | 32ab9a5888cb423dfa11632e454c89aee70e89eb (diff) | |
Make scheduler more stateless. Agents are now scheduled only by the
dispatcher, and agents no longer use in-memory state to remember
multiple tasks. All state is managed by the database.
Risk: high (large scheduler change)
Visibility: medium (scheduler restarts are now more stable)
Signed-off-by: James Ren <jamesren@google.com>
git-svn-id: svn://test.kernel.org/autotest/trunk@3664 592f7852-d20e-0410-864c-8624ca9c26a4
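
The core of "agents no longer use in-memory state" is the rewritten Agent class: instead of holding a queue of tasks and chaining failure tasks in memory, an agent now wraps exactly one task, and any follow-up work is recorded in the database for a later dispatcher cycle. A condensed sketch of the new model, taken from the monitor_db.py changes in the patch below (slightly simplified):

```python
class Agent(object):
    """An agent used by the Dispatcher to run exactly one task."""

    def __init__(self, task, num_processes=1):
        self.task = task
        task.agent = self
        self.dispatcher = None        # filled in by Dispatcher.add_agent()
        self.num_processes = num_processes
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids
        self.started = False

    def tick(self):
        self.started = True
        if self.task:
            self.task.poll()
            if self.task.is_done():
                self.task = None      # no in-memory queue of follow-up tasks

    def is_done(self):
        return self.task is None

    def abort(self):
        if self.task:
            self.task.abort()
            if self.task.aborted:     # tasks may choose to ignore an abort
                self.task = None
```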
| -rw-r--r-- | frontend/afe/models.py | 42 |
|---|---|---|
| -rwxr-xr-x | frontend/afe/models_test.py | 16 |
| -rw-r--r-- | frontend/afe/rpc_interface.py | 3 |
| -rwxr-xr-x | scheduler/monitor_db.py | 747 |
| -rwxr-xr-x | scheduler/monitor_db_unittest.py | 520 |
5 files changed, 658 insertions, 670 deletions
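
The other half of the change is that continuation logic lives in the database rather than in agent memory: pre-job failures, post-job cleanups, and forced reverifies are recorded as SpecialTask rows, and on each cycle the dispatcher turns pending rows into single-task agents. A condensed excerpt of the new Dispatcher._schedule_special_tasks() from the patch (it assumes the surrounding monitor_db context; the email notification for invalid task types is reduced to a comment):

```python
def _schedule_special_tasks(self):
    # Pick up pending, inactive special tasks on unlocked hosts. Lower ids
    # run first, but tasks without a queue entry sort last.
    tasks = models.SpecialTask.objects.filter(is_active=False,
                                              is_complete=False,
                                              host__locked=False)
    tasks = tasks.extra(select={'isnull': 'queue_entry_id IS NULL'})
    tasks = tasks.extra(order_by=['isnull', 'id'])
    for task in tasks:
        if self.host_has_agent(task.host):
            continue                  # one agent per host at a time
        if task.task == models.SpecialTask.Task.CLEANUP:
            agent_task = CleanupTask(task=task)
        elif task.task == models.SpecialTask.Task.VERIFY:
            agent_task = VerifyTask(task=task)
        elif task.task == models.SpecialTask.Task.REPAIR:
            agent_task = RepairTask(task=task)
        else:
            continue                  # invalid task type; the patch emails a notification here
        self.add_agent(Agent(agent_task))
```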
diff --git a/frontend/afe/models.py b/frontend/afe/models.py index 45b078aa..99855e61 100644 --- a/frontend/afe/models.py +++ b/frontend/afe/models.py @@ -742,7 +742,8 @@ class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions): class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions): Status = enum.Enum('Queued', 'Starting', 'Verifying', 'Pending', 'Running', 'Gathering', 'Parsing', 'Aborted', 'Completed', - 'Failed', 'Stopped', 'Template', string_values=True) + 'Failed', 'Stopped', 'Template', 'Waiting', + string_values=True) ACTIVE_STATUSES = (Status.STARTING, Status.VERIFYING, Status.PENDING, Status.RUNNING, Status.GATHERING) COMPLETE_STATUSES = (Status.ABORTED, Status.COMPLETED, Status.FAILED, @@ -983,45 +984,6 @@ class SpecialTask(dbmodels.Model, model_logic.ModelExtensions): special_task.save() - @classmethod - def prepare(cls, agent, task): - """ - Creates a new special task if necessary and prepares it to be run. - - @param agent: The scheduler.monitor_db.AgentTask handling this task. - It is expected to have a TASK_TYPE, host and queue_entry - attributes. - @param task: SpecialTask instance to prepare, or None if a new - SpecialTask should be created. - - @returns task or the newly created SpecialTask. - """ - # TODO(gps): This method really belongs in scheduler/monitor_db.py. - # It accesses scheduler specific instance internals! - if not task: - if not hasattr(agent, 'TASK_TYPE'): - raise ValueError('Can only prepare special tasks for ' - 'verify, cleanup, or repair') - - host = Host.objects.get(id=agent.host.id) - queue_entry = None - if agent.queue_entry: - queue_entry = ( - HostQueueEntry.objects.get(id=agent.queue_entry.id)) - - active_tasks = cls.objects.filter(host=host, is_active=True) - if active_tasks.count(): - raise model_logic.ValidationError( - 'Active SpecialTask already exists for host %s. ' - 'Task %s must not be created. Existing tasks are: ' - '%s.' % (host, agent.TASK_TYPE, list(active_tasks))) - - task = cls.objects.create(host=host, task=agent.TASK_TYPE, - queue_entry=queue_entry) - - return task - - def activate(self): """ Sets a task as active and sets the time started to the current time. 
diff --git a/frontend/afe/models_test.py b/frontend/afe/models_test.py index 143853bb..9f4d99da 100755 --- a/frontend/afe/models_test.py +++ b/frontend/afe/models_test.py @@ -85,21 +85,5 @@ class SpecialTaskUnittest(unittest.TestCase, self.assertTrue(task.is_complete) - def test_prepare(self): - self.assertEqual(13, models.SpecialTask.prepare(agent=None, task=13)) - class DummyAgentTask(object): - host = None - queue_entry = None - self.assertRaises(ValueError, models.SpecialTask.prepare, - DummyAgentTask(), None) - DummyAgentTask.TASK_TYPE = models.SpecialTask.Task.VERIFY - DummyAgentTask.host = models.Host.objects.create(hostname='hi') - task1 = models.SpecialTask.prepare(DummyAgentTask(), None) - task1.activate() - DummyAgentTask.TASK_TYPE = models.SpecialTask.Task.REPAIR - self.assertRaises(model_logic.ValidationError, - models.SpecialTask.prepare, DummyAgentTask(), None) - - if __name__ == '__main__': unittest.main() diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py index 95016ff6..e7308831 100644 --- a/frontend/afe/rpc_interface.py +++ b/frontend/afe/rpc_interface.py @@ -807,7 +807,8 @@ def get_static_data(): "Stopped": "Other host(s) failed verify", "Parsing": "Awaiting parse of final results", "Gathering": "Gathering log files", - "Template": "Template job for recurring run"} + "Template": "Template job for recurring run", + "Waiting": "Waiting for scheduler action"} return result diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py index 11f69088..bd719f00 100755 --- a/scheduler/monitor_db.py +++ b/scheduler/monitor_db.py @@ -640,9 +640,11 @@ class Dispatcher(object): _drone_manager.refresh() self._run_cleanup() self._find_aborting() - self._find_reverify() self._process_recurring_runs() + self._schedule_delay_tasks() self._schedule_new_jobs() + self._schedule_running_host_queue_entries() + self._schedule_special_tasks() self._handle_agents() _drone_manager.execute_actions() email_manager.manager.send_queued_emails() @@ -694,11 +696,17 @@ class Dispatcher(object): agent.queue_entry_ids, agent) + def _host_has_scheduled_special_task(self, host): + return bool(models.SpecialTask.objects.filter(host__id=host.id, + is_active=False, + is_complete=False)) + + def _recover_processes(self): self._register_pidfiles() _drone_manager.refresh() self._recover_all_recoverable_entries() - self._requeue_starting_entries() + self._recover_pending_entries() self._check_for_remaining_active_entries() self._reverify_remaining_hosts() # reinitialize drones after killing orphaned processes, since they can @@ -730,16 +738,22 @@ class Dispatcher(object): return None, 'without process' + def _get_unassigned_entries(self, status): + for entry in HostQueueEntry.fetch(where="status = '%s'" % status): + if not self.get_agents_for_entry(entry): + yield entry + + def _recover_entries_with_status(self, status, orphans, pidfile_name, recover_entries_fn): - queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status) - for queue_entry in queue_entries: - if self.get_agents_for_entry(queue_entry): - # synchronous job we've already recovered - continue + for queue_entry in self._get_unassigned_entries(status): queue_entries = queue_entry.job.get_group_entries(queue_entry) run_monitor, process_string = self._get_recovery_run_monitor( queue_entry.execution_path(), pidfile_name, orphans) + if not run_monitor: + # _schedule_running_host_queue_entries should schedule and + # recover these entries + continue logging.info('Recovering %s entry %s %s',status.lower(), ', 
'.join(str(entry) for entry in queue_entries), @@ -763,17 +777,10 @@ class Dispatcher(object): def _recover_running_entries(self, orphans): def recover_entries(job, queue_entries, run_monitor): - if run_monitor is not None: - queue_task = QueueTask(job=job, queue_entries=queue_entries, - recover_run_monitor=run_monitor) - self.add_agent(Agent(tasks=[queue_task], - num_processes=len(queue_entries))) - else: - # we could do better, but this retains legacy behavior for now - for queue_entry in queue_entries: - logging.info('Requeuing running HQE %s since it has no ' - 'process' % queue_entry) - queue_entry.requeue() + queue_task = QueueTask(job=job, queue_entries=queue_entries, + recover_run_monitor=run_monitor) + self.add_agent(Agent(task=queue_task, + num_processes=len(queue_entries))) self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING, orphans, _AUTOSERV_PID_FILE, @@ -784,7 +791,7 @@ class Dispatcher(object): def recover_entries(job, queue_entries, run_monitor): gather_task = GatherLogsTask(job, queue_entries, recover_run_monitor=run_monitor) - self.add_agent(Agent([gather_task])) + self.add_agent(Agent(gather_task)) self._recover_entries_with_status( models.HostQueueEntry.Status.GATHERING, @@ -795,13 +802,19 @@ class Dispatcher(object): def recover_entries(job, queue_entries, run_monitor): reparse_task = FinalReparseTask(queue_entries, recover_run_monitor=run_monitor) - self.add_agent(Agent([reparse_task], num_processes=0)) + self.add_agent(Agent(reparse_task, num_processes=0)) self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING, orphans, _PARSER_PID_FILE, recover_entries) + def _recover_pending_entries(self): + for entry in self._get_unassigned_entries( + models.HostQueueEntry.Status.PENDING): + entry.on_pending() + + def _recover_all_recoverable_entries(self): orphans = _drone_manager.get_orphaned_autoserv_processes() self._recover_running_entries(orphans) @@ -824,103 +837,70 @@ class Dispatcher(object): if self.host_has_agent(task.host): raise SchedulerError( "%s already has a host agent %s." % ( - task, self._host_agents.get(host.id))) - - host = Host(id=task.host.id) - queue_entry = None - if task.queue_entry: - queue_entry = HostQueueEntry(id=task.queue_entry.id) + task, self._host_agents.get(task.host.id))) run_monitor, process_string = self._get_recovery_run_monitor( task.execution_path(), _AUTOSERV_PID_FILE, orphans) logging.info('Recovering %s %s', task, process_string) - self._recover_special_task(task, host, queue_entry, run_monitor) + self._recover_special_task(task, run_monitor) - def _recover_special_task(self, task, host, queue_entry, run_monitor): + def _recover_special_task(self, task, run_monitor): """\ Recovers a single special task. 
""" if task.task == models.SpecialTask.Task.VERIFY: - agent_tasks = self._recover_verify(task, host, queue_entry, - run_monitor) + agent_task = self._recover_verify(task, run_monitor) elif task.task == models.SpecialTask.Task.REPAIR: - agent_tasks = self._recover_repair(task, host, queue_entry, - run_monitor) + agent_task = self._recover_repair(task, run_monitor) elif task.task == models.SpecialTask.Task.CLEANUP: - agent_tasks = self._recover_cleanup(task, host, queue_entry, - run_monitor) + agent_task = self._recover_cleanup(task, run_monitor) else: # Should never happen logging.error( "Special task id %d had invalid task %s", (task.id, task.task)) - self.add_agent(Agent(agent_tasks)) + self.add_agent(Agent(agent_task)) - def _recover_verify(self, task, host, queue_entry, run_monitor): + def _recover_verify(self, task, run_monitor): """\ Recovers a verify task. No associated queue entry: Verify host With associated queue entry: Verify host, and run associated queue entry """ - if not task.queue_entry: - return [VerifyTask(host=host, task=task, - recover_run_monitor=run_monitor)] - else: - return [VerifyTask(queue_entry=queue_entry, task=task, - recover_run_monitor=run_monitor), - SetEntryPendingTask(queue_entry=queue_entry)] + return VerifyTask(task=task, recover_run_monitor=run_monitor) - def _recover_repair(self, task, host, queue_entry, run_monitor): + def _recover_repair(self, task, run_monitor): """\ Recovers a repair task. Always repair host """ - return [RepairTask(host=host, queue_entry=queue_entry, task=task, - recover_run_monitor=run_monitor)] + return RepairTask(task=task, recover_run_monitor=run_monitor) - def _recover_cleanup(self, task, host, queue_entry, run_monitor): + def _recover_cleanup(self, task, run_monitor): """\ Recovers a cleanup task. 
No associated queue entry: Clean host With associated queue entry: Clean host, verify host if needed, and run associated queue entry """ - if not task.queue_entry: - return [CleanupTask(host=host, task=task, - recover_run_monitor=run_monitor)] - else: - agent_tasks = [CleanupTask(queue_entry=queue_entry, - task=task, - recover_run_monitor=run_monitor)] - if queue_entry.job.should_run_verify(queue_entry): - agent_tasks.append(VerifyTask(queue_entry=queue_entry)) - agent_tasks.append( - SetEntryPendingTask(queue_entry=queue_entry)) - return agent_tasks - - - def _requeue_starting_entries(self): - # temporary measure until we implement proper recovery of Starting HQEs - for entry in HostQueueEntry.fetch(where='status="Starting"'): - logging.info('Requeuing "Starting" queue entry %s' % entry) - assert not self.get_agents_for_entry(entry) - assert entry.host.status == models.Host.Status.PENDING - self._reverify_hosts_where('id = %s' % entry.host.id) - entry.requeue() + return CleanupTask(task=task, recover_run_monitor=run_monitor) def _check_for_remaining_active_entries(self): queue_entries = HostQueueEntry.fetch( - where='active AND NOT complete AND status != "Pending"') + where='active AND NOT complete AND status NOT IN ' + '("Starting", "Gathering", "Pending")') unrecovered_active_hqes = [entry for entry in queue_entries - if not self.get_agents_for_entry(entry)] + if not self.get_agents_for_entry(entry) and + not self._host_has_scheduled_special_task( + entry.host)] if unrecovered_active_hqes: message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes) raise SchedulerError( @@ -928,18 +908,31 @@ class Dispatcher(object): (len(unrecovered_active_hqes), message)) - def _find_reverify(self): - tasks = models.SpecialTask.objects.filter( - task=models.SpecialTask.Task.VERIFY, is_active=False, - is_complete=False, queue_entry__isnull=True) + def _schedule_special_tasks(self): + tasks = models.SpecialTask.objects.filter(is_active=False, + is_complete=False, + host__locked=False) + # We want lower ids to come first, but the NULL queue_entry_ids need to + # come last + tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'}) + tasks = tasks.extra(order_by=['isnull', 'id']) for task in tasks: - host = Host.fetch(where='id = %s', params=(task.host.id,)).next() - if host.locked or host.invalid or self.host_has_agent(host): + if self.host_has_agent(task.host): + continue + + if task.task == models.SpecialTask.Task.CLEANUP: + agent_task = CleanupTask(task=task) + elif task.task == models.SpecialTask.Task.VERIFY: + agent_task = VerifyTask(task=task) + elif task.task == models.SpecialTask.Task.REPAIR: + agent_task = RepairTask(task=task) + else: + email_manager.manager.enqueue_notify_email( + 'Special task with invalid task', task) continue - logging.info('Force reverifying host %s', host.hostname) - self.add_agent(Agent([VerifyTask(host=host, task=task)])) + self.add_agent(Agent(agent_task)) def _reverify_remaining_hosts(self): @@ -948,7 +941,7 @@ class Dispatcher(object): message = ('Recovering active host %s - this probably indicates a ' 'scheduler bug') self._reverify_hosts_where( - "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')", + "status IN ('Repairing', 'Verifying', 'Cleaning')", print_message=message) @@ -959,10 +952,14 @@ class Dispatcher(object): if self.host_has_agent(host): # host has already been recovered in some way continue + if self._host_has_scheduled_special_task(host): + # host will have a special task scheduled on the next cycle + continue if print_message: 
logging.info(print_message, host.hostname) - tasks = host.reverify_tasks() - self.add_agent(Agent(tasks)) + models.SpecialTask.objects.create( + task=models.SpecialTask.Task.CLEANUP, + host=models.Host(id=host.id)) def _recover_hosts(self): @@ -1045,9 +1042,51 @@ class Dispatcher(object): self._schedule_atomic_group(queue_entry) + def _schedule_running_host_queue_entries(self): + entries = HostQueueEntry.fetch( + where="status IN " + "('Starting', 'Running', 'Gathering', 'Parsing')") + for entry in entries: + if self.get_agents_for_entry(entry): + continue + + task_entries = entry.job.get_group_entries(entry) + for task_entry in task_entries: + if (task_entry.status != models.HostQueueEntry.Status.PARSING + and self.host_has_agent(task_entry.host)): + agent = self._host_agents.get(task_entry.host.id)[0] + raise SchedulerError('Attempted to schedule on host that ' + 'already has agent: %s (previous ' + 'agent task: %s)' + % (task_entry, agent.task)) + + if entry.status in (models.HostQueueEntry.Status.STARTING, + models.HostQueueEntry.Status.RUNNING): + params = entry.job.get_autoserv_params(task_entries) + agent_task = QueueTask(job=entry.job, + queue_entries=task_entries, cmd=params) + elif entry.status == models.HostQueueEntry.Status.GATHERING: + agent_task = GatherLogsTask( + job=entry.job, queue_entries=task_entries) + elif entry.status == models.HostQueueEntry.Status.PARSING: + agent_task = FinalReparseTask(queue_entries=task_entries) + else: + raise SchedulerError('_schedule_running_host_queue_entries got ' + 'entry with invalid status %s: %s' + % (entry.status, entry)) + + self.add_agent(Agent(agent_task, num_processes=len(task_entries))) + + + def _schedule_delay_tasks(self): + for entry in HostQueueEntry.fetch(where="status = 'Waiting'"): + task = entry.job.schedule_delayed_callback_task(entry) + if task: + self.add_agent(Agent(task, num_processes=0)) + + def _run_queue_entry(self, queue_entry, host): - agent = queue_entry.run_pre_job_tasks(assigned_host=host) - self.add_agent(agent) + queue_entry.schedule_pre_job_tasks(assigned_host=host) def _find_aborting(self): @@ -1086,17 +1125,16 @@ class Dispatcher(object): have_reached_limit = False # iterate over copy, so we can remove agents during iteration for agent in list(self._agents): - if agent.is_done(): - logging.info("agent finished") - self.remove_agent(agent) - continue - if not agent.is_running(): + if not agent.started: if not self._can_start_agent(agent, num_started_this_cycle, have_reached_limit): have_reached_limit = True continue num_started_this_cycle += agent.num_processes agent.tick() + if agent.is_done(): + logging.info("agent finished") + self.remove_agent(agent) logging.info('%d running processes', _drone_manager.total_running_processes()) @@ -1315,7 +1353,7 @@ class PidfileRunMonitor(object): class Agent(object): """ - An agent for use by the Dispatcher class to perform a sequence of tasks. + An agent for use by the Dispatcher class to perform a task. The following methods are required on all task objects: poll() - Called periodically to let the task check its status and @@ -1326,8 +1364,6 @@ class Agent(object): The following attributes are required on all task objects: aborted - bool, True if this task was aborted. - failure_tasks - A sequence of tasks to be run using a new Agent - by the dispatcher should this task fail. success - bool, True if this task succeeded. queue_entry_ids - A sequence of HostQueueEntry ids this task handles. host_ids - A sequence of Host ids this task represents. 
@@ -1338,88 +1374,44 @@ class Agent(object): """ - def __init__(self, tasks, num_processes=1): + def __init__(self, task, num_processes=1): """ - @param tasks: A list of tasks as described in the class docstring. + @param task: A task as described in the class docstring. @param num_processes: The number of subprocesses the Agent represents. This is used by the Dispatcher for managing the load on the system. Defaults to 1. """ - self.active_task = None - self.queue = None + self.task = task + task.agent = self + # This is filled in by Dispatcher.add_agent() self.dispatcher = None self.num_processes = num_processes - self.queue_entry_ids = self._union_ids(task.queue_entry_ids - for task in tasks) - self.host_ids = self._union_ids(task.host_ids for task in tasks) - - self._clear_queue() - for task in tasks: - self.add_task(task) - - - def _clear_queue(self): - self.queue = Queue.Queue(0) - + self.queue_entry_ids = task.queue_entry_ids + self.host_ids = task.host_ids - def _union_ids(self, id_lists): - return set(itertools.chain(*id_lists)) - - - def add_task(self, task): - self.queue.put_nowait(task) - task.agent = self + self.started = False def tick(self): - while not self.is_done(): - if self.active_task: - self.active_task.poll() - if not self.active_task.is_done(): - return - self._next_task() - - - def _next_task(self): - logging.info("agent picking task") - if self.active_task is not None: - assert self.active_task.is_done() - if not self.active_task.success: - self.on_task_failure() - self.active_task = None - - if not self.is_done(): - self.active_task = self.queue.get_nowait() - - - def on_task_failure(self): - self._clear_queue() - # run failure tasks in a new Agent, so host_ids and queue_entry_ids will - # get reset. - new_agent = Agent(self.active_task.failure_tasks) - self.dispatcher.add_agent(new_agent) - - - def is_running(self): - return self.active_task is not None + self.started = True + if self.task: + self.task.poll() + if self.task.is_done(): + self.task = None def is_done(self): - return self.active_task is None and self.queue.empty() + return self.task is None def abort(self): - # abort tasks until the queue is empty or a task ignores the abort - while not self.is_done(): - if not self.active_task: - self._next_task() - self.active_task.abort() - if not self.active_task.aborted: + if self.task: + self.task.abort() + if self.task.aborted: # tasks can choose to ignore aborts - return - self.active_task = None + self.task = None class DelayedCallTask(object): @@ -1455,7 +1447,6 @@ class DelayedCallTask(object): # These attributes are required by Agent. 
self.aborted = False - self.failure_tasks = () self.host_ids = () self.success = False self.queue_entry_ids = () @@ -1464,29 +1455,24 @@ class DelayedCallTask(object): def poll(self): - if self._callback and self._now_func() >= self.end_time: - new_agent = self._callback() - if new_agent: - self.agent.dispatcher.add_agent(new_agent) - self._callback = None + if not self.is_done() and self._now_func() >= self.end_time: + self._callback() self.success = True def is_done(self): - return not self._callback + return self.success or self.aborted def abort(self): self.aborted = True - self._callback = None class AgentTask(object): - def __init__(self, cmd=None, working_directory=None, failure_tasks=[], + def __init__(self, cmd=None, working_directory=None, pidfile_name=None, paired_with_pidfile=None, recover_run_monitor=None): self.done = False - self.failure_tasks = failure_tasks self.cmd = cmd self._working_directory = working_directory self.agent = None @@ -1598,8 +1584,8 @@ class AgentTask(object): def _parse_results(self, queue_entries): - reparse_task = FinalReparseTask(queue_entries) - self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0)) + for queue_entry in queue_entries: + queue_entry.set_status(models.HostQueueEntry.Status.PARSING) def _copy_and_parse_results(self, queue_entries, use_monitor=None): @@ -1647,7 +1633,7 @@ class TaskWithJobKeyvals(object): self._write_keyval_after_job("job_finished", int(time.time())) -class SpecialAgentTask(AgentTask): +class SpecialAgentTask(AgentTask, TaskWithJobKeyvals): """ Subclass for AgentTasks that correspond to a SpecialTask entry in the DB. """ @@ -1657,9 +1643,14 @@ class SpecialAgentTask(AgentTask): queue_entry = None def __init__(self, task, extra_command_args, **kwargs): - assert self.host assert (self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden') + + self.host = Host(id=task.host.id) + self.queue_entry = None + if task.queue_entry: + self.queue_entry = HostQueueEntry(id=task.queue_entry.id) + self.task = task if task: kwargs['working_directory'] = task.execution_path() @@ -1667,9 +1658,12 @@ class SpecialAgentTask(AgentTask): super(SpecialAgentTask, self).__init__(**kwargs) + def _keyval_path(self): + return os.path.join(self._working_directory, self._KEYVAL_FILE) + + def prolog(self): super(SpecialAgentTask, self).prolog() - self.task = models.SpecialTask.prepare(self, self.task) self.cmd = _autoserv_command_line(self.host.hostname, self._extra_command_args, queue_entry=self.queue_entry) @@ -1677,86 +1671,80 @@ class SpecialAgentTask(AgentTask): self.task.activate() - def cleanup(self): - super(SpecialAgentTask, self).cleanup() + def _fail_queue_entry(self): + assert self.queue_entry - # self.task can be None if a SpecialAgentTask is aborted before the - # prolog runs - if self.task: - self.task.finish() + if self.queue_entry.meta_host: + return # don't fail metahost entries, they'll be reassigned - if self.monitor and self.monitor.has_process() and self.task: + self.queue_entry.update_from_database() + if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED: + return # entry has been aborted + + self.queue_entry.set_execution_subdir() + queued_key, queued_time = self._job_queued_keyval( + self.queue_entry.job) + self._write_keyval_after_job(queued_key, queued_time) + self._write_job_finished() + + # copy results logs into the normal place for job results + self.monitor.try_copy_results_on_drone( + source_path=self._working_directory + '/', + destination_path=self.queue_entry.execution_path() + 
'/') + + self._copy_results([self.queue_entry]) + self.queue_entry.handle_host_failure() + if self.queue_entry.job.parse_failed_repair: + self._parse_results([self.queue_entry]) + + pidfile_id = _drone_manager.get_pidfile_id_from( + self.queue_entry.execution_path(), + pidfile_name=_AUTOSERV_PID_FILE) + _drone_manager.register_pidfile(pidfile_id) + + + def cleanup(self): + super(SpecialAgentTask, self).cleanup() + self.task.finish() + if self.monitor and self.monitor.has_process(): self._copy_results([self.task]) -class RepairTask(SpecialAgentTask, TaskWithJobKeyvals): +class RepairTask(SpecialAgentTask): TASK_TYPE = models.SpecialTask.Task.REPAIR - def __init__(self, host, queue_entry=None, task=None, - recover_run_monitor=None): + def __init__(self, task, recover_run_monitor=None): """\ queue_entry: queue entry to mark failed if this repair fails. """ - protection = host_protections.Protection.get_string(host.protection) + protection = host_protections.Protection.get_string( + task.host.protection) # normalize the protection name protection = host_protections.Protection.get_attr_name(protection) - self.host = host - self.queue_entry = queue_entry - super(RepairTask, self).__init__( task, ['-R', '--host-protection', protection], recover_run_monitor=recover_run_monitor) # *don't* include the queue entry in IDs -- if the queue entry is # aborted, we want to leave the repair task running - self._set_ids(host=host) + self._set_ids(host=self.host) def prolog(self): super(RepairTask, self).prolog() logging.info("repair_task starting") - self.host.set_status('Repairing') - - - def _keyval_path(self): - return os.path.join(self._working_directory, self._KEYVAL_FILE) - - - def _fail_queue_entry(self): - assert self.queue_entry - - if self.queue_entry.meta_host: - return # don't fail metahost entries, they'll be reassigned - - self.queue_entry.update_from_database() - if self.queue_entry.status != 'Queued': - return # entry has been aborted - - self.queue_entry.set_execution_subdir() - queued_key, queued_time = self._job_queued_keyval( - self.queue_entry.job) - self._write_keyval_after_job(queued_key, queued_time) - self._write_job_finished() - - self.monitor.try_copy_results_on_drone( - source_path=self._working_directory + '/', - destination_path=self.queue_entry.execution_path() + '/') - - self._copy_results([self.queue_entry]) - if self.queue_entry.job.parse_failed_repair: - self._parse_results([self.queue_entry]) - self.queue_entry.handle_host_failure() + self.host.set_status(models.Host.Status.REPAIRING) def epilog(self): super(RepairTask, self).epilog() if self.success: - self.host.set_status('Ready') + self.host.set_status(models.Host.Status.READY) else: - self.host.set_status('Repair Failed') + self.host.set_status(models.Host.Status.REPAIR_FAILED) if self.queue_entry: self._fail_queue_entry() @@ -1777,26 +1765,34 @@ class PreJobTask(SpecialAgentTask): self.monitor.try_copy_to_results_repository( source, destination_path=destination) - if not self.success and self.queue_entry: - self.queue_entry.requeue() + if not self.success: + if self.queue_entry: + self.queue_entry.requeue() + if models.SpecialTask.objects.filter( + task=models.SpecialTask.Task.REPAIR, + queue_entry__id=self.queue_entry.id): + self.host.set_status(models.Host.Status.REPAIR_FAILED) + self._fail_queue_entry() + return + queue_entry = models.HostQueueEntry(id=self.queue_entry.id) + else: + queue_entry = None + + models.SpecialTask.objects.create( + host=models.Host(id=self.host.id), + task=models.SpecialTask.Task.REPAIR, 
+ queue_entry=queue_entry) class VerifyTask(PreJobTask): TASK_TYPE = models.SpecialTask.Task.VERIFY - def __init__(self, queue_entry=None, host=None, task=None, - recover_run_monitor=None): - assert bool(queue_entry) != bool(host) - self.host = host or queue_entry.host - self.queue_entry = queue_entry - - failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)] + def __init__(self, task, recover_run_monitor=None): super(VerifyTask, self).__init__( - task, ['-v'], failure_tasks=failure_tasks, - recover_run_monitor=recover_run_monitor) + task, ['-v'], recover_run_monitor=recover_run_monitor) - self._set_ids(host=host, queue_entries=[queue_entry]) + self._set_ids(host=self.host, queue_entries=[self.queue_entry]) def prolog(self): @@ -1804,8 +1800,8 @@ class VerifyTask(PreJobTask): logging.info("starting verify on %s", self.host.hostname) if self.queue_entry: - self.queue_entry.set_status('Verifying') - self.host.set_status('Verifying') + self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING) + self.host.set_status(models.Host.Status.VERIFYING) # Delete any other queued verifies for this host. One verify will do # and there's no need to keep records of other requests. @@ -1820,7 +1816,10 @@ class VerifyTask(PreJobTask): def epilog(self): super(VerifyTask, self).epilog() if self.success: - self.host.set_status('Ready') + if self.queue_entry: + self.queue_entry.on_pending() + else: + self.host.set_status(models.Host.Status.READY) class CleanupHostsMixin(object): @@ -1838,18 +1837,18 @@ class CleanupHostsMixin(object): if do_reboot: # don't pass the queue entry to the CleanupTask. if the cleanup # fails, the job doesn't care -- it's over. - cleanup_task = CleanupTask(host=queue_entry.host) - self.agent.dispatcher.add_agent(Agent([cleanup_task])) + models.SpecialTask.objects.create( + host=models.Host(id=queue_entry.host.id), + task=models.SpecialTask.Task.CLEANUP) else: - queue_entry.host.set_status('Ready') + queue_entry.host.set_status(models.Host.Status.READY) class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin): - def __init__(self, job, queue_entries, cmd=None, group_name='', - recover_run_monitor=None): + def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None): self.job = job self.queue_entries = queue_entries - self.group_name = group_name + self.group_name = queue_entries[0].get_group_name() super(QueueTask, self).__init__( cmd, self._execution_path(), recover_run_monitor=recover_run_monitor) @@ -1887,6 +1886,18 @@ class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin): def prolog(self): + for entry in self.queue_entries: + if entry.status not in (models.HostQueueEntry.Status.STARTING, + models.HostQueueEntry.Status.RUNNING): + raise SchedulerError('Queue task attempting to start ' + 'entry with invalid status %s: %s' + % (entry.status, entry)) + if entry.host.status not in (models.Host.Status.PENDING, + models.Host.Status.RUNNING): + raise SchedulerError('Queue task attempting to start on queue ' + 'entry with invalid host status %s: %s' + % (entry.host.status, entry)) + queued_key, queued_time = self._job_queued_keyval(self.job) keyval_dict = {queued_key: queued_time} if self.group_name: @@ -1894,9 +1905,9 @@ class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin): self._write_keyvals_before_job(keyval_dict) for queue_entry in self.queue_entries: self._write_host_keyvals(queue_entry.host) - queue_entry.set_status('Running') + queue_entry.set_status(models.HostQueueEntry.Status.RUNNING) 
queue_entry.update_field('started_on', datetime.datetime.now()) - queue_entry.host.set_status('Running') + queue_entry.host.set_status(models.Host.Status.RUNNING) queue_entry.host.update_field('dirty', 1) if self.job.synch_count == 1 and len(self.queue_entries) == 1: # TODO(gps): Remove this if nothing needs it anymore. @@ -1919,8 +1930,8 @@ class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin): if self.monitor.lost_process: self._write_lost_process_error_file() - gather_task = GatherLogsTask(self.job, self.queue_entries) - self.agent.dispatcher.add_agent(Agent(tasks=[gather_task])) + for queue_entry in self.queue_entries: + queue_entry.set_status(models.HostQueueEntry.Status.GATHERING) def _write_status_comment(self, comment): @@ -2029,13 +2040,13 @@ class PostJobTask(AgentTask): def run(self): - # make sure we actually have results to work with. - # this should never happen in normal operation. + # Make sure we actually have results to work with. + # This should never happen in normal operation. if not self._autoserv_monitor.has_process(): email_manager.manager.enqueue_notify_email( - 'No results in post-job task', - 'No results in post-job task at %s' % - self._autoserv_monitor.pidfile_id) + 'No results in post-job task', + 'No results in post-job task at %s' % + self._autoserv_monitor.pidfile_id) self.finished(False) return @@ -2080,8 +2091,16 @@ class GatherLogsTask(PostJobTask, CleanupHostsMixin): def prolog(self): + for queue_entry in self._queue_entries: + if queue_entry.status != models.HostQueueEntry.Status.GATHERING: + raise SchedulerError('Gather task attempting to start on ' + 'non-gathering entry: %s' % queue_entry) + if queue_entry.host.status != models.Host.Status.RUNNING: + raise SchedulerError('Gather task attempting to start on queue ' + 'entry with non-running host: ' + '%s' % queue_entry) + super(GatherLogsTask, self).prolog() - self._set_all_statuses(models.HostQueueEntry.Status.GATHERING) def epilog(self): @@ -2116,35 +2135,50 @@ class CleanupTask(PreJobTask): TASK_TYPE = models.SpecialTask.Task.CLEANUP - def __init__(self, host=None, queue_entry=None, task=None, - recover_run_monitor=None): - assert bool(host) ^ bool(queue_entry) - if queue_entry: - host = queue_entry.get_host() - self.queue_entry = queue_entry - self.host = host - - repair_task = RepairTask(host, queue_entry=queue_entry) + def __init__(self, task, recover_run_monitor=None): super(CleanupTask, self).__init__( - task, ['--cleanup'], failure_tasks=[repair_task], - recover_run_monitor=recover_run_monitor) + task, ['--cleanup'], recover_run_monitor=recover_run_monitor) - self._set_ids(host=host, queue_entries=[queue_entry]) + self._set_ids(host=self.host, queue_entries=[self.queue_entry]) def prolog(self): super(CleanupTask, self).prolog() logging.info("starting cleanup task for host: %s", self.host.hostname) - self.host.set_status("Cleaning") + self.host.set_status(models.Host.Status.CLEANING) + if self.queue_entry: + self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING) + + + def _should_run_verify(self): + do_not_verify = (self.host.protection == + host_protections.Protection.DO_NOT_VERIFY) + if do_not_verify: + return False + return self.queue_entry and self.queue_entry.job.run_verify def epilog(self): super(CleanupTask, self).epilog() if self.success: - self.host.set_status('Ready') self.host.update_field('dirty', 0) + if self.queue_entry: + queue_entry = models.HostQueueEntry(id=self.queue_entry.id) + else: + queue_entry = None + + if self._should_run_verify(): + 
models.SpecialTask.objects.create( + host=models.Host(id=self.host.id), + queue_entry=queue_entry, + task=models.SpecialTask.Task.VERIFY) + elif self.queue_entry: + self.queue_entry.on_pending() + else: + self.host.set_status(models.Host.Status.READY) + class FinalReparseTask(PostJobTask): _num_running_parses = 0 @@ -2176,8 +2210,12 @@ class FinalReparseTask(PostJobTask): def prolog(self): + for queue_entry in self._queue_entries: + if queue_entry.status != models.HostQueueEntry.Status.PARSING: + raise SchedulerError('Parse task attempting to start on ' + 'non-parsing entry: %s' % queue_entry) + super(FinalReparseTask, self).prolog() - self._set_all_statuses(models.HostQueueEntry.Status.PARSING) def epilog(self): @@ -2221,20 +2259,6 @@ class FinalReparseTask(PostJobTask): self._decrement_running_parses() -class SetEntryPendingTask(AgentTask): - def __init__(self, queue_entry): - super(SetEntryPendingTask, self).__init__(cmd='') - self._queue_entry = queue_entry - self._set_ids(queue_entries=[queue_entry]) - - - def run(self): - agent = self._queue_entry.on_pending() - if agent: - self.agent.dispatcher.add_agent(agent) - self.finished(True) - - class DBError(Exception): """Raised by the DBObject constructor when its select fails.""" @@ -2267,7 +2291,9 @@ class DBObject(object): def __init__(self, id=None, row=None, new_record=False, always_query=True): - assert (bool(id) != bool(row)) + assert bool(id) or bool(row) + if id is not None and row is not None: + assert id == row[0] assert self._table_name, '_table_name must be defined in your class' assert self._fields, '_fields must be defined in your class' if not new_record: @@ -2435,8 +2461,7 @@ class DBObject(object): 'where' : where, 'order_by' : order_by}) rows = _db.execute(query, params) - for row in rows: - yield cls(row=row) + return [cls(id=row[0], row=row) for row in rows] class IneligibleHostQueue(DBObject): @@ -2492,15 +2517,6 @@ class Host(DBObject): return platform, all_labels - def reverify_tasks(self): - cleanup_task = CleanupTask(host=self) - verify_task = VerifyTask(host=self) - - # just to make sure this host does not get taken away - self.set_status('Cleaning') - return [cleanup_task, verify_task] - - _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE) @@ -2666,16 +2682,23 @@ class HostQueueEntry(DBObject): logging.info("%s -> %s", self, self.status) - if status in ['Queued', 'Parsing']: + if status in (models.HostQueueEntry.Status.QUEUED, + models.HostQueueEntry.Status.PARSING): self.update_field('complete', False) self.update_field('active', False) - if status in ['Pending', 'Running', 'Verifying', 'Starting', - 'Gathering']: + if status in (models.HostQueueEntry.Status.PENDING, + models.HostQueueEntry.Status.RUNNING, + models.HostQueueEntry.Status.VERIFYING, + models.HostQueueEntry.Status.STARTING, + models.HostQueueEntry.Status.GATHERING): self.update_field('complete', False) self.update_field('active', True) - if status in ['Failed', 'Completed', 'Stopped', 'Aborted']: + if status in (models.HostQueueEntry.Status.FAILED, + models.HostQueueEntry.Status.COMPLETED, + models.HostQueueEntry.Status.STOPPED, + models.HostQueueEntry.Status.ABORTED): self.update_field('complete', True) self.update_field('active', False) @@ -2723,7 +2746,7 @@ class HostQueueEntry(DBObject): email_manager.manager.send_email(self.job.email_list, subject, body) - def run_pre_job_tasks(self, assigned_host=None): + def schedule_pre_job_tasks(self, assigned_host=None): if self.meta_host is not None or self.atomic_group: assert 
assigned_host # ensure results dir exists for the queue log @@ -2736,22 +2759,19 @@ class HostQueueEntry(DBObject): self.job.name, self.meta_host, self.atomic_group_id, self.host.hostname, self.status) - return self._do_run_pre_job_tasks() + self._do_schedule_pre_job_tasks() - def _do_run_pre_job_tasks(self): + def _do_schedule_pre_job_tasks(self): # Every host goes thru the Verifying stage (which may or may not # actually do anything as determined by get_pre_job_tasks). self.set_status(models.HostQueueEntry.Status.VERIFYING) - - # The pre job tasks always end with a SetEntryPendingTask which - # will continue as appropriate through queue_entry.on_pending(). - return Agent(self.job.get_pre_job_tasks(queue_entry=self)) + self.job.schedule_pre_job_tasks(queue_entry=self) def requeue(self): assert self.host - self.set_status('Queued') + self.set_status(models.HostQueueEntry.Status.QUEUED) self.update_field('started_on', None) # verify/cleanup failure sets the execution subdir, so reset it here self.set_execution_subdir('') @@ -2765,7 +2785,7 @@ class HostQueueEntry(DBObject): repair. """ assert not self.meta_host - self.set_status('Failed') + self.set_status(models.HostQueueEntry.Status.FAILED) self.job.stop_if_necessary() @@ -2801,43 +2821,59 @@ class HostQueueEntry(DBObject): def on_pending(self): """ Called when an entry in a synchronous job has passed verify. If the - job is ready to run, returns an agent to run the job. Returns None - otherwise. + job is ready to run, sets the entries to STARTING. Otherwise, it leaves + them in PENDING. """ - self.set_status('Pending') - self.get_host().set_status('Pending') + self.set_status(models.HostQueueEntry.Status.PENDING) + self.host.set_status(models.Host.Status.PENDING) # Some debug code here: sends an email if an asynchronous job does not # immediately enter Starting. # TODO: Remove this once we figure out why asynchronous jobs are getting # stuck in Pending. - agent = self.job.run_if_ready(queue_entry=self) - if self.job.synch_count == 1 and agent is None: + self.job.run_if_ready(queue_entry=self) + if (self.job.synch_count == 1 and + self.status == models.HostQueueEntry.Status.PENDING): subject = 'Job %s (id %s)' % (self.job.name, self.job.id) message = 'Asynchronous job stuck in Pending' email_manager.manager.enqueue_notify_email(subject, message) - return agent def abort(self, dispatcher): assert self.aborted and not self.complete Status = models.HostQueueEntry.Status - has_running_job_agent = ( - self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING) - and dispatcher.get_agents_for_entry(self)) - if has_running_job_agent: + if self.status in (Status.GATHERING, Status.PARSING): # do nothing; post-job tasks will finish and then mark this entry # with status "Aborted" and take care of the host return - if self.status in (Status.STARTING, Status.PENDING): + if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING): + assert not dispatcher.get_agents_for_entry(self) self.host.set_status(models.Host.Status.READY) elif self.status == Status.VERIFYING: - dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks())) + models.SpecialTask.objects.create( + task=models.SpecialTask.Task.CLEANUP, + host=models.Host(id=self.host.id)) self.set_status(Status.ABORTED) + + def get_group_name(self): + atomic_group = self.atomic_group + if not atomic_group: + return '' + + # Look at any meta_host and dependency labels and pick the first + # one that also specifies this atomic group. 
Use that label name + # as the group name if possible (it is more specific). + for label in self.get_labels(): + if label.atomic_group_id: + assert label.atomic_group_id == atomic_group.id + return label.name + return atomic_group.name + + def execution_tag(self): assert self.execution_subdir return "%s/%s" % (self.job.tag(), self.execution_subdir) @@ -3048,7 +3084,7 @@ class Job(DBObject): params=(self.id, execution_subdir))) - def _get_autoserv_params(self, queue_entries): + def get_autoserv_params(self, queue_entries): assert queue_entries execution_tag = queue_entries[0].execution_tag() control_path = self._write_control_file(execution_tag) @@ -3075,7 +3111,7 @@ class Job(DBObject): return False - def should_run_verify(self, queue_entry): + def _should_run_verify(self, queue_entry): do_not_verify = (queue_entry.host.protection == host_protections.Protection.DO_NOT_VERIFY) if do_not_verify: @@ -3083,7 +3119,7 @@ class Job(DBObject): return self.run_verify - def get_pre_job_tasks(self, queue_entry): + def schedule_pre_job_tasks(self, queue_entry): """ Get a list of tasks to perform before the host_queue_entry may be used to run this Job (such as Cleanup & Verify). @@ -3093,13 +3129,18 @@ class Job(DBObject): task in the list calls HostQueueEntry.on_pending(), which continues the flow of the job. """ - tasks = [] if self._should_run_cleanup(queue_entry): - tasks.append(CleanupTask(queue_entry=queue_entry)) - if self.should_run_verify(queue_entry): - tasks.append(VerifyTask(queue_entry=queue_entry)) - tasks.append(SetEntryPendingTask(queue_entry)) - return tasks + task = models.SpecialTask.Task.CLEANUP + elif self._should_run_verify(queue_entry): + task = models.SpecialTask.Task.VERIFY + else: + queue_entry.on_pending() + return + + models.SpecialTask.objects.create( + host=models.Host(id=queue_entry.host_id), + queue_entry=models.HostQueueEntry(id=queue_entry.id), + task=task) def _assign_new_group(self, queue_entries, group_name=''): @@ -3144,21 +3185,10 @@ class Job(DBObject): # Sanity check. We'll only ever be called if this can be met. assert len(chosen_entries) >= self.synch_count - if atomic_group: - # Look at any meta_host and dependency labels and pick the first - # one that also specifies this atomic group. Use that label name - # as the group name if possible (it is more specific). - group_name = atomic_group.name - for label in include_queue_entry.get_labels(): - if label.atomic_group_id: - assert label.atomic_group_id == atomic_group.id - group_name = label.name - break - else: - group_name = '' + group_name = include_queue_entry.get_group_name() self._assign_new_group(chosen_entries, group_name=group_name) - return chosen_entries, group_name + return chosen_entries def run_if_ready(self, queue_entry): @@ -3170,12 +3200,10 @@ class Job(DBObject): """ if not self.is_ready(): self.stop_if_necessary() - return None - - if queue_entry.atomic_group: - return self.run_with_ready_delay(queue_entry) - - return self.run(queue_entry) + elif queue_entry.atomic_group: + self.run_with_ready_delay(queue_entry) + else: + self.run(queue_entry) def run_with_ready_delay(self, queue_entry): @@ -3200,12 +3228,19 @@ class Job(DBObject): # Delay is disabled or we already have enough? Do not wait to run. 
if not delay or over_max_threshold or delay_expired: - return self.run(queue_entry) + self.run(queue_entry) + else: + queue_entry.set_status(models.HostQueueEntry.Status.WAITING) + + + def schedule_delayed_callback_task(self, queue_entry): + queue_entry.set_status(models.HostQueueEntry.Status.PENDING) - # A delay was previously scheduled. if self._delay_ready_task: return None + delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts + def run_job_after_delay(): logging.info('Job %s done waiting for extra hosts.', self.id) return self.run(queue_entry) @@ -3214,39 +3249,29 @@ class Job(DBObject): self.id, delay) self._delay_ready_task = DelayedCallTask(delay_seconds=delay, callback=run_job_after_delay) - - return Agent([self._delay_ready_task], num_processes=0) + return self._delay_ready_task def run(self, queue_entry): """ @param queue_entry: The HostQueueEntry instance calling this method. - @returns An Agent instance to run this job or None if we've already - been run. """ if queue_entry.atomic_group and self._atomic_and_has_started(): logging.error('Job.run() called on running atomic Job %d ' 'with HQE %s.', self.id, queue_entry) - return None - queue_entries, group_name = self._choose_group_to_run(queue_entry) - return self._finish_run(queue_entries, group_name) + return + queue_entries = self._choose_group_to_run(queue_entry) + self._finish_run(queue_entries) - def _finish_run(self, queue_entries, group_name): + def _finish_run(self, queue_entries): for queue_entry in queue_entries: - queue_entry.set_status('Starting') - params = self._get_autoserv_params(queue_entries) - queue_task = QueueTask(job=self, queue_entries=queue_entries, - cmd=params, group_name=group_name) - tasks = [queue_task] + queue_entry.set_status(models.HostQueueEntry.Status.STARTING) if self._delay_ready_task: # Cancel any pending callback that would try to run again # as we are already running. self._delay_ready_task.abort() - return Agent(tasks, num_processes=len(queue_entries)) - - def __str__(self): return '%s-%s' % (self.id, self.owner) diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py index ea4d0685..78793341 100755 --- a/scheduler/monitor_db_unittest.py +++ b/scheduler/monitor_db_unittest.py @@ -18,18 +18,15 @@ _DEBUG = False class DummyAgent(object): - _is_running = False + started = False _is_done = False num_processes = 1 host_ids = [] queue_entry_ids = [] - def is_running(self): - return self._is_running - def tick(self): - self._is_running = True + self.started = True def is_done(self): @@ -38,7 +35,6 @@ class DummyAgent(object): def set_done(self, done): self._is_done = done - self._is_running = not done class IsRow(mock.argument_comparator): @@ -197,14 +193,14 @@ class DispatcherSchedulingTest(BaseSchedulerTest): def _set_monitor_stubs(self): super(DispatcherSchedulingTest, self)._set_monitor_stubs() - def hqe__do_run_pre_job_tasks_stub(queue_entry): - """Return a test dummy. 
Called by HostQueueEntry.run().""" + def hqe__do_schedule_pre_job_tasks_stub(queue_entry): + """Called by HostQueueEntry.run().""" self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id) queue_entry.set_status('Starting') - return DummyAgent() - self.god.stub_with(monitor_db.HostQueueEntry, '_do_run_pre_job_tasks', - hqe__do_run_pre_job_tasks_stub) + self.god.stub_with(monitor_db.HostQueueEntry, + '_do_schedule_pre_job_tasks', + hqe__do_schedule_pre_job_tasks_stub) def hqe_queue_log_record_stub(self, log_line): """No-Op to avoid calls down to the _drone_manager during tests.""" @@ -457,9 +453,9 @@ class DispatcherSchedulingTest(BaseSchedulerTest): self._dispatcher._refresh_pending_queue_entries() atomic_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' % - atomic_job.id).next() + atomic_job.id)[0] normal_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' % - normal_job.id).next() + normal_job.id)[0] host_scheduler = self._dispatcher._host_scheduler self.assertTrue(host_scheduler._check_atomic_group_labels( @@ -478,7 +474,7 @@ class DispatcherSchedulingTest(BaseSchedulerTest): def test_HostScheduler_get_host_atomic_group_id(self): job = self._create_job(metahosts=[self.label6.id]) queue_entry = monitor_db.HostQueueEntry.fetch( - where='job_id=%d' % job.id).next() + where='job_id=%d' % job.id)[0] # Indirectly initialize the internal state of the host scheduler. self._dispatcher._refresh_pending_queue_entries() @@ -774,7 +770,7 @@ class DispatcherThrottlingTest(BaseSchedulerTest): def fake_max_runnable_processes(fake_self): running = sum(agent.num_processes for agent in self._agents - if agent.is_running()) + if agent.started and not agent.is_done()) return self._MAX_RUNNING - running self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes', fake_max_runnable_processes) @@ -792,7 +788,7 @@ class DispatcherThrottlingTest(BaseSchedulerTest): def _assert_agents_started(self, indexes, is_started=True): for i in indexes: - self.assert_(self._agents[i].is_running() == is_started, + self.assert_(self._agents[i].started == is_started, 'Agent %d %sstarted' % (i, is_started and 'not ' or '')) @@ -853,29 +849,25 @@ class FindAbortTest(BaseSchedulerTest): """ def _check_host_agent(self, agent, host_id): self.assert_(isinstance(agent, monitor_db.Agent)) - tasks = list(agent.queue.queue) - self.assertEquals(len(tasks), 2) - cleanup, verify = tasks + self.assert_(agent.task) + cleanup = agent.task self.assert_(isinstance(cleanup, monitor_db.CleanupTask)) self.assertEquals(cleanup.host.id, host_id) - self.assert_(isinstance(verify, monitor_db.VerifyTask)) - self.assertEquals(verify.host.id, host_id) - def _check_agents(self, agents): agents = list(agents) - self.assertEquals(len(agents), 3) - self.assertEquals(agents[0], self._agent) - self._check_host_agent(agents[1], 1) - self._check_host_agent(agents[2], 2) + self.assertEquals(len(agents), 2) + self._check_host_agent(agents[0], 1) + self._check_host_agent(agents[1], 2) def _common_setup(self): self._create_job(hosts=[1, 2]) self._update_hqe(set='aborted=1') self._agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent') + self._agent.started = True _set_host_and_qe_ids(self._agent, [1, 2]) self._agent.abort.expect_call() self._agent.abort.expect_call() # gets called once for each HQE @@ -892,8 +884,11 @@ class FindAbortTest(BaseSchedulerTest): self._common_setup() self._update_hqe(set='active=1, status="Verifying"') + self._agent.tick.expect_call() + self._agent.is_done.expect_call().and_return(True) 
self._dispatcher._find_aborting() - + self._dispatcher._handle_agents() + self._dispatcher._schedule_special_tasks() self._check_agents(self._dispatcher._agents) self.god.check_playback() @@ -1128,8 +1123,8 @@ class AgentTest(unittest.TestCase): _set_host_and_qe_ids(task) return task - def _create_agent(self, tasks): - agent = monitor_db.Agent(tasks) + def _create_agent(self, task): + agent = monitor_db.Agent(task) agent.dispatcher = self._dispatcher return agent @@ -1139,65 +1134,20 @@ class AgentTest(unittest.TestCase): agent.tick() - def test_agent(self): - task1 = self._create_mock_task('task1') - task2 = self._create_mock_task('task2') - task3 = self._create_mock_task('task3') - task1.poll.expect_call() - task1.is_done.expect_call().and_return(False) - task1.poll.expect_call() - task1.is_done.expect_call().and_return(True) - task1.is_done.expect_call().and_return(True) - task1.success = True - - task2.poll.expect_call() - task2.is_done.expect_call().and_return(True) - task2.is_done.expect_call().and_return(True) - task2.success = False - task2.failure_tasks = [task3] - - self._dispatcher.add_agent.expect_call(IsAgentWithTask(task3)) - - agent = self._create_agent([task1, task2]) - self._finish_agent(agent) - self.god.check_playback() - - - def _test_agent_abort_helper(self, ignore_abort=False): - task1 = self._create_mock_task('task1') - task2 = self._create_mock_task('task2') - task1.poll.expect_call() - task1.is_done.expect_call().and_return(False) - task1.abort.expect_call() - if ignore_abort: - task1.aborted = False # task ignores abort; execution continues - - task1.poll.expect_call() - task1.is_done.expect_call().and_return(True) - task1.is_done.expect_call().and_return(True) - task1.success = True - - task2.poll.expect_call() - task2.is_done.expect_call().and_return(True) - task2.is_done.expect_call().and_return(True) - task2.success = True - else: - task1.aborted = True - task2.abort.expect_call() - task2.aborted = True + def test_agent_abort(self): + task = self._create_mock_task('task') + task.poll.expect_call() + task.is_done.expect_call().and_return(False) + task.abort.expect_call() + task.aborted = True - agent = self._create_agent([task1, task2]) + agent = self._create_agent(task) agent.tick() agent.abort() self._finish_agent(agent) self.god.check_playback() - def test_agent_abort(self): - self._test_agent_abort_helper() - self._test_agent_abort_helper(True) - - def _test_agent_abort_before_started_helper(self, ignore_abort=False): task = self._create_mock_task('task') task.abort.expect_call() @@ -1205,12 +1155,11 @@ class AgentTest(unittest.TestCase): task.aborted = False task.poll.expect_call() task.is_done.expect_call().and_return(True) - task.is_done.expect_call().and_return(True) task.success = True else: task.aborted = True - agent = self._create_agent([task]) + agent = self._create_agent(task) agent.abort() self._finish_agent(agent) self.god.check_playback() @@ -1243,8 +1192,8 @@ class DelayedCallTaskTest(unittest.TestCase): delay_seconds=2, callback=test_callback, now_func=test_time) # time 33 self.assertEqual(35, delay_task.end_time) - agent = monitor_db.Agent([delay_task], num_processes=0) - self.assert_(not agent.active_task) + agent = monitor_db.Agent(delay_task, num_processes=0) + self.assert_(not agent.started) agent.tick() # activates the task and polls it once, time 34.01 self.assertEqual(0, test_callback.calls, "callback called early") agent.tick() # time 34.99 @@ -1261,7 +1210,7 @@ class DelayedCallTaskTest(unittest.TestCase): def 
test_delayed_call_abort(self): delay_task = monitor_db.DelayedCallTask( delay_seconds=987654, callback=lambda : None) - agent = monitor_db.Agent([delay_task], num_processes=0) + agent = monitor_db.Agent(delay_task, num_processes=0) agent.abort() agent.tick() self.assert_(agent.is_done()) @@ -1348,6 +1297,17 @@ class AgentTasksTest(BaseSchedulerTest): host=host, meta_host=None) queue_entry.save() + self.task = models.SpecialTask(id=1, host=host, + task=models.SpecialTask.Task.CLEANUP, + is_active=False, is_complete=False, + time_requested=datetime.datetime.now(), + time_started=None, + queue_entry=queue_entry) + self.task.save() + self.god.stub_function(self.task, 'activate') + self.god.stub_function(self.task, 'finish') + self.god.stub_function(models.SpecialTask.objects, 'create') + self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher, 'dispatcher') @@ -1393,6 +1353,8 @@ class AgentTasksTest(BaseSchedulerTest): monitor_db.PidfileRunMonitor.exit_code.expect_call().and_return( exit_status) + self.task.finish.expect_call() + if copy_log_file: self._setup_move_logfile() @@ -1414,7 +1376,24 @@ class AgentTasksTest(BaseSchedulerTest): mock.is_string_comparator()) - def _test_repair_task_helper(self, success, task_tag, queue_entry=None): + def _setup_special_task(self, task_id, task_type, use_queue_entry): + self.task.id = task_id + self.task.task = task_type + if use_queue_entry: + queue_entry = models.HostQueueEntry(id=self.queue_entry.id) + else: + queue_entry = None + self.task.queue_entry = queue_entry + self.task.save() + + + def _test_repair_task_helper(self, success, task_id, use_queue_entry=False): + self._setup_special_task(task_id, models.SpecialTask.Task.REPAIR, + use_queue_entry) + task_tag = '%d-repair' % task_id + + self.task.activate.expect_call() + self.host.set_status.expect_call('Repairing') if success: self.setup_run_monitor(0, task_tag) @@ -1423,8 +1402,11 @@ class AgentTasksTest(BaseSchedulerTest): self.setup_run_monitor(1, task_tag) self.host.set_status.expect_call('Repair Failed') - task = monitor_db.RepairTask(self.host, queue_entry=queue_entry) - self.assertEquals(task.failure_tasks, []) + task = monitor_db.RepairTask(task=self.task) + task.host = self.host + if use_queue_entry: + task.queue_entry = self.queue_entry + self.run_task(task, success) expected_protection = host_protections.Protection.get_string( @@ -1441,8 +1423,8 @@ class AgentTasksTest(BaseSchedulerTest): def test_repair_task(self): - self._test_repair_task_helper(True, '1-repair') - self._test_repair_task_helper(False, '2-repair') + self._test_repair_task_helper(True, 1) + self._test_repair_task_helper(False, 2) def test_repair_task_with_hqe_already_requeued(self): @@ -1450,8 +1432,7 @@ class AgentTasksTest(BaseSchedulerTest): # already been requeued. ensure it leaves the HQE alone in that case. 
self.queue_entry.meta_host = 1 self.queue_entry.host = None - self._test_repair_task_helper(False, '1-repair', - queue_entry=self.queue_entry) + self._test_repair_task_helper(False, 1, True) def test_recovery_repair_task_working_directory(self): @@ -1460,31 +1441,38 @@ class AgentTasksTest(BaseSchedulerTest): class MockSpecialTask(object): def execution_path(self): return '/my/path' + host = models.Host(id=self.host.id) + queue_entry = None - special_task = MockSpecialTask() - task = monitor_db.RepairTask(self.host, task=special_task) + task = monitor_db.RepairTask(task=MockSpecialTask()) self.assertEquals(task._working_directory, '/my/path') def test_repair_task_aborted(self): + self._setup_special_task(1, models.SpecialTask.Task.REPAIR, False) + + self.task.activate.expect_call() self.host.set_status.expect_call('Repairing') self.setup_run_monitor(0, '1-repair', aborted=True) - task = monitor_db.RepairTask(self.host) + task = monitor_db.RepairTask(task=self.task) + task.host = self.host task.agent = object() + task.poll() task.abort() self.assertTrue(task.done) self.assertTrue(task.aborted) - self.assertTrue(task.task.is_complete) - self.assertFalse(task.task.is_active) self.god.check_playback() def _test_repair_task_with_queue_entry_helper(self, parse_failed_repair, - task_tag): + task_id): + self._setup_special_task(task_id, models.SpecialTask.Task.REPAIR, True) + task_tag = '%d-repair' % task_id + self.god.stub_class(monitor_db, 'FinalReparseTask') self.god.stub_class(monitor_db, 'Agent') self.god.stub_class_method(monitor_db.TaskWithJobKeyvals, @@ -1492,10 +1480,13 @@ class AgentTasksTest(BaseSchedulerTest): agent = DummyAgent() agent.dispatcher = self._dispatcher + self.task.activate.expect_call() + self.host.set_status.expect_call('Repairing') self.setup_run_monitor(1, task_tag) self.host.set_status.expect_call('Repair Failed') self.queue_entry.update_from_database.expect_call() + self.queue_entry.status = 'Queued' self.queue_entry.set_execution_subdir.expect_call() monitor_db.TaskWithJobKeyvals._write_keyval_after_job.expect_call( 'job_queued', mock.is_instance_comparator(int)) @@ -1505,26 +1496,29 @@ class AgentTasksTest(BaseSchedulerTest): self.queue_entry.execution_path.expect_call().and_return('tag') self._setup_move_logfile() self.job.parse_failed_repair = parse_failed_repair - if parse_failed_repair: - reparse_task = monitor_db.FinalReparseTask.expect_new( - [self.queue_entry]) - reparse_agent = monitor_db.Agent.expect_new([reparse_task], - num_processes=0) - self._dispatcher.add_agent.expect_call(reparse_agent) self.queue_entry.handle_host_failure.expect_call() + if parse_failed_repair: + self.queue_entry.set_status.expect_call('Parsing') + self.queue_entry.execution_path.expect_call().and_return('tag') + drone_manager.DroneManager.get_pidfile_id_from.expect_call( + 'tag', pidfile_name=monitor_db._AUTOSERV_PID_FILE).and_return( + self.PIDFILE_ID) - task = monitor_db.RepairTask(self.host, self.queue_entry) + task = monitor_db.RepairTask(task=self.task) task.agent = agent + task.host = self.host + task.queue_entry = self.queue_entry self.queue_entry.status = 'Queued' self.job.created_on = datetime.datetime(2009, 1, 1) + self.run_task(task, False) self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS) self.god.check_playback() def test_repair_task_with_queue_entry(self): - self._test_repair_task_with_queue_entry_helper(True, '1-repair') - self._test_repair_task_with_queue_entry_helper(False, '2-repair') + self._test_repair_task_with_queue_entry_helper(True, 1) + 
self._test_repair_task_with_queue_entry_helper(False, 2) def _setup_prejob_task_failure(self, task_tag, use_queue_entry): @@ -1535,40 +1529,43 @@ class AgentTasksTest(BaseSchedulerTest): self.queue_entry.execution_path.expect_call().and_return('tag') self._setup_move_logfile(include_destination=True) self.queue_entry.requeue.expect_call() + queue_entry = models.HostQueueEntry(id=self.queue_entry.id) + else: + queue_entry = None + models.SpecialTask.objects.create.expect_call( + host=models.Host(id=self.host.id), + task=models.SpecialTask.Task.REPAIR, + queue_entry=queue_entry) def setup_verify_expects(self, success, use_queue_entry, task_tag): + self.task.activate.expect_call() + if use_queue_entry: self.queue_entry.set_status.expect_call('Verifying') self.host.set_status.expect_call('Verifying') if success: self.setup_run_monitor(0, task_tag) - self.host.set_status.expect_call('Ready') + if use_queue_entry: + self.queue_entry.on_pending.expect_call() + else: + self.host.set_status.expect_call('Ready') else: self._setup_prejob_task_failure(task_tag, use_queue_entry) - def _check_verify_failure_tasks(self, verify_task): - self.assertEquals(len(verify_task.failure_tasks), 1) - repair_task = verify_task.failure_tasks[0] - self.assert_(isinstance(repair_task, monitor_db.RepairTask)) - self.assertEquals(verify_task.host, repair_task.host) - if verify_task.queue_entry: - self.assertEquals(repair_task.queue_entry, - verify_task.queue_entry) - else: - self.assertEquals(repair_task.queue_entry, None) - - - def _test_verify_task_helper(self, success, task_tag, use_queue_entry=False, + def _test_verify_task_helper(self, success, task_id, use_queue_entry=False, use_meta_host=False): + self._setup_special_task(task_id, models.SpecialTask.Task.VERIFY, + use_queue_entry) + task_tag = '%d-verify' % task_id self.setup_verify_expects(success, use_queue_entry, task_tag) + task = monitor_db.VerifyTask(task=self.task) + task.host = self.host if use_queue_entry: - task = monitor_db.VerifyTask(queue_entry=self.queue_entry) - else: - task = monitor_db.VerifyTask(host=self.host) - self._check_verify_failure_tasks(task) + task.queue_entry = self.queue_entry + self.run_task(task, success) self.assertTrue(set(task.cmd) >= set([monitor_db._autoserv_path, '-p', '-v', '-m', @@ -1580,13 +1577,13 @@ class AgentTasksTest(BaseSchedulerTest): def test_verify_task_with_host(self): - self._test_verify_task_helper(True, '1-verify') - self._test_verify_task_helper(False, '2-verify') + self._test_verify_task_helper(True, 1) + self._test_verify_task_helper(False, 2) def test_verify_task_with_queue_entry(self): - self._test_verify_task_helper(True, '1-verify', use_queue_entry=True) - self._test_verify_task_helper(False, '2-verify', use_queue_entry=True) + self._test_verify_task_helper(True, 1, use_queue_entry=True) + self._test_verify_task_helper(False, 2, use_queue_entry=True) def test_verify_task_with_metahost(self): @@ -1595,13 +1592,15 @@ class AgentTasksTest(BaseSchedulerTest): def test_specialtask_abort_before_prolog(self): - task = monitor_db.RepairTask(self.host) + self._setup_special_task(1, models.SpecialTask.Task.REPAIR, False) + task = monitor_db.RepairTask(task=self.task) + self.task.finish.expect_call() task.abort() self.assertTrue(task.aborted) def _setup_post_job_task_expects(self, autoserv_success, hqe_status=None, - hqe_aborted=False): + hqe_aborted=False, host_status=None): self.queue_entry.execution_path.expect_call().and_return('tag') self.pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new() 
self.pidfile_monitor.pidfile_id = self.PIDFILE_ID @@ -1616,7 +1615,9 @@ class AgentTasksTest(BaseSchedulerTest): self.pidfile_monitor.exit_code.expect_call().and_return(code) if hqe_status: - self.queue_entry.set_status.expect_call(hqe_status) + self.queue_entry.status = hqe_status + if host_status: + self.host.status = host_status def _setup_pre_parse_expects(self, autoserv_success): @@ -1717,7 +1718,7 @@ class AgentTasksTest(BaseSchedulerTest): self.god.stub_class(monitor_db, 'PidfileRunMonitor') self.god.stub_class(monitor_db, 'FinalReparseTask') self._setup_post_job_task_expects(not autoserv_killed, 'Gathering', - hqe_aborted) + hqe_aborted, host_status='Running') if hqe_aborted or not has_process: exit_code = None elif autoserv_killed: @@ -1739,10 +1740,7 @@ class AgentTasksTest(BaseSchedulerTest): self.pidfile_monitor.has_process.expect_call().and_return(False) self.pidfile_monitor.has_process.expect_call().and_return(False) - parse_task = monitor_db.FinalReparseTask.expect_new([self.queue_entry]) - _set_host_and_qe_ids(parse_task) - self._dispatcher.add_agent.expect_call(IsAgentWithTask(parse_task)) - + self.queue_entry.set_status.expect_call('Parsing') self.pidfile_monitor.has_process.expect_call().and_return(has_process) if has_process: @@ -1776,10 +1774,9 @@ class AgentTasksTest(BaseSchedulerTest): def _setup_gather_task_cleanup_expects(self): - self.god.stub_class(monitor_db, 'CleanupTask') - cleanup_task = monitor_db.CleanupTask.expect_new(host=self.host) - _set_host_and_qe_ids(cleanup_task) - self._dispatcher.add_agent.expect_call(IsAgentWithTask(cleanup_task)) + models.SpecialTask.objects.create.expect_call( + host=models.Host(id=self.host.id), + task=models.SpecialTask.Task.CLEANUP) def test_gather_logs_reboot_hosts(self): @@ -1806,27 +1803,36 @@ class AgentTasksTest(BaseSchedulerTest): self._run_gather_logs_task(success=False) - def _test_cleanup_task_helper(self, success, task_tag, + def _test_cleanup_task_helper(self, success, task_id, use_queue_entry=False): - if use_queue_entry: - self.queue_entry.get_host.expect_call().and_return(self.host) + self._setup_special_task(task_id, models.SpecialTask.Task.CLEANUP, + use_queue_entry) + task_tag = '%d-cleanup' % task_id + + self.task.activate.expect_call() self.host.set_status.expect_call('Cleaning') + + if use_queue_entry: + self.queue_entry.set_status.expect_call('Verifying') + if success: self.setup_run_monitor(0, task_tag) - self.host.set_status.expect_call('Ready') self.host.update_field.expect_call('dirty', 0) + if use_queue_entry: + queue_entry = models.HostQueueEntry(id=self.queue_entry.id) + models.SpecialTask.objects.create.expect_call( + host=models.Host(id=self.host.id), + task=models.SpecialTask.Task.VERIFY, + queue_entry=queue_entry) + else: + self.host.set_status.expect_call('Ready') else: self._setup_prejob_task_failure(task_tag, use_queue_entry) + task = monitor_db.CleanupTask(task=self.task) + task.host = self.host if use_queue_entry: - task = monitor_db.CleanupTask(queue_entry=self.queue_entry) - else: - task = monitor_db.CleanupTask(host=self.host) - self.assertEquals(len(task.failure_tasks), 1) - repair_task = task.failure_tasks[0] - self.assert_(isinstance(repair_task, monitor_db.RepairTask)) - if use_queue_entry: - self.assertEquals(repair_task.queue_entry, self.queue_entry) + task.queue_entry = self.queue_entry self.run_task(task, success) @@ -1839,12 +1845,12 @@ class AgentTasksTest(BaseSchedulerTest): self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS) def test_cleanup_task(self): - 
self._test_cleanup_task_helper(True, '1-cleanup') - self._test_cleanup_task_helper(False, '2-cleanup') + self._test_cleanup_task_helper(True, 1) + self._test_cleanup_task_helper(False, 2) def test_cleanup_task_with_queue_entry(self): - self._test_cleanup_task_helper(False, '1-cleanup', True) + self._test_cleanup_task_helper(False, 1, use_queue_entry=True) def test_recovery_queue_task_aborted_early(self): @@ -1854,6 +1860,7 @@ class AgentTasksTest(BaseSchedulerTest): run_monitor = self.god.create_mock_class(monitor_db.PidfileRunMonitor, 'run_monitor') + self.queue_entry.get_group_name.expect_call().and_return('') self.queue_entry.execution_path.expect_call().and_return('tag') run_monitor.kill.expect_call() monitor_db.QueueTask._log_abort.expect_call() @@ -1945,20 +1952,22 @@ class JobTest(BaseSchedulerTest): mock.mock_function('attach_file_to_execution', default_return_val='/test/path/tmp/foo')) + def _mock_create(**kwargs): + task = models.SpecialTask(**kwargs) + task.save() + self._task = task + self.god.stub_with(models.SpecialTask.objects, 'create', _mock_create) + def _test_pre_job_tasks_helper(self): """ - Calls HQE._do_pre_run_job_tasks() and returns the task list after - confirming that the last task is the SetEntryPendingTask. + Calls HQE._do_schedule_pre_job_tasks() and returns the created special + task """ - queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next() - pre_job_agent = queue_entry._do_run_pre_job_tasks() - self.assert_(isinstance(pre_job_agent, monitor_db.Agent)) - pre_job_tasks = list(pre_job_agent.queue.queue) - self.assertTrue(isinstance(pre_job_tasks[-1], - monitor_db.SetEntryPendingTask)) - - return pre_job_tasks + self._task = None + queue_entry = monitor_db.HostQueueEntry.fetch('id = 1')[0] + queue_entry._do_schedule_pre_job_tasks() + return self._task def _test_run_helper(self, expect_agent=True, expect_starting=False, @@ -1969,12 +1978,17 @@ class JobTest(BaseSchedulerTest): expected_status = models.HostQueueEntry.Status.PENDING else: expected_status = models.HostQueueEntry.Status.VERIFYING - job = monitor_db.Job.fetch('id = 1').next() - queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next() + job = monitor_db.Job.fetch('id = 1')[0] + queue_entry = monitor_db.HostQueueEntry.fetch('id = 1')[0] assert queue_entry.job is job - agent = job.run_if_ready(queue_entry) + job.run_if_ready(queue_entry) self.god.check_playback() + + self._dispatcher._schedule_delay_tasks() + self._dispatcher._schedule_running_host_queue_entries() + agent = self._dispatcher._agents[0] + actual_status = models.HostQueueEntry.smart_get(1).status self.assertEquals(expected_status, actual_status) @@ -1983,18 +1997,8 @@ class JobTest(BaseSchedulerTest): return self.assert_(isinstance(agent, monitor_db.Agent)) - tasks = list(agent.queue.queue) - return tasks - - - def _check_verify_task(self, verify_task): - self.assert_(isinstance(verify_task, monitor_db.VerifyTask)) - self.assertEquals(verify_task.queue_entry.id, 1) - - - def _check_pending_task(self, pending_task): - self.assert_(isinstance(pending_task, monitor_db.SetEntryPendingTask)) - self.assertEquals(pending_task._queue_entry.id, 1) + self.assert_(agent.task) + return agent.task def test_run_if_ready_delays(self): @@ -2022,63 +2026,78 @@ class JobTest(BaseSchedulerTest): 'secs_to_wait_for_atomic_group_hosts', 123456) # Get the pending one as a monitor_db.HostQueueEntry object. 
- pending_hqe = monitor_db.HostQueueEntry(django_hqes[1].id) + hqe = monitor_db.HostQueueEntry(django_hqes[1].id) self.assert_(not job._delay_ready_task) self.assertTrue(job.is_ready()) # Ready with one pending, one verifying and an atomic group should # result in a DelayCallTask to re-check if we're ready a while later. - agent = job.run_if_ready(pending_hqe) + job.run_if_ready(hqe) + self.assertEquals('Waiting', hqe.status) + self._dispatcher._schedule_delay_tasks() + self.assertEquals('Pending', hqe.status) + agent = self._dispatcher._agents[0] self.assert_(job._delay_ready_task) self.assert_(isinstance(agent, monitor_db.Agent)) - tasks = list(agent.queue.queue) - self.assertEqual(1, len(tasks)) - self.assert_(isinstance(tasks[0], monitor_db.DelayedCallTask)) - delay_task = tasks[0] + self.assert_(agent.task) + delay_task = agent.task + self.assert_(isinstance(delay_task, monitor_db.DelayedCallTask)) self.assert_(not delay_task.is_done()) + self.god.stub_function(delay_task, 'abort') + self.god.stub_function(job, 'run') # Test that the DelayedCallTask's callback queued up above does the # correct thing and returns the Agent returned by job.run(). - job.run.expect_call(pending_hqe).and_return('Fake Agent') - self.assertEqual('Fake Agent', delay_task._callback()) - - # A delay already exists, this must do nothing. - self.assertEqual(None, job.run_with_ready_delay(pending_hqe)) + job.run.expect_call(hqe) + delay_task._callback() + self.god.check_playback() # Adjust the delay deadline so that enough time has passed. job._delay_ready_task.end_time = time.time() - 111111 - job.run.expect_call(pending_hqe).and_return('Forty two') + job.run.expect_call(hqe) # ...the delay_expired condition should cause us to call run() - self.assertEqual('Forty two', job.run_with_ready_delay(pending_hqe)) + self._dispatcher._handle_agents() + self.god.check_playback() + delay_task.success = False # Adjust the delay deadline back so that enough time has not passed. job._delay_ready_task.end_time = time.time() + 111111 - self.assertEqual(None, job.run_with_ready_delay(pending_hqe)) + self._dispatcher._handle_agents() + self.god.check_playback() - set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.PENDING) # Now max_number_of_machines HQEs are in pending state. Remaining # delay will now be ignored. - job.run.expect_call(pending_hqe).and_return('Watermelon') + other_hqe = monitor_db.HostQueueEntry(django_hqes[0].id) + self.god.unstub(job, 'run') # ...the over_max_threshold test should cause us to call run() - self.assertEqual('Watermelon', job.run_with_ready_delay(pending_hqe)) + delay_task.abort.expect_call() + other_hqe.on_pending() + self.assertEquals('Starting', other_hqe.status) + self.assertEquals('Starting', hqe.status) + self.god.stub_function(job, 'run') + self.god.unstub(delay_task, 'abort') + hqe.set_status('Pending') + other_hqe.set_status('Pending') # Now we're not over the max for the atomic group. But all assigned # hosts are in pending state. over_max_threshold should make us run(). 
- pending_hqe.atomic_group.max_number_of_machines += 1 - pending_hqe.atomic_group.save() - job.run.expect_call(pending_hqe).and_return('Watermelon') - self.assertEqual('Watermelon', job.run_with_ready_delay(pending_hqe)) - pending_hqe.atomic_group.max_number_of_machines -= 1 - pending_hqe.atomic_group.save() + hqe.atomic_group.max_number_of_machines += 1 + hqe.atomic_group.save() + job.run.expect_call(hqe) + hqe.on_pending() + self.god.check_playback() + hqe.atomic_group.max_number_of_machines -= 1 + hqe.atomic_group.save() other_hqe = monitor_db.HostQueueEntry(django_hqes[0].id) - self.assertTrue(pending_hqe.job is other_hqe.job) + self.assertTrue(hqe.job is other_hqe.job) # DBObject classes should reuse instances so these should be the same. self.assertEqual(job, other_hqe.job) - self.assertEqual(other_hqe.job, pending_hqe.job) + self.assertEqual(other_hqe.job, hqe.job) # Be sure our delay was not lost during the other_hqe construction. + self.assertEqual(job._delay_ready_task, delay_task) self.assert_(job._delay_ready_task) self.assertFalse(job._delay_ready_task.is_done()) self.assertFalse(job._delay_ready_task.aborted) @@ -2088,7 +2107,7 @@ class JobTest(BaseSchedulerTest): # We pass in the other HQE this time the same way it would happen # for real when one host finishes verifying and enters pending. - agent = job.run_if_ready(other_hqe) + job.run_if_ready(other_hqe) # The delayed task must be aborted by the actual run() call above. self.assertTrue(job._delay_ready_task.aborted) @@ -2096,9 +2115,11 @@ class JobTest(BaseSchedulerTest): self.assertTrue(job._delay_ready_task.is_done()) # Check that job run() and _finish_run() were called by the above: - tasks = list(agent.queue.queue) - self.assertEqual(1, len(tasks)) - self.assert_(isinstance(tasks[0], monitor_db.QueueTask)) + self._dispatcher._schedule_running_host_queue_entries() + agent = self._dispatcher._agents[0] + self.assert_(agent.task) + task = agent.task + self.assert_(isinstance(task, monitor_db.QueueTask)) # Requery these hqes in order to verify the status from the DB. django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id)) for entry in django_hqes: @@ -2108,12 +2129,15 @@ class JobTest(BaseSchedulerTest): # We're already running, but more calls to run_with_ready_delay can # continue to come in due to straggler hosts enter Pending. Make # sure we don't do anything. 
- self.assertEqual(None, job.run_with_ready_delay(pending_hqe)) + self.god.stub_function(job, 'run') + job.run_with_ready_delay(hqe) + self.god.check_playback() + self.god.unstub(job, 'run') def test__atomic_and_has_started__on_atomic(self): self._create_job(hosts=[5, 6], atomic_group=1) - job = monitor_db.Job.fetch('id = 1').next() + job = monitor_db.Job.fetch('id = 1')[0] self.assertFalse(job._atomic_and_has_started()) self._update_hqe("status='Pending'") @@ -2135,21 +2159,25 @@ class JobTest(BaseSchedulerTest): def test__atomic_and_has_started__not_atomic(self): self._create_job(hosts=[1, 2]) - job = monitor_db.Job.fetch('id = 1').next() + job = monitor_db.Job.fetch('id = 1')[0] self.assertFalse(job._atomic_and_has_started()) self._update_hqe("status='Starting'") self.assertFalse(job._atomic_and_has_started()) + def _check_special_task(self, task, task_type, queue_entry_id=None): + self.assertEquals(task.task, task_type) + self.assertEquals(task.host.id, 1) + if queue_entry_id: + self.assertEquals(task.queue_entry.id, queue_entry_id) + + def test_run_asynchronous(self): self._create_job(hosts=[1, 2]) - tasks = self._test_pre_job_tasks_helper() + task = self._test_pre_job_tasks_helper() - self.assertEquals(len(tasks), 2) - verify_task, pending_task = tasks - self._check_verify_task(verify_task) - self._check_pending_task(pending_task) + self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1) def test_run_asynchronous_skip_verify(self): @@ -2157,21 +2185,17 @@ class JobTest(BaseSchedulerTest): job.run_verify = False job.save() - tasks = self._test_pre_job_tasks_helper() + task = self._test_pre_job_tasks_helper() - self.assertEquals(len(tasks), 1) - pending_task = tasks[0] - self._check_pending_task(pending_task) + self.assertEquals(task, None) def test_run_synchronous_verify(self): self._create_job(hosts=[1, 2], synchronous=True) - tasks = self._test_pre_job_tasks_helper() - self.assertEquals(len(tasks), 2) - verify_task, pending_task = tasks - self._check_verify_task(verify_task) - self._check_pending_task(pending_task) + task = self._test_pre_job_tasks_helper() + + self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1) def test_run_synchronous_skip_verify(self): @@ -2179,18 +2203,16 @@ class JobTest(BaseSchedulerTest): job.run_verify = False job.save() - tasks = self._test_pre_job_tasks_helper() - self.assertEquals(len(tasks), 1) - self._check_pending_task(tasks[0]) + task = self._test_pre_job_tasks_helper() + + self.assertEquals(task, None) def test_run_synchronous_ready(self): self._create_job(hosts=[1, 2], synchronous=True) self._update_hqe("status='Pending', execution_subdir=''") - tasks = self._test_run_helper(expect_starting=True) - self.assertEquals(len(tasks), 1) - queue_task = tasks[0] + queue_task = self._test_run_helper(expect_starting=True) self.assert_(isinstance(queue_task, monitor_db.QueueTask)) self.assertEquals(queue_task.job.id, 1) @@ -2202,8 +2224,8 @@ class JobTest(BaseSchedulerTest): self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True) self._update_hqe("status='Starting', execution_subdir=''") - job = monitor_db.Job.fetch('id = 1').next() - queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next() + job = monitor_db.Job.fetch('id = 1')[0] + queue_entry = monitor_db.HostQueueEntry.fetch('id = 1')[0] assert queue_entry.job is job self.assertEqual(None, job.run(queue_entry)) @@ -2214,9 +2236,7 @@ class JobTest(BaseSchedulerTest): self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True) self._update_hqe("status='Pending', 
execution_subdir=''") - tasks = self._test_run_helper(expect_starting=True) - self.assertEquals(len(tasks), 1) - queue_task = tasks[0] + queue_task = self._test_run_helper(expect_starting=True) self.assert_(isinstance(queue_task, monitor_db.QueueTask)) # Atomic group jobs that do not depend on a specific label in the @@ -2229,9 +2249,7 @@ class JobTest(BaseSchedulerTest): job.dependency_labels.add(self.label4) self._update_hqe("status='Pending', execution_subdir=''") - tasks = self._test_run_helper(expect_starting=True) - self.assertEquals(len(tasks), 1) - queue_task = tasks[0] + queue_task = self._test_run_helper(expect_starting=True) self.assert_(isinstance(queue_task, monitor_db.QueueTask)) # Atomic group jobs that also specify a label in the atomic group @@ -2244,11 +2262,9 @@ class JobTest(BaseSchedulerTest): job.reboot_before = models.RebootBefore.ALWAYS job.save() - tasks = self._test_pre_job_tasks_helper() - self.assertEquals(len(tasks), 3) - cleanup_task = tasks[0] - self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask)) - self.assertEquals(cleanup_task.host.id, 1) + task = self._test_pre_job_tasks_helper() + + self._check_special_task(task, models.SpecialTask.Task.CLEANUP) def _test_reboot_before_if_dirty_helper(self, expect_reboot): @@ -2256,12 +2272,12 @@ class JobTest(BaseSchedulerTest): job.reboot_before = models.RebootBefore.IF_DIRTY job.save() - tasks = self._test_pre_job_tasks_helper() - self.assertEquals(len(tasks), expect_reboot and 3 or 2) + task = self._test_pre_job_tasks_helper() if expect_reboot: - cleanup_task = tasks[0] - self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask)) - self.assertEquals(cleanup_task.host.id, 1) + task_type = models.SpecialTask.Task.CLEANUP + else: + task_type = models.SpecialTask.Task.VERIFY + self._check_special_task(task, task_type) def test_reboot_before_if_dirty(self): |