RxJava – Chaining Observables

I read that in ReactiveX one can chain Observables using the flatMap operator, similar to chaining Promises using “then”. So I thought I would try it.

Of course, there is always some issue that forces one to relearn something.

The code in Listing 1 below produces strange output:

# 0
Received: Hello
Received: World
Received: !

# 1
Received: Hello
Received: World
Received: !


The strange thing is the “Completed” output. It is printed because of line 13 in the source below. If line 13 is commented out, “Completed” does not print to the console.

Why doesn’t “Completed” print for each counter? Note that the “chaining” or ‘thening’ used here is not quite what chaining is meant for. The flatMap operator as used in the code below is passed the current counter, but the chained Observable does not use it; the Observable just repeats its onNext(…) invocations.

Listing 1

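import rx.Observable; // assumes RxJava 1.x

public class ChainObservable {
    /**
     * @param args
     */
    public static void main(String[] args) {
        Observable<Integer> counterStream =
            Observable.range(0, 2);
        // Assumed from the output shown above: three emissions, then completion.
        Observable<String> helloStream =
            Observable.create(o -> {
                o.onNext("Hello");
                o.onNext("World");
                o.onNext("!");
                o.onCompleted(); // presumably the "line 13" discussed in the text
            });
        Observable<Object> resultStream = counterStream
            .map(x -> {
                System.out.println("# " + x);
                return x;
            })
            .flatMap(counter -> helloStream); // the counter is passed in but unused
        resultStream.subscribe(
            v -> System.out.println("Received: " + v),
            e -> System.out.println("Error: " + e),
            () -> System.out.println("Completed"));
    }
}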
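The point about the unused counter can be illustrated without RxJava. This is a minimal sketch, assuming plain java.util.stream as a stand-in: Stream.flatMap is only an analogy to the Rx operator and has no completion callback. The class and method names are hypothetical. The first method mirrors the listing (the mapper ignores its argument), the second shows an inner stream that actually depends on the counter.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FlatMapCounterSketch {

    /** Mirrors the listing: the mapper receives the counter but ignores it. */
    public static List<String> ignoringCounter(int count) {
        return IntStream.range(0, count)
                .boxed()
                .flatMap(counter -> Stream.of("Hello", "World", "!"))
                .collect(Collectors.toList());
    }

    /** The inner stream is derived from the counter, so each pass differs. */
    public static List<String> usingCounter(int count) {
        return IntStream.range(0, count)
                .boxed()
                .flatMap(counter -> Stream.of("Hello " + counter, "World " + counter))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(ignoringCounter(2));
        System.out.println(usingCounter(2));
    }
}
```

In both cases the flattened result is a single sequence, which is at least consistent with a single completion at the end rather than one per counter.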


  • Creative Commons License
    This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.

    Sonos music using external HD on Raspberry Pi

    You can attach an external hard drive to a Raspberry Pi and then share music over Sonos. This works very well. Even though my hard drive is connected to the rPI via USB 2.0, the music streams fine, no stutters.

    Right now I’m playing Jeff Buckley’s ‘Sketches for My Sweetheart The Drunk’ all over the house. “Vancouver” track is so awesome!

    Technically this kind of storage sharing falls under the term Network Attached Storage (NAS). But, that seems like an overblown term for just sharing one disk. There are a lot of features on a full-blown NAS.

    How does the Raspberry Pi share the storage? By running a server called Samba, a set of open-source programs that run on Unix/Linux and provide file and print services compatible with Windows-based clients.

    Spin down?
    Currently I’m looking into how to enable spin-down of the hard drive when idle. Is it necessary? It is supposed to make the drive last longer, but I just want to reduce power usage, which is the whole point of using a Raspberry Pi in this scenario.

    Maybe this page, “Spin Down and Manage Hard Drive Power on Raspberry Pi”, will help.

    Feb 1, 2015
    hdparm and sd-idle did not work. Trying sdparm.
    sudo blkid
    /dev/sda1: LABEL="USB B" UUID="B23A8B373A8AF81D" TYPE="ntfs" PARTUUID="b75ac8d0-01"

    sudo sdparm --flexible --command=stop /dev/sda1
    /dev/sda1: ST325082 3AS

    Feb 3, 2015
    The spin down using sdparm is working fine.

    Hardware Used

    • Raspberry Pi 2 (Canakit Ultimate Starter Kit with WIFI)
    • Old USB hub from Staples
    • 3.5 inch 1 GB Hard drive
    • HexStar-3 3.5in External Hard drive enclosure

    Technical details
    I had a lot of grief getting it to work. I haven’t touched a Linux system in a while.

    Some of the many articles where I found information on how to do this are in the links section below. Note that there isn’t one single approach. It also depends on which OS you’re running on the Raspberry Pi. I’m running Raspbian, which I installed via NOOBS; both were included in the kit I purchased.

    Other articles about this approach:


    Use AOP with RxJS to make functions Observable

    There are no Observable creation functions that will observe function invocations on an Object. As a proof of concept, I used an AOP library to provide this function-to-stream capability.

    This sounds like a job for a mixin, proxy, Object.observe, inheritance, or decorator pattern. There are hundreds of mixin libraries, the new JavaScript Proxy is not readily available, Object.observe was cancelled, etc.

    Aspect-Oriented Programming (AOP)
    AOP is a paradigm that allows the separation of cross-cutting concerns by applying “advices” to objects. There are many advice types. The Meld library supports several, for example, ‘before’, ‘around’, ‘on’, ‘after’, ‘afterReturning’, and ‘afterThrowing’.

    We’ll just use two here: “after” to observe a function invocation, and “around” to observe a property change.

    Observe of function
    In this example object:

    var myObject = {
       doSideEffect: function(x){this.myProperty = x;},
       doSomething: function(a, b) {return a + b;}
    };

    We want to subscribe to the ‘doSomething’ function in this object. That is, when that function is invoked, the resulting return value should be sent to an Observable stream.

    The stream:

    var subject = new Rx.Subject();
    var subjectAsObservable = subject.map(function(x){
           console.log("in map x:" + x);
           return x;
    });
    var subscription = subjectAsObservable.subscribe(
           function(x){ // onNext
               console.log("Next in subject: " + x.toString());
           }
    );

    Now to apply an After advice to that function “joinpoint” we do:

    var remover =
      doAfter(myObject, 'doSomething', subject,
        function(result) {
            console.log("after function .... ");
        });
    myObject.doSomething(1, 2);

    The output of running this is:

    in map x:3
    after function .... 

    The “doAfter” function is a simple wrapper around Meld’s ‘after’ function:

    /**
     * @param anObject the object to advise
     * @param method the method selector
     * @param subject the subject that observes the function
     * @param afterFunction function to invoke after the advised function returns
     */
    function doAfter(anObject, method, subject, afterFunction){
        return meld.after(anObject, method, function(result){
            subject.onNext(result); // assumed body: emit the result on the stream
            afterFunction(result);  // assumed body: then run the caller's callback
        });
    }
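The same “after” idea can be sketched on the JVM. This is a minimal sketch, not Meld: it assumes java.lang.reflect.Proxy as a stand-in for the ‘after’ advice and a plain Consumer as a stand-in for the Rx Subject. The names Calculator and observeAfter are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class AfterAdviceSketch {

    /** Hypothetical interface standing in for myObject's doSomething. */
    public interface Calculator {
        int doSomething(int a, int b);
    }

    /**
     * Returns a proxy that forwards every call to target and then hands
     * the return value to the listener (the "after" advice).
     */
    public static <T> T observeAfter(T target, Class<T> type, Consumer<Object> listener) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object result = method.invoke(target, args); // the advised call
            listener.accept(result);                     // runs after it returns
            return result;
        };
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler);
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        List<Object> seen = new ArrayList<>();
        Calculator plain = (a, b) -> a + b;
        Calculator observed = observeAfter(plain, Calculator.class, seen::add);
        observed.doSomething(1, 2);
        System.out.println("observed results: " + seen);
    }
}
```

Unlike Meld, which advises the object in place, a dynamic proxy only sees calls made through the returned wrapper, and it only works for interface types.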

    Observe of property change
    Some AOP libraries support get/set advices on property changes. AspectJ in Java world supports this. Meld does not, but this can be implemented using an Around advice. This type of advice wraps a function call.

    With an Around advice we can record a property’s value before and after a function invocation. If there is a change, we generate a new event for the Observable stream.

    remover = meld.around(myObject, getMethodNames, function(methodCall){
        var originalValue = myObject.myProperty;
        var result = methodCall.proceed();
        if(originalValue != myObject.myProperty){
            subject.onNext("prop changed: " + originalValue + " -> " + myObject.myProperty);
        }
        return result;
    });

    The above uses a utility function:

    /** Utility function to get all functions in object */
    function getMethodNames(obj){
       return Object.getOwnPropertyNames(obj)
          .filter(function(name){
              return typeof obj[name] === 'function';
          });
    }

    When any method in object is invoked that changes the property, we get:

    in map x:prop changed: 15 -> 25
    Next in subject: prop changed: 15 -> 25
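The before/after comparison at the heart of the “around” advice can also be sketched in Java. This is a minimal sketch with no AOP library: the call is wrapped by hand, and the names Thing and around are hypothetical. A Consumer stands in for the Subject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class AroundAdviceSketch {

    /** Hypothetical object whose method changes a property as a side effect. */
    public static class Thing {
        public int myProperty = 15;

        public void doSideEffect(int x) {
            myProperty = x;
        }
    }

    /**
     * Reads the property, runs the call, reads the property again, and
     * emits an event only when the value changed.
     */
    public static <P> void around(Supplier<P> property, Runnable call, Consumer<String> events) {
        P before = property.get();
        call.run(); // corresponds to methodCall.proceed()
        P after = property.get();
        if (!Objects.equals(before, after)) {
            events.accept("prop changed: " + before + " -> " + after);
        }
    }

    public static void main(String[] args) {
        Thing thing = new Thing();
        List<String> events = new ArrayList<>();
        around(() -> thing.myProperty, () -> thing.doSideEffect(25), events::add);
        around(() -> thing.myProperty, () -> thing.doSideEffect(25), events::add); // no change
        System.out.println(events);
    }
}
```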

    Of course, there is a performance cost to this approach of monitoring property changes. This cost has not been quantified or explored.

    Using AOP, an object’s functions can be made observable. An “after” advice can signal a result event, and with an “around” advice it is possible to generate events on an Object’s property changes. Is this useful? Unknown. Is it already possible in the ReactiveX ecosystem? Not sure.

    Possible operators for the above could be:
    fromFunction(object, methodSelector)
    fromProperty(object, propertySelector)
    fromProperty(object, methodSelector, propertySelector).

    Look at the Rx.Observable.fromCallback(func, [context], [selector]) operator. Could it be used to go from function to Observable stream without using AOP, as presented here?

    Software used
    RxJS: version 4.0.7, rx.all.compat.js
    Cujojs Meld: version 1.3.1



    RxJS process loop with toggled operations and canceling

    How to create a stream on a button that toggles between two operations, and another button that can cancel the same stream? Observable operators are numerous and very complex, so coming up with a solution, or composing a stream, to any problem can be very difficult. At least, that is what I found while learning all this reactive stuff.

    Use case
    Two buttons: one labeled “Start”, and another labeled “Exit”.

    The first click on the ‘Start’ button changes its label to “Toggle” and starts the operational loop that invokes operation1.

    Each further click on the button, now labeled “Toggle”, switches the invoked operation between operation2 and operation1.

    A click on the ‘Exit’ button stops the loop stream.

    Here is the RxJS based solution running in JS Bin playground: (Ability to re-run is not implemented. ‘Exit’ means you closed the app.)
    RxJS toggable and cancellable process loop on jsbin.com

    Sure, this would have been much easier with just regular JavaScript. Just create a timer and a listener on each button, etc. Observable streams don’t really add any value for this simple scenario. But, this is just a learning exercise.

    Then again, using streams may offer more opportunities for handling more complex scenarios with timer loops.

    This appears to be just like the very basic traditional state machine exercise of modeling a stopwatch. There, the toggle button starts/stops the timer, and the Exit is the cancel button. Do a web image search for “statechart stopwatch”.


    This seems easy now, but it took a lot of effort to come up with a solution. The approach selected, using ‘scan’, was suggested in an article and a StackOverflow question. ‘Scan’ can be used to carry state from one event to the next.
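To show what “carrying state” with a scan-style accumulator means, here is a minimal Java sketch. It is not the RxJS solution: a plain loop replays a list of events, and the names Event, State, next, and run are all hypothetical. The function next has the shape a scan operator expects, (state, event) -> new state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ToggleLoopSketch {

    /** TICK comes from the timer, TOGGLE and EXIT from the two buttons. */
    public enum Event { TICK, TOGGLE, EXIT }

    /** Immutable state carried from one event to the next. */
    public static final class State {
        public final boolean useOperation1;
        public final boolean running;

        public State(boolean useOperation1, boolean running) {
            this.useOperation1 = useOperation1;
            this.running = running;
        }
    }

    /** Accumulator: computes the next state from the current state and an event. */
    public static State next(State state, Event event) {
        switch (event) {
            case TOGGLE:
                return new State(!state.useOperation1, state.running);
            case EXIT:
                return new State(state.useOperation1, false);
            default:
                return state;
        }
    }

    /** Replays the events and records which operation each TICK would invoke. */
    public static List<String> run(List<Event> events) {
        State state = new State(true, true);
        List<String> invoked = new ArrayList<>();
        for (Event event : events) {
            state = next(state, event);
            if (!state.running) {
                break; // Exit cancels the loop
            }
            if (event == Event.TICK) {
                invoked.add(state.useOperation1 ? "operation1" : "operation2");
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(
                Event.TICK, Event.TOGGLE, Event.TICK, Event.EXIT, Event.TICK)));
    }
}
```

Keeping the accumulator a pure function of (state, event) is what lets a scan operator own the state instead of a shared variable.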

    For the loop we can create streams using timer or interval.

    The alternation of operations is where the complexity comes in. The “case” operator is very powerful, since it can choose an Observable based on a key into a map of sources. It would be useful if the operations, i.e., the functions invoked in the loop, were themselves streams or Subjects.

    If there is only one operation to execute in the loop and the toggle button behaves as run/pause, then there is the “pausable” operator.

    One thing I did not implement is a way to restart the process after it was exited. Do the whole thing as a programmatic setup on first start click, or figure out how to resubscribe to the observables? Oh well.

    Software used
    RxJS: version 4.0.7, rx.all.compat.js
    JQuery 2.2.0: jquery-2.2.0.js


    Two versions are shown below: one in ES6 using arrow function expressions, and one using plain functions. I’m sure this is a naive newbie solution, so be very skeptical of the RxJS usage here. Note: I named the streams with a “$” prefix, a kind of reverse Hungarian notation. This is a naming convention used by the Cycle.js project.


    Reflective light channels as buriable Solar Cell energy generators.

    One of the causes of low efficiency in capturing solar radiation with solar cells is reflection loss. A lot of research on this has resulted in many approaches, such as nano-structures, to reduce this reflection. An alternative is to embrace reflection. How?

    The sun shines on a collector opening and this light is reflected down into a ‘pipe’ or channel. The angle of the reflectors causes the light to bounce from one side to the other. Now, instead of mirrors being used as reflectors, we use solar cells. Thus, instead of larger solar farms we would have wider and deeper light capture networks. Since the reflection occurs within the channel, this structure can be buried in the ground.

    Unknown at this point are the parameters of this pipe or channel. Is this really feasible for large structures, or only efficient at nano-scales?

    [Figure: solar cell pipe to reuse reflection]

    © 2016 by Josef Betancourt. All rights reserved


    Can conditional state be used in RxJava Observable streams?

    Few examples of RxJava contain conditional operations on a stream that involve state transitions based on stream content. Sure there are operators such as filter and the rest, but since examples are “pure”, that is, using pure functions, there is no setting of state that can influence the next event.

    It doesn’t seem that this is part of the use-case of reactive streams. At least, that is my current conclusion. I hope it is incorrect. If not, then a large class of use-cases is not amenable to the Reactive approach.

    Here is the general problem statement: A stream consists of items x. If the current x has a property p set, then subsequent x will be skipped if they are related to that first x. Note that those skipped x could also have the same property set. This scenario could be generalized to specify an operation on a subgroup besides skipping.

    A concrete example of this ‘pattern’ is found at Prune deleted folders of repo diff report using Java. In that blog post, the x are the entries of a Subversion diff report, and the conditional property is folder deletion. We want to prune deleted subfolders.

    In Listing 1 below, I solve this with Java 8 streams in method pruneDeletedFoldersJavaStream(List). I also solve it using RxJava in method pruneDeletedFoldersRx(List). However, the Rx approach is just a copy of the Java 8 streams approach, merely using the filter operator. A State object is used in both, since Java 8 lambdas can only capture effectively final variables.
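The explicit-state approach described above can be sketched as follows. This is not the post’s Listing 1: Item, State, and prune are hypothetical stand-ins, paths are plain strings with “/” separators, and the input is assumed to be ordered so that each folder’s descendants immediately follow it. A stateful filter like this is only safe on a sequential, ordered stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class StatefulFilterSketch {

    /** Hypothetical stand-in for one diff entry. */
    public static final class Item {
        public final String path;
        public final boolean deletedFolder;

        public Item(String path, boolean deletedFolder) {
            this.path = path;
            this.deletedFolder = deletedFolder;
        }
    }

    /** Mutable holder: the lambda captures the final reference, not the field. */
    public static final class State {
        String deletedPath = null;
    }

    /** Keeps each item unless it lies under the most recent deleted folder. */
    public static List<String> prune(List<Item> orderedItems) {
        State state = new State();
        return orderedItems.stream()
                .filter(item -> {
                    if (state.deletedPath != null
                            && item.path.startsWith(state.deletedPath + "/")) {
                        return false; // related to the earlier deleted folder: skip
                    }
                    // The state transition driven by the current item.
                    state.deletedPath = item.deletedFolder ? item.path : null;
                    return true;
                })
                .map(item -> item.path)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(prune(Arrays.asList(
                new Item("d1", false),
                new Item("d1/d2", true),
                new Item("d1/d2/d3", true),
                new Item("d1/d4", false))));
    }
}
```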

    Can this be implemented with the use of Rx operators and no explicit state handling? I tried many approaches, even use of BehaviorSubject, to no avail. I don’t even know how to phrase the question on Stackoverflow.

    Listing 1, Full Source



    Reactive client application state constraint contracts

    As in Design By Contract (DbC), the contents and structure of a Store mutation can be constrained by Preconditions, Postconditions, and Invariants. These constraints are part of the Store implementation.

    Modern JavaScript frameworks are now emphasizing a form of “global” state storage. This is not really new, but gained new impetus when React came out, which synergistically combined this with other factors, like components, virtual-DOM, and one-way data flow.

    React Stores

    “… contain the application state and logic. Their role is somewhat similar to a model in a traditional MVC, but they manage the state of many objects — they do not represent a single record of data like ORM models do. Nor are they the same as Backbone’s collections. More than simply managing a collection of ORM-style objects, stores manage the application state for a particular domain within the application.” — https://facebook.github.io/flux/docs/overview.html

    In very large apps with multiple development teams, this Store could become a high maintenance cost and a source of defects.

    Non-React Store
    For an example of a reusable Store implementation, see Redux. Amazing video: Dan Abramov – Live React: Hot Reloading with Time Travel at react-europe 2015

    Interesting Java implementation of Redux: redux-java.

    Today’s great application and shiny new framework are tomorrow’s maintenance nightmare. Thus, it makes sense to add in features that will aid this future support.

    A store has a JSON map of { foo : true, fee : false, fum : {…}, … }.

    A particular flow in the application will update foo to false. Is that ok?

    If foo and fee are in the same ‘context’, can they both be false? Note that the ‘application’ or maintenance developer may not know that there is a relationship between foo and fee. With annotations, dependency injection, and so forth, even perusal of the source could make this determination difficult.

    Sure, in a small app and simple Store, it would be easy to trace the program and find out the implicit constraints. In reality, Stores grow complex over time, the app gets complex, new requirements mean many changes, and the developer culprit has left the company. Stores will mutate to resemble the catch-all Windows’ Registry.

    To compound this, the control flows are hidden within the framework being used. No matter if it’s today’s MVC usurper, React, or some Observable liberator, there is spaghetti flow and implicit state somewhere.

    “At some point, you no longer understand what happens in your app as you have lost control over the when, why, and how of its state. When a system is opaque and non-deterministic, it’s hard to reproduce bugs or add new features.” —

    One way to reduce issues is to borrow techniques from data design and use schema constraints. This does not necessarily mean the use of actual schemas, as in XML’s XSD or its wannabe JSON equivalent. One alternative is for the Store to have an API for binding constraints to content, with new life cycle functions that apply the constraints. The result of a constraint violation, a log entry or a thrown exception, depends on whether the deployment is development or production.
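A constraint-binding API of this kind could look roughly like the following. This is a minimal sketch under several assumptions: the class ConstrainedStoreSketch and its methods are hypothetical, invariants are predicates over the whole candidate state, and throwing is assumed for development while logging is assumed for production. It uses the foo/fee example from above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class ConstrainedStoreSketch {
    private final Map<String, Object> state = new HashMap<>();
    private final Map<String, Predicate<Map<String, Object>>> invariants = new LinkedHashMap<>();
    private final List<String> log = new ArrayList<>(); // stand-in for a real logger
    private final boolean throwOnViolation;             // assumed: true in development

    public ConstrainedStoreSketch(boolean throwOnViolation) {
        this.throwOnViolation = throwOnViolation;
    }

    /** Binds a named invariant that must hold after every mutation. */
    public void addInvariant(String name, Predicate<Map<String, Object>> holds) {
        invariants.put(name, holds);
    }

    /** Checks all invariants against the candidate state, then applies the change. */
    public void set(String key, Object value) {
        Map<String, Object> candidate = new HashMap<>(state);
        candidate.put(key, value);
        for (Map.Entry<String, Predicate<Map<String, Object>>> inv : invariants.entrySet()) {
            if (!inv.getValue().test(candidate)) {
                String message = "Invariant violated: " + inv.getKey();
                if (throwOnViolation) {
                    throw new IllegalStateException(message); // state is left unchanged
                }
                log.add(message);
            }
        }
        state.put(key, value);
    }

    public Object get(String key) {
        return state.get(key);
    }

    public List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        ConstrainedStoreSketch store = new ConstrainedStoreSketch(false);
        store.addInvariant("fooOrFee", s ->
                !(Boolean.FALSE.equals(s.get("foo")) && Boolean.FALSE.equals(s.get("fee"))));
        store.set("foo", true);
        store.set("fee", false);
        store.set("foo", false); // both false: logged, not thrown
        System.out.println(store.log());
    }
}
```

Declaring the relationship between foo and fee next to the Store makes it visible to a maintenance developer who would otherwise have to trace the flows to discover it.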

    Notice how in the development world we are going back in time? Central stores, data flow diagrams, functional programming, FSM (obfuscated), etc. Maybe Object-Oriented Programming was a great distraction, like Disco. Time for some real funk. “Maceo, I want you to Blow!”

    Wow, this synchronicity is weird. I’ve been thinking of this post’s contents for a while. I started to write it and the Twitter app on my phone rang: @brianloveswords tweeted about a blog post by @jlongster, “Starters and Maintainers”. Not exactly the same subject, but it was about maintenance.



    A Visit from Saint Nicholas: “Twas The Night Before Christmas”

    Great poem! Originally called “A Visit from St. Nicholas”. I still remember singing a version of it in a grade school choir.

    Doesn’t he sound like the late Mitch Hedberg? The diction and phrasing.

    History of Christmas:

    When I was in Italy I visited the tomb of the real Saint Nicholas at the Basilica di San Nicola in Bari, Italy.

    Merry X-Mas and whatever good and moral stuff you celebrate!


    Prune deleted folders of repo diff report using Java

    Sometimes the contents of deleted folders are not relevant. Just showing the top level deleted folder or dir is enough. For example, in a branch diff against trunk, folder moves are shown as adds and deletes. But in a diff report, the whole sub node of the deleted folder is output.

    In the particular report I was creating everything under the deleted directories was just noise. How to prune that?

    BTW, I searched the SVN docs and found no support for this kind of pruning. I also looked at the Git docs to see if it was different, but also found no mention of this use-case.

    Example scenario:

    d1 (modified)
    └ d2 (deleted)
       └ d3 (deleted)

    If we create a pruned report of the above, it would contain d1 (modified) and d1/d2 (deleted). It would not contain d1/d2/d3.

    The algorithm in pseudocode

    Jan 8, 2015: I found a use-case where this algorithm does not work when used with an actual Subversion repository being accessed by SvnKit. Not due to SVNKit of course. Will update when fixed.


    boolean deleting := false
    String deletePath := ""
    for each path {
        if deleting then
            if path starts with deletePath then
                skip path
            else
                deleting := false
                deletePath := ""
        else
            if path is directory and it is deleted then
                deleting := true
                deletePath := path
    }

    This can be implemented at the script level, for example, using Groovy or some other language.
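As one self-contained illustration, here is a minimal Java sketch of the pruning on plain path strings. It is a variation, not the pseudocode above and not necessarily the author’s fix: instead of scanning sorted paths, it looks up each path’s ancestors in a set of deleted folders. That avoids depending on sort order, since with plain string ordering a sibling such as d1/d2-x sorts between d1/d2 and d1/d2/d3. The names are hypothetical, and paths are assumed to be relative with “/” separators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PrunePathsSketch {

    /** True if any proper ancestor folder of path is in deletedDirs. */
    public static boolean hasDeletedAncestor(String path, Set<String> deletedDirs) {
        int slash = path.lastIndexOf('/');
        while (slash > 0) {
            String ancestor = path.substring(0, slash);
            if (deletedDirs.contains(ancestor)) {
                return true;
            }
            slash = ancestor.lastIndexOf('/');
        }
        return false;
    }

    /** Keeps the top-level deleted folders and drops everything beneath them. */
    public static List<String> prune(List<String> paths, Set<String> deletedDirs) {
        List<String> pruned = new ArrayList<>();
        for (String path : paths) {
            if (!hasDeletedAncestor(path, deletedDirs)) {
                pruned.add(path);
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        List<String> paths = Arrays.asList("d1", "d1/d2", "d1/d2/d3");
        Set<String> deleted = new HashSet<>(Arrays.asList("d1/d2", "d1/d2/d3"));
        System.out.println(prune(paths, deleted));
    }
}
```

The trade-off is one set lookup per ancestor of each path, in exchange for not needing the input to be sorted at all.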

    SvnKit Scenario

    In a Subversion Console application I wrote using ReactJS, I used SvnKit on the server side to interface with the Subversion server. A future blog post will discuss this single-page app. One of the React components was a Diff table.

    Using the SvnKit library I generated an svn diff summary list. This is a list of SvnDiffStatus objects: List<SvnDiffStatus>. Among other things, these objects contain a path string of the respective resource.

    To prune the excess deleted folders I first attempted to create an Object tree structure, then gave up on that (too slow). Finally, a co-worker suggested I just sort the paths and do a simple contains comparison. Wow, how did I miss that? I guess I was sidetracked by “Objects” and didn’t use simple scripting techniques.

    So to prune the folders we sort and prune the results of an SvnDiffSummary object, svnDiff:
    List<SvnDiffStatus> list = pruneDeletedFolders(sortDiffList(((Receiver)svnDiff.getReceiver()).getEntries()));

    Where Receiver is an implementation of ISvnObjectReceiver.

    (The SvnDiff entries may already be sorted, but the SvnKit API docs do not specify this).

    Sort the diff list using Guava

    private List<SvnDiffStatus> sortDiffList(List<SvnDiffStatus> list) {
        return new Ordering<SvnDiffStatus>() {
            @Override
            public int compare(SvnDiffStatus left, SvnDiffStatus right) {
                return left.getPath().compareTo(right.getPath());
            }
        }.sortedCopy(list); // assumed: Guava's sortedCopy applies the ordering
    }

    Prune the list

    private List<SvnDiffStatus> pruneDeletedFolders(List<SvnDiffStatus> list){
        String deletedPath = "";
        boolean isDeleting = false;
        List<SvnDiffStatus> prunedList = new ArrayList<>();
        for (Iterator<SvnDiffStatus> iterator = list.iterator(); iterator.hasNext();) {
            SvnDiffStatus diff = iterator.next();
            String path = diff.getPath();
            if (isDeleting) {
                if (path.startsWith(deletedPath)) {
                    // skip this diff
                } else {
                    isDeleting = false;
                    deletedPath = "";
                    prunedList.add(diff); // assumed: keep entries outside the deleted folder
                }
            } else {
                if (isDirectory(diff) && isStatusDeleted(diff)) {
                    isDeleting = true;
                    deletedPath = path;
                }
                prunedList.add(diff); // assumed: keep the top-level deleted folder itself
            }
        }
        return prunedList;
    }



    Control multiple PCs with single keyboard and mouse

    A software KVM approach is sometimes a good solution. While setting up a new PC I had to access the older PC. I used to use Synergy, but could not find if that software is still around.

    Now I’m using ‘Mouse without Borders’. It allows me to control the two PCs, Windows 7 and Windows 10. Works very well. It also shares the keyboard and copy and paste.



    The celebrated man in the street
