eventmachine/eventmachine

View on GitHub
java/src/com/rubyeventmachine/EventableDatagramChannel.java

Summary

Maintainability
A
1 hr
Test Coverage
/**
 * $Id$
 * 
 * Author:: Francis Cianfrocca (gmail: blackhedd)
 * Homepage::  http://rubyeventmachine.com
 * Date:: 15 Jul 2007
 * 
 * See EventMachine and EventMachine::Connection for documentation and
 * usage examples.
 * 
 *
 *----------------------------------------------------------------------------
 *
 * Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
 * Gmail: blackhedd
 * 
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of either: 1) the GNU General Public License
 * as published by the Free Software Foundation; either version 2 of the
 * License, or (at your option) any later version; or 2) Ruby's License.
 * 
 * See the file COPYING for complete licensing information.
 *
 *---------------------------------------------------------------------------
 *
 * 
 */


package com.rubyeventmachine;

import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.DatagramChannel;
import java.util.LinkedList;
import java.io.*;
import java.net.*;

public class EventableDatagramChannel implements EventableChannel {
    
    class Packet {
        public ByteBuffer bb;
        public SocketAddress recipient;
        public Packet (ByteBuffer _bb, SocketAddress _recipient) {
            bb = _bb;
            recipient = _recipient;
        }
    }
    
    DatagramChannel channel;
    long binding;
    Selector selector;
    boolean bCloseScheduled;
    LinkedList<Packet> outboundQ;
    long outboundS;
    SocketAddress returnAddress;
    

    public EventableDatagramChannel (DatagramChannel dc, long _binding, Selector sel) throws ClosedChannelException {
        channel = dc;
        binding = _binding;
        selector = sel;
        bCloseScheduled = false;
        outboundQ = new LinkedList<Packet>();
        outboundS = 0;
        
        dc.register(selector, SelectionKey.OP_READ, this);
    }

    public void scheduleOutboundData (ByteBuffer bb) {
        try {
            if ((!bCloseScheduled) && (bb.remaining() > 0)) {
                outboundQ.addLast(new Packet(bb, returnAddress));
                outboundS += bb.remaining();
                channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, this);
            }
        } catch (ClosedChannelException e) {
            throw new RuntimeException ("no outbound data");            
        }
    }
    
    public void scheduleOutboundDatagram (ByteBuffer bb, String recipAddress, int recipPort) {
        try {
            if ((!bCloseScheduled) && (bb.remaining() > 0)) {
                outboundQ.addLast(new Packet (bb, new InetSocketAddress (recipAddress, recipPort)));
                outboundS += bb.remaining();
                channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, this);
            }
        } catch (ClosedChannelException e) {
            throw new RuntimeException ("no outbound data");            
        }
    }
    
    public boolean scheduleClose (boolean afterWriting) {
        System.out.println ("NOT SCHEDULING CLOSE ON DATAGRAM");
        return false;
    }
    
    public void startTls() {
        throw new RuntimeException ("TLS is unimplemented on this Channel");
    }
    
    public long getBinding() {
        return binding;
    }

    public void register() throws ClosedChannelException {
        // TODO
    }

    /**
     * Terminate with extreme prejudice. Don't assume there will be another pass through
     * the reactor core.
     */
    public void close() {
        try {
            channel.close();
        } catch (IOException e) {
        }
    }
    
    public void readInboundData (ByteBuffer dst) {
        returnAddress = null;
        try {
            // If there is no datagram available (we're nonblocking after all),
            // then channel.receive returns null.
            returnAddress = channel.receive(dst);
        } catch (IOException e) {
            // probably a no-op. The caller will see the empty (or even partial) buffer
            // and presumably do the right thing.
        }
    }
    
    public boolean writeOutboundData() {
        while (!outboundQ.isEmpty()) {
            Packet p = outboundQ.getFirst();
            int written = 0;
            try {
                // With a datagram socket, it's ok to send an empty buffer.
                written = channel.send(p.bb, p.recipient);
                outboundS -= written;
            }
            catch (IOException e) {
                return false;
            }

            /* Did we consume the whole outbound buffer? If yes, pop it off and
             * keep looping. If no, the outbound network buffers are full, so break
             * out of here. There's a flaw that affects outbound buffers that are intentionally
             * empty. We can tell whether they got sent or not. So we assume they were.
             * TODO: As implemented, this ALWAYS discards packets if they were at least
             * partially written. This matches the behavior of the C++ EM. My judgment
             * is that this is less surprising than fragmenting the data and sending multiple
             * packets would be. I could be wrong, so this is subject to change.
             */

            if ((written > 0) || (p.bb.remaining() == 0))
                outboundQ.removeFirst();
            else
                break;
        }

        if (outboundQ.isEmpty()) {
            try {
                channel.register(selector, SelectionKey.OP_READ, this);
            } catch (ClosedChannelException e) {}
        }
        
        // ALWAYS drain the outbound queue before triggering a connection close.
        // If anyone wants to close immediately, they're responsible for clearing
        // the outbound queue.
        return (bCloseScheduled && outboundQ.isEmpty()) ? false : true;
    }

    public void setCommInactivityTimeout (long seconds) {
        // TODO
        System.out.println ("DATAGRAM: SET COMM INACTIVITY UNIMPLEMENTED " + seconds);
    }

    public Object[] getPeerName () {
        if (returnAddress != null) {
            InetSocketAddress inetAddr = (InetSocketAddress) returnAddress;
            return new Object[]{ inetAddr.getPort(), inetAddr.getHostName() };
        } else {
            return null;
        }
    }

    public Object[] getSockName () {
        DatagramSocket socket = channel.socket();
        return new Object[]{ socket.getLocalPort(),
                             socket.getLocalAddress().getHostAddress() };
    }

    public boolean isWatchOnly() { return false; }
    public boolean isNotifyReadable() { return false; }
    public boolean isNotifyWritable() { return false; }
    public long getOutboundDataSize() { return outboundS; }
}