every.channel/nix/modules/ec-node.nix
2026-02-22 23:39:42 -08:00

578 lines
22 KiB
Nix

{ lib, config, pkgs, ... }:
let
  # Shorthand for this module's resolved option values.
  cfg = config.services.every-channel.ec-node;

  # Default packages, built from the package expressions that live next to this module.
  ecNodePkgDefault = pkgs.callPackage ../pkgs/ec-node.nix { };
  ecCliPkgDefault = pkgs.callPackage ../pkgs/ec-cli.nix { };

  # Host normalization: drop a trailing "/" and prepend "http://" unless the
  # string already starts with an http:// or https:// scheme.
  normalizeHost = host:
    let
      trimmed = lib.strings.removeSuffix "/" host;
      schemes = [ "http://" "https://" ];
      hasScheme = lib.any (scheme: lib.strings.hasPrefix scheme trimmed) schemes;
    in
    if hasScheme then trimmed else "http://${trimmed}";

  # Derive a unit-name fragment from a broadcast name: spaces, colons and dots
  # become "-", then the result is split on "/" and the non-empty pieces are
  # re-joined with "-".
  sanitizeUnitName = s:
    let
      dashed = lib.replaceStrings [ " " ":" "." ] [ "-" "-" "-" ] s;
      segments = lib.filter (segment: segment != "") (lib.splitString "/" dashed);
    in
    lib.concatStringsSep "-" segments;

  # Normalized hdhomerun.host, or null when no host is configured.
  hdhrBase =
    if cfg.hdhomerun.host == null then null else normalizeHost cfg.hdhomerun.host;

  # Builds "<base>/auto/v<channel>" from the configured host, falling back to
  # "<deviceId>.local".
  # NOTE(review): no reference to this helper is visible in the rest of this
  # module; confirm before relying on it (it interpolates null when neither
  # host nor deviceId is set).
  mkInputUrl = broadcast:
    let
      base =
        if hdhrBase != null then hdhrBase
        else if cfg.hdhomerun.deviceId == null then null
        else "http://${cfg.hdhomerun.deviceId}.local";
    in
    "${base}/auto/v${broadcast.channel}";
in
{
options.services.every-channel.ec-node = {
enable = lib.mkEnableOption "every.channel ec-node WebTransport publisher services";

# Package whose bin/ec-node is executed by every generated unit.
package = lib.mkOption {
  type = lib.types.package;
  default = ecNodePkgDefault;
  # literalExpression marks the text as Nix code for the generated option docs;
  # a bare string would be rendered as a quoted string value instead.
  defaultText = lib.literalExpression "pkgs.callPackage /path/to/every.channel/nix/pkgs/ec-node.nix {}";
  description = "The ec-node package to run.";
};

# Package whose bin/ec-cli runs the discover step; it is only added to the
# publisher runner when hdhomerun.autoDiscover is enabled.
discoveryPackage = lib.mkOption {
  type = lib.types.package;
  default = ecCliPkgDefault;
  defaultText = lib.literalExpression "pkgs.callPackage /path/to/every.channel/nix/pkgs/ec-cli.nix {}";
  description = "Package used for HDHomeRun discovery when `hdhomerun.autoDiscover = true`.";
};

# Passed to every publisher as `--url`.
relayUrl = lib.mkOption {
  type = lib.types.str;
  default = "https://cdn.moq.dev/anon";
  description = "MoQ relay URL for ec-node wt-publish.";
};

# When false the runner appends `--transcode=false`; when true no flag is added.
transcode = lib.mkOption {
  type = lib.types.bool;
  default = true;
  description = "Whether ec-node should transcode to H.264/AAC before fragmenting.";
};

# When false the runner appends `--passthrough=false`; when true no flag is added.
passthrough = lib.mkOption {
  type = lib.types.bool;
  default = false;
  description = "Whether to transmit fMP4 fragments directly (moq-mux passthrough).";
};

# When true the runner appends `--tls-disable-verify`.
tlsDisableVerify = lib.mkOption {
  type = lib.types.bool;
  default = false;
  description = "Danger: disable TLS verification for the relay.";
};

# Each element is shell-escaped and appended after all generated arguments.
extraArgs = lib.mkOption {
  type = lib.types.listOf lib.types.str;
  default = [ ];
  description = "Extra arguments appended to each ec-node wt-publish invocation.";
};

# Applied to the publisher units and to the control bridge unit alike.
environment = lib.mkOption {
  type = lib.types.attrsOf lib.types.str;
  default = {
    RUST_LOG = "info";
    # Cloudflare's relay is WebTransport-first; moq-native's WebSocket fallback can "win"
    # the race and then fail MoQ negotiation, causing a tight reconnect loop.
    MOQ_CLIENT_WEBSOCKET_ENABLED = "false";
  };
  description = "Environment variables for the publisher services.";
};
hdhomerun =
  let
    # Declares an optional string option that is null when unset.
    optionalStr = description: lib.mkOption {
      inherit description;
      type = lib.types.nullOr lib.types.str;
      default = null;
    };
  in
  {
    # Fixed tuner address; takes precedence over deviceId in the runner script.
    host = optionalStr "HDHomeRun host/IP. When set, inputs are built as http://<host>/auto/v<channel>.";

    # Used for the "<deviceId>.local" fallback and as the match key for discovery.
    deviceId = optionalStr "HDHomeRun device id (used as http://<deviceId>.local when host is unset).";

    # Rejected by an assertion when combined with a fixed host.
    autoDiscover = lib.mkOption {
      description = "If true and host is unset, attempt LAN discovery for the device id (best-effort).";
      type = lib.types.bool;
      default = false;
    };
  };
# Options for iroh-gossip control announcements and the optional web bridge.
control = {
  # Gates every `--control-*` argument on the publishers and is also required
  # for the bridge unit to be generated.
  enable = lib.mkOption {
    type = lib.types.bool;
    default = false;
    description = "Enable iroh-gossip control announcements from each wt-publish service.";
  };
  ttlMs = lib.mkOption {
    type = lib.types.ints.positive;
    default = 15000;
    description = "Control announcement TTL passed to `ec-node wt-publish --control-ttl-ms`.";
  };
  intervalMs = lib.mkOption {
    type = lib.types.ints.positive;
    default = 5000;
    description = "Control announcement interval passed to `ec-node wt-publish --control-interval-ms`.";
  };
  # Passed as `--discovery` when non-null; also the bridge's fallback value.
  discovery = lib.mkOption {
    type = lib.types.nullOr lib.types.str;
    default = null;
    example = "dht,mdns,dns";
    description = "Optional iroh discovery mode list for control announcements.";
  };
  # NOTE: the value is interpolated into the generated runner script (which
  # lives in the Nix store) and handed to ec-node as `--iroh-secret` on the
  # command line, so treat it as readable by local users.
  irohSecret = lib.mkOption {
    type = lib.types.nullOr lib.types.str;
    default = null;
    description = "Optional iroh secret key (hex) for control announcement identity.";
  };
  # Each entry becomes one `--gossip-peer` argument on publishers and bridge.
  gossipPeers = lib.mkOption {
    type = lib.types.listOf lib.types.str;
    default = [ ];
    description = "Optional iroh endpoint addresses to seed control gossip joins.";
  };
  bridgeWeb = {
    enable = lib.mkOption {
      type = lib.types.bool;
      default = false;
      description = "Run `ec-node control-bridge-web` to upsert announced streams into the web directory.";
    };
    directoryUrl = lib.mkOption {
      type = lib.types.str;
      default = "https://every.channel";
      description = "Directory base URL used by control-bridge-web for `/api/stream-upsert`.";
    };
    # The bridge runs with DynamicUser and only uses the file when it passes a
    # readability test, so the path must be readable by that service user.
    authTokenFile = lib.mkOption {
      type = lib.types.nullOr lib.types.str;
      default = null;
      description = "Optional file containing bearer token for `/api/stream-upsert`.";
    };
    timeoutMs = lib.mkOption {
      # `lib.types.ints` has no `u64` member (Nix integers are signed 64-bit),
      # so referencing it aborts evaluation. `unsigned` keeps 0 valid, which the
      # description defines as "run forever".
      type = lib.types.ints.unsigned;
      default = 30000;
      description = "Bridge run timeout; service restarts and refreshes gossip peers after this window (0 = run forever).";
    };
    maxAgeMs = lib.mkOption {
      type = lib.types.ints.positive;
      default = 60000;
      description = "Maximum accepted staleness for control announcements consumed by the web bridge.";
    };
    streamPrefix = lib.mkOption {
      type = lib.types.nullOr lib.types.str;
      default = null;
      description = "Optional stream-id prefix filter for web upserts.";
    };
    discovery = lib.mkOption {
      type = lib.types.nullOr lib.types.str;
      default = null;
      example = "dht,mdns,dns";
      description = "Optional discovery mode override for the web bridge; falls back to `control.discovery` when unset.";
    };
  };
};
broadcasts =
  let
    # One entry of the list: a relay name plus either a tuner channel or an
    # explicit input. The "either/or" rule is enforced by a module assertion.
    broadcastModule = lib.types.submodule {
      options = {
        name = lib.mkOption {
          description = "Broadcast name published to the relay (used in watch links).";
          type = lib.types.str;
        };
        channel = lib.mkOption {
          description = "HDHomeRun guide number (e.g. 4.1, 8.1). Required unless `input` is set.";
          type = lib.types.nullOr lib.types.str;
          default = null;
        };
        input = lib.mkOption {
          description = "Optional explicit ffmpeg input URL/file. When set, HDHomeRun settings are ignored for this broadcast.";
          type = lib.types.nullOr lib.types.str;
          default = null;
        };
      };
    };
  in
  # One publisher unit is generated per list element.
  lib.mkOption {
    description = "List of broadcasts (name + channel, or explicit input) to publish.";
    type = lib.types.listOf broadcastModule;
    default = [ ];
  };
};
config = lib.mkIf cfg.enable {
# Evaluation-time sanity checks for the option set.
assertions =
  let
    # Unit-name fragments as they will be used for service names and for the
    # per-broadcast control-peer file paths.
    unitNames = map (b: sanitizeUnitName b.name) cfg.broadcasts;
  in
  [
    {
      assertion = cfg.broadcasts != [ ];
      message = "services.every-channel.ec-node.broadcasts must be non-empty when enabled";
    }
    {
      # A tuner address is only needed when at least one broadcast relies on `channel`.
      assertion =
        let
          needsHdhr = builtins.any (b: b.input == null) cfg.broadcasts;
        in
        (!needsHdhr) || (cfg.hdhomerun.host != null) || (cfg.hdhomerun.deviceId != null);
      message = "Set services.every-channel.ec-node.hdhomerun.host or .deviceId (required when any broadcast omits `input`)";
    }
    {
      assertion = !(cfg.hdhomerun.autoDiscover && cfg.hdhomerun.host != null);
      message = "hdhomerun.autoDiscover only applies when hdhomerun.host is unset";
    }
    {
      assertion = builtins.all (b: (b.input != null) || (b.channel != null)) cfg.broadcasts;
      message = "Each broadcast must set either `input` or `channel`";
    }
    {
      # Two names that sanitize to the same fragment would collapse into a
      # single systemd unit (listToAttrs keeps only one entry per name) and
      # share one control-peer file, silently dropping a publisher.
      assertion = lib.length (lib.unique unitNames) == lib.length unitNames;
      message = "services.every-channel.ec-node.broadcasts: names must remain unique after unit-name sanitization (spaces, ':', '.', '/' are folded to '-')";
    }
  ];
# Shared runtime directory (root-owned, mode 1777). Publisher units are pointed
# at control-peer-<name>.json files inside it, and the bridge reads those files.
systemd.tmpfiles.rules = lib.singleton "d /run/every-channel 1777 root root - -";
systemd.services =
lib.listToAttrs (map
(b:
let
# systemd unit name (and executable name) for this broadcast.
unit = "every-channel-wt-publish-${sanitizeUnitName b.name}";

# Wrapper script for one broadcast. It resolves the input URL (explicit input,
# fixed host, discovery, or "<deviceId>.local"), assembles the
# `ec-node wt-publish` command line, and re-runs it forever with a 2s pause.
runner = pkgs.writeShellApplication {
  name = unit;
  runtimeInputs =
    [
      pkgs.coreutils
      pkgs.curl
      pkgs.ffmpeg
      pkgs.findutils
      pkgs.gawk
      pkgs.iproute2
      cfg.package
    ]
    # jq and the discovery CLI are only reached on the auto-discovery path.
    ++ lib.optionals cfg.hdhomerun.autoDiscover [ pkgs.jq cfg.discoveryPackage ];
  text =
    let
      # Nullable options are flattened to "" so the script can test them with -z / -n.
      fixedHost = if cfg.hdhomerun.host != null then normalizeHost cfg.hdhomerun.host else "";
      # Explicit null check: `x.deviceId or ""` only covers a *missing*
      # attribute, and this option always exists (default null), so the `or`
      # fallback never applied and null leaked into escapeShellArg.
      deviceId = if cfg.hdhomerun.deviceId == null then "" else cfg.hdhomerun.deviceId;
      extraArgsLine =
        if cfg.extraArgs == [ ] then
          ""
        else
          "cmd+=(${lib.concatStringsSep " " (map lib.escapeShellArg cfg.extraArgs)})";
      explicitInputStr = if b.input == null then "" else b.input;
      channelStr = if b.channel == null then "" else b.channel;
      # File the publisher is told to write its control endpoint address to;
      # the bridge unit reads the same path.
      controlEndpointOutPath = "/run/every-channel/control-peer-${sanitizeUnitName b.name}.json";
      controlDiscoveryStr = if cfg.control.discovery == null then "" else cfg.control.discovery;
      controlIrohSecretStr = if cfg.control.irohSecret == null then "" else cfg.control.irohSecret;
      controlGossipPeerLines = lib.concatMapStrings (peer: "cmd+=(--gossip-peer ${lib.escapeShellArg peer})\n") cfg.control.gossipPeers;
    in
    ''
      set -euo pipefail
      input=""
      explicit_input=${lib.escapeShellArg explicitInputStr}
      if [[ -n "$explicit_input" ]]; then
        input="$explicit_input"
      else
        ch=${lib.escapeShellArg channelStr}
        if [[ -z "$ch" ]]; then
          echo "ec-node: broadcast missing both input and channel" >&2
          exit 2
        fi
        # Note: don't wrap lib.escapeShellArg in double-quotes, otherwise empty strings
        # become a literal two-quote token and break discovery.
        base=${lib.escapeShellArg fixedHost}
        if [[ -z "$base" ]]; then
          dev_id=${lib.escapeShellArg deviceId}
          if [[ -z "$dev_id" ]]; then
            echo "ec-node: missing hdhomerun.host and hdhomerun.deviceId" >&2
            exit 2
          fi
          try_ip() {
            local ip="$1"
            local json id base_url
            json="$(curl -fsS --connect-timeout 0.10 --max-time 0.20 "http://$ip/discover.json" 2>/dev/null || true)"
            if [[ -z "$json" ]]; then
              return 1
            fi
            id="$(printf '%s' "$json" | jq -r '.DeviceID // empty' 2>/dev/null || true)"
            if [[ "$id" != "$dev_id" ]]; then
              return 1
            fi
            base_url="$(printf '%s' "$json" | jq -r '.BaseURL // empty' 2>/dev/null || true)"
            if [[ -z "$base_url" ]]; then
              base_url="http://$ip"
            fi
            printf '%s\n' "$base_url"
            return 0
          }
          if ${lib.boolToString cfg.hdhomerun.autoDiscover}; then
            # Primary: UDP broadcast discover.
            base="$(${cfg.discoveryPackage}/bin/ec-cli discover | jq -r --arg id "$dev_id" '.[] | select(.id == $id) | .base_url // empty' | head -n1 || true)"
            # Fallback: probe known neighbors for /discover.json (fast; avoids full /24 scan).
            if [[ -z "$base" ]]; then
              while read -r ip; do
                found="$(try_ip "$ip" || true)"
                if [[ -n "$found" ]]; then
                  base="$found"
                  break
                fi
              done < <(ip neigh | awk '{print $1}' | sort -u)
            fi
            # Fallback: scan local /24 subnets for /discover.json (slow; worst-case ~50s).
            if [[ -z "$base" ]]; then
              while read -r cidr; do
                ip_addr="''${cidr%/*}"
                prefix="''${cidr#*/}"
                if [[ "$prefix" != "24" ]]; then
                  continue
                fi
                net="''${ip_addr%.*}"
                for i in $(seq 1 254); do
                  found="$(try_ip "$net.$i" || true)"
                  if [[ -n "$found" ]]; then
                    base="$found"
                    break
                  fi
                done
                if [[ -n "$base" ]]; then
                  break
                fi
              done < <(ip -o -4 addr show scope global | awk '{print $4}')
            fi
            if [[ -z "$base" ]]; then
              echo "ec-node: HDHomeRun deviceId not found: $dev_id" >&2
              exit 2
            fi
          else
            # Best-effort mDNS convention.
            base="http://$dev_id.local"
          fi
        fi
        base="''${base%/}"
        if [[ "$base" != http://* && "$base" != https://* ]]; then
          base="http://$base"
        fi
        # HDHomeRun streaming is on port 5004, regardless of the discover BaseURL.
        hostport="''${base#http://}"
        hostport="''${hostport#https://}"
        hostport="''${hostport%%/*}"
        host="''${hostport%%:*}"
        input="http://$host:5004/auto/v$ch"
      fi
      cmd=(
        ${lib.escapeShellArg "${cfg.package}/bin/ec-node"}
        wt-publish
        --url ${lib.escapeShellArg cfg.relayUrl}
        --name ${lib.escapeShellArg b.name}
        --input "$input"
      )
      ${lib.optionalString (!cfg.transcode) "cmd+=(--transcode=false)"}
      ${lib.optionalString (!cfg.passthrough) "cmd+=(--passthrough=false)"}
      ${lib.optionalString cfg.tlsDisableVerify "cmd+=(--tls-disable-verify)"}
      ${lib.optionalString cfg.control.enable ''
        cmd+=(--control-announce)
        cmd+=(--control-ttl-ms ${toString cfg.control.ttlMs})
        cmd+=(--control-interval-ms ${toString cfg.control.intervalMs})
        control_discovery=${lib.escapeShellArg controlDiscoveryStr}
        if [[ -n "$control_discovery" ]]; then
          cmd+=(--discovery "$control_discovery")
        fi
        control_iroh_secret=${lib.escapeShellArg controlIrohSecretStr}
        if [[ -n "$control_iroh_secret" ]]; then
          cmd+=(--iroh-secret "$control_iroh_secret")
        fi
        cmd+=(--control-endpoint-addr-out ${lib.escapeShellArg controlEndpointOutPath})
        ${controlGossipPeerLines}
      ''}
      ${extraArgsLine}
      # Keep the unit alive even if the relay is temporarily unreachable.
      # This avoids `switch-to-configuration test` failing due to a unit that exits
      # quickly during activation.
      trap 'exit 0' INT TERM
      while true; do
        "''${cmd[@]}" || true
        sleep 2
      done
    '';
};
in
{
name = unit;
value =
  let
    networkOnline = [ "network-online.target" ];

    # Sandboxing switches applied to the publisher process.
    hardening = {
      DynamicUser = true;
      NoNewPrivileges = true;
      PrivateTmp = true;
      ProtectSystem = "strict";
      ProtectHome = true;
      ProtectKernelTunables = true;
      ProtectKernelModules = true;
      ProtectControlGroups = true;
      LockPersonality = true;
      MemoryDenyWriteExecute = true;
      RestrictSUIDSGID = true;
      RestrictRealtime = true;
      SystemCallArchitectures = "native";
    };
  in
  {
    description = "every.channel WebTransport publish (${b.name} -> ${cfg.relayUrl})";
    wantedBy = [ "multi-user.target" ];
    wants = networkOnline;
    after = networkOnline;

    # Rate limiting is switched off so rapid restarts never push the unit into
    # "failed": deploy-flake treats failed units during
    # `switch-to-configuration test` as a deployment failure, and an
    # unreachable relay should simply be retried indefinitely.
    unitConfig.StartLimitIntervalSec = 0;

    serviceConfig = hardening // {
      Type = "simple";
      ExecStart = "${runner}/bin/${unit}";
      Restart = "always";
      RestartSec = 2;
      # The runtime directory only needs to be writable when the publisher is
      # asked to emit its control endpoint file.
      ReadWritePaths = lib.optionals cfg.control.enable [ "/run/every-channel" ];
    };

    environment = cfg.environment;
  };
})
cfg.broadcasts)
// lib.optionalAttrs (cfg.control.enable && cfg.control.bridgeWeb.enable)
(let
bridgeUnit = "every-channel-control-bridge-web";

# Nullable string -> shell-friendly string ("" when unset).
orEmpty = value: if value == null then "" else value;

# One control-peer file per broadcast, matching the path each publisher is
# given via --control-endpoint-addr-out.
bridgePeerFiles = lib.forEach cfg.broadcasts
  (b: "/run/every-channel/control-peer-${sanitizeUnitName b.name}.json");

# Bridge-specific discovery wins; otherwise control.discovery; otherwise "".
bridgeDiscoveryStr = lib.findFirst (mode: mode != null) "" [
  cfg.control.bridgeWeb.discovery
  cfg.control.discovery
];

bridgeStreamPrefixStr = orEmpty cfg.control.bridgeWeb.streamPrefix;
bridgeAuthTokenFile = orEmpty cfg.control.bridgeWeb.authTokenFile;

# Shell fragment adding every statically configured gossip peer.
staticGossipPeerLines = lib.concatMapStrings
  (peer: ''
    cmd+=(--gossip-peer ${lib.escapeShellArg peer})
    have_peer=1
  '')
  cfg.control.gossipPeers;

# Shell fragment that, per peer file, adds the file's content (CR/LF stripped)
# as a gossip peer when the file exists and is non-empty. The lines are kept at
# one uniform indent so the generated text is unchanged.
peerFileLoopLines = lib.concatMapStrings
  (peerFile: ''
    if [[ -s ${lib.escapeShellArg peerFile} ]]; then
    peer="$(tr -d '\r\n' < ${lib.escapeShellArg peerFile})"
    if [[ -n "$peer" ]]; then
    cmd+=(--gossip-peer "$peer")
    have_peer=1
    fi
    fi
  '')
  bridgePeerFiles;
# Wrapper script for the web bridge. Each loop iteration rebuilds the
# `ec-node control-bridge-web` command line (re-reading the token file and the
# publishers' peer files), waits while no gossip peer is known, and otherwise
# runs the bridge once before pausing 2s.
bridgeRunner = pkgs.writeShellApplication {
  name = bridgeUnit;
  runtimeInputs = [
    pkgs.coreutils
    cfg.package
  ];
  text = ''
    set -euo pipefail
    directory_url=${lib.escapeShellArg cfg.control.bridgeWeb.directoryUrl}
    bridge_discovery=${lib.escapeShellArg bridgeDiscoveryStr}
    stream_prefix=${lib.escapeShellArg bridgeStreamPrefixStr}
    auth_token_file=${lib.escapeShellArg bridgeAuthTokenFile}
    while true; do
      cmd=(
        ${lib.escapeShellArg "${cfg.package}/bin/ec-node"}
        control-bridge-web
        --directory-url "$directory_url"
        --timeout-ms ${toString cfg.control.bridgeWeb.timeoutMs}
        --max-age-ms ${toString cfg.control.bridgeWeb.maxAgeMs}
      )
      if [[ -n "$bridge_discovery" ]]; then
        cmd+=(--discovery "$bridge_discovery")
      fi
      if [[ -n "$stream_prefix" ]]; then
        cmd+=(--stream-prefix "$stream_prefix")
      fi
      if [[ -n "$auth_token_file" ]]; then
        if [[ -r "$auth_token_file" ]]; then
          token="$(tr -d '\r\n' < "$auth_token_file")"
          if [[ -n "$token" ]]; then
            cmd+=(--auth-token "$token")
          fi
        else
          # Still best-effort (the bridge runs without a token), but say so:
          # an unreadable file would otherwise silently disable authentication.
          echo "ec-node: control.bridgeWeb.authTokenFile is not readable, continuing without auth token: $auth_token_file" >&2
        fi
      fi
      have_peer=0
      ${peerFileLoopLines}
      ${staticGossipPeerLines}
      if [[ "$have_peer" -eq 0 ]]; then
        sleep 2
        continue
      fi
      "''${cmd[@]}" || true
      sleep 2
    done
  '';
};
in
{
"${bridgeUnit}" =
  let
    # Service names of all publisher units generated for the broadcasts.
    publisherUnits = map (b: "every-channel-wt-publish-${sanitizeUnitName b.name}.service") cfg.broadcasts;
    # The bridge both pulls in and orders itself after the network and every publisher.
    prerequisites = [ "network-online.target" ] ++ publisherUnits;
  in
  {
    description = "every.channel control bridge to web directory";
    wantedBy = [ "multi-user.target" ];
    after = prerequisites;
    wants = prerequisites;

    # No start rate limit, so the always-restart policy below can never leave
    # the unit in the "failed" state.
    unitConfig.StartLimitIntervalSec = 0;

    serviceConfig = {
      # Process supervision.
      Type = "simple";
      ExecStart = "${bridgeRunner}/bin/${bridgeUnit}";
      Restart = "always";
      RestartSec = 2;

      # Sandboxing.
      DynamicUser = true;
      NoNewPrivileges = true;
      PrivateTmp = true;
      ProtectSystem = "strict";
      ProtectHome = true;
      ProtectKernelTunables = true;
      ProtectKernelModules = true;
      ProtectControlGroups = true;
      LockPersonality = true;
      MemoryDenyWriteExecute = true;
      RestrictSUIDSGID = true;
      RestrictRealtime = true;
      SystemCallArchitectures = "native";
    };

    environment = cfg.environment;
  };
});
};
}