Reworked Multipart upload

- Converted back to non-threaded upload
  (threading for all uploads, not only multipart, will be added later)
- Using S3.send_file() instead of S3.send_request()
- Don't read data in the main loop; only compute the offset and chunk
  size and leave it to S3.send_file() to read the data.
- Re-enabled progress indicator.

Still broken:
- "s3cmd sync" doesn't work with multipart uploaded files because
the ETag no longer contains MD5sum of the file. MAJOR!
- Multipart upload abort is not triggered on all failures.
- s3cmd commands "mplist" and "mpabort" to be added.
- s3cmd should resume failed multipart uploads.
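
Background on the ETag issue: for a multipart object, S3 does not return
the MD5 of the whole file as the ETag; it returns the MD5 of the
concatenated binary MD5 digests of the individual parts, with a "-N"
part-count suffix. A minimal sketch of the value "s3cmd sync" would have
to compute locally (the helper name is illustrative, not part of this
commit, and the result only matches when chunk_size equals the size
used for the upload):

    from hashlib import md5

    def multipart_etag(path, chunk_size):
        # MD5 each chunk, then MD5 the concatenation of the raw digests
        digests = []
        f = open(path, "rb")
        while True:
            data = f.read(chunk_size)
            if not data:
                break
            digests.append(md5(data).digest())
        f.close()
        return '"%s-%d"' % (md5("".join(digests)).hexdigest(), len(digests))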

Michal Ludvig authored on 2012/01/05 10:25:39
Showing 2 changed files
... ...
@@ -2,53 +2,16 @@
 ## Author: Jerome Leclanche <jerome.leclanche@gmail.com>
 ## License: GPL Version 2
 
-from Queue import Queue
-from threading import Thread
+import os
+from stat import ST_SIZE
 from logging import debug, info, warning, error
-from Utils import getTextFromXml
-
-class Worker(Thread):
-    """
-    Thread executing tasks from a given tasks queue
-    """
-    def __init__(self, tasks):
-        super(Worker, self).__init__()
-        self.tasks = tasks
-        self.daemon = True
-        self.start()
-
-    def run(self):
-        while True:
-            func, args, kargs = self.tasks.get()
-            func(*args, **kargs)
-            self.tasks.task_done()
-
-class ThreadPool(object):
-    """
-    Pool of threads consuming tasks from a queue
-    """
-    def __init__(self, num_threads):
-        self.tasks = Queue(num_threads)
-        for _ in range(num_threads):
-            Worker(self.tasks)
-
-    def add_task(self, func, *args, **kargs):
-        """
-        Add a task to the queue
-        """
-        self.tasks.put((func, args, kargs))
-
-    def wait_completion(self):
-        """
-        Wait for completion of all the tasks in the queue
-        """
-        self.tasks.join()
+from Utils import getTextFromXml, formatSize, unicodise
+from Exceptions import S3UploadError
 
 class MultiPartUpload(object):
 
     MIN_CHUNK_SIZE_MB = 5       # 5MB
     MAX_CHUNK_SIZE_MB = 5120    # 5GB
-    MAX_CHUNKS = 100
     MAX_FILE_SIZE = 42949672960 # 5TB
 
     def __init__(self, s3, file, uri):
... ...
@@ -66,11 +29,10 @@ class MultiPartUpload(object):
         request = self.s3.create_request("OBJECT_POST", uri = self.uri, extra = "?uploads")
         response = self.s3.send_request(request)
         data = response["data"]
-        s3, key, upload_id = getTextFromXml(data, "Bucket"), getTextFromXml(data, "Key"), getTextFromXml(data, "UploadId")
-        self.upload_id = upload_id
-        return s3, key, upload_id
+        self.upload_id = getTextFromXml(data, "UploadId")
+        return self.upload_id
 
-    def upload_all_parts(self, num_threads, chunk_size):
+    def upload_all_parts(self):
         """
         Execute a full multipart upload on a file
         Returns the id/etag dict
... ...
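
For reference, initiate_multipart_upload() now keeps only the UploadId
element of the InitiateMultipartUpload response; the Bucket and Key
elements it used to return are already known from self.uri, so
returning them was redundant. The response body looks like this
(values illustrative):

    <?xml version="1.0" encoding="UTF-8"?>
    <InitiateMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
      <Bucket>example-bucket</Bucket>
      <Key>example-object</Key>
      <UploadId>VXBsb2FkSWQtZXhhbXBsZQ</UploadId>
    </InitiateMultipartUploadResult>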
@@ -79,50 +41,52 @@ class MultiPartUpload(object):
         if not self.upload_id:
             raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")
 
+        size_left = file_size = os.stat(self.file.name)[ST_SIZE]
+        self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
+        nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
+        debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
+
         id = 1
-        if num_threads > 1:
-            debug("MultiPart: Uploading in %d threads" % num_threads)
-            pool = ThreadPool(num_threads)
-        else:
-            debug("MultiPart: Uploading in a single thread")
-
-        while True:
-            if id == self.MAX_CHUNKS:
-                data = self.file.read(-1)
-            else:
-                data = self.file.read(chunk_size)
-            if not data:
-                break
-            if num_threads > 1:
-                pool.add_task(self.upload_part, data, id)
-            else:
-                self.upload_part(data, id)
+        while size_left > 0:
+            offset = self.chunk_size * (id - 1)
+            current_chunk_size = min(file_size - offset, self.chunk_size)
+            size_left -= current_chunk_size
+            labels = {
+                'source' : unicodise(self.file.name),
+                'destination' : unicodise(self.uri.uri()),
+                'extra' : "[part %d of %d, %s]" % (id, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
+            }
+            try:
+                self.upload_part(id, offset, current_chunk_size, labels)
+            except S3UploadError, e:
+                error(u"Upload of '%s' part %d failed too many times. Aborting multipart upload." % (self.file.name, id))
+                self.abort_upload()
+                raise e
             id += 1
 
-        if num_threads > 1:
-            debug("Thread pool with %i threads and %i tasks awaiting completion." % (num_threads, id))
-            pool.wait_completion()
+        debug("MultiPart: Upload finished: %d parts", id - 1)
 
-    def upload_part(self, data, id):
+    def upload_part(self, id, offset, chunk_size, labels):
         """
         Upload a file chunk
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
         """
         # TODO implement Content-MD5
-        content_length = str(len(data))
-        debug("Uploading part %i of %r (%s bytes)" % (id, self.upload_id, content_length))
-        headers = { "content-length": content_length }
+        debug("Uploading part %i of %r (%s bytes)" % (id, self.upload_id, chunk_size))
+        headers = { "content-length": chunk_size }
         query_string = "?partNumber=%i&uploadId=%s" % (id, self.upload_id)
         request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
-        response = self.s3.send_request(request, body = data)
-
+        response = self.s3.send_file(request, self.file, labels, offset = offset, chunk_size = chunk_size)
         self.parts[id] = response["headers"]["etag"]
+        return response
 
     def complete_multipart_upload(self):
         """
         Finish a multipart upload
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html
         """
+        debug("MultiPart: Completing upload: %s" % self.upload_id)
+
         parts_xml = []
         part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
         for id, etag in self.parts.items():
... ...
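
The rewritten loop above derives everything from the file size and the
configured chunk size; no data is read in the loop itself. A worked
example of the same arithmetic (sizes are only an example; note the
Python 2 integer division, as in the code):

    chunk_size = 5 * 1024 * 1024        # 5 MB parts
    file_size = 12 * 1024 * 1024 + 1    # a 12 MB file plus one byte

    # one extra part whenever file_size is not a multiple of chunk_size
    nr_parts = file_size / chunk_size + (file_size % chunk_size and 1)
    # -> 3 parts

    for id in range(1, nr_parts + 1):
        offset = chunk_size * (id - 1)
        current_chunk_size = min(file_size - offset, chunk_size)
        # part 1: offset 0,        5242880 bytes
        # part 2: offset 5242880,  5242880 bytes
        # part 3: offset 10485760, 2097153 bytes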
@@ -135,4 +99,14 @@ class MultiPartUpload(object):
 
         return response
 
+    def abort_upload(self):
+        """
+        Abort multipart upload
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html
+        """
+        debug("MultiPart: Aborting upload: %s" % self.upload_id)
+        request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
+        response = self.s3.send_request(request)
+        return response
+
 # vim:et:ts=4:sts=4:ai
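
abort_upload() maps onto the AbortMultipartUpload API call: a DELETE on
the object URI carrying the uploadId query parameter and no request
body, which is why send_request() is called here with the request
alone. On the wire it amounts to (host and id illustrative):

    DELETE /example-object?uploadId=VXBsb2FkSWQtZXhhbXBsZQ HTTP/1.1
    Host: example-bucket.s3.amazonaws.com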
... ...
@@ -616,7 +616,7 @@ class S3(object):
 
         return response
 
-    def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
+    def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, offset = 0, chunk_size = -1):
         method_string, resource, headers = request.get_triplet()
         size_left = size_total = headers.get("content-length")
         if self.config.progress_meter:
... ...
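
The two new keyword arguments keep the plain upload path unchanged:
offset = 0 reproduces the old whole-file behaviour (chunk_size is
carried along so retries re-send the same window), while multipart
callers pass an explicit window into the file. Illustrative call sites:

    # whole-file upload, exactly as before this change
    self.send_file(request, file, labels)

    # upload one 5 MB part starting 10 MB into the file
    self.send_file(request, file, labels,
                   offset = 10 * 1024 * 1024, chunk_size = 5 * 1024 * 1024)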
@@ -639,15 +639,15 @@ class S3(object):
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
-                return self.send_file(request, file, labels, throttle, retries - 1)
+                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
             else:
                 raise S3UploadError("Upload failed for: %s" % resource['uri'])
-        file.seek(0)
+        file.seek(offset)
         md5_hash = md5()
         try:
             while (size_left > 0):
                 #debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
-                data = file.read(self.config.send_chunk)
+                data = file.read(min(self.config.send_chunk, size_left))
                 md5_hash.update(data)
                 conn.send(data)
                 if self.config.progress_meter:
... ...
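
The change from file.read(self.config.send_chunk) to
file.read(min(self.config.send_chunk, size_left)) is what keeps a part
upload inside its window: size_left starts from the request's
content-length, so the final read of a part is clamped instead of
spilling into the next part's bytes. For example, with send_chunk = 4096
and 5000 bytes left in the part, the reads are 4096 and then
min(4096, 904) = 904 bytes, rather than a full 4096 past the boundary.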
@@ -676,7 +676,7 @@ class S3(object):
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
-                return self.send_file(request, file, labels, throttle, retries - 1)
+                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
             else:
                 debug("Giving up on '%s' %s" % (file.name, e))
                 raise S3UploadError("Upload failed for: %s" % resource['uri'])
... ...
@@ -698,7 +698,7 @@ class S3(object):
             redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
             self.set_hostname(redir_bucket, redir_hostname)
             warning("Redirected to: %s" % (redir_hostname))
-            return self.send_file(request, file, labels)
+            return self.send_file(request, file, labels, offset = offset, chunk_size = chunk_size)
 
         # S3 from time to time doesn't send ETag back in a response :-(
         # Force re-upload here.
... ...
@@ -721,7 +721,7 @@ class S3(object):
                     warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
                     warning("Waiting %d sec..." % self._fail_wait(retries))
                     time.sleep(self._fail_wait(retries))
-                    return self.send_file(request, file, labels, throttle, retries - 1)
+                    return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
                 else:
                     warning("Too many failures. Giving up on '%s'" % (file.name))
                     raise S3UploadError
... ...
@@ -734,7 +734,7 @@ class S3(object):
             warning("MD5 Sums don't match!")
             if retries:
                 warning("Retrying upload of %s" % (file.name))
-                return self.send_file(request, file, labels, throttle, retries - 1)
+                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
             else:
                 warning("Too many failures. Giving up on '%s'" % (file.name))
                 raise S3UploadError
... ...
@@ -743,13 +743,12 @@ class S3(object):
 
     def send_file_multipart(self, file, headers, uri, size):
         upload = MultiPartUpload(self, file, uri)
-        bucket, key, upload_id = upload.initiate_multipart_upload()
+        upload_id = upload.initiate_multipart_upload()
 
         num_threads = self.config.multipart_num_threads
         chunk_size = self.config.multipart_chunk_size_mb * 1024 * 1024
 
-        file.seek(0)
-        upload.upload_all_parts(num_threads, chunk_size)
+        upload.upload_all_parts()
         response = upload.complete_multipart_upload()
         response["speed"] = 0 # XXX
         return response
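
Taken together, the multipart path is now a linear sequence, with all
sizing decided inside upload_all_parts(). A condensed sketch of the
flow (error handling elided):

    upload = MultiPartUpload(self, file, uri)
    upload.initiate_multipart_upload()     # POST ?uploads -> UploadId
    upload.upload_all_parts()              # PUT ?partNumber=N&uploadId=...
                                           #   one S3.send_file() per part
    response = upload.complete_multipart_upload()
                                           # POST ?uploadId=... with the ETag list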