Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,11 @@ impl Controller {
log::debug!("Operational mDNS discovered device: {:?}", info);

let port = info.port.unwrap_or(5540);
let addresses: Vec<String> = info.ips.iter().map(|ip| {
if ip.is_ipv6() { format!("[{}]:{}", ip, port) } else { format!("{}:{}", ip, port) }
}).collect();
let addresses: Vec<String> = info
.ips
.iter()
.map(|ip| crate::discover::addr_string(ip, port, info.scope_id))
.collect();
log::info!("Device discovered at {}", addresses.join(", "));
Ok(addresses)
}
Expand Down
13 changes: 3 additions & 10 deletions src/devman/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ impl DeviceManager {
matter_info.session_active_interval_ms,
matter_info.session_active_threshold_ms,
);
let scope_id = matter_info.scope_id;
let ips = matter_info.ips;
let port = matter_info.port.unwrap_or(5540);

Expand All @@ -298,11 +299,7 @@ impl DeviceManager {

let mut last_err = anyhow::anyhow!("no IPs to try");
for ip in &ips {
let address = if ip.is_ipv6() {
format!("[{}]:{}", ip, port)
} else {
format!("{}:{}", ip, port)
};
let address = crate::discover::addr_string(ip, port, scope_id);
match self.commission_at(&address, passcode, node_id, name, mrp_ms).await {
Ok(conn) => return Ok(conn),
Err(e) => {
Expand Down Expand Up @@ -398,11 +395,7 @@ impl DeviceManager {
let ip = matter_info.ips.first()
.context(format!("discovered {} but no IPs in response", instance_name))?;
let port = matter_info.port.unwrap_or(5540);
let address = if ip.is_ipv6() {
format!("[{}]:{}", ip, port)
} else {
format!("{}:{}", ip, port)
};
let address = crate::discover::addr_string(ip, port, matter_info.scope_id);

self.update_device_address(node_id, &address)?;
if let Err(e) = self.registry
Expand Down
33 changes: 33 additions & 0 deletions src/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub struct MatterDeviceInfo {
pub session_active_threshold_ms: Option<u32>,
/// Device type (DT TXT key) from the commissionable advertisement, decimal string.
pub device_type: Option<String>,
/// scope_id (interface index) for the device's link-local IPv6 addresses: the
/// interface on which its mDNS reply arrived. Needed to send to `fe80::...`.
pub scope_id: Option<u32>,
}

impl MatterDeviceInfo {
Expand Down Expand Up @@ -198,6 +201,7 @@ pub fn to_matter_info2(msg: &DnsMessage, svc: &str) -> Result<Vec<MatterDeviceIn
session_active_interval_ms: None,
session_active_threshold_ms: None,
device_type: None,
scope_id: None,
};
services.insert(service_name, mi);
}
Expand Down Expand Up @@ -299,6 +303,7 @@ pub fn to_matter_info(msg: &DnsMessage, svc: &str) -> Result<MatterDeviceInfo> {
session_active_interval_ms: mrp.1,
session_active_threshold_ms: mrp.2,
device_type,
scope_id: None,
})
}

Expand Down Expand Up @@ -534,6 +539,19 @@ pub async fn extract_matter_info(target: &str, mdns: &mdns2::MdnsService) -> Res
let pairing_hint = txt_info.get("PH").cloned();
let device_type = txt_info.get("DT").cloned();
let (sii, sai, sat) = parse_mrp_txt(&txt_info);
// Correct scope_id for link-local addresses: the interface on which mDNS
// received the device's reply (see MdnsService::scope_for).
let mut scope_id = None;
for ip in &ips {
if let IpAddr::V6(v6) = ip {
if (v6.segments()[0] & 0xffc0) == 0xfe80 {
scope_id = mdns.scope_for(v6).await;
if scope_id.is_some() {
break;
}
}
}
}
Ok(MatterDeviceInfo {
name,
instance: target.trim_end_matches('.').to_owned(),
Expand All @@ -550,8 +568,23 @@ pub async fn extract_matter_info(target: &str, mdns: &mdns2::MdnsService) -> Res
session_active_interval_ms: sai,
session_active_threshold_ms: sat,
device_type,
scope_id,
})
}

/// Build the address string for a UDP connection. For link-local IPv6 it appends
/// the zone `%<scope_id>` (interface index); without it the OS cannot send to
/// `fe80::...`.
pub fn addr_string(ip: &IpAddr, port: u16, scope_id: Option<u32>) -> String {
match ip {
IpAddr::V6(v6) if (v6.segments()[0] & 0xffc0) == 0xfe80 => match scope_id {
Some(idx) => format!("[{}%{}]:{}", v6, idx, port),
None => format!("[{}]:{}", v6, port),
},
IpAddr::V6(v6) => format!("[{}]:{}", v6, port),
IpAddr::V4(v4) => format!("{}:{}", v4, port),
}
}
#[cfg(test)]
mod tests {
use super::*;
Expand Down
20 changes: 19 additions & 1 deletion src/mdns2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ struct MdnsServiceInner {
services: Vec<ServiceRegistration>,
local_ips_v4: Vec<Ipv4Addr>,
local_ips_v6: Vec<Ipv6Addr>,
/// Device link-local IPv6 -> index of the interface its mDNS reply arrived
/// on. That index is the correct scope_id for sending to `fe80::...`.
link_local_scopes: std::collections::HashMap<Ipv6Addr, u32>,
}

const EVENT_CHANNEL_CAPACITY: usize = 256;
Expand All @@ -57,6 +60,7 @@ pub struct MdnsService {

async fn recv_loop(
socket: Arc<UdpSocket>,
interface: Option<u32>,
inner: Arc<Mutex<MdnsServiceInner>>,
send_tx: UnboundedSender<SendCommand>,
event_tx: broadcast::Sender<MdnsEvent>,
Expand Down Expand Up @@ -101,6 +105,12 @@ async fn recv_loop(
let mut new_ptr_records = Vec::new();
for rr in &all_records {
state.cache.ingest(rr);
// Remember the receive interface for link-local AAAA records
if let (Some(idx), mdns::RRData::AAAA(ip)) = (interface, &rr.data) {
if (ip.segments()[0] & 0xffc0) == 0xfe80 {
state.link_local_scopes.insert(*ip, idx);
}
}
if rr.typ == mdns::TYPE_PTR {
if let mdns::RRData::PTR(ref target) = rr.data {
new_ptr_records.push((rr.name.clone(), target.clone()));
Expand Down Expand Up @@ -212,6 +222,7 @@ impl MdnsService {
services: Vec::new(),
local_ips_v4: v4,
local_ips_v6: v6,
link_local_scopes: std::collections::HashMap::new(),
}));

// Create sockets
Expand All @@ -223,6 +234,7 @@ impl MdnsService {
Ok(s) => mcast_sockets.push(McastSocket {
sock: Arc::new(s),
multicast_addr: MDNS_ADDR_V4,
interface: None,
}),
Err(e) => log::warn!("mdns2: failed to wrap v4 socket: {}", e),
},
Expand All @@ -244,6 +256,7 @@ impl MdnsService {
Ok(s) => mcast_sockets.push(McastSocket {
sock: Arc::new(s),
multicast_addr: MDNS_ADDR_V6,
interface: Some(idx),
}),
Err(e) => {
log::debug!("mdns2: failed to wrap v6 socket idx={}: {}", idx, e)
Expand All @@ -264,12 +277,13 @@ impl MdnsService {
// Spawn recv loops (one per socket)
for ms in &mcast_sockets {
let sock = ms.sock.clone();
let interface = ms.interface;
let inner = inner.clone();
let send_tx = send_tx.clone();
let event_tx = event_tx.clone();
let cancel = cancel.child_token();
tokio::spawn(async move {
recv_loop(sock, inner, send_tx, event_tx, cancel).await;
recv_loop(sock, interface, inner, send_tx, event_tx, cancel).await;
});
}

Expand Down Expand Up @@ -302,6 +316,10 @@ impl MdnsService {
Ok(service)
}

pub async fn scope_for(&self, ip: &Ipv6Addr) -> Option<u32> {
self.inner.lock().await.link_local_scopes.get(ip).copied()
}

/// Subscribe to discovery events.
///
/// Returns an independent [`broadcast::Receiver`]; each subscriber receives every event.
Expand Down
3 changes: 3 additions & 0 deletions src/mdns2/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ pub(super) fn get_local_ips() -> (Vec<Ipv4Addr>, Vec<Ipv6Addr>) {
pub(super) struct McastSocket {
pub sock: Arc<UdpSocket>,
pub multicast_addr: &'static str,
/// Network interface index (IPv6 sockets); used as the scope_id when sending
/// to a device's link-local address. None for IPv4.
pub interface: Option<u32>,
}

pub(super) async fn send_loop(
Expand Down
57 changes: 51 additions & 6 deletions src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub struct Transport {
pub struct Connection {
transport: Arc<Transport>,
remote_address: String,
/// scope_id (interface zone) for a link-local IPv6 peer, from mDNS.
scope_id: Option<u32>,
receiver: Mutex<tokio::sync::mpsc::Receiver<Vec<u8>>>,
generation: u64,
mrp: std::sync::Mutex<crate::mrp::MrpParameters>,
Expand Down Expand Up @@ -142,7 +144,7 @@ impl Transport {
.upgrade()
.context("weakpointer to self is gone - just stop")?;
let cons = self_strong.connections.lock().await;
if let Some(c) = cons.get(&addr.to_string()) {
if let Some(c) = cons.get(&scopeless_key(addr)) {
_ = c.sender.send(buf).await;
}
}
Expand Down Expand Up @@ -206,14 +208,16 @@ impl Transport {

/// Create (or replace) a logical connection entry for the given remote address.
pub async fn create_connection(self: &Arc<Self>, remote: &str) -> Arc<dyn ConnectionTrait> {
let remote = normalize_remote_for_socket(&self.socket, remote);
let (remote, scope_id) = split_scope(remote);
let remote = normalize_remote_for_socket(&self.socket, &remote);
let mut clock = self.connections.lock().await;
let generation = self.next_generation.fetch_add(1, Ordering::Relaxed);
let (sender, receiver) = tokio::sync::mpsc::channel(32);
clock.insert(remote.to_owned(), ConnectionInfo { sender, generation });
Arc::new(Connection {
transport: self.clone(),
remote_address: remote,
scope_id,
receiver: Mutex::new(receiver),
generation,
mrp: std::sync::Mutex::new(Default::default()),
Expand All @@ -223,13 +227,54 @@ impl Transport {
}
}

/// True for IPv6 link-local (fe80::/10).
fn is_link_local_v6(ip: &std::net::Ipv6Addr) -> bool {
(ip.segments()[0] & 0xffc0) == 0xfe80
}

/// Split the zone out of `[fe80::...%<idx>]:port`: returns the zone-less address and the scope_id.
fn split_scope(remote: &str) -> (String, Option<u32>) {
if let Some(pct) = remote.find('%') {
let (head, tail) = remote.split_at(pct);
let after = &tail[1..];
let digits: String = after.chars().take_while(|c| c.is_ascii_digit()).collect();
let rest = &after[digits.len()..];
if let Ok(idx) = digits.parse::<u32>() {
return (format!("{head}{rest}"), Some(idx));
}
}
(remote.to_owned(), None)
}

/// Connection key without the zone.
fn scopeless_key(addr: SocketAddr) -> String {
match addr {
// SocketAddrV6 is Copy: `mut v6` copies, so the original addr is untouched.
SocketAddr::V6(mut v6) if v6.scope_id() != 0 => {
v6.set_scope_id(0);
v6.to_string()
}
_ => addr.to_string(),
}
}

impl Connection {
/// Send a datagram to the remote address.
pub async fn send(&self, data: &[u8]) -> Result<()> {
self.transport
.socket
.send_to(data, &self.remote_address)
.await?;
let socket = &self.transport.socket;

// For link-local IPv6 attach the scope_id (interface zone from mDNS).
if let (Some(scope), Ok(SocketAddr::V6(v6))) =
(self.scope_id, self.remote_address.parse::<SocketAddr>())
{
if is_link_local_v6(v6.ip()) {
let target = std::net::SocketAddrV6::new(*v6.ip(), v6.port(), v6.flowinfo(), scope);
socket.send_to(data, SocketAddr::V6(target)).await?;
return Ok(());
}
}

socket.send_to(data, &self.remote_address).await?;
Ok(())
}
/// Receive the next datagram for this connection (with timeout).
Expand Down