反应式编程离不开发布-订阅模式
发布订阅模式有基本的三个概念:
- 订阅者可以订阅发布者
- 名为订阅的连接
- 消息(也叫事件),它们通过连接传输
发布订阅 的一个简单例子就是整合两个信息源的事件并发布给其他用户使用
例如想象成电子表格:C3 = C1+C2这样一个公式,只要C1或者C2更新,C3也会更新这些变化
1、先对保存值的单元格进行建模:
private static class SimpleCell {
private int value = 0;
private String name;
public SimpleCell(String name) {
this.name = name;
}
}
2、定义两个接口:
private interface Subscriber{ void onNext(T t); } private interface Publisher { void subscribe(Subscriber super T> subscriber); }
3、如何把单元格和发布订阅者这个概念结合起来呢?
实际上,单元格既是一个发布者(它可以向其他单元格发布自己的事件),又是一个订阅者(需要依据其他单元格的事件进行响应)
private static class SimpleCell implements Publisher, Subscriber { private int value = 0; private String name; private List subscribers = new ArrayList<>(); public SimpleCell(String name) { this.name = name; } @Override public void subscribe(Subscriber super Integer> subscriber) { subscribers.add(subscriber); } //通过更新自己的值来响应它订阅的单元格所发生的变化 @Override public void onNext(Integer integer) { //先修改自己的值 this.value = integer; System.out.println(this.name+":"+this.value); //再通知所有订阅者,自己更新了值 notifyAllSubscribers(); } //通知所有订阅者产生了新值 private void notifyAllSubscribers() { subscribers.forEach(subscriber -> subscriber.onNext(this.value)); } }
4、如何实现C3=C2+C1呢?
引入一个单独的类来保存算数操作符的左边和右边:
private static class ArithmeticCell extends SimpleCell {
private int left;
private int right;
public ArithmeticCell(String name) {
super(name);
}
public void setLeft(int left) {
this.left = left;
onNext(left + this.right);
}
public void setRight(int right) {
this.right = right;
onNext(right + this.left);
}
}
现在就可以实现一个案例了:
public static void main(String[] args) {
// SimpleCell c1 = new SimpleCell("c1");
// SimpleCell c2 = new SimpleCell("c2");
// SimpleCell c3 = new SimpleCell("c3");
//
// // c3订阅c1
// c1.subscribe(c3);
// c1.onNext(10);
// c2.onNext(20);
ArithmeticCell c3 = new ArithmeticCell("c3");
SimpleCell c1 = new SimpleCell("c1");
SimpleCell c2 = new SimpleCell("c2");
//c3订阅c1和c2
c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);
c1.onNext(10);
c2.onNext(20);
c1.onNext(15);
}
二、JDK9 Flow
2.1 简介
jdk9提供的Flow封装了四种反应式的标准模型接口:
- 发布者(Publisher)
- 订阅者(Subscriber)
- 订阅关系(Subscription)
- 处理者(Processor)
public final class Flow {
static final int DEFAULT_BUFFER_SIZE = 256;
private Flow() {
}
public static int defaultBufferSize() {
return 256;
}
public interface Processor extends Flow.Subscriber, Flow.Publisher {
}
public interface Subscription {
void request(long var1);
void cancel();
}
public interface Subscriber {
void onSubscribe(Flow.Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
@FunctionalInterface
public interface Publisher {
void subscribe(Flow.Subscriber super T> var1);
}
}
2.2 示例:温度计汇报
1、JavaBean表示温度信息:
public class TempInfo {
private static final Random random = new Random();
private final String town;
private final int temp;
public TempInfo(String town, int temp) {
this.town = town;
this.temp = temp;
}
public static TempInfo fetch(String town) {
if (random.nextInt(10) == 0) { // 每10次可能失败一次
throw new RuntimeException("Error!");
}
// 返回温度,华氏0到99度
return new TempInfo(town, random.nextInt(100));
}
@Override
public String toString() {
return town + ":" + temp;
}
public String getTown() {
return town;
}
public int getTemp() {
return temp;
}
}
2、Subscription接口实现,向Subscriber发送TempInfo Steam
public class TempSubscription implements Flow.Subscription {
private final Flow.Subscriber super TempInfo> subscriber;
private final String town;
private static final ExecutorService executor = Executors.newSingleThreadExecutor();
public TempSubscription(Flow.Subscriber super TempInfo> subscriber, String town) {
this.subscriber = subscriber;
this.town = town;
}
@Override
public void request(long n) {
// 另起一个线程向subscriber发送下一个元素
executor.submit(() -> {
for (long i = 0L; i < n; i++) {//subscriber每处理一个请求执行一次循环
try {
// 将当前温度发送给Subscriber
subscriber.onNext(TempInfo.fetch(town));
} catch (Exception e) {
// 查询温度报错将这个报错信息传给Subscriber
subscriber.onError(e);
e.printStackTrace();
break;
}
}
});
}
@Override
public void cancel() {
// 如果Subscription被取消了,向subscriber发送一个完成信号
subscriber.onComplete();
}
}
3、Subscriber接口实现,打印输出收到的温度数据:
public class TempSubscriber implements Flow.Subscriber{ private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(TempInfo tempInfo) { System.out.println(tempInfo); subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println(throwable.getMessage()); } @Override public void onComplete() { System.out.println("Done!"); } }
4、Main类:创建Publisher并向其订阅TempSubscriber
public class Main {
public static void main(String[] args) {
getTemperature("Harbin").subscribe(new TempSubscriber());
}
//Publisher是个函数式接口
private static Flow.Publisher getTemperature(String town) {
return subscriber -> subscriber.onSubscribe(new TempSubscription(subscriber, town));
}
}
现在已经可以使用了:
5、Processor转换数据:
public class TempProcessor implements Flow.Processor{ private Flow.Subscriber super TempInfo> subscriber; @Override public void subscribe(Flow.Subscriber super TempInfo> subscriber) { this.subscriber = subscriber; } @Override public void onSubscribe(Flow.Subscription subscription) { subscriber.onSubscribe(subscription); } @Override public void onNext(TempInfo tempInfo) { //转换为摄氏度 subscriber.onNext(new TempInfo(tempInfo.getTown(), (tempInfo.getTemp() - 32) * 5 / 9)); } @Override public void onError(Throwable throwable) { subscriber.onError(throwable); } @Override public void onComplete() { subscriber.onComplete(); } }
public class Main {
public static void main(String[] args) {
getTemperature("Harbin").subscribe(new TempSubscriber());
}
//Publisher是个函数式接口
private static Flow.Publisher getTemperature(String town) {
return new Flow.Publisher() {
@Override
public void subscribe(Flow.Subscriber super TempInfo> subscriber) {
TempProcessor tempProcessor = new TempProcessor();
tempProcessor.subscribe(subscriber);
subscriber.onSubscribe(new TempSubscription(subscriber, town));
}
};
}
}



