An introduction to Reactive Programming with RxJava – Part #2

An introduction to Reactive Programming with RxJava – Part #2

In my last blog entry, I gave a short introduction to reactive programming. In this part, I will provide a brief introduction to a reactive framework called RxJava. Yep, know I promised you some practical examples, and there will be some. However, I couldn’t fit everything in, which means it will be a part 3 as well. 

RxJava is an implementation of the Reactive Extensions Library, for the JVM (Java Virtual Machine). It’s building on the Observable design pattern, which means that your building blocks can either be of type Publisher or Subscribers. A Publisher is an object that can produce events, and a Subscriber can consume events provided by a Publisher.

The Basic

So, to get started, we first need to create a subscriber. In RxJava, all subscribers are implementing the interface org.reactivestreams.Subscriber<T>. The Subscriber interface is declaring three methods.

–    onNext: is called when there is new data.
–    onError: a failed terminal state.
–    onComplete: the successful terminal state.

Next, we need to create a Publisher; that produces the events. RxJava has two types, Observable and Flowable. The difference between these two is that Flowable has been designed for handling backpressure, while Observable is not. Backpressure is when the publishers are creating more events than the Subscribers can process, which can lead to that the subscribers are missing events or a queue of events is created (which could lead to out of memory).

Flowable is implementing the org.reactivestreams.Publisher interface that has one method:

–    subscribe: request to start streaming to the Subscriber, given as argument.

The subscribe method is returning a Disposable object, which can be used to stop subscribing to events prematurely. In the example below, I have created a Flowable containing three Strings. Next, I have created a Subscriber, that will just print the strings to the console.

[cc lang=”java” tab_size=”2″ lines=”80″]
Flowable oneAgency = Flowable.just(“Take MAssive Action”, “Courage To Change”, “Live Service”);
Subscriber mySubscriber = new DisposableSubscriber() {

public void onNext(String s) {
System.out.println(s);
}

public void onError(Throwable t) {
System.out.println(“Error: ” + t.getMessage());
}

public void onComplete() {
System.out.println(“Completed”);
}
}
[/cc]

Operators

The Observable/Flowable classes have several methods for transforming the event data.

Map

The map method executed on each event that is produced and transforming one type of data to another. In the example below, the Strings are mapped to their corresponding hashCode values (a number).

[cc lang=”java” tab_size=”2″ lines=”80″]
Observable.fromArray(“Take Massive Action”, “Courage To Change”, “Live Service”)
.map(s -> s.hashCode())
.subscribe(System.out::println);
[/cc]

FlatMap

The flatmap method can convert one object to several, for example:

[cc lang=”java” tab_size=”2″ lines=”80″]
Observable.fromArray(“Take Massive Action”, “Courage To Change”, “Live Service”)
.flatMap(s -> Observable.fromArray(s.toUpperCase(), s.hashCode()))
.subscribe(System.out::println);
[/cc]

The example above takes one string, and then returns a new Observable, containing two new Strings. Of course, the new Observable can be of an entirely different type.

Scan

Scan takes two parameters, the first one is an initial state and the second one is a function (Java 8’s BiFunctional interface). It then takes the initial state plus the first item and inputs it to the function. The result is then used with the second item when calling the function for the second time. I think an example will make it more clear.

[cc lang=”java” tab_size=”2″ lines=”80″]
Observable.fromArray(“Take Massive Action”, “Courage To Change”, “Live Service”)
.scan(0, (total, s) -> total + s.length())
.subscribe(System.out::println);
[/cc]

Here, we are first setting the initial value to zero. In the function, we are taking the previous outcome (in the first iteration it is the initial value 0), and adding it to the length of the string. It’s just counting the number of letters in our strings (including the whitespace). If we wanted to ignore the whitespace, we could use the map function as well.

[cc lang=”java” tab_size=”2″ lines=”80″]
Observable.fromArray(“Take Massive Action”, “Courage To Change”, “Live Service”)
.map(s -> s.replaceAll(“\\s”,””))
.scan(0, (total, s) -> total + s.length())
.subscribe(System.out::println);
[/cc]

Filter

As name states, this is a method for applying a filter to the events. For example:

[cc lang=”java” tab_size=”2″ lines=”80″]
Observable.fromArray(“Take Massive Action”, “Courage To Change”, “Live Service”)
.filter(s -> s.startsWith(“T”))
.subscribe(System.out::println);
[/cc]

The code will only print “Take Massive Action” since it’s the only String that starts with the letter T. There’s a lot of other useful methods, but these are some of the basic ones.

Conclusion

For anyone familiar with Streams in Java 8, this looks just like that. Or does it? Streams in Java 8 are pull-based, meaning that is it’s the “subscriber” that is pulling the data out from the Stream. RxJava is push-based, pushing the data to the subscriber. Another difference is how you are handling exceptions. With streams in Java 8, you have to wrap the code within try-catch, for example, the following code won’t compile:

[cc lang=”java” tab_size=”2″ lines=”80″]
List numbers = IntStream.rangeClosed(0, 1000)
.boxed()
.collect(Collectors.toList());

numbers.stream().forEach(i -> MyPrinter.print(i.toString()));
[/cc]

The method MyPrinter.print has declared that it might throw an exception of type Exception, to avoid compilation errors we need to write it as:

[cc lang=”java” tab_size=”2″ lines=”80″]
List numbers = IntStream.rangeClosed(0, 1000)
.boxed()
.collect(Collectors.toList());

numbers.stream().forEach(i -> {
try {
MyPrinter.print(i.toString()));
} catch (Exception e) {
e.printStackTrace();
}
});
[/cc]

With RxJava, that is taken care of in the onError method on our subscribers. Furthermore, RxJava has excellent support for different Schedulers, where you can specify if the subscribers should run in a separate thread than the main thread. You won’t have to block the main thread while processing the events in another thread. In Java 9 there will be support for reactive programming with the new Flow API, this could be interesting.

This entry was a short introduction to the RxJava framework. In my next entry, I will give a more concrete example of how we can use RxJava in some real code. Stay tuned.

This blog entry is an updated version of the one published on One Agency’s blog, http://bit.ly/2nz4KO3.

One thought on “An introduction to Reactive Programming with RxJava – Part #2

  1. Reactive Programming raises the level of abstraction of your code so you can focus on the interdependence of events that define the business logic, rather than having to constantly fiddle with a large amount of implementation details.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.