Source code for gremlinclient.pool

import collections
import sys
import textwrap

from logging import WARNING

from gremlinclient.graph import GraphDatabase
from gremlinclient.log import pool_logger


class Pool(object):
    """
    Pool of :py:class:`gremlinclient.connection.Connection` objects.

    :param gremlinclient.graph.GraphDatabase graph: The graph instance used
        to create connections (via ``graph.connect``)
    :param int maxsize: Maximum number of connections.
    :param loop: event loop; stored on the pool, not otherwise used by this
        class
    :param bool force_release: forwarded as ``force_release`` to
        ``graph.connect`` for every connection the pool opens
    :param class future_class: type of Future -
        :py:class:`asyncio.Future`, :py:class:`trollius.Future`, or
        :py:class:`tornado.concurrent.Future`. Defaults to
        ``graph.future_class`` when not given
    """

    def __init__(self, graph, maxsize=256, loop=None, force_release=False,
                 future_class=None):
        self._graph = graph
        self._maxsize = maxsize
        self._pool = collections.deque()      # idle connections
        self._waiters = collections.deque()   # futures waiting for a slot
        self._acquired = set()                # connections handed out
        self._acquiring = 0                   # connects currently in flight
        self._closed = False
        self._loop = loop
        self._force_release = force_release
        # An explicitly passed future_class used to be silently ignored;
        # honour it and fall back to the graph's class otherwise.
        if future_class is None:
            future_class = graph.future_class
        self._future_class = future_class

    @property
    def freesize(self):
        """
        Number of free connections

        :returns: int
        """
        return len(self._pool)

    @property
    def size(self):
        """
        Total number of connections: acquired, currently being opened,
        and free.

        :returns: int
        """
        return len(self._acquired) + self._acquiring + self.freesize

    @property
    def maxsize(self):
        """
        Maximum number of connections

        :returns: int
        """
        return self._maxsize

    @property
    def graph(self):
        """
        Associated graph instance used for creating connections
        (``None`` once the pool has been closed)

        :returns: :py:class:`gremlinclient.graph.GraphDatabase`
        """
        return self._graph

    @property
    def pool(self):
        """
        Object that stores unused connections

        :returns: :py:class:`collections.deque`
        """
        return self._pool

    @property
    def closed(self):
        """
        Check if pool has been closed

        :returns: bool
        """
        return self._closed or self._graph is None

    @property
    def future_class(self):
        """
        :return: :py:class:`type` Concrete class of the future instances
            created by this pool, for example
            :py:class:`asyncio.Future`
        """
        return self._future_class

    def acquire(self):
        """
        Acquire a connection from the Pool

        A live idle connection is reused when one exists. Otherwise a new
        connection is opened if the pool is below ``maxsize``; failing
        that, the returned future is queued until a connection is released.

        :returns: Future - :py:class:`asyncio.Future`,
            :py:class:`trollius.Future`, or
            :py:class:`tornado.concurrent.Future`
        """
        # maybe have max connection open time here
        future = self._future_class()
        conn = self._pop_live_connection()
        if conn is not None:
            pool_logger.debug("Reusing connection: {}".format(conn))
            self._acquired.add(conn)
            future.set_result(conn)
        elif self.size < self.maxsize:
            # Also reached when every idle connection turned out to be
            # closed; previously that case returned a future that was
            # never completed.
            self._open_connection_for(future)
        else:
            pool_logger.debug(
                "Waiting for available conn on future: {}...".format(future))
            self._waiters.append(future)
        return future

    def release(self, conn):
        """
        Release a connection back to the pool.

        :param gremlinclient.connection.Connection conn: The connection to
            be released
        :returns: Future resolved with ``None`` once the connection has
            been handed to a waiter, returned to the idle pool or dropped;
            when the pool is over capacity, resolved with the outcome of
            ``conn.close()``
        :raises KeyError: if ``conn`` has to be removed from the set of
            acquired connections but is not in it
        """
        future = self._future_class()
        if self.size <= self.maxsize:
            if conn.closed:
                pool_logger.info(
                    "Released closed connection: {}".format(conn))
                self._acquired.remove(conn)
                # A slot just became free: open a replacement for the next
                # waiter, which would otherwise never be woken up.
                if not self.closed:
                    waiter = self._pop_pending_waiter()
                    if waiter is not None:
                        try:
                            self._open_connection_for(waiter)
                        except Exception as e:
                            # The failure belongs to the waiter, not to
                            # the caller releasing its connection.
                            waiter.set_exception(e)
            else:
                waiter = self._pop_pending_waiter()
                if waiter is not None:
                    # Direct hand-off: conn stays in the acquired set.
                    waiter.set_result(conn)
                    pool_logger.debug(
                        "Completeing future with connection: {}".format(
                            conn))
                else:
                    # Remove first so a KeyError cannot leave a duplicate
                    # entry in the idle deque.
                    self._acquired.remove(conn)
                    self._pool.append(conn)
            future.set_result(None)
        else:
            # Over capacity: close the connection and stop counting it.
            self._acquired.discard(conn)
            close_future = conn.close()

            def on_closed(f):
                try:
                    result = f.result()
                except Exception as e:
                    future.set_exception(e)
                else:
                    future.set_result(result)

            close_future.add_done_callback(on_closed)
        return future

    def close(self):
        """
        Close pool

        Closes every idle connection, cancels every queued waiter and
        drops the reference to the graph. Connections currently acquired
        are left untouched.
        """
        while self.pool:
            conn = self.pool.popleft()
            conn.close()
        while self._waiters:
            f = self._waiters.popleft()
            f.cancel()
        self._graph = None
        self._closed = True
        pool_logger.info(
            "Connection pool {} has been closed".format(self))

    def _pop_live_connection(self):
        """Pop idle connections until an open one is found, else None."""
        while self._pool:
            conn = self._pool.popleft()
            if not conn.closed:
                return conn
            pool_logger.debug(
                "Discarded closed connection: {}".format(conn))
        return None

    def _pop_pending_waiter(self):
        """Pop queued waiters until one that is not yet done, else None."""
        while self._waiters:
            waiter = self._waiters.popleft()
            # Skip futures cancelled (or otherwise completed) while queued.
            if not waiter.done():
                return waiter
        return None

    def _open_connection_for(self, future):
        """Open a new connection and complete ``future`` with it."""
        self._acquiring += 1
        try:
            conn_future = self.graph.connect(
                force_release=self._force_release, pool=self)
        except Exception:
            # connect() failed synchronously: undo the reservation so the
            # slot is not counted forever.
            self._acquiring -= 1
            raise

        def on_connected(f):
            try:
                conn = f.result()
            except Exception as e:
                future.set_exception(e)
            else:
                pool_logger.debug("Got new connection {}".format(conn))
                self._acquired.add(conn)
                future.set_result(conn)
            finally:
                self._acquiring -= 1

        conn_future.add_done_callback(on_connected)

    def __enter__(self):
        raise RuntimeError(
            "context manager should use some variation of yield/yield from")

    def __exit__(self, *args):
        pass  # pragma: no cover