栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

响应式编程基础 --- 发布订阅模式、JDK9: Flow

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

响应式编程基础 --- 发布订阅模式、JDK9: Flow

一、发布订阅模式 1.1 基本概念

反应式编程离不开发布-订阅模式

发布订阅模式有基本的三个概念:

  • 订阅者可以订阅发布者
  • 名为订阅的连接
  • 消息(也叫事件),它们通过连接传输
1.2 示例:对两个流求和

发布订阅 的一个简单例子就是整合两个信息源的事件并发布给其他用户使用

例如想象成电子表格:C3 = C1+C2这样一个公式,只要C1或者C2更新,C3也会更新这些变化

1、先对保存值的单元格进行建模:

private static class SimpleCell {

    private int value = 0;
    private String name;
    public SimpleCell(String name) {
        this.name = name;
    }
}

2、定义两个接口:

private interface Subscriber {
    void onNext(T t);
}

private interface Publisher {
    void subscribe(Subscriber subscriber);
}

3、如何把单元格和发布订阅者这个概念结合起来呢?

实际上,单元格既是一个发布者(它可以向其他单元格发布自己的事件),又是一个订阅者(需要依据其他单元格的事件进行响应)

private static class SimpleCell implements Publisher, Subscriber {

    private int value = 0;
    private String name;
    private List subscribers = new ArrayList<>();
    public SimpleCell(String name) {
        this.name = name;
    }
	
    @Override
    public void subscribe(Subscriber subscriber) {
        subscribers.add(subscriber);
    }

	//通过更新自己的值来响应它订阅的单元格所发生的变化
    @Override
    public void onNext(Integer integer) {
        //先修改自己的值
        this.value = integer;
        System.out.println(this.name+":"+this.value);
        //再通知所有订阅者,自己更新了值
        notifyAllSubscribers();
    }

    //通知所有订阅者产生了新值
    private void notifyAllSubscribers() {
        subscribers.forEach(subscriber -> subscriber.onNext(this.value));
    }
}

4、如何实现C3=C2+C1呢?

引入一个单独的类来保存算数操作符的左边和右边:

private static class ArithmeticCell extends SimpleCell {
    private int left;
    private int right;

    public ArithmeticCell(String name) {
        super(name);
    }

    public void setLeft(int left) {
        this.left = left;
        onNext(left + this.right);
    }

    public void setRight(int right) {
        this.right = right;
        onNext(right + this.left);
    }
}

现在就可以实现一个案例了:

    public static void main(String[] args) {
//        SimpleCell c1 = new SimpleCell("c1");
//        SimpleCell c2 = new SimpleCell("c2");
//        SimpleCell c3 = new SimpleCell("c3");
//
//        // c3订阅c1
//        c1.subscribe(c3);
//        c1.onNext(10);
//        c2.onNext(20);
        ArithmeticCell c3 = new ArithmeticCell("c3");
        SimpleCell c1 = new SimpleCell("c1");
        SimpleCell c2 = new SimpleCell("c2");

        //c3订阅c1和c2
        c1.subscribe(c3::setLeft);
        c2.subscribe(c3::setRight);

        c1.onNext(10);
        c2.onNext(20);
        c1.onNext(15);
    }

二、JDK9 Flow 2.1 简介

jdk9提供的Flow封装了四种反应式的标准模型接口:

  • 发布者(Publisher)
  • 订阅者(Subscriber)
  • 订阅关系(Subscription)
  • 处理者(Processor)
public final class Flow {
    static final int DEFAULT_BUFFER_SIZE = 256;

    private Flow() {
    }

    public static int defaultBufferSize() {
        return 256;
    }

    public interface Processor extends Flow.Subscriber, Flow.Publisher {
    }

    public interface Subscription {
        void request(long var1);

        void cancel();
    }

    public interface Subscriber {
        void onSubscribe(Flow.Subscription var1);

        void onNext(T var1);

        void onError(Throwable var1);

        void onComplete();
    }

    @FunctionalInterface
    public interface Publisher {
        void subscribe(Flow.Subscriber var1);
    }
}
2.2 示例:温度计汇报

1、JavaBean表示温度信息:

public class TempInfo {
    private static final Random random = new Random();
    private final String town;
    private final int temp;

    public TempInfo(String town, int temp) {
        this.town = town;
        this.temp = temp;
    }

    
    public static TempInfo fetch(String town) {
        if (random.nextInt(10) == 0) { // 每10次可能失败一次
            throw new RuntimeException("Error!");
        }
        // 返回温度,华氏0到99度
        return new TempInfo(town, random.nextInt(100));
    }

    @Override
    public String toString() {
        return town + ":" + temp;
    }

    public String getTown() {
        return town;
    }

    public int getTemp() {
        return temp;
    }
}

2、Subscription接口实现,向Subscriber发送TempInfo Steam

public class TempSubscription implements Flow.Subscription {

    private final Flow.Subscriber subscriber;
    private final String town;
    private static final ExecutorService executor = Executors.newSingleThreadExecutor();

    public TempSubscription(Flow.Subscriber subscriber, String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(long n) {
        // 另起一个线程向subscriber发送下一个元素
        executor.submit(() -> {
            for (long i = 0L; i < n; i++) {//subscriber每处理一个请求执行一次循环
                try {
                    // 将当前温度发送给Subscriber
                    subscriber.onNext(TempInfo.fetch(town));
                } catch (Exception e) {
                    // 查询温度报错将这个报错信息传给Subscriber
                    subscriber.onError(e);
                    e.printStackTrace();
                    break;
                }
            }
        });

    }

    @Override
    public void cancel() {
        // 如果Subscription被取消了,向subscriber发送一个完成信号
        subscriber.onComplete();
    }
}

3、Subscriber接口实现,打印输出收到的温度数据:

public class TempSubscriber implements Flow.Subscriber {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(TempInfo tempInfo) {
        System.out.println(tempInfo);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println(throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Done!");
    }
}

4、Main类:创建Publisher并向其订阅TempSubscriber

public class Main {
    public static void main(String[] args) {
        getTemperature("Harbin").subscribe(new TempSubscriber());
    }

    //Publisher是个函数式接口
    private static Flow.Publisher getTemperature(String town) {
        return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town));
    }
}

现在已经可以使用了:

5、Processor转换数据:

public class TempProcessor implements Flow.Processor {

    private Flow.Subscriber subscriber;

    @Override
    public void subscribe(Flow.Subscriber subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscriber.onSubscribe(subscription);
    }

    @Override
    public void onNext(TempInfo tempInfo) {
        //转换为摄氏度
        subscriber.onNext(new TempInfo(tempInfo.getTown(), (tempInfo.getTemp() - 32) * 5 / 9));
    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
        subscriber.onComplete();
    }
}
public class Main {
    public static void main(String[] args) {
        getTemperature("Harbin").subscribe(new TempSubscriber());
    }

    //Publisher是个函数式接口
    private static Flow.Publisher getTemperature(String town) {

        return new Flow.Publisher() {
            @Override
            public void subscribe(Flow.Subscriber subscriber) {
                TempProcessor tempProcessor = new TempProcessor();
                tempProcessor.subscribe(subscriber);
                subscriber.onSubscribe(new TempSubscription(subscriber, town));
            }
        };
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/842899.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号