| Home | Trees | Indices | Help |
|
|---|
|
|
1 # -*- coding: utf-8 -*-
2
3 """ Asynchronous Task Execution
4 - falls back to Synchronous if no workers are alive
5
6 To run a worker node: python web2py.py -K eden
7 or use UWSGI's 'Mule'
8 or use nssm on Win32: http://web2py.com/books/default/chapter/29/13/deployment-recipes#Using-nssm-to-run-as-a-Windows-service
9
10 NB
11 Need WEB2PY_PATH environment variable to be defined (e.g. /etc/profile)
12 Tasks need to be defined outside conditional model loads (e.g. models/tasks.py)
13 Avoid passing state into the async call as state may change before the message is executed (race condition)
14
15 Old screencast: http://www.vimeo.com/27478796
16
17 @requires: U{B{I{gluon}} <http://web2py.com>}
18
19 @copyright: 2011-2019 (c) Sahana Software Foundation
20 @license: MIT
21
22 Permission is hereby granted, free of charge, to any person
23 obtaining a copy of this software and associated documentation
24 files (the "Software"), to deal in the Software without
25 restriction, including without limitation the rights to use,
26 copy, modify, merge, publish, distribute, sublicense, and/or sell
27 copies of the Software, and to permit persons to whom the
28 Software is furnished to do so, subject to the following
29 conditions:
30
31 The above copyright notice and this permission notice shall be
32 included in all copies or substantial portions of the Software.
33
34 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
35 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
36 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
37 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
38 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
39 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
40 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
41 OTHER DEALINGS IN THE SOFTWARE.
42 """
43
44 __all__ = ("S3Task",)
45
46 import datetime
47 import json
48
49 from gluon import current, HTTP, IS_EMPTY_OR
50 from gluon.storage import Storage
51
52 from s3datetime import S3DateTime
53 from s3validators import IS_TIME_INTERVAL_WIDGET, IS_UTC_DATETIME
54 from s3widgets import S3CalendarWidget, S3TimeIntervalWidget
55
56 # -----------------------------------------------------------------------------
57 -class S3Task(object):
58 """ Asynchronous Task Execution """
59
60 TASK_TABLENAME = "scheduler_task"
61
62 # -------------------------------------------------------------------------
64
65 migrate = current.deployment_settings.get_base_migrate()
66 tasks = current.response.s3.tasks
67
68 # Instantiate Scheduler
69 try:
70 from gluon.scheduler import Scheduler
71 except:
72 # Warning should already have been given by eden_update_check.py
73 self.scheduler = None
74 else:
75 self.scheduler = Scheduler(current.db,
76 tasks,
77 migrate=migrate)
78
79 # -------------------------------------------------------------------------
80 - def configure_tasktable_crud(self,
81 task=None,
82 function=None,
83 args=None,
84 vars=None,
85 period = 3600, # seconds, so 1 hour
86 ):
87 """
88 Configure the task table for interactive CRUD,
89 setting defaults, widgets and hiding unnecessary fields
90
91 @param task: the task name (will use a UUID if omitted)
92 @param function: the function name (won't hide if omitted)
93 @param args: the function position arguments
94 @param vars: the function named arguments
95 """
96
97 if args is None:
98 args = []
99 if vars is None:
100 vars = {}
101
102 T = current.T
103 NONE = current.messages["NONE"]
104 UNLIMITED = T("unlimited")
105
106 tablename = self.TASK_TABLENAME
107 table = current.db[tablename]
108
109 table.uuid.readable = table.uuid.writable = False
110
111 table.prevent_drift.readable = table.prevent_drift.writable = False
112
113 table.sync_output.readable = table.sync_output.writable = False
114
115 table.times_failed.readable = False
116
117 # Configure start/stop time fields
118 for fn in ("start_time", "stop_time"):
119 field = table[fn]
120 field.represent = lambda dt: \
121 S3DateTime.datetime_represent(dt, utc=True)
122 field.requires = IS_UTC_DATETIME()
123 set_min = set_max = None
124 if fn == "start_time":
125 set_min = "#scheduler_task_stop_time"
126 elif fn == "stop_time":
127 set_max = "#scheduler_task_start_time"
128 field.widget = S3CalendarWidget(past = 0,
129 set_min = set_min,
130 set_max = set_max,
131 timepicker = True,
132 )
133
134 if not task:
135 import uuid
136 task = str(uuid.uuid4())
137 field = table.task_name
138 field.default = task
139 field.readable = False
140 field.writable = False
141
142 if function:
143 field = table.function_name
144 field.default = function
145 field.readable = False
146 field.writable = False
147
148 field = table.args
149 field.default = json.dumps(args)
150 field.readable = False
151 field.writable = False
152
153 field = table.repeats
154 field.label = T("Repeat")
155 field.comment = T("times (0 = unlimited)")
156 field.default = 0
157 field.represent = lambda opt: \
158 opt and "%s %s" % (opt, T("times")) or \
159 opt == 0 and UNLIMITED or \
160 NONE
161
162 field = table.period
163 field.label = T("Run every")
164 field.default = period
165 field.widget = S3TimeIntervalWidget.widget
166 field.requires = IS_TIME_INTERVAL_WIDGET(table.period)
167 field.represent = S3TimeIntervalWidget.represent
168 field.comment = T("seconds")
169
170 table.timeout.default = 600
171 table.timeout.represent = lambda opt: \
172 opt and "%s %s" % (opt, T("seconds")) or \
173 opt == 0 and UNLIMITED or \
174 NONE
175
176 field = table.vars
177 field.default = json.dumps(vars)
178 field.readable = field.writable = False
179
180 # Always use "default" controller (web2py uses current controller),
181 # otherwise the anonymous worker does not pass the controller
182 # permission check and gets redirected to login before it reaches
183 # the task function which does the s3_impersonate
184 field = table.application_name
185 field.default = "%s/default" % current.request.application
186 field.readable = field.writable = False
187 table.group_name.readable = table.group_name.writable = False
188 table.status.readable = table.status.writable = False
189 table.next_run_time.readable = table.next_run_time.writable = False
190 table.times_run.readable = table.times_run.writable = False
191 table.assigned_worker_name.readable = \
192 table.assigned_worker_name.writable = False
193
194 current.s3db.configure(tablename,
195 list_fields = ["id",
196 "enabled",
197 "start_time",
198 "repeats",
199 "period",
200 (T("Last run"), "last_run_time"),
201 (T("Last status"), "status"),
202 (T("Next run"), "next_run_time"),
203 "stop_time"
204 ],
205 )
206
207 response = current.response
208 if response:
209 response.s3.crud_strings[tablename] = Storage(
210 label_create = T("Create Job"),
211 title_display = T("Scheduled Jobs"),
212 title_list = T("Job Schedule"),
213 title_update = T("Edit Job"),
214 label_list_button = T("List Jobs"),
215 msg_record_created = T("Job added"),
216 msg_record_modified = T("Job updated"),
217 msg_record_deleted = T("Job deleted"),
218 msg_list_empty = T("No jobs configured yet"),
219 msg_no_match = T("No jobs configured"))
220
221 return
222
223 # -------------------------------------------------------------------------
224 # API Function run within the main flow of the application
225 # -------------------------------------------------------------------------
227 """
228 Wrapper to call an asynchronous task.
229 - run from the main request
230
231 @param task: The function which should be run
232 - async if a worker is alive
233 @param args: The list of unnamed args to send to the function
234 @param vars: The list of named vars to send to the function
235 @param timeout: The length of time available for the task to complete
236 - default 300s (5 mins)
237 """
238
239 if args is None:
240 args = []
241 if vars is None:
242 vars = {}
243
244 # Check that task is defined
245 tasks = current.response.s3.tasks
246 if not tasks:
247 return False
248 if task not in tasks:
249 return False
250
251 # Check that worker is alive
252 if not self._is_alive():
253 # Run the task synchronously
254 _args = []
255 for arg in args:
256 if isinstance(arg, (int, long, float)):
257 _args.append(str(arg))
258 elif isinstance(arg, basestring):
259 _args.append("%s" % str(json.dumps(arg)))
260 else:
261 error = "Unhandled arg type: %s" % arg
262 current.log.error(error)
263 raise HTTP(501, error)
264 args = ",".join(_args)
265 _vars = ",".join(["%s=%s" % (str(var),
266 str(vars[var])) for var in vars])
267 if args:
268 statement = "tasks['%s'](%s,%s)" % (task, args, _vars)
269 else:
270 statement = "tasks['%s'](%s)" % (task, _vars)
271 # Handle JSON
272 null = None
273 exec(statement)
274 return None
275
276 auth = current.auth
277 if auth.is_logged_in():
278 # Add the current user to the vars
279 vars["user_id"] = auth.user.id
280
281 # Run the task asynchronously
282 # @ToDo: Switch to API: self.scheduler.queue_task()
283 task_id = current.db.scheduler_task.insert(application_name = "%s/default" % \
284 current.request.application,
285 task_name = task,
286 function_name = task,
287 args = json.dumps(args),
288 vars = json.dumps(vars),
289 timeout = timeout,
290 )
291
292 # Return task_id so that status can be polled
293 return task_id
294
295 # -------------------------------------------------------------------------
296 - def schedule_task(self,
297 task,
298 args=None, # args to pass to the task
299 vars=None, # vars to pass to the task
300 function_name=None,
301 start_time=None,
302 next_run_time=None,
303 stop_time=None,
304 repeats=None,
305 period=None,
306 timeout=None,
307 enabled=None, # None = Enabled
308 group_name=None,
309 ignore_duplicate=False,
310 sync_output=0):
311 """
312 Schedule a task in web2py Scheduler
313
314 @param task: name of the function/task to be scheduled
315 @param args: args to be passed to the scheduled task
316 @param vars: vars to be passed to the scheduled task
317 @param function_name: function name (if different from task name)
318 @param start_time: start_time for the scheduled task
319 @param next_run_time: next_run_time for the the scheduled task
320 @param stop_time: stop_time for the the scheduled task
321 @param repeats: number of times the task to be repeated (0=unlimited)
322 @param period: time period between two consecutive runs (seconds)
323 @param timeout: set timeout for a running task
324 @param enabled: enabled flag for the scheduled task
325 @param group_name: group_name for the scheduled task
326 @param ignore_duplicate: disable or enable duplicate checking
327 @param sync_output: sync output every n seconds (0 = disable sync)
328 """
329
330 if args is None:
331 args = []
332 if vars is None:
333 vars = {}
334
335 if not ignore_duplicate and self._duplicate_task_exists(task, args, vars):
336 # if duplicate task exists, do not insert a new one
337 current.log.warning("Duplicate Task, Not Inserted", value=task)
338 return False
339
340 kwargs = {}
341
342 if function_name is None:
343 function_name = task
344
345 # storing valid keyword arguments only if they are provided
346 if start_time:
347 kwargs["start_time"] = start_time
348
349 if next_run_time:
350 kwargs["next_run_time"] = next_run_time
351 elif start_time:
352 # default it to start_time
353 kwargs["next_run_time"] = start_time
354
355 if stop_time:
356 kwargs["stop_time"] = stop_time
357 elif start_time:
358 # default it to one day ahead of given start_time
359 if not isinstance(start_time, datetime.datetime):
360 start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")
361 stop_time = start_time + datetime.timedelta(days=1)
362
363 if repeats is not None:
364 kwargs["repeats"] = repeats
365
366 if period:
367 kwargs["period"] = period
368
369 if timeout:
370 kwargs["timeout"] = timeout
371
372 if enabled != None:
373 # NB None => enabled
374 kwargs["enabled"] = enabled
375
376 if group_name:
377 kwargs["group_name"] = group_name
378
379 if sync_output != 0:
380 kwargs["sync_output"] = sync_output
381
382 auth = current.auth
383 if auth.is_logged_in():
384 # Add the current user to the vars
385 vars["user_id"] = auth.user.id
386
387 # Add to DB for pickup by Scheduler task
388 # @ToDo: Switch to API: self.scheduler.queue_task()
389 task_id = current.db.scheduler_task.insert(application_name = "%s/default" % \
390 current.request.application,
391 task_name = task,
392 function_name = function_name,
393 args = json.dumps(args),
394 vars = json.dumps(vars),
395 **kwargs)
396 return task_id
397
398 # -------------------------------------------------------------------------
400 """
401 Checks if given task already exists in the Scheduler and both coincide
402 with their execution time
403
404 @param task: name of the task function
405 @param args: the job position arguments (list)
406 @param vars: the job named arguments (dict)
407 """
408
409 db = current.db
410 ttable = db.scheduler_task
411
412 _args = json.dumps(args)
413
414 query = ((ttable.function_name == task) & \
415 (ttable.args == _args) & \
416 (ttable.status.belongs(["RUNNING", "QUEUED", "ALLOCATED"])))
417 jobs = db(query).select(ttable.vars)
418 for job in jobs:
419 job_vars = json.loads(job.vars)
420 if job_vars == vars:
421 return True
422 return False
423
424 # -------------------------------------------------------------------------
426 """
427 Returns True if there is at least 1 active worker to run scheduled tasks
428 - run from the main request
429
430 NB Can't run this 1/request at the beginning since the tables
431 only get defined in zz_last
432 """
433
434 #if self.scheduler:
435 # return self.scheduler.is_alive()
436 #else:
437 # return False
438
439 db = current.db
440 cache = current.response.s3.cache
441 now = datetime.datetime.now()
442
443 offset = datetime.timedelta(minutes=1)
444 table = db.scheduler_worker
445 query = (table.last_heartbeat > (now - offset))
446 worker_alive = db(query).select(table.id,
447 limitby=(0, 1),
448 cache=cache).first()
449 if worker_alive:
450 return True
451 else:
452 return False
453
454 # -------------------------------------------------------------------------
455 @staticmethod
457 """
458 Reset the status of a task to QUEUED after FAILED
459
460 @param task_id: the task record ID
461 """
462
463 db = current.db
464 ttable = db.scheduler_task
465
466 query = (ttable.id == task_id) & (ttable.status == "FAILED")
467 task = db(query).select(ttable.id,
468 limitby=(0, 1)).first()
469 if task:
470 task.update_record(status="QUEUED")
471
472 # =========================================================================
473 # Functions run within the Task itself
474 # =========================================================================
476 """
477 Activate the authentication passed from the caller to this new request
478 - run from within the task
479
480 NB This is so simple that we don't normally run via this API
481 - this is just kept as an example of what needs to happen within the task
482 """
483
484 current.auth.s3_impersonate(user_id)
485
486 # END =========================================================================
487
| Home | Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1 on Fri Mar 15 08:51:55 2019 | http://epydoc.sourceforge.net |