summaryrefslogtreecommitdiff
path: root/scheduler/monitor_db_functional_test.py
blob: f0e1e10303c907fa7e8f6c209a0e9b0fcd5ebfe7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
#!/usr/bin/python

import logging, os, unittest
import common
from autotest_lib.client.common_lib import enum, global_config, host_protections
from autotest_lib.database import database_connection
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import frontend_test_utils, models
from autotest_lib.scheduler import drone_manager, email_manager, monitor_db

# translations necessary for scheduler queries to work with SQLite
# Each entry pairs a regexp with its replacement (built by
# TranslatingDatabase.make_regexp_translator); presumably TranslatingDatabase
# applies them to the query text before sending it to the SQLite test DB.
_re_translator = database_connection.TranslatingDatabase.make_regexp_translator
_DB_TRANSLATORS = (
        # NOTE(review): SQLite's time("now") yields only the time of day,
        # whereas NOW() yields date and time; confirm no translated query
        # compares NOW() against full datetime columns.
        _re_translator(r'NOW\(\)', 'time("now")'),
        _re_translator(r'LAST_INSERT_ID\(\)', 'LAST_INSERT_ROWID()'),
        # older SQLite doesn't support group_concat, so just don't bother until
        # it arises in an important query
        # (the non-greedy (.*?) stops at the first ')', so an argument that
        # itself contains parentheses is not handled)
        _re_translator(r'GROUP_CONCAT\((.*?)\)', r'\1'),
)

# short aliases for the status enums the tests check against
HqeStatus = models.HostQueueEntry.Status
HostStatus = models.Host.Status

class NullMethodObject(object):
    """Base class for mocks whose listed methods silently do nothing.

    Subclasses name the methods in _NULL_METHODS; at construction time each
    name is bound on the instance to a callable that accepts any arguments
    and returns None.
    """
    _NULL_METHODS = ()

    def __init__(self):
        def do_nothing(*unused_args, **unused_kwargs):
            return None

        for attribute_name in self._NULL_METHODS:
            setattr(self, attribute_name, do_nothing)

class MockGlobalConfig(object):
    """In-memory replacement for global_config.global_config.

    Values are stored under (section, key).  Reading a value that was never
    set raises RuntimeError so tests fail loudly on configuration they forgot
    to provide, unless the caller passed a non-None default, which is then
    returned instead.
    """
    def __init__(self):
        # maps (section, key) -> value
        self._config_info = {}


    def set_config_value(self, section, key, value):
        """Store value under (section, key), replacing any previous value."""
        self._config_info[(section, key)] = value


    def get_config_value(self, section, key, type=str,
                         default=None, allow_blank=False):
        """Return the value stored for (section, key).

        type and allow_blank are accepted for interface compatibility but
        ignored; stored values are returned exactly as they were set.

        @param default: returned when (section, key) was never set, if it is
                not None.
        @raises RuntimeError: if (section, key) was never set and default is
                None.
        """
        identifier = (section, key)
        if identifier in self._config_info:
            return self._config_info[identifier]
        if default is not None:
            # honour the caller's fallback instead of failing
            return default
        raise RuntimeError('Unset global config value: %s' % (identifier,))


# the SpecialTask names here must match the suffixes used on the SpecialTask
# results directories
# (MockDroneManager._set_last_pidfile() looks up the text after the last '-'
# of a 'hosts/...' working directory among these values)
_PidfileType = enum.Enum('verify', 'cleanup', 'repair', 'job', 'gather',
                         'parse', 'archive')


# maps the pidfile names monitor_db uses for non-SpecialTask processes to the
# _PidfileType the mock drone manager records them under
_PIDFILE_TO_PIDFILE_TYPE = {
        monitor_db._AUTOSERV_PID_FILE: _PidfileType.JOB,
        monitor_db._CRASHINFO_PID_FILE: _PidfileType.GATHER,
        monitor_db._PARSER_PID_FILE: _PidfileType.PARSE,
        monitor_db._ARCHIVER_PID_FILE: _PidfileType.ARCHIVE,
        }


# inverse of _PIDFILE_TO_PIDFILE_TYPE: _PidfileType -> pidfile name
_PIDFILE_TYPE_TO_PIDFILE = dict((value, key) for key, value
                                in _PIDFILE_TO_PIDFILE_TYPE.iteritems())


class MockDroneManager(NullMethodObject):
    """
    In-memory stand-in for the scheduler's drone manager.

    No real processes are launched.  execute_command() hands back a dummy
    pidfile ID whose PidfileContents the test controls (see finish_process(),
    finish_specific_process() and kill_process()); the process object only
    appears in those contents on the next execute_actions() call.

    Public attributes:
    process_capacity: total number of process slots; max_runnable_processes()
            returns this minus the slots used by unfinished processes.
            Tests can lower it to activate throttling.
    """
    _NULL_METHODS = ('reinitialize_drones', 'copy_to_results_repository',
                     'copy_results_on_drone')

    class _DummyPidfileId(object):
        """
        Object to represent pidfile IDs that is opaque to the scheduler code but
        still debugging-friendly for us.
        """
        def __init__(self, working_directory, pidfile_name, num_processes=None):
            self._working_directory = working_directory
            self._pidfile_name = pidfile_name
            # process slots used; summed by total_running_processes()
            self._num_processes = num_processes
            self._paired_with_pidfile = None


        def key(self):
            """Key for MockDroneManager._pidfile_index"""
            return (self._working_directory, self._pidfile_name)


        def __str__(self):
            return os.path.join(self._working_directory, self._pidfile_name)


        def __repr__(self):
            return '<_DummyPidfileId: %s>' % str(self)


    def __init__(self):
        super(MockDroneManager, self).__init__()
        self.process_capacity = 100

        # maps result_dir to set of tuples (file_path, file_contents)
        self._attached_files = {}
        # maps pidfile IDs to PidfileContents
        self._pidfiles = {}
        # pidfile IDs that haven't been created yet
        self._future_pidfiles = []
        # maps _PidfileType to the most recently created pidfile ID of that type
        self._last_pidfile_id = {}
        # maps (working_directory, pidfile_name) to pidfile IDs
        self._pidfile_index = {}
        # maps process to pidfile IDs
        self._process_index = {}
        # tracks pidfiles of processes that have been killed
        self._killed_pidfiles = set()
        # pidfile IDs that have just been unregistered (so will disappear on the
        # next cycle)
        self._unregistered_pidfiles = set()


    # utility APIs for use by the test

    def finish_process(self, pidfile_type, exit_status=0):
        """Give the most recent pidfile of pidfile_type an exit status."""
        pidfile_id = self._last_pidfile_id[pidfile_type]
        self._set_pidfile_exit_status(pidfile_id, exit_status)


    def finish_specific_process(self, working_directory, pidfile_name):
        """Mark the pidfile at working_directory/pidfile_name as exited 0."""
        pidfile_id = self.pidfile_from_path(working_directory, pidfile_name)
        self._set_pidfile_exit_status(pidfile_id, 0)


    def _set_pidfile_exit_status(self, pidfile_id, exit_status):
        """Record exit_status (and zero failed tests) in the pidfile contents."""
        assert pidfile_id is not None
        contents = self._pidfiles[pidfile_id]
        contents.exit_status = exit_status
        contents.num_tests_failed = 0


    def was_last_process_killed(self, pidfile_type):
        """True if the latest pidfile of pidfile_type went through kill_process."""
        pidfile_id = self._last_pidfile_id[pidfile_type]
        return pidfile_id in self._killed_pidfiles


    def nonfinished_pidfile_ids(self):
        """Known pidfile IDs that have no exit status yet."""
        return [pidfile_id for pidfile_id, pidfile_contents
                in self._pidfiles.iteritems()
                if pidfile_contents.exit_status is None]


    def running_pidfile_ids(self):
        """Unfinished pidfile IDs whose process has actually been started."""
        return [pidfile_id for pidfile_id in self.nonfinished_pidfile_ids()
                if self._pidfiles[pidfile_id].process is not None]


    def pidfile_from_path(self, working_directory, pidfile_name):
        """Look up a pidfile ID by path; raises KeyError if never indexed."""
        return self._pidfile_index[(working_directory, pidfile_name)]


    def attached_files(self, working_directory):
        """
        Return dict mapping path to contents for the files attached to
        working_directory with an explicit file_path (attachments made
        without a path are skipped).
        """
        return dict((path, contents) for path, contents
                    in self._attached_files.get(working_directory, [])
                    if path is not None)


    # DroneManager emulation APIs for use by monitor_db

    def get_orphaned_autoserv_processes(self):
        return set()


    def total_running_processes(self):
        """Sum of the process slots used by all unfinished pidfiles."""
        return sum(pidfile_id._num_processes
                   for pidfile_id in self.nonfinished_pidfile_ids())


    def max_runnable_processes(self, username):
        """Free process slots; username is ignored by this mock."""
        return self.process_capacity - self.total_running_processes()


    def refresh(self):
        """Forget pidfiles unregistered since the previous refresh.

        Their entries in _pidfile_index are kept.
        """
        for pidfile_id in self._unregistered_pidfiles:
            # intentionally handle non-registered pidfiles silently
            self._pidfiles.pop(pidfile_id, None)
        self._unregistered_pidfiles = set()


    def execute_actions(self):
        """Start every process requested via execute_command() since the
        last call, by giving its pidfile an opaque process object."""
        # executing an "execute_command" causes a pidfile to be created
        for pidfile_id in self._future_pidfiles:
            # Process objects are opaque to monitor_db
            process = object()
            self._pidfiles[pidfile_id].process = process
            self._process_index[process] = pidfile_id
        self._future_pidfiles = []


    def attach_file_to_execution(self, result_dir, file_contents,
                                 file_path=None):
        """Record an attached file for result_dir; see attached_files()."""
        self._attached_files.setdefault(result_dir, set()).add((file_path,
                                                                file_contents))
        return 'attach_path'


    def _initialize_pidfile(self, pidfile_id):
        """Create empty contents and an index entry for a new pidfile ID."""
        if pidfile_id not in self._pidfiles:
            assert pidfile_id.key() not in self._pidfile_index
            self._pidfiles[pidfile_id] = drone_manager.PidfileContents()
            self._pidfile_index[pidfile_id.key()] = pidfile_id


    def _set_last_pidfile(self, pidfile_id, working_directory, pidfile_name):
        """Remember pidfile_id as the latest pidfile of its _PidfileType."""
        if working_directory.startswith('hosts/'):
            # such paths look like hosts/host1/1-verify, we'll grab the end
            type_string = working_directory.rsplit('-', 1)[1]
            pidfile_type = _PidfileType.get_value(type_string)
        else:
            pidfile_type = _PIDFILE_TO_PIDFILE_TYPE[pidfile_name]
        self._last_pidfile_id[pidfile_type] = pidfile_id


    def execute_command(self, command, working_directory, pidfile_name,
                        num_processes, log_file=None, paired_with_pidfile=None,
                        username=None):
        """Pretend to launch command and return its pidfile ID.

        An existing ID for the same (working_directory, pidfile_name) is
        reused.  The process only appears after the next execute_actions().
        """
        logging.debug('Executing %s in %s', command, working_directory)
        pidfile_id = self._DummyPidfileId(working_directory, pidfile_name)
        if pidfile_id.key() in self._pidfile_index:
            # NOTE(review): if refresh() already dropped this reused ID from
            # _pidfiles, _initialize_pidfile's assert will fire because the
            # index entry is still present.
            pidfile_id = self._pidfile_index[pidfile_id.key()]
        pidfile_id._num_processes = num_processes
        pidfile_id._paired_with_pidfile = paired_with_pidfile

        self._future_pidfiles.append(pidfile_id)
        self._initialize_pidfile(pidfile_id)
        self._pidfile_index[(working_directory, pidfile_name)] = pidfile_id
        self._set_last_pidfile(pidfile_id, working_directory, pidfile_name)
        return pidfile_id


    def get_pidfile_contents(self, pidfile_id, use_second_read=False):
        """Return the contents for pidfile_id.

        Unknown IDs get a newly constructed PidfileContents();
        use_second_read is ignored.
        """
        if pidfile_id not in self._pidfiles:
            logging.debug('Request for nonexistent pidfile %s', pidfile_id)
        return self._pidfiles.get(pidfile_id, drone_manager.PidfileContents())


    def is_process_running(self, process):
        return True


    def register_pidfile(self, pidfile_id):
        self._initialize_pidfile(pidfile_id)


    def unregister_pidfile(self, pidfile_id):
        """Schedule pidfile_id to be dropped on the next refresh()."""
        self._unregistered_pidfiles.add(pidfile_id)


    def declare_process_count(self, pidfile_id, num_processes):
        """Record how many process slots pidfile_id's process uses.

        This must update _num_processes, the attribute summed by
        total_running_processes(); otherwise the declaration would not
        affect throttling.
        """
        pidfile_id._num_processes = num_processes


    def absolute_path(self, path):
        return 'absolute/' + path


    def write_lines_to_file(self, file_path, lines, paired_with_process=None):
        # TODO: record this
        pass


    def get_pidfile_id_from(self, execution_tag, pidfile_name):
        """Return the indexed pidfile ID for the path, or a fresh unindexed
        one with zero processes."""
        default_pidfile = self._DummyPidfileId(execution_tag, pidfile_name,
                                               num_processes=0)
        return self._pidfile_index.get((execution_tag, pidfile_name),
                                       default_pidfile)


    def kill_process(self, process):
        """Mark process's pidfile as killed and give it exit status 271."""
        pidfile_id = self._process_index[process]
        self._killed_pidfiles.add(pidfile_id)
        self._set_pidfile_exit_status(pidfile_id, 271)


class MockEmailManager(NullMethodObject):
    """Stand-in for email_manager.manager that never sends mail.

    send_queued_emails() and send_email() are no-ops; notification emails are
    written to the log instead so they stay visible in test output.
    """
    _NULL_METHODS = ('send_queued_emails', 'send_email')

    def enqueue_notify_email(self, subject, message):
        """Log a notification's subject and body instead of queueing it."""
        logging.warning('enqueue_notify_email: %s', subject)
        logging.warning(message)


class SchedulerFunctionalTest(unittest.TestCase,
                              frontend_test_utils.FrontendTestMixin):
    # some number of ticks after which the scheduler is presumed to have
    # stabilized, given no external changes
    _A_LOT_OF_TICKS = 10

    def setUp(self):
        """Build a fresh scheduler environment backed by mocks.

        _set_stubs() must run before _set_global_config_values(), which
        writes to the mock config _set_stubs() creates.  The Dispatcher is
        created last, presumably so it sees the stubbed module globals.
        """
        self._frontend_common_setup()
        self._set_stubs()
        self._set_global_config_values()
        self._create_dispatcher()

        logging.basicConfig(level=logging.DEBUG)


    def _create_dispatcher(self):
        """(Re)create the Dispatcher under test; also used to simulate a
        scheduler restart."""
        self.dispatcher = monitor_db.Dispatcher()


    def tearDown(self):
        """Disconnect the test database opened in _set_stubs(), then run the
        shared frontend teardown."""
        self._database.disconnect()
        self._frontend_common_teardown()


    def _set_stubs(self):
        """Replace scheduler collaborators with mocks and a SQLite test DB.

        Uses self.god (presumably created by _frontend_common_setup(), which
        setUp() calls first) to stub global_config.global_config,
        monitor_db._drone_manager, email_manager.manager and monitor_db._db.
        """
        self.mock_config = MockGlobalConfig()
        self.god.stub_with(global_config, 'global_config', self.mock_config)

        self.mock_drone_manager = MockDroneManager()
        self.god.stub_with(monitor_db, '_drone_manager',
                           self.mock_drone_manager)

        self.mock_email_manager = MockEmailManager()
        self.god.stub_with(email_manager, 'manager', self.mock_email_manager)

        # _DB_TRANSLATORS rewrites query constructs the SQLite test DB lacks
        self._database = (
            database_connection.TranslatingDatabase.get_test_database(
                translators=_DB_TRANSLATORS))
        self._database.connect(db_type='django')
        self.god.stub_with(monitor_db, '_db', self._database)


    def _set_global_config_values(self):
        """Seed the mock config with the SCHEDULER values the tests rely on."""
        scheduler_settings = (
                ('pidfile_timeout_mins', 1),
                ('gc_stats_interval_mins', 999999),
        )
        for setting_key, setting_value in scheduler_settings:
            self.mock_config.set_config_value('SCHEDULER', setting_key,
                                              setting_value)


    def _initialize_test(self):
        """Run the Dispatcher's one-time initialization.

        The test_recover_* tests set up DB state before calling this,
        presumably because recovery of existing state happens here.
        """
        self.dispatcher.initialize()


    def _run_dispatcher(self):
        """Tick the dispatcher _A_LOT_OF_TICKS times so it can settle."""
        ticks_left = self._A_LOT_OF_TICKS
        while ticks_left > 0:
            self.dispatcher.tick()
            ticks_left -= 1


    def test_idle(self):
        # with no jobs or tasks, the scheduler should initialize and tick
        # repeatedly without error
        self._initialize_test()
        self._run_dispatcher()


    def _assert_process_executed(self, working_directory, pidfile_name):
        """Assert that a process was launched for the given pidfile path.

        The mock drone manager indexes a pidfile ID under
        (working_directory, pidfile_name) when a command is executed (or a
        pidfile registered) there, so a successful lookup means the process
        exists.
        """
        try:
            self.mock_drone_manager.pidfile_from_path(working_directory,
                                                      pidfile_name)
        except KeyError:
            process_was_executed = False
        else:
            process_was_executed = True
        self.assert_(process_was_executed,
                     '%s/%s not executed' % (working_directory, pidfile_name))


    def _update_instance(self, model_instance):
        """Return a freshly loaded copy of a Django model instance."""
        model_class = type(model_instance)
        return model_class.objects.get(pk=model_instance.pk)


    def _check_statuses(self, queue_entry, queue_entry_status,
                        host_status=None):
        """Assert the HQE's DB status, plus its host's when one is given.

        A falsy host_status skips the host check.
        """
        self._check_entry_status(queue_entry, queue_entry_status)
        if not host_status:
            return
        self._check_host_status(queue_entry.host, host_status)


    def _check_entry_status(self, queue_entry, status):
        """Assert the queue entry's status as currently stored in the DB."""
        self.assertEquals(self._update_instance(queue_entry).status, status)


    def _check_host_status(self, host, status):
        """Assert the host's status as currently stored in the DB."""
        self.assertEquals(self._update_instance(host).status, status)


    def _run_pre_job_verify(self, queue_entry):
        """Launch pre-job verify, check that the HQE and host are both
        Verifying, then finish the verify with exit status 0."""
        self._run_dispatcher() # launches verify
        self._check_statuses(queue_entry, HqeStatus.VERIFYING,
                             HostStatus.VERIFYING)
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)


    def test_simple_job(self):
        # a single-host job goes through pre-job verify, runs, and is then
        # parsed, cleaned up and archived, ending Completed with the host Ready
        self._initialize_test()
        _, queue_entry = self._make_job_and_queue_entry()
        self._run_pre_job_verify(queue_entry)
        self._run_dispatcher() # launches job
        self._check_statuses(queue_entry, HqeStatus.RUNNING, HostStatus.RUNNING)
        self._finish_job(queue_entry)
        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)
        self._assert_nothing_is_running()


    def _setup_for_pre_job_cleanup(self):
        """Initialize the scheduler and create a job with
        reboot_before=ALWAYS (so a pre-job cleanup runs); return its HQE."""
        self._initialize_test()
        job, queue_entry = self._make_job_and_queue_entry()
        job.reboot_before = models.RebootBefore.ALWAYS
        job.save()
        return queue_entry


    def _run_pre_job_cleanup_job(self, queue_entry):
        """Run pre-job cleanup, verify and the job, all successfully.

        While cleanup runs, the HQE is expected to be Verifying and the host
        Cleaning.
        """
        self._run_dispatcher() # cleanup
        self._check_statuses(queue_entry, HqeStatus.VERIFYING,
                             HostStatus.CLEANING)
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
        self._run_dispatcher() # verify
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # job
        self._finish_job(queue_entry)


    def test_pre_job_cleanup(self):
        # a job with reboot_before=ALWAYS runs cleanup, then verify, then the
        # job itself
        queue_entry = self._setup_for_pre_job_cleanup()
        self._run_pre_job_cleanup_job(queue_entry)


    def _run_pre_job_cleanup_one_failure(self):
        """Fail the pre-job cleanup once and let the resulting repair succeed.

        Checks that the HQE is back to Queued while the host repairs.
        Returns the queue entry.
        """
        queue_entry = self._setup_for_pre_job_cleanup()
        self._run_dispatcher() # cleanup
        # nonzero exit status: the cleanup failed
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,
                                               exit_status=256)
        self._run_dispatcher() # repair
        self._check_statuses(queue_entry, HqeStatus.QUEUED,
                             HostStatus.REPAIRING)
        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)
        return queue_entry


    def test_pre_job_cleanup_failure(self):
        # one failed pre-job cleanup triggers a repair; after a successful
        # repair the job is scheduled again
        queue_entry = self._run_pre_job_cleanup_one_failure()
        # from here the job should run as normal
        self._run_pre_job_cleanup_job(queue_entry)


    def test_pre_job_cleanup_double_failure(self):
        # if the pre-job cleanup fails again after the repair, the HQE ends
        # Failed and the host Repair Failed
        # TODO (showard): this test isn't perfect.  in reality, when the second
        # cleanup fails, it copies its results over to the job directory using
        # copy_results_on_drone() and then parses them.  since we don't handle
        # that, there appear to be no results at the job directory.  the
        # scheduler handles this gracefully, parsing gets effectively skipped,
        # and this test passes as is.  but we ought to properly test that
        # behavior.
        queue_entry = self._run_pre_job_cleanup_one_failure()
        self._run_dispatcher() # second cleanup
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,
                                               exit_status=256)
        self._run_dispatcher()
        self._check_statuses(queue_entry, HqeStatus.FAILED,
                             HostStatus.REPAIR_FAILED)
        # nothing else should run
        self._assert_nothing_is_running()


    def _assert_nothing_is_running(self):
        """Assert that no started process is still without an exit status."""
        still_running = self.mock_drone_manager.running_pidfile_ids()
        self.assertEquals(still_running, [])


    def _setup_for_post_job_cleanup(self):
        """Initialize the scheduler and create a job with
        reboot_after=ALWAYS (so a post-job cleanup runs); return its HQE."""
        self._initialize_test()
        job, queue_entry = self._make_job_and_queue_entry()
        job.reboot_after = models.RebootAfter.ALWAYS
        job.save()
        return queue_entry


    def _run_post_job_cleanup_failure_up_to_repair(self, queue_entry,
                                                   include_verify=True):
        """Drive a job through a failing post-job cleanup.

        Runs pre-job verify (if include_verify), then the job and parsing,
        fails the cleanup, and finishes archiving.  Any repair this triggers
        is left unfinished for the caller.  Returns queue_entry unchanged.
        """
        if include_verify:
            self._run_pre_job_verify(queue_entry)
        self._run_dispatcher() # job
        self.mock_drone_manager.finish_process(_PidfileType.JOB)
        self._run_dispatcher() # parsing + cleanup
        self.mock_drone_manager.finish_process(_PidfileType.PARSE)
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,
                                               exit_status=256)
        self._run_dispatcher() # repair, HQE unaffected
        self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)
        self._run_dispatcher()
        return queue_entry


    def test_post_job_cleanup_failure(self):
        # a failed post-job cleanup leaves the HQE Completed and repairs the
        # host; a successful repair makes the host Ready
        queue_entry = self._setup_for_post_job_cleanup()
        self._run_post_job_cleanup_failure_up_to_repair(queue_entry)
        self._check_statuses(queue_entry, HqeStatus.COMPLETED,
                             HostStatus.REPAIRING)
        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)
        self._run_dispatcher()
        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)


    def test_post_job_cleanup_failure_repair_failure(self):
        # if the repair after a failed post-job cleanup also fails, the HQE
        # stays Completed and the host becomes Repair Failed
        queue_entry = self._setup_for_post_job_cleanup()
        self._run_post_job_cleanup_failure_up_to_repair(queue_entry)
        self.mock_drone_manager.finish_process(_PidfileType.REPAIR,
                                               exit_status=256)
        self._run_dispatcher()
        self._check_statuses(queue_entry, HqeStatus.COMPLETED,
                             HostStatus.REPAIR_FAILED)


    def _ensure_post_job_process_is_paired(self, queue_entry, pidfile_type):
        """Assert the pidfile_type process in the HQE's execution directory
        was launched with a paired_with_pidfile."""
        expected_pidfile = _PIDFILE_TYPE_TO_PIDFILE[pidfile_type]
        execution_path = self._update_instance(queue_entry).execution_path()
        launched_id = self.mock_drone_manager.pidfile_from_path(
                execution_path, expected_pidfile)
        self.assert_(launched_id._paired_with_pidfile)


    def _finish_job(self, queue_entry):
        """Finish the running job and drive post-job processing to the end.

        Checks that parsing and cleanup start together (HQE Parsing, host
        Cleaning) and that the parse process was launched paired with
        another pidfile.
        """
        self.mock_drone_manager.finish_process(_PidfileType.JOB)
        self._run_dispatcher() # launches parsing + cleanup
        self._check_statuses(queue_entry, HqeStatus.PARSING,
                             HostStatus.CLEANING)
        self._ensure_post_job_process_is_paired(queue_entry, _PidfileType.PARSE)
        self._finish_parsing_and_cleanup(queue_entry)


    def _finish_parsing_and_cleanup(self, queue_entry):
        """Finish cleanup and parsing, check that the HQE moved to Archiving,
        then finish archiving."""
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
        self.mock_drone_manager.finish_process(_PidfileType.PARSE)
        self._run_dispatcher()

        self._check_entry_status(queue_entry, HqeStatus.ARCHIVING)
        self.mock_drone_manager.finish_process(_PidfileType.ARCHIVE)
        self._run_dispatcher()


    def _create_reverify_request(self):
        """Queue a user-requested verify SpecialTask on the first test host
        and return that host."""
        target_host = self.hosts[0]
        models.SpecialTask.objects.create(
                host=target_host,
                task=models.SpecialTask.Task.VERIFY,
                requested_by=self.user)
        return target_host


    def test_requested_reverify(self):
        # a user-requested verify runs and leaves the host Ready
        # NOTE(review): unlike most tests, this never calls _initialize_test()
        host = self._create_reverify_request()
        self._run_dispatcher()
        self._check_host_status(host, HostStatus.VERIFYING)
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher()
        self._check_host_status(host, HostStatus.READY)


    def test_requested_reverify_failure(self):
        # a failed requested verify triggers a repair; once the repair
        # succeeds the host is Ready
        host = self._create_reverify_request()
        self._run_dispatcher()
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,
                                               exit_status=256)
        self._run_dispatcher() # repair
        self._check_host_status(host, HostStatus.REPAIRING)
        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)
        self._run_dispatcher()
        self._check_host_status(host, HostStatus.READY)


    def _setup_for_do_not_verify(self):
        """Initialize the scheduler and create a job whose host is
        DO_NOT_VERIFY protected; return the job's HQE."""
        self._initialize_test()
        _, queue_entry = self._make_job_and_queue_entry()
        queue_entry.host.protection = host_protections.Protection.DO_NOT_VERIFY
        queue_entry.host.save()
        return queue_entry


    def test_do_not_verify_job(self):
        # a DO_NOT_VERIFY host skips pre-job verify and runs the job directly
        queue_entry = self._setup_for_do_not_verify()
        self._run_dispatcher() # runs job directly
        self._finish_job(queue_entry)


    def test_do_not_verify_job_with_cleanup(self):
        # a pre-job cleanup (reboot_before=ALWAYS) still runs on a
        # DO_NOT_VERIFY host, and the job follows it with no verify step
        queue_entry = self._setup_for_do_not_verify()
        queue_entry.job.reboot_before = models.RebootBefore.ALWAYS
        queue_entry.job.save()

        self._run_dispatcher() # cleanup
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
        self._run_dispatcher() # job
        self._finish_job(queue_entry)


    def test_do_not_verify_pre_job_cleanup_failure(self):
        # on a DO_NOT_VERIFY host a failed pre-job cleanup is ignored and the
        # job runs anyway
        queue_entry = self._setup_for_do_not_verify()
        queue_entry.job.reboot_before = models.RebootBefore.ALWAYS
        queue_entry.job.save()

        self._run_dispatcher() # cleanup
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP,
                                               exit_status=256)
        self._run_dispatcher() # failure ignored; job runs
        self._finish_job(queue_entry)


    def test_do_not_verify_post_job_cleanup_failure(self):
        # on a DO_NOT_VERIFY host a failed post-job cleanup is ignored: the
        # HQE completes, the host ends Ready and nothing else runs
        queue_entry = self._setup_for_do_not_verify()

        self._run_post_job_cleanup_failure_up_to_repair(queue_entry,
                                                        include_verify=False)
        # failure ignored, host still set to Ready
        self._check_statuses(queue_entry, HqeStatus.COMPLETED, HostStatus.READY)
        self._run_dispatcher() # nothing else runs
        self._assert_nothing_is_running()


    def test_do_not_verify_requested_reverify_failure(self):
        # a failed requested verify on a DO_NOT_VERIFY host is ignored: the
        # host ends Ready and nothing is left running
        host = self._create_reverify_request()
        host.protection = host_protections.Protection.DO_NOT_VERIFY
        host.save()

        self._run_dispatcher()
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,
                                               exit_status=256)
        self._run_dispatcher()
        self._check_host_status(host, HostStatus.READY) # ignore failure
        self._assert_nothing_is_running()


    def test_job_abort_in_verify(self):
        # aborting an HQE during pre-job verify kills the verify process and
        # launches a cleanup
        self._initialize_test()
        job = self._create_job(hosts=[1])
        self._run_dispatcher() # launches verify
        job.hostqueueentry_set.update(aborted=True)
        self._run_dispatcher() # kills verify, launches cleanup
        self.assert_(self.mock_drone_manager.was_last_process_killed(
                _PidfileType.VERIFY))
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
        self._run_dispatcher()


    def test_job_abort(self):
        # aborting a running job kills autoserv, runs the crashinfo gather
        # process, then parses, cleans up and archives
        self._initialize_test()
        job = self._create_job(hosts=[1])
        job.run_verify = False
        job.save()

        self._run_dispatcher() # launches job
        job.hostqueueentry_set.update(aborted=True)
        self._run_dispatcher() # kills job, launches gathering
        self.assert_(self.mock_drone_manager.was_last_process_killed(
                _PidfileType.JOB))
        self.mock_drone_manager.finish_process(_PidfileType.GATHER)
        self._run_dispatcher() # launches parsing + cleanup
        queue_entry = job.hostqueueentry_set.all()[0]
        self._finish_parsing_and_cleanup(queue_entry)


    def test_no_pidfile_leaking(self):
        # after each complete scenario every pidfile should have been
        # unregistered and dropped from the mock drone manager
        # NOTE(review): each sub-test calls _initialize_test() again on the
        # same dispatcher
        self._initialize_test()
        self.test_simple_job()
        self.assertEquals(self.mock_drone_manager._pidfiles, {})

        self.test_job_abort_in_verify()
        self.assertEquals(self.mock_drone_manager._pidfiles, {})

        self.test_job_abort()
        self.assertEquals(self.mock_drone_manager._pidfiles, {})


    def _make_job_and_queue_entry(self):
        """Create a job on host 1 and return (job, its first HQE)."""
        new_job = self._create_job(hosts=[1])
        return new_job, new_job.hostqueueentry_set.all()[0]


    def test_recover_running_no_process(self):
        # recovery should re-execute a Running HQE if no process is found
        # (the DB state is prepared before _initialize_test() so that
        # initialization sees it)
        _, queue_entry = self._make_job_and_queue_entry()
        queue_entry.status = HqeStatus.RUNNING
        queue_entry.execution_subdir = '1-myuser/host1'
        queue_entry.save()
        queue_entry.host.status = HostStatus.RUNNING
        queue_entry.host.save()

        self._initialize_test()
        self._run_dispatcher()
        self._finish_job(queue_entry)


    def test_recover_verifying_hqe_no_special_task(self):
        # recovery should fail on a Verifying HQE with no corresponding
        # Verify or Cleanup SpecialTask
        _, queue_entry = self._make_job_and_queue_entry()
        queue_entry.status = HqeStatus.VERIFYING
        queue_entry.save()

        # make some dummy SpecialTasks that shouldn't count: a Verify not tied
        # to the queue entry, and a Cleanup that is already complete
        models.SpecialTask.objects.create(host=queue_entry.host,
                                          task=models.SpecialTask.Task.VERIFY)
        models.SpecialTask.objects.create(host=queue_entry.host,
                                          task=models.SpecialTask.Task.CLEANUP,
                                          queue_entry=queue_entry,
                                          is_complete=True)

        self.assertRaises(monitor_db.SchedulerError, self._initialize_test)


    def _test_recover_verifying_hqe_helper(self, task, pidfile_type):
        """Recover a Verifying HQE that has an incomplete SpecialTask of the
        given type, and check that the task's process runs and finishes."""
        _, queue_entry = self._make_job_and_queue_entry()
        queue_entry.status = HqeStatus.VERIFYING
        queue_entry.save()

        models.SpecialTask.objects.create(
                host=queue_entry.host, task=task, queue_entry=queue_entry)

        self._initialize_test()
        self._run_dispatcher()
        self.mock_drone_manager.finish_process(pidfile_type)
        self._run_dispatcher()
        # don't bother checking the rest of the job execution, as long as the
        # SpecialTask ran


    def test_recover_verifying_hqe_with_cleanup(self):
        # recover an HQE that was in pre-job cleanup; the cleanup should be
        # run again
        self._test_recover_verifying_hqe_helper(models.SpecialTask.Task.CLEANUP,
                                                _PidfileType.CLEANUP)


    def test_recover_verifying_hqe_with_verify(self):
        # recover an HQE that was in pre-job verify; the verify should be run
        # again
        self._test_recover_verifying_hqe_helper(models.SpecialTask.Task.VERIFY,
                                                _PidfileType.VERIFY)


    def test_recover_pending_hqes_with_group(self):
        """Recover a group of Pending HQEs that belong together.

        All HQEs of a two-host job with an atomic group (i.e. atomic hosts)
        are put in Pending. After scheduler initialization, every one of them
        is expected to be Starting.
        """
        job = self._create_job(hosts=[1,2], atomic_group=1)
        job.save()

        job.hostqueueentry_set.all().update(status=HqeStatus.PENDING)

        self._initialize_test()
        queue_entries = list(job.hostqueueentry_set.all())
        # Without this check, the loop below would pass vacuously if the job
        # had no queue entries.
        self.assert_(queue_entries, 'job has no host queue entries')
        for queue_entry in queue_entries:
            self.assertEquals(queue_entry.status, HqeStatus.STARTING)


    def test_recover_parsing(self):
        """Recover a job whose results were being parsed at scheduler restart.

        Runs a job (no verify, never reboot after) until parsing launches. It
        then re-creates the dispatcher and re-initializes to simulate a
        scheduler restart, and finishes the parse process afterwards.
        """
        self._initialize_test()
        # only the job object is needed; its HQE is driven by the dispatcher
        job, _ = self._make_job_and_queue_entry()
        job.run_verify = False
        job.reboot_after = models.RebootAfter.NEVER
        job.save()

        self._run_dispatcher() # launches job
        self.mock_drone_manager.finish_process(_PidfileType.JOB)
        self._run_dispatcher() # launches parsing

        # now "restart" the scheduler
        self._create_dispatcher()
        self._initialize_test()
        self._run_dispatcher()
        self.mock_drone_manager.finish_process(_PidfileType.PARSE)
        self._run_dispatcher()


    def test_recover_parsing__no_process_already_aborted(self):
        """Recover an aborted HQE recorded as Parsing with no parse process.

        No process was ever launched for the HQE. The scheduler only has to
        initialize and complete one dispatcher cycle without raising.
        """
        _, entry = self._make_job_and_queue_entry()
        entry.execution_subdir = 'host1'
        entry.status = HqeStatus.PARSING
        entry.aborted = True
        entry.save()

        self._initialize_test()
        self._run_dispatcher()


    def test_job_scheduled_just_after_abort(self):
        """Post-abort cleanup must not be pre-empted by another ready job.

        This covers a pretty obscure corner case: a job is aborted while
        queued, another job is ready to run, and throttling is active
        (process_capacity 0). The aborted job's cleanup has to run first.
        Verify may start for the second job only after that cleanup finishes.
        """
        job1, queue_entry1 = self._make_job_and_queue_entry()
        # only the second job's queue entry is inspected below
        _, queue_entry2 = self._make_job_and_queue_entry()

        self.mock_drone_manager.process_capacity = 0
        self._run_dispatcher() # schedule job1, but won't start verify
        job1.hostqueueentry_set.update(aborted=True)
        self.mock_drone_manager.process_capacity = 100
        self._run_dispatcher() # cleanup must run here, not verify for job2
        self._check_statuses(queue_entry1, HqeStatus.ABORTED,
                             HostStatus.CLEANING)
        self.mock_drone_manager.finish_process(_PidfileType.CLEANUP)
        self._run_dispatcher() # now verify starts for job2
        self._check_statuses(queue_entry2, HqeStatus.VERIFYING,
                             HostStatus.VERIFYING)


    def test_reverify_interrupting_pre_job(self):
        """Reverify requested in the middle of pre-job actions.

        A reverify request is created while the pre-job verify runs. That
        verify is then finished with exit_status=256, and the following
        dispatcher cycles run, in order: repair, the reverify, the pre-job
        verify again, and finally the job. The job must complete, the host
        must end up Ready, and nothing may be left running.
        """
        # ensure things behave sanely if a reverify is scheduled in the middle
        # of pre-job actions
        _, queue_entry = self._make_job_and_queue_entry()

        self._run_dispatcher() # pre-job verify
        self._create_reverify_request()
        # non-zero exit status: the pre-job verify fails, so repair runs next
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,
                                               exit_status=256)
        self._run_dispatcher() # repair
        self.mock_drone_manager.finish_process(_PidfileType.REPAIR)
        self._run_dispatcher() # reverify runs now
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # pre-job verify
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # and job runs...
        self._check_statuses(queue_entry, HqeStatus.RUNNING, HostStatus.RUNNING)
        self._finish_job(queue_entry) # reverify has been deleted
        self._check_statuses(queue_entry, HqeStatus.COMPLETED,
                             HostStatus.READY)
        self._assert_nothing_is_running()


    def test_reverify_while_job_running(self):
        """A reverify requested while a job runs must wait for post-job work.

        The job is finished with exit_status=271 so that gathering runs.
        Gathering, then parsing and cleanup, all complete before the reverify
        starts. At that point the HQE is Failed and the host is Verifying.
        The host returns to Ready once the reverify finishes.
        """
        # once a job is running, a reverify must not be allowed to preempt
        # Gathering
        _, queue_entry = self._make_job_and_queue_entry()
        self._run_pre_job_verify(queue_entry)
        self._run_dispatcher() # job runs
        self._create_reverify_request()
        # make job end with a signal, so gathering will run
        self.mock_drone_manager.finish_process(_PidfileType.JOB,
                                               exit_status=271)
        self._run_dispatcher() # gathering must start
        self.mock_drone_manager.finish_process(_PidfileType.GATHER)
        self._run_dispatcher() # parsing and cleanup
        self._finish_parsing_and_cleanup(queue_entry)
        self._run_dispatcher() # now reverify runs
        self._check_statuses(queue_entry, HqeStatus.FAILED,
                             HostStatus.VERIFYING)
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher()
        self._check_host_status(queue_entry.host, HostStatus.READY)


    def test_reverify_while_host_pending(self):
        """A reverify must not run while its host is waiting in Pending.

        host2 is locked, so the two-host job (synch_count=2) cannot start.
        host1 verifies and then waits in Pending. A reverify requested at
        that point must leave host1's state unchanged. After host2 is
        unlocked, the job runs to completion and host1's post-job cleanup is
        finished explicitly. Only then does the reverify run, after which the
        host returns to Ready.
        """
        # ensure that if a reverify is scheduled while a host is in Pending, it
        # won't run until the host is actually free
        job = self._create_job(hosts=[1,2])
        queue_entry = job.hostqueueentry_set.get(host__hostname='host1')
        job.synch_count = 2
        job.save()

        # keep host2 unavailable so the synchronous job can't start yet
        host2 = self.hosts[1]
        host2.locked = True
        host2.save()

        self._run_dispatcher() # verify host1
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # host1 Pending
        self._check_statuses(queue_entry, HqeStatus.PENDING, HostStatus.PENDING)
        self._create_reverify_request()
        self._run_dispatcher() # nothing should happen here
        self._check_statuses(queue_entry, HqeStatus.PENDING, HostStatus.PENDING)

        # now let the job run
        host2.locked = False
        host2.save()
        self._run_dispatcher() # verify host2
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # run job
        self._finish_job(queue_entry)
        # need to explicitly finish host1's post-job cleanup
        # NOTE(review): the '4-' prefix presumably encodes the SpecialTask id
        # assigned to this cleanup; it is fragile if task creation order changes
        self.mock_drone_manager.finish_specific_process(
                'hosts/host1/4-cleanup', monitor_db._AUTOSERV_PID_FILE)
        self._run_dispatcher()
        # the reverify should now be running
        self._check_statuses(queue_entry, HqeStatus.COMPLETED,
                             HostStatus.VERIFYING)
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher()
        self._check_host_status(queue_entry.host, HostStatus.READY)


    def test_throttling(self):
        """process_capacity throttling across verify, job, gathering and parsing.

        For a three-host job with synch_count=3, the test checks that:
        - with capacity 2, only two verify processes run;
        - the job does not start until capacity reaches 3;
        - gathering is likewise held back until capacity reaches 3;
        - parsing starts even with capacity 0.
        """
        job = self._create_job(hosts=[1,2,3])
        job.synch_count = 3
        job.save()

        queue_entries = list(job.hostqueueentry_set.all())
        def _check_hqe_statuses(*statuses):
            # pairs entries with expected statuses positionally; note zip()
            # stops at the shorter sequence
            for queue_entry, status in zip(queue_entries, statuses):
                self._check_statuses(queue_entry, status)

        self.mock_drone_manager.process_capacity = 2
        self._run_dispatcher() # verify runs on 1 and 2
        _check_hqe_statuses(HqeStatus.VERIFYING, HqeStatus.VERIFYING,
                            HqeStatus.VERIFYING)
        self.assertEquals(len(self.mock_drone_manager.running_pidfile_ids()), 2)

        self.mock_drone_manager.finish_specific_process(
                'hosts/host1/1-verify', monitor_db._AUTOSERV_PID_FILE)
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # verify runs on 3
        _check_hqe_statuses(HqeStatus.PENDING, HqeStatus.PENDING,
                            HqeStatus.VERIFYING)

        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # job won't run due to throttling
        _check_hqe_statuses(HqeStatus.STARTING, HqeStatus.STARTING,
                            HqeStatus.STARTING)
        self._assert_nothing_is_running()

        self.mock_drone_manager.process_capacity = 3
        self._run_dispatcher() # now job runs
        _check_hqe_statuses(HqeStatus.RUNNING, HqeStatus.RUNNING,
                            HqeStatus.RUNNING)

        self.mock_drone_manager.process_capacity = 2
        # exit status 271 makes the job's HQEs go to Gathering (checked below)
        self.mock_drone_manager.finish_process(_PidfileType.JOB,
                                               exit_status=271)
        self._run_dispatcher() # gathering won't run due to throttling
        _check_hqe_statuses(HqeStatus.GATHERING, HqeStatus.GATHERING,
                            HqeStatus.GATHERING)
        self._assert_nothing_is_running()

        self.mock_drone_manager.process_capacity = 3
        self._run_dispatcher() # now gathering runs

        self.mock_drone_manager.process_capacity = 0
        self.mock_drone_manager.finish_process(_PidfileType.GATHER)
        self._run_dispatcher() # parsing runs despite throttling
        _check_hqe_statuses(HqeStatus.PARSING, HqeStatus.PARSING,
                            HqeStatus.PARSING)


    def test_abort_starting_while_throttling(self):
        """Abort a synchronous job that is stuck in Starting due to throttling.

        With process_capacity at 0, the job's first HQE reaches Starting (its
        host is Pending) but the job cannot be launched. All HQEs are then
        aborted, and one more dispatcher cycle must complete without raising.
        """
        self._initialize_test()
        job = self._create_job(hosts=[1,2], synchronous=True)
        first_entry = job.hostqueueentry_set.all()[0]
        job.run_verify = False
        job.reboot_after = models.RebootAfter.NEVER
        job.save()

        # no capacity: the job can be scheduled but not started
        self.mock_drone_manager.process_capacity = 0
        self._run_dispatcher()
        self._check_statuses(first_entry, HqeStatus.STARTING,
                             HostStatus.PENDING)

        job.hostqueueentry_set.update(aborted=True)
        self._run_dispatcher()


    def test_simple_atomic_group_job(self):
        """An atomic-group job expands onto host5 and host6 and runs on both.

        After the first dispatcher cycle the job has exactly two HQEs, on
        host5 and host6 in that order. Once the verify processes are
        finished, every HQE and host is expected to be Running.
        """
        job = self._create_job(atomic_group=1)
        self._run_dispatcher() # expand + verify
        entries = job.hostqueueentry_set.all()
        expected_hostnames = ('host5', 'host6')
        self.assertEquals(len(entries), len(expected_hostnames))
        for entry, hostname in zip(entries, expected_hostnames):
            self.assertEquals(entry.host.hostname, hostname)

        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher() # delay task started waiting

        self.mock_drone_manager.finish_specific_process(
                'hosts/host5/1-verify', monitor_db._AUTOSERV_PID_FILE)
        self._run_dispatcher() # job starts now
        for entry in entries:
            self._check_statuses(entry, HqeStatus.RUNNING, HostStatus.RUNNING)

        # rest of job proceeds normally


    def test_simple_metahost_assignment(self):
        """A metahost entry is assigned to host1, verified there, then run."""
        job = self._create_job(metahosts=[1])
        self._run_dispatcher()
        assigned = job.hostqueueentry_set.all()[0]
        self.assertEquals(assigned.host.hostname, 'host1')
        self._check_statuses(assigned, HqeStatus.VERIFYING,
                             HostStatus.VERIFYING)

        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher()
        self._check_statuses(assigned, HqeStatus.RUNNING, HostStatus.RUNNING)
        # rest of job proceeds normally


    def test_metahost_fail_verify(self):
        """A metahost entry moves to another host when verify fails.

        label1 is put on host2 as well as host1. After the entry's verify on
        host1 fails, the entry is expected to be reassigned to host2 while
        host1 goes to Repairing. The job then runs on host2.
        """
        self.hosts[1].labels.add(self.labels[0]) # label1 now on host2 too
        job = self._create_job(metahosts=[1])
        self._run_dispatcher() # assigned to host1

        # failing verify exit status
        self.mock_drone_manager.finish_process(_PidfileType.VERIFY,
                                               exit_status=256)
        self._run_dispatcher() # host1 failed, gets reassigned to host2
        reassigned = job.hostqueueentry_set.all()[0]
        self.assertEquals(reassigned.host.hostname, 'host2')
        self._check_statuses(reassigned, HqeStatus.VERIFYING,
                             HostStatus.VERIFYING)
        self._check_host_status(self.hosts[0], HostStatus.REPAIRING)

        self.mock_drone_manager.finish_process(_PidfileType.VERIFY)
        self._run_dispatcher()
        self._check_statuses(reassigned, HqeStatus.RUNNING, HostStatus.RUNNING)


    def test_hostless_job(self):
        """A hostless job is Running after one cycle and Completed afterwards.

        The HQE should be Running after the first dispatcher cycle, and
        Completed once its job process finishes and the dispatcher runs again.
        """
        job = self._create_job(hostless=True)
        hostless_entry = job.hostqueueentry_set.all()[0]

        self._run_dispatcher()
        self._check_entry_status(hostless_entry, HqeStatus.RUNNING)

        self.mock_drone_manager.finish_process(_PidfileType.JOB)
        self._run_dispatcher()
        self._check_entry_status(hostless_entry, HqeStatus.COMPLETED)


    def test_pre_job_keyvals(self):
        """Job keyvals are written to the keyval file attached for the job.

        A JobKeyval ('mykey'='myvalue') is added to a single-host job that
        skips verify and never reboots before. After the job runs, the
        attached 'keyval' file must contain that keyval as well as a
        'job_queued' key.
        """
        job = self._create_job(hosts=[1])
        job.run_verify = False
        job.reboot_before = models.RebootBefore.NEVER
        job.save()
        models.JobKeyval.objects.create(job=job, key='mykey', value='myvalue')

        self._run_dispatcher()
        self._finish_job(job.hostqueueentry_set.all()[0])

        # Directory the files are attached under; presumably
        # '<job id>-<owner>/<hostname>' for this fixture.
        job_dir = '1-my_user/host1'
        attached_files = self.mock_drone_manager.attached_files(job_dir)
        job_keyval_path = job_dir + '/keyval'
        self.assert_(job_keyval_path in attached_files, attached_files)
        keyval_contents = attached_files[job_keyval_path]
        # Each line is 'key=value'. Blank lines are skipped because splitting
        # them yields a single item, which dict() cannot unpack.
        keyval_dict = dict(line.strip().split('=', 1)
                           for line in keyval_contents.splitlines()
                           if line.strip())
        self.assert_('job_queued' in keyval_dict, keyval_dict)
        self.assertEquals(keyval_dict['mykey'], 'myvalue')


# Run the tests in this module when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()