栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Reactor编程之旅

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Reactor编程之旅

文章目录
    • lamda与FunctionalInterface
    • Reactive Programming、Reactive Streams和Reactor
    • Thread per Connection 和 Reactor in Single Thread
    • spring mvc和spring webflux
    • Reactor使用
    • 总结
    • 写在最后

lamda与FunctionalInterface

lamda表达式,其本质是一段函数,确切的说是一段闭包。我们当然可以通过定义普通的函数来代替lamda,但是lamda形式的函数使得函数的定义更加灵活,同时可以让你的代码看起来很简洁。

在java8以前,如果想使用lamda表达式(确切的说是像使用lamda表达式那样取调用一个函数),只能使用接口+匿名类的方式:

    // 定义一个接口
    interface TestInterface {
        void method();
    }

    static void main(String[] args) {
        // 定义接口的匿名实现
        TestInterface testInterface = new TestInterface() {
            @Override
            public void method() {
                System.out.println("hello world");
            }
        };

        // 调用接口
        testInterface.method();
    }

从Java8开始,引入了lamda表达式和functioninterface。所谓functioninterface,是指当接口中有且只有一个需要实现的函数时,可以给此接口加上@FunctionalInterface注解,同时此接口可以采用lamda的形式进行实现。

使用lamda表达式:

    // 定义一个接口
    @FunctionalInterface//注意这里添加了注解
    interface TestInterface {
        void method();
    }

    static void main(String[] args) {
        // 定义接口的匿名实现
        TestInterface testInterface = () -> {
            System.out.println("hello world");
        };

        // 调用接口
        testInterface.method();
    }

@FunctionalInterface注解严格来说更像一种标识,用来标识当前接口符合FunctionalInterface规范,因此如果接口本身已经符合了 FunctionalInterface规范,是可以不用添加@FunctionalInterface注解的。因此上面示例中的@FunctionalInterface注解是可以省略的。

java8在引入FunctionalInterface的同时,还将一些高频使用的场景进行了封装,这些接口都位于java.util.function路径下,例如:

public interface Consumer:只有一个参数的函数

public interface BiConsumer:两个参数的函数

public interface Function:一个入参,一个返回值的函数

public interface Supplier:只有一个返回值的函数

其他更多的示例可以直接查看源码,这些基本能覆盖80%以上的场景。

同时,一些Java以前的接口,例如Runnable和Callable,也都添加了@FunctionalInterface注解,因此在使用这些接口时,可以直接使用lamda表达式去编写他们的实现。


Reactive Programming、Reactive Streams和Reactor

Reactive Programming,中文称反应式编程,是一种高性能应用的编程方式。其最早是由微软提出并引入到 .NET 平台中,随后 ES6 也引入了类似的技术( Promise编程)。在 Java 平台上,较早采用反应式编程技术的是 Netflix 公司开源的 RxJava 框架。现在大家比较熟知的 Hystrix 就是以 RxJava 为基础开发的。

和Reactive Programming对应的就是Imperative Programming(指令式编程),我们常用的 Java、Python 等语言写代码的常见编程风格即为指令式编程,此风格的特点是代码执行顺序和编写顺序基本一致。

Reactive Programming也可以称为Observable模式(可以类比观察者模式),Imperative Programming可以称为Iterable模式,对应的前者即为推模式,后者为拉模式,推的是事件,拉的是指令。

在Java 平台上,Netflix、TypeSafe、Pivatol共同制定了一个被称为 Reactive Streams 的项目规范,用于制定反应式编程相关的规范以及接口,此规范主要的接口有这三个:

  • Publisher
  • Subscriber
  • Subcription

其中,Subcriber 主要包含了 onNext、onError、onCompleted 这三个方法。对于 Reactive Streams,大家只需要理解其思想就可以。

在Spring 5中,作为在背后支持其反应式编程的框架 Reactor,onNext对应Reactor的doonNext()方法;onError对应Reactor的onErrorContinue()、onErrorResume()、onErrorReturn()方法;onCompleted对应Reactor的doonSuccess()方法。

一句话,Reactive Programming是一种编程方式;Reactive Streams是针对Reactive Programming制定的编程规范;Reactor是实现了Reactive Streams的编程框架(有具体的组件包),这些为spring webflux提供了组件基础。


Thread per Connection 和 Reactor in Single Thread

在操作系统中,进程(process)是线程(thread)的集合,线程是进程中的最小执行单位。在处理并发时,我们有两种编程思路,一个是多线程方式,一个是协程方式(python里的概念,这里我想不到更好的名词了)。

  • Thread per Connection:即一请求一线程,为每个请求分配一个线程,此线程负责请求的执行,当请求执行结束时,线程也随之结束。在没有nio之前,这是传统的java网络编程和servlet等所采用的即为线程模型。此方案的优点即实现简单,缺点则是方案的伸缩性受到线程数的限制。
  • Reactor in Single Thread:即为将需要执行的多个任务放置到一个队列中,并通过事件驱动的方式(通常做法),将每个任务交由某个线程去执行。在此方式下,即使只有一个线程,也可以实现一种伪多线程(参考python的协程),此方式的实现方式之一即为IO多路复用。此方案的优点是不受线程数的限制,且适合于CPU资源紧张的应用上。基于nio的mina、netty等框架,就是使用的此方式;缺点是受限于使用场景,仅适合于IO密集的应用,不太适合CPU密集的应用。

spring mvc和spring webflux

先说springmvc,springmvc刚诞生时,servlet 大行其道(现在也是),所以springmvc是完全基于servlet的。servlet的设计思路即为内部启动了一个线程池,池内会有一定数量的空闲线程,当有http请求进入时,servlet会从线程池中获取一空闲线程负责http请求的执行。因此springmvc的并发数极容易受线程池容量的限制,当请求数超过线程池的容量时,会直接导致请求无法被处理。因此在高并发场景下,如果使用springmvc,只能通过增加线程池上限(但线程数量又受cpu核心数和操作系统允许的open files数量限制)和扩展物理机器的方式来增加整个系统的吞吐量。其实对于servlet来将,其自身是典型的io密集型而非cpu密集型,是不需要启动很多线程的,在这种背景下spring webflux应运而生。

Spring 5中引入了一个基于 Netty 而不是 Servlet 的高性能的 Web 框架,Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。spring webflux将Netty和Reactor集成在一起用于处理web请求(所以说spring webflux的诞生离不开nio和Reactive Programming),能充分发挥两者的优势,从而极大增加框架的吞吐量(业务自身的耗时不在讨论之列)。
我们先来看以下两者的编程区别,当我们使用springmvc时,通常的编码是这样的:

@RestController
@RequestMapping("/test")
public class TestController {

    @RequestMapping("/hello")
    public String demo(){
        return "hello world";
    }

}

当我们使用spring webflux时,通常的编码是这样的:

@RestController
@RequestMapping("/test")
public class TestController {

    @RequestMapping(value = "/foobar")
    public Mono demo() {
        return Mono.just("hello world");
    }

}

两者的变化即为由之前的直接返回目标对象改为返回一个Mono/Flux对象(Mono和Flux的介绍在后面)。

这里可以看到,当我们使用Reactor编程时,大部分情况下并不需要直接接触Publisher、Subscriber、Subcription等接口,因为这些接口已经被spring webflux封装了,我们只需要构建我们的Mono/Flux对象即可,spring webflux会自动把Mono/Flux对象所包含的任务推送到任务队列中并被Reactor和Netty处理。

此外,当我们使用feign进行服务调用时,在springmvc框架下和spring webflux框架下,所使用的feign是不同的。
在springmvc框架下,通常使用spring-cloud-starter-openfeign(内部实现是feign+ribbon)来进行http服务调用;在spring webflux下,目前还没有正式的框架可以使用,不过有一个官方还为正式发布的可以使用feign-reactor-spring-cloud-starter,不过目前此项目还在孵化器中。


Reactor使用

具体的说是Mono和Flux的使用,先放一段Reactor中关于Mono和Flux的介绍:

  • Mono

A Reactive Streams Publisher with basic rx operators that completes successfully by emitting an element, or with an error.
The recommended way to learn about the Mono API and discover new operators is through the reference documentation, rather than through this javadoc (as opposed to learning more about individual operators). See the “which operator do I need?” appendix
The rx operators will offer aliases for input Mono type to preserve the “at most one” property of the resulting Mono. For instance flatMap returns a Mono, while there is a flatMapMany alias with possibly more than 1 emission.
Mono should be used for Publisher that just completes without any value.
It is intended to be used in implementations and return types, input parameters should keep using raw Publisher as much as possible.
Note that using state in the java.util.function / lambdas used within Mono operators should be avoided, as these may be shared between several Subscribers.

  • Flux

A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
The recommended way to learn about the Flux API and discover new operators is through the reference documentation, rather than through this javadoc (as opposed to learning more about individual operators). See the “which operator do I need?” appendix .
It is intended to be used in implementations and return types. Input parameters should keep using raw Publisher as much as possible.
If it is known that the underlying Publisher will emit 0 or 1 element, Mono should be used instead.
Note that using state in the java.util.function / lambdas used within Flux operators should be avoided, as these may be shared between several Subscribers.
subscribe(CoreSubscriber) is an internal extension to subscribe(Subscriber) used internally for Context passing. User provided Subscriber may be passed to this “subscribe” extension but will loose the available per-subscribe Hooks.onLastOperator.

通俗来说,Mono就是Reactor中一个基础的任务单元,或者说是一段lamda表达式指令。所以我们查看Mono的各种方法时,入参基本都是FunctionalInterface,这这段指令内,我们可以接收若干参数,同时返回若干结果值;甚至可以返回另一个Mono对象。

Flux是Mono的集合,Mono是Reactor中的基础任务单元,Flux就是若干个基础单元的集合。因此Flux更适合处理并发任务(作为类比,Mono是串行的)。

下面介绍Mono的一些创建方式。

从对象创建:

Mono.just("hello world")//从字符串创建一个Mono对象,当任务被执行时,会返回此字符串
.subscribe();
Mono.just(123)//从int创建一个Mono对象
.subscribe();
Mono.empty()//创建一个不返回任何值的Mono对象,函数原型为`public static  Mono empty()`,因此返回值可以被当作任何类型
.subscribe();

从Runnable、Future等对象创建,基本都是使用Mono.fromXxxx()方法创建

Mono.fromRunnable(()->{//从Runnable创建,这里使用了lamda表达式,当Mono被执行时,此lamda包裹的代码段会被执行
    System.out.println("hello world");
});
Mono.fromCallable(() -> {//从Callable创建,和Runnable的区别是带返回值
    return "Hello world";
})
.subscribe();

创建一个Mono,且这个Mono会创建另一个Mono,此场景使用还是比较多的

Mono.defer(()->{
    return Mono.just("hello world");//注意这里返回了一个Mono,而非其他类型
})
.subscribe();

在上一个Mono处理完后,如果需要处理另一个Mono,可以使用Mono.then()方法进行连接:public final Mono then(Mono other)

Mono.empty()
    .then(Mono.fromRunnable(() -> {
        System.out.println("hello world");
    }))
.subscribe();

接收参数(消费者)

Mono.just("hello world")
        .doonNext((e) -> {//e的类型为Mono.just()返回的类型
            System.out.println(e);//打印hello world
        })
        .doonNext((e) -> {//这里调用了两次doOnNext,因为变量会在doOnNext间传递
            System.out.println(e);//打印hello world
        })
.subscribe();

变量转换(变量变形,就是编程中的map操作)

Mono.just("hello world")
        .map((e) -> {
            System.out.println(e);//打印hello world
            return "simon's dream";//这里返回了一个新的变量
        })
        .doonNext((e) -> {
            System.out.println(e);//打印simon's dream
        })
.subscribe();
Mono.just("hello world")
        .flatMap((e) -> {//这里使用了flatMap,flatMap和map的区别是,map是直接转换,flatMap是返回一个包裹了新值的Mono
            System.out.println(e);//打印hello world
            return Mono.just("simon's dream");
        })
        .doonNext((e) -> {
            System.out.println(e);//打印simon's dream
        })
.subscribe();

缓存。当调用Mono.cache()方法后,当前产生的值会被缓存下来,当有新的订阅者加入后,Mono任务会从调用了cache()方法的位置开始执行,而非从头开始

Mono a1 = Mono.defer(() -> {
    System.out.println("start");
    return Mono.just("a");
});

a1.subscribe((e) -> {
    System.out.println("s1:" + e);
});
a1.subscribe((e) -> {
    System.out.println("s2:" + e);
});
 System.out.println("==============================");
a1 = a1.cache()
;

a1.subscribe((e) -> {
    System.out.println("s21:" + e);
});
a1.subscribe((e) -> {
    System.out.println("s22:" + e);
});

输出:

start < - 输出start

s1:a

start < - 输出start

s2:a

==============================

start < - 输出start

s21:a

s22:a

可以看到,当调用了cache后,新的subscribe会从cache处开始执行

异常处理

Mono.just("hello world")
        .map((e) -> {
            throw new RuntimeException("这是手动扔出的异常");
        }).doonError(e -> {//doOnError会拦截到异常,但不会捕获异常,这里可以打印异常的信息,同时异常会被继续向上抛
            e.printStackTrace();
        }).onErrorResume(e -> {//onErrorResume会捕获异常,同时产生一个新的Mono继续后续其他操作
            e.printStackTrace();
            return Mono.just("simon's dream");
        }).doonNext(e -> {
            System.out.println(e);//打印simon's dream
        }).doonSuccess(e -> {
            System.out.println(e);//打印simon's dream
        })
;

repeat,如果循环执行某个Mono,可以使用repeatWhen(),repeatWhen()会根据条件判断是否循环执行

int count = 0;
Mono.fromRunnable(() -> {
            System.out.println("hello world");//会被打印3次
        })
        .repeatWhen(Repeat.onlyIf((e) -> {
            count++;
            return count < 3;
        }))
        .subscribe()
;

retry,如果需要在出现error时自动重试,可使用retryWhen()

int count = 0;
Mono.fromRunnable(() -> {
            System.out.println("hello world");//会打印三次
            throw new RuntimeException("手动扔出的异常");
        })
        .retryWhen(Retry.onlyIf((e) -> {
            count++;
            return count < 3;
        }))
        .onErrorResume((e) -> {
            System.out.println(e.getMessage());//打印一次
            return Mono.empty();
        })
        .subscribe()
;

filter 过滤,对上一步产生的值进行判断,并返回true/false。当返回true时,后续的的doOnNext可以正常执行;当返回false时,后续的doOnNext会被忽略,其他不需要接收参数的任务不受影响,例如then等

Mono.just(10)
        .filter((e) -> {
            return e > 5;
        })
        .doonNext((e) -> {
            System.out.println("after filter1:" + e);//正常输出
        })
        .filter((e) -> {
            return e < 5;
        })
        .doonNext((e) -> {
            System.out.println("after filter2:" + e);//没有输出
        })
        .then(Mono.fromRunnable(() -> {
            System.out.println("then");//正常输出
        }))
        .subscribe()
;

doOnSuccess、doOnError、doFinally

当所有mono正常执行完毕,没有出现任何异常时,doOnSuccess会被执行;当有任何异常且异常未被捕获时,doOnError会被执行(注意doOnError自身并不会捕获异常);doFinally在任何条件下都会执行

Mono.just("hello world")
        .doonSuccess((e) -> {
            System.out.println(e);//打印hello world
        })
        .doFinally((e) -> {
            System.out.println("doFinally");//打印doFinally
        })
        .subscribe();
Mono.just("hello world")
        .doonNext((e) -> {
            System.out.println(e);//打印hello world
            throw new RuntimeException("手动扔出的异常");
        })
        .doonSuccess((e) -> {//因为出现了Exception,因此doOnSuccess并不会被执行
            System.out.println("doOnSuccess:" + e);
        })
        .doonError((e) -> {
            System.out.println(e.getMessage());//打印异常信息
        })
        .doFinally((e) -> {
            System.out.println("doFinally");//打印doFinally
        })
        .subscribe();

zip 并发。我们可以使用zip方法来同时执行多个Mono,并把他们的结果汇总在一起。注意以下多个Mono实际是并发执行的

Mono mono1 = Mono.just(1);
Mono mono2 = Mono.just(2);
Mono mono3 = Mono.just(3);

Mono.zip(mono1, mono2, mono3)
        .doonNext(e -> {
            System.out.println("结果:" + e);//输出[1,2,3],注意e是一个Tripple类型
        })
        .subscribe()

也可以用这种写法

Mono mono1 = Mono.just(1);
Mono mono2 = Mono.just(2);
Mono mono3 = Mono.just(3);

Mono.zip((e) -> {
            return e;
        }, mono1, mono2, mono3)
        .doonNext(e -> {
            System.out.println("结果:" + Arrays.asList(e));//输出[1,2,3],注意e是一个Object[]类型
        })
        .subscribe()
;

Mono的用法基本就是这些了,实际使用时就是这些用法的排列组合。

可以看到,Reactive Programming和Imperative Programming编程思路还是相差很大的,Reactive就是各种代码段的组合,然后变量会在各个代码段之间流动;Imperative Programming则是定义各个方法,然后各个方法之间互相调用。因此在使用Reactor时,需要考虑清除业务的处理流程,然后将整个业务流程拆分成一段一段的过程,然后用Mono去实现。

在来说说Flux,首先先看一下Mono和Flux的解释:

  • Mono

  • FLux

可以看到,Mono每次是对单个任务进行操作,Flux是对多个任务同时进行操作,因此Flux更适合多元素处理的场景。

这里先用前面的filter来演示一下Flux是运作的

Flux.just(1,2,3,4,5,6,7,8,9)//这里可以理解为同时产生了多个Mono,且每个Mono包裹了一个元素,每个Mono会被分别交给后续的任务进行处理
        .filter((e) -> {//每个元素都会被此过滤器过滤,其中通过的元素会流动到后续其他任务
            return e > 5;
        })
        .collectList()//收集所有元素并产生一个List
        .doonNext((list) -> {//这里的入参是一个List
            System.out.println("after filter1:" + list);//输出 after filter1:[6, 7, 8, 9]
        })
        .then(Mono.fromRunnable(() -> {
            System.out.println("then");//输出then
        }))
        .subscribe()
;

Flux的另一个特点是能对结果集执行reduce操作

List list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
Set set = new HashSet();

Flux.fromIterable(list)
        .reduce(set, (initial, e) -> {//将每个元素过滤并添加到Set中
            if (e > 5) {
                initial.add(e);
            }
            return initial;
        })
        .doonNext(e -> {//e是Set类型
            System.out.println("结果:" + e);//输出:[6, 7, 8, 9]
        })
        .subscribe()
;

我们在了解了这些后,就可以在spring webflux编写Reactor风格的程序了。

总结

本文开始先介绍了lamda与FunctionalInterface,因为在Reactor中大量用到了它们;然后介绍了Reactive Programming、Reactive Streams和Reactor的区别与联系,只有理解了这些才能理解Reactor编程的思路;之后对比了Thread per Connection 和 Reactor in Single Thread及spring mvc和spring webflux,用来对比当下比较流行的处理http请求的方式;在最后列举了Reactor中Mono和Flux的常见用法。

写在最后

Reactor在编程复杂度和代码可读性上,相比传统的方式更加的复杂,因此对编写者的要求较高,而且当Reactor程序出现问题时排查问题也较为困难。但Reactor在高并发场景下优势更加明显,其编写的程序吞吐量相对传统风格程序有质的提升。因此我们需要仔细评估可能带来的技术风险和对程序性能的提升,最终确定是否需要在项目中使用。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/281872.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号