Browse code

* s3cmd, S3/S3.py, S3/Utils.py: Throttle upload speed and retry when upload failed. Report download/upload speed and time elapsed.

git-svn-id: https://s3tools.svn.sourceforge.net/svnroot/s3tools/s3cmd/trunk@172 830e0280-6d2a-0410-9c65-932aecc39d9d

Michal Ludvig authored on 2008/03/04 21:48:10
Showing 4 changed files
... ...
@@ -1,3 +1,9 @@
1
+2008-03-05  Michal Ludvig  <michal@logix.cz>
2
+
3
+	* s3cmd, S3/S3.py, S3/Utils.py: Throttle upload speed and retry 
4
+	  when upload failed.
5
+	  Report download/upload speed and time elapsed.
6
+
1 7
 2008-02-28  Michal Ludvig  <michal@logix.cz>
2 8
 
3 9
 	* Released version 0.9.6
... ...
@@ -44,6 +44,9 @@ class S3Error (Exception):
44 44
 			pass
45 45
 		return retval
46 46
 
47
+class S3UploadError(Exception):
48
+	pass
49
+
47 50
 class ParameterError(Exception):
48 51
 	pass
49 52
 
... ...
@@ -199,7 +202,6 @@ class S3(object):
199 199
 			headers["x-amz-acl"] = "public-read"
200 200
 		request = self.create_request("OBJECT_PUT", bucket = bucket, object = object, headers = headers)
201 201
 		response = self.send_file(request, file)
202
-		response["size"] = size
203 202
 		return response
204 203
 
205 204
 	def object_get_file(self, bucket, object, filename):
... ...
@@ -359,7 +361,7 @@ class S3(object):
359 359
 			raise S3Error(response)
360 360
 		return response
361 361
 
362
-	def send_file(self, request, file):
362
+	def send_file(self, request, file, throttle = 0, retries = 3):
363 363
 		method_string, resource, headers = request
364 364
 		info("Sending file '%s', please wait..." % file.name)
365 365
 		conn = self.get_connection(resource['bucket'])
... ...
@@ -369,23 +371,43 @@ class S3(object):
369 369
 			conn.putheader(header, str(headers[header]))
370 370
 		conn.endheaders()
371 371
 		file.seek(0)
372
+		timestamp_start = time.time()
372 373
 		size_left = size_total = headers.get("content-length")
373 374
 		while (size_left > 0):
374 375
 			debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
375 376
 			data = file.read(self.config.send_chunk)
376 377
 			debug("SendFile: Sending %d bytes to the server" % len(data))
377
-			conn.send(data)
378
+			try:
379
+				conn.send(data)
380
+			except Exception, e:
381
+				## When an exception occurs, reconnect and retry at a lower speed (higher throttle) until the retry count is exhausted.
382
+				if retries:
383
+					conn.close()
384
+					warning("Upload of '%s' failed %s " % (file.name, e))
385
+					throttle = throttle and throttle * 5 or 0.01
386
+					warning("Retrying on lower speed (throttle=%0.2f)" % throttle)
387
+					return self.send_file(request, file, throttle, retries - 1)
388
+				else:
389
+					debug("Giving up on '%s' %s" % (file.name, e))
390
+					raise S3UploadError
391
+
378 392
 			size_left -= len(data)
393
+			if throttle:
394
+				time.sleep(throttle)
379 395
 			info("Sent %d bytes (%d %% of %d)" % (
380 396
 				(size_total - size_left),
381 397
 				(size_total - size_left) * 100 / size_total,
382 398
 				size_total))
399
+		timestamp_end = time.time()
383 400
 		response = {}
384 401
 		http_response = conn.getresponse()
385 402
 		response["status"] = http_response.status
386 403
 		response["reason"] = http_response.reason
387 404
 		response["headers"] = convertTupleListToDict(http_response.getheaders())
388
-		response["data"] =  http_response.read()
405
+		response["data"] = http_response.read()
406
+		response["elapsed"] = timestamp_end - timestamp_start
407
+		response["size"] = size_total
408
+		response["speed"] = float(response["size"]) / response["elapsed"]
389 409
 		conn.close()
390 410
 
391 411
 		if response["status"] == 307:
... ...
@@ -430,6 +452,7 @@ class S3(object):
430 430
 		md5_hash = md5.new()
431 431
 		size_left = size_total = int(response["headers"]["content-length"])
432 432
 		size_recvd = 0
433
+		timestamp_start = time.time()
433 434
 		while (size_recvd < size_total):
434 435
 			this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
435 436
 			debug("ReceiveFile: Receiving up to %d bytes from the server" % this_chunk)
... ...
@@ -443,9 +466,12 @@ class S3(object):
443 443
 				size_recvd * 100 / size_total,
444 444
 				size_total))
445 445
 		conn.close()
446
+		timestamp_end = time.time()
446 447
 		response["md5"] = md5_hash.hexdigest()
447 448
 		response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0
449
+		response["elapsed"] = timestamp_end - timestamp_start
448 450
 		response["size"] = size_recvd
451
+		response["speed"] = float(response["size"]) / response["elapsed"]
449 452
 		if response["size"] != long(response["headers"]["content-length"]):
450 453
 			warning("Reported size (%s) does not match received size (%s)" % (
451 454
 				response["headers"]["content-length"], response["size"]))
... ...
@@ -79,8 +79,8 @@ def dateS3toUnix(date):
79 79
 	## treats it as "localtime". Anyway...
80 80
 	return time.mktime(dateS3toPython(date))
81 81
 
82
-def formatSize(size, human_readable = False):
83
-	size = int(size)
82
+def formatSize(size, human_readable = False, floating_point = False):
83
+	size = floating_point and float(size) or int(size)
84 84
 	if human_readable:
85 85
 		coeffs = ['k', 'M', 'G', 'T']
86 86
 		coeff = ""
... ...
@@ -190,9 +190,14 @@ def cmd_object_put(args):
190 190
 		real_filename = file
191 191
 		if Config().encrypt:
192 192
 			exitcode, real_filename, extra_headers["x-amz-meta-s3tools-gpgenc"] = gpg_encrypt(file)
193
-		response = s3.object_put_uri(real_filename, uri_final, extra_headers)
194
-		output("File '%s' stored as %s (%d bytes) [%d of %d]" %
195
-			(file, uri_final, response["size"],
193
+		try:
194
+			response = s3.object_put_uri(real_filename, uri_final, extra_headers)
195
+		except S3UploadError, e:
196
+			error("Upload of '%s' failed too many times. Skipping that file." % real_filename)
197
+			continue
198
+		speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
199
+		output("File '%s' stored as %s (%d bytes in %0.1f seconds, %0.2f %sB/s) [%d of %d]" %
200
+			(file, uri_final, response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
196 201
 			seq, total))
197 202
 		if Config().acl_public:
198 203
 			output("Public URL of the object is: %s" %
... ...
@@ -246,8 +251,9 @@ def cmd_object_get(args):
246 246
 			gpg_decrypt(destination, response["headers"]["x-amz-meta-s3tools-gpgenc"])
247 247
 			response["size"] = os.stat(destination)[6]
248 248
 		if destination != "-":
249
-			output("Object %s saved as '%s' (%d bytes)" %
250
-				(uri, destination, response["size"]))
249
+			speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
250
+			output("Object %s saved as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s)" %
251
+				(uri, destination, response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1]))
251 252
 
252 253
 def cmd_object_del(args):
253 254
 	s3 = S3(Config())
... ...
@@ -413,6 +419,8 @@ def cmd_sync(args):
413 413
 
414 414
 	total_size = 0
415 415
 	total_count = len(loc_list)
416
+	total_elapsed = 0.0
417
+	timestamp_start = time.time()
416 418
 	seq = 0
417 419
 	dst_base = dst_uri.uri()
418 420
 	if not dst_base[-1] == "/": dst_base += "/"
... ...
@@ -425,10 +433,21 @@ def cmd_sync(args):
425 425
 		if cfg.preserve_attrs:
426 426
 			attr_header = _build_attr_header(src)
427 427
 			debug(attr_header)
428
-		response = s3.object_put_uri(src, uri, attr_header)
429
-		output("stored '%s' as '%s' (%d bytes) [%d of %d]" % (src, uri, response["size"], seq, total_count))
428
+		try:
429
+			response = s3.object_put_uri(src, uri, attr_header)
430
+		except S3UploadError, e:
431
+			error("Upload of '%s' failed too many times. Skipping that file." % src)
432
+			continue
433
+		speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
434
+		output("File '%s' stored as %s (%d bytes in %0.1f seconds, %0.2f %sB/s) [%d of %d]" %
435
+			(src, uri, response["size"], response["elapsed"], speed_fmt[0], speed_fmt[1],
436
+			seq, total_count))
430 437
 		total_size += response["size"]
431
-	output("Done. Uploaded %d bytes." % total_size)
438
+
439
+	total_elapsed = time.time() - timestamp_start
440
+	speed_fmt = formatSize(total_size/total_elapsed, human_readable = True, floating_point = True)
441
+	output("Done. Uploaded %d bytes in %0.1f seconds, %0.2f %sB/s" % 
442
+	       (total_size, total_elapsed, speed_fmt[0], speed_fmt[1]))
432 443
 
433 444
 def resolve_list(lst, args):
434 445
 	retval = []