0
点赞
收藏
分享

微信扫一扫

Springboot2.0WebFlux 开发

读思意行 2022-06-10 阅读 52

  简单了解下其用法。

1. JDK9的Reactive Stream 用法

响应式流,和发布订阅者模式一样,只不过订阅者可以自己控制生产者发送数据的速度。

1. 背压

  背压(Back Pressure)是一种常用的流量控制策略:由订阅者通过 request(n) 告诉发布者自己还能处理多少个元素,发布者最多只推送这么多,用于确保发布者发布元素太快时,不会压垮订阅者。举个例子就是消费者需要水的时候就打开水龙头, 不需要就关闭开关,也就是消费者可以自己控制流量。

2. 简单使用

1. 不带处理器

package flux;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/**
 * Minimal {@code java.util.concurrent.Flow} demo: one publisher, one subscriber, no processor.
 */
public class Test1 {

    /**
     * Publishes the integers 1, 2 and 3 to a subscriber that requests one item at a time,
     * closes the publisher, and then waits (for at most one second) until the subscriber
     * has received its terminal signal.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // Released by the subscriber's onComplete/onError so main waits only as long as needed.
        CountDownLatch terminated = new CountDownLatch(1);

        // 1. The publisher emits Integer items. SubmissionPublisher implements Flow.Publisher;
        //    try-with-resources guarantees close() even if a submit() call throws.
        try (SubmissionPublisher<Integer> integerSubmissionPublisher = new SubmissionPublisher<>()) {

            // 2. Define the subscriber.
            Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<>() {

                private Flow.Subscription subscription;

                // Called once when the subscription is established.
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    // Keep the subscription so onNext can ask for more.
                    this.subscription = subscription;
                    // Ask for the first item.
                    this.subscription.request(1);
                }

                // Called for every delivered item.
                @Override
                public void onNext(Integer item) {
                    System.out.println("接收到数据: " + item);

                    // Rate control: ask for exactly one more item,
                    // or call subscription.cancel() to stop consuming.
                    subscription.request(1);
                }

                // Called when the stream fails.
                @Override
                public void onError(Throwable throwable) {
                    System.out.println("onError, " + throwable.getMessage());
                    this.subscription.cancel();
                    terminated.countDown();
                }

                // Called after the publisher has been closed and all items were delivered.
                @Override
                public void onComplete() {
                    System.out.println("java.util.concurrent.Flow.Subscriber.onComplete");
                    terminated.countDown();
                }
            };

            // 3. Connect publisher and subscriber.
            integerSubmissionPublisher.subscribe(subscriber);

            // 4. Produce and publish the data.
            integerSubmissionPublisher.submit(1);
            integerSubmissionPublisher.submit(2);
            integerSubmissionPublisher.submit(3);
        } // 5. The publisher is closed here.

        // Delivery is asynchronous: keep the JVM alive until the subscriber is done (max. 1 s).
        terminated.await(1, TimeUnit.SECONDS);
    }
}

2. 带处理器

package flux;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * {@code java.util.concurrent.Flow} demo with a processor between publisher and subscriber:
 * the processor forwards only positive integers, converted to strings.
 */
public class Test2 {

    /**
     * Publishes 1, -2 and -3 through {@link Myprocessor} to a subscriber that requests one
     * item at a time, closes the publisher, and waits (for at most one second) until the
     * subscriber has received its terminal signal.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // Released by the subscriber's onComplete/onError so main waits only as long as needed.
        // Fully qualified so this listing needs no additional import lines.
        java.util.concurrent.CountDownLatch terminated = new java.util.concurrent.CountDownLatch(1);

        // 1. The publisher emits Integer items. SubmissionPublisher implements Flow.Publisher;
        //    try-with-resources guarantees close() even if a submit() call throws.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {

            // 2. The processor filters the data. It is NOT closed from here: it closes itself
            //    once the upstream completes, after it has forwarded everything it received.
            Myprocessor myprocessor = new Myprocessor();
            publisher.subscribe(myprocessor);

            // 3. Define the final subscriber.
            Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {

                private Flow.Subscription subscription;

                // Called once when the subscription is established.
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    this.subscription.request(1);
                }

                // Called for every delivered item.
                @Override
                public void onNext(String item) {
                    System.out.println("接收到数据: " + item);

                    // Rate control: ask for exactly one more item,
                    // or call subscription.cancel() to stop consuming.
                    subscription.request(1);
                }

                // Called when the stream fails.
                @Override
                public void onError(Throwable throwable) {
                    System.out.println("onError, " + throwable.getMessage());
                    this.subscription.cancel();
                    terminated.countDown();
                }

                // Called after the processor has been closed and all items were delivered.
                @Override
                public void onComplete() {
                    System.out.println("java.util.concurrent.Flow.Subscriber.onComplete");
                    terminated.countDown();
                }
            };

            // 4. Connect processor and subscriber.
            myprocessor.subscribe(subscriber);

            // 5. Produce and publish the data; submit() may block while a subscriber's buffer is full.
            publisher.submit(1);
            publisher.submit(-2);
            publisher.submit(-3);
        } // 6. The publisher is closed here; the processor follows via its onComplete.

        // Delivery is asynchronous: keep the JVM alive until the subscriber is done (max. 1 s).
        terminated.await(1, java.util.concurrent.TimeUnit.SECONDS);
    }

    /**
     * Subscribes to integers and republishes the positive ones as strings.
     * Terminal signals from upstream are propagated to this processor's own subscribers.
     */
    private static class Myprocessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {

        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("处理器接收到的数据: " + item);
            if (item > 0) {
                this.submit(item.toString());
            }

            // Ask for exactly one more item, or call subscription.cancel() to stop consuming.
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            // Pass the failure on instead of dropping it, so downstream subscribers terminate too.
            closeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            // Upstream is finished: close this publisher so downstream subscribers get onComplete.
            close();
        }
    }
}

结果

处理器接收到的数据: 1
处理器接收到的数据: -2
处理器接收到的数据: -3
接收到数据: 1

 2. 异步Servlet 用法

1. 同步Servlet

package test;

import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet("/SyncServlet")
public class SyncServlet extends HttpServlet {

private static final long serialVersionUID = 1L;

protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
long start = System.currentTimeMillis();

doSomeThing(request, response);

long end = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + " use: " + (end - start));
}

private void doSomeThing(HttpServletRequest request, HttpServletResponse response) {
try {
Thread.currentThread().sleep(5 * 1000);
response.getWriter().append("Served at: ").append(request.getContextPath());
} catch (Exception e) {
e.printStackTrace();
}
}

protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}

}

结果:

http-bio-8080-exec-12 use: 5000

2. AsyncServlet 异步Servlet

(1) . asyncSupported = true 开启异步支持

(2). request.startAsync() 开启异步环境并且获取一个对象: AsyncContext

(3). 执行完成后提交: asyncContext.complete();

异步代码:

package test;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(value = "/AsyncServlet", asyncSupported = true)
public class AsyncServlet extends HttpServlet {

private static final long serialVersionUID = 1L;

protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
long start = System.currentTimeMillis();

AsyncContext asyncContext = request.startAsync();
CompletableFuture.runAsync(() -> {
doSomeThing(asyncContext, request, response);
});
long end = System.currentTimeMillis();

System.out.println(Thread.currentThread().getName() + " use: " + (end - start));
}

private void doSomeThing(AsyncContext asyncContext, HttpServletRequest request, HttpServletResponse response) {
try {
Thread.currentThread().sleep(5 * 1000);
response.getWriter().append("Served at: ").append(request.getContextPath());
} catch (Exception e) {
e.printStackTrace();
}

asyncContext.complete();
}

protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}

}

结果:

http-bio-8080-exec-9 use: 1

3. 前端界面展示:

Springboot2.0WebFlux 开发_spring

   可以看出来,前端看到的是一样的, 都是阻塞的。后端只是tomcat 执行任务的线程池 exec 线程是非阻塞的。 个人理解是tomcat 的线程池可以执行更多的任务,我们执行业务耗时的操作放到了自己的线程池里面。

3. SSE 简单用法

  Server-Sent Events(SSE)类似于 WebSocket,只不过 WebSocket 是双向通道,客户端和服务端都可以发送消息; SSE 只能由 server 向客户端发送。

  server 端向客户端发送数据的格式为: "data:" + 真实数据 + "\n\n"

1. 服务器端代码:

package test;

import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
* Server Send Event 基于浏览器的 Web 应用程序中仅从服务器向客户端发送文本消息的技术
*/
@WebServlet("/SSEServlet")
public class SSEServlet extends HttpServlet {

private static final long serialVersionUID = 1L;

protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
long start = System.currentTimeMillis();

doSomeThing(request, response);

long end = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + " use: " + (end - start));
}

private void doSomeThing(HttpServletRequest request, HttpServletResponse response) {
response.setHeader("Content-Type", "text/event-stream");
response.setCharacterEncoding("utf-8");

for (int index = 0; index < 5; index++) {
try {
Thread.currentThread().sleep(1* 1000);
// 添加事件类型标识
response.getWriter().write("event:me\n");
// 数据格式 "data:" + 真实数据 + 两个回车
response.getWriter().write("data:" + index + "\n\n");
response.getWriter().flush();
} catch (Exception e) {
e.printStackTrace();
}
}
}

protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
doGet(request, response);
}

}

2. 客户端代码

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Insert title here</title>
</head>
<body>

<script type="text/javascript">
    // window.EventSource is defined when the browser supports Server-Sent Events;
    // otherwise the browser does not support them.
    console.log(window.EventSource);

    // EventSource is an HTML5 API; its argument is the URL of the event stream.
    var sse = new EventSource("SSEServlet");
    /* Handler for events without an explicit type:
    sse.onmessage = function(msg) {
        console.log(msg);
    } */

    // Listen for the named event type "me".
    sse.addEventListener("me", function(e) {
        console.log("me data:" + e.data);
        // e.data is a string, so compare against "4" strictly. Closing after the last event
        // stops the automatic reconnect (it seems to default to about 3 s), which would
        // otherwise fetch the same messages over and over.
        if (e.data === "4") {
            sse.close();
        }
    });
</script>

</body>
</html>

结果:

 Springboot2.0WebFlux 开发_json_02

 

 查看network 如下:

Springboot2.0WebFlux 开发_spring_03

 

 Springboot2.0WebFlux 开发_spring_04

 4. webflux 简单使用

1. 简介

(1).什么是webflux

  Spring Framework 中包含的原始 Web 框架 Spring Web MVC 是专门为 Servlet API 和 Servlet 容器构建的。反应式堆栈 Web 框架 Spring WebFlux 是在 5.0 版的后期添加的。它是完全非阻塞的,支持反应式流(Reactive Stream)背压,并在Netty,Undertow和Servlet 3.1 +容器等服务器上运行。

(2).Reactive Stream、Reactor 和 WebFlux 三者的关系?

  Reactive Stream 是一套反应式编程 标准 和 规范;Reactor 是基于 Reactive Streams 一套 反应式编程框架;WebFlux 以 Reactor 为基础,实现 Web 领域的 反应式编程框架。

(3).webflux 有两个重要的类。 Mono 和 Flux。  Mono 表示0-1 个元素, Flux 表示0-N 个元素。例如:

package flux;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

/**
 * Reactor in a nutshell: JDK 8 stream-style operators combined with the reactive-streams
 * publish/subscribe contract. {@code Mono} carries 0..1 elements, {@code Flux} carries 0..N.
 */
public class ReactorDemo {

    /**
     * Parses three numeric strings with a {@code Flux} pipeline and feeds them to a
     * subscriber that pulls one element at a time.
     */
    public static void main(String[] args) throws InterruptedException {
        String[] rawNumbers = {"1", "2", "3"};

        // Stream-style part: the transformation of each element.
        Flux<Integer> parsed = Flux.fromArray(rawNumbers).map(Integer::parseInt);

        // Reactive-streams part: the subscription with explicit demand.
        parsed.subscribe(new OneByOneSubscriber());

        Thread.sleep(5 * 1000);
    }

    /** Prints every element and requests the next one only after the current one is handled. */
    private static final class OneByOneSubscriber implements Subscriber<Integer> {

        private Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(Integer o) {
            System.out.println("org.reactivestreams.Subscriber.onNext, o: " + o);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }

        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    }
}

结果:

org.reactivestreams.Subscriber.onNext, o: 1
org.reactivestreams.Subscriber.onNext, o: 2
org.reactivestreams.Subscriber.onNext, o: 3
complete

2. webflux 简单使用

 1. pom 文件依赖

<dependencies>
<!-- Bean Validation annotations (e.g. the @Valid used on controller parameters) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- WebFlux starter; the startup log of this project shows Netty as the embedded server -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-tomcat</artifactId>-->
<!-- </dependency>-->
<!-- NOTE(review): r09 looks like a very old Guava release tag and none of the classes shown
     in this article import Guava; confirm the version (or the dependency) is intentional -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>r09</version>
</dependency>
<!-- Lombok provides the @Slf4j annotation used by the controllers -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

  其有两种使用方式。 第一种是SpringMVC 注解模式,第二种是基于RouterFunction 模式。

 2. 控制层简单使用-注解模式

(1) 启动项目, 启动后日志如下:可以看出默认使用的是netty 作为服务器

.   ____          _            __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.3.1.RELEASE)

[ WARN] 2022-06-04 13:41:29.756 5849 --- [ main] o.s.boot.StartupInfoLogger : InetAddress.getLocalHost().getHostName() took 5002 milliseconds to respond. Please verify your network configuration (macOS machines may need to add entries to /etc/hosts).
[ INFO] 2022-06-04 13:41:34.764 5849 --- [ main] flux.Main : Starting Main on aaadeMacBook-Pro.local with PID 5849 (/Users/aaa/app/ideaspace/mvnpro/target/classes started by aaa in /Users/aaa/app/ideaspace/mvnpro)
[ INFO] 2022-06-04 13:41:34.765 5849 --- [ main] flux.Main : No active profile set, falling back to default profiles: default
[ INFO] 2022-06-04 13:41:40.643 5849 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8081
[ INFO] 2022-06-04 13:41:40.651 5849 --- [ main] flux.Main : Started Main in 21.157 seconds (JVM running for 26.646)

 (2) 编写Controller

1》简单的Mono、Flux、String 类型的返回以及上面SSE类型交互

package flux.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.stream.IntStream;

/**
 * Annotation-style WebFlux controller contrasting a plain {@code String} return value with
 * {@code Mono} and {@code Flux} return values, plus a Server-Sent Events endpoint.
 */
@RestController
@Slf4j
public class TestController1 {

    /** Plain return value: {@link #create()} runs inside the handler method itself. */
    @GetMapping("/test1")
    public String test1() {
        log.info("test1 start");
        String result = create();
        log.info("test1 end");
        return result;
    }

    /** Simulates five seconds of work and reports the name of the thread that did it. */
    private String create() {
        pause(5 * 1000);
        log.info("create");
        return Thread.currentThread().getName() + " test1";
    }

    /** {@code Mono} return value: {@link #create()} is only wrapped here, not called. */
    @GetMapping("/test3")
    public Mono<String> test3() {
        log.info("test3 start");
        Mono<String> result = Mono.fromSupplier(() -> create());
        log.info("test3 end");
        return result;
    }

    /** {@code Flux} return value: emits "1".."4", each element taking one second to produce. */
    @GetMapping("/test4")
    public Flux<String> test4() {
        log.info("test4 start");
        Flux<String> objectFlux = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
            pause(1 * 1000);
            log.info(i + "");
            return i + "";
        }));
        log.info("test4 end");
        return objectFlux;
    }

    /** Same elements as {@link #test4()}, delivered as a {@code text/event-stream} response. */
    @GetMapping(value = "/test5", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> test5() {
        log.info("test5 start");
        Flux<String> objectFlux = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
            pause(1 * 1000);
            return String.valueOf(i);
        }));
        log.info("test5 end");
        return objectFlux;
    }

    /**
     * Sleeps for the given time to simulate slow work. An interruption is reported and the
     * interrupt flag is restored so that callers further up can still react to it.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

}

测试:1

curl 上面的前三个接口, 查看日志(三个接口对前端看来都是同步阻塞的)

aaa@aaadeMacBook-Pro ~ % curl localhost:8081/test1
reactor-http-nio-2 test1%
aaa@aaadeMacBook-Pro ~ % curl localhost:8081/test3
reactor-http-nio-3 test1%
aaa@aaadeMacBook-Pro ~ % curl localhost:8081/test4
1234%

日志如下:

[ INFO] 2022-06-04 13:52:08.721 5890 --- [ctor-http-nio-2] flux.controller.TestController1          : test1 start
[ INFO] 2022-06-04 13:52:13.725 5890 --- [ctor-http-nio-2] flux.controller.TestController1 : create
[ INFO] 2022-06-04 13:52:13.725 5890 --- [ctor-http-nio-2] flux.controller.TestController1 : test1 end
[ INFO] 2022-06-04 13:52:21.967 5890 --- [ctor-http-nio-3] flux.controller.TestController1 : test3 start
[ INFO] 2022-06-04 13:52:21.968 5890 --- [ctor-http-nio-3] flux.controller.TestController1 : test3 end
[ INFO] 2022-06-04 13:52:26.975 5890 --- [ctor-http-nio-3] flux.controller.TestController1 : create
[ INFO] 2022-06-04 13:52:36.636 5890 --- [ctor-http-nio-4] flux.controller.TestController1 : test4 start
[ INFO] 2022-06-04 13:52:36.637 5890 --- [ctor-http-nio-4] flux.controller.TestController1 : test4 end
[ INFO] 2022-06-04 13:52:37.645 5890 --- [ctor-http-nio-4] flux.controller.TestController1 : 1
[ INFO] 2022-06-04 13:52:38.674 5890 --- [ctor-http-nio-4] flux.controller.TestController1 : 2
[ INFO] 2022-06-04 13:52:39.678 5890 --- [ctor-http-nio-4] flux.controller.TestController1 : 3
[ INFO] 2022-06-04 13:52:40.682 5890 --- [ctor-http-nio-4] flux.controller.TestController1 : 4

  test1接口是原来SpringMVC 的处理方式,所以其是同步的方式; test3 接口是返回单个,且我们将create 方法放到了Mono 获取中,所以其是异步的, 从日志也可以看出; test4 接口同test3, 只不过test4是返回Flux,返回多个。(这里可以看出返回Mono和Flux,且构造结果是用Mono和Flux 的api 的操作都是异步的.)

测试2: 界面访问test5, 查看SSE类型的处理, 结果如下:

Springboot2.0WebFlux 开发_json_05

 2》编写简单的UserController, 模拟简单的用户增加和查询操作

代码

package flux.controller;

import flux.bean.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.validation.Valid;
import java.util.ArrayList;
import java.util.List;

/**
 * Annotation-style WebFlux controller that simulates creating and listing users.
 */
@RestController
@Slf4j
@RequestMapping("/user")
public class UserController {

    /**
     * Simulates creating a user.
     *
     * @param user the validated request body
     * @return a Mono that performs the simulated insert when subscribed and emits the user with its id set
     */
    @PostMapping(value = "/createUser")
    public Mono<User> createUser(@RequestBody @Valid User user) {
        log.info("test6 start");

        Mono<User> userMono = Mono.fromSupplier(() -> doCreateUser(user));

        log.info("test6 end");
        return userMono;

    }

    /**
     * Lists all users.
     *
     * <p>{@code listAllUser()} is evaluated right here, before the Flux exists, so the
     * two-second lookup happens inside this method call.
     *
     * @return a Flux over the users that were looked up
     */
    @GetMapping(value = "/listUsers")
    public Flux<User> listUsers() {
        log.info("listUsers start");
        Flux<User> userFlux = Flux.fromStream(listAllUser().stream());
        // Was logged as "start" a second time; the log output shown for this endpoint expects "end".
        log.info("listUsers end");
        return userFlux;
    }

    /** Builds two sample users and simulates a two-second lookup. */
    private List<User> listAllUser() {
        List<User> objects = new ArrayList<>();
        User user = new User();
        user.setAge(25);
        user.setUsername("username1");

        User user2 = new User();
        user2.setAge(25);
        user2.setUsername("username2");
        objects.add(user);
        objects.add(user2);

        pause(2 * 1000);
        log.info("listAllUser sleep end");

        return objects;
    }

    /** Simulates a five-second insert and assigns a fixed id to the user. */
    private User doCreateUser(User user) {
        pause(5 * 1000);

        user.setId(20);
        log.info("doCreateUser sleep end");
        return user;
    }

    /**
     * Sleeps for the given time to simulate slow work. An interruption is reported and the
     * interrupt flag is restored so that callers further up can still react to it.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

 

全局异常拦截器处理参数错误

package flux.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.support.WebExchangeBindException;

/**
 * Global exception handler that turns request-body validation failures of annotated
 * controllers into a plain-text 400 response listing the offending fields.
 *
 * @author Administrator
 *
 */
@ControllerAdvice
@Slf4j
public class BXExceptionHandler {

    /**
     * Handles validation/binding failures.
     *
     * <p>The mapping is restricted to {@link WebExchangeBindException}: the method parameter
     * has that type, so a mapping for every {@code Throwable} could not be invoked for any
     * other exception.
     *
     * @param e the binding failure
     * @return a 400 response with one "field: message" line per field error
     */
    @ExceptionHandler(WebExchangeBindException.class)
    public ResponseEntity<String> errorHandler(WebExchangeBindException e) {
        log.error("errorHandler", e);
        return new ResponseEntity<>(toStr(e), HttpStatus.BAD_REQUEST);
    }

    /**
     * Joins all field errors as "field: message" lines separated by CRLF. Falls back to the
     * exception message when there is no field error, instead of failing on an empty Optional.
     */
    private String toStr(WebExchangeBindException e) {
        return e.getFieldErrors().stream()
                .map(fieldError -> fieldError.getField() + ": " + fieldError.getDefaultMessage())
                .reduce((left, right) -> left + "\r\n" + right)
                .orElseGet(e::getMessage);
    }
}

测试:

curl 测试验证参数

aaa@aaadeMacBook-Pro ~ % curl --header 'Content-Type: application/json' -d '{"age": 9}' localhost:8081/user/createUser
username: username 不能为空
age: 年龄不能小于10%

curl 两个接口查看日志:

aaa@aaadeMacBook-Pro ~ % curl -X POST --header 'Content-Type: application/json' -d '{"age": 22, "username": "zhangsan"}' localhost:8081/user/createUser
{"id":20,"username":"zhangsan","age":22}%
aaa@aaadeMacBook-Pro ~ % curl -X 'GET' localhost:8081/user/listUsers
[{"id":null,"username":"username1","age":25},{"id":null,"username":"username2","age":25}]%

 查看后端日志:(可以看出创建接口的doCreateUser 方法是异步的; listUsers的listAllUser 方法是同步的,可以理解为先同步调用listAllUser,然后转为Flux。 其实和上面是一致的,涉及Mono、Flux的相关API是异步的)

[ INFO] 2022-06-04 14:39:09.673 6124 --- [ctor-http-nio-2] flux.controller.UserController           : test6 start
[ INFO] 2022-06-04 14:39:09.673 6124 --- [ctor-http-nio-2] flux.controller.UserController : test6 end
[ INFO] 2022-06-04 14:39:14.689 6124 --- [ctor-http-nio-2] flux.controller.UserController : doCreateUser sleep end
[ INFO] 2022-06-04 14:39:16.524 6124 --- [ctor-http-nio-3] flux.controller.UserController : listUsers start
[ INFO] 2022-06-04 14:39:18.529 6124 --- [ctor-http-nio-3] flux.controller.UserController : listAllUser sleep end
[ INFO] 2022-06-04 14:39:18.532 6124 --- [ctor-http-nio-3] flux.controller.UserController : listUsers end

3. 基于Handler 和RouterFunction 的方式

(1) 编写handler

package flux.controller;

import flux.bean.User;
import flux.config.JSONResultUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

/**
 * Functional-style WebFlux handler with simulated user operations. The routes that map
 * requests to these methods are declared separately in a {@code RouterFunction}.
 */
@Component
@Slf4j
public class UserHandler {

    /**
     * Returns all users as a JSON array.
     *
     * <p>The private {@code listAllUser()} is evaluated right here, before the Flux exists,
     * so the two-second lookup happens inside this method call.
     */
    public Mono<ServerResponse> listAllUser(ServerRequest serverRequest) {
        log.info("listAllUser start");
        // A collection is returned as a Flux.
        Flux<User> tFlux = Flux.fromStream(listAllUser().stream());
        Mono<ServerResponse> body = ServerResponse.ok()
                // APPLICATION_JSON_UTF8 is deprecated; plain APPLICATION_JSON matches deleteUser.
                .contentType(MediaType.APPLICATION_JSON)
                .body(tFlux, User.class);
        log.info("listAllUser end");
        return body;
    }

    /**
     * Looks up a user by the {@code id} path variable. Ids greater than 10 yield 404;
     * otherwise a sample user wrapped in {@code JSONResultUtil} is produced by a supplier
     * that simulates five seconds of work.
     */
    public Mono<ServerResponse> findById(ServerRequest serverRequest) {
        log.info("findById start");
        Integer id = Integer.valueOf(serverRequest.pathVariable("id"));
        // Ids greater than 10 are treated as "resource does not exist".
        if (id > 10) {
            return ServerResponse.notFound().build();
        }

        Mono<ServerResponse> body = ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.fromSupplier(() -> {
                    // Build the element and wrap it in JSONResultUtil.
                    User user = new User();
                    user.setId(5);
                    user.setUsername("user" + id);
                    JSONResultUtil<User> ok = JSONResultUtil.ok();
                    ok.setData(user);

                    pause(5 * 1000);
                    log.info("findById sleep end");
                    return ok;
                }), JSONResultUtil.class);
        // Was logged as "start" a second time; the log output shown for this endpoint expects "end".
        log.info("findById end");
        return body;
    }

    /**
     * Reads a {@code User} from the request body, assigns id 1 after a simulated five-second
     * insert and echoes the user back.
     */
    public Mono<ServerResponse> createUser(ServerRequest serverRequest) {
        log.info("createUser start");
        Mono<ServerResponse> create = serverRequest.bodyToMono(User.class).flatMap(user1 -> {
            System.out.println(user1);
            user1.setId(1);

            pause(5 * 1000);
            log.info("create");

            return Mono.just(user1);
        }).flatMap(temp ->
                ServerResponse.ok().bodyValue(temp)
        );

        log.info("createUser end");
        return create;
    }

    /** Simulates a successful delete of the user named by the {@code id} path variable. */
    public Mono<ServerResponse> deleteUser(ServerRequest serverRequest) {
        String id = serverRequest.pathVariable("id");
        System.out.println(id);

        // Pretend the delete succeeded.
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(Mono.just(Boolean.TRUE), Boolean.class);
    }

    /** Builds two sample users and simulates a two-second lookup. */
    private List<User> listAllUser() {
        List<User> objects = new ArrayList<>();
        User user = new User();
        user.setAge(25);
        user.setUsername("username1");

        User user2 = new User();
        user2.setAge(25);
        user2.setUsername("username2");
        objects.add(user);
        objects.add(user2);

        pause(2 * 1000);
        log.info("listAllUser sleep end");

        return objects;
    }

    /**
     * Sleeps for the given time to simulate slow work. An interruption is reported and the
     * interrupt flag is restored so that callers further up can still react to it.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

}

(2) 编写RouterFunction

package flux.config;

import flux.controller.UserHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

/**
 * Route table for the functional-style user endpoints.
 */
@Configuration
public class AllRouters {

    /**
     * Maps the {@code /user/**} endpoints to the methods of {@link UserHandler}.
     *
     * @param handler the handler that serves the requests
     * @return the nested router function
     */
    @Bean
    RouterFunction<ServerResponse> userRouter(UserHandler handler) {
        RouterFunction<ServerResponse> nest = RouterFunctions.nest(
                // Common prefix of all routes below.
                RequestPredicates.path("/user"),
                // Equivalent of @GetMapping("/listAllUser").
                RouterFunctions.route(RequestPredicates.GET("/listAllUser"), handler::listAllUser)
                        // A lookup is a read: GET, as in the documented call "curl -X GET .../user/find/2"
                        // (it was registered as POST, which that call would not match).
                        .andRoute(RequestPredicates.GET("/find/{id}"), handler::findById)
                        .andRoute(RequestPredicates.POST("/createUser"), handler::createUser)
                        .andRoute(RequestPredicates.DELETE("/{id}"), handler::deleteUser)
        );
        return nest;
    }
}

(3) 测试以及查看日志

curl 接口进行测试

aaa@aaadeMacBook-Pro ~ % curl -X 'GET' localhost:8081/user/listAllUser
[{"id":null,"username":"username1","age":25},{"id":null,"username":"username2","age":25}]%
aaa@aaadeMacBook-Pro ~ % curl -X 'GET' localhost:8081/user/find/2
{"success":true,"data":{"id":5,"username":"user2","age":null},"msg":""}%
aaa@aaadeMacBook-Pro ~ % curl -X POST --header 'Content-Type: application/json' -d '{"age": 22, "username": "zhangsan"}' localhost:8081/user/createUser
{"id":1,"username":"zhangsan","age":22}%
aaa@aaadeMacBook-Pro ~ % curl -X 'DELETE' localhost:8081/user/2
true%

日志:

[ INFO] 2022-06-04 15:24:45.112 6259 --- [ctor-http-nio-2] flux.controller.UserHandler              : listAllUser start
[ INFO] 2022-06-04 15:24:47.112 6259 --- [ctor-http-nio-2] flux.controller.UserHandler : listAllUser sleep end
[ INFO] 2022-06-04 15:24:47.127 6259 --- [ctor-http-nio-2] flux.controller.UserHandler : listAllUser end
[ INFO] 2022-06-04 15:24:50.826 6259 --- [ctor-http-nio-3] flux.controller.UserHandler : findById start
[ INFO] 2022-06-04 15:24:50.828 6259 --- [ctor-http-nio-3] flux.controller.UserHandler : findById end
[ INFO] 2022-06-04 15:24:55.837 6259 --- [ctor-http-nio-3] flux.controller.UserHandler : findById sleep end
[ INFO] 2022-06-04 15:25:22.159 6259 --- [ctor-http-nio-4] flux.controller.UserHandler : createUser start
[ INFO] 2022-06-04 15:25:22.180 6259 --- [ctor-http-nio-4] flux.controller.UserHandler : createUser end
User(id=null, username=zhangsan, age=22)
[ INFO] 2022-06-04 15:25:27.233 6259 --- [ctor-http-nio-4] flux.controller.UserHandler : create
2

(4) 这种模式的参数校验

  自定义异常加自定义全局异常处理器。

1》pom 增加hutool 工具包

<!-- Hutool utility library; its cn.hutool.json.JSONUtil is imported by the global exception handler -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.2</version>
</dependency>

2》增加自定义异常和检测的工具类

package flux.exception;

/**
 * Unchecked exception for failures the application raises on purpose (for example a
 * rejected request parameter); its message is meant to be shown to the client.
 */
public class MyException extends RuntimeException {

    /**
     * Creates the exception.
     *
     * @param msg the message describing what was rejected
     */
    public MyException(String msg) {
        super(msg);
    }
}



package flux.exception;

/**
 * Precondition helper for parameter validation.
 */
public class CheckUtils {

    /**
     * Verifies that a condition holds.
     *
     * @param expression the condition that must be true
     * @param msg        the message of the exception raised when it is not
     * @throws MyException if {@code expression} is false
     */
    public static void isTrue(boolean expression, String msg) {
        if (expression) {
            return;
        }
        throw new MyException(msg);
    }
}

3》增加全局异常处理器

package flux.config;

import cn.hutool.json.JSONUtil;
import flux.exception.MyException;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebExceptionHandler;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;

@Order(-2)
@Component
public class ExceptionHandler implements WebExceptionHandler {

@Override
public Mono<Void> handle(ServerWebExchange serverWebExchange, Throwable throwable) {
// 设置响应头
ServerHttpResponse response = serverWebExchange.getResponse();
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);

// 判断异常是已知异常还是未知异常
String errorMsg = toStr(throwable);

JSONResultUtil jsonResultUtil = new JSONResultUtil(false);
jsonResultUtil.setMsg(errorMsg);
String jsonStr = JSONUtil.toJsonStr(jsonResultUtil);

DataBuffer wrap = response.bufferFactory().wrap(jsonStr.getBytes(StandardCharsets.UTF_8));
return response.writeWith(Mono.just(wrap));
}

private String toStr(Throwable throwable) {
if (throwable instanceof MyException) {
return throwable.getMessage();
} else {
// 未知异常
return throwable.toString();
}
}
}

4》修改创建方法

/**
 * Reads a {@code User} from the request body, validates it, assigns id 1 after a simulated
 * five-second insert and echoes the user back.
 *
 * <p>Validation failures are raised as {@code MyException} from inside the reactive chain.
 * The original author notes that reading the body with {@code bodyToMono(User.class).block()}
 * worked on Spring Boot 2.0.0 but fails on 2.0.1, hence all checks live in {@code flatMap}.
 */
public Mono<ServerResponse> createUser(ServerRequest serverRequest) {
    log.info("createUser start");
    Mono<ServerResponse> create = serverRequest.bodyToMono(User.class).flatMap(user1 -> {
        CheckUtils.isTrue(user1.getUsername() != null && !"".equals(user1.getUsername()), "username 不能为空");
        // The message says the age must not be below 50, so 50 itself is valid (was "> 50").
        CheckUtils.isTrue(user1.getAge() != null && user1.getAge() >= 50, "age 不能小于50");

        System.out.println(user1);
        user1.setId(1);

        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still react to it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        log.info("create");

        return Mono.just(user1);
    }).flatMap(temp ->
            ServerResponse.ok().bodyValue(temp)
    );

    log.info("createUser end");
    return create;
}

测试:

aaa@aaadeMacBook-Pro ~ % curl -X POST --header 'Content-Type: application/json' -d '{"age": 22, "username": "zhangsan"}' localhost:8081/user/createUser
{"msg":"age 不能小于50","success":false}%
aaa@aaadeMacBook-Pro ~ % curl -X POST --header 'Content-Type: application/json' -d '{"age": 220, "username": "zhangsan"}' localhost:8081/user/createUser
{"id":1,"username":"zhangsan","age":220}%
aaa@aaadeMacBook-Pro ~ %

 

  至此简单研究了下基于SpringMVC 原生注解编写Controller 以及基于Handler+RouterFunction 模式的简单使用,同时简单了解了下对参数的校验。

 


【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】


举报

相关推荐

0 条评论