Hands-On Reactive Programming in Spring 5
上QQ阅读APP看书,第一时间看更新

Reactivity on the service level

Fortunately, the growing demand for reactive systems initiated the creation of a new Spring Project called Spring Cloud. The Spring Cloud Framework is a foundation of projects that address particular problems and simplifies the construction of distributed systems. Consequently, the Spring Framework ecosystem may be relevant for us to build reactive systems.

To learn more about the essential functionality, components, and features of that project please click on the following link: http://projects.spring.io/spring-cloud/.

We will skip the details of Spring Cloud Framework functionality in this chapter and cover the most important parts that help in the development of the reactive system in Chapter 8, Scaling Up with Cloud Streams. Nonetheless, it should be noticed that such a solution building a robust, reactive microservices system with minimum effort.

However, the overall design is only one element of constructing the whole reactive system. As may be noticed from the excellent Reactive Manifesto:

"Large systems are composed of smaller ones and therefore depend on the Reactive properties of their constituents. This means that Reactive Systems apply design principles so these properties apply at all levels of scale, making them able to be composed".

Therefore, it is important to provide a reactive design and implementation on the component level as well. In that context, the term design principle refers to a relationship between components and, for example, programming techniques that are used to compound elements. The most popular traditional technique for writing code in Java is imperative programming.

To understand whether imperative programming follows reactive system design principles, let's consider the next diagram:

Diagram 1.6. UML Schema of component relationship

Here, we have two components within the web store application. In that case, OrdersService calls ShoppingCardService while processing the user request. Suppose that under the hood ShoppingCardService executes a long-running I/O operation, for example, an HTTP request or database query. To understand the disadvantages of imperative programming let's consider the following example of the most common implementation of the aforementioned interaction between components:

interface ShoppingCardService {                                    // (1)
Output calculate(Input value); //

} //

class OrdersService { // (2)
private final ShoppingCardService scService; //
//
void process() { //
Input input = ...; //
Output output = scService.calculate(input); // (2.1)
... // (2.2)
} //
} //

The aforementioned code is explained as follows:

  1. This is the ShoppingCardService interface declaration. This corresponds to the aforementioned class diagram and has only one calculate method, which accepts one argument and returns a response after its processing.
  2. This is the OrderService declaration. Here, at point (2.1) we synchronously call  ShoppingCardService and receive a result right after its execution. Point (2.2) hides the rest of the code responsible for result processing.
  3. In turn, in that case our services are tightly coupled in time, or simply the execution of OrderService is tightly coupled to the execution of ShoppingCardService. Unfortunately, with such a technique, we cannot proceed with any other actions while ShoppingCardService is in the processing phase.

As we can understand from the preceding code, in Java world, the execution of scService.calculate(input) blocks the Thread on which the processing of the OrdersService logic takes place. Thus, to run a separate independent processing in OrderService we have to allocate an additional Thread. As we will see in this chapter, the allocation of an additional Thread might be wasteful. Consequently, from the reactive system perspective, such system behavior is unacceptable.

Blocking communications directly contradicts the message-driven principle, which explicitly offers us non-blocking communication. See the following for more information on this: https://www.reactivemanifesto.org/#message-driven.

Nonetheless, in Java, that problem may be solved by applying a callback technique for the purpose of  cross-component communication:

interface ShoppingCardService {                                    // (1)
void calculate(Input value, Consumer<Output> c); //
} //

class OrdersService { // (2)
private final ShoppingCardService scService; //
//
void process() { //
Input input = ...; //
scService.calculate(input, output -> { // (2.1)
... // (2.2)
}); //
} //
} //

Each point in the preceding code is explained in the following numbered list:

  1. The preceding code is the ShoppingCardService interface declaration. In that case, the calculate method accepts two parameters and returns a void. It means that from the design perspective, the caller may be immediately released from waiting and the result will be sent to the given Consumer<> callback later.
  2. This is the OrderService declaration. Here, at point (2.1) we asynchronously call  ShoppingCardService and continue processing. In turn, when the ShoppingCardService executes the callback function we will be able to proceed with the actual result processing (2.2).

Now, OrdersService passes the function-callback to react at the end of the operation. This embraces the fact that OrdersService is now decoupled from ShoppingCardService and the first one may be notified via the functional callback where the implementation of the ShoppingCardService#calculate  method, which calls the given function, may either be synchronous or asynchronous:

class SyncShoppingCardService implements ShoppingCardService {     // (1)
public void calculate(Input value, Consumer<Output> c) { //
Output result = new Output(); //
c.accept(result); // (1.1)
} //
} //

class AsyncShoppingCardService implements ShoppingCardService { // (2)
public void calculate(Input value, Consumer<Output> c) { //
new Thread(() -> { // (2.1)
Output result = template.getForObject(...); // (2.2)
... //
c.accept(result); // (2.3)
}).start(); // (2.4)
} //
} //

Each point in the preceding code is explained in the following numbered list:

  1. This point is the SyncShoppingCardService class declaration. This implementation assumes the absence of blocking operations. Since we do not have an I/O execution, the result may be returned immediately by passing it to the callback function (1.1).
  2. This point in the preceding code is the AsyncShoppingCardService class declaration. In the case, when we have blocking I/O as depicted in point (2.2), we may wrap it in the separate Thread (2.1) (2.4). After retrieving the result,  it will be processed and passed to the callback function.

In that example, we have the sync implementation of ShoppingCardService, which keeps synchronous bounds and offers no benefits from the API perspective. In the async case, we achieve asynchronous bounds, and a request will be executed in the separate ThreadOrdersService is decoupled from the execution process and will be notified of the completion by the callback execution.

The advantage of that technique is that components are decoupled in time by the callback function. This means that after calling the scService.calculate method, we will be able to proceed with other operations immediately without waiting for the response in the blocking fashion from ShoppingCardService.

The disadvantage is that callback requires the developer to have a good understanding of multi-threading to avoid the traps of shared data modifications and callback hell.

Actually, the phrase  callback hell is mentioned in relation to JavaScript:  http://callbackhell.com, but it is also applicable to Java as well.

Fortunately, the callback technique is not the only option. Another one is  java.util.concurrent.Futurewhich, to some degree, hides the executional behavior and decouples components as well:

interface ShoppingCardService {                                    // (1)
Future<Output> calculate(Input value); //
} //

class OrdersService { // (2)
private final ShoppingCardService scService; //
//
void
process() { //
Input input = ...; //
Future<Output> future = scService.calculate(input); // (2.1)
... //
Output output = future.get(); // (2.2)
... //
} //
} //

The numbered points are described in the following:

  1. At this point is the ShoppingCardService interface declaration. Here, the calculate method accepts one parameter and returns Future. Future is a class wrapper which allows us to check whether there is an available result or blocking to get it.
  2. This is the OrderService declaration. Here, in point (2.1), we asynchronously call ShoppingCardService and receive the Future instance. In turn, we are able to continue processing while the result is being processed asynchronously. After some execution, which may be done independently from ShoppingCardService#calculation, we get the result. This result may end up waiting in the blocking fashion or it may immediately return the result (2.2).

As we may notice from the previous code, with the Future class, we achieve deferred retrieval of the result. With the support of the Future class, we avoid callback hell and hide multi-threading complexity behind a specific Future implementation. Anyway, to get the result we need, we must potentially block the current Thread and synchronize with the external execution that noticeably decreases scalability.

As an improvement, Java 8 offers CompletionStage and CompletableFuture as a direct implementation for CompletionStage. In turn, those classes provide promise-like APIs and make it possible to build code such as the following:

To learn more about futures and promises, please see the following link: https://en.wikipedia.org/wiki/Futures_and_promises.
interface ShoppingCardService {                                    // (1)
CompletionStage<Output> calculate(Input value); //
} //

class
OrdersService { // (2)
private final ComponentB componentB; //
void process() { //
Input input = ...; //
componentB.calculate(input) // (2.1)
.thenApply(out1 -> { ... }) // (2.2)
.thenCombine(out2 -> { ... }) //
.thenAccept(out3 -> { ... }) //
} //
} //

The aforementioned code is described in the following:

  1. At this point, we have the ShoppingCardService interface declaration. In this case, the calculate method accepts one parameter and returns CompletionStage. CompletionStage is a class wrapper that is similar to Future but allows processing the returned result in the functional declarative fashion.
  2. This is an OrderService declaration. Here, at point (2.1) we asynchronously call  ShoppingCardService and receive the CompletionStage immediately as the result of the execution. The overall behavior of the CompletionStage is similar to Future, but CompletionStage provides a fluent API which makes it possible to write methods such as thenAccept and thenCombine. These define transformational operations on the result and thenAccept, which defines the final consumers, to handle the transformed result.

With the support of CompletionStage, we can write code in the functional and declarative style, which looks clean and processes the result asynchronously. Furthermore, we may omit the awaiting results and provide a function to handle the result when it becomes available. Moreover, all of the previous techniques are valued by Spring teams and have already been implemented within most of the projects within the framework. Even though the CompletionStage gives better possibilities for writing efficient and readable code, unfortunately, there are some missing points there. For example, Spring 4 MVC did not support CompletionStage for a long time and for that purpose, it provided its own ListenableFuture. This happened because Spring 4 aimed to become compatible with older Java versions. Let's take an overview of AsyncRestTemplate usage to get an understanding of how to work with Spring's ListenableFuture. The following code shows how we may use ListenableFuture with AsyncRestTemplate:

AsyncRestTemplate template = new AsyncRestTemplate(); 
SuccessCallback onSuccess = r -> { ... };
FailureCallback onFailure = e -> { ... };
ListenableFuture<?> response = template
.getForEntity(
"http://example.com/api/examples",
ExamplesCollection.class
);
response.addCallback(onSuccess, onFailure);

The preceding code shows the callback style for handling an asynchronous call. Essentially, this method of communication is a dirty hack, and Spring Framework wraps blocking network calls in a separate thread under-the-hood. Furthermore, Spring MVC relies on Servlet API, which obligates all implementations to use the thread-per-request model.

Many things have changed with the release of Spring Framework 5 and the new Reactive WebClient, so with the support of WebClient , all cross-service communication is non-blocking anymore. Also, Servlet 3.0 introduced asynchronous client-server communication, Servlet 3.1 allowed non-blocking writing to I/O, and in general new asynchronous non-blocking features of the Servlet 3 API are well integrated into Spring MVC. However, the only problem was that Spring MVC did not provide an out of the box asynchronous non-blocking client that negates all benefits from improved servlets.

This model is quite non-optimal. To understand why this technique is inefficient, we have to revisit the costs of multi-threading. On the one hand, multi-threading is a complex technique by nature. When we work with multi-threading, we have to think about many things, such as access to shared memory from the different threads, synchronization, error handling, and so on. In turn, the design of multi-threading in Java supposes that a few threads may share a single CPU to run their tasks simultaneously. The fact that CPU time will be shared between several threads introduces the notion of context switching. This means that to resume a thread later, it is required to save and load registers, memory maps, and other related elements which in general are computationally-intensive operations. Consequently, its application with a high number of active threads, and few CPUs, will be inefficient.

To learn more about the cost of context switching, please visit the following link: https://en.wikipedia.org/wiki/Context_switch#Cost.

In turn, a typical Java thread has its overhead in memory consumption. A typical stack size for a thread on a 64-bit Java VM is 1,024 KB. On the one hand, an attempt to handle ~6,4000 simultaneous requests in a thread per connection model may result in about 64 GB of used memory. This might be costly from the business perspective or critical from the application standpoint. On the other hand, by switching to traditional thread pools with a limited size and a pre-configured queue for requests, the client waits too long for a response, which is less reliable, increases the average response timeout, and finally may cause unresponsiveness of the application.

For that purpose, the Reactive Manifesto recommends using a non-blocking operation, and this is an omission in the Spring ecosystem. On the other hand, there is no good integration with reactive servers such as Netty, which solves the problem of context switching.

To get source information about the average amount of connections, see the following link: https://stackoverflow.com/questions/2332741/what-is-the-theoretical-maximum-number-of-open-tcp-connections-that-a-modern-lin/2332756#2332756.

The term  thread  refers to allocated memory for the thread object and allocated memory for the thread stack. See the next link for more information:
http://xmlandmore.blogspot.com/2014/09/jdk-8-thread-stack-size-tuning.html?m=1.

It is important to note that asynchronous processing is not limited to a plain request-response pattern, and sometimes we have to deal with handling infinitive streams of data, processing it in the manner of an aligned transformation flow with backpressure support:

Diagram 1.7. Reactive pipeline example

One of the ways for handling such cases is through reactive programming, which embraces the techniques of asynchronous event processing through chaining transformational stages. Consequently, reactive programming is a good technique which fits the design requirements for a reactive system. We will cover the value of applying reactive programming for building a reactive system in the next chapters.

Unfortunately, the reactive programming technique was not well integrated inside Spring Framework. That put another limitation on building modern applications and decreased the competitiveness of the framework. As a consequence, all the mentioned gaps in the growing hype around reactive systems and reactive programming simply increased the need for dramatic improvements within the framework. Finally, that drastically stimulated the improvement of Spring Framework by adding the support for Reactivity on all levels and providing developers with a powerful tool for reactive system development. Its pivotal developers decided to implement new modules that reveal the whole power of Spring Framework as a reactive system foundation.