栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

ObserveOn和SubscribeOn-完成工作的地方

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ObserveOn和SubscribeOn-完成工作的地方

有关

SubscribeOn
和的信息有很多误导性
ObserveOn

摘要

  • SubscribeOn
    拦截调用的单个方法
    IObservable<T>
    ,这是
    Subscribe
    和呼叫到
    Dispose
    IDisposable
    手柄返回通过
    Subscribe
  • ObserveOn
    拦截调用的方法
    IObserver<T>
    ,其是
    OnNext
    OnCompleted
    OnError
  • 这两种方法都会导致在指定的调度程序上进行相应的调用。

分析与示范

该声明

ObserveOn设置在订阅处理程序中执行代码的位置:

比帮助更令人困惑。您所说的“订阅处理程序”实际上是一个

OnNext
处理程序。请记住,
Subscribe
方法
IObservable
接受一个
IObserver
OnNext
OnCompleted
OnError
方法,但扩展方法提供了接受lambda表达式,并建立一个方便重载
IObserver
实现你。

不过让我来修饰这个词。我认为“订阅处理程序”是在被调用时被调用 的可观察
代码

Subscribe
。这样,上面的描述更类似于的目的
SubscribeOn

SubscribeOn

SubscribeOn
导致
Subscribe
可观察的方法在指定的调度程序或上下文上异步执行。当您不想
Subscribe
在正在运行的任何线程上的可观察对象上调用该方法时,可以使用它-
通常是因为它可以长时间运行并且您不想阻塞调用线程。

调用时

Subscribe
,您正在调用一个可观察对象,它可能是一连串可观察对象的一部分。它只是可观察
SubscribeOn
到的效果。现在可能情况是链中的所有可观察对象将立即在同一线程上订阅-
并非必须如此。考虑一下
Concat
-仅在前一个流完成后才订阅每个连续的流,通常,这将发生在从前一个流调用的任何线程上
OnCompleted

因此,

SubscribeOn
在您的呼叫
Subscribe
与要订阅的可观察对象之间,拦截该呼叫并使它异步。

它还影响订阅的处置。

Subscribe
返回
IDisposable
用于退订的句柄。
SubscribeOn
确保
Dispose
在提供的调度程序上调度对的调用。

想了解什么,当混乱的公共点

SubscribeOn
确实是在
Subscribe
一个可观察的处理程序可能调用
OnNext
OnCompleted
OnError
在这同一线程。但是,其目的不是影响这些调用。在
Subscribe
方法返回之前完成流的情况并不少见。
Observable.Return
例如,这样做。让我们来看看。

如果使用我编写的Spy方法,并运行以下代码:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Return(1).Spy("Return");source.Subscribe();Console.WriteLine("Subscribe returned");

您将得到以下输出(线程ID当然可能有所不同):

Calling from Thread: 1Return: Observable obtained on Thread: 1Return: Subscribed to on Thread: 1Return: onNext(1) on Thread: 1Return: onCompleted() on Thread: 1Return: Subscription completed.Subscribe returned

您可以看到整个订阅处理程序在同一线程上运行,并在返回之前完成。

让我们使用它

SubscribeOn
来异步运行它。我们将同时
Return
观察可观察者和
SubscribeOn
可观察者:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Return(1).Spy("Return");source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();Console.WriteLine("Subscribe returned");

输出(我添加的行号):

01 Calling from Thread: 102 Return: Observable obtained on Thread: 103 SubscribeOn: Observable obtained on Thread: 104 SubscribeOn: Subscribed to on Thread: 105 SubscribeOn: Subscription completed.06 Subscribe returned07 Return: Subscribed to on Thread: 208 Return: onNext(1) on Thread: 209 SubscribeOn: onNext(1) on Thread: 210 Return: onCompleted() on Thread: 211 SubscribeOn: onCompleted() on Thread: 212 Return: Subscription completed.

01-主要方法在线程1上运行。

02-

Return
可观察对象在调用线程上求值。我们只是到达
IObservable
这里,没有任何订阅。

03-

SubscribeOn
可观察对象在调用线程上求值。

04-现在终于可以调用的

Subscribe
方法了
SubscribeOn

05-该

Subscribe
方法异步完成…

06-…并且线程1返回到main方法。 这就是SubscribeOn的作用!

07-同时,

SubscribeOn
将默认调度程序上的呼叫调度为
Return
。在这里它在线程2上被接收。

08 -作为

Return
呢,它调用
OnNext
的上
Subscribe
线…

09-

SubscribeOn
现在只是一个传递。

10,11-同样

OnCompleted

12-最后,所有

Return
订阅处理程序都已完成。

希望这能弄清目的和效果

SubscribeOn

观察

如果您认为

SubscribeOn
作为一个拦截器
Subscribe
是通过调用方法上不同的线程,然后
ObserveOn
做同样的工作,但对于
OnNext
OnCompleted
OnError
电话。

回想一下我们原来的例子:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Return(1).Spy("Return");source.Subscribe();Console.WriteLine("Subscribe returned");

给出了以下输出:

Calling from Thread: 1Return: Observable obtained on Thread: 1Return: Subscribed to on Thread: 1Return: onNext(1) on Thread: 1Return: onCompleted() on Thread: 1Return: Subscription completed.Subscribe returned

现在让我们改变它来使用

ObserveOn

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Return(1).Spy("Return");source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();Console.WriteLine("Subscribe returned");

我们得到以下输出:

01 Calling from Thread: 102 Return: Observable obtained on Thread: 103 ObserveOn: Observable obtained on Thread: 104 ObserveOn: Subscribed to on Thread: 105 Return: Subscribed to on Thread: 106 Return: onNext(1) on Thread: 107 ObserveOn: onNext(1) on Thread: 208 Return: onCompleted() on Thread: 109 Return: Subscription completed.10 ObserveOn: Subscription completed.11 Subscribe returned12 ObserveOn: onCompleted() on Thread: 2

01-主要方法在线程1上运行。

02-和以前一样,

Return
可观察对象在调用线程上求值。我们只是到达
IObservable
这里,没有任何订阅。

03-

ObserveOn
可观察对象也在调用线程上评估。

04-现在我们再次在调用线程上订阅

ObserveOn
可观察的对象…

05-…然后将呼叫转接到

Return
可观察对象。

06-现在

Return
调用
OnNext
Subscribe
处理程序。

07- 这是的效果

ObserveOn
我们可以看到,该
OnNext
线程是在线程2上异步调度的。

08-同时

Return
调用
OnCompleted
线程1 …

09-And

Return
的订阅处理程序完成…

10-然后

ObserveOn
订阅处理程序也是如此…

11-将控制权返回给main方法

12-同时,

ObserveOn
已将
Return
OnCompleted
调用移至线程2。在09-11期间的任何时候都可能发生,因为它异步运行。碰巧的是,它现在终于被调用了。

典型的用例是什么?

SubscribeOn
当您需要
Subscribe
长时间运行的可观察对象并希望尽快离开调度程序线程时,通常会在GUI中看到它的使用-
也许是因为您知道它是在订阅处理程序中完成所有工作的那些可观察对象之一。在可观察链的末尾应用它,因为这是您订阅时调用的第一个可观察物。

ObserveOn
当您想要确保时,您最经常会在GUI中看到它的使用
OnNext
OnCompleted
并将
OnError
调用编组回调度程序线程。在可观察链的末尾应用它,以尽可能快地过渡回来。

希望你可以看到,回答你的问题是,

ObserveOnDispatcher
将没有任何区别的线程
Where
SelectMany
执行上-
这一切都取决于什么的线程
从美其名曰!流的订阅处理程序将调用线程上调用,但它不可能说在哪里
Where
,并
SelectMany
会不知道如何运行
stream
实现。

寿命超过订阅调用的可观察值

到目前为止,我们一直在专门研究

Observable.Return
Return
Subscribe
处理程序中完成其流。这不是典型的情况,但是流超出
Subscribe
处理程序的寿命也很普遍。看
Observable.Timer
例如:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");source.Subscribe();Console.WriteLine("Subscribe returned");

这将返回以下内容:

Calling from Thread: 1Timer: Observable obtained on Thread: 1Timer: Subscribed to on Thread: 1Timer: Subscription completed.Subscribe returnedTimer: onNext(0) on Thread: 2Timer: onCompleted() on Thread: 2

你可以清楚地看到认购完成后

OnNext
OnCompleted
正在对不同的线程以后调用。

请注意,对于选择并在哪个线程或调度程序上进行调用的任何组合, 都不 会对

SubscribeOn
ObserveOn
产生 任何影响
Timer``OnNext``OnCompleted

当然,您可以使用

SubscribeOn
确定
Subscribe
线程:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();Console.WriteLine("Subscribe returned");

(我故意更改为

NewThreadScheduler
此处,以防止在
Timer
碰巧获得与相同的线程池线程的情况下的混乱
SubscribeOn

给予:

Calling from Thread: 1Timer: Observable obtained on Thread: 1SubscribeOn: Observable obtained on Thread: 1SubscribeOn: Subscribed to on Thread: 1SubscribeOn: Subscription completed.Subscribe returnedTimer: Subscribed to on Thread: 2Timer: Subscription completed.Timer: onNext(0) on Thread: 3SubscribeOn: onNext(0) on Thread: 3Timer: onCompleted() on Thread: 3SubscribeOn: onCompleted() on Thread: 3

在这里,您可以清楚地看到线程(1)上的主线程在

Subscribe
调用之后返回,但是
Timer
预订获得了自己的线程(2),但是
OnNext
and
OnCompleted
调用在线程(3)上运行。

现在,对于

ObserveOn
,我们将代码更改为(对于代码中的后续代码,请使用nuget包rx-wpf):

var dispatcher = Dispatcher.CurrentDispatcher;Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");source.ObserveonDispatcher().Spy("ObserveOn").Subscribe();Console.WriteLine("Subscribe returned");

这段代码有些不同。第一行确保我们有一个调度程序,并且我们也引入了

ObserveOnDispatcher
-这就像
ObserveOn
,除了它指定我们应该使用对
求值
DispatcherScheduler
的任何线程
ObserveOnDispatcher

此代码提供以下输出:

Calling from Thread: 1Timer: Observable obtained on Thread: 1ObserveOn: Observable obtained on Thread: 1ObserveOn: Subscribed to on Thread: 1Timer: Subscribed to on Thread: 1Timer: Subscription completed.ObserveOn: Subscription completed.Subscribe returnedTimer: onNext(0) on Thread: 2ObserveOn: onNext(0) on Thread: 1Timer: onCompleted() on Thread: 2ObserveOn: onCompleted() on Thread: 1

请注意,调度程序(和主线程)是线程1。

Timer
它仍在调用
OnNext
OnCompleted
在其选择的线程(2)上进行-
但是,
ObserveOnDispatcher
正在将编组的调用调回到调度程序线程(线程1)上。

还要注意,如果我们要阻塞调度程序线程(用表示

Thread.Sleep
),您会看到
ObserveOnDispatcher
将阻塞(此代码在LINQPad
main方法内效果最好):

var dispatcher = Dispatcher.CurrentDispatcher;Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");source.ObserveonDispatcher().Spy("ObserveOn").Subscribe();Console.WriteLine("Subscribe returned");Console.WriteLine("Blocking the dispatcher");Thread.Sleep(2000);Console.WriteLine("Unblocked");

然后您将看到如下输出:

Calling from Thread: 1Timer: Observable obtained on Thread: 1ObserveOn: Observable obtained on Thread: 1ObserveOn: Subscribed to on Thread: 1Timer: Subscribed to on Thread: 1Timer: Subscription completed.ObserveOn: Subscription completed.Subscribe returnedBlocking the dispatcherTimer: onNext(0) on Thread: 2Timer: onCompleted() on Thread: 2UnblockedObserveOn: onNext(0) on Thread: 1ObserveOn: onCompleted() on Thread: 1

随着通话的通过,

ObserveOnDispatcher
只有一旦
Sleep
运行就能够下车。

关键点

请记住,Reactive Extensions本质上是一个自由线程库,它会尽其所能在其运行的线程上尽可能地保持惰性-
您必须故意干预

ObserveOn
SubscribeOn
并将特定的调度程序传递给接受它们进行更改的运算符,这对您很有用这个。

Observable的使用者无法做任何事情来控制它在内部执行的操作

ObserveOn
SubscribeOn
而装饰器将观察者和Observables的表面积包装起来,以封送跨线程的调用。希望这些例子已经清楚了。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/415352.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号