1
2
3 """ S3 Synchronization: Peer Repository Adapter
4
5 @copyright: 2011-2019 (c) Sahana Software Foundation
6 @license: MIT
7
8 Permission is hereby granted, free of charge, to any person
9 obtaining a copy of this software and associated documentation
10 files (the "Software"), to deal in the Software without
11 restriction, including without limitation the rights to use,
12 copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the
14 Software is furnished to do so, subject to the following
15 conditions:
16
17 The above copyright notice and this permission notice shall be
18 included in all copies or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
22 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
25 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
26 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
27 OTHER DEALINGS IN THE SOFTWARE.
28 """
29
30 import datetime
31 import json
32 import sys
33 import urllib2
34
35 try:
36 from lxml import etree
37 except ImportError:
38 sys.stderr.write("ERROR: lxml module needed for XML handling\n")
39 raise
40
41 from gluon import current
42
43 from ..s3datetime import s3_encode_iso_datetime
44 from ..s3validators import JSONERRORS
45 from eden import S3SyncAdapter as S3SyncEdenAdapter
49 """
50 Sahana Eden Data Repository Synchronization Adapter
51 """
52
53 @staticmethod
55 """
56 Register this site at the peer repository
57
58 @return: True|False to indicate success|failure,
59 or None if registration is not required
60 """
61
62 # Always reports None, i.e. no registration is required here
63 return None
64
65
67 """
68 Refresh sync tasks from peer
69
70 @return: True after the updates have been processed, or None if
71 the repository has no URL or no datasets (nothing to refresh)
72 """
73
74 db = current.db
75 s3db = current.s3db
76
77 repository = self.repository
78 log = repository.log
79
80 base_url = repository.url
81 if not base_url:
82 # No URL set for this repository => nothing to refresh
83 return None
84
85 debug = current.log.debug
86 debug("Refreshing sync tasks from %s" % base_url)
87
88 # Datasets of this repository that are not marked as deleted
89 repository_id = repository.id
90 table = s3db.sync_dataset
91 query = (table.repository_id == repository_id) & \
92 (table.deleted == False)
93 datasets = db(query).select(table.code)
94
95 if not datasets:
96 # No datasets => nothing to refresh
97 return None
98 else:
99 # Unique dataset codes
100 codes = set(dataset.code for dataset in datasets)
101
102 # Peer URL for the tasks of these datasets, imported with create/update
103 from urllib import quote
104 current_tasks = "%s/sync/task.xml?~.dataset_id$code=%s" % \
105 (base_url, quote(",".join(codes)))
106 updates = [{"tablename": "sync_task",
107 "url": current_tasks,
108 "strategy": ("create", "update"),
109 },
110 ]
111
112 # Deletions are only requested if there was a previous refresh
113 last_refresh = repository.last_refresh
114 if last_refresh:
115 # Time of the last refresh, handed to _fetch as msince
116 msince = s3_encode_iso_datetime(last_refresh)
117
118 # Peer URLs for deleted tasks and deleted resource filters
119 deleted = "?include_deleted=True&~.deleted=True"
120 deleted_tasks = "%s/sync/task.xml%s" % (base_url, deleted)
121 deleted_filters = "%s/sync/resource_filter.xml%s" % (base_url, deleted)
122 updates.extend([{"tablename": "sync_task",
123 "url": deleted_tasks,
124 "strategy": ("delete",),
125 },
126 {"tablename": "sync_resource_filter",
127 "url": deleted_filters,
128 "strategy": ("delete",),
129 },
130 ])
131 else:
132 # No previous refresh => no msince
133 msince = None
134
135 # Outcome: any successful import, number of tasks, latest mtime
136 success = False
137 count = 0
138 timestamp = None
139
140 # NOTE(review): the flag is restored only at the end of this block, not if an exception escapes before that
141 s3 = current.response.s3
142 synchronise_uuids = s3.synchronise_uuids
143 s3.synchronise_uuids = True
144
145 # Fetch and import each update
146 for update in updates:
147
148 error = self._fetch(update, msince)
149
150 if error:
151 # Fetch failed => log the error and move on to the next update
152 log.write(repository_id = repository_id,
153 transmission = log.OUT,
154 mode = log.PULL,
155 action = "refresh",
156 remote = update.get("remote", False),
157 result = update.get("result", log.ERROR),
158 message = error,
159 )
160 continue
161
162 if update.get("response"):
163 error = self._import(update)
164 if error:
165 # Import failed => log the error
166 log.write(repository_id = repository_id,
167 transmission = log.OUT,
168 mode = log.PULL,
169 action = "refresh",
170 remote = update.get("remote", False),
171 result = update.get("result", log.ERROR),
172 message = error,
173 )
174 else:
175 # Import succeeded => collect task count and latest mtime
176 success = True
177 if update["tablename"] == "sync_task":
178 count += update.get("count", 0)
179 mtime = update.get("mtime")
180 if mtime and (not timestamp or timestamp < mtime):
181 timestamp = mtime
182 else:
183 # Nothing to import => log a warning
184 log.write(repository_id = repository_id,
185 transmission = log.OUT,
186 mode = log.PULL,
187 action = "refresh",
188 remote = False,
189 result = log.WARNING,
190 message = "No data received",
191 )
192
193 if success:
194 if count:
195 message = "Successfully updated %s tasks" % count
196 else:
197 message = "No new/updated tasks found"
198
199 log.write(repository_id = repository_id,
200 transmission = log.OUT,
201 mode = log.PULL,
202 action = "refresh",
203 remote = False,
204 result = log.SUCCESS,
205 message = message,
206 )
207
208 if timestamp:
209 # Store the latest mtime plus one second as last_refresh
210 delta = datetime.timedelta(seconds=1)
211 rtable = s3db.sync_repository
212 query = (rtable.id == repository_id)
213 db(query).update(last_refresh=timestamp+delta)
214 # Restore the previous synchronise_uuids setting
215 s3.synchronise_uuids = synchronise_uuids
216 return True
217
218
def _fetch(self, update, msince):
    """
        Fetch sync task updates from the repository

        @param update: the update dict containing:
                       {"url": the url to fetch,
                        }
                       The dict is updated in place with:
                       {"result": the log status of the request,
                        "remote": True if the request failed with a
                                  HTTP or URL error,
                        "response": the response object (on success),
                        }
        @param msince: value for the msince URL parameter (a falsy
                       value omits the parameter)
        @return: error message if there was an error, otherwise None
    """

    log = self.repository.log
    error = None

    # msince is appended with "&", i.e. the URL is expected to
    # carry a query string already
    url = update["url"]
    if msince:
        url = "%s&msince=%s" % (url, msince)

    opener = self._http_opener(url)
    try:
        response = opener.open(url)

    except urllib2.HTTPError as e:
        # The peer answered with a HTTP error status
        result = log.ERROR
        update["remote"] = True

        # The response body may be a JSON object with a "message" key
        message = e.read()
        try:
            message_json = json.loads(message)
        except JSONERRORS:
            pass
        else:
            # Valid JSON that is not an object (list, string, number)
            # has no "message" to extract => keep the raw body
            if isinstance(message_json, dict):
                message = message_json.get("message", message)

        # Reduce any markup in the message to its text content; a
        # message that is not well-formed XML is reported unchanged
        # (i.e. without the temporary wrapper element)
        try:
            markup = etree.XML("<message>%s</message>" % message)
        except etree.XMLSyntaxError:
            pass
        else:
            message = " ".join(markup.xpath(".//text()"))

        # Prepend the HTTP status code
        error = "[%s] %s" % (e.code, message)

    except urllib2.URLError as e:
        # The request could not be completed at all
        result = log.ERROR
        update["remote"] = True
        error = "Peer repository unavailable (%s)" % e.reason

    except Exception:
        # Anything else is fatal; Exception rather than a bare except
        # so that SystemExit/KeyboardInterrupt still propagate
        result = log.FATAL
        exception = sys.exc_info()[1]
        # Return a message string as documented, and never an empty
        # one since the return value is tested for truth
        error = "%s" % (exception,)
        if not error:
            error = exception.__class__.__name__

    else:
        result = log.SUCCESS
        update["response"] = response

    update["result"] = result
    return error
290
291
293 """
294 Import sync task updates
295
296 @param update: the update dict containing:
297 {"response": the response from _fetch,
298 "strategy": the import strategy,
299 } (the "tablename" key is read as well)
300 @return: error message if there was an error, otherwise None
301 """
302
303 # Repository and its log (the log provides the status constants)
304 repository = self.repository
305 log = repository.log
306
307 resource = current.s3db.resource(update["tablename"])
308
309 # If records can be created: default their repository_id to this repository
310 strategy = update.get("strategy")
311 if "create" in strategy:
312 table = resource.table
313 table.repository_id.default = repository.id
314
315 error = None
316 result = None
317 # NOTE(review): the response is not closed in this block - confirm who closes it
318 response = update["response"]
319 try:
320 resource.import_xml(response,
321 ignore_errors = True,
322 strategy = strategy,
323 )
324 except IOError:
325 result = log.FATAL
326 error = "%s" % sys.exc_info()[1]
327
328 except:
329 result = log.FATAL
330 import traceback
331 error = "Uncaught Exception During Import: %s" % \
332 traceback.format_exc()
333
334 else:
335 if resource.error:
336 result = log.ERROR
337 error = resource.error
338 else:
339 result = log.SUCCESS
340 update["count"] = resource.import_count
341 update["mtime"] = resource.mtime
342 # Report the status back to the caller through the update dict
343 update["result"] = result
344
345 return error
346
347
348