concept:
netty:
server: #服务配置
message:
retry:
times: 0 #客户端重试次数,默认不重试
period: 0 #客户端重试间隔,单位ms,默认0ms
load-balance: #负载均衡(转发)配置
subscriber-master: redisson_topic #主订阅器,默认无
subscriber-slave: none #从订阅器,默认无
message:
retry:
times: 0 #转发重试次数,默认不重试
period: 0 #转发重试间隔,单位ms,默认0ms
heartbeat: #心跳配置
enabled: true #是否启用心跳,默认true
period: 60000 #心跳间隔,单位ms,默认1分钟
timeout: 210000 #超时时间,单位ms,默认3.5分钟,3次心跳间隔
executor:
thread-pool-size: 1 #线程池大小,默认1
@Slf4j
@Configuration
public class NettyWebSocketServer {
public static final int WEB_SOCKET_PORT = 8090;
/**
* 创建线程池执行器
*/
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
/**
* 工作线程池
*/
private final EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors() * 2);
@Resource
private NettyLoadBalanceConcept concept;
/**
* 启动 ws server
*/
@PostConstruct
public void start() throws InterruptedException {
// 需要开启一个新的线程来执行netty server 服务器
new Thread(() -> {
try {
run();
} catch (InterruptedException e) {
log.error("启动 ws server 失败! reason=[{}]", e.getMessage());
}
}).start();
}
/**
* 销毁
*/
@PreDestroy
public void destroy() {
Future<?> bossGroupShutdownFuture = bossGroup.shutdownGracefully();
Future<?> workerGroupShutdownFuture = workerGroup.shutdownGracefully();
bossGroupShutdownFuture.syncUninterruptibly();
workerGroupShutdownFuture.syncUninterruptibly();
log.info("关闭 ws server 成功!");
}
public void run() throws InterruptedException {
// 服务器启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 30秒客户端没有向服务器发送心跳则关闭连接
pipeline.addLast(new IdleStateHandler(30, 0, 0));
// 因为使用http协议,所以需要使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
// 以块方式写,添加 chunkedWriter 处理器
pipeline.addLast(new ChunkedWriteHandler());
/**
* 说明:
* 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
* 2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
*/
pipeline.addLast(new HttpObjectAggregator(8192));
// 保存用户ip
pipeline.addLast(new HttpHeadersHandler());
/**
* 说明:
* 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
* 2. 可以看到 WebSocketFrame 下面有6个子类
* 3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri
* 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接;
* 是通过一个状态码 101 来切换的
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
// 将连接交由 NettyLoadBalanceHandler 管理
pipeline.addLast(new NettyLoadBalanceHandler(concept));
// 自定义handler ,处理业务逻辑
pipeline.addLast(new NettyWebSocketServerHandler());
}
});
// 启动服务器,监听端口,阻塞直到启动成功
// ChannelFuture future = serverBootstrap.bind(WEB_SOCKET_PORT).sync();
ChannelFuture future = serverBootstrap.bind(Integer.parseInt(Objects.requireNonNull(SpringUtil.getApplicationContext().getEnvironment().getProperty("netty.port")))).sync();
log.info("Server started and listen on:{}", future.channel().localAddress());
future.channel().closeFuture().sync();
}
}