archive replay: add HLS DVR serve path and web mode

This commit is contained in:
every.channel 2026-02-24 03:19:56 -08:00
parent 656ec11c73
commit b35de70789
No known key found for this signature in database
9 changed files with 904 additions and 26 deletions

View file

@ -38,6 +38,8 @@ use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::process::Command as TokioCommand;
use tokio_tungstenite::tungstenite::Message as WsMessage;
use url::Url;
@ -48,7 +50,8 @@ const DIRECT_WIRE_TAG_PING: u8 = 0x02;
const DIRECT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(8);
// Conservatively under typical SCTP data channel max message sizes.
const DIRECT_WIRE_CHUNK_BYTES: usize = 16 * 1024;
const WT_ARCHIVE_DEFAULT_TRACKS: &[&str] = &["catalog.json", "init.mp4", "video0.m4s", "audio0.m4s"];
const WT_ARCHIVE_DEFAULT_TRACKS: &[&str] =
&["catalog.json", "init.mp4", "video0.m4s", "audio0.m4s"];
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::RwLock;
@ -83,6 +86,8 @@ enum Commands {
WtPublish(WtPublishArgs),
/// Subscribe to a relay broadcast over WebTransport/MoQ and archive groups into CAS.
WtArchive(WtArchiveArgs),
/// Serve archived relay groups as DVR-style HLS playlists + object endpoints.
WtArchiveServe(WtArchiveServeArgs),
/// Announce stream transport availability over iroh gossip control topic.
ControlAnnounce(ControlAnnounceArgs),
/// Listen for stream transport announcements from iroh gossip control topic.
@ -483,6 +488,20 @@ struct WtArchiveArgs {
tls_disable_verify: bool,
}
#[derive(Parser, Debug)]
// Arguments for the `wt-archive-serve` subcommand. The `///` doc comments
// below double as clap `--help` text, so their wording is left untouched.
struct WtArchiveServeArgs {
    /// Output directory used by `wt-archive`.
    // Must point at the same tree `wt-archive` wrote; the CAS is expected
    // under `<output-dir>/objects/blake3` (see `wt_archive_serve`).
    #[arg(long, default_value = "./tmp/wt-archive")]
    output_dir: PathBuf,
    /// Optional manifest/index directory.
    /// Defaults to `<output-dir>/manifests` when omitted.
    #[arg(long)]
    manifest_dir: Option<PathBuf>,
    /// TCP listen address for HTTP replay endpoints.
    // Parsed by tokio's TcpListener::bind, so "host:port" form is required.
    #[arg(long, default_value = "0.0.0.0:7788")]
    listen: String,
}
#[derive(Parser, Debug)]
struct ControlAnnounceArgs {
/// Stable stream id to announce.
@ -694,6 +713,7 @@ fn main() -> Result<()> {
Commands::WsSubscribe(args) => run_async(ws_subscribe(args))?,
Commands::WtPublish(args) => run_async(wt_publish(args))?,
Commands::WtArchive(args) => run_async(wt_archive(args))?,
Commands::WtArchiveServe(args) => run_async(wt_archive_serve(args))?,
Commands::ControlAnnounce(args) => run_async(control_announce(args))?,
Commands::ControlListen(args) => run_async(control_listen(args))?,
Commands::ControlResolve(args) => run_async(control_resolve(args))?,
@ -4874,7 +4894,7 @@ fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> {
))
}
#[derive(Debug, serde::Serialize)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct ArchiveIndexRecord {
received_unix_ms: u64,
relay_url: String,
@ -5000,7 +5020,8 @@ async fn archive_track_loop(
"track {} read failed for broadcast {}",
track_name, broadcast_name
)
})? else {
})?
else {
return Err(anyhow!(
"track {} ended for broadcast {}",
track_name,
@ -5097,8 +5118,10 @@ async fn wt_archive(args: WtArchiveArgs) -> Result<()> {
}
impl web_transport_trait::Session for ProtocolOverride {
type SendStream = <web_transport_quinn::Session as web_transport_trait::Session>::SendStream;
type RecvStream = <web_transport_quinn::Session as web_transport_trait::Session>::RecvStream;
type SendStream =
<web_transport_quinn::Session as web_transport_trait::Session>::SendStream;
type RecvStream =
<web_transport_quinn::Session as web_transport_trait::Session>::RecvStream;
type Error = <web_transport_quinn::Session as web_transport_trait::Session>::Error;
fn accept_bi(
@ -5391,6 +5414,544 @@ async fn wt_archive(args: WtArchiveArgs) -> Result<()> {
Ok(())
}
/// Shared configuration cloned into each replay connection task.
#[derive(Debug, Clone)]
struct ArchiveReplayState {
    /// Root of the content-addressed store (`<output-dir>/objects/blake3`).
    cas_root: PathBuf,
    /// Root of the per-broadcast JSONL index files (`<output-dir>/manifests`).
    manifest_root: PathBuf,
}
/// JSON body served for `GET /archive/<broadcast>/timeline.json`.
#[derive(Debug, serde::Serialize)]
struct ArchiveTimelineResponse {
    broadcast_name: String,
    /// Earliest `received_unix_ms` across the filtered video/audio records, if any.
    start_unix_ms: Option<u64>,
    /// Latest `received_unix_ms` across the filtered video/audio records, if any.
    end_unix_ms: Option<u64>,
    /// Deduped video record count after `from_ms`/`limit` filtering.
    video_segments: usize,
    /// Deduped audio record count after `from_ms`/`limit` filtering.
    audio_segments: usize,
}
/// One media segment as it will appear in a generated HLS playlist.
#[derive(Debug, Clone)]
struct ArchiveHlsSegment {
    /// Archive group sequence; the first segment's value seeds EXT-X-MEDIA-SEQUENCE.
    sequence: u64,
    /// EXTINF duration in seconds, derived from inter-arrival gaps.
    duration_secs: f64,
    /// blake3 hex digest locating the segment bytes in the CAS.
    hash: String,
}
/// Minimal in-memory HTTP response produced by the replay router.
#[derive(Debug)]
struct ArchiveHttpResponse {
    /// HTTP status code (see `archive_status_text` for the reason phrase).
    status: u16,
    /// Full `Content-Type` header value, including charset where relevant.
    content_type: String,
    /// Response payload; suppressed on the wire for HEAD requests.
    body: Vec<u8>,
}
/// Load every archive index record for `broadcast_name`/`track_name`.
///
/// The index lives at `<manifest_root>/<broadcast>/<track>.jsonl`; a missing
/// file yields an empty list. Unparseable lines are logged and skipped so a
/// single corrupt entry cannot poison the whole track.
fn read_archive_records(
    manifest_root: &Path,
    broadcast_name: &str,
    track_name: &str,
) -> Result<Vec<ArchiveIndexRecord>> {
    let index_path = manifest_root
        .join(sanitize_path_component(broadcast_name))
        .join(format!("{}.jsonl", sanitize_path_component(track_name)));
    if !index_path.exists() {
        return Ok(Vec::new());
    }
    let contents = fs::read_to_string(&index_path)
        .with_context(|| format!("failed to read archive index {}", index_path.display()))?;
    let mut records = Vec::new();
    for raw_line in contents.lines() {
        let trimmed = raw_line.trim();
        if trimmed.is_empty() {
            continue;
        }
        match serde_json::from_str::<ArchiveIndexRecord>(trimmed) {
            Ok(record) => records.push(record),
            Err(err) => {
                tracing::warn!(
                    path = %index_path.display(),
                    err = %err,
                    "failed to parse archive index line"
                );
            }
        }
    }
    Ok(records)
}
/// Sort records by group sequence and collapse duplicates.
///
/// The sort is stable, so among records sharing a sequence number the one
/// appended last to the index wins (it overwrites its predecessor in place).
fn dedupe_by_group_sequence(mut records: Vec<ArchiveIndexRecord>) -> Vec<ArchiveIndexRecord> {
    records.sort_by_key(|record| record.group_sequence);
    let mut deduped: Vec<ArchiveIndexRecord> = Vec::with_capacity(records.len());
    for record in records {
        match deduped.last_mut() {
            Some(prev) if prev.group_sequence == record.group_sequence => *prev = record,
            _ => deduped.push(record),
        }
    }
    deduped
}
/// Estimate a per-segment duration in milliseconds as the median positive
/// gap between consecutive archive timestamps, clamped to [200, 10000].
/// Falls back to 1000 ms when fewer than two usable samples exist.
fn default_segment_duration_ms(records: &[ArchiveIndexRecord]) -> u64 {
    let mut gaps: Vec<u64> = records
        .windows(2)
        .filter_map(|pair| {
            let gap = pair[1].received_unix_ms.checked_sub(pair[0].received_unix_ms)?;
            (gap > 0).then_some(gap)
        })
        .collect();
    if gaps.is_empty() {
        return 1000;
    }
    gaps.sort_unstable();
    // Median of the observed gaps, kept within sane playlist bounds.
    gaps[gaps.len() / 2].clamp(200, 10000)
}
/// Convert deduped index records into HLS segment descriptors.
///
/// Each segment's duration is the gap to the following record, clamped to
/// [200, 10000] ms; the final segment (and any non-increasing gap) falls
/// back to the track's median duration.
fn build_hls_segments(records: &[ArchiveIndexRecord]) -> Vec<ArchiveHlsSegment> {
    if records.is_empty() {
        return Vec::new();
    }
    let fallback_ms = default_segment_duration_ms(records);
    records
        .iter()
        .enumerate()
        .map(|(idx, record)| {
            let dur_ms = records
                .get(idx + 1)
                .and_then(|next| next.received_unix_ms.checked_sub(record.received_unix_ms))
                .filter(|&gap| gap > 0)
                .map(|gap| gap.clamp(200, 10000))
                .unwrap_or(fallback_ms);
            ArchiveHlsSegment {
                sequence: record.group_sequence,
                duration_secs: dur_ms as f64 / 1000.0,
                hash: record.blake3.clone(),
            }
        })
        .collect()
}
/// Return the blake3 hash of the most recent `init.mp4` object recorded for
/// this broadcast (highest group sequence), or `None` if none was archived.
fn latest_init_hash(manifest_root: &Path, broadcast_name: &str) -> Result<Option<String>> {
    let records = read_archive_records(manifest_root, broadcast_name, "init.mp4")?;
    let mut deduped = dedupe_by_group_sequence(records);
    Ok(deduped.pop().map(|record| record.blake3))
}
/// Extract the optional `limit` query parameter (maximum record count).
fn parse_limit(url: &Url) -> Option<usize> {
    let (_, value) = url.query_pairs().find(|(key, _)| key == "limit")?;
    value.parse::<usize>().ok()
}
/// Extract the optional `from_ms` query parameter (unix-ms lower bound).
fn parse_from_ms(url: &Url) -> Option<u64> {
    let (_, value) = url.query_pairs().find(|(key, _)| key == "from_ms")?;
    value.parse::<u64>().ok()
}
/// Load, dedupe, and window one track's index records.
///
/// `from_ms` drops records received before that wall-clock time; a `limit`
/// greater than zero keeps only the newest `limit` records.
fn parse_archive_track(
    manifest_root: &Path,
    broadcast_name: &str,
    track_name: &str,
    from_ms: Option<u64>,
    limit: Option<usize>,
) -> Result<Vec<ArchiveIndexRecord>> {
    let mut records =
        dedupe_by_group_sequence(read_archive_records(manifest_root, broadcast_name, track_name)?);
    if let Some(start_ms) = from_ms {
        records.retain(|record| record.received_unix_ms >= start_ms);
    }
    match limit {
        // Keep the tail: the newest `max_items` records.
        Some(max_items) if max_items > 0 && records.len() > max_items => {
            Ok(records.split_off(records.len() - max_items))
        }
        _ => Ok(records),
    }
}
/// Render a DVR-style HLS media playlist for one fMP4 track.
///
/// The init segment is referenced via EXT-X-MAP so players can bootstrap
/// decode; segment URIs point back at this server's CAS endpoints.
fn hls_playlist_for_track(
    broadcast_name: &str,
    track_name: &str,
    init_hash: &str,
    records: &[ArchiveIndexRecord],
) -> String {
    let segments = build_hls_segments(records);
    // TARGETDURATION must be >= every rounded-up segment duration; use 2 when
    // there are no segments and never drop below 1.
    let target_duration = segments
        .iter()
        .map(|segment| segment.duration_secs.ceil() as u64)
        .max()
        .map_or(2, |max| max.max(1));
    let media_sequence = segments.first().map_or(0, |segment| segment.sequence);
    let encoded_broadcast = urlencoding::encode(broadcast_name);
    let encoded_track = urlencoding::encode(track_name);
    let mut lines = vec![
        "#EXTM3U".to_string(),
        "#EXT-X-VERSION:7".to_string(),
        "#EXT-X-PLAYLIST-TYPE:EVENT".to_string(),
        "#EXT-X-INDEPENDENT-SEGMENTS".to_string(),
        format!("#EXT-X-TARGETDURATION:{target_duration}"),
        format!("#EXT-X-MEDIA-SEQUENCE:{media_sequence}"),
        format!("#EXT-X-MAP:URI=\"/archive/{encoded_broadcast}/init.mp4?hash={init_hash}\""),
    ];
    for segment in segments {
        lines.push(format!("#EXTINF:{:.3},", segment.duration_secs));
        lines.push(format!(
            "/archive/{encoded_broadcast}/segment/{}.m4s?track={encoded_track}",
            segment.hash
        ));
    }
    let mut out = lines.join("\n");
    out.push('\n');
    out
}
/// True when `hash` is a 64-character ASCII hex string — the textual form
/// of a 32-byte blake3 digest (either letter case accepted).
fn validate_blake3_hex(hash: &str) -> bool {
    hash.len() == 64 && hash.chars().all(|c| c.is_ascii_hexdigit())
}
/// Map a blake3 hex digest to its sharded CAS object path:
/// `<cas_root>/<first-two-hex-chars>/<hash>.bin`.
///
/// Rejecting anything that is not exactly 64 hex characters also blocks
/// path traversal via attacker-supplied hashes.
fn cas_path_for_hash(cas_root: &Path, hash: &str) -> Result<PathBuf> {
    if !validate_blake3_hex(hash) {
        return Err(anyhow!("invalid hash"));
    }
    let (shard, _) = hash.split_at(2);
    Ok(cas_root.join(shard).join(format!("{hash}.bin")))
}
/// Assemble an in-memory HTTP response with the given status, MIME type,
/// and payload bytes.
fn archive_response(status: u16, content_type: &str, body: Vec<u8>) -> ArchiveHttpResponse {
    let content_type = content_type.to_owned();
    ArchiveHttpResponse {
        status,
        content_type,
        body,
    }
}
fn archive_error(status: u16, message: &str) -> ArchiveHttpResponse {
archive_response(
status,
"application/json; charset=utf-8",
serde_json::json!({ "error": message })
.to_string()
.into_bytes(),
)
}
/// Reason phrase for the status codes this replay server emits.
///
/// The previous catch-all returned "OK" for any unlisted code, which would
/// produce misleading status lines such as "301 OK". Unknown codes now map
/// to the neutral phrase "Unknown"; per RFC 9112 the reason phrase is
/// informational only, so clients are unaffected.
fn archive_status_text(status: u16) -> &'static str {
    match status {
        200 => "OK",
        204 => "No Content",
        400 => "Bad Request",
        404 => "Not Found",
        405 => "Method Not Allowed",
        500 => "Internal Server Error",
        502 => "Bad Gateway",
        _ => "Unknown",
    }
}
/// Route one parsed request (method + target) to a replay endpoint and build
/// the full response in memory.
///
/// Endpoints, all under `/archive/<broadcast>/…`:
/// - `timeline.json`            — start/end timestamps and segment counts
/// - `master.m3u8`              — multivariant playlist referencing video/audio
/// - `video.m3u8` / `audio.m3u8`— DVR media playlists
/// - `init.mp4`                 — init segment, by `?hash=` or latest recorded
/// - `segment/<hash>.m4s`       — media segment bytes from the CAS
/// Plus `/archive/healthz` as a liveness probe.
fn handle_archive_http_request(
    state: &ArchiveReplayState,
    method: &str,
    target: &str,
) -> ArchiveHttpResponse {
    // CORS preflight: the connection writer attaches the Access-Control-*
    // headers to every response, so an empty 204 suffices here.
    if method == "OPTIONS" {
        return archive_response(204, "text/plain; charset=utf-8", Vec::new());
    }
    if method != "GET" && method != "HEAD" {
        return archive_error(405, "method not allowed");
    }
    // Borrow Url's parser for path/query splitting; the host part is a dummy.
    let req_url = match Url::parse(&format!("http://localhost{target}")) {
        Ok(url) => url,
        Err(_) => return archive_error(400, "invalid url"),
    };
    let path = req_url.path();
    if path == "/archive/healthz" {
        return archive_response(
            200,
            "application/json; charset=utf-8",
            serde_json::json!({ "ok": true }).to_string().into_bytes(),
        );
    }
    // Expected shape: ["archive", <broadcast>, <endpoint>, (<hash>.m4s)].
    let parts = path.trim_start_matches('/').split('/').collect::<Vec<_>>();
    if parts.len() < 3 || parts[0] != "archive" {
        return archive_error(404, "not found");
    }
    let broadcast_name = match urlencoding::decode(parts[1]) {
        Ok(v) => v.to_string(),
        Err(_) => return archive_error(400, "invalid broadcast"),
    };
    if broadcast_name.is_empty() {
        return archive_error(400, "missing broadcast");
    }
    // Optional windowing, honored by the timeline and media playlists.
    let from_ms = parse_from_ms(&req_url);
    let limit = parse_limit(&req_url);
    match parts[2] {
        "timeline.json" if parts.len() == 3 => {
            let video = match parse_archive_track(
                &state.manifest_root,
                &broadcast_name,
                "video0.m4s",
                from_ms,
                limit,
            ) {
                Ok(records) => records,
                Err(err) => return archive_error(500, &format!("{err:#}")),
            };
            let audio = match parse_archive_track(
                &state.manifest_root,
                &broadcast_name,
                "audio0.m4s",
                from_ms,
                limit,
            ) {
                Ok(records) => records,
                Err(err) => return archive_error(500, &format!("{err:#}")),
            };
            // Span covers whichever of the two tracks starts earliest /
            // ends latest; records are sequence-sorted by parse_archive_track.
            let min_ms = video
                .first()
                .map(|record| record.received_unix_ms)
                .into_iter()
                .chain(audio.first().map(|record| record.received_unix_ms))
                .min();
            let max_ms = video
                .last()
                .map(|record| record.received_unix_ms)
                .into_iter()
                .chain(audio.last().map(|record| record.received_unix_ms))
                .max();
            let body = ArchiveTimelineResponse {
                broadcast_name,
                start_unix_ms: min_ms,
                end_unix_ms: max_ms,
                video_segments: video.len(),
                audio_segments: audio.len(),
            };
            match serde_json::to_vec(&body) {
                Ok(buf) => archive_response(200, "application/json; charset=utf-8", buf),
                Err(err) => archive_error(500, &format!("{err:#}")),
            }
        }
        "master.m3u8" if parts.len() == 3 => {
            // NOTE(review): BANDWIDTH and CODECS are hard-coded — confirm
            // they match the archived streams' actual encoding.
            let encoded = urlencoding::encode(&broadcast_name);
            let playlist = format!(
                "#EXTM3U\n#EXT-X-VERSION:7\n#EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID=\"audio\",NAME=\"main\",DEFAULT=YES,AUTOSELECT=YES,URI=\"/archive/{encoded}/audio.m3u8\"\n#EXT-X-STREAM-INF:BANDWIDTH=3500000,CODECS=\"avc1.640028,mp4a.40.2\",AUDIO=\"audio\"\n/archive/{encoded}/video.m3u8\n"
            );
            archive_response(
                200,
                "application/vnd.apple.mpegurl; charset=utf-8",
                playlist.into_bytes(),
            )
        }
        "video.m3u8" if parts.len() == 3 => {
            // A playlist without an init segment is unplayable, so 404 early.
            let init_hash = match latest_init_hash(&state.manifest_root, &broadcast_name) {
                Ok(Some(hash)) => hash,
                Ok(None) => return archive_error(404, "missing init segment"),
                Err(err) => return archive_error(500, &format!("{err:#}")),
            };
            let records = match parse_archive_track(
                &state.manifest_root,
                &broadcast_name,
                "video0.m4s",
                from_ms,
                limit,
            ) {
                Ok(records) => records,
                Err(err) => return archive_error(500, &format!("{err:#}")),
            };
            if records.is_empty() {
                return archive_error(404, "video archive is empty");
            }
            let playlist =
                hls_playlist_for_track(&broadcast_name, "video0.m4s", &init_hash, &records);
            archive_response(
                200,
                "application/vnd.apple.mpegurl; charset=utf-8",
                playlist.into_bytes(),
            )
        }
        "audio.m3u8" if parts.len() == 3 => {
            // Mirrors the video.m3u8 arm for the audio track.
            let init_hash = match latest_init_hash(&state.manifest_root, &broadcast_name) {
                Ok(Some(hash)) => hash,
                Ok(None) => return archive_error(404, "missing init segment"),
                Err(err) => return archive_error(500, &format!("{err:#}")),
            };
            let records = match parse_archive_track(
                &state.manifest_root,
                &broadcast_name,
                "audio0.m4s",
                from_ms,
                limit,
            ) {
                Ok(records) => records,
                Err(err) => return archive_error(500, &format!("{err:#}")),
            };
            if records.is_empty() {
                return archive_error(404, "audio archive is empty");
            }
            let playlist =
                hls_playlist_for_track(&broadcast_name, "audio0.m4s", &init_hash, &records);
            archive_response(
                200,
                "application/vnd.apple.mpegurl; charset=utf-8",
                playlist.into_bytes(),
            )
        }
        "init.mp4" if parts.len() == 3 => {
            // Prefer the explicit ?hash= pin (written into EXT-X-MAP);
            // otherwise fall back to the newest recorded init segment.
            let hash = req_url
                .query_pairs()
                .find(|(k, _)| k == "hash")
                .map(|(_, v)| v.to_string())
                .or_else(|| {
                    latest_init_hash(&state.manifest_root, &broadcast_name)
                        .ok()
                        .flatten()
                });
            let Some(hash) = hash else {
                return archive_error(404, "missing init segment");
            };
            // cas_path_for_hash validates the hex digest, blocking traversal.
            let path = match cas_path_for_hash(&state.cas_root, &hash) {
                Ok(path) => path,
                Err(_) => return archive_error(400, "invalid hash"),
            };
            match fs::read(path) {
                Ok(bytes) => archive_response(200, "video/mp4", bytes),
                Err(_) => archive_error(404, "init segment not found"),
            }
        }
        "segment" if parts.len() == 4 => {
            // The hash alone addresses the bytes in the CAS; the ?track=
            // query is informational and deliberately ignored here.
            let hash_part = parts[3].strip_suffix(".m4s").unwrap_or(parts[3]);
            let path = match cas_path_for_hash(&state.cas_root, hash_part) {
                Ok(path) => path,
                Err(_) => return archive_error(400, "invalid hash"),
            };
            match fs::read(path) {
                Ok(bytes) => archive_response(200, "video/mp4", bytes),
                Err(_) => archive_error(404, "segment not found"),
            }
        }
        _ => archive_error(404, "not found"),
    }
}
/// Format the status line plus the fixed header block for a replay response.
///
/// CORS is wide open on purpose: the playlists are meant to be fetched by
/// browser-based players on other origins. `Connection: close` keeps each
/// connection single-shot.
fn archive_http_head(status: u16, content_type: &str, body_len: usize) -> String {
    format!(
        "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nContent-Length: {}\r\nConnection: close\r\nAccess-Control-Allow-Origin: *\r\nAccess-Control-Allow-Methods: GET,HEAD,OPTIONS\r\nAccess-Control-Allow-Headers: *\r\nCache-Control: no-store\r\n\r\n",
        status,
        archive_status_text(status),
        content_type,
        body_len,
    )
}

/// Serve exactly one HTTP request on `stream`, then close.
///
/// Reads until the end of the header block (CRLFCRLF), parses only the
/// request line (method + target), dispatches to the replay router, and
/// writes the response. Request bodies are never read.
///
/// Fixes over the previous version:
/// - the 400 "headers too large" path advertised a Content-Length but never
///   wrote the body, leaving clients waiting for bytes that never came;
/// - HEAD responses claimed `Content-Length: 0`; per RFC 9110 a HEAD
///   response should carry the length the corresponding GET would have
///   (the body itself is still suppressed).
async fn handle_archive_http_connection(
    mut stream: tokio::net::TcpStream,
    state: ArchiveReplayState,
) -> Result<()> {
    let mut buf = Vec::with_capacity(4096);
    let mut tmp = [0u8; 2048];
    loop {
        let read = stream
            .read(&mut tmp)
            .await
            .context("failed to read replay request")?;
        if read == 0 {
            // Peer closed before completing the header block; nothing to do.
            return Ok(());
        }
        buf.extend_from_slice(&tmp[..read]);
        if buf.windows(4).any(|w| w == b"\r\n\r\n") {
            break;
        }
        if buf.len() > 16 * 1024 {
            // Header cap exceeded: reply 400 with head AND body, then close.
            let response = archive_error(400, "request headers too large");
            let head =
                archive_http_head(response.status, &response.content_type, response.body.len());
            stream.write_all(head.as_bytes()).await?;
            stream.write_all(&response.body).await?;
            return Ok(());
        }
    }
    // Only the request line matters; remaining headers are ignored.
    let req = String::from_utf8_lossy(&buf);
    let first = req.lines().next().unwrap_or_default();
    let mut parts = first.split_whitespace();
    let method = parts.next().unwrap_or("");
    let target = parts.next().unwrap_or("/");
    let response = handle_archive_http_request(&state, method, target);
    // HEAD gets the same headers (including the true Content-Length) but no
    // body bytes.
    let is_head = method == "HEAD";
    let head = archive_http_head(response.status, &response.content_type, response.body.len());
    stream
        .write_all(head.as_bytes())
        .await
        .context("failed to write replay response headers")?;
    if !is_head {
        stream
            .write_all(&response.body)
            .await
            .context("failed to write replay response body")?;
    }
    Ok(())
}
/// Run the DVR replay HTTP server over an archived `wt-archive` output tree.
///
/// Resolves the manifest and CAS roots (creating both if absent), binds the
/// TCP listener, and serves every accepted connection on its own task.
/// Runs until `accept` fails.
async fn wt_archive_serve(args: WtArchiveServeArgs) -> Result<()> {
    // Default manifest location mirrors what `wt-archive` writes.
    let manifest_root = match args.manifest_dir {
        Some(dir) => dir,
        None => args.output_dir.join("manifests"),
    };
    let cas_root = args.output_dir.join("objects").join("blake3");
    for (label, dir) in [("manifest", &manifest_root), ("CAS", &cas_root)] {
        fs::create_dir_all(dir)
            .with_context(|| format!("failed to create {label} dir {}", dir.display()))?;
    }
    let listener = TcpListener::bind(&args.listen)
        .await
        .with_context(|| format!("failed to bind archive replay listener {}", args.listen))?;
    // Report the resolved address (useful when binding port 0).
    let local = listener
        .local_addr()
        .context("failed to read listen addr")?;
    tracing::info!(
        listen = %local,
        manifest_root = %manifest_root.display(),
        cas_root = %cas_root.display(),
        "archive replay server listening"
    );
    let state = ArchiveReplayState {
        cas_root,
        manifest_root,
    };
    loop {
        let (stream, peer) = listener.accept().await.context("accept failed")?;
        let per_conn_state = state.clone();
        tokio::spawn(async move {
            // Per-connection failures are expected (client aborts, bad
            // requests) and only logged at debug level.
            if let Err(err) = handle_archive_http_connection(stream, per_conn_state).await {
                tracing::debug!(peer = %peer, err = %err, "archive replay request failed");
            }
        });
    }
}
async fn wt_publish(args: WtPublishArgs) -> Result<()> {
let relay_url =
Url::parse(&args.url).with_context(|| format!("invalid relay url: {}", args.url))?;