Package s3 :: Module s3anonymize
[frames] | no frames]

Source Code for Module s3.s3anonymize

  1  # -*- coding: utf-8 -*- 
  2   
  3  """ S3 Person Record Anonymizing 
  4   
  5      @copyright: 2018-2019 (c) Sahana Software Foundation 
  6      @license: MIT 
  7   
  8      Permission is hereby granted, free of charge, to any person 
  9      obtaining a copy of this software and associated documentation 
 10      files (the "Software"), to deal in the Software without 
 11      restriction, including without limitation the rights to use, 
 12      copy, modify, merge, publish, distribute, sublicense, and/or sell 
 13      copies of the Software, and to permit persons to whom the 
 14      Software is furnished to do so, subject to the following 
 15      conditions: 
 16   
 17      The above copyright notice and this permission notice shall be 
 18      included in all copies or substantial portions of the Software. 
 19   
 20      THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
 21      EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 
 22      OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
 23      NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 24      HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
 25      WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
 26      FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 
 27      OTHER DEALINGS IN THE SOFTWARE. 
 28  """ 
 29   
 30  import json 
 31  import uuid 
 32   
 33  from gluon import current, A, BUTTON, DIV, FORM, INPUT, LABEL, P 
 34   
 35  from s3dal import original_tablename 
 36  from s3rest import S3Method 
 37  from s3query import FS, S3Joins 
 38  from s3validators import JSONERRORS 
 39  from s3utils import s3_str 
 40   
 41  __all__ = ("S3Anonymize", 
 42             "S3AnonymizeWidget", 
 43             ) 
44 45 # ============================================================================= 46 -class S3Anonymize(S3Method):
47 """ REST Method to Anonymize Person Records """ 48
49 - def apply_method(self, r, **attr):
50 """ 51 Entry point for REST API 52 53 @param r: the S3Request instance 54 @param attr: controller parameters 55 56 @return: output data (JSON) 57 """ 58 59 output = {} 60 61 table, record_id = self.get_target_id() 62 if not table: 63 r.error(405, "Anonymizing not configured for resource") 64 if not record_id: 65 r.error(400, "No target record specified") 66 if not self.permitted(table, record_id): 67 r.unauthorized() 68 69 if r.representation == "json": 70 if r.http == "POST": 71 output = self.anonymize(r, table, record_id) 72 else: 73 r.error(405, current.ERROR.BAD_METHOD) 74 else: 75 r.error(415, current.ERROR.BAD_FORMAT) 76 77 # Set Content Type 78 current.response.headers["Content-Type"] = "application/json" 79 80 return output
81 82 # ------------------------------------------------------------------------- 83 @classmethod
84 - def anonymize(cls, r, table, record_id):
85 """ 86 Handle POST (anonymize-request), i.e. anonymize the target record 87 88 @param r: the S3Request 89 @param table: the target Table 90 @param record_id: the target record ID 91 92 @returns: JSON message 93 """ 94 95 # Read+parse body JSON 96 s = r.body 97 s.seek(0) 98 try: 99 options = json.load(s) 100 except JSONERRORS: 101 options = None 102 if not isinstance(options, dict): 103 r.error(400, "Invalid request options") 104 105 # Verify submitted action key against session (CSRF protection) 106 widget_id = "%s-%s-anonymize" % (table, record_id) 107 session_s3 = current.session.s3 108 keys = session_s3.anonymize 109 if keys is None or \ 110 widget_id not in keys or \ 111 options.get("key") != keys[widget_id]: 112 r.error(400, "Invalid action key (form reopened in another tab?)") 113 114 # Get the available rules from settings 115 rules = current.s3db.get_config(table, "anonymize") 116 if isinstance(rules, (tuple, list)): 117 names = set(rule.get("name") for rule in rules) 118 names.discard(None) 119 else: 120 # Single rule 121 rules["name"] = "default" 122 names = (rules["name"],) 123 rules = [rules] 124 125 # Get selected rules from options 126 selected = options.get("apply") 127 if not isinstance(selected, list): 128 r.error(400, "Invalid request options") 129 130 # Validate selected rules 131 for name in selected: 132 if name not in names: 133 r.error(400, "Invalid rule: %s" % name) 134 135 # Merge selected rules 136 cleanup = {} 137 cascade = [] 138 for rule in rules: 139 name = rule.get("name") 140 if not name or name not in selected: 141 continue 142 field_rules = rule.get("fields") 143 if field_rules: 144 cleanup.update(field_rules) 145 cascade_rules = rule.get("cascade") 146 if cascade_rules: 147 cascade.extend(cascade_rules) 148 149 # Apply selected rules 150 if cleanup or cascade: 151 rules = {"fields": cleanup, "cascade": cascade} 152 153 # NB will raise (+roll back) if configuration is invalid 154 cls.cascade(table, (record_id,), rules) 155 156 # Audit anonymize 157 prefix, name = original_tablename(table).split("_", 1) 158 current.audit("anonymize", prefix, name, 159 record = record_id, 160 representation = "html", 161 ) 162 163 output = current.xml.json_message(updated=record_id) 164 else: 165 output = current.xml.json_message(msg="No applicable rules found") 166 167 return output
168 169 # -------------------------------------------------------------------------
170 - def get_target_id(self):
171 """ 172 Determine the target table and record ID 173 174 @return: tuple (table, record_id) 175 """ 176 177 resource = self.resource 178 179 rules = resource.get_config("anonymize") 180 if not rules: 181 return None, None 182 183 return resource.table, self.record_id
184 185 # ------------------------------------------------------------------------- 186 @staticmethod
187 - def permitted(table, record_id):
188 """ 189 Check permissions to anonymize the target record 190 191 @param table: the target Table 192 @param record_id: the target record ID 193 194 @return: True|False 195 """ 196 197 has_permission = current.auth.s3_has_permission 198 199 return has_permission("update", table, record_id=record_id) and \ 200 has_permission("delete", table, record_id=record_id)
201 202 # ------------------------------------------------------------------------- 203 @classmethod
204 - def cascade(cls, table, record_ids, rules):
205 """ 206 Apply cascade of rules to anonymize records 207 208 @param table: the Table 209 @param record_ids: a set of record IDs 210 @param rules: the rules for this Table 211 212 @raises Exception: if the cascade failed due to DB constraints 213 or invalid rules; callers should roll back 214 the transaction if an exception is raised 215 """ 216 217 s3db = current.s3db 218 219 pkey = table._id.name 220 221 cascade = rules.get("cascade") 222 if cascade: 223 224 fieldnames = set(rule.get("match", pkey) for _, rule in cascade) 225 if pkey not in fieldnames: 226 fieldnames.add(pkey) 227 fields = [table[fn] for fn in fieldnames] 228 229 db = current.db 230 rows = db(table._id.belongs(record_ids)).select(*fields) 231 232 for tablename, rule in cascade: 233 234 lookup = rule.get("lookup") 235 if lookup: 236 # Explicit look-up function, call with master table+rows, 237 # as well as the name of the related table; should return 238 # a set/tuple/list of record ids in the related table 239 ids = lookup(table, rows, tablename) 240 else: 241 key = rule.get("key") 242 if not key: 243 continue 244 245 field = rule.get("match", pkey) 246 match = set(row[field] for row in rows) 247 248 # Resolve key and construct query 249 resource = s3db.resource(tablename, components=[]) 250 rq = FS(key).belongs(match) 251 query = rq.query(resource) 252 253 # Construct necessary joins 254 joins = S3Joins(tablename) 255 joins.extend(rq._joins(resource)[0]) 256 joins = joins.as_list() 257 258 # Extract the target table IDs 259 target_rows = db(query).select(resource._id, 260 join = joins, 261 ) 262 ids = set(row[resource._id.name] for row in target_rows) 263 264 # Recurse into related table 265 if ids: 266 cls.cascade(resource.table, ids, rule) 267 268 # Apply field rules 269 field_rules = rules.get("fields") 270 if field_rules: 271 cls.apply_field_rules(table, record_ids, field_rules) 272 273 # Apply deletion rules 274 if rules.get("delete"): 275 resource = s3db.resource(table, id=list(record_ids)) 276 resource.delete(cascade=True)
277 278 # ------------------------------------------------------------------------- 279 @staticmethod
280 - def apply_field_rules(table, record_ids, rules):
281 """ 282 Apply field rules on a set of records in a table 283 284 @param table: the Table 285 @param record_ids: the record IDs 286 @param rules: the rules 287 288 @raises Exception: if the field rules could not be applied 289 due to DB constraints or invalid rules; 290 callers should roll back the transaction 291 if an exception is raised 292 """ 293 294 fields = [table[fn] for fn in rules if fn in table.fields] 295 if table._id.name not in rules: 296 fields.insert(0, table._id) 297 298 # Select the records 299 query = table._id.belongs(record_ids) 300 rows = current.db(query).select(*fields) 301 302 pkey = table._id.name 303 304 s3db = current.s3db 305 update_super = s3db.update_super 306 onaccept = s3db.onaccept 307 308 for row in rows: 309 data = {} 310 for fieldname, rule in rules.items(): 311 312 if fieldname in table.fields: 313 field = table[fieldname] 314 else: 315 continue 316 317 if rule == "remove": 318 # Set to None 319 if field.notnull: 320 raise ValueError("Cannot remove %s - must not be NULL" % field) 321 else: 322 data[fieldname] = None 323 324 elif rule == "reset": 325 # Reset to the field's default value 326 default = field.default 327 if default is None and field.notnull: 328 raise ValueError("Cannot reset %s - default value None violates notnull-constraint") 329 data[fieldname] = default 330 331 elif callable(rule): 332 # Callable rule to procude a new value 333 data[fieldname] = rule(row[pkey], field, row[field]) 334 335 elif type(rule) is tuple: 336 method, value = rule 337 if method == "set": 338 # Set a fixed value 339 data[fieldname] = value 340 341 if data: 342 success = row.update_record(**data) 343 if not success: 344 raise ValueError("Could not clean %s record" % table) 345 346 update_super(table, row) 347 348 data[pkey] = row[pkey] 349 onaccept(table, data, method="update")
350
351 # ============================================================================= 352 -class S3AnonymizeWidget(object):
353 """ GUI widget for S3Anonymize """ 354 355 # ------------------------------------------------------------------------- 356 @classmethod
357 - def widget(cls, r, _class="action-lnk"):
358 """ 359 Render an action item (link or button) to anonymize the 360 target record of an S3Request, which can be embedded in 361 the record view 362 363 @param r: the S3Request 364 @param _class: HTML class for the action item 365 366 @returns: the action item (a HTML helper instance), or an empty 367 string if no anonymize-rules are configured for the 368 target table, no target record was specified or the 369 user is not permitted to anonymize it 370 """ 371 372 T = current.T 373 374 default = "" 375 376 # Determine target table 377 if r.component: 378 resource = r.component 379 if resource.link and not r.actuate_link(): 380 resource = resource.link 381 else: 382 resource = r.resource 383 table = resource.table 384 385 # Determine target record 386 record_id = S3Anonymize._record_id(r) 387 if not record_id: 388 return default 389 390 # Check if target is configured for anonymize 391 rules = resource.get_config("anonymize") 392 if not rules: 393 return default 394 if not isinstance(rules, (tuple, list)): 395 # Single rule 396 rules["name"] = "default" 397 rules = [rules] 398 399 # Check permissions to anonymize 400 if not S3Anonymize.permitted(table, record_id): 401 return default 402 403 404 # Determine widget ID 405 widget_id = "%s-%s-anonymize" % (table, record_id) 406 407 # Inject script 408 script_options = {"ajaxURL": r.url(method = "anonymize", 409 representation = "json", 410 ), 411 } 412 cls.inject_script(widget_id, script_options) 413 414 # Action button 415 action_button = A(T("Anonymize"), _class="anonymize-btn") 416 if _class: 417 action_button.add_class(_class) 418 419 # Dialog and Form 420 INFO = T("The following information will be deleted from the record") 421 CONFIRM = T("Are you sure you want to delete the selected details?") 422 SUCCESS = T("Action successful - please wait...") 423 424 form = FORM(P("%s:" % INFO), 425 cls.selector(rules), 426 P(CONFIRM), 427 DIV(INPUT(value = "anonymize_confirm", 428 _name = "anonymize_confirm", 429 _type = "checkbox", 430 ), 431 LABEL(T("Yes, delete the selected details")), 432 _class = "anonymize-confirm", 433 ), 434 cls.buttons(), 435 _class = "anonymize-form", 436 # Store action key in form 437 hidden = {"action-key": cls.action_key(widget_id)}, 438 ) 439 440 dialog = DIV(form, 441 DIV(P(SUCCESS), 442 _class = "hide anonymize-success", 443 ), 444 _class = "anonymize-dialog hide", 445 _title = T("Anonymize"), 446 ) 447 448 # Assemble widget 449 widget = DIV(action_button, 450 dialog, 451 _class="s3-anonymize", 452 _id = widget_id, 453 ) 454 455 return widget
456 457 # ------------------------------------------------------------------------- 458 @staticmethod
459 - def action_key(widget_id):
460 """ 461 Generate a unique STP token for the widget (CSRF protection) and 462 store it in session 463 464 @param widget_id: the widget ID (which includes the target 465 table name and record ID) 466 @return: a unique identifier (as string) 467 """ 468 469 session_s3 = current.session.s3 470 471 keys = session_s3.anonymize 472 if keys is None: 473 session_s3.anonymize = keys = {} 474 key = keys[widget_id] = str(uuid.uuid4()) 475 476 return key
477 478 # ------------------------------------------------------------------------- 479 @staticmethod
480 - def selector(rules):
481 """ 482 Generate the rule selector for anonymize-form 483 484 @param rules: the list of configured rules 485 486 @return: the selector (DIV) 487 """ 488 489 T = current.T 490 491 selector = DIV(_class="anonymize-select") 492 493 for rule in rules: 494 495 name = rule.get("name") 496 if not name: 497 continue 498 499 title = T(rule.get("title", name)) 500 501 selector.append(DIV(INPUT(value = "on", 502 _name = s3_str(name), 503 _type = "checkbox", 504 _class = "anonymize-rule", 505 ), 506 LABEL(title), 507 _class = "anonymize-option", 508 )) 509 510 return selector
511 512 # ------------------------------------------------------------------------- 513 @staticmethod
514 - def buttons():
515 """ 516 Generate the submit/cancel buttons for the anonymize-form 517 518 @return: the buttons row (DIV) 519 """ 520 521 T = current.T 522 523 return DIV(BUTTON(T("Submit"), 524 _class = "small alert button anonymize-submit", 525 _disabled = "disabled", 526 _type = "button", 527 ), 528 A(T("Cancel"), 529 _class = "cancel-form-btn action-lnk anonymize-cancel", 530 _href = "javascript:void(0)", 531 ), 532 _class = "anonymize-buttons", 533 )
534 535 # ------------------------------------------------------------------------- 536 @staticmethod
537 - def inject_script(widget_id, options):
538 """ 539 Inject the necessary JavaScript for the UI dialog 540 541 @param widget_id: the widget ID 542 @param options: JSON-serializable dict of widget options 543 """ 544 545 request = current.request 546 s3 = current.response.s3 547 548 # Static script 549 if s3.debug: 550 script = "/%s/static/scripts/S3/s3.ui.anonymize.js" % \ 551 request.application 552 else: 553 script = "/%s/static/scripts/S3/s3.ui.anonymize.min.js" % \ 554 request.application 555 scripts = s3.scripts 556 if script not in scripts: 557 scripts.append(script) 558 559 # Widget options 560 opts = {} 561 if options: 562 opts.update(options) 563 564 # Widget instantiation 565 script = '''$('#%(widget_id)s').anonymize(%(options)s)''' % \ 566 {"widget_id": widget_id, 567 "options": json.dumps(opts), 568 } 569 jquery_ready = s3.jquery_ready 570 if script not in jquery_ready: 571 jquery_ready.append(script)
572 573 # END ========================================================================= 574