RxJava – Chaining Observables

In ReactiveX one can chain Observables using the flatMap operator, similar to chaining Promises using “then“.

Example 1
In listing 1, three reactive streams are created.
– counterStream creates a stream of integers 0, 1.
– helloStream creates a stream three strings, “Hello”, “World”, “!\n”
– resultStream chains helloStream to counterStream.

Listing 1

public class ChainObservable {
    /**
     * @param args  
     */
    public static void main(String[] args) {
        Observable<Integer> counterStream = 
           Observable.range(0, 2);
        
        Observable<String> helloStream = 
          Observable.create(o -> {
            o.onNext("Hello");
            o.onNext("World");
            o.onNext("!\n");
            o.onCompleted();
        });
        
        Observable<Object> resultStream = 
          counterStream
            .map(x -> {
                System.out.println("# " +x);
                return x;
            })
            .flatMap( counter -> 
               helloStream
            );
        
        resultStream.subscribe(
            v -> System.out.println(
                  "Received: " + v
            ),
            e -> System.out.println(
                  "Error: " + e
            ),
            () -> System.out.println(
                   "Completed"
            )
        ).unsubscribe();
        
    }
}

The output of listing 1 is:

# 0
Received: Hello
Received: World
Received: !

# 1
Received: Hello
Received: World
Received: !

Completed

Note that the “chaining” or ‘thening’ used here is not quite what chaining was meant for. The flatMap operator use in listing 1 passes the current counter, but the chained Observable does not use it, the Observable just repeats its onNext(…) invocations. The flatMap: “transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable”

The strange thing is the “Completed” output. The code does this because of line 14 in the source (does it?). The helloStream invokes onCompleted, but the completed in the subscriber is not triggered until the final counterStream event. Or, I’m looking at this incorrectly?

Example 2
In example 1 above, the function that operates on each item passed by the source Observable is not used. I’m wondering if it could be used in a “chained” in Observable, as in listing 2 below. Does this make sense? Doesn’t this then have a runtime penalty since the Observable is not created before it is used?

Listing 2 version where Observable uses flatMap data

import static java.lang.System.out;
import rx.Observable;

/**
 * 
 * @author jbetancourt
 *
 */
public class ChainObservable {
    /**
     * @param args  
     */
    public static void main(String[] args) {
        Observable.range(0, 2)      
        .map(x -> {
            out.println("# " +x);
            return x;
        })
        .flatMap( count -> 
          Observable.create(o -> {
            o.onNext(count + " Hello");
            o.onNext(count + " World");
            o.onNext(count + " !\n");
            o.onCompleted();
          })
        )
        .subscribe(
            v -> out.println("Received: " + v),
            e -> out.println("Error: " + e),
            () -> out.println("Completed in subscribed")
        ).unsubscribe();
    }
    
}

Output of this version is:

# 0
Received: 0 Hello
Received: 0 World
Received: 0 !

# 1
Received: 1 Hello
Received: 1 World
Received: 1 !

Completed in subscribed

Links

  • RxJava
  • RxJava: chaining observables
  • Grokking RxJava, Part 2: Operator, Operator
  • Transformation of sequences
  • Don’t break the chain: use RxJava’s compose() operator
  • Implementing Your Own Operators
  • Similar Posts:

    Creative Commons License
    This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.

    One thought on “RxJava – Chaining Observables”

    Leave a Reply

    Your email address will not be published. Required fields are marked *