Hands-On Reactive Programming in Spring 5

Observer pattern usage example

Now, let's write a simple JUnit test that uses our classes and demonstrates how they work together. In the following example, we also use the Mockito library (http://site.mockito.org) to verify expectations with the support of the Spy pattern:

@Test
public void observersHandleEventsFromSubject() {
// given
Subject<String> subject = new ConcreteSubject();
Observer<String> observerA = Mockito.spy(new ConcreteObserverA());
Observer<String> observerB = Mockito.spy(new ConcreteObserverB());

// when
subject.notifyObservers("No listeners");

subject.registerObserver(observerA);
subject.notifyObservers("Message for A");

subject.registerObserver(observerB);
subject.notifyObservers("Message for A & B");

subject.unregisterObserver(observerA);
subject.notifyObservers("Message for B");

subject.unregisterObserver(observerB);
subject.notifyObservers("No listeners");

// then
Mockito.verify(observerA, Mockito.times(1)).observe("Message for A");
Mockito.verify(observerA, Mockito.times(1)).observe("Message for A & B");
Mockito.verifyNoMoreInteractions(observerA);

Mockito.verify(observerB, Mockito.times(1)).observe("Message for A & B");
Mockito.verify(observerB, Mockito.times(1)).observe("Message for B");
Mockito.verifyNoMoreInteractions(observerB);
}

Running the preceding test produces the following output, which shows which messages were received by which Observer:

Observer A: Message for A
Observer A: Message for A & B
Observer B: Message for A & B
Observer B: Message for B

When we do not need to cancel subscriptions, we may leverage Java 8 features and replace the Observer implementation classes with lambdas. Let's write the corresponding test:

@Test
public void subjectLeveragesLambdas() {
Subject<String> subject = new ConcreteSubject();

subject.registerObserver(e -> System.out.println("A: " + e));
subject.registerObserver(e -> System.out.println("B: " + e));
subject.notifyObservers("This message will be received by A & B");
...
}

It is important to mention that the current Subject implementation is based on CopyOnWriteArraySet, which is not the most efficient choice because every update copies the underlying array. However, that implementation is at least thread-safe, which means that we can use our Subject in a multithreaded environment. For example, this may be useful when events are distributed among many independent components that usually work from multiple threads (which is especially relevant nowadays, when most applications are not single-threaded). Throughout the course of this book, we will be covering thread safety and other multithreading concerns.
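The following is a minimal sketch of what such a Subject could look like. The interface shapes are assumptions inferred from the method names used in the tests above (registerObserver, unregisterObserver, notifyObservers, observe); the ObserverSketch wrapper class and the demo method are hypothetical names added for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

class ObserverSketch {

    // Assumed shape of the chapter's interfaces; only the method names
    // are taken from the tests above.
    interface Observer<T> {
        void observe(T event);
    }

    interface Subject<T> {
        void registerObserver(Observer<T> observer);
        void unregisterObserver(Observer<T> observer);
        void notifyObservers(T event);
    }

    static class ConcreteSubject implements Subject<String> {
        // Every add/remove copies the backing array, so writes are costly,
        // but iteration works on a stable snapshot and needs no locking.
        private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();

        @Override
        public void registerObserver(Observer<String> observer) {
            observers.add(observer);
        }

        @Override
        public void unregisterObserver(Observer<String> observer) {
            observers.remove(observer);
        }

        @Override
        public void notifyObservers(String event) {
            observers.forEach(observer -> observer.observe(event));
        }
    }

    // Hypothetical demo: registers two lambda observers, then unregisters
    // one of them through the reference that was kept.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Subject<String> subject = new ConcreteSubject();
        Observer<String> a = e -> received.add("A: " + e);
        Observer<String> b = e -> received.add("B: " + e);

        subject.registerObserver(a);
        subject.registerObserver(b);
        subject.notifyObservers("first");

        subject.unregisterObserver(a);
        subject.notifyObservers("second");
        return received;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Note that a lambda observer can only be unregistered if a reference to it is kept, as done with the variable a here; an inline lambda passed directly to registerObserver cannot be removed later.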

Do keep in mind that when we have a lot of observers that handle events with some noticeable latency (as introduced by downstream processing), we may parallelize message propagation using additional threads or a thread pool. This approach may lead to the following implementation of the notifyObservers method:

private final ExecutorService executorService = 
Executors.newCachedThreadPool();

public void notifyObservers(String event) {
observers.forEach(observer ->
executorService.submit(
() -> observer.observe(event)
)
);
}

However, with such improvements, we are stepping onto the slippery slope of homegrown solutions that are usually not the most efficient and most likely hide bugs. For example, we may forget to limit the thread pool size, which can eventually lead to an OutOfMemoryError. A naively configured ExecutorService may create an ever-growing number of threads when clients schedule tasks more frequently than the executor can finish the current ones. And because each thread typically reserves around 1 MB for its stack in Java, a typical JVM application can exhaust all available memory by creating a few thousand threads.
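As a rough illustration of limiting the pool, the sketch below builds a ThreadPoolExecutor with a fixed number of threads and a bounded queue, so that excess work is rejected instead of spawning new threads. The class name BoundedPoolSketch, the helper methods, and the chosen limits are hypothetical and only serve the example; real limits depend on the workload.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {

    // Hypothetical factory: both limits are illustrative, not tuned values.
    static ThreadPoolExecutor boundedPool(int maxThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                maxThreads, maxThreads,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()); // reject instead of growing
    }

    // Returns true if the pool rejects a task once its single thread
    // and its single queue slot are both occupied.
    static boolean rejectsWhenSaturated() {
        ThreadPoolExecutor pool = boundedPool(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only thread
            pool.execute(() -> { });                   // occupies the only queue slot
            try {
                pool.execute(() -> { });               // no capacity left
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();
            shutdownQuietly(pool);
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void shutdownQuietly(ThreadPoolExecutor pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected when saturated: " + rejectsWhenSaturated());
    }
}
```

Bounding the pool trades the memory risk for a different decision: what to do with rejected events. Dropping, retrying, or blocking the caller each changes the delivery guarantees of the Subject.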

For a more detailed description of experiments with the JVM thread capacity, please refer to Peter Lawrey's article at http://vanillajava.blogspot.com/2011/07/java-what-is-limit-to-number-of-threads.html . It is quite old, but not a lot has changed in the JVM memory model since then. To get information about the default stack size of our Java setup, run the following command:

java -XX:+PrintFlagsFinal -version | grep ThreadStackSize

To prevent excessive resource usage, we may restrict the thread pool size, but then we risk violating the liveness property of the application. Situations such as this arise when all available threads attempt to push some events to the same sluggish Observer. Here, we have just scratched the surface of the potential problems that can occur. Also, as stated in the white paper "Improved Multithreaded Unit Testing" (http://users.ece.utexas.edu/~gligoric/papers/JagannathETAL11IMunit.pdf), "Multithreaded code is notoriously hard to develop and test".

Consequently, when the multithreaded Observer pattern is required, it is better to use battle-tested libraries.

When talking about liveness, we are referring to the definition from concurrent computing, which describes it as a set of properties that require a concurrent system to make progress, even though its executing components may have to enter critical sections. This was initially defined by Leslie Lamport in Proving the Correctness of Multiprocess Programs (http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.137.9454&rep=rep1&type=pdf).
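To make the liveness concern more tangible, here is a small sketch, under simplifying assumptions, of a restricted pool being monopolized by a sluggish Observer. The pool has a single thread, the slow observer is simulated with a latch, and the names StarvationSketch, sluggish, and fast are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class StarvationSketch {

    // Assumed shape of the chapter's Observer interface.
    interface Observer<T> {
        void observe(T event);
    }

    // Returns two flags: whether the fast observer was notified while the
    // sluggish one was still stuck, and whether it was notified afterwards.
    static boolean[] demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch slowMayFinish = new CountDownLatch(1);
        AtomicBoolean fastNotified = new AtomicBoolean(false);

        Observer<String> sluggish = e -> awaitQuietly(slowMayFinish);
        Observer<String> fast = e -> fastNotified.set(true);

        // The only worker thread picks up the sluggish observer first,
        // so the fast observer's task waits in the queue behind it.
        pool.submit(() -> sluggish.observe("event"));
        pool.submit(() -> fast.observe("event"));

        boolean whileBlocked = fastNotified.get();

        slowMayFinish.countDown(); // let the sluggish observer finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean afterRelease = fastNotified.get();

        return new boolean[] {whileBlocked, afterRelease};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("fast observer notified while blocked: " + result[0]);
        System.out.println("fast observer notified after release: " + result[1]);
    }
}
```

As long as the sluggish observer holds the only thread, the fast observer makes no progress at all, even though it could handle its event immediately. With a larger pool the effect is the same once every thread is stuck on slow observers.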

Our overview of the Observer pattern would be incomplete without mentioning the Observer interface and the Observable class from the java.util package. They were released with JDK 1.0, so they are pretty old. If we look into the source code, we find a pretty straightforward implementation, which is very similar to the one that we made previously in this chapter. Because these types were introduced before Java generics, they operate with events of the Object type and consequently are not type-safe. Also, this implementation is not very efficient, especially in a multithreaded environment. Taking all of these issues (and some others) into account, these types were deprecated in Java 9, so it makes no sense to use them in new applications.

More details about the reasons to deprecate the JDK Observer and Observable can be found at https://dzone.com/articles/javas-observer-and-observable-are-deprecated-in-jd
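The lack of type safety can be seen in a short sketch that uses the real java.util.Observable and java.util.Observer types. The NewsFeed subclass, the wrapper class, and the demo method are hypothetical names for illustration; the code compiles on current JDKs, although the compiler flags the deprecated types unless the warning is suppressed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Observable;
import java.util.Observer;

@SuppressWarnings("deprecation") // Observable and Observer are deprecated since Java 9
class LegacyObservableSketch {

    // setChanged() is protected, so publishing events requires a subclass.
    static class NewsFeed extends Observable {
        void publish(Object event) {
            setChanged();           // without this call, notifyObservers does nothing
            notifyObservers(event);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        NewsFeed feed = new NewsFeed();

        // The callback receives a plain Object, so the compiler cannot
        // check the event type and the observer has to inspect it at runtime.
        Observer observer = (source, arg) -> {
            if (arg instanceof String) {
                log.add("text: " + arg);
            } else {
                log.add("unexpected: " + arg.getClass().getSimpleName());
            }
        };
        feed.addObserver(observer);

        feed.publish("hello");
        feed.publish(42); // compiles fine, although the observer expects text
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

With a generic Observer<String> like the one used earlier in this chapter, the second publish call would be rejected at compile time instead of surfacing at runtime.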

Of course, when developing applications, we may use hand-crafted implementations of the Observer pattern. This gives us the ability to decouple a source of events from its observers. However, it is troublesome to address the many aspects that are crucial for modern multithreaded applications, including error handling, asynchronous execution, thread safety, and the demand for high performance. We have already seen that the event implementation shipped with the JDK is not sufficient beyond educational use. As a result, it is unquestionably better to use a more mature implementation provided by a respected authority.