Spring-WebFlux使用,一文带你从0开始学明白Spring-WebFlux,学明白响应式编程
作者:mmseoamin日期:2023-12-25

文章目录

  • 一、Spring-WebFlux介绍
    • 区别于Spring MVC
    • 二、Spring-WebFlux的使用
      • 1、注解编程模型
        • (1)定义实体类
        • (2)定义service
        • (3)定义controller
        • (4)测试一下吧~
        • 2、函数式编程模型
          • (1)定义实体类
          • (2)定义service
          • (3)定义handler
          • (4)测试一下吧~
          • 三、源码及原理分析
            • 1、SpringWebFlux核心控制器
            • 2、答疑
            • 四、响应式数据持久化
            • 五、使用响应式web客户端-WebClient
            • 写在后面

              一、Spring-WebFlux介绍

              传统的基于Servlet的Web框架,如Spring MVC,在本质上都是阻塞和多线程的,每个连接都会使用一个线程。在请求处理的时候,会在线程池中拉取一个工作者( worker )线程来对请求进行处理。同时,请求线程是阻塞的,直到工作者线程提示它已经完成为止。

              在Spring5中,引入了一个新的异步、非阻塞的WEB模块,就是Spring-WebFlux。该框架在很大程度上是基于Reactor项目的,能够解决Web应用和API中对更好的可扩展性的需求。

              关于Reactor响应式编程的前置知识,请移步:响应式编程详解,带你熟悉Reactor响应式编程

              异步的Web框架能够以更少的线程获得更⾼的可扩展性,通常它们只需要与CPU核⼼数量相同的线程。通过使⽤所谓的事件轮询(event looping)机制,这些框架能够⽤⼀个线程处理很多请求,这样每次连接的成本会更低。在事件轮询中,所有事情都是以事件的⽅式来进⾏处理的,包括请求以及密集型操作(如数据库和⽹络操作)的回调。当需要执⾏成本⾼昂的操作时,事件轮询会为该操作注册⼀个回调,这样操作可以并⾏执⾏,⽽事件轮询则会继续处理其他的事件。

              在这里插入图片描述

              Spring 5通过名为WebFlux的新Web框架来⽀持反应式Web应⽤,Spring5定义的完整Web开发技术栈如图所⽰:

              在这里插入图片描述

              区别于Spring MVC

              与SpringMVC相比较,Spring WebFlux没有与Servlet API耦合,所以它的运⾏并不需要Servlet容器。它可以运⾏在任意⾮阻塞Web容器中,包括Netty、Undertow、Tomcat、Jetty或任意Servlet 3.1及以上的容器。

              在这里插入图片描述

              而且它的使用,我们需要添加Spring Boot WebFlux starter依赖项,⽽不是标准的Web starter(例如,spring-boot-starter-web)。

              
               org.springframework.boot
               spring-boot-starter-webflux
              
              

              WebFlux的默认嵌⼊式服务器是Netty⽽不是Tomcat。Netty是⼀个异步、事件驱动的服务器,⾮常适合Spring WebFlux这样的反应式Web框架。

              Spring WebFlux是真正的反应式Web框架,允许在事件轮询中处理请求;⽽Spring MVC是基于Servlet的,依赖于多线程来处理多个请求。

              二、Spring-WebFlux的使用

              SpringWebFlux实现方式有两种:注解编程模型和函数式编程模型。

              注解编程模型和之前的SpringMVC方式很类似,注解都是相同的。

              函数式编程模型,需要我们手动来构建web服务和路由。

              首先要引入包:

              
               org.springframework.boot
               spring-boot-starter-webflux
              
              

              1、注解编程模型

              (1)定义实体类

              public class User {
                  private String name;
                  // get set 构造器 toString 略
              }
              

              (2)定义service

              import java.util.HashMap;
              import java.util.Map;
              @Service
              public class UserService {
                  // 模拟数据库存储
                  private Map map = new HashMap<>();
                  public UserService() {
                      map.put(1, new User("zhangsan"));
                      map.put(2, new User("lisi"));
                      map.put(3, new User("wangwu"));
                  }
                  // 根据id查询
                  public Mono getById(Integer id){
                      // 返回数据或空值
                      return Mono.justOrEmpty(map.get(id));
                  }
                  // 查询多个
                  public Flux getAll(){
                      return Flux.fromIterable(map.values());
                  }
                  // 保存
                  public Mono save(Mono userMono){
                      return userMono.doOnNext(user -> {
                          int id = map.size() + 1;
                          map.put(id, user);
                      }).thenEmpty(Mono.empty()); // 最后置空
                  }
              }
              

              (3)定义controller

              import org.springframework.beans.factory.annotation.Autowired;
              import org.springframework.web.bind.annotation.*;
              import reactor.core.publisher.Flux;
              import reactor.core.publisher.Mono;
              @RestController
              @RequestMapping("/flux")
              public class UserController {
                  private final UserService userService;
                  @Autowired
                  public UserController(UserService userService) {
                      this.userService = userService;
                  }
                  // 根据id查询
                  @GetMapping("/{id}")
                  public Mono getById(@PathVariable Integer id){
                      return userService.getById(id);
                  }
                  // 查询多个
                  @GetMapping("/all")
                  public Flux getAll(){
                      return userService.getAll();
                  }
                  // 保存
                  @PostMapping("/save")
                  public Mono save(@RequestBody Mono userMono){
                      return userService.save(userMono);
                  }
              }
              

              (4)测试一下吧~

              跟SpringMVC一样正常访问,查询、修改。

              2、函数式编程模型

              在使用函数式编程模型时,需要自己初始化服务器。

              基于函数式编程模型,有两个核心接口:RouterFunction(实现路由功能,请求转发给对应的handler)和HandlerFunction(处理请求生成响应的函数)。核心任务定义两个函数式接口的实现并且启动需要的服务器。

              SpringWebflux请求和响应不再是ServletRequest和ServletResponse,而是ServerRequest和ServerResponse。

              (1)定义实体类

              public class User {
                  private String name;
                  // get set 构造器 toString 略
              }
              

              (2)定义service

              import reactor.core.publisher.Flux;
              import reactor.core.publisher.Mono;
              import java.util.HashMap;
              import java.util.Map;
              public class UserService {
                  // 模拟数据库存储
                  private Map map = new HashMap<>();
                  public UserService() {
                      map.put(1, new User("zhangsan"));
                      map.put(2, new User("lisi"));
                      map.put(3, new User("wangwu"));
                  }
                  // 根据id查询
                  public Mono getById(Integer id){
                      // 返回数据或空值
                      return Mono.justOrEmpty(map.get(id));
                  }
                  // 查询多个
                  public Flux getAll(){
                      return Flux.fromIterable(map.values());
                  }
                  // 保存
                  public Mono save(Mono userMono){
                      return userMono.doOnNext(user -> {
                          int id = map.size() + 1;
                          map.put(id, user);
                      }).thenEmpty(Mono.empty()); // 最后置空
                  }
              }
              

              (3)定义handler

              import org.springframework.http.MediaType;
              import org.springframework.http.server.reactive.HttpHandler;
              import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
              import org.springframework.web.reactive.function.BodyInserters;
              import org.springframework.web.reactive.function.server.*;
              import reactor.core.publisher.Flux;
              import reactor.core.publisher.Mono;
              import reactor.netty.http.server.HttpServer;
              public class UserHandler {
                  private final UserService userService;
                  public UserHandler(UserService userService) {
                      this.userService = userService;
                  }
                  // 根据id查询
                  public Mono getById(ServerRequest request){
                      // 获取id值
                      String id = request.pathVariable("id");
                      // 空值处理
                      Mono notFound = ServerResponse.notFound().build();
                      // 调用Service方法得到数据
                      Mono userMono = userService.getById(Integer.parseInt(id));
                      // 把userMono进行转换返回
                      return userMono.flatMap(user ->
                              ServerResponse
                                      .ok()
                                      .contentType(MediaType.APPLICATION_JSON)
                                      .body(BodyInserters.fromValue(userMono))
                                      .switchIfEmpty(notFound)
                      );
                  }
                  // 查询多个
                  public Mono getAll(ServerRequest request){
                      // 调用Service得到结果
                      Flux users = userService.getAll();
                      return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users, User.class);
                  }
                  // 保存
                  public Mono save(ServerRequest request){
                      // 获取User对象
                      Mono userMono = request.bodyToMono(User.class);
                      return ServerResponse.ok().build(userService.save(userMono));
                  }
                  public static void main(String[] args) {
                      // 创建对象
                      UserService userService = new UserService();
                      UserHandler userHandler = new UserHandler(userService);
                      // 创建路由
                      RouterFunction route = RouterFunctions
                              .route(RequestPredicates.GET("/user/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), userHandler::getById)
                              .andRoute(RequestPredicates.GET("/users").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), userHandler::getAll);
                      // 路由和handler适配
                      HttpHandler httpHandler = RouterFunctions.toHttpHandler(route);
                      ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
                      // 创建服务器
                      HttpServer httpServer = HttpServer.create();
                      httpServer.handle(adapter).bindNow();
                  }
              }
              

              (4)测试一下吧~

              三、源码及原理分析

              1、SpringWebFlux核心控制器

              SpringWebFlux执行过程和SpringMVC很相似。

              SpringWebFlux核心控制器为DispatcherHandler,实现WebHandler接口。

              // org.springframework.web.reactive.DispatcherHandler#handle
              @Override
              public Mono handle(ServerWebExchange exchange) { // exchange中放着http请求响应信息
              	if (this.handlerMappings == null) { // 根据请求地址获取对应的mapping
              		return createNotFoundError();
              	}
              	if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
              		return handlePreFlight(exchange);
              	}
              	return Flux.fromIterable(this.handlerMappings)
              			.concatMap(mapping -> mapping.getHandler(exchange))
              			.next()
              			.switchIfEmpty(createNotFoundError())
              			.flatMap(handler -> invokeHandler(exchange, handler)) // 调用业务方法
              			.flatMap(result -> handleResult(exchange, result)); // 处理结果返回
              }
              

              2、答疑

              在理想情况下,反应式控制器将会位于反应式端到端栈的顶部,这个栈包括了控制器、repository、数据库以及在它们之间可能还会包含的服务。

              在这里插入图片描述

              我们使用SpringWebFlux时并没有调⽤subscribe()。框架将会为我们调⽤subscribe()。

              四、响应式数据持久化

              目前MySQL是不支持响应式的,而部分NoSQL数据库如MongoDB、redis、Cassandra等都支持响应式。

              此处关于与数据库的交互实现响应式暂略,后续有时间再单独出文章供学习借鉴。

              而SpringWebFlux,也常用于SpringCloud-Gateway网关,用于处理请求、路由转发等功能的,对数据库的需求相对来说比较少。

              五、使用响应式web客户端-WebClient

              springboot-webFlux的webclient详细使用介绍,细节拉满

              写在后面

              如果本文对你有帮助,请点赞收藏关注一下吧 ~

              在这里插入图片描述