plugins.base_plugin
index
/Users/anthony/Development/opq/mauka/plugins/base_plugin.py

This module provides classes and base functionality for building OPQMauka plugins.

 
Modules
       
bson
config
json
log
mongo
multiprocessing
protobuf
signal
threading
typing
zmq

 
Classes
       
builtins.object
MaukaPlugin
json.encoder.JSONEncoder(builtins.object)
JSONEncoder

 
class JSONEncoder(json.encoder.JSONEncoder)
    JSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
 
This class allows us to serialize items with ObjectIds to JSON
 
 
Method resolution order:
JSONEncoder
json.encoder.JSONEncoder
builtins.object

Methods defined here:
default(self, o)
If o is an object id, return the string of it, otherwise use the default encoder for this object
 
:param o: Object to serialize
:return: Serialized version of this object

Methods inherited from json.encoder.JSONEncoder:
__init__(self, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
Constructor for JSONEncoder, with sensible defaults.
 
If skipkeys is false, then it is a TypeError to attempt
encoding of keys that are not str, int, float or None.  If
skipkeys is True, such items are simply skipped.
 
If ensure_ascii is true, the output is guaranteed to be str
objects with all incoming non-ASCII characters escaped.  If
ensure_ascii is false, the output can contain non-ASCII characters.
 
If check_circular is true, then lists, dicts, and custom encoded
objects will be checked for circular references during encoding to
prevent an infinite recursion (which would cause an OverflowError).
Otherwise, no such check takes place.
 
If allow_nan is true, then NaN, Infinity, and -Infinity will be
encoded as such.  This behavior is not JSON specification compliant,
but is consistent with most JavaScript based encoders and decoders.
Otherwise, it will be a ValueError to encode such floats.
 
If sort_keys is true, then the output of dictionaries will be
sorted by key; this is useful for regression tests to ensure
that JSON serializations can be compared on a day-to-day basis.
 
If indent is a non-negative integer, then JSON array
elements and object members will be pretty-printed with that
indent level.  An indent level of 0 will only insert newlines.
None is the most compact representation.
 
If specified, separators should be an (item_separator, key_separator)
tuple.  The default is (', ', ': ') if *indent* is ``None`` and
(',', ': ') otherwise.  To get the most compact JSON representation,
you should specify (',', ':') to eliminate whitespace.
 
If specified, default is a function that gets called for objects
that can't otherwise be serialized.  It should return a JSON encodable
version of the object or raise a ``TypeError``.
encode(self, o)
Return a JSON string representation of a Python data structure.
 
>>> from json.encoder import JSONEncoder
>>> JSONEncoder().encode({"foo": ["bar", "baz"]})
'{"foo": ["bar", "baz"]}'
iterencode(self, o, _one_shot=False)
Encode the given object and yield each string
representation as available.
 
For example::
 
    for chunk in JSONEncoder().iterencode(bigobject):
        mysocket.write(chunk)

Data descriptors inherited from json.encoder.JSONEncoder:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

Data and other attributes inherited from json.encoder.JSONEncoder:
item_separator = ', '
key_separator = ': '

 
class MaukaPlugin(builtins.object)
    MaukaPlugin(conf: config.MaukaConfig, subscriptions: List[str], name: str, exit_event: <bound method BaseContext.Event of <multiprocessing.context.DefaultContext object at 0x10b3b6748>>)
 
This is the base MaukaPlugin class that provides easy access to the database and also provides publish/subscribe
semantics and distributed processing primitives.
 
  Methods defined here:
__init__(self, conf: config.MaukaConfig, subscriptions: List[str], name: str, exit_event: <bound method BaseContext.Event of <multiprocessing.context.DefaultContext object at 0x10b3b6748>>)
Initializes the base plugin
 
:param conf: Configuration dictionary
:param subscriptions: List of subscriptions this plugin should subscribe to
:param name: The name of this plugin
debug(self, msg: str)
Prints a debug message using this classes logger and formatted the plugin name.
:param msg: Message to print to debug.
get_mongo_client(self)
Returns an OPQ mongo client
 
:return: An OPQ mongo client
get_status(self) -> str
Return the status of this plugin
:return: The status of this plugin
handle_self_message(self, message: str)
Handles a self-message
 
:param message: The message to handle
is_self_message(self, topic: str) -> bool
Determines if this is a message directed at this plugin. I.e. the topic is the name of the plugin.
 
:param topic: Topic of the message
:return: If this is a self message or not
on_message(self, topic: str, mauka_message: mauka_pb2.MaukaMessage)
This gets called when a subscriber receives a message from a topic they are subscribed too.
 
This should be implemented in all subclasses.
 
:param topic: The topic this message is associated with
:param mauka_message: The message contents
produce(self, topic: str, mauka_message: mauka_pb2.MaukaMessage)
Produces a message with a given topic to the system
 
:param topic: The topic to produce this message to
:param mauka_message: The message to produce
run_plugin(self)
This is the run loop for this plugin process
start_heartbeat(self)
This is a recursive function that acts as a heartbeat.
 
This function calls itself over-and-over on a timer to produce heartbeat messages. The interval can be
configured is the configuration file.

Data descriptors defined here:
__dict__
dictionary for instance variables (if defined)
__weakref__
list of weak references to the object (if defined)

Data and other attributes defined here:
NAME = 'MaukaPlugin'

 
Functions
       
from_json(json_str: str) -> Dict
Deserialize json into dictionary
 
:param json_str: JSON string to deserialize
:return: Dictionary from json
run_plugin(plugin_class, conf: config.MaukaConfig)
Runs the given plugin using the given configuration dictionary
 
:param plugin_class: Name of the class of the plugin to be ran
:param conf: Configuration dictionary
to_json(obj: object) -> str
Serializes the given object to json
 
:param obj: The object to serialize
:return: JSON representation of object

 
Data
        logger = <Logger plugins.base_plugin (DEBUG)>