Reactive and NIO

Reactive Programming

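In the Java 8 era the Java community introduced the Reactive Streams API, which set off a wave of interest in the reactive programming paradigm; JDK 9 later added the same four interfaces as nested types of java.util.concurrent.Flow. See the Reactive Manifesto for background. The Reactive Streams API consists of the following four interfaces: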

Publisher

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Subscriber

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Processor

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
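
To make the contract between these four interfaces concrete, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow types (JDK 9+), which mirror the Reactive Streams interfaces one to one. FlowDemo, RangePublisher and CollectingSubscriber are hypothetical names made up for this illustration, and the publisher is synchronous and deliberately not spec-complete (for example, it does not validate n <= 0).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class FlowDemo {
    /** Hypothetical publisher emitting count integers starting at start; illustrative only. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items: the subscriber sets the pace (backpressure).
                    for (long i = 0; i < n && !done && next < start + count; i++) {
                        subscriber.onNext(next++);
                    }
                    if (!done && next == start + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical subscriber that records every signal and requests one item at a time. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            events.add("onSubscribe");
            s.request(1);
        }

        @Override
        public void onNext(Integer item) {
            events.add("onNext(" + item + ")");
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            events.add("onError");
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
        }
    }

    static List<String> run() {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new RangePublisher(1, 3).subscribe(subscriber);
        return subscriber.events;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext(1), onNext(2), onNext(3), onComplete]
        System.out.println(run());
    }
}
```

Nothing is emitted until the subscriber calls request, which is the core difference from a plain observer/callback API.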

There are many libraries that implement the Reactive Streams API, for example RxJava, Reactor and Akka Streams. WebFlux, the reactive web framework in Spring 5, is built on Reactor.

WebFlux WebClient

The following uses the WebClient in Spring 5 as an example to see how reactive programming is implemented.

        Mono<ClientResponse> mono = webClient
                .get()
                .uri("/posts/1")
                .exchange();
        System.out.println("Block Result:"+mono.block());

exchange() in DefaultWebClient:

		public Mono<ClientResponse> exchange() {
			ClientRequest request = (this.inserter != null ?
					initRequestBuilder().body(this.inserter).build() :
					initRequestBuilder().build());
			return Mono.defer(() -> exchangeFunction.exchange(request)
					.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
					.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR));
		}

Mono.block():

	public T block() {
		BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
		subscribe((Subscriber<T>) subscriber);
		return subscriber.blockingGet();
	}

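DefaultWebClient, the default implementation of WebClient, returns a MonoDefer instance from exchange. MonoDefer is a subclass of Mono, Reactor's single-value implementation of the Publisher interface. Its argument is a Supplier that produces the data, here by issuing the HTTP request and obtaining the response, but at this point no request has actually been sent. Calling block on the Mono creates a BlockingMonoSubscriber, which implements Subscriber, and registers it with the Publisher through the subscribe method that MonoDefer overrides. BlockingMonoSubscriber also extends CountDownLatch with a count of 1: after blockingGet is called, the calling thread waits until the count drops to zero, at which point it is woken up.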
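
The two ideas in this paragraph, a deferred source that does nothing until subscribed and a subscriber that doubles as a latch, can be sketched with JDK types only. This is not Reactor's code: BlockingDemo, DeferredSource and BlockingSubscriber are hypothetical names, and the "response" is produced on a plain thread as a stand-in for an I/O event loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class BlockingDemo {
    /** Hypothetical deferred source: the supplier runs only after a subscriber requests data. */
    static final class DeferredSource<T> implements Flow.Publisher<T> {
        private final Supplier<T> supplier;

        DeferredSource(Supplier<T> supplier) {
            this.supplier = supplier;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean started;

                @Override
                public void request(long n) {
                    if (started) {
                        return;
                    }
                    started = true;
                    // Produce the value on another thread, as an event loop would.
                    new Thread(() -> {
                        try {
                            subscriber.onNext(supplier.get());
                            subscriber.onComplete();
                        } catch (Throwable t) {
                            subscriber.onError(t);
                        }
                    }).start();
                }

                @Override
                public void cancel() {
                    // Nothing to release in this sketch.
                }
            });
        }
    }

    /** Subscriber that is also a latch with count 1, in the spirit of BlockingMonoSubscriber. */
    static final class BlockingSubscriber<T> extends CountDownLatch implements Flow.Subscriber<T> {
        private volatile T value;
        private volatile Throwable error;

        BlockingSubscriber() {
            super(1);
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(T item) {
            value = item;
        }

        @Override
        public void onError(Throwable t) {
            error = t;
            countDown();
        }

        @Override
        public void onComplete() {
            countDown();
        }

        T blockingGet() {
            try {
                await(); // the caller parks here until countDown() brings the count to zero
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            if (error != null) {
                throw new IllegalStateException(error);
            }
            return value;
        }
    }

    static String run() {
        AtomicInteger calls = new AtomicInteger();
        DeferredSource<String> source =
                new DeferredSource<>(() -> "response#" + calls.incrementAndGet());
        int callsBeforeSubscribe = calls.get(); // still 0: assembling the source did no work
        BlockingSubscriber<String> subscriber = new BlockingSubscriber<>();
        source.subscribe(subscriber);
        return callsBeforeSubscribe + ":" + subscriber.blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 0:response#1
    }
}
```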

public void subscribe(CoreSubscriber<? super T> actual) {
		Mono<? extends T> p;

		try {
			p = Objects.requireNonNull(supplier.get(),
					"The Mono returned by the supplier is null");
		}
		catch (Throwable e) {
			Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
			return;
		}

		p.subscribe(actual);
	}

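MonoDefer.subscribe calls supplier.get(), and the Supplier passed in earlier starts the following call chain:

DefaultExchangeFunction.exchange
-> ReactorClientHttpConnector.connect
-> HttpClientTcpConfig.request (through the decorator pattern, HttpClientTcpConfig ultimately wraps HttpClientConnect, which internally composes the TcpClient implementation HttpTcpClient; the call returns HttpClientFinalizer)
-> HttpClientFinalizer.uri
-> HttpClientFinalizer.send (using the builder and decorator patterns along the way, this constructs TcpClientBootstrap, which ultimately decorates HttpTcpClient)
-> HttpClientFinalizer.responseConnection
-> TcpClientBootstrap.connect
-> TcpClientOperator.connect(Bootstrap b)
-> … (nested calls to the connect method of each decorated class)
-> HttpTcpClient.connect (contains a TcpClientConnect internally; returns MonoHttpConnect, a Mono)
-> MonoHttpConnect.flatMapMany (returns MonoFlatMapMany)
-> MonoFlatMapMany.next (returns MonoNext)
-> MonoNext.doOnRequest (returns a MonoPeek decorating MonoNext)
-> MonoPeek.doOnCancel (returns a MonoPeek decorating the MonoPeek)
-> MonoPeek.map (returns a MonoMap decorating MonoPeek)
-> MonoMap.checkpoint (returns a MonoOnAssembly decorating MonoMap)
-> MonoOnAssembly.switchIfEmpty (returns a MonoSwitchIfEmpty decorating MonoOnAssembly)

Finally, subscribe is called on MonoSwitchIfEmpty to register the BlockingMonoSubscriber. Execution first enters the subscribe method of its parent class, which walks the decorated Monos from the outside in, creating the corresponding Subscriber for each layer and registering it with the inner one.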

	public final void subscribe(CoreSubscriber<? super O> subscriber) {
		OptimizableOperator operator = this;
		while (true) {
			subscriber = operator.subscribeOrReturn(subscriber);
			if (subscriber == null) {
				// null means "I will subscribe myself", returning...
				return;
			}
			OptimizableOperator newSource = operator.nextOptimizableSource();
			if (newSource == null) {
				operator.source().subscribe(subscriber);
				return;
			}
			operator = newSource;
		}
	}

The first iteration calls MonoSwitchIfEmpty.subscribeOrReturn, which creates a FluxSwitchIfEmpty.SwitchIfEmptySubscriber (it implements the Subscription interface) and passes it to the onSubscribe method of BlockingMonoSubscriber.

	public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
		FluxSwitchIfEmpty.SwitchIfEmptySubscriber<T> parent = new
				FluxSwitchIfEmpty.SwitchIfEmptySubscriber<>(actual, other);

		actual.onSubscribe(parent);

		return parent;
	}
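
The outside-in subscription walk can be illustrated with a much smaller model. The sketch below is a simplification and not Reactor's implementation: Source stands in for Mono, a plain Consumer stands in for Subscriber, and ChainDemo and the operator names are hypothetical. It shows that decorators are assembled from the inside out, while subscribing travels from the outermost decorator to the innermost source, wrapping the downstream consumer at each layer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class ChainDemo {
    static final List<String> log = new ArrayList<>();

    /** Simplified stand-in for a Mono: subscribing hands a consumer to the source. */
    interface Source<T> {
        void subscribe(Consumer<? super T> downstream);

        /** Decorates this source; nothing runs until subscribe() is called on the result. */
        default <R> Source<R> map(String name, Function<? super T, ? extends R> fn) {
            Source<T> upstream = this;
            return downstream -> {
                log.add("subscribe " + name);
                // Wrap the downstream consumer and register the wrapper with the inner layer.
                upstream.subscribe(value -> downstream.accept(fn.apply(value)));
            };
        }
    }

    static List<String> run() {
        log.clear();
        Source<Integer> innermost = downstream -> {
            log.add("subscribe source");
            downstream.accept(20);
        };
        Source<String> chain = innermost
                .map("double", x -> x * 2)
                .map("format", x -> "value=" + x);
        log.add("assembled");
        chain.subscribe(v -> log.add("received " + v));
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // prints [assembled, subscribe format, subscribe double, subscribe source, received value=40]
        System.out.println(run());
    }
}
```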

Finally, the subscribe method of MonoHttpConnect is called, which triggers Bootstrap.doResolveAndConnect and starts Netty's connection setup:

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
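
The shape of doResolveAndConnect (continue immediately if registration has already finished, otherwise attach a listener and hand back a promise) can be sketched with CompletableFuture. This is an analogy using JDK types rather than Netty's ChannelFuture API; RegisterThenConnectDemo and its connect method are hypothetical stand-ins.

```java
import java.util.concurrent.CompletableFuture;

final class RegisterThenConnectDemo {
    /** Hypothetical stand-in for the real connect step; it simply fulfils the promise. */
    static void connect(CompletableFuture<String> promise) {
        promise.complete("connected");
    }

    static CompletableFuture<String> resolveAndConnect(CompletableFuture<Void> registration) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Fast path: registration already succeeded, so connect right away.
            connect(promise);
        } else {
            // Slow or failed path: react once the registration outcome is known.
            registration.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    connect(promise);
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> result = resolveAndConnect(pending);
        System.out.println(result.isDone()); // prints false
        pending.complete(null);
        System.out.println(result.join());   // prints connected
    }
}
```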

1.1 AbstractBootstrap.initAndRegister creates the NioSocketChannel (final Channel channel = channelFactory().newChannel();).

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.

        return regFuture;
    }

1.2 config().group().register(channel) -> SingleThreadEventLoop.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister -> selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); this registers the NioSocketChannel with the selector of its SingleThreadEventLoop.

1.3 Bootstrap.doConnect -> NioSocketChannel.doConnect:

    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

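This is where the connect request is sent to the server and the TCP three-way handshake begins. Because the channel is in non-blocking mode, the method returns immediately: true if the connection is already established, false otherwise, in which case select must be used later to detect when the connection completes. When false is returned, the interest ops are set to SelectionKey.OP_CONNECT so that the selector reports the connect event.

1.4 When the selector in NioEventLoop receives the connect event, it calls AbstractNioUnsafe.finishConnect -> AbstractNioUnsafe.fulfillConnectPromise -> pipeline().fireChannelActive(), which fires the ChannelActive event. This is an inbound event, so inbound handlers can react to it by implementing channelActive; in Netty's bundled examples, EchoClientHandler implements this method to start writing data to the server.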

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
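
Steps 1.2 to 1.4 can be reproduced without Netty, using only java.nio. The sketch below is an illustration under stated assumptions, not Netty's code: NioConnectDemo is a hypothetical name, and the peer is a throwaway listener on the loopback interface so that the example needs no external service.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NioConnectDemo {
    /** Connects to a local listener in non-blocking mode; returns true once connected. */
    static boolean connectNonBlocking() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Throwaway listener on an ephemeral loopback port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            // Register with no interest ops first, like doRegister does with ops = 0.
            SelectionKey key = client.register(selector, 0);

            // connect() returns immediately; false means the handshake is still in progress.
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                key.interestOps(SelectionKey.OP_CONNECT);
                while (!connected) {
                    if (selector.select(5_000) == 0) {
                        return false; // nothing became ready within the timeout
                    }
                    selector.selectedKeys().clear();
                    if (key.isConnectable()) {
                        // Drop OP_CONNECT first, otherwise select() would keep returning at once.
                        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
                        connected = client.finishConnect();
                    }
                }
            }
            return connected && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectNonBlocking());
    }
}
```

On some platforms a loopback connect completes immediately and connect() returns true, in which case the selector branch is skipped; both paths end with an established connection.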

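The selector in NioEventLoop also listens for writable events, which indicate that more data can be sent to the network card, and for readable events, which are used to read the response returned by the server.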

Use Cases

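  1. It saves thread resources: even if the callee responds slowly, the requesting thread is not tied up. The requesting thread returns immediately after sending the request, and the result is delivered later on a different thread (in the Netty flow above, an event-loop thread) once the response arrives.
  2. Multiple HTTP requests can be sent in parallel, which reduces the total request-response time.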
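
The second point can be illustrated without any HTTP library. In the sketch below, fakeHttpGet is a hypothetical stand-in that simulates latency with a sleep, and CompletableFuture replaces Mono; it only demonstrates that overlapping two calls makes the total wait roughly the longer of the two delays instead of their sum. Unlike a real non-blocking client, the simulated calls do occupy worker threads while sleeping.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class ParallelCallsDemo {
    /** Hypothetical slow remote call, simulated with a sleep. */
    static String fakeHttpGet(String path, long delayMillis) {
        try {
            TimeUnit.MILLISECONDS.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "body of " + path;
    }

    /** Starts both calls before waiting for either, then combines the two results. */
    static String fetchBoth() {
        CompletableFuture<String> first =
                CompletableFuture.supplyAsync(() -> fakeHttpGet("/posts/1", 200));
        CompletableFuture<String> second =
                CompletableFuture.supplyAsync(() -> fakeHttpGet("/posts/2", 200));
        return first.thenCombine(second, (a, b) -> a + " | " + b).join();
    }

    public static void main(String[] args) {
        long startNanos = System.nanoTime();
        String result = fetchBoth();
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        // Elapsed time should be close to 200 ms rather than 400 ms.
        System.out.println(result + " in about " + elapsedMillis + " ms");
    }
}
```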

What Are Reactive Streams in Java?

Mastering Reactive Streams

netty4源码分析-connect