Package s3 :: Module s3import
[frames] | no frames]

Source Code for Module s3.s3import

   1  # -*- coding: utf-8 -*- 
   2   
   3  """ Resource Import Tools 
   4   
   5      @copyright: 2011-2019 (c) Sahana Software Foundation 
   6      @license: MIT 
   7   
   8      Permission is hereby granted, free of charge, to any person 
   9      obtaining a copy of this software and associated documentation 
  10      files (the "Software"), to deal in the Software without 
  11      restriction, including without limitation the rights to use, 
  12      copy, modify, merge, publish, distribute, sublicense, and/or sell 
  13      copies of the Software, and to permit persons to whom the 
  14      Software is furnished to do so, subject to the following 
  15      conditions: 
  16   
  17      The above copyright notice and this permission notice shall be 
  18      included in all copies or substantial portions of the Software. 
  19   
  20      THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
  21      EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 
  22      OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
  23      NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
  24      HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
  25      WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
  26      FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 
  27      OTHER DEALINGS IN THE SOFTWARE. 
  28  """ 
  29   
  30  # @todo: remove all interactive error reporting out of the _private methods, 
  31  #        and raise exceptions instead. 
  32  __all__ = ("S3Importer", 
  33             "S3ImportJob", 
  34             "S3ImportItem", 
  35             "S3Duplicate", 
  36             "S3BulkImporter", 
  37             ) 
  38   
  39  import cPickle 
  40  import datetime 
  41  import json 
  42  import os 
  43  import sys 
  44  import urllib2 # Needed for error handling on fetch 
  45  import uuid 
  46   
  47  from copy import deepcopy 
  48  try: 
  49      from cStringIO import StringIO    # Faster, where available 
  50  except ImportError: 
  51      from StringIO import StringIO 
  52  try: 
  53      from lxml import etree 
  54  except ImportError: 
  55      sys.stderr.write("ERROR: lxml module needed for XML handling\n") 
  56      raise 
  57   
  58  from gluon import current, redirect, URL, \ 
  59                    A, B, DIV, INPUT, LI, P, TABLE, TBODY, TD, TFOOT, TH, TR, UL, \ 
  60                    IS_EMPTY_OR, IS_IN_SET, SQLFORM 
  61  from gluon.storage import Storage, Messages 
  62  from gluon.tools import callback, fetch 
  63   
  64  from s3dal import Field 
  65  from s3datetime import s3_utc 
  66  from s3rest import S3Method, S3Request 
  67  from s3resource import S3Resource 
  68  from s3utils import s3_auth_user_represent_name, s3_get_foreign_key, \ 
  69                      s3_has_foreign_key, s3_mark_required, s3_unicode 
  70  from s3validators import IS_JSONS3 
71 72 # ============================================================================= 73 -class S3Importer(S3Method):
74 """ 75 Transformable formats (XML, JSON, CSV) import handler 76 """ 77 78 UPLOAD_TABLE_NAME = "s3_import_upload" 79 80 # -------------------------------------------------------------------------
def apply_method(self, r, **attr):
    """
        Apply CRUD methods

        @param r: the S3Request
        @param attr: dictionary of parameters for the method handler

        @return: output object to send to the view

        Known means of communicating with this module:

        It expects a URL of the form: /prefix/name/import

        It will interpret the http requests as follows:

            GET     will trigger the upload
            POST    will trigger either commits or display the import details
            DELETE  will trigger deletes

        It will accept one of the following control vars:
            item:   to specify a single item in the import job
            job:    to specify a job
        It should not receive both, so job takes precedence over item

        For CSV imports, the calling controller can add extra fields
        to the upload form to add columns to each row in the CSV. To add
        the extra fields, pass a named parameter "csv_extra_fields" to the
        s3_rest_controller call (or the S3Request call, respectively):

        s3_rest_controller(module, resourcename,
                           csv_extra_fields=[
                                dict(label="ColumnLabelInTheCSV",
                                     field=field_instance)
                           ])

        The Field instance "field" will be added to the upload form, and
        the user input will be added to each row of the CSV under the
        label as specified. If the "field" validator has options, the
        input value will be translated into the option representation,
        otherwise the value will be used as-is.

        Note that the "label" in the dict is the column label in the CSV,
        whereas the field label for the form is to be set in the Field
        instance passed as "field".

        You can add any arbitrary number of csv_extra_fields to the list.

        Additionally, you may want to allow the user to choose whether
        the import shall first remove all existing data in the target
        table. To do so, pass a label for the "replace_option" to the
        request:

        s3_rest_controller(module, resourcename,
                           replace_option=T("Remove existing data before import"))

        This will add the respective checkbox to the upload form.

        You may also want to provide a link to download a CSV template from
        the upload form. To do that, add the resource name to the request
        attributes:

        s3_rest_controller(module, resourcename,
                           csv_template="<resourcename>")

        This will provide a link to:
            - static/formats/s3csv/<controller>/<resourcename>.csv
        at the top of the upload form.

    """

    # Messages
    T = current.T
    messages = self.messages = Messages(T)
    messages.download_template = "Download Template"
    messages.invalid_file_format = "Invalid File Format"
    messages.unsupported_file_type = "Unsupported file type of %s"
    messages.stylesheet_not_found = "No Stylesheet %s could be found to manage the import file."
    messages.no_file = "No file submitted"
    messages.file_open_error = "Unable to open the file %s"
    messages.file_not_found = "The file to upload is missing"
    messages.no_records_to_import = "No records to import"
    messages.no_job_to_delete = "No job to delete, maybe it has already been deleted."
    messages.title_job_read = "Details of the selected import job"
    messages.title_job_list = "List of import items"
    messages.file_uploaded = "Import file uploaded"
    messages.upload_submit_btn = "Upload Data File"
    messages.open_btn = "Open"
    messages.view_btn = "View"
    messages.delete_btn = "Delete"
    messages.item_show_details = "Display Details"
    messages.job_total_records = "Total records in the Import Job"
    messages.job_records_selected = "Records selected"
    messages.job_deleted = "Import job deleted"
    messages.job_completed = "Job run on %s. With result of (%s)"
    messages.import_file = "Import File"
    messages.import_file_comment = "Upload a file formatted according to the Template."
    messages.user_name = "User Name"
    messages.commit_total_records_imported = "%s records imported"
    messages.commit_total_records_ignored = "%s records ignored"
    messages.commit_total_errors = "%s records in error"

    # Target table for the data import
    tablename = self.tablename

    # Table for uploads
    self.__define_table()

    # Check authorization: the user must be allowed to create both
    # upload records and records in the target table
    permitted = current.auth.s3_has_permission
    authorised = permitted("create", self.upload_tablename) and \
                 permitted("create", tablename)
    if not authorised:
        if r.method is not None:
            r.unauthorised()
        else:
            return {"form": None}

    # Remember the target of the import; self.table/self.tablename are
    # switched temporarily by other methods of this class
    self.controller_resource = self.resource
    self.controller_table = self.table
    self.controller_tablename = tablename

    self.upload_resource = None
    self.item_resource = None

    # Environment
    self.controller = r.controller
    self.function = r.function

    try:
        self.uploadTitle = current.response.s3.crud_strings[tablename].title_upload or T("Import")
    except (KeyError, AttributeError):
        self.uploadTitle = T("Import")

    # @todo: correct to switch this off for the whole session?
    current.session.s3.ocr_enabled = False

    # Reset all errors/warnings
    self.error = None
    self.warning = None

    # CSV upload configuration
    self.csv_stylesheet = attr.get("csv_stylesheet")
    self.csv_extra_fields = None
    self.csv_extra_data = None

    # XSLT Path
    self.xslt_path = os.path.join(r.folder, r.XSLT_PATH)
    self.xslt_extension = r.XSLT_EXTENSION

    # @todo get the data from either get_vars or post_vars appropriately
    #       for post -> commit_items would need to add the uploadID
    get_vars = r.get_vars
    transform = get_vars.get("transform", None)
    source = get_vars.get("filename", None)
    if "job" in r.post_vars:
        upload_id = r.post_vars["job"]
    else:
        upload_id = get_vars.get("job")
    items = self._process_item_list(upload_id, r.vars)
    if "delete" in get_vars:
        # Deletion is requested through a GET variable
        r.http = "DELETE"

    # If we have an upload ID, then get upload and import job
    self.upload_id = upload_id
    query = (self.upload_table.id == upload_id)
    self.upload_job = current.db(query).select(limitby=(0, 1)).first()
    if self.upload_job:
        self.job_id = self.upload_job.job_id
    else:
        self.job_id = None

    # Experimental uploading via ajax
    # Part of the problem with this is that it works directly with the
    # opened file. This might pose a security risk; it should be alright
    # if only trusted users are involved, but care should be taken with this
    self.ajax = current.request.ajax and r.post_vars.approach == "ajax"

    # Now branch off to the appropriate controller function.
    # Every branch either assigns output or raises via r.error, so that
    # the final return can never hit an unbound name.
    output = None
    http = r.http
    if http == "GET":
        if source is not None:
            self.commit(source, transform)
        # Render the response once only (job details take precedence
        # over the upload form)
        if upload_id is not None:
            output = self.display_job(upload_id)
        else:
            output = self.upload(r, **attr)
    elif http == "POST":
        if items is not None:
            output = self.commit_items(upload_id, items)
        else:
            output = self.generate_job(r, **attr)
    elif http == "DELETE":
        if upload_id is not None:
            output = self.delete_job(upload_id)
        else:
            # Delete requested, but no job specified
            r.error(400, messages.no_job_to_delete)
    else:
        r.error(405, current.ERROR.BAD_METHOD)

    return output
285 286 # -------------------------------------------------------------------------
def upload(self, r, **attr):
    """
        Render the upload page: the upload form plus the (currently
        empty) list of previous upload jobs.

        Uploading a file leads to a "POST" which generates an import
        job; the actions on a listed job lead to a "GET" (open/view)
        or a "DELETE" (delete) request with the job ID.

        @param r: the S3Request
        @param attr: controller attributes, passed on to the upload form

        @return: the dataTable output; for interactive requests
                 extended with the form and the page title
    """

    # The form must be built (and processed) before the table output
    upload_form = self._upload_form(r, **attr)
    page = self._create_upload_dataTable()

    if self.request.representation != "aadata":
        page.update(form=upload_form, title=self.uploadTitle)

    return page
316 317 # -------------------------------------------------------------------------
def generate_job(self, r, **attr):
    """
        Generate an ImportJob from the submitted upload form

        @param r: the S3Request
        @param attr: controller attributes, passed on to the upload form

        @return: the output for the view: the upload page if the form
                 or the import failed, otherwise the import job details
                 (or, in ajax mode, a dict {"Error": message} for
                 unsupported file types)
    """

    response = current.response
    s3 = response.s3

    db = current.db
    table = self.upload_table
    messages = self.messages

    output = None
    if self.ajax:
        # No form processing in ajax mode: register the upload directly
        sfilename = ofilename = r.post_vars["file"].filename
        upload_id = table.insert(controller = self.controller,
                                 function = self.function,
                                 filename = ofilename,
                                 file = sfilename,
                                 user_id = current.session.auth.user.id,
                                 )
    else:
        title = self.uploadTitle
        form = self._upload_form(r, **attr)

        r = self.request
        r.read_body()
        sfilename = form.vars.file
        try:
            ofilename = r.post_vars["file"].filename
        except (KeyError, AttributeError):
            form.errors.file = messages.no_file

        if form.errors:
            response.flash = ""
            output = self._create_upload_dataTable()
            output.update(form=form, title=title)

        elif not sfilename or \
             ofilename not in r.files or r.files[ofilename] is None:
            response.flash = ""
            response.error = messages.file_not_found
            output = self._create_upload_dataTable()
            output.update(form=form, title=title)

        else:
            # The upload record has been created by the form processing,
            # complete it with the request details
            query = (table.file == sfilename)
            db(query).update(controller = self.controller,
                             function = self.function,
                             filename = ofilename,
                             user_id = current.session.auth.user.id,
                             )
            row = db(query).select(table.id,
                                   limitby = (0, 1),
                                   ).first()
            upload_id = row.id

    if output:
        # Form errors => show the upload page again
        return output

    # Must commit here to separate this transaction from
    # the trial import phase which will be rolled back.
    db.commit()

    extension = ofilename.rsplit(".", 1).pop()
    if extension not in ("csv", "xls", "xlsx", "xlsm"):
        if self.ajax:
            return {"Error": messages.invalid_file_format}
        response.flash = None
        response.error = messages.invalid_file_format
        return self.upload(r, **attr)

    if self.ajax:
        upload_file = r.post_vars.file.file
    else:
        upload_file = r.files[ofilename]
        if extension == "xls" and "xls_parser" in s3:
            # Survey module currently
            upload_file.seek(0)
            upload_file = s3.xls_parser(upload_file.read())
            extension = "csv"

    if upload_file is None:
        response.flash = None
        response.error = messages.file_not_found
        return self.upload(r, **attr)
    upload_file.seek(0)

    if "single_pass" in r.vars:
        single_pass = r.vars["single_pass"]
    else:
        single_pass = None

    # _generate_import_job returns None if no job could be generated,
    # with the reason in self.error or self.warning
    result = self._generate_import_job(upload_id,
                                       upload_file,
                                       extension,
                                       commit_job = single_pass,
                                       )
    if upload_id is None or result is None:
        # Flag the upload as "in error" and report the reason
        # (select by ID: no other query is defined in ajax mode)
        db(table.id == upload_id).update(status = 2)
        if self.error is not None:
            response.error = self.error
        if self.warning is not None:
            response.warning = self.warning
        response.flash = ""
        return self.upload(r, **attr)

    if single_pass:
        current.session.confirmation = messages.file_uploaded
        # For a single pass retain the vars from the original URL
        next_URL = URL(r = self.request,
                       f = self.function,
                       args = ["import"],
                       vars = current.request.get_vars,
                       )
        redirect(next_URL)

    s3.dataTable_vars = {"job": upload_id}
    return self.display_job(upload_id)
433 434 # -------------------------------------------------------------------------
def display_job(self, upload_id):
    """
        Show the items of an import job, so that the user can select
        those to commit.

        Redirects to the start page if there is no import job for the
        upload (flagging the upload as "in error"), or if the job has
        already been completed.

        @param upload_id: the ID of the upload record

        @return: the import item dataTable output; for interactive
                 requests extended with title, subtitle and rheader
    """

    db = current.db
    request = self.request
    upload_table = self.upload_table
    this_upload = (upload_table.id == upload_id)
    messages = self.messages

    job_id = self.job_id
    if job_id is None:
        # Redirect to the start page (removes all vars)
        db(this_upload).update(status = 2) # in error
        current.session.warning = messages.no_records_to_import
        redirect(URL(r=request, f=self.function, args=["import"]))

    # Get the status of the upload job
    upload = db(this_upload).select(upload_table.status,
                                    upload_table.modified_on,
                                    upload_table.summary_added,
                                    upload_table.summary_error,
                                    upload_table.summary_ignored,
                                    limitby = (0, 1),
                                    ).first()
    if upload.status == 3: # Completed
        # @todo currently this is an unnecessary server call,
        #       change for completed records to be a display details
        #       and thus avoid the round trip.
        #       but keep this code to protect against hand-crafted URLs
        #       (and the 'go back' syndrome on the browser)
        summary = (upload.summary_added,
                   upload.summary_error,
                   upload.summary_ignored,
                   )
        self._display_completed_job(summary, upload.modified_on)
        redirect(URL(r=request, f=self.function, args=["import"]))

    output = self._create_import_item_dataTable(upload_id, job_id)
    if request.representation == "aadata":
        return output

    # Interactive Import
    # Display import items
    response = current.response
    response.view = self._view(request, "list.html")

    # @todo: restore the error tree from all items?
    error_tip = ""
    if response.s3.error_report:
        error_tip = A("All Errors",
                      _class = "errortip",
                      _title = "Errors|" + "|".join(response.s3.error_report),
                      )

    total = len(self._get_all_items(upload_id))
    counters = TR(TH("%s: " % messages.job_total_records),
                  TD(total, _id="totalAvailable"),
                  TH("%s: " % messages.job_records_selected),
                  TD(0, _id="totalSelected"),
                  TH(error_tip),
                  )

    output["title"] = messages.title_job_read
    output["rheader"] = DIV(TABLE(counters))
    output["subtitle"] = messages.title_job_list

    return output
505 506 # -------------------------------------------------------------------------
def commit(self, source, transform):
    """
        Import a file from the server file system in a single pass:
        register the upload, generate the import job and commit all
        of its items.

        Errors and warnings are added to session.error/session.warning,
        the import summary is added to session.confirmation.

        @param source: the path of the file to import
        @param transform: the path of the transformation stylesheet,
                          None to use the default stylesheet
    """

    session = current.session
    messages = self.messages

    try:
        # NOTE(review): spreadsheet formats may require binary mode
        #               on some platforms - to be confirmed
        openFile = open(source, "r")
    except IOError:
        session.error = messages.file_open_error % source
        redirect(URL(r=self.request, f=self.function))

    try:
        # @todo: manage different file formats
        # @todo: find file format from request.extension
        extension = source.rsplit(".", 1).pop()
        if extension not in ("csv", "xls", "xlsx", "xlsm"):
            # Unknown extension => treat as CSV
            fileFormat = "csv"
        else:
            fileFormat = extension

        # Insert data in the table and get the ID
        try:
            user = session.auth.user.id
        except AttributeError:
            # Not logged in
            user = None

        upload_id = self.upload_table.insert(controller = self.controller,
                                             function = self.function,
                                             filename = source,
                                             user_id = user,
                                             status = 1,
                                             )
        current.db.commit()

        # Create the import job
        result = self._generate_import_job(upload_id,
                                           openFile,
                                           fileFormat,
                                           stylesheet = transform,
                                           )
        if result is None:
            # No job generated => report why, there is no summary
            if self.error is not None:
                if session.error is None:
                    session.error = self.error
                else:
                    session.error += self.error
            if self.warning is not None:
                if session.warning is None:
                    session.warning = self.warning
                else:
                    session.warning += self.warning
            return

        # Commit the import job
        items = self._get_all_items(upload_id, True)
        self._commit_import_job(upload_id, items)
        result = self._update_upload_job(upload_id)
    finally:
        # The source file is no longer needed beyond this point
        openFile.close()

    # Get the results and display
    # (presumably result is (imported, errors, ignored) - the order in
    #  which display_job passes the summary; verify against
    #  _update_upload_job)
    # The summary is formatted separately from the file name, so that
    # a "%" in the file name cannot break the formatting
    summary = "%s %s %s" % (messages.commit_total_records_imported,
                            messages.commit_total_errors,
                            messages.commit_total_records_ignored,
                            )
    msg = "%s : %s" % (source, summary % result)

    # Store the message in the session, so that it actually gets shown
    if session.confirmation is None:
        session.confirmation = msg
    else:
        session.confirmation += msg
579 580 # @todo: return the upload_id? 581 582 # -------------------------------------------------------------------------
def commit_items(self, upload_id, items):
    """
        Commit the selected items of an import job and complete the
        upload record (via _update_upload_job).

        @param upload_id: the ID of the upload record
        @param items: the selected import items

        @return: in ajax mode the result of _update_upload_job;
                 otherwise the user is redirected to the start page
    """

    # Save the import items
    self._commit_import_job(upload_id, items)

    # Update the upload table:
    # - change the status to completed
    # - record the summary details
    # - delete the upload file
    summary = self._update_upload_job(upload_id)
    if self.ajax:
        return summary

    # Redirect to the start page (removes all vars)
    self._display_completed_job(summary)
    start_page = URL(r=self.request, f=self.function, args=["import"])
    redirect(start_page)
601 602 # -------------------------------------------------------------------------
def delete_job(self, upload_id):
    """
        Remove an upload record together with its import job (if any),
        then return to the start page.

        @param upload_id: the upload ID
    """

    db = current.db
    request = self.request
    response = current.response

    # Delete the import job (if any)
    # @todo: check the result of the deletion
    if self.job_id:
        # use self.resource?
        request.resource.import_xml(None,
                                    id = None,
                                    tree = None,
                                    job_id = self.job_id,
                                    delete_job = True,
                                    )

    # Delete the upload entry
    # @todo: check that the record has been deleted
    deleted = db(self.upload_table.id == upload_id).delete()

    # Commit the changes
    db.commit()

    # Return to the main import screen
    # @todo: check result properly
    if deleted == 0:
        response.warning = self.messages.no_job_to_delete
    else:
        response.confirmation = self.messages.job_deleted

    # redirect to the start page (remove all vars)
    self.next = request.url(vars={})
648 649 # ======================================================================== 650 # Utility methods 651 # ========================================================================
def _upload_form(self, r, **attr):
    """
        Create and process the upload form, including csv_extra_fields

        @param r: the S3Request
        @param attr: controller attributes, of which these are used:
                     - replace_option: label for the replace-checkbox
                     - csv_extra_fields: list of dicts describing
                       additional form fields/CSV columns
                     - csv_template: the CSV template to link to

        @return: the form
    """

    EXTRA_FIELDS = "csv_extra_fields"
    TEMPLATE = "csv_template"
    REPLACE_OPTION = "replace_option"

    response = current.response
    s3 = response.s3
    request = self.request
    table = self.upload_table

    formstyle = s3.crud.formstyle
    response.view = self._view(request, "list_filter.html")

    if REPLACE_OPTION in attr:
        replace_option = attr[REPLACE_OPTION]
        if replace_option is not None:
            field = table.replace_option
            field.readable = field.writable = True
            field.label = replace_option
            field.comment = DIV(_class="tooltip",
                                _title="%s|%s" % \
                                (replace_option,
                                 current.T("Delete all data of this type which the user has permission to before upload. This is designed for workflows where the data is maintained in an offline spreadsheet and uploaded just for Reads.")))

    # NOTE(review): "and" binds tighter than "or", so readable fields
    #               are included even if they are computed - confirm
    #               that this is intended
    fields = [f for f in table if f.readable or f.writable and not f.compute]
    if EXTRA_FIELDS in attr:
        extra_fields = attr[EXTRA_FIELDS]
        if extra_fields is not None:
            fields.extend([f["field"] for f in extra_fields if "field" in f])
        self.csv_extra_fields = extra_fields
    labels, required = s3_mark_required(fields)
    if required:
        s3.has_required = True

    form = SQLFORM.factory(table_name=self.UPLOAD_TABLE_NAME,
                           labels=labels,
                           formstyle=formstyle,
                           upload = os.path.join(request.folder, "uploads", "imports"),
                           separator = "",
                           message=self.messages.file_uploaded,
                           *fields)

    # Link to the CSV template
    args = ["s3csv"]
    template = attr.get(TEMPLATE, True)
    if template is True:
        args.extend([self.controller, "%s.csv" % self.function])
    elif isinstance(template, basestring):
        args.extend([self.controller, "%s.csv" % template])
    elif isinstance(template, (tuple, list)):
        args.extend(template[:-1])
        args.append("%s.csv" % template[-1])
    else:
        template = None
    if template is not None:
        url = URL(r=request, c="static", f="formats", args=args)
        try:
            # only add the download link if the template can be opened
            # (close the probe handle right away, so it does not leak)
            open("%s/../%s" % (r.folder, url)).close()
        except IOError:
            pass
        else:
            form[0][0].insert(0, TR(TD(A(self.messages.download_template,
                                         _href=url)),
                                    _id="template__row"))

    if form.accepts(r.post_vars, current.session, formname="upload_form"):

        formvars = form.vars

        # Create the upload entry
        table.insert(file = formvars.file)

        # Process extra fields
        if self.csv_extra_fields:

            # Convert Values to Represents
            extra_data = self.csv_extra_data = Storage()
            for f in self.csv_extra_fields:
                label = f.get("label")
                if not label:
                    continue
                field = f.get("field")
                value = f.get("value")
                if field:
                    if field.name in formvars:
                        data = formvars[field.name]
                    else:
                        data = field.default
                    value = data
                    requires = field.requires
                    if not isinstance(requires, (list, tuple)):
                        requires = [requires]
                    if requires:
                        requires = requires[0]
                        if isinstance(requires, IS_EMPTY_OR):
                            requires = requires.other
                        try:
                            options = requires.options()
                        except Exception:
                            # Validator without (usable) options
                            # => use the value as-is
                            pass
                        else:
                            for k, v in options:
                                if k == str(data):
                                    value = v
                                    break
                            if hasattr(value, "m"):
                                # Don't translate: XSLT expects English
                                value = value.m
                elif value is None:
                    continue
                extra_data[label] = value

    s3.no_formats = True
    return form
769 770 # -------------------------------------------------------------------------
def _create_upload_dataTable(self):
    """
        Set up the list of previous import jobs: the response filter,
        the row actions and the row styles (the list itself is hidden
        for now, so the output is an empty dict)

        @return: the output dict
    """

    db = current.db
    s3 = current.response.s3
    request = self.request
    controller, function = self.controller, self.function
    table = self.upload_table

    def job_ids(query):
        """ The IDs (as strings) of all upload records matching query """
        return [str(row.id) for row in db(query).select(table.id)]

    def job_url(**extra):
        """ The import URL for a job, with additional URL vars """
        url_vars = {"job": "[id]"}
        url_vars.update(extra)
        return URL(r=request,
                   c=controller,
                   f=function,
                   args=["import"],
                   vars=url_vars)

    s3.filter = (table.controller == controller) & \
                (table.function == function)

    self._use_upload_table()

    # Hide the list of prior uploads for now
    #output = self._dataTable(fields, sort_by = [[2,"desc"]])
    output = {}

    self._use_controller_table()

    if request.representation == "aadata":
        return output

    messages = self.messages
    s3.actions = [{"label": str(messages.open_btn),
                   "_class": "action-btn",
                   "url": job_url(),
                   # Status of Pending or in-Error
                   "restrict": job_ids(table.status != 3),
                   },
                  {"label": str(messages.view_btn),
                   "_class": "action-btn",
                   "url": job_url(),
                   # Status of Completed
                   "restrict": job_ids(table.status == 3),
                   },
                  {"label": str(messages.delete_btn),
                   "_class": "delete-btn",
                   "url": job_url(delete="True"),
                   },
                  ]

    # Display an Error if no job is attached with this record
    s3.dataTableStyleAlert = job_ids(table.status == 1)   # Pending
    s3.dataTableStyleWarning = job_ids(table.status == 2) # in error

    return output
850 851 # -------------------------------------------------------------------------
def _create_import_item_dataTable(self, upload_id, job_id):
    """
        Build the dataTable of the items in an import job

        @param upload_id: the ID of the upload record
        @param job_id: the ID of the import job

        @return: the dataTable output; in ajax mode a tuple
                 (upload_id, IDs of items without error, item rows)
    """

    s3 = current.response.s3

    represent = {"s3_import_item.element": self._item_element_represent}
    self._use_import_item_table(job_id)
    item_table = self.table

    # The items of this job for the target table
    query = (item_table.job_id == job_id) & \
            (item_table.tablename == self.controller_tablename)
    items = current.db(query).select(item_table.id, item_table.error)

    # Items in error get highlighted, all others can be selected
    error_list = [str(item.id) for item in items if item.error]
    select_list = ["%s" % item.id for item in items if not item.error]

    # Add a filter to the dataTable query
    s3.filter = query

    # Experimental uploading via ajax
    if self.ajax:
        resource = self.resource
        resource.add_filter(query)
        data = resource.select(["id", "element", "error"], limit=None)
        return (upload_id, select_list, data["rows"])

    # Toggle-button for item details
    toggle = {"label": str(self.messages.item_show_details),
              "_class": "action-btn toggle-item",
              }
    s3.actions = [toggle]
    s3.jquery_ready.append('''
$('#import-items').on('click','.toggle-item',function(){$('.importItem.item-'+$(this).attr('db_id')).toggle();})''')

    output = self._dataTable(["id", "element", "error"],
                             #sort_by = [[1, "asc"]],
                             represent = represent,
                             ajax_item_id = upload_id,
                             dt_bulk_select = select_list,
                             )

    self._use_controller_table()

    if self.request.representation == "aadata":
        return output

    # Highlight rows in error in red
    s3.dataTableStyleWarning = error_list

    # Pass the upload ID with the item selection
    output["items"].append(INPUT(_type = "hidden",
                                 _id = "importUploadID",
                                 _name = "job",
                                 _value = "%s" % upload_id,
                                 ))
    return output
913 914 # -------------------------------------------------------------------------
def _generate_import_job(self,
                         upload_id,
                         openFile,
                         fileFormat,
                         stylesheet=None,
                         commit_job=False):
    """
        Generate the import job for a s3_import_upload record and
        link it to that record

        @param upload_id: the ID of the s3_import_upload record
        @param openFile: the source, passed on to import_xml
        @param fileFormat: the source file format ("csv",
                           "comma-separated-values", "xls", "xlsx"
                           or "xlsm")
        @param stylesheet: the transformation stylesheet, None for
                           the stylesheet found by _get_stylesheet
        @param commit_job: passed on to import_xml

        @return: True if a job has been created, otherwise None
                 (with the reason in self.error or self.warning)
    """

    # Supported source formats and their import formats
    # @todo: implement XML, S3JSON and PDF
    import_formats = {"csv": "csv",
                      "comma-separated-values": "csv",
                      "xls": "xls",
                      "xlsx": "xls",
                      "xlsm": "xls",
                      }
    import_format = import_formats.get(fileFormat)
    if import_format is None:
        # Unsupported Format
        msg = self.messages.unsupported_file_type % fileFormat
        self.error = msg
        current.log.debug(msg)
        return None

    # Get the stylesheet
    if stylesheet is None:
        stylesheet = self._get_stylesheet()
        if stylesheet is None:
            return None

    request = self.request
    resource = request.resource

    # Before calling import tree ensure the db.table is the controller_table
    self.table = self.controller_table
    self.tablename = self.controller_tablename

    # Pass stylesheet arguments
    xslt_args = Storage()
    xslt_mode = request.get_vars.get("xsltmode", None)
    if xslt_mode is not None:
        xslt_args.update(mode=xslt_mode)

    # Generate the import job
    resource.import_xml(openFile,
                        format = import_format,
                        extra_data = self.csv_extra_data,
                        stylesheet = stylesheet,
                        ignore_errors = True,
                        commit_job = commit_job,
                        **xslt_args)

    job = resource.job
    if job is None:
        if resource.error:
            # Error
            self.error = resource.error
        else:
            # Nothing to import
            self.warning = self.messages.no_records_to_import
        return None

    # Job created
    db = current.db
    job_id = job.job_id
    errors = current.xml.collect_errors(job)
    if errors:
        current.response.s3.error_report = errors

    # Link the job to the upload record
    # @todo: add check that exactly one record is updated,
    #        if not we are in error
    db(self.upload_table.id == upload_id).update(job_id=job_id)
    # Now commit the changes
    db.commit()

    self.job_id = job_id
    return True
1026 1027 # -------------------------------------------------------------------------
1028 - def _get_stylesheet(self, file_format="csv"):
1029 """ 1030 Get the stylesheet for transformation of the import 1031 1032 @param file_format: the import source file format 1033 """ 1034 1035 if file_format == "csv": 1036 xslt_path = os.path.join(self.xslt_path, "s3csv") 1037 else: 1038 xslt_path = os.path.join(self.xslt_path, file_format, "import.xsl") 1039 return xslt_path 1040 1041 # Use the "csv_stylesheet" parameter to override the CSV stylesheet subpath 1042 # and filename, e.g. 1043 # s3_rest_controller(module, resourcename, 1044 # csv_stylesheet=("inv", "inv_item.xsl")) 1045 if self.csv_stylesheet: 1046 if isinstance(self.csv_stylesheet, (tuple, list)): 1047 stylesheet = os.path.join(xslt_path, 1048 *self.csv_stylesheet) 1049 else: 1050 stylesheet = os.path.join(xslt_path, 1051 self.controller, 1052 self.csv_stylesheet) 1053 else: 1054 xslt_filename = "%s.%s" % (self.function, self.xslt_extension) 1055 stylesheet = os.path.join(xslt_path, 1056 self.controller, 1057 xslt_filename) 1058 1059 if os.path.exists(stylesheet) is False: 1060 msg = self.messages.stylesheet_not_found % stylesheet 1061 self.error = msg 1062 current.log.debug(msg) 1063 return None 1064 1065 return stylesheet
1066 1067 # -------------------------------------------------------------------------
def _commit_import_job(self, upload_id, items):
    """
        Commit the selected items of an import job

        @param upload_id: the record ID of the upload (in the upload table)
        @param items: the IDs (as strings) of the items to import; all
                      other items of this upload for the controller
                      table are deleted before the commit

        @return: False if there is no upload record with this ID,
                 otherwise True if the resource reports no error after
                 the import, else False
    """

    db = current.db
    resource = self.request.resource

    # Snapshots of the import items are collected here per stage
    self.importDetails = {}

    utable = self.upload_table
    upload = db(utable.id == upload_id).select(utable.job_id,
                                               utable.replace_option,
                                               limitby = (0, 1),
                                               ).first()
    if upload is None:
        return False

    job_id = upload.job_id
    current.response.s3.import_replace = upload.replace_option

    item_table = S3ImportJob.define_item_table()

    if item_table != None:
        #****************************************************************
        # EXPERIMENTAL
        # This doesn't delete related items
        # but import_tree will tidy it up later
        #****************************************************************
        # All items of this upload (IDs as strings)
        item_ids = self._get_all_items(upload_id, as_string=True)

        # Snapshot before deleting, then remove all items which have
        # not been selected for import
        self._store_import_details(job_id, "preDelete")
        for item_id in item_ids:
            if str(item_id) in items:
                continue
            # @todo: replace with a helper method from the API
            db(item_table.id == item_id).delete()

        #****************************************************************
        # EXPERIMENTAL
        #****************************************************************

        # Switch to the table we will import data into
        self.table = self.controller_table
        self.tablename = self.controller_tablename

        # Snapshot of the remaining items
        self._store_import_details(job_id, "preImportTree")

        # Now commit the remaining items
        resource.import_xml(None, job_id=job_id, ignore_errors=True)
        return resource.error is None
1125 1126 # -------------------------------------------------------------------------
def _store_import_details(self, job_id, key):
    """
        Take a snapshot (data and error) of all import items of a job
        which belong to the controller table, and store it in
        self.importDetails

        @param job_id: the import job ID
        @param key: the key to store the snapshot under
    """

    item_table = S3ImportJob.define_item_table()

    job_items = (item_table.job_id == job_id) & \
                (item_table.tablename == self.controller_tablename)

    snapshot = []
    for item in current.db(job_items).select(item_table.data,
                                             item_table.error):
        snapshot.append({"data": item.data, "error": item.error})

    self.importDetails[key] = snapshot
1144 1145 # -------------------------------------------------------------------------
def _update_upload_job(self, upload_id):
    """
        Write the import totals into the upload record and set its
        status to 3, then commit the transaction

        @param upload_id: the record ID of the upload (in the upload table)

        @return: tuple (total imported, total errors, total ignored)

        @todo: report errors in referenced records, too
    """

    resource = self.request.resource
    db = current.db

    details = self.importDetails
    before_delete = len(details["preDelete"])
    before_import = len(details["preImportTree"])

    # Items which were removed before the import
    ignored = before_delete - before_import

    # Failed items = error elements for the target table
    error_tree = resource.error_tree
    if error_tree is not None:
        xpath = "resource[@name='%s']" % resource.tablename
        failed = len(error_tree.findall(xpath))
    else:
        failed = 0

    # Never report a negative number of imported records
    imported = max(before_import - failed, 0)

    db(self.upload_table.id == upload_id).update(summary_added = imported,
                                                 summary_error = failed,
                                                 summary_ignored = ignored,
                                                 status = 3,
                                                 )

    # Commit the changes
    db.commit()
    return (imported, failed, ignored)
1183 1184 # -------------------------------------------------------------------------
def _display_completed_job(self, totals, timestmp=None):
    """
        Generate a summary flash message for a completed import job

        The message goes into session.flash if a timestamp is given,
        otherwise into session.error if there were errors, into
        session.warning if records were ignored, else into
        session.flash.

        @param totals: the job totals as tuple
                       (total imported, total errors, total ignored)
        @param timestmp: the timestamp of the completion
    """

    messages = self.messages
    msg = "%s - %s - %s" % \
          (messages.commit_total_records_imported,
           messages.commit_total_errors,
           messages.commit_total_records_ignored)
    msg = msg % totals

    session = current.session
    if timestmp is not None:
        session.flash = messages.job_completed % \
                        (self.date_represent(timestmp), msg)
    # Compare the counts by value: identity comparison with an integer
    # literal ("is not 0") relies on an interpreter implementation
    # detail and fails e.g. for long(0)
    elif totals[1] != 0:
        session.error = msg
    elif totals[2] != 0:
        session.warning = msg
    else:
        session.flash = msg
1210 1211 # -------------------------------------------------------------------------
def _dataTable(self,
               list_fields,
               #sort_by = [[1, "asc"]],
               represent=None,
               ajax_item_id=None,
               dt_bulk_select=None,
               ):
    """
        Get the data for the dataTable, either as initial HTML
        representation or as JSON for an Ajax update ("aadata")

        @param list_fields: list of field selectors
        @param represent: a dict {column: callback} to change how the
                          data will be displayed; each callback is
                          called with (record_id, value) and its return
                          value replaces the value in the row
        @param ajax_item_id: the job ID to add to the Ajax URL (job=...)
                             of the initial HTML representation
        @param dt_bulk_select: passed on to the data table as
                               dt_bulk_selected (initial HTML
                               representation only)

        @return: for "aadata" requests the output of S3DataTable.json,
                 otherwise a dict {"items": <output of S3DataTable.html>}
    """

    from s3data import S3DataTable
    request = self.request
    resource = self.resource
    s3 = current.response.s3

    # Controller Filter
    if s3.filter is not None:
        resource.add_filter(s3.filter)

    representation = request.representation

    # Datatable Filter
    totalrows = None
    if representation == "aadata":
        searchq, orderby, left = resource.datatable_filter(list_fields,
                                                           request.get_vars)
        if searchq is not None:
            # Count before applying the search filter
            totalrows = resource.count()
            resource.add_filter(searchq)
    else:
        orderby, left = None, None

    # Start/Limit
    if representation == "aadata":
        get_vars = request.get_vars
        start = get_vars.get("displayStart", None)
        limit = get_vars.get("pageLength", None)
        draw = int(get_vars.draw or 0)
    else: # catch all
        start = 0
        limit = s3.ROWSPERPAGE
    if limit is not None:
        try:
            start = int(start)
            limit = int(limit)
        except (ValueError, TypeError):
            # Invalid or missing values (e.g. pageLength given without
            # displayStart, so that start is None) => use default
            start = None
            limit = None
    else:
        start = None # use default

    if not orderby:
        # Default order: by the error column, descending
        orderby = ~resource.table.error

    data = resource.select(list_fields,
                           start=start,
                           limit=limit,
                           count=True,
                           orderby=orderby,
                           left=left)
    rows = data["rows"]

    displayrows = data["numrows"]
    if totalrows is None:
        totalrows = displayrows

    # Represent the data
    if represent:
        _represent = represent.items()
        for row in rows:
            record_id = row["s3_import_item.id"]
            for column, method in _represent:
                if column in row:
                    row[column] = method(record_id, row[column])

    # Build the datatable
    rfields = resource.resolve_selectors(list_fields)[0]
    dt = S3DataTable(rfields, rows, orderby=orderby)
    datatable_id = "import-items"
    if representation == "aadata":
        # Ajax callback
        output = dt.json(totalrows,
                         displayrows,
                         datatable_id,
                         draw,
                         dt_bulk_actions = [current.T("Import")])
    else:
        # Initial HTML response
        url = "/%s/%s/%s/import.aadata?job=%s" % (request.application,
                                                  request.controller,
                                                  request.function,
                                                  ajax_item_id)
        items = dt.html(totalrows,
                        displayrows,
                        datatable_id,
                        dt_ajax_url=url,
                        dt_bulk_actions = [current.T("Import")],
                        dt_bulk_selected = dt_bulk_select)
        output = {"items":items}

    current.response.s3.dataTableID = [datatable_id]

    return output
1337 1338 # -------------------------------------------------------------------------
def _item_element_represent(self, item_id, value):
    """
        Represent the element in an import item for dataTable display

        @param item_id: the import item ID (used in the CSS class
                        "item-<id>" of the details table)
        @param value: the string containing the element

        @return: a DIV with the item header and details; if the value
                 cannot be parsed as XML, a DIV with the raw value
    """

    try:
        element = etree.fromstring(value)
    except Exception:
        # Not parseable => show the raw value (KeyboardInterrupt and
        # SystemExit are deliberately not caught here)
        return DIV(value)

    db = current.db
    tablename = element.get("name")
    table = db[tablename]

    output = DIV()
    details = TABLE(_class="importItem item-%s" % item_id)
    header, rows = self._add_item_details(element.findall("data"), table)
    if header is not None:
        output.append(header)
    # Add components, if present
    components = element.findall("resource")
    s3db = current.s3db
    for component in components:
        ctablename = component.get("name")
        ctable = s3db.table(ctablename)
        if not ctable:
            continue
        # Appends to the same rows list, with tablename-prefixed labels
        self._add_item_details(component.findall("data"), ctable,
                               details=rows, prefix=True)
    if rows:
        details.append(TBODY(rows))
    # Add error messages, if present
    errors = current.xml.collect_errors(element)
    if errors:
        details.append(TFOOT(TR(TH("%s:" % current.T("Errors")),
                                TD(UL([LI(e) for e in errors])))))
    if rows == [] and components == []:
        # At this stage we don't have anything to display to see if we can
        # find something to show. This could be the case when a table being
        # imported is a resolver for a many to many relationship
        refdetail = TABLE(_class="importItem item-%s" % item_id)
        references = element.findall("reference")
        for reference in references:
            tuid = reference.get("tuid")
            resource = reference.get("resource")
            refdetail.append(TR(TD(resource), TD(tuid)))
        output.append(refdetail)
    else:
        output.append(details)
    return output
1391 1392 # ------------------------------------------------------------------------- 1393 @staticmethod
def _add_item_details(data, table, details=None, prefix=False):
    """
        Add details of the item element

        @param data: the list of data elements in the item element
        @param table: the table for the data
        @param details: the existing details rows list (to append to)
        @param prefix: prefix the field names in the row labels with
                       the table name ("tablename.fieldname:")

        @return: tuple (header, details), where header is the
                 representation of the "name" field if present,
                 otherwise of the first string-type field, otherwise
                 of the first field (None if there is no field at all)
    """

    tablename = table._tablename
    if details is None:
        details = []
    first = None
    firstString = None
    header = None
    for child in data:
        f = child.get("field", None)
        if f not in table.fields:
            continue
        elif f == "wkt":
            # Skip bulky WKT fields
            continue
        field = table[f]
        ftype = str(field.type)
        value = child.get("value", None)
        if not value:
            value = current.xml.xml_decode(child.text)
        try:
            value = S3Importer._decode_data(field, value)
        except (ValueError, TypeError):
            # Value cannot be converted, e.g. non-numeric text for a
            # numeric field (ValueError), or no value at all
            # (TypeError from int(None)/float(None) - assumes that
            # xml_decode passes None through, TODO confirm)
            # => represent the value as-is
            pass
        if value:
            value = s3_unicode(value)
        else:
            value = ""
        if f is not None and value is not None:
            headerText = P(B("%s: " % f), value)
            if not first:
                first = headerText
            if ftype == "string" and not firstString:
                firstString = headerText
            if f == "name":
                header = headerText
            if prefix:
                details.append(TR(TH("%s.%s:" % (tablename, f)), TD(value)))
            else:
                details.append(TR(TH("%s:" % f), TD(value)))
    if not header:
        # No "name" field => fall back to the first string field,
        # or else the first field
        if firstString:
            header = firstString
        else:
            header = first
    return (header, details)
1447 1448 # ------------------------------------------------------------------------- 1449 @staticmethod
1450 - def _decode_data(field, value):
1451 """ 1452 Try to decode string data into their original type 1453 1454 @param field: the Field instance 1455 @param value: the stringified value 1456 1457 @todo: replace this by ordinary decoder 1458 """ 1459 1460 if field.type == "string" or \ 1461 field.type == "password" or \ 1462 field.type == "upload" or \ 1463 field.type == "text": 1464 return value 1465 elif field.type == "integer" or \ 1466 field.type == "id": 1467 return int(value) 1468 elif field.type == "double" or \ 1469 field.type == "decimal": 1470 return float(value) 1471 elif field.type == "boolean": 1472 if value and not str(value)[:1].upper() in ["F", "0"]: 1473 return "T" 1474 else: 1475 return "F" 1476 elif field.type == "date": 1477 return value # @todo fix this to get a date 1478 elif field.type == "time": 1479 return value # @todo fix this to get a time 1480 elif field.type == "datetime": 1481 return value # @todo fix this to get a datetime 1482 else: 1483 return value
1484 1485 # ------------------------------------------------------------------------- 1486 @staticmethod
def date_represent(date_obj):
    """
        Format a datetime object for display, as
        "<day> <month name> <year>, <12-hour time><AM/PM>"

        @param date_obj: the datetime object

        @return: the formatted string

        @todo: replace by S3DateTime method?
    """

    display_format = "%d %B %Y, %I:%M%p"
    return date_obj.strftime(display_format)
1497 1498 # -------------------------------------------------------------------------
1499 - def _process_item_list(self, upload_id, req_vars):
1500 """ 1501 Get the list of IDs for the selected items from the "mode" 1502 and "selected" request variables 1503 1504 @param upload_id: the upload_id 1505 @param vars: the request variables 1506 """ 1507 1508 items = None 1509 if "mode" in req_vars: 1510 mode = req_vars["mode"] 1511 selected = req_vars.get("selected", []) 1512 if mode == "Inclusive": 1513 items = selected 1514 elif mode == "Exclusive": 1515 all_items = self._get_all_items(upload_id, as_string=True) 1516 items = [i for i in all_items if i not in selected] 1517 return items
1518 1519 # -------------------------------------------------------------------------
def _get_all_items(self, upload_id, as_string=False):
    """
        Look up the record IDs of all import items for the given
        upload ID which belong to the controller table

        @param upload_id: the upload ID
        @param as_string: represent each ID as string

        @return: list of import item IDs
    """

    itable = S3ImportJob.define_item_table()
    utable = self.upload_table

    # Items are linked to the upload record through the job ID
    query = (utable.id == upload_id) & \
            (itable.job_id == utable.job_id) & \
            (itable.tablename == self.controller_tablename)

    item_ids = [row.id for row in current.db(query).select(itable.id)]
    if as_string:
        item_ids = [str(item_id) for item_id in item_ids]

    return item_ids
1543 1544 # -------------------------------------------------------------------------
1545 - def _use_upload_table(self):
1546 """ 1547 Set the resource and the table to being s3_import_upload 1548 """ 1549 1550 self.tablename = self.upload_tablename 1551 if self.upload_resource is None: 1552 self.upload_resource = current.s3db.resource(self.tablename) 1553 self.resource = self.upload_resource 1554 self.table = self.upload_table
1555 1556 # -------------------------------------------------------------------------
1557 - def _use_controller_table(self):
1558 """ 1559 Set the resource and the table to be the imported resource 1560 """ 1561 1562 self.resource = self.controller_resource 1563 self.table = self.controller_table 1564 self.tablename = self.controller_tablename
1565 1566 # -------------------------------------------------------------------------
def _use_import_item_table(self, job_id):
    """
        Switch self.table, self.tablename and self.resource to the
        import item table (the item resource is instantiated on
        first use)

        @param job_id: the import job ID (not used in this method)
    """

    self.table = S3ImportJob.define_item_table()
    tablename = self.tablename = S3ImportJob.ITEM_TABLE_NAME

    if self.item_resource == None:
        self.item_resource = current.s3db.resource(tablename)

    self.resource = self.item_resource
1576 1577 # -------------------------------------------------------------------------
def __define_table(self):
    """
        Configures the upload table

        Defines the upload table (via define_upload_table), sets
        labels, comments, validators, representations and defaults
        for its fields, and stores table name and table in
        self.upload_tablename and self.upload_table.
    """

    #current.log.debug("S3Importer.__define_table()")

    T = current.T
    request = current.request

    self.upload_tablename = self.UPLOAD_TABLE_NAME

    # Status options of an upload
    import_upload_status = {
        1: T("Pending"),
        2: T("In error"),
        3: T("Completed"),
    }

    now = request.utcnow
    table = self.define_upload_table()
    # NOTE(review): define_upload_table configures the folder with the
    # Field keyword "uploadfolder" (pointing to uploads/imports), whereas
    # this assigns an attribute named "upload_folder" - presumably this
    # assignment does not change where files are stored; confirm before
    # renaming, as that would change the storage location
    table.file.upload_folder = os.path.join(request.folder,
                                            "uploads",
                                            #"imports"
                                            )
    messages = self.messages
    # Tooltip for the file field, as "title|comment"
    table.file.comment = DIV(_class="tooltip",
                             _title="%s|%s" % (messages.import_file,
                                               messages.import_file_comment))
    table.file.label = messages.import_file
    table.status.requires = IS_IN_SET(import_upload_status, zero=None)
    # Represent the status by its label, unknown status codes by the
    # generic UNKNOWN_OPT message
    table.status.represent = lambda opt: \
        import_upload_status.get(opt, current.messages.UNKNOWN_OPT)
    table.user_id.label = messages.user_name
    table.user_id.represent = s3_auth_user_represent_name
    # Timestamps: default to the request time, modified_on is also
    # set to the request time on update
    table.created_on.default = now
    table.created_on.represent = self.date_represent
    table.modified_on.default = now
    table.modified_on.update = now
    table.modified_on.represent = self.date_represent

    table.replace_option.label = T("Replace")

    self.upload_table = current.db[self.UPLOAD_TABLE_NAME]
1619 1620 # ------------------------------------------------------------------------- 1621 @classmethod
def define_upload_table(cls):
    """
        Defines the upload table

        The table is only defined if the database does not have a
        table with the name cls.UPLOAD_TABLE_NAME yet.

        @return: the upload table
    """


    # @todo: move into s3db/s3.py
    db = current.db
    UPLOAD_TABLE_NAME = cls.UPLOAD_TABLE_NAME
    if UPLOAD_TABLE_NAME not in db:
        db.define_table(UPLOAD_TABLE_NAME,
                        # Controller and function of the import
                        Field("controller",
                              readable = False,
                              writable = False),
                        Field("function",
                              readable = False,
                              writable = False),
                        # The uploaded file, stored in uploads/imports
                        # of the application folder
                        Field("file", "upload",
                              length = current.MAX_FILENAME_LENGTH,
                              uploadfolder = os.path.join(current.request.folder,
                                                          "uploads", "imports"),
                              autodelete = True),
                        Field("filename",
                              readable = False,
                              writable = False),
                        # Status code, labels for the codes 1, 2 and 3
                        # are set in __define_table
                        Field("status", "integer",
                              default=1,
                              readable = False,
                              writable = False),
                        Field("extra_data",
                              readable = False,
                              writable = False),
                        Field("replace_option", "boolean",
                              default=False,
                              readable = False,
                              writable = False),
                        # ID of the import job for this upload
                        Field("job_id", length=128,
                              readable = False,
                              writable = False),
                        Field("user_id", "integer",
                              readable = False,
                              writable = False),
                        Field("created_on", "datetime",
                              readable = False,
                              writable = False),
                        Field("modified_on", "datetime",
                              readable = False,
                              writable = False),
                        # Totals of the import (see _update_upload_job)
                        Field("summary_added", "integer",
                              readable = False,
                              writable = False),
                        Field("summary_error", "integer",
                              readable = False,
                              writable = False),
                        Field("summary_ignored", "integer",
                              readable = False,
                              writable = False),
                        Field("completed_details", "text",
                              readable = False,
                              writable = False))

    return db[UPLOAD_TABLE_NAME]
1682
1683 # ============================================================================= 1684 -class S3ImportItem(object):
1685 """ Class representing an import item (=a single record) """ 1686 1687 METHOD = Storage( 1688 CREATE = "create", 1689 UPDATE = "update", 1690 DELETE = "delete", 1691 MERGE = "merge" 1692 ) 1693 1694 POLICY = Storage( 1695 THIS = "THIS", # keep local instance 1696 OTHER = "OTHER", # update unconditionally 1697 NEWER = "NEWER", # update if import is newer 1698 MASTER = "MASTER" # update if import is master 1699 ) 1700 1701 # -------------------------------------------------------------------------
1702 - def __init__(self, job):
1703 """ 1704 Constructor 1705 1706 @param job: the import job this item belongs to 1707 """ 1708 1709 self.job = job 1710 1711 # Locking and error handling 1712 self.lock = False 1713 self.error = None 1714 1715 # Identification 1716 self.item_id = uuid.uuid4() # unique ID for this item 1717 self.id = None 1718 self.uid = None 1719 1720 # Data elements 1721 self.table = None 1722 self.tablename = None 1723 self.element = None 1724 self.data = None 1725 self.original = None 1726 self.components = [] 1727 self.references = [] 1728 self.load_components = [] 1729 self.load_references = [] 1730 self.parent = None 1731 self.skip = False 1732 1733 # Conflict handling 1734 self.mci = 2 1735 self.mtime = datetime.datetime.utcnow() 1736 self.modified = True 1737 self.conflict = False 1738 1739 # Allowed import methods 1740 self.strategy = job.strategy 1741 # Update and conflict resolution policies 1742 self.update_policy = job.update_policy 1743 self.conflict_policy = job.conflict_policy 1744 1745 # Actual import method 1746 self.method = None 1747 1748 self.onvalidation = None 1749 self.onaccept = None 1750 1751 # Item import status flags 1752 self.accepted = None 1753 self.permitted = False 1754 self.committed = False 1755 1756 # Writeback hook for circular references: 1757 # Items which need a second write to update references 1758 self.update = []
1759 1760 # -------------------------------------------------------------------------
1761 - def __repr__(self):
1762 """ Helper method for debugging """ 1763 1764 _str = "<S3ImportItem %s {item_id=%s uid=%s id=%s error=%s data=%s}>" % \ 1765 (self.table, self.item_id, self.uid, self.id, self.error, self.data) 1766 return _str
1767 1768 # -------------------------------------------------------------------------
def parse(self,
          element,
          original=None,
          table=None,
          tree=None,
          files=None):
    """
        Read data from a <resource> element

        @param element: the element
        @param original: the UID of the original record (string) to
                         load it from the database; if None, the
                         original is looked up with S3Resource.original;
                         any other value is discarded
        @param table: the DB table (if None, the table is determined
                      from the name attribute of the element)
        @param tree: the import tree (not used in this method)
        @param files: uploaded files

        @return: True if successful, False if not (sets self.error)
    """

    s3db = current.s3db
    xml = current.xml

    ERROR = xml.ATTRIBUTE["error"]

    self.element = element
    if table is None:
        tablename = element.get(xml.ATTRIBUTE["name"])
        table = s3db.table(tablename)
        if table is None:
            # Unknown table => report the error in the element
            self.error = current.ERROR.BAD_RESOURCE
            element.set(ERROR, s3_unicode(self.error))
            return False
    else:
        tablename = table._tablename

    self.table = table
    self.tablename = tablename

    UID = xml.UID

    if original is None:
        original = S3Resource.original(table, element,
                                       mandatory = self._mandatory_fields())
    elif isinstance(original, basestring) and UID in table.fields:
        # Single-component update in add-item => load the original now
        query = (table[UID] == original)
        # Load the fields with unique-constraint
        pkeys = set(fname for fname in table.fields if table[fname].unique)
        fields = S3Resource.import_fields(table, pkeys,
                                          mandatory = self._mandatory_fields())
        original = current.db(query).select(limitby=(0, 1), *fields).first()
    else:
        # Neither None nor a UID string (or table without UID field)
        original = None

    postprocess = s3db.get_config(tablename, "xml_post_parse")
    data = xml.record(table, element,
                      files=files,
                      original=original,
                      postprocess=postprocess)

    if data is None:
        # Reject the item, report the error in the element unless
        # it already carries an error message
        self.error = current.ERROR.VALIDATION_ERROR
        self.accepted = False
        if not element.get(ERROR, False):
            element.set(ERROR, s3_unicode(self.error))
        return False

    self.data = data

    MCI = xml.MCI
    MTIME = xml.MTIME

    self.uid = data.get(UID)
    if original is not None:

        self.original = original
        self.id = original[table._id.name]

        # Keep the UID of the original, unless in synchronise_uuids mode
        if not current.response.s3.synchronise_uuids and UID in original:
            self.uid = self.data[UID] = original[UID]

    # Override the defaults for mtime and mci if the data contain them
    if MTIME in data:
        self.mtime = data[MTIME]
    if MCI in data:
        self.mci = data[MCI]

    #current.log.debug("New item: %s" % self)
    return True
1854 1855 # -------------------------------------------------------------------------
def deduplicate(self):
    """
        Detect whether this is an update or a new record

        Sets self.method, self.id, self.uid and self.original (and
        self.skip for the deletion of a non-existent record); does
        nothing if the item has no table, or already has a record ID.
    """

    table = self.table
    if table is None or self.id:
        return

    METHOD = self.METHOD
    CREATE = METHOD["CREATE"]
    UPDATE = METHOD["UPDATE"]
    DELETE = METHOD["DELETE"]
    MERGE = METHOD["MERGE"]

    xml = current.xml
    UID = xml.UID

    data = self.data
    if self.job.second_pass and UID in table.fields:
        uid = data.get(UID)
        if uid and not self.element.get(UID) and not self.original:
            # Previously identified original does no longer exist
            del data[UID]

    mandatory = self._mandatory_fields()

    # Use the known original, otherwise try to find it from the data
    if self.original is not None:
        original = self.original
    elif self.data:
        original = S3Resource.original(table,
                                       self.data,
                                       mandatory=mandatory,
                                       )
    else:
        original = None

    synchronise_uuids = current.response.s3.synchronise_uuids

    # Deleted records: merge if there is a replaced-by value,
    # otherwise delete
    deleted = data[xml.DELETED]
    if deleted:
        if data[xml.REPLACEDBY]:
            self.method = MERGE
        else:
            self.method = DELETE

    self.uid = data.get(UID)

    if original is not None:

        # The original record could be identified by a unique-key-match,
        # so this must be an update
        self.id = original[table._id.name]

        if not deleted:
            self.method = UPDATE

    else:

        if UID in data and not synchronise_uuids:
            # The import item has a UUID but there is no match
            # in the database, so this must be a new record
            self.id = None
            if not deleted:
                self.method = CREATE
            else:
                # Nonexistent record to be deleted => skip
                self.method = DELETE
                self.skip = True
        else:
            # Use the resource's deduplicator to identify the original
            resolve = current.s3db.get_config(self.tablename, "deduplicate")
            if data and resolve:
                resolve(self)

        # NOTE(review): presumably the deduplicator sets self.id and
        # self.method on a match - confirm against the resolvers
        if self.id and self.method in (UPDATE, DELETE, MERGE):
            # Retrieve the original
            fields = S3Resource.import_fields(table,
                                              data,
                                              mandatory=mandatory,
                                              )
            original = current.db(table._id == self.id) \
                              .select(limitby=(0, 1), *fields).first()

    # Retain the original UUID (except in synchronise_uuids mode)
    if original and not synchronise_uuids and UID in original:
        self.uid = data[UID] = original[UID]

    self.original = original
1945 1946 # -------------------------------------------------------------------------
def authorize(self):
    """
        Determine the import method for this item and check whether
        the current user is permitted to apply it, sets self.method
        and self.permitted

        @return: False if the item has no table or the table is
                 protected, otherwise the value of self.permitted
    """

    table = self.table
    if not table:
        return False

    auth = current.auth
    tablename = self.tablename

    # Tables with a protected prefix can only be imported with override
    if not auth.override and tablename.split("_", 1)[0] in auth.PROTECTED:
        return False

    # Determine the method
    METHOD = self.METHOD
    data = self.data
    if data.deleted is True:
        # Deleted record: merge if replaced by another record
        self.method = METHOD["MERGE"] if data.deleted_rb else METHOD["DELETE"]
        self.accepted = bool(self.id)
    elif self.id:
        if not self.original:
            # Load the original record
            fields = S3Resource.import_fields(table, data,
                                              mandatory = self._mandatory_fields(),
                                              )
            query = (table.id == self.id)
            self.original = current.db(query).select(limitby = (0, 1),
                                                     *fields).first()
        # Without original record this can only be a new record
        self.method = METHOD["UPDATE"] if self.original else METHOD["CREATE"]
    else:
        self.method = METHOD["CREATE"]

    # New records are authorized with record ID 0
    if self.method == METHOD["CREATE"]:
        self.id = 0

    # Authorization
    has_permission = current.auth.s3_has_permission
    if has_permission:
        permitted = has_permission(self.method,
                                   tablename,
                                   record_id = self.id,
                                   )
    else:
        permitted = True
    self.permitted = permitted

    return self.permitted
1998 1999 # -------------------------------------------------------------------------
def validate(self):
    """
        Validate this item (=record onvalidation), sets self.accepted

        Checks that all mandatory fields and resolvable references are
        present, then runs the onvalidation callback of the table;
        errors are reported in self.error and in the error attribute
        of the respective elements.

        @return: the validation result (True|False); if the item has
                 been validated before, the previous result. Returns
                 True for skipped items and for delete/merge items
                 (self.accepted then tells whether a record ID is
                 known)
    """

    data = self.data

    # Already validated?
    if self.accepted is not None:
        return self.accepted
    if data is None or not self.table:
        self.accepted = False
        return False

    xml = current.xml
    ERROR = xml.ATTRIBUTE["error"]

    METHOD = self.METHOD
    DELETE = METHOD.DELETE
    MERGE = METHOD.MERGE

    # Detect update
    if not self.id:
        self.deduplicate()
        if self.accepted is False:
            # Item rejected by deduplicator (e.g. due to ambiguity)
            return False

    # Don't need to validate skipped or deleted records
    if self.skip or self.method in (DELETE, MERGE):
        self.accepted = True if self.id else False
        return True

    # Set dynamic defaults for new records
    if not self.id:
        self._dynamic_defaults(data)

    # Check for mandatory fields
    # NOTE(review): this list is extended in the loop below - if
    # _mandatory_fields returns a shared list, that would modify it
    # for other callers too; confirm
    required_fields = self._mandatory_fields()

    # NOTE(review): all_fields is appended to below, so this requires
    # data.keys() to return a list (Python 2)
    all_fields = data.keys()

    failed_references = []
    items = self.job.items
    for reference in self.references:
        resolvable = resolved = True
        entry = reference.entry
        if entry and not entry.id:
            # Referenced record has no record ID (yet)
            if entry.item_id:
                item = items[entry.item_id]
                if item.error:
                    relement = reference.element
                    if relement is not None:
                        # Repeat the errors from the referenced record
                        # in the <reference> element (better reasoning)
                        msg = "; ".join(xml.collect_errors(entry.element))
                        relement.set(ERROR, msg)
                    else:
                        resolvable = False
                    resolved = False
            else:
                resolvable = resolved = False
        field = reference.field
        if isinstance(field, (tuple, list)):
            field = field[1]
        if resolved:
            all_fields.append(field)
        elif resolvable:
            # Both reference and referenced record are in the XML,
            # => treat foreign key as mandatory, and mark as failed
            if field not in required_fields:
                required_fields.append(field)
            if field not in failed_references:
                failed_references.append(field)

    missing = [fname for fname in required_fields
                     if fname not in all_fields]

    original = self.original
    if missing:
        if original:
            # Fields which are present in the original are not missing
            missing = [fname for fname in missing
                             if fname not in original]
        if missing:
            # Report missing values and failed references separately
            fields = [f for f in missing
                        if f not in failed_references]
            if fields:
                errors = ["%s: value(s) required" % ", ".join(fields)]
            else:
                errors = []
            if failed_references:
                # NOTE(review): this value of "fields" is not used, the
                # message below joins failed_references again
                fields = ", ".join(failed_references)
                errors.append("%s: reference import(s) failed" %
                              ", ".join(failed_references))
            self.error = "; ".join(errors)
            self.element.set(ERROR, self.error)
            self.accepted = False
            return False

    # Run onvalidation
    form = Storage(method = self.method,
                   vars = data,
                   request_vars = data,
                   # Useless since always incomplete:
                   #record = original,
                   )
    if self.id:
        form.vars.id = self.id

    form.errors = Storage()
    tablename = self.tablename
    # Method-specific callback (e.g. "create_onvalidation") takes
    # precedence over the generic "onvalidation"
    key = "%s_onvalidation" % self.method
    get_config = current.s3db.get_config
    onvalidation = get_config(tablename, key,
                              get_config(tablename, "onvalidation"))
    if onvalidation:
        try:
            callback(onvalidation, form, tablename=tablename)
        except:
            # Exceptions in the callback are logged, but do not
            # reject the item
            from traceback import format_exc
            current.log.error("S3Import %s onvalidation exception:" % tablename)
            current.log.debug(format_exc(10))
    accepted = True
    if form.errors:
        element = self.element
        for k in form.errors:
            # Report the error in the <data> or <reference> element
            # for the field, or else in the item element itself
            # (prefixed with the field name)
            e = element.findall("data[@field='%s']" % k)
            if not e:
                e = element.findall("reference[@field='%s']" % k)
            if not e:
                e = element
                form.errors[k] = "[%s] %s" % (k, form.errors[k])
            else:
                e = e[0]
            # NOTE(review): str.decode is Python 2 only
            e.set(ERROR, str(form.errors[k]).decode("utf-8"))
        self.error = current.ERROR.VALIDATION_ERROR
        accepted = False

    self.accepted = accepted
    return accepted
2139 2140 # -------------------------------------------------------------------------
def commit(self, ignore_errors=False):
    """
        Commit this item to the database

        @param ignore_errors: skip invalid components
                              (still reports errors)

        @return: True if the item was committed or deliberately skipped,
                 otherwise the value of ignore_errors (i.e. a failure is
                 only reported as False if errors are not to be ignored)

        @raise RuntimeError: if the import method of the item is unknown
    """

    if self.committed:
        # already committed
        return True

    # If the parent item gets skipped, then skip this item as well
    if self.parent is not None and self.parent.skip:
        return True

    # Globals
    db = current.db
    s3db = current.s3db

    xml = current.xml
    ATTRIBUTE = xml.ATTRIBUTE

    # Methods
    METHOD = self.METHOD
    CREATE = METHOD.CREATE
    UPDATE = METHOD.UPDATE
    DELETE = METHOD.DELETE
    MERGE = METHOD.MERGE

    # Policies
    POLICY = self.POLICY
    THIS = POLICY["THIS"]
    NEWER = POLICY["NEWER"]
    MASTER = POLICY["MASTER"]

    # Constants
    UID = xml.UID
    MCI = xml.MCI
    MTIME = xml.MTIME
    VALIDATION_ERROR = current.ERROR.VALIDATION_ERROR

    # Make item mtime TZ-aware
    self.mtime = s3_utc(self.mtime)

    # Resolve references
    self._resolve_references()

    # Deduplicate and validate
    if not self.validate():
        self.skip = True

        # Notify the error in the parent to have reported in the
        # interactive (2-phase) importer
        # Note that the parent item is already written at this point,
        # so this notification can NOT prevent/rollback the import of
        # the parent item if ignore_errors is True (forced commit), or
        # if the user deliberately chose to import it despite error.
        parent = self.parent
        if parent is not None:
            parent.error = VALIDATION_ERROR
            element = parent.element
            if not element.get(ATTRIBUTE.error, False):
                element.set(ATTRIBUTE.error, s3_unicode(parent.error))

        return ignore_errors

    elif self.method not in (MERGE, DELETE) and self.components:
        for component in self.components:
            if component.accepted is False or \
               component.data is None:
                component.skip = True
                # Skip this item on any component validation errors
                self.skip = True
                self.error = VALIDATION_ERROR
                return ignore_errors

    elif self.method in (MERGE, DELETE) and not self.accepted:
        self.skip = True
        # Deletion of non-existent record: ignore silently
        return True

    # Authorize item
    if not self.authorize():
        self.error = "%s: %s, %s, %s" % (current.ERROR.NOT_PERMITTED,
                                         self.method,
                                         self.tablename,
                                         self.id)
        self.skip = True
        return ignore_errors

    # Update the method
    method = self.method

    # Check if import method is allowed in strategy
    strategy = self.strategy
    if not isinstance(strategy, (list, tuple)):
        strategy = [strategy]
    if method not in strategy:
        self.error = current.ERROR.NOT_PERMITTED
        self.skip = True
        return True

    # Check mtime and mci
    table = self.table
    original = self.original
    original_mtime = None
    original_mci = 0
    if original:
        if hasattr(table, MTIME):
            original_mtime = s3_utc(original[MTIME])
        if hasattr(table, MCI):
            original_mci = original[MCI]
        original_deleted = "deleted" in original and original.deleted
    else:
        original_deleted = False

    # Detect conflicts
    job = self.job
    original_modified = True
    self.modified = True
    self.conflict = False
    last_sync = s3_utc(job.last_sync)
    if last_sync:
        if original_mtime and original_mtime < last_sync:
            original_modified = False
        if self.mtime and self.mtime < last_sync:
            self.modified = False
        if self.modified and original_modified:
            self.conflict = True
    if self.conflict and method in (UPDATE, DELETE, MERGE):
        if job.onconflict:
            job.onconflict(self)

    if self.data is not None:
        data = table._filter_fields(self.data, id=True)
    else:
        data = Storage()

    # Update policy
    if isinstance(self.update_policy, dict):
        def update_policy(f):
            setting = self.update_policy
            p = setting.get(f,
                            setting.get("__default__", THIS))
            if p not in POLICY:
                return THIS
            return p
    else:
        def update_policy(f):
            p = self.update_policy
            if p not in POLICY:
                return THIS
            return p

    # Log this item
    if callable(job.log):
        job.log(self)

    tablename = self.tablename
    enforce_realm_update = False

    # Update existing record
    if method == UPDATE:

        if original:
            if original_deleted:
                policy = update_policy(None)
                if policy == NEWER and \
                   original_mtime and original_mtime > self.mtime or \
                   policy == MASTER and \
                   (original_mci == 0 or self.mci != 1):
                    self.skip = True
                    return True

            # Iterate over a snapshot of the keys, since fields
            # are removed from data inside the loop
            for f in list(data):
                if f in original:
                    # Check if unchanged
                    if type(original[f]) is datetime.datetime:
                        if s3_utc(data[f]) == s3_utc(original[f]):
                            del data[f]
                            continue
                    else:
                        if data[f] == original[f]:
                            del data[f]
                            continue
                remove = False
                policy = update_policy(f)
                if policy == THIS:
                    remove = True
                elif policy == NEWER:
                    if original_mtime and original_mtime > self.mtime:
                        remove = True
                elif policy == MASTER:
                    if original_mci == 0 or self.mci != 1:
                        remove = True
                if remove:
                    del data[f]

            if original_deleted:
                # Undelete re-imported records
                data["deleted"] = False
                if hasattr(table, "deleted_fk"):
                    data["deleted_fk"] = ""

                # Set new author stamp
                if hasattr(table, "created_by"):
                    data["created_by"] = table.created_by.default
                if hasattr(table, "modified_by"):
                    data["modified_by"] = table.modified_by.default

                # Restore defaults for foreign keys
                for fieldname in table.fields:
                    field = table[fieldname]
                    default = field.default
                    if str(field.type)[:9] == "reference" and \
                       fieldname not in data and \
                       default is not None:
                        data[fieldname] = default

                # Enforce update of realm entity
                enforce_realm_update = True

        if not self.skip and not self.conflict and \
           (len(data) or self.components or self.references):
            if self.uid and hasattr(table, UID):
                data[UID] = self.uid
            if MTIME in table:
                data[MTIME] = self.mtime
            if MCI in data:
                # retain local MCI on updates
                del data[MCI]
            query = (table._id == self.id)
            try:
                db(query).update(**dict(data))
            except:
                self.error = sys.exc_info()[1]
                self.skip = True
                return ignore_errors
            else:
                self.committed = True
        else:
            # Nothing to update
            self.committed = True

    # Create new record
    elif method == CREATE:

        # Do not apply field policy to UID and MCI
        if UID in data:
            del data[UID]
        if MCI in data:
            del data[MCI]

        # Iterate over a snapshot of the keys: deleting from a dict
        # while iterating over it raises a RuntimeError
        for f in list(data):
            if update_policy(f) == MASTER and self.mci != 1:
                del data[f]

        if len(data) or self.components or self.references:

            # Restore UID and MCI
            if self.uid and UID in table.fields:
                data[UID] = self.uid
            if MCI in table.fields:
                data[MCI] = self.mci

            # Insert the new record
            try:
                success = table.insert(**dict(data))
            except:
                self.error = sys.exc_info()[1]
                self.skip = True
                return ignore_errors
            if success:
                self.id = success
                self.committed = True

        else:
            # Nothing to create
            self.skip = True
            return True

    # Delete local record
    elif method == DELETE:

        if original:
            if original_deleted:
                self.skip = True
            policy = update_policy(None)
            if policy == THIS:
                self.skip = True
            elif policy == NEWER and \
                 (original_mtime and original_mtime > self.mtime):
                self.skip = True
            elif policy == MASTER and \
                 (original_mci == 0 or self.mci != 1):
                self.skip = True
        else:
            self.skip = True

        if not self.skip and not self.conflict:

            resource = s3db.resource(tablename, id=self.id)
            # Use cascade=True so that the deletion can be
            # rolled back (e.g. trial phase, subsequent failure)
            resource.delete(cascade=True)
            if resource.error:
                self.error = resource.error
                self.skip = True
                return ignore_errors

        # Deletions return here, i.e. without audit/onaccept below
        return True

    # Merge records
    elif method == MERGE:

        if UID not in table.fields:
            self.skip = True
        elif original:
            if original_deleted:
                self.skip = True
            policy = update_policy(None)
            if policy == THIS:
                self.skip = True
            elif policy == NEWER and \
                 (original_mtime and original_mtime > self.mtime):
                self.skip = True
            elif policy == MASTER and \
                 (original_mci == 0 or self.mci != 1):
                self.skip = True
        else:
            self.skip = True

        if not self.skip and not self.conflict:

            row = db(table[UID] == data[xml.REPLACEDBY]) \
                    .select(table._id, limitby=(0, 1)) \
                    .first()
            if row:
                original_id = row[table._id]
                resource = s3db.resource(tablename,
                                         id = [original_id, self.id],
                                         )
                try:
                    success = resource.merge(original_id, self.id)
                except:
                    self.error = sys.exc_info()[1]
                    self.skip = True
                    return ignore_errors
                if success:
                    self.committed = True
            else:
                self.skip = True

        # Merges return here, i.e. without audit/onaccept below
        return True

    else:
        raise RuntimeError("unknown import method: %s" % method)

    # Audit + onaccept on successful commits
    if self.committed:

        # Create a pseudo-form for callbacks
        form = Storage()
        form.method = method
        form.table = table
        form.vars = self.data
        prefix, name = tablename.split("_", 1)
        if self.id:
            form.vars.id = self.id

        # Audit
        current.audit(method, prefix, name,
                      form = form,
                      record = self.id,
                      representation = "xml",
                      )

        # Prevent that record post-processing breaks time-delayed
        # synchronization by implicitly updating "modified_on"
        if MTIME in table.fields:
            modified_on = table[MTIME]
            modified_on_update = modified_on.update
            modified_on.update = None
        else:
            modified_on_update = None

        try:
            # Update super entity links
            s3db.update_super(table, form.vars)
            if method == CREATE:
                # Set record owner
                current.auth.s3_set_record_owner(table, self.id)
            elif method == UPDATE:
                # Update realm
                update_realm = enforce_realm_update or \
                               s3db.get_config(table, "update_realm")
                if update_realm:
                    current.auth.set_realm_entity(table, self.id,
                                                  force_update = True,
                                                  )
            # Onaccept
            key = "%s_onaccept" % method
            onaccept = current.deployment_settings.get_import_callback(tablename, key)
            if onaccept:
                callback(onaccept, form, tablename=tablename)
        finally:
            # Restore modified_on.update, even if post-processing
            # fails, so that the field does not keep the temporary
            # override
            if modified_on_update is not None:
                modified_on.update = modified_on_update

    # Update referencing items
    if self.update and self.id:
        for u in self.update:
            item = u.get("item", None)
            if not item:
                continue
            field = u.get("field", None)
            if isinstance(field, (list, tuple)):
                pkey, fkey = field
                query = (table.id == self.id)
                row = db(query).select(table[pkey],
                                       limitby=(0, 1)).first()
                if row:
                    item._update_reference(fkey, row[pkey])
            else:
                item._update_reference(field, self.id)

    return True

# -------------------------------------------------------------------------
def _dynamic_defaults(self, data):
    """
        Applies dynamic defaults from any keys in data that start with
        an underscore, used only for new records and only if the respective
        field is not populated yet.

        @param data: the data dict (updated in-place)
    """

    # Iterate over a snapshot of the items, since new keys are added
    # to data inside the loop (changing the size of a dict while
    # iterating over it raises a RuntimeError)
    for k, v in list(data.items()):
        # Slice rather than index, so that an empty key cannot fail
        if k[:1] == "_":
            fn = k[1:]
            if fn in self.table.fields and fn not in data:
                data[fn] = v
2586 2587 # -------------------------------------------------------------------------
def _mandatory_fields(self):
    """
        Get the names of the mandatory fields of the item table, i.e.
        of all fields without default value which do not accept an
        empty string as value; the result is cached per table name
        in job.mandatory_fields

        @return: list of field names
    """

    tablename = self.tablename
    cache = self.job.mandatory_fields

    # Use the cached list where available
    cached = cache[tablename] if tablename in cache else None
    if cached is not None:
        return cached

    names = []
    for field in self.table:

        # Fields with defaults are never mandatory
        if field.default is not None:
            continue

        validators = field.requires
        if not validators:
            continue
        if not isinstance(validators, (list, tuple)):
            validators = [validators]

        # Fields which explicitly allow empty values are not mandatory
        if isinstance(validators[0], IS_EMPTY_OR):
            continue

        # Mandatory if an empty string fails validation
        if field.validate("")[1]:
            names.append(field.name)

    cache[tablename] = names
    return names
2616 2617 # -------------------------------------------------------------------------
def _resolve_references(self):
    """
        Look up the foreign keys for all references of this item
        from the other items of the same job, and write them into
        the item data. Where a key is not available yet, this item
        is registered with the referenced item for a later update.
    """

    table = self.table
    if not table:
        return

    db = current.db
    job_items = self.job.items

    for ref in self.references:

        entry = ref.entry
        if not entry:
            continue

        # Key tuples are (primary key, foreign key), plain field
        # names reference the "id" of the key table
        field = ref.field
        if isinstance(field, (list, tuple)):
            pkey, fkey = field
        else:
            pkey, fkey = "id", field

        # Determine the key table (the entry can override its name)
        ktablename, _, multiple = s3_get_foreign_key(table[fkey])
        if not ktablename:
            continue
        ktablename = entry.tablename or ktablename
        try:
            ktable = current.s3db[ktablename]
        except AttributeError:
            continue

        # Determine the foreign key value
        referenced = None
        fk = entry.id
        if entry.item_id:
            referenced = job_items[entry.item_id]
            if referenced:
                original = referenced.original
                if original and \
                   original.get("deleted") and \
                   not referenced.committed:
                    # Original is deleted and has not been updated
                    fk = None
                else:
                    fk = referenced.id
        if fk and pkey != "id":
            row = db(ktable._id == fk).select(ktable[pkey],
                                              limitby=(0, 1)).first()
            if not row:
                continue
            fk = row[pkey]

        data = self.data
        if fk:
            # Key available => write it into the record data
            if multiple:
                values = data.get(fkey, [])
                if fk not in values:
                    values.append(fk)
                data[fkey] = values
            else:
                data[fkey] = fk
            continue

        # Key not available => drop any previous value and
        # schedule the update with the referenced item
        if fkey in data and not multiple:
            del data[fkey]
        if referenced:
            referenced.update.append({"item": self, "field": fkey})
2692 2693 # -------------------------------------------------------------------------
def _update_reference(self, field, value):
    """
        Helper method to update a foreign key in an already written
        record. Will be called by the referenced item after (and only
        if) it has been committed. This is only needed if the reference
        could not be resolved before commit due to circular references.

        Does nothing if there is no value, no table or no record ID,
        or if the item is not permitted.

        @param field: the field name of the foreign key
        @param value: the value of the foreign key
    """

    MTIME = current.xml.MTIME

    table = self.table
    record_id = self.id

    if not value or not table or not record_id or not self.permitted:
        return

    # Prevent that reference update breaks time-delayed
    # synchronization by implicitly updating "modified_on"
    if MTIME in table.fields:
        modified_on = table[MTIME]
        modified_on_update = modified_on.update
        modified_on.update = None
    else:
        modified_on = None
        modified_on_update = None

    try:
        db = current.db
        query = (table._id == record_id)
        fieldtype = str(table[field].type)
        if fieldtype.startswith("list:reference"):
            record = db(query).select(table[field],
                                      limitby=(0, 1)).first()
            if record:
                # The stored list may be None rather than an empty
                # list if the field has never been populated
                values = record[field] or []
                if value not in values:
                    values.append(value)
                    db(query).update(**{field: values})
        else:
            db(query).update(**{field: value})
    finally:
        # Restore modified_on.update, even if the update fails,
        # so that the field does not keep the temporary override
        if modified_on_update is not None:
            modified_on.update = modified_on_update
2739 2740 # -------------------------------------------------------------------------
def store(self, item_table=None):
    """
        Write this item to the item table: updates the existing row
        for this item_id, or inserts a new one

        @param item_table: the item table

        @return: the ID of the item table row, or None if no item
                 table was specified
    """

    if item_table is None:
        return None

    db = current.db
    item_id = self.item_id

    # Look for an existing row for this item
    existing = db(item_table.item_id == item_id).select(item_table.id,
                                                        limitby=(0, 1)
                                                        ).first()
    record_id = existing.id if existing else None

    record = Storage(job_id = self.job.job_id,
                     item_id = item_id,
                     tablename = self.tablename,
                     record_uid = self.uid,
                     error = self.error or "",
                     )

    # Serialize the source element
    element = self.element
    if element is not None:
        record["element"] = current.xml.tostring(element,
                                                 xml_declaration=False)

    # Pickle the record data (without record ID and foreign keys)
    item_data = self.data
    if item_data is not None:
        table = self.table
        fieldnames = table.fields
        storable = Storage()
        for fname in item_data:
            if fname not in fieldnames:
                continue
            field = table[fname]
            if str(field.type) == "id" or s3_has_foreign_key(field):
                continue
            value = item_data[fname]
            if isinstance(value, Field):
                # Not picklable
                # This is likely to be a modified_on to avoid updating
                # this field, which skipping does just fine too
                continue
            storable[fname] = value
        record["data"] = cPickle.dumps(storable)

    # References, as JSON objects holding either the item_id or
    # the tablename+uid of the referenced entry
    ritems = []
    for reference in self.references:
        entry = reference.entry
        if not entry:
            continue
        if entry.item_id is not None:
            stored = {"field": reference.field,
                      "item_id": str(entry.item_id),
                      }
        elif entry.uid is not None:
            stored = {"field": reference.field,
                      "tablename": entry.tablename,
                      "uid": str(entry.uid),
                      }
        else:
            continue
        ritems.append(json.dumps(stored))
    if ritems:
        record["ritems"] = ritems

    # Component items and parent item
    citems = [component.item_id for component in self.components]
    if citems:
        record["citems"] = citems
    if self.parent:
        record["parent"] = self.parent.item_id

    # Update or insert
    if record_id:
        db(item_table.id == record_id).update(**record)
    else:
        record_id = item_table.insert(**record)

    return record_id
2822 2823 # -------------------------------------------------------------------------
def restore(self, row):
    """
        Re-populate this item from a row of the item table. References,
        components and the parent are only registered for loading
        (load_references, load_components, load_parent), since they can
        not be restored before all items are restored; the caller must
        use job.restore_references() for that.

        @param row: the item table row

        @return: True if the item could be restored, False if the
                 table could not be loaded or the item has an error
                 and no data
    """

    xml = current.xml
    UID = xml.UID
    MTIME = xml.MTIME
    MCI = xml.MCI

    # Reset the item status
    self.item_id = row.item_id
    self.accepted = None
    self.permitted = False
    self.committed = False
    self.id = None
    self.uid = row.record_uid

    # Unpickle the record data
    pickled = row.data
    self.data = data = cPickle.loads(pickled) \
                       if pickled is not None else Storage()
    if MTIME in data:
        self.mtime = data[MTIME]
    if MCI in data:
        self.mci = data[MCI]
    if UID in data:
        self.uid = data[UID]

    # Re-parse the source element
    self.element = etree.fromstring(row.element)

    # Register components, references and parent for later loading
    citems = row.citems
    if citems:
        self.load_components = citems
    ritems = row.ritems
    if ritems:
        self.load_references = [json.loads(ritem) for ritem in ritems]
    self.load_parent = row.parent

    # Load the table
    s3db = current.s3db
    tablename = row.tablename
    try:
        table = s3db[tablename]
    except AttributeError:
        self.error = current.ERROR.BAD_RESOURCE
        return False
    self.table = table
    self.tablename = tablename

    # Find the original record
    original = S3Resource.original(table, data,
                                   mandatory=self._mandatory_fields())
    if original is not None:
        self.original = original
        self.id = original[table._id.name]
        if not current.response.s3.synchronise_uuids and UID in original:
            self.uid = data[UID] = original[UID]

    self.error = row.error

    # Post-process the data
    postprocess = s3db.get_config(self.tablename, "xml_post_parse")
    if postprocess:
        postprocess(self.element, self.data)

    # An error without any data is a validation error
    return not (self.error and not self.data)
2884
2885 # ============================================================================= 2886 -class S3ImportJob():
2887 """ 2888 Class to import an element tree into the database 2889 """ 2890 2891 JOB_TABLE_NAME = "s3_import_job" 2892 ITEM_TABLE_NAME = "s3_import_item" 2893 2894 # -------------------------------------------------------------------------
def __init__(self, table,
             tree=None,
             files=None,
             job_id=None,
             strategy=None,
             update_policy=None,
             conflict_policy=None,
             last_sync=None,
             onconflict=None):
    """
        Constructor

        @param table: the table for this job; if empty and a job is
                      restored, the table is looked up by the table
                      name stored in the job record
        @param tree: the element tree to import
        @param files: files attached to the import (for upload fields)
        @param job_id: restore job from database (record ID or job_id)
        @param strategy: the import strategy
        @param update_policy: the update policy
        @param conflict_policy: the conflict resolution policy
        @param last_sync: the last synchronization time stamp (datetime)
        @param onconflict: custom conflict resolver function

        @raise SyntaxError: if job_id is specified, but no job record
                            can be found for it
    """

    self.error = None # the last error
    self.error_tree = etree.Element(current.xml.TAG.root)

    self.table = table
    self.tree = tree
    self.files = files
    self.directory = Storage()

    self._uidmap = None

    # Mandatory fields
    self.mandatory_fields = Storage()

    self.elements = Storage()
    self.items = Storage()
    self.references = []

    self.job_table = None
    self.item_table = None

    self.count = 0 # total number of records imported
    self.created = [] # IDs of created records
    self.updated = [] # IDs of updated records
    self.deleted = [] # IDs of deleted records

    self.log = None

    # Import strategy
    if strategy is None:
        METHOD = S3ImportItem.METHOD
        strategy = [METHOD.CREATE,
                    METHOD.UPDATE,
                    METHOD.DELETE,
                    METHOD.MERGE,
                    ]
    if not isinstance(strategy, (tuple, list)):
        strategy = [strategy]
    self.strategy = strategy

    # Update policy (default=always update)
    if update_policy:
        self.update_policy = update_policy
    else:
        self.update_policy = S3ImportItem.POLICY.OTHER
    # Conflict resolution policy (default=always update)
    if conflict_policy:
        self.conflict_policy = conflict_policy
    else:
        self.conflict_policy = S3ImportItem.POLICY.OTHER

    # Synchronization settings
    self.mtime = None
    self.last_sync = last_sync
    self.onconflict = onconflict

    if job_id:
        # Restore the job from the job table
        self.__define_tables()
        jobtable = self.job_table
        if str(job_id).isdigit():
            query = (jobtable.id == job_id)
        else:
            query = (jobtable.job_id == job_id)
        row = current.db(query).select(jobtable.job_id,
                                       jobtable.tablename,
                                       limitby=(0, 1)).first()
        if not row:
            raise SyntaxError("Job record not found")
        self.job_id = row.job_id
        self.second_pass = True
        if not self.table:
            # No table specified => use the table of the stored job
            # (if that table cannot be loaded, self.table is left
            # as it is)
            tablename = row.tablename
            try:
                self.table = current.s3db[tablename]
            except AttributeError:
                pass
    else:
        self.job_id = uuid.uuid4() # unique ID for this job
        self.second_pass = False
# -------------------------------------------------------------------------
@property
def uidmap(self):
    """
        Lazy property: a dict of two maps (uuid => element and
        tuid => element) over all resource elements in the tree,
        for faster reference lookups; None if the job has no tree
    """

    uidmap = self._uidmap
    tree = self.tree

    # Already built, or nothing to build it from
    if uidmap is not None or tree is None:
        return uidmap

    if isinstance(tree, etree._Element):
        root = tree
    else:
        root = tree.getroot()

    xml = current.xml
    UUID = xml.UID
    TUID = xml.ATTRIBUTE.tuid

    resources = root.xpath(".//%s" % xml.TAG.resource)

    by_uuid = {}
    by_tuid = {}
    self._uidmap = uidmap = {UUID: by_uuid,
                             TUID: by_tuid,
                             }

    # The first element with a particular uuid/tuid wins
    for resource in resources:
        uuid_ = resource.get(UUID)
        if uuid_:
            by_uuid.setdefault(uuid_, resource)
        tuid_ = resource.get(TUID)
        if tuid_:
            by_tuid.setdefault(tuid_, resource)

    return uidmap
3029 3030 # -------------------------------------------------------------------------
def add_item(self,
             element=None,
             original=None,
             components=None,
             parent=None,
             joinby=None):
    """
        Parse and validate an XML element and add it as new item
        to the job. Recurses into the component elements of the
        element, and into any elements it references.

        @param element: the element
        @param original: the original DB record (if already available,
                         will otherwise be looked-up by this function)
        @param components: a dictionary of components (as in S3Resource)
                           to include in the job (defaults to all
                           defined components)
        @param parent: the parent item (if this is a component)
        @param joinby: the component join key(s) (if this is a component)

        @return: a unique identifier for the new item, or None if there
                 was an error. self.error contains the last error, and
                 self.error_tree an element tree with all failing elements
                 including error attributes.

        NOTE(review): as written, every path through this function
        returns an item_id (either the one already registered for the
        element, or that of the newly created item), even if parsing
        has failed - confirm whether None can actually be returned
    """

    if element in self.elements:
        # element has already been added to this job
        return self.elements[element]

    # Parse the main element
    item = S3ImportItem(self)

    # Update lookup lists
    # (registered before parsing, so that recursive calls for the
    # same element return the existing item_id)
    item_id = item.item_id
    self.items[item_id] = item
    if element is not None:
        self.elements[element] = item_id

    if not item.parse(element,
                      original=original,
                      files=self.files):
        self.error = item.error
        item.accepted = False
        # Only top-level items are added to the error tree here,
        # for components the parent item is added by the caller
        if parent is None:
            self.error_tree.append(deepcopy(item.element))

    else:
        # Now parse the components
        table = item.table

        s3db = current.s3db
        components = s3db.get_components(table, names=components)
        super_keys = s3db.get_super_keys(table)

        # cnames: component table name => list of component aliases
        # cinfos: (component table name, alias) => component details
        cnames = Storage()
        cinfos = Storage()
        for alias in components:

            component = components[alias]

            ctable = component.table
            # NOTE(review): this compares the Field object itself
            # (not its name) with "id" - confirm whether
            # ctable._id.name was intended
            if ctable._id != "id" and "instance_type" in ctable.fields:
                # Super-entities cannot be imported to directly => skip
                continue

            # Determine the keys
            pkey = component.pkey

            if pkey != table._id.name and pkey not in super_keys:
                # Pseudo-component cannot be imported => skip
                continue

            # For link-table components, the link table is imported
            if component.linktable:
                ctable = component.linktable
                fkey = component.lkey
            else:
                fkey = component.fkey

            ctablename = ctable._tablename
            if ctablename in cnames:
                cnames[ctablename].append(alias)
            else:
                cnames[ctablename] = [alias]

            cinfos[(ctablename, alias)] = Storage(component = component,
                                                  ctable = ctable,
                                                  pkey = pkey,
                                                  fkey = fkey,
                                                  first = True,
                                                  )
        add_item = self.add_item
        xml = current.xml
        UID = xml.UID
        for celement in xml.components(element, names=cnames.keys()):

            # Get the component tablename
            ctablename = celement.get(xml.ATTRIBUTE.name, None)
            if not ctablename or ctablename not in cnames:
                continue

            # Get the component alias (for disambiguation)
            calias = celement.get(xml.ATTRIBUTE.alias, None)
            if calias is None:
                aliases = cnames[ctablename]
                if len(aliases) == 1:
                    calias = aliases[0]
                else:
                    # Fall back to the table name without prefix
                    calias = ctablename.split("_", 1)[1]
            if (ctablename, calias) not in cinfos:
                continue
            else:
                cinfo = cinfos[(ctablename, calias)]

            component = cinfo.component
            ctable = cinfo.ctable

            pkey = cinfo.pkey
            fkey = cinfo.fkey

            original = None

            if not component.multiple:
                # Single-component: skip all subsequent items after
                # the first under the same master record
                if not cinfo.first:
                    continue
                cinfo.first = False

                # Single component = the first component record
                # under the master record is always the original,
                # only relevant if the master record exists in
                # the db and hence item.id is not None
                if item.id:
                    db = current.db
                    query = (table.id == item.id) & \
                            (table[pkey] == ctable[fkey])
                    if UID in ctable.fields:
                        # Load only the UUID now, parse will load any
                        # required data later
                        row = db(query).select(ctable[UID],
                                               limitby=(0, 1)).first()
                        if row:
                            original = row[UID]
                    else:
                        # Not nice, but a rare edge-case
                        original = db(query).select(ctable.ALL,
                                                    limitby=(0, 1)).first()

            # Recurse
            item_id = add_item(element=celement,
                               original=original,
                               parent=item,
                               joinby=(pkey, fkey))
            if item_id is None:
                item.error = self.error
                self.error_tree.append(deepcopy(item.element))
            else:
                citem = self.items[item_id]
                citem.parent = item
                item.components.append(citem)

        lookahead = self.lookahead
        directory = self.directory

        # Handle references
        table = item.table
        tree = self.tree
        if tree is not None:
            fields = [table[f] for f in table.fields]
            rfields = filter(s3_has_foreign_key, fields)
            item.references = lookahead(element,
                                        table=table,
                                        fields=rfields,
                                        tree=tree,
                                        directory=directory)

            # Add all referenced elements which are not in the job yet
            for reference in item.references:
                entry = reference.entry
                if entry and entry.element is not None:
                    if not entry.item_id:
                        item_id = add_item(element=entry.element)
                        if item_id:
                            entry.update(item_id=item_id)

        # Parent reference
        if parent is not None:
            entry = Storage(item_id=parent.item_id,
                            element=parent.element,
                            tablename=parent.tablename)
            item.references.append(Storage(field=joinby,
                                           entry=entry))

        # Replacement reference
        # (for deleted records which have been replaced by another
        # record, the replacement is looked up by its UID)
        data = item.data
        deleted = data.get(xml.DELETED, False)
        if deleted:
            field = xml.REPLACEDBY
            replaced_by = data.get(field, None)
            if replaced_by:
                rl = lookahead(element,
                               table=table,
                               tree=tree,
                               directory=directory,
                               lookup=(table, field, replaced_by))
                if rl:
                    entry = rl[0].entry
                    if entry.element is not None and not entry.item_id:
                        item_id = add_item(element=entry.element)
                        if item_id:
                            entry.item_id = item_id
                    item.references.append(Storage(field=field,
                                                   entry=entry))

    return item.item_id
3245 3246 # -------------------------------------------------------------------------
def lookahead(self,
              element,
              table=None,
              fields=None,
              tree=None,
              directory=None,
              lookup=None):
    """
        Find referenced elements in the tree

        @param element: the element
        @param table: the DB table
        @param fields: the FK fields in the table
        @param tree: the import tree
        @param directory: a dictionary to lookup elements in the tree
                          (will be filled in by this function)
        @param lookup: a tuple (table, field, uid) to look up a single
                       record by its UID, instead of resolving the
                       <reference> sub-elements of the element

        @return: a list of references, each a Storage with the field
                 (field name or key tuple), the element (the reference
                 element, or the lookup tuple), and the entry (a Storage
                 with tablename, element, uid, id and item_id of the
                 referenced record)
    """

    db = current.db
    s3db = current.s3db

    xml = current.xml
    import_uid = xml.import_uid

    ATTRIBUTE = xml.ATTRIBUTE
    TAG = xml.TAG
    UID = xml.UID

    reference_list = []
    rlappend = reference_list.append

    root = None
    if tree is not None:
        root = tree if isinstance(tree, etree._Element) else tree.getroot()
    uidmap = self.uidmap

    references = [lookup] if lookup else element.findall("reference")
    for reference in references:

        if lookup:
            # Explicit lookup by UID
            # NOTE(review): "multiple" is not assigned in this branch,
            # it is only read further down if uids is empty, which can
            # not happen here since uids always contains one item
            tablename = element.get(ATTRIBUTE.name, None)
            ktable, field, uid = reference
            attr = UID
            uids = [import_uid(uid)]

        else:
            field = reference.get(ATTRIBUTE.field, None)

            # Ignore references without valid field-attribute
            if not field or field not in fields or field not in table:
                continue

            # Find the key table
            ktablename, key, multiple = s3_get_foreign_key(table[field])
            if not ktablename:
                continue
            try:
                ktable = s3db[ktablename]
            except AttributeError:
                continue

            tablename = reference.get(ATTRIBUTE.resource, None)
            # Ignore references to tables without UID field:
            if UID not in ktable.fields:
                continue
            # Fall back to key table name if tablename is not specified:
            if not tablename:
                tablename = ktablename
            # Super-entity references must use the super-key:
            if tablename != ktablename:
                field = (ktable._id.name, field)
            # Ignore direct references to super-entities:
            if tablename == ktablename and ktable._id.name != "id":
                continue
            # Get the foreign key
            # (the UID attribute takes precedence over the tuid)
            uids = reference.get(UID, None)
            attr = UID
            if not uids:
                uids = reference.get(ATTRIBUTE.tuid, None)
                attr = ATTRIBUTE.tuid
            if uids and multiple:
                # Multi-references carry a JSON-encoded list of keys
                uids = json.loads(uids)
            elif uids:
                uids = [uids]

        # Find the elements and map to DB records
        relements = []

        # Create a UID<->ID map
        id_map = {}
        if attr == UID and uids:
            if len(uids) == 1:
                uid = import_uid(uids[0])
                query = (ktable[UID] == uid)
                record = db(query).select(ktable.id,
                                          cacheable = True,
                                          limitby = (0, 1),
                                          ).first()
                if record:
                    id_map[uid] = record.id
            else:
                uids_ = map(import_uid, uids)
                query = (ktable[UID].belongs(uids_))
                records = db(query).select(ktable.id,
                                           ktable[UID],
                                           limitby = (0, len(uids_)),
                                           )
                for r in records:
                    id_map[r[UID]] = r.id

        if not uids:
            # Anonymous reference: <resource> inside the element
            expr = './/%s[@%s="%s"]' % (TAG.resource,
                                        ATTRIBUTE.name,
                                        tablename,
                                        )
            relements = reference.xpath(expr)
            if relements and not multiple:
                relements = relements[:1]

        elif root is not None:

            for uid in uids:

                entry = None

                # Entry already in directory?
                if directory is not None:
                    entry = directory.get((tablename, attr, uid))

                if not entry:
                    e = uidmap[attr].get(uid) if uidmap else None
                    if e is not None:
                        # Element in the source => append to relements
                        relements.append(e)
                    else:
                        # No element found, see if original record exists
                        _uid = import_uid(uid)
                        if _uid and _uid in id_map:
                            _id = id_map[_uid]
                            entry = Storage(tablename = tablename,
                                            element = None,
                                            uid = uid,
                                            id = _id,
                                            item_id = None,
                                            )
                            rlappend(Storage(field = field,
                                             element = reference,
                                             entry = entry,
                                             ))
                        else:
                            # Neither in the source nor in the DB
                            continue
                else:
                    # Re-use the entry from the directory
                    rlappend(Storage(field = field,
                                     element = reference,
                                     entry = entry,
                                     ))

        # Create entries for all newly found elements
        for relement in relements:
            uid = relement.get(attr, None)
            if attr == UID:
                _uid = import_uid(uid)
                _id = _uid and id_map and id_map.get(_uid, None) or None
            else:
                # Elements referenced by tuid have no known record ID
                _uid = None
                _id = None
            entry = Storage(tablename = tablename,
                            element = relement,
                            uid = uid,
                            id = _id,
                            item_id = None,
                            )
            # Add entry to directory
            if uid and directory is not None:
                directory[(tablename, attr, uid)] = entry
            # Append the entry to the reference list
            rlappend(Storage(field = field,
                             element = reference,
                             entry = entry,
                             ))

    return reference_list
3430 3431 # -------------------------------------------------------------------------
def load_item(self, row):
    """
        Restore an import item from a row of the item table and
        register it with this job (counterpart to add_item when
        restoring a job from the database)

        @param row: the item table row

        @return: the item ID of the restored item
    """

    restored = S3ImportItem(self)

    restore_ok = restored.restore(row)
    if not restore_ok:
        # Remember the error of the item; items which have no parent
        # to load are added to the error tree
        self.error = restored.error
        if restored.load_parent is None:
            self.error_tree.append(deepcopy(restored.element))

    # Update lookup lists
    key = restored.item_id
    self.items[key] = restored
    return key
3447 3448 # -------------------------------------------------------------------------
def resolve(self, item_id, import_list):
    """
        Resolve the reference list of an item, i.e. schedule all
        items it references for import

        @param item_id: the import item UID
        @param import_list: the ordered list of items (UIDs) to import
                            (referenced items get appended here)

        @return: False if the item is locked or was not accepted,
                 otherwise True
    """

    item = self.items[item_id]

    # An item is locked while its own references are being resolved,
    # so that circular references terminate here
    if item.lock or item.accepted is False:
        return False

    # Collect the referenced items which are not scheduled yet
    referenced = (reference.entry.item_id for reference in item.references)
    pending = [ritem_id for ritem_id in referenced
               if ritem_id and ritem_id not in import_list]

    for ritem_id in pending:
        item.lock = True
        resolved = self.resolve(ritem_id, import_list)
        if resolved:
            import_list.append(ritem_id)
        item.lock = False

    return True
3471 3472 # -------------------------------------------------------------------------
def commit(self, ignore_errors=False, log_items=None):
    """
        Commit the import job to the DB

        @param ignore_errors: skip any items with errors
                              (does still report the errors)
        @param log_items: callback function to log import items
                          before committing them

        @return: True if all items were committed successfully
                 (or errors were ignored), otherwise False; on success
                 the attributes count, mtime, created, updated and
                 deleted of the job are set
    """

    ATTRIBUTE = current.xml.ATTRIBUTE
    METHOD = S3ImportItem.METHOD

    # Resolve references
    # (referenced items are scheduled before the items referencing them)
    import_list = []
    for item_id in self.items:
        self.resolve(item_id, import_list)
        if item_id not in import_list:
            import_list.append(item_id)
    # Commit the items
    items = self.items
    count = 0
    mtime = None
    created = []
    cappend = created.append
    updated = []
    deleted = []
    tablename = self.table._tablename

    self.log = log_items
    failed = False
    for item_id in import_list:
        item = items[item_id]
        error = None

        if item.accepted is not False:
            logged = False
            success = item.commit(ignore_errors=ignore_errors)
        else:
            # Field validation failed
            # (the item is not committed, and its element is not
            #  appended to the error tree below)
            logged = True
            success = ignore_errors

        if not success:
            failed = True

        # Report the error even if the failure is ignored
        error = item.error
        if error:
            current.log.error(error)
            self.error = error
            element = item.element
            if element is not None:
                # Do not overwrite an existing error attribute
                if not element.get(ATTRIBUTE.error, False):
                    element.set(ATTRIBUTE.error, s3_unicode(self.error))
                if not logged:
                    self.error_tree.append(deepcopy(element))

        elif item.tablename == tablename:
            # Statistics include only items of the target table
            count += 1
            if mtime is None or item.mtime > mtime:
                mtime = item.mtime
            if item.id:
                if item.method == METHOD.CREATE:
                    cappend(item.id)
                elif item.method == METHOD.UPDATE:
                    updated.append(item.id)
                elif item.method in (METHOD.MERGE, METHOD.DELETE):
                    deleted.append(item.id)

    # The statistics are only stored if no item has failed
    if failed:
        return False

    self.count = count
    self.mtime = mtime
    self.created = created
    self.updated = updated
    self.deleted = deleted
    return True
3551 3552 # -------------------------------------------------------------------------
def __define_tables(self):
    """
        Define the database tables for jobs and items, and store
        them as job_table and item_table attributes of this job
    """

    # Job table first, then item table
    job_table = self.define_job_table()
    self.job_table = job_table

    item_table = self.define_item_table()
    self.item_table = item_table
# -------------------------------------------------------------------------
@classmethod
def define_job_table(cls):
    """
        Define the database table for import jobs (if it is not
        defined yet)

        @return: the job table
    """

    db = current.db
    if cls.JOB_TABLE_NAME not in db:
        db.define_table(cls.JOB_TABLE_NAME,
                        Field("job_id", length=128,
                              unique=True,
                              notnull=True),
                        Field("tablename"),
                        # Pass the function (not its result) as default,
                        # so that the timestamp is computed when a job
                        # is inserted rather than once when the table
                        # is defined
                        Field("timestmp", "datetime",
                              default=datetime.datetime.utcnow))

    return db[cls.JOB_TABLE_NAME]
# -------------------------------------------------------------------------
@classmethod
def define_item_table(cls):
    """
        Define the database table for import items (if it is not
        defined yet)

        @return: the item table
    """

    db = current.db

    name = cls.ITEM_TABLE_NAME
    if name not in db:
        fields = (Field("item_id", length=128,
                        unique=True,
                        notnull=True),
                  Field("job_id", length=128),
                  Field("tablename", length=128),
                  #Field("record_id", "integer"),
                  Field("record_uid"),
                  Field("error", "text"),
                  Field("data", "text"),
                  Field("element", "text"),
                  Field("ritems", "list:string"),
                  Field("citems", "list:string"),
                  Field("parent", length=128),
                  )
        db.define_table(name, *fields)

    return db[name]
3599 3600 # -------------------------------------------------------------------------
def store(self):
    """
        Store this job and all its items in the job table

        @return: the record ID of the job in the job table
    """

    db = current.db

    self.__define_tables()
    jobtable = self.job_table

    # Is there a record for this job already?
    existing = db(jobtable.job_id == self.job_id).select(jobtable.id,
                                                         limitby=(0, 1),
                                                         ).first()
    record_id = existing.id if existing else None

    # Build the job record, the table name is added only if the
    # job has a target table
    record = Storage(job_id=self.job_id)
    try:
        tablename = self.table._tablename
    except AttributeError:
        pass
    else:
        record.update(tablename=tablename)

    # Store all items of this job
    for item in self.items.values():
        item.store(item_table=self.item_table)

    # Insert or update the job record
    if not record_id:
        record_id = jobtable.insert(**record)
    else:
        db(jobtable.id == record_id).update(**record)

    return record_id
3631 3632 # -------------------------------------------------------------------------
def get_tree(self):
    """
        Reconstruct the element tree of this job

        @return: the tree of this job if there is one, otherwise a
                 new element tree with copies of the elements of
                 all parent-less items which either belong to the
                 target table or have a UID or TUID
    """

    tree = self.tree
    if tree is not None:
        return tree

    xml = current.xml
    ATTRIBUTE = xml.ATTRIBUTE
    UID = xml.UID

    root = etree.Element(xml.TAG.root)
    for item in self.items.values():
        element = item.element

        # Skip items without element, and component items
        if element is None or item.parent:
            continue

        if item.tablename == self.table._tablename or \
           element.get(UID, None) or \
           element.get(ATTRIBUTE.tuid, None):
            root.append(deepcopy(element))

    return etree.ElementTree(root)
3653 3654 # -------------------------------------------------------------------------
def delete(self):
    """
        Delete this job and all its items from the job table
        and the item table
    """

    db = current.db

    #current.log.debug("Deleting job ID=%s" % self.job_id)

    self.__define_tables()

    # Remove the items of this job first...
    items = self.item_table
    db(items.job_id == self.job_id).delete()

    # ...then the job itself
    jobs = self.job_table
    db(jobs.job_id == self.job_id).delete()
3671 3672 # -------------------------------------------------------------------------
def restore_references(self):
    """
        Restore the job's reference structure after loading items
        from the item table: links the components, references and
        the parent of every item, and clears the respective load_*
        attributes of the items
    """

    db = current.db
    UID = current.xml.UID

    items = self.items

    for item in items.values():

        # Components
        for citem_id in item.load_components:
            if citem_id in items:
                item.components.append(items[citem_id])
        item.load_components = []

        # References
        for ritem in item.load_references:
            field = ritem["field"]
            if "item_id" in ritem:
                # Reference to another item in this job
                item_id = ritem["item_id"]
                if item_id in items:
                    _item = items[item_id]
                    entry = Storage(tablename=_item.tablename,
                                    element=_item.element,
                                    uid=_item.uid,
                                    id=_item.id,
                                    item_id=item_id)
                    item.references.append(Storage(field=field,
                                                   entry=entry))
            else:
                # Reference to a record in the database, looked up
                # by UID (skipped if the record cannot be found)
                _id = None
                uid = ritem.get("uid", None)
                tablename = ritem.get("tablename", None)
                if tablename and uid:
                    try:
                        table = current.s3db[tablename]
                    except AttributeError:
                        continue
                    if UID not in table.fields:
                        continue
                    query = table[UID] == uid
                    row = db(query).select(table._id,
                                           limitby=(0, 1)).first()
                    if row:
                        _id = row[table._id.name]
                    else:
                        continue
                    entry = Storage(tablename = ritem["tablename"],
                                    element=None,
                                    uid = ritem["uid"],
                                    id = _id,
                                    item_id = None)
                    item.references.append(Storage(field=field,
                                                   entry=entry))
        item.load_references = []

        # Parent
        if item.load_parent is not None:
            # Use get() so that a missing parent gives None instead
            # of a KeyError, otherwise the skip-branch could never
            # be reached with a plain dict of items
            parent = items.get(item.load_parent)
            if parent is None:
                # Parent has been removed
                item.skip = True
            else:
                item.parent = parent
            item.load_parent = None
3734
# =============================================================================
class S3Duplicate(object):
    """ Standard deduplicator method """

    def __init__(self,
                 primary=None,
                 secondary=None,
                 ignore_case=True,
                 ignore_deleted=False):
        """
            Constructor

            @param primary: list or tuple of primary fields to find a
                            match, must always match (mandatory, defaults
                            to "name" field)
            @param secondary: list or tuple of secondary fields to
                              find a match, must match if values are
                              present in the import item
            @param ignore_case: ignore case for string/text fields
            @param ignore_deleted: do not match deleted records

            @ToDo: Fuzzy option to do a LIKE search
        """

        self.primary = set(primary if primary else ("name",))
        self.secondary = set(secondary) if secondary else set()

        self.ignore_case = ignore_case
        self.ignore_deleted = ignore_deleted

    # -------------------------------------------------------------------------
    def __call__(self, item):
        """
            Entry point for importer: looks up a duplicate of the
            import item, and if one is found, sets the item ID and
            (unless the item data are marked as deleted) switches
            the item method to UPDATE

            @param item: the import item

            @return: the duplicate Row if match found, otherwise None

            @raise SyntaxError: if any of the query fields doesn't exist
                                in the item table
        """

        table = item.table
        data = item.data

        error = "Invalid field for duplicate detection: %s (%s)"
        query = None

        # Primary query (mandatory): all primary fields must match
        for fname in self.primary:

            if fname not in table.fields:
                raise SyntaxError(error % (fname, table))

            subquery = self.match(table[fname], data.get(fname))
            if query is None:
                query = subquery
            else:
                query = query & subquery

        # Secondary queries (optional): only fields with a value
        for fname in self.secondary:

            if fname not in table.fields:
                raise SyntaxError(error % (fname, table))

            field = table[fname]
            value = data.get(fname)
            if not value:
                continue
            query &= self.match(field, value)

        # Ignore deleted records?
        if self.ignore_deleted and "deleted" in table.fields:
            query &= (table.deleted != True)

        # Find a match
        rows = current.db(query).select(table._id,
                                        limitby = (0, 1))
        duplicate = rows.first()

        if duplicate:
            # Match found: Update import item
            item.id = duplicate[table._id]
            if not data.deleted:
                item.method = item.METHOD.UPDATE

        # For uses outside of imports:
        return duplicate

    # -------------------------------------------------------------------------
    def match(self, field, value):
        """
            Helper function to generate a match-query

            @param field: the Field
            @param value: the value

            @return: a Query
        """

        ftype = str(field.type)

        case_insensitive = self.ignore_case and \
                           hasattr(value, "lower") and \
                           ftype in ("string", "text")
        if not case_insensitive:
            return (field == value)

        # NB Must convert to unicode before lower() in order to correctly
        #    convert certain unicode-characters (e.g. İ=>i, or Ẽ=>ẽ)
        # => PostgreSQL LOWER() on Windows may not convert correctly,
        #    which seems to be a locale issue:
        #    http://stackoverflow.com/questions/18507589/the-lower-function-on-international-characters-in-postgresql
        # => works fine on Debian servers if the locale is a .UTF-8 before
        #    the Postgres cluster is created
        lowered = s3_unicode(value).lower().encode("utf-8")
        return (field.lower() == lowered)
3860
3861 # ============================================================================= 3862 -class S3BulkImporter(object):
3863 """ 3864 Import CSV files of data to pre-populate the database. 3865 Suitable for use in Testing, Demos & Simulations 3866 3867 http://eden.sahanafoundation.org/wiki/DeveloperGuidelines/PrePopulate 3868 """ 3869
3870 - def __init__(self):
3871 """ Constructor """ 3872 3873 import csv 3874 from xml.sax.saxutils import unescape 3875 3876 self.csv = csv 3877 self.unescape = unescape 3878 self.tasks = [] 3879 # Some functions refer to a different resource 3880 self.alternateTables = { 3881 "hrm_group_membership": {"tablename": "pr_group_membership", 3882 "prefix": "pr", 3883 "name": "group_membership"}, 3884 "hrm_person": {"tablename": "pr_person", 3885 "prefix": "pr", 3886 "name": "person"}, 3887 "member_person": {"tablename": "pr_person", 3888 "prefix": "pr", 3889 "name": "person"}, 3890 } 3891 # Keep track of which resources have been customised so we don't do this twice 3892 self.customised = [] 3893 self.errorList = [] 3894 self.resultList = []
3895 3896 # -------------------------------------------------------------------------
def load_descriptor(self, path):
    """
        Load the descriptor file and then all the import tasks in that file
        into the task property.
        The descriptor file is the file called tasks.cfg in path.
        The file consists of a comma separated list of:
        module, resource name, csv filename, xsl filename.

        Empty lines and lines starting with "#" are skipped, lines
        starting with "*" are passed to extract_other_import_line,
        all other lines to extract_csv_import_line.

        @param path: the path of the directory with the tasks.cfg
    """

    # Make sure the descriptor file is closed after reading
    with open(os.path.join(path, "tasks.cfg"), "r") as source:
        for details in self.csv.reader(source):
            if not details:
                continue
            # Slice rather than index, so that a line with an empty
            # first cell does not raise an IndexError
            prefix = details[0][:1].strip('" ')
            if prefix == "#": # comment
                continue
            if prefix == "*": # specialist function
                self.extract_other_import_line(path, details)
            else: # standard CSV importer
                self.extract_csv_import_line(path, details)
3918 3919 # -------------------------------------------------------------------------
def extract_csv_import_line(self, path, details):
    """
        Extract the details for a CSV Import Task and append the
        task to the tasks list; problems are reported in the error list

        @param path: the default directory for the CSV file
        @param details: the columns of the descriptor line
                        (module, resource, CSV file name, XSL file name
                        and optional extra data)
    """

    argCnt = len(details)
    if argCnt not in (4, 5):
        self.errorList.append(
            "prepopulate error: job not of length 4, ignored: %s" % str(details))
        return

    # Remove any spaces and enclosing double quote
    mod = details[0].strip('" ')
    res = details[1].strip('" ')
    folder = current.request.folder

    # Locate the CSV source
    csvFileName = details[2].strip('" ')
    if csvFileName[:7] == "http://":
        source = csvFileName
    else:
        csvPath, csvFile = os.path.split(csvFileName)
        if csvPath != "":
            path = os.path.join(folder, "modules", "templates", csvPath)
            # @todo: deprecate this block once migration completed
            if not os.path.exists(path):
                # Non-standard location (legacy template)?
                path = os.path.join(folder, "private", "templates", csvPath)
        source = os.path.join(path, csvFile)

    # Locate the transformation stylesheet
    xslFileName = details[3].strip('" ')
    templateDir = os.path.join(folder, "static", "formats", "s3csv")
    search_path = (os.path.join(templateDir, mod), # module directory first
                   templateDir,                    # then the templates directory
                   path,                           # then the directory of the csv file
                   )
    for directory in search_path:
        xsl = os.path.join(directory, xslFileName)
        if os.path.exists(xsl):
            break
    else:
        self.errorList.append(
            "Failed to find a transform file %s, Giving up." % xslFileName)
        return

    extra_data = details[4] if argCnt == 5 else None
    self.tasks.append([1, mod, res, source, xsl, extra_data])
3977 3978 # -------------------------------------------------------------------------
def extract_other_import_line(self, path, details):
    """
        Store a single import job into the tasks property
        *,function,filename,*extraArgs

        @param path: the default directory for the file
        @param details: the columns of the descriptor line
    """

    function = details[1].strip('" ')

    # Resolve the file path (if a file name is specified)
    filepath = None
    if len(details) >= 3:
        filename = details[2].strip('" ')
        if filename:
            subfolder, filename = os.path.split(filename)
            if subfolder:
                folder = current.request.folder
                path = os.path.join(folder, "modules", "templates", subfolder)
                # @todo: deprecate this block once migration completed
                if not os.path.exists(path):
                    # Non-standard location (legacy template)?
                    path = os.path.join(folder, "private", "templates", subfolder)
            filepath = os.path.join(path, filename)

    # Everything after the file name is passed on to the function
    extraArgs = details[3:] if len(details) >= 4 else None

    self.tasks.append((2, function, filepath, extraArgs))
4011 4012 # -------------------------------------------------------------------------
def execute_import_task(self, task):
    """
        Execute each import job, in order

        @param task: the task as a list
                     [1, prefix, name, csv source, stylesheet, extra data],
                     tasks with another type (first element) are ignored

        Problems are reported in the error list, and a message with the
        duration of the import is appended to the result list.
    """

    start = datetime.datetime.now()
    if task[0] == 1:
        s3db = current.s3db
        response = current.response
        errorString = "prepopulate error: file %s missing"
        # Store the view
        view = response.view

        #current.log.debug("Running job %s %s (filename=%s transform=%s)" % (task[1],
        #                                                                    task[2],
        #                                                                    task[3],
        #                                                                    task[4],
        #                                                                    ))

        prefix = task[1]
        name = task[2]
        tablename = "%s_%s" % (prefix, name)
        # Some functions refer to a different resource
        if tablename in self.alternateTables:
            details = self.alternateTables[tablename]
            if "tablename" in details:
                tablename = details["tablename"]
            s3db.table(tablename)
            if "loader" in details:
                loader = details["loader"]
                if loader is not None:
                    loader()
            if "prefix" in details:
                prefix = details["prefix"]
            if "name" in details:
                name = details["name"]

        try:
            resource = s3db.resource(tablename)
        except AttributeError:
            # Table cannot be loaded
            self.errorList.append("WARNING: Unable to find table %s import job skipped" % tablename)
            return

        # Check if the source file is accessible
        # NOTE(review): the source opened here is not closed anywhere
        #               in this method
        filename = task[3]
        if filename[:7] == "http://":
            req = urllib2.Request(url=filename)
            try:
                f = urllib2.urlopen(req)
            except urllib2.HTTPError, e:
                self.errorList.append("Could not access %s: %s" % (filename, e.read()))
                return
            except:
                self.errorList.append(errorString % filename)
                return
            else:
                csv = f
        else:
            try:
                csv = open(filename, "r")
            except IOError:
                self.errorList.append(errorString % filename)
                return

        # Check if the stylesheet is accessible
        # (it is only opened for this check, the import receives the path)
        try:
            S = open(task[4], "r")
        except IOError:
            self.errorList.append(errorString % task[4])
            return
        else:
            S.close()

        if tablename not in self.customised:
            # Customise the resource
            customise = current.deployment_settings.customise_resource(tablename)
            if customise:
                request = S3Request(prefix, name, current.request)
                customise(request, tablename)
                self.customised.append(tablename)

        # The extra data are expected as JSON with single quotes in
        # place of double quotes
        extra_data = None
        if task[5]:
            try:
                extradata = self.unescape(task[5], {"'": '"'})
                extradata = json.loads(extradata)
                extra_data = extradata
            except:
                self.errorList.append("WARNING:5th parameter invalid, parameter %s ignored" % task[5])
        auth = current.auth
        auth.rollback = True
        try:
            # @todo: add extra_data and file attachments
            resource.import_xml(csv,
                                format="csv",
                                stylesheet=task[4],
                                extra_data=extra_data,
                                )
        except SyntaxError, e:
            self.errorList.append("WARNING: import error - %s (file: %s, stylesheet: %s)" %
                                 (e, filename, task[4]))
            auth.rollback = False
            # NOTE(review): the view is not restored on this path
            return

        if not resource.error:
            current.db.commit()
        else:
            # Must roll back if there was an error!
            error = resource.error
            self.errorList.append("%s - %s: %s" % (
                                  task[3], resource.tablename, error))
            errors = current.xml.collect_errors(resource)
            if errors:
                self.errorList.extend(errors)
            current.db.rollback()

        auth.rollback = False

        # Restore the view
        response.view = view
        end = datetime.datetime.now()
        duration = end - start
        # Report only the file name, not the whole path
        csvName = task[3][task[3].rfind("/") + 1:]
        try:
            # Python 2.7
            duration = '{:.2f}'.format(duration.total_seconds() / 60)
            msg = "%s import job completed in %s mins" % (csvName, duration)
        except AttributeError:
            # older Python
            msg = "%s import job completed in %s" % (csvName, duration)
        self.resultList.append(msg)
        current.log.debug(msg)
4145 4146 # -------------------------------------------------------------------------
def execute_special_task(self, task):
    """
        Execute import tasks which require a custom function,
        such as import_role

        @param task: the task as a tuple
                     (2, function name, file path, extra arguments),
                     tasks with another type (first element) are ignored

        The function is looked up by name in response.s3, and called
        with the file path (unless None) followed by the extra
        arguments (unless None); its return value is treated as an
        error message.
    """

    start = datetime.datetime.now()
    s3 = current.response.s3
    if task[0] != 2:
        return

    fun, filepath, extraArgs = task[1], task[2], task[3]

    # Build the positional arguments for the function
    args = []
    if filepath is not None:
        args.append(filepath)
    if extraArgs is not None:
        args.extend(extraArgs)

    error = s3[fun](*args)
    if error:
        self.errorList.append(error)

    duration = datetime.datetime.now() - start
    try:
        # Python 2.7
        duration = '{:.2f}'.format(duration.total_seconds()/60)
        msg = "%s import job completed in %s mins" % (fun, duration)
    except AttributeError:
        # older Python
        msg = "%s import job completed in %s" % (fun, duration)
    self.resultList.append(msg)
    current.log.debug(msg)
# -------------------------------------------------------------------------
@staticmethod
def _lookup_pe(entity):
    """
        Convert an Organisation Name to a pe_id
        - helper for import_role

        @param entity: the organisation name

        @return: the pe_id of the organisation, or None if no
                 organisation with this name can be found
    """

    table = current.s3db.org_organisation

    query = (table.name == entity)
    org = current.db(query).select(table.pe_id,
                                   limitby = (0, 1),
                                   ).first()
    try:
        return org.pe_id
    except AttributeError:
        # No match
        current.log.warning("import_role cannot find pe_id for %s" % entity)
        return None
4201 4202 # -------------------------------------------------------------------------
def import_role(self, filename):
    """
        Import Roles from CSV

        @param filename: the path of the CSV file, the columns read are
                         role, description, controller, function, table,
                         oacl, uacl, entity, hidden, system, protected
                         and uid

        @return: an error message if the file cannot be opened,
                 otherwise None
    """

    # Check if the source file is accessible
    try:
        openFile = open(filename, "r")
    except IOError:
        return "Unable to open file %s" % filename

    roles = {}
    acls = {}
    args = {}

    # Make sure the file is closed when done (or on error)
    with openFile:

        auth = current.auth
        acl = auth.permission
        create_role = auth.s3_create_role

        # Permission names which can be used in oacl/uacl columns,
        # each corresponds to an attribute of auth.permission
        PERMISSIONS = ("READ",
                       "CREATE",
                       "UPDATE",
                       "DELETE",
                       "REVIEW",
                       "APPROVE",
                       "PUBLISH",
                       "ALL",
                       )

        def parseACL(_acl):
            """ Convert a |-separated list of permission names into an ACL value """
            aclValue = 0
            for permission in _acl.split("|"):
                # Unknown permission names are ignored
                if permission in PERMISSIONS:
                    aclValue |= getattr(acl, permission)
            return aclValue

        # Map CSV columns to rule keys / role parameters
        RULES = (("controller", "c"),
                 ("function", "f"),
                 ("table", "t"),
                 )
        EXTRA = ("hidden", "system", "protected", "uid")

        reader = self.csv.DictReader(openFile)
        for row in reader:
            if row is None:
                continue
            row_get = row.get
            role = row_get("role")
            desc = row_get("description", "")

            rules = {}
            for column, key in RULES:
                value = row_get(column)
                if value:
                    rules[key] = value
            for column in ("oacl", "uacl"):
                value = row_get(column)
                if value:
                    rules[column] = parseACL(value)
            #org = row_get("org")
            #if org:
            #    rules["organisation"] = org
            #facility = row_get("facility")
            #if facility:
            #    rules["facility"] = facility
            entity = row_get("entity")
            if entity:
                # "any" is passed through as-is, numbers as integer,
                # anything else is looked up as organisation name
                if entity != "any":
                    try:
                        entity = int(entity)
                    except ValueError:
                        entity = self._lookup_pe(entity)
                rules["entity"] = entity

            extra_param = {}
            for column in EXTRA:
                value = row_get(column)
                if value:
                    extra_param[column] = value

            if role in roles:
                acls[role].append(rules)
            else:
                roles[role] = [role, desc]
                acls[role] = [rules]
            # Only the first set of extra parameters per role is used
            if extra_param and role not in args:
                args[role] = extra_param

    # Create the roles with all their rules
    for role, desc in roles.values():
        create_role(role, desc, *acls[role], **args.get(role, {}))
4312 4313 # -------------------------------------------------------------------------
def import_user(self, filename):
    """
        Import Users from CSV with an import Prep

        @param filename: the path of the CSV file
    """

    current.response.s3.import_prep = current.auth.s3_import_prep

    # Standard stylesheet for users
    stylesheet = os.path.join(current.request.folder,
                              "static",
                              "formats",
                              "s3csv",
                              "auth",
                              "user.xsl",
                              )

    self.execute_import_task([1, "auth", "user", filename, stylesheet, None])
4334 4335 # -------------------------------------------------------------------------
def import_feed(self, filename):
    """
        Import RSS Feeds from CSV with an import Prep

        @param filename: the path of the CSV file
    """

    stylesheet = os.path.join(current.request.folder,
                              "static",
                              "formats",
                              "s3csv",
                              "msg",
                              "rss_channel.xsl"
                              )

    current.response.s3.import_prep = current.s3db.pr_import_prep

    # 1st import any Contacts, then import the Channels
    # (both from the same file with the same stylesheet)
    for prefix, name in (("pr", "contact"),
                         ("msg", "rss_channel"),
                         ):
        self.execute_import_task([1, prefix, name, filename, stylesheet, None])
4369 4370 # -------------------------------------------------------------------------
def import_image(self,
                 filename,
                 tablename,
                 idfield,
                 imagefield):
    """
        Import images, such as a logo or person image

        filename     a CSV list of records and filenames
        tablename    the name of the table
        idfield      the field used to identify the record
        imagefield   the field to where the image will be added

        Example:
        bi.import_image ("org_logos.csv", "org_organisation", "name", "logo")
        and the file org_logos.csv may look as follows
        id                            file
        Sahana Software Foundation    sahanalogo.jpg
        American Red Cross            icrc.gif

        @return: an error message if the CSV file cannot be opened,
                 otherwise None (problems with individual images
                 are logged)
    """

    # Check if the source file is accessible
    try:
        openFile = open(filename, "r")
    except IOError:
        return "Unable to open file %s" % filename

    # Make sure the CSV file is closed when done (or on error)
    with openFile:

        prefix, name = tablename.split("_", 1)

        reader = self.csv.DictReader(openFile)

        db = current.db
        s3db = current.s3db
        audit = current.audit
        table = s3db[tablename]
        idfield = table[idfield]
        base_query = (table.deleted != True)
        fieldnames = [table._id.name,
                      imagefield
                      ]
        # https://github.com/web2py/web2py/blob/master/gluon/sqlhtml.py#L1947
        for field in table:
            if field.name not in fieldnames and field.writable is False \
               and field.update is None and field.compute is None:
                fieldnames.append(field.name)
        fields = [table[f] for f in fieldnames]

        # Get callbacks
        get_config = s3db.get_config
        onvalidation = get_config(tablename, "update_onvalidation") or \
                       get_config(tablename, "onvalidation")
        onaccept = get_config(tablename, "update_onaccept") or \
                   get_config(tablename, "onaccept")
        update_realm = get_config(tablename, "update_realm")
        if update_realm:
            set_realm_entity = current.auth.set_realm_entity
        update_super = s3db.update_super

        # Extract the path to the CSV file, images should be in
        # this directory, or relative to it
        path = os.path.split(filename)[0]

        for row in reader:
            if row is None:
                continue

            # Open the file
            image = row["file"]
            imagepath = os.path.join(path, image)
            try:
                imageFile = open(imagepath, "rb")
            except IOError:
                current.log.error("Unable to open image file %s" % image)
                continue
            # Read the image into memory and close the file right away
            with imageFile:
                image_source = StringIO(imageFile.read())

            # Get the id of the resource
            query = base_query & (idfield == row["id"])
            record = db(query).select(limitby = (0, 1),
                                      *fields).first()
            try:
                record_id = record.id
            except AttributeError:
                current.log.error("Unable to get record %s of the resource %s to attach the image file to" % (row["id"], tablename))
                continue

            # Create and accept the form
            form = SQLFORM(table, record, fields=["id", imagefield])
            form_vars = Storage()
            form_vars._formname = "%s/%s" % (tablename, record_id)
            form_vars.id = record_id
            source = Storage()
            source.filename = imagepath
            source.file = image_source
            form_vars[imagefield] = source
            if form.accepts(form_vars, onvalidation=onvalidation):
                # Audit
                audit("update", prefix, name, form=form,
                      record=record_id, representation="csv")

                # Update super entity links
                update_super(table, form_vars)

                # Update realm
                if update_realm:
                    set_realm_entity(table, form_vars, force_update=True)

                # Execute onaccept
                callback(onaccept, form, tablename=tablename)
            else:
                for (key, error) in form.errors.items():
                    current.log.error("error importing logo %s: %s %s" % (image, key, error))
4478 4479 # -------------------------------------------------------------------------
def import_font(self, url):
    """
        Install a Font

        Downloads the font into <application folder>/static/fonts,
        unless a file with the expected name is already present there.

        @param url: the URL of the font; the file name must end in
                    .ttf, .gz (opened as a tar archive) or .zip.
                    The keyword "unifont" selects a built-in download
                    URL and stores the result as unifont.ttf
    """

    if url == "unifont":
        #url = "http://unifoundry.com/pub/unifont-7.0.06/font-builds/unifont-7.0.06.ttf"
        url = "http://unifoundry.com/pub/unifont-10.0.07/font-builds/unifont-10.0.07.ttf"
        # Rename to make version upgrades be transparent
        filename = "unifont.ttf"
        extension = "ttf"
    else:
        name_parts = url.split("/")[-1].rsplit(".", 1)
        if len(name_parts) != 2:
            # No "." in the file name => cannot tell what kind of file it is
            current.log.warning("Unable to determine font extension from URL: %s" % url)
            return
        filename, extension = name_parts

        if extension not in ("ttf", "gz", "zip"):
            current.log.warning("Unsupported font extension: %s" % extension)
            return

        filename = "%s.ttf" % filename

    fontPath = os.path.join(current.request.folder, "static", "fonts")
    if os.path.exists(os.path.join(fontPath, filename)):
        current.log.warning("Using cached copy of %s" % filename)
        return

    # Download as we have no cached copy

    # Copy the current working directory to revert back to later
    cwd = os.getcwd()

    # Set the current working directory
    os.chdir(fontPath)
    try:
        try:
            _file = fetch(url)
        except urllib2.URLError as exception:
            current.log.error(exception)
            return

        if extension == "gz":
            import tarfile
            # NOTE(review): extractall() writes whatever member paths the
            # archive contains - only use with trusted font sources
            tf = tarfile.open(fileobj=StringIO(_file))
            try:
                tf.extractall()
            finally:
                tf.close()

        elif extension == "zip":
            import zipfile
            # NOTE(review): see the note on extractall() above
            zf = zipfile.ZipFile(StringIO(_file))
            try:
                zf.extractall()
            finally:
                zf.close()

        else:
            with open(filename, "wb") as f:
                f.write(_file)
    finally:
        # Always revert back to the working directory as before,
        # even if the download, extraction or write failed
        os.chdir(cwd)
4538 4539 # -------------------------------------------------------------------------
def import_remote_csv(self, url, prefix, resource, stylesheet):
    """
        Import CSV files from remote servers

        The file is downloaded into a "remote_csv" folder (below
        <cwd>/temp if that exists, otherwise below the system temp
        directory) and then handed to execute_import_task. A file
        already present in that folder under the expected name is
        re-used instead of being downloaded again.

        @param url: the URL of the file, must end in .csv or .zip;
                    for a zip archive, the first member with a .csv
                    extension is imported
        @param prefix: the prefix of the target resource, also the
                       folder name of the stylesheet below
                       static/formats/s3csv
        @param resource: the name of the target resource
        @param stylesheet: the file name of the transformation stylesheet
    """

    extension = url.split(".")[-1]
    if extension not in ("csv", "zip"):
        current.log.error("error importing remote file %s: invalid extension" % (url))
        return

    # Copy the current working directory to revert back to later
    cwd = os.getcwd()

    # Shortcut
    os_path = os.path
    os_path_exists = os_path.exists
    os_path_join = os_path.join

    # Create the working directory (used as a download cache):
    # <cwd>/temp/remote_csv, falling back to the system temp directory
    TEMP = os_path_join(cwd, "temp")
    if not os_path_exists(TEMP):
        import tempfile
        TEMP = tempfile.gettempdir()
    tempPath = os_path_join(TEMP, "remote_csv")
    if not os_path_exists(tempPath):
        try:
            os.mkdir(tempPath)
        except OSError:
            current.log.error("Unable to create temp folder %s!" % tempPath)
            return

    # Name under which the CSV is cached
    filename = url.split("/")[-1]
    if extension == "zip":
        filename = filename.replace(".zip", ".csv")

    if os_path_exists(os_path_join(tempPath, filename)):
        current.log.warning("Using cached copy of %s" % filename)
    else:
        # Download if we have no cached copy
        # Set the current working directory
        os.chdir(tempPath)
        try:
            try:
                _file = fetch(url)
            except urllib2.URLError as exception:
                current.log.error(exception)
                return

            if extension == "zip":
                # Need to unzip
                import zipfile
                try:
                    myfile = zipfile.ZipFile(StringIO(_file))
                except zipfile.BadZipfile as exception:
                    # e.g. trying to download through a captive portal
                    current.log.error(exception)
                    return
                try:
                    _file = None
                    for member in myfile.infolist():
                        if member.filename.split(".")[-1] == "csv":
                            _file = myfile.read(member.filename)
                            break
                finally:
                    myfile.close()
                if _file is None:
                    # Nothing to import
                    current.log.error("error importing remote file %s: no CSV file in archive" % url)
                    return

            # Store under the cache name (not the archive member's name),
            # so that the cache check above can find the file next time
            # and member names with directory parts cannot escape tempPath;
            # binary mode keeps the downloaded bytes unchanged
            with open(filename, "wb") as f:
                f.write(_file)
        finally:
            # Always revert back to the working directory as before
            os.chdir(cwd)

    task = [1, prefix, resource,
            os_path_join(tempPath, filename),
            os_path_join(current.request.folder,
                         "static",
                         "formats",
                         "s3csv",
                         prefix,
                         stylesheet
                         ),
            None
            ]
    self.execute_import_task(task)
# -------------------------------------------------------------------------
@staticmethod
def import_script(filename):
    """
        Run a custom Import Script

        The script is loaded via getcfs and executed via restricted,
        in an environment built from the current request, response
        and session, extended with the names current, auth, db, gis,
        s3db and settings.

        @param filename: the path of the script file

        @ToDo: Report Errors during Script run to console better
    """

    from gluon.cfs import getcfs
    from gluon.compileapp import build_environment
    from gluon.restricted import restricted

    env = build_environment(current.request,
                            current.response,
                            current.session,
                            )

    # Names made available to the script
    env.update({"current": current,
                "auth": current.auth,
                "db": current.db,
                "gis": current.gis,
                "s3db": current.s3db,
                "settings": current.deployment_settings,
                })

    script = getcfs(filename, filename, None)
    restricted(script, env, layer=filename)
4652 4653 # -------------------------------------------------------------------------
def import_task(self, task_name, args_json=None, vars_json=None):
    """
        Import a Scheduled Task

        @param task_name: the name of the task; anything up to the last
                          os.path.sep is stripped before scheduling
        @param args_json: the positional arguments for the task as
                          JSON string (optional)
        @param vars_json: JSON string (object) with the variables for
                          the task (optional); keys that are scheduler
                          options (see "options" below) are passed to
                          schedule_task as keyword arguments, all other
                          keys become task vars

        Validation errors are appended to self.errorList, and the task
        is then not scheduled.
    """

    s3 = current.response.s3

    # Store current value of Bulk
    bulk = s3.bulk
    # Set Bulk to true for this parse
    s3.bulk = True
    try:
        validator = IS_JSONS3()
        if args_json:
            task_args, error = validator(args_json)
            if error:
                self.errorList.append(error)
                return
        else:
            task_args = []
        if vars_json:
            all_vars, error = validator(vars_json)
            if error:
                self.errorList.append(error)
                return
        else:
            all_vars = {}
    finally:
        # Restore bulk setting (also when returning early on error)
        s3.bulk = bulk

    kwargs = {}
    task_vars = {}
    options = ("function_name",
               "start_time",
               "next_run_time",
               "stop_time",
               "repeats",
               "period", # seconds
               "timeout", # seconds
               "enabled", # None = Enabled
               "group_name",
               "ignore_duplicate",
               "sync_output",
               )
    for var in all_vars:
        if var in options:
            kwargs[var] = all_vars[var]
        else:
            task_vars[var] = all_vars[var]

    current.s3task.schedule_task(task_name.split(os.path.sep)[-1], # Strip the path
                                 args = task_args,
                                 vars = task_vars,
                                 **kwargs
                                 )
4706 4707 # -------------------------------------------------------------------------
def import_xml(self,
               filepath,
               prefix,
               resourcename,
               dataformat,
               source_type=None,
               ):
    """
        Import XML data using an XSLT: static/formats/<dataformat>/import.xsl
        Setting the source_type is possible

        @param filepath: the path of the source file
        @param prefix: the prefix of the target table name (surrounding
                       spaces and double quotes are removed)
        @param resourcename: the name part of the target table name
                             (surrounding spaces and double quotes are
                             removed)
        @param dataformat: the name of the folder below static/formats
                           which contains the import.xsl stylesheet
        @param source_type: passed on to resource.import_xml

        Errors are appended to self.errorList; the transaction is
        committed if the resource reports no error, otherwise rolled
        back.
    """

    # Remove any spaces and enclosing double quote
    prefix = prefix.strip('" ')
    resourcename = resourcename.strip('" ')

    errorString = "prepopulate error: file %s missing"
    try:
        source = open(filepath, "r")
    except IOError:
        self.errorList.append(errorString % filepath)
        return

    try:
        stylesheet = os.path.join(current.request.folder,
                                  "static",
                                  "formats",
                                  dataformat,
                                  "import.xsl")
        # Only verify that the stylesheet can be opened
        try:
            xslt_file = open(stylesheet, "r")
        except IOError:
            self.errorList.append(errorString % stylesheet)
            return
        else:
            xslt_file.close()

        tablename = "%s_%s" % (prefix, resourcename)
        resource = current.s3db.resource(tablename)

        if tablename not in self.customised:
            # Customise the resource
            customise = current.deployment_settings.customise_resource(tablename)
            if customise:
                request = S3Request(prefix, resourcename, current.request)
                customise(request, tablename)
                self.customised.append(tablename)

        auth = current.auth
        auth.rollback = True
        try:
            try:
                resource.import_xml(source,
                                    stylesheet = stylesheet,
                                    source_type = source_type,
                                    )
            except SyntaxError as e:
                self.errorList.append("WARNING: import error - %s (file: %s, stylesheet: %s/import.xsl)" %
                                      (e, filepath, dataformat))
                return

            if not resource.error:
                current.db.commit()
            else:
                # Must roll back if there was an error!
                error = resource.error
                self.errorList.append("%s - %s: %s" % (
                                      filepath, tablename, error))
                errors = current.xml.collect_errors(resource)
                if errors:
                    self.errorList.extend(errors)
                current.db.rollback()
        finally:
            # Reset the flag on every exit path
            auth.rollback = False
    finally:
        # Do not leak the source file handle
        source.close()
4781 4782 # -------------------------------------------------------------------------
def perform_tasks(self, path):
    """
        Read the descriptor file (tasks.cfg) at the given path and
        run every import job listed in it

        @param path: passed on to load_descriptor
    """

    self.load_descriptor(path)

    for job in self.tasks:
        job_type = job[0]
        if job_type == 1:
            # Regular import job
            self.execute_import_task(job)
            continue
        if job_type == 2:
            # Special job
            self.execute_special_task(job)
4795 4796 # END ========================================================================= 4797