1
2
3 """ S3 Synchronization: Peer Repository Adapter for ADASHI
4
5 @copyright: 2011-2019 (c) Sahana Software Foundation
6 @license: MIT
7
8 Permission is hereby granted, free of charge, to any person
9 obtaining a copy of this software and associated documentation
10 files (the "Software"), to deal in the Software without
11 restriction, including without limitation the rights to use,
12 copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the
14 Software is furnished to do so, subject to the following
15 conditions:
16
17 The above copyright notice and this permission notice shall be
18 included in all copies or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
22 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
25 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
26 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
27 OTHER DEALINGS IN THE SOFTWARE.
28 """
29
30 import os
31 import sys
32
33 from gluon import *
34
35 from ..s3sync import S3SyncBaseAdapter
36
37
39 """
40 ADASHI Synchronization Adapter (passive)
41
42 http://www.adashisystems.com
43 """
44
45
47 """
48 Register this site at the peer repository
49
50 @return: True to indicate success, otherwise False
51 """
52
53
54 return True
55
56
58 """
59 Login at the peer repository
60
61 @return: None if successful, otherwise the error
62 """
63
64
65 return None
66
67
68 - def pull(self, task, onconflict=None):
69 """
70 Outgoing pull
71
72 @param task: the task (sync_task Row)
73 """
74
75 repository = self.repository
76 log = repository.log
77
78
79 PATH = os.path.join(current.request.folder, "uploads", "adashi_feeds")
80
81
82 try:
83 files_list = os.listdir(PATH)
84 except os.error:
85 message = "Upload path does not exist or can not be accessed"
86 log.write(repository_id = repository.id,
87 resource_name = "mixed",
88 transmission = log.IN,
89 mode = log.PUSH,
90 action = "read files from %s" % PATH,
91 remote = False,
92 result = log.FATAL,
93 message = message,
94 )
95 return message, None
96
97
98 files = [os.path.join(PATH, f)
99 for f in files_list if f[-4:] == ".xml"]
100 files = filter(os.path.isfile, files)
101 files.sort(key=os.path.getmtime)
102
103
104 from ..s3import import S3ImportItem
105 default_update_policy = S3ImportItem.POLICY.NEWER
106 default_conflict_policy = S3ImportItem.POLICY.MASTER
107 strategy = task.strategy
108 update_policy = task.update_policy or default_update_policy
109 conflict_policy = task.conflict_policy or default_conflict_policy
110 if update_policy not in ("THIS", "OTHER"):
111 last_sync = task.last_pull
112
113
114 for f in files:
115 current.log.debug("ADASHI Sync: importing %s" % f)
116 try:
117 with open(f, "r") as source:
118 result = self.receive([source],
119 None,
120 strategy=strategy,
121 update_policy=update_policy,
122 conflict_policy=conflict_policy,
123 onconflict=onconflict,
124 last_sync=last_sync,
125 mixed=True,
126 )
127 except IOError:
128 continue
129
130
131 log.write(repository_id = repository.id,
132 resource_name = "mixed",
133 transmission = log.IN,
134 mode = log.PUSH,
135 action = "import %s" % f,
136 remote = result["remote"],
137 result = result["status"],
138 message = result["message"],
139 )
140
141
142 try:
143 os.remove(f)
144 except os.error:
145 current.log.error("ADASHI Sync: can not delete %s" % f)
146
147 return None, current.request.utcnow
148
149
150 - def push(self, task):
151 """
152 Outgoing push
153
154 @param task: the sync_task Row
155 """
156
157 repository = self.repository
158
159
160 log = repository.log
161 log.write(repository_id = repository.id,
162 resource_name = task.resource_name,
163 transmission = log.OUT,
164 mode = log.PUSH,
165 action = None,
166 remote = False,
167 result = log.FATAL,
168 message = "Push to ADASHI currently not supported",
169 )
170
171 output = current.xml.json_message(False, 400, message)
172 return output, None
173
174
175 - def send(self,
176 resource,
177 start=None,
178 limit=None,
179 msince=None,
180 filters=None,
181 mixed=False,
182 pretty_print=False):
183 """
184 Respond to an incoming pull from a peer repository
185
186 @param resource: the resource to be synchronized
187 @param start: index of the first record to send
188 @param limit: maximum number of records to send
189 @param msince: minimum modification date/time for records to send
190 @param filters: URL filters for record extraction
191 @param mixed: negotiate resource with peer (disregard resource)
192 @param pretty_print: make the output human-readable
193 """
194
195 if not resource or mixed:
196 msg = "Mixed resource synchronization not supported"
197 return {"status": self.log.FATAL,
198 "message": msg,
199 "response": current.xml.json_message(False, 400, msg),
200 }
201
202
203 stylesheet = os.path.join(current.request.folder,
204 "static", "formats", "georss", "export.xsl")
205 output = resource.export_xml(start=start,
206 limit=limit,
207 filters=filters,
208 msince=msince,
209 stylesheet=stylesheet,
210 pretty_print=pretty_print,
211 )
212 count = resource.results
213 msg = "Data sent to peer (%s records)" % count
214
215
216 headers = current.response.headers
217 headers["Content-Type"] = "text/xml"
218
219 return {"status": self.log.SUCCESS,
220 "message": msg,
221 "response": output,
222 }
223
224
225 - def receive(self,
226 source,
227 resource,
228 strategy=None,
229 update_policy=None,
230 conflict_policy=None,
231 onconflict=None,
232 last_sync=None,
233 mixed=False):
234 """
235 Respond to an incoming push from the peer repository
236
237 @param source: the input stream (list of file-like objects)
238 @param resource: the target resource
239 @param strategy: the import strategy
240 @param update_policy: the update policy
241 @param conflict_policy: the conflict resolution policy
242 @param onconflict: callback for conflict resolution
243 @param last_sync: the last synchronization date/time for the peer
244 @param mixed: negotiate resource with peer (disregard resource)
245 """
246
247 s3db = current.s3db
248
249 xml = current.xml
250 log = self.log
251 remote = False
252
253
254 source = source[0]
255
256
257 tree = xml.parse(source)
258 if not tree:
259
260 msg = xml.error if xml.error else "Invalid source"
261 return {"status": log.FATAL,
262 "message": msg,
263 "remote": remote,
264 "response": xml.json_message(False, 400, msg),
265 }
266
267
268 category = tree.findall("//channel/category")
269 if not category:
270 msg = "Feed category missing"
271 return {"status": log.ERROR,
272 "message": msg,
273 "remote": remote,
274 "response": xml.json_message(False, 400, msg),
275 }
276 category = category[0].text
277
278
279 if category == "AVL":
280 resource = s3db.resource("pr_group")
281 elif category == "Incidents":
282 resource = s3db.resource("event_incident")
283 resource.configure(oncommit_import_item = self.update_assignments)
284 else:
285 msg = "Unknown feed category"
286 return {"status": log.WARNING,
287 "message": msg,
288 "remote": remote,
289 "response": xml.json_message(False, 400, msg),
290 }
291
292
293 repository = self.repository
294 if repository.keep_source:
295 self.keep_source(tree, category)
296
297
298 stylesheet = os.path.join(current.request.folder,
299 "static",
300 "formats",
301 "georss",
302 "import.xsl",
303 )
304
305
306 if onconflict:
307 onconflict_callback = lambda item: onconflict(item,
308 repository,
309 resource,
310 )
311 else:
312 onconflict_callback = None
313 ignore_errors = True
314
315
316
317 s3 = current.response.s3
318 s3.repository_id = self.repository.id
319 output = resource.import_xml(tree,
320 format = "xml",
321 stylesheet = stylesheet,
322 ignore_errors = ignore_errors,
323 strategy = strategy,
324 update_policy = update_policy,
325 conflict_policy = conflict_policy,
326 last_sync = last_sync,
327 onconflict = onconflict_callback,
328 source_type = "adashi",
329 )
330 s3.repository_id = None
331
332
333 if resource.error_tree is not None:
334
335 result = log.WARNING if ignore_errors else log.FATAL
336 message = "%s" % resource.error
337
338 for element in resource.error_tree.findall("resource"):
339 error_msg = element.get("error", "unknown error")
340 error_fields = element.findall("data[@error]")
341 if error_fields:
342 for field in error_fields:
343 error_msg = field.get("error", "unknown error")
344 if error_msg:
345 msg = "(UID: %s) %s.%s=%s: %s" % \
346 (element.get("uuid", None),
347 element.get("name", None),
348 field.get("field", None),
349 field.get("value", field.text),
350 error_msg)
351 message = "%s, %s" % (message, msg)
352 else:
353 msg = "(UID: %s) %s: %s" % \
354 (element.get("uuid", None),
355 element.get("name", None),
356 error_msg)
357 message = "%s, %s" % (message, msg)
358 else:
359 result = log.SUCCESS
360 message = "Data received from peer"
361
362 return {"status": result,
363 "remote": remote,
364 "message": message,
365 "response": output,
366 }
367
368
370 """
371 Deactivate all previous unit assignments (event_team) for
372 an incident which are not in this feed update.
373
374 @param item: the import item
375
376 @note: this assumes that the list of incident resources in
377 the feed update is complete (confirmed for ADASHI)
378 @note: must not deactivate assignments which are newer
379 than the feed update (Sync policy NEWER)
380 """
381
382 if item.tablename == "event_incident" and \
383 item.id and \
384 item.method == item.METHOD.UPDATE:
385
386 job = item.job
387 mtime = item.data.get("modified_on")
388
389 if not job or not mtime:
390 return
391 get_item = lambda item_id: job.items.get(item_id)
392
393
394 team_names = set()
395 add_name = team_names.add
396 for citem in item.components:
397 if citem.tablename == "event_team":
398 for ref in citem.references:
399 entry = ref.entry
400 team_item_id = entry.item_id
401 if entry.tablename == "pr_group" and team_item_id:
402 team_item = get_item(team_item_id)
403 team_name = team_item.data.get("name")
404 if team_name:
405 add_name(team_name)
406 break
407
408 s3db = current.s3db
409
410 ltable = s3db.event_team
411 gtable = s3db.pr_group
412
413
414
415
416 left = gtable.on(ltable.group_id == gtable.id)
417 query = (ltable.incident_id == item.id) & \
418 (ltable.modified_on <= mtime) & \
419 (ltable.status == 3) & \
420 (~(gtable.name.belongs(team_names)))
421 rows = current.db(query).select(ltable.id, left=left)
422
423 inactive = set(row.id for row in rows)
424 current.db(ltable.id.belongs(inactive)).update(status=4)
425
426
428 """
429 Helper method to store source data in file system
430
431 @param tree: the XML element tree of the source
432 @param category: the feed category
433 """
434
435 repository = self.repository
436
437
438 log = repository.log
439 log.write(repository_id = repository.id,
440 resource_name = None,
441 transmission = log.IN,
442 mode = log.PUSH,
443 action = "receive",
444 remote = False,
445 result = log.WARNING,
446 message = "'Keep Source Data' active for this repository!",
447 )
448
449 request = current.request
450 folder = os.path.join(request.folder, "uploads", "adashi")
451 dt = request.utcnow.replace(microsecond=0).isoformat()
452 dt = dt.replace(":", "").replace("-", "")
453 filename = os.path.join(folder,
454 "%s_%s.xml" % (category, dt),
455 )
456 if not os.path.exists(folder):
457 try:
458 os.mkdir(folder)
459 except OSError:
460 return
461 if filename:
462 try:
463 with open(filename, "w") as f:
464 tree.write(f, pretty_print=True)
465 except IOError:
466 return
467
468
469