from __future__ import annotations import ipaddress import logging import typing import capport.database import capport.comm.hub import capport.comm.message import capport.utils.cli import capport.utils.ipneigh import quart from capport import cptypes from .app import app _logger = logging.getLogger(__name__) def get_client_ip() -> cptypes.IPAddress: try: addr = ipaddress.ip_address(quart.request.remote_addr) except ValueError as e: _logger.warning(f'Invalid client address {quart.request.remote_addr!r}: {e}') quart.abort(500, 'Invalid client address') if addr.is_loopback: forw_addr_headers = quart.request.headers.getlist('X-Forwarded-For') if len(forw_addr_headers) == 1: try: return ipaddress.ip_address(forw_addr_headers[0]) except ValueError as e: _logger.warning(f'Invalid forwarded client address {forw_addr_headers!r} (from {addr}): {e}') quart.abort(500, 'Invalid client address') elif forw_addr_headers: _logger.warning(f'Multiple forwarded client addresses {forw_addr_headers!r} (from {addr})') quart.abort(500, 'Invalid client address') return addr async def get_client_mac_if_present(address: typing.Optional[cptypes.IPAddress]=None) -> typing.Optional[cptypes.MacAddress]: assert app.my_nc # for mypy if not address: address = get_client_ip() return await app.my_nc.get_neighbor_mac(address) async def get_client_mac(address: typing.Optional[cptypes.IPAddress]=None) -> cptypes.MacAddress: mac = await get_client_mac_if_present(address) if mac is None: _logger.warning(f"Couldn't find MAC addresss for {address}") quart.abort(404, 'Unknown client') return mac async def user_login(address: cptypes.IPAddress, mac: cptypes.MacAddress) -> None: assert app.my_hub # for mypy pu = capport.database.PendingUpdates() try: app.my_hub.database.login(mac, app.my_config.session_timeout, pending_updates=pu) except capport.database.NotReadyYet as e: quart.abort(500, str(e)) if pu.macs: _logger.info(f'User {mac} (with IP {address}) logged in') for msg in pu.serialize(): await app.my_hub.broadcast(msg) async def user_logout(mac: cptypes.MacAddress) -> None: assert app.my_hub # for mypy pu = capport.database.PendingUpdates() try: app.my_hub.database.logout(mac, pending_updates=pu) except capport.database.NotReadyYet as e: quart.abort(500, str(e)) if pu.macs: _logger.info(f'User {mac} logged out') for msg in pu.serialize(): await app.my_hub.broadcast(msg) async def user_lookup() -> cptypes.MacPublicState: assert app.my_hub # for mypy address = get_client_ip() mac = await get_client_mac_if_present(address) if not mac: return cptypes.MacPublicState.from_missing_mac(address) else: return app.my_hub.database.lookup(address, mac) # @app.route('/all') # async def route_all(): # return app.my_hub.database.as_json() @app.route('/', methods=['GET']) async def index(): state = await user_lookup() return await quart.render_template('index.html', state=state) @app.route('/login', methods=['POST']) async def login(): address = get_client_ip() mac = await get_client_mac(address) await user_login(address, mac) await quart.flash('Logged in') return quart.redirect('/', code=303) @app.route('/logout', methods=['POST']) async def logout(): mac = await get_client_mac() await user_logout(mac) await quart.flash('Logged out') return quart.redirect('/', code=303) @app.route('/api/captive-portal', methods=['GET']) # RFC 8908: https://datatracker.ietf.org/doc/html/rfc8908 async def captive_api(): state = await user_lookup() return state.to_rfc8908(app.my_config)