From 806d8ed84d7748ca9aa9ff61d8c2180c9f4f8f3c Mon Sep 17 00:00:00 2001 From: "every.channel" Date: Wed, 18 Feb 2026 00:11:33 -0800 Subject: [PATCH] ec-node: announce before ingest loop --- crates/ec-node/src/main.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/crates/ec-node/src/main.rs b/crates/ec-node/src/main.rs index 698f776..d226312 100644 --- a/crates/ec-node/src/main.rs +++ b/crates/ec-node/src/main.rs @@ -4259,6 +4259,16 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { .await .context("failed to create moq-transport publisher")?; + tracing::info!("announcing track(s)"); + publisher + .announce(reader) + .await + .context("publisher announce failed")?; + + // Run the relay session pump in the background; publishing happens by writing objects to + // the track writer via moq-pub's fMP4 parser. + let session_task = tokio::spawn(async move { session.run().await }); + // Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux. // We keep this conservative and deterministic-ish by default: // - single threaded x264 @@ -4344,10 +4354,13 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { }); tokio::select! { - res = session.run() => { + res = session_task => { let _ = child.kill().await; - res.context("relay session error")?; - Ok(()) + match res { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err).context("relay session error"), + Err(err) => Err(anyhow!("relay session join failed: {err}")), + } } res = decode_task => { match res { @@ -4368,11 +4381,6 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { } } } - res = publisher.announce(reader) => { - let _ = child.kill().await; - res.context("publisher announce failed")?; - Ok(()) - } _ = tokio::signal::ctrl_c() => { tracing::info!("ctrl-c; shutting down"); // Best-effort shutdown; the underlying QUIC session is dropped after kill.