LongRunningTasks

Introduction

The following was posted to the comp.lang.python newsgroup on 24-May-2001. Since it explains very well the options for keeping the GUI responsive when the application has to do long running tasks I decided to copy it here verbatim. Many thanks to David Bolen for taking the time and effort to compose and post this message.

--Robin

 

The Question

Daniel Frame writes:

  • Basically, I've constructed a GUI with wxPython which runs a very long function when a 'START' button is pressed. Of course, the GUI is 'locked up' until this function completes itself. I was wanting to add a 'STOP' button to my application which would halt this other function if the user got tired of waiting for it to complete. Is it possible to accomplish this without resorting to using threads? If so, How can I have my running function check occasionally to see if the stop button was pressed? I've heard of wxYield, but I can't seem to implement it properly.

 

David's Response

There's really three possibilities I would think of - threading, wxYield, or chunking up your processing in a wxEVT_IDLE handler. I've given a small sample of each below.

I personally like threads for this sort of thing (I think they leave the UI the most responsive), but there's no hard and fast rule. So whichever you're more comfortable with.

Threading really isn't that hard - you can just start up a thread to do your processing, and have it send an event to your main GUI thread when it has completed. While it is working, it can check an event object or some other flag variable to indicate that it should give up and stop.

For a simple sort of example, here's a small bit of code that presents a simple (really dumb visually) frame, and uses a worker thread to simulate some processing (that takes 10s, resulting in a value of 10), while permitting it to be aborted. This could be extrapolated to doing any sort of lengthy processing and returning any sort of result. Intermediate events could be generated during the processing to give some indication to the main GUI thread of how things were proceeding (perhaps updating something like a gauge or some other visual indicator).

 

import time
from threading import *
import wx

# Button definitions
ID_START = wx.NewId()
ID_STOP = wx.NewId()

# Define notification event for thread completion
EVT_RESULT_ID = wx.NewId()

def EVT_RESULT(win, func):
    """Define Result Event."""
    win.Connect(-1, -1, EVT_RESULT_ID, func)

class ResultEvent(wx.PyEvent):
    """Simple event to carry arbitrary result data."""
    def __init__(self, data):
        """Init Result Event."""
        wx.PyEvent.__init__(self)
        self.SetEventType(EVT_RESULT_ID)
        self.data = data

# Thread class that executes processing
class WorkerThread(Thread):
    """Worker Thread Class."""
    def __init__(self, notify_window):
        """Init Worker Thread Class."""
        Thread.__init__(self)
        self._notify_window = notify_window
        self._want_abort = 0
        # This starts the thread running on creation, but you could
        # also make the GUI thread responsible for calling this
        self.start()

    def run(self):
        """Run Worker Thread."""
        # This is the code executing in the new thread. Simulation of
        # a long process (well, 10s here) as a simple loop - you will
        # need to structure your processing so that you periodically
        # peek at the abort variable
        for i in range(10):
            time.sleep(1)
            if self._want_abort:
                # Use a result of None to acknowledge the abort (of
                # course you can use whatever you'd like or even
                # a separate event type)
                wx.PostEvent(self._notify_window, ResultEvent(None))
                return
        # Here's where the result would be returned (this is an
        # example fixed result of the number 10, but it could be
        # any Python object)
        wx.PostEvent(self._notify_window, ResultEvent(10))

    def abort(self):
        """abort worker thread."""
        # Method for use by main thread to signal an abort
        self._want_abort = 1

# GUI Frame class that spins off the worker thread
class MainFrame(wx.Frame):
    """Class MainFrame."""
    def __init__(self, parent, id):
        """Create the MainFrame."""
        wx.Frame.__init__(self, parent, id, 'Thread Test')

        # Dumb sample frame with two buttons
        wx.Button(self, ID_START, 'Start', pos=(0,0))
        wx.Button(self, ID_STOP, 'Stop', pos=(0,50))
        self.status = wx.StaticText(self, -1, '', pos=(0,100))

        self.Bind(wx.EVT_BUTTON, self.OnStart, id=ID_START)
        self.Bind(wx.EVT_BUTTON, self.OnStop, id=ID_STOP)

        # Set up event handler for any worker thread results
        EVT_RESULT(self,self.OnResult)

        # And indicate we don't have a worker thread yet
        self.worker = None

    def OnStart(self, event):
        """Start Computation."""
        # Trigger the worker thread unless it's already busy
        if not self.worker:
            self.status.SetLabel('Starting computation')
            self.worker = WorkerThread(self)

    def OnStop(self, event):
        """Stop Computation."""
        # Flag the worker thread to stop if running
        if self.worker:
            self.status.SetLabel('Trying to abort computation')
            self.worker.abort()

    def OnResult(self, event):
        """Show Result status."""
        if event.data is None:
            # Thread aborted (using our convention of None return)
            self.status.SetLabel('Computation aborted')
        else:
            # Process results here
            self.status.SetLabel('Computation Result: %s' % event.data)
        # In either event, the worker is done
        self.worker = None

class MainApp(wx.App):
    """Class Main App."""
    def OnInit(self):
        """Init Main App."""
        self.frame = MainFrame(None, -1)
        self.frame.Show(True)
        self.SetTopWindow(self.frame)
        return True

if __name__ == '__main__':
    app = MainApp(0)
    app.MainLoop()

Oh, and if you're concerned with hanging on an exit if your thread doesn't terminate for some reason, just add a "self.setDaemon(1)" to the init and Python won't wait for it to terminate.
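A minimal sketch of that daemon setting with the GUI stripped away, so the thread behavior can be seen on its own. The class name `DaemonWorker` and the `steps`/`delay` parameters are made up for illustration; on current Python versions the `daemon` argument plays the role of the older `setDaemon()` call.

```python
import threading
import time


class DaemonWorker(threading.Thread):
    """Hypothetical worker: counts steps until finished or aborted."""

    def __init__(self, steps=10, delay=0.01):
        # daemon=True means the interpreter will not wait for this thread
        # at exit (same effect as the older self.setDaemon(1)).
        super().__init__(daemon=True)
        self.steps = steps
        self.delay = delay
        self.result = None
        self._want_abort = False

    def run(self):
        for i in range(self.steps):
            time.sleep(self.delay)
            if self._want_abort:
                return          # result stays None, as in the sample above
        self.result = self.steps

    def abort(self):
        # Called from the main thread to request a stop
        self._want_abort = True


if __name__ == "__main__":
    worker = DaemonWorker()
    worker.start()
    worker.join()
    print(worker.daemon, worker.result)
```

Note that a daemon thread is simply abandoned at interpreter exit, so it is best kept for work that is safe to cut off midway.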

The second approach, using wxYield, should be fine too - just add a call to wxYield() somewhere within the computation code such that it executes periodically. At that point, any pending window events will be dispatched (permitting the window to refresh, process button presses, etc...). Then, it's similar to the above in that you set a flag so that when the original code gets control after the wxYield() returns it knows to stop processing.

As with the threading case, since all events go through during the wxYield() you need to protect against trying to run the same operation twice.

Here's the equivalent of the above but placing the computation right inside the main window class. Note that one difference is that unlike with threading, the responsiveness of your GUI is now directly related to how frequently you call wxYield, so you may have delays refreshing your window dependent on that frequency. You should notice that this is a bit more sluggish with its frequency of a wxYield() each second.

 

import time
import wx

# Button definitions
ID_START = wx.NewId()
ID_STOP = wx.NewId()

# GUI Frame class that spins off the worker thread
class MainFrame(wx.Frame):
    """Class MainFrame."""
    def __init__(self, parent, id):
        """Create the MainFrame."""
        wx.Frame.__init__(self, parent, id, 'wxYield Test')

        # Dumb sample frame with two buttons
        wx.Button(self, ID_START, 'Start', pos=(0,0))
        wx.Button(self, ID_STOP, 'Stop', pos=(0,50))
        self.status = wx.StaticText(self, -1, '', pos=(0,100))

        self.Bind (wx.EVT_BUTTON, self.OnStart, id=ID_START)
        self.Bind (wx.EVT_BUTTON, self.OnStop, id=ID_STOP)

        # Indicate we aren't working on it yet
        self.working = 0

    def OnStart(self, event):
        """Start Computation."""
        # Start the processing - this simulates a loop - you need to call
        # wx.Yield at some periodic interval.
        if not self.working:
            self.status.SetLabel('Starting Computation')
            self.working = 1
            self.need_abort = 0

            for i in range(10):
                time.sleep(1)
                wx.Yield()
                if self.need_abort:
                    self.status.SetLabel('Computation aborted')
                    break
            else:
                # Here's where you would process the result
                # Note you should only do this if not aborted.
                self.status.SetLabel('Computation Completed')

            # In either event, we aren't running any more
            self.working = 0

    def OnStop(self, event):
        """Stop Computation."""
        if self.working:
            self.status.SetLabel('Trying to abort computation')
            self.need_abort = 1

class MainApp(wx.App):
    """Class Main App."""
    def OnInit(self):
        """Init Main App."""
        self.frame = MainFrame(None,-1)
        self.frame.Show(True)
        self.SetTopWindow(self.frame)
        return True

if __name__ == '__main__':
    app = MainApp(0)
    app.MainLoop()

And finally, you can do your work within an idle handler. In this case, you let wxPython generate an IDLE event whenever it has completed processing normal user events, and then you perform a "chunk" of your processing in each such case. This can be a little trickier depending on your algorithm since you have to be able to perform the work in discrete pieces. Inside your IDLE handler, you request that it be called again if you aren't done, but you want to make sure that each pass through the handler doesn't take too long. Effectively, each event is similar to the gap between wxYield() calls in the previous example, and your GUI responsiveness will be subject to that latency just as with the wxYield() case.

I'm also not sure you can remove an idle handler once established (or at least I think I had problems with that in the past), so the code below just establishes it once and the handler only does work if it's in the midst of a computation. [Actually, you can use the Disconnect method to remove an event handler binding, although there is no real need to do so as there is very little overhead if you use a guard condition as in the code below. --Robin]

 

import time
import wx

# Button definitions
ID_START = wx.NewId()
ID_STOP = wx.NewId()

# GUI Frame class that spins off the worker thread
class MainFrame(wx.Frame):
    """Class MainFrame."""
    def __init__(self, parent, id):
        """Create the MainFrame."""
        wx.Frame.__init__(self, parent, id, 'Idle Test')

        # Dumb sample frame with two buttons
        wx.Button(self, ID_START, 'Start', pos=(0,0))
        wx.Button(self, ID_STOP, 'Stop', pos=(0,50))
        self.status = wx.StaticText(self, -1, '', pos=(0,100))

        self.Bind (wx.EVT_BUTTON, self.OnStart, id=ID_START)
        self.Bind (wx.EVT_BUTTON, self.OnStop, id=ID_STOP)
        self.Bind (wx.EVT_IDLE, self.OnIdle)

        # Indicate we aren't working on it yet
        self.working = 0

    def OnStart(self, event):
        """Start Computation."""
        # Set up for processing and trigger idle event
        if not self.working:
            self.status.SetLabel('Starting Computation')
            self.count = 0
            self.working = 1
            self.need_abort = 0

    def OnIdle(self, event):
        """Idle Handler."""
        if self.working:
            # This is where the processing takes place, one bit at a time
            if self.need_abort:
                self.status.SetLabel('Computation aborted')
            else:
                self.count = self.count + 1
                time.sleep(1)
                if self.count < 10:
                    # Still more work to do so request another event
                    event.RequestMore()
                    return
                else:
                    self.status.SetLabel('Computation completed')

            # Reaching here is an abort or completion - end in either case
            self.working = 0

    def OnStop(self, event):
        """Stop Computation."""
        if self.working:
            self.status.SetLabel('Trying to abort computation')
            self.need_abort = 1

class MainApp(wx.App):
    """Class Main App."""
    def OnInit(self):
        """Init Main App."""
        self.frame = MainFrame(None, -1)
        self.frame.Show(True)
        self.SetTopWindow(self.frame)
        return True

if __name__ == '__main__':
    app = MainApp(0)
    app.MainLoop()
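The "discrete pieces" requirement is the awkward part of the idle approach. One way to meet it, sketched here without any wx code, is to keep the loop state in an object and expose a `step()` method that an idle handler could call once per event. `ChunkedSum` and its parameters are illustrative names only, not part of wxPython.

```python
class ChunkedSum:
    """Hypothetical task: sum a list a few items at a time."""

    def __init__(self, items, chunk_size=3):
        self.items = list(items)
        self.chunk_size = chunk_size
        self.pos = 0        # index where the next chunk starts
        self.total = 0
        self.need_abort = False

    def step(self):
        """Do one chunk of work; return True if more work remains."""
        if self.need_abort:
            return False
        end = min(self.pos + self.chunk_size, len(self.items))
        for value in self.items[self.pos:end]:
            self.total += value
        self.pos = end
        return self.pos < len(self.items)


if __name__ == "__main__":
    task = ChunkedSum(range(10))
    passes = 1
    while task.step():      # an idle handler would call RequestMore() here
        passes += 1
    print(task.total, passes)
```

The chunk size is the knob that trades throughput against GUI latency: each call to `step()` should return quickly enough that the window still feels responsive.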

 


 

 

Slight modification for recursive tasks

Jeff Grimmett adds:

The above three examples got me going in the right direction, but the actual worker task used is a little on the simple side for what I needed to do, which was to recurse through directories and use the same class for each recursion.

Do I need to tell you that it's a bad idea to recurse 100 threads at once? :-) Highly entertaining to watch but hardly useful if the end goal is to collect data serially!

The trick I settled on was to separate the thread from the task. The means of communicating the shutdown command was the first thing I tackled. I settled on the Python threading module's Event class. This class is basically a flag; I suppose a plain flag variable would also work quite well as long as the scope is correct.

 

import  threading

# This is a global flag that we will manipulate as needed
# to allow graceful exit from the recursive search.
KeepRunning     =       threading.Event()

The next thing I needed to do was define the scope of the actual worker thread.

 

class Worker(threading.Thread):
  def __init__(self, op, dir1, dir2):
    threading.Thread.__init__(self)
    self.op   = op
    self.Dir1 = dir1
    self.Dir2 = dir2
    KeepRunning.set()

    self.start()

  def run(self):
    self.wb = WorkerBee(self.op, self.Dir1, self.Dir2)
    self.op.AppendText('Done!\n')
    wxBell()

    # Assuming you are following the above example somewhat, assume that this is a
    # similar 'reporting' event class that we are calling. It carries a cargo which
    # is in fact the 'head' WorkerBee.
    wxPostEvent(self.op.GetParent(), NewReport(self.wb))

  def abort(self):
    KeepRunning.clear()

  # print to the output window.
  def Update(self, txt):
    self.op.AppendText(txt)
    self.op.ShowPosition(self.op.GetLastPosition()) # keeps the last line visible

The thread initializes with a pointer to the output window - which in this case is a wxTextCtrl() object - and the starting directory. Like the above example, this thread is self-starting, but need not be.

The first big difference is in the run() method. In the above example all the work is actually done BY the run() method. In this case, we create a WorkerBee object and retain a handle on it. Exactly why will become clear in a moment.

The second difference is that instead of keeping our 'keep running' variable local, we now have it global in the form of the KeepRunning Event object. Again, the reason why becomes clear in the next part.

While not directly on topic, the Update() method is simply a convenience to deal with the output display. If you were writing to stdout this would be entirely unnecessary.

====

The WorkerBee class is the main point of difference between this example and the previous. Instead of the thread object doing all the work, it starts up the WorkerBee object.

The WorkerBee class recursively scans two directories and compares the results using the filecmp Python module. The exact task is not important, but I had to use something :-)

 

import filecmp

class WorkerBee:
  def __init__ (self, op, dir1, dir2):
    self.DiffList = []  # We will retain a list of changed files
    self.DirList  = []  # We will retain a list of directories.
    self.A        = dir1
    self.B        = dir2
    self.op       = op

    self.Update('scanning %s\n' % self.A)

    self.cmp = filecmp.dircmp(self.A, self.B)

    self.Deleted = self.cmp.left_only
    self.New     = self.cmp.right_only
    self.Files   = self.cmp.common_files
    self.Dirs    = self.cmp.common_dirs

    for i in self.Files:
      if not KeepRunning.isSet(): break

      self.Update('\t%s\\%s' % (self.A, i))

      if filecmp.cmp('%s\\%s' % (self.A, i), '%s\\%s' % (self.B, i), shallow=0) == 0:
        self.Update('\t<---- DIFF ***\n')      # A diff!
        self.DiffList.append(i)
      else:
        self.Update('\n')

    for i in self.Dirs:
      if not KeepRunning.isSet(): break

      self.DirList.append ( WorkerBee ( op,
                                        '%s\\%s' % (self.A, i),
                                        '%s\\%s' % (self.B, i)
                                      )
                          )

  def   Update(self, txt):
    self.op.AppendText(txt)
    self.op.ShowPosition(self.op.GetLastPosition())

THIS is why we need to keep the thread object and the workerbee separate. If the workerbee was built around the Thread class, we would have a SWARM of workerbees! However, in this case what we end up with is a controlled recursion into a directory tree, with frequent checks on the KeepRunning Event flag. The whole thing shuts down quite nicely and leaves the GUI VERY responsive.
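The same pattern can be tried without wx or a real directory tree. The sketch below is only an illustration: the nested dict stands in for directories, and `scan` and `keep_running` are hypothetical names. The point it shows is that the recursion happens inside one thread, and every level checks the same shared Event.

```python
import threading

# Shared flag, checked at every level of the recursion.
keep_running = threading.Event()


def scan(tree, path="", visited=None):
    """Walk a nested dict depth-first, stopping early if the flag is cleared.

    `tree` maps names to sub-dicts (directories) or None (files); this
    stands in for a real directory tree.
    """
    if visited is None:
        visited = []
    for name, child in tree.items():
        if not keep_running.is_set():
            break
        full = path + "/" + name
        visited.append(full)
        if isinstance(child, dict):
            scan(child, full, visited)   # same thread, no new Thread object
    return visited


if __name__ == "__main__":
    demo = {"a": {"b": None, "c": {"d": None}}, "e": None}
    keep_running.set()
    worker = threading.Thread(target=scan, args=(demo,))
    worker.start()
    worker.join()
```

Calling `keep_running.clear()` from the GUI thread makes every active level of the recursion fall out of its loop at the next check.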

 

More Tips

ChuckEsterbrook asked:

The LongRunningTasks wiki page doesn't mention using Queues. When people use this technique what is the set up? For example, is your "worker thread" pushing data into the queue and then a wx.Timer is polling it out with a non-blocking q.get_nowait()?

RobinDunn replied:

That's one way. Another is to call wxWakeUpIdle when the item is put in the Queue, and then have an EVT_IDLE handler that fetches items from the queue. That way the item is usually processed very soon after it is put there without having to have a higher frequency timer.

Another option is to not use the queue or wxPostEvent and just use wxCallAfter, passing to it the callable and parameters to be called in the GUI thread. (It uses wxPostEvent internally.) The implementation and docs of wxCallAfter are in core.py.
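The queue-polling setup from the question can be sketched without wx as follows. This is an illustration under assumptions: `worker` and `drain` are hypothetical names, the `None` sentinel is a convention chosen here, and the module is spelled `queue` as in Python 3 (older code on this page uses `Queue`). In a real application `drain` would be the body of a wx.Timer or EVT_IDLE handler.

```python
import queue
import threading


def worker(q, count):
    """Runs in the worker thread: push results as they become available."""
    for i in range(count):
        q.put(i * i)
    q.put(None)             # sentinel: no more results (a convention chosen here)


def drain(q):
    """What a timer or idle handler would do: take whatever is ready, never block."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


if __name__ == "__main__":
    q = queue.Queue()
    t = threading.Thread(target=worker, args=(q, 5))
    t.start()
    t.join()
    print(drain(q))
```

Because `get_nowait()` never blocks, the handler returns immediately when nothing is pending, which keeps the GUI thread free.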

MarienZwart would like to add:

This is actually (sort of) documented in the wxWidgets docs:

  • For communication between secondary threads and the main thread, you may use wxEvtHandler::AddPendingEvent or its short version wxPostEvent. These functions have a thread-safe implementation so that they can be used as they are for sending events from one thread to another. However there is no built in method to send messages to the worker threads and you will need to use the available synchronization classes to implement the solution which suits your needs yourself.

So you do not need a Queue to send messages from the worker thread to the main/gui thread: just use AddPendingEvent in the worker and a normal event handler in the gui. However if you need to send messages from the main/gui thread to the worker thread the right way to do it is probably a Queue, with the gui thread calling put() and the worker regularly checking for messages with get_nowait(). You can *probably* get away with using a simple boolean flag like some examples on this page use without any extra locking thanks to the GIL (Global Interpreter Lock), but for anything more complicated than that you need locking (like the Event used in the previous example), and for more than one kind of "message" a queue is usually the simplest way to go. Even something as simple as a worker thread that can be paused, resumed and cancelled is IMHO simpler using a queue than using a bunch of Event and/or Condition objects (the worker can just .get() on the queue to pause).
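Here is one possible shape for the pausable worker described above, as a sketch only. The command strings, the `pausable_worker` name and the result tuples are all invented for this example. While running, the worker peeks at the queue with `get_nowait()`; while paused, it blocks on `get()` until the next command arrives.

```python
import queue
import threading


def pausable_worker(commands, results, total):
    """Count to `total`, obeying 'pause', 'resume' and 'cancel' commands."""
    done = 0
    paused = False
    while done < total:
        try:
            # While paused, block on the queue; otherwise just peek.
            cmd = commands.get() if paused else commands.get_nowait()
        except queue.Empty:
            cmd = None
        if cmd == "cancel":
            results.put(("cancelled", done))
            return
        if cmd == "pause":
            paused = True
            continue
        if cmd == "resume":
            paused = False
        done += 1           # one unit of "work"
    results.put(("finished", done))


if __name__ == "__main__":
    commands, results = queue.Queue(), queue.Queue()
    t = threading.Thread(target=pausable_worker, args=(commands, results, 5))
    t.start()
    t.join()
    print(results.get())
```

The GUI side only ever calls `commands.put(...)`, so no extra Event or Condition objects are needed.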

 


 

See also Brian Kelley's demo (Process.py) which shows how to do a long task in a separate process (using wxProcess and wxExecute) rather than a thread. A separate process is like running a separate application - it can run full speed, whereas a thread might be slowed about 30% by Python's global interpreter lock (I don't know which version you were using, but at least for the first few threads, Python does pretty well, resulting in fairly minimal overhead - Josiah).
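For readers without that demo at hand, the separate-process idea can be approximated with the standard library. Note that this sketch swaps in Python's `subprocess` module instead of wxProcess and wxExecute, so it is not the demo's method; `start_task`, `finished` and `CHILD_CODE` are hypothetical names.

```python
import subprocess
import sys

# Child program: stands in for a CPU-heavy job.
CHILD_CODE = "print(sum(range(1000)))"


def start_task():
    """Launch the job in a separate Python process without waiting for it."""
    return subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdout=subprocess.PIPE,
        text=True,
    )


def finished(proc):
    """Non-blocking check, suitable for a timer or idle handler."""
    return proc.poll() is not None


if __name__ == "__main__":
    proc = start_task()
    out, _ = proc.communicate()     # a GUI would poll finished() instead
    print(out.strip())
```

Stopping the task early then becomes a matter of calling `proc.terminate()`, with no abort flag for the job to check.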

MVoncken suggests: I prefer using a generator (a variant on wxYield, Example 2):

    def DoSomeLongTask(self):
        """Usually defined somewhere else."""
        for i in range(10):
            time.sleep(1)
            yield i

    def OnStart(self, event):
        # Over-simplified version.
        # Iterate over DoSomeLongTask, call wx.Yield and display status.
        self.need_abort = False
        for status in self.DoSomeLongTask():
            # Display status here (wx.Gauge or wx.ProgressDialog?)
            wx.Yield()
            if self.need_abort:
                break

 


 

While using a generator and wx.Yield() works, you are limited to having a single long-running task at any time. This may be a serious limitation to your application. Instead, you can use the built-in wx.FutureCall() function to schedule as many calls as you want or need. I use this to handle 'replace all' in one of my applications.

Josiah suggests:

 

    def DoSomeLongTask(self, state):
        #pull the state out
        x,y,z = state
        if not self.need_abort:
            time.sleep(1)
            x -= 1
            if x > 0 and not self.need_abort:
                wx.FutureCall(1, self.DoSomeLongTask, (x,y,z))

    def OnStart(self, event):
        # Over-Simplified version.
        self.need_abort = False
        self.DoSomeLongTask((10, 1, 2))

Alternatively, since wx.FutureCall creates and manipulates wx.Timer instances, we could just create a registry of stuff to call when a timer expires, and allow insertion of items to call into this registry. Why do we use a Timer/FutureCall instead of wx.CallAfter? It's mostly a matter of taste, but we use self.IsRunning() rather than keeping explicit state as a convenience.

Josiah also suggests:

 

import traceback
import wx

class CallRegistry(wx.Timer):
    def __init__(self, delay=1):
        wx.Timer.__init__(self)
        self.tasks = []
        if delay < 1:
            delay = 1
        self.delay = delay

    def Notify(self):
        tl = []
        otl = self.tasks
        self.tasks = []
        for x in otl:
            try:
                i,j = x
                x = i(j)
                if x:
                    try:
                        i,j = x
                    except:
                        if callable(x):
                            tl.append((x, None))
                    else:
                        tl.append(x)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                traceback.print_exc()
        self.tasks.extend(tl)
        if not self.tasks:
            self.Stop()

    def CallLater(self, fcn, state=None):
        self.tasks.append((fcn, state))
        if not self.IsRunning():
            self.Start(self.delay, wx.TIMER_CONTINUOUS)

Sample use of the above:

 

class MyFrame(wx.Frame):
    def __init__(self, ...):
        ...
        self.cr = CallRegistry()
        self.cr.CallLater(self.Task1)
        self.cr.CallLater(self.Task2)
        ...

    def Task1(self, state):
        ...
        #automatically reschedules itself
        return self.Task1, newstate

    def Task2(self, state):
        ...
        if c1:
            #schedules some other task without arguments
            return self.Task3
        else:
            #reschedules itself again
            return self.Task2, newstate

 

Redirecting text from stdout to a wx.TextCtrl

I thought I might add this to the discussion. There are probably more ways than one to do this but... This is some sample code that I put together that updates a text control in wxPython from output being sent to stdout through backend "print" statements being executed in a separate thread.

The idea here is that you create a class with a "write" method and rebind sys.stdout to an instance of it (the real stdout is always kept in sys.__stdout__ for restoring later). Every time a message is written to stdout, an event is posted to the text control for processing. The problem is that the text control doesn't process those events on its own, so we set up a timer that makes the text control process its pending events every so often.

The threading code is pulled pretty much from Python in a Nutshell (which is what I'm using). It uses 2 Queues to process events. requestID, callable, args, kwds = self.requestQ.get() blocks the worker thread until a request arrives, and once the callable completes, self.resultQ.put((requestID, callable(*args, **kwds))) records that the task has finished. When this is done we post an event to the GUI telling it that we are done.

There's better ways to poll the resultQ but for my purposes here this works pretty nicely. Anyway hope this is useful to someone...

 

#!/usr/bin/env python

import wx, threading, Queue, sys, time
from wx.lib.newevent import NewEvent

ID_BEGIN=100
wxStdOut, EVT_STDDOUT= NewEvent()
wxWorkerDone, EVT_WORKER_DONE= NewEvent()

def LongRunningProcess(lines_of_output):
    for x in range(lines_of_output):
        print "I am a line of output (hi!)...."
        time.sleep(1)

class MainFrame(wx.Frame):
    def __init__(self, parent, id, title):
        wx.Frame.__init__(self, parent, id, title, size=(300, 300))
        self.requestQ = Queue.Queue() #create queues
        self.resultQ = Queue.Queue()

        #widgets
        p = wx.Panel(self)
        self.output_window = wx.TextCtrl(p, -1,
                             style=wx.TE_AUTO_SCROLL|wx.TE_MULTILINE|wx.TE_READONLY)
        self.go = wx.Button(p, ID_BEGIN, 'Begin')
        self.output_window_timer = wx.Timer(self.output_window, -1)

        #frame sizers
        sizer = wx.BoxSizer(wx.VERTICAL)
        sizer.Add(self.output_window, 10, wx.EXPAND)
        sizer.Add(self.go, 1, wx.EXPAND)
        p.SetSizer(sizer)

        #events
        wx.EVT_BUTTON(self, ID_BEGIN, self.OnBeginTest)
        self.output_window.Bind(EVT_STDDOUT, self.OnUpdateOutputWindow)
        self.output_window.Bind(wx.EVT_TIMER, self.OnProcessPendingOutputWindowEvents)
        self.Bind(EVT_WORKER_DONE, self.OnWorkerDone)

        #thread
        self.worker = Worker(self, self.requestQ, self.resultQ)

    def OnUpdateOutputWindow(self, event):
        value = event.text
        self.output_window.AppendText(value)

    def OnBeginTest(self, event):
        lines_of_output=7
        self.go.Disable()
        self.worker.beginTest(LongRunningProcess, lines_of_output)
        self.output_window_timer.Start(50)

    def OnWorkerDone(self, event):
        self.output_window_timer.Stop()
        self.go.Enable()

    def OnProcessPendingOutputWindowEvents(self, event):
        self.output_window.ProcessPendingEvents()

class Worker(threading.Thread):
    requestID = 0
    def __init__(self, parent, requestQ, resultQ, **kwds):
        threading.Thread.__init__(self, **kwds)
        self.setDaemon(True)
        self.requestQ = requestQ
        self.resultQ = resultQ
        self.start()

    def beginTest(self, callable, *args, **kwds):
        Worker.requestID +=1
        self.requestQ.put((Worker.requestID, callable, args, kwds))
        return Worker.requestID

    def run(self):
        while True:
            requestID, callable, args, kwds = self.requestQ.get()
            self.resultQ.put((requestID, callable(*args, **kwds)))
            evt = wxWorkerDone()
            wx.PostEvent(wx.GetApp().frame, evt)

class SysOutListener:
    def write(self, string):
        sys.__stdout__.write(string)
        evt = wxStdOut(text=string)
        wx.PostEvent(wx.GetApp().frame.output_window, evt)

class MyApp(wx.App):
    def OnInit(self):
        self.frame = MainFrame(None, -1, 'rebinding stdout')
        self.frame.Show(True)
        self.frame.Center()
        return True

#entry point
if __name__ == '__main__':
    app = MyApp(0)
    sys.stdout = SysOutListener()
    app.MainLoop()
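The rebinding mechanism itself can be tried without wx. In this sketch, which is an illustration and not a replacement for the listing above, `StdoutTee` and `capture_prints` are hypothetical names, and a plain callback takes the place of posting an event to the text control.

```python
import sys


class StdoutTee:
    """Hypothetical stand-in for SysOutListener: forwards text to a callback."""

    def __init__(self, callback, original):
        self.callback = callback
        self.original = original

    def write(self, text):
        self.original.write(text)   # still show the text on the real stream
        self.callback(text)         # in the wx version: post an event here

    def flush(self):
        self.original.flush()


def capture_prints(func):
    """Run func() with stdout rebound, returning the text it printed."""
    chunks = []
    saved = sys.stdout
    sys.stdout = StdoutTee(chunks.append, saved)
    try:
        func()
    finally:
        sys.stdout = saved          # always restore the previous stream
    return "".join(chunks)


if __name__ == "__main__":
    text = capture_prints(lambda: print("I am a line of output (hi!)...."))
    print(repr(text))
```

Restoring the stream in a `finally` block means an exception in the task cannot leave stdout pointing at the listener.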

 

Easiest Implementation *Ever* :)

So you say you want to perform a long running task from a wxPython GUI? Well, it don't get no simpler than this! Use threads but don't mess with a separate thread class. Make sure if you're running in the other thread that you interact with the GUI via wx.CallAfter.

 

import wx
import thread
from time import sleep

class MainFrame(wx.Frame):

    def __init__(self, parent):
        wx.Frame.__init__(self, parent)

        self.label = wx.StaticText(self, label="Ready")
        self.btn = wx.Button(self, label="Start")
        self.gauge = wx.Gauge(self)

        sizer = wx.BoxSizer(wx.VERTICAL)
        sizer.Add(self.label, proportion=1, flag=wx.EXPAND)
        sizer.Add(self.btn, proportion=0, flag=wx.EXPAND)
        sizer.Add(self.gauge, proportion=0, flag=wx.EXPAND)

        self.SetSizerAndFit(sizer)

        self.Bind(wx.EVT_BUTTON, self.onButton)

    def onButton(self, evt):
        self.btn.Enable(False)
        self.gauge.SetValue(0)
        self.label.SetLabel("Running")
        thread.start_new_thread(self.longRunning, ())

    def onLongRunDone(self):
        self.gauge.SetValue(100)
        self.label.SetLabel("Done")
        self.btn.Enable(True)

    def longRunning(self):
        """This runs in a different thread.  Sleep is used to simulate a long running task."""
        sleep(3)
        wx.CallAfter(self.gauge.SetValue, 20)
        sleep(5)
        wx.CallAfter(self.gauge.SetValue, 50)
        sleep(1)
        wx.CallAfter(self.gauge.SetValue, 70)
        sleep(10)
        wx.CallAfter(self.onLongRunDone)

if __name__ == "__main__":
    app = wx.PySimpleApp()
    app.TopWindow = MainFrame(None)
    app.TopWindow.Show()
    app.MainLoop()

 

THE Easiest Implementation *Ever* :-)))

...no, really. All (most) of the above cases are one-way communication examples. Sometimes, you just want the main thread to be non-gui and communicate with the gui in a blocking manner. See http://radekpodgorny.blogspot.cz/2012/12/working-with-wxpython-in-separate-thread.html on how to achieve that.

update: robin was so kind to comment on my solution. i'll investigate all his ideas and probably refactor/improve the code. stay tuned and THANK YOU, robin! ;-)

Original article: https://www.cnblogs.com/dengyigod/p/3304586.html