231 lines
7.4 KiB
Rust
231 lines
7.4 KiB
Rust
use std::io::{BufRead, BufReader, Read};
|
|
use std::process::{Command, Stdio};
|
|
use std::time::{Duration, Instant};
|
|
|
|
const TS_PACKET_SIZE: usize = 188;
|
|
|
|
/// Reads the environment variable `key` and returns its whitespace-trimmed value.
///
/// Returns `None` when the variable cannot be read (unset or not valid
/// Unicode) or when nothing is left after trimming.
fn env_required(key: &str) -> Option<String> {
    match std::env::var(key) {
        Ok(raw) => {
            let trimmed = raw.trim();
            if trimmed.is_empty() {
                None
            } else {
                Some(trimmed.to_owned())
            }
        }
        Err(_) => None,
    }
}
|
|
|
|
/// Resolves the path of the `ec-node` binary that the tests launch.
///
/// The first of `EC_NODE_BIN`, `CARGO_BIN_EXE_ec_node` and
/// `CARGO_BIN_EXE_ec-node` that can be read wins and is returned verbatim.
/// Otherwise the path is `ec-node` inside the directory two levels above the
/// currently running executable.
///
/// # Panics
///
/// Panics if the current executable cannot be determined or has no
/// grandparent directory.
fn ec_node_path() -> std::path::PathBuf {
    // Checked in order; earlier entries take precedence.
    const OVERRIDE_VARS: [&str; 3] = [
        "EC_NODE_BIN",
        "CARGO_BIN_EXE_ec_node",
        "CARGO_BIN_EXE_ec-node",
    ];

    let from_env = OVERRIDE_VARS
        .iter()
        .find_map(|name| std::env::var(name).ok());
    if let Some(path) = from_env {
        return std::path::PathBuf::from(path);
    }

    let running = std::env::current_exe().expect("current_exe");
    let containing_dir = running.parent();
    let profile_dir = containing_dir
        .and_then(std::path::Path::parent)
        .expect("expected target/debug/deps");
    profile_dir.join("ec-node")
}
|
|
|
|
/// Pulls lines from `lines` until one starts with `prefix`, then returns the
/// remainder of that line with surrounding whitespace trimmed.
///
/// Returns `None` if the iterator is exhausted first, or if `timeout` has
/// elapsed when the next line is about to be requested. Items that are `Err`
/// are skipped.
///
/// The deadline is only checked between items: if `lines.next()` itself
/// blocks, this call can take longer than `timeout`.
fn wait_for_line_prefix(
    lines: &mut dyn Iterator<Item = std::io::Result<String>>,
    prefix: &str,
    timeout: Duration,
) -> Option<String> {
    let give_up_at = Instant::now() + timeout;
    loop {
        if Instant::now() >= give_up_at {
            return None;
        }
        // `?` ends the search as soon as the iterator runs dry.
        let text = match lines.next()? {
            Ok(text) => text,
            Err(_) => continue,
        };
        if let Some(tail) = text.strip_prefix(prefix) {
            return Some(tail.trim().to_owned());
        }
    }
}
|
|
|
|
fn write_short_ts_recording(
|
|
host: &str,
|
|
channel: &str,
|
|
out_path: &std::path::Path,
|
|
) -> anyhow::Result<()> {
|
|
// Use lineup to resolve name -> number, but capture from the provided host.
|
|
// (OrbStack/Linux may not resolve the lineup URL's mDNS hostname.)
|
|
let device = ec_hdhomerun::discover_from_host(host)?;
|
|
let lineup = ec_hdhomerun::fetch_lineup(&device)?;
|
|
let entry = ec_hdhomerun::find_lineup_entry_by_number(&lineup, channel)
|
|
.or_else(|| ec_hdhomerun::find_lineup_entry_by_name(&lineup, channel))
|
|
.ok_or_else(|| anyhow::anyhow!("channel not found in lineup: {channel}"))?;
|
|
|
|
let guide_number = entry.channel.number.as_deref().unwrap_or(channel);
|
|
let capture_url = format!("http://{host}:5004/auto/v{guide_number}");
|
|
|
|
// Capture a short TS sample directly from the HDHR.
|
|
// Retry a few times to handle "no tuner available" 5xx responses.
|
|
let mut last_err: Option<anyhow::Error> = None;
|
|
for attempt in 0..10 {
|
|
match ec_hdhomerun::open_stream_url(&capture_url, Some(14)) {
|
|
Ok(mut stream) => {
|
|
let mut file = std::fs::File::create(out_path)?;
|
|
std::io::copy(&mut stream, &mut file)?;
|
|
last_err = None;
|
|
break;
|
|
}
|
|
Err(err) => {
|
|
last_err = Some(err);
|
|
std::thread::sleep(Duration::from_millis(400 * (attempt + 1) as u64));
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
if let Some(err) = last_err {
|
|
return Err(err);
|
|
}
|
|
|
|
let mut file = std::fs::File::open(out_path)?;
|
|
let mut bytes = Vec::new();
|
|
file.read_to_end(&mut bytes)?;
|
|
let mut len = bytes.len();
|
|
let rem = len % TS_PACKET_SIZE;
|
|
if rem != 0 {
|
|
len -= rem;
|
|
std::fs::write(out_path, &bytes[..len])?;
|
|
}
|
|
if len < 188 * 200 {
|
|
anyhow::bail!("recorded TS too small ({} bytes) from HDHR {}", len, host);
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// End-to-end run of one `ec-node moq-publish` process (`--encode cmaf`,
/// `--cmaf-ladder hd3`) against three `ec-node moq-subscribe` processes, one
/// per variant (`1080p`, `720p`, `480p`), each started with
/// `--require-manifest`.
///
/// The test is `#[ignore]`d and additionally returns early (reporting success)
/// unless both `EVERY_CHANNEL_E2E_HDHR_HOST` and
/// `EVERY_CHANNEL_E2E_HDHR_CHANNEL` are set to non-empty values. It passes
/// when every subscriber exits successfully and has written `init.mp4` and
/// `segment_000000.m4s` into its output directory.
#[test]
#[ignore]
fn e2e_cmaf_ladder_one_publisher_three_subscribers_verify_manifests() {
    /// Kills and reaps the wrapped child when dropped, so a failed `expect` or
    /// `assert!` cannot leave `ec-node` processes running after the test.
    struct ChildGuard(std::process::Child);

    impl Drop for ChildGuard {
        fn drop(&mut self) {
            // Best effort: the child may already have exited or been reaped.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    let host = match env_required("EVERY_CHANNEL_E2E_HDHR_HOST") {
        Some(v) => v,
        None => {
            eprintln!("skipping: EVERY_CHANNEL_E2E_HDHR_HOST is not set");
            return;
        }
    };
    let channel = match env_required("EVERY_CHANNEL_E2E_HDHR_CHANNEL") {
        Some(v) => v,
        None => {
            eprintln!("skipping: EVERY_CHANNEL_E2E_HDHR_CHANNEL is not set");
            return;
        }
    };

    let ec_node = ec_node_path();

    // Keep secrets deterministic for reproducibility.
    let signing_key = "11".repeat(32);
    let network_secret = "22".repeat(32);

    // A millisecond timestamp keeps stream ids and temp directories distinct
    // between runs.
    let ts = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    let stream_id = format!("every.channel/e2e/cmaf-ladder/{ts}");
    let broadcast_name = stream_id.clone();

    let tmp = std::env::temp_dir().join(format!("ec-e2e-cmaf-ladder-{ts}"));
    // Fail here with a clear message rather than later when the recording
    // cannot be created inside a missing directory.
    std::fs::create_dir_all(&tmp).expect("failed to create temp dir");
    let input_ts = tmp.join("input.ts");

    write_short_ts_recording(&host, &channel, &input_ts).expect("failed to record TS from HDHR");

    let mut publisher = Command::new(&ec_node);
    publisher
        .env("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", &signing_key)
        .arg("moq-publish")
        .arg("--publish-manifests")
        .arg("--encode")
        .arg("cmaf")
        .arg("--cmaf-ladder")
        .arg("hd3")
        .arg("--epoch-chunks")
        .arg("1")
        .arg("--max-chunks")
        .arg("3")
        .arg("--chunk-ms")
        .arg("2000")
        .arg("--stream-id")
        .arg(&stream_id)
        .arg("--broadcast-name")
        .arg(&broadcast_name)
        .arg("--track-name")
        .arg("chunks")
        .arg("--init-track")
        .arg("init")
        .arg("--manifest-track")
        .arg("manifests")
        .arg("--network-secret")
        .arg(&network_secret)
        .arg("--chunk-dir")
        .arg(tmp.join("pub-chunks"))
        .arg("--startup-delay-ms")
        .arg("4000")
        .arg("ts")
        // Pass the path as an OS string; no lossy UTF-8 round trip.
        .arg(&input_ts)
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit());

    let mut pub_child = ChildGuard(publisher.spawn().expect("spawn publisher"));
    let pub_stdout = pub_child.0.stdout.take().expect("publisher stdout missing");
    let mut pub_lines = BufReader::new(pub_stdout).lines();
    let remote = wait_for_line_prefix(
        &mut pub_lines,
        "moq endpoint addr: ",
        Duration::from_secs(10),
    )
    .expect("publisher did not print endpoint addr");

    // Keep reading the publisher's stdout so it cannot stall on a full pipe
    // while the subscribers run; forward the lines to stderr for debugging.
    // The thread ends once the pipe closes or a line fails to decode.
    std::thread::spawn(move || {
        for line in pub_lines.map_while(Result::ok) {
            eprintln!("[publisher] {line}");
        }
    });

    let variants = ["1080p", "720p", "480p"];
    let mut subscribers = Vec::new();
    for variant in variants {
        let out_dir = tmp.join(format!("sub-{variant}"));
        let mut sub = Command::new(&ec_node);
        sub.arg("moq-subscribe")
            .arg("--remote")
            .arg(&remote)
            .arg("--remote-manifests")
            .arg(&remote)
            .arg("--broadcast-name")
            .arg(&broadcast_name)
            .arg("--track-name")
            .arg(format!("chunks/{variant}"))
            .arg("--subscribe-manifests")
            .arg("--require-manifest")
            .arg("--manifest-track")
            .arg("manifests")
            .arg("--container")
            .arg("cmaf")
            .arg("--subscribe-init")
            .arg("--init-track")
            .arg(format!("init/{variant}"))
            .arg("--raw-cmaf")
            .arg("--stop-after")
            .arg("2")
            .arg("--network-secret")
            .arg(&network_secret)
            .arg("--output-dir")
            .arg(&out_dir)
            .stdout(Stdio::inherit())
            .stderr(Stdio::inherit());
        subscribers.push((
            variant.to_string(),
            out_dir,
            ChildGuard(sub.spawn().expect("spawn subscriber")),
        ));
    }

    // If an assertion below fails, the guards still pending in `subscribers`
    // and the publisher guard are dropped during unwinding and kill their
    // processes.
    for (variant, out_dir, mut child) in subscribers {
        let status = child.0.wait().expect("wait subscriber");
        assert!(status.success(), "subscriber {variant} failed: {status}");
        let init = out_dir.join("init.mp4");
        assert!(init.exists(), "subscriber {variant} missing init.mp4");
        let seg0 = out_dir.join("segment_000000.m4s");
        assert!(seg0.exists(), "subscriber {variant} missing first segment");
    }

    // Stop the publisher and reap it so no zombie process is left behind.
    drop(pub_child);
}
|