
sync: refactor parent/child and single process code

os.fork() and os.wait() don't exist on Windows, and the
multiprocessing module doesn't exist until Python 2.6. So instead,
we conditionalize calling os.fork() on its existence and on there
being more than one destination.

Also rearranges the code so that the subfunctions within
local2remote are defined at the top of their respective functions,
which makes the main execution path of each function easier to read.
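
For context, the dispatch described above looks roughly like this
(a minimal standalone sketch, not s3cmd's actual code: upload_to()
and sync_all() are hypothetical stand-ins for _child()/_parent(),
and hasattr(os, 'fork') is used here as an equivalent of the
commit's 'fork' not in os.__all__ test):

    import os

    def upload_to(destination):
        """Hypothetical per-destination worker (s3cmd's real worker is _child())."""
        pass

    def sync_all(destinations):
        # os.fork()/os.wait() are POSIX-only, so run sequentially on
        # Windows, or when there is only one destination anyway.
        if not hasattr(os, 'fork') or len(destinations) < 2:
            for dest in destinations:
                upload_to(dest)
            return
        child_pids = []
        for dest in destinations:
            pid = os.fork()
            if pid == 0:
                upload_to(dest)   # child: handle exactly one destination
                os._exit(0)       # exit via os._exit() so control doesn't resume in parent code
            child_pids.append(pid)
        while child_pids:         # parent: reap every child before returning
            (pid, status) = os.wait()
            child_pids.remove(pid)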

Matt Domsch authored on 2012/06/20 11:08:02
Showing 1 changed file
@@ -907,175 +907,187 @@ def cmd_sync_local2remote(args):
             s3.object_delete(uri)
             output(u"deleted: '%s'" % uri)
 
-    s3 = S3(cfg)
-
-    if cfg.encrypt:
-        error(u"S3cmd 'sync' doesn't yet support GPG encryption, sorry.")
-        error(u"Either use unconditional 's3cmd put --recursive'")
-        error(u"or disable encryption with --no-encrypt parameter.")
-        sys.exit(1)
-
-    local_list, single_file_local = fetch_local_list(args[:-1], recursive = True)
-
-    # Now that we've done all the disk I/O to look at the local file system and
-    # calculate the md5 for each file, fork for each destination to upload to them separately
-    # and in parallel
-    child_pids = []
-    destinations = [args[-1]]
-    if cfg.additional_destinations:
-        destinations = destinations + cfg.additional_destinations
-
-    for dest in destinations:
-        ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
-        destination_base_uri = S3Uri(dest)
-        if destination_base_uri.type != 's3':
-            raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
-        destination_base = str(destination_base_uri)
-        child_pid = os.fork()
-        if child_pid == 0:
-            is_parent = False
-            break
-        else:
-            is_parent = True
-            child_pids.append(child_pid)
+    def _single_process(local_list):
+        for dest in destinations:
+            ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
+            destination_base_uri = S3Uri(dest)
+            if destination_base_uri.type != 's3':
+                raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
+            destination_base = str(destination_base_uri)
+            _child(destination_base, local_list)
+
+    def _parent():
+        # Now that we've done all the disk I/O to look at the local file system and
+        # calculate the md5 for each file, fork for each destination to upload to them separately
+        # and in parallel
+        child_pids = []
+
+        for dest in destinations:
+            ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
+            destination_base_uri = S3Uri(dest)
+            if destination_base_uri.type != 's3':
+                raise ParameterError("Destination must be S3Uri. Got: %s" % destination_base_uri)
+            destination_base = str(destination_base_uri)
+            child_pid = os.fork()
+            if child_pid == 0:
+                _child(destination_base, local_list)
+                os._exit(0)
+            else:
+                child_pids.append(child_pid)
 
-    if is_parent:
         while len(child_pids):
             (pid, status) = os.wait()
             child_pids.remove(pid)
         return
 
-    # This is all executed in the child thread
-    # remember to leave here with os._exit() so control doesn't resume elsewhere
-    remote_list = fetch_remote_list(destination_base, recursive = True, require_attribs = True)
+    def _child(destination_base, local_list):
+        def _set_remote_uri(local_list, destination_base, single_file_local):
+            if len(local_list) > 0:
+                ## Populate 'remote_uri' only if we've got something to upload
+                if not destination_base.endswith("/"):
+                    if not single_file_local:
+                        raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
+                    local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
+                else:
+                    for key in local_list:
+                        local_list[key]['remote_uri'] = unicodise(destination_base + key)
+
+        def _upload(local_list, seq, total, total_size):
+            file_list = local_list.keys()
+            file_list.sort()
+            for file in file_list:
+                seq += 1
+                item = local_list[file]
+                src = item['full_name']
+                uri = S3Uri(item['remote_uri'])
+                seq_label = "[%d of %d]" % (seq, total)
+                extra_headers = copy(cfg.extra_headers)
+                try:
+                    if cfg.preserve_attrs:
+                        attr_header = _build_attr_header(local_list, file)
+                        debug(u"attr_header: %s" % attr_header)
+                        extra_headers.update(attr_header)
+                    response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
+                except InvalidFileError, e:
+                    warning(u"File can not be uploaded: %s" % e)
+                    continue
+                except S3UploadError, e:
+                    error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
+                    continue
+                speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
+                if not cfg.progress_meter:
+                    output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
+                        (item['full_name_unicode'], uri, response["size"], response["elapsed"],
+                        speed_fmt[0], speed_fmt[1], seq_label))
+                total_size += response["size"]
+                uploaded_objects_list.append(uri.object())
+            return seq, total_size
 
-    local_count = len(local_list)
-    remote_count = len(remote_list)
+        remote_list = fetch_remote_list(destination_base, recursive = True, require_attribs = True)
 
-    info(u"Found %d local files, %d remote files" % (local_count, remote_count))
+        local_count = len(local_list)
+        remote_count = len(remote_list)
 
-    local_list, exclude_list = filter_exclude_include(local_list)
-
-    if single_file_local and len(local_list) == 1 and len(remote_list) == 1:
-        ## Make remote_key same as local_key for comparison if we're dealing with only one file
-        remote_list_entry = remote_list[remote_list.keys()[0]]
-        # Flush remote_list, by the way
-        remote_list = { local_list.keys()[0] : remote_list_entry }
+        info(u"Found %d local files, %d remote files" % (local_count, remote_count))
 
-    local_list, remote_list, update_list, copy_pairs = compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True, delay_updates = cfg.delay_updates)
+        local_list, exclude_list = filter_exclude_include(local_list)
 
+        if single_file_local and len(local_list) == 1 and len(remote_list) == 1:
+            ## Make remote_key same as local_key for comparison if we're dealing with only one file
+            remote_list_entry = remote_list[remote_list.keys()[0]]
+            # Flush remote_list, by the way
+            remote_list = { local_list.keys()[0] : remote_list_entry }
 
-    local_count = len(local_list)
-    update_count = len(update_list)
-    copy_count = len(copy_pairs)
-    remote_count = len(remote_list)
+        local_list, remote_list, update_list, copy_pairs = compare_filelists(local_list, remote_list, src_remote = False, dst_remote = True, delay_updates = cfg.delay_updates)
 
-    info(u"Summary: %d local files to upload, %d files to remote copy, %d remote files to delete" % (local_count + update_count, copy_count, remote_count))
+        local_count = len(local_list)
+        update_count = len(update_list)
+        copy_count = len(copy_pairs)
+        remote_count = len(remote_list)
 
-    def _set_remote_uri(local_list, destination_base, single_file_local):
-        if len(local_list) > 0:
-            ## Populate 'remote_uri' only if we've got something to upload
-            if not destination_base.endswith("/"):
-                if not single_file_local:
-                    raise ParameterError("Destination S3 URI must end with '/' (ie must refer to a directory on the remote side).")
-                local_list[local_list.keys()[0]]['remote_uri'] = unicodise(destination_base)
-            else:
-                for key in local_list:
-                    local_list[key]['remote_uri'] = unicodise(destination_base + key)
+        info(u"Summary: %d local files to upload, %d files to remote copy, %d remote files to delete" % (local_count + update_count, copy_count, remote_count))
 
-    _set_remote_uri(local_list, destination_base, single_file_local)
-    _set_remote_uri(update_list, destination_base, single_file_local)
-
-    if cfg.dry_run:
-        for key in exclude_list:
-            output(u"exclude: %s" % unicodise(key))
-        for key in local_list:
-            output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))
-        for key in update_list:
-            output(u"upload: %s -> %s" % (update_list[key]['full_name_unicode'], update_list[key]['remote_uri']))
-        for (dst1, dst2) in copy_pairs:
-            output(u"remote copy: %s -> %s" % (dst1['object_key'], remote_list[dst2]['object_key']))
-        if cfg.delete_removed:
-            for key in remote_list:
-                output(u"delete: %s" % remote_list[key]['object_uri_str'])
+        _set_remote_uri(local_list, destination_base, single_file_local)
+        _set_remote_uri(update_list, destination_base, single_file_local)
 
-        warning(u"Exitting now because of --dry-run")
-        os._exit(0)
+        if cfg.dry_run:
+            for key in exclude_list:
+                output(u"exclude: %s" % unicodise(key))
+            for key in local_list:
+                output(u"upload: %s -> %s" % (local_list[key]['full_name_unicode'], local_list[key]['remote_uri']))
+            for key in update_list:
+                output(u"upload: %s -> %s" % (update_list[key]['full_name_unicode'], update_list[key]['remote_uri']))
+            for (dst1, dst2) in copy_pairs:
+                output(u"remote copy: %s -> %s" % (dst1['object_key'], remote_list[dst2]['object_key']))
+            if cfg.delete_removed:
+                for key in remote_list:
+                    output(u"delete: %s" % remote_list[key]['object_uri_str'])
+
+            warning(u"Exitting now because of --dry-run")
+            return
 
-    # if there are copy pairs, we can't do delete_before, on the chance
-    # we need one of the to-be-deleted files as a copy source.
-    if len(copy_pairs) > 0:
-        cfg.delete_after = True
+        # if there are copy pairs, we can't do delete_before, on the chance
+        # we need one of the to-be-deleted files as a copy source.
+        if len(copy_pairs) > 0:
+            cfg.delete_after = True
+
+        if cfg.delete_removed and not cfg.delete_after:
+            _do_deletes(s3, remote_list)
+
+        uploaded_objects_list = []
+        total_size = 0
+        total_elapsed = 0.0
+        timestamp_start = time.time()
+        n, total_size = _upload(local_list, 0, local_count, total_size)
+        n, total_size = _upload(update_list, n, local_count, total_size)
+        n_copies, saved_bytes = remote_copy(s3, copy_pairs, destination_base)
+        if cfg.delete_removed and cfg.delete_after:
+            _do_deletes(s3, remote_list)
+        total_elapsed = time.time() - timestamp_start
+        total_speed = total_elapsed and total_size/total_elapsed or 0.0
+        speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)
+
+        # Only print out the result if any work has been done or
+        # if the user asked for verbose output
+        outstr = "Done. Uploaded %d bytes in %0.1f seconds, %0.2f %sB/s.  Copied %d files saving %d bytes transfer." % (total_size, total_elapsed, speed_fmt[0], speed_fmt[1], n_copies, saved_bytes)
+        if total_size + saved_bytes > 0:
+            output(outstr)
+        else:
+            info(outstr)
 
-    if cfg.delete_removed and not cfg.delete_after:
-        _do_deletes(s3, remote_list)
+        if cfg.invalidate_on_cf:
+            if len(uploaded_objects_list) == 0:
+                info("Nothing to invalidate in CloudFront")
+            else:
+                # 'uri' from the last iteration is still valid at this point
+                cf = CloudFront(cfg)
+                result = cf.InvalidateObjects(uri, uploaded_objects_list)
+                if result['status'] == 201:
+                    output("Created invalidation request for %d paths" % len(uploaded_objects_list))
+                    output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id']))
 
-    uploaded_objects_list = []
-    total_size = 0
-    total_elapsed = 0.0
-    timestamp_start = time.time()
+        return
 
-    def _upload(local_list, seq, total, total_size):
-        file_list = local_list.keys()
-        file_list.sort()
-        for file in file_list:
-            seq += 1
-            item = local_list[file]
-            src = item['full_name']
-            uri = S3Uri(item['remote_uri'])
-            seq_label = "[%d of %d]" % (seq, total)
-            extra_headers = copy(cfg.extra_headers)
-            try:
-                if cfg.preserve_attrs:
-                    attr_header = _build_attr_header(local_list, file)
-                    debug(u"attr_header: %s" % attr_header)
-                    extra_headers.update(attr_header)
-                response = s3.object_put(src, uri, extra_headers, extra_label = seq_label)
-            except InvalidFileError, e:
-                warning(u"File can not be uploaded: %s" % e)
-                continue
-            except S3UploadError, e:
-                error(u"%s: upload failed too many times. Skipping that file." % item['full_name_unicode'])
-                continue
-            speed_fmt = formatSize(response["speed"], human_readable = True, floating_point = True)
-            if not cfg.progress_meter:
-                output(u"File '%s' stored as '%s' (%d bytes in %0.1f seconds, %0.2f %sB/s) %s" %
-                    (item['full_name_unicode'], uri, response["size"], response["elapsed"],
-                    speed_fmt[0], speed_fmt[1], seq_label))
-            total_size += response["size"]
-            uploaded_objects_list.append(uri.object())
-        return seq, total_size
+    # main execution
+    s3 = S3(cfg)
 
+    if cfg.encrypt:
+        error(u"S3cmd 'sync' doesn't yet support GPG encryption, sorry.")
+        error(u"Either use unconditional 's3cmd put --recursive'")
+        error(u"or disable encryption with --no-encrypt parameter.")
+        sys.exit(1)
 
-    n, total_size = _upload(local_list, 0, local_count, total_size)
-    n, total_size = _upload(update_list, n, local_count, total_size)
-    n_copies, saved_bytes = remote_copy(s3, copy_pairs, destination_base)
-    if cfg.delete_removed and cfg.delete_after:
-        _do_deletes(s3, remote_list)
-    total_elapsed = time.time() - timestamp_start
-    total_speed = total_elapsed and total_size/total_elapsed or 0.0
-    speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)
+    local_list, single_file_local = fetch_local_list(args[:-1], recursive = True)
 
-    # Only print out the result if any work has been done or
-    # if the user asked for verbose output
-    outstr = "Done. Uploaded %d bytes in %0.1f seconds, %0.2f %sB/s.  Copied %d files saving %d bytes transfer." % (total_size, total_elapsed, speed_fmt[0], speed_fmt[1], n_copies, saved_bytes)
-    if total_size + saved_bytes > 0:
-        output(outstr)
+    destinations = [args[-1]]
+    if cfg.additional_destinations:
+        destinations = destinations + cfg.additional_destinations
+
+    if 'fork' not in os.__all__ or len(destinations) < 2:
+        _single_process(local_list)
     else:
-        info(outstr)
+        _parent()
 
-    if cfg.invalidate_on_cf:
-        if len(uploaded_objects_list) == 0:
-            info("Nothing to invalidate in CloudFront")
-        else:
-            # 'uri' from the last iteration is still valid at this point
-            cf = CloudFront(cfg)
-            result = cf.InvalidateObjects(uri, uploaded_objects_list)
-            if result['status'] == 201:
-                output("Created invalidation request for %d paths" % len(uploaded_objects_list))
-                output("Check progress with: s3cmd cfinvalinfo cf://%s/%s" % (result['dist_id'], result['request_id']))
-
-    return os._exit(0)
 
 def cmd_sync(args):
     if (len(args) < 2):