Browse code

Merge branch 'http-speedup'

Michal Ludvig authored on 2013/03/10 20:50:20
Showing 2 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,71 @@
0
+import httplib
1
+from urlparse import urlparse
2
+from threading import Semaphore
3
+from logging import debug, info, warning, error
4
+
5
+from Config import Config
6
+from Exceptions import ParameterError
7
+
8
+__all__ = [ "ConnMan" ]
9
+
10
+class http_connection(object):
11
+    def __init__(self, id, hostname, ssl, cfg):
12
+        self.hostname = hostname
13
+        self.ssl = ssl
14
+        self.id = id
15
+        self.counter = 0
16
+        if cfg.proxy_host != "":
17
+            self.c = httplib.HTTPConnection(cfg.proxy_host, cfg.proxy_port)
18
+        elif not ssl:
19
+            self.c = httplib.HTTPConnection(hostname)
20
+        else:
21
+            self.c = httplib.HTTPSConnection(hostname)
22
+
23
+class ConnMan(object):
24
+    conn_pool_sem = Semaphore()
25
+    conn_pool = {}
26
+    conn_max_counter = 800    ## AWS closes connection after some ~90 requests
27
+
28
+    @staticmethod
29
+    def get(hostname, ssl = None):
30
+        cfg = Config()
31
+        if ssl == None:
32
+            ssl = cfg.use_https
33
+        conn = None
34
+        if cfg.proxy_host != "":
35
+            if ssl:
36
+                raise ParameterError("use_ssl=True can't be used with proxy")
37
+            conn_id = "proxy://%s:%s" % (cfg.proxy_host, cfg.proxy_port)
38
+        else:
39
+            conn_id = "http%s://%s" % (ssl and "s" or "", hostname)
40
+        ConnMan.conn_pool_sem.acquire()
41
+        if not ConnMan.conn_pool.has_key(conn_id):
42
+            ConnMan.conn_pool[conn_id] = []
43
+        if len(ConnMan.conn_pool[conn_id]):
44
+            conn = ConnMan.conn_pool[conn_id].pop()
45
+            debug("ConnMan.get(): re-using connection: %s#%d" % (conn.id, conn.counter))
46
+        ConnMan.conn_pool_sem.release()
47
+        if not conn:
48
+            debug("ConnMan.get(): creating new connection: %s" % conn_id)
49
+            conn = http_connection(conn_id, hostname, ssl, cfg)
50
+            conn.c.connect()
51
+        conn.counter += 1
52
+        return conn
53
+
54
+    @staticmethod
55
+    def put(conn):
56
+        if conn.id.startswith("proxy://"):
57
+            conn.c.close()
58
+            debug("ConnMan.put(): closing proxy connection (keep-alive not yet supported)")
59
+            return
60
+
61
+        if conn.counter >= ConnMan.conn_max_counter:
62
+            conn.c.close()
63
+            debug("ConnMan.put(): closing over-used connection")
64
+            return
65
+
66
+        ConnMan.conn_pool_sem.acquire()
67
+        ConnMan.conn_pool[conn.id].append(conn)
68
+        ConnMan.conn_pool_sem.release()
69
+        debug("ConnMan.put(): connection put back to pool (%s#%d)" % (conn.id, conn.counter))
70
+
... ...
@@ -27,6 +27,7 @@ from Config import Config
27 27
 from Exceptions import *
28 28
 from MultiPart import MultiPartUpload
29 29
 from S3Uri import S3Uri
30
+from ConnMan import ConnMan
30 31
 
31 32
 try:
32 33
     import magic, gzip
... ...
@@ -190,15 +191,6 @@ class S3(object):
190 190
     def __init__(self, config):
191 191
         self.config = config
192 192
 
193
-    def get_connection(self, bucket):
194
-        if self.config.proxy_host != "":
195
-            return httplib.HTTPConnection(self.config.proxy_host, self.config.proxy_port)
196
-        else:
197
-            if self.config.use_https:
198
-                return httplib.HTTPSConnection(self.get_hostname(bucket))
199
-            else:
200
-                return httplib.HTTPConnection(self.get_hostname(bucket))
201
-
202 193
     def get_hostname(self, bucket):
203 194
         if bucket and check_bucket_name_dns_conformity(bucket):
204 195
             if self.redir_map.has_key(bucket):
... ...
@@ -246,10 +238,9 @@ class S3(object):
246 246
         truncated = True
247 247
         list = []
248 248
         prefixes = []
249
-        conn = self.get_connection(bucket)
250 249
 
251 250
         while truncated:
252
-            response = self.bucket_list_noparse(conn, bucket, prefix, recursive, uri_params)
251
+            response = self.bucket_list_noparse(bucket, prefix, recursive, uri_params)
253 252
             current_list = _get_contents(response["data"])
254 253
             current_prefixes = _get_common_prefixes(response["data"])
255 254
             truncated = _list_truncated(response["data"])
... ...
@@ -263,19 +254,17 @@ class S3(object):
263 263
             list += current_list
264 264
             prefixes += current_prefixes
265 265
 
266
-        conn.close()
267
-
268 266
         response['list'] = list
269 267
         response['common_prefixes'] = prefixes
270 268
         return response
271 269
 
272
-    def bucket_list_noparse(self, connection, bucket, prefix = None, recursive = None, uri_params = {}):
270
+    def bucket_list_noparse(self, bucket, prefix = None, recursive = None, uri_params = {}):
273 271
         if prefix:
274 272
             uri_params['prefix'] = self.urlencode_string(prefix)
275 273
         if not self.config.recursive and not recursive:
276 274
             uri_params['delimiter'] = "/"
277 275
         request = self.create_request("BUCKET_LIST", bucket = bucket, **uri_params)
278
-        response = self.send_request(request, conn = connection)
276
+        response = self.send_request(request)
279 277
         #debug(response)
280 278
         return response
281 279
 
... ...
@@ -662,7 +651,7 @@ class S3(object):
662 662
         # Wait a few seconds. The more it fails the more we wait.
663 663
         return (self._max_retries - retries + 1) * 3
664 664
 
665
-    def send_request(self, request, body = None, retries = _max_retries, conn = None):
665
+    def send_request(self, request, body = None, retries = _max_retries):
666 666
         method_string, resource, headers = request.get_triplet()
667 667
         debug("Processing request, please wait...")
668 668
         if not headers.has_key('content-length'):
... ...
@@ -671,25 +660,20 @@ class S3(object):
671 671
             # "Stringify" all headers
672 672
             for header in headers.keys():
673 673
                 headers[header] = str(headers[header])
674
-            if conn is None:
675
-                debug("Establishing connection")
676
-                conn = self.get_connection(resource['bucket'])
677
-                close_conn = True
678
-            else:
679
-                debug("Using existing connection")
680
-                close_conn = False
674
+            conn = ConnMan.get(self.get_hostname(resource['bucket']))
681 675
             uri = self.format_uri(resource)
682 676
             debug("Sending request method_string=%r, uri=%r, headers=%r, body=(%i bytes)" % (method_string, uri, headers, len(body or "")))
683
-            conn.request(method_string, uri, body, headers)
677
+            conn.c.request(method_string, uri, body, headers)
684 678
             response = {}
685
-            http_response = conn.getresponse()
679
+            http_response = conn.c.getresponse()
686 680
             response["status"] = http_response.status
687 681
             response["reason"] = http_response.reason
688 682
             response["headers"] = convertTupleListToDict(http_response.getheaders())
689 683
             response["data"] =  http_response.read()
690 684
             debug("Response: " + str(response))
691
-            if close_conn is True:
692
-                conn.close()
685
+            ConnMan.put(conn)
686
+        except ParameterError, e:
687
+            raise
693 688
         except Exception, e:
694 689
             if retries:
695 690
                 warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
... ...
@@ -732,12 +716,13 @@ class S3(object):
732 732
             info("Sending file '%s', please wait..." % file.name)
733 733
         timestamp_start = time.time()
734 734
         try:
735
-            conn = self.get_connection(resource['bucket'])
736
-            conn.connect()
737
-            conn.putrequest(method_string, self.format_uri(resource))
735
+            conn = ConnMan.get(self.get_hostname(resource['bucket']))
736
+            conn.c.putrequest(method_string, self.format_uri(resource))
738 737
             for header in headers.keys():
739
-                conn.putheader(header, str(headers[header]))
740
-            conn.endheaders()
738
+                conn.c.putheader(header, str(headers[header]))
739
+            conn.c.endheaders()
740
+        except ParameterError, e:
741
+            raise
741 742
         except Exception, e:
742 743
             if self.config.progress_meter:
743 744
                 progress.done("failed")
... ...
@@ -760,7 +745,7 @@ class S3(object):
760 760
                 else:
761 761
                     data = buffer
762 762
                 md5_hash.update(data)
763
-                conn.send(data)
763
+                conn.c.send(data)
764 764
                 if self.config.progress_meter:
765 765
                     progress.update(delta_position = len(data))
766 766
                 size_left -= len(data)
... ...
@@ -768,14 +753,16 @@ class S3(object):
768 768
                     time.sleep(throttle)
769 769
             md5_computed = md5_hash.hexdigest()
770 770
             response = {}
771
-            http_response = conn.getresponse()
771
+            http_response = conn.c.getresponse()
772 772
             response["status"] = http_response.status
773 773
             response["reason"] = http_response.reason
774 774
             response["headers"] = convertTupleListToDict(http_response.getheaders())
775 775
             response["data"] = http_response.read()
776 776
             response["size"] = size_total
777
-            conn.close()
777
+            ConnMan.put(conn)
778 778
             debug(u"Response: %s" % response)
779
+        except ParameterError, e:
780
+            raise
779 781
         except Exception, e:
780 782
             if self.config.progress_meter:
781 783
                 progress.done("failed")
... ...
@@ -797,7 +784,7 @@ class S3(object):
797 797
         response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
798 798
 
799 799
         if self.config.progress_meter:
800
-            ## The above conn.close() takes some time -> update() progress meter
800
+            ## Finalising the upload takes some time -> update() progress meter
801 801
             ## to correct the average speed. Otherwise people will complain that
802 802
             ## 'progress' and response["speed"] are inconsistent ;-)
803 803
             progress.update()
... ...
@@ -872,21 +859,22 @@ class S3(object):
872 872
             info("Receiving file '%s', please wait..." % stream.name)
873 873
         timestamp_start = time.time()
874 874
         try:
875
-            conn = self.get_connection(resource['bucket'])
876
-            conn.connect()
877
-            conn.putrequest(method_string, self.format_uri(resource))
875
+            conn = ConnMan.get(self.get_hostname(resource['bucket']))
876
+            conn.c.putrequest(method_string, self.format_uri(resource))
878 877
             for header in headers.keys():
879
-                conn.putheader(header, str(headers[header]))
878
+                conn.c.putheader(header, str(headers[header]))
880 879
             if start_position > 0:
881 880
                 debug("Requesting Range: %d .. end" % start_position)
882
-                conn.putheader("Range", "bytes=%d-" % start_position)
883
-            conn.endheaders()
881
+                conn.c.putheader("Range", "bytes=%d-" % start_position)
882
+            conn.c.endheaders()
884 883
             response = {}
885
-            http_response = conn.getresponse()
884
+            http_response = conn.c.getresponse()
886 885
             response["status"] = http_response.status
887 886
             response["reason"] = http_response.reason
888 887
             response["headers"] = convertTupleListToDict(http_response.getheaders())
889 888
             debug("Response: %s" % response)
889
+        except ParameterError, e:
890
+            raise
890 891
         except Exception, e:
891 892
             if self.config.progress_meter:
892 893
                 progress.done("failed")
... ...
@@ -938,7 +926,7 @@ class S3(object):
938 938
                 ## Call progress meter from here...
939 939
                 if self.config.progress_meter:
940 940
                     progress.update(delta_position = len(data))
941
-            conn.close()
941
+            ConnMan.put(conn)
942 942
         except Exception, e:
943 943
             if self.config.progress_meter:
944 944
                 progress.done("failed")