nautilus.network.events.consumers package¶
Submodules¶
nautilus.network.events.consumers.actions module¶
nautilus.network.events.consumers.api module¶
-
class
nautilus.network.events.consumers.api.APIActionHandler[source]¶ Bases:
nautilus.network.events.consumers.actions.ActionHandlerThis action handler is used by the api service to build a schema of the underlying services as they announce their existence over the action system.
-
consumer_pattern= '(.*\\..*\\.(?!(pending)))|init|query'¶
-
nautilus.network.events.consumers.kafka module¶
-
class
nautilus.network.events.consumers.kafka.KafkaBroker[source]¶ Bases:
objectThis class handles two way communication with the kafka server. Also allows for a question/answer interface served over the kafka stream.
Parameters: - = None (consumer_pattern) –
- server (str) – The location of the kafka stream.
- consumer_channel (optional, str) – The channel to listen for events on.
- consumer_pattern (optional, regex) – A regex pattern to match against the action types. The action handler is called for every matching event. If none is provided, the action handler is called for every action.
- producer_channel (optional, str) – The default channel to user when producing events.
- initial_offset (optional, one of 'latest' or 'earliest') – Where to start on the event stream when run.
- loop (optional, ayncio.EventLoop) – The event loop that the broker should run on.
Example
from .kafka import KafkaBroker class ActionHandler(KafkaBroker): consumer_channel = 'myEvents' server = 'localhost:9092' async def handle_message(self, action_type, payload, **kwds): print("recieved action with type: {}".format(action_type)) print("and payload: {}".format(payload))
-
consumer_channel= None¶
-
consumer_pattern= None¶
-
initial_offset= 'latest'¶
-
loop= None¶
-
producer_channel= None¶
-
send(payload='', action_type='', channel=None, **kwds)[source]¶ This method sends a message over the kafka stream.
-
server= None¶