ec-node: wt-publish via moq-transport (draft-07)

This commit is contained in:
every.channel 2026-02-17 02:00:38 -08:00
parent 7719b0b763
commit 49b969e081
No known key found for this signature in database
6 changed files with 385 additions and 52 deletions

View file

@ -39,10 +39,8 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio_tungstenite::tungstenite::Message as WsMessage;
use futures_util::{SinkExt, StreamExt};
use hang as hang_moq;
use moq_mux as moq_mux_lib;
use moq_native as moq_native_lib;
use tokio::process::Command as TokioCommand;
use tokio::io::AsyncReadExt;
use url::Url;
const DIRECT_WIRE_TAG_FRAME: u8 = 0x00;
@ -4231,38 +4229,33 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
let relay_url = Url::parse(&args.url)
.with_context(|| format!("invalid relay url: {}", args.url))?;
// Build the WebTransport client.
let mut client_cfg = moq_native_lib::ClientConfig::default();
if args.tls_disable_verify {
client_cfg.tls.disable_verify = Some(true);
}
let client = client_cfg.init().context("failed to init moq-native client")?;
// Cloudflare's relay currently implements a subset of the IETF MoQ Transport draft-07.
// Use moq-transport (via moq-native-ietf) for interoperability.
let mut tls_args = moq_native_ietf::tls::Args::default();
tls_args.disable_verify = args.tls_disable_verify;
let tls = tls_args.load().context("failed to load TLS config")?;
// Build a hang broadcast producer + catalog + fMP4 importer.
// This matches the moq-dev/moq-cli publishing strategy, but with our own ffmpeg input.
let origin = hang_moq::moq_lite::Origin::produce();
let bind: std::net::SocketAddr = "[::]:0".parse().expect("valid bind addr");
let quic = moq_native_ietf::quic::Endpoint::new(moq_native_ietf::quic::Config::new(
bind, None, tls,
))
.context("failed to init moq-native-ietf endpoint")?;
let mut broadcast = hang_moq::moq_lite::BroadcastProducer::default();
let catalog = hang_moq::Catalog::default().produce();
broadcast.insert_track(catalog.track.clone());
let mut importer = moq_mux_lib::import::Fmp4::new(
broadcast.clone(),
catalog.clone(),
moq_mux_lib::import::Fmp4Config {
passthrough: args.passthrough,
},
);
origin.publish_broadcast(&args.name, broadcast.consume());
let namespace = moq_transport::coding::TrackNamespace::from_utf8_path(&args.name);
let (writer, _, reader) = moq_transport::serve::Tracks::new(namespace).produce();
let mut media = moq_pub::Media::new(writer).context("failed to init moq-pub media parser")?;
tracing::info!(url=%relay_url, name=%args.name, "connecting to relay");
let session = client
.with_publish(origin.consume())
.connect(relay_url)
let (session, _cid) = quic
.client
.connect(&relay_url, None)
.await
.context("failed to connect to relay")?;
let (session, mut publisher) = moq_transport::session::Publisher::connect(session)
.await
.context("failed to create moq-transport publisher")?;
// Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux.
// We keep this conservative and deterministic-ish by default:
// - single threaded x264
@ -4330,16 +4323,27 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
.take()
.ok_or_else(|| anyhow!("ffmpeg stdout unavailable"))?;
tracing::info!("publishing fMP4 -> hang -> relay");
let decode_task = tokio::spawn(async move { importer.decode_from(&mut stdout).await });
tracing::info!("publishing fMP4 -> moq-pub -> relay");
let decode_task = tokio::spawn(async move {
let mut buf = bytes::BytesMut::new();
loop {
let n = stdout
.read_buf(&mut buf)
.await
.context("failed to read from ffmpeg stdout")?;
if n == 0 {
anyhow::bail!("ffmpeg stdout EOF");
}
media.parse(&mut buf).context("failed to parse fMP4")?;
}
#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
});
tokio::select! {
res = session.closed() => {
if let Err(err) = res {
// moq-lite errors are not always `std::error::Error`; keep this explicit.
return Err(anyhow!("relay session closed: {err:?}"));
}
res = session.run() => {
let _ = child.kill().await;
res.context("relay session error")?;
Ok(())
}
res = decode_task => {
@ -4353,7 +4357,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
}
Ok(Err(err)) => {
let _ = child.kill().await;
Err(err).context("fmp4 import failed")
Err(err).context("fmp4 ingest failed")
}
Err(err) => {
let _ = child.kill().await;
@ -4361,9 +4365,14 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
}
}
}
res = publisher.announce(reader) => {
let _ = child.kill().await;
res.context("publisher announce failed")?;
Ok(())
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("ctrl-c; shutting down");
session.close(hang_moq::moq_lite::Error::Cancel);
// Best-effort shutdown; the underlying QUIC session is dropped after kill.
let _ = child.kill().await;
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())