diff --git a/Cargo.lock b/Cargo.lock index bb04a6e..b559277 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2262,6 +2262,7 @@ dependencies = [ "serde_json", "tokio", "tokio-tungstenite", + "tokio-util", "tracing", "tracing-subscriber", "url", diff --git a/apps/tauri/src/main.rs b/apps/tauri/src/main.rs index 0cf0e10..9e7de0e 100644 --- a/apps/tauri/src/main.rs +++ b/apps/tauri/src/main.rs @@ -29,9 +29,9 @@ use std::io::{Cursor, Read}; use std::net::IpAddr; use std::path::{Path, PathBuf}; use std::process::{Child, Command, Stdio}; -use std::sync::{mpsc, Arc, Mutex as StdMutex}; #[cfg(target_os = "macos")] use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{mpsc, Arc, Mutex as StdMutex}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tauri::path::BaseDirectory; #[cfg(target_os = "macos")] @@ -309,10 +309,7 @@ fn dedupe_source_descriptors(sources: Vec) -> Vec>, - force: bool, -) { +async fn refresh_public_nbc_discovery_if_needed(state: Arc>, force: bool) { let needs_refresh = { let manager = state.lock().await; force @@ -2712,7 +2709,10 @@ fn discover_nbc_public_guide() -> Result { } else { Some(current_title.to_string()) }; - let entitlement = item.analytics.as_ref().and_then(|value| value.entitlement.as_deref()); + let entitlement = item + .analytics + .as_ref() + .and_then(|value| value.entitlement.as_deref()); let mut extra_metadata = vec![ StreamMetadata { key: "discovery_source".to_string(), @@ -2737,7 +2737,10 @@ fn discover_nbc_public_guide() -> Result { value: current_program.clone(), }); } - if let Some(relative_path) = tile.relative_path.as_ref().filter(|value| !value.is_empty()) + if let Some(relative_path) = tile + .relative_path + .as_ref() + .filter(|value| !value.is_empty()) { extra_metadata.push(StreamMetadata { key: "relative_path".to_string(), @@ -3986,7 +3989,10 @@ fn nbc_native_js_eval(window: &WebviewWindow, script: &str) -> Result { } #[cfg(target_os = "macos")] -fn nbc_native_snapshot(window: &WebviewWindow, 
capture_rect: Option<&NbcCaptureRect>) -> Result> { +fn nbc_native_snapshot( + window: &WebviewWindow, + capture_rect: Option<&NbcCaptureRect>, +) -> Result> { let (tx, rx) = mpsc::sync_channel(1); let capture_rect = capture_rect.cloned(); window @@ -4016,12 +4022,15 @@ fn nbc_native_snapshot(window: &WebviewWindow, capture_rect: Option<&NbcCaptureR let bitmap = NSBitmapImageRep::imageRepWithData(&tiff) .ok_or_else(|| anyhow!("failed to decode WKWebView snapshot"))?; let properties = NSDictionary::dictionary(); - bitmap.representationUsingType_properties( - NSBitmapImageFileType::JPEG, - &properties, - ) - .ok_or_else(|| anyhow!("failed to encode WKWebView snapshot as JPEG")) - .map(|jpeg| jpeg.to_vec()) + bitmap + .representationUsingType_properties( + NSBitmapImageFileType::JPEG, + &properties, + ) + .ok_or_else(|| { + anyhow!("failed to encode WKWebView snapshot as JPEG") + }) + .map(|jpeg| jpeg.to_vec()) }); tiff }; @@ -4561,7 +4570,10 @@ fn run_nbc_capture_loop( } #[cfg(target_os = "macos")] -fn spawn_nbc_frame_reader_native(app: &AppHandle, source: &StreamSource) -> Result { +fn spawn_nbc_frame_reader_native( + app: &AppHandle, + source: &StreamSource, +) -> Result { let origin_url = source_metadata_value(source, "origin_url") .ok_or_else(|| anyhow!("NBC source is missing origin_url metadata"))? 
.to_string(); @@ -4678,7 +4690,11 @@ fn bootstrap_nbc_auth_hidden( None, )?; wait_for_nbc_playback_webview(&window, &session.trace, session.hidden_mode)?; - let trace_state = session.trace.lock().map(|state| state.clone()).unwrap_or_default(); + let trace_state = session + .trace + .lock() + .map(|state| state.clone()) + .unwrap_or_default(); Ok(BootstrapNbcAuthResult { input_url: input_url.clone(), stream_id, diff --git a/crates/ec-node/Cargo.toml b/crates/ec-node/Cargo.toml index f1c56b0..c08cb36 100644 --- a/crates/ec-node/Cargo.toml +++ b/crates/ec-node/Cargo.toml @@ -40,8 +40,9 @@ hang = "0.14.0" moq-mux = "0.2.1" moq-lite = "0.14.0" moq-native = { version = "0.13.1", default-features = true } +headless_chrome = "1" +tokio-util = "0.7" url = "2" [dev-dependencies] -headless_chrome = "1" which = "6" diff --git a/crates/ec-node/src/main.rs b/crates/ec-node/src/main.rs index 0da7e30..5b86a2a 100644 --- a/crates/ec-node/src/main.rs +++ b/crates/ec-node/src/main.rs @@ -1,5 +1,6 @@ //! Node runner: orchestrates ingest, chunking, and MoQ publication. 
+mod nbc; mod source; use anyhow::{anyhow, Context, Result}; @@ -33,6 +34,10 @@ use futures_util::{SinkExt, StreamExt}; use iroh::Watcher; use just_webrtc::types::{DataChannelOptions, ICEServer, PeerConfiguration, PeerConnectionState}; use just_webrtc::{DataChannelExt, PeerConnectionBuilder, PeerConnectionExt}; +use nbc::{ + bootstrap_nbc_auth, nbc_capture_fps, resolve_nbc_chrome_path, resolve_nbc_profile_dir, + spawn_nbc_frame_reader, +}; use source::{HdhrSource, HlsMode, HlsSource, LinuxDvbSource, StreamSource, TsSource}; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fs; @@ -48,6 +53,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; use tokio::process::Command as TokioCommand; use tokio_tungstenite::tungstenite::Message as WsMessage; +use tokio_util::io::SyncIoBridge; use url::Url; const DIRECT_WIRE_TAG_FRAME: u8 = 0x00; @@ -90,6 +96,10 @@ enum Commands { WsSubscribe(WsSubscribeArgs), /// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default). WtPublish(WtPublishArgs), + /// Warm an NBC / Adobe auth session in Chrome, only requiring operator interaction when the session is cold. + NbcBootstrap(NbcBootstrapArgs), + /// Publish a live NBC browser-backed stream to a MoQ relay over WebTransport. + NbcWtPublish(NbcWtPublishArgs), /// Subscribe to a relay broadcast over WebTransport/MoQ and archive groups into CAS. WtArchive(WtArchiveArgs), /// Serve archived relay groups as DVR-style HLS playlists + object endpoints. @@ -470,6 +480,68 @@ struct WtPublishArgs { control_endpoint_addr_out: Option, } +#[derive(Parser, Debug)] +struct NbcBootstrapArgs { + /// NBC watch or live URL to warm. + #[arg(long)] + source_url: String, + /// Optional Chrome binary override. + #[arg(long)] + chrome_path: Option, + /// Optional persistent profile directory override. + #[arg(long)] + profile_dir: Option, + /// Optional path for a screenshot captured when interactive auth is required. 
+ #[arg(long)] + screenshot_out: Option, +} + +#[derive(Parser, Debug)] +struct NbcWtPublishArgs { + /// Relay URL (WebTransport) to connect to. + #[arg(long, default_value = "https://cdn.moq.dev/anon")] + url: String, + /// Broadcast name to publish. + #[arg(long)] + name: String, + /// NBC watch or live URL to open in Chrome. + #[arg(long)] + source_url: String, + /// Transmit fMP4 fragments directly (passthrough mode). + #[arg(long, default_value_t = true, action = clap::ArgAction::Set)] + passthrough: bool, + /// Danger: disable TLS verification for the relay. + #[arg(long, default_value_t = false)] + tls_disable_verify: bool, + /// Announce this relay stream over iroh gossip control topic. + #[arg(long, default_value_t = false)] + control_announce: bool, + /// Control gossip TTL (ms) for control announcements. + #[arg(long, default_value_t = 15000)] + control_ttl_ms: u64, + /// Control gossip announce interval (ms). + #[arg(long, default_value_t = 5000)] + control_interval_ms: u64, + /// Optional iroh secret key (hex) for control gossip endpoint identity. + #[arg(long)] + iroh_secret: Option, + /// Discovery modes to enable for control gossip endpoint (comma-separated: dht, mdns, dns). + #[arg(long)] + discovery: Option, + /// Gossip peers to connect to for control announcements (repeatable). + #[arg(long)] + gossip_peer: Vec, + /// Optional path to write this publisher's control endpoint address JSON. + #[arg(long)] + control_endpoint_addr_out: Option, + /// Optional Chrome binary override. + #[arg(long)] + chrome_path: Option, + /// Optional persistent profile directory override. + #[arg(long)] + profile_dir: Option, +} + #[derive(Parser, Debug)] struct WtArchiveArgs { /// Relay URL (WebTransport) to connect to. 
@@ -718,6 +790,8 @@ fn main() -> Result<()> { Commands::WsPublish(args) => run_async(ws_publish(args))?, Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?, Commands::WtPublish(args) => run_async(wt_publish(args))?, + Commands::NbcBootstrap(args) => nbc_bootstrap(args)?, + Commands::NbcWtPublish(args) => run_async(nbc_wt_publish(args))?, Commands::WtArchive(args) => run_async(wt_archive(args))?, Commands::WtArchiveServe(args) => run_async(wt_archive_serve(args))?, Commands::ControlAnnounce(args) => run_async(control_announce(args))?, @@ -1807,6 +1881,47 @@ mod tests { assert!(manifest_hash_for_chunk(&manifest, sid, 12).is_none()); } + #[test] + fn decode_archive_group_bytes_unwraps_concatenated_object_frames() { + let meta_a = ObjectMeta { + created_unix_ms: 1, + content_type: "video/mp4".to_string(), + size_bytes: 4, + timing: None, + encryption: None, + chunk_hash: None, + chunk_hash_alg: None, + chunk_proof: None, + chunk_proof_alg: None, + manifest_id: None, + }; + let meta_b = ObjectMeta { + created_unix_ms: 2, + content_type: "video/mp4".to_string(), + size_bytes: 3, + timing: None, + encryption: None, + chunk_hash: None, + chunk_hash_alg: None, + chunk_proof: None, + chunk_proof_alg: None, + manifest_id: None, + }; + + let mut group = encode_object_frame(&meta_a, b"init").unwrap(); + group.extend_from_slice(&encode_object_frame(&meta_b, b"seg").unwrap()); + + let decoded = decode_archive_group_bytes(&group).unwrap(); + assert_eq!(decoded, b"initseg"); + } + + #[test] + fn decode_archive_group_bytes_passes_through_raw_payloads() { + let raw = b"\x00\x00\x00\x18ftypisom\x00\x00\x02\x00isomiso2"; + let decoded = decode_archive_group_bytes(raw).unwrap(); + assert_eq!(decoded, raw); + } + #[derive(Clone)] struct DummySource { source_id: ec_core::SourceId, @@ -5835,6 +5950,59 @@ fn archive_error(status: u16, message: &str) -> ArchiveHttpResponse { ) } +fn decode_archive_group_bytes(bytes: &[u8]) -> Result> { + let mut pos = 0usize; + let mut out = 
Vec::new(); + let mut decoded_any = false; + + while pos < bytes.len() { + if bytes.len().saturating_sub(pos) < 4 { + if decoded_any { + return Err(anyhow!("archive group ended mid-frame header")); + } + return Ok(bytes.to_vec()); + } + + let meta_len = + u32::from_be_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]]) + as usize; + let meta_start = pos + 4; + let meta_end = meta_start + meta_len; + if meta_end > bytes.len() { + if decoded_any { + return Err(anyhow!("archive group ended mid-frame metadata")); + } + return Ok(bytes.to_vec()); + } + + let meta: ObjectMeta = match serde_json::from_slice(&bytes[meta_start..meta_end]) { + Ok(meta) => meta, + Err(err) => { + if decoded_any { + return Err(anyhow!("archive group metadata parse failed: {err:#}")); + } + return Ok(bytes.to_vec()); + } + }; + let data_len = usize::try_from(meta.size_bytes) + .map_err(|_| anyhow!("archive object size exceeds addressable memory"))?; + let data_start = meta_end; + let data_end = data_start + data_len; + if data_end > bytes.len() { + if decoded_any { + return Err(anyhow!("archive group ended mid-frame payload")); + } + return Ok(bytes.to_vec()); + } + + out.extend_from_slice(&bytes[data_start..data_end]); + pos = data_end; + decoded_any = true; + } + + Ok(out) +} + fn archive_status_text(status: u16) -> &'static str { match status { 200 => "OK", @@ -6025,7 +6193,10 @@ fn handle_archive_http_request( Err(_) => return archive_error(400, "invalid hash"), }; match fs::read(path) { - Ok(bytes) => archive_response(200, "video/mp4", bytes), + Ok(bytes) => match decode_archive_group_bytes(&bytes) { + Ok(payload) => archive_response(200, "video/mp4", payload), + Err(err) => archive_error(500, &format!("{err:#}")), + }, Err(_) => archive_error(404, "init segment not found"), } } @@ -6036,7 +6207,10 @@ fn handle_archive_http_request( Err(_) => return archive_error(400, "invalid hash"), }; match fs::read(path) { - Ok(bytes) => archive_response(200, "video/mp4", bytes), + 
Ok(bytes) => match decode_archive_group_bytes(&bytes) { + Ok(payload) => archive_response(200, "video/mp4", payload), + Err(err) => archive_error(500, &format!("{err:#}")), + }, Err(_) => archive_error(404, "segment not found"), } } @@ -6105,46 +6279,259 @@ async fn handle_archive_http_connection( Ok(()) } -async fn wt_archive_serve(args: WtArchiveServeArgs) -> Result<()> { - let manifest_root = args - .manifest_dir - .unwrap_or_else(|| args.output_dir.join("manifests")); - let cas_root = args.output_dir.join("objects").join("blake3"); - fs::create_dir_all(&manifest_root) - .with_context(|| format!("failed to create manifest dir {}", manifest_root.display()))?; - fs::create_dir_all(&cas_root) - .with_context(|| format!("failed to create CAS dir {}", cas_root.display()))?; +#[derive(Debug, Clone)] +struct WtPublishRelayArgs { + url: String, + name: String, + tls_disable_verify: bool, + control_announce: bool, + control_ttl_ms: u64, + control_interval_ms: u64, + iroh_secret: Option, + discovery: Option, + gossip_peer: Vec, + control_endpoint_addr_out: Option, +} - let listener = TcpListener::bind(&args.listen) - .await - .with_context(|| format!("failed to bind archive replay listener {}", args.listen))?; - let local = listener - .local_addr() - .context("failed to read listen addr")?; - tracing::info!( - listen = %local, - manifest_root = %manifest_root.display(), - cas_root = %cas_root.display(), - "archive replay server listening" - ); +struct WtPublishRelayState { + session: moq_lite::Session, + broadcast: moq_lite::BroadcastProducer, + catalog: hang::CatalogProducer, + control_stop: Option>, +} - let state = ArchiveReplayState { - cas_root, - manifest_root, - }; +#[derive(Clone)] +struct ProtocolOverride { + inner: S, + protocol: Option, +} - loop { - let (stream, peer) = listener.accept().await.context("accept failed")?; - let state_clone = state.clone(); - tokio::spawn(async move { - if let Err(err) = handle_archive_http_connection(stream, state_clone).await { 
- tracing::debug!(peer = %peer, err = %err, "archive replay request failed"); - } - }); +impl web_transport_trait::Session for ProtocolOverride { + type SendStream = S::SendStream; + type RecvStream = S::RecvStream; + type Error = S::Error; + + fn accept_uni( + &self, + ) -> impl Future> + web_transport_trait::MaybeSend + { + self.inner.accept_uni() + } + + fn accept_bi( + &self, + ) -> impl Future> + + web_transport_trait::MaybeSend { + self.inner.accept_bi() + } + + fn open_bi( + &self, + ) -> impl Future> + + web_transport_trait::MaybeSend { + self.inner.open_bi() + } + + fn open_uni( + &self, + ) -> impl Future> + web_transport_trait::MaybeSend + { + self.inner.open_uni() + } + + fn send_datagram(&self, payload: bytes::Bytes) -> Result<(), Self::Error> { + self.inner.send_datagram(payload) + } + + fn recv_datagram( + &self, + ) -> impl Future> + web_transport_trait::MaybeSend + { + self.inner.recv_datagram() + } + + fn max_datagram_size(&self) -> usize { + self.inner.max_datagram_size() + } + + fn protocol(&self) -> Option<&str> { + self.protocol.as_deref().or_else(|| self.inner.protocol()) + } + + fn close(&self, code: u32, reason: &str) { + self.inner.close(code, reason) + } + + fn closed(&self) -> impl Future + web_transport_trait::MaybeSend { + self.inner.closed() } } -async fn wt_publish(args: WtPublishArgs) -> Result<()> { +async fn connect_moq_session( + relay_url: &Url, + publish: moq_lite::OriginConsumer, + tls_disable_verify: bool, +) -> Result { + let host = relay_url + .host_str() + .ok_or_else(|| anyhow!("relay url missing host: {relay_url}"))? 
+ .to_string(); + let port = relay_url.port().unwrap_or(443); + + let mut roots = rustls::RootCertStore::empty(); + let native = rustls_native_certs::load_native_certs(); + if !native.errors.is_empty() { + tracing::warn!( + errors = ?native.errors, + "some native root certs could not be loaded" + ); + } + for cert in native.certs { + let _ = roots.add(cert); + } + + let mut tls = rustls::ClientConfig::builder() + .with_root_certificates(roots) + .with_no_client_auth(); + + if tls_disable_verify { + #[derive(Debug)] + struct NoCertificateVerification(Arc); + + impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification { + fn verify_server_cert( + &self, + _end_entity: &rustls::pki_types::CertificateDer<'_>, + _intermediates: &[rustls::pki_types::CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, + _ocsp: &[u8], + _now: rustls::pki_types::UnixTime, + ) -> Result { + Ok(rustls::client::danger::ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + message: &[u8], + cert: &rustls::pki_types::CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, + ) -> Result + { + rustls::crypto::verify_tls12_signature( + message, + cert, + dss, + &self.0.signature_verification_algorithms, + ) + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &rustls::pki_types::CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, + ) -> Result + { + rustls::crypto::verify_tls13_signature( + message, + cert, + dss, + &self.0.signature_verification_algorithms, + ) + } + + fn supported_verify_schemes(&self) -> Vec { + self.0.signature_verification_algorithms.supported_schemes() + } + } + + let provider = rustls::crypto::CryptoProvider::get_default() + .cloned() + .unwrap_or_else(|| Arc::new(rustls::crypto::ring::default_provider())); + tls.dangerous() + .set_certificate_verifier(Arc::new(NoCertificateVerification(provider))); + } + + tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()]; + + 
let socket = std::net::UdpSocket::bind("[::]:0").context("failed to bind UDP socket")?; + + let mut transport = quinn::TransportConfig::default(); + transport.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); + transport.keep_alive_interval(Some(Duration::from_secs(4))); + transport.mtu_discovery_config(None); + + let transport = Arc::new(transport); + let runtime = quinn::default_runtime().context("no async runtime")?; + let endpoint_config = quinn::EndpointConfig::default(); + let endpoint = quinn::Endpoint::new(endpoint_config, None, socket, runtime) + .context("failed to create QUIC endpoint")?; + + let ip = tokio::net::lookup_host((host.clone(), port)) + .await + .context("failed DNS lookup")? + .next() + .context("no DNS entries")?; + + let quic: quinn::crypto::rustls::QuicClientConfig = tls.try_into()?; + let mut client_cfg = quinn::ClientConfig::new(Arc::new(quic)); + client_cfg.transport_config(transport); + + tracing::debug!(%ip, %host, %relay_url, "connecting QUIC"); + let connection = endpoint + .connect_with(client_cfg, ip, &host)? 
+ .await + .context("failed QUIC connect")?; + + let mut request = web_transport_quinn::proto::ConnectRequest::new(relay_url.clone()); + for alpn in moq_lite::ALPNS { + request = request.with_protocol(alpn.to_string()); + } + let wt = web_transport_quinn::Session::connect(connection, request) + .await + .context("failed WebTransport CONNECT")?; + + let client = moq_lite::Client::new().with_publish(publish); + + match client.connect(wt.clone()).await { + Ok(session) => { + tracing::info!("connected to relay (native protocol negotiation)"); + return Ok(session); + } + Err(err) => { + tracing::debug!(err = %err, "native MoQ SETUP failed; trying protocol overrides"); + } + } + + let attempts: [&str; 4] = ["moqt-16", "moqt-15", "moq-00", ""]; + let mut last_err: Option = None; + + for p in attempts { + let session = ProtocolOverride { + inner: wt.clone(), + protocol: (!p.is_empty()).then(|| p.to_string()), + }; + + match client.connect(session).await { + Ok(session) => { + tracing::info!(protocol = %p, "connected to relay"); + return Ok(session); + } + Err(err) => { + last_err = Some(anyhow::Error::new(err)); + tracing::debug!( + protocol = %p, + err = %last_err.as_ref().unwrap(), + "MoQ SETUP failed; retrying" + ); + } + } + } + + Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))).context("failed MoQ SETUP") +} + +async fn open_wt_publish_relay(args: &WtPublishRelayArgs) -> Result { let relay_url = Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?; let relay_url_str = relay_url.to_string(); @@ -6216,260 +6603,79 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { } } - // Create a local origin + broadcast, then pass an OriginConsumer into the client so it can - // publish announcements to the relay. 
let origin = moq_lite::Origin::produce(); let publish = origin.consume(); let mut broadcast = origin .create_broadcast(&args.name) .ok_or_else(|| anyhow!("failed to create broadcast: {}", args.name))?; - - // Ensure the catalog track is present in the broadcast so subscribers can discover tracks. - let mut catalog = hang::CatalogProducer::default(); + let catalog = hang::CatalogProducer::default(); broadcast.insert_track(catalog.track.clone()); - #[derive(Clone)] - struct ProtocolOverride { - inner: S, - protocol: Option, - } - - impl web_transport_trait::Session for ProtocolOverride { - type SendStream = S::SendStream; - type RecvStream = S::RecvStream; - type Error = S::Error; - - fn accept_uni( - &self, - ) -> impl Future> + web_transport_trait::MaybeSend - { - self.inner.accept_uni() - } - - fn accept_bi( - &self, - ) -> impl Future> - + web_transport_trait::MaybeSend { - self.inner.accept_bi() - } - - fn open_bi( - &self, - ) -> impl Future> - + web_transport_trait::MaybeSend { - self.inner.open_bi() - } - - fn open_uni( - &self, - ) -> impl Future> + web_transport_trait::MaybeSend - { - self.inner.open_uni() - } - - fn send_datagram(&self, payload: bytes::Bytes) -> Result<(), Self::Error> { - self.inner.send_datagram(payload) - } - - fn recv_datagram( - &self, - ) -> impl Future> + web_transport_trait::MaybeSend - { - self.inner.recv_datagram() - } - - fn max_datagram_size(&self) -> usize { - self.inner.max_datagram_size() - } - - fn protocol(&self) -> Option<&str> { - self.protocol.as_deref().or_else(|| self.inner.protocol()) - } - - fn close(&self, code: u32, reason: &str) { - self.inner.close(code, reason) - } - - fn closed(&self) -> impl Future + web_transport_trait::MaybeSend { - self.inner.closed() - } - } - - async fn connect_moq_session( - relay_url: &Url, - publish: moq_lite::OriginConsumer, - tls_disable_verify: bool, - ) -> Result { - let host = relay_url - .host_str() - .ok_or_else(|| anyhow!("relay url missing host: {relay_url}"))? 
- .to_string(); - let port = relay_url.port().unwrap_or(443); - - // Build TLS config. - let mut roots = rustls::RootCertStore::empty(); - let native = rustls_native_certs::load_native_certs(); - if !native.errors.is_empty() { - tracing::warn!( - errors = ?native.errors, - "some native root certs could not be loaded" - ); - } - for cert in native.certs { - let _ = roots.add(cert); - } - - let mut tls = rustls::ClientConfig::builder() - .with_root_certificates(roots) - .with_no_client_auth(); - - if tls_disable_verify { - // Mirror moq-native's behavior: accept any certificate, but still verify signatures. - #[derive(Debug)] - struct NoCertificateVerification(Arc); - - impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification { - fn verify_server_cert( - &self, - _end_entity: &rustls::pki_types::CertificateDer<'_>, - _intermediates: &[rustls::pki_types::CertificateDer<'_>], - _server_name: &rustls::pki_types::ServerName<'_>, - _ocsp: &[u8], - _now: rustls::pki_types::UnixTime, - ) -> Result - { - Ok(rustls::client::danger::ServerCertVerified::assertion()) - } - - fn verify_tls12_signature( - &self, - message: &[u8], - cert: &rustls::pki_types::CertificateDer<'_>, - dss: &rustls::DigitallySignedStruct, - ) -> Result - { - rustls::crypto::verify_tls12_signature( - message, - cert, - dss, - &self.0.signature_verification_algorithms, - ) - } - - fn verify_tls13_signature( - &self, - message: &[u8], - cert: &rustls::pki_types::CertificateDer<'_>, - dss: &rustls::DigitallySignedStruct, - ) -> Result - { - rustls::crypto::verify_tls13_signature( - message, - cert, - dss, - &self.0.signature_verification_algorithms, - ) - } - - fn supported_verify_schemes(&self) -> Vec { - self.0.signature_verification_algorithms.supported_schemes() - } - } - - let provider = rustls::crypto::CryptoProvider::get_default() - .cloned() - .unwrap_or_else(|| Arc::new(rustls::crypto::ring::default_provider())); - tls.dangerous() - 
.set_certificate_verifier(Arc::new(NoCertificateVerification(provider))); - } - - // WebTransport over HTTP/3. - tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()]; - - // Build a Quinn endpoint. - let socket = std::net::UdpSocket::bind("[::]:0").context("failed to bind UDP socket")?; - - let mut transport = quinn::TransportConfig::default(); - transport.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); - transport.keep_alive_interval(Some(Duration::from_secs(4))); - transport.mtu_discovery_config(None); - - let transport = Arc::new(transport); - let runtime = quinn::default_runtime().context("no async runtime")?; - let endpoint_config = quinn::EndpointConfig::default(); - let endpoint = quinn::Endpoint::new(endpoint_config, None, socket, runtime) - .context("failed to create QUIC endpoint")?; - - // Resolve relay. - let ip = tokio::net::lookup_host((host.clone(), port)) - .await - .context("failed DNS lookup")? - .next() - .context("no DNS entries")?; - - let quic: quinn::crypto::rustls::QuicClientConfig = tls.try_into()?; - let mut client_cfg = quinn::ClientConfig::new(Arc::new(quic)); - client_cfg.transport_config(transport); - - tracing::debug!(%ip, %host, %relay_url, "connecting QUIC"); - let connection = endpoint - .connect_with(client_cfg, ip, &host)? - .await - .context("failed QUIC connect")?; - - // Establish a WebTransport session. - let mut request = web_transport_quinn::proto::ConnectRequest::new(relay_url.clone()); - for alpn in moq_lite::ALPNS { - request = request.with_protocol(alpn.to_string()); - } - let wt = web_transport_quinn::Session::connect(connection, request) - .await - .context("failed WebTransport CONNECT")?; - - // Establish a MoQ session. First try native negotiation as-selected by the relay. - // If that fails, fall back to explicit protocol overrides for relays that omit protocol - // selection in CONNECT responses. 
- let client = moq_lite::Client::new().with_publish(publish); - - match client.connect(wt.clone()).await { - Ok(session) => { - tracing::info!("connected to relay (native protocol negotiation)"); - return Ok(session); - } - Err(err) => { - tracing::debug!(err = %err, "native MoQ SETUP failed; trying protocol overrides"); - } - } - - // These correspond to IETF draft ALPNs as used by moq-lite/web code. - // We use string literals here since moq-lite does not currently expose these constants. - let attempts: [&str; 4] = ["moqt-16", "moqt-15", "moq-00", ""]; - let mut last_err: Option = None; - - for p in attempts { - let session = ProtocolOverride { - inner: wt.clone(), - protocol: (!p.is_empty()).then(|| p.to_string()), - }; - - match client.connect(session).await { - Ok(session) => { - tracing::info!(protocol = %p, "connected to relay"); - return Ok(session); - } - Err(err) => { - last_err = Some(anyhow::Error::new(err)); - tracing::debug!(protocol = %p, err = %last_err.as_ref().unwrap(), "MoQ SETUP failed; retrying"); - } - } - } - - Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))).context("failed MoQ SETUP") - } - tracing::info!(url=%relay_url, name=%args.name, "connecting to relay"); let session = connect_moq_session(&relay_url, publish, args.tls_disable_verify).await?; + Ok(WtPublishRelayState { + session, + broadcast, + catalog, + control_stop, + }) +} + +async fn wt_archive_serve(args: WtArchiveServeArgs) -> Result<()> { + let manifest_root = args + .manifest_dir + .unwrap_or_else(|| args.output_dir.join("manifests")); + let cas_root = args.output_dir.join("objects").join("blake3"); + fs::create_dir_all(&manifest_root) + .with_context(|| format!("failed to create manifest dir {}", manifest_root.display()))?; + fs::create_dir_all(&cas_root) + .with_context(|| format!("failed to create CAS dir {}", cas_root.display()))?; + + let listener = TcpListener::bind(&args.listen) + .await + .with_context(|| format!("failed to bind archive replay listener 
{}", args.listen))?; + let local = listener + .local_addr() + .context("failed to read listen addr")?; + tracing::info!( + listen = %local, + manifest_root = %manifest_root.display(), + cas_root = %cas_root.display(), + "archive replay server listening" + ); + + let state = ArchiveReplayState { + cas_root, + manifest_root, + }; + + loop { + let (stream, peer) = listener.accept().await.context("accept failed")?; + let state_clone = state.clone(); + tokio::spawn(async move { + if let Err(err) = handle_archive_http_connection(stream, state_clone).await { + tracing::debug!(peer = %peer, err = %err, "archive replay request failed"); + } + }); + } +} + +async fn wt_publish(args: WtPublishArgs) -> Result<()> { + let mut relay = open_wt_publish_relay(&WtPublishRelayArgs { + url: args.url.clone(), + name: args.name.clone(), + tls_disable_verify: args.tls_disable_verify, + control_announce: args.control_announce, + control_ttl_ms: args.control_ttl_ms, + control_interval_ms: args.control_interval_ms, + iroh_secret: args.iroh_secret.clone(), + discovery: args.discovery.clone(), + gossip_peer: args.gossip_peer.clone(), + control_endpoint_addr_out: args.control_endpoint_addr_out.clone(), + }) + .await?; + // Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux. 
let mut cmd = TokioCommand::new("ffmpeg"); cmd.arg("-hide_banner") @@ -6543,10 +6749,10 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { let config = moq_mux::import::Fmp4Config { passthrough: args.passthrough, }; - let mut importer = moq_mux::import::Fmp4::new(broadcast, catalog, config); + let mut importer = moq_mux::import::Fmp4::new(relay.broadcast, relay.catalog, config); let mut stdout = stdout; - let mut decode_fut = importer.decode_from(&mut stdout); + let decode_fut = importer.decode_from(&mut stdout); tokio::pin!(decode_fut); tracing::info!("publishing fMP4 -> moq-mux -> relay"); @@ -6559,7 +6765,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { Err(err) => Err(err).context("fmp4 ingest failed"), } } - _ = session.closed() => { + _ = relay.session.closed() => { let _ = child.kill().await; Err(anyhow!("relay session closed")) } @@ -6571,7 +6777,170 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { } }; - if let Some(stop) = control_stop.take() { + if let Some(stop) = relay.control_stop.take() { + let _ = stop.send(()); + } + + outcome +} + +fn nbc_bootstrap(args: NbcBootstrapArgs) -> Result<()> { + let chrome_path = resolve_nbc_chrome_path(args.chrome_path.as_deref())?; + let profile_dir = resolve_nbc_profile_dir(args.profile_dir.as_deref())?; + let result = bootstrap_nbc_auth( + chrome_path, + profile_dir, + args.source_url, + args.screenshot_out, + )?; + println!( + "{}", + serde_json::to_string_pretty(&result).context("failed to encode bootstrap result")? 
+ ); + Ok(()) +} + +async fn nbc_wt_publish(args: NbcWtPublishArgs) -> Result<()> { + let chrome_path = resolve_nbc_chrome_path(args.chrome_path.as_deref())?; + let profile_dir = resolve_nbc_profile_dir(args.profile_dir.as_deref())?; + let mut relay = open_wt_publish_relay(&WtPublishRelayArgs { + url: args.url.clone(), + name: args.name.clone(), + tls_disable_verify: args.tls_disable_verify, + control_announce: args.control_announce, + control_ttl_ms: args.control_ttl_ms, + control_interval_ms: args.control_interval_ms, + iroh_secret: args.iroh_secret.clone(), + discovery: args.discovery.clone(), + gossip_peer: args.gossip_peer.clone(), + control_endpoint_addr_out: args.control_endpoint_addr_out.clone(), + }) + .await?; + + let mut frame_reader = spawn_nbc_frame_reader( + chrome_path.clone(), + profile_dir.clone(), + args.source_url.clone(), + ) + .with_context(|| format!("failed to open NBC browser session for {}", args.source_url))?; + + let fps = nbc_capture_fps().max(1); + let gop = (fps * 4).clamp(12, 48); + + let mut cmd = TokioCommand::new("ffmpeg"); + cmd.arg("-hide_banner") + .arg("-loglevel") + .arg("error") + .arg("-nostats") + .arg("-fflags") + .arg("+nobuffer") + .arg("-flags") + .arg("low_delay") + .arg("-f") + .arg("mjpeg") + .arg("-framerate") + .arg(fps.to_string()) + .arg("-i") + .arg("pipe:0") + .args([ + "-map", + "0:v:0", + "-an", + "-c:v", + "libx264", + "-preset", + "veryfast", + "-tune", + "zerolatency", + "-pix_fmt", + "yuv420p", + "-profile:v", + "main", + "-g", + ]) + .arg(gop.to_string()) + .arg("-keyint_min") + .arg(gop.to_string()) + .args([ + "-sc_threshold", + "0", + "-threads", + "1", + "-f", + "mp4", + "-movflags", + "empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset", + "pipe:1", + ]); + + cmd.stdin(Stdio::piped()); + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::inherit()); + + tracing::info!( + source_url = %args.source_url, + chrome_path = %chrome_path.display(), + profile_dir = %profile_dir.display(), + "spawning 
ffmpeg for NBC browser capture" + ); + let mut child = cmd.spawn().context("failed to spawn ffmpeg")?; + let stdin = child + .stdin + .take() + .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?; + let writer = tokio::task::spawn_blocking(move || -> Result<()> { + let mut stdin = SyncIoBridge::new(stdin); + std::io::copy(&mut frame_reader, &mut stdin) + .context("failed to pipe NBC JPEG frames into ffmpeg")?; + Ok(()) + }); + let stdout = child + .stdout + .take() + .ok_or_else(|| anyhow!("ffmpeg stdout unavailable"))?; + + let config = moq_mux::import::Fmp4Config { + passthrough: args.passthrough, + }; + let mut importer = moq_mux::import::Fmp4::new(relay.broadcast, relay.catalog, config); + + let mut stdout = stdout; + let decode_fut = importer.decode_from(&mut stdout); + tokio::pin!(decode_fut); + + tracing::info!("publishing NBC browser capture -> fMP4 -> moq-mux -> relay"); + let outcome = tokio::select! { + res = &mut decode_fut => { + let status = child.wait().await.context("failed to wait for ffmpeg")?; + match writer.await { + Ok(Ok(())) => {} + Ok(Err(err)) if !status.success() => { + tracing::debug!("NBC frame writer ended after ffmpeg exit: {err:#}"); + } + Ok(Err(err)) => return Err(err).context("NBC frame writer failed"), + Err(err) => return Err(anyhow!("NBC frame writer task failed: {err}")), + } + match res { + Ok(()) if status.success() => Ok(()), + Ok(()) => Err(anyhow!("ffmpeg exited with {status}")), + Err(err) => Err(err).context("fmp4 ingest failed"), + } + } + _ = relay.session.closed() => { + let _ = child.kill().await; + let _ = writer.await; + Err(anyhow!("relay session closed")) + } + _ = tokio::signal::ctrl_c() => { + tracing::info!("ctrl-c; shutting down"); + let _ = child.kill().await; + let _ = writer.await; + tokio::time::sleep(Duration::from_millis(100)).await; + Ok(()) + } + }; + + if let Some(stop) = relay.control_stop.take() { let _ = stop.send(()); } diff --git a/crates/ec-node/src/nbc.rs b/crates/ec-node/src/nbc.rs new file 
mode 100644 index 0000000..e4dd3ef --- /dev/null +++ b/crates/ec-node/src/nbc.rs @@ -0,0 +1,1284 @@ +use anyhow::{anyhow, Context, Result}; +use headless_chrome::protocol::cdp::Page; +use headless_chrome::{Browser, Tab}; +use serde::{Deserialize, Serialize}; +use std::env; +use std::fs; +use std::io::{BufRead, BufReader, Cursor, Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::path::{Path, PathBuf}; +use std::process::{Child, Command, Stdio}; +use std::sync::{mpsc, Arc, Mutex as StdMutex}; +use std::time::{Duration, Instant}; +use url::Url; + +#[derive(Debug, Default, Clone)] +struct NbcTraceState { + friendship_ready: bool, + adobe_session_ready: bool, + adobe_authorized: bool, + short_authorized: bool, + token_verified: bool, + mt_session_ready: bool, + background_login_complete: bool, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct NbcVideoState { + title: String, + page_url: String, + document_ready_state: String, + has_video: bool, + current_time: f64, + ready_state: i64, + paused: bool, + width: u64, + height: u64, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct NbcPageClues { + body_text: String, + ctas: Vec, + iframes: Vec, + inputs: Vec, + provider_candidates: Vec, + video_count: u64, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct NbcAuthAdvanceResult { + page_url: String, + title: String, + actions: Vec, +} + +#[derive(Debug, Clone, Serialize)] +pub struct BootstrapResult { + pub title: String, + pub page_url: String, + pub interactive_auth_required: bool, + pub authorized: bool, + pub screenshot_path: Option, +} + +#[derive(Debug)] +struct WaitOutcome { + state: NbcVideoState, + trace: NbcTraceState, + interactive_auth_required: bool, + screenshot_path: Option, +} + +enum AuthMode { + Forbidden, + AllowInteractive { timeout: Duration }, +} + +pub struct 
FramePipeReader { + rx: mpsc::Receiver>, + current: Cursor>, +} + +impl FramePipeReader { + fn new(rx: mpsc::Receiver>) -> Self { + Self { + rx, + current: Cursor::new(Vec::new()), + } + } +} + +impl Read for FramePipeReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + loop { + let read = self.current.read(buf)?; + if read > 0 { + return Ok(read); + } + match self.rx.recv() { + Ok(frame) => self.current = Cursor::new(frame), + Err(_) => return Ok(0), + } + } + } +} + +// Keep Linux/Xvfb Chrome launch aligned with the manual forge smoke that survives +// under a virtual display; the headless_chrome-managed launcher crashes there. +const NBC_CHROME_BASE_ARGS: &[&str] = &[ + "--disable-background-networking", + "--enable-features=NetworkService,NetworkServiceInProcess", + "--disable-background-timer-throttling", + "--disable-backgrounding-occluded-windows", + "--disable-breakpad", + "--disable-client-side-phishing-detection", + "--disable-component-extensions-with-background-pages", + "--disable-default-apps", + "--disable-dev-shm-usage", + "--disable-extensions", + "--disable-features=TranslateUI,BlinkGenPropertyTrees,GlobalMediaControls", + "--disable-hang-monitor", + "--disable-ipc-flooding-protection", + "--disable-popup-blocking", + "--disable-prompt-on-repost", + "--disable-renderer-backgrounding", + "--disable-sync", + "--force-color-profile=srgb", + "--metrics-recording-only", + "--enable-automation", + "--password-store=basic", + "--use-mock-keychain", + "--autoplay-policy=no-user-gesture-required", + "--mute-audio", +]; + +struct ChromeSession { + browser: Browser, + child: Child, +} + +impl ChromeSession { + fn browser(&self) -> &Browser { + &self.browser + } +} + +impl Drop for ChromeSession { + fn drop(&mut self) { + let pid = self.child.id(); + tracing::info!(pid, "shutting down external Chrome for NBC worker"); + let _ = self.child.kill(); + let _ = self.child.wait(); + } +} + +pub fn nbc_capture_timeout() -> Duration { + 
env::var("EVERY_CHANNEL_NBC_CAPTURE_TIMEOUT_SECS") + .ok() + .and_then(|value| value.parse::().ok()) + .map(Duration::from_secs) + .unwrap_or_else(|| Duration::from_secs(300)) +} + +pub fn nbc_capture_fps() -> u64 { + env::var("EVERY_CHANNEL_NBC_CAPTURE_FPS") + .ok() + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0) + .unwrap_or(6) +} + +pub fn nbc_capture_quality() -> u32 { + env::var("EVERY_CHANNEL_NBC_CAPTURE_QUALITY") + .ok() + .and_then(|value| value.parse::().ok()) + .map(|value| value.clamp(1, 100)) + .unwrap_or(80) +} + +fn nbc_bootstrap_timeout() -> Duration { + env::var("EVERY_CHANNEL_NBC_BOOTSTRAP_TIMEOUT_SECS") + .ok() + .and_then(|value| value.parse::().ok()) + .map(Duration::from_secs) + .unwrap_or_else(|| Duration::from_secs(1800)) +} + +fn nbc_env_flag(name: &str) -> Option { + env::var(name).ok().map(|value| { + let value = value.trim().to_ascii_lowercase(); + matches!(value.as_str(), "1" | "true" | "yes" | "on") + }) +} + +fn nbc_disable_chrome_sandbox() -> bool { + nbc_env_flag("EVERY_CHANNEL_NBC_NO_SANDBOX").unwrap_or(cfg!(target_os = "linux")) +} + +fn nbc_enable_chrome_gpu() -> bool { + nbc_env_flag("EVERY_CHANNEL_NBC_ENABLE_GPU").unwrap_or(!cfg!(target_os = "linux")) +} + +fn nbc_proxy_server() -> Option { + env::var("EVERY_CHANNEL_NBC_PROXY_SERVER") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) +} + +fn nbc_proxy_bypass_list() -> Option { + env::var("EVERY_CHANNEL_NBC_PROXY_BYPASS_LIST") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) +} + +fn nbc_mvpd_provider_name() -> String { + env::var("EVERY_CHANNEL_NBC_MVPD_PROVIDER") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .unwrap_or_else(|| "Verizon Fios".to_string()) +} + +pub fn resolve_nbc_chrome_path(override_path: Option<&Path>) -> Result { + if let Some(path) = override_path { + if path.exists() { + return Ok(path.to_path_buf()); + } + return 
Err(anyhow!( + "configured Chrome path does not exist: {}", + path.display() + )); + } + + if let Ok(path) = + env::var("EVERY_CHANNEL_NBC_CHROME_PATH").or_else(|_| env::var("EVERY_CHANNEL_CHROME_PATH")) + { + let path = PathBuf::from(path); + if path.exists() { + return Ok(path); + } + return Err(anyhow!( + "configured Chrome path does not exist: {}", + path.display() + )); + } + + let candidates = [ + "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", + "/Applications/Chromium.app/Contents/MacOS/Chromium", + "/run/current-system/sw/bin/google-chrome-stable", + "/run/current-system/sw/bin/google-chrome", + "/run/current-system/sw/bin/chromium", + "/run/current-system/sw/bin/chromium-browser", + ]; + for candidate in candidates { + let path = PathBuf::from(candidate); + if path.exists() { + return Ok(path); + } + } + + for binary in [ + "google-chrome-stable", + "google-chrome", + "chromium", + "chromium-browser", + ] { + let output = Command::new("bash") + .arg("-lc") + .arg(format!("command -v {binary}")) + .output(); + if let Ok(output) = output { + if output.status.success() { + let path = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if !path.is_empty() { + return Ok(PathBuf::from(path)); + } + } + } + } + + Err(anyhow!( + "Chrome not found; set EVERY_CHANNEL_NBC_CHROME_PATH to a Google Chrome binary" + )) +} + +pub fn resolve_nbc_profile_dir(override_dir: Option<&Path>) -> Result { + if let Some(path) = override_dir { + fs::create_dir_all(path).with_context(|| format!("failed to create {}", path.display()))?; + return Ok(path.to_path_buf()); + } + + if let Ok(path) = env::var("EVERY_CHANNEL_NBC_PROFILE_DIR") { + let path = PathBuf::from(path); + fs::create_dir_all(&path) + .with_context(|| format!("failed to create {}", path.display()))?; + return Ok(path); + } + + let base = env::var_os("XDG_STATE_HOME") + .map(PathBuf::from) + .or_else(|| env::var_os("HOME").map(|home| PathBuf::from(home).join(".local/state"))) + 
.unwrap_or_else(|| PathBuf::from(".cache")); + let path = base.join("every-channel/nbc-chrome-profile"); + fs::create_dir_all(&path).with_context(|| format!("failed to create {}", path.display()))?; + Ok(path) +} + +fn cleanup_nbc_profile_lockfiles(profile_dir: &Path) { + for name in ["SingletonLock", "SingletonCookie", "SingletonSocket"] { + let path = profile_dir.join(name); + if !path.exists() { + continue; + } + match fs::remove_file(&path) { + Ok(()) => tracing::info!( + path = %path.display(), + "removed stale Chrome profile lockfile before NBC launch" + ), + Err(err) if err.kind() == std::io::ErrorKind::IsADirectory => { + if let Err(err) = fs::remove_dir_all(&path) { + tracing::debug!( + path = %path.display(), + "failed to remove stale Chrome profile lock directory: {err:#}" + ); + } + } + Err(err) => tracing::debug!( + path = %path.display(), + "failed to remove stale Chrome profile lockfile: {err:#}" + ), + } + } +} + +pub fn bootstrap_nbc_auth( + chrome_path: PathBuf, + profile_dir: PathBuf, + url: String, + screenshot_out: Option, +) -> Result { + let chrome = launch_browser(chrome_path, profile_dir)?; + let tab = chrome.browser().new_tab()?; + let _ = tab.enable_stealth_mode(); + let trace = Arc::new(StdMutex::new(NbcTraceState::default())); + register_nbc_trace_handlers(&tab, trace.clone())?; + tab.navigate_to(&url)?; + tab.wait_until_navigated()?; + + let outcome = wait_for_nbc_playback( + &tab, + &trace, + AuthMode::AllowInteractive { + timeout: nbc_bootstrap_timeout(), + }, + screenshot_out, + )?; + + Ok(BootstrapResult { + title: outcome.state.title, + page_url: outcome.state.page_url, + interactive_auth_required: outcome.interactive_auth_required, + authorized: nbc_trace_is_authorized(&outcome.trace), + screenshot_path: outcome.screenshot_path, + }) +} + +pub fn spawn_nbc_frame_reader( + chrome_path: PathBuf, + profile_dir: PathBuf, + url: String, +) -> Result { + let (frame_tx, frame_rx) = mpsc::sync_channel(8); + let (ready_tx, ready_rx) = 
mpsc::sync_channel(1); + std::thread::spawn(move || { + let result = run_nbc_capture_loop(chrome_path, profile_dir, url, frame_tx, ready_tx); + if let Err(err) = result { + tracing::warn!("NBC capture loop failed: {err:#}"); + } + }); + + match ready_rx.recv_timeout(nbc_capture_timeout()) { + Ok(Ok(())) => Ok(FramePipeReader::new(frame_rx)), + Ok(Err(err)) => Err(err), + Err(_) => Err(anyhow!( + "timed out waiting for the NBC browser session to become ready" + )), + } +} + +fn reserve_devtools_port() -> Result { + let listener = TcpListener::bind(("127.0.0.1", 0)) + .context("failed to reserve a local Chrome DevTools port")?; + let port = listener + .local_addr() + .context("failed to read reserved Chrome DevTools port")? + .port(); + drop(listener); + Ok(port) +} + +fn fetch_devtools_ws_url(port: u16) -> Result> { + let mut stream = match TcpStream::connect(("127.0.0.1", port)) { + Ok(stream) => stream, + Err(_) => return Ok(None), + }; + let _ = stream.set_read_timeout(Some(Duration::from_millis(500))); + let _ = stream.set_write_timeout(Some(Duration::from_millis(500))); + stream + .write_all( + format!( + "GET /json/version HTTP/1.1\r\nHost: 127.0.0.1:{port}\r\nConnection: close\r\n\r\n" + ) + .as_bytes(), + ) + .context("failed to query Chrome DevTools version endpoint")?; + let mut response = Vec::new(); + let mut chunk = [0_u8; 4096]; + loop { + match stream.read(&mut chunk) { + Ok(0) => break, + Ok(read) => { + response.extend_from_slice(&chunk[..read]); + let text = String::from_utf8_lossy(&response); + if let Some(payload) = text.split("\r\n\r\n").nth(1) { + if let Ok(value) = serde_json::from_str::(payload.trim()) { + return Ok(value + .get("webSocketDebuggerUrl") + .and_then(|value| value.as_str()) + .map(|value| normalize_devtools_ws_url(value, port)) + .transpose()?); + } + } + } + Err(err) if is_retryable_devtools_io(&err) => { + break; + } + Err(err) => return Err(err).context("failed reading Chrome DevTools version response"), + } + } + let text = 
String::from_utf8_lossy(&response); + let payload = text.split("\r\n\r\n").nth(1).unwrap_or_default().trim(); + if payload.is_empty() { + return Ok(None); + } + let value: serde_json::Value = + serde_json::from_str(payload).context("failed to decode Chrome DevTools version JSON")?; + Ok(value + .get("webSocketDebuggerUrl") + .and_then(|value| value.as_str()) + .map(|value| normalize_devtools_ws_url(value, port)) + .transpose()?) +} + +fn is_retryable_devtools_io(err: &std::io::Error) -> bool { + matches!( + err.kind(), + std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut + ) || matches!(err.raw_os_error(), Some(11) | Some(35)) +} + +fn normalize_devtools_ws_url(url: &str, port: u16) -> Result { + let mut parsed = Url::parse(url).context("failed to parse Chrome DevTools websocket URL")?; + if parsed.port().is_none() { + parsed + .set_port(Some(port)) + .map_err(|_| anyhow!("failed to restore Chrome DevTools websocket port"))?; + } + Ok(parsed.to_string()) +} + +fn wait_for_devtools_ws_url(child: &mut Child, port: u16) -> Result { + let deadline = Instant::now() + Duration::from_secs(15); + loop { + if let Some(status) = child + .try_wait() + .context("failed to poll Chrome process state")? + { + return Err(anyhow!( + "Chrome exited with {status} before DevTools became ready" + )); + } + + let now = Instant::now(); + if now >= deadline { + return Err(anyhow!("timed out waiting for Chrome DevTools websocket")); + } + if let Some(url) = fetch_devtools_ws_url(port)? 
{ + return Ok(url); + } + std::thread::sleep( + deadline + .checked_duration_since(now) + .unwrap_or_else(|| Duration::from_millis(0)) + .min(Duration::from_millis(250)), + ); + } +} + +fn launch_browser(chrome_path: PathBuf, profile_dir: PathBuf) -> Result { + let enable_gpu = nbc_enable_chrome_gpu(); + let debug_port = reserve_devtools_port()?; + cleanup_nbc_profile_lockfiles(&profile_dir); + let mut cmd = Command::new(&chrome_path); + cmd.arg(format!("--remote-debugging-port={debug_port}")); + cmd.arg("--verbose"); + cmd.arg("--log-level=0"); + cmd.arg("--no-first-run"); + cmd.arg(format!("--user-data-dir={}", profile_dir.display())); + cmd.arg("--window-size=1440,960"); + cmd.arg("--ignore-certificate-errors"); + cmd.args(NBC_CHROME_BASE_ARGS); + if let Some(proxy_server) = nbc_proxy_server() { + cmd.arg(format!("--proxy-server={proxy_server}")); + if let Some(proxy_bypass_list) = nbc_proxy_bypass_list() { + cmd.arg(format!("--proxy-bypass-list={proxy_bypass_list}")); + } + } + if nbc_disable_chrome_sandbox() { + cmd.arg("--no-sandbox"); + cmd.arg("--disable-setuid-sandbox"); + } + if !enable_gpu { + cmd.arg("--disable-gpu"); + cmd.arg("--disable-software-rasterizer"); + } + cmd.arg("about:blank"); + cmd.stdin(Stdio::null()); + cmd.stdout(Stdio::null()); + cmd.stderr(Stdio::piped()); + + tracing::info!(binary = %chrome_path.display(), "launching external Chrome for NBC worker"); + let mut child = cmd + .spawn() + .with_context(|| format!("failed to spawn Chrome at {}", chrome_path.display()))?; + tracing::info!(pid = child.id(), "started external Chrome for NBC worker"); + + let stderr = child + .stderr + .take() + .ok_or_else(|| anyhow!("Chrome stderr unavailable"))?; + std::thread::spawn(move || { + for line in BufReader::new(stderr).lines() { + match line { + Ok(line) => tracing::debug!(target: "ec_node::nbc.chrome", "{line}"), + Err(err) => { + tracing::debug!("failed reading Chrome stderr: {err:#}"); + break; + } + } + } + }); + + let ws_url = 
wait_for_devtools_ws_url(&mut child, debug_port)?; + let browser = Browser::connect_with_timeout(ws_url, nbc_capture_timeout()) + .context("failed to attach to Chrome DevTools for NBC worker")?; + Ok(ChromeSession { browser, child }) +} + +fn run_nbc_capture_loop( + chrome_path: PathBuf, + profile_dir: PathBuf, + url: String, + frame_tx: mpsc::SyncSender>, + ready_tx: mpsc::SyncSender>, +) -> Result<()> { + let ready_tx_err = ready_tx.clone(); + let result: Result<()> = (|| { + let chrome = launch_browser(chrome_path, profile_dir)?; + let tab = chrome.browser().new_tab()?; + let _ = tab.enable_stealth_mode(); + let trace = Arc::new(StdMutex::new(NbcTraceState::default())); + register_nbc_trace_handlers(&tab, trace.clone())?; + tab.navigate_to(&url)?; + tab.wait_until_navigated()?; + wait_for_nbc_playback(&tab, &trace, AuthMode::Forbidden, None)?; + + let frame_interval = Duration::from_millis(1000 / nbc_capture_fps().max(1)); + let quality = nbc_capture_quality(); + let mut first_frame = true; + + loop { + kick_nbc_player(&tab).ok(); + let frame = tab + .find_element("video") + .and_then(|video| { + video.parent.capture_screenshot( + Page::CaptureScreenshotFormatOption::Jpeg, + Some(quality), + Some(video.get_box_model()?.content_viewport()), + true, + ) + }) + .or_else(|_| { + tab.capture_screenshot( + Page::CaptureScreenshotFormatOption::Jpeg, + Some(quality), + None, + true, + ) + })?; + + if first_frame { + first_frame = false; + let _ = ready_tx.send(Ok(())); + } + + if frame_tx.send(frame).is_err() { + break; + } + std::thread::sleep(frame_interval); + } + + Ok(()) + })(); + + if let Err(err) = &result { + let _ = ready_tx_err.send(Err(anyhow!(err.to_string()))); + } + + result +} + +fn register_nbc_trace_handlers(tab: &Arc, trace: Arc>) -> Result<()> { + tab.register_response_handling( + "nbc-trace", + Box::new(move |event, fetch_body| { + let url = event.response.url.clone(); + let status = event.response.status as u16; + if !(200..400).contains(&status) { 
+ return; + } + + let mut trace_state = match trace.lock() { + Ok(guard) => guard, + Err(_) => return, + }; + + if update_nbc_trace_from_url(&mut trace_state, &url) { + return; + } + if url.contains( + "tokenverifier.digitalsvc.apps.nbcuni.com/tokenverifier/entitlement/verifier", + ) { + if let Ok(body) = fetch_body() { + if !body.base_64_encoded && body.body.contains("\"valid\":true") { + trace_state.token_verified = true; + } + } + } + }), + )?; + tab.register_loading_failed_handling( + "nbc-trace-loading-failed", + Box::new(move |response, failure| { + tracing::warn!( + url = %response.response.url, + status = response.response.status, + error_text = %failure.error_text, + canceled = failure.canceled, + "NBC request failed while waiting for playback" + ); + }), + )?; + Ok(()) +} + +fn update_nbc_trace_from_url(trace_state: &mut NbcTraceState, url: &str) -> bool { + let mut matched = false; + if url.contains("friendship.nbc.com/v3/graphql") { + trace_state.friendship_ready = true; + matched = true; + } + if url.contains("sp.auth.adobe.com/adobe-services/session") { + trace_state.adobe_session_ready = true; + matched = true; + } + if url.contains("sp.auth.adobe.com/adobe-services/authorize") { + trace_state.adobe_authorized = true; + matched = true; + } + if url.contains("sp.auth.adobe.com/adobe-services/shortAuthorize") { + trace_state.short_authorized = true; + matched = true; + } + if url.contains("mt.ssai-oneapp.nbcuni.com/") { + trace_state.mt_session_ready = true; + matched = true; + } + if url.contains("entitlement.auth.adobe.com/entitlement/v4/completeBackgroundLogin.html") { + trace_state.background_login_complete = true; + matched = true; + } + matched +} + +fn nbc_trace_is_authorized(trace_state: &NbcTraceState) -> bool { + trace_state.token_verified + || trace_state.short_authorized + || trace_state.mt_session_ready + || trace_state.adobe_authorized + || trace_state.background_login_complete +} + +fn nbc_url_requires_interactive_auth(url: &str) -> bool 
{ + let Ok(url) = Url::parse(url) else { + return false; + }; + let host = url.host_str().unwrap_or_default().to_ascii_lowercase(); + let path = url.path().to_ascii_lowercase(); + host.contains("auth.adobe.com") + || host.contains("verizon") + || path.contains("mvpd") + || path.contains("signin") + || path.contains("login") +} + +fn nbc_page_is_watch_surface(url: &str) -> bool { + let Ok(url) = Url::parse(url) else { + return false; + }; + let host = url.host_str().unwrap_or_default().to_ascii_lowercase(); + host.ends_with("nbc.com") || host.ends_with(".nbc.com") +} + +fn kick_nbc_player(tab: &Arc) -> Result<()> { + let script = r#" + (() => { + const labels = [ + "watch", + "watch live", + "watch now", + "watch live tv", + "play", + "resume", + "continue", + "start watching", + "start watching now", + "sign in", + "link tv provider", + "unlock", + ]; + const lower = (value) => (value || "").trim().toLowerCase(); + const nodes = Array.from(document.querySelectorAll("button,a,[role='button']")); + const candidate = nodes.find((node) => { + const text = lower(node.innerText || node.textContent || ""); + return labels.some((label) => text === label || text.startsWith(label + " ")); + }); + if (candidate) candidate.click(); + const video = document.querySelector("video"); + if (!video) return false; + video.muted = true; + video.controls = false; + if (typeof video.play === "function") { + Promise.resolve(video.play()).catch(() => {}); + } + return true; + })(); + "#; + let _ = tab.evaluate(script, true)?; + Ok(()) +} + +fn advance_nbc_auth_flow(tab: &Arc) -> Result> { + let provider = nbc_mvpd_provider_name(); + let provider_json = serde_json::to_string(&provider)?; + let provider_needles = serde_json::to_string(&{ + let mut needles = vec![provider.to_ascii_lowercase()]; + for token in provider + .split(|ch: char| !ch.is_ascii_alphanumeric()) + .filter(|token| !token.is_empty()) + { + let token = token.to_ascii_lowercase(); + if !needles.contains(&token) { + 
needles.push(token); + } + } + needles + })?; + let script = format!( + r#" + JSON.stringify((() => {{ + const provider = {provider_json}; + const providerNeedles = {provider_needles}; + const normalize = (value) => (value || "").replace(/\s+/g, " ").trim().toLowerCase(); + const visible = (node) => {{ + if (!node || typeof node.getBoundingClientRect !== "function") return false; + const rect = node.getBoundingClientRect(); + const style = window.getComputedStyle(node); + return rect.width > 0 && + rect.height > 0 && + style.display !== "none" && + style.visibility !== "hidden" && + style.opacity !== "0"; + }}; + const textOf = (node) => normalize([ + node.innerText || node.textContent || "", + node.getAttribute?.("aria-label") || "", + node.getAttribute?.("title") || "", + node.getAttribute?.("name") || "", + node.id || "", + node.getAttribute?.("data-provider-name") || "", + ].join(" ")); + const clickNode = (node, action) => {{ + if (!node || !visible(node)) return false; + node.scrollIntoView?.({{ block: "center", inline: "center" }}); + node.dispatchEvent?.(new MouseEvent("mouseover", {{ bubbles: true, cancelable: true, view: window }})); + node.dispatchEvent?.(new MouseEvent("mousedown", {{ bubbles: true, cancelable: true, view: window }})); + node.dispatchEvent?.(new MouseEvent("mouseup", {{ bubbles: true, cancelable: true, view: window }})); + node.click?.(); + actions.push(action); + return true; + }}; + const setValue = (node, value) => {{ + if (!node) return false; + const prototype = Object.getPrototypeOf(node); + const descriptor = prototype && Object.getOwnPropertyDescriptor(prototype, "value"); + if (descriptor && descriptor.set) {{ + descriptor.set.call(node, value); + }} else {{ + node.value = value; + }} + node.focus?.(); + node.dispatchEvent(new Event("input", {{ bubbles: true }})); + node.dispatchEvent(new Event("change", {{ bubbles: true }})); + actions.push(`typed:${{value}}`); + return true; + }}; + const actions = []; + const url = 
window.location.href || ""; + const candidates = Array.from( + document.querySelectorAll( + "button,a,[role='button'],label,li,div,span,[data-provider-name],[data-provider-id],[data-provider]" + ) + ); + + if (url.includes("mvpd")) {{ + const providerCta = candidates.find((node) => {{ + const text = textOf(node); + return visible(node) && + ( + text === "link tv provider" || + text === "link provider" || + text.startsWith("link tv provider ") || + text.startsWith("link provider ") + ); + }}); + clickNode(providerCta, "click:link-provider"); + + const fullListNode = candidates.find((node) => {{ + const text = textOf(node); + return visible(node) && (text === "full list" || text.startsWith("full list ")); + }}); + clickNode(fullListNode, "click:full-list"); + + const input = Array.from(document.querySelectorAll("input,textarea,select")).find((node) => {{ + if (node.disabled || node.type === "hidden") return false; + const hint = normalize([ + node.getAttribute("placeholder") || "", + node.getAttribute("aria-label") || "", + node.getAttribute("name") || "", + node.id || "", + node.type || "", + node.getAttribute("data-provider-name") || "", + node.getAttribute("data-provider-id") || "", + ].join(" ")); + return hint.includes("search") || hint.includes("provider") || node.type === "search"; + }}); + if (input && normalize(input.value || "") !== normalize(provider)) {{ + setValue(input, provider); + }} + + const providerNode = candidates.find((node) => {{ + const text = textOf(node); + const providerMeta = normalize([ + node.getAttribute?.("data-provider-name") || "", + node.getAttribute?.("data-provider-id") || "", + node.getAttribute?.("data-provider") || "", + ].join(" ")); + return visible(node) && providerNeedles.some((needle) => + text.includes(needle) || providerMeta.includes(needle) + ); + }}); + clickNode(providerNode, `click:provider:${{textOf(providerNode).slice(0, 120)}}`); + }} + + return {{ + pageUrl: url, + title: document.title || "", + actions, + }}; + 
}})()) + "# + ); + let value = tab + .evaluate(&script, false)? + .value + .and_then(|value| value.as_str().map(|value| value.to_string())) + .and_then(|value| serde_json::from_str::(&value).ok()); + Ok(value.filter(|value| !value.actions.is_empty())) +} + +fn probe_nbc_page_clues(tab: &Arc) -> Option { + let value = tab + .evaluate( + r#" + JSON.stringify((() => { + const visible = (node) => { + if (!node || typeof node.getBoundingClientRect !== "function") return false; + const rect = node.getBoundingClientRect(); + const style = window.getComputedStyle(node); + return rect.width > 0 && + rect.height > 0 && + style.display !== "none" && + style.visibility !== "hidden" && + style.opacity !== "0"; + }; + const summarize = (node) => { + const pieces = [ + node.innerText || node.textContent || "", + node.getAttribute?.("aria-label") || "", + node.getAttribute?.("title") || "", + ] + .map((value) => (value || "").replace(/\s+/g, " ").trim()) + .filter(Boolean); + return pieces.join(" | ").slice(0, 160); + }; + const ctas = Array.from(document.querySelectorAll("button,a,[role='button']")) + .filter(visible) + .map(summarize) + .filter(Boolean) + .slice(0, 20); + const iframes = Array.from(document.querySelectorAll("iframe")) + .map((frame) => frame.src || "") + .filter(Boolean) + .slice(0, 12); + const inputs = Array.from(document.querySelectorAll("input,textarea,select")) + .map((node) => [ + node.tagName || "", + node.getAttribute?.("type") || "", + node.getAttribute?.("name") || "", + node.id || "", + node.getAttribute?.("placeholder") || "", + node.getAttribute?.("aria-label") || "", + node.getAttribute?.("data-provider-name") || "", + node.getAttribute?.("data-provider-id") || "", + ] + .map((value) => (value || "").replace(/\s+/g, " ").trim()) + .filter(Boolean) + .join(" | ")) + .filter(Boolean) + .slice(0, 20); + const providerCandidates = Array.from(document.querySelectorAll( + "[data-provider-name],[data-provider-id],[data-provider]" + )) + .map(summarize) + 
.filter(Boolean) + .slice(0, 20); + const bodyText = (document.body?.innerText || "") + .replace(/\s+/g, " ") + .trim() + .slice(0, 1200); + return { + bodyText, + ctas, + iframes, + inputs, + providerCandidates, + videoCount: document.querySelectorAll("video").length, + }; + })()) + "#, + false, + ) + .ok()? + .value + .and_then(|value| value.as_str().map(|value| value.to_string())) + .and_then(|value| serde_json::from_str::(&value).ok())?; + Some(value) +} + +fn probe_nbc_video(tab: &Arc) -> Result { + let fallback_page_url = tab.get_url(); + let fallback_title = tab.get_title().unwrap_or_default(); + let value = match tab.evaluate( + r#" + JSON.stringify((() => { + const video = document.querySelector("video"); + return { + title: document.title || "", + pageUrl: window.location.href || "", + documentReadyState: document.readyState || "", + hasVideo: Boolean(video), + currentTime: video?.currentTime || 0, + readyState: video?.readyState || 0, + paused: video ? Boolean(video.paused) : true, + width: video?.videoWidth || 0, + height: video?.videoHeight || 0, + }; + })()) + "#, + false, + ) { + Ok(result) => result + .value + .and_then(|value| value.as_str().map(|value| value.to_string())) + .and_then(|value| serde_json::from_str::(&value).ok()) + .unwrap_or_else(|| serde_json::json!({})), + Err(err) => { + tracing::debug!( + page_url = %fallback_page_url, + title = %fallback_title, + "failed to evaluate NBC video probe: {err:#}" + ); + serde_json::json!({}) + } + }; + + Ok(NbcVideoState { + title: value + .get("title") + .and_then(|value| value.as_str()) + .filter(|value| !value.is_empty()) + .unwrap_or(&fallback_title) + .to_string(), + page_url: value + .get("pageUrl") + .and_then(|value| value.as_str()) + .filter(|value| !value.is_empty()) + .unwrap_or(&fallback_page_url) + .to_string(), + document_ready_state: value + .get("documentReadyState") + .and_then(|value| value.as_str()) + .unwrap_or_default() + .to_string(), + has_video: value + .get("hasVideo") + 
.and_then(|value| value.as_bool()) + .unwrap_or(false), + current_time: value + .get("currentTime") + .and_then(|value| value.as_f64()) + .unwrap_or(0.0), + ready_state: value + .get("readyState") + .and_then(|value| value.as_i64()) + .unwrap_or(0), + paused: value + .get("paused") + .and_then(|value| value.as_bool()) + .unwrap_or(true), + width: value + .get("width") + .and_then(|value| value.as_u64()) + .unwrap_or(0), + height: value + .get("height") + .and_then(|value| value.as_u64()) + .unwrap_or(0), + }) +} + +fn wait_for_nbc_playback( + tab: &Arc<Tab>, + trace: &Arc<Mutex<NbcTraceState>>, + auth_mode: AuthMode, + screenshot_out: Option<PathBuf>, +) -> Result<WaitOutcome> { + let deadline = Instant::now() + nbc_capture_timeout(); + let mut interactive_deadline = None::<Instant>; + let mut interactive_auth_required = false; + let mut screenshot_path = None::<PathBuf>; + let mut last_state = None::<NbcVideoState>; + let mut last_trace_state = None::<NbcTraceState>; + let mut last_log = Instant::now() - Duration::from_secs(10); + let mut last_clue_log = Instant::now() - Duration::from_secs(30); + let mut resumed_after_background_login = false; + let mut resumed_after_authenticated_surface = false; + let mut watch_surface_seen_at = None::<Instant>; + + loop { + kick_nbc_player(tab).ok(); + if let Some(progress) = advance_nbc_auth_flow(tab).ok().flatten() { + tracing::info!( + title = %progress.title, + page_url = %progress.page_url, + actions = ?progress.actions, + "advanced NBC interactive auth flow" + ); + } + let state = probe_nbc_video(tab).unwrap_or_default(); + let trace_state = trace.lock().map(|state| state.clone()).unwrap_or_default(); + let authorized = nbc_trace_is_authorized(&trace_state); + + if last_log.elapsed() >= Duration::from_secs(5) { + last_log = Instant::now(); + tracing::info!( + title = %state.title, + page_url = %state.page_url, + document_ready_state = %state.document_ready_state, + has_video = state.has_video, + current_time = state.current_time, + ready_state = state.ready_state, + paused = state.paused, + authorized, + 
interactive_auth_required, + "waiting for NBC playback" + ); + } + + if nbc_url_requires_interactive_auth(&state.page_url) && !authorized { + if !interactive_auth_required { + interactive_auth_required = true; + if let Some(path) = screenshot_out.as_ref() { + save_tab_screenshot(tab, path).ok(); + screenshot_path = Some(path.clone()); + } + tracing::info!( + page_url = %state.page_url, + "NBC interactive auth required" + ); + interactive_deadline = match auth_mode { + AuthMode::Forbidden => { + return Err(anyhow!( + "interactive NBC auth required at {}; run `ec-node nbc-bootstrap` against this URL first", + state.page_url + )); + } + AuthMode::AllowInteractive { timeout } => Some(Instant::now() + timeout), + }; + } + } + + if trace_state.background_login_complete && !resumed_after_background_login { + resumed_after_background_login = true; + tracing::info!( + title = %state.title, + "NBC background login completed; reloading watch page" + ); + let _ = tab.evaluate("window.location.reload()", true); + std::thread::sleep(Duration::from_secs(2)); + continue; + } + + let fully_loaded_watch_surface = nbc_page_is_watch_surface(&state.page_url) + && state.document_ready_state.eq_ignore_ascii_case("complete"); + if fully_loaded_watch_surface + && !state.has_video + && last_clue_log.elapsed() >= Duration::from_secs(15) + { + last_clue_log = Instant::now(); + if let Some(clues) = probe_nbc_page_clues(tab) { + tracing::info!( + ctas = ?clues.ctas, + iframes = ?clues.iframes, + inputs = ?clues.inputs, + provider_candidates = ?clues.provider_candidates, + video_count = clues.video_count, + body_text = %clues.body_text, + "NBC watch surface clues" + ); + } + } + if fully_loaded_watch_surface && !state.has_video { + watch_surface_seen_at.get_or_insert_with(Instant::now); + } else { + watch_surface_seen_at = None; + } + if fully_loaded_watch_surface + && !state.has_video + && !resumed_after_authenticated_surface + && watch_surface_seen_at + .map(|seen_at| seen_at.elapsed() >= 
Duration::from_secs(3)) + .unwrap_or(false) + { + resumed_after_authenticated_surface = true; + tracing::info!( + title = %state.title, + page_url = %state.page_url, + "NBC browser is on a fully loaded watch surface without video; reloading once" + ); + let _ = tab.evaluate("window.location.reload()", true); + std::thread::sleep(Duration::from_secs(2)); + continue; + } + + if state.has_video + && state.width > 0 + && state.height > 0 + && !state.paused + && (state.current_time > 0.0 || state.ready_state >= 2) + { + return Ok(WaitOutcome { + state, + trace: trace_state, + interactive_auth_required, + screenshot_path, + }); + } + + last_state = Some(state); + last_trace_state = Some(trace_state); + + let now = Instant::now(); + if now >= deadline { + break; + } + if let Some(interactive_deadline) = interactive_deadline { + if now >= interactive_deadline { + break; + } + } + std::thread::sleep(Duration::from_secs(1)); + } + + let last_state = last_state.unwrap_or_default(); + let last_trace_state = last_trace_state.unwrap_or_default(); + if screenshot_path.is_none() { + if let Some(path) = screenshot_out.as_ref() { + if save_tab_screenshot(tab, path).is_ok() { + screenshot_path = Some(path.clone()); + } + } + } + + Err(anyhow!( + "timed out waiting for NBC playback (title='{}', page_url='{}', current_time={}, authorized={}, interactive_auth_required={})", + last_state.title, + last_state.page_url, + last_state.current_time, + nbc_trace_is_authorized(&last_trace_state), + interactive_auth_required, + )) +} + +fn save_tab_screenshot(tab: &Arc<Tab>, path: &Path) -> Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .with_context(|| format!("failed to create {}", parent.display()))?; + } + let screenshot = + tab.capture_screenshot(Page::CaptureScreenshotFormatOption::Png, None, None, true)?; + fs::write(path, screenshot).with_context(|| format!("failed to write {}", path.display()))?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + 
#[test] + fn interactive_auth_url_detection_catches_mvpd_hosts() { + assert!(nbc_url_requires_interactive_auth( + "https://sp.auth.adobe.com/adobe-services/authenticate/saml" + )); + assert!(nbc_url_requires_interactive_auth( + "https://secure.verizon.com/signin" + )); + assert!(!nbc_url_requires_interactive_auth( + "https://www.nbc.com/watch/some-show" + )); + } + + #[test] + fn trace_updates_mark_authorization_edges() { + let mut trace = NbcTraceState::default(); + assert!(update_nbc_trace_from_url( + &mut trace, + "https://sp.auth.adobe.com/adobe-services/authorize" + )); + assert!(nbc_trace_is_authorized(&trace)); + assert!(update_nbc_trace_from_url( + &mut trace, + "https://entitlement.auth.adobe.com/entitlement/v4/completeBackgroundLogin.html" + )); + assert!(trace.background_login_complete); + } +} diff --git a/docs/ETHEREUM_NODE_ECP_FORGE.md b/docs/ETHEREUM_NODE_ECP_FORGE.md new file mode 100644 index 0000000..51798c8 --- /dev/null +++ b/docs/ETHEREUM_NODE_ECP_FORGE.md @@ -0,0 +1,65 @@ +# Ethereum Nodes on `ecp-forge` + +This runbook covers the dual-network Ethereum full-node surface introduced by [ECP-0104](/Users/conradev/Projects/every.channel/evolution/proposals/ECP-0104-ecp-forge-dual-ethereum-full-nodes-on-dedicated-nvme-zfs.md). + +## Scope + +- `ecp-forge` owns a dedicated NVMe-backed ZFS pool for Ethereum state. +- Ethereum mainnet and Sepolia both run as full nodes. +- Execution uses Reth and consensus uses Lighthouse. +- Public HTTPS on `eth.every.channel` exposes sync and finality status only. +- Raw execution RPC, Engine API, and Beacon API stay bound to `127.0.0.1` on `ecp-forge`. 
+ +## Deploy + +```sh +./scripts/deploy-ecp-forge.sh +``` + +## Verify + +Storage: + +```sh +ssh -o BatchMode=yes -o IdentityAgent=none -o IdentitiesOnly=yes -i ~/.ssh/id_ed25519 root@git.every.channel \ + 'zpool list eth && zfs list -r eth' +``` + +Core services: + +```sh +ssh -o BatchMode=yes -o IdentityAgent=none -o IdentitiesOnly=yes -i ~/.ssh/id_ed25519 root@git.every.channel \ + 'systemctl is-active every-channel-ethereum-storage podman-every-channel-ethereum-mainnet-reth podman-every-channel-ethereum-mainnet-lighthouse podman-every-channel-ethereum-sepolia-reth podman-every-channel-ethereum-sepolia-lighthouse' +``` + +Public sync status: + +```sh +curl -fsS https://eth.every.channel/mainnet/sync | jq . +curl -fsS https://eth.every.channel/sepolia/sync | jq . +``` + +## Local-only endpoints on `ecp-forge` + +Mainnet: + +- execution HTTP: `127.0.0.1:8545` +- execution WS: `127.0.0.1:8546` +- execution Engine API: `127.0.0.1:8551` +- beacon API: `127.0.0.1:5052` + +Sepolia: + +- execution HTTP: `127.0.0.1:18545` +- execution WS: `127.0.0.1:18546` +- execution Engine API: `127.0.0.1:18551` +- beacon API: `127.0.0.1:15052` + +## Notes + +- The dedicated Ethereum pool is `eth`, not `tank`. +- Both networks are configured for full sync, not archive mode. +- Lighthouse is configured to permit full beacon sync from genesis via + `--allow-insecure-genesis-sync` instead of checkpoint bootstrap. +- The first public host surface is intentionally sync and finality only; extend `eth.every.channel` + later if authenticated RPC is required. 
diff --git a/docs/USAGE.md b/docs/USAGE.md index 16c4880..03f9fae 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -72,6 +72,19 @@ Notes: - adjust startup timeout / capture rate with `EVERY_CHANNEL_NBC_CAPTURE_TIMEOUT_SECS`, `EVERY_CHANNEL_NBC_CAPTURE_FPS`, and `EVERY_CHANNEL_NBC_CAPTURE_QUALITY` +On Linux / forge hosts, the equivalent worker path lives in `ec-node`: + +- warm auth with + `ec-node nbc-bootstrap --source-url 'https://www.nbc.com/live?brand=nbc-sports-philadelphia'` +- publish with + `ec-node nbc-wt-publish --url https://cdn.moq.dev/anon --name forge-nbc-sports-philly --source-url 'https://www.nbc.com/live?brand=nbc-sports-philadelphia'` +- for unattended hosts, persist the Chrome profile with `EVERY_CHANNEL_NBC_PROFILE_DIR=/path/to/profile` +- the NixOS module exposes `services.every-channel.ec-node.nbc.*` for a persistent Xvfb display plus + an optional local-only VNC bridge so MVPD auth can be completed only when the session is cold +- on Linux virtual displays, the worker disables Chrome GPU acceleration by default; only set + `EVERY_CHANNEL_NBC_ENABLE_GPU=1` if the host has a real GL-capable display path +- the forge path is also currently video-first; audio is still a follow-up item + Linux DVB sources can be added with a URL like: ``` diff --git a/evolution/proposals/ECP-0101-decode-archived-object-frames-before-hls-replay.md b/evolution/proposals/ECP-0101-decode-archived-object-frames-before-hls-replay.md new file mode 100644 index 0000000..942bae2 --- /dev/null +++ b/evolution/proposals/ECP-0101-decode-archived-object-frames-before-hls-replay.md @@ -0,0 +1,15 @@ +# ECP-0101: Decode Archived Object Frames Before HLS Replay + +## Why + +`wt-archive` stores raw MoQ object frames in CAS. `wt-archive-serve` was replaying those framed bytes directly as `init.mp4` and `.m4s`, which yields HLS playlists that look valid but cannot be decoded by players. 
+ +## Decision + +Decode archived object frames back into their media payload bytes before serving replay responses. If archived bytes are already raw payloads, pass them through unchanged. + +## Consequences + +- Local and forge replay can hand archived CMAF data to normal HLS consumers. +- Existing archives remain usable without rewriting CAS objects. +- Replay failures now point at genuinely bad archive contents rather than envelope bytes leaking over HTTP. diff --git a/evolution/proposals/ECP-0102-linux-widevine-nbc-worker-on-ecp-forge.md b/evolution/proposals/ECP-0102-linux-widevine-nbc-worker-on-ecp-forge.md new file mode 100644 index 0000000..78da77d --- /dev/null +++ b/evolution/proposals/ECP-0102-linux-widevine-nbc-worker-on-ecp-forge.md @@ -0,0 +1,35 @@ +# ECP-0102: Linux Widevine NBC Worker on `ecp-forge` + +## Why + +NBC live playback currently exists only in the desktop app: + +- macOS native `WKWebView` for in-app auth and playback capture +- local Chrome fallback for browser-driven playback capture + +`ecp-forge` cannot originate an NBC live stream today because `ec-node` has no NBC source worker, no Adobe auth bootstrap flow, and no Linux browser runtime with Widevine. + +## Decision + +Build a forge-capable NBC source worker around Linux Google Chrome plus a persistent authenticated profile. + +1. Use Google Chrome on `x86_64` Linux, not bare Chromium, as the browser runtime for NBC on forge. +2. Run Chrome in a virtual display session on `ecp-forge` so DRM playback remains in a normal browser path even when no physical display is attached. + Keep the forge worker on a dedicated display number and disable GPU acceleration by default under Xvfb. +3. Persist the NBC/Adobe browser profile on forge so auth warmup is amortized across runs. +4. Add an explicit bootstrap path that surfaces interactive auth only when the stored session is cold or expired. +5. 
Launch Chrome as an external process and attach over DevTools rather than relying on the crate-managed Chrome launcher path. +6. Publish the resulting captured stream through the existing `ec-node` relay path so archive, replay, and downstream nodes stay unchanged. +7. Expose the forge operator surface as `ec-node nbc-bootstrap` and `ec-node nbc-wt-publish`, with a persistent virtual display on `ecp-forge`. + +## Consequences + +- NBC live origin becomes possible from forge without depending on a user’s macOS desktop session. +- We keep the existing every.channel transport/archive model and only replace the source worker. +- The first forge implementation should be browser-frame capture first; audio and full unattended auth renewal can follow once the Linux worker is stable. + +## Rejected Alternatives + +- Reuse the macOS `WKWebView` path on forge: impossible on Linux. +- Depend on bare Chromium only: rejected because Widevine support is the central requirement. +- Require true headless-only DRM playback from the start: rejected because the safer first implementation is a normal Chrome session inside a virtual display. diff --git a/evolution/proposals/ECP-0103-mullvad-philadelphia-egress-for-forge-nbc-philadelphia.md b/evolution/proposals/ECP-0103-mullvad-philadelphia-egress-for-forge-nbc-philadelphia.md new file mode 100644 index 0000000..ffedbb5 --- /dev/null +++ b/evolution/proposals/ECP-0103-mullvad-philadelphia-egress-for-forge-nbc-philadelphia.md @@ -0,0 +1,24 @@ +# ECP-0103: Mullvad Philadelphia Egress for Forge NBC Philadelphia + +## Why + +The forge-side NBC worker is currently dependent on a reverse-tunneled proxy for US egress. +That is enough to prove the geo-boundary, but it is the wrong long-term operator shape for `NBC Sports Philadelphia`. + +## Decision + +1. Enable the Mullvad daemon on `ecp-forge`. +2. Keep the Mullvad account number out of committed Nix configuration; log in operationally from founder-provided material. +3. 
Use a Philadelphia Mullvad relay for `NBC Sports Philadelphia` work on forge. +4. Start the forge NBC publish worker after the Mullvad daemon is available. + +## Consequences + +- Forge NBC egress becomes self-contained instead of depending on a local reverse proxy. +- The account credential stays operational-only rather than being copied into repo config. +- Relay choice remains runtime-controlled, so it can be swapped if a specific Philadelphia host degrades. + +## Rejected Alternatives + +- Keep relying on the reverse-tunneled local proxy: rejected because it couples forge origin to a founder workstation. +- Commit the Mullvad account number into NixOS config: rejected because it expands secret exposure for no benefit. diff --git a/evolution/proposals/ECP-0104-ecp-forge-dual-ethereum-full-nodes-on-dedicated-nvme-zfs.md b/evolution/proposals/ECP-0104-ecp-forge-dual-ethereum-full-nodes-on-dedicated-nvme-zfs.md new file mode 100644 index 0000000..3eb7108 --- /dev/null +++ b/evolution/proposals/ECP-0104-ecp-forge-dual-ethereum-full-nodes-on-dedicated-nvme-zfs.md @@ -0,0 +1,45 @@ +# ECP-0104: `ecp-forge` dual Ethereum full nodes on dedicated NVMe ZFS + +## Why + +`every.channel` now has OP Stack and Ethereum contract rails on `ecp-forge`, but it still depends on +public L1 RPC and beacon providers for Ethereum itself. + +The forge host has enough raw capacity in `tank`, but `tank` is a large HDD `raidz3` pool that is +well suited for archive bytes, not for the random-read/write pattern of concurrent Ethereum +execution and consensus sync. The same host also has a free 7 TB NVMe device, which is the right +media class for a self-hosted node. + +## Decision + +1. Create a dedicated single-device ZFS pool for Ethereum on the free NVMe in `ecp-forge`. +2. Run two full-sync Ethereum node pairs on that pool: + - Ethereum mainnet + - Ethereum Sepolia +3. Use Reth for execution and Lighthouse for consensus. +4. 
Keep raw execution and beacon RPC local-only on `ecp-forge` for the first tranche. +5. Publish sync and finality status on `https://eth.every.channel`. +6. Because this tranche is explicitly full-sync from genesis, run Lighthouse with + `--allow-insecure-genesis-sync` instead of checkpoint bootstrap. + +## Consequences + +- `every.channel` gets repo-owned L1 execution and consensus rails for both mainnet and Sepolia. +- Ethereum state no longer competes with `tank` archive I/O. +- The first public surface is intentionally conservative: health and sync visibility, not + unauthenticated public JSON-RPC. +- The dedicated Ethereum pool is single-device NVMe, so it optimizes for node performance rather + than storage redundancy. +- Consensus sync starts from genesis without checkpoint assistance, which is slower but matches the + requested full-sync posture. + +## Rejected Alternatives + +- Put Ethereum state on `tank`: rejected because the HDD `raidz3` pool is the wrong latency profile + for concurrent execution and consensus sync. +- Keep using only public upstream Ethereum providers: rejected because the OP Stack and settlement + rails should not depend on third-party RPC availability. +- Expose raw JSON-RPC on `eth.every.channel` immediately: rejected for the first tranche because it + creates an unauthenticated public abuse surface before auth and rate policy exists. +- Use checkpoint sync for Lighthouse: rejected for this tranche because the requested posture is + full sync from genesis on both networks. 
diff --git a/flake.nix b/flake.nix index 8ac0618..eadbdf7 100644 --- a/flake.nix +++ b/flake.nix @@ -15,6 +15,7 @@ ec-runner = import ./nix/modules/ec-runner.nix; ec-netboot = import ./nix/modules/ec-netboot.nix; ec-ipxe-qemu = import ./nix/modules/ec-ipxe-qemu.nix; + ec-ethereum = import ./nix/modules/ec-ethereum.nix; ec-op-stack = import ./nix/modules/ec-op-stack.nix; ec-publisher-guest = import ./nix/modules/ec-publisher-guest.nix; default = ec-node; @@ -58,6 +59,7 @@ self.nixosModules.ec-node self.nixosModules.ec-netboot self.nixosModules.ec-ipxe-qemu + self.nixosModules.ec-ethereum self.nixosModules.ec-op-stack ./nix/nixos/ecp-forge.nix ]; diff --git a/nix/modules/ec-ethereum.nix b/nix/modules/ec-ethereum.nix new file mode 100644 index 0000000..48cf406 --- /dev/null +++ b/nix/modules/ec-ethereum.nix @@ -0,0 +1,485 @@ +{ lib, config, pkgs, ... }: + +let + cfg = config.services.every-channel.ethereum; + + mkNetworkSubmodule = + name: defaults: + { ... }: + { + options = { + enable = lib.mkOption { + type = lib.types.bool; + default = true; + description = "Whether to run the ${name} Ethereum execution and consensus pair."; + }; + + rootDir = lib.mkOption { + type = lib.types.str; + default = "${cfg.rootDir}/${name}"; + description = "Persistent root directory for the ${name} node state."; + }; + + reth = { + httpPort = lib.mkOption { + type = lib.types.port; + default = defaults.rethHttpPort; + description = "Local HTTP JSON-RPC port for the ${name} Reth node."; + }; + + wsPort = lib.mkOption { + type = lib.types.port; + default = defaults.rethWsPort; + description = "Local WebSocket JSON-RPC port for the ${name} Reth node."; + }; + + authPort = lib.mkOption { + type = lib.types.port; + default = defaults.rethAuthPort; + description = "Local Engine API port for the ${name} Reth node."; + }; + + p2pPort = lib.mkOption { + type = lib.types.port; + default = defaults.rethP2pPort; + description = "RLPx/P2P TCP port for the ${name} Reth node."; + }; + + discoveryPort 
= lib.mkOption { + type = lib.types.port; + default = defaults.rethDiscoveryPort; + description = "Discovery UDP port for the ${name} Reth node."; + }; + + metricsPort = lib.mkOption { + type = lib.types.port; + default = defaults.rethMetricsPort; + description = "Prometheus port for the ${name} Reth node."; + }; + }; + + lighthouse = { + httpPort = lib.mkOption { + type = lib.types.port; + default = defaults.lighthouseHttpPort; + description = "Local Beacon API port for the ${name} Lighthouse node."; + }; + + p2pPort = lib.mkOption { + type = lib.types.port; + default = defaults.lighthouseP2pPort; + description = "TCP libp2p port for the ${name} Lighthouse node."; + }; + + discoveryPort = lib.mkOption { + type = lib.types.port; + default = defaults.lighthouseDiscoveryPort; + description = "UDP discovery port for the ${name} Lighthouse node."; + }; + + quicPort = lib.mkOption { + type = lib.types.port; + default = defaults.lighthouseQuicPort; + description = "UDP QUIC port for the ${name} Lighthouse node."; + }; + + metricsPort = lib.mkOption { + type = lib.types.port; + default = defaults.lighthouseMetricsPort; + description = "Prometheus port for the ${name} Lighthouse node."; + }; + }; + }; + }; + + networks = { + mainnet = cfg.mainnet; + sepolia = cfg.sepolia; + }; + + enabledNetworks = lib.filterAttrs (_: networkCfg: networkCfg.enable) networks; + + rethContainerName = network: "every-channel-ethereum-${network}-reth"; + lighthouseContainerName = network: "every-channel-ethereum-${network}-lighthouse"; + + networkDatasetLines = lib.concatStringsSep "\n" ( + lib.mapAttrsToList + (network: networkCfg: '' + ensure_dataset ${lib.escapeShellArg "${cfg.poolName}/${network}"} + ensure_dataset ${lib.escapeShellArg "${cfg.poolName}/${network}/reth"} + ensure_dataset ${lib.escapeShellArg "${cfg.poolName}/${network}/lighthouse"} + ensure_jwt ${lib.escapeShellArg "${networkCfg.rootDir}/jwt.hex"} + '') + enabledNetworks + ); + + mkNatArgs = lib.optionals (cfg.publicIp != 
null) [ "--nat" "extip:${cfg.publicIp}" ]; + mkEnrArgs = lib.optionals (cfg.publicIp != null) [ "--enr-address" cfg.publicIp ]; + + mkRethContainer = + network: networkCfg: { + image = cfg.images.reth; + autoStart = true; + extraOptions = [ "--network=host" ]; + volumes = [ "${networkCfg.rootDir}:/state" ]; + cmd = + [ + "node" + "--chain" + network + "--datadir" + "/state/reth" + "--full" + "--http" + "--http.addr" + "127.0.0.1" + "--http.port" + (toString networkCfg.reth.httpPort) + "--http.api" + "eth,net,web3,rpc" + "--ws" + "--ws.addr" + "127.0.0.1" + "--ws.port" + (toString networkCfg.reth.wsPort) + "--ws.api" + "eth,net,web3,rpc" + "--authrpc.addr" + "127.0.0.1" + "--authrpc.port" + (toString networkCfg.reth.authPort) + "--authrpc.jwtsecret" + "/state/jwt.hex" + "--port" + (toString networkCfg.reth.p2pPort) + "--discovery.port" + (toString networkCfg.reth.discoveryPort) + "--metrics" + "127.0.0.1:${toString networkCfg.reth.metricsPort}" + "--log.stdout.format" + "json" + ] + ++ mkNatArgs; + }; + + mkLighthouseContainer = + network: networkCfg: { + image = cfg.images.lighthouse; + autoStart = true; + extraOptions = [ "--network=host" ]; + volumes = [ "${networkCfg.rootDir}:/state" ]; + entrypoint = "/usr/local/bin/lighthouse"; + cmd = + [ + "beacon_node" + "--network" + network + "--datadir" + "/state/lighthouse" + "--http" + "--http-address" + "127.0.0.1" + "--http-port" + (toString networkCfg.lighthouse.httpPort) + "--execution-endpoint" + "http://127.0.0.1:${toString networkCfg.reth.authPort}" + "--execution-jwt" + "/state/jwt.hex" + "--allow-insecure-genesis-sync" + "--port" + (toString networkCfg.lighthouse.p2pPort) + "--discovery-port" + (toString networkCfg.lighthouse.discoveryPort) + "--quic-port" + (toString networkCfg.lighthouse.quicPort) + "--metrics" + "--metrics-address" + "127.0.0.1" + "--metrics-port" + (toString networkCfg.lighthouse.metricsPort) + ] + ++ mkEnrArgs; + }; + + caddyRootBody = '' + every.channel ethereum nodes + mainnet sync: 
/mainnet/sync + mainnet finality: /mainnet/finality + sepolia sync: /sepolia/sync + sepolia finality: /sepolia/finality + raw execution and beacon RPC remain local-only on ecp-forge for now. + ''; +in +{ + options.services.every-channel.ethereum = { + enable = lib.mkEnableOption "every.channel dual-network Ethereum full nodes"; + + poolName = lib.mkOption { + type = lib.types.str; + default = "eth"; + description = "Dedicated ZFS pool name used for Ethereum node state."; + }; + + poolDevice = lib.mkOption { + type = lib.types.nullOr lib.types.str; + default = null; + description = "Block device used to create the dedicated Ethereum ZFS pool if it does not already exist."; + }; + + rootDir = lib.mkOption { + type = lib.types.str; + default = "/eth"; + description = "Mountpoint for the dedicated Ethereum ZFS pool."; + }; + + publicIp = lib.mkOption { + type = lib.types.nullOr lib.types.str; + default = null; + description = "Public IP to advertise in Ethereum P2P metadata."; + }; + + publicHost = lib.mkOption { + type = lib.types.nullOr lib.types.str; + default = null; + description = "Optional HTTPS host that publishes node sync and finality surfaces."; + }; + + images = { + reth = lib.mkOption { + type = lib.types.str; + default = "ghcr.io/paradigmxyz/reth:v1.9.3"; + description = "Pinned Reth OCI image."; + }; + + lighthouse = lib.mkOption { + type = lib.types.str; + default = "docker.io/sigp/lighthouse:v8.1.1"; + description = "Pinned Lighthouse OCI image."; + }; + }; + + mainnet = lib.mkOption { + type = lib.types.submodule (mkNetworkSubmodule "mainnet" { + rethHttpPort = 8545; + rethWsPort = 8546; + rethAuthPort = 8551; + rethP2pPort = 30303; + rethDiscoveryPort = 30303; + rethMetricsPort = 19001; + lighthouseHttpPort = 5052; + lighthouseP2pPort = 9000; + lighthouseDiscoveryPort = 9000; + lighthouseQuicPort = 9001; + lighthouseMetricsPort = 5054; + }); + default = { }; + description = "Mainnet Ethereum node configuration."; + }; + + sepolia = lib.mkOption { + 
type = lib.types.submodule (mkNetworkSubmodule "sepolia" { + rethHttpPort = 18545; + rethWsPort = 18546; + rethAuthPort = 18551; + rethP2pPort = 31303; + rethDiscoveryPort = 31303; + rethMetricsPort = 29001; + lighthouseHttpPort = 15052; + lighthouseP2pPort = 19000; + lighthouseDiscoveryPort = 19000; + lighthouseQuicPort = 19001; + lighthouseMetricsPort = 15054; + }); + default = { }; + description = "Sepolia Ethereum node configuration."; + }; + }; + + config = lib.mkIf cfg.enable { + assertions = [ + { + assertion = cfg.poolDevice != null; + message = "services.every-channel.ethereum.poolDevice must be set when the Ethereum node is enabled"; + } + { + assertion = enabledNetworks != { }; + message = "At least one Ethereum network must be enabled"; + } + ]; + + boot.zfs.extraPools = [ cfg.poolName ]; + + networking.firewall = { + allowedTCPPorts = + lib.flatten ( + lib.mapAttrsToList + (_: networkCfg: [ + networkCfg.reth.p2pPort + networkCfg.lighthouse.p2pPort + ]) + enabledNetworks + ); + allowedUDPPorts = + lib.flatten ( + lib.mapAttrsToList + (_: networkCfg: [ + networkCfg.reth.discoveryPort + networkCfg.lighthouse.discoveryPort + networkCfg.lighthouse.quicPort + ]) + enabledNetworks + ); + }; + + virtualisation.oci-containers.containers = + (lib.mapAttrs' + (network: networkCfg: + lib.nameValuePair (rethContainerName network) (mkRethContainer network networkCfg)) + enabledNetworks) + // (lib.mapAttrs' + (network: networkCfg: + lib.nameValuePair (lighthouseContainerName network) (mkLighthouseContainer network networkCfg)) + enabledNetworks); + + systemd.services = + { + every-channel-ethereum-storage = { + description = "every.channel Ethereum NVMe ZFS pool and dataset bootstrap"; + wantedBy = [ "multi-user.target" ]; + after = [ "local-fs.target" "zfs.target" ]; + wants = [ "zfs.target" ]; + before = + lib.flatten ( + lib.mapAttrsToList + (network: _: [ + "podman-${rethContainerName network}.service" + "podman-${lighthouseContainerName network}.service" + ]) + 
enabledNetworks + ); + path = with pkgs; [ + coreutils + openssl + util-linux + zfs + ]; + serviceConfig = { + Type = "oneshot"; + RemainAfterExit = true; + }; + script = '' + set -euo pipefail + + pool=${lib.escapeShellArg cfg.poolName} + root_dir=${lib.escapeShellArg cfg.rootDir} + device=${lib.escapeShellArg cfg.poolDevice} + + ensure_dataset() { + local dataset="$1" + if ! zfs list -H "$dataset" >/dev/null 2>&1; then + zfs create -p "$dataset" + fi + zfs set atime=off compression=lz4 xattr=sa "$dataset" >/dev/null + } + + ensure_jwt() { + local path="$1" + if [[ ! -s "$path" ]]; then + umask 077 + openssl rand -hex 32 | tr -d '\n' > "$path" + printf '\n' >> "$path" + fi + chmod 0400 "$path" + } + + if ! zpool list -H "$pool" >/dev/null 2>&1; then + if [[ -z "$device" ]]; then + echo "every-channel-ethereum-storage: missing poolDevice for pool $pool" >&2 + exit 1 + fi + if [[ ! -b "$device" ]]; then + echo "every-channel-ethereum-storage: device $device not present" >&2 + exit 1 + fi + if blkid "$device" >/dev/null 2>&1; then + echo "every-channel-ethereum-storage: device $device already has signatures; refusing to overwrite automatically" >&2 + exit 1 + fi + + zpool create -f \ + -o ashift=12 \ + -O mountpoint="$root_dir" \ + -O atime=off \ + -O compression=lz4 \ + -O xattr=sa \ + "$pool" "$device" + else + zfs set mountpoint="$root_dir" "$pool" >/dev/null + fi + + ${networkDatasetLines} + ''; + }; + } + // (lib.mapAttrs' + (network: networkCfg: + lib.nameValuePair "podman-${rethContainerName network}" { + after = [ "network-online.target" "every-channel-ethereum-storage.service" ]; + wants = [ "network-online.target" "every-channel-ethereum-storage.service" ]; + requires = [ "every-channel-ethereum-storage.service" ]; + unitConfig.RequiresMountsFor = [ networkCfg.rootDir ]; + }) + enabledNetworks) + // (lib.mapAttrs' + (network: networkCfg: + lib.nameValuePair "podman-${lighthouseContainerName network}" { + after = [ + "network-online.target" + 
"every-channel-ethereum-storage.service" + "podman-${rethContainerName network}.service" + ]; + wants = [ + "network-online.target" + "every-channel-ethereum-storage.service" + "podman-${rethContainerName network}.service" + ]; + requires = [ + "every-channel-ethereum-storage.service" + "podman-${rethContainerName network}.service" + ]; + unitConfig.RequiresMountsFor = [ networkCfg.rootDir ]; + }) + enabledNetworks); + + services.caddy.virtualHosts = lib.mkIf (cfg.publicHost != null) { + "${cfg.publicHost}".extraConfig = '' + encode zstd gzip + + handle /mainnet/sync { + uri replace /mainnet/sync /eth/v1/node/syncing + reverse_proxy http://127.0.0.1:${toString cfg.mainnet.lighthouse.httpPort} + } + + handle /mainnet/finality { + uri replace /mainnet/finality /eth/v1/beacon/states/head/finality_checkpoints + reverse_proxy http://127.0.0.1:${toString cfg.mainnet.lighthouse.httpPort} + } + + handle /sepolia/sync { + uri replace /sepolia/sync /eth/v1/node/syncing + reverse_proxy http://127.0.0.1:${toString cfg.sepolia.lighthouse.httpPort} + } + + handle /sepolia/finality { + uri replace /sepolia/finality /eth/v1/beacon/states/head/finality_checkpoints + reverse_proxy http://127.0.0.1:${toString cfg.sepolia.lighthouse.httpPort} + } + + handle { + header Content-Type text/plain + respond "${caddyRootBody}" 200 + } + ''; + }; + }; +} diff --git a/nix/modules/ec-node.nix b/nix/modules/ec-node.nix index c13bf0f..59b8a81 100644 --- a/nix/modules/ec-node.nix +++ b/nix/modules/ec-node.nix @@ -266,6 +266,70 @@ in }; }; + nbc = { + enable = lib.mkOption { + type = lib.types.bool; + default = false; + description = "Enable Linux Chrome + virtual-display support for NBC browser-backed broadcasts."; + }; + + chromeBinary = lib.mkOption { + type = lib.types.str; + default = "/run/current-system/sw/bin/google-chrome-stable"; + description = "Chrome binary used by `ec-node nbc-bootstrap` and `ec-node nbc-wt-publish`."; + }; + + profileDir = lib.mkOption { + type = lib.types.str; + 
default = "/var/lib/every-channel/nbc-profile"; + description = "Persistent Chrome profile directory used for NBC / Adobe auth sessions."; + }; + + authScreenshotDir = lib.mkOption { + type = lib.types.str; + default = "/var/lib/every-channel/nbc-auth"; + description = "Directory for operator-facing screenshots when bootstrap hits an interactive auth page."; + }; + + display = lib.mkOption { + type = lib.types.str; + default = ":99"; + description = "DISPLAY used by the NBC virtual display session."; + }; + + screen = lib.mkOption { + type = lib.types.str; + default = "1920x1080x24"; + description = "Xvfb screen geometry for the NBC virtual display."; + }; + + noSandbox = lib.mkOption { + type = lib.types.bool; + default = true; + description = "Pass `EVERY_CHANNEL_NBC_NO_SANDBOX=1` for Chrome worker sessions."; + }; + + vnc = { + enable = lib.mkOption { + type = lib.types.bool; + default = true; + description = "Expose the NBC virtual display over VNC so auth can be completed remotely when needed."; + }; + + listen = lib.mkOption { + type = lib.types.str; + default = "127.0.0.1"; + description = "x11vnc listen address for the NBC virtual display."; + }; + + port = lib.mkOption { + type = lib.types.port; + default = 5900; + description = "x11vnc TCP port for the NBC virtual display."; + }; + }; + }; + broadcasts = lib.mkOption { type = lib.types.listOf (lib.types.submodule { options = { @@ -283,10 +347,15 @@ in default = null; description = "Optional explicit ffmpeg input URL/file. 
When set, HDHomeRun settings are ignored for this broadcast."; }; + nbcUrl = lib.mkOption { + type = lib.types.nullOr lib.types.str; + default = null; + description = "Optional NBC watch/live URL for a browser-backed relay publish worker."; + }; }; }); default = [ ]; - description = "List of broadcasts (name + channel, or explicit input) to publish."; + description = "List of broadcasts (HDHomeRun, explicit input, or NBC browser-backed URL) to publish."; }; }; @@ -299,7 +368,7 @@ in { assertion = let - needsHdhr = builtins.any (b: b.input == null) cfg.broadcasts; + needsHdhr = builtins.any (b: b.input == null && b.nbcUrl == null) cfg.broadcasts; in (!needsHdhr) || (cfg.hdhomerun.host != null) || (cfg.hdhomerun.deviceId != null); message = "Set services.every-channel.ec-node.hdhomerun.host or .deviceId (required when any broadcast omits `input`)"; @@ -309,8 +378,20 @@ in message = "hdhomerun.autoDiscover only applies when hdhomerun.host is unset"; } { - assertion = builtins.all (b: (b.input != null) || (b.channel != null)) cfg.broadcasts; - message = "Each broadcast must set either `input` or `channel`"; + assertion = + builtins.all + (b: + (lib.length (lib.filter (value: value != null) [ b.input b.channel b.nbcUrl ])) == 1) + cfg.broadcasts; + message = "Each broadcast must set exactly one of `input`, `channel`, or `nbcUrl`"; + } + { + assertion = + let + hasNbcBroadcast = builtins.any (b: b.nbcUrl != null) cfg.broadcasts; + in + (!hasNbcBroadcast) || cfg.nbc.enable; + message = "Set services.every-channel.ec-node.nbc.enable = true before configuring `broadcasts.*.nbcUrl`"; } ]; @@ -318,16 +399,30 @@ in [ "d /run/every-channel 1777 root root - -" ] + ++ lib.optionals cfg.nbc.enable [ + "d /var/lib/every-channel 0750 every-channel every-channel - -" + "d ${cfg.nbc.profileDir} 0750 every-channel every-channel - -" + "d ${cfg.nbc.authScreenshotDir} 0750 every-channel every-channel - -" + ] ++ lib.optionals cfg.archive.enable [ "d ${cfg.archive.outputDir} 0750 root 
root - -" "d ${cfg.archive.manifestDir} 0750 root root - -" ]; + users.groups.every-channel = lib.mkIf cfg.nbc.enable { }; + users.users.every-channel = lib.mkIf cfg.nbc.enable { + isSystemUser = true; + group = "every-channel"; + home = "/var/lib/every-channel"; + createHome = true; + }; + systemd.services = lib.listToAttrs (map (b: let unit = "every-channel-wt-publish-${sanitizeUnitName b.name}"; + isNbc = b.nbcUrl != null; runner = pkgs.writeShellApplication { name = unit; runtimeInputs = @@ -352,6 +447,7 @@ in "cmd+=(${lib.concatStringsSep " " (map lib.escapeShellArg cfg.extraArgs)})"; explicitInputStr = if b.input == null then "" else b.input; channelStr = if b.channel == null then "" else b.channel; + nbcUrlStr = if b.nbcUrl == null then "" else b.nbcUrl; controlEndpointOutPath = "/run/every-channel/control-peer-${sanitizeUnitName b.name}.json"; controlDiscoveryStr = if cfg.control.discovery == null then "" else cfg.control.discovery; controlIrohSecretStr = if cfg.control.irohSecret == null then "" else cfg.control.irohSecret; @@ -360,114 +456,122 @@ in '' set -euo pipefail + nbc_url=${lib.escapeShellArg nbcUrlStr} input="" - explicit_input=${lib.escapeShellArg explicitInputStr} - if [[ -n "$explicit_input" ]]; then - input="$explicit_input" - else - ch=${lib.escapeShellArg channelStr} - if [[ -z "$ch" ]]; then - echo "ec-node: broadcast missing both input and channel" >&2 - exit 2 - fi - - # Note: don't wrap lib.escapeShellArg in double-quotes, otherwise empty strings - # become a literal two-quote token and break discovery. 
- base=${lib.escapeShellArg fixedHost} - if [[ -z "$base" ]]; then - dev_id=${lib.escapeShellArg deviceId} - if [[ -z "$dev_id" ]]; then - echo "ec-node: missing hdhomerun.host and hdhomerun.deviceId" >&2 + if [[ -z "$nbc_url" ]]; then + explicit_input=${lib.escapeShellArg explicitInputStr} + if [[ -n "$explicit_input" ]]; then + input="$explicit_input" + else + ch=${lib.escapeShellArg channelStr} + if [[ -z "$ch" ]]; then + echo "ec-node: broadcast missing input, channel, and nbcUrl" >&2 exit 2 fi - try_ip() { - local ip="$1" - local json id base_url - json="$(curl -fsS --connect-timeout 0.10 --max-time 0.20 "http://$ip/discover.json" 2>/dev/null || true)" - if [[ -z "$json" ]]; then - return 1 - fi - id="$(printf '%s' "$json" | jq -r '.DeviceID // empty' 2>/dev/null || true)" - if [[ "$id" != "$dev_id" ]]; then - return 1 - fi - base_url="$(printf '%s' "$json" | jq -r '.BaseURL // empty' 2>/dev/null || true)" - if [[ -z "$base_url" ]]; then - base_url="http://$ip" - fi - printf '%s\n' "$base_url" - return 0 - } - - if ${lib.boolToString cfg.hdhomerun.autoDiscover}; then - # Primary: UDP broadcast discover. - base="$(${cfg.discoveryPackage}/bin/ec-cli discover | jq -r --arg id "$dev_id" '.[] | select(.id == $id) | .base_url // empty' | head -n1 || true)" - - # Fallback: probe known neighbors for /discover.json (fast; avoids full /24 scan). - if [[ -z "$base" ]]; then - while read -r ip; do - found="$(try_ip "$ip" || true)" - if [[ -n "$found" ]]; then - base="$found" - break - fi - done < <(ip neigh | awk '{print $1}' | sort -u) + # Note: don't wrap lib.escapeShellArg in double-quotes, otherwise empty strings + # become a literal two-quote token and break discovery. + base=${lib.escapeShellArg fixedHost} + if [[ -z "$base" ]]; then + dev_id=${lib.escapeShellArg deviceId} + if [[ -z "$dev_id" ]]; then + echo "ec-node: missing hdhomerun.host and hdhomerun.deviceId" >&2 + exit 2 fi - # Fallback: scan local /24 subnets for /discover.json (slow; worst-case ~50s). 
- if [[ -z "$base" ]]; then - while read -r cidr; do - ip_addr="''${cidr%/*}" - prefix="''${cidr#*/}" - if [[ "$prefix" != "24" ]]; then - continue - fi - net="''${ip_addr%.*}" - for i in $(seq 1 254); do - found="$(try_ip "$net.$i" || true)" + try_ip() { + local ip="$1" + local json id base_url + json="$(curl -fsS --connect-timeout 0.10 --max-time 0.20 "http://$ip/discover.json" 2>/dev/null || true)" + if [[ -z "$json" ]]; then + return 1 + fi + id="$(printf '%s' "$json" | jq -r '.DeviceID // empty' 2>/dev/null || true)" + if [[ "$id" != "$dev_id" ]]; then + return 1 + fi + base_url="$(printf '%s' "$json" | jq -r '.BaseURL // empty' 2>/dev/null || true)" + if [[ -z "$base_url" ]]; then + base_url="http://$ip" + fi + printf '%s\n' "$base_url" + return 0 + } + + if ${lib.boolToString cfg.hdhomerun.autoDiscover}; then + base="$(${cfg.discoveryPackage}/bin/ec-cli discover | jq -r --arg id "$dev_id" '.[] | select(.id == $id) | .base_url // empty' | head -n1 || true)" + + if [[ -z "$base" ]]; then + while read -r ip; do + found="$(try_ip "$ip" || true)" if [[ -n "$found" ]]; then base="$found" break fi - done - if [[ -n "$base" ]]; then - break - fi - done < <(ip -o -4 addr show scope global | awk '{print $4}') - fi + done < <(ip neigh | awk '{print $1}' | sort -u) + fi - if [[ -z "$base" ]]; then - echo "ec-node: HDHomeRun deviceId not found: $dev_id" >&2 - exit 2 + if [[ -z "$base" ]]; then + while read -r cidr; do + ip_addr="''${cidr%/*}" + prefix="''${cidr#*/}" + if [[ "$prefix" != "24" ]]; then + continue + fi + net="''${ip_addr%.*}" + for i in $(seq 1 254); do + found="$(try_ip "$net.$i" || true)" + if [[ -n "$found" ]]; then + base="$found" + break + fi + done + if [[ -n "$base" ]]; then + break + fi + done < <(ip -o -4 addr show scope global | awk '{print $4}') + fi + + if [[ -z "$base" ]]; then + echo "ec-node: HDHomeRun deviceId not found: $dev_id" >&2 + exit 2 + fi + else + base="http://$dev_id.local" fi - else - # Best-effort mDNS convention. 
- base="http://$dev_id.local" fi - fi - base="''${base%/}" - if [[ "$base" != http://* && "$base" != https://* ]]; then - base="http://$base" - fi + base="''${base%/}" + if [[ "$base" != http://* && "$base" != https://* ]]; then + base="http://$base" + fi - # HDHomeRun streaming is on port 5004, regardless of the discover BaseURL. - hostport="''${base#http://}" - hostport="''${hostport#https://}" - hostport="''${hostport%%/*}" - host="''${hostport%%:*}" - input="http://$host:5004/auto/v$ch" + hostport="''${base#http://}" + hostport="''${hostport#https://}" + hostport="''${hostport%%/*}" + host="''${hostport%%:*}" + input="http://$host:5004/auto/v$ch" + fi fi - cmd=( - ${lib.escapeShellArg "${cfg.package}/bin/ec-node"} - wt-publish - --url ${lib.escapeShellArg cfg.relayUrl} - --name ${lib.escapeShellArg b.name} - --input "$input" - ) - ${lib.optionalString (!cfg.transcode) "cmd+=(--transcode=false)"} + if [[ -n "$nbc_url" ]]; then + cmd=( + ${lib.escapeShellArg "${cfg.package}/bin/ec-node"} + nbc-wt-publish + --url ${lib.escapeShellArg cfg.relayUrl} + --name ${lib.escapeShellArg b.name} + --source-url "$nbc_url" + ) + else + cmd=( + ${lib.escapeShellArg "${cfg.package}/bin/ec-node"} + wt-publish + --url ${lib.escapeShellArg cfg.relayUrl} + --name ${lib.escapeShellArg b.name} + --input "$input" + ) + ${lib.optionalString (!cfg.transcode) "cmd+=(--transcode=false)"} + fi ${lib.optionalString (!cfg.passthrough) "cmd+=(--passthrough=false)"} ${lib.optionalString cfg.tlsDisableVerify "cmd+=(--tls-disable-verify)"} ${lib.optionalString cfg.control.enable '' @@ -503,8 +607,12 @@ in value = { description = "every.channel WebTransport publish (${b.name} -> ${cfg.relayUrl})"; wantedBy = [ "multi-user.target" ]; - after = [ "network-online.target" ]; - wants = [ "network-online.target" ]; + after = + [ "network-online.target" ] + ++ lib.optionals isNbc [ "every-channel-nbc-display.service" ]; + wants = + [ "network-online.target" ] + ++ lib.optionals isNbc [ 
"every-channel-nbc-display.service" ]; # Keep the unit from entering "failed" due to rapid restarts (deploy-flake treats # failed units during `switch-to-configuration test` as a deployment failure). @@ -521,23 +629,34 @@ in Restart = "always"; RestartSec = 2; - DynamicUser = true; + DynamicUser = !isNbc; + User = lib.mkIf isNbc "every-channel"; + Group = lib.mkIf isNbc "every-channel"; NoNewPrivileges = true; - PrivateTmp = true; + PrivateTmp = !isNbc; ProtectSystem = "strict"; - ProtectHome = true; + ProtectHome = !isNbc; ProtectKernelTunables = true; ProtectKernelModules = true; ProtectControlGroups = true; LockPersonality = true; - MemoryDenyWriteExecute = true; + MemoryDenyWriteExecute = !isNbc; RestrictSUIDSGID = true; RestrictRealtime = true; SystemCallArchitectures = "native"; - ReadWritePaths = lib.optionals cfg.control.enable [ "/run/every-channel" ]; + ReadWritePaths = + lib.optionals cfg.control.enable [ "/run/every-channel" ] + ++ lib.optionals isNbc [ cfg.nbc.profileDir cfg.nbc.authScreenshotDir ]; }; - environment = cfg.environment; + environment = + cfg.environment + // lib.optionalAttrs isNbc { + DISPLAY = cfg.nbc.display; + EVERY_CHANNEL_NBC_CHROME_PATH = cfg.nbc.chromeBinary; + EVERY_CHANNEL_NBC_PROFILE_DIR = cfg.nbc.profileDir; + EVERY_CHANNEL_NBC_NO_SANDBOX = if cfg.nbc.noSandbox then "1" else "0"; + }; }; }) cfg.broadcasts) @@ -852,6 +971,112 @@ in environment = cfg.environment; }; - }); + }) + // lib.optionalAttrs cfg.nbc.enable + (let + displayUnit = "every-channel-nbc-display"; + displayNumber = lib.strings.removePrefix ":" cfg.nbc.display; + displayRunner = pkgs.writeShellApplication { + name = displayUnit; + runtimeInputs = [ + pkgs.coreutils + pkgs.xorg.xorgserver + ]; + text = '' + set -euo pipefail + exec ${pkgs.xorg.xorgserver}/bin/Xvfb ${lib.escapeShellArg cfg.nbc.display} \ + -screen 0 ${lib.escapeShellArg cfg.nbc.screen} \ + -nolisten tcp \ + -ac \ + +extension RANDR + ''; + }; + vncUnit = "every-channel-nbc-vnc"; + vncRunner = 
pkgs.writeShellApplication { + name = vncUnit; + runtimeInputs = [ + pkgs.x11vnc + ]; + text = '' + set -euo pipefail + exec ${pkgs.x11vnc}/bin/x11vnc \ + -display ${lib.escapeShellArg cfg.nbc.display} \ + -forever \ + -shared \ + -nopw \ + -listen ${lib.escapeShellArg cfg.nbc.vnc.listen} \ + -rfbport ${toString cfg.nbc.vnc.port} + ''; + }; + in + ({ + "${displayUnit}" = { + description = "every.channel NBC virtual display"; + wantedBy = [ "multi-user.target" ]; + after = [ "network-online.target" ]; + wants = [ "network-online.target" ]; + + serviceConfig = { + Type = "simple"; + ExecStart = "${displayRunner}/bin/${displayUnit}"; + Restart = "always"; + RestartSec = 2; + User = "every-channel"; + Group = "every-channel"; + NoNewPrivileges = true; + PrivateTmp = false; + ProtectSystem = "strict"; + ProtectHome = false; + ProtectKernelTunables = true; + ProtectKernelModules = true; + ProtectControlGroups = true; + LockPersonality = true; + MemoryDenyWriteExecute = false; + RestrictSUIDSGID = true; + RestrictRealtime = true; + SystemCallArchitectures = "native"; + ReadWritePaths = [ "/tmp" "/var/lib/every-channel" ]; + }; + + environment = cfg.environment // { + HOME = "/var/lib/every-channel"; + }; + }; + } + // lib.optionalAttrs cfg.nbc.vnc.enable { + "${vncUnit}" = { + description = "every.channel NBC virtual display VNC bridge"; + wantedBy = [ "multi-user.target" ]; + after = [ "network-online.target" "${displayUnit}.service" ]; + wants = [ "network-online.target" "${displayUnit}.service" ]; + + serviceConfig = { + Type = "simple"; + ExecStart = "${vncRunner}/bin/${vncUnit}"; + Restart = "always"; + RestartSec = 2; + User = "every-channel"; + Group = "every-channel"; + NoNewPrivileges = true; + PrivateTmp = false; + ProtectSystem = "strict"; + ProtectHome = false; + ProtectKernelTunables = true; + ProtectKernelModules = true; + ProtectControlGroups = true; + LockPersonality = true; + MemoryDenyWriteExecute = false; + RestrictSUIDSGID = true; + RestrictRealtime = 
true; + SystemCallArchitectures = "native"; + ReadWritePaths = [ "/tmp" "/var/lib/every-channel" ]; + }; + + environment = cfg.environment // { + DISPLAY = cfg.nbc.display; + HOME = "/var/lib/every-channel"; + }; + }; + })); }; } diff --git a/nix/nixos/ecp-forge.nix b/nix/nixos/ecp-forge.nix index 8e105d6..2459a05 100644 --- a/nix/nixos/ecp-forge.nix +++ b/nix/nixos/ecp-forge.nix @@ -11,6 +11,12 @@ in ./ecp-forge-hardware.nix ]; + nixpkgs.config.allowUnfreePredicate = pkg: + builtins.elem (lib.getName pkg) [ + "google-chrome" + "google-chrome-stable" + ]; + networking = { hostName = "ecp-forge"; hostId = "007f0200"; @@ -252,6 +258,24 @@ in services.every-channel.ec-node = { enable = true; + nbc = { + enable = true; + chromeBinary = "${pkgs.google-chrome}/bin/google-chrome-stable"; + display = ":120"; + screen = "1920x1080x24"; + noSandbox = true; + vnc = { + enable = true; + listen = "127.0.0.1"; + port = 5900; + }; + }; + broadcasts = [ + { + name = "forge-nbc-sports-philly"; + nbcUrl = "https://www.nbc.com/live?brand=nbc-sports-philadelphia"; + } + ]; archive = { enable = true; outputDir = "/tank/every-channel/archive"; @@ -261,6 +285,25 @@ in }; }; + services.every-channel.ethereum = { + enable = true; + poolName = "eth"; + poolDevice = "/dev/disk/by-id/nvme-eui.01000000000000008ce38ee307de5c01"; + rootDir = "/eth"; + publicIp = "95.216.114.54"; + publicHost = "eth.every.channel"; + }; + + services.mullvad-vpn = { + enable = true; + enableExcludeWrapper = true; + }; + + systemd.services.every-channel-wt-publish-forge-nbc-sports-philly = { + after = [ "mullvad-daemon.service" ]; + wants = [ "mullvad-daemon.service" ]; + }; + services.every-channel.op-stack = { enable = hasOpStackSepoliaKey; challengerEnable = hasOpStackChallengerPrestate; @@ -276,13 +319,20 @@ in p2pAdvertiseIp = "95.216.114.54"; }; - environment.systemPackages = with pkgs; [ - git - htop - jq - tmux - zfs - ]; + environment.systemPackages = + (with pkgs; [ + git + google-chrome + htop + jq + 
mullvad-vpn + tmux + x11vnc + zfs + ]) + ++ [ + config.services.every-channel.ec-node.package + ]; system.stateVersion = "22.11"; } diff --git a/nix/pkgs/ec-cli.nix b/nix/pkgs/ec-cli.nix index df5bd32..1522f03 100644 --- a/nix/pkgs/ec-cli.nix +++ b/nix/pkgs/ec-cli.nix @@ -12,7 +12,17 @@ let let base = baseNameOf path; in - !(base == "target" || base == ".git" || base == ".direnv" || base == "tmp" || base == "node_modules"); + !(base == "target" + || base == ".git" + || base == ".direnv" + || base == "tmp" + || base == "node_modules" + || base == "out" + || base == "test-results" + || base == "deploy" + || base == "intake" + || base == "cache" + || base == ".tower-minimal"); }; in rustPlatform.buildRustPackage { diff --git a/nix/pkgs/ec-node.nix b/nix/pkgs/ec-node.nix index ae08534..146b95a 100644 --- a/nix/pkgs/ec-node.nix +++ b/nix/pkgs/ec-node.nix @@ -1,5 +1,6 @@ { lib , rustPlatform +, rustfmt , stdenv , pkg-config , openssl @@ -14,7 +15,17 @@ let base = baseNameOf path; in # Skip typical build outputs and large scratch dirs. - !(base == "target" || base == ".git" || base == ".direnv" || base == "tmp" || base == "node_modules"); + !(base == "target" + || base == ".git" + || base == ".direnv" + || base == "tmp" + || base == "node_modules" + || base == "out" + || base == "test-results" + || base == "deploy" + || base == "intake" + || base == "cache" + || base == ".tower-minimal"); }; in rustPlatform.buildRustPackage { @@ -30,6 +41,7 @@ rustPlatform.buildRustPackage { nativeBuildInputs = [ pkg-config + rustfmt ]; buildInputs =