every.channel/crates/ec-moq/src/lib.rs

834 lines
26 KiB
Rust

//! Media over QUIC (MoQ) scaffolding.
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use ec_core::Manifest;
use ec_iroh::DiscoveryConfig;
use iroh::{protocol::Router, Endpoint, EndpointAddr, SecretKey};
use moq_lite::{BroadcastConsumer, BroadcastProducer, Group, Track};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
/// Identifies a track by a namespace plus a name within that namespace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrackName {
/// Namespace part; `FileRelay` sanitizes it into the first directory level under its root.
pub namespace: String,
/// Name part; `FileRelay` sanitizes it into the second directory level.
pub name: String,
}
/// Newtype around a `u64` group number; the publishers in this file use it as the
/// MoQ group `sequence`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GroupId(pub u64);
/// Newtype around a `u64` object number; `FileRelay` uses it to name the
/// `object-<id>` directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectId(pub u64);
/// Metadata attached to one object.
///
/// `encode_object_frame` serializes this struct as JSON in front of the payload bytes,
/// and `FileRelay::write_object` stores it as `meta.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectMeta {
/// Creation time; presumably Unix epoch milliseconds, judging by the name — TODO confirm.
pub created_unix_ms: u64,
/// Content type string of the payload (the tests in this file use `application/octet-stream`).
pub content_type: String,
/// Payload size in bytes; nothing in this file checks it against the actual data length.
pub size_bytes: u64,
/// Optional timing information; `chunk_duration_secs` reads the chunk duration from here.
pub timing: Option<TimingMeta>,
/// Optional encryption parameters; `None` in the unencrypted test fixtures.
pub encryption: Option<EncryptionMeta>,
/// Optional chunk hash as a string (the tests store a hex BLAKE3 hash of the plaintext).
pub chunk_hash: Option<String>,
/// Name of the algorithm used for `chunk_hash` (the tests use `"blake3"`).
pub chunk_hash_alg: Option<String>,
/// Optional proof entries for the chunk; presumably a Merkle path — TODO confirm.
pub chunk_proof: Option<Vec<String>>,
/// Name of the algorithm used for `chunk_proof` (the tests use `"merkle+blake3"`).
pub chunk_proof_alg: Option<String>,
/// Optional identifier of the manifest this object is associated with.
pub manifest_id: Option<String>,
}
/// One object: its metadata plus the raw payload bytes.
///
/// This is the value produced by `decode_object_frame` and delivered by `MoqObjectStream`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectPayload {
/// Metadata describing `data`.
pub meta: ObjectMeta,
/// Raw payload bytes (ciphertext when the object is encrypted, as in the tests).
pub data: Vec<u8>,
}
/// Default track name for object chunks; not referenced inside this file, so
/// presumably consumed by callers — TODO confirm.
pub const DEFAULT_TRACK_NAME: &str = "chunks";
/// Default track name for manifests; not referenced inside this file, so
/// presumably consumed by callers — TODO confirm.
pub const DEFAULT_MANIFEST_TRACK_NAME: &str = "manifests";
/// Abstraction over something that can publish an object into a group of a track.
pub trait Publisher {
/// Publishes `object` into `group` of `track`.
///
/// # Errors
/// Implementation-defined; returned as an `anyhow` error.
fn publish_object(
&self,
track: &TrackName,
group: GroupId,
object: ObjectPayload,
) -> Result<()>;
}
/// Abstraction over something that can subscribe to a track.
pub trait Subscriber {
/// Requests a subscription to `track`; how objects are delivered is up to the implementor.
fn subscribe_track(&self, track: &TrackName) -> Result<()>;
}
/// Abstraction over a relay that can announce tracks and cache objects.
pub trait Relay {
/// Announces that `track` is available.
fn announce_track(&self, track: &TrackName) -> Result<()>;
/// Stores `object` for `group` of `track`.
///
/// Note that the signature carries no object id; `FileRelay` therefore always uses `ObjectId(0)`.
fn cache_object(&self, track: &TrackName, group: GroupId, object: ObjectPayload) -> Result<()>;
}
/// Timing information for one chunk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimingMeta {
/// Index of the chunk within its stream.
pub chunk_index: u64,
/// Chunk start; presumably in ticks of the same 27 MHz clock as the duration — TODO confirm.
pub chunk_start_27mhz: u64,
/// Chunk duration in 27 MHz ticks; `chunk_duration_secs` divides it by 27_000_000 to get seconds.
pub chunk_duration_27mhz: u64,
/// Optional UTC start time; presumably Unix seconds, judging by the name — TODO confirm.
pub utc_start_unix: Option<i64>,
/// Free-form synchronization status string (the tests use `"synthetic"`).
pub sync_status: String,
}
/// Encryption parameters attached to an encrypted object.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptionMeta {
/// Name of the encryption algorithm.
pub alg: String,
/// Key identifier (the encryption test in this file stores the stream id here).
pub key_id: String,
/// Nonce encoded as hexadecimal text.
pub nonce_hex: String,
}
/// Relay that stores objects as files in a directory tree.
#[derive(Debug, Clone)]
pub struct FileRelay {
/// Root directory under which `<namespace>/<name>/group-<n>/object-<n>` directories are created.
root: PathBuf,
}
impl FileRelay {
pub fn new(root: impl Into<PathBuf>) -> Self {
Self { root: root.into() }
}
pub fn write_object(
&self,
track: &TrackName,
group: GroupId,
object_id: ObjectId,
object: &ObjectPayload,
) -> Result<()> {
let base = self.object_dir(track, group, object_id);
fs::create_dir_all(&base)
.with_context(|| format!("failed to create {}", base.display()))?;
let data_path = base.join("data.bin");
let meta_path = base.join("meta.json");
fs::write(&data_path, &object.data)
.with_context(|| format!("failed to write {}", data_path.display()))?;
fs::write(&meta_path, serde_json::to_vec_pretty(&object.meta)?)
.with_context(|| format!("failed to write {}", meta_path.display()))?;
Ok(())
}
fn object_dir(&self, track: &TrackName, group: GroupId, object_id: ObjectId) -> PathBuf {
let namespace = sanitize_component(&track.namespace);
let name = sanitize_component(&track.name);
self.root
.join(namespace)
.join(name)
.join(format!("group-{}", group.0))
.join(format!("object-{}", object_id.0))
}
}
impl Relay for FileRelay {
    /// Announcing is a no-op for the file-backed relay; always succeeds.
    fn announce_track(&self, _track: &TrackName) -> Result<()> {
        Ok(())
    }

    /// Stores `object` on disk via `write_object`.
    ///
    /// The trait signature carries no object id, so every cached object of a
    /// group is written to `object-0` and replaces the previous one.
    fn cache_object(&self, track: &TrackName, group: GroupId, object: ObjectPayload) -> Result<()> {
        let object_id = ObjectId(0);
        self.write_object(track, group, object_id, &object)
    }
}
/// Maps `value` to a string that is safe to use as a single path component.
///
/// ASCII letters are lower-cased; ASCII digits, `-` and `_` are kept; every
/// other character (including `.`, `/` and non-ASCII) becomes `_`.
/// An empty input yields an empty string.
fn sanitize_component(value: &str) -> String {
    // Every output character is ASCII, so the byte length of the input is an upper bound.
    let mut cleaned = String::with_capacity(value.len());
    for ch in value.chars() {
        let mapped = if ch.is_ascii_uppercase() {
            ch.to_ascii_lowercase()
        } else if ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '-' || ch == '_' {
            ch
        } else {
            '_'
        };
        cleaned.push(mapped);
    }
    cleaned
}
/// Encodes an object as a single frame.
///
/// Layout: 4-byte big-endian length of the JSON metadata, the JSON metadata
/// itself, then the raw payload bytes.
///
/// # Errors
/// Fails if the metadata cannot be serialized to JSON or if its encoded length
/// does not fit in a `u32`.
pub fn encode_object_frame(meta: &ObjectMeta, data: &[u8]) -> Result<Vec<u8>> {
    let header = serde_json::to_vec(meta)?;
    let Ok(header_len) = u32::try_from(header.len()) else {
        return Err(anyhow!("object meta too large"));
    };

    let mut frame = Vec::with_capacity(4 + header.len() + data.len());
    frame.extend_from_slice(&header_len.to_be_bytes());
    frame.extend_from_slice(&header);
    frame.extend_from_slice(data);
    Ok(frame)
}
/// Decodes a frame produced by `encode_object_frame`.
///
/// The frame starts with a 4-byte big-endian length of the JSON metadata,
/// followed by the metadata and then the payload (everything that remains).
///
/// # Errors
/// Fails if the frame is shorter than the length prefix, if the declared
/// metadata length exceeds the bytes available, or if the metadata is not
/// valid JSON for `ObjectMeta`.
pub fn decode_object_frame(bytes: &[u8]) -> Result<ObjectPayload> {
    const PREFIX_LEN: usize = 4;

    if bytes.len() < PREFIX_LEN {
        return Err(anyhow!("object frame too short"));
    }
    let (prefix, rest) = bytes.split_at(PREFIX_LEN);
    let declared_len = u32::from_be_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]);

    // Compare against the remaining length instead of computing `4 + meta_len`:
    // the length prefix comes from the peer, and that addition can overflow
    // `usize` on 32-bit targets, which would turn a malformed frame into a panic.
    let meta_len = usize::try_from(declared_len)
        .ok()
        .filter(|len| *len <= rest.len())
        .ok_or_else(|| anyhow!("object frame missing metadata bytes"))?;

    let (meta_bytes, data) = rest.split_at(meta_len);
    let meta = serde_json::from_slice(meta_bytes)?;
    Ok(ObjectPayload {
        meta,
        data: data.to_vec(),
    })
}
/// Serializes `manifest` to compact JSON; the result is the whole frame
/// (there is no length prefix or extra header).
///
/// # Errors
/// Fails if JSON serialization fails.
pub fn encode_manifest_frame(manifest: &Manifest) -> Result<Vec<u8>> {
    let encoded = serde_json::to_vec(manifest)?;
    Ok(encoded)
}
/// Parses a manifest from the JSON bytes produced by `encode_manifest_frame`.
///
/// # Errors
/// Fails if `bytes` is not valid JSON for a `Manifest`.
pub fn decode_manifest_frame(bytes: &[u8]) -> Result<Manifest> {
    let manifest = serde_json::from_slice(bytes)?;
    Ok(manifest)
}
/// A local node bundling an iroh endpoint, the router accepting the MoQ ALPN,
/// and the `iroh_moq::Moq` handle used to publish and connect.
#[derive(Debug)]
pub struct MoqNode {
/// Endpoint built by `ec_iroh::build_endpoint`; exposed through `endpoint()`.
endpoint: Endpoint,
/// Router spawned on the endpoint with the MoQ protocol handler registered.
router: Router,
/// MoQ handle created from a clone of the endpoint.
moq: iroh_moq::Moq,
}
impl MoqNode {
pub async fn bind(secret: Option<SecretKey>) -> Result<Self> {
let discovery = DiscoveryConfig::from_env()?;
Self::bind_with_discovery(secret, discovery).await
}
pub async fn bind_with_discovery(
secret: Option<SecretKey>,
discovery: DiscoveryConfig,
) -> Result<Self> {
let endpoint = ec_iroh::build_endpoint(secret, discovery).await?;
let moq = iroh_moq::Moq::new(endpoint.clone());
let router = Router::builder(endpoint.clone())
.accept(iroh_moq::ALPN, moq.protocol_handler())
.spawn();
Ok(Self {
endpoint,
router,
moq,
})
}
pub fn endpoint(&self) -> &Endpoint {
&self.endpoint
}
pub fn endpoint_addr(&self) -> EndpointAddr {
self.router.endpoint().addr()
}
pub async fn publish_objects(
&self,
broadcast_name: impl Into<String>,
track_name: impl Into<String>,
) -> Result<MoqPublisher> {
let broadcast_name = broadcast_name.into();
let track_name = track_name.into();
let mut broadcast = BroadcastProducer::default();
let track = broadcast.create_track(Track {
name: track_name.clone(),
priority: 0,
});
self.moq
.publish(broadcast_name.clone(), broadcast.clone())
.await?;
Ok(MoqPublisher {
broadcast_name,
track_name,
broadcast,
track,
})
}
/// Publish a broadcast containing multiple tracks, all created before publishing.
///
/// This avoids subtle issues in some MoQ implementations where tracks added after the
/// initial publish are not reliably deliverable to subscribers.
pub async fn publish_track_set(
&self,
broadcast_name: impl Into<String>,
object_tracks: Vec<String>,
manifest_tracks: Vec<String>,
) -> Result<MoqPublishSet> {
let broadcast_name = broadcast_name.into();
let mut broadcast = BroadcastProducer::default();
let mut object = HashMap::new();
for name in object_tracks {
let track = broadcast.create_track(Track {
name: name.clone(),
priority: 0,
});
object.insert(name, track);
}
let mut manifests = HashMap::new();
for name in manifest_tracks {
let track = broadcast.create_track(Track {
name: name.clone(),
priority: 0,
});
manifests.insert(name, track);
}
self.moq.publish(broadcast_name.clone(), broadcast).await?;
Ok(MoqPublishSet {
broadcast_name,
object,
manifests,
})
}
pub async fn subscribe_objects(
&self,
remote: EndpointAddr,
broadcast_name: impl Into<String>,
track_name: impl Into<String>,
) -> Result<MoqObjectStream> {
let broadcast_name = broadcast_name.into();
let track_name = track_name.into();
let mut session = self.moq.connect(remote).await?;
let broadcast = session.subscribe(&broadcast_name).await?;
let track = subscribe_track(&broadcast, &track_name)?;
MoqObjectStream::spawn(session, track)
}
pub async fn subscribe_manifests(
&self,
remote: EndpointAddr,
broadcast_name: impl Into<String>,
track_name: impl Into<String>,
) -> Result<MoqManifestStream> {
let broadcast_name = broadcast_name.into();
let track_name = track_name.into();
let mut session = self.moq.connect(remote).await?;
let broadcast = session.subscribe(&broadcast_name).await?;
let track = subscribe_track(&broadcast, &track_name)?;
MoqManifestStream::spawn(session, track)
}
}
/// Publisher for a broadcast whose tracks were all created before publishing
/// (see `MoqNode::publish_track_set`).
pub struct MoqPublishSet {
/// Name the broadcast was published under.
broadcast_name: String,
/// Producers for object tracks, keyed by track name.
object: HashMap<String, moq_lite::TrackProducer>,
/// Producers for manifest tracks, keyed by track name.
manifests: HashMap<String, moq_lite::TrackProducer>,
}
impl MoqPublishSet {
    /// Publishes `object` as a single-frame group with sequence `group` on the
    /// object track `track_name`.
    ///
    /// # Errors
    /// Fails if `track_name` is not one of the object tracks, if the object
    /// cannot be encoded, or if the track refuses to create the group (reported
    /// as "already published").
    pub fn publish_object(
        &mut self,
        track_name: &str,
        group: GroupId,
        object: ObjectPayload,
    ) -> Result<()> {
        let Some(track) = self.object.get_mut(track_name) else {
            return Err(anyhow!("unknown object track {}", track_name));
        };
        // Encode before creating the group: if encoding fails, no group has been
        // created and the sequence number is still available for a retry.
        let frame = encode_object_frame(&object.meta, &object.data)?;
        let Some(mut group_writer) = track.create_group(Group { sequence: group.0 }) else {
            return Err(anyhow!("group {} already published", group.0));
        };
        group_writer.write_frame(Bytes::from(frame));
        group_writer.close();
        Ok(())
    }

    /// Publishes `manifest` as a single-frame group with the given `sequence`
    /// on the manifest track `track_name`.
    ///
    /// # Errors
    /// Fails if `track_name` is not one of the manifest tracks, if the manifest
    /// cannot be encoded, or if the track refuses to create the group (reported
    /// as "already published").
    pub fn publish_manifest(
        &mut self,
        track_name: &str,
        sequence: u64,
        manifest: &Manifest,
    ) -> Result<()> {
        let Some(track) = self.manifests.get_mut(track_name) else {
            return Err(anyhow!("unknown manifest track {}", track_name));
        };
        // Encode first so a serialization failure does not leave an unfinished group behind.
        let frame = encode_manifest_frame(manifest)?;
        let Some(mut group_writer) = track.create_group(Group { sequence }) else {
            return Err(anyhow!("manifest group {} already published", sequence));
        };
        group_writer.write_frame(Bytes::from(frame));
        group_writer.close();
        Ok(())
    }

    /// Returns the name the broadcast was published under.
    pub fn broadcast_name(&self) -> &str {
        &self.broadcast_name
    }
}
/// Publisher for a broadcast with one primary object track
/// (see `MoqNode::publish_objects`).
pub struct MoqPublisher {
/// Name the broadcast was published under.
broadcast_name: String,
/// Name of the primary object track.
track_name: String,
/// Handle to the broadcast, kept so additional tracks can be created later.
broadcast: BroadcastProducer,
/// Producer for the primary object track.
track: moq_lite::TrackProducer,
}
impl MoqPublisher {
    /// Publishes `object` as a single-frame group with sequence `group` on the
    /// primary track.
    ///
    /// # Errors
    /// Fails if the object cannot be encoded or if the track refuses to create
    /// the group (reported as "already published").
    pub fn publish_object(&mut self, group: GroupId, object: ObjectPayload) -> Result<()> {
        // Encode before creating the group: if encoding fails, no group has been
        // created and the sequence number is still available for a retry.
        let frame = encode_object_frame(&object.meta, &object.data)?;
        let Some(mut group_writer) = self.track.create_group(Group { sequence: group.0 }) else {
            return Err(anyhow!("group {} already published", group.0));
        };
        group_writer.write_frame(Bytes::from(frame));
        group_writer.close();
        Ok(())
    }

    /// Creates an additional object track on the already-published broadcast.
    ///
    /// NOTE(review): `MoqNode::publish_track_set` documents that tracks added
    /// after the initial publish may not be reliably deliverable in some MoQ
    /// implementations; prefer that API when the track set is known up front.
    /// This function currently never returns an error.
    pub fn create_side_track(&mut self, track_name: impl Into<String>) -> Result<MoqSidePublisher> {
        let track_name = track_name.into();
        let track = self.broadcast.create_track(Track {
            name: track_name.clone(),
            priority: 0,
        });
        Ok(MoqSidePublisher { track_name, track })
    }

    /// Creates an additional manifest track on the already-published broadcast.
    ///
    /// NOTE(review): the same caveat about tracks added after the initial
    /// publish applies here. This function currently never returns an error.
    pub fn create_manifest_track(
        &mut self,
        track_name: impl Into<String>,
    ) -> Result<MoqManifestPublisher> {
        let track_name = track_name.into();
        let track = self.broadcast.create_track(Track {
            name: track_name.clone(),
            priority: 0,
        });
        Ok(MoqManifestPublisher { track_name, track })
    }

    /// Returns the name the broadcast was published under.
    pub fn broadcast_name(&self) -> &str {
        &self.broadcast_name
    }

    /// Returns the name of the primary object track.
    pub fn track_name(&self) -> &str {
        &self.track_name
    }
}
/// Publisher for an extra object track created via `MoqPublisher::create_side_track`.
pub struct MoqSidePublisher {
/// Name of the side track.
track_name: String,
/// Producer for the side track.
track: moq_lite::TrackProducer,
}
impl MoqSidePublisher {
    /// Publishes `object` as a single-frame group with sequence `group` on this
    /// side track.
    ///
    /// # Errors
    /// Fails if the object cannot be encoded or if the track refuses to create
    /// the group (reported as "already published").
    pub fn publish_object(&mut self, group: GroupId, object: ObjectPayload) -> Result<()> {
        // Encode before creating the group: if encoding fails, no group has been
        // created and the sequence number is still available for a retry.
        let frame = encode_object_frame(&object.meta, &object.data)?;
        let Some(mut group_writer) = self.track.create_group(Group { sequence: group.0 }) else {
            return Err(anyhow!("group {} already published", group.0));
        };
        group_writer.write_frame(Bytes::from(frame));
        group_writer.close();
        Ok(())
    }

    /// Returns the name of this side track.
    pub fn track_name(&self) -> &str {
        &self.track_name
    }
}
/// Publisher for a manifest track created via `MoqPublisher::create_manifest_track`.
pub struct MoqManifestPublisher {
/// Name of the manifest track.
track_name: String,
/// Producer for the manifest track.
track: moq_lite::TrackProducer,
}
impl MoqManifestPublisher {
    /// Publishes `manifest` as a single-frame group with the given `sequence`.
    ///
    /// # Errors
    /// Fails if the manifest cannot be encoded or if the track refuses to create
    /// the group (reported as "already published").
    pub fn publish_manifest(&mut self, sequence: u64, manifest: &Manifest) -> Result<()> {
        // Encode first so a serialization failure does not leave an unfinished
        // group behind or consume the sequence number.
        let frame = encode_manifest_frame(manifest)?;
        let Some(mut group_writer) = self.track.create_group(Group { sequence }) else {
            return Err(anyhow!("manifest group {} already published", sequence));
        };
        group_writer.write_frame(Bytes::from(frame));
        group_writer.close();
        Ok(())
    }

    /// Returns the name of this manifest track.
    pub fn track_name(&self) -> &str {
        &self.track_name
    }
}
/// Receiving side of an object track subscription.
pub struct MoqObjectStream {
/// Channel fed by the background task started in `spawn`.
receiver: mpsc::Receiver<ObjectPayload>,
/// Handle of the reader task; it is stored but never awaited or aborted in this file.
_task: JoinHandle<()>,
/// Session stored alongside the stream so it is not dropped while the stream exists.
_session: iroh_moq::MoqSession,
}
impl MoqObjectStream {
fn spawn(session: iroh_moq::MoqSession, mut track: moq_lite::TrackConsumer) -> Result<Self> {
let (tx, rx) = mpsc::channel(32);
let task = tokio::spawn(async move {
loop {
let next_group = track.next_group().await;
let Some(mut group) = (match next_group {
Ok(group) => group,
Err(err) => {
tracing::warn!("moq track error: {err:#}");
break;
}
}) else {
break;
};
let mut buffer = Vec::new();
loop {
match group.read_frame().await {
Ok(Some(frame)) => buffer.extend_from_slice(&frame),
Ok(None) => break,
Err(err) => {
tracing::warn!("moq group error: {err:#}");
break;
}
}
}
if buffer.is_empty() {
continue;
}
match decode_object_frame(&buffer) {
Ok(object) => {
if tx.send(object).await.is_err() {
break;
}
}
Err(err) => {
tracing::warn!("failed to decode object frame: {err:#}");
}
}
}
});
Ok(Self {
receiver: rx,
_task: task,
_session: session,
})
}
pub async fn recv(&mut self) -> Option<ObjectPayload> {
self.receiver.recv().await
}
}
/// Receiving side of a manifest track subscription.
pub struct MoqManifestStream {
/// Channel fed by the background task started in `spawn`.
receiver: mpsc::Receiver<Manifest>,
/// Handle of the reader task; it is stored but never awaited or aborted in this file.
_task: JoinHandle<()>,
/// Session stored alongside the stream so it is not dropped while the stream exists.
_session: iroh_moq::MoqSession,
}
impl MoqManifestStream {
fn spawn(session: iroh_moq::MoqSession, mut track: moq_lite::TrackConsumer) -> Result<Self> {
let (tx, rx) = mpsc::channel(8);
let task = tokio::spawn(async move {
loop {
let next_group = track.next_group().await;
let Some(mut group) = (match next_group {
Ok(group) => group,
Err(err) => {
tracing::warn!("moq manifest track error: {err:#}");
break;
}
}) else {
break;
};
let mut buffer = Vec::new();
loop {
match group.read_frame().await {
Ok(Some(frame)) => buffer.extend_from_slice(&frame),
Ok(None) => break,
Err(err) => {
tracing::warn!("moq manifest group error: {err:#}");
break;
}
}
}
if buffer.is_empty() {
continue;
}
match decode_manifest_frame(&buffer) {
Ok(manifest) => {
if tx.send(manifest).await.is_err() {
break;
}
}
Err(err) => {
tracing::warn!("failed to decode manifest frame: {err:#}");
}
}
}
});
Ok(Self {
receiver: rx,
_task: task,
_session: session,
})
}
pub async fn recv(&mut self) -> Option<Manifest> {
self.receiver.recv().await
}
}
/// Subscribes to the track called `name` on `broadcast`.
///
/// Currently infallible; the `Result` return type is what the callers in this
/// file use with `?`.
fn subscribe_track(broadcast: &BroadcastConsumer, name: &str) -> Result<moq_lite::TrackConsumer> {
    let wanted = Track::new(name);
    Ok(broadcast.subscribe_track(&wanted))
}
/// Writes CMAF segments plus a sliding-window HLS playlist (`index.m3u8`) to a directory.
#[derive(Debug, Clone)]
pub struct HlsWriter {
/// Directory that receives the init segment, media segments and playlist.
output_dir: PathBuf,
/// Maximum number of segments listed in the playlist (at least 1).
window: usize,
/// Configured target duration in seconds; rounded up for `#EXT-X-TARGETDURATION`.
target_duration: f64,
/// File name of the init segment referenced by `#EXT-X-MAP`.
init_filename: String,
/// Segments currently listed in the playlist, oldest first.
segments: std::collections::VecDeque<HlsSegment>,
}
/// One playlist entry tracked by `HlsWriter`.
#[derive(Debug, Clone)]
struct HlsSegment {
/// Segment index; the first listed segment's index is written as `#EXT-X-MEDIA-SEQUENCE`.
index: u64,
/// Segment duration in seconds, written to `#EXTINF` with three decimals.
duration: f64,
/// File name of the segment relative to the output directory.
filename: String,
}
impl HlsWriter {
    /// Creates a writer for `output_dir`, creating the directory if needed.
    ///
    /// `target_duration` is in seconds; `window` is the maximum number of
    /// segments kept in the playlist and is clamped to at least 1.
    ///
    /// # Errors
    /// Fails if the output directory cannot be created.
    pub fn new_cmaf(
        output_dir: impl Into<PathBuf>,
        target_duration: f64,
        window: usize,
    ) -> Result<Self> {
        // CMAF-only writer: init.mp4 + segment_*.m4s + HLS playlist as a local compatibility artifact.
        let output_dir = output_dir.into();
        fs::create_dir_all(&output_dir)
            .with_context(|| format!("failed to create {}", output_dir.display()))?;
        Ok(Self {
            output_dir,
            window: window.max(1),
            target_duration,
            init_filename: "init.mp4".to_string(),
            segments: std::collections::VecDeque::new(),
        })
    }

    /// Writes the init segment and refreshes the playlist.
    ///
    /// Returns the path of the init segment.
    ///
    /// # Errors
    /// Fails if the init segment or the playlist cannot be written.
    pub fn write_init_segment(&mut self, data: &[u8]) -> Result<PathBuf> {
        let path = self.output_dir.join(&self.init_filename);
        fs::write(&path, data).with_context(|| format!("failed to write {}", path.display()))?;
        self.write_playlist()?;
        Ok(path)
    }

    /// Writes `data` as `segment_<index>.m4s`, appends it to the playlist
    /// window and refreshes the playlist.
    ///
    /// Segments that fall out of the window are removed from the playlist
    /// only; their files are left on disk.
    ///
    /// # Errors
    /// Fails if the segment or the playlist cannot be written.
    pub fn write_segment(&mut self, index: u64, duration: f64, data: &[u8]) -> Result<PathBuf> {
        let filename = format!("segment_{index:06}.m4s");
        let path = self.output_dir.join(&filename);
        fs::write(&path, data).with_context(|| format!("failed to write {}", path.display()))?;
        self.segments.push_back(HlsSegment {
            index,
            duration,
            filename,
        });
        while self.segments.len() > self.window {
            self.segments.pop_front();
        }
        self.write_playlist()?;
        Ok(path)
    }

    /// Regenerates `index.m3u8` from the current segment window.
    ///
    /// The playlist is written to a temporary file in the same directory and
    /// then renamed over `index.m3u8`, so a player polling the playlist never
    /// observes an empty or half-written file.
    fn write_playlist(&self) -> Result<()> {
        let mut lines = Vec::with_capacity(6 + 2 * self.segments.len());
        lines.push("#EXTM3U".to_string());
        lines.push("#EXT-X-VERSION:7".to_string());
        lines.push("#EXT-X-INDEPENDENT-SEGMENTS".to_string());
        lines.push(format!("#EXT-X-MAP:URI=\"{}\"", self.init_filename));
        // The tag takes an integer number of seconds; never emit less than 1.
        let target = self.target_duration.ceil().max(1.0) as u64;
        lines.push(format!("#EXT-X-TARGETDURATION:{target}"));
        if let Some(first) = self.segments.front() {
            lines.push(format!("#EXT-X-MEDIA-SEQUENCE:{}", first.index));
        }
        for seg in &self.segments {
            lines.push(format!("#EXTINF:{:.3},", seg.duration));
            lines.push(seg.filename.clone());
        }
        let contents = lines.join("\n") + "\n";

        let playlist_path = self.output_dir.join("index.m3u8");
        let staging_path = self.output_dir.join("index.m3u8.tmp");
        fs::write(&staging_path, contents)
            .with_context(|| format!("failed to write {}", staging_path.display()))?;
        // `fs::rename` replaces an existing destination file.
        fs::rename(&staging_path, &playlist_path)
            .with_context(|| format!("failed to write {}", playlist_path.display()))?;
        Ok(())
    }
}
/// Returns the chunk duration in seconds.
///
/// Uses `meta.timing.chunk_duration_27mhz` (27 MHz ticks) when timing is
/// present and the resulting duration is positive; otherwise returns
/// `fallback` converted to seconds.
pub fn chunk_duration_secs(meta: &ObjectMeta, fallback: Duration) -> f64 {
    meta.timing
        .as_ref()
        .map(|timing| timing.chunk_duration_27mhz as f64 / 27_000_000.0)
        .filter(|secs| *secs > 0.0)
        .unwrap_or_else(|| fallback.as_secs_f64())
}
// Unit tests for path sanitizing and for object/manifest frame encoding.
#[cfg(test)]
mod tests {
use super::*;
use std::env;
// Upper-case letters are lower-cased; spaces and punctuation become `_`.
#[test]
fn sanitize_component_is_stable() {
assert_eq!(sanitize_component("Hello World!"), "hello_world_");
assert_eq!(sanitize_component("a-b_C9"), "a-b_c9");
}
// Encoding then decoding an object frame preserves payload and metadata.
#[test]
fn object_frame_roundtrip() {
let meta = ObjectMeta {
created_unix_ms: 1,
content_type: "application/octet-stream".to_string(),
size_bytes: 3,
timing: Some(TimingMeta {
chunk_index: 7,
chunk_start_27mhz: 0,
chunk_duration_27mhz: 54_000_000,
utc_start_unix: None,
sync_status: "synthetic".to_string(),
}),
encryption: None,
chunk_hash: Some("00".repeat(32)),
chunk_hash_alg: Some("blake3".to_string()),
chunk_proof: Some(vec!["00".repeat(32)]),
chunk_proof_alg: Some("merkle+blake3".to_string()),
manifest_id: Some("m".to_string()),
};
let data = b"abc".to_vec();
let frame = encode_object_frame(&meta, &data).unwrap();
let decoded = decode_object_frame(&frame).unwrap();
assert_eq!(decoded.data, data);
assert_eq!(decoded.meta.created_unix_ms, meta.created_unix_ms);
assert_eq!(
decoded.meta.timing.as_ref().unwrap().chunk_index,
meta.timing.as_ref().unwrap().chunk_index
);
assert_eq!(decoded.meta.manifest_id, meta.manifest_id);
}
// Frames shorter than the 4-byte length prefix are rejected.
#[test]
fn decode_rejects_short_frame() {
assert!(decode_object_frame(&[]).is_err());
assert!(decode_object_frame(&[0, 0, 0]).is_err());
}
// An unsigned manifest survives a JSON encode/decode round trip.
#[test]
fn manifest_frame_roundtrip() {
let manifest = ec_core::Manifest {
body: ec_core::ManifestBody {
stream_id: ec_core::StreamId("s".to_string()),
epoch_id: "e".to_string(),
chunk_duration_ms: 2000,
total_chunks: 1,
chunk_start_index: 0,
encoder_profile_id: "p".to_string(),
merkle_root: "00".repeat(32),
created_unix_ms: 1,
metadata: Vec::new(),
chunk_hashes: vec!["11".repeat(32)],
variants: None,
},
manifest_id: "m".to_string(),
signatures: Vec::new(),
commitments: Vec::new(),
};
let bytes = encode_manifest_frame(&manifest).unwrap();
let decoded = decode_manifest_frame(&bytes).unwrap();
assert_eq!(decoded.manifest_id, "m");
assert_eq!(decoded.body.epoch_id, "e");
}
// A signed manifest still verifies after an encode/decode round trip.
// NOTE(review): this test mutates the process-wide environment, which may race with
// other tests that read the same variable when tests run in parallel; the previous
// value is restored only at the end, so it is not restored if an assertion panics.
#[test]
fn manifest_frame_signed_roundtrip_verifies() {
let prev = env::var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY").ok();
env::set_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", "11".repeat(32));
let keypair = ec_crypto::load_manifest_keypair_from_env()
.expect("load should not error")
.expect("keypair should exist");
let chunk_hashes = vec![blake3::hash(b"chunk0").to_hex().to_string()];
let merkle_root = ec_core::merkle_root_from_hashes(&chunk_hashes).unwrap();
let body = ec_core::ManifestBody {
stream_id: ec_core::StreamId("s".to_string()),
epoch_id: "e".to_string(),
chunk_duration_ms: 2000,
total_chunks: 1,
chunk_start_index: 0,
encoder_profile_id: "p".to_string(),
merkle_root,
created_unix_ms: 1,
metadata: Vec::new(),
chunk_hashes,
variants: None,
};
let manifest_id = body.manifest_id().unwrap();
let sig = ec_crypto::sign_manifest_id(&manifest_id, &keypair);
assert!(ec_crypto::verify_manifest_signature(&manifest_id, &sig));
let manifest = ec_core::Manifest {
body,
manifest_id: manifest_id.clone(),
signatures: vec![sig],
commitments: Vec::new(),
};
let bytes = encode_manifest_frame(&manifest).unwrap();
let decoded = decode_manifest_frame(&bytes).unwrap();
assert_eq!(decoded.manifest_id, manifest_id);
assert_eq!(decoded.signatures.len(), 1);
assert!(ec_crypto::verify_manifest_signature(
&decoded.manifest_id,
&decoded.signatures[0]
));
// Restore the environment variable to its state before the test.
match prev {
Some(value) => env::set_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY", value),
None => env::remove_var("EVERY_CHANNEL_MANIFEST_SIGNING_KEY"),
}
}
// An encrypted payload round-trips through the frame encoding, decrypts back to the
// plaintext, and the hash carried in the metadata matches the plaintext hash.
#[test]
fn object_frame_encrypt_decrypt_roundtrip_and_hash_matches_plaintext() {
let stream_id = "ec/stream/v1/source/test/device-a/channel-b";
let chunk_index = 7u64;
let plaintext = b"hello every.channel";
let expected_hash = blake3::hash(plaintext).to_hex().to_string();
let enc = ec_crypto::encrypt_stream_data(stream_id, chunk_index, plaintext, None);
let meta = ObjectMeta {
created_unix_ms: 1,
content_type: "application/octet-stream".to_string(),
size_bytes: enc.ciphertext.len() as u64,
timing: Some(TimingMeta {
chunk_index,
chunk_start_27mhz: 0,
chunk_duration_27mhz: 54_000_000,
utc_start_unix: None,
sync_status: "synthetic".to_string(),
}),
encryption: Some(EncryptionMeta {
alg: enc.alg.to_string(),
key_id: stream_id.to_string(),
nonce_hex: hex::encode(enc.nonce),
}),
chunk_hash: Some(expected_hash.clone()),
chunk_hash_alg: Some("blake3".to_string()),
chunk_proof: None,
chunk_proof_alg: None,
manifest_id: None,
};
let frame = encode_object_frame(&meta, &enc.ciphertext).unwrap();
let decoded = decode_object_frame(&frame).unwrap();
let out = ec_crypto::decrypt_stream_data(stream_id, chunk_index, &decoded.data, None)
.expect("decrypt should succeed");
assert_eq!(out, plaintext);
assert_eq!(
decoded.meta.chunk_hash.as_deref(),
Some(expected_hash.as_str())
);
assert_eq!(
blake3::hash(&out).to_hex().to_string(),
decoded.meta.chunk_hash.unwrap()
);
}
}