PROJECT_MOVED -> https://lab.nexedi.com/nexedi/erp5
[erp5.git] / product / ERP5 / Tool / TaskDistributionTool.py
1 ##############################################################################
2 #
3 # Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved.
4 # Julien Muchembled <jm@nexedi.com>
5 #
6 # WARNING: This program as such is intended to be used by professional
7 # programmers who take the whole responsibility of assessing all potential
8 # consequences resulting from its eventual inadequacies and bugs
9 # End users who are looking for a ready-to-use solution with commercial
10 # guarantees and support are strongly adviced to contract a Free Software
11 # Service Company
12 #
13 # This program is Free Software; you can redistribute it and/or
14 # modify it under the terms of the GNU General Public License
15 # as published by the Free Software Foundation; either version 2
16 # of the License, or (at your option) any later version.
17 #
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
22 #
23 # You should have received a copy of the GNU General Public License
24 # along with this program; if not, write to the Free Software
25 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
26 #
27 ##############################################################################
28
29 import random
30 from AccessControl import ClassSecurityInfo
31 from Products.ERP5Type import Permissions, PropertySheet, Constraint, interfaces
32 from Products.ERP5Type.Tool.BaseTool import BaseTool
33 from Products.ZSQLCatalog.SQLCatalog import SimpleQuery, NegatedQuery
34 from zLOG import LOG
35 from xmlrpclib import Binary
36
37 class TaskDistributionTool(BaseTool):
38 """
39 A Task distribution tool (used for ERP5 unit test runs).
40 """
41
42 id = 'portal_task_distribution'
43 meta_type = 'ERP5 Task Distribution Tool'
44 portal_type = 'Task Distribution Tool'
45 allowed_types = ()
46
47 security = ClassSecurityInfo()
48 security.declareObjectProtected(Permissions.AccessContentsInformation)
49
50 security.declarePublic('getProtocolRevision')
51 def getProtocolRevision(self):
52 """
53 """
54 return 1
55
56 def _getTestResultNode(self, test_result, node_title):
57 node_list = [x for x in test_result.objectValues(
58 portal_type='Test Result Node') if x.getTitle() == node_title]
59 node_list_len = len(node_list)
60 assert node_list_len in (0, 1)
61 node = None
62 if len(node_list):
63 node = node_list[0]
64 return node
65
66 security.declarePublic('createTestResult')
67 def createTestResult(self, name, revision, test_name_list, allow_restart,
68 test_title=None, node_title=None, project_title=None):
69 """(temporary)
70 - name (string)
71 - revision (string representation of an integer)
72 - test_name_list (list of strings)
73 - allow_restart (boolean)
74
75 XXX 'revision' should be a string representing the full revision
76 of the tested code, because some projects are tested with different
77 revisions of ERP5.
78
79 -> (test_result_path, revision) or None if already completed
80 """
81 LOG('createTestResult', 0, (name, revision, test_title, project_title))
82 portal = self.getPortalObject()
83 if test_title is None:
84 test_title = name
85 def createNode(test_result, node_title):
86 if node_title is not None:
87 node = self._getTestResultNode(test_result, node_title)
88 if node is None:
89 node = test_result.newContent(portal_type='Test Result Node',
90 title=node_title)
91 node.start()
92 def createTestResultLineList(test_result, test_name_list):
93 duration_list = []
94 previous_test_result_list = portal.test_result_module.searchFolder(
95 title=SimpleQuery(comparison_operator='=', title=test_result.getTitle()),
96 sort_on=[('creation_date','descending')],
97 simulation_state=('stopped', 'public_stopped'),
98 limit=1)
99 if len(previous_test_result_list):
100 previous_test_result = previous_test_result_list[0].getObject()
101 for line in previous_test_result.objectValues():
102 if line.getSimulationState() in ('stopped', 'public_stopped'):
103 duration_list.append((line.getTitle(),line.getProperty('duration')))
104 duration_list.sort(key=lambda x: -x[1])
105 sorted_test_list = [x[0] for x in duration_list]
106 # Sort tests by name to have consistent numbering of test result line on
107 # a test suite.
108 for test_name in sorted(test_name_list):
109 index = 0
110 if sorted_test_list:
111 try:
112 index = sorted_test_list.index(test_name)
113 except ValueError:
114 pass
115 line = test_result.newContent(portal_type='Test Result Line',
116 title=test_name,
117 int_index=index)
118 reference_list_string = None
119 if type(revision) is str and '=' in revision:
120 reference_list_string = revision
121 int_index, reference = None, revision
122 elif type(revision) is str:
123 # backward compatibility
124 int_index, reference = revision, None
125 else:
126 # backward compatibility
127 int_index, reference = revision
128 result_list = portal.test_result_module.searchFolder(
129 portal_type="Test Result",
130 title=SimpleQuery(comparison_operator='=', title=test_title),
131 sort_on=(("creation_date","descending"),),
132 query=NegatedQuery(SimpleQuery(simulation_state="cancelled")),
133 limit=1)
134 if result_list:
135 test_result = result_list[0].getObject()
136 if test_result is not None:
137 last_state = test_result.getSimulationState()
138 last_revision = str(test_result.getIntIndex())
139 if last_state == 'started':
140 createNode(test_result, node_title)
141 reference = test_result.getReference()
142 if reference_list_string:
143 last_revision = reference
144 elif reference:
145 last_revision = last_revision, reference
146 if len(test_result.objectValues(portal_type="Test Result Line")) == 0 \
147 and len(test_name_list):
148 test_result.serialize() # prevent duplicate test result lines
149 createTestResultLineList(test_result, test_name_list)
150 return test_result.getRelativeUrl(), last_revision
151 if last_state in ('stopped', 'public_stopped'):
152 if reference_list_string is not None:
153 if reference_list_string == test_result.getReference() \
154 and not allow_restart:
155 return
156 elif last_revision == int_index and not allow_restart:
157 return
158 test_result = portal.test_result_module.newContent(
159 portal_type='Test Result',
160 title=test_title,
161 reference=reference,
162 is_indexable=False)
163 if int_index is not None:
164 test_result._setIntIndex(int_index)
165 if project_title is not None:
166 project_list = portal.portal_catalog(portal_type='Project',
167 title=SimpleQuery(comparison_operator='=', title=project_title))
168 if len(project_list) != 1:
169 raise ValueError('found this list of project : %r for title %r' % \
170 ([x.path for x in project_list], project_title))
171 test_result._setSourceProjectValue(project_list[0].getObject())
172 test_result.updateLocalRolesOnSecurityGroups() # XXX
173 test_result.start()
174 del test_result.isIndexable
175 test_result.immediateReindexObject()
176 self.serialize() # prevent duplicate test result
177 # following 2 functions only call 'newContent' on test_result
178 createTestResultLineList(test_result, test_name_list)
179 createNode(test_result, node_title)
180 return test_result.getRelativeUrl(), revision
181
182 security.declarePublic('startUnitTest')
183 def startUnitTest(self, test_result_path, exclude_list=()):
184 """(temporary)
185 - test_result_path (string)
186 - exclude_list (list of strings)
187
188 -> test_path (string), test_name (string)
189 or None if finished
190 """
191 portal = self.getPortalObject()
192 test_result = portal.restrictedTraverse(test_result_path)
193 if test_result.getSimulationState() != 'started':
194 return
195 started_list = []
196 for line in test_result.objectValues(portal_type="Test Result Line",
197 sort_on=[("int_index","ascending")]):
198 test = line.getTitle()
199 if test not in exclude_list:
200 state = line.getSimulationState()
201 test = line.getRelativeUrl(), test
202 if state == 'draft':
203 line.start()
204 return test
205 # XXX Make sure we finish all tests.
206 if state == 'started':
207 started_list.append(test)
208 if started_list:
209 return random.choice(started_list)
210
211 security.declarePublic('stopUnitTest')
212 def stopUnitTest(self, test_path, status_dict):
213 """(temporary)
214 - test_path (string)
215 - status_dict (dict)
216 """
217 status_dict = self._extractXMLRPCDict(status_dict)
218 LOG("TaskDistributionTool.stopUnitTest", 0, repr((test_path,status_dict)))
219 portal = self.getPortalObject()
220 line = portal.restrictedTraverse(test_path)
221 test_result = line.getParentValue()
222 if test_result.getSimulationState() == 'started':
223 if line.getSimulationState() == "started":
224 line.stop(**status_dict)
225 if {"stopped"} == {x.getSimulationState()
226 for x in test_result.objectValues(portal_type="Test Result Line")}:
227 test_result.stop()
228
229 def _extractXMLRPCDict(self, xmlrpc_dict):
230 """
231 extract all xmlrpclib.Binary instance
232 """
233 return {x: y.data if isinstance(y, Binary) else y
234 for x, y in xmlrpc_dict.iteritems()}
235
236 security.declarePublic('reportTaskFailure')
237 def reportTaskFailure(self, test_result_path, status_dict, node_title):
238 """report failure when a node can not handle task
239 """
240 status_dict = self._extractXMLRPCDict(status_dict)
241 LOG("TaskDistributionTool.reportTaskFailure", 0, repr((test_result_path,
242 status_dict)))
243 portal = self.getPortalObject()
244 test_result = portal.restrictedTraverse(test_result_path)
245 node = self._getTestResultNode(test_result, node_title)
246 assert node is not None
247 node.fail(**status_dict)
248 for node in test_result.objectValues(portal_type='Test Result Node'):
249 if node.getSimulationState() != 'failed':
250 break
251 else:
252 if test_result.getSimulationState() not in ('failed', 'cancelled'):
253 test_result.fail()
254
255 security.declarePublic('reportTaskStatus')
256 def reportTaskStatus(self, test_result_path, status_dict, node_title):
257 """report status of node
258 """
259 status_dict = self._extractXMLRPCDict(status_dict)
260 LOG("TaskDistributionTool.reportTaskStatus", 0, repr((test_result_path,
261 status_dict)))
262 portal = self.getPortalObject()
263 test_result = portal.restrictedTraverse(test_result_path)
264 node = self._getTestResultNode(test_result, node_title)
265 assert node is not None
266 node.edit(cmdline=status_dict['command'],
267 stdout=status_dict['stdout'], stderr=status_dict['stderr'])
268
269 security.declarePublic('isTaskAlive')
270 def isTaskAlive(self, test_result_path):
271 """check status of a test suite
272 """
273 LOG("TaskDistributionTool.checkTaskStatus", 0, repr(test_result_path))
274 portal = self.getPortalObject()
275 test_result = portal.restrictedTraverse(test_result_path)
276 return test_result.getSimulationState() == "started" and 1 or 0
277
278 security.declareProtected(Permissions.AccessContentsInformation, 'getMemcachedDict')
279 def getMemcachedDict(self):
280 """ Return a dictionary used for non persistent data related to distribution
281 """
282 portal = self.getPortalObject()
283 memcached_dict = portal.portal_memcached.getMemcachedDict(
284 "task_distribution", "default_memcached_plugin")
285 return memcached_dict