Package s3 :: Package sync_adapter :: Module filesync
[frames] | [no frames]

Source Code for Module s3.sync_adapter.filesync

  1  # -*- coding: utf-8 -*- 
  2   
  3  """ S3 Synchronization: Peer Repository Adapter 
  4   
  5      @copyright: 2011-2019 (c) Sahana Software Foundation 
  6      @license: MIT 
  7   
  8      Permission is hereby granted, free of charge, to any person 
  9      obtaining a copy of this software and associated documentation 
 10      files (the "Software"), to deal in the Software without 
 11      restriction, including without limitation the rights to use, 
 12      copy, modify, merge, publish, distribute, sublicense, and/or sell 
 13      copies of the Software, and to permit persons to whom the 
 14      Software is furnished to do so, subject to the following 
 15      conditions: 
 16   
 17      The above copyright notice and this permission notice shall be 
 18      included in all copies or substantial portions of the Software. 
 19   
 20      THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 
 21      EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 
 22      OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 
 23      NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 
 24      HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
 25      WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
 26      FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 
 27      OTHER DEALINGS IN THE SOFTWARE. 
 28  """ 
 29   
 30  import datetime 
 31  import glob 
 32  import os 
 33  import sys 
 34   
 35  try: 
 36      from lxml import etree 
 37  except ImportError: 
 38      sys.stderr.write("ERROR: lxml module needed for XML handling\n") 
 39      raise 
 40   
 41  from gluon import * 
 42   
 43  from ..s3sync import S3SyncBaseAdapter 
 44   
 45  # ============================================================================= 
class S3SyncAdapter(S3SyncBaseAdapter):
    """
        File Synchronization Adapter

        The "peer repository" is a directory in the file system
        (repository.path, relative paths are resolved against the
        application folder):
            - pull imports all matching input files from that directory
            - push exports the data into a new output file there
        Reading/writing the actual data format is delegated to a
        back-end adapter (repository.backend, default "eden").
    """

    # -------------------------------------------------------------------------
    def register(self):
        """
            Register this site at the peer repository

            @return: True to indicate success, otherwise False
        """

        # No registration needed
        return True

    # -------------------------------------------------------------------------
    def login(self):
        """
            Login at the peer repository

            @return: None if successful, otherwise the error
        """

        # No explicit login required
        return None

    # -------------------------------------------------------------------------
    def pull(self, task, onconflict=None):
        """
            Fetch updates from the peer repository and import them
            into the local database (active pull)

            @param task: the synchronization task (sync_task Row)
            @param onconflict: callback for automatic conflict resolution

            @return: tuple (error, mtime), with error=None if successful,
                     else error=message, and mtime=modification timestamp
                     of the youngest record sent
        """

        repository = self.repository
        log = repository.log

        error = None
        result = None
        msg = None
        resource = None
        adapter = None
        input_files = []

        # Instantiate the target resource (for "mixed" tasks, no target
        # resource is instantiated - the back-end receives mixed=True)
        tablename = task.resource_name
        mixed = tablename == "mixed"
        if not mixed:
            try:
                resource = current.s3db.resource(tablename)
            except SyntaxError as e:
                result = log.FATAL
                error = msg = str(e)

        # Get input files
        if not result:
            input_files = self._input_files(task)
            if not input_files:
                result = log.SUCCESS
                msg = "No files to import"

        # Instantiate the back-end adapter
        if not result:
            adapter, error = self._backend_adapter()
            if error:
                result = log.FATAL
                msg = error

        # If any of the previous actions has produced a non-default result:
        if result:
            # Log the operation and return
            log.write(repository_id = repository.id,
                      resource_name = tablename,
                      transmission = log.OUT,
                      mode = log.PULL,
                      action = None,
                      remote = False,
                      result = result,
                      message = msg,
                      )
            return (error, None)

        # Set strategy and policies
        from ..s3import import S3ImportItem
        strategy = task.strategy
        conflict_policy = task.conflict_policy
        if not conflict_policy:
            conflict_policy = S3ImportItem.POLICY.MASTER
        update_policy = task.update_policy
        if not update_policy:
            update_policy = S3ImportItem.POLICY.NEWER
        # With the THIS/OTHER update policies, no last_sync date is passed
        if update_policy not in ("THIS", "OTHER"):
            last_sync = task.last_pull
        else:
            last_sync = None

        # Import the files (oldest first, see _input_files)
        error = None
        mtime = None

        for f in input_files:
            current.log.debug("FileSync: importing %s" % f)
            try:
                with open(f, "r") as source:
                    result = adapter.receive([source],
                                             resource,
                                             strategy = strategy,
                                             update_policy = update_policy,
                                             conflict_policy = conflict_policy,
                                             onconflict = onconflict,
                                             last_sync = last_sync,
                                             mixed = mixed,
                                             )
            except IOError as e:
                # Skip unreadable files, continue with the next one
                current.log.warning(str(e))
                continue

            status = result["status"]

            # Log the operation
            log.write(repository_id = repository.id,
                      resource_name = tablename,
                      transmission = log.OUT,
                      mode = log.PULL,
                      action = "import %s" % f,
                      remote = result["remote"],
                      result = status,
                      message = result["message"],
                      )

            if status in (log.ERROR, log.FATAL):
                error = "Error while importing %s" % f
                current.log.error(error)
                mtime = None

            else:
                if resource:
                    mtime = resource.mtime
                else:
                    mtime = current.request.utcnow
                # Input files are only deleted after a successful import
                if task.delete_input_files:
                    try:
                        os.remove(f)
                    except os.error:
                        current.log.warning("FileSync: can not delete %s" % f)
                    else:
                        current.log.debug("FileSync: %s deleted" % f)

        return error, mtime

    # -------------------------------------------------------------------------
    def push(self, task):
        """
            Extract new updates from the local database and send
            them to the peer repository (active push)

            @param task: the synchronization task (sync_task Row)

            @return: tuple (error, mtime), with error=None if successful,
                     else error=message, and mtime=modification timestamp
                     of the youngest record sent
        """

        repository = self.repository
        log = repository.log

        error = None
        result = None
        msg = None
        resource = None
        adapter = None
        outfile = None

        # Instantiate the target resource (none for "mixed" tasks)
        tablename = task.resource_name
        mixed = tablename == "mixed"
        if not mixed:
            try:
                resource = current.s3db.resource(tablename,
                                                 include_deleted=True,
                                                 )
            except SyntaxError as e:
                result = log.FATAL
                error = msg = str(e)

        # Get output file name
        if not result:
            outfile = self._output_file(task)
            if not outfile:
                result = log.ERROR
                if repository.path:
                    error = msg = "No pattern configured for output file name"
                else:
                    error = msg = "No file path configured for repository"

        # Instantiate the back-end adapter
        if not result:
            adapter, error = self._backend_adapter()
            if error:
                result = log.FATAL
                msg = error

        # If any of the previous actions has produced a non-default result:
        if result:
            # Log the operation and return
            log.write(repository_id = repository.id,
                      resource_name = tablename,
                      transmission = log.OUT,
                      mode = log.PUSH,
                      action = None,
                      remote = False,
                      result = result,
                      message = msg,
                      )
            return (error, None)

        # Update policy and msince
        from ..s3import import S3ImportItem
        update_policy = task.update_policy
        if not update_policy:
            update_policy = S3ImportItem.POLICY.NEWER
        if update_policy not in ("THIS", "OTHER"):
            msince = task.last_push
        else:
            msince = None

        # Sync filters for this task
        filters = current.sync.get_filters(task.id)

        # Export the data through the back-end adapter (send)
        error = None
        mtime = None

        action = "data export"
        output = adapter.send(resource,
                              msince = msince,
                              filters = filters,
                              mixed = mixed,
                              pretty_print = task.human_readable,
                              )

        status = output.get("status")
        if status in (log.ERROR, log.FATAL):
            result = status
            msg = output.get("message")
            if not msg:
                msg = "Error while exporting data"
            error = msg
        else:
            response = output.get("response")

            # Make sure the target directory exists. Use the directory of
            # the resolved output file, because repository.path may be
            # relative to the application folder (see _repository_path)
            target_dir = os.path.dirname(outfile)
            if target_dir and not os.path.isdir(target_dir):
                try:
                    os.makedirs(target_dir)
                except OSError as e:
                    # Tolerate concurrent creation of the same directory
                    if not os.path.isdir(target_dir):
                        result = log.FATAL
                        error = msg = str(e)

            if not error:
                action = "open %s" % outfile
                try:
                    with open(outfile, "w") as target:
                        target.write(response)
                except IOError as e:
                    result = log.FATAL
                    error = msg = str(e)
                else:
                    result = log.SUCCESS
                    msg = "Data successfully written to %s" % outfile
                    if resource:
                        msg = "%s (%s records)" % (msg, resource.results)
                        mtime = resource.muntil
                    else:
                        mtime = current.request.utcnow

        # Log the operation
        log.write(repository_id = repository.id,
                  resource_name = task.resource_name,
                  transmission = log.OUT,
                  mode = log.PUSH,
                  action = action,
                  remote = False,
                  result = result,
                  message = msg,
                  )

        return (error, mtime)

    # -------------------------------------------------------------------------
    def send(self,
             resource,
             start=None,
             limit=None,
             msince=None,
             filters=None,
             mixed=False,
             pretty_print=False):
        """
            Respond to an incoming pull from the peer repository
            (not supported by this adapter - always fails)

            @param resource: the resource to be synchronized
            @param start: index of the first record to send
            @param limit: maximum number of records to send
            @param msince: minimum modification date/time for records to send
            @param filters: URL filters for record extraction
            @param mixed: negotiate resource with peer (disregard resource)
            @param pretty_print: make the output human-readable

            @return: a dict {status, remote, message, response}, with:
                        - status....the outcome of the operation
                        - remote....whether the error was remote (or local)
                        - message...the log message
                        - response..the response to send to the peer
        """

        msg = "Send not supported for this repository type"

        return {"status": self.log.FATAL,
                "remote": False,
                "message": msg,
                "response": None,
                }

    # -------------------------------------------------------------------------
    def receive(self,
                source,
                resource,
                strategy=None,
                update_policy=None,
                conflict_policy=None,
                onconflict=None,
                last_sync=None,
                mixed=False):
        """
            Respond to an incoming push from the peer repository
            (not supported by this adapter - always fails)

            @param source: the input stream (list of file-like objects)
            @param resource: the target resource
            @param strategy: the import strategy
            @param update_policy: the update policy
            @param conflict_policy: the conflict resolution policy
            @param onconflict: callback for conflict resolution
            @param last_sync: the last synchronization date/time for the peer
            @param mixed: negotiate resource with peer (disregard resource)

            @return: a dict {status, remote, message, response}, with:
                        - status....the outcome of the operation
                        - remote....whether the error was remote (or local)
                        - message...the log message
                        - response..the response to send to the peer
        """

        msg = "Receive not supported for this repository type"

        return {"status": self.log.FATAL,
                "remote": False,
                "message": msg,
                "response": None,
                }

    # -------------------------------------------------------------------------
    def _backend_adapter(self):
        """
            Helper to instantiate the back-end adapter which handles the
            data format of the files (s3.sync_adapter.<repository.backend>,
            default "eden")

            @return: tuple (adapter, error), with error=None if successful,
                     else adapter=None and error=message
        """

        repository = self.repository

        backend = "s3.sync_adapter.%s" % (repository.backend or "eden")
        name = "S3SyncAdapter"
        try:
            api = getattr(__import__(backend, fromlist=[name]), name)
        except (ImportError, AttributeError):
            # AttributeError: module exists but has no S3SyncAdapter class
            return None, "Unsupported back-end: %s" % backend

        return api(repository), None

    # -------------------------------------------------------------------------
    def _repository_path(self):
        """
            Helper to resolve the repository path; relative paths are
            interpreted relative to the application folder

            @return: the resolved path, or None if no path is configured
        """

        path = self.repository.path
        if not path:
            # Check before resolving: os.path.isabs(None) would raise,
            # and joining "" would silently yield the application folder
            return None
        if not os.path.isabs(path):
            path = os.path.join(current.request.folder, path)
        return path

    # -------------------------------------------------------------------------
    def _input_files(self, task):
        """
            Helper function to get all relevant input files from the
            repository path, excluding files which have not been modified
            since the last pull of the task

            @param task: the synchronization task
            @return: a list of file paths, ordered by their time
                     stamp (oldest first); empty if either the
                     repository path or the input file pattern
                     is missing
        """

        path = self._repository_path()
        pattern = task.infile_pattern
        if not path or not pattern:
            return []

        msince = task.last_pull

        infiles = []
        for f in glob.glob(os.path.join(path, pattern)):
            if not os.path.isfile(f):
                continue
            mtime = datetime.datetime.utcfromtimestamp(os.path.getmtime(f))
            # Disregard files which have not been modified since the last pull
            if msince and mtime <= msince:
                continue
            infiles.append((mtime, f))

        # Sort by mtime (stable, so equal time stamps keep glob order)
        infiles.sort(key=lambda item: item[0])

        return [f for _, f in infiles]

    # -------------------------------------------------------------------------
    def _output_file(self, task):
        """
            Helper function to construct the output file name from
            the repository path and the output file name pattern

            The pattern can contain the placeholders $year, $month, $day,
            $hour, $minute, $second and $timestamp, which are filled in
            from the current request time (UTC).

            @param task: the synchronization task
            @return: the output file name, or None if either
                     path or pattern are missing
        """

        path = self._repository_path()
        pattern = task.outfile_pattern
        if not path or not pattern:
            return None

        # Substitute placeholders in pattern; literal "%" characters are
        # escaped first so that the %-formatting below leaves them intact
        from string import Template
        template = Template(pattern.replace("%", "%%")).safe_substitute(
                        year="%(y)04d",
                        month="%(m)02d",
                        day="%(d)02d",
                        hour="%(H)02d",
                        minute="%(M)02d",
                        second="%(S)02d",
                        timestamp="%(y)04d%(m)02d%(d)02d%(H)02d%(M)02d%(S)02d"
                        )

        # Fill in the template
        now = current.request.utcnow
        filename = template % {"y": now.year,
                               "m": now.month,
                               "d": now.day,
                               "H": now.hour,
                               "M": now.minute,
                               "S": now.second,
                               }

        # Prepend path
        return os.path.join(path, filename)
517 518 # End ========================================================================= 519