nautilus.network.events.consumers package

Submodules

nautilus.network.events.consumers.actions module

class nautilus.network.events.consumers.actions.ActionHandler[source]

Bases: nautilus.network.events.consumers.kafka.KafkaBroker

consumer_channel = 'actions'
handle_action(action_type, payload, props, **kwds)[source]
handle_message(**kwds)[source]
producer_channel = 'actions'
server = 'localhost:9092'

nautilus.network.events.consumers.api module

class nautilus.network.events.consumers.api.APIActionHandler[source]

Bases: nautilus.network.events.consumers.actions.ActionHandler

This action handler is used by the API service to build a schema of the underlying services as they announce their existence over the action system.

consumer_pattern = '(.*\\..*\\.(?!(pending)))|init|query'
handle_action(*args, **kwds)[source]
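
To make the consumer_pattern above concrete, the following sketch checks a few sample action types against it. The pattern string is copied from the class attribute. The helper name is_consumed, the sample action types, and the use of re.match (anchored at the start of the string) are assumptions made for illustration, since this page does not say how the broker applies the pattern.

```python
import re

# Pattern copied verbatim from APIActionHandler.consumer_pattern.
CONSUMER_PATTERN = '(.*\\..*\\.(?!(pending)))|init|query'


def is_consumed(action_type):
    """Hypothetical helper: True if the pattern matches at the start of action_type.

    Assumption: the broker applies the pattern with re.match.
    """
    return re.match(CONSUMER_PATTERN, action_type) is not None


# Hypothetical action types, chosen only to exercise each branch of the pattern.
samples = [
    'init',
    'query',
    'user.create.success',
    'user.create.pending',
    'user.create',
]
results = {sample: is_consumed(sample) for sample in samples}

for sample, matched in results.items():
    print('{:<22} {}'.format(sample, matched))
```

Under these assumptions, three-part action types are consumed unless the part after the second dot starts with "pending", and the bare action types init and query are consumed as well.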

nautilus.network.events.consumers.kafka module

class nautilus.network.events.consumers.kafka.KafkaBroker[source]

Bases: object

This class handles two-way communication with the Kafka server. It also provides a question/answer interface served over the Kafka stream.

Parameters:
  • server (str) – The location of the Kafka stream.
  • consumer_channel (optional, str) – The channel to listen for events on.
  • consumer_pattern (optional, regex) – A regex pattern to match against the action types. The action handler is called for every matching event. If none is provided, the action handler is called for every action.
  • producer_channel (optional, str) – The default channel to use when producing events.
  • initial_offset (optional, one of 'latest' or 'earliest') – Where to start on the event stream when run.
  • loop (optional, asyncio.AbstractEventLoop) – The event loop that the broker should run on.
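
The consumer_pattern behaviour described in the parameter list can be sketched as a small filter. This is not the library's implementation: should_handle is a hypothetical helper, the patterns and action types are made up, and matching with re.match is an assumption.

```python
import re


def should_handle(consumer_pattern, action_type):
    """Hypothetical filter mirroring the documented consumer_pattern semantics."""
    # No pattern configured: every action reaches the handler.
    if consumer_pattern is None:
        return True
    # Assumption: the pattern is matched from the start of the action type.
    return re.match(consumer_pattern, action_type) is not None


# Made-up examples: an unfiltered broker and one restricted to "user." actions.
print(should_handle(None, 'post.create'))
print(should_handle(r'user\..*', 'user.create'))
print(should_handle(r'user\..*', 'post.create'))
```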

Example

from .kafka import KafkaBroker


class ActionHandler(KafkaBroker):

    consumer_channel = 'myEvents'
    server = 'localhost:9092'

    async def handle_message(self, action_type, payload, **kwds):
        print("received action with type: {}".format(action_type))
        print("and payload: {}".format(payload))
ask(action_type, **kwds)[source]
consumer_channel = None
consumer_pattern = None
handle_message(props, action_type=None, payload=None, **kwds)[source]
initial_offset = 'latest'
loop = None
producer_channel = None
send(payload='', action_type='', channel=None, **kwds)[source]

This method sends a message over the Kafka stream.

server = None
start()[source]

This method starts the broker's interaction with the Kafka stream.

stop()[source]

This method stops the broker's interaction with the Kafka stream.

Module contents