Browse code

Reuse HTTP connections

Revived very old approach from v0.9.9-speedup branch (149226b)
TODO: Proxies not supported!

Michal Ludvig authored on 2013/03/10 19:27:52
Showing 2 changed files
1 1
new file mode 100644
... ...
@@ -0,0 +1,57 @@
0
+import httplib
1
+from urlparse import urlparse
2
+from threading import Semaphore
3
+from logging import debug, info, warning, error
4
+
5
+from Config import Config
6
+
7
+__all__ = [ "ConnMan" ]
8
+
9
+class http_connection(object):
10
+    def __init__(self, id, hostname, ssl):
11
+        self.hostname = hostname
12
+        self.ssl = ssl
13
+        self.id = id
14
+        self.counter = 0
15
+        if not ssl:
16
+            self.c = httplib.HTTPConnection(hostname)
17
+        else:
18
+            self.c = httplib.HTTPSConnection(hostname)
19
+
20
+class ConnMan(object):
21
+    conn_pool_sem = Semaphore()
22
+    conn_pool = {}
23
+    conn_max_counter = 800    ## AWS closes connection after some ~90 requests
24
+
25
+    @staticmethod
26
+    def get(hostname, ssl = None):
27
+        if ssl == None:
28
+            ssl = Config().use_https
29
+        conn = None
30
+        conn_id = "http%s://%s" % (ssl and "s" or "", hostname)
31
+        ConnMan.conn_pool_sem.acquire()
32
+        if not ConnMan.conn_pool.has_key(conn_id):
33
+            ConnMan.conn_pool[conn_id] = []
34
+        if len(ConnMan.conn_pool[conn_id]):
35
+            conn = ConnMan.conn_pool[conn_id].pop()
36
+            debug("ConnMan.get(): re-using connection: %s#%d" % (conn.id, conn.counter))
37
+        ConnMan.conn_pool_sem.release()
38
+        if not conn:
39
+            debug("ConnMan.get(): creating new connection: %s" % conn_id)
40
+            conn = http_connection(conn_id, hostname, ssl)
41
+            conn.c.connect()
42
+        conn.counter += 1
43
+        return conn
44
+
45
+    @staticmethod
46
+    def put(conn):
47
+        if conn.counter >= ConnMan.conn_max_counter:
48
+            conn.c.close()
49
+            debug("ConnMan.put(): closing over-used connection")
50
+            return
51
+
52
+        ConnMan.conn_pool_sem.acquire()
53
+        ConnMan.conn_pool[conn.id].append(conn)
54
+        ConnMan.conn_pool_sem.release()
55
+        debug("ConnMan.put(): connection put back to pool (%s#%d)" % (conn.id, conn.counter))
56
+
... ...
@@ -27,6 +27,7 @@ from Config import Config
27 27
 from Exceptions import *
28 28
 from MultiPart import MultiPartUpload
29 29
 from S3Uri import S3Uri
30
+from ConnMan import ConnMan
30 31
 
31 32
 try:
32 33
     import magic, gzip
... ...
@@ -668,18 +669,18 @@ class S3(object):
668 668
             # "Stringify" all headers
669 669
             for header in headers.keys():
670 670
                 headers[header] = str(headers[header])
671
-            conn = self.get_connection(resource['bucket'])
671
+            conn = ConnMan.get(self.get_hostname(resource['bucket']))
672 672
             uri = self.format_uri(resource)
673 673
             debug("Sending request method_string=%r, uri=%r, headers=%r, body=(%i bytes)" % (method_string, uri, headers, len(body or "")))
674
-            conn.request(method_string, uri, body, headers)
674
+            conn.c.request(method_string, uri, body, headers)
675 675
             response = {}
676
-            http_response = conn.getresponse()
676
+            http_response = conn.c.getresponse()
677 677
             response["status"] = http_response.status
678 678
             response["reason"] = http_response.reason
679 679
             response["headers"] = convertTupleListToDict(http_response.getheaders())
680 680
             response["data"] =  http_response.read()
681 681
             debug("Response: " + str(response))
682
-            conn.close()
682
+            ConnMan.put(conn)
683 683
         except Exception, e:
684 684
             if retries:
685 685
                 warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
... ...
@@ -722,12 +723,11 @@ class S3(object):
722 722
             info("Sending file '%s', please wait..." % file.name)
723 723
         timestamp_start = time.time()
724 724
         try:
725
-            conn = self.get_connection(resource['bucket'])
726
-            conn.connect()
727
-            conn.putrequest(method_string, self.format_uri(resource))
725
+            conn = ConnMan.get(self.get_hostname(resource['bucket']))
726
+            conn.c.putrequest(method_string, self.format_uri(resource))
728 727
             for header in headers.keys():
729
-                conn.putheader(header, str(headers[header]))
730
-            conn.endheaders()
728
+                conn.c.putheader(header, str(headers[header]))
729
+            conn.c.endheaders()
731 730
         except Exception, e:
732 731
             if self.config.progress_meter:
733 732
                 progress.done("failed")
... ...
@@ -750,7 +750,7 @@ class S3(object):
750 750
                 else:
751 751
                     data = buffer
752 752
                 md5_hash.update(data)
753
-                conn.send(data)
753
+                conn.c.send(data)
754 754
                 if self.config.progress_meter:
755 755
                     progress.update(delta_position = len(data))
756 756
                 size_left -= len(data)
... ...
@@ -758,13 +758,13 @@ class S3(object):
758 758
                     time.sleep(throttle)
759 759
             md5_computed = md5_hash.hexdigest()
760 760
             response = {}
761
-            http_response = conn.getresponse()
761
+            http_response = conn.c.getresponse()
762 762
             response["status"] = http_response.status
763 763
             response["reason"] = http_response.reason
764 764
             response["headers"] = convertTupleListToDict(http_response.getheaders())
765 765
             response["data"] = http_response.read()
766 766
             response["size"] = size_total
767
-            conn.close()
767
+            ConnMan.put(conn)
768 768
             debug(u"Response: %s" % response)
769 769
         except Exception, e:
770 770
             if self.config.progress_meter:
... ...
@@ -787,7 +787,7 @@ class S3(object):
787 787
         response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
788 788
 
789 789
         if self.config.progress_meter:
790
-            ## The above conn.close() takes some time -> update() progress meter
790
+            ## Finalising the upload takes some time -> update() progress meter
791 791
             ## to correct the average speed. Otherwise people will complain that
792 792
             ## 'progress' and response["speed"] are inconsistent ;-)
793 793
             progress.update()
... ...
@@ -862,17 +862,16 @@ class S3(object):
862 862
             info("Receiving file '%s', please wait..." % stream.name)
863 863
         timestamp_start = time.time()
864 864
         try:
865
-            conn = self.get_connection(resource['bucket'])
866
-            conn.connect()
867
-            conn.putrequest(method_string, self.format_uri(resource))
865
+            conn = ConnMan.get(self.get_hostname(resource['bucket']))
866
+            conn.c.putrequest(method_string, self.format_uri(resource))
868 867
             for header in headers.keys():
869
-                conn.putheader(header, str(headers[header]))
868
+                conn.c.putheader(header, str(headers[header]))
870 869
             if start_position > 0:
871 870
                 debug("Requesting Range: %d .. end" % start_position)
872
-                conn.putheader("Range", "bytes=%d-" % start_position)
873
-            conn.endheaders()
871
+                conn.c.putheader("Range", "bytes=%d-" % start_position)
872
+            conn.c.endheaders()
874 873
             response = {}
875
-            http_response = conn.getresponse()
874
+            http_response = conn.c.getresponse()
876 875
             response["status"] = http_response.status
877 876
             response["reason"] = http_response.reason
878 877
             response["headers"] = convertTupleListToDict(http_response.getheaders())
... ...
@@ -928,7 +927,7 @@ class S3(object):
928 928
                 ## Call progress meter from here...
929 929
                 if self.config.progress_meter:
930 930
                     progress.update(delta_position = len(data))
931
-            conn.close()
931
+            ConnMan.put(conn)
932 932
         except Exception, e:
933 933
             if self.config.progress_meter:
934 934
                 progress.done("failed")