Package s3 :: Package sync_adapter :: Module adashi
[frames] | no frames

Source Code for Module s3.sync_adapter.adashi

  1  # -*- coding: utf-8 -*- 
  2   
  3  """ S3 Synchronization: Peer Repository Adapter for ADASHI 
  4   
  5      @copyright: 2011-2019 (c) Sahana Software Foundation 
  6      @license: MIT 
  7   
  8      Permission is hereby granted, free of charge, to any person 
  9      obtaining a copy of this software and associated documentation 
 10      files (the "Software"), to deal in the Software without 
 11      restriction, including without limitation the rights to use, 
 12      copy, modify, merge, publish, distribute, sublicense, and/or sell 
 13      copies of the Software, and to permit persons to whom the 
 14      Software is furnished to do so, subject to the following 
 15      conditions: 
 16   
 17      The above copyright notice and this permission notice shall be 
 18      included in all copies or substantial portions of the Software. 
 19   
 20      THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
 21      EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 
 22      OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
 23      NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 24      HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
 25      WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
 26      FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 
 27      OTHER DEALINGS IN THE SOFTWARE. 
 28  """ 
 29   
 30  import os 
 31  import sys 
 32   
 33  from gluon import * 
 34   
 35  from ..s3sync import S3SyncBaseAdapter 
 36   
 37  # ============================================================================= 
class S3SyncAdapter(S3SyncBaseAdapter):
    """
    ADASHI Synchronization Adapter (passive)

    This adapter does not actively connect to the peer: data arrive
    as feed files (pull) or incoming pushes, hence register/login
    are no-ops and push is unsupported.

    http://www.adashisystems.com
    """

    # -------------------------------------------------------------------------
46 - def register(self):
47 """ 48 Register this site at the peer repository 49 50 @return: True to indicate success, otherwise False 51 """ 52 53 # No registration required (passive adapter) 54 return True
55 56 # -------------------------------------------------------------------------
57 - def login(self):
58 """ 59 Login at the peer repository 60 61 @return: None if successful, otherwise the error 62 """ 63 64 # No login required (passive adapter) 65 return None
66 67 # -------------------------------------------------------------------------
68 - def pull(self, task, onconflict=None):
69 """ 70 Outgoing pull 71 72 @param task: the task (sync_task Row) 73 """ 74 75 repository = self.repository 76 log = repository.log 77 78 # Import path 79 PATH = os.path.join(current.request.folder, "uploads", "adashi_feeds") 80 81 # Read names from path 82 try: 83 files_list = os.listdir(PATH) 84 except os.error: 85 message = "Upload path does not exist or can not be accessed" 86 log.write(repository_id = repository.id, 87 resource_name = "mixed", 88 transmission = log.IN, 89 mode = log.PUSH, 90 action = "read files from %s" % PATH, 91 remote = False, 92 result = log.FATAL, 93 message = message, 94 ) 95 return message, None 96 97 # Add path to file names, filter for .xml files, sort by mtime 98 files = [os.path.join(PATH, f) 99 for f in files_list if f[-4:] == ".xml"] 100 files = filter(os.path.isfile, files) 101 files.sort(key=os.path.getmtime) 102 103 # Strategy and Policies 104 from ..s3import import S3ImportItem 105 default_update_policy = S3ImportItem.POLICY.NEWER 106 default_conflict_policy = S3ImportItem.POLICY.MASTER 107 strategy = task.strategy 108 update_policy = task.update_policy or default_update_policy 109 conflict_policy = task.conflict_policy or default_conflict_policy 110 if update_policy not in ("THIS", "OTHER"): 111 last_sync = task.last_pull 112 113 # Import files 114 for f in files: 115 current.log.debug("ADASHI Sync: importing %s" % f) 116 try: 117 with open(f, "r") as source: 118 result = self.receive([source], 119 None, 120 strategy=strategy, 121 update_policy=update_policy, 122 conflict_policy=conflict_policy, 123 onconflict=onconflict, 124 last_sync=last_sync, 125 mixed=True, 126 ) 127 except IOError: 128 continue 129 130 # Log the operation 131 log.write(repository_id = repository.id, 132 resource_name = "mixed", 133 transmission = log.IN, 134 mode = log.PUSH, 135 action = "import %s" % f, 136 remote = result["remote"], 137 result = result["status"], 138 message = result["message"], 139 ) 140 141 # Remove the file 142 try: 143 
os.remove(f) 144 except os.error: 145 current.log.error("ADASHI Sync: can not delete %s" % f) 146 147 return None, current.request.utcnow
148 149 # -------------------------------------------------------------------------
150 - def push(self, task):
151 """ 152 Outgoing push 153 154 @param task: the sync_task Row 155 """ 156 157 repository = self.repository 158 159 # Log the operation 160 log = repository.log 161 log.write(repository_id = repository.id, 162 resource_name = task.resource_name, 163 transmission = log.OUT, 164 mode = log.PUSH, 165 action = None, 166 remote = False, 167 result = log.FATAL, 168 message = "Push to ADASHI currently not supported", 169 ) 170 171 output = current.xml.json_message(False, 400, message) 172 return output, None
173 174 # -------------------------------------------------------------------------
175 - def send(self, 176 resource, 177 start=None, 178 limit=None, 179 msince=None, 180 filters=None, 181 mixed=False, 182 pretty_print=False):
183 """ 184 Respond to an incoming pull from a peer repository 185 186 @param resource: the resource to be synchronized 187 @param start: index of the first record to send 188 @param limit: maximum number of records to send 189 @param msince: minimum modification date/time for records to send 190 @param filters: URL filters for record extraction 191 @param mixed: negotiate resource with peer (disregard resource) 192 @param pretty_print: make the output human-readable 193 """ 194 195 if not resource or mixed: 196 msg = "Mixed resource synchronization not supported" 197 return {"status": self.log.FATAL, 198 "message": msg, 199 "response": current.xml.json_message(False, 400, msg), 200 } 201 202 # Export the data as S3XML 203 stylesheet = os.path.join(current.request.folder, 204 "static", "formats", "georss", "export.xsl") 205 output = resource.export_xml(start=start, 206 limit=limit, 207 filters=filters, 208 msince=msince, 209 stylesheet=stylesheet, 210 pretty_print=pretty_print, 211 ) 212 count = resource.results 213 msg = "Data sent to peer (%s records)" % count 214 215 # Set content type header 216 headers = current.response.headers 217 headers["Content-Type"] = "text/xml" 218 219 return {"status": self.log.SUCCESS, 220 "message": msg, 221 "response": output, 222 }
223 224 # -------------------------------------------------------------------------
    def receive(self,
                source,
                resource,
                strategy=None,
                update_policy=None,
                conflict_policy=None,
                onconflict=None,
                last_sync=None,
                mixed=False):
        """
        Respond to an incoming push from the peer repository:
        parse the feed, determine the target resource from the feed
        category, then import the data

        @param source: the input stream (list of file-like objects)
        @param resource: the target resource (ignored here, always
                         determined from the feed category)
        @param strategy: the import strategy
        @param update_policy: the update policy
        @param conflict_policy: the conflict resolution policy
        @param onconflict: callback for conflict resolution
        @param last_sync: the last synchronization date/time for the peer
        @param mixed: negotiate resource with peer (disregard resource)

        @return: dict {status, remote, message, response}
        """

        s3db = current.s3db

        xml = current.xml
        log = self.log
        # remote=False: errors reported here originate locally
        remote = False

        # Sync always has only one source per request
        source = source[0]

        # Parse the feed
        tree = xml.parse(source)
        if not tree:
            # Parser error
            msg = xml.error if xml.error else "Invalid source"
            return {"status": log.FATAL,
                    "message": msg,
                    "remote": remote,
                    "response": xml.json_message(False, 400, msg),
                    }

        # Identify feed category (first //channel/category element)
        category = tree.findall("//channel/category")
        if not category:
            msg = "Feed category missing"
            return {"status": log.ERROR,
                    "message": msg,
                    "remote": remote,
                    "response": xml.json_message(False, 400, msg),
                    }
        category = category[0].text

        # Instantiate target resource after feed category:
        # AVL feeds update teams, Incidents feeds update incidents
        if category == "AVL":
            resource = s3db.resource("pr_group")
        elif category == "Incidents":
            resource = s3db.resource("event_incident")
            # Deactivate stale unit assignments after each commit
            resource.configure(oncommit_import_item = self.update_assignments)
        else:
            msg = "Unknown feed category"
            return {"status": log.WARNING,
                    "message": msg,
                    "remote": remote,
                    "response": xml.json_message(False, 400, msg),
                    }

        # Store source data in the file system, if configured
        repository = self.repository
        if repository.keep_source:
            self.keep_source(tree, category)

        # Import transformation stylesheet (GeoRSS => S3XML)
        stylesheet = os.path.join(current.request.folder,
                                  "static",
                                  "formats",
                                  "georss",
                                  "import.xsl",
                                  )

        # Import parameters
        if onconflict:
            # Bind repository/resource into the conflict callback
            onconflict_callback = lambda item: onconflict(item,
                                                          repository,
                                                          resource,
                                                          )
        else:
            onconflict_callback = None
        ignore_errors = True

        # Import
        # Flag to let audit know the repository
        s3 = current.response.s3
        s3.repository_id = self.repository.id
        output = resource.import_xml(tree,
                                     format = "xml",
                                     stylesheet = stylesheet,
                                     ignore_errors = ignore_errors,
                                     strategy = strategy,
                                     update_policy = update_policy,
                                     conflict_policy = conflict_policy,
                                     last_sync = last_sync,
                                     onconflict = onconflict_callback,
                                     source_type = "adashi",
                                     )
        s3.repository_id = None

        # Process validation errors, if any: collect all per-record
        # and per-field error messages into a single log message
        if resource.error_tree is not None:

            # With ignore_errors, failed records are skipped (WARNING),
            # otherwise the whole import is rolled back (FATAL)
            result = log.WARNING if ignore_errors else log.FATAL
            message = "%s" % resource.error

            for element in resource.error_tree.findall("resource"):
                error_msg = element.get("error", "unknown error")
                error_fields = element.findall("data[@error]")
                if error_fields:
                    # Field-level errors: report each offending field
                    for field in error_fields:
                        error_msg = field.get("error", "unknown error")
                        if error_msg:
                            msg = "(UID: %s) %s.%s=%s: %s" % \
                                  (element.get("uuid", None),
                                   element.get("name", None),
                                   field.get("field", None),
                                   field.get("value", field.text),
                                   error_msg)
                            message = "%s, %s" % (message, msg)
                else:
                    # Record-level error only
                    msg = "(UID: %s) %s: %s" % \
                          (element.get("uuid", None),
                           element.get("name", None),
                           error_msg)
                    message = "%s, %s" % (message, msg)
        else:
            result = log.SUCCESS
            message = "Data received from peer"

        return {"status": result,
                "remote": remote,
                "message": message,
                "response": output,
                }
367 368 # -------------------------------------------------------------------------
369 - def update_assignments(self, item):
370 """ 371 Deactivate all previous unit assignments (event_team) for 372 an incident which are not in this feed update. 373 374 @param item: the import item 375 376 @note: this assumes that the list of incident resources in 377 the feed update is complete (confirmed for ADASHI) 378 @note: must not deactivate assignments which are newer 379 than the feed update (Sync policy NEWER) 380 """ 381 382 if item.tablename == "event_incident" and \ 383 item.id and \ 384 item.method == item.METHOD.UPDATE: 385 386 job = item.job 387 mtime = item.data.get("modified_on") 388 389 if not job or not mtime: 390 return 391 get_item = lambda item_id: job.items.get(item_id) 392 393 # Get the unit names of all current assignments in the feed 394 team_names = set() 395 add_name = team_names.add 396 for citem in item.components: 397 if citem.tablename == "event_team": 398 for ref in citem.references: 399 entry = ref.entry 400 team_item_id = entry.item_id 401 if entry.tablename == "pr_group" and team_item_id: 402 team_item = get_item(team_item_id) 403 team_name = team_item.data.get("name") 404 if team_name: 405 add_name(team_name) 406 break 407 408 s3db = current.s3db 409 410 ltable = s3db.event_team 411 gtable = s3db.pr_group 412 413 # Get all active assignments in the database which are older 414 # than the feed update and which are not in the feed update, 415 # and deactivate them 416 left = gtable.on(ltable.group_id == gtable.id) 417 query = (ltable.incident_id == item.id) & \ 418 (ltable.modified_on <= mtime) & \ 419 (ltable.status == 3) & \ 420 (~(gtable.name.belongs(team_names))) 421 rows = current.db(query).select(ltable.id, left=left) 422 423 inactive = set(row.id for row in rows) 424 current.db(ltable.id.belongs(inactive)).update(status=4)
425 426 # -------------------------------------------------------------------------
427 - def keep_source(self, tree, category):
428 """ 429 Helper method to store source data in file system 430 431 @param tree: the XML element tree of the source 432 @param category: the feed category 433 """ 434 435 repository = self.repository 436 437 # Log the operation 438 log = repository.log 439 log.write(repository_id = repository.id, 440 resource_name = None, 441 transmission = log.IN, 442 mode = log.PUSH, 443 action = "receive", 444 remote = False, 445 result = log.WARNING, 446 message = "'Keep Source Data' active for this repository!", 447 ) 448 449 request = current.request 450 folder = os.path.join(request.folder, "uploads", "adashi") 451 dt = request.utcnow.replace(microsecond=0).isoformat() 452 dt = dt.replace(":", "").replace("-", "") 453 filename = os.path.join(folder, 454 "%s_%s.xml" % (category, dt), 455 ) 456 if not os.path.exists(folder): 457 try: 458 os.mkdir(folder) 459 except OSError: 460 return 461 if filename: 462 try: 463 with open(filename, "w") as f: 464 tree.write(f, pretty_print=True) 465 except IOError: 466 return
467 468 # End ========================================================================= 469