turboladen/pants

View on GitHub
lib/pants/readers/udp_reader.rb

Summary

Maintainability
A
0 mins
Test Coverage
require 'eventmachine'
require_relative 'base_reader'
require_relative '../network_helpers'


class Pants
  module Readers

    # This is the EventMachine connection that reads on the source IP and UDP
    # port.  It places all read data onto the data channel.  Allows for unicast or
    # multicast addresses; it'll detect which to use from the IP you pass in.
    class UDPReaderConnection < EventMachine::Connection
      include LogSwitch::Mixin
      include Pants::NetworkHelpers

      # @param [EventMachine::Channel] write_to_channel The data channel to write
      #   read data to.
      #
      # @param [EventMachine::Callback] starter_callback The callback that
      #   should get called when the connection has been fully initialized.
      def initialize(write_to_channel, starter_callback)
        @write_to_channel = write_to_channel
        @starter_callback = starter_callback
        port, ip = Socket.unpack_sockaddr_in(get_sockname)

        if Addrinfo.ip(ip).ipv4_multicast? || Addrinfo.ip(ip).ipv6_multicast?
          log "Got a multicast address: #{ip}:#{port}"
          setup_multicast_socket(ip)
        else
          log "Got a unicast address: #{ip}:#{port}"
        end
      end

      def post_init
        @starter_callback.call
      end

      # Reads the data and writes it to the data channel.
      #
      # @param [String] data The socket data to write to the channel.
      def receive_data(data)
        @write_to_channel << data
      end
    end


    # This is the interface for UDPReaderConnections.  It controls what happens
    # when the you want to start it up and stop it.
    class UDPReader < BaseReader
      include LogSwitch::Mixin

      # @param [String] host The IP address to read on.
      #
      # @param [Fixnum] port The UDP port to read on.
      #
      # @param [EventMachine::Callback] core_stopper_callback The Callback that will get
      #   called when #stopper is called.  Since there is no clear end to when
      #   to stop reading this I/O, #stopper is never called internally; it must
      #   be called externally.
      def initialize(host, port, core_stopper_callback)
        @read_object = "udp://#{host}:#{port}"
        @host = host
        @port = port

        super(core_stopper_callback)
      end

      # Starts reading on the UDP IP and port and pushing packets to the channel.
      def start
        callback = EM.Callback do
          log "Adding a #{self.class} at #{@host}:#{@port}..."
          EM.open_datagram_socket(@host, @port, UDPReaderConnection,
            @write_to_channel, starter)
        end

        super(callback)
      end
    end
  end
end