Scheduler - runs things concurrentlyThe Scheduler runs active microprocesses - giving a regular timeslice to each. It also provides the ability to pause and wake them; allowing an Axon based system to play nicely and relinquish the cpu when idle.
Using the schedulerThe simplest way is to just use the default scheduler scheduler.run. Simply activate components or microprocesses then call the runThreads() method of the scheduler: from Axon.Scheduler import scheduler from MyComponents import MyComponent, AnotherComponent c1 = MyComponent().activate() c2 = MyComponent().activate() c3 = AnotherComponent().activate() scheduler.run.runThreads() Alternatively you can create a specific scheduler instance, and activate them using that specific scheduler: mySched = scheduler() c1 = MyComponent().activate(Scheduler=mySched) c2 = MyComponent().activate(Scheduler=mySched) c3 = AnotherComponent().activate(Scheduler=mySched) mySched.runThreads() The runThreads() method is the way of bootstrapping the scheduler. Being a microprocess, it needs something to schedule it! The runThreads() method does exactly that. The activate() method is fully thread-safe. It can handle multiple simultaneous callers from different threads to the one the scheduler is running in. Pausing and Waking microprocessesThe Scheduler supports the ability to, in a thread safe manner, pause and wake individual microprocesses under its control. Because it is thread safe, any thread of execution can issue pause and wake requests for any scheduled microprocess. The pauseThread() and wakeThread() methods submit requests to pause or wake microprocesses. The scheduler will process these when it is next able to - the requests are queued rather than processed immediately. This is done to ensure thread safety. It can handle multiple simultaneous callers from different threads to the one the scheduler is running in. Pausing a microprocess means the scheduler removes it from its 'run queue'. This means that it no longer executes that microprocess. Waking it puts it back into the 'run queue'. If no microprocesses are awake then the scheduler relinquishes cpu usage by blocking. If however this scheduler is itself being scheduled by another microprocess then it does not block. Ideally it should ask its scheduler to pause it, but instead it busy-waits - self pausing functionality is not yet implemented. 'yielding' new components for activation and replacement generatorsIn general, the main() generator in a microprocess (its thread of execution) can return any values it likes when it uses the yield statement. It is recommended to not yield zeros or other kinds of 'false' value as these are reserved for possible future special meaning. However, this scheduler does understand certain values that can be yielded:
What happens when a microprocess finishes?The scheduler will stop running it! It will call the microprocess's stop() method. It will also call the _closeDownMicroprocess() method and will act on the return value if it is one of the following:
Querying the scheduler (Introspection)The listAllThreads() method returns a list of all activated microprocesses - both paused and awake. The isThreadPaused() method lets you determine if an individual microprocess is paused. Note that the result returned by this method is conservative (the default assumption is that a thread is probably awake). the result will vary depending on the exact moment it is called! Both these methods are thread safe. Slowing down execution (for debugging)It also has a slow motion mode designed to help with debugging & testing. Call runThreads() with the slowmo argument set to the number of seconds the scheduler should pause after each cycle of executing all microprocesses. For example, to wait half a second after each cycle of execution: scheduler.run.runThreads(slowmo=0.5) How does it work internally?The scheduler keeps the following internal state:
The scheduler uses a simple round robin approach - it walks through its run queue and calls the next() method of each microprocess in turn. As it goes, it builds a new run queue, ready for the next cycle. If a microprocess terminates (raises a StopIteration exception) then it is not included in the next cycle's run queue. After it has gone through all microprocesses, the scheduler then processes messages in its wakeRequests and sleepRequests queues. Sleep requests are processed first; then wake requests second. Suppose there is a sleep and wake request queued for the same microprocess; should it be left awake or put to sleep? By processing wake requests last, the scheduler can err on the side of caution and prefer to leave it awake. Microprocesses are all in one of three possible states (recorded in the threads dictionary):
A request to put a microprocess to sleep is handled as follows:
Wake requests are used to both wake up sleeping microprocesses and also to activate new ones. A request to wake a microprocess is handled like this:
If the request contains a flag indicating that this is actually an activation request, then this also happens:
This three state system is a performance optimisation: it means that the scheduler does not need to waste time searching through the next run queue to remove items - they simply get removed on the next cycle of execution. Wake requests and sleep requests are handled through thread-safe queues. This enables other threads of execution (eg. threaded components) to safely make requests to wake or pause components. Test documentationTests passed:
Axon.Scheduler._sortAxon.Scheduler.schedulerclass scheduler(Axon.Microprocess.microprocess)
Scheduler - runs microthreads of control.
Methods defined here__init__(self, **argd)Creates a scheduler object. If scheduler.run has not been set, sets it. Class initialisation ensures that this object/class attribute is initialised - client modules always have access to a standalone scheduler. Internal attributes:
Whilst there can be more than one scheduler active in the general case you will NOT want to create a custom scheduler. _addThread(self, mprocess)A Microprocess adds itself to the runqueue using this method, using the mannerism scheduler.run._addThread(). Generally component writers should not use this method to activate a component - use the component's own activate() method instead. isThreadPaused(self, mprocess)Returns True if the specified microprocess is sleeping, or the scheduler does not know about it. listAllThreads(self)Returns a list of all microprocesses (both active and sleeping) main(self[, slowmo][, canblock])main([slowmo][,canblock]) - Scheduler main loop generator Each cycle through this generator does two things: * one pass through all active microprocesses, giving executing them. * processing of wake/sleep requests You can optionally slow down execution to aid debugging. You can also allow the scheduler to block if there are no active, awake microprocesses. Keyword arguments:
slowmo specifies a delay (in seconds) before the main loop is run. slowmo defaults to 0. If canblock is True, this generator will briefly) block if there are no active microprocesses, otherwise it will return immediately (default). This generator terminates when there are no microprocesses left (either sleeping or awake) because they've all terminated. (or because there were none to begin with!) pauseThread(self, mprocess)pauseThread(mprocess) - request to put a mprocess to sleep. If active, or already sleeping, the specified microprocess will be put to leep on the next cycle through the scheduler. runThreads(self[, slowmo])Runs the scheduler until there are no activated microprocesses left (they've all terminated). Think of this as bootstrapping the scheduler - after all it is a microprocess like any other, so needs something to run it! Keyword arguments:
wakeThread(self, mprocess[, canActivate])Request to wake a sleeping mprocess, or activate a new one. If sleeping or already active, the specified microprocess will be ensured to be active on the next cycle through the scheduler. If the microprocess is not running yet then it will be woken if (and only if) canActivate is set to True (the default is False). Methods inherited from Axon.Microprocess.microprocess :
FeedbackGot a problem with the documentation? Something unclear that could be clearer? Want to help improve it? Constructive criticism is very welcome - especially if you can suggest a better rewording! Please leave you feedback here in reply to the documentation thread in the Kamaelia blog. -- Automatic documentation generator, 01 Feb 2010 at 04:00:28 UTC/GMT |
Kamaelia
is an open source project originated from and guided by BBC
Research. For more information browse the site or get in
contact.
This is an ongoing community based development site. As a result the contents of this page is the opinions of the contributors of the pages involved not the organisations involved. Specificially, this page may contain personal views which are not the views of the BBC. (the site is powered by a wiki engine)
(C) Copyright 2008 Kamaelia Contributors, including the British Broadcasting Corporation, All Rights Reserved