## Amazon S3 manager
## Author: Michal Ludvig <michal@logix.cz>
##         http://www.logix.cz/michal
## License: GPL Version 2

import sys
import os, os.path
import re
import errno
import base64
import time
import httplib
import logging
import mimetypes
from logging import debug, info, warning, error
from stat import ST_SIZE

try:
	## hashlib is available from Python 2.5 onwards
	from hashlib import md5, sha1
except ImportError:
	## fall back to the deprecated md5 and sha modules
	from md5 import md5
	import sha as sha1
import hmac

from Utils import *
from SortedDict import SortedDict
from BidirMap import BidirMap
from Config import Config
from Exceptions import *
from ACL import ACL
 
class S3(object):
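	"""Thin client for the Amazon S3 REST API.

	Builds signed requests for bucket and object operations and issues
	them over httplib, with retry, throttling and redirect handling.
	"""
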
	http_methods = BidirMap(
		GET = 0x01,
		PUT = 0x02,
		HEAD = 0x04,
		DELETE = 0x08,
		MASK = 0x0F,
		)

	targets = BidirMap(
		SERVICE = 0x0100,
		BUCKET = 0x0200,
		OBJECT = 0x0400,
		MASK = 0x0700,
		)

	operations = BidirMap(
		UNDEFINED = 0x0000,
		LIST_ALL_BUCKETS = targets["SERVICE"] | http_methods["GET"],
		BUCKET_CREATE = targets["BUCKET"] | http_methods["PUT"],
		BUCKET_LIST = targets["BUCKET"] | http_methods["GET"],
		BUCKET_DELETE = targets["BUCKET"] | http_methods["DELETE"],
		OBJECT_PUT = targets["OBJECT"] | http_methods["PUT"],
		OBJECT_GET = targets["OBJECT"] | http_methods["GET"],
		OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"],
		OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"],
	)
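
	## Example: operations["OBJECT_PUT"] == 0x0402, i.e. the OBJECT target
	## bit (0x0400) OR-ed with the PUT method bit (0x02). Masking with
	## http_methods["MASK"] or targets["MASK"] recovers either half.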
 
	codes = {
		"NoSuchBucket" : "Bucket '%s' does not exist",
		"AccessDenied" : "Access to bucket '%s' was denied",
		"BucketAlreadyExists" : "Bucket '%s' already exists",
		}

	## S3 sometimes replies with an HTTP-307 Temporary Redirect pointing
	## at a different endpoint. Remember those per-bucket redirections
	## here so that follow-up requests go straight to the right host.
	redir_map = {}

	## Maximum number of attempts at re-issuing failed requests
	_max_retries = 5

	def __init__(self, config):
		self.config = config

	def get_connection(self, bucket):
		if self.config.proxy_host != "":
			return httplib.HTTPConnection(self.config.proxy_host, self.config.proxy_port)
		else:
			if self.config.use_https:
				return httplib.HTTPSConnection(self.get_hostname(bucket))
			else:
				return httplib.HTTPConnection(self.get_hostname(bucket))

	def get_hostname(self, bucket):
		if bucket and self.check_bucket_name_dns_conformity(bucket):
			if self.redir_map.has_key(bucket):
				host = self.redir_map[bucket]
			else:
				host = self.config.host_bucket % { 'bucket' : bucket }
		else:
			host = self.config.host_base
		debug('get_hostname(%s): %s' % (bucket, host))
		return host
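
	## Illustration (assuming the usual templates, e.g.
	## host_bucket = "%(bucket)s.s3.amazonaws.com"):
	##   get_hostname("mybucket")  -> "mybucket.s3.amazonaws.com"
	##   get_hostname("My_Bucket") -> host_base (name not DNS-compatible)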
 
	def set_hostname(self, bucket, redir_hostname):
		self.redir_map[bucket] = redir_hostname

	def format_uri(self, resource):
		if resource['bucket'] and not self.check_bucket_name_dns_conformity(resource['bucket']):
			## Bucket name can't be part of the hostname - use path-style addressing
			uri = "/%s%s" % (resource['bucket'], resource['uri'])
		else:
			uri = resource['uri']
		if self.config.proxy_host != "":
			uri = "http://%s%s" % (self.get_hostname(resource['bucket']), uri)
		debug('format_uri(): ' + uri)
		return uri

	## Commands / Actions
	def list_all_buckets(self):
		request = self.create_request("LIST_ALL_BUCKETS")
		response = self.send_request(request)
		response["list"] = getListFromXml(response["data"], "Bucket")
		return response

	def bucket_list(self, bucket, prefix = None, recursive = None):
		def _list_truncated(data):
			## <IsTruncated> can either be "true" or "false" or be missing completely
			is_truncated = getTextFromXml(data, ".//IsTruncated") or "false"
			return is_truncated.lower() != "false"

		def _get_contents(data):
			return getListFromXml(data, "Contents")

		def _get_common_prefixes(data):
			return getListFromXml(data, "CommonPrefixes")

		uri_params = {}
		if prefix:
			uri_params['prefix'] = self.urlencode_string(prefix)
		if not self.config.recursive and not recursive:
			uri_params['delimiter'] = "/"
		request = self.create_request("BUCKET_LIST", bucket = bucket, **uri_params)
		response = self.send_request(request)
		#debug(response)
		object_list = _get_contents(response["data"])
		prefixes = _get_common_prefixes(response["data"])
		while _list_truncated(response["data"]):
			## Keep fetching batches until <IsTruncated> goes away,
			## restarting each listing after the last key seen so far.
			uri_params['marker'] = self.urlencode_string(object_list[-1]["Key"])
			debug("Listing continues after '%s'" % uri_params['marker'])
			request = self.create_request("BUCKET_LIST", bucket = bucket, **uri_params)
			response = self.send_request(request)
			object_list += _get_contents(response["data"])
			prefixes += _get_common_prefixes(response["data"])
		response['list'] = object_list
		response['common_prefixes'] = prefixes
		return response

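	## Typical call (sketch; key names follow S3's ListBucketResult XML):
	##   response = s3.bucket_list("mybucket", prefix = "photos/")
	##   for entry in response["list"]:
	##       print entry["Key"]
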
	def bucket_create(self, bucket, bucket_location = None):
		headers = SortedDict()
		body = ""
		if bucket_location and bucket_location.strip().upper() != "US":
			body  = "<CreateBucketConfiguration><LocationConstraint>"
			body += bucket_location.strip().upper()
			body += "</LocationConstraint></CreateBucketConfiguration>"
			debug("bucket_location: " + body)
			self.check_bucket_name(bucket, dns_strict = True)
		else:
			self.check_bucket_name(bucket, dns_strict = False)
		headers["content-length"] = len(body)
		if self.config.acl_public:
			headers["x-amz-acl"] = "public-read"
		request = self.create_request("BUCKET_CREATE", bucket = bucket, headers = headers)
		response = self.send_request(request, body)
		return response

	def bucket_delete(self, bucket):
		request = self.create_request("BUCKET_DELETE", bucket = bucket)
		response = self.send_request(request)
		return response

	def bucket_info(self, uri):
		request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?location")
		response = self.send_request(request)
		response['bucket-location'] = getTextFromXml(response['data'], "LocationConstraint") or "any"
		return response

	def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
		# TODO: make this consistent with the stream-oriented object_get()
		if uri.type != "s3":
			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)

		if not os.path.isfile(filename):
			raise InvalidFileError(u"%s is not a regular file" % unicodise(filename))
		try:
			file = open(filename, "rb")
			size = os.stat(filename)[ST_SIZE]
		except IOError, e:
			raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror))
		headers = SortedDict()
		if extra_headers:
			headers.update(extra_headers)
		headers["content-length"] = size
		content_type = None
		if self.config.guess_mime_type:
			content_type = mimetypes.guess_type(filename)[0]
		if not content_type:
			content_type = self.config.default_mime_type
		debug("Content-Type set to '%s'" % content_type)
		headers["content-type"] = content_type
		if self.config.acl_public:
			headers["x-amz-acl"] = "public-read"
		request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
		labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
		response = self.send_file(request, file, labels)
		return response

	def object_get(self, uri, stream, start_position = 0, extra_label = ""):
		if uri.type != "s3":
			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
		request = self.create_request("OBJECT_GET", uri = uri)
		labels = { 'source' : unicodise(uri.uri()), 'destination' : unicodise(stream.name), 'extra' : extra_label }
		response = self.recv_file(request, stream, labels, start_position)
		return response
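
	## Transfer sketch (names illustrative; 'uri' is a parsed s3:// URI
	## object, e.g. from S3Uri, and 'stream' any writable file object):
	##   s3.object_put("/tmp/file.txt", uri)
	##   s3.object_get(uri, open("/tmp/file.txt", "wb"))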
 
	def object_delete(self, uri):
		if uri.type != "s3":
			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
		request = self.create_request("OBJECT_DELETE", uri = uri)
		response = self.send_request(request)
		return response

	def object_copy(self, src_uri, dst_uri, extra_headers = None):
		if src_uri.type != "s3":
			raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
		if dst_uri.type != "s3":
			raise ValueError("Expected URI type 's3', got '%s'" % dst_uri.type)
		headers = SortedDict()
		headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(), self.urlencode_string(src_uri.object()))
		if self.config.acl_public:
			headers["x-amz-acl"] = "public-read"
		if extra_headers:
			headers.update(extra_headers)
		request = self.create_request("OBJECT_PUT", uri = dst_uri, headers = headers)
		response = self.send_request(request)
		return response

	def object_move(self, src_uri, dst_uri, extra_headers = None):
		response_copy = self.object_copy(src_uri, dst_uri, extra_headers)
		debug("Object %s copied to %s" % (src_uri, dst_uri))
		if getRootTagName(response_copy["data"]) == "CopyObjectResult":
			## Only delete the source once the copy is confirmed
			response_delete = self.object_delete(src_uri)
			debug("Object %s deleted" % src_uri)
		return response_copy

	def object_info(self, uri):
		request = self.create_request("OBJECT_HEAD", uri = uri)
		response = self.send_request(request)
		return response

	def get_acl(self, uri):
		if uri.has_object():
			request = self.create_request("OBJECT_GET", uri = uri, extra = "?acl")
		else:
			request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?acl")

		response = self.send_request(request)
		acl = ACL(response['data'])
		return acl

	def set_acl(self, uri, acl):
		if uri.has_object():
			request = self.create_request("OBJECT_PUT", uri = uri, extra = "?acl")
		else:
			request = self.create_request("BUCKET_CREATE", bucket = uri.bucket(), extra = "?acl")

		body = str(acl)
		debug(u"set_acl(%s): acl-xml: %s" % (uri, body))
		response = self.send_request(request, body)
		return response

	## Low level methods
	def urlencode_string(self, string):
		if type(string) == unicode:
			string = string.encode("utf-8")
		encoded = ""
		## List of characters that must be escaped for S3.
		## I haven't found this list in any official docs,
		## but my tests show it's more or less correct.
		## If you start getting InvalidSignature errors
		## from S3, check the error headers it returns
		## to see whether the list has changed.
		for c in string:
			# The string's encoding is uncertain at this point:
			# type(string) == str, but the contents may be raw UTF-8
			# bytes, e.g. '\xc4\x8d\xc5\xafr\xc3\xa1k'. Behaviour on
			# non-UTF-8 systems is untested.
			o = ord(c)
			if (o <= 32 or		# Space and below
			    o == 0x22 or	# "
			    o == 0x23 or	# #
			    o == 0x25 or	# %
			    o == 0x2B or	# + (or it would become <space>)
			    o == 0x3C or	# <
			    o == 0x3E or	# >
			    o == 0x3F or	# ?
			    o == 0x5B or	# [
			    o == 0x5C or	# \
			    o == 0x5D or	# ]
			    o == 0x5E or	# ^
			    o == 0x60 or	# `
			    o >= 123):   	# { and above, including >= 128 for UTF-8
				encoded += "%%%02X" % o
			else:
				encoded += c
		debug("String '%s' encoded to '%s'" % (string, encoded))
		return encoded

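	## Example (illustrative): urlencode_string("dir/my file+x")
	## returns "dir/my%20file%2Bx" - note that '/' passes through unescaped.
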
	def create_request(self, operation, uri = None, bucket = None, object = None, headers = None, extra = None, **params):
		resource = { 'bucket' : None, 'uri' : "/" }

		if uri and (bucket or object):
			raise ValueError("Both 'uri' and either 'bucket' or 'object' parameters supplied")
		## If URI is given use that instead of bucket/object parameters
		if uri:
			bucket = uri.bucket()
			object = uri.has_object() and uri.object() or None

		if bucket:
			resource['bucket'] = str(bucket)
			if object:
				resource['uri'] = "/" + self.urlencode_string(object)
		if extra:
			resource['uri'] += extra

		if not headers:
			headers = SortedDict()

		if headers.has_key("date"):
			if not headers.has_key("x-amz-date"):
				headers["x-amz-date"] = headers["date"]
			del(headers["date"])

		if not headers.has_key("x-amz-date"):
			headers["x-amz-date"] = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime())

		method_string = S3.http_methods.getkey(S3.operations[operation] & S3.http_methods["MASK"])
		signature = self.sign_headers(method_string, resource, headers)
		headers["Authorization"] = "AWS " + self.config.access_key + ":" + signature
		param_str = ""
		for param in params:
			if params[param] not in (None, ""):
				param_str += "&%s=%s" % (param, params[param])
			else:
				param_str += "&%s" % param
		if param_str != "":
			resource['uri'] += "?" + param_str[1:]
		debug("CreateRequest: resource[uri]=" + resource['uri'])
		return (method_string, resource, headers)

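	## The returned triple is consumed by send_request(), send_file() and
	## recv_file(). Illustrative shape: ("GET", {'bucket': "mybucket",
	## 'uri': "/?prefix=photos%2F"}, headers incl. x-amz-date and
	## Authorization).
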
	def _fail_wait(self, retries):
		# Wait a few seconds. The more attempts have failed, the longer we wait.
		return (self._max_retries - retries + 1) * 3

	def send_request(self, request, body = None, retries = _max_retries):
		method_string, resource, headers = request
		debug("Processing request, please wait...")
		try:
			conn = self.get_connection(resource['bucket'])
			conn.request(method_string, self.format_uri(resource), body, headers)
			response = {}
			http_response = conn.getresponse()
			response["status"] = http_response.status
			response["reason"] = http_response.reason
			response["headers"] = convertTupleListToDict(http_response.getheaders())
			response["data"] = http_response.read()
			debug("Response: " + str(response))
			conn.close()
		except Exception, e:
			if retries:
				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				return self.send_request(request, body, retries - 1)
			else:
				raise S3RequestError("Request failed for: %s" % resource['uri'])

		if response["status"] == 307:
			## S3 TemporaryRedirect - retry against the new endpoint
			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
			self.set_hostname(redir_bucket, redir_hostname)
			warning("Redirected to: %s" % (redir_hostname))
			return self.send_request(request, body)

		if response["status"] >= 500:
			e = S3Error(response)
			if retries:
				warning(u"Retrying failed request: %s" % resource['uri'])
				warning(unicode(e))
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				return self.send_request(request, body, retries - 1)
			else:
				raise e

		if response["status"] < 200 or response["status"] > 299:
			raise S3Error(response)

		return response

	def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
		method_string, resource, headers = request
		size_left = size_total = headers.get("content-length")
		if self.config.progress_meter:
			progress = self.config.progress_class(labels, size_total)
		else:
			info("Sending file '%s', please wait..." % file.name)
		timestamp_start = time.time()
		try:
			conn = self.get_connection(resource['bucket'])
			conn.connect()
			conn.putrequest(method_string, self.format_uri(resource))
			for header in headers.keys():
				conn.putheader(header, str(headers[header]))
			conn.endheaders()
		except Exception, e:
			if self.config.progress_meter:
				progress.done("failed")
			if retries:
				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				# Connection error -> retry with the same throttle value
				return self.send_file(request, file, labels, throttle, retries - 1)
			else:
				raise S3UploadError("Upload failed for: %s" % resource['uri'])
		file.seek(0)
		md5_hash = md5()
		try:
			while (size_left > 0):
				#debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
				data = file.read(self.config.send_chunk)
				md5_hash.update(data)
				conn.send(data)
				if self.config.progress_meter:
					progress.update(delta_position = len(data))
				size_left -= len(data)
				if throttle:
					time.sleep(throttle)
			md5_computed = md5_hash.hexdigest()
			response = {}
			http_response = conn.getresponse()
			response["status"] = http_response.status
			response["reason"] = http_response.reason
			response["headers"] = convertTupleListToDict(http_response.getheaders())
			response["data"] = http_response.read()
			response["size"] = size_total
			conn.close()
			debug(u"Response: %s" % response)
		except Exception, e:
			if self.config.progress_meter:
				progress.done("failed")
			if retries:
				throttle = throttle and throttle * 5 or 0.01
				warning("Upload failed: %s (%s)" % (resource['uri'], e))
				warning("Retrying on lower speed (throttle=%0.2f)" % throttle)
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				# Data-transfer error -> retry with an increased throttle value
				return self.send_file(request, file, labels, throttle, retries - 1)
			else:
				debug("Giving up on '%s' %s" % (file.name, e))
				raise S3UploadError("Upload failed for: %s" % resource['uri'])

		timestamp_end = time.time()
		response["elapsed"] = timestamp_end - timestamp_start
		response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)

		if self.config.progress_meter:
			## The above conn.close() takes some time -> update() progress meter
			## to correct the average speed. Otherwise people will complain that
			## 'progress' and response["speed"] are inconsistent ;-)
			progress.update()
			progress.done("done")

		if response["status"] == 307:
			## S3 TemporaryRedirect - restart the upload against the new endpoint
			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
			self.set_hostname(redir_bucket, redir_hostname)
			warning("Redirected to: %s" % (redir_hostname))
			return self.send_file(request, file, labels)

		# S3 from time to time doesn't send ETag back in a response :-(
		# Force re-upload here.
		if not response['headers'].has_key('etag'):
			response['headers']['etag'] = ''

		if response["status"] < 200 or response["status"] > 299:
			if response["status"] >= 500:
				## AWS internal error - retry
				if retries:
					warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
					warning("Waiting %d sec..." % self._fail_wait(retries))
					time.sleep(self._fail_wait(retries))
					return self.send_file(request, file, labels, throttle, retries - 1)
				else:
					warning("Too many failures. Giving up on '%s'" % (file.name))
					raise S3UploadError
			## Non-recoverable error
			raise S3Error(response)

		debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"]["etag"]))
		if response["headers"]["etag"].strip('"\'') != md5_computed:
			warning("MD5 Sums don't match!")
			if retries:
				warning("Retrying upload of %s" % (file.name))
				return self.send_file(request, file, labels, throttle, retries - 1)
			else:
				warning("Too many failures. Giving up on '%s'" % (file.name))
				raise S3UploadError

		return response

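	## Note: send_file() streams the body in config.send_chunk slices and
	## MD5s the bytes as they go out; the digest is compared against the
	## ETag in the response and the upload is retried on a mismatch.
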
	def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
		method_string, resource, headers = request
		if self.config.progress_meter:
			progress = self.config.progress_class(labels, 0)
		else:
			info("Receiving file '%s', please wait..." % stream.name)
		timestamp_start = time.time()
		try:
			conn = self.get_connection(resource['bucket'])
			conn.connect()
			conn.putrequest(method_string, self.format_uri(resource))
			for header in headers.keys():
				conn.putheader(header, str(headers[header]))
			if start_position > 0:
				debug("Requesting Range: %d .. end" % start_position)
				conn.putheader("Range", "bytes=%d-" % start_position)
			conn.endheaders()
			response = {}
			http_response = conn.getresponse()
			response["status"] = http_response.status
			response["reason"] = http_response.reason
			response["headers"] = convertTupleListToDict(http_response.getheaders())
			debug("Response: %s" % response)
		except Exception, e:
			if self.config.progress_meter:
				progress.done("failed")
			if retries:
				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				# Connection error -> retry with the same parameters
				return self.recv_file(request, stream, labels, start_position, retries - 1)
			else:
				raise S3DownloadError("Download failed for: %s" % resource['uri'])

		if response["status"] == 307:
			## S3 TemporaryRedirect - restart the download against the new endpoint
			response['data'] = http_response.read()
			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
			self.set_hostname(redir_bucket, redir_hostname)
			warning("Redirected to: %s" % (redir_hostname))
			return self.recv_file(request, stream, labels)

		if response["status"] < 200 or response["status"] > 299:
			raise S3Error(response)

		if start_position == 0:
			# Only compute MD5 on the fly if we're downloading from the
			# beginning - a digest started mid-file would be meaningless.
			md5_hash = md5()
		size_left = int(response["headers"]["content-length"])
		size_total = start_position + size_left
		current_position = start_position

		if self.config.progress_meter:
			progress.total_size = size_total
			progress.initial_position = current_position
			progress.current_position = current_position

		try:
			while (current_position < size_total):
				this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
				data = http_response.read(this_chunk)
				stream.write(data)
				if start_position == 0:
					md5_hash.update(data)
				current_position += len(data)
				## Call progress meter from here...
				if self.config.progress_meter:
					progress.update(delta_position = len(data))
			conn.close()
		except Exception, e:
			if self.config.progress_meter:
				progress.done("failed")
			if retries:
				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				# Resume from where the download was interrupted
				return self.recv_file(request, stream, labels, current_position, retries - 1)
			else:
				raise S3DownloadError("Download failed for: %s" % resource['uri'])

		stream.flush()
		timestamp_end = time.time()

		if self.config.progress_meter:
			## The above stream.flush() may take some time -> update() progress meter
			## to correct the average speed. Otherwise people will complain that
			## 'progress' and response["speed"] are inconsistent ;-)
			progress.update()
			progress.done("done")

		if start_position == 0:
			# Only compute MD5 on the fly if we were downloading from the beginning
			response["md5"] = md5_hash.hexdigest()
		else:
			# Otherwise try to compute MD5 of the output file
			try:
				response["md5"] = hash_file_md5(stream.name)
			except IOError, e:
				if e.errno != errno.ENOENT:
					warning("Unable to open file: %s: %s" % (stream.name, e))
				warning("Unable to verify MD5. Assume it matches.")
				response["md5"] = response["headers"]["etag"]

		response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0
		response["elapsed"] = timestamp_end - timestamp_start
		response["size"] = current_position
		response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
		if response["size"] != start_position + long(response["headers"]["content-length"]):
			warning("Reported size (%s) does not match received size (%s)" % (
				start_position + long(response["headers"]["content-length"]), response["size"]))
		debug("ReceiveFile: Computed MD5 = %s" % response["md5"])
		if not response["md5match"]:
			warning("MD5 signatures do not match: computed=%s, received=%s" % (
				response["md5"], response["headers"]["etag"]))
		return response
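
	## Note: with start_position > 0 recv_file() resumes via a
	## "Range: bytes=N-" header, skips the on-the-fly MD5 (a digest
	## started mid-file would be meaningless) and re-hashes the finished
	## file from disk instead.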
 
	def sign_headers(self, method, resource, headers):
		## Build the canonical string-to-sign, then HMAC-SHA1 it
		h  = method + "\n"
		h += headers.get("content-md5", "") + "\n"
		h += headers.get("content-type", "") + "\n"
		h += headers.get("date", "") + "\n"
		for header in headers.keys():
			if header.startswith("x-amz-"):
				h += header + ":" + str(headers[header]) + "\n"
		if resource['bucket']:
			h += "/" + resource['bucket']
		h += resource['uri']
		debug("SignHeaders: " + repr(h))
		return base64.encodestring(hmac.new(self.config.secret_key, h, sha1).digest()).strip()
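
	## Example string-to-sign (illustrative, a GET with one x-amz header):
	##   "GET\n\n\n\nx-amz-date:Tue, 01 Jan 2008 12:00:00 +0000\n/mybucket/object.txt"
	## i.e. method, content-md5, content-type and date lines, the x-amz-*
	## headers, then the bucket-qualified resource path.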
 
	@staticmethod
	def check_bucket_name(bucket, dns_strict = True):
		if dns_strict:
			invalid = re.search("([^a-z0-9\.-])", bucket)
			if invalid:
				raise ParameterError("Bucket name '%s' contains disallowed character '%s'. The only supported ones are: lowercase us-ascii letters (a-z), digits (0-9), dot (.) and hyphen (-)." % (bucket, invalid.groups()[0]))
		else:
			invalid = re.search("([^A-Za-z0-9\._-])", bucket)
			if invalid:
				raise ParameterError("Bucket name '%s' contains disallowed character '%s'. The only supported ones are: us-ascii letters (a-z, A-Z), digits (0-9), dot (.), hyphen (-) and underscore (_)." % (bucket, invalid.groups()[0]))

		if len(bucket) < 3:
			raise ParameterError("Bucket name '%s' is too short (min 3 characters)" % bucket)
		if len(bucket) > 255:
			raise ParameterError("Bucket name '%s' is too long (max 255 characters)" % bucket)
		if dns_strict:
			if len(bucket) > 63:
				raise ParameterError("Bucket name '%s' is too long (max 63 characters)" % bucket)
			if re.search("-\.", bucket):
				raise ParameterError("Bucket name '%s' must not contain sequence '-.' for DNS compatibility" % bucket)
			if re.search("\.\.", bucket):
				raise ParameterError("Bucket name '%s' must not contain sequence '..' for DNS compatibility" % bucket)
			if not re.search("^[0-9a-z]", bucket):
				raise ParameterError("Bucket name '%s' must start with a letter or a digit" % bucket)
			if not re.search("[0-9a-z]$", bucket):
				raise ParameterError("Bucket name '%s' must end with a letter or a digit" % bucket)
		return True
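
	## Examples (illustrative):
	##   check_bucket_name("my.bucket-1")                    -> True
	##   check_bucket_name("My_Bucket")                      -> ParameterError
	##   check_bucket_name("My_Bucket", dns_strict = False)  -> True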
 
	@staticmethod
	def check_bucket_name_dns_conformity(bucket):
		try:
			return S3.check_bucket_name(bucket, dns_strict = True)
		except ParameterError:
			return False
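
## Minimal driving sketch (assumes a Config loaded from the user's s3cmd
## configuration; names are illustrative, error handling omitted):
##
##   from Config import Config
##   from S3 import S3
##   s3 = S3(Config())
##   for bucket in s3.list_all_buckets()["list"]:
##       print bucket["Name"]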