from __future__ import annotations import ipaddress import logging import quart import trio import capport.comm.hub import capport.comm.message import capport.database import capport.utils.cli import capport.utils.ipneigh from capport import cptypes from .app import app from .lang import render_i18n_template _logger = logging.getLogger(__name__) def get_client_ip() -> cptypes.IPAddress: remote_addr = quart.request.remote_addr if not remote_addr: quart.abort(500, 'Missing client address') try: addr = ipaddress.ip_address(remote_addr) except ValueError as e: _logger.warning(f'Invalid client address {remote_addr!r}: {e}') quart.abort(500, 'Invalid client address') return addr async def get_client_mac_if_present( address: cptypes.IPAddress | None = None, ) -> cptypes.MacAddress | None: assert app.my_nc # for mypy if not address: address = get_client_ip() return await app.my_nc.get_neighbor_mac(address) async def get_client_mac(address: cptypes.IPAddress | None = None) -> cptypes.MacAddress: mac = await get_client_mac_if_present(address) if mac is None: _logger.warning(f"Couldn't find MAC addresss for {address}") quart.abort(404, 'Unknown client') return mac async def user_login(address: cptypes.IPAddress, mac: cptypes.MacAddress) -> None: assert app.my_hub # for mypy async with app.my_hub.database.make_changes() as pu: try: pu.login(mac, app.my_config.session_timeout) except capport.database.NotReadyYet as e: quart.abort(500, str(e)) if pu: _logger.debug(f'User {mac} (with IP {address}) logged in') for msg in pu.serialized: await app.my_hub.broadcast(msg) async def user_logout(mac: cptypes.MacAddress) -> None: assert app.my_hub # for mypy async with app.my_hub.database.make_changes() as pu: try: pu.logout(mac) except capport.database.NotReadyYet as e: quart.abort(500, str(e)) if pu: _logger.debug(f'User {mac} logged out') for msg in pu.serialized: await app.my_hub.broadcast(msg) async def user_lookup() -> cptypes.MacPublicState: assert app.my_hub # for mypy address = get_client_ip() mac = await get_client_mac_if_present(address) if not mac: return cptypes.MacPublicState.from_missing_mac(address) else: return app.my_hub.database.lookup(address, mac) # @app.route('/all') # async def route_all(): # return app.my_hub.database.as_json() def check_self_origin(): origin = quart.request.headers.get('Origin', None) if origin is None: # not a request by a modern browser - probably curl or something similar. don't care. return origin = origin.lower().strip() if origin == 'none': quart.abort(403, 'Origin is none') origin_parts = origin.split('/') # Origin should look like: :// (optionally followed by :) if len(origin_parts) < 3: quart.abort(400, 'Broken Origin header') if origin_parts[0] != 'https:' and not app.my_config.debug: # -> require https in production quart.abort(403, 'Non-https Origin not allowed') origin_host = origin_parts[2] host = quart.request.headers.get('Host', None) if host is None: quart.abort(403, 'Missing Host header') if host.lower() != origin_host: quart.abort(403, 'Origin mismatch') @app.route('/', methods=['GET']) async def index(missing_accept: bool = False): state = await user_lookup() if not state.mac: return await render_i18n_template('index_unknown.html', state=state, missing_accept=missing_accept) elif state.allowed: return await render_i18n_template('index_active.html', state=state, missing_accept=missing_accept) else: return await render_i18n_template('index_inactive.html', state=state, missing_accept=missing_accept) @app.route('/login', methods=['POST']) async def login(): check_self_origin() with trio.fail_after(5.0): form = await quart.request.form if form.get('accept') != '1': return await index(missing_accept=True) req_mac = form.get('mac') if not req_mac: quart.abort(400, description='Missing MAC in request form data') address = get_client_ip() mac = await get_client_mac(address) if str(mac) != req_mac: quart.abort(403, description="Passed MAC in request form doesn't match client address") await user_login(address, mac) return quart.redirect('/', code=303) @app.route('/logout', methods=['POST']) async def logout(): check_self_origin() with trio.fail_after(5.0): form = await quart.request.form req_mac = form.get('mac') if not req_mac: quart.abort(400, description='Missing MAC in request form data') mac = await get_client_mac() if str(mac) != req_mac: quart.abort(403, description="Passed MAC in request form doesn't match client address") await user_logout(mac) return quart.redirect('/', code=303) @app.route('/api/captive-portal', methods=['GET']) # RFC 8908: https://datatracker.ietf.org/doc/html/rfc8908 async def captive_api(): state = await user_lookup() return state.to_rfc8908(app.my_config)