Java io.netty.util.ReferenceCountUtil 代码实例

・18 分钟阅读

以下是展示如何使用 io.netty.util.ReferenceCountUtil 的最佳示例。我们使用代码质量辨别算法，从开源项目中提取出了这些优秀示例。

实例 1


/**
 * Runs a {@link WebSocketServerHandshaker13} handshake against an {@link EmbeddedChannel},
 * decodes the bytes it wrote with a second channel that holds an {@link HttpResponseDecoder},
 * and asserts the {@code Sec-WebSocket-Accept} and {@code Sec-WebSocket-Protocol} headers
 * of the decoded response.
 *
 * @param subProtocol when {@code true} the handshaker is created with the "chat" sub-protocol
 *                    and the response must carry it; when {@code false} the handshaker is
 *                    created with a {@code null} sub-protocol and the response must not
 *                    contain the header
 */
private static void testPerformOpeningHandshake0(boolean subProtocol) {
    EmbeddedChannel serverChannel = new EmbeddedChannel(
            new HttpObjectAggregator(42), new HttpRequestDecoder(), new HttpResponseEncoder());

    // Upgrade request offering both "chat" and "superchat".
    FullHttpRequest upgradeRequest = ReferenceCountUtil.releaseLater(
            new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, "/chat"));
    upgradeRequest.headers().set(Names.HOST, "server.example.com");
    upgradeRequest.headers().set(Names.UPGRADE, WEBSOCKET.toLowerCase());
    upgradeRequest.headers().set(Names.CONNECTION, "Upgrade");
    upgradeRequest.headers().set(Names.SEC_WEBSOCKET_KEY, "dGhlIHNhbXBsZSBub25jZQ==");
    upgradeRequest.headers().set(Names.SEC_WEBSOCKET_ORIGIN, "http://example.com");
    upgradeRequest.headers().set(Names.SEC_WEBSOCKET_PROTOCOL, "chat, superchat");
    upgradeRequest.headers().set(Names.SEC_WEBSOCKET_VERSION, "13");

    // The two scenarios differ only in the sub-protocol handed to the handshaker.
    String configuredSubProtocol = subProtocol ? "chat" : null;
    new WebSocketServerHandshaker13(
            "ws://example.com/chat", configuredSubProtocol, false, Integer.MAX_VALUE)
            .handshake(serverChannel, upgradeRequest);

    // Feed the encoded response through a decoder so its headers can be inspected.
    ByteBuf encodedResponse = (ByteBuf) serverChannel.readOutbound();
    EmbeddedChannel decoderChannel = new EmbeddedChannel(new HttpResponseDecoder());
    decoderChannel.writeInbound(encodedResponse);
    HttpResponse response = (HttpResponse) decoderChannel.readInbound();

    Assert.assertEquals(
            "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", response.headers().get(Names.SEC_WEBSOCKET_ACCEPT));
    if (!subProtocol) {
        Assert.assertNull(response.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
    } else {
        Assert.assertEquals("chat", response.headers().get(Names.SEC_WEBSOCKET_PROTOCOL));
    }
    ReferenceCountUtil.release(response);
}
 

实例 2


/**
 * Writes an upgrade request into a freshly created channel and checks that the next queued
 * response has the SWITCHING_PROTOCOLS status and that a handshaker can be looked up from the
 * context of the {@link WebSocketServerProtocolHandshakeHandler}.
 */
@Test
public void testHttpUpgradeRequest() throws Exception {
    EmbeddedChannel channel = createChannel(new MockOutboundHandler());
    // Look the context up before the request is written.
    ChannelHandlerContext handshakeHandlerContext =
            channel.pipeline().context(WebSocketServerProtocolHandshakeHandler.class);

    writeUpgradeRequest(channel);

    assertEquals(
            SWITCHING_PROTOCOLS,
            ReferenceCountUtil.releaseLater(responses.remove()).getStatus());
    assertNotNull(WebSocketServerProtocolHandler.getHandshaker(handshakeHandlerContext));
}
 

实例 3


/**
 * Handles an inbound message on the WebSocket client channel.
 *
 * <p>While the handshake is not complete, the message is treated as the handshake response:
 * the {@link HttpObjectAggregator} is removed from the pipeline, the handshake is finished,
 * {@code httpChannel} is created, and the message is passed on to the superclass.
 *
 * <p>After the handshake, text frames are handed to {@code channelSubscriber} and released,
 * close frames close the channel and are released, and the content of any other frame is
 * delegated to {@code doRead}.
 *
 * @param ctx the handler context the message arrived on
 * @param msg the inbound message; a {@link FullHttpResponse} during the handshake and a
 *            {@link WebSocketFrame} afterwards (both are enforced by casts)
 * @throws Exception if the handshake or a downstream handler fails
 */
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!handshaker.isHandshakeComplete()) {
        ctx.pipeline().remove(HttpObjectAggregator.class);
        handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
        httpChannel = new NettyHttpChannel(tcpStream, new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/")) {
            @Override
            protected void doSubscribeHeaders(Subscriber<? super Void> s) {
                Publishers.<Void>empty().subscribe(s);
            }
        };
        NettyHttpWSClientHandler.super.channelActive(ctx);
        super.channelRead(ctx, msg);
        return;
    }
    if (msg instanceof TextWebSocketFrame) {
        try {
            //don't inflate the String bytes now
            channelSubscriber.onNext(new StringBuffer(((TextWebSocketFrame) msg).content().nioBuffer()));
        } finally {
            ReferenceCountUtil.release(msg);
        }
    } else if (msg instanceof CloseWebSocketFrame) {
        try {
            ctx.close();
        } finally {
            // The close frame is consumed here and never forwarded, so this handler is the
            // last owner and has to release it; otherwise its buffer leaks.
            ReferenceCountUtil.release(msg);
        }
    } else {
        doRead(ctx, ((WebSocketFrame) msg).content());
    }
}
 

实例 4


@SuppressWarnings("unchecked")
protected final void doRead(ChannelHandlerContext ctx, Object msg) {
        try {
                if (null == channelSubscriber || msg == Unpooled.EMPTY_BUFFER) {
                        ReferenceCountUtil.release(msg);
                        return;
                }
                NettyBuffer buffer = NettyBuffer.create(msg);
                try {
                        channelSubscriber.onNext(buffer);
                }
                finally {
                        if (buffer.getByteBuf() != null) {
                                if (buffer.getByteBuf()
                                          .refCnt() != 0) {
                                        ReferenceCountUtil.release(buffer.getByteBuf());
                                }
                        }
                }
        }
        catch (Throwable err) {
                Exceptions.throwIfFatal(err);
                if (channelSubscriber != null) {
                        channelSubscriber.onError(err);
                }
                else {
                        throw err;
                }
        }
}
 

实例 5


/**
 * Tries to reproduce issue #1335 by binding 100 datagram channels on one event loop group.
 *
 * <p>Each channel is bound to an ephemeral port with {@code SO_BROADCAST} enabled and a handler
 * that releases everything it reads. The test passes when all 100 channels end up in the
 * channel group.
 *
 * @throws Exception if closing the channels or shutting down the event loop group fails
 */
@Test
public void testBindMultiple() throws Exception {
    DefaultChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    NioEventLoopGroup group = new NioEventLoopGroup();
    try {
        for (int i = 0; i < 100; i++) {
            Bootstrap udpBootstrap = new Bootstrap();
            udpBootstrap.group(group).channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) {
                            // Discard
                            ReferenceCountUtil.release(msg);
                        }
                    });
            DatagramChannel datagramChannel = (DatagramChannel) udpBootstrap
                    .bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
            channelGroup.add(datagramChannel);
        }
        Assert.assertEquals(100, channelGroup.size());
    } finally {
        try {
            channelGroup.close().sync();
        } finally {
            // Shut the event loop group down even if closing the channels throws, so a
            // failure here does not leave the group running.
            group.shutdownGracefully().sync();
        }
    }
}
 

实例 6


/**
 * Starts a local test server whose child channels release every message they read, and stores
 * the event loop group and the bound address in {@code group} and {@code localAddr}.
 */
@BeforeClass
public static void init() {
    group = new LocalEventLoopGroup();

    // Installs a single handler on each child channel that drops whatever is read.
    ChannelInitializer<LocalChannel> discardingInitializer = new ChannelInitializer<LocalChannel>() {
        @Override
        public void initChannel(LocalChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    ReferenceCountUtil.release(msg);
                }
            });
        }
    };

    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(group)
            .channel(LocalServerChannel.class)
            .childHandler(discardingInitializer);

    localAddr = (LocalAddress) serverBootstrap
            .bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
}
 

实例 7


/**
 * Configures and binds the local test server used by this test class.
 *
 * <p>The server runs on a new {@link LocalEventLoopGroup} kept in {@code group}; each child
 * channel gets one handler that releases every inbound message. The address the server was
 * bound to is kept in {@code localAddr}.
 */
@BeforeClass
public static void init() {
    group = new LocalEventLoopGroup();

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(group);
    bootstrap.channel(LocalServerChannel.class);
    bootstrap.childHandler(new ChannelInitializer<LocalChannel>() {
        @Override
        public void initChannel(LocalChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                    // Inbound data is not needed by the tests; drop it.
                    ReferenceCountUtil.release(msg);
                }
            });
        }
    });

    localAddr = (LocalAddress) bootstrap.bind(LocalAddress.ANY)
            .syncUninterruptibly()
            .channel()
            .localAddress();
}
 

实例 8


/**
 * Writes {@code message} to every non-server channel accepted by {@code matcher}.
 *
 * <p>Each matching channel receives its own {@code safeDuplicate(message)}; once all writes
 * have been issued, the caller's {@code message} is released a single time.
 *
 * @param message the message to write; must not be {@code null}
 * @param matcher selects the channels to write to; must not be {@code null}
 * @return a group future built from the per-channel write futures, in iteration order
 * @throws NullPointerException if {@code message} or {@code matcher} is {@code null}
 */
@Override
public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
    if (message == null) {
        throw new NullPointerException("message");
    }
    if (matcher == null) {
        throw new NullPointerException("matcher");
    }

    Map<Channel, ChannelFuture> writeFutures = new LinkedHashMap<Channel, ChannelFuture>(size());
    for (Channel channel : nonServerChannels) {
        if (!matcher.matches(channel)) {
            continue;
        }
        ChannelFuture writeFuture = channel.write(safeDuplicate(message));
        writeFutures.put(channel, writeFuture);
    }

    // Every recipient got its own retained duplicate, so drop the reference passed in.
    ReferenceCountUtil.release(message);
    return new DefaultChannelGroupFuture(this, writeFutures, executor);
}
 

实例 9


/**
 * Routes an inbound message depending on whether the context is remote.
 *
 * <p>For a remote context the message is cast to {@link ByteBuf}, its payload is extracted
 * and passed to {@code writeBuffer}; the message is not forwarded. Otherwise the message is
 * retained and forwarded to the next handler in the pipeline.
 *
 * @param ctx the handler context the message arrived on
 * @param msg the inbound message; must be a {@link ByteBuf} when the context is remote
 * @throws Exception if payload extraction or buffering fails
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!isRemote(ctx)) {
        ReferenceCountUtil.retain(msg);
        // propagate the data to rest of handlers in pipeline
        ctx.fireChannelRead(msg);
        return;
    }

    // NOTE(review): the payload is neither forwarded nor released in this method; confirm
    // that getPayloadFromByteBuf (or another owner) releases it, otherwise it leaks.
    ByteBuf payload = (ByteBuf) msg;
    writeBuffer(getPayloadFromByteBuf(payload));
}
 

实例 10


@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
    // The first message must be authentication response
    if (this.authenticationUrl != null && (this.cookies == null || this.cookies.isEmpty())) {
        HttpResponse response = (HttpResponse) msg;
        CharSequence cookieData = response.headers().get(new AsciiString("set-cookie"));
        if (cookieData != null) {
            this.cookies = ServerCookieDecoder.decode(cookieData.toString());
            if (this.cookies == null || this.cookies.isEmpty()) {
                throw new WebSocketAuthenticationFailureException("Could not authenticate");
            }
            if (log.isDebugEnabled()) {
                for (Cookie cookie : this.cookies) {
                    log.debug("Server says must set cookie with name {} and value {}", cookie.name(), cookie.value());
                }
            }
        } else {
            throw new ITException("Could not authenticate");
        }
        if (log.isDebugEnabled()) {
            log.debug("Authentication succeeded for user {}", this.user);
        }
        handShaker.handshake(ctx.channel());
        return;
    }
    // The second one must be the response for web socket handshake
    if (!handShaker.isHandshakeComplete()) {
        handShaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
        if (log.isDebugEnabled()) {
            log.debug("Web socket client connected for user {}", this.user);
        }
        handshakeFuture.setSuccess();
        return;
    }
    // Take the byte buff and send it up to Stomp decoder
    if (msg instanceof WebSocketFrame) {
        if (log.isDebugEnabled()) {
            if (msg instanceof TextWebSocketFrame) {
                log.debug("Received text frame {}", ((TextWebSocketFrame) msg).text());
            }
        }
        ReferenceCountUtil.retain(msg);
        ctx.fireChannelRead(((WebSocketFrame) msg).content());
    }
}
 

实例 11


@Override
protected void encode(ChannelHandlerContext ctx, DefaultHttpMessage defaultHttpMessage, List out) throws Exception {
    if (defaultHttpMessage.headers().contains(HttpHeaders.CONTENT_LENGTH, "", true)) {
        defaultHttpMessage.headers().remove(HttpHeaders.CONTENT_LENGTH);
    }
    ReferenceCountUtil.retain(defaultHttpMessage);
    out.add(defaultHttpMessage);
}
 

实例 12


/**
 * Produces a retained object suitable for handing to one more recipient.
 *
 * <p>A {@link ByteBuf} or {@link ByteBufHolder} is duplicated and the duplicate is retained;
 * any other object is passed through {@link ReferenceCountUtil#retain(Object)}.
 *
 * @param message the message to duplicate
 * @return the retained duplicate, or the retained {@code message} when it is neither a
 *         {@link ByteBuf} nor a {@link ByteBufHolder}
 */
private static Object safeDuplicate(Object message) {
    if (message instanceof ByteBuf) {
        ByteBuf buffer = (ByteBuf) message;
        return buffer.duplicate().retain();
    }
    if (message instanceof ByteBufHolder) {
        ByteBufHolder holder = (ByteBufHolder) message;
        return holder.duplicate().retain();
    }
    return ReferenceCountUtil.retain(message);
}
 

实例 13


@Override
public void onNext(T t) {
    // Retain so that post-buffer, the ByteBuf does not get released.
    // Release will be done after reading from the subject.
    ReferenceCountUtil.retain(t);
    state.bufferedObserver.onNext(t);
    // Schedule timeout once and when not subscribed yet.
    if (state.casTimeoutScheduled() && state.state == State.STATES.UNSUBSCRIBED.ordinal()) {
        timeoutScheduler.subscribe(new Action1<Long>() { // Schedule timeout after the first content arrives.
            @Override
            public void call(Long aLong) {
                disposeIfNotSubscribed();
            }
        });
    }
}
 

实例 14


@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
    Channel channel = ctx.channel();
    if (msg instanceof HttpRequest) {
        HttpRequest request = (HttpRequest) msg;
        if (handleRequest(request, channel, ctx)) {
            if (httpMethodInfoBuilder.getHttpResourceModel()
                    .isStreamingReqSupported() &&
                    channel.pipeline().get("aggregator") != null) {
                channel.pipeline().remove("aggregator");
            } else if (!httpMethodInfoBuilder.getHttpResourceModel()
                    .isStreamingReqSupported() &&
                    channel.pipeline().get("aggregator") == null) {
                channel.pipeline().addAfter("router", "aggregator",
                        new HttpObjectAggregator(Integer.MAX_VALUE));
            }
        }
        ReferenceCountUtil.retain(msg);
        ctx.fireChannelRead(msg);
    } else if (msg instanceof HttpContent) {
        ReferenceCountUtil.retain(msg);
        ctx.fireChannelRead(msg);
    }
}
 

实例 15


@Override
public void onData(final ByteBuf input) {
    // We need to retain until the serializer gets around to processing it.
    ReferenceCountUtil.retain(input);
    serializer.execute(new Runnable() {
        @Override
        public void run() {
            if (isTraceBytes()) {
                TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
            }
            ByteBuffer source = input.nioBuffer();
            do {
                ByteBuffer buffer = protonTransport.getInputBuffer();
                int limit = Math.min(buffer.remaining(), source.remaining());
                ByteBuffer duplicate = source.duplicate();
                duplicate.limit(source.position() + limit);
                buffer.put(duplicate);
                protonTransport.processInput();
                source.position(source.position() + limit);
            } while (source.hasRemaining());
            ReferenceCountUtil.release(input);
            // Process the state changes from the latest data and then answer back
            // any pending updates to the Broker.
            processUpdates();
            pumpToProtonTransport();
        }
    });
}
 
讨论
淘淘あ西西 profile image