1
2
3 """ S3 Synchronization: Peer Repository Adapter
4
5 @copyright: 2011-2019 (c) Sahana Software Foundation
6 @license: MIT
7
8 Permission is hereby granted, free of charge, to any person
9 obtaining a copy of this software and associated documentation
10 files (the "Software"), to deal in the Software without
11 restriction, including without limitation the rights to use,
12 copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the
14 Software is furnished to do so, subject to the following
15 conditions:
16
17 The above copyright notice and this permission notice shall be
18 included in all copies or substantial portions of the Software.
19
20 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
22 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
25 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
26 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
27 OTHER DEALINGS IN THE SOFTWARE.
28 """
29
30 import datetime
31 import glob
32 import os
33 import sys
34
35 try:
36 from lxml import etree
37 except ImportError:
38 sys.stderr.write("ERROR: lxml module needed for XML handling\n")
39 raise
40
41 from gluon import *
42
43 from ..s3sync import S3SyncBaseAdapter
44
45
47 """
48 File Synchronization Adapter
49 """
50
52 """
53 Register this site at the peer repository
54
55 @return: True to indicate success, otherwise False
56 """
57
58
59 return True
60
61
63 """
64 Login at the peer repository
65
66 @return: None if successful, otherwise the error
67 """
68
69
70 return None
71
72
73 - def pull(self, task, onconflict=None):
74 """
75 Fetch updates from the peer repository and import them
76 into the local database (active pull)
77
78 @param task: the synchronization task (sync_task Row)
79 @param onconflict: callback for automatic conflict resolution
80
81 @return: tuple (error, mtime), with error=None if successful,
82 else error=message, and mtime=modification timestamp
83 of the youngest record sent
84 """
85
86 repository = self.repository
87 log = repository.log
88
89 error = None
90 result = None
91
92
93 tablename = task.resource_name
94 if tablename == "mixed":
95 resource = None
96 mixed = True
97 else:
98 try:
99 resource = current.s3db.resource(tablename)
100 except SyntaxError:
101 result = log.FATAL
102 error = msg = sys.exc_info()[1]
103 mixed = False
104
105
106 if not result:
107 input_files = self._input_files(task)
108 if not input_files:
109 result = log.SUCCESS
110 msg = "No files to import"
111
112
113 if not result:
114 adapter = None
115 backend = repository.backend
116 if not backend:
117 backend = "eden"
118 backend = "s3.sync_adapter.%s" % backend
119 try:
120 name = "S3SyncAdapter"
121 api = getattr(__import__(backend, fromlist=[name]), name)
122 except ImportError:
123 result = log.FATAL
124 error = msg = "Unsupported back-end: %s" % backend
125 else:
126 adapter = api(repository)
127
128
129 if result:
130
131 log.write(repository_id = repository.id,
132 resource_name = tablename,
133 transmission = log.OUT,
134 mode = log.PULL,
135 action = None,
136 remote = False,
137 result = result,
138 message = msg,
139 )
140 return (error, None)
141
142
143 from ..s3import import S3ImportItem
144 strategy = task.strategy
145 conflict_policy = task.conflict_policy
146 if not conflict_policy:
147 conflict_policy = S3ImportItem.POLICY.MASTER
148 update_policy = task.update_policy
149 if not update_policy:
150 update_policy = S3ImportItem.POLICY.NEWER
151 if update_policy not in ("THIS", "OTHER"):
152 last_sync = task.last_pull
153 else:
154 last_sync = None
155
156
157 error = None
158 mtime = None
159
160 for f in input_files:
161 current.log.debug("FileSync: importing %s" % f)
162 try:
163 with open(f, "r") as source:
164 result = adapter.receive([source],
165 resource,
166 strategy = strategy,
167 update_policy = update_policy,
168 conflict_policy = conflict_policy,
169 onconflict = onconflict,
170 last_sync = last_sync,
171 mixed = mixed,
172 )
173 except IOError:
174 msg = sys.exc_info()[1]
175 current.log.warning(msg)
176 continue
177
178 status = result["status"]
179
180
181 log.write(repository_id = repository.id,
182 resource_name = tablename,
183 transmission = log.OUT,
184 mode = log.PULL,
185 action = "import %s" % f,
186 remote = result["remote"],
187 result = status,
188 message = result["message"],
189 )
190
191 if status in (log.ERROR, log.FATAL):
192 error = "Error while importing %s" % f
193 current.log.error(error)
194 mtime = None
195
196 else:
197 if resource:
198 mtime = resource.mtime
199 else:
200 mtime = current.request.utcnow
201 if task.delete_input_files:
202 try:
203 os.remove(f)
204 except os.error:
205 current.log.warning("FileSync: can not delete %s" % f)
206 else:
207 current.log.debug("FileSync: %s deleted" % f)
208
209 return error, mtime
210
211
212 - def push(self, task):
213 """
214 Extract new updates from the local database and send
215 them to the peer repository (active push)
216
217 @param task: the synchronization task (sync_task Row)
218
219 @return: tuple (error, mtime), with error=None if successful,
220 else error=message, and mtime=modification timestamp
221 of the youngest record sent
222 """
223
224 repository = self.repository
225 log = repository.log
226
227 error = None
228 result = None
229
230
231 tablename = task.resource_name
232 if tablename == "mixed":
233 resource = None
234 mixed = True
235 else:
236 try:
237 resource = current.s3db.resource(tablename,
238 include_deleted=True,
239 )
240 except SyntaxError:
241 result = log.FATAL
242 error = msg = sys.exc_info()[1]
243 mixed = False
244
245
246 if not result:
247 outfile = self._output_file(task)
248 if not outfile:
249 result = log.ERROR
250 if repository.path:
251 error = msg = "No pattern configured for output file name"
252 else:
253 error = msg = "No file path configured for repository"
254
255
256 if not result:
257 adapter = None
258 backend = repository.backend
259 if not backend:
260 backend = "eden"
261 backend = "s3.sync_adapter.%s" % backend
262 try:
263 name = "S3SyncAdapter"
264 api = getattr(__import__(backend, fromlist=[name]), name)
265 except ImportError:
266 result = log.FATAL
267 error = msg = "Unsupported back-end: %s" % backend
268 else:
269 adapter = api(repository)
270
271
272 if result:
273
274 log.write(repository_id = repository.id,
275 resource_name = tablename,
276 transmission = log.OUT,
277 mode = log.PUSH,
278 action = None,
279 remote = False,
280 result = result,
281 message = msg,
282 )
283 return (error, None)
284
285
286 from ..s3import import S3ImportItem
287 update_policy = task.update_policy
288 if not update_policy:
289 update_policy = S3ImportItem.POLICY.NEWER
290 if update_policy not in ("THIS", "OTHER"):
291 msince = task.last_push
292 else:
293 msince = None
294
295
296 filters = current.sync.get_filters(task.id)
297
298
299 error = None
300 mtime = None
301
302 action = "data export"
303 output = adapter.send(resource,
304 msince = msince,
305 filters = filters,
306 mixed = mixed,
307 pretty_print = task.human_readable,
308 )
309
310 status = output.get("status")
311 if status in (log.ERROR, log.FATAL):
312 result = status
313 msg = output.get("message")
314 if not msg:
315 msg = "Error while exporting data"
316 error = msg
317 else:
318 response = output.get("response")
319
320 path = repository.path
321 if not os.path.exists(path):
322
323 try:
324 os.makedirs(path)
325 except OSError:
326 result = log.FATAL
327 error = msg = sys.exc_info()[1]
328
329 if not error:
330 try:
331 action = "open %s" % outfile
332 with open(outfile, "w") as target:
333 target.write(response)
334 except IOError:
335 result = log.FATAL
336 error = msg = sys.exc_info()[1]
337 else:
338 result = log.SUCCESS
339 msg = "Data successfully written to %s" % outfile
340 if resource:
341 msg = "%s (%s records)" % (msg, resource.results)
342 mtime = resource.muntil
343 else:
344 mtime = current.request.utcnow
345
346
347 log.write(repository_id = repository.id,
348 resource_name = task.resource_name,
349 transmission = log.OUT,
350 mode = log.PUSH,
351 action = action,
352 remote = False,
353 result = result,
354 message = msg,
355 )
356
357 return (error, mtime)
358
359
360 - def send(self,
361 resource,
362 start=None,
363 limit=None,
364 msince=None,
365 filters=None,
366 mixed=False,
367 pretty_print=False):
368 """
369 Respond to an incoming pull from the peer repository
370
371 @param resource: the resource to be synchronized
372 @param start: index of the first record to send
373 @param limit: maximum number of records to send
374 @param msince: minimum modification date/time for records to send
375 @param filters: URL filters for record extraction
376 @param mixed: negotiate resource with peer (disregard resource)
377 @param pretty_print: make the output human-readable
378
379 @return: a dict {status, remote, message, response}, with:
380 - status....the outcome of the operation
381 - remote....whether the error was remote (or local)
382 - message...the log message
383 - response..the response to send to the peer
384 """
385
386 msg = "Send not supported for this repository type"
387
388 return {"status": self.log.FATAL,
389 "remote": False,
390 "message": msg,
391 "response": None,
392 }
393
394
395 - def receive(self,
396 source,
397 resource,
398 strategy=None,
399 update_policy=None,
400 conflict_policy=None,
401 onconflict=None,
402 last_sync=None,
403 mixed=False):
404 """
405 Respond to an incoming push from the peer repository
406
407 @param source: the input stream (list of file-like objects)
408 @param resource: the target resource
409 @param strategy: the import strategy
410 @param update_policy: the update policy
411 @param conflict_policy: the conflict resolution policy
412 @param onconflict: callback for conflict resolution
413 @param last_sync: the last synchronization date/time for the peer
414 @param mixed: negotiate resource with peer (disregard resource)
415
416 @return: a dict {status, remote, message, response}, with:
417 - status....the outcome of the operation
418 - remote....whether the error was remote (or local)
419 - message...the log message
420 - response..the response to send to the peer
421 """
422
423 msg = "Receive not supported for this repository type"
424
425 return {"status": self.log.FATAL,
426 "remote": False,
427 "message": msg,
428 "response": None,
429 }
430
431
470
471
473 """
474 Helper function to construct the output file name from
475 the repository path and the output file name pattern
476
477 @param task: the synchronization task
478 @return: the output file name, or None if either
479 path or pattern are missing
480 """
481
482 path = self.repository.path
483 if not os.path.isabs(path):
484 path = os.path.join(current.request.folder, path)
485
486 pattern = task.outfile_pattern
487
488 if not path or not pattern:
489 return None
490
491
492 from string import Template
493 template = Template(pattern).safe_substitute(
494 year="%(y)04d",
495 month="%(m)02d",
496 day="%(d)02d",
497 hour="%(H)02d",
498 minute="%(M)02d",
499 second="%(S)02d",
500 timestamp="%(y)04d%(m)02d%(d)02d%(H)02d%(M)02d%(S)02d"
501 )
502
503
504 now = current.request.utcnow
505 filename = template % {"y": now.year,
506 "m": now.month,
507 "d": now.day,
508 "H": now.hour,
509 "M": now.minute,
510 "S": now.second,
511 }
512
513
514 outfile = os.path.join(path, filename)
515
516 return outfile
517
518
519