Netty入门教程——认识Netty
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序
引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.24.Final</version>
</dependency>
netty服务端代码
public class NettyServer {
public static List list=new ArrayList( );
public static void main(String[] args) {
try {
//创建BossGroup和workerGroup
//说明
//1.创建两个线程组bossGroup和workerGroup
//2.bossgroup只是处理连接请求。真正和客户端业务处理会交给workerGroup
//3.两个都是无限循环
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroop=new NioEventLoopGroup( );
//创建服务器的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程进行设置
bootstrap.group( boosGroup,workerGroop )//设置两个线程组
.channel( NioServerSocketChannel.class )//使用NioServerSocketChannel作为服务器的通道实现
.option( ChannelOption.SO_BACKLOG,128 )//设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler( new ChannelInitializer<SocketChannel>() {//创建一个通道测试对象(匿名对象)
//给pipeline设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//可执行推送任务
/* list.add( socketChannel.localAddress().toString().substring( 1,socketChannel.localAddress().toString().indexOf( ":" )) );
list.add(socketChannel);
*/
socketChannel.pipeline().addLast(new NettyServerHandler());
}
} );//给workerGroup的EventLoop对应管道设置处理器
System.out.println("服务器等待连接。。。");
//绑定一个端口并且同步,生成一个ChannelFuture对象
//启动服务器
ChannelFuture cf = bootstrap.bind( 38400 ).sync();
//FutureListener机制
cf.addListener( new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(cf.isSuccess()){
System.out.println("*******************"+"端口监听成功");
System.out.println(Thread.currentThread().getName());
}
}
} );
//对关闭通道进行监听
ChannelFuture sync = cf.channel().closeFuture().sync();
}catch ( Exception e){
e.printStackTrace();
}
}
}
自定义Handler需要继承netty规定好的某个ChannelInboundHandlerAdapter
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取数据(也可发送数据)
/**
* ChannelHandlerContext ctx指的是上下文,含有管道pipeline,通道,地址
* Object msg 客户端发送的消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println( "server ctx=" + ctx );
//将msg转成一个bytebuf ByteBuf是netty包下面的
ByteBuf buf = (ByteBuf) msg;
//转换为数组
byte[] req = new byte[buf.readableBytes()];
buf.readBytes( req );
System.out.println( new String( req, "utf-8" ) );
System.out.println( "客户端发送消息是:" + buf.toString( CharsetUtil.UTF_8 ) );
System.out.println( "客户端地址:" + ctx.channel().localAddress() );
//异步发送消息
ctx.channel().eventLoop().execute( new Runnable() {
@Override
public void run() {
int state = 0;
while (true) {
try {
if (state == 0) {
//注册68 84 0 0 0 0 0 0 0 0 0 0 0 11 42 0 2 1 1 0 0 0 2 101 84
byte by[] = {68, 84, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 11, 42, 0, 2, 1, 1, 0, 0, 0, 2, 101, 84};
// sendClinetsxt( by, key, channel );
ctx.writeAndFlush( Unpooled.copiedBuffer( by ) );
System.out.println( "注册执行" + ctx.channel() );
}
if (state == 1) {
//心跳
Thread.currentThread().sleep( 10000 );
byte by[] = {68, 84, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 11, 42, 0, 1, 1, 1, 0, 0, 0, 2, 101, 84};
ctx.writeAndFlush( Unpooled.copiedBuffer( by ) );
System.out.println();
System.out.println( Thread.currentThread().getName() );
System.out.println( "心跳执行" + ctx.channel() + new Date().getTime() );
}
state = 1;
} catch (Exception e) {
e.printStackTrace();
}
}
}
} );
//定时任务,提交到scheduleTaskQueue
ctx.channel().eventLoop().schedule( new Runnable() {
@Override
public void run() {
try {
ctx.writeAndFlush( Unpooled.copiedBuffer( "提交定时任务", CharsetUtil.UTF_8 ) );
} catch (Exception e) {
e.printStackTrace();
}
}
}, 5, TimeUnit.SECONDS );
}
//数据读取完毕后给客户端发消息
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush是write+Flush(将数据写入缓存,并刷新),需要编码的设定一般为Utf-8
ctx.writeAndFlush( Unpooled.copiedBuffer( "我是服务端", CharsetUtil.UTF_8 ) );
}
//处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
netty客户端
public class NettyClient {
public static void main(String[] args) {
//客户端需要一个事件循环
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建客户端启动对象(客户端启动用Bootstrap而不是serverBootstrap)
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group( eventExecutors )//设置线程组
.channel( NioSocketChannel.class)//设置客户端通道的实现类(反射)
.handler( new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast( new NettyClientHandler() );//加入自己的处理器
}
} );
System.out.println("连接到服务端(* ̄︶ ̄)");
//启动客户端
ChannelFuture channelFuture = bootstrap.connect( "127.0.0.1", 38400 ).sync();
//关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
eventExecutors.shutdownGracefully();
}
}
}
NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪后,会触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println( "client ctx=" + ctx );
ctx.writeAndFlush( Unpooled.copiedBuffer( "hello server", CharsetUtil.UTF_8 ) );
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf=(ByteBuf) msg;
System.out.println("服务端发送的消息"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务端地址"+ctx.channel().localAddress());
}
//异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext 之间的关系
转载:https://blog.csdn.net/changesss/article/details/106546026
查看评论