Java rx.subscriptions.CompositeSubscription 代码实例

・17 分钟阅读

以下是展示如何使用 rx.subscriptions.CompositeSubscription 的最佳示例。这些示例是我们使用代码质量辨别算法从开源项目中提取出来的。

实例 1


/**
 * Runs a counting action once on a worker from each of the computation, io and
 * new-thread schedulers and once on the generic scheduled executor, then fails
 * unless all four invocations have happened within three seconds.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private void tryOutSchedulers() throws InterruptedException {
    // One count per expected invocation: three workers plus the executor task.
    final CountDownLatch pending = new CountDownLatch(4);
    final Action0 countAction = new Action0() {
        @Override
        public void call() {
            pending.countDown();
        }
    };

    // Collects every worker so they are all released together in the finally block.
    final CompositeSubscription workers = new CompositeSubscription();
    try {
        Worker computationWorker = Schedulers.computation().createWorker();
        workers.add(computationWorker);
        computationWorker.schedule(countAction);

        Worker ioWorker = Schedulers.io().createWorker();
        workers.add(ioWorker);
        ioWorker.schedule(countAction);

        Worker newThreadWorker = Schedulers.newThread().createWorker();
        workers.add(newThreadWorker);
        newThreadWorker.schedule(countAction);

        GenericScheduledExecutorService.getInstance().execute(new Runnable() {
            @Override
            public void run() {
                countAction.call();
            }
        });

        // Obtain an SPSC ring buffer and hand it straight back.
        RxRingBuffer.getSpscInstance().release();

        boolean everyActionRan = pending.await(3, TimeUnit.SECONDS);
        if (!everyActionRan) {
            fail("countAction was not run by every worker");
        }
    } finally {
        workers.unsubscribe();
    }
}
 

实例 2


/**
 * Creates a fresh subscription container, pushes the current scroll position and
 * the movie passed in the arguments into the view, shows cached reviews/videos or
 * starts loading them, and subscribes to favored-state changes.
 *
 * @param savedInstanceState forwarded unchanged to the superclass
 */
@Override
public void onActivityCreated(@Nullable Bundle savedInstanceState) {
    super.onActivityCreated(savedInstanceState);
    mSubscriptions = new CompositeSubscription();

    onScrollChanged(mScrollView.getCurrentScrollY(), false, false);
    onMovieLoaded(getArguments().getParcelable(ARG_MOVIE));

    // Reuse data that is already present; otherwise trigger a load.
    if (mReviews == null) {
        loadReviews();
    } else {
        onReviewsLoaded(mReviews);
    }
    if (mVideos == null) {
        loadVideos();
    } else {
        onVideosLoaded(mVideos);
    }

    // Subscribe to global favored changes so that the same movie shown in
    // different views stays in sync.
    mSubscriptions.add(mHelper.getFavoredObservable()
            .filter(change -> mMovie != null && mMovie.getId() == change.movieId)
            .subscribe(change -> {
                mMovie.setFavored(change.favored);
                mFavoriteButton.setSelected(change.favored);
            }));
}
 

实例 3


/**
 * Hands the buffered logs to the adapter, then subscribes the adapter to
 * {@code lumberYard.logs()} with delivery on the Android main thread. The
 * subscription is kept in a newly created {@code subscriptions} composite.
 */
@Override
protected void onStart() {
    super.onStart();

    adapter.setLogs(lumberYard.bufferedLogs());

    subscriptions = new CompositeSubscription();
    subscriptions.add(
        lumberYard.logs()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(adapter));
}
 

实例 4


/**
 * Adds a subscription to the shared composite, creating the composite on first use.
 *
 * @param s the subscription to track
 */
public void addSubscription(Subscription s) {
    CompositeSubscription composite = mCompositeSubscription;
    if (composite == null) {
        // First registration: create the container lazily and remember it.
        composite = new CompositeSubscription();
        mCompositeSubscription = composite;
    }
    composite.add(s);
}
 

实例 5


/**
 * Forwards the values emitted by {@code pixelGridEnabled} and {@code pixelRatioEnabled}
 * to the overlay switches of the holder's {@code madgeFrameLayout}.
 *
 * @param viewHolder holder whose {@code madgeFrameLayout} is updated
 * @param subscriptions composite that receives both resulting subscriptions
 */
private void setupMadge(final ViewHolder viewHolder, CompositeSubscription subscriptions) {
  subscriptions.add(
      pixelGridEnabled.asObservable()
          .subscribe(gridOn -> viewHolder.madgeFrameLayout.setOverlayEnabled(gridOn)));

  subscriptions.add(
      pixelRatioEnabled.asObservable()
          .subscribe(ratioOn -> viewHolder.madgeFrameLayout.setOverlayRatioEnabled(ratioOn)));
}
 

实例 6


/**
 * Schedules a reconnect operation.<br>
 * The reconnect can be canceled by unsubscribing the {@link #reconnectSubscription}.
 * <p>
 * Does nothing when {@code remainingNrReconnects} is zero. A positive value is
 * decremented by one per call; a negative value is left untouched (infinite retries).
 * The worker used for the delayed call is owned by the stored subscription and is
 * released again as soon as the callback has finished.
 */
private void scheduleReconnect() {
    // Check for possible reconnects
    if (remainingNrReconnects == 0) return;
    status = Status.Connecting;
    // Decrease remaining number of reconnects if it's not infinite
    if (remainingNrReconnects > 0) remainingNrReconnects--;

    // Make a composite subscription that is used to cancel the reconnect.
    // It is published BEFORE anything is scheduled so that the callback can
    // never observe a stale (or not yet assigned) reconnectSubscription, even
    // when the scheduler runs the action synchronously or without delay.
    final CompositeSubscription sub = new CompositeSubscription();
    reconnectSubscription = sub;

    // Keep hold of the worker: unsubscribing it cancels the pending action and
    // frees the scheduler resources behind it. Registering it with the
    // composite makes cancellation of the reconnect release the worker too.
    final rx.Scheduler.Worker worker = scheduler.createWorker();
    sub.add(worker);
    worker.schedule(new Action0() {
        @Override
        public void call() {
            try {
                if (reconnectSubscription.isUnsubscribed()) return;
                beginConnect();
            } finally {
                // One-shot worker: release it once the attempt was made or skipped.
                worker.unsubscribe();
            }
        }
    }, reconnectInterval, TimeUnit.MILLISECONDS);
}
 

实例 7


/**
 * Schedules work on two trampoline workers, each of which starts further work via
 * {@code doWorkOnNewTrampoline}; the second worker unsubscribes itself from inside
 * its own action. Asserts that all six work items were still recorded, in order.
 */
@Test
public void testNestedTrampolineWithUnsubscribe() {
    final ArrayList<String> workDone = new ArrayList<String>();
    final CompositeSubscription tracked = new CompositeSubscription();
    Worker first = Schedulers.trampoline().createWorker();
    try {
        tracked.add(first);
        first.schedule(new Action0() {
            @Override
            public void call() {
                tracked.add(doWorkOnNewTrampoline("A", workDone));
            }
        });

        final Worker second = Schedulers.trampoline().createWorker();
        tracked.add(second);
        second.schedule(new Action0() {
            @Override
            public void call() {
                tracked.add(doWorkOnNewTrampoline("B", workDone));
                // Unsubscribing this worker should not affect work that was
                // scheduled on a separate trampoline worker.
                second.unsubscribe();
            }
        });

        assertEquals(6, workDone.size());
        assertEquals(Arrays.asList("A.1", "A.B.1", "A.B.2", "B.1", "B.B.1", "B.B.2"), workDone);
    } finally {
        tracked.unsubscribe();
    }
}
 

实例 8


/**
 * Builds the iterator sink via {@code run(handle)}, subscribes it to
 * {@code observable} and returns it.
 *
 * @return the sink that was subscribed to the observable
 */
@Override
public CloseableIterator<U> iterator() {
    final CompositeSubscription handle = new CompositeSubscription();
    final ObserverToIteratorSink<T, U> sink = run(handle);
    // If the handle has already been closed by now, the new subscription is
    // not retained by it.
    handle.add(observable.subscribe(sink));
    return sink;
}
 

实例 9


/**
 * Fills up to four artwork views from a list of art infos.
 * <p>
 * Views that are {@code null} are skipped. For each remaining view either a request
 * is created through {@code requestor} and added to {@code cs}, or the default
 * artwork drawable is set:
 * <ul>
 *   <li>{@code artwork}: item 0 when at least one item exists;</li>
 *   <li>{@code artwork2}: item 1 when at least two items exist;</li>
 *   <li>{@code artwork3}: item 2 when at least three items exist, otherwise item 1
 *       when at least two exist;</li>
 *   <li>{@code artwork4}: item 3 when at least four items exist, otherwise item 0
 *       when at least two exist.</li>
 * </ul>
 *
 * @param requestor creates the artwork requests
 * @param cs composite that collects the created requests
 * @param artwork first view, may be {@code null}
 * @param artwork2 second view, may be {@code null}
 * @param artwork3 third view, may be {@code null}
 * @param artwork4 fourth view, may be {@code null}
 * @param artInfos available art infos
 * @param artworkType type passed through to every request
 */
public static void loadMultiArtwork(
        ArtworkRequestManager requestor,
        CompositeSubscription cs,
        AnimatedImageView artwork,
        AnimatedImageView artwork2,
        AnimatedImageView artwork3,
        AnimatedImageView artwork4,
        List<ArtInfo> artInfos,
        ArtworkType artworkType
) {
    final int available = artInfos.size();

    if (artwork != null) {
        if (available < 1) {
            artwork.setDefaultImage(R.drawable.default_artwork);
        } else {
            cs.add(requestor.newRequest(artwork, null, artInfos.get(0), artworkType));
        }
    }

    if (artwork2 != null) {
        if (available < 2) {
            // original author's note: never get here
            artwork2.setDefaultImage(R.drawable.default_artwork);
        } else {
            cs.add(requestor.newRequest(artwork2, null, artInfos.get(1), artworkType));
        }
    }

    if (artwork3 != null) {
        if (available < 2) {
            // original author's note: never get here
            artwork3.setDefaultImage(R.drawable.default_artwork);
        } else {
            // With exactly two items the second image goes here; the first one
            // is put in the 4th spot to crisscross.
            final int thirdSlotIndex = (available >= 3) ? 2 : 1;
            cs.add(requestor.newRequest(artwork3, null, artInfos.get(thirdSlotIndex), artworkType));
        }
    }

    if (artwork4 != null) {
        if (available < 2) {
            // original author's note: never get here
            artwork4.setDefaultImage(R.drawable.default_artwork);
        } else {
            // 3 items -> loop back to the first; 2 items -> first image here for crisscross.
            final int fourthSlotIndex = (available >= 4) ? 3 : 0;
            cs.add(requestor.newRequest(artwork4, null, artInfos.get(fourthSlotIndex), artworkType));
        }
    }
}
 

实例 10


/**
 * Opens a directory. While {@code mIsInitialized} is false the request is queued
 * through {@code addToPendingList} and replayed later with the same arguments.
 * Otherwise the open-directory action and observer are configured and the
 * observable is subscribed on the computation scheduler, observed on the Android
 * main thread, and tracked in {@code mSubscription}.
 *
 * @param path directory path handed to the action and the observer
 * @param restorePosition flag handed to the observer's {@code init}
 */
public void openDirectory(final String path, final boolean restorePosition) {
    if (mIsInitialized) {
        if (mSubscription == null) {
            mSubscription = new CompositeSubscription();
        }

        mOpenDirectoryAction.setPath(path);
        mOpenDirectoryObserver.init(path, restorePosition);

        Subscription pendingOpen = mOpenDirectoryObservable
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(mOpenDirectoryObserver);
        mSubscription.add(pendingOpen);
    } else {
        // Not ready yet: defer the very same call.
        addToPendingList(new Runnable() {
            @Override
            public void run() {
                openDirectory(path, restorePosition);
            }
        });
    }
}
 

实例 11


/**
 * Binds an adapter item to this holder's views: sets text, formatted publish time
 * and details, hides the details view when the item has no details, and replaces
 * the previous click subscription with one that feeds the item's click observer.
 *
 * @param item the item to display
 */
@Override
public void bind(@Nonnull MainPresenter.AdapterItem item) {
    text.setText(item.text());
    date.setText(timeInstance.format(new Date(item.publishTime())));
    details.setText(item.details());

    if (item.details() == null) {
        details.setVisibility(View.GONE);
    } else {
        details.setVisibility(View.VISIBLE);
    }

    // Drop the click subscription left over from the previously bound item.
    if (subscription != null) {
        subscription.unsubscribe();
    }
    subscription = new CompositeSubscription(
            ViewObservable.clicks(text).subscribe(item.clickObserver()));
}
 

实例 12


/**
 * Unsubscribes everything collected in {@code subscriptions} and replaces the
 * composite with a fresh instance.
 */
@Override
public void onPause() {
    super.onPause();
    Timber.d("OnPause Subscriptions");
    // unsubscribe() already unsubscribes and drops every contained subscription,
    // and clear() is a no-op on an unsubscribed composite, so no separate
    // clear() call is needed.
    subscriptions.unsubscribe();
    // An unsubscribed composite cannot be reused; later additions need a new one.
    subscriptions = new CompositeSubscription();
}
 

实例 13


/**
 * Verifies that unsubscribing one trampoline worker from inside its own action
 * leaves work started through {@code doWorkOnNewTrampoline} unaffected: all six
 * expected entries must have been recorded, in order.
 */
@Test
public void testNestedTrampolineWithUnsubscribe() {
    final ArrayList<String> workDone = new ArrayList<String>();
    final CompositeSubscription cleanup = new CompositeSubscription();
    Worker outerA = Schedulers.trampoline().createWorker();
    try {
        cleanup.add(outerA);
        outerA.schedule(new Action0() {
            @Override
            public void call() {
                cleanup.add(doWorkOnNewTrampoline("A", workDone));
            }
        });

        final Worker outerB = Schedulers.trampoline().createWorker();
        cleanup.add(outerB);
        outerB.schedule(new Action0() {
            @Override
            public void call() {
                cleanup.add(doWorkOnNewTrampoline("B", workDone));
                // outerB is unsubscribed here; work scheduled on a separate
                // trampoline worker should not be affected by that.
                outerB.unsubscribe();
            }
        });

        assertEquals(6, workDone.size());
        assertEquals(Arrays.asList("A.1", "A.B.1", "A.B.2", "B.1", "B.B.1", "B.B.2"), workDone);
    } finally {
        cleanup.unsubscribe();
    }
}
 

实例 14


/**
 * While holding {@code lock}, unsubscribes {@code baseSubscription} and replaces it
 * with a new composite, but only when {@code subscriptionCount} is zero.
 */
private void disconnectNow() {
    lock.lock();
    try {
        if (subscriptionCount.get() != 0) {
            return;
        }
        baseSubscription.unsubscribe();
        // Once unsubscribed a composite stays that way, so a new one is needed.
        baseSubscription = new CompositeSubscription();
    } finally {
        lock.unlock();
    }
}
 

实例 15


/**
 * Wraps {@code child} in a parent subscriber that races against a timeout timer.
 * <p>
 * A {@code TimerListener} is registered with {@code HystrixTimer}. When it ticks and
 * wins the compare-and-set from {@code NOT_EXECUTED} to {@code TIMED_OUT}, it records
 * the timeout, unsubscribes the composite {@code s} (which contains the parent) and
 * sends a {@code HystrixTimeoutException} to {@code child}. The parent forwards
 * notifications to {@code child} only while the state is, or can be set to,
 * {@code COMPLETED}.
 *
 * @param child the downstream subscriber
 * @return the parent subscriber that conditionally forwards to {@code child}
 */
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
    final CompositeSubscription s = new CompositeSubscription();
    // if the child unsubscribes we unsubscribe our parent as well
    child.add(s);
    /*
     * Define the action to perform on timeout outside of the TimerListener so it can capture the HystrixRequestContext
     * of the calling thread which doesn't exist on the Timer thread.
     */
    final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {
        @Override
        public void run() {
            child.onError(new HystrixTimeoutException());
        }
    });
    TimerListener listener = new TimerListener() {
        @Override
        public void tick() {
            // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
            // otherwise it means we lost a race and the run() execution completed
            if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                // report timeout failure
                originalCommand.metrics.markTimeout(System.currentTimeMillis() - originalCommand.invocationStartTime);
                // record total execution time here because the timeout path returns early
                // (presumably before the wrapped execution finishes -- TODO confirm)
                originalCommand.recordTotalExecutionTime(originalCommand.invocationStartTime);
                // shut down the original request
                s.unsubscribe();
                // deliver the timeout error to the child
                timeoutRunnable.run();
            }
        }
        @Override
        public int getIntervalTimeInMilliseconds() {
            // the tick interval is the command's configured execution timeout
            return originalCommand.properties.executionTimeoutInMilliseconds().get();
        }
    };
    final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
    // set externally so execute/queue can see this
    originalCommand.timeoutTimer.set(tl);
    /**
     * If this subscriber receives values it means the parent succeeded/completed
     */
    Subscriber<R> parent = new Subscriber<R>() {
        @Override
        public void onCompleted() {
            if (isNotTimedOut()) {
                // stop timer (by clearing the listener reference) and pass notification through
                tl.clear();
                child.onCompleted();
            }
        }
        @Override
        public void onError(Throwable e) {
            if (isNotTimedOut()) {
                // stop timer (by clearing the listener reference) and pass notification through
                tl.clear();
                child.onError(e);
            }
        }
        @Override
        public void onNext(R v) {
            // values are forwarded without clearing the timer reference
            if (isNotTimedOut()) {
                child.onNext(v);
            }
        }
        private boolean isNotTimedOut() {
            // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
            return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                    originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
        }
    };
    // if s is unsubscribed we want to unsubscribe the parent
    s.add(parent);
    return parent;
}
 
讨论
淘淘あ西西 profile image