every.channel/crates/ec-node/src/main.rs
2026-02-18 01:49:04 -08:00

4578 lines
159 KiB
Rust

//! Node runner: orchestrates ingest, chunking, and MoQ publication.
mod source;
use anyhow::{anyhow, Context, Result};
use blake3;
use clap::ValueEnum;
use clap::{Parser, Subcommand};
use ec_chopper::{build_manifest_body_for_chunks, TsChunk};
use ec_core::{
merkle_proof_for_index, verify_merkle_proof, Manifest, ManifestSummary, ManifestVariant,
MoqStreamDescriptor, StreamCatalogEntry, StreamDescriptor, StreamEncryptionInfo, StreamId,
StreamKey, StreamMetadata,
};
use ec_crypto::{
decrypt_stream_data, encrypt_stream_data, load_manifest_keypair_from_env, sign_manifest_id,
verify_manifest_signature, ENCRYPTION_ALG,
};
use ec_direct::{decode_direct_link, encode_direct_link, DirectCodeV1};
use ec_iroh::DiscoveryConfig;
use ec_moq::{
chunk_duration_secs, decode_object_frame, encode_object_frame, FileRelay, GroupId, HlsWriter,
MoqNode, MoqPublishSet, ObjectId, ObjectMeta, ObjectPayload, TimingMeta, TrackName,
DEFAULT_MANIFEST_TRACK_NAME, DEFAULT_TRACK_NAME,
};
use iroh::Watcher;
use just_webrtc::types::{DataChannelOptions, ICEServer, PeerConfiguration, PeerConnectionState};
use just_webrtc::{DataChannelExt, PeerConnectionBuilder, PeerConnectionExt};
use source::{HdhrSource, HlsMode, HlsSource, LinuxDvbSource, StreamSource, TsSource};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs;
use std::fs::File;
use std::future::Future;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio_tungstenite::tungstenite::Message as WsMessage;
use futures_util::{SinkExt, StreamExt};
use tokio::process::Command as TokioCommand;
use url::Url;
// Wire-format tag bytes for messages on the direct WebRTC data channel
// (frame payload, stream metadata, keepalive ping — per the names; the
// encode/decode sites are outside this view).
const DIRECT_WIRE_TAG_FRAME: u8 = 0x00;
const DIRECT_WIRE_TAG_STREAM: u8 = 0x01;
const DIRECT_WIRE_TAG_PING: u8 = 0x02;
// NOTE(review): presumably the max silence window before a direct peer is
// considered gone — confirm against the heartbeat loop elsewhere in this file.
const DIRECT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(8);
// Conservatively under typical SCTP data channel max message sizes.
const DIRECT_WIRE_CHUNK_BYTES: usize = 16 * 1024;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
// Top-level CLI: `ec-node <subcommand> ...`; dispatched in `main`.
#[derive(Parser, Debug)]
#[command(name = "ec-node")]
#[command(about = "every.channel node runner", long_about = None)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}
// One variant per runnable mode. The `///` docs below are clap help text.
#[derive(Subcommand, Debug)]
enum Commands {
    /// Ingest a source and publish MoQ objects into a local relay directory.
    Ingest(IngestArgs),
    /// Ingest a source and publish via MoQ over iroh.
    MoqPublish(MoqPublishArgs),
    /// Subscribe to a MoQ stream and write HLS segments locally.
    MoqSubscribe(MoqSubscribeArgs),
    /// Publish and subscribe to a MoQ stream locally, verifying chunk hashes.
    MoqSelftest(MoqSelftestArgs),
    /// Publish a stream over a direct WebRTC data channel (manual copy/paste connect code).
    DirectPublish(DirectPublishArgs),
    /// Subscribe to a direct WebRTC stream (directory or offer link) and optionally capture an .mp4 proof.
    DirectSubscribe(DirectSubscribeArgs),
    /// Publish a stream to the global one-to-many relay (`/api/stream/ws`) and keep the directory entry live.
    WsPublish(WsPublishArgs),
    /// Subscribe to the global one-to-many relay (`/api/stream/ws`) and capture CMAF fragments + an mp4 proof.
    WsSubscribe(WsSubscribeArgs),
    /// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default).
    WtPublish(WtPublishArgs),
}
// Encoding/container selection for publish/subscribe paths. Single variant
// today — kept as an enum so new modes extend the CLI without breaking flags.
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
enum EncodeMode {
    /// Publish/subscribe CMAF-style fragmented MP4 segments (HLS fMP4) encoded with x264/AAC.
    Cmaf,
}
// Named encoding-ladder presets; concrete rungs come from `cmaf_ladder_variants`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
enum CmafLadderPreset {
    /// 3-rung ladder: 1080p@6000k, 720p@3000k, 480p@1200k (CBR-ish).
    Hd3,
}
// Arguments for `ec-node ingest` (file-relay publication; see `ingest`).
#[derive(Parser, Debug)]
struct IngestArgs {
    /// Output directory for temporary chunks.
    #[arg(long, default_value = "./tmp/chunks")]
    chunk_dir: PathBuf,
    /// Relay directory to write MoQ objects.
    #[arg(long, default_value = "./tmp/relay")]
    relay_dir: PathBuf,
    /// Chunk duration in ms.
    #[arg(long, default_value_t = 2000)]
    chunk_ms: u64,
    /// Maximum number of chunks to write.
    #[arg(long)]
    max_chunks: Option<usize>,
    /// Optional stream id override.
    #[arg(long)]
    stream_id: Option<String>,
    /// Optional network secret (hex) for stream encryption.
    #[arg(long)]
    network_secret: Option<String>,
    /// Enable deterministic transcode before chunking.
    // Also switchable via the EVERY_CHANNEL_DETERMINISTIC env var (see deterministic_enabled).
    #[arg(long)]
    deterministic: bool,
    #[command(subcommand)]
    source: IngestSource,
}
// Arguments for `ec-node moq-publish` (MoQ over iroh; see `moq_publish`).
#[derive(Parser, Debug)]
struct MoqPublishArgs {
    /// Output directory for temporary chunks.
    #[arg(long, default_value = "./tmp/chunks")]
    chunk_dir: PathBuf,
    /// Chunk duration in ms.
    #[arg(long, default_value_t = 2000)]
    chunk_ms: u64,
    /// Maximum number of chunks to write.
    #[arg(long)]
    max_chunks: Option<usize>,
    /// Optional stream id override.
    #[arg(long)]
    stream_id: Option<String>,
    /// Optional network secret (hex) for stream encryption.
    #[arg(long)]
    network_secret: Option<String>,
    /// Enable deterministic transcode before chunking.
    #[arg(long)]
    deterministic: bool,
    /// Broadcast name override (defaults to stream id).
    #[arg(long)]
    broadcast_name: Option<String>,
    /// Track name override.
    #[arg(long, default_value = DEFAULT_TRACK_NAME)]
    track_name: String,
    /// Publish chunk objects on the main track.
    // `ArgAction::Set` makes this an explicit `--publish-chunks=true|false` flag.
    #[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
    publish_chunks: bool,
    /// Publish manifests alongside chunks.
    #[arg(long)]
    publish_manifests: bool,
    /// Track name for manifest frames.
    #[arg(long, default_value = DEFAULT_MANIFEST_TRACK_NAME)]
    manifest_track: String,
    /// Number of chunks per manifest epoch.
    #[arg(long, default_value_t = 1)]
    epoch_chunks: usize,
    /// Optional iroh secret key (hex).
    #[arg(long)]
    iroh_secret: Option<String>,
    /// Discovery modes to enable (comma-separated: dht, mdns, dns).
    #[arg(long)]
    discovery: Option<String>,
    /// Announce catalog entries over iroh-gossip (requires peers).
    #[arg(long)]
    announce: bool,
    /// Gossip peers to connect to (repeatable).
    #[arg(long)]
    gossip_peer: Vec<String>,
    /// Optional startup delay (ms) after binding/publishing tracks, before ingest begins.
    /// Useful for E2E tests that need time to connect subscribers.
    #[arg(long)]
    startup_delay_ms: Option<u64>,
    /// Encoding/container mode.
    #[arg(long, value_enum, default_value_t = EncodeMode::Cmaf)]
    encode: EncodeMode,
    /// Track name for CMAF init segment objects.
    #[arg(long, default_value = "init")]
    init_track: String,
    /// Publish a CMAF ladder (multiple quality variants) using x264/AAC.
    #[arg(long, value_enum)]
    cmaf_ladder: Option<CmafLadderPreset>,
    #[command(subcommand)]
    source: IngestSource,
}
// Arguments for `ec-node moq-subscribe` (subscriber side of MoQ over iroh,
// with anti-junk rate limits and manifest-based chunk verification).
#[derive(Parser, Debug)]
struct MoqSubscribeArgs {
    /// Output directory for HLS segments.
    #[arg(long, default_value = "./tmp/moq-hls")]
    output_dir: PathBuf,
    /// Fallback chunk duration in ms.
    #[arg(long, default_value_t = 2000)]
    chunk_ms: u64,
    /// HLS window size.
    #[arg(long, default_value_t = 6)]
    window: usize,
    /// Optional stream id override (for decryption).
    #[arg(long)]
    stream_id: Option<String>,
    /// Optional network secret (hex) for stream decryption.
    #[arg(long)]
    network_secret: Option<String>,
    /// Remote endpoint address (iroh EndpointAddr).
    #[arg(long)]
    remote: String,
    /// Optional remote endpoint address to fetch manifests from.
    /// Defaults to `--remote` when not provided.
    #[arg(long)]
    remote_manifests: Option<String>,
    /// Broadcast name to subscribe to.
    #[arg(long)]
    broadcast_name: String,
    /// Track name to subscribe to.
    #[arg(long, default_value = DEFAULT_TRACK_NAME)]
    track_name: String,
    /// Subscribe to manifest frames.
    #[arg(long)]
    subscribe_manifests: bool,
    /// Require a manifest to accept chunk data.
    #[arg(long)]
    require_manifest: bool,
    /// Track name for manifest frames.
    #[arg(long, default_value = DEFAULT_MANIFEST_TRACK_NAME)]
    manifest_track: String,
    /// Allowed manifest signer ids (comma-separated).
    // Parsed by `parse_manifest_allowlist`; also accepts ';' and whitespace separators.
    #[arg(long)]
    manifest_signers: Option<String>,
    /// Maximum bytes per second to accept (anti-junk).
    #[arg(long)]
    max_bytes_per_sec: Option<u64>,
    /// Maximum burst bytes before throttling.
    #[arg(long)]
    max_bytes_burst: Option<u64>,
    /// Maximum invalid chunks before disconnect.
    #[arg(long, default_value_t = 1)]
    max_invalid_chunks: u32,
    /// Stop after writing this many segments (useful for E2E tests).
    #[arg(long)]
    stop_after: Option<u64>,
    /// Optional iroh secret key (hex).
    #[arg(long)]
    iroh_secret: Option<String>,
    /// Discovery modes to enable (comma-separated: dht, mdns, dns).
    #[arg(long)]
    discovery: Option<String>,
    /// Container mode for local HLS output.
    #[arg(long, value_enum, default_value_t = EncodeMode::Cmaf)]
    container: EncodeMode,
    /// Track name to subscribe to for CMAF init segment objects.
    #[arg(long, default_value = "init")]
    init_track: String,
    /// Subscribe to the init segment track (CMAF).
    #[arg(long)]
    subscribe_init: bool,
    /// Write raw CMAF init+segments (no HLS playlist) to `--output-dir`.
    #[arg(long)]
    raw_cmaf: bool,
}
// Arguments for `ec-node moq-selftest` (local publish + subscribe loopback
// that verifies chunk hashes end to end).
#[derive(Parser, Debug)]
struct MoqSelftestArgs {
    /// Input TS file or URL (e.g. http://HDHR_HOST/auto/v8.1).
    input: String,
    /// Output directory for temporary chunks.
    #[arg(long, default_value = "./tmp/moq-selftest")]
    chunk_dir: PathBuf,
    /// Chunk duration in ms.
    #[arg(long, default_value_t = 2000)]
    chunk_ms: u64,
    /// Maximum number of chunks to publish/verify.
    #[arg(long, default_value_t = 8)]
    max_chunks: usize,
    /// Optional stream id override.
    #[arg(long)]
    stream_id: Option<String>,
    /// Track name override.
    #[arg(long, default_value = DEFAULT_TRACK_NAME)]
    track_name: String,
    /// Discovery modes to enable (comma-separated: dht, mdns, dns).
    #[arg(long)]
    discovery: Option<String>,
}
// Arguments for `ec-node direct-publish` (one-to-one WebRTC data channel;
// answer arrives via the directory, a flag, or interactive stdin).
#[derive(Parser, Debug)]
struct DirectPublishArgs {
    /// Global stream id (used for directory/signaling). If omitted, a fresh id is generated.
    #[arg(long)]
    stream_id: Option<String>,
    /// Human-friendly title (used for directory listing).
    #[arg(long, default_value = "Live channel")]
    title: String,
    /// Optional directory base URL (e.g. https://every.channel). When set, the publisher
    /// announces the offer to `/api/announce` and polls `/api/answer`.
    #[arg(long)]
    directory_url: Option<String>,
    /// Offer TTL (ms) when announcing to the directory.
    #[arg(long, default_value_t = 20000)]
    announce_ttl_ms: u64,
    /// Output directory for temporary chunks.
    #[arg(long, default_value = "./tmp/direct-chunks")]
    chunk_dir: PathBuf,
    /// Chunk duration in ms.
    #[arg(long, default_value_t = 2000)]
    chunk_ms: u64,
    /// Maximum number of media segments to send.
    #[arg(long, default_value_t = 30)]
    max_segments: usize,
    /// Optional answer code/link to avoid interactive stdin.
    #[arg(long)]
    answer: Option<String>,
    /// How long to wait for a browser answer when using `--directory-url` or stdin (seconds).
    /// Set to 0 to wait indefinitely.
    #[arg(long, default_value_t = 900)]
    answer_timeout_secs: u64,
    /// Ingest source.
    #[command(subcommand)]
    source: IngestSource,
}
// Arguments for `ec-node direct-subscribe` (consume a direct WebRTC stream,
// capture CMAF fragments, optionally remux to mp4 as proof of reception).
#[derive(Parser, Debug)]
struct DirectSubscribeArgs {
    /// Directory base URL (e.g. https://every.channel). Used to locate offers by stream_id
    /// and to POST answers back to the publisher.
    #[arg(long, default_value = "https://every.channel")]
    directory_url: String,
    /// Stream id to locate in the directory.
    #[arg(long)]
    stream_id: Option<String>,
    /// Direct offer link/code to use instead of looking up a stream in the directory.
    #[arg(long)]
    offer: Option<String>,
    /// Output directory for captured CMAF fragments (init+segments) and index.m3u8.
    #[arg(long, default_value = "./tmp/direct-subscribe")]
    out_dir: PathBuf,
    /// Maximum number of media segments to capture (init not counted).
    #[arg(long, default_value_t = 12)]
    max_segments: usize,
    /// Optional time limit (seconds). When set, capture stops after this duration even if `--max-segments` is not reached.
    #[arg(long)]
    duration_secs: Option<u64>,
    /// If set, remux the captured playlist to this mp4 path (best-effort, `ffmpeg -c copy`).
    #[arg(long)]
    mp4: Option<PathBuf>,
}
// Arguments for `ec-node ws-publish` (publish through the global websocket relay).
#[derive(Parser, Debug)]
struct WsPublishArgs {
    /// Global stream id (used for directory listing + relay instance key). If omitted, a fresh id is generated.
    #[arg(long)]
    stream_id: Option<String>,
    /// Human-friendly title (used for directory listing).
    #[arg(long, default_value = "Live channel")]
    title: String,
    /// Directory base URL (e.g. https://every.channel). Used for listing at `/api/announce`,
    /// and as the base for relay websocket URL (`/api/stream/ws`).
    #[arg(long, default_value = "https://every.channel")]
    directory_url: String,
    /// Offer TTL (ms) when announcing to the directory.
    #[arg(long, default_value_t = 20000)]
    announce_ttl_ms: u64,
    /// Output directory for temporary chunks.
    #[arg(long, default_value = "./tmp/ws-chunks")]
    chunk_dir: PathBuf,
    /// Chunk duration in ms.
    #[arg(long, default_value_t = 2000)]
    chunk_ms: u64,
    /// Maximum number of media segments to send (init not counted). Set high for "run forever" behavior.
    #[arg(long, default_value_t = 1_000_000)]
    max_segments: usize,
    /// Ingest source.
    #[command(subcommand)]
    source: IngestSource,
}
// Arguments for `ec-node ws-subscribe` (consume from the global websocket relay).
#[derive(Parser, Debug)]
struct WsSubscribeArgs {
    /// Directory base URL (e.g. https://every.channel). Used for relay websocket URL (`/api/stream/ws`).
    #[arg(long, default_value = "https://every.channel")]
    directory_url: String,
    /// Stream id to subscribe to.
    #[arg(long)]
    stream_id: String,
    /// Output directory for captured CMAF fragments (init+segments) and index.m3u8.
    #[arg(long, default_value = "./tmp/ws-subscribe")]
    out_dir: PathBuf,
    /// Maximum number of media segments to capture (init not counted).
    #[arg(long, default_value_t = 12)]
    max_segments: usize,
    /// Optional time limit (seconds). When set, capture stops after this duration even if `--max-segments` is not reached.
    #[arg(long)]
    duration_secs: Option<u64>,
    /// If set, remux the captured playlist to this mp4 path (best-effort, `ffmpeg -c copy`).
    #[arg(long)]
    mp4: Option<PathBuf>,
}
// Arguments for `ec-node wt-publish` (MoQ over WebTransport to an external relay).
#[derive(Parser, Debug)]
struct WtPublishArgs {
    /// Relay URL (WebTransport) to connect to.
    /// Default points at Cloudflare's MoQ technical preview relay.
    #[arg(long, default_value = "https://relay.cloudflare.mediaoverquic.com/")]
    url: String,
    /// Broadcast name to publish.
    ///
    /// This should be stable so you can share:
    /// `https://every.channel/watch?url=...&name=<broadcast>`.
    #[arg(long)]
    name: String,
    /// Input URL or file for ffmpeg (e.g. HDHomeRun `http://hdhomerun.local/auto/v4.1`).
    #[arg(long)]
    input: String,
    /// If set, transcode to H.264/AAC before fragmenting to fMP4.
    // `ArgAction::Set` makes this `--transcode=true|false` (default true).
    #[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
    transcode: bool,
    /// Transmit fMP4 fragments directly (passthrough mode).
    /// When false, the importer may reframe into CMAF fragments.
    #[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
    passthrough: bool,
    /// Danger: disable TLS verification for the relay.
    #[arg(long, default_value_t = false)]
    tls_disable_verify: bool,
}
// Shared ingest-source subcommand used by every publish-style command.
// Each variant maps onto a `StreamSource` implementation in `source.rs`.
#[derive(Subcommand, Debug)]
enum IngestSource {
    /// Ingest from an HDHomeRun device.
    Hdhr {
        /// Hostname or IP (e.g. 192.168.1.10). If omitted, auto-discover.
        #[arg(long)]
        host: Option<String>,
        /// Device ID (uses <deviceid>.local when host is omitted).
        #[arg(long)]
        device_id: Option<String>,
        /// Channel number (e.g. 8.1).
        #[arg(long)]
        channel: Option<String>,
        /// Channel name (e.g. KQED).
        #[arg(long)]
        name: Option<String>,
        /// Prefer mDNS (hdhomerun.local) before UDP discovery.
        #[arg(long)]
        prefer_mdns: bool,
    },
    /// Ingest from an HLS playlist URL.
    Hls {
        /// HLS playlist URL.
        url: String,
        /// Ingest mode (passthrough, remux, transcode).
        #[arg(long, value_enum, default_value_t = HlsMode::Passthrough)]
        mode: HlsMode,
    },
    /// Ingest from a Linux DVB device.
    LinuxDvb {
        /// DVB adapter index.
        #[arg(long, default_value_t = 0)]
        adapter: u32,
        /// DVR device index.
        #[arg(long, default_value_t = 0)]
        dvr: u32,
        /// Optional tune command (repeat for each arg).
        #[arg(long)]
        tune_cmd: Vec<String>,
        /// Optional tune wait (ms).
        #[arg(long)]
        tune_wait_ms: Option<u64>,
    },
    /// Ingest from a raw TS file or URL.
    Ts {
        /// Input TS file or URL.
        input: String,
    },
}
/// Entry point: install a process-wide rustls crypto provider, route logs to
/// stderr, then dispatch the parsed subcommand (`Ingest` runs synchronously;
/// all other commands spin up a tokio runtime via `run_async`).
fn main() -> Result<()> {
    // rustls 0.23 requires an explicit process-level CryptoProvider when multiple providers
    // are enabled. We install a default provider once at startup.
    // Order matters: aws-lc-rs is tried first; ring only lands if that install
    // failed. `install_default` errors once a provider is set, hence `let _`.
    if rustls::crypto::CryptoProvider::get_default().is_none() {
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        let _ = rustls::crypto::ring::default_provider().install_default();
    }
    // Keep stdout reserved for machine-readable output (endpoint addr, etc).
    // Logging goes to stderr; filter defaults to "info" unless the env (RUST_LOG) overrides.
    let filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
    tracing_subscriber::fmt()
        .with_writer(std::io::stderr)
        .with_env_filter(filter)
        .init();
    let cli = Cli::parse();
    match cli.command {
        Commands::Ingest(args) => ingest(args)?,
        Commands::MoqPublish(args) => run_async(moq_publish(args))?,
        Commands::MoqSubscribe(args) => run_async(moq_subscribe(args))?,
        Commands::MoqSelftest(args) => run_async(moq_selftest(args))?,
        Commands::DirectPublish(args) => run_async(direct_publish(args))?,
        Commands::DirectSubscribe(args) => run_async(direct_subscribe(args))?,
        Commands::WsPublish(args) => run_async(ws_publish(args))?,
        Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?,
        Commands::WtPublish(args) => run_async(wt_publish(args))?,
    }
    Ok(())
}
/// Drive `future` to completion on a freshly built multi-threaded tokio
/// runtime with all drivers (I/O + time) enabled.
fn run_async<F>(future: F) -> Result<()>
where
    F: Future<Output = Result<()>>,
{
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?
        .block_on(future)
}
/// Synchronous ingest pipeline: open the configured source, segment it into
/// CMAF via ffmpeg, write a manifest committing to every chunk hash, then
/// publish the init segment (chunk index 0) and media segments (1..=N) into a
/// file-based relay directory, optionally encrypting with a network secret.
fn ingest(args: IngestArgs) -> Result<()> {
    fs::create_dir_all(&args.chunk_dir)
        .with_context(|| format!("failed to create {}", args.chunk_dir.display()))?;
    let deterministic = deterministic_enabled(args.deterministic);
    // The bool half records whether the source would still need a transcode;
    // it is currently unused downstream (bound as `_needs_transcode`).
    let (source, _needs_transcode): (Box<dyn StreamSource>, bool) = match args.source {
        IngestSource::Hls { url, mut mode } => {
            // Deterministic mode forces HLS ingestion through a transcode.
            if deterministic {
                mode = HlsMode::Transcode;
            }
            (Box::new(HlsSource { url, mode }), false)
        }
        IngestSource::Hdhr {
            host,
            device_id,
            channel,
            name,
            prefer_mdns,
        } => (
            Box::new(HdhrSource {
                host,
                device_id,
                channel,
                name,
                prefer_mdns,
            }),
            deterministic,
        ),
        IngestSource::LinuxDvb {
            adapter,
            dvr,
            tune_cmd,
            tune_wait_ms,
        } => (
            Box::new(LinuxDvbSource {
                adapter,
                dvr,
                tune_cmd,
                tune_wait_ms,
            }),
            deterministic,
        ),
        IngestSource::Ts { input } => (Box::new(TsSource { input }), deterministic),
    };
    let source_id = source.source_id();
    let source_id_for_stream = source_id.clone();
    let reader = source.open_stream()?;
    let encoder_profile_id = if deterministic {
        "deterministic-h264-aac".to_string()
    } else {
        // NOTE: We still normalize into CMAF for interoperability, even when determinism is off.
        "h264-aac".to_string()
    };
    // CMAF-only: segment into init.mp4 + segment_*.m4s via ffmpeg.
    let out_dir = args.chunk_dir.join("cmaf");
    let (init_path, segments) = chunk_stream_cmaf_ffmpeg(
        reader,
        &out_dir,
        args.chunk_ms,
        args.max_chunks.unwrap_or(usize::MAX),
        deterministic,
    )?;
    let mut chunk_hashes = Vec::with_capacity(1 + segments.len());
    // Chunk index 0 is always init.mp4; segments are 1..N.
    let (init_bytes, init_hash) = read_chunk_bytes_and_hash(&init_path)?;
    chunk_hashes.push(init_hash);
    // Read every segment up front so hashing and publishing see the same bytes.
    let mut segment_meta = Vec::with_capacity(segments.len());
    for seg_path in &segments {
        let (bytes, hash) = read_chunk_bytes_and_hash(seg_path)?;
        chunk_hashes.push(hash.clone());
        segment_meta.push((seg_path.clone(), bytes, hash));
    }
    let chunk_start_index = 0u64;
    let created_unix_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let relay = FileRelay::new(args.relay_dir);
    // Stream id: explicit override, or derived from the source + chunk profile.
    let track = TrackName {
        namespace: "every.channel".to_string(),
        name: args.stream_id.unwrap_or_else(|| {
            StreamKey {
                version: 1,
                broadcast: None,
                source: Some(source_id_for_stream),
                profile: Some(format!("chunk-{}ms", args.chunk_ms)),
                variant: None,
            }
            .to_stream_id()
            .0
        }),
    };
    let stream_id = StreamId(track.name.clone());
    // The manifest commits to init + all segment hashes, starting at index 0.
    let manifest_payload = build_manifest(
        stream_id,
        format!("epoch-{created_unix_ms}"),
        args.chunk_ms,
        chunk_start_index,
        encoder_profile_id,
        created_unix_ms,
        vec![StreamMetadata {
            key: "source_kind".to_string(),
            value: source_id.kind.clone(),
        }],
        chunk_hashes,
    )?;
    let manifest_id = manifest_payload.manifest_id.clone();
    let manifest_path = args.chunk_dir.join("manifest.json");
    fs::write(
        &manifest_path,
        serde_json::to_vec_pretty(&manifest_payload)?,
    )?;
    let network_secret = parse_network_secret(args.network_secret)?;
    // Publish init at chunk_index 0 to avoid colliding with segment_000000.
    publish_chunk_file(
        &relay,
        &track,
        TsChunk {
            index: 0,
            path: init_path,
            timing: ec_chopper::ChunkTiming {
                chunk_index: 0,
                chunk_start_27mhz: None,
                chunk_duration_27mhz: 0,
                utc_start_unix: None,
                sync_status: "init".to_string(),
            },
        },
        "video/mp4",
        Some(init_bytes),
        network_secret.as_deref(),
        Some(&manifest_id),
    )?;
    for (i, (seg_path, bytes, _hash)) in segment_meta.into_iter().enumerate() {
        // Media segments occupy chunk indices 1..=N (0 is the init segment).
        let chunk_index = (i as u64) + 1;
        publish_chunk_file(
            &relay,
            &track,
            TsChunk {
                index: chunk_index,
                path: seg_path,
                timing: ec_chopper::ChunkTiming {
                    chunk_index,
                    chunk_start_27mhz: None,
                    // ms converted onto the field's 27 MHz timebase (1 ms = 27_000 ticks).
                    chunk_duration_27mhz: args.chunk_ms * 27_000,
                    utc_start_unix: None,
                    sync_status: "cmaf".to_string(),
                },
            },
            "video/iso.segment",
            Some(bytes),
            network_secret.as_deref(),
            Some(&manifest_id),
        )?;
    }
    Ok(())
}
/// Hash the chunk payload (or reuse caller-supplied bytes), wrap it into an
/// object, and write that object into the file relay. Both the group id and
/// the object id mirror the chunk index (0 when timing metadata is absent).
fn publish_chunk_file(
    relay: &FileRelay,
    track: &TrackName,
    chunk: TsChunk,
    content_type: &str,
    data_override: Option<Vec<u8>>,
    network_secret: Option<&[u8]>,
    manifest_id: Option<&str>,
) -> Result<()> {
    // Prefer bytes the caller already read over touching the filesystem again.
    let (data, chunk_hash) = if let Some(bytes) = data_override {
        let digest = blake3::hash(&bytes).to_hex().to_string();
        (bytes, digest)
    } else {
        read_chunk_bytes_and_hash(&chunk.path)?
    };
    let object = build_object(
        chunk,
        data,
        chunk_hash,
        None,
        network_secret,
        manifest_id,
        content_type,
        &track.name,
    )?;
    // Extract the index once; the original chunk was consumed by build_object.
    let index = object
        .meta
        .timing
        .as_ref()
        .map(|t| t.chunk_index)
        .unwrap_or(0);
    relay.write_object(track, GroupId(index), ObjectId(index), &object)
}
/// Pipe `reader` through ffmpeg, segmenting it into CMAF under `out_dir`
/// (recreated fresh each call): `init.mp4` plus `segment_NNNNNN.m4s` files.
///
/// Returns the init path and every segment path that appeared before ffmpeg
/// exited or `max_segments` was collected. Fails if ffmpeg cannot be spawned,
/// exits with a failure status, or a file never stabilizes in time.
///
/// `deterministic` currently has no effect on the encoder arguments: both
/// modes pin the same deterministic x264/AAC profile. (The previous
/// `if deterministic { Some(p) } else { Some(p) }` had identical arms —
/// clippy::if_same_then_else — so it is collapsed here; the parameter stays
/// in the signature so callers are unaffected when a non-deterministic
/// profile is introduced later.)
fn chunk_stream_cmaf_ffmpeg(
    mut reader: Box<dyn Read + Send>,
    out_dir: &std::path::Path,
    chunk_ms: u64,
    max_segments: usize,
    deterministic: bool,
) -> Result<(std::path::PathBuf, Vec<std::path::PathBuf>)> {
    // Start from a clean directory so stale segments are never re-published.
    let _ = fs::remove_dir_all(out_dir);
    fs::create_dir_all(out_dir)
        .with_context(|| format!("failed to create {}", out_dir.display()))?;
    // Keep encoder args stable even when determinism is off (see doc comment).
    let _ = deterministic;
    let profile = ec_chopper::deterministic_h264_profile();
    let mut cmd = Command::new("ffmpeg");
    cmd.current_dir(out_dir);
    cmd.arg("-hide_banner")
        .arg("-loglevel")
        .arg("error")
        .arg("-nostdin")
        .arg("-y")
        .arg("-i")
        .arg("pipe:0")
        // Keep stream mapping predictable.
        .arg("-map")
        .arg("0:v:0")
        .arg("-map")
        .arg("0:a:0?")
        .arg("-sn")
        .arg("-dn")
        .arg("-map_metadata")
        .arg("-1")
        // Reduce opportunities for non-deterministic scheduling in filters/decoders.
        .arg("-filter_threads")
        .arg("1")
        .arg("-filter_complex_threads")
        .arg("1")
        .arg("-threads")
        .arg("1");
    for arg in ec_chopper::ffmpeg_profile_args(&profile) {
        cmd.arg(arg);
    }
    // HLS muxer with fmp4 segments produces the CMAF init + .m4s layout.
    let seg_time = format!("{:.3}", chunk_ms as f64 / 1000.0);
    cmd.arg("-f")
        .arg("hls")
        .arg("-hls_time")
        .arg(seg_time)
        .arg("-hls_list_size")
        .arg("0")
        .arg("-hls_segment_type")
        .arg("fmp4")
        .arg("-hls_flags")
        .arg("independent_segments")
        .arg("-hls_fmp4_init_filename")
        .arg("init.mp4")
        .arg("-hls_segment_filename")
        .arg("segment_%06d.m4s")
        .arg("index.m3u8")
        .stdin(Stdio::piped())
        .stdout(Stdio::null())
        .stderr(Stdio::inherit());
    let mut child = cmd.spawn().with_context(|| "failed to spawn ffmpeg")?;
    let mut stdin = child
        .stdin
        .take()
        .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?;
    // Feed ffmpeg from a dedicated thread so segment collection can proceed here.
    let writer = std::thread::spawn(move || -> Result<()> {
        std::io::copy(&mut reader, &mut stdin)?;
        Ok(())
    });
    let init_path = out_dir.join("init.mp4");
    wait_for_stable_file(&init_path, Duration::from_secs(20))?;
    let mut segments = Vec::new();
    for i in 0..max_segments {
        let seg_path = out_dir.join(format!("segment_{i:06}.m4s"));
        match wait_for_stable_file(&seg_path, Duration::from_secs(30)) {
            Ok(()) => segments.push(seg_path),
            Err(err) => {
                // If ffmpeg ended cleanly, stop; otherwise bubble the error.
                if let Ok(Some(status)) = child.try_wait() {
                    if status.success() {
                        break;
                    }
                    return Err(anyhow!("ffmpeg exited with {status} ({err:#})"));
                }
                return Err(err);
            }
        }
    }
    // Best-effort teardown: ffmpeg may still be running when max_segments is
    // hit; kill it, reap the process, and join the feeder thread (its broken
    // pipe error after the kill is expected and ignored).
    let _ = child.kill();
    let _ = child.wait();
    let _ = writer.join();
    Ok((init_path, segments))
}
/// Resolve the optional network secret: the explicit CLI value wins, then the
/// `EVERY_CHANNEL_NETWORK_SECRET` env var; the result is hex-decoded.
fn parse_network_secret(value: Option<String>) -> Result<Option<Vec<u8>>> {
    let secret_hex = match value.or_else(|| std::env::var("EVERY_CHANNEL_NETWORK_SECRET").ok()) {
        Some(v) => v,
        None => return Ok(None),
    };
    let bytes = hex::decode(secret_hex).context("network secret must be hex")?;
    Ok(Some(bytes))
}
/// Deterministic mode is on when the CLI flag is set, or when the
/// `EVERY_CHANNEL_DETERMINISTIC` env var holds a truthy value
/// (`1`/`true`/`yes`/`on`, case-insensitive, surrounding whitespace ignored).
fn deterministic_enabled(flag: bool) -> bool {
    if flag {
        return true;
    }
    std::env::var("EVERY_CHANNEL_DETERMINISTIC")
        .map(|raw| {
            matches!(
                raw.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            )
        })
        .unwrap_or(false)
}
/// Read the file at `path` and return its bytes with the lowercase-hex
/// blake3 digest of those bytes.
fn read_chunk_bytes_and_hash(path: &std::path::Path) -> Result<(Vec<u8>, String)> {
    let bytes = fs::read(path).with_context(|| format!("failed to read {}", path.display()))?;
    let hex_digest = blake3::hash(&bytes).to_hex().to_string();
    Ok((bytes, hex_digest))
}
/// Assemble a single-variant manifest over `chunk_hashes`, signing its id
/// when a manifest keypair is available from the environment.
fn build_manifest(
    stream_id: StreamId,
    epoch_id: impl Into<String>,
    chunk_duration_ms: u64,
    chunk_start_index: u64,
    encoder_profile_id: impl Into<String>,
    created_unix_ms: u64,
    metadata: Vec<StreamMetadata>,
    chunk_hashes: Vec<String>,
) -> Result<Manifest> {
    let body = build_manifest_body_for_chunks(
        stream_id,
        epoch_id,
        chunk_duration_ms,
        chunk_start_index,
        encoder_profile_id,
        created_unix_ms,
        metadata,
        &chunk_hashes,
    )?;
    let manifest_id = body.manifest_id()?;
    // Signing is optional: only when the environment provides a keypair.
    let signatures = match load_manifest_keypair_from_env().map_err(|err| anyhow!(err))? {
        Some(keypair) => vec![sign_manifest_id(&manifest_id, &keypair)],
        None => Vec::new(),
    };
    Ok(Manifest {
        body,
        manifest_id,
        signatures,
    })
}
// One rung of a CMAF encoding ladder: target resolution plus video bitrate.
#[derive(Debug, Clone)]
struct CmafVariantSpec {
    // e.g. "1080p"; also feeds `derive_variant_stream_id`.
    id: String,
    width: u32,
    height: u32,
    video_bitrate_kbps: u32,
}
/// Expand a ladder preset into its concrete rungs.
fn cmaf_ladder_variants(preset: CmafLadderPreset) -> Vec<CmafVariantSpec> {
    // (id, width, height, video kbps) per rung, highest quality first.
    let rungs: &[(&str, u32, u32, u32)] = match preset {
        CmafLadderPreset::Hd3 => &[
            ("1080p", 1920, 1080, 6000),
            ("720p", 1280, 720, 3000),
            ("480p", 854, 480, 1200),
        ],
    };
    rungs
        .iter()
        .map(|&(id, width, height, video_bitrate_kbps)| CmafVariantSpec {
            id: id.to_string(),
            width,
            height,
            video_bitrate_kbps,
        })
        .collect()
}
/// Normalize a stream-id component: ASCII letters/digits are kept (uppercase
/// folded to lowercase), `-`, `_` and `/` pass through, and every other
/// character becomes `_`.
fn sanitize_component(value: &str) -> String {
    value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() {
                ch.to_ascii_lowercase()
            } else if matches!(ch, '-' | '_' | '/') {
                ch
            } else {
                '_'
            }
        })
        .collect()
}
/// Derive a per-variant stream id: `<base>/variant-<sanitized id>`.
/// Matches StreamKey's `variant-...` encoding without reconstructing a StreamKey.
fn derive_variant_stream_id(base_stream_id: &str, variant_id: &str) -> String {
    let base = base_stream_id.trim_end_matches('/');
    format!("{base}/variant-{}", sanitize_component(variant_id))
}
/// Build a multi-variant (ladder) manifest: one entry per rung, each
/// committing to exactly one chunk hash, with the body merkle root computed
/// over the per-variant roots sorted by variant id — the same ordering
/// `validate_manifest` reconstructs when checking the body root.
fn build_multi_variant_manifest(
    base_stream_id: StreamId,
    epoch_id: String,
    chunk_duration_ms: u64,
    chunk_start_index: u64,
    encoder_profile_id: String,
    created_unix_ms: u64,
    metadata: Vec<StreamMetadata>,
    variants: &[CmafVariantSpec],
    variant_chunk_start_index: u64,
    per_variant_hash: Vec<(String, String)>,
) -> Result<Manifest> {
    let mut entries = Vec::new();
    for variant in variants {
        // Every ladder rung must have produced a hash; bail otherwise.
        let Some((_, hash)) = per_variant_hash.iter().find(|(id, _)| id == &variant.id) else {
            return Err(anyhow!("missing hash for variant {}", variant.id));
        };
        // Single-chunk epoch per variant: the merkle root covers one hash.
        let chunk_hashes = vec![hash.clone()];
        let merkle_root =
            ec_core::merkle_root_from_hashes(&chunk_hashes).map_err(|err| anyhow!("{err}"))?;
        let stream_id = StreamId(derive_variant_stream_id(&base_stream_id.0, &variant.id));
        entries.push(ManifestVariant {
            variant_id: variant.id.clone(),
            stream_id,
            chunk_start_index: variant_chunk_start_index,
            total_chunks: 1,
            merkle_root,
            chunk_hashes,
            metadata: vec![
                StreamMetadata {
                    key: "width".to_string(),
                    value: variant.width.to_string(),
                },
                StreamMetadata {
                    key: "height".to_string(),
                    value: variant.height.to_string(),
                },
                StreamMetadata {
                    key: "video_bitrate_kbps".to_string(),
                    value: variant.video_bitrate_kbps.to_string(),
                },
            ],
        });
    }
    // Canonical ordering: body root commits to per-variant roots sorted by id.
    entries.sort_by(|a, b| a.variant_id.cmp(&b.variant_id));
    let roots = entries
        .iter()
        .map(|v| v.merkle_root.clone())
        .collect::<Vec<_>>();
    let body_root = ec_core::merkle_root_from_hashes(&roots).map_err(|err| anyhow!("{err}"))?;
    let body = ec_core::ManifestBody {
        stream_id: base_stream_id,
        epoch_id,
        chunk_duration_ms,
        total_chunks: 1,
        chunk_start_index,
        encoder_profile_id,
        merkle_root: body_root,
        created_unix_ms,
        metadata,
        // Body-level hashes stay empty in multi-variant manifests; each
        // variant entry carries its own chunk hashes.
        chunk_hashes: Vec::new(),
        variants: Some(entries),
    };
    let manifest_id = body.manifest_id()?;
    let mut signatures = Vec::new();
    // Signing is optional: only when the environment provides a keypair.
    if let Some(keypair) = load_manifest_keypair_from_env().map_err(|err| anyhow!(err))? {
        signatures.push(sign_manifest_id(&manifest_id, &keypair));
    }
    Ok(Manifest {
        body,
        manifest_id,
        signatures,
    })
}
// Accumulates chunks (plus optional payload bytes and their hashes) until an
// epoch's worth is buffered; drained via `take` when full.
struct EpochBuffer {
    // Chunks per epoch; clamped to >= 1 in `new`.
    capacity: usize,
    chunks: Vec<TsChunk>,
    // Parallel to `chunks`: optional payload bytes per chunk.
    data: Vec<Option<Vec<u8>>>,
    // Parallel to `chunks`: hash per chunk.
    hashes: Vec<String>,
    // chunk_index of the first chunk pushed since the last `take`.
    start_index: Option<u64>,
}
impl EpochBuffer {
    /// Create a buffer that fills after `capacity` chunks (minimum 1).
    fn new(capacity: usize) -> Self {
        Self {
            capacity: capacity.max(1),
            chunks: Vec::new(),
            data: Vec::new(),
            hashes: Vec::new(),
            start_index: None,
        }
    }
    /// Append a chunk plus its optional payload and hash, remembering the
    /// first chunk index seen since the last `take`.
    fn push(&mut self, chunk: TsChunk, data: Option<Vec<u8>>, hash: String) {
        self.start_index.get_or_insert(chunk.timing.chunk_index);
        self.chunks.push(chunk);
        self.data.push(data);
        self.hashes.push(hash);
    }
    fn is_full(&self) -> bool {
        self.capacity <= self.chunks.len()
    }
    fn is_empty(&self) -> bool {
        self.chunks.is_empty()
    }
    /// First buffered chunk index; 0 when nothing has been pushed yet.
    fn start_index(&self) -> u64 {
        self.start_index.unwrap_or(0)
    }
    /// Drain the buffer, returning the parallel chunk/data/hash vectors and
    /// resetting the start index for the next epoch.
    fn take(&mut self) -> (Vec<TsChunk>, Vec<Option<Vec<u8>>>, Vec<String>) {
        let drained = (
            std::mem::take(&mut self.chunks),
            std::mem::take(&mut self.data),
            std::mem::take(&mut self.hashes),
        );
        self.start_index = None;
        drained
    }
}
/// Parse a signer allowlist from a delimiter-separated string (commas,
/// semicolons, or whitespace). Returns `None` when the input is absent or
/// contains no non-empty tokens, so "no allowlist" and "empty allowlist"
/// collapse to the same value.
fn parse_manifest_allowlist(value: Option<&str>) -> Option<HashSet<String>> {
    let raw = value?.trim();
    if raw.is_empty() {
        return None;
    }
    let signers: HashSet<String> = raw
        .split(|c: char| c == ',' || c == ';' || c.is_whitespace())
        .map(str::trim)
        .filter(|token| !token.is_empty())
        .map(str::to_string)
        .collect();
    (!signers.is_empty()).then_some(signers)
}
/// Structural + signature validation for a received manifest.
///
/// Checks, in order: the stored manifest id matches the body's recomputed id;
/// per-variant (or legacy single-stream) chunk hashes reproduce the committed
/// merkle roots; then signatures. With an allowlist, at least one valid
/// signature from an allowlisted signer is required. Without one, unsigned
/// manifests are accepted only when they carry full chunk hashes (variants
/// count), since chunks can then be verified directly against those hashes.
fn validate_manifest(manifest: &Manifest, allowlist: Option<&HashSet<String>>) -> bool {
    let body_id = match manifest.body.manifest_id() {
        Ok(id) => id,
        Err(_) => return false,
    };
    if body_id != manifest.manifest_id {
        return false;
    }
    if let Some(variants) = manifest.body.variants.as_ref().filter(|v| !v.is_empty()) {
        // Multi-variant: validate each variant and ensure body merkle_root commits to per-variant roots.
        let mut roots = Vec::with_capacity(variants.len());
        for variant in variants {
            if variant.chunk_hashes.len() != variant.total_chunks as usize {
                return false;
            }
            match ec_core::merkle_root_from_hashes(&variant.chunk_hashes) {
                Ok(root) if root == variant.merkle_root => {}
                _ => return false,
            }
            roots.push((variant.variant_id.clone(), variant.merkle_root.clone()));
        }
        // Sort by variant id to mirror the ordering used at build time.
        roots.sort_by(|a, b| a.0.cmp(&b.0));
        let ordered = roots.into_iter().map(|(_, root)| root).collect::<Vec<_>>();
        match ec_core::merkle_root_from_hashes(&ordered) {
            Ok(root) if root == manifest.body.merkle_root => {}
            _ => return false,
        }
    } else if !manifest.body.chunk_hashes.is_empty() {
        // Legacy single-variant body: hashes (when present) must match the root.
        if manifest.body.chunk_hashes.len() != manifest.body.total_chunks as usize {
            return false;
        }
        match ec_core::merkle_root_from_hashes(&manifest.body.chunk_hashes) {
            Ok(root) if root == manifest.body.merkle_root => {}
            _ => return false,
        }
    }
    if let Some(allowlist) = allowlist {
        // Allowlist mode: require a valid signature from an allowlisted signer.
        return manifest.signatures.iter().any(|sig| {
            verify_manifest_signature(&manifest.manifest_id, sig)
                && allowlist.contains(&sig.signer_id)
        });
    }
    if manifest.signatures.is_empty() {
        // Unsigned manifests are only acceptable when they include full hashes for verification.
        return !manifest.body.chunk_hashes.is_empty()
            || manifest
                .body
                .variants
                .as_ref()
                .is_some_and(|v| !v.is_empty());
    }
    manifest
        .signatures
        .iter()
        .any(|sig| verify_manifest_signature(&manifest.manifest_id, sig))
}
/// Removes a trailing "/init" marker from a key id, returning the original
/// slice unchanged when the suffix is absent.
fn strip_init_suffix(key_id: &str) -> &str {
    match key_id.strip_suffix("/init") {
        Some(base) => base,
        None => key_id,
    }
}
/// Looks up the expected hash for `chunk_index` of `stream_id` in a manifest.
///
/// Multi-variant manifests are searched for the matching variant; legacy
/// single-variant manifests carry hashes directly on the body. Returns `None`
/// when the stream is unknown or the index falls outside the hash list.
fn manifest_hash_for_chunk(
    manifest: &Manifest,
    stream_id: &str,
    chunk_index: u64,
) -> Option<String> {
    if let Some(variants) = manifest.body.variants.as_ref() {
        let variant = variants.iter().find(|v| v.stream_id.0 == stream_id)?;
        // checked_sub rejects indexes before the variant's first chunk.
        let offset = chunk_index.checked_sub(variant.chunk_start_index)?;
        return variant.chunk_hashes.get(offset as usize).cloned();
    }
    // Legacy single-variant manifests.
    if manifest.body.stream_id.0 != stream_id {
        return None;
    }
    let offset = chunk_index.checked_sub(manifest.body.chunk_start_index)?;
    manifest.body.chunk_hashes.get(offset as usize).cloned()
}
/// Reports whether `manifest` claims chunk `chunk_index` of `stream_id`.
///
/// Resolves the (start, count) range either from the matching variant of a
/// multi-variant manifest or from the body of a legacy manifest, then tests
/// membership with saturating arithmetic so an overflowing end never wraps.
fn manifest_covers_stream_index(manifest: &Manifest, stream_id: &str, chunk_index: u64) -> bool {
    let (start, count) = if let Some(variants) = manifest.body.variants.as_ref() {
        match variants.iter().find(|v| v.stream_id.0 == stream_id) {
            Some(variant) => (variant.chunk_start_index, variant.total_chunks as u64),
            None => return false,
        }
    } else if manifest.body.stream_id.0 == stream_id {
        (
            manifest.body.chunk_start_index,
            manifest.body.total_chunks as u64,
        )
    } else {
        return false;
    };
    chunk_index >= start && chunk_index < start.saturating_add(count)
}
/// Finds a manifest in `store` whose range covers (`stream_id`, `chunk_index`).
///
/// Among all covering manifests, the one with the greatest body
/// `chunk_start_index` (i.e. the most recent epoch) wins; ties keep the later
/// candidate in iteration order.
fn find_manifest_for_stream_index(
    store: &HashMap<String, Manifest>,
    stream_id: &str,
    chunk_index: u64,
) -> Option<Manifest> {
    let mut best: Option<&Manifest> = None;
    for candidate in store
        .values()
        .filter(|m| manifest_covers_stream_index(m, stream_id, chunk_index))
    {
        let replace = match best {
            None => true,
            Some(current) => candidate.body.chunk_start_index >= current.body.chunk_start_index,
        };
        if replace {
            best = Some(candidate);
        }
    }
    best.cloned()
}
/// Assembles a MoQ object payload for one chunk: encrypts the data under
/// `key_id` (keyed by chunk index) and attaches timing, hash, optional Merkle
/// proof, and optional manifest linkage in the object metadata.
fn build_object(
    chunk: TsChunk,
    data: Vec<u8>,
    chunk_hash: String,
    chunk_proof: Option<Vec<String>>,
    network_secret: Option<&[u8]>,
    manifest_id: Option<&str>,
    content_type: &str,
    key_id: &str,
) -> Result<ObjectPayload> {
    // Encrypt first; the metadata below describes the ciphertext.
    let encrypted = encrypt_stream_data(key_id, chunk.timing.chunk_index, &data, network_secret);
    // Wall-clock creation time in ms; a clock error degrades to 0.
    let created_unix_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0);
    let timing = TimingMeta {
        chunk_index: chunk.timing.chunk_index,
        // A missing 27 MHz start timestamp is encoded as 0 on the wire.
        chunk_start_27mhz: chunk.timing.chunk_start_27mhz.unwrap_or(0),
        chunk_duration_27mhz: chunk.timing.chunk_duration_27mhz,
        utc_start_unix: chunk.timing.utc_start_unix,
        sync_status: chunk.timing.sync_status.clone(),
    };
    let encryption = ec_moq::EncryptionMeta {
        alg: encrypted.alg.to_string(),
        key_id: key_id.to_string(),
        nonce_hex: hex::encode(encrypted.nonce),
    };
    let meta = ObjectMeta {
        created_unix_ms,
        content_type: content_type.to_string(),
        size_bytes: encrypted.ciphertext.len() as u64,
        timing: Some(timing),
        encryption: Some(encryption),
        chunk_hash: Some(chunk_hash),
        chunk_hash_alg: Some("blake3".to_string()),
        chunk_proof,
        chunk_proof_alg: Some("merkle+blake3".to_string()),
        manifest_id: manifest_id.map(|value| value.to_string()),
    };
    Ok(ObjectPayload {
        meta,
        data: encrypted.ciphertext,
    })
}
/// Drains the epoch buffer and publishes its contents over MoQ.
///
/// When `publish_manifests` is set, a per-epoch manifest committing to all
/// buffered chunk hashes is published first (and announced via `announce_tx`
/// if present) so subscribers can validate chunks as they arrive. When
/// `publish_chunks` is set, each buffered chunk is then published as an
/// encrypted object carrying its hash, Merkle membership proof, and a link to
/// the manifest id.
///
/// `object_sequence` / `manifest_sequence` are caller-owned counters advanced
/// here so group/sequence numbering stays monotonic across flushes.
/// Returns an error if manifest building, proof generation, encryption, or
/// publication fails, or if a chunk was buffered without its data.
fn flush_epoch_publish(
    publish_set: &mut MoqPublishSet,
    object_track_name: &str,
    manifest_track_name: &str,
    publish_chunks: bool,
    publish_manifests: bool,
    epoch_buffer: &mut EpochBuffer,
    stream_id_value: &StreamId,
    chunk_ms: u64,
    encoder_profile_id: &str,
    source_kind: &str,
    network_secret: Option<&[u8]>,
    segment_content_type: &str,
    key_id: &str,
    object_sequence: &mut u64,
    manifest_sequence: &mut u64,
    announce_tx: Option<&tokio::sync::mpsc::UnboundedSender<ManifestSummary>>,
) -> Result<()> {
    // Nothing buffered: flushing is a no-op.
    if epoch_buffer.is_empty() {
        return Ok(());
    }
    let (chunks, datas, hashes) = epoch_buffer.take();
    // NOTE(review): the fallback reads epoch_buffer.start_index() AFTER take()
    // has drained the buffer; it is only reachable if take() yielded no
    // chunks, which the is_empty() guard above should prevent — confirm.
    let start_index = chunks
        .first()
        .map(|chunk| chunk.timing.chunk_index)
        .unwrap_or_else(|| epoch_buffer.start_index());
    let created_unix_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let mut manifest_id = None;
    if publish_manifests {
        // Publish the manifest before any chunk so subscribers can validate
        // chunk hashes as soon as objects arrive.
        let manifest = build_manifest(
            stream_id_value.clone(),
            format!("epoch-{start_index}"),
            chunk_ms,
            start_index,
            encoder_profile_id.to_string(),
            created_unix_ms,
            vec![StreamMetadata {
                key: "source_kind".to_string(),
                value: source_kind.to_string(),
            }],
            hashes.clone(),
        )?;
        manifest_id = Some(manifest.manifest_id.clone());
        publish_set.publish_manifest(manifest_track_name, *manifest_sequence, &manifest)?;
        *manifest_sequence += 1;
        // Best-effort catalog announcement; a closed channel is ignored.
        if let Some(tx) = announce_tx {
            let _ = tx.send(manifest.summary());
        }
    }
    // Compute per-chunk Merkle proofs so subscribers can validate membership
    // even if future manifests omit the full chunk hash list.
    let mut proofs = Vec::with_capacity(hashes.len());
    for (offset, _) in hashes.iter().enumerate() {
        proofs.push(merkle_proof_for_index(&hashes, offset)?);
    }
    if publish_chunks {
        // chunks/datas/hashes/proofs are parallel vectors produced from the
        // same buffer, so zipping keeps each chunk with its own metadata.
        for (((chunk, data), hash), proof) in chunks.into_iter().zip(datas).zip(hashes).zip(proofs)
        {
            let Some(data) = data else {
                return Err(anyhow!("missing chunk data for publish"));
            };
            let object = build_object(
                chunk,
                data,
                hash,
                Some(proof),
                network_secret,
                manifest_id.as_deref(),
                segment_content_type,
                key_id,
            )?;
            tracing::info!(
                "publish segment chunk_index={} bytes={} content_type={}",
                object
                    .meta
                    .timing
                    .as_ref()
                    .map(|t| t.chunk_index)
                    .unwrap_or(0),
                object.data.len(),
                object.meta.content_type
            );
            publish_set.publish_object(object_track_name, GroupId(*object_sequence), object)?;
            *object_sequence += 1;
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    // Allowlist parsing accepts commas, semicolons, and whitespace as
    // separators and trims each token.
    #[test]
    fn parse_manifest_allowlist_splits_and_trims() {
        let set = parse_manifest_allowlist(Some(" a,b ; c\t d ")).unwrap();
        assert!(set.contains("a"));
        assert!(set.contains("b"));
        assert!(set.contains("c"));
        assert!(set.contains("d"));
    }
    // The deterministic flag can be forced on/off via the environment.
    // NOTE(review): mutates process-global env; not safe if tests run in
    // parallel with other env-reading tests — confirm test runner settings.
    #[test]
    fn deterministic_enabled_reads_env() {
        // Save and restore any pre-existing value so other tests are unaffected.
        let prev = std::env::var("EVERY_CHANNEL_DETERMINISTIC").ok();
        std::env::set_var("EVERY_CHANNEL_DETERMINISTIC", "true");
        assert!(deterministic_enabled(false));
        std::env::set_var("EVERY_CHANNEL_DETERMINISTIC", "0");
        assert!(!deterministic_enabled(false));
        match prev {
            Some(value) => std::env::set_var("EVERY_CHANNEL_DETERMINISTIC", value),
            None => std::env::remove_var("EVERY_CHANNEL_DETERMINISTIC"),
        }
    }
    // Network secret must be valid hex; anything else is an error.
    #[test]
    fn parse_network_secret_accepts_hex_and_rejects_invalid() {
        let out = parse_network_secret(Some("00".repeat(8))).unwrap().unwrap();
        assert_eq!(out.len(), 8);
        assert!(parse_network_secret(Some("not-hex".to_string())).is_err());
    }
    // Builds a two-chunk manifest starting at chunk index 10; when signing,
    // a fixed test key is injected via the env and restored afterwards.
    fn build_valid_manifest(unsigned: bool) -> Manifest {
        let chunk_hashes = vec![
            blake3::hash(b"c0").to_hex().to_string(),
            blake3::hash(b"c1").to_hex().to_string(),
        ];
        let body = build_manifest_body_for_chunks(
            StreamId("s".to_string()),
            "epoch-1",
            2000,
            10,
            "p",
            1,
            Vec::new(),
            &chunk_hashes,
        )
        .unwrap();
        let manifest_id = body.manifest_id().unwrap();
        let signatures = if unsigned {
            Vec::new()
        } else {
            let prev = std::env::var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY").ok();
            std::env::set_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", "11".repeat(32));
            let keypair = load_manifest_keypair_from_env().unwrap().unwrap();
            let sig = sign_manifest_id(&manifest_id, &keypair);
            match prev {
                Some(value) => std::env::set_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", value),
                None => std::env::remove_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY"),
            }
            vec![sig]
        };
        Manifest {
            body,
            manifest_id,
            signatures,
        }
    }
    // Unsigned manifests are only valid while they still carry full hash
    // lists; stripping the hashes must cause rejection.
    #[test]
    fn validate_manifest_accepts_unsigned_only_with_hashes() {
        let mut manifest = build_valid_manifest(true);
        assert!(validate_manifest(&manifest, None));
        // Remove hashes: unsigned should be rejected.
        manifest.body.chunk_hashes.clear();
        manifest.body.total_chunks = 0;
        manifest.body.merkle_root = "00".repeat(32);
        manifest.manifest_id = manifest.body.manifest_id().unwrap();
        assert!(!validate_manifest(&manifest, None));
    }
    // A signed manifest passes without an allowlist, passes with an allowlist
    // containing its signer, and fails with an allowlist that excludes it.
    #[test]
    fn validate_manifest_accepts_signed_and_obeys_allowlist() {
        let manifest = build_valid_manifest(false);
        assert!(validate_manifest(&manifest, None));
        let signer = manifest.signatures[0].signer_id.clone();
        let allow = HashSet::from([signer.clone()]);
        assert!(validate_manifest(&manifest, Some(&allow)));
        let deny = HashSet::from(["other".to_string()]);
        assert!(!validate_manifest(&manifest, Some(&deny)));
    }
    // Hash lookup is relative to chunk_start_index (10 here): indexes below
    // the start or past the end return None.
    #[test]
    fn manifest_hash_for_chunk_indexes_into_hash_list() {
        let manifest = build_valid_manifest(true);
        let sid = manifest.body.stream_id.0.as_str();
        assert!(manifest_hash_for_chunk(&manifest, sid, 9).is_none());
        assert_eq!(
            manifest_hash_for_chunk(&manifest, sid, 10).as_deref(),
            Some(manifest.body.chunk_hashes[0].as_str())
        );
        assert_eq!(
            manifest_hash_for_chunk(&manifest, sid, 11).as_deref(),
            Some(manifest.body.chunk_hashes[1].as_str())
        );
        assert!(manifest_hash_for_chunk(&manifest, sid, 12).is_none());
    }
}
/// Runs the publish side of the node: opens the configured ingest source,
/// chunks it into CMAF segments with ffmpeg, and publishes segments (plus
/// optional manifests) over MoQ.
///
/// Two pipelines exist:
/// - CMAF ladder mode (`--cmaf-ladder`): one ffmpeg process produces N scaled
///   variants; per-index multi-variant manifests and per-variant segment
///   objects are published, then the function returns.
/// - Single-rendition mode: ffmpeg produces one fMP4 HLS rendition; segments
///   are buffered into epochs and flushed via `flush_epoch_publish`.
///
/// In both modes chunking runs on a blocking thread (ffmpeg + file IO) and
/// hands finished work to the async publisher over an mpsc channel.
/// Returns an error if source setup, ffmpeg, file IO, or publication fails.
async fn moq_publish(args: MoqPublishArgs) -> Result<()> {
    fs::create_dir_all(&args.chunk_dir)
        .with_context(|| format!("failed to create {}", args.chunk_dir.display()))?;
    let deterministic = deterministic_enabled(args.deterministic);
    // Select the ingest source; the bool records whether transcoding is
    // needed later (HLS handles determinism via its own mode switch).
    let (source, _needs_transcode): (Box<dyn StreamSource + Send>, bool) = match args.source {
        IngestSource::Hls { url, mut mode } => {
            if deterministic {
                mode = HlsMode::Transcode;
            }
            (Box::new(HlsSource { url, mode }), false)
        }
        IngestSource::Hdhr {
            host,
            device_id,
            channel,
            name,
            prefer_mdns,
        } => (
            Box::new(HdhrSource {
                host,
                device_id,
                channel,
                name,
                prefer_mdns,
            }),
            deterministic,
        ),
        IngestSource::LinuxDvb {
            adapter,
            dvr,
            tune_cmd,
            tune_wait_ms,
        } => (
            Box::new(LinuxDvbSource {
                adapter,
                dvr,
                tune_cmd,
                tune_wait_ms,
            }),
            deterministic,
        ),
        IngestSource::Ts { input } => (Box::new(TsSource { input }), deterministic),
    };
    let source_id = source.source_id();
    let source_id_for_stream = source_id.clone();
    // Derive a stable stream id from the source + chunking profile unless the
    // caller provided one explicitly.
    let stream_id = args.stream_id.unwrap_or_else(|| {
        StreamKey {
            version: 1,
            broadcast: None,
            source: Some(source_id_for_stream),
            profile: Some(format!("chunk-{}ms", args.chunk_ms)),
            variant: None,
        }
        .to_stream_id()
        .0
    });
    let broadcast_name = args.broadcast_name.unwrap_or_else(|| stream_id.clone());
    let track_name = args.track_name.clone();
    let secret = parse_iroh_secret(args.iroh_secret)?;
    let discovery = parse_discovery(args.discovery.as_deref())?;
    let node = MoqNode::bind_with_discovery(secret, discovery).await?;
    // Wait briefly for direct addresses so "remote" can connect without needing discovery.
    let mut endpoint_addr = node.endpoint_addr();
    if endpoint_addr.addrs.is_empty() {
        let mut watcher = node.endpoint().watch_addr();
        let start = Instant::now();
        while endpoint_addr.addrs.is_empty() && start.elapsed() < Duration::from_secs(3) {
            tokio::time::sleep(Duration::from_millis(200)).await;
            endpoint_addr = watcher.get();
        }
    }
    println!("moq endpoint id: {}", node.endpoint().id());
    if let Ok(addr_json) = serde_json::to_string(&endpoint_addr) {
        println!("moq endpoint addr: {}", addr_json);
    }
    println!("moq broadcast: {}", broadcast_name);
    println!("moq track: {}", track_name);
    let publish_chunks = args.publish_chunks;
    let ladder = args.cmaf_ladder;
    let ladder_variants = ladder.map(cmaf_ladder_variants);
    // Build the full set of object tracks up front: per-variant chunk + init
    // tracks in ladder mode, otherwise one chunk track and (optionally) one
    // init track.
    let mut object_tracks = Vec::new();
    if let Some(variants) = ladder_variants.as_ref() {
        if !publish_chunks {
            return Err(anyhow!("--cmaf-ladder requires --publish-chunks true"));
        }
        for variant in variants {
            object_tracks.push(format!("{}/{}", track_name, variant.id));
            object_tracks.push(format!("{}/{}", args.init_track, variant.id));
        }
    } else {
        object_tracks.push(track_name.clone());
        if publish_chunks {
            object_tracks.push(args.init_track.clone());
        }
    }
    let mut manifest_tracks = Vec::new();
    if args.publish_manifests {
        manifest_tracks.push(args.manifest_track.clone());
    }
    let mut publish_set = node
        .publish_track_set(&broadcast_name, object_tracks, manifest_tracks)
        .await?;
    if let Some(ms) = args.startup_delay_ms {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }
    let network_secret = parse_network_secret(args.network_secret)?;
    let track = TrackName {
        namespace: "every.channel".to_string(),
        name: stream_id.clone(),
    };
    let stream_id_value = StreamId(track.name.clone());
    let source_kind = source_id.kind.clone();
    let encoder_profile_id = "deterministic-h264-aac".to_string();
    // --- CMAF ladder pipeline: publish multi-variant output, then return ---
    if let Some(variants) = ladder_variants {
        if args.epoch_chunks != 1 {
            return Err(anyhow!("--cmaf-ladder currently requires --epoch-chunks 1"));
        }
        if !args.publish_manifests {
            return Err(anyhow!(
                "--cmaf-ladder currently requires --publish-manifests"
            ));
        }
        // Work items handed from the blocking chunker to the async publisher.
        #[derive(Debug)]
        enum PendingPublish {
            Object {
                track: String,
                group: u64,
                object: ObjectPayload,
            },
            Manifest {
                track: String,
                sequence: u64,
                manifest: Manifest,
            },
        }
        let (tx, mut rx) = mpsc::channel::<PendingPublish>(16);
        let chunk_ms = args.chunk_ms;
        let max_chunks = args.max_chunks.unwrap_or(usize::MAX);
        let out_dir = args.chunk_dir.join("cmaf-ladder");
        let init_track_prefix = args.init_track.clone();
        let chunk_track_prefix = track_name.clone();
        let manifest_track = args.manifest_track.clone();
        let publish_manifests = args.publish_manifests;
        let source_kind = source_id.kind.clone();
        let base_stream_id = stream_id.clone();
        let network_secret_bytes = network_secret.clone();
        let startup_delay_ms = args.startup_delay_ms;
        // The chunker runs ffmpeg and file IO, so it lives on a blocking thread.
        let chunk_task = tokio::task::spawn_blocking(move || -> Result<()> {
            // Start from a clean output tree; stale segments would confuse
            // the index-based wait loop below.
            let _ = fs::remove_dir_all(&out_dir);
            fs::create_dir_all(&out_dir)
                .with_context(|| format!("failed to create {}", out_dir.display()))?;
            for variant in &variants {
                fs::create_dir_all(out_dir.join(&variant.id))?;
            }
            let mut cmd = Command::new("ffmpeg");
            cmd.current_dir(&out_dir);
            cmd.arg("-hide_banner")
                .arg("-loglevel")
                .arg("error")
                .arg("-nostdin")
                .arg("-y")
                .arg("-i")
                .arg("pipe:0")
                // Reduce opportunities for non-deterministic scheduling in filters/decoders.
                .arg("-filter_threads")
                .arg("1")
                .arg("-filter_complex_threads")
                .arg("1")
                .arg("-threads")
                .arg("1")
                .arg("-map")
                .arg("0:v:0")
                .arg("-map")
                .arg("0:a:0?")
                .arg("-sn")
                .arg("-dn")
                .arg("-map_metadata")
                .arg("-1");
            // Build a split+scale filter graph for all variants.
            // NOTE: We keep it simple: split video N ways, then scale each output.
            let mut filter = String::new();
            filter.push_str(&format!("[0:v]split={}", variants.len()));
            for (i, _) in variants.iter().enumerate() {
                filter.push_str(&format!("[v{i}]"));
            }
            filter.push(';');
            for (i, variant) in variants.iter().enumerate() {
                // Scale flags influence quality but should be deterministic.
                filter.push_str(&format!(
                    "[v{i}]scale=w={}:h={}:flags=bicubic[v{i}o];",
                    variant.width, variant.height
                ));
            }
            cmd.arg("-filter_complex").arg(filter);
            let seg_time = format!("{:.3}", chunk_ms as f64 / 1000.0);
            // One fMP4 HLS output per variant, each CBR-capped and segmented
            // on keyframes aligned to chunk boundaries.
            for (i, variant) in variants.iter().enumerate() {
                let v_bitrate = format!("{}k", variant.video_bitrate_kbps);
                let bufsize = format!("{}k", variant.video_bitrate_kbps.saturating_mul(2));
                let out_variant_dir = out_dir.join(&variant.id);
                let seg_template = out_variant_dir.join("segment_%06d.m4s");
                let seg_template = seg_template
                    .to_str()
                    .ok_or_else(|| anyhow!("invalid segment template path"))?
                    .to_string();
                cmd.arg("-map")
                    .arg(format!("[v{i}o]"))
                    .arg("-map")
                    .arg("0:a:0?")
                    .arg("-c:v")
                    .arg("libx264")
                    // Force keyframes aligned to segment boundaries (chunk_ms).
                    .arg("-force_key_frames")
                    .arg(format!(
                        "expr:gte(t,n_forced*{:.3})",
                        chunk_ms as f64 / 1000.0
                    ))
                    .arg("-b:v")
                    .arg(&v_bitrate)
                    .arg("-minrate")
                    .arg(&v_bitrate)
                    .arg("-maxrate")
                    .arg(&v_bitrate)
                    .arg("-bufsize")
                    .arg(&bufsize)
                    .arg("-c:a")
                    .arg("aac")
                    .arg("-b:a")
                    .arg("128k")
                    .arg("-ac")
                    .arg("2")
                    .arg("-ar")
                    .arg("48000")
                    .arg("-pix_fmt")
                    .arg("yuv420p")
                    .arg("-g")
                    .arg("60")
                    .arg("-keyint_min")
                    .arg("60")
                    .arg("-sc_threshold")
                    .arg("0")
                    .arg("-bf")
                    .arg("0")
                    .arg("-threads")
                    .arg("1")
                    .arg("-fflags")
                    .arg("+bitexact")
                    .arg("-flags:v")
                    .arg("+bitexact")
                    .arg("-flags:a")
                    .arg("+bitexact")
                    .arg("-f")
                    .arg("hls")
                    .arg("-hls_time")
                    .arg(&seg_time)
                    .arg("-hls_list_size")
                    .arg("0")
                    .arg("-hls_segment_type")
                    .arg("fmp4")
                    .arg("-hls_flags")
                    .arg("independent_segments")
                    .arg("-hls_fmp4_init_filename")
                    .arg("init.mp4")
                    .arg("-hls_segment_filename")
                    .arg(seg_template)
                    .arg(out_variant_dir.join("index.m3u8"));
            }
            cmd.stdin(Stdio::piped())
                .stdout(Stdio::null())
                .stderr(Stdio::inherit());
            let mut child = cmd.spawn().with_context(|| "failed to spawn ffmpeg")?;
            let mut stdin = child
                .stdin
                .take()
                .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?;
            let mut reader = source.open_stream()?;
            // Feed the source into ffmpeg's stdin from a dedicated thread so
            // the main loop can watch the output directory concurrently.
            let writer = std::thread::spawn(move || -> Result<()> {
                std::io::copy(&mut reader, &mut stdin)?;
                Ok(())
            });
            // Optional startup delay to allow subscribers to connect.
            if let Some(ms) = startup_delay_ms {
                std::thread::sleep(Duration::from_millis(ms));
            }
            // Publish init per variant as soon as they exist.
            for variant in &variants {
                let init_path = out_dir.join(&variant.id).join("init.mp4");
                wait_for_stable_file(&init_path, Duration::from_secs(20))?;
                let data = fs::read(&init_path)?;
                let hash = blake3::hash(&data).to_hex().to_string();
                // Init segments use a "/init" key id suffix so subscribers can
                // map them back to the variant stream.
                let key_id = format!(
                    "{}/init",
                    derive_variant_stream_id(&base_stream_id, &variant.id)
                );
                let object = build_object(
                    TsChunk {
                        index: 0,
                        path: init_path,
                        timing: ec_chopper::ChunkTiming {
                            chunk_index: 0,
                            chunk_start_27mhz: None,
                            chunk_duration_27mhz: 0,
                            utc_start_unix: None,
                            sync_status: "init".to_string(),
                        },
                    },
                    data,
                    hash,
                    None,
                    network_secret_bytes.as_deref(),
                    None,
                    "video/mp4",
                    &key_id,
                )?;
                tx.blocking_send(PendingPublish::Object {
                    track: format!("{}/{}", init_track_prefix, variant.id),
                    group: 0,
                    object,
                })
                .map_err(|_| anyhow!("publish channel closed"))?;
            }
            let mut manifest_seq: u64 = 0;
            for index in 0..max_chunks {
                // Wait for each variant's segment for this index.
                let mut per_variant_segments = Vec::with_capacity(variants.len());
                let mut per_variant_hashes = Vec::with_capacity(variants.len());
                for variant in &variants {
                    let seg_path = out_dir
                        .join(&variant.id)
                        .join(format!("segment_{index:06}.m4s"));
                    wait_for_stable_file(&seg_path, Duration::from_secs(30))?;
                    let data = fs::read(&seg_path)?;
                    let hash = blake3::hash(&data).to_hex().to_string();
                    per_variant_segments.push((variant, seg_path, data, hash.clone()));
                    per_variant_hashes.push((variant, hash));
                }
                let created_unix_ms = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as u64;
                let epoch_id = format!("epoch-{created_unix_ms}");
                // Both branches build the same manifest; publish_manifests is
                // always true here (checked above), the else arm is kept for
                // future internal-only linkage.
                let manifest = if publish_manifests {
                    build_multi_variant_manifest(
                        StreamId(base_stream_id.clone()),
                        epoch_id,
                        chunk_ms,
                        index as u64,
                        encoder_profile_id.clone(),
                        created_unix_ms,
                        vec![StreamMetadata {
                            key: "source_kind".to_string(),
                            value: source_kind.clone(),
                        }],
                        &variants,
                        index as u64,
                        per_variant_hashes
                            .iter()
                            .map(|(v, h)| (v.id.clone(), h.clone()))
                            .collect(),
                    )?
                } else {
                    // Still build an unsigned manifest for internal linkage if desired.
                    build_multi_variant_manifest(
                        StreamId(base_stream_id.clone()),
                        epoch_id,
                        chunk_ms,
                        index as u64,
                        encoder_profile_id.clone(),
                        created_unix_ms,
                        vec![StreamMetadata {
                            key: "source_kind".to_string(),
                            value: source_kind.clone(),
                        }],
                        &variants,
                        index as u64,
                        per_variant_hashes
                            .iter()
                            .map(|(v, h)| (v.id.clone(), h.clone()))
                            .collect(),
                    )?
                };
                // Manifest first, so subscribers can validate the segments
                // that follow.
                tx.blocking_send(PendingPublish::Manifest {
                    track: manifest_track.clone(),
                    sequence: manifest_seq,
                    manifest: manifest.clone(),
                })
                .map_err(|_| anyhow!("publish channel closed"))?;
                manifest_seq += 1;
                // Publish segment objects for each variant, linked to the manifest.
                for (variant, seg_path, data, hash) in per_variant_segments {
                    let key_id = derive_variant_stream_id(&base_stream_id, &variant.id);
                    let chunk = TsChunk {
                        index: index as u64,
                        path: seg_path,
                        timing: ec_chopper::ChunkTiming {
                            chunk_index: index as u64,
                            chunk_start_27mhz: None,
                            chunk_duration_27mhz: chunk_ms * 27_000,
                            utc_start_unix: None,
                            sync_status: "cmaf".to_string(),
                        },
                    };
                    let object = build_object(
                        chunk,
                        data,
                        hash,
                        None,
                        network_secret_bytes.as_deref(),
                        Some(&manifest.manifest_id),
                        "video/iso.segment",
                        &key_id,
                    )?;
                    // Group 0 is reserved for the init segment; segments start at 1.
                    tx.blocking_send(PendingPublish::Object {
                        track: format!("{}/{}", chunk_track_prefix, variant.id),
                        group: (index as u64) + 1,
                        object,
                    })
                    .map_err(|_| anyhow!("publish channel closed"))?;
                }
            }
            let _ = child.kill();
            let _ = child.wait();
            let _ = writer.join();
            Ok(())
        });
        // Async side: drain the channel and publish in order.
        while let Some(item) = rx.recv().await {
            match item {
                PendingPublish::Object {
                    track,
                    group,
                    object,
                } => {
                    publish_set.publish_object(&track, GroupId(group), object)?;
                }
                PendingPublish::Manifest {
                    track,
                    sequence,
                    manifest,
                } => {
                    publish_set.publish_manifest(&track, sequence, &manifest)?;
                }
            }
        }
        chunk_task
            .await
            .map_err(|err| anyhow!("chunk task join error: {err}"))??;
        return Ok(());
    }
    // --- Single-rendition pipeline ---
    let needs_init_track = publish_chunks;
    let mut epoch_buffer = EpochBuffer::new(args.epoch_chunks);
    // Some early MoQ implementations have surprising assumptions around group sequence ids.
    // In CMAF mode we reserve group 0 for the init segment on the init track and start
    // segment groups at 1 to avoid any potential cross-track collisions.
    let mut object_sequence: u64 = if needs_init_track { 1 } else { 0 };
    let mut manifest_sequence: u64 = 0;
    let announce_tx = if args.announce {
        Some(
            spawn_catalog_announcer(
                &node,
                &track,
                &broadcast_name,
                &track_name,
                args.gossip_peer.clone(),
            )
            .await?,
        )
    } else {
        None
    };
    // Items handed from the blocking chunker to the async publisher; Init is
    // published immediately, Segment goes through the epoch buffer.
    #[derive(Debug)]
    enum PendingKind {
        Init,
        Segment,
    }
    #[derive(Debug)]
    struct PendingChunk {
        kind: PendingKind,
        chunk: TsChunk,
        // None when only hashes are being published (publish_chunks == false).
        data: Option<Vec<u8>>,
        hash: String,
    }
    let segment_content_type = "video/iso.segment";
    // Chunking is CPU and IO heavy and must not block the async runtime.
    // We do ingest + chunking on a blocking thread and feed finalized chunks
    // to the async publisher over a channel.
    let (tx, mut rx) = mpsc::channel::<PendingChunk>(8);
    let chunk_dir = args.chunk_dir.clone();
    let chunk_ms = args.chunk_ms;
    let max_chunks = args.max_chunks;
    let chunk_task = tokio::task::spawn_blocking(move || -> Result<()> {
        let out_dir = chunk_dir.join("cmaf");
        let _ = fs::remove_dir_all(&out_dir);
        fs::create_dir_all(&out_dir)
            .with_context(|| format!("failed to create {}", out_dir.display()))?;
        let profile = ec_chopper::deterministic_h264_profile();
        let mut cmd = std::process::Command::new("ffmpeg");
        cmd.current_dir(&out_dir);
        cmd.arg("-hide_banner")
            .arg("-loglevel")
            .arg("error")
            .arg("-nostdin")
            .arg("-y")
            .arg("-i")
            .arg("pipe:0");
        // Reduce non-determinism and "surprise streams" (data/subtitles, extra audio).
        // We intentionally pick the first video stream and (optionally) the first audio stream.
        cmd.arg("-map").arg("0:v:0");
        cmd.arg("-map").arg("0:a:0?");
        cmd.arg("-sn").arg("-dn");
        cmd.arg("-map_metadata").arg("-1");
        for arg in ec_chopper::ffmpeg_profile_args(&profile) {
            cmd.arg(arg);
        }
        let seg_time = format!("{:.3}", chunk_ms as f64 / 1000.0);
        let seg_template = "segment_%06d.m4s".to_string();
        let init_filename = "init.mp4".to_string();
        let playlist = "index.m3u8".to_string();
        cmd.arg("-f")
            .arg("hls")
            .arg("-hls_time")
            .arg(seg_time)
            .arg("-hls_list_size")
            .arg("0")
            .arg("-hls_segment_type")
            .arg("fmp4")
            .arg("-hls_flags")
            .arg("independent_segments")
            .arg("-hls_fmp4_init_filename")
            .arg(init_filename)
            .arg("-hls_segment_filename")
            .arg(seg_template)
            .arg(playlist)
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::inherit());
        let mut child = cmd
            .spawn()
            .with_context(|| "failed to spawn ffmpeg".to_string())?;
        let mut stdin = child
            .stdin
            .take()
            .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?;
        let mut reader = source.open_stream()?;
        // Pump the source into ffmpeg on its own thread.
        let writer = std::thread::spawn(move || -> Result<()> {
            std::io::copy(&mut reader, &mut stdin)?;
            Ok(())
        });
        let init_path = out_dir.join("init.mp4");
        if publish_chunks {
            // Ship the init segment first; players need it before any media.
            wait_for_stable_file(&init_path, Duration::from_secs(10))?;
            let data = fs::read(&init_path)?;
            let hash = blake3::hash(&data).to_hex().to_string();
            let chunk = TsChunk {
                index: 0,
                path: init_path.clone(),
                timing: ec_chopper::ChunkTiming {
                    chunk_index: 0,
                    chunk_start_27mhz: None,
                    chunk_duration_27mhz: 0,
                    utc_start_unix: None,
                    sync_status: "init".to_string(),
                },
            };
            tx.blocking_send(PendingChunk {
                kind: PendingKind::Init,
                chunk,
                data: Some(data),
                hash,
            })
            .map_err(|_| anyhow!("publish channel closed"))?;
        }
        let limit = max_chunks.unwrap_or(usize::MAX);
        for index in 0..limit {
            let seg_path = out_dir.join(format!("segment_{index:06}.m4s"));
            // If the segment never appears and ffmpeg exited, we are done (short input).
            // If the segment never appears and ffmpeg is still running, treat as error.
            match wait_for_stable_file(&seg_path, Duration::from_secs(20)) {
                Ok(()) => {}
                Err(err) => {
                    if let Ok(Some(status)) = child.try_wait() {
                        if status.success() {
                            break;
                        }
                        return Err(anyhow!("ffmpeg exited with {status} ({err:#})"));
                    }
                    return Err(err);
                }
            }
            // Only load the bytes when publishing chunk data; otherwise hash
            // from disk to avoid holding the segment in memory.
            let data = if publish_chunks {
                Some(fs::read(&seg_path)?)
            } else {
                None
            };
            let hash = if let Some(ref bytes) = data {
                blake3::hash(bytes).to_hex().to_string()
            } else {
                ec_chopper::hash_file_blake3(&seg_path)?
            };
            let chunk = TsChunk {
                index: index as u64,
                path: seg_path.clone(),
                timing: ec_chopper::ChunkTiming {
                    chunk_index: index as u64,
                    chunk_start_27mhz: None,
                    chunk_duration_27mhz: chunk_ms * 27_000,
                    utc_start_unix: None,
                    sync_status: "cmaf".to_string(),
                },
            };
            tx.blocking_send(PendingChunk {
                kind: PendingKind::Segment,
                chunk,
                data,
                hash,
            })
            .map_err(|_| anyhow!("publish channel closed"))?;
        }
        let _ = child.kill();
        let _ = child.wait();
        let _ = writer.join();
        Ok(())
    });
    // Async side: publish init immediately, buffer segments into epochs and
    // flush whenever an epoch fills.
    while let Some(pending) = rx.recv().await {
        match pending.kind {
            PendingKind::Init => {
                let Some(data) = pending.data else {
                    continue;
                };
                let key_id = format!("{}/init", track.name);
                let object = build_object(
                    pending.chunk,
                    data,
                    pending.hash,
                    None,
                    network_secret.as_deref(),
                    None,
                    "video/mp4",
                    &key_id,
                )?;
                tracing::info!(
                    "publish init bytes={} track={}",
                    object.data.len(),
                    args.init_track
                );
                publish_set.publish_object(&args.init_track, GroupId(0), object)?;
            }
            PendingKind::Segment => {
                epoch_buffer.push(pending.chunk, pending.data, pending.hash);
                if epoch_buffer.is_full() {
                    flush_epoch_publish(
                        &mut publish_set,
                        &track_name,
                        &args.manifest_track,
                        publish_chunks,
                        args.publish_manifests,
                        &mut epoch_buffer,
                        &stream_id_value,
                        args.chunk_ms,
                        &encoder_profile_id,
                        &source_kind,
                        network_secret.as_deref(),
                        segment_content_type,
                        &track.name,
                        &mut object_sequence,
                        &mut manifest_sequence,
                        announce_tx.as_ref(),
                    )?;
                }
            }
        }
    }
    // Ensure chunking completed successfully.
    chunk_task
        .await
        .map_err(|err| anyhow!("chunk task join error: {err}"))??;
    // Flush the final, possibly partial, epoch.
    flush_epoch_publish(
        &mut publish_set,
        &track_name,
        &args.manifest_track,
        publish_chunks,
        args.publish_manifests,
        &mut epoch_buffer,
        &stream_id_value,
        args.chunk_ms,
        &encoder_profile_id,
        &source_kind,
        network_secret.as_deref(),
        segment_content_type,
        &track.name,
        &mut object_sequence,
        &mut manifest_sequence,
        announce_tx.as_ref(),
    )?;
    Ok(())
}
async fn moq_subscribe(args: MoqSubscribeArgs) -> Result<()> {
if args.require_manifest && !args.subscribe_manifests {
return Err(anyhow!("--require-manifest requires --subscribe-manifests"));
}
let secret = parse_iroh_secret(args.iroh_secret)?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let node = MoqNode::bind_with_discovery(secret, discovery).await?;
let remote = ec_iroh::parse_endpoint_addr(&args.remote)?;
let remote_manifests = if let Some(value) = args.remote_manifests.as_deref() {
ec_iroh::parse_endpoint_addr(value)?
} else {
remote.clone()
};
let mut stream = node
.subscribe_objects(remote.clone(), &args.broadcast_name, &args.track_name)
.await?;
let manifest_allowlist = parse_manifest_allowlist(args.manifest_signers.as_deref());
let manifest_store = if args.subscribe_manifests || args.require_manifest {
let mut manifest_stream = node
.subscribe_manifests(remote_manifests, &args.broadcast_name, &args.manifest_track)
.await?;
let store = Arc::new(RwLock::new(HashMap::new()));
let store_clone = Arc::clone(&store);
let allowlist = manifest_allowlist.clone();
tokio::spawn(async move {
while let Some(manifest) = manifest_stream.recv().await {
if !validate_manifest(&manifest, allowlist.as_ref()) {
tracing::warn!("rejected manifest {}", manifest.manifest_id);
continue;
}
let manifest_id = manifest.manifest_id.clone();
store_clone.write().await.insert(manifest_id, manifest);
}
});
Some(store)
} else {
None
};
let network_secret = parse_network_secret(args.network_secret)?;
let mut hls =
HlsWriter::new_cmaf(&args.output_dir, args.chunk_ms as f64 / 1000.0, args.window)?;
let needs_init = args.subscribe_init;
let mut init_ready = !needs_init;
let mut buffered_segments: Vec<(u64, f64, Vec<u8>)> = Vec::new();
let mut init_rx = if needs_init {
let (tx, rx) = tokio::sync::oneshot::channel::<Result<Vec<u8>>>();
let mut init_stream = node
.subscribe_objects(remote.clone(), &args.broadcast_name, &args.init_track)
.await?;
let remote_str = args.remote.clone();
let init_track = args.init_track.clone();
let secret = network_secret.clone();
tokio::spawn(async move {
let deadline = Instant::now() + Duration::from_secs(60);
while Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(Instant::now());
let recv =
tokio::time::timeout(remaining.min(Duration::from_secs(2)), init_stream.recv())
.await;
let Ok(Some(object)) = recv else { continue };
let init_index = object
.meta
.timing
.as_ref()
.map(|t| t.chunk_index)
.unwrap_or(0);
let data = if let Some(enc) = &object.meta.encryption {
if enc.alg != ENCRYPTION_ALG {
tracing::warn!("init: unsupported encryption {}", enc.alg);
continue;
}
tracing::info!(
"init: received encrypted object bytes={} key_id={} chunk_index={}",
object.data.len(),
enc.key_id,
init_index
);
match decrypt_stream_data(
&enc.key_id,
init_index,
&object.data,
secret.as_deref(),
) {
Some(plaintext) => plaintext,
None => {
tracing::warn!(
"init: decryption failed key_id={} chunk_index={}",
enc.key_id,
init_index
);
continue;
}
}
} else {
tracing::info!(
"init: received plaintext object bytes={} chunk_index={}",
object.data.len(),
init_index
);
object.data
};
let _ = tx.send(Ok(data));
return;
}
let _ = tx.send(Err(anyhow!(
"timed out waiting for CMAF init segment on track '{}' from {}",
init_track,
remote_str
)));
});
Some(rx)
} else {
None
};
let fallback = Duration::from_millis(args.chunk_ms);
let mut fallback_index = 0u64;
let mut invalid_chunks = 0u32;
let mut written_chunks = 0u64;
let mut quota = args.max_bytes_per_sec.map(|rate| {
let burst = args.max_bytes_burst.unwrap_or(rate.saturating_mul(2));
ec_iroh::TokenBucket::new(burst, rate)
});
loop {
tokio::select! {
biased;
init_res = async { if let Some(rx) = init_rx.as_mut() { Some(rx.await) } else { None } }, if init_rx.is_some() => {
let Some(init_res) = init_res else { continue };
let init = match init_res {
Ok(inner) => inner?,
Err(_) => return Err(anyhow!("init receiver task cancelled")),
};
if args.raw_cmaf {
fs::create_dir_all(&args.output_dir)?;
fs::write(args.output_dir.join("init.mp4"), &init)?;
} else {
let _ = hls.write_init_segment(&init)?;
}
init_ready = true;
init_rx = None;
// Flush any segments we buffered while waiting for init.
buffered_segments.sort_by_key(|(idx, _, _)| *idx);
for (idx, dur, bytes) in buffered_segments.drain(..) {
if args.raw_cmaf {
fs::create_dir_all(&args.output_dir)?;
fs::write(args.output_dir.join(format!("segment_{idx:06}.m4s")), &bytes)?;
} else {
let _ = hls.write_segment(idx, dur, &bytes)?;
}
written_chunks += 1;
if let Some(limit) = args.stop_after {
if written_chunks >= limit {
return Ok(());
}
}
}
continue;
}
object = stream.recv() => {
let Some(object) = object else { break };
if let Some(bucket) = quota.as_mut() {
if !bucket.allow(object.data.len() as u64) {
tracing::warn!("quota exceeded; dropping chunk");
continue;
}
}
let stream_id_for_manifest = object
.meta
.encryption
.as_ref()
.map(|enc| strip_init_suffix(enc.key_id.as_str()).to_string());
let index = object
.meta
.timing
.as_ref()
.map(|t| t.chunk_index)
.unwrap_or_else(|| {
let current = fallback_index;
fallback_index += 1;
current
});
let stream_id = args
.stream_id
.as_deref()
.or_else(|| object.meta.encryption.as_ref().map(|enc| enc.key_id.as_str()));
let data = if let Some(enc) = &object.meta.encryption {
if enc.alg != ENCRYPTION_ALG {
tracing::warn!("unsupported encryption {}", enc.alg);
continue;
}
let Some(stream_id) = stream_id else {
tracing::warn!("missing stream id for decryption");
continue;
};
match decrypt_stream_data(stream_id, index, &object.data, network_secret.as_deref()) {
Some(plaintext) => plaintext,
None => {
tracing::warn!("decryption failed for chunk {}", index);
continue;
}
}
} else {
object.data
};
if let Some(store) = manifest_store.as_ref() {
let manifest = {
let store = store.read().await;
if let Some(manifest_id) = object.meta.manifest_id.as_ref() {
store.get(manifest_id).cloned()
} else {
if let Some(stream_id) = stream_id_for_manifest.as_deref() {
find_manifest_for_stream_index(&store, stream_id, index)
} else {
None
}
}
};
if let Some(manifest) = manifest {
let expected = stream_id_for_manifest
.as_deref()
.and_then(|sid| manifest_hash_for_chunk(&manifest, sid, index));
if let Some(expected) = expected {
if let Some(meta_hash) = object.meta.chunk_hash.as_ref() {
if expected != *meta_hash {
tracing::warn!(
"manifest mismatch for chunk {} (expected {}, got {})",
index,
expected,
meta_hash
);
invalid_chunks += 1;
if invalid_chunks > args.max_invalid_chunks {
tracing::warn!("too many invalid chunks; closing");
break;
}
continue;
}
} else if args.require_manifest {
tracing::warn!("missing chunk hash for manifest");
invalid_chunks += 1;
if invalid_chunks > args.max_invalid_chunks {
tracing::warn!("too many invalid chunks; closing");
break;
}
continue;
}
} else {
// If the manifest doesn't include hash lists, fall back to Merkle proof.
if let (Some(meta_hash), Some(proof)) =
(object.meta.chunk_hash.as_ref(), object.meta.chunk_proof.as_ref())
{
let offset = (index - manifest.body.chunk_start_index) as usize;
if !verify_merkle_proof(
meta_hash,
offset,
proof,
&manifest.body.merkle_root,
) {
tracing::warn!("chunk {} proof invalid for manifest", index);
invalid_chunks += 1;
if invalid_chunks > args.max_invalid_chunks {
tracing::warn!("too many invalid chunks; closing");
break;
}
continue;
}
} else if args.require_manifest {
tracing::warn!("chunk {} outside manifest", index);
invalid_chunks += 1;
if invalid_chunks > args.max_invalid_chunks {
tracing::warn!("too many invalid chunks; closing");
break;
}
continue;
}
}
} else if args.require_manifest {
tracing::warn!("missing manifest covering chunk {}", index);
continue;
}
}
if let Some(expected) = object.meta.chunk_hash.as_ref() {
let actual = blake3::hash(&data).to_hex().to_string();
if &actual != expected {
tracing::warn!(
"chunk {} hash mismatch (expected {}, got {})",
index,
expected,
actual
);
invalid_chunks += 1;
if invalid_chunks > args.max_invalid_chunks {
tracing::warn!("too many invalid chunks; closing");
break;
}
continue;
}
}
let duration = chunk_duration_secs(&object.meta, fallback);
if !init_ready {
// Keep draining the MoQ track to avoid flow-control stalls while init is pending.
buffered_segments.push((index, duration, data));
} else {
if args.raw_cmaf {
fs::create_dir_all(&args.output_dir)?;
fs::write(args.output_dir.join(format!("segment_{index:06}.m4s")), &data)?;
} else {
let _ = hls.write_segment(index, duration, &data)?;
}
written_chunks += 1;
}
if let Some(limit) = args.stop_after {
if written_chunks >= limit {
break;
}
}
}
}
}
if needs_init && !init_ready {
return Err(anyhow!(
"stream ended before receiving CMAF init segment on track '{}' from {}",
args.init_track,
args.remote
));
}
Ok(())
}
/// End-to-end MoQ loopback test: binds a publisher and a subscriber node in the same
/// process, chunks `args.input` into CMAF (init + media segments) via ffmpeg, publishes
/// every chunk, and verifies the subscriber receives each one with a matching blake3 hash.
///
/// Returns `Err` on timeout, subscriber failure, or any hash/count mismatch.
async fn moq_selftest(args: MoqSelftestArgs) -> Result<()> {
    let discovery = parse_discovery(args.discovery.as_deref())?;
    let publisher_node = MoqNode::bind_with_discovery(None, discovery).await?;
    let subscriber_node = MoqNode::bind_with_discovery(None, discovery).await?;
    publisher_node.endpoint().online().await;
    subscriber_node.endpoint().online().await;
    // Timestamp-based default id keeps repeated runs from colliding.
    let stream_id = args.stream_id.unwrap_or_else(|| {
        let ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        format!("selftest-{ts}")
    });
    let broadcast_name = stream_id.clone();
    let track_name = args.track_name.clone();
    let mut publisher = publisher_node
        .publish_objects(&broadcast_name, &track_name)
        .await?;
    // Wait briefly for the endpoint to learn at least one direct address; without one
    // the test can still proceed via discovery/relays, just less reliably.
    let mut endpoint_addr = publisher_node.endpoint().addr();
    if endpoint_addr.addrs.is_empty() {
        let mut watcher = publisher_node.endpoint().watch_addr();
        let start = Instant::now();
        while endpoint_addr.addrs.is_empty() && start.elapsed() < Duration::from_secs(3) {
            tokio::time::sleep(Duration::from_millis(200)).await;
            endpoint_addr = watcher.get();
        }
    }
    if endpoint_addr.addrs.is_empty() {
        tracing::warn!(
            "publisher endpoint has no direct addrs; selftest may rely on discovery/relays"
        );
    }
    let mut stream = subscriber_node
        .subscribe_objects(endpoint_addr, &broadcast_name, &track_name)
        .await?;
    // `expected` maps chunk index -> blake3 hash of the published object bytes;
    // `done_tx` announces the final expected count once chunking has finished.
    let expected: Arc<Mutex<BTreeMap<u64, blake3::Hash>>> = Arc::new(Mutex::new(BTreeMap::new()));
    let (done_tx, mut done_rx) = tokio::sync::watch::channel(0usize);
    let (progress_tx, mut progress_rx) = tokio::sync::mpsc::channel::<u64>(8);
    let subscriber_task = tokio::spawn(async move {
        let mut received: BTreeMap<u64, blake3::Hash> = BTreeMap::new();
        loop {
            tokio::select! {
                item = stream.recv() => {
                    match item {
                        Some(object) => {
                            // Missing timing metadata maps to index 0 (shared with init).
                            let index = object
                                .meta
                                .timing
                                .as_ref()
                                .map(|timing| timing.chunk_index)
                                .unwrap_or(0);
                            let hash = blake3::hash(&object.data);
                            received.insert(index, hash);
                            let _ = progress_tx.send(index).await;
                            let expected_total = *done_rx.borrow();
                            if expected_total > 0 && received.len() >= expected_total {
                                break;
                            }
                        }
                        None => break,
                    }
                }
                _ = done_rx.changed() => {
                    // The publisher side may finish (and publish the total) after we
                    // already received everything; re-check the exit condition here.
                    let expected_total = *done_rx.borrow();
                    if expected_total > 0 && received.len() >= expected_total {
                        break;
                    }
                }
            }
        }
        Ok::<_, anyhow::Error>(received)
    });
    let chunk_dir = args.chunk_dir.clone();
    let input = args.input.clone();
    let max_chunks = args.max_chunks;
    let track = TrackName {
        namespace: "every.channel".to_string(),
        name: stream_id.clone(),
    };
    // Chunking runs blocking file/process I/O, so it lives on the blocking pool.
    let objects =
        tokio::task::spawn_blocking(move || -> Result<Vec<(u64, ObjectPayload, blake3::Hash)>> {
            let _ = fs::remove_dir_all(&chunk_dir);
            fs::create_dir_all(&chunk_dir)
                .with_context(|| format!("failed to create {}", chunk_dir.display()))?;
            let reader: Box<dyn Read + Send> = if input.starts_with("http://")
                || input.starts_with("https://")
            {
                Box::new(ec_hdhomerun::open_stream_url(&input, None)?)
            } else {
                Box::new(File::open(&input).with_context(|| format!("failed to open {}", input))?)
            };
            let mut objects = Vec::new();
            let out_dir = chunk_dir.join("cmaf");
            let (init_path, segments) =
                chunk_stream_cmaf_ffmpeg(reader, &out_dir, args.chunk_ms, max_chunks, true)?;
            // Index 0: init.mp4
            let (init_bytes, init_hash) = read_chunk_bytes_and_hash(&init_path)?;
            let init_chunk = TsChunk {
                index: 0,
                path: init_path,
                timing: ec_chopper::ChunkTiming {
                    chunk_index: 0,
                    chunk_start_27mhz: None,
                    chunk_duration_27mhz: 0,
                    utc_start_unix: None,
                    sync_status: "init".to_string(),
                },
            };
            let object = build_object(
                init_chunk,
                init_bytes,
                init_hash,
                None,
                None,
                None,
                "video/mp4",
                &track.name,
            )?;
            let hash = blake3::hash(&object.data);
            objects.push((0, object, hash));
            // Index 1..N: segments
            for (i, seg_path) in segments.into_iter().enumerate() {
                let index = (i as u64) + 1;
                let (bytes, hash_hex) = read_chunk_bytes_and_hash(&seg_path)?;
                let chunk = TsChunk {
                    index,
                    path: seg_path,
                    timing: ec_chopper::ChunkTiming {
                        chunk_index: index,
                        chunk_start_27mhz: None,
                        // Durations are expressed on the MPEG 27 MHz clock.
                        chunk_duration_27mhz: args.chunk_ms * 27_000,
                        utc_start_unix: None,
                        sync_status: "cmaf".to_string(),
                    },
                };
                let object = build_object(
                    chunk,
                    bytes,
                    hash_hex,
                    None,
                    None,
                    None,
                    "video/iso.segment",
                    &track.name,
                )?;
                let hash = blake3::hash(&object.data);
                objects.push((index, object, hash));
            }
            Ok(objects)
        })
        .await
        .map_err(|err| anyhow!("chunking task failed: {err}"))??;
    {
        // Record all expected hashes, then tell the subscriber the final count.
        let mut map = expected.lock().expect("mutex poisoned");
        for (index, _, hash) in &objects {
            map.insert(*index, *hash);
        }
        let _ = done_tx.send(map.len());
    }
    // Small grace period so the subscription is fully set up before publishing.
    tokio::time::sleep(Duration::from_millis(250)).await;
    for (index, object, _) in objects {
        publisher.publish_object(GroupId(index), object)?;
        // Lock-step with the subscriber: wait for each chunk to arrive before sending
        // the next, so a stalled transport fails fast instead of at the end.
        match tokio::time::timeout(Duration::from_secs(2), progress_rx.recv()).await {
            Ok(Some(received)) if received == index => {}
            Ok(Some(received)) => {
                tracing::warn!("selftest received chunk {received} while waiting for {index}");
            }
            Ok(None) => {
                return Err(anyhow!("selftest subscriber closed before chunk {index}"));
            }
            Err(_) => {
                return Err(anyhow!("selftest timed out waiting for chunk {index}"));
            }
        }
    }
    let received = tokio::time::timeout(Duration::from_secs(20), subscriber_task)
        .await
        .map_err(|_| anyhow!("selftest timed out waiting for subscriber"))?
        .map_err(|err| anyhow!("subscriber task failed: {err}"))??;
    // Compare every expected chunk hash against what the subscriber saw.
    let expected_map = expected.lock().expect("mutex poisoned");
    let mut mismatches = 0usize;
    for (index, expected_hash) in expected_map.iter() {
        match received.get(index) {
            Some(actual) if actual == expected_hash => {}
            Some(_) => {
                mismatches += 1;
                tracing::warn!("hash mismatch at chunk {index}");
            }
            None => {
                mismatches += 1;
                tracing::warn!("missing chunk {index}");
            }
        }
    }
    // Extra (unexpected) chunks also count as failures.
    if received.len() > expected_map.len() {
        mismatches += received.len() - expected_map.len();
        tracing::warn!(
            "received extra chunks ({})",
            received.len() - expected_map.len()
        );
    }
    if mismatches > 0 {
        return Err(anyhow!("moq selftest failed with {mismatches} mismatches"));
    }
    println!(
        "moq selftest ok: {} chunks verified (broadcast {}, track {})",
        expected_map.len(),
        broadcast_name,
        track_name
    );
    Ok(())
}
/// Response body of `GET {directory}/api/turn`.
#[derive(serde::Deserialize)]
struct TurnResp {
    // ICE (STUN/TURN) servers handed out by the directory service.
    ice_servers: Vec<ICEServer>,
}
/// Best-effort fetch of ICE (STUN/TURN) servers from the directory's `/api/turn`
/// endpoint. Returns `None` on any transport error, non-success status, parse
/// failure, or an empty server list — callers fall back to defaults.
async fn fetch_turn_ice_servers(client: &reqwest::Client, dir: &str) -> Option<Vec<ICEServer>> {
    let endpoint = format!("{}/api/turn", dir.trim_end_matches('/'));
    let response = client.get(endpoint).send().await.ok()?;
    if !response.status().is_success() {
        return None;
    }
    let parsed = response.json::<TurnResp>().await.ok()?;
    // An empty list is treated the same as no response at all.
    (!parsed.ice_servers.is_empty()).then_some(parsed.ice_servers)
}
/// Publish a stream to a single browser viewer over a WebRTC data channel.
///
/// Outer loop = one viewer session per iteration: create an offer, announce it to the
/// directory (or print it for manual exchange), wait for an answer, then pipe
/// ffmpeg-produced CMAF chunks (init + segments) over the channel until the viewer
/// disconnects, the heartbeat times out, or the source ends. Manual `--answer` mode
/// runs a single session and returns.
async fn direct_publish(args: DirectPublishArgs) -> Result<()> {
    fs::create_dir_all(&args.chunk_dir)
        .with_context(|| format!("failed to create {}", args.chunk_dir.display()))?;
    // For browser interop, we currently always normalize into deterministic H.264/AAC CMAF.
    // This also keeps the codec stable for MSE playback.
    let deterministic = true;
    let source: Box<dyn StreamSource + Send> = match args.source {
        IngestSource::Hls { url, .. } => Box::new(HlsSource {
            url,
            mode: HlsMode::Transcode,
        }),
        IngestSource::Hdhr {
            host,
            device_id,
            channel,
            name,
            prefer_mdns,
        } => Box::new(HdhrSource {
            host,
            device_id,
            channel,
            name,
            prefer_mdns,
        }),
        IngestSource::LinuxDvb {
            adapter,
            dvr,
            tune_cmd,
            tune_wait_ms,
        } => Box::new(LinuxDvbSource {
            adapter,
            dvr,
            tune_cmd,
            tune_wait_ms,
        }),
        IngestSource::Ts { input } => Box::new(TsSource { input }),
    };
    fn normalize_base(url: &str) -> String {
        url.trim_end_matches('/').to_string()
    }
    let directory_url = args.directory_url.as_deref().map(normalize_base);
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(8))
        .build()
        .with_context(|| "failed to build http client")?;
    // Prefer directory-provided TURN servers when available (helps NAT traversal).
    let mut cfg = PeerConfiguration::default();
    if let Some(dir) = directory_url.as_deref() {
        if let Some(ice) = fetch_turn_ice_servers(&client, dir).await {
            cfg.ice_servers = ice;
        }
    }
    let stream_id = args.stream_id.clone().unwrap_or_else(|| {
        let ts = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis();
        format!("every.channel/direct/{ts}")
    });
    // Wire shapes for the directory announce/answer endpoints.
    #[derive(serde::Serialize)]
    struct AnnounceReq<'a> {
        stream_id: &'a str,
        title: &'a str,
        offer: &'a str,
        expires_ms: u64,
    }
    #[derive(serde::Deserialize)]
    struct AnswerResp {
        answer: String,
    }
    let title = args.title.clone();
    let out_dir = args.chunk_dir.join("cmaf");
    let chunk_ms = args.chunk_ms;
    let max_segments = args.max_segments;
    let answer_override = args.answer.clone();
    loop {
        // New offerer per session so viewers can reconnect (or a new viewer can take over).
        let offerer = PeerConnectionBuilder::new()
            .set_config(cfg.clone())
            .with_channel_options(vec![(
                "simple_channel_".to_string(),
                DataChannelOptions {
                    ordered: Some(true),
                    ..Default::default()
                },
            )])
            .map_err(|e| anyhow!("{e:#}"))?
            .build()
            .await
            .map_err(|e| anyhow!("{e:#}"))?;
        let offer_desc = offerer
            .get_local_description()
            .await
            .ok_or_else(|| anyhow!("missing local offer description"))?;
        let offer_candidates = offerer
            .collect_ice_candidates()
            .await
            .map_err(|e| anyhow!("{e:#}"))?;
        // Offer + candidates encoded into a single copy/pasteable link.
        let offer_link = encode_direct_link(&DirectCodeV1 {
            v: 1,
            desc: offer_desc,
            candidates: offer_candidates,
            label: Some("every.channel0".to_string()),
        })?;
        println!("{offer_link}");
        // Watch channel used to stop the background announce-refresh task.
        let stop_refresh = tokio::sync::watch::channel(false);
        if let Some(dir) = directory_url.as_deref() {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            let req = AnnounceReq {
                stream_id: &stream_id,
                title: &title,
                offer: &offer_link,
                expires_ms: now.saturating_add(args.announce_ttl_ms),
            };
            let url = format!("{dir}/api/announce");
            let res = client.post(url).json(&req).send().await?;
            if !res.status().is_success() {
                tracing::warn!("directory announce failed: {}", res.status());
            } else {
                tracing::info!("announced offer for {}", stream_id);
            }
            // Best-effort refresh while waiting for an answer (stops when the session starts).
            let mut stop_rx = stop_refresh.1.clone();
            let client2 = client.clone();
            let dir2 = dir.to_string();
            let stream_id2 = stream_id.clone();
            let title2 = title.clone();
            let offer2 = offer_link.clone();
            let ttl = args.announce_ttl_ms;
            tokio::spawn(async move {
                loop {
                    if *stop_rx.borrow() {
                        break;
                    }
                    // Refresh at 3/4 of the TTL so the listing never lapses.
                    tokio::time::sleep(Duration::from_millis(ttl.saturating_mul(3) / 4)).await;
                    if *stop_rx.borrow() {
                        break;
                    }
                    let now = SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_millis() as u64;
                    let req = AnnounceReq {
                        stream_id: &stream_id2,
                        title: &title2,
                        offer: &offer2,
                        expires_ms: now.saturating_add(ttl),
                    };
                    let url = format!("{dir2}/api/announce");
                    let fut = client2.post(url).json(&req).send();
                    let _ = tokio::time::timeout(Duration::from_secs(5), fut).await;
                }
            });
        }
        // Acquire the answer: explicit override, directory polling, or manual paste.
        let answer = if let Some(answer) = answer_override.clone() {
            answer
        } else if let Some(dir) = directory_url.as_deref() {
            eprintln!("waiting for browser answer via {dir}/api/answer?stream_id=...");
            let url = format!(
                "{dir}/api/answer?stream_id={}",
                urlencoding::encode(&stream_id)
            );
            // A timeout of 0 means "wait forever".
            let deadline = if args.answer_timeout_secs == 0 {
                None
            } else {
                Some(Instant::now() + Duration::from_secs(args.answer_timeout_secs))
            };
            loop {
                if deadline.is_some_and(|d| Instant::now() > d) {
                    return Err(anyhow!(
                        "timed out waiting for answer for stream_id {stream_id}"
                    ));
                }
                match client.get(&url).send().await {
                    Ok(res) if res.status().is_success() => {
                        let body: AnswerResp = res.json().await?;
                        break body.answer;
                    }
                    _ => {
                        tokio::time::sleep(Duration::from_millis(300)).await;
                    }
                }
            }
        } else {
            eprintln!("paste direct answer link/code, then press enter:");
            let mut line = String::new();
            std::io::stdin().read_line(&mut line)?;
            line
        };
        // Stop refreshing announcements. If we keep the listing live while connected, people
        // will try to join and silently fail (this direct path is 1:1 today).
        let _ = stop_refresh.0.send(true);
        let answer = decode_direct_link(&answer)?;
        offerer
            .set_remote_description(answer.desc)
            .await
            .map_err(|e| anyhow!("{e:#}"))?;
        offerer
            .add_ice_candidates(answer.candidates)
            .await
            .map_err(|e| anyhow!("{e:#}"))?;
        let channel = offerer
            .receive_channel()
            .await
            .map_err(|e| anyhow!("{e:#}"))?;
        channel.wait_ready().await;
        eprintln!("direct channel open: {}", channel.label());
        // Detect viewer disconnect promptly. Relying only on `channel.send` errors can lag
        // (depending on buffering), leaving the directory entry effectively "stuck".
        let (pc_dead_tx, mut pc_dead_rx) = tokio::sync::oneshot::channel::<PeerConnectionState>();
        let (pc_stop_tx, mut pc_stop_rx) = tokio::sync::watch::channel(false);
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = pc_stop_rx.changed() => {
                        if *pc_stop_rx.borrow() {
                            break;
                        }
                    }
                    st = offerer.state_change() => {
                        if matches!(
                            st,
                            PeerConnectionState::Disconnected | PeerConnectionState::Failed | PeerConnectionState::Closed
                        ) {
                            let _ = pc_dead_tx.send(st);
                            break;
                        }
                    }
                }
            }
        });
        // Expire the listing quickly now that we have a live session.
        if let Some(dir) = directory_url.as_deref() {
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_millis() as u64;
            let req = AnnounceReq {
                stream_id: &stream_id,
                title: &title,
                offer: &offer_link,
                expires_ms: now.saturating_add(1500),
            };
            let url = format!("{dir}/api/announce");
            let _ = client.post(url).json(&req).send().await;
        }
        // Items produced by the blocking chunker and consumed by the send loop below.
        enum ChunkItem {
            Init(Vec<u8>),
            Segment { index: u64, bytes: Vec<u8> },
        }
        let (tx, mut rx) = mpsc::channel::<ChunkItem>(8);
        let reader = source.open_stream()?;
        let out_dir2 = out_dir.clone();
        // Blocking task: run ffmpeg, tail its output directory, and forward chunks.
        let chunk_task = tokio::task::spawn_blocking(move || -> Result<()> {
            let _ = fs::remove_dir_all(&out_dir2);
            fs::create_dir_all(&out_dir2)
                .with_context(|| format!("failed to create {}", out_dir2.display()))?;
            // NOTE(review): both branches are identical today (`deterministic` is
            // hard-coded to true above) — presumably a non-deterministic profile
            // would go in the else arm; confirm before relying on the flag.
            let profile = if deterministic {
                ec_chopper::deterministic_h264_profile()
            } else {
                ec_chopper::deterministic_h264_profile()
            };
            let mut cmd = Command::new("ffmpeg");
            cmd.current_dir(&out_dir2);
            cmd.arg("-hide_banner")
                .arg("-loglevel")
                .arg("error")
                .arg("-nostdin")
                .arg("-y")
                .arg("-i")
                .arg("pipe:0")
                // predictable mapping
                .arg("-map")
                .arg("0:v:0")
                .arg("-map")
                .arg("0:a:0?")
                .arg("-sn")
                .arg("-dn")
                .arg("-map_metadata")
                .arg("-1")
                // reduce scheduling nondeterminism
                .arg("-filter_threads")
                .arg("1")
                .arg("-filter_complex_threads")
                .arg("1")
                .arg("-threads")
                .arg("1");
            for arg in ec_chopper::ffmpeg_profile_args(&profile) {
                cmd.arg(arg);
            }
            let seg_time = format!("{:.3}", chunk_ms as f64 / 1000.0);
            // fMP4 HLS output: init.mp4 + numbered .m4s segments.
            cmd.arg("-f")
                .arg("hls")
                .arg("-hls_time")
                .arg(seg_time)
                .arg("-hls_list_size")
                .arg("0")
                .arg("-hls_segment_type")
                .arg("fmp4")
                .arg("-hls_flags")
                .arg("independent_segments")
                .arg("-hls_fmp4_init_filename")
                .arg("init.mp4")
                .arg("-hls_segment_filename")
                .arg("segment_%06d.m4s")
                .arg("index.m3u8")
                .stdin(Stdio::piped())
                .stdout(Stdio::null())
                .stderr(Stdio::inherit());
            let mut child = cmd.spawn().with_context(|| "failed to spawn ffmpeg")?;
            let mut stdin = child
                .stdin
                .take()
                .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?;
            let mut reader = reader;
            // Dedicated thread feeds the source into ffmpeg's stdin.
            let writer = std::thread::spawn(move || -> Result<()> {
                std::io::copy(&mut reader, &mut stdin)?;
                Ok(())
            });
            let init_path = out_dir2.join("init.mp4");
            wait_for_stable_file(&init_path, Duration::from_secs(20))?;
            let init = fs::read(&init_path)?;
            tx.blocking_send(ChunkItem::Init(init))
                .map_err(|_| anyhow!("receiver closed"))?;
            for i in 0..max_segments {
                let seg_path = out_dir2.join(format!("segment_{i:06}.m4s"));
                match wait_for_stable_file(&seg_path, Duration::from_secs(30)) {
                    Ok(()) => {}
                    Err(err) => {
                        // ffmpeg exiting cleanly means the source ended — not an error.
                        if let Ok(Some(status)) = child.try_wait() {
                            if status.success() {
                                break;
                            }
                            return Err(anyhow!("ffmpeg exited with {status} ({err:#})"));
                        }
                        return Err(err);
                    }
                }
                let bytes = fs::read(&seg_path)?;
                // Segment indices on the wire start at 1; 0 is reserved for init.
                tx.blocking_send(ChunkItem::Segment {
                    index: (i as u64) + 1,
                    bytes,
                })
                .map_err(|_| anyhow!("receiver closed"))?;
            }
            let _ = child.kill();
            let _ = child.wait();
            let _ = writer.join();
            Ok(())
        });
        let mut send_failed = false;
        let mut have_heartbeat = false;
        let mut last_heartbeat = Instant::now();
        let mut heartbeat_check = tokio::time::interval(Duration::from_secs(1));
        heartbeat_check.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        // Main session loop: forward chunks until disconnect/timeout/source end.
        loop {
            tokio::select! {
                biased;
                st = (&mut pc_dead_rx) => {
                    match st {
                        Ok(st) => tracing::info!("peer connection ended ({st:?}); restarting session"),
                        Err(_) => tracing::info!("peer connection ended; restarting session"),
                    }
                    send_failed = true;
                    break;
                }
                _ = heartbeat_check.tick() => {
                    // Only time out once we've seen at least one heartbeat.
                    if have_heartbeat && last_heartbeat.elapsed() > DIRECT_HEARTBEAT_TIMEOUT {
                        tracing::info!("direct session heartbeat timed out; restarting session");
                        send_failed = true;
                        break;
                    }
                }
                msg = channel.receive() => {
                    match msg {
                        Ok(b) => {
                            // Any message from the subscriber counts as a heartbeat today.
                            // (We reserve DIRECT_WIRE_TAG_PING for explicit pings, but don't require it.)
                            have_heartbeat = true;
                            last_heartbeat = Instant::now();
                            if !b.is_empty() && b[0] == DIRECT_WIRE_TAG_PING {
                                // ignore
                            }
                        }
                        Err(_) => {
                            send_failed = true;
                            break;
                        }
                    }
                }
                item = rx.recv() => {
                    let Some(item) = item else { break };
                    let (index, content_type, bytes, duration_27mhz, sync_status) = match item {
                        ChunkItem::Init(bytes) => (0u64, "video/mp4", bytes, 0u64, "init"),
                        ChunkItem::Segment { index, bytes } => {
                            (index, "video/iso.segment", bytes, chunk_ms * 27_000, "cmaf")
                        }
                    };
                    // Log init always; segments only every 5th to limit noise.
                    if index == 0 {
                        tracing::info!("direct send: init.mp4 ({} bytes)", bytes.len());
                    } else if index % 5 == 0 {
                        tracing::info!("direct send: segment {index} ({} bytes)", bytes.len());
                    }
                    let hash = blake3::hash(&bytes).to_hex().to_string();
                    let meta = ObjectMeta {
                        created_unix_ms: SystemTime::now()
                            .duration_since(UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_millis() as u64,
                        content_type: content_type.to_string(),
                        size_bytes: bytes.len() as u64,
                        timing: Some(TimingMeta {
                            chunk_index: index,
                            chunk_start_27mhz: 0,
                            chunk_duration_27mhz: duration_27mhz,
                            utc_start_unix: None,
                            sync_status: sync_status.to_string(),
                        }),
                        encryption: None,
                        chunk_hash: Some(hash),
                        chunk_hash_alg: Some("blake3".to_string()),
                        chunk_proof: None,
                        chunk_proof_alg: None,
                        manifest_id: None,
                    };
                    let frame = encode_object_frame(&meta, &bytes)?;
                    // Bound each send so a wedged channel restarts the session.
                    match tokio::time::timeout(
                        Duration::from_secs(5),
                        direct_wire_send_frame(&channel, &frame),
                    )
                    .await
                    {
                        Ok(Ok(())) => {}
                        Ok(Err(err)) => {
                            tracing::warn!("direct send failed (restarting session): {err:#}");
                            send_failed = true;
                            break;
                        }
                        Err(_) => {
                            tracing::warn!("direct send timed out (restarting session)");
                            send_failed = true;
                            break;
                        }
                    }
                }
            }
        }
        // Ensure the peer connection is dropped if we restart for non-ICE reasons (e.g. ffmpeg end).
        let _ = pc_stop_tx.send(true);
        drop(rx);
        match chunk_task.await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => {
                // Common when the viewer disconnects: the receiver is dropped and the
                // blocking sender errors out. Treat as a reconnect.
                tracing::debug!("chunk task ended: {err:#}");
            }
            Err(err) => tracing::debug!("chunk task join error: {err}"),
        }
        if answer_override.is_some() {
            // For manual mode, we can't reasonably loop.
            break;
        }
        if !send_failed {
            // ffmpeg ended or source ended; try again.
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    }
    Ok(())
}
/// Send one frame over the data channel as a `u32`-length-prefixed stream,
/// split into `DIRECT_WIRE_TAG_STREAM`-tagged messages of bounded size
/// (see `DirectWireDecoder` for the receiving side).
async fn direct_wire_send_frame(channel: &impl DataChannelExt, frame: &[u8]) -> Result<()> {
    let len = u32::try_from(frame.len()).map_err(|_| anyhow!("frame too large"))?;
    // Big-endian length prefix followed by the frame body.
    let mut payload = len.to_be_bytes().to_vec();
    payload.extend_from_slice(frame);
    for piece in payload.chunks(DIRECT_WIRE_CHUNK_BYTES) {
        let mut msg = Vec::with_capacity(piece.len() + 1);
        msg.push(DIRECT_WIRE_TAG_STREAM);
        msg.extend_from_slice(piece);
        channel
            .send(&bytes::Bytes::from(msg))
            .await
            .map_err(|e| anyhow!("{e:#}"))?;
    }
    Ok(())
}
/// Incremental decoder for the direct wire format: `DIRECT_WIRE_TAG_STREAM`
/// messages carry pieces of a `u32`-length-prefixed frame stream that is
/// reassembled here across multiple `push` calls.
#[derive(Debug, Default)]
struct DirectWireDecoder {
    // Reassembly buffer of concatenated stream-tagged payload bytes.
    buf: Vec<u8>,
    // Read cursor into `buf`; bytes before `pos` have been consumed.
    pos: usize,
    // Body length of the frame currently in flight, once its 4-byte header was parsed.
    want: Option<usize>,
}
impl DirectWireDecoder {
    /// Feed one incoming data-channel message and return every frame that is
    /// now complete. Tagged whole frames and unknown-tag (legacy) messages are
    /// returned immediately; stream-tagged bytes are buffered until enough of
    /// the length-prefixed stream has accumulated. Pings yield nothing.
    fn push(&mut self, msg: &[u8]) -> Result<Vec<Vec<u8>>> {
        let Some((&tag, body)) = msg.split_first() else {
            // Empty message: nothing to do.
            return Ok(Vec::new());
        };
        match tag {
            // Whole frame in a single message.
            DIRECT_WIRE_TAG_FRAME => return Ok(vec![body.to_vec()]),
            // Control message; ignore.
            DIRECT_WIRE_TAG_PING => return Ok(Vec::new()),
            DIRECT_WIRE_TAG_STREAM => self.buf.extend_from_slice(body),
            // Unknown tag: treat the entire message as a legacy whole frame.
            _ => return Ok(vec![msg.to_vec()]),
        }
        let mut frames = Vec::new();
        loop {
            // Resolve the pending frame length, reading a 4-byte big-endian
            // header if we are between frames.
            let need = match self.want {
                Some(len) => len,
                None => {
                    if self.buf.len().saturating_sub(self.pos) < 4 {
                        break;
                    }
                    let header: [u8; 4] = self.buf[self.pos..self.pos + 4]
                        .try_into()
                        .expect("slice is exactly 4 bytes");
                    self.pos += 4;
                    let len = u32::from_be_bytes(header) as usize;
                    self.want = Some(len);
                    len
                }
            };
            if self.buf.len().saturating_sub(self.pos) < need {
                // Frame body not fully buffered yet.
                break;
            }
            frames.push(self.buf[self.pos..self.pos + need].to_vec());
            self.pos += need;
            self.want = None;
            // Avoid unbounded growth: occasionally compact the buffer.
            if self.pos > 64 * 1024 {
                self.buf.drain(0..self.pos);
                self.pos = 0;
            }
        }
        Ok(frames)
    }
}
/// Subscribe to a direct (WebRTC data channel) publisher and capture its CMAF
/// chunks to disk, then remux them into a single MP4 via ffmpeg.
///
/// The offer is taken from `--offer` directly or looked up in the directory by
/// `--stream-id`; the generated answer is POSTed back (or printed for manual
/// copy/paste). Capture stops at `max_segments` or after `duration_secs`.
async fn direct_subscribe(args: DirectSubscribeArgs) -> Result<()> {
    fs::create_dir_all(&args.out_dir)
        .with_context(|| format!("failed to create {}", args.out_dir.display()))?;
    // Wire shapes for the directory listing endpoint.
    #[derive(serde::Deserialize)]
    struct DirectoryResp {
        entries: Vec<DirectoryEntry>,
    }
    #[derive(serde::Deserialize)]
    struct DirectoryEntry {
        stream_id: String,
        title: String,
        offer: String,
        #[allow(dead_code)]
        expires_ms: Option<u64>,
    }
    fn normalize_base(url: &str) -> String {
        url.trim_end_matches('/').to_string()
    }
    let dir = normalize_base(&args.directory_url);
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(8))
        .build()
        .with_context(|| "failed to build http client")?;
    // Resolve the offer: explicit --offer wins, else look it up in the directory.
    let (stream_id, offer_link) = match (args.stream_id.clone(), args.offer.clone()) {
        (_, Some(offer)) => (args.stream_id.clone(), offer),
        (Some(stream_id), None) => {
            let url = format!("{dir}/api/directory");
            let res = client.get(&url).send().await?;
            if !res.status().is_success() {
                return Err(anyhow!("directory GET failed: {}", res.status()));
            }
            let body: DirectoryResp = res.json().await?;
            let entry = body
                .entries
                .into_iter()
                .find(|e| e.stream_id == stream_id)
                .ok_or_else(|| anyhow!("stream_id not found in directory: {stream_id}"))?;
            (Some(stream_id), entry.offer)
        }
        (None, None) => {
            return Err(anyhow!("either --offer or --stream-id is required"));
        }
    };
    let offer = decode_direct_link(&offer_link)?;
    // Prefer directory-provided TURN servers when available.
    let mut cfg = PeerConfiguration::default();
    if let Some(ice) = fetch_turn_ice_servers(&client, &dir).await {
        cfg.ice_servers = ice;
    }
    let pc = PeerConnectionBuilder::new()
        .set_config(cfg)
        .with_remote_offer(Some(offer.desc))
        .map_err(|e| anyhow!("{e:#}"))?
        .build()
        .await
        .map_err(|e| anyhow!("{e:#}"))?;
    pc.add_ice_candidates(offer.candidates)
        .await
        .map_err(|e| anyhow!("{e:#}"))?;
    // Build our answer link (used both for directory POST and manual copy/paste).
    let desc = pc
        .get_local_description()
        .await
        .ok_or_else(|| anyhow!("missing local answer description"))?;
    let candidates = pc
        .collect_ice_candidates()
        .await
        .map_err(|e| anyhow!("{e:#}"))?;
    let answer_link = encode_direct_link(&DirectCodeV1 {
        v: 1,
        desc,
        candidates,
        label: Some("every.channel0".to_string()),
    })?;
    if let Some(stream_id) = stream_id.as_deref() {
        #[derive(serde::Serialize)]
        struct AnswerReq<'a> {
            stream_id: &'a str,
            answer: &'a str,
        }
        let url = format!("{dir}/api/answer");
        let res = client
            .post(url)
            .json(&AnswerReq {
                stream_id,
                answer: &answer_link,
            })
            .send()
            .await?;
        if !res.status().is_success() {
            return Err(anyhow!(
                "directory POST /api/answer failed: {}",
                res.status()
            ));
        }
    } else {
        // No directory round-trip possible; hand the answer to the operator.
        eprintln!("answer link (paste into publisher):\n{answer_link}");
    }
    let ch = pc.receive_channel().await.map_err(|e| anyhow!("{e:#}"))?;
    ch.wait_ready().await;
    eprintln!("direct channel open: {}", ch.label());
    let cmaf_dir = args.out_dir.join("cmaf");
    fs::create_dir_all(&cmaf_dir)
        .with_context(|| format!("failed to create {}", cmaf_dir.display()))?;
    let mut init_written = false;
    // Per-segment durations (seconds), used to build the VOD playlist below.
    let mut durations = Vec::<f64>::new();
    let mut captured_segments = 0usize;
    let mut decoder = DirectWireDecoder::default();
    let mut ping = tokio::time::interval(Duration::from_secs(1));
    ping.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    let deadline = args
        .duration_secs
        .map(|s| Instant::now() + Duration::from_secs(s));
    while captured_segments < args.max_segments {
        if deadline.is_some_and(|d| Instant::now() > d) {
            break;
        }
        let msg = tokio::select! {
            _ = ping.tick() => {
                // Heartbeat to help the publisher detect disconnects promptly.
                let _ = ch.send(&bytes::Bytes::from(vec![DIRECT_WIRE_TAG_PING])).await;
                continue;
            }
            msg = ch.receive() => msg.map_err(|e| anyhow!("{e:#}"))?,
        };
        // One wire message may complete zero or more frames.
        for frame in decoder.push(&msg)? {
            let payload = decode_object_frame(&frame)?;
            let idx = payload
                .meta
                .timing
                .as_ref()
                .map(|t| t.chunk_index)
                .unwrap_or(0);
            // Index 0 is the CMAF init segment by convention.
            if idx == 0 {
                tracing::info!("direct recv: init.mp4 ({} bytes)", payload.data.len());
                let path = cmaf_dir.join("init.mp4");
                fs::write(&path, &payload.data)
                    .with_context(|| format!("failed to write {}", path.display()))?;
                init_written = true;
                continue;
            }
            if !init_written {
                // Ignore segments until init arrives.
                continue;
            }
            // Publisher indexes segments starting at 1; our filenames start at 0.
            let seg = idx.saturating_sub(1);
            let path = cmaf_dir.join(format!("segment_{seg:06}.m4s"));
            fs::write(&path, &payload.data)
                .with_context(|| format!("failed to write {}", path.display()))?;
            tracing::info!("direct recv: segment {idx} ({} bytes)", payload.data.len());
            // 27 MHz ticks -> seconds; default to 2s when timing is absent.
            let dur = payload
                .meta
                .timing
                .as_ref()
                .map(|t| t.chunk_duration_27mhz as f64 / 27_000_000.0)
                .unwrap_or(2.0);
            durations.push(dur);
            captured_segments += 1;
            if captured_segments >= args.max_segments {
                break;
            }
        }
    }
    if !init_written || durations.is_empty() {
        return Err(anyhow!("no media captured (missing init or segments)"));
    }
    // Write a VOD playlist so ffmpeg can remux the fragments.
    let target = durations
        .iter()
        .copied()
        .fold(0.0_f64, f64::max)
        .ceil()
        .max(1.0) as u64;
    let mut m3u8 = String::new();
    m3u8.push_str("#EXTM3U\n");
    m3u8.push_str("#EXT-X-VERSION:7\n");
    m3u8.push_str(&format!("#EXT-X-TARGETDURATION:{target}\n"));
    m3u8.push_str("#EXT-X-PLAYLIST-TYPE:VOD\n");
    m3u8.push_str("#EXT-X-INDEPENDENT-SEGMENTS\n");
    m3u8.push_str("#EXT-X-MAP:URI=\"init.mp4\"\n");
    for (i, dur) in durations.iter().enumerate() {
        m3u8.push_str(&format!("#EXTINF:{:.3},\n", dur));
        m3u8.push_str(&format!("segment_{i:06}.m4s\n"));
    }
    m3u8.push_str("#EXT-X-ENDLIST\n");
    let playlist_path = cmaf_dir.join("index.m3u8");
    fs::write(&playlist_path, m3u8)
        .with_context(|| format!("failed to write {}", playlist_path.display()))?;
    let mp4_path = args
        .mp4
        .clone()
        .unwrap_or_else(|| args.out_dir.join("capture.mp4"));
    // Stream-copy remux (no re-encode) of the captured fragments into one MP4.
    let status = Command::new("ffmpeg")
        .arg("-hide_banner")
        .arg("-loglevel")
        .arg("error")
        .arg("-nostdin")
        .arg("-y")
        .arg("-protocol_whitelist")
        .arg("file,crypto,data")
        .arg("-i")
        .arg(&playlist_path)
        .arg("-c")
        .arg("copy")
        .arg(&mp4_path)
        .status();
    // On remux failure, fall back to printing the playlist path so the capture
    // is still usable.
    match status {
        Ok(s) if s.success() => {
            println!("{}", mp4_path.display());
        }
        Ok(s) => {
            eprintln!("ffmpeg remux failed: {s}");
            println!("{}", playlist_path.display());
        }
        Err(err) => {
            eprintln!("failed to run ffmpeg: {err}");
            println!("{}", playlist_path.display());
        }
    }
    Ok(())
}
/// Build the stream WebSocket URL for `base`, mapping http(s) schemes to
/// ws(s), passing ws(s) through untouched, and defaulting bare hosts to wss.
fn ws_url_for(base: &str, stream_id: &str, role: &str) -> String {
    let trimmed = base.trim_end_matches('/');
    let ws_base = match (
        trimmed.strip_prefix("https://"),
        trimmed.strip_prefix("http://"),
    ) {
        (Some(rest), _) => format!("wss://{rest}"),
        (_, Some(rest)) => format!("ws://{rest}"),
        _ if trimmed.starts_with("ws://") || trimmed.starts_with("wss://") => trimmed.to_string(),
        // Bare host: assume TLS.
        _ => format!("wss://{trimmed}"),
    };
    format!(
        "{ws_base}/api/stream/ws?stream_id={}&role={role}",
        urlencoding::encode(stream_id)
    )
}
/// Send one frame over the WebSocket using the same wire format as the direct
/// path: a `u32`-length-prefixed stream, split into bounded-size binary
/// messages tagged with `DIRECT_WIRE_TAG_STREAM`.
async fn ws_send_frame(
    ws: &mut tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
    frame: &[u8],
) -> Result<()> {
    let len = u32::try_from(frame.len()).map_err(|_| anyhow!("frame too large"))?;
    // Big-endian length prefix followed by the frame body.
    let mut payload = len.to_be_bytes().to_vec();
    payload.extend_from_slice(frame);
    for piece in payload.chunks(DIRECT_WIRE_CHUNK_BYTES) {
        let mut msg = Vec::with_capacity(piece.len() + 1);
        msg.push(DIRECT_WIRE_TAG_STREAM);
        msg.extend_from_slice(piece);
        ws.send(WsMessage::Binary(msg)).await?;
    }
    Ok(())
}
async fn ws_publish(args: WsPublishArgs) -> Result<()> {
fs::create_dir_all(&args.chunk_dir)
.with_context(|| format!("failed to create {}", args.chunk_dir.display()))?;
// For browser interop, we currently always normalize into deterministic H.264/AAC CMAF.
let deterministic = true;
let source: Box<dyn StreamSource + Send> = match args.source {
IngestSource::Hls { url, .. } => Box::new(HlsSource {
url,
mode: HlsMode::Transcode,
}),
IngestSource::Hdhr {
host,
device_id,
channel,
name,
prefer_mdns,
} => Box::new(HdhrSource {
host,
device_id,
channel,
name,
prefer_mdns,
}),
IngestSource::LinuxDvb {
adapter,
dvr,
tune_cmd,
tune_wait_ms,
} => Box::new(LinuxDvbSource {
adapter,
dvr,
tune_cmd,
tune_wait_ms,
}),
IngestSource::Ts { input } => Box::new(TsSource { input }),
};
fn normalize_base(url: &str) -> String {
url.trim_end_matches('/').to_string()
}
let directory_url = normalize_base(&args.directory_url);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(8))
.build()
.with_context(|| "failed to build http client")?;
let stream_id = args.stream_id.clone().unwrap_or_else(|| {
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
format!("every.channel/ws/{ts}")
});
let title = args.title.clone();
#[derive(serde::Serialize)]
struct AnnounceReq<'a> {
stream_id: &'a str,
title: &'a str,
offer: &'a str,
expires_ms: u64,
}
// Directory listing is still "offer"-shaped today; keep a non-empty placeholder for legacy clients.
let offer_placeholder = format!("every.channel://watch?stream_id={}", stream_id);
// Refresh listing forever while publishing (one-to-many relay supports multiple viewers).
let ttl = args.announce_ttl_ms;
let client2 = client.clone();
let dir2 = directory_url.clone();
let stream_id2 = stream_id.clone();
let title2 = title.clone();
let offer2 = offer_placeholder.clone();
tokio::spawn(async move {
loop {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let req = AnnounceReq {
stream_id: &stream_id2,
title: &title2,
offer: &offer2,
expires_ms: now.saturating_add(ttl),
};
let url = format!("{dir2}/api/announce");
let _ = tokio::time::timeout(Duration::from_secs(5), client2.post(url).json(&req).send()).await;
tokio::time::sleep(Duration::from_millis(ttl.saturating_mul(3) / 4)).await;
}
});
// Connect to relay websocket as publisher.
let ws_url = ws_url_for(&directory_url, &stream_id, "pub");
eprintln!("ws publish: {stream_id}");
eprintln!("ws url: {ws_url}");
let (mut ws, _resp) = tokio_tungstenite::connect_async(&ws_url).await?;
enum ChunkItem {
Init(Vec<u8>),
Segment { index: u64, bytes: Vec<u8> },
}
let out_dir = args.chunk_dir.join("cmaf");
let chunk_ms = args.chunk_ms;
let max_segments = args.max_segments;
let (tx, mut rx) = mpsc::channel::<ChunkItem>(8);
let reader = source.open_stream()?;
let out_dir2 = out_dir.clone();
let chunk_task = tokio::task::spawn_blocking(move || -> Result<()> {
let _ = fs::remove_dir_all(&out_dir2);
fs::create_dir_all(&out_dir2)
.with_context(|| format!("failed to create {}", out_dir2.display()))?;
let profile = if deterministic {
ec_chopper::deterministic_h264_profile()
} else {
ec_chopper::deterministic_h264_profile()
};
let mut cmd = Command::new("ffmpeg");
cmd.current_dir(&out_dir2);
cmd.arg("-hide_banner")
.arg("-loglevel")
.arg("error")
.arg("-nostdin")
.arg("-y")
.arg("-i")
.arg("pipe:0")
// predictable mapping
.arg("-map")
.arg("0:v:0")
.arg("-map")
.arg("0:a:0?")
.arg("-sn")
.arg("-dn")
.arg("-map_metadata")
.arg("-1")
// reduce scheduling nondeterminism
.arg("-filter_threads")
.arg("1")
.arg("-filter_complex_threads")
.arg("1")
.arg("-threads")
.arg("1");
for arg in ec_chopper::ffmpeg_profile_args(&profile) {
cmd.arg(arg);
}
let seg_time = format!("{:.3}", chunk_ms as f64 / 1000.0);
cmd.arg("-f")
.arg("hls")
.arg("-hls_time")
.arg(seg_time)
.arg("-hls_list_size")
.arg("0")
.arg("-hls_segment_type")
.arg("fmp4")
.arg("-hls_flags")
.arg("independent_segments")
.arg("-hls_fmp4_init_filename")
.arg("init.mp4")
.arg("-hls_segment_filename")
.arg("segment_%06d.m4s")
.arg("index.m3u8")
.stdin(Stdio::piped())
.stdout(Stdio::null())
.stderr(Stdio::inherit());
let mut child = cmd.spawn().with_context(|| "failed to spawn ffmpeg")?;
let mut stdin = child
.stdin
.take()
.ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?;
let mut reader = reader;
let writer = std::thread::spawn(move || -> Result<()> {
std::io::copy(&mut reader, &mut stdin)?;
Ok(())
});
let init_path = out_dir2.join("init.mp4");
wait_for_stable_file(&init_path, Duration::from_secs(20))?;
let init = fs::read(&init_path)?;
tx.blocking_send(ChunkItem::Init(init))
.map_err(|_| anyhow!("receiver closed"))?;
for i in 0..max_segments {
let seg_path = out_dir2.join(format!("segment_{i:06}.m4s"));
match wait_for_stable_file(&seg_path, Duration::from_secs(30)) {
Ok(()) => {}
Err(err) => {
if let Ok(Some(status)) = child.try_wait() {
if status.success() {
break;
}
return Err(anyhow!("ffmpeg exited with {status} ({err:#})"));
}
return Err(err);
}
}
let bytes = fs::read(&seg_path)?;
tx.blocking_send(ChunkItem::Segment {
index: (i as u64) + 1,
bytes,
})
.map_err(|_| anyhow!("receiver closed"))?;
}
let _ = child.kill();
let _ = child.wait();
let _ = writer.join();
Ok(())
});
while let Some(item) = rx.recv().await {
let (index, content_type, bytes, duration_27mhz, sync_status) = match item {
ChunkItem::Init(bytes) => (0u64, "video/mp4", bytes, 0u64, "init"),
ChunkItem::Segment { index, bytes } => {
(index, "video/iso.segment", bytes, chunk_ms * 27_000, "cmaf")
}
};
if index == 0 {
tracing::info!("ws send: init.mp4 ({} bytes)", bytes.len());
} else if index % 10 == 0 {
tracing::info!("ws send: segment {index} ({} bytes)", bytes.len());
}
let hash = blake3::hash(&bytes).to_hex().to_string();
let meta = ObjectMeta {
created_unix_ms: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64,
content_type: content_type.to_string(),
size_bytes: bytes.len() as u64,
timing: Some(TimingMeta {
chunk_index: index,
chunk_start_27mhz: 0,
chunk_duration_27mhz: duration_27mhz,
utc_start_unix: None,
sync_status: sync_status.to_string(),
}),
encryption: None,
chunk_hash: Some(hash),
chunk_hash_alg: Some("blake3".to_string()),
chunk_proof: None,
chunk_proof_alg: None,
manifest_id: None,
};
let frame = encode_object_frame(&meta, &bytes)?;
tokio::time::timeout(Duration::from_secs(5), ws_send_frame(&mut ws, &frame)).await??;
}
match chunk_task.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(anyhow!("chunk task join error: {err}")),
}
}
/// Subscribe to a relay websocket stream and capture it to disk.
///
/// Reassembles object frames from tagged wire chunks, writes `init.mp4` and
/// CMAF segments under `<out_dir>/cmaf`, emits a VOD HLS playlist, then
/// best-effort remuxes to MP4 with ffmpeg (printing the playlist path when
/// the remux is unavailable or fails).
async fn ws_subscribe(args: WsSubscribeArgs) -> Result<()> {
    fs::create_dir_all(&args.out_dir)
        .with_context(|| format!("failed to create {}", args.out_dir.display()))?;
    let ws_url = ws_url_for(&args.directory_url, &args.stream_id, "sub");
    eprintln!("ws subscribe: {} ({ws_url})", args.stream_id);
    let (mut ws, _resp) = tokio_tungstenite::connect_async(&ws_url).await?;
    let cmaf_dir = args.out_dir.join("cmaf");
    fs::create_dir_all(&cmaf_dir)
        .with_context(|| format!("failed to create {}", cmaf_dir.display()))?;
    // Segments received before the init segment are discarded (see below).
    let mut init_written = false;
    // Per-segment durations in seconds, in arrival order; drives the playlist.
    let mut durations = Vec::<f64>::new();
    let mut captured_segments = 0usize;
    let mut decoder = DirectWireDecoder::default();
    // Optional wall-clock capture limit.
    let deadline = args
        .duration_secs
        .map(|s| Instant::now() + Duration::from_secs(s));
    while captured_segments < args.max_segments {
        if deadline.is_some_and(|d| Instant::now() > d) {
            break;
        }
        let next = ws.next().await.ok_or_else(|| anyhow!("websocket closed"))??;
        let bytes = match next {
            WsMessage::Binary(b) => b,
            WsMessage::Close(_) => break,
            // Ignore text/ping/pong frames.
            _ => continue,
        };
        // One websocket message may complete zero or more full object frames.
        for frame in decoder.push(&bytes)? {
            let payload = decode_object_frame(&frame)?;
            let idx = payload
                .meta
                .timing
                .as_ref()
                .map(|t| t.chunk_index)
                .unwrap_or(0);
            // chunk_index 0 is the init segment; media segments are 1-based.
            if idx == 0 {
                tracing::info!("ws recv: init.mp4 ({} bytes)", payload.data.len());
                let path = cmaf_dir.join("init.mp4");
                fs::write(&path, &payload.data)
                    .with_context(|| format!("failed to write {}", path.display()))?;
                init_written = true;
                continue;
            }
            if !init_written {
                continue;
            }
            // Files on disk are 0-based: wire index 1 -> segment_000000.m4s.
            let seg = idx.saturating_sub(1);
            let path = cmaf_dir.join(format!("segment_{seg:06}.m4s"));
            fs::write(&path, &payload.data)
                .with_context(|| format!("failed to write {}", path.display()))?;
            if idx % 10 == 0 {
                tracing::info!("ws recv: segment {idx} ({} bytes)", payload.data.len());
            }
            // Convert 27MHz ticks to seconds; default to 2s when timing is absent.
            let dur = payload
                .meta
                .timing
                .as_ref()
                .map(|t| t.chunk_duration_27mhz as f64 / 27_000_000.0)
                .unwrap_or(2.0);
            durations.push(dur);
            captured_segments += 1;
            if captured_segments >= args.max_segments {
                break;
            }
        }
    }
    if !init_written || durations.is_empty() {
        return Err(anyhow!("no media captured (missing init or segments)"));
    }
    // HLS requires TARGETDURATION >= the longest segment, rounded up.
    let target = durations
        .iter()
        .copied()
        .fold(0.0_f64, f64::max)
        .ceil()
        .max(1.0) as u64;
    let mut m3u8 = String::new();
    m3u8.push_str("#EXTM3U\n");
    m3u8.push_str("#EXT-X-VERSION:7\n");
    m3u8.push_str(&format!("#EXT-X-TARGETDURATION:{target}\n"));
    m3u8.push_str("#EXT-X-PLAYLIST-TYPE:VOD\n");
    m3u8.push_str("#EXT-X-INDEPENDENT-SEGMENTS\n");
    m3u8.push_str("#EXT-X-MAP:URI=\"init.mp4\"\n");
    // NOTE(review): playlist indices assume segments arrived contiguously from
    // wire index 1; a gap would make an entry reference a missing file.
    for (i, dur) in durations.iter().enumerate() {
        m3u8.push_str(&format!("#EXTINF:{:.3},\n", dur));
        m3u8.push_str(&format!("segment_{i:06}.m4s\n"));
    }
    m3u8.push_str("#EXT-X-ENDLIST\n");
    let playlist_path = cmaf_dir.join("index.m3u8");
    fs::write(&playlist_path, m3u8)
        .with_context(|| format!("failed to write {}", playlist_path.display()))?;
    let mp4_path = args
        .mp4
        .clone()
        .unwrap_or_else(|| args.out_dir.join("capture.mp4"));
    // Best-effort remux of the captured HLS into a single MP4 (stream copy).
    let status = Command::new("ffmpeg")
        .arg("-hide_banner")
        .arg("-loglevel")
        .arg("error")
        .arg("-nostdin")
        .arg("-y")
        .arg("-protocol_whitelist")
        .arg("file,crypto,data")
        .arg("-i")
        .arg(&playlist_path)
        .arg("-c")
        .arg("copy")
        .arg(&mp4_path)
        .status();
    // On any remux failure we still succeed, printing the playlist path instead.
    match status {
        Ok(s) if s.success() => {
            println!("{}", mp4_path.display());
        }
        Ok(s) => {
            eprintln!("ffmpeg remux failed: {s}");
            println!("{}", playlist_path.display());
        }
        Err(err) => {
            eprintln!("failed to run ffmpeg: {err}");
            println!("{}", playlist_path.display());
        }
    }
    Ok(())
}
/// Resolve the iroh secret key from an explicit value or the `IROH_SECRET`
/// environment variable; returns `Ok(None)` when neither is set.
fn parse_iroh_secret(value: Option<String>) -> Result<Option<iroh::SecretKey>> {
    let Some(raw) = value.or_else(|| std::env::var("IROH_SECRET").ok()) else {
        return Ok(None);
    };
    iroh::SecretKey::from_str(&raw)
        .map(Some)
        .with_context(|| "failed to parse IROH_SECRET")
}
/// Build the discovery config from an explicit comma-list, falling back to
/// environment-driven defaults when none is given.
fn parse_discovery(value: Option<&str>) -> Result<DiscoveryConfig> {
    value.map_or_else(DiscoveryConfig::from_env, DiscoveryConfig::from_list)
}
/// Spawn a background task that announces this stream into the catalog gossip
/// swarm and re-announces on every manifest summary received on the returned
/// channel sender.
async fn spawn_catalog_announcer(
    node: &MoqNode,
    track: &TrackName,
    broadcast_name: &str,
    track_name: &str,
    peers: Vec<String>,
) -> Result<tokio::sync::mpsc::UnboundedSender<ManifestSummary>> {
    // Prefer the full serialized endpoint address; fall back to the bare node id.
    let endpoint = serde_json::to_string(&node.endpoint_addr())
        .unwrap_or_else(|_| node.endpoint().id().to_string());
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ManifestSummary>();
    let task_track = track.clone();
    let task_broadcast = broadcast_name.to_string();
    let task_track_name = track_name.to_string();
    let iroh_endpoint = node.endpoint().clone();
    tokio::spawn(async move {
        let mut gossip = match ec_iroh::CatalogGossip::join(iroh_endpoint, &peers).await {
            Ok(g) => g,
            Err(err) => {
                tracing::warn!("catalog gossip join failed: {err:#}");
                return;
            }
        };
        // Initial announcement carries no manifest summary.
        let initial = build_catalog_entry(
            endpoint.clone(),
            &task_track,
            &task_broadcast,
            &task_track_name,
            None,
        );
        if let Err(err) = gossip.announce(initial).await {
            tracing::warn!("catalog announce failed: {err:#}");
        }
        // Re-announce with each fresh summary until all senders are dropped.
        while let Some(summary) = rx.recv().await {
            let updated = build_catalog_entry(
                endpoint.clone(),
                &task_track,
                &task_broadcast,
                &task_track_name,
                Some(summary),
            );
            if let Err(err) = gossip.announce(updated).await {
                tracing::warn!("catalog update failed: {err:#}");
            }
        }
    });
    Ok(tx)
}
/// Assemble a catalog entry describing a MoQ stream for gossip announcement.
///
/// The stream id and title mirror the track name; the encryption descriptor
/// advertises the fixed nonce scheme used by the publisher.
fn build_catalog_entry(
    endpoint: String,
    track: &TrackName,
    broadcast_name: &str,
    track_name: &str,
    manifest: Option<ManifestSummary>,
) -> StreamCatalogEntry {
    // Stamp the entry up front so it reflects the moment of construction.
    let updated_unix_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let metadata = vec![StreamMetadata {
        key: "broadcast".to_string(),
        value: broadcast_name.to_string(),
    }];
    StreamCatalogEntry {
        stream: StreamDescriptor {
            id: StreamId(track.name.clone()),
            title: track.name.clone(),
            number: None,
            source: "moq".to_string(),
            metadata,
        },
        moq: Some(MoqStreamDescriptor {
            endpoint,
            broadcast_name: broadcast_name.to_string(),
            track_name: track_name.to_string(),
            encryption: Some(StreamEncryptionInfo {
                alg: ENCRYPTION_ALG.to_string(),
                key_id: track.name.clone(),
                nonce_scheme: "stream-id+chunk-index".to_string(),
            }),
        }),
        manifest,
        updated_unix_ms,
    }
}
/// Poll until `path` exists, is non-empty, and its size has stopped changing.
///
/// We want "write is done" behavior. A size that stays constant across three
/// consecutive 100ms polls is a pragmatic signal that ffmpeg has finished
/// producing the segment. Errs if the file never stabilizes within `timeout`.
fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
    const POLL: Duration = Duration::from_millis(100);
    let deadline = Instant::now() + timeout;
    let mut previous_len: Option<u64> = None;
    let mut unchanged_ms: u64 = 0;
    loop {
        if Instant::now() >= deadline {
            break;
        }
        // Missing file or zero length just means "keep waiting".
        if let Ok(meta) = fs::metadata(path) {
            let len = meta.len();
            if len > 0 {
                if previous_len == Some(len) {
                    unchanged_ms += 100;
                    if unchanged_ms >= 300 {
                        return Ok(());
                    }
                } else {
                    previous_len = Some(len);
                    unchanged_ms = 0;
                }
            }
        }
        std::thread::sleep(POLL);
    }
    Err(anyhow!(
        "timed out waiting for stable file {} after {:?}",
        path.display(),
        timeout
    ))
}
/// Publish an fMP4 stream to a MoQ relay over WebTransport.
///
/// Connects to the relay (QUIC + WebTransport + MoQ SETUP with protocol
/// fallbacks), then spawns ffmpeg to produce fragmented MP4 on stdout, which
/// is fed through `moq_mux` into the broadcast until ffmpeg exits, the relay
/// session closes, or ctrl-c is received.
async fn wt_publish(args: WtPublishArgs) -> Result<()> {
    let relay_url =
        Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?;
    // Create a local origin + broadcast, then pass an OriginConsumer into the client so it can
    // publish announcements to the relay.
    let origin = moq_lite::Origin::produce();
    let publish = origin.consume();
    let mut broadcast = origin
        .create_broadcast(&args.name)
        .ok_or_else(|| anyhow!("failed to create broadcast: {}", args.name))?;
    // Ensure the catalog track is present in the broadcast so subscribers can discover tracks.
    let mut catalog = hang::CatalogProducer::default();
    broadcast.insert_track(catalog.track.clone());
    // Wraps a WebTransport session to override the reported negotiated
    // subprotocol; everything else delegates to the inner session.
    #[derive(Clone)]
    struct ProtocolOverride<S> {
        inner: S,
        // When Some, reported instead of the inner session's protocol.
        protocol: Option<String>,
    }
    impl<S: web_transport_trait::Session> web_transport_trait::Session for ProtocolOverride<S> {
        type SendStream = S::SendStream;
        type RecvStream = S::RecvStream;
        type Error = S::Error;
        fn accept_uni(
            &self,
        ) -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + web_transport_trait::MaybeSend
        {
            self.inner.accept_uni()
        }
        fn accept_bi(
            &self,
        ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + web_transport_trait::MaybeSend
        {
            self.inner.accept_bi()
        }
        fn open_bi(
            &self,
        ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>> + web_transport_trait::MaybeSend
        {
            self.inner.open_bi()
        }
        fn open_uni(
            &self,
        ) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + web_transport_trait::MaybeSend
        {
            self.inner.open_uni()
        }
        fn send_datagram(&self, payload: bytes::Bytes) -> Result<(), Self::Error> {
            self.inner.send_datagram(payload)
        }
        fn recv_datagram(
            &self,
        ) -> impl Future<Output = Result<bytes::Bytes, Self::Error>> + web_transport_trait::MaybeSend
        {
            self.inner.recv_datagram()
        }
        fn max_datagram_size(&self) -> usize {
            self.inner.max_datagram_size()
        }
        fn protocol(&self) -> Option<&str> {
            // The override takes precedence; fall back to the real value.
            self.protocol.as_deref().or_else(|| self.inner.protocol())
        }
        fn close(&self, code: u32, reason: &str) {
            self.inner.close(code, reason)
        }
        fn closed(&self) -> impl Future<Output = Self::Error> + web_transport_trait::MaybeSend {
            self.inner.closed()
        }
    }
    // Dial the relay: DNS -> QUIC -> WebTransport CONNECT -> MoQ SETUP,
    // retrying SETUP under several protocol-label overrides.
    async fn connect_moq_session(
        relay_url: &Url,
        publish: moq_lite::OriginConsumer,
        tls_disable_verify: bool,
    ) -> Result<moq_lite::Session> {
        let host = relay_url
            .host_str()
            .ok_or_else(|| anyhow!("relay url missing host: {relay_url}"))?
            .to_string();
        let port = relay_url.port().unwrap_or(443);
        // Build TLS config.
        let mut roots = rustls::RootCertStore::empty();
        let native = rustls_native_certs::load_native_certs();
        if !native.errors.is_empty() {
            tracing::warn!(
                errors = ?native.errors,
                "some native root certs could not be loaded"
            );
        }
        for cert in native.certs {
            let _ = roots.add(cert);
        }
        let mut tls = rustls::ClientConfig::builder()
            .with_root_certificates(roots)
            .with_no_client_auth();
        if tls_disable_verify {
            // Mirror moq-native's behavior: accept any certificate, but still verify signatures.
            #[derive(Debug)]
            struct NoCertificateVerification(Arc<rustls::crypto::CryptoProvider>);
            impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
                fn verify_server_cert(
                    &self,
                    _end_entity: &rustls::pki_types::CertificateDer<'_>,
                    _intermediates: &[rustls::pki_types::CertificateDer<'_>],
                    _server_name: &rustls::pki_types::ServerName<'_>,
                    _ocsp: &[u8],
                    _now: rustls::pki_types::UnixTime,
                ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
                    Ok(rustls::client::danger::ServerCertVerified::assertion())
                }
                fn verify_tls12_signature(
                    &self,
                    message: &[u8],
                    cert: &rustls::pki_types::CertificateDer<'_>,
                    dss: &rustls::DigitallySignedStruct,
                ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
                    rustls::crypto::verify_tls12_signature(
                        message,
                        cert,
                        dss,
                        &self.0.signature_verification_algorithms,
                    )
                }
                fn verify_tls13_signature(
                    &self,
                    message: &[u8],
                    cert: &rustls::pki_types::CertificateDer<'_>,
                    dss: &rustls::DigitallySignedStruct,
                ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
                    rustls::crypto::verify_tls13_signature(
                        message,
                        cert,
                        dss,
                        &self.0.signature_verification_algorithms,
                    )
                }
                fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
                    self.0.signature_verification_algorithms.supported_schemes()
                }
            }
            let provider = rustls::crypto::CryptoProvider::get_default().cloned().unwrap_or_else(|| {
                Arc::new(rustls::crypto::ring::default_provider())
            });
            tls.dangerous()
                .set_certificate_verifier(Arc::new(NoCertificateVerification(provider)));
        }
        // WebTransport over HTTP/3.
        tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()];
        // Build a Quinn endpoint.
        let socket = std::net::UdpSocket::bind("[::]:0")
            .context("failed to bind UDP socket")?;
        let mut transport = quinn::TransportConfig::default();
        transport.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
        transport.keep_alive_interval(Some(Duration::from_secs(4)));
        transport.mtu_discovery_config(None);
        let transport = Arc::new(transport);
        let runtime = quinn::default_runtime().context("no async runtime")?;
        let endpoint_config = quinn::EndpointConfig::default();
        let endpoint = quinn::Endpoint::new(endpoint_config, None, socket, runtime)
            .context("failed to create QUIC endpoint")?;
        // Resolve relay. Only the first DNS entry is attempted.
        let ip = tokio::net::lookup_host((host.clone(), port))
            .await
            .context("failed DNS lookup")?
            .next()
            .context("no DNS entries")?;
        let quic: quinn::crypto::rustls::QuicClientConfig = tls.try_into()?;
        let mut client_cfg = quinn::ClientConfig::new(Arc::new(quic));
        client_cfg.transport_config(transport);
        tracing::debug!(%ip, %host, %relay_url, "connecting QUIC");
        let connection = endpoint
            .connect_with(client_cfg, ip, &host)?
            .await
            .context("failed QUIC connect")?;
        // Establish a WebTransport session.
        let mut request = web_transport_quinn::proto::ConnectRequest::new(relay_url.clone());
        for alpn in moq_lite::ALPNS {
            request = request.with_protocol(alpn.to_string());
        }
        let wt = web_transport_quinn::Session::connect(connection, request)
            .await
            .context("failed WebTransport CONNECT")?;
        // Establish a MoQ session. Cloudflare's relay currently does not always include a selected
        // subprotocol in the CONNECT response, so we attempt a few protocol overrides to select
        // the correct IETF draft encoding for SETUP.
        let client = moq_lite::Client::new().with_publish(publish);
        // These correspond to IETF draft ALPNs as used by moq-lite/web code.
        // We use string literals here since moq-lite does not currently expose these constants.
        let attempts: [&str; 4] = ["moqt-16", "moqt-15", "moq-00", ""];
        let mut last_err: Option<anyhow::Error> = None;
        for p in attempts {
            // Empty string means "no override": report the session's own protocol.
            let session = ProtocolOverride {
                inner: wt.clone(),
                protocol: (!p.is_empty()).then(|| p.to_string()),
            };
            match client.connect(session).await {
                Ok(session) => {
                    tracing::info!(protocol = %p, "connected to relay");
                    return Ok(session);
                }
                Err(err) => {
                    last_err = Some(anyhow::Error::new(err));
                    tracing::debug!(protocol = %p, err = %last_err.as_ref().unwrap(), "MoQ SETUP failed; retrying");
                }
            }
        }
        Err(last_err.unwrap_or_else(|| anyhow!("failed to connect")))
            .context("failed MoQ SETUP")
    }
    tracing::info!(url=%relay_url, name=%args.name, "connecting to relay");
    let session = connect_moq_session(&relay_url, publish, args.tls_disable_verify).await?;
    // Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux.
    let mut cmd = TokioCommand::new("ffmpeg");
    cmd.arg("-hide_banner")
        .arg("-loglevel")
        .arg("error")
        .arg("-nostats")
        .arg("-fflags")
        .arg("+nobuffer")
        .arg("-flags")
        .arg("low_delay")
        .arg("-i")
        .arg(&args.input);
    if args.transcode {
        // Low-latency H.264/AAC encode with a fixed keyframe cadence.
        cmd.args([
            "-c:v",
            "libx264",
            "-preset",
            "veryfast",
            "-tune",
            "zerolatency",
            "-pix_fmt",
            "yuv420p",
            "-profile:v",
            "main",
            "-g",
            "48",
            "-keyint_min",
            "48",
            "-sc_threshold",
            "0",
            "-threads",
            "1",
            "-c:a",
            "aac",
            "-b:a",
            "128k",
            "-ac",
            "2",
            "-ar",
            "48000",
        ]);
    } else {
        // Passthrough: assume the input codecs are already browser-compatible.
        cmd.args(["-c", "copy"]);
    }
    // Fragmented MP4 to stdout, fragment-per-frame for low latency.
    cmd.args([
        "-f",
        "mp4",
        "-movflags",
        "empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset",
        "pipe:1",
    ]);
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::inherit());
    tracing::info!(input=%args.input, "spawning ffmpeg");
    let mut child = cmd.spawn().context("failed to spawn ffmpeg")?;
    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| anyhow!("ffmpeg stdout unavailable"))?;
    let config = moq_mux::import::Fmp4Config {
        passthrough: args.passthrough,
    };
    let mut importer = moq_mux::import::Fmp4::new(broadcast, catalog, config);
    let mut stdout = stdout;
    let mut decode_fut = importer.decode_from(&mut stdout);
    tokio::pin!(decode_fut);
    tracing::info!("publishing fMP4 -> moq-mux -> relay");
    // Run until ingest finishes, the relay drops us, or the user interrupts.
    tokio::select! {
        res = &mut decode_fut => {
            let status = child.wait().await.context("failed to wait for ffmpeg")?;
            match res {
                Ok(()) if status.success() => Ok(()),
                Ok(()) => Err(anyhow!("ffmpeg exited with {status}")),
                Err(err) => Err(err).context("fmp4 ingest failed"),
            }
        }
        _ = session.closed() => {
            let _ = child.kill().await;
            Err(anyhow!("relay session closed"))
        }
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("ctrl-c; shutting down");
            let _ = child.kill().await;
            // Give the kill a moment to propagate before returning.
            tokio::time::sleep(Duration::from_millis(100)).await;
            Ok(())
        }
    }
}