Python Examples¶
Server-side¶
Sample server-side code:
from gofer.proxy import Agent
url = 'amqp://localhost'
address = 'test'
agent = Agent(url, address)
Define Agent-side¶
Sample agent-side code. This module is placed in /user/share/gofer/plugins/
along with a plugin
descriptor in /etc/gofer/plugins/
Plugin descriptor: /etc/gofer/plugins/plugin.conf
[main]
enabled=1
[messaging]
url=amqp://localhost
uuid=test
Code: /user/share/gofer/plugins/plugin.py
from gofer.decorators import remote
from gofer.rmi.model.FORK
from gofer.agent.plugin import Plugin
# (optional) access to the plugin descriptor
# which you can use to define custom sections/properties
plugin = Plugin.find(__name__)
class Dog:
@remote
def bark(self, words):
woof = cfg.dog.bark_noise
print '%s %s' % (woof, words)
return 'Yes master. I will bark because that is what dogs do.'
@remote(model=FORK)
def wag(self, n):
for i in range(0, n):
print 'wag'
return 'Yes master. I will wag my tail because that is what dogs do.'
The plugin may be loaded from the PYTHON path by specifying the plugin property in descriptor as follows:
[main]
enabled=1
plugin=application.agent.plugin.py
[messaging]
url=amqp://localhost
uuid=zoo
Synchronous Invocation¶
Sample of server code invoking synchronously methods (remotely) on the agent. This is the default behaviour and the timeout is 90 seconds by default.
Python¶
from gofer.proxy import Agent
agent = Agent('amqp://localhost', 'test')
# invoke methods on the agent (remotely)
dog = agent.Dog()
print dog.bark('hello')
print dog.wag(3)
print dog.bark('hello')
# methods that raise exceptions
try:
print dog.sit()
except Exception, e:
print repr(e)
try:
print dog.not_permitted()
except Exception, e:
print repr(e)
Using the CLI¶
$ gofer rmi -u 'amqp://localhost' -a test -t Dog.bark hello
$ gofer rmi -u 'amqp://localhost' -a test -t Dog.wag 3
$ gofer rmi -u 'amqp://localhost' -a test -t Dog.bark hello
Synchronous Invocation (specify timeout)¶
Sample of server code invoking synchronously methods (remotely) on the agent with a timeout of 180 seconds.
Python¶
from gofer.proxy import Agent
amqp://localhost
agent = Agent('amqp://localhost', 'test', wait=180) # specify timeout
# invoke methods on the agent (remotely)
dog = agent.Dog()
dog.bark('hello')
dog.wag(3)
dog.bark('hello')
Using the CLI¶
$ gofer rmi -u 'amqp://localhost' -a test -w 180 -t Dog.bark hello
$ gofer rmi -u 'amqp://localhost' -a test -w 180 -t Dog.wag 3
$ gofer rmi -u 'amqp://localhost' -a test -w 180 -t Dog.bark hello
Asynchronous (fire & forget) Invocation¶
Sample of server code invoking synchronously methods (remotely) on the agent. This works the same for asynchronous fire-and-forget where not reply is wanted. Asynchronous invocation returns the serial number of the request.
Python¶
from gofer.proxy import Agent
# create an agent where user data = 'task_id'
agent = Agent('amqp://localhost', 'test', wait=0)
# invoke methods on the agent (remotely)
dog = agent.Dog()
dog.bark('hello')
dog.wag(3)
print dog.bark('hello')
'e688f50b-3108-43dd-9a57-813f434749a8'
# methods that raise exceptions
try:
print dog.sit()
except Exception, e:
print repr(e)
try:
print dog.not_permitted()
except Exception, e:
print repr(e)
Using the CLI¶
$ gofer rmi -u 'amqp://localhost' -a test -w 0 -t Dog.bark hello
e688f50b-3108-43dd-9a57-813f434749a8
Asynchronous (callback) Invocation¶
Sample of server code invoking asynchronously methods (remotely) on the agent. The is the callback form of asynchronous invocation. This example uses a Listener class. But, the listener can also be any callable. Asynchronous invocation returns the serial number of the request to be used by the caller to further correlate request & response.
Python¶
from gofer.proxy import Agent
from gofer.messaging.async import ReplyConsumer
# specify a reply address to be used for asynchronous responses.
reply_to = 'tasks'
# create my listener class
class Listener:
"""
An asynchronous operation callback listener.
"""
def succeeded(self, reply):
"""
Async request succeeded.
:param reply: The reply data.
:type reply: Succeeded.
"""
pass
def failed(self, reply):
"""
Async request failed (raised an exception).
:param reply: The reply data.
:type reply: Failed.
"""
pass
def accepted(self, reply):
"""
Async request has been accepted.
:param reply: The request.
:type reply: Accepted.
"""
pass
def rejected(self, reply):
"""
Async request has been rejected.
:param reply: The request.
:type reply: Accepted.
"""
pass
def started(self, reply):
"""
Async request has started.
:param reply: The request.
:type reply: Started.
"""
pass
def progress(self, reply):
"""
Async progress report.
:param reply: The request.
:type reply: Progress.
"""
pass
# create my reply consumer using the reply to and my listener
reader = ReplyConsumer(reply_to)
reader.start(Listener())
# create an agent where user data is {'task_id': 1234} and
# setup for asynchronous invocation with my reply address.
agent = Agent('amqp://localhost', 'test', reply=reply_to)
# invoke methods on the agent (remotely)
dog = agent.Dog()
dog.bark('hello')
dog.wag(3)
print dog.bark('hello')
'e688f50b-3108-43dd-9a57-813f434749a8'
Same asynchronous example except specify a callable as the listener. Also, it uses the throw() method on reply.
# specify a reply address to be used for responses.
reply_to = 'tasks'
# create my listener
def callback(reply):
try:
reply.throw()
...
print reply.retval # succeeded, do something with return value.
...
except Exception, ex:
# handle general exception
pass
# create my reply consumer using the reply address and my callback
reader = ReplyConsumer(reply_to)
reader.start(callback)
...
Using the CLI¶
Note: -r tasks
$ gofer rmi -u 'amqp://localhost' -a test -w 0 -r tasks -t Dog.bark hello
e688f50b-3108-43dd-9a57-813f434749a8
Class Constructor Arguments¶
Classes defined in the agent can have constructor arguments. Though, remember, an instance is constructed for each request so remote objects are stateless. The stub provides for passing __init__() arguments by calling the stub.
Examples:
In the plugin:
class Dog:
def __init__(self, name, age=1):
self.name = name
self.age = age
@remote
def bark(self):
pass
@remote
def wag():
pass
Calling:
...
dog = agent.Dog() # stub constructor, pass gofer options here.
dog('rover', age=10) # constructor arguments set here.
dog.bark('hello')
dog.wag()
# change the constructor arguments and call something else.
dog('max', age=5) # changing constructor arguments.
dog.bark('howdy')
Subsequent calls simply update the constructor arguments.
This:
dog('rover', age=10)
equals this (in the agent):
dog = Dog('rover', age=10)
Security¶
When remote methods or functions are decorated to require a shared secret for request authentication, it must be passed as an option.
Example:
from gofer.proxy import Agent
from gofer.messaging.dispatcher import NotAuthorized
agent = Agent('amqp://localhost', 'test', secret='mycathas9lives')
# invoke methods on the agent (remotely)
dog = agent.Dog()
try:
dog.bark('secure hello')
except NotAuthorized:
log.error('wrong secret')
Progress Reporting¶
In gofer 0.72+ remote method progress can be reported by plugins. In the case of synchronous RMI, the caller can specify a callback for progress reporting by specifying the progress option. The callback must take a single (dict) parameter (report).
The report has the following keys:
- sn - serial number
- any - user data
- total - the number total units
- completed - the number of completed units
- details - arbitrary details
For asynchronous RMI, the listener is called with progress reports.
Examples:
from gofer.proxy import Agent
def progress_reported(report)
pass
agent = Agent('amqp://localhost', 'test')
dog = agent.Dog(progress=progress_reported)
dog.bark('howdy')
On the agent, plugins report progress from with a method by using the Progress object defined within the current call Context.
Example:
from gofer.agent.rmi import Context
from gofer.decorators import remote
class MyClass:
@remote
def foo(self):
"""
Do something reports progress
"""
total = 10
# get the call context
ctx = Context.current()
ctx.progress.total = total
# demo reporting progress for 10 units
for n in range(0, total):
ctx.progress.completed += 1
sleep(1)
@remote
def bar(self):
"""
Do something reports progress with details.
"""
total = 10
# get the call context
ctx = Context.current()
ctx.progress.total = total
# demo reporting progress for 10 units
for n in range(0, total):
ctx.progress.completed += 1
ctx.progress.details='for: %d' % n)
sleep(1)
Testing¶
Logs¶
After adding/updating classes or methods in myplugin.py, you’ll want to test them. First, ensure the
plugin is still loading properly. The easiest way to do this is by examining the gofer log file
at: /var/log/gofer/agent
. At start up, you should see something like:
2010-11-08 08:49:04,491 [WARNING][MainThread] __mangled() @ plugin.py:122 - "pulp" found in python-path
2010-11-08 08:49:04,503 [INFO][MainThread] __mangled() @ plugin.py:123 - "pulp" mangled to avoid collisions
2010-11-08 08:49:04,909 [INFO][MainThread] __import() @ plugin.py:103 - plugin "pulp", imported as: "pulp_plugin"
Either the gofer log or the pulp client.log may be examined to verify that Actions are running as expected.
Interactive Shell¶
Testing added/updated remote methods, can be done easily using an interactive python (shell). Be sure your changes to the pulp plugin have been picked up by Gofer by restarting goferd. Let’s say you added a new class named “Foo” that has a remote method named … you guessed it: “bar”.
You can test your new stuff as follows:
[jortel@localhost pulp]$ python
Python 2.6.2 (r262:71600, Jun 4 2010, 18:28:04)
[GCC 4.4.3 20100127 (Red Hat 4.4.3-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from gofer.proxy import Agent
>>> uuid = <your consumer ID>
>>> agent = Agent('amqp://localhost', uuid)
>>> foo = agent.Foo()
>>> print foo.bar()
Or, using the proxy module API:
[jortel@localhost pulp]$ python
Python 2.6.2 (r262:71600, Jun 4 2010, 18:28:04)
[GCC 4.4.3 20100127 (Red Hat 4.4.3-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from gofer import proxy
>>> uuid = <your consumer ID>
>>> agent = proxy.agent('amqp://localhost', uuid)
>>> foo = agent.Foo()
>>> print foo.bar()
Admin.help()¶
Another useful tool, it invoke Admin.help() from within interactive python as follows:
[jortel@localhost pulp]$ python
Python 2.6.2 (r262:71600, Jun 4 2010, 18:28:04)
[GCC 4.4.3 20100127 (Red Hat 4.4.3-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from pulp.server.agent import Agent
>>> uuid = <your consumer ID>
>>> agent = Agent('amqp://localhost', uuid)
>>> admin = agent.Admin()
>>> print admin.help()
Plugins:
demo
pulp [pulp_admin]
Actions:
demo.TestAction 0:10:00
Methods:
custom.Dog.bark()
custom.Dog.wag()
demo.Admin.hello()
demo.Admin.help()
demo.Shell.run()
Functions:
demo.echo()
>>>
Test Main¶
The test/main.py
module provides a good testing entry point that does not require the process owner
to be root.
Mocks¶
The gofer mock feature provides better testability. Essentially, it allows uses to test the server-side code that uses the gofer proxy. Instead of calling through to the remote agent, RMI calls can be mocked-up.
Added 0.33.
The mock module provides an API for registering custom stub mocks.
Items that can be registered with mock.register():
- instance (object)
- class
- module
Example:
from gofer.messaging import mock
mock.install()
from gofer.proxy import Agent
agent = Agent('xyz')
# define mock impl for testing
class Dog:
def bark(self, msg):
return 'mock Dog, called with: [%s]' % msg
# register our mock class
mock.register(Dog=Dog)
# call bark()
dog = agent.Dog()
print dog.bark('hello')
'mock Dog, called with: [hello]'
print dog.bark('world')
'mock Dog, called with: [world]'
#
# now, let look at the call history
#
h = dog.bark.history()
print h
'[("hello",),{}), ("world",),{})]'
# get last call
last = h[-1]
# look at the passed args
print last.args[0]
'world'
# look at the keyword args
print last.kwargs
'{}'
It’s very important to note the difference between registering a class (as a stub) and an instance (as a stub). In short, nstances are shared across all mock agents and classes are associated to the instance of the mock agent that created them. That way, call history is scoped to mock agent as well.
In some cases, it’s useful to have a stub method raise an exception. Here’s how it’s done:
from gofer.messaging import mock
mock.install()
from gofer.proxy import Agent
agent = Agent('amqp://localhost', 'xyz')
# define mock impl for testing
class Dog:
def bark(self, msg):
return 'mock Dog, called with: [%s]' % msg
# register our mock class
mock.register(Dog=Dog)
dog = agent.Dog()
# call bark() normally
print dog.bark('hello')
# now, let's have it raise an exception
dog.bark.push(Exception('no more barking'))
try:
dog.bark('hello')
except Exception, e:
print e
'"no more barking'"