第二次课:netty学习

飞一样的编程
飞一样的编程
擅长邻域:Java,MySQL,Linux,nginx,springboot,mongodb,微信小程序,vue

分类: springboot vue 专栏: 物联网项目 标签: netty学习

2023-05-29 23:38:36 666浏览

netty学习

Java共支持3种网络编程模型/IO模式:BIO、NIO、AIO

BIO

同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销

最初的写法

public class BioServer {

    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(9000);
        while(true){
            System.out.println("等待连接");
            //阻塞方法
            Socket clientSocket = serverSocket.accept();
            System.out.println("有客户端连接了");

            handler(clientSocket);//处理客户端连接


        }
    }

    private static void handler(Socket clientSocket)  {
        byte[] bytes= new byte[1024];
        System.out.println("准备reade..");
        //接受客户端的数据,阻塞方法,没有数据可读时就阻塞
        int read = 0;
        try {
            read = clientSocket.getInputStream().read(bytes);
            System.out.println("read完毕");
            if (read!=-1) {
                System.out.println("接收到客户端的数据:"+new String(bytes,0,read));
            }
            System.out.println("end..");
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                clientSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

}

存在的问题:只要是高并发过来,压根没法处理。

多线程写法

  new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                handler(clientSocket);//处理客户端连接
            } catch (Exception e) {
                e.printStackTrace();
            }finally {

            }

        }
    }).start();

存在的问题:c10k(客户端同时来了1万个连接)问题,以上方式虽然可以解决阻塞问题,但又有新问题,高并发的时候要创建很多个线程(假如是c10M呢)。造成内存溢出问题(oom)

线程池写法

上述问题的解决方案:用线程池的方式(设置pool固定大小),这样的话也有弊端——就是支持的并发数受到了线程池的大小限制,另外还有一个问题,就是假设线程池设置的是500,已有的500个客户端连接都只连接不发消息的话,就导致这500个线程全部阻塞住了,第501个客户端即使等再长的时间都连不上。因为没有一个线程池连接空闲出来。

    newCachedThreadPool.execute(new Runnable() {
        public void run() { //我们重写
            handler(clientSocket);//处理客户端连接
        }
    });

适用场景

BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解。

NIO

同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理 。

最初的写法

public class NioServer {
    //保存客户端连接
    static List<SocketChannel> channelList = new ArrayList<>();
    public static void main(String[] args) throws Exception {

        //创建nio ServerSocketChannel,与bio的serversSocket类似
        ServerSocketChannel serverSocket=ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress(9000));
        //设置ServerSocketChannel为非阻塞
        serverSocket.configureBlocking(false);
        System.out.println("服务器端启动成功");
        while (true){
            //非阻寒模式accept方法不会阻塞,否则会阻塞
            // NIO的非阻寨是由操作系统内部实现的,底层调用了linux内核的accept丽数
            SocketChannel socketChannel = serverSocket.accept();
            if (socketChannel != null) { //如果有客户端进行连接
                System.out.println("连接成功");
                //设置SocketChanneL为非阻塞
                socketChannel.configureBlocking(false);
                //保存客户端连接在List中
                channelList.add(socketChannel);
            }
            //遍历连接进行数据读取
            Iterator<SocketChannel> iterator =channelList.iterator();
            while (iterator.hasNext()) {
                SocketChannel sc = iterator.next();
                ByteBuffer byteBuffer = ByteBuffer. allocate(6);
                //非阻塞模式read方法不会阻寨,否则会阻寨
                int len = sc.read(byteBuffer);
                //如果有数据,把数据打印出来
                if(len>0){
                    System.out.println("接收到消息:"+ new String(byteBuffer.array()));
                } else if (len == -1) { //如果客户端断开,把socket从集合中去掉
                    iterator. remove();
                    System.out.println("客户端断开连接");
                }
            }

        }

    }
}

存在的问题:空转,浪费cpu,假设有1万个客户端与服务端连接上了,但只有一个跟服务端之间有通讯,为了监控这个客户端的通讯,不得不遍历循环1万个连接,这就对cpu造成了极大的浪费。

解决方案:把有数据收发的连接单独拿出来,只对有数据收发的连接进行遍历,当处理完后,将线程阻塞,让出cpu

多路复用器写法

public class NioSelectorServer {
    public static void main(String[] args) throws Exception {

        //创建nio ServerSocketChannel,与bio的serversSocket类似
        ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9000));
        //设置ServerSocketChannel为非阻塞
        serverSocketChannel.configureBlocking(false);
        //打开selector
        Selector selector = Selector.open();
        //把ServerSocketChanneL注册到selector上,并iselector对客户端accept连接操作感兴趣
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//监听连接事件
        System.out.println("服务启动成功");
        while (true){
            selector.select();  // 阻塞等待需要处理的事件发生
            //获取selector中注册的全部事件的SelectionKey实例
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                // 可接收连接 能注册SelectionKey.OP_ACCEPT事件的只有 ServerSocketChannel通道
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    // 接受客户端连接
                    SocketChannel client = server.accept();
                    client.configureBlocking(false); // 设置客户端通道非阻塞
                    // 为客户端通道注册 OP_READ 事件
                    client.register(selector,SelectionKey.OP_READ);
                    System.out.println("客户端连接成功");
                }
                // 可读数据
                if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer =ByteBuffer.allocate(6);
                    int len = client.read(byteBuffer);
                    //如果有数据,把数据打印出来
                    if(len>0){
                        System.out.println("接收到消息:"+ new String(byteBuffer.array()));
                    } else if (len == -1) { //如果客户端断开,把socket从集合中去掉

                        System.out.println("客户端断开连接");
                        client.close();
                    }
                }

                //从事件集合中删除本次处理的key,防止下次select重复处理
                iterator.remove();

            }

        }

    }
}

适用场景

NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持。

缺点

NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。

需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。

开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。

JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决。

AIO

Java AIO(NIO.2) : 异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用

适用场景

AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

netty

什么是netty

Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github上的独立项目。

Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。

Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。

Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景

Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程

Netty 是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

Netty的应用场景

互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架(远程过程调用)使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。

游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。

Netty 作为高性能的基础通信组件,提供了 TCP/UDP 和 HTTP 协议栈,方便定制和开发私有协议栈,账号登录服务器。 地图服务器之间可以方便的通过 Netty 进行高性能的通信。

大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。

Netty的优点

Netty 对 JDK 自带的 NIO 的 API 进行了封装。

  • 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;
  • 基于灵活且可扩展的事件模型,可以清晰地分离关注点;
  • 高度可定制的线程模型 - 单线程,一个或多个线程池;
  • 使用方便:详细记录的 Javadoc,用户指南和示例;
  • 没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
  • 高性能、吞吐量更高:延迟更低;
  • 减少资源消耗;
  • 最小化不必要的内存复制。
  • 安全:完整的 SSL/TLS 和 StartTLS 支持。
  • 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入

单机模式下都就可以承受百万级别的并发

springboot集成netty

依赖

 <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.77.Final</version>
  </dependency>

服务端

// netty server类
@Component
public class NettyServer {

    @Value("${netty-port}")
    private int port;

    public void start() throws InterruptedException {
        /**
         * 创建两个线程组 bossGroup 和 workerGroup
         * bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
         *  两个都是无线循环
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置两个线程组
            bootstrap.group(bossGroup, workerGroup)
                    //使用NioServerSocketChannel 作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    //设置线程队列得到连接个数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //设置保持活动连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    //可以给 bossGroup 加个日志处理器
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //给workerGroup 的 EventLoop 对应的管道设置处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
                            pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });

            //启动服务器并绑定一个端口并且同步生成一个 ChannelFuture 对象
            ChannelFuture cf = bootstrap.bind(port).sync();
            if (cf.isSuccess()) {
                System.out.println("socket server start---------------");
            }
            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            //发送异常关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

// handler类
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class);

    protected void channelRead0(ChannelHandlerContext context, Object obj) throws Exception {
        log.info(">>>>>>>>>>>服务端接收到客户端的消息:{}",obj);
        SocketChannel socketChannel = (SocketChannel) context.channel();
        /**
         * 服务器返回客户端消息
         */
        Map map =  new HashMap();
        map.put("msg","我是服务端,收到你的消息了");
        socketChannel.writeAndFlush(JSON.toJSONString(map));
        ReferenceCountUtil.release(obj);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}


// springboot 集成启动 netty server,同时不影响tomcat接口
@Component
public class NettyBoot implements CommandLineRunner {

    @Autowired
    private NettyServer nettyServer;

    public void run(String... args) throws Exception {
        try {
            nettyServer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客户端

// netty client客户端
@Component
public class NettyClient {

    private int port = 9999;
    private String host = "localhost";
    public Channel channel;

    public void start() {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .remoteAddress(host, port)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
                            pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
                            pipeline.addLast(new NettyClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            if (future.isSuccess()) {
                channel = future.channel();
                System.out.println("connect server  成功---------");
            }
//            给关闭通道进行监听
            future.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public void sendMsg(String msg) {
        this.channel.writeAndFlush(msg);
    }
}

// handler处理类
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(NettyClientHandler.class);
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>连接");
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>退出");
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        log.info(">>>>>>>>>>>>>userEventTriggered:{}", evt);
    }

    /**
     * 客户端接收到服务端发的数据
     * @param channelHandlerContext
     * @param obj
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj)  {
        log.info(">>>>>>>>>>>>>客户端接收到消息:{}", obj);
        ReferenceCountUtil.release(obj);
    }

    /**
     * socket通道处于活动状态
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>>>socket建立了");
        super.channelActive(ctx);
    }

    /**
     * socket通道不活动了
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info(">>>>>>>>>>socket关闭了");
        super.channelInactive(ctx);
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

// springboot集成,同时不影响tomact接口
@Component
public class NettyBoot implements CommandLineRunner {

    @Autowired
    private NettyClient nettyClient;

    public void run(String... args) throws Exception {
        nettyClient.start();
    }
}

补充:项目启动的时候就把netty-server也启动起来(除了上面的写法外,还有一种常用写法)

  // 服务器端启动,并绑定 19080 端口
    //@PostConstruct是Java自带的注解,在方法上加该注解会在项目启动的时候执行该方法,
    // 也可以理解为在spring容器初始化的时候执行该方法。
    @PostConstruct

服务端主动给客户端发消息

在server的handler里加ChannelGroup属性

 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public static ChannelGroup getChannels() {
        return channelGroup;
    }
 	@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //客户端上线了
        Channel channel = ctx.channel();
        channelGroup.add(channel);
        log.info(ctx.channel().remoteAddress()+"上线了");

    }
 ChannelGroup channelGroup= ServerHandler.getChannels();
        Iterator channelIterator =  channelGroup.iterator();
        while(channelIterator.hasNext()){
            Channel channel = (Channel)channelIterator.next();
            InetSocketAddress insocket = (InetSocketAddress)channel.remoteAddress();
            //给指定客户端ip发送消息
            if(ip.equals(insocket.toString())){
              
                channel.writeAndFlush("hello server I am client");
               

            }
        }

聊天im小案例

server服务端

/**
 * @author:xiaojie
 * @create: 2023-05-29 22:46
 * @Description: 聊天服务端
 */
public class ImServer {
    private static EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private static EventLoopGroup workerGroup = new NioEventLoopGroup();

    public static void main(String[] args) {
        try {

            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置两个线程组
            bootstrap.group(bossGroup, workerGroup)
                    //使用NioServerSocketChannel 作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    //设置线程队列得到连接个数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //设置保持活动连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    //给workerGroup 的 EventLoop 对应的管道设置处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
                            pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
                            pipeline.addLast(new ImServerHandler());
                        }
                    });

            //启动服务器并绑定一个端口并且同步生成一个 ChannelFuture 对象
            ChannelFuture cf = null;
            cf = bootstrap.bind(9000).sync();
            if (cf.isSuccess()) {
                System.out.println("socket server start---------------");
            }
            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //发送异常关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端处理器

public class ImServerHandler extends SimpleChannelInboundHandler<Object> {
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    //客户端跟服务端连接上后触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //将该客户加入聊天的信息推送给其它在线的客户瑞
        //该方法会将channeLGroup 中所有的channel 遍历。井发送消息
        channelGroup.writeAndFlush("[ 客户端] "+ channel.remoteAddress() +"上线了 "+ sdf.format (new Date()) + "\n");

        //将当前channel 加入到channeLGroup
        channelGroup.add(channel);
        System.out.println(ctx.channel().remoteAddress() +"上线了"+ "\n");
    }

    //读取客户端数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取到当前channeL
        Channel channel = ctx.channel();
        //这时我们遍历channeLGroup, 根据不同的情况。回送 不同的消息
        channelGroup.forEach(ch -> {
            if (channel != ch) { //不是当前的channel,转发消息
                ch.writeAndFlush( "[客户瑞]"+ channel.remoteAddress() +"发送了消息:"+ msg + "\n" );
            } else {//回显自己发送的消息给自己
                ch.writeAndFlush("[自己]发送了消息:" + msg + "\n");
            }
        });
    }



    //表示channel处于不活动状态,提示离线
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //将客户离开信息推送给当前在线的客户
        channelGroup.writeAndFlush("[ 客户端] "+ channel.remoteAddress() +"下线了 "+ sdf.format (new Date()) + "\n");
        System.out.println(channel.remoteAddress()+"下线了"+"\n");
    }
}

client客户端

public class ImClient {
    private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    private static int port = 9000;
    private static String host = "localhost";
    public static Channel channel;

    public static void main(String[] args) {

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
                            pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
                            pipeline.addLast(new ImClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            if (future.isSuccess()) {
                channel = future.channel();
                System.out.println("=====" + channel.localAddress() + "===");
                //客户端需要输入信息。 创建一个扫描源
                Scanner scanner = new Scanner(System.in) ;
                while (scanner.hasNextLine() ) {
                    String msg = scanner.nextLine();
                    //通过channeL发送到服务器端
                    channel.writeAndFlush(msg);
                }
            }
//            给关闭通道进行监听
            future.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

客户端处理器

public class ImClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

好博客就要一起分享哦!分享海报

此处可发布评论

评论(1展开评论

蓝色妖姬 能力:10

2023-06-09 08:30:17

学习中
点击查看更多评论

展开评论

您可能感兴趣的博客

客服QQ 1913284695