Hands-On Reactive Programming in Spring 5

Advanced - async and parallel in Reactive Streams

In the previous sections, we discussed the conceptual behaviors of Reactive Streams. However, there was no mention of asynchronous and non-blocking behaviors of reactive pipes. So, let's dig into the Reactive Streams standard and analyze those behaviors.

On one hand, the Reactive Streams API states, in rules 2.2 and 3.4, that the processing of all signals produced by the Publisher and consumed by the Subscriber should be non-blocking and non-obstructing. Consequently, we can be confident of efficiently utilizing one node or one core of the processor, depending on the execution environment.

On the other hand, the efficient utilization of all processors or cores requires parallelization. Within the Reactive Streams specification, parallelization is usually understood as the parallel invocation of the Subscriber#onNext method. Unfortunately, the specification states in rule 1.3 that the on*** methods must be signaled in a thread-safe manner and, if performed by multiple threads, must use external synchronization. This implies a serialized, or simply sequential, invocation of all on*** methods. In turn, this means that we cannot create something like a ParallelPublisher and process the elements of a single stream in parallel.
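The serialization requirement can be illustrated with a minimal sketch that uses only the JDK. The names SerializedSignals, SerializedConsumer, and maxConcurrentCalls are hypothetical and are not part of Reactive Streams; a plain Consumer stands in for Subscriber#onNext, and a simple lock plays the role of the external synchronization:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class SerializedSignals {

    // Hypothetical wrapper: lets several threads emit, but forwards the
    // onNext-style calls to the delegate strictly one at a time.
    static final class SerializedConsumer<T> implements Consumer<T> {
        private final Consumer<T> delegate;
        private final Object lock = new Object();

        SerializedConsumer(Consumer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void accept(T item) {
            synchronized (lock) { // the external synchronization
                delegate.accept(item);
            }
        }
    }

    // Emits from several threads and returns the highest number of
    // invocations that were ever inside the delegate at the same time.
    static int maxConcurrentCalls(int threads, int itemsPerThread) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        Consumer<Integer> downstream = new SerializedConsumer<>(item -> {
            int now = inFlight.incrementAndGet();
            maxSeen.accumulateAndGet(now, Math::max);
            inFlight.decrementAndGet();
        });

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < itemsPerThread; i++) {
                    downstream.accept(i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return maxSeen.get();
    }
}
```

Although several threads emit, the delegate never observes more than one call in flight, which is why adding more emitting threads does not by itself make the processing of a single stream parallel.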

Consequently, the question is: how do we utilize the resources efficiently? To find the answer, we have to analyze the usual stream processing pipe:

Diagram 3.10. Example of processing flow with some business logic between the Source and Destination

As you may notice, a typical processing pipe, along with a data source and a final destination, includes a few processing or transformation stages. In turn, each stage may take significant processing time and stall the others.

In such a case, one of the solutions is asynchronous message passing between stages. For in-memory stream processing, this means that one part of the execution is bound to one Thread and another part to another Thread. For example, final element consumption may be a CPU-intensive task, which can reasonably be processed on a separate Thread:

Diagram 3.11. The example of the asynchronous boundary between Source with Processing and Destination

In general, by splitting processing between two independent Threads, we put an asynchronous boundary between stages. In turn, by doing so, we parallelize the overall processing of elements, since both Threads may work independently of each other. To achieve parallelization, we have to apply a data structure, such as a Queue, to properly decouple the processing. Hence, the processing within Thread A independently supplies items to, and the Subscriber within Thread B independently consumes items from, the same Queue.
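The queue-based boundary described above can be sketched with plain JDK threads. This is only an illustration under stated assumptions: the class name AsyncBoundary, the thread names, the queue capacity of 16, and the POISON end-of-stream marker are all hypothetical, and the blocking put/take calls are used for brevity rather than as a model of how a reactive library hands items over:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class AsyncBoundary {
    // Hypothetical end-of-stream marker, used only by this sketch.
    private static final int POISON = Integer.MIN_VALUE;

    // Thread A produces and transforms; Thread B consumes from the same queue.
    static List<String> run(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16);
        List<String> consumed = new ArrayList<>();

        Thread a = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i * 10); // source plus processing stage
                }
                queue.put(POISON);     // signal completion
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "thread-A");

        Thread b = new Thread(() -> {
            try {
                for (int v = queue.take(); v != POISON; v = queue.take()) {
                    // destination stage: record which thread handled the item
                    consumed.add(Thread.currentThread().getName() + ":" + v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "thread-B");

        a.start();
        b.start();
        join(a);
        join(b);
        return consumed;
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Calling AsyncBoundary.run(3) yields the three transformed items, each tagged with thread-B, which shows that consumption happens on the other side of the boundary while the order of the elements is preserved by the queue.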

Splitting the processing between threads leads to additional overhead in the form of a data structure. Of course, owing to the Reactive Streams specification, such a data structure is always bounded. In turn, the number of items in the data structure is usually equal to the size of the batch that a Subscriber requests from its Publisher, which in turn depends on the general capacity of the system.
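To see why the buffer can stay bounded, consider the following minimal sketch, in which the consumer requests items in batches and the producer emits only while demand is outstanding. The class name BoundedBoundary and the use of a Semaphore as a demand counter are assumptions made for illustration; the sketch relies on blocking JDK primitives for brevity, which a real reactive library would avoid:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedBoundary {

    // Returns {items received, largest queue size observed by the producer}.
    static int[] run(int total, int batch) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(batch);
        Semaphore demand = new Semaphore(0); // outstanding requested items
        AtomicInteger maxBuffered = new AtomicInteger();
        AtomicInteger received = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    demand.acquire(); // emit only when an item was requested
                    queue.add(i);     // add() would throw if the bound were exceeded
                    maxBuffered.accumulateAndGet(queue.size(), Math::max);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");

        Thread consumer = new Thread(() -> {
            try {
                while (received.get() < total) {
                    demand.release(batch); // plays the role of request(batch)
                    for (int i = 0; i < batch && received.get() < total; i++) {
                        queue.take();
                        received.incrementAndGet();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        producer.start();
        consumer.start();
        join(producer);
        join(consumer);
        return new int[] { received.get(), maxBuffered.get() };
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Because the producer never emits more than was requested, a queue whose capacity equals the batch size is sufficient, and the observed queue size never exceeds that batch size.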

Along with that, the primary question for API implementors and developers is this: to which side of the async boundary should the flow processing part be attached? At least three simple choices arise here. The first is when the processing flow is attached to the Source resource (Diagram 3.11), and all of the operations occur within the same boundary as the Source. In that case, all data is processed synchronously, one item at a time, so each item passes through all processing stages before it is sent for processing on the other Thread. The second configuration is the opposite of the first: the processing is attached to the Destination, or the Consumer's Thread. It may be used in cases when the production of elements is a CPU-intensive task.

The third case takes place when both production and consumption are CPU-intensive tasks. Hence, the most efficient way of running the intermediate transformation is to run it on a separate Thread:

Diagram 3.12. The example of asynchronous boundaries between each component of the pipeline

As we can see in the preceding diagram, each processing stage may be bound to a separate Thread. In general, there are many ways to configure the processing of the data flow, and each is relevant under the conditions it fits best. For example, the first configuration is valid when the Source resource is less loaded than the Destination. In that case, it pays to put the transformation operation within the Source boundary. Conversely, when the Destination consumes fewer resources than the Source, it is logical to process all data within the Destination boundary. Moreover, sometimes the transformation itself may be the most resource-consuming operation. In that case, it is better to separate the transformation from both the Source and the Destination.
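The three placements can be compared in a small sketch built on the JDK's CompletableFuture rather than on a reactive library. Everything named here (BoundaryPlacement, the Placement enum, the executor names) is hypothetical; each single-threaded executor stands for one side of an asynchronous boundary, and the returned string records which thread ran the transformation and which ran the destination stage:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BoundaryPlacement {

    enum Placement { WITH_SOURCE, WITH_DESTINATION, SEPARATE }

    // Pushes one element through source -> transformation -> destination and
    // returns "<transformation thread>:<value> -> <destination thread>".
    static String trace(Placement placement) {
        ExecutorService source = single("source");
        ExecutorService transform = single("transform");
        ExecutorService destination = single("destination");
        try {
            // Decide to which boundary the transformation is attached.
            ExecutorService transformOn;
            switch (placement) {
                case WITH_SOURCE:      transformOn = source;      break;
                case WITH_DESTINATION: transformOn = destination; break;
                default:               transformOn = transform;   break;
            }
            return CompletableFuture
                    .supplyAsync(() -> 21, source)
                    .thenApplyAsync(
                            v -> Thread.currentThread().getName() + ":" + (v * 2),
                            transformOn)
                    .thenApplyAsync(
                            s -> s + " -> " + Thread.currentThread().getName(),
                            destination)
                    .join();
        } finally {
            source.shutdown();
            transform.shutdown();
            destination.shutdown();
        }
    }

    // Single-threaded executor whose only thread carries the given name.
    private static ExecutorService single(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }
}
```

With Placement.WITH_SOURCE the transformation runs on the source thread, with Placement.WITH_DESTINATION on the destination thread, and with Placement.SEPARATE on its own thread, which mirrors the three configurations discussed above.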

Nevertheless, it is crucial to remember that splitting the processing between different Threads is not free. The resources spent on each boundary (a Thread and an additional data structure) should be balanced against the gain in element-processing efficiency. In turn, achieving such a balance is another challenge, and it is hard to implement and manage without a useful library API.

Fortunately, such an API is offered by reactive libraries such as RxJava and Project Reactor. We will not get into the details of the proposed features now, but we will cover them in depth in Chapter 4, Project Reactor - the Foundation for Reactive Apps.