diff --git a/README.md b/README.md index c0ed671..32ccf38 100644 --- a/README.md +++ b/README.md @@ -68,22 +68,32 @@ Control protocol (iroh gossip, relay + direct transport discovery): ```sh # Listener (on node A) -cargo run -p ec-node -- control-listen --gossip-peer +cargo run -p ec-node -- control-listen --gossip-peer # Announcer (on node B) cargo run -p ec-node -- control-announce \ --stream-id la-nbc \ --relay-url https://cdn.moq.dev/anon \ --relay-broadcast la-nbc \ - --gossip-peer + --gossip-peer # Resolver (consumer picks best announced path) cargo run -p ec-node -- control-resolve \ --stream-id la-nbc \ --prefer direct-first \ - --gossip-peer + --gossip-peer + +# Bridge iroh control announcements to every.channel public web list +EVERY_CHANNEL_WEB_UPSERT_TOKEN= \ +cargo run -p ec-node -- control-bridge-web \ + --directory-url https://every.channel \ + --gossip-peer ``` +`control-announce`, `control-listen`, `control-resolve`, and `control-bridge-web` print both +`control endpoint id` and `control endpoint addr` on startup. Use the `endpoint addr` JSON for +`--gossip-peer` when bootstrapping. 
+ Coverage: ```sh diff --git a/apps/web/app.js b/apps/web/app.js index 4c916e0..230aa3a 100644 --- a/apps/web/app.js +++ b/apps/web/app.js @@ -9,6 +9,7 @@ const MOQ_WATCH_MODULE_URLS = [ "https://cdn.jsdelivr.net/npm/@moq/watch@0.1.1/element/+esm", "https://unpkg.com/@moq/watch@0.1.1/element.js?module", ]; +const PUBLIC_STREAMS_PATH = "/api/public-streams"; let moqWatchModulePromise = null; let disposePlayerSignals = null; @@ -52,6 +53,12 @@ function setShareLink(text) { el.textContent = text || ""; } +function setListHint(text, kind) { + const el = $("listHint"); + el.textContent = text || ""; + el.dataset.kind = kind || ""; +} + function clearPlayerSignals() { if (typeof disposePlayerSignals === "function") { disposePlayerSignals(); @@ -214,11 +221,59 @@ function hasWebTransport() { return typeof window.WebTransport !== "undefined"; } +function renderLiveList(entries, onWatch) { + const mount = $("liveList"); + mount.textContent = ""; + if (!entries.length) { + setListHint("No public streams announced yet.", ""); + return; + } + setListHint(`${entries.length} live`, "ok"); + + for (const entry of entries) { + const row = document.createElement("div"); + row.className = "liveItem"; + + const info = document.createElement("div"); + const title = document.createElement("div"); + title.className = "liveTitle"; + title.textContent = entry.title || entry.stream_id || entry.broadcast_name || "Live stream"; + info.appendChild(title); + + const meta = document.createElement("div"); + meta.className = "liveMeta"; + meta.textContent = `${entry.broadcast_name || ""} @ ${entry.relay_url || DEFAULT_RELAY_URL}`; + info.appendChild(meta); + + const btn = document.createElement("button"); + btn.className = "btn secondary"; + btn.textContent = "Watch"; + btn.addEventListener("click", () => { + onWatch(entry); + }); + + row.appendChild(info); + row.appendChild(btn); + mount.appendChild(row); + } +} + +async function fetchLiveList() { + const res = await fetch(PUBLIC_STREAMS_PATH, { 
cache: "no-store" }); + if (!res.ok) { + throw new Error(`HTTP ${res.status}`); + } + const body = await res.json(); + const entries = Array.isArray(body?.entries) ? body.entries : []; + return entries; +} + function main() { const relayInput = $("relayUrl"); const nameInput = $("broadcastName"); const watchBtn = $("watchBtn"); const copyBtn = $("copyLinkBtn"); + const refreshListBtn = $("refreshListBtn"); const initial = readParams(); relayInput.value = initial.relayUrl; @@ -257,7 +312,7 @@ function main() { await ensureMoqWatchElement(); } catch (e) { setHint( - `Failed to load MoQ web player dependency: ${String(e)}. Disable script blockers for esm.sh and retry.`, + `Failed to load MoQ web player dependency: ${String(e)}. Disable script blockers for esm.sh/jsdelivr/unpkg and retry.`, "warn", ); return; @@ -296,7 +351,30 @@ function main() { } }); + async function refreshLiveList() { + setListHint("Loading live streams...", ""); + try { + const entries = await fetchLiveList(); + renderLiveList(entries, (entry) => { + relayInput.value = normalizeRelayUrl(entry.relay_url || DEFAULT_RELAY_URL); + nameInput.value = normalizeName(entry.broadcast_name || ""); + updateSharePreview(); + void start(); + }); + } catch (e) { + $("liveList").textContent = ""; + setListHint(`Live list error: ${String(e)}`, "warn"); + } + } + refreshListBtn.addEventListener("click", () => { + void refreshLiveList(); + }); + updateSharePreview(); + void refreshLiveList(); + window.setInterval(() => { + void refreshLiveList(); + }, 15000); // Auto-start if a name was provided. if (initial.name) void start(); diff --git a/apps/web/index.html b/apps/web/index.html index 9a372da..c94b8ce 100644 --- a/apps/web/index.html +++ b/apps/web/index.html @@ -43,6 +43,15 @@ +
+
+
Live Now
+ +
+
+
+
+
diff --git a/apps/web/style.css b/apps/web/style.css index 8af748b..3f1bf14 100644 --- a/apps/web/style.css +++ b/apps/web/style.css @@ -93,6 +93,17 @@ body { margin-bottom: 12px; } +.panelHead { + display: flex; + align-items: center; + justify-content: space-between; + gap: 10px; +} + +.panelHead .panel-title { + margin-bottom: 0; +} + .row { display: grid; grid-template-columns: 1.15fr 1fr auto; @@ -178,6 +189,38 @@ body { padding: 0 4px; } +.liveList { + display: grid; + grid-template-columns: 1fr; + gap: 8px; + margin-top: 10px; +} + +.liveItem { + display: grid; + grid-template-columns: 1fr auto; + gap: 8px; + align-items: center; + padding: 9px 10px; + border-radius: 10px; + border: 1px solid var(--line); + background: rgba(0, 0, 0, 0.18); +} + +.liveTitle { + font-size: 14px; + font-weight: 600; +} + +.liveMeta { + margin-top: 2px; + color: var(--faint); + font-size: 12px; + overflow: hidden; + text-overflow: ellipsis; + white-space: nowrap; +} + .player { padding: 0; } @@ -269,4 +312,3 @@ body { flex-direction: column; } } - diff --git a/crates/ec-iroh/src/lib.rs b/crates/ec-iroh/src/lib.rs index 1c4947a..f9ed05c 100644 --- a/crates/ec-iroh/src/lib.rs +++ b/crates/ec-iroh/src/lib.rs @@ -293,10 +293,14 @@ impl CatalogGossip { .map(|addr| addr.id) .collect::>(); - let (sender, receiver) = gossip - .subscribe_and_join(catalog_topic(), peer_ids) - .await? - .split(); + // Allow local-only startup with no explicit bootstrap peers. In that mode we still + // subscribe to the topic immediately and can add peers later. + let topic = if peer_ids.is_empty() { + gossip.subscribe(catalog_topic(), Vec::new()).await? + } else { + gossip.subscribe_and_join(catalog_topic(), peer_ids).await? + }; + let (sender, receiver) = topic.split(); Ok(Self { sender, @@ -365,10 +369,14 @@ impl ControlGossip { .map(|addr| addr.id) .collect::>(); - let (sender, receiver) = gossip - .subscribe_and_join(control_topic(), peer_ids) - .await? 
- .split(); + // Allow local-only startup with no explicit bootstrap peers. In that mode we still + // subscribe to the topic immediately and can add peers later. + let topic = if peer_ids.is_empty() { + gossip.subscribe(control_topic(), Vec::new()).await? + } else { + gossip.subscribe_and_join(control_topic(), peer_ids).await? + }; + let (sender, receiver) = topic.split(); Ok(Self { sender, diff --git a/crates/ec-node/src/main.rs b/crates/ec-node/src/main.rs index 7f39195..2e127ee 100644 --- a/crates/ec-node/src/main.rs +++ b/crates/ec-node/src/main.rs @@ -86,6 +86,8 @@ enum Commands { ControlListen(ControlListenArgs), /// Resolve a stream id to the best currently-announced transport. ControlResolve(ControlResolveArgs), + /// Bridge iroh control announcements into the website public stream directory. + ControlBridgeWeb(ControlBridgeWebArgs), } #[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] @@ -550,6 +552,39 @@ struct ControlResolveArgs { gossip_peer: Vec, } +#[derive(Parser, Debug)] +struct ControlBridgeWebArgs { + /// Directory base URL (e.g. https://every.channel). + #[arg(long, default_value = "https://every.channel")] + directory_url: String, + /// Optional bearer token for `/api/stream-upsert`. + /// Falls back to `EVERY_CHANNEL_WEB_UPSERT_TOKEN` when omitted. + #[arg(long)] + auth_token: Option, + /// Maximum wall-clock wait for control announcements before exit (ms). + /// Set to 0 to run forever. + #[arg(long, default_value_t = 0)] + timeout_ms: u64, + /// Maximum accepted staleness from updated_unix_ms (ms). + #[arg(long, default_value_t = 30000)] + max_age_ms: u64, + /// Optional stream id prefix filter. + #[arg(long)] + stream_prefix: Option, + /// Exit after the first successful upsert. + #[arg(long, default_value_t = false)] + once: bool, + /// Optional iroh secret key (hex) for control gossip endpoint identity. + #[arg(long)] + iroh_secret: Option, + /// Discovery modes to enable (comma-separated: dht, mdns, dns). 
+ #[arg(long)] + discovery: Option, + /// Gossip peers to connect to (repeatable). + #[arg(long)] + gossip_peer: Vec, +} + #[derive(Subcommand, Debug)] enum IngestSource { /// Ingest from an HDHomeRun device. @@ -630,6 +665,7 @@ fn main() -> Result<()> { Commands::ControlAnnounce(args) => run_async(control_announce(args))?, Commands::ControlListen(args) => run_async(control_listen(args))?, Commands::ControlResolve(args) => run_async(control_resolve(args))?, + Commands::ControlBridgeWeb(args) => run_async(control_bridge_web(args))?, } Ok(()) @@ -2102,15 +2138,10 @@ async fn moq_publish(args: MoqPublishArgs) -> Result<()> { let mut manifest_sequence: u64 = 0; let announce_tx = if args.announce { + let gossip_peers = parse_gossip_peers(args.gossip_peer.clone()); Some( - spawn_catalog_announcer( - &node, - &track, - &broadcast_name, - &track_name, - args.gossip_peer.clone(), - ) - .await?, + spawn_catalog_announcer(&node, &track, &broadcast_name, &track_name, gossip_peers) + .await?, ) } else { None @@ -4244,6 +4275,21 @@ fn parse_discovery(value: Option<&str>) -> Result { } } +fn parse_gossip_peers(mut peers: Vec) -> Vec { + if let Ok(env_peers) = std::env::var("EVERY_CHANNEL_GOSSIP_PEERS") { + for raw in env_peers.split(|c: char| c == ',' || c == ';' || c.is_whitespace()) { + let peer = raw.trim(); + if peer.is_empty() { + continue; + } + if !peers.iter().any(|existing| existing == peer) { + peers.push(peer.to_string()); + } + } + } + peers +} + async fn spawn_catalog_announcer( node: &MoqNode, track: &TrackName, @@ -4407,6 +4453,9 @@ async fn control_announce(args: ControlAnnounceArgs) -> Result<()> { let secret = parse_iroh_secret(args.iroh_secret)?; let discovery = parse_discovery(args.discovery.as_deref())?; let endpoint = ec_iroh::build_endpoint(secret, discovery).await?; + let gossip_peers = parse_gossip_peers(args.gossip_peer.clone()); + let endpoint_addr = + serde_json::to_string(&endpoint.addr()).unwrap_or_else(|_| endpoint.id().to_string()); let mut 
transports = Vec::new(); @@ -4444,13 +4493,14 @@ async fn control_announce(args: ControlAnnounceArgs) -> Result<()> { let stop_tx = spawn_control_announcer_task( endpoint.clone(), - args.gossip_peer.clone(), + gossip_peers, announcement, Duration::from_millis(args.interval_ms.max(1000)), ) .await?; eprintln!("control endpoint id: {}", endpoint.id()); + eprintln!("control endpoint addr: {}", endpoint_addr); eprintln!("control stream_id: {}", args.stream_id); eprintln!( "control announce interval_ms: {}", @@ -4468,10 +4518,15 @@ async fn control_listen(args: ControlListenArgs) -> Result<()> { let secret = parse_iroh_secret(args.iroh_secret)?; let discovery = parse_discovery(args.discovery.as_deref())?; let endpoint = ec_iroh::build_endpoint(secret, discovery).await?; + let gossip_peers = parse_gossip_peers(args.gossip_peer.clone()); eprintln!("control endpoint id: {}", endpoint.id()); + eprintln!( + "control endpoint addr: {}", + serde_json::to_string(&endpoint.addr()).unwrap_or_else(|_| endpoint.id().to_string()) + ); let mut gossip = tokio::time::timeout( Duration::from_secs(30), - ec_iroh::ControlGossip::join(endpoint.clone(), &args.gossip_peer), + ec_iroh::ControlGossip::join(endpoint.clone(), &gossip_peers), ) .await .context("timed out joining control gossip topic")??; @@ -4534,11 +4589,16 @@ async fn control_resolve(args: ControlResolveArgs) -> Result<()> { let secret = parse_iroh_secret(args.iroh_secret)?; let discovery = parse_discovery(args.discovery.as_deref())?; let endpoint = ec_iroh::build_endpoint(secret, discovery).await?; + let gossip_peers = parse_gossip_peers(args.gossip_peer.clone()); eprintln!("control endpoint id: {}", endpoint.id()); + eprintln!( + "control endpoint addr: {}", + serde_json::to_string(&endpoint.addr()).unwrap_or_else(|_| endpoint.id().to_string()) + ); let mut gossip = tokio::time::timeout( Duration::from_secs(30), - ec_iroh::ControlGossip::join(endpoint.clone(), &args.gossip_peer), + 
ec_iroh::ControlGossip::join(endpoint.clone(), &gossip_peers), ) .await .context("timed out joining control gossip topic")??; @@ -4601,6 +4661,155 @@ async fn control_resolve(args: ControlResolveArgs) -> Result<()> { )) } +fn select_relay_transport_for_web( + transports: &[StreamTransportDescriptor], +) -> Option<(String, String, String)> { + for transport in transports { + if let StreamTransportDescriptor::RelayMoq { + url, + broadcast_name, + track_name, + } = transport + { + return Some((url.clone(), broadcast_name.clone(), track_name.clone())); + } + } + None +} + +#[derive(Debug, serde::Serialize)] +struct WebStreamUpsertReq<'a> { + stream_id: &'a str, + title: &'a str, + relay_url: &'a str, + broadcast_name: &'a str, + track_name: &'a str, + expires_ms: u64, +} + +async fn control_bridge_web(args: ControlBridgeWebArgs) -> Result<()> { + let secret = parse_iroh_secret(args.iroh_secret)?; + let discovery = parse_discovery(args.discovery.as_deref())?; + let endpoint = ec_iroh::build_endpoint(secret, discovery).await?; + let gossip_peers = parse_gossip_peers(args.gossip_peer.clone()); + eprintln!("control endpoint id: {}", endpoint.id()); + eprintln!( + "control endpoint addr: {}", + serde_json::to_string(&endpoint.addr()).unwrap_or_else(|_| endpoint.id().to_string()) + ); + + let mut gossip = tokio::time::timeout( + Duration::from_secs(30), + ec_iroh::ControlGossip::join(endpoint.clone(), &gossip_peers), + ) + .await + .context("timed out joining control gossip topic")??; + + let token = args + .auth_token + .or_else(|| std::env::var("EVERY_CHANNEL_WEB_UPSERT_TOKEN").ok()) + .map(|v| v.trim().to_string()) + .filter(|v| !v.is_empty()); + let directory_url = args.directory_url.trim_end_matches('/').to_string(); + let upsert_url = format!("{directory_url}/api/stream-upsert"); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(8)) + .build() + .context("failed to build web bridge http client")?; + let mut last_upserted_unix_ms = 
HashMap::::new(); + let timeout_deadline = if args.timeout_ms == 0 { + None + } else { + Some(Instant::now() + Duration::from_millis(args.timeout_ms.max(1000))) + }; + + eprintln!("web upsert url: {}", upsert_url); + + loop { + let maybe = if let Some(deadline) = timeout_deadline { + let now = Instant::now(); + if now >= deadline { + return Err(anyhow!("timed out waiting for control announcements")); + } + tokio::time::timeout(deadline - now, gossip.next_announcement()) + .await + .context("timed out waiting for control announcement")?? + } else { + gossip.next_announcement().await? + }; + let Some(announcement) = maybe else { + continue; + }; + + if !control_announcement_is_fresh(&announcement, args.max_age_ms) { + continue; + } + + let stream_id = announcement.stream.id.0.clone(); + if let Some(prefix) = args.stream_prefix.as_deref() { + if !stream_id.starts_with(prefix) { + continue; + } + } + + let Some((relay_url, broadcast_name, track_name)) = + select_relay_transport_for_web(&announcement.transports) + else { + continue; + }; + + if last_upserted_unix_ms + .get(&stream_id) + .is_some_and(|seen| *seen >= announcement.updated_unix_ms) + { + continue; + } + + let ttl_ms = announcement.ttl_ms.clamp(5_000, 60_000); + let payload = WebStreamUpsertReq { + stream_id: &stream_id, + title: &announcement.stream.title, + relay_url: &relay_url, + broadcast_name: &broadcast_name, + track_name: &track_name, + expires_ms: now_unix_ms().saturating_add(ttl_ms), + }; + + let mut req = client.post(&upsert_url).json(&payload); + if let Some(token) = token.as_ref() { + req = req.bearer_auth(token); + } + let res = req + .send() + .await + .with_context(|| format!("failed posting to {upsert_url}"))?; + if !res.status().is_success() { + let status = res.status(); + let body = res.text().await.unwrap_or_default(); + tracing::warn!( + stream = %stream_id, + status = %status, + body = %body, + "web stream upsert failed" + ); + continue; + } + + 
last_upserted_unix_ms.insert(stream_id.clone(), announcement.updated_unix_ms); + tracing::info!( + stream = %stream_id, + relay = %relay_url, + broadcast = %broadcast_name, + "web stream upserted" + ); + if args.once { + break; + } + } + + Ok(()) +} + fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> { let start = Instant::now(); let mut last_len: Option = None; @@ -4643,6 +4852,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { let secret = parse_iroh_secret(args.iroh_secret.clone())?; let discovery = parse_discovery(args.discovery.as_deref())?; let endpoint = ec_iroh::build_endpoint(secret, discovery).await?; + let gossip_peers = parse_gossip_peers(args.gossip_peer.clone()); let announcement = build_control_announcement( args.name.clone(), @@ -4657,7 +4867,7 @@ async fn wt_publish(args: WtPublishArgs) -> Result<()> { match spawn_control_announcer_task( endpoint.clone(), - args.gossip_peer.clone(), + gossip_peers, announcement, Duration::from_millis(args.control_interval_ms.max(1000)), ) diff --git a/deploy/cloudflare-worker/src/index.ts b/deploy/cloudflare-worker/src/index.ts index 21de797..f996720 100644 --- a/deploy/cloudflare-worker/src/index.ts +++ b/deploy/cloudflare-worker/src/index.ts @@ -151,6 +151,7 @@ interface Env { EC_TURN_USER_PREFIX?: string; EC_TURN_HOST?: string; EC_TURN_HMAC?: string; + EC_STREAM_UPSERT_TOKEN?: string; } type DirectoryEntry = { @@ -173,6 +174,21 @@ type DirectoryList = { entries: DirectoryEntry[]; }; +type PublicStreamEntry = { + stream_id: string; + title: string; + relay_url: string; + broadcast_name: string; + track_name: string; + updated_ms: number; + expires_ms: number; +}; + +type PublicStreamList = { + now_ms: number; + entries: PublicStreamEntry[]; +}; + function nowMs(): number { return Date.now(); } @@ -190,6 +206,10 @@ function answerKey(streamId: string): string { return `a:${streamId}`; } +function streamKey(streamId: string): string { + return `s:${streamId}`; +} + async function 
listWithPrefix( storage: DurableObjectStorage, prefix: string, @@ -212,10 +232,12 @@ async function pruneAndCap( ): Promise { const entries = await listWithPrefix(storage, "e:"); const answers = await listWithPrefix(storage, "a:"); + const streams = await listWithPrefix(storage, "s:"); const toDelete: string[] = []; for (const [k, v] of entries) if (v.expires_ms <= now) toDelete.push(k); for (const [k, v] of answers) if (v.expires_ms <= now) toDelete.push(k); + for (const [k, v] of streams) if (v.expires_ms <= now) toDelete.push(k); // Cap growth defensively. This is not spam-resistant; it's a bootstrap rendezvous. if (entries.length > 200) { @@ -234,6 +256,14 @@ async function pruneAndCap( const keep = new Set(sorted.map((a) => answerKey(a.stream_id))); for (const [k] of answers) if (!keep.has(k)) toDelete.push(k); } + if (streams.length > 1000) { + const sorted = streams + .map(([, v]) => v) + .sort((a, b) => b.updated_ms - a.updated_ms) + .slice(0, 1000); + const keep = new Set(sorted.map((s) => streamKey(s.stream_id))); + for (const [k] of streams) if (!keep.has(k)) toDelete.push(k); + } if (toDelete.length > 0) { // Delete in chunks to avoid oversized requests. @@ -255,13 +285,32 @@ type AnswerPostReq = { answer: string; }; +type StreamUpsertReq = { + stream_id: string; + title: string; + relay_url: string; + broadcast_name: string; + track_name?: string; + expires_ms?: number; +}; + +function authBearerToken(request: Request): string | null { + const auth = request.headers.get("authorization"); + if (!auth) return null; + const m = /^Bearer\s+(.+)$/i.exec(auth.trim()); + if (!m) return null; + return m[1]; +} + // Minimal bootstrap API Durable Object. The binding name is historical; we keep it stable so // existing migrations and wrangler config remain valid while removing Cloudflare Containers. 
export class EcApiContainer implements DurableObject { private state: DurableObjectState; + private env: Env; - constructor(state: DurableObjectState) { + constructor(state: DurableObjectState, env: Env) { this.state = state; + this.env = env; } async fetch(request: Request): Promise { @@ -275,6 +324,61 @@ export class EcApiContainer implements DurableObject { return jsonNoStore({ ok: true }); } + if (url.pathname === "/api/public-streams") { + const items = await listWithPrefix(this.state.storage, "s:"); + const entries = items + .map(([, v]) => v) + .filter((v) => v.expires_ms > now) + .sort((a, b) => b.updated_ms - a.updated_ms); + const resp: PublicStreamList = { now_ms: now, entries }; + return jsonNoStore(resp); + } + + if (url.pathname === "/api/stream-upsert") { + if (request.method !== "POST") { + return jsonNoStore({ error: "method not allowed" }, { status: 405 }); + } + + const requiredToken = this.env.EC_STREAM_UPSERT_TOKEN?.trim(); + if (requiredToken) { + const supplied = authBearerToken(request); + if (!supplied || supplied !== requiredToken) { + return jsonNoStore({ error: "unauthorized" }, { status: 401 }); + } + } + + let body: StreamUpsertReq; + try { + body = (await request.json()) as StreamUpsertReq; + } catch { + return jsonNoStore({ error: "invalid json" }, { status: 400 }); + } + + if (!body.stream_id || !body.title || !body.relay_url || !body.broadcast_name) { + return jsonNoStore( + { error: "missing stream_id/title/relay_url/broadcast_name" }, + { status: 400 }, + ); + } + + const requestedExpires = body.expires_ms ?? 
now + 20_000; + const requestedTtl = Math.max(0, requestedExpires - now); + const ttlMs = Math.min(60_000, Math.max(5_000, requestedTtl)); + + const entry: PublicStreamEntry = { + stream_id: clampStr(body.stream_id, 256), + title: clampStr(body.title, 128), + relay_url: clampStr(body.relay_url, 512), + broadcast_name: clampStr(body.broadcast_name, 256), + track_name: clampStr(body.track_name || "video0.m4s", 256), + updated_ms: now, + expires_ms: now + ttlMs, + }; + + await this.state.storage.put(streamKey(entry.stream_id), entry); + return jsonNoStore({ ok: true, ttl_ms: ttlMs, entry }); + } + if (url.pathname === "/api/directory") { const items = await listWithPrefix(this.state.storage, "e:"); const entries = items diff --git a/evolution/proposals/ECP-0068-iroh-control-web-directory-bridge.md b/evolution/proposals/ECP-0068-iroh-control-web-directory-bridge.md new file mode 100644 index 0000000..7157e92 --- /dev/null +++ b/evolution/proposals/ECP-0068-iroh-control-web-directory-bridge.md @@ -0,0 +1,41 @@ +# ECP-0068: Iroh Control To Web Directory Bridge + +Status: Draft + +## Decision + +Add a first-class bridge from iroh control gossip to the public website directory: + +1. `ec-node control-bridge-web`: +- subscribes to control announcements on `every.channel/control/v1`, +- selects relay transports (`relay_url` + `broadcast_name` + `track_name`), +- upserts them into the website API (`/api/stream-upsert`). + +2. Extend the Cloudflare Worker API with: +- `POST /api/stream-upsert` (optional bearer token auth), +- `GET /api/public-streams` (read-only list for browsers). + +3. Extend the web watcher UI to render `Live Now` from `/api/public-streams`, with one-click watch. + +## Motivation + +The site currently requires manual broadcast names even when streams are already announced over iroh control gossip. This bridge makes `every.channel` discoverable from published control state and gives browser users a single “open site and click watch” flow. 
+ 
+ ## Scope
+ 
+ In scope:
+ - Control gossip -> web directory bridge command.
+ - Worker storage/API for public stream entries.
+ - Browser UI list and refresh loop.
+ - Control gossip join behavior updated to work without bootstrap peers (`subscribe` without blocking join).
+ 
+ Out of scope:
+ - Browser-native iroh direct playback.
+ - Signed/authenticated control announcements.
+ - Replacing relay playback with direct iroh in browsers.
+ 
+ ## Rollout / Reversibility
+ 
+ - Additive change; existing `/api/directory` and watch-by-link behavior remain intact.
+ - Bridge can be disabled by stopping `control-bridge-web`.
+ - Write access to the public listing (`POST /api/stream-upsert`) can be tightened by setting `EC_STREAM_UPSERT_TOKEN`; the read-only `GET /api/public-streams` endpoint stays public.