Package s3 :: Package sync_adapter :: Module data
[frames] | [no frames]

Source Code for Module s3.sync_adapter.data

  1  # -*- coding: utf-8 -*- 
  2   
  3  """ S3 Synchronization: Peer Repository Adapter 
  4   
  5      @copyright: 2011-2019 (c) Sahana Software Foundation 
  6      @license: MIT 
  7   
  8      Permission is hereby granted, free of charge, to any person 
  9      obtaining a copy of this software and associated documentation 
 10      files (the "Software"), to deal in the Software without 
 11      restriction, including without limitation the rights to use, 
 12      copy, modify, merge, publish, distribute, sublicense, and/or sell 
 13      copies of the Software, and to permit persons to whom the 
 14      Software is furnished to do so, subject to the following 
 15      conditions: 
 16   
 17      The above copyright notice and this permission notice shall be 
 18      included in all copies or substantial portions of the Software. 
 19   
 20      THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
 21      EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 
 22      OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
 23      NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 24      HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
 25      WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
 26      FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 
 27      OTHER DEALINGS IN THE SOFTWARE. 
 28  """ 
 29   
 30  import datetime 
 31  import json 
 32  import sys 
 33  import urllib2 
 34   
 35  try: 
 36      from lxml import etree 
 37  except ImportError: 
 38      sys.stderr.write("ERROR: lxml module needed for XML handling\n") 
 39      raise 
 40   
 41  from gluon import current 
 42   
 43  from ..s3datetime import s3_encode_iso_datetime 
 44  from ..s3validators import JSONERRORS 
 45  from eden import S3SyncAdapter as S3SyncEdenAdapter 
# =============================================================================
class S3SyncAdapter(S3SyncEdenAdapter):
    """
        Sahana Eden Data Repository Synchronization Adapter

        Pulls the sync task (and resource filter) configuration for the
        locally configured data sets from a peer Sahana Eden repository.
    """

    @staticmethod
    def register():
        """
            Register this site at the peer repository

            @return: True|False to indicate success|failure,
                     or None if registration is not required
        """

        # No registration required
        return None

    # -------------------------------------------------------------------------
    def refresh(self):
        """
            Refresh sync tasks from peer

            @return: True|False to indicate success|failure,
                     or None if registration is not required
        """

        db = current.db
        s3db = current.s3db

        repository = self.repository
        log = repository.log

        base_url = repository.url
        if not base_url:
            # No repository URL => refresh not possible
            return None

        debug = current.log.debug
        debug("Refreshing sync tasks from %s" % base_url)

        # Look up all data sets currently configured for this repository
        repository_id = repository.id
        table = s3db.sync_dataset
        query = (table.repository_id == repository_id) & \
                (table.deleted == False)
        datasets = db(query).select(table.code)

        if not datasets:
            # No data sets configured => no tasks to refresh
            return None
        else:
            # Collect the data set codes
            codes = set(dataset.code for dataset in datasets)

        # Determine the necessary updates
        from urllib import quote
        current_tasks = "%s/sync/task.xml?~.dataset_id$code=%s" % \
                        (base_url, quote(",".join(codes)))
        updates = [{"tablename": "sync_task",
                    "url": current_tasks,
                    "strategy": ("create", "update"),
                    },
                   ]

        # Look up last refresh date
        last_refresh = repository.last_refresh
        if last_refresh:
            # Use msince
            msince = s3_encode_iso_datetime(last_refresh)

            # Also fetch deleted tasks/filters
            deleted = "?include_deleted=True&~.deleted=True"
            deleted_tasks = "%s/sync/task.xml%s" % (base_url, deleted)
            deleted_filters = "%s/sync/resource_filter.xml%s" % (base_url, deleted)
            updates.extend([{"tablename": "sync_task",
                             "url": deleted_tasks,
                             "strategy": ("delete",),
                             },
                            {"tablename": "sync_resource_filter",
                             "url": deleted_filters,
                             "strategy": ("delete",),
                             },
                            ])
        else:
            # No msince, just get all current tasks for the configured data sets
            msince = None

        # Initialize log details
        success = False
        count = 0
        timestamp = None

        # Enable UUID synchronization
        s3 = current.response.s3
        synchronise_uuids = s3.synchronise_uuids
        s3.synchronise_uuids = True

        # NOTE(fix): restore the synchronise_uuids flag in a finally-branch,
        # so it cannot leak (stay True) when _fetch/_import raise
        try:
            # Fetch the sync tasks for the configured data sets
            for update in updates:

                error = self._fetch(update, msince)

                if error:
                    # Log the error, skip the update
                    log.write(repository_id = repository_id,
                              transmission = log.OUT,
                              mode = log.PULL,
                              action = "refresh",
                              remote = update.get("remote", False),
                              result = update.get("result", log.ERROR),
                              message = error,
                              )
                    continue

                if update.get("response"):
                    error = self._import(update)
                    if error:
                        # Log the error
                        log.write(repository_id = repository_id,
                                  transmission = log.OUT,
                                  mode = log.PULL,
                                  action = "refresh",
                                  remote = update.get("remote", False),
                                  result = update.get("result", log.ERROR),
                                  message = error,
                                  )
                    else:
                        # Success - collect count/mtime
                        success = True
                        if update["tablename"] == "sync_task":
                            count += update.get("count", 0)
                            mtime = update.get("mtime")
                            if mtime and (not timestamp or timestamp < mtime):
                                timestamp = mtime
                else:
                    # Log no data received
                    log.write(repository_id = repository_id,
                              transmission = log.OUT,
                              mode = log.PULL,
                              action = "refresh",
                              remote = False,
                              result = log.WARNING,
                              message = "No data received",
                              )

            if success:
                if count:
                    message = "Successfully updated %s tasks" % count
                else:
                    message = "No new/updated tasks found"

                log.write(repository_id = repository_id,
                          transmission = log.OUT,
                          mode = log.PULL,
                          action = "refresh",
                          remote = False,
                          result = log.SUCCESS,
                          message = message,
                          )

            if timestamp:
                # Update last_refresh (+1 second so the last-imported
                # record is not fetched again on the next refresh)
                delta = datetime.timedelta(seconds=1)
                rtable = s3db.sync_repository
                query = (rtable.id == repository_id)
                db(query).update(last_refresh=timestamp+delta)
        finally:
            # Always restore the previous UUID synchronization setting
            s3.synchronise_uuids = synchronise_uuids

        return True

    # -------------------------------------------------------------------------
    def _fetch(self, update, msince):
        """
            Fetch sync task updates from the repository

            @param update: the update dict containing:
                           {"url": the url to fetch,
                           }
            @param msince: ISO-formatted datetime of the last refresh,
                           or None to fetch without msince filter
            @return: error message if there was an error, otherwise None

            Side effects on the update dict: sets "result" (log status),
            "remote" (True for peer-side errors) and, on success,
            "response" (the open response handle for _import).
        """

        log = self.repository.log
        error = None

        # Get the URL, add msince if available
        url = update["url"]
        if msince:
            url = "%s&msince=%s" % (url, msince)

        # Fetch the data
        opener = self._http_opener(url)
        try:
            f = opener.open(url)
        except urllib2.HTTPError as e:
            # HTTP status (remote error)
            result = log.ERROR
            update["remote"] = True

            # Get the error message
            message = e.read()
            try:
                # Sahana-Eden would send a JSON message,
                # try to extract the actual error message:
                message_json = json.loads(message)
            except JSONERRORS:
                pass
            else:
                message = message_json.get("message", message)

            # Strip XML markup from the message
            message = "<message>%s</message>" % message
            try:
                markup = etree.XML(message)
                message = markup.xpath(".//text()")
                if message:
                    message = " ".join(message)
                else:
                    message = ""
            except etree.XMLSyntaxError:
                pass

            # Prepend HTTP status code
            error = "[%s] %s" % (e.code, message)

        except urllib2.URLError as e:
            # URL Error (network error)
            result = log.ERROR
            update["remote"] = True
            error = "Peer repository unavailable (%s)" % e.reason

        except Exception:
            # Other error (local error)
            # NOTE(fix): narrowed from a bare except so that SystemExit
            # and KeyboardInterrupt are not swallowed; error stringified
            # for consistency with _import
            result = log.FATAL
            error = "%s" % sys.exc_info()[1]

        else:
            # Success
            result = log.SUCCESS
            update["response"] = f

        update["result"] = result
        return error

    # -------------------------------------------------------------------------
    def _import(self, update):
        """
            Import sync task updates

            @param update: the update dict containing:
                           {"response": the response from _fetch,
                            "strategy": the import strategy,
                           }
            @return: error message if there was an error, otherwise None

            Side effects on the update dict: sets "result" (log status)
            and, on success, "count" (imported records) and "mtime"
            (latest modification date of the imported records).
        """

        repository = self.repository
        log = repository.log

        resource = current.s3db.resource(update["tablename"])

        # Set default repository for newly imported tasks
        strategy = update.get("strategy")
        if "create" in strategy:
            table = resource.table
            table.repository_id.default = repository.id

        error = None
        result = None

        response = update["response"]
        try:
            resource.import_xml(response,
                                ignore_errors = True,
                                strategy = strategy,
                                )
        except IOError:
            result = log.FATAL
            error = "%s" % sys.exc_info()[1]

        except Exception:
            # NOTE(fix): narrowed from a bare except so that SystemExit
            # and KeyboardInterrupt are not swallowed
            result = log.FATAL
            import traceback
            error = "Uncaught Exception During Import: %s" % \
                    traceback.format_exc()

        else:
            if resource.error:
                result = log.ERROR
                error = resource.error
            else:
                result = log.SUCCESS
                update["count"] = resource.import_count
                update["mtime"] = resource.mtime

        update["result"] = result

        return error
346 347 # End ========================================================================= 348