S3/MultiPart.py
5c2eb565
 ## Amazon S3 Multipart upload support
 ## Author: Jerome Leclanche <jerome.leclanche@gmail.com>
 ## License: GPL Version 2
 
a184e0de
 import os
 from stat import ST_SIZE
5c2eb565
 from logging import debug, info, warning, error
a184e0de
 from Utils import getTextFromXml, formatSize, unicodise
 from Exceptions import S3UploadError
6a909985
 
5c2eb565
 class MultiPartUpload(object):
     """
     Drives one Amazon S3 multipart upload of a single local file.

     Creating an instance immediately issues the "initiate" request and
     stores the upload id returned by S3.  upload_all_parts() then sends the
     file chunk by chunk (recording each part's ETag in self.parts),
     complete_multipart_upload() finalises the object and abort_upload()
     cancels the upload.
     """

     MIN_CHUNK_SIZE_MB = 5          # 5MB
     MAX_CHUNK_SIZE_MB = 5120       # 5GB
     # 5TB.  The previous literal 42949672960 was only 40GB and contradicted
     # both this comment and the S3 object size limit.
     MAX_FILE_SIZE = 5 * 1024 ** 4

     def __init__(self, s3, file, uri, headers_baseline = None):
         """
         Store the upload parameters and initiate the upload on S3.

         s3               - object providing create_request(), send_request(),
                            send_file() and a config attribute
         file             - open file object to upload; file.name is stat()ed
                            to obtain the size
         uri              - destination URI object (its uri() method is used
                            for progress labels)
         headers_baseline - optional dict of headers sent with the initiate
                            request; defaults to a fresh empty dict
         """
         self.s3 = s3
         self.file = file
         self.uri = uri
         # Maps part sequence number -> ETag returned by S3 for that part
         self.parts = {}
         # A literal {} default would be one dict shared by every instance
         if headers_baseline is None:
             headers_baseline = {}
         self.headers_baseline = headers_baseline
         self.upload_id = self.initiate_multipart_upload()

     def initiate_multipart_upload(self):
         """
         Begin a multipart upload
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html

         Stores the UploadId extracted from the response in self.upload_id
         and returns it.
         """
         request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = self.headers_baseline, extra = "?uploads")
         response = self.s3.send_request(request)
         data = response["data"]
         self.upload_id = getTextFromXml(data, "UploadId")
         return self.upload_id

     def upload_all_parts(self):
         """
         Execute a full multipart upload on a file

         Parts are uploaded sequentially; the resulting seq/etag pairs are
         collected in self.parts (nothing is returned).  If any part fails,
         the multipart upload is aborted and the exception is re-raised.
         Raises RuntimeError if the upload has not been initiated.
         TODO use num_processes to thread it
         """
         if not self.upload_id:
             raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")

         size_left = file_size = os.stat(self.file.name)[ST_SIZE]
         self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
         # Ceiling division; '//' keeps the result an integer even where '/'
         # performs true division.
         nr_parts = file_size // self.chunk_size
         if file_size % self.chunk_size:
             nr_parts += 1
         debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))

         seq = 1
         while size_left > 0:
             offset = self.chunk_size * (seq - 1)
             # The last part may be shorter than the configured chunk size
             current_chunk_size = min(file_size - offset, self.chunk_size)
             size_left -= current_chunk_size
             labels = {
                 'source' : unicodise(self.file.name),
                 'destination' : unicodise(self.uri.uri()),
                 'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
             }
             try:
                 self.upload_part(seq, offset, current_chunk_size, labels)
             except:
                 # Deliberately catches everything (including interrupts) so
                 # the partial upload is aborted before the error propagates.
                 # The file name goes through unicodise() so that formatting
                 # this unicode message cannot itself fail on a non-ASCII
                 # name and skip the abort below.
                 error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (unicodise(self.file.name), seq))
                 self.abort_upload()
                 raise
             seq += 1

         debug("MultiPart: Upload finished: %d parts", seq - 1)

     def upload_part(self, seq, offset, chunk_size, labels):
         """
         Upload a file chunk
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html

         seq        - part number, used in the query string and as the key
                      in self.parts
         offset     - offset of the chunk, passed through to s3.send_file()
         chunk_size - size of the chunk; also sent as content-length
         labels     - progress labels passed through to s3.send_file()

         Records the part's ETag in self.parts and returns the response.
         """
         # TODO implement Content-MD5
         debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))
         headers = { "content-length": chunk_size }
         query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
         request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
         response = self.s3.send_file(request, self.file, labels, offset = offset, chunk_size = chunk_size)
         self.parts[seq] = response["headers"]["etag"]
         return response

     def complete_multipart_upload(self):
         """
         Finish a multipart upload
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html

         Sends the list of uploaded parts (number + ETag) taken from
         self.parts and returns the response.
         """
         debug("MultiPart: Completing upload: %s" % self.upload_id)

         part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
         # S3 requires the parts to be listed in ascending PartNumber order;
         # plain dict iteration order does not guarantee that.
         parts_xml = [part_xml % (seq, etag) for seq, etag in sorted(self.parts.items())]
         body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml))

         headers = { "content-length": len(body) }
         request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = headers, extra = "?uploadId=%s" % (self.upload_id))
         response = self.s3.send_request(request, body = body)

         return response

     def abort_upload(self):
         """
         Abort multipart upload
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html

         Issues a delete request for the current upload id and returns the
         response.
         """
         debug("MultiPart: Aborting upload: %s" % self.upload_id)
         request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
         response = self.s3.send_request(request)
         return response
 
344cadc8
 # vim:et:ts=4:sts=4:ai