diff --git a/README.md b/README.md index d4b3b94..9ac8840 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,15 @@ cargo run -p ec-node -- wt-archive \ --manifest-dir /var/lib/every-channel/manifests ``` +Replay server (archive -> HLS DVR endpoints): + +```sh +cargo run -p ec-node -- wt-archive-serve \ + --output-dir /tank/every-channel/archive \ + --manifest-dir /var/lib/every-channel/manifests \ + --listen 0.0.0.0:7788 +``` + Control protocol (iroh gossip, relay + direct transport discovery): ```sh diff --git a/apps/web/app.js b/apps/web/app.js index 230aa3a..7198fb6 100644 --- a/apps/web/app.js +++ b/apps/web/app.js @@ -9,9 +9,16 @@ const MOQ_WATCH_MODULE_URLS = [ "https://cdn.jsdelivr.net/npm/@moq/watch@0.1.1/element/+esm", "https://unpkg.com/@moq/watch@0.1.1/element.js?module", ]; +const HLS_MODULE_URLS = [ + "https://esm.sh/hls.js@1.6.2", + "https://cdn.jsdelivr.net/npm/hls.js@1.6.2/+esm", + "https://unpkg.com/hls.js@1.6.2/dist/hls.mjs", +]; const PUBLIC_STREAMS_PATH = "/api/public-streams"; let moqWatchModulePromise = null; +let hlsModulePromise = null; let disposePlayerSignals = null; +let activeHlsPlayer = null; function $(id) { const el = document.getElementById(id); @@ -30,14 +37,16 @@ function normalizeName(s) { return (s || "").trim(); } -function currentShareLink(relayUrl, name) { +function currentShareLink(relayUrl, name, mode) { const u = new URL(window.location.href); u.pathname = "/watch"; u.searchParams.set("url", relayUrl); u.searchParams.set("name", name); + if (mode === "archive") u.searchParams.set("mode", "archive"); + else u.searchParams.delete("mode"); // Avoid leaking other params. for (const k of [...u.searchParams.keys()]) { - if (k !== "url" && k !== "name") u.searchParams.delete(k); + if (k !== "url" && k !== "name" && k !== "mode") u.searchParams.delete(k); } return u.toString(); } @@ -66,6 +75,17 @@ function clearPlayerSignals() { disposePlayerSignals = null; } +function destroyArchivePlayer() { + if (activeHlsPlayer && typeof activeHlsPlayer.destroy === "function") { + try { + activeHlsPlayer.destroy(); + } catch (_) { + // Ignore teardown errors. + } + } + activeHlsPlayer = null; +} + function bindPlayerSignals(watch, name) { const cleanup = []; let offlineTimer = null; @@ -131,6 +151,7 @@ function bindPlayerSignals(watch, name) { function mountPlayer(relayUrl, name) { clearPlayerSignals(); + destroyArchivePlayer(); const mount = $("playerMount"); mount.textContent = ""; @@ -173,6 +194,74 @@ async function ensureMoqWatchElement() { } } +async function ensureHlsPlayerCtor() { + if (window.Hls) return window.Hls; + if (!hlsModulePromise) { + hlsModulePromise = (async () => { + let lastErr = null; + for (const moduleUrl of HLS_MODULE_URLS) { + try { + const mod = await import(moduleUrl); + if (mod?.default) { + window.Hls = mod.default; + } else if (mod?.Hls) { + window.Hls = mod.Hls; + } + } catch (err) { + lastErr = err; + continue; + } + if (window.Hls) return window.Hls; + } + throw lastErr || new Error("hls.js module is unavailable"); + })(); + } + return hlsModulePromise; +} + +async function mountArchivePlayer(name) { + clearPlayerSignals(); + destroyArchivePlayer(); + + const mount = $("playerMount"); + mount.textContent = ""; + + const video = document.createElement("video"); + video.className = "archiveVideo"; + video.controls = true; + video.autoplay = true; + video.muted = false; + video.playsInline = true; + mount.appendChild(video); + + const archiveUrl = `/api/archive/${encodeURIComponent(name)}/master.m3u8`; + if (video.canPlayType("application/vnd.apple.mpegurl")) { + video.src = archiveUrl; + void video.play().catch(() => {}); + return; + } + + const HlsCtor = await ensureHlsPlayerCtor(); + if (!HlsCtor || typeof HlsCtor.isSupported !== "function" || !HlsCtor.isSupported()) { + throw new Error("HLS playback is unsupported in this browser"); + } + + const hls = new HlsCtor({ + liveDurationInfinity: true, + lowLatencyMode: false, + backBufferLength: 120, + }); + activeHlsPlayer = hls; + hls.on(HlsCtor.Events.ERROR, (_event, data) => { + if (data?.fatal) { + setHint(`Archive playback error: ${data.type || "fatal"}`, "warn"); + } + }); + hls.loadSource(archiveUrl); + hls.attachMedia(video); + void video.play().catch(() => {}); +} + async function copyToClipboard(text) { if (!text) return; if (navigator.clipboard && navigator.clipboard.writeText) { @@ -199,20 +288,24 @@ function readParams() { u.searchParams.get("name") || u.searchParams.get("broadcast") || u.searchParams.get("path"); + const mode = u.searchParams.get("mode") === "archive" ? "archive" : "live"; return { relayUrl: normalizeRelayUrl(relay || DEFAULT_RELAY_URL), name: normalizeName(name || ""), + mode, }; } -function writeParams(relayUrl, name) { +function writeParams(relayUrl, name, mode) { const u = new URL(window.location.href); u.pathname = "/watch"; u.searchParams.set("url", relayUrl); u.searchParams.set("name", name); + if (mode === "archive") u.searchParams.set("mode", "archive"); + else u.searchParams.delete("mode"); // Canonicalize by dropping stale aliases/extra params. for (const k of [...u.searchParams.keys()]) { - if (k !== "url" && k !== "name") u.searchParams.delete(k); + if (k !== "url" && k !== "name" && k !== "mode") u.searchParams.delete(k); } window.history.replaceState({}, "", u.toString()); } @@ -221,7 +314,7 @@ function hasWebTransport() { return typeof window.WebTransport !== "undefined"; } -function renderLiveList(entries, onWatch) { +function renderLiveList(entries, onWatchLive, onWatchArchive) { const mount = $("liveList"); mount.textContent = ""; if (!entries.length) { @@ -245,15 +338,28 @@ function renderLiveList(entries, onWatch) { meta.textContent = `${entry.broadcast_name || ""} @ ${entry.relay_url || DEFAULT_RELAY_URL}`; info.appendChild(meta); - const btn = document.createElement("button"); - btn.className = "btn secondary"; - btn.textContent = "Watch"; - btn.addEventListener("click", () => { - onWatch(entry); + const actions = document.createElement("div"); + actions.className = "liveActions"; + + const watchBtn = document.createElement("button"); + watchBtn.className = "btn secondary"; + watchBtn.textContent = "Live"; + watchBtn.addEventListener("click", () => { + onWatchLive(entry); }); + const archiveBtn = document.createElement("button"); + archiveBtn.className = "btn secondary"; + archiveBtn.textContent = "Archive"; + archiveBtn.addEventListener("click", () => { + onWatchArchive(entry); + }); + + actions.appendChild(watchBtn); + actions.appendChild(archiveBtn); + row.appendChild(info); - row.appendChild(btn); + row.appendChild(actions); mount.appendChild(row); } } @@ -271,6 +377,7 @@ async function fetchLiveList() { function main() { const relayInput = $("relayUrl"); const nameInput = $("broadcastName"); + const archiveModeInput = $("archiveMode"); const watchBtn = $("watchBtn"); const copyBtn = $("copyLinkBtn"); const refreshListBtn = $("refreshListBtn"); @@ -278,20 +385,23 @@ function main() { const initial = readParams(); relayInput.value = initial.relayUrl; nameInput.value = initial.name; + archiveModeInput.checked = initial.mode === "archive"; function updateSharePreview() { const relayUrl = normalizeRelayUrl(relayInput.value); const name = normalizeName(nameInput.value); + const mode = archiveModeInput.checked ? "archive" : "live"; if (!name) { setShareLink(""); return; } - setShareLink(currentShareLink(relayUrl, name)); + setShareLink(currentShareLink(relayUrl, name, mode)); } async function start() { const relayUrl = normalizeRelayUrl(relayInput.value); const name = normalizeName(nameInput.value); + const mode = archiveModeInput.checked ? "archive" : "live"; updateSharePreview(); @@ -300,6 +410,20 @@ function main() { return; } + if (mode === "archive") { + writeParams(relayUrl, name, mode); + setHint(`Loading archive DVR: ${name}`, "ok"); + try { + await mountArchivePlayer(name); + } catch (e) { + setHint( + `Archive playback unavailable: ${String(e)}. Ensure /api/archive is configured.`, + "warn", + ); + } + return; + } + if (!hasWebTransport()) { setHint( "WebTransport is not available in this browser. Try Chrome or Firefox Nightly. Safari support is still incomplete.", @@ -318,13 +442,14 @@ function main() { return; } - writeParams(relayUrl, name); + writeParams(relayUrl, name, mode); setHint(`Connecting to relay and subscribing: ${name}`, "ok"); mountPlayer(relayUrl, name); } relayInput.addEventListener("input", updateSharePreview); nameInput.addEventListener("input", updateSharePreview); + archiveModeInput.addEventListener("input", updateSharePreview); watchBtn.addEventListener("click", () => { void start(); @@ -336,11 +461,12 @@ function main() { copyBtn.addEventListener("click", async () => { const relayUrl = normalizeRelayUrl(relayInput.value); const name = normalizeName(nameInput.value); + const mode = archiveModeInput.checked ? "archive" : "live"; if (!name) { setHint("Enter a broadcast name first.", "warn"); return; } - const link = currentShareLink(relayUrl, name); + const link = currentShareLink(relayUrl, name, mode); try { await copyToClipboard(link); setHint("Link copied.", "ok"); @@ -355,12 +481,23 @@ function main() { setListHint("Loading live streams...", ""); try { const entries = await fetchLiveList(); - renderLiveList(entries, (entry) => { - relayInput.value = normalizeRelayUrl(entry.relay_url || DEFAULT_RELAY_URL); - nameInput.value = normalizeName(entry.broadcast_name || ""); - updateSharePreview(); - void start(); - }); + renderLiveList( + entries, + (entry) => { + archiveModeInput.checked = false; + relayInput.value = normalizeRelayUrl(entry.relay_url || DEFAULT_RELAY_URL); + nameInput.value = normalizeName(entry.broadcast_name || ""); + updateSharePreview(); + void start(); + }, + (entry) => { + archiveModeInput.checked = true; + relayInput.value = normalizeRelayUrl(entry.relay_url || DEFAULT_RELAY_URL); + nameInput.value = normalizeName(entry.broadcast_name || ""); + updateSharePreview(); + void start(); + }, + ); } catch (e) { $("liveList").textContent = ""; setListHint(`Live list error: ${String(e)}`, "warn"); diff --git a/apps/web/index.html b/apps/web/index.html index c94b8ce..5f3702e 100644 --- a/apps/web/index.html +++ b/apps/web/index.html @@ -34,6 +34,13 @@
Broadcast name
+
diff --git a/apps/web/style.css b/apps/web/style.css index 3f1bf14..49c941a 100644 --- a/apps/web/style.css +++ b/apps/web/style.css @@ -106,7 +106,7 @@ body { .row { display: grid; - grid-template-columns: 1.15fr 1fr auto; + grid-template-columns: 1.15fr 1fr auto auto; gap: 10px; align-items: end; } @@ -133,6 +133,23 @@ body { box-shadow: 0 0 0 3px rgba(255, 184, 108, 0.16); } +.checkField .checkRow { + display: flex; + align-items: center; + gap: 8px; + min-height: 44px; + padding: 0 10px; + border: 1px solid var(--line); + border-radius: 12px; + background: rgba(0, 0, 0, 0.28); + color: var(--text); +} + +.checkField input[type="checkbox"] { + width: 16px; + height: 16px; +} + .btn { padding: 11px 14px; border-radius: 12px; @@ -221,6 +238,11 @@ body { white-space: nowrap; } +.liveActions { + display: flex; + gap: 6px; +} + .player { padding: 0; } @@ -272,6 +294,13 @@ body { display: block; } +.archiveVideo { + width: 100%; + height: 100%; + display: block; + background: #000; +} + .tv-scanlines { position: absolute; inset: 12px; diff --git a/crates/ec-node/src/main.rs b/crates/ec-node/src/main.rs index a629a5d..f0065fe 100644 --- a/crates/ec-node/src/main.rs +++ b/crates/ec-node/src/main.rs @@ -38,6 +38,8 @@ use std::process::{Command, Stdio}; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; use tokio::process::Command as TokioCommand; use tokio_tungstenite::tungstenite::Message as WsMessage; use url::Url; @@ -48,7 +50,8 @@ const DIRECT_WIRE_TAG_PING: u8 = 0x02; const DIRECT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(8); // Conservatively under typical SCTP data channel max message sizes. const DIRECT_WIRE_CHUNK_BYTES: usize = 16 * 1024; -const WT_ARCHIVE_DEFAULT_TRACKS: &[&str] = &["catalog.json", "init.mp4", "video0.m4s", "audio0.m4s"]; +const WT_ARCHIVE_DEFAULT_TRACKS: &[&str] = + &["catalog.json", "init.mp4", "video0.m4s", "audio0.m4s"]; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::RwLock; @@ -83,6 +86,8 @@ enum Commands { WtPublish(WtPublishArgs), /// Subscribe to a relay broadcast over WebTransport/MoQ and archive groups into CAS. WtArchive(WtArchiveArgs), + /// Serve archived relay groups as DVR-style HLS playlists + object endpoints. + WtArchiveServe(WtArchiveServeArgs), /// Announce stream transport availability over iroh gossip control topic. ControlAnnounce(ControlAnnounceArgs), /// Listen for stream transport announcements from iroh gossip control topic. @@ -483,6 +488,20 @@ struct WtArchiveArgs { tls_disable_verify: bool, } +#[derive(Parser, Debug)] +struct WtArchiveServeArgs { + /// Output directory used by `wt-archive`. + #[arg(long, default_value = "./tmp/wt-archive")] + output_dir: PathBuf, + /// Optional manifest/index directory. + /// Defaults to `/manifests` when omitted. + #[arg(long)] + manifest_dir: Option, + /// TCP listen address for HTTP replay endpoints. + #[arg(long, default_value = "0.0.0.0:7788")] + listen: String, +} + #[derive(Parser, Debug)] struct ControlAnnounceArgs { /// Stable stream id to announce. @@ -694,6 +713,7 @@ fn main() -> Result<()> { Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?, Commands::WtPublish(args) => run_async(wt_publish(args))?, Commands::WtArchive(args) => run_async(wt_archive(args))?, + Commands::WtArchiveServe(args) => run_async(wt_archive_serve(args))?, Commands::ControlAnnounce(args) => run_async(control_announce(args))?, Commands::ControlListen(args) => run_async(control_listen(args))?, Commands::ControlResolve(args) => run_async(control_resolve(args))?, @@ -4874,7 +4894,7 @@ fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> { )) } -#[derive(Debug, serde::Serialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] struct ArchiveIndexRecord { received_unix_ms: u64, relay_url: String, @@ -5000,7 +5020,8 @@ async fn archive_track_loop( "track {} read failed for broadcast {}", track_name, broadcast_name ) - })? else { + })? + else { return Err(anyhow!( "track {} ended for broadcast {}", track_name, @@ -5097,8 +5118,10 @@ async fn wt_archive(args: WtArchiveArgs) -> Result<()> { } impl web_transport_trait::Session for ProtocolOverride { - type SendStream = ::SendStream; - type RecvStream = ::RecvStream; + type SendStream = + ::SendStream; + type RecvStream = + ::RecvStream; type Error = ::Error; fn accept_bi( @@ -5391,6 +5414,544 @@ async fn wt_archive(args: WtArchiveArgs) -> Result<()> { Ok(()) } +#[derive(Debug, Clone)] +struct ArchiveReplayState { + cas_root: PathBuf, + manifest_root: PathBuf, +} + +#[derive(Debug, serde::Serialize)] +struct ArchiveTimelineResponse { + broadcast_name: String, + start_unix_ms: Option, + end_unix_ms: Option, + video_segments: usize, + audio_segments: usize, +} + +#[derive(Debug, Clone)] +struct ArchiveHlsSegment { + sequence: u64, + duration_secs: f64, + hash: String, +} + +#[derive(Debug)] +struct ArchiveHttpResponse { + status: u16, + content_type: String, + body: Vec, +} + +fn read_archive_records( + manifest_root: &Path, + broadcast_name: &str, + track_name: &str, +) -> Result> { + let broadcast_dir = sanitize_path_component(broadcast_name); + let track_file = sanitize_path_component(track_name); + let path = manifest_root + .join(broadcast_dir) + .join(format!("{track_file}.jsonl")); + if !path.exists() { + return Ok(Vec::new()); + } + let mut out = Vec::new(); + let data = fs::read_to_string(&path) + .with_context(|| format!("failed to read archive index {}", path.display()))?; + for line in data.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + match serde_json::from_str::(line) { + Ok(record) => out.push(record), + Err(err) => { + tracing::warn!( + path = %path.display(), + err = %err, + "failed to parse archive index line" + ); + } + } + } + Ok(out) +} + +fn dedupe_by_group_sequence(mut records: Vec) -> Vec { + records.sort_by_key(|record| record.group_sequence); + let mut out: Vec = Vec::with_capacity(records.len()); + for record in records { + if let Some(last) = out.last_mut() { + if last.group_sequence == record.group_sequence { + *last = record; + continue; + } + } + out.push(record); + } + out +} + +fn default_segment_duration_ms(records: &[ArchiveIndexRecord]) -> u64 { + if records.len() < 2 { + return 1000; + } + let mut deltas = Vec::with_capacity(records.len().saturating_sub(1)); + for pair in records.windows(2) { + let current = pair[0].received_unix_ms; + let next = pair[1].received_unix_ms; + if next > current { + deltas.push(next - current); + } + } + if deltas.is_empty() { + return 1000; + } + deltas.sort_unstable(); + deltas[deltas.len() / 2].clamp(200, 10000) +} + +fn build_hls_segments(records: &[ArchiveIndexRecord]) -> Vec { + if records.is_empty() { + return Vec::new(); + } + let fallback_ms = default_segment_duration_ms(records); + let mut out = Vec::with_capacity(records.len()); + for (idx, record) in records.iter().enumerate() { + let mut dur_ms = fallback_ms; + if let Some(next) = records.get(idx + 1) { + if next.received_unix_ms > record.received_unix_ms { + dur_ms = (next.received_unix_ms - record.received_unix_ms).clamp(200, 10000); + } + } + out.push(ArchiveHlsSegment { + sequence: record.group_sequence, + duration_secs: (dur_ms as f64) / 1000.0, + hash: record.blake3.clone(), + }); + } + out +} + +fn latest_init_hash(manifest_root: &Path, broadcast_name: &str) -> Result> { + let mut init_records = dedupe_by_group_sequence(read_archive_records( + manifest_root, + broadcast_name, + "init.mp4", + )?); + Ok(init_records.pop().map(|record| record.blake3)) +} + +fn parse_limit(url: &Url) -> Option { + url.query_pairs() + .find(|(k, _)| k == "limit") + .and_then(|(_, v)| v.parse::().ok()) +} + +fn parse_from_ms(url: &Url) -> Option { + url.query_pairs() + .find(|(k, _)| k == "from_ms") + .and_then(|(_, v)| v.parse::().ok()) +} + +fn parse_archive_track( + manifest_root: &Path, + broadcast_name: &str, + track_name: &str, + from_ms: Option, + limit: Option, +) -> Result> { + let mut records = dedupe_by_group_sequence(read_archive_records( + manifest_root, + broadcast_name, + track_name, + )?); + if let Some(start_ms) = from_ms { + records.retain(|record| record.received_unix_ms >= start_ms); + } + if let Some(max_items) = limit { + if max_items > 0 && records.len() > max_items { + records = records.split_off(records.len() - max_items); + } + } + Ok(records) +} + +fn hls_playlist_for_track( + broadcast_name: &str, + track_name: &str, + init_hash: &str, + records: &[ArchiveIndexRecord], +) -> String { + let segments = build_hls_segments(records); + let target_duration = segments + .iter() + .map(|segment| segment.duration_secs.ceil() as u64) + .max() + .unwrap_or(2) + .max(1); + let media_sequence = segments + .first() + .map(|segment| segment.sequence) + .unwrap_or(0); + + let mut out = String::new(); + out.push_str("#EXTM3U\n"); + out.push_str("#EXT-X-VERSION:7\n"); + out.push_str("#EXT-X-PLAYLIST-TYPE:EVENT\n"); + out.push_str("#EXT-X-INDEPENDENT-SEGMENTS\n"); + out.push_str(&format!("#EXT-X-TARGETDURATION:{target_duration}\n")); + out.push_str(&format!("#EXT-X-MEDIA-SEQUENCE:{media_sequence}\n")); + out.push_str(&format!( + "#EXT-X-MAP:URI=\"/archive/{}/init.mp4?hash={}\"\n", + urlencoding::encode(broadcast_name), + init_hash + )); + for segment in segments { + out.push_str(&format!("#EXTINF:{:.3},\n", segment.duration_secs)); + out.push_str(&format!( + "/archive/{}/segment/{}.m4s?track={}\n", + urlencoding::encode(broadcast_name), + segment.hash, + urlencoding::encode(track_name) + )); + } + out +} + +fn validate_blake3_hex(hash: &str) -> bool { + hash.len() == 64 && hash.bytes().all(|b| b.is_ascii_hexdigit()) +} + +fn cas_path_for_hash(cas_root: &Path, hash: &str) -> Result { + if !validate_blake3_hex(hash) { + return Err(anyhow!("invalid hash")); + } + let shard = &hash[0..2]; + Ok(cas_root.join(shard).join(format!("{hash}.bin"))) +} + +fn archive_response(status: u16, content_type: &str, body: Vec) -> ArchiveHttpResponse { + ArchiveHttpResponse { + status, + content_type: content_type.to_string(), + body, + } +} + +fn archive_error(status: u16, message: &str) -> ArchiveHttpResponse { + archive_response( + status, + "application/json; charset=utf-8", + serde_json::json!({ "error": message }) + .to_string() + .into_bytes(), + ) +} + +fn archive_status_text(status: u16) -> &'static str { + match status { + 200 => "OK", + 204 => "No Content", + 400 => "Bad Request", + 404 => "Not Found", + 405 => "Method Not Allowed", + 500 => "Internal Server Error", + 502 => "Bad Gateway", + _ => "OK", + } +} + +fn handle_archive_http_request( + state: &ArchiveReplayState, + method: &str, + target: &str, +) -> ArchiveHttpResponse { + if method == "OPTIONS" { + return archive_response(204, "text/plain; charset=utf-8", Vec::new()); + } + if method != "GET" && method != "HEAD" { + return archive_error(405, "method not allowed"); + } + + let req_url = match Url::parse(&format!("http://localhost{target}")) { + Ok(url) => url, + Err(_) => return archive_error(400, "invalid url"), + }; + let path = req_url.path(); + if path == "/archive/healthz" { + return archive_response( + 200, + "application/json; charset=utf-8", + serde_json::json!({ "ok": true }).to_string().into_bytes(), + ); + } + + let parts = path.trim_start_matches('/').split('/').collect::>(); + if parts.len() < 3 || parts[0] != "archive" { + return archive_error(404, "not found"); + } + + let broadcast_name = match urlencoding::decode(parts[1]) { + Ok(v) => v.to_string(), + Err(_) => return archive_error(400, "invalid broadcast"), + }; + if broadcast_name.is_empty() { + return archive_error(400, "missing broadcast"); + } + + let from_ms = parse_from_ms(&req_url); + let limit = parse_limit(&req_url); + + match parts[2] { + "timeline.json" if parts.len() == 3 => { + let video = match parse_archive_track( + &state.manifest_root, + &broadcast_name, + "video0.m4s", + from_ms, + limit, + ) { + Ok(records) => records, + Err(err) => return archive_error(500, &format!("{err:#}")), + }; + let audio = match parse_archive_track( + &state.manifest_root, + &broadcast_name, + "audio0.m4s", + from_ms, + limit, + ) { + Ok(records) => records, + Err(err) => return archive_error(500, &format!("{err:#}")), + }; + let min_ms = video + .first() + .map(|record| record.received_unix_ms) + .into_iter() + .chain(audio.first().map(|record| record.received_unix_ms)) + .min(); + let max_ms = video + .last() + .map(|record| record.received_unix_ms) + .into_iter() + .chain(audio.last().map(|record| record.received_unix_ms)) + .max(); + let body = ArchiveTimelineResponse { + broadcast_name, + start_unix_ms: min_ms, + end_unix_ms: max_ms, + video_segments: video.len(), + audio_segments: audio.len(), + }; + match serde_json::to_vec(&body) { + Ok(buf) => archive_response(200, "application/json; charset=utf-8", buf), + Err(err) => archive_error(500, &format!("{err:#}")), + } + } + "master.m3u8" if parts.len() == 3 => { + let encoded = urlencoding::encode(&broadcast_name); + let playlist = format!( + "#EXTM3U\n#EXT-X-VERSION:7\n#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID=\"audio\",NAME=\"main\",DEFAULT=YES,AUTOSELECT=YES,URI=\"/archive/{encoded}/audio.m3u8\"\n#EXT-X-STREAM-INF:BANDWIDTH=3500000,CODECS=\"avc1.640028,mp4a.40.2\",AUDIO=\"audio\"\n/archive/{encoded}/video.m3u8\n" + ); + archive_response( + 200, + "application/vnd.apple.mpegurl; charset=utf-8", + playlist.into_bytes(), + ) + } + "video.m3u8" if parts.len() == 3 => { + let init_hash = match latest_init_hash(&state.manifest_root, &broadcast_name) { + Ok(Some(hash)) => hash, + Ok(None) => return archive_error(404, "missing init segment"), + Err(err) => return archive_error(500, &format!("{err:#}")), + }; + let records = match parse_archive_track( + &state.manifest_root, + &broadcast_name, + "video0.m4s", + from_ms, + limit, + ) { + Ok(records) => records, + Err(err) => return archive_error(500, &format!("{err:#}")), + }; + if records.is_empty() { + return archive_error(404, "video archive is empty"); + } + let playlist = + hls_playlist_for_track(&broadcast_name, "video0.m4s", &init_hash, &records); + archive_response( + 200, + "application/vnd.apple.mpegurl; charset=utf-8", + playlist.into_bytes(), + ) + } + "audio.m3u8" if parts.len() == 3 => { + let init_hash = match latest_init_hash(&state.manifest_root, &broadcast_name) { + Ok(Some(hash)) => hash, + Ok(None) => return archive_error(404, "missing init segment"), + Err(err) => return archive_error(500, &format!("{err:#}")), + }; + let records = match parse_archive_track( + &state.manifest_root, + &broadcast_name, + "audio0.m4s", + from_ms, + limit, + ) { + Ok(records) => records, + Err(err) => return archive_error(500, &format!("{err:#}")), + }; + if records.is_empty() { + return archive_error(404, "audio archive is empty"); + } + let playlist = + hls_playlist_for_track(&broadcast_name, "audio0.m4s", &init_hash, &records); + archive_response( + 200, + "application/vnd.apple.mpegurl; charset=utf-8", + playlist.into_bytes(), + ) + } + "init.mp4" if parts.len() == 3 => { + let hash = req_url + .query_pairs() + .find(|(k, _)| k == "hash") + .map(|(_, v)| v.to_string()) + .or_else(|| { + latest_init_hash(&state.manifest_root, &broadcast_name) + .ok() + .flatten() + }); + let Some(hash) = hash else { + return archive_error(404, "missing init segment"); + }; + let path = match cas_path_for_hash(&state.cas_root, &hash) { + Ok(path) => path, + Err(_) => return archive_error(400, "invalid hash"), + }; + match fs::read(path) { + Ok(bytes) => archive_response(200, "video/mp4", bytes), + Err(_) => archive_error(404, "init segment not found"), + } + } + "segment" if parts.len() == 4 => { + let hash_part = parts[3].strip_suffix(".m4s").unwrap_or(parts[3]); + let path = match cas_path_for_hash(&state.cas_root, hash_part) { + Ok(path) => path, + Err(_) => return archive_error(400, "invalid hash"), + }; + match fs::read(path) { + Ok(bytes) => archive_response(200, "video/mp4", bytes), + Err(_) => archive_error(404, "segment not found"), + } + } + _ => archive_error(404, "not found"), + } +} + +async fn handle_archive_http_connection( + mut stream: tokio::net::TcpStream, + state: ArchiveReplayState, +) -> Result<()> { + let mut buf = Vec::with_capacity(4096); + let mut tmp = [0u8; 2048]; + loop { + let read = stream + .read(&mut tmp) + .await + .context("failed to read replay request")?; + if read == 0 { + return Ok(()); + } + buf.extend_from_slice(&tmp[..read]); + if buf.windows(4).any(|w| w == b"\r\n\r\n") { + break; + } + if buf.len() > 16 * 1024 { + let response = archive_error(400, "request headers too large"); + let head = format!( + "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\nAccess-Control-Allow-Origin: *\r\nAccess-Control-Allow-Methods: GET,HEAD,OPTIONS\r\nAccess-Control-Allow-Headers: *\r\nCache-Control: no-store\r\n\r\n", + response.status, + archive_status_text(response.status), + response.content_type, + response.body.len(), + ); + stream.write_all(head.as_bytes()).await?; + return Ok(()); + } + } + + let req = String::from_utf8_lossy(&buf); + let mut lines = req.lines(); + let first = lines.next().unwrap_or_default(); + let mut parts = first.split_whitespace(); + let method = parts.next().unwrap_or(""); + let target = parts.next().unwrap_or("/"); + let response = handle_archive_http_request(&state, method, target); + let is_head = method == "HEAD"; + let body_len = if is_head { 0 } else { response.body.len() }; + let head = format!( + "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\nAccess-Control-Allow-Origin: *\r\nAccess-Control-Allow-Methods: GET,HEAD,OPTIONS\r\nAccess-Control-Allow-Headers: *\r\nCache-Control: no-store\r\n\r\n", + response.status, + archive_status_text(response.status), + response.content_type, + body_len + ); + stream + .write_all(head.as_bytes()) + .await + .context("failed to write replay response headers")?; + if !is_head { + stream + .write_all(&response.body) + .await + .context("failed to write replay response body")?; + } + Ok(()) +} + +async fn wt_archive_serve(args: WtArchiveServeArgs) -> Result<()> { + let manifest_root = args + .manifest_dir + .unwrap_or_else(|| args.output_dir.join("manifests")); + let cas_root = args.output_dir.join("objects").join("blake3"); + fs::create_dir_all(&manifest_root) + .with_context(|| format!("failed to create manifest dir {}", manifest_root.display()))?; + fs::create_dir_all(&cas_root) + .with_context(|| format!("failed to create CAS dir {}", cas_root.display()))?; + + let listener = TcpListener::bind(&args.listen) + .await + .with_context(|| format!("failed to bind archive replay listener {}", args.listen))?; + let local = listener + .local_addr() + .context("failed to read listen addr")?; + tracing::info!( + listen = %local, + manifest_root = %manifest_root.display(), + cas_root = %cas_root.display(), + "archive replay server listening" + ); + + let state = ArchiveReplayState { + cas_root, + manifest_root, + }; + + loop { + let (stream, peer) = listener.accept().await.context("accept failed")?; + let state_clone = state.clone(); + tokio::spawn(async move { + if let Err(err) = handle_archive_http_connection(stream, state_clone).await { + tracing::debug!(peer = %peer, err = %err, "archive replay request failed"); + } + }); + } +} + async fn wt_publish(args: WtPublishArgs) -> Result<()> { let relay_url = Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?; diff --git a/deploy/cloudflare-worker/src/index.ts b/deploy/cloudflare-worker/src/index.ts index f996720..be176fc 100644 --- a/deploy/cloudflare-worker/src/index.ts +++ b/deploy/cloudflare-worker/src/index.ts @@ -106,6 +106,35 @@ export default { return stub.fetch(request); } + // Archive replay proxy. Forwards website requests to an archive replay origin + // (typically forge running `ec-node wt-archive-serve`). + if (url.pathname.startsWith("/api/archive/")) { + const origin = env.EC_ARCHIVE_ORIGIN?.trim(); + if (!origin) { + return jsonNoStore({ error: "archive origin not configured" }, { status: 503 }); + } + let base: URL; + try { + base = new URL(origin); + } catch { + return jsonNoStore({ error: "invalid archive origin" }, { status: 500 }); + } + const target = new URL(base.toString()); + const suffix = url.pathname.slice("/api/archive".length); + target.pathname = `${base.pathname.replace(/\/$/, "")}/archive${suffix}`; + target.search = url.search; + + const upstream = await fetch(new Request(target.toString(), request), { + cf: { cacheTtl: 0, cacheEverything: false }, + }); + const headers = new Headers(upstream.headers); + headers.set("cache-control", "no-store"); + return new Response(upstream.body, { + status: upstream.status, + headers, + }); + } + // Minimal bootstrap API: proxy /api/* to a single durable object instance ("global"). // This exists only to rendezvous WebRTC offers/answers and list "live" entries. if (url.pathname.startsWith("/api/")) { @@ -152,6 +181,7 @@ interface Env { EC_TURN_HOST?: string; EC_TURN_HMAC?: string; EC_STREAM_UPSERT_TOKEN?: string; + EC_ARCHIVE_ORIGIN?: string; } type DirectoryEntry = { diff --git a/deploy/cloudflare-worker/wrangler.toml b/deploy/cloudflare-worker/wrangler.toml index a539c4f..4074ce9 100644 --- a/deploy/cloudflare-worker/wrangler.toml +++ b/deploy/cloudflare-worker/wrangler.toml @@ -37,3 +37,6 @@ new_sqlite_classes = ["EcApiContainer"] [[migrations]] tag = "v4" new_sqlite_classes = ["StreamRelayDO"] + +[vars] +EC_ARCHIVE_ORIGIN = "https://archive.every.channel" diff --git a/evolution/proposals/ECP-0071-archive-replay-dvr.md b/evolution/proposals/ECP-0071-archive-replay-dvr.md new file mode 100644 index 0000000..872ad4b --- /dev/null +++ b/evolution/proposals/ECP-0071-archive-replay-dvr.md @@ -0,0 +1,32 @@ +# ECP-0071: Archive Replay DVR Endpoints + +## Context + +ECP-0070 added relay archival (`wt-archive`) into CAS objects plus JSONL indexes, but there is no read path for viewers to scrub historical content. + +## Decision + +Add an archive replay path with these pieces: + +- `ec-node wt-archive-serve`: lightweight HTTP server that reads archive JSONL indexes + CAS blobs and serves: + - `/archive//master.m3u8` + - `/archive//video.m3u8` + - `/archive//audio.m3u8` + - `/archive//init.mp4` + - `/archive//segment/.m4s` + - `/archive//timeline.json` +- HLS playlists are `EVENT` style and include all retained segments by default, so scrub depth is bounded by retained archive data (effectively "infinite" until eviction). +- Cloudflare Worker proxies `/api/archive/*` to a configurable archive origin (`EC_ARCHIVE_ORIGIN`) so `every.channel` can serve replay from the same site domain. +- Web watcher adds an `Archive DVR` mode that plays `/api/archive//master.m3u8` with native HLS or `hls.js` fallback. +- NixOS module adds `services.every-channel.ec-node.archive.serve.*` to run replay server alongside archival workers. + +## Rationale + +- Keeps archival and replay in one binary and one deploy surface. +- Preserves CAS as source of truth; playlists are derived views. +- Uses standard HLS+DVR semantics so browser playback + scrubbing works without custom protocol work in the short term. + +## Reversibility + +- Disable `archive.serve.enable` and remove worker proxy route to revert to archive-only mode. +- `wt-archive` storage format remains unchanged. diff --git a/nix/modules/ec-node.nix b/nix/modules/ec-node.nix index 0a98824..0771cf1 100644 --- a/nix/modules/ec-node.nix +++ b/nix/modules/ec-node.nix @@ -244,6 +244,20 @@ in default = false; description = "Danger: disable TLS verification for relay archive subscribers."; }; + + serve = { + enable = lib.mkOption { + type = lib.types.bool; + default = false; + description = "Run `ec-node wt-archive-serve` HTTP endpoints for archived replay/scrubbing."; + }; + + listen = lib.mkOption { + type = lib.types.str; + default = "0.0.0.0:7788"; + description = "Listen address passed to `ec-node wt-archive-serve --listen`."; + }; + }; }; broadcasts = lib.mkOption { @@ -771,6 +785,62 @@ in ]; }; + environment = cfg.environment; + }; + }) + // lib.optionalAttrs (cfg.archive.enable && cfg.archive.serve.enable) + (let + archiveServeUnit = "every-channel-wt-archive-serve"; + archiveServeRunner = pkgs.writeShellApplication { + name = archiveServeUnit; + runtimeInputs = [ + cfg.package + ]; + text = '' + set -euo pipefail + exec ${lib.escapeShellArg "${cfg.package}/bin/ec-node"} \ + wt-archive-serve \ + --output-dir ${lib.escapeShellArg cfg.archive.outputDir} \ + --manifest-dir ${lib.escapeShellArg cfg.archive.manifestDir} \ + --listen ${lib.escapeShellArg cfg.archive.serve.listen} + ''; + }; + in + { + "${archiveServeUnit}" = { + description = "every.channel archived replay HTTP server"; + wantedBy = [ "multi-user.target" ]; + after = [ "network-online.target" ]; + wants = [ "network-online.target" ]; + + unitConfig = { + StartLimitIntervalSec = 0; + }; + + serviceConfig = { + Type = "simple"; + ExecStart = "${archiveServeRunner}/bin/${archiveServeUnit}"; + Restart = "always"; + RestartSec = 2; + + NoNewPrivileges = true; + PrivateTmp = true; + ProtectSystem = "strict"; + ProtectHome = true; + ProtectKernelTunables = true; + ProtectKernelModules = true; + ProtectControlGroups = true; + LockPersonality = true; + MemoryDenyWriteExecute = true; + RestrictSUIDSGID = true; + RestrictRealtime = true; + SystemCallArchitectures = "native"; + ReadWritePaths = [ + cfg.archive.outputDir + cfg.archive.manifestDir + ]; + }; + environment = cfg.environment; }; });