... | ... |
@@ -78,6 +78,16 @@ def filter_exclude_include(src_list): |
78 | 78 |
def fetch_local_list(args, recursive = None): |
79 | 79 |
def _get_filelist_local(local_uri): |
80 | 80 |
info(u"Compiling list of local files...") |
81 |
+ |
|
82 |
+ if deunicodise(local_uri.basename()) == "-": |
|
83 |
+ loc_list = SortedDict(ignore_case = False) |
|
84 |
+ loc_list["-"] = { |
|
85 |
+ 'full_name_unicode' : '-', |
|
86 |
+ 'full_name' : '-', |
|
87 |
+ 'size' : -1, |
|
88 |
+ 'mtime' : -1, |
|
89 |
+ } |
|
90 |
+ return loc_list, True |
|
81 | 91 |
if local_uri.isdir(): |
82 | 92 |
local_base = deunicodise(local_uri.basename()) |
83 | 93 |
local_path = deunicodise(local_uri.path()) |
... | ... |
@@ -42,32 +42,56 @@ class MultiPartUpload(object): |
42 | 42 |
if not self.upload_id: |
43 | 43 |
raise RuntimeError("Attempting to use a multipart upload that has not been initiated.") |
44 | 44 |
|
45 |
- size_left = file_size = os.stat(self.file.name)[ST_SIZE] |
|
46 |
- self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024 |
|
47 |
- nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1) |
|
48 |
- debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts)) |
|
45 |
+ self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024

46 |
+ if self.file.name != "<stdin>":

47 |
+ size_left = file_size = os.stat(self.file.name)[ST_SIZE]

48 |
+ nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)

49 |
+ debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))

50 |
+ else:

51 |
+ debug("MultiPart: Uploading from %s" % (self.file.name))

52 |
+ 
|
49 | 53 |
|
50 | 54 |
seq = 1 |
51 |
- while size_left > 0: |
|
52 |
- offset = self.chunk_size * (seq - 1) |
|
53 |
- current_chunk_size = min(file_size - offset, self.chunk_size) |
|
54 |
- size_left -= current_chunk_size |
|
55 |
- labels = { |
|
56 |
- 'source' : unicodise(self.file.name), |
|
57 |
- 'destination' : unicodise(self.uri.uri()), |
|
58 |
- 'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True)) |
|
59 |
- } |
|
60 |
- try: |
|
61 |
- self.upload_part(seq, offset, current_chunk_size, labels) |
|
62 |
- except: |
|
63 |
- error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq)) |
|
64 |
- self.abort_upload() |
|
65 |
- raise |
|
66 |
- seq += 1 |
|
55 |
+ if self.file.name != "<stdin>": |
|
56 |
+ while size_left > 0: |
|
57 |
+ offset = self.chunk_size * (seq - 1) |
|
58 |
+ current_chunk_size = min(file_size - offset, self.chunk_size) |
|
59 |
+ size_left -= current_chunk_size |
|
60 |
+ labels = { |
|
61 |
+ 'source' : unicodise(self.file.name), |
|
62 |
+ 'destination' : unicodise(self.uri.uri()), |
|
63 |
+ 'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True)) |
|
64 |
+ } |
|
65 |
+ try: |
|
66 |
+ self.upload_part(seq, offset, current_chunk_size, labels) |
|
67 |
+ except: |
|
68 |
+ error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq)) |
|
69 |
+ self.abort_upload() |
|
70 |
+ raise |
|
71 |
+ seq += 1 |
|
72 |
+ else: |
|
73 |
+ while True: |
|
74 |
+ buffer = self.file.read(self.chunk_size) |
|
75 |
+ offset = self.chunk_size * (seq - 1) |
|
76 |
+ current_chunk_size = len(buffer) |
|
77 |
+ labels = { |
|
78 |
+ 'source' : unicodise(self.file.name), |
|
79 |
+ 'destination' : unicodise(self.uri.uri()), |
|
80 |
+ 'extra' : "[part %d, %s]" % (seq, "%d%sB" % formatSize(current_chunk_size, human_readable = True)) |
|
81 |
+ } |
|
82 |
+ if len(buffer) == 0: # EOF |
|
83 |
+ break |
|
84 |
+ try: |
|
85 |
+ self.upload_part(seq, offset, current_chunk_size, labels, buffer) |
|
86 |
+ except: |
|
87 |
+ error(u"Upload of '%s' part %d failed. Aborting multipart upload." % (self.file.name, seq)) |
|
88 |
+ self.abort_upload() |
|
89 |
+ raise |
|
90 |
+ seq += 1 |
|
67 | 91 |
|
68 | 92 |
debug("MultiPart: Upload finished: %d parts", seq - 1) |
69 | 93 |
|
70 |
- def upload_part(self, seq, offset, chunk_size, labels): |
|
94 |
+ def upload_part(self, seq, offset, chunk_size, labels, buffer = ''): |
|
71 | 95 |
""" |
72 | 96 |
Upload a file chunk |
73 | 97 |
http://docs.amazonwebservices.com/AmazonS3/latest/API/index.html?mpUploadUploadPart.html |
... | ... |
@@ -77,7 +101,7 @@ class MultiPartUpload(object): |
77 | 77 |
headers = { "content-length": chunk_size } |
78 | 78 |
query_string = "?partNumber=%i&uploadId=%s" % (seq, self.upload_id) |
79 | 79 |
request = self.s3.create_request("OBJECT_PUT", uri = self.uri, headers = headers, extra = query_string) |
80 |
- response = self.s3.send_file(request, self.file, labels, offset = offset, chunk_size = chunk_size) |
|
80 |
+ response = self.s3.send_file(request, self.file, labels, buffer, offset = offset, chunk_size = chunk_size) |
|
81 | 81 |
self.parts[seq] = response["headers"]["etag"] |
82 | 82 |
return response |
83 | 83 |
|
... | ... |
@@ -345,11 +345,15 @@ class S3(object): |
345 | 345 |
if uri.type != "s3": |
346 | 346 |
raise ValueError("Expected URI type 's3', got '%s'" % uri.type) |
347 | 347 |
|
348 |
- if not os.path.isfile(filename): |
|
348 |
+ if filename != "-" and not os.path.isfile(filename): |
|
349 | 349 |
raise InvalidFileError(u"%s is not a regular file" % unicodise(filename)) |
350 | 350 |
try: |
351 |
- file = open(filename, "rb") |
|
352 |
- size = os.stat(filename)[ST_SIZE] |
|
351 |
+ if filename == "-": |
|
352 |
+ file = sys.stdin |
|
353 |
+ size = 0 |
|
354 |
+ else: |
|
355 |
+ file = open(filename, "rb") |
|
356 |
+ size = os.stat(filename)[ST_SIZE] |
|
353 | 357 |
except (IOError, OSError), e: |
354 | 358 |
raise InvalidFileError(u"%s: %s" % (unicodise(filename), e.strerror)) |
355 | 359 |
|
... | ... |
@@ -359,7 +363,7 @@ class S3(object): |
359 | 359 |
|
360 | 360 |
## MIME-type handling |
361 | 361 |
content_type = self.config.mime_type |
362 |
- if not content_type and self.config.guess_mime_type: |
|
362 |
+ if filename != "-" and not content_type and self.config.guess_mime_type: |
|
363 | 363 |
content_type = mime_magic(filename) |
364 | 364 |
if not content_type: |
365 | 365 |
content_type = self.config.default_mime_type |
... | ... |
@@ -374,8 +378,10 @@ class S3(object): |
374 | 374 |
|
375 | 375 |
## Multipart decision |
376 | 376 |
multipart = False |
377 |
+ if not self.config.enable_multipart and filename == "-": |
|
378 |
+ raise ParameterError("Multi-part upload is required to upload from stdin") |
|
377 | 379 |
if self.config.enable_multipart: |
378 |
- if size > self.config.multipart_chunk_size_mb * 1024 * 1024: |
|
380 |
+ if size > self.config.multipart_chunk_size_mb * 1024 * 1024 or filename == "-": |
|
379 | 381 |
multipart = True |
380 | 382 |
if multipart: |
381 | 383 |
# Multipart requests are quite different... drop here |
... | ... |
@@ -625,7 +631,7 @@ class S3(object): |
625 | 625 |
|
626 | 626 |
return response |
627 | 627 |
|
628 |
- def send_file(self, request, file, labels, throttle = 0, retries = _max_retries, offset = 0, chunk_size = -1): |
|
628 |
+ def send_file(self, request, file, labels, buffer = '', throttle = 0, retries = _max_retries, offset = 0, chunk_size = -1): |
|
629 | 629 |
method_string, resource, headers = request.get_triplet() |
630 | 630 |
size_left = size_total = headers.get("content-length") |
631 | 631 |
if self.config.progress_meter: |
... | ... |
@@ -648,15 +654,19 @@ class S3(object): |
648 | 648 |
warning("Waiting %d sec..." % self._fail_wait(retries)) |
649 | 649 |
time.sleep(self._fail_wait(retries)) |
650 | 650 |
# Connection error -> same throttle value |
651 |
- return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size) |
|
651 |
+ return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size) |
|
652 | 652 |
else: |
653 | 653 |
raise S3UploadError("Upload failed for: %s" % resource['uri']) |
654 |
- file.seek(offset) |
|
654 |
+ if buffer == '': |
|
655 |
+ file.seek(offset) |
|
655 | 656 |
md5_hash = md5() |
656 | 657 |
try: |
657 | 658 |
while (size_left > 0): |
658 |
- #debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name)) |
|
659 |
- data = file.read(min(self.config.send_chunk, size_left)) |
|
659 |
+ #debug("SendFile: Reading up to %d bytes from '%s' - remaining bytes: %s" % (self.config.send_chunk, file.name, size_left)) |
|
660 |
+ if buffer == '': |
|
661 |
+ data = file.read(min(self.config.send_chunk, size_left)) |
|
662 |
+ else: |
|
663 |
+ data = buffer |
|
660 | 664 |
md5_hash.update(data) |
661 | 665 |
conn.send(data) |
662 | 666 |
if self.config.progress_meter: |
... | ... |
@@ -685,7 +695,7 @@ class S3(object): |
685 | 685 |
warning("Waiting %d sec..." % self._fail_wait(retries)) |
686 | 686 |
time.sleep(self._fail_wait(retries)) |
687 | 687 |
# Connection error -> same throttle value |
688 |
- return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size) |
|
688 |
+ return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size) |
|
689 | 689 |
else: |
690 | 690 |
debug("Giving up on '%s' %s" % (file.name, e)) |
691 | 691 |
raise S3UploadError("Upload failed for: %s" % resource['uri']) |
... | ... |
@@ -707,7 +717,7 @@ class S3(object): |
707 | 707 |
redir_hostname = getTextFromXml(response['data'], ".//Endpoint") |
708 | 708 |
self.set_hostname(redir_bucket, redir_hostname) |
709 | 709 |
warning("Redirected to: %s" % (redir_hostname)) |
710 |
- return self.send_file(request, file, labels, offset = offset, chunk_size = chunk_size) |
|
710 |
+ return self.send_file(request, file, labels, buffer, offset = offset, chunk_size = chunk_size) |
|
711 | 711 |
|
712 | 712 |
# S3 from time to time doesn't send ETag back in a response :-( |
713 | 713 |
# Force re-upload here. |
... | ... |
@@ -730,7 +740,7 @@ class S3(object): |
730 | 730 |
warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response))) |
731 | 731 |
warning("Waiting %d sec..." % self._fail_wait(retries)) |
732 | 732 |
time.sleep(self._fail_wait(retries)) |
733 |
- return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size) |
|
733 |
+ return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size) |
|
734 | 734 |
else: |
735 | 735 |
warning("Too many failures. Giving up on '%s'" % (file.name)) |
736 | 736 |
raise S3UploadError |
... | ... |
@@ -743,7 +753,7 @@ class S3(object): |
743 | 743 |
warning("MD5 Sums don't match!") |
744 | 744 |
if retries: |
745 | 745 |
warning("Retrying upload of %s" % (file.name)) |
746 |
- return self.send_file(request, file, labels, throttle, retries - 1, offset, chunk_size) |
|
746 |
+ return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size) |
|
747 | 747 |
else: |
748 | 748 |
warning("Too many failures. Giving up on '%s'" % (file.name)) |
749 | 749 |
raise S3UploadError |
... | ... |
@@ -266,7 +266,13 @@ def cmd_object_put(args): |
266 | 266 |
info(u"Summary: %d local files to upload" % local_count) |
267 | 267 |
|
268 | 268 |
if local_count > 0: |
269 |
- if not destination_base.endswith("/"): |
|
269 |
+ if not single_file_local: |
|
270 |
+ for key in local_list: |
|
271 |
+ if key == "-": |
|
272 |
+ raise ParameterError("Cannot specify multiple local files if uploading from '-' (ie stdin)") |
|
273 |
+ elif single_file_local and local_list.keys()[0] == "-" and destination_base.endswith("/"): |
|
274 |
+ raise ParameterError("Destination S3 URI must not end with '/' when uploading from stdin.") |
|
275 |
+ elif not destination_base.endswith("/"): |
|
270 | 276 |
if not single_file_local: |
271 | 277 |
raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).") |
272 | 278 |
local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base) |
... | ... |
@@ -278,7 +284,7 @@ def cmd_object_put(args): |
278 | 278 |
for key in exclude_list: |
279 | 279 |
output(u"exclude: %s" % unicodise(key)) |
280 | 280 |
for key in local_list: |
281 |
- output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri'])) |
|
281 |
+ output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'] if key != "-" else "<stdin>", local_list[key]['remote_uri'])) |
|
282 | 282 |
|
283 | 283 |
warning(u"Exitting now because of --dry-run") |
284 | 284 |
return |