## Amazon S3 manager
## Author: Michal Ludvig <michal@logix.cz>
## http://www.logix.cz/michal
## License: GPL Version 2

import sys
import os, os.path
import re
import errno
import base64
import time
import httplib
import logging
import mimetypes
from logging import debug, info, warning, error
from stat import ST_SIZE

try:
	from hashlib import md5, sha1
except ImportError:
	from md5 import md5
	import sha as sha1

import hmac

from Utils import *
from SortedDict import SortedDict
from BidirMap import BidirMap
from Config import Config
from Exceptions import *
from ACL import ACL

class S3(object):
	http_methods = BidirMap(
		GET = 0x01,
		PUT = 0x02,
		HEAD = 0x04,
		DELETE = 0x08,
		MASK = 0x0F,
		)
	targets = BidirMap(
		SERVICE = 0x0100,
		BUCKET = 0x0200,
		OBJECT = 0x0400,
		MASK = 0x0700,
		)
	operations = BidirMap(
		UNDEFINED = 0x0000,
		LIST_ALL_BUCKETS = targets["SERVICE"] | http_methods["GET"],
		BUCKET_CREATE = targets["BUCKET"] | http_methods["PUT"],
		BUCKET_LIST = targets["BUCKET"] | http_methods["GET"],
		BUCKET_DELETE = targets["BUCKET"] | http_methods["DELETE"],
		OBJECT_PUT = targets["OBJECT"] | http_methods["PUT"],
		OBJECT_GET = targets["OBJECT"] | http_methods["GET"],
		OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"],
		OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"],
		)
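
	## Each operation code combines a target word with an HTTP-method
	## nibble, so either part can be recovered by masking. For example:
	##   OBJECT_PUT = 0x0400 | 0x02 = 0x0402
	##   OBJECT_PUT & http_methods["MASK"] -> 0x02 (PUT)
	##   OBJECT_PUT & targets["MASK"]      -> 0x0400 (OBJECT)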

	codes = {
		"NoSuchBucket" : "Bucket '%s' does not exist",
		"AccessDenied" : "Access to bucket '%s' was denied",
		"BucketAlreadyExists" : "Bucket '%s' already exists",
		}

	## S3 sometimes sends HTTP-307 (Temporary Redirect) responses;
	## remember the redirected hostname for each affected bucket here.
	redir_map = {}

	## Maximum number of attempts to re-issue a failed request
	_max_retries = 5

	def __init__(self, config):
		self.config = config

	def get_connection(self, bucket):
		if self.config.proxy_host != "":
			return httplib.HTTPConnection(self.config.proxy_host, self.config.proxy_port)
		else:
			if self.config.use_https:
				return httplib.HTTPSConnection(self.get_hostname(bucket))
			else:
				return httplib.HTTPConnection(self.get_hostname(bucket))

	def get_hostname(self, bucket):
		if bucket and self.check_bucket_name_dns_conformity(bucket):
			if self.redir_map.has_key(bucket):
				host = self.redir_map[bucket]
			else:
				host = self.config.host_bucket % { 'bucket' : bucket }
		else:
			host = self.config.host_base
		debug('get_hostname(%s): %s' % (bucket, host))
		return host

	def set_hostname(self, bucket, redir_hostname):
		self.redir_map[bucket] = redir_hostname

	def format_uri(self, resource):
		if resource['bucket'] and not self.check_bucket_name_dns_conformity(resource['bucket']):
			uri = "/%s%s" % (resource['bucket'], resource['uri'])
		else:
			uri = resource['uri']
		if self.config.proxy_host != "":
			uri = "http://%s%s" % (self.get_hostname(resource['bucket']), uri)
		debug('format_uri(): ' + uri)
		return uri

	## Commands / Actions

	def list_all_buckets(self):
		request = self.create_request("LIST_ALL_BUCKETS")
		response = self.send_request(request)
		response["list"] = getListFromXml(response["data"], "Bucket")
		return response

	def bucket_list(self, bucket, prefix = None, recursive = None):
		def _list_truncated(data):
			## <IsTruncated> can either be "true" or "false" or be missing completely
			is_truncated = getTextFromXml(data, ".//IsTruncated") or "false"
			return is_truncated.lower() != "false"

		def _get_contents(data):
			return getListFromXml(data, "Contents")

		def _get_common_prefixes(data):
			return getListFromXml(data, "CommonPrefixes")

		uri_params = {}
		if prefix:
			uri_params['prefix'] = self.urlencode_string(prefix)
		if not self.config.recursive and not recursive:
			uri_params['delimiter'] = "/"
		request = self.create_request("BUCKET_LIST", bucket = bucket, **uri_params)
		response = self.send_request(request)
		#debug(response)
		list = _get_contents(response["data"])
		prefixes = _get_common_prefixes(response["data"])
		while _list_truncated(response["data"]):
			uri_params['marker'] = self.urlencode_string(list[-1]["Key"])
			debug("Listing continues after '%s'" % uri_params['marker'])
			request = self.create_request("BUCKET_LIST", bucket = bucket, **uri_params)
			response = self.send_request(request)
			list += _get_contents(response["data"])
			prefixes += _get_common_prefixes(response["data"])
		response['list'] = list
		response['common_prefixes'] = prefixes
		return response
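
	## Illustrative use from calling code (a sketch -- the `cfg` instance
	## and the printed fields are assumptions, not part of this module):
	##   s3 = S3(cfg)
	##   response = s3.bucket_list("mybucket", prefix = "logs/")
	##   for entry in response["list"]:
	##       print entry["Key"]
	## bucket_list() follows <IsTruncated> markers internally, so
	## response["list"] holds the complete result set.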

	def bucket_create(self, bucket, bucket_location = None):
		headers = SortedDict()
		body = ""
		if bucket_location and bucket_location.strip().upper() != "US":
			body  = "<CreateBucketConfiguration><LocationConstraint>"
			body += bucket_location.strip().upper()
			body += "</LocationConstraint></CreateBucketConfiguration>"
			debug("bucket_location: " + body)
			self.check_bucket_name(bucket, dns_strict = True)
		else:
			self.check_bucket_name(bucket, dns_strict = False)
		headers["content-length"] = len(body)
		if self.config.acl_public:
			headers["x-amz-acl"] = "public-read"
		request = self.create_request("BUCKET_CREATE", bucket = bucket, headers = headers)
		response = self.send_request(request, body)
		return response
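
	## For a non-US location the request body built above is, e.g.:
	##   <CreateBucketConfiguration><LocationConstraint>EU</LocationConstraint></CreateBucketConfiguration>
	## Buckets in the default US location are created with an empty body.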

	def bucket_delete(self, bucket):
		request = self.create_request("BUCKET_DELETE", bucket = bucket)
		response = self.send_request(request)
		return response

	def bucket_info(self, uri):
		request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?location")
		response = self.send_request(request)
		response['bucket-location'] = getTextFromXml(response['data'], "LocationConstraint") or "any"
		return response

	def object_put(self, filename, uri, extra_headers = None, extra_label = ""):
		# TODO: make this consistent with the stream-oriented object_get()
		if uri.type != "s3":
			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)

		if not os.path.isfile(filename):
			raise InvalidFileError(u"%s is not a regular file" % unicodise(filename))
		try:
			file = open(filename, "rb")
			size = os.stat(filename)[ST_SIZE]
		except IOError, e:
			raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror))
		headers = SortedDict()
		if extra_headers:
			headers.update(extra_headers)
		headers["content-length"] = size
		content_type = None
		if self.config.guess_mime_type:
			content_type = mimetypes.guess_type(filename)[0]
		if not content_type:
			content_type = self.config.default_mime_type
		debug("Content-Type set to '%s'" % content_type)
		headers["content-type"] = content_type
		if self.config.acl_public:
			headers["x-amz-acl"] = "public-read"
		request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
		labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
		response = self.send_file(request, file, labels)
		return response
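
	## Illustrative call (a sketch; `s3` and `uri` are assumptions --
	## `uri` being the s3:// URI object used throughout this module):
	##   response = s3.object_put("/tmp/report.pdf", uri)
	## The Content-Type is guessed from the filename when
	## config.guess_mime_type is enabled, otherwise
	## config.default_mime_type is used.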

	def object_get(self, uri, stream, start_position = 0, extra_label = ""):
		if uri.type != "s3":
			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
		request = self.create_request("OBJECT_GET", uri = uri)
		labels = { 'source' : unicodise(uri.uri()), 'destination' : unicodise(stream.name), 'extra' : extra_label }
		response = self.recv_file(request, stream, labels, start_position)
		return response

	def object_delete(self, uri):
		if uri.type != "s3":
			raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
		request = self.create_request("OBJECT_DELETE", uri = uri)
		response = self.send_request(request)
		return response

	def object_copy(self, src_uri, dst_uri, extra_headers = None):
		if src_uri.type != "s3":
			raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
		if dst_uri.type != "s3":
			raise ValueError("Expected URI type 's3', got '%s'" % dst_uri.type)
		headers = SortedDict()
		headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(), self.urlencode_string(src_uri.object()))
		if self.config.acl_public:
			headers["x-amz-acl"] = "public-read"
		if extra_headers:
			headers.update(extra_headers)
		request = self.create_request("OBJECT_PUT", uri = dst_uri, headers = headers)
		response = self.send_request(request)
		return response

	def object_move(self, src_uri, dst_uri, extra_headers = None):
		response_copy = self.object_copy(src_uri, dst_uri, extra_headers)
		debug("Object %s copied to %s" % (src_uri, dst_uri))
		if getRootTagName(response_copy["data"]) == "CopyObjectResult":
			response_delete = self.object_delete(src_uri)
			debug("Object %s deleted" % src_uri)
		return response_copy
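
	## Note that object_move() is copy-then-delete, not atomic: the source
	## is only removed after the copy response's root tag confirms
	## <CopyObjectResult>.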

	def object_info(self, uri):
		request = self.create_request("OBJECT_HEAD", uri = uri)
		response = self.send_request(request)
		return response

	def get_acl(self, uri):
		if uri.has_object():
			request = self.create_request("OBJECT_GET", uri = uri, extra = "?acl")
		else:
			request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?acl")

		response = self.send_request(request)
		acl = ACL(response['data'])
		return acl

	def set_acl(self, uri, acl):
		if uri.has_object():
			request = self.create_request("OBJECT_PUT", uri = uri, extra = "?acl")
		else:
			request = self.create_request("BUCKET_CREATE", bucket = uri.bucket(), extra = "?acl")
		body = str(acl)
		debug(u"set_acl(%s): acl-xml: %s" % (uri, body))
		response = self.send_request(request, body)
		return response

	## Low level methods

	def urlencode_string(self, string):
		if type(string) == unicode:
			string = string.encode("utf-8")
		encoded = ""
		## List of characters that must be escaped for S3.
		## This list isn't in any official docs, but tests show
		## it to be more or less correct. If you start getting
		## InvalidSignature errors from S3, check the error
		## headers returned by S3 to see whether the list has
		## changed.
		for c in string:
			# The string may arrive as raw UTF-8 bytes (e.g.
			# '\xc4\x8d\xc5\xafr\xc3\xa1k'), so escape it byte
			# by byte. Behaviour on non-UTF-8 systems is untested.
			o = ord(c)
			if (o <= 32 or		# Space and below
			    o == 0x22 or	# "
			    o == 0x23 or	# #
			    o == 0x25 or	# %
			    o == 0x2B or	# + (would otherwise decode as <space>)
			    o == 0x3C or	# <
			    o == 0x3E or	# >
			    o == 0x3F or	# ?
			    o == 0x5B or	# [
			    o == 0x5C or	# \
			    o == 0x5D or	# ]
			    o == 0x5E or	# ^
			    o == 0x60 or	# `
			    o >= 123):		# { and above, including >= 128 for UTF-8
				encoded += "%%%02X" % o
			else:
				encoded += c
		debug("String '%s' encoded to '%s'" % (string, encoded))
		return encoded
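
	## For instance urlencode_string("photos/my file.jpg") yields
	## "photos/my%20file.jpg" -- the slash is kept while the space
	## (and any UTF-8 byte >= 0x80) is escaped as %XX.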

	def create_request(self, operation, uri = None, bucket = None, object = None, headers = None, extra = None, **params):
		resource = { 'bucket' : None, 'uri' : "/" }
		if uri and (bucket or object):
			raise ValueError("Both 'uri' and either 'bucket' or 'object' parameters supplied")
		## If URI is given use that instead of bucket/object parameters
		if uri:
			bucket = uri.bucket()
			object = uri.has_object() and uri.object() or None

		if bucket:
			resource['bucket'] = str(bucket)
			if object:
				resource['uri'] = "/" + self.urlencode_string(object)
		if extra:
			resource['uri'] += extra
		if not headers:
			headers = SortedDict()
		if headers.has_key("date"):
			if not headers.has_key("x-amz-date"):
				headers["x-amz-date"] = headers["date"]
			del(headers["date"])
		if not headers.has_key("x-amz-date"):
			headers["x-amz-date"] = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime())

		method_string = S3.http_methods.getkey(S3.operations[operation] & S3.http_methods["MASK"])
		signature = self.sign_headers(method_string, resource, headers)
		headers["Authorization"] = "AWS " + self.config.access_key + ":" + signature

		param_str = ""
		for param in params:
			if params[param] not in (None, ""):
				param_str += "&%s=%s" % (param, params[param])
			else:
				param_str += "&%s" % param
		if param_str != "":
			resource['uri'] += "?" + param_str[1:]
		debug("CreateRequest: resource[uri]=" + resource['uri'])
		return (method_string, resource, headers)
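
	## The returned request is the triple (method_string, resource, headers),
	## e.g. ("GET", {'bucket' : "mybucket", 'uri' : "/some%20key"}, {...}),
	## which send_request(), send_file() and recv_file() unpack.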

	def _fail_wait(self, retries):
		# Wait a few seconds. The more it fails the more we wait.
		return (self._max_retries - retries + 1) * 3

	def send_request(self, request, body = None, retries = _max_retries):
		method_string, resource, headers = request
		debug("Processing request, please wait...")
		try:
			conn = self.get_connection(resource['bucket'])
			conn.request(method_string, self.format_uri(resource), body, headers)
			response = {}
			http_response = conn.getresponse()
			response["status"] = http_response.status
			response["reason"] = http_response.reason
			response["headers"] = convertTupleListToDict(http_response.getheaders())
			response["data"] = http_response.read()
			debug("Response: " + str(response))
			conn.close()
		except Exception, e:
			if retries:
				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				return self.send_request(request, body, retries - 1)
			else:
				raise S3RequestError("Request failed for: %s" % resource['uri'])

		if response["status"] == 307:
			## Temporary Redirect - follow the new endpoint
			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
			self.set_hostname(redir_bucket, redir_hostname)
			warning("Redirected to: %s" % (redir_hostname))
			return self.send_request(request, body)

		if response["status"] >= 500:
			e = S3Error(response)
			if retries:
				warning(u"Retrying failed request: %s" % resource['uri'])
				warning(unicode(e))
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				return self.send_request(request, body, retries - 1)
			else:
				raise e

		if response["status"] < 200 or response["status"] > 299:
			raise S3Error(response)

		return response
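
	## With _max_retries = 5 the _fail_wait() back-off grows linearly:
	## 3 seconds after the first failure, then 6, 9, 12 and finally 15.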

	def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
		method_string, resource, headers = request
		size_left = size_total = headers.get("content-length")
		if self.config.progress_meter:
			progress = self.config.progress_class(labels, size_total)
		else:
			info("Sending file '%s', please wait..." % file.name)
		timestamp_start = time.time()
		try:
			conn = self.get_connection(resource['bucket'])
			conn.connect()
			conn.putrequest(method_string, self.format_uri(resource))
			for header in headers.keys():
				conn.putheader(header, str(headers[header]))
			conn.endheaders()
		except Exception, e:
			if self.config.progress_meter:
				progress.done("failed")
			if retries:
				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				# Connection error -> retry with the same throttle value
				return self.send_file(request, file, labels, throttle, retries - 1)
			else:
				raise S3UploadError("Upload failed for: %s" % resource['uri'])
		file.seek(0)
		md5_hash = md5()
		try:
			while (size_left > 0):
				#debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
				data = file.read(self.config.send_chunk)
				md5_hash.update(data)
				conn.send(data)
				if self.config.progress_meter:
					progress.update(delta_position = len(data))
				size_left -= len(data)
				if throttle:
					time.sleep(throttle)
			md5_computed = md5_hash.hexdigest()
			response = {}
			http_response = conn.getresponse()
			response["status"] = http_response.status
			response["reason"] = http_response.reason
			response["headers"] = convertTupleListToDict(http_response.getheaders())
			response["data"] = http_response.read()
			response["size"] = size_total
			conn.close()
			debug(u"Response: %s" % response)
		except Exception, e:
			if self.config.progress_meter:
				progress.done("failed")
			if retries:
				throttle = throttle and throttle * 5 or 0.01
				warning("Upload failed: %s (%s)" % (resource['uri'], e))
				warning("Retrying on lower speed (throttle=%0.2f)" % throttle)
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				# Transfer error -> retry with the increased throttle value
				return self.send_file(request, file, labels, throttle, retries - 1)
			else:
				debug("Giving up on '%s' %s" % (file.name, e))
				raise S3UploadError("Upload failed for: %s" % resource['uri'])

		timestamp_end = time.time()
		response["elapsed"] = timestamp_end - timestamp_start
		response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
		if self.config.progress_meter:
			## The above conn.close() takes some time -> update() progress meter
			## to correct the average speed. Otherwise people will complain that
			## 'progress' and response["speed"] are inconsistent ;-)
			progress.update()
			progress.done("done")

		if response["status"] == 307:
			## Temporary Redirect - follow the new endpoint
			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
			self.set_hostname(redir_bucket, redir_hostname)
			warning("Redirected to: %s" % (redir_hostname))
			return self.send_file(request, file, labels)

		# S3 from time to time doesn't send an ETag back in the response :-(
		# Pretend it is empty to force a re-upload below.
		if not response['headers'].has_key('etag'):
			response['headers']['etag'] = ''

		if response["status"] < 200 or response["status"] > 299:
			if response["status"] >= 500:
				## AWS internal error - retry
				if retries:
					warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
					warning("Waiting %d sec..." % self._fail_wait(retries))
					time.sleep(self._fail_wait(retries))
					return self.send_file(request, file, labels, throttle, retries - 1)
				else:
					warning("Too many failures. Giving up on '%s'" % (file.name))
					raise S3UploadError
			## Non-recoverable error
			raise S3Error(response)

		debug("MD5 sums: computed=%s, received=%s" % (md5_computed, response["headers"]["etag"]))
		if response["headers"]["etag"].strip('"\'') != md5_hash.hexdigest():
			warning("MD5 sums don't match!")
			if retries:
				warning("Retrying upload of %s" % (file.name))
				return self.send_file(request, file, labels, throttle, retries - 1)
			else:
				warning("Too many failures. Giving up on '%s'" % (file.name))
				raise S3UploadError

		return response
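
	## The ETag comparison above is valid because for a single
	## (non-multipart) PUT, S3 returns the hex MD5 digest of the uploaded
	## body as the ETag.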

	def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
		method_string, resource, headers = request
		if self.config.progress_meter:
			progress = self.config.progress_class(labels, 0)
		else:
			info("Receiving file '%s', please wait..." % stream.name)
		timestamp_start = time.time()
		try:
			conn = self.get_connection(resource['bucket'])
			conn.connect()
			conn.putrequest(method_string, self.format_uri(resource))
			for header in headers.keys():
				conn.putheader(header, str(headers[header]))
			if start_position > 0:
				debug("Requesting Range: %d .. end" % start_position)
				conn.putheader("Range", "bytes=%d-" % start_position)
			conn.endheaders()
			response = {}
			http_response = conn.getresponse()
			response["status"] = http_response.status
			response["reason"] = http_response.reason
			response["headers"] = convertTupleListToDict(http_response.getheaders())
			debug("Response: %s" % response)
		except Exception, e:
			if self.config.progress_meter:
				progress.done("failed")
			if retries:
				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				# Connection error -> retry from the same position
				return self.recv_file(request, stream, labels, start_position, retries - 1)
			else:
				raise S3DownloadError("Download failed for: %s" % resource['uri'])

		if response["status"] == 307:
			## Temporary Redirect - follow the new endpoint
			response['data'] = http_response.read()
			redir_bucket = getTextFromXml(response['data'], ".//Bucket")
			redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
			self.set_hostname(redir_bucket, redir_hostname)
			warning("Redirected to: %s" % (redir_hostname))
			return self.recv_file(request, stream, labels)

		if response["status"] < 200 or response["status"] > 299:
			raise S3Error(response)

		if start_position == 0:
			# Only compute MD5 on the fly if we're downloading from the
			# beginning; a partial download would yield a nonsense digest.
			md5_hash = md5()
		size_left = int(response["headers"]["content-length"])
		size_total = start_position + size_left
		current_position = start_position

		if self.config.progress_meter:
			progress.total_size = size_total
			progress.initial_position = current_position
			progress.current_position = current_position

		try:
			while (current_position < size_total):
				this_chunk = size_left > self.config.recv_chunk and self.config.recv_chunk or size_left
				data = http_response.read(this_chunk)
				stream.write(data)
				if start_position == 0:
					md5_hash.update(data)
				current_position += len(data)
				## Call progress meter from here...
				if self.config.progress_meter:
					progress.update(delta_position = len(data))
			conn.close()
		except Exception, e:
			if self.config.progress_meter:
				progress.done("failed")
			if retries:
				warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
				warning("Waiting %d sec..." % self._fail_wait(retries))
				time.sleep(self._fail_wait(retries))
				# Connection error -> resume from the current position
				return self.recv_file(request, stream, labels, current_position, retries - 1)
			else:
				raise S3DownloadError("Download failed for: %s" % resource['uri'])

		stream.flush()
		timestamp_end = time.time()

		if self.config.progress_meter:
			## The above stream.flush() may take some time -> update() progress meter
			## to correct the average speed. Otherwise people will complain that
			## 'progress' and response["speed"] are inconsistent ;-)
			progress.update()
			progress.done("done")

		if start_position == 0:
			# Only compute MD5 on the fly if we were downloading from the beginning
			response["md5"] = md5_hash.hexdigest()
		else:
			# Otherwise try to compute MD5 of the output file
			try:
				response["md5"] = hash_file_md5(stream.name)
			except IOError, e:
				if e.errno != errno.ENOENT:
					warning("Unable to open file: %s: %s" % (stream.name, e))
				warning("Unable to verify MD5. Assume it matches.")
				response["md5"] = response["headers"]["etag"]

		response["md5match"] = response["headers"]["etag"].find(response["md5"]) >= 0
		response["elapsed"] = timestamp_end - timestamp_start
		response["size"] = current_position
		response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
		if response["size"] != start_position + long(response["headers"]["content-length"]):
			warning("Reported size (%s) does not match received size (%s)" % (
				start_position + long(response["headers"]["content-length"]), response["size"]))
		debug("ReceiveFile: Computed MD5 = %s" % response["md5"])
		if not response["md5match"]:
			warning("MD5 signatures do not match: computed=%s, received=%s" % (
				response["md5"], response["headers"]["etag"]))
		return response
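
	## On retry after a partial download recv_file() resumes with an HTTP
	## Range header, e.g. "Range: bytes=1048576-" if 1 MiB was already
	## written to the stream.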

	def sign_headers(self, method, resource, headers):
		h  = method + "\n"
		h += headers.get("content-md5", "") + "\n"
		h += headers.get("content-type", "") + "\n"
		h += headers.get("date", "") + "\n"
		for header in headers.keys():
			if header.startswith("x-amz-"):
				h += header + ":" + str(headers[header]) + "\n"
		if resource['bucket']:
			h += "/" + resource['bucket']
		h += resource['uri']
		debug("SignHeaders: " + repr(h))
		return base64.encodestring(hmac.new(self.config.secret_key, h, sha1).digest()).strip()
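
	## The string-to-sign for a plain GET of /mybucket/file.txt looks like:
	##   "GET\n\n\n\nx-amz-date:Thu, 10 Jan 2008 12:00:00 +0000\n/mybucket/file.txt"
	## (empty content-md5, content-type and date lines -- the date travels
	## in x-amz-date), HMAC-SHA1 signed with the secret key and base64-encoded.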

	@staticmethod
	def check_bucket_name(bucket, dns_strict = True):
		if dns_strict:
			invalid = re.search("([^a-z0-9\.-])", bucket)
			if invalid:
				raise ParameterError("Bucket name '%s' contains disallowed character '%s'. The only supported ones are: lowercase us-ascii letters (a-z), digits (0-9), dot (.) and hyphen (-)." % (bucket, invalid.groups()[0]))
		else:
			invalid = re.search("([^A-Za-z0-9\._-])", bucket)
			if invalid:
				raise ParameterError("Bucket name '%s' contains disallowed character '%s'. The only supported ones are: us-ascii letters (a-z, A-Z), digits (0-9), dot (.), hyphen (-) and underscore (_)." % (bucket, invalid.groups()[0]))

		if len(bucket) < 3:
			raise ParameterError("Bucket name '%s' is too short (min 3 characters)" % bucket)
		if len(bucket) > 255:
			raise ParameterError("Bucket name '%s' is too long (max 255 characters)" % bucket)
		if dns_strict:
			if len(bucket) > 63:
				raise ParameterError("Bucket name '%s' is too long (max 63 characters)" % bucket)
			if re.search("-\.", bucket):
				raise ParameterError("Bucket name '%s' must not contain sequence '-.' for DNS compatibility" % bucket)
			if re.search("\.\.", bucket):
				raise ParameterError("Bucket name '%s' must not contain sequence '..' for DNS compatibility" % bucket)
			if not re.search("^[0-9a-z]", bucket):
				raise ParameterError("Bucket name '%s' must start with a letter or a digit" % bucket)
			if not re.search("[0-9a-z]$", bucket):
				raise ParameterError("Bucket name '%s' must end with a letter or a digit" % bucket)
		return True
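
	## For instance "my.bucket-1" passes the DNS-strict checks, while
	## "My_Bucket" fails them (uppercase letter, underscore) and is only
	## accepted with dns_strict = False.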

	@staticmethod
	def check_bucket_name_dns_conformity(bucket):
		try:
			return S3.check_bucket_name(bucket, dns_strict = True)
		except ParameterError:
			return False