ec-node: add relay CAS archiver and nix auto-archive service

This commit is contained in:
every.channel 2026-02-24 02:50:14 -08:00
parent f70d4a02fd
commit 656ec11c73
No known key found for this signature in database
4 changed files with 823 additions and 6 deletions

View file

@ -32,7 +32,7 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::fs;
use std::fs::File;
use std::future::Future;
use std::io::Read;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::str::FromStr;
@ -48,6 +48,7 @@ const DIRECT_WIRE_TAG_PING: u8 = 0x02;
const DIRECT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(8);
// Conservatively under typical SCTP data channel max message sizes.
const DIRECT_WIRE_CHUNK_BYTES: usize = 16 * 1024;
const WT_ARCHIVE_DEFAULT_TRACKS: &[&str] = &["catalog.json", "init.mp4", "video0.m4s", "audio0.m4s"];
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::RwLock;
@ -80,6 +81,8 @@ enum Commands {
WsSubscribe(WsSubscribeArgs),
/// Publish a CMAF (fMP4) stream to a MoQ relay over WebTransport (Cloudflare preview by default).
WtPublish(WtPublishArgs),
/// Subscribe to a relay broadcast over WebTransport/MoQ and archive groups into CAS.
WtArchive(WtArchiveArgs),
/// Announce stream transport availability over iroh gossip control topic.
ControlAnnounce(ControlAnnounceArgs),
/// Listen for stream transport announcements from iroh gossip control topic.
@ -456,6 +459,30 @@ struct WtPublishArgs {
control_endpoint_addr_out: Option<PathBuf>,
}
// CLI arguments for the `wt-archive` subcommand: subscribe to a relay
// broadcast over WebTransport/MoQ and archive received groups into the
// on-disk content-addressed store (CAS).
//
// NOTE: the `///` doc comments below are rendered by clap as `--help`
// text (runtime strings), so they are intentionally left untouched.
#[derive(Parser, Debug)]
struct WtArchiveArgs {
    /// Relay URL (WebTransport) to connect to.
    #[arg(long, default_value = "https://cdn.moq.dev/anon")]
    url: String,
    /// Broadcast name to subscribe to.
    // Required (no default): identifies the broadcast path on the relay.
    #[arg(long)]
    name: String,
    /// Output directory for CAS objects.
    // CAS blobs land under `<output_dir>/objects/blake3/...`.
    #[arg(long, default_value = "./tmp/wt-archive")]
    output_dir: PathBuf,
    /// Optional manifest/index output directory.
    /// Defaults to `<output-dir>/manifests` when omitted.
    #[arg(long)]
    manifest_dir: Option<PathBuf>,
    /// Track names to archive (repeatable).
    /// When omitted, defaults to catalog+init+primary audio/video tracks.
    // Repeatable flag: `--track a --track b` collects into this Vec.
    #[arg(long)]
    track: Vec<String>,
    /// Danger: disable TLS verification for the relay.
    // Only for local/dev relays with self-signed certs.
    #[arg(long, default_value_t = false)]
    tls_disable_verify: bool,
}
#[derive(Parser, Debug)]
struct ControlAnnounceArgs {
/// Stable stream id to announce.
@ -666,6 +693,7 @@ fn main() -> Result<()> {
Commands::WsPublish(args) => run_async(ws_publish(args))?,
Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?,
Commands::WtPublish(args) => run_async(wt_publish(args))?,
Commands::WtArchive(args) => run_async(wt_archive(args))?,
Commands::ControlAnnounce(args) => run_async(control_announce(args))?,
Commands::ControlListen(args) => run_async(control_listen(args))?,
Commands::ControlResolve(args) => run_async(control_resolve(args))?,
@ -4846,6 +4874,523 @@ fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
))
}
/// One JSONL manifest entry describing a single archived MoQ group:
/// where it came from, when it arrived locally, and where its bytes live
/// in the CAS. Field names are the serialized JSON keys.
#[derive(Debug, serde::Serialize)]
struct ArchiveIndexRecord {
    /// Local receive time, Unix epoch milliseconds.
    received_unix_ms: u64,
    /// Relay URL the group was fetched from.
    relay_url: String,
    /// Broadcast the track belongs to.
    broadcast_name: String,
    /// Track name within the broadcast (e.g. `catalog.json`, `video0.m4s`).
    track_name: String,
    /// Group sequence number as reported by the relay.
    group_sequence: u64,
    /// Number of frames concatenated into the stored blob.
    frame_count: u64,
    /// Total blob size in bytes.
    size_bytes: usize,
    /// BLAKE3 hex digest of the blob (the CAS key).
    blake3: String,
    /// Blob path relative to the archive output dir
    /// (`objects/blake3/<shard>/<hash>.bin`).
    cas_path: String,
}
fn sanitize_path_component(value: &str) -> String {
let mut out = String::with_capacity(value.len());
for ch in value.chars() {
if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '.' {
out.push(ch);
} else {
out.push('_');
}
}
if out.is_empty() {
"_".to_string()
} else {
out
}
}
fn default_wt_archive_tracks() -> Vec<String> {
WT_ARCHIVE_DEFAULT_TRACKS
.iter()
.map(|value| (*value).to_string())
.collect()
}
/// Store `data` in the content-addressed store under `cas_root`, keyed by
/// its BLAKE3 hash and sharded by the first two hex digits.
///
/// Returns `(hex_hash, rel_path, inserted)`:
/// - `rel_path` is `objects/blake3/<shard>/<hash>.bin`, the path recorded
///   in manifests relative to the archive output dir (the caller in
///   `wt_archive` passes `cas_root = <output_dir>/objects/blake3`, which
///   makes the two layouts line up).
/// - `inserted` is `false` when the blob already existed on disk.
fn cas_store_blob(cas_root: &Path, data: &[u8]) -> Result<(String, PathBuf, bool)> {
    let hash = blake3::hash(data).to_hex().to_string();
    // Two-hex-digit shard keeps any one directory from growing unbounded.
    let shard = &hash[0..2];
    let rel_path = PathBuf::from("objects")
        .join("blake3")
        .join(shard)
        .join(format!("{hash}.bin"));
    let out_path = cas_root.join(shard).join(format!("{hash}.bin"));
    // Fast path: content addressing means an existing file is already
    // byte-identical, so there is nothing to write.
    if out_path.exists() {
        return Ok((hash, rel_path, false));
    }
    let parent = out_path
        .parent()
        .ok_or_else(|| anyhow!("invalid CAS path: {}", out_path.display()))?;
    fs::create_dir_all(parent)
        .with_context(|| format!("failed to create CAS shard dir {}", parent.display()))?;
    // Write to a per-process, per-instant temp file in the same directory,
    // then rename into place so readers never observe a partial blob.
    let tmp = parent.join(format!(
        ".{}.tmp-{}-{}",
        out_path
            .file_name()
            .and_then(|f| f.to_str())
            .unwrap_or("blob"),
        std::process::id(),
        SystemTime::now().duration_since(UNIX_EPOCH)?.as_nanos()
    ));
    fs::write(&tmp, data).with_context(|| format!("failed to write {}", tmp.display()))?;
    match fs::rename(&tmp, &out_path) {
        Ok(()) => {}
        // Lost a race with a concurrent writer: same hash implies same
        // bytes, so report "already present" and drop our temp file.
        Err(err) if out_path.exists() => {
            let _ = fs::remove_file(&tmp);
            tracing::debug!(
                path = %out_path.display(),
                err = %err,
                "CAS object already exists during rename race"
            );
            return Ok((hash, rel_path, false));
        }
        Err(err) => {
            // Genuine rename failure: clean up the temp file best-effort
            // and surface the error with both paths for diagnosis.
            let _ = fs::remove_file(&tmp);
            return Err(err).with_context(|| {
                format!(
                    "failed to move CAS blob {} -> {}",
                    tmp.display(),
                    out_path.display()
                )
            });
        }
    }
    Ok((hash, rel_path, true))
}
/// Append `record` to the JSONL manifest at `path` as a single
/// newline-terminated JSON object, creating the parent directory and the
/// file itself on first use.
fn append_archive_index_record(path: &Path, record: &ArchiveIndexRecord) -> Result<()> {
    let dir = path
        .parent()
        .ok_or_else(|| anyhow!("invalid manifest path: {}", path.display()))?;
    fs::create_dir_all(dir)
        .with_context(|| format!("failed to create manifest dir {}", dir.display()))?;
    // Open in append mode so concurrent runs interleave whole lines rather
    // than truncating each other.
    let mut manifest = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .with_context(|| format!("failed to open manifest {}", path.display()))?;
    serde_json::to_writer(&mut manifest, record)
        .with_context(|| format!("failed to write manifest {}", path.display()))?;
    manifest
        .write_all(b"\n")
        .with_context(|| format!("failed to newline-terminate {}", path.display()))?;
    Ok(())
}
/// Long-running consumer for one relay track: read groups forever,
/// concatenate each group's frames into one blob, store it in the CAS,
/// and append an `ArchiveIndexRecord` line to the track's JSONL manifest.
///
/// Never returns `Ok`: a clean end-of-track is also reported as an error
/// so the caller's `JoinSet` treats any exit as a failure.
async fn archive_track_loop(
    mut track: moq_lite::TrackConsumer,
    relay_url: String,
    broadcast_name: String,
    track_name: String,
    cas_root: PathBuf,
    manifest_path: PathBuf,
) -> Result<()> {
    let mut archived_groups: u64 = 0;
    loop {
        let next_group = track.next_group().await;
        // `Err` => transport/read failure; `Ok(None)` => track ended
        // upstream — both end the loop with an error.
        let Some(mut group) = next_group.with_context(|| {
            format!(
                "track {} read failed for broadcast {}",
                track_name, broadcast_name
            )
        })? else {
            return Err(anyhow!(
                "track {} ended for broadcast {}",
                track_name,
                broadcast_name
            ));
        };
        let group_sequence = group.info.sequence;
        let mut frame_count: u64 = 0;
        let mut bytes = Vec::new();
        // Drain every frame of this group into one contiguous buffer.
        loop {
            match group.read_frame().await {
                Ok(Some(frame)) => {
                    frame_count += 1;
                    bytes.extend_from_slice(&frame);
                }
                Ok(None) => break,
                Err(err) => {
                    return Err(anyhow!(
                        "track {} group {} read frame failed: {err:#}",
                        track_name,
                        group_sequence
                    ));
                }
            }
        }
        // Empty groups carry nothing worth storing or indexing.
        if bytes.is_empty() {
            continue;
        }
        let size_bytes = bytes.len();
        let (hash, rel_path, inserted) = cas_store_blob(&cas_root, &bytes)?;
        let record = ArchiveIndexRecord {
            received_unix_ms: now_unix_ms(),
            relay_url: relay_url.clone(),
            broadcast_name: broadcast_name.clone(),
            track_name: track_name.clone(),
            group_sequence,
            frame_count,
            size_bytes,
            blake3: hash.clone(),
            cas_path: rel_path.display().to_string(),
        };
        append_archive_index_record(&manifest_path, &record)?;
        archived_groups += 1;
        // Rate-limit progress logging to every 50th archived group.
        if archived_groups % 50 == 0 {
            tracing::info!(
                relay = %relay_url,
                broadcast = %broadcast_name,
                track = %track_name,
                archived_groups,
                inserted,
                size_bytes,
                hash = %hash,
                "archived relay group"
            );
        }
    }
}
/// `wt-archive` subcommand entry point: connect to a MoQ relay over
/// WebTransport, subscribe to the named broadcast, and archive every group
/// of each configured track into the on-disk CAS plus per-track JSONL
/// manifests.
///
/// Exits cleanly on ctrl-c; a closed relay session or any track task
/// ending (even "successfully") is treated as an error.
async fn wt_archive(args: WtArchiveArgs) -> Result<()> {
    let relay_url =
        Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?;
    let relay_url_str = relay_url.to_string();
    // No --track flags => archive the default catalog/init/audio/video set.
    let mut tracks = if args.track.is_empty() {
        default_wt_archive_tracks()
    } else {
        args.track.clone()
    };
    tracks.sort();
    tracks.dedup();
    if tracks.is_empty() {
        return Err(anyhow!("no tracks configured for wt-archive"));
    }
    // CAS layout: <output_dir>/objects/blake3/<shard>/<hash>.bin.
    let cas_root = args.output_dir.join("objects").join("blake3");
    let manifest_root = args
        .manifest_dir
        .unwrap_or_else(|| args.output_dir.join("manifests"));
    fs::create_dir_all(&cas_root)
        .with_context(|| format!("failed to create CAS root {}", cas_root.display()))?;
    fs::create_dir_all(&manifest_root)
        .with_context(|| format!("failed to create manifest root {}", manifest_root.display()))?;
    // Local origin: the relay publishes broadcasts into `consume`;
    // `consume_updates` yields announcement events for the wait loop below.
    let consume = moq_lite::Origin::produce();
    let mut consume_updates = consume.consume();
    // Wrapper over a WebTransport session whose only job is to report an
    // overridden `protocol()` string, so MoQ SETUP can be retried under
    // different protocol identifiers when native negotiation fails.
    #[derive(Clone)]
    struct ProtocolOverride {
        inner: web_transport_quinn::Session,
        protocol: Option<String>,
    }
    // Pure delegation to `inner` for everything except `protocol()`.
    impl web_transport_trait::Session for ProtocolOverride {
        type SendStream = <web_transport_quinn::Session as web_transport_trait::Session>::SendStream;
        type RecvStream = <web_transport_quinn::Session as web_transport_trait::Session>::RecvStream;
        type Error = <web_transport_quinn::Session as web_transport_trait::Session>::Error;
        fn accept_bi(
            &self,
        ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>>
               + web_transport_trait::MaybeSend {
            self.inner.accept_bi()
        }
        fn accept_uni(
            &self,
        ) -> impl Future<Output = Result<Self::RecvStream, Self::Error>> + web_transport_trait::MaybeSend
        {
            self.inner.accept_uni()
        }
        fn open_bi(
            &self,
        ) -> impl Future<Output = Result<(Self::SendStream, Self::RecvStream), Self::Error>>
               + web_transport_trait::MaybeSend {
            self.inner.open_bi()
        }
        fn open_uni(
            &self,
        ) -> impl Future<Output = Result<Self::SendStream, Self::Error>> + web_transport_trait::MaybeSend
        {
            self.inner.open_uni()
        }
        fn send_datagram(&self, payload: bytes::Bytes) -> Result<(), Self::Error> {
            self.inner.send_datagram(payload)
        }
        fn recv_datagram(
            &self,
        ) -> impl Future<Output = Result<bytes::Bytes, Self::Error>> + web_transport_trait::MaybeSend
        {
            self.inner.recv_datagram()
        }
        fn max_datagram_size(&self) -> usize {
            self.inner.max_datagram_size()
        }
        // The override: report our forced protocol string if set, otherwise
        // fall through to whatever the inner session negotiated.
        fn protocol(&self) -> Option<&str> {
            self.protocol.as_deref().or_else(|| self.inner.protocol())
        }
        fn close(&self, code: u32, reason: &str) {
            web_transport_trait::Session::close(&self.inner, code, reason)
        }
        fn closed(&self) -> impl Future<Output = Self::Error> + web_transport_trait::MaybeSend {
            self.inner.closed()
        }
    }
    // Dial the relay end-to-end: DNS lookup, QUIC connect, WebTransport
    // CONNECT, then MoQ SETUP — first with native protocol negotiation,
    // then retrying with a fixed list of protocol-string overrides.
    async fn connect_moq_session(
        relay_url: &Url,
        consume: moq_lite::OriginProducer,
        tls_disable_verify: bool,
    ) -> Result<moq_lite::Session> {
        let host = relay_url
            .host_str()
            .ok_or_else(|| anyhow!("relay url missing host: {relay_url}"))?
            .to_string();
        let port = relay_url.port().unwrap_or(443);
        // Trust the OS certificate store; loading is best-effort and
        // partial failures are only logged.
        let mut roots = rustls::RootCertStore::empty();
        let native = rustls_native_certs::load_native_certs();
        if !native.errors.is_empty() {
            tracing::warn!(
                errors = ?native.errors,
                "some native root certs could not be loaded"
            );
        }
        for cert in native.certs {
            let _ = roots.add(cert);
        }
        let mut tls = rustls::ClientConfig::builder()
            .with_root_certificates(roots)
            .with_no_client_auth();
        if tls_disable_verify {
            // Verifier that accepts any server certificate but still
            // checks handshake signatures with the real crypto provider.
            // Only installed behind the explicit --tls-disable-verify flag.
            #[derive(Debug)]
            struct NoCertificateVerification(Arc<rustls::crypto::CryptoProvider>);
            impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
                fn verify_server_cert(
                    &self,
                    _end_entity: &rustls::pki_types::CertificateDer<'_>,
                    _intermediates: &[rustls::pki_types::CertificateDer<'_>],
                    _server_name: &rustls::pki_types::ServerName<'_>,
                    _ocsp: &[u8],
                    _now: rustls::pki_types::UnixTime,
                ) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error>
                {
                    Ok(rustls::client::danger::ServerCertVerified::assertion())
                }
                fn verify_tls12_signature(
                    &self,
                    message: &[u8],
                    cert: &rustls::pki_types::CertificateDer<'_>,
                    dss: &rustls::DigitallySignedStruct,
                ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
                {
                    rustls::crypto::verify_tls12_signature(
                        message,
                        cert,
                        dss,
                        &self.0.signature_verification_algorithms,
                    )
                }
                fn verify_tls13_signature(
                    &self,
                    message: &[u8],
                    cert: &rustls::pki_types::CertificateDer<'_>,
                    dss: &rustls::DigitallySignedStruct,
                ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
                {
                    rustls::crypto::verify_tls13_signature(
                        message,
                        cert,
                        dss,
                        &self.0.signature_verification_algorithms,
                    )
                }
                fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
                    self.0.signature_verification_algorithms.supported_schemes()
                }
            }
            let provider = rustls::crypto::CryptoProvider::get_default()
                .cloned()
                .unwrap_or_else(|| Arc::new(rustls::crypto::ring::default_provider()));
            tls.dangerous()
                .set_certificate_verifier(Arc::new(NoCertificateVerification(provider)));
        }
        tls.alpn_protocols = vec![web_transport_quinn::ALPN.as_bytes().to_vec()];
        // Dual-stack wildcard bind; OS picks the source port.
        let socket = std::net::UdpSocket::bind("[::]:0").context("failed to bind UDP socket")?;
        let mut transport = quinn::TransportConfig::default();
        transport.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap()));
        transport.keep_alive_interval(Some(Duration::from_secs(4)));
        // MTU discovery disabled — NOTE(review): presumably to avoid
        // blackholing on paths that drop large probes; confirm intent.
        transport.mtu_discovery_config(None);
        let transport = Arc::new(transport);
        let runtime = quinn::default_runtime().context("no async runtime")?;
        let endpoint_config = quinn::EndpointConfig::default();
        let endpoint = quinn::Endpoint::new(endpoint_config, None, socket, runtime)
            .context("failed to create QUIC endpoint")?;
        // Only the first resolved address is attempted (no happy-eyeballs).
        let ip = tokio::net::lookup_host((host.clone(), port))
            .await
            .context("failed DNS lookup")?
            .next()
            .context("no DNS entries")?;
        let quic: quinn::crypto::rustls::QuicClientConfig = tls.try_into()?;
        let mut client_cfg = quinn::ClientConfig::new(Arc::new(quic));
        client_cfg.transport_config(transport);
        tracing::debug!(%ip, %host, %relay_url, "connecting QUIC");
        let connection = endpoint
            .connect_with(client_cfg, ip, &host)?
            .await
            .context("failed QUIC connect")?;
        // Offer every MoQ ALPN the local library knows during CONNECT.
        let mut request = web_transport_quinn::proto::ConnectRequest::new(relay_url.clone());
        for alpn in moq_lite::ALPNS {
            request = request.with_protocol(alpn.to_string());
        }
        let wt = web_transport_quinn::Session::connect(connection, request)
            .await
            .context("failed WebTransport CONNECT")?;
        let client = moq_lite::Client::new().with_consume(consume);
        // First attempt: trust whatever protocol the session negotiated.
        match client.connect(wt.clone()).await {
            Ok(session) => {
                tracing::info!("connected to relay (native protocol negotiation)");
                return Ok(session);
            }
            Err(err) => {
                tracing::debug!(err = %err, "native MoQ SETUP failed; trying protocol overrides");
            }
        }
        // Fallback: retry SETUP while forcing specific protocol strings;
        // the final "" entry means "report no override".
        let attempts: [&str; 4] = ["moqt-16", "moqt-15", "moq-00", ""];
        let mut last_err: Option<anyhow::Error> = None;
        for p in attempts {
            let session = ProtocolOverride {
                inner: wt.clone(),
                protocol: (!p.is_empty()).then(|| p.to_string()),
            };
            match client.connect(session).await {
                Ok(session) => {
                    tracing::info!(protocol = %p, "connected to relay");
                    return Ok(session);
                }
                Err(err) => {
                    last_err = Some(anyhow::Error::new(err));
                    tracing::debug!(
                        protocol = %p,
                        err = %last_err.as_ref().unwrap(),
                        "MoQ SETUP failed; retrying"
                    );
                }
            }
        }
        Err(last_err.unwrap_or_else(|| anyhow!("failed to connect"))).context("failed MoQ SETUP")
    }
    tracing::info!(url=%relay_url, name=%args.name, "connecting to relay for archival");
    let session = connect_moq_session(&relay_url, consume.clone(), args.tls_disable_verify).await?;
    // Use the broadcast immediately if already announced; otherwise block
    // on the announcement stream until the named broadcast goes active.
    let broadcast = if let Some(active) = consume.consume_broadcast(args.name.as_str()) {
        active
    } else {
        tracing::info!(name=%args.name, "waiting for relay broadcast announcement");
        loop {
            let Some((path, active)) = consume_updates.announced().await else {
                return Err(anyhow!("relay announcement stream closed"));
            };
            if path.to_string() != args.name {
                continue;
            }
            // `active` is None when a broadcast is withdrawn; keep waiting
            // until our broadcast appears as active.
            if let Some(active) = active {
                break active;
            }
        }
    };
    let broadcast_dir = sanitize_path_component(&args.name);
    // One archival task per track, all supervised by a JoinSet.
    let mut tasks = tokio::task::JoinSet::new();
    for track_name in tracks {
        let track = broadcast.subscribe_track(&moq_lite::Track::new(&track_name));
        let track_file = sanitize_path_component(&track_name);
        let manifest_path = manifest_root
            .join(&broadcast_dir)
            .join(format!("{track_file}.jsonl"));
        tasks.spawn(archive_track_loop(
            track,
            relay_url_str.clone(),
            args.name.clone(),
            track_name.clone(),
            cas_root.clone(),
            manifest_path,
        ));
        tracing::info!(
            relay = %relay_url_str,
            broadcast = %args.name,
            track = %track_name,
            "archival track subscribed"
        );
    }
    if tasks.is_empty() {
        return Err(anyhow!("no tracks spawned for archival"));
    }
    // Supervise until something gives: a track task exiting (always an
    // error — the loops are meant to run forever), the relay session
    // closing, or ctrl-c (the only clean shutdown path).
    loop {
        tokio::select! {
            next = tasks.join_next() => {
                match next {
                    Some(Ok(Ok(()))) => {
                        return Err(anyhow!("archival track task exited unexpectedly"));
                    }
                    Some(Ok(Err(err))) => return Err(err),
                    Some(Err(err)) => return Err(anyhow!("archival track task join failure: {err}")),
                    None => return Err(anyhow!("all archival track tasks exited")),
                }
            }
            _ = session.closed() => {
                return Err(anyhow!("relay session closed"));
            }
            _ = tokio::signal::ctrl_c() => {
                tracing::info!("ctrl-c; shutting down wt-archive");
                break;
            }
        }
    }
    Ok(())
}
async fn wt_publish(args: WtPublishArgs) -> Result<()> {
let relay_url =
Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?;