
Added --continue-put and --upload-id, which skip files or multipart parts during upload when their remote size and md5sum already match the file being uploaded.
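
For example (bucket and file names are illustrative), an interrupted multipart upload can now be resumed with either of:

    s3cmd --continue-put put bigfile s3://bucket/bigfile
    s3cmd --upload-id UPLOADID put bigfile s3://bucket/bigfile

where the candidate UploadIds for an object can be listed with "s3cmd multipart s3://bucket/bigfile".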

Eugene Brevdo authored on 2013/07/16 06:30:31
Showing 5 changed files
... ...
@@ -35,6 +35,8 @@ class Config(object):
     force = False
     enable = None
     get_continue = False
+    put_continue = False
+    upload_id = None
     skip_existing = False
     recursive = False
     acl_public = None
... ...
@@ -172,8 +174,6 @@ class Config(object):
                         else:
                             print_value = data["value"]
                         debug("env_Config: %s->%s" % (data["key"], print_value))
-
-
 
     def option_list(self):
         retval = []
... ...
@@ -3,10 +3,12 @@
 ## License: GPL Version 2
 
 import os
+import sys
 from stat import ST_SIZE
 from logging import debug, info, warning, error
-from Utils import getTextFromXml, formatSize, unicodise
+from Utils import getTextFromXml, getTreeFromXml, formatSize, unicodise, calculateChecksum, parseNodes
 from Exceptions import S3UploadError
+from collections import defaultdict
 
 class MultiPartUpload(object):
 
... ...
@@ -22,15 +24,55 @@ class MultiPartUpload(object):
         self.headers_baseline = headers_baseline
         self.upload_id = self.initiate_multipart_upload()
 
+    def get_parts_information(self, uri, upload_id):
+        multipart_response = self.s3.list_multipart(uri, upload_id)
+        tree = getTreeFromXml(multipart_response['data'])
+
+        parts = defaultdict(lambda: None)
+        for elem in parseNodes(tree):
+            try:
+                parts[int(elem['PartNumber'])] = {'checksum': elem['ETag'], 'size': elem['Size']}
+            except KeyError:
+                pass
+
+        return parts
+
+    def get_unique_upload_id(self, uri):
+        upload_id = None
+        multipart_response = self.s3.get_multipart(uri)
+        tree = getTreeFromXml(multipart_response['data'])
+        for mpupload in parseNodes(tree):
+            try:
+                mp_upload_id = mpupload['UploadId']
+                mp_path = mpupload['Key']
+                info("mp_path: %s, object: %s" % (mp_path, uri.object()))
+                if mp_path == uri.object():
+                    if upload_id is not None:
+                        raise ValueError("More than one UploadId for URI %s.  Disable multipart upload, or use\n %s multipart %s\nto list the Ids, then pass a unique --upload-id into the put command." % (uri, sys.argv[0], uri))
+                    upload_id = mp_upload_id
+            except KeyError:
+                pass
+
+        return upload_id
+
     def initiate_multipart_upload(self):
         """
         Begin a multipart upload
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadInitiate.html
         """
-        request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = self.headers_baseline, extra = "?uploads")
-        response = self.s3.send_request(request)
-        data = response["data"]
-        self.upload_id = getTextFromXml(data, "UploadId")
+        if self.s3.config.upload_id is not None:
+            self.upload_id = self.s3.config.upload_id
+        elif self.s3.config.put_continue:
+            self.upload_id = self.get_unique_upload_id(self.uri)
+        else:
+            self.upload_id = None
+
+        if self.upload_id is None:
+            request = self.s3.create_request("OBJECT_POST", uri = self.uri, headers = self.headers_baseline, extra = "?uploads")
+            response = self.s3.send_request(request)
+            data = response["data"]
+            self.upload_id = getTextFromXml(data, "UploadId")
+
         return self.upload_id
 
     def upload_all_parts(self):
... ...
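For reference, a standalone sketch of the UploadId discovery above, using only the stdlib instead of s3cmd's getTreeFromXml/parseNodes helpers (simplified: a real ListMultipartUploads response is XML-namespaced):

    # Sketch of get_unique_upload_id against a canned, namespace-free response.
    import xml.etree.ElementTree as ET

    SAMPLE = """<ListMultipartUploadsResult>
      <Upload><Key>path/file.bin</Key><UploadId>upload-one</UploadId></Upload>
      <Upload><Key>path/file.bin</Key><UploadId>upload-two</UploadId></Upload>
    </ListMultipartUploadsResult>"""

    def unique_upload_id(xml_text, key):
        upload_id = None
        for upload in ET.fromstring(xml_text).findall('Upload'):
            if upload.findtext('Key') == key:
                if upload_id is not None:
                    # Mirrors the ValueError above: ambiguous without --upload-id.
                    raise ValueError("More than one UploadId for key %s" % key)
                upload_id = upload.findtext('UploadId')
        return upload_id

    # unique_upload_id(SAMPLE, 'path/file.bin') raises ValueError here,
    # since two partial uploads exist for the same key.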
@@ -51,6 +93,10 @@ class MultiPartUpload(object):
         else:
             debug("MultiPart: Uploading from %s" % (self.file.name))
 
+        remote_statuses = defaultdict(lambda: None)
+        if self.s3.config.put_continue:
+            remote_statuses = self.get_parts_information(self.uri, self.upload_id)
+
         seq = 1
         if self.file.name != "<stdin>":
             while size_left > 0:
... ...
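remote_statuses is a defaultdict so that part numbers with no uploaded counterpart simply look up as None rather than raising KeyError; a minimal illustration:

    # Parts that were never uploaded come back as None.
    from collections import defaultdict

    remote_statuses = defaultdict(lambda: None)
    remote_statuses[1] = {'checksum': '"9e107d9d372bb6826bd81d3542a419d6"', 'size': '15728640'}

    print(remote_statuses[1]['size'])   # '15728640'
    print(remote_statuses[2])           # None -> part 2 must be uploaded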
@@ -63,10 +109,10 @@ class MultiPartUpload(object):
                     'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
                 }
                 try:
-                    self.upload_part(seq, offset, current_chunk_size, labels)
+                    self.upload_part(seq, offset, current_chunk_size, labels, remote_status = remote_statuses[seq])
                 except:
-                    error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
-                    self.abort_upload()
+                    error(u"\nUpload of '%s' part %d failed. Use\n  %s abortmp %s %s\nto abort the upload, or\n  %s --upload-id %s put ...\nto continue the upload."
+                          % (self.file.name, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
                     raise
                 seq += 1
         else:
... ...
@@ -82,22 +128,37 @@ class MultiPartUpload(object):
                 if len(buffer) == 0: # EOF
                     break
                 try:
-                    self.upload_part(seq, offset, current_chunk_size, labels, buffer)
+                    self.upload_part(seq, offset, current_chunk_size, labels, buffer, remote_status = remote_statuses[seq])
                 except:
-                    error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
-                    self.abort_upload()
+                    error(u"\nUpload of '%s' part %d failed. Use\n  %s abortmp %s %s\nto abort, or\n  %s --upload-id %s put ...\nto continue the upload."
+                          % (self.file.name, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
                     raise
                 seq += 1
 
         debug("MultiPart: Upload finished: %d parts", seq - 1)
 
-    def upload_part(self, seq, offset, chunk_size, labels, buffer = ''):
+    def upload_part(self, seq, offset, chunk_size, labels, buffer = '', remote_status = None):
         """
         Upload a file chunk
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
         """
         # TODO implement Content-MD5
         debug("Uploading part %i of %r (%s bytes)" % (seq, self.upload_id, chunk_size))
+
+        if remote_status is not None:
+            if int(remote_status['size']) == chunk_size:
+                checksum = calculateChecksum(buffer, self.file, offset, chunk_size, self.s3.config.send_chunk)
+                remote_checksum = remote_status['checksum'].strip('"')
+                if remote_checksum == checksum:
+                    warning("MultiPart: size and md5sum match for %s part %d, skipping." % (self.uri, seq))
+                    return
+                else:
+                    warning("MultiPart: checksum (%s vs %s) does not match for %s part %d, reuploading."
+                            % (remote_checksum, checksum, self.uri, seq))
+            else:
+                warning("MultiPart: size (%d vs %d) does not match for %s part %d, reuploading."
+                        % (int(remote_status['size']), chunk_size, self.uri, seq))
+
         headers = { "content-length": chunk_size }
         query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
         request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
... ...
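The skip test in upload_part boils down to a size check followed by an md5 comparison against the part's ETag, which S3 returns wrapped in double quotes and which, for an individually uploaded part, is normally the md5 of the part body. A condensed sketch of the decision (hypothetical helper name):

    # True if the part must be (re)uploaded, False if it can be skipped.
    def part_needs_upload(remote_status, chunk_size, local_md5):
        if remote_status is None:
            return True                               # never uploaded
        if int(remote_status['size']) != chunk_size:
            return True                               # size mismatch
        # Strip the quotes S3 puts around the ETag before comparing.
        return remote_status['checksum'].strip('"') != local_md5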
@@ -130,8 +191,11 @@ class MultiPartUpload(object):
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadAbort.html
         """
         debug("MultiPart: Aborting upload: %s" % self.upload_id)
-        request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
-        response = self.s3.send_request(request)
+        # Leave already-uploaded parts on S3 so the upload can be resumed
+        # with --upload-id; aborting is now an explicit "abortmp" action.
+        #request = self.s3.create_request("OBJECT_DELETE", uri = self.uri, extra = "?uploadId=%s" % (self.upload_id))
+        #response = self.s3.send_request(request)
+        response = None
         return response
 
 # vim:et:ts=4:sts=4:ai
... ...
@@ -439,6 +439,31 @@ class S3(object):
             return self.send_file_multipart(file, headers, uri, size)
 
         ## Not multipart...
+        if self.config.put_continue:
+            # Note: if the input were stdin we would be doing a multipart
+            # upload, so this path always has a real file.  The check works
+            # as long as the already-uploaded object was not itself uploaded
+            # via multipart, in which case its ETag is not an md5.
+            try:
+                remote_info = self.object_info(uri)
+            except Exception:
+                remote_info = None
+
+            if remote_info is not None:
+                remote_size = int(remote_info['headers']['content-length'])
+                remote_checksum = remote_info['headers']['etag'].strip('"')
+                if size == remote_size:
+                    checksum = calculateChecksum('', file, 0, size, self.config.send_chunk)
+                    if remote_checksum == checksum:
+                        warning("Put: size and md5sum match for %s, skipping." % uri)
+                        return
+                    else:
+                        warning("Put: checksum (%s vs %s) does not match for %s, reuploading."
+                                % (remote_checksum, checksum, uri))
+                else:
+                    warning("Put: size (%d vs %d) does not match for %s, reuploading."
+                            % (remote_size, size, uri))
+
 headers["content-length"] = size
 request = self.create_request("OBJECT_PUT", uri = uri, headers = headers)
 labels = { 'source' : unicodise(filename), 'destination' : unicodise(uri.uri()), 'extra' : extra_label }
... ...
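For whole-object puts the same idea applies at the object level, with the caveat noted in the code comment: an object that was itself created by a multipart upload has an ETag of the form "<hash>-<part count>", which can never equal a plain md5, so such objects are always re-uploaded. A sketch with hypothetical names (headers assumed to come from an object_info/HEAD response):

    # True if a plain PUT can be skipped because the remote copy matches.
    def put_can_be_skipped(local_size, local_md5, headers):
        remote_size = int(headers['content-length'])
        remote_etag = headers['etag'].strip('"')
        # A multipart-created object's ETag ("abc123...-42") never equals
        # a plain md5, so the comparison correctly forces a reupload.
        return remote_size == local_size and remote_etag == local_md5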
@@ -754,6 +779,7 @@ class S3(object):
         if buffer == '':
             file.seek(offset)
         md5_hash = md5()
+
         try:
             while (size_left > 0):
                 #debug("SendFile: Reading up to %d bytes from '%s' - remaining bytes: %s" % (self.config.send_chunk, file.name, size_left))
... ...
@@ -761,6 +787,7 @@ class S3(object):
                     data = file.read(min(self.config.send_chunk, size_left))
                 else:
                     data = buffer
+
                 md5_hash.update(data)
                 conn.c.send(data)
                 if self.config.progress_meter:
... ...
@@ -769,6 +796,7 @@ class S3(object):
                 if throttle:
                     time.sleep(throttle)
             md5_computed = md5_hash.hexdigest()
+
             response = {}
             http_response = conn.c.getresponse()
             response["status"] = http_response.status
... ...
@@ -459,4 +459,26 @@ def getHostnameFromBucket(bucket):
     return Config.Config().host_bucket % { 'bucket' : bucket }
 __all__.append("getHostnameFromBucket")
 
+
+def calculateChecksum(buffer, mfile, offset, chunk_size, send_chunk):
+    """Return the md5 hexdigest of buffer, or, if buffer is empty, of the
+    chunk_size bytes of mfile starting at offset (read send_chunk at a time)."""
+    md5_hash = md5()
+    size_left = chunk_size
+    if buffer == '':
+        mfile.seek(offset)
+        while size_left > 0:
+            data = mfile.read(min(send_chunk, size_left))
+            if not data:
+                break   # EOF before chunk_size bytes; checksum will not match
+            md5_hash.update(data)
+            size_left -= len(data)
+    else:
+        md5_hash.update(buffer)
+
+    return md5_hash.hexdigest()
+
+
+__all__.append("calculateChecksum")
+
 # vim:et:ts=4:sts=4:ai
... ...
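A brief usage sketch of the new helper (file name and read size are illustrative): pass a non-empty buffer to hash it directly, or an empty buffer to hash chunk_size bytes of mfile starting at offset.

    import os

    path = 'bigfile.bin'
    size = os.path.getsize(path)
    f = open(path, 'rb')
    try:
        # buffer == '' -> hash `size` bytes of f starting at offset 0,
        # reading 64 KiB at a time.
        digest = calculateChecksum('', f, 0, size, 64 * 1024)
    finally:
        f.close()
    print(digest)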
@@ -325,14 +325,15 @@ def cmd_object_put(args):
         except InvalidFileError, e:
             warning(u"File can not be uploaded: %s" % e)
             continue
-        speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
-        if not Config().progress_meter:
-            output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
-                (unicodise(full_name_orig), uri_final, response["size"], response["elapsed"],
-                speed_fmt[0], speed_fmt[1], seq_label))
+        if response is not None:
+            speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+            if not Config().progress_meter:
+                output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+                       (unicodise(full_name_orig), uri_final, response["size"], response["elapsed"],
+                        speed_fmt[0], speed_fmt[1], seq_label))
         if Config().acl_public:
             output(u"Public URL of the object is: %s" %
-                (uri_final.public_url()))
+                   (uri_final.public_url()))
         if Config().encrypt and full_name != full_name_orig:
             debug(u"Removing temporary encrypted file: %s" % unicodise(full_name))
             os.remove(full_name)
... ...
@@ -1268,7 +1269,7 @@ def cmd_delpolicy(args):
 def cmd_multipart(args):
     s3 = S3(cfg)
     uri = S3Uri(args[0])
-
+
     #id = ''
     #if(len(args) > 1): id = args[1]
 
... ...
@@ -1277,11 +1278,11 @@ def cmd_multipart(args):
     output(u"%s" % uri)
     tree = getTreeFromXml(response['data'])
     debug(parseNodes(tree))
-    output(u"Initiated\tId\tPath")
+    output(u"Initiated\tPath\tId")
     for mpupload in parseNodes(tree):
         try:
-            output("%s\t%s\t%s" % (mpupload['Initiated'], mpupload['UploadId'], mpupload['Key']))
-        except:
+            output("%s\t%s\t%s" % (mpupload['Initiated'], "s3://" + uri.bucket() + "/" + mpupload['Key'], mpupload['UploadId']))
+        except KeyError:
             pass
 
 def cmd_abort_multipart(args):
... ...
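With the reordered columns, a listing might look like (values illustrative):

    s3://bucket
    Initiated	Path	Id
    2013-07-15T22:30:31.000Z	s3://bucket/path/file.bin	exampleUploadId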
@@ -1808,6 +1809,8 @@ def main():
     optparser.add_option(      "--no-encrypt", dest="encrypt", action="store_false", help="Don't encrypt files.")
     optparser.add_option("-f", "--force", dest="force", action="store_true", help="Force overwrite and other dangerous operations.")
     optparser.add_option(      "--continue", dest="get_continue", action="store_true", help="Continue getting a partially downloaded file (only for [get] command).")
+    optparser.add_option(      "--continue-put", dest="put_continue", action="store_true", help="Continue uploading partially uploaded files or multipart upload parts.  Restarts parts/files that don't have matching size and md5sum.  Skips files/parts that do.  Note: md5sum checks are not always sufficient to check (part) file equality.  Enable this at your own risk.")
+    optparser.add_option(      "--upload-id", dest="upload_id", help="UploadId for Multipart Upload, in case you want to continue an existing upload (equivalent to --continue-put) and there are multiple partial uploads.  Use s3cmd multipart [URI] to see what UploadIds are associated with the given URI.")
     optparser.add_option(      "--skip-existing", dest="skip_existing", action="store_true", help="Skip over files that exist at the destination (only for [get] and [sync] commands).")
     optparser.add_option("-r", "--recursive", dest="recursive", action="store_true", help="Recursive upload, download or removal.")
     optparser.add_option(      "--check-md5", dest="check_md5", action="store_true", help="Check MD5 sums when comparing files for [sync]. (default)")
... ...
@@ -1997,6 +2000,14 @@ def main():
     if cfg.multipart_chunk_size_mb > MultiPartUpload.MAX_CHUNK_SIZE_MB:
         raise ParameterError("Chunk size %d MB is too large, must be <= %d MB. Please adjust --multipart-chunk-size-mb" % (cfg.multipart_chunk_size_mb, MultiPartUpload.MAX_CHUNK_SIZE_MB))
 
+    ## If an UploadId was provided, set put_continue True
+    if options.upload_id is not None:
+        cfg.upload_id = options.upload_id
+        cfg.put_continue = True
+
+    if cfg.put_continue and not cfg.multipart_chunk_size_mb:
+        raise ParameterError("Must have --multipart-chunk-size-mb if using --continue-put or --upload-id")
+
     ## CloudFront's cf_enable and Config's enable share the same --enable switch
     options.cf_enable = options.enable
2002 2010