5c2eb565 |
## Amazon S3 Multipart upload support
## Author: Jerome Leclanche <jerome.leclanche@gmail.com>
## License: GPL Version 2
|
a184e0de |
import os |
dc071cc1 |
import sys |
a184e0de |
from stat import ST_SIZE |
5c2eb565 |
from logging import debug, info, warning, error |
dc071cc1 |
from Utils import getTextFromXml, getTreeFromXml, formatSize, unicodise, calculateChecksum, parseNodes |
a184e0de |
from Exceptions import S3UploadError |
dc071cc1 |
from collections import defaultdict |
6a909985 |
|
5c2eb565 |
class MultiPartUpload(object):
    """
    Driver for a single S3 multipart upload: initiates the upload,
    uploads the parts, and completes or aborts the whole transfer.
    """

    # Limits imposed by the S3 multipart upload API.
    MIN_CHUNK_SIZE_MB = 5 # 5MB
    MAX_CHUNK_SIZE_MB = 5120 # 5GB
    MAX_FILE_SIZE = 42949672960 # 5TB
f46250ab |
def __init__(self, s3, file, uri, headers_baseline = None):
    """
    Prepare a multipart upload of 'file' to 'uri'.

    s3: S3 connection object used for all requests.
    file: open file object to upload (may be stdin).
    uri: destination S3Uri.
    headers_baseline: optional dict of headers to send with the initiate
        request (e.g. Content-Type, x-amz-* metadata). Defaults to an
        empty dict.

    Initiating the upload happens immediately; self.upload_id is set on
    successful return.
    """
    self.s3 = s3
    self.file = file
    self.uri = uri
    # part number -> ETag, filled in by upload_part(), consumed by
    # complete_multipart_upload().
    self.parts = {}
    # None sentinel instead of a mutable {} default: a dict default would
    # be shared across every instance of the class.
    self.headers_baseline = {} if headers_baseline is None else headers_baseline
    self.upload_id = self.initiate_multipart_upload()
|
dc071cc1 |
def get_parts_information(self, uri, upload_id):
    """
    List the parts S3 already holds for the given multipart upload.

    Returns a defaultdict mapping part number -> {'checksum': ETag,
    'size': size}; part numbers S3 does not know about map to None.
    """
    listing = self.s3.list_multipart(uri, upload_id)
    xml_tree = getTreeFromXml(listing['data'])
    known_parts = defaultdict(lambda: None)
    for node in parseNodes(xml_tree):
        # Skip nodes that lack any of the expected fields.
        if 'PartNumber' not in node or 'ETag' not in node or 'Size' not in node:
            continue
        known_parts[int(node['PartNumber'])] = {
            'checksum': node['ETag'],
            'size': node['Size'],
        }
    return known_parts
def get_unique_upload_id(self, uri):
    """
    Find the UploadId of the single in-progress multipart upload whose
    key matches 'uri'.

    Returns None when no in-progress upload matches; raises ValueError
    when more than one does (the caller must then disambiguate with
    --upload-id).
    """
    found_id = None
    listing = self.s3.get_multipart(uri)
    xml_tree = getTreeFromXml(listing['data'])
    for node in parseNodes(xml_tree):
        try:
            candidate_id = node['UploadId']
            candidate_key = node['Key']
        except KeyError:
            continue
        info("mp_path: %s, object: %s" % (candidate_key, uri.object()))
        if candidate_key != uri.object():
            continue
        if found_id is not None:
            raise ValueError("More than one UploadId for URI %s. Disable multipart upload, or use\n %s multipart %s\nto list the Ids, then pass a unique --upload-id into the put command." % (uri, sys.argv[0], uri))
        found_id = candidate_id
    return found_id
|
731b7e0c |
def initiate_multipart_upload(self):
    """
    Begin a multipart upload
    http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html

    Resolution order: an explicitly configured --upload-id wins; with
    --put-continue we try to reuse a matching in-progress upload; only
    when neither yields an id do we POST ?uploads for a fresh one.
    Sets and returns self.upload_id.
    """
    config = self.s3.config
    if config.upload_id is not None:
        resumed_id = config.upload_id
    elif config.put_continue:
        resumed_id = self.get_unique_upload_id(self.uri)
    else:
        resumed_id = None
    self.upload_id = resumed_id

    if self.upload_id is None:
        # Nothing to resume: ask S3 to start a brand new upload.
        request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = self.headers_baseline, extra = "?uploads")
        response = self.s3.send_request(request)
        self.upload_id = getTextFromXml(response["data"], "UploadId")

    return self.upload_id
731b7e0c |
|
a184e0de |
def upload_all_parts(self):
    """
    Execute a full multipart upload on a file
    Returns the seq/etag dict
    TODO use num_processes to thread it
    """
    if not self.upload_id:
        raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")

    self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024

    if self.file.name != "<stdin>":
        size_left = file_size = os.stat(self.file.name)[ST_SIZE]
        # Ceiling division: one extra part for any remainder. '//' keeps
        # this an int on both Python 2 and 3 ('/' only floors on Python 2).
        nr_parts = file_size // self.chunk_size + (file_size % self.chunk_size and 1)
        debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
    else:
        debug("MultiPart: Uploading from %s" % (self.file.name))

    # When continuing an interrupted upload, learn what S3 already has so
    # upload_part() can skip parts that match by size and md5.
    remote_statuses = defaultdict(lambda: None)
    if self.s3.config.put_continue:
        remote_statuses = self.get_parts_information(self.uri, self.upload_id)

    seq = 1
    if self.file.name != "<stdin>":
        # Regular file: total size known, slice into chunk_size'd parts
        # by offset.
        while size_left > 0:
            offset = self.chunk_size * (seq - 1)
            current_chunk_size = min(file_size - offset, self.chunk_size)
            size_left -= current_chunk_size
            labels = {
                'source' : unicodise(self.file.name),
                'destination' : unicodise(self.uri.uri()),
                'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
            }
            try:
                self.upload_part(seq, offset, current_chunk_size, labels, remote_status = remote_statuses[seq])
            # Bare except is deliberate: log the recovery hint for any
            # failure whatsoever, then re-raise unchanged.
            except:
                error(u"\nUpload of '%s' part %d failed. Use\n %s abortmp %s %s\nto abort the upload, or\n %s --upload-id %s put ...\nto continue the upload."
                      % (self.file.name, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
                raise
            seq += 1
    else:
        # stdin: size unknown up front; read chunk_size at a time until EOF.
        while True:
            buffer = self.file.read(self.chunk_size)
            offset = self.chunk_size * (seq - 1)
            current_chunk_size = len(buffer)
            labels = {
                'source' : unicodise(self.file.name),
                'destination' : unicodise(self.uri.uri()),
                'extra' : "[part %d, %s]" % (seq, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
            }
            if len(buffer) == 0: # EOF
                break
            try:
                self.upload_part(seq, offset, current_chunk_size, labels, buffer, remote_status = remote_statuses[seq])
            except:
                # Fixed argument order: '%s abortmp %s %s' expects
                # (argv[0], uri, upload_id); the original passed the uri
                # and argv[0] swapped, garbling the hint command.
                error(u"\nUpload of '%s' part %d failed. Use\n %s abortmp %s %s\nto abort, or\n %s --upload-id %s put ...\nto continue the upload."
                      % (self.file.name, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
                raise
            seq += 1

    debug("MultiPart: Upload finished: %d parts", seq - 1)
731b7e0c |
|
dc071cc1 |
def upload_part(self, seq, offset, chunk_size, labels, buffer = '', remote_status = None):
    """
    Upload a file chunk
    http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html

    seq: 1-based part number within the multipart upload.
    offset: byte offset of this part within the source file.
    chunk_size: number of bytes to send for this part.
    labels: progress labels dict passed through to S3.send_file().
    buffer: pre-read chunk data (used for stdin uploads); when empty,
        send_file() presumably reads the data from self.file at
        'offset' instead -- TODO confirm against S3.send_file.
    remote_status: optional {'checksum': ..., 'size': ...} describing a
        part S3 already holds (--put-continue); a part matching by both
        size and md5 is skipped rather than re-sent.

    Records the part's ETag in self.parts[seq]. Returns the send_file()
    response, or None when the part was skipped.
    """
    # TODO implement Content-MD5
    debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))

    if remote_status is not None:
        # Resume path: keep the remote copy only if both size and md5 match.
        if int(remote_status['size']) == chunk_size:
            checksum = calculateChecksum(buffer, self.file, offset, chunk_size, self.s3.config.send_chunk)
            # S3 returns the ETag wrapped in double quotes.
            remote_checksum = remote_status['checksum'].strip('"')
            if remote_checksum == checksum:
                warning("MultiPart: size and md5sum match for %s part %d, skipping." % (self.uri, seq))
                # Record the remote ETag so complete_multipart_upload()
                # still lists this part.
                self.parts[seq] = remote_status['checksum']
                return
            else:
                warning("MultiPart: checksum (%s vs %s) does not match for %s part %d, reuploading."
                    % (remote_checksum, checksum, self.uri, seq))
        else:
            warning("MultiPart: size (%d vs %d) does not match for %s part %d, reuploading."
                % (int(remote_status['size']), chunk_size, self.uri, seq))

    headers = { "content-length": chunk_size }
    query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
    request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
    response = self.s3.send_file(request, self.file, labels, buffer, offset = offset, chunk_size = chunk_size)
    self.parts[seq] = response["headers"]["etag"]
    return response
731b7e0c |
def complete_multipart_upload(self):
    """
    Finish a multipart upload
    http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html

    Builds the CompleteMultipartUpload XML from self.parts (part number
    -> ETag) and POSTs it. Returns the S3 response.
    """
    debug("MultiPart: Completing upload: %s" % self.upload_id)

    parts_xml = []
    part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
    # S3 requires the parts listed in ascending PartNumber order; a plain
    # dict does not guarantee iteration order, so sort explicitly.
    for seq, etag in sorted(self.parts.items()):
        parts_xml.append(part_xml % (seq, etag))
    body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml))

    headers = { "content-length": len(body) }
    request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = headers, extra = "?uploadId=%s" % (self.upload_id))
    response = self.s3.send_request(request, body = body)
    return response
344cadc8 |
|
a184e0de |
def abort_upload(self):
    """
    Abort multipart upload
    http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html

    Currently a no-op that only logs: the OBJECT_DELETE request is
    deliberately disabled upstream, so this always returns None.
    """
    debug("MultiPart: Aborting upload: %s" % self.upload_id)
    # Disabled request, kept for reference:
    #   request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
    #   response = self.s3.send_request(request)
    return None
|
344cadc8 |
# vim:et:ts=4:sts=4:ai |