summaryrefslogtreecommitdiff
path: root/scheduler/monitor_queue
blob: 513f0a73004d81b0ed56d78d933cd462a8a450c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/python -u
# monitor_queue <spool_directory> <resultsdir> [<conmux_server>]
import os
import sys
import tempfile
import time
import traceback
from subprocess import *

if len(sys.argv) < 3:
	print("Usage: monitor_queue <spool_directory> <resultsdir> [<conmux_server>]")
	sys.exit(1)
(spooldir, resultsdir) = [os.path.abspath(p) for p in sys.argv[1:3]]

# The optional conmux server follows the two directories, i.e. argv[3].
# It is embedded as server=... in generated autoserv control files.
if len(sys.argv) > 3:
	console = sys.argv[3]
else:
	console = None

# Validate both directories before reading anything out of the spool.
if not os.path.exists(spooldir):
	print("spooldir %s does not exist" % spooldir)
	sys.exit(1)
if not os.path.exists(resultsdir):
	print("resultsdir %s does not exist" % resultsdir)
	sys.exit(1)

# The queue's machines are listed one per line in <spooldir>/.machines
# (blank lines ignored); without that file the queue directory is assumed
# to be named after its single machine.
queue_name = os.path.basename(spooldir)
dotmachines = os.path.join(spooldir, '.machines')
if os.path.exists(dotmachines):
	machines_file = open(dotmachines)
	try:
		machines = [l.strip() for l in machines_file if l.strip()]
	finally:
		machines_file.close()
	if not machines:
		# An empty list would later break machines[0] and "-m ''"
		print("%s lists no machines" % dotmachines)
		sys.exit(1)
else:
	print("No .machines file in %s, assuming queue name is a machine"
	      % queue_name)
	machines = [queue_name]


##### Control file templates #####
# Autoserv control file used when the queue has more than one machine:
# it installs autotest on every host and runs the client control file on
# all of them in parallel, writing each host's results to
# <results>/<hostname>.  The three %s placeholders are filled, in order,
# with repr() of the conmux server, the client control file path and the
# results directory (see __create_autoserv_wrapper).
# NOTE(review): `hosts`, `autotest`, `machines`, `parallel`, `subcommand`
# and `os` are presumably supplied by autoserv's control-file namespace --
# confirm.  The first statement rebinds the name `hosts` from that module
# to the list of host objects.
SERV_MULTI = """# monitor_queue generated autoserv file (SERV_MULTI template)
hosts = [hosts.ConmuxSSHHost(hostname, server=%s)
		for hostname in machines]

at = autotest.Autotest()

control_path = %s
results = %s

def install_run(host):
	at.install(host)
	host_results = os.path.join(results, host.hostname)
	at.run(control_path, host_results, host)

parallel([subcommand(install_run, [host]) for host in hosts])"""


# Autoserv control file used when the queue has a single machine: it
# installs autotest on machines[0] and runs the client control file there,
# writing results directly into the results directory.  Placeholders are
# the same as SERV_MULTI (server, control path, results dir).
SERV_SINGLE = """# monitor_queue generated autoserv file (SERV_SINGLE template)
host = hosts.ConmuxSSHHost(machines[0], server=%s)
		
at = autotest.Autotest()

control_path = %s
results = %s

at.install(host)
at.run(control_path, results, host)"""

##### End control file templates #####

def pick_job(jobs):
	"""Pick the next job to run from the spooled control files.

	This is plain FIFO: the file with the oldest modification time wins.
	This would be the place to add prioritization.

	Args:
	  jobs: List of control file paths (relative to the current
	    directory, or absolute) to choose from.

	Return:
	  The chosen path, or None if jobs is empty.
	"""
	if not jobs:
		return None
	# Ascending mtime, so element 0 is the oldest job (FIFO order).
	by_age = sorted(jobs, key=lambda path: os.stat(path).st_mtime)
	return by_age[0]


def __create_autoserv_wrapper(template, control_path, results, server=None):
	"""Write a temporary autoserv control file wrapping a client file.

	Args:
	  template: SERV_MULTI or SERV_SINGLE, or any string with three %s
	    placeholders for the server, control path and results dir, in
	    that order.
	  control_path: Path of the Autotest client control file to run.
	  results: Results directory handed to the wrapped run.
	  server: conmux server to embed.  None (the default) falls back to
	    the module-level `console` setting.

	Return:
	  Path of the newly created temporary file; the caller must delete it.
	"""
	if server is None:
		server = console
	# repr() makes each value a valid Python literal inside the generated
	# control file.
	text = template % tuple([repr(s) for s in (server, control_path,
						   results)])

	tmpfd, tmpname = tempfile.mkstemp()
	try:
		tmp = os.fdopen(tmpfd, 'w')
		try:
			tmp.write(text + '\n')
		finally:
			# Close explicitly so the contents are flushed before
			# autoserv is started on this file.
			tmp.close()
	except Exception:
		# Don't leave a half-written temp file behind
		os.unlink(tmpname)
		raise
	return tmpname
	

def run_job(control):
	"""Run one control file from the spool directory through autoserv.

	Args:
	  control: Name of a control file, relative to spooldir.  Unless the
	    name ends with '.srv' it is treated as an Autotest client control
	    file and wrapped in a temporary autoserv control file (SERV_MULTI
	    when the queue has several machines, SERV_SINGLE otherwise).  A
	    '.srv' file is handed to autoserv unchanged.

	Results go to resultsdir/<control>, with a timestamp suffix if that
	directory already exists.  Autoserv's combined stdout and stderr are
	written to <results>/debug/server.log.

	Return:
	  The autoserv exit status as reported by Popen.wait(): the exit
	  code, or -N if the process was killed by signal N.
	"""
	# Make sure all the output directories are set up
	results = os.path.join(resultsdir, control)
	if os.path.exists(results):
		renamed = "%s.%d" % (results, int(time.time()))
		print("Resultsdir %s already present, changing to %s"
		      % (results, renamed))
		results = renamed
	os.mkdir(results)
	debug = os.path.join(results, 'debug')
	os.mkdir(debug)

	# Autoserv (.srv) files run as-is; anything else is an Autotest
	# client control file that needs a temporary autoserv wrapper.
	is_autoserv_ctl = control.endswith('.srv')
	control_path = os.path.abspath(os.path.join(spooldir, control))
	if not is_autoserv_ctl:
		if len(machines) > 1:
			# Run the AT file on all machines in the queue in parallel
			template = SERV_MULTI
		else:
			# Run the AT file on the single machine
			template = SERV_SINGLE
		control_path = __create_autoserv_wrapper(template,
							 control_path,
							 results)

	exedir = os.path.abspath(os.path.dirname(sys.argv[0]))
	autoserv_exe = os.path.abspath(os.path.join(exedir, '..', 'server',
						    'autoserv'))
	# An argument list (no shell) keeps paths and machine names that
	# contain spaces or shell metacharacters intact.
	autoserv_args = [autoserv_exe, '-m', ','.join(machines), control_path]

	print("Starting job: %s" % control)
	print(' '.join(autoserv_args))

	try:
		autoserv_log = open(os.path.join(debug, 'server.log'), 'w')
		try:
			p = Popen(autoserv_args, stdout=autoserv_log,
				  stderr=STDOUT)
			# Popen.wait() yields the exit code; os.waitpid() would
			# return the raw wait status (exit code << 8).
			ret = p.wait()
		finally:
			autoserv_log.close()
	finally:
		# Remove the temporary wrapper even if autoserv failed to start
		if not is_autoserv_ctl:
			os.unlink(control_path)

	print("Completed job: %s (%d) " % (control, ret))
	return ret
	

# Main loop: poll the spool directory and run queued jobs one at a time,
# oldest first.  Each job file is removed from the spool once it has run.
os.chdir(spooldir)
print("monitoring spool directory: " + spooldir)
while True:
	# Dot-files (e.g. .machines) hold queue configuration, not jobs, and
	# subdirectories are not control files either.
	jobs = [j for j in os.listdir(spooldir)
		if not j.startswith('.')
		and os.path.isfile(os.path.join(spooldir, j))]
	next_job = pick_job(jobs)
	if not next_job:
		time.sleep(10)
		continue
	try:
		run_job(next_job)
	except (KeyboardInterrupt, SystemExit):
		raise
	except Exception:
		# One broken job must not take down the whole queue monitor
		print("Job %s failed:" % next_job)
		traceback.print_exc()
	# Dequeue the job whether or not it succeeded, so a failing job is
	# not retried forever.
	try:
		os.remove(next_job)
	except OSError:
		# The job file may already have been removed by hand
		pass