use anyhow::{anyhow, Result}; use bytes::Bytes; use ec_direct::{decode_direct_link, encode_direct_link, DirectCodeV1}; use just_webrtc::types::{ DataChannelOptions, PeerConfiguration, PeerConnectionState, SessionDescription, }; use just_webrtc::{DataChannelExt, PeerConnectionBuilder, PeerConnectionExt}; async fn wait_connected(pc: &impl PeerConnectionExt) -> Result<()> { tokio::time::timeout(std::time::Duration::from_secs(20), async { loop { match pc.state_change().await { PeerConnectionState::Connected => break Ok(()), PeerConnectionState::Failed => break Err(anyhow!("peer connection failed")), PeerConnectionState::Closed => break Err(anyhow!("peer connection closed")), _ => {} } } }) .await .map_err(|_| anyhow!("timed out waiting for peer connection"))? } // Ignored by default: WebRTC can be timing-sensitive on some hosts. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore] async fn e2e_direct_connect_loopback_sends_bytes() -> Result<()> { // Avoid depending on external STUN servers in tests: use host candidates only. let cfg = PeerConfiguration { ice_servers: vec![], ..Default::default() }; let offerer = PeerConnectionBuilder::new() .set_config(cfg.clone()) .with_channel_options(vec![( "simple_channel_".to_string(), DataChannelOptions::default(), )]) .map_err(|e| anyhow!("{e:#}"))? .build() .await .map_err(|e| anyhow!("{e:#}"))?; let offer_desc: SessionDescription = offerer .get_local_description() .await .ok_or_else(|| anyhow!("missing offer local description"))?; let offer_candidates = offerer .collect_ice_candidates() .await .map_err(|e| anyhow!("{e:#}"))?; let offer_link = encode_direct_link(&DirectCodeV1 { v: 1, desc: offer_desc, candidates: offer_candidates, label: Some("every.channel0".to_string()), })?; let offer_code = decode_direct_link(&offer_link)?; let answerer = PeerConnectionBuilder::new() .set_config(cfg.clone()) .with_remote_offer(Some(offer_code.desc.clone())) .map_err(|e| anyhow!("{e:#}"))? .build() .await .map_err(|e| anyhow!("{e:#}"))?; answerer .add_ice_candidates(offer_code.candidates.clone()) .await .map_err(|e| anyhow!("{e:#}"))?; let answer_desc = answerer .get_local_description() .await .ok_or_else(|| anyhow!("missing answer local description"))?; let answer_candidates = answerer .collect_ice_candidates() .await .map_err(|e| anyhow!("{e:#}"))?; let answer_link = encode_direct_link(&DirectCodeV1 { v: 1, desc: answer_desc, candidates: answer_candidates, label: Some("every.channel0".to_string()), })?; let answer_code = decode_direct_link(&answer_link)?; offerer .set_remote_description(answer_code.desc.clone()) .await .map_err(|e| anyhow!("{e:#}"))?; offerer .add_ice_candidates(answer_code.candidates.clone()) .await .map_err(|e| anyhow!("{e:#}"))?; // Wait for both peers to report a full connection before waiting for the data channel. wait_connected(&offerer).await?; wait_connected(&answerer).await?; let offerer_ch = offerer .receive_channel() .await .map_err(|e| anyhow!("{e:#}"))?; let answerer_ch = answerer .receive_channel() .await .map_err(|e| anyhow!("{e:#}"))?; offerer_ch.wait_ready().await; answerer_ch.wait_ready().await; let payload = Bytes::from_static(b"hello"); offerer_ch .send(&payload) .await .map_err(|e| anyhow!("{e:#}"))?; let got = tokio::time::timeout(std::time::Duration::from_secs(10), answerer_ch.receive()) .await .map_err(|_| anyhow!("timed out waiting for receive"))? .map_err(|e| anyhow!("{e:#}"))?; assert_eq!(&got[..], b"hello"); // Confirm the reverse direction works too (this also guards against one-way readiness bugs). answerer_ch .send(&Bytes::from_static(b"world")) .await .map_err(|e| anyhow!("{e:#}"))?; let got = tokio::time::timeout(std::time::Duration::from_secs(10), offerer_ch.receive()) .await .map_err(|_| anyhow!("timed out waiting for receive"))? .map_err(|e| anyhow!("{e:#}"))?; assert_eq!(&got[..], b"world"); Ok(()) }