PROJECT_MOVED -> https://lab.nexedi.com/nexedi/erp5
[erp5.git] / erp5 / util / taskdistribution / __init__.py
1 ##############################################################################
2 #
3 # Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
4 #
5 # WARNING: This program as such is intended to be used by professional
6 # programmers who take the whole responsibility of assessing all potential
7 # consequences resulting from its eventual inadequacies and bugs
8 # End users who are looking for a ready-to-use solution with commercial
9 # guarantees and support are strongly advised to contract a Free Software
10 # Service Company
11 #
12 # This program is Free Software; you can redistribute it and/or
13 # modify it under the terms of the GNU General Public License
14 # as published by the Free Software Foundation; either version 3
15 # of the License, or (at your option) any later version.
16 #
17 # This program is distributed in the hope that it will be useful,
18 # but WITHOUT ANY WARRANTY; without even the implied warranty of
19 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 # GNU General Public License for more details.
21 #
22 # You should have received a copy of the GNU General Public License
23 # along with this program; if not, write to the Free Software
24 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
25 #
26 ##############################################################################
"""
Client implementation for portal_task_distribution.

Example use:
  import erp5.util.taskdistribution
  tool = erp5.util.taskdistribution.TaskDistributionTool(...)
  test_result = tool.createTestResult(...)
  test_result.addWatch('foo', open('foo'))
  while True:
    test_line = test_result.start()
    if not test_line:
      break
    # Run the test_line.name test
    test_line.stop()
"""
42 import httplib
43 import logging
44 import select
45 import socket
46 import threading
47 import time
48 import xmlrpclib
49
__all__ = [
    'TaskDistributionTool',
    'TestResultProxy',
    'TestResultLineProxy',
    'patchRPCParser',
]

# Exceptions which the RPC helpers of this module treat as recoverable.
# Depending on used xmlrpc backend, different exceptions can be thrown, so
# the parser returned by xmlrpclib is probed once at import time.
SAFE_RPC_EXCEPTION_LIST = (
    socket.error,
    xmlrpclib.ProtocolError,
    xmlrpclib.Fault,
    httplib.BadStatusLine,
    httplib.ResponseNotReady,
)
parser = xmlrpclib.getparser()[0]
if xmlrpclib.ExpatParser and isinstance(parser, xmlrpclib.ExpatParser):
    # Expat-based parser detected: its ExpatError is recoverable too.
    SAFE_RPC_EXCEPTION_LIST += (xmlrpclib.expat.ExpatError, )
else:
    # Unknown parser: warn on stderr, sys is only needed for this message.
    import sys
    sys.stderr.write(
        'Warning: unhandled xmlrpclib parser %r, some '
        'exceptions might get through safeRpcCall' % (parser, ) + '\n')
    del sys
# Do not leave the probe parser in the module namespace.
del parser
65
def null_callable(*args, **kw):
    """
    Accept any positional and named arguments, do nothing, return None.
    """
    return None
68
class NullLogger(object):
    """
    Logger stand-in which silently discards everything.

    Any attribute which is not found by normal lookup resolves to
    null_callable, so calls such as warning(...) or info(...) are accepted
    and ignored.
    """
    def __getattr__(self, name):
        # Only reached for names missing from both instance and class.
        return null_callable

# Shared instance, for use wherever a logger is expected but none is given.
null_logger = NullLogger()
73
def patchRPCParser(error_handler):
    """
    Patch xmlrpclib's parser class, so the data being parsed can be inspected
    (eg, logged) when parsing raises, to ease debugging.
    Warning: this installs a monkey patch on a generic class, so every user
    of that parser class in the process is affected. Each call wraps the
    feed method installed at that time. Must *not* be enabled by default.

    error_handler (callable)
      Receives the erroneous data as first parameter, and the exception
      instance as second parameter.
      If it returns a false value (ie, handler did not recover from the
      error), exception is re-raised.
    """
    parser_klass = xmlrpclib.getparser()[0].__class__
    wrapped_feed = parser_klass.feed

    def verbose_feed(self, data):
        try:
            result = wrapped_feed(self, data)
        except Exception as exc:
            if error_handler(data, exc):
                # Handler recovered from the error: swallow the exception.
                return None
            raise
        return result

    parser_klass.feed = verbose_feed
97
class RPCRetry(object):
    """
    Base class for objects issuing calls on an RPC proxy.

    proxy (any)
      Object whose attributes are the remote functions to call.
    retry_time (number)
      Initial delay, in seconds, between two attempts of a retried call.
    logger (logger-like)
      Its warning method is called before each retry.
    """
    def __init__(self, proxy, retry_time, logger):
        super(RPCRetry, self).__init__()
        self._logger = logger
        self._retry_time = retry_time
        self._proxy = proxy

    def _RPC(self, func_id, args=()):
        """
        Call function func_id of the proxy once, with args as positional
        arguments, and return its result. Exceptions are not handled.
        """
        remote_function = getattr(self._proxy, func_id)
        return remote_function(*args)

    def _retryRPC(self, func_id, args=()):
        """
        Same as _RPC, but when an exception listed in
        SAFE_RPC_EXCEPTION_LIST is raised, log it, sleep and try again, for
        as long as needed. Other exceptions are propagated.
        """
        delay = self._retry_time
        while True:
            try:
                return self._RPC(func_id, args)
            except SAFE_RPC_EXCEPTION_LIST:
                self._logger.warning(
                    'Got exception, retrying: %s%r in %is',
                    func_id, tuple(args), delay, exc_info=1)
                time.sleep(delay)
                # Grow the delay by half each time so a struggling server is
                # not overloaded, but cap it at 10 times the initial value so
                # work resumes quickly once the server is back.
                delay = min(delay * 1.5, self._retry_time * 10)
121
class TestResultLineProxy(RPCRetry):
    """
    Represents a single test in a suite.

    Properties:
      name (str) (ro)
        Test name, as provided to TaskDistributionTool.createTestResult .
    """
    def __init__(self, proxy, retry_time, logger, test_result_line_path,
                 test_name):
        super(TestResultLineProxy, self).__init__(proxy, retry_time, logger)
        self._test_result_line_path = test_result_line_path
        self._name = test_name

    def __repr__(self):
        return '<%s(%r, %r) at %x>' % (self.__class__.__name__,
            self._test_result_line_path, self._name, id(self))

    @property
    def name(self):
        return self._name

    def isTestCaseAlive(self):
        """
        Tell if test result line is still alive on site.

        Returns a bool.
        Raises ValueError when the call fails with an error which
        _retryRPC does not retry.
        """
        try:
            return bool(self._retryRPC('isTestCaseAlive',
                [self._test_result_line_path]))
        except Exception:
            # Deliberately not a bare "except:": KeyboardInterrupt and
            # SystemExit must reach the caller instead of being disguised
            # as a ValueError.
            raise ValueError('isTestCaseAlive Failed.')

    def stop(self, test_count=None, error_count=None, failure_count=None,
             skip_count=None, duration=None, date=None, command=None,
             stdout=None, stderr=None, html_test_result=None, **kw):
        """
        Notify server of test completion.

        Without any parameter, notifies of a test failure which prevents any
        precise reading (step count, how many succeeded, etc).

        Only parameters whose value is not None are sent to the server.

        BBB: extra named arguments are deprecated (if some are really needed,
        they must be declared as explicit parameters, with proper default
        value).
        """
        status_dict = dict(x for x in (
            ('test_count', test_count),
            ('error_count', error_count),
            ('failure_count', failure_count),
            ('skip_count', skip_count),
            ('duration', duration),
            ('date', date),
            ('command', command),
            ('stdout', stdout),
            ('stderr', stderr),
            ('html_test_result', html_test_result),
        ) if x[1] is not None)
        if kw:
            # Deprecated extra arguments are still forwarded, but logged.
            self._logger.info('Extra parameters provided: %r', kw)
            status_dict.update(kw)
        self._retryRPC('stopUnitTest', (self._test_result_line_path,
            status_dict))
183
class TestResultProxy(RPCRetry):
    """
    Represents a test suite run.

    Allows fetching work to do (eg a single test in an entire run), monitoring
    log files, informing server of problems and monitoring server-side
    cancellation.

    Properties
      watcher_period (float) (rw)
        How long log watcher sleeps between successive uploading latest data
        chunks.
      revision (str) (ro)
        Revision to test. Might be different from the revision requested,
        when a test batch is running on an older revision.
      test_result_path (ro)
        Path of the test result, as received at construction.
    """
    # Class-level defaults, overridden per instance by _startWatching and
    # _stopWatching.
    _watcher_can_run = True
    _watcher_thread = None

    def __init__(self, proxy, retry_time, logger, test_result_path, node_title,
                 revision):
        super(TestResultProxy, self).__init__(proxy, retry_time, logger)
        self._test_result_path = test_result_path
        self._node_title = node_title
        self._revision = revision
        self._watcher_period = 60
        # name -> (stream, max_history_bytes)
        self._watcher_dict = {}
        self._watcher_condition = threading.Condition()

    def __repr__(self):
        return '<%s(%r, %r, %r) at %x>' % (self.__class__.__name__,
            self._test_result_path, self._node_title, self._revision,
            id(self))

    @property
    def test_result_path(self):
        return self._test_result_path

    @property
    def revision(self):
        return self._revision

    def start(self, exclude_list=()):
        """
        Ask for a test to run, among the list of tests composing this test
        result.
        Return a TestResultLineProxy instance, or None if there is nothing to
        do.
        """
        result = self._retryRPC('startUnitTest', (self._test_result_path,
            exclude_list))
        if result:
            line_url, test_name = result
            result = TestResultLineProxy(self._proxy, self._retry_time,
                self._logger, line_url, test_name)
        return result

    def reportFailure(self, date=None, command=None, stdout=None, stderr=None):
        """
        Report a test-node-level problem, preventing the test from continuing
        on this node.

        Stops the log watcher first. date is always sent (even when None),
        other parameters are only sent when not None.
        """
        self._stopWatching()
        status_dict = {
            'date': date,
        }
        if command is not None:
            status_dict['command'] = command
        if stdout is not None:
            status_dict['stdout'] = stdout
        if stderr is not None:
            status_dict['stderr'] = stderr
        self._retryRPC('reportTaskFailure', args=(self._test_result_path,
            status_dict, self._node_title))

    def reportStatus(self, command, stdout, stderr):
        """
        Report some progress.

        Used internally by file monitoring, you shouldn't have to use this
        directly.

        Single attempt: exceptions listed in SAFE_RPC_EXCEPTION_LIST are
        logged and swallowed.
        """
        try:
            self._RPC('reportTaskStatus', (self._test_result_path, {
                'command': command,
                'stdout': stdout,
                'stderr': stderr,
            }, self._node_title))
        except SAFE_RPC_EXCEPTION_LIST:
            self._logger.warning('Got exception in reportTaskStatus, giving up',
                exc_info=1)

    def isAlive(self):
        """
        Tell if test is still alive on site.

        Useful to probe for test cancellation by user, so a new test run can
        be started without waiting for current one to finish.

        Single attempt: when an exception listed in SAFE_RPC_EXCEPTION_LIST
        is raised, the test is assumed to be alive and 1 is returned.
        """
        try:
            return self._RPC('isTaskAlive', (self._test_result_path, ))
        except SAFE_RPC_EXCEPTION_LIST:
            self._logger.warning('Got exception in isTaskAlive, assuming alive',
                exc_info=1)
            return 1

    @property
    def watcher_period(self):
        return self._watcher_period

    @watcher_period.setter
    def watcher_period(self, period):
        cond = self._watcher_condition
        with cond:
            self._watcher_period = period
            # Wake the watcher up so the new period is used right away.
            cond.notify()

    def addWatch(self, name, stream, max_history_bytes=None):
        """
        Monitor given file, sending a few latest lines to remote server.
        name (any)
          Arbitrary identifier for stream. Must be usable as a dict key.
        stream (file object)
          File to monitor from its current offset.
        max_history_bytes (int, None)
          How many bytes to send to remote server at most for each wakeup.
          If None, send all lines.

        Starts the watcher thread when this is the only watch.
        Raises ValueError if name is already watched.
        """
        watcher_dict = self._watcher_dict
        if not watcher_dict:
            self._startWatching()
        elif name in watcher_dict:
            raise ValueError('Name already known: %r' % (name, ))
        watcher_dict[name] = (stream, max_history_bytes)

    def removeWatch(self, name):
        """
        Stop monitoring given stream.

        Stops the watcher thread when no watch is left.
        Raises KeyError if name is not watched.
        """
        watcher_dict = self._watcher_dict
        del watcher_dict[name]
        if not watcher_dict:
            self._stopWatching()

    def _startWatching(self):
        """
        Start the watcher thread.

        Raises ValueError if a watcher thread is already registered.
        """
        if self._watcher_thread is not None:
            raise ValueError('Thread already started')
        # Re-arm the flag cleared by a previous _stopWatching, otherwise a
        # restarted watcher would exit immediately.
        self._watcher_can_run = True
        self._watcher_thread = thread = threading.Thread(target=self._watcher)
        thread.daemon = True
        thread.start()

    def _watcher(self):
        """
        Watcher thread body: until stopped or until the test is no longer
        alive, periodically send to the server what was appended to each
        watched stream.
        """
        cond = self._watcher_condition
        while self._watcher_can_run and self.isAlive():
            caption_list = []
            append = caption_list.append
            # Iterate over a snapshot: addWatch and removeWatch may modify
            # the dict from another thread while this loop runs.
            for name, (stream, max_history_bytes) in \
                    list(self._watcher_dict.items()):
                append('==> %s <==' % (name, ))
                start = stream.tell()
                stream.seek(0, 2)
                end = stream.tell()
                if start == end:
                    caption = time.strftime(
                        '(no new lines at %Y/%m/%d %H:%M:%S)', time.gmtime())
                else:
                    to_read = end - start
                    if to_read < 0:
                        # File got truncated, treat the whole content as new.
                        to_read = end
                    if max_history_bytes is not None:
                        to_read = min(to_read, max_history_bytes)
                    # Stream is at its end: step back to read the tail only.
                    stream.seek(-to_read, 1)
                    caption = stream.read(to_read)
                append(caption)
            self.reportStatus('', '\n'.join(caption_list), '')
            with cond:
                # Check the flag while holding the lock: if _stopWatching
                # ran since the loop condition was evaluated, its notify is
                # already gone and waiting would delay exit by a full period.
                if self._watcher_can_run:
                    cond.wait(self._watcher_period)

    def _stopWatching(self):
        """
        Ask the watcher thread to stop and wait for it to exit.
        """
        cond = self._watcher_condition
        with cond:
            self._watcher_can_run = False
            cond.notify()
        thread = self._watcher_thread
        if thread is not None:
            thread.join()
            # Forget the finished thread so that a later addWatch can start
            # a new one instead of failing with "Thread already started".
            self._watcher_thread = None

    def stop(self):
        """
        Call 'stopTest' on the server for this test result, and return the
        server's answer.
        """
        return self._retryRPC('stopTest', [self._test_result_path])

    def getRunningTestCase(self):
        """
        Return the relative path of the next test with the running state
        """
        return self._retryRPC('getRunningTestCase', [self._test_result_path])
380
class ServerProxy(xmlrpclib.ServerProxy):
    """
    xmlrpclib.ServerProxy which sets a timeout attribute on the connections
    created by its transport, and serialises requests with a lock.

    This class deliberately has the same name as its base class: the
    double-underscore names used below are mangled to _ServerProxy__transport
    and _ServerProxy__request, which lets this code reach the attributes of
    the same mangled name on xmlrpclib.ServerProxy. Renaming this class
    would break that.
    """

    def __init__(self, *args, **kw):
        xmlrpclib.ServerProxy.__init__(self, *args, **kw)
        # Mangled to _ServerProxy__transport (see class docstring).
        transport = self.__transport
        # Shadow make_connection on this transport instance only: delegate
        # to the transport class' implementation, then tag the connection
        # with a timeout.
        def make_connection(*args, **kw):
            conn = transport.__class__.make_connection(transport, *args, **kw)
            # NOTE(review): presumably seconds, honoured by the connection
            # object when it connects - confirm against httplib.
            conn.timeout = 120
            return conn
        transport.make_connection = make_connection
        self.__rpc_lock = threading.Lock()

    # Mangled to _ServerProxy__request, overriding the base class' method of
    # the same mangled name.
    def __request(self, *args, **kw):
        # Only one request at a time goes through this proxy.
        with self.__rpc_lock:
            return xmlrpclib.ServerProxy.__request(self, *args, **kw)
396
class TaskDistributionTool(RPCRetry):
    """
    Entry point to create test results on a task distributor.
    """
    def __init__(self, portal_url, retry_time=64, logger=None):
        """
        portal_url (str, None)
          Portal URL of ERP5 site to use as a task distributor.
          If None, single node setup is assumed.
        retry_time (number)
          Initial delay, in seconds, between attempts of retried RPC calls.
        logger (logger-like, None)
          If None, nothing is logged.

        Raises ValueError if the protocol revision reported by the
        distributor is not supported.
        """
        if logger is None:
            logger = null_logger
        if portal_url is None:
            proxy = DummyTaskDistributionTool()
        else:
            proxy = ServerProxy(
                portal_url,
                allow_none=True,
            ).portal_task_distribution
        super(TaskDistributionTool, self).__init__(proxy, retry_time, logger)
        protocol_revision = self._retryRPC('getProtocolRevision')
        if protocol_revision != 1:
            # Format the message here: exceptions do not interpolate extra
            # arguments the way logging calls do.
            raise ValueError('Unsupported protocol revision: %r' % (
                protocol_revision, ))

    def createTestResult(self, revision, test_name_list, node_title,
                         allow_restart=False, test_title=None,
                         project_title=None):
        """
        (maybe) create a new test run.
        revision (str)
          An opaque string describing code being tested.
        test_name_list (list of str)
          List of tests being part of this test run. May be empty.
        node_title (str)
          Human-readable test node identifier, so an admin can know which
          node does what.
        allow_restart (bool)
          When true, a test result is always created, even if a former
          finished one is found for same name and revision pair.
        test_title (str)
          Human-readable title for test. Must be identical for successive
          runs. Allows browsing its result history.
        project_title (str)
          Existing project title, so test result gets associated to it.

        Returns None if no test run is needed (a test run for given name and
        revision has already been completed).
        Otherwise, returns a TestResultProxy instance.
        """
        result = self._retryRPC('createTestResult', ('', revision,
            test_name_list, allow_restart, test_title, node_title,
            project_title))
        if result:
            # The revision answered by the distributor replaces the
            # requested one.
            test_result_path, revision = result
            result = TestResultProxy(self._proxy, self._retry_time,
                self._logger, test_result_path, node_title, revision)
        return result
451
class TaskDistributor(RPCRetry):
    """
    Client for a distributor object, on which all calls are issued directly.
    """

    def __init__(self, portal_url, retry_time=64, logger=None):
        """
        portal_url (str, None)
          URL of the object to call. If None, a DummyTaskDistributionTool
          instance is used instead of a remote server.
        retry_time (number)
          Initial delay, in seconds, between attempts of retried RPC calls.
        logger (logger-like, None)
          If None, nothing is logged.

        Raises ValueError if the protocol revision reported by the
        distributor is not supported.
        """
        if logger is None:
            logger = null_logger
        if portal_url is None:
            proxy = DummyTaskDistributionTool()
        else:
            proxy = ServerProxy(portal_url, allow_none=True)
        super(TaskDistributor, self).__init__(proxy, retry_time, logger)
        protocol_revision = self._retryRPC('getProtocolRevision')
        if protocol_revision != 1:
            # Format the message here: exceptions do not interpolate extra
            # arguments the way logging calls do.
            raise ValueError('Unsupported protocol revision: %r' % (
                protocol_revision, ))

    def startTestSuite(self, node_title, computer_guid='unknown'):
        """
        Returns None if no test suite is needed.
        Otherwise, returns a JSON with all the test suite parameters.
        """
        result = self._retryRPC('startTestSuite', (node_title, computer_guid, ))
        return result

    def getTestType(self):
        """
        Return the Test Type
        """
        result = self._retryRPC('getTestType')
        return result

    def subscribeNode(self, node_title, computer_guid):
        """
        Subscribes node with the node title and the computer guid.
        """
        self._retryRPC('subscribeNode', (node_title, computer_guid, ))

    def generateConfiguration(self, test_suite_title):
        """
        Generates a configuration from a test_suite_title
        """
        return self._retryRPC('generateConfiguration', (test_suite_title, ))

    def isMasterTestnode(self, test_node_title):
        """
        Returns True or False if the testnode is the master
        """
        return self._retryRPC('isMasterTestnode', (test_node_title, ))

    def getSlaposAccountKey(self):
        """
        Returns the slapos account key related to the distributor
        """
        return self._retryRPC('getSlaposAccountKey')

    def getSlaposAccountCertificate(self):
        """
        Returns the slapos account certificate related to the distributor
        """
        return self._retryRPC('getSlaposAccountCertificate')

    def getSlaposUrl(self):
        """
        Returns the url of slapos master related to the distributor
        """
        return self._retryRPC('getSlaposUrl')

    def getSlaposHateoasUrl(self):
        """
        Returns the url of API REST using hateoas of
        slapos master related to the distributor
        """
        return self._retryRPC('getSlaposHateoasUrl')
526
527
class DummyTaskDistributionTool(object):
    """
    Fake remote server.

    Hands out locally the tests given to createTestResult, without reporting
    to any server. Useful when willing to run all tests on a single node.

    This class should remain internal to this module.
    """
    # Tests not handed out yet. None until createTestResult is called.
    test_name_list = None

    def __init__(self):
        # Protects test_name_list while picking a test in startUnitTest.
        self._lock = threading.Lock()

    def getProtocolRevision(self):
        return 1

    def createTestResult(self, name, revision, test_name_list, *args):
        # Work on a copy, caller's list must not be emptied.
        pending_list = test_name_list[:]
        self.test_name_list = pending_list
        return (None, revision)

    def startUnitTest(self, test_result_path, exclude_list=()):
        """
        Remove the first pending test which is not in exclude_list and
        return (None, test name), or return None when there is none.
        """
        with self._lock:
            pending_list = self.test_name_list
            position = next(
                (i for i, test in enumerate(pending_list)
                 if test not in exclude_list),
                None)
            if position is None:
                return None
            return (None, pending_list.pop(position))

    def stopUnitTest(self, *args):
        return None

    # Reports are ignored as well.
    reportTaskStatus = stopUnitTest
    reportTaskFailure = stopUnitTest

    def isTaskAlive(self, *args):
        # Alive for as long as tests remain to hand out.
        if self.test_name_list:
            return 1
        return 0
563