S3/S3.py
 ## Amazon S3 manager
 ## Author: Michal Ludvig <michal@logix.cz>
 ##         http://www.logix.cz/michal
 ## License: GPL Version 2

 import sys
 import os, os.path
 import time
 import errno
 import httplib
 import logging
 import mimetypes
 import re
 from logging import debug, info, warning, error
 from stat import ST_SIZE

 try:
 	from hashlib import md5
 except ImportError:
 	from md5 import md5

 from Utils import *
 from SortedDict import SortedDict
 from BidirMap import BidirMap
 from Config import Config
 from Exceptions import *
 from ACL import ACL, GranteeLogDelivery
 from AccessLog import AccessLog
 from S3Uri import S3Uri
 
 __all__ = []
 class S3Request(object):
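 	## A single REST request to S3: carries the HTTP method, target
 	## resource, headers and query parameters, and computes the AWS
 	## request signature (see sign() below).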
 	def __init__(self, s3, method_string, resource, headers, params = {}):
 		self.s3 = s3
 		self.headers = SortedDict(headers or {}, ignore_case = True)
 		self.resource = resource
 		self.method_string = method_string
 		self.params = params
 
 		self.update_timestamp()
 		self.sign()
 
 	def update_timestamp(self):
 		if self.headers.has_key("date"):
 			del(self.headers["date"])
 		self.headers["x-amz-date"] = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime())
 
 	def format_param_str(self):
 		"""
 		Format URL parameters from self.params and return
 		"?param1=val1&param2=val2" or an empty string if there
 		are no parameters. The output of this function should
 		be appended directly to self.resource['uri']
 		"""
 		param_str = ""
 		for param in self.params:
 			if self.params[param] not in (None, ""):
 				param_str += "&%s=%s" % (param, self.params[param])
 			else:
 				param_str += "&%s" % param
 		return param_str and "?" + param_str[1:]
 
 	def sign(self):
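 		## Build the canonical string-to-sign: HTTP method, content-md5,
 		## content-type and date headers, then the x-amz-* headers and
 		## the bucket/object resource, passed to sign_string() for signing.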
 		h  = self.method_string + "\n"
 		h += self.headers.get("content-md5", "")+"\n"
 		h += self.headers.get("content-type", "")+"\n"
 		h += self.headers.get("date", "")+"\n"
 		for header in self.headers.keys():
 			if header.startswith("x-amz-"):
 				h += header+":"+str(self.headers[header])+"\n"
 		if self.resource['bucket']:
 			h += "/" + self.resource['bucket']
 		h += self.resource['uri']
 		debug("SignHeaders: " + repr(h))
 		signature = sign_string(h)
 
 		self.headers["Authorization"] = "AWS "+self.s3.config.access_key+":"+signature
 
 	def get_triplet(self):
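 		## Re-stamp and re-sign before handing out the triplet, so that
 		## retried or redirected requests carry a fresh date and a
 		## matching signature.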
 		self.update_timestamp()
 		self.sign()
 		resource = dict(self.resource)	## take a copy
 		resource['uri'] += self.format_param_str()
 		return (self.method_string, resource, self.headers)
 
 class S3(object):
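 	## Each operation code below is a bitmask combining a target
 	## (SERVICE, BUCKET or OBJECT) with an HTTP method; create_request()
 	## recovers the method name by masking with http_methods["MASK"].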
 	http_methods = BidirMap(
 		GET = 0x01,
 		PUT = 0x02,
 		HEAD = 0x04,
 		DELETE = 0x08,
 		MASK = 0x0F,
 		)
 	
 	targets = BidirMap(
 		SERVICE = 0x0100,
 		BUCKET = 0x0200,
 		OBJECT = 0x0400,
 		MASK = 0x0700,
 		)
 
 	operations = BidirMap(
 		UNDEFINED = 0x0000,
 		LIST_ALL_BUCKETS = targets["SERVICE"] | http_methods["GET"],
 		BUCKET_CREATE = targets["BUCKET"] | http_methods["PUT"],
 		BUCKET_LIST = targets["BUCKET"] | http_methods["GET"],
 		BUCKET_DELETE = targets["BUCKET"] | http_methods["DELETE"],
 		OBJECT_PUT = targets["OBJECT"] | http_methods["PUT"],
 		OBJECT_GET = targets["OBJECT"] | http_methods["GET"],
 		OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"],
 		OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"],
 	)
 
 	codes = {
 		"NoSuchBucket" : "Bucket '%s' does not exist",
 		"AccessDenied" : "Access to bucket '%s' was denied",
 		"BucketAlreadyExists" : "Bucket '%s' already exists",
 		}
 
 	## S3 sometimes sends an HTTP 307 (Temporary Redirect) response;
 	## remember the per-bucket endpoint it redirects us to
 	redir_map = {}
 
 	## Maximum number of times a failed request is re-issued
 	_max_retries = 5
 
 	def __init__(self, config):
 		self.config = config
 
 	def get_connection(self, bucket):
 		if self.config.proxy_host != "":
 			return httplib.HTTPConnection(self.config.proxy_host, self.config.proxy_port)
 		else:
 			if self.config.use_https:
 				return httplib.HTTPSConnection(self.get_hostname(bucket))
 			else:
 				return httplib.HTTPConnection(self.get_hostname(bucket))
 
 	def get_hostname(self, bucket):
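 		## DNS-conformant bucket names are addressed virtual-host style
 		## (honouring any 307 redirect recorded in redir_map); everything
 		## else goes to the configured host_base.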
 		if bucket and check_bucket_name_dns_conformity(bucket):
 			if self.redir_map.has_key(bucket):
 				host = self.redir_map[bucket]
 			else:
 				host = getHostnameFromBucket(bucket)
 		else:
 			host = self.config.host_base
 		debug('get_hostname(%s): %s' % (bucket, host))
 		return host
 
 	def set_hostname(self, bucket, redir_hostname):
 		self.redir_map[bucket] = redir_hostname
 
 	def format_uri(self, resource):
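 		## Buckets whose names are not DNS-conformant are addressed
 		## path-style ("/bucket/object"); when a proxy is used the URI
 		## must be absolute, including the hostname.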
 		if resource['bucket'] and not check_bucket_name_dns_conformity(resource['bucket']):
 			uri = "/%s%s" % (resource['bucket'], resource['uri'])
 		else:
 			uri = resource['uri']
 		if self.config.proxy_host != "":
 			uri = "http://%s%s" % (self.get_hostname(resource['bucket']), uri)
 		debug('format_uri(): ' + uri)
 		return uri
 
 	## Commands / Actions
 	def list_all_buckets(self):
 		request = self.create_request("LIST_ALL_BUCKETS")
 		response = self.send_request(request)
 		response["list"] = getListFromXml(response["data"], "Bucket")
 		return response
 	
 	def bucket_list(self, bucket, prefix = None, recursive = None):
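 		## S3 returns listings in pages: loop on <IsTruncated>, passing
 		## the last key (or common prefix) back as 'marker' until the
 		## whole listing has been collected.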
 		def _list_truncated(data):
 			## <IsTruncated> can either be "true" or "false" or be missing completely
 			is_truncated = getTextFromXml(data, ".//IsTruncated") or "false"
 			return is_truncated.lower() != "false"
 
 		def _get_contents(data):
 			return getListFromXml(data, "Contents")
 
 		def _get_common_prefixes(data):
 			return getListFromXml(data, "CommonPrefixes")
 
 		uri_params = {}
 		truncated = True
 		list = []
 		prefixes = []
 
 		while truncated:
 			response = self.bucket_list_noparse(bucket, prefix, recursive, uri_params)
 			current_list = _get_contents(response["data"])
 			current_prefixes = _get_common_prefixes(response["data"])
 			truncated = _list_truncated(response["data"])
 			if truncated:
 				if current_list:
 					uri_params['marker'] = self.urlencode_string(current_list[-1]["Key"])
 				else:
 					uri_params['marker'] = self.urlencode_string(current_prefixes[-1]["Prefix"])
 				debug("Listing continues after '%s'" % uri_params['marker'])
 
 			list += current_list
 			prefixes += current_prefixes
 
 		response['list'] = list
 		response['common_prefixes'] = prefixes
 		return response
 
 	def bucket_list_noparse(self, bucket, prefix = None, recursive = None, uri_params = {}):
 		if prefix:
 			uri_params['prefix'] = self.urlencode_string(prefix)
 		if not self.config.recursive and not recursive:
 			uri_params['delimiter'] = "/"
 		request = self.create_request("BUCKET_LIST", bucket = bucket, **uri_params)
 		response = self.send_request(request)
 		#debug(response)
 		return response
 
 	def bucket_create(self, bucket, bucket_location = None):
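 		## Buckets outside the default US region need a
 		## <CreateBucketConfiguration> body with a <LocationConstraint>,
 		## and their names must satisfy the stricter DNS rules.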
 		headers = SortedDict(ignore_case = True)
 		body = ""
 		if bucket_location and bucket_location.strip().upper() != "US":
 			bucket_location = bucket_location.strip()
 			if bucket_location.upper() == "EU":
 				bucket_location = bucket_location.upper()
 			else:
 				bucket_location = bucket_location.lower()
 			body  = "<CreateBucketConfiguration><LocationConstraint>"
 			body += bucket_location
 			body += "</LocationConstraint></CreateBucketConfiguration>"
 			debug("bucket_location: " + body)
 			check_bucket_name(bucket, dns_strict = True)
 		else:
 			check_bucket_name(bucket, dns_strict = False)
 		if self.config.acl_public:
 			headers["x-amz-acl"] = "public-read"
 		request = self.create_request("BUCKET_CREATE", bucket = bucket, headers = headers)
 		response = self.send_request(request, body)
 		return response
 
 	def bucket_delete(self, bucket):
 		request = self.create_request("BUCKET_DELETE", bucket = bucket)
 		response = self.send_request(request)
 		return response
 
 	def bucket_info(self, uri):
 		request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?location")
 		response = self.send_request(request)
 		response['bucket-location'] = getTextFromXml(response['data'], "LocationConstraint") or "any"
 		return response
 
 	def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
 		# TODO TODO
 		# Make it consistent with stream-oriented object_get()
 		if uri.type != "s3":
 			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
 
 		if not os.path.isfile(filename):
 			raise InvalidFileError(u"%s is not a regular file" % unicodise(filename))
 		try:
 			file = open(filename, "rb")
 			size = os.stat(filename)[ST_SIZE]
 		except IOError, e:
 			raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror))
 		headers = SortedDict(ignore_case = True)
 		if extra_headers:
 			headers.update(extra_headers)
 		headers["content-length"] = size
 		content_type = None
 		if self.config.guess_mime_type:
 			content_type = mimetypes.guess_type(filename)[0]
 		if not content_type:
 			content_type = self.config.default_mime_type
 		debug("Content-Type set to '%s'" % content_type)
 		headers["content-type"] = content_type
 		if self.config.acl_public:
 			headers["x-amz-acl"] = "public-read"
 		if self.config.reduced_redundancy:
 			headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
 		request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
 		labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
 		response = self.send_file(request, file, labels)
 		return response
 
 	def object_get(self, uri, stream, start_position = 0, extra_label = ""):
 		if uri.type != "s3":
 			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
 		request = self.create_request("OBJECT_GET", uri = uri)
 		labels = { 'source' : unicodise(uri.uri()), 'destination' : unicodise(stream.name), 'extra' : extra_label }
 		response = self.recv_file(request, stream, labels, start_position)
 		return response
 
 	def object_delete(self, uri):
 		if uri.type != "s3":
 			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
 		request = self.create_request("OBJECT_DELETE", uri = uri)
 		response = self.send_request(request)
 		return response
 
 	def object_copy(self, src_uri, dst_uri, extra_headers = None):
 		if src_uri.type != "s3":
 			raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
 		if dst_uri.type != "s3":
 			raise ValueError("Expected URI type 's3', got '%s'" % dst_uri.type)
 		headers = SortedDict(ignore_case = True)
 		headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(), self.urlencode_string(src_uri.object()))
 		## TODO: For now COPY, later maybe add a switch?
 		headers['x-amz-metadata-directive'] = "COPY"
 		if self.config.acl_public:
 			headers["x-amz-acl"] = "public-read"
 		if self.config.reduced_redundancy:
 			headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
 		# if extra_headers:
 		# 	headers.update(extra_headers)
 		request = self.create_request("OBJECT_PUT", uri = dst_uri, headers = headers)
 		response = self.send_request(request)
 		return response
 
 	def object_move(self, src_uri, dst_uri, extra_headers = None):
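 		## Move is implemented as copy-then-delete; the source is only
 		## deleted once S3 confirms the copy with a <CopyObjectResult>.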
 		response_copy = self.object_copy(src_uri, dst_uri, extra_headers)
 		debug("Object %s copied to %s" % (src_uri, dst_uri))
 		if getRootTagName(response_copy["data"]) == "CopyObjectResult":
 			response_delete = self.object_delete(src_uri)
 			debug("Object %s deleted" % src_uri)
 		return response_copy
 
 	def object_info(self, uri):
 		request = self.create_request("OBJECT_HEAD", uri = uri)
 		response = self.send_request(request)
 		return response
 
 	def get_acl(self, uri):
 		if uri.has_object():
 			request = self.create_request("OBJECT_GET", uri = uri, extra = "?acl")
 		else:
 			request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?acl")
 
 		response = self.send_request(request)
 		acl = ACL(response['data'])
 		return acl
 
 	def set_acl(self, uri, acl):
 		if uri.has_object():
 			request = self.create_request("OBJECT_PUT", uri = uri, extra = "?acl")
 		else:
 			request = self.create_request("BUCKET_CREATE", bucket = uri.bucket(), extra = "?acl")
 
 		body = str(acl)
 		debug(u"set_acl(%s): acl-xml: %s" % (uri, body))
 		response = self.send_request(request, body)
 		return response
 
 	def get_accesslog(self, uri):
 		request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?logging")
 		response = self.send_request(request)
 		accesslog = AccessLog(response['data'])
 		return accesslog
 
 	def set_accesslog_acl(self, uri):
 		acl = self.get_acl(uri)
 		debug("Current ACL(%s): %s" % (uri.uri(), str(acl)))
 		acl.appendGrantee(GranteeLogDelivery("READ_ACP"))
 		acl.appendGrantee(GranteeLogDelivery("WRITE"))
 		debug("Updated ACL(%s): %s" % (uri.uri(), str(acl)))
 		self.set_acl(uri, acl)
 
 	def set_accesslog(self, uri, enable, log_target_prefix_uri = None, acl_public = False):
 		request = self.create_request("BUCKET_CREATE", bucket = uri.bucket(), extra = "?logging")
 		accesslog = AccessLog()
 		if enable:
 			accesslog.enableLogging(log_target_prefix_uri)
 			accesslog.setAclPublic(acl_public)
 		else:
 			accesslog.disableLogging()
 		body = str(accesslog)
 		debug(u"set_accesslog(%s): accesslog-xml: %s" % (uri, body))
 		try:
 			response = self.send_request(request, body)
 		except S3Error, e:
 			if e.info['Code'] == "InvalidTargetBucketForLogging":
 				info("Setting up log-delivery ACL for target bucket.")
 				self.set_accesslog_acl(S3Uri("s3://%s" % log_target_prefix_uri.bucket()))
 				response = self.send_request(request, body)
 			else:
 				raise
 		return accesslog, response
 
 	## Low level methods
 	def urlencode_string(self, string, urlencoding_mode = None):
 		if type(string) == unicode:
 			string = string.encode("utf-8")
 
 		if urlencoding_mode is None:
 			urlencoding_mode = self.config.urlencoding_mode
 
 		if urlencoding_mode == "verbatim":
 			## Don't do any pre-processing
 			return string
 
 		encoded = ""
 		## List of characters that must be escaped for S3
 		## Haven't found this in any official docs
 		## but my tests show it's more or less correct.
 		## If you start getting InvalidSignature errors
 		## from S3, check the error headers returned
 		## by S3 to see whether the list has changed.
 		for c in string:	# I'm not sure how to know what encoding
 					# 'string' is in. Apparently "type(string)==str"
 					# but the contents are UTF-8 encoded bytes,
 					# e.g. '\xc4\x8d\xc5\xafr\xc3\xa1k'.
 					# Don't know what it will do on non-utf8
 					# systems.
 					#           [hope that sounds reassuring ;-)]
 			o = ord(c)
 			if (o < 0x20 or o == 0x7f):
 				if urlencoding_mode == "fixbucket":
 					encoded += "%%%02X" % o
 				else:
 					error(u"Non-printable character 0x%02x in: %s" % (o, string))
 					error(u"Please report it to s3tools-bugs@lists.sourceforge.net")
 					encoded += replace_nonprintables(c)
 			elif (o == 0x20 or	# Space
 			    o == 0x22 or	# "
 			    o == 0x23 or	# #
 			    o == 0x25 or	# % (escape character)
 			    o == 0x26 or	# &
 			    o == 0x2B or	# + (or it would become <space>)
 			    o == 0x3C or	# <
 			    o == 0x3E or	# >
 			    o == 0x3F or	# ?
 			    o == 0x60 or	# `
 			    o >= 123):   	# { and above, including >= 128 for UTF-8
 				encoded += "%%%02X" % o
 			else:
 				encoded += c
 		debug("String '%s' encoded to '%s'" % (string, encoded))
 		return encoded
 
 	def create_request(self, operation, uri = None, bucket = None, object = None, headers = None, extra = None, **params):
 		resource = { 'bucket' : None, 'uri' : "/" }
 
 		if uri and (bucket or object):
 			raise ValueError("Both 'uri' and either 'bucket' or 'object' parameters supplied")
 		## If URI is given use that instead of bucket/object parameters
 		if uri:
 			bucket = uri.bucket()
 			object = uri.has_object() and uri.object() or None
 
 		if bucket:
 			resource['bucket'] = str(bucket)
 			if object:
 				resource['uri'] = "/" + self.urlencode_string(object)
 		if extra:
 			resource['uri'] += extra
 
 		method_string = S3.http_methods.getkey(S3.operations[operation] & S3.http_methods["MASK"])
 
 		request = S3Request(self, method_string, resource, headers, params)
 
 		debug("CreateRequest: resource[uri]=" + resource['uri'])
 		return request
 	
 	def _fail_wait(self, retries):
 		# Wait a few seconds. The more it fails the more we wait:
 		# 3 seconds after the first failure, 15 before the last retry.
 		return (self._max_retries - retries + 1) * 3
 		
 	def send_request(self, request, body = None, retries = _max_retries):
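 		## Send the request, following 307 redirects and retrying both
 		## connection errors and 5xx responses with an increasing
 		## back-off (see _fail_wait() above).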
 		method_string, resource, headers = request.get_triplet()
 		debug("Processing request, please wait...")
 		if not headers.has_key('content-length'):
 			headers['content-length'] = body and len(body) or 0
 		try:
 			conn = self.get_connection(resource['bucket'])
 			conn.request(method_string, self.format_uri(resource), body, headers)
 			response = {}
 			http_response = conn.getresponse()
 			response["status"] = http_response.status
 			response["reason"] = http_response.reason
 			response["headers"] = convertTupleListToDict(http_response.getheaders())
 			response["data"] =  http_response.read()
 			debug("Response: " + str(response))
 			conn.close()
 		except Exception, e:
 			if retries:
 				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				return self.send_request(request, body, retries - 1)
 			else:
 				raise S3RequestError("Request failed for: %s" % resource['uri'])
 
 		if response["status"] == 307:
 			## RedirectPermanent
 			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
 			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
 			self.set_hostname(redir_bucket, redir_hostname)
 			warning("Redirected to: %s" % (redir_hostname))
 			return self.send_request(request, body)
 
 		if response["status"] >= 500:
 			e = S3Error(response)
 			if retries:
 				warning(u"Retrying failed request: %s" % resource['uri'])
 				warning(unicode(e))
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				return self.send_request(request, body, retries - 1)
 			else:
 				raise e
 
 		if response["status"] < 200 or response["status"] > 299:
 			raise S3Error(response)
 
 		return response
 
 	def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
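 		## Upload with retry: connection errors and retriable S3 errors
 		## re-issue the upload; failures during the transfer also raise
 		## 'throttle' (a per-chunk sleep) so the retry runs more slowly.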
 		method_string, resource, headers = request.get_triplet()
 		size_left = size_total = headers.get("content-length")
 		if self.config.progress_meter:
 			progress = self.config.progress_class(labels, size_total)
 		else:
 			info("Sending file '%s', please wait..." % file.name)
 		timestamp_start = time.time()
 		try:
 			conn = self.get_connection(resource['bucket'])
 			conn.connect()
 			conn.putrequest(method_string, self.format_uri(resource))
 			for header in headers.keys():
 				conn.putheader(header, str(headers[header]))
 			conn.endheaders()
 		except Exception, e:
 			if self.config.progress_meter:
 				progress.done("failed")
 			if retries:
 				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				# Connection error -> same throttle value
 				return self.send_file(request, file, labels, throttle, retries - 1)
 			else:
 				raise S3UploadError("Upload failed for: %s" % resource['uri'])
 		file.seek(0)
 		md5_hash = md5()
 		try:
 			while (size_left > 0):
 				#debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
 				data = file.read(self.config.send_chunk)
 				md5_hash.update(data)
 				conn.send(data)
 				if self.config.progress_meter:
 					progress.update(delta_position = len(data))
 				size_left -= len(data)
 				if throttle:
 					time.sleep(throttle)
 			md5_computed = md5_hash.hexdigest()
 			response = {}
 			http_response = conn.getresponse()
 			response["status"] = http_response.status
 			response["reason"] = http_response.reason
 			response["headers"] = convertTupleListToDict(http_response.getheaders())
 			response["data"] = http_response.read()
 			response["size"] = size_total
 			conn.close()
 			debug(u"Response: %s" % response)
 		except Exception, e:
 			if self.config.progress_meter:
 				progress.done("failed")
 			if retries:
 				if retries < self._max_retries:
 					throttle = throttle and throttle * 5 or 0.01
 				warning("Upload failed: %s (%s)" % (resource['uri'], e))
 				warning("Retrying on lower speed (throttle=%0.2f)" % throttle)
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				# Connection error -> same throttle value
 				return self.send_file(request, file, labels, throttle, retries - 1)
 			else:
 				debug("Giving up on '%s' %s" % (file.name, e))
 				raise S3UploadError("Upload failed for: %s" % resource['uri'])
 
 		timestamp_end = time.time()
 		response["elapsed"] = timestamp_end - timestamp_start
 		response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
 
 		if self.config.progress_meter:
 			## The above conn.close() takes some time -> update() progress meter
 			## to correct the average speed. Otherwise people will complain that 
 			## 'progress' and response["speed"] are inconsistent ;-)
 			progress.update()
 			progress.done("done")
 
 		if response["status"] == 307:
 			## RedirectPermanent
 			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
 			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
 			self.set_hostname(redir_bucket, redir_hostname)
 			warning("Redirected to: %s" % (redir_hostname))
 			return self.send_file(request, file, labels)
 
 		# S3 from time to time doesn't send ETag back in a response :-(
 		# Force re-upload here.
 		if not response['headers'].has_key('etag'):
 			response['headers']['etag'] = '' 
 
 		if response["status"] < 200 or response["status"] > 299:
 			try_retry = False
 			if response["status"] >= 500:
 				## AWS internal error - retry
 				try_retry = True
 			elif response["status"] >= 400:
 				err = S3Error(response)
 				## Retriable client error?
 				if err.code in [ 'BadDigest', 'OperationAborted', 'TokenRefreshRequired', 'RequestTimeout' ]:
 					try_retry = True
 
 			if try_retry:
 				if retries:
 					warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
 					warning("Waiting %d sec..." % self._fail_wait(retries))
 					time.sleep(self._fail_wait(retries))
 					return self.send_file(request, file, labels, throttle, retries - 1)
 				else:
 					warning("Too many failures. Giving up on '%s'" % (file.name))
 					raise S3UploadError
 
 			## Non-recoverable error
 			raise S3Error(response)
 
 		debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"]["etag"]))
 		if response["headers"]["etag"].strip('"\'') != md5_hash.hexdigest():
 			warning("MD5 Sums don't match!")
 			if retries:
 				warning("Retrying upload of %s" % (file.name))
 				return self.send_file(request, file, labels, throttle, retries - 1)
 			else:
 				warning("Too many failures. Giving up on '%s'" % (file.name))
 				raise S3UploadError
 
 		return response
 
 	def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
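 		## Download with resume support: a non-zero start_position turns
 		## into an HTTP Range request, and the on-the-fly MD5 check is
 		## skipped because hashing a partial stream would be meaningless.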
 		method_string, resource, headers = request.get_triplet()
 		if self.config.progress_meter:
 			progress = self.config.progress_class(labels, 0)
 		else:
 			info("Receiving file '%s', please wait..." % stream.name)
 		timestamp_start = time.time()
 		try:
 			conn = self.get_connection(resource['bucket'])
 			conn.connect()
 			conn.putrequest(method_string, self.format_uri(resource))
 			for header in headers.keys():
 				conn.putheader(header, str(headers[header]))
 			if start_position > 0:
 				debug("Requesting Range: %d .. end" % start_position)
 				conn.putheader("Range", "bytes=%d-" % start_position)
 			conn.endheaders()
 			response = {}
 			http_response = conn.getresponse()
 			response["status"] = http_response.status
 			response["reason"] = http_response.reason
 			response["headers"] = convertTupleListToDict(http_response.getheaders())
 			debug("Response: %s" % response)
 		except Exception, e:
 			if self.config.progress_meter:
 				progress.done("failed")
 			if retries:
 				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				# Connection error -> same throttle value
 				return self.recv_file(request, stream, labels, start_position, retries - 1)
 			else:
 				raise S3DownloadError("Download failed for: %s" % resource['uri'])
 
 		if response["status"] == 307:
 			## RedirectPermanent
 			response['data'] = http_response.read()
 			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
 			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
 			self.set_hostname(redir_bucket, redir_hostname)
 			warning("Redirected to: %s" % (redir_hostname))
 			return self.recv_file(request, stream, labels)
 
 		if response["status"] < 200 or response["status"] > 299:
 			raise S3Error(response)
 
 		if start_position == 0:
 			# Only compute MD5 on the fly if we're downloading from beginning
 			# Otherwise we'd get nonsense.
 			md5_hash = md5()
 		size_left = int(response["headers"]["content-length"])
 		size_total = start_position + size_left
 		current_position = start_position
 
 		if self.config.progress_meter:
 			progress.total_size = size_total
 			progress.initial_position = current_position
 			progress.current_position = current_position
 
 		try:
 			while (current_position < size_total):
 				this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
 				data = http_response.read(this_chunk)
 				stream.write(data)
 				if start_position == 0:
 					md5_hash.update(data)
 				current_position += len(data)
 				## Call progress meter from here...
 				if self.config.progress_meter:
 					progress.update(delta_position = len(data))
 			conn.close()
 		except Exception, e:
 			if self.config.progress_meter:
 				progress.done("failed")
 			if retries:
 				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				# Connection error -> same throttle value
 				return self.recv_file(request, stream, labels, current_position, retries - 1)
 			else:
 				raise S3DownloadError("Download failed for: %s" % resource['uri'])
 
 		stream.flush()
 		timestamp_end = time.time()
 
 		if self.config.progress_meter:
 			## The above stream.flush() may take some time -> update() progress meter
 			## to correct the average speed. Otherwise people will complain that 
 			## 'progress' and response["speed"] are inconsistent ;-)
 			progress.update()
 			progress.done("done")
 
 		if start_position == 0:
 			# Only compute MD5 on the fly if we were downloading from the beginning
 			response["md5"] = md5_hash.hexdigest()
 		else:
 			# Otherwise try to compute MD5 of the output file
 			try:
 				response["md5"] = hash_file_md5(stream.name)
 			except IOError, e:
 				if e.errno != errno.ENOENT:
 					warning("Unable to open file: %s: %s" % (stream.name, e))
 				warning("Unable to verify MD5. Assume it matches.")
 				response["md5"] = response["headers"]["etag"]
 
 		response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0
 		response["elapsed"] = timestamp_end - timestamp_start
 		response["size"] = current_position
 		response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
 		if response["size"] != start_position + long(response["headers"]["content-length"]):
 			warning("Reported size (%s) does not match received size (%s)" % (
 				start_position + long(response["headers"]["content-length"]), response["size"]))
 		debug("ReceiveFile: Computed MD5 = %s" % response["md5"])
 		if not response["md5match"]:
 			warning("MD5 signatures do not match: computed=%s, received=%s" % (
 				response["md5"], response["headers"]["etag"]))
 		return response
 __all__.append("S3")