SpringBoot 集成 Netty
背景描述
- 如果需要在
SpringBoot
开发的app
中,提供Socket
服务,那么Netty
是不错的选择。
Netty与SpringBoot整合关注点
Netty
跟Springboot
生命周期保持一致,同生共死Netty
能用上ioc
中的Bean
Netty
能读取到全局的配置
Netty组件
Bootstrap、ServerBootstrap
- 帮助
Netty
使用者更加方便地组装和配置Netty
,也可以更方便地启动Netty
应用程序 Bootstrap
用于启动一个Netty TCP
客户端,或者UDP
的一端。ServerBootstrap
往往是用于启动一个Netty
服务端。
Channel
Channel
是Netty
网络操作抽象类,它除了包括基本的I/O
操作,如bind、connect、read、write
之外,还包括了Netty
框架相关的一些功能,如获取该Channel
的EventLoop
。- 其实就是我们平常网络编程中经常使用的
socket
套接字对象
EventLoop、EventLoopGroup
EventLoop
定义了Netty
的核心对象,用于处理IO
事件,多线程模型、并发- 一个
EventLoopGroup
包含一个或者多个EventLoop
- 一个
EventLoop
在它的生命周期内只和一个Thread
绑定 - 所有有
EventLoop
处理的I/O
事件都将在它专有的Thread
上被处理 - 一个
Channel
在它的生命周期内只注册于一个EventLoop
- 一个
EventLoop
可能会被分配给一个或者多个Channel
ChannelHandler
ChannelHandler
其实就是用于负责处理接收和发送数据的的业务逻辑,Netty
中可以注册多个handler
,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。ChannelHandler
主要用于对出站和入站数据进行处理,它有两个重要的子接口:ChannelInboundHandler
——处理入站数据ChannelOutboundHandler
——处理出站数据
ChannelPipeline
ChannelPipeline
是ChannelHandler
的容器,通过ChannelPipeline
可以将ChannelHandler
组织成一个逻辑链,该逻辑链可以用来拦截流经Channel
的入站和出站事件,当Channel
被创建时,它会被自动地分配到它的专属的ChannelPipeline
。
ByteBuf
ByteBuf
就是字节缓冲区,用于高效处理输入输出。
Pom依赖
- 引入
springboot starter web
和netty
<!-- SpringBoot 初始化依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.5.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.85.Final</version>
</dependency>
Yml 配置
# Springboot 端口
server:
port: 2345
netty:
websocket:
# Websocket服务端口
port: 1024
# 绑定的网卡
ip: 0.0.0.0
# 消息帧最大体积
max-frame-size: 10240
# URI路径
path: /channel
整合Netty步骤
服务端
- 使用
SpringBoot Runner
机制启动Netty
服务。
@Component
@Order
public class NettyStartListener implements ApplicationRunner {
@Resource
private SocketServer socketServer;
@Override
public void run(ApplicationArguments args) {
this.socketServer.start();
}
}
SocketServe.java
@Component
public class SocketServer {
private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);
/**
* 负责初始化 netty 服务器
*/
private ServerBootstrap serverBootstrap;
@Autowired
private SocketInitializer socketInitializer;
@Value("${netty.websocket.port}")
private int port;
/**
* 启动 netty 服务器
*/
public void start() {
this.init();
this.serverBootstrap.bind(this.port);
logger.info("Netty started on port: {} (TCP) with boss thread {}", this.port, 2);
}
/**
* 初始化 netty 配置
*/
private void init() {
// 创建两个线程组 bossGroup 为接收请求的线程组 一般1-2个就行
NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);
// 实际工作的线程组
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
this.serverBootstrap = new ServerBootstrap();
// 两个线程组加入进来 加入自己的初始化器
this.serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(this.socketInitializer);
}
}
- 编写
Netty
服务端监听消息处理器
public class SocketHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(SocketHandler.class);
public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 读取到客户端发来的消息
*
* @param ctx ChannelHandlerContext
* @param msg msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 由于我们配置的是 字节数组 编解码器,所以这里取到的用户发来的数据是 byte数组
byte[] data = (byte[]) msg;
log.info("收到消息: " + new String(data));
// 给其他人转发消息
for (Channel client : clients) {
if (!client.equals(ctx.channel())) {
client.writeAndFlush(data);
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
log.info("新的客户端链接:" + ctx.channel().id().asShortText());
clients.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
clients.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.channel().close();
clients.remove(ctx.channel());
}
}
- 设置出站解码器和入站编码器
@Component
public class SocketInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加对byte数组的编解码,netty提供了很多编解码器,你们可以根据需要选择
pipeline.addLast(new ByteArrayDecoder());
pipeline.addLast(new ByteArrayEncoder());
// 添加上自己的处理器
pipeline.addLast(new SocketHandler());
}
}
客户端
- 编写
socket
连接
public class ChatClient {
public void start(String name) throws IOException {
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1024));
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ);
// 监听服务端发来得消息
new Thread(new ClientThread(selector)).start();
// 监听用户输入
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String message = scanner.next();
if (StringUtils.hasText(message)) {
socketChannel.write(StandardCharsets.UTF_8.encode(name + ": " + message));
}
}
}
private class ClientThread implements Runnable {
private final Logger logger = LoggerFactory.getLogger(ClientThread.class);
private final Selector selector;
public ClientThread(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (true) {
int channels = selector.select();
if (channels == 0) {
continue;
}
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeySet.iterator();
while (keyIterator.hasNext()) {
SelectionKey selectionKey = keyIterator.next();
// 移除集合当前得selectionKey,避免重复处理
keyIterator.remove();
if (selectionKey.isReadable()) {
handleRead(selector, selectionKey);
}
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
// 处理可读状态
private void handleRead(Selector selector, SelectionKey selectionKey) throws IOException {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
StringBuilder message = new StringBuilder();
if (channel.read(byteBuffer) > 0) {
byteBuffer.flip();
message.append(StandardCharsets.UTF_8.decode(byteBuffer));
}
// 再次注册到选择器上,继续监听可读状态
channel.register(selector, SelectionKey.OP_READ);
System.out.println(message);
}
}
start()
public static void main(String[] args) throws IOException {
new ChatClient().start("张三");
}
转载:https://blog.csdn.net/qq_37248504/article/details/127938165
查看评论