Browse code

Merge remote-tracking branch 'kuenishi/more_multipart' into merge

Matt Domsch authored on 2013/11/26 09:57:05
Showing 5 changed files
... ...
@@ -38,6 +38,8 @@ class Config(object):
38 38
     force = False
39 39
     enable = None
40 40
     get_continue = False
41
+    put_continue = False
42
+    upload_id = None
41 43
     skip_existing = False
42 44
     recursive = False
43 45
     acl_public = None
... ...
@@ -3,10 +3,12 @@
3 3
 ## License: GPL Version 2
4 4
 
5 5
 import os
6
+import sys
6 7
 from stat import ST_SIZE
7 8
 from logging import debug, info, warning, error
8
-from Utils import getTextFromXml, formatSize, unicodise
9
+from Utils import getTextFromXml, getTreeFromXml, formatSize, unicodise, calculateChecksum, parseNodes
9 10
 from Exceptions import S3UploadError
11
+from collections import defaultdict
10 12
 
11 13
 class MultiPartUpload(object):
12 14
 
... ...
@@ -22,15 +24,55 @@ class MultiPartUpload(object):
22 22
         self.headers_baseline = headers_baseline
23 23
         self.upload_id = self.initiate_multipart_upload()
24 24
 
25
+    def get_parts_information(self, uri, upload_id):
26
+        multipart_response = self.s3.list_multipart(uri, upload_id)
27
+        tree = getTreeFromXml(multipart_response['data'])
28
+
29
+        parts = defaultdict(lambda: None)
30
+        for elem in parseNodes(tree):
31
+            try:
32
+                parts[int(elem['PartNumber'])] = {'checksum': elem['ETag'], 'size': elem['Size']}
33
+            except KeyError:
34
+                pass
35
+
36
+        return parts
37
+
38
+    def get_unique_upload_id(self, uri):
39
+        upload_id = None
40
+        multipart_response = self.s3.get_multipart(uri)
41
+        tree = getTreeFromXml(multipart_response['data'])
42
+        for mpupload in parseNodes(tree):
43
+            try:
44
+                mp_upload_id = mpupload['UploadId']
45
+                mp_path = mpupload['Key']
46
+                info("mp_path: %s, object: %s" % (mp_path, uri.object()))
47
+                if mp_path == uri.object():
48
+                    if upload_id is not None:
49
+                        raise ValueError("More than one UploadId for URI %s.  Disable multipart upload, or use\n %s multipart %s\nto list the Ids, then pass a unique --upload-id into the put command." % (uri, sys.argv[0], uri))
50
+                    upload_id = mp_upload_id
51
+            except KeyError:
52
+                pass
53
+
54
+        return upload_id
55
+
25 56
     def initiate_multipart_upload(self):
26 57
         """
27 58
         Begin a multipart upload
28 59
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html
29 60
         """
30
-        request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = self.headers_baseline, extra = "?uploads")
31
-        response = self.s3.send_request(request)
32
-        data = response["data"]
33
-        self.upload_id = getTextFromXml(data, "UploadId")
61
+        if self.s3.config.upload_id is not None:
62
+            self.upload_id = self.s3.config.upload_id
63
+        elif self.s3.config.put_continue:
64
+            self.upload_id = self.get_unique_upload_id(self.uri)
65
+        else:
66
+            self.upload_id = None
67
+
68
+        if self.upload_id is None:
69
+            request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = self.headers_baseline, extra = "?uploads")
70
+            response = self.s3.send_request(request)
71
+            data = response["data"]
72
+            self.upload_id = getTextFromXml(data, "UploadId")
73
+
34 74
         return self.upload_id
35 75
 
36 76
     def upload_all_parts(self):
... ...
@@ -51,6 +93,10 @@ class MultiPartUpload(object):
51 51
         else:
52 52
             debug("MultiPart: Uploading from %s" % (self.file.name))
53 53
 
54
+        remote_statuses = defaultdict(lambda: None)
55
+        if self.s3.config.put_continue:
56
+            remote_statuses = self.get_parts_information(self.uri, self.upload_id)
57
+
54 58
         seq = 1
55 59
         if self.file.name != "<stdin>":
56 60
             while size_left > 0:
... ...
@@ -63,10 +109,10 @@ class MultiPartUpload(object):
63 63
                     'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
64 64
                 }
65 65
                 try:
66
-                    self.upload_part(seq, offset, current_chunk_size, labels)
66
+                    self.upload_part(seq, offset, current_chunk_size, labels, remote_status = remote_statuses[seq])
67 67
                 except:
68
-                    error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
69
-                    self.abort_upload()
68
+                    error(u"\nUpload of '%s' part %d failed. Use\n  %s abortmp %s %s\nto abort the upload, or\n  %s --upload-id %s put ...\nto continue the upload."
69
+                          % (self.file.name, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
70 70
                     raise
71 71
                 seq += 1
72 72
         else:
... ...
@@ -82,22 +128,38 @@ class MultiPartUpload(object):
82 82
                 if len(buffer) == 0: # EOF
83 83
                     break
84 84
                 try:
85
-                    self.upload_part(seq, offset, current_chunk_size, labels, buffer)
85
+                    self.upload_part(seq, offset, current_chunk_size, labels, buffer, remote_status = remote_statuses[seq])
86 86
                 except:
87
-                    error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
88
-                    self.abort_upload()
87
+                    error(u"\nUpload of '%s' part %d failed. Use\n  %s abortmp %s %s\nto abort, or\n  %s --upload-id %s put ...\nto continue the upload."
88
+                          % (self.file.name, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
89 89
                     raise
90 90
                 seq += 1
91 91
 
92 92
         debug("MultiPart: Upload finished: %d parts", seq - 1)
93 93
 
94
-    def upload_part(self, seq, offset, chunk_size, labels, buffer = ''):
94
+    def upload_part(self, seq, offset, chunk_size, labels, buffer = '', remote_status = None):
95 95
         """
96 96
         Upload a file chunk
97 97
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
98 98
         """
99 99
         # TODO implement Content-MD5
100 100
         debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))
101
+
102
+        if remote_status is not None:
103
+            if int(remote_status['size']) == chunk_size:
104
+                checksum = calculateChecksum(buffer, self.file, offset, chunk_size, self.s3.config.send_chunk)
105
+                remote_checksum = remote_status['checksum'].strip('"')
106
+                if remote_checksum == checksum:
107
+                    warning("MultiPart: size and md5sum match for %s part %d, skipping." % (self.uri, seq))
108
+                    self.parts[seq] = remote_status['checksum']
109
+                    return
110
+                else:
111
+                    warning("MultiPart: checksum (%s vs %s) does not match for %s part %d, reuploading."
112
+                            % (remote_checksum, checksum, self.uri, seq))
113
+            else:
114
+                warning("MultiPart: size (%d vs %d) does not match for %s part %d, reuploading."
115
+                        % (int(remote_status['size']), chunk_size, self.uri, seq))
116
+
101 117
         headers = { "content-length": chunk_size }
102 118
         query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
103 119
         request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
... ...
@@ -130,8 +192,19 @@ class MultiPartUpload(object):
130 130
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html
131 131
         """
132 132
         debug("MultiPart: Aborting upload: %s" % self.upload_id)
133
-        request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
134
-        response = self.s3.send_request(request)
133
+        #request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
134
+        #response = self.s3.send_request(request)
135
+        response = None
135 136
         return response
136 137
 
137 138
 # vim:et:ts=4:sts=4:ai
139
+
140
+
141
+
142
+
143
+
144
+
145
+
146
+
147
+
148
+
... ...
@@ -438,6 +438,31 @@ class S3(object):
438 438
             return self.send_file_multipart(file, headers, uri, size)
439 439
 
440 440
         ## Not multipart...
441
+        if self.config.put_continue:
442
+            # Note, if input was stdin, we would be performing multipart upload.
443
+            # So this will always work as long as the file already uploaded was
444
+            # not uploaded via MultiUpload, in which case its ETag will not be
445
+            # an md5.
446
+            try:
447
+                info = self.object_info(uri)
448
+            except:
449
+                info = None
450
+
451
+            if info is not None:
452
+                remote_size = int(info['headers']['content-length'])
453
+                remote_checksum = info['headers']['etag'].strip('"')
454
+                if size == remote_size:
455
+                    checksum = calculateChecksum('', file, 0, size, self.config.send_chunk)
456
+                    if remote_checksum == checksum:
457
+                        warning("Put: size and md5sum match for %s, skipping." % uri)
458
+                        return
459
+                    else:
460
+                        warning("MultiPart: checksum (%s vs %s) does not match for %s, reuploading."
461
+                                % (remote_checksum, checksum, uri))
462
+                else:
463
+                    warning("MultiPart: size (%d vs %d) does not match for %s, reuploading."
464
+                            % (remote_size, size, uri))
465
+
441 466
         headers["content-length"] = size
442 467
         request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
443 468
         labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
... ...
@@ -535,6 +560,23 @@ class S3(object):
535 535
         response = self.send_request(request)
536 536
         return response
537 537
 
538
+    def get_multipart(self, uri):
539
+        request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?uploads")
540
+        response = self.send_request(request)
541
+        return response
542
+
543
+    def abort_multipart(self, uri, id):
544
+        request = self.create_request("OBJECT_DELETE", uri=uri,
545
+                                      extra = ("?uploadId=%s" % id))
546
+        response = self.send_request(request)
547
+        return response
548
+
549
+    def list_multipart(self, uri, id):
550
+        request = self.create_request("OBJECT_GET", uri=uri,
551
+                                      extra = ("?uploadId=%s" % id))
552
+        response = self.send_request(request)
553
+        return response
554
+
538 555
     def get_accesslog(self, uri):
539 556
         request = self.create_request("BUCKET_LIST", bucket = uri.bucket(), extra = "?logging")
540 557
         response = self.send_request(request)
... ...
@@ -739,6 +781,7 @@ class S3(object):
739 739
         if buffer == '':
740 740
             file.seek(offset)
741 741
         md5_hash = md5()
742
+
742 743
         try:
743 744
             while (size_left > 0):
744 745
                 #debug("SendFile: Reading up to %d bytes from '%s' - remaining bytes: %s" % (self.config.send_chunk, file.name, size_left))
... ...
@@ -746,6 +789,7 @@ class S3(object):
746 746
                     data = file.read(min(self.config.send_chunk, size_left))
747 747
                 else:
748 748
                     data = buffer
749
+
749 750
                 md5_hash.update(data)
750 751
                 conn.c.send(data)
751 752
                 if self.config.progress_meter:
... ...
@@ -754,6 +798,7 @@ class S3(object):
754 754
                 if throttle:
755 755
                     time.sleep(throttle)
756 756
             md5_computed = md5_hash.hexdigest()
757
+
757 758
             response = {}
758 759
             http_response = conn.c.getresponse()
759 760
             response["status"] = http_response.status
... ...
@@ -459,4 +459,22 @@ def getHostnameFromBucket(bucket):
459 459
     return Config.Config().host_bucket % { 'bucket' : bucket }
460 460
 __all__.append("getHostnameFromBucket")
461 461
 
462
+
463
+def calculateChecksum(buffer, mfile, offset, chunk_size, send_chunk):
464
+    md5_hash = md5()
465
+    size_left = chunk_size
466
+    if buffer == '':
467
+        mfile.seek(offset)
468
+        while size_left > 0:
469
+            data = mfile.read(min(send_chunk, size_left))
470
+            md5_hash.update(data)
471
+            size_left -= len(data)
472
+    else:
473
+        md5_hash.update(buffer)
474
+
475
+    return md5_hash.hexdigest()
476
+
477
+
478
+__all__.append("calculateChecksum")
479
+
462 480
 # vim:et:ts=4:sts=4:ai
... ...
@@ -339,14 +339,15 @@ def cmd_object_put(args):
339 339
         except InvalidFileError, e:
340 340
             warning(u"File can not be uploaded: %s" % e)
341 341
             continue
342
-        speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
343
-        if not Config().progress_meter:
344
-            output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
345
-                (unicodise(full_name_orig), uri_final, response["size"], response["elapsed"],
346
-                speed_fmt[0], speed_fmt[1], seq_label))
342
+        if response is not None:
343
+            speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
344
+            if not Config().progress_meter:
345
+                output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
346
+                       (unicodise(full_name_orig), uri_final, response["size"], response["elapsed"],
347
+                        speed_fmt[0], speed_fmt[1], seq_label))
347 348
         if Config().acl_public:
348 349
             output(u"Public URL of the object is: %s" %
349
-                (uri_final.public_url()))
350
+                   (uri_final.public_url()))
350 351
         if Config().encrypt and full_name != full_name_orig:
351 352
             debug(u"Removing temporary encrypted file: %s" % unicodise(full_name))
352 353
             os.remove(full_name)
... ...
@@ -1340,6 +1341,50 @@ def cmd_delpolicy(args):
1340 1340
     output(u"%s: Policy deleted" % uri)
1341 1341
 
1342 1342
 
1343
+def cmd_multipart(args):
1344
+    s3 = S3(cfg)
1345
+    uri = S3Uri(args[0])
1346
+
1347
+    #id = ''
1348
+    #if(len(args) > 1): id = args[1]
1349
+
1350
+    response = s3.get_multipart(uri)
1351
+    debug(u"response - %s" % response['status'])
1352
+    output(u"%s" % uri)
1353
+    tree = getTreeFromXml(response['data'])
1354
+    debug(parseNodes(tree))
1355
+    output(u"Initiated\tPath\tId")
1356
+    for mpupload in parseNodes(tree):
1357
+        try:
1358
+            output("%s\t%s\t%s" % (mpupload['Initiated'], "s3://" + uri.bucket() + "/" + mpupload['Key'], mpupload['UploadId']))
1359
+        except KeyError:
1360
+            pass
1361
+
1362
+def cmd_abort_multipart(args):
1363
+    '''{"cmd":"abortmp",   "label":"abort a multipart upload", "param":"s3://BUCKET Id", "func":cmd_abort_multipart, "argc":2},'''
1364
+    s3 = S3(cfg)
1365
+    uri = S3Uri(args[0])
1366
+    id = args[1]
1367
+    response = s3.abort_multipart(uri, id)
1368
+    debug(u"response - %s" % response['status'])
1369
+    output(u"%s" % uri)
1370
+
1371
+def cmd_list_multipart(args):
1372
+    '''{"cmd":"listmp",    "label":"list a multipart upload", "param":"s3://BUCKET/OBJECT Id", "func":cmd_list_multipart, "argc":2},'''
1373
+    s3 = S3(cfg)
1374
+    uri = S3Uri(args[0])
1375
+    id = args[1]
1376
+
1377
+    response = s3.list_multipart(uri, id)
1378
+    debug(u"response - %s" % response['status'])
1379
+    tree = getTreeFromXml(response['data'])
1380
+    output(u"LastModified\t\t\tPartNumber\tETag\tSize")
1381
+    for mpupload in parseNodes(tree):
1382
+        try:
1383
+            output("%s\t%s\t%s\t%s" % (mpupload['LastModified'], mpupload['PartNumber'], mpupload['ETag'], mpupload['Size']))
1384
+        except KeyError:
1385
+            pass
1386
+
1343 1387
 def cmd_accesslog(args):
1344 1388
     s3 = S3(cfg)
1345 1389
     bucket_uri = S3Uri(args.pop())
... ...
@@ -1694,6 +1739,11 @@ def get_commands_list():
1694 1694
     {"cmd":"setpolicy", "label":"Modify Bucket Policy", "param":"FILE s3://BUCKET", "func":cmd_setpolicy, "argc":2},
1695 1695
     {"cmd":"delpolicy", "label":"Delete Bucket Policy", "param":"s3://BUCKET", "func":cmd_delpolicy, "argc":1},
1696 1696
 
1697
+    {"cmd":"multipart", "label":"show multipart uploads", "param":"s3://BUCKET [Id]", "func":cmd_multipart, "argc":1},
1698
+    {"cmd":"abortmp",   "label":"abort a multipart upload", "param":"s3://BUCKET/OBJECT Id", "func":cmd_abort_multipart, "argc":2},
1699
+
1700
+    {"cmd":"listmp",    "label":"list parts of a multipart upload", "param":"s3://BUCKET/OBJECT Id", "func":cmd_list_multipart, "argc":2},
1701
+
1697 1702
     {"cmd":"accesslog", "label":"Enable/disable bucket access logging", "param":"s3://BUCKET", "func":cmd_accesslog, "argc":1},
1698 1703
     {"cmd":"sign", "label":"Sign arbitrary string using the secret key", "param":"STRING-TO-SIGN", "func":cmd_sign, "argc":1},
1699 1704
     {"cmd":"signurl", "label":"Sign an S3 URL to provide limited public access with expiry", "param":"s3://BUCKET/OBJECT expiry_epoch", "func":cmd_signurl, "argc":2},
... ...
@@ -1834,6 +1884,8 @@ def main():
1834 1834
     optparser.add_option(      "--no-encrypt", dest="encrypt", action="store_false", help="Don't encrypt files.")
1835 1835
     optparser.add_option("-f", "--force", dest="force", action="store_true", help="Force overwrite and other dangerous operations.")
1836 1836
     optparser.add_option(      "--continue", dest="get_continue", action="store_true", help="Continue getting a partially downloaded file (only for [get] command).")
1837
+    optparser.add_option(      "--continue-put", dest="put_continue", action="store_true", help="Continue uploading partially uploaded files or multipart upload parts.  Restarts/parts files that don't have matching size and md5.  Skips files/parts that do.  Note: md5sum checks are not always sufficient to check (part) file equality.  Enable this at your own risk.")
1838
+    optparser.add_option(      "--upload-id", dest="upload_id", help="UploadId for Multipart Upload, in case you want continue an existing upload (equivalent to --continue-put) and there are multiple partial uploads.  Use s3cmd multipart [URI] to see what UploadIds are associated with the given URI.")
1837 1839
     optparser.add_option(      "--skip-existing", dest="skip_existing", action="store_true", help="Skip over files that exist at the destination (only for [get] and [sync] commands).")
1838 1840
     optparser.add_option("-r", "--recursive", dest="recursive", action="store_true", help="Recursive upload, download or removal.")
1839 1841
     optparser.add_option(      "--check-md5", dest="check_md5", action="store_true", help="Check MD5 sums when comparing files for [sync]. (default)")
... ...
@@ -2027,6 +2079,14 @@ def main():
2027 2027
     if cfg.multipart_chunk_size_mb > MultiPartUpload.MAX_CHUNK_SIZE_MB:
2028 2028
         raise ParameterError("Chunk size %d MB is too large, must be <= %d MB. Please adjust --multipart-chunk-size-mb" % (cfg.multipart_chunk_size_mb, MultiPartUpload.MAX_CHUNK_SIZE_MB))
2029 2029
 
2030
+    ## If an UploadId was provided, set put_continue True
2031
+    if options.upload_id is not None:
2032
+        cfg.upload_id = options.upload_id
2033
+        cfg.put_continue = True
2034
+
2035
+    if cfg.upload_id and not cfg.multipart_chunk_size_mb:
2036
+        raise ParameterError("Must have --multipart-chunk-size-mb if using --continue-put or --upload-id")
2037
+
2030 2038
     ## CloudFront's cf_enable and Config's enable share the same --enable switch
2031 2039
     options.cf_enable = options.enable
2032 2040