Browse code

Add progress info output for multipart copy and current-total info in output for cp, mv and modify

Florent Viard authored on 2020/04/14 05:27:36
Showing 3 changed files
... ...
@@ -130,12 +130,11 @@ class MultiPartUpload(object):
130 130
         if extra_label:
131 131
             extra_label = u' ' + extra_label
132 132
         labels = {
133
-            'source' : filename,
134
-            'destination' : self.dst_uri.uri(),
133
+            'source': filename,
134
+            'destination': self.dst_uri.uri(),
135 135
         }
136 136
 
137 137
         seq = 1
138
-
139 138
         if self.src_size:
140 139
             size_left = self.src_size
141 140
             nr_parts = self.src_size // self.chunk_size \
... ...
@@ -274,7 +273,10 @@ class MultiPartUpload(object):
274 274
         request = self.s3.create_request("OBJECT_PUT", uri=self.dst_uri,
275 275
                                          headers=headers,
276 276
                                          uri_params=query_string_params)
277
-        response = self.s3.send_request(request)
277
+
278
+        labels[u'action'] = u'remote copy'
279
+        response = self.s3.send_request_with_progress(request, labels,
280
+                                                      chunk_size)
278 281
 
279 282
         # NOTE: Amazon sends whitespace while upload progresses, which
280 283
         # accumulates in response body and seems to confuse XML parser.
... ...
@@ -811,7 +811,8 @@ class S3(object):
811 811
                 del headers[h.lower()]
812 812
         return headers
813 813
 
814
-    def object_copy(self, src_uri, dst_uri, extra_headers = None):
814
+    def object_copy(self, src_uri, dst_uri, extra_headers=None,
815
+                    extra_label=""):
815 816
         if src_uri.type != "s3":
816 817
             raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
817 818
         if dst_uri.type != "s3":
... ...
@@ -853,7 +854,7 @@ class S3(object):
853 853
             if size > self.config.multipart_copy_chunk_size_mb * 1024 * 1024:
854 854
                 # Multipart requests are quite different... drop here
855 855
                 return self.copy_file_multipart(src_uri, dst_uri, size,
856
-                                                headers)
856
+                                                headers, extra_label)
857 857
 
858 858
         ## Not multipart...
859 859
         headers['x-amz-copy-source'] = "/%s/%s" % (
... ...
@@ -882,7 +883,8 @@ class S3(object):
882 882
                     raise exc
883 883
         return response
884 884
 
885
-    def object_modify(self, src_uri, dst_uri, extra_headers = None):
885
+    def object_modify(self, src_uri, dst_uri, extra_headers=None,
886
+                      extra_label=""):
886 887
 
887 888
         if src_uri.type != "s3":
888 889
             raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
... ...
@@ -932,7 +934,7 @@ class S3(object):
932 932
             error("Server error during the MODIFY operation. Overwrite response status to 500")
933 933
             raise S3Error(response)
934 934
 
935
-        if acl != None:
935
+        if acl is not None:
936 936
             try:
937 937
                 self.set_acl(src_uri, acl)
938 938
             except S3Error as exc:
... ...
@@ -943,7 +945,8 @@ class S3(object):
943 943
 
944 944
         return response
945 945
 
946
-    def object_move(self, src_uri, dst_uri, extra_headers = None):
946
+    def object_move(self, src_uri, dst_uri, extra_headers=None,
947
+                    extra_label=""):
947 948
         response_copy = self.object_copy(src_uri, dst_uri, extra_headers)
948 949
         debug("Object %s copied to %s" % (src_uri, dst_uri))
949 950
         if not response_copy["data"] or getRootTagName(response_copy["data"]) == "CopyObjectResult":
... ...
@@ -1385,6 +1388,30 @@ class S3(object):
1385 1385
 
1386 1386
         return response
1387 1387
 
1388
+    def send_request_with_progress(self, request, labels, operation_size=0):
1389
+        """Wrapper around send_request for slow requests.
1390
+
1391
+        To be able to show progression for small requests
1392
+        """
1393
+        if not self.config.progress_meter:
1394
+            info("Sending slow request '%s', please wait..." % filename)
1395
+            return self.send_request(request)
1396
+
1397
+        if 'action' not in labels:
1398
+            labels[u'action'] = u'request'
1399
+        progress = self.config.progress_class(labels, operation_size)
1400
+
1401
+        try:
1402
+            response = self.send_request(request)
1403
+        except Exception as exc:
1404
+            progress.done("failed")
1405
+            raise
1406
+
1407
+        progress.update(current_position=operation_size)
1408
+        progress.done("done")
1409
+
1410
+        return response
1411
+
1388 1412
     def send_file(self, request, stream, labels, buffer = '', throttle = 0,
1389 1413
                   retries = _max_retries, offset = 0, chunk_size = -1,
1390 1414
                   use_expect_continue = None):
... ...
@@ -889,8 +889,10 @@ def subcmd_cp_mv(args, process_fce, action_str, message):
889 889
 
890 890
         extra_headers = copy(cfg.extra_headers)
891 891
         try:
892
-            response = process_fce(src_uri, dst_uri, extra_headers)
893
-            output(message % { "src" : src_uri, "dst" : dst_uri })
892
+            response = process_fce(src_uri, dst_uri, extra_headers,
893
+                                   extra_label=seq_label)
894
+            output(message % {"src": src_uri, "dst": dst_uri,
895
+                              "extra": seq_label})
894 896
             if Config().acl_public:
895 897
                 info(u"Public URL is: %s" % dst_uri.public_url())
896 898
             scoreboard.success()
... ...
@@ -905,15 +907,16 @@ def subcmd_cp_mv(args, process_fce, action_str, message):
905 905
 
906 906
 def cmd_cp(args):
907 907
     s3 = S3(Config())
908
-    return subcmd_cp_mv(args, s3.object_copy, "copy", u"remote copy: '%(src)s' -> '%(dst)s'")
908
+    return subcmd_cp_mv(args, s3.object_copy, "copy", u"remote copy: '%(src)s' -> '%(dst)s'  %(extra)s")
909 909
 
910 910
 def cmd_modify(args):
911 911
     s3 = S3(Config())
912
-    return subcmd_cp_mv(args, s3.object_modify, "modify", u"modify: '%(src)s'")
912
+    return subcmd_cp_mv(args, s3.object_modify, "modify",
913
+                        u"modify: '%(src)s'  %(extra)s")
913 914
 
914 915
 def cmd_mv(args):
915 916
     s3 = S3(Config())
916
-    return subcmd_cp_mv(args, s3.object_move, "move", u"move: '%(src)s' -> '%(dst)s'")
917
+    return subcmd_cp_mv(args, s3.object_move, "move", u"move: '%(src)s' -> '%(dst)s'  %(extra)s")
917 918
 
918 919
 def cmd_info(args):
919 920
     cfg = Config()
... ...
@@ -1112,8 +1115,10 @@ def cmd_sync_remote2remote(args):
1112 1112
             seq_label = "[%d of %d]" % (seq, src_count)
1113 1113
             extra_headers = copy(cfg.extra_headers)
1114 1114
             try:
1115
-                response = s3.object_copy(src_uri, dst_uri, extra_headers)
1116
-                output(u"remote copy: '%s' -> '%s'" % (src_uri, dst_uri))
1115
+                response = s3.object_copy(src_uri, dst_uri, extra_headers,
1116
+                                          extra_label=seq_label)
1117
+                output(u"remote copy: '%s' -> '%s'  %s" %
1118
+                       (src_uri, dst_uri, seq_label))
1117 1119
                 total_nb_files += 1
1118 1120
                 total_size += item.get(u'size', 0)
1119 1121
             except S3Error as exc:
... ...
@@ -1549,15 +1554,20 @@ def remote_copy(s3, copy_pairs, destination_base, uploaded_objects_list=None):
1549 1549
     cfg = Config()
1550 1550
     saved_bytes = 0
1551 1551
     failed_copy_list = FileDict()
1552
+    seq = 0
1553
+    src_count = len(copy_pairs)
1552 1554
     for (src_obj, dst1, dst2) in copy_pairs:
1555
+        seq += 1
1553 1556
         debug(u"Remote Copying from %s to %s" % (dst1, dst2))
1554 1557
         dst1_uri = S3Uri(destination_base + dst1)
1555 1558
         dst2_uri = S3Uri(destination_base + dst2)
1559
+        seq_label = "[%d of %d]" % (seq, src_count)
1556 1560
         extra_headers = copy(cfg.extra_headers)
1557 1561
         try:
1558
-            s3.object_copy(dst1_uri, dst2_uri, extra_headers)
1562
+            s3.object_copy(dst1_uri, dst2_uri, extra_headers,
1563
+                           extra_label=seq_label)
1564
+            output(u"remote copy: '%s' -> '%s'  %s" % (dst1, dst2, seq_label))
1559 1565
             saved_bytes += src_obj.get(u'size', 0)
1560
-            output(u"remote copy: '%s' -> '%s'" % (dst1, dst2))
1561 1566
             if uploaded_objects_list is not None:
1562 1567
                 uploaded_objects_list.append(dst2)
1563 1568
         except Exception: