Browse code

Fixes #346 - Add support for listing and resume of multipart uploads of more than 1000 parts

Add server pagination handling support to:
"get_multipart" (list incomplete multipart files)
"list_multipart" (list parts of an incomplete multipart file)

This makes it possible to list and resume multipart uploads that have
more than 1000 parts.

Florent Viard authored on 2020/06/22 10:54:16
Showing 3 changed files
... ...
@@ -48,11 +48,10 @@ class MultiPartUpload(object):
48 48
         self.upload_id = self.initiate_multipart_upload()
49 49
 
50 50
     def get_parts_information(self, uri, upload_id):
51
-        multipart_response = self.s3.list_multipart(uri, upload_id)
52
-        tree = getTreeFromXml(multipart_response['data'])
51
+        part_list = self.s3.list_multipart(uri, upload_id)
53 52
 
54 53
         parts = dict()
55
-        for elem in parseNodes(tree):
54
+        for elem in part_list:
56 55
             try:
57 56
                 parts[int(elem['PartNumber'])] = {
58 57
                     'checksum': elem['ETag'],
... ...
@@ -65,9 +64,8 @@ class MultiPartUpload(object):
65 65
 
66 66
     def get_unique_upload_id(self, uri):
67 67
         upload_id = ""
68
-        multipart_response = self.s3.get_multipart(uri)
69
-        tree = getTreeFromXml(multipart_response['data'])
70
-        for mpupload in parseNodes(tree):
68
+        multipart_list = self.s3.get_multipart(uri)
69
+        for mpupload in multipart_list:
71 70
             try:
72 71
                 mp_upload_id = mpupload['UploadId']
73 72
                 mp_path = mpupload['Key']
... ...
@@ -1135,9 +1135,131 @@ class S3(object):
1135 1135
         response = self.send_request(request)
1136 1136
         return response
1137 1137
 
1138
-    def get_multipart(self, uri):
1139
-        request = self.create_request("BUCKET_LIST", bucket = uri.bucket(),
1140
-                                      uri_params = {'uploads': None})
1138
+    def get_multipart(self, uri, uri_params=None, limit=-1):
1139
+        upload_list = []
1140
+        for truncated, uploads in self.get_multipart_streaming(uri,
1141
+                                                               uri_params,
1142
+                                                               limit):
1143
+            upload_list.extend(uploads)
1144
+
1145
+        return upload_list
1146
+
1147
+    def get_multipart_streaming(self, uri, uri_params=None, limit=-1):
1148
+        uri_params = uri_params and uri_params.copy() or {}
1149
+        bucket = uri.bucket()
1150
+
1151
+        truncated = True
1152
+        num_objects = 0
1153
+        max_keys = limit
1154
+
1155
+        # It is the "uploads: None" in uri_params that will change the
1156
+        # behavior of bucket_list to return multiparts instead of keys
1157
+        uri_params['uploads'] = None
1158
+        while truncated:
1159
+            response = self.bucket_list_noparse(bucket, recursive=True,
1160
+                                                uri_params=uri_params,
1161
+                                                max_keys=max_keys)
1162
+
1163
+            xml_data = response["data"]
1164
+            # extract list of info of uploads
1165
+            upload_list = getListFromXml(xml_data, "Upload")
1166
+            num_objects += len(upload_list)
1167
+            if limit > num_objects:
1168
+                max_keys = limit - num_objects
1169
+
1170
+            xml_truncated = getTextFromXml(xml_data, ".//IsTruncated")
1171
+            if not xml_truncated or xml_truncated.lower() == "false":
1172
+                truncated = False
1173
+
1174
+            if truncated:
1175
+                if limit == -1 or num_objects < limit:
1176
+                    if upload_list:
1177
+                        next_key = getTextFromXml(xml_data, "NextKeyMarker")
1178
+                        if not next_key:
1179
+                            next_key = upload_list[-1]["Key"]
1180
+                        uri_params['KeyMarker'] = next_key
1181
+
1182
+                        upload_id_marker = getTextFromXml(
1183
+                            xml_data, "NextUploadIdMarker")
1184
+                        if upload_id_marker:
1185
+                            uri_params['UploadIdMarker'] = upload_id_marker
1186
+                        elif 'UploadIdMarker' in uri_params:
1187
+                            # Clear any pre-existing value
1188
+                            del uri_params['UploadIdMarker']
1189
+                    else:
1190
+                        # Unexpectedly, the server lied, and so the previous
1191
+                        # response was not truncated. So, no new key to get.
1192
+                        yield False, upload_list
1193
+                        break
1194
+                    debug("Listing continues after '%s'" %
1195
+                          uri_params['KeyMarker'])
1196
+                else:
1197
+                    yield truncated, upload_list
1198
+                    break
1199
+            yield truncated, upload_list
1200
+
1201
+    def list_multipart(self, uri, upload_id, uri_params=None, limit=-1):
1202
+        part_list = []
1203
+        for truncated, parts in self.list_multipart_streaming(uri,
1204
+                                                              upload_id,
1205
+                                                              uri_params,
1206
+                                                              limit):
1207
+            part_list.extend(parts)
1208
+
1209
+        return part_list
1210
+
1211
+    def list_multipart_streaming(self, uri, upload_id, uri_params=None,
1212
+                                 limit=-1):
1213
+        uri_params = uri_params and uri_params.copy() or {}
1214
+
1215
+        truncated = True
1216
+        num_objects = 0
1217
+        max_parts = limit
1218
+
1219
+        while truncated:
1220
+            response = self.list_multipart_noparse(uri, upload_id,
1221
+                                                   uri_params, max_parts)
1222
+
1223
+            xml_data = response["data"]
1224
+            # extract list of multipart upload parts
1225
+            part_list = getListFromXml(xml_data, "Part")
1226
+            num_objects += len(part_list)
1227
+            if limit > num_objects:
1228
+                max_parts = limit - num_objects
1229
+
1230
+            xml_truncated = getTextFromXml(xml_data, ".//IsTruncated")
1231
+            if not xml_truncated or xml_truncated.lower() == "false":
1232
+                truncated = False
1233
+
1234
+            if truncated:
1235
+                if limit == -1 or num_objects < limit:
1236
+                    if part_list:
1237
+                        next_part_number = getTextFromXml(
1238
+                            xml_data, "NextPartNumberMarker")
1239
+                        if not next_part_number:
1240
+                            next_part_number = part_list[-1]["PartNumber"]
1241
+                        uri_params['part-number-marker'] = next_part_number
1242
+                    else:
1243
+                        # Unexpectedly, the server lied, and so the previous
1244
+                        # response was not truncated. So, no new part to get.
1245
+                        yield False, part_list
1246
+                        break
1247
+                    debug("Listing continues after Part '%s'" %
1248
+                          uri_params['part-number-marker'])
1249
+                else:
1250
+                    yield truncated, part_list
1251
+                    break
1252
+            yield truncated, part_list
1253
+
1254
+    def list_multipart_noparse(self, uri, upload_id, uri_params=None,
1255
+                               max_parts=-1):
1256
+        if uri_params is None:
1257
+            uri_params = {}
1258
+        if max_parts != -1:
1259
+            uri_params['max-parts'] = str(max_parts)
1260
+        uri_params['uploadId'] = upload_id
1261
+        request = self.create_request("OBJECT_GET", uri=uri,
1262
+                                      uri_params=uri_params)
1141 1263
         response = self.send_request(request)
1142 1264
         return response
1143 1265
 
... ...
@@ -1147,12 +1269,6 @@ class S3(object):
1147 1147
         response = self.send_request(request)
1148 1148
         return response
1149 1149
 
1150
-    def list_multipart(self, uri, id):
1151
-        request = self.create_request("OBJECT_GET", uri = uri,
1152
-                                      uri_params = {'uploadId': id})
1153
-        response = self.send_request(request)
1154
-        return response
1155
-
1156 1150
     def get_accesslog(self, uri):
1157 1151
         request = self.create_request("BUCKET_LIST", bucket = uri.bucket(),
1158 1152
                                       uri_params = {'logging': None})
... ...
@@ -2120,15 +2120,16 @@ def cmd_multipart(args):
2120 2120
     #id = ''
2121 2121
     #if(len(args) > 1): id = args[1]
2122 2122
 
2123
-    response = s3.get_multipart(uri)
2124
-    debug(u"response - %s" % response['status'])
2123
+    upload_list = s3.get_multipart(uri)
2125 2124
     output(u"%s" % uri)
2126
-    tree = getTreeFromXml(response['data'])
2127
-    debug(parseNodes(tree))
2125
+    debug(upload_list)
2128 2126
     output(u"Initiated\tPath\tId")
2129
-    for mpupload in parseNodes(tree):
2127
+    for mpupload in upload_list:
2130 2128
         try:
2131
-            output(u"%s\t%s\t%s" % (mpupload['Initiated'], "s3://" + uri.bucket() + "/" + mpupload['Key'], mpupload['UploadId']))
2129
+            output(u"%s\t%s\t%s" % (
2130
+                mpupload['Initiated'],
2131
+                "s3://" + uri.bucket() + "/" + mpupload['Key'],
2132
+                mpupload['UploadId']))
2132 2133
         except KeyError:
2133 2134
             pass
2134 2135
     return EX_OK
... ...
@@ -2151,14 +2152,15 @@ def cmd_list_multipart(args):
2151 2151
     uri = S3Uri(args[0])
2152 2152
     id = args[1]
2153 2153
 
2154
-    response = s3.list_multipart(uri, id)
2155
-    debug(u"response - %s" % response['status'])
2156
-    tree = getTreeFromXml(response['data'])
2154
+    part_list = s3.list_multipart(uri, id)
2157 2155
     output(u"LastModified\t\t\tPartNumber\tETag\tSize")
2158
-    for mpupload in parseNodes(tree):
2156
+    for mpupload in part_list:
2159 2157
         try:
2160
-            output(u"%s\t%s\t%s\t%s" % (mpupload['LastModified'], mpupload['PartNumber'], mpupload['ETag'], mpupload['Size']))
2161
-        except Exception:
2158
+            output(u"%s\t%s\t%s\t%s" % (mpupload['LastModified'],
2159
+                                        mpupload['PartNumber'],
2160
+                                        mpupload['ETag'],
2161
+                                        mpupload['Size']))
2162
+        except KeyError:
2162 2163
             pass
2163 2164
     return EX_OK
2164 2165