Tag Archives: RxJava

RxJava – Chaining Observables

In ReactiveX one can chain Observables using the flatMap operator, similar to chaining Promises using “then“.

Example 1
In listing 1, three reactive streams are created.
– counterStream creates a stream of integers 0, 1.
– helloStream creates a stream three strings, “Hello”, “World”, “!\n”
– resultStream chains helloStream to counterStream.

Listing 1

public class ChainObservable {
    /**
     * @param args  
     */
    public static void main(String[] args) {
        Observable<Integer> counterStream = 
           Observable.range(0, 2);
        
        Observable<String> helloStream = 
          Observable.create(o -> {
            o.onNext("Hello");
            o.onNext("World");
            o.onNext("!\n");
            o.onCompleted();
        });
        
        Observable<Object> resultStream = 
          counterStream
            .map(x -> {
                System.out.println("# " +x);
                return x;
            })
            .flatMap( counter -> 
               helloStream
            );
        
        resultStream.subscribe(
            v -> System.out.println(
                  "Received: " + v
            ),
            e -> System.out.println(
                  "Error: " + e
            ),
            () -> System.out.println(
                   "Completed"
            )
        ).unsubscribe();
        
    }
}

The output of listing 1 is:

# 0
Received: Hello
Received: World
Received: !

# 1
Received: Hello
Received: World
Received: !

Completed

Note that the “chaining” or ‘thening’ used here is not quite what chaining was meant for. The flatMap operator use in listing 1 passes the current counter, but the chained Observable does not use it, the Observable just repeats its onNext(…) invocations. The flatMap: “transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable”

The strange thing is the “Completed” output. The code does this because of line 14 in the source (does it?). The helloStream invokes onCompleted, but the completed in the subscriber is not triggered until the final counterStream event. Or, I’m looking at this incorrectly?

Example 2
In example 1 above, the function that operates on each item passed by the source Observable is not used. I’m wondering if it could be used in a “chained” in Observable, as in listing 2 below. Does this make sense? Doesn’t this then have a runtime penalty since the Observable is not created before it is used?

Listing 2 version where Observable uses flatMap data

import static java.lang.System.out;
import rx.Observable;

/**
 * 
 * @author jbetancourt
 *
 */
public class ChainObservable {
    /**
     * @param args  
     */
    public static void main(String[] args) {
        Observable.range(0, 2)      
        .map(x -> {
            out.println("# " +x);
            return x;
        })
        .flatMap( count -> 
          Observable.create(o -> {
            o.onNext(count + " Hello");
            o.onNext(count + " World");
            o.onNext(count + " !\n");
            o.onCompleted();
          })
        )
        .subscribe(
            v -> out.println("Received: " + v),
            e -> out.println("Error: " + e),
            () -> out.println("Completed in subscribed")
        ).unsubscribe();
    }
    
}

Output of this version is:

# 0
Received: 0 Hello
Received: 0 World
Received: 0 !

# 1
Received: 1 Hello
Received: 1 World
Received: 1 !

Completed in subscribed

Links

  • RxJava
  • RxJava: chaining observables
  • Grokking RxJava, Part 2: Operator, Operator
  • Transformation of sequences
  • Don’t break the chain: use RxJava’s compose() operator
  • Implementing Your Own Operators
  • Creative Commons License
    This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.

    Can conditional state be used in RxJava Observable streams?

    Few examples of RxJava contain conditional operations on a stream that involve state transitions based on stream content. Sure there are operators such as filter and the rest, but since examples are “pure”, that is, using pure functions, there is no setting of state that can influence the next event.

    It doesn’t seem that this is part of the use-case of reactive streams. At least, that is my current conclusion. I hope it is incorrect. If not then a large type of use-case is not amendable to Reactive approach.

    Here is the general problem statement: A stream consists of items x. If the current x has a property p set, then subsequent x will be skipped if they are related to that first x. Note that those skipped x could also have the same property set. This scenario could be generalized to specify an operation on a subgroup besides skipping.

    A concrete example of this ‘pattern’ is found at Prune deleted folders of repo diff report using Java. IN that blog post, the x are the output of a Subversion diff report, and the conditional property is folder deletion. We want to prune deleted subfolders.

    In Listing 1 below, I solve this with Java 8 streams in method pruneDeletedFoldersJavaStream(List). I also solve it using RxJava in method pruneDeletedFoldersRx(List). However, the Rx approach is just a copy of the Java 8 streams approach, but just using the filter operator. A State object model is used in both since Java 8 requires final fields.

    Can this be implemented with the use of Rx operators and no explicit state handling? I tried many approaches, even use of BehaviorSubject, to no avail. I don’t even know how to phrase the question on Stackoverflow.

    Listing 1, Full Source

    Links

    Creative Commons License
    This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.

    Java ‘hello world’ using RxJava library

    A simple RxJava example that uses an array of strings and combines them to create the traditional first program.

    Jump to source code

    RxJava is a NetFlix open source library that they developed as part of optimizing their architecture. The library is related to the “Reactive programming” pattern:

    “RxJava is an implementation of Reactive Extensions – a library for composing asynchronous and event-based programs using observable sequences for the Java VM”.

    The main data structure in ReactiveX is, as presented in “Streams: The data structure we need” by Pam Selle, is the Stream. Streams are discussed in the SICP book, 3.5 Streams.

    The main resource for RX in general is http://reactivex.io/

    Updates

    • Oct 18, 2015: Revised blog post and source code.
    • Dec 26, 2015: Read an article on ribot’s site, “Unit Testing RxJava Observables“, that RxJava already has experimental support for unit testing assertions.

    Approach

    I thought I would try my hand at it. I used the ‘Hello world’ example on the RxJava wiki’s getting started page:

    public static void hello(String... names) {
          Observable.from(names).subscribe(new Action1<String>() {
              @Override
              public void call(String s) {
                  System.out.println("Hello " + s + "!");
              }
          });
    }
    

    As a source of gradual complexity, I used a list of strings for the target output, and I also don’t include the end “!” in the input stream. With this I was able to use more of the library to get a feel for the syntax and how it relates to the underlying concepts.

    Learning Reactive
    One problem with many introductions to RxJava is that they start off with overly simple or complex examples, and these complex examples require domain knowledge of some application. BTW, the worse example of this tendency was a book on Design Patterns where the author used obscure sports racing concepts and so forth.

    Another problem is that the major Operators are very abstractly documented. Just look at the and/then/when or join operators. You’d think each Rx implementation’s documents would give examples for each operator. Update: I found one tutorial that has much more RxJava examples: Intro-To-RxJava

    A good resource on picking Reactive operators is A Decision Tree of Observable Operators.

    Example
    Below I created a JUnit test class in Java that creates an Observable from an array [“Hello”, “world”], and each test subscribes to it. That is, the test is the Observer. The subtle complexity is that the required “!” at the end of result string is not in the String array. How is that added withing the ‘Rx’ pattern? RxJava has a rich API, thus there are many ways to take that array and create the “Hello world!” string.

    Concats
    Test2 in the source listing 1 is my attempt to avoid ‘programmatic’ addition of the ending “!” to the String result. Instead, I concat three Observables: a concatMap that takes each streamed in String and creates a new Observable with [data, ” “], an Observable that skips the last [data,” “], and finally an Observable that just creates the “!”.

    Yes, this is a lot of complex code for this simple task. The point was to use a simple task to learn the complex code. Later, the complexity would be tamed or matched to the task at hand.

    How to fail JUnit test
    Since I used JUnit tests to write the code, I wondered how would you detect that an RxJava operator got an error and failed the test? For example, during the onCompleted method? An error there will not invoke the onError method since the Observable is done generating any data. Thus, in Test4, I had to wrap the assertion fail in a try catch, then manually invoke the onError method. Probably semantically incorrect approach. But, that still would not make the JUnit test fail, I had to set a field so that the last statements in the test would use that to fail the test. Seems like a kludge.

    Dec 26, 2015
    Using RxJava’s test support is the way to go for simple assertions. Below I use TestSubscriber.

    /**
     * Dan Lew's example in 'Grokking RxJava, Part 1: The Basics'.<p>
     * Converted to a test.
     * @see "http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/"
     * @throws Exception
     */
    @Test
    public final void test6() throws Exception{
    	TestSubscriber<String> testSubscriber = new TestSubscriber<>();
    	
    	Observable.just("Hello, world!")
    	.map(s -> s + " -Dan")
    	.map(s -> s.hashCode())
    	.map(i -> Integer.toString(i))
    	.subscribe(testSubscriber);
    	// .subscribe(s -> System.out.println(s)); // -
    	testSubscriber.assertValue("-238955153");
    }
    

    I’ll leave most of the sample code as is for now. But, its best to use the library to its fullest before you create your own testing kludge.

    Summary
    Have not grokked RxJava yet, so the above is probably not idiomatic use or correct.

    RxJava looks like a powerful system, but it is very sophisticated and may require a long grok curve. There are various YouTube videos and other presentations that give a contextual understanding of RX in general. Ben Christensen’s presentations are awesome.

    Options?
    There are many Reactive libraries available. Another is Pivotal’s Reactor. See this article “Pivotal’s Reactor Goes GA” A review of Reactor is “Playing with Reactor“.

    The “React” term?
    Many things are using some form of this term. For example, ReactJs, the M~V library from FaceBook. Sure, it relates, however the concept of event streams is not central to the library’s conceptual model, afaik.

    JavaScript
    Perhaps one of the most important use of RX, in terms of usage, is on the client side. One library is RxJS.

    Cycle.js
    There is even a framework, Cycle.js created by André Staltz, née Medeiros, that combines the “reactive” concept with RxJS to create a new paradigm of client structure called Model View Intent (MVI). Some intro videos on CycleJS library:

     


     

    Dependencies

    • Java 8 (some tests are using Java 8 syntax)
    • JUnit – ‘junit:junit:4.12’
    • RxJava – ‘io.reactivex:rxjava:1.0.14’
    • Guava – ‘com.google.guava:guava:18.0’

    Dev System

    • Windows 10
    • Eclipse Mars

    Links

    Listing 1, Full Source
    Having the source as a JUnit test is helpful since the code can be used as a means to experiment and extend, while having the ability to easily test regressions.


    Listening to Jake Bugg: “Broken”

    On youtube:

    Creative Commons License
    This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.