Java com.google.common.util.concurrent.SettableFuture 代码实例

・18 分钟阅读

以下是展示如何使用 com.google.common.util.concurrent.SettableFuture 的最佳示例。这些示例是我们使用代码质量辨别算法从开源项目中提取出来的。

实例 1


/**
 * Prepares the client wallet, the server wallet and a pausable mock broadcaster before each test.
 */
@Before
public void setUp() throws Exception {
    super.setUp();

    // Client side: send Utils.COIN to the wallet twice and attach the stored client channel states.
    sendMoneyToWallet(Utils.COIN, AbstractBlockChain.NewBlockType.BEST_CHAIN);
    sendMoneyToWallet(Utils.COIN, AbstractBlockChain.NewBlockType.BEST_CHAIN);
    wallet.addExtension(new StoredPaymentChannelClientStates(wallet, failBroadcaster));
    // sendMoneyToWallet confuses the existing chain, so a fresh one is built here.
    chain = new BlockChain(params, wallet, blockStore);

    // Server side: a new wallet with its own stored channel states and a fresh key.
    serverWallet = new Wallet(params);
    serverWallet.addExtension(new StoredPaymentChannelServerStates(serverWallet, failBroadcaster));
    serverWallet.addKey(new ECKey());
    chain.addWallet(serverWallet);

    // fail()/assert*() don't work in network threads, so failures are signalled through this flag instead.
    fail = new AtomicBoolean(false);

    // Broadcast monitoring: every broadcast blocks until the test releases a permit on broadcastTxPause,
    // which lets the test inspect state in between broadcasts.
    broadcasts = new LinkedBlockingQueue<Transaction>();
    broadcastTxPause = new Semaphore(0);
    mockBroadcaster = new TransactionBroadcaster() {
        @Override
        public ListenableFuture<Transaction> broadcastTransaction(Transaction tx) {
            broadcastTxPause.acquireUninterruptibly();
            final SettableFuture<Transaction> completed = SettableFuture.create();
            completed.set(tx);
            broadcasts.add(tx);
            return completed;
        }
    };

    // These tests have no separate threads (client and server handlers call back into each other), which
    // produces many lock cycles. A normal user is unlikely to run client and server in the same thread.
    Threading.warnOnLockCycles();
}
 

实例 2


/**
 * Prepares the client and server wallets and a mock broadcaster that records each broadcast
 * together with a future the test can complete later.
 */
@Before
public void setUp() throws Exception {
    super.setUp();

    // Any broadcast attempted through the stored client states fails the test.
    final TransactionBroadcaster failingBroadcaster = new TransactionBroadcaster() {
        @Override
        public ListenableFuture<Transaction> broadcastTransaction(Transaction tx) {
            fail();
            return null;
        }
    };
    wallet.addExtension(new StoredPaymentChannelClientStates(wallet, failingBroadcaster));
    sendMoneyToWallet(Utils.COIN, AbstractBlockChain.NewBlockType.BEST_CHAIN);
    // sendMoneyToWallet confuses the existing chain, so a fresh one is built here.
    chain = new BlockChain(params, wallet, blockStore);

    serverKey = new ECKey();
    serverWallet = new Wallet(params);
    serverWallet.addKey(serverKey);
    chain.addWallet(serverWallet);

    halfCoin = Utils.toNanoCoins(0, 50);

    // The mock broadcaster leaves each future pending and queues it alongside its transaction.
    broadcasts = new LinkedBlockingQueue<TxFuturePair>();
    mockBroadcaster = new TransactionBroadcaster() {
        @Override
        public ListenableFuture<Transaction> broadcastTransaction(Transaction tx) {
            final SettableFuture<Transaction> pending = SettableFuture.create();
            broadcasts.add(new TxFuturePair(tx, pending));
            return pending;
        }
    };
}
 

实例 3


/**
 * Checks that a three-input transform stays incomplete until its last pending input is set,
 * and then yields the combined value.
 */
@Test
public void testDelayedExecution() throws Exception {
  final SettableFuture<String> pending = SettableFuture.create();
  final ListenableFuture<String> readyB = Futures.immediateFuture("b");
  final ListenableFuture<String> readyC = Futures.immediateFuture("c");

  // Concatenates the three inputs in argument order.
  final FuturesExtra.Function3<String, String, String, String> concat =
      new FuturesExtra.Function3<String, String, String, String>() {
        @Override
        public String apply(String first, String second, String third) {
          return first + second + third;
        }
      };
  final ListenableFuture<String> result = FuturesExtra.syncTransform3(pending, readyB, readyC, concat);

  // The first input has not been set yet, so the result must still be pending.
  assertEquals(false, result.isDone());
  pending.set("a");
  assertEquals("abc", result.get());
}
 

实例 4


/**
 * Checks that select() yields the value of the one completed future while the other two
 * candidates are still pending.
 */
public void testSuccessfulSelect() throws Exception {
  final SettableFuture<String> completed = SettableFuture.create();
  final SettableFuture<String> pendingA = SettableFuture.create();
  final SettableFuture<String> pendingB = SettableFuture.create();
  completed.set("value");

  final ListenableFuture<String> selected =
      FuturesExtra.select(Lists.<ListenableFuture<String>>newArrayList(completed, pendingA, pendingB));

  final String winner = selected.get();
  assertTrue(winner.equals("value"));
}
 

实例 5


/**
 * Requests the device's battery level, possibly asynchronously.
 *
 * <p>If a cached level exists and no fetch is required for the given freshness, the returned
 * future is already completed with the cached value. Otherwise the future of the request in
 * flight is returned, starting a new request first when none is underway.</p>
 *
 * @param freshness the desired recentness of battery level
 * @param timeUnit the {@link TimeUnit} of freshness
 * @return a {@link Future} that can be used to retrieve the battery level
 */
public synchronized Future<Integer> getBattery(long freshness, TimeUnit timeUnit) {
    final boolean cacheIsFreshEnough = mBatteryLevel != null && !isFetchRequired(freshness, timeUnit);
    if (cacheIsFreshEnough) {
        // Serve straight from the cache through a pre-completed future.
        final SettableFuture<Integer> cached = SettableFuture.create();
        cached.set(mBatteryLevel);
        return cached;
    }

    if (mPendingRequest == null) {
        // No request underway: start a new one.
        mPendingRequest = SettableFuture.create();
        initiateBatteryQuery();
    }
    // Either the request just started or the one that was already underway.
    return mPendingRequest;
}
 

实例 6


/**
 * Returns a future that completes when the transaction has been confirmed by "depth" blocks. For instance setting
 * depth to one will wait until it appears in a block on the best chain, and zero will wait until it has been seen
 * on the network.
 *
 * <p>If the required depth has already been reached, the returned future is already completed and no
 * listener is registered.</p>
 *
 * @param depth the number of blocks the transaction must be buried under before the future completes
 * @return a future that yields the transaction once the requested depth is reached
 */
public synchronized ListenableFuture<Transaction> getDepthFuture(final int depth) {
    final SettableFuture<Transaction> result = SettableFuture.create();
    if (getDepthInBlocks() >= depth) {
        // Already deep enough: complete now and return without registering a listener whose only
        // effect would be a redundant set() on an already completed future.
        result.set(transaction);
        return result;
    }
    addEventListener(new Listener() {
        @Override public void onConfidenceChanged(Transaction tx, ChangeReason reason) {
            // Runs in user code thread.
            if (getDepthInBlocks() >= depth) {
                // One-shot listener: unregister before completing the future.
                removeEventListener(this);
                result.set(transaction);
            }
        }
    });
    return result;
}
 

实例 7


/**
 * <p>Returns a future that completes once the balance of the given type has become equal to or larger than
 * the given value. If the wallet already holds enough, the future is returned pre-completed. This method
 * never blocks; to actually wait, call .get() on the result.</p>
 *
 * <p>By the time the future completes the wallet may have changed again if something else is going on in
 * parallel, so treat the returned balance as advisory and be prepared for sending money to fail. Listeners
 * on the future run on the calling thread if it completes immediately, or eventually on a background thread
 * if the balance is not yet at the right level. If you do something that you know should make the balance
 * sufficient, you can use {@link com.google.devcoin.utils.Threading#waitForUserCode()} to block until the
 * future has had a chance to be updated.</p>
 *
 * @param value the minimum balance to wait for
 * @param type which kind of balance is compared against {@code value}
 * @return a future yielding the balance observed when the threshold was met
 */
public ListenableFuture<BigInteger> getBalanceFuture(final BigInteger value, final BalanceType type) {
    lock.lock();
    try {
        final SettableFuture<BigInteger> future = SettableFuture.create();
        final BigInteger current = getBalance(type);
        if (current.compareTo(value) < 0) {
            // Not enough yet. The request is checked later in checkBalanceFutures. No event listener is
            // added here, so that getBalanceFuture().get() also works when called from the user code
            // thread: users should not be handed futures that need that thread to be free.
            final BalanceFutureRequest pending = new BalanceFutureRequest();
            pending.future = future;
            pending.value = value;
            pending.type = type;
            balanceFutureRequests.add(pending);
        } else {
            // Already have enough.
            future.set(current);
        }
        return future;
    } finally {
        lock.unlock();
    }
}
 

实例 8


/**
 * Handles an incoming message. A {@code Ping} whose nonce has a future registered in
 * {@code mapPingFutures} completes that future and is consumed. Every other message is offered
 * to {@code inboundMessages}, and a {@code BloomFilter} is also remembered as the last received filter.
 */
@Override
protected void processMessage(Message m) throws Exception {
    if (m instanceof Ping) {
        final Ping ping = (Ping) m;
        final SettableFuture<Void> pending = mapPingFutures.get(ping.getNonce());
        if (pending != null) {
            // Someone is waiting on this nonce: complete their future and consume the message.
            pending.set(null);
            return;
        }
    }

    if (m instanceof BloomFilter) {
        lastReceivedFilter = (BloomFilter) m;
    }
    inboundMessages.offer(m);
}
 

实例 9


/**
 * Completes the outstanding increase-payment future, if there is one, with the last actual payment
 * amount. Does nothing when no payment is outstanding.
 */
private void receivePaymentAck() {
    SettableFuture<BigInteger> future;
    BigInteger value;
    lock.lock();
    try {
        // An ack with no outstanding payment is ignored. The checkNotNull that used to follow this
        // guard could never fire and has been removed.
        if (increasePaymentFuture == null) return;
        log.info("Received a PAYMENT_ACK from the server");
        future = increasePaymentFuture;
        value = lastPaymentActualAmount;
    } finally {
        lock.unlock();
    }
    // Ensure the future runs without the client lock held.
    future.set(value);
}
 

实例 10


/**
 * Waits on a future that a one-shot Swing timer, created with a delay of 1000, completes
 * with {@code true} when it fires.
 */
@Suspendable
private static void sleep() throws SuspendExecution, ExecutionException, InterruptedException {
        final SettableFuture<Boolean> elapsed = SettableFuture.create();

        // Completes the future when the timer fires.
        final ActionListener onTimerFired = new ActionListener() {
                @Override
                public void actionPerformed(ActionEvent e) {
                        elapsed.set(true);
                }
        };

        final Timer oneShot = new Timer(1000, onTimerFired);
        oneShot.setRepeats(false);
        oneShot.start();

        AsyncListenableFuture.get(elapsed);
}
 

实例 11


/**
 * Brings up a real local network connection, feeds the peer a truncated "inv" message and expects
 * a {@code ProtocolException} to reach the uncaught exception handler.
 *
 * <p>The listening socket and the accepted socket are closed in {@code finally} blocks so the test
 * does not leak them whether it passes or fails.</p>
 */
@Test
public void badMessage() throws Exception {
    // Bring up an actual network connection and feed it bogus data.
    final SettableFuture<Void> result = SettableFuture.create();
    Threading.uncaughtExceptionHandler = new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread thread, Throwable throwable) {
            result.setException(throwable);
        }
    };
    ServerSocket server = new ServerSocket(0);
    try {
        final NetworkParameters params = TestNet3Params.testNet();
        Peer peer = new Peer(params, blockChain, "test", "1.0");
        // The returned connection future is not needed; the outcome is observed through `result`.
        TCPNetworkConnection.connectTo(TestNet3Params.get(),
                new InetSocketAddress(InetAddress.getLocalHost(), server.getLocalPort()), 5000, peer);
        Socket socket = server.accept();
        try {
            // Write out a verack+version.
            BitcoinSerializer serializer = new BitcoinSerializer(params);
            final VersionMessage ver = new VersionMessage(params, 1000);
            ver.localServices = VersionMessage.NODE_NETWORK;
            serializer.serialize(ver, socket.getOutputStream());
            serializer.serialize(new VersionAck(), socket.getOutputStream());
            // Now write some bogus truncated message.
            serializer.serialize("inv", new InventoryMessage(params) {
                @Override
                public void bitcoinSerializeToStream(OutputStream stream) throws IOException {
                    // Add some hashes.
                    addItem(new InventoryItem(InventoryItem.Type.Transaction, Sha256Hash.create(new byte[] { 1 })));
                    addItem(new InventoryItem(InventoryItem.Type.Transaction, Sha256Hash.create(new byte[] { 2 })));
                    addItem(new InventoryItem(InventoryItem.Type.Transaction, Sha256Hash.create(new byte[] { 3 })));
                    // Write out a copy that's truncated in the middle.
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    super.bitcoinSerializeToStream(bos);
                    byte[] bits = bos.toByteArray();
                    bits = Arrays.copyOf(bits, bits.length / 2);
                    stream.write(bits);
                }
            }.bitcoinSerialize(), socket.getOutputStream());
            try {
                // Blocks until the uncaught exception handler reports the failure.
                result.get();
                fail();
            } catch (ExecutionException e) {
                assertTrue(e.getCause() instanceof ProtocolException);
            }
        } finally {
            socket.close();
        }
    } finally {
        server.close();
    }
}
 

实例 12


/**
 * Checks that select() over three failed futures yields a future whose get() throws
 * {@link ExecutionException}.
 */
@Test(expected = ExecutionException.class)
public void testAllFailedSelect() throws Exception {
  final SettableFuture<String> failedA = SettableFuture.create();
  final SettableFuture<String> failedB = SettableFuture.create();
  final SettableFuture<String> failedC = SettableFuture.create();

  // Fail every candidate, in order.
  failedA.setException(new Exception());
  failedB.setException(new Exception());
  failedC.setException(new Exception());

  final ListenableFuture<String> selected =
      FuturesExtra.select(Arrays.<ListenableFuture<String>>asList(failedA, failedB, failedC));

  // Expected to throw, which satisfies the annotation above.
  selected.get();
}
 

实例 13


/**
 * Checks that select() yields the value of the single successful future when the other two
 * candidates have failed.
 */
@Test
public void testOneSuccessfulSelect() throws Exception {
  final SettableFuture<String> failedFirst = SettableFuture.create();
  final SettableFuture<String> succeeded = SettableFuture.create();
  final SettableFuture<String> failedLast = SettableFuture.create();

  failedFirst.setException(new Exception());
  succeeded.set("value");
  failedLast.setException(new Exception());

  final ListenableFuture<String> selected =
      FuturesExtra.select(Arrays.<ListenableFuture<String>>asList(failedFirst, succeeded, failedLast));

  final String winner = selected.get();
  assertTrue(winner.equals("value"));
}
 

实例 14


/**
 * Builds a listener that copies the outcome of a channel operation onto {@code result} and then
 * releases the connection.
 *
 * @param result the future to complete, cancel or fail to match the channel operation
 * @param resultObj the value set on {@code result} when the operation succeeds
 * @param releaser released after the outcome has been propagated, including when propagation throws
 * @return a listener that propagates the operation outcome to {@code result}
 */
private <V> ChannelFutureListener getPublishChannelFutureListener(final SettableFuture<V> result, final V resultObj,
                                                                  final ConnectionPool.ConnectionReleaser releaser) {
  return new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      try {
        if (future.isSuccess()) {
          result.set(resultObj);
          return;
        }
        if (future.isCancelled()) {
          result.cancel(true);
          return;
        }
        // Neither successful nor cancelled: pass the operation's cause on to the caller.
        result.setException(future.getCause());
      } finally {
        // Runs on every path above, including the early returns.
        releaser.release();
      }
    }
  };
}
 

实例 15


/**
 * Sends a call message to the server and returns a future for its result.
 *
 * <p>The future is registered in {@code callers} before the message is written, so a reply that
 * arrives right after the write can find it. If registering or writing fails, the registration
 * is removed, the future is failed with the exception, and a shutdown is triggered.</p>
 *
 * @param message the call to send; its id is assigned here from {@code idCounter}
 * @return a future registered under the message id, or one already failed if the send failed
 */
public ListenableFuture<ResultMessage> call(CallMessage message) {
    SettableFuture<ResultMessage> future = SettableFuture.create();
    message.setId(idCounter.getAndIncrement());
    try {
        // Register first: writing before registering lets a fast reply arrive with no future to find.
        callers.put(message.getId(), future);
        toServer.writeBytes(message.toString());
    } catch (Exception e) {
        // The request was not sent successfully, so nothing should stay registered for it.
        callers.remove(message.getId());
        future.setException(e);
        log.error("Error making a call to the server: {}", e.getMessage());
        triggerShutdown();
    }
    return future;
}
 
讨论
淘淘あ西西 profile image