gizmo 0.1.11 - DEPRECATED!!! WILL BE OFFICIALLY RETIRED UPON RELEASE OF GREMLIN SERVER 3.0.0 M9
PLEASE TRY aiogremlin INSTEAD
API BREAKING CHANGES HAVE OCCURRED BETWEEN 0.1.7 AND 0.1.9 - Full API documented below.
gizmo is a Python 3 driver for the TP3 Gremlin Server. This module is built on asyncio and websockets. gizmo is currently in alpha mode, but all major functionality has test coverage.
Getting started
Since Python 3.4 is not the default version on many systems, it's nice to create a virtualenv that uses Python 3.4 by default. Then use pip to install gizmo. Using virtualenvwrapper on Ubuntu 14.04:
$ mkvirtualenv -p /usr/bin/python3.4 gizmo
$ pip install gizmo
Fire up the Gremlin Server:
$ ./bin/gremlin-server.sh conf/gremlin-server-modern.yaml
Create and execute a task that submits a script to the Gremlin Server:
>>> from gizmo import AsyncGremlinClient
>>> gc = AsyncGremlinClient()
>>> task = gc.s("x + x", bindings={"x": 2}, consumer=lambda x: print(x[0] ** 2))
>>> task.execute()
16
The Basics - AsyncGremlinClient
The AsyncGremlinClient uses asyncio and websockets to communicate asynchronously with the Gremlin Server. The client uses a combination of asyncio.coroutine and asyncio.Task, run on asyncio's pluggable event loop, to achieve this communication.
At its most basic, the AsyncGremlinClient sends and receives messages through a socket. The majority of AsyncGremlinClient methods are asyncio coroutines, so you will also need to use either asyncio or the gizmo Task API. The following examples use asyncio to demonstrate the use of the AsyncGremlinClient.
The Gremlin Server sends responses in chunks, so it is important to keep receiving messages until AsyncGremlinClient.recv returns None:
>>> import asyncio
>>> gc = AsyncGremlinClient('ws://localhost:8182/')

@asyncio.coroutine
def recv_coro(gc):
    # The send method returns the websocket used to submit the script.
    websocket = yield from gc.send("g.V().has(n, val).values(n)",
                                   bindings={"n": "name", "val": "gremlin"})
    while True:
        # Keep receiving until the server signals the end of the response.
        f = yield from gc.recv(websocket)
        if f is None:
            break
        assert f[0] == "gremlin"

>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(recv_coro(gc))
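The receive-until-None loop can be tried without a running server. The following is a minimal sketch, not gizmo's implementation: FakeClient is a hypothetical in-memory stand-in whose recv hands back one chunk per call and then None, and the code uses async/await syntax (Python 3.7+ for asyncio.run) rather than the @asyncio.coroutine style of this README.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a client that receives chunked responses."""

    def __init__(self, chunks):
        self._chunks = list(chunks)

    async def recv(self):
        # Give control back to the event loop, as a real socket read would.
        await asyncio.sleep(0)
        # Return the next chunk, or None once the response is exhausted.
        return self._chunks.pop(0) if self._chunks else None


async def drain(client):
    """Keep receiving until recv() returns None and collect every chunk."""
    received = []
    while True:
        chunk = await client.recv()
        if chunk is None:
            break
        received.append(chunk)
    return received


chunks = asyncio.run(drain(FakeClient([["marko", "vadas"], ["lop"]])))
print(chunks)
```

Stopping at the first chunk would silently drop the rest of the response, which is why the loop only exits on None.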
Message queue API
Sometimes you'll want to store the server results for later use. To do so, AsyncGremlinClient provides the submit method, which allows you to submit a script to the server for evaluation and then modify the responses on the fly as they arrive, before they are stored in AsyncGremlinClient.messages, an asyncio.Queue. You can read messages off the queue using the read method. Observe:
# This is applied to each message as it is received from the server.
>>> consumer = lambda x: x[0] ** 2

@asyncio.coroutine
def message_queue_coro(gc):
    yield from gc.submit("2 + 2", consumer=consumer)
    while True:
        f = yield from gc.read()
        if f is None:
            break
        assert f == 16

>>> loop.run_until_complete(message_queue_coro(gc))

# A consumer could also be a coroutine.
@asyncio.coroutine
def consumer_coro(x):
    yield from asyncio.sleep(0)
    return x[0] ** 2

@asyncio.coroutine
def coroutine_consumer_coro(gc):
    yield from gc.submit("2 + 2", consumer=consumer_coro)
    # Access the messages queue directly.
    while not gc.messages.empty():
        f = yield from gc.read()
        assert f == 16

>>> loop.run_until_complete(coroutine_consumer_coro(gc))
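The idea of transforming each message before it lands on the queue can be sketched with asyncio alone. This is an illustration under stated assumptions, not gizmo's code: store is a hypothetical helper, the raw messages are supplied by hand instead of coming from a server, and async/await syntax is used in place of @asyncio.coroutine.

```python
import asyncio
import inspect


async def store(queue, raw_messages, consumer=None):
    """Apply consumer to each message, then put the result on the queue."""
    for message in raw_messages:
        if consumer is not None:
            message = consumer(message)
            # A coroutine consumer returns an awaitable; resolve it first.
            if inspect.isawaitable(message):
                message = await message
        await queue.put(message)


async def slow_square(x):
    """Coroutine consumer: yields to the event loop, then squares."""
    await asyncio.sleep(0)
    return x[0] ** 2


async def main():
    queue = asyncio.Queue()
    # One plain-function consumer and one coroutine consumer.
    await store(queue, [[4]], consumer=lambda x: x[0] ** 2)
    await store(queue, [[4]], consumer=slow_square)
    results = []
    while not queue.empty():
        results.append(await queue.get())
    return results


queued = asyncio.run(main())
print(queued)
```

Checking the consumer's return value with inspect.isawaitable lets the same code path accept both plain functions and coroutines.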
This is great if you are already using asyncio, or another compatible framework like Tornado. However, gizmo also provides another way to interact with the server using its Task API.
Task API
The Task API provides a simple set of wrappers that allow you to easily manage the flow of the asynchronous websocket communication. It is loosely based on Python Celery's Canvas, albeit much simpler. Note: a gizmo.Task must not be confused with an asyncio.Task. Like asyncio, gizmo provides a constructor function, async, that returns a gizmo.Task.
To get started, you can schedule a task by wrapping a coroutine. The task then provides an execute method that runs the asyncio event loop:
# Here we use the async constructor function to create a gizmo.Task instance.
>>> from gizmo import async
>>> task = async(gc.submit, "x + x", bindings={"x": 2}, consumer=consumer)
>>> task.execute()
16
Creating a task by wrapping the submit method is so common that there is a shortcut, AsyncGremlinClient.s:
# In practice, you will rarely use gizmo.Task explicitly. AsyncGremlinClient.s and
# gizmo.async are the preferred way to create tasks.
>>> task = gc.s("x + x", bindings={"x": 2}, consumer=consumer)
>>> task.execute()
16
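To make the wrapping idea concrete, here is a minimal sketch of a task object that stores a coroutine function with its arguments and runs the event loop on execute. SketchTask and fake_submit are hypothetical names invented for this illustration; they are not gizmo's classes, and fake_submit computes the result locally instead of contacting a server.

```python
import asyncio


class SketchTask:
    """Hypothetical wrapper: holds a coroutine function and its arguments."""

    def __init__(self, coro_func, *args, **kwargs):
        self.coro_func = coro_func
        self.args = args
        self.kwargs = kwargs

    def execute(self):
        # Run the event loop until the wrapped coroutine finishes.
        return asyncio.run(self.coro_func(*self.args, **self.kwargs))


async def fake_submit(script, bindings=None, consumer=None):
    """Stand-in for a server round trip.

    The script argument is ignored; the sum "x + x" is computed locally.
    """
    await asyncio.sleep(0)
    result = [bindings["x"] + bindings["x"]]
    return consumer(result) if consumer else result


task = SketchTask(fake_submit, "x + x", bindings={"x": 2},
                  consumer=lambda x: x[0] ** 2)
value = task.execute()
print(value)
```

Nothing runs when the task is created; the coroutine is only called and driven to completion inside execute, which is what makes tasks composable before they are run.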
In order to design workflows by combining asynchronous tasks, gizmo provides a series of classes that wrap a gizmo.Task: gizmo.Group, gizmo.Chain, and gizmo.Chord. These classes are quite similar to their Celery counterparts. Note: all of the classes in the Task API inherit from gizmo.Task, so you may pass any task to any other task to compose complex workflows.
Group
gizmo.Group allows you to group and then asynchronously execute tasks in parallel:
# These simulate slow tasks with various completion times.
@asyncio.coroutine
def consumer_coro1(x):
    yield from asyncio.sleep(0.25)
    return x[0] ** 0

@asyncio.coroutine
def consumer_coro2(x):
    yield from asyncio.sleep(0.50)
    return x[0] ** 1

>>> t = gc.s("x + x", bindings={"x": 2}, consumer=lambda x : x[0] ** 2)
>>> slow = gc.s("x + x", bindings={"x": 2}, consumer=consumer_coro1)
>>> g = group(slow, t)
>>> g.execute()
>>> results = [m for m in gc]
# The fast task finishes first, so its result is stored first.
>>> assert(results[0] == 16)
>>> assert(results[1] == 1)
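The ordering in the group example (fast result first, slow result second) is what plain asyncio produces when tasks run concurrently. The sketch below is an analogy built on asyncio.gather, not gizmo's Group; run_task and run_group are hypothetical helpers, and the delays are shortened.

```python
import asyncio


async def run_task(delay, value, results):
    """Stand-in for a task: wait `delay` seconds, then record `value`."""
    await asyncio.sleep(delay)
    results.append(value)


async def run_group(*coros):
    # Schedule all tasks at once and wait for every one of them.
    await asyncio.gather(*coros)


group_results = []
asyncio.run(run_group(run_task(0.05, 1, group_results),   # the slow task
                      run_task(0.0, 16, group_results)))  # the fast task
print(group_results)
```

Results are recorded in completion order, not in the order the tasks were passed in.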
Chain
gizmo.Chain allows you to create a chain of tasks that execute in a synchronous fashion:
>>> t = gc.s("x + x", bindings={"x": 2}, consumer=lambda x : x[0] ** 2)
>>> slow = gc.s("x + x", bindings={"x": 2}, consumer=consumer_coro1)
>>> chain(slow, t).execute()
>>> results = [m for m in gc]
>>> assert(results[0] == 1)
>>> assert(results[1] == 16)
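For contrast with parallel execution, sequential execution can be sketched in plain asyncio by awaiting each coroutine before starting the next. This is an analogy, not gizmo's Chain; run_task and run_chain are hypothetical helpers.

```python
import asyncio


async def run_task(delay, value, results):
    """Stand-in for a task: wait `delay` seconds, then record `value`."""
    await asyncio.sleep(delay)
    results.append(value)


async def run_chain(*coros):
    # Await each task to completion before starting the next one.
    for coro in coros:
        await coro


chain_results = []
asyncio.run(run_chain(run_task(0.05, 1, chain_results),   # slow, runs first
                      run_task(0.0, 16, chain_results)))  # fast, runs second
print(chain_results)
```

Here the slow task's result comes first because the fast task does not start until the slow one has finished.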
Chord
gizmo.Chord allows you to asynchronously execute a group of tasks in parallel, with an added callback:
>>> slow1 = gc.s("x + x", bindings={"x": 2}, consumer=consumer_coro1)
>>> slow2 = gc.s("x + x", bindings={"x": 2}, consumer=consumer_coro2)
>>> t = gc.s("x + x", bindings={"x": 2}, consumer=lambda x : x[0] ** 2)
>>> chord([slow2, slow1], t).execute()
>>> results = [m for m in gc]
>>> assert(results[0] == 1)
>>> assert(results[1] == 4)
>>> assert(results[2] == 16)
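The chord pattern, a parallel header followed by a callback, can be sketched in plain asyncio as a gather followed by one more await. This is an analogy, not gizmo's Chord; run_task and run_chord are hypothetical helpers, and the delays are shortened.

```python
import asyncio


async def run_task(delay, value, results):
    """Stand-in for a task: wait `delay` seconds, then record `value`."""
    await asyncio.sleep(delay)
    results.append(value)


async def run_chord(header, callback):
    # Run every header task in parallel, then run the callback task.
    await asyncio.gather(*header)
    await callback


chord_results = []
asyncio.run(run_chord(
    [run_task(0.10, 4, chord_results),   # slower header task
     run_task(0.05, 1, chord_results)],  # faster header task
    run_task(0.0, 16, chord_results),    # callback, runs last
))
print(chord_results)
```

The header results arrive in completion order, and the callback result is always last, however fast the callback itself is.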
A more complex example:
def expect(count):
    # assert is a statement and cannot appear in a lambda,
    # so build the checking consumer with a regular function.
    def consumer(x):
        assert x[0] == count
    return consumer

def simple_graph():
    gc = AsyncGremlinClient()
    # Define various tasks.
    t = gc.s("g.V().remove(); g.E().remove();", collect=False)
    t1 = gc.s("g.addVertex('uniqueId', x)", bindings={"x": "joe"},
              collect=False)
    t2 = gc.s("g.addVertex('uniqueId', x)", bindings={"x": "maria"},
              collect=False)
    t3 = gc.s("g.addVertex('uniqueId', x)", bindings={"x": "jill"},
              collect=False)
    t4 = gc.s("g.addVertex('uniqueId', x)", bindings={"x": "jack"},
              collect=False)
    t5 = gc.s("""
        joe = g.V().has('uniqueId', 'joe').next();
        maria = g.V().has('uniqueId', 'maria').next();
        joe.addEdge('marriedTo', maria);""")
    t6 = gc.s("""
        jill = g.V().has('uniqueId', 'jill').next();
        jack = g.V().has('uniqueId', 'jack').next();
        jill.addEdge('marriedTo', jack);""")
    t7 = gc.s("""
        jill = g.V().has('uniqueId', 'jill').next();
        joe = g.V().has('uniqueId', 'joe').next();
        jill.addEdge('hasSibling', joe);""")
    t8 = gc.s("g.V();", consumer=lambda x: print(x))
    t9 = gc.s("g.E();", consumer=lambda x: print(x))
    t10 = gc.s("g.V().count();", consumer=expect(4))
    t11 = gc.s("g.E().count();", consumer=expect(3))
    # Define groups.
    g1 = group(t1, t2, t3, t4)
    g2 = group(t5, t6, t7)
    # Use chain to control order of task execution.
    c = chain(t, g1, g2, t8, t9, t10, t11, t)
    c.execute()

>>> simple_graph()
[{'type': 'vertex', 'id': 17, 'label': 'vertex', 'uniqueId': ['maria']}, {'type': 'vertex', 'id': 11, 'label': 'vertex', 'uniqueId': ['jack']}, {'type': 'vertex', 'id': 13, 'label': 'vertex', 'uniqueId': ['joe']}, {'type': 'vertex', 'id': 15, 'label': 'vertex', 'uniqueId': ['jill']}]
[{'outVLabel': 'vertex', 'label': 'hasSibling', 'inV': 13, 'type': 'edge', 'id': 19, 'outV': 15, 'inVLabel': 'vertex'}, {'outVLabel': 'vertex', 'label': 'marriedTo', 'inV': 11, 'type': 'edge', 'id': 20, 'outV': 15, 'inVLabel': 'vertex'}, {'outVLabel': 'vertex', 'label': 'marriedTo', 'inV': 17, 'type': 'edge', 'id': 21, 'outV': 13, 'inVLabel': 'vertex'}]
[[{'outVLabel': 'vertex', 'label': 'hasSibling', 'inV': 13, 'type': 'edge', 'id': 19, 'outV': 15, 'inVLabel': 'vertex'}], [{'outVLabel': 'vertex', 'label': 'marriedTo', 'inV': 11, 'type': 'edge', 'id': 20, 'outV': 15, 'inVLabel': 'vertex'}], [{'outVLabel': 'vertex', 'label': 'marriedTo', 'inV': 17, 'type': 'edge', 'id': 21, 'outV': 13, 'inVLabel': 'vertex'}]]
GremlinResponse
gizmo parses the Gremlin Server response message only minimally, wrapping it in a GremlinResponse object. This object inherits from the Python list, and the content of the response is available using all normal list methods, iteration, etc. The GremlinResponse also exposes the metadata contained in the server response as properties.
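The list-plus-metadata design can be sketched as follows. This is only an illustration of the pattern: SketchResponse is a hypothetical class, and the property names status_code and request_id are assumptions chosen for the example, not GremlinResponse's documented attributes.

```python
class SketchResponse(list):
    """Hypothetical list subclass that also carries response metadata."""

    def __init__(self, data, status_code=None, request_id=None):
        # The payload becomes the list contents.
        super().__init__(data)
        # Metadata is kept alongside and exposed read-only.
        self._status_code = status_code
        self._request_id = request_id

    @property
    def status_code(self):
        return self._status_code

    @property
    def request_id(self):
        return self._request_id


resp = SketchResponse(["marko", "vadas"], status_code=200, request_id="abc")
# Normal list operations work on the payload.
names = [name.upper() for name in resp]
print(names, len(resp), resp.status_code)
```

Subclassing list keeps consumers simple, since indexing such as x[0] works directly on the response while the metadata stays out of the way.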
asyncio with gizmo
As the above examples demonstrate, AsyncGremlinClient is made to be interoperable with asyncio, so it composes with any other asyncio coroutines. Here is an example that uses asyncio to create synchronous communication with the Gremlin Server:
@asyncio.coroutine
def sleepy(gc):
    yield from asyncio.sleep(0.25)
    yield from gc.submit("g.V().has(n, val).values(n)",
                         bindings={"n": "name", "val": "gremlin"})

# Define a coroutine that sequentially executes instructions.
@asyncio.coroutine
def client(gc):
    yield from sleepy(gc)
    yield from gc.submit("g.V().values(n)",
                         bindings={"n": "name"})
    # Response messages sent by the server are stored in an asyncio.Queue.
    while True:
        f = yield from gc.messages.get()
        if f is None:
            break
        print(f)

>>> gc = AsyncGremlinClient('ws://localhost:8182/')
>>> loop.run_until_complete(client(gc))
['gremlin']
['marko', 'vadas', 'lop', 'josh', 'ripple', 'peter']
gizmo with Titan
Tornado Interoperability Example
Use gizmo with Tornado:
import asyncio
import json
from tornado import gen
from tornado.web import RequestHandler, Application, url
from tornado.platform.asyncio import AsyncIOMainLoop
from gizmo import AsyncGremlinClient


class GremlinHandler(RequestHandler):

    @gen.coroutine
    def get(self):
        gc = AsyncGremlinClient(uri='ws://localhost:8182/')
        yield from gc.submit("g.V().values(n)", bindings={"n": "name"})
        while not gc.messages.empty():
            message = yield from gc.messages.get()
            message = json.dumps(message)
            self.write(message)


def make_app():
    return Application([
        url(r"/", GremlinHandler),
    ])


def main():
    app = make_app()
    # Must create IOLoop before calling app.listen.
    AsyncIOMainLoop().install()
    app.listen(8888)
    asyncio.get_event_loop().run_forever()


if __name__ == '__main__':
    print("Starting server at http://localhost:8888/")
    main()