Browse code

Add a MultiPartCopy class and enable it in object_copy when the remote file is larger than 5 GB.

Damian Martinez authored on 2012/07/17 16:45:39
Showing 3 changed files
... ...
@@ -163,6 +163,9 @@ class Config(object):
163 163
     enable_multipart = True
164 164
     multipart_chunk_size_mb = 15    # MB
165 165
     multipart_max_chunks = 10000    # Maximum chunks on AWS S3, could be different on other S3-compatible APIs
166
+    # Minimum size at which to use multipart remote S3-to-S3 copy with byte ranges: 5 GB
167
+    #multipart_copy_size = (5 * 1024 * 1024 * 1024) - 1
168
+    multipart_copy_size = 5 * 1024 * 1024 * 1024
166 169
     # List of checks to be performed for 'sync'
167 170
     sync_checks = ['size', 'md5']   # 'weak-timestamp'
168 171
     # List of compiled REGEXPs
... ...
@@ -210,4 +210,100 @@ class MultiPartUpload(object):
210 210
         response = None
211 211
         return response
212 212
 
213
+
214
class MultiPartCopy(MultiPartUpload):
    """
    Server-side (remote) S3-to-S3 multipart copy.

    Reuses the MultiPartUpload machinery to initiate/complete/abort the
    multipart transaction, but each part is produced with an
    "x-amz-copy-source" + "x-amz-copy-source-range" header pair instead of
    uploading file data, so no object bytes travel through the client.
    """

    def __init__(self, s3, src_uri, dst_uri, src_size, headers_baseline = None):
        self.s3 = s3
        # MultiPartUpload expects self.file / self.uri; alias them to the
        # source/destination URIs so the inherited initiate/complete/abort
        # requests target the right objects.
        self.file = self.src_uri = src_uri
        self.uri  = self.dst_uri = dst_uri
        self.src_size = src_size
        self.parts = {}
        # No mutable default argument: each instance gets its own dict.
        self.headers_baseline = {} if headers_baseline is None else headers_baseline
        self.upload_id = self.initiate_multipart_copy()

    def initiate_multipart_copy(self):
        # Initiating a copy is identical to initiating an upload (?uploads).
        return self.initiate_multipart_upload()

    def complete_multipart_copy(self):
        # Completing a copy is identical to completing an upload (?uploadId).
        return self.complete_multipart_upload()

    def abort_copy(self):
        return self.abort_upload()

    def copy_all_parts(self):
        """
        Execute a full multipart copy of a remote file, one ranged part at
        a time. Part ETags are collected into self.parts; on any failure
        the multipart transaction is aborted and the exception re-raised.
        """
        if not self.upload_id:
            raise RuntimeError("Attempting to use a multipart copy that has not been initiated.")

        size_left = file_size = self.src_size
        self.chunk_size = self.s3.config.multipart_copy_size
        # Integer part count: one extra part when the size is not an exact
        # multiple of the chunk size ('//' keeps this an int on Python 3,
        # where '/' would yield a float and break the "%d" formatting).
        nr_parts = file_size // self.chunk_size + (1 if file_size % self.chunk_size else 0)
        debug("MultiPart: Copying %s in %d parts" % (self.src_uri, nr_parts))

        seq = 1
        while size_left > 0:
            offset = self.chunk_size * (seq - 1)
            current_chunk_size = min(file_size - offset, self.chunk_size)
            size_left -= current_chunk_size
            labels = {
                'source' : unicodise(self.src_uri.uri()),
                'destination' : unicodise(self.uri.uri()),
                'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
            }
            try:
                self.copy_part(seq, offset, current_chunk_size, labels)
            except BaseException:
                # Abort on *any* failure (including KeyboardInterrupt) so
                # S3 does not keep storing/charging for orphaned parts,
                # then let the caller see the original exception.
                # TODO: recover from some "retriable" errors?
                error(u"Upload copy of '%s' part %d failed. Aborting multipart upload copy." % (self.src_uri, seq))
                self.abort_copy()
                raise
            seq += 1

        debug("MultiPart: Copy finished: %d parts", seq - 1)

    def copy_part(self, seq, offset, chunk_size, labels):
        """
        Copy one chunk of the remote source object into part 'seq' of the
        destination upload via UploadPartCopy.
        http://docs.amazonwebservices.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
        Returns the raw S3 response; the part's ETag is stored in self.parts.
        """
        debug("Copying part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))

        # UploadPartCopy takes the source object and range as headers, e.g.:
        #    x-amz-copy-source: /source_bucket/sourceObject
        #    x-amz-copy-source-range: bytes=first-last
        # NOTE(review): the object key is not URL-encoded here, unlike the
        # non-multipart object_copy path which uses urlencode_string() --
        # keys containing special characters may fail; verify against that path.
        headers = { "x-amz-copy-source": "/%s/%s" % (self.src_uri.bucket(), self.src_uri.object()) }

        # Include a byte-range header on every part after the first, or when
        # the chunk itself reaches the multipart threshold (file > 5 GB).
        if (seq > 1) or (chunk_size >= self.s3.config.multipart_copy_size):
            # Ranges are inclusive: a 10 byte file has bytes=0-9
            headers["x-amz-copy-source-range"] = "bytes=%d-%d" % (offset, (offset + chunk_size - 1))

        query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)

        request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
        response = self.s3.send_request(request)

        # NOTE: Amazon sends whitespace while the copy progresses, which
        # accumulates in the response body and seems to confuse the XML
        # parser; strip newlines before looking for the ETag.
        data = response["data"].replace("\n", '')
        self.parts[seq] = getTextFromXml(data, "ETag")
        # TODO: how to fail if no ETag found ... raise Exception?

        return response
308
+
213 309
 # vim:et:ts=4:sts=4:ai
... ...
@@ -40,7 +40,7 @@ from .ACL import ACL, GranteeLogDelivery
40 40
 from .BidirMap import BidirMap
41 41
 from .Config import Config
42 42
 from .Exceptions import *
43
-from .MultiPart import MultiPartUpload
43
+from .MultiPart import MultiPartUpload, MultiPartCopy
44 44
 from .S3Uri import S3Uri
45 45
 from .ConnMan import ConnMan
46 46
 from .Crypto import (sign_request_v2, sign_request_v4, checksum_sha256_file,
... ...
@@ -826,9 +826,7 @@ class S3(object):
826 826
                     raise exc
827 827
                 acl = None
828 828
         headers = SortedDict(ignore_case = True)
829
-        headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(),
830
-                                                   urlencode_string(src_uri.object(), unicode_output=True))
831
-        headers['x-amz-metadata-directive'] = "COPY"
829
+
832 830
         if self.config.acl_public:
833 831
             headers["x-amz-acl"] = "public-read"
834 832
 
... ...
@@ -846,6 +844,28 @@ class S3(object):
846 846
         if extra_headers:
847 847
             headers.update(extra_headers)
848 848
 
849
+        ## Multipart decision - only do multipart copy for remote s3 files > 5gb
850
+        multipart = False
851
+        # TODO: does it need new config option for: enable_multipart_copy ?
852
+        if self.config.enable_multipart:
853
+            # get size of remote src only if multipart is enabled
854
+            src_info = self.object_info(src_uri)
855
+            size = int(src_info["headers"]["content-length"])
856
+
857
+            if size > self.config.multipart_copy_size:
858
+                multipart = True
859
+
860
+        if multipart:
861
+            # Multipart requests are quite different... drop here
862
+            return self.copy_file_multipart(src_uri, dst_uri, size, headers)
863
+
864
+        ## Not multipart...
865
+        headers['x-amz-copy-source'] = "/%s/%s" % (
866
+            src_uri.bucket(),
867
+            urlencode_string(src_uri.object(), unicode_output=True)
868
+        )
869
+        headers['x-amz-metadata-directive'] = "COPY"
870
+
849 871
         request = self.create_request("OBJECT_PUT", uri = dst_uri, headers = headers)
850 872
         response = self.send_request(request)
851 873
         if response["data"] and getRootTagName(response["data"]) == "Error":
... ...
@@ -1623,6 +1643,18 @@ class S3(object):
1623 1623
             raise S3UploadError(getTextFromXml(response["data"], 'Message'))
1624 1624
         return response
1625 1625
 
1626
+    def copy_file_multipart(self, src_uri, dst_uri, size, headers):
1627
+        debug("copying multi-part ..." )
1628
+        timestamp_start = time.time()
1629
+        multicopy = MultiPartCopy(self, src_uri, dst_uri, size, headers)
1630
+        multicopy.copy_all_parts()
1631
+        response = multicopy.complete_multipart_copy()
1632
+        timestamp_end = time.time()
1633
+        response["elapsed"] = timestamp_end - timestamp_start
1634
+        response["size"] = size
1635
+        response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
1636
+        return response
1637
+
1626 1638
     def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
1627 1639
         self.update_region_inner_request(request)
1628 1640