Some reactions to user actions need more than a few seconds to compute. Jakob Nielsen for tasks over 10 seconds waiting time “users will want to perform other tasks while waiting for the computer to finish”. He as others also states that an activity indication and an estimation of the total time to completion are important.
Google App Engine enforces the paradigm, that webpages have to respond within a few seconds with it’s request deadline of 10 seconds (later lifted to 30 and then to 60 seconds).
For an Web-Application an low latancy design for long computation means something like this:
This involves a lot of moving parts including RPC between bachend task and frontend. Besides the activity indicator the requirement to handle users performing other tasks while waiting. That basically means, that users must be able to bookmark the computation and navigate away and later come back to retrive the result.
For storing state and simple minded RPC the Datastore is the best option. We can use an Model like this:
class LongTask(db.Model):
parameters_blob = db.BlobProperty()
result_blob = db.BlobProperty()
status = db.StringProperty(required=True, default='ready',
choices=['ready', # to be started
'started', # task has been initiated
'error', # an error occured during execution
'done',
'finished']
)
Starting a Task could look like this:
# convert HTTP patameters to a dict
# for all parameters not starting with `_`
parameters = dict([(name, self.request.get(name))
for name in self.request.arguments()
if not name.startswith('_')])
# Create task Entity and start Taskqueue
task = gaetk_LongTask(parameters_blob=pickle.dumps(paramters),
status='ready')
task.put()
taskqueue.add(url=TASKHANDLER_URL, method='GET',
params={'_longtaskjob': 'execute',
'_longtaskid': task.key()})
# redirect to status queue
parameters = urlencode([('_longtaskjob', 'query'),
('_longtaskid', task.key())
])
raise HTTP307_TemporaryRedirect(location=STATUS_PAGE_URL
+ '?' + parameters)
The Task handler istself has to do some status updates and then do the real work. In the usual setup it has 10 Minutes todo it’s work:
task = gaetk_LongTask.get(self.request.get('_longtaskid', ''))
task.status = 'started'
task.put()
# Decode Parameters from Datastore and execute the actual task.
parameters = pickle.loads(task.parameters_blob)
try:
# to the actual work:
time.sleep(61) # obviously this is only an example
result = output ot your calculation
task.result_blob = pickle.dumps(result)
task.status = 'done'
task.put()
except Exception, msg:
# If an exception occured, note that and re-raise an error.
task.status = 'error'
task.put()
raise
Than we need the handler to display the status / wait page:
task = gaetk_LongTask.get(self.request.get('_longtaskid', ''))
if task.status == 'done':
result = pickle.loads(task.result_blob)
self.response.out.write(result)
elif task.status == 'started':
# Generate HTML output
html = u"""<html><head><meta http-equiv="refresh" content="3">
<title>Task Status</title></head>
<body><p>Still working.</p></body></html>"""
self.response.write(html)
elif task.status == 'error':
html = u"""<html><head><meta http-equiv="refresh" content="3">
<title>Task Status</title></head>
<body><p>Error, automated retry in progress.</p></body></html>"""
self.response.write(html)
This implements the machinery to run tasks anyncronous to displaying pages and it also implements bookmarkable, stable URLs for tasks. What we are still missing is a progress indicator. This requires somewhat more sophisticated messaging between the backend task, the fontend server and the client.
For intra-Server communication we can use memcache which works “reliable enough” for our purposes. The long-running computation can log a text message the current step and the (expected) total number of steps.
def log_progress(self, message, step=0, total_steps=0):
memcache.set("longtask_status_%s" % self.task.key(),
dict(message=message, step=step,
total_steps=total_steps))
The handler for the status / wait page can now be extended to display progress.
statusinfo = memcache.get("longtask_status_%s" % task.key())
info = ''
if statusinfo and statusinfo.get('total_steps'):
info = u"""<p>Progress:
<progress value="%d" max="%d">%d %%</progress></p>
""" % (statusinfo.get('step'),
statusinfo.get('total_steps'),
int(statusinfo.get('step') * 100.0 / statusinfo.get('total_steps')))
else:
# indetermine progress bar
info = u"<p><progress></progress></p>"
if statusinfo and statusinfo.get('message'):
info += '<p>%s</p>' % statusinfo.get('message')
html = u"""<html><head><meta http-equiv="refresh" content="3">
<title>Task Status</title></head>
<body>%s</body></html>""" % info
self.response.write(html)
This can end up in an awful lot of code scattered arround. Fotunately it leands itself to a nice OOP implementation. gae_longtask
all bundles it up.
In the simpelest form you can use it by just overwriting a single function. See the gaetk_longtask
. Here a somewhat more involved real-world example:
class IntrastatRohdaten(longtask.LongRunningTaskHandler):
def execute_task(self, parameters):
monthlast = date.today() - timedelta(days=datetime.date.today().day + 1)
monthfirst = monthlast - timedelta(days=vormonatsletzter.day - 1)
self.log_progress("Calculating INTRASTAT Data %s to %s"
% (monthlast, monthfirst),
step=0, total_steps=33)
count = 0
days = []
day = monthlast
while day <= monthfirst:
days.append(day)
day += datetime.timedelta(days=1)
for day in days:
count += 1
self.log_progress("Handling %s: so far %d Invoices" % (day, count),
step=day.day, total_steps=len(days) + 2)
for rechnung in Rechnung.all().filter('datum =', tag).run():
intrastat_fuer_rechung(rechnung)
self.log_progress("Summing up", step=32, total_steps=33)
vormonat = intrastat_monat_aufsummieren(monthfirst)
self.log_progress("Formating", step=33, total_steps=33)
csvdata = reformat(vormonat)
return csvdata
def display_result(self, paramters, result):
self.response.headers['Content-Type'] = 'text/plain'
self.response.write(result)
Get the gae_longtask
here at github.