testnode: update runTestSuite code according to recent changes in taskdistribution
[erp5.git] / erp5 / util / taskdistribution / __init__.py
1 ##############################################################################
2 #
3 # Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
4 #
5 # WARNING: This program as such is intended to be used by professional
6 # programmers who take the whole responsibility of assessing all potential
7 # consequences resulting from its eventual inadequacies and bugs
8 # End users who are looking for a ready-to-use solution with commercial
9 # guarantees and support are strongly advised to contract a Free Software
10 # Service Company
11 #
12 # This program is Free Software; you can redistribute it and/or
13 # modify it under the terms of the GNU General Public License
14 # as published by the Free Software Foundation; either version 3
15 # of the License, or (at your option) any later version.
16 #
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
21 #
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25 #
26 ##############################################################################
27 """
28 Client implementation for portal_task_distribution.
29
30 Example use:
31 import erp5.util.taskdistribution
32 tool = erp5.util.taskdistribution.TaskDistributionTool(...)
33 test_result = tool.createTestResult(...)
34 test_result.addWatch('foo', open('foo'))
35 while True:
36 test_line = test_result.start()
37 if not test_line:
38 break
39 # Run the test_line.name test
40 test_line.stop()
41 """
42 import httplib
43 import logging
44 import select
45 import socket
46 import threading
47 import time
48 import xmlrpclib
49
__all__ = ['TaskDistributionTool', 'TestResultProxy', 'TestResultLineProxy', 'patchRPCParser']

# Exceptions an RPC call may raise on transient network or server trouble;
# RPCRetry._retryRPC retries on them. The exact set depends on the xmlrpc
# parser backend in use, so the parser-specific error is added below.
SAFE_RPC_EXCEPTION_LIST = [
  socket.error,
  xmlrpclib.ProtocolError,
  xmlrpclib.Fault,
  httplib.BadStatusLine,
  httplib.ResponseNotReady,
]
parser, _ = xmlrpclib.getparser()
if not (xmlrpclib.ExpatParser and isinstance(parser, xmlrpclib.ExpatParser)):
  # Unknown backend: its parse errors cannot be listed, so warn about it.
  import sys
  print >> sys.stderr, 'Warning: unhandled xmlrpclib parser %r, some ' \
    'exceptions might get through safeRpcCall' % (parser, )
  del sys
else:
  SAFE_RPC_EXCEPTION_LIST.append(xmlrpclib.expat.ExpatError)
# "except" clauses need a tuple of classes, not a list.
SAFE_RPC_EXCEPTION_LIST = tuple(SAFE_RPC_EXCEPTION_LIST)
# Do not leave the probe parser in the module namespace.
del parser, _
65
def null_callable(*args, **kw):
  """Accept any positional and keyword arguments, do nothing, return None."""
  return None
68
class NullLogger(object):
  """
  Logger stand-in which discards everything.

  Any attribute not found the normal way (info, warning, error, ...) is
  resolved to null_callable, which ignores its arguments.
  """
  def __getattr__(self, name):
    # Every unknown attribute maps to the same no-op function.
    return null_callable

# Shared instance, substituted when no logger is given.
null_logger = NullLogger()
73
74 def patchRPCParser(error_handler):
75 """
76 Patch xmlrpcmlib's parser class, so it logs data content in case of errors,
77 to ease debugging.
78 Warning: this installs a monkey patch on a generic class, so it's last
79 comes wins. Must *not* be enabled by default.
80
81 error_handler (callable)
82 Receives the erroneous data as first parameter, and the exception
83 instance as second parameter.
84 If it returns a false value (ie, handler did not recover from the error),
85 exception is re-raised.
86 """
87 parser, _ = xmlrpclib.getparser()
88 parser_klass = parser.__class__
89 original_feed = parser_klass.feed
90 def verbose_feed(self, data):
91 try:
92 return original_feed(self, data)
93 except Exception, exc:
94 if not error_handler(data, exc):
95 raise
96 parser_klass.feed = verbose_feed
97
class RPCRetry(object):
  """
  Base class for objects calling methods on a remote proxy.

  _RPC performs a single call; _retryRPC keeps calling until no exception
  from SAFE_RPC_EXCEPTION_LIST is raised, sleeping between attempts with a
  delay which starts at retry_time seconds and grows by 50% each time.
  """
  def __init__(self, proxy, retry_time, logger):
    super(RPCRetry, self).__init__()
    self._proxy = proxy
    self._retry_time = retry_time
    self._logger = logger

  def _RPC(self, func_id, args=()):
    remote_method = getattr(self._proxy, func_id)
    return remote_method(*args)

  def _retryRPC(self, func_id, args=()):
    delay = self._retry_time
    while 1:
      try:
        return self._RPC(func_id, args)
      except SAFE_RPC_EXCEPTION_LIST:
        # Logged from within the except clause so exc_info is available.
        self._logger.warning('Got exception, retrying: %s%r '
          'in %is', func_id, tuple(args), delay, exc_info=1)
      time.sleep(delay)
      delay *= 1.5
118
class TestResultLineProxy(RPCRetry):
  """
  Represents a single test in a suite.

  Properties:
    name (str) (ro)
      Test name, as provided to TaskDistributionTool.createTestResult .
  """
  def __init__(self, proxy, retry_time, logger, test_result_line_path,
      test_name):
    super(TestResultLineProxy, self).__init__(proxy, retry_time, logger)
    self._test_result_line_path = test_result_line_path
    self._name = test_name

  def __repr__(self):
    return '<%s(%r, %r) at %x>' % (self.__class__.__name__,
      self._test_result_line_path, self._name, id(self))

  @property
  def name(self):
    return self._name

  def isTestCaseAlive(self):
    """
    Tell if test result line is still alive on site.

    Transient RPC errors are retried by _retryRPC; any other failure is
    logged and reported as ValueError.
    """
    try:
      return bool(self._retryRPC('isTestCaseAlive',
        [self._test_result_line_path]))
    except Exception:
      # Unlike a bare except, let KeyboardInterrupt/SystemExit through, and
      # keep a trace of the real cause, which the ValueError hides.
      self._logger.warning('isTestCaseAlive failed for %r',
        self._test_result_line_path, exc_info=1)
      raise ValueError('isTestCaseAlive Failed.')

  def stop(self, test_count=None, error_count=None, failure_count=None,
      skip_count=None, duration=None, date=None, command=None,
      stdout=None, stderr=None, html_test_result=None, **kw):
    """
    Notify server of test completion ('stopUnitTest' RPC).

    Only parameters which are not None are sent to the server.
    Without any parameter, notifies of a test failure which prevents any
    precise reading (step count, how many succeeded, etc).

    BBB: extra named arguments are deprecated (if some are really needed,
    they must be declared as explicit parameters, with proper default
    value).
    """
    status_dict = dict(x for x in (
      ('test_count', test_count),
      ('error_count', error_count),
      ('failure_count', failure_count),
      ('skip_count', skip_count),
      ('duration', duration),
      ('date', date),
      ('command', command),
      ('stdout', stdout),
      ('stderr', stderr),
      ('html_test_result', html_test_result),
    ) if x[1] is not None)
    if kw:
      self._logger.info('Extra parameters provided: %r', kw)
      status_dict.update(kw)
    self._retryRPC('stopUnitTest', (self._test_result_line_path,
      status_dict))
180
class TestResultProxy(RPCRetry):
  """
  Represents a test suite run.

  Allows fetching work to do (eg a single test in an entire run), monitoring
  log files, informing server of problems and monitoring server-side
  cancellation.

  Properties
    watcher_period (float) (rw)
      How long log watcher sleeps between successive uploading latest data
      chunks.
    revision (str) (ro)
      Revision to test. Might be different from the revision requested, when a
      test batch is running on an older revision.
    test_result_path (str) (ro)
      Server-side path identifying this test result.
  """
  _watcher_can_run = True
  _watcher_thread = None

  def __init__(self, proxy, retry_time, logger, test_result_path, node_title,
      revision):
    super(TestResultProxy, self).__init__(proxy, retry_time, logger)
    self._test_result_path = test_result_path
    self._node_title = node_title
    self._revision = revision
    self._watcher_period = 60
    self._watcher_dict = {}
    self._watcher_condition = threading.Condition()

  def __repr__(self):
    return '<%s(%r, %r, %r) at %x>' % (self.__class__.__name__,
      self._test_result_path, self._node_title, self._revision, id(self))

  @property
  def test_result_path(self):
    return self._test_result_path

  @property
  def revision(self):
    return self._revision

  def start(self, exclude_list=()):
    """
    Ask for a test to run, among the list of tests composing this test
    result, skipping names in exclude_list.
    Return an TestResultLineProxy instance, or None if there is nothing to
    do.
    """
    result = self._retryRPC('startUnitTest', (self._test_result_path,
      exclude_list))
    if result:
      line_url, test_name = result
      result = TestResultLineProxy(self._proxy, self._retry_time,
        self._logger, line_url, test_name)
    return result

  def reportFailure(self, date=None, command=None, stdout=None, stderr=None):
    """
    Report a test-node-level problem, preventing the test from continuing
    on this node. Also stops log watching.
    """
    self._stopWatching()
    status_dict = {
      'date': date,
    }
    if command is not None:
      status_dict['command'] = command
    if stdout is not None:
      status_dict['stdout'] = stdout
    if stderr is not None:
      status_dict['stderr'] = stderr
    self._retryRPC('reportTaskFailure', args=(self._test_result_path,
      status_dict, self._node_title))

  def reportStatus(self, command, stdout, stderr):
    """
    Report some progress.

    Used internally by file monitoring, you shouldn't have to use this
    directly. Best effort: transient RPC errors are logged, not retried.
    """
    try:
      self._RPC('reportTaskStatus', (self._test_result_path, {
        'command': command,
        'stdout': stdout,
        'stderr': stderr,
      }, self._node_title))
    except SAFE_RPC_EXCEPTION_LIST:
      self._logger.warning('Got exception in reportTaskStatus, giving up',
        exc_info=1)

  def isAlive(self):
    """
    Tell if test is still alive on site.

    Useful to probe for test cancellation by user, so a new test run can
    be started without waiting for current one to finish.
    On transient RPC errors, the test is assumed to be alive.
    """
    try:
      return self._RPC('isTaskAlive', (self._test_result_path, ))
    except SAFE_RPC_EXCEPTION_LIST:
      self._logger.warning('Got exception in isTaskAlive, assuming alive',
        exc_info=1)
      return 1

  @property
  def watcher_period(self):
    return self._watcher_period

  @watcher_period.setter
  def watcher_period(self, period):
    cond = self._watcher_condition
    with cond:
      self._watcher_period = period
      # Wake the watcher so the new period applies immediately.
      cond.notify()

  def addWatch(self, name, stream, max_history_bytes=None):
    """
    Monitor given file, sending a few latest lines to remote server.
    name (any)
      Arbitrary identifier for stream. Must be usable as a dict key.
    stream (file object)
      File to monitor from its current offset.
    max_history_bytes (int, None)
      How many bytes to send to remote server at most for each wakeup.
      If None, send all lines.

    Raises ValueError if name is already watched.
    """
    watcher_dict = self._watcher_dict
    if name in watcher_dict:
      raise ValueError('Name already known: %r' % (name, ))
    was_empty = not watcher_dict
    # Register the stream before starting the thread, so its first report
    # already includes it.
    watcher_dict[name] = (stream, max_history_bytes)
    if was_empty:
      self._startWatching()

  def removeWatch(self, name):
    """
    Stop monitoring given stream. Stops the watcher thread (waiting for it)
    when no stream is left.
    """
    watcher_dict = self._watcher_dict
    del watcher_dict[name]
    if not watcher_dict:
      self._stopWatching()

  def _startWatching(self):
    if self._watcher_thread is not None:
      raise ValueError('Thread already started')
    # Re-arm the flag, in case a previous watcher was stopped.
    with self._watcher_condition:
      self._watcher_can_run = True
    self._watcher_thread = thread = threading.Thread(target=self._watcher)
    thread.daemon = True
    thread.start()

  def _watcher(self):
    cond = self._watcher_condition
    while self._watcher_can_run and self.isAlive():
      caption_list = []
      append = caption_list.append
      # Iterate over a snapshot: addWatch/removeWatch may modify the dict
      # from another thread meanwhile.
      for name, (stream, max_history_bytes) in \
          list(self._watcher_dict.items()):
        append('==> %s <==' % (name, ))
        start = stream.tell()
        stream.seek(0, 2)
        end = stream.tell()
        if start == end:
          caption = time.strftime(
            '(no new lines at %Y/%m/%d %H:%M:%S)', time.gmtime())
        else:
          to_read = end - start
          if to_read < 0:
            # File got truncated, treat the whole content as new.
            to_read = end
          if max_history_bytes is not None:
            to_read = min(to_read, max_history_bytes)
          stream.seek(-to_read, 1)
          caption = stream.read(to_read)
        append(caption)
      self.reportStatus('', '\n'.join(caption_list), '')
      with cond:
        # Checking the flag under the lock guarantees a notify() sent by
        # _stopWatching before we started waiting is not lost.
        if self._watcher_can_run:
          cond.wait(self._watcher_period)

  def _stopWatching(self):
    cond = self._watcher_condition
    with cond:
      self._watcher_can_run = False
      cond.notify()
    thread = self._watcher_thread
    if thread is not None:
      thread.join()
      # Forget the finished thread so watching can be started again.
      self._watcher_thread = None

  def stop(self):
    """
    Tell the server to stop this test result ('stopTest' RPC), retrying on
    transient errors. Return the server's answer.
    """
    return self._retryRPC('stopTest', [self._test_result_path])

  def fail(self):
    """
    Tell the server this test result failed ('failTest' RPC), retrying on
    transient errors. Return the server's answer.
    """
    return self._retryRPC('failTest', [self._test_result_path])
377
class TestResultProxyProxy(TestResultProxy):
  """
  A TestResultProxy which opens its own XML-RPC connection to
  test_suite_master_url (its portal_task_distribution object), instead of
  receiving an existing proxy.
  """
  def __init__(self, test_suite_master_url, retry_time, logger, test_result_path,
      node_title, revision):
    # Same convention as TaskDistributionTool: None means "do not log".
    if logger is None:
      logger = null_logger
    try:
      proxy = ServerProxy(
        test_suite_master_url,
        allow_none=True,
      ).portal_task_distribution
    except Exception:
      # Narrower than a bare except: KeyboardInterrupt/SystemExit propagate.
      raise ValueError("Cannot instanciate ServerProxy")
    super(TestResultProxyProxy, self).__init__(proxy, retry_time, logger,
      test_result_path, node_title, revision)

  def getRunningTestCase(self):
    """
    Ask the server ('getRunningTestCase' RPC, retried on transient errors)
    for the test case of this test result which is in the running state.
    Return the relative path of that test.
    """
    return self._retryRPC('getRunningTestCase', [self._test_result_path])
400
class ServerProxy(xmlrpclib.ServerProxy):
  """
  xmlrpclib.ServerProxy which sets a timeout of 120 on every connection its
  transport creates, and serializes requests made through one instance with
  a lock.

  NOTE: this relies on Python private-name mangling. Because this class is
  also named "ServerProxy", "self.__transport" and "__request" below mangle
  to "_ServerProxy__transport" / "_ServerProxy__request": the transport is
  read without ever being assigned here (so it must be set by the base
  constructor), and __request replaces the base class' private method of the
  same mangled name. Renaming this class would break both.
  """

  def __init__(self, *args, **kw):
    xmlrpclib.ServerProxy.__init__(self, *args, **kw)
    # Transport created by the base constructor (see NOTE above).
    transport = self.__transport
    def make_connection(*args, **kw):
      # Build the connection with the transport class' own implementation,
      # then force a timeout on it (seconds, per httplib's convention).
      conn = transport.__class__.make_connection(transport, *args, **kw)
      # BBB: On Python < 2.7, HTTP connection is wrapped
      c = getattr(conn, '_conn', conn)
      # NOTE(review): assert is stripped under "python -O".
      assert hasattr(c, 'timeout')
      c.timeout = 120
      return conn
    # Shadows the class method on this transport instance only.
    transport.make_connection = make_connection
    self.__rpc_lock = threading.Lock()

  def __request(self, *args, **kw):
    # Overrides the base class' private request method (see NOTE above);
    # the lock lets only one request per proxy instance run at a time.
    with self.__rpc_lock:
      return xmlrpclib.ServerProxy.__request(self, *args, **kw)
419
class TaskDistributionTool(RPCRetry):
  """
  Client for the portal_task_distribution object of an ERP5 site, used to
  create test results.
  """
  def __init__(self, portal_url, retry_time=64, logger=None):
    """
    portal_url (str, None)
      Portal URL of ERP5 site to use as a task distributor.
      If None, single node setup is assumed.
    retry_time (number)
      Initial delay, in seconds, before retrying an RPC which failed with a
      transient error.
    logger (logger, None)
      Where to log; if None, messages are discarded.

    Raises ValueError if the server speaks another protocol revision than 1.
    """
    if logger is None:
      logger = null_logger
    if portal_url is None:
      proxy = DummyTaskDistributionTool()
    else:
      proxy = ServerProxy(
        portal_url,
        allow_none=True,
      ).portal_task_distribution
    super(TaskDistributionTool, self).__init__(proxy, retry_time, logger)
    protocol_revision = self._retryRPC('getProtocolRevision')
    if protocol_revision != 1:
      # Format the message: passing the value as a second ValueError
      # argument would leave "%r" unexpanded.
      raise ValueError('Unsupported protocol revision: %r'
        % (protocol_revision, ))

  def createTestResult(self, revision, test_name_list, node_title,
      allow_restart=False, test_title=None, project_title=None):
    """
    (maybe) create a new test run.
    revision (str)
      An opaque string describing code being tested.
    test_name_list (list of str)
      List of tests being part of this test run. May be empty.
    node_title (str)
      Human-readable test node identifier, so an admin can know which
      node does what.
    allow_restart (bool)
      When true, a test result is always created, even if a former finished
      one is found for same name and revision pair.
    test_title (str)
      Human-readable title for test. Must be identical for successive runs.
      Allows browsing its result history.
    project_title (str)
      Existing project title, so test result gets associated to it.

    Returns None if no test run is needed (a test run for given name and
    revision has already been completed).
    Otherwise, returns a TestResultProxy instance, whose revision is the one
    returned by the server.
    """
    result = self._retryRPC('createTestResult', ('', revision,
      test_name_list, allow_restart, test_title, node_title,
      project_title))
    if result:
      test_result_path, revision = result
      result = TestResultProxy(self._proxy, self._retry_time,
        self._logger, test_result_path, node_title, revision)
    return result
474
class TaskDistributor(RPCRetry):
  """
  Client for distributor-level server methods: test suite assignment, node
  subscription, configuration generation and SlapOS account information.
  """

  def __init__(self, portal_url, retry_time=64, logger=None):
    """
    portal_url (str, None)
      URL of the task distributor. If None, a local
      DummyTaskDistributionTool is used instead of a remote server.
    retry_time (number)
      Initial delay, in seconds, before retrying an RPC which failed with a
      transient error.
    logger (logger, None)
      Where to log; if None, messages are discarded.

    Raises ValueError if the server speaks another protocol revision than 1.
    """
    if logger is None:
      logger = null_logger
    if portal_url is None:
      proxy = DummyTaskDistributionTool()
    else:
      proxy = ServerProxy(portal_url, allow_none=True)
    super(TaskDistributor, self).__init__(proxy, retry_time, logger)
    protocol_revision = self._retryRPC('getProtocolRevision')
    if protocol_revision != 1:
      # Format the message: passing the value as a second ValueError
      # argument would leave "%r" unexpanded.
      raise ValueError('Unsupported protocol revision: %r'
        % (protocol_revision, ))

  def startTestSuite(self, node_title):
    """
    Returns None if no test suite is needed.
    Otherwise, returns a JSON with all the test suite parameters.
    """
    result = self._retryRPC('startTestSuite', (node_title,))
    return result

  def getTestType(self):
    """
    Return the Test Type
    """
    result = self._retryRPC('getTestType')
    return result

  def subscribeNode(self, node_title, computer_guid):
    """
    Subscribes node with the node title and the computer guid.
    """
    self._retryRPC('subscribeNode', (node_title, computer_guid,))

  def generateConfiguration(self, test_suite_title):
    """
    Generates a configuration from a test_suite_title
    """
    return self._retryRPC('generateConfiguration', (test_suite_title,))

  def isMasterTestnode(self, test_node_title):
    """
    Tell whether the given test node is the master one; returns the
    server's answer.
    """
    return self._retryRPC('isMasterTestnode', (test_node_title,))

  def getSlaposAccountKey(self):
    """
    Returns the slapos account key related to the distributor
    """
    return self._retryRPC('getSlaposAccountKey')

  def getSlaposAccountCertificate(self):
    """
    Returns the slapos account certificate related to the distributor
    """
    return self._retryRPC('getSlaposAccountCertificate')

  def getSlaposUrl(self):
    """
    Returns the url of slapos master related to the distributor
    """
    return self._retryRPC('getSlaposUrl')

  def getSlaposHateoasUrl(self):
    """
    Returns the url of API REST using hateoas of
    slapos master related to the distributor
    """
    return self._retryRPC('getSlaposHateoasUrl')
549
550
class DummyTaskDistributionTool(object):
  """
  Fake remote server.

  Useful when willing to locally run all tests without reporting to any
  server.

  This class should remain internal to this module.
  """
  # Tests not handed out yet; None until createTestResult is called.
  test_name_list = None

  def __init__(self):
    # Guards test_name_list, presumably because startUnitTest may be called
    # from several threads - TODO confirm with callers.
    self._lock = threading.Lock()

  def getProtocolRevision(self):
    return 1

  def createTestResult(self, name, revision, test_name_list, *args):
    # Copy into a real list: startUnitTest deletes entries from it, which
    # would fail on a tuple and must not alter the caller's sequence.
    self.test_name_list = list(test_name_list)
    return None, revision

  def startUnitTest(self, test_result_path, exclude_list=()):
    """
    Hand out (and forget) the first pending test not in exclude_list, as
    a (None, test_name) pair. Return None when there is nothing to do,
    including when createTestResult was never called.
    """
    with self._lock:
      pending = self.test_name_list or []
      for i, test in enumerate(pending):
        if test not in exclude_list:
          del pending[i]
          return None, test
    return None

  def stopUnitTest(self, *args):
    pass

  reportTaskFailure = reportTaskStatus = stopUnitTest

  def isTaskAlive(self, *args):
    return int(bool(self.test_name_list))
586