netty/src/main/java/netty/websocket/client/WebSocketClientHandler.java
package netty.websocket.client;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
 * Inbound handler for a WebSocket client connection.
 *
 * <p>When the channel becomes active it starts the opening handshake through the supplied
 * {@link WebSocketClientHandshaker}. The first inbound message is treated as the handshake
 * response; once the handshake finishes, a periodic task on the supplied scheduler writes a
 * text frame with payload {@code "ping"} to the channel. The periodic task is cancelled when
 * the channel becomes inactive.
 *
 * <p>Callers can wait for the handshake outcome via {@link #handshakeFuture()}.
 *
 * @author Kuangcp
 * 2024-03-29 14:16
 */
@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

    /** Delay, in seconds, between handshake completion and the first "ping" text frame. */
    private static final long PING_INITIAL_DELAY_SECONDS = 5;

    /** Interval, in seconds, between consecutive "ping" text frames. */
    private static final long PING_PERIOD_SECONDS = 2;

    private final WebSocketClientHandshaker shaker;
    private final ScheduledExecutorService pool;

    /** Completed when the handshake succeeds or fails; created in {@link #handlerAdded}. */
    private ChannelPromise handshakeFuture;

    /** Handle of the periodic ping task; {@code null} until the handshake has completed. */
    private ScheduledFuture<?> pingTask;

    /**
     * @param shaker handshaker used to start and finish the WebSocket opening handshake
     * @param pool   scheduler on which the periodic "ping" task is run
     */
    public WebSocketClientHandler(WebSocketClientHandshaker shaker, ScheduledExecutorService pool) {
        this.shaker = shaker;
        this.pool = pool;
    }

    /**
     * Returns the future that tracks the handshake outcome.
     *
     * @return the handshake promise, or {@code null} if this handler has not yet been added
     *         to a pipeline
     */
    public ChannelFuture handshakeFuture() {
        return handshakeFuture;
    }

    /** Creates the handshake promise as soon as the handler joins a pipeline. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        handshakeFuture = ctx.newPromise();
    }

    /** Starts the opening handshake once the underlying connection is established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        shaker.handshake(ctx.channel());
    }

    /**
     * Stops the periodic ping and, if the handshake never finished, fails the handshake
     * promise so that callers waiting on {@link #handshakeFuture()} are released.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("WebSocket Client disconnected!");
        stopPing();
        // No-op when the promise is already done (handshake succeeded or failed earlier).
        handshakeFuture.tryFailure(
                new IllegalStateException("Channel closed before the WebSocket handshake completed"));
    }

    /**
     * Dispatches an inbound message: the handshake response while the handshake is pending,
     * WebSocket frames afterwards.
     *
     * @throws IllegalStateException if a {@link FullHttpResponse} arrives after the handshake
     *                               has completed
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel ch = ctx.channel();
        if (!shaker.isHandshakeComplete()) {
            completeHandshake(ch, msg);
            return;
        }
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse response = (FullHttpResponse) msg;
            throw new IllegalStateException(
                    "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        }
        handleFrame(ch, (WebSocketFrame) msg);
    }

    /** Logs the error, fails the handshake promise if it is still pending, and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("", cause);
        // tryFailure is a single atomic step, unlike an isDone() check followed by setFailure().
        handshakeFuture.tryFailure(cause);
        ctx.close();
    }

    /** Finishes the handshake with the received response and starts the periodic ping on success. */
    private void completeHandshake(Channel ch, Object msg) {
        try {
            shaker.finishHandshake(ch, (FullHttpResponse) msg);
            log.info("WebSocket Client connected!");
            handshakeFuture.setSuccess();
            startPing(ch);
        } catch (WebSocketHandshakeException e) {
            log.warn("WebSocket Client failed to connect", e);
            handshakeFuture.setFailure(e);
        }
    }

    /** Schedules the periodic "ping" text frame and keeps the handle so it can be cancelled. */
    private void startPing(Channel ch) {
        pingTask = pool.scheduleAtFixedRate(() -> {
            // Skip the write if the channel went down between two runs.
            if (ch.isActive()) {
                ch.writeAndFlush(new TextWebSocketFrame("ping"));
            }
        }, PING_INITIAL_DELAY_SECONDS, PING_PERIOD_SECONDS, TimeUnit.SECONDS);
    }

    /** Cancels the periodic ping task, if one was started. */
    private void stopPing() {
        if (pingTask != null) {
            pingTask.cancel(false);
            pingTask = null;
        }
    }

    /** Reacts to a WebSocket frame; only a close frame triggers an action (closing the channel). */
    private void handleFrame(Channel ch, WebSocketFrame frame) {
        if (frame instanceof CloseWebSocketFrame) {
            log.warn("WebSocket Client received closing");
            ch.close();
        }
        // Text and pong frames are intentionally ignored.
    }
}