Browse code

refactor content_type() and add object_replace(), tests

Addresses issues #214, #409, and incorporates pull request #406.

content_type() was trying to do too many things in line, making it
hard to read. Split the part that gets the mime-type via reading a
file into a helper function. This lets the command-line-provided
--mime-type override file-detected methods.

Raise ParameterError if file is stdin and no --mime-type or
--default-mime-type has been provided.

Split object_replace() out from object_copy(). Yes, they are
similar. No, they're not identical. This lets us change the MIME
type of a file via 's3cmd modify'.

Add test cases to change the MIME type via 's3cmd modify'.

Matt Domsch authored on 2014/12/01 10:26:10
Showing 3 changed files
... ...
@@ -438,21 +438,33 @@ class S3(object):
438 438
         request =  self.create_request("BUCKET_CREATE", bucket = bucket, headers = headers, extra="?lifecycle")
439 439
         return (request, body)
440 440
 
441
-    def content_type(self, filename):
442
-        content_type = self.config.mime_type
441
+    def _guess_content_type(self, filename):
442
+        content_type = self.config.default_mime_type
443 443
         content_charset = None
444 444
 
445
-        if filename != "-" and not content_type and self.config.guess_mime_type:
445
+        if filename == "-" and not self.config.default_mime_type:
446
+            raise ParameterError("You must specify --mime-type or --default-mime-type for files uploaded from stdin.")
447
+
448
+        if self.config.guess_mime_type:
446 449
             if self.config.use_mime_magic:
447 450
                 (content_type, content_charset) = mime_magic(filename)
448 451
             else:
449 452
                 (content_type, content_charset) = mimetypes.guess_type(filename)
450 453
         if not content_type:
451 454
             content_type = self.config.default_mime_type
452
-        if not content_charset:
453
-            content_charset = self.config.encoding.upper()
455
+        return (content_type, content_charset)
456
+
457
+    def content_type(self, filename=None):
458
+        # explicit command line argument always wins
459
+        content_type = self.config.mime_type
460
+        content_charset = None
461
+
462
+        if not content_type:
463
+            (content_type, content_charset) = self._guess_content_type(filename)
454 464
 
455 465
         ## add charset to content type
466
+        if not content_charset:
467
+            content_charset = self.config.encoding.upper()
456 468
         if self.add_encoding(filename, content_type) and content_charset is not None:
457 469
             content_type = content_type + "; charset=" + content_charset
458 470
 
... ...
@@ -500,7 +512,7 @@ class S3(object):
500 500
             headers["x-amz-server-side-encryption"] = "AES256"
501 501
 
502 502
         ## MIME-type handling
503
-        headers["content-type"] = self.content_type(filename)
503
+        headers["content-type"] = self.content_type(filename=filename)
504 504
 
505 505
         ## Other Amazon S3 attributes
506 506
         if self.config.acl_public:
... ...
@@ -614,7 +626,6 @@ class S3(object):
614 614
             raise ValueError("Expected URI type 's3', got '%s'" % dst_uri.type)
615 615
         headers = SortedDict(ignore_case = True)
616 616
         headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(), self.urlencode_string(src_uri.object()))
617
-        ## TODO: For now COPY, later maybe add a switch?
618 617
         headers['x-amz-metadata-directive'] = "COPY"
619 618
         if self.config.acl_public:
620 619
             headers["x-amz-acl"] = "public-read"
... ...
@@ -626,11 +637,33 @@ class S3(object):
626 626
             headers["x-amz-server-side-encryption"] = "AES256"
627 627
 
628 628
         if extra_headers:
629
-            headers['x-amz-metadata-directive'] = "REPLACE"
630 629
             headers.update(extra_headers)
631 630
 
632
-        filename = os.path.basename(str(src_uri))
633
-        headers["content-type"] = self.content_type(filename)
631
+        request = self.create_request("OBJECT_PUT", uri = dst_uri, headers = headers)
632
+        response = self.send_request(request)
633
+        return response
634
+
635
+    def object_replace(self, src_uri, dst_uri, extra_headers = None):
636
+        if src_uri.type != "s3":
637
+            raise ValueError("Expected URI type 's3', got '%s'" % src_uri.type)
638
+        if dst_uri.type != "s3":
639
+            raise ValueError("Expected URI type 's3', got '%s'" % dst_uri.type)
640
+        headers = SortedDict(ignore_case = True)
641
+        headers['x-amz-copy-source'] = "/%s/%s" % (src_uri.bucket(), self.urlencode_string(src_uri.object()))
642
+        headers['x-amz-metadata-directive'] = "REPLACE"
643
+        if self.config.acl_public:
644
+            headers["x-amz-acl"] = "public-read"
645
+        if self.config.reduced_redundancy:
646
+            headers["x-amz-storage-class"] = "REDUCED_REDUNDANCY"
647
+
648
+        ## Set server side encryption
649
+        if self.config.server_side_encryption:
650
+            headers["x-amz-server-side-encryption"] = "AES256"
651
+
652
+        if extra_headers:
653
+            headers.update(extra_headers)
654
+
655
+        headers["content-type"] = self.content_type()
634 656
 
635 657
         request = self.create_request("OBJECT_PUT", uri = dst_uri, headers = headers)
636 658
         response = self.send_request(request)
... ...
@@ -488,6 +488,22 @@ test_s3cmd("Verify ACL and MIME type", ['info', '%s/copy/etc2/Logo.PNG' % pbucke
488 488
                      "ACL:.*\*anon\*: READ",
489 489
                      "URL:.*http://%s.%s/copy/etc2/Logo.PNG" % (bucket(2), cfg.host_base) ])
490 490
 
491
+## ====== modify MIME type
492
+test_s3cmd("Modify MIME type", ['modify', '--acl-public', '--mime-type=binary/octet-stream', '%s/copy/etc2/Logo.PNG' % pbucket(2) ])
493
+
494
+test_s3cmd("Verify ACL and MIME type", ['info', '%s/copy/etc2/Logo.PNG' % pbucket(2) ],
495
+    must_find_re = [ "MIME type:.*binary/octet-stream",
496
+                     "ACL:.*\*anon\*: READ",
497
+                     "URL:.*http://%s.%s/copy/etc2/Logo.PNG" % (bucket(2), cfg.host_base) ])
498
+
499
+test_s3cmd("Modify MIME type back", ['modify', '--acl-public', '--mime-type=image/png', '%s/copy/etc2/Logo.PNG' % pbucket(2) ])
500
+
501
+test_s3cmd("Verify ACL and MIME type", ['info', '%s/copy/etc2/Logo.PNG' % pbucket(2) ],
502
+    must_find_re = [ "MIME type:.*image/png",
503
+                     "ACL:.*\*anon\*: READ",
504
+                     "URL:.*http://%s.%s/copy/etc2/Logo.PNG" % (bucket(2), cfg.host_base) ])
505
+
506
+
491 507
 ## ====== Rename within S3
492 508
 test_s3cmd("Rename within S3", ['mv', '%s/copy/etc2/Logo.PNG' % pbucket(2), '%s/copy/etc/logo.png' % pbucket(2)],
493 509
     must_find = [ 'File %s/copy/etc2/Logo.PNG moved to %s/copy/etc/logo.png' % (pbucket(2), pbucket(2))])
... ...
@@ -753,7 +753,7 @@ def cmd_cp(args):
753 753
 
754 754
 def cmd_modify(args):
755 755
     s3 = S3(Config())
756
-    return subcmd_cp_mv(args, s3.object_copy, "modify", u"File %(src)s modified")
756
+    return subcmd_cp_mv(args, s3.object_replace, "modify", u"File %(src)s modified")
757 757
 
758 758
 def cmd_mv(args):
759 759
     s3 = S3(Config())