Startup and Background Tasks in Chandler

Running Code at Application Startup

The osaf.startup module provides an API for invoking repository items when the application has been started. For our examples, we'll be using a null repository view, rv:

>>> from repository.persistence.RepositoryView import NullRepositoryView
>>> rv = NullRepositoryView()

In order to run code at application startup, you simply create items of type osaf.startup.Startup in your parcel. Each item's invoke attribute names a function (or non-Item class) that should be invoked at startup. For our example, we'll use sys.stdout.write as the function we want to call:

>>> from osaf.startup import Startup
>>> test = Startup("test", invoke="sys.stdout.write", view = rv)

When run_startup() is called, the function or class named by the invoke attribute of each Startup item will be called with the corresponding item as its only parameter. In our example, this will cause the test object to be written to sys.stdout:

>>> from osaf.startup import run_startup
>>> run_startup(rv)
<Startup (new): test ...>
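
The invoke attribute is just a dotted name, so something has to turn that name into a callable before it can be invoked. The following is a minimal sketch of one way to do such a lookup in plain Python. Note that resolve_dotted_name is a hypothetical helper written for this illustration, and is not claimed to be the routine osaf.startup actually uses:

```python
import importlib
import os.path


def resolve_dotted_name(name):
    """Return the object named by a dotted string such as "sys.stdout.write".

    Hypothetical helper: tries the longest importable module prefix first,
    then walks the remaining parts as attributes.
    """
    parts = name.split(".")
    for i in range(len(parts), 0, -1):
        module_name = ".".join(parts[:i])
        try:
            obj = importlib.import_module(module_name)
        except ImportError:
            continue  # not a module; try a shorter prefix
        for attr in parts[i:]:
            obj = getattr(obj, attr)
        return obj
    raise ImportError("cannot resolve %r" % (name,))


# "os.path" is importable, so "join" is looked up as an attribute of it.
join = resolve_dotted_name("os.path.join")
```

Because the sketch treats every ImportError as "try a shorter module path", an import failure inside a real module would be reported as an unresolvable name; a production version would want to distinguish the two cases.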

Please note that startup invocation is intended for tasks like adding servers to the Twisted reactor, starting threads, or other in-process Python operations, based on data in the repository. It should not be used to modify repository items, as this may indirectly lead to merge conflicts with other threads. Any needed repository items should be created or modified via your parcel's installParcel() hook, rather than via startup items.

Disabling Startups

Individual startups can be enabled or disabled using their active boolean attribute:

>>> test.active = False
>>> run_startup(rv)     # Item is disabled, so nothing happens

Startup Subclasses

You can create subclasses of Startup, if your startup code needs stored configuration of some kind. Typically, you will override the onStart() method of Startup in your subclass so that it performs the desired behavior using attributes of the item. For example:

>>> from application import schema

>>> class StartupMessage(Startup):
...     message = schema.One(schema.Text, initialValue=u"Hello, world!")
...     def onStart(self):
...         print self.message.encode('utf8')

>>> hello = StartupMessage(view=rv)
>>> run_startup(rv)
Hello, world!

Notice that we did not invoke the superclass onStart() method, because we don't want the invoke attribute to take effect.

Ordered Startups

By default, startup items start in no particular order. Sometimes, though, a startup item needs another item to be started first. The requires attribute can be used to reference other startup items that should be started first:

>>> goodbye = StartupMessage(
...     message=u"Goodbye, world!",
...     requires=[hello],
...     view=rv
... )

>>> run_startup(rv)
Hello, world!
Goodbye, world!

>>> goodbye.requires = []
>>> hello.requires = [goodbye]

>>> run_startup(rv)
Goodbye, world!
Hello, world!

Note that a startup will not run unless all of its required items have been started:

>>> goodbye.active = False
>>> run_startup(rv)     # hello needs goodbye, so neither runs here

Note that this also means that creating a requirements loop (where A requires B and B requires A) will prevent all items in the loop from starting:

>>> goodbye.active = True
>>> goodbye.requires = [hello]
>>> run_startup(rv)     # co-dependents won't be started
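
One way to picture these rules is a loop that keeps starting any active item whose requirements have all been started, and that gives up once a full pass makes no progress. The sketch below is plain Python and is not the code osaf.startup uses; start_order and its dictionary-of-tuples input are assumptions made purely for the illustration:

```python
def start_order(items):
    """Return item names in the order they would be started.

    `items` maps a name to (active, [names of required items]).
    Hypothetical model of the rules described above, not Chandler code.
    """
    started = []
    progress = True
    while progress:
        progress = False
        for name in sorted(items):
            active, requires = items[name]
            if name in started or not active:
                continue
            # Start only when every required item has already started.
            if all(req in started for req in requires):
                started.append(name)
                progress = True
    return started


ordered = start_order({"hello": (True, []), "goodbye": (True, ["hello"])})
blocked = start_order({"hello": (True, ["goodbye"]), "goodbye": (False, [])})
looped = start_order({"hello": (True, ["goodbye"]), "goodbye": (True, ["hello"])})
```

In this model, an inactive requirement and a requirements loop fail in the same way: no pass can ever start the items involved, so they are simply left out of the result.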

Threads

To run your startup code in a separate thread, you can create an item of type Thread instead of Startup. The object named by the Thread item's invoke attribute will be called in a new thread with its own repository view, but in all other respects a Thread is the same as a Startup.

Here's an example routine that we might run in a thread. It waits for a start flag to be set, then sets a finished flag and exits:

>>> start = False
>>> finished = False
>>> def my_thread(thread_item):
...     global start, finished
...     while not start: pass
...     finished = True

Normally, if you wanted to run this code in a thread at startup, you would just put it in a module, and then use its name (e.g. some_module.my_thread) as the invoke attribute of a Thread item. But for demonstration purposes, we don't want to have to put this code in a separate module, so we'll import an already existing module, and shove this routine into its namespace temporarily, so that our Thread will be able to find it:

>>> from osaf.tests import TestStartups
>>> TestStartups.my_thread = my_thread

And now we can make a Thread that will run it:

>>> from osaf.startup import Thread
>>> t = Thread(
...     "my_name_here", invoke="osaf.tests.TestStartups.my_thread", view=rv
... )
>>> t
<Thread (new): my_name_here ...>

>>> run_startup(rv)

So now, the thread has been started:

>>> import threading
>>> threading.enumerate()
[...<RepositoryThread(//userdata/my_name_here, started daemon)>...]

but it's looping, waiting for the start flag to be set:

>>> finished
False

So, let's set the start flag, and wait a moment to allow the thread to finish:

>>> start = True
>>> from time import sleep
>>> sleep(0.1)    # let the thread finish

And now the finished flag should have been set by the thread:

>>> finished
True

(Note: the sleep() duration above is 0.1 seconds because Windows machines have a fairly low resolution timer and may not actually perform the sleep() otherwise, which could break the test.)
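
In your own thread code, you will usually want to avoid both the busy loop and the timing-dependent sleep(). Here is a minimal sketch of the same handshake using the standard library's threading.Event. A plain threading.Thread stands in for a Chandler Thread item (an assumption made so that the example runs outside Chandler), and the item argument is passed as None:

```python
import threading

start = threading.Event()
finished = threading.Event()


def my_thread(thread_item):
    # Block without spinning until the main thread gives the go-ahead.
    start.wait()
    finished.set()


# Stand-in for a Thread item: the item argument is unused here.
worker = threading.Thread(target=my_thread, args=(None,))
worker.daemon = True
worker.start()

start.set()
# Wait for the worker to exit instead of sleeping for a fixed time.
worker.join(5.0)
```

Joining the worker (with a timeout as a safety net) removes the dependence on timer resolution that the sleep() call above has to work around.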

>>> t.active = False    # don't use this thread in the subsequent tests

Starting Twisted

Chandler needs to run Twisted's reactor in a thread, so that it can run independently of the GUI (which could otherwise block its execution). The osaf.startup module provides a few APIs for this:

>>> from osaf.startup import get_reactor_thread, run_reactor, stop_reactor

get_reactor_thread() returns the threading.Thread object that the reactor is running in (assuming it was started using ReactorThread), or None:

>>> print get_reactor_thread()
None

stop_reactor() stops the reactor if it's running (regardless of what thread it's in) and, if there is a reactor thread, waits for it to exit. In other words, it guarantees that reactor.running is False and get_reactor_thread() is None when it returns:

>>> stop_reactor()

run_reactor(in_thread) calls Twisted's reactor.run(), with some additional wrapper code that ensures it can run safely, that only one reactor runs at a time, and that it runs in a separate thread (by default):

>>> run_reactor()
>>> get_reactor_thread()
<RepositoryThread(reactor, started daemon)>

And then we can verify that the reactor is in fact running when run_reactor returns:

>>> from twisted.internet import reactor
>>> reactor.running
1

And then stop it, verifying that both the reactor and thread have stopped:

>>> rt = get_reactor_thread()
>>> stop_reactor()
>>> print get_reactor_thread()
None
>>> rt.isAlive()
False
>>> reactor.running
0

Calling run_reactor() when the reactor is already running in another thread has no effect:

>>> run_reactor()
>>> rt = get_reactor_thread()

>>> run_reactor()
>>> rt is get_reactor_thread()
True

But trying to run the reactor in the current thread (by passing a False in_thread argument) when it's already running in some thread raises an error:

>>> run_reactor(False)
Traceback (most recent call last):
...
AssertionError: Reactor is already running

>>> stop_reactor()  # make sure we've stopped again for next test
>>> reactor.running
0

So, running the Twisted reactor at startup is as simple as defining a normal Startup instance to invoke run_reactor():

>>> twisted_startup = Startup(
...     "twisted", view=rv, invoke="osaf.startup.run_reactor"
... )
>>> run_startup(rv)
>>> reactor.running
1
>>> stop_reactor()
>>> twisted_startup.active = False

Note that it doesn't matter how many Startup instances start the reactor, as it will only be started once, by the first one that tries. If you have startup code that depends on the reactor, therefore, you can either define a Startup for run_reactor and reference it in another Startup, or you can just call run_reactor() in the code that needs it.

Also note that you should not invoke run_reactor() from any thread but the main thread, or you will receive an assertion error in that thread, and the reactor will not start:

>>> twisted_thread = Thread(
...     "dont_do_this", view=rv, invoke="osaf.startup.run_reactor"
... )

>>> import sys, StringIO    # trap stderr output
>>> old_stderr, sys.stderr = sys.stderr, StringIO.StringIO()

>>> run_startup(rv)
>>> reactor.running
0

>>> sleep(0.5)  # make sure exception has been sent to stderr
>>> stop_reactor()
>>> len(threading.enumerate())  # all other threads should be stopped now
1

>>> print sys.stderr.getvalue()
Exception in thread //userdata/dont_do_this:
Traceback (most recent call last):
  ...
AssertionError: can't start reactor thread except from the main thread...

>>> sys.stderr = old_stderr
>>> twisted_thread.active = False

Background Tasks using Twisted and Threads

Twisted Tasks

A TwistedTask is similar to a Thread, in that it begins an independent task with its own repository view. It is different in that all TwistedTask instances run in the same thread: the Twisted "reactor" thread. These tasks are co-operatively multitasked, by receiving callbacks from the Twisted reactor object. If you are comfortable with the Twisted API, or at least want to make use of it, you may find some advantage here over using threads.

As with Startup and Thread, a TwistedTask instance invokes the function or class named by its invoke attribute. The only difference is that the invocation occurs in the Twisted "reactor" thread, after the reactor has been started. Your startup code (function, or class __init__) should accept the TwistedTask item as its sole argument, and it should do whatever reactor setup it then needs to do, such as registering listening ports or scheduling timed callbacks.

For our example, we'll create a task that just prints the thread it was called from:

>>> def hello_from(item):
...     print "running",item,"in",threading.currentThread()

and put it in a module so we can import it (again, we're doing this so you don't have to go read another file to see the source; in the "real world" you would just define the function in the appropriate module to start with):

>>> TestStartups.hello_from = hello_from

Now let's create the task item and run it:

>>> from osaf.startup import TwistedTask
>>> demo = TwistedTask(
...     "demo", view=rv, invoke="osaf.tests.TestStartups.hello_from"
... )

Giving it a chance to run and complete:

>>> run_startup(rv); sleep(0.1)     # give it a chance to run
running <TwistedTask ... demo ...> in <RepositoryThread(reactor,...)>

>>> stop_reactor()  # shut down the reactor again
>>> len(threading.enumerate())  # all other threads should be stopped now
1

>>> demo.active = False     # disable for next test(s)

Running Repetitive Tasks Periodically

Sometimes you don't need the full power of the Twisted API, or don't want to take the time to learn it for a quick hack. Or, even if you're familiar with the Twisted API, you may just want a convenient way to run some code every N seconds or minutes. In either case, you can use a PeriodicTask item.

A PeriodicTask is similar to a TwistedTask, but its invoke target must be a class or factory function that returns an object with a run() method. The class or factory will be invoked in the reactor thread at startup, and the run() method will be called in its own thread repeatedly as long as it returns True from each call. If it returns False, None, or any other false value (or raises an uncaught exception) the repeated calls will cease. The interval between calls is determined by the PeriodicTask item's interval attribute, which must be set to a datetime.timedelta object.

For our example, we'll create a simple class suitable for use as a periodic task:

>>> class Counter:
...     def __init__(self, item):
...         self.item = item
...         self.count = 0
...         print "__init__",
...         hello_from(item)        # show what thread we were called from
...
...     def run(self):
...         self.count += 1
...         print "run() call", self.count,
...         hello_from(self.item)   # show what thread we were called from
...         return self.count<5     # stop when count reaches 5

Once again, we'll insert this class into the TestStartups module so you don't have to go there to read the source (in real code, you would simply set invoke to the dotted name of the module where your class or function is defined):

>>> TestStartups.Counter = Counter

And now we'll create a PeriodicTask to create the counter and call it once per microsecond (so the test runs quickly!):

>>> from datetime import timedelta
>>> from osaf.startup import PeriodicTask

>>> counter = PeriodicTask(
...     "counter", view=rv, invoke="osaf.tests.TestStartups.Counter",
...     interval = timedelta(microseconds=1)
... )

>>> run_startup(rv); sleep(0.1)    # wait for it to run all 5 times
__init__ running ... in <RepositoryThread(reactor,...)>
run() call 1 running ... in <RepositoryThread(RepositoryPoolThread...)>
run() call 2 running ... in <RepositoryThread(RepositoryPoolThread...)>
run() call 3 running ... in <RepositoryThread(RepositoryPoolThread...)>
run() call 4 running ... in <RepositoryThread(RepositoryPoolThread...)>
run() call 5 running ... in <RepositoryThread(RepositoryPoolThread...)>

Notice that the __init__ method is called at startup time in the reactor thread, but subsequent run() calls occur in various "pool" threads managed by Twisted. It is not guaranteed that your run() method will execute in the same thread from one call to the next, so you should not rely on it being so. (We aren't showing the thread IDs in the example above, because they're not only different from one call to the next, they're also different from one test run to the next, so the test would fail if we included that info.)
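
The "keep calling run() while it returns a true value" contract can be sketched in a few lines of plain Python. This is only an illustration of the contract, under the assumption that everything runs synchronously in the calling thread. The real PeriodicTask schedules run() through the Twisted reactor and its pool threads, and run_periodically and CountTo are hypothetical names:

```python
import time
from datetime import timedelta


class CountTo(object):
    """Task object whose run() asks to be called again until `limit`."""

    def __init__(self, limit):
        self.limit = limit
        self.count = 0

    def run(self):
        self.count += 1
        return self.count < self.limit


def run_periodically(task, interval, sleep=time.sleep):
    """Call task.run() once per `interval`; return how many calls were made.

    Stops when run() returns a false value or raises an exception.
    """
    seconds = interval.total_seconds()
    calls = 0
    while True:
        sleep(seconds)
        calls += 1
        try:
            keep_going = task.run()
        except Exception:
            break
        if not keep_going:
            break
    return calls


calls = run_periodically(CountTo(5), timedelta(microseconds=1))
```

As in the Counter example, the fifth call returns a false value, so the loop stops after five calls.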

Sometimes, you may want your task to run at startup, as well as after the first delay interval. You can set the run_at_startup flag to enable this:

>>> counter.run_at_startup = True

To demonstrate, we'll set the delay interval to a year:

>>> counter.interval = timedelta(days=365)
>>> run_startup(rv); sleep(0.1)
__init__ running ... in <RepositoryThread(reactor,...)>
run() call 1 running ... in <RepositoryThread(RepositoryPoolThread...)>

Notice that call 1 ran, even though there's a year between runs. However, if we turn the flag back off, it doesn't get a run() call at startup:

>>> counter.run_at_startup = False
>>> run_startup(rv); sleep(0.1)
__init__ running ... in <RepositoryThread(reactor,...)>

Finally, let's clean up after our PeriodicTask example:

>>> stop_reactor()
>>> len(threading.enumerate())  # all other threads should be stopped now
1

>>> counter.active = False      # disable for subsequent tests

Using the Repository

When running background tasks that use the repository, each logical task or individual thread needs its own repository view, so that it doesn't become confused by changes being made by other threads or tasks at the same time. TwistedTask, PeriodicTask, and Thread items handle this for you automatically, by calling the fork_item() API on an item. fork_item() opens a new repository view and returns a copy of the item that is native to the new view.

To demonstrate, let's set up a "real" (in-memory) repository, since null repository views can't be forked:

>>> from repository.persistence.DBRepository import DBRepository
>>> import os, repository
>>> rep = DBRepository('__nosuchfile__')
>>> rep.create(ramdb=True, refcounted=True)

First, we'll create an item we want to fork:

>>> anItem = schema.Item("Demo", rep.view)
>>> anItem
<Item (new): Demo ...>

Then, we'll commit the repository, because otherwise our item won't be visible in the new view:

>>> rep.view.commit()

And now we can fork the item:

>>> from osaf.startup import fork_item
>>> newItem = fork_item(anItem)
>>> newItem
<Item: Demo ...>
>>> newItem.itsUUID == anItem.itsUUID
True
>>> newItem.itsView is anItem.itsView
False
>>> newItem.itsView.repository is anItem.itsView.repository
True

Note that you should close a view when you are done with it, to avoid possible memory or resource leaks:

>>> newItem.itsView.closeView()
>>> anItem.itsView.closeView()

For this test document, we'll also close the repository and our null repository view:

>>> rep.close()
>>> rv.closeView()