Tag Archives: rx

Use AOP with RxJS to make functions Observable

There are no Observable create functions that will observe function invocations on an Object. As a proof of concept I used a JavaScript AOP library, Meld, to accomplish this “function to stream” capability.

Alternatives
This sounds like a job for a mixin, proxy, Object.observe, inheritance, or decorator pattern. There are hundreds of mixin libraries, the new JavaScript Proxy is not readily available, Object.observe was cancelled, etc.
Edit: Via a Dan Abramov tweet (what a busy guy!), I found out about MobX.

Aspect-Oriented Programming (AOP)
AOP is paradigm that allows the separation of cross-cutting concerns via applying “advices” to objects. There are many advice types. The Meld library supports several, for example, ‘before’, ‘around’, ‘on’, ‘after’, ‘afterReturning’, and ‘afterThowing’.

We’ll just use two here: “after” to observe a function invocation, and “around” to observe a property change.

Observe of function
In this example object:

var myObject = {
   myProperty:'15',		

   doSideEffect: function(x){this.myProperty = x;},		

   doSomething: function(a, b) {return a + b;}
};

We want to subscribe to the ‘doSomething’ function in this object. That is, when that function is invoked, the resulting return value should be sent to an Observable stream.

The stream:

 
var subject = new Rx.Subject();
    
var subjectAsObservable = subject.map(function(x){
       console.log("in map x:" + x);
       return x;
});
    
var subscription = subjectAsObservable.subscribe(
       function(x){ // onNext
           console.log("Next in subject: " + x.toString());
       }	
);

Now to apply an After advice to that function “joinpoint” we do:

 
var remover = 
  doAfter(myObject, 'doSomething', subject, 
    function(result) {
        console.log("after function .... ");
    }
);

myObject.doSomething(1, 2); 
remover.remove();

The output of running this is:

 
in map x:3
main.js:3
after function .... 

Note that “When adding advice to object methods, meld returns an object with a remove() method that can be used to remove the advice that was just added.”

“doAfter” function is a simple function wrapper of the ‘after‘ Meld function:

 
/**
 * @param anObject the object to advise
 * @param method the method selector
 * @param subject the subject that observes the function
 * @param afterFunction function to invoke after the advised function returns
 * @return object with remove() method
function doAfter(anObject, method, subject, afterFunction){
    return meld.after(anObject, method, function(result){
    	subject.onNext(result);
    	afterFunction(result);
    });
}

Observe of property change
Some AOP libraries support get/set advices on property changes. AspectJ in Java world supports this. Meld does not, but this can be implemented using an Around advice. This type of advice wraps a function call.

With an Around advice we can record a properties’ value before and after a function invocation. If there is a change, we generate a new event for the Observable stream.

 
remover = meld.around(myObject, getMethodNames, function(methodCall){
    var originalValue = myObject.myProperty;
    
    var result = methodCall.proceed();
    if(originalValue != myObject.myProperty){
        subject.onNext("prop changed: " + originalValue + " -> " + myObject.myProperty);
    }
    
    return result;
});

myObject.doSideEffect('25');

remover.remove();

The above uses a utillity function:

/** Utility function to get all functions in object */
function getMethodNames(obj){
   return Object.getOwnPropertyNames(obj)
        .filter(function(name){
	    return typeof obj[name] === 'function';
        });		
}

When any method in object is invoked that changes the property, we get:

in map x:prop changed: 15 -> 25
Next in subject: prop changed: 15 -> 25

Of course, there is a performance cost to this approach of monitoring property changes. This costs has not been quantified or explored.

Summary
Using AOP an object’s functions can be made into observers. An “after” advice can signal a result event, and using an “around” advice, it is possible to generate events on an Object’s properties. Useful? Unknown. Already possible in ReactiveX ecosystem, not sure.

Possible operators for the above could be:
fromFunction(object, methodSelector)
fromProperty(object, propertySelector)
fromProperty(object, methodSelector, propertySelector).

TODO
Look at the Rx.Observable.fromCallback(func, [context], [selector]) operator. Could it be used to go from function to Observable stream without using AOP, as presented here?

Software used
RxJS: version 4.0.7, rx.all.compat.js
Cujojs Meld: version 1.3.1,

Links

Creative Commons License
This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.

Can conditional state be used in RxJava Observable streams?

Few examples of RxJava contain conditional operations on a stream that involve state transitions based on stream content. Sure there are operators such as filter and the rest, but since examples are “pure”, that is, using pure functions, there is no setting of state that can influence the next event.

It doesn’t seem that this is part of the use-case of reactive streams. At least, that is my current conclusion. I hope it is incorrect. If not then a large type of use-case is not amendable to Reactive approach.

Here is the general problem statement: A stream consists of items x. If the current x has a property p set, then subsequent x will be skipped if they are related to that first x. Note that those skipped x could also have the same property set. This scenario could be generalized to specify an operation on a subgroup besides skipping.

A concrete example of this ‘pattern’ is found at Prune deleted folders of repo diff report using Java. IN that blog post, the x are the output of a Subversion diff report, and the conditional property is folder deletion. We want to prune deleted subfolders.

In Listing 1 below, I solve this with Java 8 streams in method pruneDeletedFoldersJavaStream(List). I also solve it using RxJava in method pruneDeletedFoldersRx(List). However, the Rx approach is just a copy of the Java 8 streams approach, but just using the filter operator. A State object model is used in both since Java 8 requires final fields.

Can this be implemented with the use of Rx operators and no explicit state handling? I tried many approaches, even use of BehaviorSubject, to no avail. I don’t even know how to phrase the question on Stackoverflow.

Listing 1, Full Source

Links

Creative Commons License
This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License.