响应式编程
Java社区从1.8开始推出了Reactive Streams API,并掀起了一阵Reactive Programming范式热潮。详情可以参见Reactive Manifesto. Reactive Streams API主要包括以下四种接口:
Publiser
1 |
|
Subscriber
1 |
|
Subscription
1 |
|
Processor
1 |
|
Reactive Streams API的实现库有很多,例如:RxJava,Reactor,Akka Streams等,Spring 5中基于响应式编程的web框架–WebFlux就是采用的Reactor实现。
webFlux-WebClient
下面就以Spring 5中WebClient为例看一下响应式编程是如何实现的。
1 |
|
1 |
|
1 |
|
WebClient的默认实现DefaultWebClient通过exchange生成一个MonoDefer实例,实现了Mono接口(Spring中对Publisher接口的封装)。 传入参数为Supplier函数,用于产出数据,此处是进行http请求并获取响应,但此时还并未开始真正的请求。 通过调用Mono的block函数,生成了一个BlockingMonoSubscriber实例,实现了Subscriber接口,并利用MonoDefer重写的subscribe函数注册到了Publisher中。 BlockingMonoSubscriber同时还继承自CountDownLatch,计数为1,调用blockingGet函数后线程进入wait状态,等待计数清零后被唤醒。
1 |
|
在MonoDefer.subscribe中调用了supplier.get(),通过前面传入的Supplier函数开始调用链: DefaultExchangeFunction.exchange->ReactorClientHttpConnector.connect->HttpClientTcpConfig.request(HttpClientTcpConfig通过修饰者模式最终封装了HttpClientConnect, 内部组合了实现TcpClient接口的HttpTcpClient,最后返回HttpClientFinalizer)->HttpClientFinalizer.uri->HttpClientFinalizer.send(一路通过创建者和修饰着模式, 构建TcpClientBootstrap,最终修饰HttpTcpClient)->HttpClientFinalizer.responseConnection->TcpClientBootstrap.connect->TcpClientOperator.connect(Bootstrap b)->… (内部嵌套调用被修饰类的connect方法)->HttpTcpClient.connect(内部包含TcpClientConnect,返回实现了Mono接口的MonoHttpConnect)->MonoHttpConnect.flatMapMany(返回MonoFlatMapMany)->MonoFlatMapMany.next(返回MonoNext) ->MonoNext.doOnRequest(返回MonoPeek修饰MonoNext)->MonoPeek.doOnCancel(返回MonoPeek修饰MonoPeek)->MonoPeek.map(返回MonoMap修饰MonoPeek)->MonoMap.checkpoint (返回MonoOnAssembly修饰MonoMap)->onoOnAssembly.switchIfEmpty(返回MonoSwitchIfEmpty修饰MonoOnAssembly) 最后调用MonoSwitchIfEmpty的subscribe注册BlockingMonoSubscriber,会先进入其父类的subscribe函数,此函数会从外向内依次遍历修饰的Mono,并生成相应的Subscriber注册到内层。
1 |
|
第一次会先调用MonoSwitchIfEmpty.subscribeOrReturn,生成实现了Subscription接口的FluxSwitchIfEmpty.SwitchIfEmptySubscriber, 传入BlockingMonoSubscriber的onSubscribe函数。。
1 |
|
最后调用MonoHttpConnect的subscribe函数,触发Bootstrap.doResolveAndConnect,开始netty创建连接的过程:
1 |
|
1.1 AbstractBootstrap.initAndRegister(final Channel channel = channelFactory().newChannel();)创建NioSocketChannel
1 |
|
1.2 config().group().register(channel)->SingleThreadEventLoop.register->AbstractUnsafe.register0->AbstractNioChannel.doRegister-> selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);将NioSocketChannel注册到对应SingleThreadEventLoop的selector上。 1.3 Bootstrap.doConnet->NioSocketChannel.doConnect,
1 |
|
此处就向服务端发起了connect请求,准备三次握手。由于是非阻塞模式,所以该方法会立即返回。如果建立连接成功,则返回true,否则返回false,后续需要使用select来检测连接是否已建立成功。 如果返回false,此种情况就需要将ops设置为SelectionKey.OP_CONNECT,等待connect的select事件通知。 1.4 NioEventLoop中sellector接收到connect时间后调用AbstractNioUnsafe.finishConnect->AbstractNioUnsafe.fulfillConnectPromise->pipeline().fireChannelActive(), 触发ChannelActive事件,该事件是一个inbound事件,所以Inbound的处理器可以通过实现channelActive方法来进行相应的操作,netty的自带例子中,EchoClientHandler实现该方法来开始向服务端写数据。
1 |
|
NioEventLoop的selector同时还会监听writeable事件来表示可以继续向网卡发送数据和readable事件,来读取服务器返回的响应。
应用场景
- 可以节省线程资源,即使被调用方响应缓慢,也不会占用请求线程。请求线程在发送请求后立即返回,等到有响应时再启动一个新的线程来返回结果。
- 可以并行发送多个http请求,减少总的请求-响应时间。