Harden LA publishers and add multi-relay guide
Some checks are pending
ci-gates / checks (push) Waiting to run
deploy-cloudflare / checks (push) Waiting to run
deploy-cloudflare / deploy (push) Blocked by required conditions

This commit is contained in:
Conrad Kramer 2026-06-10 01:28:15 -07:00
parent 5d6f77f868
commit cfc4902016
No known key found for this signature in database
13 changed files with 1430 additions and 402 deletions

View file

@ -38,6 +38,31 @@ struct DirectoryList {
entries: Vec<DirectoryEntry>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PublicStreamRelay {
relay_url: String,
broadcast_name: String,
track_name: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PublicStreamEntry {
stream_id: String,
title: String,
relay_url: String,
broadcast_name: String,
track_name: String,
relays: Vec<PublicStreamRelay>,
updated_ms: u64,
expires_ms: u64,
}
#[derive(Clone, Debug, Serialize)]
struct PublicStreamList {
now_ms: u64,
entries: Vec<PublicStreamEntry>,
}
#[derive(Clone, Debug, Serialize)]
struct HealthResp {
ok: bool,
@ -69,10 +94,29 @@ struct AnswerGetReq {
stream_id: String,
}
#[derive(Clone, Debug, Deserialize)]
struct StreamUpsertReq {
stream_id: String,
title: String,
relay_url: Option<String>,
broadcast_name: Option<String>,
track_name: Option<String>,
relays: Option<Vec<PublicStreamRelay>>,
expires_ms: Option<u64>,
}
#[derive(Clone, Debug, Serialize)]
struct StreamUpsertResp {
ok: bool,
ttl_ms: u64,
entry: PublicStreamEntry,
}
#[derive(Default)]
struct State {
entries: HashMap<String, DirectoryEntry>,
answers: HashMap<String, AnswerEntry>,
streams: HashMap<String, PublicStreamEntry>,
}
fn now_ms() -> u64 {
@ -100,6 +144,7 @@ fn json_headers() -> HeaderMap {
fn prune_state(state: &mut State, now: u64) {
state.entries.retain(|_, v| v.expires_ms > now);
state.answers.retain(|_, v| v.expires_ms > now);
state.streams.retain(|_, v| v.expires_ms > now);
// Cap growth defensively. This is not spam-resistant; it's a bootstrap rendezvous.
if state.entries.len() > 200 {
@ -114,6 +159,12 @@ fn prune_state(state: &mut State, now: u64) {
items.truncate(500);
state.answers = items.into_iter().map(|e| (e.stream_id.clone(), e)).collect();
}
if state.streams.len() > 1000 {
let mut items = state.streams.values().cloned().collect::<Vec<_>>();
items.sort_by_key(|e| std::cmp::Reverse(e.updated_ms));
items.truncate(1000);
state.streams = items.into_iter().map(|e| (e.stream_id.clone(), e)).collect();
}
}
async fn health() -> impl IntoResponse {
@ -129,6 +180,118 @@ async fn directory(state: axum::extract::State<Arc<RwLock<State>>>) -> impl Into
(json_headers(), Json(DirectoryList { now_ms: now, entries }))
}
fn push_stream_relay(relays: &mut Vec<PublicStreamRelay>, relay: PublicStreamRelay) {
if relay.relay_url.is_empty() || relay.broadcast_name.is_empty() {
return;
}
if relays.iter().any(|existing| {
existing.relay_url == relay.relay_url
&& existing.broadcast_name == relay.broadcast_name
&& existing.track_name == relay.track_name
}) {
return;
}
if relays.len() < 16 {
relays.push(relay);
}
}
fn normalize_stream_relays(body: &StreamUpsertReq) -> Vec<PublicStreamRelay> {
let mut relays = Vec::new();
if let (Some(relay_url), Some(broadcast_name)) = (&body.relay_url, &body.broadcast_name) {
push_stream_relay(
&mut relays,
PublicStreamRelay {
relay_url: clamp_str(relay_url.clone(), 512),
broadcast_name: clamp_str(broadcast_name.clone(), 256),
track_name: clamp_str(
body.track_name
.clone()
.unwrap_or_else(|| "video0.m4s".to_string()),
256,
),
},
);
}
if let Some(body_relays) = &body.relays {
for relay in body_relays {
push_stream_relay(
&mut relays,
PublicStreamRelay {
relay_url: clamp_str(relay.relay_url.clone(), 512),
broadcast_name: clamp_str(relay.broadcast_name.clone(), 256),
track_name: clamp_str(
if relay.track_name.is_empty() {
"video0.m4s".to_string()
} else {
relay.track_name.clone()
},
256,
),
},
);
}
}
relays
}
async fn public_streams(state: axum::extract::State<Arc<RwLock<State>>>) -> impl IntoResponse {
let now = now_ms();
let mut guard = state.write().await;
prune_state(&mut guard, now);
let mut entries = guard.streams.values().cloned().collect::<Vec<_>>();
entries.sort_by_key(|e| std::cmp::Reverse(e.updated_ms));
(json_headers(), Json(PublicStreamList { now_ms: now, entries }))
}
async fn stream_upsert(
state: axum::extract::State<Arc<RwLock<State>>>,
Json(body): Json<StreamUpsertReq>,
) -> impl IntoResponse {
let now = now_ms();
let relays = normalize_stream_relays(&body);
if body.stream_id.is_empty()
|| body.title.is_empty()
|| body.relay_url.as_deref().unwrap_or_default().is_empty()
|| body.broadcast_name.as_deref().unwrap_or_default().is_empty()
{
let resp =
serde_json::json!({ "error": "missing stream_id/title/relay_url/broadcast_name" });
return (StatusCode::BAD_REQUEST, json_headers(), Json(resp)).into_response();
}
let requested_expires = body.expires_ms.unwrap_or(now + 20_000);
let requested_ttl = requested_expires.saturating_sub(now);
let ttl_ms = requested_ttl.clamp(5_000, 60_000);
let primary = relays[0].clone();
let entry = PublicStreamEntry {
stream_id: clamp_str(body.stream_id, 256),
title: clamp_str(body.title, 128),
relay_url: primary.relay_url,
broadcast_name: primary.broadcast_name,
track_name: primary.track_name,
relays,
updated_ms: now,
expires_ms: now + ttl_ms,
};
let mut guard = state.write().await;
prune_state(&mut guard, now);
guard.streams.insert(entry.stream_id.clone(), entry.clone());
(
json_headers(),
Json(StreamUpsertResp {
ok: true,
ttl_ms,
entry,
}),
)
.into_response()
}
async fn announce(
state: axum::extract::State<Arc<RwLock<State>>>,
Json(body): Json<AnnounceReq>,
@ -233,6 +396,8 @@ async fn main() -> anyhow::Result<()> {
let app = Router::new()
.route("/api/health", get(health))
.route("/api/directory", get(directory))
.route("/api/public-streams", get(public_streams))
.route("/api/stream-upsert", post(stream_upsert))
.route("/api/announce", post(announce))
.route("/api/answer", post(post_answer).get(get_answer))
.with_state(state)

View file

@ -212,10 +212,17 @@ type PublicStreamEntry = {
relay_url: string;
broadcast_name: string;
track_name: string;
relays: PublicStreamRelay[];
updated_ms: number;
expires_ms: number;
};
type PublicStreamRelay = {
relay_url: string;
broadcast_name: string;
track_name: string;
};
type PublicStreamList = {
now_ms: number;
entries: PublicStreamEntry[];
@ -323,9 +330,41 @@ type StreamUpsertReq = {
relay_url: string;
broadcast_name: string;
track_name?: string;
relays?: PublicStreamRelay[];
expires_ms?: number;
};
function publicStreamRelayKey(relay: PublicStreamRelay): string {
return `${relay.relay_url}\n${relay.broadcast_name}\n${relay.track_name}`;
}
function normalizePublicStreamRelays(body: StreamUpsertReq): PublicStreamRelay[] {
const relays: PublicStreamRelay[] = [];
const addRelay = (candidate?: Partial<PublicStreamRelay>) => {
if (!candidate?.relay_url || !candidate.broadcast_name) return;
const relay: PublicStreamRelay = {
relay_url: clampStr(candidate.relay_url, 512),
broadcast_name: clampStr(candidate.broadcast_name, 256),
track_name: clampStr(candidate.track_name || "video0.m4s", 256),
};
if (!relays.some((existing) => publicStreamRelayKey(existing) === publicStreamRelayKey(relay))) {
relays.push(relay);
}
};
addRelay({
relay_url: body.relay_url,
broadcast_name: body.broadcast_name,
track_name: body.track_name,
});
if (Array.isArray(body.relays)) {
for (const relay of body.relays) addRelay(relay);
}
return relays.slice(0, 16);
}
function authBearerToken(request: Request): string | null {
const auth = request.headers.get("authorization");
if (!auth) return null;
@ -392,17 +431,20 @@ export class EcApiContainer implements DurableObject {
{ status: 400 },
);
}
const relays = normalizePublicStreamRelays(body);
const requestedExpires = body.expires_ms ?? now + 20_000;
const requestedTtl = Math.max(0, requestedExpires - now);
const ttlMs = Math.min(60_000, Math.max(5_000, requestedTtl));
const primaryRelay = relays[0];
const entry: PublicStreamEntry = {
stream_id: clampStr(body.stream_id, 256),
title: clampStr(body.title, 128),
relay_url: clampStr(body.relay_url, 512),
broadcast_name: clampStr(body.broadcast_name, 256),
track_name: clampStr(body.track_name || "video0.m4s", 256),
relay_url: primaryRelay.relay_url,
broadcast_name: primaryRelay.broadcast_name,
track_name: primaryRelay.track_name,
relays,
updated_ms: now,
expires_ms: now + ttlMs,
};