# -*- coding: utf-8 -*- ## Amazon S3 manager ## Author: Michal Ludvig <michal@logix.cz> ## http://www.logix.cz/michal ## License: GPL Version 2 ## Copyright: TGRMN Software and contributors import sys import hmac import base64 import Config from logging import debug from Utils import encode_to_s3, time_to_epoch, deunicodise import datetime import urllib # hashlib backported to python 2.4 / 2.5 is not compatible with hmac! if sys.version_info[0] == 2 and sys.version_info[1] < 6: import sha as sha1 from Crypto.Hash import SHA256 as sha256 else: from hashlib import sha1, sha256 __all__ = [] ### AWS Version 2 signing def sign_string_v2(string_to_sign): """Sign a string with the secret key, returning base64 encoded results. By default the configured secret key is used, but may be overridden as an argument. Useful for REST authentication. See http://s3.amazonaws.com/doc/s3-developer-guide/RESTAuthentication.html """ signature = base64.encodestring(hmac.new(Config.Config().secret_key, string_to_sign, sha1).digest()).strip() return signature __all__.append("sign_string_v2") def sign_url_v2(url_to_sign, expiry): """Sign a URL in s3://bucket/object form with the given expiry time. The object will be accessible via the signed URL until the AWS key and secret are revoked or the expiry time is reached, even if the object is otherwise private. See: http://s3.amazonaws.com/doc/s3-developer-guide/RESTAuthentication.html """ return sign_url_base_v2( bucket = url_to_sign.bucket(), object = url_to_sign.object(), expiry = expiry ) __all__.append("sign_url_v2") def sign_url_base_v2(**parms): """Shared implementation of sign_url methods. Takes a hash of 'bucket', 'object' and 'expiry' as args.""" content_disposition=Config.Config().content_disposition content_type=Config.Config().content_type parms['expiry']=time_to_epoch(parms['expiry']) parms['access_key']=Config.Config().access_key parms['host_base']=Config.Config().host_base debug("Expiry interpreted as epoch time %s", parms['expiry']) signtext = 'GET\n\n\n%(expiry)d\n/%(bucket)s/%(object)s' % parms param_separator = '?' if content_disposition is not None: signtext += param_separator + 'response-content-disposition=' + content_disposition param_separator = '&' if content_type is not None: signtext += param_separator + 'response-content-type=' + content_type param_separator = '&' debug("Signing plaintext: %r", signtext) parms['sig'] = urllib.quote_plus(sign_string_v2(signtext)) debug("Urlencoded signature: %s", parms['sig']) url = "http://%(bucket)s.%(host_base)s/%(object)s?AWSAccessKeyId=%(access_key)s&Expires=%(expiry)d&Signature=%(sig)s" % parms if content_disposition is not None: url += "&response-content-disposition=" + urllib.quote_plus(content_disposition) if content_type is not None: url += "&response-content-type=" + urllib.quote_plus(content_type) return url def sign(key, msg): return hmac.new(key, encode_to_s3(msg), sha256).digest() def getSignatureKey(key, dateStamp, regionName, serviceName): kDate = sign(encode_to_s3('AWS4' + key), dateStamp) kRegion = sign(kDate, regionName) kService = sign(kRegion, serviceName) kSigning = sign(kService, 'aws4_request') return kSigning def sign_string_v4(method='GET', host='', canonical_uri='/', params={}, region='us-east-1', cur_headers={}, body=''): service = 's3' cfg = Config.Config() access_key = cfg.access_key secret_key = cfg.secret_key t = datetime.datetime.utcnow() amzdate = t.strftime('%Y%m%dT%H%M%SZ') datestamp = t.strftime('%Y%m%d') canonical_querystring = '&'.join(['%s=%s' % (urllib.quote_plus(p), quote_param(params[p])) for p in sorted(params.keys())]) splits = canonical_uri.split('?') canonical_uri = quote_param(splits[0], quote_backslashes=False) canonical_querystring += '&'.join([('%s' if '=' in qs else '%s=') % qs for qs in splits[1:]]) if type(body) == type(sha256('')): payload_hash = body.hexdigest() else: payload_hash = sha256(body).hexdigest() canonical_headers = {'host' : host, 'x-amz-content-sha256': payload_hash, 'x-amz-date' : amzdate } signed_headers = 'host;x-amz-content-sha256;x-amz-date' for header in cur_headers.keys(): # avoid duplicate headers and previous Authorization if header == 'Authorization' or header in signed_headers.split(';'): continue canonical_headers[header.strip()] = str(cur_headers[header]).strip() signed_headers += ';' + header.strip() # sort headers into a string canonical_headers_str = '' for k, v in sorted(canonical_headers.items()): canonical_headers_str += k + ":" + v + "\n" canonical_headers = canonical_headers_str debug(u"canonical_headers = %s" % canonical_headers) signed_headers = ';'.join(sorted(signed_headers.split(';'))) canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers + '\n' + payload_hash debug('Canonical Request:\n%s\n----------------------' % canonical_request) algorithm = 'AWS4-HMAC-SHA256' credential_scope = datestamp + '/' + region + '/' + service + '/' + 'aws4_request' string_to_sign = algorithm + '\n' + amzdate + '\n' + credential_scope + '\n' + sha256(canonical_request).hexdigest() signing_key = getSignatureKey(secret_key, datestamp, region, service) signature = hmac.new(signing_key, encode_to_s3(string_to_sign), sha256).hexdigest() authorization_header = algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope + ',' + 'SignedHeaders=' + signed_headers + ',' + 'Signature=' + signature headers = dict(cur_headers.items() + {'x-amz-date':amzdate, 'Authorization':authorization_header, 'x-amz-content-sha256': payload_hash}.items()) debug("signature-v4 headers: %s" % headers) return headers def quote_param(param, quote_backslashes=True): # As stated by Amazon the '/' in the filename should stay unquoted and %20 should be used for space instead of '+' quoted = urllib.quote_plus(urllib.unquote_plus(param), safe='~').replace('+', '%20') if not quote_backslashes: quoted = quoted.replace('%2F', '/') return quoted def checksum_sha256_file(filename, offset=0, size=None): try: hash = sha256() except: # fallback to Crypto SHA256 module hash = sha256.new() with open(deunicodise(filename),'rb') as f: if size is None: for chunk in iter(lambda: f.read(8192), b''): hash.update(chunk) else: f.seek(offset) size_left = size while size_left > 0: chunk = f.read(min(8192, size_left)) size_left -= len(chunk) hash.update(chunk) return hash def checksum_sha256_buffer(buffer, offset=0, size=None): try: hash = sha256() except: # fallback to Crypto SHA256 module hash = sha256.new() if size is None: hash.update(buffer) else: hash.update(buffer[offset:offset+size]) return hash