1.名词解释
Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。
-  
Reactive Streams:
- Reactive Streams 是一个规范,定义了一组接口和协议,用于处理异步数据流的背压。它包括发布者(Publisher)、订阅者(Subscriber)、订阅(Subscription)和处理器(Processor)等接口。
 - Reactive Streams 规范的目标是提供一种标准的方式来处理异步数据流,解决背压问题。Java标准库从Java 9开始提供了 
java.util.concurrent.Flow类,定义了Reactive Streams规范。 
 -  
Reactor:
- Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono(表示一个包含零或一个元素的异步序列)。
 - Reactor 通过提供响应式的操作符,如
map、filter、flatMap等,使得开发者能够方便地进行数据流的转换和处理。 
 -  
WebFlux:
- WebFlux 是Spring Framework 5引入的响应式编程支持。它构建在 Reactor 之上,提供了一套用于构建异步、非阻塞、响应式的Web应用程序的API。WebFlux支持使用Reactive Streams处理HTTP请求和响应。
 - Spring WebFlux 可以用于构建反应式的RESTful服务,支持使用注解的方式定义路由和处理器函数。
 
 -  
响应式编程:
- 响应式编程是一种编程范式,强调数据流和变化的传播。在这个范式中,数据源产生数据并通知观察者,观察者相应地处理这些数据。这种方式更容易处理异步操作和事件。
 - 在Java中,响应式编程通常涉及到使用类似于Reactor或RxJava的库,这些库提供了响应式的操作符和工具。
 
 
综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。
2.Reactive Streams 规范
2.1.Reactive Streams规范定义
在java.util.concurrent.Flow 类中,定义了Reactive Streams规范
 
- Publisher(发布者):负责生成数据流,并向订阅者发送数据。
 - Subscriber(订阅者):表示数据流的消费者,它订阅一个或多个发布者,并接收数据。
 - Subscription(订阅):表示订阅关系的接口,用于控制数据流的请求和取消。
 - Processor(处理器):充当发布者和订阅者的中间组件,可以对数据进行转换和处理。
 
2.2.API方法
1. Publisher(发布者):
interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}
 
subscribe(Subscriber<? super T> subscriber): 用于订阅数据流。当订阅者调用这个方法时,发布者将建立与订阅者的订阅关系,并开始推送数据。
2. Subscriber(订阅者):
interface Subscriber<T> {
    void onSubscribe(Subscription subscription);
    void onNext(T item);
    void onError(Throwable throwable);
    void onComplete();
}
 
-  
onSubscribe(Subscription subscription): 在订阅关系建立时调用。通过这个方法,订阅者可以持有Subscription对象,以便后续请求数据和取消订阅。 -  
onNext(T item): 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。 -  
onError(Throwable throwable): 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。 -  
onComplete(): 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。 
3. Subscription(订阅):
interface Subscription {
    void request(long n);
    void cancel();
}
 
-  
request(long n): 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。 -  
cancel(): 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。 
4. Processor(处理器):
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
 
Processor 接口是 Subscriber 和 Publisher 的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。
-  
Subscriber部分的方法:onSubscribe(Subscription subscription),onNext(T item),onError(Throwable throwable),onComplete()。 -  
Publisher部分的方法:subscribe(Subscriber<? super R> subscriber)。表示Processor可以被其他订阅者订阅。 
5.泛型T
泛型T即为数据流
2.3.工作流程
在 Reactive Streams 中,Publisher、Subscriber、Subscription 和 Processor 之间的协作流程如下:
-  
Publisher(发布者):
Publisher是异步产生数据流的组件,它通过subscribe方法允许订阅者订阅。subscribe方法会接收一个Subscriber对象作为参数。- 当 
Publisher有新数据准备好时,通过调用订阅者的onNext方法将数据推送给订阅者。 
interface Publisher<T> { void subscribe(Subscriber<? super T> subscriber); } -  
Subscriber(订阅者):
Subscriber是数据流的消费者,通过实现Subscriber接口来接收来自发布者的数据。订阅者通过调用subscription.request(n)请求一定数量的数据,处理数据时通过onNext方法接收元素。- 当订阅者无法处理更多的元素时,可以调用 
subscription.cancel()来取消订阅。 
interface Subscriber<T> { void onSubscribe(Subscription subscription); void onNext(T item); void onError(Throwable throwable); void onComplete(); } -  
Subscription(订阅):
Subscription表示订阅关系,它在onSubscribe方法中被传递给订阅者。通过Subscription,订阅者可以请求数据和取消订阅。- 订阅者通过 
request(long n)方法请求处理 n 个元素,通过cancel()方法取消订阅。 
interface Subscription { void request(long n); void cancel(); } -  
Processor(处理器):
Processor是一个同时实现了Publisher和Subscriber接口的中间组件,可以作为数据流的处理器,对数据进行转换和处理。Processor既能接收数据,也能发布数据。它将onNext、onError和onComplete方法委托给下游的订阅者,并将数据推送给上游的发布者。
interface Processor<T, R> extends Subscriber<T>, Publisher<R> { } 
这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过 onNext 方法接收元素,订阅者通过 request 方法请求处理一定数量的元素,同时可以通过 cancel 方法取消订阅。Processor 则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。
3.自定义实现Reactive Streams规范
自己实现了一个,参考了SubmissionPublisher
class MyPublisher implements Flow.Publisher<String>{
    MySubscription<String> subscription;
    public int request ;
    public void publish(String item){
        subscription.items.add(item);
        while (true) {
            if (request > 0) {
                for (int i = 0; i < request; i++) {
                    if (!subscription.items.isEmpty()) {
                        try {
                            Object o = subscription.items.get(subscription.items.size() - 1);
                            subscription.subscriber.onNext(o.toString());
                            subscription.items.remove(o);
                        }catch (Exception e){
                            subscription.subscriber.onError(e);
                            return;
                        }
                    }
                }
            }
            if (subscription.items.isEmpty()) {
                break;
            }
        }
    }
    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        System.out.println("第一步:绑定订阅者" );
        MySubscription<String> subscription = new MySubscription<>(subscriber,this);
        this.subscription = subscription;
        subscriber.onSubscribe(subscription);
    }
}
class MySubscriber implements Flow.Subscriber<String>{
    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("第二步:接收Subscription" );
        this.subscription = subscription;
        // 请求订阅者处理的元素数量
        subscription.request(1);
    }
    @Override
    public void onNext(String item) {
        System.out.println("第四步:推送数据" );
        System.out.println("MySubscriber 消费了item = " + item);
        subscription.request(1);
    }
    @Override
    public void onError(Throwable throwable) {
        System.out.println("出异常了 = " + throwable);
    }
    @Override
    public void onComplete() {
    }
}
class MySubscription<T> implements Flow.Subscription{
    final Flow.Subscriber<? super T> subscriber;
    final MyPublisher publisher;
    List items = new ArrayList();
    public MySubscription(Flow.Subscriber<? super T> subscriber, MyPublisher publisher) {
        this.subscriber = subscriber;
        this.publisher = publisher;
    }
    @Override
    public void request(long n) {
        this.publisher.request++;
        System.out.println("第三步:拉取请求" );
    }
    @Override
    public void cancel() {
    }
}
public class FlowDemo {
    public static void main(String[] args) {
        MyPublisher myPublisher = new MyPublisher();
        MySubscriber mySubscriber = new MySubscriber();
        myPublisher.subscribe(mySubscriber);
        myPublisher.publish("111");
        myPublisher.publish("222");
        myPublisher.publish(null);
    }
}
 
4.Jdk实现Reactive Streams使用示例
class SimplePublisher implements Flow.Publisher<Integer> {
    private final SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
    public void publishItems() {
        for (int i = 1; i <= 5; i++) {
            publisher.submit(i);
        }
        // 发布者完成发布
        publisher.close();
    }
    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        publisher.subscribe(subscriber);
    }
}
class SimpleSubscriber implements Flow.Subscriber<Integer> {
    private Flow.Subscription subscription;
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // 请求订阅者处理的元素数量
        subscription.request(1);
    }
    @Override
    public void onNext(Integer item) {
        System.out.println("Received item: " + item);
        // 处理完一个元素后请求下一个
        subscription.request(1);
    }
    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error occurred: " + throwable.getMessage());
    }
    @Override
    public void onComplete() {
        System.out.println("Processing completed.");
    }
}
public class ReactiveStreamsExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建发布者和订阅者
        SimplePublisher simplePublisher = new SimplePublisher();
        SimpleSubscriber simpleSubscriber = new SimpleSubscriber();
        // 订阅者订阅发布者
        simplePublisher.subscribe(simpleSubscriber);
        // 发布者发布数据
        simplePublisher.publishItems();
        // 睡一觉,确保数据处理完成
        Thread.sleep(3000);
    }
}









