如果我理解正确,则需要缓冲,直到某些谓词允许基于项目为止。您可以使用一组复杂的运算符来执行此操作,但编写自定义运算符可能更容易:
public final class BufferUntil<T> implements Operator<List<T>, T>{ final Func1<T, Boolean> boundaryPredicate; public BufferUntil(Func1<T, Boolean> boundaryPredicate) { this.boundaryPredicate = boundaryPredicate; } @Override public Subscriber<? super T> call( Subscriber<? super List<T>> child) { BufferWhileSubscriber parent = new BufferWhileSubscriber(child); child.add(parent); return parent; } final class BufferWhileSubscriber extends Subscriber<T> { final Subscriber<? super List<T>> actual; List<T> buffer = new ArrayList<>(); public BufferWhileSubscriber( Subscriber<? super List<T>> actual) { this.actual = actual; } @Override public void onNext(T t) { buffer.add(t); if (boundaryPredicate.call(t)) { actual.onNext(buffer); buffer = new ArrayList<>(); } } @Override public void onError(Throwable e) { buffer = null; actual.onError(e); } @Override public void onCompleted() { List<T> b = buffer; buffer = null; if (!b.isEmpty()) { actual.onNext(b); } actual.onCompleted(); } }}


