Package s3 :: Module s3sync
[frames] | [no frames]

Source Code for Module s3.s3sync

   1  # -*- coding: utf-8 -*- 
   2   
   3  """ S3 Synchronization 
   4   
   5      @copyright: 2011-2019 (c) Sahana Software Foundation 
   6      @license: MIT 
   7   
   8      Permission is hereby granted, free of charge, to any person 
   9      obtaining a copy of this software and associated documentation 
  10      files (the "Software"), to deal in the Software without 
  11      restriction, including without limitation the rights to use, 
  12      copy, modify, merge, publish, distribute, sublicense, and/or sell 
  13      copies of the Software, and to permit persons to whom the 
  14      Software is furnished to do so, subject to the following 
  15      conditions: 
  16   
  17      The above copyright notice and this permission notice shall be 
  18      included in all copies or substantial portions of the Software. 
  19   
  20      THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
  21      EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 
  22      OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
  23      NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
  24      HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
  25      WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
  26      FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 
  27      OTHER DEALINGS IN THE SOFTWARE. 
  28  """ 
  29   
  30  import json 
  31  import sys 
  32  import datetime 
  33   
  34  try: 
  35      from cStringIO import StringIO # Faster, where available 
  36  except ImportError: 
  37      from StringIO import StringIO 
  38   
  39  from gluon import current, URL, DIV 
  40  from gluon.storage import Storage 
  41   
  42  from s3datetime import s3_parse_datetime, s3_utc 
  43  from s3rest import S3Method 
  44  from s3import import S3ImportItem 
  45  from s3query import S3URLQuery 
  46  from s3utils import s3_str 
# =============================================================================
class S3Sync(S3Method):
    """ Synchronization Handler """

    def __init__(self):
        """ Constructor """

        S3Method.__init__(self)

        # Transaction logger for all sync operations of this handler
        self.log = S3SyncLog()

        # Cached sync_config row, loaded lazily by the config property
        self._config = None
59 60 # -------------------------------------------------------------------------
61 - def apply_method(self, r, **attr):
62 """ 63 RESTful method handler, responds to: 64 - GET [prefix]/[name]/sync.xml - incoming pull 65 - PUT|POST [prefix]/[name]/sync.xml - incoming push 66 - POST sync/repository/register.json - remote registration 67 68 NB incoming pull/push reponse normally by local sync/sync 69 controller as resource proxy => back-end generated S3Request 70 71 @param r: the S3Request 72 @param attr: controller parameters for the request 73 """ 74 75 output = {} 76 77 method = r.method 78 if method == "sync": 79 80 if r.http == "GET": 81 # Incoming pull 82 output = self.__send(r, **attr) 83 84 elif r.http in ("PUT", "POST"): 85 # Incoming push 86 output = self.__receive(r, **attr) 87 88 else: 89 r.error(405, current.ERROR.BAD_METHOD) 90 91 elif method == "register": 92 93 if r.http in ("PUT", "POST"): 94 # Incoming registration request 95 if r.representation == "json": 96 output = self.__register(r, **attr) 97 else: 98 r.error(415, current.ERROR.BAD_FORMAT) 99 100 else: 101 r.error(405, current.ERROR.BAD_METHOD) 102 103 else: 104 r.error(405, current.ERROR.BAD_METHOD) 105 106 return output
    # -------------------------------------------------------------------------
    # REST Methods:
    # -------------------------------------------------------------------------
    def __register(self, r, **attr):
        """
        Respond to an incoming registration request

        @param r: the S3Request
        @param attr: controller parameters for the request
        """

        # Parse the request parameters (JSON body)
        # NOTE: r.error apparently raises/terminates the request, since the
        # code below relies on source being non-empty after the check
        from s3validators import JSONERRORS
        source = r.read_body()
        if not source:
            r.error(400, "Missing parameters")
        try:
            parameters = json.load(source[0])
        except JSONERRORS:
            r.error(400, "Invalid parameters: %s" % (sys.exc_info()[1]))

        log = self.log

        result = log.SUCCESS
        repository_id = None

        # The peer identifies itself by its UUID
        ruid = parameters.get("uuid")
        if ruid:

            # New repository or update?
            db = current.db
            rtable = current.s3db.sync_repository
            row = db(rtable.uuid == ruid).select(rtable.id,
                                                 limitby = (0, 1),
                                                 ).first()

            # Check permissions (update for known peer, create for new peer)
            permitted = current.auth.s3_has_permission
            if row:
                repository_id = row.id
                if not permitted("update", rtable, record_id = repository_id):
                    r.unauthorised()
                # Re-registration un-deletes a previously removed repository
                data = {"deleted": False}
            else:
                if not permitted("create", rtable):
                    r.unauthorised()
                data = {"uuid": ruid}

            # Add repository parameters supplied by the peer
            apitype = parameters.get("apitype")
            if apitype:
                data["apitype"] = apitype

            name = parameters.get("name")
            if name:
                data["name"] = name

            # Update or insert repository record
            if row:
                success = row.update_record(**data)
                if success:
                    message = "Registration update successful"
                else:
                    result = log.ERROR
                    message = "Registration update failed"
            else:
                repository_id = rtable.insert(**data)
                if repository_id:
                    message = "Registration successful"
                else:
                    result = log.ERROR
                    message = "Registration failed"
        else:
            result = log.ERROR
            message = "No repository identifier specified"

        # Response message (JSON), sender is the local site UUID
        if result == log.SUCCESS:
            output = current.xml.json_message(message = message,
                                              sender = "%s" % self.config.uuid,
                                              )
        else:
            output = current.xml.json_message(False, 400,
                                              message = message,
                                              sender = "%s" % self.config.uuid,
                                              )

        # Set Content-Type response header
        current.response.headers["Content-Type"] = "application/json"

        # Log the operation
        log.write(repository_id = repository_id,
                  resource_name = log.NONE,
                  transmission = log.IN,
                  mode = log.REGISTER,
                  result = result,
                  message = message,
                  )

        return output
    # -------------------------------------------------------------------------
    def __send(self, r, **attr):
        """
        Respond to an incoming pull

        @param r: the S3Request
        @param attr: the controller attributes
        """

        # mixed=True means the peer negotiates the resource itself
        mixed = attr.get("mixed", False)
        get_vars = r.get_vars
        vars_get = get_vars.get

        resource = r.resource

        # Identify the requesting repository by its UUID
        repository_uuid = vars_get("repository")
        connector = None

        if repository_uuid:

            rtable = current.s3db.sync_repository
            query = rtable.uuid == repository_uuid
            row = current.db(query).select(limitby=(0, 1)).first()
            if row:
                connector = S3SyncRepository(row)

        if connector is None:
            # Use a dummy repository with Eden API
            # (anonymous pulls are permitted, unlike pushes)
            connector = S3SyncRepository(Storage(id = None,
                                                 name = "unknown",
                                                 apitype = "eden",
                                                 ))

        current.log.debug("S3Sync PULL from %s (%s)" % (connector.name,
                                                        connector.apitype))

        # Additional export parameters
        # (invalid numbers are silently ignored rather than rejected)
        start = vars_get("start", None)
        if start is not None:
            try:
                start = int(start)
            except ValueError:
                start = None
        limit = vars_get("limit", None)
        if limit is not None:
            try:
                limit = int(limit)
            except ValueError:
                limit = None
        msince = vars_get("msince", None)
        if msince is not None:
            msince = s3_parse_datetime(msince)

        # Sync filters from peer, URL vars shaped like "[tablename]var=value";
        # "~" or an empty tablename refers to the master resource; multiple
        # values for the same var are &-concatenated
        filters = {}
        for k, v in get_vars.items():
            if k[0] == "[" and "]" in k:
                tablename, urlvar = k[1:].split("]", 1)
                if urlvar:
                    if not tablename or tablename == "~":
                        tablename = resource.tablename
                    f = filters.get(tablename, {})
                    u = f.get(urlvar, None)
                    if u:
                        u = "%s&%s" % (u, v)
                    else:
                        u = v
                    f[urlvar] = u
                    filters[tablename] = f
        if not filters:
            filters = None

        # Should we include Components?
        # @ToDo: Option to specify components?
        components = vars_get("components", None)
        if components and components.lower() == "none":
            resource.components.reset(expose=[])

        try:
            result = connector.send(resource,
                                    start = start,
                                    limit = limit,
                                    msince = msince,
                                    filters = filters,
                                    mixed = mixed,
                                    )
        except NotImplementedError:
            r.error(405, "Synchronization method not supported for repository")

        # Log the operation
        log = self.log
        log.write(repository_id = connector.id,
                  resource_name = "mixed" if mixed else resource.tablename,
                  transmission = log.IN,
                  mode = log.PULL,
                  action = "send",
                  remote = result.get("remote", False),
                  result = result.get("status", log.NONE),
                  message = result.get("message", ""),
                  )

        return result.get("response")
    # -------------------------------------------------------------------------
    def __receive(self, r, **attr):
        """
        Respond to an incoming push

        @param r: the S3Request
        @param attr: the controller attributes
        """

        # mixed=True means the peer negotiates the resource itself
        mixed = attr.get("mixed", False)
        get_vars = r.get_vars

        s3db = current.s3db
        db = current.db

        # Identify the sending repository by its UUID
        repository_uuid = get_vars.get("repository")
        connector = None

        if repository_uuid:

            rtable = s3db.sync_repository
            query = rtable.uuid == repository_uuid
            row = current.db(query).select(limitby=(0, 1)).first()
            if row:
                connector = S3SyncRepository(row)

        if connector is None:
            # Repositories must be registered to push, so that we
            # can track sync times and log operations properly
            r.error(403, "Registration required")

        current.log.debug("S3Sync PUSH from %s (%s)" % (connector.name,
                                                        connector.apitype,
                                                        ))

        # Get strategy and policy
        default_update_policy = S3ImportItem.POLICY.NEWER
        default_conflict_policy = S3ImportItem.POLICY.MASTER

        # Identify the synchronization task for this resource
        # (mixed pushes have no per-resource task)
        ttable = s3db.sync_task
        if not mixed:
            query = (ttable.repository_id == connector.id) & \
                    (ttable.resource_name == r.tablename) & \
                    (ttable.deleted != True)
            task = db(query).select(limitby=(0, 1)).first()
        else:
            task = None

        last_sync = None
        if task:
            # Use the configured task settings
            strategy = task.strategy
            update_policy = task.update_policy or default_update_policy
            conflict_policy = task.conflict_policy or default_conflict_policy
            if update_policy not in ("THIS", "OTHER"):
                # Time-based policies need the last pull time as reference
                last_sync = task.last_pull

        else:
            # No task => take settings from URL vars; THIS/OTHER are
            # mirrored because the peer states them from its own
            # perspective, which is the opposite of ours
            policies = S3ImportItem.POLICY
            p = get_vars.get("update_policy", None)
            values = {"THIS": "OTHER", "OTHER": "THIS"}
            switch = lambda p: p in values and values[p] or p
            if p and p in policies:
                p = switch(p)
                update_policy = policies[p]
            else:
                update_policy = default_update_policy
            p = get_vars.get("conflict_policy", None)
            if p and p in policies:
                p = switch(p)
                conflict_policy = policies[p]
            else:
                conflict_policy = default_conflict_policy
            msince = get_vars.get("msince", None)
            if msince is not None:
                last_sync = s3_parse_datetime(msince)
            s = get_vars.get("strategy", None)
            if s:
                # Accept only known import methods
                s = str(s).split(",")
                methods = S3ImportItem.METHOD
                strategy = [method for method in methods.values()
                            if method in s]
            else:
                strategy = ttable.strategy.default

        # Get the source (list of file-like objects)
        source = r.read_body()

        # Import resource
        resource = r.resource

        try:
            result = connector.receive(source,
                                       resource,
                                       strategy = strategy,
                                       update_policy = update_policy,
                                       conflict_policy = conflict_policy,
                                       last_sync = last_sync,
                                       onconflict = self.onconflict,
                                       mixed = mixed,
                                       )
        except IOError:
            # Insufficient permissions at the back-end
            current.auth.permission.fail()
        except SyntaxError:
            e = sys.exc_info()[1]
            r.error(400, e)
        except NotImplementedError:
            r.error(405, "Synchronization method not supported for repository")

        # Log the operation
        log = self.log
        log.write(repository_id = connector.id,
                  resource_name = "mixed" if mixed else resource.tablename,
                  transmission = log.IN,
                  mode = log.PUSH,
                  action = "receive",
                  remote = result.get("remote", False),
                  result = result.get("status", log.NONE),
                  message = result.get("message", ""),
                  )

        return result.get("response")
    # -------------------------------------------------------------------------
    # API Methods:
    # -------------------------------------------------------------------------
    def synchronize(self, repository):
        """
        Synchronize with a repository, called from scheduler task

        @param repository: the repository Row

        @return: True if successful, False if there was an error
        """

        current.log.debug("S3Sync: synchronize %s" % repository.url)

        log = self.log
        repository_id = repository.id

        # Sanity check: file-sync repositories need a path,
        # all others need a URL
        error = None
        if repository.apitype == "filesync":
            if not repository.path:
                error = "No path set for repository"
        else:
            if not repository.url:
                error = "No URL set for repository"
        if error:
            log.write(repository_id = repository_id,
                      resource_name = None,
                      transmission = None,
                      mode = log.NONE,
                      action = "connect",
                      remote = False,
                      result = log.FATAL,
                      message = error,
                      )
            return False

        # Should we update sync tasks from peer?
        # NOTE(review): hasattr triggers __getattr__-delegation to the
        # adapter, and the refresh() result is not evaluated here — confirm
        # this is intentional (best-effort refresh)
        connector = S3SyncRepository(repository)
        if hasattr(connector, "refresh"):
            success = connector.refresh()

        # Look up current sync tasks
        db = current.db
        s3db = current.s3db
        ttable = s3db.sync_task
        query = (ttable.repository_id == repository_id) & \
                (ttable.deleted == False)
        tasks = db(query).select()

        # Login at repository
        error = connector.login()
        if error:
            log.write(repository_id = repository_id,
                      resource_name = None,
                      transmission = log.OUT,
                      mode = log.LOGIN,
                      action = "login",
                      remote = True,
                      result = log.FATAL,
                      message = error,
                      )
            return False

        # Activate UUID synchronisation if required
        s3 = current.response.s3
        s3.synchronise_uuids = connector.synchronise_uuids

        # Delta for msince progress = 1 second after the mtime of
        # the youngest item transmitted (without this, the youngest
        # items would be re-transmitted until there is another update,
        # because msince means greater-or-equal)
        delta = datetime.timedelta(seconds=1)

        success = True
        for task in tasks:

            # Pull
            # NOTE(review): error is not re-initialized per iteration —
            # confirm a pull error cannot carry over into a later
            # push-only task's error check
            mtime = None
            if task.mode in (1, 3):
                error, mtime = connector.pull(task,
                                              onconflict=self.onconflict,
                                              )
            if error:
                success = False
                current.log.debug("S3Sync: %s PULL error: %s" %
                                  (task.resource_name, error))
                continue
            if mtime is not None:
                task.update_record(last_pull=mtime+delta)

            # Push
            mtime = None
            if task.mode in (2, 3):
                error, mtime = connector.push(task)
            if error:
                success = False
                current.log.debug("S3Sync: %s PUSH error: %s" %
                                  (task.resource_name, error))
                continue
            if mtime is not None:
                task.update_record(last_push=mtime+delta)

            current.log.debug("S3Sync.synchronize: %s done" % task.resource_name)

        # Reset UUID synchronisation and record the connection time
        s3.synchronise_uuids = False
        db(s3db.sync_repository.id == repository_id).update(
            last_connected = datetime.datetime.utcnow(),
            )

        connector.close_archives()

        return success
    # -------------------------------------------------------------------------
    @classmethod
    def onconflict(cls, item, repository, resource):
        """
        Automatic conflict resolution

        @param item: the conflicting import item
        @param repository: the repository the item comes from
        @param resource: the resource the item shall be imported to
        """

        s3db = current.s3db
        debug = current.log.debug

        tablename = resource.tablename
        # A table-specific "onconflict" hook overrides the default rules
        resolver = s3db.get_config(tablename, "onconflict")

        debug("Resolving conflict in %s" % resource.tablename)
        debug("Repository: %s" % repository.name)
        debug("Conflicting item: %s" % item)
        debug("Method: %s" % item.method)

        if resolver:
            debug("Applying custom rule")
            # The resolver decides by setting/clearing item.conflict
            resolver(item, repository, resource)
            if item.conflict:
                debug("Do not accept")
            else:
                debug("Accept per custom rule")
        else:
            debug("Applying default rule")
            ttable = s3db.sync_task
            policies = S3ImportItem.POLICY
            query = (ttable.repository_id == repository.id) & \
                    (ttable.resource_name == tablename) & \
                    (ttable.deleted != True)
            task = current.db(query).select(limitby=(0, 1)).first()
            if task and item.original:
                original = item.original
                conflict_policy = task.conflict_policy
                if conflict_policy == policies.OTHER:
                    # Always accept
                    debug("Accept by default")
                    item.conflict = False
                elif conflict_policy == policies.NEWER:
                    # Accept if newer
                    xml = current.xml
                    if xml.MTIME in original and \
                       s3_utc(original[xml.MTIME]) <= item.mtime:
                        debug("Accept because newer")
                        item.conflict = False
                    else:
                        debug("Do not accept")
                elif conflict_policy == policies.MASTER:
                    # Accept if master
                    # NOTE(review): parses as
                    # ((MCI in original) and (original.mci == 0)) or (item.mci == 1)
                    # — confirm the or-branch is intended to apply
                    # unconditionally
                    if current.xml.MCI in original and \
                       original.mci == 0 or item.mci == 1:
                        debug("Accept because master")
                        item.conflict = False
                    else:
                        debug("Do not accept")
                else:
                    # Never accept
                    debug("Do not accept")
            else:
                # No rule - accept always
                debug("Accept because no rule found")
                item.conflict = False
    # -------------------------------------------------------------------------
    def create_archive(self, dataset_id, task_id=None):
        """
        Create an archive for a data set

        @param dataset_id: the data set record ID
        @param task_id: the scheduler task ID if the archive is
                        created asynchronously

        @return: error message if an error occured, otherwise None
        """

        db = current.db
        s3db = current.s3db

        T = current.T

        # Standard download path for archives
        DOWNLOAD = "/default/download"

        # Get the data set
        dtable = s3db.sync_dataset
        query = (dtable.id == dataset_id) & \
                (dtable.deleted == False)
        dataset = db(query).select(dtable.id,
                                   dtable.code,
                                   dtable.archive_url,
                                   dtable.repository_id,
                                   limitby = (0, 1),
                                   ).first()
        if not dataset:
            return T("Data Set not found")
        elif dataset.repository_id:
            # Only locally-owned data sets can be archived
            return T("Cannot create archive from remote data set")
        else:
            # Derive the archive file name from the data set code
            code = dataset.code
            if code:
                filename = "%s.zip" % code
            else:
                filename = "dataset-%s.zip" % dataset_id

        # Get all sync tasks for the data set
        # (local pull-mode tasks only, i.e. no repository linked)
        ttable = s3db.sync_task
        query = (ttable.dataset_id == dataset_id) & \
                (ttable.repository_id == None) & \
                (ttable.mode == 1) & \
                (ttable.deleted == False)
        tasks = db(query).select(ttable.id,
                                 ttable.uuid,
                                 ttable.resource_name,
                                 ttable.components,
                                 )

        if not tasks:
            return T("No resources defined for dataset")

        # Get the current archive record
        atable = s3db.sync_dataset_archive
        query = (atable.dataset_id == dataset_id) & \
                (atable.deleted == False)
        row = db(query).select(atable.id,
                               limitby = (0, 1),
                               ).first()
        if row:
            archive_id = row.id
            if task_id:
                # Remember the async task so progress can be tracked
                row.update_record(task_id=task_id)
                db.commit()
        else:
            archive_id = atable.insert(dataset_id = dataset_id)
        if not archive_id:
            return T("Could not create or update archive")

        # Create archive
        archive = S3SyncDataArchive()

        for task in tasks:

            # Define the resource
            components = [] if task.components is False else None
            resource = current.s3db.resource(task.resource_name,
                                             components = components,
                                             )

            # Get the sync filters for this task
            filters = current.sync.get_filters(task.id)

            # Export the resource as S3XML
            data = resource.export_xml(filters = filters,
                                       #pretty_print = True,
                                       )

            # Add to archive, using the UUID of the task as object name
            archive.add("%s.xml" % task.uuid, data)

        # Close the archive and get the output as file-like object
        fileobj = archive.close()

        # Store the fileobj in the archive-field
        stored_filename = atable.archive.store(fileobj, filename)
        db(atable.id == archive_id).update(date = datetime.datetime.utcnow(),
                                           archive = stored_filename,
                                           task_id = None,
                                           )

        # Update archive URL if it is empty or a local path
        # pointing to the standard download location
        archive_url = dataset.archive_url
        if not archive_url or archive_url.startswith(DOWNLOAD):
            dataset.update_record(
                use_archive = True,
                archive_url = "%s/%s" % (DOWNLOAD, stored_filename),
                )

        # Return None to indicate success
        return None
733 734 # ------------------------------------------------------------------------- 735 # Utility methods: 736 # ------------------------------------------------------------------------- 737 @property
738 - def config(self):
739 """ Lazy access to the current sync config """ 740 741 if self._config is None: 742 743 table = current.s3db.sync_config 744 row = current.db().select(table.ALL, limitby=(0, 1)).first() 745 self._config = row 746 747 return self._config
748 749 # -------------------------------------------------------------------------
750 - def get_status(self):
751 """ Read the current sync status """ 752 753 table = current.s3db.sync_status 754 row = current.db().select(table.ALL, limitby=(0, 1)).first() 755 if not row: 756 row = Storage() 757 return row
758 759 # -------------------------------------------------------------------------
760 - def set_status(self, **attr):
761 """ Update the current sync status """ 762 763 table = current.s3db.sync_status 764 765 data = dict((k, attr[k]) for k in attr if k in table.fields) 766 data["timestmp"] = datetime.datetime.utcnow() 767 768 row = current.db().select(table._id, limitby=(0, 1)).first() 769 if row: 770 row.update_record(**data) 771 else: 772 table.insert(**data) 773 row = data 774 return row
775 776 # ------------------------------------------------------------------------- 777 @staticmethod
778 - def get_filters(task_id):
779 """ 780 Get all filters for a synchronization task 781 782 @param task_id: the task ID 783 @return: a dict of dicts like {tablename: {url_var: value}} 784 """ 785 786 db = current.db 787 s3db = current.s3db 788 789 ftable = s3db.sync_resource_filter 790 query = (ftable.task_id == task_id) & \ 791 (ftable.deleted != True) 792 rows = db(query).select(ftable.tablename, 793 ftable.filter_string, 794 ) 795 796 filters = {} 797 for row in rows: 798 tablename = row.tablename 799 if tablename in filters: 800 filters[tablename] = "%s&%s" % (filters[tablename], 801 row.filter_string, 802 ) 803 else: 804 filters[tablename] = row.filter_string 805 806 parse_url = S3URLQuery.parse_url 807 for tablename in filters: 808 filters[tablename] = parse_url(filters[tablename]) 809 return filters
810
# =============================================================================
class S3SyncLog(S3Method):
    """ Synchronization Logger """

    # Name of the log table (used by write() and apply_method())
    TABLENAME = "sync_log"

    # Outcomes
    SUCCESS = "success" # worked
    WARNING = "warning" # worked, but had issues
    ERROR = "error" # failed, but may work later
    FATAL = "fatal" # failed, will never work unless reconfigured

    # Transmissions
    IN = "incoming"
    OUT = "outgoing"

    # Methods
    PULL = "pull"
    PUSH = "push"
    LOGIN = "login"
    REGISTER = "register"

    # None (placeholder for "not applicable")
    NONE = "none"

    # -------------------------------------------------------------------------
    def apply_method(self, r, **attr):
        """
        RESTful method handler

        @param r: the S3Request instance
        @param attr: controller attributes for the request
        """

        output = {}

        resource = r.resource
        if resource.tablename == self.TABLENAME:
            # Direct access to the log table => standard list view
            return resource.crud.select(r, **attr)

        elif resource.tablename == "sync_repository":
            # READ for sync log for this repository (currently not needed)
            pass

        else:
            if r.interactive:
                # READ for sync log for this resource:
                # re-target the request at sync/log, filtered to this
                # resource, and remember where to return to
                here = "%s.%s" % (r.controller, r.function)
                sync_log = current.s3db[self.TABLENAME]
                sync_log.resource_name.readable = False
                query = (sync_log.resource_name == resource.tablename)
                r = r.factory(prefix="sync", name="log", args=[])
                s3 = current.response.s3
                s3.filter = query
                s3.prep = None
                s3.postp = None
                s3.actions = [{"label": s3_str(current.T("Details")),
                               "_class": "action-btn",
                               "url": URL(c = "sync",
                                          f = "log",
                                          args = ["[id]"],
                                          vars = {"return":here},
                                          )
                               },
                              ]
                output = r(subtitle=None, rheader=self.rheader)
            else:
                r.error(415, current.ERROR.BAD_FORMAT)

        return output
881 882 # ------------------------------------------------------------------------- 883 @classmethod
884 - def write(cls, 885 repository_id=None, 886 resource_name=None, 887 transmission=None, 888 mode=None, 889 action=None, 890 result=None, 891 remote=False, 892 message=None):
893 """ 894 Writes a new entry to the log 895 896 @param repository_id: the repository record ID 897 @param resource_name: the resource name 898 @param transmission: transmission mode (IN, OUT or None) 899 @param mode: synchronization mode (PULL, PUSH or None) 900 @param action: action that triggers the log entry (if any) 901 @param result: the result of the transaction 902 (SUCCESS, WARNING, ERROR or FATAL) 903 @param remote: boolean, True if this is a remote error 904 @param message: clear text message 905 """ 906 907 if result not in (cls.SUCCESS, cls.WARNING, cls.ERROR, cls.FATAL): 908 result = cls.SUCCESS 909 910 if result == cls.SUCCESS: 911 # Can't be a remote error if it's not an error at all 912 remote = False 913 914 if transmission not in (cls.IN, cls.OUT): 915 transmission = cls.NONE 916 917 if mode not in (cls.PULL, cls.PUSH, cls.LOGIN, cls.REGISTER): 918 mode = cls.NONE 919 920 if not action: 921 action = cls.NONE 922 923 entry = {"timestmp": datetime.datetime.utcnow(), 924 "repository_id": repository_id, 925 "resource_name": resource_name, 926 "mode": "%s/%s" % (mode, transmission), 927 "action": action, 928 "result": result, 929 "remote": remote, 930 "message": message, 931 } 932 933 current.s3db[cls.TABLENAME].insert(**entry)
934 935 # ------------------------------------------------------------------------- 936 @staticmethod
937 - def rheader(r, **attr):
938 """ S3SyncLog resource header """ 939 940 if r.id is None: 941 return DIV(current.T("Showing latest entries first")) 942 else: 943 return None
944
# =============================================================================
class S3SyncRepository(object):
    """ Class representation of a peer repository """

    def __init__(self, repository):
        """
        Constructor

        @param repository: the repository record (Row)
        """

        # Logger and Config
        self.log = S3SyncLog
        self._config = None

        # Identifier and name
        self.id = repository.id
        self.name = repository.name

        # API type and import/export backend
        self.apitype = repository.apitype
        self.backend = repository.backend

        # URL / Path
        self.url = repository.url
        self.path = repository.path

        # Authentication
        self.username = repository.username
        self.password = repository.password
        self.client_id = repository.client_id
        self.client_secret = repository.client_secret
        self.site_key = repository.site_key
        self.refresh_token = repository.refresh_token

        # Network
        self.proxy = repository.proxy

        # Processing Options
        self.synchronise_uuids = repository.synchronise_uuids
        self.keep_source = repository.keep_source
        self.last_refresh = repository.last_refresh

        # Instantiate Adapter: look up the API-type-specific adapter
        # module by name, fall back to the (abstract) base adapter
        import sync_adapter
        api = sync_adapter.__dict__.get(self.apitype)
        if api:
            adapter = api.S3SyncAdapter(self)
        else:
            adapter = S3SyncBaseAdapter(self)

        self.adapter = adapter
        # Open data archives, keyed per adapter use (see close_archives)
        self.archives = {}
998 999 # ------------------------------------------------------------------------- 1000 @property
1001 - def config(self):
1002 """ Lazy access to the current sync config """ 1003 1004 if self._config is None: 1005 1006 table = current.s3db.sync_config 1007 row = current.db().select(table.ALL, limitby=(0, 1)).first() 1008 self._config = row 1009 1010 return self._config
    # -------------------------------------------------------------------------
    def __getattr__(self, name):
        """
        Delegate other attributes and methods to the adapter

        @param name: the attribute/method
        """

        # Only invoked when normal attribute lookup on this instance
        # fails => forward the lookup to the adapter.
        # NOTE(review): object.__getattribute__ is used instead of
        # getattr, presumably to bypass any fallback lookup on the
        # adapter itself — confirm before changing.
        return object.__getattribute__(self.adapter, name)
1021 1022 # -------------------------------------------------------------------------
1023 - def close_archives(self):
1024 """ 1025 Close any open archives 1026 """ 1027 1028 for archive in self.archives.values(): 1029 if archive: 1030 archive.close() 1031 self.archives = {}
1032
# =============================================================================
class S3SyncBaseAdapter(object):
    """
    Sync Adapter (base class) - interface providing standard
    synchronization methods for the respective repository type.

    This class isn't meant to be instantiated or accessed directly,
    but is normally accessed through the S3SyncRepository instance.
    """

    def __init__(self, repository):
        """
        Constructor

        @param repository: the repository (S3Repository instance)
        """

        # Back-reference to the repository and its logger
        self.repository = repository
        self.log = repository.log

        # Open data archives held by this adapter
        self.archives = {}

    # -------------------------------------------------------------------------
    # Methods to be implemented by subclasses:
    # -------------------------------------------------------------------------
    def register(self):
        """
        Register this site at the peer repository

        @return: True|False to indicate success|failure,
                 or None if registration is not required
        """

        raise NotImplementedError

    # -------------------------------------------------------------------------
    def login(self):
        """
        Login at the peer repository

        @return: None if successful, otherwise the error
        """

        raise NotImplementedError

    # -------------------------------------------------------------------------
    def pull(self, task, onconflict=None):
        """
        Fetch updates from the peer repository and import them
        into the local database (active pull)

        @param task: the synchronization task (sync_task Row)
        @param onconflict: callback for automatic conflict resolution

        @return: tuple (error, mtime), with error=None if successful,
                 else error=message, and mtime=modification timestamp
                 of the youngest record sent
        """

        raise NotImplementedError

    # -------------------------------------------------------------------------
    def push(self, task):
        """
        Extract new updates from the local database and send
        them to the peer repository (active push)

        @param task: the synchronization task (sync_task Row)

        @return: tuple (error, mtime), with error=None if successful,
                 else error=message, and mtime=modification timestamp
                 of the youngest record sent
        """

        raise NotImplementedError

    # -------------------------------------------------------------------------
    def send(self,
             resource,
             start=None,
             limit=None,
             msince=None,
             filters=None,
             mixed=False,
             pretty_print=False,
             ):
        """
        Respond to an incoming pull from the peer repository

        @param resource: the resource to be synchronized
        @param start: index of the first record to send
        @param limit: maximum number of records to send
        @param msince: minimum modification date/time for records to send
        @param filters: URL filters for record extraction
        @param mixed: negotiate resource with peer (disregard resource)
        @param pretty_print: make the output human-readable

        @return: a dict {status, remote, message, response}, with:
                 - status....the outcome of the operation
                 - remote....whether the error was remote (or local)
                 - message...the log message
                 - response..the response to send to the peer
        """

        raise NotImplementedError

    # -------------------------------------------------------------------------
    def receive(self,
                source,
                resource,
                strategy=None,
                update_policy=None,
                conflict_policy=None,
                onconflict=None,
                last_sync=None,
                mixed=False):
        """
        Respond to an incoming push from the peer repository

        @param source: the input stream (list of file-like objects)
        @param resource: the target resource
        @param strategy: the import strategy
        @param update_policy: the update policy
        @param conflict_policy: the conflict resolution policy
        @param onconflict: callback for conflict resolution
        @param last_sync: the last synchronization date/time for the peer
        @param mixed: negotiate resource with peer (disregard resource)

        @return: a dict {status, remote, message, response}, with:
                 - status....the outcome of the operation
                 - remote....whether the error was remote (or local)
                 - message...the log message
                 - response..the response to send to the peer
        """

        raise NotImplementedError
1169
# =============================================================================
class S3SyncDataArchive(object):
    """
    Simple abstraction layer for (compressed) data archives, currently
    based on zipfile (Python standard library). Compression additionally
    requires zlib to be installed (both for write and read).
    """

    def __init__(self, fileobj=None, compressed=True):
        """
        Create or open an archive

        @param fileobj: the file object containing the archive,
                        None to create a new archive
        @param compressed: enable (or suppress) compression of new
                           archives
        """

        import zipfile

        if compressed:
            compression = zipfile.ZIP_DEFLATED
        else:
            compression = zipfile.ZIP_STORED

        if fileobj is not None:
            # Open an existing archive for reading
            if not hasattr(fileobj, "seek"):
                # Possibly a addinfourl instance from urlopen,
                # => must copy to StringIO buffer for random access
                fileobj = StringIO(fileobj.read())
            try:
                archive = zipfile.ZipFile(fileobj, "r")
            except (zipfile.BadZipfile, RuntimeError):
                # NB corrupt/invalid archives raise BadZipfile rather
                # than RuntimeError, so both must be caught here
                current.log.warn("invalid ZIP archive: %s" % sys.exc_info()[1])
                archive = None
        else:
            # Create a new archive in a memory buffer
            fileobj = StringIO()
            try:
                archive = zipfile.ZipFile(fileobj, "w", compression, True)
            except RuntimeError:
                # Zlib not available? => try falling back to STORED
                compression = zipfile.ZIP_STORED
                archive = zipfile.ZipFile(fileobj, "w", compression, True)
                current.log.warn("zlib not available - cannot compress archive")

        self.fileobj = fileobj
        self.archive = archive

    # -------------------------------------------------------------------------
    def add(self, name, obj):
        """
        Add an object to the archive

        @param name: the file name for the object inside the archive
        @param obj: the object to add (a string, bytes, or a file-like
                    object)

        @raises UserWarning: when adding a duplicate name (overwrites
                             the existing object in the archive)
        @raises RuntimeError: if the archive is not writable, or
                              no valid object name has been provided
        @raises TypeError: if the object is neither a string nor a
                           file-like object
        """

        # Make sure the object name is an utf-8 encoded str
        if not name:
            raise RuntimeError("name is required")
        elif type(name) is not str:
            name = s3_str(name)

        # Make sure the archive is available
        archive = self.archive
        if not archive:
            raise RuntimeError("cannot add to closed archive")

        # Encode unicode objects as utf-8 str (Python 2); under
        # Python 3 there is no unicode builtin, and zipfile encodes
        # str objects itself
        try:
            unicode_type = unicode
        except NameError:
            unicode_type = None
        if unicode_type is not None and isinstance(obj, unicode_type):
            obj = obj.encode("utf-8")

        # Write the object
        if isinstance(obj, (str, bytes)):
            archive.writestr(name, obj)

        elif hasattr(obj, "read"):
            # File-like object: rewind if possible, then read fully
            if hasattr(obj, "seek"):
                obj.seek(0)
            archive.writestr(name, obj.read())

        else:
            raise TypeError("invalid object type")

    # -------------------------------------------------------------------------
    def extract(self, name):
        """
        Extract an object from the archive by name

        @param name: the object name

        @return: the object as file-like object, or None if
                 the object could not be found in the archive

        @raises RuntimeError: if the archive is already closed
        """

        if not self.archive:
            raise RuntimeError("cannot extract from closed archive")

        try:
            return self.archive.open(name)
        except KeyError:
            # Object doesn't exist
            return None

    # -------------------------------------------------------------------------
    def close(self):
        """
        Close the archive and return it as file-like object; no further
        add/extract operations will be possible after closing.

        @return: the file-like object containing the archive
        """

        if self.archive:
            self.archive.close()
            self.archive = None

        fileobj = self.fileobj
        if fileobj:
            # Rewind so callers can read the archive from the start
            fileobj.seek(0)

        return fileobj
1299 1300 # End ========================================================================= 1301