From faec62f9ae8a733bde008dd44ac2974c087ee7d0 Mon Sep 17 00:00:00 2001
From: "every.channel"
Date: Sun, 22 Feb 2026 02:23:06 -0800
Subject: [PATCH] control: add transport resolver and nix control announce wiring

---
 README.md                                     |  10 +-
 crates/ec-node/src/main.rs                    | 194 +++++++++++++++---
 ...P-0067-control-resolve-and-nixos-wiring.md |  38 ++++
 nix/modules/ec-node.nix                       |  48 +++++
 4 files changed, 260 insertions(+), 30 deletions(-)
 create mode 100644 evolution/proposals/ECP-0067-control-resolve-and-nixos-wiring.md

diff --git a/README.md b/README.md
index b8db931..c0ed671 100644
--- a/README.md
+++ b/README.md
@@ -68,14 +68,20 @@ Control protocol (iroh gossip, relay + direct transport discovery):
 
 ```sh
 # Listener (on node A)
-cargo run -p ec-node -- control-listen --gossip-peer
+cargo run -p ec-node -- control-listen --gossip-peer
 
 # Announcer (on node B)
 cargo run -p ec-node -- control-announce \
   --stream-id la-nbc \
   --relay-url https://cdn.moq.dev/anon \
   --relay-broadcast la-nbc \
-  --gossip-peer
+  --gossip-peer
+
+# Resolver (consumer picks best announced path)
+cargo run -p ec-node -- control-resolve \
+  --stream-id la-nbc \
+  --prefer direct-first \
+  --gossip-peer
 ```
 
 Coverage:
diff --git a/crates/ec-node/src/main.rs b/crates/ec-node/src/main.rs
index 5bfa9d8..7f39195 100644
--- a/crates/ec-node/src/main.rs
+++ b/crates/ec-node/src/main.rs
@@ -84,6 +84,8 @@ enum Commands {
     ControlAnnounce(ControlAnnounceArgs),
     /// Listen for stream transport announcements from iroh gossip control topic.
     ControlListen(ControlListenArgs),
+    /// Resolve a stream id to the best currently-announced transport.
+    ControlResolve(ControlResolveArgs),
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
@@ -508,6 +510,46 @@ struct ControlListenArgs {
     once: bool,
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
+enum ControlTransportPreference {
+    /// Prefer iroh direct transport when both are available.
+    DirectFirst,
+    /// Prefer relay transport when both are available.
+    RelayFirst,
+    /// Accept only iroh direct transport.
+    DirectOnly,
+    /// Accept only relay transport.
+    RelayOnly,
+}
+
+#[derive(Parser, Debug)]
+struct ControlResolveArgs {
+    /// Stable stream id to resolve from control announcements.
+    #[arg(long)]
+    stream_id: String,
+    /// Transport selection preference.
+    #[arg(long, value_enum, default_value_t = ControlTransportPreference::DirectFirst)]
+    prefer: ControlTransportPreference,
+    /// Maximum wall-clock wait for a matching announcement (ms, minimum 1000).
+    #[arg(long, default_value_t = 30000)]
+    timeout_ms: u64,
+    /// Maximum accepted staleness from updated_unix_ms (ms); capped by the announcement TTL.
+    #[arg(long, default_value_t = 30000)]
+    max_age_ms: u64,
+    /// Include the full raw announcement in output JSON.
+    #[arg(long, default_value_t = false)]
+    include_announcement: bool,
+    /// Optional iroh secret key (hex) for control gossip endpoint identity.
+    #[arg(long)]
+    iroh_secret: Option<String>,
+    /// Discovery modes to enable (comma-separated: dht, mdns, dns).
+    #[arg(long)]
+    discovery: Option<String>,
+    /// Gossip peers to connect to (repeatable).
+    #[arg(long)]
+    gossip_peer: Vec<String>,
+}
+
 #[derive(Subcommand, Debug)]
 enum IngestSource {
     /// Ingest from an HDHomeRun device.
@@ -587,6 +629,7 @@ fn main() -> Result<()> {
         Commands::WtPublish(args) => run_async(wt_publish(args))?,
         Commands::ControlAnnounce(args) => run_async(control_announce(args))?,
         Commands::ControlListen(args) => run_async(control_listen(args))?,
+        Commands::ControlResolve(args) => run_async(control_resolve(args))?,
     }
 
     Ok(())
@@ -4361,12 +4404,6 @@ async fn spawn_control_announcer_task(
 }
 
 async fn control_announce(args: ControlAnnounceArgs) -> Result<()> {
-    if args.gossip_peer.is_empty() {
-        return Err(anyhow!(
-            "control announce requires at least one --gossip-peer currently"
-        ));
-    }
-
     let secret = parse_iroh_secret(args.iroh_secret)?;
     let discovery = parse_discovery(args.discovery.as_deref())?;
     let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
@@ -4459,6 +4496,141 @@ async fn control_listen(args: ControlListenArgs) -> Result<()> {
     Ok(())
 }
 
+/// Report whether `announcement` is still fresh enough to act on.
+///
+/// The freshness window is the smaller of the announcement's own `ttl_ms`
+/// and `max_age_ms` (treated as at least 1 ms), counted from
+/// `updated_unix_ms`.
+///
+/// NOTE(review): a timestamp ahead of the local clock always passes this
+/// check; confirm whether an upper clock-skew bound is wanted.
+fn control_announcement_is_fresh(
+    announcement: &StreamControlAnnouncement,
+    max_age_ms: u64,
+) -> bool {
+    let now = now_unix_ms();
+    let freshness_ms = announcement.ttl_ms.min(max_age_ms.max(1));
+    now <= announcement.updated_unix_ms.saturating_add(freshness_ms)
+}
+
+/// Pick one transport from `transports` according to `prefer`.
+///
+/// The `*Only` policies return the first descriptor of that kind. The
+/// `*First` policies return the first descriptor of the preferred kind and
+/// otherwise the first descriptor of the other kind. Returns `None` when
+/// nothing in `transports` satisfies the policy.
+fn select_control_transport(
+    transports: &[StreamTransportDescriptor],
+    prefer: ControlTransportPreference,
+) -> Option<StreamTransportDescriptor> {
+    let is_direct =
+        |t: &StreamTransportDescriptor| matches!(t, StreamTransportDescriptor::IrohDirect { .. });
+    let is_relay =
+        |t: &StreamTransportDescriptor| matches!(t, StreamTransportDescriptor::RelayMoq { .. });
+
+    match prefer {
+        ControlTransportPreference::DirectOnly => transports.iter().find(|t| is_direct(t)).cloned(),
+        ControlTransportPreference::RelayOnly => transports.iter().find(|t| is_relay(t)).cloned(),
+        ControlTransportPreference::DirectFirst => transports
+            .iter()
+            .find(|t| is_direct(t))
+            .cloned()
+            .or_else(|| transports.iter().find(|t| is_relay(t)).cloned()),
+        ControlTransportPreference::RelayFirst => transports
+            .iter()
+            .find(|t| is_relay(t))
+            .cloned()
+            .or_else(|| transports.iter().find(|t| is_direct(t)).cloned()),
+    }
+}
+
+/// Resolve `args.stream_id` to one transport from control announcements.
+///
+/// Joins the control gossip topic, then waits up to `timeout_ms` (raised
+/// to at least 1000 ms) for an announcement that matches the stream id,
+/// passes `control_announcement_is_fresh`, and offers a transport
+/// permitted by `prefer`. The first match is printed to stdout as a single
+/// JSON line.
+///
+/// # Errors
+///
+/// Fails if setting up the endpoint or joining the gossip topic fails (the
+/// join is limited to 30 s), if receiving an announcement fails, or if no
+/// acceptable announcement arrives before the deadline.
+async fn control_resolve(args: ControlResolveArgs) -> Result<()> {
+    let secret = parse_iroh_secret(args.iroh_secret)?;
+    let discovery = parse_discovery(args.discovery.as_deref())?;
+    let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
+    eprintln!("control endpoint id: {}", endpoint.id());
+
+    let mut gossip = tokio::time::timeout(
+        Duration::from_secs(30),
+        ec_iroh::ControlGossip::join(endpoint.clone(), &args.gossip_peer),
+    )
+    .await
+    .context("timed out joining control gossip topic")??;
+
+    let timeout = Duration::from_millis(args.timeout_ms.max(1000));
+    let deadline = Instant::now() + timeout;
+
+    loop {
+        let now = Instant::now();
+        if now >= deadline {
+            break;
+        }
+        let remaining = deadline - now;
+
+        // An elapsed wait means the overall deadline has passed: leave the
+        // loop so the stream-specific timeout error below is reported.
+        let Ok(received) = tokio::time::timeout(remaining, gossip.next_announcement()).await
+        else {
+            break;
+        };
+        let Some(announcement) = received? else {
+            continue;
+        };
+        if announcement.stream.id.0.as_str() != args.stream_id {
+            continue;
+        }
+        if !control_announcement_is_fresh(&announcement, args.max_age_ms) {
+            continue;
+        }
+        let Some(transport) = select_control_transport(&announcement.transports, args.prefer)
+        else {
+            continue;
+        };
+
+        let stream_id = announcement.stream.id.0.clone();
+        let title = announcement.stream.title.clone();
+
+        let output = if args.include_announcement {
+            serde_json::json!({
+                "stream_id": stream_id,
+                "title": title,
+                "transport": transport,
+                "updated_unix_ms": announcement.updated_unix_ms,
+                "ttl_ms": announcement.ttl_ms,
+                "announcement": announcement,
+            })
+        } else {
+            serde_json::json!({
+                "stream_id": stream_id,
+                "title": title,
+                "transport": transport,
+                "updated_unix_ms": announcement.updated_unix_ms,
+                "ttl_ms": announcement.ttl_ms,
+            })
+        };
+        println!("{}", serde_json::to_string(&output)?);
+        return Ok(());
+    }
+
+    Err(anyhow!(
+        "timed out resolving stream_id '{}' from control topic",
+        args.stream_id
+    ))
+}
+
 fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
     let start = Instant::now();
     let mut last_len: Option = None;
@@ -4513,28 +4685,24 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
             args.control_ttl_ms,
         );
 
-        if args.gossip_peer.is_empty() {
-            tracing::warn!("control announce requested but no gossip peers configured; skipping");
-        } else {
-            match spawn_control_announcer_task(
-                endpoint.clone(),
-                args.gossip_peer.clone(),
-                announcement,
-                Duration::from_millis(args.control_interval_ms.max(1000)),
-            )
-            .await
-            {
-                Ok(stop_tx) => {
-                    tracing::info!(
-                        endpoint = %endpoint.id(),
-                        stream = %args.name,
-                        "control announce enabled"
-                    );
-                    control_stop = Some(stop_tx);
-                }
-                Err(err) => {
-                    tracing::warn!("failed to start control announce task: {err:#}");
-                }
+        match spawn_control_announcer_task(
+            endpoint.clone(),
+            args.gossip_peer.clone(),
+            announcement,
+            Duration::from_millis(args.control_interval_ms.max(1000)),
+        )
+        .await
+        {
+            Ok(stop_tx) => {
+                tracing::info!(
+                    endpoint = %endpoint.id(),
+                    stream = %args.name,
+                    "control announce enabled"
+                );
+                control_stop = Some(stop_tx);
+            }
+            Err(err) => {
+                tracing::warn!("failed to start control announce task: {err:#}");
             }
         }
     }
diff --git a/evolution/proposals/ECP-0067-control-resolve-and-nixos-wiring.md b/evolution/proposals/ECP-0067-control-resolve-and-nixos-wiring.md
new file mode 100644
index 0000000..d15dfde
--- /dev/null
+++ b/evolution/proposals/ECP-0067-control-resolve-and-nixos-wiring.md
@@ -0,0 +1,38 @@
+# ECP-0067: Control Transport Resolution And NixOS Control Wiring
+
+Status: Draft
+
+## Decision
+
+Add two pieces on top of ECP-0066:
+
+1. `ec-node control-resolve`:
+- resolve a `stream_id` from iroh-gossip control announcements,
+- enforce freshness (`updated_unix_ms` + TTL / max age),
+- choose transport by policy (`direct-first`, `relay-first`, `direct-only`, `relay-only`),
+- emit machine-readable JSON for automation.
+
+2. Extend the `services.every-channel.ec-node` NixOS module with `control.*` options that map directly to `wt-publish --control-announce` flags.
+
+## Motivation
+
+We already announce relay/direct transport availability, but consumers and deployment automation still need ad-hoc logic to pick a path. `control-resolve` makes this deterministic and scriptable.
+
+For ops, control announcements should be configured as immutable host state in Nix, not hand-managed CLI flags on each machine.
+
+## Scope
+
+In scope:
+- New `control-resolve` command in `ec-node`.
+- Freshness + transport-preference policy in resolver.
+- NixOS module options for control announce enable/ttl/interval/discovery/identity/peers.
+
+Out of scope:
+- Browser-native iroh direct transport.
+- End-to-end automatic failover execution (resolve + launch subscribe) in one command.
+- Cryptographic policy hardening beyond current control-topic trust model.
+
+## Rollout / Reversibility
+
+- Additive only: existing relay and direct publish/subscribe paths remain unchanged.
+- If needed, disable by not using `control-resolve` and leaving `services.every-channel.ec-node.control.enable = false`.
diff --git a/nix/modules/ec-node.nix b/nix/modules/ec-node.nix
index f10b992..13c83d8 100644
--- a/nix/modules/ec-node.nix
+++ b/nix/modules/ec-node.nix
@@ -107,6 +107,45 @@ in
       };
     };
 
+    control = {
+      enable = lib.mkOption {
+        type = lib.types.bool;
+        default = false;
+        description = "Enable iroh-gossip control announcements from each wt-publish service.";
+      };
+
+      ttlMs = lib.mkOption {
+        type = lib.types.ints.positive;
+        default = 15000;
+        description = "Control announcement TTL passed to `ec-node wt-publish --control-ttl-ms`.";
+      };
+
+      intervalMs = lib.mkOption {
+        type = lib.types.ints.positive;
+        default = 5000;
+        description = "Control announcement interval passed to `ec-node wt-publish --control-interval-ms`.";
+      };
+
+      discovery = lib.mkOption {
+        type = lib.types.nullOr lib.types.str;
+        default = null;
+        example = "dht,mdns,dns";
+        description = "Optional iroh discovery mode list for control announcements.";
+      };
+
+      irohSecret = lib.mkOption {
+        type = lib.types.nullOr lib.types.str;
+        default = null;
+        description = "Optional iroh secret key (hex) for control announcement identity. The value is embedded in the generated service script (kept in the world-readable Nix store) and passed as a command-line argument, so do not set a key that must stay private.";
+      };
+
+      gossipPeers = lib.mkOption {
+        type = lib.types.listOf lib.types.str;
+        default = [ ];
+        description = "Optional iroh endpoint addresses to seed control gossip joins.";
+      };
+    };
+
     broadcasts = lib.mkOption {
       type = lib.types.listOf (lib.types.submodule {
         options = {
@@ -188,6 +227,7 @@ in
             "cmd+=(${lib.concatStringsSep " " (map lib.escapeShellArg cfg.extraArgs)})";
         explicitInputStr = if b.input == null then "" else b.input;
         channelStr = if b.channel == null then "" else b.channel;
+        controlGossipPeerLines = lib.concatMapStrings (peer: "cmd+=(--gossip-peer ${lib.escapeShellArg peer})\n") cfg.control.gossipPeers;
       in
       ''
         set -euo pipefail
@@ -302,6 +342,14 @@ in
         ${lib.optionalString (!cfg.transcode) "cmd+=(--transcode=false)"}
         ${lib.optionalString (!cfg.passthrough) "cmd+=(--passthrough=false)"}
         ${lib.optionalString cfg.tlsDisableVerify "cmd+=(--tls-disable-verify)"}
+        ${lib.optionalString cfg.control.enable ''
+          cmd+=(--control-announce)
+          cmd+=(--control-ttl-ms ${toString cfg.control.ttlMs})
+          cmd+=(--control-interval-ms ${toString cfg.control.intervalMs})
+          ${lib.optionalString (cfg.control.discovery != null) "cmd+=(--discovery ${lib.escapeShellArg cfg.control.discovery})"}
+          ${lib.optionalString (cfg.control.irohSecret != null) "cmd+=(--iroh-secret ${lib.escapeShellArg cfg.control.irohSecret})"}
+          ${controlGossipPeerLines}
+        ''}
         ${extraArgsLine}
 
         # Keep the unit alive even if the relay is temporarily unreachable.