Last chunk of portal type classes / zodb property sheets.
[erp5.git] / product / CMFActivity / ActivityTool.py
1 ##############################################################################
2 #
3 # Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
4 # Jean-Paul Smets-Solanes <jp@nexedi.com>
5 #
6 # WARNING: This program as such is intended to be used by professional
7 # programmers who take the whole responsability of assessing all potential
8 # consequences resulting from its eventual inadequacies and bugs
9 # End users who are looking for a ready-to-use solution with commercial
10 # garantees and support are strongly adviced to contract a Free Software
11 # Service Company
12 #
13 # This program is Free Software; you can redistribute it and/or
14 # modify it under the terms of the GNU General Public License
15 # as published by the Free Software Foundation; either version 2
16 # of the License, or (at your option) any later version.
17 #
18 # This program is distributed in the hope that it will be useful,
19 # but WITHOUT ANY WARRANTY; without even the implied warranty of
20 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 # GNU General Public License for more details.
22 #
23 # You should have received a copy of the GNU General Public License
24 # along with this program; if not, write to the Free Software
25 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
26 #
27 ##############################################################################
28
29 import socket
30 import urllib
31 import threading
32 import sys
33 from types import StringType
34 import re
35
36 from Products.CMFCore import permissions as CMFCorePermissions
37 from Products.ERP5Type.Core.Folder import Folder
38 from Products.CMFActivity.ActiveResult import ActiveResult
39 from Products.CMFActivity.ActiveObject import DEFAULT_ACTIVITY
40 from Products.CMFActivity.ActivityConnection import ActivityConnection
41 from Products.PythonScripts.Utility import allow_class
42 from AccessControl import ClassSecurityInfo, Permissions
43 from AccessControl.SecurityManagement import newSecurityManager
44 from AccessControl.SecurityManagement import noSecurityManager
45 from AccessControl.SecurityManagement import setSecurityManager
46 from AccessControl.SecurityManagement import getSecurityManager
47 from Products.CMFCore.utils import UniqueObject, _getAuthenticatedUser, getToolByName
48 from Products.ERP5Type.Globals import InitializeClass, DTMLFile
49 from Acquisition import aq_base, aq_inner, aq_parent
50 from ActivityBuffer import ActivityBuffer
51 from ActivityRuntimeEnvironment import BaseMessage
52 from zExceptions import ExceptionFormatter
53 from BTrees.OIBTree import OIBTree
54
55 try:
56 from Products import iHotfix
57 localizer_lock = iHotfix._the_lock
58 localizer_contexts = iHotfix.contexts
59 LocalizerContext = iHotfix.Context
60 except ImportError:
61 # Localizer 1.2 includes iHotFix patches
62 import Products.Localizer.patches
63 localizer_lock = Products.Localizer.patches._requests_lock
64 localizer_contexts = Products.Localizer.patches._requests
65 LocalizerContext = lambda request: request
66
67
68 from ZODB.POSException import ConflictError
69 from Products.MailHost.MailHost import MailHostError
70
71 from zLOG import LOG, INFO, WARNING, ERROR
72 from warnings import warn
73 from time import time
74
try:
    from Products.TimerService import getTimerService
except ImportError:
    def getTimerService(self):
        # Stand-in used when the TimerService product cannot be imported:
        # it always answers None, which callers treat as "no service".
        return None
80
81 try:
82 from traceback import format_list, extract_stack
83 except ImportError:
84 format_list = extract_stack = None
85
# Minimal "IP:port" pattern: four dot-separated groups of digits, a colon and
# a port made of digits. Neither the octet range nor the port range is
# checked. Written as a raw string so that the backslash escapes reach the
# regular expression engine untouched.
NODE_RE = re.compile(r'^\d+\.\d+\.\d+\.\d+:\d+$')
88
# Process-wide state kept in RAM (module globals, not instance properties):
# nothing here is stored in the ZODB, and everything is reset when the
# process restarts.
is_initialized = False
currentNode = None

# Thread accounting.
active_threads = 0
max_active_threads = 1 # 2 will cause more bug to appear (he he)

# RAM based locks.
tic_lock = threading.Lock() # prevents too many concurrent tic() calls
timerservice_lock = threading.Lock() # prevents TimerService spamming when busy
is_running_lock = threading.Lock()

# Roles a node can have.
ROLE_IDLE, ROLE_PROCESSING = 0, 1

# Activity Registration: activity class name -> activity instance
# (filled by registerActivity).
activity_dict = {}
103
# Logging channel definitions
import logging
# Main logging channel
activity_logger = logging.getLogger('CMFActivity')
# Timing log: a child of the main channel ('CMFActivity.TimingLog').
activity_timing_logger = logging.getLogger('CMFActivity.TimingLog')
# Tracking log. NOTE(review): named 'Tracking', not 'CMFActivity.Tracking',
# so it is not a child of the main channel - confirm this is intended.
activity_tracking_logger = logging.getLogger('Tracking')

# Direct logging to "[instancehome]/log/CMFActivity.log", if this directory
# exists. Otherwise, it will end up in root logging facility (ie, event.log).
from App.config import getConfiguration
import os
instancehome = getConfiguration().instancehome
if instancehome is not None:
    log_directory = os.path.join(instancehome, 'log')
    if os.path.isdir(log_directory):
        from Signals import Signals
        from ZConfig.components.logger.loghandler import FileHandler
        log_file_handler = FileHandler(
            os.path.join(log_directory, 'CMFActivity.log'))
        # Default zope log format string borrowed from
        # ZConfig/components/logger/factory.xml, but without the extra
        # "------" line separating entries.
        log_file_handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s %(message)s",
            "%Y-%m-%dT%H:%M:%S"))
        Signals.registerZopeSignals([log_file_handler])
        activity_logger.addHandler(log_file_handler)
        # The dedicated file now receives the records: stop passing them up
        # to the parent loggers as well.
        activity_logger.propagate = 0
130
def activity_timing_method(method, args, kw):
    """Call method(*args, **kw) and log how long the call took.

    The duration is written to the activity timing logger whether the call
    returns or raises; the call's return value or exception is passed through
    unchanged.
    """
    started_at = time()
    try:
        return method(*args, **kw)
    finally:
        elapsed = time() - started_at
        activity_timing_logger.info(
            '%.02fs: %r(*%r, **%r)' % (elapsed, method, args, kw))
138
# Here go ActivityBuffer instances
# Structure:
#  global_activity_buffer[activity_tool_path][thread_id] = ActivityBuffer
global_activity_buffer = {}
from thread import get_ident, allocate_lock
# Lock created alongside global_activity_buffer. Nothing in this part of the
# file acquires it; presumably it guards updates to the dictionary above -
# verify at the places where it is used.
global_activity_buffer_lock = allocate_lock()
145
def registerActivity(activity):
    """Instantiate the given activity class and register the instance.

    The instance is stored in activity_dict under the class name, replacing
    any instance previously registered under that name.
    """
    # Must be rewritten to register
    # class and create instance for each activity
    activity_dict[activity.__name__] = activity()
152
# Execution states of a Message (see Message.setExecutionState).
MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE = 0, 1, 2
156
157
158 class Message(BaseMessage):
159 """Activity Message Class.
160
161 Message instances are stored in an activity queue, inside the Activity Tool.
162 """
163
164 active_process = None
165 active_process_uid = None
166
167 def __init__(self, obj, active_process, activity_kw, method_id, args, kw):
168 if isinstance(obj, str):
169 self.object_path = tuple(obj.split('/'))
170 activity_creation_trace = False
171 else:
172 self.object_path = obj.getPhysicalPath()
173 activity_creation_trace = obj.getPortalObject().portal_activities.activity_creation_trace
174 if active_process is not None:
175 self.active_process = active_process.getPhysicalPath()
176 self.active_process_uid = active_process.getUid()
177 if activity_kw.get('serialization_tag', False) is None:
178 # Remove serialization_tag if it's None.
179 del activity_kw['serialization_tag']
180 self.activity_kw = activity_kw
181 self.method_id = method_id
182 self.args = args
183 self.kw = kw
184 self.is_executed = MESSAGE_NOT_EXECUTED
185 self.exc_type = None
186 self.exc_value = None
187 self.traceback = None
188 if activity_creation_trace and format_list is not None:
189 # Save current traceback, to make it possible to tell where a message
190 # was generated.
191 # Strip last stack entry, since it will always be the same.
192 self.call_traceback = ''.join(format_list(extract_stack()[:-1]))
193 else:
194 self.call_traceback = None
195 self.processing = None
196 self.user_name = str(_getAuthenticatedUser(self))
197 # Store REQUEST Info
198 self.request_info = {}
199 request = getattr(obj, 'REQUEST', None)
200 if request is not None:
201 if 'SERVER_URL' in request.other:
202 self.request_info['SERVER_URL'] = request.other['SERVER_URL']
203 if 'VirtualRootPhysicalPath' in request.other:
204 self.request_info['VirtualRootPhysicalPath'] = \
205 request.other['VirtualRootPhysicalPath']
206 if 'HTTP_ACCEPT_LANGUAGE' in request.environ:
207 self.request_info['HTTP_ACCEPT_LANGUAGE'] = \
208 request.environ['HTTP_ACCEPT_LANGUAGE']
209 self.request_info['_script'] = list(request._script)
210
211 def getObject(self, activity_tool):
212 """return the object referenced in this message."""
213 return activity_tool.unrestrictedTraverse(self.object_path)
214
215 def getObjectList(self, activity_tool):
216 """return the list of object that can be expanded from this message."""
217 object_list = []
218 try:
219 object_list.append(self.getObject(activity_tool))
220 except KeyError:
221 pass
222 else:
223 if self.hasExpandMethod():
224 expand_method_id = self.activity_kw['expand_method_id']
225 # FIXME: how to pass parameters?
226 object_list = getattr(object_list[0], expand_method_id)()
227 return object_list
228
229 def hasExpandMethod(self):
230 """return true if the message has an expand method.
231 An expand method is used to expand the list of objects and to turn a
232 big recursive transaction affecting many objects into multiple
233 transactions affecting only one object at a time (this can prevent
234 duplicated method calls)."""
235 return self.activity_kw.has_key('expand_method_id')
236
237 def changeUser(self, user_name, activity_tool):
238 """restore the security context for the calling user."""
239 uf = activity_tool.getPortalObject().acl_users
240 user = uf.getUserById(user_name)
241 # if the user is not found, try to get it from a parent acl_users
242 # XXX this is still far from perfect, because we need to store all
243 # informations about the user (like original user folder, roles) to
244 # replay the activity with exactly the same security context as if
245 # it had been executed without activity.
246 if user is None:
247 uf = activity_tool.getPortalObject().aq_parent.acl_users
248 user = uf.getUserById(user_name)
249 if user is not None:
250 user = user.__of__(uf)
251 newSecurityManager(None, user)
252 else :
253 LOG("CMFActivity", WARNING,
254 "Unable to find user %r in the portal" % user_name)
255 noSecurityManager()
256 return user
257
258 def activateResult(self, activity_tool, result, object):
259 if self.active_process is not None:
260 active_process = activity_tool.unrestrictedTraverse(self.active_process)
261 if isinstance(result, ActiveResult):
262 result.edit(object_path=object)
263 result.edit(method_id=self.method_id)
264 # XXX Allow other method_id in future
265 active_process.activateResult(result)
266 else:
267 active_process.activateResult(
268 ActiveResult(object_path=object,
269 method_id=self.method_id,
270 result=result)) # XXX Allow other method_id in future
271
272 def __call__(self, activity_tool):
273 try:
274 obj = self.getObject(activity_tool)
275 except KeyError:
276 LOG('CMFActivity', ERROR,
277 'Message failed in getting an object from the path %r' % \
278 (self.object_path,),
279 error=sys.exc_info())
280 self.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=activity_tool)
281 else:
282 try:
283 old_security_manager = getSecurityManager()
284 try:
285 # Change user if required (TO BE DONE)
286 # We will change the user only in order to execute this method
287 self.changeUser(self.user_name, activity_tool)
288 try:
289 # XXX: There is no check to see if user is allowed to access
290 # that method !
291 method = getattr(obj, self.method_id)
292 except:
293 LOG('CMFActivity', ERROR,
294 'Message failed in getting a method %r from an object %r' % \
295 (self.method_id, obj,),
296 error=sys.exc_info())
297 method = None
298 self.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=activity_tool)
299 else:
300 if activity_tool.activity_timing_log:
301 result = activity_timing_method(method, self.args, self.kw)
302 else:
303 result = method(*self.args, **self.kw)
304 finally:
305 setSecurityManager(old_security_manager)
306
307 if method is not None:
308 self.activateResult(activity_tool, result, obj)
309 self.setExecutionState(MESSAGE_EXECUTED)
310 except:
311 self.setExecutionState(MESSAGE_NOT_EXECUTED, context=activity_tool)
312
313 def validate(self, activity, activity_tool, check_order_validation=1):
314 return activity.validate(activity_tool, self,
315 check_order_validation=check_order_validation,
316 **self.activity_kw)
317
318 def getDependentMessageList(self, activity, activity_tool):
319 return activity.getDependentMessageList(activity_tool, self, **self.activity_kw)
320
321 def notifyUser(self, activity_tool, retry=False):
322 """Notify the user that the activity failed."""
323 portal = activity_tool.getPortalObject()
324 user_email = portal.getProperty('email_to_address',
325 portal.getProperty('email_from_address'))
326
327 email_from_name = portal.getProperty('email_from_name',
328 portal.getProperty('email_from_address'))
329 call_traceback = ''
330 if self.call_traceback:
331 call_traceback = 'Created at:\n%s' % self.call_traceback
332
333 fail_count = self.line.retry + 1
334 if self.getExecutionState() == MESSAGE_NOT_EXECUTABLE:
335 message = "Not executable activity"
336 elif retry:
337 message = "Pending activity already failed %s times" % fail_count
338 else:
339 message = "Activity failed"
340 path = '/'.join(self.object_path)
341 mail_text = """From: %s <%s>
342 To: %s
343 Subject: %s: %s/%s
344
345 Node: %s
346 Failures: %s
347 User name: %r
348 Document: %s
349 Method: %s
350 Arguments: %r
351 Named Parameters: %r
352 %s
353
354 Exception: %s %s
355
356 %s
357 """ % (email_from_name, activity_tool.email_from_address, user_email,
358 message, path, self.method_id,
359 activity_tool.getCurrentNode(), fail_count,
360 self.user_name, path, self.method_id, self.args, self.kw,
361 call_traceback, self.exc_type, self.exc_value, self.traceback)
362
363 if isinstance(mail_text, unicode):
364 # __traceback_info__ can turn the tracebacks into unicode strings, but
365 # MailHost.send (in Zope 2.8) will not be able to parse headers if the
366 # mail_text is passed as a unicode.
367 mail_text = mail_text.encode('utf8')
368 try:
369 activity_tool.MailHost.send( mail_text )
370 except (socket.error, MailHostError), message:
371 LOG('ActivityTool.notifyUser', WARNING, 'Mail containing failure information failed to be sent: %s. Exception was: %s %s\n%s' % (message, self.exc_type, self.exc_value, self.traceback))
372
373 def reactivate(self, activity_tool, activity=DEFAULT_ACTIVITY):
374 # Reactivate the original object.
375 obj= self.getObject(activity_tool)
376 old_security_manager = getSecurityManager()
377 try:
378 # Change user if required (TO BE DONE)
379 # We will change the user only in order to execute this method
380 user = self.changeUser(self.user_name, activity_tool)
381 active_obj = obj.activate(activity=activity, **self.activity_kw)
382 getattr(active_obj, self.method_id)(*self.args, **self.kw)
383 finally:
384 # Use again the previous user
385 setSecurityManager(old_security_manager)
386
387 def setExecutionState(self, is_executed, exc_info=None, log=True, context=None):
388 """
389 Set message execution state.
390
391 is_executed can be one of MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED and
392 MESSAGE_NOT_EXECUTABLE (variables defined above).
393
394 exc_info must be - if given - similar to sys.exc_info() return value.
395
396 log must be - if given - True or False. If True, a log line will be
397 emited with failure details. This parameter should only be used when
398 invoking this method on a list of messages to avoid log flood. It is
399 caller's responsability to output a log line summing up all errors, and
400 to store error in Zope's error_log.
401
402 context must be - if given - an object wrapped in acquisition context.
403 It is used to access Zope's error_log object. It is not used if log is
404 False.
405
406 If given state is not MESSAGE_EXECUTED, it will also store given
407 exc_info. If not given, it will extract one using sys.exc_info().
408 If final exc_info does not contain any exception, current stack trace
409 will be stored instead: it will hopefuly help understand why message
410 is in an error state.
411 """
412 assert is_executed in (MESSAGE_NOT_EXECUTED, MESSAGE_EXECUTED, MESSAGE_NOT_EXECUTABLE)
413 self.is_executed = is_executed
414 if is_executed != MESSAGE_EXECUTED:
415 if exc_info is None:
416 exc_info = sys.exc_info()
417 if exc_info == (None, None, None):
418 # Raise a dummy exception, ignore it, fetch it and use it as if it was the error causing message non-execution. This will help identifyting the cause of this misbehaviour.
419 try:
420 raise Exception, 'Message execution failed, but there is no exception to explain it. This is a dummy exception so that one can track down why we end up here outside of an exception handling code path.'
421 except:
422 pass
423 exc_info = sys.exc_info()
424 if log:
425 LOG('ActivityTool', WARNING, 'Could not call method %s on object %s. Activity created at:\n%s' % (self.method_id, self.object_path, self.call_traceback), error=exc_info)
426 # push the error in ZODB error_log
427 error_log = getattr(context, 'error_log', None)
428 if error_log is not None:
429 error_log.raising(exc_info)
430 self.exc_type = exc_info[0]
431 self.exc_value = str(exc_info[1])
432 self.traceback = ''.join(ExceptionFormatter.format_exception(*exc_info))
433
434 def getExecutionState(self):
435 return self.is_executed
436
class Method:
    # Callable produced by ActiveWrapper attribute access. Calling it does not
    # run the method: it builds a Message describing the call and hands it to
    # the queue registered for the chosen activity.

    def __init__(self, passive_self, activity, active_process, kw, method_id):
        self.__passive_self = passive_self
        self.__activity = activity
        self.__active_process = active_process
        self.__kw = kw
        self.__method_id = method_id

    def __call__(self, *args, **kw):
        message = Message(self.__passive_self, self.__active_process,
                          self.__kw, self.__method_id, args, kw)
        activity_tool = self.__passive_self.getPortalObject().portal_activities
        if activity_tool.activity_tracking:
            tracking_values = (
                self.__activity,
                '/'.join(message.object_path),
                message.method_id,
                message.args,
                message.kw,
                message.activity_kw,
                message.user_name,
            )
            activity_tracking_logger.info(
                'queuing message: activity=%s, object_path=%s, method_id=%s, '
                'args=%s, kw=%s, activity_kw=%s, user_name=%s'
                % tracking_values)
        queue = activity_dict[self.__activity]
        queue.queueMessage(activity_tool, message)
452
453 allow_class(Method)
454
class ActiveWrapper:
    # Stand-in for an object: any attribute which is not found the normal way
    # is answered with a Method bound to the wrapped object and to the
    # activity settings given at construction time.
    #
    # The settings are written straight into the instance __dict__ under keys
    # no normal attribute lookup uses; __getattr__ and __repr__ read them back
    # from there.

    def __init__(self, passive_self, activity, active_process, **kw):
        state = self.__dict__
        state['__passive_self'] = passive_self
        state['__activity'] = activity
        state['__active_process'] = active_process
        state['__kw'] = kw

    def __getattr__(self, id):
        state = self.__dict__
        return Method(state['__passive_self'],
                      state['__activity'],
                      state['__active_process'],
                      state['__kw'],
                      id)

    def __repr__(self):
        class_name = self.__class__.__name__
        wrapped = self.__dict__['__passive_self']
        return '<%s at 0x%x to %r>' % (class_name, id(self), wrapped)
471
# True when activities cannot be executing any more.
has_processed_shutdown = False

def cancelProcessShutdown():
    """
    Revert the effect of calling "process_shutdown" on activity tool:
    release is_running_lock, then clear the has_processed_shutdown flag.
    """
    global has_processed_shutdown
    # The lock is released first: if that raises, the flag stays untouched.
    is_running_lock.release()
    has_processed_shutdown = False
483
484 class ActivityTool (Folder, UniqueObject):
485 """
486 ActivityTool is the central point for activity management.
487
488 Improvement to consider to reduce locks:
489
490 Idea 1: create an SQL tool which accumulate queries and executes them at the end of a transaction,
491 thus allowing all SQL transaction to happen in a very short time
492 (this would also be a great way of using MyISAM tables)
493
494 Idea 2: do the same at the level of ActivityTool
495
496 Idea 3: do the same at the level of each activity (ie. queueMessage
497 accumulates and fires messages at the end of the transaction)
498 """
499 id = 'portal_activities'
500 meta_type = 'CMF Activity Tool'
501 portal_type = 'Activity Tool'
502 allowed_types = ( 'CMF Active Process', )
503 security = ClassSecurityInfo()
504
505 isIndexable = False
506
507 manage_options = tuple(
508 [ { 'label' : 'Overview', 'action' : 'manage_overview' }
509 , { 'label' : 'Activities', 'action' : 'manageActivities' }
510 , { 'label' : 'LoadBalancing', 'action' : 'manageLoadBalancing'}
511 , { 'label' : 'Advanced', 'action' : 'manageActivitiesAdvanced' }
512 ,
513 ] + list(Folder.manage_options))
514
515 security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivities' )
516 manageActivities = DTMLFile( 'dtml/manageActivities', globals() )
517
518 security.declareProtected( CMFCorePermissions.ManagePortal , 'manageActivitiesAdvanced' )
519 manageActivitiesAdvanced = DTMLFile( 'dtml/manageActivitiesAdvanced', globals() )
520
521 security.declareProtected( CMFCorePermissions.ManagePortal , 'manage_overview' )
522 manage_overview = DTMLFile( 'dtml/explainActivityTool', globals() )
523
524 security.declareProtected( CMFCorePermissions.ManagePortal , 'manageLoadBalancing' )
525 manageLoadBalancing = DTMLFile( 'dtml/manageLoadBalancing', globals() )
526
527 distributingNode = ''
528 _nodes = ()
529 activity_creation_trace = False
530 activity_tracking = False
531 activity_timing_log = False
532 cancel_and_invoke_links_hidden = False
533
def SQLDict_setPriority(self, **kw):
    # Wrapper around the SQLDict_setPriority found on the acquisition parent:
    # what that method returns when called with src__=1 is logged first
    # (presumably the rendered SQL source - TODO confirm), then the method
    # is called for real and its result returned.
    parent_method = self.aq_parent.SQLDict_setPriority
    preview = parent_method(src__=1, **kw)
    LOG('ActivityTool', 0, preview)
    return parent_method(**kw)
538
def __init__(self):
    # The tool is always created under the id defined on the class.
    Folder.__init__(self, ActivityTool.id)
541
542 # Filter content (ZMI)
def filtered_meta_types(self, user=None):
    # Filters the list of available meta types: only the entries of
    # all_meta_types() whose 'name' is listed in allowed_types are kept, in
    # their original order. The "user" argument is accepted but not used.
    #
    # The former call to Folder.filtered_meta_types() was dropped: its result
    # was bound to a local named "all" (shadowing the builtin) and never read.
    return [meta_type for meta_type in self.all_meta_types()
            if meta_type['name'] in self.allowed_types]
551
def maybeMigrateConnectionClass(self):
    # Replace the 'cmf_activity_sql_connection' object by an
    # ActivityConnection carrying the same title and connection string,
    # unless there is no such object or it already is an ActivityConnection.
    connection_id = 'cmf_activity_sql_connection'
    old_connection = getattr(self, connection_id, None)
    if old_connection is None or \
       isinstance(old_connection, ActivityConnection):
        return
    # SQL Connection migration is needed
    LOG('ActivityTool', WARNING, "Migrating MySQL Connection class")
    container = aq_parent(aq_inner(old_connection))
    container._delObject(old_connection.getId())
    replacement = ActivityConnection(connection_id,
                                     old_connection.title,
                                     old_connection.connection_string)
    container._setObject(connection_id, replacement)
565
def initialize(self):
    # Initialise every registered activity, migrate the SQL connection class
    # when needed, then raise the module-level is_initialized flag.
    global is_initialized
    # The imported names are not used below; presumably the import is there
    # so that the activity modules get loaded and registered - TODO confirm.
    from Activity import RAMQueue, RAMDict, SQLQueue, SQLDict
    # Initialize each queue
    for registered_activity in activity_dict.itervalues():
        registered_activity.initialize(self)
    self.maybeMigrateConnectionClass()
    is_initialized = True
574
575 security.declareProtected(Permissions.manage_properties, 'isSubscribed')
def isSubscribed(self):
    """
    Tell whether this tool is subscribed to TimerService.

    Returns True when the physical path of the tool is among the
    subscriptions listed by the service, False otherwise - including
    when no TimerService is available.
    """
    timer_service = getTimerService(self)
    if timer_service:
        tool_path = '/'.join(self.getPhysicalPath())
        return tool_path in timer_service.lisSubscriptions()
    LOG('ActivityTool', INFO, 'TimerService not available')
    return False
590
591 security.declareProtected(Permissions.manage_properties, 'subscribe')
def subscribe(self, REQUEST=None, RESPONSE=None):
    """ Subscribe this tool to the global Timer Service.

    When RESPONSE is given, it is redirected to the load balancing
    management page with a message telling what happened.
    """
    timer_service = getTimerService(self)
    redirect_url = \
        '%s/manageLoadBalancing?manage_tabs_message=' % self.absolute_url()
    if timer_service:
        timer_service.subscribe(self)
        redirect_url += urllib.quote("Subscribed to Timer Service")
    else:
        LOG('ActivityTool', INFO, 'TimerService not available')
        redirect_url += urllib.quote('TimerService not available')
    if RESPONSE is not None:
        RESPONSE.redirect(redirect_url)
604
605 security.declareProtected(Permissions.manage_properties, 'unsubscribe')
def unsubscribe(self, REQUEST=None, RESPONSE=None):
    """ Unsubscribe this tool from the global Timer Service.

    When RESPONSE is given, it is redirected to the load balancing
    management page with a message telling what happened.
    """
    timer_service = getTimerService(self)
    redirect_url = \
        '%s/manageLoadBalancing?manage_tabs_message=' % self.absolute_url()
    if timer_service:
        timer_service.unsubscribe(self)
        redirect_url += urllib.quote("Unsubscribed from Timer Service")
    else:
        LOG('ActivityTool', INFO, 'TimerService not available')
        redirect_url += urllib.quote('TimerService not available')
    if RESPONSE is not None:
        RESPONSE.redirect(redirect_url)
618
619 security.declareProtected(Permissions.manage_properties, 'isActivityTrackingEnabled')
def isActivityTrackingEnabled(self):
    # Current value of the activity_tracking flag, as set by
    # manage_enableActivityTracking / manage_disableActivityTracking.
    return self.activity_tracking
622
623 security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTracking')
def manage_enableActivityTracking(self, REQUEST=None, RESPONSE=None):
    """
    Turn the activity tracking flag on.

    When RESPONSE is given, it is redirected to the advanced management
    page with a confirmation message.
    """
    self.activity_tracking = True
    if RESPONSE is None:
        return
    RESPONSE.redirect('%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
        self.absolute_url(), urllib.quote('Tracking log enabled')))
633
634 security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTracking')
def manage_disableActivityTracking(self, REQUEST=None, RESPONSE=None):
    """
    Turn the activity tracking flag off.

    When RESPONSE is given, it is redirected to the advanced management
    page with a confirmation message.
    """
    self.activity_tracking = False
    if RESPONSE is None:
        return
    RESPONSE.redirect('%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
        self.absolute_url(), urllib.quote('Tracking log disabled')))
644
645 security.declareProtected(Permissions.manage_properties, 'isActivityTimingLoggingEnabled')
def isActivityTimingLoggingEnabled(self):
    # Current value of the activity_timing_log flag, as set by
    # manage_enableActivityTimingLogging / manage_disableActivityTimingLogging.
    return self.activity_timing_log
648
649 security.declareProtected(Permissions.manage_properties, 'manage_enableActivityTimingLogging')
def manage_enableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
    """
    Turn the activity timing log flag on.

    When RESPONSE is given, it is redirected to the advanced management
    page with a confirmation message.
    """
    self.activity_timing_log = True
    if RESPONSE is None:
        return
    RESPONSE.redirect('%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
        self.absolute_url(), urllib.quote('Timing log enabled')))
659
660 security.declareProtected(Permissions.manage_properties, 'manage_disableActivityTimingLogging')
def manage_disableActivityTimingLogging(self, REQUEST=None, RESPONSE=None):
    """
    Turn the activity timing log flag off.

    When RESPONSE is given, it is redirected to the advanced management
    page with a confirmation message.
    """
    self.activity_timing_log = False
    if RESPONSE is None:
        return
    RESPONSE.redirect('%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
        self.absolute_url(), urllib.quote('Timing log disabled')))
670
671 security.declareProtected(Permissions.manage_properties, 'isActivityCreationTraceEnabled')
def isActivityCreationTraceEnabled(self):
    # Current value of the activity_creation_trace flag, as set by
    # manage_enableActivityCreationTrace / manage_disableActivityCreationTrace.
    return self.activity_creation_trace
674
675 security.declareProtected(Permissions.manage_properties, 'manage_enableActivityCreationTrace')
def manage_enableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
    """
    Turn the activity creation trace flag on.

    When RESPONSE is given, it is redirected to the advanced management
    page with a confirmation message.
    """
    self.activity_creation_trace = True
    if RESPONSE is None:
        return
    RESPONSE.redirect('%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
        self.absolute_url(), urllib.quote('Activity creation trace enabled')))
685
686 security.declareProtected(Permissions.manage_properties, 'manage_disableActivityCreationTrace')
def manage_disableActivityCreationTrace(self, REQUEST=None, RESPONSE=None):
    """
    Turn the activity creation trace flag off.

    When RESPONSE is given, it is redirected to the advanced management
    page with a confirmation message.
    """
    self.activity_creation_trace = False
    if RESPONSE is None:
        return
    RESPONSE.redirect('%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
        self.absolute_url(), urllib.quote('Activity creation trace disabled')))
696
697 security.declareProtected(Permissions.manage_properties, 'isCancelAndInvokeLinksHidden')
def isCancelAndInvokeLinksHidden(self):
    # Current value of the cancel_and_invoke_links_hidden flag, as set by
    # manage_hideCancelAndInvokeLinks / manage_showCancelAndInvokeLinks.
    return self.cancel_and_invoke_links_hidden
700
701 security.declareProtected(Permissions.manage_properties, 'manage_hideCancelAndInvokeLinks')
def manage_hideCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
    """
    Set the flag asking for the cancel and invoke links to be hidden.

    When RESPONSE is given, it is redirected to the advanced management
    page with a confirmation message.
    """
    self.cancel_and_invoke_links_hidden = True
    if RESPONSE is None:
        return
    RESPONSE.redirect('%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
        self.absolute_url(), urllib.quote('Cancel and invoke links hidden')))
710
711 security.declareProtected(Permissions.manage_properties, 'manage_showCancelAndInvokeLinks')
def manage_showCancelAndInvokeLinks(self, REQUEST=None, RESPONSE=None):
    """
    Clear the flag asking for the cancel and invoke links to be hidden.

    When RESPONSE is given, it is redirected to the advanced management
    page with a confirmation message.
    """
    self.cancel_and_invoke_links_hidden = False
    if RESPONSE is None:
        return
    RESPONSE.redirect('%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
        self.absolute_url(), urllib.quote('Cancel and invoke links visible')))
720
def manage_beforeDelete(self, item, container):
    # Leave the Timer Service first, then run the hook inherited by Folder.
    self.unsubscribe()
    inherited_hook = Folder.inheritedAttribute('manage_beforeDelete')
    inherited_hook(self, item, container)
724
def manage_afterAdd(self, item, container):
    # Join the Timer Service first, then run the hook inherited by Folder.
    self.subscribe()
    inherited_hook = Folder.inheritedAttribute('manage_afterAdd')
    inherited_hook(self, item, container)
728
def getCurrentNode(self):
    """ Return current node in form ip:port

    The value is computed once per process and kept in the module-level
    currentNode variable. The address is taken from the first entry of
    asyncore's socket_map which has an 'addr' attribute and whose class
    reads 'ZServer.HTTPServer.zhttp_server'; '0.0.0.0' is replaced by the
    address the local host name resolves to. When no such server is
    found, the result is ':'.
    """
    global currentNode
    if currentNode is None:
        ip = port = ''
        from asyncore import socket_map
        for server in socket_map.values():
            if hasattr(server, 'addr'):
                # see Zope/lib/python/App/ApplicationManager.py:
                # def getServers(self)
                server_class = str(getattr(server, '__class__', 'unknown'))
                if server_class == 'ZServer.HTTPServer.zhttp_server':
                    ip, port = server.addr
                    break
        if ip == '0.0.0.0':
            ip = socket.gethostbyname(socket.gethostname())
        currentNode = '%s:%s' % (ip, port)
    return currentNode
746
747 security.declarePublic('getDistributingNode')
def getDistributingNode(self):
    """ Return the distributingNode attribute of the tool ('' by default) """
    return self.distributingNode
751
def getNodeList(self, role=None):
    # Sorted list of the node names known to getNodeDict(). With a role,
    # only the nodes registered with exactly that role are listed.
    node_dict = self.getNodeDict()
    if role is None:
        selected = node_dict.keys()
    else:
        selected = [node_id for node_id, node_role in node_dict.items()
                    if node_role == role]
    return sorted(selected)
760
def getNodeDict(self):
    # Mapping of node name to role. When _nodes still is a tuple of node
    # names, it is first replaced by an OIBTree in which every listed node
    # has the processing role; the new mapping is stored on the tool.
    node_mapping = self._nodes
    if not isinstance(node_mapping, tuple):
        return node_mapping
    migrated = OIBTree()
    migrated.update([(node_id, ROLE_PROCESSING) for node_id in node_mapping])
    self._nodes = migrated
    return migrated
768
def registerNode(self, node):
    # Add the node to the node dictionary unless it is already there.
    known_nodes = self.getNodeDict()
    if node in known_nodes:
        return
    if len(known_nodes):
        role = ROLE_IDLE
    else:
        # If we are registering the first node, make it both the
        # distributing node and a processing node.
        role = ROLE_PROCESSING
        self.distributingNode = node
    self.updateNode(node, role)
780
def updateNode(self, node, role):
    # Store the role of the node, adding the node when it is not known yet.
    self.getNodeDict()[node] = role
784
785 security.declareProtected(CMFCorePermissions.ManagePortal, 'getProcessingNodeList')
def getProcessingNodeList(self):
    # Sorted names of the nodes whose role is ROLE_PROCESSING.
    return self.getNodeList(role=ROLE_PROCESSING)
788
789 security.declareProtected(CMFCorePermissions.ManagePortal, 'getIdleNodeList')
def getIdleNodeList(self):
    # Sorted names of the nodes whose role is ROLE_IDLE.
    return self.getNodeList(role=ROLE_IDLE)
792
def _isValidNodeName(self, node_name):
    """Check we have been provided a good node name.

    Returns False for anything which is not a str, otherwise the result
    of matching the name against NODE_RE (a match object or None).
    """
    if not isinstance(node_name, str):
        return False
    return NODE_RE.match(node_name)
796
797 security.declarePublic('manage_setDistributingNode')
def manage_setDistributingNode(self, distributingNode, REQUEST=None):
    """ Set the distributing node.

    An empty value is accepted as is; any other value is stored only if
    _isValidNodeName accepts it. When REQUEST is given, its response is
    redirected to the load balancing page with the outcome.
    """
    # NOTE(review): the security declaration preceding this method makes it
    # public although it modifies the tool - confirm this is intended.
    if not distributingNode or self._isValidNodeName(distributingNode):
        self.distributingNode = distributingNode
        message = "Distributing Node successfully changed."
    else:
        message = "Malformed Distributing Node."
    if REQUEST is not None:
        REQUEST.RESPONSE.redirect(
            REQUEST.URL1 +
            '/manageLoadBalancing?manage_tabs_message=' +
            urllib.quote(message))
813
814 security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_delNode')
def manage_delNode(self, unused_node_list=None, REQUEST=None):
    """Delete the selected unused nodes.

    Every node of ``unused_node_list`` found in the node registry is removed
    from it. If the current distributing node is part of the selection, the
    distributing node is reset to the empty string. When ``REQUEST`` is
    given, the response is redirected to ``manageLoadBalancing`` with a
    status message.
    """
    distributing_node = self.getDistributingNode()
    distributing_node_disabled = False
    if unused_node_list is not None:
        node_dict = self.getNodeDict()
        for node in unused_node_list:
            if node in node_dict:
                del node_dict[node]
            if node == distributing_node:
                # Reset the attribute read by getDistributingNode. The
                # previous code assigned ``self.processing_node``, which
                # left the deleted node as distributing node.
                self.distributingNode = ''
                distributing_node_disabled = True
    if REQUEST is not None:
        if unused_node_list is None:
            message = "No unused node selected, nothing deleted."
        else:
            message = "Deleted nodes %r." % (unused_node_list, )
        if distributing_node_disabled:
            message += "Disabled distributing node because it was deleted."
        REQUEST.RESPONSE.redirect(
            REQUEST.URL1 +
            '/manageLoadBalancing?manage_tabs_message=' +
            urllib.quote(message))
838
839 security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_addToProcessingList')
def manage_addToProcessingList(self, unused_node_list=None, REQUEST=None):
    """Change one or more idle nodes into processing nodes.

    Every node of ``unused_node_list`` is given ROLE_PROCESSING through
    ``updateNode``. When ``REQUEST`` is given, the response is redirected to
    ``manageLoadBalancing`` with a status message.
    """
    if unused_node_list is not None:
        # updateNode fetches the registry itself, so no local copy is
        # needed here.
        for node in unused_node_list:
            self.updateNode(node, ROLE_PROCESSING)
    if REQUEST is not None:
        if unused_node_list is None:
            message = "No unused node selected, nothing done."
        else:
            message = "Nodes now procesing: %r." % (unused_node_list, )
        REQUEST.RESPONSE.redirect(
            REQUEST.URL1 +
            '/manageLoadBalancing?manage_tabs_message=' +
            urllib.quote(message))
855
856 security.declareProtected(CMFCorePermissions.ManagePortal, 'manage_removeFromProcessingList')
def manage_removeFromProcessingList(self, processing_node_list=None, REQUEST=None):
    """Change one or more processing nodes into idle nodes.

    Every node of ``processing_node_list`` is given ROLE_IDLE through
    ``updateNode``. When ``REQUEST`` is given, the response is redirected to
    ``manageLoadBalancing`` with a status message.
    """
    if processing_node_list is not None:
        # updateNode fetches the registry itself, so no local copy is
        # needed here.
        for node in processing_node_list:
            self.updateNode(node, ROLE_IDLE)
    if REQUEST is not None:
        if processing_node_list is None:
            message = "No used node selected, nothing done."
        else:
            message = "Nodes now unused %r." % (processing_node_list, )
        REQUEST.RESPONSE.redirect(
            REQUEST.URL1 +
            '/manageLoadBalancing?manage_tabs_message=' +
            urllib.quote(message))
872
def process_shutdown(self, phase, time_in_phase):
    """
    Prevent shutdown from happening while an activity queue is
    processing a batch.

    Only acts once, on the first call with ``phase`` equal to 3: it then
    blocks until ``is_running_lock`` can be acquired. The lock is not
    released by this method.
    """
    global has_processed_shutdown
    if phase != 3 or has_processed_shutdown:
        return
    has_processed_shutdown = True
    LOG('CMFActivity', INFO, "Shutdown: Waiting for activities to finish.")
    is_running_lock.acquire()
    LOG('CMFActivity', INFO, "Shutdown: Activities finished.")
884
def process_timer(self, tick, interval, prev="", next=""):
    """
    Call distribute() if we are the Distributing Node and call tic()
    with our node number.
    This method is called by TimerService in the interval given
    in zope.conf. The Default is every 5 seconds.

    Returns immediately when another timer thread already holds
    ``timerservice_lock``. Exceptions raised while distributing or
    processing are logged and swallowed, to avoid killing the timer server.
    """
    # Prevent TimerService from starting multiple threads in parallel
    acquired = timerservice_lock.acquire(0)
    if not acquired:
        return

    try:
        # make sure our skin is set-up. On CMF 1.5 it's setup by acquisition,
        # but on 2.2 it's by traversal, and our site probably wasn't traversed
        # by the timerserver request, which goes into the Zope Control_Panel
        # calling it a second time is a harmless and cheap no-op.
        # both setupCurrentSkin and REQUEST are acquired from containers.
        #
        # This call is made inside the try block so that the lock is
        # released even if the skin setup raises; otherwise every later
        # timer tick would find the lock taken and return immediately.
        self.setupCurrentSkin(self.REQUEST)
        old_sm = getSecurityManager()
        try:
            try:
                # get owner of portal_catalog, so normally we should be able
                # to have the permission to invoke all activities
                user = self.portal_catalog.getWrappedOwner()
                newSecurityManager(self.REQUEST, user)

                currentNode = self.getCurrentNode()
                self.registerNode(currentNode)
                processing_node_list = self.getNodeList(role=ROLE_PROCESSING)

                # only distribute when we are the distributingNode
                if (self.getDistributingNode() == currentNode):
                    self.distribute(len(processing_node_list))

                # SkinsTool uses a REQUEST cache to store skin objects, as
                # with TimerService we have the same REQUEST over multiple
                # portals, we clear this cache to make sure the cache doesn't
                # contains skins from another portal.
                stool = getToolByName(self, 'portal_skins', None)
                if stool is not None:
                    stool.changeSkin(None)

                # call tic for the current processing_node
                # the processing_node numbers are the indices of the elements
                # in the node list +1 because processing_node starts from 1
                if currentNode in processing_node_list:
                    self.tic(processing_node_list.index(currentNode)+1)
            except:
                # Catch ALL exception to avoid killing timerserver.
                LOG('ActivityTool', ERROR, 'process_timer received an exception', error=sys.exc_info())
        finally:
            # Always restore the security manager that was active on entry.
            setSecurityManager(old_sm)
    finally:
        timerservice_lock.release()
940
941 security.declarePublic('distribute')
def distribute(self, node_count=1):
    """
    Distribute load

    Initializes the tool if needed, then asks every registered activity
    queue to distribute its messages, passing it ``node_count``.
    """
    # Initialize if needed
    if not is_initialized:
        self.initialize()

    # Call distribute on each queue
    inner_self = aq_inner(self)
    for queue in activity_dict.itervalues():
        queue.distribute(inner_self, node_count)
953
954 security.declarePublic('tic')
def tic(self, processing_node=1, force=0):
    """
    Wake up every activity queue and process its messages for the given
    processing node.
    processing_node starts from 1 (there is no node 0)

    Raises RuntimeError('Too many threads') when ``max_active_threads``
    threads are already running tic and ``force`` is false.
    """
    global active_threads

    # return if the number of threads is too high
    # else, increase the number of active_threads and continue
    tic_lock.acquire()
    try:
        if active_threads >= max_active_threads and not force:
            raise RuntimeError('Too many threads')
        active_threads += 1
    finally:
        tic_lock.release()

    # Everything after the counter increment runs inside the try block so
    # the counter is decremented again even if initialization fails.
    try:
        # Initialize if needed
        if not is_initialized:
            self.initialize()

        inner_self = aq_inner(self)

        # Sort activity list by priority
        activity_list = sorted(activity_dict.itervalues(),
                               key=lambda activity: activity.getPriority(self))

        # Wakeup each queue
        for activity in activity_list:
            activity.wakeup(inner_self, processing_node)

        # Process messages on each queue in round robin
        has_awake_activity = 1
        while has_awake_activity:
            has_awake_activity = 0
            for activity in activity_list:
                # Non-blocking: a queue is skipped for this round when the
                # running lock is held by someone else.
                acquired = is_running_lock.acquire(0)
                if acquired:
                    try:
                        # Transaction processing is the responsability of
                        # the activity
                        activity.tic(inner_self, processing_node)
                        has_awake_activity = has_awake_activity or activity.isAwake(inner_self, processing_node)
                    finally:
                        is_running_lock.release()
    finally:
        # decrease the number of active_threads
        tic_lock.acquire()
        try:
            active_threads -= 1
        finally:
            tic_lock.release()
1005
def hasActivity(self, *args, **kw):
    # Check in each queue if the object has deferred tasks.
    # The object is the first positional argument; if no argument is
    # provided, then check on self. Keyword arguments are passed through
    # to each queue. Returns True as soon as one queue reports an activity,
    # False otherwise.
    obj = self
    if len(args) > 0:
        obj = args[0]
    inner_self = aq_inner(self)
    for queue in activity_dict.itervalues():
        if queue.hasActivity(inner_self, obj, **kw):
            return True
    return False
1017
def getActivityBuffer(self, create_if_not_found=True):
    """
    Get activity buffer for this thread for this activity tool.
    If no activity buffer is found at lowest level and create_if_not_found
    is True, create one. If it is False, None is returned and nothing is
    stored, so that a later call asking for creation gets a real buffer.
    Intermediate level is unconditionally created if non existent because
    chances are it will be used in the instance life.
    Lock is held when checking for intermediate level existence
    because:
     - intermediate level dict must not be created in 2 threads at the
       same time, since one creation would destroy the existing one.
    It's released after that step because:
     - lower level access is at thread scope, thus by definition there
       can be only one access at a time to a key
     - GIL protects us when accessing python instances
    """
    # Safeguard: make sure we are wrapped in acquisition context before
    # using our path as an activity tool instance-wide identifier.
    assert getattr(self, 'aq_self', None) is not None
    my_instance_key = self.getPhysicalPath()
    my_thread_key = get_ident()
    global_activity_buffer_lock.acquire()
    try:
        if my_instance_key not in global_activity_buffer:
            global_activity_buffer[my_instance_key] = {}
    finally:
        global_activity_buffer_lock.release()
    thread_activity_buffer = global_activity_buffer[my_instance_key]
    # A missing entry and a stored None are treated alike: the previous
    # code cached None when create_if_not_found was False, and then handed
    # that None back to callers that did ask for a buffer.
    activity_buffer = thread_activity_buffer.get(my_thread_key)
    if activity_buffer is None and create_if_not_found:
        activity_buffer = ActivityBuffer()
        thread_activity_buffer[my_thread_key] = activity_buffer
    return activity_buffer
1055
1056 security.declarePrivate('activateObject')
def activateObject(self, object, activity, active_process, **kw):
    # Return an ActiveWrapper built from the given object, activity,
    # active process and keyword arguments.
    if not is_initialized:
        self.initialize()
    # The return value is not needed here: the call makes sure the current
    # thread has an activity buffer.
    self.getActivityBuffer()
    active_wrapper = ActiveWrapper(object, activity, active_process, **kw)
    return active_wrapper
1062
def deferredQueueMessage(self, activity, message):
    # Delegate to the current thread's activity buffer, which receives this
    # tool, the activity and the message.
    self.getActivityBuffer().deferredQueueMessage(self, activity, message)
1066
def deferredDeleteMessage(self, activity, message):
    # Delegate to the current thread's activity buffer, which receives this
    # tool, the activity and the message.
    self.getActivityBuffer().deferredDeleteMessage(self, activity, message)
1070
def getRegisteredMessageList(self, activity):
    # Return the messages registered for ``activity`` in the current
    # thread's activity buffer, or an empty list when this thread has no
    # buffer. No buffer is created by this lookup.
    thread_buffer = self.getActivityBuffer(create_if_not_found=False)
    if thread_buffer is None:
        return []
    # Original note, kept for reference: calling thread_buffer._register()
    # here is required if flush is called outside activate.
    return activity.getRegisteredMessageList(thread_buffer,
                                             aq_inner(self))
1079
def unregisterMessage(self, activity, message):
    # Ask ``activity`` to unregister ``message`` from the current thread's
    # activity buffer (created if needed) and return its result.
    thread_buffer = self.getActivityBuffer()
    inner_self = aq_inner(self)
    return activity.unregisterMessage(thread_buffer, inner_self, message)
1084
def flush(self, obj, invoke=0, **kw):
    # Flush, on every activity queue, the messages of one object.
    # ``obj`` is either a physical path given as a tuple, or an object
    # whose getPhysicalPath() is used. ``invoke`` and the extra keyword
    # arguments are passed through to each queue.
    if not is_initialized:
        self.initialize()
    # Called for its side effect only: makes sure the current thread has an
    # activity buffer.
    self.getActivityBuffer()
    object_path = obj
    if not isinstance(obj, tuple):
        object_path = obj.getPhysicalPath()
    inner_self = aq_inner(self)
    for queue in activity_dict.itervalues():
        queue.flush(inner_self, object_path, invoke=invoke, **kw)
1095
def start(self, **kw):
    # Initialize the tool if needed, then call start() on every activity
    # queue, passing the keyword arguments through.
    if not is_initialized:
        self.initialize()
    inner_self = aq_inner(self)
    for queue in activity_dict.itervalues():
        queue.start(inner_self, **kw)
1101
def stop(self, **kw):
    # Initialize the tool if needed, then call stop() on every activity
    # queue, passing the keyword arguments through.
    if not is_initialized:
        self.initialize()
    inner_self = aq_inner(self)
    for queue in activity_dict.itervalues():
        queue.stop(inner_self, **kw)
1107
def invoke(self, message):
    # Execute ``message`` against this tool.
    # When the tool is acquisition-wrapped, the message is called with a
    # copy of the tool rewrapped around a clone of the current request, in
    # which the request information saved on the message is restored.
    # After the call, the skin selection and the Localizer/iHotfix context
    # are restored, and objects held by the cloned request are handed over
    # to the original request.
    if self.activity_tracking:
        activity_tracking_logger.info('invoking message: object_path=%s, method_id=%s, args=%r, kw=%r, activity_kw=%r, user_name=%s' % ('/'.join(message.object_path), message.method_id, message.args, message.kw, message.activity_kw, message.user_name))
    # False means "Localizer context left untouched". None is a legitimate
    # saved value meaning "this thread had no context before".
    old_localizer_context = False
    if getattr(self, 'aq_chain', None) is not None:
        # Grab existing acquisition chain and extract base objects.
        base_chain = [aq_base(x) for x in self.aq_chain]
        # Grab existing request (last chain item) and create a copy.
        request_container = base_chain.pop()
        request = request_container.REQUEST
        # Generate PARENTS value. Sadly, we cannot reuse base_chain since
        # PARENTS items must be wrapped in acquisition
        parents = []
        application = self.getPhysicalRoot().aq_base
        for parent in self.aq_chain:
            if parent.aq_base is application:
                break
            parents.append(parent)
        # XXX: REQUEST.clone() requires PARENTS to be set, and it's not when
        # running unit tests. Recreate it if it does not exist.
        # ``request.other`` is used as a mapping everywhere else in this
        # method, so the key must be looked up with get(): the previous
        # getattr() on it always returned None and therefore always
        # overwrote an existing PARENTS.
        if request.other.get('PARENTS') is None:
            request.other['PARENTS'] = parents
        # XXX: itools (used by Localizer) requires PATH_INFO to be set, and
        # it's not when running unit tests. Recreate it if it does not exist.
        if request.environ.get('PATH_INFO') is None:
            request.environ['PATH_INFO'] = '/Control_Panel/timer_service/process_timer'

        # restore request information
        new_request = request.clone()
        request_info = message.request_info
        # PARENTS is truncated by clone
        new_request.other['PARENTS'] = parents
        if '_script' in request_info:
            new_request._script = request_info['_script']
        if 'SERVER_URL' in request_info:
            new_request.other['SERVER_URL'] = request_info['SERVER_URL']
        if 'VirtualRootPhysicalPath' in request_info:
            new_request.other['VirtualRootPhysicalPath'] = request_info['VirtualRootPhysicalPath']
        if 'HTTP_ACCEPT_LANGUAGE' in request_info:
            new_request.environ['HTTP_ACCEPT_LANGUAGE'] = request_info['HTTP_ACCEPT_LANGUAGE']
            # Replace Localizer/iHotfix Context, saving existing one
            localizer_context = LocalizerContext(new_request)
            thread_id = get_ident()
            localizer_lock.acquire()
            try:
                old_localizer_context = localizer_contexts.get(thread_id)
                localizer_contexts[thread_id] = localizer_context
            finally:
                localizer_lock.release()
            # Execute Localizer/iHotfix "patch 2"
            new_request.processInputs()

        new_request_container = request_container.__class__(REQUEST=new_request)
        # Recreate acquisition chain.
        my_self = new_request_container
        base_chain.reverse()
        for item in base_chain:
            my_self = item.__of__(my_self)
    else:
        my_self = self
        LOG('CMFActivity.ActivityTool.invoke', INFO, 'Strange: invoke is called outside of acquisition context.')
    try:
        message(my_self)
    finally:
        if my_self is not self: # We rewrapped self
            # Restore default skin selection
            skinnable = self.getPortalObject()
            skinnable.changeSkin(skinnable.getSkinNameFromRequest(request))
        if old_localizer_context is not False:
            # Restore Localizer/iHotfix context
            thread_id = get_ident()
            localizer_lock.acquire()
            try:
                if old_localizer_context is None:
                    del localizer_contexts[thread_id]
                else:
                    localizer_contexts[thread_id] = old_localizer_context
            finally:
                localizer_lock.release()
    if self.activity_tracking:
        activity_tracking_logger.info('invoked message')
    if my_self is not self: # We rewrapped self
        # Hand over whatever the cloned request holds to the original
        # request.
        for held in my_self.REQUEST._held:
            self.REQUEST._hold(held)
1192
def invokeGroup(self, method_id, message_list, activity, merge_duplicate):
    # Invoke the group method ``method_id`` once for all the messages of
    # ``message_list``, then record an execution state on each message.
    #
    # method_id       -- path of the group method, resolved with
    #                    unrestrictedTraverse from this tool
    # message_list    -- messages to process as a group
    # activity        -- activity used to re-activate objects which provide
    #                    the message's ``alternate_method_id``
    # merge_duplicate -- when true, an object path is only passed once to
    #                    the group method
    #
    # Nothing is returned: the outcome is stored on each message through
    # setExecutionState.
    if self.activity_tracking:
        activity_tracking_logger.info(
            'invoking group messages: method_id=%s, paths=%s'
            % (method_id, ['/'.join(m.object_path) for m in message_list]))
    # Invoke a group method.
    # expanded_object_list: (object, args, kw) 3-tuples given to the method.
    # new_message_list: (message, object) pairs which were fully expanded.
    # path_set: object paths already seen, used when merge_duplicate is set.
    expanded_object_list = []
    new_message_list = []
    path_set = set()
    # Filter the list of messages. If an object is not available, mark its
    # message as non-executable. In addition, expand an object if necessary,
    # and make sure that no duplication happens.
    for m in message_list:
        # alternate method is used to segregate objects which cannot be grouped.
        alternate_method_id = m.activity_kw.get('alternate_method_id')
        try:
            obj = m.getObject(self)
        except KeyError:
            LOG('CMFActivity', ERROR,
                'Message failed in getting an object from the path %r' % \
                (m.object_path,),
                error=sys.exc_info())
            m.setExecutionState(MESSAGE_NOT_EXECUTABLE, context=self)
            continue
        try:
            if m.hasExpandMethod():
                subobject_list = m.getObjectList(self)
            else:
                subobject_list = (obj,)
            for subobj in subobject_list:
                if merge_duplicate:
                    path = subobj.getPath()
                    if path in path_set:
                        continue
                    path_set.add(path)
                if alternate_method_id is not None \
                   and hasattr(aq_base(subobj), alternate_method_id):
                    # if this object is alternated,
                    # generate a new single active object
                    # (the grouping keys are dropped from the copied
                    # activity keywords first)
                    activity_kw = m.activity_kw.copy()
                    activity_kw.pop('group_method_id', None)
                    activity_kw.pop('group_id', None)
                    active_obj = subobj.activate(activity=activity, **activity_kw)
                    getattr(active_obj, alternate_method_id)(*m.args, **m.kw)
                else:
                    expanded_object_list.append((subobj, m.args, m.kw))
            new_message_list.append((m, obj))
        except:
            # NOTE(review): objects appended to expanded_object_list before
            # the failure stay in that list although the message is marked
            # as not executed here -- confirm this is intended.
            m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)

    try:
        if len(expanded_object_list) > 0:
            method = self.unrestrictedTraverse(method_id)
            # FIXME: how to apply security here?
            # NOTE: expanded_object_list must be set to failed objects by the
            #       callee. If it fully succeeds, expanded_object_list must be
            #       empty when returning.
            result = method(expanded_object_list)
        else:
            result = None
    except:
        # In this case, the group method completely failed.
        exc_info = sys.exc_info()
        for m, obj in new_message_list:
            m.setExecutionState(MESSAGE_NOT_EXECUTED, exc_info, log=False)
        LOG('WARNING ActivityTool', 0,
            'Could not call method %s on objects %s' %
            (method_id, [x[0] for x in expanded_object_list]), error=exc_info)
        # Also report the failure to the error_log, when one can be reached
        # from this tool.
        error_log = getattr(self, 'error_log', None)
        if error_log is not None:
            error_log.raising(exc_info)
    else:
        # Obtain all indices of failed messages.
        # Note that this can be a partial failure.
        failed_message_set = set(id(x[2]) for x in expanded_object_list)
        # Only for succeeded messages, an activity process is invoked (if any).
        for m, obj in new_message_list:
            # We use id of kw dict (persistent object) to know if there is a
            # failed 3-tuple corresponding to Message m.
            if id(m.kw) in failed_message_set:
                m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
            else:
                try:
                    m.activateResult(self, result, obj)
                except:
                    m.setExecutionState(MESSAGE_NOT_EXECUTED, context=self)
                else:
                    m.setExecutionState(MESSAGE_EXECUTED, context=self)
    if self.activity_tracking:
        activity_tracking_logger.info('invoked group messages')
1283
def newMessage(self, activity, path, active_process,
               activity_kw, method_id, *args, **kw):
    # Build a Message from the given values and queue it on the activity
    # queue registered under the name ``activity``.
    # Some Security Checking should be made here XXX
    if not is_initialized:
        self.initialize()
    # Called for its side effect only: makes sure the current thread has an
    # activity buffer.
    self.getActivityBuffer()
    queue = activity_dict[activity]
    queue.queueMessage(
        aq_inner(self),
        Message(path, active_process, activity_kw, method_id, args, kw))
1292
1293 security.declareProtected( CMFCorePermissions.ManagePortal, 'manageInvoke' )
def manageInvoke(self, object_path, method_id, REQUEST=None):
    """
    Invokes all methods for object "object_path"

    ``object_path`` is either a tuple or a '/'-separated string, which is
    split into a tuple. Messages matching ``method_id`` are flushed with
    invoke=1. When ``REQUEST`` is given, the result of the redirection to
    ``manageActivities`` is returned.
    """
    # isinstance also accepts str subclasses, which the former exact type
    # comparison passed unsplit to flush.
    if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
    self.flush(object_path, method_id=method_id, invoke=1)
    if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
            (self.absolute_url(), 'manageActivities'))
1304
1305 security.declareProtected( CMFCorePermissions.ManagePortal, 'manageCancel' )
def manageCancel(self, object_path, method_id, REQUEST=None):
    """
    Cancel all methods for object "object_path"

    ``object_path`` is either a tuple or a '/'-separated string, which is
    split into a tuple. Messages matching ``method_id`` are flushed with
    invoke=0. When ``REQUEST`` is given, the result of the redirection to
    ``manageActivities`` is returned.
    """
    # isinstance also accepts str subclasses, which the former exact type
    # comparison passed unsplit to flush.
    if isinstance(object_path, str):
        object_path = tuple(object_path.split('/'))
    self.flush(object_path, method_id=method_id, invoke=0)
    if REQUEST is not None:
        return REQUEST.RESPONSE.redirect('%s/%s' %
            (self.absolute_url(), 'manageActivities'))
1316
1317 security.declareProtected( CMFCorePermissions.ManagePortal,
1318 'manageClearActivities' )
def manageClearActivities(self, keep=1, REQUEST=None):
    """
    Clear all activities and recreate tables.

    When ``keep`` is true, pending messages are dumped from every queue
    providing ``dumpMessageList`` before the tables are dropped, and are
    reactivated once the tables are recreated. Failures other than
    ConflictError are logged and skipped. When ``REQUEST`` is given, the
    result of the redirection to ``manageActivitiesAdvanced`` is returned.
    """
    folder = getToolByName(self, 'portal_skins').activity

    # Obtain all pending messages, grouped by queue class name.
    message_list_dict = {}
    if keep:
        for queue in activity_dict.itervalues():
            if not hasattr(queue, 'dumpMessageList'):
                continue
            try:
                message_list_dict[queue.__class__.__name__] = \
                    queue.dumpMessageList(self)
            except ConflictError:
                raise
            except:
                LOG('ActivityTool', WARNING,
                    'could not dump messages from %s' %
                    (queue,), error=sys.exc_info())

    # Drop and recreate each message table whose creation method exists in
    # the skin folder. A failed drop is only logged; creation is attempted
    # anyway.
    table_method_list = (
        ('SQLDict_createMessageTable', 'SQLDict_dropMessageTable',
         'could not drop the message table'),
        ('SQLQueue_createMessageTable', 'SQLQueue_dropMessageTable',
         'could not drop the message queue table'),
    )
    for create_id, drop_id, drop_failure in table_method_list:
        if getattr(folder, create_id, None) is None:
            continue
        try:
            getattr(folder, drop_id)()
        except ConflictError:
            raise
        except:
            LOG('CMFActivity', WARNING,
                drop_failure,
                error=sys.exc_info())
        getattr(folder, create_id)()

    # Reactivate the messages.
    inner_self = aq_inner(self)
    for activity, message_list in message_list_dict.iteritems():
        for m in message_list:
            try:
                m.reactivate(inner_self, activity=activity)
            except ConflictError:
                raise
            except:
                LOG('ActivityTool', WARNING,
                    'could not reactivate the message %r, %r' %
                    (m.object_path, m.method_id), error=sys.exc_info())

    if REQUEST is not None:
        if keep:
            message = 'Tables%20Recreated'
        else:
            message = 'Activities%20Cleared'
        return REQUEST.RESPONSE.redirect(
            '%s/manageActivitiesAdvanced?manage_tabs_message=%s' % (
            self.absolute_url(), message))
1381
1382 security.declarePublic('getMessageList')
def getMessageList(self, **kw):
    """
    List messages waiting in queues

    Keyword arguments are passed through to every queue. A queue raising
    AttributeError is logged and skipped.
    """
    # Initialize if needed
    if not is_initialized:
        self.initialize()

    inner_self = aq_inner(self)
    collected = []
    for queue in activity_dict.itervalues():
        try:
            queue_message_list = queue.getMessageList(inner_self, **kw)
        except AttributeError:
            LOG('getMessageList, could not get message from Activity:',0,queue)
        else:
            collected.extend(queue_message_list)
    return collected
1398
1399 security.declarePublic('countMessageWithTag')
def countMessageWithTag(self, value):
    """
    Return the number of messages which match the given tag.

    The result is the sum of the counts reported by every queue.
    """
    inner_self = aq_inner(self)
    return sum([queue.countMessageWithTag(inner_self, value)
                for queue in activity_dict.itervalues()])
1408
1409 security.declarePublic('countMessage')
def countMessage(self, **kw):
    """
    Return the number of messages which match the given parameter.

    Parameters allowed:

    method_id : the id of the method
    path : for activities on a particular object
    tag : activities with a particular tag
    message_uid : activities with a particular uid

    The result is the sum of the counts reported by every queue.
    """
    inner_self = aq_inner(self)
    return sum([queue.countMessage(inner_self, **kw)
                for queue in activity_dict.itervalues()])
1425
1426 security.declareProtected( CMFCorePermissions.ManagePortal , 'newActiveProcess' )
def newActiveProcess(self, **kw):
    # Add a new active process to this tool under a freshly generated id
    # and return the result of addActiveProcess. Keyword arguments are
    # passed through.
    from ActiveProcess import addActiveProcess
    process_id = str(self.generateNewId())
    return addActiveProcess(self, process_id, **kw)
1431
1432 # Active synchronisation methods
1433 security.declarePrivate('validateOrder')
def validateOrder(self, message, validator_id, validation_value):
    # Return True when at least one message is reported by
    # getDependentMessageList for the given validator, False otherwise.
    return len(self.getDependentMessageList(
        message, validator_id, validation_value)) > 0
1437
1438 security.declarePrivate('getDependentMessageList')
def getDependentMessageList(self, message, validator_id, validation_value):
    # Collect, from every queue providing a ``_validate_<validator_id>``
    # method, the messages that method returns for ``message`` and
    # ``validation_value``. The result is a list of (queue, message) pairs.
    if not is_initialized:
        self.initialize()
    dependent_list = []
    validator_name = "_validate_%s" % validator_id
    for queue in activity_dict.itervalues():
        validate = getattr(queue, validator_name, None)
        if validate is None:
            continue
        found = validate(aq_inner(self), message, validation_value)
        if not found:
            continue
        for dependent in found:
            dependent_list.append((queue, dependent))
    return dependent_list
1451
1452 # Required for tests (time shift)
def timeShift(self, delay):
    # Initialize the tool if needed, then call timeShift(delay) on every
    # activity queue.
    if not is_initialized:
        self.initialize()
    inner_self = aq_inner(self)
    for queue in activity_dict.itervalues():
        queue.timeShift(inner_self, delay)
1458
# Finalize the ActivityTool class once its body has been executed.
# NOTE(review): presumably Zope's InitializeClass, which applies the
# ``security`` declarations made in the class body -- confirm against the
# import at the top of the file.
InitializeClass(ActivityTool)