#!/usr/bin/env python3
"""Minimal HTTP server for netboot artifacts with optional CIDR/token controls."""

from __future__ import annotations

import argparse
import hmac
import ipaddress
import os
import pathlib
import socket
import urllib.parse
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import BinaryIO, Union

# Request path (without the leading slash) -> Content-Type of the response.
# Any path that is not a key here is answered with 404, so this mapping is
# also the whitelist of what can be requested at all.
ALLOWED_FILES = {
    "kernel": "application/octet-stream",
    "bzImage": "application/octet-stream",
    "initrd": "application/octet-stream",
    "netboot.ipxe": "text/plain; charset=utf-8",
}

Network = Union[ipaddress.IPv4Network, ipaddress.IPv6Network]

# Read-buffer size used while streaming a file body to the client.
_CHUNK_SIZE = 64 * 1024


def _is_ipv6_literal(host: str) -> bool:
    """Return True when *host* is a literal IPv6 address (not a name or IPv4)."""
    try:
        return ipaddress.ip_address(host).version == 6
    except ValueError:
        return False


class NetbootHTTPServer(ThreadingHTTPServer):
    """Threaded HTTP server that carries the netboot settings for its handlers.

    Request handlers reach these settings through ``self.server``:

    * ``root`` -- directory the artifacts are read from.
    * ``allowed_networks`` -- client networks permitted to connect; an empty
      list disables the check.
    * ``netboot_token`` -- token required for ``netboot.ipxe``; ``None``
      disables the check.
    """

    def __init__(
        self,
        server_address: tuple[str, int],
        root: pathlib.Path,
        allowed_networks: list[Network],
        netboot_token: str | None,
    ) -> None:
        # The listening socket is created inside ``super().__init__`` using
        # ``self.address_family``, whose default is IPv4. Select IPv6 first
        # when the bind address is an IPv6 literal, otherwise the bind fails.
        if _is_ipv6_literal(server_address[0]):
            self.address_family = socket.AF_INET6
        super().__init__(server_address, NetbootRequestHandler)
        self.root = root
        self.allowed_networks = allowed_networks
        self.netboot_token = netboot_token


class NetbootRequestHandler(BaseHTTPRequestHandler):
    """Serve the whitelisted netboot files for GET and HEAD requests."""

    server_version = "ec-netboot/1.0"
    protocol_version = "HTTP/1.1"

    def do_GET(self) -> None:
        """Answer a GET request with headers and the file body."""
        self._serve_file(include_body=True)

    def do_HEAD(self) -> None:
        """Answer a HEAD request with the same headers as GET but no body."""
        self._serve_file(include_body=False)

    def _serve_file(self, include_body: bool) -> None:
        """Validate the request and send the requested artifact.

        Checks run in this order, each ending the request on failure:
        client network (403), request-target syntax (400), path whitelist
        (404), token for ``netboot.ipxe`` (403), file availability (404).

        Args:
            include_body: Send the file content after the headers when True;
                send headers only when False.
        """
        if not self._client_allowed():
            self._send_error(HTTPStatus.FORBIDDEN, "client not allowed")
            return

        try:
            parsed = urllib.parse.urlparse(self.path)
        except ValueError:
            # urlparse rejects some targets outright (for example an
            # unbalanced "[" in the authority part); report it to the client
            # instead of letting the exception escape the handler.
            self._send_error(HTTPStatus.BAD_REQUEST, "malformed request target")
            return

        path = parsed.path.lstrip("/")
        if path not in ALLOWED_FILES:
            self._send_error(HTTPStatus.NOT_FOUND, "file not found")
            return

        if path == "netboot.ipxe" and not self._token_ok(parsed.query):
            self._send_error(HTTPStatus.FORBIDDEN, "missing or invalid token")
            return

        # "bzImage" is served from the same on-disk file as "kernel".
        file_name = "kernel" if path == "bzImage" else path
        handle = self._open_artifact(self.server.root / file_name)
        if handle is None:
            self._send_error(HTTPStatus.NOT_FOUND, "file not found")
            return

        with handle:
            # Size the file through the open descriptor so Content-Length
            # describes the very file that is about to be streamed.
            size = os.fstat(handle.fileno()).st_size
            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", ALLOWED_FILES[path])
            self.send_header("Content-Length", str(size))
            self.send_header("Cache-Control", "no-store")
            self.end_headers()
            if include_body:
                self._stream(handle, size)

    def _token_ok(self, query_string: str) -> bool:
        """Return True when the ``token`` query parameter matches the server token.

        Always True when the server has no token configured. Only the first
        ``token`` value in the query string is considered.
        """
        expected = self.server.netboot_token
        if not expected:
            return True
        query = urllib.parse.parse_qs(query_string, keep_blank_values=True)
        supplied = query.get("token", [""])[0]
        # Constant-time comparison. compare_digest refuses non-ASCII str, so
        # compare bytes; "surrogatepass" keeps the encoding total and
        # one-to-one, which preserves plain string-equality semantics.
        return hmac.compare_digest(
            supplied.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )

    @staticmethod
    def _open_artifact(file_path: pathlib.Path) -> BinaryIO | None:
        """Open *file_path* for binary reading, or return None if unavailable.

        Anything that is not a regular file, and any OS-level failure while
        checking or opening it, yields None so the caller can answer 404
        before any response header has been written.
        """
        try:
            if not file_path.is_file():
                return None
            return file_path.open("rb")
        except OSError:
            return None

    def _stream(self, handle: BinaryIO, size: int) -> None:
        """Write at most *size* bytes from *handle* to the client.

        The cap keeps the body in step with the Content-Length already sent
        even if the file grows while it is being read.
        """
        remaining = size
        while remaining > 0:
            chunk = handle.read(min(_CHUNK_SIZE, remaining))
            if not chunk:
                # The file shrank after the headers went out, so the body is
                # shorter than advertised; the connection cannot be reused.
                self.close_connection = True
                break
            self.wfile.write(chunk)
            remaining -= len(chunk)

    def _client_allowed(self) -> bool:
        """Return True when the peer address is inside an allowed network.

        An empty network list allows every client. A peer address that
        cannot be parsed is rejected. For an IPv4-mapped IPv6 peer
        (``::ffff:a.b.c.d``) the embedded IPv4 address is checked as well,
        so IPv4 CIDRs still apply to such peers.
        """
        networks = self.server.allowed_networks
        if not networks:
            return True
        try:
            client_ip = ipaddress.ip_address(self.client_address[0])
        except ValueError:
            return False
        candidates = [client_ip]
        # Only IPv6 addresses have ``ipv4_mapped``; it is None unless the
        # address really is an IPv4-mapped one.
        mapped = getattr(client_ip, "ipv4_mapped", None)
        if mapped is not None:
            candidates.append(mapped)
        return any(ip in network for ip in candidates for network in networks)

    def _send_error(self, code: HTTPStatus, message: str) -> None:
        """Send *message* plus a newline as a plain-text response with status *code*."""
        payload = (message + "\n").encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.send_header("Cache-Control", "no-store")
        self.end_headers()
        self.wfile.write(payload)


def parse_args() -> argparse.Namespace:
    """Parse the command line.

    ``--bind-ip``, ``--port`` and ``--root`` are required. ``--allow-cidr``
    may be repeated and defaults to an empty list; ``--netboot-token``
    defaults to the empty string.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--bind-ip", required=True)
    parser.add_argument("--port", type=int, required=True)
    parser.add_argument("--root", required=True)
    parser.add_argument("--allow-cidr", action="append", default=[])
    parser.add_argument("--netboot-token", default="")
    return parser.parse_args()


def main() -> None:
    """Validate the arguments, build the server and serve until interrupted.

    Raises:
        SystemExit: If the root directory does not exist or a CIDR is invalid.
    """
    args = parse_args()
    root = pathlib.Path(args.root).resolve()
    if not root.is_dir():
        raise SystemExit(f"error: root directory does not exist: {root}")

    allowed_networks: list[Network] = []
    for cidr in args.allow_cidr:
        try:
            # strict=False accepts a CIDR whose host bits are set.
            allowed_networks.append(ipaddress.ip_network(cidr, strict=False))
        except ValueError as exc:
            raise SystemExit(f"error: invalid CIDR '{cidr}': {exc}") from exc

    # An empty token on the command line means "no token check".
    token = args.netboot_token or None
    server = NetbootHTTPServer((args.bind_ip, args.port), root, allowed_networks, token)
    try:
        server.serve_forever()
    finally:
        # Release the listening socket however the serve loop ends.
        server.server_close()


if __name__ == "__main__":
    main()