
Merge branch 'multipart-single'

Michal Ludvig authored on 2012/01/06 08:43:07
Showing 5 changed files: S3/Config.py, S3/MultiPart.py (new), S3/S3.py, S3/S3Uri.py, s3cmd
... ...
S3/Config.py
@@ -63,6 +63,8 @@ class Config(object):
     default_mime_type = "binary/octet-stream"
     guess_mime_type = True
     mime_type = ""
+    enable_multipart = True
+    multipart_chunk_size_mb = 15    # MB
     # List of checks to be performed for 'sync'
     sync_checks = ['size', 'md5']   # 'weak-timestamp'
     # List of compiled REGEXPs
... ...
@@ -202,3 +204,4 @@ class ConfigDumper(object):
         for option in config.option_list():
             self.stream.write("%s = %s\n" % (option, getattr(config, option)))
 
+# vim:et:ts=4:sts=4:ai
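
The two options introduced above surface in ~/.s3cfg like any other Config attribute (a hypothetical excerpt, in the same "option = value" form ConfigDumper emits):

    enable_multipart = True
    multipart_chunk_size_mb = 15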
S3/MultiPart.py
new file mode 100644
... ...
@@ -0,0 +1,113 @@
+## Amazon S3 Multipart upload support
+## Author: Jerome Leclanche <jerome.leclanche@gmail.com>
+## License: GPL Version 2
+
+import os
+from stat import ST_SIZE
+from logging import debug, info, warning, error
+from Utils import getTextFromXml, formatSize, unicodise
+from Exceptions import S3UploadError
+
+class MultiPartUpload(object):
+
+    MIN_CHUNK_SIZE_MB = 5       # 5MB
+    MAX_CHUNK_SIZE_MB = 5120    # 5GB
+    MAX_FILE_SIZE = 5 * 1024 ** 4   # 5TB
+
+    def __init__(self, s3, file, uri, headers_baseline = {}):
+        self.s3 = s3
+        self.file = file
+        self.uri = uri
+        self.parts = {}
+        self.headers_baseline = headers_baseline
+        self.upload_id = self.initiate_multipart_upload()
+
+    def initiate_multipart_upload(self):
+        """
+        Begin a multipart upload
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html
+        """
+        request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = self.headers_baseline, extra = "?uploads")
+        response = self.s3.send_request(request)
+        data = response["data"]
+        self.upload_id = getTextFromXml(data, "UploadId")
+        return self.upload_id
+
+    def upload_all_parts(self):
+        """
+        Execute a full multipart upload on a file
+        Returns the seq/etag dict
+        TODO use num_processes to thread it
+        """
+        if not self.upload_id:
+            raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")
+
+        size_left = file_size = os.stat(self.file.name)[ST_SIZE]
+        self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
+        nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
+        debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
+
+        seq = 1
+        while size_left > 0:
+            offset = self.chunk_size * (seq - 1)
+            current_chunk_size = min(file_size - offset, self.chunk_size)
+            size_left -= current_chunk_size
+            labels = {
+                'source' : unicodise(self.file.name),
+                'destination' : unicodise(self.uri.uri()),
+                'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
+            }
+            try:
+                self.upload_part(seq, offset, current_chunk_size, labels)
+            except:
+                error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
+                self.abort_upload()
+                raise
+            seq += 1
+
+        debug("MultiPart: Upload finished: %d parts", seq - 1)
+
+    def upload_part(self, seq, offset, chunk_size, labels):
+        """
+        Upload a file chunk
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
+        """
+        # TODO implement Content-MD5
+        debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))
+        headers = { "content-length": chunk_size }
+        query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
+        request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
+        response = self.s3.send_file(request, self.file, labels, offset = offset, chunk_size = chunk_size)
+        self.parts[seq] = response["headers"]["etag"]
+        return response
+
+    def complete_multipart_upload(self):
+        """
+        Finish a multipart upload
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadComplete.html
+        """
+        debug("MultiPart: Completing upload: %s" % self.upload_id)
+
+        parts_xml = []
+        part_xml = "<Part><PartNumber>%i</PartNumber><ETag>%s</ETag></Part>"
+        for seq, etag in sorted(self.parts.items()):    # S3 wants ascending PartNumber order
+            parts_xml.append(part_xml % (seq, etag))
+        body = "<CompleteMultipartUpload>%s</CompleteMultipartUpload>" % ("".join(parts_xml))
+
+        headers = { "content-length": len(body) }
+        request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = headers, extra = "?uploadId=%s" % (self.upload_id))
+        response = self.s3.send_request(request, body = body)
+
+        return response
+
+    def abort_upload(self):
+        """
+        Abort multipart upload
+        http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html
+        """
+        debug("MultiPart: Aborting upload: %s" % self.upload_id)
+        request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
+        response = self.s3.send_request(request)
+        return response
+
+# vim:et:ts=4:sts=4:ai
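
For orientation, a minimal sketch of how this class is driven (it mirrors S3.send_file_multipart() added further down; `s3` is an S3 instance and `uri` an S3Uri object, both assumed rather than defined here):

    src = open("/tmp/bigfile", "rb")
    upload = MultiPartUpload(s3, src, uri)          # POST ?uploads, remembers the UploadId
    upload.upload_all_parts()                       # PUT ?partNumber=N&uploadId=... per chunk
    response = upload.complete_multipart_upload()   # POST the <Part>/<ETag> manifest
    # a failed part makes upload_all_parts() abort the whole upload and re-raise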
... ...
S3/S3.py
@@ -20,11 +20,12 @@ except ImportError:
 
 from Utils import *
 from SortedDict import SortedDict
+from AccessLog import AccessLog
+from ACL import ACL, GranteeLogDelivery
 from BidirMap import BidirMap
 from Config import Config
 from Exceptions import *
-from ACL import ACL, GranteeLogDelivery
-from AccessLog import AccessLog
+from MultiPart import MultiPartUpload
 from S3Uri import S3Uri
 
 try:
... ...
@@ -111,15 +112,16 @@ class S3(object):
         PUT = 0x02,
         HEAD = 0x04,
         DELETE = 0x08,
-        MASK = 0x0F,
-        )
+        POST = 0x10,
+        MASK = 0x1F,
+    )
 
     targets = BidirMap(
         SERVICE = 0x0100,
         BUCKET = 0x0200,
         OBJECT = 0x0400,
         MASK = 0x0700,
-        )
+    )
 
     operations = BidirMap(
         UNDFINED = 0x0000,
... ...
@@ -131,13 +133,14 @@ class S3(object):
         OBJECT_GET = targets["OBJECT"] | http_methods["GET"],
         OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"],
         OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"],
+        OBJECT_POST = targets["OBJECT"] | http_methods["POST"],
     )
 
     codes = {
         "NoSuchBucket" : "Bucket '%s' does not exist",
         "AccessDenied" : "Access to bucket '%s' was denied",
         "BucketAlreadyExists" : "Bucket '%s' already exists",
-        }
+    }
 
     ## S3 sometimes sends HTTP-307 response
     redir_map = {}
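
The new POST bit extends the method mask, so a combined operation value still splits cleanly into its target and method halves; a quick illustration using only the constants above:

    op = S3.operations["OBJECT_POST"]   # 0x0400 | 0x10 == 0x0410
    op & S3.http_methods["MASK"]        # 0x10 -> POST (hence MASK grew from 0x0F to 0x1F)
    op & S3.targets["MASK"]             # 0x0400 -> OBJECT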
... ...
@@ -345,10 +348,12 @@ class S3(object):
             size = os.stat(filename)[ST_SIZE]
         except (IOError, OSError), e:
             raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror))
+
         headers = SortedDict(ignore_case = True)
         if extra_headers:
             headers.update(extra_headers)
-        headers["content-length"] = size
+
+        ## MIME-type handling
         content_type = self.config.mime_type
         if not content_type and self.config.guess_mime_type:
             content_type = mime_magic(filename)
... ...
@@ -356,10 +361,24 @@ class S3(object):
             content_type = self.config.default_mime_type
         debug("Content-Type set to '%s'" % content_type)
         headers["content-type"] = content_type
+
+        ## Other Amazon S3 attributes
         if self.config.acl_public:
             headers["x-amz-acl"] = "public-read"
         if self.config.reduced_redundancy:
             headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
+
+        ## Multipart decision
+        multipart = False
+        if self.config.enable_multipart:
+            if size > self.config.multipart_chunk_size_mb * 1024 * 1024:
+                multipart = True
+        if multipart:
+            # Multipart requests are quite different... drop here
+            return self.send_file_multipart(file, headers, uri, size)
+
+        ## Not multipart...
+        headers["content-length"] = size
         request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
         labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
         response = self.send_file(request, file, labels)
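
With the default 15 MB chunk size, anything larger than one chunk takes the multipart path; the same arithmetic worked through for a 100 MB file (Python 2 integer division, as in MultiPart.upload_all_parts()):

    chunk_size = 15 * 1024 * 1024       # multipart_chunk_size_mb in bytes
    size = 100 * 1024 * 1024
    multipart = size > chunk_size       # True -> send_file_multipart()
    nr_parts = size / chunk_size + (size % chunk_size and 1)    # 6 full parts + 1 partial = 7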
... ...
@@ -558,7 +577,9 @@ class S3(object):
             for header in headers.keys():
                 headers[header] = str(headers[header])
             conn = self.get_connection(resource['bucket'])
-            conn.request(method_string, self.format_uri(resource), body, headers)
+            uri = self.format_uri(resource)
+            debug("Sending request method_string=%r, uri=%r, headers=%r, body=(%i bytes)" % (method_string, uri, headers, len(body or "")))
+            conn.request(method_string, uri, body, headers)
             response = {}
             http_response = conn.getresponse()
             response["status"] = http_response.status
... ...
@@ -600,7 +621,7 @@ class S3(object):
 
         return response
 
-    def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
+    def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, offset = 0, chunk_size = -1):
         method_string, resource, headers = request.get_triplet()
         size_left = size_total = headers.get("content-length")
         if self.config.progress_meter:
... ...
@@ -623,15 +644,15 @@ class S3(object):
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
-                return self.send_file(request, file, labels, throttle, retries - 1)
+                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
             else:
                 raise S3UploadError("Upload failed for: %s" % resource['uri'])
-        file.seek(0)
+        file.seek(offset)
         md5_hash = md5()
         try:
             while (size_left > 0):
                 #debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
-                data = file.read(self.config.send_chunk)
+                data = file.read(min(self.config.send_chunk, size_left))
                 md5_hash.update(data)
                 conn.send(data)
                 if self.config.progress_meter:
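
Together, seek(offset) and the min(send_chunk, size_left) read turn send_file() into a byte-range uploader: it transmits exactly one chunk instead of the whole file. The same windowed-read pattern in isolation (a standalone sketch, not s3cmd code):

    def read_window(f, offset, length, bufsize = 32 * 1024):
        """Yield `length` bytes of file `f` starting at `offset`, buffer by buffer."""
        f.seek(offset)
        remaining = length
        while remaining > 0:
            data = f.read(min(bufsize, remaining))
            if not data:            # file shorter than promised; stop early
                break
            yield data
            remaining -= len(data)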
... ...
@@ -660,7 +681,7 @@ class S3(object):
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
-                return self.send_file(request, file, labels, throttle, retries - 1)
+                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
             else:
                 debug("Giving up on '%s' %s" % (file.name, e))
                 raise S3UploadError("Upload failed for: %s" % resource['uri'])
... ...
@@ -682,7 +703,7 @@ class S3(object):
             redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
             self.set_hostname(redir_bucket, redir_hostname)
             warning("Redirected to: %s" % (redir_hostname))
-            return self.send_file(request, file, labels)
+            return self.send_file(request, file, labels, offset = offset, chunk_size = chunk_size)
 
         # S3 from time to time doesn't send ETag back in a response :-(
         # Force re-upload here.
... ...
@@ -705,7 +726,7 @@ class S3(object):
                     warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
                     warning("Waiting %d sec..." % self._fail_wait(retries))
                     time.sleep(self._fail_wait(retries))
-                    return self.send_file(request, file, labels, throttle, retries - 1)
+                    return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
                 else:
                     warning("Too many failures. Giving up on '%s'" % (file.name))
                     raise S3UploadError
... ...
@@ -718,13 +739,21 @@ class S3(object):
             warning("MD5 Sums don't match!")
             if retries:
                 warning("Retrying upload of %s" % (file.name))
-                return self.send_file(request, file, labels, throttle, retries - 1)
+                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
             else:
                 warning("Too many failures. Giving up on '%s'" % (file.name))
                 raise S3UploadError
 
         return response
 
+    def send_file_multipart(self, file, headers, uri, size):
+        chunk_size = self.config.multipart_chunk_size_mb * 1024 * 1024
+        upload = MultiPartUpload(self, file, uri, headers)
+        upload.upload_all_parts()
+        response = upload.complete_multipart_upload()
+        response["speed"] = 0 # XXX
+        return response
+
     def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
         method_string, resource, headers = request.get_triplet()
         if self.config.progress_meter:
... ...
S3/S3Uri.py
@@ -40,6 +40,9 @@ class S3Uri(object):
     def __unicode__(self):
         return self.uri()
 
+    def __repr__(self):
+        return "<%s: %s>" % (self.__class__.__name__, self.__unicode__())
+
     def public_url(self):
         raise ValueError("This S3 URI does not have Anonymous URL representation")
 
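This makes the %r conversions in the new debug messages readable. Since S3Uri dispatches to a concrete subclass, the class name printed is the subclass's (illustrative):

    >>> S3Uri("s3://mybucket/largefile.bin")
    <S3UriS3: s3://mybucket/largefile.bin>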
... ...
s3cmd
@@ -837,6 +837,10 @@ def cmd_sync_local2remote(args):
 
     s3 = S3(cfg)
 
+    ## FIXME
+    cfg.enable_multipart = False
+    warning(u"MultiPart: disabled for 'sync' command. Don't panic, we'll fix it!")
+
     if cfg.encrypt:
         error(u"S3cmd 'sync' doesn't yet support GPG encryption, sorry.")
         error(u"Either use unconditional 's3cmd put --recursive'")
... ...
@@ -1526,6 +1530,9 @@ def main():
     optparser.add_option(      "--encoding", dest="encoding", metavar="ENCODING", help="Override autodetected terminal and filesystem encoding (character set). Autodetected: %s" % preferred_encoding)
     optparser.add_option(      "--verbatim", dest="urlencoding_mode", action="store_const", const="verbatim", help="Use the S3 name as given on the command line. No pre-processing, encoding, etc. Use with caution!")
 
+    optparser.add_option(      "--disable-multipart", dest="enable_multipart", action="store_false", help="Disable multipart upload on files bigger than --multipart-chunk-size-mb")
+    optparser.add_option(      "--multipart-chunk-size-mb", dest="multipart_chunk_size_mb", type="int", action="store", metavar="SIZE", help="Size of each chunk of a multipart upload. Files bigger than SIZE are automatically uploaded as multipart, smaller files are uploaded using the traditional method. SIZE is in Mega-Bytes, default chunk size is %defaultMB, minimum allowed chunk size is 5MB, maximum is 5GB.")
+
     optparser.add_option(      "--list-md5", dest="list_md5", action="store_true", help="Include MD5 sums in bucket listings (only for 'ls' command).")
     optparser.add_option("-H", "--human-readable-sizes", dest="human_readable_sizes", action="store_true", help="Print sizes in human readable form (eg 1kB instead of 1234).")
 
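Put together, the new switches work like this (hypothetical bucket and file names):

    # files over the chunk size (15 MB by default) go multipart automatically
    s3cmd put largefile.bin s3://mybucket/largefile.bin
    # larger chunks mean fewer parts; SIZE must be between 5 and 5120 MB
    s3cmd put --multipart-chunk-size-mb=100 largefile.bin s3://mybucket/largefile.bin
    # or skip multipart entirely and upload the traditional way
    s3cmd put --disable-multipart largefile.bin s3://mybucket/largefile.bin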
... ...
@@ -1630,7 +1637,7 @@ def main():
     if options.check_md5 == False:
         try:
             cfg.sync_checks.remove("md5")
-        except:
+        except Exception:
             pass
     if options.check_md5 == True and cfg.sync_checks.count("md5") == 0:
         cfg.sync_checks.append("md5")
... ...
@@ -1649,6 +1656,12 @@ def main():
     cfg.update_option("enable", options.enable)
     cfg.update_option("acl_public", options.acl_public)
 
+    ## Check multipart chunk constraints
+    if cfg.multipart_chunk_size_mb < MultiPartUpload.MIN_CHUNK_SIZE_MB:
+        raise ParameterError("Chunk size %d MB is too small, must be >= %d MB. Please adjust --multipart-chunk-size-mb" % (cfg.multipart_chunk_size_mb, MultiPartUpload.MIN_CHUNK_SIZE_MB))
+    if cfg.multipart_chunk_size_mb > MultiPartUpload.MAX_CHUNK_SIZE_MB:
+        raise ParameterError("Chunk size %d MB is too large, must be <= %d MB. Please adjust --multipart-chunk-size-mb" % (cfg.multipart_chunk_size_mb, MultiPartUpload.MAX_CHUNK_SIZE_MB))
+
     ## CloudFront's cf_enable and Config's enable share the same --enable switch
     options.cf_enable = options.enable
 
... ...
@@ -1786,6 +1799,7 @@ if __name__ == '__main__':
         from S3.CloudFront import Cmd as CfCmd
         from S3.CloudFront import CloudFront
         from S3.FileLists import *
+        from S3.MultiPart import MultiPartUpload
 
         main()
         sys.exit(0)