1
2
3 """ Asynchronous Task Execution
4 - falls back to Synchronous if no workers are alive
5
6 To run a worker node: python web2py.py -K eden
7 or use UWSGI's 'Mule'
8 or use nssm on Win32: http://web2py.com/books/default/chapter/29/13/deployment-recipes#Using-nssm-to-run-as-a-Windows-service
9
10 NB
11 Need WEB2PY_PATH environment variable to be defined (e.g. /etc/profile)
12 Tasks need to be defined outside conditional model loads (e.g. models/tasks.py)
13 Avoid passing state into the async call as state may change before the message is executed (race condition)
14
15 Old screencast: http://www.vimeo.com/27478796
16
17 @requires: U{B{I{gluon}} <http://web2py.com>}
18
19 @copyright: 2011-2019 (c) Sahana Software Foundation
20 @license: MIT
21
22 Permission is hereby granted, free of charge, to any person
23 obtaining a copy of this software and associated documentation
24 files (the "Software"), to deal in the Software without
25 restriction, including without limitation the rights to use,
26 copy, modify, merge, publish, distribute, sublicense, and/or sell
27 copies of the Software, and to permit persons to whom the
28 Software is furnished to do so, subject to the following
29 conditions:
30
31 The above copyright notice and this permission notice shall be
32 included in all copies or substantial portions of the Software.
33
34 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
35 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
36 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
37 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
38 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
39 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
40 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
41 OTHER DEALINGS IN THE SOFTWARE.
42 """
43
44 __all__ = ("S3Task",)
45
46 import datetime
47 import json
48
49 from gluon import current, HTTP, IS_EMPTY_OR
50 from gluon.storage import Storage
51
52 from s3datetime import S3DateTime
53 from s3validators import IS_TIME_INTERVAL_WIDGET, IS_UTC_DATETIME
54 from s3widgets import S3CalendarWidget, S3TimeIntervalWidget
58 """ Asynchronous Task Execution """
59
60 TASK_TABLENAME = "scheduler_task"
61
62
64
65 migrate = current.deployment_settings.get_base_migrate()
66 tasks = current.response.s3.tasks
67
68
69 try:
70 from gluon.scheduler import Scheduler
71 except:
72
73 self.scheduler = None
74 else:
75 self.scheduler = Scheduler(current.db,
76 tasks,
77 migrate=migrate)
78
79
222
223
224
225
226 - def async(self, task, args=None, vars=None, timeout=300):
227 """
228 Wrapper to call an asynchronous task.
229 - run from the main request
230
231 @param task: The function which should be run
232 - async if a worker is alive
233 @param args: The list of unnamed args to send to the function
234 @param vars: The list of named vars to send to the function
235 @param timeout: The length of time available for the task to complete
236 - default 300s (5 mins)
237 """
238
239 if args is None:
240 args = []
241 if vars is None:
242 vars = {}
243
244
245 tasks = current.response.s3.tasks
246 if not tasks:
247 return False
248 if task not in tasks:
249 return False
250
251
252 if not self._is_alive():
253
254 _args = []
255 for arg in args:
256 if isinstance(arg, (int, long, float)):
257 _args.append(str(arg))
258 elif isinstance(arg, basestring):
259 _args.append("%s" % str(json.dumps(arg)))
260 else:
261 error = "Unhandled arg type: %s" % arg
262 current.log.error(error)
263 raise HTTP(501, error)
264 args = ",".join(_args)
265 _vars = ",".join(["%s=%s" % (str(var),
266 str(vars[var])) for var in vars])
267 if args:
268 statement = "tasks['%s'](%s,%s)" % (task, args, _vars)
269 else:
270 statement = "tasks['%s'](%s)" % (task, _vars)
271
272 null = None
273 exec(statement)
274 return None
275
276 auth = current.auth
277 if auth.is_logged_in():
278
279 vars["user_id"] = auth.user.id
280
281
282
283 task_id = current.db.scheduler_task.insert(application_name = "%s/default" % \
284 current.request.application,
285 task_name = task,
286 function_name = task,
287 args = json.dumps(args),
288 vars = json.dumps(vars),
289 timeout = timeout,
290 )
291
292
293 return task_id
294
295
296 - def schedule_task(self,
297 task,
298 args=None,
299 vars=None,
300 function_name=None,
301 start_time=None,
302 next_run_time=None,
303 stop_time=None,
304 repeats=None,
305 period=None,
306 timeout=None,
307 enabled=None,
308 group_name=None,
309 ignore_duplicate=False,
310 sync_output=0):
311 """
312 Schedule a task in web2py Scheduler
313
314 @param task: name of the function/task to be scheduled
315 @param args: args to be passed to the scheduled task
316 @param vars: vars to be passed to the scheduled task
317 @param function_name: function name (if different from task name)
318 @param start_time: start_time for the scheduled task
319 @param next_run_time: next_run_time for the the scheduled task
320 @param stop_time: stop_time for the the scheduled task
321 @param repeats: number of times the task to be repeated (0=unlimited)
322 @param period: time period between two consecutive runs (seconds)
323 @param timeout: set timeout for a running task
324 @param enabled: enabled flag for the scheduled task
325 @param group_name: group_name for the scheduled task
326 @param ignore_duplicate: disable or enable duplicate checking
327 @param sync_output: sync output every n seconds (0 = disable sync)
328 """
329
330 if args is None:
331 args = []
332 if vars is None:
333 vars = {}
334
335 if not ignore_duplicate and self._duplicate_task_exists(task, args, vars):
336
337 current.log.warning("Duplicate Task, Not Inserted", value=task)
338 return False
339
340 kwargs = {}
341
342 if function_name is None:
343 function_name = task
344
345
346 if start_time:
347 kwargs["start_time"] = start_time
348
349 if next_run_time:
350 kwargs["next_run_time"] = next_run_time
351 elif start_time:
352
353 kwargs["next_run_time"] = start_time
354
355 if stop_time:
356 kwargs["stop_time"] = stop_time
357 elif start_time:
358
359 if not isinstance(start_time, datetime.datetime):
360 start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
361 stop_time = start_time + datetime.timedelta(days=1)
362
363 if repeats is not None:
364 kwargs["repeats"] = repeats
365
366 if period:
367 kwargs["period"] = period
368
369 if timeout:
370 kwargs["timeout"] = timeout
371
372 if enabled != None:
373
374 kwargs["enabled"] = enabled
375
376 if group_name:
377 kwargs["group_name"] = group_name
378
379 if sync_output != 0:
380 kwargs["sync_output"] = sync_output
381
382 auth = current.auth
383 if auth.is_logged_in():
384
385 vars["user_id"] = auth.user.id
386
387
388
389 task_id = current.db.scheduler_task.insert(application_name = "%s/default" % \
390 current.request.application,
391 task_name = task,
392 function_name = function_name,
393 args = json.dumps(args),
394 vars = json.dumps(vars),
395 **kwargs)
396 return task_id
397
398
400 """
401 Checks if given task already exists in the Scheduler and both coincide
402 with their execution time
403
404 @param task: name of the task function
405 @param args: the job position arguments (list)
406 @param vars: the job named arguments (dict)
407 """
408
409 db = current.db
410 ttable = db.scheduler_task
411
412 _args = json.dumps(args)
413
414 query = ((ttable.function_name == task) & \
415 (ttable.args == _args) & \
416 (ttable.status.belongs(["RUNNING", "QUEUED", "ALLOCATED"])))
417 jobs = db(query).select(ttable.vars)
418 for job in jobs:
419 job_vars = json.loads(job.vars)
420 if job_vars == vars:
421 return True
422 return False
423
424
426 """
427 Returns True if there is at least 1 active worker to run scheduled tasks
428 - run from the main request
429
430 NB Can't run this 1/request at the beginning since the tables
431 only get defined in zz_last
432 """
433
434
435
436
437
438
439 db = current.db
440 cache = current.response.s3.cache
441 now = datetime.datetime.now()
442
443 offset = datetime.timedelta(minutes=1)
444 table = db.scheduler_worker
445 query = (table.last_heartbeat > (now - offset))
446 worker_alive = db(query).select(table.id,
447 limitby=(0, 1),
448 cache=cache).first()
449 if worker_alive:
450 return True
451 else:
452 return False
453
454
455 @staticmethod
457 """
458 Reset the status of a task to QUEUED after FAILED
459
460 @param task_id: the task record ID
461 """
462
463 db = current.db
464 ttable = db.scheduler_task
465
466 query = (ttable.id == task_id) & (ttable.status == "FAILED")
467 task = db(query).select(ttable.id,
468 limitby=(0, 1)).first()
469 if task:
470 task.update_record(status="QUEUED")
471
472
473
474
476 """
477 Activate the authentication passed from the caller to this new request
478 - run from within the task
479
480 NB This is so simple that we don't normally run via this API
481 - this is just kept as an example of what needs to happen within the task
482 """
483
484 current.auth.s3_impersonate(user_id)
485
486
487