It’s been a while since I started the Reactive Framework series. In case you missed the earlier posts, here’s the links to what we’ve done so far:
- Reactive Framework Why bother – Identifying common scenarios solved by RX
- Reactive Framework Building an IObservable Event Generator – Generates random Observable values
- Reactive Framework Getting your LINQ on (Using LINQ over Observables)
At this point, we’ve created our observer and set up the the logic that handles our OnNext values. What we haven’t done yet is wired our LINQ based processing pipeline to the event source. To do this, we need to Subscribe the handler to the Observables. By default, we need to create a new class that implements IObserver. To keep this simple, let’s just output the values to the console for now:
Class ConsoleObserver(Of T) Implements IObserver(Of T) Public Sub OnCompleted() Implements System.IObserver(Of T).OnCompleted End Sub Public Sub OnError(ByVal [error] As System.Exception) Implements System.IObserver(Of T).OnError End Sub Public Sub OnNext(ByVal value As T) Implements System.IObserver(Of T).OnNext Console.WriteLine(value.ToString) End Sub End Class
The IObserver interface has three methods: the method that is fired when the source is done (OnCompleted), the method that occurs when an exception occurs (OnError) and the method that is used when each new value is received (OnNext). In the case of this simple example, we only implement the OnNext method and output the value to the Console Window.
With this in place, we can tie this all together creating and starting our sensor, filtering and projecting from the values (using LINQ) and displaying the values (through the new ConsoleObserver):
Dim Sensor As New ObservableSensor Sensor.Start() Dim lowvalueSensors = From s In Sensor Where s.SensorValue < 3 Select s.SensorValue lowvalueSensors.Subscribe(New ConsoleObserver(Of Double))
Thankfully, consuming our Observable chain doesn’t require creating a new class. Observable.Subscribe offers an additional overload which, instead of taking an IObservable, we can use an Action Lambda. As a result, we can restate our example above as follows:
Dim Sensor As New ObservableSensor Sensor.Start() Dim lowvalueSensors = From s In Sensor Where s.SensorValue < 3 Select s.SensorValue lowvalueSensors.Subscribe(Sub(value) Console.WriteLine(value))
While this consuming code is slightly longer, the total net effect is significantly more maintainable code since we don’t need to declare a separate class to just output our results.
We now could have a fully functioning set of examples. Unfortunately or example at this point is extremely unresponsive because we are completely CPU bound constantly running all of the process on the current thread. Up next time, moving our logic to the background thread.