use std::io::{BufRead, BufReader, Read}; use std::process::{Command, Stdio}; use std::time::{Duration, Instant}; const TS_PACKET_SIZE: usize = 188; fn env_required(key: &str) -> Option { std::env::var(key) .ok() .map(|v| v.trim().to_string()) .filter(|v| !v.is_empty()) } fn looks_drm(value: &str) -> bool { let value = value.to_lowercase(); value.contains("drm") || value.contains("encrypted") || value.contains("protected") || value.contains("copy") || value.contains("widevine") } fn autodiscover_hdhr_host_and_channel() -> Option<(String, String)> { let devices = ec_hdhomerun::discover().ok()?; let device = devices.into_iter().next()?; let lineup = ec_hdhomerun::fetch_lineup(&device).ok()?; let entry = lineup.iter().find(|e| { // Prefer a likely-clear channel to avoid false negatives in E2E. let tag_drm = e.tags.iter().any(|t| looks_drm(t)); let raw_drm = e .raw .as_object() .map(|obj| { obj.iter() .any(|(k, v)| looks_drm(k) || looks_drm(&v.to_string())) }) .unwrap_or(false); !tag_drm && !raw_drm && e.channel.number.as_deref().unwrap_or("").trim() != "" })?; let host = device.ip.clone(); let channel = entry .channel .number .clone() .or_else(|| Some(entry.channel.name.clone())) .unwrap_or_else(|| "2.1".to_string()); Some((host, channel)) } fn ec_node_path() -> std::path::PathBuf { if let Ok(value) = std::env::var("EC_NODE_BIN") { return value.into(); } if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec_node") { return value.into(); } if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec-node") { return value.into(); } let exe = std::env::current_exe().expect("current_exe"); let debug_dir = exe .parent() .and_then(|p| p.parent()) .expect("expected target/debug/deps"); debug_dir.join("ec-node") } fn wait_for_line_prefix( lines: &mut dyn Iterator>, prefix: &str, timeout: Duration, ) -> Option { let deadline = Instant::now() + timeout; while Instant::now() < deadline { match lines.next() { Some(Ok(line)) => { if let Some(rest) = line.strip_prefix(prefix) { return Some(rest.trim().to_string()); } } Some(Err(_)) => continue, None => break, } } None } fn write_short_ts_recording( host: &str, channel: &str, out_path: &std::path::Path, ) -> anyhow::Result<()> { // Use lineup to resolve name -> number, but capture from the provided host. // (OrbStack/Linux may not resolve the lineup URL's mDNS hostname.) let device = ec_hdhomerun::discover_from_host(host)?; let lineup = ec_hdhomerun::fetch_lineup(&device)?; let entry = ec_hdhomerun::find_lineup_entry_by_number(&lineup, channel) .or_else(|| ec_hdhomerun::find_lineup_entry_by_name(&lineup, channel)) .ok_or_else(|| anyhow::anyhow!("channel not found in lineup: {channel}"))?; let guide_number = entry.channel.number.as_deref().unwrap_or(channel); let capture_url = format!("http://{host}:5004/auto/v{guide_number}"); // Capture a short TS sample directly from the HDHR. // Retry a few times to handle "no tuner available" 5xx responses. let mut last_err: Option = None; for attempt in 0..10 { match ec_hdhomerun::open_stream_url(&capture_url, Some(12)) { Ok(mut stream) => { let mut file = std::fs::File::create(out_path)?; std::io::copy(&mut stream, &mut file)?; last_err = None; break; } Err(err) => { last_err = Some(err); std::thread::sleep(Duration::from_millis(400 * (attempt + 1) as u64)); continue; } } } if let Some(err) = last_err { return Err(err); } let mut file = std::fs::File::open(out_path)?; let mut bytes = Vec::new(); file.read_to_end(&mut bytes)?; let mut len = bytes.len(); // Ensure the TS file ends on a packet boundary. let rem = len % TS_PACKET_SIZE; if rem != 0 { len -= rem; std::fs::write(out_path, &bytes[..len])?; } if len < 188 * 200 { anyhow::bail!("recorded TS too small ({} bytes) from HDHR {}", len, host); } Ok(()) } #[test] #[ignore] fn e2e_split_sources_cmaf_init_from_objects_peer_segments_verified_by_manifests_peer() { let host = env_required("EVERY_CHANNEL_E2E_HDHR_HOST"); let channel = env_required("EVERY_CHANNEL_E2E_HDHR_CHANNEL"); let (host, channel) = match (host, channel) { (Some(host), Some(channel)) => (host, channel), _ => match autodiscover_hdhr_host_and_channel() { Some(v) => v, None => return, // skip }, }; let ec_node = ec_node_path(); // Keep secrets deterministic for reproducibility. let signing_key = "11".repeat(32); let network_secret = "22".repeat(32); let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis(); let stream_id = format!("every.channel/e2e/mesh-cmaf/{ts}"); let broadcast_name = stream_id.clone(); let tmp = std::env::temp_dir().join(format!("ec-e2e-mesh-split-cmaf-{ts}")); let _ = std::fs::create_dir_all(&tmp); let input_ts = tmp.join("input.ts"); let manifest_chunks = tmp.join("chunks-manifests"); let object_chunks = tmp.join("chunks-objects"); let subscribe_out = tmp.join("subscribe-out"); write_short_ts_recording(&host, &channel, &input_ts).expect("failed to record TS from HDHR"); // Publisher A: leader/signer, publishes manifests only (for CMAF segments). let mut pub_manifests = Command::new(&ec_node); pub_manifests .env("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", &signing_key) .arg("moq-publish") .arg("--publish-manifests") .arg("--publish-chunks") .arg("false") .arg("--encode") .arg("cmaf") .arg("--epoch-chunks") .arg("1") .arg("--max-chunks") .arg("4") .arg("--chunk-ms") .arg("2000") .arg("--stream-id") .arg(&stream_id) .arg("--broadcast-name") .arg(&broadcast_name) .arg("--track-name") .arg("noop") .arg("--manifest-track") .arg("manifests") .arg("--network-secret") .arg(&network_secret) .arg("--chunk-dir") .arg(&manifest_chunks) .arg("--startup-delay-ms") .arg("6000") .arg("ts") .arg(input_ts.to_string_lossy().to_string()) .stdout(Stdio::piped()) .stderr(Stdio::inherit()); let mut pub_a = pub_manifests.spawn().expect("spawn manifest publisher"); let a_stdout = pub_a .stdout .take() .expect("manifest publisher stdout missing"); let mut a_lines = BufReader::new(a_stdout).lines(); let remote_manifests = wait_for_line_prefix(&mut a_lines, "moq endpoint addr: ", Duration::from_secs(10)) .expect("manifest publisher did not print endpoint addr"); // Publisher B: publishes init + segments as objects only. let mut pub_objects = Command::new(&ec_node); pub_objects .arg("moq-publish") .arg("--publish-chunks") .arg("true") .arg("--encode") .arg("cmaf") .arg("--init-track") .arg("init") .arg("--max-chunks") .arg("4") .arg("--chunk-ms") .arg("2000") .arg("--stream-id") .arg(&stream_id) .arg("--broadcast-name") .arg(&broadcast_name) .arg("--track-name") .arg("objects") .arg("--network-secret") .arg(&network_secret) .arg("--chunk-dir") .arg(&object_chunks) .arg("--startup-delay-ms") .arg("10000") .arg("ts") .arg(input_ts.to_string_lossy().to_string()) .stdout(Stdio::piped()) .stderr(Stdio::inherit()); let mut pub_b = pub_objects.spawn().expect("spawn object publisher"); let b_stdout = pub_b .stdout .take() .expect("object publisher stdout missing"); let mut b_lines = BufReader::new(b_stdout).lines(); let remote_objects = wait_for_line_prefix(&mut b_lines, "moq endpoint addr: ", Duration::from_secs(10)) .expect("object publisher did not print endpoint addr"); // Subscriber: init+segments from B, manifests from A. let mut subscriber = Command::new(&ec_node); subscriber .arg("moq-subscribe") .arg("--remote") .arg(&remote_objects) .arg("--remote-manifests") .arg(&remote_manifests) .arg("--broadcast-name") .arg(&broadcast_name) .arg("--track-name") .arg("objects") .arg("--manifest-track") .arg("manifests") .arg("--subscribe-manifests") .arg("--require-manifest") .arg("--max-invalid-chunks") .arg("0") .arg("--container") .arg("cmaf") .arg("--subscribe-init") .arg("--init-track") .arg("init") .arg("--stop-after") .arg("2") .arg("--output-dir") .arg(&subscribe_out) .arg("--chunk-ms") .arg("2000") .arg("--stream-id") .arg(&stream_id) .arg("--network-secret") .arg(&network_secret) .stdout(Stdio::inherit()) .stderr(Stdio::inherit()); let mut sub_child = subscriber.spawn().expect("failed to spawn subscriber"); let start = Instant::now(); loop { if let Ok(Some(status)) = sub_child.try_wait() { assert!(status.success(), "subscriber exited with {status}"); break; } if start.elapsed() > Duration::from_secs(60) { let _ = sub_child.kill(); panic!("subscriber timed out"); } std::thread::sleep(Duration::from_millis(200)); } // Ensure publishers exit after max chunks. for child in [&mut pub_a, &mut pub_b] { let start = Instant::now(); loop { if let Ok(Some(status)) = child.try_wait() { assert!(status.success(), "publisher exited with {status}"); break; } if start.elapsed() > Duration::from_secs(90) { let _ = child.kill(); panic!("publisher timed out"); } std::thread::sleep(Duration::from_millis(200)); } } let playlist = subscribe_out.join("index.m3u8"); assert!( playlist.exists(), "missing playlist at {}", playlist.display() ); let init = subscribe_out.join("init.mp4"); assert!(init.exists(), "missing init segment at {}", init.display()); let segments = std::fs::read_dir(&subscribe_out) .unwrap() .filter_map(|e| e.ok()) .filter(|e| e.file_name().to_string_lossy().ends_with(".m4s")) .count(); assert!(segments >= 1, "expected at least one .m4s segment"); }