Browse code

* S3/S3.py, s3cmd, S3/Config.py, s3cmd.1: Added --continue for 'get' command, improved 'get' failure resiliency.

git-svn-id: https://s3tools.svn.sourceforge.net/svnroot/s3tools/s3cmd/trunk@267 830e0280-6d2a-0410-9c65-932aecc39d9d

Michal Ludvig authored on 2008/11/25 14:38:01
Showing 6 changed files
... ...
@@ -1,5 +1,7 @@
1 1
 2008-11-24  Michal Ludvig  <michal@logix.cz>
2 2
 
3
+	* S3/S3.py, s3cmd, S3/Config.py, s3cmd.1: Added --continue for
4
+	  'get' command, improved 'get' failure resiliency.
3 5
 	* S3/Progress.py: Support for progress meter not starting in 0.
4 6
 	* S3/S3.py: improved retrying in send_request() and send_file()
5 7
 
... ...
@@ -6,6 +6,9 @@ s3cmd 0.9.9   -   ???
6 6
   prefix with --recursive (-r)
7 7
 * Copying and moving objects, within or between buckets.
8 8
   (Andrew Ryan)
9
+* Continue getting partially downloaded files with --continue
10
+* Improved resistance to communication errors (Connection
11
+  reset by peer, etc.)
9 12
 
10 13
 s3cmd 0.9.8.4 -   2008-11-07
11 14
 =============
... ...
@@ -24,6 +24,7 @@ class Config(object):
24 24
 	recv_chunk = 4096
25 25
 	human_readable_sizes = False
26 26
 	force = False
27
+	get_continue = False
27 28
 	recursive = False
28 29
 	acl_public = False
29 30
 	proxy_host = ""
... ...
@@ -189,11 +189,11 @@ class S3(object):
189 189
 		response = self.send_file(request, file)
190 190
 		return response
191 191
 
192
-	def object_get(self, uri, stream):
192
+	def object_get(self, uri, stream, start_position):
193 193
 		if uri.type != "s3":
194 194
 			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
195 195
 		request = self.create_request("OBJECT_GET", uri = uri)
196
-		response = self.recv_file(request, stream)
196
+		response = self.recv_file(request, stream, start_position)
197 197
 		return response
198 198
 
199 199
 	def object_delete(self, uri):
... ...
@@ -399,6 +399,8 @@ class S3(object):
399 399
 				conn.putheader(header, str(headers[header]))
400 400
 			conn.endheaders()
401 401
 		except Exception, e:
402
+			if self.config.progress_meter:
403
+				progress.done("failed")
402 404
 			if retries:
403 405
 				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
404 406
 				warning("Waiting %d sec..." % self._fail_wait(retries))
... ...
@@ -406,7 +408,7 @@ class S3(object):
406 406
 				# Connection error -> same throttle value
407 407
 				return self.send_file(request, file, throttle, retries - 1)
408 408
 			else:
409
-				raise S3UploadError("Request failed for: %s" % resource['uri'])
409
+				raise S3UploadError("Upload failed for: %s" % resource['uri'])
410 410
 		file.seek(0)
411 411
 		md5_hash = md5.new()
412 412
 		try:
... ...
@@ -414,20 +416,12 @@ class S3(object):
414 414
 				debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
415 415
 				data = file.read(self.config.send_chunk)
416 416
 				md5_hash.update(data)
417
+				conn.send(data)
417 418
 				if self.config.progress_meter:
418 419
 					progress.update(delta_position = len(data))
419
-				else:
420
-					debug("SendFile: Sending %d bytes to the server" % len(data))
421
-				conn.send(data)
422
-        
423 420
 				size_left -= len(data)
424 421
 				if throttle:
425 422
 					time.sleep(throttle)
426
-				## Call progress meter from here
427
-				debug("Sent %d bytes (%d %% of %d)" % (
428
-					(size_total - size_left),
429
-					(size_total - size_left) * 100 / size_total,
430
-					size_total))
431 423
 			md5_computed = md5_hash.hexdigest()
432 424
 			response = {}
433 425
 			http_response = conn.getresponse()
... ...
@@ -442,7 +436,7 @@ class S3(object):
442 442
 				progress.done("failed")
443 443
 			if retries:
444 444
 				throttle = throttle and throttle * 5 or 0.01
445
-				warning("Request failed: %s (%s)" % (resource['uri'], e))
445
+				warning("Upload failed: %s (%s)" % (resource['uri'], e))
446 446
 				warning("Retrying on lower speed (throttle=%0.2f)" % throttle)
447 447
 				warning("Waiting %d sec..." % self._fail_wait(retries))
448 448
 				time.sleep(self._fail_wait(retries))
... ...
@@ -450,7 +444,7 @@ class S3(object):
450 450
 				return self.send_file(request, file, throttle, retries - 1)
451 451
 			else:
452 452
 				debug("Giving up on '%s' %s" % (file.name, e))
453
-				raise S3UploadError("Request failed for: %s" % resource['uri'])
453
+				raise S3UploadError("Upload failed for: %s" % resource['uri'])
454 454
 
455 455
 		timestamp_end = time.time()
456 456
 		response["elapsed"] = timestamp_end - timestamp_start
... ...
@@ -490,24 +484,40 @@ class S3(object):
490 490
 			raise S3Error(response)
491 491
 		return response
492 492
 
493
-	def recv_file(self, request, stream):
493
+	def recv_file(self, request, stream, start_position = 0, retries = _max_retries):
494 494
 		method_string, resource, headers = request
495 495
 		if self.config.progress_meter:
496 496
 			progress = self.config.progress_class(stream.name, 0)
497 497
 		else:
498 498
 			info("Receiving file '%s', please wait..." % stream.name)
499 499
 		timestamp_start = time.time()
500
-		conn = self.get_connection(resource['bucket'])
501
-		conn.connect()
502
-		conn.putrequest(method_string, self.format_uri(resource))
503
-		for header in headers.keys():
504
-			conn.putheader(header, str(headers[header]))
505
-		conn.endheaders()
506
-		response = {}
507
-		http_response = conn.getresponse()
508
-		response["status"] = http_response.status
509
-		response["reason"] = http_response.reason
510
-		response["headers"] = convertTupleListToDict(http_response.getheaders())
500
+		try:
501
+			conn = self.get_connection(resource['bucket'])
502
+			conn.connect()
503
+			conn.putrequest(method_string, self.format_uri(resource))
504
+			for header in headers.keys():
505
+				conn.putheader(header, str(headers[header]))
506
+			if start_position > 0:
507
+				debug("Requesting Range: %d .. end" % start_position)
508
+				conn.putheader("Range", "bytes=%d-" % start_position)
509
+			conn.endheaders()
510
+			response = {}
511
+			http_response = conn.getresponse()
512
+			response["status"] = http_response.status
513
+			response["reason"] = http_response.reason
514
+			response["headers"] = convertTupleListToDict(http_response.getheaders())
515
+			debug("Response: %s" % response)
516
+		except Exception, e:
517
+			if self.config.progress_meter:
518
+				progress.done("failed")
519
+			if retries:
520
+				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
521
+				warning("Waiting %d sec..." % self._fail_wait(retries))
522
+				time.sleep(self._fail_wait(retries))
523
+				# Connection error -> same throttle value
524
+				return self.recv_file(request, stream, start_position, retries - 1)
525
+			else:
526
+				raise S3DownloadError("Download failed for: %s" % resource['uri'])
511 527
 
512 528
 		if response["status"] == 307:
513 529
 			## RedirectPermanent
... ...
@@ -521,38 +531,67 @@ class S3(object):
521 521
 		if response["status"] < 200 or response["status"] > 299:
522 522
 			raise S3Error(response)
523 523
 
524
-		md5_hash = md5.new()
525
-		size_left = size_total = int(response["headers"]["content-length"])
524
+		if start_position == 0:
525
+			# Only compute MD5 on the fly if we're downloading from beginning
526
+			# Otherwise we'd get a nonsense.
527
+			md5_hash = md5.new()
528
+		size_left = int(response["headers"]["content-length"])
529
+		size_total = start_position + size_left
530
+		current_position = start_position
531
+
526 532
 		if self.config.progress_meter:
527 533
 			progress.total_size = size_total
528
-		size_recvd = 0
529
-		while (size_recvd < size_total):
530
-			this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
531
-			debug("ReceiveFile: Receiving up to %d bytes from the server" % this_chunk)
532
-			data = http_response.read(this_chunk)
533
-			debug("ReceiveFile: Writing %d bytes to file '%s'" % (len(data), stream.name))
534
-			stream.write(data)
535
-			md5_hash.update(data)
536
-			size_recvd += len(data)
537
-			## Call progress meter from here...
534
+			progress.initial_position = current_position
535
+			progress.current_position = current_position
536
+
537
+		try:
538
+			while (current_position < size_total):
539
+				this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
540
+				data = http_response.read(this_chunk)
541
+				stream.write(data)
542
+				if start_position == 0:
543
+					md5_hash.update(data)
544
+				current_position += len(data)
545
+				## Call progress meter from here...
546
+				if self.config.progress_meter:
547
+					progress.update(delta_position = len(data))
548
+			conn.close()
549
+		except Exception, e:
538 550
 			if self.config.progress_meter:
539
-				progress.update(delta_position = len(data))
551
+				progress.done("failed")
552
+			if retries:
553
+				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
554
+				warning("Waiting %d sec..." % self._fail_wait(retries))
555
+				time.sleep(self._fail_wait(retries))
556
+				# Connection error -> same throttle value
557
+				return self.recv_file(request, stream, current_position, retries - 1)
540 558
 			else:
541
-				debug("Received %d bytes (%d %% of %d)" % (
542
-					size_recvd,
543
-					size_recvd * 100 / size_total,
544
-					size_total))
545
-		conn.close()
559
+				raise S3DownloadError("Download failed for: %s" % resource['uri'])
560
+
561
+		stream.flush()
546 562
 		progress.done("done")
547 563
 		timestamp_end = time.time()
548
-		response["md5"] = md5_hash.hexdigest()
564
+
565
+		if start_position == 0:
566
+			# Only compute MD5 on the fly if we were downloading from the beginning
567
+			response["md5"] = md5_hash.hexdigest()
568
+		else:
569
+			# Otherwise try to compute MD5 of the output file
570
+			try:
571
+				response["md5"] = hash_file_md5(stream.name)
572
+			except IOError, e:
573
+				if e.errno != errno.ENOENT:
574
+					warning("Unable to open file: %s: %s" % (stream.name, e))
575
+				warning("Unable to verify MD5. Assume it matches.")
576
+				response["md5"] = response["headers"]["etag"]
577
+
549 578
 		response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0
550 579
 		response["elapsed"] = timestamp_end - timestamp_start
551
-		response["size"] = size_recvd
580
+		response["size"] = current_position
552 581
 		response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
553
-		if response["size"] != long(response["headers"]["content-length"]):
582
+		if response["size"] != start_position + long(response["headers"]["content-length"]):
554 583
 			warning("Reported size (%s) does not match received size (%s)" % (
555
-				response["headers"]["content-length"], response["size"]))
584
+				start_position + response["headers"]["content-length"], response["size"]))
556 585
 		debug("ReceiveFile: Computed MD5 = %s" % response["md5"])
557 586
 		if not response["md5match"]:
558 587
 			warning("MD5 signatures do not match: computed=%s, received=%s" % (
... ...
@@ -253,6 +253,7 @@ def cmd_object_get(args):
253 253
 		uri_arg = args.pop(0)
254 254
 		uri = S3Uri(uri_arg)
255 255
 
256
+		start_position = 0
256 257
 		if destination_file:
257 258
 			destination = destination_file
258 259
 		elif destination_dir:
... ...
@@ -265,14 +266,21 @@ def cmd_object_get(args):
265 265
 			dst_stream = sys.stdout
266 266
 		else:
267 267
 			## File
268
-			if not Config().force and os.path.exists(destination):
269
-				raise ParameterError("File %s already exists. Use --force to overwrite it" % destination)
270 268
 			try:
271
-				dst_stream = open(destination, "wb")
269
+				dst_stream = open(destination, "ab")
270
+				if os.path.exists(destination):
271
+					if Config().get_continue:
272
+						start_position = dst_stream.tell()
273
+					elif Config().force:
274
+						start_position = 0L
275
+						dst_stream.seek(0L)
276
+						dst_stream.truncate()
277
+					else:
278
+						raise ParameterError("File %s already exists. Use either --force or --continue or give it a new name." % destination)
272 279
 			except IOError, e:
273 280
 				error("Skipping %s: %s" % (destination, e.strerror))
274 281
 				continue
275
-		response = s3.object_get(uri, dst_stream)
282
+		response = s3.object_get(uri, dst_stream, start_position = start_position)
276 283
 		if response["headers"].has_key("x-amz-meta-s3tools-gpgenc"):
277 284
 			gpg_decrypt(destination, response["headers"]["x-amz-meta-s3tools-gpgenc"])
278 285
 			response["size"] = os.stat(destination)[6]
... ...
@@ -965,6 +973,7 @@ def main():
965 965
 	optparser.add_option("-e", "--encrypt", dest="encrypt", action="store_true", help="Encrypt files before uploading to S3.")
966 966
 	optparser.add_option(      "--no-encrypt", dest="encrypt", action="store_false", help="Don't encrypt files.")
967 967
 	optparser.add_option("-f", "--force", dest="force", action="store_true", help="Force overwrite and other dangerous operations.")
968
+	optparser.add_option(      "--continue", dest="get_continue", action="store_true", help="Continue getting a partially downloaded file (only for [get] command).")
968 969
 	optparser.add_option("-r", "--recursive", dest="recursive", action="store_true", help="Recursive upload, download or removal.")
969 970
 	optparser.add_option("-P", "--acl-public", dest="acl_public", action="store_true", help="Store objects with ACL allowing read for anyone.")
970 971
 	optparser.add_option(      "--acl-private", dest="acl_public", action="store_false", help="Store objects with default ACL allowing access for you only.")
... ...
@@ -113,6 +113,9 @@ Options common for all commands (where it makes sense indeed):
113 113
 \fB\-f\fR, \fB\-\-force\fR
114 114
 Force overwrite and other dangerous operations.
115 115
 .TP
116
+\fB\-\-continue\fR
117
+Continue getting a partially downloaded file (only for \fIget\fR command). This comes in handy when a download of a large file, say an ISO image, from an S3 bucket fails and a partially downloaded file is left on the disk. Unfortunately the \fIput\fR command doesn't support restarting a failed upload due to an Amazon S3 limitation.
118
+.TP
116 119
 \fB\-P\fR, \fB\-\-acl\-public\fR
117 120
 Store objects with permissions allowing read for anyone.
118 121
 .TP