
Rework and cleanup of MultiPart and MultiPartCopy (following the merge of PR #74)

The previous commit was a simple dysfunctional rebase of the original PR.
This one is a full rework to make it functional.
The whole MultiPart feature was also cleaned up.

Florent Viard authored on 2020/04/14 00:47:30
Showing 3 changed files
... ...
@@ -161,11 +161,13 @@ class Config(object):
     use_mime_magic = True
     mime_type = u""
     enable_multipart = True
-    multipart_chunk_size_mb = 15    # MB
-    multipart_max_chunks = 10000    # Maximum chunks on AWS S3, could be different on other S3-compatible APIs
-    #- minimum size to use multipart remote s3-to-s3 copy with byte range is 5gb
-    #multipart_copy_size = (5 * 1024 * 1024 * 1024) - 1
-    multipart_copy_size = 5 * 1024 * 1024 * 1024
+    # Chunk size is both the part size and the threshold for using multipart
+    multipart_chunk_size_mb = 15    # MiB
+    # Maximum chunk size for s3-to-s3 copy is 5 GiB.
+    # But use a much lower value by default
+    multipart_copy_chunk_size_mb = 2 * 1024
+    # Maximum chunks on AWS S3, could be different on other S3-compatible APIs
+    multipart_max_chunks = 10000
     # List of checks to be performed for 'sync'
     sync_checks = ['size', 'md5']   # 'weak-timestamp'
     # List of compiled REGEXPs
... ...
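For scale, the arithmetic behind these new defaults (an illustrative computation, not part of the commit): with the AWS limit of 10000 parts, 15 MiB upload chunks cap a multipart upload at about 146 GiB, while 2 GiB copy chunks allow about 19.5 TiB, comfortably above S3's 5 TiB object limit.

    # Illustrative part-count arithmetic for the new defaults (not s3cmd code)
    MiB = 1024 * 1024
    multipart_chunk_size_mb = 15             # upload chunk size / threshold
    multipart_copy_chunk_size_mb = 2 * 1024  # s3-to-s3 copy chunk size (2 GiB)
    multipart_max_chunks = 10000             # AWS S3 part-count limit

    max_upload = multipart_chunk_size_mb * MiB * multipart_max_chunks
    max_copy = multipart_copy_chunk_size_mb * MiB * multipart_max_chunks
    print("max upload: %.1f GiB" % (max_upload / 1024.0 ** 3))   # ~146.5 GiB
    print("max copy:   %.1f TiB" % (max_copy / 1024.0 ** 4))     # ~19.5 TiB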
@@ -6,24 +6,43 @@
 
 from __future__ import absolute_import
 
-import os
 import sys
-from stat import ST_SIZE
 from logging import debug, info, warning, error
-from .Utils import getTextFromXml, getTreeFromXml, formatSize, unicodise, deunicodise, calculateChecksum, parseNodes, encode_to_s3
+from .Exceptions import ParameterError
+from .S3Uri import S3UriS3
+from .Utils import (getTextFromXml, getTreeFromXml, formatSize,
+                    calculateChecksum, parseNodes)
 
-class MultiPartUpload(object):
 
-    MIN_CHUNK_SIZE_MB = 5       # 5MB
-    MAX_CHUNK_SIZE_MB = 5120    # 5GB
-    MAX_FILE_SIZE = 42949672960 # 5TB
+class MultiPartUpload(object):
+    """Supports MultiPartUpload and MultiPartUpload(Copy) operation"""
+    MIN_CHUNK_SIZE_MB = 5        # 5MB
+    MAX_CHUNK_SIZE_MB = 5120     # 5GB
+    MAX_FILE_SIZE = 42949672960  # 5TB
 
-    def __init__(self, s3, file_stream, uri, headers_baseline=None):
+    def __init__(self, s3, src, dst_uri, headers_baseline=None,
+                 src_size=None):
         self.s3 = s3
-        self.file_stream = file_stream
-        self.uri = uri
+        self.file_stream = None
+        self.src_uri = None
+        self.src_size = src_size
+        self.dst_uri = dst_uri
         self.parts = {}
         self.headers_baseline = headers_baseline or {}
+
+        if isinstance(src, S3UriS3):
+            # Source is the uri of an object to s3-to-s3 copy with multipart.
+            self.src_uri = src
+            if not src_size:
+                raise ParameterError("Source size is missing for "
+                                     "MultipartUploadCopy operation")
+            c_size = self.s3.config.multipart_copy_chunk_size_mb * 1024 * 1024
+        else:
+            # Source is a file_stream to upload
+            self.file_stream = src
+            c_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
+
+        self.chunk_size = c_size
         self.upload_id = self.initiate_multipart_upload()
 
     def get_parts_information(self, uri, upload_id):
... ...
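The heart of the rework is visible above: __init__ now dispatches on the source type, so one class covers both operations. An S3UriS3 source selects the s3-to-s3 copy path (and requires src_size, since there is no local file to stat); anything else is treated as a stream to upload. A standalone sketch of that dispatch, using stand-in classes rather than the real s3cmd ones:

    # Standalone model of the new __init__ dispatch; S3UriS3 and
    # ParameterError are stand-ins here, not the real s3cmd classes.
    class S3UriS3(object):
        pass

    class ParameterError(Exception):
        pass

    def pick_chunk_size(src, src_size, upload_mb=15, copy_mb=2 * 1024):
        """Mirror how the constructor chooses the part size in bytes."""
        if isinstance(src, S3UriS3):
            if not src_size:
                # the copy path cannot work without the source object size
                raise ParameterError("Source size is missing")
            return copy_mb * 1024 * 1024
        return upload_mb * 1024 * 1024  # plain upload from a stream

    assert pick_chunk_size(object(), None) == 15 * 1024 * 1024
    assert pick_chunk_size(S3UriS3(), 123) == 2 * 1024 ** 3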
@@ -33,7 +52,10 @@ class MultiPartUpload(object):
         parts = dict()
         for elem in parseNodes(tree):
             try:
-                parts[int(elem['PartNumber'])] = {'checksum': elem['ETag'], 'size': elem['Size']}
+                parts[int(elem['PartNumber'])] = {
+                    'checksum': elem['ETag'],
+                    'size': elem['Size']
+                }
             except KeyError:
                 pass
 
... ...
@@ -50,7 +72,11 @@ class MultiPartUpload(object):
                 info("mp_path: %s, object: %s" % (mp_path, uri.object()))
                 if mp_path == uri.object():
                     if upload_id:
-                        raise ValueError("More than one UploadId for URI %s.  Disable multipart upload, or use\n %s multipart %s\nto list the Ids, then pass a unique --upload-id into the put command." % (uri, sys.argv[0], uri))
+                        raise ValueError(
+                            "More than one UploadId for URI %s.  Disable "
+                            "multipart upload, or use\n %s multipart %s\n"
+                            "to list the Ids, then pass a unique --upload-id "
+                            "into the put command." % (uri, sys.argv[0], uri))
                     upload_id = mp_upload_id
             except KeyError:
                 pass
... ...
@@ -65,14 +91,14 @@ class MultiPartUpload(object):
         if self.s3.config.upload_id:
             self.upload_id = self.s3.config.upload_id
         elif self.s3.config.put_continue:
-            self.upload_id = self.get_unique_upload_id(self.uri)
+            self.upload_id = self.get_unique_upload_id(self.dst_uri)
         else:
             self.upload_id = ""
 
         if not self.upload_id:
-            request = self.s3.create_request("OBJECT_POST", uri = self.uri,
-                                             headers = self.headers_baseline,
-                                             uri_params = {'uploads': None})
+            request = self.s3.create_request("OBJECT_POST", uri=self.dst_uri,
+                                             headers=self.headers_baseline,
+                                             uri_params={'uploads': None})
             response = self.s3.send_request(request)
             data = response["data"]
             self.upload_id = getTextFromXml(data, "UploadId")
... ...
@@ -86,97 +112,178 @@ class MultiPartUpload(object):
         TODO use num_processes to thread it
         """
         if not self.upload_id:
-            raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")
+            raise ParameterError("Attempting to use a multipart upload that "
+                                 "has not been initiated.")
 
-        self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
-        filename = self.file_stream.stream_name
+        remote_statuses = {}
 
-        if filename != u"<stdin>":
-                size_left = file_size = os.stat(deunicodise(filename))[ST_SIZE]
-                nr_parts = file_size // self.chunk_size + (file_size % self.chunk_size and 1)
-                debug("MultiPart: Uploading %s in %d parts" % (filename, nr_parts))
+        if self.src_uri:
+            filename = self.src_uri.uri()
+            # Continue is not possible with multipart copy
         else:
-            debug("MultiPart: Uploading from %s" % filename)
+            filename = self.file_stream.stream_name
 
-        remote_statuses = dict()
         if self.s3.config.put_continue:
-            remote_statuses = self.get_parts_information(self.uri, self.upload_id)
+            remote_statuses = self.get_parts_information(self.dst_uri,
+                                                         self.upload_id)
 
         if extra_label:
             extra_label = u' ' + extra_label
+        labels = {
+            'source' : filename,
+            'destination' : self.dst_uri.uri(),
+        }
+
         seq = 1
-        if filename != u"<stdin>":
+
+        if self.src_size:
+            size_left = self.src_size
+            nr_parts = self.src_size // self.chunk_size \
+                + (self.src_size % self.chunk_size and 1)
+            debug("MultiPart: Uploading %s in %d parts" % (filename, nr_parts))
+
             while size_left > 0:
                 offset = self.chunk_size * (seq - 1)
-                current_chunk_size = min(file_size - offset, self.chunk_size)
+                current_chunk_size = min(self.src_size - offset,
+                                         self.chunk_size)
                 size_left -= current_chunk_size
-                labels = {
-                    'source' : filename,
-                    'destination' : self.uri.uri(),
-                    'extra' : "[part %d of %d, %s]%s" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True), extra_label)
-                }
-                try:
-                    self.upload_part(seq, offset, current_chunk_size, labels, remote_status = remote_statuses.get(seq))
-                except:
-                    error(u"\nUpload of '%s' part %d failed. Use\n  %s abortmp %s %s\nto abort the upload, or\n  %s --upload-id %s put ...\nto continue the upload."
-                          % (filename, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
-                    raise
-                seq += 1
-        else:
-            while True:
-                buffer = self.file_stream.read(self.chunk_size)
-                offset = 0 # send from start of the buffer
-                current_chunk_size = len(buffer)
-                labels = {
-                    'source' : filename,
-                    'destination' : self.uri.uri(),
-                    'extra' : "[part %d, %s]" % (seq, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
-                }
-                if len(buffer) == 0: # EOF
-                    break
+                labels['extra'] = "[part %d of %d, %s]%s" % (
+                    seq, nr_parts, "%d%sB" % formatSize(current_chunk_size,
+                                                        human_readable=True),
+                    extra_label)
                 try:
-                    self.upload_part(seq, offset, current_chunk_size, labels, buffer, remote_status = remote_statuses.get(seq))
+                    if self.file_stream:
+                        self.upload_part(
+                            seq, offset, current_chunk_size, labels,
+                            remote_status=remote_statuses.get(seq))
+                    else:
+                        self.copy_part(
+                            seq, offset, current_chunk_size, labels,
+                            remote_status=remote_statuses.get(seq))
                 except:
-                    error(u"\nUpload of '%s' part %d failed. Use\n  %s abortmp %s %s\nto abort, or\n  %s --upload-id %s put ...\nto continue the upload."
-                          % (filename, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
+                    error(u"\nUpload of '%s' part %d failed. Use\n  "
+                          "%s abortmp %s %s\nto abort the upload, or\n  "
+                          "%s --upload-id %s put ...\nto continue the upload."
+                          % (filename, seq, sys.argv[0], self.dst_uri,
+                             self.upload_id, sys.argv[0], self.upload_id))
                     raise
                 seq += 1
 
+            debug("MultiPart: Upload finished: %d parts", seq - 1)
+            return
+
+
+        # Else -> Case of u"<stdin>" source
+        debug("MultiPart: Uploading from %s" % filename)
+        while True:
+            buffer = self.file_stream.read(self.chunk_size)
+            offset = 0 # send from start of the buffer
+            current_chunk_size = len(buffer)
+            labels['extra'] = "[part %d, %s]" % (
+                seq,
+                "%d%sB" % formatSize(current_chunk_size,
+                                     human_readable=True),
+            )
+            if not buffer:
+                # EOF
+                break
+            try:
+                self.upload_part(seq, offset, current_chunk_size, labels,
+                                 buffer,
+                                 remote_status=remote_statuses.get(seq))
+            except:
+                error(u"\nUpload of '%s' part %d failed. Use\n  "
+                      "%s abortmp %s %s\nto abort, or\n  "
+                      "%s --upload-id %s put ...\nto continue the upload."
+                      % (filename, seq, sys.argv[0], self.dst_uri,
+                         self.upload_id, sys.argv[0], self.upload_id))
+                raise
+            seq += 1
+
         debug("MultiPart: Upload finished: %d parts", seq - 1)
 
-    def upload_part(self, seq, offset, chunk_size, labels, buffer = '', remote_status = None):
+    def upload_part(self, seq, offset, chunk_size, labels, buffer='',
+                    remote_status=None):
         """
         Upload a file chunk
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
         """
         # TODO implement Content-MD5
-        debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))
+        debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id,
+                                                      chunk_size))
 
         if remote_status is not None:
             if int(remote_status['size']) == chunk_size:
-                checksum = calculateChecksum(buffer, self.file_stream, offset, chunk_size, self.s3.config.send_chunk)
+                checksum = calculateChecksum(buffer, self.file_stream, offset,
+                                             chunk_size,
+                                             self.s3.config.send_chunk)
                 remote_checksum = remote_status['checksum'].strip('"\'')
                 if remote_checksum == checksum:
-                    warning("MultiPart: size and md5sum match for %s part %d, skipping." % (self.uri, seq))
+                    warning("MultiPart: size and md5sum match for %s part %d, "
+                            "skipping." % (self.dst_uri, seq))
                     self.parts[seq] = remote_status['checksum']
-                    return
+                    return None
                 else:
-                    warning("MultiPart: checksum (%s vs %s) does not match for %s part %d, reuploading."
-                            % (remote_checksum, checksum, self.uri, seq))
+                    warning("MultiPart: checksum (%s vs %s) does not match for"
+                            " %s part %d, reuploading."
+                            % (remote_checksum, checksum, self.dst_uri, seq))
             else:
-                warning("MultiPart: size (%d vs %d) does not match for %s part %d, reuploading."
-                        % (int(remote_status['size']), chunk_size, self.uri, seq))
+                warning("MultiPart: size (%d vs %d) does not match for %s part"
+                        " %d, reuploading." % (int(remote_status['size']),
+                                               chunk_size, self.dst_uri, seq))
 
-        headers = { "content-length": str(chunk_size) }
-        query_string_params = {'partNumber':'%s' % seq,
+        headers = {"content-length": str(chunk_size)}
+        query_string_params = {'partNumber': '%s' % seq,
                                'uploadId': self.upload_id}
-        request = self.s3.create_request("OBJECT_PUT", uri = self.uri,
-                                         headers = headers,
-                                         uri_params = query_string_params)
-        response = self.s3.send_file(request, self.file_stream, labels, buffer, offset = offset, chunk_size = chunk_size)
+        request = self.s3.create_request("OBJECT_PUT", uri=self.dst_uri,
+                                         headers=headers,
+                                         uri_params=query_string_params)
+        response = self.s3.send_file(request, self.file_stream, labels, buffer,
+                                     offset=offset, chunk_size=chunk_size)
         self.parts[seq] = response["headers"].get('etag', '').strip('"\'')
         return response
 
+    def copy_part(self, seq, offset, chunk_size, labels, remote_status=None):
+        """
+        Copy a remote file chunk
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
+        """
+        debug("Copying part %i of %r (%s bytes)" % (seq, self.upload_id,
+                                                    chunk_size))
+
+        # set up headers with copy-params.
+        # Examples:
+        #    x-amz-copy-source: /source_bucket/sourceObject
+        #    x-amz-copy-source-range:bytes=first-last
+        #    x-amz-copy-source-if-match: etag
+        #    x-amz-copy-source-if-none-match: etag
+        #    x-amz-copy-source-if-unmodified-since: time_stamp
+        #    x-amz-copy-source-if-modified-since: time_stamp
+        headers = {
+            "x-amz-copy-source": "/%s/%s" % (self.src_uri.bucket(),
+                                             self.src_uri.object()),
+        }
+
+        # byte range, with end byte included. A 10 byte file has bytes=0-9
+        headers["x-amz-copy-source-range"] = \
+            "bytes=%d-%d" % (offset, (offset + chunk_size - 1))
+
+        query_string_params = {'partNumber': '%s' % seq,
+                               'uploadId': self.upload_id}
+        request = self.s3.create_request("OBJECT_PUT", uri=self.dst_uri,
+                                         headers=headers,
+                                         uri_params=query_string_params)
+        response = self.s3.send_request(request)
+
+        # NOTE: Amazon sends whitespace while upload progresses, which
+        # accumulates in response body and seems to confuse XML parser.
+        # Strip newlines to find ETag in XML response data
+        #data = response["data"].replace("\n", '')
+        self.parts[seq] = getTextFromXml(response['data'], "ETag") or ''
+
+        return response
+
     def complete_multipart_upload(self):
         """
         Finish a multipart upload
... ...
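Both branches above share the same part layout: part seq covers the bytes starting at chunk_size * (seq - 1), and copy_part() expresses it as an inclusive byte range. A worked example of that arithmetic with assumed sizes (plain Python, no s3cmd imports):

    # Worked example of the offset/range arithmetic used above (assumed sizes)
    chunk_size = 2 * 1024 ** 3   # 2 GiB copy chunks
    src_size = 5 * 1024 ** 3     # a 5 GiB source object
    nr_parts = src_size // chunk_size + (src_size % chunk_size and 1)
    assert nr_parts == 3         # 2 GiB + 2 GiB + 1 GiB

    for seq in range(1, nr_parts + 1):
        offset = chunk_size * (seq - 1)
        current_chunk_size = min(src_size - offset, chunk_size)
        # end byte is inclusive: a 10-byte object would be bytes=0-9
        print("part %d -> x-amz-copy-source-range: bytes=%d-%d"
              % (seq, offset, offset + current_chunk_size - 1))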
@@ -188,12 +295,13 @@ class MultiPartUpload(object):
         part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
         for seq, etag in self.parts.items():
             parts_xml.append(part_xml % (seq, etag))
-        body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml))
+        body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % (
+            "".join(parts_xml))
 
-        headers = { "content-length": str(len(body)) }
-        request = self.s3.create_request("OBJECT_POST", uri = self.uri,
-                                         headers = headers, body = body,
-                                         uri_params = {'uploadId': self.upload_id})
+        headers = {"content-length": str(len(body))}
+        request = self.s3.create_request(
+            "OBJECT_POST", uri=self.dst_uri, headers=headers, body=body,
+            uri_params={'uploadId': self.upload_id})
         response = self.s3.send_request(request)
 
         return response
... ...
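For reference, the completion body assembled above is just a flat XML list of part numbers and ETags; a self-contained illustration with made-up ETag values:

    # Self-contained illustration of the CompleteMultipartUpload body
    parts = {1: 'etag-1', 2: 'etag-2'}  # made-up ETags
    part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
    parts_xml = [part_xml % (seq, etag) for seq, etag in sorted(parts.items())]
    body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % (
        "".join(parts_xml))
    print(body)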
@@ -211,99 +319,4 @@ class MultiPartUpload(object):
         return response
 
 
-class MultiPartCopy(MultiPartUpload):
-
-    def __init__(self, s3, src_uri, dst_uri, src_size, headers_baseline = {}):
-        self.s3 = s3
-        self.file = self.src_uri = src_uri
-        self.uri  = self.dst_uri = dst_uri
-        # ...
-        self.src_size = src_size
-        self.parts = {}
-        self.headers_baseline = headers_baseline
-        self.upload_id = self.initiate_multipart_copy()
-
-    def initiate_multipart_copy(self):
-        return self.initiate_multipart_upload()
-
-    def complete_multipart_copy(self):
-        return self.complete_multipart_upload()
-
-    def abort_copy(self):
-        return self.abort_upload()
-
-
-    def copy_all_parts(self):
-        """
-        Execute a full multipart upload copy on a remote file
-        Returns the seq/etag dict
-        """
-        if not self.upload_id:
-            raise RuntimeError("Attempting to use a multipart copy that has not been initiated.")
-
-        size_left = file_size = self.src_size
-        self.chunk_size = self.s3.config.multipart_copy_size # - 1
-        nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
-        debug("MultiPart: Copying %s in %d parts" % (self.src_uri, nr_parts))
-
-        seq = 1
-        while size_left > 0:
-            offset = self.chunk_size * (seq - 1)
-            current_chunk_size = min(file_size - offset, self.chunk_size)
-            size_left -= current_chunk_size
-            labels = {
-                'source' : unicodise(self.src_uri.uri()),
-                'destination' : unicodise(self.uri.uri()),
-                'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
-            }
-            try:
-                self.copy_part(seq, offset, current_chunk_size, labels)
-            except:
-                # TODO: recover from some "retriable" errors?
-                error(u"Upload copy of '%s' part %d failed. Aborting multipart upload copy." % (self.src_uri, seq))
-                self.abort_copy()
-                raise
-            seq += 1
-
-        debug("MultiPart: Copy finished: %d parts", seq - 1)
-
-    def copy_part(self, seq, offset, chunk_size, labels):
-        """
-        Copy a remote file chunk
-        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
-        http://docs.amazonwebservices.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
-        """
-        debug("Copying part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))
-
-        # set up headers with copy-params.
-        # Examples:
-        #    x-amz-copy-source: /source_bucket/sourceObject
-        #    x-amz-copy-source-range:bytes=first-last
-        #    x-amz-copy-source-if-match: etag
-        #    x-amz-copy-source-if-none-match: etag
-        #    x-amz-copy-source-if-unmodified-since: time_stamp
-        #    x-amz-copy-source-if-modified-since: time_stamp
-        headers = { "x-amz-copy-source": "/%s/%s" % (self.src_uri.bucket(), self.src_uri.object()) }
-
-        # include byte range header if already on next sequence or original file is > 5gb
-        if (seq > 1) or (chunk_size >= self.s3.config.multipart_copy_size):
-            # a 10 byte file has bytes=0-9
-            headers["x-amz-copy-source-range"] = "bytes=%d-%d" % (offset, (offset + chunk_size - 1))
-
-        query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
-
-        request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
-        response = self.s3.send_request(request)
-
-        # NOTE: Amazon sends whitespace while upload progresses, which
-        # accumulates in response body and seems to confuse XML parser.
-        # Strip newlines to find ETag in XML response data
-        data = response["data"].replace("\n", '')
-        self.parts[seq] = getTextFromXml(data, "ETag")
-
-        # TODO: how to fail if no ETag found ... raise Exception?
-        #debug("Uploaded copy part %i of %r (%s bytes): etag=%s" % (seq, self.upload_id, chunk_size, self.parts[seq]))
-
-        return response
-
 # vim:et:ts=4:sts=4:ai
... ...
@@ -40,7 +40,7 @@ from .ACL import ACL, GranteeLogDelivery
 from .BidirMap import BidirMap
 from .Config import Config
 from .Exceptions import *
-from .MultiPart import MultiPartUpload, MultiPartCopy
+from .MultiPart import MultiPartUpload
 from .S3Uri import S3Uri
 from .ConnMan import ConnMan
 from .Crypto import (sign_request_v2, sign_request_v4, checksum_sha256_file,
... ...
@@ -845,19 +845,15 @@ class S3(object):
             headers.update(extra_headers)
 
         ## Multipart decision - only do multipart copy for remote s3 files > 5gb
-        multipart = False
-        # TODO: does it need new config option for: enable_multipart_copy ?
         if self.config.enable_multipart:
             # get size of remote src only if multipart is enabled
             src_info = self.object_info(src_uri)
             size = int(src_info["headers"]["content-length"])
 
-            if size > self.config.multipart_copy_size:
-                multipart = True
-
-        if multipart:
-            # Multipart requests are quite different... drop here
-            return self.copy_file_multipart(src_uri, dst_uri, size, headers)
+            if size > self.config.multipart_copy_chunk_size_mb * 1024 * 1024:
+                # Multipart requests are quite different... drop here
+                return self.copy_file_multipart(src_uri, dst_uri, size,
+                                                headers)
 
         ## Not multipart...
         headers['x-amz-copy-source'] = "/%s/%s" % (
... ...
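Note that the copy decision above now keys off multipart_copy_chunk_size_mb instead of the removed multipart_copy_size, so with the 2 GiB default any source larger than 2 GiB is copied in parts (the "> 5gb" context comment predates this change). A quick sketch of the threshold, assuming the default:

    # Sketch of the new multipart-copy decision (assumed 2 GiB default)
    multipart_copy_chunk_size_mb = 2 * 1024
    threshold = multipart_copy_chunk_size_mb * 1024 * 1024

    for size_gib in (1, 2, 3, 6):
        size = size_gib * 1024 ** 3
        mode = "multipart copy" if size > threshold else "single-request copy"
        print("%d GiB source -> %s" % (size_gib, mode))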
@@ -866,7 +862,8 @@ class S3(object):
         )
         headers['x-amz-metadata-directive'] = "COPY"
 
-        request = self.create_request("OBJECT_PUT", uri = dst_uri, headers = headers)
+        request = self.create_request("OBJECT_PUT", uri=dst_uri,
+                                      headers=headers)
         response = self.send_request(request)
         if response["data"] and getRootTagName(response["data"]) == "Error":
             #http://doc.s3.amazonaws.com/proposals/copy.html
... ...
@@ -1627,9 +1624,9 @@ class S3(object):
 
         return response
 
-    def send_file_multipart(self, stream, headers, uri, size, extra_label = ""):
+    def send_file_multipart(self, stream, headers, uri, size, extra_label=""):
         timestamp_start = time.time()
-        upload = MultiPartUpload(self, stream, uri, headers)
+        upload = MultiPartUpload(self, stream, uri, headers, size)
         upload.upload_all_parts(extra_label)
         response = upload.complete_multipart_upload()
         timestamp_end = time.time()
... ...
@@ -1643,17 +1640,10 @@ class S3(object):
             raise S3UploadError(getTextFromXml(response["data"], 'Message'))
         return response
 
-    def copy_file_multipart(self, src_uri, dst_uri, size, headers):
-        debug("copying multi-part ..." )
-        timestamp_start = time.time()
-        multicopy = MultiPartCopy(self, src_uri, dst_uri, size, headers)
-        multicopy.copy_all_parts()
-        response = multicopy.complete_multipart_copy()
-        timestamp_end = time.time()
-        response["elapsed"] = timestamp_end - timestamp_start
-        response["size"] = size
-        response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
-        return response
+    def copy_file_multipart(self, src_uri, dst_uri, size, headers,
+                            extra_label=""):
+        return self.send_file_multipart(src_uri, headers, dst_uri, size,
+                                        extra_label)
 
     def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
         self.update_region_inner_request(request)
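With MultiPartCopy gone, copy_file_multipart() above is a thin wrapper: the source URI travels in send_file_multipart()'s stream slot, and the unified MultiPartUpload recognizes it as an S3UriS3. A minimal stand-in showing just the argument mapping (hypothetical functions, not the real methods):

    # Hypothetical stand-ins showing the argument mapping of the delegation
    def send_file_multipart(stream, headers, uri, size, extra_label=""):
        # `stream` may in fact be an S3UriS3 source in the copy case
        return {"src": stream, "dst": uri, "size": size}

    def copy_file_multipart(src_uri, dst_uri, size, headers, extra_label=""):
        return send_file_multipart(src_uri, headers, dst_uri, size,
                                   extra_label)

    assert copy_file_multipart("s3://a/x", "s3://b/y", 10, {}) == {
        "src": "s3://a/x", "dst": "s3://b/y", "size": 10}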