本文最后更新于 2022-05-26 11:25:30
组件
EventLoop
事件循环对象 eventloop
eventloop本质是一个单线程执行器
它的继承关系如下
- 继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 继承自 netty 自己的 OrderedEventExecutor
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 EventLoopGroup parent() 方法来看看自己属于哪个 EventLoopGroup
实现循环组 eventloopgroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup最终还是继承自ScheduledExecutorService
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
处理普通以及定时任务
就是ScheduledExecutorService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| EventLoopGroup group = new NioEventLoopGroup(2);
System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next()); System.out.println(group.next());
group.next().submit(()->{ try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("submit thread"); });
System.out.println("main thread");
|
处理IO任务
同一个pipeline上的handler在同一个group

server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class MyServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
} }); } }) .bind(8080); } }
|
client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class MyClient { public static void main(String[] args) throws IOException, InterruptedException { Channel channel = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)) .sync() .channel(); System.out.println(channel); System.in.read(); } }
|
同一个pipeline上的handler在不同group

server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public static void main(String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { ChannelPipeline pipeline = nioSocketChannel.pipeline(); pipeline.addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(Thread.currentThread().getName()+buf.toString(StandardCharsets.UTF_8)); ctx.fireChannelRead(msg); } }); pipeline.addLast(defaultEventLoopGroup,new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println(Thread.currentThread().getName()+buf.toString(StandardCharsets.UTF_8)); } }); } });
ChannelFuture future = serverBootstrap.bind(9090).sync(); future.channel().closeFuture().sync(); defaultEventLoopGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
|
切换group实现原理
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
|
Channel
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| ChannelId id(); EventLoop eventLoop(); Channel parent(); ChannelConfig config(); boolean isOpen(); boolean isRegistered(); boolean isActive();
ChannelMetadata metadata(); SocketAddress localAddress(); SocketAddress remoteAddress(); ChannelFuture closeFuture(); boolean isWritable(); long bytesBeforeUnwritable(); long bytesBeforeWritable();
Unsafe unsafe(); ChannelPipeline pipeline(); ByteBufAllocator alloc(); @Override Channel read(); @Override Channel flush();
|
EmbeddedChannel
EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试handler,然后调用对应的Inbound和Outbound方法即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class TestEmbeddedChannel { public static void main(String[] args) { ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("1"); super.channelRead(ctx, msg); } };
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("2"); super.channelRead(ctx, msg); } };
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("3"); super.write(ctx, msg, promise); } };
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("4"); super.write(ctx, msg, promise); } };
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4); channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8))); channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8))); } }
|
ChannelFuture
阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class MyClient { public static void main(String[] args) throws IOException, InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)); channelFuture.sync(); Channel channel = channelFuture.channel(); channel.writeAndFlush("hello world"); } }
|
回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class MyClient { public static void main(String[] args) throws IOException, InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { Channel channel = channelFuture.channel(); channel.writeAndFlush("hello world"); } }); } }
|
处理关闭
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class ReadClient { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); ChannelFuture channelFuture = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)); channelFuture.sync();
Channel channel = channelFuture.channel(); Scanner scanner = new Scanner(System.in);
new Thread(()->{ while (true) { String msg = scanner.next(); if ("q".equals(msg)) { channel.close(); break; } channel.writeAndFlush(msg); } }, "inputThread").start();
ChannelFuture closeFuture = channel.closeFuture(); System.out.println("waiting close..."); closeFuture.sync(); System.out.println("关闭之后执行一些额外操作..."); group.shutdownGracefully(); } }
|
closeFuture回调
1 2 3 4 5 6 7 8 9
| closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { System.out.println("关闭之后执行一些额外操作..."); group.shutdownGracefully(); } });
|
关系
| 功能/名称 |
jdk Future |
netty Future 继承自jdk Future |
Promise 继承自netty Future |
| cancel |
取消任务 |
|
|
| isCanceled |
任务是否取消 |
|
|
| isDone |
任务是否完成,不能区分成功失败 |
|
|
| get |
获取任务结果,阻塞等待 |
|
|
| getNow |
- |
获取任务结果,非阻塞,还未产生结果时返回 null |
|
| await |
- |
等待任务结束,如果任务失败,**不会抛异常(不是打断异常,是异步逻辑中的异常)**,而是通过 isSuccess 判断 |
|
| sync |
- |
等待任务结束,如果任务失败,抛出异常(reThrow异步逻辑中的异常) |
|
| isSuccess |
- |
判断任务是否成功 |
|
| cause |
- |
获取失败信息,非阻塞,如果没有失败,返回null |
|
| addLinstener |
- |
添加回调,异步接收结果 |
|
| removeListeners |
- |
删除回调 |
|
| setSuccess |
- |
- |
设置成功结果,设置失败抛异常(DefaultPromise) |
| setFailure |
- |
- |
设置失败结果,设置失败抛异常(DefaultPromise) |
| trySuccess |
- |
- |
设置成功结果,返回是否设置成功 |
| tryFailure |
- |
- |
设置失败结果,返回是否设置成功 |
jdk Future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class JdkFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadFactory factory = new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "JdkFuture"); } }; ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);
Future<Integer> future = executor.submit(new Callable<Integer>() {
@Override public Integer call() throws Exception { TimeUnit.SECONDS.sleep(1); return 50; } }); System.out.println(future.get()); } }
|
Netty Future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class NettyFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { return 50; } });
System.out.println(Thread.currentThread().getName() + " 获取结果"); System.out.println("getNow " + future.getNow()); System.out.println("get " + future.get());
future.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { System.out.println(Thread.currentThread().getName() + " 获取结果"); System.out.println("getNow " + future.getNow()); } }); } }
|
Netty Promise
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class NettyPromise { public static void main(String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } promise.setSuccess(50); }).start();
System.out.println(Thread.currentThread().getName() + " " + promise.get()); } }
|
Handler&Pipeline
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public class PipeLineServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("handler1" ,new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(Thread.currentThread().getName() + " Inbound handler 1"); super.channelRead(ctx, msg); } }); socketChannel.pipeline().addLast("handler2", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(Thread.currentThread().getName() + " Inbound handler 2"); socketChannel.writeAndFlush(ctx.alloc().buffer(). writeBytes("Server...".getBytes(StandardCharsets.UTF_8))); super.channelRead(ctx, msg); } }); socketChannel.pipeline().addLast("handler3" ,new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println(Thread.currentThread().getName() + " Outbound handler 1"); super.write(ctx, msg, promise); } }); socketChannel.pipeline().addLast("handler4" ,new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println(Thread.currentThread().getName() + " Outbound handler 2"); super.write(ctx, msg, promise); } }); } }) .bind(8080); } }
|
如果给handler起了名字 ,可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler
handler需要放入通道的pipeline中,才能根据放入顺序来使用handler
- pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler
- 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
- 当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
- 当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止
socketChannel.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找
ctx.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler
加了@Sharable的注解的Handler是可以多线程共用的
ByteBuf
友好打印bytebuf
1 2 3 4 5 6 7 8 9 10 11
| private static void log(ByteBuf buffer) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4; StringBuilder buf = new StringBuilder(rows * 80 * 2) .append("read index:").append(buffer.readerIndex()) .append(" write index:").append(buffer.writerIndex()) .append(" capacity:").append(buffer.capacity()) .append(NEWLINE); appendPrettyHexDump(buf, buffer); System.out.println(buf.toString()); }
|
常用创建方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class ByteBufStudy { public static void main(String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16); ByteBufUtil.log(buffer);
StringBuilder sb = new StringBuilder(); for(int i = 0; i < 20; i++) { sb.append("a"); } buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
ByteBufUtil.log(buffer); } }
|
1 2 3 4 5 6 7 8 9
| read index:0 write index:0 capacity:16
read index:0 write index:20 capacity:64 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa| |00000010| 61 61 61 61 |aaaa | +--------+-------------------------------------------------+----------------+
|
ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小
当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作
如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建
直接内存&堆内存
通过该默认方法创建的ByteBuf,使用的是基于直接内存的ByteBuf
1
| ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
|
可以使用下面的代码来创建池化基于堆的 ByteBuf
1
| ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
|
也可以使用下面的代码来创建池化基于直接内存的 ByteBuf
1
| ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
|
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放
池化与非池化
池化的最大意义在于可以重用 ByteBuf,优点有
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置
1
| -Dio.netty.allocator.type={unpooled|pooled}
|
- 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
- 4.1 之前,池化功能还不成熟,默认是非池化实现
bytebuf组成

写入
常用方法如下
| 方法签名 |
含义 |
备注 |
| writeBoolean(boolean value) |
写入 boolean 值 |
用一字节 01|00 代表 true|false |
| writeByte(int value) |
写入 byte 值 |
|
| writeShort(int value) |
写入 short 值 |
|
| writeInt(int value) |
写入 int 值 |
Big Endian(大端写入),即 0x250,写入后 00 00 02 50 |
| writeIntLE(int value) |
写入 int 值 |
Little Endian(小端写入),即 0x250,写入后 50 02 00 00 |
| writeLong(long value) |
写入 long 值 |
|
| writeChar(int value) |
写入 char 值 |
|
| writeFloat(float value) |
写入 float 值 |
|
| writeDouble(double value) |
写入 double 值 |
|
| writeBytes(ByteBuf src) |
写入 netty 的 ByteBuf |
|
| writeBytes(byte[] src) |
写入 byte[] |
|
| writeBytes(ByteBuffer src) |
写入 nio 的 ByteBuffer |
|
| int writeCharSequence(CharSequence sequence, Charset charset) |
写入字符串 |
CharSequence为字符串类的父类,第二个参数为对应的字符集 |
- 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)
- 还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
读取
读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针
如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置
释放
由于 Netty 中有堆外内存(直接内存)的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为 1
- 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
- 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用
释放规则
因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
基本规则是,谁是最后使用者,谁负责 release
当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中
TailConext中释放ByteBuf的源码
1 2 3 4 5 6 7 8
| protected void onUnhandledInboundMessage(Object msg) { try { logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }
|
判断传过来的是否为ByteBuf,是的话才需要释放
1 2 3
| public static boolean release(Object msg) { return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false; }
|
切片
ByteBuf切片是【零拷贝】的体现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write 指针
得到分片后的buffer后,要调用其retain方法,使其内部的引用计数加一。避免原ByteBuf释放,导致切片buffer无法使用
修改原ByteBuf中的值,也会影响切片后得到的ByteBuf

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class TestSlice { public static void main(String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
ByteBuf slice1 = buffer.slice(0, 5); ByteBuf slice2 = buffer.slice(5, 5);
slice1.retain(); slice2.retain(); ByteBufUtil.log(slice1); ByteBufUtil.log(slice2);
System.out.println("===========修改原buffer中的值==========="); buffer.setByte(0,5);
System.out.println("===========打印slice1==========="); ByteBufUtil.log(slice1); } }
|
运行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| read index:0 write index:5 capacity:5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 01 02 03 04 05 |..... | +--------+-------------------------------------------------+----------------+ read index:0 write index:5 capacity:5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 06 07 08 09 0a |..... | +--------+-------------------------------------------------+----------------+ ===========修改原buffer中的值=========== ===========打印slice1=========== read index:0 write index:5 capacity:5 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 05 02 03 04 05 |..... | +--------+-------------------------------------------------+----------------+
|
组合bytebuf
底层没有拷贝
addComponents第一个参数设置为true 移动写指针
1 2 3 4
| ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)); ByteBuf byteBuf2 = ByteBufAllocator.DEFAULT.buffer().writeBytes("hello2".getBytes(StandardCharsets.UTF_8)); CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true,byteBuf,byteBuf2);
|
1 2 3 4 5 6
| read index:0 write index:11 capacity:11 +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f 68 65 6c 6c 6f 32 |hellohello2 | +--------+-------------------------------------------------+----------------+
|
duplicate
与原始bytebuf享用同一块内存,没拷贝,但是独立读写指针
1 2
| ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)); ByteBuf duplicate = byteBuf.duplicate();
|
优势
- 池化思想 - 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能
- 读写指针分离,不需要像 ByteBuffer 一样切换读写模式
- 可以自动扩容
- 支持链式调用,使用更流畅
- 很多地方体现零拷贝,例如
- slice、duplicate、CompositeByteBuf