every.channel/crates/ec-ts/src/lib.rs

2078 lines
67 KiB
Rust

//! Minimal MPEG-TS parsing for timing and table extraction.
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use serde_big_array::BigArray;
use std::collections::HashMap;
use std::io::Read;
/// Size in bytes of one transport stream packet.
pub const TS_PACKET_SIZE: usize = 188;
/// PID on which Program Association Table sections are accepted.
pub const PID_PAT: u16 = 0x0000;
/// PID on which the ATSC MGT, VCT, RRT and STT sections are accepted.
pub const PID_ATSC_PSIP: u16 = 0x1FFB;
/// PID on which DVB TDT/TOT time sections are accepted.
pub const PID_DVB_TDT_TOT: u16 = 0x0014;
// First byte every packet must carry; `parse_packet` rejects anything else.
const SYNC_BYTE: u8 = 0x47;
// `table_id` values (first byte of a section) recognised by the parsers below.
const TABLE_ID_PAT: u8 = 0x00;
const TABLE_ID_ATSC_MGT: u8 = 0xC7;
const TABLE_ID_ATSC_TVCT: u8 = 0xC8;
const TABLE_ID_ATSC_CVCT: u8 = 0xC9;
const TABLE_ID_ATSC_RRT: u8 = 0xCA;
const TABLE_ID_ATSC_EIT: u8 = 0xCB;
const TABLE_ID_ATSC_ETT: u8 = 0xCC;
const TABLE_ID_ATSC_STT: u8 = 0xCD;
const TABLE_ID_DVB_TDT: u8 = 0x70;
const TABLE_ID_DVB_TOT: u8 = 0x73;
// Seconds added to a GPS-epoch timestamp when converting an STT time to Unix seconds.
const GPS_EPOCH_TO_UNIX: i64 = 315964800;
// Modified Julian Date subtracted from a DVB MJD to obtain days since the Unix epoch.
const MJD_UNIX_EPOCH: i64 = 40587;
/// One parsed transport stream packet: the raw bytes plus the header fields
/// extracted by `parse_packet`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TsPacket {
// Raw packet bytes; serialized through `BigArray`.
#[serde(with = "BigArray")]
data: [u8; TS_PACKET_SIZE],
/// 13-bit packet identifier taken from header bytes 1 and 2.
pub pid: u16,
/// Bit 0x40 of header byte 1.
pub payload_unit_start: bool,
/// Low four bits of header byte 3.
pub continuity_counter: u8,
/// Bit 0x80 of the adaptation field flags; `false` when there is no adaptation field.
pub discontinuity: bool,
/// Program clock reference in 27 MHz ticks (`base * 300 + extension`), when present.
pub pcr_27mhz: Option<u64>,
// Index into `data` where the payload starts.
payload_offset: usize,
// Number of payload bytes; zero when the packet carries no payload.
payload_len: usize,
}
impl TsPacket {
    /// Returns the payload bytes of this packet (empty when it carries no payload).
    pub fn payload(&self) -> &[u8] {
        let start = self.payload_offset;
        let end = start + self.payload_len;
        &self.data[start..end]
    }

    /// Returns the full raw packet, header included.
    pub fn as_bytes(&self) -> &[u8; TS_PACKET_SIZE] {
        let Self { data, .. } = self;
        data
    }
}
/// Reads fixed-size transport stream packets from an underlying byte source.
pub struct TsReader<R> {
// Byte source the packets are pulled from.
reader: R,
}
impl<R: Read> TsReader<R> {
    /// Wraps `reader`; no bytes are consumed until [`TsReader::read_packet`] is called.
    pub fn new(reader: R) -> Self {
        Self { reader }
    }

    /// Reads and parses the next packet.
    ///
    /// Returns `Ok(None)` when the source is exhausted exactly on a packet
    /// boundary.
    ///
    /// # Errors
    ///
    /// Fails with "truncated TS packet" when the source ends in the middle of a
    /// packet, with the underlying I/O error when a read fails, or with the
    /// error from `parse_packet` when the bytes are not a valid packet.
    pub fn read_packet(&mut self) -> Result<Option<TsPacket>> {
        let mut data = [0u8; TS_PACKET_SIZE];
        let mut filled = 0usize;
        while filled < TS_PACKET_SIZE {
            match self.reader.read(&mut data[filled..]) {
                Ok(0) if filled == 0 => return Ok(None),
                Ok(0) => return Err(anyhow!("truncated TS packet")),
                Ok(n) => filled += n,
                // An interrupted read is non-fatal and carries no data; retry
                // instead of aborting the whole stream.
                Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(err) => return Err(err.into()),
            }
        }
        parse_packet(data).map(Some)
    }
}
/// Parses the 4-byte header and optional adaptation field of one packet.
///
/// # Errors
///
/// Fails when the first byte is not the sync byte or when the adaptation field
/// length points past the end of the packet.
pub fn parse_packet(data: [u8; TS_PACKET_SIZE]) -> Result<TsPacket> {
    if data[0] != SYNC_BYTE {
        return Err(anyhow!("missing sync byte"));
    }

    let pid = u16::from_be_bytes([data[1] & 0x1F, data[2]]);
    let payload_unit_start = data[1] & 0x40 != 0;
    let continuity_counter = data[3] & 0x0F;
    // Two-bit adaptation_field_control: 1 = payload only, 2 = adaptation only, 3 = both.
    let (has_adaptation, has_payload) = match (data[3] >> 4) & 0x03 {
        1 => (false, true),
        2 => (true, false),
        3 => (true, true),
        _ => (false, false),
    };

    let mut payload_offset = 4usize;
    let mut discontinuity = false;
    let mut pcr_27mhz = None;
    if has_adaptation {
        let field_len = usize::from(data[4]);
        let field_start = 5usize;
        if field_start + field_len > TS_PACKET_SIZE {
            return Err(anyhow!("invalid adaptation field length"));
        }
        if field_len > 0 {
            let flags = data[field_start];
            discontinuity = flags & 0x80 != 0;
            // The PCR needs the flags byte plus six PCR bytes.
            if flags & 0x10 != 0 && field_len >= 7 {
                pcr_27mhz = Some(parse_pcr_27mhz(&data[field_start + 1..field_start + 7]));
            }
        }
        payload_offset = field_start + field_len;
    }

    let payload_len = if has_payload {
        TS_PACKET_SIZE.saturating_sub(payload_offset)
    } else {
        0
    };

    Ok(TsPacket {
        data,
        pid,
        payload_unit_start,
        continuity_counter,
        discontinuity,
        pcr_27mhz,
        payload_offset,
        payload_len,
    })
}
/// Decodes a six-byte PCR field into 27 MHz ticks.
///
/// The first 33 bits are the base and the last 9 bits the extension; the six
/// bits in between are ignored. The result is `base * 300 + extension`.
/// Panics if `data` holds fewer than six bytes.
fn parse_pcr_27mhz(data: &[u8]) -> u64 {
    let upper = data[..4]
        .iter()
        .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte));
    let base = (upper << 1) | u64::from(data[4] >> 7);
    let extension = (u64::from(data[4] & 0x01) << 8) | u64::from(data[5]);
    base * 300 + extension
}
/// Extracts the 33-bit presentation timestamp from a packet that starts a PES packet.
///
/// Returns `None` when the packet does not start a payload unit, the payload is
/// shorter than 14 bytes, the `00 00 01` start code is missing, the PTS/DTS
/// flags are zero, or the declared header data length is below five bytes.
pub fn parse_pts_90khz(packet: &TsPacket) -> Option<u64> {
    if !packet.payload_unit_start {
        return None;
    }
    let payload = packet.payload();
    if payload.len() < 14 || payload[..3] != [0u8, 0, 1] {
        return None;
    }
    // Top two bits of the second flags byte say whether a PTS (and DTS) follows.
    if (payload[7] >> 6) & 0x03 == 0 {
        return None;
    }
    if payload[8] < 5 {
        return None;
    }
    let b = payload.get(9..14)?;
    // The 33 bits are spread over five bytes with marker bits in between.
    let pts = (u64::from(b[0] & 0x0E) << 29)
        | (u64::from(b[1]) << 22)
        | (u64::from(b[2] & 0xFE) << 14)
        | (u64::from(b[3]) << 7)
        | u64::from(b[4] >> 1);
    Some(pts)
}
/// One complete table section as produced by `SectionAssembler::push_packet`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Section {
/// PID of the packet(s) the section was carried in.
pub pid: u16,
/// First byte of `data`.
pub table_id: u8,
/// Section bytes starting at `table_id`: the 3-byte header plus `section_length` bytes.
pub data: Vec<u8>,
}
/// Result of `parse_pat_section`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ProgramAssociationTable {
/// Big-endian value of section bytes 3 and 4.
pub transport_stream_id: u16,
/// Program numbers listed in the section, in order; entries with number 0 are omitted.
pub program_numbers: Vec<u16>,
}
/// Summary produced by `probe_transport_stream_identity`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TransportStreamIdentity {
/// `Some("atsc")` when a packet or section on the ATSC PSIP PID was seen.
pub standard: Option<String>,
/// Transport stream id from the PAT.
pub transport_stream_id: u16,
/// The PAT's program number, only when the PAT lists exactly one.
pub program_number: Option<u16>,
/// Major channel number of the virtual channel matching `program_number`.
pub major_channel_number: Option<u16>,
/// Minor channel number of the matching virtual channel.
pub minor_channel_number: Option<u16>,
/// Short name of the matching virtual channel.
pub short_name: Option<String>,
/// Source id of the matching virtual channel.
pub source_id: Option<u16>,
}
/// Result of `parse_mgt_section`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MasterGuideTable {
/// First byte of the section body.
pub protocol_version: u8,
/// One entry per table announced by the section, in section order.
pub tables_defined: Vec<MasterGuideTableEntry>,
/// Raw bytes of the table-level descriptor loop (not decoded).
pub descriptors: Vec<u8>,
}
/// One table announcement inside a `MasterGuideTable`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MasterGuideTableEntry {
/// 16-bit table type code.
pub table_type: u16,
/// 13-bit PID read for this table type.
pub table_type_pid: u16,
/// 5-bit version number.
pub table_type_version_number: u8,
/// 32-bit `number_bytes` field, stored as read.
pub number_bytes: u32,
/// Raw bytes of this entry's descriptor loop (not decoded).
pub descriptors: Vec<u8>,
}
/// Result of `parse_vct_section` (terrestrial or cable variant).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VirtualChannelTable {
/// Table id of the source section (TVCT or CVCT).
pub table_id: u8,
/// Table id extension of the section (bytes 3 and 4).
pub transport_stream_id: u16,
/// First byte of the section body.
pub protocol_version: u8,
/// Channels in section order.
pub channels: Vec<VirtualChannel>,
/// Raw bytes of the trailing descriptor loop (not decoded).
pub additional_descriptors: Vec<u8>,
}
/// One channel record inside a `VirtualChannelTable`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VirtualChannel {
/// Name decoded from seven 16-bit code units, zero units removed and whitespace trimmed.
pub short_name: String,
/// 10-bit major channel number.
pub major_channel_number: u16,
/// 10-bit minor channel number.
pub minor_channel_number: u16,
/// 8-bit modulation mode code, stored as read.
pub modulation_mode: u8,
/// 32-bit carrier frequency field, stored as read.
pub carrier_frequency: u32,
/// 16-bit channel transport stream id.
pub channel_tsid: u16,
/// 16-bit program number.
pub program_number: u16,
/// 2-bit ETM location code.
pub etm_location: u8,
/// Access-controlled flag bit.
pub access_controlled: bool,
/// Hidden flag bit.
pub hidden: bool,
/// Hide-guide flag bit.
pub hide_guide: bool,
/// 6-bit service type code.
pub service_type: u8,
/// 16-bit source id.
pub source_id: u16,
/// Raw bytes of this channel's descriptor loop (not decoded).
pub descriptors: Vec<u8>,
}
/// Result of `parse_multiple_string_structure`: text in one or more languages.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MultipleStringStructure {
/// One entry per string in the structure, in encoded order.
pub strings: Vec<LocalizedString>,
}
/// One language variant inside a `MultipleStringStructure`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LocalizedString {
/// The three language-code bytes, converted lossily from UTF-8.
pub language_code: String,
/// Segments in encoded order.
pub segments: Vec<MultipleStringSegment>,
/// Concatenation of all segment texts; `None` if any segment could not be decoded.
pub decoded: Option<String>,
}
/// One segment of a `LocalizedString`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct MultipleStringSegment {
/// Compression type byte; only 0 is decoded by this file.
pub compression_type: u8,
/// Mode byte selecting how `bytes` are mapped to characters.
pub mode: u8,
/// Raw segment bytes.
pub bytes: Vec<u8>,
/// Decoded text, or `None` when the compression type or mode is not handled.
pub decoded: Option<String>,
}
/// Result of `parse_eit_section`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventInformationTable {
/// Table id extension of the section (bytes 3 and 4).
pub source_id: u16,
/// First byte of the section body.
pub protocol_version: u8,
/// Section byte 6.
pub section_number: u8,
/// Section byte 7.
pub last_section_number: u8,
/// Events in section order.
pub events: Vec<EventInformation>,
}
/// One event record inside an `EventInformationTable`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventInformation {
/// 14-bit event id.
pub event_id: u16,
/// 32-bit start time field, stored as read.
pub start_time_gps_seconds: u32,
/// 2-bit ETM location code.
pub etm_location: u8,
/// 20-bit duration field.
pub length_in_seconds: u32,
/// Parsed title text.
pub title: MultipleStringStructure,
/// Raw bytes of this event's descriptor loop (not decoded).
pub descriptors: Vec<u8>,
}
/// Result of `parse_ett_section`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExtendedTextTable {
/// Table id extension of the section (bytes 3 and 4).
pub table_id_extension: u16,
/// First byte of the section body.
pub protocol_version: u8,
/// Big-endian 32-bit value of body bytes 1 through 4.
pub etm_id: u32,
/// Parsed text occupying the rest of the body.
pub message: MultipleStringStructure,
}
/// Result of `parse_rrt_section`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RatingRegionTable {
/// Low byte of the section's table id extension.
pub rating_region: u8,
/// First byte of the section body.
pub protocol_version: u8,
/// Parsed region name text.
pub rating_region_name: MultipleStringStructure,
/// Rating dimensions in section order.
pub dimensions: Vec<RatingDimension>,
/// Raw bytes of the trailing descriptor loop (not decoded).
pub descriptors: Vec<u8>,
}
/// One dimension inside a `RatingRegionTable`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RatingDimension {
/// Parsed dimension name text.
pub dimension_name: MultipleStringStructure,
/// Bit 0x10 of the dimension's flags byte.
pub graduated_scale: bool,
/// Rating values in section order (the count is a 4-bit field).
pub values: Vec<RatingValue>,
}
/// One value inside a `RatingDimension`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RatingValue {
/// Parsed abbreviated text for the value.
pub abbrev_rating_value: MultipleStringStructure,
/// Parsed full text for the value.
pub rating_value: MultipleStringStructure,
}
/// Result of `parse_stt_section`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct SystemTimeTable {
/// First byte of the section body.
pub protocol_version: u8,
/// 32-bit system time field, stored as read.
pub system_time_gps_seconds: u32,
/// GPS-to-UTC offset byte, stored as read.
pub gps_utc_offset: u8,
/// Top bit of the 16-bit daylight saving field.
pub daylight_saving_status: bool,
/// Five-bit day-of-month part of the daylight saving field.
pub daylight_saving_day_of_month: u8,
/// Low byte of the daylight saving field.
pub daylight_saving_hour: u8,
/// Raw descriptor bytes following the fixed fields (not decoded).
pub descriptors: Vec<u8>,
/// `GPS_EPOCH_TO_UNIX + system_time_gps_seconds - gps_utc_offset`.
pub unix_seconds: i64,
}
/// Any table that `parse_psip_table` can produce, one variant per table kind.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum PsipTable {
/// Parsed from a section with the MGT table id.
MasterGuide(MasterGuideTable),
/// Parsed from a section with the TVCT or CVCT table id.
VirtualChannel(VirtualChannelTable),
/// Parsed from a section with the RRT table id.
RatingRegion(RatingRegionTable),
/// Parsed from a section with the EIT table id.
EventInformation(EventInformationTable),
/// Parsed from a section with the ETT table id.
ExtendedText(ExtendedTextTable),
/// Parsed from a section with the STT table id.
SystemTime(SystemTimeTable),
}
/// Reassembles table sections from packet payloads, tracking one partially
/// received section per PID.
#[derive(Debug, Default)]
pub struct SectionAssembler {
// Partially received section for each PID, keyed by PID.
buffers: HashMap<u16, SectionBuffer>,
}
/// A section whose bytes have only partly arrived.
#[derive(Debug)]
struct SectionBuffer {
// Total section size in bytes (3-byte header plus `section_length`).
expected_len: usize,
// Bytes collected so far, starting at the table id.
data: Vec<u8>,
}
impl SectionAssembler {
    /// Feeds one packet into the assembler and returns every section it completes.
    ///
    /// For a packet that starts a payload unit, the first payload byte is the
    /// pointer field. The bytes it skips over are the tail of the section
    /// already in progress on that PID and are appended to it. Sections that
    /// begin after the pointer are returned directly when they fit in this
    /// packet, or buffered until later packets complete them. A table id of
    /// 0xFF marks stuffing and ends the scan of the packet.
    ///
    /// Packets without payload produce no sections. A section whose 3-byte
    /// header is split across two packets is not recovered.
    pub fn push_packet(&mut self, packet: &TsPacket) -> Vec<Section> {
        let mut sections = Vec::new();
        let payload = packet.payload();
        if payload.is_empty() {
            return sections;
        }

        if !packet.payload_unit_start {
            sections.extend(self.append_to_pending(packet.pid, payload));
            return sections;
        }

        let pointer = payload[0] as usize;
        if pointer + 1 > payload.len() {
            // Pointer is out of range: the packet is unusable and any section in
            // progress can no longer be continued contiguously.
            self.buffers.remove(&packet.pid);
            return sections;
        }

        // Bytes before the pointer target finish the section in progress.
        if pointer > 0 {
            sections.extend(self.append_to_pending(packet.pid, &payload[1..1 + pointer]));
        }
        // Whatever is still pending was not completed by its own tail; drop it so
        // later continuation packets are not glued onto stale data.
        self.buffers.remove(&packet.pid);

        let mut idx = 1 + pointer;
        while idx + 3 <= payload.len() {
            let table_id = payload[idx];
            if table_id == 0xFF {
                // Stuffing: no further sections in this packet.
                break;
            }
            let section_length =
                (((payload[idx + 1] & 0x0F) as usize) << 8) | payload[idx + 2] as usize;
            let total_len = 3 + section_length;
            if idx + total_len <= payload.len() {
                sections.push(Section {
                    pid: packet.pid,
                    table_id,
                    data: payload[idx..idx + total_len].to_vec(),
                });
                idx += total_len;
            } else {
                self.buffers.insert(
                    packet.pid,
                    SectionBuffer {
                        expected_len: total_len,
                        data: payload[idx..].to_vec(),
                    },
                );
                break;
            }
        }
        sections
    }

    /// Appends `bytes` to the section in progress on `pid`, returning it once
    /// its expected length has been reached. Does nothing when no section is in
    /// progress on that PID.
    fn append_to_pending(&mut self, pid: u16, bytes: &[u8]) -> Option<Section> {
        let buffer = self.buffers.get_mut(&pid)?;
        buffer.data.extend_from_slice(bytes);
        if buffer.data.len() < buffer.expected_len {
            return None;
        }
        let mut finished = self.buffers.remove(&pid)?;
        // Anything past the expected length is stuffing or unrelated data.
        finished.data.truncate(finished.expected_len);
        // Buffers are only created with at least the 3 header bytes present.
        let table_id = finished.data[0];
        Some(Section {
            pid,
            table_id,
            data: finished.data,
        })
    }
}
/// Cursor that reads big-endian, MSB-first bit fields from a byte slice.
#[derive(Debug)]
struct BitReader<'a> {
    /// Bytes being read.
    data: &'a [u8],
    /// Number of bits consumed so far.
    bit_pos: usize,
}

impl<'a> BitReader<'a> {
    /// Starts reading at the first bit of `data`.
    fn new(data: &'a [u8]) -> Self {
        BitReader { data, bit_pos: 0 }
    }

    /// Number of bits not yet consumed.
    fn remaining_bits(&self) -> usize {
        let total_bits = self.data.len() * 8;
        total_bits.saturating_sub(self.bit_pos)
    }

    /// Reads `count` bits (1 through 64) as an unsigned value, most significant
    /// bit first. Returns `None` without consuming anything when `count` is out
    /// of range or too few bits remain.
    fn read_bits(&mut self, count: usize) -> Option<u64> {
        if !(1..=64).contains(&count) || count > self.remaining_bits() {
            return None;
        }
        let start = self.bit_pos;
        let value = (start..start + count).fold(0u64, |acc, pos| {
            let shift = 7 - (pos % 8);
            (acc << 1) | u64::from((self.data[pos / 8] >> shift) & 1)
        });
        self.bit_pos = start + count;
        Some(value)
    }

    /// Advances past `count` bits, or returns `None` if fewer remain.
    fn skip_bits(&mut self, count: usize) -> Option<()> {
        if count > self.remaining_bits() {
            None
        } else {
            self.bit_pos += count;
            Some(())
        }
    }

    /// Reads `count` bytes from the current bit position. On failure the bytes
    /// read before running out stay consumed.
    fn read_bytes(&mut self, count: usize) -> Option<Vec<u8>> {
        let mut out = Vec::with_capacity(count);
        while out.len() < count {
            let byte = self.read_bits(8)?;
            out.push(byte as u8);
        }
        Some(out)
    }
}
/// Parses a Program Association Table section.
///
/// Returns `None` unless the section is on the PAT PID with the PAT table id,
/// holds at least 8 bytes, and declares a section length of at least 9 that
/// fits in the available data. Entries with program number 0 are skipped and
/// the trailing four CRC bytes are not verified.
pub fn parse_pat_section(section: &Section) -> Option<ProgramAssociationTable> {
    if section.pid != PID_PAT || section.table_id != TABLE_ID_PAT {
        return None;
    }
    let bytes = section.data.as_slice();
    if bytes.len() < 8 {
        return None;
    }
    let section_length = usize::from(u16::from_be_bytes([bytes[1] & 0x0F, bytes[2]]));
    let total_len = section_length + 3;
    if section_length < 9 || total_len > bytes.len() {
        return None;
    }
    let transport_stream_id = u16::from_be_bytes([bytes[3], bytes[4]]);
    // Four-byte program entries sit between the 8-byte header and the CRC.
    let program_numbers = bytes[8..total_len - 4]
        .chunks_exact(4)
        .map(|entry| u16::from_be_bytes([entry[0], entry[1]]))
        .filter(|&number| number != 0)
        .collect();
    Some(ProgramAssociationTable {
        transport_stream_id,
        program_numbers,
    })
}
/// Splits a long-form section into its table id extension and body.
///
/// The body is everything after the 8-byte header and before the final four
/// bytes. Returns `None` when fewer than 12 bytes are present, the declared
/// section length is below 9, or the declared length exceeds the data.
fn psip_section_body(section: &Section) -> Option<(u16, &[u8])> {
    let bytes = section.data.as_slice();
    if bytes.len() < 12 {
        return None;
    }
    let section_length = usize::from(u16::from_be_bytes([bytes[1] & 0x0F, bytes[2]]));
    if section_length < 9 {
        return None;
    }
    let total_len = section_length + 3;
    if total_len > bytes.len() {
        return None;
    }
    let table_id_extension = u16::from_be_bytes([bytes[3], bytes[4]]);
    let body = &bytes[8..total_len - 4];
    Some((table_id_extension, body))
}
/// Dispatches a section to the matching PSIP parser based on its table id.
///
/// MGT, VCT, RRT and STT sections are only accepted on the ATSC PSIP PID; EIT
/// and ETT sections are accepted on any PID. Anything else yields `None`.
pub fn parse_psip_table(section: &Section) -> Option<PsipTable> {
    let on_psip_pid = section.pid == PID_ATSC_PSIP;
    match (section.table_id, on_psip_pid) {
        (TABLE_ID_ATSC_MGT, true) => parse_mgt_section(section).map(PsipTable::MasterGuide),
        (TABLE_ID_ATSC_TVCT, true) | (TABLE_ID_ATSC_CVCT, true) => {
            parse_vct_section(section).map(PsipTable::VirtualChannel)
        }
        (TABLE_ID_ATSC_RRT, true) => parse_rrt_section(section).map(PsipTable::RatingRegion),
        (TABLE_ID_ATSC_EIT, _) => parse_eit_section(section).map(PsipTable::EventInformation),
        (TABLE_ID_ATSC_ETT, _) => parse_ett_section(section).map(PsipTable::ExtendedText),
        (TABLE_ID_ATSC_STT, true) => parse_stt_section(section).map(PsipTable::SystemTime),
        _ => None,
    }
}
/// Parses a Master Guide Table section.
///
/// Returns `None` when the section is not an MGT on the ATSC PSIP PID, when
/// the section framing is invalid, or when the body ends before every
/// announced field has been read. Descriptor loops are returned as raw bytes.
pub fn parse_mgt_section(section: &Section) -> Option<MasterGuideTable> {
    // Fixed part of one entry: 16 + 3 + 13 + 3 + 5 + 32 + 4 + 12 bits.
    const MIN_ENTRY_BYTES: usize = 11;

    if section.pid != PID_ATSC_PSIP || section.table_id != TABLE_ID_ATSC_MGT {
        return None;
    }
    let (_, body) = psip_section_body(section)?;
    let mut bits = BitReader::new(body);
    let protocol_version = bits.read_bits(8)? as u8;
    let tables_defined = bits.read_bits(16)? as usize;
    // `tables_defined` comes straight from the stream; reserve no more than the
    // body could actually hold so a bogus count cannot force a large allocation.
    let mut entries = Vec::with_capacity(tables_defined.min(body.len() / MIN_ENTRY_BYTES));
    for _ in 0..tables_defined {
        let table_type = bits.read_bits(16)? as u16;
        bits.skip_bits(3)?;
        let table_type_pid = bits.read_bits(13)? as u16;
        bits.skip_bits(3)?;
        let table_type_version_number = bits.read_bits(5)? as u8;
        let number_bytes = bits.read_bits(32)? as u32;
        bits.skip_bits(4)?;
        let descriptors_length = bits.read_bits(12)? as usize;
        let descriptors = bits.read_bytes(descriptors_length)?;
        entries.push(MasterGuideTableEntry {
            table_type,
            table_type_pid,
            table_type_version_number,
            number_bytes,
            descriptors,
        });
    }
    // Table-level descriptor loop: 4 skipped bits, 12-bit length, then the bytes.
    bits.skip_bits(4)?;
    let descriptors_length = bits.read_bits(12)? as usize;
    let descriptors = bits.read_bytes(descriptors_length)?;
    Some(MasterGuideTable {
        protocol_version,
        tables_defined: entries,
        descriptors,
    })
}
pub fn parse_vct_section(section: &Section) -> Option<VirtualChannelTable> {
if section.pid != PID_ATSC_PSIP
|| (section.table_id != TABLE_ID_ATSC_TVCT && section.table_id != TABLE_ID_ATSC_CVCT)
{
return None;
}
let (transport_stream_id, body) = psip_section_body(section)?;
let mut bits = BitReader::new(body);
let protocol_version = bits.read_bits(8)? as u8;
let channel_count = bits.read_bits(8)? as usize;
let mut channels = Vec::with_capacity(channel_count);
for _ in 0..channel_count {
let mut short_name_words = [0u16; 7];
for word in &mut short_name_words {
*word = bits.read_bits(16)? as u16;
}
bits.skip_bits(4)?;
let major_channel_number = bits.read_bits(10)? as u16;
let minor_channel_number = bits.read_bits(10)? as u16;
let modulation_mode = bits.read_bits(8)? as u8;
let carrier_frequency = bits.read_bits(32)? as u32;
let channel_tsid = bits.read_bits(16)? as u16;
let program_number = bits.read_bits(16)? as u16;
let etm_location = bits.read_bits(2)? as u8;
let access_controlled = bits.read_bits(1)? != 0;
let hidden = bits.read_bits(1)? != 0;
bits.skip_bits(2)?;
let hide_guide = bits.read_bits(1)? != 0;
bits.skip_bits(3)?;
let service_type = bits.read_bits(6)? as u8;
let source_id = bits.read_bits(16)? as u16;
bits.skip_bits(6)?;
let descriptors_length = bits.read_bits(10)? as usize;
let descriptors = bits.read_bytes(descriptors_length)?;
channels.push(VirtualChannel {
short_name: decode_atsc_short_name(&short_name_words),
major_channel_number,
minor_channel_number,
modulation_mode,
carrier_frequency,
channel_tsid,
program_number,
etm_location,
access_controlled,
hidden,
hide_guide,
service_type,
source_id,
descriptors,
});
}
bits.skip_bits(6)?;
let additional_descriptors_length = bits.read_bits(10)? as usize;
let additional_descriptors = bits.read_bytes(additional_descriptors_length)?;
Some(VirtualChannelTable {
table_id: section.table_id,
transport_stream_id,
protocol_version,
channels,
additional_descriptors,
})
}
/// Decodes the seven 16-bit code units of a channel short name as UTF-16.
///
/// Surrogate pairs are combined into a single character; unpaired surrogates
/// and zero padding units are dropped. Leading and trailing whitespace is
/// trimmed from the result.
fn decode_atsc_short_name(words: &[u16; 7]) -> String {
    let text: String = char::decode_utf16(words.iter().copied())
        // Unpaired surrogates cannot be represented; skip them.
        .filter_map(Result::ok)
        // Zero code units are padding, not text.
        .filter(|&ch| ch != '\0')
        .collect();
    text.trim().to_string()
}
/// Parses a multiple-string structure: a string count followed, per string, by
/// a 3-byte language code, a segment count and the segments themselves.
///
/// Each segment is a compression type byte, a mode byte, a length byte and
/// that many data bytes. Returns `None` when `data` is empty, when any field
/// runs past the end, or when bytes are left over after the last string.
pub fn parse_multiple_string_structure(data: &[u8]) -> Option<MultipleStringStructure> {
    let (&number_strings, mut rest) = data.split_first()?;
    let mut strings = Vec::with_capacity(usize::from(number_strings));

    for _ in 0..number_strings {
        if rest.len() < 4 {
            return None;
        }
        let (header, after_header) = rest.split_at(4);
        rest = after_header;
        let language_code = String::from_utf8_lossy(&header[..3]).to_string();
        let number_segments = usize::from(header[3]);

        let mut segments = Vec::with_capacity(number_segments);
        for _ in 0..number_segments {
            if rest.len() < 3 {
                return None;
            }
            let compression_type = rest[0];
            let mode = rest[1];
            let number_bytes = usize::from(rest[2]);
            rest = &rest[3..];
            if rest.len() < number_bytes {
                return None;
            }
            let (raw, after_segment) = rest.split_at(number_bytes);
            rest = after_segment;
            segments.push(MultipleStringSegment {
                compression_type,
                mode,
                bytes: raw.to_vec(),
                decoded: decode_multiple_string_segment(compression_type, mode, raw),
            });
        }

        // The combined text exists only if every segment decoded.
        let decoded = segments
            .iter()
            .map(|segment| segment.decoded.as_deref())
            .collect::<Option<Vec<&str>>>()
            .map(|parts| parts.concat());

        strings.push(LocalizedString {
            language_code,
            segments,
            decoded,
        });
    }

    if !rest.is_empty() {
        return None;
    }
    Some(MultipleStringStructure { strings })
}
/// Decodes one uncompressed string segment to text.
///
/// Mode `0x3F` treats the bytes as big-endian UTF-16. Modes `0x00..=0x06`,
/// `0x09..=0x10`, `0x20..=0x27` and `0x30..=0x33` use the mode as the high
/// byte of each character's code point and the data byte as the low byte.
/// Any other mode, or a non-zero compression type, yields `None`.
fn decode_multiple_string_segment(compression_type: u8, mode: u8, bytes: &[u8]) -> Option<String> {
    if compression_type != 0 {
        return None;
    }

    if mode == 0x3F {
        if bytes.len() % 2 != 0 {
            return None;
        }
        let units: Vec<u16> = bytes
            .chunks_exact(2)
            .map(|pair| u16::from_be_bytes([pair[0], pair[1]]))
            .collect();
        return String::from_utf16(&units).ok();
    }

    let selects_page = matches!(mode, 0x00..=0x06 | 0x09..=0x10 | 0x20..=0x27 | 0x30..=0x33);
    if !selects_page {
        return None;
    }
    let page = u32::from(mode) << 8;
    bytes
        .iter()
        .map(|&low| char::from_u32(page | u32::from(low)))
        .collect()
}
pub fn parse_rrt_section(section: &Section) -> Option<RatingRegionTable> {
if section.pid != PID_ATSC_PSIP || section.table_id != TABLE_ID_ATSC_RRT {
return None;
}
let (table_id_extension, body) = psip_section_body(section)?;
if body.len() < 4 {
return None;
}
let rating_region = (table_id_extension & 0x00FF) as u8;
let protocol_version = body[0];
let mut idx = 1usize;
let rating_region_name_length = body[idx] as usize;
idx += 1;
if idx + rating_region_name_length > body.len() {
return None;
}
let rating_region_name =
parse_multiple_string_structure(&body[idx..idx + rating_region_name_length])?;
idx += rating_region_name_length;
if idx >= body.len() {
return None;
}
let dimensions_defined = body[idx] as usize;
idx += 1;
let mut dimensions = Vec::with_capacity(dimensions_defined);
for _ in 0..dimensions_defined {
if idx >= body.len() {
return None;
}
let dimension_name_length = body[idx] as usize;
idx += 1;
if idx + dimension_name_length > body.len() {
return None;
}
let dimension_name =
parse_multiple_string_structure(&body[idx..idx + dimension_name_length])?;
idx += dimension_name_length;
if idx >= body.len() {
return None;
}
let flags = body[idx];
idx += 1;
let graduated_scale = (flags & 0x10) != 0;
let values_defined = (flags & 0x0F) as usize;
let mut values = Vec::with_capacity(values_defined);
for _ in 0..values_defined {
if idx >= body.len() {
return None;
}
let abbrev_rating_length = body[idx] as usize;
idx += 1;
if idx + abbrev_rating_length > body.len() {
return None;
}
let abbrev_rating_value =
parse_multiple_string_structure(&body[idx..idx + abbrev_rating_length])?;
idx += abbrev_rating_length;
if idx >= body.len() {
return None;
}
let rating_value_length = body[idx] as usize;
idx += 1;
if idx + rating_value_length > body.len() {
return None;
}
let rating_value =
parse_multiple_string_structure(&body[idx..idx + rating_value_length])?;
idx += rating_value_length;
values.push(RatingValue {
abbrev_rating_value,
rating_value,
});
}
dimensions.push(RatingDimension {
dimension_name,
graduated_scale,
values,
});
}
if idx + 2 > body.len() {
return None;
}
let descriptors_length = (u16::from_be_bytes([body[idx], body[idx + 1]]) & 0x03FF) as usize;
idx += 2;
if idx + descriptors_length > body.len() {
return None;
}
let descriptors = body[idx..idx + descriptors_length].to_vec();
idx += descriptors_length;
if idx != body.len() {
return None;
}
Some(RatingRegionTable {
rating_region,
protocol_version,
rating_region_name,
dimensions,
descriptors,
})
}
/// Parses an Event Information Table section (accepted on any PID).
///
/// Returns `None` when the table id is not EIT, when the framing is invalid,
/// when any event record or its title/descriptor data runs past the body, when
/// a title fails to parse, or when bytes remain after the last event.
pub fn parse_eit_section(section: &Section) -> Option<EventInformationTable> {
    if section.table_id != TABLE_ID_ATSC_EIT {
        return None;
    }
    let (source_id, body) = psip_section_body(section)?;
    let (protocol_version, num_events) = match body {
        [version, count, ..] => (*version, usize::from(*count)),
        _ => return None,
    };

    let mut cursor = 2usize;
    let mut events = Vec::with_capacity(num_events);
    for _ in 0..num_events {
        // Fixed part: id (2), start time (4), location + length (3), title length (1).
        let fixed = body.get(cursor..cursor + 10)?;
        cursor += 10;
        let event_id = u16::from_be_bytes([fixed[0], fixed[1]]) & 0x3FFF;
        let start_time_gps_seconds = u32::from_be_bytes([fixed[2], fixed[3], fixed[4], fixed[5]]);
        let packed = u32::from_be_bytes([0, fixed[6], fixed[7], fixed[8]]);
        let etm_location = ((packed >> 20) & 0x03) as u8;
        let length_in_seconds = packed & 0x000F_FFFF;
        let title_length = usize::from(fixed[9]);

        let title_end = cursor + title_length;
        if title_end > body.len() {
            return None;
        }
        let title = parse_multiple_string_structure(&body[cursor..title_end])?;
        cursor = title_end;

        // Descriptor loop: the low 12 bits of a 16-bit field give its length.
        let length_field = body.get(cursor..cursor + 2)?;
        let descriptors_length =
            usize::from(u16::from_be_bytes([length_field[0], length_field[1]]) & 0x0FFF);
        cursor += 2;
        let descriptors = body.get(cursor..cursor + descriptors_length)?.to_vec();
        cursor += descriptors_length;

        events.push(EventInformation {
            event_id,
            start_time_gps_seconds,
            etm_location,
            length_in_seconds,
            title,
            descriptors,
        });
    }
    if cursor != body.len() {
        return None;
    }

    Some(EventInformationTable {
        source_id,
        protocol_version,
        section_number: section.data[6],
        last_section_number: section.data[7],
        events,
    })
}
/// Parses an Extended Text Table section (accepted on any PID).
///
/// The body is a protocol version byte, a 32-bit id and a multiple-string
/// structure filling the remainder. Returns `None` when the table id is not
/// ETT, the framing is invalid, the body is under five bytes, or the text
/// fails to parse.
pub fn parse_ett_section(section: &Section) -> Option<ExtendedTextTable> {
    if section.table_id != TABLE_ID_ATSC_ETT {
        return None;
    }
    let (table_id_extension, body) = psip_section_body(section)?;
    if body.len() < 5 {
        return None;
    }
    let (fixed, text) = body.split_at(5);
    let message = parse_multiple_string_structure(text)?;
    Some(ExtendedTextTable {
        table_id_extension,
        protocol_version: fixed[0],
        etm_id: u32::from_be_bytes([fixed[1], fixed[2], fixed[3], fixed[4]]),
        message,
    })
}
/// Parses a System Time Table section and derives the Unix time it announces.
///
/// Returns `None` when the section is not an STT on the ATSC PSIP PID, when
/// the framing is invalid, or when the body is shorter than eight bytes.
/// Bytes after the eight fixed ones are returned as raw descriptors.
pub fn parse_stt_section(section: &Section) -> Option<SystemTimeTable> {
    if section.table_id != TABLE_ID_ATSC_STT || section.pid != PID_ATSC_PSIP {
        return None;
    }
    let (_, body) = psip_section_body(section)?;
    if body.len() < 8 {
        return None;
    }
    let (fixed, trailing) = body.split_at(8);
    let system_time_gps_seconds = u32::from_be_bytes([fixed[1], fixed[2], fixed[3], fixed[4]]);
    let gps_utc_offset = fixed[5];
    // Daylight saving field: status bit and day of month in the first byte,
    // hour in the second.
    let (ds_high, ds_low) = (fixed[6], fixed[7]);
    let unix_seconds =
        GPS_EPOCH_TO_UNIX + i64::from(system_time_gps_seconds) - i64::from(gps_utc_offset);
    Some(SystemTimeTable {
        protocol_version: fixed[0],
        system_time_gps_seconds,
        gps_utc_offset,
        daylight_saving_status: ds_high & 0x80 != 0,
        daylight_saving_day_of_month: ds_high & 0x1F,
        daylight_saving_hour: ds_low,
        descriptors: trailing.to_vec(),
        unix_seconds,
    })
}
/// Scans up to `max_packets` packets (at least one) for a PAT and, on ATSC
/// streams, a virtual channel table, and summarizes what was found.
///
/// Scanning stops early once a PAT, a VCT and ATSC PSIP traffic have all been
/// seen. Returns `Ok(None)` when no PAT was found. Channel details are filled
/// in only when the PAT lists exactly one program and the VCT has a channel
/// with that non-zero program number.
///
/// # Errors
///
/// Propagates any error from reading or parsing a packet.
pub fn probe_transport_stream_identity<R: Read>(
    stream: R,
    max_packets: usize,
) -> Result<Option<TransportStreamIdentity>> {
    let mut reader = TsReader::new(stream);
    let mut assembler = SectionAssembler::default();
    let mut pat = None;
    let mut vct = None;
    let mut saw_atsc = false;

    let mut budget = max_packets.max(1);
    while budget > 0 {
        budget -= 1;
        let packet = match reader.read_packet()? {
            Some(packet) => packet,
            None => break,
        };
        saw_atsc |= packet.pid == PID_ATSC_PSIP;
        for section in assembler.push_packet(&packet) {
            saw_atsc |= section.pid == PID_ATSC_PSIP;
            if pat.is_none() {
                pat = parse_pat_section(&section);
            }
            if vct.is_none() {
                if let Some(PsipTable::VirtualChannel(table)) = parse_psip_table(&section) {
                    vct = Some(table);
                }
            }
        }
        if pat.is_some() && saw_atsc && vct.is_some() {
            break;
        }
    }

    let pat = match pat {
        Some(table) => table,
        None => return Ok(None),
    };
    // Only an unambiguous single-program PAT identifies the program.
    let program_number = if pat.program_numbers.len() == 1 {
        Some(pat.program_numbers[0])
    } else {
        None
    };
    let matched_channel = match (program_number, vct.as_ref()) {
        (Some(wanted), Some(table)) => table
            .channels
            .iter()
            .find(|channel| channel.program_number == wanted && channel.program_number != 0),
        _ => None,
    };

    let standard = if saw_atsc {
        Some("atsc".to_string())
    } else {
        None
    };
    Ok(Some(TransportStreamIdentity {
        standard,
        transport_stream_id: pat.transport_stream_id,
        program_number,
        major_channel_number: matched_channel.map(|channel| channel.major_channel_number),
        minor_channel_number: matched_channel.map(|channel| channel.minor_channel_number),
        short_name: matched_channel.map(|channel| channel.short_name.clone()),
        source_id: matched_channel.map(|channel| channel.source_id),
    }))
}
/// Reads up to `max_packets` packets (at least one) and collects every PSIP
/// table that parses successfully, in the order encountered.
///
/// # Errors
///
/// Propagates any error from reading or parsing a packet.
pub fn probe_psip_tables<R: Read>(stream: R, max_packets: usize) -> Result<Vec<PsipTable>> {
    let mut reader = TsReader::new(stream);
    let mut assembler = SectionAssembler::default();
    let mut tables = Vec::new();
    for _ in 0..max_packets.max(1) {
        let packet = match reader.read_packet()? {
            Some(packet) => packet,
            None => break,
        };
        let sections = assembler.push_packet(&packet);
        tables.extend(sections.iter().filter_map(|section| parse_psip_table(section)));
    }
    Ok(tables)
}
/// Which kind of table a broadcast time was read from.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TimeSource {
/// ATSC System Time Table.
AtscStt,
/// DVB section with the TDT table id.
DvbTdt,
/// DVB section with the TOT table id.
DvbTot,
}
/// A wall-clock time carried in the stream, as returned by `parse_time_section`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct BroadcastUtc {
/// Time in seconds since the Unix epoch.
pub unix_seconds: i64,
/// Table the time was read from.
pub source: TimeSource,
}
/// One event emitted by `TimeSyncEngine::ingest_packet`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSyncUpdate {
/// PCR (27 MHz ticks) associated with the update, if one is known.
pub pcr_27mhz: Option<u64>,
/// Broadcast time in Unix seconds, when known.
pub utc_unix_seconds: Option<i64>,
/// Index of the current chunk, when it can be computed.
pub chunk_index: Option<u64>,
/// PCR value at which the current chunk starts, when it can be computed.
pub chunk_start_27mhz: Option<u64>,
/// Unix second at which the current chunk starts; only set once a time offset exists.
pub utc_start_unix: Option<i64>,
/// Whether the engine held a PCR-to-UTC offset when the update was produced.
pub synced: bool,
/// `true` for the update emitted when a packet signals a discontinuity.
pub discontinuity: bool,
}
/// Tracks the relation between PCR ticks and broadcast UTC and divides the
/// resulting timeline into fixed-length chunks.
#[derive(Debug)]
pub struct TimeSyncEngine {
// Chunk length in 27 MHz ticks.
chunk_ticks: u64,
// Most recent PCR seen since the last discontinuity.
last_pcr: Option<u64>,
// UTC (in 27 MHz ticks) minus PCR, set when a time section arrives after a PCR.
utc_offset_ticks: Option<i64>,
// Whether `utc_offset_ticks` has been established.
synced: bool,
// Chunk index reported by the most recent chunk-change update.
last_chunk_index: Option<u64>,
}
impl TimeSyncEngine {
    /// Creates an engine that divides the timeline into chunks of
    /// `chunk_duration_ms` milliseconds.
    ///
    /// A duration of zero disables chunking: chunk indices and chunk start
    /// values are then always `None`. Durations too large to express in 27 MHz
    /// ticks saturate at `u64::MAX` ticks.
    pub fn new(chunk_duration_ms: u64) -> Self {
        let chunk_ticks = chunk_duration_ms.saturating_mul(27_000);
        Self {
            chunk_ticks,
            last_pcr: None,
            utc_offset_ticks: None,
            synced: false,
            last_chunk_index: None,
        }
    }

    /// Processes one packet and returns the updates it triggers, in order:
    ///
    /// 1. a discontinuity update (after resetting all state) when the packet
    ///    signals a discontinuity;
    /// 2. one update per time section completed by this packet, which also
    ///    establishes the PCR-to-UTC offset if a PCR has been seen;
    /// 3. a chunk update when the packet carries a PCR that moves the timeline
    ///    into a different chunk.
    pub fn ingest_packet(
        &mut self,
        packet: &TsPacket,
        assembler: &mut SectionAssembler,
    ) -> Vec<TimeSyncUpdate> {
        let mut updates = Vec::new();
        if packet.discontinuity {
            self.last_pcr = None;
            self.utc_offset_ticks = None;
            self.synced = false;
            self.last_chunk_index = None;
            updates.push(TimeSyncUpdate {
                pcr_27mhz: packet.pcr_27mhz,
                utc_unix_seconds: None,
                chunk_index: None,
                chunk_start_27mhz: None,
                utc_start_unix: None,
                synced: false,
                discontinuity: true,
            });
        }
        for section in assembler.push_packet(packet) {
            if let Some(utc) = parse_time_section(&section) {
                // The offset is anchored to the last PCR seen before this section.
                if let Some(pcr) = self.last_pcr {
                    let utc_ticks = utc.unix_seconds.saturating_mul(27_000_000);
                    let offset = utc_ticks.saturating_sub(pcr as i64);
                    self.utc_offset_ticks = Some(offset);
                    self.synced = true;
                }
                updates.push(TimeSyncUpdate {
                    pcr_27mhz: self.last_pcr,
                    utc_unix_seconds: Some(utc.unix_seconds),
                    chunk_index: self.current_chunk_index(),
                    chunk_start_27mhz: self.current_chunk_start_27mhz(),
                    utc_start_unix: self.current_chunk_utc_start(),
                    synced: self.synced,
                    discontinuity: false,
                });
            }
        }
        if let Some(pcr) = packet.pcr_27mhz {
            self.last_pcr = Some(pcr);
            let chunk_index = self.current_chunk_index();
            if chunk_index != self.last_chunk_index {
                self.last_chunk_index = chunk_index;
                updates.push(TimeSyncUpdate {
                    pcr_27mhz: Some(pcr),
                    utc_unix_seconds: self.current_utc_seconds(),
                    chunk_index,
                    chunk_start_27mhz: self.current_chunk_start_27mhz(),
                    utc_start_unix: self.current_chunk_utc_start(),
                    synced: self.synced,
                    discontinuity: false,
                });
            }
        }
        updates
    }

    /// UTC in Unix seconds at the last PCR; `None` until both a PCR and an
    /// offset are known.
    fn current_utc_seconds(&self) -> Option<i64> {
        let pcr = self.last_pcr? as i64;
        let offset = self.utc_offset_ticks?;
        Some((pcr + offset) / 27_000_000)
    }

    /// Index of the chunk containing the last PCR on the offset-adjusted
    /// timeline (raw PCR timeline while no offset is known).
    ///
    /// `None` when no PCR has been seen, the adjusted time is negative, or the
    /// chunk length is zero.
    fn current_chunk_index(&self) -> Option<u64> {
        let pcr = self.last_pcr? as i128;
        let offset = self.utc_offset_ticks.unwrap_or(0) as i128;
        let t = pcr + offset;
        if t < 0 {
            return None;
        }
        // `checked_div` turns a zero chunk length into `None` instead of a panic.
        (t as u128)
            .checked_div(self.chunk_ticks as u128)
            .map(|index| index as u64)
    }

    /// PCR value at which the current chunk begins; `None` when there is no
    /// current chunk or the start would fall before PCR zero.
    fn current_chunk_start_27mhz(&self) -> Option<u64> {
        let chunk_index = self.current_chunk_index()? as i128;
        let offset = self.utc_offset_ticks.unwrap_or(0) as i128;
        let anchored = chunk_index * self.chunk_ticks as i128;
        let pcr = anchored - offset;
        if pcr < 0 {
            return None;
        }
        Some(pcr as u64)
    }

    /// Unix second at which the current chunk begins; `None` until an offset is
    /// known.
    fn current_chunk_utc_start(&self) -> Option<i64> {
        let _ = self.utc_offset_ticks?;
        let chunk_index = self.current_chunk_index()? as i128;
        let anchored = chunk_index * self.chunk_ticks as i128;
        Some((anchored / 27_000_000) as i64)
    }
}
/// Extracts a broadcast wall-clock time from an ATSC STT or a DVB TDT/TOT
/// section, tagging it with its source. Sections on other PIDs or with other
/// table ids, and sections that fail to parse, yield `None`.
pub fn parse_time_section(section: &Section) -> Option<BroadcastUtc> {
    let (unix_seconds, source) = match (section.pid, section.table_id) {
        (PID_ATSC_PSIP, TABLE_ID_ATSC_STT) => {
            (parse_stt_section(section)?.unix_seconds, TimeSource::AtscStt)
        }
        (PID_DVB_TDT_TOT, TABLE_ID_DVB_TDT) => {
            (parse_dvb_time(&section.data)?, TimeSource::DvbTdt)
        }
        (PID_DVB_TDT_TOT, TABLE_ID_DVB_TOT) => {
            (parse_dvb_time(&section.data)?, TimeSource::DvbTot)
        }
        _ => return None,
    };
    Some(BroadcastUtc {
        unix_seconds,
        source,
    })
}
/// Converts the time field of a DVB TDT/TOT section to Unix seconds.
///
/// `data` is the whole section: bytes 3 and 4 hold the Modified Julian Date
/// and bytes 5 through 7 hold hour, minute and second as BCD. Returns `None`
/// when fewer than eight bytes are present, when a BCD digit is invalid, or
/// when the hour, minute or second is out of range (second 60 is tolerated as
/// a leap second).
fn parse_dvb_time(data: &[u8]) -> Option<i64> {
    if data.len() < 8 {
        return None;
    }
    let mjd = u16::from_be_bytes([data[3], data[4]]);
    let hour = bcd_to_dec(data[5])?;
    let minute = bcd_to_dec(data[6])?;
    let second = bcd_to_dec(data[7])?;
    // Valid BCD digits can still spell impossible times such as 99:99:99.
    if hour > 23 || minute > 59 || second > 60 {
        return None;
    }
    let days = i64::from(mjd) - MJD_UNIX_EPOCH;
    Some(days * 86_400 + i64::from(hour) * 3_600 + i64::from(minute) * 60 + i64::from(second))
}
/// Converts one packed-BCD byte (two decimal digits) to its numeric value.
/// Returns `None` if either nibble is greater than 9.
fn bcd_to_dec(value: u8) -> Option<u32> {
    let (tens, ones) = (value >> 4, value & 0x0F);
    if tens <= 9 && ones <= 9 {
        Some(u32::from(tens) * 10 + u32::from(ones))
    } else {
        None
    }
}
#[cfg(test)]
mod tests {
use super::*;
/// Test helper that packs values MSB-first into a growing byte vector.
struct BitWriter {
    /// Bytes written so far; the last one may be only partly filled.
    bytes: Vec<u8>,
    /// Total number of bits written.
    bit_pos: usize,
}

impl BitWriter {
    /// Creates an empty writer.
    fn new() -> Self {
        BitWriter {
            bytes: Vec::new(),
            bit_pos: 0,
        }
    }

    /// Appends the low `count` bits of `value`, most significant first.
    fn push_bits(&mut self, value: u64, count: usize) {
        for shift in (0..count).rev() {
            let byte_index = self.bit_pos / 8;
            let used = self.bit_pos % 8;
            if used == 0 {
                // Starting a new byte.
                self.bytes.push(0);
            }
            if (value >> shift) & 1 == 1 {
                self.bytes[byte_index] |= 0x80u8 >> used;
            }
            self.bit_pos += 1;
        }
    }

    /// Consumes the writer; unused trailing bits of the last byte are zero.
    fn into_bytes(self) -> Vec<u8> {
        let BitWriter { bytes, .. } = self;
        bytes
    }
}
/// Test helper: builds an adaptation-only packet (no payload) whose adaptation
/// field carries `pcr_27mhz`.
fn build_ts_packet_with_adaptation_pcr(
    pid: u16,
    continuity_counter: u8,
    pcr_27mhz: u64,
) -> [u8; TS_PACKET_SIZE] {
    // Split into the 90 kHz base and the 27 MHz remainder.
    let (base, ext) = (pcr_27mhz / 300, pcr_27mhz % 300);
    let pcr_field = [
        (base >> 25) as u8,
        (base >> 17) as u8,
        (base >> 9) as u8,
        (base >> 1) as u8,
        // Lowest base bit, six filler bits set to 1, highest extension bit.
        (((base & 0x1) as u8) << 7) | 0x7E | (((ext >> 8) & 0x1) as u8),
        ext as u8,
    ];

    let mut packet = [0u8; TS_PACKET_SIZE];
    packet[0] = SYNC_BYTE;
    packet[1..3].copy_from_slice(&(pid & 0x1FFF).to_be_bytes());
    // adaptation_control = 2: adaptation field only.
    packet[3] = (2 << 4) | (continuity_counter & 0x0F);
    // Adaptation field length: one flags byte plus six PCR bytes.
    packet[4] = 7;
    // Flags: PCR present.
    packet[5] = 0x10;
    packet[6..12].copy_from_slice(&pcr_field);
    packet
}
/// Test helper: builds a payload-only packet. `payload` is copied after the
/// 4-byte header and truncated if it does not fit; unused bytes stay zero.
fn build_payload_packet(
    pid: u16,
    continuity_counter: u8,
    payload_unit_start: bool,
    payload: &[u8],
) -> [u8; TS_PACKET_SIZE] {
    let start_flag: u16 = if payload_unit_start { 0x4000 } else { 0x0000 };
    let mut packet = [0u8; TS_PACKET_SIZE];
    packet[0] = SYNC_BYTE;
    packet[1..3].copy_from_slice(&((pid & 0x1FFF) | start_flag).to_be_bytes());
    // adaptation_control = 1: payload only.
    packet[3] = (1 << 4) | (continuity_counter & 0x0F);
    let body = &mut packet[4..];
    let len = payload.len().min(body.len());
    body[..len].copy_from_slice(&payload[..len]);
    packet
}
/// Test helper: builds a PAT section listing `program_numbers`, each mapped to
/// the same PMT PID. The four CRC bytes are left as zeros.
fn build_pat_section(transport_stream_id: u16, program_numbers: &[u16]) -> Vec<u8> {
    let section_length = 5 + (program_numbers.len() * 4) + 4;
    let total_len = 3 + section_length;
    let mut data = Vec::with_capacity(total_len);
    data.push(TABLE_ID_PAT);
    data.push(0xB0 | (((section_length >> 8) as u8) & 0x0F));
    data.push((section_length & 0xFF) as u8);
    data.extend_from_slice(&transport_stream_id.to_be_bytes());
    // version 0 with current_next = 1, then section_number and last_section_number.
    data.extend_from_slice(&[0xC1, 0, 0]);
    for program_number in program_numbers {
        data.extend_from_slice(&program_number.to_be_bytes());
        // PMT PID = 0x0100 for all test programs; parser only cares about program_number.
        data.extend_from_slice(&[0xE1, 0x00]);
    }
    // Zero-filled CRC placeholder.
    data.resize(total_len, 0);
    data
}
/// Builds an ATSC MGT section from `(table_type, pid, version, number_bytes)` entries.
///
/// All descriptor loops are empty, table_id_extension is 0, and the four trailing bytes
/// counted by `section_length` are left as zero.
fn build_mgt_section(entries: &[(u16, u16, u8, u32)]) -> Vec<u8> {
    let mut bits = BitWriter::new();
    bits.push_bits(0, 8); // protocol_version
    bits.push_bits(entries.len() as u64, 16); // tables_defined
    for &(table_type, pid, version, number_bytes) in entries {
        let entry_fields: [(u64, usize); 8] = [
            (table_type as u64, 16),
            (0b111, 3), // reserved
            (pid as u64, 13),
            (0b111, 3), // reserved
            (version as u64, 5),
            (number_bytes as u64, 32),
            (0b1111, 4), // reserved
            (0, 12),     // per-table descriptors length
        ];
        for (value, width) in entry_fields {
            bits.push_bits(value, width);
        }
    }
    bits.push_bits(0b1111, 4); // reserved
    bits.push_bits(0, 12); // table-level descriptors length
    let body = bits.into_bytes();

    let section_length = 9 + body.len();
    let total_len = 3 + section_length;
    let mut section = Vec::with_capacity(total_len);
    section.push(TABLE_ID_ATSC_MGT);
    section.push(0xB0 | (((section_length >> 8) as u8) & 0x0F));
    section.push((section_length & 0xFF) as u8);
    // table_id_extension (2 bytes), version/current_next, section and last section numbers.
    section.extend_from_slice(&[0, 0, 0xC1, 0, 0]);
    section.extend_from_slice(&body);
    // Zero-fill the trailing bytes that follow the body.
    section.resize(total_len, 0);
    section
}
/// Builds an ATSC TVCT section describing one channel, "KCBS-HD" (2.1), on TSID 0x0456.
///
/// Both descriptor loops are empty and the four trailing bytes counted by `section_length`
/// are left as zero.
fn build_tvct_section() -> Vec<u8> {
    let mut bits = BitWriter::new();
    bits.push_bits(0, 8); // protocol_version
    bits.push_bits(1, 8); // num_channels_in_section
    // short_name: seven 16-bit code units.
    for unit in "KCBS-HD".encode_utf16() {
        bits.push_bits(unit as u64, 16);
    }
    let channel_fields: &[(u64, usize)] = &[
        (0b1111, 4),       // reserved
        (2, 10),           // major
        (1, 10),           // minor
        (0x04, 8),         // modulation
        (573_000_000, 32), // carrier frequency
        (0x0456, 16),      // channel_tsid
        (3, 16),           // program_number
        (0, 2),            // etm_location
        (0, 1),            // access_controlled
        (0, 1),            // hidden
        (0b11, 2),         // reserved
        (0, 1),            // hide_guide
        (0b111, 3),        // reserved
        (2, 6),            // service_type
        (0x1001, 16),      // source_id
        (0b111111, 6),     // reserved
        (0, 10),           // descriptors_length
        (0b111111, 6),     // reserved
        (0, 10),           // additional descriptors_length
    ];
    for &(value, width) in channel_fields {
        bits.push_bits(value, width);
    }
    let body = bits.into_bytes();

    let section_length = 9 + body.len();
    let total_len = 3 + section_length;
    let mut section = Vec::with_capacity(total_len);
    section.push(TABLE_ID_ATSC_TVCT);
    section.push(0xB0 | (((section_length >> 8) as u8) & 0x0F));
    section.push((section_length & 0xFF) as u8);
    section.extend_from_slice(&0x0456u16.to_be_bytes()); // transport_stream_id
    section.extend_from_slice(&[0xC1, 0, 0]);
    section.extend_from_slice(&body);
    // Zero-fill the trailing bytes that follow the body.
    section.resize(total_len, 0);
    section
}
/// Wraps `body` in a long-form section header for `table_id`.
///
/// The result is 3 header bytes, 5 bytes of extension/version/section numbers, `body`, and
/// four trailing zero bytes; `section_length` covers everything after the first 3 bytes.
/// Only the low 12 bits of the length are encoded.
fn build_psip_section(
    table_id: u8,
    table_id_extension: u16,
    body: &[u8],
    section_number: u8,
    last_section_number: u8,
) -> Vec<u8> {
    let section_length = 9 + body.len();
    let total_len = 3 + section_length;
    let [extension_hi, extension_lo] = table_id_extension.to_be_bytes();

    let mut section = Vec::with_capacity(total_len);
    section.extend_from_slice(&[
        table_id,
        0xB0 | (((section_length >> 8) as u8) & 0x0F),
        (section_length & 0xFF) as u8,
        extension_hi,
        extension_lo,
        0xC1,
        section_number,
        last_section_number,
    ]);
    section.extend_from_slice(body);
    // Zero-fill the trailing bytes that follow the body.
    section.resize(total_len, 0);
    section
}
/// Builds a multiple_string_structure with one string made of one uncompressed, mode-0 segment.
///
/// Panics if `language` is not exactly 3 bytes or `text` is longer than 255 bytes.
fn build_multiple_string_ascii(language: &str, text: &str) -> Vec<u8> {
    let language = language.as_bytes();
    assert!(language.len() == 3);
    let text = text.as_bytes();
    assert!(text.len() <= u8::MAX as usize);
    let parts: [&[u8]; 4] = [
        &[1], // number_strings
        language,
        // number_segments, compression_type, mode, number_bytes
        &[1, 0, 0, text.len() as u8],
        text,
    ];
    parts.concat()
}
/// Builds a multiple_string_structure with one string made of one uncompressed segment whose
/// text is big-endian UTF-16 (mode 0x3F).
///
/// Panics if `language` is not exactly 3 bytes or the encoded text is longer than 255 bytes.
fn build_multiple_string_utf16(language: &str, text: &str) -> Vec<u8> {
    let language = language.as_bytes();
    assert!(language.len() == 3);
    let bytes: Vec<u8> = text
        .encode_utf16()
        .flat_map(|word| word.to_be_bytes())
        .collect();
    assert!(bytes.len() <= u8::MAX as usize);
    let parts: [&[u8]; 4] = [
        &[1], // number_strings
        language,
        // number_segments, compression_type, mode, number_bytes
        &[1, 0, 0x3F, bytes.len() as u8],
        &bytes,
    ];
    parts.concat()
}
/// Builds an ATSC RRT section (table_id_extension 0x0001) with one graduated-scale dimension,
/// "Entire Audience", holding three rating values: "", "None" and "TV-G".
fn build_rrt_section() -> Vec<u8> {
    /// Appends a one-byte length followed by the encoded string structure.
    fn push_len_prefixed(body: &mut Vec<u8>, encoded: &[u8]) {
        body.push(encoded.len() as u8);
        body.extend_from_slice(encoded);
    }

    let region_name = build_multiple_string_ascii("eng", "U.S. (50 states + possessions)");
    let dimension_name = build_multiple_string_ascii("eng", "Entire Audience");
    let rating_labels = ["", "None", "TV-G"].map(|label| build_multiple_string_ascii("eng", label));

    let mut body = vec![0]; // protocol_version
    push_len_prefixed(&mut body, &region_name);
    body.push(1); // dimensions_defined
    push_len_prefixed(&mut body, &dimension_name);
    body.push(0xE0 | 0x10 | 3); // reserved + graduated_scale + 3 values
    for label in &rating_labels {
        // The same text is used for both the abbreviated and the full rating value.
        push_len_prefixed(&mut body, label);
        push_len_prefixed(&mut body, label);
    }
    body.extend_from_slice(&0xFC00u16.to_be_bytes()); // descriptors_length = 0
    build_psip_section(TABLE_ID_ATSC_RRT, 0x0001, &body, 0, 0)
}
/// Builds an ATSC EIT section (table_id_extension 0x0003) with a single event titled
/// "Flipper": event_id 0x0028, start time 123_456, ETM location 1, length 1_800 seconds,
/// followed by 16 descriptor bytes.
fn build_eit_section() -> Vec<u8> {
    let title = build_multiple_string_ascii("eng", "Flipper");
    // NOTE(review): the second byte (0x10) equals the size of this whole array rather than the
    // 14 bytes that follow it — confirm the consumer treats the descriptor loop as opaque.
    let descriptors: [u8; 16] = [
        0x87, 0x10, 0x01, 0x01, 0x00, 0x02, 0x05, 0x01, b'e', b'n', b'g', 4, b'T', b'V', b'-',
        b'G',
    ];
    // 2 reserved bits + 14-bit event_id.
    let event_id_field: u16 = 0xC000 | 0x0028;
    // 2 reserved bits + 2-bit ETM_location + 20-bit length_in_seconds, stored in 3 bytes.
    let etm_and_length: u32 = 0xC00000 | (1 << 20) | 1_800;

    let mut body = vec![
        0, // protocol_version
        1, // num_events_in_section
    ];
    body.extend_from_slice(&event_id_field.to_be_bytes());
    body.extend_from_slice(&123_456u32.to_be_bytes());
    // Drop the most significant byte to keep the 24-bit field.
    body.extend_from_slice(&etm_and_length.to_be_bytes()[1..]);
    body.push(title.len() as u8);
    body.extend_from_slice(&title);
    body.extend_from_slice(&(0xF000 | descriptors.len() as u16).to_be_bytes());
    body.extend_from_slice(&descriptors);
    build_psip_section(TABLE_ID_ATSC_EIT, 0x0003, &body, 0, 0)
}
/// Builds an ATSC ETT section (table_id_extension 0x0003) with ETM id 0x01020304 and the
/// message "Long event text".
fn build_ett_section() -> Vec<u8> {
    let message = build_multiple_string_ascii("eng", "Long event text");
    let etm_id: u32 = 0x01020304;

    let mut body = vec![0u8]; // protocol_version
    body.extend_from_slice(&etm_id.to_be_bytes());
    body.extend_from_slice(&message);
    build_psip_section(TABLE_ID_ATSC_ETT, 0x0003, &body, 0, 0)
}
/// Builds an ATSC STT section with the given GPS time and GPS-UTC offset.
///
/// The daylight-saving word is fixed: status bit set, day of month 17, hour 2.
fn build_stt_section(system_time_gps_seconds: u32, gps_utc_offset: u8) -> Vec<u8> {
    const DS_STATUS: u16 = 0x8000;
    const DS_DAY_OF_MONTH: u16 = 17;
    const DS_HOUR: u16 = 2;
    let daylight_saving = DS_STATUS | (DS_DAY_OF_MONTH << 8) | DS_HOUR;

    let mut body = Vec::with_capacity(8);
    body.push(0); // protocol_version
    body.extend_from_slice(&system_time_gps_seconds.to_be_bytes());
    body.push(gps_utc_offset);
    body.extend_from_slice(&daylight_saving.to_be_bytes());
    build_psip_section(TABLE_ID_ATSC_STT, 0x0000, &body, 0, 0)
}
/// Encodes the low 33 bits of `pts` into the 5-byte PES timestamp layout.
///
/// The first byte carries the `0010` prefix and the top 3 bits; the remaining 30 bits are
/// split into two 15-bit groups. Each of the three groups is followed by a marker bit of 1.
fn encode_pts_90khz(pts: u64) -> [u8; 5] {
    let top3 = ((pts >> 30) & 0x07) as u8;
    let middle15 = ((pts >> 15) & 0x7FFF) as u16;
    let low15 = (pts & 0x7FFF) as u16;
    // Shift each 15-bit group left by one and set the marker bit in the freed position.
    let [middle_hi, middle_lo] = ((middle15 << 1) | 1).to_be_bytes();
    let [low_hi, low_lo] = ((low15 << 1) | 1).to_be_bytes();
    [0x20 | (top3 << 1) | 1, middle_hi, middle_lo, low_hi, low_lo]
}
/// A payload-only packet exposes its PID, PUSI flag, continuity counter and payload bytes.
#[test]
fn parse_packet_extracts_pid_and_payload() {
    const PID: u16 = 0x0033;
    let [pid_hi, pid_lo] = PID.to_be_bytes();
    let mut raw = [0u8; TS_PACKET_SIZE];
    raw[..5].copy_from_slice(&[
        SYNC_BYTE,
        0x40 | (pid_hi & 0x1F), // payload_unit_start
        pid_lo,
        0x1A, // payload only, continuity counter 0x0A
        0xAA, // first payload byte
    ]);

    let packet = parse_packet(raw).unwrap();

    assert_eq!(packet.pid, PID);
    assert!(packet.payload_unit_start);
    assert_eq!(packet.continuity_counter, 0x0A);
    assert_eq!(packet.payload()[0], 0xAA);
}
/// A packet whose first byte is not the sync byte is rejected.
#[test]
fn parse_packet_rejects_bad_sync() {
    // An all-zero buffer starts with 0x00 where the sync byte belongs.
    let raw = [0u8; TS_PACKET_SIZE];
    assert!(parse_packet(raw).is_err());
}
/// An adaptation_field_length of 250 is rejected as too large for the packet.
#[test]
fn parse_packet_rejects_invalid_adaptation_length() {
    const PID: u16 = 0x0011;
    let [pid_hi, pid_lo] = PID.to_be_bytes();
    let mut raw = [0u8; TS_PACKET_SIZE];
    raw[..5].copy_from_slice(&[
        SYNC_BYTE,
        pid_hi & 0x1F,
        pid_lo,
        0x30, // adaptation + payload, continuity counter 0
        250,  // too large
    ]);

    assert!(parse_packet(raw).is_err());
}
/// The PCR written into an adaptation field is read back unchanged in 27 MHz units.
#[test]
fn parse_packet_reads_pcr_27mhz() {
    let expected_pcr: u64 = 54_000_123;
    let raw = build_ts_packet_with_adaptation_pcr(0x0100, 0, expected_pcr);
    let packet = parse_packet(raw).unwrap();
    assert_eq!(packet.pcr_27mhz, Some(expected_pcr));
}
/// A PTS encoded in a minimal PES header is read back unchanged.
#[test]
fn parse_pts_extracts_expected_value() {
    const PID: u16 = 0x0020;
    let pts = 90_000u64 * 3;
    let [pid_hi, pid_lo] = PID.to_be_bytes();

    // Minimal PES header with PTS.
    let mut pes = vec![
        0x00, 0x00, 0x01, // start code prefix
        0xE0, // stream id
        0x00, 0x00, 0x00, // bytes 4..7 left as zero
        0x80, // pts_dts_flags = 2
        5,    // header length
    ];
    pes.extend_from_slice(&encode_pts_90khz(pts));

    let mut raw = [0u8; TS_PACKET_SIZE];
    raw[..4].copy_from_slice(&[
        SYNC_BYTE,
        0x40 | (pid_hi & 0x1F),
        pid_lo,
        0x10, // payload only, continuity counter 0
    ]);
    raw[4..4 + pes.len()].copy_from_slice(&pes);

    let packet = parse_packet(raw).unwrap();
    assert_eq!(parse_pts_90khz(&packet).unwrap(), pts);
}
/// A section split over two packets is emitted only once the second packet arrives.
#[test]
fn section_assembler_reassembles_across_packets() {
    const PID: u16 = 0x0014;
    // A tiny "section" with declared length 10 (3 + 10 = 13 bytes total).
    const SECTION_LEN: usize = 13;
    // Number of section bytes carried by the first packet.
    const FIRST_CHUNK: usize = 7;
    let [pid_hi, pid_lo] = PID.to_be_bytes();

    // Body bytes equal their own index; the 3-byte header is overwritten below.
    let declared_length = (SECTION_LEN - 3) as u16;
    let mut section: Vec<u8> = (0..SECTION_LEN).map(|index| index as u8).collect();
    section[0] = TABLE_ID_DVB_TDT;
    section[1] = ((declared_length >> 8) as u8) & 0x0F;
    section[2] = (declared_length & 0xFF) as u8;

    // Packet 1: the payload is intentionally short (via a large adaptation field) so the
    // assembler must buffer until packet 2 arrives.
    let first_payload_len = 1 + FIRST_CHUNK; // pointer byte + section bytes
    let adaptation_len = (TS_PACKET_SIZE - 5) - first_payload_len;
    let payload_start = 4 + 1 + adaptation_len;
    let mut first = [0u8; TS_PACKET_SIZE];
    first[..5].copy_from_slice(&[
        SYNC_BYTE,
        0x40 | (pid_hi & 0x1F),
        pid_lo,
        0x30, // adaptation + payload, continuity counter 0
        adaptation_len as u8,
    ]);
    // Adaptation flags and stuffing stay zero, as does the pointer byte at `payload_start`.
    first[payload_start + 1..payload_start + 1 + FIRST_CHUNK]
        .copy_from_slice(&section[..FIRST_CHUNK]);

    // Packet 2: payload only, carrying the rest of the section.
    let remainder = &section[FIRST_CHUNK..];
    let mut second = [0u8; TS_PACKET_SIZE];
    second[..4].copy_from_slice(&[SYNC_BYTE, pid_hi & 0x1F, pid_lo, 0x11]);
    second[4..4 + remainder.len()].copy_from_slice(remainder);

    let first_packet = parse_packet(first).unwrap();
    let second_packet = parse_packet(second).unwrap();
    let mut assembler = SectionAssembler::default();

    assert!(assembler.push_packet(&first_packet).is_empty());
    let out = assembler.push_packet(&second_packet);
    assert_eq!(out.len(), 1);
    let assembled = &out[0];
    assert_eq!(assembled.pid, PID);
    assert_eq!(assembled.table_id, TABLE_ID_DVB_TDT);
    assert_eq!(assembled.data.len(), SECTION_LEN);
    assert_eq!(assembled.data[3], 3u8);
}
/// The PAT parser returns the transport stream id and every listed program number.
#[test]
fn parse_pat_section_extracts_tsid_and_programs() {
    let raw = build_pat_section(0x0456, &[3, 7]);
    let pat_section = Section {
        pid: PID_PAT,
        table_id: TABLE_ID_PAT,
        data: raw,
    };

    let parsed = parse_pat_section(&pat_section).unwrap();

    assert_eq!(parsed.transport_stream_id, 0x0456);
    assert_eq!(parsed.program_numbers, vec![3, 7]);
}
/// The MGT parser returns the protocol version and each table entry's fields.
#[test]
fn parse_mgt_section_extracts_table_catalog() {
    let catalog: [(u16, u16, u8, u32); 1] = [(0x0000, PID_ATSC_PSIP, 1, 1024)];
    let mgt_section = Section {
        pid: PID_ATSC_PSIP,
        table_id: TABLE_ID_ATSC_MGT,
        data: build_mgt_section(&catalog),
    };

    let parsed = parse_mgt_section(&mgt_section).unwrap();

    assert_eq!(parsed.protocol_version, 0);
    assert_eq!(parsed.tables_defined.len(), 1);
    let entry = &parsed.tables_defined[0];
    assert_eq!(entry.table_type, 0x0000);
    assert_eq!(entry.table_type_pid, PID_ATSC_PSIP);
    assert_eq!(entry.table_type_version_number, 1);
    assert_eq!(entry.number_bytes, 1024);
}
/// The VCT parser returns the TSID and the single channel's name, numbers and ids.
#[test]
fn parse_vct_section_extracts_virtual_channel_fields() {
    let vct_section = Section {
        pid: PID_ATSC_PSIP,
        table_id: TABLE_ID_ATSC_TVCT,
        data: build_tvct_section(),
    };

    let parsed = parse_vct_section(&vct_section).unwrap();

    assert_eq!(parsed.transport_stream_id, 0x0456);
    assert_eq!(parsed.channels.len(), 1);
    let only_channel = &parsed.channels[0];
    assert_eq!(only_channel.short_name, "KCBS-HD");
    assert_eq!(only_channel.major_channel_number, 2);
    assert_eq!(only_channel.minor_channel_number, 1);
    assert_eq!(only_channel.channel_tsid, 0x0456);
    assert_eq!(only_channel.program_number, 3);
    assert_eq!(only_channel.source_id, 0x1001);
}
/// Both the mode-0 and the UTF-16 (mode 0x3F) string encodings decode to their text.
#[test]
fn parse_multiple_string_structure_decodes_ascii_and_utf16() {
    let ascii_bytes = build_multiple_string_ascii("eng", "Flipper");
    let ascii = parse_multiple_string_structure(&ascii_bytes).unwrap();
    assert_eq!(ascii.strings.len(), 1);
    let first = &ascii.strings[0];
    assert_eq!(first.language_code, "eng");
    assert_eq!(first.decoded.as_deref(), Some("Flipper"));

    let utf16_bytes = build_multiple_string_utf16("eng", "TV-G");
    let utf16 = parse_multiple_string_structure(&utf16_bytes).unwrap();
    assert_eq!(utf16.strings[0].decoded.as_deref(), Some("TV-G"));
}
/// The RRT parser returns the region, its name, and the dimension with its rating values.
#[test]
fn parse_rrt_section_extracts_dimensions_and_values() {
    let rrt_section = Section {
        pid: PID_ATSC_PSIP,
        table_id: TABLE_ID_ATSC_RRT,
        data: build_rrt_section(),
    };

    let parsed = parse_rrt_section(&rrt_section).unwrap();

    assert_eq!(parsed.rating_region, 1);
    let region_name = parsed.rating_region_name.strings[0].decoded.as_deref();
    assert_eq!(region_name, Some("U.S. (50 states + possessions)"));

    assert_eq!(parsed.dimensions.len(), 1);
    let dimension = &parsed.dimensions[0];
    assert!(dimension.graduated_scale);
    assert_eq!(dimension.values.len(), 3);
    let third_abbrev = dimension.values[2].abbrev_rating_value.strings[0]
        .decoded
        .as_deref();
    assert_eq!(third_abbrev, Some("TV-G"));
}
/// The EIT parser returns the section header fields and the single event's details.
#[test]
fn parse_eit_section_extracts_events() {
    let eit_section = Section {
        pid: 0x1D00,
        table_id: TABLE_ID_ATSC_EIT,
        data: build_eit_section(),
    };

    let parsed = parse_eit_section(&eit_section).unwrap();

    assert_eq!(parsed.source_id, 3);
    assert_eq!(parsed.protocol_version, 0);
    assert_eq!(parsed.section_number, 0);
    assert_eq!(parsed.events.len(), 1);

    let event = &parsed.events[0];
    assert_eq!(event.event_id, 0x0028);
    assert_eq!(event.etm_location, 1);
    assert_eq!(event.length_in_seconds, 1_800);
    assert_eq!(event.title.strings[0].decoded.as_deref(), Some("Flipper"));
    assert_eq!(event.descriptors.len(), 16);
}
/// The ETT parser returns the table id extension, the ETM id and the decoded message.
#[test]
fn parse_ett_section_extracts_message() {
    let ett_section = Section {
        pid: 0x1E00,
        table_id: TABLE_ID_ATSC_ETT,
        data: build_ett_section(),
    };

    let parsed = parse_ett_section(&ett_section).unwrap();

    assert_eq!(parsed.table_id_extension, 3);
    assert_eq!(parsed.etm_id, 0x01020304);
    let message_text = parsed.message.strings[0].decoded.as_deref();
    assert_eq!(message_text, Some("Long event text"));
}
/// The STT parser returns the raw fields plus the derived UNIX time.
#[test]
fn parse_stt_section_extracts_fields_and_utc() {
    let gps_seconds = 1_236_854_919;
    let gps_utc_offset = 18;
    let stt_section = Section {
        pid: PID_ATSC_PSIP,
        table_id: TABLE_ID_ATSC_STT,
        data: build_stt_section(gps_seconds, gps_utc_offset),
    };

    let parsed = parse_stt_section(&stt_section).unwrap();

    assert_eq!(parsed.protocol_version, 0);
    assert_eq!(parsed.system_time_gps_seconds, 1_236_854_919);
    assert_eq!(parsed.gps_utc_offset, 18);
    assert!(parsed.daylight_saving_status);
    assert_eq!(parsed.daylight_saving_day_of_month, 17);
    assert_eq!(parsed.daylight_saving_hour, 2);
    // 1_236_854_919 + 315_964_800 (GPS epoch as UNIX time) - 18 (GPS-UTC offset).
    assert_eq!(parsed.unix_seconds, 1_552_819_701);
}
/// A stream with a one-program PAT, an MGT and a TVCT is identified as ATSC with that
/// program and the TVCT's channel details.
#[test]
fn probe_transport_stream_identity_prefers_single_program_and_atsc_hint() {
    // Wraps one section in a PUSI packet, prefixed by a zero pointer byte.
    let packetize = |pid: u16, continuity_counter: u8, section: Vec<u8>| {
        let mut payload = vec![0u8; 1];
        payload.extend(section);
        build_payload_packet(pid, continuity_counter, true, &payload)
    };

    let pat = build_pat_section(0x0456, &[3]);
    let mgt = build_mgt_section(&[(0x0000, PID_ATSC_PSIP, 1, 1024)]);
    let tvct = build_tvct_section();
    let mut stream = Vec::new();
    stream.extend_from_slice(&packetize(PID_PAT, 0, pat));
    stream.extend_from_slice(&packetize(PID_ATSC_PSIP, 1, mgt));
    stream.extend_from_slice(&packetize(PID_ATSC_PSIP, 2, tvct));

    let probed = probe_transport_stream_identity(std::io::Cursor::new(stream), 8).unwrap();
    let identity = probed.unwrap();

    assert_eq!(identity.standard.as_deref(), Some("atsc"));
    assert_eq!(identity.transport_stream_id, 0x0456);
    assert_eq!(identity.program_number, Some(3));
    assert_eq!(identity.major_channel_number, Some(2));
    assert_eq!(identity.minor_channel_number, Some(1));
    assert_eq!(identity.short_name.as_deref(), Some("KCBS-HD"));
    assert_eq!(identity.source_id, Some(0x1001));
}
/// With two programs in the PAT and no PSIP tables, only the TSID is reported.
#[test]
fn probe_transport_stream_identity_keeps_ambiguous_multi_program_streams_unspecified() {
    // Pointer byte of zero followed by the PAT section.
    let mut pat_payload = vec![0u8; 1];
    pat_payload.extend(build_pat_section(0x0456, &[3, 7]));
    let stream = build_payload_packet(PID_PAT, 0, true, &pat_payload).to_vec();

    let probed = probe_transport_stream_identity(std::io::Cursor::new(stream), 4).unwrap();
    let identity = probed.unwrap();

    assert_eq!(identity.transport_stream_id, 0x0456);
    assert_eq!(identity.program_number, None);
    assert_eq!(identity.standard, None);
}
/// Probing a stream holding one section of each kind yields every known table variant.
#[test]
fn probe_psip_tables_returns_known_table_variants() {
    // Wraps one section in a PUSI packet, prefixed by a zero pointer byte.
    let packetize = |pid: u16, continuity_counter: u8, section: Vec<u8>| {
        let mut payload = vec![0u8; 1];
        payload.extend(section);
        build_payload_packet(pid, continuity_counter, true, &payload)
    };

    let mgt = build_mgt_section(&[
        (0x0000, PID_ATSC_PSIP, 1, 1024),
        (0x0100, 0x1D00, 1, 420),
        (0x0200, 0x1E00, 1, 128),
        (0x0301, PID_ATSC_PSIP, 0, 256),
    ]);
    let mut stream = Vec::new();
    stream.extend_from_slice(&packetize(PID_ATSC_PSIP, 0, mgt));
    stream.extend_from_slice(&packetize(PID_ATSC_PSIP, 1, build_tvct_section()));
    stream.extend_from_slice(&packetize(PID_ATSC_PSIP, 2, build_rrt_section()));
    stream.extend_from_slice(&packetize(
        PID_ATSC_PSIP,
        3,
        build_stt_section(1_236_854_919, 18),
    ));
    // EIT and ETT travel on the PIDs announced for them in the MGT above.
    stream.extend_from_slice(&packetize(0x1D00, 4, build_eit_section()));
    stream.extend_from_slice(&packetize(0x1E00, 5, build_ett_section()));

    let tables = probe_psip_tables(std::io::Cursor::new(stream), 8).unwrap();

    let contains = |is_variant: fn(&PsipTable) -> bool| tables.iter().any(is_variant);
    assert!(contains(|table| matches!(table, PsipTable::MasterGuide(_))));
    assert!(contains(|table| matches!(table, PsipTable::VirtualChannel(_))));
    assert!(contains(|table| matches!(table, PsipTable::RatingRegion(_))));
    assert!(contains(|table| matches!(table, PsipTable::EventInformation(_))));
    assert!(contains(|table| matches!(table, PsipTable::ExtendedText(_))));
    assert!(contains(|table| matches!(table, PsipTable::SystemTime(_))));
}
/// Both time tables map their epoch reference points to the expected UNIX seconds.
#[test]
fn parse_time_sections_for_dvb_and_atsc_epoch() {
    // DVB TDT at UNIX epoch: date bytes 0x9E8B are MJD 40587, time-of-day bytes are zero.
    let dvb_section = Section {
        pid: PID_DVB_TDT_TOT,
        table_id: TABLE_ID_DVB_TDT,
        data: vec![TABLE_ID_DVB_TDT, 0x00, 0x00, 0x9E, 0x8B, 0x00, 0x00, 0x00],
    };
    let dvb_utc = parse_time_section(&dvb_section).unwrap();
    assert_eq!(dvb_utc.unix_seconds, 0);

    // ATSC STT at the GPS epoch: zero GPS seconds and zero GPS-UTC offset.
    let atsc_section = Section {
        pid: PID_ATSC_PSIP,
        table_id: TABLE_ID_ATSC_STT,
        data: build_stt_section(0, 0),
    };
    let atsc_utc = parse_time_section(&atsc_section).unwrap();
    assert_eq!(atsc_utc.unix_seconds, GPS_EPOCH_TO_UNIX);
}
/// Checks a real ATSC capture against known table contents.
///
/// Ignored by default. The capture path comes from `EC_REAL_ATSC_SAMPLE`, falling back to a
/// fixed path under `/tmp`; if the file cannot be opened the test logs and returns early.
#[test]
#[ignore]
fn real_atsc_sample_matches_known_psip_shape() {
    let path = match std::env::var("EC_REAL_ATSC_SAMPLE") {
        Ok(value) if !value.trim().is_empty() => value,
        _ => "/tmp/tsduck-test/input/test-040.ts".to_string(),
    };
    let Ok(file) = std::fs::File::open(&path) else {
        eprintln!("real ATSC sample not found: {path}");
        return;
    };
    let tables = probe_psip_tables(file, 150_000).unwrap();

    // Master guide table: entry count and the presence of three table types.
    let mgt = tables
        .iter()
        .find_map(|table| {
            if let PsipTable::MasterGuide(inner) = table {
                Some(inner)
            } else {
                None
            }
        })
        .expect("missing MGT");
    assert_eq!(mgt.tables_defined.len(), 11);
    for wanted_type in [0x0100, 0x0200, 0x0301] {
        assert!(mgt
            .tables_defined
            .iter()
            .any(|entry| entry.table_type == wanted_type));
    }

    // System time table.
    let stt = tables
        .iter()
        .find_map(|table| {
            if let PsipTable::SystemTime(inner) = table {
                Some(inner)
            } else {
                None
            }
        })
        .expect("missing STT");
    assert_eq!(stt.gps_utc_offset, 18);
    assert_eq!(stt.unix_seconds, 1_552_819_701);

    // Virtual channel table.
    let vct = tables
        .iter()
        .find_map(|table| {
            if let PsipTable::VirtualChannel(inner) = table {
                Some(inner)
            } else {
                None
            }
        })
        .expect("missing VCT");
    assert_eq!(vct.transport_stream_id, 0x1FE1);
    assert_eq!(vct.channels.len(), 4);
    let first_channel = &vct.channels[0];
    assert_eq!(first_channel.short_name.trim(), "KULX");
    assert_eq!(first_channel.major_channel_number, 10);
    assert_eq!(first_channel.minor_channel_number, 1);

    // Event information and extended text tables.
    let eit_count = tables
        .iter()
        .filter(|table| matches!(table, PsipTable::EventInformation(_)))
        .count();
    let ett_count = tables
        .iter()
        .filter(|table| matches!(table, PsipTable::ExtendedText(_)))
        .count();
    assert!(eit_count >= 4, "expected EIT tables, found {eit_count}");
    assert!(ett_count >= 1, "expected ETT tables, found {ett_count}");

    // Identity probe over a fresh handle to the same file.
    let reopened = std::fs::File::open(&path).unwrap();
    let identity = probe_transport_stream_identity(reopened, 150_000)
        .unwrap()
        .unwrap();
    assert_eq!(identity.standard.as_deref(), Some("atsc"));
    assert_eq!(identity.transport_stream_id, 0x1FE1);
    assert_eq!(identity.program_number, None);
}
/// Checks the RRT found in a real capture against known reference values.
///
/// Ignored by default. The capture path comes from `EC_REAL_RRT_SAMPLE`, falling back to a
/// fixed path under `/tmp`; if the file cannot be opened the test logs and returns early.
#[test]
#[ignore]
fn real_rrt_sample_matches_known_reference() {
    let path = match std::env::var("EC_REAL_RRT_SAMPLE") {
        Ok(value) if !value.trim().is_empty() => value,
        _ => "/tmp/tsduck-test/input/test-052.ts".to_string(),
    };
    let Ok(file) = std::fs::File::open(&path) else {
        eprintln!("real RRT sample not found: {path}");
        return;
    };
    let tables = probe_psip_tables(file, 10_000).unwrap();

    let rrt = tables
        .iter()
        .find_map(|table| {
            if let PsipTable::RatingRegion(inner) = table {
                Some(inner)
            } else {
                None
            }
        })
        .expect("missing RRT");

    assert_eq!(rrt.rating_region, 1);
    let region_name = rrt.rating_region_name.strings[0].decoded.as_deref();
    assert_eq!(region_name, Some("U.S. (50 states + possessions)"));

    assert_eq!(rrt.dimensions.len(), 8);
    let first_dimension = &rrt.dimensions[0];
    let dimension_name = first_dimension.dimension_name.strings[0]
        .decoded
        .as_deref();
    assert_eq!(dimension_name, Some("Entire Audience"));
    let third_value = first_dimension.values[2].rating_value.strings[0]
        .decoded
        .as_deref();
    assert_eq!(third_value, Some("TV-G"));
}
/// Two PCR packets 27_000_000 ticks apart produce updates for chunk 0 and then chunk 1.
#[test]
fn time_sync_engine_emits_chunk_boundaries_from_pcr() {
    // NOTE(review): 1000 is presumably the chunk duration in milliseconds — confirm against
    // `TimeSyncEngine::new`.
    let mut engine = TimeSyncEngine::new(1000);
    let mut assembler = SectionAssembler::default();

    let first = parse_packet(build_ts_packet_with_adaptation_pcr(0x0100, 0, 0)).unwrap();
    let second =
        parse_packet(build_ts_packet_with_adaptation_pcr(0x0100, 1, 27_000_000)).unwrap();

    let first_updates = engine.ingest_packet(&first, &mut assembler);
    assert!(first_updates
        .iter()
        .any(|update| update.chunk_index == Some(0)));

    let second_updates = engine.ingest_packet(&second, &mut assembler);
    assert!(second_updates
        .iter()
        .any(|update| update.chunk_index == Some(1)));
}
}