S3/MultiPart.py
5c2eb565
 ## Amazon S3 Multipart upload support
 ## Author: Jerome Leclanche <jerome.leclanche@gmail.com>
 ## License: GPL Version 2
 
a184e0de
 import os
dc071cc1
 import sys
a184e0de
 from stat import ST_SIZE
5c2eb565
 from logging import debug, info, warning, error
dc071cc1
 from Utils import getTextFromXml, getTreeFromXml, formatSize, unicodise, calculateChecksum, parseNodes
a184e0de
 from Exceptions import S3UploadError
dc071cc1
 from collections import defaultdict
6a909985
 
5c2eb565
 class MultiPartUpload(object):
     """
     Drive a single S3 multipart upload: initiation, per-part upload
     (with optional skip of already-uploaded parts when continuing),
     completion and abort.
     """
731b7e0c
 
9dda31d0
     MIN_CHUNK_SIZE_MB = 5       # 5MB
     MAX_CHUNK_SIZE_MB = 5120    # 5GB
     MAX_FILE_SIZE = 42949672960 # 5TB
731b7e0c
 
f46250ab
     def __init__(self, s3, file, uri, headers_baseline = None):
         """
         Prepare a multipart upload of 'file' to 'uri' and initiate it.

         s3               -- S3 connection object used for all requests
         file             -- open file object to be uploaded
         uri              -- destination S3Uri
         headers_baseline -- optional dict of headers to send with the
                             initiate request (default: no extra headers)
         """
         self.s3 = s3
         self.file = file
         self.uri = uri
         self.parts = {}
         # Use a None sentinel instead of a mutable {} default argument so a
         # single shared dict is never accidentally mutated across instances.
         self.headers_baseline = headers_baseline if headers_baseline is not None else {}
         self.upload_id = self.initiate_multipart_upload()
731b7e0c
 
dc071cc1
     def get_parts_information(self, uri, upload_id):
         """
         Return a dict mapping part number -> {'checksum', 'size'} for the
         parts already uploaded under 'upload_id'. Unknown part numbers
         look up as None (defaultdict).
         """
         listing = self.s3.list_multipart(uri, upload_id)
         tree = getTreeFromXml(listing['data'])

         uploaded = defaultdict(lambda: None)
         for node in parseNodes(tree):
             # Nodes without the expected keys are simply skipped.
             try:
                 part_no = int(node['PartNumber'])
                 uploaded[part_no] = {'checksum': node['ETag'], 'size': node['Size']}
             except KeyError:
                 continue

         return uploaded
 
     def get_unique_upload_id(self, uri):
         """
         Scan the in-progress multipart uploads for this bucket and return
         the UploadId whose key matches 'uri', or None if there is none.
         Raises ValueError when more than one upload matches.
         """
         found_id = None
         listing = self.s3.get_multipart(uri)
         tree = getTreeFromXml(listing['data'])
         for node in parseNodes(tree):
             # Entries missing UploadId/Key are ignored.
             try:
                 candidate_id = node['UploadId']
                 candidate_key = node['Key']
             except KeyError:
                 continue
             info("mp_path: %s, object: %s" % (candidate_key, uri.object()))
             if candidate_key != uri.object():
                 continue
             if found_id is not None:
                 raise ValueError("More than one UploadId for URI %s.  Disable multipart upload, or use\n %s multipart %s\nto list the Ids, then pass a unique --upload-id into the put command." % (uri, sys.argv[0], uri))
             found_id = candidate_id

         return found_id
 
731b7e0c
     def initiate_multipart_upload(self):
         """
         Begin a multipart upload
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html
         """
         cfg = self.s3.config
         # Precedence: an explicitly supplied --upload-id wins, then (when
         # continuing a put) an id discovered on the remote side; otherwise
         # a brand new upload is initiated below.
         if cfg.upload_id is not None:
             self.upload_id = cfg.upload_id
         elif cfg.put_continue:
             self.upload_id = self.get_unique_upload_id(self.uri)
         else:
             self.upload_id = None

         if self.upload_id is None:
             request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = self.headers_baseline, extra = "?uploads")
             response = self.s3.send_request(request)
             self.upload_id = getTextFromXml(response["data"], "UploadId")

         return self.upload_id
731b7e0c
 
a184e0de
     def upload_all_parts(self):
         """
         Execute a full multipart upload on a file
         Returns the seq/etag dict
         TODO use num_processes to thread it
         """
         if not self.upload_id:
             raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")

         self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024

         if self.file.name != "<stdin>":
             size_left = file_size = os.stat(self.file.name)[ST_SIZE]
             # Floor division plus one extra part if there is a trailing
             # partial chunk.  '//' keeps the Python 2 integer-division
             # semantics and is also correct under Python 3.
             nr_parts = file_size // self.chunk_size + (file_size % self.chunk_size and 1)
             debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
         else:
             debug("MultiPart: Uploading from %s" % (self.file.name))

         # When continuing a put, fetch what already exists remotely so
         # matching parts can be skipped by upload_part().
         remote_statuses = defaultdict(lambda: None)
         if self.s3.config.put_continue:
             remote_statuses = self.get_parts_information(self.uri, self.upload_id)

         seq = 1
         if self.file.name != "<stdin>":
             # Regular file: sizes are known up front, so each chunk is read
             # by send_file() directly at the right offset.
             while size_left > 0:
                 offset = self.chunk_size * (seq - 1)
                 current_chunk_size = min(file_size - offset, self.chunk_size)
                 size_left -= current_chunk_size
                 labels = {
                     'source' : unicodise(self.file.name),
                     'destination' : unicodise(self.uri.uri()),
                     'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
                 }
                 try:
                     self.upload_part(seq, offset, current_chunk_size, labels, remote_status = remote_statuses[seq])
                 except:
                     # Log the recovery hint, then re-raise the original error.
                     error(u"\nUpload of '%s' part %d failed. Use\n  %s abortmp %s %s\nto abort the upload, or\n  %s --upload-id %s put ...\nto continue the upload."
                           % (self.file.name, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
                     raise
                 seq += 1
         else:
             # Stdin: total size is unknown, read chunk by chunk until EOF.
             while True:
                 buffer = self.file.read(self.chunk_size)
                 offset = self.chunk_size * (seq - 1)
                 current_chunk_size = len(buffer)
                 labels = {
                     'source' : unicodise(self.file.name),
                     'destination' : unicodise(self.uri.uri()),
                     'extra' : "[part %d, %s]" % (seq, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
                 }
                 if len(buffer) == 0: # EOF
                     break
                 try:
                     self.upload_part(seq, offset, current_chunk_size, labels, buffer, remote_status = remote_statuses[seq])
                 except:
                     # BUGFIX: the arguments used to be ordered
                     # (self.uri, sys.argv[0], ...), printing a garbled hint
                     # like "s3://... abortmp s3cmd <id>".  Now matches the
                     # regular-file branch above: program, uri, upload id.
                     error(u"\nUpload of '%s' part %d failed. Use\n  %s abortmp %s %s\nto abort, or\n  %s --upload-id %s put ...\nto continue the upload."
                           % (self.file.name, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
                     raise
                 seq += 1

         debug("MultiPart: Upload finished: %d parts", seq - 1)
731b7e0c
 
dc071cc1
     def upload_part(self, seq, offset, chunk_size, labels, buffer = '', remote_status = None):
         """
         Upload a file chunk
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
         """
         # TODO implement Content-MD5
         debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))

         # When continuing an upload, skip this part if the remote copy
         # already matches in both size and md5sum.
         if remote_status is not None:
             remote_size = int(remote_status['size'])
             if remote_size != chunk_size:
                 warning("MultiPart: size (%d vs %d) does not match for %s part %d, reuploading."
                         % (remote_size, chunk_size, self.uri, seq))
             else:
                 local_checksum = calculateChecksum(buffer, self.file, offset, chunk_size, self.s3.config.send_chunk)
                 remote_checksum = remote_status['checksum'].strip('"')
                 if remote_checksum == local_checksum:
                     warning("MultiPart: size and md5sum match for %s part %d, skipping." % (self.uri, seq))
                     self.parts[seq] = remote_status['checksum']
                     return
                 warning("MultiPart: checksum (%s vs %s) does not match for %s part %d, reuploading."
                         % (remote_checksum, local_checksum, self.uri, seq))

         headers = { "content-length": chunk_size }
         query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
         request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
         response = self.s3.send_file(request, self.file, labels, buffer, offset = offset, chunk_size = chunk_size)
         self.parts[seq] = response["headers"]["etag"]
         return response
731b7e0c
 
     def complete_multipart_upload(self):
         """
         Finish a multipart upload
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html
         """
         debug("MultiPart: Completing upload: %s" % self.upload_id)

         # Assemble the <Part> manifest from the seq -> etag map collected
         # by upload_part().
         part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
         parts_xml = [part_xml % (seq, etag) for seq, etag in self.parts.items()]
         body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml))

         headers = { "content-length": len(body) }
         request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = headers, extra = "?uploadId=%s" % (self.upload_id))
         response = self.s3.send_request(request, body = body)

         return response
344cadc8
 
a184e0de
     def abort_upload(self):
         """
         Abort multipart upload
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html
         """
         debug("MultiPart: Aborting upload: %s" % self.upload_id)
dc071cc1
         #request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
         #response = self.s3.send_request(request)
         response = None
a184e0de
         return response
 
344cadc8
 # vim:et:ts=4:sts=4:ai