control: add iroh gossip transport announcements and ec-node control CLI

This commit is contained in:
every.channel 2026-02-22 01:57:20 -08:00
parent fba1f3a7d5
commit fe97623ba8
No known key found for this signature in database
5 changed files with 494 additions and 23 deletions

View file

@ -64,6 +64,20 @@ Watch (web):
https://every.channel/watch?url=https%3A%2F%2Fcdn.moq.dev%2Fanon&name=la-nbc https://every.channel/watch?url=https%3A%2F%2Fcdn.moq.dev%2Fanon&name=la-nbc
``` ```
Control protocol (iroh gossip, relay + direct transport discovery):
```sh
# Listener (on node A)
cargo run -p ec-node -- control-listen --gossip-peer <node-b-endpoint-id>
# Announcer (on node B)
cargo run -p ec-node -- control-announce \
--stream-id la-nbc \
--relay-url https://cdn.moq.dev/anon \
--relay-broadcast la-nbc \
--gossip-peer <node-a-endpoint-id>
```
Coverage: Coverage:
```sh ```sh

View file

@ -183,6 +183,32 @@ pub struct StreamCatalog {
pub entries: Vec<StreamCatalogEntry>, pub entries: Vec<StreamCatalogEntry>,
} }
/// One concrete way to reach a stream, advertised in control announcements.
///
/// Serialized with an internal `"kind"` tag in snake_case, e.g.
/// `{"kind":"relay_moq",...}` or `{"kind":"iroh_direct",...}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum StreamTransportDescriptor {
    /// Stream is available via a MoQ relay over WebTransport/WebSocket.
    RelayMoq {
        // Relay base URL (e.g. "https://cdn.moq.dev/anon").
        url: String,
        broadcast_name: String,
        track_name: String,
    },
    /// Stream is available via iroh direct MoQ.
    IrohDirect {
        // iroh endpoint address/id; announcers default this to their own
        // endpoint id when not explicitly configured.
        endpoint: String,
        broadcast_name: String,
        track_name: String,
    },
}
/// Control-plane message advertising where (and how) a stream can be fetched.
///
/// Broadcast as JSON on the control gossip topic and re-sent periodically;
/// consumers can treat an announcement as stale once `ttl_ms` has elapsed
/// past `updated_unix_ms`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamControlAnnouncement {
    pub stream: StreamDescriptor,
    // One entry per advertised transport path (relay and/or direct).
    pub transports: Vec<StreamTransportDescriptor>,
    // Wall-clock milliseconds since the Unix epoch at (re)announce time.
    pub updated_unix_ms: u64,
    /// Suggested freshness window for this announcement.
    pub ttl_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestSummary { pub struct ManifestSummary {
pub manifest_id: String, pub manifest_id: String,

View file

@ -2,7 +2,7 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use bytes::Bytes; use bytes::Bytes;
use ec_core::StreamCatalogEntry; use ec_core::{StreamCatalogEntry, StreamControlAnnouncement};
use futures_lite::StreamExt; use futures_lite::StreamExt;
use iroh::address_lookup::{ use iroh::address_lookup::{
DhtAddressLookup, DiscoveryEvent, DnsAddressLookup, MdnsAddressLookup, PkarrPublisher, UserData, DhtAddressLookup, DiscoveryEvent, DnsAddressLookup, MdnsAddressLookup, PkarrPublisher, UserData,
@ -23,6 +23,7 @@ use std::time::{Duration, Instant};
pub const ALPN_MOQ: &[u8] = b"every.channel/moq/0"; pub const ALPN_MOQ: &[u8] = b"every.channel/moq/0";
pub const DEFAULT_CATALOG_TOPIC: &str = "every.channel/catalog/v1"; pub const DEFAULT_CATALOG_TOPIC: &str = "every.channel/catalog/v1";
pub const DEFAULT_CONTROL_TOPIC: &str = "every.channel/control/v1";
pub const MDNS_USER_DATA: &str = "every.channel"; pub const MDNS_USER_DATA: &str = "every.channel";
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -176,6 +177,11 @@ pub fn catalog_topic() -> TopicId {
TopicId::from_bytes(*hash.as_bytes()) TopicId::from_bytes(*hash.as_bytes())
} }
/// Topic id for the control-plane gossip channel: the BLAKE3 hash of
/// [`DEFAULT_CONTROL_TOPIC`], so every node derives the same 32-byte topic.
pub fn control_topic() -> TopicId {
    TopicId::from_bytes(*blake3::hash(DEFAULT_CONTROL_TOPIC.as_bytes()).as_bytes())
}
pub fn parse_endpoint_addr(value: &str) -> Result<EndpointAddr> { pub fn parse_endpoint_addr(value: &str) -> Result<EndpointAddr> {
let value = value.trim(); let value = value.trim();
if value.starts_with('{') { if value.starts_with('{') {
@ -326,3 +332,75 @@ impl CatalogGossip {
} }
} }
} }
/// Gossip session for the control topic: publishes and receives
/// [`StreamControlAnnouncement`]s.
///
/// Holds the router and gossip handles so their background tasks stay alive
/// for the lifetime of this value.
#[derive(Debug)]
pub struct ControlGossip {
    sender: GossipSender,
    receiver: GossipReceiver,
    // Kept alive only so the gossip accept loop keeps running.
    _router: Router,
    // Kept alive so the gossip actor is not dropped.
    _gossip: Gossip,
    // Actively used: peers registered via `join` / `add_peers` land here so
    // iroh can dial them directly. (Previously misnamed `_memory_lookup`,
    // which signals "unused"; it is not.)
    memory_lookup: MemoryLookup,
}
impl ControlGossip {
    /// Join the control topic on `endpoint`, bootstrapping from `peers`.
    ///
    /// Each peer string is parsed with [`parse_endpoint_addr`]; parsed
    /// addresses are registered with an in-memory address lookup so the
    /// endpoint can dial them without external discovery.
    pub async fn join(endpoint: Endpoint, peers: &[String]) -> Result<Self> {
        let memory_lookup = MemoryLookup::new();
        endpoint.address_lookup().add(memory_lookup.clone());
        let gossip = Gossip::builder().spawn(endpoint.clone());
        let router = Router::builder(endpoint.clone())
            .accept(GOSSIP_ALPN, gossip.clone())
            .spawn();
        let peer_addrs = peers
            .iter()
            .map(|peer| parse_endpoint_addr(peer))
            .collect::<Result<Vec<_>, _>>()
            .context("failed to parse control peer addr")?;
        for peer in &peer_addrs {
            memory_lookup.add_endpoint_info(peer.clone());
        }
        let peer_ids = peer_addrs
            .iter()
            .map(|addr| addr.id)
            .collect::<Vec<PublicKey>>();
        let (sender, receiver) = gossip
            .subscribe_and_join(control_topic(), peer_ids)
            .await?
            .split();
        Ok(Self {
            sender,
            receiver,
            _router: router,
            _gossip: gossip,
            memory_lookup,
        })
    }
    /// Broadcast `announcement` as JSON to all topic subscribers.
    pub async fn announce(&mut self, announcement: StreamControlAnnouncement) -> Result<()> {
        let bytes = serde_json::to_vec(&announcement)?;
        self.sender.broadcast(Bytes::from(bytes)).await?;
        Ok(())
    }
    /// Wait for the next well-formed announcement on the topic.
    ///
    /// Payloads that fail to decode as [`StreamControlAnnouncement`] are
    /// skipped silently (the topic may carry unrelated traffic). Returns
    /// `Ok(None)` once the underlying gossip event stream ends.
    pub async fn next_announcement(&mut self) -> Result<Option<StreamControlAnnouncement>> {
        while let Some(event) = self.receiver.try_next().await? {
            if let Event::Received(msg) = event {
                match serde_json::from_slice::<StreamControlAnnouncement>(&msg.content) {
                    Ok(announcement) => return Ok(Some(announcement)),
                    // Garbled or foreign payload: keep waiting.
                    Err(_) => continue,
                }
            }
        }
        Ok(None)
    }
    /// Register additional dialable peers with the in-memory address lookup.
    pub fn add_peers(&self, peers: Vec<EndpointAddr>) {
        for peer in peers {
            self.memory_lookup.add_endpoint_info(peer);
        }
    }
}

View file

@ -9,8 +9,8 @@ use clap::{Parser, Subcommand};
use ec_chopper::{build_manifest_body_for_chunks, TsChunk}; use ec_chopper::{build_manifest_body_for_chunks, TsChunk};
use ec_core::{ use ec_core::{
merkle_proof_for_index, verify_merkle_proof, Manifest, ManifestSummary, ManifestVariant, merkle_proof_for_index, verify_merkle_proof, Manifest, ManifestSummary, ManifestVariant,
MoqStreamDescriptor, StreamCatalogEntry, StreamDescriptor, StreamEncryptionInfo, StreamId, MoqStreamDescriptor, StreamCatalogEntry, StreamControlAnnouncement, StreamDescriptor,
StreamKey, StreamMetadata, StreamEncryptionInfo, StreamId, StreamKey, StreamMetadata, StreamTransportDescriptor,
}; };
use ec_crypto::{ use ec_crypto::{
decrypt_stream_data, encrypt_stream_data, load_manifest_keypair_from_env, sign_manifest_id, decrypt_stream_data, encrypt_stream_data, load_manifest_keypair_from_env, sign_manifest_id,
@ -23,6 +23,7 @@ use ec_moq::{
MoqNode, MoqPublishSet, ObjectId, ObjectMeta, ObjectPayload, TimingMeta, TrackName, MoqNode, MoqPublishSet, ObjectId, ObjectMeta, ObjectPayload, TimingMeta, TrackName,
DEFAULT_MANIFEST_TRACK_NAME, DEFAULT_TRACK_NAME, DEFAULT_MANIFEST_TRACK_NAME, DEFAULT_TRACK_NAME,
}; };
use futures_util::{SinkExt, StreamExt};
use iroh::Watcher; use iroh::Watcher;
use just_webrtc::types::{DataChannelOptions, ICEServer, PeerConfiguration, PeerConnectionState}; use just_webrtc::types::{DataChannelOptions, ICEServer, PeerConfiguration, PeerConnectionState};
use just_webrtc::{DataChannelExt, PeerConnectionBuilder, PeerConnectionExt}; use just_webrtc::{DataChannelExt, PeerConnectionBuilder, PeerConnectionExt};
@ -37,9 +38,8 @@ use std::process::{Command, Stdio};
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio_tungstenite::tungstenite::Message as WsMessage;
use futures_util::{SinkExt, StreamExt};
use tokio::process::Command as TokioCommand; use tokio::process::Command as TokioCommand;
use tokio_tungstenite::tungstenite::Message as WsMessage;
use url::Url; use url::Url;
const DIRECT_WIRE_TAG_FRAME: u8 = 0x00; const DIRECT_WIRE_TAG_FRAME: u8 = 0x00;
@ -49,6 +49,7 @@ const DIRECT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(8);
// Conservatively under typical SCTP data channel max message sizes. // Conservatively under typical SCTP data channel max message sizes.
const DIRECT_WIRE_CHUNK_BYTES: usize = 16 * 1024; const DIRECT_WIRE_CHUNK_BYTES: usize = 16 * 1024;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::RwLock; use tokio::sync::RwLock;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@ -79,6 +80,10 @@ enum Commands {
WsSubscribe(WsSubscribeArgs), WsSubscribe(WsSubscribeArgs),
/// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default). /// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default).
WtPublish(WtPublishArgs), WtPublish(WtPublishArgs),
/// Announce stream transport availability over iroh gossip control topic.
ControlAnnounce(ControlAnnounceArgs),
/// Listen for stream transport announcements from iroh gossip control topic.
ControlListen(ControlListenArgs),
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] #[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
@ -423,6 +428,84 @@ struct WtPublishArgs {
/// Danger: disable TLS verification for the relay. /// Danger: disable TLS verification for the relay.
#[arg(long, default_value_t = false)] #[arg(long, default_value_t = false)]
tls_disable_verify: bool, tls_disable_verify: bool,
/// Announce this relay stream over iroh gossip control topic.
#[arg(long, default_value_t = false)]
control_announce: bool,
/// Control gossip TTL (ms) for control announcements.
#[arg(long, default_value_t = 15000)]
control_ttl_ms: u64,
/// Control gossip announce interval (ms).
#[arg(long, default_value_t = 5000)]
control_interval_ms: u64,
/// Optional iroh secret key (hex) for control gossip endpoint identity.
#[arg(long)]
iroh_secret: Option<String>,
/// Discovery modes to enable for control gossip endpoint (comma-separated: dht, mdns, dns).
#[arg(long)]
discovery: Option<String>,
/// Gossip peers to connect to for control announcements (repeatable).
#[arg(long)]
gossip_peer: Vec<String>,
}
// CLI args for `control-announce`. NOTE: field-level `///` doc comments double
// as clap help text (runtime output), so extra reviewer notes below use plain
// `//` comments to avoid changing the rendered --help.
#[derive(Parser, Debug)]
struct ControlAnnounceArgs {
    /// Stable stream id to announce.
    #[arg(long)]
    stream_id: String,
    /// Optional human title. Defaults to stream id.
    #[arg(long)]
    title: Option<String>,
    /// Announcement TTL in milliseconds.
    #[arg(long, default_value_t = 15000)]
    ttl_ms: u64,
    /// Announcement interval in milliseconds.
    // Clamped to >= 1000 ms at runtime by control_announce.
    #[arg(long, default_value_t = 5000)]
    interval_ms: u64,
    /// Relay URL for relay transport advertisement.
    // Relay transport is only advertised when BOTH --relay-url and
    // --relay-broadcast are provided.
    #[arg(long)]
    relay_url: Option<String>,
    /// Relay broadcast name for relay transport advertisement.
    #[arg(long)]
    relay_broadcast: Option<String>,
    /// Relay track name for relay transport advertisement.
    #[arg(long, default_value = "video0.m4s")]
    relay_track: String,
    /// Direct iroh endpoint address/id for direct transport advertisement.
    /// Defaults to this process endpoint id when `--direct-broadcast` is set.
    #[arg(long)]
    direct_endpoint: Option<String>,
    /// Direct iroh broadcast name for direct transport advertisement.
    #[arg(long)]
    direct_broadcast: Option<String>,
    /// Direct iroh track name for direct transport advertisement.
    #[arg(long, default_value = DEFAULT_TRACK_NAME)]
    direct_track: String,
    /// Optional iroh secret key (hex) for control gossip endpoint identity.
    #[arg(long)]
    iroh_secret: Option<String>,
    /// Discovery modes to enable (comma-separated: dht, mdns, dns).
    #[arg(long)]
    discovery: Option<String>,
    /// Gossip peers to connect to (repeatable).
    // At least one is required; control_announce errors out otherwise.
    #[arg(long)]
    gossip_peer: Vec<String>,
}
// CLI args for `control-listen`. NOTE: field-level `///` doc comments double
// as clap help text (runtime output), so reviewer notes use `//` comments.
#[derive(Parser, Debug)]
struct ControlListenArgs {
    /// Optional iroh secret key (hex) for control gossip endpoint identity.
    #[arg(long)]
    iroh_secret: Option<String>,
    /// Discovery modes to enable (comma-separated: dht, mdns, dns).
    #[arg(long)]
    discovery: Option<String>,
    /// Gossip peers to connect to (repeatable).
    #[arg(long)]
    gossip_peer: Vec<String>,
    /// Exit after the first announcement.
    #[arg(long, default_value_t = false)]
    once: bool,
} }
#[derive(Subcommand, Debug)] #[derive(Subcommand, Debug)]
@ -502,6 +585,8 @@ fn main() -> Result<()> {
Commands::WsPublish(args) => run_async(ws_publish(args))?, Commands::WsPublish(args) => run_async(ws_publish(args))?,
Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?, Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?,
Commands::WtPublish(args) => run_async(wt_publish(args))?, Commands::WtPublish(args) => run_async(wt_publish(args))?,
Commands::ControlAnnounce(args) => run_async(control_announce(args))?,
Commands::ControlListen(args) => run_async(control_listen(args))?,
} }
Ok(()) Ok(())
@ -3679,7 +3764,9 @@ fn ws_url_for(base: &str, stream_id: &str, role: &str) -> String {
} }
async fn ws_send_frame( async fn ws_send_frame(
ws: &mut tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, ws: &mut tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
frame: &[u8], frame: &[u8],
) -> Result<()> { ) -> Result<()> {
let len = u32::try_from(frame.len()).map_err(|_| anyhow!("frame too large"))?; let len = u32::try_from(frame.len()).map_err(|_| anyhow!("frame too large"))?;
@ -3784,7 +3871,9 @@ async fn ws_publish(args: WsPublishArgs) -> Result<()> {
expires_ms: now.saturating_add(ttl), expires_ms: now.saturating_add(ttl),
}; };
let url = format!("{dir2}/api/announce"); let url = format!("{dir2}/api/announce");
let _ = tokio::time::timeout(Duration::from_secs(5), client2.post(url).json(&req).send()).await; let _ =
tokio::time::timeout(Duration::from_secs(5), client2.post(url).json(&req).send())
.await;
tokio::time::sleep(Duration::from_millis(ttl.saturating_mul(3) / 4)).await; tokio::time::sleep(Duration::from_millis(ttl.saturating_mul(3) / 4)).await;
} }
}); });
@ -3983,7 +4072,10 @@ async fn ws_subscribe(args: WsSubscribeArgs) -> Result<()> {
if deadline.is_some_and(|d| Instant::now() > d) { if deadline.is_some_and(|d| Instant::now() > d) {
break; break;
} }
let next = ws.next().await.ok_or_else(|| anyhow!("websocket closed"))??; let next = ws
.next()
.await
.ok_or_else(|| anyhow!("websocket closed"))??;
let bytes = match next { let bytes = match next {
WsMessage::Binary(b) => b, WsMessage::Binary(b) => b,
WsMessage::Close(_) => break, WsMessage::Close(_) => break,
@ -4202,6 +4294,171 @@ fn build_catalog_entry(
} }
} }
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch (only possible with severe skew) yields 0
/// rather than panicking.
fn now_unix_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    since_epoch.as_millis() as u64
}
/// Assemble a [`StreamControlAnnouncement`] for `stream_id` carrying the
/// given transports, stamped with the current wall-clock time.
///
/// The embedded descriptor marks its source as "control" and records the
/// stream id once more as a `stream_id` metadata entry.
fn build_control_announcement(
    stream_id: String,
    title: String,
    transports: Vec<StreamTransportDescriptor>,
    ttl_ms: u64,
) -> StreamControlAnnouncement {
    let metadata = vec![StreamMetadata {
        key: "stream_id".to_string(),
        value: stream_id.clone(),
    }];
    let descriptor = StreamDescriptor {
        id: StreamId(stream_id),
        title,
        number: None,
        source: "control".to_string(),
        metadata,
    };
    StreamControlAnnouncement {
        stream: descriptor,
        transports,
        updated_unix_ms: now_unix_ms(),
        ttl_ms,
    }
}
/// Join the control gossip topic and spawn a background task that
/// (re-)broadcasts `announcement` every `interval` (clamped to >= 1s),
/// refreshing `updated_unix_ms` before each send.
///
/// Returns a oneshot sender; sending on it (or dropping it) stops the task.
///
/// # Errors
/// Fails if joining the gossip topic errors or takes longer than 10s.
async fn spawn_control_announcer_task(
    endpoint: iroh::Endpoint,
    peers: Vec<String>,
    mut announcement: StreamControlAnnouncement,
    interval: Duration,
) -> Result<oneshot::Sender<()>> {
    let mut gossip = tokio::time::timeout(
        Duration::from_secs(10),
        ec_iroh::ControlGossip::join(endpoint, &peers),
    )
    .await
    .context("timed out joining control gossip topic")??;
    let (stop_tx, mut stop_rx) = oneshot::channel::<()>();
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval.max(Duration::from_secs(1)));
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            tokio::select! {
                // The first tick of a tokio interval completes immediately, so
                // the initial announcement still goes out right away.
                // Announcing *before* this select (as the previous version
                // did) double-sent at startup for exactly that reason.
                _ = ticker.tick() => {
                    announcement.updated_unix_ms = now_unix_ms();
                    if let Err(err) = gossip.announce(announcement.clone()).await {
                        tracing::warn!("control announce failed: {err:#}");
                    }
                }
                _ = &mut stop_rx => break,
            }
        }
    });
    Ok(stop_tx)
}
/// `control-announce` subcommand: build a transport announcement from CLI
/// flags and re-broadcast it over the control gossip topic until Ctrl-C.
///
/// Requires at least one `--gossip-peer` and at least one transport:
/// relay (`--relay-url` + `--relay-broadcast`) and/or direct
/// (`--direct-broadcast`). Status lines go to stderr; nothing on stdout.
async fn control_announce(args: ControlAnnounceArgs) -> Result<()> {
    if args.gossip_peer.is_empty() {
        return Err(anyhow!(
            "control announce requires at least one --gossip-peer currently"
        ));
    }
    let secret = parse_iroh_secret(args.iroh_secret)?;
    let discovery = parse_discovery(args.discovery.as_deref())?;
    let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
    let mut transports = Vec::new();
    // Relay transport needs BOTH url and broadcast; a lone --relay-url is
    // silently ignored here (transports stay empty and we error below).
    if let (Some(url), Some(broadcast_name)) =
        (args.relay_url.as_ref(), args.relay_broadcast.as_ref())
    {
        transports.push(StreamTransportDescriptor::RelayMoq {
            url: url.clone(),
            broadcast_name: broadcast_name.clone(),
            track_name: args.relay_track.clone(),
        });
    }
    if let Some(broadcast_name) = args.direct_broadcast.as_ref() {
        // Default the direct endpoint to this process's own endpoint id.
        let endpoint_addr = args
            .direct_endpoint
            .clone()
            .unwrap_or_else(|| endpoint.id().to_string());
        transports.push(StreamTransportDescriptor::IrohDirect {
            endpoint: endpoint_addr,
            broadcast_name: broadcast_name.clone(),
            track_name: args.direct_track.clone(),
        });
    }
    if transports.is_empty() {
        return Err(anyhow!(
            "no transports configured; set relay (--relay-url + --relay-broadcast) and/or direct (--direct-broadcast)"
        ));
    }
    let title = args.title.clone().unwrap_or_else(|| args.stream_id.clone());
    let announcement =
        build_control_announcement(args.stream_id.clone(), title, transports, args.ttl_ms);
    // Background task keeps re-announcing until stop_tx fires.
    // Interval is clamped to >= 1000 ms to avoid flooding the topic.
    let stop_tx = spawn_control_announcer_task(
        endpoint.clone(),
        args.gossip_peer.clone(),
        announcement,
        Duration::from_millis(args.interval_ms.max(1000)),
    )
    .await?;
    eprintln!("control endpoint id: {}", endpoint.id());
    eprintln!("control stream_id: {}", args.stream_id);
    eprintln!(
        "control announce interval_ms: {}",
        args.interval_ms.max(1000)
    );
    eprintln!("control ttl_ms: {}", args.ttl_ms);
    tokio::signal::ctrl_c().await?;
    let _ = stop_tx.send(());
    // Brief grace period so the announcer task can observe the stop signal.
    tokio::time::sleep(Duration::from_millis(100)).await;
    Ok(())
}
/// `control-listen` subcommand: join the control gossip topic and print each
/// received announcement as one JSON line on stdout.
///
/// Runs until Ctrl-C, until the gossip stream ends, or — with `--once` —
/// after the first announcement. Status lines go to stderr.
async fn control_listen(args: ControlListenArgs) -> Result<()> {
    let secret = parse_iroh_secret(args.iroh_secret)?;
    let discovery = parse_discovery(args.discovery.as_deref())?;
    let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
    eprintln!("control endpoint id: {}", endpoint.id());
    let mut gossip = tokio::time::timeout(
        Duration::from_secs(30),
        ec_iroh::ControlGossip::join(endpoint.clone(), &args.gossip_peer),
    )
    .await
    .context("timed out joining control gossip topic")??;
    loop {
        tokio::select! {
            maybe = gossip.next_announcement() => {
                let Some(announcement) = maybe? else {
                    // `None` means the gossip event stream has ended.
                    // Previously this `continue`d, which busy-spun forever
                    // re-polling a closed stream; exit instead.
                    eprintln!("control gossip stream ended");
                    break;
                };
                println!("{}", serde_json::to_string(&announcement)?);
                if args.once {
                    break;
                }
            }
            _ = tokio::signal::ctrl_c() => {
                break;
            }
        }
    }
    Ok(())
}
fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> { fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
let start = Instant::now(); let start = Instant::now();
let mut last_len: Option<u64> = None; let mut last_len: Option<u64> = None;
@ -4237,6 +4494,50 @@ fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
async fn wt_publish(args: WtPublishArgs) -> Result<()> { async fn wt_publish(args: WtPublishArgs) -> Result<()> {
let relay_url = let relay_url =
Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?; Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?;
let relay_url_str = relay_url.to_string();
let mut control_stop: Option<oneshot::Sender<()>> = None;
if args.control_announce {
let secret = parse_iroh_secret(args.iroh_secret.clone())?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
let announcement = build_control_announcement(
args.name.clone(),
args.name.clone(),
vec![StreamTransportDescriptor::RelayMoq {
url: relay_url_str.clone(),
broadcast_name: args.name.clone(),
track_name: "video0.m4s".to_string(),
}],
args.control_ttl_ms,
);
if args.gossip_peer.is_empty() {
tracing::warn!("control announce requested but no gossip peers configured; skipping");
} else {
match spawn_control_announcer_task(
endpoint.clone(),
args.gossip_peer.clone(),
announcement,
Duration::from_millis(args.control_interval_ms.max(1000)),
)
.await
{
Ok(stop_tx) => {
tracing::info!(
endpoint = %endpoint.id(),
stream = %args.name,
"control announce enabled"
);
control_stop = Some(stop_tx);
}
Err(err) => {
tracing::warn!("failed to start control announce task: {err:#}");
}
}
}
}
// Create a local origin + broadcast, then pass an OriginConsumer into the client so it can // Create a local origin + broadcast, then pass an OriginConsumer into the client so it can
// publish announcements to the relay. // publish announcements to the relay.
@ -4270,15 +4571,15 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
fn accept_bi( fn accept_bi(
&self, &self,
) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + web_transport_trait::MaybeSend ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>>
{ + web_transport_trait::MaybeSend {
self.inner.accept_bi() self.inner.accept_bi()
} }
fn open_bi( fn open_bi(
&self, &self,
) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + web_transport_trait::MaybeSend ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>>
{ + web_transport_trait::MaybeSend {
self.inner.open_bi() self.inner.open_bi()
} }
@ -4358,7 +4659,8 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
_server_name: &rustls::pki_types::ServerName<'_>, _server_name: &rustls::pki_types::ServerName<'_>,
_ocsp: &[u8], _ocsp: &[u8],
_now: rustls::pki_types::UnixTime, _now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> { ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error>
{
Ok(rustls::client::danger::ServerCertVerified::assertion()) Ok(rustls::client::danger::ServerCertVerified::assertion())
} }
@ -4367,7 +4669,8 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
message: &[u8], message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>, cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct, dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> { ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
{
rustls::crypto::verify_tls12_signature( rustls::crypto::verify_tls12_signature(
message, message,
cert, cert,
@ -4381,7 +4684,8 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
message: &[u8], message: &[u8],
cert: &rustls::pki_types::CertificateDer<'_>, cert: &rustls::pki_types::CertificateDer<'_>,
dss: &rustls::DigitallySignedStruct, dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> { ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
{
rustls::crypto::verify_tls13_signature( rustls::crypto::verify_tls13_signature(
message, message,
cert, cert,
@ -4395,9 +4699,9 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
} }
} }
let provider = rustls::crypto::CryptoProvider::get_default().cloned().unwrap_or_else(|| { let provider = rustls::crypto::CryptoProvider::get_default()
Arc::new(rustls::crypto::ring::default_provider()) .cloned()
}); .unwrap_or_else(|| Arc::new(rustls::crypto::ring::default_provider()));
tls.dangerous() tls.dangerous()
.set_certificate_verifier(Arc::new(NoCertificateVerification(provider))); .set_certificate_verifier(Arc::new(NoCertificateVerification(provider)));
} }
@ -4406,8 +4710,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()]; tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()];
// Build a Quinn endpoint. // Build a Quinn endpoint.
let socket = std::net::UdpSocket::bind("[::]:0") let socket = std::net::UdpSocket::bind("[::]:0").context("failed to bind UDP socket")?;
.context("failed to bind UDP socket")?;
let mut transport = quinn::TransportConfig::default(); let mut transport = quinn::TransportConfig::default();
transport.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); transport.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
@ -4484,8 +4787,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
} }
} }
Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))) Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))).context("failed MoQ SETUP")
.context("failed MoQ SETUP")
} }
tracing::info!(url=%relay_url, name=%args.name, "connecting to relay"); tracing::info!(url=%relay_url, name=%args.name, "connecting to relay");
@ -4565,7 +4867,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
tokio::pin!(decode_fut); tokio::pin!(decode_fut);
tracing::info!("publishing fMP4 -> moq-mux -> relay"); tracing::info!("publishing fMP4 -> moq-mux -> relay");
tokio::select! { let outcome = tokio::select! {
res = &mut decode_fut => { res = &mut decode_fut => {
let status = child.wait().await.context("failed to wait for ffmpeg")?; let status = child.wait().await.context("failed to wait for ffmpeg")?;
match res { match res {
@ -4584,5 +4886,11 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
Ok(()) Ok(())
} }
};
if let Some(stop) = control_stop.take() {
let _ = stop.send(());
} }
outcome
} }

View file

@ -0,0 +1,45 @@
# ECP-0066: iroh-Gossip Control Protocol For Hybrid MoQ Discovery
Status: Draft
## Decision
Add a first-party control protocol carried over iroh-gossip to advertise stream availability across multiple transport paths:
- MoQ relay (WebTransport URL + broadcast name + track),
- iroh direct (EndpointAddr + broadcast name + track).
`ec-node` will expose:
1. `control-announce`: publish control announcements to an iroh gossip topic.
2. `control-listen`: subscribe to the topic and print announcements.
3. Optional integration in `wt-publish` to announce relay-published streams via the same control topic.
## Motivation
We currently have two transport worlds:
- relay-centric (`wt-publish`),
- direct iroh (`moq-publish` / `moq-subscribe`).
Discovery is fragmented. A shared iroh control channel makes transport selection explicit and allows consumers to discover either path (or both) from one substrate.
## Scope
In scope:
- New control protocol data types in `ec-core`.
- New `ControlGossip` helper in `ec-iroh`.
- New CLI surfaces in `ec-node` for announce/listen.
- Optional relay announcement from `wt-publish`.
Out of scope:
- Policy engine for automatic best-path selection.
- Security policy beyond existing iroh/gossip trust boundaries.
- Replacing existing catalog gossip immediately (coexist first).
## Rollout / Reversibility
- Additive and reversible: removing control commands and topic does not affect existing media paths.
- Existing MoQ publish/subscribe flows continue unchanged.