Kuangcp/JavaBase

View on GitHub
network/src/main/java/com/github/kuangcp/nio/NIOServer.java

Summary

Maintainability
A
3 hrs
Test Coverage
package com.github.kuangcp.nio;

import com.github.kuangcp.io.ResourceTool;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;

/**
 * Created by Myth on 2017/4/3 0003
 * NIO Server端,节省线程
 * 该程序启动就建立了一个可监听连接请求的ServerSocketChannel,并将该Channel注册到指定的Selector
 *
 * 从JDK1.4开始,Java提供了NIO API来提供开发
 * 之前的都是一个线程处理一个客户端,线程资源消耗大,因为之前那些处理方式在程序输入输出时会线程阻塞,NIO就不会
 * NIO 采用多路复用和轮询的机制, 将阻塞都让一个线程来处理,然后提取出准备好的IO来操作, 提高性能, 请求和线程比为 n:1
 * 原先的BIO是 1:1
 */
@Slf4j
class NIOServer {

  static final int PORT = 30000;

  //定义实现编码,解码的字符集
  static Charset charset = StandardCharsets.UTF_8;

  private volatile boolean stop = false;

  public static void main(String[] s) throws Exception {
    new NIOServer().start();
  }

  // selector 模型 轮询
  private void start() throws Exception {
    Selector selector = Selector.open();

    //通过OPEN方法来打开一个未绑定的ServerSocketChannel 实例
    ServerSocketChannel server = ServerSocketChannel.open();

    //将该ServerSocketChannel绑定到指定 ip 端口
    server.bind(new InetSocketAddress(PORT));

    //设置是NIO 非阻塞模式
    server.configureBlocking(false);

    //将sever注册到指定Selector对象上
    server.register(selector, SelectionKey.OP_ACCEPT);

    while (!stop) {
      // 因为只有在选择了至少一个通道后,才会返回该选择器的唤醒方法,或者当前线程中断,以先到者为准。否则一直阻塞
      selector.select();

      //依次处理selector上的每个已经准备好的管道
      Set<SelectionKey> selectedKeys = selector.selectedKeys();
      for (SelectionKey sk : selectedKeys) {
        //从selector上的已选择key集 中删除正在处理的连接请求
        selector.selectedKeys().remove(sk);

        // 连接 accept
        if (sk.isAcceptable()) {
          register(selector, server, sk);
        }
        // 连接建立
        else if (sk.isConnectable()) {
          log.info("close");
        }
        // 读就绪
        else if (sk.isReadable()) {
          readContent(selector, sk);
        }
        // 写就绪
        else if (sk.isWritable()) {
          log.info("writeable");
        }
      }
    }
  }

  private void register(Selector selector, ServerSocketChannel server, SelectionKey sk)
      throws IOException {
    //调用accept方法,产生服务器端的SocketChannel
    SocketChannel sc = server.accept();
    sc.configureBlocking(false);//NIO模式
    sc.register(selector, SelectionKey.OP_READ);//注册到selector上
    sk.interestOps(SelectionKey.OP_ACCEPT);//将sk对应的Channel设置成准备接受其他请求
  }

  // 解析数据
  private void readContent(Selector selector, SelectionKey sk) throws IOException {
    SocketChannel sc = (SocketChannel) sk.channel();
    ByteBuffer buff = ByteBuffer.allocate(1024);

    StringBuilder content = new StringBuilder();
    try {
      int s;
      while ((s = sc.read(buff)) > 0) {
        buff.flip();
        content.append(charset.decode(buff));
      }
      buff.clear();
      if (s == -1) {
        System.out.println("close");
        sc.close();
      }

      if (content.length() == 0) {
        return;
      }

      log.info("receive msg: {}", content.toString());
//      TimeUnit.MILLISECONDS.sleep(700);

      if ("stop".equalsIgnoreCase(content.toString())) {
        stop();
        log.info("stop the sever");
        return;
      }

      sk.interestOps(SelectionKey.OP_READ);//设置成准备下次读取
    } catch (Exception e) {
      //从Selector中删除指定的SelectionKey
      sk.cancel();
      ResourceTool.close(sk.channel());
    }

    if (content.length() <= 0) {
      return;
    }

    // 遍历selector里注册的所有SelectionKey
    // 也就是广播消息出去
    for (SelectionKey key : selector.keys()) {
      Channel targetChannel = key.channel();//获取Channel
      //如果改Channel是SocketChannel是SocketChannel对象
      if (targetChannel instanceof SocketChannel) {
        //将读到的内容写入到该Channel中去
        SocketChannel dest = (SocketChannel) targetChannel;
        dest.write(charset.encode(content.toString()));
      }
    }
  }

  private void stop() {
    this.stop = true;
  }
}