Package s3 :: Module s3task

Source Code for Module s3.s3task

# -*- coding: utf-8 -*-

""" Asynchronous Task Execution
    - falls back to Synchronous if no workers are alive

    To run a worker node: python web2py.py -K eden
    or use UWSGI's 'Mule'
    or use nssm on Win32: http://web2py.com/books/default/chapter/29/13/deployment-recipes#Using-nssm-to-run-as-a-Windows-service

    NB
        Need WEB2PY_PATH environment variable to be defined (e.g. /etc/profile)
        Tasks need to be defined outside conditional model loads (e.g. models/tasks.py)
        Avoid passing state into the async call as state may change before the message is executed (race condition)

    Old screencast: http://www.vimeo.com/27478796

    @requires: U{B{I{gluon}} <http://web2py.com>}

    @copyright: 2011-2019 (c) Sahana Software Foundation
    @license: MIT

    Permission is hereby granted, free of charge, to any person
    obtaining a copy of this software and associated documentation
    files (the "Software"), to deal in the Software without
    restriction, including without limitation the rights to use,
    copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following
    conditions:

    The above copyright notice and this permission notice shall be
    included in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
    OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
    NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
    HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
    WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
    OTHER DEALINGS IN THE SOFTWARE.
"""

__all__ = ("S3Task",)

import datetime
import json

from gluon import current, HTTP, IS_EMPTY_OR
from gluon.storage import Storage

from s3datetime import S3DateTime
from s3validators import IS_TIME_INTERVAL_WIDGET, IS_UTC_DATETIME
from s3widgets import S3CalendarWidget, S3TimeIntervalWidget
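
# Note on the task registry (explanatory sketch, not part of the original
# module): S3Task looks tasks up by name in current.response.s3.tasks, which
# is expected to be a dict mapping task names to callables, populated by the
# application's model setup (e.g. models/tasks.py). The name "my_task" and its
# signature below are purely illustrative assumptions:
#
#   def my_task(record_id, user_id=None):
#       # Runs on a worker if one is alive, otherwise inline in the request
#       return "OK"
#
#   current.response.s3.tasks["my_task"] = my_task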

# -----------------------------------------------------------------------------
class S3Task(object):
58 """ Asynchronous Task Execution """ 59 60 TASK_TABLENAME = "scheduler_task" 61 62 # -------------------------------------------------------------------------
    def __init__(self):

        migrate = current.deployment_settings.get_base_migrate()
        tasks = current.response.s3.tasks

        # Instantiate Scheduler
        try:
            from gluon.scheduler import Scheduler
        except:
            # Warning should already have been given by eden_update_check.py
            self.scheduler = None
        else:
            self.scheduler = Scheduler(current.db,
                                       tasks,
                                       migrate=migrate)

    # -------------------------------------------------------------------------
    def configure_tasktable_crud(self,
                                 task=None,
                                 function=None,
                                 args=None,
                                 vars=None,
                                 period = 3600, # seconds, so 1 hour
                                 ):
87 """ 88 Configure the task table for interactive CRUD, 89 setting defaults, widgets and hiding unnecessary fields 90 91 @param task: the task name (will use a UUID if omitted) 92 @param function: the function name (won't hide if omitted) 93 @param args: the function position arguments 94 @param vars: the function named arguments 95 """ 96 97 if args is None: 98 args = [] 99 if vars is None: 100 vars = {} 101 102 T = current.T 103 NONE = current.messages["NONE"] 104 UNLIMITED = T("unlimited") 105 106 tablename = self.TASK_TABLENAME 107 table = current.db[tablename] 108 109 table.uuid.readable = table.uuid.writable = False 110 111 table.prevent_drift.readable = table.prevent_drift.writable = False 112 113 table.sync_output.readable = table.sync_output.writable = False 114 115 table.times_failed.readable = False 116 117 # Configure start/stop time fields 118 for fn in ("start_time", "stop_time"): 119 field = table[fn] 120 field.represent = lambda dt: \ 121 S3DateTime.datetime_represent(dt, utc=True) 122 field.requires = IS_UTC_DATETIME() 123 set_min = set_max = None 124 if fn == "start_time": 125 set_min = "#scheduler_task_stop_time" 126 elif fn == "stop_time": 127 set_max = "#scheduler_task_start_time" 128 field.widget = S3CalendarWidget(past = 0, 129 set_min = set_min, 130 set_max = set_max, 131 timepicker = True, 132 ) 133 134 if not task: 135 import uuid 136 task = str(uuid.uuid4()) 137 field = table.task_name 138 field.default = task 139 field.readable = False 140 field.writable = False 141 142 if function: 143 field = table.function_name 144 field.default = function 145 field.readable = False 146 field.writable = False 147 148 field = table.args 149 field.default = json.dumps(args) 150 field.readable = False 151 field.writable = False 152 153 field = table.repeats 154 field.label = T("Repeat") 155 field.comment = T("times (0 = unlimited)") 156 field.default = 0 157 field.represent = lambda opt: \ 158 opt and "%s %s" % (opt, T("times")) or \ 159 opt == 0 and UNLIMITED or \ 160 NONE 161 162 field = table.period 163 field.label = T("Run every") 164 field.default = period 165 field.widget = S3TimeIntervalWidget.widget 166 field.requires = IS_TIME_INTERVAL_WIDGET(table.period) 167 field.represent = S3TimeIntervalWidget.represent 168 field.comment = T("seconds") 169 170 table.timeout.default = 600 171 table.timeout.represent = lambda opt: \ 172 opt and "%s %s" % (opt, T("seconds")) or \ 173 opt == 0 and UNLIMITED or \ 174 NONE 175 176 field = table.vars 177 field.default = json.dumps(vars) 178 field.readable = field.writable = False 179 180 # Always use "default" controller (web2py uses current controller), 181 # otherwise the anonymous worker does not pass the controller 182 # permission check and gets redirected to login before it reaches 183 # the task function which does the s3_impersonate 184 field = table.application_name 185 field.default = "%s/default" % current.request.application 186 field.readable = field.writable = False 187 table.group_name.readable = table.group_name.writable = False 188 table.status.readable = table.status.writable = False 189 table.next_run_time.readable = table.next_run_time.writable = False 190 table.times_run.readable = table.times_run.writable = False 191 table.assigned_worker_name.readable = \ 192 table.assigned_worker_name.writable = False 193 194 current.s3db.configure(tablename, 195 list_fields = ["id", 196 "enabled", 197 "start_time", 198 "repeats", 199 "period", 200 (T("Last run"), "last_run_time"), 201 (T("Last status"), "status"), 202 (T("Next 
run"), "next_run_time"), 203 "stop_time" 204 ], 205 ) 206 207 response = current.response 208 if response: 209 response.s3.crud_strings[tablename] = Storage( 210 label_create = T("Create Job"), 211 title_display = T("Scheduled Jobs"), 212 title_list = T("Job Schedule"), 213 title_update = T("Edit Job"), 214 label_list_button = T("List Jobs"), 215 msg_record_created = T("Job added"), 216 msg_record_modified = T("Job updated"), 217 msg_record_deleted = T("Job deleted"), 218 msg_list_empty = T("No jobs configured yet"), 219 msg_no_match = T("No jobs configured")) 220 221 return

    # -------------------------------------------------------------------------
    # API Function run within the main flow of the application
    # -------------------------------------------------------------------------
    def async(self, task, args=None, vars=None, timeout=300):
227 """ 228 Wrapper to call an asynchronous task. 229 - run from the main request 230 231 @param task: The function which should be run 232 - async if a worker is alive 233 @param args: The list of unnamed args to send to the function 234 @param vars: The list of named vars to send to the function 235 @param timeout: The length of time available for the task to complete 236 - default 300s (5 mins) 237 """ 238 239 if args is None: 240 args = [] 241 if vars is None: 242 vars = {} 243 244 # Check that task is defined 245 tasks = current.response.s3.tasks 246 if not tasks: 247 return False 248 if task not in tasks: 249 return False 250 251 # Check that worker is alive 252 if not self._is_alive(): 253 # Run the task synchronously 254 _args = [] 255 for arg in args: 256 if isinstance(arg, (int, long, float)): 257 _args.append(str(arg)) 258 elif isinstance(arg, basestring): 259 _args.append("%s" % str(json.dumps(arg))) 260 else: 261 error = "Unhandled arg type: %s" % arg 262 current.log.error(error) 263 raise HTTP(501, error) 264 args = ",".join(_args) 265 _vars = ",".join(["%s=%s" % (str(var), 266 str(vars[var])) for var in vars]) 267 if args: 268 statement = "tasks['%s'](%s,%s)" % (task, args, _vars) 269 else: 270 statement = "tasks['%s'](%s)" % (task, _vars) 271 # Handle JSON 272 null = None 273 exec(statement) 274 return None 275 276 auth = current.auth 277 if auth.is_logged_in(): 278 # Add the current user to the vars 279 vars["user_id"] = auth.user.id 280 281 # Run the task asynchronously 282 # @ToDo: Switch to API: self.scheduler.queue_task() 283 task_id = current.db.scheduler_task.insert(application_name = "%s/default" % \ 284 current.request.application, 285 task_name = task, 286 function_name = task, 287 args = json.dumps(args), 288 vars = json.dumps(vars), 289 timeout = timeout, 290 ) 291 292 # Return task_id so that status can be polled 293 return task_id

    # -------------------------------------------------------------------------
    def schedule_task(self,
                      task,
                      args=None, # args to pass to the task
                      vars=None, # vars to pass to the task
                      function_name=None,
                      start_time=None,
                      next_run_time=None,
                      stop_time=None,
                      repeats=None,
                      period=None,
                      timeout=None,
                      enabled=None, # None = Enabled
                      group_name=None,
                      ignore_duplicate=False,
                      sync_output=0):
311 """ 312 Schedule a task in web2py Scheduler 313 314 @param task: name of the function/task to be scheduled 315 @param args: args to be passed to the scheduled task 316 @param vars: vars to be passed to the scheduled task 317 @param function_name: function name (if different from task name) 318 @param start_time: start_time for the scheduled task 319 @param next_run_time: next_run_time for the the scheduled task 320 @param stop_time: stop_time for the the scheduled task 321 @param repeats: number of times the task to be repeated (0=unlimited) 322 @param period: time period between two consecutive runs (seconds) 323 @param timeout: set timeout for a running task 324 @param enabled: enabled flag for the scheduled task 325 @param group_name: group_name for the scheduled task 326 @param ignore_duplicate: disable or enable duplicate checking 327 @param sync_output: sync output every n seconds (0 = disable sync) 328 """ 329 330 if args is None: 331 args = [] 332 if vars is None: 333 vars = {} 334 335 if not ignore_duplicate and self._duplicate_task_exists(task, args, vars): 336 # if duplicate task exists, do not insert a new one 337 current.log.warning("Duplicate Task, Not Inserted", value=task) 338 return False 339 340 kwargs = {} 341 342 if function_name is None: 343 function_name = task 344 345 # storing valid keyword arguments only if they are provided 346 if start_time: 347 kwargs["start_time"] = start_time 348 349 if next_run_time: 350 kwargs["next_run_time"] = next_run_time 351 elif start_time: 352 # default it to start_time 353 kwargs["next_run_time"] = start_time 354 355 if stop_time: 356 kwargs["stop_time"] = stop_time 357 elif start_time: 358 # default it to one day ahead of given start_time 359 if not isinstance(start_time, datetime.datetime): 360 start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") 361 stop_time = start_time + datetime.timedelta(days=1) 362 363 if repeats is not None: 364 kwargs["repeats"] = repeats 365 366 if period: 367 kwargs["period"] = period 368 369 if timeout: 370 kwargs["timeout"] = timeout 371 372 if enabled != None: 373 # NB None => enabled 374 kwargs["enabled"] = enabled 375 376 if group_name: 377 kwargs["group_name"] = group_name 378 379 if sync_output != 0: 380 kwargs["sync_output"] = sync_output 381 382 auth = current.auth 383 if auth.is_logged_in(): 384 # Add the current user to the vars 385 vars["user_id"] = auth.user.id 386 387 # Add to DB for pickup by Scheduler task 388 # @ToDo: Switch to API: self.scheduler.queue_task() 389 task_id = current.db.scheduler_task.insert(application_name = "%s/default" % \ 390 current.request.application, 391 task_name = task, 392 function_name = function_name, 393 args = json.dumps(args), 394 vars = json.dumps(vars), 395 **kwargs) 396 return task_id

    # -------------------------------------------------------------------------
    def _duplicate_task_exists(self, task, args, vars):
400 """ 401 Checks if given task already exists in the Scheduler and both coincide 402 with their execution time 403 404 @param task: name of the task function 405 @param args: the job position arguments (list) 406 @param vars: the job named arguments (dict) 407 """ 408 409 db = current.db 410 ttable = db.scheduler_task 411 412 _args = json.dumps(args) 413 414 query = ((ttable.function_name == task) & \ 415 (ttable.args == _args) & \ 416 (ttable.status.belongs(["RUNNING", "QUEUED", "ALLOCATED"]))) 417 jobs = db(query).select(ttable.vars) 418 for job in jobs: 419 job_vars = json.loads(job.vars) 420 if job_vars == vars: 421 return True 422 return False

    # -------------------------------------------------------------------------
    def _is_alive(self):
426 """ 427 Returns True if there is at least 1 active worker to run scheduled tasks 428 - run from the main request 429 430 NB Can't run this 1/request at the beginning since the tables 431 only get defined in zz_last 432 """ 433 434 #if self.scheduler: 435 # return self.scheduler.is_alive() 436 #else: 437 # return False 438 439 db = current.db 440 cache = current.response.s3.cache 441 now = datetime.datetime.now() 442 443 offset = datetime.timedelta(minutes=1) 444 table = db.scheduler_worker 445 query = (table.last_heartbeat > (now - offset)) 446 worker_alive = db(query).select(table.id, 447 limitby=(0, 1), 448 cache=cache).first() 449 if worker_alive: 450 return True 451 else: 452 return False

    # -------------------------------------------------------------------------
    @staticmethod
    def reset(task_id):
457 """ 458 Reset the status of a task to QUEUED after FAILED 459 460 @param task_id: the task record ID 461 """ 462 463 db = current.db 464 ttable = db.scheduler_task 465 466 query = (ttable.id == task_id) & (ttable.status == "FAILED") 467 task = db(query).select(ttable.id, 468 limitby=(0, 1)).first() 469 if task: 470 task.update_record(status="QUEUED")

    # =========================================================================
    # Functions run within the Task itself
    # =========================================================================
    def authenticate(self, user_id):
476 """ 477 Activate the authentication passed from the caller to this new request 478 - run from within the task 479 480 NB This is so simple that we don't normally run via this API 481 - this is just kept as an example of what needs to happen within the task 482 """ 483 484 current.auth.s3_impersonate(user_id)

# END =========================================================================