diff --git a/Cargo.lock b/Cargo.lock index 9d963a3..7eed7bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1085,16 +1085,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "core-foundation" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation" version = "0.10.1" @@ -1118,7 +1108,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa95a34622365fa5bbf40b20b75dba8dfa8c94c734aea8ac9a5ca38af14316f1" dependencies = [ "bitflags 2.10.0", - "core-foundation 0.10.1", + "core-foundation", "core-graphics-types", "foreign-types", "libc", @@ -1131,7 +1121,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d44a101f213f6c4cdc1853d4b78aef6db6bdfa3468798cc1d9912f4735013eb" dependencies = [ "bitflags 2.10.0", - "core-foundation 0.10.1", + "core-foundation", "libc", ] @@ -1833,11 +1823,9 @@ dependencies = [ "hex", "iroh", "just-webrtc", + "moq-lite 0.14.0", "moq-mux", "moq-native", - "moq-native-ietf", - "moq-pub", - "moq-transport", "reqwest", "serde", "serde_json", @@ -2032,35 +2020,12 @@ dependencies = [ "syn 2.0.114", ] -[[package]] -name = "env_filter" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" -dependencies = [ - "log", - "regex", -] - [[package]] name = "env_home" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7f84e12ccf0a7ddc17a6c41c93326024c42920d7ee630d04950e6926645c0fe" -[[package]] -name = "env_logger" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "jiff", - "log", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -2255,12 +2220,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "four-cc" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "795cbfc56d419a7ce47ccbb7504dd9a5b7c484c083c356e797de08bd988d9629" - [[package]] name = "fs_extra" version = "1.3.0" @@ -3709,30 +3668,6 @@ dependencies = [ "system-deps", ] -[[package]] -name = "jiff" -version = "0.2.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c867c356cc096b33f4981825ab281ecba3db0acefe60329f044c1789d94c6543" -dependencies = [ - "jiff-static", - "log", - "portable-atomic", - "portable-atomic-util", - "serde_core", -] - -[[package]] -name = "jiff-static" -version = "0.2.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7946b4325269738f270bb55b3c19ab5c5040525f83fd625259422a9d25d9be5" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.114", -] - [[package]] name = "jni" version = "0.21.1" @@ -4154,14 +4089,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "moq-catalog" -version = "0.2.2" -source = "git+https://github.com/cloudflare/moq-rs?branch=draft-ietf-moq-transport-07#ebc843de8504e37d36c3134a1181513ebdf7a34a" -dependencies = [ - "serde", -] - [[package]] name = "moq-lite" version = "0.10.1" @@ -4243,7 +4170,7 @@ dependencies = [ "rcgen 0.14.7", "reqwest", "rustls", - "rustls-native-certs 0.8.3", + "rustls-native-certs", "rustls-pemfile", "rustls-webpki", "serde", @@ -4253,83 +4180,10 @@ dependencies = [ "tracing", "tracing-subscriber", "url", - "web-transport-quinn 0.11.4", + "web-transport-quinn", "web-transport-ws", ] -[[package]] -name = "moq-native-ietf" -version = "0.5.4" -source = "git+https://github.com/cloudflare/moq-rs?branch=draft-ietf-moq-transport-07#ebc843de8504e37d36c3134a1181513ebdf7a34a" -dependencies = [ - "anyhow", - "clap", - "futures", - "hex", - "log", - "moq-transport", - "quinn", - "ring", - "rustls", - "rustls-native-certs 0.7.3", - "rustls-pemfile", - "tokio", - "url", - "web-transport", - "web-transport-quinn 0.3.4", - "webpki", -] - -[[package]] -name = "moq-pub" -version = "0.8.4" -source = "git+https://github.com/cloudflare/moq-rs?branch=draft-ietf-moq-transport-07#ebc843de8504e37d36c3134a1181513ebdf7a34a" -dependencies = [ - "anyhow", - "bytes", - "clap", - "env_logger", - "log", - "moq-catalog", - "moq-native-ietf", - "moq-transport", - "mp4", - "rfc6381-codec", - "serde_json", - "tokio", - "tracing", - "tracing-subscriber", - "url", -] - -[[package]] -name = "moq-transport" -version = "0.10.0" -source = "git+https://github.com/cloudflare/moq-rs?branch=draft-ietf-moq-transport-07#ebc843de8504e37d36c3134a1181513ebdf7a34a" -dependencies = [ - "bytes", - "futures", - "log", - "paste", - "thiserror 1.0.69", - "tokio", - "web-transport", -] - -[[package]] -name = "mp4" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9ef834d5ed55e494a2ae350220314dc4aacd1c43a9498b00e320e0ea352a5c3" -dependencies = [ - "byteorder", - "bytes", - "num-rational", - "serde", - "serde_json", - "thiserror 1.0.69", -] - [[package]] name = "mp4-atom" version = "0.10.1" @@ -4346,21 +4200,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "mp4ra-rust" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdbc3d3867085d66ac6270482e66f3dd2c5a18451a3dc9ad7269e94844a536b7" -dependencies = [ - "four-cc", -] - -[[package]] -name = "mpeg4-audio-const" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96a1fe2275b68991faded2c80aa4a33dba398b77d276038b8f50701a22e55918" - [[package]] name = "muda" version = "0.17.1" @@ -4747,7 +4586,6 @@ dependencies = [ "num-bigint", "num-integer", "num-traits", - "serde", ] [[package]] @@ -5076,12 +4914,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" -[[package]] -name = "openssl-probe" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" - [[package]] name = "openssl-probe" version = "0.2.1" @@ -5523,15 +5355,6 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" -[[package]] -name = "portable-atomic-util" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a9db96d7fa8782dd8c15ce32ffe8680bbd1e978a43bf51a34d39483540495f5" -dependencies = [ - "portable-atomic", -] - [[package]] name = "portmapper" version = "0.13.0" @@ -6036,16 +5859,6 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e061d1b48cb8d38042de4ae0a7a6401009d6143dc80d2e2d6f31f0bdd6470c7" -[[package]] -name = "rfc6381-codec" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed54c20f5c3ec82eab6d998b313dc75ec5d5650d4f57675e61d72489040297fd" -dependencies = [ - "mp4ra-rust", - "mpeg4-audio-const", -] - [[package]] name = "rfc6979" version = "0.4.0" @@ -6167,29 +5980,16 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" -dependencies = [ - "openssl-probe 0.1.6", - "rustls-pemfile", - "rustls-pki-types", - "schannel", - "security-framework 2.11.1", -] - [[package]] name = "rustls-native-certs" version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" dependencies = [ - "openssl-probe 0.2.1", + "openssl-probe", "rustls-pki-types", "schannel", - "security-framework 3.5.1", + "security-framework", ] [[package]] @@ -6217,16 +6017,16 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" dependencies = [ - "core-foundation 0.10.1", + "core-foundation", "core-foundation-sys", "jni", "log", "once_cell", "rustls", - "rustls-native-certs 0.8.3", + "rustls-native-certs", "rustls-platform-verifier-android", "rustls-webpki", - "security-framework 3.5.1", + "security-framework", "security-framework-sys", "webpki-root-certs", "windows-sys 0.61.2", @@ -6424,19 +6224,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "security-framework" -version = "2.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" -dependencies = [ - "bitflags 2.10.0", - "core-foundation 0.9.4", - "core-foundation-sys", - "libc", - "security-framework-sys", -] - [[package]] name = "security-framework" version = "3.5.1" @@ -6444,7 +6231,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" dependencies = [ "bitflags 2.10.0", - "core-foundation 0.10.1", + "core-foundation", "core-foundation-sys", "libc", "security-framework-sys", @@ -7212,7 +6999,7 @@ checksum = "f3a753bdc39c07b192151523a3f77cd0394aa75413802c883a0f6f6a0e5ee2e7" dependencies = [ "bitflags 2.10.0", "block2", - "core-foundation 0.10.1", + "core-foundation", "core-graphics", "crossbeam-channel", "dispatch", @@ -8491,18 +8278,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "web-transport" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4703a5ad424f8eca7860903b94f6ed747cf58bebba3081ede78e84493a12440c" -dependencies = [ - "bytes", - "thiserror 1.0.69", - "web-transport-quinn 0.3.4", - "web-transport-wasm", -] - [[package]] name = "web-transport-iroh" version = "0.1.1" @@ -8521,18 +8296,6 @@ dependencies = [ "web-transport-trait", ] -[[package]] -name = "web-transport-proto" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "974fa1e325e6cc5327de8887f189a441fcff4f8eedcd31ec87f0ef0cc5283fbc" -dependencies = [ - "bytes", - "http", - "thiserror 2.0.18", - "url", -] - [[package]] name = "web-transport-proto" version = "0.3.1" @@ -8560,24 +8323,6 @@ dependencies = [ "url", ] -[[package]] -name = "web-transport-quinn" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3020b51cda10472a365e42d9a701916d4f04d74cc743de08246ef6a421c2d137" -dependencies = [ - "bytes", - "futures", - "http", - "log", - "quinn", - "quinn-proto", - "thiserror 1.0.69", - "tokio", - "url", - "web-transport-proto 0.2.8", -] - [[package]] name = "web-transport-quinn" version = "0.11.4" @@ -8589,7 +8334,7 @@ dependencies = [ "http", "quinn", "rustls", - "rustls-native-certs 0.8.3", + "rustls-native-certs", "thiserror 2.0.18", "tokio", "tracing", @@ -8607,19 +8352,6 @@ dependencies = [ "bytes", ] -[[package]] -name = "web-transport-wasm" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66e8f572ad133af04a5aa4a207d48d3f6a2f1f3006aa1b8f0d774d28c085d699" -dependencies = [ - "bytes", - "js-sys", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "web-transport-ws" version = "0.2.5" @@ -8679,16 +8411,6 @@ dependencies = [ "system-deps", ] -[[package]] -name = "webpki" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" -dependencies = [ - "ring", - "untrusted 0.9.0", -] - [[package]] name = "webpki-root-certs" version = "1.0.5" diff --git a/crates/ec-node/Cargo.toml b/crates/ec-node/Cargo.toml index 996f034..9d4023e 100644 --- a/crates/ec-node/Cargo.toml +++ b/crates/ec-node/Cargo.toml @@ -31,10 +31,8 @@ tracing.workspace = true tracing-subscriber.workspace = true hang = "0.14.0" moq-mux = "0.2.1" +moq-lite = "0.14.0" moq-native = { version = "0.13.1", default-features = true } -moq-native-ietf = { git = "https://github.com/cloudflare/moq-rs", branch = "draft-ietf-moq-transport-07" } -moq-pub = { git = "https://github.com/cloudflare/moq-rs", branch = "draft-ietf-moq-transport-07" } -moq-transport = { git = "https://github.com/cloudflare/moq-rs", branch = "draft-ietf-moq-transport-07" } url = "2" [dev-dependencies] diff --git a/crates/ec-node/src/main.rs b/crates/ec-node/src/main.rs index 060e3d1..5fd9e72 100644 --- a/crates/ec-node/src/main.rs +++ b/crates/ec-node/src/main.rs @@ -40,7 +40,6 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio_tungstenite::tungstenite::Message as WsMessage; use futures_util::{SinkExt, StreamExt}; use tokio::process::Command as TokioCommand; -use tokio::io::AsyncReadExt; use url::Url; const DIRECT_WIRE_TAG_FRAME: u8 = 0x00; @@ -4226,53 +4225,39 @@ fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> { } async fn wt_publish(args: WtPublishArgs) -> Result<()> { - let relay_url = Url::parse(&args.url) - .with_context(|| format!("invalid relay url: {}", args.url))?; + let relay_url = + Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?; - // Cloudflare's public relay currently implements a subset of the IETF MoQ Transport draft-07. - // The upstream (newer-draft) moq stack does not interop; use the draft-07 compatibility - // implementation from the moq-rs repository. - let mut tls_args = moq_native_ietf::tls::Args::default(); - tls_args.disable_verify = args.tls_disable_verify; - let tls = tls_args.load().context("failed to load TLS config")?; + // Cloudflare's MoQ technical preview relay currently does not support ANNOUNCE. + // Use the moq-lite publish model (as used by hang) for relay interop. + let mut client_cfg = moq_native::ClientConfig::default(); + if args.tls_disable_verify { + client_cfg.tls.disable_verify = Some(true); + } + let client = client_cfg + .init() + .context("failed to init moq-native client")?; - let quic = moq_native_ietf::quic::Endpoint::new(moq_native_ietf::quic::Config { - bind: "[::]:0".parse().expect("valid bind addr"), - tls, - }) - .context("failed to init moq-native-ietf endpoint")?; + // Create a local origin + broadcast, then pass an OriginConsumer into the client so it can + // publish announcements to the relay. + let origin = moq_lite::Origin::produce(); + let publish = origin.consume(); + let mut broadcast = origin + .create_broadcast(&args.name) + .ok_or_else(|| anyhow!("failed to create broadcast: {}", args.name))?; - let (writer, _, reader) = moq_transport::serve::Tracks::new( - moq_transport::coding::Tuple::from_utf8_path(&args.name), - ) - .produce(); - let mut media = moq_pub::Media::new(writer).context("failed to init moq-pub media parser")?; + // Ensure the catalog track is present in the broadcast so subscribers can discover tracks. + let mut catalog = hang::CatalogProducer::default(); + broadcast.insert_track(catalog.track.clone()); tracing::info!(url=%relay_url, name=%args.name, "connecting to relay"); - let session = quic - .client - .connect(&relay_url) + let session = client + .with_publish(publish) + .connect(relay_url) .await .context("failed to connect to relay")?; - let (session, mut publisher) = moq_transport::session::Publisher::connect(session) - .await - .context("failed to create moq-transport publisher")?; - - // Run the relay session pump in the background; announcing and publishing require the - // session driver to be polled. - let session_task = tokio::spawn(async move { session.run().await }); - - tracing::info!("announcing track(s)"); - match tokio::time::timeout(Duration::from_secs(5), publisher.announce(reader)).await { - Ok(res) => res.context("publisher announce failed")?, - Err(_) => anyhow::bail!("publisher announce timed out"), - } - // Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux. - // We keep this conservative and deterministic-ish by default: - // - single threaded x264 - // - fixed GOP settings to reduce drift let mut cmd = TokioCommand::new("ffmpeg"); cmd.arg("-hide_banner") .arg("-loglevel") @@ -4331,59 +4316,36 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { tracing::info!(input=%args.input, "spawning ffmpeg"); let mut child = cmd.spawn().context("failed to spawn ffmpeg")?; - let mut stdout = child + let stdout = child .stdout .take() .ok_or_else(|| anyhow!("ffmpeg stdout unavailable"))?; - tracing::info!("publishing fMP4 -> moq-pub -> relay"); - let decode_task = tokio::spawn(async move { - let mut buf = bytes::BytesMut::new(); - loop { - let n = stdout - .read_buf(&mut buf) - .await - .context("failed to read from ffmpeg stdout")?; - if n == 0 { - anyhow::bail!("ffmpeg stdout EOF"); - } - media.parse(&mut buf).context("failed to parse fMP4")?; - } - #[allow(unreachable_code)] - Ok::<(), anyhow::Error>(()) - }); + let config = moq_mux::import::Fmp4Config { + passthrough: args.passthrough, + }; + let mut importer = moq_mux::import::Fmp4::new(broadcast, catalog, config); + let mut stdout = stdout; + let mut decode_fut = importer.decode_from(&mut stdout); + tokio::pin!(decode_fut); + + tracing::info!("publishing fMP4 -> moq-mux -> relay"); tokio::select! { - res = session_task => { - let _ = child.kill().await; + res = &mut decode_fut => { + let status = child.wait().await.context("failed to wait for ffmpeg")?; match res { - Ok(Ok(())) => Ok(()), - Ok(Err(err)) => Err(err).context("relay session error"), - Err(err) => Err(anyhow!("relay session join failed: {err}")), + Ok(()) if status.success() => Ok(()), + Ok(()) => Err(anyhow!("ffmpeg exited with {status}")), + Err(err) => Err(err).context("fmp4 ingest failed"), } } - res = decode_task => { - match res { - Ok(Ok(())) => { - let status = child.wait().await.context("failed to wait for ffmpeg")?; - if !status.success() { - return Err(anyhow!("ffmpeg exited with {status}")); - } - Ok(()) - } - Ok(Err(err)) => { - let _ = child.kill().await; - Err(err).context("fmp4 ingest failed") - } - Err(err) => { - let _ = child.kill().await; - Err(anyhow!("import task join failed: {err}")) - } - } + _ = session.closed() => { + let _ = child.kill().await; + Err(anyhow!("relay session closed")) } _ = tokio::signal::ctrl_c() => { tracing::info!("ctrl-c; shutting down"); - // Best-effort shutdown; the underlying QUIC session is dropped after kill. let _ = child.kill().await; tokio::time::sleep(Duration::from_millis(100)).await; Ok(()) diff --git a/evolution/proposals/ECP-0063-cloudflare-moq-webtransport.md b/evolution/proposals/ECP-0063-cloudflare-moq-webtransport.md index caea0e9..6904c4f 100644 --- a/evolution/proposals/ECP-0063-cloudflare-moq-webtransport.md +++ b/evolution/proposals/ECP-0063-cloudflare-moq-webtransport.md @@ -58,8 +58,7 @@ Cloudflare's public relay currently implements a subset of the IETF MoQ Transpor newer draft implementations. Implementation choice: -- `ec-node wt-publish` uses `moq-native-ietf` + `moq-transport` + `moq-pub` (fMP4 ingestion) for Cloudflare relay compatibility. -- Pin these crates to the `cloudflare/moq-rs` git branch `draft-ietf-moq-transport-07` until the relay supports newer drafts (crates.io releases observed to fail with `closed by peer`). +- Cloudflare's relay preview currently does **not** support `ANNOUNCE` (namespace-style publishing). `ec-node wt-publish` uses the `moq-lite` publish model via `moq-native` and `moq-mux` (fMP4 ingestion) for Cloudflare relay compatibility. ### Share link diff --git a/nix/pkgs/ec-cli.nix b/nix/pkgs/ec-cli.nix index f8ce106..df5bd32 100644 --- a/nix/pkgs/ec-cli.nix +++ b/nix/pkgs/ec-cli.nix @@ -22,10 +22,6 @@ rustPlatform.buildRustPackage { cargoLock = { lockFile = ../../Cargo.lock; - outputHashes = { - # Filled iteratively when git dependencies change. - "moq-catalog-0.2.2" = "sha256-df9KXVBRiNewDJ7ZgGBja81PPnk8vC9TA0bDIG0892o="; - }; }; cargoBuildFlags = [ "-p" "ec-cli" ]; diff --git a/nix/pkgs/ec-node.nix b/nix/pkgs/ec-node.nix index 5072729..ae08534 100644 --- a/nix/pkgs/ec-node.nix +++ b/nix/pkgs/ec-node.nix @@ -24,10 +24,6 @@ rustPlatform.buildRustPackage { cargoLock = { lockFile = ../../Cargo.lock; - outputHashes = { - # Filled iteratively when git dependencies change. - "moq-catalog-0.2.2" = "sha256-df9KXVBRiNewDJ7ZgGBja81PPnk8vC9TA0bDIG0892o="; - }; }; cargoBuildFlags = [ "-p" "ec-node" ];