This chapter is all about Reactive Extensions, so let's create a project called rx-playground to use in our exploratory tour. We will use RxClojure (see https://github.com/ReactiveX/RxClojure), a library that provides Clojure bindings for RxJava (see https://github.com/ReactiveX/RxJava):
$ lein new rx-playground
Open the project file and add a dependency on RxJava's Clojure bindings:
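As a rough sketch, the dependency entry in project.clj might look like the following. The io.reactivex/rxclojure coordinates and both version numbers are assumptions rather than values taken from the text, so check the RxClojure repository for the release you want:

```clojure
;; project.clj (sketch; the version numbers below are assumptions)
(defproject rx-playground "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.8.0"]
                 ;; RxJava's Clojure bindings
                 [io.reactivex/rxclojure "1.0.0"]])
```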
Subscribing a printing handler to an observable that holds the single value 10 will print the "Got value: 10" string to the REPL.
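A minimal sketch of a snippet that produces this output, assuming the rx alias for the rx.lang.clojure.core namespace and its rx/return function for creating a single-value observable (the name obs is illustrative):

```clojure
(require '[rx.lang.clojure.core :as rx])

;; An observable that emits the single value 10 and then completes.
(def obs (rx/return 10))

;; Passing a single function to subscribe registers it as the onNext handler.
(rx/subscribe obs
              (fn [value]
                (prn (str "Got value: " value))))
```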
The subscribe function of an observable allows us to register handlers for three main things that happen throughout its life cycle: new values, errors, or a notification that the observable is done emitting values. These correspond to the onNext, onError, and onCompleted methods of the Observer interface, respectively.
In the preceding example, we are simply subscribing to onNext, which is why we get notified about the observable's only value, 10.
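For comparison, a subscription can also register handlers for errors and completion. The following is a sketch that assumes rx/subscribe accepts the three handlers as positional functions in the order onNext, onError, onCompleted; the handler bodies are illustrative:

```clojure
(require '[rx.lang.clojure.core :as rx])

(rx/subscribe (rx/return 10)
              ;; onNext: called once for each emitted value
              (fn [value] (prn (str "Got value: " value)))
              ;; onError: called if the observable fails
              (fn [error] (prn (str "Got error: " error)))
              ;; onCompleted: called when no more values will arrive
              (fn [] (prn "Done")))
```

For a single-value observable that does not fail, only the first and last handlers should fire.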
A single-value observable isn't terribly interesting, though. Let's create and interact with one that emits multiple values:
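A sketch of such an observable, assuming the seq->o function from rx.lang.clojure.core and a literal vector as the source sequence:

```clojure
(require '[rx.lang.clojure.core :as rx])

;; seq->o turns a Clojure sequence into an observable that emits
;; each element in order; prn is used as the onNext handler.
(-> (rx/seq->o [1 2 3 4 5 6 7 8 9 10])
    (rx/subscribe prn))
```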
This will print the numbers from 1 to 10, inclusive, to the REPL. seq->o is a way to create observables from Clojure sequences. It just so happens that the preceding snippet can be rewritten using Rx's own range operator:
;; rx/range follows clojure.core/range, so the end bound is exclusive
(-> (rx/range 1 11)
    (rx/subscribe prn))
Of course, this doesn't yet offer any advantage over working with raw values or sequences in Clojure.
But what if we need an observable that emits an unbounded number of integers at a given interval? This becomes challenging to represent as a sequence in Clojure, but Rx makes it trivial:
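A sketch of such an observable, calling RxJava's Observable/interval through Java interop. Be aware that nothing in this snippet ever unsubscribes, so once evaluated it keeps emitting in the background:

```clojure
(require '[rx.lang.clojure.core :as rx])
(import '(rx Observable)
        '(java.util.concurrent TimeUnit))

;; Emits 0, 1, 2, ... every 100 milliseconds on a background thread.
;; Caution: this subscription is never cancelled.
(rx/subscribe (Observable/interval 100 TimeUnit/MILLISECONDS)
              prn)
```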
RxClojure doesn't provide bindings to all of RxJava's API yet, and the interval method is one such example. We're required to use interoperability and call the method directly on the Observable class from RxJava.
Observable/interval takes a number and a time unit as arguments. In this case, we are telling it to emit an integer starting from zero every 100 milliseconds. If we type this in a REPL-connected editor, however, two things will happen:
We will not see any output (depending on your REPL; this is true for Emacs)
We will have a rogue thread emitting numbers indefinitely
Both issues arise from the fact that Observable/interval is the first factory method we have used that doesn't emit values synchronously. Instead, it returns an observable that defers the work to a separate thread.
The first issue is simple enough to fix. Functions such as prn will print to whatever the dynamic var *out* is bound to. When working in certain REPL environments, such as Emacs, this is bound to the REPL stream, which is why we can generally see everything we print.
However, since Rx is deferring the work to a separate thread, *out* isn't bound to the REPL stream anymore, so we don't see the output. To fix this, we need to capture the current value of *out* and bind it in our subscription. This will be incredibly useful as we experiment with Rx in the REPL. Let's revisit the prn-to-repl helper function which we defined earlier:
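A sketch of that helper, using only clojure.core and matching the description that follows (the docstring is an addition):

```clojure
;; Capture whatever *out* is bound to when this form is evaluated.
;; When evaluated from the REPL, that is the REPL stream.
(def repl-out *out*)

(defn prn-to-repl
  "Like prn, but always prints to the captured repl-out stream."
  [& args]
  (binding [*out* repl-out]
    (apply prn args)))
```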
The first thing we do is create a var called repl-out that contains the current REPL stream. Next, we create a function called prn-to-repl, which works just like prn except that it uses the binding macro to create a new binding for *out* that is valid within that scope.
This still leaves us with the rogue thread problem. Now is the appropriate time to mention that the subscribe method from an observable returns a subscription object. By holding onto a reference to it, we can call its unsubscribe method to indicate that we are no longer interested in the values that are produced by that observable.
Putting it all together, our interval example can be rewritten like so:
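A sketch of that rewrite, assuming the rx/unsubscribe function from rx.lang.clojure.core and repeating the prn-to-repl helper so that the snippet stands on its own:

```clojure
(require '[rx.lang.clojure.core :as rx])
(import '(rx Observable)
        '(java.util.concurrent TimeUnit))

;; Helper described earlier: prints to the stream captured at definition time.
(def repl-out *out*)
(defn prn-to-repl [& args]
  (binding [*out* repl-out]
    (apply prn args)))

;; Hold onto the subscription so that it can be cancelled later.
(def subscription
  (rx/subscribe (Observable/interval 100 TimeUnit/MILLISECONDS)
                prn-to-repl))

;; Let the observable emit for about a second...
(Thread/sleep 1000)

;; ...then signal that we are no longer interested in its values.
(rx/unsubscribe subscription)
```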
We create a new interval observable and immediately subscribe to it, just as we did before. This time, however, we assign the resulting subscription to a local var. Note that it now uses our helper function, prn-to-repl, so we will start seeing values being printed to the REPL straight away.
Next, we put the current thread (the REPL thread) to sleep for a second. This is enough time for the observable to produce the numbers from 0 to 9. That's roughly when the REPL thread wakes up and unsubscribes from that observable, causing it to stop emitting values.