use std::io::{BufRead, BufReader, Read, Write}; use std::process::{Command, Stdio}; use std::time::{Duration, Instant}; const TS_PACKET_SIZE: usize = 188; fn env_required(key: &str) -> Option { std::env::var(key) .ok() .map(|v| v.trim().to_string()) .filter(|v| !v.is_empty()) } fn ec_node_path() -> std::path::PathBuf { if let Ok(value) = std::env::var("EC_NODE_BIN") { return value.into(); } if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec_node") { return value.into(); } if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec-node") { return value.into(); } let exe = std::env::current_exe().expect("current_exe"); let debug_dir = exe .parent() .and_then(|p| p.parent()) .expect("expected target/debug/deps"); debug_dir.join("ec-node") } fn wait_for_line_prefix( lines: &mut dyn Iterator>, prefix: &str, timeout: Duration, ) -> Option { let deadline = Instant::now() + timeout; while Instant::now() < deadline { match lines.next() { Some(Ok(line)) => { if let Some(rest) = line.strip_prefix(prefix) { return Some(rest.trim().to_string()); } } Some(Err(_)) => continue, None => break, } } None } fn write_short_ts_recording( host: &str, channel: &str, out_path: &std::path::Path, ) -> anyhow::Result<()> { // Use the lineup's stream URL so we get the correct host/port (often :5004). // HDHomeRun supports `duration=...` on the stream URL on many models. // We also cap by time/bytes to avoid hanging if duration is ignored. let device = ec_hdhomerun::discover_from_host(host)?; let lineup = ec_hdhomerun::fetch_lineup(&device)?; let entry = ec_hdhomerun::find_lineup_entry_by_number(&lineup, channel) .or_else(|| ec_hdhomerun::find_lineup_entry_by_name(&lineup, channel)) .ok_or_else(|| anyhow::anyhow!("channel not found in lineup: {channel}"))?; // Tuner allocation can transiently fail (503) if another client is using all tuners. // Retry briefly; we only need a short capture. let mut last_err: Option = None; let mut stream = loop { match ec_hdhomerun::open_stream_entry(entry, Some(8)) { Ok(stream) => break stream, Err(err) => { let msg = format!("{err:#}"); last_err = Some(err); if msg.contains("503") { std::thread::sleep(Duration::from_millis(500)); continue; } return Err(last_err.unwrap()); } } }; let mut file = std::fs::File::create(out_path)?; let start = Instant::now(); let mut bytes = 0usize; let mut buf = [0u8; 64 * 1024]; loop { let n = stream.read(&mut buf)?; if n == 0 { break; } file.write_all(&buf[..n])?; bytes += n; if bytes >= 8 * 1024 * 1024 { break; } if start.elapsed() > Duration::from_secs(6) { break; } } file.flush()?; // Ensure the TS file ends on a packet boundary. let len = file.metadata()?.len(); let rem = (len as usize) % TS_PACKET_SIZE; if rem != 0 { file.set_len(len - rem as u64)?; bytes = (len as usize) - rem; } if bytes < 188 * 20 { anyhow::bail!("recorded TS too small ({} bytes) from HDHR {}", bytes, host); } Ok(()) } #[test] #[ignore] fn e2e_split_sources_manifests_from_one_peer_objects_from_another() { let host = match env_required("EVERY_CHANNEL_E2E_HDHR_HOST") { Some(v) => v, None => return, // skip }; let channel = match env_required("EVERY_CHANNEL_E2E_HDHR_CHANNEL") { Some(v) => v, None => return, // skip }; let ec_node = ec_node_path(); // Keep secrets deterministic for reproducibility. let signing_key = "11".repeat(32); let network_secret = "22".repeat(32); let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis(); let stream_id = format!("every.channel/e2e/mesh/{ts}"); let broadcast_name = stream_id.clone(); let tmp = std::env::temp_dir().join(format!("ec-e2e-mesh-split-{ts}")); let _ = std::fs::create_dir_all(&tmp); let input_ts = tmp.join("input.ts"); let manifest_chunks = tmp.join("chunks-manifests"); let object_chunks = tmp.join("chunks-objects"); let subscribe_out = tmp.join("subscribe-out"); write_short_ts_recording(&host, &channel, &input_ts).expect("failed to record TS from HDHR"); // Publisher A: leader/signer, publishes manifests only. // Give subscribers time to connect before ingest starts. let mut pub_manifests = Command::new(&ec_node); pub_manifests .env("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", &signing_key) .arg("moq-publish") .arg("--publish-manifests") .arg("--publish-chunks") .arg("false") .arg("--epoch-chunks") .arg("1") .arg("--max-chunks") .arg("6") .arg("--chunk-ms") .arg("2000") .arg("--stream-id") .arg(&stream_id) .arg("--broadcast-name") .arg(&broadcast_name) .arg("--track-name") .arg("noop") .arg("--manifest-track") .arg("manifests") .arg("--network-secret") .arg(&network_secret) .arg("--chunk-dir") .arg(&manifest_chunks) .arg("--startup-delay-ms") .arg("5000") .arg("ts") .arg(input_ts.to_string_lossy().to_string()) .stdout(Stdio::piped()) .stderr(Stdio::inherit()); let mut pub_a = pub_manifests.spawn().expect("spawn manifest publisher"); let a_stdout = pub_a .stdout .take() .expect("manifest publisher stdout missing"); let mut a_lines = BufReader::new(a_stdout).lines(); let remote_manifests = wait_for_line_prefix(&mut a_lines, "moq endpoint addr: ", Duration::from_secs(10)) .expect("manifest publisher did not print endpoint addr"); // Publisher B: relay/data, publishes chunk objects only. // Delay longer than the manifest publisher so the subscriber can receive manifests first. let mut pub_objects = Command::new(&ec_node); pub_objects .arg("moq-publish") .arg("--publish-chunks") .arg("true") .arg("--max-chunks") .arg("6") .arg("--chunk-ms") .arg("2000") .arg("--stream-id") .arg(&stream_id) .arg("--broadcast-name") .arg(&broadcast_name) .arg("--track-name") .arg("objects") .arg("--network-secret") .arg(&network_secret) .arg("--chunk-dir") .arg(&object_chunks) .arg("--startup-delay-ms") .arg("9000") .arg("ts") .arg(input_ts.to_string_lossy().to_string()) .stdout(Stdio::piped()) .stderr(Stdio::inherit()); let mut pub_b = pub_objects.spawn().expect("spawn object publisher"); let b_stdout = pub_b .stdout .take() .expect("object publisher stdout missing"); let mut b_lines = BufReader::new(b_stdout).lines(); let remote_objects = wait_for_line_prefix(&mut b_lines, "moq endpoint addr: ", Duration::from_secs(10)) .expect("object publisher did not print endpoint addr"); // Subscriber: stitch objects from B with manifests from A. let mut subscriber = Command::new(&ec_node); subscriber .arg("moq-subscribe") .arg("--remote") .arg(&remote_objects) .arg("--remote-manifests") .arg(&remote_manifests) .arg("--broadcast-name") .arg(&broadcast_name) .arg("--track-name") .arg("objects") .arg("--manifest-track") .arg("manifests") .arg("--subscribe-manifests") .arg("--require-manifest") .arg("--max-invalid-chunks") .arg("0") .arg("--stop-after") .arg("2") .arg("--output-dir") .arg(&subscribe_out) .arg("--chunk-ms") .arg("2000") .arg("--stream-id") .arg(&stream_id) .arg("--network-secret") .arg(&network_secret) .stdout(Stdio::inherit()) .stderr(Stdio::inherit()); let mut sub_child = subscriber.spawn().expect("failed to spawn subscriber"); let start = Instant::now(); loop { if let Ok(Some(status)) = sub_child.try_wait() { assert!(status.success(), "subscriber exited with {status}"); break; } if start.elapsed() > Duration::from_secs(30) { let _ = sub_child.kill(); panic!("subscriber timed out"); } std::thread::sleep(Duration::from_millis(200)); } // Ensure publishers exit after max chunks. for child in [&mut pub_a, &mut pub_b] { let start = Instant::now(); loop { if let Ok(Some(status)) = child.try_wait() { assert!(status.success(), "publisher exited with {status}"); break; } if start.elapsed() > Duration::from_secs(30) { let _ = child.kill(); panic!("publisher timed out"); } std::thread::sleep(Duration::from_millis(200)); } } let playlist = subscribe_out.join("index.m3u8"); assert!( playlist.exists(), "missing playlist at {}", playlist.display() ); let segments = std::fs::read_dir(&subscribe_out) .unwrap() .filter_map(|e| e.ok()) .filter(|e| e.file_name().to_string_lossy().starts_with("segment_")) .count(); assert!(segments >= 1, "expected at least one segment"); }