Source code for nautilus.network.events.consumers.kafka

# external imports
import asyncio
import uuid
import json
import re
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
# local imports
from nautilus.conventions.actions import serialize_action, hydrate_action



class KafkaBroker:
    """
        This class handles two way communication with the kafka server. It
        also provides a question/answer interface served over the kafka
        stream (see ``ask``).

        The broker is configured through class attributes, normally by
        subclassing and overriding ``handle_message``.

        Args:
            server (str): The location of the kafka stream.

            consumer_channel (optional, str): The channel to listen for
                events on.

            consumer_pattern (optional, regex): A regex pattern to match
                against the action types. The action handler is called for
                every matching event. If none is provided, the action
                handler is called for every action.

            producer_channel (optional, str): The default channel to use
                when producing events.

            initial_offset (optional, one of 'latest' or 'earliest'): Where
                to start on the event stream when run.

            loop (optional, asyncio event loop): The event loop that the
                broker should run on. Defaults to the current event loop.

        Example:

            .. code-block:: python

                from .kafka import KafkaBroker

                class ActionHandler(KafkaBroker):

                    consumer_channel = 'myEvents'
                    server = 'localhost:9092'

                    async def handle_message(self, props, action_type=None,
                                             payload=None, **kwds):
                        print("received action with type: {}".format(action_type))
                        print("and payload: {}".format(payload))
    """

    loop = None
    server = None
    consumer_channel = None
    producer_channel = None
    initial_offset = 'latest'
    consumer_pattern = None

    def __init__(self):
        # callbacks for questions that are still waiting on an answer,
        # keyed by the integer correlation id of the question
        self._request_handlers = {}
        # the action type each pending question was asked with, so the
        # broker can tell its own question apart from the reply to it
        self._pending_outbound = {}

        # if there is no loop assigned
        if not self.loop:
            # use the current one
            self.loop = asyncio.get_event_loop()

        # a placeholder for the event consumer task
        self._consumer_task = None

        # create a consumer instance
        self._consumer = AIOKafkaConsumer(
            self.consumer_channel,
            loop=self.loop,
            bootstrap_servers=self.server,
            auto_offset_reset=self.initial_offset
        )
        # and the matching producer
        self._producer = AIOKafkaProducer(
            loop=self.loop,
            bootstrap_servers=self.server
        )

    def start(self):
        """
            This function starts the broker's interaction with the kafka
            stream.

            The consumer and producer are started synchronously with
            ``run_until_complete``, so this has to be called while the loop
            is not already running. The consume task only makes progress
            once the loop is run afterwards.
        """
        self.loop.run_until_complete(self._consumer.start())
        self.loop.run_until_complete(self._producer.start())
        self._consumer_task = self.loop.create_task(
            self._consume_event_callback()
        )

    def stop(self):
        """
            This method stops the broker's interaction with the kafka
            stream. It is safe to call even if ``start`` was never called
            as far as the consume task is concerned.
        """
        # cancel the consume task before shutting the consumer down so it is
        # not left waiting on a consumer that is going away
        task, self._consumer_task = self._consumer_task, None
        if task is not None:
            task.cancel()

        self.loop.run_until_complete(self._consumer.stop())
        self.loop.run_until_complete(self._producer.stop())

    async def send(self, payload='', action_type='', channel=None, **kwds):
        """
            This method sends a message over the kafka stream.

            Args:
                payload: The payload of the action.
                action_type (str): The type of the action.
                channel (optional, str): The channel to publish on. Falls
                    back to ``producer_channel`` when not provided.
                **kwds: Extra fields passed on to ``serialize_action``.

            Returns:
                Whatever the producer's ``send`` resolves to.

            Raises:
                ValueError: if no channel was given and the broker has no
                    default ``producer_channel``.
        """
        # use a custom channel if one was provided
        channel = channel or self.producer_channel
        # without a channel there is nowhere to publish the message
        if not channel:
            raise ValueError(
                "No channel to send on: pass `channel` or set `producer_channel`."
            )

        # serialize the action for the stream
        message = serialize_action(
            action_type=action_type,
            payload=payload,
            **kwds
        )
        # send the message
        return await self._producer.send(channel, message.encode())

    async def ask(self, action_type, **kwds):
        """
            Publish a question over the stream and wait for its answer.

            A unique correlation id is attached to the outgoing action. The
            first incoming message that carries the same correlation id and
            a *different* action type is treated as the answer.

            Args:
                action_type (str): The action type of the question.
                **kwds: Passed on to ``send`` (payload, channel, ...).

            Returns:
                The payload of the answering message.
        """
        # create a correlation id for the question. The handlers are keyed
        # by the integer form, so uniqueness has to be checked on that form
        correlation_id = uuid.uuid4().int
        while correlation_id in self._request_handlers:
            correlation_id = uuid.uuid4().int

        # create the future on the broker's loop, which is where the consume
        # task resolves it
        question_future = self.loop.create_future()

        def resolve(payload):
            # the asker may have been cancelled (e.g. timed out) right
            # before the reply came in; resolving a finished future raises
            if not question_future.done():
                question_future.set_result(payload)

        # register the callback with the request handlers
        self._request_handlers[correlation_id] = resolve
        # add the entry to the outbound dictionary
        self._pending_outbound[correlation_id] = action_type

        try:
            # publish the question
            await self.send(
                correlation_id=correlation_id,
                action_type=action_type,
                **kwds
            )
            # return the response
            return await question_future
        finally:
            # forget the question whether it was answered, failed to send,
            # or was cancelled so abandoned questions do not pile up
            self._request_handlers.pop(correlation_id, None)
            self._pending_outbound.pop(correlation_id, None)

    ## internal implementations

    async def handle_message(self, props, action_type=None, payload=None, **kwds):
        """
            Handle an incoming action. Subclasses must override this.

            Args:
                props (dict): Message properties; currently holds the
                    ``correlation_id`` of the message (possibly ``None``).
                action_type: The ``action_type`` field of the message.
                payload: The ``payload`` field of the message.
                **kwds: Any remaining fields of the hydrated message.
        """
        raise NotImplementedError()

    async def _consume_event_callback(self):
        """ Continuously pull messages off the consumer and dispatch them. """
        # continuously loop
        while True:
            # grab the next message
            msg = await self._consumer.getone()
            # hydrate the raw bytes into a message dictionary
            message = hydrate_action(msg.value.decode())
            # route it to whoever is interested
            await self._dispatch_message(message)

    async def _dispatch_message(self, message):
        """ Route one hydrated message to a pending question or to ``handle_message``. """
        # the correlation_id associated with this message
        correlation_id = message.get('correlation_id')
        # the action type of the message
        action_type = message['action_type']

        # if there is a consumer pattern that the action type does not satisfy
        if self.consumer_pattern and not re.match(self.consumer_pattern, action_type):
            # don't do anything
            return

        # look for a question waiting on this correlation id
        handler = self._request_handlers.get(correlation_id) if correlation_id else None
        # a message with the action type the question was asked with is the
        # question itself coming back over the stream, not the answer
        is_answer = (
            handler is not None
            and action_type != self._pending_outbound.get(correlation_id)
        )

        if is_answer:
            payload = message['payload']
            # remove the entries so the question is answered only once
            self._request_handlers.pop(correlation_id, None)
            self._pending_outbound.pop(correlation_id, None)
            # pass the payload to the handler
            handler(payload)
        # otherwise nobody asked for it, pass it along to the general handler
        else:
            # build the dictionary of message properties
            message_props = {
                'correlation_id': correlation_id
            }
            # pass it to the handler
            await self.handle_message(
                props=message_props,
                **message
            )