diff --git a/README.md b/README.md index ab519c7..b8db931 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,20 @@ Watch (web): https://every.channel/watch?url=https%3A%2F%2Fcdn.moq.dev%2Fanon&name=la-nbc ``` +Control protocol (iroh gossip, relay + direct transport discovery): + +```sh +# Listener (on node A) +cargo run -p ec-node -- control-listen --gossip-peer + +# Announcer (on node B) +cargo run -p ec-node -- control-announce \ + --stream-id la-nbc \ + --relay-url https://cdn.moq.dev/anon \ + --relay-broadcast la-nbc \ + --gossip-peer +``` + Coverage: ```sh diff --git a/crates/ec-core/src/lib.rs b/crates/ec-core/src/lib.rs index 0b7c34b..fa2f029 100644 --- a/crates/ec-core/src/lib.rs +++ b/crates/ec-core/src/lib.rs @@ -183,6 +183,32 @@ pub struct StreamCatalog { pub entries: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum StreamTransportDescriptor { + /// Stream is available via a MoQ relay over WebTransport/WebSocket. + RelayMoq { + url: String, + broadcast_name: String, + track_name: String, + }, + /// Stream is available via iroh direct MoQ. + IrohDirect { + endpoint: String, + broadcast_name: String, + track_name: String, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StreamControlAnnouncement { + pub stream: StreamDescriptor, + pub transports: Vec, + pub updated_unix_ms: u64, + /// Suggested freshness window for this announcement. + pub ttl_ms: u64, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ManifestSummary { pub manifest_id: String, diff --git a/crates/ec-iroh/src/lib.rs b/crates/ec-iroh/src/lib.rs index 1c8dddd..1c4947a 100644 --- a/crates/ec-iroh/src/lib.rs +++ b/crates/ec-iroh/src/lib.rs @@ -2,7 +2,7 @@ use anyhow::{Context, Result}; use bytes::Bytes; -use ec_core::StreamCatalogEntry; +use ec_core::{StreamCatalogEntry, StreamControlAnnouncement}; use futures_lite::StreamExt; use iroh::address_lookup::{ DhtAddressLookup, DiscoveryEvent, DnsAddressLookup, MdnsAddressLookup, PkarrPublisher, UserData, @@ -23,6 +23,7 @@ use std::time::{Duration, Instant}; pub const ALPN_MOQ: &[u8] = b"every.channel/moq/0"; pub const DEFAULT_CATALOG_TOPIC: &str = "every.channel/catalog/v1"; +pub const DEFAULT_CONTROL_TOPIC: &str = "every.channel/control/v1"; pub const MDNS_USER_DATA: &str = "every.channel"; #[derive(Debug, Clone)] @@ -176,6 +177,11 @@ pub fn catalog_topic() -> TopicId { TopicId::from_bytes(*hash.as_bytes()) } +pub fn control_topic() -> TopicId { + let hash = blake3::hash(DEFAULT_CONTROL_TOPIC.as_bytes()); + TopicId::from_bytes(*hash.as_bytes()) +} + pub fn parse_endpoint_addr(value: &str) -> Result { let value = value.trim(); if value.starts_with('{') { @@ -326,3 +332,75 @@ impl CatalogGossip { } } } + +#[derive(Debug)] +pub struct ControlGossip { + sender: GossipSender, + receiver: GossipReceiver, + _router: Router, + _gossip: Gossip, + _memory_lookup: MemoryLookup, +} + +impl ControlGossip { + pub async fn join(endpoint: Endpoint, peers: &[String]) -> Result { + let memory_lookup = MemoryLookup::new(); + endpoint.address_lookup().add(memory_lookup.clone()); + + let gossip = Gossip::builder().spawn(endpoint.clone()); + let router = Router::builder(endpoint.clone()) + .accept(GOSSIP_ALPN, gossip.clone()) + .spawn(); + + let peer_addrs = peers + .iter() + .map(|peer| parse_endpoint_addr(peer)) + .collect::, _>>() + .context("failed to parse control peer addr")?; + for peer in &peer_addrs { + memory_lookup.add_endpoint_info(peer.clone()); + } + let peer_ids = peer_addrs + .iter() + .map(|addr| addr.id) + .collect::>(); + + let (sender, receiver) = gossip + .subscribe_and_join(control_topic(), peer_ids) + .await? + .split(); + + Ok(Self { + sender, + receiver, + _router: router, + _gossip: gossip, + _memory_lookup: memory_lookup, + }) + } + + pub async fn announce(&mut self, announcement: StreamControlAnnouncement) -> Result<()> { + let bytes = serde_json::to_vec(&announcement)?; + self.sender.broadcast(Bytes::from(bytes)).await?; + Ok(()) + } + + pub async fn next_announcement(&mut self) -> Result> { + while let Some(event) = self.receiver.try_next().await? { + if let Event::Received(msg) = event { + if let Ok(announcement) = + serde_json::from_slice::(&msg.content) + { + return Ok(Some(announcement)); + } + } + } + Ok(None) + } + + pub fn add_peers(&self, peers: Vec) { + for peer in peers { + self._memory_lookup.add_endpoint_info(peer); + } + } +} diff --git a/crates/ec-node/src/main.rs b/crates/ec-node/src/main.rs index 584f7b6..5bfa9d8 100644 --- a/crates/ec-node/src/main.rs +++ b/crates/ec-node/src/main.rs @@ -9,8 +9,8 @@ use clap::{Parser, Subcommand}; use ec_chopper::{build_manifest_body_for_chunks, TsChunk}; use ec_core::{ merkle_proof_for_index, verify_merkle_proof, Manifest, ManifestSummary, ManifestVariant, - MoqStreamDescriptor, StreamCatalogEntry, StreamDescriptor, StreamEncryptionInfo, StreamId, - StreamKey, StreamMetadata, + MoqStreamDescriptor, StreamCatalogEntry, StreamControlAnnouncement, StreamDescriptor, + StreamEncryptionInfo, StreamId, StreamKey, StreamMetadata, StreamTransportDescriptor, }; use ec_crypto::{ decrypt_stream_data, encrypt_stream_data, load_manifest_keypair_from_env, sign_manifest_id, @@ -23,6 +23,7 @@ use ec_moq::{ MoqNode, MoqPublishSet, ObjectId, ObjectMeta, ObjectPayload, TimingMeta, TrackName, DEFAULT_MANIFEST_TRACK_NAME, DEFAULT_TRACK_NAME, }; +use futures_util::{SinkExt, StreamExt}; use iroh::Watcher; use just_webrtc::types::{DataChannelOptions, ICEServer, PeerConfiguration, PeerConnectionState}; use just_webrtc::{DataChannelExt, PeerConnectionBuilder, PeerConnectionExt}; @@ -37,9 +38,8 @@ use std::process::{Command, Stdio}; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use tokio_tungstenite::tungstenite::Message as WsMessage; -use futures_util::{SinkExt, StreamExt}; use tokio::process::Command as TokioCommand; +use tokio_tungstenite::tungstenite::Message as WsMessage; use url::Url; const DIRECT_WIRE_TAG_FRAME: u8 = 0x00; @@ -49,6 +49,7 @@ const DIRECT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(8); // Conservatively under typical SCTP data channel max message sizes. const DIRECT_WIRE_CHUNK_BYTES: usize = 16 * 1024; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio::sync::RwLock; #[derive(Parser, Debug)] @@ -79,6 +80,10 @@ enum Commands { WsSubscribe(WsSubscribeArgs), /// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default). WtPublish(WtPublishArgs), + /// Announce stream transport availability over iroh gossip control topic. + ControlAnnounce(ControlAnnounceArgs), + /// Listen for stream transport announcements from iroh gossip control topic. + ControlListen(ControlListenArgs), } #[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] @@ -423,6 +428,84 @@ struct WtPublishArgs { /// Danger: disable TLS verification for the relay. #[arg(long, default_value_t = false)] tls_disable_verify: bool, + /// Announce this relay stream over iroh gossip control topic. + #[arg(long, default_value_t = false)] + control_announce: bool, + /// Control gossip TTL (ms) for control announcements. + #[arg(long, default_value_t = 15000)] + control_ttl_ms: u64, + /// Control gossip announce interval (ms). + #[arg(long, default_value_t = 5000)] + control_interval_ms: u64, + /// Optional iroh secret key (hex) for control gossip endpoint identity. + #[arg(long)] + iroh_secret: Option, + /// Discovery modes to enable for control gossip endpoint (comma-separated: dht, mdns, dns). + #[arg(long)] + discovery: Option, + /// Gossip peers to connect to for control announcements (repeatable). + #[arg(long)] + gossip_peer: Vec, +} + +#[derive(Parser, Debug)] +struct ControlAnnounceArgs { + /// Stable stream id to announce. + #[arg(long)] + stream_id: String, + /// Optional human title. Defaults to stream id. + #[arg(long)] + title: Option, + /// Announcement TTL in milliseconds. + #[arg(long, default_value_t = 15000)] + ttl_ms: u64, + /// Announcement interval in milliseconds. + #[arg(long, default_value_t = 5000)] + interval_ms: u64, + /// Relay URL for relay transport advertisement. + #[arg(long)] + relay_url: Option, + /// Relay broadcast name for relay transport advertisement. + #[arg(long)] + relay_broadcast: Option, + /// Relay track name for relay transport advertisement. + #[arg(long, default_value = "video0.m4s")] + relay_track: String, + /// Direct iroh endpoint address/id for direct transport advertisement. + /// Defaults to this process endpoint id when `--direct-broadcast` is set. + #[arg(long)] + direct_endpoint: Option, + /// Direct iroh broadcast name for direct transport advertisement. + #[arg(long)] + direct_broadcast: Option, + /// Direct iroh track name for direct transport advertisement. + #[arg(long, default_value = DEFAULT_TRACK_NAME)] + direct_track: String, + /// Optional iroh secret key (hex) for control gossip endpoint identity. + #[arg(long)] + iroh_secret: Option, + /// Discovery modes to enable (comma-separated: dht, mdns, dns). + #[arg(long)] + discovery: Option, + /// Gossip peers to connect to (repeatable). + #[arg(long)] + gossip_peer: Vec, +} + +#[derive(Parser, Debug)] +struct ControlListenArgs { + /// Optional iroh secret key (hex) for control gossip endpoint identity. + #[arg(long)] + iroh_secret: Option, + /// Discovery modes to enable (comma-separated: dht, mdns, dns). + #[arg(long)] + discovery: Option, + /// Gossip peers to connect to (repeatable). + #[arg(long)] + gossip_peer: Vec, + /// Exit after the first announcement. + #[arg(long, default_value_t = false)] + once: bool, } #[derive(Subcommand, Debug)] @@ -502,6 +585,8 @@ fn main() -> Result<()> { Commands::WsPublish(args) => run_async(ws_publish(args))?, Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?, Commands::WtPublish(args) => run_async(wt_publish(args))?, + Commands::ControlAnnounce(args) => run_async(control_announce(args))?, + Commands::ControlListen(args) => run_async(control_listen(args))?, } Ok(()) @@ -3679,7 +3764,9 @@ fn ws_url_for(base: &str, stream_id: &str, role: &str) -> String { } async fn ws_send_frame( - ws: &mut tokio_tungstenite::WebSocketStream>, + ws: &mut tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, frame: &[u8], ) -> Result<()> { let len = u32::try_from(frame.len()).map_err(|_| anyhow!("frame too large"))?; @@ -3784,7 +3871,9 @@ async fn ws_publish(args: WsPublishArgs) -> Result<()> { expires_ms: now.saturating_add(ttl), }; let url = format!("{dir2}/api/announce"); - let _ = tokio::time::timeout(Duration::from_secs(5), client2.post(url).json(&req).send()).await; + let _ = + tokio::time::timeout(Duration::from_secs(5), client2.post(url).json(&req).send()) + .await; tokio::time::sleep(Duration::from_millis(ttl.saturating_mul(3) / 4)).await; } }); @@ -3983,7 +4072,10 @@ async fn ws_subscribe(args: WsSubscribeArgs) -> Result<()> { if deadline.is_some_and(|d| Instant::now() > d) { break; } - let next = ws.next().await.ok_or_else(|| anyhow!("websocket closed"))??; + let next = ws + .next() + .await + .ok_or_else(|| anyhow!("websocket closed"))??; let bytes = match next { WsMessage::Binary(b) => b, WsMessage::Close(_) => break, @@ -4202,6 +4294,171 @@ fn build_catalog_entry( } } +fn now_unix_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64 +} + +fn build_control_announcement( + stream_id: String, + title: String, + transports: Vec, + ttl_ms: u64, +) -> StreamControlAnnouncement { + let stream = StreamDescriptor { + id: StreamId(stream_id.clone()), + title, + number: None, + source: "control".to_string(), + metadata: vec![StreamMetadata { + key: "stream_id".to_string(), + value: stream_id, + }], + }; + + StreamControlAnnouncement { + stream, + transports, + updated_unix_ms: now_unix_ms(), + ttl_ms, + } +} + +async fn spawn_control_announcer_task( + endpoint: iroh::Endpoint, + peers: Vec, + mut announcement: StreamControlAnnouncement, + interval: Duration, +) -> Result> { + let mut gossip = tokio::time::timeout( + Duration::from_secs(10), + ec_iroh::ControlGossip::join(endpoint, &peers), + ) + .await + .context("timed out joining control gossip topic")??; + let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); + + tokio::spawn(async move { + let mut ticker = tokio::time::interval(interval.max(Duration::from_secs(1))); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + announcement.updated_unix_ms = now_unix_ms(); + if let Err(err) = gossip.announce(announcement.clone()).await { + tracing::warn!("control announce failed: {err:#}"); + } + + tokio::select! { + _ = ticker.tick() => {} + _ = &mut stop_rx => break, + } + } + }); + + Ok(stop_tx) +} + +async fn control_announce(args: ControlAnnounceArgs) -> Result<()> { + if args.gossip_peer.is_empty() { + return Err(anyhow!( + "control announce requires at least one --gossip-peer currently" + )); + } + + let secret = parse_iroh_secret(args.iroh_secret)?; + let discovery = parse_discovery(args.discovery.as_deref())?; + let endpoint = ec_iroh::build_endpoint(secret, discovery).await?; + + let mut transports = Vec::new(); + + if let (Some(url), Some(broadcast_name)) = + (args.relay_url.as_ref(), args.relay_broadcast.as_ref()) + { + transports.push(StreamTransportDescriptor::RelayMoq { + url: url.clone(), + broadcast_name: broadcast_name.clone(), + track_name: args.relay_track.clone(), + }); + } + + if let Some(broadcast_name) = args.direct_broadcast.as_ref() { + let endpoint_addr = args + .direct_endpoint + .clone() + .unwrap_or_else(|| endpoint.id().to_string()); + transports.push(StreamTransportDescriptor::IrohDirect { + endpoint: endpoint_addr, + broadcast_name: broadcast_name.clone(), + track_name: args.direct_track.clone(), + }); + } + + if transports.is_empty() { + return Err(anyhow!( + "no transports configured; set relay (--relay-url + --relay-broadcast) and/or direct (--direct-broadcast)" + )); + } + + let title = args.title.clone().unwrap_or_else(|| args.stream_id.clone()); + let announcement = + build_control_announcement(args.stream_id.clone(), title, transports, args.ttl_ms); + + let stop_tx = spawn_control_announcer_task( + endpoint.clone(), + args.gossip_peer.clone(), + announcement, + Duration::from_millis(args.interval_ms.max(1000)), + ) + .await?; + + eprintln!("control endpoint id: {}", endpoint.id()); + eprintln!("control stream_id: {}", args.stream_id); + eprintln!( + "control announce interval_ms: {}", + args.interval_ms.max(1000) + ); + eprintln!("control ttl_ms: {}", args.ttl_ms); + + tokio::signal::ctrl_c().await?; + let _ = stop_tx.send(()); + tokio::time::sleep(Duration::from_millis(100)).await; + Ok(()) +} + +async fn control_listen(args: ControlListenArgs) -> Result<()> { + let secret = parse_iroh_secret(args.iroh_secret)?; + let discovery = parse_discovery(args.discovery.as_deref())?; + let endpoint = ec_iroh::build_endpoint(secret, discovery).await?; + eprintln!("control endpoint id: {}", endpoint.id()); + let mut gossip = tokio::time::timeout( + Duration::from_secs(30), + ec_iroh::ControlGossip::join(endpoint.clone(), &args.gossip_peer), + ) + .await + .context("timed out joining control gossip topic")??; + + loop { + tokio::select! { + maybe = gossip.next_announcement() => { + let Some(announcement) = maybe? else { + continue; + }; + println!("{}", serde_json::to_string(&announcement)?); + if args.once { + break; + } + } + _ = tokio::signal::ctrl_c() => { + break; + } + } + } + + Ok(()) +} + fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> { let start = Instant::now(); let mut last_len: Option = None; @@ -4237,6 +4494,50 @@ fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> { async fn wt_publish(args: WtPublishArgs) -> Result<()> { let relay_url = Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?; + let relay_url_str = relay_url.to_string(); + + let mut control_stop: Option> = None; + if args.control_announce { + let secret = parse_iroh_secret(args.iroh_secret.clone())?; + let discovery = parse_discovery(args.discovery.as_deref())?; + let endpoint = ec_iroh::build_endpoint(secret, discovery).await?; + + let announcement = build_control_announcement( + args.name.clone(), + args.name.clone(), + vec![StreamTransportDescriptor::RelayMoq { + url: relay_url_str.clone(), + broadcast_name: args.name.clone(), + track_name: "video0.m4s".to_string(), + }], + args.control_ttl_ms, + ); + + if args.gossip_peer.is_empty() { + tracing::warn!("control announce requested but no gossip peers configured; skipping"); + } else { + match spawn_control_announcer_task( + endpoint.clone(), + args.gossip_peer.clone(), + announcement, + Duration::from_millis(args.control_interval_ms.max(1000)), + ) + .await + { + Ok(stop_tx) => { + tracing::info!( + endpoint = %endpoint.id(), + stream = %args.name, + "control announce enabled" + ); + control_stop = Some(stop_tx); + } + Err(err) => { + tracing::warn!("failed to start control announce task: {err:#}"); + } + } + } + } // Create a local origin + broadcast, then pass an OriginConsumer into the client so it can // publish announcements to the relay. @@ -4270,15 +4571,15 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { fn accept_bi( &self, - ) -> impl Future> + web_transport_trait::MaybeSend - { + ) -> impl Future> + + web_transport_trait::MaybeSend { self.inner.accept_bi() } fn open_bi( &self, - ) -> impl Future> + web_transport_trait::MaybeSend - { + ) -> impl Future> + + web_transport_trait::MaybeSend { self.inner.open_bi() } @@ -4358,7 +4659,8 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { _server_name: &rustls::pki_types::ServerName<'_>, _ocsp: &[u8], _now: rustls::pki_types::UnixTime, - ) -> Result { + ) -> Result + { Ok(rustls::client::danger::ServerCertVerified::assertion()) } @@ -4367,7 +4669,8 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { message: &[u8], cert: &rustls::pki_types::CertificateDer<'_>, dss: &rustls::DigitallySignedStruct, - ) -> Result { + ) -> Result + { rustls::crypto::verify_tls12_signature( message, cert, @@ -4381,7 +4684,8 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { message: &[u8], cert: &rustls::pki_types::CertificateDer<'_>, dss: &rustls::DigitallySignedStruct, - ) -> Result { + ) -> Result + { rustls::crypto::verify_tls13_signature( message, cert, @@ -4395,9 +4699,9 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { } } - let provider = rustls::crypto::CryptoProvider::get_default().cloned().unwrap_or_else(|| { - Arc::new(rustls::crypto::ring::default_provider()) - }); + let provider = rustls::crypto::CryptoProvider::get_default() + .cloned() + .unwrap_or_else(|| Arc::new(rustls::crypto::ring::default_provider())); tls.dangerous() .set_certificate_verifier(Arc::new(NoCertificateVerification(provider))); } @@ -4406,8 +4710,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()]; // Build a Quinn endpoint. - let socket = std::net::UdpSocket::bind("[::]:0") - .context("failed to bind UDP socket")?; + let socket = std::net::UdpSocket::bind("[::]:0").context("failed to bind UDP socket")?; let mut transport = quinn::TransportConfig::default(); transport.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); @@ -4484,8 +4787,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { } } - Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))) - .context("failed MoQ SETUP") + Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))).context("failed MoQ SETUP") } tracing::info!(url=%relay_url, name=%args.name, "connecting to relay"); @@ -4565,7 +4867,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { tokio::pin!(decode_fut); tracing::info!("publishing fMP4 -> moq-mux -> relay"); - tokio::select! { + let outcome = tokio::select! { res = &mut decode_fut => { let status = child.wait().await.context("failed to wait for ffmpeg")?; match res { @@ -4584,5 +4886,11 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { tokio::time::sleep(Duration::from_millis(100)).await; Ok(()) } + }; + + if let Some(stop) = control_stop.take() { + let _ = stop.send(()); } + + outcome } diff --git a/evolution/proposals/ECP-0066-iroh-control-protocol.md b/evolution/proposals/ECP-0066-iroh-control-protocol.md new file mode 100644 index 0000000..359c0cd --- /dev/null +++ b/evolution/proposals/ECP-0066-iroh-control-protocol.md @@ -0,0 +1,45 @@ +# ECP-0066: iroh-Gossip Control Protocol For Hybrid MoQ Discovery + +Status: Draft + +## Decision + +Add a first-party control protocol carried over iroh-gossip to advertise stream availability across multiple transport paths: + +- MoQ relay (WebTransport URL + broadcast name + track), +- iroh direct (EndpointAddr + broadcast name + track). + +`ec-node` will expose: + +1. `control-announce`: publish control announcements to an iroh gossip topic. +2. `control-listen`: subscribe to the topic and print announcements. +3. Optional integration in `wt-publish` to announce relay-published streams via the same control topic. + +## Motivation + +We currently have two transport worlds: + +- relay-centric (`wt-publish`), +- direct iroh (`moq-publish` / `moq-subscribe`). + +Discovery is fragmented. A shared iroh control channel makes transport selection explicit and allows consumers to discover either path (or both) from one substrate. + +## Scope + +In scope: + +- New control protocol data types in `ec-core`. +- New `ControlGossip` helper in `ec-iroh`. +- New CLI surfaces in `ec-node` for announce/listen. +- Optional relay announcement from `wt-publish`. + +Out of scope: + +- Policy engine for automatic best-path selection. +- Security policy beyond existing iroh/gossip trust boundaries. +- Replacing existing catalog gossip immediately (coexist first). + +## Rollout / Reversibility + +- Additive and reversible: removing control commands and topic does not affect existing media paths. +- Existing MoQ publish/subscribe flows continue unchanged.