308 lines
9.5 KiB
Rust
308 lines
9.5 KiB
Rust
use std::io::{BufRead, BufReader};
|
|
use std::path::Path;
|
|
use std::process::{Command, Stdio};
|
|
use std::time::{Duration, Instant};
|
|
|
|
fn ec_node_path() -> std::path::PathBuf {
|
|
if let Ok(value) = std::env::var("EC_NODE_BIN") {
|
|
return value.into();
|
|
}
|
|
if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec_node") {
|
|
return value.into();
|
|
}
|
|
if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec-node") {
|
|
return value.into();
|
|
}
|
|
let exe = std::env::current_exe().expect("current_exe");
|
|
let debug_dir = exe
|
|
.parent()
|
|
.and_then(|p| p.parent())
|
|
.expect("expected target/debug/deps");
|
|
debug_dir.join("ec-node")
|
|
}
|
|
|
|
fn wait_for_line_prefix(
|
|
lines: &mut dyn Iterator<Item = std::io::Result<String>>,
|
|
prefix: &str,
|
|
timeout: Duration,
|
|
) -> Option<String> {
|
|
let deadline = Instant::now() + timeout;
|
|
while Instant::now() < deadline {
|
|
match lines.next() {
|
|
Some(Ok(line)) => {
|
|
if let Some(rest) = line.strip_prefix(prefix) {
|
|
return Some(rest.trim().to_string());
|
|
}
|
|
}
|
|
Some(Err(_)) => continue,
|
|
None => break,
|
|
}
|
|
}
|
|
None
|
|
}
|
|
|
|
fn blake3_hex(path: &Path) -> anyhow::Result<String> {
|
|
let bytes = std::fs::read(path)?;
|
|
Ok(blake3::hash(&bytes).to_hex().to_string())
|
|
}
|
|
|
|
fn concat_init_and_segment(init: &Path, seg: &Path, out: &Path) -> anyhow::Result<()> {
|
|
let init_bytes = std::fs::read(init)?;
|
|
let seg_bytes = std::fs::read(seg)?;
|
|
let mut bytes = Vec::with_capacity(init_bytes.len() + seg_bytes.len());
|
|
bytes.extend_from_slice(&init_bytes);
|
|
bytes.extend_from_slice(&seg_bytes);
|
|
std::fs::write(out, bytes)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn first_video_frame_keyframe_flag(mp4: &Path) -> anyhow::Result<u32> {
|
|
if Command::new("ffprobe")
|
|
.arg("-version")
|
|
.stdout(Stdio::null())
|
|
.stderr(Stdio::null())
|
|
.status()
|
|
.is_err()
|
|
{
|
|
// Cross-OS environments might not have ffprobe installed; treat as skip.
|
|
return Ok(1);
|
|
}
|
|
// Read only the first decoded frame record. For fMP4 this works reliably if we concat init+seg.
|
|
let out = Command::new("ffprobe")
|
|
.arg("-v")
|
|
.arg("error")
|
|
.arg("-select_streams")
|
|
.arg("v:0")
|
|
.arg("-show_frames")
|
|
.arg("-read_intervals")
|
|
.arg("%+#1")
|
|
.arg("-show_entries")
|
|
.arg("frame=key_frame")
|
|
.arg("-of")
|
|
.arg("csv=p=0")
|
|
.arg(mp4)
|
|
.output()?;
|
|
if !out.status.success() {
|
|
anyhow::bail!("ffprobe failed: {}", String::from_utf8_lossy(&out.stderr));
|
|
}
|
|
let s = String::from_utf8_lossy(&out.stdout);
|
|
let first = s.lines().next().unwrap_or("").trim();
|
|
// Some ffprobe builds may append extra columns (e.g. side data) even with restricted
|
|
// `-show_entries`. We only care about the first token.
|
|
let token = first.split(',').next().unwrap_or("").trim();
|
|
let flag: u32 = token
|
|
.parse()
|
|
.map_err(|_| anyhow::anyhow!("unexpected ffprobe output: {first:?}"))?;
|
|
Ok(flag)
|
|
}
|
|
|
|
fn write_deterministic_ts(out_path: &Path) -> anyhow::Result<()> {
|
|
// Deterministic synthetic A/V source: 30fps CFR with a fixed sine audio tone.
|
|
// Output: MPEG-TS, constrained to a stable keyframe cadence (g=60 -> 2s GOP).
|
|
let status = Command::new("ffmpeg")
|
|
.arg("-hide_banner")
|
|
.arg("-loglevel")
|
|
.arg("error")
|
|
.arg("-nostdin")
|
|
.arg("-y")
|
|
.arg("-f")
|
|
.arg("lavfi")
|
|
.arg("-i")
|
|
.arg("testsrc2=size=1280x720:rate=30")
|
|
.arg("-f")
|
|
.arg("lavfi")
|
|
.arg("-i")
|
|
.arg("sine=frequency=1000:sample_rate=48000")
|
|
.arg("-t")
|
|
.arg("10")
|
|
.arg("-map")
|
|
.arg("0:v:0")
|
|
.arg("-map")
|
|
.arg("1:a:0")
|
|
.arg("-c:v")
|
|
.arg("libx264")
|
|
.arg("-pix_fmt")
|
|
.arg("yuv420p")
|
|
.arg("-g")
|
|
.arg("60")
|
|
.arg("-keyint_min")
|
|
.arg("60")
|
|
.arg("-sc_threshold")
|
|
.arg("0")
|
|
.arg("-bf")
|
|
.arg("0")
|
|
.arg("-threads")
|
|
.arg("1")
|
|
.arg("-fflags")
|
|
.arg("+bitexact")
|
|
.arg("-flags:v")
|
|
.arg("+bitexact")
|
|
.arg("-c:a")
|
|
.arg("aac")
|
|
.arg("-b:a")
|
|
.arg("128k")
|
|
.arg("-ac")
|
|
.arg("2")
|
|
.arg("-ar")
|
|
.arg("48000")
|
|
.arg("-flags:a")
|
|
.arg("+bitexact")
|
|
.arg("-f")
|
|
.arg("mpegts")
|
|
.arg(out_path)
|
|
.status()?;
|
|
if !status.success() {
|
|
anyhow::bail!("ffmpeg synthetic TS generation failed with {status}");
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn run_ladder(ec_node: &Path, input_ts: &Path, out_dir: &Path) -> anyhow::Result<()> {
|
|
let signing_key = "11".repeat(32);
|
|
let network_secret = "22".repeat(32);
|
|
let stream_id = "every.channel/determinism/cmaf-ladder";
|
|
let broadcast_name = "every.channel/determinism/cmaf-ladder";
|
|
|
|
let mut cmd = Command::new(ec_node);
|
|
cmd.env("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", &signing_key)
|
|
.arg("moq-publish")
|
|
.arg("--publish-manifests")
|
|
.arg("--encode")
|
|
.arg("cmaf")
|
|
.arg("--cmaf-ladder")
|
|
.arg("hd3")
|
|
.arg("--epoch-chunks")
|
|
.arg("1")
|
|
.arg("--max-chunks")
|
|
.arg("3")
|
|
.arg("--chunk-ms")
|
|
.arg("2000")
|
|
.arg("--stream-id")
|
|
.arg(stream_id)
|
|
.arg("--broadcast-name")
|
|
.arg(broadcast_name)
|
|
.arg("--track-name")
|
|
.arg("chunks")
|
|
.arg("--init-track")
|
|
.arg("init")
|
|
.arg("--network-secret")
|
|
.arg(&network_secret)
|
|
.arg("--chunk-dir")
|
|
.arg(out_dir)
|
|
.arg("--startup-delay-ms")
|
|
.arg("0")
|
|
.arg("ts")
|
|
.arg(input_ts)
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::inherit());
|
|
|
|
// This will run until --max-chunks is reached, then exit.
|
|
let mut child = cmd.spawn()?;
|
|
let stdout = child.stdout.take().expect("publisher stdout missing");
|
|
let mut lines = BufReader::new(stdout).lines();
|
|
let _remote = wait_for_line_prefix(&mut lines, "moq endpoint addr: ", Duration::from_secs(10))
|
|
.ok_or_else(|| anyhow::anyhow!("publisher did not print endpoint addr"))?;
|
|
|
|
let status = child.wait()?;
|
|
if !status.success() {
|
|
anyhow::bail!("publisher failed: {status}");
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[test]
|
|
#[ignore]
|
|
fn deterministic_cmaf_ladder_outputs_match_across_runs() {
|
|
let ec_node = ec_node_path();
|
|
|
|
let ts = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap_or_default()
|
|
.as_millis();
|
|
let tmp = std::env::temp_dir().join(format!("ec-determinism-cmaf-ladder-{ts}"));
|
|
let _ = std::fs::create_dir_all(&tmp);
|
|
|
|
let input_ts = tmp.join("input.ts");
|
|
write_deterministic_ts(&input_ts).expect("write deterministic TS");
|
|
|
|
let run1 = tmp.join("run1");
|
|
let run2 = tmp.join("run2");
|
|
let _ = std::fs::remove_dir_all(&run1);
|
|
let _ = std::fs::remove_dir_all(&run2);
|
|
std::fs::create_dir_all(&run1).unwrap();
|
|
std::fs::create_dir_all(&run2).unwrap();
|
|
|
|
run_ladder(&ec_node, &input_ts, &run1).expect("run ladder 1");
|
|
run_ladder(&ec_node, &input_ts, &run2).expect("run ladder 2");
|
|
|
|
for variant in ["1080p", "720p", "480p"] {
|
|
let v1 = run1.join("cmaf-ladder").join(variant);
|
|
let v2 = run2.join("cmaf-ladder").join(variant);
|
|
|
|
let init1 = v1.join("init.mp4");
|
|
let init2 = v2.join("init.mp4");
|
|
assert!(
|
|
init1.exists() && init2.exists(),
|
|
"missing init for {variant}"
|
|
);
|
|
assert_eq!(
|
|
blake3_hex(&init1).unwrap(),
|
|
blake3_hex(&init2).unwrap(),
|
|
"init differs for {variant}"
|
|
);
|
|
|
|
for idx in 0..3 {
|
|
let s1 = v1.join(format!("segment_{idx:06}.m4s"));
|
|
let s2 = v2.join(format!("segment_{idx:06}.m4s"));
|
|
assert!(
|
|
s1.exists() && s2.exists(),
|
|
"missing segment {idx} for {variant}"
|
|
);
|
|
assert_eq!(
|
|
blake3_hex(&s1).unwrap(),
|
|
blake3_hex(&s2).unwrap(),
|
|
"segment {idx} differs for {variant}"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
#[ignore]
|
|
fn cmaf_ladder_segments_start_with_keyframes() {
|
|
let ec_node = ec_node_path();
|
|
|
|
let ts = std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap_or_default()
|
|
.as_millis();
|
|
let tmp = std::env::temp_dir().join(format!("ec-determinism-cmaf-ladder-kf-{ts}"));
|
|
let _ = std::fs::create_dir_all(&tmp);
|
|
|
|
let input_ts = tmp.join("input.ts");
|
|
write_deterministic_ts(&input_ts).expect("write deterministic TS");
|
|
|
|
let run = tmp.join("run");
|
|
let _ = std::fs::remove_dir_all(&run);
|
|
std::fs::create_dir_all(&run).unwrap();
|
|
run_ladder(&ec_node, &input_ts, &run).expect("run ladder");
|
|
|
|
for variant in ["1080p", "720p", "480p"] {
|
|
let v = run.join("cmaf-ladder").join(variant);
|
|
let init = v.join("init.mp4");
|
|
assert!(init.exists(), "missing init for {variant}");
|
|
|
|
for idx in 0..3 {
|
|
let seg = v.join(format!("segment_{idx:06}.m4s"));
|
|
assert!(seg.exists(), "missing segment {idx} for {variant}");
|
|
|
|
let stitched = tmp.join(format!("stitched-{variant}-{idx:06}.mp4"));
|
|
concat_init_and_segment(&init, &seg, &stitched).unwrap();
|
|
let keyflag = first_video_frame_keyframe_flag(&stitched).unwrap();
|
|
assert_eq!(
|
|
keyflag, 1,
|
|
"segment {idx} not keyframe-aligned for {variant}"
|
|
);
|
|
}
|
|
}
|
|
}
|