Advance forge NBC worker and Ethereum full nodes

This commit is contained in:
every.channel 2026-04-03 02:01:34 -07:00
parent 7d84510eac
commit 3402f7dab2
No known key found for this signature in database
17 changed files with 3066 additions and 414 deletions

1
Cargo.lock generated
View file

@ -2262,6 +2262,7 @@ dependencies = [
"serde_json", "serde_json",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite",
"tokio-util",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"url", "url",

View file

@ -29,9 +29,9 @@ use std::io::{Cursor, Read};
use std::net::IpAddr; use std::net::IpAddr;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio}; use std::process::{Child, Command, Stdio};
use std::sync::{mpsc, Arc, Mutex as StdMutex};
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex as StdMutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tauri::path::BaseDirectory; use tauri::path::BaseDirectory;
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
@ -309,10 +309,7 @@ fn dedupe_source_descriptors(sources: Vec<SourceDescriptor>) -> Vec<SourceDescri
by_key.into_values().collect() by_key.into_values().collect()
} }
async fn refresh_public_nbc_discovery_if_needed( async fn refresh_public_nbc_discovery_if_needed(state: Arc<Mutex<StreamManager>>, force: bool) {
state: Arc<Mutex<StreamManager>>,
force: bool,
) {
let needs_refresh = { let needs_refresh = {
let manager = state.lock().await; let manager = state.lock().await;
force force
@ -2712,7 +2709,10 @@ fn discover_nbc_public_guide() -> Result<NbcPublicDiscovery> {
} else { } else {
Some(current_title.to_string()) Some(current_title.to_string())
}; };
let entitlement = item.analytics.as_ref().and_then(|value| value.entitlement.as_deref()); let entitlement = item
.analytics
.as_ref()
.and_then(|value| value.entitlement.as_deref());
let mut extra_metadata = vec![ let mut extra_metadata = vec![
StreamMetadata { StreamMetadata {
key: "discovery_source".to_string(), key: "discovery_source".to_string(),
@ -2737,7 +2737,10 @@ fn discover_nbc_public_guide() -> Result<NbcPublicDiscovery> {
value: current_program.clone(), value: current_program.clone(),
}); });
} }
if let Some(relative_path) = tile.relative_path.as_ref().filter(|value| !value.is_empty()) if let Some(relative_path) = tile
.relative_path
.as_ref()
.filter(|value| !value.is_empty())
{ {
extra_metadata.push(StreamMetadata { extra_metadata.push(StreamMetadata {
key: "relative_path".to_string(), key: "relative_path".to_string(),
@ -3986,7 +3989,10 @@ fn nbc_native_js_eval(window: &WebviewWindow, script: &str) -> Result<String> {
} }
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
fn nbc_native_snapshot(window: &WebviewWindow, capture_rect: Option<&NbcCaptureRect>) -> Result<Vec<u8>> { fn nbc_native_snapshot(
window: &WebviewWindow,
capture_rect: Option<&NbcCaptureRect>,
) -> Result<Vec<u8>> {
let (tx, rx) = mpsc::sync_channel(1); let (tx, rx) = mpsc::sync_channel(1);
let capture_rect = capture_rect.cloned(); let capture_rect = capture_rect.cloned();
window window
@ -4016,11 +4022,14 @@ fn nbc_native_snapshot(window: &WebviewWindow, capture_rect: Option<&NbcCaptureR
let bitmap = NSBitmapImageRep::imageRepWithData(&tiff) let bitmap = NSBitmapImageRep::imageRepWithData(&tiff)
.ok_or_else(|| anyhow!("failed to decode WKWebView snapshot"))?; .ok_or_else(|| anyhow!("failed to decode WKWebView snapshot"))?;
let properties = NSDictionary::dictionary(); let properties = NSDictionary::dictionary();
bitmap.representationUsingType_properties( bitmap
.representationUsingType_properties(
NSBitmapImageFileType::JPEG, NSBitmapImageFileType::JPEG,
&properties, &properties,
) )
.ok_or_else(|| anyhow!("failed to encode WKWebView snapshot as JPEG")) .ok_or_else(|| {
anyhow!("failed to encode WKWebView snapshot as JPEG")
})
.map(|jpeg| jpeg.to_vec()) .map(|jpeg| jpeg.to_vec())
}); });
tiff tiff
@ -4561,7 +4570,10 @@ fn run_nbc_capture_loop(
} }
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
fn spawn_nbc_frame_reader_native(app: &AppHandle, source: &StreamSource) -> Result<FramePipeReader> { fn spawn_nbc_frame_reader_native(
app: &AppHandle,
source: &StreamSource,
) -> Result<FramePipeReader> {
let origin_url = source_metadata_value(source, "origin_url") let origin_url = source_metadata_value(source, "origin_url")
.ok_or_else(|| anyhow!("NBC source is missing origin_url metadata"))? .ok_or_else(|| anyhow!("NBC source is missing origin_url metadata"))?
.to_string(); .to_string();
@ -4678,7 +4690,11 @@ fn bootstrap_nbc_auth_hidden(
None, None,
)?; )?;
wait_for_nbc_playback_webview(&window, &session.trace, session.hidden_mode)?; wait_for_nbc_playback_webview(&window, &session.trace, session.hidden_mode)?;
let trace_state = session.trace.lock().map(|state| state.clone()).unwrap_or_default(); let trace_state = session
.trace
.lock()
.map(|state| state.clone())
.unwrap_or_default();
Ok(BootstrapNbcAuthResult { Ok(BootstrapNbcAuthResult {
input_url: input_url.clone(), input_url: input_url.clone(),
stream_id, stream_id,

View file

@ -40,8 +40,9 @@ hang = "0.14.0"
moq-mux = "0.2.1" moq-mux = "0.2.1"
moq-lite = "0.14.0" moq-lite = "0.14.0"
moq-native = { version = "0.13.1", default-features = true } moq-native = { version = "0.13.1", default-features = true }
headless_chrome = "1"
tokio-util = "0.7"
url = "2" url = "2"
[dev-dependencies] [dev-dependencies]
headless_chrome = "1"
which = "6" which = "6"

View file

@ -1,5 +1,6 @@
//! Node runner: orchestrates ingest, chunking, and MoQ publication. //! Node runner: orchestrates ingest, chunking, and MoQ publication.
mod nbc;
mod source; mod source;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
@ -33,6 +34,10 @@ use futures_util::{SinkExt, StreamExt};
use iroh::Watcher; use iroh::Watcher;
use just_webrtc::types::{DataChannelOptions, ICEServer, PeerConfiguration, PeerConnectionState}; use just_webrtc::types::{DataChannelOptions, ICEServer, PeerConfiguration, PeerConnectionState};
use just_webrtc::{DataChannelExt, PeerConnectionBuilder, PeerConnectionExt}; use just_webrtc::{DataChannelExt, PeerConnectionBuilder, PeerConnectionExt};
use nbc::{
bootstrap_nbc_auth, nbc_capture_fps, resolve_nbc_chrome_path, resolve_nbc_profile_dir,
spawn_nbc_frame_reader,
};
use source::{HdhrSource, HlsMode, HlsSource, LinuxDvbSource, StreamSource, TsSource}; use source::{HdhrSource, HlsMode, HlsSource, LinuxDvbSource, StreamSource, TsSource};
use std::collections::{BTreeMap, HashMap, HashSet}; use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs; use std::fs;
@ -48,6 +53,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::process::Command as TokioCommand; use tokio::process::Command as TokioCommand;
use tokio_tungstenite::tungstenite::Message as WsMessage; use tokio_tungstenite::tungstenite::Message as WsMessage;
use tokio_util::io::SyncIoBridge;
use url::Url; use url::Url;
const DIRECT_WIRE_TAG_FRAME: u8 = 0x00; const DIRECT_WIRE_TAG_FRAME: u8 = 0x00;
@ -90,6 +96,10 @@ enum Commands {
WsSubscribe(WsSubscribeArgs), WsSubscribe(WsSubscribeArgs),
/// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default). /// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default).
WtPublish(WtPublishArgs), WtPublish(WtPublishArgs),
/// Warm an NBC / Adobe auth session in Chrome, only requiring operator interaction when the session is cold.
NbcBootstrap(NbcBootstrapArgs),
/// Publish a live NBC browser-backed stream to a MoQ relay over WebTransport.
NbcWtPublish(NbcWtPublishArgs),
/// Subscribe to a relay broadcast over WebTransport/MoQ and archive groups into CAS. /// Subscribe to a relay broadcast over WebTransport/MoQ and archive groups into CAS.
WtArchive(WtArchiveArgs), WtArchive(WtArchiveArgs),
/// Serve archived relay groups as DVR-style HLS playlists + object endpoints. /// Serve archived relay groups as DVR-style HLS playlists + object endpoints.
@ -470,6 +480,68 @@ struct WtPublishArgs {
control_endpoint_addr_out: Option<PathBuf>, control_endpoint_addr_out: Option<PathBuf>,
} }
// Arguments for the `nbc-bootstrap` subcommand: warm an NBC / Adobe auth
// session inside a persistent Chrome profile so that later publish runs can
// reuse the cached credentials. A plain `//` comment is used here (not `///`)
// so clap's generated help text for the subcommand is not changed.
#[derive(Parser, Debug)]
struct NbcBootstrapArgs {
    /// NBC watch or live URL to warm.
    #[arg(long)]
    source_url: String,
    /// Optional Chrome binary override.
    #[arg(long)]
    chrome_path: Option<PathBuf>,
    /// Optional persistent profile directory override.
    #[arg(long)]
    profile_dir: Option<PathBuf>,
    /// Optional path for a screenshot captured when interactive auth is required.
    #[arg(long)]
    screenshot_out: Option<PathBuf>,
}
// Arguments for the `nbc-wt-publish` subcommand: capture a live NBC
// browser-backed stream and publish it to a MoQ relay over WebTransport.
// Mirrors `WtPublishArgs` for the relay/control-gossip options and adds the
// NBC-specific Chrome inputs. Plain `//` comment so clap help is unaffected.
#[derive(Parser, Debug)]
struct NbcWtPublishArgs {
    /// Relay URL (WebTransport) to connect to.
    #[arg(long, default_value = "https://cdn.moq.dev/anon")]
    url: String,
    /// Broadcast name to publish.
    #[arg(long)]
    name: String,
    /// NBC watch or live URL to open in Chrome.
    #[arg(long)]
    source_url: String,
    /// Transmit fMP4 fragments directly (passthrough mode).
    // Defaults to true; `ArgAction::Set` lets operators pass `--passthrough false`.
    #[arg(long, default_value_t = true, action = clap::ArgAction::Set)]
    passthrough: bool,
    /// Danger: disable TLS verification for the relay.
    #[arg(long, default_value_t = false)]
    tls_disable_verify: bool,
    /// Announce this relay stream over iroh gossip control topic.
    #[arg(long, default_value_t = false)]
    control_announce: bool,
    /// Control gossip TTL (ms) for control announcements.
    #[arg(long, default_value_t = 15000)]
    control_ttl_ms: u64,
    /// Control gossip announce interval (ms).
    #[arg(long, default_value_t = 5000)]
    control_interval_ms: u64,
    /// Optional iroh secret key (hex) for control gossip endpoint identity.
    #[arg(long)]
    iroh_secret: Option<String>,
    /// Discovery modes to enable for control gossip endpoint (comma-separated: dht, mdns, dns).
    #[arg(long)]
    discovery: Option<String>,
    /// Gossip peers to connect to for control announcements (repeatable).
    #[arg(long)]
    gossip_peer: Vec<String>,
    /// Optional path to write this publisher's control endpoint address JSON.
    #[arg(long)]
    control_endpoint_addr_out: Option<PathBuf>,
    /// Optional Chrome binary override.
    #[arg(long)]
    chrome_path: Option<PathBuf>,
    /// Optional persistent profile directory override.
    #[arg(long)]
    profile_dir: Option<PathBuf>,
}
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
struct WtArchiveArgs { struct WtArchiveArgs {
/// Relay URL (WebTransport) to connect to. /// Relay URL (WebTransport) to connect to.
@ -718,6 +790,8 @@ fn main() -> Result<()> {
Commands::WsPublish(args) => run_async(ws_publish(args))?, Commands::WsPublish(args) => run_async(ws_publish(args))?,
Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?, Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?,
Commands::WtPublish(args) => run_async(wt_publish(args))?, Commands::WtPublish(args) => run_async(wt_publish(args))?,
Commands::NbcBootstrap(args) => nbc_bootstrap(args)?,
Commands::NbcWtPublish(args) => run_async(nbc_wt_publish(args))?,
Commands::WtArchive(args) => run_async(wt_archive(args))?, Commands::WtArchive(args) => run_async(wt_archive(args))?,
Commands::WtArchiveServe(args) => run_async(wt_archive_serve(args))?, Commands::WtArchiveServe(args) => run_async(wt_archive_serve(args))?,
Commands::ControlAnnounce(args) => run_async(control_announce(args))?, Commands::ControlAnnounce(args) => run_async(control_announce(args))?,
@ -1807,6 +1881,47 @@ mod tests {
assert!(manifest_hash_for_chunk(&manifest, sid, 12).is_none()); assert!(manifest_hash_for_chunk(&manifest, sid, 12).is_none());
} }
#[test]
fn decode_archive_group_bytes_unwraps_concatenated_object_frames() {
    // Two framed objects laid back-to-back in one archive group must decode
    // into the concatenation of their payloads (init segment + media segment).
    let make_meta = |created_unix_ms, size_bytes| ObjectMeta {
        created_unix_ms,
        content_type: "video/mp4".to_string(),
        size_bytes,
        timing: None,
        encryption: None,
        chunk_hash: None,
        chunk_hash_alg: None,
        chunk_proof: None,
        chunk_proof_alg: None,
        manifest_id: None,
    };

    let mut group = encode_object_frame(&make_meta(1, 4), b"init").unwrap();
    let second = encode_object_frame(&make_meta(2, 3), b"seg").unwrap();
    group.extend_from_slice(&second);

    let decoded = decode_archive_group_bytes(&group).unwrap();
    assert_eq!(decoded, b"initseg");
}
#[test]
fn decode_archive_group_bytes_passes_through_raw_payloads() {
    // A buffer that never parses as a framed object (here: a raw fMP4 `ftyp`
    // box) must be returned unchanged rather than rejected.
    let payload: &[u8] = b"\x00\x00\x00\x18ftypisom\x00\x00\x02\x00isomiso2";
    let decoded = decode_archive_group_bytes(payload).unwrap();
    assert_eq!(decoded, payload);
}
#[derive(Clone)] #[derive(Clone)]
struct DummySource { struct DummySource {
source_id: ec_core::SourceId, source_id: ec_core::SourceId,
@ -5835,6 +5950,59 @@ fn archive_error(status: u16, message: &str) -> ArchiveHttpResponse {
) )
} }
/// Decode an archived MoQ group into its raw media payload.
///
/// Archive groups come in two shapes: (a) a concatenation of object frames,
/// each laid out as `[u32 BE meta_len][ObjectMeta JSON][payload]` where the
/// payload length is taken from `ObjectMeta::size_bytes`, or (b) a legacy raw
/// (unframed) payload. Framed groups are unwrapped and their payloads
/// concatenated; a buffer that does not decode as a frame from the very first
/// byte is passed through untouched.
///
/// # Errors
/// Fails only when the buffer *starts* decoding as frames and then ends
/// mid-frame (truncated/corrupt archive group), or when an object's declared
/// size does not fit in `usize`.
fn decode_archive_group_bytes(bytes: &[u8]) -> Result<Vec<u8>> {
    let mut pos = 0usize;
    let mut out = Vec::new();
    let mut decoded_any = false;
    while pos < bytes.len() {
        // Need at least the 4-byte big-endian metadata length prefix.
        if bytes.len().saturating_sub(pos) < 4 {
            if decoded_any {
                return Err(anyhow!("archive group ended mid-frame header"));
            }
            return Ok(bytes.to_vec());
        }
        let meta_len =
            u32::from_be_bytes([bytes[pos], bytes[pos + 1], bytes[pos + 2], bytes[pos + 3]])
                as usize;
        let meta_start = pos + 4;
        // checked_add: a corrupt/crafted length near usize::MAX must not wrap
        // around and masquerade as an in-bounds frame (or panic in debug).
        let meta_end = match meta_start.checked_add(meta_len) {
            Some(end) if end <= bytes.len() => end,
            _ => {
                if decoded_any {
                    return Err(anyhow!("archive group ended mid-frame metadata"));
                }
                return Ok(bytes.to_vec());
            }
        };
        let meta: ObjectMeta = match serde_json::from_slice(&bytes[meta_start..meta_end]) {
            Ok(meta) => meta,
            Err(err) => {
                if decoded_any {
                    return Err(anyhow!("archive group metadata parse failed: {err:#}"));
                }
                // First chunk is not JSON metadata: treat as a raw payload.
                return Ok(bytes.to_vec());
            }
        };
        let data_len = usize::try_from(meta.size_bytes)
            .map_err(|_| anyhow!("archive object size exceeds addressable memory"))?;
        let data_start = meta_end;
        let data_end = match data_start.checked_add(data_len) {
            Some(end) if end <= bytes.len() => end,
            _ => {
                if decoded_any {
                    return Err(anyhow!("archive group ended mid-frame payload"));
                }
                return Ok(bytes.to_vec());
            }
        };
        out.extend_from_slice(&bytes[data_start..data_end]);
        pos = data_end;
        decoded_any = true;
    }
    Ok(out)
}
fn archive_status_text(status: u16) -> &'static str { fn archive_status_text(status: u16) -> &'static str {
match status { match status {
200 => "OK", 200 => "OK",
@ -6025,7 +6193,10 @@ fn handle_archive_http_request(
Err(_) => return archive_error(400, "invalid hash"), Err(_) => return archive_error(400, "invalid hash"),
}; };
match fs::read(path) { match fs::read(path) {
Ok(bytes) => archive_response(200, "video/mp4", bytes), Ok(bytes) => match decode_archive_group_bytes(&bytes) {
Ok(payload) => archive_response(200, "video/mp4", payload),
Err(err) => archive_error(500, &format!("{err:#}")),
},
Err(_) => archive_error(404, "init segment not found"), Err(_) => archive_error(404, "init segment not found"),
} }
} }
@ -6036,7 +6207,10 @@ fn handle_archive_http_request(
Err(_) => return archive_error(400, "invalid hash"), Err(_) => return archive_error(400, "invalid hash"),
}; };
match fs::read(path) { match fs::read(path) {
Ok(bytes) => archive_response(200, "video/mp4", bytes), Ok(bytes) => match decode_archive_group_bytes(&bytes) {
Ok(payload) => archive_response(200, "video/mp4", payload),
Err(err) => archive_error(500, &format!("{err:#}")),
},
Err(_) => archive_error(404, "segment not found"), Err(_) => archive_error(404, "segment not found"),
} }
} }
@ -6105,128 +6279,26 @@ async fn handle_archive_http_connection(
Ok(()) Ok(())
} }
async fn wt_archive_serve(args: WtArchiveServeArgs) -> Result<()> { #[derive(Debug, Clone)]
let manifest_root = args struct WtPublishRelayArgs {
.manifest_dir url: String,
.unwrap_or_else(|| args.output_dir.join("manifests")); name: String,
let cas_root = args.output_dir.join("objects").join("blake3"); tls_disable_verify: bool,
fs::create_dir_all(&manifest_root) control_announce: bool,
.with_context(|| format!("failed to create manifest dir {}", manifest_root.display()))?; control_ttl_ms: u64,
fs::create_dir_all(&cas_root) control_interval_ms: u64,
.with_context(|| format!("failed to create CAS dir {}", cas_root.display()))?; iroh_secret: Option<String>,
discovery: Option<String>,
let listener = TcpListener::bind(&args.listen) gossip_peer: Vec<String>,
.await control_endpoint_addr_out: Option<PathBuf>,
.with_context(|| format!("failed to bind archive replay listener {}", args.listen))?;
let local = listener
.local_addr()
.context("failed to read listen addr")?;
tracing::info!(
listen = %local,
manifest_root = %manifest_root.display(),
cas_root = %cas_root.display(),
"archive replay server listening"
);
let state = ArchiveReplayState {
cas_root,
manifest_root,
};
loop {
let (stream, peer) = listener.accept().await.context("accept failed")?;
let state_clone = state.clone();
tokio::spawn(async move {
if let Err(err) = handle_archive_http_connection(stream, state_clone).await {
tracing::debug!(peer = %peer, err = %err, "archive replay request failed");
}
});
}
} }
async fn wt_publish(args: WtPublishArgs) -> Result<()> { struct WtPublishRelayState {
let relay_url = session: moq_lite::Session,
Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?; broadcast: moq_lite::BroadcastProducer,
let relay_url_str = relay_url.to_string(); catalog: hang::CatalogProducer,
control_stop: Option<oneshot::Sender<()>>,
let mut control_stop: Option<oneshot::Sender<()>> = None;
if args.control_announce {
let secret = parse_iroh_secret(args.iroh_secret.clone())?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
let gossip_peers = parse_gossip_peers(args.gossip_peer.clone());
let endpoint_addr_json =
serde_json::to_string(&endpoint.addr()).unwrap_or_else(|_| endpoint.id().to_string());
let announcement = build_control_announcement(
args.name.clone(),
args.name.clone(),
vec![StreamTransportDescriptor::RelayMoq {
url: relay_url_str.clone(),
broadcast_name: args.name.clone(),
track_name: "video0.m4s".to_string(),
}],
args.control_ttl_ms,
);
match spawn_control_announcer_task(
endpoint.clone(),
gossip_peers,
announcement,
Duration::from_millis(args.control_interval_ms.max(1000)),
)
.await
{
Ok(stop_tx) => {
eprintln!("control endpoint id: {}", endpoint.id());
eprintln!("control endpoint addr: {}", endpoint_addr_json);
if let Some(path) = args.control_endpoint_addr_out.as_ref() {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).with_context(|| {
format!(
"failed to create control endpoint addr parent dir: {}",
parent.display()
)
})?;
} }
fs::write(path, format!("{endpoint_addr_json}\n")).with_context(|| {
format!(
"failed to write control endpoint addr file: {}",
path.display()
)
})?;
tracing::info!(
path = %path.display(),
"wrote control endpoint addr file"
);
}
tracing::info!(
endpoint = %endpoint.id(),
endpoint_addr = %endpoint_addr_json,
stream = %args.name,
"control announce enabled"
);
control_stop = Some(stop_tx);
}
Err(err) => {
tracing::warn!("failed to start control announce task: {err:#}");
}
}
}
// Create a local origin + broadcast, then pass an OriginConsumer into the client so it can
// publish announcements to the relay.
let origin = moq_lite::Origin::produce();
let publish = origin.consume();
let mut broadcast = origin
.create_broadcast(&args.name)
.ok_or_else(|| anyhow!("failed to create broadcast: {}", args.name))?;
// Ensure the catalog track is present in the broadcast so subscribers can discover tracks.
let mut catalog = hang::CatalogProducer::default();
broadcast.insert_track(catalog.track.clone());
#[derive(Clone)] #[derive(Clone)]
struct ProtocolOverride<S> { struct ProtocolOverride<S> {
@ -6306,7 +6378,6 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
.to_string(); .to_string();
let port = relay_url.port().unwrap_or(443); let port = relay_url.port().unwrap_or(443);
// Build TLS config.
let mut roots = rustls::RootCertStore::empty(); let mut roots = rustls::RootCertStore::empty();
let native = rustls_native_certs::load_native_certs(); let native = rustls_native_certs::load_native_certs();
if !native.errors.is_empty() { if !native.errors.is_empty() {
@ -6324,7 +6395,6 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
.with_no_client_auth(); .with_no_client_auth();
if tls_disable_verify { if tls_disable_verify {
// Mirror moq-native's behavior: accept any certificate, but still verify signatures.
#[derive(Debug)] #[derive(Debug)]
struct NoCertificateVerification(Arc<rustls::crypto::CryptoProvider>); struct NoCertificateVerification(Arc<rustls::crypto::CryptoProvider>);
@ -6336,8 +6406,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
_server_name: &rustls::pki_types::ServerName<'_>, _server_name: &rustls::pki_types::ServerName<'_>,
_ocsp: &[u8], _ocsp: &[u8],
_now: rustls::pki_types::UnixTime, _now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
{
Ok(rustls::client::danger::ServerCertVerified::assertion()) Ok(rustls::client::danger::ServerCertVerified::assertion())
} }
@ -6383,10 +6452,8 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
.set_certificate_verifier(Arc::new(NoCertificateVerification(provider))); .set_certificate_verifier(Arc::new(NoCertificateVerification(provider)));
} }
// WebTransport over HTTP/3.
tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()]; tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()];
// Build a Quinn endpoint.
let socket = std::net::UdpSocket::bind("[::]:0").context("failed to bind UDP socket")?; let socket = std::net::UdpSocket::bind("[::]:0").context("failed to bind UDP socket")?;
let mut transport = quinn::TransportConfig::default(); let mut transport = quinn::TransportConfig::default();
@ -6400,7 +6467,6 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
let endpoint = quinn::Endpoint::new(endpoint_config, None, socket, runtime) let endpoint = quinn::Endpoint::new(endpoint_config, None, socket, runtime)
.context("failed to create QUIC endpoint")?; .context("failed to create QUIC endpoint")?;
// Resolve relay.
let ip = tokio::net::lookup_host((host.clone(), port)) let ip = tokio::net::lookup_host((host.clone(), port))
.await .await
.context("failed DNS lookup")? .context("failed DNS lookup")?
@ -6417,7 +6483,6 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
.await .await
.context("failed QUIC connect")?; .context("failed QUIC connect")?;
// Establish a WebTransport session.
let mut request = web_transport_quinn::proto::ConnectRequest::new(relay_url.clone()); let mut request = web_transport_quinn::proto::ConnectRequest::new(relay_url.clone());
for alpn in moq_lite::ALPNS { for alpn in moq_lite::ALPNS {
request = request.with_protocol(alpn.to_string()); request = request.with_protocol(alpn.to_string());
@ -6426,9 +6491,6 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
.await .await
.context("failed WebTransport CONNECT")?; .context("failed WebTransport CONNECT")?;
// Establish a MoQ session. First try native negotiation as-selected by the relay.
// If that fails, fall back to explicit protocol overrides for relays that omit protocol
// selection in CONNECT responses.
let client = moq_lite::Client::new().with_publish(publish); let client = moq_lite::Client::new().with_publish(publish);
match client.connect(wt.clone()).await { match client.connect(wt.clone()).await {
@ -6441,8 +6503,6 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
} }
} }
// These correspond to IETF draft ALPNs as used by moq-lite/web code.
// We use string literals here since moq-lite does not currently expose these constants.
let attempts: [&str; 4] = ["moqt-16", "moqt-15", "moq-00", ""]; let attempts: [&str; 4] = ["moqt-16", "moqt-15", "moq-00", ""];
let mut last_err: Option<anyhow::Error> = None; let mut last_err: Option<anyhow::Error> = None;
@ -6459,7 +6519,11 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
} }
Err(err) => { Err(err) => {
last_err = Some(anyhow::Error::new(err)); last_err = Some(anyhow::Error::new(err));
tracing::debug!(protocol = %p, err = %last_err.as_ref().unwrap(), "MoQ SETUP failed; retrying"); tracing::debug!(
protocol = %p,
err = %last_err.as_ref().unwrap(),
"MoQ SETUP failed; retrying"
);
} }
} }
} }
@ -6467,9 +6531,151 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))).context("failed MoQ SETUP") Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))).context("failed MoQ SETUP")
} }
async fn open_wt_publish_relay(args: &WtPublishRelayArgs) -> Result<WtPublishRelayState> {
let relay_url =
Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?;
let relay_url_str = relay_url.to_string();
let mut control_stop: Option<oneshot::Sender<()>> = None;
if args.control_announce {
let secret = parse_iroh_secret(args.iroh_secret.clone())?;
let discovery = parse_discovery(args.discovery.as_deref())?;
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
let gossip_peers = parse_gossip_peers(args.gossip_peer.clone());
let endpoint_addr_json =
serde_json::to_string(&endpoint.addr()).unwrap_or_else(|_| endpoint.id().to_string());
let announcement = build_control_announcement(
args.name.clone(),
args.name.clone(),
vec![StreamTransportDescriptor::RelayMoq {
url: relay_url_str.clone(),
broadcast_name: args.name.clone(),
track_name: "video0.m4s".to_string(),
}],
args.control_ttl_ms,
);
match spawn_control_announcer_task(
endpoint.clone(),
gossip_peers,
announcement,
Duration::from_millis(args.control_interval_ms.max(1000)),
)
.await
{
Ok(stop_tx) => {
eprintln!("control endpoint id: {}", endpoint.id());
eprintln!("control endpoint addr: {}", endpoint_addr_json);
if let Some(path) = args.control_endpoint_addr_out.as_ref() {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).with_context(|| {
format!(
"failed to create control endpoint addr parent dir: {}",
parent.display()
)
})?;
}
fs::write(path, format!("{endpoint_addr_json}\n")).with_context(|| {
format!(
"failed to write control endpoint addr file: {}",
path.display()
)
})?;
tracing::info!(
path = %path.display(),
"wrote control endpoint addr file"
);
}
tracing::info!(
endpoint = %endpoint.id(),
endpoint_addr = %endpoint_addr_json,
stream = %args.name,
"control announce enabled"
);
control_stop = Some(stop_tx);
}
Err(err) => {
tracing::warn!("failed to start control announce task: {err:#}");
}
}
}
let origin = moq_lite::Origin::produce();
let publish = origin.consume();
let mut broadcast = origin
.create_broadcast(&args.name)
.ok_or_else(|| anyhow!("failed to create broadcast: {}", args.name))?;
let catalog = hang::CatalogProducer::default();
broadcast.insert_track(catalog.track.clone());
tracing::info!(url=%relay_url, name=%args.name, "connecting to relay"); tracing::info!(url=%relay_url, name=%args.name, "connecting to relay");
let session = connect_moq_session(&relay_url, publish, args.tls_disable_verify).await?; let session = connect_moq_session(&relay_url, publish, args.tls_disable_verify).await?;
Ok(WtPublishRelayState {
session,
broadcast,
catalog,
control_stop,
})
}
/// Serve archived relay groups as DVR-style HLS playlists + object endpoints.
///
/// Ensures the manifest and CAS directories exist, binds a TCP listener on
/// `args.listen`, then accepts connections forever, handling each one on its
/// own tokio task. Never returns `Ok` in normal operation; it only errors on
/// setup failure or a failed `accept`.
async fn wt_archive_serve(args: WtArchiveServeArgs) -> Result<()> {
    let manifest_root = match args.manifest_dir {
        Some(dir) => dir,
        None => args.output_dir.join("manifests"),
    };
    let cas_root = args.output_dir.join("objects").join("blake3");

    fs::create_dir_all(&manifest_root)
        .with_context(|| format!("failed to create manifest dir {}", manifest_root.display()))?;
    fs::create_dir_all(&cas_root)
        .with_context(|| format!("failed to create CAS dir {}", cas_root.display()))?;

    let listener = TcpListener::bind(&args.listen)
        .await
        .with_context(|| format!("failed to bind archive replay listener {}", args.listen))?;
    let bound_addr = listener
        .local_addr()
        .context("failed to read listen addr")?;
    tracing::info!(
        listen = %bound_addr,
        manifest_root = %manifest_root.display(),
        cas_root = %cas_root.display(),
        "archive replay server listening"
    );

    let state = ArchiveReplayState {
        cas_root,
        manifest_root,
    };

    loop {
        let (stream, peer) = listener.accept().await.context("accept failed")?;
        let per_conn_state = state.clone();
        // One task per connection; failures are logged at debug, not fatal.
        tokio::spawn(async move {
            if let Err(err) = handle_archive_http_connection(stream, per_conn_state).await {
                tracing::debug!(peer = %peer, err = %err, "archive replay request failed");
            }
        });
    }
}
async fn wt_publish(args: WtPublishArgs) -> Result<()> {
let mut relay = open_wt_publish_relay(&WtPublishRelayArgs {
url: args.url.clone(),
name: args.name.clone(),
tls_disable_verify: args.tls_disable_verify,
control_announce: args.control_announce,
control_ttl_ms: args.control_ttl_ms,
control_interval_ms: args.control_interval_ms,
iroh_secret: args.iroh_secret.clone(),
discovery: args.discovery.clone(),
gossip_peer: args.gossip_peer.clone(),
control_endpoint_addr_out: args.control_endpoint_addr_out.clone(),
})
.await?;
// Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux. // Spawn ffmpeg to generate fMP4 suitable for hang/moq-mux.
let mut cmd = TokioCommand::new("ffmpeg"); let mut cmd = TokioCommand::new("ffmpeg");
cmd.arg("-hide_banner") cmd.arg("-hide_banner")
@ -6543,10 +6749,10 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
let config = moq_mux::import::Fmp4Config { let config = moq_mux::import::Fmp4Config {
passthrough: args.passthrough, passthrough: args.passthrough,
}; };
let mut importer = moq_mux::import::Fmp4::new(broadcast, catalog, config); let mut importer = moq_mux::import::Fmp4::new(relay.broadcast, relay.catalog, config);
let mut stdout = stdout; let mut stdout = stdout;
let mut decode_fut = importer.decode_from(&mut stdout); let decode_fut = importer.decode_from(&mut stdout);
tokio::pin!(decode_fut); tokio::pin!(decode_fut);
tracing::info!("publishing fMP4 -> moq-mux -> relay"); tracing::info!("publishing fMP4 -> moq-mux -> relay");
@ -6559,7 +6765,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
Err(err) => Err(err).context("fmp4 ingest failed"), Err(err) => Err(err).context("fmp4 ingest failed"),
} }
} }
_ = session.closed() => { _ = relay.session.closed() => {
let _ = child.kill().await; let _ = child.kill().await;
Err(anyhow!("relay session closed")) Err(anyhow!("relay session closed"))
} }
@ -6571,7 +6777,170 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> {
} }
}; };
if let Some(stop) = control_stop.take() { if let Some(stop) = relay.control_stop.take() {
let _ = stop.send(());
}
outcome
}
fn nbc_bootstrap(args: NbcBootstrapArgs) -> Result<()> {
let chrome_path = resolve_nbc_chrome_path(args.chrome_path.as_deref())?;
let profile_dir = resolve_nbc_profile_dir(args.profile_dir.as_deref())?;
let result = bootstrap_nbc_auth(
chrome_path,
profile_dir,
args.source_url,
args.screenshot_out,
)?;
println!(
"{}",
serde_json::to_string_pretty(&result).context("failed to encode bootstrap result")?
);
Ok(())
}
/// Publish an NBC browser capture to a MoQ relay.
///
/// Pipeline: Chrome session (MJPEG frames) -> ffmpeg (H.264 in fragmented MP4)
/// -> moq-mux fMP4 importer -> relay session opened via `open_wt_publish_relay`.
/// Runs until ingest finishes, the relay session closes, or ctrl-c.
async fn nbc_wt_publish(args: NbcWtPublishArgs) -> Result<()> {
    // Resolve the Chrome binary and persistent auth profile first so we fail
    // fast before opening any relay session.
    let chrome_path = resolve_nbc_chrome_path(args.chrome_path.as_deref())?;
    let profile_dir = resolve_nbc_profile_dir(args.profile_dir.as_deref())?;
    // Open the relay publish session (and optional control announce) up front.
    let mut relay = open_wt_publish_relay(&WtPublishRelayArgs {
        url: args.url.clone(),
        name: args.name.clone(),
        tls_disable_verify: args.tls_disable_verify,
        control_announce: args.control_announce,
        control_ttl_ms: args.control_ttl_ms,
        control_interval_ms: args.control_interval_ms,
        iroh_secret: args.iroh_secret.clone(),
        discovery: args.discovery.clone(),
        gossip_peer: args.gossip_peer.clone(),
        control_endpoint_addr_out: args.control_endpoint_addr_out.clone(),
    })
    .await?;
    // Start the browser capture; this yields a blocking reader of JPEG frames.
    let mut frame_reader = spawn_nbc_frame_reader(
        chrome_path.clone(),
        profile_dir.clone(),
        args.source_url.clone(),
    )
    .with_context(|| format!("failed to open NBC browser session for {}", args.source_url))?;
    // Capture rate comes from env config; guard against a zero value.
    let fps = nbc_capture_fps().max(1);
    // Keyframe roughly every 4 seconds, clamped to [12, 48] frames.
    let gop = (fps * 4).clamp(12, 48);
    // ffmpeg: read MJPEG from stdin, encode low-latency H.264, emit fragmented
    // MP4 on stdout (one fragment per frame, suitable for streaming ingest).
    let mut cmd = TokioCommand::new("ffmpeg");
    cmd.arg("-hide_banner")
        .arg("-loglevel")
        .arg("error")
        .arg("-nostats")
        .arg("-fflags")
        .arg("+nobuffer")
        .arg("-flags")
        .arg("low_delay")
        .arg("-f")
        .arg("mjpeg")
        .arg("-framerate")
        .arg(fps.to_string())
        .arg("-i")
        .arg("pipe:0")
        .args([
            "-map",
            "0:v:0",
            "-an", // video-only for now; audio is a follow-up item
            "-c:v",
            "libx264",
            "-preset",
            "veryfast",
            "-tune",
            "zerolatency",
            "-pix_fmt",
            "yuv420p",
            "-profile:v",
            "main",
            "-g",
        ])
        .arg(gop.to_string())
        .arg("-keyint_min")
        .arg(gop.to_string())
        .args([
            "-sc_threshold",
            "0",
            "-threads",
            "1",
            "-f",
            "mp4",
            "-movflags",
            "empty_moov+frag_every_frame+separate_moof+omit_tfhd_offset",
            "pipe:1",
        ]);
    cmd.stdin(Stdio::piped());
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::inherit());
    tracing::info!(
        source_url = %args.source_url,
        chrome_path = %chrome_path.display(),
        profile_dir = %profile_dir.display(),
        "spawning ffmpeg for NBC browser capture"
    );
    let mut child = cmd.spawn().context("failed to spawn ffmpeg")?;
    let stdin = child
        .stdin
        .take()
        .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?;
    // The frame reader is blocking, so pump it into ffmpeg's stdin on a
    // dedicated blocking task; SyncIoBridge adapts the async pipe to sync io.
    let writer = tokio::task::spawn_blocking(move || -> Result<()> {
        let mut stdin = SyncIoBridge::new(stdin);
        std::io::copy(&mut frame_reader, &mut stdin)
            .context("failed to pipe NBC JPEG frames into ffmpeg")?;
        Ok(())
    });
    let stdout = child
        .stdout
        .take()
        .ok_or_else(|| anyhow!("ffmpeg stdout unavailable"))?;
    let config = moq_mux::import::Fmp4Config {
        passthrough: args.passthrough,
    };
    // Feed ffmpeg's fMP4 output into the moq-mux importer bound to the relay
    // broadcast/catalog.
    let mut importer = moq_mux::import::Fmp4::new(relay.broadcast, relay.catalog, config);
    let mut stdout = stdout;
    let decode_fut = importer.decode_from(&mut stdout);
    tokio::pin!(decode_fut);
    tracing::info!("publishing NBC browser capture -> fMP4 -> moq-mux -> relay");
    let outcome = tokio::select! {
        // Normal completion path: ingest finished (ffmpeg closed its stdout).
        res = &mut decode_fut => {
            let status = child.wait().await.context("failed to wait for ffmpeg")?;
            match writer.await {
                Ok(Ok(())) => {}
                // If ffmpeg already failed, the writer's pipe error is expected noise.
                Ok(Err(err)) if !status.success() => {
                    tracing::debug!("NBC frame writer ended after ffmpeg exit: {err:#}");
                }
                Ok(Err(err)) => return Err(err).context("NBC frame writer failed"),
                Err(err) => return Err(anyhow!("NBC frame writer task failed: {err}")),
            }
            match res {
                Ok(()) if status.success() => Ok(()),
                Ok(()) => Err(anyhow!("ffmpeg exited with {status}")),
                Err(err) => Err(err).context("fmp4 ingest failed"),
            }
        }
        // Relay went away: tear down ffmpeg and surface an error.
        _ = relay.session.closed() => {
            let _ = child.kill().await;
            let _ = writer.await;
            Err(anyhow!("relay session closed"))
        }
        // Operator shutdown: kill ffmpeg, join the writer, give teardown a
        // brief grace period, and exit cleanly.
        _ = tokio::signal::ctrl_c() => {
            tracing::info!("ctrl-c; shutting down");
            let _ = child.kill().await;
            let _ = writer.await;
            tokio::time::sleep(Duration::from_millis(100)).await;
            Ok(())
        }
    };
if let Some(stop) = relay.control_stop.take() {
let _ = stop.send(()); let _ = stop.send(());
} }

1284
crates/ec-node/src/nbc.rs Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,65 @@
# Ethereum Nodes on `ecp-forge`
This runbook covers the dual-network Ethereum full-node surface introduced by [ECP-0104](/Users/conradev/Projects/every.channel/evolution/proposals/ECP-0104-ecp-forge-dual-ethereum-full-nodes-on-dedicated-nvme-zfs.md).
## Scope
- `ecp-forge` owns a dedicated NVMe-backed ZFS pool for Ethereum state.
- Ethereum mainnet and Sepolia both run as full nodes.
- Execution uses Reth and consensus uses Lighthouse.
- Public HTTPS on `eth.every.channel` exposes sync and finality status only.
- Raw execution RPC, Engine API, and Beacon API stay bound to `127.0.0.1` on `ecp-forge`.
## Deploy
```sh
./scripts/deploy-ecp-forge.sh
```
## Verify
Storage:
```sh
ssh -o BatchMode=yes -o IdentityAgent=none -o IdentitiesOnly=yes -i ~/.ssh/id_ed25519 root@git.every.channel \
'zpool list eth && zfs list -r eth'
```
Core services:
```sh
ssh -o BatchMode=yes -o IdentityAgent=none -o IdentitiesOnly=yes -i ~/.ssh/id_ed25519 root@git.every.channel \
'systemctl is-active every-channel-ethereum-storage podman-every-channel-ethereum-mainnet-reth podman-every-channel-ethereum-mainnet-lighthouse podman-every-channel-ethereum-sepolia-reth podman-every-channel-ethereum-sepolia-lighthouse'
```
Public sync status:
```sh
curl -fsS https://eth.every.channel/mainnet/sync | jq .
curl -fsS https://eth.every.channel/sepolia/sync | jq .
```
## Local-only endpoints on `ecp-forge`
Mainnet:
- execution HTTP: `127.0.0.1:8545`
- execution WS: `127.0.0.1:8546`
- execution Engine API: `127.0.0.1:8551`
- beacon API: `127.0.0.1:5052`
Sepolia:
- execution HTTP: `127.0.0.1:18545`
- execution WS: `127.0.0.1:18546`
- execution Engine API: `127.0.0.1:18551`
- beacon API: `127.0.0.1:15052`
## Notes
- The dedicated Ethereum pool is `eth`, not `tank`.
- Both networks are configured for full sync, not archive mode.
- Lighthouse is configured to permit full beacon sync from genesis via
`--allow-insecure-genesis-sync` instead of checkpoint bootstrap.
- The first public host surface is intentionally sync and finality only; extend `eth.every.channel`
later if authenticated RPC is required.

View file

@ -72,6 +72,19 @@ Notes:
- adjust startup timeout / capture rate with `EVERY_CHANNEL_NBC_CAPTURE_TIMEOUT_SECS`, - adjust startup timeout / capture rate with `EVERY_CHANNEL_NBC_CAPTURE_TIMEOUT_SECS`,
`EVERY_CHANNEL_NBC_CAPTURE_FPS`, and `EVERY_CHANNEL_NBC_CAPTURE_QUALITY` `EVERY_CHANNEL_NBC_CAPTURE_FPS`, and `EVERY_CHANNEL_NBC_CAPTURE_QUALITY`
On Linux / forge hosts, the equivalent worker path lives in `ec-node`:
- warm auth with
`ec-node nbc-bootstrap --source-url 'https://www.nbc.com/live?brand=nbc-sports-philadelphia'`
- publish with
`ec-node nbc-wt-publish --url https://cdn.moq.dev/anon --name forge-nbc-sports-philly --source-url 'https://www.nbc.com/live?brand=nbc-sports-philadelphia'`
- for unattended hosts, persist the Chrome profile with `EVERY_CHANNEL_NBC_PROFILE_DIR=/path/to/profile`
- the NixOS module exposes `services.every-channel.ec-node.nbc.*` for a persistent Xvfb display plus
an optional local-only VNC bridge so MVPD auth can be completed only when the session is cold
- on Linux virtual displays, the worker disables Chrome GPU acceleration by default; only set
`EVERY_CHANNEL_NBC_ENABLE_GPU=1` if the host has a real GL-capable display path
- the forge path is also currently video-first; audio is still a follow-up item
Linux DVB sources can be added with a URL like: Linux DVB sources can be added with a URL like:
``` ```

View file

@ -0,0 +1,15 @@
# ECP-0101: Decode Archived Object Frames Before HLS Replay
## Why
`wt-archive` stores raw MoQ object frames in CAS. `wt-archive-serve` was replaying those framed bytes directly as `init.mp4` and `.m4s`, which yields HLS playlists that look valid but cannot be decoded by players.
## Decision
Decode archived object frames back into their media payload bytes before serving replay responses. If archived bytes are already raw payloads, pass them through unchanged.
## Consequences
- Local and forge replay can hand archived CMAF data to normal HLS consumers.
- Existing archives remain usable without rewriting CAS objects.
- Replay failures now point at genuinely bad archive contents rather than envelope bytes leaking over HTTP.

View file

@ -0,0 +1,35 @@
# ECP-0102: Linux Widevine NBC Worker on `ecp-forge`
## Why
NBC live playback currently exists only in the desktop app:
- macOS native `WKWebView` for in-app auth and playback capture
- local Chrome fallback for browser-driven playback capture
`ecp-forge` cannot originate an NBC live stream today because `ec-node` has no NBC source worker, no Adobe auth bootstrap flow, and no Linux browser runtime with Widevine.
## Decision
Build a forge-capable NBC source worker around Linux Google Chrome plus a persistent authenticated profile.
1. Use Google Chrome on `x86_64` Linux, not bare Chromium, as the browser runtime for NBC on forge.
2. Run Chrome in a virtual display session on `ecp-forge` so DRM playback remains in a normal browser path even when no physical display is attached.
Keep the forge worker on a dedicated display number and disable GPU acceleration by default under Xvfb.
3. Persist the NBC/Adobe browser profile on forge so auth warmup is amortized across runs.
4. Add an explicit bootstrap path that surfaces interactive auth only when the stored session is cold or expired.
5. Launch Chrome as an external process and attach over DevTools rather than relying on the crate-managed Chrome launcher path.
6. Publish the resulting captured stream through the existing `ec-node` relay path so archive, replay, and downstream nodes stay unchanged.
7. Expose the forge operator surface as `ec-node nbc-bootstrap` and `ec-node nbc-wt-publish`, with a persistent virtual display on `ecp-forge`.
## Consequences
- NBC live origin becomes possible from forge without depending on a user's macOS desktop session.
- We keep the existing every.channel transport/archive model and only replace the source worker.
- The first forge implementation should be browser-frame capture first; audio and full unattended auth renewal can follow once the Linux worker is stable.
## Rejected Alternatives
- Reuse the macOS `WKWebView` path on forge: impossible on Linux.
- Depend on bare Chromium only: rejected because Widevine support is the central requirement.
- Require true headless-only DRM playback from the start: rejected because the safer first implementation is a normal Chrome session inside a virtual display.

View file

@ -0,0 +1,24 @@
# ECP-0103: Mullvad Philadelphia Egress for Forge NBC Philadelphia
## Why
The forge-side NBC worker is currently dependent on a reverse-tunneled proxy for US egress.
That is enough to prove the geo-boundary, but it is the wrong long-term operator shape for `NBC Sports Philadelphia`.
## Decision
1. Enable the Mullvad daemon on `ecp-forge`.
2. Keep the Mullvad account number out of committed Nix configuration; log in operationally from founder-provided material.
3. Use a Philadelphia Mullvad relay for `NBC Sports Philadelphia` work on forge.
4. Start the forge NBC publish worker after the Mullvad daemon is available.
## Consequences
- Forge NBC egress becomes self-contained instead of depending on a local reverse proxy.
- The account credential stays operational-only rather than being copied into repo config.
- Relay choice remains runtime-controlled, so it can be swapped if a specific Philadelphia host degrades.
## Rejected Alternatives
- Keep relying on the reverse-tunneled local proxy: rejected because it couples forge origin to a founder workstation.
- Commit the Mullvad account number into NixOS config: rejected because it expands secret exposure for no benefit.

View file

@ -0,0 +1,45 @@
# ECP-0104: `ecp-forge` dual Ethereum full nodes on dedicated NVMe ZFS
## Why
`every.channel` now has OP Stack and Ethereum contract rails on `ecp-forge`, but it still depends on
public L1 RPC and beacon providers for Ethereum itself.
The forge host has enough raw capacity in `tank`, but `tank` is a large HDD `raidz3` pool that is
well suited for archive bytes, not for the random-read/write pattern of concurrent Ethereum
execution and consensus sync. The same host also has a free 7 TB NVMe device, which is the right
media class for a self-hosted node.
## Decision
1. Create a dedicated single-device ZFS pool for Ethereum on the free NVMe in `ecp-forge`.
2. Run two full-sync Ethereum node pairs on that pool:
- Ethereum mainnet
- Ethereum Sepolia
3. Use Reth for execution and Lighthouse for consensus.
4. Keep raw execution and beacon RPC local-only on `ecp-forge` for the first tranche.
5. Publish sync and finality status on `https://eth.every.channel`.
6. Because this tranche is explicitly full-sync from genesis, run Lighthouse with
`--allow-insecure-genesis-sync` instead of checkpoint bootstrap.
## Consequences
- `every.channel` gets repo-owned L1 execution and consensus rails for both mainnet and Sepolia.
- Ethereum state no longer competes with `tank` archive I/O.
- The first public surface is intentionally conservative: health and sync visibility, not
unauthenticated public JSON-RPC.
- The dedicated Ethereum pool is single-device NVMe, so it optimizes for node performance rather
than storage redundancy.
- Consensus sync starts from genesis without checkpoint assistance, which is slower but matches the
requested full-sync posture.
## Rejected Alternatives
- Put Ethereum state on `tank`: rejected because the HDD `raidz3` pool is the wrong latency profile
for concurrent execution and consensus sync.
- Keep using only public upstream Ethereum providers: rejected because the OP Stack and settlement
rails should not depend on third-party RPC availability.
- Expose raw JSON-RPC on `eth.every.channel` immediately: rejected for the first tranche because it
creates an unauthenticated public abuse surface before auth and rate policy exists.
- Use checkpoint sync for Lighthouse: rejected for this tranche because the requested posture is
full sync from genesis on both networks.

View file

@ -15,6 +15,7 @@
ec-runner = import ./nix/modules/ec-runner.nix; ec-runner = import ./nix/modules/ec-runner.nix;
ec-netboot = import ./nix/modules/ec-netboot.nix; ec-netboot = import ./nix/modules/ec-netboot.nix;
ec-ipxe-qemu = import ./nix/modules/ec-ipxe-qemu.nix; ec-ipxe-qemu = import ./nix/modules/ec-ipxe-qemu.nix;
ec-ethereum = import ./nix/modules/ec-ethereum.nix;
ec-op-stack = import ./nix/modules/ec-op-stack.nix; ec-op-stack = import ./nix/modules/ec-op-stack.nix;
ec-publisher-guest = import ./nix/modules/ec-publisher-guest.nix; ec-publisher-guest = import ./nix/modules/ec-publisher-guest.nix;
default = ec-node; default = ec-node;
@ -58,6 +59,7 @@
self.nixosModules.ec-node self.nixosModules.ec-node
self.nixosModules.ec-netboot self.nixosModules.ec-netboot
self.nixosModules.ec-ipxe-qemu self.nixosModules.ec-ipxe-qemu
self.nixosModules.ec-ethereum
self.nixosModules.ec-op-stack self.nixosModules.ec-op-stack
./nix/nixos/ecp-forge.nix ./nix/nixos/ecp-forge.nix
]; ];

485
nix/modules/ec-ethereum.nix Normal file
View file

@ -0,0 +1,485 @@
# every.channel dual-network Ethereum full nodes: Reth (execution) + Lighthouse
# (consensus) for mainnet and Sepolia on a dedicated ZFS pool. Raw JSON-RPC,
# Engine API, and Beacon API bind to 127.0.0.1 only; an optional Caddy vhost
# publishes sync/finality status.
{ lib, config, pkgs, ... }:
let
  cfg = config.services.every-channel.ethereum;

  # Per-network options submodule; `defaults` carries the network-specific
  # port numbers so mainnet and Sepolia never collide on a host-network setup.
  mkNetworkSubmodule =
    name: defaults:
    { ... }:
    {
      options = {
        enable = lib.mkOption {
          type = lib.types.bool;
          default = true;
          description = "Whether to run the ${name} Ethereum execution and consensus pair.";
        };
        rootDir = lib.mkOption {
          type = lib.types.str;
          default = "${cfg.rootDir}/${name}";
          description = "Persistent root directory for the ${name} node state.";
        };
        # Reth (execution client) port surface.
        reth = {
          httpPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.rethHttpPort;
            description = "Local HTTP JSON-RPC port for the ${name} Reth node.";
          };
          wsPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.rethWsPort;
            description = "Local WebSocket JSON-RPC port for the ${name} Reth node.";
          };
          authPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.rethAuthPort;
            description = "Local Engine API port for the ${name} Reth node.";
          };
          p2pPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.rethP2pPort;
            description = "RLPx/P2P TCP port for the ${name} Reth node.";
          };
          discoveryPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.rethDiscoveryPort;
            description = "Discovery UDP port for the ${name} Reth node.";
          };
          metricsPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.rethMetricsPort;
            description = "Prometheus port for the ${name} Reth node.";
          };
        };
        # Lighthouse (consensus client) port surface.
        lighthouse = {
          httpPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.lighthouseHttpPort;
            description = "Local Beacon API port for the ${name} Lighthouse node.";
          };
          p2pPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.lighthouseP2pPort;
            description = "TCP libp2p port for the ${name} Lighthouse node.";
          };
          discoveryPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.lighthouseDiscoveryPort;
            description = "UDP discovery port for the ${name} Lighthouse node.";
          };
          quicPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.lighthouseQuicPort;
            description = "UDP QUIC port for the ${name} Lighthouse node.";
          };
          metricsPort = lib.mkOption {
            type = lib.types.port;
            default = defaults.lighthouseMetricsPort;
            description = "Prometheus port for the ${name} Lighthouse node.";
          };
        };
      };
    };

  networks = {
    mainnet = cfg.mainnet;
    sepolia = cfg.sepolia;
  };
  # Only enabled networks get containers, firewall openings, and datasets.
  enabledNetworks = lib.filterAttrs (_: networkCfg: networkCfg.enable) networks;

  rethContainerName = network: "every-channel-ethereum-${network}-reth";
  lighthouseContainerName = network: "every-channel-ethereum-${network}-lighthouse";

  # Shell fragment appended to the storage unit: create the per-network
  # datasets and a per-network Engine API JWT secret.
  networkDatasetLines = lib.concatStringsSep "\n" (
    lib.mapAttrsToList
      (network: networkCfg: ''
        ensure_dataset ${lib.escapeShellArg "${cfg.poolName}/${network}"}
        ensure_dataset ${lib.escapeShellArg "${cfg.poolName}/${network}/reth"}
        ensure_dataset ${lib.escapeShellArg "${cfg.poolName}/${network}/lighthouse"}
        ensure_jwt ${lib.escapeShellArg "${networkCfg.rootDir}/jwt.hex"}
      '')
      enabledNetworks
  );

  # Advertise a public IP in P2P metadata only when one is configured.
  mkNatArgs = lib.optionals (cfg.publicIp != null) [ "--nat" "extip:${cfg.publicIp}" ];
  mkEnrArgs = lib.optionals (cfg.publicIp != null) [ "--enr-address" cfg.publicIp ];

  # Reth container: full (non-archive) sync, host networking, local-only RPC,
  # state under <rootDir>/reth, Engine JWT at <rootDir>/jwt.hex.
  mkRethContainer =
    network: networkCfg: {
      image = cfg.images.reth;
      autoStart = true;
      extraOptions = [ "--network=host" ];
      volumes = [ "${networkCfg.rootDir}:/state" ];
      cmd =
        [
          "node"
          "--chain"
          network
          "--datadir"
          "/state/reth"
          "--full"
          "--http"
          "--http.addr"
          "127.0.0.1"
          "--http.port"
          (toString networkCfg.reth.httpPort)
          "--http.api"
          "eth,net,web3,rpc"
          "--ws"
          "--ws.addr"
          "127.0.0.1"
          "--ws.port"
          (toString networkCfg.reth.wsPort)
          "--ws.api"
          "eth,net,web3,rpc"
          "--authrpc.addr"
          "127.0.0.1"
          "--authrpc.port"
          (toString networkCfg.reth.authPort)
          "--authrpc.jwtsecret"
          "/state/jwt.hex"
          "--port"
          (toString networkCfg.reth.p2pPort)
          "--discovery.port"
          (toString networkCfg.reth.discoveryPort)
          "--metrics"
          "127.0.0.1:${toString networkCfg.reth.metricsPort}"
          "--log.stdout.format"
          "json"
        ]
        ++ mkNatArgs;
    };

  # Lighthouse container: beacon node pointed at the paired Reth Engine API,
  # sharing the same JWT secret; full sync from genesis (no checkpoint).
  mkLighthouseContainer =
    network: networkCfg: {
      image = cfg.images.lighthouse;
      autoStart = true;
      extraOptions = [ "--network=host" ];
      volumes = [ "${networkCfg.rootDir}:/state" ];
      entrypoint = "/usr/local/bin/lighthouse";
      cmd =
        [
          "beacon_node"
          "--network"
          network
          "--datadir"
          "/state/lighthouse"
          "--http"
          "--http-address"
          "127.0.0.1"
          "--http-port"
          (toString networkCfg.lighthouse.httpPort)
          "--execution-endpoint"
          "http://127.0.0.1:${toString networkCfg.reth.authPort}"
          "--execution-jwt"
          "/state/jwt.hex"
          # Full sync from genesis per ECP-0104; checkpoint sync deliberately not used.
          "--allow-insecure-genesis-sync"
          "--port"
          (toString networkCfg.lighthouse.p2pPort)
          "--discovery-port"
          (toString networkCfg.lighthouse.discoveryPort)
          "--quic-port"
          (toString networkCfg.lighthouse.quicPort)
          "--metrics"
          "--metrics-address"
          "127.0.0.1"
          "--metrics-port"
          (toString networkCfg.lighthouse.metricsPort)
        ]
        ++ mkEnrArgs;
    };

  # Plain-text landing page for the public status host.
  caddyRootBody = ''
    every.channel ethereum nodes
    mainnet sync: /mainnet/sync
    mainnet finality: /mainnet/finality
    sepolia sync: /sepolia/sync
    sepolia finality: /sepolia/finality
    raw execution and beacon RPC remain local-only on ecp-forge for now.
  '';
in
{
  options.services.every-channel.ethereum = {
    enable = lib.mkEnableOption "every.channel dual-network Ethereum full nodes";

    poolName = lib.mkOption {
      type = lib.types.str;
      default = "eth";
      description = "Dedicated ZFS pool name used for Ethereum node state.";
    };
    poolDevice = lib.mkOption {
      type = lib.types.nullOr lib.types.str;
      default = null;
      description = "Block device used to create the dedicated Ethereum ZFS pool if it does not already exist.";
    };
    rootDir = lib.mkOption {
      type = lib.types.str;
      default = "/eth";
      description = "Mountpoint for the dedicated Ethereum ZFS pool.";
    };
    publicIp = lib.mkOption {
      type = lib.types.nullOr lib.types.str;
      default = null;
      description = "Public IP to advertise in Ethereum P2P metadata.";
    };
    publicHost = lib.mkOption {
      type = lib.types.nullOr lib.types.str;
      default = null;
      description = "Optional HTTPS host that publishes node sync and finality surfaces.";
    };
    # Pinned OCI images so upgrades are explicit.
    images = {
      reth = lib.mkOption {
        type = lib.types.str;
        default = "ghcr.io/paradigmxyz/reth:v1.9.3";
        description = "Pinned Reth OCI image.";
      };
      lighthouse = lib.mkOption {
        type = lib.types.str;
        default = "docker.io/sigp/lighthouse:v8.1.1";
        description = "Pinned Lighthouse OCI image.";
      };
    };
    mainnet = lib.mkOption {
      type = lib.types.submodule (mkNetworkSubmodule "mainnet" {
        rethHttpPort = 8545;
        rethWsPort = 8546;
        rethAuthPort = 8551;
        rethP2pPort = 30303;
        rethDiscoveryPort = 30303;
        rethMetricsPort = 19001;
        lighthouseHttpPort = 5052;
        lighthouseP2pPort = 9000;
        lighthouseDiscoveryPort = 9000;
        lighthouseQuicPort = 9001;
        lighthouseMetricsPort = 5054;
      });
      default = { };
      description = "Mainnet Ethereum node configuration.";
    };
    sepolia = lib.mkOption {
      type = lib.types.submodule (mkNetworkSubmodule "sepolia" {
        rethHttpPort = 18545;
        rethWsPort = 18546;
        rethAuthPort = 18551;
        rethP2pPort = 31303;
        rethDiscoveryPort = 31303;
        rethMetricsPort = 29001;
        lighthouseHttpPort = 15052;
        lighthouseP2pPort = 19000;
        lighthouseDiscoveryPort = 19000;
        lighthouseQuicPort = 19001;
        lighthouseMetricsPort = 15054;
      });
      default = { };
      description = "Sepolia Ethereum node configuration.";
    };
  };

  config = lib.mkIf cfg.enable {
    assertions = [
      {
        assertion = cfg.poolDevice != null;
        message = "services.every-channel.ethereum.poolDevice must be set when the Ethereum node is enabled";
      }
      {
        assertion = enabledNetworks != { };
        message = "At least one Ethereum network must be enabled";
      }
    ];

    # Import the dedicated pool at boot; creation happens in the storage unit.
    boot.zfs.extraPools = [ cfg.poolName ];

    # Only P2P/discovery ports are world-reachable; all RPC stays loopback.
    networking.firewall = {
      allowedTCPPorts =
        lib.flatten (
          lib.mapAttrsToList
            (_: networkCfg: [
              networkCfg.reth.p2pPort
              networkCfg.lighthouse.p2pPort
            ])
            enabledNetworks
        );
      allowedUDPPorts =
        lib.flatten (
          lib.mapAttrsToList
            (_: networkCfg: [
              networkCfg.reth.discoveryPort
              networkCfg.lighthouse.discoveryPort
              networkCfg.lighthouse.quicPort
            ])
            enabledNetworks
        );
    };

    # One Reth and one Lighthouse container per enabled network.
    virtualisation.oci-containers.containers =
      (lib.mapAttrs'
        (network: networkCfg:
          lib.nameValuePair (rethContainerName network) (mkRethContainer network networkCfg))
        enabledNetworks)
      // (lib.mapAttrs'
        (network: networkCfg:
          lib.nameValuePair (lighthouseContainerName network) (mkLighthouseContainer network networkCfg))
        enabledNetworks);

    systemd.services =
      {
        # Oneshot bootstrap: creates the pool (first run only, refusing to
        # clobber a device with existing signatures), the per-network datasets,
        # and the Engine API JWT secrets before any container starts.
        every-channel-ethereum-storage = {
          description = "every.channel Ethereum NVMe ZFS pool and dataset bootstrap";
          wantedBy = [ "multi-user.target" ];
          after = [ "local-fs.target" "zfs.target" ];
          wants = [ "zfs.target" ];
          before =
            lib.flatten (
              lib.mapAttrsToList
                (network: _: [
                  "podman-${rethContainerName network}.service"
                  "podman-${lighthouseContainerName network}.service"
                ])
                enabledNetworks
            );
          path = with pkgs; [
            coreutils
            openssl
            util-linux
            zfs
          ];
          serviceConfig = {
            Type = "oneshot";
            RemainAfterExit = true;
          };
          script = ''
            set -euo pipefail
            pool=${lib.escapeShellArg cfg.poolName}
            root_dir=${lib.escapeShellArg cfg.rootDir}
            device=${lib.escapeShellArg cfg.poolDevice}
            ensure_dataset() {
              local dataset="$1"
              if ! zfs list -H "$dataset" >/dev/null 2>&1; then
                zfs create -p "$dataset"
              fi
              zfs set atime=off compression=lz4 xattr=sa "$dataset" >/dev/null
            }
            ensure_jwt() {
              local path="$1"
              if [[ ! -s "$path" ]]; then
                umask 077
                openssl rand -hex 32 | tr -d '\n' > "$path"
                printf '\n' >> "$path"
              fi
              chmod 0400 "$path"
            }
            if ! zpool list -H "$pool" >/dev/null 2>&1; then
              if [[ -z "$device" ]]; then
                echo "every-channel-ethereum-storage: missing poolDevice for pool $pool" >&2
                exit 1
              fi
              if [[ ! -b "$device" ]]; then
                echo "every-channel-ethereum-storage: device $device not present" >&2
                exit 1
              fi
              if blkid "$device" >/dev/null 2>&1; then
                echo "every-channel-ethereum-storage: device $device already has signatures; refusing to overwrite automatically" >&2
                exit 1
              fi
              zpool create -f \
                -o ashift=12 \
                -O mountpoint="$root_dir" \
                -O atime=off \
                -O compression=lz4 \
                -O xattr=sa \
                "$pool" "$device"
            else
              zfs set mountpoint="$root_dir" "$pool" >/dev/null
            fi
            ${networkDatasetLines}
          '';
        };
      }
      # Reth units wait for network + storage and require the state mount.
      // (lib.mapAttrs'
        (network: networkCfg:
          lib.nameValuePair "podman-${rethContainerName network}" {
            after = [ "network-online.target" "every-channel-ethereum-storage.service" ];
            wants = [ "network-online.target" "every-channel-ethereum-storage.service" ];
            requires = [ "every-channel-ethereum-storage.service" ];
            unitConfig.RequiresMountsFor = [ networkCfg.rootDir ];
          })
        enabledNetworks)
      # Lighthouse additionally orders after its paired Reth (Engine API dep).
      // (lib.mapAttrs'
        (network: networkCfg:
          lib.nameValuePair "podman-${lighthouseContainerName network}" {
            after = [
              "network-online.target"
              "every-channel-ethereum-storage.service"
              "podman-${rethContainerName network}.service"
            ];
            wants = [
              "network-online.target"
              "every-channel-ethereum-storage.service"
              "podman-${rethContainerName network}.service"
            ];
            requires = [
              "every-channel-ethereum-storage.service"
              "podman-${rethContainerName network}.service"
            ];
            unitConfig.RequiresMountsFor = [ networkCfg.rootDir ];
          })
        enabledNetworks);

    # Optional public status host: rewrites friendly paths onto the local
    # Beacon API syncing/finality endpoints; nothing else is exposed.
    services.caddy.virtualHosts = lib.mkIf (cfg.publicHost != null) {
      "${cfg.publicHost}".extraConfig = ''
        encode zstd gzip
        handle /mainnet/sync {
          uri replace /mainnet/sync /eth/v1/node/syncing
          reverse_proxy http://127.0.0.1:${toString cfg.mainnet.lighthouse.httpPort}
        }
        handle /mainnet/finality {
          uri replace /mainnet/finality /eth/v1/beacon/states/head/finality_checkpoints
          reverse_proxy http://127.0.0.1:${toString cfg.mainnet.lighthouse.httpPort}
        }
        handle /sepolia/sync {
          uri replace /sepolia/sync /eth/v1/node/syncing
          reverse_proxy http://127.0.0.1:${toString cfg.sepolia.lighthouse.httpPort}
        }
        handle /sepolia/finality {
          uri replace /sepolia/finality /eth/v1/beacon/states/head/finality_checkpoints
          reverse_proxy http://127.0.0.1:${toString cfg.sepolia.lighthouse.httpPort}
        }
        handle {
          header Content-Type text/plain
          respond "${caddyRootBody}" 200
        }
      '';
    };
  };
}

View file

@ -266,6 +266,70 @@ in
}; };
}; };
nbc = {
enable = lib.mkOption {
type = lib.types.bool;
default = false;
description = "Enable Linux Chrome + virtual-display support for NBC browser-backed broadcasts.";
};
chromeBinary = lib.mkOption {
type = lib.types.str;
default = "/run/current-system/sw/bin/google-chrome-stable";
description = "Chrome binary used by `ec-node nbc-bootstrap` and `ec-node nbc-wt-publish`.";
};
profileDir = lib.mkOption {
type = lib.types.str;
default = "/var/lib/every-channel/nbc-profile";
description = "Persistent Chrome profile directory used for NBC / Adobe auth sessions.";
};
authScreenshotDir = lib.mkOption {
type = lib.types.str;
default = "/var/lib/every-channel/nbc-auth";
description = "Directory for operator-facing screenshots when bootstrap hits an interactive auth page.";
};
display = lib.mkOption {
type = lib.types.str;
default = ":99";
description = "DISPLAY used by the NBC virtual display session.";
};
screen = lib.mkOption {
type = lib.types.str;
default = "1920x1080x24";
description = "Xvfb screen geometry for the NBC virtual display.";
};
noSandbox = lib.mkOption {
type = lib.types.bool;
default = true;
description = "Pass `EVERY_CHANNEL_NBC_NO_SANDBOX=1` for Chrome worker sessions.";
};
vnc = {
enable = lib.mkOption {
type = lib.types.bool;
default = true;
description = "Expose the NBC virtual display over VNC so auth can be completed remotely when needed.";
};
listen = lib.mkOption {
type = lib.types.str;
default = "127.0.0.1";
description = "x11vnc listen address for the NBC virtual display.";
};
port = lib.mkOption {
type = lib.types.port;
default = 5900;
description = "x11vnc TCP port for the NBC virtual display.";
};
};
};
broadcasts = lib.mkOption { broadcasts = lib.mkOption {
type = lib.types.listOf (lib.types.submodule { type = lib.types.listOf (lib.types.submodule {
options = { options = {
@ -283,10 +347,15 @@ in
default = null; default = null;
description = "Optional explicit ffmpeg input URL/file. When set, HDHomeRun settings are ignored for this broadcast."; description = "Optional explicit ffmpeg input URL/file. When set, HDHomeRun settings are ignored for this broadcast.";
}; };
nbcUrl = lib.mkOption {
type = lib.types.nullOr lib.types.str;
default = null;
description = "Optional NBC watch/live URL for a browser-backed relay publish worker.";
};
}; };
}); });
default = [ ]; default = [ ];
description = "List of broadcasts (name + channel, or explicit input) to publish."; description = "List of broadcasts (HDHomeRun, explicit input, or NBC browser-backed URL) to publish.";
}; };
}; };
@ -299,7 +368,7 @@ in
{ {
assertion = assertion =
let let
needsHdhr = builtins.any (b: b.input == null) cfg.broadcasts; needsHdhr = builtins.any (b: b.input == null && b.nbcUrl == null) cfg.broadcasts;
in in
(!needsHdhr) || (cfg.hdhomerun.host != null) || (cfg.hdhomerun.deviceId != null); (!needsHdhr) || (cfg.hdhomerun.host != null) || (cfg.hdhomerun.deviceId != null);
message = "Set services.every-channel.ec-node.hdhomerun.host or .deviceId (required when any broadcast omits `input`)"; message = "Set services.every-channel.ec-node.hdhomerun.host or .deviceId (required when any broadcast omits `input`)";
@ -309,8 +378,20 @@ in
message = "hdhomerun.autoDiscover only applies when hdhomerun.host is unset"; message = "hdhomerun.autoDiscover only applies when hdhomerun.host is unset";
} }
{ {
assertion = builtins.all (b: (b.input != null) || (b.channel != null)) cfg.broadcasts; assertion =
message = "Each broadcast must set either `input` or `channel`"; builtins.all
(b:
(lib.length (lib.filter (value: value != null) [ b.input b.channel b.nbcUrl ])) == 1)
cfg.broadcasts;
message = "Each broadcast must set exactly one of `input`, `channel`, or `nbcUrl`";
}
{
assertion =
let
hasNbcBroadcast = builtins.any (b: b.nbcUrl != null) cfg.broadcasts;
in
(!hasNbcBroadcast) || cfg.nbc.enable;
message = "Set services.every-channel.ec-node.nbc.enable = true before configuring `broadcasts.*.nbcUrl`";
} }
]; ];
@ -318,16 +399,30 @@ in
[ [
"d /run/every-channel 1777 root root - -" "d /run/every-channel 1777 root root - -"
] ]
++ lib.optionals cfg.nbc.enable [
"d /var/lib/every-channel 0750 every-channel every-channel - -"
"d ${cfg.nbc.profileDir} 0750 every-channel every-channel - -"
"d ${cfg.nbc.authScreenshotDir} 0750 every-channel every-channel - -"
]
++ lib.optionals cfg.archive.enable [ ++ lib.optionals cfg.archive.enable [
"d ${cfg.archive.outputDir} 0750 root root - -" "d ${cfg.archive.outputDir} 0750 root root - -"
"d ${cfg.archive.manifestDir} 0750 root root - -" "d ${cfg.archive.manifestDir} 0750 root root - -"
]; ];
users.groups.every-channel = lib.mkIf cfg.nbc.enable { };
users.users.every-channel = lib.mkIf cfg.nbc.enable {
isSystemUser = true;
group = "every-channel";
home = "/var/lib/every-channel";
createHome = true;
};
systemd.services = systemd.services =
lib.listToAttrs (map lib.listToAttrs (map
(b: (b:
let let
unit = "every-channel-wt-publish-${sanitizeUnitName b.name}"; unit = "every-channel-wt-publish-${sanitizeUnitName b.name}";
isNbc = b.nbcUrl != null;
runner = pkgs.writeShellApplication { runner = pkgs.writeShellApplication {
name = unit; name = unit;
runtimeInputs = runtimeInputs =
@ -352,6 +447,7 @@ in
"cmd+=(${lib.concatStringsSep " " (map lib.escapeShellArg cfg.extraArgs)})"; "cmd+=(${lib.concatStringsSep " " (map lib.escapeShellArg cfg.extraArgs)})";
explicitInputStr = if b.input == null then "" else b.input; explicitInputStr = if b.input == null then "" else b.input;
channelStr = if b.channel == null then "" else b.channel; channelStr = if b.channel == null then "" else b.channel;
nbcUrlStr = if b.nbcUrl == null then "" else b.nbcUrl;
controlEndpointOutPath = "/run/every-channel/control-peer-${sanitizeUnitName b.name}.json"; controlEndpointOutPath = "/run/every-channel/control-peer-${sanitizeUnitName b.name}.json";
controlDiscoveryStr = if cfg.control.discovery == null then "" else cfg.control.discovery; controlDiscoveryStr = if cfg.control.discovery == null then "" else cfg.control.discovery;
controlIrohSecretStr = if cfg.control.irohSecret == null then "" else cfg.control.irohSecret; controlIrohSecretStr = if cfg.control.irohSecret == null then "" else cfg.control.irohSecret;
@ -360,14 +456,16 @@ in
'' ''
set -euo pipefail set -euo pipefail
nbc_url=${lib.escapeShellArg nbcUrlStr}
input="" input=""
if [[ -z "$nbc_url" ]]; then
explicit_input=${lib.escapeShellArg explicitInputStr} explicit_input=${lib.escapeShellArg explicitInputStr}
if [[ -n "$explicit_input" ]]; then if [[ -n "$explicit_input" ]]; then
input="$explicit_input" input="$explicit_input"
else else
ch=${lib.escapeShellArg channelStr} ch=${lib.escapeShellArg channelStr}
if [[ -z "$ch" ]]; then if [[ -z "$ch" ]]; then
echo "ec-node: broadcast missing both input and channel" >&2 echo "ec-node: broadcast missing input, channel, and nbcUrl" >&2
exit 2 exit 2
fi fi
@ -401,10 +499,8 @@ in
} }
if ${lib.boolToString cfg.hdhomerun.autoDiscover}; then if ${lib.boolToString cfg.hdhomerun.autoDiscover}; then
# Primary: UDP broadcast discover.
base="$(${cfg.discoveryPackage}/bin/ec-cli discover | jq -r --arg id "$dev_id" '.[] | select(.id == $id) | .base_url // empty' | head -n1 || true)" base="$(${cfg.discoveryPackage}/bin/ec-cli discover | jq -r --arg id "$dev_id" '.[] | select(.id == $id) | .base_url // empty' | head -n1 || true)"
# Fallback: probe known neighbors for /discover.json (fast; avoids full /24 scan).
if [[ -z "$base" ]]; then if [[ -z "$base" ]]; then
while read -r ip; do while read -r ip; do
found="$(try_ip "$ip" || true)" found="$(try_ip "$ip" || true)"
@ -415,7 +511,6 @@ in
done < <(ip neigh | awk '{print $1}' | sort -u) done < <(ip neigh | awk '{print $1}' | sort -u)
fi fi
# Fallback: scan local /24 subnets for /discover.json (slow; worst-case ~50s).
if [[ -z "$base" ]]; then if [[ -z "$base" ]]; then
while read -r cidr; do while read -r cidr; do
ip_addr="''${cidr%/*}" ip_addr="''${cidr%/*}"
@ -442,7 +537,6 @@ in
exit 2 exit 2
fi fi
else else
# Best-effort mDNS convention.
base="http://$dev_id.local" base="http://$dev_id.local"
fi fi
fi fi
@ -452,14 +546,23 @@ in
base="http://$base" base="http://$base"
fi fi
# HDHomeRun streaming is on port 5004, regardless of the discover BaseURL.
hostport="''${base#http://}" hostport="''${base#http://}"
hostport="''${hostport#https://}" hostport="''${hostport#https://}"
hostport="''${hostport%%/*}" hostport="''${hostport%%/*}"
host="''${hostport%%:*}" host="''${hostport%%:*}"
input="http://$host:5004/auto/v$ch" input="http://$host:5004/auto/v$ch"
fi fi
fi
if [[ -n "$nbc_url" ]]; then
cmd=(
${lib.escapeShellArg "${cfg.package}/bin/ec-node"}
nbc-wt-publish
--url ${lib.escapeShellArg cfg.relayUrl}
--name ${lib.escapeShellArg b.name}
--source-url "$nbc_url"
)
else
cmd=( cmd=(
${lib.escapeShellArg "${cfg.package}/bin/ec-node"} ${lib.escapeShellArg "${cfg.package}/bin/ec-node"}
wt-publish wt-publish
@ -468,6 +571,7 @@ in
--input "$input" --input "$input"
) )
${lib.optionalString (!cfg.transcode) "cmd+=(--transcode=false)"} ${lib.optionalString (!cfg.transcode) "cmd+=(--transcode=false)"}
fi
${lib.optionalString (!cfg.passthrough) "cmd+=(--passthrough=false)"} ${lib.optionalString (!cfg.passthrough) "cmd+=(--passthrough=false)"}
${lib.optionalString cfg.tlsDisableVerify "cmd+=(--tls-disable-verify)"} ${lib.optionalString cfg.tlsDisableVerify "cmd+=(--tls-disable-verify)"}
${lib.optionalString cfg.control.enable '' ${lib.optionalString cfg.control.enable ''
@ -503,8 +607,12 @@ in
value = { value = {
description = "every.channel WebTransport publish (${b.name} -> ${cfg.relayUrl})"; description = "every.channel WebTransport publish (${b.name} -> ${cfg.relayUrl})";
wantedBy = [ "multi-user.target" ]; wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ]; after =
wants = [ "network-online.target" ]; [ "network-online.target" ]
++ lib.optionals isNbc [ "every-channel-nbc-display.service" ];
wants =
[ "network-online.target" ]
++ lib.optionals isNbc [ "every-channel-nbc-display.service" ];
# Keep the unit from entering "failed" due to rapid restarts (deploy-flake treats # Keep the unit from entering "failed" due to rapid restarts (deploy-flake treats
# failed units during `switch-to-configuration test` as a deployment failure). # failed units during `switch-to-configuration test` as a deployment failure).
@ -521,23 +629,34 @@ in
Restart = "always"; Restart = "always";
RestartSec = 2; RestartSec = 2;
DynamicUser = true; DynamicUser = !isNbc;
User = lib.mkIf isNbc "every-channel";
Group = lib.mkIf isNbc "every-channel";
NoNewPrivileges = true; NoNewPrivileges = true;
PrivateTmp = true; PrivateTmp = !isNbc;
ProtectSystem = "strict"; ProtectSystem = "strict";
ProtectHome = true; ProtectHome = !isNbc;
ProtectKernelTunables = true; ProtectKernelTunables = true;
ProtectKernelModules = true; ProtectKernelModules = true;
ProtectControlGroups = true; ProtectControlGroups = true;
LockPersonality = true; LockPersonality = true;
MemoryDenyWriteExecute = true; MemoryDenyWriteExecute = !isNbc;
RestrictSUIDSGID = true; RestrictSUIDSGID = true;
RestrictRealtime = true; RestrictRealtime = true;
SystemCallArchitectures = "native"; SystemCallArchitectures = "native";
ReadWritePaths = lib.optionals cfg.control.enable [ "/run/every-channel" ]; ReadWritePaths =
lib.optionals cfg.control.enable [ "/run/every-channel" ]
++ lib.optionals isNbc [ cfg.nbc.profileDir cfg.nbc.authScreenshotDir ];
}; };
environment = cfg.environment; environment =
cfg.environment
// lib.optionalAttrs isNbc {
DISPLAY = cfg.nbc.display;
EVERY_CHANNEL_NBC_CHROME_PATH = cfg.nbc.chromeBinary;
EVERY_CHANNEL_NBC_PROFILE_DIR = cfg.nbc.profileDir;
EVERY_CHANNEL_NBC_NO_SANDBOX = if cfg.nbc.noSandbox then "1" else "0";
};
}; };
}) })
cfg.broadcasts) cfg.broadcasts)
@ -852,6 +971,112 @@ in
environment = cfg.environment; environment = cfg.environment;
}; };
}); })
// lib.optionalAttrs cfg.nbc.enable
(let
displayUnit = "every-channel-nbc-display";
displayNumber = lib.strings.removePrefix ":" cfg.nbc.display;
displayRunner = pkgs.writeShellApplication {
name = displayUnit;
runtimeInputs = [
pkgs.coreutils
pkgs.xorg.xorgserver
];
text = ''
set -euo pipefail
exec ${pkgs.xorg.xorgserver}/bin/Xvfb ${lib.escapeShellArg cfg.nbc.display} \
-screen 0 ${lib.escapeShellArg cfg.nbc.screen} \
-nolisten tcp \
-ac \
+extension RANDR
'';
};
vncUnit = "every-channel-nbc-vnc";
vncRunner = pkgs.writeShellApplication {
name = vncUnit;
runtimeInputs = [
pkgs.x11vnc
];
text = ''
set -euo pipefail
exec ${pkgs.x11vnc}/bin/x11vnc \
-display ${lib.escapeShellArg cfg.nbc.display} \
-forever \
-shared \
-nopw \
-listen ${lib.escapeShellArg cfg.nbc.vnc.listen} \
-rfbport ${toString cfg.nbc.vnc.port}
'';
};
in
({
"${displayUnit}" = {
description = "every.channel NBC virtual display";
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" ];
wants = [ "network-online.target" ];
serviceConfig = {
Type = "simple";
ExecStart = "${displayRunner}/bin/${displayUnit}";
Restart = "always";
RestartSec = 2;
User = "every-channel";
Group = "every-channel";
NoNewPrivileges = true;
PrivateTmp = false;
ProtectSystem = "strict";
ProtectHome = false;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
LockPersonality = true;
MemoryDenyWriteExecute = false;
RestrictSUIDSGID = true;
RestrictRealtime = true;
SystemCallArchitectures = "native";
ReadWritePaths = [ "/tmp" "/var/lib/every-channel" ];
};
environment = cfg.environment // {
HOME = "/var/lib/every-channel";
};
};
}
// lib.optionalAttrs cfg.nbc.vnc.enable {
"${vncUnit}" = {
description = "every.channel NBC virtual display VNC bridge";
wantedBy = [ "multi-user.target" ];
after = [ "network-online.target" "${displayUnit}.service" ];
wants = [ "network-online.target" "${displayUnit}.service" ];
serviceConfig = {
Type = "simple";
ExecStart = "${vncRunner}/bin/${vncUnit}";
Restart = "always";
RestartSec = 2;
User = "every-channel";
Group = "every-channel";
NoNewPrivileges = true;
PrivateTmp = false;
ProtectSystem = "strict";
ProtectHome = false;
ProtectKernelTunables = true;
ProtectKernelModules = true;
ProtectControlGroups = true;
LockPersonality = true;
MemoryDenyWriteExecute = false;
RestrictSUIDSGID = true;
RestrictRealtime = true;
SystemCallArchitectures = "native";
ReadWritePaths = [ "/tmp" "/var/lib/every-channel" ];
};
environment = cfg.environment // {
DISPLAY = cfg.nbc.display;
HOME = "/var/lib/every-channel";
};
};
}));
}; };
} }

View file

@ -11,6 +11,12 @@ in
./ecp-forge-hardware.nix ./ecp-forge-hardware.nix
]; ];
nixpkgs.config.allowUnfreePredicate = pkg:
builtins.elem (lib.getName pkg) [
"google-chrome"
"google-chrome-stable"
];
networking = { networking = {
hostName = "ecp-forge"; hostName = "ecp-forge";
hostId = "007f0200"; hostId = "007f0200";
@ -252,6 +258,24 @@ in
services.every-channel.ec-node = { services.every-channel.ec-node = {
enable = true; enable = true;
nbc = {
enable = true;
chromeBinary = "${pkgs.google-chrome}/bin/google-chrome-stable";
display = ":120";
screen = "1920x1080x24";
noSandbox = true;
vnc = {
enable = true;
listen = "127.0.0.1";
port = 5900;
};
};
broadcasts = [
{
name = "forge-nbc-sports-philly";
nbcUrl = "https://www.nbc.com/live?brand=nbc-sports-philadelphia";
}
];
archive = { archive = {
enable = true; enable = true;
outputDir = "/tank/every-channel/archive"; outputDir = "/tank/every-channel/archive";
@ -261,6 +285,25 @@ in
}; };
}; };
services.every-channel.ethereum = {
enable = true;
poolName = "eth";
poolDevice = "/dev/disk/by-id/nvme-eui.01000000000000008ce38ee307de5c01";
rootDir = "/eth";
publicIp = "95.216.114.54";
publicHost = "eth.every.channel";
};
services.mullvad-vpn = {
enable = true;
enableExcludeWrapper = true;
};
systemd.services.every-channel-wt-publish-forge-nbc-sports-philly = {
after = [ "mullvad-daemon.service" ];
wants = [ "mullvad-daemon.service" ];
};
services.every-channel.op-stack = { services.every-channel.op-stack = {
enable = hasOpStackSepoliaKey; enable = hasOpStackSepoliaKey;
challengerEnable = hasOpStackChallengerPrestate; challengerEnable = hasOpStackChallengerPrestate;
@ -276,12 +319,19 @@ in
p2pAdvertiseIp = "95.216.114.54"; p2pAdvertiseIp = "95.216.114.54";
}; };
environment.systemPackages = with pkgs; [ environment.systemPackages =
(with pkgs; [
git git
google-chrome
htop htop
jq jq
mullvad-vpn
tmux tmux
x11vnc
zfs zfs
])
++ [
config.services.every-channel.ec-node.package
]; ];
system.stateVersion = "22.11"; system.stateVersion = "22.11";

View file

@ -12,7 +12,17 @@ let
let let
base = baseNameOf path; base = baseNameOf path;
in in
!(base == "target" || base == ".git" || base == ".direnv" || base == "tmp" || base == "node_modules"); !(base == "target"
|| base == ".git"
|| base == ".direnv"
|| base == "tmp"
|| base == "node_modules"
|| base == "out"
|| base == "test-results"
|| base == "deploy"
|| base == "intake"
|| base == "cache"
|| base == ".tower-minimal");
}; };
in in
rustPlatform.buildRustPackage { rustPlatform.buildRustPackage {

View file

@ -1,5 +1,6 @@
{ lib { lib
, rustPlatform , rustPlatform
, rustfmt
, stdenv , stdenv
, pkg-config , pkg-config
, openssl , openssl
@ -14,7 +15,17 @@ let
base = baseNameOf path; base = baseNameOf path;
in in
# Skip typical build outputs and large scratch dirs. # Skip typical build outputs and large scratch dirs.
!(base == "target" || base == ".git" || base == ".direnv" || base == "tmp" || base == "node_modules"); !(base == "target"
|| base == ".git"
|| base == ".direnv"
|| base == "tmp"
|| base == "node_modules"
|| base == "out"
|| base == "test-results"
|| base == "deploy"
|| base == "intake"
|| base == "cache"
|| base == ".tower-minimal");
}; };
in in
rustPlatform.buildRustPackage { rustPlatform.buildRustPackage {
@ -30,6 +41,7 @@ rustPlatform.buildRustPackage {
nativeBuildInputs = [ nativeBuildInputs = [
pkg-config pkg-config
rustfmt
]; ];
buildInputs = buildInputs =