diff --git a/Cargo.lock b/Cargo.lock index b577eac..f1792a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1826,7 +1826,10 @@ dependencies = [ "moq-lite 0.14.0", "moq-mux", "moq-native", + "quinn", "reqwest", + "rustls", + "rustls-native-certs", "serde", "serde_json", "tokio", @@ -1835,6 +1838,8 @@ dependencies = [ "tracing-subscriber", "url", "urlencoding", + "web-transport-quinn", + "web-transport-trait", "which 6.0.3", ] diff --git a/crates/ec-node/Cargo.toml b/crates/ec-node/Cargo.toml index 9d4023e..3dbc838 100644 --- a/crates/ec-node/Cargo.toml +++ b/crates/ec-node/Cargo.toml @@ -20,7 +20,10 @@ hex = "0.4" iroh = "0.96" just-webrtc = "0.2" bytes = "1" +quinn = "0.11.9" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +rustls = "0.23.36" +rustls-native-certs = "0.8.3" urlencoding = "2" serde.workspace = true serde_json.workspace = true @@ -29,6 +32,8 @@ tokio-tungstenite = { version = "0.24", default-features = false, features = ["c futures-util = "0.3" tracing.workspace = true tracing-subscriber.workspace = true +web-transport-quinn = "0.11.4" +web-transport-trait = "0.3.3" hang = "0.14.0" moq-mux = "0.2.1" moq-lite = "0.14.0" diff --git a/crates/ec-node/src/main.rs b/crates/ec-node/src/main.rs index 7a02b44..6e36cf9 100644 --- a/crates/ec-node/src/main.rs +++ b/crates/ec-node/src/main.rs @@ -4231,16 +4231,6 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { let relay_url = Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?; - // Cloudflare's MoQ technical preview relay currently does not support ANNOUNCE. - // Use the moq-lite publish model (as used by hang) for relay interop. - let mut client_cfg = moq_native::ClientConfig::default(); - if args.tls_disable_verify { - client_cfg.tls.disable_verify = Some(true); - } - let client = client_cfg - .init() - .context("failed to init moq-native client")?; - // Create a local origin + broadcast, then pass an OriginConsumer into the client so it can // publish announcements to the relay. let origin = moq_lite::Origin::produce(); @@ -4253,12 +4243,236 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { let mut catalog = hang::CatalogProducer::default(); broadcast.insert_track(catalog.track.clone()); + #[derive(Clone)] + struct ProtocolOverride { + inner: S, + protocol: Option, + } + + impl web_transport_trait::Session for ProtocolOverride { + type SendStream = S::SendStream; + type RecvStream = S::RecvStream; + type Error = S::Error; + + fn accept_uni( + &self, + ) -> impl Future> + web_transport_trait::MaybeSend + { + self.inner.accept_uni() + } + + fn accept_bi( + &self, + ) -> impl Future> + web_transport_trait::MaybeSend + { + self.inner.accept_bi() + } + + fn open_bi( + &self, + ) -> impl Future> + web_transport_trait::MaybeSend + { + self.inner.open_bi() + } + + fn open_uni( + &self, + ) -> impl Future> + web_transport_trait::MaybeSend + { + self.inner.open_uni() + } + + fn send_datagram(&self, payload: bytes::Bytes) -> Result<(), Self::Error> { + self.inner.send_datagram(payload) + } + + fn recv_datagram( + &self, + ) -> impl Future> + web_transport_trait::MaybeSend + { + self.inner.recv_datagram() + } + + fn max_datagram_size(&self) -> usize { + self.inner.max_datagram_size() + } + + fn protocol(&self) -> Option<&str> { + self.protocol.as_deref().or_else(|| self.inner.protocol()) + } + + fn close(&self, code: u32, reason: &str) { + self.inner.close(code, reason) + } + + fn closed(&self) -> impl Future + web_transport_trait::MaybeSend { + self.inner.closed() + } + } + + async fn connect_moq_session( + relay_url: &Url, + publish: moq_lite::OriginConsumer, + tls_disable_verify: bool, + ) -> Result { + let host = relay_url + .host_str() + .ok_or_else(|| anyhow!("relay url missing host: {relay_url}"))? + .to_string(); + let port = relay_url.port().unwrap_or(443); + + // Build TLS config. + let mut roots = rustls::RootCertStore::empty(); + let native = rustls_native_certs::load_native_certs(); + if !native.errors.is_empty() { + tracing::warn!( + errors = ?native.errors, + "some native root certs could not be loaded" + ); + } + for cert in native.certs { + let _ = roots.add(cert); + } + + let mut tls = rustls::ClientConfig::builder() + .with_root_certificates(roots) + .with_no_client_auth(); + + if tls_disable_verify { + // Mirror moq-native's behavior: accept any certificate, but still verify signatures. + #[derive(Debug)] + struct NoCertificateVerification(Arc); + + impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification { + fn verify_server_cert( + &self, + _end_entity: &rustls::pki_types::CertificateDer<'_>, + _intermediates: &[rustls::pki_types::CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, + _ocsp: &[u8], + _now: rustls::pki_types::UnixTime, + ) -> Result { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + message: &[u8], + cert: &rustls::pki_types::CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, + ) -> Result { + rustls::crypto::verify_tls12_signature( + message, + cert, + dss, + &self.0.signature_verification_algorithms, + ) + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &rustls::pki_types::CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, + ) -> Result { + rustls::crypto::verify_tls13_signature( + message, + cert, + dss, + &self.0.signature_verification_algorithms, + ) + } + + fn supported_verify_schemes(&self) -> Vec { + self.0.signature_verification_algorithms.supported_schemes() + } + } + + let provider = rustls::crypto::CryptoProvider::get_default().cloned().unwrap_or_else(|| { + Arc::new(rustls::crypto::ring::default_provider()) + }); + tls.dangerous() + .set_certificate_verifier(Arc::new(NoCertificateVerification(provider))); + } + + // WebTransport over HTTP/3. + tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()]; + + // Build a Quinn endpoint. + let socket = std::net::UdpSocket::bind("[::]:0") + .context("failed to bind UDP socket")?; + + let mut transport = quinn::TransportConfig::default(); + transport.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); + transport.keep_alive_interval(Some(Duration::from_secs(4))); + transport.mtu_discovery_config(None); + + let transport = Arc::new(transport); + let runtime = quinn::default_runtime().context("no async runtime")?; + let endpoint_config = quinn::EndpointConfig::default(); + let endpoint = quinn::Endpoint::new(endpoint_config, None, socket, runtime) + .context("failed to create QUIC endpoint")?; + + // Resolve relay. + let ip = tokio::net::lookup_host((host.clone(), port)) + .await + .context("failed DNS lookup")? + .next() + .context("no DNS entries")?; + + let quic: quinn::crypto::rustls::QuicClientConfig = tls.try_into()?; + let mut client_cfg = quinn::ClientConfig::new(Arc::new(quic)); + client_cfg.transport_config(transport); + + tracing::debug!(%ip, %host, %relay_url, "connecting QUIC"); + let connection = endpoint + .connect_with(client_cfg, ip, &host)? + .await + .context("failed QUIC connect")?; + + // Establish a WebTransport session. + let mut request = web_transport_quinn::proto::ConnectRequest::new(relay_url.clone()); + for alpn in moq_lite::ALPNS { + request = request.with_protocol(alpn.to_string()); + } + let wt = web_transport_quinn::Session::connect(connection, request) + .await + .context("failed WebTransport CONNECT")?; + + // Establish a MoQ session. Cloudflare's relay currently does not always include a selected + // subprotocol in the CONNECT response, so we attempt a few protocol overrides to select + // the correct IETF draft encoding for SETUP. + let client = moq_lite::Client::new().with_publish(publish); + + // These correspond to IETF draft ALPNs as used by moq-lite/web code. + // We use string literals here since moq-lite does not currently expose these constants. + let attempts: [&str; 4] = ["moqt-16", "moqt-15", "moq-00", ""]; + let mut last_err: Option = None; + + for p in attempts { + let session = ProtocolOverride { + inner: wt.clone(), + protocol: (!p.is_empty()).then(|| p.to_string()), + }; + + match client.connect(session).await { + Ok(session) => { + tracing::info!(protocol = %p, "connected to relay"); + return Ok(session); + } + Err(err) => { + last_err = Some(anyhow::Error::new(err)); + tracing::debug!(protocol = %p, err = %last_err.as_ref().unwrap(), "MoQ SETUP failed; retrying"); + } + } + } + + Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))) + .context("failed MoQ SETUP") + } + tracing::info!(url=%relay_url, name=%args.name, "connecting to relay"); - let session = client - .with_publish(publish) - .connect(relay_url) - .await - .context("failed to connect to relay")?; + let session = connect_moq_session(&relay_url, publish, args.tls_disable_verify).await?; // Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux. let mut cmd = TokioCommand::new("ffmpeg"); diff --git a/evolution/proposals/ECP-0063-cloudflare-moq-webtransport.md b/evolution/proposals/ECP-0063-cloudflare-moq-webtransport.md index fe314bc..f9f5b95 100644 --- a/evolution/proposals/ECP-0063-cloudflare-moq-webtransport.md +++ b/evolution/proposals/ECP-0063-cloudflare-moq-webtransport.md @@ -58,9 +58,10 @@ Cloudflare's public relay currently implements a subset of the IETF MoQ Transpor newer draft implementations. Implementation choice: -- Cloudflare's relay preview currently does **not** support `ANNOUNCE` (namespace-style publishing). `ec-node wt-publish` uses the `moq-lite` publish model via `moq-native` and `moq-mux` (fMP4 ingestion) for Cloudflare relay compatibility. -- On NixOS deployments, we disable `moq-native`'s WebSocket fallback (`MOQ_CLIENT_WEBSOCKET_ENABLED=false`) to ensure WebTransport (QUIC) is used. This avoids the WebSocket path occasionally "winning" the race and then failing MoQ negotiation against the Cloudflare relay, causing rapid reconnect loops. +- Cloudflare's relay preview currently does **not** support `ANNOUNCE` (namespace-style publishing). `ec-node wt-publish` uses the `moq-lite` publish model via `quinn` + `web-transport-quinn` + `moq-lite` and `moq-mux` (fMP4 ingestion) for Cloudflare relay compatibility. +- `ec-node wt-publish` is QUIC/WebTransport-only (no WebSocket fallback). NixOS deployments also set `MOQ_CLIENT_WEBSOCKET_ENABLED=false` as a belt-and-suspenders default for any other binaries that use `moq-native`. - For Cloudflare relay interop, we patch `web-transport-proto` to send and accept the standard WebTransport subprotocol negotiation header (`sec-webtransport-protocol`) in addition to the legacy `wt-available-protocols`/`wt-protocol` headers. Without subprotocol negotiation, the relay may not select a `moqt-*` protocol and can close the session immediately after MoQ `SETUP`. +- If the relay does not select a WebTransport subprotocol, `ec-node wt-publish` attempts MoQ `SETUP` with protocol overrides (`moqt-16`, `moqt-15`, `moq-00`) before falling back to "no protocol". ### Share link