Source code for dbpool.impl

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import queue
from typing import (
    NamedTuple,
    Optional,
)
import threading
import logging
import math

from mysql.connector import MySQLConnection

logger = logging.getLogger(__name__)


[docs]class PoolError(Exception): """Pool Error"""
[docs]class NoAvailableConnectionError(PoolError): """no available conneciton"""
[docs]class CreateConnectionError(PoolError): """When create new conneciton """
[docs]class TestConnectionError(PoolError): """test connection error"""
[docs]class PoolOption(NamedTuple): """ Arguments: min_idle (int): Hold min idle connection count. max_idle (int): Hold max idle connection count. max_age_in_sec (float): When a connection expired, reconnect. Default: 300.0 seconds check_idle_interval (float): Check idle thread run interval time. Default: 60.0 seconds """ min_idle: int max_idle: int max_age_in_sec: float = 300.0 check_idle_interval: float = 60.0 def check(self): if self.min_idle < 0: raise PoolError('min_idle should be greater than 0') if self.max_idle < self.min_idle: raise PoolError('max_idle shouble be granter than min_idle')
class ConnectionQueue(queue.Queue): def __init__(self, maxsize): super().__init__(maxsize) def __contains__(self, item) -> bool: with self.mutex: return item in self.queue def remove(self, item) -> Optional: with self.mutex: try: self.queue.remove(item) return item except ValueError: return None def free_all(self) -> int: with self.mutex: free_cnt = 0 for conn in self.queue: conn.discard_and_close() free_cnt += 1 self.queue.clear() return free_cnt
[docs]class PooledConnection(MySQLConnection): """ Inherit from ``mysql.connector.MySQLConnection``. Client should not create PooledConnection. Just call :py:meth:`dbpool.ConnectionPool.borrow_connection`. """ def __init__(self, *args, **kwargs): self._pool = None super().__init__(*args, **kwargs) self._last_connected = time.time() def set_pool(self, pool: 'ConnectionPool') -> None: self._pool = pool
[docs] def close(self) -> None: """ Return this connection to the pool. """ if not self._pool: super().close() else: self._pool.return_connection(self)
def is_max_age_expired(self) -> bool: age = time.time() - self._last_connected return age > self._pool.option.max_age_in_sec def force_reconnect(self) -> None: try: self.reconnect(attempts=3, delay=0.1) self._last_connected = time.time() except Exception as e: super().close() # 如何重连失败,则放弃这个连接 raise e def discard_and_close(self) -> None: self._pool = None super().close() def test(self) -> bool: try: self.ping(reconnect=True, attempts=3, delay=0.1) self._last_connected = time.time() return True except Exception: return False
[docs]class ConnectionPool: #pylint: disable=too-many-instance-attributes def __init__(self, option: PoolOption, **kwargs): option.check() self._option = option self._lock = threading.RLock() self._closed = False self._idle_cnt = 0 self._busy_cnt = 0 self._idle = ConnectionQueue(option.max_idle) self._busy = ConnectionQueue(option.max_idle) for pname in ('pool_name', 'pool_size'): if pname in kwargs: del kwargs[pname] self._mysql_settings = kwargs # 建立最小核心连接 with self._lock: count = self.option.min_idle for _ in range(count): self._idle.put(self._create_connection()) self._idle_cnt += 1 self._check_idle_event = threading.Event() thd = threading.Thread( target=self, daemon=True, name=f'Check idle thread, with option={option}', ) thd.start() self._check_idle_thread = thd @property def option(self) -> PoolOption: return self._option @property def idle_cnt(self) -> int: with self._lock: return self._idle_cnt @property def busy_cnt(self) -> int: with self._lock: return self._busy_cnt def __call__(self, *args, **kwargs): while not self._check_idle_event.is_set(): try: to = self.option.check_idle_interval if not self._check_idle_event.wait(timeout=to): self.check_idle() except Exception as e: logger.error(e) def is_closed(self) -> bool: with self._lock: return self._closed
[docs] def close(self): """ Close the :py:class:`ConnectionPool`. Free idle connections. """ with self._lock: self._closed = True self._check_idle_event.set() # 已经释放锁 time.sleep(0.5) with self._lock: self._idle_cnt -= self._idle.free_all()
[docs] def borrow_connection(self) -> PooledConnection: """ Borrow one connection from pool. Returns: PooledConnection: The available connection. Raises: TestConnectionError: Test ping got error. CreateConnectionError: Create new connection failed. PoolError: When pool is closed. NoAvailableConnectionError: The pool is busy fully. """ if self.is_closed(): raise PoolError('ConnectionPool is closed!') try: idle_conn = self._idle.get_nowait() except queue.Empty: idle_conn = None if idle_conn: with self._lock: self._idle_cnt -= 1 if idle_conn.is_max_age_expired(): idle_conn.force_reconnect() if not idle_conn.test(): idle_conn.discard_and_close() raise TestConnectionError() try: self._busy.put(idle_conn) with self._lock: self._busy_cnt += 1 return idle_conn except queue.Full as e: logger.error(e) raise e with self._lock: total_cnt = self._idle_cnt + self._busy_cnt if total_cnt >= self.option.max_idle: raise NoAvailableConnectionError('ConnectionPool is full!') try: new_conn = self._create_connection() except Exception as e: raise CreateConnectionError() from e if not new_conn.test(): new_conn.discard_and_close() raise TestConnectionError() try: self._busy.put(new_conn) self._busy_cnt += 1 except queue.Full as e: logger.error(e) return new_conn
def _create_connection(self) -> PooledConnection: config = self._mysql_settings conn = PooledConnection(**config) conn.set_pool(self) return conn def return_connection(self, conn: PooledConnection) -> None: removed_conn = self._busy.remove(conn) if not removed_conn: logger.warning('Connection has already been returned?') else: # 成功删除 with self._lock: self._busy_cnt -= 1 if self.is_closed(): conn.discard_and_close() return if conn.is_max_age_expired(): conn.force_reconnect() try: self._idle.put(conn) self._idle_cnt += 1 except queue.Full as e: logger.error(e) conn.discard_and_close() def check_idle(self) -> None: if self._idle.empty(): return min_idle = self.option.min_idle with self._lock: free_cnt = self._idle_cnt - min_idle # if _idle queue needs to reduce ,at least free one idle conneciton remove_cnt = math.ceil(free_cnt * 0.1) if remove_cnt <= 0: return logger.info(f'Check idle on {self}, remove_cnt={remove_cnt}') while remove_cnt > 0: try: idle_conn = self._idle.get(timeout=1.0) except queue.Empty: # now _idle queue is empty # just quit check_idle break else: with self._lock: idle_conn.discard_and_close() self._idle_cnt -= 1 if self._idle_cnt - min_idle <= 0: break remove_cnt -= 1 def __repr__(self) -> str: mysql_settings = self._mysql_settings.copy() mysql_settings['password'] = 'xxxxxxxxxxxx' with self._lock: return (f'<ConnectionPool(option={self.option},' f'mysql_settings={mysql_settings},' f'idle_cnt={self._idle_cnt},' f'idle_qsize={self._idle.qsize()},' f'busy_cnt={self._busy_cnt},' f'busy_qsize={self._busy.qsize()},' f'closed={self._closed}' f' at {hex(id(self))}>')