ec-node: WebTransport publish + web hang-watch

This commit is contained in:
every.channel 2026-02-16 12:54:42 -05:00
parent 791c7beee7
commit 339aef50e0
No known key found for this signature in database
19 changed files with 1355 additions and 2229 deletions

View file

@ -24,11 +24,15 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus
urlencoding = "2"
serde.workspace = true
serde_json.workspace = true
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.24", default-features = false, features = ["connect", "rustls-tls-webpki-roots"] }
futures-util = "0.3"
tracing.workspace = true
tracing-subscriber.workspace = true
hang = "0.14.0"
moq-mux = "0.2.1"
moq-native = { version = "0.13.1", default-features = true }
url = "2"
[dev-dependencies]
headless_chrome = "1"

View file

@ -39,6 +39,11 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio_tungstenite::tungstenite::Message as WsMessage;
use futures_util::{SinkExt, StreamExt};
use hang as hang_moq;
use moq_mux as moq_mux_lib;
use moq_native as moq_native_lib;
use tokio::process::Command as TokioCommand;
use url::Url;
const DIRECT_WIRE_TAG_FRAME: u8 = 0x00;
const DIRECT_WIRE_TAG_STREAM: u8 = 0x01;
@ -75,6 +80,8 @@ enum Commands {
WsPublish(WsPublishArgs),
/// Subscribe to the global one-to-many relay (`/api/stream/ws`) and capture CMAF fragments + an mp4 proof.
WsSubscribe(WsSubscribeArgs),
/// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default).
WtPublish(WtPublishArgs),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
@ -394,6 +401,33 @@ struct WsSubscribeArgs {
mp4: Option<PathBuf>,
}
#[derive(Parser, Debug)]
struct WtPublishArgs {
/// Relay URL (WebTransport) to connect to.
/// Default points at Cloudflare's MoQ technical preview relay.
#[arg(long, default_value = "https://relay.cloudflare.mediaoverquic.com/")]
url: String,
/// Broadcast name to publish.
///
/// This should be stable so you can share:
/// `https://every.channel/watch?url=...&name=<broadcast>`.
#[arg(long)]
name: String,
/// Input URL or file for ffmpeg (e.g. HDHomeRun `http://hdhomerun.local/auto/v4.1`).
#[arg(long)]
input: String,
/// If set, transcode to H.264/AAC before fragmenting to fMP4.
#[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
transcode: bool,
/// Transmit fMP4 fragments directly (passthrough mode).
/// When false, the importer may reframe into CMAF fragments.
#[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
passthrough: bool,
/// Danger: disable TLS verification for the relay.
#[arg(long, default_value_t = false)]
tls_disable_verify: bool,
}
#[derive(Subcommand, Debug)]
enum IngestSource {
/// Ingest from an HDHomeRun device.
@ -460,6 +494,7 @@ fn main() -> Result<()> {
Commands::DirectSubscribe(args) => run_async(direct_subscribe(args))?,
Commands::WsPublish(args) => run_async(ws_publish(args))?,
Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?,
Commands::WtPublish(args) => run_async(wt_publish(args))?,
}
Ok(())
@ -4191,3 +4226,147 @@ fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
timeout
))
}
async fn wt_publish(args: WtPublishArgs) -> Result<()> {
let relay_url = Url::parse(&args.url)
.with_context(|| format!("invalid relay url: {}", args.url))?;
// Build the WebTransport client.
let mut client_cfg = moq_native_lib::ClientConfig::default();
if args.tls_disable_verify {
client_cfg.tls.disable_verify = Some(true);
}
let client = client_cfg.init().context("failed to init moq-native client")?;
// Build a hang broadcast producer + catalog + fMP4 importer.
// This matches the moq-dev/moq-cli publishing strategy, but with our own ffmpeg input.
let origin = hang_moq::moq_lite::Origin::produce();
let mut broadcast = hang_moq::moq_lite::BroadcastProducer::default();
let catalog = hang_moq::Catalog::default().produce();
broadcast.insert_track(catalog.track.clone());
let mut importer = moq_mux_lib::import::Fmp4::new(
broadcast.clone(),
catalog.clone(),
moq_mux_lib::import::Fmp4Config {
passthrough: args.passthrough,
},
);
origin.publish_broadcast(&args.name, broadcast.consume());
tracing::info!(url=%relay_url, name=%args.name, "connecting to relay");
let session = client
.with_publish(origin.consume())
.connect(relay_url)
.await
.context("failed to connect to relay")?;
// Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux.
// We keep this conservative and deterministic-ish by default:
// - single threaded x264
// - fixed GOP settings to reduce drift
let mut cmd = TokioCommand::new("ffmpeg");
cmd.arg("-hide_banner")
.arg("-loglevel")
.arg("error")
.arg("-nostats")
.arg("-fflags")
.arg("+nobuffer")
.arg("-flags")
.arg("low_delay")
.arg("-i")
.arg(&args.input);
if args.transcode {
cmd.args([
"-c:v",
"libx264",
"-preset",
"veryfast",
"-tune",
"zerolatency",
"-pix_fmt",
"yuv420p",
"-profile:v",
"main",
"-g",
"48",
"-keyint_min",
"48",
"-sc_threshold",
"0",
"-threads",
"1",
"-c:a",
"aac",
"-b:a",
"128k",
"-ac",
"2",
"-ar",
"48000",
]);
} else {
cmd.args(["-c", "copy"]);
}
cmd.args([
"-f",
"mp4",
"-movflags",
"empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset",
"pipe:1",
]);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::inherit());
tracing::info!(input=%args.input, "spawning ffmpeg");
let mut child = cmd.spawn().context("failed to spawn ffmpeg")?;
let mut stdout = child
.stdout
.take()
.ok_or_else(|| anyhow!("ffmpeg stdout unavailable"))?;
tracing::info!("publishing fMP4 -> hang -> relay");
let decode_task = tokio::spawn(async move { importer.decode_from(&mut stdout).await });
tokio::select! {
res = session.closed() => {
if let Err(err) = res {
// moq-lite errors are not always `std::error::Error`; keep this explicit.
return Err(anyhow!("relay session closed: {err:?}"));
}
let _ = child.kill().await;
Ok(())
}
res = decode_task => {
match res {
Ok(Ok(())) => {
let status = child.wait().await.context("failed to wait for ffmpeg")?;
if !status.success() {
return Err(anyhow!("ffmpeg exited with {status}"));
}
Ok(())
}
Ok(Err(err)) => {
let _ = child.kill().await;
Err(err).context("fmp4 import failed")
}
Err(err) => {
let _ = child.kill().await;
Err(anyhow!("import task join failed: {err}"))
}
}
}
_ = tokio::signal::ctrl_c() => {
tracing::info!("ctrl-c; shutting down");
session.close(hang_moq::moq_lite::Error::Cancel);
let _ = child.kill().await;
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
}
}