+ def closereq(req):
+ respiter = current[req]
+ try:
+ if respiter is not None and hasattr(respiter, "close"):
+ respiter.close()
+ except:
+ log.error("exception occurred when closing iterator", exc_info=True)
+ try:
+ req.close()
+ except:
+ log.error("exception occurred when closing request", exc_info=True)
+ del current[req]
+ def ckiter(req):
+ respiter = current[req]
+ if respiter is not None:
+ rem = False
+ try:
+ data = respiter.next()
+ except StopIteration:
+ rem = True
+ try:
+ req.flushreq()
+ except:
+ log.error("exception occurred when handling response data", exc_info=True)
+ except:
+ rem = True
+ log.error("exception occurred when iterating response", exc_info=True)
+ if not rem:
+ if data:
+ try:
+ req.flushreq()
+ req.writedata(data)
+ except:
+ log.error("exception occurred when handling response data", exc_info=True)
+ rem = True
+ if rem:
+ current[req] = None
+ try:
+ if hasattr(respiter, "close"):
+ respiter.close()
+ except:
+ log.error("exception occurred when closing iterator", exc_info=True)
+ respiter = None
+ if respiter is None and not req.buffer:
+ closereq(req)
+
+ while True:
+ bufl = list(req for req in current.iterkeys() if req.buffer)
+ rls, wls, els = select.select([rp], bufl, [rp] + bufl)
+ if rp in rls:
+ ret = os.read(rp, 1024)
+ if not ret:
+ os.close(rp)
+ return
+ try:
+ while True:
+ req, respiter = self.cqueue.get(False)
+ current[req] = respiter
+ ckiter(req)
+ except Queue.Empty:
+ pass
+ for req in wls:
+ try:
+ req.flush()
+ except closed:
+ closereq(req)
+ except:
+ log.error("exception occurred when writing response", exc_info=True)
+ closereq(req)
+ else:
+ if len(req.buffer) < 65536:
+ ckiter(req)
+ except:
+ log.critical("unexpected exception occurred in response handler thread", exc_info=True)