Clean up test result when SubprocessError is raised.
[slapos.git] / slapos / recipe / erp5testnode / testnode.py
1 from xml_marshaller import xml_marshaller
2 import os, xmlrpclib, time, imp
3 from glob import glob
4 import signal
5 import slapos.slap
6 import subprocess
7 import sys
8 import socket
9 import pprint
10 import traceback
11 from SlapOSControler import SlapOSControler
12 import time
13
class SubprocessError(EnvironmentError):
    """Failure of an external command, described by a status dictionary.

    Every key of ``status_dict`` can be read as an attribute of the
    exception (``error.status_code`` is ``error.status_dict['status_code']``).
    ``__str__`` requires a ``status_code`` entry holding a number.
    """

    def __init__(self, status_dict):
        self.status_dict = status_dict

    def __getattr__(self, name):
        # __getattr__ is only consulted after regular attribute lookup
        # failed, so reaching it for 'status_dict' means __init__ has not
        # run: looking it up again would recurse without end.
        if name == 'status_dict':
            raise AttributeError(name)
        try:
            return self.status_dict[name]
        except KeyError:
            # The attribute protocol (hasattr, getattr with a default, copy,
            # pickle...) expects AttributeError for unknown names.
            raise AttributeError(name)

    def __str__(self):
        return 'Error %i' % self.status_code
21
22
23 from Updater import Updater
24
def log(message):
    """Write *message* to standard output, prefixed with the local time."""
    timestamp = time.strftime('%Y/%m/%d %H:%M:%S')
    print('%s %s' % (timestamp, message))
28
# Ids of the process groups to terminate on shutdown.
process_group_pid_set = set()
# Paths of files holding the pid of a process to terminate on shutdown.
process_pid_file_list = []
# Commands (as accepted by subprocess.call) to execute on shutdown.
process_command_list = []


def sigterm_handler(signal, frame):
    """Terminate every registered process, then exit with status 1.

    Installed as the SIGTERM handler of the test node.  SIGTERM is sent to
    each group of ``process_group_pid_set`` and to each pid read from
    ``process_pid_file_list``, then every command of ``process_command_list``
    is executed.  Cleanup is best effort: a failing step does not prevent
    the following ones.

    signal -- number of the received signal (unused)
    frame -- interrupted stack frame (unused)
    """
    # The ``signal`` argument hides the ``signal`` module inside this
    # function, so the constant has to be imported under its own name.
    from signal import SIGTERM
    for pgpid in process_group_pid_set:
        try:
            os.killpg(pgpid, SIGTERM)
        except Exception:
            pass
    for pid_file in process_pid_file_list:
        try:
            with open(pid_file) as pid_stream:
                pid = int(pid_stream.read().strip())
            os.kill(pid, SIGTERM)
        except Exception:
            pass
    for p in process_command_list:
        try:
            subprocess.call(p)
        except Exception:
            pass
    sys.exit(1)


signal.signal(signal.SIGTERM, sigterm_handler)
51
def safeRpcCall(function, *args):
    """Call ``function(*args)`` until it succeeds and return its result.

    After each failure the traceback and the arguments are logged, then the
    call is attempted again after a pause.  The pause starts at 64 seconds
    and grows by half of its value after every failure; there is no upper
    bound on the number of attempts.

    Only ``Exception`` subclasses trigger a retry: ``SystemExit`` (raised by
    the SIGTERM handler) and ``KeyboardInterrupt`` propagate, so the process
    can still be stopped while the remote side is unreachable.
    """
    retry = 64
    while True:
        try:
            return function(*args)
        except Exception:
            log('Error in RPC call: %s\n%s' % (
                traceback.format_exc(),
                pprint.pformat(args),
            ))
            time.sleep(retry)
            retry += retry >> 1
64
def getInputOutputFileList(config, command_name):
    """Open the two log files of a command and return them.

    Both files are created (or truncated) in ``config['log_directory']`` as
    ``<command_name>_out`` and ``<command_name>_err``, in read/write mode.
    The first line of the output file is the command name.

    Returns the ``(stdout, stderr)`` file objects; closing them is up to the
    caller.
    """
    log_directory = config['log_directory']
    stdout = open(os.path.join(log_directory, '%s_out' % (command_name, )),
                  'w+')
    try:
        stdout.write("%s\n" % command_name)
        # Push the header to the file now: data written straight to the
        # file descriptor (presumably by the subprocess receiving this file
        # -- verify against SlapOSControler) would otherwise come before it.
        stdout.flush()
        stderr = open(os.path.join(log_directory,
                                   '%s_err' % (command_name, )),
                      'w+')
    except Exception:
        # Do not leave the first file open when the second one fails.
        stdout.close()
        raise
    return (stdout, stderr)
74
# Module-level placeholder.  NOTE(review): run() assigns a *local* variable
# of the same name (it has no ``global`` statement), so nothing in this view
# ever rebinds this global -- confirm whether other code relies on it.
slapos_controler = None
76
def killPreviousRun(process_group_pid_set, supervisord_pid_file):
    """Send SIGTERM to what a previous run may have left behind.

    process_group_pid_set -- ids of the process groups to signal
    supervisord_pid_file -- path of a file containing the pid of a process
      to signal; a missing file is ignored

    Cleanup is best effort: errors (group already gone, unreadable or
    malformed pid file...) are ignored.  ``SystemExit`` and
    ``KeyboardInterrupt`` are not, so a termination request received during
    the cleanup is honoured.
    """
    for pgpid in process_group_pid_set:
        try:
            os.killpg(pgpid, signal.SIGTERM)
        except Exception:
            pass
    try:
        if os.path.exists(supervisord_pid_file):
            with open(supervisord_pid_file) as pid_stream:
                supervisord_pid = int(pid_stream.read().strip())
            os.kill(supervisord_pid, signal.SIGTERM)
    except Exception:
        pass
88
# Key giving, in the configuration or in a repository description, the path
# of the software profile relative to the repository root.
PROFILE_PATH_KEY = 'profile_path'


def run(args):
    """Main loop of the test node: update, install, instantiate, run tests.

    args -- sequence whose first item is the configuration dictionary.  Keys
      read here: run_directory, git_binary, working_directory,
      vcs_repository_list, profile_path (optional, removed from the
      dictionary), test_suite, test_suite_title, test_suite_master_url,
      test_node_title, project_title, environment, instance_root,
      partition_reference and node_quantity.  custom_profile_path and
      repository_path are added to it.

    Each item of vcs_repository_list is a dictionary with a ``url`` and,
    optionally, ``branch``, ``buildout_section_id`` and ``profile_path``;
    ``repository_id`` and ``repository_path`` are added to it.  Exactly one
    repository must end up with a ``profile_path``.

    The function loops forever.  A SubprocessError is handled inside the
    loop (reported to the master when a test result was created, then a
    120 seconds pause); any other exception leaves the function after
    SIGTERM was sent to the registered processes.

    Raises ValueError when profile_path is defined more than once or not at
    all.

    NOTE(review): block nesting was reconstructed from a listing that lost
    its indentation -- compare with the repository version before merging.
    """
    config = args[0]
    supervisord_pid_file = os.path.join(config['run_directory'],
                                        'supervisord.pid')
    # Turn off SSL certificate verification in the global git configuration
    # of the user running the test node.
    subprocess.check_call([config['git_binary'],
                           "config", "--global", "http.sslVerify", "false"])
    previous_revision = None

    # Write our own software.cfg to use the local repository
    custom_profile_path = os.path.join(config['working_directory'],
                                       'software.cfg')
    config['custom_profile_path'] = custom_profile_path
    vcs_repository_list = config['vcs_repository_list']
    assert len(vcs_repository_list), "we must have at least one repository"
    try:
        # BBB: Accept global profile_path, which is the same as setting it
        # for the first configured repository.
        profile_path = config.pop(PROFILE_PATH_KEY)
    except KeyError:
        pass
    else:
        vcs_repository_list[0][PROFILE_PATH_KEY] = profile_path
    profile_content = None
    # Repository sections are collected apart from the [buildout] section so
    # that a repository declaring a buildout_section_id may come before the
    # one declaring the profile path.
    section_content_list = []
    for vcs_repository in vcs_repository_list:
        url = vcs_repository['url']
        buildout_section_id = vcs_repository.get('buildout_section_id', None)
        # Default id: last component of the url, without its extension.
        repository_id = buildout_section_id or \
            url.split('/')[-1].split('.')[0]
        repository_path = os.path.join(config['working_directory'],
                                       repository_id)
        vcs_repository['repository_id'] = repository_id
        vcs_repository['repository_path'] = repository_path
        try:
            profile_path = vcs_repository[PROFILE_PATH_KEY]
        except KeyError:
            pass
        else:
            if profile_content is not None:
                raise ValueError(PROFILE_PATH_KEY + ' defined more than once')
            profile_content = """
[buildout]
extends = %(software_config_path)s
""" % {'software_config_path': os.path.join(repository_path, profile_path)}
        if buildout_section_id is not None:
            section_content_list.append("""
[%(buildout_section_id)s]
repository = %(repository_path)s
branch = %(branch)s
""" % {'buildout_section_id': buildout_section_id,
       'repository_path': repository_path,
       'branch': vcs_repository.get('branch', 'master')})

    if profile_content is None:
        raise ValueError(PROFILE_PATH_KEY + ' not defined')
    profile_content += ''.join(section_content_list)
    with open(custom_profile_path, 'w') as custom_profile:
        custom_profile.write(profile_content)
    # Path of the last configured repository.
    config['repository_path'] = repository_path
    sys.path.append(repository_path)
    test_suite_title = config['test_suite_title'] or config['test_suite']

    retry_software = False
    try:
        while True:
            # True between the creation of a test result on the master and
            # the moment the test runner takes over its update.
            remote_test_result_needs_cleanup = False
            try:
                # kill processes from previous loop if any
                killPreviousRun(process_group_pid_set, supervisord_pid_file)
                process_group_pid_set.clear()
                full_revision_list = []
                for vcs_repository in vcs_repository_list:
                    repository_path = vcs_repository['repository_path']
                    repository_id = vcs_repository['repository_id']
                    # Make sure we have local repository
                    if not os.path.exists(repository_path):
                        parameter_list = [config['git_binary'], 'clone',
                                          vcs_repository['url']]
                        if vcs_repository.get('branch') is not None:
                            parameter_list.extend(
                                ['-b', vcs_repository.get('branch')])
                        parameter_list.append(repository_path)
                        subprocess.check_call(parameter_list)
                    updater = Updater(repository_path,
                                      git_binary=config['git_binary'],
                                      log=log)
                    updater.checkout()
                    revision = "-".join(updater.getRevision())
                    full_revision_list.append(
                        '%s=%s' % (repository_id, revision))
                # Revision of the whole set: 'id=revision' items joined by
                # commas, in configuration order.
                revision = ','.join(full_revision_list)
                if previous_revision == revision:
                    log('Sleeping a bit')
                    time.sleep(120)
                    if not retry_software:
                        continue
                    log('Retrying install')
                retry_software = False
                previous_revision = revision

                portal_url = config['test_suite_master_url']
                test_result_path = None
                # Without master, behave as if it agreed on our revision.
                test_result = (test_result_path, revision)
                if portal_url:
                    if portal_url[-1] != '/':
                        portal_url += '/'
                    portal = xmlrpclib.ServerProxy(
                        "%s%s" % (portal_url, 'portal_task_distribution'),
                        allow_none=1)
                    master = portal.portal_task_distribution
                    assert master.getProtocolRevision() == 1
                    test_result = safeRpcCall(
                        master.createTestResult,
                        config['test_suite'], revision, [],
                        False, test_suite_title,
                        config['test_node_title'], config['project_title'])
                    remote_test_result_needs_cleanup = True
                log("testnode, test_result : %r" % (test_result, ))
                # NOTE(review): everything down to the end of the ``try``
                # body is assumed to be conditional on test_result.
                if test_result:
                    test_result_path, test_revision = test_result
                    if revision != test_revision:
                        log('Disagreement on tested revision, checking out:')
                        for i, repository_revision in enumerate(
                                test_revision.split(',')):
                            vcs_repository = vcs_repository_list[i]
                            repository_path = \
                                vcs_repository['repository_path']
                            # The commit id is the last '-' separated field:
                            # counting from the right keeps it correct when
                            # the repository id itself contains hyphens.
                            commit_id = repository_revision.rsplit('-', 1)[1]
                            # other testnodes on other boxes are already
                            # ready to test another revision
                            log(' %s at %s' % (repository_path, commit_id))
                            updater = Updater(
                                repository_path,
                                git_binary=config['git_binary'],
                                revision=commit_id)
                            updater.checkout()
                        # What is installed and tested from now on is the
                        # revision chosen by the master.
                        revision = test_revision

                    # Now prepare the installation of SlapOS and create
                    # instance
                    slapos_controler = SlapOSControler(
                        config,
                        process_group_pid_set=process_group_pid_set,
                        log=log)
                    for method_name in ("runSoftwareRelease",
                                        "runComputerPartition"):
                        stdout, stderr = getInputOutputFileList(config,
                                                                method_name)
                        slapos_method = getattr(slapos_controler, method_name)
                        status_dict = slapos_method(
                            config,
                            environment=config['environment'],
                            process_group_pid_set=process_group_pid_set,
                            stdout=stdout, stderr=stderr)
                        if status_dict['status_code'] != 0:
                            # Try again at next loop even if the revision
                            # did not change.
                            retry_software = True
                            raise SubprocessError(status_dict)

                    partition_path = os.path.join(
                        config['instance_root'],
                        config['partition_reference'])
                    run_test_suite_path = os.path.join(partition_path, 'bin',
                                                       'runTestSuite')
                    if not os.path.exists(run_test_suite_path):
                        raise SubprocessError({
                            'command': 'os.path.exists(run_test_suite_path)',
                            'status_code': 1,
                            'stdout': '',
                            'stderr': 'File does not exist: %r' % (
                                run_test_suite_path, ),
                        })

                    # Deal with Shebang size limitation: run the interpreter
                    # named on the first line explicitly.
                    with open(run_test_suite_path, 'r') as file_object:
                        line = file_object.readline()
                    invocation_list = []
                    if line[:2] == '#!':
                        invocation_list = line[2:].split()
                    invocation_list.extend([
                        run_test_suite_path,
                        '--test_suite', config['test_suite'],
                        '--revision', revision,
                        '--test_suite_title', test_suite_title,
                        '--node_quantity', config['node_quantity'],
                        '--master_url', config['test_suite_master_url']])
                    # From this point, test runner becomes responsible for
                    # updating test result.
                    # XXX: is it good for all cases (eg: test runner fails
                    # too early for any custom code to pick the failure up
                    # and react ?)
                    remote_test_result_needs_cleanup = False
                    run_test_suite = subprocess.Popen(invocation_list)
                    process_group_pid_set.add(run_test_suite.pid)
                    run_test_suite.wait()
                    process_group_pid_set.remove(run_test_suite.pid)
            except SubprocessError as e:
                if remote_test_result_needs_cleanup:
                    # Tell the master this node gave up on the test result
                    # it was attributed.
                    safeRpcCall(master.reportTaskFailure,
                                test_result_path, e.status_dict,
                                config['test_node_title'])
                time.sleep(120)

    finally:
        # Nice way to kill *everything* generated by run process -- process
        # groups working only in POSIX compliant systems
        # Exceptions are swallowed during cleanup phase
        log("going to kill %r" % (process_group_pid_set, ))
        killPreviousRun(process_group_pid_set, supervisord_pid_file)