
Merge branch 'econnell/master' into econnell-merge

Matt Domsch authored on 2012/07/14 23:02:48
Showing 4 changed files
... ...
@@ -140,6 +140,16 @@ def handle_exclude_include_walk(root, dirs, files):
 def fetch_local_list(args, recursive = None):
     def _get_filelist_local(loc_list, local_uri, cache):
         info(u"Compiling list of local files...")
+
+        if deunicodise(local_uri.basename()) == "-":
+            loc_list = SortedDict(ignore_case = False)
+            loc_list["-"] = {
+                'full_name_unicode' : '-',
+                'full_name' : '-',
+                'size' : -1,
+                'mtime' : -1,
+            }
+            return loc_list, True
         if local_uri.isdir():
             local_base = deunicodise(local_uri.basename())
             local_path = deunicodise(local_uri.path())
... ...
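
When the local source is "-", the new block in _get_filelist_local above skips filesystem walking entirely and returns a one-entry list whose size and mtime are -1 placeholders, since stdin has neither. A minimal stand-alone sketch of that sentinel pattern (key names follow the diff; the plain dict and function name are illustrative, the real code uses SortedDict):

    def build_local_list(path):
        # "-" means stdin: emit a single placeholder entry, since os.stat() is impossible
        if path == "-":
            return {"-": {'full_name': '-', 'full_name_unicode': u'-',
                          'size': -1, 'mtime': -1}}, True
        raise NotImplementedError("regular files are walked as before")
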
@@ -42,32 +42,56 @@ class MultiPartUpload(object):
         if not self.upload_id:
             raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")
 
-        size_left = file_size = os.stat(self.file.name)[ST_SIZE]
-        self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
-        nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
-        debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
+        self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
+
+        if self.file.name != "<stdin>":
+            size_left = file_size = os.stat(self.file.name)[ST_SIZE]
+            nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
+            debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
+        else:
+            debug("MultiPart: Uploading from %s" % (self.file.name))
 
         seq = 1
-        while size_left > 0:
-            offset = self.chunk_size * (seq - 1)
-            current_chunk_size = min(file_size - offset, self.chunk_size)
-            size_left -= current_chunk_size
-            labels = {
-                'source' : unicodise(self.file.name),
-                'destination' : unicodise(self.uri.uri()),
-                'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
-            }
-            try:
-                self.upload_part(seq, offset, current_chunk_size, labels)
-            except:
-                error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
-                self.abort_upload()
-                raise
-            seq += 1
+        if self.file.name != "<stdin>":
+            while size_left > 0:
+                offset = self.chunk_size * (seq - 1)
+                current_chunk_size = min(file_size - offset, self.chunk_size)
+                size_left -= current_chunk_size
+                labels = {
+                    'source' : unicodise(self.file.name),
+                    'destination' : unicodise(self.uri.uri()),
+                    'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
+                }
+                try:
+                    self.upload_part(seq, offset, current_chunk_size, labels)
+                except:
+                    error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
+                    self.abort_upload()
+                    raise
+                seq += 1
+        else:
+            while True:
+                buffer = self.file.read(self.chunk_size)
+                if len(buffer) == 0: # EOF
+                    break
+                offset = self.chunk_size * (seq - 1)
+                current_chunk_size = len(buffer)
+                labels = {
+                    'source' : unicodise(self.file.name),
+                    'destination' : unicodise(self.uri.uri()),
+                    'extra' : "[part %d, %s]" % (seq, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
+                }
+                try:
+                    self.upload_part(seq, offset, current_chunk_size, labels, buffer)
+                except:
+                    error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq))
+                    self.abort_upload()
+                    raise
+                seq += 1
 
         debug("MultiPart: Upload finished: %d parts", seq - 1)
 
-    def upload_part(self, seq, offset, chunk_size, labels):
+    def upload_part(self, seq, offset, chunk_size, labels, buffer = ''):
         """
         Upload a file chunk
         http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html
... ...
@@ -77,7 +101,7 @@ class MultiPartUpload(object):
         headers = { "content-length": chunk_size }
         query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id)
         request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string)
-        response = self.s3.send_file(request, self.file, labels, offset = offset, chunk_size = chunk_size)
+        response = self.s3.send_file(request, self.file, labels, buffer, offset = offset, chunk_size = chunk_size)
         self.parts[seq] = response["headers"]["etag"]
         return response
 
... ...
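
The else branch added to upload_all_parts above is the standard pattern for multipart-uploading a non-seekable stream: read one chunk-sized buffer at a time, stop on a zero-length read (EOF), and pass each buffer down to upload_part. A stand-alone sketch of that loop, with upload_part standing in for any per-part sender:

    def stream_parts(stream, chunk_size, upload_part):
        """Upload a non-seekable stream in fixed-size parts; return the part count."""
        seq = 1
        while True:
            buf = stream.read(chunk_size)
            if not buf:           # zero-length read means EOF
                break
            upload_part(seq, buf)
            seq += 1
        return seq - 1

The final part is simply whatever shorter buffer the last read returns, which S3 accepts as long as every part except the last is at least 5 MB.
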
@@ -345,11 +345,15 @@ class S3(object):
         if uri.type != "s3":
             raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
 
-        if not os.path.isfile(filename):
+        if filename != "-" and not os.path.isfile(filename):
             raise InvalidFileError(u"%s is not a regular file" % unicodise(filename))
         try:
-            file = open(filename, "rb")
-            size = os.stat(filename)[ST_SIZE]
+            if filename == "-":
+                file = sys.stdin
+                size = 0
+            else:
+                file = open(filename, "rb")
+                size = os.stat(filename)[ST_SIZE]
         except (IOError, OSError), e:
             raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror))
 
... ...
@@ -359,7 +363,7 @@ class S3(object):
 
         ## MIME-type handling
         content_type = self.config.mime_type
-        if not content_type and self.config.guess_mime_type:
+        if filename != "-" and not content_type and self.config.guess_mime_type:
             content_type = mime_magic(filename)
         if not content_type:
             content_type = self.config.default_mime_type
... ...
@@ -374,8 +378,10 @@ class S3(object):
 
         ## Multipart decision
         multipart = False
+        if not self.config.enable_multipart and filename == "-":
+            raise ParameterError("Multi-part upload is required to upload from stdin")
         if self.config.enable_multipart:
-            if size > self.config.multipart_chunk_size_mb * 1024 * 1024:
+            if size > self.config.multipart_chunk_size_mb * 1024 * 1024 or filename == "-":
                 multipart = True
         if multipart:
             # Multipart requests are quite different... drop here
... ...
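
Uploading from stdin forces the multipart path because the object's total size is unknown until the stream is exhausted, while a plain PUT needs a Content-Length up front. The decision above reduces to the following rule (the function wrapper is illustrative; the flag and chunk-size names come from the diff):

    def should_use_multipart(size, filename, enable_multipart, chunk_size_mb):
        # stdin without multipart cannot work: no size is known for a single PUT
        if filename == "-" and not enable_multipart:
            raise ValueError("Multi-part upload is required to upload from stdin")
        return enable_multipart and (filename == "-"
                                     or size > chunk_size_mb * 1024 * 1024)
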
@@ -625,7 +631,7 @@ class S3(object):
 
         return response
 
-    def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, offset = 0, chunk_size = -1):
+    def send_file(self, request, file, labels, buffer = '', throttle = 0, retries = _max_retries, offset = 0, chunk_size = -1):
         method_string, resource, headers = request.get_triplet()
         size_left = size_total = headers.get("content-length")
         if self.config.progress_meter:
... ...
@@ -648,15 +654,19 @@ class S3(object):
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
-                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
+                return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
             else:
                 raise S3UploadError("Upload failed for: %s" % resource['uri'])
-        file.seek(offset)
+        if buffer == '':
+            file.seek(offset)
         md5_hash = md5()
         try:
             while (size_left > 0):
-                #debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
-                data = file.read(min(self.config.send_chunk, size_left))
+                #debug("SendFile: Reading up to %d bytes from '%s' - remaining bytes: %s" % (self.config.send_chunk, file.name, size_left))
+                if buffer == '':
+                    data = file.read(min(self.config.send_chunk, size_left))
+                else:
+                    data = buffer
                 md5_hash.update(data)
                 conn.send(data)
                 if self.config.progress_meter:
... ...
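
send_file now takes each part's bytes either from the seekable file (seek to the offset, then read) or from a buffer that upload_all_parts pre-read, since sys.stdin cannot be repositioned on retry. Stripped of the retry machinery, the data-source selection above amounts to:

    def part_data(file, buffer, offset, nbytes):
        # A non-empty pre-read buffer signals a non-seekable source: send it as-is.
        if buffer:
            return buffer
        file.seek(offset)         # seekable file: position, then read
        return file.read(nbytes)

Using the empty string as the "no buffer" sentinel would conflate a genuinely empty chunk with "read from file", but the EOF check in upload_all_parts ensures an empty buffer is never passed down.
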
@@ -685,7 +695,7 @@ class S3(object):
                 warning("Waiting %d sec..." % self._fail_wait(retries))
                 time.sleep(self._fail_wait(retries))
                 # Connection error -> same throttle value
-                return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
+                return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
             else:
                 debug("Giving up on '%s' %s" % (file.name, e))
                 raise S3UploadError("Upload failed for: %s" % resource['uri'])
... ...
@@ -707,7 +717,7 @@ class S3(object):
             redir_hostname = getTextFromXml(response['data'], ".//Endpoint")
             self.set_hostname(redir_bucket, redir_hostname)
             warning("Redirected to: %s" % (redir_hostname))
-            return self.send_file(request, file, labels, offset = offset, chunk_size = chunk_size)
+            return self.send_file(request, file, labels, buffer, offset = offset, chunk_size = chunk_size)
 
         # S3 from time to time doesn't send ETag back in a response :-(
         # Force re-upload here.
... ...
@@ -730,7 +740,7 @@ class S3(object):
                     warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
                     warning("Waiting %d sec..." % self._fail_wait(retries))
                     time.sleep(self._fail_wait(retries))
-                    return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size)
+                    return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
                 else:
                     warning("Too many failures. Giving up on '%s'" % (file.name))
                     raise S3UploadError
... ...
@@ -743,7 +753,7 @@ class S3(object):
             warning("MD5 Sums don't match!")
             if retries:
                 warning("Retrying upload of %s" % (file.name))
-                return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
+                return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
             else:
                 warning("Too many failures. Giving up on '%s'" % (file.name))
                 raise S3UploadError
... ...
@@ -267,7 +267,13 @@ def cmd_object_put(args):
     info(u"Summary: %d local files to upload" % local_count)
 
     if local_count > 0:
-        if not destination_base.endswith("/"):
+        if not single_file_local:
+            for key in local_list:
+                if key == "-":
+                    raise ParameterError("Cannot specify multiple local files if uploading from '-' (ie stdin)")
+        if single_file_local and local_list.keys()[0] == "-" and destination_base.endswith("/"):
+            raise ParameterError("Destination S3 URI must not end with '/' when uploading from stdin.")
+        elif not destination_base.endswith("/"):
             if not single_file_local:
                 raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
             local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
... ...
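
Together these checks encode the command-line contract for stdin uploads (e.g. tar cz . | s3cmd put - s3://bucket/key.tar.gz): "-" cannot be mixed with other local sources, and its destination must name a full object key rather than a directory-style prefix ending in "/". As a stand-alone rule (function and message wording are illustrative):

    def validate_stdin_upload(sources, destination):
        # "-" must be the only source: there is just one stdin to read
        if "-" in sources and len(sources) > 1:
            raise ValueError("Cannot mix '-' (stdin) with other local files")
        # stdin has no basename to append, so the destination must be a full key
        if sources == ["-"] and destination.endswith("/"):
            raise ValueError("Destination must be a full object key, not a prefix")
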
@@ -279,7 +285,11 @@ def cmd_object_put(args):
         for key in exclude_list:
             output(u"exclude: %s" % unicodise(key))
         for key in local_list:
-            output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))
+            if key != "-":
+                nicekey = local_list[key]['full_name_unicode']
+            else:
+                nicekey = "<stdin>"
+            output(u"upload: %s -> %s" % (nicekey, local_list[key]['remote_uri']))
 
         warning(u"Exiting now because of --dry-run")
         return