testnode: update runTestSuite code according to recent changes in taskdistribution
[erp5.git] / erp5 / util / testnode / ScalabilityTestRunner.py
1 ##############################################################################
2 #
3 # Copyright (c) 2011 Nexedi SA and Contributors. All Rights Reserved.
4 #
5 # WARNING: This program as such is intended to be used by professional
6 # programmers who take the whole responsibility of assessing all potential
7 # consequences resulting from its eventual inadequacies and bugs
8 # End users who are looking for a ready-to-use solution with commercial
9 # guarantees and support are strongly advised to contract a Free Software
10 # Service Company
11 #
12 # This program is Free Software; you can redistribute it and/or
13 # modify it under the terms of the GNU General Public License
14 # as published by the Free Software Foundation; either version 3
15 # of the License, or (at your option) any later version.
16 #
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
21 #
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25 #
26 ##############################################################################
27 import datetime
28 import os
29 import subprocess
30 import sys
31 import time
32 import glob
33 import SlapOSControler
34 import SlapOSMasterCommunicator
35 import json
36 import time
37 import shutil
38 import logging
39 import string
40 import random
41 import Utils
42 from ProcessManager import SubprocessError, ProcessManager, CancellationError
43 from subprocess import CalledProcessError
44 from Updater import Updater
45 from erp5.util import taskdistribution
46 # for dummy slapos answer
47 import signal
48
# Maximum time, in seconds, to wait for an instance to reach a requested
# state: 2 hours
MAX_INSTANCE_TIME = 60*60*2
# Maximum time, in seconds, to wait for an instance to be registered on the
# SlapOS Master: 10 minutes
# NOTE(review): the previous comment said "5 minutes" while the value is 600
# seconds - confirm which one is intended.
MAX_CREATION_INSTANCE_TIME = 60*10
# Maximum duration, in seconds, of a single test case: 1 hour
MAX_TEST_CASE_TIME = 60*60
55
56 class ScalabilityTestRunner():
def __init__(self, testnode):
    """
    Bind the runner to a testnode and prepare the SlapOS account files.

    The SlapOS Master url is taken from the test suite portal and falls
    back to the testnode configuration ('server_url') when the portal
    gives no url or the query fails.

    testnode -- object providing log, working_directory, config and
                test_suite_portal
    """
    self.testnode = testnode
    self.log = self.testnode.log

    self.slapos_controler = SlapOSControler.SlapOSControler(
        self.testnode.working_directory,
        self.testnode.config,
        self.log)
    # Credentials used to create the slapos account configuration file
    key = self.testnode.test_suite_portal.getSlaposAccountKey()
    certificate = self.testnode.test_suite_portal.getSlaposAccountCertificate()

    # Get Slapos Master Url. Only regular errors trigger the fallback:
    # KeyboardInterrupt / SystemExit must not be swallowed here.
    try:
        self.slapos_url = self.testnode.test_suite_portal.getSlaposUrl()
    except Exception:
        self.slapos_url = ''
    if not self.slapos_url:
        self.slapos_url = self.testnode.config['server_url']

    # Get Slapos Master url used for api rest (using hateoas)
    self.slapos_api_rest_url = self.testnode.test_suite_portal.getSlaposHateoasUrl()

    self.log("SlapOS Master url is: %s" %self.slapos_url)
    self.log("SlapOS Master hateoas url is: %s" %self.slapos_api_rest_url)

    # The third returned value (configuration file path) is not used here
    self.key_path, self.cert_path, _ = \
        self.slapos_controler.createSlaposConfigurationFileAccount(
            key, certificate, self.slapos_url, self.testnode.config)
    self.slapos_communicator = None
    self.remaining_software_installation_dict = {}

    # Protection to prevent installation of softwares after checking
    self.authorize_supply = True
    self.authorize_request = False
    # Used to simulate SlapOS answers (filled by the SIGUSR1 handler)
    self.last_slapos_answer = []
    self.last_slapos_answer_request = []
95
96 def _prepareSlapOS(self, software_path, computer_guid, create_partition=0):
97 # create_partition is kept for compatibility
98 """
99 A proxy to supply : Install a software on a specific node
100 """
101 self.log("testnode, supply : %s %s", software_path, computer_guid)
102 if self.authorize_supply :
103 self.remaining_software_installation_dict[computer_guid] = software_path
104 self.slapos_controler.supply(software_path, computer_guid)
105 # Here make a request via slapos controler ?
106 return {'status_code' : 0}
107 else:
108 raise ValueError("Too late to supply now. ('self.authorize_supply' is False)")
109 return {'status_code' : 1}
110
111 def _generateInstanceTitle(self, test_suite_title):
112 """
113 Generate an instance title using various parameter
114 TODO : add some verification (to don't use unexisting variables)
115 """
116 instance_title = "Scalability-"
117 instance_title += "("+test_suite_title+")-"
118 instance_title += str(self.involved_nodes_computer_guid).replace("'","")
119 instance_title += "-"+str(datetime.datetime.now().isoformat())+"-"
120 instance_title += "timestamp="+str(time.time())
121 return instance_title
122
123 def _generateInstanceXML(self, software_configuration,
124 test_result, test_suite):
125 """
126 Generate a complete scalability instance XML configuration
127 """
128 config_cluster = software_configuration.copy()
129 config = {'cluster':config_cluster}
130 config.update({'scalability-launcher-computer-guid':self.launcher_nodes_computer_guid[0]})
131 config.update({'scalability-launcher-title':'MyTestNodeTitle'})
132 config.update({'test-result-path':test_result.test_result_path})
133 config.update({'test-suite-revision':test_result.revision})
134 config.update({'test-suite':test_suite})
135 config.update({'test-suite-master-url':self.testnode.config['test_suite_master_url']})
136 return config
137
138 def _createInstance(self, software_path, software_configuration, instance_title,
139 test_result, test_suite):
140 """
141 Create scalability instance
142 """
143 if self.authorize_request:
144 config = self._generateInstanceXML(software_configuration,
145 test_result, test_suite)
146 self.log("testnode, request : %s", instance_title)
147 config = json.dumps(config)
148 self.slapos_controler.request(instance_title, software_path,
149 "test", {"_" : config},
150 self.launcher_nodes_computer_guid[0])
151 self.authorize_request = False
152 return {'status_code' : 0}
153 else:
154 raise ValueError("Softwares release not ready yet to launch instan\
155 ces or already launched.")
156 return {'status_code' : 1}
157
158 def prepareSlapOSForTestNode(self, test_node_slapos=None):
159 """
160 We will build slapos software needed by the testnode itself,
161 """
162 if self.testnode.test_suite_portal.isMasterTestnode(
163 self.testnode.config['test_node_title']):
164 pass
165 return {'status_code' : 0}
166
167 # Dummy slapos answering
168 def _getSignal(self, signal, frame):
169 self.log("Dummy SlapOS Master answer received.")
170 self.last_slapos_answer.append(True)
171 def _prepareDummySlapOSAnswer(self):
172 self.log("Dummy slapOS answer enabled, send signal to %s (kill -10 %s) to simu\
173 late a SlapOS (positive) answer." %(str(os.getpid()),str(os.getpid()),))
174 signal.signal(signal.SIGUSR1, self._getSignal)
175 def _comeBackFromDummySlapOS(self):
176 self.log("Dummy slapOS answer disabled, please don't send more signals.")
177 # use SIG_USR (kill)
178 signal.signal(signal.SIGUSR1, signal.SIG_DFL)
179 def simulateSlapOSAnswer(self):
180 if len(self.last_slapos_answer)==0:
181 return False
182 else:
183 return self.last_slapos_answer.pop()
184 # /Dummy slapos answering
185
186 def isSoftwareReleaseReady(self, software_url, computer_guid):
187 """
188 Return true if the specified software on the specified node is installed.
189 This method should communicates with SlapOS Master.
190 """
191 # TODO : implement -> communication with SlapOS master
192 # this simulate a SlapOS answer
193 return self.simulateSlapOSAnswer()
194
195 def remainSoftwareToInstall(self):
196 """
197 Return True if it remains softwares to install, otherwise return False
198 """
199 # Remove from grid installed software entries
200 for computer_guid, software_path in self.remaining_software_installation_dict.items():
201 if self.isSoftwareReleaseReady(software_path, computer_guid):
202 del self.remaining_software_installation_dict[computer_guid]
203 # Not empty grid means that all softwares are not installed
204 return len(self.remaining_software_installation_dict) > 0
205
206 def _updateInstanceXML(self, software_configuration, instance_title,
207 test_result, test_suite):
208 """
209 Just a proxy to SlapOSControler.updateInstanceXML.
210 """
211 config = self._generateInstanceXML(software_configuration,
212 test_result, test_suite)
213 config = json.dumps(config)
214 self.log("testnode, updateInstanceXML : %s", instance_title)
215 self.slapos_controler.updateInstanceXML(instance_title, {"_" : config})
216 return {'status_code' : 0}
217
218 def _waitInstance(self, instance_title, state, max_time=MAX_INSTANCE_TIME):
219 """
220 Wait for 'max_time' an instance specific state
221 """
222 self.log("Wait for instance state: %s" %state)
223 start_time = time.time()
224 while (not self.slapos_communicator.isHostingSubscriptionReady(instance_title, state)
225 and (max_time > (time.time()-start_time))):
226 self.log("Instance(s) not in %s state yet." % state)
227 time.sleep(15)
228 if (time.time()-start_time) > max_time:
229 raise ValueError("Instance '%s' not '%s' after %s seconds" %(instance_title, state, str(time.time()-start_time)))
230 self.log("Instance correctly '%s' after %s seconds." %(state, str(time.time()-start_time)))
231
232 def _waitInstanceCreation(self, instance_title, max_time=MAX_CREATION_INSTANCE_TIME):
233 """
234 Wait for 'max_time' the instance creation
235 """
236 self.log("Wait for instance creation")
237 start_time = time.time()
238 while ( not self.slapos_communicator.isRegisteredHostingSubscription(instance_title) \
239 and (max_time > (time.time()-start_time)) ):
240 time.sleep(5)
241 if (time.time()-start_time) > max_time:
242 raise ValueError("Instance '%s' not found after %s seconds" %(instance_title, max_time))
243 self.log("Instance found on slapOSMaster")
244
def prepareSlapOSForTestSuite(self, node_test_suite):
    """
    Install testsuite softwares and request the scalability instance.

    Only the master testnode does the work: it fetches the cluster
    configuration from the portal, publishes the test suite directory
    under a randomized link, rewrites the software profile with the
    reachable address, orders the software installation on every
    involved node, waits for the installations and finally requests the
    instance.

    Returns {'status_code': 0} when the instance has been requested and
    found on the SlapOS Master, {'status_code': 1} when this testnode is
    not the master one, when the test suite is not launchable or when
    softwares are still not installed after the maximum time.
    Raises ValueError when the symbolic link cannot be created, when the
    instance cannot be launched or when it is not registered in time.
    """
    self.log('prepareSlapOSForTestSuite')
    # Define how many time this method can take
    max_time = 3600*10*1.0 # 10 hours
    interval_time = 60
    start_time = time.time()
    # Create a communicator with slapos
    self.log("creating SlapOs Master communicator...")
    self.slapos_communicator = SlapOSMasterCommunicator.SlapOSMasterCommunicator(
        self.cert_path,
        self.key_path,
        self.log,
        self.slapos_api_rest_url)
    # Only master testnode must order software installation
    if not self.testnode.test_suite_portal.isMasterTestnode(
            self.testnode.config['test_node_title']):
        return {'status_code' : 1}

    # Get from ERP5 Master the configuration of the cluster for the test
    test_configuration = Utils.deunicodeData(
        json.loads(self.testnode.test_suite_portal.generateConfiguration(
            node_test_suite.test_suite_title)))
    self.involved_nodes_computer_guid = test_configuration['involved_nodes_computer_guid']
    self.launchable = test_configuration['launchable']
    self.error_message = test_configuration['error_message']
    self.randomized_path = test_configuration['randomized_path']
    # Avoid the test if it is not launchable
    if not self.launchable:
        self.log("Test suite %s is not actually launchable with "
                 "the current cluster configuration." %(node_test_suite.test_suite_title,))
        self.log("ERP5 Master indicates : %s" %(self.error_message,))
        # error : wich code to return ?
        return {'status_code' : 1}

    configuration_list = test_configuration['configuration_list']
    node_test_suite.edit(configuration_list=configuration_list)
    self.launcher_nodes_computer_guid = test_configuration['launcher_nodes_computer_guid']

    # Create an obfuscated link to the testsuite directory
    path_to_suite = os.path.join(
        self.testnode.config['working_directory'],
        node_test_suite.reference)
    self.obfuscated_link_path = os.path.join(
        self.testnode.config['software_directory'],
        self.randomized_path)
    # lexists() is also true for broken links, so it covers exists()
    if not os.path.lexists(self.obfuscated_link_path):
        try:
            os.symlink(path_to_suite, self.obfuscated_link_path)
        except Exception:
            self.log("testnode, Unable to create symbolic link to the testsuite.")
            raise ValueError("testnode, Unable to create symbolic link to the testsuite.")
        self.log("testnode, Symbolic link (%s->%s) created."
                 %(self.obfuscated_link_path, path_to_suite))
    self.log("Sym link : %s %s" %(path_to_suite, self.obfuscated_link_path))

    # Construct the ipv6 obfuscated url of the software profile reachable from outside
    self.reachable_address = os.path.join(
        "https://","["+self.testnode.config['httpd_ip']+"]"+":"+self.testnode.config['httpd_software_access_port'],
        self.randomized_path)
    self.reachable_profile = os.path.join(self.reachable_address, "software.cfg")

    # Write the reachable address in the software.cfg file,
    # by replacing <obfuscated_url> occurences by the current reachable address.
    with open(node_test_suite.custom_profile_path, "r") as software_file:
        new_file_content = [
            line.replace('<obfuscated_url>', self.reachable_address)
            for line in software_file.readlines()]
    # The profile is removed and created again rather than overwritten
    os.remove(node_test_suite.custom_profile_path)
    with open(node_test_suite.custom_profile_path, "w") as software_file:
        software_file.writelines(new_file_content)
    self.log("Software reachable profile path is : %s "
             %(self.reachable_profile,))

    # Ask for SR installation
    for computer_guid in self.involved_nodes_computer_guid:
        self._prepareSlapOS(self.reachable_profile, computer_guid)
    # From the line below we would not supply any more softwares
    self.authorize_supply = False
    # TODO : remove the line below wich simulate an answer from slapos master
    self._prepareDummySlapOSAnswer()
    # Waiting until all softwares are installed
    while (self.remainSoftwareToInstall()
           and (max_time > (time.time()-start_time))):
        self.log("Master testnode is waiting "
                 "for the end of all software installation (for %ss) PID=%s.",
                 str(int(time.time()-start_time)), str(os.getpid()))
        time.sleep(interval_time)
    # TODO : remove the line below wich simulate an answer from slapos master
    self._comeBackFromDummySlapOS()
    if self.remainSoftwareToInstall():
        # All softwares are not installed, however maxtime is elapsed, that's a failure.
        return {'status_code' : 1}
    self.authorize_request = True
    self.log("Softwares installed")
    # Launch instance
    self.instance_title = self._generateInstanceTitle(node_test_suite.test_suite_title)
    try:
        self._createInstance(self.reachable_profile, configuration_list[0],
                             self.instance_title, node_test_suite.test_result,
                             node_test_suite.test_suite)
        self.log("Scalability instance requested.")
    except Exception:
        self.log("Unable to launch instance")
        raise ValueError("Unable to launch instance")
    self.log("Waiting for instance creation..")
    self._waitInstanceCreation(self.instance_title)
    return {'status_code' : 0}
361
362 def runTestSuite(self, node_test_suite, portal_url):
363 if not self.launchable:
364 self.log("Current test_suite is not actually launchable.")
365 return {'status_code' : 1} # Unable to continue due to not realizable configuration
366 configuration_list = node_test_suite.configuration_list
367 test_list = range(0, len(configuration_list))
368 # create test_result
369 test_result_proxy = self.testnode.portal.createTestResult(
370 node_test_suite.revision, test_list,
371 self.testnode.config['test_node_title'],
372 True, node_test_suite.test_suite_title,
373 node_test_suite.project_title)
374
375 count = 0
376 error = None
377
378 # Each cluster configuration are tested
379 for configuration in configuration_list:
380
381 # First configuration doesn't need XML configuration update.
382 if count > 0:
383 # Stop instance
384 self.slapos_controler.stopInstance(self.instance_title)
385 self._waitInstance(self.instance_title, 'stopped')
386 # Update instance XML configuration
387 self._updateInstanceXML(configuration, self.instance_title,
388 node_test_suite.test_result, node_test_suite.test_suite)
389 self._waitInstance(self.instance_title, 'started')
390 # Start instance
391 self.slapos_controler.startInstance(self.instance_title)
392
393 # XXX: Dirty hack used to force haproxy to restart in time
394 # with all zope informations.
395 self._waitInstance(self.instance_title, 'started')
396 self.slapos_controler.stopInstance(self.instance_title)
397 self._waitInstance(self.instance_title, 'stopped')
398 self.slapos_controler.startInstance(self.instance_title)
399 ##########################################################
400
401 self._waitInstance(self.instance_title, 'started')
402
403 # Start only the current test
404 exclude_list=[x for x in test_list if x!=test_list[count]]
405 count += 1
406 test_result_line_proxy = test_result_proxy.start(exclude_list)
407
408 #
409 if test_result_line_proxy == None :
410 error_message = "Test case already tested."
411 error = ValueError(error_message)
412 break
413
414 self.log("Test for count : %d is in a running state." %count)
415
416 # Wait for test case ending
417 test_case_start_time = time.time()
418 while test_result_line_proxy.isTestCaseAlive() and \
419 test_result_proxy.isAlive() and \
420 time.time() - test_case_start_time < MAX_TEST_CASE_TIME:
421 time.sleep(15)
422
423 # Max time limit reach for current test case: failure.
424 if test_result_line_proxy.isTestCaseAlive():
425 error_message = "Test case during for %s seconds, too long. (max: %s seconds). Test failure." \
426 %(str(time.time() - test_case_start_time), MAX_TEST_CASE_TIME)
427 error = ValueError(error_message)
428 test_result_proxy.reportFailure(stdout=error_message)
429 break
430
431 # Test cancelled or in an undeterminate state.
432 if not test_result_proxy.isAlive():
433 error_message = "Test cancelled or undeterminate state."
434 error = ValueError(error_message)
435 break
436
437 # Stop current instance
438 self.slapos_controler.stopInstance(self.instance_title)
439 self._waitInstance(self.instance_title, 'stopped')
440
441 # Delete old instances
442 self._cleanUpOldInstance()
443
444 # If error appears then that's a test failure.
445 if error:
446 test_result_proxy.fail()
447 raise error
448 #else:
449 #test_result_proxy.stop()
450 return {'status_code' : 0}
451
452 def _cleanUpOldInstance(self):
453 self.log("_cleanUpOldInstance")
454
455 # Get title and link list of all instances
456 instance_dict = self.slapos_communicator.getHostingSubscriptionDict()
457 instance_to_delete_list = []
458 outdated_date = datetime.datetime.fromtimestamp(time.time()) - datetime.timedelta(days=2)
459
460 # Select instances to delete
461 for title,link in instance_dict.items():
462 # Instances created by testnode contains "Scalability-" and
463 # "timestamp=" in the title.
464 if "Scalability-" in title and "timestamp=" in title:
465 # Get timestamp of the instance creation date
466 foo, timestamp = title.split("timestamp=")
467 creation_date = datetime.datetime.fromtimestamp(float(timestamp))
468 # Test if instance is older than the limit
469 if creation_date < outdated_date:
470 instance_to_delete_list.append((title,link))
471
472 for title,link in instance_to_delete_list:
473 # Get instance information
474 instance_information_dict = self.slapos_communicator.getHostingSubscriptionInformationDict(title)
475 # Delete instance
476 if instance_information_dict:
477 if instance_information_dict['status'] != 'destroyed':
478 self.slapos_controler.request(
479 instance_information_dict['title'],
480 instance_information_dict['software_url'],
481 software_type=instance_information_dict['software_type'],
482 computer_guid=instance_information_dict['computer_guid'],
483 state='destroyed'
484 )
485 self.log("Instance '%s' deleted." %instance_information_dict['title'])
486
487 def _cleanUpNodesInformation(self):
488 self.involved_nodes_computer_guid = []
489 self.launcher_nodes_computer_guid = []
490 self.remaining_software_installation_dict = {}
491 self.authorize_supply = True
492 self.authorize_request = False
493
494 def getRelativePathUsage(self):
495 """
496 Used by the method testnode.constructProfile() to know
497 if the software.cfg have to use relative path or not.
498 """
499 return True