import base64
import collections
import uuid

# Prefer ujson when it is installed; otherwise fall back to the standard
# library implementation.  Either way the module is bound to the name `json`.
try:
    import ujson as json
except ImportError:
    import json
# Parsed server response.  Stream builds it from the decoded JSON payload as
# (status.code, result.data, status.message, result.meta).
Message = collections.namedtuple(
    "Message", "status_code data message metadata")
class Connection(object):
    """This class encapsulates a connection to the Gremlin Server.

    Don't directly create `Connection` instances. Use
    :py:meth:`gremlinclient.graph.GraphDatabase.connect` or
    :py:func:`gremlinclient.api.create_connection` instead.

    :param tornado.websocket.WebSocketClientConnection conn: client
        websocket connection.
    :param class future_class: type of Future -
        :py:class:`asyncio.Future`, :py:class:`trollius.Future`, or
        :py:class:`tornado.concurrent.Future`
    :param float timeout: timeout for establishing connection (optional).
        Values ``0`` or ``None`` mean no timeout
    :param str username: Username for SASL auth
    :param str password: Password for SASL auth
    :param loop: If param is ``None``, `tornado.ioloop.IOLoop.current`
        is used for getting default event loop (optional)
    :param bool force_close: force connection to close after read.
    :param gremlinclient.pool.Pool pool: Connection pool. None by default
    :param bool force_release: If possible, force release to pool after read.
        Ignored (reset to ``False``) when no pool is given.
    :param str session: Session id (optional). Typically a uuid
    """

    def __init__(self, conn, future_class, timeout=None, username="",
                 password="", loop=None, force_close=False,
                 pool=None, force_release=False, session=None):
        self._conn = conn
        self._future_class = future_class
        self._closed = False
        self._session = session
        self._timeout = timeout
        self._username = username
        self._password = password
        self._force_close = force_close
        self._pool = pool
        self._loop = loop
        # Releasing is only possible when there is a pool to release to.
        if not self._pool:
            force_release = False
        self._force_release = force_release

    def release(self):
        """Release connection to associated pool.

        :returns: whatever ``pool.release(self)`` returns, or ``None`` when
            the connection has no pool.
        """
        if self._pool:
            return self._pool.release(self)

    @property
    def conn(self):
        """Read only property for websocket connection.

        :returns: :py:class:`tornado.websocket.WebSocketClientConnection`
        """
        return self._conn

    @property
    def closed(self):
        """Readonly property. Return True if client has been closed
        or client connection has been closed

        :returns: bool
        """
        return self._closed or self._conn.closed

    def close(self):
        """Close the underlying websocket connection, detach from pool,
        and set to close.

        :returns: the return value of the underlying connection's ``close()``
        """
        self._closed = True
        self._pool = None
        return self._conn.close()

    def send(self, gremlin, bindings=None, lang="gremlin-groovy",
             aliases=None, op="eval", processor="", session=None,
             timeout=None, handler=None, request_id=None):
        """
        Send a script to the Gremlin Server.

        :param str gremlin: Gremlin script to submit to server.
        :param dict bindings: A mapping of bindings for Gremlin script.
        :param str lang: Language of scripts submitted to the server.
            "gremlin-groovy" by default
        :param dict aliases: Rebind ``Graph`` and ``TraversalSource``
            objects to different variable names in the current request
        :param str op: Gremlin Server op argument. "eval" by default.
        :param str processor: Gremlin Server processor argument. "" by default.
        :param str session: Session id (optional). Typically a uuid. Falls
            back to the session id given at construction when ``None``.
        :param float timeout: accepted for API compatibility; this method
            does not currently use it.
        :param handler: optional handler handed to the returned
            :py:class:`gremlinclient.connection.Stream`
        :param str request_id: request id to put in the message (optional).
            A random uuid4 string is generated when ``None``.

        :returns: :py:class:`gremlinclient.connection.Stream` object
        :raises RuntimeError: if ``processor`` is ``"session"`` and no
            session id is available.
        """
        if session is None:
            session = self._session
        if aliases is None:
            aliases = {}
        message = self._prepare_message(gremlin,
                                        bindings,
                                        lang,
                                        aliases,
                                        op,
                                        processor,
                                        session,
                                        request_id)
        self.conn.send(message, binary=True)
        return Stream(self,
                      session,
                      processor,
                      handler,
                      self._loop,
                      self._username,
                      self._password,
                      self._force_close,
                      self._force_release,
                      self._future_class)

    def _prepare_message(self, gremlin, bindings, lang, aliases, op, processor,
                         session, request_id):
        """Build the framed, serialized request for a script submission."""
        if request_id is None:
            request_id = str(uuid.uuid4())
        message = {
            "requestId": request_id,
            "op": op,
            "processor": processor,
            "args": {
                "gremlin": gremlin,
                "bindings": bindings,
                "language": lang,
                "aliases": aliases
            }
        }
        message = self._finalize_message(message, processor, session)
        return message

    def _authenticate(self, username, password, processor, session):
        """Send a SASL ``authentication`` message over the connection.

        The credential payload is ``\\x00<username>\\x00<password>``,
        base64-encoded.
        """
        auth = b"".join([b"\x00", username.encode("utf-8"),
                         b"\x00", password.encode("utf-8")])
        message = {
            "requestId": str(uuid.uuid4()),
            "op": "authentication",
            "processor": "",
            "args": {
                "sasl": base64.b64encode(auth).decode()
            }
        }
        message = self._finalize_message(message, processor, session)
        self.conn.send(message, binary=True)

    def _finalize_message(self, message, processor, session):
        """Attach the session id if needed, serialize to JSON, add the header.

        Note that ``message["args"]`` is updated in place for the
        ``"session"`` processor.

        :raises RuntimeError: if ``processor`` is ``"session"`` but
            ``session`` is ``None``.
        """
        if processor == "session":
            if session is None:
                raise RuntimeError("session processor requires a session id")
            else:
                message["args"].update({"session": session})
        message = json.dumps(message)
        return self._set_message_header(message, "application/json")

    @staticmethod
    def _set_message_header(message, mime_type):
        """Prefix the UTF-8 encoded ``message`` with the mime-type header.

        The header is one length byte followed by the mime type itself;
        only ``"application/json"`` is supported.

        :raises ValueError: for any other mime type.
        """
        if mime_type == "application/json":
            # 0x10 == 16 == len("application/json")
            mime_len = b"\x10"
            mime_type = b"application/json"
        else:
            raise ValueError("Unknown mime type.")
        return b"".join([mime_len, mime_type, message.encode("utf-8")])
class Session(Connection):
    """
    Child of :py:class:`gremlinclient.connection.Connection` object
    that is bound to a session that maintains state across messages with
    the server. Don't directly create Session instances. Use
    :py:meth:`gremlinclient.graph.GraphDatabase.session` instead.

    :param tornado.websocket.WebSocketClientConnection conn: client
        websocket connection.
    :param class future_class: type of Future -
        :py:class:`asyncio.Future`, :py:class:`trollius.Future`, or
        :py:class:`tornado.concurrent.Future`
    :param float timeout: timeout for establishing connection (optional).
        Values ``0`` or ``None`` mean no timeout
    :param str username: Username for SASL auth
    :param str password: Password for SASL auth
    :param loop: If param is ``None``, `tornado.ioloop.IOLoop.current`
        is used for getting default event loop (optional)
    :param bool force_close: force connection to close after read.
    :param gremlinclient.pool.Pool pool: Connection pool. None by default
    :param bool force_release: If possible, force release to pool after read.
    :param str session: Session id (optional). Typically a uuid. A random
        uuid4 string is generated when omitted.
    """

    def __init__(self, *args, **kwargs):
        super(Session, self).__init__(*args, **kwargs)
        # A session connection always needs an id; invent one if the caller
        # did not supply it.
        if self._session is None:
            self._session = str(uuid.uuid4())

    def send(self, gremlin, bindings=None, lang="gremlin-groovy",
             aliases=None, op="eval", timeout=None, handler=None,
             request_id=None):
        """
        Send a script to the Gremlin Server using sessions.

        The request always uses the ``"session"`` processor and this
        object's session id.

        :param str gremlin: Gremlin script to submit to server.
        :param dict bindings: A mapping of bindings for Gremlin script.
        :param str lang: Language of scripts submitted to the server.
            "gremlin-groovy" by default
        :param dict aliases: Rebind ``Graph`` and ``TraversalSource``
            objects to different variable names in the current request
        :param str op: Gremlin Server op argument. "eval" by default.
        :param float timeout: forwarded unchanged to
            :py:meth:`Connection.send`.
        :param handler: optional handler forwarded to
            :py:meth:`Connection.send`.
        :param str request_id: request id forwarded to
            :py:meth:`Connection.send` (optional).

        :returns: :py:class:`gremlinclient.connection.Stream` object
        """
        return super(Session, self).send(gremlin,
                                         bindings=bindings,
                                         lang=lang,
                                         aliases=aliases,
                                         op=op,
                                         timeout=timeout,
                                         processor="session",
                                         session=self._session,
                                         handler=handler,
                                         request_id=request_id)

    def _authenticate(self, username, password, processor, session):
        """Authenticate using the session processor and this session's id.

        The ``processor`` and ``session`` arguments are ignored; they exist
        only to keep the signature compatible with the parent class.
        """
        super(Session, self)._authenticate(username,
                                           password,
                                           "session",
                                           self._session)
class Stream(object):
    """
    This object provides an interface for reading the response sent
    by the Gremlin Server over the websocket connection. Don't directly
    create stream instances, they should be returned by
    :py:meth:`gremlinclient.connection.Connection.send` or
    :py:meth:`gremlinclient.connection.Session.send`

    :param gremlinclient.connection.Connection conn: client
        websocket connection.
    :param str session: Session id. Typically a uuid
    :param str processor: Gremlin Server processor argument. "" by default.
    :param handler: optional callable applied to the message data of every
        successful response (see :py:meth:`add_handler`).
    :param loop: If param is ``None``, `tornado.ioloop.IOLoop.current`
        is used for getting default event loop (optional)
    :param str username: Username for SASL auth
    :param str password: Password for SASL auth
    :param bool force_close: force connection to close after read.
    :param bool force_release: If possible, force release to pool after read.
    :param class future_class: type of Future -
        :py:class:`asyncio.Future`, :py:class:`trollius.Future`, or
        :py:class:`tornado.concurrent.Future`
    :raises ValueError: if ``future_class`` is not provided.
    """

    def __init__(self, conn, session, processor, handler,
                 loop, username, password, force_close,
                 force_release, future_class):
        # The previous fallback referenced a name that is never imported in
        # this module, so a missing future_class could only end in NameError.
        # Fail early with an explicit message instead.
        if not future_class:
            raise ValueError("future_class is required to create a Stream")
        self._conn = conn
        self._session = session
        self._processor = processor
        self._closed = False
        self._username = username
        self._password = password
        self._force_close = force_close
        self._force_release = force_release
        self._loop = loop
        self._future_class = future_class
        self._handlers = []
        if handler is not None:
            self._handlers.append(handler)

    def add_handler(self, handler):
        """Append ``handler`` to the chain applied to successful responses."""
        self._handlers.append(handler)

    def read(self):
        """
        Read a message from the response stream.

        The returned future resolves to ``None`` once the stream has been
        fully consumed, and fails with ``RuntimeError`` if the connection
        is already closed.

        :returns: Future -
            :py:class:`asyncio.Future`, :py:class:`trollius.Future`, or
            :py:class:`tornado.concurrent.Future`
        """
        future = self._future_class()
        if self._closed:
            future.set_result(None)
        elif self._conn.closed:
            future.set_exception(RuntimeError("Connection has been closed"))
        else:
            try:
                future = self._read(future)
            except Exception as e:
                future.set_exception(e)
        return future

    def _read(self, future):
        """Request the next frame and resolve ``future`` from the response."""
        def parser(f):
            terminate = True
            try:
                terminate = self._handle_response(f, future)
            finally:
                # Anything but a partial (206) response or an auth challenge
                # (407) ends the stream, including unexpected errors.
                if terminate:
                    self._closed = True
                    self._conn = None

        self._conn.conn.receive(callback=parser)
        return future

    @staticmethod
    def _parse(raw):
        """Decode a raw UTF-8 JSON frame into a ``Message``."""
        payload = json.loads(raw.decode("utf-8"))
        return Message(payload["status"]["code"],
                       payload["result"]["data"],
                       payload["status"]["message"],
                       payload["result"]["meta"])

    def _settle(self, resolve):
        """Call ``resolve()`` after the forced close/release, if any.

        With ``force_close`` the connection is closed first, with
        ``force_release`` it is released first; ``resolve`` then runs from
        the done-callback of whatever those calls return.  If they return
        ``None`` (e.g. ``Connection.release`` without a pool) or neither flag
        is set, ``resolve`` runs immediately.
        """
        if self._force_close:
            # NOTE (from the original code): "throws error asyncio.Cancelled"
            pending = self._conn.close()
        elif self._force_release:
            pending = self._conn.release()
        else:
            pending = None
        if pending is None:
            resolve()
        else:
            pending.add_done_callback(lambda _done: resolve())

    def _handle_response(self, f, future):
        """Resolve ``future`` from the receive future ``f``.

        :returns: ``True`` when the stream must be terminated, ``False``
            when more reads are expected (206 partial content, 407 auth).
        """
        try:
            result = f.result()
        except Exception as e:
            future.set_exception(e)
            return True
        if result is None:
            # No frame was delivered; report it the same way read() reports
            # a closed connection rather than failing on None.decode().
            future.set_exception(RuntimeError("Connection has been closed"))
            return True
        try:
            message = self._parse(result)
        except Exception as e:
            # A malformed payload must fail the caller's future; raising
            # from inside the callback would not resolve it here.
            future.set_exception(e)
            return True

        status_code = message.status_code
        if status_code in (200, 206, 204):
            return self._handle_success(message, status_code, future)
        if status_code == 407:
            self._handle_auth_challenge(future)
            return False
        error = RuntimeError("{0} {1}".format(
            message.status_code, message.message))
        self._settle(lambda: future.set_exception(error))
        return True

    def _handle_success(self, message, status_code, future):
        """Run handlers on a 200/204/206 response and resolve ``future``.

        :returns: ``False`` for a partial (206) response that was processed
            without error, ``True`` otherwise.
        """
        try:
            message = self._process(message)
        except Exception as e:
            # Keep a reference that outlives the except block (Python 3
            # unbinds ``e`` on exit).
            exc = e
            self._settle(lambda: future.set_exception(exc))
            return True
        if status_code == 206:
            # More chunks follow: keep the connection and the stream open.
            future.set_result(message)
            return False
        self._settle(lambda: future.set_result(message))
        return True

    def _handle_auth_challenge(self, future):
        """Answer a 407 with SASL credentials, then relay the next read."""
        try:
            self._conn._authenticate(
                self._username, self._password, self._processor,
                self._session)
        except Exception as e:
            future.set_exception(e)
            return

        def relay(f):
            try:
                result = f.result()
            except Exception as e:
                future.set_exception(e)
            else:
                future.set_result(result)

        self.read().add_done_callback(relay)

    def _process(self, message):
        """Apply the handler chain to ``message.data``.

        Without handlers the ``Message`` itself is returned unchanged; with
        handlers, each one receives the previous handler's return value,
        starting from ``message.data``.
        """
        if self._handlers:
            message = message.data
            for handler in self._handlers:
                message = handler(message)
        return message