diff --git a/README.md b/README.md
index 2e83cdd..d4b3b94 100644
--- a/README.md
+++ b/README.md
@@ -66,6 +66,16 @@
 Watch (web): https://every.channel/watch?url=https%3A%2F%2Fcdn.moq.dev%2Fanon&name=la-nbc
 ```
 
+Archive (relay -> CAS objects + JSONL manifests):
+
+```sh
+cargo run -p ec-node -- wt-archive \
+  --url https://cdn.moq.dev/anon \
+  --name la-nbc \
+  --output-dir /tank/every-channel/archive \
+  --manifest-dir /var/lib/every-channel/manifests
+```
+
 Control protocol (iroh gossip, relay + direct transport discovery):
 
 ```sh
diff --git a/crates/ec-node/src/main.rs b/crates/ec-node/src/main.rs
index 4d2e9ae..a629a5d 100644
--- a/crates/ec-node/src/main.rs
+++ b/crates/ec-node/src/main.rs
@@ -32,7 +32,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
 use std::fs;
 use std::fs::File;
 use std::future::Future;
-use std::io::Read;
+use std::io::{Read, Write};
 use std::path::{Path, PathBuf};
 use std::process::{Command, Stdio};
 use std::str::FromStr;
@@ -48,6 +48,7 @@ const DIRECT_WIRE_TAG_PING: u8 = 0x02;
 const DIRECT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(8);
 // Conservatively under typical SCTP data channel max message sizes.
 const DIRECT_WIRE_CHUNK_BYTES: usize = 16 * 1024;
+const WT_ARCHIVE_DEFAULT_TRACKS: &[&str] = &["catalog.json", "init.mp4", "video0.m4s", "audio0.m4s"];
 use tokio::sync::mpsc;
 use tokio::sync::oneshot;
 use tokio::sync::RwLock;
@@ -80,6 +81,8 @@ enum Commands {
     WsSubscribe(WsSubscribeArgs),
     /// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default).
     WtPublish(WtPublishArgs),
+    /// Subscribe to a relay broadcast over WebTransport/MoQ and archive groups into CAS.
+    WtArchive(WtArchiveArgs),
     /// Announce stream transport availability over iroh gossip control topic.
     ControlAnnounce(ControlAnnounceArgs),
     /// Listen for stream transport announcements from iroh gossip control topic.
@@ -456,6 +459,30 @@ struct WtPublishArgs {
     control_endpoint_addr_out: Option<PathBuf>,
 }
 
+#[derive(Parser, Debug)]
+struct WtArchiveArgs {
+    /// Relay URL (WebTransport) to connect to.
+    #[arg(long, default_value = "https://cdn.moq.dev/anon")]
+    url: String,
+    /// Broadcast name to subscribe to.
+    #[arg(long)]
+    name: String,
+    /// Output directory for CAS objects.
+    #[arg(long, default_value = "./tmp/wt-archive")]
+    output_dir: PathBuf,
+    /// Optional manifest/index output directory.
+    /// Defaults to `<output-dir>/manifests` when omitted.
+    #[arg(long)]
+    manifest_dir: Option<PathBuf>,
+    /// Track names to archive (repeatable).
+    /// When omitted, defaults to catalog+init+primary audio/video tracks.
+    #[arg(long)]
+    track: Vec<String>,
+    /// Danger: disable TLS verification for the relay.
+    #[arg(long, default_value_t = false)]
+    tls_disable_verify: bool,
+}
+
 #[derive(Parser, Debug)]
 struct ControlAnnounceArgs {
     /// Stable stream id to announce.
@@ -666,6 +693,7 @@ fn main() -> Result<()> {
         Commands::WsPublish(args) => run_async(ws_publish(args))?,
         Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?,
         Commands::WtPublish(args) => run_async(wt_publish(args))?,
+        Commands::WtArchive(args) => run_async(wt_archive(args))?,
         Commands::ControlAnnounce(args) => run_async(control_announce(args))?,
         Commands::ControlListen(args) => run_async(control_listen(args))?,
         Commands::ControlResolve(args) => run_async(control_resolve(args))?,
@@ -4846,6 +4874,523 @@ fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
     ))
 }
 
+#[derive(Debug, serde::Serialize)]
+struct ArchiveIndexRecord {
+    received_unix_ms: u64,
+    relay_url: String,
+    broadcast_name: String,
+    track_name: String,
+    group_sequence: u64,
+    frame_count: u64,
+    size_bytes: usize,
+    blake3: String,
+    cas_path: String,
+}
+
+fn sanitize_path_component(value: &str) -> String {
+    let mut out = String::with_capacity(value.len());
+    for ch in value.chars() {
+        if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '.' {
+            out.push(ch);
+        } else {
+            out.push('_');
+        }
+    }
+    if out.is_empty() {
+        "_".to_string()
+    } else {
+        out
+    }
+}
+
+fn default_wt_archive_tracks() -> Vec<String> {
+    WT_ARCHIVE_DEFAULT_TRACKS
+        .iter()
+        .map(|value| (*value).to_string())
+        .collect()
+}
+
+fn cas_store_blob(cas_root: &Path, data: &[u8]) -> Result<(String, PathBuf, bool)> {
+    let hash = blake3::hash(data).to_hex().to_string();
+    let shard = &hash[0..2];
+    let rel_path = PathBuf::from("objects")
+        .join("blake3")
+        .join(shard)
+        .join(format!("{hash}.bin"));
+    let out_path = cas_root.join(shard).join(format!("{hash}.bin"));
+
+    if out_path.exists() {
+        return Ok((hash, rel_path, false));
+    }
+
+    let parent = out_path
+        .parent()
+        .ok_or_else(|| anyhow!("invalid CAS path: {}", out_path.display()))?;
+    fs::create_dir_all(parent)
+        .with_context(|| format!("failed to create CAS shard dir {}", parent.display()))?;
+
+    let tmp = parent.join(format!(
+        ".{}.tmp-{}-{}",
+        out_path
+            .file_name()
+            .and_then(|f| f.to_str())
+            .unwrap_or("blob"),
+        std::process::id(),
+        SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos()
+    ));
+    fs::write(&tmp, data).with_context(|| format!("failed to write {}", tmp.display()))?;
+    match fs::rename(&tmp, &out_path) {
+        Ok(()) => {}
+        Err(err) if out_path.exists() => {
+            let _ = fs::remove_file(&tmp);
+            tracing::debug!(
+                path = %out_path.display(),
+                err = %err,
+                "CAS object already exists during rename race"
+            );
+            return Ok((hash, rel_path, false));
+        }
+        Err(err) => {
+            let _ = fs::remove_file(&tmp);
+            return Err(err).with_context(|| {
+                format!(
+                    "failed to move CAS blob {} -> {}",
+                    tmp.display(),
+                    out_path.display()
+                )
+            });
+        }
+    }
+
+    Ok((hash, rel_path, true))
+}
+
+fn append_archive_index_record(path: &Path, record: &ArchiveIndexRecord) -> Result<()> {
+    let parent = path
+        .parent()
+        .ok_or_else(|| anyhow!("invalid manifest path: {}", path.display()))?;
+    fs::create_dir_all(parent)
+        .with_context(|| format!("failed to create manifest dir {}", parent.display()))?;
+
+    let mut file = std::fs::OpenOptions::new()
+        .create(true)
+        .append(true)
+        .open(path)
+        .with_context(|| format!("failed to open manifest {}", path.display()))?;
+    serde_json::to_writer(&mut file, record)
+        .with_context(|| format!("failed to write manifest {}", path.display()))?;
+    file.write_all(b"\n")
+        .with_context(|| format!("failed to newline-terminate {}", path.display()))?;
+    Ok(())
+}
+
+async fn archive_track_loop(
+    mut track: moq_lite::TrackConsumer,
+    relay_url: String,
+    broadcast_name: String,
+    track_name: String,
+    cas_root: PathBuf,
+    manifest_path: PathBuf,
+) -> Result<()> {
+    let mut archived_groups: u64 = 0;
+    loop {
+        let next_group = track.next_group().await;
+        let Some(mut group) = next_group.with_context(|| {
+            format!(
+                "track {} read failed for broadcast {}",
+                track_name, broadcast_name
+            )
+        })? else {
+            return Err(anyhow!(
+                "track {} ended for broadcast {}",
+                track_name,
+                broadcast_name
+            ));
+        };
+
+        let group_sequence = group.info.sequence;
+        let mut frame_count: u64 = 0;
+        let mut bytes = Vec::new();
+        loop {
+            match group.read_frame().await {
+                Ok(Some(frame)) => {
+                    frame_count += 1;
+                    bytes.extend_from_slice(&frame);
+                }
+                Ok(None) => break,
+                Err(err) => {
+                    return Err(anyhow!(
+                        "track {} group {} read frame failed: {err:#}",
+                        track_name,
+                        group_sequence
+                    ));
+                }
+            }
+        }
+
+        if bytes.is_empty() {
+            continue;
+        }
+
+        let size_bytes = bytes.len();
+        let (hash, rel_path, inserted) = cas_store_blob(&cas_root, &bytes)?;
+        let record = ArchiveIndexRecord {
+            received_unix_ms: now_unix_ms(),
+            relay_url: relay_url.clone(),
+            broadcast_name: broadcast_name.clone(),
+            track_name: track_name.clone(),
+            group_sequence,
+            frame_count,
+            size_bytes,
+            blake3: hash.clone(),
+            cas_path: rel_path.display().to_string(),
+        };
+        append_archive_index_record(&manifest_path, &record)?;
+        archived_groups += 1;
+
+        if archived_groups % 50 == 0 {
+            tracing::info!(
+                relay = %relay_url,
+                broadcast = %broadcast_name,
+                track = %track_name,
+                archived_groups,
+                inserted,
+                size_bytes,
+                hash = %hash,
+                "archived relay group"
+            );
+        }
+    }
+}
+
+async fn wt_archive(args: WtArchiveArgs) -> Result<()> {
+    let relay_url =
+        Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?;
+    let relay_url_str = relay_url.to_string();
+    let mut tracks = if args.track.is_empty() {
+        default_wt_archive_tracks()
+    } else {
+        args.track.clone()
+    };
+    tracks.sort();
+    tracks.dedup();
+    if tracks.is_empty() {
+        return Err(anyhow!("no tracks configured for wt-archive"));
+    }
+
+    let cas_root = args.output_dir.join("objects").join("blake3");
+    let manifest_root = args
+        .manifest_dir
+        .unwrap_or_else(|| args.output_dir.join("manifests"));
+    fs::create_dir_all(&cas_root)
+        .with_context(|| format!("failed to create CAS root {}", cas_root.display()))?;
+    fs::create_dir_all(&manifest_root)
+        .with_context(|| format!("failed to create manifest root {}", manifest_root.display()))?;
+
+    let consume = moq_lite::Origin::produce();
+    let mut consume_updates = consume.consume();
+
+    #[derive(Clone)]
+    struct ProtocolOverride {
+        inner: web_transport_quinn::Session,
+        protocol: Option<String>,
+    }
+
+    impl web_transport_trait::Session for ProtocolOverride {
+        type SendStream = <web_transport_quinn::Session as web_transport_trait::Session>::SendStream;
+        type RecvStream = <web_transport_quinn::Session as web_transport_trait::Session>::RecvStream;
+        type Error = <web_transport_quinn::Session as web_transport_trait::Session>::Error;
+
+        fn accept_bi(
+            &self,
+        ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>>
+               + web_transport_trait::MaybeSend {
+            self.inner.accept_bi()
+        }
+
+        fn accept_uni(
+            &self,
+        ) -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + web_transport_trait::MaybeSend
+        {
+            self.inner.accept_uni()
+        }
+
+        fn open_bi(
+            &self,
+        ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>>
+               + web_transport_trait::MaybeSend {
+            self.inner.open_bi()
+        }
+
+        fn open_uni(
+            &self,
+        ) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + web_transport_trait::MaybeSend
+        {
+            self.inner.open_uni()
+        }
+
+        fn send_datagram(&self, payload: bytes::Bytes) -> Result<(), Self::Error> {
+            self.inner.send_datagram(payload)
+        }
+
+        fn recv_datagram(
+            &self,
+        ) -> impl Future<Output = Result<bytes::Bytes, Self::Error>> + web_transport_trait::MaybeSend
+        {
+            self.inner.recv_datagram()
+        }
+
+        fn max_datagram_size(&self) -> usize {
+            self.inner.max_datagram_size()
+        }
+
+        fn protocol(&self) -> Option<&str> {
+            self.protocol.as_deref().or_else(|| self.inner.protocol())
+        }
+
+        fn close(&self, code: u32, reason: &str) {
+            web_transport_trait::Session::close(&self.inner, code, reason)
+        }
+
+        fn closed(&self) -> impl Future<Output = Self::Error> + web_transport_trait::MaybeSend {
+            self.inner.closed()
+        }
+    }
+
+    async fn connect_moq_session(
+        relay_url: &Url,
+        consume: moq_lite::OriginProducer,
+        tls_disable_verify: bool,
+    ) -> Result<moq_lite::Session> {
+        let host = relay_url
+            .host_str()
+            .ok_or_else(|| anyhow!("relay url missing host: {relay_url}"))?
+            .to_string();
+        let port = relay_url.port().unwrap_or(443);
+
+        let mut roots = rustls::RootCertStore::empty();
+        let native = rustls_native_certs::load_native_certs();
+        if !native.errors.is_empty() {
+            tracing::warn!(
+                errors = ?native.errors,
+                "some native root certs could not be loaded"
+            );
+        }
+        for cert in native.certs {
+            let _ = roots.add(cert);
+        }
+
+        let mut tls = rustls::ClientConfig::builder()
+            .with_root_certificates(roots)
+            .with_no_client_auth();
+
+        if tls_disable_verify {
+            #[derive(Debug)]
+            struct NoCertificateVerification(Arc<rustls::crypto::CryptoProvider>);
+
+            impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
+                fn verify_server_cert(
+                    &self,
+                    _end_entity: &rustls::pki_types::CertificateDer<'_>,
+                    _intermediates: &[rustls::pki_types::CertificateDer<'_>],
+                    _server_name: &rustls::pki_types::ServerName<'_>,
+                    _ocsp: &[u8],
+                    _now: rustls::pki_types::UnixTime,
+                ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error>
+                {
+                    Ok(rustls::client::danger::ServerCertVerified::assertion())
+                }
+
+                fn verify_tls12_signature(
+                    &self,
+                    message: &[u8],
+                    cert: &rustls::pki_types::CertificateDer<'_>,
+                    dss: &rustls::DigitallySignedStruct,
+                ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
+                {
+                    rustls::crypto::verify_tls12_signature(
+                        message,
+                        cert,
+                        dss,
+                        &self.0.signature_verification_algorithms,
+                    )
+                }
+
+                fn verify_tls13_signature(
+                    &self,
+                    message: &[u8],
+                    cert: &rustls::pki_types::CertificateDer<'_>,
+                    dss: &rustls::DigitallySignedStruct,
+                ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
+                {
+                    rustls::crypto::verify_tls13_signature(
+                        message,
+                        cert,
+                        dss,
+                        &self.0.signature_verification_algorithms,
+                    )
+                }
+
+                fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
+                    self.0.signature_verification_algorithms.supported_schemes()
+                }
+            }
+
+            let provider = rustls::crypto::CryptoProvider::get_default()
+                .cloned()
+                .unwrap_or_else(|| Arc::new(rustls::crypto::ring::default_provider()));
+            tls.dangerous()
+                .set_certificate_verifier(Arc::new(NoCertificateVerification(provider)));
+        }
+
+        tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()];
+
+        let socket = std::net::UdpSocket::bind("[::]:0").context("failed to bind UDP socket")?;
+        let mut transport = quinn::TransportConfig::default();
+        transport.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
+        transport.keep_alive_interval(Some(Duration::from_secs(4)));
+        transport.mtu_discovery_config(None);
+
+        let transport = Arc::new(transport);
+        let runtime = quinn::default_runtime().context("no async runtime")?;
+        let endpoint_config = quinn::EndpointConfig::default();
+        let endpoint = quinn::Endpoint::new(endpoint_config, None, socket, runtime)
+            .context("failed to create QUIC endpoint")?;
+
+        let ip = tokio::net::lookup_host((host.clone(), port))
+            .await
+            .context("failed DNS lookup")?
+            .next()
+            .context("no DNS entries")?;
+
+        let quic: quinn::crypto::rustls::QuicClientConfig = tls.try_into()?;
+        let mut client_cfg = quinn::ClientConfig::new(Arc::new(quic));
+        client_cfg.transport_config(transport);
+
+        tracing::debug!(%ip, %host, %relay_url, "connecting QUIC");
+        let connection = endpoint
+            .connect_with(client_cfg, ip, &host)?
+            .await
+            .context("failed QUIC connect")?;
+
+        let mut request = web_transport_quinn::proto::ConnectRequest::new(relay_url.clone());
+        for alpn in moq_lite::ALPNS {
+            request = request.with_protocol(alpn.to_string());
+        }
+        let wt = web_transport_quinn::Session::connect(connection, request)
+            .await
+            .context("failed WebTransport CONNECT")?;
+
+        let client = moq_lite::Client::new().with_consume(consume);
+        match client.connect(wt.clone()).await {
+            Ok(session) => {
+                tracing::info!("connected to relay (native protocol negotiation)");
+                return Ok(session);
+            }
+            Err(err) => {
+                tracing::debug!(err = %err, "native MoQ SETUP failed; trying protocol overrides");
+            }
+        }
+
+        let attempts: [&str; 4] = ["moqt-16", "moqt-15", "moq-00", ""];
+        let mut last_err: Option<anyhow::Error> = None;
+        for p in attempts {
+            let session = ProtocolOverride {
+                inner: wt.clone(),
+                protocol: (!p.is_empty()).then(|| p.to_string()),
+            };
+            match client.connect(session).await {
+                Ok(session) => {
+                    tracing::info!(protocol = %p, "connected to relay");
+                    return Ok(session);
+                }
+                Err(err) => {
+                    last_err = Some(anyhow::Error::new(err));
+                    tracing::debug!(
+                        protocol = %p,
+                        err = %last_err.as_ref().unwrap(),
+                        "MoQ SETUP failed; retrying"
+                    );
+                }
+            }
+        }
+
+        Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))).context("failed MoQ SETUP")
+    }
+
+    tracing::info!(url=%relay_url, name=%args.name, "connecting to relay for archival");
+    let session = connect_moq_session(&relay_url, consume.clone(), args.tls_disable_verify).await?;
+
+    let broadcast = if let Some(active) = consume.consume_broadcast(args.name.as_str()) {
+        active
+    } else {
+        tracing::info!(name=%args.name, "waiting for relay broadcast announcement");
+        loop {
+            let Some((path, active)) = consume_updates.announced().await else {
+                return Err(anyhow!("relay announcement stream closed"));
+            };
+            if path.to_string() != args.name {
+                continue;
+            }
+            if let Some(active) = active {
+                break active;
+            }
+        }
+    };
+
+    let broadcast_dir = sanitize_path_component(&args.name);
+    let mut tasks = tokio::task::JoinSet::new();
+    for track_name in tracks {
+        let track = broadcast.subscribe_track(&moq_lite::Track::new(&track_name));
+        let track_file = sanitize_path_component(&track_name);
+        let manifest_path = manifest_root
+            .join(&broadcast_dir)
+            .join(format!("{track_file}.jsonl"));
+        tasks.spawn(archive_track_loop(
+            track,
+            relay_url_str.clone(),
+            args.name.clone(),
+            track_name.clone(),
+            cas_root.clone(),
+            manifest_path,
+        ));
+        tracing::info!(
+            relay = %relay_url_str,
+            broadcast = %args.name,
+            track = %track_name,
+            "archival track subscribed"
+        );
+    }
+
+    if tasks.is_empty() {
+        return Err(anyhow!("no tracks spawned for archival"));
+    }
+
+    loop {
+        tokio::select! {
+            next = tasks.join_next() => {
+                match next {
+                    Some(Ok(Ok(()))) => {
+                        return Err(anyhow!("archival track task exited unexpectedly"));
+                    }
+                    Some(Ok(Err(err))) => return Err(err),
+                    Some(Err(err)) => return Err(anyhow!("archival track task join failure: {err}")),
+                    None => return Err(anyhow!("all archival track tasks exited")),
+                }
+            }
+            _ = session.closed() => {
+                return Err(anyhow!("relay session closed"));
+            }
+            _ = tokio::signal::ctrl_c() => {
+                tracing::info!("ctrl-c; shutting down wt-archive");
+                break;
+            }
+        }
+    }
+
+    Ok(())
+}
+
 async fn wt_publish(args: WtPublishArgs) -> Result<()> {
     let relay_url =
         Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?;
diff --git a/evolution/proposals/ECP-0070-relay-cas-archival.md b/evolution/proposals/ECP-0070-relay-cas-archival.md
new file mode 100644
index 0000000..0a20108
--- /dev/null
+++ b/evolution/proposals/ECP-0070-relay-cas-archival.md
@@ -0,0 +1,62 @@
+# ECP-0070: Relay-Native CAS Archival + NixOS Auto-Archive Service
+
+## Summary
+
+Add a first-party archival path for MoQ relay streams:
+
+1. `ec-node wt-archive`:
+- Subscribe to a relay broadcast over WebTransport/MoQ.
+- Persist each received group payload as a content-addressed object (BLAKE3).
+- Append per-track JSONL index records ("manifests") that reference CAS object hashes/paths.
+
+2. Extend the NixOS `services.every-channel.ec-node` module with `archive.*` options:
+- Poll the `every.channel` public stream directory.
+- Auto-start one `wt-archive` worker per discovered broadcast.
+- Store large CAS blobs and manifest indices in separately configurable roots.
+
+## Motivation
+
+We need durable, low-duplication TV archive storage on dedicated storage hosts.
+
+- Relay ingestion makes archive nodes location-agnostic.
+- CAS enables deduplication and integrity-friendly storage semantics.
+- Separate manifest/index storage allows faster metadata operations and simpler backup policy.
+- Nix-managed host config keeps archival behavior immutable and repeatable.
+
+## Decision
+
+- Introduce `ec-node wt-archive` as the minimal relay subscriber for archival.
+- Use BLAKE3 object IDs and a sharded filesystem layout (`<output-dir>/objects/blake3/aa/<hash>.bin`).
+- Write append-only JSONL records per `broadcast/track` under a configurable manifest directory.
+- Add `services.every-channel.ec-node.archive.*` in the Nix module to auto-discover and archive public streams.
+
+Initial default archived tracks target current relay publisher output:
+- `catalog.json`
+- `init.mp4`
+- `video0.m4s`
+- `audio0.m4s`
+
+## Consequences
+
+Pros:
+- Immediate archival path without requiring browser components.
+- Works with current Cloudflare relay + WebTransport deployment.
+- Fully declarative deployment via existing Nix input wiring.
+
+Tradeoffs:
+- First version archives raw group payloads and does not yet normalize across track schema variants.
+- Discovery source is the web public stream list (not full control-topic gossip ingestion).
+- Per-broadcast workers are process-based and best-effort supervised.
+
+## Rollout
+
+1. Ship `wt-archive` command in `ec-node`.
+2. Extend Nix module with `archive.*`.
+3. Enable on storage host(s) with:
+- CAS root on large ZFS pool.
+- manifest root on separate filesystem/dataset.
+
+## Reversibility
+
+- Disable `services.every-channel.ec-node.archive.enable`.
+- Existing CAS/manifests remain inert on disk and can be pruned independently.
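Aside (not part of the diff): the CAS/manifest contract above implies a cheap consumer-side integrity check — re-read the blob a JSONL record points at, re-hash it, and compare against the recorded `blake3` and `size_bytes`. A minimal sketch under the `ArchiveIndexRecord` layout from the diff; the `ManifestRecord` struct and `verify_record` helper are illustrative names, not part of `ec-node`:

```rust
use std::fs;
use std::path::Path;

// Subset of the fields `ArchiveIndexRecord` serializes; serde ignores the rest.
#[derive(serde::Deserialize)]
struct ManifestRecord {
    blake3: String,
    cas_path: String,
    size_bytes: usize,
}

// Verify one JSONL manifest line against the CAS object it references.
// `output_dir` is the same root passed to `wt-archive --output-dir`, since
// `cas_path` is recorded relative to it (objects/blake3/<shard>/<hash>.bin).
fn verify_record(output_dir: &Path, line: &str) -> anyhow::Result<()> {
    let record: ManifestRecord = serde_json::from_str(line)?;
    let blob = fs::read(output_dir.join(&record.cas_path))?;
    anyhow::ensure!(blob.len() == record.size_bytes, "size mismatch");
    let hash = blake3::hash(&blob).to_hex().to_string();
    anyhow::ensure!(hash == record.blake3, "hash mismatch: {hash} != {}", record.blake3);
    Ok(())
}
```

Because the manifests are append-only JSONL, a scrubber can stream a file line by line and run this check without any database.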
diff --git a/nix/modules/ec-node.nix b/nix/modules/ec-node.nix index 2bd32c4..0a98824 100644 --- a/nix/modules/ec-node.nix +++ b/nix/modules/ec-node.nix @@ -191,6 +191,61 @@ in }; }; + archive = { + enable = lib.mkOption { + type = lib.types.bool; + default = false; + description = "Run relay archival workers that subscribe and persist streams into CAS storage."; + }; + + directoryUrl = lib.mkOption { + type = lib.types.str; + default = "https://every.channel"; + description = "Directory base URL used to discover public streams (`/api/public-streams`)."; + }; + + outputDir = lib.mkOption { + type = lib.types.str; + default = "/var/lib/every-channel/archive"; + description = "CAS object root passed to `ec-node wt-archive --output-dir`."; + }; + + manifestDir = lib.mkOption { + type = lib.types.str; + default = "/var/lib/every-channel/manifests"; + description = "Manifest/index root passed to `ec-node wt-archive --manifest-dir`."; + }; + + pollIntervalMs = lib.mkOption { + type = lib.types.ints.positive; + default = 15000; + description = "Discovery poll interval for public streams."; + }; + + streamPrefix = lib.mkOption { + type = lib.types.nullOr lib.types.str; + default = null; + description = "Optional broadcast-name prefix filter for archival workers."; + }; + + tracks = lib.mkOption { + type = lib.types.listOf lib.types.str; + default = [ + "catalog.json" + "init.mp4" + "video0.m4s" + "audio0.m4s" + ]; + description = "Tracks passed to each `wt-archive` worker."; + }; + + tlsDisableVerify = lib.mkOption { + type = lib.types.bool; + default = false; + description = "Danger: disable TLS verification for relay archive subscribers."; + }; + }; + broadcasts = lib.mkOption { type = lib.types.listOf (lib.types.submodule { options = { @@ -218,8 +273,8 @@ in config = lib.mkIf cfg.enable { assertions = [ { - assertion = cfg.broadcasts != [ ]; - message = "services.every-channel.ec-node.broadcasts must be non-empty when enabled"; + assertion = (cfg.broadcasts != [ ]) || cfg.archive.enable; + message = "services.every-channel.ec-node.broadcasts must be non-empty unless archive.enable=true"; } { assertion = @@ -239,9 +294,14 @@ in } ]; - systemd.tmpfiles.rules = [ - "d /run/every-channel 1777 root root - -" - ]; + systemd.tmpfiles.rules = + [ + "d /run/every-channel 1777 root root - -" + ] + ++ lib.optionals cfg.archive.enable [ + "d ${cfg.archive.outputDir} 0750 root root - -" + "d ${cfg.archive.manifestDir} 0750 root root - -" + ]; systemd.services = lib.listToAttrs (map @@ -571,6 +631,146 @@ in SystemCallArchitectures = "native"; }; + environment = cfg.environment; + }; + }) + // lib.optionalAttrs cfg.archive.enable + (let + archiveUnit = "every-channel-wt-archive-auto"; + archivePrefix = if cfg.archive.streamPrefix == null then "" else cfg.archive.streamPrefix; + archiveTrackLines = lib.concatMapStrings (track: " cmd+=(--track ${lib.escapeShellArg track})\n") cfg.archive.tracks; + archiveRunner = pkgs.writeShellApplication { + name = archiveUnit; + runtimeInputs = [ + pkgs.coreutils + pkgs.curl + pkgs.gawk + pkgs.jq + cfg.package + ]; + text = '' + set -euo pipefail + + directory_url=${lib.escapeShellArg (cfg.archive.directoryUrl + "/api/public-streams")} + output_dir=${lib.escapeShellArg cfg.archive.outputDir} + manifest_dir=${lib.escapeShellArg cfg.archive.manifestDir} + relay_fallback=${lib.escapeShellArg cfg.relayUrl} + stream_prefix=${lib.escapeShellArg archivePrefix} + state_dir="/run/every-channel/archive" + pids_dir="$state_dir/pids" + logs_dir="$state_dir/logs" + mkdir -p "$pids_dir" 
"$logs_dir" + poll_secs="$(awk 'BEGIN { printf "%.3f", ${toString cfg.archive.pollIntervalMs} / 1000.0 }')" + + cleanup_children() { + for pid_file in "$pids_dir"/*.pid; do + [[ -e "$pid_file" ]] || continue + pid="$(cat "$pid_file" 2>/dev/null || true)" + if [[ -n "$pid" ]]; then + kill "$pid" 2>/dev/null || true + fi + rm -f "$pid_file" + done + } + + trap cleanup_children INT TERM EXIT + + while true; do + entries_json="$(curl -fsS "$directory_url" || true)" + if [[ -z "$entries_json" ]]; then + sleep "$poll_secs" + continue + fi + + while IFS= read -r entry; do + name="$(printf '%s\n' "$entry" | jq -r '.broadcast_name // empty')" + relay="$(printf '%s\n' "$entry" | jq -r '.relay_url // empty')" + if [[ -z "$name" ]]; then + continue + fi + if [[ -n "$stream_prefix" && "$name" != "$stream_prefix"* ]]; then + continue + fi + if [[ -z "$relay" ]]; then + relay="$relay_fallback" + fi + + key="$(printf '%s' "$name" | tr -c 'A-Za-z0-9_.-' '_')" + pid_file="$pids_dir/$key.pid" + if [[ -s "$pid_file" ]]; then + pid="$(cat "$pid_file" 2>/dev/null || true)" + if [[ -n "$pid" ]] && kill -0 "$pid" 2>/dev/null; then + continue + fi + fi + + cmd=( + ${lib.escapeShellArg "${cfg.package}/bin/ec-node"} + wt-archive + --url "$relay" + --name "$name" + --output-dir "$output_dir" + --manifest-dir "$manifest_dir" + ) + ${lib.optionalString cfg.archive.tlsDisableVerify "cmd+=(--tls-disable-verify)"} + ${archiveTrackLines} + + log_file="$logs_dir/$key.log" + ( + exec "''${cmd[@]}" + ) >>"$log_file" 2>&1 & + echo "$!" > "$pid_file" + done < <(printf '%s\n' "$entries_json" | jq -rc '.entries[]?') + + for pid_file in "$pids_dir"/*.pid; do + [[ -e "$pid_file" ]] || continue + pid="$(cat "$pid_file" 2>/dev/null || true)" + if [[ -z "$pid" ]] || ! kill -0 "$pid" 2>/dev/null; then + rm -f "$pid_file" + fi + done + + sleep "$poll_secs" + done + ''; + }; + in + { + "${archiveUnit}" = { + description = "every.channel relay archival auto-worker"; + wantedBy = [ "multi-user.target" ]; + after = [ "network-online.target" ]; + wants = [ "network-online.target" ]; + + unitConfig = { + StartLimitIntervalSec = 0; + }; + + serviceConfig = { + Type = "simple"; + ExecStart = "${archiveRunner}/bin/${archiveUnit}"; + Restart = "always"; + RestartSec = 2; + + NoNewPrivileges = true; + PrivateTmp = true; + ProtectSystem = "strict"; + ProtectHome = true; + ProtectKernelTunables = true; + ProtectKernelModules = true; + ProtectControlGroups = true; + LockPersonality = true; + MemoryDenyWriteExecute = true; + RestrictSUIDSGID = true; + RestrictRealtime = true; + SystemCallArchitectures = "native"; + ReadWritePaths = [ + "/run/every-channel" + cfg.archive.outputDir + cfg.archive.manifestDir + ]; + }; + environment = cfg.environment; }; });