Bridge iroh control announcements into web stream discovery

This commit is contained in:
every.channel 2026-02-22 23:08:37 -08:00
parent 74842eb25e
commit 2778715304
No known key found for this signature in database
8 changed files with 528 additions and 26 deletions

View file

@ -86,6 +86,8 @@ enum Commands {
ControlListen(ControlListenArgs),
/// Resolve a stream id to the best currently-announced transport.
ControlResolve(ControlResolveArgs),
/// Bridge iroh control announcements into the website public stream directory.
ControlBridgeWeb(ControlBridgeWebArgs),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
@ -550,6 +552,39 @@ struct ControlResolveArgs {
gossip_peer: Vec<String>,
}
#[derive(Parser, Debug)]
struct ControlBridgeWebArgs {
/// Directory base URL (e.g. https://every.channel).
#[arg(long, default_value = "https://every.channel")]
directory_url: String,
/// Optional bearer token for `/api/stream-upsert`.
/// Falls back to `EVERY_CHANNEL_WEB_UPSERT_TOKEN` when omitted.
#[arg(long)]
auth_token: Option<String>,
/// Maximum wall-clock wait for control announcements before exit (ms).
/// Set to 0 to run forever.
#[arg(long, default_value_t = 0)]
timeout_ms: u64,
/// Maximum accepted staleness from updated_unix_ms (ms).
#[arg(long, default_value_t = 30000)]
max_age_ms: u64,
/// Optional stream id prefix filter.
#[arg(long)]
stream_prefix: Option<String>,
/// Exit after the first successful upsert.
#[arg(long, default_value_t = false)]
once: bool,
/// Optional iroh secret key (hex) for control gossip endpoint identity.
#[arg(long)]
iroh_secret: Option<String>,
/// Discovery modes to enable (comma-separated: dht, mdns, dns).
#[arg(long)]
discovery: Option<String>,
/// Gossip peers to connect to (repeatable).
#[arg(long)]
gossip_peer: Vec<String>,
}
#[derive(Subcommand, Debug)]
enum IngestSource {
/// Ingest from an HDHomeRun device.
@ -630,6 +665,7 @@ fn main() -> Result<()> {
Commands::ControlAnnounce(args) => run_async(control_announce(args))?,
Commands::ControlListen(args) => run_async(control_listen(args))?,
Commands::ControlResolve(args) => run_async(control_resolve(args))?,
Commands::ControlBridgeWeb(args) => run_async(control_bridge_web(args))?,
}
Ok(())
@ -2102,15 +2138,10 @@ async fn moq_publish(args: MoqPublishArgs) -> Result<()> {
let mut manifest_sequence: u64 = 0;
let announce_tx = if args.announce {
let gossip_peers = parse_gossip_peers(args.gossip_peer.clone());
Some(
spawn_catalog_announcer(
&node,
&track,
&broadcast_name,
&track_name,
args.gossip_peer.clone(),
)
.await?,
spawn_catalog_announcer(&node, &track, &broadcast_name, &track_name, gossip_peers)
.await?,
)
} else {
None
@ -4244,6 +4275,21 @@ fn parse_discovery(value: Option<&str>) -> Result<DiscoveryConfig> {
}
}
fn parse_gossip_peers(mut peers: Vec<String>) -> Vec<String> {
if let Ok(env_peers) = std::env::var("EVERY_CHANNEL_GOSSIP_PEERS") {
for raw in env_peers.split(|c: char| c == ',' || c == ';' || c.is_whitespace()) {
let peer = raw.trim();
if peer.is_empty() {
continue;
}
if !peers.iter().any(|existing| existing == peer) {
peers.push(peer.to_string());
}
}
}
peers
}
async fn spawn_catalog_announcer(
node: &MoqNode,
track: &TrackName,
@ -4407,6 +4453,9 @@ async fn control_announce(args: ControlAnnounceArgs) -> Result<()> {
let secret = parse_iroh_secret(args.iroh_secret)?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
let gossip_peers = parse_gossip_peers(args.gossip_peer.clone());
let endpoint_addr =
serde_json::to_string(&endpoint.addr()).unwrap_or_else(|_| endpoint.id().to_string());
let mut transports = Vec::new();
@ -4444,13 +4493,14 @@ async fn control_announce(args: ControlAnnounceArgs) -> Result<()> {
let stop_tx = spawn_control_announcer_task(
endpoint.clone(),
args.gossip_peer.clone(),
gossip_peers,
announcement,
Duration::from_millis(args.interval_ms.max(1000)),
)
.await?;
eprintln!("control endpoint id: {}", endpoint.id());
eprintln!("control endpoint addr: {}", endpoint_addr);
eprintln!("control stream_id: {}", args.stream_id);
eprintln!(
"control announce interval_ms: {}",
@ -4468,10 +4518,15 @@ async fn control_listen(args: ControlListenArgs) -> Result<()> {
let secret = parse_iroh_secret(args.iroh_secret)?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
let gossip_peers = parse_gossip_peers(args.gossip_peer.clone());
eprintln!("control endpoint id: {}", endpoint.id());
eprintln!(
"control endpoint addr: {}",
serde_json::to_string(&endpoint.addr()).unwrap_or_else(|_| endpoint.id().to_string())
);
let mut gossip = tokio::time::timeout(
Duration::from_secs(30),
ec_iroh::ControlGossip::join(endpoint.clone(), &args.gossip_peer),
ec_iroh::ControlGossip::join(endpoint.clone(), &gossip_peers),
)
.await
.context("timed out joining control gossip topic")??;
@ -4534,11 +4589,16 @@ async fn control_resolve(args: ControlResolveArgs) -> Result<()> {
let secret = parse_iroh_secret(args.iroh_secret)?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
let gossip_peers = parse_gossip_peers(args.gossip_peer.clone());
eprintln!("control endpoint id: {}", endpoint.id());
eprintln!(
"control endpoint addr: {}",
serde_json::to_string(&endpoint.addr()).unwrap_or_else(|_| endpoint.id().to_string())
);
let mut gossip = tokio::time::timeout(
Duration::from_secs(30),
ec_iroh::ControlGossip::join(endpoint.clone(), &args.gossip_peer),
ec_iroh::ControlGossip::join(endpoint.clone(), &gossip_peers),
)
.await
.context("timed out joining control gossip topic")??;
@ -4601,6 +4661,155 @@ async fn control_resolve(args: ControlResolveArgs) -> Result<()> {
))
}
fn select_relay_transport_for_web(
transports: &[StreamTransportDescriptor],
) -> Option<(String, String, String)> {
for transport in transports {
if let StreamTransportDescriptor::RelayMoq {
url,
broadcast_name,
track_name,
} = transport
{
return Some((url.clone(), broadcast_name.clone(), track_name.clone()));
}
}
None
}
#[derive(Debug, serde::Serialize)]
struct WebStreamUpsertReq<'a> {
stream_id: &'a str,
title: &'a str,
relay_url: &'a str,
broadcast_name: &'a str,
track_name: &'a str,
expires_ms: u64,
}
async fn control_bridge_web(args: ControlBridgeWebArgs) -> Result<()> {
let secret = parse_iroh_secret(args.iroh_secret)?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
let gossip_peers = parse_gossip_peers(args.gossip_peer.clone());
eprintln!("control endpoint id: {}", endpoint.id());
eprintln!(
"control endpoint addr: {}",
serde_json::to_string(&endpoint.addr()).unwrap_or_else(|_| endpoint.id().to_string())
);
let mut gossip = tokio::time::timeout(
Duration::from_secs(30),
ec_iroh::ControlGossip::join(endpoint.clone(), &gossip_peers),
)
.await
.context("timed out joining control gossip topic")??;
let token = args
.auth_token
.or_else(|| std::env::var("EVERY_CHANNEL_WEB_UPSERT_TOKEN").ok())
.map(|v| v.trim().to_string())
.filter(|v| !v.is_empty());
let directory_url = args.directory_url.trim_end_matches('/').to_string();
let upsert_url = format!("{directory_url}/api/stream-upsert");
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(8))
.build()
.context("failed to build web bridge http client")?;
let mut last_upserted_unix_ms = HashMap::<String, u64>::new();
let timeout_deadline = if args.timeout_ms == 0 {
None
} else {
Some(Instant::now() + Duration::from_millis(args.timeout_ms.max(1000)))
};
eprintln!("web upsert url: {}", upsert_url);
loop {
let maybe = if let Some(deadline) = timeout_deadline {
let now = Instant::now();
if now >= deadline {
return Err(anyhow!("timed out waiting for control announcements"));
}
tokio::time::timeout(deadline - now, gossip.next_announcement())
.await
.context("timed out waiting for control announcement")??
} else {
gossip.next_announcement().await?
};
let Some(announcement) = maybe else {
continue;
};
if !control_announcement_is_fresh(&announcement, args.max_age_ms) {
continue;
}
let stream_id = announcement.stream.id.0.clone();
if let Some(prefix) = args.stream_prefix.as_deref() {
if !stream_id.starts_with(prefix) {
continue;
}
}
let Some((relay_url, broadcast_name, track_name)) =
select_relay_transport_for_web(&announcement.transports)
else {
continue;
};
if last_upserted_unix_ms
.get(&stream_id)
.is_some_and(|seen| *seen >= announcement.updated_unix_ms)
{
continue;
}
let ttl_ms = announcement.ttl_ms.clamp(5_000, 60_000);
let payload = WebStreamUpsertReq {
stream_id: &stream_id,
title: &announcement.stream.title,
relay_url: &relay_url,
broadcast_name: &broadcast_name,
track_name: &track_name,
expires_ms: now_unix_ms().saturating_add(ttl_ms),
};
let mut req = client.post(&upsert_url).json(&payload);
if let Some(token) = token.as_ref() {
req = req.bearer_auth(token);
}
let res = req
.send()
.await
.with_context(|| format!("failed posting to {upsert_url}"))?;
if !res.status().is_success() {
let status = res.status();
let body = res.text().await.unwrap_or_default();
tracing::warn!(
stream = %stream_id,
status = %status,
body = %body,
"web stream upsert failed"
);
continue;
}
last_upserted_unix_ms.insert(stream_id.clone(), announcement.updated_unix_ms);
tracing::info!(
stream = %stream_id,
relay = %relay_url,
broadcast = %broadcast_name,
"web stream upserted"
);
if args.once {
break;
}
}
Ok(())
}
fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
let start = Instant::now();
let mut last_len: Option<u64> = None;
@ -4643,6 +4852,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
let secret = parse_iroh_secret(args.iroh_secret.clone())?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
let gossip_peers = parse_gossip_peers(args.gossip_peer.clone());
let announcement = build_control_announcement(
args.name.clone(),
@ -4657,7 +4867,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
match spawn_control_announcer_task(
endpoint.clone(),
args.gossip_peer.clone(),
gossip_peers,
announcement,
Duration::from_millis(args.control_interval_ms.max(1000)),
)