第二次课:netty学习
分类: springboot vue 专栏: 物联网项目 标签: netty学习
2023-05-29 23:38:36 1342浏览
Java共支持3种网络编程模型/IO模式:BIO、NIO、AIO
BIO
同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销

最初的写法
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while(true){
System.out.println("等待连接");
//阻塞方法
Socket clientSocket = serverSocket.accept();
System.out.println("有客户端连接了");
handler(clientSocket);//处理客户端连接
}
}
private static void handler(Socket clientSocket) {
byte[] bytes= new byte[1024];
System.out.println("准备reade..");
//接受客户端的数据,阻塞方法,没有数据可读时就阻塞
int read = 0;
try {
read = clientSocket.getInputStream().read(bytes);
System.out.println("read完毕");
if (read!=-1) {
System.out.println("接收到客户端的数据:"+new String(bytes,0,read));
}
System.out.println("end..");
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}存在的问题:只要是高并发过来,压根没法处理。
多线程写法
new Thread(new Runnable() {
@Override
public void run() {
try {
handler(clientSocket);//处理客户端连接
} catch (Exception e) {
e.printStackTrace();
}finally {
}
}
}).start();存在的问题:c10k(客户端同时来了1万个连接)问题,以上方式虽然可以解决阻塞问题,但又有新问题,高并发的时候要创建很多个线程(假如是c10M呢)。造成内存溢出问题(oom)
线程池写法
上述问题的解决方案:用线程池的方式(设置pool固定大小),这样的话也有弊端——就是支持的并发数受到了线程池的大小限制,另外还有一个问题,就是假设线程池设置的是500,已有的500个客户端连接都只连接不发消息的话,就导致这500个线程全部阻塞住了,第501个客户端即使等再长的时间都连不上。因为没有一个线程池连接空闲出来。
newCachedThreadPool.execute(new Runnable() {
public void run() { //我们重写
handler(clientSocket);//处理客户端连接
}
});适用场景
BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解。
NIO
同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理 。

最初的写法
public class NioServer {
//保存客户端连接
static List<SocketChannel> channelList = new ArrayList<>();
public static void main(String[] args) throws Exception {
//创建nio ServerSocketChannel,与bio的serversSocket类似
ServerSocketChannel serverSocket=ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(9000));
//设置ServerSocketChannel为非阻塞
serverSocket.configureBlocking(false);
System.out.println("服务器端启动成功");
while (true){
//非阻寒模式accept方法不会阻塞,否则会阻塞
// NIO的非阻寨是由操作系统内部实现的,底层调用了linux内核的accept丽数
SocketChannel socketChannel = serverSocket.accept();
if (socketChannel != null) { //如果有客户端进行连接
System.out.println("连接成功");
//设置SocketChanneL为非阻塞
socketChannel.configureBlocking(false);
//保存客户端连接在List中
channelList.add(socketChannel);
}
//遍历连接进行数据读取
Iterator<SocketChannel> iterator =channelList.iterator();
while (iterator.hasNext()) {
SocketChannel sc = iterator.next();
ByteBuffer byteBuffer = ByteBuffer. allocate(6);
//非阻塞模式read方法不会阻寨,否则会阻寨
int len = sc.read(byteBuffer);
//如果有数据,把数据打印出来
if(len>0){
System.out.println("接收到消息:"+ new String(byteBuffer.array()));
} else if (len == -1) { //如果客户端断开,把socket从集合中去掉
iterator. remove();
System.out.println("客户端断开连接");
}
}
}
}
}存在的问题:空转,浪费cpu,假设有1万个客户端与服务端连接上了,但只有一个跟服务端之间有通讯,为了监控这个客户端的通讯,不得不遍历循环1万个连接,这就对cpu造成了极大的浪费。
解决方案:把有数据收发的连接单独拿出来,只对有数据收发的连接进行遍历,当处理完后,将线程阻塞,让出cpu
多路复用器写法
public class NioSelectorServer {
public static void main(String[] args) throws Exception {
//创建nio ServerSocketChannel,与bio的serversSocket类似
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9000));
//设置ServerSocketChannel为非阻塞
serverSocketChannel.configureBlocking(false);
//打开selector
Selector selector = Selector.open();
//把ServerSocketChanneL注册到selector上,并iselector对客户端accept连接操作感兴趣
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//监听连接事件
System.out.println("服务启动成功");
while (true){
selector.select(); // 阻塞等待需要处理的事件发生
//获取selector中注册的全部事件的SelectionKey实例
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
// 可接收连接 能注册SelectionKey.OP_ACCEPT事件的只有 ServerSocketChannel通道
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 接受客户端连接
SocketChannel client = server.accept();
client.configureBlocking(false); // 设置客户端通道非阻塞
// 为客户端通道注册 OP_READ 事件
client.register(selector,SelectionKey.OP_READ);
System.out.println("客户端连接成功");
}
// 可读数据
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer byteBuffer =ByteBuffer.allocate(6);
int len = client.read(byteBuffer);
//如果有数据,把数据打印出来
if(len>0){
System.out.println("接收到消息:"+ new String(byteBuffer.array()));
} else if (len == -1) { //如果客户端断开,把socket从集合中去掉
System.out.println("客户端断开连接");
client.close();
}
}
//从事件集合中删除本次处理的key,防止下次select重复处理
iterator.remove();
}
}
}
}
适用场景
NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持。
缺点
NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决。
AIO
Java AIO(NIO.2) : 异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用
适用场景
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
netty
什么是netty
Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github上的独立项目。
Netty 是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。
Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。
Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景
Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程
Netty 是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。
Netty的应用场景
互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架(远程过程调用)使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。
Netty 作为高性能的基础通信组件,提供了 TCP/UDP 和 HTTP 协议栈,方便定制和开发私有协议栈,账号登录服务器。 地图服务器之间可以方便的通过 Netty 进行高性能的通信。
大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
Netty的优点
Netty 对 JDK 自带的 NIO 的 API 进行了封装。
- 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;
- 基于灵活且可扩展的事件模型,可以清晰地分离关注点;
- 高度可定制的线程模型 - 单线程,一个或多个线程池;
- 使用方便:详细记录的 Javadoc,用户指南和示例;
- 没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
- 高性能、吞吐量更高:延迟更低;
- 减少资源消耗;
- 最小化不必要的内存复制。
- 安全:完整的 SSL/TLS 和 StartTLS 支持。
- 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入
单机模式下都就可以承受百万级别的并发
springboot集成netty
依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.77.Final</version>
</dependency>服务端
// netty server类
@Component
public class NettyServer {
@Value("${netty-port}")
private int port;
public void start() throws InterruptedException {
/**
* 创建两个线程组 bossGroup 和 workerGroup
* bossGroup 只是处理连接请求,真正的和客户端业务处理,会交给 workerGroup 完成
* 两个都是无线循环
*/
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组
bootstrap.group(bossGroup, workerGroup)
//使用NioServerSocketChannel 作为服务器的通道实现
.channel(NioServerSocketChannel.class)
//设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
.childOption(ChannelOption.TCP_NODELAY, true)
//可以给 bossGroup 加个日志处理器
.handler(new LoggingHandler(LogLevel.INFO))
//给workerGroup 的 EventLoop 对应的管道设置处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
pipeline.addLast(new NettyServerHandler());
}
});
//启动服务器并绑定一个端口并且同步生成一个 ChannelFuture 对象
ChannelFuture cf = bootstrap.bind(port).sync();
if (cf.isSuccess()) {
System.out.println("socket server start---------------");
}
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} finally {
//发送异常关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
// handler类
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger log = LoggerFactory.getLogger(NettyServerHandler.class);
protected void channelRead0(ChannelHandlerContext context, Object obj) throws Exception {
log.info(">>>>>>>>>>>服务端接收到客户端的消息:{}",obj);
SocketChannel socketChannel = (SocketChannel) context.channel();
/**
* 服务器返回客户端消息
*/
Map map = new HashMap();
map.put("msg","我是服务端,收到你的消息了");
socketChannel.writeAndFlush(JSON.toJSONString(map));
ReferenceCountUtil.release(obj);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
// springboot 集成启动 netty server,同时不影响tomcat接口
@Component
public class NettyBoot implements CommandLineRunner {
@Autowired
private NettyServer nettyServer;
public void run(String... args) throws Exception {
try {
nettyServer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端
// netty client客户端
@Component
public class NettyClient {
private int port = 9999;
private String host = "localhost";
public Channel channel;
public void start() {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.remoteAddress(host, port)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
pipeline.addLast(new NettyClientHandler());
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
if (future.isSuccess()) {
channel = future.channel();
System.out.println("connect server 成功---------");
}
// 给关闭通道进行监听
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
public void sendMsg(String msg) {
this.channel.writeAndFlush(msg);
}
}
// handler处理类
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger log = LoggerFactory.getLogger(NettyClientHandler.class);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>>>连接");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>>>退出");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
log.info(">>>>>>>>>>>>>userEventTriggered:{}", evt);
}
/**
* 客户端接收到服务端发的数据
* @param channelHandlerContext
* @param obj
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) {
log.info(">>>>>>>>>>>>>客户端接收到消息:{}", obj);
ReferenceCountUtil.release(obj);
}
/**
* socket通道处于活动状态
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>>>>>socket建立了");
super.channelActive(ctx);
}
/**
* socket通道不活动了
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info(">>>>>>>>>>socket关闭了");
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
// springboot集成,同时不影响tomact接口
@Component
public class NettyBoot implements CommandLineRunner {
@Autowired
private NettyClient nettyClient;
public void run(String... args) throws Exception {
nettyClient.start();
}
}
补充:项目启动的时候就把netty-server也启动起来(除了上面的写法外,还有一种常用写法)
// 服务器端启动,并绑定 19080 端口
//@PostConstruct是Java自带的注解,在方法上加该注解会在项目启动的时候执行该方法,
// 也可以理解为在spring容器初始化的时候执行该方法。
@PostConstruct服务端主动给客户端发消息
在server的handler里加ChannelGroup属性
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public static ChannelGroup getChannels() {
return channelGroup;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//客户端上线了
Channel channel = ctx.channel();
channelGroup.add(channel);
log.info(ctx.channel().remoteAddress()+"上线了");
} ChannelGroup channelGroup= ServerHandler.getChannels();
Iterator channelIterator = channelGroup.iterator();
while(channelIterator.hasNext()){
Channel channel = (Channel)channelIterator.next();
InetSocketAddress insocket = (InetSocketAddress)channel.remoteAddress();
//给指定客户端ip发送消息
if(ip.equals(insocket.toString())){
channel.writeAndFlush("hello server I am client");
}
}聊天im小案例


server服务端
/**
* @author:xiaojie
* @create: 2023-05-29 22:46
* @Description: 聊天服务端
*/
public class ImServer {
private static EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private static EventLoopGroup workerGroup = new NioEventLoopGroup();
public static void main(String[] args) {
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置两个线程组
bootstrap.group(bossGroup, workerGroup)
//使用NioServerSocketChannel 作为服务器的通道实现
.channel(NioServerSocketChannel.class)
//设置线程队列得到连接个数
.option(ChannelOption.SO_BACKLOG, 128)
//设置保持活动连接状态
.childOption(ChannelOption.SO_KEEPALIVE, true)
//通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
.childOption(ChannelOption.TCP_NODELAY, true)
//给workerGroup 的 EventLoop 对应的管道设置处理器
.childHandler(new ChannelInitializer<SocketChannel>() {
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
pipeline.addLast(new ImServerHandler());
}
});
//启动服务器并绑定一个端口并且同步生成一个 ChannelFuture 对象
ChannelFuture cf = null;
cf = bootstrap.bind(9000).sync();
if (cf.isSuccess()) {
System.out.println("socket server start---------------");
}
//对关闭通道进行监听
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//发送异常关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}服务端处理器
public class ImServerHandler extends SimpleChannelInboundHandler<Object> {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//客户端跟服务端连接上后触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//将该客户加入聊天的信息推送给其它在线的客户瑞
//该方法会将channeLGroup 中所有的channel 遍历。井发送消息
channelGroup.writeAndFlush("[ 客户端] "+ channel.remoteAddress() +"上线了 "+ sdf.format (new Date()) + "\n");
//将当前channel 加入到channeLGroup
channelGroup.add(channel);
System.out.println(ctx.channel().remoteAddress() +"上线了"+ "\n");
}
//读取客户端数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//获取到当前channeL
Channel channel = ctx.channel();
//这时我们遍历channeLGroup, 根据不同的情况。回送 不同的消息
channelGroup.forEach(ch -> {
if (channel != ch) { //不是当前的channel,转发消息
ch.writeAndFlush( "[客户瑞]"+ channel.remoteAddress() +"发送了消息:"+ msg + "\n" );
} else {//回显自己发送的消息给自己
ch.writeAndFlush("[自己]发送了消息:" + msg + "\n");
}
});
}
//表示channel处于不活动状态,提示离线
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//将客户离开信息推送给当前在线的客户
channelGroup.writeAndFlush("[ 客户端] "+ channel.remoteAddress() +"下线了 "+ sdf.format (new Date()) + "\n");
System.out.println(channel.remoteAddress()+"下线了"+"\n");
}
}client客户端
public class ImClient {
private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
private static int port = 9000;
private static String host = "localhost";
public static Channel channel;
public static void main(String[] args) {
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringEncoder());//对 String 对象自动编码,属于出站站处理器
pipeline.addLast(new StringDecoder());//把网络字节流自动解码为 String 对象,属于入站处理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(24*1024,0,2));
pipeline.addLast(new ImClientHandler());
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
if (future.isSuccess()) {
channel = future.channel();
System.out.println("=====" + channel.localAddress() + "===");
//客户端需要输入信息。 创建一个扫描源
Scanner scanner = new Scanner(System.in) ;
while (scanner.hasNextLine() ) {
String msg = scanner.nextLine();
//通过channeL发送到服务器端
channel.writeAndFlush(msg);
}
}
// 给关闭通道进行监听
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}客户端处理器
public class ImClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println(msg.trim());
}
}
好博客就要一起分享哦!分享海报
您可能感兴趣的博客


新业务
springboot学习
ssm框架课
vue学习
【带小白】java基础速成