Hands-On Reactive Programming in Spring 5

The basics of the Reactive Streams spec

The Reactive Streams specification defines four primary interfaces: Publisher, Subscriber, Subscription, and Processor. Since that initiative grew independently from any organization, it became available as a separate JAR file where all interfaces live within the org.reactivestreams package.

In general, the specified interfaces resemble what we had earlier (for example, in RxJava 1.x) and, in a way, mirror the well-known classes from RxJava. The first two interfaces correspond to the Observable-Observer pair, which in turn follows the classic Publisher-Subscriber model; consequently, they were named Publisher and Subscriber. To check how closely these two interfaces match Observable and Observer, let's consider their declarations:

package org.reactivestreams;

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

The preceding code shows the whole of the Publisher interface. It has only one method, which registers a Subscriber. Whereas Observable was designed to provide a useful DSL, Publisher is a standardized entry point for a plain Publisher-to-Subscriber connection. The Subscriber side, by contrast, is a slightly more verbose API that is almost identical to the Observer interface from RxJava:

package org.reactivestreams;

public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

As we may have noticed, along with three methods that are identical to methods in the RxJava Observer, the specification provides us with a new additional method called onSubscribe.

The onSubscribe method is a conceptually new API method that provides us with a standardized way of notifying the Subscriber about a successful subscription. In turn, the incoming parameter of that method introduces us to a new contract called Subscription. To understand the idea, let's take a closer look at the interface:

package org.reactivestreams;

public interface Subscription {
    void request(long n);
    void cancel();
}

As we can see, Subscription provides the basic means of controlling the production of elements. Like RxJava 1.x's Subscription#unsubscribe(), the cancel() method allows us to unsubscribe from a stream or even cancel the publishing completely. However, the most significant improvement that accompanies cancellation is the new request method. The Reactive Streams specification introduced the request method to widen the interaction between the Publisher and Subscriber. Now, to tell the Publisher how much data should be pushed, the Subscriber signals the size of its demand via the request method and can be sure that the number of incoming elements does not exceed that limit. Let's take a look at the following marble diagram to understand the underlying mechanism:

Diagram 3.7. Backpressure mechanism

As can be seen from the preceding diagram, the Publisher now guarantees that a new portion of elements is sent only if the Subscriber has asked for it. How the Publisher achieves this is an implementation detail, and may range from plain blocking waits to a sophisticated mechanism that generates data only in response to the Subscriber's requests. Either way, thanks to these guarantees we no longer have to pay the cost of additional queues.

Moreover, as opposed to a pure push model, the specification provides us with the hybrid push-pull model, which allows proper control of the backpressure.
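To make the demand contract concrete, the following is a minimal sketch rather than a production implementation. It uses java.util.concurrent.Flow, the JDK 9+ copy of the org.reactivestreams interfaces, only so that the example needs no external dependency. RangePublisher and BatchSubscriber are hypothetical names, and the code assumes single-threaded use:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical publisher: emits the integers 0..count-1, but only on demand.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) { this.count = count; }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next;          // next value to emit
            private long demand;       // requested but not yet delivered
            private boolean emitting;  // guards against re-entrant request() calls
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("n must be positive"));
                    return;
                }
                // Accumulate demand, capping at Long.MAX_VALUE on overflow.
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                if (emitting) return;  // the loop below picks up the new demand
                emitting = true;
                while (demand > 0 && next < count && !done) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { done = true; }
        });
    }
}

// Hypothetical subscriber: pulls `batch` elements each time requestMore() is called.
final class BatchSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed;
    private final int batch;
    private Flow.Subscription subscription;

    BatchSubscriber(int batch) { this.batch = batch; }

    void requestMore() { subscription.request(batch); }

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(Integer item) { received.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { completed = true; }
}
```

Subscribing alone delivers nothing. Each call to requestMore() releases at most one batch, so the Subscriber never holds more elements than it asked for and needs no buffer of its own.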

To understand the power of the hybrid model, let's revisit our previous example of streaming from the database and see whether such a technique is as efficient as before:

public Publisher<Item> list(int count) {                              // (1)

    Publisher<Item> source = dbClient.getStreamOfItems();             // (2)
    TakeFilterOperator<Item> takeFilter = new TakeFilterOperator<>(   // (2.1)
        source,
        count,
        item -> isValid(item)
    );

    return takeFilter;                                                // (3)
}

The key is as follows:

  1. This is the list method declaration. Here we follow the Reactive Streams spec and return the Publisher<> interface as a first-class citizen for communication.
  2. This is the AsyncDatabaseClient#getStreamOfItems method call. Here we use an updated method, which returns Publisher<>. At point (2.1), we instantiate a custom operator that combines Take and Filter and accepts the number of elements that should be taken. In addition, we pass a custom Predicate implementation, which validates the incoming items in the stream.
  3. At this point, we return the previously created TakeFilterOperator instance. Remember, even though the operator has a different type, it still implements the Publisher interface.

In turn, it is essential to get a clear understanding of the internals of our custom TakeFilterOperator. The following code expands the internals of that operator:

public class TakeFilterOperator<T> implements Publisher<T> {          // (1)
    ...

    public void subscribe(Subscriber s) {                             // (2)
        source.subscribe(new TakeFilterInner<>(s, take, predicate));
    }

    static final class TakeFilterInner<T> implements Subscriber<T>,   // (3)
                                                     Subscription {
        final Subscriber<T> actual;
        final int take;
        final Predicate<T> predicate;
        final Queue<T> queue;
        Subscription current;
        int remaining;
        int filtered;
        volatile long requested;
        ...

        TakeFilterInner(                                              // (4)
            Subscriber<T> actual,
            int take,
            Predicate<T> predicate
        ) { ... }

        public void onSubscribe(Subscription current) {               // (5)
            ...
            current.request(take);                                    // (5.1)
            ...
        }

        public void onNext(T element) {                               // (6)
            ...
            long r = requested;
            Subscriber<T> a = actual;
            Subscription s = current;

            if (remaining > 0) {                                      // (7)
                boolean isValid = predicate.test(element);
                boolean isEmpty = queue.isEmpty();
                if (isValid && r > 0 && isEmpty) {
                    a.onNext(element);                                // (7.1)
                    remaining--;
                    ...
                }
                else if (isValid && (r == 0 || !isEmpty)) {
                    queue.offer(element);                             // (7.2)
                    remaining--;
                    ...
                }
                else if (!isValid) {
                    filtered++;                                       // (7.3)
                }
            }
            else {                                                    // (7.4)
                s.cancel();
                onComplete();
            }

            if (filtered > 0 && remaining / filtered < 2) {           // (8)
                s.request(take);
                filtered = 0;
            }
        }
        ...                                                           // (9)
    }
}

The key points of the preceding code are explained in the following list:

  1. This is the TakeFilterOperator class declaration. This class implements Publisher<>. The constructor of the class and its related fields are hidden behind the ... placeholder.
  2. This is the Publisher#subscribe method implementation. From it we may conclude that, to add logic to the stream, we have to wrap the actual Subscriber in an adapter class that implements the same interface.
  3. This is the TakeFilterOperator.TakeFilterInner class declaration. This class implements the Subscriber interface and plays the most important role, since it is passed as the actual Subscriber to the main source. Once an element is received in onNext, it is filtered and transferred to the downstream Subscriber. Along with the Subscriber interface, the TakeFilterInner class implements the Subscription interface, so that it can be handed to the downstream Subscriber and thereby take control of all downstream demand. Note that here, Queue is an instance of ArrayBlockingQueue whose capacity is equal to take. Creating an inner class that implements both the Subscriber and Subscription interfaces is the classic way of implementing an intermediate transformation stage.
  4. This is the constructor declaration. As might be noticed here, along with the take and predicate parameters, we have the actual subscriber instance that has been subscribed to TakeFilterOperator by calling the subscribe() method.
  5. This is the Subscriber#onSubscribe method implementation. The most interesting element here is found at point (5.1). Here we have the execution of the first Subscription#request to the remote database, which usually happens during the first onSubscribe method invocation.
  6. This is the Subscriber#onNext implementation. It begins by declaring the local variables needed to process the element.
  7. This is the element-processing flow, which has four key points. If the remaining number of elements to take is greater than zero, the actual Subscriber has requested data, the element is valid, and the queue is empty, we send the element directly downstream (7.1). If there is no demand yet, or there is something in the queue, we have to queue the element (to preserve order) and deliver it later (7.2). If the element is not valid, we increase the number of filtered elements (7.3). Finally, if the remaining number of elements is zero, we cancel (7.4) the Subscription and complete the stream.
  8. This is the mechanism for requesting additional data. If the number of filtered elements reaches a limit, we request an additional portion of data from the database without blocking the whole process.
  9. This is the rest of the Subscriber and Subscription method implementations.
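The wrapping technique from points (2) and (3) can also be seen in a deliberately simpler variant. This is not the TakeFilterOperator above: instead of prefetching and queueing, the sketch forwards downstream demand as-is and requests one replacement element for each element the predicate rejects. FilterTake and Inner are hypothetical names, java.util.concurrent.Flow stands in for org.reactivestreams to avoid a dependency, and the code assumes single-threaded signals and a positive take:

```java
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Simplified take-and-filter stage (hypothetical, not the book's operator).
final class FilterTake<T> implements Flow.Publisher<T> {
    private final Flow.Publisher<T> source;
    private final long take;                       // assumed to be positive
    private final Predicate<? super T> predicate;

    FilterTake(Flow.Publisher<T> source, long take, Predicate<? super T> predicate) {
        this.source = source;
        this.take = take;
        this.predicate = predicate;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> downstream) {
        // Wrap the real Subscriber so that the operator sits between both sides.
        source.subscribe(new Inner<T>(downstream, take, predicate));
    }

    // Acts as Subscriber towards the source and as Subscription towards downstream.
    static final class Inner<T> implements Flow.Subscriber<T>, Flow.Subscription {
        private final Flow.Subscriber<? super T> downstream;
        private final Predicate<? super T> predicate;
        private long remaining;
        private Flow.Subscription upstream;
        private boolean done;

        Inner(Flow.Subscriber<? super T> downstream, long take, Predicate<? super T> predicate) {
            this.downstream = downstream;
            this.remaining = take;
            this.predicate = predicate;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(this);  // downstream now talks to us, not to the source
        }

        @Override
        public void request(long n) {
            if (!done) upstream.request(n);  // forward downstream demand unchanged
        }

        @Override
        public void cancel() {
            done = true;
            upstream.cancel();
        }

        @Override
        public void onNext(T item) {
            if (done) return;
            if (!predicate.test(item)) {
                upstream.request(1);  // replace the rejected element
                return;
            }
            downstream.onNext(item);
            if (--remaining == 0) {   // enough elements taken: stop the source
                done = true;
                upstream.cancel();
                downstream.onComplete();
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!done) { done = true; downstream.onError(t); }
        }

        @Override
        public void onComplete() {
            if (!done) { done = true; downstream.onComplete(); }
        }
    }
}
```

The trade-off is latency: every rejected element costs an extra request to the source, whereas the prefetching operator above amortizes that cost by asking for a whole portion at once.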

In general, once the connection with the database is established and the TakeFilterOperator instance has received the Subscription, the first request with the specified number of elements is sent to the database. Right after that, the database starts producing the specified number of elements and pushing them as they become available. The logic of the TakeFilterOperator determines when an additional portion of data should be requested. Once that happens, a new non-blocking request for the next portion of data is sent from the service to the database. It is important to note that the Reactive Streams specification explicitly states that calling Subscription#request should be a non-obstructive execution, which means that blocking operations, or any operations that stall the caller's thread of execution within that method, are not recommended.

To get more information about the mentioned behavior, please see the following link: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md#3.4.
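One common way to keep Subscription#request non-obstructive is to let it only record the demand and hand the actual emission over to an Executor. The following sketch illustrates that idea under stated assumptions: HandOffSubscription is a hypothetical class, the data comes from a plain Iterator, and java.util.concurrent.Flow replaces org.reactivestreams to keep the example dependency-free:

```java
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical Subscription whose request() never emits on the caller's thread.
final class HandOffSubscription<T> implements Flow.Subscription {
    private final Flow.Subscriber<? super T> subscriber;
    private final Iterator<T> items;
    private final Executor executor;
    private final AtomicLong requested = new AtomicLong();   // outstanding demand
    private final AtomicInteger wip = new AtomicInteger();   // work-in-progress counter
    private volatile boolean cancelled;

    HandOffSubscription(Flow.Subscriber<? super T> subscriber,
                        Iterator<T> items,
                        Executor executor) {
        this.subscriber = subscriber;
        this.items = items;
        this.executor = executor;
    }

    // Adds two non-negative longs, capping the result at Long.MAX_VALUE.
    static long addCap(long a, long b) {
        long sum = a + b;
        return sum < 0 ? Long.MAX_VALUE : sum;
    }

    @Override
    public void request(long n) {
        // The specification requires an onError signal for n <= 0; omitted for brevity.
        if (n <= 0) return;
        requested.accumulateAndGet(n, HandOffSubscription::addCap);
        // Schedule a drain only if none is running; then return immediately.
        if (wip.getAndIncrement() == 0) {
            executor.execute(this::drain);
        }
    }

    @Override
    public void cancel() { cancelled = true; }

    // Runs on the executor; at most one drain is active at any time.
    private void drain() {
        int missed = 1;
        for (;;) {
            long demand = requested.get();
            long emitted = 0;
            while (emitted != demand && !cancelled && items.hasNext()) {
                subscriber.onNext(items.next());
                emitted++;
            }
            if (cancelled) return;
            if (!items.hasNext()) {
                cancelled = true;
                subscriber.onComplete();
                return;
            }
            if (demand != Long.MAX_VALUE) {
                requested.addAndGet(-emitted);  // unbounded demand is never decremented
            }
            missed = wip.addAndGet(-missed);    // pick up requests made while draining
            if (missed == 0) return;
        }
    }
}
```

With this design the caller of request pays only for one atomic update and, at most, one task submission, regardless of how slow the data source is.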

Finally, the following diagram depicts the overall interaction between the service and the database: 

Diagram 3.8. The hybrid Push-Pull processing flow

As can be seen from the preceding diagram, the first element from the database might arrive a bit later because of the Reactive Streams specification's contract for interaction between the Publisher and Subscriber. Requesting a new portion of data does not require interrupting or blocking the handling of elements already in flight. Consequently, the overall processing time is almost unaffected.

On the other hand, there are some cases in which the pure push model is preferable. Fortunately, Reactive Streams is flexible enough to accommodate them. Along with the dynamic push-pull model, the specification supports pure push and pure pull models as well. According to the documentation, to achieve a pure push model we may consider requesting a demand equal to 2^63-1 (java.lang.Long.MAX_VALUE).

This number may be considered unbounded because, with current or foreseen hardware, it is not feasible to fulfill a demand of 2^63-1 within a reasonable amount of time (1 element per nanosecond would take 292 years). Consequently, it is permitted for a Publisher to stop tracking demand beyond this point: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/README.md#3.17.
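The 292-year figure is easy to check. The sketch below uses java.time.Duration to convert Long.MAX_VALUE nanoseconds into whole days and then into years, assuming 365-day years. UnboundedDemand is a hypothetical helper name:

```java
import java.time.Duration;

// Hypothetical helper: how long would it take to emit 2^63-1 elements
// at a rate of one element per nanosecond?
final class UnboundedDemand {
    static long yearsToFulfill() {
        long days = Duration.ofNanos(Long.MAX_VALUE).toDays();  // whole days
        return days / 365;                                      // whole 365-day years
    }
}
```

Calling UnboundedDemand.yearsToFulfill() returns 292, which matches the figure quoted above.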

In contrast, to switch to the pure pull model, we may request one new element each time the Subscriber#onNext has been invoked.
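Both extremes come down to how the Subscriber calls request. The following sketch shows one Subscriber of each kind. PushSubscriber and PullSubscriber are hypothetical names, and java.util.concurrent.Flow is used in place of org.reactivestreams so that the example has no dependency:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Pure push: signal effectively unbounded demand once and never request again.
final class PushSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new ArrayList<>();

    @Override
    public void onSubscribe(Flow.Subscription s) {
        s.request(Long.MAX_VALUE);
    }

    @Override public void onNext(T item) { received.add(item); }
    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}

// Pure pull: ask for exactly one element, and for the next one only
// after the previous element has been handled.
final class PullSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new ArrayList<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
    }

    @Override
    public void onNext(T item) {
        received.add(item);
        subscription.request(1);
    }

    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}
```

The pull variant pays one request call per element in exchange for never having more than one element in flight, whereas the push variant gives up flow control entirely.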