如果我对您的理解正确,则需要使用类似的方法包装现有的内容
callExternalUrl
static Observable<String> callExternalUrlAsync(String url, String json, HttpMethod method){ return Observable.fromCallable(() -> callExternalUrl(url, json, method)) .subscribeOn(Schedulers.io()) .flatMap(re -> { if (re.hasBody()) return Observable.just(re.getBody()); else return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode())); }, e -> Observable.error(e), (Func0<Observable<? extends String>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-( .observeOn(Schedulers.computation());}代码的简短说明:
- 它计划
callExternalUrl
在Schedulers.io
- 对
ResponseEntity<T>
成功案例T
和错误案例的转换最少。它也发生在io
调度程序上,但是并不重要,因为它确实很短。(如果内部存在异常callExternalUrl
,则按原样传递。) - 使订阅者执行结果
Schedulers.computation
注意事项 :
- 您可能想同时使用自定义计划程序
subscribeOn
和observeOn
- 您可能希望在传递的第一个lambda中具有更好的逻辑,
flatMap
以区分成功和错误,并且绝对希望有一些更具体的异常类型。
高阶魔术
如果您愿意使用高阶函数并牺牲一点性能来减少重复代码,则可以执行以下操作:
// Universal wrapper methodstatic <T> Observable<T> wrapCallExternalAsAsync(Func3<String, String, HttpMethod, ResponseEntity<T>> externalCall, String url, String json, HttpMethod method){ return Observable.fromCallable(() -> externalCall.call(url, json, method)) .subscribeOn(Schedulers.io()) .flatMap(re -> { if (re.hasBody()) return Observable.just(re.getBody()); else return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode())); }, e -> Observable.error(e), (Func0<Observable<? extends T>>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-( .observeOn(Schedulers.computation());}static Observable<String> callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method){ return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method);}哪里
MyClass是你的地方
callExternalUrl是。
更新 (仅异步调用)
私有静态RxClient httpClient = Rx.newClient(RxObservableInvoker.class);
//在这里,您可以传递自定义ExecutorService
private <T> Observable<String> executeHttpAsync(String url, String httpMethod, Entity<T> entity) { return httpClient.target(url) .request() .headers(httpHeaders) // assuming httpHeaders is something global as in your example .rx() .method(httpMethod, entity) .map(resp -> { if (200 != resp.getStatus()) { throw new RuntimeException("Bad status pre " + resp.getStatus()); } else { if (!resp.hasEntity()) { // return null; // or error? throw new RuntimeException("Empty response"); // or empty? } else { try { return resp.readEntity(String.class); } catch (Exception ex) { throw new RuntimeException(ex); // wrap exception into unchecked } } } }) .observeOn(Schedulers.computation());}private Observable<String> executeGetAsync(String url) { return executeHttpAsync(url, "GET", null);}private Observable<String> executePostAsync(String url, String json) { return executeHttpAsync(url, "POST", Entity.json(json));}同样,类似的 警告 适用:
- 您可能想将自定义调度程序用于
newClient
呼叫和observeOn
- 您可能希望有一些更好的逻辑来处理错误,而不仅仅是检查它是否为HTTP 200,并且肯定需要一些更特定的异常类型。但这都是特定于业务逻辑的,因此由您决定。
另外,从您的示例中还不清楚请求(
HttpEntity)的主体是如何构建的,以及您是否始终
String像原始示例中一样总是希望作为响应。我还是照原样复制了您的逻辑。如果您还需要其他内容,则可能应该参考https://jersey.java.net/documentation/2.25/media.html#json上的文档



