webflux理论
传统的Spring MVC是采用的同步阻塞式IO模型,即是每一个请求,容器都会新开一个线程去处理。 在处理完成之前,不会接收其他的请求。
webflux是一个异步阻塞式IO模型。当容器内发生了一个线程密集型的请求,就会将这些请求交给 一个worker线程组去处理。这样,这个线程本身就可以去处理另外的请求,达到容器只需使用少量 线程就可处理大量的请求。
可以提升吞吐量和伸缩性,但是本身线程处理的时间不会减少,还是会等到worker线程组执行完成。
可以运用在IO密集型,即磁盘IO密集,网络IO密集的服务场景
比如:微服务网关(Spring Cloud gateWay就使用了此模型)
webflux与springmvc异同点

webflux技术依赖
- Reactive Streams : 反应式编程标准和规范
 
- 具有处理无限数量数据的能力
 - 按序处理数据
 - 异步非阻塞的传递数据
 - 必须实现非阻塞的背压(如果数据源发送数据的数量大于消费端消费数据的数量,那么消费端会发送 消息到数据源,减少发送数量或者直接停止发送)
 
它定义了一系列api规范
- publisher: 数据发布者
 
public interface Publisher<T>{
    public void subscribe(Subscriber<? super T> s);
}
- subscriber: 数据订阅者
 
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
- subscription: 订阅信号
 
public interface Subscription {
    public void request(long n);
    public void cancel();
}
- processor: 处理器(包含了发布者和订阅者的混合体)
 
- Reactor : 基于Reactive Streams的反应式编程框架
 - WebFlux : 以Reactor为基础实现Web领域的反应式编程
 
Reactor框架
Spring公司开发,符合Reactive Streams规范。侧重于server端的响应式框架。由 两个模块组成,reactor-core和reactor-ipc
java原油的异步编程方式
Callbacks: 异步方法采用一个callback作为参数,当结果出来后,调用这个callback. 例如:swings的EventListener
局限性:回调地狱
Futures: 异步方法返回一个Future
局限性:多个Future组合不易,调用Future#get时仍然会阻塞,缺乏对多个值以及进一步的出错 处理
Reactor的publisher
实现:Flux Mono
Flux: 代表一个包含0…N个元素的响应式序列 Mono: 代表一个包含0/1个元素的结果
// 创建及调用方式
void test(){
    Flux<Integer> flux = Flux.just(1,2,3,4,5,6);
    Flux<Integer> arrayFlux = Flux.fromArray(new Integer[]{1,2,3,4,5,6});
    Flux<Integer> streamFlux = Flux.fromStream(Stream.of(1,2,34,5,6));
    Flux<Integer> listFlux = Flux.fromIterable(Arrays.asList(1,2,3));
    Flux<Integer> fluxFlux = Flux.from(flux);
    Mono<Integer> mono = Mono.just(1);
    // 消费 不做任何事
    flux.subscribe();
    // 消费
    flux.subscribe(System.out::println);
    // 加上出错处理
    flux.subscribe(System.out::println,System.err::println);
    // 完成处理
    flux.subscribe(System.out::println,System.err::println,
            ()->{System.out.println("完成处理");});
    // 指定处理数量
    flux.subscribe(System.out::println,System.err::println,
            ()->{System.out.println("完成处理");},
            subscription -> subscription.request(3));
    flux.subscribe(new DemoSubscribe());
}
class DemoSubscribe extends BaseSubscriber<Integer> {
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        System.out.println("subscribe");
        subscription.request(1);
    }
    @Override
    protected void hookOnNext(Integer value) {
        if (value == 3){
            cancel();
        }
    }
}
reactor操作符
需要对数据源做一些处理,就需要用到reactor操作符
- map操作符
 
// 对数据进行某些操作
flux.map(i -> i * 3).subscribe(System.out::println);
arrayFlux.flatMap(i -> flux).subscribe(System.out::println);
- filter操作符
 
// 过滤掉一部分数据
listFlux.filter(i ->i>3).subscribe(System.out::println);
- zip操作符
 
// 将两个flux里面的数据进行相加操作输出
Flux.zip(flux,listFlux).subscribe(zip -> System.out.println(zip.getT1() + zip.getT2()));
reactor和java8 stream区别
- 形似而神不似
 - reactor: push模式,服务端推送数据客户端,对应的异步的反应式程序
 - stream: pull模式,客户端主动向服务端请求数据,对应的是同步的程序
 
reactor线程模型
- Schedulers.immediate(): 当前线程
 - Schedulers.single(): 可重用的单线程
 - Schedulers.elastic(): 弹性线程池
 - Schedulers.parallel(): 固定大小线程池
 - Schedulers.fromExecutorService(): 自定义线程池
 
如何指定线程
- 使用publishOn指定
 
flux.map(i -> {
        System.out.println(Thread.currentThread().getName()+"-map1");
        return i *3;
    }).publishOn(Schedulers.elastic()).map(
            i ->{
                System.out.println(Thread.currentThread().getName() + "-map2");
                return i /3;
            }
    ).subscribe(it -> System.out.println(Thread.currentThread().getName()+"-"+it));
它将上游信号传给下游,同时改变后续的操作符的执行所在线程,直到下一个 publishOn出现在这个链上
- 使用subscribeOn指定
 
flux.map(i -> {
            System.out.println(Thread.currentThread().getName()+"-map1");
            return i *3;
        }).publishOn(Schedulers.elastic()).map(
                i ->{
                    System.out.println(Thread.currentThread().getName() + "-map2");
                    return i /3;
                }
        ).subscribeOn(Schedulers.parallel()).subscribe(it -> System.out.println(Thread.currentThread().getName()+"-"+it));
无输出,主线程执行完成,直接退出
作用于向上的订阅链,无论处于操作链的什么位置,它都会影响到源头的线程执行 环境,但不会影响到后续的publishOn
webflux实践
- 与SpringMVC结合
 
@RestController
public class DemoController {
    @GetMapping("/demo")
    public Mono<String> demo(){
        return Mono.just("demo");
    }
}
注意点:
- 使用springMVC注解
 - 使用ServletReq/Resp 切换成ServerReq/Resp
 - 
    
返回Mono对象
 - webflux
 
// 定义
@Component
public class DemoHandler {
    public Mono<ServerResponse> hello(ServerRequest request){
        return ok().contentType(MediaType.TEXT_PLAIN)
                .body(Mono.just("hello"),String.class);
    }
    public Mono<ServerResponse> world(ServerRequest request){
        return ok().contentType(MediaType.TEXT_PLAIN)
                .body(Mono.just("world"),String.class);
    }
    // time没隔一秒发送数据到客户端
    public Mono<ServerResponse> times(ServerRequest request){
        return ok()
                .contentType(MediaType.TEXT_EVENT_STREAM)
                .body(Flux.interval(Duration.ofSeconds(1))
                           .map(it -> new SimpleDateFormat("HH:mm:ss")
                                   .format(new Date())),String.class);
    }
}
// 路由
@Configuration
public class RouterConfig {
    @Autowired
    private DemoHandler demoHandler;
    @Bean
    public RouterFunction<ServerResponse> demoRouter(){
        return route(GET("/hello"),demoHandler::hello)
                .andRoute(GET("/world"),demoHandler::world)
                .andRoute(GET("/times"),demoHandler::times);
    }
}
- SpringMVC处理流程
 

- WebFlux处理流程
 
- DispatcherHandler准备 1.1 setApplicationContextAware 1.2 initStrategies 1.3 获取容器中HandlerMapping及子接口实现 1.4 获取容器中HandlerAdapter及子接口实现 1.5 获取容器中HandlerResultHandler及子接口实现
 
RouterFunctionMapping实例化 因为它实现了InitializingBean,所以在实例化的时候先调用 afterPropertiesSet。然后调用initRouterFunctions方法,然后调用routerFunctions方法。 获取系统中所有的RouterFunction。通过RouterFunction::andOther将对象合并。 返回SameComposedRouterFunction对象。
- 
    
DispatcherHandler#handle 2.1 构建基于handlerMappings集合的Flux对象 2.2 通过concatMap将其转换成handler对象 2.3 取出第一个handler对象,若为空,则抛错 2.4 调用invokeHandler获得response 2.4.1 遍历handlerAdapters集合 2.4.2 依次调用集合元素supports方法 2.4.3 获得具体实现类调用handle方法 2.4.4 进入具体url对应处理类请求 2.4.5 返回Mono
对象  - 
    
DispatcherHandler#handleResult 3.1 遍历resultHandlers集合 3.2 依次调用集合元素supports方法 3.3 获得具体实现类调用handleResult方法 3.4 将请求结果信息写入ServerWebExchange对象
 
若需引入与MongoDB
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
比在web项目中多一个reactive后缀
使用
@Repository
public interface CityRepository extends ReactiveMongoRepository<City,String>{
}
handler添加
public Mono<ServerResponse> listCity(ServerRequest request){
    return ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(cityRepository.findAll(),City.class);
}
public Mono<ServerResponse> saveCity(ServerRequest request){
    String province = request.pathVariable("province");
    String city = request.pathVariable("city");
    City record = new City();
    record.setProvince(province);
    record.setCity(city);
    Mono<City> mono = Mono.just(record);
    return ok().build(cityRepository.insert(mono).then())
}
总结
- webflux出现的意义
 - webflux与springmvc异同
 - 介绍一下reactive stream规范
 - 介绍一下reactor
 - flux和mono对象的区别,如何创建 一个是0到N,一个是0或者1
 - 简述下reactor操作符&线程池
 - publishOn和subscribeOn区别
 - reactor和java8 stream的区别 java8 stream还是一个阻塞的方式,reactor是异步非阻塞的方式
 - RouterFunctionMapping的作用以及何时被加载 作用是路由url到对应处理的RouterFunction。在DispatcherHandler初始化的时候加载