every.channel/nix/modules/ec-node.nix
every.channel 91dad67fc2
Some checks failed
deploy-cloudflare / checks (push) Failing after 3s
ci-gates / checks (push) Failing after 5s
deploy-cloudflare / deploy (push) Has been skipped
Add duplicate publisher determinism proof
2026-06-10 03:33:46 -07:00

1901 lines
75 KiB
Nix

{ lib, config, pkgs, ... }:
let
cfg = config.services.every-channel.ec-node;
ecNodePkgDefault = pkgs.callPackage ../pkgs/ec-node.nix { };
ecCliPkgDefault = pkgs.callPackage ../pkgs/ec-cli.nix { };
nodeEntrypoint = pkgs.callPackage ../pkgs/every-channel-node.nix {
ec-node = cfg.package;
moq-relay = null;
};
# Minimal normalization for host strings.
normalizeHost = host:
let
h = lib.strings.removeSuffix "/" host;
in
if lib.strings.hasPrefix "http://" h || lib.strings.hasPrefix "https://" h then h else "http://${h}";
sanitizeUnitName = s:
lib.concatStringsSep "-" (lib.filter (x: x != "") (lib.splitString "/" (lib.replaceStrings [ " " ":" "." ] [ "-" "-" "-" ] s)));
archiveConvergenceProofUnitName = proof:
"every-channel-archive-convergence-${sanitizeUnitName proof.name}";
archiveConvergenceMeasureProofUnitName = proof:
"every-channel-archive-convergence-measure-${sanitizeUnitName proof.name}";
hdhrBase = if cfg.hdhomerun.host != null then normalizeHost cfg.hdhomerun.host else null;
normalizePath = path: lib.removeSuffix "/" (toString path);
sameOrUnder = parent: child:
let
p = normalizePath parent;
c = normalizePath child;
in
c == p || lib.hasPrefix "${p}/" c;
mkInputUrl = broadcast:
let
base =
if hdhrBase != null then hdhrBase
else if cfg.hdhomerun.deviceId != null then "http://${cfg.hdhomerun.deviceId}.local"
else null;
in
"${base}/auto/v${broadcast.channel}";
otelEnvironment =
lib.optionalAttrs (cfg.observability.otelTracesEndpoint != null) {
EVERY_CHANNEL_OTEL_TRACES_ENDPOINT = cfg.observability.otelTracesEndpoint;
OTEL_SERVICE_NAME = cfg.observability.serviceName;
};
serviceEnvironment = {
EVERY_CHANNEL_NODE_NAME = config.networking.hostName;
} // cfg.environment // otelEnvironment;
mkArchiveConvergenceProofService = proof:
let
unit = archiveConvergenceProofUnitName proof;
sourceArgLines = lib.concatMapStrings (source: "cmd+=(--source ${lib.escapeShellArg source})\n") proof.sources;
optionalArgLine = flag: value:
lib.optionalString (value != null) "cmd+=(${flag} ${lib.escapeShellArg (toString value)})\n";
runner = pkgs.writeShellApplication {
name = unit;
runtimeInputs = [
cfg.package
];
text = ''
set -euo pipefail
cmd=(
${lib.escapeShellArg "${cfg.package}/bin/ec-node"}
archive-convergence-serve
--broadcast ${lib.escapeShellArg proof.broadcast}
--track ${lib.escapeShellArg proof.track}
--listen ${lib.escapeShellArg proof.listen}
--metrics-role ${lib.escapeShellArg proof.metricsRole}
)
${sourceArgLines}
${optionalArgLine "--stream-id" proof.streamId}
${optionalArgLine "--rendition" proof.rendition}
${optionalArgLine "--start-sequence" proof.startSequence}
${optionalArgLine "--end-sequence" proof.endSequence}
${optionalArgLine "--metrics-node" proof.metricsNode}
exec "''${cmd[@]}"
'';
};
in
{
name = unit;
value = {
description = "every.channel archive convergence metrics (${proof.name})";
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
wants = [ "network-online.target" ];
unitConfig = {
StartLimitIntervalSec = 0;
};
serviceConfig = {
Type = "simple";
ExecStart = "${runner}/bin/${unit}";
Restart = "always";
RestartSec = 2;
NoNewPrivileges = true;
PrivateTmp = true;
ProtectSystem = "strict";
ProtectHome = true;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
LockPersonality = true;
MemoryDenyWriteExecute = true;
RestrictSUIDSGID = true;
RestrictRealtime = true;
SystemCallArchitectures = "native";
};
environment = serviceEnvironment;
};
};
mkArchiveConvergenceMeasureProofService = proof:
let
unit = archiveConvergenceMeasureProofUnitName proof;
agentArgLines = lib.concatMapStrings (source: "cmd+=(--agent-manifest ${lib.escapeShellArg source})\n") proof.agentManifests;
agentPrometheusSdArgLines = lib.concatMapStrings (source: "cmd+=(--agent-prometheus-sd ${lib.escapeShellArg source})\n") proof.agentPrometheusSdFiles;
agentPrometheusSdLabelArgLines = lib.concatMapStrings (label: "cmd+=(--agent-prometheus-sd-label ${lib.escapeShellArg label})\n") proof.agentPrometheusSdLabels;
manifestArgLines = lib.concatMapStrings (source: "cmd+=(--manifest ${lib.escapeShellArg source})\n") proof.manifests;
optionalArgLine = flag: value:
lib.optionalString (value != null) "cmd+=(${flag} ${lib.escapeShellArg (toString value)})\n";
runner = pkgs.writeShellApplication {
name = unit;
runtimeInputs = [
cfg.package
];
text = ''
set -euo pipefail
cmd=(
${lib.escapeShellArg "${cfg.package}/bin/ec-node"}
archive-convergence-measure-serve
--broadcast ${lib.escapeShellArg proof.broadcast}
--track ${lib.escapeShellArg proof.track}
--listen ${lib.escapeShellArg proof.listen}
--agent-manifest-role ${lib.escapeShellArg proof.agentManifestRole}
--timeout-ms ${toString proof.timeoutMs}
--max-manifest-bytes ${toString proof.maxManifestBytes}
--max-samples ${toString proof.maxSamples}
--min-elapsed-seconds ${toString proof.minElapsedSeconds}
--metrics-role ${lib.escapeShellArg proof.metricsRole}
)
${agentArgLines}
${agentPrometheusSdArgLines}
${agentPrometheusSdLabelArgLines}
${manifestArgLines}
${optionalArgLine "--stream-id" proof.streamId}
${optionalArgLine "--rendition" proof.rendition}
${optionalArgLine "--start-sequence" proof.startSequence}
${optionalArgLine "--end-sequence" proof.endSequence}
${optionalArgLine "--prometheus-url" proof.prometheusUrl}
${optionalArgLine "--metrics-node" proof.metricsNode}
exec "''${cmd[@]}"
'';
};
in
{
name = unit;
value = {
description = "every.channel remote archive convergence measurement metrics (${proof.name})";
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
wants = [ "network-online.target" ];
unitConfig = {
StartLimitIntervalSec = 0;
};
serviceConfig = {
Type = "simple";
ExecStart = "${runner}/bin/${unit}";
Restart = "always";
RestartSec = 2;
NoNewPrivileges = true;
PrivateTmp = true;
ProtectSystem = "strict";
ProtectHome = true;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
LockPersonality = true;
MemoryDenyWriteExecute = true;
RestrictSUIDSGID = true;
RestrictRealtime = true;
SystemCallArchitectures = "native";
};
environment = serviceEnvironment;
};
};
in
{
options.services.every-channel.ec-node = {
enable = lib.mkEnableOption "every.channel ec-node WebTransport publisher services";
package = lib.mkOption {
type = lib.types.package;
default = ecNodePkgDefault;
defaultText = "pkgs.callPackage /path/to/every.channel/nix/pkgs/ec-node.nix {}";
description = "The ec-node package to run.";
};
discoveryPackage = lib.mkOption {
type = lib.types.package;
default = ecCliPkgDefault;
defaultText = "pkgs.callPackage /path/to/every.channel/nix/pkgs/ec-cli.nix {}";
description = "Package used for HDHomeRun discovery when `hdhomerun.autoDiscover = true`.";
};
relayUrl = lib.mkOption {
type = lib.types.str;
default = "https://relay.every.channel/anon";
description = "MoQ relay URL for ec-node wt-publish.";
};
relayAnnouncedWatchdogMs = lib.mkOption {
type = lib.types.int;
default = 90000;
description = "Restart wt-publish when the relay /announced endpoint stops listing the broadcast for this many milliseconds. Set 0 to disable.";
};
relayAnnouncedWatchdogIntervalMs = lib.mkOption {
type = lib.types.int;
default = 10000;
description = "Polling interval for the wt-publish relay announcement watchdog.";
};
transcode = lib.mkOption {
type = lib.types.bool;
default = true;
description = "Whether ec-node should transcode to H.264/AAC before fragmenting.";
};
videoFilter = lib.mkOption {
type = lib.types.str;
default = "yadif=mode=send_frame:parity=auto:deint=interlaced,fps=30000/1001";
description = "ffmpeg video filter passed to ec-node wt-publish when transcoding.";
};
gopFrames = lib.mkOption {
type = lib.types.int;
default = 30;
description = "H.264 GOP/keyframe interval in frames for ec-node wt-publish.";
};
videoPreset = lib.mkOption {
type = lib.types.str;
default = "medium";
description = "x264 preset for ec-node wt-publish. Slower presets trade publisher CPU for lower bitrate at the same CRF.";
};
videoCrf = lib.mkOption {
type = lib.types.int;
default = 23;
description = "x264 CRF for ec-node wt-publish.";
};
publisherArchiveSegmentDurationMs = lib.mkOption {
type = lib.types.int;
default = 1001;
description = "Publisher-origin proof segment duration passed to ec-node wt-publish.";
};
publisherStartBoundaryMs = lib.mkOption {
type = lib.types.int;
default = 1001;
description = "Unix-epoch cadence boundary used before starting publisher ffmpeg.";
};
publisherArchive = {
enable = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Enable publisher-origin source-window proof archive records for direct wt-publish units.";
};
outputDir = lib.mkOption {
type = lib.types.str;
default = "/srv/every-channel/archive-buffer";
description = "Publisher-local CAS cache root passed to ec-node wt-publish.";
};
manifestDir = lib.mkOption {
type = lib.types.str;
default = "/srv/every-channel/archive-buffer/manifests";
description = "Publisher-local manifest root passed to ec-node wt-publish.";
};
sourceNode = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Source node identity stamped into publisher-origin proof records.";
};
track = lib.mkOption {
type = lib.types.str;
default = "publisher.m4s";
description = "Publisher-origin proof track name.";
};
};
passthrough = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Compatibility flag for older moq-mux passthrough selection; moq-mux 0.4 preserves generated CMAF fragments.";
};
tlsDisableVerify = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Danger: disable TLS verification for the relay.";
};
extraArgs = lib.mkOption {
type = lib.types.listOf lib.types.str;
default = [ ];
description = "Extra arguments appended to each ec-node wt-publish invocation.";
};
ntscRsCli = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional ntsc-rs-cli binary path used by broadcasts that set `ntscRsPreset`.";
};
environment = lib.mkOption {
type = lib.types.attrsOf lib.types.str;
default = {
RUST_LOG = "info";
# Cloudflare's relay is WebTransport-first; moq-native's WebSocket fallback can "win"
# the race and then fail MoQ negotiation, causing a tight reconnect loop.
MOQ_CLIENT_WEBSOCKET_ENABLED = "false";
};
description = "Environment variables for the publisher services.";
};
observability = {
otelTracesEndpoint = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
example = "http://127.0.0.1:4320/v1/traces";
description = "Optional OTLP/HTTP trace endpoint for ec-node Rust tracing spans.";
};
serviceName = lib.mkOption {
type = lib.types.str;
default = "ec-node";
description = "OpenTelemetry service.name for ec-node processes.";
};
};
hdhomerun = {
host = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "HDHomeRun host/IP. When set, inputs are built as http://<host>/auto/v<channel>.";
};
deviceId = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "HDHomeRun device id (used as http://<deviceId>.local when host is unset).";
};
autoDiscover = lib.mkOption {
type = lib.types.bool;
default = false;
description = "If true and host is unset, attempt LAN discovery for the device id (best-effort).";
};
};
control = {
enable = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Enable iroh-gossip control announcements from each wt-publish service.";
};
ttlMs = lib.mkOption {
type = lib.types.ints.positive;
default = 15000;
description = "Control announcement TTL passed to `ec-node wt-publish --control-ttl-ms`.";
};
intervalMs = lib.mkOption {
type = lib.types.ints.positive;
default = 5000;
description = "Control announcement interval passed to `ec-node wt-publish --control-interval-ms`.";
};
discovery = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
example = "dht,mdns,dns";
description = "Optional iroh discovery mode list for control announcements.";
};
irohSecret = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional iroh secret key (hex) for control announcement identity.";
};
gossipPeers = lib.mkOption {
type = lib.types.listOf lib.types.str;
default = [ ];
description = "Optional iroh endpoint addresses to seed control gossip joins.";
};
bridgeWeb = {
enable = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Run `ec-node control-bridge-web` to upsert announced streams into the web directory.";
};
directoryUrl = lib.mkOption {
type = lib.types.str;
default = "https://every.channel";
description = "Directory base URL used by control-bridge-web for `/api/stream-upsert`.";
};
authTokenFile = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional file containing bearer token for `/api/stream-upsert`.";
};
timeoutMs = lib.mkOption {
type = lib.types.ints.u32;
default = 30000;
description = "Bridge run timeout; service restarts and refreshes gossip peers after this window (0 = run forever).";
};
maxAgeMs = lib.mkOption {
type = lib.types.ints.positive;
default = 60000;
description = "Maximum accepted staleness for control announcements consumed by the web bridge.";
};
streamPrefix = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional stream-id prefix filter for web upserts.";
};
discovery = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
example = "dht,mdns,dns";
description = "Optional discovery mode override for the web bridge; falls back to `control.discovery` when unset.";
};
};
};
archive = {
enable = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Run relay archival workers that subscribe and persist streams into CAS storage.";
};
directoryUrl = lib.mkOption {
type = lib.types.str;
default = "https://every.channel";
description = "Directory base URL used to discover public streams (`/api/public-streams`).";
};
outputDir = lib.mkOption {
type = lib.types.str;
default = "/srv/every-channel/archive";
description = "CAS object root passed to `ec-node wt-archive --output-dir`.";
};
manifestDir = lib.mkOption {
type = lib.types.str;
default = "/srv/every-channel/archive/manifests";
description = "Manifest/index root passed to `ec-node wt-archive --manifest-dir`.";
};
pollIntervalMs = lib.mkOption {
type = lib.types.ints.positive;
default = 15000;
description = "Discovery poll interval for public streams.";
};
relayUrlOverride = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional fixed relay URL override for archive workers (bypasses per-stream relay_url from directory entries).";
};
streamPrefix = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional broadcast-name prefix filter for archival workers.";
};
tracks = lib.mkOption {
type = lib.types.listOf lib.types.str;
default = [
"catalog.json"
"catalog"
"init.mp4"
"0.m4s"
"1.m4s"
];
description = "Tracks passed to each `wt-archive` worker.";
};
tlsDisableVerify = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Danger: disable TLS verification for relay archive subscribers.";
};
serve = {
enable = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Run `ec-node wt-archive-serve` HTTP endpoints for archived replay/scrubbing.";
};
listen = lib.mkOption {
type = lib.types.str;
default = "0.0.0.0:7788";
description = "Listen address passed to `ec-node wt-archive-serve --listen`.";
};
};
convergence = {
proofs = lib.mkOption {
type = lib.types.listOf (lib.types.submodule {
options = {
name = lib.mkOption {
type = lib.types.str;
description = "Short unique name for this archive convergence proof service.";
};
sources = lib.mkOption {
type = lib.types.listOf lib.types.str;
default = [ ];
example = [
"nuc-a=/srv/every-channel/archive-buffer/manifests"
"nuc-b=/srv/every-channel/archive-peer/manifests"
];
description = "Archive source entries passed as repeated `ec-node archive-convergence-serve --source NAME=PATH` arguments.";
};
broadcast = lib.mkOption {
type = lib.types.str;
description = "Broadcast name passed to `archive-convergence-serve --broadcast`.";
};
track = lib.mkOption {
type = lib.types.str;
default = "publisher.m4s";
description = "Archive track passed to `archive-convergence-serve --track`.";
};
streamId = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional stream id filter passed to `archive-convergence-serve --stream-id`.";
};
rendition = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional rendition filter passed to `archive-convergence-serve --rendition`.";
};
startSequence = lib.mkOption {
type = lib.types.nullOr lib.types.int;
default = null;
description = "Optional start sequence passed to `archive-convergence-serve --start-sequence`.";
};
endSequence = lib.mkOption {
type = lib.types.nullOr lib.types.int;
default = null;
description = "Optional end sequence passed to `archive-convergence-serve --end-sequence`.";
};
listen = lib.mkOption {
type = lib.types.str;
default = "127.0.0.1:7812";
description = "Listen address for this proof service's `/health` and `/metrics` endpoints.";
};
metricsNode = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional node label override for emitted Prometheus metrics.";
};
metricsRole = lib.mkOption {
type = lib.types.str;
default = "duplicate-proof";
description = "Role label for emitted Prometheus metrics.";
};
};
});
default = [ ];
description = "Named Rust archive convergence proof services exposed as Prometheus scrape targets.";
};
remoteProofs = lib.mkOption {
type = lib.types.listOf (lib.types.submodule {
options = {
name = lib.mkOption {
type = lib.types.str;
description = "Short unique name for this remote archive convergence measurement service.";
};
agentManifests = lib.mkOption {
type = lib.types.listOf lib.types.str;
default = [ ];
example = [
"nuc-a=http://ec-publisher-a:7799"
"nuc-b=http://ec-publisher-b:7799"
];
description = "Node-agent base URLs passed as repeated `ec-node archive-convergence-measure-serve --agent-manifest NAME=URL` arguments.";
};
agentPrometheusSdFiles = lib.mkOption {
type = lib.types.listOf lib.types.str;
default = [ ];
example = [ "/var/lib/prometheus/every-channel-node-agents.json" ];
description = "Prometheus file-SD JSON files passed as repeated `ec-node archive-convergence-measure-serve --agent-prometheus-sd PATH` arguments.";
};
agentPrometheusSdLabels = lib.mkOption {
type = lib.types.listOf lib.types.str;
default = [ ];
example = [ "headscale_user=node" ];
description = "Optional label filters passed as repeated `ec-node archive-convergence-measure-serve --agent-prometheus-sd-label KEY=VALUE` arguments.";
};
agentManifestRole = lib.mkOption {
type = lib.types.str;
default = "publisher-buffer";
description = "Role query parameter for node-agent `/v1/archive-manifest` samples.";
};
manifests = lib.mkOption {
type = lib.types.listOf lib.types.str;
default = [ ];
example = [
"nuc-a=https://publisher-a.example/manifests/la-kcop/publisher.m4s.jsonl"
"nuc-b=https://publisher-b.example/manifests/la-kcop/publisher.m4s.jsonl"
];
description = "Direct manifest JSONL URLs passed as repeated `ec-node archive-convergence-measure-serve --manifest NAME=URL` arguments.";
};
broadcast = lib.mkOption {
type = lib.types.str;
description = "Broadcast name passed to `archive-convergence-measure-serve --broadcast`.";
};
track = lib.mkOption {
type = lib.types.str;
default = "publisher.m4s";
description = "Archive track passed to `archive-convergence-measure-serve --track`.";
};
streamId = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional stream id filter passed to `archive-convergence-measure-serve --stream-id`.";
};
rendition = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional rendition filter passed to `archive-convergence-measure-serve --rendition`.";
};
startSequence = lib.mkOption {
type = lib.types.nullOr lib.types.int;
default = null;
description = "Optional start sequence passed to `archive-convergence-measure-serve --start-sequence`.";
};
endSequence = lib.mkOption {
type = lib.types.nullOr lib.types.int;
default = null;
description = "Optional end sequence passed to `archive-convergence-measure-serve --end-sequence`.";
};
prometheusUrl = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional Prometheus URL queried by the remote proof service for Grafana-facing duplicate/miss series.";
};
timeoutMs = lib.mkOption {
type = lib.types.int;
default = 10000;
description = "HTTP timeout for each sampled manifest or Prometheus query.";
};
maxManifestBytes = lib.mkOption {
type = lib.types.int;
default = 4 * 1024 * 1024;
description = "Maximum manifest bytes fetched per remote source sample.";
};
listen = lib.mkOption {
type = lib.types.str;
default = "127.0.0.1:7813";
description = "Listen address for this remote proof service's `/health` and `/metrics` endpoints.";
};
maxSamples = lib.mkOption {
type = lib.types.int;
default = 16;
description = "Maximum recent scrape samples retained for elapsed duplicate convergence proof.";
};
minElapsedSeconds = lib.mkOption {
type = lib.types.int;
default = 30;
description = "Minimum elapsed sample window required before this remote proof reports ok.";
};
metricsNode = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional node label override for emitted Prometheus metrics.";
};
metricsRole = lib.mkOption {
type = lib.types.str;
default = "duplicate-proof";
description = "Role label for emitted Prometheus metrics.";
};
};
});
default = [ ];
description = "Named Rust remote archive convergence measurement services exposed as Prometheus scrape targets.";
};
};
};
nbc = {
enable = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Enable Linux Chrome + virtual-display support for NBC browser-backed broadcasts.";
};
chromeBinary = lib.mkOption {
type = lib.types.str;
default = "/run/current-system/sw/bin/google-chrome-stable";
description = "Chrome binary used by `ec-node nbc-bootstrap` and `ec-node nbc-wt-publish`.";
};
profileDir = lib.mkOption {
type = lib.types.str;
default = "/var/lib/every-channel/nbc-profile";
description = "Persistent Chrome profile directory used for NBC / Adobe auth sessions.";
};
authScreenshotDir = lib.mkOption {
type = lib.types.str;
default = "/var/lib/every-channel/nbc-auth";
description = "Directory for operator-facing screenshots when bootstrap hits an interactive auth page.";
};
display = lib.mkOption {
type = lib.types.str;
default = ":99";
description = "DISPLAY used by the NBC virtual display session.";
};
screen = lib.mkOption {
type = lib.types.str;
default = "1920x1080x24";
description = "Xvfb screen geometry for the NBC virtual display.";
};
noSandbox = lib.mkOption {
type = lib.types.bool;
default = true;
description = "Pass `EVERY_CHANNEL_NBC_NO_SANDBOX=1` for Chrome worker sessions.";
};
mvpdProvider = lib.mkOption {
type = lib.types.str;
default = "Verizon Fios";
description = "MVPD provider name used when the NBC worker must choose a TV provider.";
};
mvpdUsernameFile = lib.mkOption {
type = lib.types.nullOr lib.types.path;
default = null;
description = "Optional root-managed file containing the MVPD username for unattended NBC login.";
};
mvpdPasswordFile = lib.mkOption {
type = lib.types.nullOr lib.types.path;
default = null;
description = "Optional root-managed file containing the MVPD password for unattended NBC login.";
};
isolateWithUserNetns = lib.mkOption {
type = lib.types.bool;
default = false;
description = ''
Launch NBC browser-backed workers inside a rootless user+network namespace backed by
slirp4netns. This keeps the Chrome / ec-node process tree in its own network context
while still using the host's active upstream route.
'';
};
requireMullvad = lib.mkOption {
type = lib.types.bool;
default = false;
description = ''
Refuse to start NBC browser-backed workers until `mullvad status` reports a connected
tunnel. This assumes the host Mullvad daemon is already logged in and connected.
'';
};
mullvadWaitSeconds = lib.mkOption {
type = lib.types.ints.positive;
default = 90;
description = "Maximum time to wait for Mullvad connectivity before failing an NBC worker start.";
};
mullvadLocation = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
example = "USA";
description = ''
Optional case-insensitive substring that must appear in `mullvad status` before an NBC
worker starts. Use this to pin workers to a country or city family without committing the
operational login material itself.
'';
};
vnc = {
enable = lib.mkOption {
type = lib.types.bool;
default = true;
description = "Expose the NBC virtual display over VNC so auth can be completed remotely when needed.";
};
listen = lib.mkOption {
type = lib.types.str;
default = "127.0.0.1";
description = "x11vnc listen address for the NBC virtual display.";
};
port = lib.mkOption {
type = lib.types.port;
default = 5900;
description = "x11vnc TCP port for the NBC virtual display.";
};
};
};
broadcasts = lib.mkOption {
type = lib.types.listOf (lib.types.submodule {
options = {
name = lib.mkOption {
type = lib.types.str;
description = "Broadcast name published to the relay (used in watch links).";
};
channel = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "HDHomeRun guide number (e.g. 4.1, 8.1). Required unless `input` is set.";
};
input = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional explicit ffmpeg input URL/file. When set, HDHomeRun settings are ignored for this broadcast.";
};
nbcUrl = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional NBC watch/live URL for a browser-backed relay publish worker.";
};
ntscRsPreset = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional ntsc-rs preset JSON rendered before wt-publish ffmpeg ingest.";
};
ntscRsOutput = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional ntsc-rs processed output path. Defaults to a private /tmp path for this service.";
};
ntscRsCli = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional per-broadcast ntsc-rs-cli binary path.";
};
};
});
default = [ ];
description = "List of broadcasts (HDHomeRun, explicit input, or NBC browser-backed URL) to publish.";
};
};
config = lib.mkIf cfg.enable {
assertions = [
{
assertion =
let
needsHdhr = builtins.any (b: b.input == null && b.nbcUrl == null) cfg.broadcasts;
in
(!needsHdhr) || (cfg.hdhomerun.host != null) || (cfg.hdhomerun.deviceId != null);
message = "Set services.every-channel.ec-node.hdhomerun.host or .deviceId (required when any broadcast omits `input`)";
}
{
assertion = !(cfg.hdhomerun.autoDiscover && cfg.hdhomerun.host != null);
message = "hdhomerun.autoDiscover only applies when hdhomerun.host is unset";
}
{
assertion = cfg.publisherArchiveSegmentDurationMs > 0 && cfg.publisherStartBoundaryMs >= 0;
message = "services.every-channel.ec-node publisher proof cadence values must be usable.";
}
{
assertion =
builtins.all
(b:
(lib.length (lib.filter (value: value != null) [ b.input b.channel b.nbcUrl ])) == 1)
cfg.broadcasts;
message = "Each broadcast must set exactly one of `input`, `channel`, or `nbcUrl`";
}
{
assertion =
let
hasNbcBroadcast = builtins.any (b: b.nbcUrl != null) cfg.broadcasts;
in
(!hasNbcBroadcast) || cfg.nbc.enable;
message = "Set services.every-channel.ec-node.nbc.enable = true before configuring `broadcasts.*.nbcUrl`";
}
{
assertion = !cfg.archive.enable || !(sameOrUnder "/var/lib/every-channel" cfg.archive.outputDir);
message = "services.every-channel.ec-node.archive.outputDir must not live under /var/lib/every-channel; put archive video CAS on /srv/every-channel, /tank/every-channel, or another data filesystem.";
}
{
assertion =
builtins.all
(proof: lib.length proof.sources >= 2)
cfg.archive.convergence.proofs;
message = "Each services.every-channel.ec-node.archive.convergence.proofs entry must set at least two `sources` values.";
}
{
assertion =
let
proofNames =
(map (proof: proof.name) cfg.archive.convergence.proofs)
++ (map (proof: proof.name) cfg.archive.convergence.remoteProofs);
in
lib.length proofNames == lib.length (lib.unique proofNames);
message = "services.every-channel.ec-node.archive.convergence proof names must be unique.";
}
{
assertion =
let
proofListens =
(map (proof: proof.listen) cfg.archive.convergence.proofs)
++ (map (proof: proof.listen) cfg.archive.convergence.remoteProofs);
in
lib.length proofListens == lib.length (lib.unique proofListens);
message = "services.every-channel.ec-node.archive.convergence proof listen addresses must be unique.";
}
{
assertion =
builtins.all
(proof:
(proof.startSequence == null || proof.startSequence >= 0)
&& (proof.endSequence == null || proof.endSequence >= 0))
cfg.archive.convergence.proofs;
message = "services.every-channel.ec-node.archive.convergence.proofs sequence bounds must be non-negative.";
}
{
assertion =
builtins.all
(proof:
lib.length proof.agentManifests + lib.length proof.manifests >= 2
|| lib.length proof.agentPrometheusSdFiles > 0)
cfg.archive.convergence.remoteProofs;
message = "Each services.every-channel.ec-node.archive.convergence.remoteProofs entry must set at least two static sources or at least one `agentPrometheusSdFiles` value.";
}
{
assertion =
builtins.all
(proof:
(proof.startSequence == null || proof.startSequence >= 0)
&& (proof.endSequence == null || proof.endSequence >= 0))
cfg.archive.convergence.remoteProofs;
message = "services.every-channel.ec-node.archive.convergence.remoteProofs sequence bounds must be non-negative.";
}
{
assertion =
builtins.all
(proof:
proof.timeoutMs > 0
&& proof.maxManifestBytes > 0
&& proof.maxSamples >= 2
&& proof.minElapsedSeconds >= 0)
cfg.archive.convergence.remoteProofs;
message = "services.every-channel.ec-node.archive.convergence.remoteProofs timeout, manifest byte limit, sample count, and elapsed window must be usable.";
}
];
systemd.tmpfiles.rules =
[
"d /run/every-channel 1777 root root - -"
]
++ lib.optionals cfg.nbc.enable [
"d /var/lib/every-channel 0750 every-channel every-channel - -"
"d ${cfg.nbc.profileDir} 0750 every-channel every-channel - -"
"d ${cfg.nbc.authScreenshotDir} 0750 every-channel every-channel - -"
]
++ lib.optionals cfg.archive.enable [
"d ${cfg.archive.outputDir} 0750 root root - -"
"d ${cfg.archive.manifestDir} 0750 root root - -"
]
++ lib.optionals cfg.publisherArchive.enable [
"d ${cfg.publisherArchive.outputDir} 0750 root root - -"
"d ${cfg.publisherArchive.outputDir}/objects 0750 root root - -"
"d ${cfg.publisherArchive.outputDir}/objects/blake3 0750 root root - -"
"d ${cfg.publisherArchive.manifestDir} 0750 root root - -"
];
users.groups.every-channel = lib.mkIf cfg.nbc.enable { };
users.users.every-channel = lib.mkIf cfg.nbc.enable {
isSystemUser = true;
group = "every-channel";
home = "/var/lib/every-channel";
createHome = true;
};
systemd.services =
lib.listToAttrs (map
(b:
let
unit = "every-channel-wt-publish-${sanitizeUnitName b.name}";
isNbc = b.nbcUrl != null;
ntscRsCli = if b.ntscRsCli != null then b.ntscRsCli else cfg.ntscRsCli;
ntscRsOutput =
if b.ntscRsOutput != null
then b.ntscRsOutput
else "/tmp/every-channel-ntsc-rs/${sanitizeUnitName b.name}.mp4";
runner = pkgs.writeShellApplication {
name = unit;
runtimeInputs =
[
pkgs.coreutils
pkgs.curl
pkgs.ffmpeg
pkgs.findutils
pkgs.gawk
pkgs.iproute2
nodeEntrypoint
]
++ lib.optionals (isNbc && cfg.nbc.requireMullvad) [ pkgs.mullvad-vpn ]
++ lib.optionals (isNbc && cfg.nbc.isolateWithUserNetns) [
pkgs.slirp4netns
pkgs.util-linux
]
++ lib.optionals cfg.hdhomerun.autoDiscover [ pkgs.jq cfg.discoveryPackage ];
text =
let
fixedHost = if cfg.hdhomerun.host != null then normalizeHost cfg.hdhomerun.host else "";
deviceId = cfg.hdhomerun.deviceId or "";
extraArgsLine =
if cfg.extraArgs == [ ] then
""
else
"cmd+=(${lib.concatStringsSep " " (map lib.escapeShellArg cfg.extraArgs)})";
explicitInputStr = if b.input == null then "" else b.input;
channelStr = if b.channel == null then "" else b.channel;
nbcUrlStr = if b.nbcUrl == null then "" else b.nbcUrl;
controlEndpointOutPath = "/run/every-channel/control-peer-${sanitizeUnitName b.name}.json";
controlDiscoveryStr = if cfg.control.discovery == null then "" else cfg.control.discovery;
controlIrohSecretStr = if cfg.control.irohSecret == null then "" else cfg.control.irohSecret;
controlGossipPeerLines = lib.concatMapStrings (peer: "cmd+=(--gossip-peer ${lib.escapeShellArg peer})\n") cfg.control.gossipPeers;
nbcMullvadLocationStr = if cfg.nbc.mullvadLocation == null then "" else cfg.nbc.mullvadLocation;
in
''
set -euo pipefail
wait_for_mullvad() {
local wait_seconds status expected
wait_seconds=${toString cfg.nbc.mullvadWaitSeconds}
expected=${lib.escapeShellArg nbcMullvadLocationStr}
for _ in $(seq 1 "$wait_seconds"); do
status="$(mullvad status 2>/dev/null || true)"
if [[ "$status" == Connected* ]]; then
if [[ -z "$expected" ]] || printf '%s\n' "$status" | grep -Fqi -- "$expected"; then
return 0
fi
fi
sleep 1
done
echo "ec-node: Mullvad was not connected${lib.optionalString (cfg.nbc.mullvadLocation != null) " to the expected location"} within ${toString cfg.nbc.mullvadWaitSeconds}s" >&2
mullvad status >&2 || true
return 1
}
run_in_user_netns() {
local tmpdir pid_file ready_fifo ns_pid slirp_pid status
tmpdir="$(mktemp -d /tmp/${unit}.usernet.XXXXXX)"
pid_file="$tmpdir/pid"
ready_fifo="$tmpdir/ready"
mkfifo "$ready_fifo"
# shellcheck disable=SC2016
unshare --user --map-root-user --net ${pkgs.bash}/bin/bash -lc '
set -euo pipefail
ip link set lo up
echo $$ > "$1"
read -r _ < "$2"
shift 2
exec "$@"
' bash "$pid_file" "$ready_fifo" "''${cmd[@]}" &
ns_pid=$!
for _ in $(seq 1 50); do
[[ -s "$pid_file" ]] && break
sleep 0.1
done
if [[ ! -s "$pid_file" ]]; then
echo "ec-node: timed out waiting for NBC user-netns PID" >&2
kill "$ns_pid" 2>/dev/null || true
rm -rf "$tmpdir"
return 1
fi
slirp4netns --configure --mtu=1500 "$(cat "$pid_file")" tap0 >"$tmpdir/slirp.log" 2>&1 &
slirp_pid=$!
sleep 1
printf 'go\n' > "$ready_fifo"
set +e
wait "$ns_pid"
status=$?
kill "$slirp_pid" 2>/dev/null || true
wait "$slirp_pid" 2>/dev/null || true
rm -rf "$tmpdir"
return "$status"
}
nbc_url=${lib.escapeShellArg nbcUrlStr}
input=""
if [[ -z "$nbc_url" ]]; then
explicit_input=${lib.escapeShellArg explicitInputStr}
if [[ -n "$explicit_input" ]]; then
input="$explicit_input"
else
ch=${lib.escapeShellArg channelStr}
if [[ -z "$ch" ]]; then
echo "ec-node: broadcast missing input, channel, and nbcUrl" >&2
exit 2
fi
# Note: don't wrap lib.escapeShellArg in double-quotes, otherwise empty strings
# become a literal two-quote token and break discovery.
base=${lib.escapeShellArg fixedHost}
if [[ -z "$base" ]]; then
dev_id=${lib.escapeShellArg deviceId}
if [[ -z "$dev_id" ]]; then
echo "ec-node: missing hdhomerun.host and hdhomerun.deviceId" >&2
exit 2
fi
try_ip() {
local ip="$1"
local json id base_url
json="$(curl -fsS --connect-timeout 0.10 --max-time 0.20 "http://$ip/discover.json" 2>/dev/null || true)"
if [[ -z "$json" ]]; then
return 1
fi
id="$(printf '%s' "$json" | jq -r '.DeviceID // empty' 2>/dev/null || true)"
if [[ "$id" != "$dev_id" ]]; then
return 1
fi
base_url="$(printf '%s' "$json" | jq -r '.BaseURL // empty' 2>/dev/null || true)"
if [[ -z "$base_url" ]]; then
base_url="http://$ip"
fi
printf '%s\n' "$base_url"
return 0
}
if ${lib.boolToString cfg.hdhomerun.autoDiscover}; then
base="$(${cfg.discoveryPackage}/bin/ec-cli discover | jq -r --arg id "$dev_id" '.[] | select(.id == $id) | .base_url // empty' | head -n1 || true)"
if [[ -z "$base" ]]; then
while read -r ip; do
found="$(try_ip "$ip" || true)"
if [[ -n "$found" ]]; then
base="$found"
break
fi
done < <(ip neigh | awk '{print $1}' | sort -u)
fi
if [[ -z "$base" ]]; then
while read -r cidr; do
ip_addr="''${cidr%/*}"
prefix="''${cidr#*/}"
if [[ "$prefix" != "24" ]]; then
continue
fi
net="''${ip_addr%.*}"
for i in $(seq 1 254); do
found="$(try_ip "$net.$i" || true)"
if [[ -n "$found" ]]; then
base="$found"
break
fi
done
if [[ -n "$base" ]]; then
break
fi
done < <(ip -o -4 addr show scope global | awk '{print $4}')
fi
if [[ -z "$base" ]]; then
echo "ec-node: HDHomeRun deviceId not found: $dev_id" >&2
exit 2
fi
else
base="http://$dev_id.local"
fi
fi
base="''${base%/}"
if [[ "$base" != http://* && "$base" != https://* ]]; then
base="http://$base"
fi
hostport="''${base#http://}"
hostport="''${hostport#https://}"
hostport="''${hostport%%/*}"
host="''${hostport%%:*}"
input="http://$host:5004/auto/v$ch"
fi
fi
if [[ -n "$nbc_url" ]]; then
cmd=(
${lib.escapeShellArg "${nodeEntrypoint}/bin/every-channel-node"}
nbc-wt-publish
--url ${lib.escapeShellArg cfg.relayUrl}
--name ${lib.escapeShellArg b.name}
--source-url "$nbc_url"
--gop-frames ${toString cfg.gopFrames}
)
else
cmd=(
${lib.escapeShellArg "${nodeEntrypoint}/bin/every-channel-node"}
wt-publish
--url ${lib.escapeShellArg cfg.relayUrl}
--name ${lib.escapeShellArg b.name}
--input "$input"
--video-filter ${lib.escapeShellArg cfg.videoFilter}
--gop-frames ${toString cfg.gopFrames}
--video-preset ${lib.escapeShellArg cfg.videoPreset}
--video-crf ${toString cfg.videoCrf}
--relay-announced-watchdog-ms ${toString cfg.relayAnnouncedWatchdogMs}
--relay-announced-watchdog-interval-ms ${toString cfg.relayAnnouncedWatchdogIntervalMs}
--publisher-archive-segment-duration-ms ${toString cfg.publisherArchiveSegmentDurationMs}
--publisher-start-boundary-ms ${toString cfg.publisherStartBoundaryMs}
)
${lib.optionalString cfg.publisherArchive.enable ''
cmd+=(--publisher-archive-output-dir ${lib.escapeShellArg cfg.publisherArchive.outputDir})
cmd+=(--publisher-archive-manifest-dir ${lib.escapeShellArg cfg.publisherArchive.manifestDir})
cmd+=(--publisher-archive-track ${lib.escapeShellArg cfg.publisherArchive.track})
''}
${lib.optionalString (cfg.publisherArchive.enable && cfg.publisherArchive.sourceNode != null) ''
cmd+=(--publisher-archive-source-node ${lib.escapeShellArg cfg.publisherArchive.sourceNode})
''}
${lib.optionalString (!cfg.transcode) "cmd+=(--transcode=false)"}
${lib.optionalString (b.ntscRsPreset != null) ''
cmd+=(--ntsc-rs-preset ${lib.escapeShellArg b.ntscRsPreset})
cmd+=(--ntsc-rs-output ${lib.escapeShellArg ntscRsOutput})
ntsc_rs_cli=${lib.escapeShellArg (if ntscRsCli != null then ntscRsCli else "")}
if [[ -n "$ntsc_rs_cli" ]]; then
cmd+=(--ntsc-rs-cli "$ntsc_rs_cli")
fi
''}
fi
${lib.optionalString cfg.passthrough "cmd+=(--passthrough=true)"}
${lib.optionalString cfg.tlsDisableVerify "cmd+=(--tls-disable-verify)"}
${lib.optionalString cfg.control.enable ''
cmd+=(--control-announce)
cmd+=(--control-ttl-ms ${toString cfg.control.ttlMs})
cmd+=(--control-interval-ms ${toString cfg.control.intervalMs})
control_discovery=${lib.escapeShellArg controlDiscoveryStr}
if [[ -n "$control_discovery" ]]; then
cmd+=(--discovery "$control_discovery")
fi
control_iroh_secret=${lib.escapeShellArg controlIrohSecretStr}
if [[ -n "$control_iroh_secret" ]]; then
cmd+=(--iroh-secret "$control_iroh_secret")
fi
cmd+=(--control-endpoint-addr-out ${lib.escapeShellArg controlEndpointOutPath})
${controlGossipPeerLines}
''}
${extraArgsLine}
# Keep the unit alive even if the relay is temporarily unreachable.
# This avoids `switch-to-configuration test` failing due to a unit that exits
# quickly during activation.
trap 'exit 0' INT TERM
is_nbc=${lib.boolToString isNbc}
failure_count=0
while true; do
${lib.optionalString (isNbc && cfg.nbc.requireMullvad) ''
if ! wait_for_mullvad; then
sleep 30
continue
fi
''}
started_at="$(date +%s)"
status=0
set +e
${lib.optionalString (isNbc && cfg.nbc.isolateWithUserNetns) "run_in_user_netns"}
${lib.optionalString (!isNbc || !cfg.nbc.isolateWithUserNetns) ''
"''${cmd[@]}"
''}
status=$?
set -e
elapsed=$(( $(date +%s) - started_at ))
if [[ "$status" -eq 0 ]]; then
failure_count=0
sleep_seconds=2
else
failure_count=$(( failure_count + 1 ))
if [[ "$is_nbc" == "true" ]]; then
sleep_seconds=300
if [[ "$failure_count" -gt 1 ]]; then
for _ in $(seq 2 "$failure_count"); do
sleep_seconds=$(( sleep_seconds * 2 ))
if [[ "$sleep_seconds" -ge 1800 ]]; then
sleep_seconds=1800
break
fi
done
fi
else
sleep_seconds=$(( 2 * failure_count ))
if [[ "$sleep_seconds" -gt 60 ]]; then
sleep_seconds=60
fi
fi
echo "ec-node: command exited with status $status after ''${elapsed}s; retrying in ''${sleep_seconds}s" >&2
fi
sleep "$sleep_seconds"
done
'';
};
in
{
name = unit;
value = {
description = "every.channel WebTransport publish (${b.name} -> ${cfg.relayUrl})";
wantedBy = [ "multi-user.target" ];
after =
[ "network-online.target" ]
++ lib.optionals isNbc [ "every-channel-nbc-display.service" ]
++ lib.optionals (isNbc && cfg.nbc.requireMullvad) [ "mullvad-daemon.service" ];
wants =
[ "network-online.target" ]
++ lib.optionals isNbc [ "every-channel-nbc-display.service" ]
++ lib.optionals (isNbc && cfg.nbc.requireMullvad) [ "mullvad-daemon.service" ];
# Keep the unit from entering "failed" due to rapid restarts (deploy-flake treats
# failed units during `switch-to-configuration test` as a deployment failure).
#
# If the relay is temporarily unreachable, we want systemd to keep retrying without
# tripping rate limits.
unitConfig = {
StartLimitIntervalSec = 0;
};
serviceConfig = {
Type = "simple";
ExecStart = "${runner}/bin/${unit}";
Restart = "always";
RestartSec = 2;
DynamicUser = !isNbc && !cfg.publisherArchive.enable;
User = lib.mkIf isNbc "every-channel";
Group = lib.mkIf isNbc "every-channel";
NoNewPrivileges = true;
PrivateTmp = !isNbc;
ProtectSystem = "strict";
ProtectHome = !isNbc;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
LockPersonality = true;
MemoryDenyWriteExecute = !isNbc;
RestrictSUIDSGID = true;
RestrictRealtime = true;
SystemCallArchitectures = "native";
ReadWritePaths =
lib.optionals cfg.control.enable [ "/run/every-channel" ]
++ lib.optionals cfg.publisherArchive.enable [
cfg.publisherArchive.outputDir
cfg.publisherArchive.manifestDir
]
++ lib.optionals isNbc [ "/tmp" ]
++ lib.optionals isNbc [ cfg.nbc.profileDir cfg.nbc.authScreenshotDir ];
};
environment =
serviceEnvironment
// lib.optionalAttrs isNbc (
{
DISPLAY = cfg.nbc.display;
EVERY_CHANNEL_NBC_CHROME_PATH = cfg.nbc.chromeBinary;
EVERY_CHANNEL_NBC_MVPD_PROVIDER = cfg.nbc.mvpdProvider;
EVERY_CHANNEL_NBC_PROFILE_DIR = cfg.nbc.profileDir;
EVERY_CHANNEL_NBC_NO_SANDBOX = if cfg.nbc.noSandbox then "1" else "0";
HOME = "/var/lib/every-channel";
}
// lib.optionalAttrs (cfg.nbc.mvpdUsernameFile != null) {
EVERY_CHANNEL_NBC_MVPD_USERNAME_FILE = toString cfg.nbc.mvpdUsernameFile;
}
// lib.optionalAttrs (cfg.nbc.mvpdPasswordFile != null) {
EVERY_CHANNEL_NBC_MVPD_PASSWORD_FILE = toString cfg.nbc.mvpdPasswordFile;
}
);
};
})
cfg.broadcasts)
// lib.optionalAttrs (cfg.control.enable && cfg.control.bridgeWeb.enable)
(let
bridgeUnit = "every-channel-control-bridge-web";
bridgePeerFiles = map (b: "/run/every-channel/control-peer-${sanitizeUnitName b.name}.json") cfg.broadcasts;
bridgeDiscoveryStr =
if cfg.control.bridgeWeb.discovery != null then cfg.control.bridgeWeb.discovery
else if cfg.control.discovery != null then cfg.control.discovery
else "";
bridgeStreamPrefixStr = if cfg.control.bridgeWeb.streamPrefix == null then "" else cfg.control.bridgeWeb.streamPrefix;
bridgeAuthTokenFile = if cfg.control.bridgeWeb.authTokenFile == null then "" else cfg.control.bridgeWeb.authTokenFile;
staticGossipPeerLines = lib.concatMapStrings (peer: ''
cmd+=(--gossip-peer ${lib.escapeShellArg peer})
have_peer=1
'') cfg.control.gossipPeers;
peerFileLoopLines = lib.concatMapStrings (peerFile: ''
if [[ -s ${lib.escapeShellArg peerFile} ]]; then
peer="$(tr -d '\r\n' < ${lib.escapeShellArg peerFile})"
if [[ -n "$peer" ]]; then
cmd+=(--gossip-peer "$peer")
have_peer=1
fi
fi
'') bridgePeerFiles;
bridgeRunner = pkgs.writeShellApplication {
name = bridgeUnit;
runtimeInputs = [
pkgs.coreutils
nodeEntrypoint
];
text = ''
set -euo pipefail
directory_url=${lib.escapeShellArg cfg.control.bridgeWeb.directoryUrl}
bridge_discovery=${lib.escapeShellArg bridgeDiscoveryStr}
stream_prefix=${lib.escapeShellArg bridgeStreamPrefixStr}
auth_token_file=${lib.escapeShellArg bridgeAuthTokenFile}
while true; do
cmd=(
${lib.escapeShellArg "${nodeEntrypoint}/bin/every-channel-node"}
control-bridge-web
--directory-url "$directory_url"
--timeout-ms ${toString cfg.control.bridgeWeb.timeoutMs}
--max-age-ms ${toString cfg.control.bridgeWeb.maxAgeMs}
)
if [[ -n "$bridge_discovery" ]]; then
cmd+=(--discovery "$bridge_discovery")
fi
if [[ -n "$stream_prefix" ]]; then
cmd+=(--stream-prefix "$stream_prefix")
fi
if [[ -n "$auth_token_file" && -r "$auth_token_file" ]]; then
token="$(tr -d '\r\n' < "$auth_token_file")"
if [[ -n "$token" ]]; then
cmd+=(--auth-token "$token")
fi
fi
have_peer=0
${peerFileLoopLines}
${staticGossipPeerLines}
if [[ "$have_peer" -eq 0 ]]; then
sleep 2
continue
fi
"''${cmd[@]}" || true
sleep 2
done
'';
};
in
{
"${bridgeUnit}" = {
description = "every.channel control bridge to web directory";
wantedBy = [ "multi-user.target" ];
after =
[ "network-online.target" ]
++ map (b: "every-channel-wt-publish-${sanitizeUnitName b.name}.service") cfg.broadcasts;
wants =
[ "network-online.target" ]
++ map (b: "every-channel-wt-publish-${sanitizeUnitName b.name}.service") cfg.broadcasts;
unitConfig = {
StartLimitIntervalSec = 0;
};
serviceConfig = {
Type = "simple";
ExecStart = "${bridgeRunner}/bin/${bridgeUnit}";
Restart = "always";
RestartSec = 2;
DynamicUser = true;
NoNewPrivileges = true;
PrivateTmp = true;
ProtectSystem = "strict";
ProtectHome = true;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
LockPersonality = true;
MemoryDenyWriteExecute = true;
RestrictSUIDSGID = true;
RestrictRealtime = true;
SystemCallArchitectures = "native";
};
environment = serviceEnvironment;
};
})
// lib.optionalAttrs cfg.archive.enable
(let
archiveUnit = "every-channel-wt-archive-auto";
archivePrefix = if cfg.archive.streamPrefix == null then "" else cfg.archive.streamPrefix;
archiveTrackLines = lib.concatMapStrings (track: " cmd+=(--track ${lib.escapeShellArg track})\n") cfg.archive.tracks;
archiveRunner = pkgs.writeShellApplication {
name = archiveUnit;
runtimeInputs = [
pkgs.coreutils
pkgs.curl
pkgs.gawk
pkgs.jq
nodeEntrypoint
];
text = ''
set -euo pipefail
directory_url=${lib.escapeShellArg (cfg.archive.directoryUrl + "/api/public-streams")}
output_dir=${lib.escapeShellArg cfg.archive.outputDir}
manifest_dir=${lib.escapeShellArg cfg.archive.manifestDir}
relay_fallback=${lib.escapeShellArg cfg.relayUrl}
relay_override=${lib.escapeShellArg (if cfg.archive.relayUrlOverride == null then "" else cfg.archive.relayUrlOverride)}
stream_prefix=${lib.escapeShellArg archivePrefix}
source_node="$(cat /proc/sys/kernel/hostname 2>/dev/null || echo unknown)"
state_dir="/run/every-channel/archive"
pids_dir="$state_dir/pids"
logs_dir="$state_dir/logs"
desired_file="$state_dir/desired"
mkdir -p "$pids_dir" "$logs_dir"
poll_secs="$(awk 'BEGIN { printf "%.3f", ${toString cfg.archive.pollIntervalMs} / 1000.0 }')"
relay_is_owned() {
local relay="$1"
local host
host="''${relay#https://}"
host="''${host%%/*}"
case "$host" in
relay.every.channel|lax.relay.every.channel|ord.relay.every.channel|nyc.relay.every.channel)
return 0
;;
esac
return 1
}
cleanup_children() {
for pid_file in "$pids_dir"/*.pid; do
[[ -e "$pid_file" ]] || continue
pid="$(cat "$pid_file" 2>/dev/null || true)"
if [[ -n "$pid" ]]; then
kill "$pid" 2>/dev/null || true
fi
rm -f "$pid_file"
done
}
trap cleanup_children INT TERM EXIT
while true; do
: > "$desired_file"
entries_json="$(curl -fsS "$directory_url" || true)"
if [[ -n "$entries_json" ]]; then
while IFS= read -r entry; do
name="$(printf '%s\n' "$entry" | jq -r '.broadcast_name // empty')"
relay="$(printf '%s\n' "$entry" | jq -r '.relay_url // empty')"
if [[ -z "$name" ]]; then
continue
fi
if [[ -n "$stream_prefix" && "$name" != "$stream_prefix"* ]]; then
continue
fi
if [[ -n "$relay_override" ]]; then
relay="$relay_override"
elif [[ -z "$relay" ]]; then
relay="$relay_fallback"
fi
if ! relay_is_owned "$relay"; then
echo "ec-node: refusing to archive non-owned relay URL for $name: $relay" >&2
continue
fi
key="$(printf '%s' "$name" | tr -c 'A-Za-z0-9_.-' '_')"
printf '%s\n' "$key" >> "$desired_file"
pid_file="$pids_dir/$key.pid"
if [[ -s "$pid_file" ]]; then
pid="$(cat "$pid_file" 2>/dev/null || true)"
if [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null; then
continue
fi
fi
cmd=(
${lib.escapeShellArg "${nodeEntrypoint}/bin/every-channel-node"}
wt-archive
--url "$relay"
--name "$name"
--output-dir "$output_dir"
--manifest-dir "$manifest_dir"
--source-node "$source_node"
)
${lib.optionalString cfg.archive.tlsDisableVerify "cmd+=(--tls-disable-verify)"}
${archiveTrackLines}
log_file="$logs_dir/$key.log"
(
exec "''${cmd[@]}"
) >>"$log_file" 2>&1 &
echo "$!" > "$pid_file"
done < <(printf '%s\n' "$entries_json" | jq -rc '.entries[]?')
fi
for pid_file in "$pids_dir"/*.pid; do
[[ -e "$pid_file" ]] || continue
key="''${pid_file##*/}"
key="''${key%.pid}"
pid="$(cat "$pid_file" 2>/dev/null || true)"
if [[ -z "$pid" ]] || ! kill -0 "$pid" 2>/dev/null; then
rm -f "$pid_file"
continue
fi
if ! grep -Fxq "$key" "$desired_file"; then
echo "ec-node: stopping archive worker for no-longer-advertised stream $key" >&2
kill "$pid" 2>/dev/null || true
rm -f "$pid_file"
fi
done
sleep "$poll_secs"
done
'';
};
in
{
"${archiveUnit}" = {
description = "every.channel relay archival auto-worker";
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
wants = [ "network-online.target" ];
unitConfig = {
StartLimitIntervalSec = 0;
};
serviceConfig = {
Type = "simple";
ExecStart = "${archiveRunner}/bin/${archiveUnit}";
Restart = "always";
RestartSec = 2;
NoNewPrivileges = true;
PrivateTmp = true;
ProtectSystem = "strict";
ProtectHome = true;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
LockPersonality = true;
MemoryDenyWriteExecute = true;
RestrictSUIDSGID = true;
RestrictRealtime = true;
SystemCallArchitectures = "native";
ReadWritePaths = [
"/run/every-channel"
cfg.archive.outputDir
cfg.archive.manifestDir
];
};
environment = serviceEnvironment;
};
})
// lib.optionalAttrs (cfg.archive.enable && cfg.archive.serve.enable)
(let
archiveServeUnit = "every-channel-wt-archive-serve";
archiveServeRunner = pkgs.writeShellApplication {
name = archiveServeUnit;
runtimeInputs = [
nodeEntrypoint
];
text = ''
set -euo pipefail
exec ${lib.escapeShellArg "${nodeEntrypoint}/bin/every-channel-node"} \
wt-archive-serve \
--output-dir ${lib.escapeShellArg cfg.archive.outputDir} \
--manifest-dir ${lib.escapeShellArg cfg.archive.manifestDir} \
--listen ${lib.escapeShellArg cfg.archive.serve.listen}
'';
};
in
{
"${archiveServeUnit}" = {
description = "every.channel archived replay HTTP server";
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
wants = [ "network-online.target" ];
unitConfig = {
StartLimitIntervalSec = 0;
};
serviceConfig = {
Type = "simple";
ExecStart = "${archiveServeRunner}/bin/${archiveServeUnit}";
Restart = "always";
RestartSec = 2;
NoNewPrivileges = true;
PrivateTmp = true;
ProtectSystem = "strict";
ProtectHome = true;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
LockPersonality = true;
MemoryDenyWriteExecute = true;
RestrictSUIDSGID = true;
RestrictRealtime = true;
SystemCallArchitectures = "native";
ReadWritePaths = [
cfg.archive.outputDir
cfg.archive.manifestDir
];
};
environment = serviceEnvironment;
};
})
// lib.listToAttrs (map mkArchiveConvergenceProofService cfg.archive.convergence.proofs)
// lib.listToAttrs (map mkArchiveConvergenceMeasureProofService cfg.archive.convergence.remoteProofs)
// lib.optionalAttrs cfg.nbc.enable
(let
displayUnit = "every-channel-nbc-display";
displayNumber = lib.strings.removePrefix ":" cfg.nbc.display;
displayRunner = pkgs.writeShellApplication {
name = displayUnit;
runtimeInputs = [
pkgs.coreutils
pkgs.xorg.xorgserver
];
text = ''
set -euo pipefail
exec ${pkgs.xorg.xorgserver}/bin/Xvfb ${lib.escapeShellArg cfg.nbc.display} \
-screen 0 ${lib.escapeShellArg cfg.nbc.screen} \
-nolisten tcp \
-ac \
+extension RANDR
'';
};
vncUnit = "every-channel-nbc-vnc";
vncRunner = pkgs.writeShellApplication {
name = vncUnit;
runtimeInputs = [
pkgs.x11vnc
];
text = ''
set -euo pipefail
exec ${pkgs.x11vnc}/bin/x11vnc \
-display ${lib.escapeShellArg cfg.nbc.display} \
-forever \
-shared \
-nopw \
-listen ${lib.escapeShellArg cfg.nbc.vnc.listen} \
-rfbport ${toString cfg.nbc.vnc.port}
'';
};
in
({
"${displayUnit}" = {
description = "every.channel NBC virtual display";
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
wants = [ "network-online.target" ];
serviceConfig = {
Type = "simple";
ExecStart = "${displayRunner}/bin/${displayUnit}";
Restart = "always";
RestartSec = 2;
User = "every-channel";
Group = "every-channel";
NoNewPrivileges = true;
PrivateTmp = false;
ProtectSystem = "strict";
ProtectHome = false;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
LockPersonality = true;
MemoryDenyWriteExecute = false;
RestrictSUIDSGID = true;
RestrictRealtime = true;
SystemCallArchitectures = "native";
ReadWritePaths = [ "/tmp" "/var/lib/every-channel" ];
};
environment = serviceEnvironment // {
HOME = "/var/lib/every-channel";
};
};
}
// lib.optionalAttrs cfg.nbc.vnc.enable {
"${vncUnit}" = {
description = "every.channel NBC virtual display VNC bridge";
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" "${displayUnit}.service" ];
wants = [ "network-online.target" "${displayUnit}.service" ];
serviceConfig = {
Type = "simple";
ExecStart = "${vncRunner}/bin/${vncUnit}";
Restart = "always";
RestartSec = 2;
User = "every-channel";
Group = "every-channel";
NoNewPrivileges = true;
PrivateTmp = false;
ProtectSystem = "strict";
ProtectHome = false;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
LockPersonality = true;
MemoryDenyWriteExecute = false;
RestrictSUIDSGID = true;
RestrictRealtime = true;
SystemCallArchitectures = "native";
ReadWritePaths = [ "/tmp" "/var/lib/every-channel" ];
};
environment = serviceEnvironment // {
DISPLAY = cfg.nbc.display;
HOME = "/var/lib/every-channel";
};
};
}));
};
}