Package s3 :: Module s3notify
[frames] | no frames]

Source Code for Module s3.s3notify

  1  # -*- coding: utf-8 -*- 
  2   
  3  """ S3 Notifications 
  4   
  5      @copyright: 2011-2019 (c) Sahana Software Foundation 
  6      @license: MIT 
  7   
  8      Permission is hereby granted, free of charge, to any person 
  9      obtaining a copy of this software and associated documentation 
 10      files (the "Software"), to deal in the Software without 
 11      restriction, including without limitation the rights to use, 
 12      copy, modify, merge, publish, distribute, sublicense, and/or sell 
 13      copies of the Software, and to permit persons to whom the 
 14      Software is furnished to do so, subject to the following 
 15      conditions: 
 16   
 17      The above copyright notice and this permission notice shall be 
 18      included in all copies or substantial portions of the Software. 
 19   
 20      THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
 21      EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 
 22      OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
 23      NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 24      HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
 25      WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
 26      FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 
 27      OTHER DEALINGS IN THE SOFTWARE. 
 28  """ 
 29   
 30  import datetime 
 31  import json 
 32  import os 
 33  import string 
 34  import sys 
 35  import urlparse 
 36  import urllib2 
 37  from urllib import urlencode 
 38  from uuid import uuid4 
 39   
 40  try: 
 41      from cStringIO import StringIO # Faster, where available 
 42  except: 
 43      from StringIO import StringIO 
 44   
 45  from gluon import current, TABLE, THEAD, TBODY, TR, TD, TH, XML 
 46   
 47  from s3datetime import s3_decode_iso_datetime, s3_encode_iso_datetime, s3_utc 
 48  from s3utils import s3_str, s3_truncate, s3_unicode 
49 50 # ============================================================================= 51 -class S3Notifications(object):
52 """ Framework to send notifications about subscribed events """ 53 54 # ------------------------------------------------------------------------- 55 @classmethod
56 - def check_subscriptions(cls):
57 """ 58 Scheduler entry point, creates notification tasks for all 59 active subscriptions which (may) have updates. 60 """ 61 62 _debug = current.log.debug 63 now = datetime.datetime.utcnow() 64 65 _debug("S3Notifications.check_subscriptions(now=%s)" % now) 66 67 subscriptions = cls._subscriptions(now) 68 if subscriptions: 69 run_async = current.s3task.async 70 for row in subscriptions: 71 # Create asynchronous notification task. 72 row.update_record(locked=True) 73 run_async("notify_notify", args=[row.id]) 74 message = "%s notifications scheduled." % len(subscriptions) 75 else: 76 message = "No notifications to schedule." 77 78 _debug(message) 79 return message
80 81 # ------------------------------------------------------------------------- 82 @classmethod
83 - def notify(cls, resource_id):
84 """ 85 Asynchronous task to notify a subscriber about updates, 86 runs a POST?format=msg request against the subscribed 87 controller which extracts the data and renders and sends 88 the notification message (see send()). 89 90 @param resource_id: the pr_subscription_resource record ID 91 """ 92 93 _debug = current.log.debug 94 _debug("S3Notifications.notify(resource_id=%s)" % resource_id) 95 96 db = current.db 97 s3db = current.s3db 98 99 stable = s3db.pr_subscription 100 rtable = db.pr_subscription_resource 101 ftable = s3db.pr_filter 102 103 # Extract the subscription data 104 join = stable.on(rtable.subscription_id == stable.id) 105 left = ftable.on(ftable.id == stable.filter_id) 106 107 # @todo: should not need rtable.resource here 108 row = db(rtable.id == resource_id).select(stable.id, 109 stable.pe_id, 110 stable.frequency, 111 stable.notify_on, 112 stable.method, 113 stable.email_format, 114 stable.attachment, 115 rtable.id, 116 rtable.resource, 117 rtable.url, 118 rtable.last_check_time, 119 ftable.query, 120 join=join, 121 left=left).first() 122 if not row: 123 return True 124 125 s = getattr(row, "pr_subscription") 126 r = getattr(row, "pr_subscription_resource") 127 f = getattr(row, "pr_filter") 128 129 # Create a temporary token to authorize the lookup request 130 auth_token = str(uuid4()) 131 132 # Store the auth_token in the subscription record 133 r.update_record(auth_token=auth_token) 134 db.commit() 135 136 # Construct the send-URL 137 public_url = current.deployment_settings.get_base_public_url() 138 lookup_url = "%s/%s/%s" % (public_url, 139 current.request.application, 140 r.url.lstrip("/")) 141 142 # Break up the URL into its components 143 purl = list(urlparse.urlparse(lookup_url)) 144 145 # Subscription parameters 146 # Date (must ensure we pass to REST as tz-aware) 147 last_check_time = s3_encode_iso_datetime(r.last_check_time) 148 query = {"subscription": auth_token, "format": "msg"} 149 if "upd" in s.notify_on: 150 query["~.modified_on__ge"] = "%sZ" % last_check_time 151 else: 152 query["~.created_on__ge"] = "%sZ" % last_check_time 153 154 # Filters 155 if f.query: 156 from s3filter import S3FilterString 157 resource = s3db.resource(r.resource) 158 fstring = S3FilterString(resource, f.query) 159 for k, v in fstring.get_vars.iteritems(): 160 if v is not None: 161 if k in query: 162 value = query[k] 163 if type(value) is list: 164 value.append(v) 165 else: 166 query[k] = [value, v] 167 else: 168 query[k] = v 169 query_nice = s3_unicode(fstring.represent()) 170 else: 171 query_nice = None 172 173 # Add subscription parameters and filters to the URL query, and 174 # put the URL back together 175 query = urlencode(query) 176 if purl[4]: 177 query = "&".join((purl[4], query)) 178 page_url = urlparse.urlunparse([purl[0], # scheme 179 purl[1], # netloc 180 purl[2], # path 181 purl[3], # params 182 query, # query 183 purl[5], # fragment 184 ]) 185 186 # Serialize data for send (avoid second lookup in send) 187 data = json.dumps({"pe_id": s.pe_id, 188 "notify_on": s.notify_on, 189 "method": s.method, 190 "email_format": s.email_format, 191 "attachment": s.attachment, 192 "resource": r.resource, 193 "last_check_time": last_check_time, 194 "filter_query": query_nice, 195 "page_url": lookup_url, 196 "item_url": None, 197 }) 198 199 # Send the request 200 _debug("Requesting %s" % page_url) 201 req = urllib2.Request(page_url, data=data) 202 req.add_header("Content-Type", "application/json") 203 success = False 204 try: 205 response = json.loads(urllib2.urlopen(req).read()) 206 message = response["message"] 207 if response["status"] == "success": 208 success = True 209 except urllib2.HTTPError, e: 210 message = ("HTTP %s: %s" % (e.code, e.read())) 211 except: 212 exc_info = sys.exc_info()[:2] 213 message = ("%s: %s" % (exc_info[0].__name__, exc_info[1])) 214 _debug(message) 215 216 # Update time stamps and unlock, invalidate auth token 217 intervals = s3db.pr_subscription_check_intervals 218 interval = datetime.timedelta(minutes=intervals.get(s.frequency, 0)) 219 if success: 220 last_check_time = datetime.datetime.utcnow() 221 next_check_time = last_check_time + interval 222 r.update_record(auth_token=None, 223 locked=False, 224 last_check_time=last_check_time, 225 next_check_time=next_check_time) 226 else: 227 r.update_record(auth_token=None, 228 locked=False) 229 db.commit() 230 231 # Done 232 return message
233 234 # ------------------------------------------------------------------------- 235 @classmethod
236 - def send(cls, r, resource):
237 """ 238 Method to retrieve updates for a subscription, render the 239 notification message and send it - responds to POST?format=msg 240 requests to the respective resource. 241 242 @param r: the S3Request 243 @param resource: the S3Resource 244 """ 245 246 _debug = current.log.debug 247 _debug("S3Notifications.send()") 248 249 json_message = current.xml.json_message 250 251 # Read subscription data 252 source = r.body 253 source.seek(0) 254 data = source.read() 255 subscription = json.loads(data) 256 257 #_debug("Notify PE #%s by %s on %s of %s since %s" % \ 258 # (subscription["pe_id"], 259 # str(subscription["method"]), 260 # str(subscription["notify_on"]), 261 # subscription["resource"], 262 # subscription["last_check_time"], 263 # )) 264 265 # Check notification settings 266 notify_on = subscription["notify_on"] 267 methods = subscription["method"] 268 if not notify_on or not methods: 269 return json_message(message="No notifications configured " 270 "for this subscription") 271 272 # Authorization (pe_id must not be None) 273 pe_id = subscription["pe_id"] 274 275 if not pe_id: 276 r.unauthorised() 277 278 # Fields to extract 279 fields = resource.list_fields(key="notify_fields") 280 if "created_on" not in fields: 281 fields.append("created_on") 282 283 # Extract the data 284 data = resource.select(fields, 285 represent=True, 286 raw_data=True) 287 rows = data["rows"] 288 289 # How many records do we have? 290 numrows = len(rows) 291 if not numrows: 292 return json_message(message="No records found") 293 294 #_debug("%s rows:" % numrows) 295 296 # Prepare meta-data 297 get_config = resource.get_config 298 settings = current.deployment_settings 299 300 page_url = subscription["page_url"] 301 302 crud_strings = current.response.s3.crud_strings.get(resource.tablename) 303 if crud_strings: 304 resource_name = crud_strings.title_list 305 else: 306 resource_name = string.capwords(resource.name, "_") 307 308 last_check_time = s3_decode_iso_datetime(subscription["last_check_time"]) 309 310 email_format = subscription["email_format"] 311 if not email_format: 312 email_format = settings.get_msg_notify_email_format() 313 314 filter_query = subscription.get("filter_query") 315 316 meta_data = {"systemname": settings.get_system_name(), 317 "systemname_short": settings.get_system_name_short(), 318 "resource": resource_name, 319 "page_url": page_url, 320 "notify_on": notify_on, 321 "last_check_time": last_check_time, 322 "filter_query": filter_query, 323 "total_rows": numrows, 324 } 325 326 # Render contents for the message template(s) 327 renderer = get_config("notify_renderer") 328 if not renderer: 329 renderer = settings.get_msg_notify_renderer() 330 if not renderer: 331 renderer = cls._render 332 333 contents = {} 334 if email_format == "html" and "EMAIL" in methods: 335 contents["html"] = renderer(resource, data, meta_data, "html") 336 contents["default"] = contents["html"] 337 if email_format != "html" or "EMAIL" not in methods or len(methods) > 1: 338 contents["text"] = renderer(resource, data, meta_data, "text") 339 contents["default"] = contents["text"] 340 341 # Subject line 342 subject = get_config("notify_subject") 343 if not subject: 344 subject = settings.get_msg_notify_subject() 345 if callable(subject): 346 subject = subject(resource, data, meta_data) 347 348 from string import Template 349 subject = Template(subject).safe_substitute(S="%(systemname)s", 350 s="%(systemname_short)s", 351 r="%(resource)s") 352 subject = subject % meta_data 353 354 # Attachment 355 attachment = subscription.get("attachment", False) 356 document_ids = None 357 if attachment: 358 attachment_fnc = settings.get_msg_notify_attachment() 359 if attachment_fnc: 360 document_ids = attachment_fnc(resource, data, meta_data) 361 362 # **data for send_by_pe_id function in s3msg 363 send_data = {} 364 send_data_fnc = settings.get_msg_notify_send_data() 365 if callable(send_data_fnc): 366 send_data = send_data_fnc(resource, data, meta_data) 367 368 # Helper function to find message templates from a priority list 369 join = lambda *f: os.path.join(current.request.folder, *f) 370 def get_msg_template(path, filenames): 371 for fn in filenames: 372 filepath = join(path, fn) 373 if os.path.exists(filepath): 374 try: 375 return open(filepath, "rb") 376 except: 377 pass 378 return None
379 380 # Render and send the message(s) 381 templates = settings.get_template() 382 if templates != "default" and not isinstance(templates, (tuple, list)): 383 templates = (templates,) 384 prefix = resource.get_config("notify_template", "notify") 385 386 send = current.msg.send_by_pe_id 387 388 success = False 389 errors = [] 390 391 for method in methods: 392 393 error = None 394 395 # Get the message template 396 msg_template = None 397 filenames = ["%s_%s.html" % (prefix, method.lower())] 398 if method == "EMAIL" and email_format: 399 filenames.insert(0, "%s_email_%s.html" % (prefix, email_format)) 400 if templates != "default": 401 for template in templates[::-1]: 402 path = join("modules", "templates", template, "views", "msg") 403 msg_template = get_msg_template(path, filenames) 404 if msg_template is not None: 405 break 406 if msg_template is None: 407 path = join("views", "msg") 408 msg_template = get_msg_template(path, filenames) 409 if msg_template is None: 410 msg_template = StringIO(s3_str(current.T("New updates are available."))) 411 412 # Select contents format 413 if method == "EMAIL" and email_format == "html": 414 output = contents["html"] 415 else: 416 output = contents["text"] 417 418 # Render the message 419 try: 420 message = current.response.render(msg_template, output) 421 except: 422 exc_info = sys.exc_info()[:2] 423 error = ("%s: %s" % (exc_info[0].__name__, exc_info[1])) 424 errors.append(error) 425 continue 426 finally: 427 if hasattr(msg_template, "close"): 428 msg_template.close() 429 430 if not message: 431 continue 432 433 # Send the message 434 #_debug("Sending message per %s" % method) 435 #_debug(message) 436 try: 437 sent = send(pe_id, 438 # RFC 2822 439 subject=s3_truncate(subject, 78), 440 message=message, 441 contact_method=method, 442 system_generated=True, 443 document_ids=document_ids, 444 **send_data) 445 except: 446 exc_info = sys.exc_info()[:2] 447 error = ("%s: %s" % (exc_info[0].__name__, exc_info[1])) 448 sent = False 449 450 if sent: 451 # Successful if at least one notification went out 452 success = True 453 else: 454 if not error: 455 error = current.session.error 456 if isinstance(error, list): 457 error = "/".join(error) 458 if error: 459 errors.append(error) 460 461 # Done 462 if errors: 463 message = ", ".join(errors) 464 else: 465 message = "Success" 466 return json_message(success=success, 467 statuscode=200 if success else 403, 468 message=message)
469 470 # ------------------------------------------------------------------------- 471 @classmethod
472 - def _subscriptions(cls, now):
473 """ 474 Helper method to find all subscriptions which need to be 475 notified now. 476 477 @param now: current datetime (UTC) 478 @return: joined Rows pr_subscription/pr_subscription_resource, 479 or None if no due subscriptions could be found 480 481 @todo: take notify_on into account when checking 482 """ 483 484 db = current.db 485 s3db = current.s3db 486 487 stable = s3db.pr_subscription 488 rtable = db.pr_subscription_resource 489 490 # Find all resources with due subscriptions 491 next_check = rtable.next_check_time 492 locked_deleted = (rtable.locked != True) & \ 493 (rtable.deleted != True) 494 query = ((next_check == None) | 495 (next_check <= now)) & \ 496 locked_deleted 497 498 tname = rtable.resource 499 last_check = rtable.last_check_time 500 mtime = last_check.min() 501 rows = db(query).select(tname, 502 mtime, 503 groupby=tname) 504 505 if not rows: 506 return None 507 508 # Select those which have updates 509 resources = set() 510 radd = resources.add 511 for row in rows: 512 tablename = row[tname] 513 table = s3db.table(tablename) 514 if not table or not "modified_on" in table.fields: 515 # Can't notify updates in resources without modified_on 516 continue 517 modified_on = table.modified_on 518 msince = row[mtime] 519 if msince is None: 520 query = (table.id > 0) 521 else: 522 query = (modified_on >= msince) 523 update = db(query).select(modified_on, 524 orderby=~(modified_on), 525 limitby=(0, 1)).first() 526 if update: 527 radd((tablename, update.modified_on)) 528 529 if not resources: 530 return None 531 532 # Get all active subscriptions to these resources which 533 # may need to be notified now: 534 join = rtable.on((rtable.subscription_id == stable.id) & \ 535 locked_deleted) 536 query = None 537 for rname, modified_on in resources: 538 q = (tname == rname) & \ 539 ((last_check == None) | 540 (last_check <= modified_on)) 541 if query is None: 542 query = q 543 else: 544 query |= q 545 546 query = (stable.frequency != "never") & \ 547 (stable.deleted != True) & \ 548 ((next_check == None) | \ 549 (next_check <= now)) & \ 550 query 551 return db(query).select(rtable.id, join=join)
552 553 # ------------------------------------------------------------------------- 554 @classmethod
555 - def _render(cls, resource, data, meta_data, format=None):
556 """ 557 Method to pre-render the contents for the message template 558 559 @param resource: the S3Resource 560 @param data: the data returned from S3Resource.select 561 @param meta_data: the meta data for the notification 562 @param format: the contents format ("text" or "html") 563 """ 564 565 created_on_selector = resource.prefix_selector("created_on") 566 created_on_colname = None 567 notify_on = meta_data["notify_on"] 568 last_check_time = meta_data["last_check_time"] 569 rows = data["rows"] 570 rfields = data["rfields"] 571 output = {} 572 new, upd = [], [] 573 574 if format == "html": 575 # Pre-formatted HTML 576 colnames = [] 577 578 new_headers = TR() 579 mod_headers = TR() 580 for rfield in rfields: 581 if rfield.selector == created_on_selector: 582 created_on_colname = rfield.colname 583 elif rfield.ftype != "id": 584 colnames.append(rfield.colname) 585 label = rfield.label 586 new_headers.append(TH(label)) 587 mod_headers.append(TH(label)) 588 for row in rows: 589 append_record = upd.append 590 if created_on_colname: 591 try: 592 created_on = row["_row"][created_on_colname] 593 except (KeyError, AttributeError): 594 pass 595 else: 596 if s3_utc(created_on) >= last_check_time: 597 append_record = new.append 598 tr = TR([TD(XML(row[colname])) for colname in colnames]) 599 append_record(tr) 600 if "new" in notify_on and len(new): 601 output["new"] = len(new) 602 output["new_records"] = TABLE(THEAD(new_headers), TBODY(new)) 603 else: 604 output["new"] = None 605 if "upd" in notify_on and len(upd): 606 output["upd"] = len(upd) 607 output["upd_records"] = TABLE(THEAD(new_headers), TBODY(upd)) 608 else: 609 output["upd"] = None 610 611 else: 612 # Standard text format 613 labels = [] 614 append = labels.append 615 616 for rfield in rfields: 617 if rfield.selector == created_on_selector: 618 created_on_colname = rfield.colname 619 elif rfield.ftype != "id": 620 append((rfield.colname, rfield.label)) 621 622 for row in rows: 623 append_record = upd.append 624 if created_on_colname: 625 try: 626 created_on = row["_row"][created_on_colname] 627 except (KeyError, AttributeError): 628 pass 629 else: 630 if s3_utc(created_on) >= last_check_time: 631 append_record = new.append 632 633 record = [] 634 append_column = record.append 635 for colname, label in labels: 636 append_column((label, row[colname])) 637 append_record(record) 638 639 if "new" in notify_on and len(new): 640 output["new"] = len(new) 641 output["new_records"] = new 642 else: 643 output["new"] = None 644 if "upd" in notify_on and len(upd): 645 output["upd"] = len(upd) 646 output["upd_records"] = upd 647 else: 648 output["upd"] = None 649 650 output.update(meta_data) 651 return output
652 653 # END ========================================================================= 654