Browse code

MultipartCopy avoid retrieving again object info when we already know its size

Florent Viard authored on 2020/04/14 07:45:37
Showing 3 changed files
... ...
@@ -13,12 +13,14 @@ from .S3Uri import S3UriS3
13 13
 from .Utils import (getTextFromXml, getTreeFromXml, formatSize,
14 14
                     calculateChecksum, parseNodes, s3_quote)
15 15
 
16
+SIZE_1MB = 1024 * 1024
17
+
16 18
 
17 19
 class MultiPartUpload(object):
18 20
     """Supports MultiPartUpload and MultiPartUpload(Copy) operation"""
19 21
     MIN_CHUNK_SIZE_MB = 5        # 5MB
20
-    MAX_CHUNK_SIZE_MB = 5120     # 5GB
21
-    MAX_FILE_SIZE = 42949672960  # 5TB
22
+    MAX_CHUNK_SIZE_MB = 5 * 1024     # 5GB
23
+    MAX_FILE_SIZE = 5 * 1024 * 1024  # 5TB
22 24
 
23 25
     def __init__(self, s3, src, dst_uri, headers_baseline=None,
24 26
                  src_size=None):
... ...
@@ -36,11 +38,11 @@ class MultiPartUpload(object):
36 36
             if not src_size:
37 37
                 raise ParameterError("Source size is missing for "
38 38
                                      "MultipartUploadCopy operation")
39
-            c_size = self.s3.config.multipart_copy_chunk_size_mb * 1024 * 1024
39
+            c_size = self.s3.config.multipart_copy_chunk_size_mb * SIZE_1MB
40 40
         else:
41 41
             # Source is a file_stream to upload
42 42
             self.file_stream = src
43
-            c_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
43
+            c_size = self.s3.config.multipart_chunk_size_mb * SIZE_1MB
44 44
 
45 45
         self.chunk_size = c_size
46 46
         self.upload_id = self.initiate_multipart_upload()
... ...
@@ -127,8 +127,9 @@ def mime_magic(file):
127 127
         result = (None, None)
128 128
     return result
129 129
 
130
-EXPECT_CONTINUE_TIMEOUT = 2
131 130
 
131
+EXPECT_CONTINUE_TIMEOUT = 2
132
+SIZE_1MB = 1024 * 1024
132 133
 
133 134
 __all__ = []
134 135
 class S3Request(object):
... ...
@@ -681,9 +682,9 @@ class S3(object):
681 681
         if not self.config.enable_multipart and filename == "-":
682 682
             raise ParameterError("Multi-part upload is required to upload from stdin")
683 683
         if self.config.enable_multipart:
684
-            if size > self.config.multipart_chunk_size_mb * 1024 * 1024 or filename == "-":
684
+            if size > self.config.multipart_chunk_size_mb * SIZE_1MB or filename == "-":
685 685
                 multipart = True
686
-                if size > self.config.multipart_max_chunks * self.config.multipart_chunk_size_mb * 1024 * 1024:
686
+                if size > self.config.multipart_max_chunks * self.config.multipart_chunk_size_mb * SIZE_1MB:
687 687
                     raise ParameterError("Chunk size %d MB results in more than %d chunks. Please increase --multipart-chunk-size-mb" % \
688 688
                           (self.config.multipart_chunk_size_mb, self.config.multipart_max_chunks))
689 689
         if multipart:
... ...
@@ -816,7 +817,7 @@ class S3(object):
816 816
         return headers
817 817
 
818 818
     def object_copy(self, src_uri, dst_uri, extra_headers=None,
819
-                    extra_label=""):
819
+                    src_size=None, extra_label=""):
820 820
         if src_uri.type != "s3":
821 821
             raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
822 822
         if dst_uri.type != "s3":
... ...
@@ -830,7 +831,7 @@ class S3(object):
830 830
                 if exc.status != 501:
831 831
                     raise exc
832 832
                 acl = None
833
-        headers = SortedDict(ignore_case = True)
833
+        headers = SortedDict(ignore_case=True)
834 834
 
835 835
         if self.config.acl_public:
836 836
             headers["x-amz-acl"] = "public-read"
... ...
@@ -850,15 +851,18 @@ class S3(object):
850 850
         if extra_headers:
851 851
             headers.update(extra_headers)
852 852
 
853
-        ## Multipart decision - only do multipart copy for remote s3 files > 5gb
853
+        # Multipart decision. Only do multipart copy for remote s3 files
854
+        # bigger than the multipart copy threshlod.
854 855
         if self.config.enable_multipart:
855
-            # get size of remote src only if multipart is enabled
856
-            src_info = self.object_info(src_uri)
857
-            size = int(src_info["headers"]["content-length"])
856
+            # get size of remote src only if multipart is enabled and no size
857
+            # info was provided
858
+            if src_size is None:
859
+                src_info = self.object_info(src_uri)
860
+                src_size = int(src_info["headers"]["content-length"])
858 861
 
859
-            if size > self.config.multipart_copy_chunk_size_mb * 1024 * 1024:
862
+            if src_size > self.config.multipart_copy_chunk_size_mb * SIZE_1MB:
860 863
                 # Multipart requests are quite different... drop here
861
-                return self.copy_file_multipart(src_uri, dst_uri, size,
864
+                return self.copy_file_multipart(src_uri, dst_uri, src_size,
862 865
                                                 headers, extra_label)
863 866
 
864 867
         ## Not multipart...
... ...
@@ -872,7 +876,7 @@ class S3(object):
872 872
                                       headers=headers)
873 873
         response = self.send_request(request)
874 874
         if response["data"] and getRootTagName(response["data"]) == "Error":
875
-            #http://doc.s3.amazonaws.com/proposals/copy.html
875
+            # http://doc.s3.amazonaws.com/proposals/copy.html
876 876
             # Error during copy, status will be 200, so force error code 500
877 877
             response["status"] = 500
878 878
             error("Server error during the COPY operation. Overwrite response "
... ...
@@ -890,7 +894,7 @@ class S3(object):
890 890
         return response
891 891
 
892 892
     def object_modify(self, src_uri, dst_uri, extra_headers=None,
893
-                      extra_label=""):
893
+                      src_size=None, extra_label=""):
894 894
 
895 895
         if src_uri.type != "s3":
896 896
             raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
... ...
@@ -957,12 +961,13 @@ class S3(object):
957 957
         return response
958 958
 
959 959
     def object_move(self, src_uri, dst_uri, extra_headers=None,
960
-                    extra_label=""):
961
-        response_copy = self.object_copy(src_uri, dst_uri, extra_headers)
960
+                    src_size=None, extra_label=""):
961
+        response_copy = self.object_copy(src_uri, dst_uri, extra_headers,
962
+                                         src_size, extra_label)
962 963
         debug("Object %s copied to %s" % (src_uri, dst_uri))
963 964
         if not response_copy["data"] \
964 965
            or getRootTagName(response_copy["data"]) \
965
-              in ["CopyObjectResult", "CompleteMultipartUploadResult"]:
966
+           in ["CopyObjectResult", "CompleteMultipartUploadResult"]:
966 967
             self.object_delete(src_uri)
967 968
             debug("Object '%s' deleted", src_uri)
968 969
         else:
... ...
@@ -831,19 +831,23 @@ def subcmd_cp_mv(args, process_fce, action_str, message):
831 831
     cfg = Config()
832 832
     if action_str == 'modify':
833 833
         if len(args) < 1:
834
-            raise ParameterError("Expecting one or more S3 URIs for " + action_str)
834
+            raise ParameterError("Expecting one or more S3 URIs for "
835
+                                 + action_str)
835 836
         destination_base = None
836 837
     else:
837 838
         if len(args) < 2:
838
-            raise ParameterError("Expecting two or more S3 URIs for " + action_str)
839
+            raise ParameterError("Expecting two or more S3 URIs for "
840
+                                 + action_str)
839 841
         dst_base_uri = S3Uri(args.pop())
840 842
         if dst_base_uri.type != "s3":
841
-            raise ParameterError("Destination must be S3 URI. To download a file use 'get' or 'sync'.")
843
+            raise ParameterError("Destination must be S3 URI. To download a "
844
+                                 "file use 'get' or 'sync'.")
842 845
         destination_base = dst_base_uri.uri()
843 846
 
844 847
     scoreboard = ExitScoreboard()
845 848
 
846
-    remote_list, exclude_list, remote_total_size = fetch_remote_list(args, require_attribs = False)
849
+    remote_list, exclude_list, remote_total_size = \
850
+        fetch_remote_list(args, require_attribs=False)
847 851
 
848 852
     remote_count = len(remote_list)
849 853
 
... ...
@@ -854,7 +858,9 @@ def subcmd_cp_mv(args, process_fce, action_str, message):
854 854
         # so we don't need to test for it here.
855 855
         if not destination_base.endswith('/') \
856 856
            and (len(remote_list) > 1 or cfg.recursive):
857
-            raise ParameterError("Destination must be a directory and end with '/' when acting on a folder content or on multiple sources.")
857
+            raise ParameterError("Destination must be a directory and end with"
858
+                                 " '/' when acting on a folder content or on "
859
+                                 "multiple sources.")
858 860
 
859 861
         if cfg.recursive:
860 862
             for key in remote_list:
... ...
@@ -873,7 +879,9 @@ def subcmd_cp_mv(args, process_fce, action_str, message):
873 873
         for key in exclude_list:
874 874
             output(u"exclude: %s" % key)
875 875
         for key in remote_list:
876
-            output(u"%s: '%s' -> '%s'" % (action_str, remote_list[key]['object_uri_str'], remote_list[key]['dest_name']))
876
+            output(u"%s: '%s' -> '%s'" % (action_str,
877
+                                          remote_list[key]['object_uri_str'],
878
+                                          remote_list[key]['dest_name']))
877 879
 
878 880
         warning(u"Exiting now because of --dry-run")
879 881
         return EX_OK
... ...
@@ -886,10 +894,12 @@ def subcmd_cp_mv(args, process_fce, action_str, message):
886 886
         item = remote_list[key]
887 887
         src_uri = S3Uri(item['object_uri_str'])
888 888
         dst_uri = S3Uri(item['dest_name'])
889
+        src_size = item.get('size')
889 890
 
890 891
         extra_headers = copy(cfg.extra_headers)
891 892
         try:
892 893
             response = process_fce(src_uri, dst_uri, extra_headers,
894
+                                   src_size=src_size,
893 895
                                    extra_label=seq_label)
894 896
             output(message % {"src": src_uri, "dst": dst_uri,
895 897
                               "extra": seq_label})
... ...
@@ -902,12 +912,14 @@ def subcmd_cp_mv(args, process_fce, action_str, message):
902 902
                 warning(u"Key not found %s" % item['object_uri_str'])
903 903
             else:
904 904
                 scoreboard.failed()
905
-            if cfg.stop_on_error: break
905
+            if cfg.stop_on_error:
906
+                break
906 907
     return scoreboard.rc()
907 908
 
908 909
 def cmd_cp(args):
909 910
     s3 = S3(Config())
910
-    return subcmd_cp_mv(args, s3.object_copy, "copy", u"remote copy: '%(src)s' -> '%(dst)s'  %(extra)s")
911
+    return subcmd_cp_mv(args, s3.object_copy, "copy",
912
+                        u"remote copy: '%(src)s' -> '%(dst)s'  %(extra)s")
911 913
 
912 914
 def cmd_modify(args):
913 915
     s3 = S3(Config())
... ...
@@ -916,7 +928,8 @@ def cmd_modify(args):
916 916
 
917 917
 def cmd_mv(args):
918 918
     s3 = S3(Config())
919
-    return subcmd_cp_mv(args, s3.object_move, "move", u"move: '%(src)s' -> '%(dst)s'  %(extra)s")
919
+    return subcmd_cp_mv(args, s3.object_move, "move",
920
+                        u"move: '%(src)s' -> '%(dst)s'  %(extra)s")
920 921
 
921 922
 def cmd_info(args):
922 923
     cfg = Config()
... ...
@@ -1112,10 +1125,12 @@ def cmd_sync_remote2remote(args):
1112 1112
             item = src_list[file]
1113 1113
             src_uri = S3Uri(item['object_uri_str'])
1114 1114
             dst_uri = S3Uri(item['target_uri'])
1115
+            src_size = item.get('size')
1115 1116
             seq_label = "[%d of %d]" % (seq, src_count)
1116 1117
             extra_headers = copy(cfg.extra_headers)
1117 1118
             try:
1118 1119
                 response = s3.object_copy(src_uri, dst_uri, extra_headers,
1120
+                                          src_size=src_size,
1119 1121
                                           extra_label=seq_label)
1120 1122
                 output(u"remote copy: '%s' -> '%s'  %s" %
1121 1123
                        (src_uri, dst_uri, seq_label))
... ...
@@ -1561,13 +1576,15 @@ def remote_copy(s3, copy_pairs, destination_base, uploaded_objects_list=None):
1561 1561
         debug(u"Remote Copying from %s to %s" % (dst1, dst2))
1562 1562
         dst1_uri = S3Uri(destination_base + dst1)
1563 1563
         dst2_uri = S3Uri(destination_base + dst2)
1564
+        src_obj_size = src_obj.get(u'size', 0)
1564 1565
         seq_label = "[%d of %d]" % (seq, src_count)
1565 1566
         extra_headers = copy(cfg.extra_headers)
1566 1567
         try:
1567 1568
             s3.object_copy(dst1_uri, dst2_uri, extra_headers,
1569
+                           src_size=src_obj_size,
1568 1570
                            extra_label=seq_label)
1569 1571
             output(u"remote copy: '%s' -> '%s'  %s" % (dst1, dst2, seq_label))
1570
-            saved_bytes += src_obj.get(u'size', 0)
1572
+            saved_bytes += src_obj_size
1571 1573
             if uploaded_objects_list is not None:
1572 1574
                 uploaded_objects_list.append(dst2)
1573 1575
         except Exception: