S3/S3.py
 ## Amazon S3 manager
 ## Author: Michal Ludvig <michal@logix.cz>
 ##         http://www.logix.cz/michal
 ## License: GPL Version 2
 
 import sys
 import os, os.path
 import re
 import time
 import errno
 import httplib
 import logging
 import mimetypes
 from logging import debug, info, warning, error
 from stat import ST_SIZE
 
 try:
 	from hashlib import md5
 except ImportError:
 	from md5 import md5
 
 from Utils import *
 from SortedDict import SortedDict
 from BidirMap import BidirMap
 from Config import Config
 from Exceptions import *
 from ACL import ACL
 
 class S3(object):
 	http_methods = BidirMap(
 		GET = 0x01,
 		PUT = 0x02,
 		HEAD = 0x04,
 		DELETE = 0x08,
 		MASK = 0x0F,
 		)
 	
 	targets = BidirMap(
 		SERVICE = 0x0100,
 		BUCKET = 0x0200,
 		OBJECT = 0x0400,
 		MASK = 0x0700,
 		)
 
 	operations = BidirMap(
 		UNDEFINED = 0x0000,
 		LIST_ALL_BUCKETS = targets["SERVICE"] | http_methods["GET"],
 		BUCKET_CREATE = targets["BUCKET"] | http_methods["PUT"],
 		BUCKET_LIST = targets["BUCKET"] | http_methods["GET"],
 		BUCKET_DELETE = targets["BUCKET"] | http_methods["DELETE"],
 		OBJECT_PUT = targets["OBJECT"] | http_methods["PUT"],
 		OBJECT_GET = targets["OBJECT"] | http_methods["GET"],
 		OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"],
 		OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"],
 	)
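 
 	## A quick sketch of how these bitmasks are used below (see
 	## create_request()): the HTTP verb is recovered by masking the
 	## operation code and looking the result up in the BidirMap, e.g.
 	##   S3.http_methods.getkey(S3.operations["OBJECT_PUT"] & S3.http_methods["MASK"])
 	## evaluates to "PUT".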
 
 	codes = {
 		"NoSuchBucket" : "Bucket '%s' does not exist",
 		"AccessDenied" : "Access to bucket '%s' was denied",
 		"BucketAlreadyExists" : "Bucket '%s' already exists",
 		}
 
 	## S3 sometimes sends an HTTP-307 Temporary Redirect response pointing
 	## to a region-specific endpoint. Cache the bucket -> hostname mapping
 	## here so that subsequent requests go directly to the right host.
 	redir_map = {}
 
 	## Maximum number of attempts to re-issue a failed request
 	_max_retries = 5
 
f81e7fba
 	def __init__(self, config):
 		self.config = config
 
 	def get_connection(self, bucket):
 		if self.config.proxy_host != "":
 			return httplib.HTTPConnection(self.config.proxy_host, self.config.proxy_port)
 		else:
 			if self.config.use_https:
 				return httplib.HTTPSConnection(self.get_hostname(bucket))
 			else:
 				return httplib.HTTPConnection(self.get_hostname(bucket))
 
 	def get_hostname(self, bucket):
 		if bucket and self.check_bucket_name_dns_conformity(bucket):
 			if self.redir_map.has_key(bucket):
 				host = self.redir_map[bucket]
 			else:
 				host = self.config.host_bucket % { 'bucket' : bucket }
 		else:
 			host = self.config.host_base
 		debug('get_hostname(%s): %s' % (bucket, host))
 		return host
 
 	def set_hostname(self, bucket, redir_hostname):
 		self.redir_map[bucket] = redir_hostname
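 
 	## A minimal sketch of the logic above, assuming the usual Config
 	## templates (host_base = "s3.amazonaws.com" and
 	## host_bucket = "%(bucket)s.s3.amazonaws.com" are assumptions here):
 	##   get_hostname("my-bucket") -> "my-bucket.s3.amazonaws.com"
 	##   get_hostname("My_Bucket") -> "s3.amazonaws.com"   (name not DNS-conforming)
 	## After set_hostname("my-bucket", endpoint) from a 307 redirect,
 	## get_hostname("my-bucket") returns that endpoint instead.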
 
 	def format_uri(self, resource):
 		if resource['bucket'] and not self.check_bucket_name_dns_conformity(resource['bucket']):
 			uri = "/%s%s" % (resource['bucket'], resource['uri'])
 		else:
 			uri = resource['uri']
 		if self.config.proxy_host != "":
 			uri = "http://%s%s" % (self.get_hostname(resource['bucket']), uri)
 		debug('format_uri(): ' + uri)
 		return uri
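 
 	## Sketch: for a DNS-conforming bucket the key alone forms the path
 	## (the bucket lives in the Host header), otherwise the bucket is
 	## prepended path-style:
 	##   format_uri({'bucket': 'my-bucket', 'uri': '/key.txt'}) -> "/key.txt"
 	##   format_uri({'bucket': 'My_Bucket', 'uri': '/key.txt'}) -> "/My_Bucket/key.txt"
 	## With a proxy configured the result becomes an absolute URL.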
 
 	## Commands / Actions
 	def list_all_buckets(self):
 		request = self.create_request("LIST_ALL_BUCKETS")
 		response = self.send_request(request)
 		response["list"] = getListFromXml(response["data"], "Bucket")
 		return response
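 
 	## Usage sketch (assumes a Config populated with valid credentials;
 	## each entry of "list" typically carries "Name" and "CreationDate"):
 	##   s3 = S3(Config())
 	##   names = [b["Name"] for b in s3.list_all_buckets()["list"]]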
 	
 	def bucket_list(self, bucket, prefix = None, recursive = None):
 		def _list_truncated(data):
 			## <IsTruncated> can either be "true" or "false" or be missing completely
 			is_truncated = getTextFromXml(data, ".//IsTruncated") or "false"
 			return is_truncated.lower() != "false"
 
 		def _get_contents(data):
 			return getListFromXml(data, "Contents")
 
 		def _get_common_prefixes(data):
 			return getListFromXml(data, "CommonPrefixes")
 
 		uri_params = {}
 		if prefix:
 			uri_params['prefix'] = self.urlencode_string(prefix)
 		if not self.config.recursive and not recursive:
 			uri_params['delimiter'] = "/"
 		request = self.create_request("BUCKET_LIST", bucket = bucket, **uri_params)
 		response = self.send_request(request)
 		#debug(response)
 		list = _get_contents(response["data"])
 		prefixes = _get_common_prefixes(response["data"])
 		while _list_truncated(response["data"]):
 			uri_params['marker'] = self.urlencode_string(list[-1]["Key"])
 			debug("Listing continues after '%s'" % uri_params['marker'])
 			request = self.create_request("BUCKET_LIST", bucket = bucket, **uri_params)
 			response = self.send_request(request)
 			list += _get_contents(response["data"])
 			prefixes += _get_common_prefixes(response["data"])
 		response['list'] = list
 		response['common_prefixes'] = prefixes
 		return response
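 
 	## Sketch: with recursion off a 'delimiter=/' is sent, so S3 rolls
 	## sub-"directories" up into CommonPrefixes (bucket and keys here
 	## are illustrative; each prefix entry typically carries "Prefix"):
 	##   r = s3.bucket_list("my-bucket", prefix = "photos/")
 	##   [o["Key"] for o in r["list"]]                # objects directly under the prefix
 	##   [p["Prefix"] for p in r["common_prefixes"]]  # e.g. photos/2008/, photos/2009/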
 
 	def bucket_create(self, bucket, bucket_location = None):
 		headers = SortedDict()
 		body = ""
 		if bucket_location and bucket_location.strip().upper() != "US":
 			body  = "<CreateBucketConfiguration><LocationConstraint>"
 			body += bucket_location.strip().upper()
 			body += "</LocationConstraint></CreateBucketConfiguration>"
 			debug("bucket_location: " + body)
 			self.check_bucket_name(bucket, dns_strict = True)
 		else:
 			self.check_bucket_name(bucket, dns_strict = False)
 		if self.config.acl_public:
 			headers["x-amz-acl"] = "public-read"
 		request = self.create_request("BUCKET_CREATE", bucket = bucket, headers = headers)
 		response = self.send_request(request, body)
 		return response
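 
 	## Sketch: bucket_create("my-bucket", "EU") PUTs the body
 	##   <CreateBucketConfiguration><LocationConstraint>EU</LocationConstraint></CreateBucketConfiguration>
 	## whereas the default US location sends an empty body; note that a
 	## non-default location requires a DNS-strict bucket name.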
 
 	def bucket_delete(self, bucket):
 		request = self.create_request("BUCKET_DELETE", bucket = bucket)
 		response = self.send_request(request)
 		return response
 
 	def bucket_info(self, uri):
 		request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?location")
 		response = self.send_request(request)
 		response['bucket-location'] = getTextFromXml(response['data'], "LocationConstraint") or "any"
 		return response
 
 	def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
 		# TODO: make this consistent with the stream-oriented object_get()
 		if uri.type != "s3":
 			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
 
 		if not os.path.isfile(filename):
 			raise InvalidFileError(u"%s is not a regular file" % unicodise(filename))
 		try:
 			file = open(filename, "rb")
 			size = os.stat(filename)[ST_SIZE]
 		except IOError, e:
 			raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror))
 		headers = SortedDict()
 		if extra_headers:
 			headers.update(extra_headers)
 		headers["content-length"] = size
 		content_type = None
 		if self.config.guess_mime_type:
 			content_type = mimetypes.guess_type(filename)[0]
 		if not content_type:
 			content_type = self.config.default_mime_type
 		debug("Content-Type set to '%s'" % content_type)
 		headers["content-type"] = content_type
 		if self.config.acl_public:
 			headers["x-amz-acl"] = "public-read"
 		request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
 		labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
 		response = self.send_file(request, file, labels)
 		return response
 
 	def object_get(self, uri, stream, start_position = 0, extra_label = ""):
 		if uri.type != "s3":
 			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
 		request = self.create_request("OBJECT_GET", uri = uri)
 		labels = { 'source' : unicodise(uri.uri()), 'destination' : unicodise(stream.name), 'extra' : extra_label }
 		response = self.recv_file(request, stream, labels, start_position)
 		return response
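 
 	## Usage sketch (assumes a configured Config and the project's S3Uri
 	## parser -- both live outside this file):
 	##   s3 = S3(Config())
 	##   s3.object_put("photo.jpg", S3Uri("s3://my-bucket/photos/photo.jpg"))
 	##   s3.object_get(S3Uri("s3://my-bucket/photos/photo.jpg"), open("photo.jpg", "wb"))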
 
 	def object_delete(self, uri):
 		if uri.type != "s3":
 			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
 		request = self.create_request("OBJECT_DELETE", uri = uri)
 		response = self.send_request(request)
 		return response
 
 	def object_copy(self, src_uri, dst_uri, extra_headers = None):
 		if src_uri.type != "s3":
 			raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
 		if dst_uri.type != "s3":
 			raise ValueError("Expected URI type 's3', got '%s'" % dst_uri.type)
 		headers = SortedDict()
 		headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(), self.urlencode_string(src_uri.object()))
 		if self.config.acl_public:
 			headers["x-amz-acl"] = "public-read"
 		if extra_headers:
 			headers.update(extra_headers)
 		request = self.create_request("OBJECT_PUT", uri = dst_uri, headers = headers)
 		response = self.send_request(request)
 		return response
 
 	def object_move(self, src_uri, dst_uri, extra_headers = None):
 		response_copy = self.object_copy(src_uri, dst_uri, extra_headers)
 		debug("Object %s copied to %s" % (src_uri, dst_uri))
 		if getRootTagName(response_copy["data"]) == "CopyObjectResult":
 			response_delete = self.object_delete(src_uri)
 			debug("Object %s deleted" % src_uri)
 		return response_copy
 
 	def object_info(self, uri):
 		request = self.create_request("OBJECT_HEAD", uri = uri)
 		response = self.send_request(request)
 		return response
 
 	def get_acl(self, uri):
 		if uri.has_object():
 			request = self.create_request("OBJECT_GET", uri = uri, extra = "?acl")
 		else:
 			request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?acl")
 
 		response = self.send_request(request)
 		acl = ACL(response['data'])
 		return acl
 
 	def set_acl(self, uri, acl):
 		if uri.has_object():
 			request = self.create_request("OBJECT_PUT", uri = uri, extra = "?acl")
 		else:
 			request = self.create_request("BUCKET_CREATE", bucket = uri.bucket(), extra = "?acl")
 
 		body = str(acl)
 		debug(u"set_acl(%s): acl-xml: %s" % (uri, body))
 		response = self.send_request(request, body)
 		return response
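 
 	## Round-trip sketch: get_acl() returns an ACL object parsed from the
 	## AccessControlPolicy XML and set_acl() serialises one back via str(),
 	## so an unmodified re-upload looks like:
 	##   acl = s3.get_acl(uri)
 	##   s3.set_acl(uri, acl)
 	## (modifying the ACL in between depends on the ACL class API, which
 	## lives outside this file)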
 
 	## Low level methods
 	def urlencode_string(self, string):
 		if type(string) == unicode:
 			string = string.encode("utf-8")
 		encoded = ""
 		## List of characters that must be escaped for S3.
 		## I haven't found this in any official docs, but my tests
 		## show it's more or less correct. If you start getting
 		## InvalidSignature errors from S3, check the error headers
 		## returned by S3 to see whether the list has changed.
 		##
 		## I'm not sure what encoding 'string' is in at this point.
 		## Apparently type(string) == str, but the contents are
 		## UTF-8 encoded bytes, e.g. '\xc4\x8d\xc5\xafr\xc3\xa1k'.
 		## No idea what it will do on non-UTF-8 systems.
 		##           [hope that sounds reassuring ;-)]
 		for c in string:
 			o = ord(c)
 			if (o <= 32 or		# Space and below
 			    o == 0x22 or	# "
 			    o == 0x23 or	# #
 			    o == 0x25 or	# %
 			    o == 0x2B or	# + (or it would become <space>)
 			    o == 0x3C or	# <
 			    o == 0x3E or	# >
 			    o == 0x3F or	# ?
 			    o == 0x5B or	# [
 			    o == 0x5C or	# \
 			    o == 0x5D or	# ]
 			    o == 0x5E or	# ^
 			    o == 0x60 or	# `
 			    o >= 123):   	# { and above, including >= 128 for UTF-8
 				encoded += "%%%02X" % o
 			else:
 				encoded += c
 		debug("String '%s' encoded to '%s'" % (string, encoded))
 		return encoded
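 
 	## Worked example: urlencode_string("my file+name.txt") returns
 	## "my%20file%2Bname.txt" -- space (0x20) and '+' (0x2B) are escaped,
 	## while '.', '-' and letters pass through unchanged.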
 
 	def create_request(self, operation, uri = None, bucket = None, object = None, headers = None, extra = None, **params):
 		resource = { 'bucket' : None, 'uri' : "/" }
 
 		if uri and (bucket or object):
 			raise ValueError("Both 'uri' and either 'bucket' or 'object' parameters supplied")
 		## If a URI is given, use it instead of the bucket/object parameters
 		if uri:
 			bucket = uri.bucket()
 			object = uri.has_object() and uri.object() or None
 
 		if bucket:
 			resource['bucket'] = str(bucket)
 			if object:
 				resource['uri'] = "/" + self.urlencode_string(object)
 		if extra:
 			resource['uri'] += extra
 
 		if not headers:
 			headers = SortedDict()
 
 		debug("headers: %s" % headers)
 
 		if headers.has_key("date"):
 			if not headers.has_key("x-amz-date"):
 				headers["x-amz-date"] = headers["date"]
 			del(headers["date"])
 
 		if not headers.has_key("x-amz-date"):
 			headers["x-amz-date"] = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime())
 
 		method_string = S3.http_methods.getkey(S3.operations[operation] & S3.http_methods["MASK"])
 		signature = self.sign_headers(method_string, resource, headers)
 		headers["Authorization"] = "AWS "+self.config.access_key+":"+signature
 		param_str = ""
 		for param in params:
 			if params[param] not in (None, ""):
 				param_str += "&%s=%s" % (param, params[param])
 			else:
 				param_str += "&%s" % param
 		if param_str != "":
 			resource['uri'] += "?" + param_str[1:]
 		debug("CreateRequest: resource[uri]=" + resource['uri'])
 		return (method_string, resource, headers)
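 
 	## Sketch of the returned triple for a simple listing (values here
 	## are illustrative):
 	##   create_request("BUCKET_LIST", bucket = "my-bucket", prefix = "photos/")
 	##   -> ("GET",
 	##       {'bucket': 'my-bucket', 'uri': '/?prefix=photos/'},
 	##       {'x-amz-date': ..., 'Authorization': 'AWS <access_key>:<signature>'})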
 	
 	def _fail_wait(self, retries):
 		# Wait a few seconds. The more it fails the more we wait.
 		return (self._max_retries - retries + 1) * 3
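 
 	## With _max_retries = 5 and 'retries' counting down from 5, the
 	## successive waits are 3, 6, 9, 12 and 15 seconds.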
 		
 	def send_request(self, request, body = None, retries = _max_retries):
 		method_string, resource, headers = request
 		debug("Processing request, please wait...")
 		if not headers.has_key('content-length'):
 			headers['content-length'] = body and len(body) or 0
 		try:
 			conn = self.get_connection(resource['bucket'])
 			conn.request(method_string, self.format_uri(resource), body, headers)
 			response = {}
 			http_response = conn.getresponse()
 			response["status"] = http_response.status
 			response["reason"] = http_response.reason
 			response["headers"] = convertTupleListToDict(http_response.getheaders())
 			response["data"] = http_response.read()
 			debug("Response: " + str(response))
 			conn.close()
 		except Exception, e:
 			if retries:
 				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				return self.send_request(request, body, retries - 1)
 			else:
 				raise S3RequestError("Request failed for: %s" % resource['uri'])
 
 		if response["status"] == 307:
 			## Temporary redirect: remember the new endpoint and re-send
 			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
 			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
 			self.set_hostname(redir_bucket, redir_hostname)
 			warning("Redirected to: %s" % (redir_hostname))
 			return self.send_request(request, body)
 
 		if response["status"] >= 500:
 			e = S3Error(response)
 			if retries:
 				warning(u"Retrying failed request: %s" % resource['uri'])
 				warning(unicode(e))
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				return self.send_request(request, body, retries - 1)
 			else:
 				raise e
 
 		if response["status"] < 200 or response["status"] > 299:
 			raise S3Error(response)
 
 		return response
 
 	def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
 		method_string, resource, headers = request
 		size_left = size_total = headers.get("content-length")
 		if self.config.progress_meter:
 			progress = self.config.progress_class(labels, size_total)
 		else:
 			info("Sending file '%s', please wait..." % file.name)
 		timestamp_start = time.time()
 		try:
 			conn = self.get_connection(resource['bucket'])
 			conn.connect()
 			conn.putrequest(method_string, self.format_uri(resource))
 			for header in headers.keys():
 				conn.putheader(header, str(headers[header]))
 			conn.endheaders()
 		except Exception, e:
 			if self.config.progress_meter:
 				progress.done("failed")
 			if retries:
 				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				# Connection error -> retry with the same throttle value
 				return self.send_file(request, file, labels, throttle, retries - 1)
 			else:
 				raise S3UploadError("Upload failed for: %s" % resource['uri'])
 		file.seek(0)
 		md5_hash = md5()
 		try:
 			while (size_left > 0):
 				#debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
 				data = file.read(self.config.send_chunk)
 				md5_hash.update(data)
 				conn.send(data)
 				if self.config.progress_meter:
 					progress.update(delta_position = len(data))
 				size_left -= len(data)
 				if throttle:
 					time.sleep(throttle)
 			md5_computed = md5_hash.hexdigest()
 			response = {}
 			http_response = conn.getresponse()
 			response["status"] = http_response.status
 			response["reason"] = http_response.reason
 			response["headers"] = convertTupleListToDict(http_response.getheaders())
 			response["data"] = http_response.read()
 			response["size"] = size_total
 			conn.close()
 			debug(u"Response: %s" % response)
 		except Exception, e:
 			if self.config.progress_meter:
 				progress.done("failed")
 			if retries:
 				throttle = throttle and throttle * 5 or 0.01
 				warning("Upload failed: %s (%s)" % (resource['uri'], e))
 				warning("Retrying on lower speed (throttle=%0.2f)" % throttle)
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				# Send error -> retry with the increased throttle value
 				return self.send_file(request, file, labels, throttle, retries - 1)
 			else:
 				debug("Giving up on '%s' %s" % (file.name, e))
 				raise S3UploadError("Upload failed for: %s" % resource['uri'])
 
 		timestamp_end = time.time()
 		response["elapsed"] = timestamp_end - timestamp_start
 		response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
 
 		if self.config.progress_meter:
 			## The above conn.close() takes some time -> update() progress meter
 			## to correct the average speed. Otherwise people will complain that
 			## 'progress' and response["speed"] are inconsistent ;-)
 			progress.update()
 			progress.done("done")
 
 		if response["status"] == 307:
 			## Temporary redirect: remember the new endpoint and re-send
 			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
 			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
 			self.set_hostname(redir_bucket, redir_hostname)
 			warning("Redirected to: %s" % (redir_hostname))
 			return self.send_file(request, file, labels)
 
 		# S3 from time to time doesn't send the ETag back in a response :-(
 		# Force a re-upload here.
 		if not response['headers'].has_key('etag'):
 			response['headers']['etag'] = ''
 
 		if response["status"] < 200 or response["status"] > 299:
 			if response["status"] >= 500:
 				## AWS internal error - retry
 				if retries:
 					warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
 					warning("Waiting %d sec..." % self._fail_wait(retries))
 					time.sleep(self._fail_wait(retries))
 					return self.send_file(request, file, labels, throttle, retries - 1)
 				else:
 					warning("Too many failures. Giving up on '%s'" % (file.name))
 					raise S3UploadError
 			## Non-recoverable error
 			raise S3Error(response)
 
 		debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"]["etag"]))
 		if response["headers"]["etag"].strip('"\'') != md5_hash.hexdigest():
 			warning("MD5 sums don't match!")
 			if retries:
 				warning("Retrying upload of %s" % (file.name))
 				return self.send_file(request, file, labels, throttle, retries - 1)
 			else:
 				warning("Too many failures. Giving up on '%s'" % (file.name))
 				raise S3UploadError
 
 		return response
 
 	def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
 		method_string, resource, headers = request
 		if self.config.progress_meter:
 			progress = self.config.progress_class(labels, 0)
 		else:
 			info("Receiving file '%s', please wait..." % stream.name)
 		timestamp_start = time.time()
 		try:
 			conn = self.get_connection(resource['bucket'])
 			conn.connect()
 			conn.putrequest(method_string, self.format_uri(resource))
 			for header in headers.keys():
 				conn.putheader(header, str(headers[header]))
 			if start_position > 0:
 				debug("Requesting Range: %d .. end" % start_position)
 				conn.putheader("Range", "bytes=%d-" % start_position)
 			conn.endheaders()
 			response = {}
 			http_response = conn.getresponse()
 			response["status"] = http_response.status
 			response["reason"] = http_response.reason
 			response["headers"] = convertTupleListToDict(http_response.getheaders())
 			debug("Response: %s" % response)
 		except Exception, e:
 			if self.config.progress_meter:
 				progress.done("failed")
 			if retries:
 				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				# Connection error -> retry from the same position
 				return self.recv_file(request, stream, labels, start_position, retries - 1)
 			else:
 				raise S3DownloadError("Download failed for: %s" % resource['uri'])
 
 		if response["status"] == 307:
 			## Temporary redirect: remember the new endpoint and re-request
 			response['data'] = http_response.read()
 			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
 			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
 			self.set_hostname(redir_bucket, redir_hostname)
 			warning("Redirected to: %s" % (redir_hostname))
 			return self.recv_file(request, stream, labels)
 
 		if response["status"] < 200 or response["status"] > 299:
 			raise S3Error(response)
 
 		if start_position == 0:
 			# Only compute MD5 on the fly if we're downloading from the beginning.
 			# Otherwise the checksum would be nonsense.
 			md5_hash = md5()
 		size_left = int(response["headers"]["content-length"])
 		size_total = start_position + size_left
 		current_position = start_position
 
 		if self.config.progress_meter:
 			progress.total_size = size_total
 			progress.initial_position = current_position
 			progress.current_position = current_position
 
 		try:
 			while (current_position < size_total):
 				this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
 				data = http_response.read(this_chunk)
 				stream.write(data)
 				if start_position == 0:
 					md5_hash.update(data)
 				current_position += len(data)
 				## Call progress meter from here...
 				if self.config.progress_meter:
 					progress.update(delta_position = len(data))
 			conn.close()
 		except Exception, e:
 			if self.config.progress_meter:
 				progress.done("failed")
 			if retries:
 				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
 				warning("Waiting %d sec..." % self._fail_wait(retries))
 				time.sleep(self._fail_wait(retries))
 				# Connection error -> resume from the current position
 				return self.recv_file(request, stream, labels, current_position, retries - 1)
 			else:
 				raise S3DownloadError("Download failed for: %s" % resource['uri'])
 
 		stream.flush()
 		timestamp_end = time.time()
 
 		if self.config.progress_meter:
 			## The above stream.flush() may take some time -> update() progress meter
 			## to correct the average speed. Otherwise people will complain that
 			## 'progress' and response["speed"] are inconsistent ;-)
 			progress.update()
 			progress.done("done")
 
 		if start_position == 0:
 			# Only compute MD5 on the fly if we were downloading from the beginning
 			response["md5"] = md5_hash.hexdigest()
 		else:
 			# Otherwise try to compute MD5 of the output file
 			try:
 				response["md5"] = hash_file_md5(stream.name)
 			except IOError, e:
 				if e.errno != errno.ENOENT:
 					warning("Unable to open file: %s: %s" % (stream.name, e))
 				warning("Unable to verify MD5. Assume it matches.")
 				response["md5"] = response["headers"]["etag"]
 
 		response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0
 		response["elapsed"] = timestamp_end - timestamp_start
 		response["size"] = current_position
 		response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
 		if response["size"] != start_position + long(response["headers"]["content-length"]):
 			warning("Reported size (%s) does not match received size (%s)" % (
 				start_position + long(response["headers"]["content-length"]), response["size"]))
 		debug("ReceiveFile: Computed MD5 = %s" % response["md5"])
 		if not response["md5match"]:
 			warning("MD5 signatures do not match: computed=%s, received=%s" % (
 				response["md5"], response["headers"]["etag"]))
 		return response
 
 	def sign_headers(self, method, resource, headers):
 		h  = method+"\n"
 		h += headers.get("content-md5", "")+"\n"
 		h += headers.get("content-type", "")+"\n"
 		h += headers.get("date", "")+"\n"
 		for header in headers.keys():
 			if header.startswith("x-amz-"):
 				h += header+":"+str(headers[header])+"\n"
 		if resource['bucket']:
 			h += "/" + resource['bucket']
 		h += resource['uri']
 		debug("SignHeaders: " + repr(h))
 		return sign_string(h)
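 
 	## The string-to-sign for a simple GET thus looks like (illustrative):
 	##   "GET\n\n\n\nx-amz-date:Thu, 01 Jan 2009 00:00:00 +0000\n/my-bucket/key.txt"
 	## i.e. the verb, then content-md5, content-type and date lines (all
 	## three empty here since the date lives in x-amz-date), then the
 	## x-amz-* headers and the canonicalised resource.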
 
 	@staticmethod
 	def check_bucket_name(bucket, dns_strict = True):
 		if dns_strict:
 			invalid = re.search("([^a-z0-9\.-])", bucket)
 			if invalid:
 				raise ParameterError("Bucket name '%s' contains disallowed character '%s'. The only supported ones are: lowercase us-ascii letters (a-z), digits (0-9), dot (.) and hyphen (-)." % (bucket, invalid.groups()[0]))
 		else:
 			invalid = re.search("([^A-Za-z0-9\._-])", bucket)
 			if invalid:
 				raise ParameterError("Bucket name '%s' contains disallowed character '%s'. The only supported ones are: us-ascii letters (a-z, A-Z), digits (0-9), dot (.), hyphen (-) and underscore (_)." % (bucket, invalid.groups()[0]))
 
 		if len(bucket) < 3:
 			raise ParameterError("Bucket name '%s' is too short (min 3 characters)" % bucket)
 		if len(bucket) > 255:
 			raise ParameterError("Bucket name '%s' is too long (max 255 characters)" % bucket)
 		if dns_strict:
 			if len(bucket) > 63:
 				raise ParameterError("Bucket name '%s' is too long (max 63 characters)" % bucket)
 			if re.search("-\.", bucket):
 				raise ParameterError("Bucket name '%s' must not contain sequence '-.' for DNS compatibility" % bucket)
 			if re.search("\.\.", bucket):
 				raise ParameterError("Bucket name '%s' must not contain sequence '..' for DNS compatibility" % bucket)
 			if not re.search("^[0-9a-z]", bucket):
 				raise ParameterError("Bucket name '%s' must start with a letter or a digit" % bucket)
 			if not re.search("[0-9a-z]$", bucket):
 				raise ParameterError("Bucket name '%s' must end with a letter or a digit" % bucket)
 		return True
 
 	@staticmethod
 	def check_bucket_name_dns_conformity(bucket):
 		try:
 			return S3.check_bucket_name(bucket, dns_strict = True)
 		except ParameterError:
 			return False
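 
 	## Examples: check_bucket_name_dns_conformity("my-bucket") -> True,
 	## while "My_Bucket" (uppercase/underscore), "ab" (too short) and
 	## "bad..name" (sequence '..') all return False.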