control: add transport resolver and nix control announce wiring

This commit is contained in:
every.channel 2026-02-22 02:23:06 -08:00
parent f77fab378b
commit faec62f9ae
No known key found for this signature in database
4 changed files with 260 additions and 30 deletions

View file

@ -84,6 +84,8 @@ enum Commands {
ControlAnnounce(ControlAnnounceArgs),
/// Listen for stream transport announcements from iroh gossip control topic.
ControlListen(ControlListenArgs),
/// Resolve a stream id to the best currently-announced transport.
ControlResolve(ControlResolveArgs),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
@ -508,6 +510,46 @@ struct ControlListenArgs {
once: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
enum ControlTransportPreference {
/// Prefer iroh direct transport when both are available.
DirectFirst,
/// Prefer relay transport when both are available.
RelayFirst,
/// Accept only iroh direct transport.
DirectOnly,
/// Accept only relay transport.
RelayOnly,
}
#[derive(Parser, Debug)]
struct ControlResolveArgs {
/// Stable stream id to resolve from control announcements.
#[arg(long)]
stream_id: String,
/// Transport selection preference.
#[arg(long, value_enum, default_value_t = ControlTransportPreference::DirectFirst)]
prefer: ControlTransportPreference,
/// Maximum wall-clock wait for a matching announcement (ms).
#[arg(long, default_value_t = 30000)]
timeout_ms: u64,
/// Maximum accepted staleness from updated_unix_ms (ms).
#[arg(long, default_value_t = 30000)]
max_age_ms: u64,
/// Include the full raw announcement in output JSON.
#[arg(long, default_value_t = false)]
include_announcement: bool,
/// Optional iroh secret key (hex) for control gossip endpoint identity.
#[arg(long)]
iroh_secret: Option<String>,
/// Discovery modes to enable (comma-separated: dht, mdns, dns).
#[arg(long)]
discovery: Option<String>,
/// Gossip peers to connect to (repeatable).
#[arg(long)]
gossip_peer: Vec<String>,
}
#[derive(Subcommand, Debug)]
enum IngestSource {
/// Ingest from an HDHomeRun device.
@ -587,6 +629,7 @@ fn main() -> Result<()> {
Commands::WtPublish(args) => run_async(wt_publish(args))?,
Commands::ControlAnnounce(args) => run_async(control_announce(args))?,
Commands::ControlListen(args) => run_async(control_listen(args))?,
Commands::ControlResolve(args) => run_async(control_resolve(args))?,
}
Ok(())
@ -4361,12 +4404,6 @@ async fn spawn_control_announcer_task(
}
async fn control_announce(args: ControlAnnounceArgs) -> Result<()> {
if args.gossip_peer.is_empty() {
return Err(anyhow!(
"control announce requires at least one --gossip-peer currently"
));
}
let secret = parse_iroh_secret(args.iroh_secret)?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
@ -4459,6 +4496,111 @@ async fn control_listen(args: ControlListenArgs) -> Result<()> {
Ok(())
}
fn control_announcement_is_fresh(
announcement: &StreamControlAnnouncement,
max_age_ms: u64,
) -> bool {
let now = now_unix_ms();
let freshness_ms = announcement.ttl_ms.min(max_age_ms.max(1));
now <= announcement.updated_unix_ms.saturating_add(freshness_ms)
}
fn select_control_transport(
transports: &[StreamTransportDescriptor],
prefer: ControlTransportPreference,
) -> Option<StreamTransportDescriptor> {
let is_direct =
|t: &StreamTransportDescriptor| matches!(t, StreamTransportDescriptor::IrohDirect { .. });
let is_relay =
|t: &StreamTransportDescriptor| matches!(t, StreamTransportDescriptor::RelayMoq { .. });
match prefer {
ControlTransportPreference::DirectOnly => transports.iter().find(|t| is_direct(t)).cloned(),
ControlTransportPreference::RelayOnly => transports.iter().find(|t| is_relay(t)).cloned(),
ControlTransportPreference::DirectFirst => transports
.iter()
.find(|t| is_direct(t))
.cloned()
.or_else(|| transports.iter().find(|t| is_relay(t)).cloned()),
ControlTransportPreference::RelayFirst => transports
.iter()
.find(|t| is_relay(t))
.cloned()
.or_else(|| transports.iter().find(|t| is_direct(t)).cloned()),
}
}
async fn control_resolve(args: ControlResolveArgs) -> Result<()> {
let secret = parse_iroh_secret(args.iroh_secret)?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
eprintln!("control endpoint id: {}", endpoint.id());
let mut gossip = tokio::time::timeout(
Duration::from_secs(30),
ec_iroh::ControlGossip::join(endpoint.clone(), &args.gossip_peer),
)
.await
.context("timed out joining control gossip topic")??;
let timeout = Duration::from_millis(args.timeout_ms.max(1000));
let deadline = Instant::now() + timeout;
loop {
let now = Instant::now();
if now >= deadline {
break;
}
let remaining = deadline - now;
let maybe = tokio::time::timeout(remaining, gossip.next_announcement())
.await
.context("timed out waiting for control announcement")??;
let Some(announcement) = maybe else {
continue;
};
if announcement.stream.id.0.as_str() != args.stream_id {
continue;
}
if !control_announcement_is_fresh(&announcement, args.max_age_ms) {
continue;
}
let Some(transport) = select_control_transport(&announcement.transports, args.prefer)
else {
continue;
};
let stream_id = announcement.stream.id.0.clone();
let title = announcement.stream.title.clone();
let output = if args.include_announcement {
serde_json::json!({
"stream_id": stream_id,
"title": title,
"transport": transport,
"updated_unix_ms": announcement.updated_unix_ms,
"ttl_ms": announcement.ttl_ms,
"announcement": announcement,
})
} else {
serde_json::json!({
"stream_id": stream_id,
"title": title,
"transport": transport,
"updated_unix_ms": announcement.updated_unix_ms,
"ttl_ms": announcement.ttl_ms,
})
};
println!("{}", serde_json::to_string(&output)?);
return Ok(());
}
Err(anyhow!(
"timed out resolving stream_id '{}' from control topic",
args.stream_id
))
}
fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
let start = Instant::now();
let mut last_len: Option<u64> = None;
@ -4513,28 +4655,24 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
args.control_ttl_ms,
);
if args.gossip_peer.is_empty() {
tracing::warn!("control announce requested but no gossip peers configured; skipping");
} else {
match spawn_control_announcer_task(
endpoint.clone(),
args.gossip_peer.clone(),
announcement,
Duration::from_millis(args.control_interval_ms.max(1000)),
)
.await
{
Ok(stop_tx) => {
tracing::info!(
endpoint = %endpoint.id(),
stream = %args.name,
"control announce enabled"
);
control_stop = Some(stop_tx);
}
Err(err) => {
tracing::warn!("failed to start control announce task: {err:#}");
}
match spawn_control_announcer_task(
endpoint.clone(),
args.gossip_peer.clone(),
announcement,
Duration::from_millis(args.control_interval_ms.max(1000)),
)
.await
{
Ok(stop_tx) => {
tracing::info!(
endpoint = %endpoint.id(),
stream = %args.name,
"control announce enabled"
);
control_stop = Some(stop_tx);
}
Err(err) => {
tracing::warn!("failed to start control announce task: {err:#}");
}
}
}