Browse code

Implemented S3->local sync

* s3cmd: Implemented cmd_sync_remote2local() for restoring
a backup from S3 to a local filesystem
* S3/S3.py: S3.object_get_uri() now requires a writable stream
rather than a path name.
* S3/Utils.py: Added mkdir_with_parents()


git-svn-id: https://s3tools.svn.sourceforge.net/svnroot/s3tools/s3cmd/trunk@181 830e0280-6d2a-0410-9c65-932aecc39d9d

Michal Ludvig authored on 2008/06/04 19:21:29
Showing 5 changed files
... ...
@@ -1,5 +1,13 @@
1 1
 2008-06-04  Michal Ludvig  <michal@logix.cz>
2 2
 
3
+	* s3cmd: Implemented cmd_sync_remote2local() for restoring
4
+	  backup from S3 to a local filesystem
5
+	* S3/S3.py: S3.object_get_uri() now requires writable stream 
6
+	  and not a path name.
7
+	* S3/Utils.py: Added mkdir_with_parents()
8
+
9
+2008-06-04  Michal Ludvig  <michal@logix.cz>
10
+
3 11
 	* s3cmd: Refactored cmd_sync() in preparation 
4 12
 	  for remote->local sync.
5 13
 
... ...
@@ -37,6 +37,9 @@ class S3Error (Exception):
37 37
 class S3UploadError(Exception):
38 38
 	pass
39 39
 
40
+class S3DownloadError(Exception):
41
+	pass
42
+
40 43
 class ParameterError(Exception):
41 44
 	pass
42 45
 
... ...
@@ -175,36 +175,25 @@ class S3(object):
175 175
 		response = self.send_file(request, file)
176 176
 		return response
177 177
 
178
-	def object_get_file(self, bucket, object, filename):
179
-		try:
180
-			stream = open(filename, "wb")
181
-		except IOError, e:
182
-			raise ParameterError("%s: %s" % (filename, e.strerror))
183
-		return self.object_get_stream(bucket, object, stream)
184
-
185
-	def object_get_stream(self, bucket, object, stream):
186
-		request = self.create_request("OBJECT_GET", bucket = bucket, object = object)
178
+	def object_get_uri(self, uri, stream):
179
+		if uri.type != "s3":
180
+			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
181
+		request = self.create_request("OBJECT_GET", bucket = uri.bucket(), object = uri.object())
187 182
 		response = self.recv_file(request, stream)
188 183
 		return response
189
-		
184
+
190 185
 	def object_delete(self, bucket, object):
191 186
 		request = self.create_request("OBJECT_DELETE", bucket = bucket, object = object)
192 187
 		response = self.send_request(request)
193 188
 		return response
194 189
 
195 190
 	def object_put_uri(self, filename, uri, extra_headers = None):
191
+		# TODO TODO
192
+		# Make it consistent with stream-oriented object_get_uri()
196 193
 		if uri.type != "s3":
197 194
 			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
198 195
 		return self.object_put(filename, uri.bucket(), uri.object(), extra_headers)
199 196
 
200
-	def object_get_uri(self, uri, filename):
201
-		if uri.type != "s3":
202
-			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
203
-		if filename == "-":
204
-			return self.object_get_stream(uri.bucket(), uri.object(), sys.stdout)
205
-		else:
206
-			return self.object_get_file(uri.bucket(), uri.object(), filename)
207
-
208 197
 	def object_delete_uri(self, uri):
209 198
 		if uri.type != "s3":
210 199
 			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
... ...
@@ -11,6 +11,8 @@ import random
11 11
 import md5
12 12
 import errno
13 13
 
14
+from logging import debug, info, warning, error
15
+
14 16
 try:
15 17
 	import xml.etree.ElementTree as ET
16 18
 except ImportError:
... ...
@@ -140,3 +142,29 @@ def hash_file_md5(filename):
140 140
 	h.update(f.read())
141 141
 	f.close()
142 142
 	return h.hexdigest()
143
+
144
+def mkdir_with_parents(dir_name, mode):
145
+	"""
146
+	mkdir_with_parents(dst_dir, mode)
147
+	
148
+	Create directory 'dir_name' with all parent directories
149
+
150
+	Returns True on success, False otherwise.
151
+	"""
152
+	pathmembers = dir_name.split(os.sep)
153
+	tmp_stack = []
154
+	while pathmembers and not os.path.isdir(os.sep.join(pathmembers)):
155
+		tmp_stack.append(pathmembers.pop())
156
+	while tmp_stack:
157
+		pathmembers.append(tmp_stack.pop())
158
+		cur_dir = os.sep.join(pathmembers)
159
+		try:
160
+			debug("mkdir(%s)" % cur_dir)
161
+			os.mkdir(cur_dir)
162
+		except IOError, e:
163
+			error("%s: can not make directory: %s" % (cur_dir, e.strerror))
164
+			return False
165
+		except Exception, e:
166
+			error("%s: %s" % (cur_dir, e))
167
+			return False
168
+	return True
... ...
@@ -243,10 +243,19 @@ def cmd_object_get(args):
243 243
 		else:
244 244
 			# By default the destination filename is the object name
245 245
 			destination = uri.object()
246
-
247
-		if not Config().force and os.path.exists(destination):
248
-			raise ParameterError("File %s already exists. Use --force to overwrite it" % destination)
249
-		response = s3.object_get_uri(uri, destination)
246
+		if destination == "-":
247
+			## stdout
248
+			dst_stream = sys.stdout
249
+		else:
250
+			## File
251
+			if not Config().force and os.path.exists(destination):
252
+				raise ParameterError("File %s already exists. Use --force to overwrite it" % destination)
253
+			try:
254
+				dst_stream = open(destination, "wb")
255
+			except IOError, e:
256
+				error("Skipping %s: %s" % (destination, e.strerror))
257
+				continue
258
+		response = s3.object_get_uri(uri, dst_stream)
250 259
 		if response["headers"].has_key("x-amz-meta-s3tools-gpgenc"):
251 260
 			gpg_decrypt(destination, response["headers"]["x-amz-meta-s3tools-gpgenc"])
252 261
 			response["size"] = os.stat(destination)[6]
... ...
@@ -383,7 +392,106 @@ def _compare_filelists(src_list, dst_list, src_is_local_and_dst_is_remote):
383 383
 	return src_list, dst_list, exists_list
384 384
 
385 385
 def cmd_sync_remote2local(src, dst):
386
-	raise NotImplementedError("Remote->Local sync is not yet implemented.") 
386
+	def _parse_attrs_header(attrs_header):
387
+		attrs = {}
388
+		for attr in attrs_header.split("/"):
389
+			key, val = attr.split(":")
390
+			attrs[key] = val
391
+		return attrs
392
+		
393
+	s3 = S3(Config())
394
+
395
+	src_uri = S3Uri(src)
396
+	dst_uri = S3Uri(dst)
397
+
398
+	rem_list = _get_filelist_remote(src_uri)
399
+	rem_count = len(rem_list)
400
+
401
+	loc_list = _get_filelist_local(dst_uri)
402
+	loc_count = len(loc_list)
403
+	
404
+	output("Found %d remote files, %d local files" % (rem_count, loc_count))
405
+
406
+	_compare_filelists(rem_list, loc_list, False)
407
+
408
+	output("Summary: %d remote files to download, %d local files to delete" % (len(rem_list), len(loc_list)))
409
+
410
+	for file in loc_list:
411
+		if cfg.delete_removed:
412
+			# os.unlink(file)
413
+			output("deleted '%s'" % file)
414
+		else:
415
+			output("not-deleted '%s'" % file)
416
+
417
+	total_size = 0
418
+	total_count = len(rem_list)
419
+	total_elapsed = 0.0
420
+	timestamp_start = time.time()
421
+	seq = 0
422
+	dir_cache = {}
423
+	src_base = src_uri.uri()
424
+	dst_base = dst_uri.path()
425
+	if not src_base[-1] == "/": src_base += "/"
426
+	file_list = rem_list.keys()
427
+	file_list.sort()
428
+	for file in file_list:
429
+		seq += 1
430
+		uri = S3Uri(src_base + file)
431
+		dst_file = dst_base + file
432
+		try:
433
+			dst_dir = os.path.dirname(dst_file)
434
+			if not dir_cache.has_key(dst_dir):
435
+				dir_cache[dst_dir] = Utils.mkdir_with_parents(dst_dir, mode = 022)
436
+			if dir_cache[dst_dir] == False:
437
+				warning("%s: destination directory not writable: %s" % (file, dst_dir))
438
+				continue
439
+			try:
440
+				open_flags = os.O_CREAT
441
+				if cfg.force:
442
+					open_flags |= os.O_TRUNC
443
+				else:
444
+					open_flags |= os.O_EXCL
445
+
446
+				debug("dst_file=%s" % dst_file)
447
+				# This will have failed should the file exist
448
+				os.open(dst_file, open_flags)
449
+				# Yeah I know there is a race condition here. Sadly I don't know how to open() in exclusive mode.
450
+				dst_stream = open(dst_file, "wb")
451
+				response = s3.object_get_uri(uri, dst_stream)
452
+				dst_stream.close()
453
+				if response['headers'].has_key('x-amz-meta-s3cmd-attrs') and cfg.preserve_attrs:
454
+					attrs = _parse_attrs_header(response['headers']['x-amz-meta-s3cmd-attrs'])
455
+					if attrs.has_key('mode'):
456
+						os.chmod(dst_file, int(attrs['mode']))
457
+					## FIXME: uid/gid and mtime/ctime handling comes here! TODO
458
+			except OSError, e:
459
+				if e.errno == errno.EEXIST:
460
+					warning("%s exists - not overwriting" % (dst_file))
461
+					continue
462
+				raise
463
+			except IOError, e:
464
+				## See if it's missing path and try again
465
+				error("%s: %s" % (file, e))
466
+				continue
467
+			finally:
468
+				## Close the file if still open. Don't care if not.
469
+				try:
470
+					dst_stream.close()
471
+				except:
472
+					pass
473
+		except S3DownloadError, e:
474
+			error("%s: download failed too many times. Skipping that file." % file)
475
+			continue
476
+		speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
477
+		output("File '%s' stored as %s (%d bytes in %0.1f seconds, %0.2f %sB/s) [%d of %d]" %
478
+			(uri, dst_file, response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
479
+			seq, total_count))
480
+		total_size += response["size"]
481
+
482
+	total_elapsed = time.time() - timestamp_start
483
+	speed_fmt = formatSize(total_size/total_elapsed, human_readable = True, floating_point = True)
484
+	output("Done. Downloaded %d bytes in %0.1f seconds, %0.2f %sB/s" % 
485
+	       (total_size, total_elapsed, speed_fmt[0], speed_fmt[1]))
387 486
 
388 487
 def cmd_sync_local2remote(src, dst):
389 488
 	def _build_attr_header(src):
... ...
@@ -427,6 +535,7 @@ def cmd_sync_local2remote(src, dst):
427 427
 	_compare_filelists(loc_list, rem_list, True)
428 428
 
429 429
 	output("Summary: %d local files to upload, %d remote files to delete" % (len(loc_list), len(rem_list)))
430
+
430 431
 	for file in rem_list:
431 432
 		uri = S3Uri("s3://" + dst_uri.bucket()+"/"+rem_list[file]['object_key'])
432 433
 		if cfg.delete_removed:
... ...
@@ -454,7 +563,7 @@ def cmd_sync_local2remote(src, dst):
454 454
 		try:
455 455
 			response = s3.object_put_uri(src, uri, attr_header)
456 456
 		except S3UploadError, e:
457
-			error("Upload of '%s' failed too many times. Skipping that file." % src)
457
+			error("%s: upload failed too many times. Skipping that file." % src)
458 458
 			continue
459 459
 		speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
460 460
 		output("File '%s' stored as %s (%d bytes in %0.1f seconds, %0.2f %sB/s) [%d of %d]" %
... ...
@@ -473,6 +582,9 @@ def cmd_sync(args):
473 473
 	if (len(args)):
474 474
 		raise ParameterError("Too many parameters! Expected: %s" % commands['sync']['param'])
475 475
 
476
+	if S3Uri(src).type == "s3" and not src.endswith('/'):
477
+		src += "/"
478
+
476 479
 	if not dst.endswith('/'):
477 480
 		dst += "/"
478 481