# external imports
import aiohttp_cors
from collections.abc import Callable
from collections import defaultdict
import json
import functools
# local imports
from nautilus.auth.models import UserPassword
import nautilus.database
import nautilus.api.endpoints.requestHandlers.apiQuery as api_query
import nautilus.network.events.consumers.api as api_handler
from nautilus.conventions.services import api_gateway_name
from nautilus.conventions.actions import roll_call_type
from nautilus.conventions.actions import get_crud_action
from nautilus.conventions.api import root_query
from nautilus.auth.util import generate_session_token, read_session_token
from nautilus.api.endpoints import static_dir as api_endpoint_static
from nautilus.api.util import query_for_model, arg_string_from_dict
from .service import Service
from nautilus.api.util import GraphEntity
from nautilus.api.util import parse_string
from nautilus.api.endpoints import (
GraphiQLRequestHandler,
GraphQLRequestHandler
)
[docs]class APIGateway(Service):
"""
This provides a single endpoint that other services and clients can
use to query the cloud without worrying about the distributed nature
of the system.
Example:
.. code-block:: python
# external imports
import nautilus
# local imports
from .schema import schema
class MyAPIGateway(nautilus.APIGateway):
schema = schema
"""
name = api_gateway_name()
model = UserPassword
api_request_handler_class = api_query.APIQueryHandler
action_handler = api_handler.APIActionHandler
_external_service_data = defaultdict(list)
secret_key = None
def __init__(self, *args, **kwds):
# bubble up
super().__init__(*args, **kwds)
# attach this service to the action handler
self.action_handler.service = self
# do any sort of database setup
self.init_db()
# make sure there is a valid secret key
[docs] def init_db(self):
"""
This function configures the database used for models to make
the configuration parameters.
"""
# get the database url from the configuration
db_url = self.config.get('database_url', 'sqlite:///passwords.db')
# configure the nautilus database to the url
nautilus.database.init_db(db_url)
# when its time for the service to announce itself
[docs] async def announce(self):
# bubble up
await super().announce()
# ask for rollcall aswell
await self.event_broker.send(
action_type=roll_call_type(),
payload='please report yourself'
)
@property
def auth_criteria(self):
"""
This attribute provides the mapping of services to their auth requirement
Returns:
(dict) : the mapping from services to their auth requirements.
"""
# the dictionary we will return
auth = {}
# go over each attribute of the service
for attr in dir(self):
# make sure we could hit an infinite loop
if attr != 'auth_criteria':
# get the actual attribute
attribute = getattr(self, attr)
# if the service represents an auth criteria
if isinstance(attribute, Callable) and hasattr(attribute, '_service_auth'):
# add the criteria to the final results
auth[getattr(self, attr)._service_auth] = attribute
# return the auth mapping
return auth
[docs] def init_routes(self):
# add the cors handler
self.cors = aiohttp_cors.setup(self.app)
# for each route that was registered
for route in self._routes:
# add the corresponding http endpoint
self.add_http_endpoint(**route)
# add the schema reference to graphql handler
self.api_request_handler_class.service = self
# add a cors resource
api_resource = self.cors.add(self.app.router.add_resource("/"))
# add the root api handler
self.cors.add(
api_resource.add_route("GET", self.api_request_handler_class),
{
"": aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers=("X-Custom-Server-Header",),
allow_headers=("X-Requested-With", "Content-Type"),
max_age=3600,
)
}
)
# add the static file urls
self.app.router.add_static('/graphiql/static/', api_endpoint_static)
# add the graphiql endpoint
self.add_http_endpoint('/graphiql', GraphiQLRequestHandler)
[docs] async def login_user(self, password, **kwds):
"""
This function handles the registration of the given user credentials in the database
"""
# find the matching user with the given email
user_data = (await self._get_matching_user(fields=list(kwds.keys()), **kwds))['data']
try:
# look for a matching entry in the local database
passwordEntry = self.model.select().where(
self.model.user == user_data[root_query()][0]['pk']
)[0]
# if we couldn't acess the id of the result
except (KeyError, IndexError) as e:
# yell loudly
raise RuntimeError('Could not find matching registered user')
# if the given password matches the stored hash
if passwordEntry and passwordEntry.password == password:
# the remote entry for the user
user = user_data[root_query()][0]
# then return a dictionary with the user and sessionToken
return {
'user': user,
'sessionToken': self._user_session_token(user)
}
# otherwise the passwords don't match
raise RuntimeError("Incorrect credentials")
[docs] async def register_user(self, password, **kwds):
"""
This function is used to provide a sessionToken for later requests.
Args:
uid (str): The
"""
# so make one
user = await self._create_remote_user(password=password, **kwds)
# if there is no pk field
if not 'pk' in user:
# make sure the user has a pk field
user['pk'] = user['id']
# the query to find a matching query
match_query = self.model.user == user['id']
# if the user has already been registered
if self.model.select().where(match_query).count() > 0:
# yell loudly
raise RuntimeError('The user is already registered.')
# create an entry in the user password table
password = self.model(user=user['id'], password=password)
# save it to the database
password.save()
# return a dictionary with the user we created and a session token for later use
return {
'user': user,
'sessionToken': self._user_session_token(user)
}
[docs] async def object_resolver(self, object_name, fields, obey_auth=False, current_user=None, **filters):
"""
This function resolves a given object in the remote backend services
"""
try:
# check if an object with that name has been registered
registered = [model for model in self._external_service_data['models'] \
if model['name']==object_name][0]
# if there is no connection data yet
except AttributeError:
raise ValueError("No objects are registered with this schema yet.")
# if we dont recognize the model that was requested
except IndexError:
raise ValueError("Cannot query for object {} on this service.".format(object_name))
# the valid fields for this object
valid_fields = [field['name'] for field in registered['fields']]
# figure out if any invalid fields were requested
invalid_fields = [field for field in fields if field not in valid_fields]
try:
# make sure we never treat pk as invalid
invalid_fields.remove('pk')
# if they weren't asking for pk as a field
except ValueError:
pass
# if there were
if invalid_fields:
# yell loudly
raise ValueError("Cannot query for fields {!r} on {}".format(
invalid_fields, registered['name']
))
# make sure we include the id in the request
fields.append('pk')
# the query for model records
query = query_for_model(fields, **filters)
# the action type for the question
action_type = get_crud_action('read', object_name)
# query the appropriate stream for the information
response = await self.event_broker.ask(
action_type=action_type,
payload=query
)
# treat the reply like a json object
response_data = json.loads(response)
# if something went wrong
if 'errors' in response_data and response_data['errors']:
# return an empty response
raise ValueError(','.join(response_data['errors']))
# grab the valid list of matches
result = response_data['data'][root_query()]
# grab the auth handler for the object
auth_criteria = self.auth_criteria.get(object_name)
# if we care about auth requirements and there is one for this object
if obey_auth and auth_criteria:
# build a second list of authorized entries
authorized_results = []
# for each query result
for query_result in result:
# create a graph entity for the model
graph_entity = GraphEntity(self, model_type=object_name, id=query_result['pk'])
# if the auth handler passes
if await auth_criteria(model=graph_entity, user_id=current_user):
# add the result to the final list
authorized_results.append(query_result)
# overwrite the query result
result = authorized_results
# apply the auth handler to the result
return result
[docs] def user_session(self, user):
"""
This method handles what information the api gateway stores about
a particular user in their session.
"""
return {
'id': user['pk']
}
[docs] async def connection_resolver(self, connection_name, object):
try:
# grab the recorded data for this connection
expected = [ conn for conn in self._external_service_data['connections']\
if conn['name'] == connection_name][0]
# if there is no connection data yet
except AttributeError:
raise ValueError("No objects are registered with this schema yet.")
# if we dont recognize the model that was requested
except IndexError:
raise ValueError("Cannot query for {} on {}.".format(connection_name, object['name']))
# the target of the connection
to_service = expected['connection']['to']['service']
# ask for only the entries connected to the object
filters = {object['name']: object['pk']}
# the field of the connection is the model name
fields = [to_service]
# the query for model records
query = query_for_model(fields, **filters).replace("'", '"')
# the action type for the question
action_type = get_crud_action('read', connection_name)
# get the service name for the connection
response = json.loads(await self.event_broker.ask(
action_type=action_type,
payload=query
))
if 'errors' in response and response['errors']:
# return an empty response
raise ValueError(','.join(response['errors']))
# grab the ids from the response
ids = [int(entry[to_service]) for entry in response['data']['all_models']]
# the question for connected nodes
return ids, to_service
[docs] async def mutation_resolver(self, mutation_name, args, fields):
"""
the default behavior for mutations is to look up the event,
publish the correct event type with the args as the body,
and return the fields contained in the result
"""
try:
# make sure we can identify the mutation
mutation_summary = [mutation for mutation in \
self._external_service_data['mutations'] \
if mutation['name'] == mutation_name][0]
# if we couldn't get the first entry in the list
except KeyError as e:
# make sure the error is reported
raise ValueError("Could not execute mutation named: " + mutation_name)
# the function to use for running the mutation depends on its schronicity
# event_function = self.event_broker.ask \
# if mutation_summary['isAsync'] else self.event_broker.send
event_function = self.event_broker.ask
# send the event and wait for a response
value = await event_function(
action_type=mutation_summary['event'],
payload=args
)
try:
# return a dictionary with the values we asked for
return json.loads(value)
# if the result was not valid json
except json.decoder.JSONDecodeError:
# just throw the value
raise RuntimeError(value)
[docs] def get_models(self):
"""
Returns the models managed by this service.
Returns:
(list): the models managed by the service
"""
return [self.model]
## internal utilities
def _user_session_token(self, user):
# grab the session for this particular user
user_session = self.user_session(user)
# return the token signed by the services secret key
return generate_session_token(self.secret_key, **user_session)
def _read_session_token(self, token):
# make sure the token is valid while we're at it
return read_session_token(self.secret_key, token)
async def _get_matching_user(self, fields=[], **filters):
# the action type for a remote query
read_action = get_crud_action(method='read', model='user')
# the fields of the user to ask for
user_fields = ['pk'] + fields
# the query for matching entries
payload = """
query {
%s(%s) {
%s
}
}
""" % (root_query(), arg_string_from_dict(filters), '\n'.join(user_fields))
# perform the query and return the result
return json.loads(await self.event_broker.ask(
action_type=read_action,
payload=payload
))
async def _check_for_matching_user(self, **user_filters):
"""
This function checks if there is a user with the same uid in the
remote user service
Args:
**kwds : the filters of the user to check for
Returns:
(bool): wether or not there is a matching user
"""
# there is a matching user if there are no errors and no results from
user_data = self._get_matching_user(user_filters)
# return true if there were no errors and at lease one result
return not user_data['errors'] and len(user_data['data'][root_query()])
async def _create_remote_user(self, **payload):
"""
This method creates a service record in the remote user service
with the given email.
Args:
uid (str): the user identifier to create
Returns:
(dict): a summary of the user that was created
"""
# the action for reading user entries
read_action = get_crud_action(method='create', model='user')
# see if there is a matching user
user_data = await self.event_broker.ask(
action_type=read_action,
payload=payload
)
# treat the reply like a json object
return json.loads(user_data)