1
2
3 """ S3 Synchronization
4
5 @copyright: 2011-2019 (c) Sahana Software Foundation
6 @license: MIT
7
8 Permission is hereby granted, free of charge, to any person
9 obtaining a copy of this software and associated documentation
10 files (the "Software"), to deal in the Software without
11 restriction, including without limitation the rights to use,
12 copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the
14 Software is furnished to do so, subject to the following
15 conditions:
16
17 The above copyright notice and this permission notice shall be
18 included in all copies or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
22 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
25 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
26 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
27 OTHER DEALINGS IN THE SOFTWARE.
28 """
29
30 import json
31 import sys
32 import datetime
33
34 try:
35 from cStringIO import StringIO
36 except ImportError:
37 from StringIO import StringIO
38
39 from gluon import current, URL, DIV
40 from gluon.storage import Storage
41
42 from s3datetime import s3_parse_datetime, s3_utc
43 from s3rest import S3Method
44 from s3import import S3ImportItem
45 from s3query import S3URLQuery
46 from s3utils import s3_str
50 """ Synchronization Handler """
51
59
60
62 """
63 RESTful method handler, responds to:
64 - GET [prefix]/[name]/sync.xml - incoming pull
65 - PUT|POST [prefix]/[name]/sync.xml - incoming push
66 - POST sync/repository/register.json - remote registration
67
68 NB incoming pull/push reponse normally by local sync/sync
69 controller as resource proxy => back-end generated S3Request
70
71 @param r: the S3Request
72 @param attr: controller parameters for the request
73 """
74
75 output = {}
76
77 method = r.method
78 if method == "sync":
79
80 if r.http == "GET":
81
82 output = self.__send(r, **attr)
83
84 elif r.http in ("PUT", "POST"):
85
86 output = self.__receive(r, **attr)
87
88 else:
89 r.error(405, current.ERROR.BAD_METHOD)
90
91 elif method == "register":
92
93 if r.http in ("PUT", "POST"):
94
95 if r.representation == "json":
96 output = self.__register(r, **attr)
97 else:
98 r.error(415, current.ERROR.BAD_FORMAT)
99
100 else:
101 r.error(405, current.ERROR.BAD_METHOD)
102
103 else:
104 r.error(405, current.ERROR.BAD_METHOD)
105
106 return output
107
108
109
110
112 """
113 Respond to an incoming registration request
114
115 @param r: the S3Request
116 @param attr: controller parameters for the request
117 """
118
119
120 from s3validators import JSONERRORS
121 source = r.read_body()
122 if not source:
123 r.error(400, "Missing parameters")
124 try:
125 parameters = json.load(source[0])
126 except JSONERRORS:
127 r.error(400, "Invalid parameters: %s" % (sys.exc_info()[1]))
128
129 log = self.log
130
131 result = log.SUCCESS
132 repository_id = None
133
134 ruid = parameters.get("uuid")
135 if ruid:
136
137
138 db = current.db
139 rtable = current.s3db.sync_repository
140 row = db(rtable.uuid == ruid).select(rtable.id,
141 limitby = (0, 1),
142 ).first()
143
144
145 permitted = current.auth.s3_has_permission
146 if row:
147 repository_id = row.id
148 if not permitted("update", rtable, record_id = repository_id):
149 r.unauthorised()
150 data = {"deleted": False}
151 else:
152 if not permitted("create", rtable):
153 r.unauthorised()
154 data = {"uuid": ruid}
155
156
157 apitype = parameters.get("apitype")
158 if apitype:
159 data["apitype"] = apitype
160
161 name = parameters.get("name")
162 if name:
163 data["name"] = name
164
165
166 if row:
167 success = row.update_record(**data)
168 if success:
169 message = "Registration update successful"
170 else:
171 result = log.ERROR
172 message = "Registration update failed"
173 else:
174 repository_id = rtable.insert(**data)
175 if repository_id:
176 message = "Registration successful"
177 else:
178 result = log.ERROR
179 message = "Registration failed"
180 else:
181 result = log.ERROR
182 message = "No repository identifier specified"
183
184
185 if result == log.SUCCESS:
186 output = current.xml.json_message(message = message,
187 sender = "%s" % self.config.uuid,
188 )
189 else:
190 output = current.xml.json_message(False, 400,
191 message = message,
192 sender = "%s" % self.config.uuid,
193 )
194
195
196 current.response.headers["Content-Type"] = "application/json"
197
198
199 log.write(repository_id = repository_id,
200 resource_name = log.NONE,
201 transmission = log.IN,
202 mode = log.REGISTER,
203 result = result,
204 message = message,
205 )
206
207 return output
208
209
211 """
212 Respond to an incoming pull
213
214 @param r: the S3Request
215 @param attr: the controller attributes
216 """
217
218 mixed = attr.get("mixed", False)
219 get_vars = r.get_vars
220 vars_get = get_vars.get
221
222 resource = r.resource
223
224
225 repository_uuid = vars_get("repository")
226 connector = None
227
228 if repository_uuid:
229
230 rtable = current.s3db.sync_repository
231 query = rtable.uuid == repository_uuid
232 row = current.db(query).select(limitby=(0, 1)).first()
233 if row:
234 connector = S3SyncRepository(row)
235
236 if connector is None:
237
238 connector = S3SyncRepository(Storage(id = None,
239 name = "unknown",
240 apitype = "eden",
241 ))
242
243 current.log.debug("S3Sync PULL from %s (%s)" % (connector.name,
244 connector.apitype))
245
246
247 start = vars_get("start", None)
248 if start is not None:
249 try:
250 start = int(start)
251 except ValueError:
252 start = None
253 limit = vars_get("limit", None)
254 if limit is not None:
255 try:
256 limit = int(limit)
257 except ValueError:
258 limit = None
259 msince = vars_get("msince", None)
260 if msince is not None:
261 msince = s3_parse_datetime(msince)
262
263
264 filters = {}
265 for k, v in get_vars.items():
266 if k[0] == "[" and "]" in k:
267 tablename, urlvar = k[1:].split("]", 1)
268 if urlvar:
269 if not tablename or tablename == "~":
270 tablename = resource.tablename
271 f = filters.get(tablename, {})
272 u = f.get(urlvar, None)
273 if u:
274 u = "%s&%s" % (u, v)
275 else:
276 u = v
277 f[urlvar] = u
278 filters[tablename] = f
279 if not filters:
280 filters = None
281
282
283
284 components = vars_get("components", None)
285 if components and components.lower() == "none":
286 resource.components.reset(expose=[])
287
288 try:
289 result = connector.send(resource,
290 start = start,
291 limit = limit,
292 msince = msince,
293 filters = filters,
294 mixed = mixed,
295 )
296 except NotImplementedError:
297 r.error(405, "Synchronization method not supported for repository")
298
299 log = self.log
300 log.write(repository_id = connector.id,
301 resource_name = "mixed" if mixed else resource.tablename,
302 transmission = log.IN,
303 mode = log.PULL,
304 action = "send",
305 remote = result.get("remote", False),
306 result = result.get("status", log.NONE),
307 message = result.get("message", ""),
308 )
309
310 return result.get("response")
311
312
314 """
315 Respond to an incoming push
316
317 @param r: the S3Request
318 @param attr: the controller attributes
319 """
320
321 mixed = attr.get("mixed", False)
322 get_vars = r.get_vars
323
324 s3db = current.s3db
325 db = current.db
326
327
328 repository_uuid = get_vars.get("repository")
329 connector = None
330
331 if repository_uuid:
332
333 rtable = s3db.sync_repository
334 query = rtable.uuid == repository_uuid
335 row = current.db(query).select(limitby=(0, 1)).first()
336 if row:
337 connector = S3SyncRepository(row)
338
339 if connector is None:
340
341
342 r.error(403, "Registration required")
343
344 current.log.debug("S3Sync PUSH from %s (%s)" % (connector.name,
345 connector.apitype,
346 ))
347
348
349 default_update_policy = S3ImportItem.POLICY.NEWER
350 default_conflict_policy = S3ImportItem.POLICY.MASTER
351
352
353 ttable = s3db.sync_task
354 if not mixed:
355 query = (ttable.repository_id == connector.id) & \
356 (ttable.resource_name == r.tablename) & \
357 (ttable.deleted != True)
358 task = db(query).select(limitby=(0, 1)).first()
359 else:
360 task = None
361
362 last_sync = None
363 if task:
364 strategy = task.strategy
365 update_policy = task.update_policy or default_update_policy
366 conflict_policy = task.conflict_policy or default_conflict_policy
367 if update_policy not in ("THIS", "OTHER"):
368 last_sync = task.last_pull
369
370 else:
371 policies = S3ImportItem.POLICY
372 p = get_vars.get("update_policy", None)
373 values = {"THIS": "OTHER", "OTHER": "THIS"}
374 switch = lambda p: p in values and values[p] or p
375 if p and p in policies:
376 p = switch(p)
377 update_policy = policies[p]
378 else:
379 update_policy = default_update_policy
380 p = get_vars.get("conflict_policy", None)
381 if p and p in policies:
382 p = switch(p)
383 conflict_policy = policies[p]
384 else:
385 conflict_policy = default_conflict_policy
386 msince = get_vars.get("msince", None)
387 if msince is not None:
388 last_sync = s3_parse_datetime(msince)
389 s = get_vars.get("strategy", None)
390 if s:
391 s = str(s).split(",")
392 methods = S3ImportItem.METHOD
393 strategy = [method for method in methods.values()
394 if method in s]
395 else:
396 strategy = ttable.strategy.default
397
398
399 source = r.read_body()
400
401
402 resource = r.resource
403
404 try:
405 result = connector.receive(source,
406 resource,
407 strategy = strategy,
408 update_policy = update_policy,
409 conflict_policy = conflict_policy,
410 last_sync = last_sync,
411 onconflict = self.onconflict,
412 mixed = mixed,
413 )
414 except IOError:
415 current.auth.permission.fail()
416 except SyntaxError:
417 e = sys.exc_info()[1]
418 r.error(400, e)
419 except NotImplementedError:
420 r.error(405, "Synchronization method not supported for repository")
421
422 log = self.log
423 log.write(repository_id = connector.id,
424 resource_name = "mixed" if mixed else resource.tablename,
425 transmission = log.IN,
426 mode = log.PUSH,
427 action = "receive",
428 remote = result.get("remote", False),
429 result = result.get("status", log.NONE),
430 message = result.get("message", ""),
431 )
432
433 return result.get("response")
434
435
436
437
439 """
440 Synchronize with a repository, called from scheduler task
441
442 @param repository: the repository Row
443
444 @return: True if successful, False if there was an error
445 """
446
447 current.log.debug("S3Sync: synchronize %s" % repository.url)
448
449 log = self.log
450 repository_id = repository.id
451
452 error = None
453 if repository.apitype == "filesync":
454 if not repository.path:
455 error = "No path set for repository"
456 else:
457 if not repository.url:
458 error = "No URL set for repository"
459 if error:
460 log.write(repository_id = repository_id,
461 resource_name = None,
462 transmission = None,
463 mode = log.NONE,
464 action = "connect",
465 remote = False,
466 result = log.FATAL,
467 message = error,
468 )
469 return False
470
471
472 connector = S3SyncRepository(repository)
473 if hasattr(connector, "refresh"):
474 success = connector.refresh()
475
476
477 db = current.db
478 s3db = current.s3db
479 ttable = s3db.sync_task
480 query = (ttable.repository_id == repository_id) & \
481 (ttable.deleted == False)
482 tasks = db(query).select()
483
484
485 error = connector.login()
486 if error:
487 log.write(repository_id = repository_id,
488 resource_name = None,
489 transmission = log.OUT,
490 mode = log.LOGIN,
491 action = "login",
492 remote = True,
493 result = log.FATAL,
494 message = error,
495 )
496 return False
497
498
499 s3 = current.response.s3
500 s3.synchronise_uuids = connector.synchronise_uuids
501
502
503
504
505
506 delta = datetime.timedelta(seconds=1)
507
508 success = True
509 for task in tasks:
510
511
512 mtime = None
513 if task.mode in (1, 3):
514 error, mtime = connector.pull(task,
515 onconflict=self.onconflict,
516 )
517 if error:
518 success = False
519 current.log.debug("S3Sync: %s PULL error: %s" %
520 (task.resource_name, error))
521 continue
522 if mtime is not None:
523 task.update_record(last_pull=mtime+delta)
524
525
526 mtime = None
527 if task.mode in (2, 3):
528 error, mtime = connector.push(task)
529 if error:
530 success = False
531 current.log.debug("S3Sync: %s PUSH error: %s" %
532 (task.resource_name, error))
533 continue
534 if mtime is not None:
535 task.update_record(last_push=mtime+delta)
536
537 current.log.debug("S3Sync.synchronize: %s done" % task.resource_name)
538
539 s3.synchronise_uuids = False
540 db(s3db.sync_repository.id == repository_id).update(
541 last_connected = datetime.datetime.utcnow(),
542 )
543
544 connector.close_archives()
545
546 return success
547
548
549 @classmethod
551 """
552 Automatic conflict resolution
553
554 @param item: the conflicting import item
555 @param repository: the repository the item comes from
556 @param resource: the resource the item shall be imported to
557 """
558
559 s3db = current.s3db
560 debug = current.log.debug
561
562 tablename = resource.tablename
563 resolver = s3db.get_config(tablename, "onconflict")
564
565 debug("Resolving conflict in %s" % resource.tablename)
566 debug("Repository: %s" % repository.name)
567 debug("Conflicting item: %s" % item)
568 debug("Method: %s" % item.method)
569
570 if resolver:
571 debug("Applying custom rule")
572 resolver(item, repository, resource)
573 if item.conflict:
574 debug("Do not accept")
575 else:
576 debug("Accept per custom rule")
577 else:
578 debug("Applying default rule")
579 ttable = s3db.sync_task
580 policies = S3ImportItem.POLICY
581 query = (ttable.repository_id == repository.id) & \
582 (ttable.resource_name == tablename) & \
583 (ttable.deleted != True)
584 task = current.db(query).select(limitby=(0, 1)).first()
585 if task and item.original:
586 original = item.original
587 conflict_policy = task.conflict_policy
588 if conflict_policy == policies.OTHER:
589
590 debug("Accept by default")
591 item.conflict = False
592 elif conflict_policy == policies.NEWER:
593
594 xml = current.xml
595 if xml.MTIME in original and \
596 s3_utc(original[xml.MTIME]) <= item.mtime:
597 debug("Accept because newer")
598 item.conflict = False
599 else:
600 debug("Do not accept")
601 elif conflict_policy == policies.MASTER:
602
603 if current.xml.MCI in original and \
604 original.mci == 0 or item.mci == 1:
605 debug("Accept because master")
606 item.conflict = False
607 else:
608 debug("Do not accept")
609 else:
610
611 debug("Do not accept")
612 else:
613
614 debug("Accept because no rule found")
615 item.conflict = False
616
617
619 """
620 Create an archive for a data set
621
622 @param dataset_id: the data set record ID
623 @param task_id: the scheduler task ID if the archive is
624 created asynchronously
625
626 @return: error message if an error occured, otherwise None
627 """
628
629 db = current.db
630 s3db = current.s3db
631
632 T = current.T
633
634
635 DOWNLOAD = "/default/download"
636
637
638 dtable = s3db.sync_dataset
639 query = (dtable.id == dataset_id) & \
640 (dtable.deleted == False)
641 dataset = db(query).select(dtable.id,
642 dtable.code,
643 dtable.archive_url,
644 dtable.repository_id,
645 limitby = (0, 1),
646 ).first()
647 if not dataset:
648 return T("Data Set not found")
649 elif dataset.repository_id:
650 return T("Cannot create archive from remote data set")
651 else:
652 code = dataset.code
653 if code:
654 filename = "%s.zip" % code
655 else:
656 filename = "dataset-%s.zip" % dataset_id
657
658
659 ttable = s3db.sync_task
660 query = (ttable.dataset_id == dataset_id) & \
661 (ttable.repository_id == None) & \
662 (ttable.mode == 1) & \
663 (ttable.deleted == False)
664 tasks = db(query).select(ttable.id,
665 ttable.uuid,
666 ttable.resource_name,
667 ttable.components,
668 )
669
670 if not tasks:
671 return T("No resources defined for dataset")
672
673
674 atable = s3db.sync_dataset_archive
675 query = (atable.dataset_id == dataset_id) & \
676 (atable.deleted == False)
677 row = db(query).select(atable.id,
678 limitby = (0, 1),
679 ).first()
680 if row:
681 archive_id = row.id
682 if task_id:
683 row.update_record(task_id=task_id)
684 db.commit()
685 else:
686 archive_id = atable.insert(dataset_id = dataset_id)
687 if not archive_id:
688 return T("Could not create or update archive")
689
690
691 archive = S3SyncDataArchive()
692
693 for task in tasks:
694
695
696 components = [] if task.components is False else None
697 resource = current.s3db.resource(task.resource_name,
698 components = components,
699 )
700
701
702 filters = current.sync.get_filters(task.id)
703
704
705 data = resource.export_xml(filters = filters,
706
707 )
708
709
710 archive.add("%s.xml" % task.uuid, data)
711
712
713 fileobj = archive.close()
714
715
716 stored_filename = atable.archive.store(fileobj, filename)
717 db(atable.id == archive_id).update(date = datetime.datetime.utcnow(),
718 archive = stored_filename,
719 task_id = None,
720 )
721
722
723
724 archive_url = dataset.archive_url
725 if not archive_url or archive_url.startswith(DOWNLOAD):
726 dataset.update_record(
727 use_archive = True,
728 archive_url = "%s/%s" % (DOWNLOAD, stored_filename),
729 )
730
731
732 return None
733
734
735
736
737 @property
739 """ Lazy access to the current sync config """
740
741 if self._config is None:
742
743 table = current.s3db.sync_config
744 row = current.db().select(table.ALL, limitby=(0, 1)).first()
745 self._config = row
746
747 return self._config
748
749
751 """ Read the current sync status """
752
753 table = current.s3db.sync_status
754 row = current.db().select(table.ALL, limitby=(0, 1)).first()
755 if not row:
756 row = Storage()
757 return row
758
759
775
776
777 @staticmethod
779 """
780 Get all filters for a synchronization task
781
782 @param task_id: the task ID
783 @return: a dict of dicts like {tablename: {url_var: value}}
784 """
785
786 db = current.db
787 s3db = current.s3db
788
789 ftable = s3db.sync_resource_filter
790 query = (ftable.task_id == task_id) & \
791 (ftable.deleted != True)
792 rows = db(query).select(ftable.tablename,
793 ftable.filter_string,
794 )
795
796 filters = {}
797 for row in rows:
798 tablename = row.tablename
799 if tablename in filters:
800 filters[tablename] = "%s&%s" % (filters[tablename],
801 row.filter_string,
802 )
803 else:
804 filters[tablename] = row.filter_string
805
806 parse_url = S3URLQuery.parse_url
807 for tablename in filters:
808 filters[tablename] = parse_url(filters[tablename])
809 return filters
810
813 """ Synchronization Logger """
814
815 TABLENAME = "sync_log"
816
817
818 SUCCESS = "success"
819 WARNING = "warning"
820 ERROR = "error"
821 FATAL = "fatal"
822
823
824 IN = "incoming"
825 OUT = "outgoing"
826
827
828 PULL = "pull"
829 PUSH = "push"
830 LOGIN = "login"
831 REGISTER = "register"
832
833
834 NONE = "none"
835
836
838 """
839 RESTful method handler
840
841 @param r: the S3Request instance
842 @param attr: controller attributes for the request
843 """
844
845 output = {}
846
847 resource = r.resource
848 if resource.tablename == self.TABLENAME:
849 return resource.crud.select(r, **attr)
850
851 elif resource.tablename == "sync_repository":
852
853 pass
854
855 else:
856 if r.interactive:
857
858 here = "%s.%s" % (r.controller, r.function)
859 sync_log = current.s3db[self.TABLENAME]
860 sync_log.resource_name.readable = False
861 query = (sync_log.resource_name == resource.tablename)
862 r = r.factory(prefix="sync", name="log", args=[])
863 s3 = current.response.s3
864 s3.filter = query
865 s3.prep = None
866 s3.postp = None
867 s3.actions = [{"label": s3_str(current.T("Details")),
868 "_class": "action-btn",
869 "url": URL(c = "sync",
870 f = "log",
871 args = ["[id]"],
872 vars = {"return":here},
873 )
874 },
875 ]
876 output = r(subtitle=None, rheader=self.rheader)
877 else:
878 r.error(415, current.ERROR.BAD_FORMAT)
879
880 return output
881
882
883 @classmethod
884 - def write(cls,
885 repository_id=None,
886 resource_name=None,
887 transmission=None,
888 mode=None,
889 action=None,
890 result=None,
891 remote=False,
892 message=None):
893 """
894 Writes a new entry to the log
895
896 @param repository_id: the repository record ID
897 @param resource_name: the resource name
898 @param transmission: transmission mode (IN, OUT or None)
899 @param mode: synchronization mode (PULL, PUSH or None)
900 @param action: action that triggers the log entry (if any)
901 @param result: the result of the transaction
902 (SUCCESS, WARNING, ERROR or FATAL)
903 @param remote: boolean, True if this is a remote error
904 @param message: clear text message
905 """
906
907 if result not in (cls.SUCCESS, cls.WARNING, cls.ERROR, cls.FATAL):
908 result = cls.SUCCESS
909
910 if result == cls.SUCCESS:
911
912 remote = False
913
914 if transmission not in (cls.IN, cls.OUT):
915 transmission = cls.NONE
916
917 if mode not in (cls.PULL, cls.PUSH, cls.LOGIN, cls.REGISTER):
918 mode = cls.NONE
919
920 if not action:
921 action = cls.NONE
922
923 entry = {"timestmp": datetime.datetime.utcnow(),
924 "repository_id": repository_id,
925 "resource_name": resource_name,
926 "mode": "%s/%s" % (mode, transmission),
927 "action": action,
928 "result": result,
929 "remote": remote,
930 "message": message,
931 }
932
933 current.s3db[cls.TABLENAME].insert(**entry)
934
935
936 @staticmethod
938 """ S3SyncLog resource header """
939
940 if r.id is None:
941 return DIV(current.T("Showing latest entries first"))
942 else:
943 return None
944
947 """ Class representation of a peer repository """
948
950 """
951 Constructor
952
953 @param repository: the repository record (Row)
954 """
955
956
957 self.log = S3SyncLog
958 self._config = None
959
960
961 self.id = repository.id
962 self.name = repository.name
963
964
965 self.apitype = repository.apitype
966 self.backend = repository.backend
967
968
969 self.url = repository.url
970 self.path = repository.path
971
972
973 self.username = repository.username
974 self.password = repository.password
975 self.client_id = repository.client_id
976 self.client_secret = repository.client_secret
977 self.site_key = repository.site_key
978 self.refresh_token = repository.refresh_token
979
980
981 self.proxy = repository.proxy
982
983
984 self.synchronise_uuids = repository.synchronise_uuids
985 self.keep_source = repository.keep_source
986 self.last_refresh = repository.last_refresh
987
988
989 import sync_adapter
990 api = sync_adapter.__dict__.get(self.apitype)
991 if api:
992 adapter = api.S3SyncAdapter(self)
993 else:
994 adapter = S3SyncBaseAdapter(self)
995
996 self.adapter = adapter
997 self.archives = {}
998
999
1000 @property
1002 """ Lazy access to the current sync config """
1003
1004 if self._config is None:
1005
1006 table = current.s3db.sync_config
1007 row = current.db().select(table.ALL, limitby=(0, 1)).first()
1008 self._config = row
1009
1010 return self._config
1011
1012
1014 """
1015 Delegate other attributes and methods to the adapter
1016
1017 @param name: the attribute/method
1018 """
1019
1020 return object.__getattribute__(self.adapter, name)
1021
1022
1024 """
1025 Close any open archives
1026 """
1027
1028 for archive in self.archives.values():
1029 if archive:
1030 archive.close()
1031 self.archives = {}
1032
1035 """
1036 Sync Adapter (base class) - interface providing standard
1037 synchronization methods for the respective repository type.
1038
1039 This class isn't meant to be instantiated or accessed directly,
1040 but is normally accessed through the S3SyncRepository instance.
1041 """
1042
1044 """
1045 Constructor
1046
1047 @param repository: the repository (S3Repository instance)
1048 """
1049
1050 self.repository = repository
1051 self.log = repository.log
1052
1053 self.archives = {}
1054
1055
1056
1057
1059 """
1060 Register this site at the peer repository
1061
1062 @return: True|False to indicate success|failure,
1063 or None if registration is not required
1064 """
1065
1066 raise NotImplementedError
1067
1068
1070 """
1071 Login at the peer repository
1072
1073 @return: None if successful, otherwise the error
1074 """
1075
1076 raise NotImplementedError
1077
1078
1079 - def pull(self, task, onconflict=None):
1080 """
1081 Fetch updates from the peer repository and import them
1082 into the local database (active pull)
1083
1084 @param task: the synchronization task (sync_task Row)
1085 @param onconflict: callback for automatic conflict resolution
1086
1087 @return: tuple (error, mtime), with error=None if successful,
1088 else error=message, and mtime=modification timestamp
1089 of the youngest record sent
1090 """
1091
1092 raise NotImplementedError
1093
1094
1095 - def push(self, task):
1096 """
1097 Extract new updates from the local database and send
1098 them to the peer repository (active push)
1099
1100 @param task: the synchronization task (sync_task Row)
1101
1102 @return: tuple (error, mtime), with error=None if successful,
1103 else error=message, and mtime=modification timestamp
1104 of the youngest record sent
1105 """
1106
1107 raise NotImplementedError
1108
1109
1110 - def send(self,
1111 resource,
1112 start=None,
1113 limit=None,
1114 msince=None,
1115 filters=None,
1116 mixed=False,
1117 pretty_print=False,
1118 ):
1119 """
1120 Respond to an incoming pull from the peer repository
1121
1122 @param resource: the resource to be synchronized
1123 @param start: index of the first record to send
1124 @param limit: maximum number of records to send
1125 @param msince: minimum modification date/time for records to send
1126 @param filters: URL filters for record extraction
1127 @param mixed: negotiate resource with peer (disregard resource)
1128 @param pretty_print: make the output human-readable
1129
1130 @return: a dict {status, remote, message, response}, with:
1131 - status....the outcome of the operation
1132 - remote....whether the error was remote (or local)
1133 - message...the log message
1134 - response..the response to send to the peer
1135 """
1136
1137 raise NotImplementedError
1138
1139
1140 - def receive(self,
1141 source,
1142 resource,
1143 strategy=None,
1144 update_policy=None,
1145 conflict_policy=None,
1146 onconflict=None,
1147 last_sync=None,
1148 mixed=False):
1149 """
1150 Respond to an incoming push from the peer repository
1151
1152 @param source: the input stream (list of file-like objects)
1153 @param resource: the target resource
1154 @param strategy: the import strategy
1155 @param update_policy: the update policy
1156 @param conflict_policy: the conflict resolution policy
1157 @param onconflict: callback for conflict resolution
1158 @param last_sync: the last synchronization date/time for the peer
1159 @param mixed: negotiate resource with peer (disregard resource)
1160
1161 @return: a dict {status, remote, message, response}, with:
1162 - status....the outcome of the operation
1163 - remote....whether the error was remote (or local)
1164 - message...the log message
1165 - response..the response to send to the peer
1166 """
1167
1168 raise NotImplementedError
1169
1172 """
1173 Simple abstraction layer for (compressed) data archives, currently
1174 based on zipfile (Python standard library). Compression additionally
1175 requires zlib to be installed (both for write and read).
1176 """
1177
1178 - def __init__(self, fileobj=None, compressed=True):
1179 """
1180 Create or open an archive
1181
1182 @param fileobj: the file object containing the archive,
1183 None to create a new archive
1184 @param compress: enable (or suppress) compression of new
1185 archives
1186 """
1187
1188 import zipfile
1189
1190 if compressed:
1191 compression = zipfile.ZIP_DEFLATED
1192 else:
1193 compression = zipfile.ZIP_STORED
1194
1195 if fileobj is not None:
1196 if not hasattr(fileobj, "seek"):
1197
1198
1199 fileobj = StringIO(fileobj.read())
1200 try:
1201 archive = zipfile.ZipFile(fileobj, "r")
1202 except RuntimeError:
1203 current.log.warn("invalid ZIP archive: %s" % sys.exc_info()[1])
1204 archive = None
1205 else:
1206 fileobj = StringIO()
1207 try:
1208 archive = zipfile.ZipFile(fileobj, "w", compression, True)
1209 except RuntimeError:
1210
1211 compression = zipfile.ZIP_STORED
1212 archive = zipfile.ZipFile(fileobj, "w", compression, True)
1213 current.log.warn("zlib not available - cannot compress archive")
1214
1215 self.fileobj = fileobj
1216 self.archive = archive
1217
1218
1219 - def add(self, name, obj):
1220 """
1221 Add an object to the archive
1222
1223 @param name: the file name for the object inside the archive
1224 @param obj: the object to add (string or file-like object)
1225
1226 @raises UserWarning: when adding a duplicate name (overwrites
1227 the existing object in the archive)
1228 @raises RuntimeError: if the archive is not writable, or
1229 no valid object name has been provided
1230 @raises TypeError: if the object is not a unicode, str or
1231 file-like object
1232 """
1233
1234
1235 if not name:
1236 raise RuntimeError("name is required")
1237 elif type(name) is not str:
1238 name = s3_str(name)
1239
1240
1241 archive = self.archive
1242 if not archive:
1243 raise RuntimeError("cannot add to closed archive")
1244
1245
1246 if type(obj) is unicode:
1247 obj = obj.encode("utf-8")
1248
1249
1250 if type(obj) is str:
1251 archive.writestr(name, obj)
1252
1253 elif hasattr(obj, "read"):
1254 if hasattr(obj, "seek"):
1255 obj.seek(0)
1256 archive.writestr(name, obj.read())
1257
1258 else:
1259 raise TypeError("invalid object type")
1260
1261
1263 """
1264 Extract an object from the archive by name
1265
1266 @param name: the object name
1267
1268 @return: the object as file-like object, or None if
1269 the object could not be found in the archive
1270 """
1271
1272 if not self.archive:
1273 raise RuntimeError("cannot extract from closed archive")
1274
1275 try:
1276 return self.archive.open(name)
1277 except KeyError:
1278
1279 return None
1280
1281
1283 """
1284 Close the archive and return it as file-like object; no further
1285 add/extract operations will be possible after closing.
1286
1287 @return: the file-like object containing the archive
1288 """
1289
1290 if self.archive:
1291 self.archive.close()
1292 self.archive = None
1293
1294 fileobj = self.fileobj
1295 if fileobj:
1296 fileobj.seek(0)
1297
1298 return fileobj
1299
1300
1301