我认为您需要这样的东西(scala中给出的示例)
import rx.lang.scala.{Observable, Subscriber}case class Message(message: String)trait MessageCallback { def onMessage(message: Message)}object LibraryObject { def setCallback(callback: MessageCallback): Unit = { ??? } def removeCallback(callback: MessageCallback): Unit = { ??? } def start(): Unit = { ??? }}def messagesSource: Observable[Message] = Observable((subscriber: Subscriber[Message]) ⇒ { val callback = new MessageCallback { def onMessage(message: Message) { subscriber.onNext(message) } } LibraryObject.setCallback(callback) subscriber.add { LibraryObject.removeCallback(callback) } })至于阻塞/非阻塞
start():通常,基于回调的体系结构将回调订阅与进程启动分开。在这种情况下,您可以
messageSource完全独立
start()于进程的时间创建任意数量的。另外,是否分叉的决定完全取决于您。您的体系结构与此不同吗?
您还应该以某种方式完成该过程。最好是将
onCompleted处理程序添加到MessageCallback接口。如果要处理错误,请添加
onError处理程序。现在,您刚刚宣布了RxJava的基本构建基石,一个观察员 :-)



