Netty搭建WebSocket服务端 - 青衫仗剑 - 博客园

目录

返回目录

Netty服务端

1.引入依赖

复制代码; "复制代码")

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version> <!-- 我这里用的1.5.9 -->
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.blaze</groupId>
<artifactId>netty-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>netty-demo</name>
<description>Demo project for Spring Boot</description>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.4</version>
    </dependency>

    <!--fastjson-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.50</version>
    </dependency>

    <!--netty依赖-->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.43.Final</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <mainClass>com.blaze.nettydemo.server.WebSocketServer</mainClass>
            </configuration>
        </plugin>
    </plugins>
    <finalName>netty-server</finalName>
</build>

</project>

复制代码; "复制代码")

2.服务端

WebSocketServer

复制代码; "复制代码")

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; /** * create by zy 2019/11/28 14:50
* TODO */
public final class WebSocketServer { //static final boolean SSL = System.getProperty("ssl") != null; //static final int PORT = Integer.parseInt(System.getProperty("port", SSL ? "8443" : "8888"));

static final boolean SSL = false; static final int PORT = 8888; public static void main(String\[\] args) throws Exception { // Configure SSL. 配置 SSL
    final SslContext sslCtx; if (SSL) {
        SelfSignedCertificate ssc = new SelfSignedCertificate();
        sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
    } else {
        sslCtx = null;
    } /** \* interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService
     \* 配置服务端的 NIO 线程池,用于网络事件处理,实质上他们就是 Reactor 线程组
     \* bossGroup 用于服务端接受客户端连接,workerGroup 用于进行 SocketChannel 网络读写 */ EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // ServerBootstrap 是 Netty 用于启动 NIO 服务端的辅助启动类,用于降低开发难度
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new WebSocketServerInitializer(sslCtx)); //服务器启动辅助类配置完成后,调用 bind 方法绑定监听端口,调用 sync 方法同步等待绑定操作完成,服务开启
        Channel ch = b.bind(PORT).sync().channel();
        System.out.println("服务已开启,等待客户端连接......"); //下面会进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束

ch.closeFuture().sync();

    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally { //退出 释放资源

bossGroup.shutdownGracefully();

        workerGroup.shutdownGracefully();
    }
}

}

复制代码; "复制代码")

WebSocketServerInitializer

复制代码; "复制代码")

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.ssl.SslContext; /** * create by zy 2019/11/28 14:53
* TODO */
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { private static final String WEBSOCKET_PATH = "/"; private final SslContext sslCtx; public WebSocketServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx;

}

@Override public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc())); // 设置 https 相关

}

    pipeline.addLast(new HttpServerCodec()); // http 编码
    pipeline.addLast(new HttpObjectAggregator(65536)); // http 消息聚合器
    pipeline.addLast(new WebSocketServerCompressionHandler()); // 压缩 可以不设置
    pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true)); // 协议
    pipeline.addLast(new WebSocketFrameHandler()); // 处理WebSocketFrame

}
}

复制代码; "复制代码")

WebSocketFrameHandler

复制代码; "复制代码")

import com.alibaba.fastjson.JSON; import com.rising.netty.model.RequestModel; import com.rising.netty.model.ResultModel; import com.rising.netty.util.MQUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; /** * create by zy 2019/11/28 14:57
* TODO */
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

@Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { if (frame instanceof TextWebSocketFrame) {
        String request = ((TextWebSocketFrame) frame).text();
        System.out.println("接收消息:" + request);

        String msg = "接收成功"; //返回信息
        ctx.channel().writeAndFlush(new TextWebSocketFrame(msg));
    } else if (frame instanceof BinaryWebSocketFrame) { //二进制
        ByteBuf content = frame.content(); byte\[\] reg = new byte\[content.readableBytes()\];
        content.readBytes(reg);
        String request = new String(reg, "UTF-8");
        System.out.println("接收消息:" + request);

        String msg = "接收成功"; //返回信息
        ByteBuf respByteBuf = Unpooled.copiedBuffer(msg.getBytes());
        ctx.channel().writeAndFlush(new BinaryWebSocketFrame(respByteBuf));
    } else {
        String message = "unsupported frame type: " + frame.getClass().getName(); throw new UnsupportedOperationException(message);
    }
}

}

复制代码; "复制代码")

3.客户端

WebSocketClient

复制代码; "复制代码")

import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; /* * create by zy 2019/11/28 14:57
* TODO */
public final class WebSocketClient { static final String URL = System.getProperty("url", "ws://127.0.0.1:8888/"); public static void main(String[] args) throws Exception {

    URI uri = new URI(URL);
    String scheme = uri.getScheme() == null ? "ws" : uri.getScheme(); final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost(); final int port; if (uri.getPort() == -1) { if ("ws".equalsIgnoreCase(scheme)) {
            port = 80;
        } else if ("wss".equalsIgnoreCase(scheme)) {
            port = 443;
        } else {
            port = -1;
        }
    } else {
        port = uri.getPort();
    } if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
        System.err.println("Only WS(S) is supported."); return;
    } final boolean ssl = "wss".equalsIgnoreCase(scheme); final SslContext sslCtx; if (ssl) {
        sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
    } else {
        sslCtx = null;
    } //配置客户端 NIO 线程组/池
    EventLoopGroup group = new NioEventLoopGroup(); try { // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00. // If you change it to V00, ping is not supported and remember to change // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
        final WebSocketClientHandler handler = new WebSocketClientHandler(
                WebSocketClientHandshakerFactory.newHandshaker(
                        uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders())); /** \* Bootstrap 与 ServerBootstrap 都继承(extends)于 AbstractBootstrap
         \* 创建客户端辅助启动类,并对其配置,与服务器稍微不同,这里的 Channel 设置为 NioSocketChannel
         \* 然后为其添加 Handler,这里直接使用匿名内部类,实现 initChannel 方法
         \* 作用是当创建 NioSocketChannel 成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件 */ Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override protected void initChannel(SocketChannel ch) {
                        ChannelPipeline p = ch.pipeline(); if (sslCtx != null) {
                            p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
                        }
                        p.addLast( new HttpClientCodec(), new HttpObjectAggregator(8192),
                                WebSocketClientCompressionHandler.INSTANCE,
                                handler);
                    }
                }); //客户端与服务端建立连接
        Channel ch = b.connect(uri.getHost(), port).sync().channel();
        handler.handshakeFuture().sync(); /** \* 将输入信息传输到 server 端 */ BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true) {
            String msg = console.readLine(); if (msg == null) { break;
            } else if ("bye".equals(msg.toLowerCase())) { //输入bye 断开连接
                ch.writeAndFlush(new CloseWebSocketFrame());
                ch.closeFuture().sync(); break;
            } else if ("ping".equals(msg.toLowerCase())) {
                WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte\[\]{8, 1, 8, 1}));
                ch.writeAndFlush(frame);
            } else {
                WebSocketFrame frame = new TextWebSocketFrame(msg);
                ch.writeAndFlush(frame);
            }
        }
    } finally {
        group.shutdownGracefully();
    }
}

}

复制代码; "复制代码")

WebSocketClientHandler

复制代码; "复制代码")

import io.netty.channel.; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.websocketx.; import io.netty.util.CharsetUtil; /** * create by zy 2019/11/28 14:58
* TODO */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { private final WebSocketClientHandshaker handshaker; private ChannelPromise handshakeFuture; public WebSocketClientHandler(WebSocketClientHandshaker handshaker) { this.handshaker = handshaker;

} public ChannelFuture handshakeFuture() { return handshakeFuture;
}

@Override public void handlerAdded(ChannelHandlerContext ctx) {
    handshakeFuture = ctx.newPromise();
}

@Override public void channelActive(ChannelHandlerContext ctx) {
    handshaker.handshake(ctx.channel());
}

@Override public void channelInactive(ChannelHandlerContext ctx) {
    System.out.println("WebSocket Client disconnected!");
}

@Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel ch = ctx.channel(); if (!handshaker.isHandshakeComplete()) { try { //握手成功 建立连接

handshaker.finishHandshake(ch, (FullHttpResponse) msg);

            System.out.println("WebSocket Client connected!");
            handshakeFuture.setSuccess();
        } catch (WebSocketHandshakeException e) { //握手失败
            System.out.println("WebSocket Client failed to connect");
            handshakeFuture.setFailure(e);
        } return;
    } if (msg instanceof FullHttpResponse) {
        FullHttpResponse response = (FullHttpResponse) msg; throw new IllegalStateException( "Unexpected FullHttpResponse (getStatus=" + response.status() +
                        ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
    }

    WebSocketFrame frame = (WebSocketFrame) msg; if (frame instanceof TextWebSocketFrame) { //接收客户端返回消息
        TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
        System.out.println("WebSocket Client received message: " + textFrame.text());
    } else if (frame instanceof PongWebSocketFrame) {
        System.out.println("WebSocket Client received pong");
    } else if (frame instanceof CloseWebSocketFrame) {
        System.out.println("WebSocket Client received closing");
        ch.close();
    }
}

@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace(); if (!handshakeFuture.isDone()) {
        handshakeFuture.setFailure(cause);
    }
    ctx.close();
}

}

复制代码; "复制代码")

4.web客户端

复制代码; "复制代码")

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>

<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>Netty-Websocket</title>
<script type="text/javascript">
    // Browser test client for the Netty WebSocket server.
    var socket;
    if (!window.WebSocket) {
        // Fall back to the Mozilla-prefixed implementation when present.
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        // Port 8888 matches WebSocketServer.PORT in the server example.
        socket = new WebSocket("ws://127.0.0.1:8888/");
        // Append every message from the server to the response area.
        socket.onmessage = function(event) {
            var ta = document.getElementById('responseText');
            ta.value += event.data + "\r\n";
        };
        // Once connected, show a status line and send the greeting message.
        socket.onopen = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "Netty-WebSocket服务器。。。。。。连接  \r\n";
            login();
        };
        socket.onclose = function(event) {
            var ta = document.getElementById('responseText');
            ta.value = "Netty-WebSocket服务器。。。。。。关闭 \r\n";
        };
    } else {
        alert("您的浏览器不支持WebSocket协议!");
    }

    // Sends msg to the server if the connection is open; otherwise alerts the user.
    function send(msg) {
        if (!window.WebSocket) { return; }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(msg);
        } else {
            alert("WebSocket 连接没有建立成功!");
        }
    }

    // Sends the initial greeting; called from socket.onopen.
    function login() {
        if (!window.WebSocket) { return; }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send("建立连接成功!");
        } else {
            alert("WebSocket 连接没有建立成功!");
        }
    }

    // Closes the connection.
    function closeSocket() {
        if (!window.WebSocket) { return; }
        socket.close();
    }
</script>

</head>
<body>
<form onSubmit="return false;">

<label>TEXT</label><input type="text" name="blaze" value="" /> <br />
<br /> <input type="button" value="发送ws消息" onClick="send(this.form.blaze.value)" />
<hr color="black" />
<br /> <input type="button" value="断开连接" onClick="closeSocket()" />
<hr color="black" />
<h3>服务端返回的应答消息</h3>
<textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>

</form>
</body>
</html>

复制代码; "复制代码")


原网址: 访问
创建于: 2023-09-20 14:30:35
目录: default
标签: 无

请先登录后发表评论
  • 最新评论
  • 总共0条评论