ip-v8/rust-ipv8

View on GitHub
rust_ipv8/src/networking/mod.rs

Summary

Maintainability
Test Coverage
//! This module handles all the network IO: it allows senders to send packets
//! and notifies receivers of received messages.

use crate::serialization::Packet;
use std::error::Error;
use mio::net::UdpSocket;
use std::thread;
use std::thread::JoinHandle;
use mio::{Poll, Token, Events, Ready, PollOpt};
use crate::configuration::Config;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::time::Duration;
use crate::networking::address::Address;
use rayon::scope_fifo;

pub mod address;

// Error types of the networking module, declared through the crate's `create_error!` macro.
// NOTE(review): the macro is defined outside this file; presumably it generates an error
// type with the given name whose message is the given string — confirm against its definition.
create_error!(SocketCreationError, "The socket creation failed");
create_error!(ListenError, "An error occured during the listening");

/// Any struct implementing this trait can become a receiver of incoming network packets.
/// Under normal operation, only the IPV8 struct should be a receiver of these and it should distribute them
/// through its CommunityRegistry to communities.
///
/// A [NetworkReceiver](crate::networking::NetworkReceiver) only accepts receivers which are also
/// `Send + Sync`, as it invokes them from its own listening thread through a parallel iterator.
pub trait Receiver {
    /// The callback which the [NetworkReceiver](crate::networking::NetworkReceiver) will call upon receiving a packet.
    ///
    /// `packet` is this receiver's own copy of the received datagram and `address` is the
    /// address the datagram was received from.
    fn on_receive(&self, packet: Packet, address: Address);
}

/// A NetworkSender is a wrapper around a UDP socket which is used for sending.
///
/// Every [Community](crate::community::Community) gets one of these; there it is called the 'endpoint'.
/// A community can use it to send messages directed at other communities
/// somewhere on the network.
pub struct NetworkSender {
    /// The actual socket which is used for sending. It is bound to the sending address
    /// passed to [new](NetworkSender::new).
    socket: UdpSocket,
}

impl NetworkSender {
    /// Builds a [NetworkSender] whose UDP socket is bound to `sending_address`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the socket when binding fails.
    pub fn new(sending_address: &Address) -> Result<Self, Box<dyn Error>> {
        let bound_socket = UdpSocket::bind(&sending_address.0)?;
        debug!("Starting, sending_address: {:?}", sending_address);

        let sender = Self {
            socket: bound_socket,
        };
        Ok(sender)
    }

    /// Transmits the raw bytes of a [Packet](crate::serialization::Packet) to `address`.
    ///
    /// On success the value reported by the socket's `send_to` call is returned.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the socket when sending fails.
    pub fn send(&self, address: &Address, packet: Packet) -> Result<usize, Box<dyn Error>> {
        let destination = &address.0;
        self.socket
            .send_to(packet.raw(), destination)
            .map_err(Into::into)
    }
}

/// A [NetworkReceiver] is another (see [NetworkSender]) wrapper for a socket.
///
/// However, this one also contains a list of [Receivers](Receiver) which get notified
/// whenever a new message is received. A [NetworkReceiver] can be started with the [start](NetworkReceiver::start) method,
/// which starts a new thread listening for any incoming message on the [Address](Address) specified at construction.
pub struct NetworkReceiver {
    /// A list of structs implementing the [Receiver] trait which will get notified upon receiving a packet.
    /// They must be `Send + Sync` because they are moved to, and called from, the listening thread.
    receivers: Vec<Box<dyn Receiver + Send + Sync>>,
    /// The actual UDP socket used for receiving packets
    socket: UdpSocket,
}

impl NetworkReceiver {
    /// Creates a new [NetworkReceiver]. This creates a receiver socket and builds a new threadpool on which
    /// all messages are distributed.
    pub fn new(receiving_address: &Address) -> Result<Self, Box<dyn Error>> {
        let socket = UdpSocket::bind(&receiving_address.0)?;

        debug!("Starting, receiving_address: {:?}", receiving_address);

        let nm = Self {
            receivers: vec![],
            socket,
        };
        Ok(nm)
    }

    /// Starts the [NetworkReceiver]. This spawns a new thread in which it will listen for incoming messages.
    ///
    /// This method consumes self as it is transferred to the new thread. After this no [Receivers](Receiver) can be added to it.
    ///
    /// Returns a [`JoinHandle<()>`](std::thread::JoinHandle) which can be used to block until the [NetworkReceiver] stops listening.
    /// Under normal operation this never happens so this marks the end of the program.
    pub fn start(self, configuration: &Config) -> JoinHandle<()> {
        let queuesize = configuration.queuesize.to_owned();
        let buffersize = configuration.buffersize.to_owned();
        let pollinterval = configuration.pollinterval.to_owned();

        // Start the I/O thread
        thread::spawn(move || {
            self.listen(queuesize, buffersize, pollinterval)
                .or_else(|i| {
                    error!("the listening thread crashed");
                    Err(i)
                })
                .unwrap(); // This only panics the I/O thread not the whole application
        })
    }

    #[doc(hidden)]
    fn listen(
        self,
        queuesize: usize,
        buffersize: usize,
        pollinterval: Option<Duration>,
    ) -> Result<(), Box<dyn Error>> {
        debug!("IPV8 is starting it's listener!");

        let poll = Poll::new()?;

        let mut events = Events::with_capacity(queuesize);

        let mut tmp_buf = vec![0; buffersize];
        let buffer = tmp_buf.as_mut_slice();

        const RECEIVER: Token = Token(0);
        poll.register(&self.socket, RECEIVER, Ready::readable(), PollOpt::edge())?;

        loop {
            poll.poll(&mut events, pollinterval)?;
            trace!("checking poll");
            for _ in events.iter() {
                trace!("handling event");

                let (recv_size, address) = self.socket.recv_from(buffer)?;

                let packet = Packet(buffer[..recv_size].to_vec()).clone();

                // We want a FIFO threadpool
                scope_fifo(|s| {
                    s.spawn_fifo(|_| {
                        // iterate over the receivers asynchronously and non blocking
                        self.receivers.par_iter().for_each(|r| {
                            r.on_receive(packet.clone(), Address(address));
                        });
                    })
                });
            }
        }
    }

    /// Adds a receiver to the networkmanager. Can only happen before the networkmanager is started.
    pub fn add_receiver(&mut self, receiver: Box<dyn Receiver + Send + Sync>) {
        self.receivers.push(receiver)
    }
}

/// Helpers for tests which need distinct localhost addresses to bind to.
///
/// Taken and adapted from the [mio testing suite](https://github.com/tokio-rs/mio/blob/master/test/mod.rs#L113)
#[cfg(test)]
pub mod test_helper {
    use crate::networking::address::Address;
    use std::net::{Ipv4Addr, SocketAddr};
    use std::sync::atomic::{AtomicU16, Ordering};

    /// The IPv4 loopback address used by all helpers in this module.
    pub const LOCALHOST_IP: Ipv4Addr = Ipv4Addr::new(127, 0, 0, 1);

    // Port bookkeeping: every request hands out the next port, starting at FIRST_PORT.
    // TODO: Reuse ports to not spam the system
    // If multiple test suites are ran at the same time, this _will_ fail as it will try to bind to the same FIRST_PORT
    const FIRST_PORT: u16 = 18080;
    static NEXT_PORT: AtomicU16 = AtomicU16::new(FIRST_PORT);

    /// Returns the current port and advances the counter for the next caller.
    fn next_port() -> u16 {
        NEXT_PORT.fetch_add(1, Ordering::SeqCst)
    }

    /// A localhost socket address on a port not handed out before in this test run.
    pub fn localhost_socket() -> SocketAddr {
        let port = next_port();
        SocketAddr::from((LOCALHOST_IP, port))
    }

    /// Same as [localhost_socket], wrapped in an [Address].
    pub fn localhost() -> Address {
        let socket_address = localhost_socket();
        Address(socket_address)
    }
}

// Integration-style tests which send real UDP datagrams over localhost and count
// how many of them reach a registered `Receiver`.
#[cfg(test)]
mod tests {
    use lazy_static::lazy_static;

    use crate::IPv8;
    use crate::configuration::Config;
    use mio::net::UdpSocket;

    use std::net::{SocketAddr, IpAddr};
    use crate::serialization::Packet;
    use std::sync::Once;
    use std::time::Duration;
    use crate::networking::{Receiver, NetworkSender, NetworkReceiver};
    use std::thread;
    use std::sync::atomic::{AtomicUsize, Ordering, AtomicU16};
    use crate::networking::address::Address;
    use crate::networking::test_helper::{localhost, localhost_socket, LOCALHOST_IP};

    // Guards the one-time logger initialisation shared by all tests in this module
    static BEFORE: Once = Once::new();

    // A poor man's @Before: initialises the logger exactly once, no matter how many tests call it
    fn before() {
        BEFORE.call_once(|| {
            simple_logger::init().unwrap();
        })
    }

    /// Sends two packets from a plain UDP socket to a started IPv8 instance and checks
    /// that the registered receiver was called once for each of them.
    #[test]
    fn test_networkmanager() {
        before();

        // start ipv8
        let mut config = Config::default();

        // every call to localhost() yields an address on a fresh port
        config.receiving_address = localhost();
        config.sending_address = localhost();
        config.buffersize = 2048;

        let mut ipv8 = IPv8::new(config).unwrap();

        // an independent socket which plays the role of a remote peer
        let sender_socket = UdpSocket::bind(&localhost_socket()).unwrap();

        // A static, so the receiver implementation below (a unit struct) can read the expected port
        static SEND_PORT: AtomicU16 = AtomicU16::new(0);

        let recv_port: u16 = ipv8.network_receiver.socket.local_addr().unwrap().port();
        let send_port: u16 = sender_socket.local_addr().unwrap().port();

        SEND_PORT.store(send_port, Ordering::SeqCst);

        // The packet which is sent and which the receiver expects to get back unchanged
        lazy_static! {
            static ref OGPACKET: Packet = Packet::new(create_test_header!()).unwrap();
        }

        // Number of times the receiver was called
        static PACKET_COUNTER: AtomicUsize = AtomicUsize::new(0);

        //create receiver
        struct AReceiver;
        impl Receiver for AReceiver {
            fn on_receive(&self, packet: Packet, address: Address) {
                // the payload and the source port must match what the test sent
                assert_eq!(OGPACKET.raw(), packet.raw());
                assert_eq!(SEND_PORT.load(Ordering::SeqCst), (address.0).port());

                // Count each packet
                PACKET_COUNTER.fetch_add(1, Ordering::SeqCst);
            }
        }

        ipv8.network_receiver.add_receiver(Box::new(AReceiver));

        // NOTE(review): presumably this starts the network receiver's listening thread — confirm in IPv8::start
        ipv8.start();

        // now try to send ipv8 a message
        sender_socket
            .connect(SocketAddr::new(IpAddr::V4(LOCALHOST_IP), recv_port))
            .unwrap();

        let a = sender_socket.send(OGPACKET.raw()).unwrap();
        assert_eq!(a, OGPACKET.raw().len());

        // give the listening side some time to handle the packet
        thread::sleep(Duration::from_millis(20));

        let b = sender_socket.send(OGPACKET.raw()).unwrap();
        assert_eq!(b, OGPACKET.raw().len());

        // give the listening side some time to handle the packet
        thread::sleep(Duration::from_millis(20));

        // a poor man's `verify(AReceiver, times(2)).on_receive();`
        assert_eq!(2, PACKET_COUNTER.load(std::sync::atomic::Ordering::SeqCst));
    }

    /// Sends one packet through a [NetworkSender] to a started [NetworkReceiver] and checks
    /// that the registered receiver was called exactly once.
    #[test]
    fn test_sending_networkmanager() {
        before();

        // start ipv8
        let mut config = Config::default();

        // every call to localhost() yields an address on a fresh port
        config.receiving_address = localhost();
        config.sending_address = localhost();
        config.buffersize = 2048;

        // The sender and the receiver are constructed directly here instead of through an IPv8 instance
        let ns = NetworkSender::new(&config.sending_address).unwrap();
        let mut nr = NetworkReceiver::new(&config.receiving_address).unwrap();

        // A static, so the receiver implementation below (a unit struct) can read the expected port
        static SEND_PORT: AtomicU16 = AtomicU16::new(0);

        let recv_port: u16 = nr.socket.local_addr().unwrap().port();
        let send_port: u16 = ns.socket.local_addr().unwrap().port();

        SEND_PORT.store(send_port, Ordering::SeqCst);

        // The packet which is sent and which the receiver expects to get back unchanged
        lazy_static! {
            static ref OGPACKET: Packet = Packet::new(create_test_header!()).unwrap();
        }

        // Number of times the receiver was called
        static PACKET_COUNTER: AtomicUsize = AtomicUsize::new(0);

        //create receiver
        struct AReceiver;
        impl Receiver for AReceiver {
            fn on_receive(&self, packet: Packet, address: Address) {
                // the payload and the source port must match what the test sent
                assert_eq!(OGPACKET.raw(), packet.raw());
                assert_eq!(SEND_PORT.load(Ordering::SeqCst), (address.0).port());

                // Count each packet
                PACKET_COUNTER.fetch_add(1, Ordering::SeqCst);
            }
        }

        nr.add_receiver(Box::new(AReceiver));
        // the returned JoinHandle is dropped: the listening thread keeps running detached
        nr.start(&config);

        let addr = Address(SocketAddr::new(IpAddr::V4(LOCALHOST_IP), recv_port));

        ns.send(&addr, Packet(OGPACKET.raw().to_vec())).unwrap();

        // give the listening side some time to handle the packet
        thread::sleep(Duration::from_millis(20));

        // a poor man's `verify(AReceiver, times(1)).on_receive();`
        assert_eq!(1, PACKET_COUNTER.load(std::sync::atomic::Ordering::SeqCst));
    }
}