Continued work on GEP008: move gen/utils/longop.py, gen/utils/progressmon.py, ProgressDialog.py into gui/widgets/progressdialog.py.

svn: r13980
This commit is contained in:
Brian Matherly 2010-01-06 05:27:28 +00:00
parent 7aec8904ea
commit 1fa4e8d66c
10 changed files with 609 additions and 613 deletions

View File

@ -58,8 +58,6 @@ src/gen/__init__.py
src/gen/utils/__init__.py
src/gen/utils/callback.py
src/gen/utils/callman.py
src/gen/utils/longop.py
src/gen/utils/progressmon.py
# gen proxy API
src/gen/proxy/living.py
@ -295,6 +293,7 @@ src/gui/widgets/buttons.py
src/gui/widgets/expandcollapsearrow.py
src/gui/widgets/labels.py
src/gui/widgets/menutoolbuttonaction.py
src/gui/widgets/progressdialog.py
src/gui/widgets/styledtexteditor.py
src/gui/widgets/validatedmaskedentry.py

View File

@ -1,208 +0,0 @@
"""
This module provides a progess dialog for displaying the status of
long running operations.
"""
import gtk
class _GtkProgressBar(gtk.VBox):
"""This widget displays the progress bar and labels for a progress
indicator. It provides an interface to updating the progress bar.
"""
def __init__(self, long_op_status):
""":param long_op_status: the status of the operation.
:type long_op_status: L{gen.utils.LongOpStatus}
"""
gtk.VBox.__init__(self)
msg = long_op_status.get_msg()
self._old_val = -1
self._lbl = gtk.Label(msg)
self._lbl.set_use_markup(True)
#self.set_border_width(24)
self._pbar = gtk.ProgressBar()
self._hbox = gtk.HBox()
# Only display the cancel button is the operation
# can be canceled.
if long_op_status.can_cancel():
self._cancel = gtk.Button(stock=gtk.STOCK_CANCEL)
self._cancel.connect("clicked",
lambda x: long_op_status.cancel())
self._cancel.show()
self._hbox.pack_end(self._cancel)
self._hbox.pack_start(self._pbar)
self.pack_start(self._lbl, expand=False, fill=False)
self.pack_start(self._hbox, expand=False, fill=False)
self._pbar_max = (long_op_status.get_total_steps()/
long_op_status.get_interval())
self._pbar_index = 0.0
self._pbar.set_fraction(((100/float(long_op_status.get_total_steps())*
float(long_op_status.get_interval())))/
100.0)
if msg != '':
self._lbl.show()
self._pbar.show()
self._hbox.show()
def step(self):
"""Move the progress bar on a step.
"""
self._pbar_index = self._pbar_index + 1.0
if self._pbar_index > self._pbar_max:
self._pbar_index = self._pbar_max
try:
val = int(100*self._pbar_index/self._pbar_max)
except ZeroDivisionError:
val = 0
if val != self._old_val:
self._pbar.set_text("%d%%" % val)
self._pbar.set_fraction(val/100.0)
self._pbar.old_val = val
class GtkProgressDialog(gtk.Dialog):
"""A gtk window to display the status of a long running
process."""
def __init__(self, window_params, title):
""":param title: The title to display on the top of the window.
:type title: string
"""
gtk.Dialog.__init__(self, *window_params)
self.connect('delete_event', self._warn)
self.set_has_separator(False)
self.set_title(title)
#self.set_resize_mode(gtk.RESIZE_IMMEDIATE)
#self.show()
self._progress_bars = []
def add(self, long_op_status):
"""Add a new status object to the progress dialog.
:param long_op_status: the status object.
:type long_op_status: L{gen.utils.LongOpStatus}
:returns: a key that can be used as the L{pbar_idx}
to the other methods.
:rtype: int
"""
pbar = _GtkProgressBar(long_op_status)
self.vbox.pack_start(pbar, expand=False, fill=False)
pbar.show()
# this seems to cause an infinite loop:
#self.resize_children()
self._progress_bars.append(pbar)
# This is a bad idea; could cause deletes while adding:
#self._process_events()
return len(self._progress_bars)-1
def remove(self, pbar_idx):
"""Remove the specified status object from the progress dialog.
:param pbar_idx: the index as returned from L{add}
:type pbar_idx: int
"""
if pbar_idx is not None:
pbar = self._progress_bars[pbar_idx]
self.vbox.remove(pbar)
del self._progress_bars[pbar_idx]
def step(self, pbar_idx):
"""Click the progress bar over to the next value. Be paranoid
and insure that it doesn't go over 100%.
:param pbar_idx: the index as returned from L{add}
:type pbar_idx: int
"""
if pbar_idx < len(self._progress_bars):
self._progress_bars[pbar_idx].step()
self._process_events()
def _process_events(self):
while gtk.events_pending():
gtk.main_iteration()
def show(self):
"""Show the dialog and process any events.
"""
gtk.Dialog.show(self)
self._process_events()
def hide(self):
"""Hide the dialog and process any events.
"""
gtk.Dialog.hide(self)
self._process_events()
def _warn(self, x, y):
return True
def close(self):
self.destroy()
if __name__ == '__main__':
import time
from gen.utils import LongOpStatus, ProgressMonitor
def test(a, b):
d = ProgressMonitor(GtkProgressDialog)
s = LongOpStatus("Doing very long operation", 100, 10, can_cancel=True)
d.add_op(s)
for i in xrange(0, 99):
if s.should_cancel():
break
time.sleep(0.1)
if i == 30:
t = LongOpStatus("doing a shorter one", 100, 10,
can_cancel=True)
d.add_op(t)
for j in xrange(0, 99):
if s.should_cancel():
t.cancel()
break
if t.should_cancel():
break
time.sleep(0.1)
t.heartbeat()
if not t.was_cancelled():
t.end()
if i == 60:
t = LongOpStatus("doing another shorter one", 100, 10)
d.add_op(t)
for j in xrange(0, 99):
if s.should_cancel():
t.cancel()
break
time.sleep(0.1)
t.heartbeat()
t.end()
s.heartbeat()
if not s.was_cancelled():
s.end()
w = gtk.Window(gtk.WINDOW_TOPLEVEL)
w.connect('destroy', gtk.main_quit)
button = gtk.Button("Test")
button.connect("clicked", test, None)
w.add(button)
button.show()
w.show()
gtk.main()
print 'done'

View File

@ -10,9 +10,7 @@ pkgdata_PYTHON = \
callback.py \
callman.py \
configmanager.py \
fallback.py \
progressmon.py \
longop.py
fallback.py
pkgpyexecdir = @pkgpyexecdir@/gen/utils
pkgpythondir = @pkgpythondir@/gen/utils

View File

@ -24,6 +24,4 @@ Generic utilities useful for users of the gen package
from fallback import *
from configmanager import ConfigManager
from progressmon import ProgressMonitor
from longop import LongOpStatus
from callback import Callback

View File

@ -1,222 +0,0 @@
import time
from callback import Callback
class LongOpStatus(Callback):
"""LongOpStatus provides a way of communicating the status of a long
running operations. The intended use is that when a long running operation
is about to start it should create an instance of this class and emit
it so that any listeners can pick it up and use it to record the status
of the operation.
Signals
=======
op-heartbeat - emitted every 'interval' calls to heartbeat.
op-end - emitted once when the operation completes.
Example usage:
class MyClass(Callback):
__signals__ = {
'op-start' : object
}
def long(self):
status = LongOpStatus("doing long job", 100, 10)
for i in xrange(0,99):
time.sleep(0.1)
status.heartbeat()
status.end()
class MyListener(object):
def __init__(self):
self._op = MyClass()
self._op.connect('op-start', self.start)
self._current_op = None
def start(self,long_op):
self._current_op.connect('op-heartbeat', self.heartbeat)
self._current_op.connect('op-end', self.stop)
def hearbeat(self):
# update status display
def stop(self):
# close the status display
self._current_op = None
"""
__signals__ = {
'op-heartbeat' : None,
'op-end' : None
}
def __init__(self, msg="",
total_steps=None,
interval=1,
can_cancel=False):
"""
@param msg: A Message to indicated the purpose of the operation.
@type msg: string
@param total_steps: The total number of steps that the operation
will perform.
@type total_steps:
@param interval: The number of iterations between emissions.
@type interval:
@param can_cancel: Set to True if the operation can be cancelled.
If this is set the operation that creates the status object should
check the 'should_cancel' method regularly so that it can cancel
the operation.
@type can_cancel:
"""
Callback.__init__(self)
self._msg = msg
self._total_steps = total_steps
# don't allow intervals less that 1
self._interval = max(interval,1)
self._can_cancel = can_cancel
self._cancel = False
self._count = 0
self._countdown = interval
self._secs_left = 0
self._start = time.time()
self._running = True
def __del__(self):
if self._running:
self.emit('op-end')
def heartbeat(self):
"""This should be called for each step in the operation. It will
emit a 'op-heartbeat' every 'interval' steps. It recalcuates the
'estimated_secs_to_complete' from the time taken for previous
steps.
"""
self._countdown -= 1
if self._countdown <= 0:
elapsed = time.time() - self._start
self._secs_left = \
( elapsed / self._interval ) \
* (self._total_steps - self._count)
self._count += self._interval
self._countdown = self._interval
self._start = time.time()
self.emit('op-heartbeat')
def estimated_secs_to_complete(self):
"""Return the number of seconds estimated left before operation
completes. This will change as 'hearbeat' is called.
@return: estimated seconds to complete.
@rtype: int
"""
return self._secs_left
def was_cancelled(self):
"""
Has this process been cancelled?
"""
return self._cancel
def cancel(self):
"""Inform the operation that it should complete.
"""
self._cancel = True
self.end()
def end(self):
"""End the operation. Causes the 'op-end' signal to be emitted.
"""
self.emit('op-end')
self._running = False
def should_cancel(self):
"""Return true of the user has asked for the operation to be cancelled.
@return: True of the operation should be cancelled.
@rtype: bool
"""
return self._cancel
def can_cancel(self):
"""@return: True if the operation can be cancelled.
@rtype: bool
"""
return self._can_cancel
def get_msg(self):
"""@return: The current status description messages.
@rtype: string
"""
return self._msg
def set_msg(self, msg):
"""Set the current description message.
@param msg: The description message.
@type msg: string
"""
self._msg = msg
def get_total_steps(self):
"""Get to total number of steps. NOTE: this is not the
number of times that the 'op-heartbeat' message will be
emited. 'op-heartbeat' is emited get_total_steps/interval
times.
@return: total number of steps.
@rtype: int
"""
return self._total_steps
def get_interval(self):
"""Get the interval between 'op-hearbeat' signals.
@return: the interval between 'op-hearbeat' signals.
@rtype: int
"""
return self._interval
if __name__ == '__main__':
s = LongOpStatus("msg", 100, 10, can_cancel=True)
def heartbeat():
print "heartbeat ", s.estimated_secs_to_complete()
def end():
print "end"
s.connect('op-heartbeat', heartbeat)
s.connect('op-end', end)
import signal
def ctrlc_handler(signum, frame):
print "Received interrupt!"
s.cancel()
signal.signal(signal.SIGINT, ctrlc_handler)
for i in xrange(0, 99):
if s.should_cancel():
break
time.sleep(0.1)
s.heartbeat()
if not s.was_cancelled():
s.end()

View File

@ -1,172 +0,0 @@
#
# Gramps - a GTK+/GNOME based genealogy program
#
# Copyright (C) 2007 Richard Taylor
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
This module provides a progess dialog for displaying the status of
long running operations.
"""
import logging
log = logging.getLogger(".gen")
from gettext import gettext as _
class _StatusObjectFacade(object):
"""This provides a simple structure for recording the information
needs about a status object."""
def __init__(self, status_obj, heartbeat_cb_id=None, end_cb_id=None):
"""
@param status_obj:
@type status_obj: L{gen.utils.LongOpStatus}
@param heartbeat_cb_id: (default: None)
@type heartbeat_cb_id: int
@param end_cb_id: (default: None)
@type end_cb_id: int
"""
self.status_obj = status_obj
self.heartbeat_cb_id = heartbeat_cb_id
self.end_cb_id = end_cb_id
self.pbar_idx = None
self.active = False
class ProgressMonitor(object):
"""A dialog for displaying the status of long running operations.
It will work with L{gen.utils.LongOpStatus} objects to track the
progress of long running operations. If the operations is going to
take longer than I{popup_time} it will pop up a dialog with a
progress bar so that the user gets some feedback about what is
happening.
"""
__default_popup_time = 5 # seconds
def __init__(self, dialog_class, dialog_class_params=(),
title=_("Progress Information"),
popup_time = None):
"""
@param dialog_class: A class used to display the progress dialog.
@type dialog_class: _GtkProgressDialog or the same interface.
@param dialog_class_params: A tuple that will be used as the initial
arguments to the dialog_class, this might be used for passing in
a parent window handle.
@type dialog_class_params: tuple
@param title: The title of the progress dialog
@type title: string
@param popup_time: number of seconds to wait before popup.
@type popup_time: int
"""
self._dialog_class = dialog_class
self._dialog_class_params = dialog_class_params
self._title = title
self._popup_time = popup_time
if self._popup_time is None:
self._popup_time = self.__class__.__default_popup_time
self._status_stack = [] # list of current status objects
self._dlg = None
def _get_dlg(self):
if self._dlg is None:
self._dlg = self._dialog_class(self._dialog_class_params,
self._title)
#self._dlg.show()
return self._dlg
def add_op(self, op_status):
"""Add a new status object to the progress dialog.
@param op_status: the status object.
@type op_status: L{gen.utils.LongOpStatus}
"""
log.debug("adding op to Progress Monitor")
facade = _StatusObjectFacade(op_status)
self._status_stack.append(facade)
idx = len(self._status_stack)-1
# wrap up the op_status object idx into the callback calls
def heartbeat_cb():
self._heartbeat(idx)
def end_cb():
self._end(idx)
facade.heartbeat_cb_id = op_status.connect('op-heartbeat',
heartbeat_cb)
facade.end_cb_id = op_status.connect('op-end', end_cb)
def _heartbeat(self, idx):
# check the estimated time to complete to see if we need
# to pop up a progress dialog.
log.debug("heartbeat in ProgressMonitor")
if idx >= len(self._status_stack):
# this item has been cancelled
return
facade = self._status_stack[idx]
if facade.status_obj.estimated_secs_to_complete() > self._popup_time:
facade.active = True
if facade.active:
dlg = self._get_dlg()
if facade.pbar_idx is None:
facade.pbar_idx = dlg.add(facade.status_obj)
dlg.show()
dlg.step(facade.pbar_idx)
def _end(self, idx):
# hide any progress dialog
# remove the status object from the stack
log.debug("received end in ProgressMonitor")
if idx >= len(self._status_stack):
# this item has been cancelled
return
while idx < len(self._status_stack) - 1:
self._end(len(self._status_stack) - 1)
facade = self._status_stack[idx]
if facade.active:
dlg = self._get_dlg()
if len(self._status_stack) == 1:
dlg.hide()
dlg.remove(facade.pbar_idx)
facade.status_obj.disconnect(facade.heartbeat_cb_id)
facade.status_obj.disconnect(facade.end_cb_id)
del self._status_stack[idx]

View File

@ -77,11 +77,10 @@ from gui import widgets
import UndoHistory
from gui.dbloader import DbLoader
import GrampsDisplay
from gen.utils import ProgressMonitor
from gui.widgets.progressdialog import ProgressMonitor, GtkProgressDialog
from gen.db.backup import backup
from gen.db.exceptions import DbException
from GrampsAboutDialog import GrampsAboutDialog
import ProgressDialog
#-------------------------------------------------------------------------
#
@ -324,7 +323,7 @@ class ViewManager(CLIManager):
vbox.show()
self.progress_monitor = ProgressMonitor(
ProgressDialog.GtkProgressDialog, ("", self.window))
GtkProgressDialog, ("", self.window))
self.uistate = DisplayState.DisplayState(
self.window, self.statusbar, self.progress, self.warnbtn,

View File

@ -51,7 +51,7 @@ import gtk
#-------------------------------------------------------------------------
import config
from Utils import conv_unicode_tosrtkey_ongtk
from gen.utils.longop import LongOpStatus
from gui.widgets.progressdialog import LongOpStatus
from Lru import LRU
from bisect import bisect_right
from Filters import SearchFilter, ExactSearchFilter

View File

@ -13,6 +13,7 @@ pkgdata_PYTHON = \
linkbox.py \
menutoolbuttonaction.py \
monitoredwidgets.py \
progressdialog.py \
shortlistcomboentry.py \
springseparator.py \
statusbar.py \

View File

@ -0,0 +1,603 @@
#
# Gramps - a GTK+/GNOME based genealogy program
#
# Copyright (C) 2010 Brian G. Matherly
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# $Id$
"""
This module provides a progress dialog for displaying the status of
long running operations.
"""
#-------------------------------------------------------------------------
#
# Standard python modules
#
#-------------------------------------------------------------------------
import time
from gettext import gettext as _
import logging
log = logging.getLogger("gen.progressdialog")
#-------------------------------------------------------------------------
#
# GTK modules
#
#-------------------------------------------------------------------------
import gtk
#-------------------------------------------------------------------------
#
# Gramps modules
#
#-------------------------------------------------------------------------
from gen.utils.callback import Callback
#-------------------------------------------------------------------------
#
# LongOpStatus
#
#-------------------------------------------------------------------------
class LongOpStatus(Callback):
"""LongOpStatus provides a way of communicating the status of a long
running operations. The intended use is that when a long running operation
is about to start it should create an instance of this class and emit
it so that any listeners can pick it up and use it to record the status
of the operation.
Signals
=======
op-heartbeat - emitted every 'interval' calls to heartbeat.
op-end - emitted once when the operation completes.
Example usage:
class MyClass(Callback):
__signals__ = {
'op-start' : object
}
def long(self):
status = LongOpStatus("doing long job", 100, 10)
for i in xrange(0,99):
time.sleep(0.1)
status.heartbeat()
status.end()
class MyListener(object):
def __init__(self):
self._op = MyClass()
self._op.connect('op-start', self.start)
self._current_op = None
def start(self,long_op):
self._current_op.connect('op-heartbeat', self.heartbeat)
self._current_op.connect('op-end', self.stop)
def hearbeat(self):
# update status display
def stop(self):
# close the status display
self._current_op = None
"""
__signals__ = {
'op-heartbeat' : None,
'op-end' : None
}
def __init__(self, msg="",
total_steps=None,
interval=1,
can_cancel=False):
"""
@param msg: A Message to indicated the purpose of the operation.
@type msg: string
@param total_steps: The total number of steps that the operation
will perform.
@type total_steps:
@param interval: The number of iterations between emissions.
@type interval:
@param can_cancel: Set to True if the operation can be cancelled.
If this is set the operation that creates the status object should
check the 'should_cancel' method regularly so that it can cancel
the operation.
@type can_cancel:
"""
Callback.__init__(self)
self._msg = msg
self._total_steps = total_steps
# don't allow intervals less that 1
self._interval = max(interval, 1)
self._can_cancel = can_cancel
self._cancel = False
self._count = 0
self._countdown = interval
self._secs_left = 0
self._start = time.time()
self._running = True
def __del__(self):
if self._running:
self.emit('op-end')
def heartbeat(self):
"""This should be called for each step in the operation. It will
emit a 'op-heartbeat' every 'interval' steps. It recalcuates the
'estimated_secs_to_complete' from the time taken for previous
steps.
"""
self._countdown -= 1
if self._countdown <= 0:
elapsed = time.time() - self._start
self._secs_left = \
( elapsed / self._interval ) \
* (self._total_steps - self._count)
self._count += self._interval
self._countdown = self._interval
self._start = time.time()
self.emit('op-heartbeat')
def estimated_secs_to_complete(self):
"""Return the number of seconds estimated left before operation
completes. This will change as 'hearbeat' is called.
@return: estimated seconds to complete.
@rtype: int
"""
return self._secs_left
def was_cancelled(self):
"""
Has this process been cancelled?
"""
return self._cancel
def cancel(self):
"""Inform the operation that it should complete.
"""
self._cancel = True
self.end()
def end(self):
"""End the operation. Causes the 'op-end' signal to be emitted.
"""
self.emit('op-end')
self._running = False
def should_cancel(self):
"""Return true of the user has asked for the operation to be cancelled.
@return: True of the operation should be cancelled.
@rtype: bool
"""
return self._cancel
def can_cancel(self):
"""@return: True if the operation can be cancelled.
@rtype: bool
"""
return self._can_cancel
def get_msg(self):
"""@return: The current status description messages.
@rtype: string
"""
return self._msg
def set_msg(self, msg):
"""Set the current description message.
@param msg: The description message.
@type msg: string
"""
self._msg = msg
def get_total_steps(self):
"""Get to total number of steps. NOTE: this is not the
number of times that the 'op-heartbeat' message will be
emited. 'op-heartbeat' is emited get_total_steps/interval
times.
@return: total number of steps.
@rtype: int
"""
return self._total_steps
def get_interval(self):
"""Get the interval between 'op-hearbeat' signals.
@return: the interval between 'op-hearbeat' signals.
@rtype: int
"""
return self._interval
#-------------------------------------------------------------------------
#
# _StatusObjectFacade
#
#-------------------------------------------------------------------------
class _StatusObjectFacade(object):
"""This provides a simple structure for recording the information
needs about a status object."""
def __init__(self, status_obj, heartbeat_cb_id=None, end_cb_id=None):
"""
@param status_obj:
@type status_obj: L{LongOpStatus}
@param heartbeat_cb_id: (default: None)
@type heartbeat_cb_id: int
@param end_cb_id: (default: None)
@type end_cb_id: int
"""
self.status_obj = status_obj
self.heartbeat_cb_id = heartbeat_cb_id
self.end_cb_id = end_cb_id
self.pbar_idx = None
self.active = False
#-------------------------------------------------------------------------
#
# ProgressMonitor
#
#-------------------------------------------------------------------------
class ProgressMonitor(object):
"""A dialog for displaying the status of long running operations.
It will work with L{LongOpStatus} objects to track the
progress of long running operations. If the operations is going to
take longer than I{popup_time} it will pop up a dialog with a
progress bar so that the user gets some feedback about what is
happening.
"""
__default_popup_time = 5 # seconds
def __init__(self, dialog_class, dialog_class_params=(),
title=_("Progress Information"),
popup_time = None):
"""
@param dialog_class: A class used to display the progress dialog.
@type dialog_class: GtkProgressDialog or the same interface.
@param dialog_class_params: A tuple that will be used as the initial
arguments to the dialog_class, this might be used for passing in
a parent window handle.
@type dialog_class_params: tuple
@param title: The title of the progress dialog
@type title: string
@param popup_time: number of seconds to wait before popup.
@type popup_time: int
"""
self._dialog_class = dialog_class
self._dialog_class_params = dialog_class_params
self._title = title
self._popup_time = popup_time
if self._popup_time is None:
self._popup_time = self.__class__.__default_popup_time
self._status_stack = [] # list of current status objects
self._dlg = None
def _get_dlg(self):
if self._dlg is None:
self._dlg = self._dialog_class(self._dialog_class_params,
self._title)
#self._dlg.show()
return self._dlg
def add_op(self, op_status):
"""Add a new status object to the progress dialog.
@param op_status: the status object.
@type op_status: L{LongOpStatus}
"""
log.debug("adding op to Progress Monitor")
facade = _StatusObjectFacade(op_status)
self._status_stack.append(facade)
idx = len(self._status_stack)-1
# wrap up the op_status object idx into the callback calls
def heartbeat_cb():
self._heartbeat(idx)
def end_cb():
self._end(idx)
facade.heartbeat_cb_id = op_status.connect('op-heartbeat',
heartbeat_cb)
facade.end_cb_id = op_status.connect('op-end', end_cb)
def _heartbeat(self, idx):
# check the estimated time to complete to see if we need
# to pop up a progress dialog.
log.debug("heartbeat in ProgressMonitor")
if idx >= len(self._status_stack):
# this item has been cancelled
return
facade = self._status_stack[idx]
if facade.status_obj.estimated_secs_to_complete() > self._popup_time:
facade.active = True
if facade.active:
dlg = self._get_dlg()
if facade.pbar_idx is None:
facade.pbar_idx = dlg.add(facade.status_obj)
dlg.show()
dlg.step(facade.pbar_idx)
def _end(self, idx):
# hide any progress dialog
# remove the status object from the stack
log.debug("received end in ProgressMonitor")
if idx >= len(self._status_stack):
# this item has been cancelled
return
while idx < len(self._status_stack) - 1:
self._end(len(self._status_stack) - 1)
facade = self._status_stack[idx]
if facade.active:
dlg = self._get_dlg()
if len(self._status_stack) == 1:
dlg.hide()
dlg.remove(facade.pbar_idx)
facade.status_obj.disconnect(facade.heartbeat_cb_id)
facade.status_obj.disconnect(facade.end_cb_id)
del self._status_stack[idx]
#-------------------------------------------------------------------------
#
# _GtkProgressBar
#
#-------------------------------------------------------------------------
class _GtkProgressBar(gtk.VBox):
"""This widget displays the progress bar and labels for a progress
indicator. It provides an interface to updating the progress bar.
"""
def __init__(self, long_op_status):
""":param long_op_status: the status of the operation.
:type long_op_status: L{gen.utils.LongOpStatus}
"""
gtk.VBox.__init__(self)
msg = long_op_status.get_msg()
self._old_val = -1
self._lbl = gtk.Label(msg)
self._lbl.set_use_markup(True)
#self.set_border_width(24)
self._pbar = gtk.ProgressBar()
self._hbox = gtk.HBox()
# Only display the cancel button is the operation
# can be canceled.
if long_op_status.can_cancel():
self._cancel = gtk.Button(stock=gtk.STOCK_CANCEL)
self._cancel.connect("clicked",
lambda x: long_op_status.cancel())
self._cancel.show()
self._hbox.pack_end(self._cancel)
self._hbox.pack_start(self._pbar)
self.pack_start(self._lbl, expand=False, fill=False)
self.pack_start(self._hbox, expand=False, fill=False)
self._pbar_max = (long_op_status.get_total_steps()/
long_op_status.get_interval())
self._pbar_index = 0.0
self._pbar.set_fraction(((100/float(long_op_status.get_total_steps())*
float(long_op_status.get_interval())))/
100.0)
if msg != '':
self._lbl.show()
self._pbar.show()
self._hbox.show()
def step(self):
"""Move the progress bar on a step.
"""
self._pbar_index = self._pbar_index + 1.0
if self._pbar_index > self._pbar_max:
self._pbar_index = self._pbar_max
try:
val = int(100*self._pbar_index/self._pbar_max)
except ZeroDivisionError:
val = 0
if val != self._old_val:
self._pbar.set_text("%d%%" % val)
self._pbar.set_fraction(val/100.0)
self._pbar.old_val = val
#-------------------------------------------------------------------------
#
# GtkProgressDialog
#
#-------------------------------------------------------------------------
class GtkProgressDialog(gtk.Dialog):
"""A gtk window to display the status of a long running
process."""
def __init__(self, window_params, title):
""":param title: The title to display on the top of the window.
:type title: string
"""
gtk.Dialog.__init__(self, *window_params)
self.connect('delete_event', self._warn)
self.set_has_separator(False)
self.set_title(title)
#self.set_resize_mode(gtk.RESIZE_IMMEDIATE)
#self.show()
self._progress_bars = []
def add(self, long_op_status):
"""Add a new status object to the progress dialog.
:param long_op_status: the status object.
:type long_op_status: L{gen.utils.LongOpStatus}
:returns: a key that can be used as the L{pbar_idx}
to the other methods.
:rtype: int
"""
pbar = _GtkProgressBar(long_op_status)
self.vbox.pack_start(pbar, expand=False, fill=False)
pbar.show()
# this seems to cause an infinite loop:
#self.resize_children()
self._progress_bars.append(pbar)
# This is a bad idea; could cause deletes while adding:
#self._process_events()
return len(self._progress_bars)-1
def remove(self, pbar_idx):
"""Remove the specified status object from the progress dialog.
:param pbar_idx: the index as returned from L{add}
:type pbar_idx: int
"""
if pbar_idx is not None:
pbar = self._progress_bars[pbar_idx]
self.vbox.remove(pbar)
del self._progress_bars[pbar_idx]
def step(self, pbar_idx):
"""Click the progress bar over to the next value. Be paranoid
and insure that it doesn't go over 100%.
:param pbar_idx: the index as returned from L{add}
:type pbar_idx: int
"""
if pbar_idx < len(self._progress_bars):
self._progress_bars[pbar_idx].step()
self._process_events()
def _process_events(self):
while gtk.events_pending():
gtk.main_iteration()
def show(self):
"""Show the dialog and process any events.
"""
gtk.Dialog.show(self)
self._process_events()
def hide(self):
"""Hide the dialog and process any events.
"""
gtk.Dialog.hide(self)
self._process_events()
def _warn(self, x, y):
return True
def close(self):
self.destroy()
if __name__ == '__main__':
def test(a, b):
d = ProgressMonitor(GtkProgressDialog)
s = LongOpStatus("Doing very long operation", 100, 10, can_cancel=True)
d.add_op(s)
for i in xrange(0, 99):
if s.should_cancel():
break
time.sleep(0.1)
if i == 30:
t = LongOpStatus("doing a shorter one", 100, 10,
can_cancel=True)
d.add_op(t)
for j in xrange(0, 99):
if s.should_cancel():
t.cancel()
break
if t.should_cancel():
break
time.sleep(0.1)
t.heartbeat()
if not t.was_cancelled():
t.end()
if i == 60:
t = LongOpStatus("doing another shorter one", 100, 10)
d.add_op(t)
for j in xrange(0, 99):
if s.should_cancel():
t.cancel()
break
time.sleep(0.1)
t.heartbeat()
t.end()
s.heartbeat()
if not s.was_cancelled():
s.end()
w = gtk.Window(gtk.WINDOW_TOPLEVEL)
w.connect('destroy', gtk.main_quit)
button = gtk.Button("Test")
button.connect("clicked", test, None)
w.add(button)
button.show()
w.show()
gtk.main()
print 'done'