//! Node runner: orchestrates ingest, chunking, and MoQ publication. mod source; use anyhow::{anyhow, Context, Result}; use blake3; use clap::ValueEnum; use clap::{Parser, Subcommand}; use ec_chopper::{build_manifest_body_for_chunks, TsChunk}; use ec_core::{ merkle_proof_for_index, verify_merkle_proof, Manifest, ManifestSummary, ManifestVariant, MoqStreamDescriptor, StreamCatalogEntry, StreamDescriptor, StreamEncryptionInfo, StreamId, StreamKey, StreamMetadata, }; use ec_crypto::{ decrypt_stream_data, encrypt_stream_data, load_manifest_keypair_from_env, sign_manifest_id, verify_manifest_signature, ENCRYPTION_ALG, }; use ec_direct::{decode_direct_link, encode_direct_link, DirectCodeV1}; use ec_iroh::DiscoveryConfig; use ec_moq::{ chunk_duration_secs, decode_object_frame, encode_object_frame, FileRelay, GroupId, HlsWriter, MoqNode, MoqPublishSet, ObjectId, ObjectMeta, ObjectPayload, TimingMeta, TrackName, DEFAULT_MANIFEST_TRACK_NAME, DEFAULT_TRACK_NAME, }; use iroh::Watcher; use just_webrtc::types::{DataChannelOptions, ICEServer, PeerConfiguration, PeerConnectionState}; use just_webrtc::{DataChannelExt, PeerConnectionBuilder, PeerConnectionExt}; use source::{HdhrSource, HlsMode, HlsSource, LinuxDvbSource, StreamSource, TsSource}; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs; use std::fs::File; use std::future::Future; use std::io::Read; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio_tungstenite::tungstenite::Message as WsMessage; use futures_util::{SinkExt, StreamExt}; use tokio::process::Command as TokioCommand; use url::Url; const DIRECT_WIRE_TAG_FRAME: u8 = 0x00; const DIRECT_WIRE_TAG_STREAM: u8 = 0x01; const DIRECT_WIRE_TAG_PING: u8 = 0x02; const DIRECT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(8); // Conservatively under typical SCTP data channel max message sizes. 
const DIRECT_WIRE_CHUNK_BYTES: usize = 16 * 1024;
use tokio::sync::mpsc;
use tokio::sync::RwLock;

// Root of the command line; the only field selects the subcommand to run.
#[derive(Parser, Debug)]
#[command(name = "ec-node")]
#[command(about = "every.channel node runner", long_about = None)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

// One variant per subcommand; each wraps that subcommand's argument struct.
#[derive(Subcommand, Debug)]
enum Commands {
    /// Ingest a source and publish MoQ objects into a local relay directory.
    Ingest(IngestArgs),
    /// Ingest a source and publish via MoQ over iroh.
    MoqPublish(MoqPublishArgs),
    /// Subscribe to a MoQ stream and write HLS segments locally.
    MoqSubscribe(MoqSubscribeArgs),
    /// Publish and subscribe to a MoQ stream locally, verifying chunk hashes.
    MoqSelftest(MoqSelftestArgs),
    /// Publish a stream over a direct WebRTC data channel (manual copy/paste connect code).
    DirectPublish(DirectPublishArgs),
    /// Subscribe to a direct WebRTC stream (directory or offer link) and optionally capture an .mp4 proof.
    DirectSubscribe(DirectSubscribeArgs),
    /// Publish a stream to the global one-to-many relay (`/api/stream/ws`) and keep the directory entry live.
    WsPublish(WsPublishArgs),
    /// Subscribe to the global one-to-many relay (`/api/stream/ws`) and capture CMAF fragments + an mp4 proof.
    WsSubscribe(WsSubscribeArgs),
    /// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default).
    WtPublish(WtPublishArgs),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
enum EncodeMode {
    /// Publish/subscribe CMAF-style fragmented MP4 segments (HLS fMP4) encoded with x264/AAC.
    Cmaf,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
enum CmafLadderPreset {
    /// 3-rung ladder: 1080p@6000k, 720p@3000k, 480p@1200k (CBR-ish).
    Hd3,
}

#[derive(Parser, Debug)]
struct IngestArgs {
    /// Output directory for temporary chunks.
    #[arg(long, default_value = "./tmp/chunks")]
    chunk_dir: PathBuf,
    /// Relay directory to write MoQ objects.
    #[arg(long, default_value = "./tmp/relay")]
    relay_dir: PathBuf,
    /// Chunk duration in ms.
#[arg(long, default_value_t = 2000)] chunk_ms: u64, /// Maximum number of chunks to write. #[arg(long)] max_chunks: Option, /// Optional stream id override. #[arg(long)] stream_id: Option, /// Optional network secret (hex) for stream encryption. #[arg(long)] network_secret: Option, /// Enable deterministic transcode before chunking. #[arg(long)] deterministic: bool, #[command(subcommand)] source: IngestSource, } #[derive(Parser, Debug)] struct MoqPublishArgs { /// Output directory for temporary chunks. #[arg(long, default_value = "./tmp/chunks")] chunk_dir: PathBuf, /// Chunk duration in ms. #[arg(long, default_value_t = 2000)] chunk_ms: u64, /// Maximum number of chunks to write. #[arg(long)] max_chunks: Option, /// Optional stream id override. #[arg(long)] stream_id: Option, /// Optional network secret (hex) for stream encryption. #[arg(long)] network_secret: Option, /// Enable deterministic transcode before chunking. #[arg(long)] deterministic: bool, /// Broadcast name override (defaults to stream id). #[arg(long)] broadcast_name: Option, /// Track name override. #[arg(long, default_value = DEFAULT_TRACK_NAME)] track_name: String, /// Publish chunk objects on the main track. #[arg(long, default_value_t = true, action = clap::ArgAction::Set)] publish_chunks: bool, /// Publish manifests alongside chunks. #[arg(long)] publish_manifests: bool, /// Track name for manifest frames. #[arg(long, default_value = DEFAULT_MANIFEST_TRACK_NAME)] manifest_track: String, /// Number of chunks per manifest epoch. #[arg(long, default_value_t = 1)] epoch_chunks: usize, /// Optional iroh secret key (hex). #[arg(long)] iroh_secret: Option, /// Discovery modes to enable (comma-separated: dht, mdns, dns). #[arg(long)] discovery: Option, /// Announce catalog entries over iroh-gossip (requires peers). #[arg(long)] announce: bool, /// Gossip peers to connect to (repeatable). 
#[arg(long)] gossip_peer: Vec, /// Optional startup delay (ms) after binding/publishing tracks, before ingest begins. /// Useful for E2E tests that need time to connect subscribers. #[arg(long)] startup_delay_ms: Option, /// Encoding/container mode. #[arg(long, value_enum, default_value_t = EncodeMode::Cmaf)] encode: EncodeMode, /// Track name for CMAF init segment objects. #[arg(long, default_value = "init")] init_track: String, /// Publish a CMAF ladder (multiple quality variants) using x264/AAC. #[arg(long, value_enum)] cmaf_ladder: Option, #[command(subcommand)] source: IngestSource, } #[derive(Parser, Debug)] struct MoqSubscribeArgs { /// Output directory for HLS segments. #[arg(long, default_value = "./tmp/moq-hls")] output_dir: PathBuf, /// Fallback chunk duration in ms. #[arg(long, default_value_t = 2000)] chunk_ms: u64, /// HLS window size. #[arg(long, default_value_t = 6)] window: usize, /// Optional stream id override (for decryption). #[arg(long)] stream_id: Option, /// Optional network secret (hex) for stream decryption. #[arg(long)] network_secret: Option, /// Remote endpoint address (iroh EndpointAddr). #[arg(long)] remote: String, /// Optional remote endpoint address to fetch manifests from. /// Defaults to `--remote` when not provided. #[arg(long)] remote_manifests: Option, /// Broadcast name to subscribe to. #[arg(long)] broadcast_name: String, /// Track name to subscribe to. #[arg(long, default_value = DEFAULT_TRACK_NAME)] track_name: String, /// Subscribe to manifest frames. #[arg(long)] subscribe_manifests: bool, /// Require a manifest to accept chunk data. #[arg(long)] require_manifest: bool, /// Track name for manifest frames. #[arg(long, default_value = DEFAULT_MANIFEST_TRACK_NAME)] manifest_track: String, /// Allowed manifest signer ids (comma-separated). #[arg(long)] manifest_signers: Option, /// Maximum bytes per second to accept (anti-junk). #[arg(long)] max_bytes_per_sec: Option, /// Maximum burst bytes before throttling. 
#[arg(long)] max_bytes_burst: Option, /// Maximum invalid chunks before disconnect. #[arg(long, default_value_t = 1)] max_invalid_chunks: u32, /// Stop after writing this many segments (useful for E2E tests). #[arg(long)] stop_after: Option, /// Optional iroh secret key (hex). #[arg(long)] iroh_secret: Option, /// Discovery modes to enable (comma-separated: dht, mdns, dns). #[arg(long)] discovery: Option, /// Container mode for local HLS output. #[arg(long, value_enum, default_value_t = EncodeMode::Cmaf)] container: EncodeMode, /// Track name to subscribe to for CMAF init segment objects. #[arg(long, default_value = "init")] init_track: String, /// Subscribe to the init segment track (CMAF). #[arg(long)] subscribe_init: bool, /// Write raw CMAF init+segments (no HLS playlist) to `--output-dir`. #[arg(long)] raw_cmaf: bool, } #[derive(Parser, Debug)] struct MoqSelftestArgs { /// Input TS file or URL (e.g. http://HDHR_HOST/auto/v8.1). input: String, /// Output directory for temporary chunks. #[arg(long, default_value = "./tmp/moq-selftest")] chunk_dir: PathBuf, /// Chunk duration in ms. #[arg(long, default_value_t = 2000)] chunk_ms: u64, /// Maximum number of chunks to publish/verify. #[arg(long, default_value_t = 8)] max_chunks: usize, /// Optional stream id override. #[arg(long)] stream_id: Option, /// Track name override. #[arg(long, default_value = DEFAULT_TRACK_NAME)] track_name: String, /// Discovery modes to enable (comma-separated: dht, mdns, dns). #[arg(long)] discovery: Option, } #[derive(Parser, Debug)] struct DirectPublishArgs { /// Global stream id (used for directory/signaling). If omitted, a fresh id is generated. #[arg(long)] stream_id: Option, /// Human-friendly title (used for directory listing). #[arg(long, default_value = "Live channel")] title: String, /// Optional directory base URL (e.g. https://every.channel). When set, the publisher /// announces the offer to `/api/announce` and polls `/api/answer`. 
#[arg(long)] directory_url: Option, /// Offer TTL (ms) when announcing to the directory. #[arg(long, default_value_t = 20000)] announce_ttl_ms: u64, /// Output directory for temporary chunks. #[arg(long, default_value = "./tmp/direct-chunks")] chunk_dir: PathBuf, /// Chunk duration in ms. #[arg(long, default_value_t = 2000)] chunk_ms: u64, /// Maximum number of media segments to send. #[arg(long, default_value_t = 30)] max_segments: usize, /// Optional answer code/link to avoid interactive stdin. #[arg(long)] answer: Option, /// How long to wait for a browser answer when using `--directory-url` or stdin (seconds). /// Set to 0 to wait indefinitely. #[arg(long, default_value_t = 900)] answer_timeout_secs: u64, /// Ingest source. #[command(subcommand)] source: IngestSource, } #[derive(Parser, Debug)] struct DirectSubscribeArgs { /// Directory base URL (e.g. https://every.channel). Used to locate offers by stream_id /// and to POST answers back to the publisher. #[arg(long, default_value = "https://every.channel")] directory_url: String, /// Stream id to locate in the directory. #[arg(long)] stream_id: Option, /// Direct offer link/code to use instead of looking up a stream in the directory. #[arg(long)] offer: Option, /// Output directory for captured CMAF fragments (init+segments) and index.m3u8. #[arg(long, default_value = "./tmp/direct-subscribe")] out_dir: PathBuf, /// Maximum number of media segments to capture (init not counted). #[arg(long, default_value_t = 12)] max_segments: usize, /// Optional time limit (seconds). When set, capture stops after this duration even if `--max-segments` is not reached. #[arg(long)] duration_secs: Option, /// If set, remux the captured playlist to this mp4 path (best-effort, `ffmpeg -c copy`). #[arg(long)] mp4: Option, } #[derive(Parser, Debug)] struct WsPublishArgs { /// Global stream id (used for directory listing + relay instance key). If omitted, a fresh id is generated. 
#[arg(long)] stream_id: Option, /// Human-friendly title (used for directory listing). #[arg(long, default_value = "Live channel")] title: String, /// Directory base URL (e.g. https://every.channel). Used for listing at `/api/announce`, /// and as the base for relay websocket URL (`/api/stream/ws`). #[arg(long, default_value = "https://every.channel")] directory_url: String, /// Offer TTL (ms) when announcing to the directory. #[arg(long, default_value_t = 20000)] announce_ttl_ms: u64, /// Output directory for temporary chunks. #[arg(long, default_value = "./tmp/ws-chunks")] chunk_dir: PathBuf, /// Chunk duration in ms. #[arg(long, default_value_t = 2000)] chunk_ms: u64, /// Maximum number of media segments to send (init not counted). Set high for "run forever" behavior. #[arg(long, default_value_t = 1_000_000)] max_segments: usize, /// Ingest source. #[command(subcommand)] source: IngestSource, } #[derive(Parser, Debug)] struct WsSubscribeArgs { /// Directory base URL (e.g. https://every.channel). Used for relay websocket URL (`/api/stream/ws`). #[arg(long, default_value = "https://every.channel")] directory_url: String, /// Stream id to subscribe to. #[arg(long)] stream_id: String, /// Output directory for captured CMAF fragments (init+segments) and index.m3u8. #[arg(long, default_value = "./tmp/ws-subscribe")] out_dir: PathBuf, /// Maximum number of media segments to capture (init not counted). #[arg(long, default_value_t = 12)] max_segments: usize, /// Optional time limit (seconds). When set, capture stops after this duration even if `--max-segments` is not reached. #[arg(long)] duration_secs: Option, /// If set, remux the captured playlist to this mp4 path (best-effort, `ffmpeg -c copy`). #[arg(long)] mp4: Option, } #[derive(Parser, Debug)] struct WtPublishArgs { /// Relay URL (WebTransport) to connect to. /// Default points at Cloudflare's MoQ technical preview relay. 
#[arg(long, default_value = "https://relay.cloudflare.mediaoverquic.com/")] url: String, /// Broadcast name to publish. /// /// This should be stable so you can share: /// `https://every.channel/watch?url=...&name=`. #[arg(long)] name: String, /// Input URL or file for ffmpeg (e.g. HDHomeRun `http://hdhomerun.local/auto/v4.1`). #[arg(long)] input: String, /// If set, transcode to H.264/AAC before fragmenting to fMP4. #[arg(long, default_value_t = true, action = clap::ArgAction::Set)] transcode: bool, /// Transmit fMP4 fragments directly (passthrough mode). /// When false, the importer may reframe into CMAF fragments. #[arg(long, default_value_t = true, action = clap::ArgAction::Set)] passthrough: bool, /// Danger: disable TLS verification for the relay. #[arg(long, default_value_t = false)] tls_disable_verify: bool, } #[derive(Subcommand, Debug)] enum IngestSource { /// Ingest from an HDHomeRun device. Hdhr { /// Hostname or IP (e.g. 192.168.1.10). If omitted, auto-discover. #[arg(long)] host: Option, /// Device ID (uses .local when host is omitted). #[arg(long)] device_id: Option, /// Channel number (e.g. 8.1). #[arg(long)] channel: Option, /// Channel name (e.g. KQED). #[arg(long)] name: Option, /// Prefer mDNS (hdhomerun.local) before UDP discovery. #[arg(long)] prefer_mdns: bool, }, /// Ingest from an HLS playlist URL. Hls { /// HLS playlist URL. url: String, /// Ingest mode (passthrough, remux, transcode). #[arg(long, value_enum, default_value_t = HlsMode::Passthrough)] mode: HlsMode, }, /// Ingest from a Linux DVB device. LinuxDvb { /// DVB adapter index. #[arg(long, default_value_t = 0)] adapter: u32, /// DVR device index. #[arg(long, default_value_t = 0)] dvr: u32, /// Optional tune command (repeat for each arg). #[arg(long)] tune_cmd: Vec, /// Optional tune wait (ms). #[arg(long)] tune_wait_ms: Option, }, /// Ingest from a raw TS file or URL. Ts { /// Input TS file or URL. 
input: String,
    },
}

/// Installs a stderr tracing subscriber, parses the command line, and dispatches to
/// the selected subcommand. `Ingest` runs synchronously; every other subcommand is
/// driven through [`run_async`].
fn main() -> Result<()> {
    // Keep stdout reserved for machine-readable output (endpoint addr, etc).
    let filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
    tracing_subscriber::fmt()
        .with_writer(std::io::stderr)
        .with_env_filter(filter)
        .init();

    let cli = Cli::parse();
    match cli.command {
        Commands::Ingest(args) => ingest(args)?,
        Commands::MoqPublish(args) => run_async(moq_publish(args))?,
        Commands::MoqSubscribe(args) => run_async(moq_subscribe(args))?,
        Commands::MoqSelftest(args) => run_async(moq_selftest(args))?,
        Commands::DirectPublish(args) => run_async(direct_publish(args))?,
        Commands::DirectSubscribe(args) => run_async(direct_subscribe(args))?,
        Commands::WsPublish(args) => run_async(ws_publish(args))?,
        Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?,
        Commands::WtPublish(args) => run_async(wt_publish(args))?,
    }
    Ok(())
}

/// Builds a multi-threaded Tokio runtime with all drivers enabled and blocks on
/// `future`, returning its result.
///
/// The `<F>` parameter and `Output` binding were missing from the text under review;
/// they are restored from the body, which returns `block_on(future)` as `Result<()>`.
///
/// # Errors
/// Fails if the runtime cannot be built, or with whatever error `future` yields.
fn run_async<F>(future: F) -> Result<()>
where
    F: Future<Output = Result<()>>,
{
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;
    runtime.block_on(future)
}

fn ingest(args: IngestArgs) -> Result<()> {
    fs::create_dir_all(&args.chunk_dir)
        .with_context(|| format!("failed to create {}", args.chunk_dir.display()))?;
    let deterministic = deterministic_enabled(args.deterministic);
    // NOTE(review): the boxed type lost its argument before review; `dyn StreamSource`
    // is the only imported name all four source structs could share — confirm.
    let (source, _needs_transcode): (Box<dyn StreamSource>, bool) = match args.source {
        IngestSource::Hls { url, mut mode } => {
            // Deterministic runs force the HLS source into transcode mode.
            if deterministic {
                mode = HlsMode::Transcode;
            }
            (Box::new(HlsSource { url, mode }), false)
        }
        IngestSource::Hdhr {
            host,
            device_id,
            channel,
            name,
            prefer_mdns,
        } => (
            Box::new(HdhrSource {
                host,
                device_id,
                channel,
                name,
                prefer_mdns,
            }),
            deterministic,
        ),
        IngestSource::LinuxDvb {
            adapter,
            dvr,
            tune_cmd,
            tune_wait_ms,
        } => (
            Box::new(LinuxDvbSource {
                adapter,
                dvr,
                tune_cmd,
                tune_wait_ms,
            }),
            deterministic,
        ),
        IngestSource::Ts { input } => (Box::new(TsSource { input }), deterministic),
    };
    let source_id = source.source_id();
    let source_id_for_stream =
source_id.clone(); let reader = source.open_stream()?; let encoder_profile_id = if deterministic { "deterministic-h264-aac".to_string() } else { // NOTE: We still normalize into CMAF for interoperability, even when determinism is off. "h264-aac".to_string() }; // CMAF-only: segment into init.mp4 + segment_*.m4s via ffmpeg. let out_dir = args.chunk_dir.join("cmaf"); let (init_path, segments) = chunk_stream_cmaf_ffmpeg( reader, &out_dir, args.chunk_ms, args.max_chunks.unwrap_or(usize::MAX), deterministic, )?; let mut chunk_hashes = Vec::with_capacity(1 + segments.len()); // Chunk index 0 is always init.mp4; segments are 1..N. let (init_bytes, init_hash) = read_chunk_bytes_and_hash(&init_path)?; chunk_hashes.push(init_hash); let mut segment_meta = Vec::with_capacity(segments.len()); for seg_path in &segments { let (bytes, hash) = read_chunk_bytes_and_hash(seg_path)?; chunk_hashes.push(hash.clone()); segment_meta.push((seg_path.clone(), bytes, hash)); } let chunk_start_index = 0u64; let created_unix_ms = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; let relay = FileRelay::new(args.relay_dir); let track = TrackName { namespace: "every.channel".to_string(), name: args.stream_id.unwrap_or_else(|| { StreamKey { version: 1, broadcast: None, source: Some(source_id_for_stream), profile: Some(format!("chunk-{}ms", args.chunk_ms)), variant: None, } .to_stream_id() .0 }), }; let stream_id = StreamId(track.name.clone()); let manifest_payload = build_manifest( stream_id, format!("epoch-{created_unix_ms}"), args.chunk_ms, chunk_start_index, encoder_profile_id, created_unix_ms, vec![StreamMetadata { key: "source_kind".to_string(), value: source_id.kind.clone(), }], chunk_hashes, )?; let manifest_id = manifest_payload.manifest_id.clone(); let manifest_path = args.chunk_dir.join("manifest.json"); fs::write( &manifest_path, serde_json::to_vec_pretty(&manifest_payload)?, )?; let network_secret = parse_network_secret(args.network_secret)?; // 
// Publish init at chunk_index 0 to avoid colliding with segment_000000.
    publish_chunk_file(
        &relay,
        &track,
        TsChunk {
            index: 0,
            path: init_path,
            timing: ec_chopper::ChunkTiming {
                chunk_index: 0,
                chunk_start_27mhz: None,
                chunk_duration_27mhz: 0,
                utc_start_unix: None,
                sync_status: "init".to_string(),
            },
        },
        "video/mp4",
        Some(init_bytes),
        network_secret.as_deref(),
        Some(&manifest_id),
    )?;
    // Media segments follow at chunk indices 1..=N, in the order ffmpeg produced them.
    for (i, (seg_path, bytes, _hash)) in segment_meta.into_iter().enumerate() {
        let chunk_index = (i as u64) + 1;
        publish_chunk_file(
            &relay,
            &track,
            TsChunk {
                index: chunk_index,
                path: seg_path,
                timing: ec_chopper::ChunkTiming {
                    chunk_index,
                    chunk_start_27mhz: None,
                    chunk_duration_27mhz: args.chunk_ms * 27_000,
                    utc_start_unix: None,
                    sync_status: "cmaf".to_string(),
                },
            },
            "video/iso.segment",
            Some(bytes),
            network_secret.as_deref(),
            Some(&manifest_id),
        )?;
    }
    Ok(())
}

/// Wraps one chunk into an object via `build_object` and writes it to `relay`.
///
/// When `data_override` is `Some`, those bytes are hashed with blake3 and used as the
/// payload; otherwise the payload is read from `chunk.path`. The chunk index from the
/// object's timing metadata (0 if absent) is used as both group id and object id.
///
/// # Errors
/// Propagates failures from reading the chunk, building the object, or the relay write.
fn publish_chunk_file(
    relay: &FileRelay,
    track: &TrackName,
    chunk: TsChunk,
    content_type: &str,
    data_override: Option<Vec<u8>>,
    network_secret: Option<&[u8]>,
    manifest_id: Option<&str>,
) -> Result<()> {
    let (data, chunk_hash) = match data_override {
        Some(bytes) => {
            let hash = blake3::hash(&bytes).to_hex().to_string();
            (bytes, hash)
        }
        None => read_chunk_bytes_and_hash(&chunk.path)?,
    };
    let object = build_object(
        chunk,
        data,
        chunk_hash,
        None,
        network_secret,
        manifest_id,
        content_type,
        &track.name,
    )?;
    // Group and object ids are the same value; derive it once.
    let chunk_index = object
        .meta
        .timing
        .as_ref()
        .map(|t| t.chunk_index)
        .unwrap_or(0);
    relay.write_object(track, GroupId(chunk_index), ObjectId(chunk_index), &object)
}

// Type arguments in this signature were missing and are restored from the body:
// `reader` is moved into a spawned thread and fed to `std::io::copy`, and the second
// tuple element collects the segment paths.
fn chunk_stream_cmaf_ffmpeg(
    mut reader: Box<dyn Read + Send>,
    out_dir: &std::path::Path,
    chunk_ms: u64,
    max_segments: usize,
    deterministic: bool,
) -> Result<(std::path::PathBuf, Vec<std::path::PathBuf>)> {
    // Start from an empty directory; a missing directory is not an error here.
    let _ = fs::remove_dir_all(out_dir);
    fs::create_dir_all(out_dir)
        .with_context(|| format!("failed to create {}", out_dir.display()))?;
    let profile = if deterministic {
        Some(ec_chopper::deterministic_h264_profile())
    } else {
        // For now, keep encoder args stable even
// when determinism is off.
        Some(ec_chopper::deterministic_h264_profile())
    };
    let mut cmd = Command::new("ffmpeg");
    // Relative output names below resolve inside `out_dir`.
    cmd.current_dir(out_dir);
    cmd.arg("-hide_banner")
        .arg("-loglevel")
        .arg("error")
        .arg("-nostdin")
        .arg("-y")
        .arg("-i")
        .arg("pipe:0")
        // Keep stream mapping predictable.
        .arg("-map")
        .arg("0:v:0")
        .arg("-map")
        .arg("0:a:0?")
        .arg("-sn")
        .arg("-dn")
        .arg("-map_metadata")
        .arg("-1")
        // Reduce opportunities for non-deterministic scheduling in filters/decoders.
        .arg("-filter_threads")
        .arg("1")
        .arg("-filter_complex_threads")
        .arg("1")
        .arg("-threads")
        .arg("1");
    if let Some(profile) = profile {
        for arg in ec_chopper::ffmpeg_profile_args(&profile) {
            cmd.arg(arg);
        }
    }
    // Segment duration in seconds with millisecond precision.
    let seg_time = format!("{:.3}", chunk_ms as f64 / 1000.0);
    cmd.arg("-f")
        .arg("hls")
        .arg("-hls_time")
        .arg(seg_time)
        .arg("-hls_list_size")
        .arg("0")
        .arg("-hls_segment_type")
        .arg("fmp4")
        .arg("-hls_flags")
        .arg("independent_segments")
        .arg("-hls_fmp4_init_filename")
        .arg("init.mp4")
        .arg("-hls_segment_filename")
        .arg("segment_%06d.m4s")
        .arg("index.m3u8")
        .stdin(Stdio::piped())
        .stdout(Stdio::null())
        .stderr(Stdio::inherit());
    let mut child = cmd.spawn().with_context(|| "failed to spawn ffmpeg")?;
    let mut stdin = child
        .stdin
        .take()
        .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?;
    // Feed the source into ffmpeg on its own thread while this thread watches
    // for output files.
    let writer = std::thread::spawn(move || -> Result<()> {
        std::io::copy(&mut reader, &mut stdin)?;
        Ok(())
    });
    let init_path = out_dir.join("init.mp4");
    wait_for_stable_file(&init_path, Duration::from_secs(20))?;
    let mut segments = Vec::new();
    for i in 0..max_segments {
        let seg_path = out_dir.join(format!("segment_{i:06}.m4s"));
        match wait_for_stable_file(&seg_path, Duration::from_secs(30)) {
            Ok(()) => segments.push(seg_path),
            Err(err) => {
                // If ffmpeg ended cleanly, stop; otherwise bubble the error.
if let Ok(Some(status)) = child.try_wait() {
                    if status.success() {
                        break;
                    }
                    return Err(anyhow!("ffmpeg exited with {status} ({err:#})"));
                }
                return Err(err);
            }
        }
    }
    // Enough segments collected (or ffmpeg finished): stop the child and reap it.
    // Failures here are deliberately ignored (best-effort cleanup).
    let _ = child.kill();
    let _ = child.wait();
    let _ = writer.join();
    Ok((init_path, segments))
}

/// Resolves the optional network secret to raw bytes.
///
/// Uses `value` when given, otherwise the `EVERY_CHANNEL_NETWORK_SECRET` environment
/// variable. Returns `Ok(None)` when neither is set.
///
/// # Errors
/// Fails with "network secret must be hex" when the chosen value is not valid hex.
fn parse_network_secret(value: Option<String>) -> Result<Option<Vec<u8>>> {
    let Some(hex_text) = value.or_else(|| std::env::var("EVERY_CHANNEL_NETWORK_SECRET").ok())
    else {
        return Ok(None);
    };
    let bytes = hex::decode(hex_text).context("network secret must be hex")?;
    Ok(Some(bytes))
}

/// Reports whether deterministic mode is on.
///
/// True when `flag` is set, or when `EVERY_CHANNEL_DETERMINISTIC` (trimmed,
/// case-insensitive) is one of `1`, `true`, `yes`, `on`.
fn deterministic_enabled(flag: bool) -> bool {
    if flag {
        return true;
    }
    std::env::var("EVERY_CHANNEL_DETERMINISTIC")
        .ok()
        .map(|raw| {
            let normalized = raw.trim().to_ascii_lowercase();
            matches!(normalized.as_str(), "1" | "true" | "yes" | "on")
        })
        .unwrap_or(false)
}

/// Reads the whole file at `path` and returns its bytes with their blake3 hash as a
/// hex string.
///
/// # Errors
/// Fails with "failed to read <path>" context when the file cannot be read.
fn read_chunk_bytes_and_hash(path: &std::path::Path) -> Result<(Vec<u8>, String)> {
    let data = fs::read(path).with_context(|| format!("failed to read {}", path.display()))?;
    let hash = blake3::hash(&data).to_hex().to_string();
    Ok((data, hash))
}

// NOTE(review): type arguments in this signature were missing. `Vec<StreamMetadata>`,
// `Vec<String>` and `Result<Manifest>` follow from the visible call sites and the
// returned struct; `impl Into<String>` is presumed from callers passing `String` — confirm.
fn build_manifest(
    stream_id: StreamId,
    epoch_id: impl Into<String>,
    chunk_duration_ms: u64,
    chunk_start_index: u64,
    encoder_profile_id: impl Into<String>,
    created_unix_ms: u64,
    metadata: Vec<StreamMetadata>,
    chunk_hashes: Vec<String>,
) -> Result<Manifest> {
    let body = build_manifest_body_for_chunks(
        stream_id,
        epoch_id,
        chunk_duration_ms,
        chunk_start_index,
        encoder_profile_id,
        created_unix_ms,
        metadata,
        &chunk_hashes,
    )?;
    let manifest_id = body.manifest_id()?;
    let mut signatures = Vec::new();
    if let Some(keypair) = load_manifest_keypair_from_env().map_err(|err| anyhow!(err))?
{
        signatures.push(sign_manifest_id(&manifest_id, &keypair));
    }
    Ok(Manifest {
        body,
        manifest_id,
        signatures,
    })
}

/// One rung of a CMAF quality ladder: an id plus target resolution and video bitrate.
#[derive(Debug, Clone)]
struct CmafVariantSpec {
    id: String,
    width: u32,
    height: u32,
    video_bitrate_kbps: u32,
}

/// Expands a ladder preset into its variant list, highest resolution first.
fn cmaf_ladder_variants(preset: CmafLadderPreset) -> Vec<CmafVariantSpec> {
    match preset {
        CmafLadderPreset::Hd3 => vec![
            CmafVariantSpec {
                id: "1080p".to_string(),
                width: 1920,
                height: 1080,
                video_bitrate_kbps: 6000,
            },
            CmafVariantSpec {
                id: "720p".to_string(),
                width: 1280,
                height: 720,
                video_bitrate_kbps: 3000,
            },
            CmafVariantSpec {
                id: "480p".to_string(),
                width: 854,
                height: 480,
                video_bitrate_kbps: 1200,
            },
        ],
    }
}

/// Maps `value` onto the character set `[a-z0-9-_/]`: ASCII uppercase is lowercased,
/// allowed characters pass through, and every other character becomes `_`.
fn sanitize_component(value: &str) -> String {
    value
        .chars()
        .map(|c| match c {
            'a'..='z' | '0'..='9' | '-' | '_' | '/' => c,
            'A'..='Z' => c.to_ascii_lowercase(),
            _ => '_',
        })
        .collect()
}

/// Builds `<base>/variant-<sanitized id>`, dropping any trailing `/` from the base first.
fn derive_variant_stream_id(base_stream_id: &str, variant_id: &str) -> String {
    // Match StreamKey encoding style (`variant-...`) without requiring we reconstruct StreamKey.
    let v = sanitize_component(variant_id);
    format!("{}/variant-{}", base_stream_id.trim_end_matches('/'), v)
}

// NOTE(review): type arguments in this signature were missing. `Vec<(String, String)>`
// and `Result<Manifest>` follow from the body; `Vec<StreamMetadata>` is presumed from
// the matching parameter of `build_manifest` — confirm.
fn build_multi_variant_manifest(
    base_stream_id: StreamId,
    epoch_id: String,
    chunk_duration_ms: u64,
    chunk_start_index: u64,
    encoder_profile_id: String,
    created_unix_ms: u64,
    metadata: Vec<StreamMetadata>,
    variants: &[CmafVariantSpec],
    variant_chunk_start_index: u64,
    per_variant_hash: Vec<(String, String)>,
) -> Result<Manifest> {
    let mut entries = Vec::new();
    for variant in variants {
        // Every variant must have exactly one hash supplied by the caller.
        let Some((_, hash)) = per_variant_hash.iter().find(|(id, _)| id == &variant.id) else {
            return Err(anyhow!("missing hash for variant {}", variant.id));
        };
        let chunk_hashes = vec![hash.clone()];
        let merkle_root =
            ec_core::merkle_root_from_hashes(&chunk_hashes).map_err(|err| anyhow!("{err}"))?;
        let stream_id = StreamId(derive_variant_stream_id(&base_stream_id.0, &variant.id));
        entries.push(ManifestVariant {
            variant_id: variant.id.clone(),
            stream_id,
            chunk_start_index: variant_chunk_start_index,
            total_chunks: 1,
            merkle_root,
chunk_hashes,
            metadata: vec![
                StreamMetadata {
                    key: "width".to_string(),
                    value: variant.width.to_string(),
                },
                StreamMetadata {
                    key: "height".to_string(),
                    value: variant.height.to_string(),
                },
                StreamMetadata {
                    key: "video_bitrate_kbps".to_string(),
                    value: variant.video_bitrate_kbps.to_string(),
                },
            ],
        });
    }
    // The body root commits to the per-variant roots in variant-id order.
    entries.sort_by(|a, b| a.variant_id.cmp(&b.variant_id));
    let roots = entries
        .iter()
        .map(|v| v.merkle_root.clone())
        .collect::<Vec<_>>();
    let body_root = ec_core::merkle_root_from_hashes(&roots).map_err(|err| anyhow!("{err}"))?;
    let body = ec_core::ManifestBody {
        stream_id: base_stream_id,
        epoch_id,
        chunk_duration_ms,
        total_chunks: 1,
        chunk_start_index,
        encoder_profile_id,
        merkle_root: body_root,
        created_unix_ms,
        metadata,
        chunk_hashes: Vec::new(),
        variants: Some(entries),
    };
    let manifest_id = body.manifest_id()?;
    let mut signatures = Vec::new();
    if let Some(keypair) = load_manifest_keypair_from_env().map_err(|err| anyhow!(err))? {
        signatures.push(sign_manifest_id(&manifest_id, &keypair));
    }
    Ok(Manifest {
        body,
        manifest_id,
        signatures,
    })
}

/// Accumulates the chunks of one manifest epoch until `capacity` is reached.
///
/// `chunks`, `data` and `hashes` are parallel vectors: entry `i` of each describes the
/// same chunk. Field type arguments were missing from the text under review and are
/// restored from how `push`, `take` and `start_index` use them.
struct EpochBuffer {
    capacity: usize,
    chunks: Vec<TsChunk>,
    data: Vec<Option<Vec<u8>>>,
    hashes: Vec<String>,
    start_index: Option<u64>,
}

impl EpochBuffer {
    /// Creates an empty buffer; a `capacity` of 0 is raised to 1.
    fn new(capacity: usize) -> Self {
        Self {
            capacity: capacity.max(1),
            chunks: Vec::new(),
            data: Vec::new(),
            hashes: Vec::new(),
            start_index: None,
        }
    }

    /// Appends one chunk; the first chunk pushed after a `take` fixes the start index.
    fn push(&mut self, chunk: TsChunk, data: Option<Vec<u8>>, hash: String) {
        if self.start_index.is_none() {
            self.start_index = Some(chunk.timing.chunk_index);
        }
        self.chunks.push(chunk);
        self.data.push(data);
        self.hashes.push(hash);
    }

    /// True once at least `capacity` chunks are buffered.
    fn is_full(&self) -> bool {
        self.chunks.len() >= self.capacity
    }

    /// True when no chunks are buffered.
    fn is_empty(&self) -> bool {
        self.chunks.is_empty()
    }

    /// Chunk index of the first buffered chunk, or 0 when nothing has been pushed.
    fn start_index(&self) -> u64 {
        self.start_index.unwrap_or(0)
    }

    /// Drains the buffer, returning `(chunks, data, hashes)` and resetting the start index.
    fn take(&mut self) -> (Vec<TsChunk>, Vec<Option<Vec<u8>>>, Vec<String>) {
        self.start_index = None;
        let chunks = std::mem::take(&mut self.chunks);
        let data = std::mem::take(&mut self.data);
        let hashes = std::mem::take(&mut self.hashes);
        (chunks, data, hashes)
    }
}

fn parse_manifest_allowlist(value:
Option<&str>) -> Option<HashSet<String>> {
    // Splits on commas, semicolons and whitespace; yields `None` when no tokens remain.
    let value = value?.trim();
    if value.is_empty() {
        return None;
    }
    let set = value
        .split(|c: char| c == ',' || c == ';' || c.is_whitespace())
        .filter_map(|token| {
            let trimmed = token.trim();
            if trimmed.is_empty() {
                None
            } else {
                Some(trimmed.to_string())
            }
        })
        .collect::<HashSet<_>>();
    if set.is_empty() {
        None
    } else {
        Some(set)
    }
}

/// Decides whether `manifest` is acceptable.
///
/// Checks, in order:
/// 1. the stored `manifest_id` equals the id recomputed from the body;
/// 2. Merkle roots are consistent — per variant and across variants for a non-empty
///    `variants` list, otherwise over `chunk_hashes` when that list is non-empty;
/// 3. signatures — with an `allowlist`, at least one signature must verify *and* come
///    from an allowlisted signer; without one, an unsigned manifest passes only if it
///    carries hashes or variants, and a signed one needs at least one valid signature.
///
/// The `HashSet<String>` argument type was missing from the text under review and is
/// restored from the `contains(&sig.signer_id)` call below.
fn validate_manifest(manifest: &Manifest, allowlist: Option<&HashSet<String>>) -> bool {
    let body_id = match manifest.body.manifest_id() {
        Ok(id) => id,
        Err(_) => return false,
    };
    if body_id != manifest.manifest_id {
        return false;
    }
    if let Some(variants) = manifest.body.variants.as_ref().filter(|v| !v.is_empty()) {
        // Multi-variant: validate each variant and ensure body merkle_root commits to per-variant roots.
        let mut roots = Vec::with_capacity(variants.len());
        for variant in variants {
            if variant.chunk_hashes.len() != variant.total_chunks as usize {
                return false;
            }
            match ec_core::merkle_root_from_hashes(&variant.chunk_hashes) {
                Ok(root) if root == variant.merkle_root => {}
                _ => return false,
            }
            roots.push((variant.variant_id.clone(), variant.merkle_root.clone()));
        }
        // Same ordering the publisher uses: roots sorted by variant id.
        roots.sort_by(|a, b| a.0.cmp(&b.0));
        let ordered = roots.into_iter().map(|(_, root)| root).collect::<Vec<_>>();
        match ec_core::merkle_root_from_hashes(&ordered) {
            Ok(root) if root == manifest.body.merkle_root => {}
            _ => return false,
        }
    } else if !manifest.body.chunk_hashes.is_empty() {
        if manifest.body.chunk_hashes.len() != manifest.body.total_chunks as usize {
            return false;
        }
        match ec_core::merkle_root_from_hashes(&manifest.body.chunk_hashes) {
            Ok(root) if root == manifest.body.merkle_root => {}
            _ => return false,
        }
    }
    if let Some(allowlist) = allowlist {
        return manifest.signatures.iter().any(|sig| {
            verify_manifest_signature(&manifest.manifest_id, sig)
                && allowlist.contains(&sig.signer_id)
        });
    }
    if manifest.signatures.is_empty() {
        // Unsigned manifests are only acceptable when they include full hashes for verification.
return !manifest.body.chunk_hashes.is_empty()
            || manifest
                .body
                .variants
                .as_ref()
                .is_some_and(|v| !v.is_empty());
    }
    manifest
        .signatures
        .iter()
        .any(|sig| verify_manifest_signature(&manifest.manifest_id, sig))
}

/// Returns `key_id` without a trailing `/init`, or unchanged when it has none.
fn strip_init_suffix(key_id: &str) -> &str {
    key_id.strip_suffix("/init").unwrap_or(key_id)
}

/// Looks up the expected hash for `chunk_index` of `stream_id` in `manifest`.
///
/// A non-empty `variants` list is searched by variant stream id; otherwise the legacy
/// body-level `chunk_hashes` are used. An empty `variants` list counts as legacy, the
/// same way `validate_manifest` classifies it. Returns `None` when the stream does not
/// match or the index lies outside the listed hashes.
fn manifest_hash_for_chunk(
    manifest: &Manifest,
    stream_id: &str,
    chunk_index: u64,
) -> Option<String> {
    if let Some(variants) = manifest.body.variants.as_ref().filter(|v| !v.is_empty()) {
        let variant = variants.iter().find(|v| v.stream_id.0 == stream_id)?;
        if chunk_index < variant.chunk_start_index {
            return None;
        }
        let offset = (chunk_index - variant.chunk_start_index) as usize;
        return variant.chunk_hashes.get(offset).cloned();
    }
    // Legacy single-variant manifests.
    if manifest.body.stream_id.0 != stream_id {
        return None;
    }
    if chunk_index < manifest.body.chunk_start_index {
        return None;
    }
    let offset = (chunk_index - manifest.body.chunk_start_index) as usize;
    manifest.body.chunk_hashes.get(offset).cloned()
}

/// Reports whether `chunk_index` falls in `[chunk_start_index, chunk_start_index +
/// total_chunks)` for `stream_id`, using the matching variant when `variants` is
/// non-empty and the body-level range otherwise (an empty list counts as legacy, as in
/// `validate_manifest`). The range end saturates rather than overflowing.
fn manifest_covers_stream_index(manifest: &Manifest, stream_id: &str, chunk_index: u64) -> bool {
    if let Some(variants) = manifest.body.variants.as_ref().filter(|v| !v.is_empty()) {
        let Some(variant) = variants.iter().find(|v| v.stream_id.0 == stream_id) else {
            return false;
        };
        if chunk_index < variant.chunk_start_index {
            return false;
        }
        let end = variant
            .chunk_start_index
            .saturating_add(variant.total_chunks as u64);
        return chunk_index < end;
    }
    if manifest.body.stream_id.0 != stream_id {
        return false;
    }
    if chunk_index < manifest.body.chunk_start_index {
        return false;
    }
    let end = manifest
        .body
        .chunk_start_index
        .saturating_add(manifest.body.total_chunks as u64);
    chunk_index < end
}

// NOTE(review): type arguments in this signature were missing. `Option<Manifest>`
// follows from `best.cloned()`; the map's key type is not used by the body, so
// `String` is a presumption — confirm against callers.
fn find_manifest_for_stream_index(
    store: &HashMap<String, Manifest>,
    stream_id: &str,
    chunk_index: u64,
) -> Option<Manifest> {
    // Prefer the latest manifest whose range covers the index for this stream.
let mut best: Option<&Manifest> = None;
    // (Body of `find_manifest_for_stream_index`, whose header sits on the preceding line;
    // carried over unchanged.) Among covering manifests, a later or equal body
    // `chunk_start_index` replaces the current pick.
    for manifest in store.values() {
        if !manifest_covers_stream_index(manifest, stream_id, chunk_index) {
            continue;
        }
        match best {
            None => best = Some(manifest),
            Some(current) => {
                if manifest.body.chunk_start_index >= current.body.chunk_start_index {
                    best = Some(manifest);
                }
            }
        }
    }
    best.cloned()
}

/// Wraps one chunk's bytes into an `ObjectPayload` ready for publication.
///
/// The payload data is the output of `encrypt_stream_data` for (`key_id`, chunk index, `data`,
/// `network_secret`); the metadata records the creation time, `content_type`, ciphertext size,
/// timing copied from `chunk`, the encryption algorithm/key id/nonce, the supplied `chunk_hash`
/// and `chunk_proof`, and the optional `manifest_id`.
///
/// NOTE(review): generic argument lists appear stripped in this view of the file (`Vec`,
/// `Option>`, `Result`); they are left exactly as found.
fn build_object(
    chunk: TsChunk,
    data: Vec,
    chunk_hash: String,
    chunk_proof: Option>,
    network_secret: Option<&[u8]>,
    manifest_id: Option<&str>,
    content_type: &str,
    key_id: &str,
) -> Result {
    let created_unix_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let timing = TimingMeta {
        chunk_index: chunk.timing.chunk_index,
        // A missing start timestamp is published as 0.
        chunk_start_27mhz: chunk.timing.chunk_start_27mhz.unwrap_or(0),
        chunk_duration_27mhz: chunk.timing.chunk_duration_27mhz,
        utc_start_unix: chunk.timing.utc_start_unix,
        sync_status: chunk.timing.sync_status.clone(),
    };
    let encrypted = encrypt_stream_data(key_id, chunk.timing.chunk_index, &data, network_secret);
    let meta = ObjectMeta {
        created_unix_ms,
        content_type: content_type.to_string(),
        size_bytes: encrypted.ciphertext.len() as u64,
        timing: Some(timing),
        encryption: Some(ec_moq::EncryptionMeta {
            alg: encrypted.alg.to_string(),
            key_id: key_id.to_string(),
            nonce_hex: hex::encode(encrypted.nonce),
        }),
        chunk_hash: Some(chunk_hash),
        chunk_hash_alg: Some("blake3".to_string()),
        chunk_proof,
        // NOTE(review): the proof algorithm is advertised even when `chunk_proof` is `None`.
        chunk_proof_alg: Some("merkle+blake3".to_string()),
        manifest_id: manifest_id.map(|value| value.to_string()),
    };
    Ok(ObjectPayload {
        meta,
        data: encrypted.ciphertext,
    })
}

/// Drains `epoch_buffer` and publishes its contents.
///
/// Does nothing when the buffer is empty. Otherwise:
/// * if `publish_manifests` is set, builds a manifest over the buffered hashes, publishes it on
///   `manifest_track_name` at `*manifest_sequence` (then increments it) and, when `announce_tx`
///   is present, sends the manifest summary on it (send failures are ignored);
/// * if `publish_chunks` is set, publishes every buffered chunk on `object_track_name` with a
///   Merkle proof and the manifest id (if one was built), one group per chunk, incrementing
///   `*object_sequence` each time.
///
/// # Errors
/// Propagates failures from manifest building, proof generation, object building and publishing,
/// and fails with "missing chunk data for publish" when a buffered chunk has no data while
/// `publish_chunks` is set.
fn flush_epoch_publish(
    publish_set: &mut MoqPublishSet,
    object_track_name: &str,
    manifest_track_name: &str,
    publish_chunks: bool,
    publish_manifests: bool,
    epoch_buffer: &mut EpochBuffer,
    stream_id_value: &StreamId,
    chunk_ms: u64,
    encoder_profile_id: &str,
    source_kind: &str,
    network_secret: Option<&[u8]>,
    segment_content_type: &str,
    key_id: &str,
    object_sequence: &mut u64,
    manifest_sequence: &mut u64,
    announce_tx:
    Option<&tokio::sync::mpsc::UnboundedSender>,
) -> Result<()> {
    if epoch_buffer.is_empty() {
        return Ok(());
    }
    let (chunks, datas, hashes) = epoch_buffer.take();
    let start_index = chunks
        .first()
        .map(|chunk| chunk.timing.chunk_index)
        .unwrap_or_else(|| epoch_buffer.start_index());
    let created_unix_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let mut manifest_id = None;
    if publish_manifests {
        let manifest = build_manifest(
            stream_id_value.clone(),
            format!("epoch-{start_index}"),
            chunk_ms,
            start_index,
            encoder_profile_id.to_string(),
            created_unix_ms,
            vec![StreamMetadata {
                key: "source_kind".to_string(),
                value: source_kind.to_string(),
            }],
            hashes.clone(),
        )?;
        manifest_id = Some(manifest.manifest_id.clone());
        publish_set.publish_manifest(manifest_track_name, *manifest_sequence, &manifest)?;
        *manifest_sequence += 1;
        if let Some(tx) = announce_tx {
            let _ = tx.send(manifest.summary());
        }
    }
    if publish_chunks {
        // Compute per-chunk Merkle proofs so subscribers can validate membership
        // even if future manifests omit the full chunk hash list.
        // Proofs are only attached to published objects, so they are built only on this path;
        // a manifest-only flush no longer pays for (or fails on) proof generation.
        let mut proofs = Vec::with_capacity(hashes.len());
        for offset in 0..hashes.len() {
            proofs.push(merkle_proof_for_index(&hashes, offset)?);
        }
        for (((chunk, data), hash), proof) in chunks.into_iter().zip(datas).zip(hashes).zip(proofs)
        {
            let Some(data) = data else {
                return Err(anyhow!("missing chunk data for publish"));
            };
            let object = build_object(
                chunk,
                data,
                hash,
                Some(proof),
                network_secret,
                manifest_id.as_deref(),
                segment_content_type,
                key_id,
            )?;
            tracing::info!(
                "publish segment chunk_index={} bytes={} content_type={}",
                object
                    .meta
                    .timing
                    .as_ref()
                    .map(|t| t.chunk_index)
                    .unwrap_or(0),
                object.data.len(),
                object.meta.content_type
            );
            publish_set.publish_object(object_track_name, GroupId(*object_sequence), object)?;
            *object_sequence += 1;
        }
    }
    Ok(())
}

// The test module continues past this span; this opening part is carried over unchanged.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_manifest_allowlist_splits_and_trims() {
        let set = parse_manifest_allowlist(Some(" a,b ; c\t d ")).unwrap();
        assert!(set.contains("a"));
        assert!(set.contains("b"));
        assert!(set.contains("c"));
        assert!(set.contains("d"));
    }

    #[test]
    fn deterministic_enabled_reads_env() {
        let prev = std::env::var("EVERY_CHANNEL_DETERMINISTIC").ok();
        std::env::set_var("EVERY_CHANNEL_DETERMINISTIC", "true");
        assert!(deterministic_enabled(false));
        std::env::set_var("EVERY_CHANNEL_DETERMINISTIC", "0");
        assert!(!deterministic_enabled(false));
        match prev {
            Some(value) => std::env::set_var("EVERY_CHANNEL_DETERMINISTIC", value),
            None => std::env::remove_var("EVERY_CHANNEL_DETERMINISTIC"),
        }
    }

    #[test]
    fn parse_network_secret_accepts_hex_and_rejects_invalid() {
        let out = parse_network_secret(Some("00".repeat(8))).unwrap().unwrap();
        assert_eq!(out.len(), 8);
        assert!(parse_network_secret(Some("not-hex".to_string())).is_err());
    }

    fn build_valid_manifest(unsigned: bool) -> Manifest {
        let chunk_hashes = vec![
            blake3::hash(b"c0").to_hex().to_string(),
            blake3::hash(b"c1").to_hex().to_string(),
        ];
        let body = build_manifest_body_for_chunks(
StreamId("s".to_string()),
            "epoch-1",
            2000,
            10,
            "p",
            1,
            Vec::new(),
            &chunk_hashes,
        )
        .unwrap();
        let manifest_id = body.manifest_id().unwrap();
        let signatures = if unsigned {
            Vec::new()
        } else {
            // Install a fixed signing key in the environment just long enough to sign, then
            // put back whatever value (or absence) was there before.
            let prev = std::env::var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY").ok();
            std::env::set_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", "11".repeat(32));
            let keypair = load_manifest_keypair_from_env().unwrap().unwrap();
            let sig = sign_manifest_id(&manifest_id, &keypair);
            match prev {
                Some(value) => std::env::set_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", value),
                None => std::env::remove_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY"),
            }
            vec![sig]
        };
        Manifest {
            body,
            manifest_id,
            signatures,
        }
    }

    // An unsigned manifest passes only while it still carries its chunk hashes.
    #[test]
    fn validate_manifest_accepts_unsigned_only_with_hashes() {
        let mut manifest = build_valid_manifest(true);
        assert!(validate_manifest(&manifest, None));
        // Remove hashes: unsigned should be rejected.
        manifest.body.chunk_hashes.clear();
        manifest.body.total_chunks = 0;
        manifest.body.merkle_root = "00".repeat(32);
        // Recompute the id so the rejection is due to the missing hashes, not an id mismatch.
        manifest.manifest_id = manifest.body.manifest_id().unwrap();
        assert!(!validate_manifest(&manifest, None));
    }

    // A signed manifest passes without an allowlist, passes when its signer is listed, and is
    // rejected when the allowlist names only someone else.
    #[test]
    fn validate_manifest_accepts_signed_and_obeys_allowlist() {
        let manifest = build_valid_manifest(false);
        assert!(validate_manifest(&manifest, None));
        let signer = manifest.signatures[0].signer_id.clone();
        let allow = HashSet::from([signer.clone()]);
        assert!(validate_manifest(&manifest, Some(&allow)));
        let deny = HashSet::from(["other".to_string()]);
        assert!(!validate_manifest(&manifest, Some(&deny)));
    }

    // The fixture starts at chunk index 10 with two hashes, so 9 and 12 are out of range.
    #[test]
    fn manifest_hash_for_chunk_indexes_into_hash_list() {
        let manifest = build_valid_manifest(true);
        let sid = manifest.body.stream_id.0.as_str();
        assert!(manifest_hash_for_chunk(&manifest, sid, 9).is_none());
        assert_eq!(
            manifest_hash_for_chunk(&manifest, sid, 10).as_deref(),
            Some(manifest.body.chunk_hashes[0].as_str())
        );
        assert_eq!(
            manifest_hash_for_chunk(&manifest, sid, 11).as_deref(),
            Some(manifest.body.chunk_hashes[1].as_str())
        );
        assert!(manifest_hash_for_chunk(&manifest, sid, 12).is_none());
    }
}

/// Ingests the configured source, segments it with ffmpeg and publishes the result over MoQ.
///
/// This opening part creates the chunk directory, builds the source for the selected
/// `IngestSource` variant, derives the stream id / broadcast name / track name and binds the
/// MoQ node. The function continues past this span.
///
/// NOTE(review): generic argument lists appear stripped in this view of the file (e.g. the
/// `(Box, bool)` annotation below); they are left exactly as found.
async fn moq_publish(args: MoqPublishArgs) -> Result<()> {
    fs::create_dir_all(&args.chunk_dir)
        .with_context(|| format!("failed to create {}", args.chunk_dir.display()))?;
    let deterministic = deterministic_enabled(args.deterministic);
    // The second tuple element is bound to `_needs_transcode` and not read in the visible code.
    let (source, _needs_transcode): (Box, bool) = match args.source {
        IngestSource::Hls { url, mut mode } => {
            // Deterministic mode forces the HLS source into transcode mode.
            if deterministic {
                mode = HlsMode::Transcode;
            }
            (Box::new(HlsSource { url, mode }), false)
        }
        IngestSource::Hdhr {
            host,
            device_id,
            channel,
            name,
            prefer_mdns,
        } => (
            Box::new(HdhrSource {
                host,
                device_id,
                channel,
                name,
                prefer_mdns,
            }),
            deterministic,
        ),
        IngestSource::LinuxDvb {
            adapter,
            dvr,
            tune_cmd,
            tune_wait_ms,
        } => (
            Box::new(LinuxDvbSource {
                adapter,
                dvr,
                tune_cmd,
                tune_wait_ms,
            }),
            deterministic,
        ),
        IngestSource::Ts { input } => (Box::new(TsSource { input }), deterministic),
    };
    let source_id = source.source_id();
    let source_id_for_stream = source_id.clone();
    // Without an explicit --stream-id, derive one from the source id and the chunk duration.
    let stream_id = args.stream_id.unwrap_or_else(|| {
        StreamKey {
            version: 1,
            broadcast: None,
            source: Some(source_id_for_stream),
            profile: Some(format!("chunk-{}ms", args.chunk_ms)),
            variant: None,
        }
        .to_stream_id()
        .0
    });
    // The broadcast name defaults to the stream id.
    let broadcast_name = args.broadcast_name.unwrap_or_else(|| stream_id.clone());
    let track_name = args.track_name.clone();
    let secret = parse_iroh_secret(args.iroh_secret)?;
    let discovery = parse_discovery(args.discovery.as_deref())?;
    let node = MoqNode::bind_with_discovery(secret, discovery).await?;
    // Wait briefly for direct addresses so "remote" can connect without needing discovery.
let mut endpoint_addr = node.endpoint_addr();
    // (Interior of `moq_publish`: endpoint/track setup followed by the complete `--cmaf-ladder`
    // path. NOTE(review): generic argument lists appear stripped in this view of the file, e.g.
    // `mpsc::channel::(16)`; they are left exactly as found.)
    // Poll for up to 3 s, in 200 ms steps, until the endpoint reports at least one address.
    if endpoint_addr.addrs.is_empty() {
        let mut watcher = node.endpoint().watch_addr();
        let start = Instant::now();
        while endpoint_addr.addrs.is_empty() && start.elapsed() < Duration::from_secs(3) {
            tokio::time::sleep(Duration::from_millis(200)).await;
            endpoint_addr = watcher.get();
        }
    }
    println!("moq endpoint id: {}", node.endpoint().id());
    if let Ok(addr_json) = serde_json::to_string(&endpoint_addr) {
        println!("moq endpoint addr: {}", addr_json);
    }
    println!("moq broadcast: {}", broadcast_name);
    println!("moq track: {}", track_name);
    let publish_chunks = args.publish_chunks;
    let ladder = args.cmaf_ladder;
    let ladder_variants = ladder.map(cmaf_ladder_variants);
    // Ladder mode publishes one segment track and one init track per variant
    // (`<track>/<variant id>`); otherwise there is a single segment track plus, when chunks are
    // published, a single init track.
    let mut object_tracks = Vec::new();
    if let Some(variants) = ladder_variants.as_ref() {
        if !publish_chunks {
            return Err(anyhow!("--cmaf-ladder requires --publish-chunks true"));
        }
        for variant in variants {
            object_tracks.push(format!("{}/{}", track_name, variant.id));
            object_tracks.push(format!("{}/{}", args.init_track, variant.id));
        }
    } else {
        object_tracks.push(track_name.clone());
        if publish_chunks {
            object_tracks.push(args.init_track.clone());
        }
    }
    let mut manifest_tracks = Vec::new();
    if args.publish_manifests {
        manifest_tracks.push(args.manifest_track.clone());
    }
    let mut publish_set = node
        .publish_track_set(&broadcast_name, object_tracks, manifest_tracks)
        .await?;
    if let Some(ms) = args.startup_delay_ms {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }
    let network_secret = parse_network_secret(args.network_secret)?;
    let track = TrackName {
        namespace: "every.channel".to_string(),
        name: stream_id.clone(),
    };
    let stream_id_value = StreamId(track.name.clone());
    let source_kind = source_id.kind.clone();
    let encoder_profile_id = "deterministic-h264-aac".to_string();
    if let Some(variants) = ladder_variants {
        if args.epoch_chunks != 1 {
            return Err(anyhow!("--cmaf-ladder currently requires --epoch-chunks 1"));
        }
        if !args.publish_manifests {
            return Err(anyhow!(
                "--cmaf-ladder currently requires --publish-manifests"
            ));
        }

        /// Work items handed from the blocking encode/segment thread to the async publisher.
        #[derive(Debug)]
        enum PendingPublish {
            Object {
                track: String,
                group: u64,
                object: ObjectPayload,
            },
            Manifest {
                track: String,
                sequence: u64,
                manifest: Manifest,
            },
        }

        let (tx, mut rx) = mpsc::channel::(16);
        let chunk_ms = args.chunk_ms;
        let max_chunks = args.max_chunks.unwrap_or(usize::MAX);
        let out_dir = args.chunk_dir.join("cmaf-ladder");
        let init_track_prefix = args.init_track.clone();
        let chunk_track_prefix = track_name.clone();
        let manifest_track = args.manifest_track.clone();
        let source_kind = source_id.kind.clone();
        let base_stream_id = stream_id.clone();
        let network_secret_bytes = network_secret.clone();
        let startup_delay_ms = args.startup_delay_ms;
        let chunk_task = tokio::task::spawn_blocking(move || -> Result<()> {
            // Start from an empty output tree with one sub-directory per variant.
            let _ = fs::remove_dir_all(&out_dir);
            fs::create_dir_all(&out_dir)
                .with_context(|| format!("failed to create {}", out_dir.display()))?;
            for variant in &variants {
                fs::create_dir_all(out_dir.join(&variant.id))?;
            }
            let mut cmd = Command::new("ffmpeg");
            cmd.current_dir(&out_dir);
            cmd.arg("-hide_banner")
                .arg("-loglevel")
                .arg("error")
                .arg("-nostdin")
                .arg("-y")
                .arg("-i")
                .arg("pipe:0")
                // Reduce opportunities for non-deterministic scheduling in filters/decoders.
                .arg("-filter_threads")
                .arg("1")
                .arg("-filter_complex_threads")
                .arg("1")
                .arg("-threads")
                .arg("1")
                .arg("-map")
                .arg("0:v:0")
                .arg("-map")
                .arg("0:a:0?")
                .arg("-sn")
                .arg("-dn")
                .arg("-map_metadata")
                .arg("-1");
            // Build a split+scale filter graph for all variants.
            // NOTE: We keep it simple: split video N ways, then scale each output.
            let mut filter = String::new();
            filter.push_str(&format!("[0:v]split={}", variants.len()));
            for (i, _) in variants.iter().enumerate() {
                filter.push_str(&format!("[v{i}]"));
            }
            filter.push(';');
            for (i, variant) in variants.iter().enumerate() {
                // Scale flags influence quality but should be deterministic.
                filter.push_str(&format!(
                    "[v{i}]scale=w={}:h={}:flags=bicubic[v{i}o];",
                    variant.width, variant.height
                ));
            }
            cmd.arg("-filter_complex").arg(filter);
            let seg_time = format!("{:.3}", chunk_ms as f64 / 1000.0);
            // One HLS/fMP4 output per variant, each written into its own sub-directory.
            for (i, variant) in variants.iter().enumerate() {
                let v_bitrate = format!("{}k", variant.video_bitrate_kbps);
                let bufsize = format!("{}k", variant.video_bitrate_kbps.saturating_mul(2));
                let out_variant_dir = out_dir.join(&variant.id);
                let seg_template = out_variant_dir.join("segment_%06d.m4s");
                let seg_template = seg_template
                    .to_str()
                    .ok_or_else(|| anyhow!("invalid segment template path"))?
                    .to_string();
                cmd.arg("-map")
                    .arg(format!("[v{i}o]"))
                    .arg("-map")
                    .arg("0:a:0?")
                    .arg("-c:v")
                    .arg("libx264")
                    // Force keyframes aligned to segment boundaries (chunk_ms).
                    .arg("-force_key_frames")
                    .arg(format!(
                        "expr:gte(t,n_forced*{:.3})",
                        chunk_ms as f64 / 1000.0
                    ))
                    .arg("-b:v")
                    .arg(&v_bitrate)
                    .arg("-minrate")
                    .arg(&v_bitrate)
                    .arg("-maxrate")
                    .arg(&v_bitrate)
                    .arg("-bufsize")
                    .arg(&bufsize)
                    .arg("-c:a")
                    .arg("aac")
                    .arg("-b:a")
                    .arg("128k")
                    .arg("-ac")
                    .arg("2")
                    .arg("-ar")
                    .arg("48000")
                    .arg("-pix_fmt")
                    .arg("yuv420p")
                    .arg("-g")
                    .arg("60")
                    .arg("-keyint_min")
                    .arg("60")
                    .arg("-sc_threshold")
                    .arg("0")
                    .arg("-bf")
                    .arg("0")
                    .arg("-threads")
                    .arg("1")
                    .arg("-fflags")
                    .arg("+bitexact")
                    .arg("-flags:v")
                    .arg("+bitexact")
                    .arg("-flags:a")
                    .arg("+bitexact")
                    .arg("-f")
                    .arg("hls")
                    .arg("-hls_time")
                    .arg(&seg_time)
                    .arg("-hls_list_size")
                    .arg("0")
                    .arg("-hls_segment_type")
                    .arg("fmp4")
                    .arg("-hls_flags")
                    .arg("independent_segments")
                    .arg("-hls_fmp4_init_filename")
                    .arg("init.mp4")
                    .arg("-hls_segment_filename")
                    .arg(seg_template)
                    .arg(out_variant_dir.join("index.m3u8"));
            }
            cmd.stdin(Stdio::piped())
                .stdout(Stdio::null())
                .stderr(Stdio::inherit());
            let mut child = cmd.spawn().with_context(|| "failed to spawn ffmpeg")?;
            let mut stdin = child
                .stdin
                .take()
                .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?;
            let mut reader = source.open_stream()?;
            // Feed the source into ffmpeg's stdin on a dedicated thread.
            let writer =
                std::thread::spawn(move || -> Result<()> {
                    std::io::copy(&mut reader, &mut stdin)?;
                    Ok(())
                });
            // Optional startup delay to allow subscribers to connect.
            // NOTE(review): the same delay was already awaited once before this task was
            // spawned, so ladder mode waits twice; confirm whether that is intended.
            if let Some(ms) = startup_delay_ms {
                std::thread::sleep(Duration::from_millis(ms));
            }
            // Publish init per variant as soon as they exist.
            for variant in &variants {
                let init_path = out_dir.join(&variant.id).join("init.mp4");
                wait_for_stable_file(&init_path, Duration::from_secs(20))?;
                let data = fs::read(&init_path)?;
                let hash = blake3::hash(&data).to_hex().to_string();
                let key_id = format!(
                    "{}/init",
                    derive_variant_stream_id(&base_stream_id, &variant.id)
                );
                let object = build_object(
                    TsChunk {
                        index: 0,
                        path: init_path,
                        timing: ec_chopper::ChunkTiming {
                            chunk_index: 0,
                            chunk_start_27mhz: None,
                            chunk_duration_27mhz: 0,
                            utc_start_unix: None,
                            sync_status: "init".to_string(),
                        },
                    },
                    data,
                    hash,
                    None,
                    network_secret_bytes.as_deref(),
                    None,
                    "video/mp4",
                    &key_id,
                )?;
                tx.blocking_send(PendingPublish::Object {
                    track: format!("{}/{}", init_track_prefix, variant.id),
                    group: 0,
                    object,
                })
                .map_err(|_| anyhow!("publish channel closed"))?;
            }
            let mut manifest_seq: u64 = 0;
            for index in 0..max_chunks {
                // Wait for each variant's segment for this index.
                let mut per_variant_segments = Vec::with_capacity(variants.len());
                let mut per_variant_hashes = Vec::with_capacity(variants.len());
                for variant in &variants {
                    let seg_path = out_dir
                        .join(&variant.id)
                        .join(format!("segment_{index:06}.m4s"));
                    wait_for_stable_file(&seg_path, Duration::from_secs(30))?;
                    let data = fs::read(&seg_path)?;
                    let hash = blake3::hash(&data).to_hex().to_string();
                    per_variant_segments.push((variant, seg_path, data, hash.clone()));
                    per_variant_hashes.push((variant, hash));
                }
                let created_unix_ms = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let epoch_id = format!("epoch-{created_unix_ms}");
                // One manifest per segment index. This path has already refused to run without
                // --publish-manifests, so the former `if publish_manifests { .. } else { .. }`
                // (two identical arms) is a single unconditional call.
                let manifest = build_multi_variant_manifest(
                    StreamId(base_stream_id.clone()),
                    epoch_id,
                    chunk_ms,
                    index as u64,
                    encoder_profile_id.clone(),
                    created_unix_ms,
                    vec![StreamMetadata {
                        key: "source_kind".to_string(),
                        value: source_kind.clone(),
                    }],
                    &variants,
                    index as u64,
                    per_variant_hashes
                        .iter()
                        .map(|(v, h)| (v.id.clone(), h.clone()))
                        .collect(),
                )?;
                tx.blocking_send(PendingPublish::Manifest {
                    track: manifest_track.clone(),
                    sequence: manifest_seq,
                    manifest: manifest.clone(),
                })
                .map_err(|_| anyhow!("publish channel closed"))?;
                manifest_seq += 1;
                // Publish segment objects for each variant, linked to the manifest.
                for (variant, seg_path, data, hash) in per_variant_segments {
                    let key_id = derive_variant_stream_id(&base_stream_id, &variant.id);
                    let chunk = TsChunk {
                        index: index as u64,
                        path: seg_path,
                        timing: ec_chopper::ChunkTiming {
                            chunk_index: index as u64,
                            chunk_start_27mhz: None,
                            // chunk_ms * 27_000 converts milliseconds to 27 MHz ticks.
                            chunk_duration_27mhz: chunk_ms * 27_000,
                            utc_start_unix: None,
                            sync_status: "cmaf".to_string(),
                        },
                    };
                    let object = build_object(
                        chunk,
                        data,
                        hash,
                        None,
                        network_secret_bytes.as_deref(),
                        Some(&manifest.manifest_id),
                        "video/iso.segment",
                        &key_id,
                    )?;
                    tx.blocking_send(PendingPublish::Object {
                        track: format!("{}/{}", chunk_track_prefix, variant.id),
                        // Group 0 is used for the init object, so segment groups start at 1.
                        group: (index as u64) + 1,
                        object,
                    })
                    .map_err(|_| anyhow!("publish channel closed"))?;
                }
            }
            // NOTE(review): the early `?` returns above skip this cleanup and leave ffmpeg and
            // the writer thread running until the process exits.
            let _ = child.kill();
            let _ = child.wait();
            let _ = writer.join();
            Ok(())
        });
        // Drain work items on the async side until the blocking task drops its sender.
        while let Some(item) = rx.recv().await {
            match item {
                PendingPublish::Object {
                    track,
                    group,
                    object,
                } => {
                    publish_set.publish_object(&track, GroupId(group), object)?;
                }
                PendingPublish::Manifest {
                    track,
                    sequence,
                    manifest,
                } => {
                    publish_set.publish_manifest(&track, sequence, &manifest)?;
                }
            }
        }
        chunk_task
            .await
            .map_err(|err| anyhow!("chunk task join error: {err}"))??;
        return Ok(());
    }
    let needs_init_track = publish_chunks;
    let mut epoch_buffer = EpochBuffer::new(args.epoch_chunks);
    // Some early MoQ implementations have surprising assumptions around group sequence ids.
    // In CMAF mode we reserve group 0 for the init segment on the init track and start
    // segment groups at 1 to avoid any potential cross-track collisions.
let mut object_sequence: u64 = if needs_init_track { 1 } else { 0 };
    // (Interior of `moq_publish`: the single-rendition path. NOTE(review): generic argument
    // lists appear stripped in this view of the file, e.g. `Option>` and
    // `mpsc::channel::(8)`; they are left exactly as found.)
    let mut manifest_sequence: u64 = 0;
    // With --announce, manifest summaries are forwarded to a catalog announcer.
    let announce_tx = if args.announce {
        Some(
            spawn_catalog_announcer(
                &node,
                &track,
                &broadcast_name,
                &track_name,
                args.gossip_peer.clone(),
            )
            .await?,
        )
    } else {
        None
    };

    // Distinguishes the init segment from media segments on the channel below.
    #[derive(Debug)]
    enum PendingKind {
        Init,
        Segment,
    }

    // One finished file handed from the blocking thread to the async publisher. `data` is
    // `None` for segments when chunks are not published (only the hash is needed then).
    #[derive(Debug)]
    struct PendingChunk {
        kind: PendingKind,
        chunk: TsChunk,
        data: Option>,
        hash: String,
    }

    let segment_content_type = "video/iso.segment";
    // Chunking is CPU and IO heavy and must not block the async runtime.
    // We do ingest + chunking on a blocking thread and feed finalized chunks
    // to the async publisher over a channel.
    let (tx, mut rx) = mpsc::channel::(8);
    let chunk_dir = args.chunk_dir.clone();
    let chunk_ms = args.chunk_ms;
    let max_chunks = args.max_chunks;
    let chunk_task = tokio::task::spawn_blocking(move || -> Result<()> {
        // Start from an empty `cmaf` output directory.
        let out_dir = chunk_dir.join("cmaf");
        let _ = fs::remove_dir_all(&out_dir);
        fs::create_dir_all(&out_dir)
            .with_context(|| format!("failed to create {}", out_dir.display()))?;
        let profile = ec_chopper::deterministic_h264_profile();
        let mut cmd = std::process::Command::new("ffmpeg");
        cmd.current_dir(&out_dir);
        cmd.arg("-hide_banner")
            .arg("-loglevel")
            .arg("error")
            .arg("-nostdin")
            .arg("-y")
            .arg("-i")
            .arg("pipe:0");
        // Reduce non-determinism and "surprise streams" (data/subtitles, extra audio).
        // We intentionally pick the first video stream and (optionally) the first audio stream.
        cmd.arg("-map").arg("0:v:0");
        cmd.arg("-map").arg("0:a:0?");
        cmd.arg("-sn").arg("-dn");
        cmd.arg("-map_metadata").arg("-1");
        for arg in ec_chopper::ffmpeg_profile_args(&profile) {
            cmd.arg(arg);
        }
        let seg_time = format!("{:.3}", chunk_ms as f64 / 1000.0);
        // Relative names: ffmpeg runs with `out_dir` as its working directory.
        let seg_template = "segment_%06d.m4s".to_string();
        let init_filename = "init.mp4".to_string();
        let playlist = "index.m3u8".to_string();
        cmd.arg("-f")
            .arg("hls")
            .arg("-hls_time")
            .arg(seg_time)
            .arg("-hls_list_size")
            .arg("0")
            .arg("-hls_segment_type")
            .arg("fmp4")
            .arg("-hls_flags")
            .arg("independent_segments")
            .arg("-hls_fmp4_init_filename")
            .arg(init_filename)
            .arg("-hls_segment_filename")
            .arg(seg_template)
            .arg(playlist)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::inherit());
        let mut child = cmd
            .spawn()
            .with_context(|| "failed to spawn ffmpeg".to_string())?;
        let mut stdin = child
            .stdin
            .take()
            .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?;
        let mut reader = source.open_stream()?;
        // Feed the source into ffmpeg's stdin on a dedicated thread.
        let writer = std::thread::spawn(move || -> Result<()> {
            std::io::copy(&mut reader, &mut stdin)?;
            Ok(())
        });
        let init_path = out_dir.join("init.mp4");
        // The init segment is only sent when chunks are being published.
        if publish_chunks {
            wait_for_stable_file(&init_path, Duration::from_secs(10))?;
            let data = fs::read(&init_path)?;
            let hash = blake3::hash(&data).to_hex().to_string();
            let chunk = TsChunk {
                index: 0,
                path: init_path.clone(),
                timing: ec_chopper::ChunkTiming {
                    chunk_index: 0,
                    chunk_start_27mhz: None,
                    chunk_duration_27mhz: 0,
                    utc_start_unix: None,
                    sync_status: "init".to_string(),
                },
            };
            tx.blocking_send(PendingChunk {
                kind: PendingKind::Init,
                chunk,
                data: Some(data),
                hash,
            })
            .map_err(|_| anyhow!("publish channel closed"))?;
        }
        let limit = max_chunks.unwrap_or(usize::MAX);
        for index in 0..limit {
            let seg_path = out_dir.join(format!("segment_{index:06}.m4s"));
            // If the segment never appears and ffmpeg exited, we are done (short input).
            // If the segment never appears and ffmpeg is still running, treat as error.
            match wait_for_stable_file(&seg_path, Duration::from_secs(20)) {
                Ok(()) => {}
                Err(err) => {
                    if let Ok(Some(status)) = child.try_wait() {
                        if status.success() {
                            break;
                        }
                        return Err(anyhow!("ffmpeg exited with {status} ({err:#})"));
                    }
                    return Err(err);
                }
            }
            // Read the bytes only when they will be published; otherwise hash the file directly.
            let data = if publish_chunks {
                Some(fs::read(&seg_path)?)
            } else {
                None
            };
            let hash = if let Some(ref bytes) = data {
                blake3::hash(bytes).to_hex().to_string()
            } else {
                ec_chopper::hash_file_blake3(&seg_path)?
            };
            let chunk = TsChunk {
                index: index as u64,
                path: seg_path.clone(),
                timing: ec_chopper::ChunkTiming {
                    chunk_index: index as u64,
                    chunk_start_27mhz: None,
                    // chunk_ms * 27_000 converts milliseconds to 27 MHz ticks.
                    chunk_duration_27mhz: chunk_ms * 27_000,
                    utc_start_unix: None,
                    sync_status: "cmaf".to_string(),
                },
            };
            tx.blocking_send(PendingChunk {
                kind: PendingKind::Segment,
                chunk,
                data,
                hash,
            })
            .map_err(|_| anyhow!("publish channel closed"))?;
        }
        // NOTE(review): the early `return`/`?` exits above skip this cleanup.
        let _ = child.kill();
        let _ = child.wait();
        let _ = writer.join();
        Ok(())
    });
    // Async side: publish the init object directly; collect segments into the epoch buffer
    // and flush it each time it reports full.
    while let Some(pending) = rx.recv().await {
        match pending.kind {
            PendingKind::Init => {
                let Some(data) = pending.data else {
                    continue;
                };
                let key_id = format!("{}/init", track.name);
                let object = build_object(
                    pending.chunk,
                    data,
                    pending.hash,
                    None,
                    network_secret.as_deref(),
                    None,
                    "video/mp4",
                    &key_id,
                )?;
                tracing::info!(
                    "publish init bytes={} track={}",
                    object.data.len(),
                    args.init_track
                );
                publish_set.publish_object(&args.init_track, GroupId(0), object)?;
            }
            PendingKind::Segment => {
                epoch_buffer.push(pending.chunk, pending.data, pending.hash);
                if epoch_buffer.is_full() {
                    flush_epoch_publish(
                        &mut publish_set,
                        &track_name,
                        &args.manifest_track,
                        publish_chunks,
                        args.publish_manifests,
                        &mut epoch_buffer,
                        &stream_id_value,
                        args.chunk_ms,
                        &encoder_profile_id,
                        &source_kind,
                        network_secret.as_deref(),
                        segment_content_type,
                        &track.name,
                        &mut object_sequence,
                        &mut manifest_sequence,
                        announce_tx.as_ref(),
                    )?;
                }
            }
        }
    }
    // Ensure chunking completed successfully.
chunk_task
        .await
        .map_err(|err| anyhow!("chunk task join error: {err}"))??;
    // (Tail of `moq_publish`, carried over unchanged: publish whatever is left in the epoch
    // buffer after the chunk task has finished.)
    flush_epoch_publish(
        &mut publish_set,
        &track_name,
        &args.manifest_track,
        publish_chunks,
        args.publish_manifests,
        &mut epoch_buffer,
        &stream_id_value,
        args.chunk_ms,
        &encoder_profile_id,
        &source_kind,
        network_secret.as_deref(),
        segment_content_type,
        &track.name,
        &mut object_sequence,
        &mut manifest_sequence,
        announce_tx.as_ref(),
    )?;
    Ok(())
}

/// Subscribes to a MoQ object track and writes the received CMAF data locally.
///
/// Segments are decrypted when they carry encryption metadata, optionally checked against
/// manifests received on the manifest track (by hash list, or by Merkle proof when the manifest
/// has no hash for the chunk), checked against their own advertised hash, and then written
/// either as raw `init.mp4` / `segment_NNNNNN.m4s` files (`--raw-cmaf`) or through `HlsWriter`.
/// When an init track is subscribed, segments are buffered until the init segment arrives.
///
/// # Errors
/// Fails on invalid argument combinations, on subscribe / IO / writer errors, when the init
/// segment does not arrive (timeout, init track closed, or main stream ended first).
///
/// NOTE(review): generic argument lists appear stripped in this view of the file (e.g.
/// `Vec<(u64, f64, Vec)>`, `oneshot::channel::>>()`); they are left exactly as found.
async fn moq_subscribe(args: MoqSubscribeArgs) -> Result<()> {
    if args.require_manifest && !args.subscribe_manifests {
        return Err(anyhow!("--require-manifest requires --subscribe-manifests"));
    }
    let secret = parse_iroh_secret(args.iroh_secret)?;
    let discovery = parse_discovery(args.discovery.as_deref())?;
    let node = MoqNode::bind_with_discovery(secret, discovery).await?;
    let remote = ec_iroh::parse_endpoint_addr(&args.remote)?;
    // Manifests may come from a different endpoint; default to the object endpoint.
    let remote_manifests = if let Some(value) = args.remote_manifests.as_deref() {
        ec_iroh::parse_endpoint_addr(value)?
    } else {
        remote.clone()
    };
    let mut stream = node
        .subscribe_objects(remote.clone(), &args.broadcast_name, &args.track_name)
        .await?;
    let manifest_allowlist = parse_manifest_allowlist(args.manifest_signers.as_deref());
    let manifest_store = if args.subscribe_manifests || args.require_manifest {
        let mut manifest_stream = node
            .subscribe_manifests(remote_manifests, &args.broadcast_name, &args.manifest_track)
            .await?;
        let store = Arc::new(RwLock::new(HashMap::new()));
        let store_clone = Arc::clone(&store);
        let allowlist = manifest_allowlist.clone();
        // Background task: keep every manifest that passes validation, keyed by manifest id.
        tokio::spawn(async move {
            while let Some(manifest) = manifest_stream.recv().await {
                if !validate_manifest(&manifest, allowlist.as_ref()) {
                    tracing::warn!("rejected manifest {}", manifest.manifest_id);
                    continue;
                }
                let manifest_id = manifest.manifest_id.clone();
                store_clone.write().await.insert(manifest_id, manifest);
            }
        });
        Some(store)
    } else {
        None
    };
    let network_secret = parse_network_secret(args.network_secret)?;
    let mut hls = HlsWriter::new_cmaf(&args.output_dir,
        args.chunk_ms as f64 / 1000.0, args.window)?;
    let needs_init = args.subscribe_init;
    let mut init_ready = !needs_init;
    let mut buffered_segments: Vec<(u64, f64, Vec)> = Vec::new();
    let mut init_rx = if needs_init {
        let (tx, rx) = tokio::sync::oneshot::channel::>>();
        let mut init_stream = node
            .subscribe_objects(remote.clone(), &args.broadcast_name, &args.init_track)
            .await?;
        let remote_str = args.remote.clone();
        let init_track = args.init_track.clone();
        let secret = network_secret.clone();
        // Background task: wait up to 60 s for a usable init object and report the outcome once.
        tokio::spawn(async move {
            let deadline = Instant::now() + Duration::from_secs(60);
            while Instant::now() < deadline {
                let remaining = deadline.saturating_duration_since(Instant::now());
                let recv =
                    tokio::time::timeout(remaining.min(Duration::from_secs(2)), init_stream.recv())
                        .await;
                let object = match recv {
                    Ok(Some(object)) => object,
                    // The track is finished: polling it again can only return `None` again,
                    // so report the failure now instead of spinning until the deadline.
                    Ok(None) => {
                        let _ = tx.send(Err(anyhow!(
                            "init track '{}' from {} closed before a CMAF init segment arrived",
                            init_track,
                            remote_str
                        )));
                        return;
                    }
                    // This wait slice timed out; re-check the overall deadline.
                    Err(_) => continue,
                };
                let init_index = object
                    .meta
                    .timing
                    .as_ref()
                    .map(|t| t.chunk_index)
                    .unwrap_or(0);
                let data = if let Some(enc) = &object.meta.encryption {
                    if enc.alg != ENCRYPTION_ALG {
                        tracing::warn!("init: unsupported encryption {}", enc.alg);
                        continue;
                    }
                    tracing::info!(
                        "init: received encrypted object bytes={} key_id={} chunk_index={}",
                        object.data.len(),
                        enc.key_id,
                        init_index
                    );
                    match decrypt_stream_data(
                        &enc.key_id,
                        init_index,
                        &object.data,
                        secret.as_deref(),
                    ) {
                        Some(plaintext) => plaintext,
                        None => {
                            tracing::warn!(
                                "init: decryption failed key_id={} chunk_index={}",
                                enc.key_id,
                                init_index
                            );
                            continue;
                        }
                    }
                } else {
                    tracing::info!(
                        "init: received plaintext object bytes={} chunk_index={}",
                        object.data.len(),
                        init_index
                    );
                    object.data
                };
                let _ = tx.send(Ok(data));
                return;
            }
            let _ = tx.send(Err(anyhow!(
                "timed out waiting for CMAF init segment on track '{}' from {}",
                init_track,
                remote_str
            )));
        });
        Some(rx)
    } else {
        None
    };
    let fallback = Duration::from_millis(args.chunk_ms);
    // Used as the chunk index for objects that carry no timing metadata.
    let mut fallback_index = 0u64;
    let mut invalid_chunks = 0u32;
    let mut written_chunks = 0u64;
    let mut quota = args.max_bytes_per_sec.map(|rate| {
        let burst =
            args.max_bytes_burst.unwrap_or(rate.saturating_mul(2));
        ec_iroh::TokenBucket::new(burst, rate)
    });
    loop {
        tokio::select! {
            biased;
            // The init result is polled first (biased) for as long as it is still pending.
            init_res = async {
                if let Some(rx) = init_rx.as_mut() {
                    Some(rx.await)
                } else {
                    None
                }
            }, if init_rx.is_some() => {
                let Some(init_res) = init_res else { continue };
                let init = match init_res {
                    Ok(inner) => inner?,
                    Err(_) => return Err(anyhow!("init receiver task cancelled")),
                };
                if args.raw_cmaf {
                    fs::create_dir_all(&args.output_dir)?;
                    fs::write(args.output_dir.join("init.mp4"), &init)?;
                } else {
                    let _ = hls.write_init_segment(&init)?;
                }
                init_ready = true;
                init_rx = None;
                // Flush any segments we buffered while waiting for init.
                buffered_segments.sort_by_key(|(idx, _, _)| *idx);
                for (idx, dur, bytes) in buffered_segments.drain(..) {
                    if args.raw_cmaf {
                        fs::create_dir_all(&args.output_dir)?;
                        fs::write(args.output_dir.join(format!("segment_{idx:06}.m4s")), &bytes)?;
                    } else {
                        let _ = hls.write_segment(idx, dur, &bytes)?;
                    }
                    written_chunks += 1;
                    if let Some(limit) = args.stop_after {
                        if written_chunks >= limit {
                            return Ok(());
                        }
                    }
                }
                continue;
            }
            object = stream.recv() => {
                let Some(object) = object else { break };
                if let Some(bucket) = quota.as_mut() {
                    if !bucket.allow(object.data.len() as u64) {
                        tracing::warn!("quota exceeded; dropping chunk");
                        continue;
                    }
                }
                // Manifest lookups use the key id with any "/init" suffix removed.
                let stream_id_for_manifest = object
                    .meta
                    .encryption
                    .as_ref()
                    .map(|enc| strip_init_suffix(enc.key_id.as_str()).to_string());
                let index = object
                    .meta
                    .timing
                    .as_ref()
                    .map(|t| t.chunk_index)
                    .unwrap_or_else(|| {
                        let current = fallback_index;
                        fallback_index += 1;
                        current
                    });
                // Decryption uses --stream-id when given, otherwise the object's own key id.
                let stream_id = args
                    .stream_id
                    .as_deref()
                    .or_else(|| object.meta.encryption.as_ref().map(|enc| enc.key_id.as_str()));
                let data = if let Some(enc) = &object.meta.encryption {
                    if enc.alg != ENCRYPTION_ALG {
                        tracing::warn!("unsupported encryption {}", enc.alg);
                        continue;
                    }
                    let Some(stream_id) = stream_id else {
                        tracing::warn!("missing stream id for decryption");
                        continue;
                    };
                    match decrypt_stream_data(stream_id,
                        index, &object.data, network_secret.as_deref()) {
                        Some(plaintext) => plaintext,
                        None => {
                            tracing::warn!("decryption failed for chunk {}", index);
                            continue;
                        }
                    }
                } else {
                    object.data
                };
                if let Some(store) = manifest_store.as_ref() {
                    // Prefer the manifest named by the object; otherwise search by stream/index.
                    let manifest = {
                        let store = store.read().await;
                        if let Some(manifest_id) = object.meta.manifest_id.as_ref() {
                            store.get(manifest_id).cloned()
                        } else {
                            if let Some(stream_id) = stream_id_for_manifest.as_deref() {
                                find_manifest_for_stream_index(&store, stream_id, index)
                            } else {
                                None
                            }
                        }
                    };
                    if let Some(manifest) = manifest {
                        let expected = stream_id_for_manifest
                            .as_deref()
                            .and_then(|sid| manifest_hash_for_chunk(&manifest, sid, index));
                        if let Some(expected) = expected {
                            if let Some(meta_hash) = object.meta.chunk_hash.as_ref() {
                                if expected != *meta_hash {
                                    tracing::warn!(
                                        "manifest mismatch for chunk {} (expected {}, got {})",
                                        index,
                                        expected,
                                        meta_hash
                                    );
                                    invalid_chunks += 1;
                                    if invalid_chunks > args.max_invalid_chunks {
                                        tracing::warn!("too many invalid chunks; closing");
                                        break;
                                    }
                                    continue;
                                }
                            } else if args.require_manifest {
                                tracing::warn!("missing chunk hash for manifest");
                                invalid_chunks += 1;
                                if invalid_chunks > args.max_invalid_chunks {
                                    tracing::warn!("too many invalid chunks; closing");
                                    break;
                                }
                                continue;
                            }
                        } else {
                            // If the manifest doesn't include hash lists, fall back to Merkle proof.
                            if let (Some(meta_hash), Some(proof)) =
                                (object.meta.chunk_hash.as_ref(), object.meta.chunk_proof.as_ref())
                            {
                                // The manifest may have been selected by a peer-supplied id, so
                                // `index` can lie before its start; a plain subtraction would
                                // underflow. Such a chunk is treated as failing the proof.
                                let proof_ok = index
                                    .checked_sub(manifest.body.chunk_start_index)
                                    .is_some_and(|offset| {
                                        verify_merkle_proof(
                                            meta_hash,
                                            offset as usize,
                                            proof,
                                            &manifest.body.merkle_root,
                                        )
                                    });
                                if !proof_ok {
                                    tracing::warn!("chunk {} proof invalid for manifest", index);
                                    invalid_chunks += 1;
                                    if invalid_chunks > args.max_invalid_chunks {
                                        tracing::warn!("too many invalid chunks; closing");
                                        break;
                                    }
                                    continue;
                                }
                            } else if args.require_manifest {
                                tracing::warn!("chunk {} outside manifest", index);
                                invalid_chunks += 1;
                                if invalid_chunks > args.max_invalid_chunks {
                                    tracing::warn!("too many invalid chunks; closing");
                                    break;
                                }
                                continue;
                            }
                        }
                    } else if args.require_manifest {
                        // Dropped without counting towards `max_invalid_chunks`.
                        tracing::warn!("missing manifest covering chunk {}", index);
                        continue;
                    }
                }
                // Independently of manifests, the plaintext must match the advertised hash.
                if let Some(expected) = object.meta.chunk_hash.as_ref() {
                    let actual = blake3::hash(&data).to_hex().to_string();
                    if &actual != expected {
                        tracing::warn!(
                            "chunk {} hash mismatch (expected {}, got {})",
                            index,
                            expected,
                            actual
                        );
                        invalid_chunks += 1;
                        if invalid_chunks > args.max_invalid_chunks {
                            tracing::warn!("too many invalid chunks; closing");
                            break;
                        }
                        continue;
                    }
                }
                let duration = chunk_duration_secs(&object.meta, fallback);
                if !init_ready {
                    // Keep draining the MoQ track to avoid flow-control stalls while init is pending.
                    buffered_segments.push((index, duration, data));
                } else {
                    if args.raw_cmaf {
                        fs::create_dir_all(&args.output_dir)?;
                        fs::write(args.output_dir.join(format!("segment_{index:06}.m4s")), &data)?;
                    } else {
                        let _ = hls.write_segment(index, duration, &data)?;
                    }
                    written_chunks += 1;
                }
                if let Some(limit) = args.stop_after {
                    if written_chunks >= limit {
                        break;
                    }
                }
            }
        }
    }
    if needs_init && !init_ready {
        return Err(anyhow!(
            "stream ended before receiving CMAF init segment on track '{}' from {}",
            args.init_track,
            args.remote
        ));
    }
    Ok(())
}

// `moq_selftest` continues past this span; its opening is carried over unchanged.
async fn moq_selftest(args: MoqSelftestArgs) -> Result<()> {
    let discovery = parse_discovery(args.discovery.as_deref())?;
    let publisher_node = MoqNode::bind_with_discovery(None, discovery).await?;
    let subscriber_node = MoqNode::bind_with_discovery(None, discovery).await?;
    publisher_node.endpoint().online().await;
    subscriber_node.endpoint().online().await;
    let stream_id = args.stream_id.unwrap_or_else(|| {
        let ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        format!("selftest-{ts}")
    });
    let broadcast_name = stream_id.clone();
    let track_name = args.track_name.clone();
    let mut publisher = publisher_node
        .publish_objects(&broadcast_name, &track_name)
        .await?;
    let mut endpoint_addr = publisher_node.endpoint().addr();
    if endpoint_addr.addrs.is_empty() {
        let mut watcher = publisher_node.endpoint().watch_addr();
        let start = Instant::now();
        while endpoint_addr.addrs.is_empty() && start.elapsed() < Duration::from_secs(3) {
            tokio::time::sleep(Duration::from_millis(200)).await;
            endpoint_addr = watcher.get();
        }
    }
    if endpoint_addr.addrs.is_empty() {
        tracing::warn!(
            "publisher endpoint has no direct addrs; selftest may rely on discovery/relays"
        );
    }
    let mut stream = subscriber_node
        .subscribe_objects(endpoint_addr, &broadcast_name, &track_name)
        .await?;
    let expected: Arc>> = Arc::new(Mutex::new(BTreeMap::new()));
    let (done_tx, mut done_rx) = tokio::sync::watch::channel(0usize);
    let (progress_tx, mut progress_rx) =
tokio::sync::mpsc::channel::(8); let subscriber_task = tokio::spawn(async move { let mut received: BTreeMap = BTreeMap::new(); loop { tokio::select! { item = stream.recv() => { match item { Some(object) => { let index = object .meta .timing .as_ref() .map(|timing| timing.chunk_index) .unwrap_or(0); let hash = blake3::hash(&object.data); received.insert(index, hash); let _ = progress_tx.send(index).await; let expected_total = *done_rx.borrow(); if expected_total > 0 && received.len() >= expected_total { break; } } None => break, } } _ = done_rx.changed() => { let expected_total = *done_rx.borrow(); if expected_total > 0 && received.len() >= expected_total { break; } } } } Ok::<_, anyhow::Error>(received) }); let chunk_dir = args.chunk_dir.clone(); let input = args.input.clone(); let max_chunks = args.max_chunks; let track = TrackName { namespace: "every.channel".to_string(), name: stream_id.clone(), }; let objects = tokio::task::spawn_blocking(move || -> Result> { let _ = fs::remove_dir_all(&chunk_dir); fs::create_dir_all(&chunk_dir) .with_context(|| format!("failed to create {}", chunk_dir.display()))?; let reader: Box = if input.starts_with("http://") || input.starts_with("https://") { Box::new(ec_hdhomerun::open_stream_url(&input, None)?) } else { Box::new(File::open(&input).with_context(|| format!("failed to open {}", input))?) 
}; let mut objects = Vec::new(); let out_dir = chunk_dir.join("cmaf"); let (init_path, segments) = chunk_stream_cmaf_ffmpeg(reader, &out_dir, args.chunk_ms, max_chunks, true)?; // Index 0: init.mp4 let (init_bytes, init_hash) = read_chunk_bytes_and_hash(&init_path)?; let init_chunk = TsChunk { index: 0, path: init_path, timing: ec_chopper::ChunkTiming { chunk_index: 0, chunk_start_27mhz: None, chunk_duration_27mhz: 0, utc_start_unix: None, sync_status: "init".to_string(), }, }; let object = build_object( init_chunk, init_bytes, init_hash, None, None, None, "video/mp4", &track.name, )?; let hash = blake3::hash(&object.data); objects.push((0, object, hash)); // Index 1..N: segments for (i, seg_path) in segments.into_iter().enumerate() { let index = (i as u64) + 1; let (bytes, hash_hex) = read_chunk_bytes_and_hash(&seg_path)?; let chunk = TsChunk { index, path: seg_path, timing: ec_chopper::ChunkTiming { chunk_index: index, chunk_start_27mhz: None, chunk_duration_27mhz: args.chunk_ms * 27_000, utc_start_unix: None, sync_status: "cmaf".to_string(), }, }; let object = build_object( chunk, bytes, hash_hex, None, None, None, "video/iso.segment", &track.name, )?; let hash = blake3::hash(&object.data); objects.push((index, object, hash)); } Ok(objects) }) .await .map_err(|err| anyhow!("chunking task failed: {err}"))??; { let mut map = expected.lock().expect("mutex poisoned"); for (index, _, hash) in &objects { map.insert(*index, *hash); } let _ = done_tx.send(map.len()); } tokio::time::sleep(Duration::from_millis(250)).await; for (index, object, _) in objects { publisher.publish_object(GroupId(index), object)?; match tokio::time::timeout(Duration::from_secs(2), progress_rx.recv()).await { Ok(Some(received)) if received == index => {} Ok(Some(received)) => { tracing::warn!("selftest received chunk {received} while waiting for {index}"); } Ok(None) => { return Err(anyhow!("selftest subscriber closed before chunk {index}")); } Err(_) => { return Err(anyhow!("selftest timed out 
waiting for chunk {index}")); } } } let received = tokio::time::timeout(Duration::from_secs(20), subscriber_task) .await .map_err(|_| anyhow!("selftest timed out waiting for subscriber"))? .map_err(|err| anyhow!("subscriber task failed: {err}"))??; let expected_map = expected.lock().expect("mutex poisoned"); let mut mismatches = 0usize; for (index, expected_hash) in expected_map.iter() { match received.get(index) { Some(actual) if actual == expected_hash => {} Some(_) => { mismatches += 1; tracing::warn!("hash mismatch at chunk {index}"); } None => { mismatches += 1; tracing::warn!("missing chunk {index}"); } } } if received.len() > expected_map.len() { mismatches += received.len() - expected_map.len(); tracing::warn!( "received extra chunks ({})", received.len() - expected_map.len() ); } if mismatches > 0 { return Err(anyhow!("moq selftest failed with {mismatches} mismatches")); } println!( "moq selftest ok: {} chunks verified (broadcast {}, track {})", expected_map.len(), broadcast_name, track_name ); Ok(()) } #[derive(serde::Deserialize)] struct TurnResp { ice_servers: Vec, } async fn fetch_turn_ice_servers(client: &reqwest::Client, dir: &str) -> Option> { let base = dir.trim_end_matches('/'); let url = format!("{base}/api/turn"); let res = client.get(url).send().await.ok()?; if !res.status().is_success() { return None; } let body: TurnResp = res.json().await.ok()?; if body.ice_servers.is_empty() { return None; } Some(body.ice_servers) } async fn direct_publish(args: DirectPublishArgs) -> Result<()> { fs::create_dir_all(&args.chunk_dir) .with_context(|| format!("failed to create {}", args.chunk_dir.display()))?; // For browser interop, we currently always normalize into deterministic H.264/AAC CMAF. // This also keeps the codec stable for MSE playback. let deterministic = true; let source: Box = match args.source { IngestSource::Hls { url, .. 
} => Box::new(HlsSource { url, mode: HlsMode::Transcode, }), IngestSource::Hdhr { host, device_id, channel, name, prefer_mdns, } => Box::new(HdhrSource { host, device_id, channel, name, prefer_mdns, }), IngestSource::LinuxDvb { adapter, dvr, tune_cmd, tune_wait_ms, } => Box::new(LinuxDvbSource { adapter, dvr, tune_cmd, tune_wait_ms, }), IngestSource::Ts { input } => Box::new(TsSource { input }), }; fn normalize_base(url: &str) -> String { url.trim_end_matches('/').to_string() } let directory_url = args.directory_url.as_deref().map(normalize_base); let client = reqwest::Client::builder() .timeout(Duration::from_secs(8)) .build() .with_context(|| "failed to build http client")?; let mut cfg = PeerConfiguration::default(); if let Some(dir) = directory_url.as_deref() { if let Some(ice) = fetch_turn_ice_servers(&client, dir).await { cfg.ice_servers = ice; } } let stream_id = args.stream_id.clone().unwrap_or_else(|| { let ts = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis(); format!("every.channel/direct/{ts}") }); #[derive(serde::Serialize)] struct AnnounceReq<'a> { stream_id: &'a str, title: &'a str, offer: &'a str, expires_ms: u64, } #[derive(serde::Deserialize)] struct AnswerResp { answer: String, } let title = args.title.clone(); let out_dir = args.chunk_dir.join("cmaf"); let chunk_ms = args.chunk_ms; let max_segments = args.max_segments; let answer_override = args.answer.clone(); loop { // New offerer per session so viewers can reconnect (or a new viewer can take over). let offerer = PeerConnectionBuilder::new() .set_config(cfg.clone()) .with_channel_options(vec![( "simple_channel_".to_string(), DataChannelOptions { ordered: Some(true), ..Default::default() }, )]) .map_err(|e| anyhow!("{e:#}"))? 
.build() .await .map_err(|e| anyhow!("{e:#}"))?; let offer_desc = offerer .get_local_description() .await .ok_or_else(|| anyhow!("missing local offer description"))?; let offer_candidates = offerer .collect_ice_candidates() .await .map_err(|e| anyhow!("{e:#}"))?; let offer_link = encode_direct_link(&DirectCodeV1 { v: 1, desc: offer_desc, candidates: offer_candidates, label: Some("every.channel0".to_string()), })?; println!("{offer_link}"); let stop_refresh = tokio::sync::watch::channel(false); if let Some(dir) = directory_url.as_deref() { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; let req = AnnounceReq { stream_id: &stream_id, title: &title, offer: &offer_link, expires_ms: now.saturating_add(args.announce_ttl_ms), }; let url = format!("{dir}/api/announce"); let res = client.post(url).json(&req).send().await?; if !res.status().is_success() { tracing::warn!("directory announce failed: {}", res.status()); } else { tracing::info!("announced offer for {}", stream_id); } // Best-effort refresh while waiting for an answer (stops when the session starts). 
let mut stop_rx = stop_refresh.1.clone(); let client2 = client.clone(); let dir2 = dir.to_string(); let stream_id2 = stream_id.clone(); let title2 = title.clone(); let offer2 = offer_link.clone(); let ttl = args.announce_ttl_ms; tokio::spawn(async move { loop { if *stop_rx.borrow() { break; } tokio::time::sleep(Duration::from_millis(ttl.saturating_mul(3) / 4)).await; if *stop_rx.borrow() { break; } let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; let req = AnnounceReq { stream_id: &stream_id2, title: &title2, offer: &offer2, expires_ms: now.saturating_add(ttl), }; let url = format!("{dir2}/api/announce"); let fut = client2.post(url).json(&req).send(); let _ = tokio::time::timeout(Duration::from_secs(5), fut).await; } }); } let answer = if let Some(answer) = answer_override.clone() { answer } else if let Some(dir) = directory_url.as_deref() { eprintln!("waiting for browser answer via {dir}/api/answer?stream_id=..."); let url = format!( "{dir}/api/answer?stream_id={}", urlencoding::encode(&stream_id) ); let deadline = if args.answer_timeout_secs == 0 { None } else { Some(Instant::now() + Duration::from_secs(args.answer_timeout_secs)) }; loop { if deadline.is_some_and(|d| Instant::now() > d) { return Err(anyhow!( "timed out waiting for answer for stream_id {stream_id}" )); } match client.get(&url).send().await { Ok(res) if res.status().is_success() => { let body: AnswerResp = res.json().await?; break body.answer; } _ => { tokio::time::sleep(Duration::from_millis(300)).await; } } } } else { eprintln!("paste direct answer link/code, then press enter:"); let mut line = String::new(); std::io::stdin().read_line(&mut line)?; line }; // Stop refreshing announcements. If we keep the listing live while connected, people // will try to join and silently fail (this direct path is 1:1 today). 
let _ = stop_refresh.0.send(true); let answer = decode_direct_link(&answer)?; offerer .set_remote_description(answer.desc) .await .map_err(|e| anyhow!("{e:#}"))?; offerer .add_ice_candidates(answer.candidates) .await .map_err(|e| anyhow!("{e:#}"))?; let channel = offerer .receive_channel() .await .map_err(|e| anyhow!("{e:#}"))?; channel.wait_ready().await; eprintln!("direct channel open: {}", channel.label()); // Detect viewer disconnect promptly. Relying only on `channel.send` errors can lag // (depending on buffering), leaving the directory entry effectively "stuck". let (pc_dead_tx, mut pc_dead_rx) = tokio::sync::oneshot::channel::(); let (pc_stop_tx, mut pc_stop_rx) = tokio::sync::watch::channel(false); tokio::spawn(async move { loop { tokio::select! { _ = pc_stop_rx.changed() => { if *pc_stop_rx.borrow() { break; } } st = offerer.state_change() => { if matches!( st, PeerConnectionState::Disconnected | PeerConnectionState::Failed | PeerConnectionState::Closed ) { let _ = pc_dead_tx.send(st); break; } } } } }); // Expire the listing quickly now that we have a live session. 
if let Some(dir) = directory_url.as_deref() { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; let req = AnnounceReq { stream_id: &stream_id, title: &title, offer: &offer_link, expires_ms: now.saturating_add(1500), }; let url = format!("{dir}/api/announce"); let _ = client.post(url).json(&req).send().await; } enum ChunkItem { Init(Vec), Segment { index: u64, bytes: Vec }, } let (tx, mut rx) = mpsc::channel::(8); let reader = source.open_stream()?; let out_dir2 = out_dir.clone(); let chunk_task = tokio::task::spawn_blocking(move || -> Result<()> { let _ = fs::remove_dir_all(&out_dir2); fs::create_dir_all(&out_dir2) .with_context(|| format!("failed to create {}", out_dir2.display()))?; let profile = if deterministic { ec_chopper::deterministic_h264_profile() } else { ec_chopper::deterministic_h264_profile() }; let mut cmd = Command::new("ffmpeg"); cmd.current_dir(&out_dir2); cmd.arg("-hide_banner") .arg("-loglevel") .arg("error") .arg("-nostdin") .arg("-y") .arg("-i") .arg("pipe:0") // predictable mapping .arg("-map") .arg("0:v:0") .arg("-map") .arg("0:a:0?") .arg("-sn") .arg("-dn") .arg("-map_metadata") .arg("-1") // reduce scheduling nondeterminism .arg("-filter_threads") .arg("1") .arg("-filter_complex_threads") .arg("1") .arg("-threads") .arg("1"); for arg in ec_chopper::ffmpeg_profile_args(&profile) { cmd.arg(arg); } let seg_time = format!("{:.3}", chunk_ms as f64 / 1000.0); cmd.arg("-f") .arg("hls") .arg("-hls_time") .arg(seg_time) .arg("-hls_list_size") .arg("0") .arg("-hls_segment_type") .arg("fmp4") .arg("-hls_flags") .arg("independent_segments") .arg("-hls_fmp4_init_filename") .arg("init.mp4") .arg("-hls_segment_filename") .arg("segment_%06d.m4s") .arg("index.m3u8") .stdin(Stdio::piped()) .stdout(Stdio::null()) .stderr(Stdio::inherit()); let mut child = cmd.spawn().with_context(|| "failed to spawn ffmpeg")?; let mut stdin = child .stdin .take() .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?; let mut 
reader = reader; let writer = std::thread::spawn(move || -> Result<()> { std::io::copy(&mut reader, &mut stdin)?; Ok(()) }); let init_path = out_dir2.join("init.mp4"); wait_for_stable_file(&init_path, Duration::from_secs(20))?; let init = fs::read(&init_path)?; tx.blocking_send(ChunkItem::Init(init)) .map_err(|_| anyhow!("receiver closed"))?; for i in 0..max_segments { let seg_path = out_dir2.join(format!("segment_{i:06}.m4s")); match wait_for_stable_file(&seg_path, Duration::from_secs(30)) { Ok(()) => {} Err(err) => { if let Ok(Some(status)) = child.try_wait() { if status.success() { break; } return Err(anyhow!("ffmpeg exited with {status} ({err:#})")); } return Err(err); } } let bytes = fs::read(&seg_path)?; tx.blocking_send(ChunkItem::Segment { index: (i as u64) + 1, bytes, }) .map_err(|_| anyhow!("receiver closed"))?; } let _ = child.kill(); let _ = child.wait(); let _ = writer.join(); Ok(()) }); let mut send_failed = false; let mut have_heartbeat = false; let mut last_heartbeat = Instant::now(); let mut heartbeat_check = tokio::time::interval(Duration::from_secs(1)); heartbeat_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { tokio::select! { biased; st = (&mut pc_dead_rx) => { match st { Ok(st) => tracing::info!("peer connection ended ({st:?}); restarting session"), Err(_) => tracing::info!("peer connection ended; restarting session"), } send_failed = true; break; } _ = heartbeat_check.tick() => { if have_heartbeat && last_heartbeat.elapsed() > DIRECT_HEARTBEAT_TIMEOUT { tracing::info!("direct session heartbeat timed out; restarting session"); send_failed = true; break; } } msg = channel.receive() => { match msg { Ok(b) => { // Any message from the subscriber counts as a heartbeat today. // (We reserve DIRECT_WIRE_TAG_PING for explicit pings, but don't require it.) 
have_heartbeat = true; last_heartbeat = Instant::now(); if !b.is_empty() && b[0] == DIRECT_WIRE_TAG_PING { // ignore } } Err(_) => { send_failed = true; break; } } } item = rx.recv() => { let Some(item) = item else { break }; let (index, content_type, bytes, duration_27mhz, sync_status) = match item { ChunkItem::Init(bytes) => (0u64, "video/mp4", bytes, 0u64, "init"), ChunkItem::Segment { index, bytes } => { (index, "video/iso.segment", bytes, chunk_ms * 27_000, "cmaf") } }; if index == 0 { tracing::info!("direct send: init.mp4 ({} bytes)", bytes.len()); } else if index % 5 == 0 { tracing::info!("direct send: segment {index} ({} bytes)", bytes.len()); } let hash = blake3::hash(&bytes).to_hex().to_string(); let meta = ObjectMeta { created_unix_ms: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64, content_type: content_type.to_string(), size_bytes: bytes.len() as u64, timing: Some(TimingMeta { chunk_index: index, chunk_start_27mhz: 0, chunk_duration_27mhz: duration_27mhz, utc_start_unix: None, sync_status: sync_status.to_string(), }), encryption: None, chunk_hash: Some(hash), chunk_hash_alg: Some("blake3".to_string()), chunk_proof: None, chunk_proof_alg: None, manifest_id: None, }; let frame = encode_object_frame(&meta, &bytes)?; match tokio::time::timeout( Duration::from_secs(5), direct_wire_send_frame(&channel, &frame), ) .await { Ok(Ok(())) => {} Ok(Err(err)) => { tracing::warn!("direct send failed (restarting session): {err:#}"); send_failed = true; break; } Err(_) => { tracing::warn!("direct send timed out (restarting session)"); send_failed = true; break; } } } } } // Ensure the peer connection is dropped if we restart for non-ICE reasons (e.g. ffmpeg end). let _ = pc_stop_tx.send(true); drop(rx); match chunk_task.await { Ok(Ok(())) => {} Ok(Err(err)) => { // Common when the viewer disconnects: the receiver is dropped and the // blocking sender errors out. Treat as a reconnect. 
tracing::debug!("chunk task ended: {err:#}"); } Err(err) => tracing::debug!("chunk task join error: {err}"), } if answer_override.is_some() { // For manual mode, we can't reasonably loop. break; } if !send_failed { // ffmpeg ended or source ended; try again. tokio::time::sleep(Duration::from_millis(500)).await; } } Ok(()) } async fn direct_wire_send_frame(channel: &impl DataChannelExt, frame: &[u8]) -> Result<()> { let len = u32::try_from(frame.len()).map_err(|_| anyhow!("frame too large"))?; let mut stream = Vec::with_capacity(4 + frame.len()); stream.extend_from_slice(&len.to_be_bytes()); stream.extend_from_slice(frame); for chunk in stream.chunks(DIRECT_WIRE_CHUNK_BYTES) { let mut msg = Vec::with_capacity(1 + chunk.len()); msg.push(DIRECT_WIRE_TAG_STREAM); msg.extend_from_slice(chunk); channel .send(&bytes::Bytes::from(msg)) .await .map_err(|e| anyhow!("{e:#}"))?; } Ok(()) } #[derive(Debug, Default)] struct DirectWireDecoder { buf: Vec, pos: usize, want: Option, } impl DirectWireDecoder { fn push(&mut self, msg: &[u8]) -> Result>> { if msg.is_empty() { return Ok(Vec::new()); } match msg[0] { DIRECT_WIRE_TAG_FRAME => return Ok(vec![msg[1..].to_vec()]), DIRECT_WIRE_TAG_STREAM => { self.buf.extend_from_slice(&msg[1..]); } DIRECT_WIRE_TAG_PING => { // Control message; ignore. return Ok(Vec::new()); } _ => { // Unknown tag: treat as legacy "whole frame per message". 
return Ok(vec![msg.to_vec()]); } } let mut out = Vec::new(); loop { if self.want.is_none() { if self.buf.len().saturating_sub(self.pos) < 4 { break; } let start = self.pos; let meta = &self.buf[start..start + 4]; let len = u32::from_be_bytes([meta[0], meta[1], meta[2], meta[3]]) as usize; self.pos += 4; self.want = Some(len); } let Some(want) = self.want else { break }; if self.buf.len().saturating_sub(self.pos) < want { break; } let start = self.pos; let end = start + want; out.push(self.buf[start..end].to_vec()); self.pos = end; self.want = None; // Avoid unbounded growth: occasionally compact the buffer. if self.pos > 64 * 1024 { self.buf.drain(0..self.pos); self.pos = 0; } } Ok(out) } } async fn direct_subscribe(args: DirectSubscribeArgs) -> Result<()> { fs::create_dir_all(&args.out_dir) .with_context(|| format!("failed to create {}", args.out_dir.display()))?; #[derive(serde::Deserialize)] struct DirectoryResp { entries: Vec, } #[derive(serde::Deserialize)] struct DirectoryEntry { stream_id: String, title: String, offer: String, #[allow(dead_code)] expires_ms: Option, } fn normalize_base(url: &str) -> String { url.trim_end_matches('/').to_string() } let dir = normalize_base(&args.directory_url); let client = reqwest::Client::builder() .timeout(Duration::from_secs(8)) .build() .with_context(|| "failed to build http client")?; let (stream_id, offer_link) = match (args.stream_id.clone(), args.offer.clone()) { (_, Some(offer)) => (args.stream_id.clone(), offer), (Some(stream_id), None) => { let url = format!("{dir}/api/directory"); let res = client.get(&url).send().await?; if !res.status().is_success() { return Err(anyhow!("directory GET failed: {}", res.status())); } let body: DirectoryResp = res.json().await?; let entry = body .entries .into_iter() .find(|e| e.stream_id == stream_id) .ok_or_else(|| anyhow!("stream_id not found in directory: {stream_id}"))?; (Some(stream_id), entry.offer) } (None, None) => { return Err(anyhow!("either --offer or --stream-id is 
required")); } }; let offer = decode_direct_link(&offer_link)?; let mut cfg = PeerConfiguration::default(); if let Some(ice) = fetch_turn_ice_servers(&client, &dir).await { cfg.ice_servers = ice; } let pc = PeerConnectionBuilder::new() .set_config(cfg) .with_remote_offer(Some(offer.desc)) .map_err(|e| anyhow!("{e:#}"))? .build() .await .map_err(|e| anyhow!("{e:#}"))?; pc.add_ice_candidates(offer.candidates) .await .map_err(|e| anyhow!("{e:#}"))?; // Build our answer link (used both for directory POST and manual copy/paste). let desc = pc .get_local_description() .await .ok_or_else(|| anyhow!("missing local answer description"))?; let candidates = pc .collect_ice_candidates() .await .map_err(|e| anyhow!("{e:#}"))?; let answer_link = encode_direct_link(&DirectCodeV1 { v: 1, desc, candidates, label: Some("every.channel0".to_string()), })?; if let Some(stream_id) = stream_id.as_deref() { #[derive(serde::Serialize)] struct AnswerReq<'a> { stream_id: &'a str, answer: &'a str, } let url = format!("{dir}/api/answer"); let res = client .post(url) .json(&AnswerReq { stream_id, answer: &answer_link, }) .send() .await?; if !res.status().is_success() { return Err(anyhow!( "directory POST /api/answer failed: {}", res.status() )); } } else { eprintln!("answer link (paste into publisher):\n{answer_link}"); } let ch = pc.receive_channel().await.map_err(|e| anyhow!("{e:#}"))?; ch.wait_ready().await; eprintln!("direct channel open: {}", ch.label()); let cmaf_dir = args.out_dir.join("cmaf"); fs::create_dir_all(&cmaf_dir) .with_context(|| format!("failed to create {}", cmaf_dir.display()))?; let mut init_written = false; let mut durations = Vec::::new(); let mut captured_segments = 0usize; let mut decoder = DirectWireDecoder::default(); let mut ping = tokio::time::interval(Duration::from_secs(1)); ping.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); let deadline = args .duration_secs .map(|s| Instant::now() + Duration::from_secs(s)); while captured_segments < 
args.max_segments { if deadline.is_some_and(|d| Instant::now() > d) { break; } let msg = tokio::select! { _ = ping.tick() => { // Heartbeat to help the publisher detect disconnects promptly. let _ = ch.send(&bytes::Bytes::from(vec![DIRECT_WIRE_TAG_PING])).await; continue; } msg = ch.receive() => msg.map_err(|e| anyhow!("{e:#}"))?, }; for frame in decoder.push(&msg)? { let payload = decode_object_frame(&frame)?; let idx = payload .meta .timing .as_ref() .map(|t| t.chunk_index) .unwrap_or(0); if idx == 0 { tracing::info!("direct recv: init.mp4 ({} bytes)", payload.data.len()); let path = cmaf_dir.join("init.mp4"); fs::write(&path, &payload.data) .with_context(|| format!("failed to write {}", path.display()))?; init_written = true; continue; } if !init_written { // Ignore segments until init arrives. continue; } // Publisher indexes segments starting at 1; our filenames start at 0. let seg = idx.saturating_sub(1); let path = cmaf_dir.join(format!("segment_{seg:06}.m4s")); fs::write(&path, &payload.data) .with_context(|| format!("failed to write {}", path.display()))?; tracing::info!("direct recv: segment {idx} ({} bytes)", payload.data.len()); let dur = payload .meta .timing .as_ref() .map(|t| t.chunk_duration_27mhz as f64 / 27_000_000.0) .unwrap_or(2.0); durations.push(dur); captured_segments += 1; if captured_segments >= args.max_segments { break; } } } if !init_written || durations.is_empty() { return Err(anyhow!("no media captured (missing init or segments)")); } // Write a VOD playlist so ffmpeg can remux the fragments. 
let target = durations
        .iter()
        .copied()
        .fold(0.0_f64, f64::max)
        .ceil()
        .max(1.0) as u64;
    let mut m3u8 = String::new();
    m3u8.push_str("#EXTM3U\n");
    m3u8.push_str("#EXT-X-VERSION:7\n");
    m3u8.push_str(&format!("#EXT-X-TARGETDURATION:{target}\n"));
    m3u8.push_str("#EXT-X-PLAYLIST-TYPE:VOD\n");
    m3u8.push_str("#EXT-X-INDEPENDENT-SEGMENTS\n");
    m3u8.push_str("#EXT-X-MAP:URI=\"init.mp4\"\n");
    for (i, dur) in durations.iter().enumerate() {
        m3u8.push_str(&format!("#EXTINF:{:.3},\n", dur));
        m3u8.push_str(&format!("segment_{i:06}.m4s\n"));
    }
    m3u8.push_str("#EXT-X-ENDLIST\n");
    let playlist_path = cmaf_dir.join("index.m3u8");
    fs::write(&playlist_path, m3u8)
        .with_context(|| format!("failed to write {}", playlist_path.display()))?;

    let mp4_path = args
        .mp4
        .clone()
        .unwrap_or_else(|| args.out_dir.join("capture.mp4"));
    let status = Command::new("ffmpeg")
        .arg("-hide_banner")
        .arg("-loglevel")
        .arg("error")
        .arg("-nostdin")
        .arg("-y")
        .arg("-protocol_whitelist")
        .arg("file,crypto,data")
        .arg("-i")
        .arg(&playlist_path)
        .arg("-c")
        .arg("copy")
        .arg(&mp4_path)
        .status();
    match status {
        Ok(s) if s.success() => {
            println!("{}", mp4_path.display());
        }
        Ok(s) => {
            eprintln!("ffmpeg remux failed: {s}");
            println!("{}", playlist_path.display());
        }
        Err(err) => {
            eprintln!("failed to run ffmpeg: {err}");
            println!("{}", playlist_path.display());
        }
    }

    Ok(())
}

/// Builds the relay WebSocket URL (`/api/stream/ws`) for a stream and role.
///
/// Trailing slashes on `base` are trimmed. `https://` becomes `wss://`, `http://` becomes
/// `ws://`, an existing `ws://`/`wss://` scheme is kept, and a scheme-less base is assumed to
/// be `wss://`. `stream_id` is percent-encoded; `role` is inserted verbatim.
fn ws_url_for(base: &str, stream_id: &str, role: &str) -> String {
    let base = base.trim_end_matches('/');
    let ws_base = if let Some(rest) = base.strip_prefix("https://") {
        format!("wss://{rest}")
    } else if let Some(rest) = base.strip_prefix("http://") {
        format!("ws://{rest}")
    } else if base.starts_with("ws://") || base.starts_with("wss://") {
        base.to_string()
    } else {
        format!("wss://{base}")
    };
    format!(
        "{ws_base}/api/stream/ws?stream_id={}&role={role}",
        urlencoding::encode(stream_id)
    )
}

/// Sends one encoded object frame over the relay WebSocket.
///
/// The frame is prefixed with its length as a 4-byte big-endian `u32`, and the resulting byte
/// stream is split into pieces of at most `DIRECT_WIRE_CHUNK_BYTES`. Each piece is sent as a
/// binary message whose first byte is `DIRECT_WIRE_TAG_STREAM`.
///
/// # Errors
/// Fails when the frame length does not fit in a `u32` or when a WebSocket send fails.
async fn ws_send_frame(
    ws: &mut tokio_tungstenite::WebSocketStream<
        tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
    >,
    frame: &[u8],
) -> Result<()> {
    let len = u32::try_from(frame.len()).map_err(|_| anyhow!("frame too large"))?;
    let mut stream = Vec::with_capacity(4 + frame.len());
    stream.extend_from_slice(&len.to_be_bytes());
    stream.extend_from_slice(frame);

    for chunk in stream.chunks(DIRECT_WIRE_CHUNK_BYTES) {
        let mut msg = Vec::with_capacity(1 + chunk.len());
        msg.push(DIRECT_WIRE_TAG_STREAM);
        msg.extend_from_slice(chunk);
        ws.send(WsMessage::Binary(msg)).await?;
    }
    Ok(())
}

/// Ingests a source, segments it into CMAF with ffmpeg, and publishes it to the relay WebSocket.
///
/// A background task re-announces the stream to the directory (`/api/announce`) at 3/4 of the
/// announce TTL. A blocking task runs ffmpeg, waits for `init.mp4` and each `segment_NNNNNN.m4s`
/// to become stable on disk, and hands them to this function, which wraps each one in an object
/// frame (with a blake3 hash) and sends it with a 5 second timeout.
///
/// # Errors
/// Fails on directory/HTTP-client/WebSocket setup errors, on a send error or timeout, or when
/// the chunking task reports an error.
async fn ws_publish(args: WsPublishArgs) -> Result<()> {
    fs::create_dir_all(&args.chunk_dir)
        .with_context(|| format!("failed to create {}", args.chunk_dir.display()))?;

    // NOTE(review): the trait-object type was reconstructed from usage; confirm against the
    // `source` module.
    let source: Box<dyn StreamSource> = match args.source {
        IngestSource::Hls { url, .. } => Box::new(HlsSource {
            url,
            mode: HlsMode::Transcode,
        }),
        IngestSource::Hdhr {
            host,
            device_id,
            channel,
            name,
            prefer_mdns,
        } => Box::new(HdhrSource {
            host,
            device_id,
            channel,
            name,
            prefer_mdns,
        }),
        IngestSource::LinuxDvb {
            adapter,
            dvr,
            tune_cmd,
            tune_wait_ms,
        } => Box::new(LinuxDvbSource {
            adapter,
            dvr,
            tune_cmd,
            tune_wait_ms,
        }),
        IngestSource::Ts { input } => Box::new(TsSource { input }),
    };

    fn normalize_base(url: &str) -> String {
        url.trim_end_matches('/').to_string()
    }

    let directory_url = normalize_base(&args.directory_url);
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(8))
        .build()
        .with_context(|| "failed to build http client")?;

    // Default stream id is derived from the current wall-clock time in milliseconds.
    let stream_id = args.stream_id.clone().unwrap_or_else(|| {
        let ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        format!("every.channel/ws/{ts}")
    });
    let title = args.title.clone();

    #[derive(serde::Serialize)]
    struct AnnounceReq<'a> {
        stream_id: &'a str,
        title: &'a str,
        offer: &'a str,
        expires_ms: u64,
    }

    // Directory listing is still "offer"-shaped today; keep a non-empty placeholder for legacy clients.
    let offer_placeholder = format!("every.channel://watch?stream_id={}", stream_id);

    // Refresh listing forever while publishing (one-to-many relay supports multiple viewers).
    let ttl = args.announce_ttl_ms;
    let client2 = client.clone();
    let dir2 = directory_url.clone();
    let stream_id2 = stream_id.clone();
    let title2 = title.clone();
    let offer2 = offer_placeholder.clone();
    tokio::spawn(async move {
        loop {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            let req = AnnounceReq {
                stream_id: &stream_id2,
                title: &title2,
                offer: &offer2,
                expires_ms: now.saturating_add(ttl),
            };
            let url = format!("{dir2}/api/announce");
            // Best effort: announce failures and timeouts are ignored and retried next round.
            let _ = tokio::time::timeout(
                Duration::from_secs(5),
                client2.post(url).json(&req).send(),
            )
            .await;
            tokio::time::sleep(Duration::from_millis(ttl.saturating_mul(3) / 4)).await;
        }
    });

    // Connect to relay websocket as publisher.
    let ws_url = ws_url_for(&directory_url, &stream_id, "pub");
    eprintln!("ws publish: {stream_id}");
    eprintln!("ws url: {ws_url}");
    let (mut ws, _resp) = tokio_tungstenite::connect_async(&ws_url).await?;

    enum ChunkItem {
        Init(Vec<u8>),
        Segment { index: u64, bytes: Vec<u8> },
    }

    let out_dir = args.chunk_dir.join("cmaf");
    let chunk_ms = args.chunk_ms;
    let max_segments = args.max_segments;

    let (tx, mut rx) = mpsc::channel::<ChunkItem>(8);
    let reader = source.open_stream()?;
    let out_dir2 = out_dir.clone();
    let chunk_task = tokio::task::spawn_blocking(move || -> Result<()> {
        let _ = fs::remove_dir_all(&out_dir2);
        fs::create_dir_all(&out_dir2)
            .with_context(|| format!("failed to create {}", out_dir2.display()))?;

        // For browser interop, we currently always normalize into deterministic H.264/AAC CMAF.
        let profile = ec_chopper::deterministic_h264_profile();
        let mut cmd = Command::new("ffmpeg");
        cmd.current_dir(&out_dir2);
        cmd.arg("-hide_banner")
            .arg("-loglevel")
            .arg("error")
            .arg("-nostdin")
            .arg("-y")
            .arg("-i")
            .arg("pipe:0")
            // predictable mapping
            .arg("-map")
            .arg("0:v:0")
            .arg("-map")
            .arg("0:a:0?")
            .arg("-sn")
            .arg("-dn")
            .arg("-map_metadata")
            .arg("-1")
            // reduce scheduling nondeterminism
            .arg("-filter_threads")
            .arg("1")
            .arg("-filter_complex_threads")
            .arg("1")
            .arg("-threads")
            .arg("1");
        for arg in ec_chopper::ffmpeg_profile_args(&profile) {
            cmd.arg(arg);
        }

        let seg_time = format!("{:.3}", chunk_ms as f64 / 1000.0);
        cmd.arg("-f")
            .arg("hls")
            .arg("-hls_time")
            .arg(seg_time)
            .arg("-hls_list_size")
            .arg("0")
            .arg("-hls_segment_type")
            .arg("fmp4")
            .arg("-hls_flags")
            .arg("independent_segments")
            .arg("-hls_fmp4_init_filename")
            .arg("init.mp4")
            .arg("-hls_segment_filename")
            .arg("segment_%06d.m4s")
            .arg("index.m3u8")
            .stdin(Stdio::piped())
            .stdout(Stdio::null())
            .stderr(Stdio::inherit());

        let mut child = cmd.spawn().with_context(|| "failed to spawn ffmpeg")?;
        let mut stdin = child
            .stdin
            .take()
            .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?;

        // Feed the source into ffmpeg's stdin on a dedicated thread.
        let mut reader = reader;
        let writer = std::thread::spawn(move || -> Result<()> {
            std::io::copy(&mut reader, &mut stdin)?;
            Ok(())
        });

        // All fallible work after the spawn lives in this closure so that ffmpeg is always
        // stopped and reaped below, whichever way the closure exits.
        let mut pump = || -> Result<()> {
            let init_path = out_dir2.join("init.mp4");
            wait_for_stable_file(&init_path, Duration::from_secs(20))?;
            let init = fs::read(&init_path)?;
            tx.blocking_send(ChunkItem::Init(init))
                .map_err(|_| anyhow!("receiver closed"))?;

            for i in 0..max_segments {
                let seg_path = out_dir2.join(format!("segment_{i:06}.m4s"));
                if let Err(err) = wait_for_stable_file(&seg_path, Duration::from_secs(30)) {
                    if let Ok(Some(status)) = child.try_wait() {
                        if status.success() {
                            // ffmpeg finished cleanly: the input simply ended.
                            break;
                        }
                        return Err(anyhow!("ffmpeg exited with {status} ({err:#})"));
                    }
                    return Err(err);
                }
                let bytes = fs::read(&seg_path)?;
                // Index 0 is reserved for the init segment, so media segments are 1-based.
                tx.blocking_send(ChunkItem::Segment {
                    index: (i as u64) + 1,
                    bytes,
                })
                .map_err(|_| anyhow!("receiver closed"))?;
            }
            Ok(())
        };
        let outcome = pump();

        // `std::process::Child` is not killed on drop; stop and reap it on every path.
        let _ = child.kill();
        let _ = child.wait();
        if outcome.is_ok() {
            let _ = writer.join();
        }
        outcome
    });

    while let Some(item) = rx.recv().await {
        let (index, content_type, bytes, duration_27mhz, sync_status) = match item {
            ChunkItem::Init(bytes) => (0u64, "video/mp4", bytes, 0u64, "init"),
            ChunkItem::Segment { index, bytes } => {
                // Milliseconds -> 27 MHz ticks.
                (index, "video/iso.segment", bytes, chunk_ms * 27_000, "cmaf")
            }
        };

        if index == 0 {
            tracing::info!("ws send: init.mp4 ({} bytes)", bytes.len());
        } else if index % 10 == 0 {
            tracing::info!("ws send: segment {index} ({} bytes)", bytes.len());
        }

        let hash = blake3::hash(&bytes).to_hex().to_string();
        let meta = ObjectMeta {
            created_unix_ms: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64,
            content_type: content_type.to_string(),
            size_bytes: bytes.len() as u64,
            timing: Some(TimingMeta {
                chunk_index: index,
                chunk_start_27mhz: 0,
                chunk_duration_27mhz: duration_27mhz,
                utc_start_unix: None,
                sync_status: sync_status.to_string(),
            }),
            encryption: None,
            chunk_hash: Some(hash),
            chunk_hash_alg: Some("blake3".to_string()),
            chunk_proof: None,
            chunk_proof_alg: None,
            manifest_id: None,
        };
        let frame = encode_object_frame(&meta, &bytes)?;
        // Outer `?` is the timeout, inner `?` is the send result.
        tokio::time::timeout(Duration::from_secs(5), ws_send_frame(&mut ws, &frame)).await??;
    }

    chunk_task
        .await
        .map_err(|err| anyhow!("chunk task join error: {err}"))?
}

/// Subscribes to the relay WebSocket, captures CMAF fragments, and remuxes them to an mp4.
///
/// Writes `init.mp4` and `segment_NNNNNN.m4s` files under `<out_dir>/cmaf`, then a VOD
/// `index.m3u8` that lists the captured segments in arrival order, and finally asks ffmpeg to
/// remux the playlist into `args.mp4` (default `<out_dir>/capture.mp4`). Prints the mp4 path on
/// success and the playlist path when ffmpeg cannot be run or fails.
///
/// Capture stops after `args.max_segments` segments, when the optional `args.duration_secs`
/// deadline passes (also while waiting on the socket), or on a WebSocket close frame.
///
/// # Errors
/// Fails on I/O or decode errors, when the socket ends without a close frame, or when no init
/// segment or no media segment was captured.
async fn ws_subscribe(args: WsSubscribeArgs) -> Result<()> {
    fs::create_dir_all(&args.out_dir)
        .with_context(|| format!("failed to create {}", args.out_dir.display()))?;

    let ws_url = ws_url_for(&args.directory_url, &args.stream_id, "sub");
    eprintln!("ws subscribe: {} ({ws_url})", args.stream_id);
    let (mut ws, _resp) = tokio_tungstenite::connect_async(&ws_url).await?;

    let cmaf_dir = args.out_dir.join("cmaf");
    fs::create_dir_all(&cmaf_dir)
        .with_context(|| format!("failed to create {}", cmaf_dir.display()))?;

    let mut init_written = false;
    // (file name, duration in seconds) of every captured segment, in arrival order. File names
    // follow the publisher's chunk index, so a subscriber that joins mid-stream does not start
    // at segment_000000; the playlist must list the names actually written.
    let mut segments = Vec::<(String, f64)>::new();
    let mut decoder = DirectWireDecoder::default();

    let deadline = args
        .duration_secs
        .map(|s| Instant::now() + Duration::from_secs(s));

    while segments.len() < args.max_segments {
        // Bound the wait by the remaining capture time so a stalled relay cannot hold us past
        // the requested duration.
        let next = match deadline {
            Some(d) => {
                let remaining = d.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    break;
                }
                match tokio::time::timeout(remaining, ws.next()).await {
                    Ok(next) => next,
                    Err(_elapsed) => break,
                }
            }
            None => ws.next().await,
        };
        let bytes = match next.ok_or_else(|| anyhow!("websocket closed"))?? {
            WsMessage::Binary(b) => b,
            WsMessage::Close(_) => break,
            _ => continue,
        };

        for frame in decoder.push(&bytes)? {
            let payload = decode_object_frame(&frame)?;
            let timing = payload.meta.timing.as_ref();
            // Frames without timing metadata are treated as index 0, i.e. the init segment.
            let idx = timing.map(|t| t.chunk_index).unwrap_or(0);
            if idx == 0 {
                tracing::info!("ws recv: init.mp4 ({} bytes)", payload.data.len());
                let path = cmaf_dir.join("init.mp4");
                fs::write(&path, &payload.data)
                    .with_context(|| format!("failed to write {}", path.display()))?;
                init_written = true;
                continue;
            }
            if !init_written {
                // Media segments are unusable until an init segment has been captured.
                continue;
            }

            let file_name = format!("segment_{:06}.m4s", idx.saturating_sub(1));
            let path = cmaf_dir.join(&file_name);
            fs::write(&path, &payload.data)
                .with_context(|| format!("failed to write {}", path.display()))?;
            if idx % 10 == 0 {
                tracing::info!("ws recv: segment {idx} ({} bytes)", payload.data.len());
            }

            // 27 MHz ticks -> seconds; fall back to 2.0 s when timing is absent.
            let dur = timing
                .map(|t| t.chunk_duration_27mhz as f64 / 27_000_000.0)
                .unwrap_or(2.0);
            segments.push((file_name, dur));
            if segments.len() >= args.max_segments {
                break;
            }
        }
    }

    if !init_written || segments.is_empty() {
        return Err(anyhow!("no media captured (missing init or segments)"));
    }

    // Target duration: the longest segment rounded up, at least 1.
    let target = segments
        .iter()
        .map(|(_, dur)| *dur)
        .fold(0.0_f64, f64::max)
        .ceil()
        .max(1.0) as u64;
    let mut m3u8 = String::new();
    m3u8.push_str("#EXTM3U\n");
    m3u8.push_str("#EXT-X-VERSION:7\n");
    m3u8.push_str(&format!("#EXT-X-TARGETDURATION:{target}\n"));
    m3u8.push_str("#EXT-X-PLAYLIST-TYPE:VOD\n");
    m3u8.push_str("#EXT-X-INDEPENDENT-SEGMENTS\n");
    m3u8.push_str("#EXT-X-MAP:URI=\"init.mp4\"\n");
    for (file_name, dur) in &segments {
        m3u8.push_str(&format!("#EXTINF:{:.3},\n", dur));
        m3u8.push_str(file_name);
        m3u8.push('\n');
    }
    m3u8.push_str("#EXT-X-ENDLIST\n");
    let playlist_path = cmaf_dir.join("index.m3u8");
    fs::write(&playlist_path, m3u8)
        .with_context(|| format!("failed to write {}", playlist_path.display()))?;

    let mp4_path = args
        .mp4
        .clone()
        .unwrap_or_else(|| args.out_dir.join("capture.mp4"));
    let status = Command::new("ffmpeg")
        .arg("-hide_banner")
        .arg("-loglevel")
        .arg("error")
        .arg("-nostdin")
        .arg("-y")
        .arg("-protocol_whitelist")
        .arg("file,crypto,data")
        .arg("-i")
        .arg(&playlist_path)
        .arg("-c")
        .arg("copy")
        .arg(&mp4_path)
        .status();
    // A failed remux is not fatal: the playlist is still a usable capture.
    match status {
        Ok(s) if s.success() => {
            println!("{}", mp4_path.display());
        }
        Ok(s) => {
            eprintln!("ffmpeg remux failed: {s}");
            println!("{}", playlist_path.display());
        }
        Err(err) => {
            eprintln!("failed to run ffmpeg: {err}");
            println!("{}", playlist_path.display());
        }
    }

    Ok(())
}

/// Resolves the iroh secret key from `value`, falling back to the `IROH_SECRET` env variable.
///
/// Returns `Ok(None)` when neither is set.
///
/// # Errors
/// Fails when the provided string cannot be parsed as an `iroh::SecretKey`.
fn parse_iroh_secret(value: Option<String>) -> Result<Option<iroh::SecretKey>> {
    let value = value.or_else(|| std::env::var("IROH_SECRET").ok());
    let Some(value) = value else {
        return Ok(None);
    };
    let secret =
        iroh::SecretKey::from_str(&value).with_context(|| "failed to parse IROH_SECRET")?;
    Ok(Some(secret))
}

/// Builds the discovery configuration from an explicit list, or from the environment when
/// `value` is `None`.
fn parse_discovery(value: Option<&str>) -> Result<DiscoveryConfig> {
    match value {
        Some(value) => DiscoveryConfig::from_list(value),
        None => DiscoveryConfig::from_env(),
    }
}

/// Spawns a background task that announces this stream on the catalog gossip.
///
/// The task joins the gossip with `peers`, announces an entry without a manifest, and then
/// re-announces with every manifest summary received on the returned sender. Join and announce
/// failures are logged as warnings; the task ends when every sender is dropped.
///
// NOTE(review): the element type of `peers` and the channel payload type were reconstructed
// from usage; confirm them against `ec_iroh::CatalogGossip::join` and the callers.
async fn spawn_catalog_announcer(
    node: &MoqNode,
    track: &TrackName,
    broadcast_name: &str,
    track_name: &str,
    peers: Vec<String>,
) -> Result<tokio::sync::mpsc::UnboundedSender<ManifestSummary>> {
    // Prefer the JSON-encoded endpoint address; fall back to the bare endpoint id.
    let endpoint = serde_json::to_string(&node.endpoint_addr())
        .unwrap_or_else(|_| node.endpoint().id().to_string());
    let track = track.clone();
    let broadcast_name = broadcast_name.to_string();
    let track_name = track_name.to_string();
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ManifestSummary>();
    let endpoint_clone = endpoint.clone();
    let node_endpoint = node.endpoint().clone();

    tokio::spawn(async move {
        let mut gossip = match ec_iroh::CatalogGossip::join(node_endpoint, &peers).await {
            Ok(gossip) => gossip,
            Err(err) => {
                tracing::warn!("catalog gossip join failed: {err:#}");
                return;
            }
        };

        let entry = build_catalog_entry(
            endpoint_clone.clone(),
            &track,
            &broadcast_name,
            &track_name,
            None,
        );
        if let Err(err) = gossip.announce(entry).await {
            tracing::warn!("catalog announce failed: {err:#}");
        }

        while let Some(summary) = rx.recv().await {
            let entry = build_catalog_entry(
                endpoint_clone.clone(),
                &track,
                &broadcast_name,
                &track_name,
                Some(summary),
            );
            if let Err(err) = gossip.announce(entry).await {
                tracing::warn!("catalog update failed: {err:#}");
            }
        }
    });

    Ok(tx)
}

/// Assembles the catalog entry for a MoQ stream, stamped with the current time in Unix ms.
///
/// The track name doubles as stream id, title, and encryption key id; the broadcast name is
/// stored as a `broadcast` metadata item.
fn build_catalog_entry(
    endpoint: String,
    track: &TrackName,
    broadcast_name: &str,
    track_name: &str,
    manifest: Option<ManifestSummary>,
) -> StreamCatalogEntry {
    let stream = StreamDescriptor {
        id: StreamId(track.name.clone()),
        title: track.name.clone(),
        number: None,
        source: "moq".to_string(),
        metadata: vec![StreamMetadata {
            key: "broadcast".to_string(),
            value: broadcast_name.to_string(),
        }],
    };
    let encryption = StreamEncryptionInfo {
        alg: ENCRYPTION_ALG.to_string(),
        key_id: track.name.clone(),
        nonce_scheme: "stream-id+chunk-index".to_string(),
    };
    let moq = MoqStreamDescriptor {
        endpoint,
        broadcast_name: broadcast_name.to_string(),
        track_name: track_name.to_string(),
        encryption: Some(encryption),
    };
    StreamCatalogEntry {
        stream,
        moq: Some(moq),
        manifest,
        updated_unix_ms: SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64,
    }
}

/// Blocks until `path` exists, is non-empty, and has kept the same size for three consecutive
/// polls after the size was first seen (polling every 100 ms).
///
/// This blocks the calling thread with `std::thread::sleep`.
///
/// # Errors
/// Fails when the file has not become stable within `timeout`.
fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
    let start = Instant::now();
    let mut last_len: Option<u64> = None;
    let mut stable_ms: u64 = 0;

    // We want "write is done" behavior. File size staying constant for a few polls
    // is a pragmatic signal that ffmpeg is done producing the segment.
    while start.elapsed() < timeout {
        if let Ok(meta) = fs::metadata(path) {
            let len = meta.len();
            if len > 0 {
                if Some(len) == last_len {
                    stable_ms += 100;
                    if stable_ms >= 300 {
                        return Ok(());
                    }
                } else {
                    // Size changed (or first sighting): restart the stability window.
                    last_len = Some(len);
                    stable_ms = 0;
                }
            }
        }
        std::thread::sleep(Duration::from_millis(100));
    }

    Err(anyhow!(
        "timed out waiting for stable file {} after {:?}",
        path.display(),
        timeout
    ))
}

/// Publishes an fMP4 stream produced by ffmpeg to a MoQ relay over WebTransport.
///
/// Connects to `args.url`, spawns ffmpeg on `args.input` (re-encoding when `args.transcode` is
/// set, stream copy otherwise), and feeds ffmpeg's stdout into the moq-mux fMP4 importer until
/// ingest ends, the relay session closes, or ctrl-c is received.
///
/// # Errors
/// Fails on an invalid relay URL, connection/SETUP failure, ffmpeg spawn failure, a non-zero
/// ffmpeg exit, an ingest error, or when the relay session closes.
async fn wt_publish(args: WtPublishArgs) -> Result<()> {
    let relay_url =
        Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?;

    // Create a local origin + broadcast, then pass an OriginConsumer into the client so it can
    // publish announcements to the relay.
    let origin = moq_lite::Origin::produce();
    let publish = origin.consume();

    let mut broadcast = origin
        .create_broadcast(&args.name)
        .ok_or_else(|| anyhow!("failed to create broadcast: {}", args.name))?;

    // Ensure the catalog track is present in the broadcast so subscribers can discover tracks.
    let mut catalog = hang::CatalogProducer::default();
    broadcast.insert_track(catalog.track.clone());

    /// Session wrapper that reports a chosen subprotocol and forwards everything else.
    #[derive(Clone)]
    struct ProtocolOverride<S> {
        inner: S,
        protocol: Option<String>,
    }

    // NOTE(review): the `Future<Output = …>` types below were reconstructed; confirm them
    // against the `web_transport_trait::Session` definition.
    impl<S: web_transport_trait::Session> web_transport_trait::Session for ProtocolOverride<S> {
        type SendStream = S::SendStream;
        type RecvStream = S::RecvStream;
        type Error = S::Error;

        fn accept_uni(
            &self,
        ) -> impl Future<Output = Result<Self::RecvStream, Self::Error>>
               + web_transport_trait::MaybeSend {
            self.inner.accept_uni()
        }

        fn accept_bi(
            &self,
        ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>>
               + web_transport_trait::MaybeSend {
            self.inner.accept_bi()
        }

        fn open_bi(
            &self,
        ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>>
               + web_transport_trait::MaybeSend {
            self.inner.open_bi()
        }

        fn open_uni(
            &self,
        ) -> impl Future<Output = Result<Self::SendStream, Self::Error>>
               + web_transport_trait::MaybeSend {
            self.inner.open_uni()
        }

        fn send_datagram(&self, payload: bytes::Bytes) -> Result<(), Self::Error> {
            self.inner.send_datagram(payload)
        }

        fn recv_datagram(
            &self,
        ) -> impl Future<Output = Result<bytes::Bytes, Self::Error>> + web_transport_trait::MaybeSend
        {
            self.inner.recv_datagram()
        }

        fn max_datagram_size(&self) -> usize {
            self.inner.max_datagram_size()
        }

        fn protocol(&self) -> Option<&str> {
            // The override wins; otherwise report whatever the inner session negotiated.
            self.protocol
                .as_deref()
                .or_else(|| self.inner.protocol())
        }

        fn close(&self, code: u32, reason: &str) {
            self.inner.close(code, reason)
        }

        fn closed(&self) -> impl Future<Output = Self::Error> + web_transport_trait::MaybeSend {
            self.inner.closed()
        }
    }

    /// Opens QUIC + WebTransport to the relay and performs MoQ SETUP, trying several protocol
    /// overrides in turn and returning the first session that connects.
    // NOTE(review): the return type was reconstructed as `moq_lite::Session`; confirm.
    async fn connect_moq_session(
        relay_url: &Url,
        publish: moq_lite::OriginConsumer,
        tls_disable_verify: bool,
    ) -> Result<moq_lite::Session> {
        let host = relay_url
            .host_str()
            .ok_or_else(|| anyhow!("relay url missing host: {relay_url}"))?
            .to_string();
        let port = relay_url.port().unwrap_or(443);

        // Build TLS config.
        let mut roots = rustls::RootCertStore::empty();
        let native = rustls_native_certs::load_native_certs();
        if !native.errors.is_empty() {
            tracing::warn!(
                errors = ?native.errors,
                "some native root certs could not be loaded"
            );
        }
        for cert in native.certs {
            // Certificates the store rejects are skipped.
            let _ = roots.add(cert);
        }

        let mut tls = rustls::ClientConfig::builder()
            .with_root_certificates(roots)
            .with_no_client_auth();

        if tls_disable_verify {
            // Mirror moq-native's behavior: accept any certificate, but still verify signatures.
            #[derive(Debug)]
            struct NoCertificateVerification(Arc<rustls::crypto::CryptoProvider>);

            impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
                fn verify_server_cert(
                    &self,
                    _end_entity: &rustls::pki_types::CertificateDer<'_>,
                    _intermediates: &[rustls::pki_types::CertificateDer<'_>],
                    _server_name: &rustls::pki_types::ServerName<'_>,
                    _ocsp: &[u8],
                    _now: rustls::pki_types::UnixTime,
                ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
                    Ok(rustls::client::danger::ServerCertVerified::assertion())
                }

                fn verify_tls12_signature(
                    &self,
                    message: &[u8],
                    cert: &rustls::pki_types::CertificateDer<'_>,
                    dss: &rustls::DigitallySignedStruct,
                ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
                {
                    rustls::crypto::verify_tls12_signature(
                        message,
                        cert,
                        dss,
                        &self.0.signature_verification_algorithms,
                    )
                }

                fn verify_tls13_signature(
                    &self,
                    message: &[u8],
                    cert: &rustls::pki_types::CertificateDer<'_>,
                    dss: &rustls::DigitallySignedStruct,
                ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
                {
                    rustls::crypto::verify_tls13_signature(
                        message,
                        cert,
                        dss,
                        &self.0.signature_verification_algorithms,
                    )
                }

                fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
                    self.0.signature_verification_algorithms.supported_schemes()
                }
            }

            let provider = rustls::crypto::CryptoProvider::get_default()
                .cloned()
                .unwrap_or_else(|| Arc::new(rustls::crypto::ring::default_provider()));
            tls.dangerous()
                .set_certificate_verifier(Arc::new(NoCertificateVerification(provider)));
        }

        // WebTransport over HTTP/3.
        tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()];

        // Build a Quinn endpoint.
        let socket =
            std::net::UdpSocket::bind("[::]:0").context("failed to bind UDP socket")?;
        let mut transport = quinn::TransportConfig::default();
        transport.max_idle_timeout(Some(
            Duration::from_secs(10)
                .try_into()
                .expect("10s is a valid QUIC idle timeout"),
        ));
        transport.keep_alive_interval(Some(Duration::from_secs(4)));
        transport.mtu_discovery_config(None);
        let transport = Arc::new(transport);
        let runtime = quinn::default_runtime().context("no async runtime")?;
        let endpoint_config = quinn::EndpointConfig::default();
        let endpoint = quinn::Endpoint::new(endpoint_config, None, socket, runtime)
            .context("failed to create QUIC endpoint")?;

        // Resolve relay.
        let ip = tokio::net::lookup_host((host.clone(), port))
            .await
            .context("failed DNS lookup")?
            .next()
            .context("no DNS entries")?;

        let quic: quinn::crypto::rustls::QuicClientConfig = tls.try_into()?;
        let mut client_cfg = quinn::ClientConfig::new(Arc::new(quic));
        client_cfg.transport_config(transport);

        tracing::debug!(%ip, %host, %relay_url, "connecting QUIC");
        let connection = endpoint
            .connect_with(client_cfg, ip, &host)?
            .await
            .context("failed QUIC connect")?;

        // Establish a WebTransport session.
        let mut request = web_transport_quinn::proto::ConnectRequest::new(relay_url.clone());
        for alpn in moq_lite::ALPNS {
            request = request.with_protocol(alpn.to_string());
        }
        let wt = web_transport_quinn::Session::connect(connection, request)
            .await
            .context("failed WebTransport CONNECT")?;

        // Establish a MoQ session. Cloudflare's relay currently does not always include a selected
        // subprotocol in the CONNECT response, so we attempt a few protocol overrides to select
        // the correct IETF draft encoding for SETUP.
        let client = moq_lite::Client::new().with_publish(publish);

        // These correspond to IETF draft ALPNs as used by moq-lite/web code.
        // We use string literals here since moq-lite does not currently expose these constants.
        // The empty string means "no override".
        let attempts: [&str; 4] = ["moqt-16", "moqt-15", "moq-00", ""];
        let mut last_err: Option<anyhow::Error> = None;
        for p in attempts {
            let session = ProtocolOverride {
                inner: wt.clone(),
                protocol: (!p.is_empty()).then(|| p.to_string()),
            };
            match client.connect(session).await {
                Ok(session) => {
                    tracing::info!(protocol = %p, "connected to relay");
                    return Ok(session);
                }
                Err(err) => {
                    tracing::debug!(protocol = %p, err = %err, "MoQ SETUP failed; retrying");
                    last_err = Some(anyhow::Error::new(err));
                }
            }
        }

        Err(last_err.unwrap_or_else(|| anyhow!("failed to connect")))
            .context("failed MoQ SETUP")
    }

    tracing::info!(url=%relay_url, name=%args.name, "connecting to relay");
    let session = connect_moq_session(&relay_url, publish, args.tls_disable_verify).await?;

    // Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux.
    let mut cmd = TokioCommand::new("ffmpeg");
    cmd.arg("-hide_banner")
        .arg("-loglevel")
        .arg("error")
        .arg("-nostats")
        // Same as the other ffmpeg invocations in this file: no interactive stdin handling.
        .arg("-nostdin")
        .arg("-fflags")
        .arg("+nobuffer")
        .arg("-flags")
        .arg("low_delay")
        .arg("-i")
        .arg(&args.input);

    if args.transcode {
        cmd.args([
            "-c:v",
            "libx264",
            "-preset",
            "veryfast",
            "-tune",
            "zerolatency",
            "-pix_fmt",
            "yuv420p",
            "-profile:v",
            "main",
            "-g",
            "48",
            "-keyint_min",
            "48",
            "-sc_threshold",
            "0",
            "-threads",
            "1",
            "-c:a",
            "aac",
            "-b:a",
            "128k",
            "-ac",
            "2",
            "-ar",
            "48000",
        ]);
    } else {
        cmd.args(["-c", "copy"]);
    }

    cmd.args([
        "-f",
        "mp4",
        "-movflags",
        "empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset",
        "pipe:1",
    ]);

    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::inherit());

    tracing::info!(input=%args.input, "spawning ffmpeg");
    let mut child = cmd.spawn().context("failed to spawn ffmpeg")?;
    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| anyhow!("ffmpeg stdout unavailable"))?;

    let config = moq_mux::import::Fmp4Config {
        passthrough: args.passthrough,
    };
    let mut importer = moq_mux::import::Fmp4::new(broadcast, catalog, config);

    let mut stdout = stdout;
    let mut decode_fut = importer.decode_from(&mut stdout);
    tokio::pin!(decode_fut);

    tracing::info!("publishing fMP4 -> moq-mux -> relay");
    tokio::select! {
        res = &mut decode_fut => match res {
            Ok(()) => {
                let status = child.wait().await.context("failed to wait for ffmpeg")?;
                if status.success() {
                    Ok(())
                } else {
                    Err(anyhow!("ffmpeg exited with {status}"))
                }
            }
            Err(err) => {
                // ffmpeg may still be running and blocked on its stdout pipe, so waiting for
                // it here could hang; stop it before reporting the ingest error.
                let _ = child.kill().await;
                Err(err).context("fmp4 ingest failed")
            }
        },
        _ = session.closed() => {
            let _ = child.kill().await;
            Err(anyhow!("relay session closed"))
        }
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("ctrl-c; shutting down");
            let _ = child.kill().await;
            tokio::time::sleep(Duration::from_millis(100)).await;
            Ok(())
        }
    }
}