# -*- coding: utf-8 -*-

""" S3 Synchronization: Peer Repository Adapter

    @copyright: 2011-2019 (c) Sahana Software Foundation
    @license: MIT

    Permission is hereby granted, free of charge, to any person
    obtaining a copy of this software and associated documentation
    files (the "Software"), to deal in the Software without
    restriction, including without limitation the rights to use,
    copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following
    conditions:

    The above copyright notice and this permission notice shall be
    included in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
    OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
    NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
    HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
    WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
    OTHER DEALINGS IN THE SOFTWARE.
"""

import datetime
import json
import sys
import traceback
import urllib2

try:
    from lxml import etree
except ImportError:
    sys.stderr.write("ERROR: lxml module needed for XML handling\n")
    raise

from gluon import current

from ..s3datetime import s3_encode_iso_datetime
from ..s3sync import S3SyncBaseAdapter, S3SyncDataArchive
from ..s3validators import JSONERRORS


class S3SyncAdapter(S3SyncBaseAdapter):
    """
        Sahana Eden Synchronization Adapter (default sync adapter)
    """

56 """
57 Register this site at the peer repository
58
59 @return: True to indicate success, otherwise False
60 """
61
62 repository = self.repository
63 if not repository.url:
64 return True
65
66
67 url = "%s/sync/repository/register.json" % repository.url
68 current.log.debug("S3Sync: register at %s" % url)
69
70
71 config = repository.config
72 name = current.deployment_settings.get_base_public_url().split("//", 1)[1]
73 parameters = {"uuid": config.uuid,
74 "name": name,
75 "apitype": "eden",
76 }
77 data = json.dumps(parameters)
78
79
80 opener = self._http_opener(url,
81 headers = [("Content-Type", "application/json"),
82 ],
83 )
84
85
86 log = repository.log
87 success = True
88 remote = False
89 try:
90 f = opener.open(url, data)
91 except urllib2.HTTPError, e:
92
93 result = log.FATAL
94 remote = True
95 message = e.read()
96 success = False
97 try:
98 message_json = json.loads(message)
99 except JSONERRORS:
100 pass
101 else:
102 message = message_json.get("message", message)
103
104 except urllib2.URLError, e:
105
106 result = log.ERROR
107 remote = True
108 message = "Peer repository unavailable (%s)" % e.reason
109
110 except:
111
112 result = log.FATAL
113 message = sys.exc_info()[1]
114 success = False
115
116 else:
117 result = log.SUCCESS
118
119 message = f.read()
120 try:
121 message_json = json.loads(message)
122 except JSONERRORS:
123 message = "Registration successful"
124 ruid = None
125 else:
126 message = message_json.get("message", message)
127 ruid = message_json.get("sender", None)
128
129 if ruid is not None:
130
131 db = current.db
132 rtable = current.s3db.sync_repository
133 db(rtable.id == repository.id).update(uuid=ruid)
134
135
136 log.write(repository_id = repository.id,
137 transmission = log.OUT,
138 mode = log.PUSH,
139 action = "request registration",
140 remote = remote,
141 result = result,
142 message = message,
143 )
144
145 return success
146
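    # Example (sketch; UUIDs and host name are illustrative, not from a real
    # deployment): the registration request built above is posted as JSON, e.g.
    #
    #   {"uuid": "0a1b2c3d-...", "name": "demo.example.org", "apitype": "eden"}
    #
    # and a successful peer response is expected to carry "message" and
    # "sender" (the peer's repository UUID), e.g.
    #
    #   {"message": "Registration successful", "sender": "9f8e7d6c-..."}
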
    def login(self):
        """
            Login at the peer repository

            @return: None if successful, otherwise the error
        """

        # No separate login step required: HTTP Basic Auth credentials
        # are attached to each request by _http_opener
        return None

    def pull(self, task, onconflict=None):
        """
            Fetch updates from the peer repository and import them
            into the local database (active pull)

            @param task: the synchronization task (sync_task Row)
            @param onconflict: callback for automatic conflict resolution

            @return: tuple (error, mtime), with error=None if successful,
                     else error=message, and mtime=modification timestamp
                     of the youngest record sent
        """

        xml = current.xml
        debug = current.log.debug

        repository = self.repository
        config = repository.config
        log = repository.log

        # Get the target resource for this task
        resource_name = task.resource_name
        try:
            resource = current.s3db.resource(resource_name)
        except AttributeError:
            # Resource not defined in this instance
            debug("Undefined resource %s - sync task ignored" % resource_name)
            return (None, None)

        last_pull = task.last_pull
        dataset_id = task.dataset_id

        remote = False
        action = "fetch"
        output = None
        response = None
        result = log.SUCCESS

        use_archived = False

        if not last_pull and dataset_id:
            # Initial pull: try to use an archived data set
            archive = self._get_archive(dataset_id)
            if archive:
                try:
                    response = archive.extract("%s.xml" % task.uuid)
                except RuntimeError:
                    # Could not extract the data from the archive
                    # => log the error and fall back to a regular pull
                    current.log.error("S3Sync: %s" % sys.exc_info()[1])
                else:
                    use_archived = True

        if response is None:

            debug("S3Sync: pull %s from %s" % (resource_name, repository.url))

            # Construct the URL
            url = "%s/sync/sync.xml?resource=%s&repository=%s" % \
                  (repository.url, resource_name, config.uuid)
            if last_pull and task.update_policy not in ("THIS", "OTHER"):
                url += "&msince=%s" % s3_encode_iso_datetime(last_pull)
            if task.components is False:
                url += "&mcomponents=None"
            url += "&include_deleted=True"

            # Add sync filters to the URL
            from urllib import quote
            filters = current.sync.get_filters(task.id)
            for tablename in filters:
                prefix = "~" if not tablename or tablename == resource_name \
                             else tablename
                for k, v in filters[tablename].items():
                    vlist = v if type(v) is list else [v]
                    for value in vlist:
                        urlfilter = "[%s]%s=%s" % (prefix, k, quote(value))
                        url += "&%s" % urlfilter

            debug("...pull from URL %s" % url)

            # Fetch the data from the peer
            remote = False
            action = "fetch"
            response = None
            output = None

            opener = self._http_opener(url)
            try:
                f = opener.open(url)

            except urllib2.HTTPError, e:
                result = log.ERROR
                remote = True
                code = e.code
                message = e.read()
                try:
                    # The peer may send a JSON error message
                    # => try to extract it
                    message_json = json.loads(message)
                except JSONERRORS:
                    pass
                else:
                    message = message_json.get("message", message)

                # Strip any XML markup from the message
                message = "<message>%s</message>" % message
                try:
                    markup = etree.XML(message)
                    message = markup.xpath(".//text()")
                    if message:
                        message = " ".join(message)
                    else:
                        message = ""
                except etree.XMLSyntaxError:
                    pass
                output = xml.json_message(False, code, message, tree=None)

            except urllib2.URLError, e:
                # Peer repository not reachable
                result = log.ERROR
                remote = True
                message = "Peer repository unavailable (%s)" % e.reason
                output = xml.json_message(False, 400, message)

            except:
                result = log.FATAL
                message = sys.exc_info()[1]
                output = xml.json_message(False, 400, message)

            else:
                result = log.SUCCESS
                response = f

        # Import the data
        mtime = None
        if response:

            # Get the import parameters from the task
            strategy = task.strategy
            update_policy = task.update_policy
            conflict_policy = task.conflict_policy

            success = True
            message = ""
            action = "import"

            # Set up the conflict resolution callback
            if onconflict:
                onconflict_callback = lambda item: onconflict(item,
                                                              repository,
                                                              resource,
                                                              )
            else:
                onconflict_callback = None
            count = 0
            try:
                success = resource.import_xml(response,
                                              ignore_errors = True,
                                              strategy = strategy,
                                              update_policy = update_policy,
                                              conflict_policy = conflict_policy,
                                              last_sync = last_pull,
                                              onconflict = onconflict_callback,
                                              )
                count = resource.import_count

            except IOError, e:
                result = log.FATAL
                message = "%s" % e
                output = xml.json_message(False, 400, message)

            except:
                # Uncaught exception during import: log the full traceback,
                # but do not let it propagate
                result = log.FATAL
                message = "Uncaught Exception During Import: %s" % \
                          traceback.format_exc()
                output = xml.json_message(False, 500, sys.exc_info()[1])

            mtime = resource.mtime

            # Collect any validation errors
            if resource.error_tree is not None:
                result = log.WARNING
                message = "%s" % resource.error
                for element in resource.error_tree.findall("resource"):
                    for field in element.findall("data[@error]"):
                        error_msg = field.get("error", None)
                        if error_msg:
                            msg = "(UID: %s) %s.%s=%s: %s" % \
                                  (element.get("uuid", None),
                                   element.get("name", None),
                                   field.get("field", None),
                                   field.get("value", field.text),
                                   field.get("error", None))
                            message = "%s, %s" % (message, msg)

            # Check for import failure...
            if not success:
                result = log.FATAL
                if not message:
                    message = "%s" % resource.error
                output = xml.json_message(False, 400, message)
                mtime = None

            # ...or report success
            elif not message:
                if not count:
                    message = "No data to import (already up-to-date)"
                else:
                    message = "Data imported successfully (%s records%%s)" % count
                    if use_archived:
                        message = message % ", from archive"
                    else:
                        message = message % ""

        elif result == log.SUCCESS:
            # Fetch was successful, but the peer returned no data
            result = log.ERROR
            remote = True
            message = "No data received from peer"

        # Log the operation
        log.write(repository_id = repository.id,
                  resource_name = task.resource_name,
                  transmission = log.OUT,
                  mode = log.PULL,
                  action = action,
                  remote = remote,
                  result = result,
                  message = message,
                  )

        debug("S3Sync: pull %s: %s" % (result, message))
        return (output, mtime)

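    # Example (sketch; host name, UUID and filter are illustrative): a pull
    # for resource "org_office" with a previous pull timestamp and one sync
    # filter builds a request URL along the lines of
    #
    #   https://peer.example.org/eden/sync/sync.xml?resource=org_office
    #       &repository=<local-uuid>&msince=2019-06-01T12:00:00
    #       &include_deleted=True&[~]location_id$L0=Kenya
    #
    # (msince is omitted for first-time pulls and for THIS/OTHER update
    # policies, and &mcomponents=None is appended when the task excludes
    # components)
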
    def push(self, task):
        """
            Extract new updates from the local database and send
            them to the peer repository (active push)

            @param task: the synchronization task (sync_task Row)

            @return: tuple (error, mtime), with error=None if successful,
                     else error=message, and mtime=modification timestamp
                     of the youngest record sent
        """

        xml = current.xml
        debug = current.log.debug

        repository = self.repository
        config = repository.config

        resource_name = task.resource_name
        debug("S3SyncRepository.push(%s, %s)" % (repository.url, resource_name))

        # Construct the URL
        url = "%s/sync/sync.xml?resource=%s&repository=%s" % \
              (repository.url, resource_name, config.uuid)
        strategy = task.strategy
        if strategy:
            url += "&strategy=%s" % ",".join(strategy)
        update_policy = task.update_policy
        if update_policy:
            url += "&update_policy=%s" % update_policy
        conflict_policy = task.conflict_policy
        if conflict_policy:
            url += "&conflict_policy=%s" % conflict_policy
        last_push = task.last_push
        if last_push and update_policy not in ("THIS", "OTHER"):
            url += "&msince=%s" % s3_encode_iso_datetime(last_push)
        else:
            last_push = None

        debug("...push to URL %s" % url)

        if task.components is False:
            # Exclude components
            components = []
        else:
            # Include all components
            components = None

        # Define the resource
        resource = current.s3db.resource(resource_name,
                                         components = components,
                                         include_deleted = True,
                                         )

        # Apply sync filters for this task
        filters = current.sync.get_filters(task.id)

        # Export the resource as S3XML
        data = resource.export_xml(filters = filters,
                                   msince = last_push,
                                   )
        count = resource.results or 0
        mtime = resource.muntil

        # Send the data to the peer
        remote = False
        output = None
        log = repository.log
        if data and count:

            opener = self._http_opener(url,
                                       headers = [("Content-Type", "text/xml"),
                                                  ],
                                       )
            try:
                opener.open(url, data)
            except urllib2.HTTPError, e:
                result = log.FATAL
                remote = True
                code = e.code
                message = e.read()
                try:
                    # The peer may send a JSON error message
                    # => try to extract it
                    message_json = json.loads(message)
                except JSONERRORS:
                    pass
                else:
                    message = message_json.get("message", message)
                output = xml.json_message(False, code, message)
            except urllib2.URLError, e:
                # Peer repository not reachable
                result = log.ERROR
                remote = True
                message = "Peer repository unavailable (%s)" % e.reason
                output = xml.json_message(False, 400, message)
            except:
                result = log.FATAL
                code = 400
                message = sys.exc_info()[1]
                output = xml.json_message(False, code, message)
            else:
                result = log.SUCCESS
                message = "data sent successfully (%s records)" % count
        else:
            # Nothing to send
            result = log.WARNING
            message = "No data to send"

        # Log the operation
        log.write(repository_id = repository.id,
                  resource_name = task.resource_name,
                  transmission = log.OUT,
                  mode = log.PUSH,
                  action = "send",
                  remote = remote,
                  result = result,
                  message = message,
                  )

        if output is not None:
            mtime = None

        return (output, mtime)

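    # Example (sketch; host name, UUID and policy values are illustrative):
    # a push request carries the task's import parameters in the query
    # string, e.g.
    #
    #   https://peer.example.org/eden/sync/sync.xml?resource=org_organisation
    #       &repository=<local-uuid>&strategy=create,update
    #       &update_policy=NEWER&conflict_policy=NEWER
    #       &msince=2019-06-01T12:00:00
    #
    # and the exported S3XML tree is sent as the request body (text/xml)
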
    def send(self,
             resource,
             start=None,
             limit=None,
             msince=None,
             filters=None,
             mixed=False,
             pretty_print=False):
        """
            Respond to an incoming pull from the peer repository

            @param resource: the resource to be synchronized
            @param start: index of the first record to send
            @param limit: maximum number of records to send
            @param msince: minimum modification date/time for records to send
            @param filters: URL filters for record extraction
            @param mixed: negotiate resource with peer (disregard resource)
            @param pretty_print: make the output human-readable

            @return: a dict {status, remote, message, response}, with:
                     - status....the outcome of the operation
                     - remote....whether the error was remote (or local)
                     - message...the log message
                     - response..the response to send to the peer
        """

        if not resource or mixed:
            msg = "Mixed resource synchronization not supported"
            return {"status": self.log.FATAL,
                    "message": msg,
                    "response": current.xml.json_message(False, 400, msg),
                    }

        # Export the resource as S3XML
        output = resource.export_xml(start = start,
                                     limit = limit,
                                     filters = filters,
                                     msince = msince,
                                     pretty_print = pretty_print,
                                     )
        count = resource.results
        msg = "Data sent to peer (%s records)" % count

        # Update the last-connected timestamp of the peer repository
        current.db(current.s3db.sync_repository.id == self.repository.id).update(
            last_connected = datetime.datetime.utcnow(),
            )

        # Set the content type for the response
        headers = current.response.headers
        headers["Content-Type"] = "text/xml"

        return {"status": self.log.SUCCESS,
                "message": msg,
                "response": output,
                }

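    # Example (sketch; record count is illustrative): a successful send
    # returns a dict such as
    #
    #   {"status": self.log.SUCCESS,
    #    "message": "Data sent to peer (42 records)",
    #    "response": <S3XML output>}
    #
    # where, per the docstring above, "response" is what gets sent back to
    # the peer and "message" goes into the sync log
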
    def receive(self,
                source,
                resource,
                strategy=None,
                update_policy=None,
                conflict_policy=None,
                onconflict=None,
                last_sync=None,
                mixed=False):
        """
            Respond to an incoming push from the peer repository

            @param source: the input stream (list of file-like objects)
            @param resource: the target resource
            @param strategy: the import strategy
            @param update_policy: the update policy
            @param conflict_policy: the conflict resolution policy
            @param onconflict: callback for conflict resolution
            @param last_sync: the last synchronization date/time for the peer
            @param mixed: negotiate resource with peer (disregard resource)

            @return: a dict {status, remote, message, response}, with:
                     - status....the outcome of the operation
                     - remote....whether the error was remote (or local)
                     - message...the log message
                     - response..the response to send to the peer
        """

        if not resource or mixed:
            msg = "Mixed resource synchronization not supported"
            return {"status": self.log.FATAL,
                    "remote": False,
                    "message": msg,
                    "response": current.xml.json_message(False, 400, msg),
                    }

        repository = self.repository

        # Import with validation errors ignored, so that a single
        # invalid record does not block the entire sync run
        ignore_errors = True

        if onconflict:
            onconflict_callback = lambda item: onconflict(item,
                                                          repository,
                                                          resource,
                                                          )
        else:
            onconflict_callback = None

        output = resource.import_xml(source,
                                     format = "xml",
                                     ignore_errors = ignore_errors,
                                     strategy = strategy,
                                     update_policy = update_policy,
                                     conflict_policy = conflict_policy,
                                     last_sync = last_sync,
                                     onconflict = onconflict_callback,
                                     )

        log = self.log

        if resource.error_tree is not None:
            # Validation errors => warning when ignored, otherwise fatal
            if ignore_errors:
                result = log.WARNING
            else:
                result = log.FATAL
            remote = True
            message = "%s" % resource.error
            for element in resource.error_tree.findall("resource"):

                error_msg = element.get("error", "unknown error")

                error_fields = element.findall("data[@error]")
                if error_fields:
                    for field in error_fields:
                        error_msg = field.get("error", "unknown error")
                        if error_msg:
                            msg = "(UID: %s) %s.%s=%s: %s" % \
                                  (element.get("uuid", None),
                                   element.get("name", None),
                                   field.get("field", None),
                                   field.get("value", field.text),
                                   error_msg)
                            message = "%s, %s" % (message, msg)

                else:
                    msg = "(UID: %s) %s: %s" % \
                          (element.get("uuid", None),
                           element.get("name", None),
                           error_msg)
                    message = "%s, %s" % (message, msg)
        else:
            result = log.SUCCESS
            remote = False
            message = "Data received from peer"

        # Update the last-connected timestamp of the peer repository
        current.db(current.s3db.sync_repository.id == self.repository.id).update(
            last_connected = datetime.datetime.utcnow(),
            )

        return {"status": result,
                "remote": remote,
                "message": message,
                "response": output,
                }

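    # Example (sketch; UID, table and field are illustrative): with
    # ignore_errors=True, validation errors degrade the result to a warning
    # rather than a failure, e.g.
    #
    #   {"status": log.WARNING,
    #    "remote": True,
    #    "message": "Validation error, (UID: 1f2e...) org_office.name=None: value required",
    #    "response": <import result>}
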
    def _get_archive(self, dataset_id):
        """
            Get the archive for a data set (fetch it from remote if
            necessary and available)

            @param dataset_id: the data set ID

            @return: S3SyncDataArchive for extraction
        """

        s3db = current.s3db
        db = current.db

        repository = self.repository

        archives = repository.archives
        if dataset_id in archives:
            # Already fetched during this run
            return archives[dataset_id]

        # Look up the data set
        dtable = s3db.sync_dataset
        query = (dtable.id == dataset_id) & \
                (dtable.deleted == False)
        dataset = db(query).select(dtable.id,
                                   dtable.code,
                                   dtable.use_archive,
                                   dtable.archive_url,
                                   limitby = (0, 1),
                                   ).first()

        if dataset:

            # Update the data set description from the peer, if possible
            dataset = self._update_dataset(dataset)

            archive_url = dataset.archive_url
            if not archive_url or not dataset.use_archive:
                # No archive available for this data set
                archives[dataset_id] = None
                return None

            archive = None

            # Construct the archive URL
            if archive_url[0] == "/":
                # URL relative to the peer repository
                repository_url = repository.url
                if not repository_url:
                    archives[dataset_id] = None
                    return None
                url = "%s/%s" % (repository_url.rstrip("/"),
                                 archive_url.lstrip("/"),
                                 )
                auth = True
            else:
                # External URL
                # => do not send credentials
                url = archive_url
                auth = False

            # Fetch the archive
            opener = self._http_opener(url, auth=auth)
            error = None
            local_error = False
            try:
                f = opener.open(url)
            except urllib2.HTTPError, e:
                # HTTP error status => remote error
                message = e.read()
                try:
                    # The peer may send a JSON error message
                    # => try to extract it
                    message_json = json.loads(message)
                except JSONERRORS:
                    pass
                else:
                    message = message_json.get("message", message)

                # Strip any XML markup from the message
                message = "<message>%s</message>" % message
                try:
                    markup = etree.XML(message)
                    message = markup.xpath(".//text()")
                    if message:
                        message = " ".join(message)
                    else:
                        message = ""
                except etree.XMLSyntaxError:
                    pass

                error = "[%s] %s" % (e.code, message)

            except urllib2.URLError, e:
                # Peer repository not reachable
                error = "Peer repository unavailable (%s)" % e.reason

            except:
                # Local error
                local_error = True
                error = sys.exc_info()[1]

            else:
                # Open the archive
                try:
                    archive = S3SyncDataArchive(f)
                except RuntimeError:
                    local_error = True
                    error = sys.exc_info()[1]

            log = repository.log
            if error:
                log.write(repository_id = repository.id,
                          transmission = log.OUT,
                          mode = log.PULL,
                          action = "fetch archive",
                          remote = not local_error,
                          result = log.ERROR,
                          message = error,
                          )
                archive = None
            else:
                message = "Dataset %s archive downloaded successfully" % dataset.code
                log.write(repository_id = repository.id,
                          transmission = log.OUT,
                          mode = log.PULL,
                          action = "fetch archive",
                          remote = False,
                          result = log.SUCCESS,
                          message = message,
                          )

            # Remember the archive for subsequent tasks in this run
            archives[dataset_id] = archive
            return archive

        else:
            return None

    def _update_dataset(self, dataset):
        """
            Update the data set from the repo, if possible

            @param dataset: the sync_dataset Row
        """

        s3 = current.response.s3

        repository = self.repository

        code = dataset.code
        error_msg = "S3Sync: cannot update %s dataset from peer" % code

        # Fetch the data set description from the peer
        url = "%s/sync/dataset.xml?~.code=%s&mcomponents=None" % \
              (repository.url, code)
        opener = self._http_opener(url)
        try:
            dataset_info = opener.open(url)
        except:
            current.log.error(error_msg)
            return dataset

        if dataset_info:

            s3db = current.s3db
            resource = s3db.resource("sync_dataset", id=dataset.id)

            # Match the existing record by UUID during import
            synchronise_uuids = s3.synchronise_uuids
            s3.synchronise_uuids = True

            try:
                resource.import_xml(dataset_info)
            except IOError:
                current.log.error(error_msg)
                return dataset

            s3.synchronise_uuids = synchronise_uuids

            # Reload the updated data set record
            table = s3db.sync_dataset
            query = (table.id == dataset.id)
            dataset = current.db(query).select(table.id,
                                               table.code,
                                               table.use_archive,
                                               table.archive_url,
                                               limitby = (0, 1),
                                               ).first()
        return dataset

    def _http_opener(self, url, headers=None, auth=True):
        """
            Configure a HTTP opener for sync operations

            @param url: the target URL
            @param headers: list of additional (name, value) header tuples
            @param auth: send HTTP Basic Auth credentials with the request
        """

        repository = self.repository
        config = repository.config

        # Configure additional request headers
        addheaders = []
        if headers:
            addheaders.extend(headers)

        # Configure the handlers
        handlers = []

        # Proxy handling
        proxy = repository.proxy or config.proxy or None
        if proxy:
            # Determine the protocol from the URL
            url_split = url.split("://", 1)
            if len(url_split) == 2:
                protocol = url_split[0]
            else:
                protocol = "http"
            proxy_handler = urllib2.ProxyHandler({protocol: proxy})
            handlers.append(proxy_handler)

        # HTTP Basic Auth handling
        if auth:
            username = repository.username
            password = repository.password
            if username and password:

                passwd_manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
                passwd_manager.add_password(realm = None,
                                            uri = url,
                                            user = username,
                                            passwd = password,
                                            )
                auth_handler = urllib2.HTTPBasicAuthHandler(passwd_manager)
                handlers.append(auth_handler)

        # Build the opener
        opener = urllib2.build_opener(*handlers)
        if auth and username and password:
            # Send the credentials pre-emptively: if the peer does not
            # challenge for authentication, the request would otherwise
            # be treated as anonymous
            import base64
            base64string = base64.encodestring('%s:%s' % (username, password))[:-1]
            addheaders.append(("Authorization", "Basic %s" % base64string))

        if addheaders:
            opener.addheaders = addheaders

        return opener

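# Example (sketch; URL is illustrative): the opener returned by _http_opener
# is used with standard urllib2 semantics throughout this adapter, e.g.
#
#   opener = self._http_opener(url, headers=[("Content-Type", "text/xml")])
#   try:
#       f = opener.open(url, data)      # POST when data is given, GET otherwise
#   except urllib2.HTTPError, e:        # remote error (HTTP status code)
#       ...
#   except urllib2.URLError, e:         # peer not reachable
#       ...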