use std::io::{BufRead, BufReader}; use std::process::{Command, Stdio}; use std::time::{Duration, Instant}; fn env_required(key: &str) -> Option { std::env::var(key) .ok() .map(|v| v.trim().to_string()) .filter(|v| !v.is_empty()) } fn looks_drm(value: &str) -> bool { let value = value.to_lowercase(); value.contains("drm") || value.contains("encrypted") || value.contains("protected") || value.contains("copy") || value.contains("widevine") } fn autodiscover_hdhr_host_and_channel() -> Option<(String, String)> { let devices = ec_hdhomerun::discover().ok()?; let device = devices.into_iter().next()?; let lineup = ec_hdhomerun::fetch_lineup(&device).ok()?; let entry = lineup.iter().find(|e| { let tag_drm = e.tags.iter().any(|t| looks_drm(t)); let raw_drm = e .raw .as_object() .map(|obj| { obj.iter() .any(|(k, v)| looks_drm(k) || looks_drm(&v.to_string())) }) .unwrap_or(false); !tag_drm && !raw_drm && e.channel.number.as_deref().unwrap_or("").trim() != "" })?; let host = device.ip.clone(); let channel = entry .channel .number .clone() .or_else(|| Some(entry.channel.name.clone())) .unwrap_or_else(|| "2.1".to_string()); Some((host, channel)) } fn ec_node_path() -> std::path::PathBuf { if let Ok(value) = std::env::var("EC_NODE_BIN") { return value.into(); } if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec_node") { return value.into(); } if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec-node") { return value.into(); } // Fallback: assume a standard cargo target layout. let exe = std::env::current_exe().expect("current_exe"); let debug_dir = exe .parent() .and_then(|p| p.parent()) .expect("expected target/debug/deps"); let bin = debug_dir.join("ec-node"); bin } #[test] #[ignore] fn e2e_hdhr_publish_then_subscribe_with_manifest_and_encryption() { let host = env_required("EVERY_CHANNEL_E2E_HDHR_HOST"); let channel = env_required("EVERY_CHANNEL_E2E_HDHR_CHANNEL"); let (host, channel) = match (host, channel) { (Some(host), Some(channel)) => (host, channel), _ => match autodiscover_hdhr_host_and_channel() { Some(v) => v, None => return, // skip }, }; let ec_node = ec_node_path(); // Keep secrets deterministic for reproducibility. let signing_key = "11".repeat(32); let network_secret = "22".repeat(32); let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis(); let broadcast_name = format!("every.channel/e2e/{ts}"); let tmp = std::env::temp_dir().join(format!("ec-e2e-hdhr-{ts}")); let publish_chunks = tmp.join("publish-chunks"); let subscribe_out = tmp.join("subscribe-out"); let mut publisher = Command::new(&ec_node); publisher .env("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", &signing_key) .arg("moq-publish") .arg("--publish-manifests") .arg("--epoch-chunks") .arg("1") .arg("--max-chunks") .arg("8") .arg("--chunk-ms") .arg("2000") .arg("--broadcast-name") .arg(&broadcast_name) .arg("--network-secret") .arg(&network_secret) .arg("--chunk-dir") .arg(&publish_chunks) .arg("hdhr") .arg("--host") .arg(&host) .arg("--channel") .arg(&channel) .stdout(Stdio::piped()) .stderr(Stdio::inherit()); let mut child = publisher.spawn().expect("failed to spawn publisher"); let stdout = child.stdout.take().expect("publisher stdout missing"); let mut lines = BufReader::new(stdout).lines(); let mut remote: Option = None; let mut track: Option = None; let deadline = Instant::now() + Duration::from_secs(10); while Instant::now() < deadline { let line = match lines.next() { Some(Ok(line)) => line, Some(Err(_)) => continue, None => break, }; if let Some(rest) = line.strip_prefix("moq endpoint addr: ") { remote = Some(rest.trim().to_string()); } else if let Some(rest) = line.strip_prefix("moq track: ") { track = Some(rest.trim().to_string()); } if remote.is_some() && track.is_some() { break; } } let remote = remote.expect("publisher did not print endpoint addr in time"); let track = track.expect("publisher did not print track in time"); let mut subscriber = Command::new(&ec_node); subscriber .arg("moq-subscribe") .arg("--remote") .arg(&remote) .arg("--broadcast-name") .arg(&broadcast_name) .arg("--track-name") .arg(&track) .arg("--subscribe-manifests") .arg("--require-manifest") .arg("--max-invalid-chunks") .arg("0") .arg("--stop-after") .arg("3") .arg("--output-dir") .arg(&subscribe_out) .arg("--chunk-ms") .arg("2000") .arg("--network-secret") .arg(&network_secret) .stdout(Stdio::inherit()) .stderr(Stdio::inherit()); let mut sub_child = subscriber.spawn().expect("failed to spawn subscriber"); let start = Instant::now(); loop { if let Ok(Some(status)) = sub_child.try_wait() { assert!(status.success(), "subscriber exited with {status}"); break; } if start.elapsed() > Duration::from_secs(30) { let _ = sub_child.kill(); panic!("subscriber timed out"); } std::thread::sleep(Duration::from_millis(200)); } // Publisher should exit after max chunks; don't hang forever. let start = Instant::now(); loop { if let Ok(Some(status)) = child.try_wait() { assert!(status.success(), "publisher exited with {status}"); break; } if start.elapsed() > Duration::from_secs(30) { let _ = child.kill(); panic!("publisher timed out"); } std::thread::sleep(Duration::from_millis(200)); } let playlist = subscribe_out.join("index.m3u8"); assert!( playlist.exists(), "missing playlist at {}", playlist.display() ); let segments = std::fs::read_dir(&subscribe_out) .unwrap() .filter_map(|e| e.ok()) .filter(|e| e.file_name().to_string_lossy().starts_with("segment_")) .count(); assert!(segments >= 1, "expected at least one segment"); }