#!/usr/bin/env python3
|
|
"""Minimal HTTP server for netboot artifacts with optional CIDR/token controls."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
import hmac
import ipaddress
import os
import pathlib
import socket
import urllib.parse
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Union
|
|
|
|
|
|
# Content type shared by the opaque boot binaries.
_BINARY_CONTENT_TYPE = "application/octet-stream"

# Request names the server is willing to answer, mapped to the Content-Type
# header sent with each one. Any other request path gets a 404.
ALLOWED_FILES = {
    "kernel": _BINARY_CONTENT_TYPE,
    "bzImage": _BINARY_CONTENT_TYPE,
    "initrd": _BINARY_CONTENT_TYPE,
    "netboot.ipxe": "text/plain; charset=utf-8",
}

# Either flavour of network object returned by ipaddress.ip_network().
Network = Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
|
|
|
|
|
|
class NetbootHTTPServer(ThreadingHTTPServer):
    """Threaded HTTP server that carries the netboot configuration.

    Request handlers reach this object as ``self.server`` and read the
    ``root``, ``allowed_networks`` and ``netboot_token`` attributes from it.
    """

    def __init__(
        self,
        server_address: tuple[str, int],
        root: pathlib.Path,
        allowed_networks: list[Network],
        netboot_token: str | None,
    ) -> None:
        """Bind the listening socket and store the serving configuration.

        Args:
            server_address: ``(host, port)`` to listen on. When ``host`` is
                an IPv6 literal the socket is created as ``AF_INET6``;
                otherwise the inherited default family is used.
            root: Directory the served files are read from.
            allowed_networks: Client networks permitted to connect; an
                empty list disables the check.
            netboot_token: Token required for ``netboot.ipxe``, or ``None``
                to serve it without one.

        Raises:
            OSError: If the address cannot be bound.
        """
        # The base constructor creates the socket from self.address_family,
        # so the family has to be chosen before calling it. Without this an
        # IPv6 bind address cannot be used at all.
        try:
            bind_ip = ipaddress.ip_address(server_address[0])
        except ValueError:
            # Host name or empty string: keep the default address family.
            bind_ip = None
        if bind_ip is not None and bind_ip.version == 6:
            self.address_family = socket.AF_INET6

        super().__init__(server_address, NetbootRequestHandler)
        self.root = root
        self.allowed_networks = allowed_networks
        self.netboot_token = netboot_token
|
|
|
|
|
|
class NetbootRequestHandler(BaseHTTPRequestHandler):
    """Serve the allowlisted netboot files over GET and HEAD.

    Configuration is read from the owning server object: ``server.root``
    (directory holding the files), ``server.allowed_networks`` (client
    allowlist, empty means unrestricted) and ``server.netboot_token``
    (token required for ``netboot.ipxe`` when set).
    """

    server_version = "ec-netboot/1.0"
    # HTTP/1.1 keeps connections open between requests, so every response
    # must carry an exact Content-Length and must not emit stray body bytes.
    protocol_version = "HTTP/1.1"

    # Number of bytes read from disk per write to the client.
    _CHUNK_SIZE = 64 * 1024

    def do_GET(self) -> None:
        """Answer a GET request with headers and the file contents."""
        self._serve_file(include_body=True)

    def do_HEAD(self) -> None:
        """Answer a HEAD request with the same headers as GET but no body."""
        self._serve_file(include_body=False)

    def _serve_file(self, include_body: bool) -> None:
        """Validate the request and send the requested file.

        Responds 403 when the client address is not allowed or the
        ``netboot.ipxe`` token is missing or wrong, and 404 when the name is
        not in ``ALLOWED_FILES`` or the file cannot be opened.

        Args:
            include_body: ``True`` to stream the file after the headers,
                ``False`` to send the headers only.
        """
        if not self._client_allowed():
            self._send_error(HTTPStatus.FORBIDDEN, "client not allowed")
            return

        parsed = urllib.parse.urlparse(self.path)
        name = parsed.path.lstrip("/")
        if name not in ALLOWED_FILES:
            self._send_error(HTTPStatus.NOT_FOUND, "file not found")
            return

        if name == "netboot.ipxe" and not self._token_valid(parsed.query):
            self._send_error(HTTPStatus.FORBIDDEN, "missing or invalid token")
            return

        # "bzImage" is an alias for the file stored on disk as "kernel".
        on_disk_name = "kernel" if name == "bzImage" else name
        file_path = self.server.root / on_disk_name
        if not file_path.is_file():
            self._send_error(HTTPStatus.NOT_FOUND, "file not found")
            return

        try:
            handle = file_path.open("rb")
        except OSError:
            # The file vanished or became unreadable after the check above.
            self._send_error(HTTPStatus.NOT_FOUND, "file not found")
            return

        with handle:
            # Take the size from the descriptor that will actually be read,
            # so a file replaced between a path-based stat() and open()
            # cannot produce a Content-Length for the wrong file.
            size = os.fstat(handle.fileno()).st_size

            self.send_response(HTTPStatus.OK)
            self.send_header("Content-Type", ALLOWED_FILES[name])
            self.send_header("Content-Length", str(size))
            self.send_header("Cache-Control", "no-store")
            self.end_headers()

            if include_body:
                self._send_body(handle, size)

    def _send_body(self, handle, size: int) -> None:
        """Copy exactly ``size`` bytes from ``handle`` to the client.

        Args:
            handle: Binary file object positioned at the start of the data.
            size: Number of bytes announced in Content-Length.
        """
        remaining = size
        while remaining > 0:
            chunk = handle.read(min(self._CHUNK_SIZE, remaining))
            if not chunk:
                # The file shrank after the headers went out. The announced
                # length can no longer be met, so the connection must not be
                # reused for another request.
                self.close_connection = True
                return
            self.wfile.write(chunk)
            remaining -= len(chunk)

    def _token_valid(self, query: str) -> bool:
        """Return whether ``query`` carries the configured netboot token.

        Always ``True`` when the server has no token configured.

        Args:
            query: Raw query string of the request URL.
        """
        expected = self.server.netboot_token
        if not expected:
            return True

        params = urllib.parse.parse_qs(query, keep_blank_values=True)
        supplied = params.get("token", [""])[0]
        # Constant-time comparison so response timing does not reveal how
        # much of a guessed token matched. Bytes are compared because
        # compare_digest() rejects non-ASCII str arguments.
        return hmac.compare_digest(
            supplied.encode("utf-8", "surrogatepass"),
            expected.encode("utf-8", "surrogatepass"),
        )

    def _client_allowed(self) -> bool:
        """Return whether the peer address falls in an allowed network.

        An empty allowlist permits every client. An address that cannot be
        parsed is rejected. An IPv4-mapped IPv6 peer (``::ffff:a.b.c.d``) is
        also checked as its embedded IPv4 address.
        """
        networks = self.server.allowed_networks
        if not networks:
            return True

        try:
            client_ip = ipaddress.ip_address(self.client_address[0])
        except ValueError:
            return False

        candidates = [client_ip]
        # Only IPv6Address has ipv4_mapped; it is None for ordinary addresses.
        mapped = getattr(client_ip, "ipv4_mapped", None)
        if mapped is not None:
            candidates.append(mapped)

        return any(ip in network for network in networks for ip in candidates)

    def _send_error(self, code: HTTPStatus, message: str) -> None:
        """Send a plain-text error response.

        Args:
            code: HTTP status to report.
            message: Text sent as the body, followed by a newline.
        """
        payload = (message + "\n").encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(payload)))
        self.send_header("Cache-Control", "no-store")
        self.end_headers()
        # A HEAD response must not have a body; on a keep-alive connection
        # the extra bytes would be read as the start of the next response.
        if self.command != "HEAD":
            self.wfile.write(payload)
|
|
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the server's command-line options.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, as before.

    Returns:
        Namespace with ``bind_ip`` (str), ``port`` (int), ``root`` (str),
        ``allow_cidr`` (list of str, empty when the option is not given) and
        ``netboot_token`` (str, empty when the option is not given).
    """
    parser = argparse.ArgumentParser(
        description=(
            "Minimal HTTP server for netboot artifacts with optional "
            "CIDR/token controls."
        ),
    )
    parser.add_argument(
        "--bind-ip",
        required=True,
        help="address to listen on",
    )
    parser.add_argument(
        "--port",
        type=int,
        required=True,
        help="TCP port to listen on",
    )
    parser.add_argument(
        "--root",
        required=True,
        help="directory containing the files to serve",
    )
    parser.add_argument(
        "--allow-cidr",
        action="append",
        default=[],
        help="client network allowed to connect; repeatable, "
        "all clients are allowed when omitted",
    )
    parser.add_argument(
        "--netboot-token",
        default="",
        help="token required in the query string of netboot.ipxe requests; "
        "no token is required when empty",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def main() -> None:
    """Validate the command-line configuration and run the server.

    Serves until interrupted; Ctrl-C stops the server without a traceback
    and the listening socket is closed on the way out.

    Raises:
        SystemExit: If the root directory does not exist or an
            ``--allow-cidr`` value is not a valid network.
    """
    args = parse_args()

    root = pathlib.Path(args.root).resolve()
    if not root.is_dir():
        raise SystemExit(f"error: root directory does not exist: {root}")

    allowed_networks = []
    for cidr in args.allow_cidr:
        try:
            # strict=False accepts values with host bits set
            # (e.g. 10.0.0.5/24) and masks them off.
            allowed_networks.append(ipaddress.ip_network(cidr, strict=False))
        except ValueError as exc:
            raise SystemExit(f"error: invalid CIDR '{cidr}': {exc}") from exc

    # An empty token on the command line means "no token required".
    token = args.netboot_token or None

    # Leaving the with-block calls server_close(), so the listening socket
    # is released even when serve_forever() exits through an exception.
    with NetbootHTTPServer(
        (args.bind_ip, args.port), root, allowed_networks, token
    ) as server:
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the server.
            pass
|
|
|
|
|
|
# Start the server only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()
|