use std::fs; use std::io::{BufRead, BufReader}; use std::process::{Child, Command, Stdio}; use std::time::{Duration, Instant}; const ANVIL_PK0: &str = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"; const ANVIL_PK1: &str = "0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"; fn env_required(key: &str) -> Option { std::env::var(key) .ok() .map(|v| v.trim().to_string()) .filter(|v| !v.is_empty()) } fn looks_drm(value: &str) -> bool { let value = value.to_lowercase(); value.contains("drm") || value.contains("encrypted") || value.contains("protected") || value.contains("copy") || value.contains("widevine") } fn autodiscover_hdhr_host_and_channel() -> Option<(String, String)> { let devices = ec_hdhomerun::discover().ok()?; let device = devices.into_iter().next()?; let lineup = ec_hdhomerun::fetch_lineup(&device).ok()?; let entry = lineup.iter().find(|e| { let tag_drm = e.tags.iter().any(|t| looks_drm(t)); let raw_drm = e .raw .as_object() .map(|obj| { obj.iter() .any(|(k, v)| looks_drm(k) || looks_drm(&v.to_string())) }) .unwrap_or(false); !tag_drm && !raw_drm && e.channel.number.as_deref().unwrap_or("").trim() != "" })?; let host = device.ip.clone(); let channel = entry .channel .number .clone() .or_else(|| Some(entry.channel.name.clone())) .unwrap_or_else(|| "2.1".to_string()); Some((host, channel)) } fn ec_node_path() -> std::path::PathBuf { if let Ok(value) = std::env::var("EC_NODE_BIN") { return value.into(); } if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec_node") { return value.into(); } if let Ok(value) = std::env::var("CARGO_BIN_EXE_ec-node") { return value.into(); } let exe = std::env::current_exe().expect("current_exe"); let debug_dir = exe .parent() .and_then(|p| p.parent()) .expect("expected target/debug/deps"); debug_dir.join("ec-node") } fn repo_root() -> std::path::PathBuf { std::path::Path::new(env!("CARGO_MANIFEST_DIR")) .parent() .and_then(|p| p.parent()) .expect("workspace root") .to_path_buf() } fn require_tools() -> bool { ["anvil", "cast", "forge", "ffmpeg"] .into_iter() .all(|tool| which::which(tool).is_ok()) } fn wait_for_anvil(rpc_url: &str) { let deadline = Instant::now() + Duration::from_secs(20); while Instant::now() < deadline { let status = Command::new("cast") .arg("block-number") .arg("--rpc-url") .arg(rpc_url) .stdout(Stdio::null()) .stderr(Stdio::null()) .status(); if matches!(status, Ok(status) if status.success()) { return; } std::thread::sleep(Duration::from_millis(250)); } panic!("anvil did not become ready"); } struct ChildGuard { child: Option, } impl ChildGuard { fn new(child: Child) -> Self { Self { child: Some(child) } } } impl Drop for ChildGuard { fn drop(&mut self) { if let Some(child) = self.child.as_mut() { let _ = child.kill(); let _ = child.wait(); } } } fn parse_field(line: &str, field: &str) -> Option { let prefix = format!("{field}="); line.split_whitespace() .find_map(|part| part.strip_prefix(&prefix).map(|value| value.to_string())) } #[test] #[ignore] fn e2e_hdhr_manifest_observation_finalizes_on_anvil() { if !require_tools() { return; } let host = env_required("EVERY_CHANNEL_E2E_HDHR_HOST"); let channel = env_required("EVERY_CHANNEL_E2E_HDHR_CHANNEL"); let (host, channel) = match (host, channel) { (Some(host), Some(channel)) => (host, channel), _ => match autodiscover_hdhr_host_and_channel() { Some(v) => v, None => return, }, }; let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis(); let tmp = std::env::temp_dir().join(format!("ec-e2e-hdhr-chain-{ts}")); fs::create_dir_all(&tmp).unwrap(); let port = 18_545 + (ts % 10_000) as u16; let rpc_url = format!("http://127.0.0.1:{port}"); let anvil_log = fs::File::create(tmp.join("anvil.log")).unwrap(); let _anvil = ChildGuard::new( Command::new("anvil") .arg("--port") .arg(port.to_string()) .stdout(anvil_log) .stderr(Stdio::null()) .spawn() .expect("failed to spawn anvil"), ); wait_for_anvil(&rpc_url); let owner_file = tmp.join("owner.key"); fs::write(&owner_file, ANVIL_PK0).unwrap(); let deploy_json = tmp.join("observation-ledger-deploy.json"); let deploy = Command::new(repo_root().join("scripts/op-stack/deploy-observation-ledger.sh")) .current_dir(repo_root()) .env("EVERY_CHANNEL_RPC_URL", &rpc_url) .env("EVERY_CHANNEL_PRIVATE_KEY_FILE", &owner_file) .env("EVERY_CHANNEL_OBSERVATION_QUORUM", "1") .env("EVERY_CHANNEL_OBSERVATION_DEPLOY_OUT", &deploy_json) .stdout(Stdio::null()) .stderr(Stdio::inherit()) .status() .expect("failed to deploy observation ledger"); assert!(deploy.success(), "deploy failed with {deploy}"); let deploy_value: serde_json::Value = serde_json::from_slice(&fs::read(&deploy_json).unwrap()).unwrap(); let registry = deploy_value["registry"].as_str().unwrap(); let ledger = deploy_value["ledger"].as_str().unwrap(); let witness = Command::new("cast") .arg("wallet") .arg("address") .arg("--private-key") .arg(ANVIL_PK1) .output() .expect("failed to derive witness address"); assert!(witness.status.success()); let witness = String::from_utf8(witness.stdout).unwrap(); let witness = witness.trim(); let add_witness = Command::new("cast") .arg("send") .arg(registry) .arg("addWitness(address)") .arg(witness) .arg("--rpc-url") .arg(&rpc_url) .arg("--private-key") .arg(ANVIL_PK0) .stdout(Stdio::null()) .stderr(Stdio::inherit()) .status() .expect("failed to add witness"); assert!( add_witness.success(), "add witness failed with {add_witness}" ); let ec_node = ec_node_path(); let broadcast_name = format!("every.channel/e2e/blockchain/{ts}"); let mut publisher = Command::new(&ec_node); publisher .env("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", "11".repeat(32)) .arg("moq-publish") .arg("--publish-manifests") .arg("--epoch-chunks") .arg("1") .arg("--max-chunks") .arg("1") .arg("--chunk-ms") .arg("2000") .arg("--broadcast-name") .arg(&broadcast_name) .arg("--observation-rpc-url") .arg(&rpc_url) .arg("--observation-ledger") .arg(ledger) .arg("--observation-private-key") .arg(ANVIL_PK1) .arg("--chunk-dir") .arg(tmp.join("chunks")) .arg("hdhr") .arg("--host") .arg(&host) .arg("--channel") .arg(&channel) .stdout(Stdio::null()) .stderr(Stdio::piped()); let mut child = publisher.spawn().expect("failed to spawn publisher"); let stderr = child.stderr.take().expect("publisher stderr missing"); let lines = BufReader::new(stderr) .lines() .filter_map(|line| line.ok()) .collect::>(); let status = child.wait().expect("failed to wait for publisher"); assert!( status.success(), "publisher exited with {status}: {lines:?}" ); let observation_line = lines .iter() .find(|line| line.starts_with("observation submitted:")) .expect("publisher did not submit an observation"); let observation_hash = parse_field(observation_line, "observation_hash").unwrap(); let slot_hash = parse_field(observation_line, "slot_hash").unwrap(); let finalized = Command::new("cast") .arg("call") .arg(ledger) .arg("finalizedObservationBySlot(bytes32)(bytes32)") .arg(slot_hash) .arg("--rpc-url") .arg(&rpc_url) .output() .expect("failed to read finalized slot"); assert!(finalized.status.success()); let finalized = String::from_utf8(finalized.stdout).unwrap(); assert_eq!(finalized.trim(), observation_hash); }