Browse code

Getting sequential batches for dealing with S3's eventual consistency

Ubuntu authored on 2014/02/20 00:52:45
Showing 3 changed files
... ...
@@ -322,7 +322,7 @@ def fetch_local_list(args, is_src = False, recursive = None):
322 322
 
323 323
     return local_list, single_file
324 324
 
325
-def fetch_remote_list(args, require_attribs = False, recursive = None, batch_mode = False):
325
+def fetch_remote_list(args, require_attribs = False, recursive = None, batch_mode = False, uri_params = {}):
326 326
     def _get_remote_attribs(uri, remote_item):
327 327
         response = S3(cfg).object_info(uri)
328 328
         remote_item.update({
... ...
@@ -357,7 +357,8 @@ def fetch_remote_list(args, require_attribs = False, recursive = None, batch_mod
357 357
         empty_fname_re = re.compile(r'\A\s*\Z')
358 358
 
359 359
         s3 = S3(Config())
360
-        response = s3.bucket_list(remote_uri.bucket(), prefix = remote_uri.object(), recursive = recursive, batch_mode = batch_mode)
360
+        response = s3.bucket_list(remote_uri.bucket(), prefix = remote_uri.object(),
361
+                                  recursive = recursive, batch_mode = batch_mode, uri_params = uri_params)
361 362
 
362 363
         rem_base_original = rem_base = remote_uri.object()
363 364
         remote_uri_original = remote_uri
... ...
@@ -226,7 +226,7 @@ class S3(object):
226 226
         response["list"] = getListFromXml(response["data"], "Bucket")
227 227
         return response
228 228
 
229
-    def bucket_list(self, bucket, prefix = None, recursive = None, batch_mode = False):
229
+    def bucket_list(self, bucket, prefix = None, recursive = None, batch_mode = False, uri_params = {}):
230 230
         def _list_truncated(data):
231 231
             ## <IsTruncated> can either be "true" or "false" or be missing completely
232 232
             is_truncated = getTextFromXml(data, ".//IsTruncated") or "false"
... ...
@@ -238,7 +238,6 @@ class S3(object):
238 238
         def _get_common_prefixes(data):
239 239
             return getListFromXml(data, "CommonPrefixes")
240 240
 
241
-        uri_params = {}
242 241
         truncated = True
243 242
         list = []
244 243
         prefixes = []
... ...
@@ -242,14 +242,17 @@ def cmd_website_delete(args):
242 242
                 raise
243 243
 
244 244
 def cmd_bucket_delete(args):
245
-    def _bucket_delete_one(uri):
245
+    def _bucket_delete_one(uri, args = None):
246 246
         try:
247
+            marker = args.get('marker', '') if args else ''
247 248
             response = s3.bucket_delete(uri.bucket())
248 249
         except S3Error, e:
249 250
             if e.info['Code'] == 'BucketNotEmpty' and (cfg.force or cfg.recursive):
250 251
                 warning(u"Bucket is not empty. Removing all the objects from it first. This may take some time...")
251
-                subcmd_object_del_uri(uri.uri(), recursive = True, batch_mode = True)
252
-                return _bucket_delete_one(uri)
252
+                batch_uri_args = {'marker': marker}
253
+                marker = subcmd_object_del_uri(uri.uri(), recursive = True, 
254
+                                               batch_mode = True, batch_uri_args = batch_uri_args)
255
+                return _bucket_delete_one(uri, args = {'marker': marker})
253 256
             elif S3.codes.has_key(e.info["Code"]):
254 257
                 error(S3.codes[e.info["Code"]] % uri.bucket())
255 258
                 return
... ...
@@ -508,13 +511,14 @@ def cmd_object_del(args):
508 508
                 raise ParameterError("File name required, not only the bucket name. Alternatively use --recursive")
509 509
         subcmd_object_del_uri(uri_str)
510 510
 
511
-def subcmd_object_del_uri(uri_str, recursive = None, batch_mode = False):
511
+def subcmd_object_del_uri(uri_str, recursive = None, batch_mode = False, batch_uri_args = {}):
512 512
     s3 = S3(cfg)
513 513
 
514 514
     if recursive is None:
515 515
         recursive = cfg.recursive
516 516
 
517
-    remote_list = fetch_remote_list(uri_str, require_attribs = False, recursive = recursive, batch_mode = batch_mode)
517
+    remote_list = fetch_remote_list(uri_str, require_attribs = False, recursive = recursive, 
518
+                                    batch_mode = batch_mode, uri_params = batch_uri_args)
518 519
     remote_list, exclude_list = filter_exclude_include(remote_list)
519 520
 
520 521
     remote_count = len(remote_list)
... ...
@@ -535,18 +539,26 @@ def subcmd_object_del_uri(uri_str, recursive = None, batch_mode = False):
535 535
 
536 536
     if batch_mode:
537 537
         response = s3.object_batch_delete(remote_list)
538
+        keys_count = len(remote_list) if remote_list else 0
538 539
         first_key = S3Uri(remote_list[remote_list.keys()[0]]['object_uri_str']) if remote_list else "None"
539 540
         last_key = S3Uri(remote_list[remote_list.keys()[-1]]['object_uri_str']) if remote_list else "None"
540
-        output(u"Number of deleted keys: %d" % len(remote_list))
541
+        output(u"Number of deleted keys: %d" % keys_count)
541 542
         output(u"From: \t%s" % first_key)
542 543
         output(u"To: \t%s" % last_key)
543 544
         remote_list = []
545
+        if keys_count == 1000:
546
+            # reached limit of keys, probably there are more..
547
+            # worst case scenario we do a useless listing but that's ok
548
+            marker = last_key.object() if last_key != "None" and last_key.has_object() else ''
549
+            return marker 
544 550
 
545 551
     for key in remote_list:
546 552
         item = remote_list[key]
547 553
         response = s3.object_delete(S3Uri(item['object_uri_str']))
548 554
         output(u"File %s deleted" % item['object_uri_str'])
549 555
 
556
+    return ''
557
+
550 558
 def cmd_object_restore(args):
551 559
     s3 = S3(cfg)
552 560