use std::ffi::OsStr; use std::io::{BufRead, BufReader, Write}; use std::process::{Command, Stdio}; use std::time::{Duration, Instant}; fn which(cmd: &str) -> Option { if let Ok(path) = which::which(cmd) { return Some(path); } None } fn chrome_path() -> Option { // Prefer the standard macOS Chrome app bundle. let mac = std::path::PathBuf::from("/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"); if mac.exists() { return Some(mac); } which("google-chrome") .or_else(|| which("google-chrome-stable")) .or_else(|| which("chromium")) } fn ec_node_path() -> std::path::PathBuf { if let Ok(value) = std::env::var("EC_NODE_BIN") { return value.into(); } if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec_node") { return value.into(); } if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec-node") { return value.into(); } let exe = std::env::current_exe().expect("current_exe"); let debug_dir = exe .parent() .and_then(|p| p.parent()) .expect("expected target/debug/deps"); debug_dir.join("ec-node") } fn read_line_with_timeout( lines: &mut dyn Iterator>, timeout: Duration, ) -> Option { let deadline = Instant::now() + timeout; while Instant::now() < deadline { match lines.next() { Some(Ok(line)) => { let line = line.trim().to_string(); if !line.is_empty() { return Some(line); } } Some(Err(_)) => continue, None => break, } } None } fn generate_ts_fixture(out: &std::path::Path) -> anyhow::Result<()> { // Deterministic-ish fixture: single-threaded x264, fixed GOP, sine audio. let status = Command::new("ffmpeg") .arg("-hide_banner") .arg("-loglevel") .arg("error") .arg("-nostdin") .arg("-y") .arg("-f") .arg("lavfi") .arg("-i") .arg("testsrc2=size=1280x720:rate=30") .arg("-f") .arg("lavfi") .arg("-i") .arg("sine=frequency=1000:sample_rate=48000") .arg("-t") .arg("12") .arg("-map") .arg("0:v:0") .arg("-map") .arg("1:a:0") .arg("-c:v") .arg("libx264") .arg("-pix_fmt") .arg("yuv420p") .arg("-g") .arg("60") .arg("-keyint_min") .arg("60") .arg("-sc_threshold") .arg("0") .arg("-bf") .arg("0") .arg("-threads") .arg("1") .arg("-c:a") .arg("aac") .arg("-b:a") .arg("128k") .arg("-ac") .arg("2") .arg("-ar") .arg("48000") .arg("-f") .arg("mpegts") .arg(out) .status()?; if !status.success() { anyhow::bail!("ffmpeg fixture generation failed with {status}"); } Ok(()) } fn click_button_by_text(tab: &headless_chrome::Tab, text: &str) -> anyhow::Result<()> { let js = format!( r#"(function() {{ let btns = Array.from(document.querySelectorAll('button')); let btn = btns.find(b => (b.innerText || '').trim() === {t}); if (!btn) return false; btn.click(); return true; }})();"#, t = serde_json::to_string(text).unwrap() ); let v = tab.evaluate(&js, false)?; let ok = v.value.and_then(|v| v.as_bool()).unwrap_or(false); if !ok { anyhow::bail!("button not found: {text}"); } Ok(()) } fn fill_input_by_placeholder( tab: &headless_chrome::Tab, placeholder: &str, value: &str, ) -> anyhow::Result<()> { let js = format!( r#"(function() {{ let input = document.querySelector('input[placeholder={p}]'); if (!input) return false; input.focus(); input.value = {v}; input.dispatchEvent(new Event('input', {{ bubbles: true }})); input.dispatchEvent(new Event('change', {{ bubbles: true }})); return true; }})();"#, p = serde_json::to_string(placeholder).unwrap(), v = serde_json::to_string(value).unwrap() ); let v = tab.evaluate(&js, false)?; let ok = v.value.and_then(|v| v.as_bool()).unwrap_or(false); if !ok { anyhow::bail!("input not found for placeholder: {placeholder}"); } Ok(()) } fn get_reply_link(tab: &headless_chrome::Tab) -> anyhow::Result> { // Read the last readonly input inside the add menu; this is where we render the reply code. let js = r#"(function() { let menu = document.querySelector('.source-menu'); if (!menu) return null; let inputs = Array.from(menu.querySelectorAll('input.source-menu-input[readonly]')); if (!inputs.length) return null; return inputs[inputs.length - 1].value || null; })();"#; let v = tab.evaluate(js, false)?; Ok(v.value.and_then(|v| v.as_str().map(|s| s.to_string()))) } fn wait_for_text( tab: &headless_chrome::Tab, needle: &str, timeout: Duration, ) -> anyhow::Result<()> { let deadline = Instant::now() + timeout; while Instant::now() < deadline { let js = format!( r#"(function() {{ return document.body && (document.body.innerText || '').includes({n}); }})();"#, n = serde_json::to_string(needle).unwrap() ); let v = tab.evaluate(&js, false)?; if v.value.and_then(|v| v.as_bool()).unwrap_or(false) { return Ok(()); } std::thread::sleep(Duration::from_millis(200)); } anyhow::bail!("timed out waiting for text: {needle}"); } fn wait_for_blob_video(tab: &headless_chrome::Tab, timeout: Duration) -> anyhow::Result<()> { let deadline = Instant::now() + timeout; while Instant::now() < deadline { let js = r#"(function() { let v = document.querySelector('video'); if (!v) return false; if (typeof v.src !== 'string') return false; return v.src.startsWith('blob:'); })();"#; let v = tab.evaluate(js, false)?; if v.value.and_then(|v| v.as_bool()).unwrap_or(false) { return Ok(()); } std::thread::sleep(Duration::from_millis(200)); } anyhow::bail!("timed out waiting for video blob src"); } #[test] #[ignore] fn e2e_remote_website_connects_to_local_direct_publisher() -> anyhow::Result<()> { if which("ffmpeg").is_none() { return Ok(()); // skip } let chrome = match chrome_path() { Some(p) => p, None => return Ok(()), // skip }; let site_url = std::env::var("EVERY_CHANNEL_SITE_URL") .unwrap_or_else(|_| "https://every.channel/".to_string()); let ec_node = ec_node_path(); let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis(); let tmp = std::env::temp_dir().join(format!("ec-e2e-remote-website-direct-{ts}")); let _ = std::fs::create_dir_all(&tmp); let input_ts = tmp.join("input.ts"); let chunk_dir = tmp.join("chunks"); generate_ts_fixture(&input_ts)?; let mut pub_child = Command::new(&ec_node) .arg("direct-publish") .arg("--chunk-dir") .arg(&chunk_dir) .arg("--chunk-ms") .arg("2000") .arg("--max-segments") .arg("6") .arg("ts") .arg(&input_ts) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::inherit()) .spawn()?; let stdout = pub_child.stdout.take().expect("publisher stdout missing"); let mut lines = BufReader::new(stdout).lines(); let offer = read_line_with_timeout(&mut lines, Duration::from_secs(60)) .ok_or_else(|| anyhow::anyhow!("publisher did not print offer link in time"))?; if !offer.starts_with("every.channel://direct?c=") { anyhow::bail!("unexpected offer link: {offer}"); } let launch_options = headless_chrome::LaunchOptionsBuilder::default() .path(Some(chrome)) .headless(true) .args(vec![ OsStr::new("--autoplay-policy=no-user-gesture-required"), OsStr::new("--mute-audio"), ]) .build() .unwrap(); let browser = headless_chrome::Browser::new(launch_options)?; let tab = browser.new_tab()?; tab.navigate_to(&site_url)?; tab.wait_until_navigated()?; // Open the add menu via class selector (stable). tab.wait_for_element("button.add-source")?.click()?; tab.wait_for_element(".source-menu")?; // Use Watch a link flow. fill_input_by_placeholder(&tab, "every.channel://watch?...", &offer)?; click_button_by_text(&tab, "Parse link")?; click_button_by_text(&tab, "Tune in")?; // Poll for reply link. let deadline = Instant::now() + Duration::from_secs(60); let reply = loop { if let Some(v) = get_reply_link(&tab)? { if v.starts_with("every.channel://direct?c=") { break v; } } if Instant::now() > deadline { anyhow::bail!("timed out waiting for reply link in UI"); } std::thread::sleep(Duration::from_millis(200)); }; // Feed reply back to publisher. let stdin = pub_child.stdin.as_mut().expect("publisher stdin missing"); writeln!(stdin, "{reply}")?; stdin.flush()?; // Website should go Live and show a blob video source. wait_for_text(&tab, "Live", Duration::from_secs(60))?; wait_for_blob_video(&tab, Duration::from_secs(60))?; // Cleanup. let _ = pub_child.kill(); let _ = pub_child.wait(); let _ = std::fs::remove_dir_all(&tmp); Ok(()) }