//! Media over QUIC (MoQ) scaffolding. use anyhow::{anyhow, Context, Result}; use bytes::Bytes; use ec_core::Manifest; use ec_iroh::DiscoveryConfig; use iroh::{protocol::Router, Endpoint, EndpointAddr, SecretKey}; use moq_lite::{BroadcastConsumer, BroadcastProducer, Group, Track}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs; use std::path::PathBuf; use std::time::Duration; use tokio::sync::mpsc; use tokio::task::JoinHandle; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TrackName { pub namespace: String, pub name: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct GroupId(pub u64); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ObjectId(pub u64); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ObjectMeta { pub created_unix_ms: u64, pub content_type: String, pub size_bytes: u64, pub timing: Option, pub encryption: Option, pub chunk_hash: Option, pub chunk_hash_alg: Option, pub chunk_proof: Option>, pub chunk_proof_alg: Option, pub manifest_id: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ObjectPayload { pub meta: ObjectMeta, pub data: Vec, } pub const DEFAULT_TRACK_NAME: &str = "chunks"; pub const DEFAULT_MANIFEST_TRACK_NAME: &str = "manifests"; pub trait Publisher { fn publish_object( &self, track: &TrackName, group: GroupId, object: ObjectPayload, ) -> Result<()>; } pub trait Subscriber { fn subscribe_track(&self, track: &TrackName) -> Result<()>; } pub trait Relay { fn announce_track(&self, track: &TrackName) -> Result<()>; fn cache_object(&self, track: &TrackName, group: GroupId, object: ObjectPayload) -> Result<()>; } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TimingMeta { pub chunk_index: u64, pub chunk_start_27mhz: u64, pub chunk_duration_27mhz: u64, pub utc_start_unix: Option, pub sync_status: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct EncryptionMeta { pub alg: String, pub key_id: String, pub nonce_hex: String, } #[derive(Debug, Clone)] pub struct FileRelay { root: PathBuf, } impl FileRelay { pub fn new(root: impl Into) -> Self { Self { root: root.into() } } pub fn write_object( &self, track: &TrackName, group: GroupId, object_id: ObjectId, object: &ObjectPayload, ) -> Result<()> { let base = self.object_dir(track, group, object_id); fs::create_dir_all(&base) .with_context(|| format!("failed to create {}", base.display()))?; let data_path = base.join("data.bin"); let meta_path = base.join("meta.json"); fs::write(&data_path, &object.data) .with_context(|| format!("failed to write {}", data_path.display()))?; fs::write(&meta_path, serde_json::to_vec_pretty(&object.meta)?) .with_context(|| format!("failed to write {}", meta_path.display()))?; Ok(()) } fn object_dir(&self, track: &TrackName, group: GroupId, object_id: ObjectId) -> PathBuf { let namespace = sanitize_component(&track.namespace); let name = sanitize_component(&track.name); self.root .join(namespace) .join(name) .join(format!("group-{}", group.0)) .join(format!("object-{}", object_id.0)) } } impl Relay for FileRelay { fn announce_track(&self, _track: &TrackName) -> Result<()> { Ok(()) } fn cache_object(&self, track: &TrackName, group: GroupId, object: ObjectPayload) -> Result<()> { self.write_object(track, group, ObjectId(0), &object) } } fn sanitize_component(value: &str) -> String { value .chars() .map(|c| match c { 'a'..='z' | '0'..='9' | '-' | '_' => c, 'A'..='Z' => c.to_ascii_lowercase(), _ => '_', }) .collect() } pub fn encode_object_frame(meta: &ObjectMeta, data: &[u8]) -> Result> { let meta_bytes = serde_json::to_vec(meta)?; let meta_len = u32::try_from(meta_bytes.len()).map_err(|_| anyhow!("object meta too large"))?; let mut out = Vec::with_capacity(4 + meta_bytes.len() + data.len()); out.extend_from_slice(&meta_len.to_be_bytes()); out.extend_from_slice(&meta_bytes); out.extend_from_slice(data); Ok(out) } pub fn decode_object_frame(bytes: &[u8]) -> Result { if bytes.len() < 4 { return Err(anyhow!("object frame too short")); } let meta_len = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize; if bytes.len() < 4 + meta_len { return Err(anyhow!("object frame missing metadata bytes")); } let meta = serde_json::from_slice(&bytes[4..4 + meta_len])?; let data = bytes[4 + meta_len..].to_vec(); Ok(ObjectPayload { meta, data }) } pub fn encode_manifest_frame(manifest: &Manifest) -> Result> { Ok(serde_json::to_vec(manifest)?) } pub fn decode_manifest_frame(bytes: &[u8]) -> Result { Ok(serde_json::from_slice(bytes)?) } #[derive(Debug)] pub struct MoqNode { endpoint: Endpoint, router: Router, moq: iroh_moq::Moq, } impl MoqNode { pub async fn bind(secret: Option) -> Result { let discovery = DiscoveryConfig::from_env()?; Self::bind_with_discovery(secret, discovery).await } pub async fn bind_with_discovery( secret: Option, discovery: DiscoveryConfig, ) -> Result { let endpoint = ec_iroh::build_endpoint(secret, discovery).await?; let moq = iroh_moq::Moq::new(endpoint.clone()); let router = Router::builder(endpoint.clone()) .accept(iroh_moq::ALPN, moq.protocol_handler()) .spawn(); Ok(Self { endpoint, router, moq, }) } pub fn endpoint(&self) -> &Endpoint { &self.endpoint } pub fn endpoint_addr(&self) -> EndpointAddr { self.router.endpoint().addr() } pub async fn publish_objects( &self, broadcast_name: impl Into, track_name: impl Into, ) -> Result { let broadcast_name = broadcast_name.into(); let track_name = track_name.into(); let mut broadcast = BroadcastProducer::default(); let track = broadcast.create_track(Track { name: track_name.clone(), priority: 0, }); self.moq .publish(broadcast_name.clone(), broadcast.clone()) .await?; Ok(MoqPublisher { broadcast_name, track_name, broadcast, track, }) } /// Publish a broadcast containing multiple tracks, all created before publishing. /// /// This avoids subtle issues in some MoQ implementations where tracks added after the /// initial publish are not reliably deliverable to subscribers. pub async fn publish_track_set( &self, broadcast_name: impl Into, object_tracks: Vec, manifest_tracks: Vec, ) -> Result { let broadcast_name = broadcast_name.into(); let mut broadcast = BroadcastProducer::default(); let mut object = HashMap::new(); for name in object_tracks { let track = broadcast.create_track(Track { name: name.clone(), priority: 0, }); object.insert(name, track); } let mut manifests = HashMap::new(); for name in manifest_tracks { let track = broadcast.create_track(Track { name: name.clone(), priority: 0, }); manifests.insert(name, track); } self.moq.publish(broadcast_name.clone(), broadcast).await?; Ok(MoqPublishSet { broadcast_name, object, manifests, }) } pub async fn subscribe_objects( &self, remote: EndpointAddr, broadcast_name: impl Into, track_name: impl Into, ) -> Result { let broadcast_name = broadcast_name.into(); let track_name = track_name.into(); let mut session = self.moq.connect(remote).await?; let broadcast = session.subscribe(&broadcast_name).await?; let track = subscribe_track(&broadcast, &track_name)?; MoqObjectStream::spawn(session, track) } pub async fn subscribe_manifests( &self, remote: EndpointAddr, broadcast_name: impl Into, track_name: impl Into, ) -> Result { let broadcast_name = broadcast_name.into(); let track_name = track_name.into(); let mut session = self.moq.connect(remote).await?; let broadcast = session.subscribe(&broadcast_name).await?; let track = subscribe_track(&broadcast, &track_name)?; MoqManifestStream::spawn(session, track) } } pub struct MoqPublishSet { broadcast_name: String, object: HashMap, manifests: HashMap, } impl MoqPublishSet { pub fn publish_object( &mut self, track_name: &str, group: GroupId, object: ObjectPayload, ) -> Result<()> { let Some(track) = self.object.get_mut(track_name) else { return Err(anyhow!("unknown object track {}", track_name)); }; let Some(mut group_writer) = track.create_group(Group { sequence: group.0 }) else { return Err(anyhow!("group {} already published", group.0)); }; let frame = encode_object_frame(&object.meta, &object.data)?; group_writer.write_frame(Bytes::from(frame)); group_writer.close(); Ok(()) } pub fn publish_manifest( &mut self, track_name: &str, sequence: u64, manifest: &Manifest, ) -> Result<()> { let Some(track) = self.manifests.get_mut(track_name) else { return Err(anyhow!("unknown manifest track {}", track_name)); }; let Some(mut group_writer) = track.create_group(Group { sequence }) else { return Err(anyhow!("manifest group {} already published", sequence)); }; let frame = encode_manifest_frame(manifest)?; group_writer.write_frame(Bytes::from(frame)); group_writer.close(); Ok(()) } pub fn broadcast_name(&self) -> &str { &self.broadcast_name } } pub struct MoqPublisher { broadcast_name: String, track_name: String, broadcast: BroadcastProducer, track: moq_lite::TrackProducer, } impl MoqPublisher { pub fn publish_object(&mut self, group: GroupId, object: ObjectPayload) -> Result<()> { let Some(mut group_writer) = self.track.create_group(Group { sequence: group.0 }) else { return Err(anyhow!("group {} already published", group.0)); }; let frame = encode_object_frame(&object.meta, &object.data)?; group_writer.write_frame(Bytes::from(frame)); group_writer.close(); Ok(()) } pub fn create_side_track(&mut self, track_name: impl Into) -> Result { let track_name = track_name.into(); let track = self.broadcast.create_track(Track { name: track_name.clone(), priority: 0, }); Ok(MoqSidePublisher { track_name, track }) } pub fn create_manifest_track( &mut self, track_name: impl Into, ) -> Result { let track_name = track_name.into(); let track = self.broadcast.create_track(Track { name: track_name.clone(), priority: 0, }); Ok(MoqManifestPublisher { track_name, track }) } pub fn broadcast_name(&self) -> &str { &self.broadcast_name } pub fn track_name(&self) -> &str { &self.track_name } } pub struct MoqSidePublisher { track_name: String, track: moq_lite::TrackProducer, } impl MoqSidePublisher { pub fn publish_object(&mut self, group: GroupId, object: ObjectPayload) -> Result<()> { let Some(mut group_writer) = self.track.create_group(Group { sequence: group.0 }) else { return Err(anyhow!("group {} already published", group.0)); }; let frame = encode_object_frame(&object.meta, &object.data)?; group_writer.write_frame(Bytes::from(frame)); group_writer.close(); Ok(()) } pub fn track_name(&self) -> &str { &self.track_name } } pub struct MoqManifestPublisher { track_name: String, track: moq_lite::TrackProducer, } impl MoqManifestPublisher { pub fn publish_manifest(&mut self, sequence: u64, manifest: &Manifest) -> Result<()> { let Some(mut group_writer) = self.track.create_group(Group { sequence }) else { return Err(anyhow!("manifest group {} already published", sequence)); }; let frame = encode_manifest_frame(manifest)?; group_writer.write_frame(Bytes::from(frame)); group_writer.close(); Ok(()) } pub fn track_name(&self) -> &str { &self.track_name } } pub struct MoqObjectStream { receiver: mpsc::Receiver, _task: JoinHandle<()>, _session: iroh_moq::MoqSession, } impl MoqObjectStream { fn spawn(session: iroh_moq::MoqSession, mut track: moq_lite::TrackConsumer) -> Result { let (tx, rx) = mpsc::channel(32); let task = tokio::spawn(async move { loop { let next_group = track.next_group().await; let Some(mut group) = (match next_group { Ok(group) => group, Err(err) => { tracing::warn!("moq track error: {err:#}"); break; } }) else { break; }; let mut buffer = Vec::new(); loop { match group.read_frame().await { Ok(Some(frame)) => buffer.extend_from_slice(&frame), Ok(None) => break, Err(err) => { tracing::warn!("moq group error: {err:#}"); break; } } } if buffer.is_empty() { continue; } match decode_object_frame(&buffer) { Ok(object) => { if tx.send(object).await.is_err() { break; } } Err(err) => { tracing::warn!("failed to decode object frame: {err:#}"); } } } }); Ok(Self { receiver: rx, _task: task, _session: session, }) } pub async fn recv(&mut self) -> Option { self.receiver.recv().await } } pub struct MoqManifestStream { receiver: mpsc::Receiver, _task: JoinHandle<()>, _session: iroh_moq::MoqSession, } impl MoqManifestStream { fn spawn(session: iroh_moq::MoqSession, mut track: moq_lite::TrackConsumer) -> Result { let (tx, rx) = mpsc::channel(8); let task = tokio::spawn(async move { loop { let next_group = track.next_group().await; let Some(mut group) = (match next_group { Ok(group) => group, Err(err) => { tracing::warn!("moq manifest track error: {err:#}"); break; } }) else { break; }; let mut buffer = Vec::new(); loop { match group.read_frame().await { Ok(Some(frame)) => buffer.extend_from_slice(&frame), Ok(None) => break, Err(err) => { tracing::warn!("moq manifest group error: {err:#}"); break; } } } if buffer.is_empty() { continue; } match decode_manifest_frame(&buffer) { Ok(manifest) => { if tx.send(manifest).await.is_err() { break; } } Err(err) => { tracing::warn!("failed to decode manifest frame: {err:#}"); } } } }); Ok(Self { receiver: rx, _task: task, _session: session, }) } pub async fn recv(&mut self) -> Option { self.receiver.recv().await } } fn subscribe_track(broadcast: &BroadcastConsumer, name: &str) -> Result { let track = broadcast.subscribe_track(&Track::new(name)); Ok(track) } #[derive(Debug, Clone)] pub struct HlsWriter { output_dir: PathBuf, window: usize, target_duration: f64, init_filename: String, segments: std::collections::VecDeque, } #[derive(Debug, Clone)] struct HlsSegment { index: u64, duration: f64, filename: String, } impl HlsWriter { pub fn new_cmaf( output_dir: impl Into, target_duration: f64, window: usize, ) -> Result { // CMAF-only writer: init.mp4 + segment_*.m4s + HLS playlist as a local compatibility artifact. let output_dir = output_dir.into(); fs::create_dir_all(&output_dir) .with_context(|| format!("failed to create {}", output_dir.display()))?; Ok(Self { output_dir, window: window.max(1), target_duration, init_filename: "init.mp4".to_string(), segments: std::collections::VecDeque::new(), }) } pub fn write_init_segment(&mut self, data: &[u8]) -> Result { let path = self.output_dir.join(&self.init_filename); fs::write(&path, data).with_context(|| format!("failed to write {}", path.display()))?; self.write_playlist()?; Ok(path) } pub fn write_segment(&mut self, index: u64, duration: f64, data: &[u8]) -> Result { let filename = format!("segment_{index:06}.m4s"); let path = self.output_dir.join(&filename); fs::write(&path, data).with_context(|| format!("failed to write {}", path.display()))?; self.segments.push_back(HlsSegment { index, duration, filename, }); while self.segments.len() > self.window { self.segments.pop_front(); } self.write_playlist()?; Ok(path) } fn write_playlist(&self) -> Result<()> { let mut lines = Vec::new(); lines.push("#EXTM3U".to_string()); lines.push("#EXT-X-VERSION:7".to_string()); lines.push("#EXT-X-INDEPENDENT-SEGMENTS".to_string()); lines.push(format!("#EXT-X-MAP:URI=\"{}\"", self.init_filename)); let target = self.target_duration.ceil().max(1.0) as u64; lines.push(format!("#EXT-X-TARGETDURATION:{target}")); if let Some(first) = self.segments.front() { lines.push(format!("#EXT-X-MEDIA-SEQUENCE:{}", first.index)); } for seg in &self.segments { lines.push(format!("#EXTINF:{:.3},", seg.duration)); lines.push(seg.filename.clone()); } let playlist_path = self.output_dir.join("index.m3u8"); fs::write(&playlist_path, lines.join("\n") + "\n") .with_context(|| format!("failed to write {}", playlist_path.display()))?; Ok(()) } } pub fn chunk_duration_secs(meta: &ObjectMeta, fallback: Duration) -> f64 { if let Some(timing) = &meta.timing { let secs = timing.chunk_duration_27mhz as f64 / 27_000_000.0; if secs > 0.0 { return secs; } } fallback.as_secs_f64() } #[cfg(test)] mod tests { use super::*; use std::env; #[test] fn sanitize_component_is_stable() { assert_eq!(sanitize_component("Hello World!"), "hello_world_"); assert_eq!(sanitize_component("a-b_C9"), "a-b_c9"); } #[test] fn object_frame_roundtrip() { let meta = ObjectMeta { created_unix_ms: 1, content_type: "application/octet-stream".to_string(), size_bytes: 3, timing: Some(TimingMeta { chunk_index: 7, chunk_start_27mhz: 0, chunk_duration_27mhz: 54_000_000, utc_start_unix: None, sync_status: "synthetic".to_string(), }), encryption: None, chunk_hash: Some("00".repeat(32)), chunk_hash_alg: Some("blake3".to_string()), chunk_proof: Some(vec!["00".repeat(32)]), chunk_proof_alg: Some("merkle+blake3".to_string()), manifest_id: Some("m".to_string()), }; let data = b"abc".to_vec(); let frame = encode_object_frame(&meta, &data).unwrap(); let decoded = decode_object_frame(&frame).unwrap(); assert_eq!(decoded.data, data); assert_eq!(decoded.meta.created_unix_ms, meta.created_unix_ms); assert_eq!( decoded.meta.timing.as_ref().unwrap().chunk_index, meta.timing.as_ref().unwrap().chunk_index ); assert_eq!(decoded.meta.manifest_id, meta.manifest_id); } #[test] fn decode_rejects_short_frame() { assert!(decode_object_frame(&[]).is_err()); assert!(decode_object_frame(&[0, 0, 0]).is_err()); } #[test] fn manifest_frame_roundtrip() { let manifest = ec_core::Manifest { body: ec_core::ManifestBody { stream_id: ec_core::StreamId("s".to_string()), epoch_id: "e".to_string(), chunk_duration_ms: 2000, total_chunks: 1, chunk_start_index: 0, encoder_profile_id: "p".to_string(), merkle_root: "00".repeat(32), created_unix_ms: 1, metadata: Vec::new(), chunk_hashes: vec!["11".repeat(32)], variants: None, }, manifest_id: "m".to_string(), signatures: Vec::new(), commitments: Vec::new(), }; let bytes = encode_manifest_frame(&manifest).unwrap(); let decoded = decode_manifest_frame(&bytes).unwrap(); assert_eq!(decoded.manifest_id, "m"); assert_eq!(decoded.body.epoch_id, "e"); } #[test] fn manifest_frame_signed_roundtrip_verifies() { let prev = env::var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY").ok(); env::set_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", "11".repeat(32)); let keypair = ec_crypto::load_manifest_keypair_from_env() .expect("load should not error") .expect("keypair should exist"); let chunk_hashes = vec![blake3::hash(b"chunk0").to_hex().to_string()]; let merkle_root = ec_core::merkle_root_from_hashes(&chunk_hashes).unwrap(); let body = ec_core::ManifestBody { stream_id: ec_core::StreamId("s".to_string()), epoch_id: "e".to_string(), chunk_duration_ms: 2000, total_chunks: 1, chunk_start_index: 0, encoder_profile_id: "p".to_string(), merkle_root, created_unix_ms: 1, metadata: Vec::new(), chunk_hashes, variants: None, }; let manifest_id = body.manifest_id().unwrap(); let sig = ec_crypto::sign_manifest_id(&manifest_id, &keypair); assert!(ec_crypto::verify_manifest_signature(&manifest_id, &sig)); let manifest = ec_core::Manifest { body, manifest_id: manifest_id.clone(), signatures: vec![sig], commitments: Vec::new(), }; let bytes = encode_manifest_frame(&manifest).unwrap(); let decoded = decode_manifest_frame(&bytes).unwrap(); assert_eq!(decoded.manifest_id, manifest_id); assert_eq!(decoded.signatures.len(), 1); assert!(ec_crypto::verify_manifest_signature( &decoded.manifest_id, &decoded.signatures[0] )); match prev { Some(value) => env::set_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", value), None => env::remove_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY"), } } #[test] fn object_frame_encrypt_decrypt_roundtrip_and_hash_matches_plaintext() { let stream_id = "ec/stream/v1/source/test/device-a/channel-b"; let chunk_index = 7u64; let plaintext = b"hello every.channel"; let expected_hash = blake3::hash(plaintext).to_hex().to_string(); let enc = ec_crypto::encrypt_stream_data(stream_id, chunk_index, plaintext, None); let meta = ObjectMeta { created_unix_ms: 1, content_type: "application/octet-stream".to_string(), size_bytes: enc.ciphertext.len() as u64, timing: Some(TimingMeta { chunk_index, chunk_start_27mhz: 0, chunk_duration_27mhz: 54_000_000, utc_start_unix: None, sync_status: "synthetic".to_string(), }), encryption: Some(EncryptionMeta { alg: enc.alg.to_string(), key_id: stream_id.to_string(), nonce_hex: hex::encode(enc.nonce), }), chunk_hash: Some(expected_hash.clone()), chunk_hash_alg: Some("blake3".to_string()), chunk_proof: None, chunk_proof_alg: None, manifest_id: None, }; let frame = encode_object_frame(&meta, &enc.ciphertext).unwrap(); let decoded = decode_object_frame(&frame).unwrap(); let out = ec_crypto::decrypt_stream_data(stream_id, chunk_index, &decoded.data, None) .expect("decrypt should succeed"); assert_eq!(out, plaintext); assert_eq!( decoded.meta.chunk_hash.as_deref(), Some(expected_hash.as_str()) ); assert_eq!( blake3::hash(&out).to_hex().to_string(), decoded.meta.chunk_hash.unwrap() ); } }