Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented unicast responses to direct unicast, unicast questions and legacy unicast queries. #190

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ if-addrs = { version = "0.10", features = ["link-local"] } # get local IP addres
log = { version = "0.4", optional = true } # logging
polling = "2.1" # select/poll sockets
socket2 = { version = "0.5.5", features = ["all"] } # socket APIs
socket-pktinfo = { version = "0.2.1" } # socket packet info extension

[dev-dependencies]
fastrand = "1.8"
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ This implementation is based on the following RFCs:

This is still beta software. We focus on the common use cases at hand. And we tested with some existing common tools (e.g. `Avahi` on Linux, `dns-sd` on MacOS, and `Bonjour` library on iOS) to verify the basic compatibility.

Currently this library has the following limitations:
- Only support multicast, no unicast send/recv.

## License

Licensed under either of
Expand Down
7 changes: 7 additions & 0 deletions src/dns_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub(crate) const MAX_MSG_ABSOLUTE: usize = 8972;
pub(crate) const FLAGS_QR_MASK: u16 = 0x8000; // mask for query/response bit
pub(crate) const FLAGS_QR_QUERY: u16 = 0x0000;
pub(crate) const FLAGS_QR_RESPONSE: u16 = 0x8000;
pub(crate) const FLAGS_UNICAST_RESPONSE_MASK: u16 = 0x8000; // mask for question unicast response bit
pixsperdavid marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) const FLAGS_AA: u16 = 0x0400; // mask for Authoritative answer bit

pub(crate) type DnsRecordBox = Box<dyn DnsRecordExt + Send>;
Expand Down Expand Up @@ -77,6 +78,12 @@ pub(crate) struct DnsQuestion {
pub(crate) entry: DnsEntry,
}

impl DnsQuestion {
pub fn is_unicast_response_requested(&self) -> bool {
self.entry.class & FLAGS_UNICAST_RESPONSE_MASK == FLAGS_UNICAST_RESPONSE_MASK
}
}

/// A DNS Resource Record - like a DNS entry, but has a TTL.
/// RFC: https://www.rfc-editor.org/rfc/rfc1035#section-3.2.1
/// https://www.rfc-editor.org/rfc/rfc1035#section-4.1.3
Expand Down
157 changes: 129 additions & 28 deletions src/service_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,15 @@ use crate::{
Receiver,
};
use flume::{bounded, Sender, TrySendError};
use if_addrs::Interface;
use if_addrs::{IfAddr, Interface};
use polling::Poller;
use socket2::{SockAddr, Socket};
use socket2::SockAddr;
use socket_pktinfo::PktInfoUdpSocket;
use std::cmp::min;
use std::{
cmp,
collections::{HashMap, HashSet},
fmt,
io::Read,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, UdpSocket},
str, thread,
time::Duration,
Expand All @@ -75,6 +76,8 @@ const LOOPBACK_V4: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

const RESOLVE_WAIT_IN_MILLIS: u64 = 500;

const LEGACY_UNICAST_RR_TTL: u32 = 10;
pixsperdavid marked this conversation as resolved.
Show resolved Hide resolved

/// Response status code for the service `unregister` call.
#[derive(Debug)]
pub enum UnregisterStatus {
Expand Down Expand Up @@ -105,6 +108,8 @@ enum Counter {
UnregisterResend,
Browse,
Respond,
RespondUnicast,
RespondLegacyUnicast,
CacheRefreshQuery,
}

Expand All @@ -117,6 +122,8 @@ impl fmt::Display for Counter {
Counter::UnregisterResend => write!(f, "unregister-resend"),
Counter::Browse => write!(f, "browse"),
Counter::Respond => write!(f, "respond"),
Counter::RespondUnicast => write!(f, "respond-unicast"),
Counter::RespondLegacyUnicast => write!(f, "respond-legacy-unicast"),
Counter::CacheRefreshQuery => write!(f, "cache-refresh"),
}
}
Expand Down Expand Up @@ -691,7 +698,7 @@ impl ServiceDaemon {
}

/// Creates a new UDP socket that uses `intf` to send and recv multicast.
fn new_socket_bind(intf: &Interface) -> Result<Socket> {
fn new_socket_bind(intf: &Interface) -> Result<PktInfoUdpSocket> {
// Use the same socket for receiving and sending multicast packets.
// Such socket has to bind to INADDR_ANY or IN6ADDR_ANY.
let intf_ip = &intf.ip();
Expand Down Expand Up @@ -738,31 +745,30 @@ fn new_socket_bind(intf: &Interface) -> Result<Socket> {

/// Creates a new UDP socket to bind to `port` with REUSEPORT option.
/// `non_block` indicates whether to set O_NONBLOCK for the socket.
fn new_socket(addr: SocketAddr, non_block: bool) -> Result<Socket> {
fn new_socket(addr: SocketAddr, non_block: bool) -> Result<PktInfoUdpSocket> {
let domain = match addr {
SocketAddr::V4(_) => socket2::Domain::IPV4,
SocketAddr::V6(_) => socket2::Domain::IPV6,
};

let fd = Socket::new(domain, socket2::Type::DGRAM, None)
.map_err(|e| e_fmt!("create socket failed: {}", e))?;
let sock = PktInfoUdpSocket::new(domain).map_err(|e| e_fmt!("create socket failed: {}", e))?;

fd.set_reuse_address(true)
sock.set_reuse_address(true)
.map_err(|e| e_fmt!("set ReuseAddr failed: {}", e))?;
#[cfg(unix)] // this is currently restricted to Unix's in socket2
fd.set_reuse_port(true)
sock.set_reuse_port(true)
.map_err(|e| e_fmt!("set ReusePort failed: {}", e))?;

if non_block {
fd.set_nonblocking(true)
sock.set_nonblocking(true)
.map_err(|e| e_fmt!("set O_NONBLOCK: {}", e))?;
}

fd.bind(&addr.into())
sock.bind(&addr.into())
.map_err(|e| e_fmt!("socket bind to {} failed: {}", &addr, e))?;

debug!("new socket bind to {}", &addr);
Ok(fd)
Ok(sock)
}

/// Specify a UNIX timestamp in millis to run `command` for the next time.
Expand All @@ -777,7 +783,7 @@ struct ReRun {
#[derive(Debug)]
struct IntfSock {
intf: Interface,
sock: Socket,
sock: PktInfoUdpSocket,
}

/// Specify kinds of interfaces. It is used to enable or to disable interfaces in the daemon.
Expand Down Expand Up @@ -882,7 +888,7 @@ struct Zeroconf {
/// Next poll id value
poll_id_count: usize,

/// Local registered services, keyed by service full names.
/// Local registered services,Keyed by service full names.
my_services: HashMap<String, ServiceInfo>,

cache: DnsCache,
Expand Down Expand Up @@ -1128,7 +1134,7 @@ impl Zeroconf {

fn add_new_interface(&mut self, intf: Interface) {
// Bind the new interface.
let new_ip = intf.ip();
let new_ip = intf.addr.ip();
let sock = match new_socket_bind(&intf) {
Ok(s) => s,
Err(e) => {
Expand Down Expand Up @@ -1389,6 +1395,7 @@ impl Zeroconf {
Some(if_sock) => if_sock,
None => return false,
};

let mut buf = vec![0u8; MAX_MSG_ABSOLUTE];

// Read the next mDNS UDP datagram.
Expand All @@ -1397,8 +1404,8 @@ impl Zeroconf {
// be truncated by the socket layer depending on the platform's libc.
// In any case, such large datagram will not be decoded properly and
// this function should return false but should not crash.
let sz = match intf_sock.sock.read(&mut buf) {
Ok(sz) => sz,
let (sz, pktinfo) = match intf_sock.sock.recv(&mut buf) {
Ok(r) => r,
Err(e) => {
if e.kind() != std::io::ErrorKind::WouldBlock {
error!("listening socket read failed: {}", e);
Expand Down Expand Up @@ -1430,12 +1437,48 @@ impl Zeroconf {

buf.truncate(sz); // reduce potential processing errors

let is_unicast = !pktinfo.addr_dst.is_multicast();

// Ignore unicast packets outside the local link
if is_unicast {
let should_respond = match (pktinfo.addr_src.ip(), &intf_sock.intf.addr) {
pixsperdavid marked this conversation as resolved.
Show resolved Hide resolved
(IpAddr::V4(src_ip), IfAddr::V4(intf)) => {
if src_ip.is_loopback() {
true
} else {
let src_ip: u32 = src_ip.into();
let intf_ip: u32 = intf.ip.into();
let intf_netmask: u32 = intf.netmask.into();
// Is src_ip in local subnet?
(intf_ip & intf_netmask) == (src_ip & intf_netmask)
}
}
(IpAddr::V6(src_ip), &IfAddr::V6(_)) => {
if src_ip.is_loopback() {
true
} else {
// Does src_ip have on-link prefix?
src_ip.segments()[0] & 0xffc0 == 0xfe80
}
}
// Interface and source message IP versions do not match
_ => false,
};
if !should_respond {
return true;
}
};

match DnsIncoming::new(buf) {
Ok(msg) => {
if msg.is_query() {
self.handle_query(msg, ip);
self.handle_query(msg, ip, pktinfo.addr_src, is_unicast);
} else if msg.is_response() {
self.handle_response(msg);
if !is_unicast {
self.handle_response(msg);
} else {
error!("Invalid message: unrequested unicast response");
keepsimple1 marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
error!("Invalid message: not query and not response");
}
Expand Down Expand Up @@ -1724,12 +1767,24 @@ impl Zeroconf {
}
}

fn handle_query(&mut self, msg: DnsIncoming, ip: &IpAddr) {
fn handle_query(
&mut self,
msg: DnsIncoming,
ip: &IpAddr,
src_addr: SocketAddr,
is_unicast_query: bool,
) {
let intf_sock = match self.intf_socks.get(ip) {
Some(sock) => sock,
None => return,
};
let mut out = DnsOutgoing::new(FLAGS_QR_RESPONSE | FLAGS_AA);
let is_legacy_unicast = is_unicast_query && src_addr.port() != MDNS_PORT;
pixsperdavid marked this conversation as resolved.
Show resolved Hide resolved
let is_unicast_reply = is_unicast_query
&& msg
.questions
.iter()
.all(|q| q.is_unicast_response_requested());

// Special meta-query "_services._dns-sd._udp.<Domain>".
// See https://datatracker.ietf.org/doc/html/rfc6763#section-9
Expand All @@ -1755,7 +1810,11 @@ impl Zeroconf {
&question.entry.name,
TYPE_PTR,
CLASS_IN,
service.get_other_ttl(),
if is_legacy_unicast {
min(LEGACY_UNICAST_RR_TTL, service.get_host_ttl())
} else {
service.get_other_ttl()
},
service.get_type().to_string(),
)),
);
Expand Down Expand Up @@ -1792,7 +1851,11 @@ impl Zeroconf {
&question.entry.name,
t,
CLASS_IN | CLASS_UNIQUE,
service.get_host_ttl(),
if is_legacy_unicast {
min(LEGACY_UNICAST_RR_TTL, service.get_host_ttl())
} else {
service.get_host_ttl()
},
address,
)),
);
Expand All @@ -1813,7 +1876,11 @@ impl Zeroconf {
Box::new(DnsSrv::new(
&question.entry.name,
CLASS_IN | CLASS_UNIQUE,
service.get_host_ttl(),
if is_legacy_unicast {
min(LEGACY_UNICAST_RR_TTL, service.get_host_ttl())
} else {
service.get_host_ttl()
},
service.get_priority(),
service.get_weight(),
service.get_port(),
Expand All @@ -1829,7 +1896,11 @@ impl Zeroconf {
&question.entry.name,
TYPE_TXT,
CLASS_IN | CLASS_UNIQUE,
service.get_host_ttl(),
if is_legacy_unicast {
min(LEGACY_UNICAST_RR_TTL, service.get_host_ttl())
} else {
service.get_host_ttl()
},
service.generate_txt(),
)),
);
Expand All @@ -1853,7 +1924,11 @@ impl Zeroconf {
service.get_hostname(),
t,
CLASS_IN | CLASS_UNIQUE,
service.get_host_ttl(),
if is_legacy_unicast {
min(LEGACY_UNICAST_RR_TTL, service.get_host_ttl())
} else {
service.get_host_ttl()
},
pixsperdavid marked this conversation as resolved.
Show resolved Hide resolved
address,
)));
}
Expand All @@ -1863,9 +1938,18 @@ impl Zeroconf {

if !out.answers.is_empty() {
out.id = msg.id;
broadcast_dns_on_intf(&out, intf_sock);

self.increase_counter(Counter::Respond, 1);
if is_unicast_reply {
if is_legacy_unicast {
unicast_dns_on_intf(&out, src_addr.ip(), intf_sock);
self.increase_counter(Counter::RespondLegacyUnicast, 1);
} else {
unicast_dns_on_intf(&out, src_addr.ip(), intf_sock);
pixsperdavid marked this conversation as resolved.
Show resolved Hide resolved
self.increase_counter(Counter::RespondUnicast, 1);
}
} else {
broadcast_dns_on_intf(&out, intf_sock);
self.increase_counter(Counter::Respond, 1);
}
}
}

Expand Down Expand Up @@ -2282,6 +2366,23 @@ fn broadcast_on_intf<'a>(packet: &'a [u8], intf: &IntfSock) -> &'a [u8] {
packet
}

/// Send an outgoing unicast DNS query or response, and returns the packet bytes.
fn unicast_dns_on_intf(out: &DnsOutgoing, ip_addr: IpAddr, intf: &IntfSock) -> Vec<u8> {
let qtype = if out.is_query() { "query" } else { "response" };
debug!(
"Unicasting ({}) {}: {} questions {} answers {} authorities {} additional",
ip_addr,
qtype,
out.questions.len(),
out.answers.len(),
out.authorities.len(),
out.additionals.len()
);
let packet = out.to_packet_data();
send_packet(&packet[..], SocketAddr::new(ip_addr, MDNS_PORT), intf);
packet
}

/// Sends out `packet` to `addr` on the socket in `intf_sock`.
fn send_packet(packet: &[u8], addr: SocketAddr, intf_sock: &IntfSock) {
let sockaddr = SockAddr::from(addr);
Expand Down