834 lines
26 KiB
Rust
834 lines
26 KiB
Rust
//! Media over QUIC (MoQ) scaffolding.
|
|
|
|
use anyhow::{anyhow, Context, Result};
|
|
use bytes::Bytes;
|
|
use ec_core::Manifest;
|
|
use ec_iroh::DiscoveryConfig;
|
|
use iroh::{protocol::Router, Endpoint, EndpointAddr, SecretKey};
|
|
use moq_lite::{BroadcastConsumer, BroadcastProducer, Group, Track};
|
|
use serde::{Deserialize, Serialize};
|
|
use std::collections::HashMap;
|
|
use std::fs;
|
|
use std::path::PathBuf;
|
|
use std::time::Duration;
|
|
use tokio::sync::mpsc;
|
|
use tokio::task::JoinHandle;
|
|
|
|
/// Fully-qualified MoQ track identifier: a namespace plus a track name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrackName {
    /// Namespace component; sanitized to a filesystem-safe form by `FileRelay`.
    pub namespace: String,
    /// Track name within the namespace.
    pub name: String,
}
|
|
|
|
/// Newtype over the MoQ group sequence number (used as `Group { sequence }`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupId(pub u64);
|
|
|
|
/// Newtype for an object's index within a group (used in `FileRelay` paths).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectId(pub u64);
|
|
|
|
/// Per-object metadata, serialized as JSON ahead of the payload in an object frame.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectMeta {
    /// Creation time in Unix milliseconds.
    pub created_unix_ms: u64,
    /// MIME type of the payload (e.g. "application/octet-stream" in tests).
    pub content_type: String,
    /// Payload size in bytes.
    pub size_bytes: u64,
    /// Optional media-chunk timing on a 27 MHz clock.
    pub timing: Option<TimingMeta>,
    /// Present when the payload bytes are ciphertext.
    pub encryption: Option<EncryptionMeta>,
    /// Hex digest of the chunk; algorithm named by `chunk_hash_alg`.
    /// NOTE(review): tests hash the *plaintext* even when the payload is
    /// encrypted — confirm that is the intended contract.
    pub chunk_hash: Option<String>,
    /// Hash algorithm identifier (tests use "blake3").
    pub chunk_hash_alg: Option<String>,
    /// Inclusion-proof hashes for the chunk; algorithm named by `chunk_proof_alg`.
    pub chunk_proof: Option<Vec<String>>,
    /// Proof algorithm identifier (tests use "merkle+blake3").
    pub chunk_proof_alg: Option<String>,
    /// Identifier of the manifest this chunk belongs to, if known.
    pub manifest_id: Option<String>,
}
|
|
|
|
/// A decoded object: its metadata plus raw payload bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectPayload {
    /// Metadata that preceded the data in the wire frame.
    pub meta: ObjectMeta,
    /// Raw payload bytes (possibly ciphertext; see `meta.encryption`).
    pub data: Vec<u8>,
}
|
|
|
|
/// Default track name carrying chunk object frames.
pub const DEFAULT_TRACK_NAME: &str = "chunks";
/// Default track name carrying manifest frames.
pub const DEFAULT_MANIFEST_TRACK_NAME: &str = "manifests";
|
|
|
|
/// Abstraction over publishing one object onto a track within a group.
pub trait Publisher {
    /// Publishes `object` into `group` on `track`.
    fn publish_object(
        &self,
        track: &TrackName,
        group: GroupId,
        object: ObjectPayload,
    ) -> Result<()>;
}
|
|
|
|
/// Abstraction over subscribing to a track.
pub trait Subscriber {
    /// Registers interest in `track`.
    fn subscribe_track(&self, track: &TrackName) -> Result<()>;
}
|
|
|
|
/// Abstraction over a relay that announces tracks and caches objects.
pub trait Relay {
    /// Announces that `track` is available through this relay.
    fn announce_track(&self, track: &TrackName) -> Result<()>;

    /// Stores `object` for `group` on `track`.
    fn cache_object(&self, track: &TrackName, group: GroupId, object: ObjectPayload) -> Result<()>;
}
|
|
|
|
/// Chunk timing expressed on an MPEG-style 27 MHz clock
/// (see `chunk_duration_secs`, which divides by 27_000_000).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimingMeta {
    /// Sequential index of the chunk within its stream.
    pub chunk_index: u64,
    /// Chunk start time in 27 MHz ticks.
    pub chunk_start_27mhz: u64,
    /// Chunk duration in 27 MHz ticks.
    pub chunk_duration_27mhz: u64,
    /// Wall-clock start in Unix seconds, when known.
    pub utc_start_unix: Option<i64>,
    /// Free-form sync state label (tests use "synthetic").
    pub sync_status: String,
}
|
|
|
|
/// Parameters needed to decrypt an encrypted payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptionMeta {
    /// Cipher algorithm identifier.
    pub alg: String,
    /// Key identifier (tests use the stream id).
    pub key_id: String,
    /// Hex-encoded nonce used for this chunk.
    pub nonce_hex: String,
}
|
|
|
|
/// Relay that persists objects under a local directory tree
/// (`<root>/<namespace>/<name>/group-N/object-M/{data.bin,meta.json}`).
#[derive(Debug, Clone)]
pub struct FileRelay {
    // Root directory; subdirectories are created on first write.
    root: PathBuf,
}
|
|
|
|
impl FileRelay {
|
|
pub fn new(root: impl Into<PathBuf>) -> Self {
|
|
Self { root: root.into() }
|
|
}
|
|
|
|
pub fn write_object(
|
|
&self,
|
|
track: &TrackName,
|
|
group: GroupId,
|
|
object_id: ObjectId,
|
|
object: &ObjectPayload,
|
|
) -> Result<()> {
|
|
let base = self.object_dir(track, group, object_id);
|
|
fs::create_dir_all(&base)
|
|
.with_context(|| format!("failed to create {}", base.display()))?;
|
|
|
|
let data_path = base.join("data.bin");
|
|
let meta_path = base.join("meta.json");
|
|
|
|
fs::write(&data_path, &object.data)
|
|
.with_context(|| format!("failed to write {}", data_path.display()))?;
|
|
fs::write(&meta_path, serde_json::to_vec_pretty(&object.meta)?)
|
|
.with_context(|| format!("failed to write {}", meta_path.display()))?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn object_dir(&self, track: &TrackName, group: GroupId, object_id: ObjectId) -> PathBuf {
|
|
let namespace = sanitize_component(&track.namespace);
|
|
let name = sanitize_component(&track.name);
|
|
self.root
|
|
.join(namespace)
|
|
.join(name)
|
|
.join(format!("group-{}", group.0))
|
|
.join(format!("object-{}", object_id.0))
|
|
}
|
|
}
|
|
|
|
impl Relay for FileRelay {
    /// No-op: the filesystem relay has no announcement channel.
    fn announce_track(&self, _track: &TrackName) -> Result<()> {
        Ok(())
    }

    /// Writes the object to disk.
    ///
    /// NOTE(review): the object id is hardcoded to 0, so caching a second
    /// object for the same group overwrites the first — confirm groups are
    /// single-object here.
    fn cache_object(&self, track: &TrackName, group: GroupId, object: ObjectPayload) -> Result<()> {
        self.write_object(track, group, ObjectId(0), &object)
    }
}
|
|
|
|
/// Maps a track-name component onto a filesystem-safe, lowercase form.
///
/// Lowercase ASCII letters, digits, `-`, and `_` pass through unchanged;
/// uppercase ASCII is lowercased; every other character becomes `_`.
fn sanitize_component(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for c in value.chars() {
        if c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '_' {
            out.push(c);
        } else if c.is_ascii_uppercase() {
            out.push(c.to_ascii_lowercase());
        } else {
            out.push('_');
        }
    }
    out
}
|
|
|
|
pub fn encode_object_frame(meta: &ObjectMeta, data: &[u8]) -> Result<Vec<u8>> {
|
|
let meta_bytes = serde_json::to_vec(meta)?;
|
|
let meta_len = u32::try_from(meta_bytes.len()).map_err(|_| anyhow!("object meta too large"))?;
|
|
let mut out = Vec::with_capacity(4 + meta_bytes.len() + data.len());
|
|
out.extend_from_slice(&meta_len.to_be_bytes());
|
|
out.extend_from_slice(&meta_bytes);
|
|
out.extend_from_slice(data);
|
|
Ok(out)
|
|
}
|
|
|
|
pub fn decode_object_frame(bytes: &[u8]) -> Result<ObjectPayload> {
|
|
if bytes.len() < 4 {
|
|
return Err(anyhow!("object frame too short"));
|
|
}
|
|
let meta_len = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
|
|
if bytes.len() < 4 + meta_len {
|
|
return Err(anyhow!("object frame missing metadata bytes"));
|
|
}
|
|
let meta = serde_json::from_slice(&bytes[4..4 + meta_len])?;
|
|
let data = bytes[4 + meta_len..].to_vec();
|
|
Ok(ObjectPayload { meta, data })
|
|
}
|
|
|
|
/// Serializes a manifest to its JSON wire form.
pub fn encode_manifest_frame(manifest: &Manifest) -> Result<Vec<u8>> {
    Ok(serde_json::to_vec(manifest)?)
}
|
|
|
|
/// Parses a manifest from its JSON wire form.
pub fn decode_manifest_frame(bytes: &[u8]) -> Result<Manifest> {
    Ok(serde_json::from_slice(bytes)?)
}
|
|
|
|
/// An iroh endpoint plus router wired up to accept MoQ sessions.
#[derive(Debug)]
pub struct MoqNode {
    // Underlying QUIC endpoint.
    endpoint: Endpoint,
    // Router that dispatches the MoQ ALPN to the protocol handler.
    router: Router,
    // MoQ protocol state shared between publish and subscribe paths.
    moq: iroh_moq::Moq,
}
|
|
|
|
impl MoqNode {
    /// Binds a node using discovery configuration read from the environment.
    pub async fn bind(secret: Option<SecretKey>) -> Result<Self> {
        let discovery = DiscoveryConfig::from_env()?;
        Self::bind_with_discovery(secret, discovery).await
    }

    /// Binds a node with an explicit discovery configuration: builds the
    /// endpoint, creates the MoQ state, and spawns a router accepting the
    /// MoQ ALPN.
    pub async fn bind_with_discovery(
        secret: Option<SecretKey>,
        discovery: DiscoveryConfig,
    ) -> Result<Self> {
        let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
        let moq = iroh_moq::Moq::new(endpoint.clone());
        let router = Router::builder(endpoint.clone())
            .accept(iroh_moq::ALPN, moq.protocol_handler())
            .spawn();
        Ok(Self {
            endpoint,
            router,
            moq,
        })
    }

    /// The underlying QUIC endpoint.
    pub fn endpoint(&self) -> &Endpoint {
        &self.endpoint
    }

    /// Dialable address of this node (taken from the router's endpoint).
    pub fn endpoint_addr(&self) -> EndpointAddr {
        self.router.endpoint().addr()
    }

    /// Publishes a broadcast with a single object track and returns a
    /// writer for it.
    pub async fn publish_objects(
        &self,
        broadcast_name: impl Into<String>,
        track_name: impl Into<String>,
    ) -> Result<MoqPublisher> {
        let broadcast_name = broadcast_name.into();
        let track_name = track_name.into();
        let mut broadcast = BroadcastProducer::default();
        // Create the track before publishing so it is part of the initial announce.
        let track = broadcast.create_track(Track {
            name: track_name.clone(),
            priority: 0,
        });
        self.moq
            .publish(broadcast_name.clone(), broadcast.clone())
            .await?;
        Ok(MoqPublisher {
            broadcast_name,
            track_name,
            broadcast,
            track,
        })
    }

    /// Publish a broadcast containing multiple tracks, all created before publishing.
    ///
    /// This avoids subtle issues in some MoQ implementations where tracks added after the
    /// initial publish are not reliably deliverable to subscribers.
    pub async fn publish_track_set(
        &self,
        broadcast_name: impl Into<String>,
        object_tracks: Vec<String>,
        manifest_tracks: Vec<String>,
    ) -> Result<MoqPublishSet> {
        let broadcast_name = broadcast_name.into();
        let mut broadcast = BroadcastProducer::default();

        // Object tracks, keyed by name for later lookup in publish_object.
        let mut object = HashMap::new();
        for name in object_tracks {
            let track = broadcast.create_track(Track {
                name: name.clone(),
                priority: 0,
            });
            object.insert(name, track);
        }

        // Manifest tracks, keyed by name for later lookup in publish_manifest.
        let mut manifests = HashMap::new();
        for name in manifest_tracks {
            let track = broadcast.create_track(Track {
                name: name.clone(),
                priority: 0,
            });
            manifests.insert(name, track);
        }

        self.moq.publish(broadcast_name.clone(), broadcast).await?;
        Ok(MoqPublishSet {
            broadcast_name,
            object,
            manifests,
        })
    }

    /// Connects to `remote`, subscribes to one object track, and returns a
    /// stream of decoded objects.
    pub async fn subscribe_objects(
        &self,
        remote: EndpointAddr,
        broadcast_name: impl Into<String>,
        track_name: impl Into<String>,
    ) -> Result<MoqObjectStream> {
        let broadcast_name = broadcast_name.into();
        let track_name = track_name.into();
        let mut session = self.moq.connect(remote).await?;
        let broadcast = session.subscribe(&broadcast_name).await?;
        let track = subscribe_track(&broadcast, &track_name)?;
        MoqObjectStream::spawn(session, track)
    }

    /// Connects to `remote`, subscribes to one manifest track, and returns a
    /// stream of decoded manifests.
    pub async fn subscribe_manifests(
        &self,
        remote: EndpointAddr,
        broadcast_name: impl Into<String>,
        track_name: impl Into<String>,
    ) -> Result<MoqManifestStream> {
        let broadcast_name = broadcast_name.into();
        let track_name = track_name.into();
        let mut session = self.moq.connect(remote).await?;
        let broadcast = session.subscribe(&broadcast_name).await?;
        let track = subscribe_track(&broadcast, &track_name)?;
        MoqManifestStream::spawn(session, track)
    }
}
|
|
|
|
/// Writers for a broadcast whose tracks were all created before publishing
/// (see `MoqNode::publish_track_set`).
pub struct MoqPublishSet {
    // Name under which the broadcast was published.
    broadcast_name: String,
    // Object-track producers, keyed by track name.
    object: HashMap<String, moq_lite::TrackProducer>,
    // Manifest-track producers, keyed by track name.
    manifests: HashMap<String, moq_lite::TrackProducer>,
}
|
|
|
|
impl MoqPublishSet {
|
|
pub fn publish_object(
|
|
&mut self,
|
|
track_name: &str,
|
|
group: GroupId,
|
|
object: ObjectPayload,
|
|
) -> Result<()> {
|
|
let Some(track) = self.object.get_mut(track_name) else {
|
|
return Err(anyhow!("unknown object track {}", track_name));
|
|
};
|
|
let Some(mut group_writer) = track.create_group(Group { sequence: group.0 }) else {
|
|
return Err(anyhow!("group {} already published", group.0));
|
|
};
|
|
let frame = encode_object_frame(&object.meta, &object.data)?;
|
|
group_writer.write_frame(Bytes::from(frame));
|
|
group_writer.close();
|
|
Ok(())
|
|
}
|
|
|
|
pub fn publish_manifest(
|
|
&mut self,
|
|
track_name: &str,
|
|
sequence: u64,
|
|
manifest: &Manifest,
|
|
) -> Result<()> {
|
|
let Some(track) = self.manifests.get_mut(track_name) else {
|
|
return Err(anyhow!("unknown manifest track {}", track_name));
|
|
};
|
|
let Some(mut group_writer) = track.create_group(Group { sequence }) else {
|
|
return Err(anyhow!("manifest group {} already published", sequence));
|
|
};
|
|
let frame = encode_manifest_frame(manifest)?;
|
|
group_writer.write_frame(Bytes::from(frame));
|
|
group_writer.close();
|
|
Ok(())
|
|
}
|
|
|
|
pub fn broadcast_name(&self) -> &str {
|
|
&self.broadcast_name
|
|
}
|
|
}
|
|
|
|
/// Writer for a broadcast with one primary object track; additional tracks
/// can be attached after the fact.
pub struct MoqPublisher {
    // Name under which the broadcast was published.
    broadcast_name: String,
    // Name of the primary object track.
    track_name: String,
    // Broadcast producer, kept to create further tracks.
    broadcast: BroadcastProducer,
    // Producer for the primary object track.
    track: moq_lite::TrackProducer,
}
|
|
|
|
impl MoqPublisher {
|
|
pub fn publish_object(&mut self, group: GroupId, object: ObjectPayload) -> Result<()> {
|
|
let Some(mut group_writer) = self.track.create_group(Group { sequence: group.0 }) else {
|
|
return Err(anyhow!("group {} already published", group.0));
|
|
};
|
|
let frame = encode_object_frame(&object.meta, &object.data)?;
|
|
group_writer.write_frame(Bytes::from(frame));
|
|
group_writer.close();
|
|
Ok(())
|
|
}
|
|
|
|
pub fn create_side_track(&mut self, track_name: impl Into<String>) -> Result<MoqSidePublisher> {
|
|
let track_name = track_name.into();
|
|
let track = self.broadcast.create_track(Track {
|
|
name: track_name.clone(),
|
|
priority: 0,
|
|
});
|
|
Ok(MoqSidePublisher { track_name, track })
|
|
}
|
|
|
|
pub fn create_manifest_track(
|
|
&mut self,
|
|
track_name: impl Into<String>,
|
|
) -> Result<MoqManifestPublisher> {
|
|
let track_name = track_name.into();
|
|
let track = self.broadcast.create_track(Track {
|
|
name: track_name.clone(),
|
|
priority: 0,
|
|
});
|
|
Ok(MoqManifestPublisher { track_name, track })
|
|
}
|
|
|
|
pub fn broadcast_name(&self) -> &str {
|
|
&self.broadcast_name
|
|
}
|
|
|
|
pub fn track_name(&self) -> &str {
|
|
&self.track_name
|
|
}
|
|
}
|
|
|
|
/// Writer for an object track added to an existing broadcast.
pub struct MoqSidePublisher {
    // Name of this track within the broadcast.
    track_name: String,
    // Producer for the track.
    track: moq_lite::TrackProducer,
}
|
|
|
|
impl MoqSidePublisher {
|
|
pub fn publish_object(&mut self, group: GroupId, object: ObjectPayload) -> Result<()> {
|
|
let Some(mut group_writer) = self.track.create_group(Group { sequence: group.0 }) else {
|
|
return Err(anyhow!("group {} already published", group.0));
|
|
};
|
|
let frame = encode_object_frame(&object.meta, &object.data)?;
|
|
group_writer.write_frame(Bytes::from(frame));
|
|
group_writer.close();
|
|
Ok(())
|
|
}
|
|
|
|
pub fn track_name(&self) -> &str {
|
|
&self.track_name
|
|
}
|
|
}
|
|
|
|
/// Writer for a manifest track added to an existing broadcast.
pub struct MoqManifestPublisher {
    // Name of this track within the broadcast.
    track_name: String,
    // Producer for the track.
    track: moq_lite::TrackProducer,
}
|
|
|
|
impl MoqManifestPublisher {
|
|
pub fn publish_manifest(&mut self, sequence: u64, manifest: &Manifest) -> Result<()> {
|
|
let Some(mut group_writer) = self.track.create_group(Group { sequence }) else {
|
|
return Err(anyhow!("manifest group {} already published", sequence));
|
|
};
|
|
let frame = encode_manifest_frame(manifest)?;
|
|
group_writer.write_frame(Bytes::from(frame));
|
|
group_writer.close();
|
|
Ok(())
|
|
}
|
|
|
|
pub fn track_name(&self) -> &str {
|
|
&self.track_name
|
|
}
|
|
}
|
|
|
|
/// Receiver side of an object track subscription; a background task decodes
/// frames and feeds them through a channel.
pub struct MoqObjectStream {
    // Decoded objects, in group-arrival order.
    receiver: mpsc::Receiver<ObjectPayload>,
    // Background reader task; kept so it is aborted on drop.
    _task: JoinHandle<()>,
    // Session kept alive for the lifetime of the stream.
    _session: iroh_moq::MoqSession,
}
|
|
|
|
impl MoqObjectStream {
    /// Spawns a task that reads groups from `track`, concatenates each
    /// group's frames into one buffer, decodes it as an object frame, and
    /// forwards successes through a bounded channel (capacity 32).
    ///
    /// Track-level errors and end-of-track terminate the task; decode
    /// failures are logged and skipped.
    fn spawn(session: iroh_moq::MoqSession, mut track: moq_lite::TrackConsumer) -> Result<Self> {
        let (tx, rx) = mpsc::channel(32);
        let task = tokio::spawn(async move {
            loop {
                let next_group = track.next_group().await;
                // Err => track broken, stop; Ok(None) => track finished, stop.
                let Some(mut group) = (match next_group {
                    Ok(group) => group,
                    Err(err) => {
                        tracing::warn!("moq track error: {err:#}");
                        break;
                    }
                }) else {
                    break;
                };

                // Accumulate all frames of the group into one buffer.
                let mut buffer = Vec::new();
                loop {
                    match group.read_frame().await {
                        Ok(Some(frame)) => buffer.extend_from_slice(&frame),
                        Ok(None) => break,
                        Err(err) => {
                            tracing::warn!("moq group error: {err:#}");
                            break;
                        }
                    }
                }

                if buffer.is_empty() {
                    continue;
                }
                // NOTE(review): after a mid-group read error the partial
                // buffer is still decoded; decode failure is the only guard
                // against truncated objects — confirm this is acceptable.
                match decode_object_frame(&buffer) {
                    Ok(object) => {
                        // Receiver dropped => no one is listening; stop reading.
                        if tx.send(object).await.is_err() {
                            break;
                        }
                    }
                    Err(err) => {
                        tracing::warn!("failed to decode object frame: {err:#}");
                    }
                }
            }
        });
        Ok(Self {
            receiver: rx,
            _task: task,
            _session: session,
        })
    }

    /// Next decoded object, or `None` once the reader task has finished.
    pub async fn recv(&mut self) -> Option<ObjectPayload> {
        self.receiver.recv().await
    }
}
|
|
|
|
/// Receiver side of a manifest track subscription; a background task decodes
/// frames and feeds them through a channel.
pub struct MoqManifestStream {
    // Decoded manifests, in group-arrival order.
    receiver: mpsc::Receiver<Manifest>,
    // Background reader task; kept so it is aborted on drop.
    _task: JoinHandle<()>,
    // Session kept alive for the lifetime of the stream.
    _session: iroh_moq::MoqSession,
}
|
|
|
|
impl MoqManifestStream {
    /// Spawns a task that reads groups from `track`, concatenates each
    /// group's frames into one buffer, decodes it as a manifest, and
    /// forwards successes through a bounded channel (capacity 8).
    ///
    /// Track-level errors and end-of-track terminate the task; decode
    /// failures are logged and skipped.
    fn spawn(session: iroh_moq::MoqSession, mut track: moq_lite::TrackConsumer) -> Result<Self> {
        let (tx, rx) = mpsc::channel(8);
        let task = tokio::spawn(async move {
            loop {
                let next_group = track.next_group().await;
                // Err => track broken, stop; Ok(None) => track finished, stop.
                let Some(mut group) = (match next_group {
                    Ok(group) => group,
                    Err(err) => {
                        tracing::warn!("moq manifest track error: {err:#}");
                        break;
                    }
                }) else {
                    break;
                };

                // Accumulate all frames of the group into one buffer.
                let mut buffer = Vec::new();
                loop {
                    match group.read_frame().await {
                        Ok(Some(frame)) => buffer.extend_from_slice(&frame),
                        Ok(None) => break,
                        Err(err) => {
                            tracing::warn!("moq manifest group error: {err:#}");
                            break;
                        }
                    }
                }

                if buffer.is_empty() {
                    continue;
                }
                // NOTE(review): after a mid-group read error the partial
                // buffer is still decoded; decode failure is the only guard
                // against truncated manifests — confirm this is acceptable.
                match decode_manifest_frame(&buffer) {
                    Ok(manifest) => {
                        // Receiver dropped => no one is listening; stop reading.
                        if tx.send(manifest).await.is_err() {
                            break;
                        }
                    }
                    Err(err) => {
                        tracing::warn!("failed to decode manifest frame: {err:#}");
                    }
                }
            }
        });
        Ok(Self {
            receiver: rx,
            _task: task,
            _session: session,
        })
    }

    /// Next decoded manifest, or `None` once the reader task has finished.
    pub async fn recv(&mut self) -> Option<Manifest> {
        self.receiver.recv().await
    }
}
|
|
|
|
fn subscribe_track(broadcast: &BroadcastConsumer, name: &str) -> Result<moq_lite::TrackConsumer> {
|
|
let track = broadcast.subscribe_track(&Track::new(name));
|
|
Ok(track)
|
|
}
|
|
|
|
/// Writes a CMAF segment set (`init.mp4` + `segment_*.m4s`) and maintains a
/// sliding-window HLS playlist (`index.m3u8`) alongside it.
#[derive(Debug, Clone)]
pub struct HlsWriter {
    // Directory receiving init/segment files and the playlist.
    output_dir: PathBuf,
    // Maximum number of segments kept in the playlist window (>= 1).
    window: usize,
    // Configured target segment duration in seconds.
    target_duration: f64,
    // Filename of the CMAF initialization segment.
    init_filename: String,
    // Segments currently in the playlist window, oldest first.
    segments: std::collections::VecDeque<HlsSegment>,
}
|
|
|
|
/// One media segment tracked in the playlist window.
#[derive(Debug, Clone)]
struct HlsSegment {
    // Monotonic segment index (drives EXT-X-MEDIA-SEQUENCE).
    index: u64,
    // Segment duration in seconds (written as EXTINF).
    duration: f64,
    // Filename relative to the output directory.
    filename: String,
}
|
|
|
|
impl HlsWriter {
|
|
pub fn new_cmaf(
|
|
output_dir: impl Into<PathBuf>,
|
|
target_duration: f64,
|
|
window: usize,
|
|
) -> Result<Self> {
|
|
// CMAF-only writer: init.mp4 + segment_*.m4s + HLS playlist as a local compatibility artifact.
|
|
let output_dir = output_dir.into();
|
|
fs::create_dir_all(&output_dir)
|
|
.with_context(|| format!("failed to create {}", output_dir.display()))?;
|
|
Ok(Self {
|
|
output_dir,
|
|
window: window.max(1),
|
|
target_duration,
|
|
init_filename: "init.mp4".to_string(),
|
|
segments: std::collections::VecDeque::new(),
|
|
})
|
|
}
|
|
|
|
pub fn write_init_segment(&mut self, data: &[u8]) -> Result<PathBuf> {
|
|
let path = self.output_dir.join(&self.init_filename);
|
|
fs::write(&path, data).with_context(|| format!("failed to write {}", path.display()))?;
|
|
self.write_playlist()?;
|
|
Ok(path)
|
|
}
|
|
|
|
pub fn write_segment(&mut self, index: u64, duration: f64, data: &[u8]) -> Result<PathBuf> {
|
|
let filename = format!("segment_{index:06}.m4s");
|
|
let path = self.output_dir.join(&filename);
|
|
fs::write(&path, data).with_context(|| format!("failed to write {}", path.display()))?;
|
|
|
|
self.segments.push_back(HlsSegment {
|
|
index,
|
|
duration,
|
|
filename,
|
|
});
|
|
while self.segments.len() > self.window {
|
|
self.segments.pop_front();
|
|
}
|
|
|
|
self.write_playlist()?;
|
|
Ok(path)
|
|
}
|
|
|
|
fn write_playlist(&self) -> Result<()> {
|
|
let mut lines = Vec::new();
|
|
lines.push("#EXTM3U".to_string());
|
|
lines.push("#EXT-X-VERSION:7".to_string());
|
|
lines.push("#EXT-X-INDEPENDENT-SEGMENTS".to_string());
|
|
lines.push(format!("#EXT-X-MAP:URI=\"{}\"", self.init_filename));
|
|
let target = self.target_duration.ceil().max(1.0) as u64;
|
|
lines.push(format!("#EXT-X-TARGETDURATION:{target}"));
|
|
if let Some(first) = self.segments.front() {
|
|
lines.push(format!("#EXT-X-MEDIA-SEQUENCE:{}", first.index));
|
|
}
|
|
for seg in &self.segments {
|
|
lines.push(format!("#EXTINF:{:.3},", seg.duration));
|
|
lines.push(seg.filename.clone());
|
|
}
|
|
let playlist_path = self.output_dir.join("index.m3u8");
|
|
fs::write(&playlist_path, lines.join("\n") + "\n")
|
|
.with_context(|| format!("failed to write {}", playlist_path.display()))?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
pub fn chunk_duration_secs(meta: &ObjectMeta, fallback: Duration) -> f64 {
|
|
if let Some(timing) = &meta.timing {
|
|
let secs = timing.chunk_duration_27mhz as f64 / 27_000_000.0;
|
|
if secs > 0.0 {
|
|
return secs;
|
|
}
|
|
}
|
|
fallback.as_secs_f64()
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    //! Unit tests for frame codecs, name sanitization, and crypto round-trips.
    use super::*;
    use std::env;

    // Sanitization must be deterministic: lowercase passthrough, uppercase
    // folded, everything else replaced with '_'.
    #[test]
    fn sanitize_component_is_stable() {
        assert_eq!(sanitize_component("Hello World!"), "hello_world_");
        assert_eq!(sanitize_component("a-b_C9"), "a-b_c9");
    }

    // encode/decode of an object frame preserves payload and metadata fields.
    #[test]
    fn object_frame_roundtrip() {
        let meta = ObjectMeta {
            created_unix_ms: 1,
            content_type: "application/octet-stream".to_string(),
            size_bytes: 3,
            timing: Some(TimingMeta {
                chunk_index: 7,
                chunk_start_27mhz: 0,
                chunk_duration_27mhz: 54_000_000,
                utc_start_unix: None,
                sync_status: "synthetic".to_string(),
            }),
            encryption: None,
            chunk_hash: Some("00".repeat(32)),
            chunk_hash_alg: Some("blake3".to_string()),
            chunk_proof: Some(vec!["00".repeat(32)]),
            chunk_proof_alg: Some("merkle+blake3".to_string()),
            manifest_id: Some("m".to_string()),
        };
        let data = b"abc".to_vec();
        let frame = encode_object_frame(&meta, &data).unwrap();
        let decoded = decode_object_frame(&frame).unwrap();
        assert_eq!(decoded.data, data);
        assert_eq!(decoded.meta.created_unix_ms, meta.created_unix_ms);
        assert_eq!(
            decoded.meta.timing.as_ref().unwrap().chunk_index,
            meta.timing.as_ref().unwrap().chunk_index
        );
        assert_eq!(decoded.meta.manifest_id, meta.manifest_id);
    }

    // Frames shorter than the 4-byte length header must be rejected.
    #[test]
    fn decode_rejects_short_frame() {
        assert!(decode_object_frame(&[]).is_err());
        assert!(decode_object_frame(&[0, 0, 0]).is_err());
    }

    // encode/decode of a manifest frame preserves identifying fields.
    #[test]
    fn manifest_frame_roundtrip() {
        let manifest = ec_core::Manifest {
            body: ec_core::ManifestBody {
                stream_id: ec_core::StreamId("s".to_string()),
                epoch_id: "e".to_string(),
                chunk_duration_ms: 2000,
                total_chunks: 1,
                chunk_start_index: 0,
                encoder_profile_id: "p".to_string(),
                merkle_root: "00".repeat(32),
                created_unix_ms: 1,
                metadata: Vec::new(),
                chunk_hashes: vec!["11".repeat(32)],
                variants: None,
            },
            manifest_id: "m".to_string(),
            signatures: Vec::new(),
            commitments: Vec::new(),
        };
        let bytes = encode_manifest_frame(&manifest).unwrap();
        let decoded = decode_manifest_frame(&bytes).unwrap();
        assert_eq!(decoded.manifest_id, "m");
        assert_eq!(decoded.body.epoch_id, "e");
    }

    // A signed manifest survives the wire round-trip with its signature
    // still verifiable.
    //
    // NOTE(review): this test mutates a process-wide env var and restores it
    // at the end; it is not safe against concurrently running tests that
    // read the same variable — consider serializing such tests.
    #[test]
    fn manifest_frame_signed_roundtrip_verifies() {
        let prev = env::var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY").ok();
        env::set_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", "11".repeat(32));
        let keypair = ec_crypto::load_manifest_keypair_from_env()
            .expect("load should not error")
            .expect("keypair should exist");

        let chunk_hashes = vec![blake3::hash(b"chunk0").to_hex().to_string()];
        let merkle_root = ec_core::merkle_root_from_hashes(&chunk_hashes).unwrap();
        let body = ec_core::ManifestBody {
            stream_id: ec_core::StreamId("s".to_string()),
            epoch_id: "e".to_string(),
            chunk_duration_ms: 2000,
            total_chunks: 1,
            chunk_start_index: 0,
            encoder_profile_id: "p".to_string(),
            merkle_root,
            created_unix_ms: 1,
            metadata: Vec::new(),
            chunk_hashes,
            variants: None,
        };
        let manifest_id = body.manifest_id().unwrap();
        let sig = ec_crypto::sign_manifest_id(&manifest_id, &keypair);
        assert!(ec_crypto::verify_manifest_signature(&manifest_id, &sig));

        let manifest = ec_core::Manifest {
            body,
            manifest_id: manifest_id.clone(),
            signatures: vec![sig],
            commitments: Vec::new(),
        };
        let bytes = encode_manifest_frame(&manifest).unwrap();
        let decoded = decode_manifest_frame(&bytes).unwrap();
        assert_eq!(decoded.manifest_id, manifest_id);
        assert_eq!(decoded.signatures.len(), 1);
        assert!(ec_crypto::verify_manifest_signature(
            &decoded.manifest_id,
            &decoded.signatures[0]
        ));

        // Restore the variable so other tests see the pre-test environment.
        match prev {
            Some(value) => env::set_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", value),
            None => env::remove_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY"),
        }
    }

    // Encrypted payloads round-trip through the frame codec, decrypt back
    // to the plaintext, and the recorded chunk hash matches the plaintext.
    #[test]
    fn object_frame_encrypt_decrypt_roundtrip_and_hash_matches_plaintext() {
        let stream_id = "ec/stream/v1/source/test/device-a/channel-b";
        let chunk_index = 7u64;
        let plaintext = b"hello every.channel";
        let expected_hash = blake3::hash(plaintext).to_hex().to_string();
        let enc = ec_crypto::encrypt_stream_data(stream_id, chunk_index, plaintext, None);

        let meta = ObjectMeta {
            created_unix_ms: 1,
            content_type: "application/octet-stream".to_string(),
            size_bytes: enc.ciphertext.len() as u64,
            timing: Some(TimingMeta {
                chunk_index,
                chunk_start_27mhz: 0,
                chunk_duration_27mhz: 54_000_000,
                utc_start_unix: None,
                sync_status: "synthetic".to_string(),
            }),
            encryption: Some(EncryptionMeta {
                alg: enc.alg.to_string(),
                key_id: stream_id.to_string(),
                nonce_hex: hex::encode(enc.nonce),
            }),
            // The hash covers the plaintext, not the ciphertext carried in `data`.
            chunk_hash: Some(expected_hash.clone()),
            chunk_hash_alg: Some("blake3".to_string()),
            chunk_proof: None,
            chunk_proof_alg: None,
            manifest_id: None,
        };

        let frame = encode_object_frame(&meta, &enc.ciphertext).unwrap();
        let decoded = decode_object_frame(&frame).unwrap();
        let out = ec_crypto::decrypt_stream_data(stream_id, chunk_index, &decoded.data, None)
            .expect("decrypt should succeed");
        assert_eq!(out, plaintext);
        assert_eq!(
            decoded.meta.chunk_hash.as_deref(),
            Some(expected_hash.as_str())
        );
        assert_eq!(
            blake3::hash(&out).to_hex().to_string(),
            decoded.meta.chunk_hash.unwrap()
        );
    }
}
|