
Merge branch 'master' of git://github.com/vamitrou/s3cmd into vamitrou-master

Matt Domsch authored on 2014/03/12 11:17:05
Showing 3 changed files
--- a/S3/FileLists.py
+++ b/S3/FileLists.py
@@ -322,7 +322,7 @@ def fetch_local_list(args, is_src = False, recursive = None):
 
     return local_list, single_file
 
-def fetch_remote_list(args, require_attribs = False, recursive = None):
+def fetch_remote_list(args, require_attribs = False, recursive = None, batch_mode = False, uri_params = {}):
     def _get_remote_attribs(uri, remote_item):
         response = S3(cfg).object_info(uri)
         remote_item.update({
@@ -337,7 +337,7 @@ def fetch_remote_list(args, require_attribs = False, recursive = None):
         except KeyError:
             pass
 
-    def _get_filelist_remote(remote_uri, recursive = True):
+    def _get_filelist_remote(remote_uri, recursive = True, batch_mode = False):
         ## If remote_uri ends with '/' then all remote files will have
         ## the remote_uri prefix removed in the relative path.
         ## If, on the other hand, the remote_uri ends with something else
@@ -357,7 +357,8 @@ def fetch_remote_list(args, require_attribs = False, recursive = None):
         empty_fname_re = re.compile(r'\A\s*\Z')
 
         s3 = S3(Config())
-        response = s3.bucket_list(remote_uri.bucket(), prefix = remote_uri.object(), recursive = recursive)
+        response = s3.bucket_list(remote_uri.bucket(), prefix = remote_uri.object(),
+                                  recursive = recursive, batch_mode = batch_mode, uri_params = uri_params)
 
         rem_base_original = rem_base = remote_uri.object()
         remote_uri_original = remote_uri
@@ -417,7 +418,7 @@ def fetch_remote_list(args, require_attribs = False, recursive = None):
 
     if recursive:
         for uri in remote_uris:
-            objectlist = _get_filelist_remote(uri)
+            objectlist = _get_filelist_remote(uri, batch_mode = batch_mode)
             for key in objectlist:
                 remote_list[key] = objectlist[key]
                 remote_list.record_md5(key, objectlist.get_md5(key))
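
Taken together, these four hunks thread two new keyword arguments through the listing path: `batch_mode` and `uri_params` flow from `fetch_remote_list` through `_get_filelist_remote` into `S3.bucket_list`. In batch mode a call returns at most one listing page, and the caller resumes by passing a `marker`. A minimal sketch of that calling contract, assuming S3's default page size of 1000 keys (the marker handling mirrors `subcmd_batch_del`, introduced further down):

    # Fetch a prefix one page at a time; batch_mode = True makes
    # fetch_remote_list() stop after a single ListObjects page.
    page = fetch_remote_list("s3://bucket/prefix", require_attribs = False,
                             batch_mode = True)
    while len(page) >= 1000:
        # ... process `page` here, then resume past its last key ...
        last_uri = page[page.keys()[-1]]['object_uri_str']
        page = fetch_remote_list("s3://bucket/prefix", require_attribs = False,
                                 batch_mode = True,
                                 uri_params = {"marker": S3Uri(last_uri).object()})
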
--- a/S3/S3.py
+++ b/S3/S3.py
@@ -7,10 +7,12 @@ import sys
 import os, os.path
 import time
 import errno
+import base64
 import httplib
 import logging
 import mimetypes
 import re
+from xml.sax import saxutils
 from logging import debug, info, warning, error
 from stat import ST_SIZE
 
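The two new imports serve the batch-delete request built later in this file: `base64` encodes the MD5 digest for the Content-MD5 header that S3 requires on Multi-Object Delete requests, and `saxutils` escapes XML metacharacters in object keys. A quick illustration:

    import base64
    from hashlib import md5
    from xml.sax import saxutils

    # Object keys may contain XML metacharacters and must be escaped:
    saxutils.escape("a&b<c>.txt")            # 'a&amp;b&lt;c&gt;.txt'

    # Content-MD5 is the base64-encoded binary MD5 digest of the body:
    body = b"<Delete>...</Delete>"
    base64.b64encode(md5(body).digest())     # 24-character base64 string
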
 
@@ -161,6 +163,7 @@ class S3(object):
         SERVICE = 0x0100,
         BUCKET = 0x0200,
         OBJECT = 0x0400,
+        BATCH = 0x0800,
         MASK = 0x0700,
     )
 
@@ -175,6 +178,7 @@ class S3(object):
         OBJECT_HEAD = targets["OBJECT"] | http_methods["HEAD"],
         OBJECT_DELETE = targets["OBJECT"] | http_methods["DELETE"],
         OBJECT_POST = targets["OBJECT"] | http_methods["POST"],
+        BATCH_DELETE = targets["BATCH"] | http_methods["POST"],
     )
 
     codes = {
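
`BATCH_DELETE` follows the existing encoding scheme: each operation ORs a target flag from the high byte with an HTTP method flag. The method value below is an assumption for illustration (only the target constants appear in this diff). Note also that `MASK` in the previous hunk stays at 0x0700, which does not cover the new 0x0800 bit, so any code recovering the target through that mask would need it widened to 0x0F00:

    # Hypothetical method flag for illustration; the target flags
    # (0x0100..0x0800) are the ones defined in the hunk above.
    POST = 0x10
    BATCH = 0x0800
    BATCH_DELETE = BATCH | POST              # 0x0810

    assert BATCH_DELETE & 0x0F00 == BATCH    # a wide mask recovers the target
    assert BATCH_DELETE & 0x0700 != BATCH    # the old 0x0700 mask misses BATCH
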
@@ -223,7 +227,7 @@ class S3(object):
         response["list"] = getListFromXml(response["data"], "Bucket")
         return response
 
-    def bucket_list(self, bucket, prefix = None, recursive = None):
+    def bucket_list(self, bucket, prefix = None, recursive = None, batch_mode = False, uri_params = {}):
         def _list_truncated(data):
             ## <IsTruncated> can either be "true" or "false" or be missing completely
             is_truncated = getTextFromXml(data, ".//IsTruncated") or "false"
@@ -235,7 +239,6 @@ class S3(object):
         def _get_common_prefixes(data):
             return getListFromXml(data, "CommonPrefixes")
 
-        uri_params = {}
         truncated = True
         list = []
         prefixes = []
@@ -244,7 +247,7 @@ class S3(object):
             response = self.bucket_list_noparse(bucket, prefix, recursive, uri_params)
             current_list = _get_contents(response["data"])
             current_prefixes = _get_common_prefixes(response["data"])
-            truncated = _list_truncated(response["data"])
+            truncated = _list_truncated(response["data"]) and not batch_mode
             if truncated:
                 if current_list:
                     uri_params['marker'] = self.urlencode_string(current_list[-1]["Key"])
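
With `batch_mode` set, `bucket_list` treats every response as final even when S3 reports `<IsTruncated>true</IsTruncated>`, so each call yields at most one page (up to 1000 keys) and the caller paginates itself via the `marker` entry in `uri_params`. The loop, reduced to a sketch with a hypothetical `list_page` helper standing in for `bucket_list_noparse` plus the XML parsing:

    def bucket_list_sketch(list_page, prefix, batch_mode = False, uri_params = {}):
        keys = []
        truncated = True
        while truncated:
            page, is_truncated = list_page(prefix, uri_params)
            keys.extend(page)
            # In batch mode, stop after one page even if S3 says "truncated".
            truncated = is_truncated and not batch_mode
            if truncated and page:
                uri_params['marker'] = page[-1]   # resume after the last key
        return keys
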
@@ -484,13 +487,42 @@ class S3(object):
         response = self.recv_file(request, stream, labels, start_position)
         return response
 
+    def object_batch_delete(self, remote_list):
+        def compose_batch_del_xml(bucket, key_list):
+            body = u"<?xml version=\"1.0\" encoding=\"UTF-8\"?><Delete>"
+            for key in key_list:
+                uri = S3Uri(key)
+                if uri.type != "s3":
+                    raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
+                if not uri.has_object():
+                    raise ValueError("URI '%s' has no object" % key)
+                if uri.bucket() != bucket:
+                    raise ValueError("The batch should contain keys from the same bucket")
+                object = saxutils.escape(uri.object())
+                body += u"<Object><Key>%s</Key></Object>" % object
+            body += u"</Delete>"
+            body = body.encode('utf-8')
+            return body
+
+        batch = [remote_list[item]['object_uri_str'] for item in remote_list]
+        if len(batch) == 0:
+            raise ValueError("Key list is empty")
+        bucket = S3Uri(batch[0]).bucket()
+        request_body = compose_batch_del_xml(bucket, batch)
+        md5_hash = md5()
+        md5_hash.update(request_body)
+        headers = {'content-md5': base64.b64encode(md5_hash.digest())}
+        request = self.create_request("BATCH_DELETE", bucket = bucket, extra = '?delete', headers = headers)
+        response = self.send_request(request, request_body)
+        return response
+
     def object_delete(self, uri):
         if uri.type != "s3":
             raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
         request = self.create_request("OBJECT_DELETE", uri = uri)
         response = self.send_request(request)
         return response
-    
+
     def object_restore(self, uri):
         if uri.type != "s3":
             raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
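
`object_batch_delete` is built on S3's Multi-Object Delete API: a single `POST /?delete` removes up to 1000 keys in one round trip, and S3 rejects the request unless a Content-MD5 header covering the XML payload is present. A sketch of the body and header for a hypothetical two-key batch (bucket and key names are made up):

    import base64
    from hashlib import md5

    # This payload matches what compose_batch_del_xml() above generates.
    body = (u'<?xml version="1.0" encoding="UTF-8"?><Delete>'
            u'<Object><Key>logs/2014-03-12.log</Key></Object>'
            u'<Object><Key>tmp/a&amp;b.txt</Key></Object>'
            u'</Delete>').encode('utf-8')
    headers = {'content-md5': base64.b64encode(md5(body).digest())}
    # The request then goes out as POST /?delete on the bucket,
    # with `body` as the request body and `headers` attached.
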
--- a/s3cmd
+++ b/s3cmd
@@ -245,11 +245,15 @@ def cmd_bucket_delete(args):
     def _bucket_delete_one(uri):
         try:
             response = s3.bucket_delete(uri.bucket())
+            output(u"Bucket '%s' removed" % uri.uri())
         except S3Error, e:
             if e.info['Code'] == 'BucketNotEmpty' and (cfg.force or cfg.recursive):
                 warning(u"Bucket is not empty. Removing all the objects from it first. This may take some time...")
-                subcmd_object_del_uri(uri.uri(), recursive = True)
-                return _bucket_delete_one(uri)
+                success = subcmd_batch_del(uri.uri())
+                if success:
+                    return _bucket_delete_one(uri)
+                else:
+                    output(u"Bucket was not removed")
             elif S3.codes.has_key(e.info["Code"]):
                 error(S3.codes[e.info["Code"]] % uri.bucket())
                 return
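
Moving the success message into `_bucket_delete_one` (it is deleted from the caller in the next hunk) means it prints only when `bucket_delete` actually succeeds; previously it printed even if emptying a non-empty bucket had failed. This path is reached by `s3cmd rb --force` (or `--recursive`) on a non-empty bucket: the bucket is drained in 1000-key batches and the delete is retried. Condensed:

    # Condensed control flow of _bucket_delete_one() after this change:
    def delete_bucket(s3, uri):
        try:
            s3.bucket_delete(uri.bucket())
            output(u"Bucket '%s' removed" % uri.uri())
        except S3Error, e:
            if e.info['Code'] == 'BucketNotEmpty' and (cfg.force or cfg.recursive):
                if subcmd_batch_del(uri.uri()):    # drain in 1000-key batches
                    return delete_bucket(s3, uri)  # then retry the delete
                output(u"Bucket was not removed")
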
@@ -262,7 +266,6 @@ def cmd_bucket_delete(args):
         if not uri.type == "s3" or not uri.has_bucket() or uri.has_object():
             raise ParameterError("Expecting S3 URI with just the bucket name set instead of '%s'" % arg)
         _bucket_delete_one(uri)
-        output(u"Bucket '%s' removed" % uri.uri())
 
 def cmd_object_put(args):
     cfg = Config()
@@ -508,6 +511,47 @@ def cmd_object_del(args):
                 raise ParameterError("File name required, not only the bucket name. Alternatively use --recursive")
         subcmd_object_del_uri(uri_str)
 
+def subcmd_batch_del(uri_str = None, bucket = None, remote_list = None):
+    s3 = S3(cfg)
+
+    if len([item for item in [uri_str, bucket, remote_list] if item]) != 1:
+        raise ValueError("One and only one of 'uri_str', 'bucket', 'remote_list' can be specified.")
+
+    batch_mode = False
+    if bucket:
+        uri_str = "s3://%s" % bucket
+    if not remote_list:
+        batch_mode = True
+        remote_list = fetch_remote_list(uri_str, require_attribs = False, batch_mode = True)
+    if len(remote_list) == 0:
+        warning(u"Remote list is empty.")
+        return False
+
+    if cfg.max_delete > 0 and len(remote_list) > cfg.max_delete:
+        warning(u"delete: maximum requested number of deletes would be exceeded, none performed.")
+        return False
+
+    while True:
+        if cfg.dry_run:
+            output('\n'.join((u"File %s deleted" % remote_list[p]['object_uri_str']) for p in remote_list))
+        else:
+            response = s3.object_batch_delete(remote_list)
+            output('\n'.join((u"File %s deleted" % remote_list[p]['object_uri_str']) for p in remote_list))
+
+        keys_count = len(remote_list) if remote_list else 0
+        if keys_count < 1000 or not batch_mode:
+            break
+        last_key = S3Uri(remote_list[remote_list.keys()[-1]]['object_uri_str']) if remote_list else None
+        if not last_key or not last_key.has_object():
+            break
+        marker = last_key.object()
+        remote_list = fetch_remote_list(uri_str, require_attribs = False, batch_mode = True,
+                                        uri_params = {"marker": marker})
+    if cfg.dry_run:
+        warning(u"Exiting now because of --dry-run")
+        return False
+    return True
+
 def subcmd_object_del_uri(uri_str, recursive = None):
     s3 = S3(cfg)
 
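`subcmd_batch_del` drains a bucket or prefix page by page: list at most 1000 keys (one batch-mode fetch), delete them with a single Multi-Object Delete, then resume from a marker just past the last key until a short page signals the end. Note that the `max_delete` guard only ever sees the first page here, so caps above 1000 are effectively not enforced in batch mode. The skeleton of the loop, with hypothetical `list_up_to_1000` and `delete_batch` helpers standing in for `fetch_remote_list(batch_mode = True)` and `S3.object_batch_delete`:

    def drain(prefix):
        marker = None
        while True:
            page = list_up_to_1000(prefix, marker)   # one ListObjects page
            if not page:
                break
            delete_batch(page)                       # one POST /?delete
            if len(page) < 1000:                     # short page: nothing follows
                break
            marker = page[-1]                        # resume past last deleted key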
 
@@ -537,10 +581,10 @@ def subcmd_object_del_uri(uri_str, recursive = None):
         item = remote_list[key]
         response = s3.object_delete(S3Uri(item['object_uri_str']))
         output(u"File %s deleted" % item['object_uri_str'])
-        
+
 def cmd_object_restore(args):
     s3 = S3(cfg)
-    
+
     if cfg.restore_days < 1:
         raise ParameterError("You must restore a file for 1 or more days")
 
@@ -562,14 +606,14 @@ def cmd_object_restore(args):
 
     for key in remote_list:
         item = remote_list[key]
-        
+
         uri = S3Uri(item['object_uri_str'])
         if not item['object_uri_str'].endswith("/"):
             response = s3.object_restore(S3Uri(item['object_uri_str']))
             output(u"File %s restoration started" % item['object_uri_str'])
         else:
             debug(u"Skipping directory since only files may be restored")
-        
+
 
 def subcmd_cp_mv(args, process_fce, action_str, message):
     if len(args) < 2:
@@ -697,20 +741,6 @@ def filedicts_to_keys(*args):
     return keys
 
 def cmd_sync_remote2remote(args):
-    def _do_deletes(s3, dst_list):
-        if cfg.max_delete > 0 and len(dst_list) > cfg.max_delete:
-            warning(u"delete: maximum requested number of deletes would be exceeded, none performed.")
-            return
-        # Delete items in destination that are not in source
-        if cfg.dry_run:
-            for key in dst_list:
-                output(u"delete: %s" % dst_list[key]['object_uri_str'])
-        else:
-            for key in dst_list:
-                uri = S3Uri(dst_list[key]['object_uri_str'])
-                s3.object_delete(uri)
-                output(u"deleted: '%s'" % uri)
-
     s3 = S3(Config())
 
     # Normalise s3://uri (e.g. assert trailing slash)
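
The same replacement happens in all the sync paths below: the per-object `_do_deletes` helpers (one `DELETE` request per key) are removed and the call sites switch to `subcmd_batch_del`, which keeps the `max_delete` and `--dry-run` behaviour while cutting the request count by up to a factor of 1000:

    import math

    # Requests needed to delete n stale objects during a sync:
    n = 250000
    one_per_key = n                        # old: one DELETE per object
    batched = int(math.ceil(n / 1000.0))   # new: one POST per <= 1000 keys
    print one_per_key, batched             # 250000 250
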
@@ -765,7 +795,7 @@ def cmd_sync_remote2remote(args):
 
     # Delete items in destination that are not in source
     if cfg.delete_removed and not cfg.delete_after:
-        _do_deletes(s3, dst_list)
+        subcmd_batch_del(remote_list = dst_list)
 
     def _upload(src_list, seq, src_count):
         file_list = src_list.keys()
@@ -809,7 +839,7 @@ def cmd_sync_remote2remote(args):
 
     # Delete items in destination that are not in source
     if cfg.delete_removed and cfg.delete_after:
-        _do_deletes(s3, dst_list)
+        subcmd_batch_del(remote_list = dst_list)
 
 def cmd_sync_remote2local(args):
     def _do_deletes(local_list):
@@ -1115,16 +1145,6 @@ def _build_attr_header(local_list, src):
 
 
 def cmd_sync_local2remote(args):
-
-    def _do_deletes(s3, remote_list):
-        if cfg.max_delete > 0 and len(remote_list) > cfg.max_delete:
-            warning(u"delete: maximum requested number of deletes would be exceeded, none performed.")
-            return
-        for key in remote_list:
-            uri = S3Uri(remote_list[key]['object_uri_str'])
-            s3.object_delete(uri)
-            output(u"deleted: '%s'" % uri)
-
     def _single_process(local_list):
         for dest in destinations:
             ## Normalize URI to convert s3://bkt to s3://bkt/ (trailing slash)
@@ -1260,7 +1280,7 @@ def cmd_sync_local2remote(args):
             cfg.delete_removed = False
 
         if cfg.delete_removed and not cfg.delete_after:
-            _do_deletes(s3, remote_list)
+            subcmd_batch_del(remote_list = remote_list)
 
         total_size = 0
         total_elapsed = 0.0
@@ -1276,7 +1296,7 @@ def cmd_sync_local2remote(args):
         n, total_size = _upload(failed_copy_files, n, failed_copy_count, total_size)
 
         if cfg.delete_removed and cfg.delete_after:
-            _do_deletes(s3, remote_list)
+            subcmd_batch_del(remote_list = remote_list)
         total_elapsed = time.time() - timestamp_start
         total_speed = total_elapsed and total_size/total_elapsed or 0.0
         speed_fmt = formatSize(total_speed, human_readable = True, floating_point = True)