Prerequisites and benefits of RxJava
With RxJava, we have become acquainted with the basics of reactive programming. Different reactive libraries may have slightly different APIs and implementation details, but the concept remains the same: a subscriber subscribes to an observable stream, which in turn triggers an asynchronous process of event generation. Between the producer and the subscriber, there usually exists a subscription that makes it possible to break the producer-consumer relationship. Such an approach is very flexible and gives control over the number of produced and consumed events, which reduces the CPU cycles that would otherwise be wasted on creating data that is never used.
To show that reactive programming offers the ability to save resources, let's assume that we need to implement a simple in-memory search engine service. The service should return a collection of URLs to documents that contain the desired phrase. Usually, the client application (a web or mobile app) also passes a limit, for example, the maximum number of useful results. Without reactive programming, we would probably design such a service using the following API:
public interface SearchEngine {
    List<URL> search(String query, int limit);
}
As the interface shows, our service performs a search operation, gathers all results within the limit, puts them into a List, and returns it to the client. In this scenario, the client receives the whole result set, even if the user picks the first or second result as soon as the results are drawn on the UI. In that case, our service did a lot of work and our client waited for a long time, yet most of the results were ignored. That is undoubtedly a waste of resources.
However, we can do better and process the search results by iterating over the result set, so that the server searches for the next items only as long as the client continues consuming them. Usually, the server advances the search not row by row, but in fixed-size buckets (let's say 100 items). Such an approach is called a cursor and is often used by databases. For a client, the resulting cursor is represented in the form of an iterator. The following code represents our improved service API:
public interface IterableSearchEngine {
    Iterable<URL> search(String query, int limit);
}
The only drawback in the case of an iterable is that our client's thread is blocked while it waits for a new piece of data. That would be a disaster for the Android UI thread. Conversely, once a new result arrives, the search service sits idle until the client makes the next next() call. In other words, the client and the service are playing ping-pong through the Iterable interface. Such an interaction may be acceptable sometimes, but in most cases it is not efficient enough to build a high-performance application.
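To make the cursor idea concrete, the following is a minimal JDK-only sketch of an iterator that loads results in fixed-size buckets. The names BucketCursor and CursorDemo, the in-memory index, and the use of String instead of URL are assumptions made for illustration; they are not part of the service API above.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical cursor: loads results lazily, one fixed-size bucket at a time.
final class BucketCursor implements Iterator<String> {
    private final List<String> index;   // stands in for the real document index
    private final int bucketSize;
    private final int limit;
    private List<String> bucket = new ArrayList<>();
    private int posInBucket = 0;
    private int fetched = 0;            // items loaded from the index so far
    private int returned = 0;           // items handed to the client so far
    int bucketsLoaded = 0;              // exposed only to observe the laziness

    BucketCursor(List<String> index, int bucketSize, int limit) {
        this.index = index;
        this.bucketSize = bucketSize;
        this.limit = limit;
    }

    @Override
    public boolean hasNext() {
        if (returned >= limit) return false;
        if (posInBucket < bucket.size()) return true;
        // Bucket exhausted: the caller's thread waits here while the next one loads.
        loadNextBucket();
        return posInBucket < bucket.size();
    }

    @Override
    public String next() {
        if (!hasNext()) throw new NoSuchElementException();
        returned++;
        return bucket.get(posInBucket++);
    }

    private void loadNextBucket() {
        int end = Math.min(index.size(), fetched + bucketSize);
        bucket = new ArrayList<>(index.subList(fetched, end));
        fetched = end;
        posInBucket = 0;
        if (!bucket.isEmpty()) bucketsLoaded++;
    }
}

final class CursorDemo {
    // Consumes the given number of results and reports how many buckets were loaded.
    static int bucketsLoadedAfter(int consumed) {
        List<String> index = new ArrayList<>();
        for (int i = 0; i < 250; i++) index.add("http://example.com/doc/" + i);
        BucketCursor cursor = new BucketCursor(index, 100, 250);
        for (int i = 0; i < consumed && cursor.hasNext(); i++) cursor.next();
        return cursor.bucketsLoaded;
    }

    public static void main(String[] args) {
        System.out.println(bucketsLoadedAfter(3));   // prints 1
        System.out.println(bucketsLoadedAfter(101)); // prints 2
    }
}
```

A client that reads only three results causes a single bucket to be loaded, which is the saving over the List-based API. The loading still happens on the caller's thread inside hasNext(), which is exactly the blocking described above.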
In turn, our search engine may return a CompletableFuture in order to become an asynchronous service. In that case, our client's thread may do something useful and not bother about the search request, as the service invokes a callback as soon as the result arrives. But here we again receive all or nothing, as a CompletableFuture may hold only one value, even if that value is a list of results, as shown in the following code:
public interface FutureSearchEngine {
    CompletableFuture<List<URL>> search(String query, int limit);
}
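The all-or-nothing behavior can be observed with a small sketch. The class FutureSearchDemo and its fake search are hypothetical stand-ins that use String instead of URL; the point is only that the callback fires once, with the complete list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class FutureSearchDemo {
    // Hypothetical stand-in for a real search: builds the full list asynchronously.
    static CompletableFuture<List<String>> search(String query, int limit) {
        return CompletableFuture.supplyAsync(() -> {
            List<String> results = new ArrayList<>();
            for (int i = 0; i < limit; i++) {
                results.add("http://example.com/" + query + "/" + i);
            }
            // Nothing is visible to the caller until the whole list is returned here.
            return results;
        });
    }

    // Counts how many times the client's callback is invoked for one search.
    static int callbackInvocations(String query, int limit) {
        AtomicInteger calls = new AtomicInteger();
        search(query, limit)
                .thenAccept(urls -> calls.incrementAndGet())
                .join(); // wait only so that the demo can report the count
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackInvocations("rxjava", 50)); // prints 1
    }
}
```

The client's thread is free while the search runs, but it cannot react to the first result before the fiftieth has been found.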
With RxJava, we can improve our solution and get both asynchronous processing and the ability to react to each arriving event. Also, our client may unsubscribe() at any moment and so reduce the amount of work done by the search service, as shown in the following code:
public interface RxSearchEngine {
    Observable<URL> search(String query);
}
By using that approach, we significantly increase the responsiveness of the application. Even though the client has not received all of the results yet, it may process the pieces that have already arrived. As humans, we do not like to wait for results. Instead, we value metrics such as Time To First Byte or Critical Rendering Path. In all of these cases, reactive programming is no worse than conventional approaches, and usually brings better results.
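The mechanism behind this saving can be sketched without any library. The following is not RxJava: it is a hand-rolled, synchronous, JDK-only illustration in which Cancellable, PushSearch, and PushSearchDemo are hypothetical names. It shows only the idea that results are pushed one by one and that the producer stops working once the consumer cancels.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

// Hypothetical handle that lets the consumer cancel the stream.
final class Cancellable {
    private final AtomicBoolean cancelled = new AtomicBoolean();
    void cancel() { cancelled.set(true); }
    boolean isCancelled() { return cancelled.get(); }
}

final class PushSearch {
    // Pushes each match to the consumer as soon as it is found and stops
    // scanning once the consumer cancels. Returns the number of documents scanned.
    static int search(List<String> docs, String query,
                      BiConsumer<String, Cancellable> onNext) {
        Cancellable handle = new Cancellable();
        int scanned = 0;
        for (String doc : docs) {
            if (handle.isCancelled()) break; // no further work after cancellation
            scanned++;
            if (doc.contains(query)) onNext.accept(doc, handle);
        }
        return scanned;
    }
}

final class PushSearchDemo {
    // The client cancels after it has received the number of results it wants.
    static int scannedBeforeCancel(int wanted) {
        List<String> docs = new ArrayList<>();
        for (int i = 0; i < 1000; i++) docs.add("http://example.com/doc/" + i);
        List<String> shown = new ArrayList<>();
        return PushSearch.search(docs, "doc", (url, handle) -> {
            shown.add(url);                  // react to each result as it arrives
            if (shown.size() >= wanted) handle.cancel();
        });
    }

    public static void main(String[] args) {
        System.out.println(scannedBeforeCancel(2)); // prints 2
    }
}
```

Out of 1,000 documents, only two are scanned when the client needs two results. A real reactive library adds asynchrony, error and completion signals, and thread safety on top of this idea.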
As we saw earlier, RxJava makes it possible to asynchronously compose data streams in a way that is much more versatile and flexible. Similarly, we may wrap old-school synchronous code into an asynchronous workflow. To manage the actual execution thread for a slow Callable, we may use the subscribeOn(Scheduler) operator. This operator defines on which Scheduler (the reactive counterpart of Java's ExecutorService) the stream processing is started. Thread scheduling is covered in detail in Chapter 4, Project Reactor - the Foundation for Reactive Apps. The following code demonstrates such a use case:
String query = ...;
Observable.fromCallable(() -> doSlowSyncRequest(query))
    .subscribeOn(Schedulers.io())
    .subscribe(this::processResult);
Of course, with such an approach we cannot rely on one thread processing the whole request. Our workflow may start in one thread, migrate to a handful of other threads, and finish processing in a completely different, newly created thread. It is essential to highlight that with this approach it is hazardous to mutate objects, and the only reasonable strategy is immutability. This is not a new concept; it is one of the core principles of functional programming: once an object is created, it may not change. Such a simple rule prevents a whole class of issues in parallel applications.
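The combination of thread migration and immutability can be illustrated with a JDK-only analogue. This sketch uses CompletableFuture with two named executors in place of RxJava schedulers. The class names, the thread names io-1 and cpu-1, and the sample URL are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Immutable value: all fields are final and there are no setters.
final class SearchResult {
    private final String url;
    private final String trace; // names of the threads that handled this result

    SearchResult(String url, String trace) {
        this.url = url;
        this.trace = trace;
    }

    String url() { return url; }
    String trace() { return trace; }

    // Returns a new object instead of mutating this one.
    SearchResult handledBy(String threadName) {
        String newTrace = trace.isEmpty() ? threadName : trace + " -> " + threadName;
        return new SearchResult(url, newTrace);
    }
}

final class ThreadHopDemo {
    // Runs two processing stages on two different threads and returns the trace.
    static String run() {
        ExecutorService io = Executors.newSingleThreadExecutor(task -> new Thread(task, "io-1"));
        ExecutorService cpu = Executors.newSingleThreadExecutor(task -> new Thread(task, "cpu-1"));
        try {
            return CompletableFuture
                    // Stage 1 runs on the "io-1" thread.
                    .supplyAsync(() -> new SearchResult("http://example.com/doc/1", "")
                            .handledBy(Thread.currentThread().getName()), io)
                    // Stage 2 runs on the "cpu-1" thread and receives an immutable value.
                    .thenApplyAsync(result ->
                            result.handledBy(Thread.currentThread().getName()), cpu)
                    .join()
                    .trace();
        } finally {
            io.shutdown();
            cpu.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints io-1 -> cpu-1
    }
}
```

Because each stage produces a new SearchResult rather than changing a shared one, no stage can observe a half-updated object, regardless of which thread it runs on.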
Before Java 8 introduced lambdas, it was hard to leverage the full power of reactive programming as well as functional programming. Without lambdas, we had to create a lot of anonymous or inner classes that polluted the application code and created more boilerplate than meaningful lines. At the time of RxJava's inception, Netflix extensively used Groovy for development purposes despite Groovy's slow speed, mainly because of its lambda support. This leads us to the conclusion that functions as first-class citizens are required for the successful and pleasant usage of reactive programming. Fortunately, this is not a problem for Java anymore, even on the Android platform, where projects such as Retrolambda (https://github.com/orfjackal/retrolambda) enable lambda support for old Java versions.