Browse code

Ref #1114 - Add a time based expiration for idle pool connections in order to avoid random broken pipe errors

Also add a new "connection_max_age" config entry to define the maximum
idle time for a connection in the pool.

Florent Viard authored on 2020/06/26 09:18:39
Showing 2 changed files
... ...
@@ -218,6 +218,10 @@ class Config(object):
218 218
     throttle_max = 100
219 219
     public_url_use_https = False
220 220
     connection_pooling = True
221
+    # How long a connection can be kept idle in the pool and still be alive
222
+    # Currently, aws s3 closes connections that are idle for 20 seconds or
223
+    # longer, but 16s is used here by default to be safe.
224
+    connection_max_age = 16
221 225
 
222 226
     ## Creating a singleton
223 227
     def __new__(self, configfile = None, access_key=None, secret_key=None, access_token=None):
... ...
@@ -14,8 +14,10 @@ if sys.version_info >= (3, 0):
14 14
 else:
15 15
     from .Custom_httplib27 import httplib
16 16
 import ssl
17
-from threading import Semaphore
17
+
18 18
 from logging import debug
19
+from threading import Semaphore
20
+from time import time
19 21
 try:
20 22
     # python 3 support
21 23
     from urlparse import urlparse
... ...
@@ -238,18 +240,19 @@ class http_connection(object):
238 238
                 debug(u'proxied HTTPConnection(%s, %s)', cfg.proxy_host, cfg.proxy_port)
239 239
                 # No tunnel here for the moment
240 240
 
241
+        self.last_used_time = time()
241 242
 
242 243
 class ConnMan(object):
243 244
     _CS_REQ_SENT = httplib._CS_REQ_SENT
244 245
     CONTINUE = httplib.CONTINUE
245 246
     conn_pool_sem = Semaphore()
246 247
     conn_pool = {}
247
-    conn_max_counter = 800    ## AWS closes connection after some ~90 requests
248
+    conn_max_counter = 800  ## AWS closes connection after some ~90 requests
248 249
 
249 250
     @staticmethod
250
-    def get(hostname, ssl = None):
251
+    def get(hostname, ssl=None):
251 252
         cfg = Config()
252
-        if ssl == None:
253
+        if ssl is None:
253 254
             ssl = cfg.use_https
254 255
         conn = None
255 256
         if cfg.proxy_host != "":
... ...
@@ -261,9 +264,19 @@ class ConnMan(object):
261 261
         ConnMan.conn_pool_sem.acquire()
262 262
         if conn_id not in ConnMan.conn_pool:
263 263
             ConnMan.conn_pool[conn_id] = []
264
-        if len(ConnMan.conn_pool[conn_id]):
264
+        while ConnMan.conn_pool[conn_id]:
265 265
             conn = ConnMan.conn_pool[conn_id].pop()
266
-            debug("ConnMan.get(): re-using connection: %s#%d" % (conn.id, conn.counter))
266
+            cur_time = time()
267
+            if cur_time < conn.last_used_time + cfg.connection_max_age \
268
+               and cur_time >= conn.last_used_time:
269
+                debug("ConnMan.get(): re-using connection: %s#%d"
270
+                      % (conn.id, conn.counter))
271
+                break
272
+            # Conn is too old or wall clock went back in the past
273
+            debug("ConnMan.get(): closing expired connection")
274
+            ConnMan.close(conn)
275
+            conn = None
276
+
267 277
         ConnMan.conn_pool_sem.release()
268 278
         if not conn:
269 279
             debug("ConnMan.get(): creating new connection: %s" % conn_id)
... ...
@@ -278,7 +291,8 @@ class ConnMan(object):
278 278
     def put(conn):
279 279
         if conn.id.startswith("proxy://"):
280 280
             ConnMan.close(conn)
281
-            debug("ConnMan.put(): closing proxy connection (keep-alive not yet supported)")
281
+            debug("ConnMan.put(): closing proxy connection (keep-alive not yet"
282
+                  " supported)")
282 283
             return
283 284
 
284 285
         if conn.counter >= ConnMan.conn_max_counter:
... ...
@@ -292,10 +306,14 @@ class ConnMan(object):
292 292
             debug("ConnMan.put(): closing connection (connection pooling disabled)")
293 293
             return
294 294
 
295
+        # Update timestamp of conn to record when was its last use
296
+        conn.last_used_time = time()
297
+
295 298
         ConnMan.conn_pool_sem.acquire()
296 299
         ConnMan.conn_pool[conn.id].append(conn)
297 300
         ConnMan.conn_pool_sem.release()
298
-        debug("ConnMan.put(): connection put back to pool (%s#%d)" % (conn.id, conn.counter))
301
+        debug("ConnMan.put(): connection put back to pool (%s#%d)"
302
+              % (conn.id, conn.counter))
299 303
 
300 304
     @staticmethod
301 305
     def close(conn):