Browse code

sync: add --add-destination, parallelize uploads to multiple destinations

Only meaningful at present in the sync local->remote(s) case, this
adds the --add-destination <foo> command line option. For the last
arg (the traditional destination), and each destination specified via
--add-destination, fork and upload after the initial walk of the local
file system has completed (and done all the disk I/O to calculate md5
values for each file).

This keeps us from pounding the file system doing (the same) disk I/O
for each possible destination, and allows full use of our bandwidth to
upload in parallel.

Matt Domsch authored on 2012/06/19 05:17:32
Showing 2 changed files
... ...
@@ -86,6 +86,7 @@ class Config(object):
86 86
     website_index = "index.html"
87 87
     website_error = ""
88 88
     website_endpoint = "http://%(bucket)s.s3-website-%(location)s.amazonaws.com/"
89
+    additional_destinations = []
89 90
 
90 91
     ## Creating a singleton
91 92
     def __new__(self, configfile = None):
... ...
@@ -915,13 +915,38 @@ def cmd_sync_local2remote(args):
915 915
         error(u"or disable encryption with --no-encrypt parameter.")
916 916
         sys.exit(1)
917 917
 
918
-    ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
919
-    destination_base_uri = S3Uri(args[-1])
920
-    if destination_base_uri.type != 's3':
921
-        raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
922
-    destination_base = str(destination_base_uri)
923
-
924 918
     local_list, single_file_local = fetch_local_list(args[:-1], recursive = True)
919
+
920
+    # Now that we've done all the disk I/O to look at the local file system and
921
+    # calculate the md5 for each file, fork for each destination to upload to them separately
922
+    # and in parallel
923
+    child_pids = []
924
+    destinations = [args[-1]]
925
+    if cfg.additional_destinations:
926
+        destinations = destinations + cfg.additional_destinations
927
+
928
+    for dest in destinations:
929
+        ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
930
+        destination_base_uri = S3Uri(dest)
931
+        if destination_base_uri.type != 's3':
932
+            raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
933
+        destination_base = str(destination_base_uri)
934
+        child_pid = os.fork()
935
+        if child_pid == 0:
936
+            is_parent = False
937
+            break
938
+        else:
939
+            is_parent = True
940
+            child_pids.append(child_pid)
941
+
942
+    if is_parent:
943
+        while len(child_pids):
944
+            (pid, status) = os.wait()
945
+            child_pids.remove(pid)
946
+        return
947
+
948
+    # This is all executed in the child thread
949
+    # remember to leave here with os._exit() so control doesn't resume elsewhere
925 950
     remote_list = fetch_remote_list(destination_base, recursive = True, require_attribs = True)
926 951
 
927 952
     local_count = len(local_list)
... ...
@@ -975,7 +1000,7 @@ def cmd_sync_local2remote(args):
975 975
                 output(u"delete: %s" % remote_list[key]['object_uri_str'])
976 976
 
977 977
         warning(u"Exitting now because of --dry-run")
978
-        return
978
+        os._exit(0)
979 979
 
980 980
     # if there are copy pairs, we can't do delete_before, on the chance
981 981
     # we need one of the to-be-deleted files as a copy source.
... ...
@@ -1050,6 +1075,8 @@ def cmd_sync_local2remote(args):
1050 1050
                 output("Created invalidation request for %d paths" % len(uploaded_objects_list))
1051 1051
                 output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id']))
1052 1052
 
1053
+    return os._exit(0)
1054
+
1053 1055
 def cmd_sync(args):
1054 1056
     if (len(args) < 2):
1055 1057
         raise ParameterError("Too few parameters! Expected: %s" % commands['sync']['param'])
... ...
@@ -1594,6 +1621,7 @@ def main():
1594 1594
     optparser.add_option(      "--no-delete-removed", dest="delete_removed", action="store_false", help="Don't delete remote objects.")
1595 1595
     optparser.add_option(      "--delete-after", dest="delete_after", action="store_true", help="Perform deletes after new uploads [sync]")
1596 1596
     optparser.add_option(      "--delay-updates", dest="delay_updates", action="store_true", help="Put all updated files into place at end [sync]")
1597
+    optparser.add_option(      "--add-destination", dest="additional_destinations", action="append", help="Additional destination for parallel uploads, in addition to last arg.  May be repeated.")
1597 1598
     optparser.add_option("-p", "--preserve", dest="preserve_attrs", action="store_true", help="Preserve filesystem attributes (mode, ownership, timestamps). Default for [sync] command.")
1598 1599
     optparser.add_option(      "--no-preserve", dest="preserve_attrs", action="store_false", help="Don't store FS attributes")
1599 1600
     optparser.add_option(      "--exclude", dest="exclude", action="append", metavar="GLOB", help="Filenames and paths matching GLOB will be excluded from sync")
... ...
@@ -1769,6 +1797,9 @@ def main():
1769 1769
             ## Some CloudFront.Cmd.Options() options are not settable from command line
1770 1770
             pass
1771 1771
 
1772
+    if options.additional_destinations:
1773
+        cfg.additional_destinations = options.additional_destinations
1774
+
1772 1775
     ## Set output and filesystem encoding for printing out filenames.
1773 1776
     sys.stdout = codecs.getwriter(cfg.encoding)(sys.stdout, "replace")
1774 1777
     sys.stderr = codecs.getwriter(cfg.encoding)(sys.stderr, "replace")