diff --git a/mumo.py b/mumo.py index 75c0607..573f015 100755 --- a/mumo.py +++ b/mumo.py @@ -63,7 +63,7 @@ ('watchdog', int, 30), ('callback_host', str, '127.0.0.1'), ('callback_port', int, -1)), - + 'iceraw':None, 'murmur':(('servers', commaSeperatedIntegers, []),), 'system':(('pidfile', str, 'mumo.pid'),), @@ -71,13 +71,13 @@ ('file', str, 'mumo.log'))}) def load_slice(slice): - # + # #--- Loads a given slicefile, used by dynload_slice and fsload_slice # This function works around a number of differences between Ice python # versions and distributions when it comes to slice include directories. # fallback_slicedirs = ["-I" + sdir for sdir in cfg.ice.slicedirs.split(';')] - + if not hasattr(Ice, "getSliceDir"): Ice.loadSlice('-I%s %s' % (" ".join(fallback_slicedirs), slice)) else: @@ -86,7 +86,7 @@ def load_slice(slice): slicedirs = fallback_slicedirs else: slicedirs = ['-I' + slicedir] - + Ice.loadSlice('', slicedirs + [slice]) def dynload_slice(prx): @@ -139,7 +139,7 @@ def do_main_program(): initdata.properties = Ice.createProperties([], initdata.properties) for prop, val in cfg.iceraw: initdata.properties.setProperty(prop, val) - + initdata.properties.setProperty('Ice.ImplicitContext', 'Shared') initdata.properties.setProperty('Ice.Default.EncodingVersion', '1.0') initdata.logger = CustomLogger() @@ -147,80 +147,81 @@ def do_main_program(): ice = Ice.initialize(initdata) prxstr = 'Meta:tcp -h %s -p %d' % (cfg.ice.host, cfg.ice.port) prx = ice.stringToProxy(prxstr) - + if not cfg.ice.slice: dynload_slice(prx) else: fsload_slice(cfg.ice.slice) - + import Murmur - + class mumoIceApp(Ice.Application): def __init__(self, manager): Ice.Application.__init__(self) self.manager = manager - + def run(self, args): self.shutdownOnInterrupt() - + if not self.initializeIceConnection(): return 1 - + if cfg.ice.watchdog > 0: self.metaUptime = -1 self.checkConnection() - + # Serve till we are stopped self.communicator().waitForShutdown() self.watchdog.cancel() - + if self.interrupted(): warning('Caught interrupt, shutting down') - + return 0 - + def initializeIceConnection(self): """ Establishes the two-way Ice connection and adds MuMo to the configured servers """ ice = self.communicator() - + if cfg.ice.secret: debug('Using shared ice secret') ice.getImplicitContext().put("secret", cfg.ice.secret) else: warning('Consider using an ice secret to improve security') - + info('Connecting to Ice server (%s:%d)', cfg.ice.host, cfg.ice.port) base = ice.stringToProxy(prxstr) - self.meta =Murmur.MetaPrx.uncheckedCast(base) - + self.meta = Murmur.MetaPrx.uncheckedCast(base) + if cfg.ice.callback_port > 0: cbp = ' -p %d' % cfg.ice.callback_port else: cbp = '' - + adapter = ice.createObjectAdapterWithEndpoints('Callback.Client', 'tcp -h %s%s' % (cfg.ice.callback_host, cbp)) adapter.activate() self.adapter = adapter - + self.manager.setClientAdapter(adapter) + metacbprx = adapter.addWithUUID(metaCallback(self)) self.metacb = Murmur.MetaCallbackPrx.uncheckedCast(metacbprx) - + return self.attachCallbacks() - + def attachCallbacks(self): """ Attaches all callbacks """ - + # Ice.ConnectionRefusedException debug('Attaching callbacks') try: info('Attaching meta callback') self.meta.addCallback(self.metacb) - + for server in self.meta.getBootedServers(): sid = server.id() if not cfg.murmur.servers or sid in cfg.murmur.servers: @@ -228,7 +229,7 @@ def attachCallbacks(self): servercbprx = self.adapter.addWithUUID(serverCallback(self.manager, server, sid)) servercb = Murmur.ServerCallbackPrx.uncheckedCast(servercbprx) server.addCallback(servercb) - + except (Murmur.InvalidSecretException, Ice.UnknownUserException, Ice.ConnectionRefusedException), e: if isinstance(e, Ice.ConnectionRefusedException): error('Server refused connection') @@ -238,15 +239,15 @@ def attachCallbacks(self): else: # We do not actually want to handle this one, re-raise it raise e - + self.connected = False self.manager.announceDisconnected() return False - + self.connected = True self.manager.announceConnected(self.meta) return True - + def checkConnection(self): """ Tries to retrieve the server uptime to determine wheter the server is @@ -255,7 +256,7 @@ def checkConnection(self): #debug('Watchdog run') try: uptime = self.meta.getUptime() - if self.metaUptime > 0: + if self.metaUptime > 0: # Check if the server didn't restart since we last checked, we assume # since the last time we ran this check the watchdog interval +/- 5s # have passed. This should be replaced by implementing a Keepalive in @@ -263,17 +264,17 @@ def checkConnection(self): if not ((uptime - 5) <= (self.metaUptime + cfg.ice.watchdog) <= (uptime + 5)): # Seems like the server restarted, re-attach the callbacks self.attachCallbacks() - + self.metaUptime = uptime except Ice.Exception, e: error('Connection to server lost, will try to reestablish callbacks in next watchdog run (%ds)', cfg.ice.watchdog) debug(str(e)) self.attachCallbacks() - + # Renew the timer self.watchdog = Timer(cfg.ice.watchdog, self.checkConnection) self.watchdog.start() - + def checkSecret(func): """ Decorator that checks whether the server transmitted the right secret @@ -281,28 +282,28 @@ def checkSecret(func): """ if not cfg.ice.secret: return func - + def newfunc(*args, **kws): if 'current' in kws: current = kws["current"] else: current = args[-1] - + if not current or 'secret' not in current.ctx or current.ctx['secret'] != cfg.ice.secret: error('Server transmitted invalid secret. Possible injection attempt.') raise Murmur.InvalidSecretException() - + return func(*args, **kws) - + return newfunc - + def fortifyIceFu(retval=None, exceptions=(Ice.Exception,)): """ Decorator that catches exceptions,logs them and returns a safe retval value. This helps to prevent getting stuck in critical code paths. Only exceptions that are instances of classes given in the exceptions list are not caught. - + The default is to catch all non-Ice exceptions. """ def newdec(func): @@ -315,13 +316,13 @@ def newfunc(*args, **kws): if isinstance(e, ex): catch = False break - + if catch: critical('Unexpected exception caught') exception(e) return retval raise - + return newfunc return newdec @@ -329,7 +330,7 @@ class metaCallback(Murmur.MetaCallback): def __init__(self, app): Murmur.MetaCallback.__init__(self) self.app = app - + @fortifyIceFu() @checkSecret def started(self, server, current=None): @@ -344,20 +345,20 @@ def started(self, server, current=None): servercbprx = self.app.adapter.addWithUUID(serverCallback(self.app.manager, server, sid)) servercb = Murmur.ServerCallbackPrx.uncheckedCast(servercbprx) server.addCallback(servercb) - + # Apparently this server was restarted without us noticing except (Murmur.InvalidSecretException, Ice.UnknownUserException), e: if hasattr(e, "unknown") and e.unknown != "Murmur::InvalidSecretException": # Special handling for Murmur 1.2.2 servers with invalid slice files raise e - + error('Invalid ice secret') return else: debug('Virtual server %d got started', sid) - + self.app.manager.announceMeta(sid, "started", server, current) - + @fortifyIceFu() @checkSecret def stopped(self, server, current=None): @@ -378,10 +379,10 @@ def stopped(self, server, current=None): except Ice.ConnectionRefusedException: self.app.connected = False self.app.manager.announceDisconnected() - + debug('Server shutdown stopped a virtual server') - - + + def forwardServer(fu): def new_fu(self, *args, **kwargs): self.manager.announceServer(self.sid, fu.__name__, self.server, *args, **kwargs) @@ -393,12 +394,12 @@ def __init__(self, manager, server, sid): self.manager = manager self.sid = sid self.server = server - + # Hack to prevent every call to server.id() from the client callbacks # from having to go over Ice def id_replacement(): return self.sid - + server.id = id_replacement @checkSecret @@ -412,7 +413,7 @@ def userDisconnected(self, u, current=None): pass def userConnected(self, u, current=None): pass @checkSecret @forwardServer - def channelCreated(self, c, current=None): pass + def channelCreated(self, c, current=None): pass @checkSecret @forwardServer def channelRemoved(self, c, current=None): pass @@ -422,56 +423,56 @@ def channelStateChanged(self, c, current=None): pass @checkSecret @forwardServer def userTextMessage(self, u, m, current=None) : pass - - class contextCallback(Murmur.ServerContextCallback): - def __init__(self, manager, server, sid): + + class customContextCallback(Murmur.ServerContextCallback): + def __init__(self, contextActionCallback, *ctx): Murmur.ServerContextCallback.__init__(self) - self.manager = manager - self.server = server - self.sid = sid - + self.cb = contextActionCallback + self.ctx = ctx + @checkSecret - def contextAction(self, action, p, session, chanid, current=None): - self.manager.announceContext(self.sid, "contextAction", self.server, action, p, session, chanid, current) + def contextAction(self, *args, **argv): + # (action, user, target_session, target_chanid, current=None) + self.cb(*(self.ctx + args), **argv) # #--- Start of moderator # info('Starting mumble moderator') debug('Initializing manager') - manager = MumoManager(Murmur) + manager = MumoManager(Murmur, customContextCallback) manager.start() manager.loadModules() manager.startModules() - - debug("Initializing mumoIceApp") + + debug("Initializing mumoIceApp") app = mumoIceApp(manager) state = app.main(sys.argv[:1], initData=initdata) - + manager.stopModules() manager.stop() info('Shutdown complete') return state - + class CustomLogger(Ice.Logger): """ Logger implementation to pipe Ice log messages into our own log """ - + def __init__(self): Ice.Logger.__init__(self) self._log = getLogger('Ice') - + def _print(self, message): self._log.info(message) - + def trace(self, category, message): self._log.debug('Trace %s: %s', category, message) - + def warning(self, message): self._log.warning(message) - + def error(self, message): self._log.error(message) @@ -492,11 +493,11 @@ def error(self, message): parser.add_option('-a', '--app', action='store_true', dest='force_app', help='do not run as daemon', default=False) (option, args) = parser.parse_args() - + if option.force_daemon and option.force_app: parser.print_help() sys.exit(1) - + # Load configuration try: cfg = Config(option.ini, default) @@ -504,7 +505,7 @@ def error(self, message): print >> sys.stderr, 'Fatal error, could not load config file from "%s"' % cfgfile print >> sys.stderr, e sys.exit(1) - + # Initialise logger if cfg.log.file: try: @@ -515,19 +516,19 @@ def error(self, message): sys.exit(1) else: logfile = logging.sys.stderr - - + + if option.verbose: level = cfg.log.level else: level = logging.ERROR - + logging.basicConfig(level=level, format='%(asctime)s %(levelname)s %(name)s %(message)s', stream=logfile) - + # As the default try to run as daemon. Silently degrade to running as a normal application if this fails - # unless the user explicitly defined what he expected with the -a / -d parameter. + # unless the user explicitly defined what he expected with the -a / -d parameter. try: if option.force_app: raise ImportError # Pretend that we couldn't import the daemon lib diff --git a/mumo_manager.py b/mumo_manager.py index 61b2ac1..6a677d5 100644 --- a/mumo_manager.py +++ b/mumo_manager.py @@ -34,6 +34,7 @@ from config import Config import sys import os +import uuid class FailedLoadModuleException(Exception): pass @@ -54,7 +55,7 @@ def new_fu(*args, **kwargs): log = self.log() skwargs = ','.join(['%s=%s' % (karg,repr(arg)) for karg, arg in kwargs]) sargs = ','.join([str(arg) for arg in args[1:]]) + '' if not skwargs else (',' + str(skwargs)) - + call = "%s(%s)" % (fu.__name__, sargs) log.debug(call) res = fu(*args, **kwargs) @@ -63,7 +64,7 @@ def new_fu(*args, **kwargs): return new_fu if enable else fu return new_dec - + debug_me = True @@ -74,129 +75,201 @@ class MumoManagerRemote(object): can register/unregister to/from callbacks as well as do other signaling to the master MumoManager. """ - + SERVERS_ALL = [-1] ## Applies to all servers - + def __init__(self, master, name, queue): self.__master = master self.__name = name self.__queue = queue - + + self.__context_callbacks = {} # server -> action -> callback + def getQueue(self): return self.__queue - + def subscribeMetaCallbacks(self, handler, servers = SERVERS_ALL): """ Subscribe to meta callbacks. Subscribes the given handler to the following callbacks: - + >>> started(self, server, context = None) >>> stopped(self, server, context = None) - + @param servers: List of server IDs for which to subscribe. To subscribe to all servers pass SERVERS_ALL. - @param handler: Object on which to call the callback functions + @param handler: Object on which to call the callback functions """ return self.__master.subscribeMetaCallbacks(self.__queue, handler, servers) - + def unsubscribeMetaCallbacks(self, handler, servers = SERVERS_ALL): """ Unsubscribe from meta callbacks. Unsubscribes the given handler from callbacks for the given servers. - + @param servers: List of server IDs for which to unsubscribe. To unsubscribe from all servers pass SERVERS_ALL. @param handler: Subscribed handler """ return self.__master.unscubscribeMetaCallbacks(self.__queue, handler, servers) - + def subscribeServerCallbacks(self, handler, servers = SERVERS_ALL): """ Subscribe to server callbacks. Subscribes the given handler to the following callbacks: - - >>> userConnected(self, state, context = None) + + >>> userConnected(self, state, context = None) >>> userDisconnected(self, state, context = None) >>> userStateChanged(self, state, context = None) >>> channelCreated(self, state, context = None) >>> channelRemoved(self, state, context = None) >>> channelStateChanged(self, state, context = None) - + @param servers: List of server IDs for which to subscribe. To subscribe to all servers pass SERVERS_ALL. - @param handler: Object on which to call the callback functions + @param handler: Object on which to call the callback functions """ return self.__master.subscribeServerCallbacks(self.__queue, handler, servers) - + def unsubscribeServerCallbacks(self, handler, servers = SERVERS_ALL): """ Unsubscribe from server callbacks. Unsubscribes the given handler from callbacks for the given servers. - + @param servers: List of server IDs for which to unsubscribe. To unsubscribe from all servers pass SERVERS_ALL. @param handler: Subscribed handler """ return self.__master.unsubscribeServerCallbacks(self.__queue, handler, servers) - - def subscribeContextCallbacks(self, handler, servers = SERVERS_ALL): + + def getUniqueAction(self): """ - Subscribe to context callbacks. Subscribes the given handler to the following - callbacks: - - >>> contextAction(self, action, user, session, channelid, context = None) - - @param servers: List of server IDs for which to subscribe. To subscribe to all - servers pass SERVERS_ALL. - @param handler: Object on which to call the callback functions + Returns a unique action string that can be used in addContextMenuEntry. + + :return: Unique action string """ - return self.__master.subscribeContextCallbacks(self.__queue, handler, servers) - - def unsubscribeContextCallbacks(self, handler, servers = SERVERS_ALL): + return str(uuid.uuid4()) + + def addContextMenuEntry(self, server, user, action, text, handler, context): """ - Unsubscribe from context callbacks. Unsubscribes the given handler from callbacks - for the given servers. - - @param servers: List of server IDs for which to unsubscribe. To unsubscribe from all - servers pass SERVERS_ALL. - @param handler: Subscribed handler + Adds a new context callback menu entry with the given text for the given user. + + You can use the same action identifier for multiple users entries to + simplify your handling. However make sure an action identifier is unique + to your module. The easiest way to achieve this is to use getUniqueAction + to generate a guaranteed unique one. + + Your handler should be of form: + >>> handler(self, server, action, user, target) + + Here server is the server the user who triggered the action resides on. + Target identifies what the context action was invoked on. It can be either + a User, Channel or None. + + @param server: Server the user resides on + @param user: User to add entry for + @param action: Action identifier passed to your callback (see above) + @param text: Text for the menu entry + @param handler: Handler function to call when the menu item is used + @param context: Contexts to show entry in (can be a combination of ContextServer, ContextChannel and ContextUser) + """ + + server_actions = self.__context_callbacks.get(server.id()) + if not server_actions: + server_actions = {} + self.__context_callbacks[server.id()] = server_actions + + action_cb = server_actions.get(action) + if not action_cb: + # We need to create an register a new context callback + action_cb = self.__master.createContextCallback(self.__handle_context_callback, handler, server) + server_actions[action] = action_cb + + server.addContextCallback(user.session, action, text, action_cb, context) + + def __handle_context_callback(self, handler, server, action, user, target_session, target_channelid, current=None): + """ + Small callback wrapper for context menu operations. + + Translates the given target into the corresponding object and + schedules a call to the actual user context menu handler which + will be executed in the modules thread. """ - return self.__master.unsubscribeContextCallbacks(self.__queue, handler, servers) - + + if target_session != 0: + target = server.getState(target_session) + elif target_channelid != -1: + target = server.getChannelState(target_channelid) + else: + target = None + + # Schedule a call to the handler + self.__queue.put((None, handler, [server, action, user, target], {})) + + def removeContextMenuEntry(self, server, action): + """ + Removes a previously created context action callback from a server. + + Applies to all users that share the action on this server. + + @param server Server the action should be removed from. + @param action Action to remove + """ + + try: + cb = self.__context_callbacks[server.id()].pop(action) + except KeyError: + # Nothing to unregister + return + + server.removeContextCallback(cb) + def getMurmurModule(self): """ Returns the Murmur module generated from the slice file """ return self.__master.getMurmurModule() - + def getMeta(self): """ Returns the connected servers meta module or None if it is not available """ return self.__master.getMeta() - + class MumoManager(Worker): MAGIC_ALL = -1 cfg_default = {'modules':(('mod_dir', str, "modules/"), ('cfg_dir', str, "modules-enabled/"), ('timeout', int, 2))} - - def __init__(self, murmur, cfg = Config(default = cfg_default)): + + def __init__(self, murmur, context_callback_type, cfg = Config(default = cfg_default)): Worker.__init__(self, "MumoManager") self.queues = {} # {queue:module} self.modules = {} # {name:module} self.imports = {} # {name:import} self.cfg = cfg - + self.murmur = murmur self.meta = None - + self.client_adapter = None + self.metaCallbacks = {} # {sid:{queue:[handler]}} self.serverCallbacks = {} - self.contextCallbacks = {} - + + self.context_callback_type = context_callback_type + + def setClientAdapter(self, client_adapter): + """ + Sets the ice adapter used for client-side callbacks. This is needed + in case per-module callbacks have to be attached during run-time + as is the case for context callbacks. + + :param client_adapter: Ice object adapter + """ + self.client_adapter = client_adapter + def __add_to_dict(self, mdict, queue, handler, servers): for server in servers: if server in mdict: @@ -207,32 +280,32 @@ def __add_to_dict(self, mdict, queue, handler, servers): mdict[server][queue] = [handler] else: mdict[server] = {queue:[handler]} - + def __rem_from_dict(self, mdict, queue, handler, servers): for server in servers: try: mdict[server][queue].remove(handler) except KeyError, ValueError: pass - + def __announce_to_dict(self, mdict, server, function, *args, **kwargs): """ Call function on handlers for specific servers in one of our handler dictionaries. - + @param mdict Dictionary to announce to @param server Server to announce to, ALL is always implied @param function Function the handler should call @param args Arguments for the function @param kwargs Keyword arguments for the function """ - + # Announce to all handlers of the given serverlist if server == self.MAGIC_ALL: servers = mdict.iterkeys() else: servers = [self.MAGIC_ALL, server] - + for server in servers: try: for queue, handlers in mdict[server].iteritems(): @@ -241,7 +314,7 @@ def __announce_to_dict(self, mdict, server, function, *args, **kwargs): except KeyError: # No handler registered for that server pass - + def __call_remote(self, queue, handler, function, *args, **kwargs): try: func = getattr(handler, function) # Find out what to call on target @@ -257,11 +330,11 @@ def __call_remote(self, queue, handler, function, *args, **kwargs): else: self.log().exception(e) - + # #-- Module multiplexing functionality # - + @local_thread def announceConnected(self, meta = None): """ @@ -270,7 +343,7 @@ def announceConnected(self, meta = None): self.meta = meta for queue, module in self.queues.iteritems(): self.__call_remote(queue, module, "connected") - + @local_thread def announceDisconnected(self): """ @@ -283,41 +356,30 @@ def announceDisconnected(self): def announceMeta(self, server, function, *args, **kwargs): """ Call a function on the meta handlers - + @param server Server to announce to @param function Name of the function to call on the handler @param args List of arguments @param kwargs List of keyword arguments """ self.__announce_to_dict(self.metaCallbacks, server, function, *args, **kwargs) - + @local_thread def announceServer(self, server, function, *args, **kwargs): """ Call a function on the server handlers - - @param server Server to announce to - @param function Name of the function to call on the handler - @param args List of arguments - @param kwargs List of keyword arguments - """ - self.__announce_to_dict(self.serverCallbacks, server, function, *args, **kwargs) - - @local_thread - def announceContext(self, server, function, *args, **kwargs): - """ - Call a function on the context handlers - + @param server Server to announce to @param function Name of the function to call on the handler @param args List of arguments @param kwargs List of keyword arguments """ self.__announce_to_dict(self.serverCallbacks, server, function, *args, **kwargs) + # #--- Module self management functionality # - + @local_thread def subscribeMetaCallbacks(self, queue, handler, servers): """ @@ -325,7 +387,7 @@ def subscribeMetaCallbacks(self, queue, handler, servers): @see MumoManagerRemote """ return self.__add_to_dict(self.metaCallbacks, queue, handler, servers) - + @local_thread def unsubscribeMetaCallbacks(self, queue, handler, servers): """ @@ -333,7 +395,7 @@ def unsubscribeMetaCallbacks(self, queue, handler, servers): @see MumoManagerRemote """ return self.__rem_from_dict(self.metaCallbacks, queue, handler, servers) - + @local_thread def subscribeServerCallbacks(self, queue, handler, servers): """ @@ -341,7 +403,7 @@ def subscribeServerCallbacks(self, queue, handler, servers): @see MumoManagerRemote """ return self.__add_to_dict(self.serverCallbacks, queue, handler, servers) - + @local_thread def unsubscribeServerCallbacks(self, queue, handler, servers): """ @@ -349,39 +411,34 @@ def unsubscribeServerCallbacks(self, queue, handler, servers): @see MumoManagerRemote """ return self.__rem_from_dict(self.serverCallbacks, queue, handler, servers) - - @local_thread - def subscribeContextCallbacks(self, queue, handler, servers): - """ - @param queue Target worker queue - @see MumoManagerRemote - """ - - #TODO: Implement context callbacks - self.log().error("Context callbacks not implemented at this point") - - return self.__add_to_dict(self.contextCallbacks, queue, handler, servers) - - @local_thread - def unsubscribeContextCallbacks(self, queue, handler, servers): - """ - @param queue Target worker queue - @see MumoManagerRemote - """ - return self.__rem_from_dict(self.contextCallbacks, queue, handler, servers) def getMurmurModule(self): """ Returns the Murmur module generated from the slice file """ return self.murmur - + + def createContextCallback(self, callback, *ctx): + """ + Creates a new context callback handler class instance. + + @param callback Callback to set for handler + @param *ctx Additional context parameters passed to callback + before the actual parameters. + @return Murmur ServerContextCallbackPrx object for the context + callback handler class. + """ + contextcbprx = self.client_adapter.addWithUUID(self.context_callback_type(callback, *ctx)) + contextcb = self.murmur.ServerContextCallbackPrx.uncheckedCast(contextcbprx) + + return contextcb + def getMeta(self): """ Returns the connected servers meta module or None if it is not available """ return self.meta - + #--- Module load/start/stop/unload functionality # @local_thread_blocking @@ -389,39 +446,39 @@ def getMeta(self): def loadModules(self, names = None): """ Loads a list of modules from the mumo directory structure by name. - + @param names List of names of modules to load @return: List of modules loaded """ loadedmodules = {} - + if not names: # If no names are given load all modules that have a configuration in the cfg_dir if not os.path.isdir(self.cfg.modules.cfg_dir): msg = "Module configuration directory '%s' not found" % self.cfg.modules.cfg_dir self.log().error(msg) raise FailedLoadModuleImportException(msg) - + names = [] for f in os.listdir(self.cfg.modules.cfg_dir): if os.path.isfile(self.cfg.modules.cfg_dir + f): base, ext = os.path.splitext(f) if not ext or ext.lower() == ".ini" or ext.lower() == ".conf": names.append(base) - + for name in names: try: modinst = self._loadModule_noblock(name) loadedmodules[name] = modinst except FailedLoadModuleException: pass - + return loadedmodules - + @local_thread_blocking def loadModuleCls(self, name, modcls, module_cfg = None): return self._loadModuleCls_noblock(name, modcls, module_cfg) - + @debug_log(debug_me) def _loadModuleCls_noblock(self, name, modcls, module_cfg = None): log = self.log() @@ -429,10 +486,10 @@ def _loadModuleCls_noblock(self, name, modcls, module_cfg = None): if name in self.modules: log.error("Module '%s' already loaded", name) return - + modqueue = Queue.Queue() modmanager = MumoManagerRemote(self, name, modqueue) - + try: modinst = modcls(name, modmanager, module_cfg) except Exception, e: @@ -440,40 +497,40 @@ def _loadModuleCls_noblock(self, name, modcls, module_cfg = None): log.error(msg) log.exception(e) raise FailedLoadModuleInitializationException(msg) - + # Remember it self.modules[name] = modinst self.queues[modqueue] = modinst - + return modinst - + @local_thread_blocking def loadModule(self, name): """ Loads a single module either by name - + @param name Name of the module to load @return Module instance """ self._loadModule_noblock(name) - + @debug_log(debug_me) - def _loadModule_noblock(self, name): + def _loadModule_noblock(self, name): # Make sure this module is not already loaded log = self.log() log.debug("loadModuleByName('%s')", name) - + if name in self.modules: log.warning("Tried to load already loaded module %s", name) return - + # Check whether there is a configuration file for this module confpath = self.cfg.modules.cfg_dir + name + '.ini' if not os.path.isfile(confpath): msg = "Module configuration file '%s' not found" % confpath log.error(msg) raise FailedLoadModuleConfigException(msg) - + # Make sure the module directory is in our python path and exists if not self.cfg.modules.mod_dir in sys.path: if not os.path.isdir(self.cfg.modules.mod_dir): @@ -481,7 +538,7 @@ def _loadModule_noblock(self, name): log.error(msg) raise FailedLoadModuleImportException(msg) sys.path.insert(0, self.cfg.modules.mod_dir) - + # Import the module and instanciate it try: mod = __import__(name) @@ -490,7 +547,7 @@ def _loadModule_noblock(self, name): msg = "Failed to import module '%s', reason: %s" % (name, str(e)) log.error(msg) raise FailedLoadModuleImportException(msg) - + try: try: modcls = mod.mumo_module_class # First check if there's a magic mumo_module_class variable @@ -503,23 +560,23 @@ def _loadModule_noblock(self, name): raise FailedLoadModuleInitializationException(msg) return self._loadModuleCls_noblock(name, modcls, confpath) - + @local_thread_blocking @debug_log(debug_me) def startModules(self, names = None): """ Start a module by name - + @param names List of names of modules to start @return A dict of started module names and instances """ log = self.log() startedmodules = {} - + if not names: # If no names are given start all models names = self.modules.iterkeys() - + for name in names: try: modinst = self.modules[name] @@ -531,9 +588,9 @@ def startModules(self, names = None): startedmodules[name] = modinst except KeyError: log.error("Could not start unknown module '%s'", name) - + return startedmodules - + @local_thread_blocking @debug_log(debug_me) def stopModules(self, names = None, force = False): @@ -541,18 +598,18 @@ def stopModules(self, names = None, force = False): Stop a list of modules by name. Note that this only works for well behaved modules. At this point if a module is really going rampant you will have to restart mumo. - + @param names List of names of modules to unload @param force Unload the module asap dropping messages queued for it @return A dict of stopped module names and instances """ log = self.log() stoppedmodules = {} - + if not names: # If no names are given start all models names = self.modules.iterkeys() - + for name in names: try: modinst = self.modules[name] @@ -560,7 +617,7 @@ def stopModules(self, names = None, force = False): except KeyError: log.warning("Asked to stop unknown module '%s'", name) continue - + if force: # We will have to drain the modules queues for queue, module in self.queues.iteritems(): @@ -575,13 +632,17 @@ def stopModules(self, names = None, force = False): log.debug("Module '%s' is being stopped", name) else: log.debug("Module '%s' already stopped", name) - + for modinst in stoppedmodules.itervalues(): modinst.join(timeout = self.cfg.modules.timeout) - + return stoppedmodules - + def stop(self, force = True): + """ + Stops all modules and shuts down the manager. + """ self.log().debug("Stopping") self.stopModules() Worker.stop(self, force) +