author    showard <showard@592f7852-d20e-0410-864c-8624ca9c26a4>  2009-09-08 16:26:33 +0000
committer showard <showard@592f7852-d20e-0410-864c-8624ca9c26a4>  2009-09-08 16:26:33 +0000
commit    0c8996bc5170274e90f6ea7730dc6f7fef84984c
tree      fcd062c853852f0aea40fc5f5a1f4f7c7317f39b
parent    32ab9a5888cb423dfa11632e454c89aee70e89eb
Make scheduler more stateless.

Agents are now scheduled only by the dispatcher, and agents no longer use
in-memory state to remember multiple tasks. All state is managed by the
database.

Risk: high (large scheduler change)
Visibility: medium (scheduler restarts are now more stable)

Signed-off-by: James Ren <jamesren@google.com>

git-svn-id: svn://test.kernel.org/autotest/trunk@3664 592f7852-d20e-0410-864c-8624ca9c26a4
-rw-r--r--  frontend/afe/models.py           |  42
-rwxr-xr-x  frontend/afe/models_test.py      |  16
-rw-r--r--  frontend/afe/rpc_interface.py    |   3
-rwxr-xr-x  scheduler/monitor_db.py          | 747
-rwxr-xr-x  scheduler/monitor_db_unittest.py | 520
5 files changed, 658 insertions(+), 670 deletions(-)
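
The core of the change: an Agent now wraps exactly one task instead of a
queue of tasks, and any follow-up work (repair after a failed verify, the
gather/parse stages after a run, and so on) is recorded as SpecialTask or
HostQueueEntry rows that the dispatcher turns back into fresh agents on a
later tick. A condensed sketch of the new lifecycle, assembled from the
Agent hunks in scheduler/monitor_db.py below (illustrative only, not part
of the patch):

    class Agent(object):
        """Runs a single task; once that task finishes, the agent is done."""
        def __init__(self, task, num_processes=1):
            self.task = task
            task.agent = self
            self.num_processes = num_processes
            # ids come straight from the one task; no more unioning across
            # a task queue
            self.queue_entry_ids = task.queue_entry_ids
            self.host_ids = task.host_ids
            self.started = False

        def tick(self):
            self.started = True
            if self.task:
                self.task.poll()
                if self.task.is_done():
                    self.task = None

        def is_done(self):
            return self.task is None

    # Each dispatcher tick then rebuilds agents from database state alone:
    # _schedule_special_tasks() creates Cleanup/Verify/Repair agents from
    # pending SpecialTask rows, and _schedule_running_host_queue_entries()
    # creates Queue/GatherLogs/FinalReparse agents from HQE statuses.
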
diff --git a/frontend/afe/models.py b/frontend/afe/models.py
index 45b078aa..99855e61 100644
--- a/frontend/afe/models.py
+++ b/frontend/afe/models.py
@@ -742,7 +742,8 @@ class IneligibleHostQueue(dbmodels.Model, model_logic.ModelExtensions):
class HostQueueEntry(dbmodels.Model, model_logic.ModelExtensions):
Status = enum.Enum('Queued', 'Starting', 'Verifying', 'Pending', 'Running',
'Gathering', 'Parsing', 'Aborted', 'Completed',
- 'Failed', 'Stopped', 'Template', string_values=True)
+ 'Failed', 'Stopped', 'Template', 'Waiting',
+ string_values=True)
ACTIVE_STATUSES = (Status.STARTING, Status.VERIFYING, Status.PENDING,
Status.RUNNING, Status.GATHERING)
COMPLETE_STATUSES = (Status.ABORTED, Status.COMPLETED, Status.FAILED,
@@ -983,45 +984,6 @@ class SpecialTask(dbmodels.Model, model_logic.ModelExtensions):
special_task.save()
- @classmethod
- def prepare(cls, agent, task):
- """
- Creates a new special task if necessary and prepares it to be run.
-
- @param agent: The scheduler.monitor_db.AgentTask handling this task.
- It is expected to have a TASK_TYPE, host and queue_entry
- attributes.
- @param task: SpecialTask instance to prepare, or None if a new
- SpecialTask should be created.
-
- @returns task or the newly created SpecialTask.
- """
- # TODO(gps): This method really belongs in scheduler/monitor_db.py.
- # It accesses scheduler specific instance internals!
- if not task:
- if not hasattr(agent, 'TASK_TYPE'):
- raise ValueError('Can only prepare special tasks for '
- 'verify, cleanup, or repair')
-
- host = Host.objects.get(id=agent.host.id)
- queue_entry = None
- if agent.queue_entry:
- queue_entry = (
- HostQueueEntry.objects.get(id=agent.queue_entry.id))
-
- active_tasks = cls.objects.filter(host=host, is_active=True)
- if active_tasks.count():
- raise model_logic.ValidationError(
- 'Active SpecialTask already exists for host %s. '
- 'Task %s must not be created. Existing tasks are: '
- '%s.' % (host, agent.TASK_TYPE, list(active_tasks)))
-
- task = cls.objects.create(host=host, task=agent.TASK_TYPE,
- queue_entry=queue_entry)
-
- return task
-
-
def activate(self):
"""
Sets a task as active and sets the time started to the current time.
diff --git a/frontend/afe/models_test.py b/frontend/afe/models_test.py
index 143853bb..9f4d99da 100755
--- a/frontend/afe/models_test.py
+++ b/frontend/afe/models_test.py
@@ -85,21 +85,5 @@ class SpecialTaskUnittest(unittest.TestCase,
self.assertTrue(task.is_complete)
- def test_prepare(self):
- self.assertEqual(13, models.SpecialTask.prepare(agent=None, task=13))
- class DummyAgentTask(object):
- host = None
- queue_entry = None
- self.assertRaises(ValueError, models.SpecialTask.prepare,
- DummyAgentTask(), None)
- DummyAgentTask.TASK_TYPE = models.SpecialTask.Task.VERIFY
- DummyAgentTask.host = models.Host.objects.create(hostname='hi')
- task1 = models.SpecialTask.prepare(DummyAgentTask(), None)
- task1.activate()
- DummyAgentTask.TASK_TYPE = models.SpecialTask.Task.REPAIR
- self.assertRaises(model_logic.ValidationError,
- models.SpecialTask.prepare, DummyAgentTask(), None)
-
-
if __name__ == '__main__':
unittest.main()
diff --git a/frontend/afe/rpc_interface.py b/frontend/afe/rpc_interface.py
index 95016ff6..e7308831 100644
--- a/frontend/afe/rpc_interface.py
+++ b/frontend/afe/rpc_interface.py
@@ -807,7 +807,8 @@ def get_static_data():
"Stopped": "Other host(s) failed verify",
"Parsing": "Awaiting parse of final results",
"Gathering": "Gathering log files",
- "Template": "Template job for recurring run"}
+ "Template": "Template job for recurring run",
+ "Waiting": "Waiting for scheduler action"}
return result
diff --git a/scheduler/monitor_db.py b/scheduler/monitor_db.py
index 11f69088..bd719f00 100755
--- a/scheduler/monitor_db.py
+++ b/scheduler/monitor_db.py
@@ -640,9 +640,11 @@ class Dispatcher(object):
_drone_manager.refresh()
self._run_cleanup()
self._find_aborting()
- self._find_reverify()
self._process_recurring_runs()
+ self._schedule_delay_tasks()
self._schedule_new_jobs()
+ self._schedule_running_host_queue_entries()
+ self._schedule_special_tasks()
self._handle_agents()
_drone_manager.execute_actions()
email_manager.manager.send_queued_emails()
@@ -694,11 +696,17 @@ class Dispatcher(object):
agent.queue_entry_ids, agent)
+ def _host_has_scheduled_special_task(self, host):
+ return bool(models.SpecialTask.objects.filter(host__id=host.id,
+ is_active=False,
+ is_complete=False))
+
+
def _recover_processes(self):
self._register_pidfiles()
_drone_manager.refresh()
self._recover_all_recoverable_entries()
- self._requeue_starting_entries()
+ self._recover_pending_entries()
self._check_for_remaining_active_entries()
self._reverify_remaining_hosts()
# reinitialize drones after killing orphaned processes, since they can
@@ -730,16 +738,22 @@ class Dispatcher(object):
return None, 'without process'
+ def _get_unassigned_entries(self, status):
+ for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
+ if not self.get_agents_for_entry(entry):
+ yield entry
+
+
def _recover_entries_with_status(self, status, orphans, pidfile_name,
recover_entries_fn):
- queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
- for queue_entry in queue_entries:
- if self.get_agents_for_entry(queue_entry):
- # synchronous job we've already recovered
- continue
+ for queue_entry in self._get_unassigned_entries(status):
queue_entries = queue_entry.job.get_group_entries(queue_entry)
run_monitor, process_string = self._get_recovery_run_monitor(
queue_entry.execution_path(), pidfile_name, orphans)
+ if not run_monitor:
+ # _schedule_running_host_queue_entries should schedule and
+ # recover these entries
+ continue
logging.info('Recovering %s entry %s %s',status.lower(),
', '.join(str(entry) for entry in queue_entries),
@@ -763,17 +777,10 @@ class Dispatcher(object):
def _recover_running_entries(self, orphans):
def recover_entries(job, queue_entries, run_monitor):
- if run_monitor is not None:
- queue_task = QueueTask(job=job, queue_entries=queue_entries,
- recover_run_monitor=run_monitor)
- self.add_agent(Agent(tasks=[queue_task],
- num_processes=len(queue_entries)))
- else:
- # we could do better, but this retains legacy behavior for now
- for queue_entry in queue_entries:
- logging.info('Requeuing running HQE %s since it has no '
- 'process' % queue_entry)
- queue_entry.requeue()
+ queue_task = QueueTask(job=job, queue_entries=queue_entries,
+ recover_run_monitor=run_monitor)
+ self.add_agent(Agent(task=queue_task,
+ num_processes=len(queue_entries)))
self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
orphans, _AUTOSERV_PID_FILE,
@@ -784,7 +791,7 @@ class Dispatcher(object):
def recover_entries(job, queue_entries, run_monitor):
gather_task = GatherLogsTask(job, queue_entries,
recover_run_monitor=run_monitor)
- self.add_agent(Agent([gather_task]))
+ self.add_agent(Agent(gather_task))
self._recover_entries_with_status(
models.HostQueueEntry.Status.GATHERING,
@@ -795,13 +802,19 @@ class Dispatcher(object):
def recover_entries(job, queue_entries, run_monitor):
reparse_task = FinalReparseTask(queue_entries,
recover_run_monitor=run_monitor)
- self.add_agent(Agent([reparse_task], num_processes=0))
+ self.add_agent(Agent(reparse_task, num_processes=0))
self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
orphans, _PARSER_PID_FILE,
recover_entries)
+ def _recover_pending_entries(self):
+ for entry in self._get_unassigned_entries(
+ models.HostQueueEntry.Status.PENDING):
+ entry.on_pending()
+
+
def _recover_all_recoverable_entries(self):
orphans = _drone_manager.get_orphaned_autoserv_processes()
self._recover_running_entries(orphans)
@@ -824,103 +837,70 @@ class Dispatcher(object):
if self.host_has_agent(task.host):
raise SchedulerError(
"%s already has a host agent %s." % (
- task, self._host_agents.get(host.id)))
-
- host = Host(id=task.host.id)
- queue_entry = None
- if task.queue_entry:
- queue_entry = HostQueueEntry(id=task.queue_entry.id)
+ task, self._host_agents.get(task.host.id)))
run_monitor, process_string = self._get_recovery_run_monitor(
task.execution_path(), _AUTOSERV_PID_FILE, orphans)
logging.info('Recovering %s %s', task, process_string)
- self._recover_special_task(task, host, queue_entry, run_monitor)
+ self._recover_special_task(task, run_monitor)
- def _recover_special_task(self, task, host, queue_entry, run_monitor):
+ def _recover_special_task(self, task, run_monitor):
"""\
Recovers a single special task.
"""
if task.task == models.SpecialTask.Task.VERIFY:
- agent_tasks = self._recover_verify(task, host, queue_entry,
- run_monitor)
+ agent_task = self._recover_verify(task, run_monitor)
elif task.task == models.SpecialTask.Task.REPAIR:
- agent_tasks = self._recover_repair(task, host, queue_entry,
- run_monitor)
+ agent_task = self._recover_repair(task, run_monitor)
elif task.task == models.SpecialTask.Task.CLEANUP:
- agent_tasks = self._recover_cleanup(task, host, queue_entry,
- run_monitor)
+ agent_task = self._recover_cleanup(task, run_monitor)
else:
# Should never happen
logging.error(
"Special task id %d had invalid task %s", (task.id, task.task))
- self.add_agent(Agent(agent_tasks))
+ self.add_agent(Agent(agent_task))
- def _recover_verify(self, task, host, queue_entry, run_monitor):
+ def _recover_verify(self, task, run_monitor):
"""\
Recovers a verify task.
No associated queue entry: Verify host
With associated queue entry: Verify host, and run associated queue
entry
"""
- if not task.queue_entry:
- return [VerifyTask(host=host, task=task,
- recover_run_monitor=run_monitor)]
- else:
- return [VerifyTask(queue_entry=queue_entry, task=task,
- recover_run_monitor=run_monitor),
- SetEntryPendingTask(queue_entry=queue_entry)]
+ return VerifyTask(task=task, recover_run_monitor=run_monitor)
- def _recover_repair(self, task, host, queue_entry, run_monitor):
+ def _recover_repair(self, task, run_monitor):
"""\
Recovers a repair task.
Always repair host
"""
- return [RepairTask(host=host, queue_entry=queue_entry, task=task,
- recover_run_monitor=run_monitor)]
+ return RepairTask(task=task, recover_run_monitor=run_monitor)
- def _recover_cleanup(self, task, host, queue_entry, run_monitor):
+ def _recover_cleanup(self, task, run_monitor):
"""\
Recovers a cleanup task.
No associated queue entry: Clean host
With associated queue entry: Clean host, verify host if needed, and
run associated queue entry
"""
- if not task.queue_entry:
- return [CleanupTask(host=host, task=task,
- recover_run_monitor=run_monitor)]
- else:
- agent_tasks = [CleanupTask(queue_entry=queue_entry,
- task=task,
- recover_run_monitor=run_monitor)]
- if queue_entry.job.should_run_verify(queue_entry):
- agent_tasks.append(VerifyTask(queue_entry=queue_entry))
- agent_tasks.append(
- SetEntryPendingTask(queue_entry=queue_entry))
- return agent_tasks
-
-
- def _requeue_starting_entries(self):
- # temporary measure until we implement proper recovery of Starting HQEs
- for entry in HostQueueEntry.fetch(where='status="Starting"'):
- logging.info('Requeuing "Starting" queue entry %s' % entry)
- assert not self.get_agents_for_entry(entry)
- assert entry.host.status == models.Host.Status.PENDING
- self._reverify_hosts_where('id = %s' % entry.host.id)
- entry.requeue()
+ return CleanupTask(task=task, recover_run_monitor=run_monitor)
def _check_for_remaining_active_entries(self):
queue_entries = HostQueueEntry.fetch(
- where='active AND NOT complete AND status != "Pending"')
+ where='active AND NOT complete AND status NOT IN '
+ '("Starting", "Gathering", "Pending")')
unrecovered_active_hqes = [entry for entry in queue_entries
- if not self.get_agents_for_entry(entry)]
+ if not self.get_agents_for_entry(entry) and
+ not self._host_has_scheduled_special_task(
+ entry.host)]
if unrecovered_active_hqes:
message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
raise SchedulerError(
@@ -928,18 +908,31 @@ class Dispatcher(object):
(len(unrecovered_active_hqes), message))
- def _find_reverify(self):
- tasks = models.SpecialTask.objects.filter(
- task=models.SpecialTask.Task.VERIFY, is_active=False,
- is_complete=False, queue_entry__isnull=True)
+ def _schedule_special_tasks(self):
+ tasks = models.SpecialTask.objects.filter(is_active=False,
+ is_complete=False,
+ host__locked=False)
+ # We want lower ids to come first, but the NULL queue_entry_ids need to
+ # come last
+ tasks = tasks.extra(select={'isnull' : 'queue_entry_id IS NULL'})
+ tasks = tasks.extra(order_by=['isnull', 'id'])
for task in tasks:
- host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
- if host.locked or host.invalid or self.host_has_agent(host):
+ if self.host_has_agent(task.host):
+ continue
+
+ if task.task == models.SpecialTask.Task.CLEANUP:
+ agent_task = CleanupTask(task=task)
+ elif task.task == models.SpecialTask.Task.VERIFY:
+ agent_task = VerifyTask(task=task)
+ elif task.task == models.SpecialTask.Task.REPAIR:
+ agent_task = RepairTask(task=task)
+ else:
+ email_manager.manager.enqueue_notify_email(
+ 'Special task with invalid task', task)
continue
- logging.info('Force reverifying host %s', host.hostname)
- self.add_agent(Agent([VerifyTask(host=host, task=task)]))
+ self.add_agent(Agent(agent_task))
def _reverify_remaining_hosts(self):
@@ -948,7 +941,7 @@ class Dispatcher(object):
message = ('Recovering active host %s - this probably indicates a '
'scheduler bug')
self._reverify_hosts_where(
- "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
+ "status IN ('Repairing', 'Verifying', 'Cleaning')",
print_message=message)
@@ -959,10 +952,14 @@ class Dispatcher(object):
if self.host_has_agent(host):
# host has already been recovered in some way
continue
+ if self._host_has_scheduled_special_task(host):
+ # host will have a special task scheduled on the next cycle
+ continue
if print_message:
logging.info(print_message, host.hostname)
- tasks = host.reverify_tasks()
- self.add_agent(Agent(tasks))
+ models.SpecialTask.objects.create(
+ task=models.SpecialTask.Task.CLEANUP,
+ host=models.Host(id=host.id))
def _recover_hosts(self):
@@ -1045,9 +1042,51 @@ class Dispatcher(object):
self._schedule_atomic_group(queue_entry)
+ def _schedule_running_host_queue_entries(self):
+ entries = HostQueueEntry.fetch(
+ where="status IN "
+ "('Starting', 'Running', 'Gathering', 'Parsing')")
+ for entry in entries:
+ if self.get_agents_for_entry(entry):
+ continue
+
+ task_entries = entry.job.get_group_entries(entry)
+ for task_entry in task_entries:
+ if (task_entry.status != models.HostQueueEntry.Status.PARSING
+ and self.host_has_agent(task_entry.host)):
+ agent = self._host_agents.get(task_entry.host.id)[0]
+ raise SchedulerError('Attempted to schedule on host that '
+ 'already has agent: %s (previous '
+ 'agent task: %s)'
+ % (task_entry, agent.task))
+
+ if entry.status in (models.HostQueueEntry.Status.STARTING,
+ models.HostQueueEntry.Status.RUNNING):
+ params = entry.job.get_autoserv_params(task_entries)
+ agent_task = QueueTask(job=entry.job,
+ queue_entries=task_entries, cmd=params)
+ elif entry.status == models.HostQueueEntry.Status.GATHERING:
+ agent_task = GatherLogsTask(
+ job=entry.job, queue_entries=task_entries)
+ elif entry.status == models.HostQueueEntry.Status.PARSING:
+ agent_task = FinalReparseTask(queue_entries=task_entries)
+ else:
+ raise SchedulerError('_schedule_running_host_queue_entries got '
+ 'entry with invalid status %s: %s'
+ % (entry.status, entry))
+
+ self.add_agent(Agent(agent_task, num_processes=len(task_entries)))
+
+
+ def _schedule_delay_tasks(self):
+ for entry in HostQueueEntry.fetch(where="status = 'Waiting'"):
+ task = entry.job.schedule_delayed_callback_task(entry)
+ if task:
+ self.add_agent(Agent(task, num_processes=0))
+
+
def _run_queue_entry(self, queue_entry, host):
- agent = queue_entry.run_pre_job_tasks(assigned_host=host)
- self.add_agent(agent)
+ queue_entry.schedule_pre_job_tasks(assigned_host=host)
def _find_aborting(self):
@@ -1086,17 +1125,16 @@ class Dispatcher(object):
have_reached_limit = False
# iterate over copy, so we can remove agents during iteration
for agent in list(self._agents):
- if agent.is_done():
- logging.info("agent finished")
- self.remove_agent(agent)
- continue
- if not agent.is_running():
+ if not agent.started:
if not self._can_start_agent(agent, num_started_this_cycle,
have_reached_limit):
have_reached_limit = True
continue
num_started_this_cycle += agent.num_processes
agent.tick()
+ if agent.is_done():
+ logging.info("agent finished")
+ self.remove_agent(agent)
logging.info('%d running processes',
_drone_manager.total_running_processes())
@@ -1315,7 +1353,7 @@ class PidfileRunMonitor(object):
class Agent(object):
"""
- An agent for use by the Dispatcher class to perform a sequence of tasks.
+ An agent for use by the Dispatcher class to perform a task.
The following methods are required on all task objects:
poll() - Called periodically to let the task check its status and
@@ -1326,8 +1364,6 @@ class Agent(object):
The following attributes are required on all task objects:
aborted - bool, True if this task was aborted.
- failure_tasks - A sequence of tasks to be run using a new Agent
- by the dispatcher should this task fail.
success - bool, True if this task succeeded.
queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
host_ids - A sequence of Host ids this task represents.
@@ -1338,88 +1374,44 @@ class Agent(object):
"""
- def __init__(self, tasks, num_processes=1):
+ def __init__(self, task, num_processes=1):
"""
- @param tasks: A list of tasks as described in the class docstring.
+ @param task: A task as described in the class docstring.
@param num_processes: The number of subprocesses the Agent represents.
This is used by the Dispatcher for managing the load on the
system. Defaults to 1.
"""
- self.active_task = None
- self.queue = None
+ self.task = task
+ task.agent = self
+
# This is filled in by Dispatcher.add_agent()
self.dispatcher = None
self.num_processes = num_processes
- self.queue_entry_ids = self._union_ids(task.queue_entry_ids
- for task in tasks)
- self.host_ids = self._union_ids(task.host_ids for task in tasks)
-
- self._clear_queue()
- for task in tasks:
- self.add_task(task)
-
-
- def _clear_queue(self):
- self.queue = Queue.Queue(0)
-
+ self.queue_entry_ids = task.queue_entry_ids
+ self.host_ids = task.host_ids
- def _union_ids(self, id_lists):
- return set(itertools.chain(*id_lists))
-
-
- def add_task(self, task):
- self.queue.put_nowait(task)
- task.agent = self
+ self.started = False
def tick(self):
- while not self.is_done():
- if self.active_task:
- self.active_task.poll()
- if not self.active_task.is_done():
- return
- self._next_task()
-
-
- def _next_task(self):
- logging.info("agent picking task")
- if self.active_task is not None:
- assert self.active_task.is_done()
- if not self.active_task.success:
- self.on_task_failure()
- self.active_task = None
-
- if not self.is_done():
- self.active_task = self.queue.get_nowait()
-
-
- def on_task_failure(self):
- self._clear_queue()
- # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
- # get reset.
- new_agent = Agent(self.active_task.failure_tasks)
- self.dispatcher.add_agent(new_agent)
-
-
- def is_running(self):
- return self.active_task is not None
+ self.started = True
+ if self.task:
+ self.task.poll()
+ if self.task.is_done():
+ self.task = None
def is_done(self):
- return self.active_task is None and self.queue.empty()
+ return self.task is None
def abort(self):
- # abort tasks until the queue is empty or a task ignores the abort
- while not self.is_done():
- if not self.active_task:
- self._next_task()
- self.active_task.abort()
- if not self.active_task.aborted:
+ if self.task:
+ self.task.abort()
+ if self.task.aborted:
# tasks can choose to ignore aborts
- return
- self.active_task = None
+ self.task = None
class DelayedCallTask(object):
@@ -1455,7 +1447,6 @@ class DelayedCallTask(object):
# These attributes are required by Agent.
self.aborted = False
- self.failure_tasks = ()
self.host_ids = ()
self.success = False
self.queue_entry_ids = ()
@@ -1464,29 +1455,24 @@ class DelayedCallTask(object):
def poll(self):
- if self._callback and self._now_func() >= self.end_time:
- new_agent = self._callback()
- if new_agent:
- self.agent.dispatcher.add_agent(new_agent)
- self._callback = None
+ if not self.is_done() and self._now_func() >= self.end_time:
+ self._callback()
self.success = True
def is_done(self):
- return not self._callback
+ return self.success or self.aborted
def abort(self):
self.aborted = True
- self._callback = None
class AgentTask(object):
- def __init__(self, cmd=None, working_directory=None, failure_tasks=[],
+ def __init__(self, cmd=None, working_directory=None,
pidfile_name=None, paired_with_pidfile=None,
recover_run_monitor=None):
self.done = False
- self.failure_tasks = failure_tasks
self.cmd = cmd
self._working_directory = working_directory
self.agent = None
@@ -1598,8 +1584,8 @@ class AgentTask(object):
def _parse_results(self, queue_entries):
- reparse_task = FinalReparseTask(queue_entries)
- self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
+ for queue_entry in queue_entries:
+ queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
def _copy_and_parse_results(self, queue_entries, use_monitor=None):
@@ -1647,7 +1633,7 @@ class TaskWithJobKeyvals(object):
self._write_keyval_after_job("job_finished", int(time.time()))
-class SpecialAgentTask(AgentTask):
+class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
"""
Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
"""
@@ -1657,9 +1643,14 @@ class SpecialAgentTask(AgentTask):
queue_entry = None
def __init__(self, task, extra_command_args, **kwargs):
- assert self.host
assert (self.TASK_TYPE is not None,
'self.TASK_TYPE must be overridden')
+
+ self.host = Host(id=task.host.id)
+ self.queue_entry = None
+ if task.queue_entry:
+ self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
+
self.task = task
if task:
kwargs['working_directory'] = task.execution_path()
@@ -1667,9 +1658,12 @@ class SpecialAgentTask(AgentTask):
super(SpecialAgentTask, self).__init__(**kwargs)
+ def _keyval_path(self):
+ return os.path.join(self._working_directory, self._KEYVAL_FILE)
+
+
def prolog(self):
super(SpecialAgentTask, self).prolog()
- self.task = models.SpecialTask.prepare(self, self.task)
self.cmd = _autoserv_command_line(self.host.hostname,
self._extra_command_args,
queue_entry=self.queue_entry)
@@ -1677,86 +1671,80 @@ class SpecialAgentTask(AgentTask):
self.task.activate()
- def cleanup(self):
- super(SpecialAgentTask, self).cleanup()
+ def _fail_queue_entry(self):
+ assert self.queue_entry
- # self.task can be None if a SpecialAgentTask is aborted before the
- # prolog runs
- if self.task:
- self.task.finish()
+ if self.queue_entry.meta_host:
+ return # don't fail metahost entries, they'll be reassigned
- if self.monitor and self.monitor.has_process() and self.task:
+ self.queue_entry.update_from_database()
+ if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
+ return # entry has been aborted
+
+ self.queue_entry.set_execution_subdir()
+ queued_key, queued_time = self._job_queued_keyval(
+ self.queue_entry.job)
+ self._write_keyval_after_job(queued_key, queued_time)
+ self._write_job_finished()
+
+ # copy results logs into the normal place for job results
+ self.monitor.try_copy_results_on_drone(
+ source_path=self._working_directory + '/',
+ destination_path=self.queue_entry.execution_path() + '/')
+
+ self._copy_results([self.queue_entry])
+ self.queue_entry.handle_host_failure()
+ if self.queue_entry.job.parse_failed_repair:
+ self._parse_results([self.queue_entry])
+
+ pidfile_id = _drone_manager.get_pidfile_id_from(
+ self.queue_entry.execution_path(),
+ pidfile_name=_AUTOSERV_PID_FILE)
+ _drone_manager.register_pidfile(pidfile_id)
+
+
+ def cleanup(self):
+ super(SpecialAgentTask, self).cleanup()
+ self.task.finish()
+ if self.monitor and self.monitor.has_process():
self._copy_results([self.task])
-class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
+class RepairTask(SpecialAgentTask):
TASK_TYPE = models.SpecialTask.Task.REPAIR
- def __init__(self, host, queue_entry=None, task=None,
- recover_run_monitor=None):
+ def __init__(self, task, recover_run_monitor=None):
"""\
queue_entry: queue entry to mark failed if this repair fails.
"""
- protection = host_protections.Protection.get_string(host.protection)
+ protection = host_protections.Protection.get_string(
+ task.host.protection)
# normalize the protection name
protection = host_protections.Protection.get_attr_name(protection)
- self.host = host
- self.queue_entry = queue_entry
-
super(RepairTask, self).__init__(
task, ['-R', '--host-protection', protection],
recover_run_monitor=recover_run_monitor)
# *don't* include the queue entry in IDs -- if the queue entry is
# aborted, we want to leave the repair task running
- self._set_ids(host=host)
+ self._set_ids(host=self.host)
def prolog(self):
super(RepairTask, self).prolog()
logging.info("repair_task starting")
- self.host.set_status('Repairing')
-
-
- def _keyval_path(self):
- return os.path.join(self._working_directory, self._KEYVAL_FILE)
-
-
- def _fail_queue_entry(self):
- assert self.queue_entry
-
- if self.queue_entry.meta_host:
- return # don't fail metahost entries, they'll be reassigned
-
- self.queue_entry.update_from_database()
- if self.queue_entry.status != 'Queued':
- return # entry has been aborted
-
- self.queue_entry.set_execution_subdir()
- queued_key, queued_time = self._job_queued_keyval(
- self.queue_entry.job)
- self._write_keyval_after_job(queued_key, queued_time)
- self._write_job_finished()
-
- self.monitor.try_copy_results_on_drone(
- source_path=self._working_directory + '/',
- destination_path=self.queue_entry.execution_path() + '/')
-
- self._copy_results([self.queue_entry])
- if self.queue_entry.job.parse_failed_repair:
- self._parse_results([self.queue_entry])
- self.queue_entry.handle_host_failure()
+ self.host.set_status(models.Host.Status.REPAIRING)
def epilog(self):
super(RepairTask, self).epilog()
if self.success:
- self.host.set_status('Ready')
+ self.host.set_status(models.Host.Status.READY)
else:
- self.host.set_status('Repair Failed')
+ self.host.set_status(models.Host.Status.REPAIR_FAILED)
if self.queue_entry:
self._fail_queue_entry()
@@ -1777,26 +1765,34 @@ class PreJobTask(SpecialAgentTask):
self.monitor.try_copy_to_results_repository(
source, destination_path=destination)
- if not self.success and self.queue_entry:
- self.queue_entry.requeue()
+ if not self.success:
+ if self.queue_entry:
+ self.queue_entry.requeue()
+ if models.SpecialTask.objects.filter(
+ task=models.SpecialTask.Task.REPAIR,
+ queue_entry__id=self.queue_entry.id):
+ self.host.set_status(models.Host.Status.REPAIR_FAILED)
+ self._fail_queue_entry()
+ return
+ queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
+ else:
+ queue_entry = None
+
+ models.SpecialTask.objects.create(
+ host=models.Host(id=self.host.id),
+ task=models.SpecialTask.Task.REPAIR,
+ queue_entry=queue_entry)
class VerifyTask(PreJobTask):
TASK_TYPE = models.SpecialTask.Task.VERIFY
- def __init__(self, queue_entry=None, host=None, task=None,
- recover_run_monitor=None):
- assert bool(queue_entry) != bool(host)
- self.host = host or queue_entry.host
- self.queue_entry = queue_entry
-
- failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
+ def __init__(self, task, recover_run_monitor=None):
super(VerifyTask, self).__init__(
- task, ['-v'], failure_tasks=failure_tasks,
- recover_run_monitor=recover_run_monitor)
+ task, ['-v'], recover_run_monitor=recover_run_monitor)
- self._set_ids(host=host, queue_entries=[queue_entry])
+ self._set_ids(host=self.host, queue_entries=[self.queue_entry])
def prolog(self):
@@ -1804,8 +1800,8 @@ class VerifyTask(PreJobTask):
logging.info("starting verify on %s", self.host.hostname)
if self.queue_entry:
- self.queue_entry.set_status('Verifying')
- self.host.set_status('Verifying')
+ self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+ self.host.set_status(models.Host.Status.VERIFYING)
# Delete any other queued verifies for this host. One verify will do
# and there's no need to keep records of other requests.
@@ -1820,7 +1816,10 @@ class VerifyTask(PreJobTask):
def epilog(self):
super(VerifyTask, self).epilog()
if self.success:
- self.host.set_status('Ready')
+ if self.queue_entry:
+ self.queue_entry.on_pending()
+ else:
+ self.host.set_status(models.Host.Status.READY)
class CleanupHostsMixin(object):
@@ -1838,18 +1837,18 @@ class CleanupHostsMixin(object):
if do_reboot:
# don't pass the queue entry to the CleanupTask. if the cleanup
# fails, the job doesn't care -- it's over.
- cleanup_task = CleanupTask(host=queue_entry.host)
- self.agent.dispatcher.add_agent(Agent([cleanup_task]))
+ models.SpecialTask.objects.create(
+ host=models.Host(id=queue_entry.host.id),
+ task=models.SpecialTask.Task.CLEANUP)
else:
- queue_entry.host.set_status('Ready')
+ queue_entry.host.set_status(models.Host.Status.READY)
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
- def __init__(self, job, queue_entries, cmd=None, group_name='',
- recover_run_monitor=None):
+ def __init__(self, job, queue_entries, cmd=None, recover_run_monitor=None):
self.job = job
self.queue_entries = queue_entries
- self.group_name = group_name
+ self.group_name = queue_entries[0].get_group_name()
super(QueueTask, self).__init__(
cmd, self._execution_path(),
recover_run_monitor=recover_run_monitor)
@@ -1887,6 +1886,18 @@ class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
def prolog(self):
+ for entry in self.queue_entries:
+ if entry.status not in (models.HostQueueEntry.Status.STARTING,
+ models.HostQueueEntry.Status.RUNNING):
+ raise SchedulerError('Queue task attempting to start '
+ 'entry with invalid status %s: %s'
+ % (entry.status, entry))
+ if entry.host.status not in (models.Host.Status.PENDING,
+ models.Host.Status.RUNNING):
+ raise SchedulerError('Queue task attempting to start on queue '
+ 'entry with invalid host status %s: %s'
+ % (entry.host.status, entry))
+
queued_key, queued_time = self._job_queued_keyval(self.job)
keyval_dict = {queued_key: queued_time}
if self.group_name:
@@ -1894,9 +1905,9 @@ class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
self._write_keyvals_before_job(keyval_dict)
for queue_entry in self.queue_entries:
self._write_host_keyvals(queue_entry.host)
- queue_entry.set_status('Running')
+ queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
queue_entry.update_field('started_on', datetime.datetime.now())
- queue_entry.host.set_status('Running')
+ queue_entry.host.set_status(models.Host.Status.RUNNING)
queue_entry.host.update_field('dirty', 1)
if self.job.synch_count == 1 and len(self.queue_entries) == 1:
# TODO(gps): Remove this if nothing needs it anymore.
@@ -1919,8 +1930,8 @@ class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
if self.monitor.lost_process:
self._write_lost_process_error_file()
- gather_task = GatherLogsTask(self.job, self.queue_entries)
- self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
+ for queue_entry in self.queue_entries:
+ queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
def _write_status_comment(self, comment):
@@ -2029,13 +2040,13 @@ class PostJobTask(AgentTask):
def run(self):
- # make sure we actually have results to work with.
- # this should never happen in normal operation.
+ # Make sure we actually have results to work with.
+ # This should never happen in normal operation.
if not self._autoserv_monitor.has_process():
email_manager.manager.enqueue_notify_email(
- 'No results in post-job task',
- 'No results in post-job task at %s' %
- self._autoserv_monitor.pidfile_id)
+ 'No results in post-job task',
+ 'No results in post-job task at %s' %
+ self._autoserv_monitor.pidfile_id)
self.finished(False)
return
@@ -2080,8 +2091,16 @@ class GatherLogsTask(PostJobTask, CleanupHostsMixin):
def prolog(self):
+ for queue_entry in self._queue_entries:
+ if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
+ raise SchedulerError('Gather task attempting to start on '
+ 'non-gathering entry: %s' % queue_entry)
+ if queue_entry.host.status != models.Host.Status.RUNNING:
+ raise SchedulerError('Gather task attempting to start on queue '
+ 'entry with non-running host: '
+ '%s' % queue_entry)
+
super(GatherLogsTask, self).prolog()
- self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)
def epilog(self):
@@ -2116,35 +2135,50 @@ class CleanupTask(PreJobTask):
TASK_TYPE = models.SpecialTask.Task.CLEANUP
- def __init__(self, host=None, queue_entry=None, task=None,
- recover_run_monitor=None):
- assert bool(host) ^ bool(queue_entry)
- if queue_entry:
- host = queue_entry.get_host()
- self.queue_entry = queue_entry
- self.host = host
-
- repair_task = RepairTask(host, queue_entry=queue_entry)
+ def __init__(self, task, recover_run_monitor=None):
super(CleanupTask, self).__init__(
- task, ['--cleanup'], failure_tasks=[repair_task],
- recover_run_monitor=recover_run_monitor)
+ task, ['--cleanup'], recover_run_monitor=recover_run_monitor)
- self._set_ids(host=host, queue_entries=[queue_entry])
+ self._set_ids(host=self.host, queue_entries=[self.queue_entry])
def prolog(self):
super(CleanupTask, self).prolog()
logging.info("starting cleanup task for host: %s", self.host.hostname)
- self.host.set_status("Cleaning")
+ self.host.set_status(models.Host.Status.CLEANING)
+ if self.queue_entry:
+ self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
+
+
+ def _should_run_verify(self):
+ do_not_verify = (self.host.protection ==
+ host_protections.Protection.DO_NOT_VERIFY)
+ if do_not_verify:
+ return False
+ return self.queue_entry and self.queue_entry.job.run_verify
def epilog(self):
super(CleanupTask, self).epilog()
if self.success:
- self.host.set_status('Ready')
self.host.update_field('dirty', 0)
+ if self.queue_entry:
+ queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
+ else:
+ queue_entry = None
+
+ if self._should_run_verify():
+ models.SpecialTask.objects.create(
+ host=models.Host(id=self.host.id),
+ queue_entry=queue_entry,
+ task=models.SpecialTask.Task.VERIFY)
+ elif self.queue_entry:
+ self.queue_entry.on_pending()
+ else:
+ self.host.set_status(models.Host.Status.READY)
+
class FinalReparseTask(PostJobTask):
_num_running_parses = 0
@@ -2176,8 +2210,12 @@ class FinalReparseTask(PostJobTask):
def prolog(self):
+ for queue_entry in self._queue_entries:
+ if queue_entry.status != models.HostQueueEntry.Status.PARSING:
+ raise SchedulerError('Parse task attempting to start on '
+ 'non-parsing entry: %s' % queue_entry)
+
super(FinalReparseTask, self).prolog()
- self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
def epilog(self):
@@ -2221,20 +2259,6 @@ class FinalReparseTask(PostJobTask):
self._decrement_running_parses()
-class SetEntryPendingTask(AgentTask):
- def __init__(self, queue_entry):
- super(SetEntryPendingTask, self).__init__(cmd='')
- self._queue_entry = queue_entry
- self._set_ids(queue_entries=[queue_entry])
-
-
- def run(self):
- agent = self._queue_entry.on_pending()
- if agent:
- self.agent.dispatcher.add_agent(agent)
- self.finished(True)
-
-
class DBError(Exception):
"""Raised by the DBObject constructor when its select fails."""
@@ -2267,7 +2291,9 @@ class DBObject(object):
def __init__(self, id=None, row=None, new_record=False, always_query=True):
- assert (bool(id) != bool(row))
+ assert bool(id) or bool(row)
+ if id is not None and row is not None:
+ assert id == row[0]
assert self._table_name, '_table_name must be defined in your class'
assert self._fields, '_fields must be defined in your class'
if not new_record:
@@ -2435,8 +2461,7 @@ class DBObject(object):
'where' : where,
'order_by' : order_by})
rows = _db.execute(query, params)
- for row in rows:
- yield cls(row=row)
+ return [cls(id=row[0], row=row) for row in rows]
class IneligibleHostQueue(DBObject):
@@ -2492,15 +2517,6 @@ class Host(DBObject):
return platform, all_labels
- def reverify_tasks(self):
- cleanup_task = CleanupTask(host=self)
- verify_task = VerifyTask(host=self)
-
- # just to make sure this host does not get taken away
- self.set_status('Cleaning')
- return [cleanup_task, verify_task]
-
-
_ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
@@ -2666,16 +2682,23 @@ class HostQueueEntry(DBObject):
logging.info("%s -> %s", self, self.status)
- if status in ['Queued', 'Parsing']:
+ if status in (models.HostQueueEntry.Status.QUEUED,
+ models.HostQueueEntry.Status.PARSING):
self.update_field('complete', False)
self.update_field('active', False)
- if status in ['Pending', 'Running', 'Verifying', 'Starting',
- 'Gathering']:
+ if status in (models.HostQueueEntry.Status.PENDING,
+ models.HostQueueEntry.Status.RUNNING,
+ models.HostQueueEntry.Status.VERIFYING,
+ models.HostQueueEntry.Status.STARTING,
+ models.HostQueueEntry.Status.GATHERING):
self.update_field('complete', False)
self.update_field('active', True)
- if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
+ if status in (models.HostQueueEntry.Status.FAILED,
+ models.HostQueueEntry.Status.COMPLETED,
+ models.HostQueueEntry.Status.STOPPED,
+ models.HostQueueEntry.Status.ABORTED):
self.update_field('complete', True)
self.update_field('active', False)
@@ -2723,7 +2746,7 @@ class HostQueueEntry(DBObject):
email_manager.manager.send_email(self.job.email_list, subject, body)
- def run_pre_job_tasks(self, assigned_host=None):
+ def schedule_pre_job_tasks(self, assigned_host=None):
if self.meta_host is not None or self.atomic_group:
assert assigned_host
# ensure results dir exists for the queue log
@@ -2736,22 +2759,19 @@ class HostQueueEntry(DBObject):
self.job.name, self.meta_host, self.atomic_group_id,
self.host.hostname, self.status)
- return self._do_run_pre_job_tasks()
+ self._do_schedule_pre_job_tasks()
- def _do_run_pre_job_tasks(self):
+ def _do_schedule_pre_job_tasks(self):
# Every host goes thru the Verifying stage (which may or may not
# actually do anything as determined by get_pre_job_tasks).
self.set_status(models.HostQueueEntry.Status.VERIFYING)
-
- # The pre job tasks always end with a SetEntryPendingTask which
- # will continue as appropriate through queue_entry.on_pending().
- return Agent(self.job.get_pre_job_tasks(queue_entry=self))
+ self.job.schedule_pre_job_tasks(queue_entry=self)
def requeue(self):
assert self.host
- self.set_status('Queued')
+ self.set_status(models.HostQueueEntry.Status.QUEUED)
self.update_field('started_on', None)
# verify/cleanup failure sets the execution subdir, so reset it here
self.set_execution_subdir('')
@@ -2765,7 +2785,7 @@ class HostQueueEntry(DBObject):
repair.
"""
assert not self.meta_host
- self.set_status('Failed')
+ self.set_status(models.HostQueueEntry.Status.FAILED)
self.job.stop_if_necessary()
@@ -2801,43 +2821,59 @@ class HostQueueEntry(DBObject):
def on_pending(self):
"""
Called when an entry in a synchronous job has passed verify. If the
- job is ready to run, returns an agent to run the job. Returns None
- otherwise.
+ job is ready to run, sets the entries to STARTING. Otherwise, it leaves
+ them in PENDING.
"""
- self.set_status('Pending')
- self.get_host().set_status('Pending')
+ self.set_status(models.HostQueueEntry.Status.PENDING)
+ self.host.set_status(models.Host.Status.PENDING)
# Some debug code here: sends an email if an asynchronous job does not
# immediately enter Starting.
# TODO: Remove this once we figure out why asynchronous jobs are getting
# stuck in Pending.
- agent = self.job.run_if_ready(queue_entry=self)
- if self.job.synch_count == 1 and agent is None:
+ self.job.run_if_ready(queue_entry=self)
+ if (self.job.synch_count == 1 and
+ self.status == models.HostQueueEntry.Status.PENDING):
subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
message = 'Asynchronous job stuck in Pending'
email_manager.manager.enqueue_notify_email(subject, message)
- return agent
def abort(self, dispatcher):
assert self.aborted and not self.complete
Status = models.HostQueueEntry.Status
- has_running_job_agent = (
- self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
- and dispatcher.get_agents_for_entry(self))
- if has_running_job_agent:
+ if self.status in (Status.GATHERING, Status.PARSING):
# do nothing; post-job tasks will finish and then mark this entry
# with status "Aborted" and take care of the host
return
- if self.status in (Status.STARTING, Status.PENDING):
+ if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
+ assert not dispatcher.get_agents_for_entry(self)
self.host.set_status(models.Host.Status.READY)
elif self.status == Status.VERIFYING:
- dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
+ models.SpecialTask.objects.create(
+ task=models.SpecialTask.Task.CLEANUP,
+ host=models.Host(id=self.host.id))
self.set_status(Status.ABORTED)
+
+ def get_group_name(self):
+ atomic_group = self.atomic_group
+ if not atomic_group:
+ return ''
+
+ # Look at any meta_host and dependency labels and pick the first
+ # one that also specifies this atomic group. Use that label name
+ # as the group name if possible (it is more specific).
+ for label in self.get_labels():
+ if label.atomic_group_id:
+ assert label.atomic_group_id == atomic_group.id
+ return label.name
+ return atomic_group.name
+
+
def execution_tag(self):
assert self.execution_subdir
return "%s/%s" % (self.job.tag(), self.execution_subdir)
@@ -3048,7 +3084,7 @@ class Job(DBObject):
params=(self.id, execution_subdir)))
- def _get_autoserv_params(self, queue_entries):
+ def get_autoserv_params(self, queue_entries):
assert queue_entries
execution_tag = queue_entries[0].execution_tag()
control_path = self._write_control_file(execution_tag)
@@ -3075,7 +3111,7 @@ class Job(DBObject):
return False
- def should_run_verify(self, queue_entry):
+ def _should_run_verify(self, queue_entry):
do_not_verify = (queue_entry.host.protection ==
host_protections.Protection.DO_NOT_VERIFY)
if do_not_verify:
@@ -3083,7 +3119,7 @@ class Job(DBObject):
return self.run_verify
- def get_pre_job_tasks(self, queue_entry):
+ def schedule_pre_job_tasks(self, queue_entry):
"""
Get a list of tasks to perform before the host_queue_entry
may be used to run this Job (such as Cleanup & Verify).
@@ -3093,13 +3129,18 @@ class Job(DBObject):
task in the list calls HostQueueEntry.on_pending(), which
continues the flow of the job.
"""
- tasks = []
if self._should_run_cleanup(queue_entry):
- tasks.append(CleanupTask(queue_entry=queue_entry))
- if self.should_run_verify(queue_entry):
- tasks.append(VerifyTask(queue_entry=queue_entry))
- tasks.append(SetEntryPendingTask(queue_entry))
- return tasks
+ task = models.SpecialTask.Task.CLEANUP
+ elif self._should_run_verify(queue_entry):
+ task = models.SpecialTask.Task.VERIFY
+ else:
+ queue_entry.on_pending()
+ return
+
+ models.SpecialTask.objects.create(
+ host=models.Host(id=queue_entry.host_id),
+ queue_entry=models.HostQueueEntry(id=queue_entry.id),
+ task=task)
def _assign_new_group(self, queue_entries, group_name=''):
@@ -3144,21 +3185,10 @@ class Job(DBObject):
# Sanity check. We'll only ever be called if this can be met.
assert len(chosen_entries) >= self.synch_count
- if atomic_group:
- # Look at any meta_host and dependency labels and pick the first
- # one that also specifies this atomic group. Use that label name
- # as the group name if possible (it is more specific).
- group_name = atomic_group.name
- for label in include_queue_entry.get_labels():
- if label.atomic_group_id:
- assert label.atomic_group_id == atomic_group.id
- group_name = label.name
- break
- else:
- group_name = ''
+ group_name = include_queue_entry.get_group_name()
self._assign_new_group(chosen_entries, group_name=group_name)
- return chosen_entries, group_name
+ return chosen_entries
def run_if_ready(self, queue_entry):
@@ -3170,12 +3200,10 @@ class Job(DBObject):
"""
if not self.is_ready():
self.stop_if_necessary()
- return None
-
- if queue_entry.atomic_group:
- return self.run_with_ready_delay(queue_entry)
-
- return self.run(queue_entry)
+ elif queue_entry.atomic_group:
+ self.run_with_ready_delay(queue_entry)
+ else:
+ self.run(queue_entry)
def run_with_ready_delay(self, queue_entry):
@@ -3200,12 +3228,19 @@ class Job(DBObject):
# Delay is disabled or we already have enough? Do not wait to run.
if not delay or over_max_threshold or delay_expired:
- return self.run(queue_entry)
+ self.run(queue_entry)
+ else:
+ queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
+
+
+ def schedule_delayed_callback_task(self, queue_entry):
+ queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
- # A delay was previously scheduled.
if self._delay_ready_task:
return None
+ delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
+
def run_job_after_delay():
logging.info('Job %s done waiting for extra hosts.', self.id)
return self.run(queue_entry)
@@ -3214,39 +3249,29 @@ class Job(DBObject):
self.id, delay)
self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
callback=run_job_after_delay)
-
- return Agent([self._delay_ready_task], num_processes=0)
+ return self._delay_ready_task
def run(self, queue_entry):
"""
@param queue_entry: The HostQueueEntry instance calling this method.
- @returns An Agent instance to run this job or None if we've already
- been run.
"""
if queue_entry.atomic_group and self._atomic_and_has_started():
logging.error('Job.run() called on running atomic Job %d '
'with HQE %s.', self.id, queue_entry)
- return None
- queue_entries, group_name = self._choose_group_to_run(queue_entry)
- return self._finish_run(queue_entries, group_name)
+ return
+ queue_entries = self._choose_group_to_run(queue_entry)
+ self._finish_run(queue_entries)
- def _finish_run(self, queue_entries, group_name):
+ def _finish_run(self, queue_entries):
for queue_entry in queue_entries:
- queue_entry.set_status('Starting')
- params = self._get_autoserv_params(queue_entries)
- queue_task = QueueTask(job=self, queue_entries=queue_entries,
- cmd=params, group_name=group_name)
- tasks = [queue_task]
+ queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
if self._delay_ready_task:
# Cancel any pending callback that would try to run again
# as we are already running.
self._delay_ready_task.abort()
- return Agent(tasks, num_processes=len(queue_entries))
-
-
def __str__(self):
return '%s-%s' % (self.id, self.owner)
diff --git a/scheduler/monitor_db_unittest.py b/scheduler/monitor_db_unittest.py
index ea4d0685..78793341 100755
--- a/scheduler/monitor_db_unittest.py
+++ b/scheduler/monitor_db_unittest.py
@@ -18,18 +18,15 @@ _DEBUG = False
class DummyAgent(object):
- _is_running = False
+ started = False
_is_done = False
num_processes = 1
host_ids = []
queue_entry_ids = []
- def is_running(self):
- return self._is_running
-
def tick(self):
- self._is_running = True
+ self.started = True
def is_done(self):
@@ -38,7 +35,6 @@ class DummyAgent(object):
def set_done(self, done):
self._is_done = done
- self._is_running = not done
class IsRow(mock.argument_comparator):
@@ -197,14 +193,14 @@ class DispatcherSchedulingTest(BaseSchedulerTest):
def _set_monitor_stubs(self):
super(DispatcherSchedulingTest, self)._set_monitor_stubs()
- def hqe__do_run_pre_job_tasks_stub(queue_entry):
- """Return a test dummy. Called by HostQueueEntry.run()."""
+ def hqe__do_schedule_pre_job_tasks_stub(queue_entry):
+ """Called by HostQueueEntry.run()."""
self._record_job_scheduled(queue_entry.job.id, queue_entry.host.id)
queue_entry.set_status('Starting')
- return DummyAgent()
- self.god.stub_with(monitor_db.HostQueueEntry, '_do_run_pre_job_tasks',
- hqe__do_run_pre_job_tasks_stub)
+ self.god.stub_with(monitor_db.HostQueueEntry,
+ '_do_schedule_pre_job_tasks',
+ hqe__do_schedule_pre_job_tasks_stub)
def hqe_queue_log_record_stub(self, log_line):
"""No-Op to avoid calls down to the _drone_manager during tests."""
@@ -457,9 +453,9 @@ class DispatcherSchedulingTest(BaseSchedulerTest):
self._dispatcher._refresh_pending_queue_entries()
atomic_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' %
- atomic_job.id).next()
+ atomic_job.id)[0]
normal_hqe = monitor_db.HostQueueEntry.fetch(where='job_id=%d' %
- normal_job.id).next()
+ normal_job.id)[0]
host_scheduler = self._dispatcher._host_scheduler
self.assertTrue(host_scheduler._check_atomic_group_labels(
@@ -478,7 +474,7 @@ class DispatcherSchedulingTest(BaseSchedulerTest):
def test_HostScheduler_get_host_atomic_group_id(self):
job = self._create_job(metahosts=[self.label6.id])
queue_entry = monitor_db.HostQueueEntry.fetch(
- where='job_id=%d' % job.id).next()
+ where='job_id=%d' % job.id)[0]
# Indirectly initialize the internal state of the host scheduler.
self._dispatcher._refresh_pending_queue_entries()
@@ -774,7 +770,7 @@ class DispatcherThrottlingTest(BaseSchedulerTest):
def fake_max_runnable_processes(fake_self):
running = sum(agent.num_processes
for agent in self._agents
- if agent.is_running())
+ if agent.started and not agent.is_done())
return self._MAX_RUNNING - running
self.god.stub_with(drone_manager.DroneManager, 'max_runnable_processes',
fake_max_runnable_processes)
@@ -792,7 +788,7 @@ class DispatcherThrottlingTest(BaseSchedulerTest):
def _assert_agents_started(self, indexes, is_started=True):
for i in indexes:
- self.assert_(self._agents[i].is_running() == is_started,
+ self.assert_(self._agents[i].started == is_started,
'Agent %d %sstarted' %
(i, is_started and 'not ' or ''))
@@ -853,29 +849,25 @@ class FindAbortTest(BaseSchedulerTest):
"""
def _check_host_agent(self, agent, host_id):
self.assert_(isinstance(agent, monitor_db.Agent))
- tasks = list(agent.queue.queue)
- self.assertEquals(len(tasks), 2)
- cleanup, verify = tasks
+ self.assert_(agent.task)
+ cleanup = agent.task
self.assert_(isinstance(cleanup, monitor_db.CleanupTask))
self.assertEquals(cleanup.host.id, host_id)
- self.assert_(isinstance(verify, monitor_db.VerifyTask))
- self.assertEquals(verify.host.id, host_id)
-
def _check_agents(self, agents):
agents = list(agents)
- self.assertEquals(len(agents), 3)
- self.assertEquals(agents[0], self._agent)
- self._check_host_agent(agents[1], 1)
- self._check_host_agent(agents[2], 2)
+ self.assertEquals(len(agents), 2)
+ self._check_host_agent(agents[0], 1)
+ self._check_host_agent(agents[1], 2)
def _common_setup(self):
self._create_job(hosts=[1, 2])
self._update_hqe(set='aborted=1')
self._agent = self.god.create_mock_class(monitor_db.Agent, 'old_agent')
+ self._agent.started = True
_set_host_and_qe_ids(self._agent, [1, 2])
self._agent.abort.expect_call()
self._agent.abort.expect_call() # gets called once for each HQE
@@ -892,8 +884,11 @@ class FindAbortTest(BaseSchedulerTest):
self._common_setup()
self._update_hqe(set='active=1, status="Verifying"')
+ self._agent.tick.expect_call()
+ self._agent.is_done.expect_call().and_return(True)
self._dispatcher._find_aborting()
-
+ self._dispatcher._handle_agents()
+ self._dispatcher._schedule_special_tasks()
self._check_agents(self._dispatcher._agents)
self.god.check_playback()
@@ -1128,8 +1123,8 @@ class AgentTest(unittest.TestCase):
_set_host_and_qe_ids(task)
return task
- def _create_agent(self, tasks):
- agent = monitor_db.Agent(tasks)
+ def _create_agent(self, task):
+ agent = monitor_db.Agent(task)
agent.dispatcher = self._dispatcher
return agent
@@ -1139,65 +1134,20 @@ class AgentTest(unittest.TestCase):
agent.tick()
- def test_agent(self):
- task1 = self._create_mock_task('task1')
- task2 = self._create_mock_task('task2')
- task3 = self._create_mock_task('task3')
- task1.poll.expect_call()
- task1.is_done.expect_call().and_return(False)
- task1.poll.expect_call()
- task1.is_done.expect_call().and_return(True)
- task1.is_done.expect_call().and_return(True)
- task1.success = True
-
- task2.poll.expect_call()
- task2.is_done.expect_call().and_return(True)
- task2.is_done.expect_call().and_return(True)
- task2.success = False
- task2.failure_tasks = [task3]
-
- self._dispatcher.add_agent.expect_call(IsAgentWithTask(task3))
-
- agent = self._create_agent([task1, task2])
- self._finish_agent(agent)
- self.god.check_playback()
-
-
- def _test_agent_abort_helper(self, ignore_abort=False):
- task1 = self._create_mock_task('task1')
- task2 = self._create_mock_task('task2')
- task1.poll.expect_call()
- task1.is_done.expect_call().and_return(False)
- task1.abort.expect_call()
- if ignore_abort:
- task1.aborted = False # task ignores abort; execution continues
-
- task1.poll.expect_call()
- task1.is_done.expect_call().and_return(True)
- task1.is_done.expect_call().and_return(True)
- task1.success = True
-
- task2.poll.expect_call()
- task2.is_done.expect_call().and_return(True)
- task2.is_done.expect_call().and_return(True)
- task2.success = True
- else:
- task1.aborted = True
- task2.abort.expect_call()
- task2.aborted = True
+ def test_agent_abort(self):
+ task = self._create_mock_task('task')
+ task.poll.expect_call()
+ task.is_done.expect_call().and_return(False)
+ task.abort.expect_call()
+ task.aborted = True
- agent = self._create_agent([task1, task2])
+ agent = self._create_agent(task)
agent.tick()
agent.abort()
self._finish_agent(agent)
self.god.check_playback()
- def test_agent_abort(self):
- self._test_agent_abort_helper()
- self._test_agent_abort_helper(True)
-
-
def _test_agent_abort_before_started_helper(self, ignore_abort=False):
task = self._create_mock_task('task')
task.abort.expect_call()
@@ -1205,12 +1155,11 @@ class AgentTest(unittest.TestCase):
task.aborted = False
task.poll.expect_call()
task.is_done.expect_call().and_return(True)
- task.is_done.expect_call().and_return(True)
task.success = True
else:
task.aborted = True
- agent = self._create_agent([task])
+ agent = self._create_agent(task)
agent.abort()
self._finish_agent(agent)
self.god.check_playback()
@@ -1243,8 +1192,8 @@ class DelayedCallTaskTest(unittest.TestCase):
delay_seconds=2, callback=test_callback,
now_func=test_time) # time 33
self.assertEqual(35, delay_task.end_time)
- agent = monitor_db.Agent([delay_task], num_processes=0)
- self.assert_(not agent.active_task)
+ agent = monitor_db.Agent(delay_task, num_processes=0)
+ self.assert_(not agent.started)
agent.tick() # activates the task and polls it once, time 34.01
self.assertEqual(0, test_callback.calls, "callback called early")
agent.tick() # time 34.99
@@ -1261,7 +1210,7 @@ class DelayedCallTaskTest(unittest.TestCase):
def test_delayed_call_abort(self):
delay_task = monitor_db.DelayedCallTask(
delay_seconds=987654, callback=lambda : None)
- agent = monitor_db.Agent([delay_task], num_processes=0)
+ agent = monitor_db.Agent(delay_task, num_processes=0)
agent.abort()
agent.tick()
self.assert_(agent.is_done())
@@ -1348,6 +1297,17 @@ class AgentTasksTest(BaseSchedulerTest):
host=host, meta_host=None)
queue_entry.save()
+ self.task = models.SpecialTask(id=1, host=host,
+ task=models.SpecialTask.Task.CLEANUP,
+ is_active=False, is_complete=False,
+ time_requested=datetime.datetime.now(),
+ time_started=None,
+ queue_entry=queue_entry)
+ self.task.save()
+ self.god.stub_function(self.task, 'activate')
+ self.god.stub_function(self.task, 'finish')
+ self.god.stub_function(models.SpecialTask.objects, 'create')
+
self._dispatcher = self.god.create_mock_class(monitor_db.Dispatcher,
'dispatcher')
@@ -1393,6 +1353,8 @@ class AgentTasksTest(BaseSchedulerTest):
monitor_db.PidfileRunMonitor.exit_code.expect_call().and_return(
exit_status)
+ self.task.finish.expect_call()
+
if copy_log_file:
self._setup_move_logfile()
@@ -1414,7 +1376,24 @@ class AgentTasksTest(BaseSchedulerTest):
mock.is_string_comparator())
- def _test_repair_task_helper(self, success, task_tag, queue_entry=None):
+ def _setup_special_task(self, task_id, task_type, use_queue_entry):
+ self.task.id = task_id
+ self.task.task = task_type
+ if use_queue_entry:
+ queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
+ else:
+ queue_entry = None
+ self.task.queue_entry = queue_entry
+ self.task.save()
+
+
+ def _test_repair_task_helper(self, success, task_id, use_queue_entry=False):
+ self._setup_special_task(task_id, models.SpecialTask.Task.REPAIR,
+ use_queue_entry)
+ task_tag = '%d-repair' % task_id
+
+ self.task.activate.expect_call()
+
self.host.set_status.expect_call('Repairing')
if success:
self.setup_run_monitor(0, task_tag)
@@ -1423,8 +1402,11 @@ class AgentTasksTest(BaseSchedulerTest):
self.setup_run_monitor(1, task_tag)
self.host.set_status.expect_call('Repair Failed')
- task = monitor_db.RepairTask(self.host, queue_entry=queue_entry)
- self.assertEquals(task.failure_tasks, [])
+ task = monitor_db.RepairTask(task=self.task)
+ task.host = self.host
+ if use_queue_entry:
+ task.queue_entry = self.queue_entry
+
self.run_task(task, success)
expected_protection = host_protections.Protection.get_string(
@@ -1441,8 +1423,8 @@ class AgentTasksTest(BaseSchedulerTest):
def test_repair_task(self):
- self._test_repair_task_helper(True, '1-repair')
- self._test_repair_task_helper(False, '2-repair')
+ self._test_repair_task_helper(True, 1)
+ self._test_repair_task_helper(False, 2)
def test_repair_task_with_hqe_already_requeued(self):
@@ -1450,8 +1432,7 @@ class AgentTasksTest(BaseSchedulerTest):
# already been requeued. Ensure it leaves the HQE alone in that case.
self.queue_entry.meta_host = 1
self.queue_entry.host = None
- self._test_repair_task_helper(False, '1-repair',
- queue_entry=self.queue_entry)
+ self._test_repair_task_helper(False, 1, True)
def test_recovery_repair_task_working_directory(self):
@@ -1460,31 +1441,38 @@ class AgentTasksTest(BaseSchedulerTest):
class MockSpecialTask(object):
def execution_path(self):
return '/my/path'
+ host = models.Host(id=self.host.id)
+ queue_entry = None
- special_task = MockSpecialTask()
- task = monitor_db.RepairTask(self.host, task=special_task)
+ task = monitor_db.RepairTask(task=MockSpecialTask())
self.assertEquals(task._working_directory, '/my/path')
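
Both constructions above show the new shape: RepairTask(task=...) takes only the SpecialTask row, and host, queue entry, and working directory all derive from it, so a restarted scheduler can rebuild the task from the database alone. A sketch of that derivation; the class body is an assumption, not the monitor_db code:

# Hedged sketch of what RepairTask(task=...) consumes.  Attribute
# names follow the tests (which overwrite host/queue_entry with
# mocks); eager computation of the working directory is assumed.
class SketchRepairTask(object):
    TASK_TYPE = 'Repair'

    def __init__(self, task):
        self.task = task
        self.host = task.host                # a models.Host row
        self.queue_entry = task.queue_entry  # may be None
        self._working_directory = task.execution_path()
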
def test_repair_task_aborted(self):
+ self._setup_special_task(1, models.SpecialTask.Task.REPAIR, False)
+
+ self.task.activate.expect_call()
self.host.set_status.expect_call('Repairing')
self.setup_run_monitor(0, '1-repair', aborted=True)
- task = monitor_db.RepairTask(self.host)
+ task = monitor_db.RepairTask(task=self.task)
+ task.host = self.host
task.agent = object()
+
task.poll()
task.abort()
self.assertTrue(task.done)
self.assertTrue(task.aborted)
- self.assertTrue(task.task.is_complete)
- self.assertFalse(task.task.is_active)
self.god.check_playback()
def _test_repair_task_with_queue_entry_helper(self, parse_failed_repair,
- task_tag):
+ task_id):
+ self._setup_special_task(task_id, models.SpecialTask.Task.REPAIR, True)
+ task_tag = '%d-repair' % task_id
+
self.god.stub_class(monitor_db, 'FinalReparseTask')
self.god.stub_class(monitor_db, 'Agent')
self.god.stub_class_method(monitor_db.TaskWithJobKeyvals,
@@ -1492,10 +1480,13 @@ class AgentTasksTest(BaseSchedulerTest):
agent = DummyAgent()
agent.dispatcher = self._dispatcher
+ self.task.activate.expect_call()
+
self.host.set_status.expect_call('Repairing')
self.setup_run_monitor(1, task_tag)
self.host.set_status.expect_call('Repair Failed')
self.queue_entry.update_from_database.expect_call()
+ self.queue_entry.status = 'Queued'
self.queue_entry.set_execution_subdir.expect_call()
monitor_db.TaskWithJobKeyvals._write_keyval_after_job.expect_call(
'job_queued', mock.is_instance_comparator(int))
@@ -1505,26 +1496,29 @@ class AgentTasksTest(BaseSchedulerTest):
self.queue_entry.execution_path.expect_call().and_return('tag')
self._setup_move_logfile()
self.job.parse_failed_repair = parse_failed_repair
- if parse_failed_repair:
- reparse_task = monitor_db.FinalReparseTask.expect_new(
- [self.queue_entry])
- reparse_agent = monitor_db.Agent.expect_new([reparse_task],
- num_processes=0)
- self._dispatcher.add_agent.expect_call(reparse_agent)
self.queue_entry.handle_host_failure.expect_call()
+ if parse_failed_repair:
+ self.queue_entry.set_status.expect_call('Parsing')
+ self.queue_entry.execution_path.expect_call().and_return('tag')
+ drone_manager.DroneManager.get_pidfile_id_from.expect_call(
+ 'tag', pidfile_name=monitor_db._AUTOSERV_PID_FILE).and_return(
+ self.PIDFILE_ID)
- task = monitor_db.RepairTask(self.host, self.queue_entry)
+ task = monitor_db.RepairTask(task=self.task)
task.agent = agent
+ task.host = self.host
+ task.queue_entry = self.queue_entry
self.queue_entry.status = 'Queued'
self.job.created_on = datetime.datetime(2009, 1, 1)
+
self.run_task(task, False)
self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS)
self.god.check_playback()
def test_repair_task_with_queue_entry(self):
- self._test_repair_task_with_queue_entry_helper(True, '1-repair')
- self._test_repair_task_with_queue_entry_helper(False, '2-repair')
+ self._test_repair_task_with_queue_entry_helper(True, 1)
+ self._test_repair_task_with_queue_entry_helper(False, 2)
def _setup_prejob_task_failure(self, task_tag, use_queue_entry):
@@ -1535,40 +1529,43 @@ class AgentTasksTest(BaseSchedulerTest):
self.queue_entry.execution_path.expect_call().and_return('tag')
self._setup_move_logfile(include_destination=True)
self.queue_entry.requeue.expect_call()
+ queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
+ else:
+ queue_entry = None
+ models.SpecialTask.objects.create.expect_call(
+ host=models.Host(id=self.host.id),
+ task=models.SpecialTask.Task.REPAIR,
+ queue_entry=queue_entry)
def setup_verify_expects(self, success, use_queue_entry, task_tag):
+ self.task.activate.expect_call()
+
if use_queue_entry:
self.queue_entry.set_status.expect_call('Verifying')
self.host.set_status.expect_call('Verifying')
if success:
self.setup_run_monitor(0, task_tag)
- self.host.set_status.expect_call('Ready')
+ if use_queue_entry:
+ self.queue_entry.on_pending.expect_call()
+ else:
+ self.host.set_status.expect_call('Ready')
else:
self._setup_prejob_task_failure(task_tag, use_queue_entry)
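
_setup_prejob_task_failure replaces the old in-memory failure_tasks chain (the deleted _check_verify_failure_tasks below checked that chain): a failed verify or cleanup requeues the entry and files a REPAIR SpecialTask through the ORM. A sketch of that shared epilog; the function name is hypothetical:

from autotest_lib.frontend.afe import models

# Hedged sketch of the shared pre-job failure epilog the expectations
# describe; no in-memory task chain survives a scheduler restart.
def on_prejob_failure(host, queue_entry=None):
    entry_for_task = None
    if queue_entry is not None:
        queue_entry.requeue()
        # fresh Django instance, as the expectations build one by id
        entry_for_task = models.HostQueueEntry(id=queue_entry.id)
    models.SpecialTask.objects.create(
            host=models.Host(id=host.id),
            task=models.SpecialTask.Task.REPAIR,
            queue_entry=entry_for_task)
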
- def _check_verify_failure_tasks(self, verify_task):
- self.assertEquals(len(verify_task.failure_tasks), 1)
- repair_task = verify_task.failure_tasks[0]
- self.assert_(isinstance(repair_task, monitor_db.RepairTask))
- self.assertEquals(verify_task.host, repair_task.host)
- if verify_task.queue_entry:
- self.assertEquals(repair_task.queue_entry,
- verify_task.queue_entry)
- else:
- self.assertEquals(repair_task.queue_entry, None)
-
-
- def _test_verify_task_helper(self, success, task_tag, use_queue_entry=False,
+ def _test_verify_task_helper(self, success, task_id, use_queue_entry=False,
use_meta_host=False):
+ self._setup_special_task(task_id, models.SpecialTask.Task.VERIFY,
+ use_queue_entry)
+ task_tag = '%d-verify' % task_id
self.setup_verify_expects(success, use_queue_entry, task_tag)
+ task = monitor_db.VerifyTask(task=self.task)
+ task.host = self.host
if use_queue_entry:
- task = monitor_db.VerifyTask(queue_entry=self.queue_entry)
- else:
- task = monitor_db.VerifyTask(host=self.host)
- self._check_verify_failure_tasks(task)
+ task.queue_entry = self.queue_entry
+
self.run_task(task, success)
self.assertTrue(set(task.cmd) >=
set([monitor_db._autoserv_path, '-p', '-v', '-m',
@@ -1580,13 +1577,13 @@ class AgentTasksTest(BaseSchedulerTest):
def test_verify_task_with_host(self):
- self._test_verify_task_helper(True, '1-verify')
- self._test_verify_task_helper(False, '2-verify')
+ self._test_verify_task_helper(True, 1)
+ self._test_verify_task_helper(False, 2)
def test_verify_task_with_queue_entry(self):
- self._test_verify_task_helper(True, '1-verify', use_queue_entry=True)
- self._test_verify_task_helper(False, '2-verify', use_queue_entry=True)
+ self._test_verify_task_helper(True, 1, use_queue_entry=True)
+ self._test_verify_task_helper(False, 2, use_queue_entry=True)
def test_verify_task_with_metahost(self):
@@ -1595,13 +1592,15 @@ class AgentTasksTest(BaseSchedulerTest):
def test_specialtask_abort_before_prolog(self):
- task = monitor_db.RepairTask(self.host)
+ self._setup_special_task(1, models.SpecialTask.Task.REPAIR, False)
+ task = monitor_db.RepairTask(task=self.task)
+ self.task.finish.expect_call()
task.abort()
self.assertTrue(task.aborted)
def _setup_post_job_task_expects(self, autoserv_success, hqe_status=None,
- hqe_aborted=False):
+ hqe_aborted=False, host_status=None):
self.queue_entry.execution_path.expect_call().and_return('tag')
self.pidfile_monitor = monitor_db.PidfileRunMonitor.expect_new()
self.pidfile_monitor.pidfile_id = self.PIDFILE_ID
@@ -1616,7 +1615,9 @@ class AgentTasksTest(BaseSchedulerTest):
self.pidfile_monitor.exit_code.expect_call().and_return(code)
if hqe_status:
- self.queue_entry.set_status.expect_call(hqe_status)
+ self.queue_entry.status = hqe_status
+ if host_status:
+ self.host.status = host_status
def _setup_pre_parse_expects(self, autoserv_success):
@@ -1717,7 +1718,7 @@ class AgentTasksTest(BaseSchedulerTest):
self.god.stub_class(monitor_db, 'PidfileRunMonitor')
self.god.stub_class(monitor_db, 'FinalReparseTask')
self._setup_post_job_task_expects(not autoserv_killed, 'Gathering',
- hqe_aborted)
+ hqe_aborted, host_status='Running')
if hqe_aborted or not has_process:
exit_code = None
elif autoserv_killed:
@@ -1739,10 +1740,7 @@ class AgentTasksTest(BaseSchedulerTest):
self.pidfile_monitor.has_process.expect_call().and_return(False)
self.pidfile_monitor.has_process.expect_call().and_return(False)
- parse_task = monitor_db.FinalReparseTask.expect_new([self.queue_entry])
- _set_host_and_qe_ids(parse_task)
- self._dispatcher.add_agent.expect_call(IsAgentWithTask(parse_task))
-
+ self.queue_entry.set_status.expect_call('Parsing')
self.pidfile_monitor.has_process.expect_call().and_return(has_process)
if has_process:
@@ -1776,10 +1774,9 @@ class AgentTasksTest(BaseSchedulerTest):
def _setup_gather_task_cleanup_expects(self):
- self.god.stub_class(monitor_db, 'CleanupTask')
- cleanup_task = monitor_db.CleanupTask.expect_new(host=self.host)
- _set_host_and_qe_ids(cleanup_task)
- self._dispatcher.add_agent.expect_call(IsAgentWithTask(cleanup_task))
+ models.SpecialTask.objects.create.expect_call(
+ host=models.Host(id=self.host.id),
+ task=models.SpecialTask.Task.CLEANUP)
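
The two hunks above make the same move for post-job work: gathering requests parsing by setting the entry's status, and a host reboot by inserting a CLEANUP row, instead of constructing follow-on agents directly. A hedged sketch of the dispatcher side that re-derives agents from that state each cycle; the factory mapping and function name are hypothetical:

# Agents are re-derived from database state on the next dispatcher
# tick, so a restart between the epilog and the follow-on work loses
# nothing.
def reschedule_from_db(queue_entries, special_tasks, factories, add_agent):
    for entry in queue_entries:
        factory = factories.get(entry.status)   # e.g. 'Parsing'
        if factory is not None:
            add_agent(factory(entry))
    for task in special_tasks:
        if not task.is_active and not task.is_complete:
            add_agent(factories[task.task](task))  # e.g. a CLEANUP row
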
def test_gather_logs_reboot_hosts(self):
@@ -1806,27 +1803,36 @@ class AgentTasksTest(BaseSchedulerTest):
self._run_gather_logs_task(success=False)
- def _test_cleanup_task_helper(self, success, task_tag,
+ def _test_cleanup_task_helper(self, success, task_id,
use_queue_entry=False):
- if use_queue_entry:
- self.queue_entry.get_host.expect_call().and_return(self.host)
+ self._setup_special_task(task_id, models.SpecialTask.Task.CLEANUP,
+ use_queue_entry)
+ task_tag = '%d-cleanup' % task_id
+
+ self.task.activate.expect_call()
self.host.set_status.expect_call('Cleaning')
+
+ if use_queue_entry:
+ self.queue_entry.set_status.expect_call('Verifying')
+
if success:
self.setup_run_monitor(0, task_tag)
- self.host.set_status.expect_call('Ready')
self.host.update_field.expect_call('dirty', 0)
+ if use_queue_entry:
+ queue_entry = models.HostQueueEntry(id=self.queue_entry.id)
+ models.SpecialTask.objects.create.expect_call(
+ host=models.Host(id=self.host.id),
+ task=models.SpecialTask.Task.VERIFY,
+ queue_entry=queue_entry)
+ else:
+ self.host.set_status.expect_call('Ready')
else:
self._setup_prejob_task_failure(task_tag, use_queue_entry)
+ task = monitor_db.CleanupTask(task=self.task)
+ task.host = self.host
if use_queue_entry:
- task = monitor_db.CleanupTask(queue_entry=self.queue_entry)
- else:
- task = monitor_db.CleanupTask(host=self.host)
- self.assertEquals(len(task.failure_tasks), 1)
- repair_task = task.failure_tasks[0]
- self.assert_(isinstance(repair_task, monitor_db.RepairTask))
- if use_queue_entry:
- self.assertEquals(repair_task.queue_entry, self.queue_entry)
+ task.queue_entry = self.queue_entry
self.run_task(task, success)
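
On success with a queue entry, cleanup no longer marks the host Ready itself; it chains a VERIFY SpecialTask through the database. A sketch of that epilog as the expectations describe it; the function name is hypothetical:

from autotest_lib.frontend.afe import models

# Hedged sketch of the cleanup success path; the failure path goes
# through the shared pre-job failure handling sketched earlier.
def cleanup_epilog(host, queue_entry):
    host.update_field('dirty', 0)
    if queue_entry is not None:
        # chain the verify through the database instead of in memory
        models.SpecialTask.objects.create(
                host=models.Host(id=host.id),
                task=models.SpecialTask.Task.VERIFY,
                queue_entry=models.HostQueueEntry(id=queue_entry.id))
    else:
        host.set_status('Ready')
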
@@ -1839,12 +1845,12 @@ class AgentTasksTest(BaseSchedulerTest):
self.assertTrue(set(task.cmd) >= self.JOB_AUTOSERV_PARAMS)
def test_cleanup_task(self):
- self._test_cleanup_task_helper(True, '1-cleanup')
- self._test_cleanup_task_helper(False, '2-cleanup')
+ self._test_cleanup_task_helper(True, 1)
+ self._test_cleanup_task_helper(False, 2)
def test_cleanup_task_with_queue_entry(self):
- self._test_cleanup_task_helper(False, '1-cleanup', True)
+ self._test_cleanup_task_helper(False, 1, use_queue_entry=True)
def test_recovery_queue_task_aborted_early(self):
@@ -1854,6 +1860,7 @@ class AgentTasksTest(BaseSchedulerTest):
run_monitor = self.god.create_mock_class(monitor_db.PidfileRunMonitor,
'run_monitor')
+ self.queue_entry.get_group_name.expect_call().and_return('')
self.queue_entry.execution_path.expect_call().and_return('tag')
run_monitor.kill.expect_call()
monitor_db.QueueTask._log_abort.expect_call()
@@ -1945,20 +1952,22 @@ class JobTest(BaseSchedulerTest):
mock.mock_function('attach_file_to_execution',
default_return_val='/test/path/tmp/foo'))
+ def _mock_create(**kwargs):
+ task = models.SpecialTask(**kwargs)
+ task.save()
+ self._task = task
+ self.god.stub_with(models.SpecialTask.objects, 'create', _mock_create)
+
def _test_pre_job_tasks_helper(self):
"""
- Calls HQE._do_pre_run_job_tasks() and returns the task list after
- confirming that the last task is the SetEntryPendingTask.
+ Calls HQE._do_schedule_pre_job_tasks() and returns the created special
+        task.
"""
- queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next()
- pre_job_agent = queue_entry._do_run_pre_job_tasks()
- self.assert_(isinstance(pre_job_agent, monitor_db.Agent))
- pre_job_tasks = list(pre_job_agent.queue.queue)
- self.assertTrue(isinstance(pre_job_tasks[-1],
- monitor_db.SetEntryPendingTask))
-
- return pre_job_tasks
+ self._task = None
+ queue_entry = monitor_db.HostQueueEntry.fetch('id = 1')[0]
+ queue_entry._do_schedule_pre_job_tasks()
+ return self._task
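
_mock_create above is a capture stub: it swaps the ORM factory for a recorder so a test can inspect whatever the scheduler filed. Note it does not return the created task, which the code under test happens to tolerate; a general-purpose version would, to match Django's Manager.create(). A hedged variant as a helper method for this class:

def _stub_special_task_create(self):
    # Capture-stub: record whatever the scheduler files, and return
    # the object to match Django's Manager.create() contract.
    self._created_tasks = []
    def _recording_create(**kwargs):
        task = models.SpecialTask(**kwargs)
        task.save()
        self._created_tasks.append(task)
        return task
    self.god.stub_with(models.SpecialTask.objects, 'create',
                       _recording_create)
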
def _test_run_helper(self, expect_agent=True, expect_starting=False,
@@ -1969,12 +1978,17 @@ class JobTest(BaseSchedulerTest):
expected_status = models.HostQueueEntry.Status.PENDING
else:
expected_status = models.HostQueueEntry.Status.VERIFYING
- job = monitor_db.Job.fetch('id = 1').next()
- queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next()
+ job = monitor_db.Job.fetch('id = 1')[0]
+ queue_entry = monitor_db.HostQueueEntry.fetch('id = 1')[0]
assert queue_entry.job is job
- agent = job.run_if_ready(queue_entry)
+ job.run_if_ready(queue_entry)
self.god.check_playback()
+
+ self._dispatcher._schedule_delay_tasks()
+ self._dispatcher._schedule_running_host_queue_entries()
+ agent = self._dispatcher._agents[0]
+
actual_status = models.HostQueueEntry.smart_get(1).status
self.assertEquals(expected_status, actual_status)
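
run_if_ready() and run() no longer hand back an Agent; the helper now asks the dispatcher to schedule from database state and then reads dispatcher._agents. A minimal sketch of that ownership model; method bodies and parameters are assumptions:

# Sketch of the dispatcher owning all agents; method names mirror the
# test, bodies are assumptions about monitor_db.Dispatcher.
class SketchDispatcher(object):
    def __init__(self):
        self._agents = []

    def add_agent(self, agent):
        self._agents.append(agent)

    def _schedule_running_host_queue_entries(self, entries, make_agent):
        # Re-derive an agent for every entry whose DB status says it
        # should be executing; nothing is remembered in memory.
        for entry in entries:
            if entry.status in ('Starting', 'Running'):
                self.add_agent(make_agent(entry))

    def _handle_agents(self):
        for agent in list(self._agents):
            agent.tick()
            if agent.is_done():
                self._agents.remove(agent)
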
@@ -1983,18 +1997,8 @@ class JobTest(BaseSchedulerTest):
return
self.assert_(isinstance(agent, monitor_db.Agent))
- tasks = list(agent.queue.queue)
- return tasks
-
-
- def _check_verify_task(self, verify_task):
- self.assert_(isinstance(verify_task, monitor_db.VerifyTask))
- self.assertEquals(verify_task.queue_entry.id, 1)
-
-
- def _check_pending_task(self, pending_task):
- self.assert_(isinstance(pending_task, monitor_db.SetEntryPendingTask))
- self.assertEquals(pending_task._queue_entry.id, 1)
+ self.assert_(agent.task)
+ return agent.task
def test_run_if_ready_delays(self):
@@ -2022,63 +2026,78 @@ class JobTest(BaseSchedulerTest):
'secs_to_wait_for_atomic_group_hosts', 123456)
# Get the pending one as a monitor_db.HostQueueEntry object.
- pending_hqe = monitor_db.HostQueueEntry(django_hqes[1].id)
+ hqe = monitor_db.HostQueueEntry(django_hqes[1].id)
self.assert_(not job._delay_ready_task)
self.assertTrue(job.is_ready())
# Ready with one pending, one verifying and an atomic group should
# result in a DelayedCallTask to re-check if we're ready a while later.
- agent = job.run_if_ready(pending_hqe)
+ job.run_if_ready(hqe)
+ self.assertEquals('Waiting', hqe.status)
+ self._dispatcher._schedule_delay_tasks()
+ self.assertEquals('Pending', hqe.status)
+ agent = self._dispatcher._agents[0]
self.assert_(job._delay_ready_task)
self.assert_(isinstance(agent, monitor_db.Agent))
- tasks = list(agent.queue.queue)
- self.assertEqual(1, len(tasks))
- self.assert_(isinstance(tasks[0], monitor_db.DelayedCallTask))
- delay_task = tasks[0]
+ self.assert_(agent.task)
+ delay_task = agent.task
+ self.assert_(isinstance(delay_task, monitor_db.DelayedCallTask))
self.assert_(not delay_task.is_done())
+ self.god.stub_function(delay_task, 'abort')
+
self.god.stub_function(job, 'run')
# Test that the DelayedCallTask's callback queued up above does the
# correct thing and calls job.run().
- job.run.expect_call(pending_hqe).and_return('Fake Agent')
- self.assertEqual('Fake Agent', delay_task._callback())
-
- # A delay already exists, this must do nothing.
- self.assertEqual(None, job.run_with_ready_delay(pending_hqe))
+ job.run.expect_call(hqe)
+ delay_task._callback()
+ self.god.check_playback()
# Adjust the delay deadline so that enough time has passed.
job._delay_ready_task.end_time = time.time() - 111111
- job.run.expect_call(pending_hqe).and_return('Forty two')
+ job.run.expect_call(hqe)
# ...the delay_expired condition should cause us to call run()
- self.assertEqual('Forty two', job.run_with_ready_delay(pending_hqe))
+ self._dispatcher._handle_agents()
+ self.god.check_playback()
+ delay_task.success = False
# Adjust the delay deadline back so that enough time has not passed.
job._delay_ready_task.end_time = time.time() + 111111
- self.assertEqual(None, job.run_with_ready_delay(pending_hqe))
+ self._dispatcher._handle_agents()
+ self.god.check_playback()
- set_hqe_status(django_hqes[0], models.HostQueueEntry.Status.PENDING)
# Now max_number_of_machines HQEs are in pending state. Remaining
# delay will now be ignored.
- job.run.expect_call(pending_hqe).and_return('Watermelon')
+ other_hqe = monitor_db.HostQueueEntry(django_hqes[0].id)
+ self.god.unstub(job, 'run')
# ...the over_max_threshold test should cause us to call run()
- self.assertEqual('Watermelon', job.run_with_ready_delay(pending_hqe))
+ delay_task.abort.expect_call()
+ other_hqe.on_pending()
+ self.assertEquals('Starting', other_hqe.status)
+ self.assertEquals('Starting', hqe.status)
+ self.god.stub_function(job, 'run')
+ self.god.unstub(delay_task, 'abort')
+ hqe.set_status('Pending')
+ other_hqe.set_status('Pending')
# Now we're not over the max for the atomic group. But all assigned
# hosts are in pending state. over_max_threshold should make us run().
- pending_hqe.atomic_group.max_number_of_machines += 1
- pending_hqe.atomic_group.save()
- job.run.expect_call(pending_hqe).and_return('Watermelon')
- self.assertEqual('Watermelon', job.run_with_ready_delay(pending_hqe))
- pending_hqe.atomic_group.max_number_of_machines -= 1
- pending_hqe.atomic_group.save()
+ hqe.atomic_group.max_number_of_machines += 1
+ hqe.atomic_group.save()
+ job.run.expect_call(hqe)
+ hqe.on_pending()
+ self.god.check_playback()
+ hqe.atomic_group.max_number_of_machines -= 1
+ hqe.atomic_group.save()
other_hqe = monitor_db.HostQueueEntry(django_hqes[0].id)
- self.assertTrue(pending_hqe.job is other_hqe.job)
+ self.assertTrue(hqe.job is other_hqe.job)
# DBObject classes should reuse instances so these should be the same.
self.assertEqual(job, other_hqe.job)
- self.assertEqual(other_hqe.job, pending_hqe.job)
+ self.assertEqual(other_hqe.job, hqe.job)
# Be sure our delay was not lost during the other_hqe construction.
+ self.assertEqual(job._delay_ready_task, delay_task)
self.assert_(job._delay_ready_task)
self.assertFalse(job._delay_ready_task.is_done())
self.assertFalse(job._delay_ready_task.aborted)
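
The walk-through above encodes the new readiness logic: the entry parks in 'Waiting', _schedule_delay_tasks moves it to 'Pending' and registers the DelayedCallTask, and either an expired delay or going over the atomic-group threshold (via on_pending, which aborts the delay) triggers job.run(). A hedged reconstruction of that decision; pending_count and max_machines stand in for the real bookkeeping:

import time

# Reconstructed from the expectations above, not from monitor_db.
def run_with_ready_delay(job, queue_entry, pending_count, max_machines):
    delay = job._delay_ready_task
    over_max_threshold = pending_count >= max_machines
    delay_expired = delay is not None and time.time() >= delay.end_time
    if over_max_threshold or delay_expired:
        if delay is not None and not delay.is_done():
            delay.abort()          # as on_pending() does in the test
        job.run(queue_entry)
    # else: keep waiting; the DelayedCallTask's callback runs later
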
@@ -2088,7 +2107,7 @@ class JobTest(BaseSchedulerTest):
# We pass in the other HQE this time the same way it would happen
# for real when one host finishes verifying and enters pending.
- agent = job.run_if_ready(other_hqe)
+ job.run_if_ready(other_hqe)
# The delayed task must be aborted by the actual run() call above.
self.assertTrue(job._delay_ready_task.aborted)
@@ -2096,9 +2115,11 @@ class JobTest(BaseSchedulerTest):
self.assertTrue(job._delay_ready_task.is_done())
# Check that job run() and _finish_run() were called by the above:
- tasks = list(agent.queue.queue)
- self.assertEqual(1, len(tasks))
- self.assert_(isinstance(tasks[0], monitor_db.QueueTask))
+ self._dispatcher._schedule_running_host_queue_entries()
+ agent = self._dispatcher._agents[0]
+ self.assert_(agent.task)
+ task = agent.task
+ self.assert_(isinstance(task, monitor_db.QueueTask))
# Requery these hqes in order to verify the status from the DB.
django_hqes = list(models.HostQueueEntry.objects.filter(job=job.id))
for entry in django_hqes:
@@ -2108,12 +2129,15 @@ class JobTest(BaseSchedulerTest):
# We're already running, but more calls to run_with_ready_delay can
# continue to come in as straggler hosts enter Pending. Make
# sure we don't do anything.
- self.assertEqual(None, job.run_with_ready_delay(pending_hqe))
+ self.god.stub_function(job, 'run')
+ job.run_with_ready_delay(hqe)
+ self.god.check_playback()
+ self.god.unstub(job, 'run')
def test__atomic_and_has_started__on_atomic(self):
self._create_job(hosts=[5, 6], atomic_group=1)
- job = monitor_db.Job.fetch('id = 1').next()
+ job = monitor_db.Job.fetch('id = 1')[0]
self.assertFalse(job._atomic_and_has_started())
self._update_hqe("status='Pending'")
@@ -2135,21 +2159,25 @@ class JobTest(BaseSchedulerTest):
def test__atomic_and_has_started__not_atomic(self):
self._create_job(hosts=[1, 2])
- job = monitor_db.Job.fetch('id = 1').next()
+ job = monitor_db.Job.fetch('id = 1')[0]
self.assertFalse(job._atomic_and_has_started())
self._update_hqe("status='Starting'")
self.assertFalse(job._atomic_and_has_started())
+ def _check_special_task(self, task, task_type, queue_entry_id=None):
+ self.assertEquals(task.task, task_type)
+ self.assertEquals(task.host.id, 1)
+ if queue_entry_id:
+ self.assertEquals(task.queue_entry.id, queue_entry_id)
+
+
def test_run_asynchronous(self):
self._create_job(hosts=[1, 2])
- tasks = self._test_pre_job_tasks_helper()
+ task = self._test_pre_job_tasks_helper()
- self.assertEquals(len(tasks), 2)
- verify_task, pending_task = tasks
- self._check_verify_task(verify_task)
- self._check_pending_task(pending_task)
+ self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1)
def test_run_asynchronous_skip_verify(self):
@@ -2157,21 +2185,17 @@ class JobTest(BaseSchedulerTest):
job.run_verify = False
job.save()
- tasks = self._test_pre_job_tasks_helper()
+ task = self._test_pre_job_tasks_helper()
- self.assertEquals(len(tasks), 1)
- pending_task = tasks[0]
- self._check_pending_task(pending_task)
+ self.assertEquals(task, None)
def test_run_synchronous_verify(self):
self._create_job(hosts=[1, 2], synchronous=True)
- tasks = self._test_pre_job_tasks_helper()
- self.assertEquals(len(tasks), 2)
- verify_task, pending_task = tasks
- self._check_verify_task(verify_task)
- self._check_pending_task(pending_task)
+ task = self._test_pre_job_tasks_helper()
+
+ self._check_special_task(task, models.SpecialTask.Task.VERIFY, 1)
def test_run_synchronous_skip_verify(self):
@@ -2179,18 +2203,16 @@ class JobTest(BaseSchedulerTest):
job.run_verify = False
job.save()
- tasks = self._test_pre_job_tasks_helper()
- self.assertEquals(len(tasks), 1)
- self._check_pending_task(tasks[0])
+ task = self._test_pre_job_tasks_helper()
+
+ self.assertEquals(task, None)
def test_run_synchronous_ready(self):
self._create_job(hosts=[1, 2], synchronous=True)
self._update_hqe("status='Pending', execution_subdir=''")
- tasks = self._test_run_helper(expect_starting=True)
- self.assertEquals(len(tasks), 1)
- queue_task = tasks[0]
+ queue_task = self._test_run_helper(expect_starting=True)
self.assert_(isinstance(queue_task, monitor_db.QueueTask))
self.assertEquals(queue_task.job.id, 1)
@@ -2202,8 +2224,8 @@ class JobTest(BaseSchedulerTest):
self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
self._update_hqe("status='Starting', execution_subdir=''")
- job = monitor_db.Job.fetch('id = 1').next()
- queue_entry = monitor_db.HostQueueEntry.fetch('id = 1').next()
+ job = monitor_db.Job.fetch('id = 1')[0]
+ queue_entry = monitor_db.HostQueueEntry.fetch('id = 1')[0]
assert queue_entry.job is job
self.assertEqual(None, job.run(queue_entry))
@@ -2214,9 +2236,7 @@ class JobTest(BaseSchedulerTest):
self._create_job(hosts=[5, 6], atomic_group=1, synchronous=True)
self._update_hqe("status='Pending', execution_subdir=''")
- tasks = self._test_run_helper(expect_starting=True)
- self.assertEquals(len(tasks), 1)
- queue_task = tasks[0]
+ queue_task = self._test_run_helper(expect_starting=True)
self.assert_(isinstance(queue_task, monitor_db.QueueTask))
# Atomic group jobs that do not depend on a specific label in the
@@ -2229,9 +2249,7 @@ class JobTest(BaseSchedulerTest):
job.dependency_labels.add(self.label4)
self._update_hqe("status='Pending', execution_subdir=''")
- tasks = self._test_run_helper(expect_starting=True)
- self.assertEquals(len(tasks), 1)
- queue_task = tasks[0]
+ queue_task = self._test_run_helper(expect_starting=True)
self.assert_(isinstance(queue_task, monitor_db.QueueTask))
# Atomic group jobs that also specify a label in the atomic group
@@ -2244,11 +2262,9 @@ class JobTest(BaseSchedulerTest):
job.reboot_before = models.RebootBefore.ALWAYS
job.save()
- tasks = self._test_pre_job_tasks_helper()
- self.assertEquals(len(tasks), 3)
- cleanup_task = tasks[0]
- self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask))
- self.assertEquals(cleanup_task.host.id, 1)
+ task = self._test_pre_job_tasks_helper()
+
+ self._check_special_task(task, models.SpecialTask.Task.CLEANUP)
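
The last few tests pin down how reboot_before maps to the pre-job special task. A sketch of that policy; the helper name is hypothetical, and the tests only observe the last row created, so a real cleanup may still be followed by a verify:

from autotest_lib.frontend.afe import models

# Hedged sketch of the pre-job policy these tests exercise.
def choose_pre_job_task(job, host):
    if (job.reboot_before == models.RebootBefore.ALWAYS
            or (job.reboot_before == models.RebootBefore.IF_DIRTY
                and host.dirty)):
        return models.SpecialTask.Task.CLEANUP
    if job.run_verify:
        return models.SpecialTask.Task.VERIFY
    return None  # no pre-job task; the entry can go straight to Pending
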
def _test_reboot_before_if_dirty_helper(self, expect_reboot):
@@ -2256,12 +2272,12 @@ class JobTest(BaseSchedulerTest):
job.reboot_before = models.RebootBefore.IF_DIRTY
job.save()
- tasks = self._test_pre_job_tasks_helper()
- self.assertEquals(len(tasks), expect_reboot and 3 or 2)
+ task = self._test_pre_job_tasks_helper()
if expect_reboot:
- cleanup_task = tasks[0]
- self.assert_(isinstance(cleanup_task, monitor_db.CleanupTask))
- self.assertEquals(cleanup_task.host.id, 1)
+ task_type = models.SpecialTask.Task.CLEANUP
+ else:
+ task_type = models.SpecialTask.Task.VERIFY
+ self._check_special_task(task, task_type)
def test_reboot_before_if_dirty(self):