Tag Archives: rxjs

Use AOP with RxJS to make functions Observable

There are no Observable create functions that will observe function invocations on an Object. As a proof of concept I used a JavaScript AOP library, Meld, to accomplish this “function to stream” capability.

This sounds like a job for a mixin, proxy, Object.observe, inheritance, or decorator pattern. There are hundreds of mixin libraries, the new JavaScript Proxy is not readily available, Object.observe was cancelled, etc.
Edit: Via a Dan Abramov tweet (what a busy guy!), I found out about MobX.

Aspect-Oriented Programming (AOP)
AOP is paradigm that allows the separation of cross-cutting concerns via applying “advices” to objects. There are many advice types. The Meld library supports several, for example, ‘before’, ‘around’, ‘on’, ‘after’, ‘afterReturning’, and ‘afterThowing’.

We’ll just use two here: “after” to observe a function invocation, and “around” to observe a property change.

Observe of function
In this example object:

var myObject = {

   doSideEffect: function(x){this.myProperty = x;},		

   doSomething: function(a, b) {return a + b;}

We want to subscribe to the ‘doSomething’ function in this object. That is, when that function is invoked, the resulting return value should be sent to an Observable stream.

The stream:

var subject = new Rx.Subject();
var subjectAsObservable = subject.map(function(x){
       console.log("in map x:" + x);
       return x;
var subscription = subjectAsObservable.subscribe(
       function(x){ // onNext
           console.log("Next in subject: " + x.toString());

Now to apply an After advice to that function “joinpoint” we do:

var remover = 
  doAfter(myObject, 'doSomething', subject, 
    function(result) {
        console.log("after function .... ");

myObject.doSomething(1, 2); 

The output of running this is:

in map x:3
after function .... 

Note that “When adding advice to object methods, meld returns an object with a remove() method that can be used to remove the advice that was just added.”

“doAfter” function is a simple function wrapper of the ‘after‘ Meld function:

 * @param anObject the object to advise
 * @param method the method selector
 * @param subject the subject that observes the function
 * @param afterFunction function to invoke after the advised function returns
 * @return object with remove() method
function doAfter(anObject, method, subject, afterFunction){
    return meld.after(anObject, method, function(result){

Observe of property change
Some AOP libraries support get/set advices on property changes. AspectJ in Java world supports this. Meld does not, but this can be implemented using an Around advice. This type of advice wraps a function call.

With an Around advice we can record a properties’ value before and after a function invocation. If there is a change, we generate a new event for the Observable stream.

remover = meld.around(myObject, getMethodNames, function(methodCall){
    var originalValue = myObject.myProperty;
    var result = methodCall.proceed();
    if(originalValue != myObject.myProperty){
        subject.onNext("prop changed: " + originalValue + " -> " + myObject.myProperty);
    return result;



The above uses a utillity function:

/** Utility function to get all functions in object */
function getMethodNames(obj){
   return Object.getOwnPropertyNames(obj)
	    return typeof obj[name] === 'function';

When any method in object is invoked that changes the property, we get:

in map x:prop changed: 15 -> 25
Next in subject: prop changed: 15 -> 25

Of course, there is a performance cost to this approach of monitoring property changes. This costs has not been quantified or explored.

Using AOP an object’s functions can be made into observers. An “after” advice can signal a result event, and using an “around” advice, it is possible to generate events on an Object’s properties. Useful? Unknown. Already possible in ReactiveX ecosystem, not sure.

Possible operators for the above could be:
fromFunction(object, methodSelector)
fromProperty(object, propertySelector)
fromProperty(object, methodSelector, propertySelector).

Look at the Rx.Observable.fromCallback(func, [context], [selector]) operator. Could it be used to go from function to Observable stream without using AOP, as presented here?

Software used
RxJS: version 4.0.7, rx.all.compat.js
Cujojs Meld: version 1.3.1,


Creative Commons License
This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.

RxJS process loop with toggled operations and canceling

How to create a stream on a button that toggles between two operations, and another button that can cancel the same stream? Observable operators are numerous and very complex, so coming up with a solution, or composing a stream, to any problem can be very difficult. At least, that is what I found while learning all this reactive stuff.

Use case
Two buttons: one labeled “Start”, and another labeled “Exit”.

First click on ‘Start’ button, changes its label to “Toggle” and starts the operational loop that invokes operation1.

Further click on button, now labeled “Toggle”, will change the operation invoked alternatively to operation2 or operation1.

A click on the ‘Exit’ button will stop the loop stream.

Here is the RxJS based solution running in JS Bin playground: (Ability to re-run is not implemented. ‘Exit’ means you closed the app.)
RxJS toggable and cancellable process loop on jsbin.com

Sure, this would have been much easier with just regular JavaScript. Just create a timer and a listener on each button, etc. Observable streams don’t really add any value for this simple scenario. But, this is just a learning exercise.

Then again, using streams may offer more opportunities for handling more complex scenarios with timer loops.

This appears to be just like the very basic traditional state machine exercise of modeling a stopwatch. There, the toggle button starts/stops the timer, and the Exit is the cancel button. Do a web image search for “statechart stopwatch”.

Jump to source code

This seems easy now, but it took a lot to come up with a solution. The approach selected, using ‘scan’ was suggested in an article and a StackOverflow question. ‘Scan’ can be used to transfer statefulness.

For the loop we can create streams using timer or interval.

The alternation of operation is where the complexity comes in. “case” operator is very powerful since it can choose an Observable based on a key into a sources map. Would be useful if the operations, i.e., functions invoked in the loop, are themselves streams or Subjects.

If there is only one operation to execute in the loop and the toggle button is behaviorally a run/pause, then there is the “pausable” operator.

One thing I did not implement is a way to restart the process after it was exited. Do the whole thing as a programmatic setup on first start click, or figure out how to resubscribe to the observables? Oh well.

Software used
RxJS: version 4.0.7, rx.all.compat.js
JQuery 2.2.0: jquery-2.2.0.js


Two versions shown below, ES6 using arrow function expressions, and one using plain functions. I’m sure this is a naive newbie solution to this, so be very skeptical of the RxJS usage here. Note: I named the streams with a “$” prefix. a kind of reverse Hungarian Notation. This is a naming convention used by the Cycle.js project.

Creative Commons License
This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.

Reactive client application state constraint contracts

As in Design By Contract (DbC), the contents and structure of a Store mutation can be constrained by Preconditions, Postconditions, and Invariants. These constraints are part of the Store implementation.

Modern JavaScript frameworks are now emphasizing a form of “global” state storage. This is not really new, but gained new impetus when React came out, which synergistically combined this with other factors, like components, virtual-DOM, and one-way data flow.

React Stores

“… contain the application state and logic. Their role is somewhat similar to a model in a traditional MVC, but they manage the state of many objects — they do not represent a single record of data like ORM models do. Nor are they the same as Backbone’s collections. More than simply managing a collection of ORM-style objects, stores manage the application state for a particular domain within the application.” — https://facebook.github.io/flux/docs/overview.html

In very large apps and multiple development teams, this Store could become a high maintenance cost and a source of defects.

Non-React Store
For example of a reusable Store implementation see Redux. Amazing video: Dan Abramov – Live React: Hot Reloading with Time Travel at react-europe 2015

Interesting Java implementation of Redux: redux-java.

Today’s great application and shiny new framework are tomorrow’s maintenance nightmare. Thus, it makes sense to add in features that will aid this future support.

A store has a JSON map of { foo : true, fee : false, fum : {…}, … }.

A particular flow in the application will update foo to false. Is that ok?

If foo and fee are in the same ‘context’, can they both be false? Note that the ‘application’ or maintenance developer may not know that there is a relationship between foo and fee. With annotations, dependency injection, and so forth, even perusal of the source could make this determination difficult.

Sure, in a small app and simple Store, it would be easy to trace the program and find out the implicit constraints. In reality, Stores grow complex over time, the app gets complex, new requirements mean many changes, and the developer culprit has left the company. Stores will mutate to resemble the catch-all Windows’ Registry.

To compound this, the control flows are hidden within the framework being used. No matter if its today’s MVC usurper, React, or some Observable liberator, there is spaghetti flow and implicit state somewhere.

“At some point, you no longer understand what happens in your app as you have lost control over the when, why, and how of its state. When a system is opaque and non-deterministic, it’s hard to reproduce bugs or add new features.” —

One way to reduce issues is to use the techniques from data design and use schema constraints. This does not necessarily mean the use of actual schemas as in XML’s XSD or its wannabe JSON equivalent. One alternative is that the Store has an API for constraint binding to content, new life cycle functions apply the constraints. The result of a constraint violation depends on dev or production deployment, log or throw of exceptions.

Notice how in the development world we are going back in time? Central stores, Data flow diagrams, functional programming, FSM (obfuscated), etc. Maybe Object-Oriented Programming was a great distraction like Disco. Time for some real funk. “Maceo, I want you to Blow!

Wow, this synchronicity is weird. I’ve been thinking of this post’s contents for a while. Started to write it and my twitter app on phone rings. @brianloveswords tweets about a blog post by @jlongster: “Starters and Mainainers”. Not exactly the same subject, but was about maintenance.


Creative Commons License
This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.