小言_互联网的博客

认识Netty

382人阅读  评论(0)

Netty入门教程——认识Netty

Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序

引入依赖

  <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.24.Final</version>
 </dependency>

netty服务端代码

public class NettyServer {
    public  static List list=new ArrayList(  );
    public static void main(String[] args) {
        try {
            //创建BossGroup和workerGroup
            //说明
            //1.创建两个线程组bossGroup和workerGroup
            //2.bossgroup只是处理连接请求。真正和客户端业务处理会交给workerGroup
            //3.两个都是无限循环
            EventLoopGroup boosGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroop=new NioEventLoopGroup( );
            //创建服务器的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程进行设置
            bootstrap.group( boosGroup,workerGroop )//设置两个线程组
                    .channel( NioServerSocketChannel.class )//使用NioServerSocketChannel作为服务器的通道实现
                    .option( ChannelOption.SO_BACKLOG,128 )//设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    .childHandler( new ChannelInitializer<SocketChannel>() {//创建一个通道测试对象(匿名对象)
                        //给pipeline设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //可执行推送任务
                           /* list.add( socketChannel.localAddress().toString().substring( 1,socketChannel.localAddress().toString().indexOf( ":" )) );
                            list.add(socketChannel);
                            */
                            socketChannel.pipeline().addLast(new  NettyServerHandler());
                        }
                    } );//给workerGroup的EventLoop对应管道设置处理器
            System.out.println("服务器等待连接。。。");
            //绑定一个端口并且同步,生成一个ChannelFuture对象
            //启动服务器
            ChannelFuture cf = bootstrap.bind( 38400 ).sync();
            //FutureListener机制
            cf.addListener( new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if(cf.isSuccess()){
                        System.out.println("*******************"+"端口监听成功");
                        System.out.println(Thread.currentThread().getName());
                    }
                }
            } );
            //对关闭通道进行监听
            ChannelFuture sync = cf.channel().closeFuture().sync();
        }catch ( Exception e){
            e.printStackTrace();
        }
    }
}

自定义Handler需要继承netty规定好的某个ChannelInboundHandlerAdapter

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

   //读取数据(也可发送数据)

   /**
    * ChannelHandlerContext ctx指的是上下文,含有管道pipeline,通道,地址
    * Object msg  客户端发送的消息
    */
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       System.out.println( "server ctx=" + ctx );
       //将msg转成一个bytebuf  ByteBuf是netty包下面的
       ByteBuf buf = (ByteBuf) msg;
       //转换为数组
       byte[] req = new byte[buf.readableBytes()];
       buf.readBytes( req );
       System.out.println( new String( req, "utf-8" ) );
       System.out.println( "客户端发送消息是:" + buf.toString( CharsetUtil.UTF_8 ) );
       System.out.println( "客户端地址:" + ctx.channel().localAddress() );
       //异步发送消息
       ctx.channel().eventLoop().execute( new Runnable() {
           @Override
           public void run() {
               int state = 0;
               while (true) {
                   try {
                       if (state == 0) {
                           //注册68 84 0 0 0 0 0 0 0 0 0 0 0 11 42 0 2 1 1 0 0 0 2 101 84
                           byte by[] = {68, 84, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 11, 42, 0, 2, 1, 1, 0, 0, 0, 2, 101, 84};
                           //     sendClinetsxt( by, key, channel );
                           ctx.writeAndFlush( Unpooled.copiedBuffer( by ) );
                           System.out.println( "注册执行" + ctx.channel() );
                       }
                       if (state == 1) {
                           //心跳
                           Thread.currentThread().sleep( 10000 );
                           byte by[] = {68, 84, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 11, 42, 0, 1, 1, 1, 0, 0, 0, 2, 101, 84};
                           ctx.writeAndFlush( Unpooled.copiedBuffer( by ) );
                           System.out.println();
                           System.out.println( Thread.currentThread().getName() );
                           System.out.println( "心跳执行" + ctx.channel() + new Date().getTime() );
                       }
                       state = 1;
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
               }
           }
       } );
       //定时任务,提交到scheduleTaskQueue
       ctx.channel().eventLoop().schedule( new Runnable() {
           @Override
           public void run() {
               try {
                   ctx.writeAndFlush( Unpooled.copiedBuffer( "提交定时任务", CharsetUtil.UTF_8 ) );
               } catch (Exception e) {
                   e.printStackTrace();
               }
           }
       }, 5, TimeUnit.SECONDS );
   }

   //数据读取完毕后给客户端发消息
   @Override
   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
       //writeAndFlush是write+Flush(将数据写入缓存,并刷新),需要编码的设定一般为Utf-8
       ctx.writeAndFlush( Unpooled.copiedBuffer( "我是服务端", CharsetUtil.UTF_8 ) );
   }

   //处理异常
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       ctx.close();
   }
}

netty客户端

public class NettyClient {
    public static void main(String[] args) {
        //客户端需要一个事件循环
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
      try {
          //创建客户端启动对象(客户端启动用Bootstrap而不是serverBootstrap)
          Bootstrap bootstrap = new Bootstrap();
          //设置相关参数
          bootstrap.group( eventExecutors )//设置线程组
                  .channel( NioSocketChannel.class)//设置客户端通道的实现类(反射)
                  .handler( new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel socketChannel) throws Exception {
                          socketChannel.pipeline().addLast( new NettyClientHandler() );//加入自己的处理器
                      }
                  } );
          System.out.println("连接到服务端(* ̄︶ ̄)");
          //启动客户端
          ChannelFuture channelFuture = bootstrap.connect( "127.0.0.1", 38400 ).sync();
          //关闭通道进行监听
          channelFuture.channel().closeFuture().sync();
      }catch (Exception e){
          e.printStackTrace();
      }finally {
          eventExecutors.shutdownGracefully();
      }
    }
}

NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
   //当通道就绪后,会触发
   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
       System.out.println( "client ctx=" + ctx );
       ctx.writeAndFlush( Unpooled.copiedBuffer( "hello server", CharsetUtil.UTF_8 ) );
   }

   //当通道有读取事件时,会触发
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       ByteBuf buf=(ByteBuf) msg;
       System.out.println("服务端发送的消息"+buf.toString(CharsetUtil.UTF_8));
       System.out.println("服务端地址"+ctx.channel().localAddress());
   }
   //异常

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       ctx.close();
   }
}

Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext 之间的关系


转载:https://blog.csdn.net/changesss/article/details/106546026
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场