every.channel/crates/ec-iroh/src/lib.rs

414 lines
13 KiB
Rust

//! iroh transport scaffolding for every.channel.
use anyhow::{Context, Result};
use bytes::Bytes;
use ec_core::{StreamCatalogEntry, StreamControlAnnouncement};
use futures_lite::StreamExt;
use iroh::address_lookup::{
DhtAddressLookup, DiscoveryEvent, DnsAddressLookup, MdnsAddressLookup, PkarrPublisher, UserData,
};
use iroh::endpoint::RelayMode;
use iroh::{
address_lookup::memory::MemoryLookup, protocol::Router, Endpoint, EndpointAddr, PublicKey,
SecretKey,
};
use iroh_gossip::{
api::{Event, GossipReceiver, GossipSender},
net::{Gossip, GOSSIP_ALPN},
proto::TopicId,
};
use std::collections::BTreeMap;
use std::env;
use std::time::{Duration, Instant};
pub const ALPN_MOQ: &[u8] = b"every.channel/moq/0";
pub const DEFAULT_CATALOG_TOPIC: &str = "every.channel/catalog/v1";
pub const DEFAULT_CONTROL_TOPIC: &str = "every.channel/control/v1";
pub const MDNS_USER_DATA: &str = "every.channel";
#[derive(Debug, Clone)]
pub struct TokenBucket {
capacity: u64,
tokens: f64,
refill_per_sec: f64,
last_refill: Instant,
}
impl TokenBucket {
pub fn new(capacity: u64, refill_per_sec: u64) -> Self {
let capacity = capacity.max(1);
let refill_per_sec = refill_per_sec.max(1) as f64;
Self {
capacity,
tokens: capacity as f64,
refill_per_sec,
last_refill: Instant::now(),
}
}
pub fn allow(&mut self, amount: u64) -> bool {
self.refill();
let amount = amount as f64;
if amount <= self.tokens {
self.tokens -= amount;
true
} else {
false
}
}
fn refill(&mut self) {
let now = Instant::now();
let elapsed = now.duration_since(self.last_refill).as_secs_f64();
if elapsed <= 0.0 {
return;
}
self.tokens = (self.tokens + elapsed * self.refill_per_sec).min(self.capacity as f64);
self.last_refill = now;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn token_bucket_allows_and_refills() {
let mut bucket = TokenBucket::new(10, 10);
assert!(bucket.allow(7));
assert!(bucket.allow(3));
assert!(!bucket.allow(1));
// Force a refill without sleeping.
bucket.last_refill = Instant::now() - Duration::from_secs(1);
assert!(bucket.allow(1));
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct DiscoveryConfig {
pub dht: bool,
pub mdns: bool,
pub dns: bool,
}
impl DiscoveryConfig {
pub fn from_env() -> Result<Self> {
match env::var("EVERY_CHANNEL_IROH_DISCOVERY") {
Ok(value) => Self::from_list(&value),
Err(env::VarError::NotPresent) => Ok(Self::default()),
Err(err) => Err(err.into()),
}
}
pub fn from_list(value: &str) -> Result<Self> {
let mut config = DiscoveryConfig::default();
for raw in value.split(|c: char| c == ',' || c == ';' || c.is_whitespace()) {
let token = raw.trim().to_ascii_lowercase();
if token.is_empty() {
continue;
}
match token.as_str() {
"dht" => config.dht = true,
"mdns" => config.mdns = true,
"dns" => config.dns = true,
"all" => {
config.dht = true;
config.mdns = true;
config.dns = true;
}
"none" | "off" => {
config = DiscoveryConfig::default();
}
_ => {
return Err(anyhow::anyhow!("unknown discovery mode: {token}"));
}
}
}
Ok(config)
}
}
pub async fn build_endpoint(
secret: Option<SecretKey>,
discovery: DiscoveryConfig,
) -> Result<Endpoint> {
let relay_mode = relay_mode_from_env().unwrap_or(RelayMode::Default);
let mut builder = Endpoint::empty_builder(relay_mode);
if let Some(secret) = secret {
builder = builder.secret_key(secret);
}
if discovery.dns {
builder = builder
.address_lookup(PkarrPublisher::n0_dns())
.address_lookup(DnsAddressLookup::n0_dns());
}
if discovery.dht {
builder = builder.address_lookup(DhtAddressLookup::builder());
}
if discovery.mdns {
builder = builder.address_lookup(MdnsAddressLookup::builder());
}
let endpoint = builder.bind().await?;
endpoint.set_alpns(vec![ALPN_MOQ.to_vec()]);
Ok(endpoint)
}
fn relay_mode_from_env() -> Result<RelayMode> {
let value = match env::var("EVERY_CHANNEL_IROH_RELAY") {
Ok(value) => value,
Err(env::VarError::NotPresent) => return Ok(RelayMode::Default),
Err(err) => return Err(err.into()),
};
match value.trim().to_ascii_lowercase().as_str() {
"" | "default" => Ok(RelayMode::Default),
"disabled" | "off" => Ok(RelayMode::Disabled),
other => Err(anyhow::anyhow!("unknown relay mode: {other}")),
}
}
pub async fn start_endpoint() -> Result<Endpoint> {
let discovery = DiscoveryConfig::from_env()?;
build_endpoint(None, discovery).await
}
pub fn catalog_topic() -> TopicId {
let hash = blake3::hash(DEFAULT_CATALOG_TOPIC.as_bytes());
TopicId::from_bytes(*hash.as_bytes())
}
pub fn control_topic() -> TopicId {
let hash = blake3::hash(DEFAULT_CONTROL_TOPIC.as_bytes());
TopicId::from_bytes(*hash.as_bytes())
}
pub fn parse_endpoint_addr(value: &str) -> Result<EndpointAddr> {
let value = value.trim();
if value.starts_with('{') {
let addr =
serde_json::from_str::<EndpointAddr>(value).context("invalid EndpointAddr json")?;
return Ok(addr);
}
let id = value.parse::<PublicKey>().context("invalid endpoint id")?;
Ok(EndpointAddr::new(id))
}
#[derive(Debug, Clone)]
pub struct MdnsDiscovery {
mdns: MdnsAddressLookup,
endpoint_id: PublicKey,
user_data: Option<UserData>,
}
impl MdnsDiscovery {
pub async fn start(
endpoint: &Endpoint,
user_data: Option<&str>,
advertise: bool,
) -> Result<Self> {
let mdns = MdnsAddressLookup::builder()
.advertise(advertise)
.build(endpoint.id())
.context("mdns address lookup failed")?;
endpoint.address_lookup().add(mdns.clone());
let user_data = if let Some(value) = user_data {
let data = UserData::try_from(value.to_string()).context("invalid mdns user data")?;
endpoint.set_user_data_for_address_lookup(Some(data.clone()));
Some(data)
} else {
None
};
Ok(Self {
mdns,
endpoint_id: endpoint.id(),
user_data,
})
}
pub async fn discover_peers(&self, timeout: Duration) -> Result<Vec<EndpointAddr>> {
let mut stream = self.mdns.subscribe().await;
let deadline = Instant::now() + timeout;
let mut peers: BTreeMap<PublicKey, EndpointAddr> = BTreeMap::new();
loop {
let now = Instant::now();
if now >= deadline {
break;
}
let remaining = deadline - now;
match tokio::time::timeout(remaining, stream.next()).await {
Ok(Some(DiscoveryEvent::Discovered { endpoint_info, .. })) => {
if endpoint_info.endpoint_id == self.endpoint_id {
continue;
}
if let Some(expected) = self.user_data.as_ref() {
if endpoint_info.data.user_data() != Some(expected) {
continue;
}
}
let addr = EndpointAddr::from(endpoint_info);
peers.insert(addr.id, addr);
}
Ok(Some(DiscoveryEvent::Expired { .. })) => {}
Ok(None) => break,
Err(_) => break,
}
}
Ok(peers.into_values().collect())
}
}
#[derive(Debug)]
pub struct CatalogGossip {
sender: GossipSender,
receiver: GossipReceiver,
_router: Router,
_gossip: Gossip,
_memory_lookup: MemoryLookup,
}
impl CatalogGossip {
pub async fn join(endpoint: Endpoint, peers: &[String]) -> Result<Self> {
let memory_lookup = MemoryLookup::new();
endpoint.address_lookup().add(memory_lookup.clone());
let gossip = Gossip::builder().spawn(endpoint.clone());
let router = Router::builder(endpoint.clone())
.accept(GOSSIP_ALPN, gossip.clone())
.spawn();
let peer_addrs = peers
.iter()
.map(|peer| parse_endpoint_addr(peer))
.collect::<Result<Vec<_>, _>>()
.context("failed to parse gossip peer addr")?;
for peer in &peer_addrs {
memory_lookup.add_endpoint_info(peer.clone());
}
let peer_ids = peer_addrs
.iter()
.map(|addr| addr.id)
.collect::<Vec<PublicKey>>();
// Allow local-only startup with no explicit bootstrap peers. In that mode we still
// subscribe to the topic immediately and can add peers later.
let topic = if peer_ids.is_empty() {
gossip.subscribe(catalog_topic(), Vec::new()).await?
} else {
gossip.subscribe_and_join(catalog_topic(), peer_ids).await?
};
let (sender, receiver) = topic.split();
Ok(Self {
sender,
receiver,
_router: router,
_gossip: gossip,
_memory_lookup: memory_lookup,
})
}
pub async fn announce(&mut self, entry: StreamCatalogEntry) -> Result<()> {
let bytes = serde_json::to_vec(&entry)?;
self.sender.broadcast(Bytes::from(bytes)).await?;
Ok(())
}
pub async fn next_entry(&mut self) -> Result<Option<StreamCatalogEntry>> {
while let Some(event) = self.receiver.try_next().await? {
if let Event::Received(msg) = event {
if let Ok(entry) = serde_json::from_slice::<StreamCatalogEntry>(&msg.content) {
return Ok(Some(entry));
}
}
}
Ok(None)
}
/// Add peers after the gossip topic has already been joined. This enables
/// "nearby" discovery to continuously contribute new peers over time.
pub fn add_peers(&self, peers: Vec<EndpointAddr>) {
for peer in peers {
self._memory_lookup.add_endpoint_info(peer);
}
}
}
#[derive(Debug)]
pub struct ControlGossip {
sender: GossipSender,
receiver: GossipReceiver,
_router: Router,
_gossip: Gossip,
_memory_lookup: MemoryLookup,
}
impl ControlGossip {
pub async fn join(endpoint: Endpoint, peers: &[String]) -> Result<Self> {
let memory_lookup = MemoryLookup::new();
endpoint.address_lookup().add(memory_lookup.clone());
let gossip = Gossip::builder().spawn(endpoint.clone());
let router = Router::builder(endpoint.clone())
.accept(GOSSIP_ALPN, gossip.clone())
.spawn();
let peer_addrs = peers
.iter()
.map(|peer| parse_endpoint_addr(peer))
.collect::<Result<Vec<_>, _>>()
.context("failed to parse control peer addr")?;
for peer in &peer_addrs {
memory_lookup.add_endpoint_info(peer.clone());
}
let peer_ids = peer_addrs
.iter()
.map(|addr| addr.id)
.collect::<Vec<PublicKey>>();
// Allow local-only startup with no explicit bootstrap peers. In that mode we still
// subscribe to the topic immediately and can add peers later.
let topic = if peer_ids.is_empty() {
gossip.subscribe(control_topic(), Vec::new()).await?
} else {
gossip.subscribe_and_join(control_topic(), peer_ids).await?
};
let (sender, receiver) = topic.split();
Ok(Self {
sender,
receiver,
_router: router,
_gossip: gossip,
_memory_lookup: memory_lookup,
})
}
pub async fn announce(&mut self, announcement: StreamControlAnnouncement) -> Result<()> {
let bytes = serde_json::to_vec(&announcement)?;
self.sender.broadcast(Bytes::from(bytes)).await?;
Ok(())
}
pub async fn next_announcement(&mut self) -> Result<Option<StreamControlAnnouncement>> {
while let Some(event) = self.receiver.try_next().await? {
if let Event::Received(msg) = event {
if let Ok(announcement) =
serde_json::from_slice::<StreamControlAnnouncement>(&msg.content)
{
return Ok(Some(announcement));
}
}
}
Ok(None)
}
pub fn add_peers(&self, peers: Vec<EndpointAddr>) {
for peer in peers {
self._memory_lookup.add_endpoint_info(peer);
}
}
}