如何理解 Spring Webflux 的「非阻塞」?

假设一个 API 执行要 10 秒——那一般的情况是执行完 10 秒才返回给前端,但如果后台没跑完就给前端返回结果,那这个结果也是空的,返回有什么意义…
关注者
65
被浏览
86,570

13 个回答

对前端是一样的,但是对后端的性能利用就不一样了,对比thread的sleep方法和webflux的delay方法,就可以知道它的非阻塞原理是什么,又是好在哪里。

反应式编程一开始是从前端和客户端开始兴起,现在大有蔓延到后端的趋势,Spring5推出的webflux就是反应式编程的产物。

webflux对比于springMVC,性能高出很多,网上已经有很多的测评,不再在过多说明。



左图同步,右图异步

上图看出对比于同步,异步所用的线程是比较少的,不过有个前提是,程序逻辑中有阻塞(如io阻塞等),且这种阻塞是可以异步化的。

为了满足这个前提,反应式编程框架就必须将这些阻塞变成异步化,如新出的WebClient工具就是将http请求io异步化。

delay方法就是用来代替sleep方法的,下面来讲解一下delay方法是怎么将延时异步化的。

源码解读

  • 通过查看Mono<Long> delay(Duration duration)方法源码,它会构造一个MonoDelay类,并通过传入全局公用的调度器Schedulers.parallel()来调度里面的异步任务。
public static Mono<Long> delay(Duration duration) {
        return delay(duration, Schedulers.parallel());
    }

    public static Mono<Long> delay(Duration duration, Scheduler timer) {
        return onAssembly(new MonoDelay(duration.toMillis(), TimeUnit.MILLISECONDS, timer));
    }
  • 查看MonoDelay类的订阅方法subscribe
public void subscribe(CoreSubscriber<? super Long> actual) {
    MonoDelayRunnable r = new MonoDelayRunnable(actual);

    actual.onSubscribe(r);

    try {
    //重点在于下面的 timedScheduler.schedule(r, delay, unit)
    //通过timedScheduler来调度延时任务,而不是当前线程阻塞等待
        r.setCancel(timedScheduler.schedule(r, delay, unit));
    }
    catch (RejectedExecutionException ree) {
        if(r.cancel != OperatorDisposables.DISPOSED) {
            actual.onError(Operators.onRejectedExecution(ree, r, null, null,
                    actual.currentContext()));
        }
    }
}
  • 查看ParallelScheduler的delay方法:
public Disposable schedule(Runnable task, long delay, TimeUnit unit) {
  //pick方法会获取一个ScheduledExecutorService线程执行器给到Schedulers使用
 return Schedulers.directSchedule(pick(), task, delay, unit);
}
  • 查看directSchedule方法:
static Disposable directSchedule(ScheduledExecutorService exec,
      Runnable task,
      long delay,
      TimeUnit unit) {
  //包装任务
   SchedulerTask sr = new SchedulerTask(task);
   Future<?> f;
   if (delay <= 0L) {
      f = exec.submit((Callable<?>) sr);
   }
   else {
     //延时调度
     //ScheduledExecutorService是java自带的并发调度接口,
     //通过一条线程轮询延时队列来避免所有线程阻塞
      f = exec.schedule((Callable<?>) sr, delay, unit);
   }
  //设置结果
   sr.setFuture(f);

   return sr;
}

自此就可以知道为什么delay方法没有阻塞线程,因为它的延时处理都交给了ScheduledExecutorService执行器处理,调用delay方法的主线程就直接返回了,等到延时时间过后,ScheduledExecutorService就会从线程池就获取一个线程来处理延时后的任务逻辑。整个流程就类似于上面图片中的右图。

通过反应式编程范式,将所有阻塞都修改为类似于delay之于sleep的形式,就能大幅度提升服务性能了。

看到这里了,点个赞再走呗。

其他的知识原理解读,可以查看

需要的时间是不变的(只有一个任务的前提下),但是reactive方式可以把任务「打散」,从而把io请求单独分派到io线程,不会阻塞当前的请求处理线程,从而极大程度地提高请求线程的吞吐量。