ec-node: wt-publish via moq-lite publish model

This commit is contained in:
every.channel 2026-02-18 00:38:09 -08:00
parent becd56b42f
commit 97c83961c5
No known key found for this signature in database
6 changed files with 57 additions and 384 deletions

View file

@ -40,7 +40,6 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio_tungstenite::tungstenite::Message as WsMessage;
use futures_util::{SinkExt, StreamExt};
use tokio::process::Command as TokioCommand;
use tokio::io::AsyncReadExt;
use url::Url;
const DIRECT_WIRE_TAG_FRAME: u8 = 0x00;
@ -4226,53 +4225,39 @@ fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
}
async fn wt_publish(args: WtPublishArgs) -> Result<()> {
let relay_url = Url::parse(&args.url)
.with_context(|| format!("invalid relay url: {}", args.url))?;
let relay_url =
Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?;
// Cloudflare's public relay currently implements a subset of the IETF MoQ Transport draft-07.
// The upstream (newer-draft) moq stack does not interop; use the draft-07 compatibility
// implementation from the moq-rs repository.
let mut tls_args = moq_native_ietf::tls::Args::default();
tls_args.disable_verify = args.tls_disable_verify;
let tls = tls_args.load().context("failed to load TLS config")?;
// Cloudflare's MoQ technical preview relay currently does not support ANNOUNCE.
// Use the moq-lite publish model (as used by hang) for relay interop.
let mut client_cfg = moq_native::ClientConfig::default();
if args.tls_disable_verify {
client_cfg.tls.disable_verify = Some(true);
}
let client = client_cfg
.init()
.context("failed to init moq-native client")?;
let quic = moq_native_ietf::quic::Endpoint::new(moq_native_ietf::quic::Config {
bind: "[::]:0".parse().expect("valid bind addr"),
tls,
})
.context("failed to init moq-native-ietf endpoint")?;
// Create a local origin + broadcast, then pass an OriginConsumer into the client so it can
// publish announcements to the relay.
let origin = moq_lite::Origin::produce();
let publish = origin.consume();
let mut broadcast = origin
.create_broadcast(&args.name)
.ok_or_else(|| anyhow!("failed to create broadcast: {}", args.name))?;
let (writer, _, reader) = moq_transport::serve::Tracks::new(
moq_transport::coding::Tuple::from_utf8_path(&args.name),
)
.produce();
let mut media = moq_pub::Media::new(writer).context("failed to init moq-pub media parser")?;
// Ensure the catalog track is present in the broadcast so subscribers can discover tracks.
let mut catalog = hang::CatalogProducer::default();
broadcast.insert_track(catalog.track.clone());
tracing::info!(url=%relay_url, name=%args.name, "connecting to relay");
let session = quic
.client
.connect(&relay_url)
let session = client
.with_publish(publish)
.connect(relay_url)
.await
.context("failed to connect to relay")?;
let (session, mut publisher) = moq_transport::session::Publisher::connect(session)
.await
.context("failed to create moq-transport publisher")?;
// Run the relay session pump in the background; announcing and publishing require the
// session driver to be polled.
let session_task = tokio::spawn(async move { session.run().await });
tracing::info!("announcing track(s)");
match tokio::time::timeout(Duration::from_secs(5), publisher.announce(reader)).await {
Ok(res) => res.context("publisher announce failed")?,
Err(_) => anyhow::bail!("publisher announce timed out"),
}
// Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux.
// We keep this conservative and deterministic-ish by default:
// - single threaded x264
// - fixed GOP settings to reduce drift
let mut cmd = TokioCommand::new("ffmpeg");
cmd.arg("-hide_banner")
.arg("-loglevel")
@ -4331,59 +4316,36 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
tracing::info!(input=%args.input, "spawning ffmpeg");
let mut child = cmd.spawn().context("failed to spawn ffmpeg")?;
let mut stdout = child
let stdout = child
.stdout
.take()
.ok_or_else(|| anyhow!("ffmpeg stdout unavailable"))?;
tracing::info!("publishing fMP4 -> moq-pub -> relay");
let decode_task = tokio::spawn(async move {
let mut buf = bytes::BytesMut::new();
loop {
let n = stdout
.read_buf(&mut buf)
.await
.context("failed to read from ffmpeg stdout")?;
if n == 0 {
anyhow::bail!("ffmpeg stdout EOF");
}
media.parse(&mut buf).context("failed to parse fMP4")?;
}
#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
});
let config = moq_mux::import::Fmp4Config {
passthrough: args.passthrough,
};
let mut importer = moq_mux::import::Fmp4::new(broadcast, catalog, config);
let mut stdout = stdout;
let mut decode_fut = importer.decode_from(&mut stdout);
tokio::pin!(decode_fut);
tracing::info!("publishing fMP4 -> moq-mux -> relay");
tokio::select! {
res = session_task => {
let _ = child.kill().await;
res = &mut decode_fut => {
let status = child.wait().await.context("failed to wait for ffmpeg")?;
match res {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err).context("relay session error"),
Err(err) => Err(anyhow!("relay session join failed: {err}")),
Ok(()) if status.success() => Ok(()),
Ok(()) => Err(anyhow!("ffmpeg exited with {status}")),
Err(err) => Err(err).context("fmp4 ingest failed"),
}
}
res = decode_task => {
match res {
Ok(Ok(())) => {
let status = child.wait().await.context("failed to wait for ffmpeg")?;
if !status.success() {
return Err(anyhow!("ffmpeg exited with {status}"));
}
Ok(())
}
Ok(Err(err)) => {
let _ = child.kill().await;
Err(err).context("fmp4 ingest failed")
}
Err(err) => {
let _ = child.kill().await;
Err(anyhow!("import task join failed: {err}"))
}
}
_ = session.closed() => {
let _ = child.kill().await;
Err(anyhow!("relay session closed"))
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("ctrl-c; shutting down");
// Best-effort shutdown; the underlying QUIC session is dropped after kill.
let _ = child.kill().await;
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())