Reactive Extensions

Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.

Data sequences can take many forms, such as a stream of data from a file or web service, web service requests, system notifications, or a series of events such as user input.

Reactive Extensions represents all these data sequences as observable sequences. An application can subscribe to these observable sequences to receive asynchronous notifications as new data arrive. The Rx library is available for desktop application development in Python. It is also released for .NET, Ruby, Silverlight, Windows Phone 7 and JavaScript. For more information on these different platforms, see the Differences Between Versions of Rx topic.

· Pulling vs. Pushing Data

In interactive programming, the application actively polls a data source for more information by pulling data from a sequence that represents the source. Such behavior is represented by the iterator pattern of IEnumerable<T>/IEnumerator<T>. The IEnumerable<T> interface exposes a single method, GetEnumerator(), which returns an IEnumerator<T> to iterate through the collection. The IEnumerator<T> allows us to get the current item (by reading the Current property), and to determine whether there are more items to iterate (by calling the MoveNext method).

The application is active in the data retrieval process: besides getting an enumerator by calling GetEnumerator, it also controls the pace of the retrieval by calling MoveNext at its own convenience. This enumeration pattern is synchronous, which means that the application might be blocked while polling the data source. This pull pattern is similar to visiting your library and checking out a book. After you are done with the book, you pay another visit to check out another one.
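
The pull pattern described above maps onto Python's own iterator protocol, where iter() plays roughly the role of GetEnumerator and next() combines MoveNext and Current. The following is a minimal sketch that uses only built-in functions and does not involve Rx; the list of book titles is an illustrative assumption.

```python
# Pull model: the consumer asks for each item when it is ready for it.
books = ["Dune", "Emma", "Ulysses"]    # illustrative data source

iterator = iter(books)                 # comparable to GetEnumerator()
pulled = []
while True:
    try:
        # Comparable to MoveNext() followed by Current; with a slow
        # source, this call is where the consumer would be blocked.
        item = next(iterator)
    except StopIteration:              # the source has no more items
        break
    pulled.append(item)

print(pulled)
```

The consumer decides when each next() call happens, which is what makes this a pull model.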

On the other hand, in reactive programming, the application is offered more information by subscribing to a data stream (called an observable sequence in Rx), and any update is handed to it from the source. The application is passive in the data retrieval process: apart from subscribing to the observable source, it does not actively poll the source, but merely reacts to the data being pushed to it. When the stream has no more data to offer, or when it errs, the source will send a notice to the subscriber. In this way, the application will not be blocked by waiting for the source to update.

This is the push pattern employed by Reactive Extensions. It is similar to joining a book club in which you register your interest in a particular genre, and books that match your interest are automatically sent to you as they are published. You do not need to stand in line to acquire something that you want. Employing a push pattern is helpful in many scenarios, especially in a UI-heavy environment in which the UI thread cannot be blocked while the application is waiting for some events. This is also essential in programming environments such as Silverlight, which has its own set of asynchronous requirements. In summary, by using Rx, you can make your application more responsive.

The push model implemented by Rx is represented by the observable pattern of Observable/Observer. The Observable abstracts a sequence of data, and keeps a list of Observer implementations that are interested in the data sequence. The Observable will notify all the observers automatically of any state changes. To register an interest through a subscription, you use the subscribe method of Observable, which takes an Observer and returns a disposable. This gives you the ability to track and dispose of the subscription. In addition, Rx’s LINQ implementation over observable sequences allows developers to compose complex event processing queries over push-based sequences such as events, APM-based (“AsyncResult”) computations, Task-based computations, and asynchronous workflows. For more information on the Observable/Observer interfaces, see Exploring The Major Classes in Rx. For tutorials on using the different features in Rx, see Using Rx.
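
To make the push pattern concrete, here is a minimal sketch written with the standard library only. SimpleObservable and its publish method are hypothetical names invented for this illustration; they are not the Rx.py classes, and the sketch leaves out error and completion notifications.

```python
class SimpleObservable:
    """Hypothetical stand-in for an observable; not the Rx.py class."""

    def __init__(self):
        self._observers = []           # callbacks that registered an interest

    def subscribe(self, on_next):
        self._observers.append(on_next)

        def dispose():
            # Remove the callback so it receives no further notifications.
            if on_next in self._observers:
                self._observers.remove(on_next)
        return dispose                 # handle used to unsubscribe

    def publish(self, value):
        # Push the value to every current subscriber.
        for observer in list(self._observers):
            observer(value)


received = []
source = SimpleObservable()
dispose = source.subscribe(received.append)

source.publish("book 1")   # delivered
source.publish("book 2")   # delivered
dispose()                  # cancel the subscription
source.publish("book 3")   # not delivered: nobody is subscribed any more

print(received)
```

The subscriber never asks for data. It only registers a callback and later uses the returned handle to stop receiving values.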

Getting Started with Rx

This section describes in general what Reactive Extensions (Rx) is, and how it can benefit programmers who are creating asynchronous applications.

When Will You Use Rx

This topic describes the advantages of using Rx.

Advantages of using Rx

Whether you are authoring a traditional desktop or web-based application, you have to deal with asynchronous programming from time to time. Desktop applications run I/O or UI operations that might take a long time to complete and potentially block all other active threads. However, a user of the modern asynchronous programming model has to manage exceptions and cancellation of events manually. To compose or filter events, they have to write custom code that is hard to decipher and maintain.

In addition, if your application interacts with multiple sources of data, the conventional way to manage all of these interactions is to implement separate methods as event handlers for each of these data streams. For example, as soon as a user types a character, a keydown event is pushed to your keydown event handler method. Inside this keydown event handler, you have to provide code to react to this event, or to coordinate between all of the different data streams and process this data into a useable form.

Using Rx, you can represent multiple asynchronous data streams (that come from diverse sources, e.g., stock quotes, tweets, computer events, web service requests, etc.), and subscribe to the event stream using the Observer class. The Observable class maintains a list of dependent Observer implementations and notifies them automatically of any state changes. You can query observable sequences using standard LINQ query operators implemented by the Rx.Linq.Observable type. Thus you can filter, project, aggregate, compose and perform time-based operations on multiple events easily by using these static LINQ operators. Cancellation and exceptions can also be handled gracefully by using extension methods provided by Rx.

The following example shows how easy it is to subscribe to an observable in Python. For more information on using Subjects, see Using Subjects.

# Subscribe to an observable
from rx import Observable, Observer

res = Observable.empty()
subscription = res.subscribe(
    lambda x: print("Observer 1: OnNext: ", x),
    lambda ex: print("Observer 1: OnError: ", ex),
    lambda: print("Observer 1: OnCompleted"))

· Manipulating Events

In Python, events are simple mechanisms for communication between threads. As we have discussed earlier, Rx represents events as a collection of objects: e.g., a MouseMove event contains a collection of Point values. Because observables are first-class objects, they can be passed around as function parameters and return values, or stored in a variable.

· Unsubscribing from Events

In Rx, when you subscribe to an observable sequence representing an event stream, you can specify how long you would like to be notified of changes from the sequence (e.g., n iterations, or for a time interval similar to “do not push between 3-5pm”, or when some other event happens). In addition, when you subscribe to an observable sequence, you get a disposable handle which you can use to unsubscribe from the sequence later (by calling dispose).
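
The idea of being notified for only n iterations can be sketched as follows. This is an illustration built on the standard library, not the Rx.py implementation: Ticker, emit and subscribe_for are hypothetical names, and the sketch assumes a count of at least 1 and a source that pushes values only after subscribe has returned.

```python
class Ticker:
    """Hypothetical hot source that pushes values to its subscribers."""

    def __init__(self):
        self._observers = []

    def subscribe(self, on_next):
        self._observers.append(on_next)

        def dispose():
            if on_next in self._observers:
                self._observers.remove(on_next)
        return dispose

    def emit(self, value):
        for on_next in list(self._observers):
            on_next(value)


def subscribe_for(source, count, on_next, on_completed):
    """Forward the first `count` values (count >= 1), then unsubscribe."""
    remaining = [count]

    def forward(value):
        remaining[0] -= 1
        on_next(value)
        if remaining[0] == 0:
            dispose()            # stop listening once the quota is reached
            on_completed()

    dispose = source.subscribe(forward)
    return dispose               # the caller may still unsubscribe earlier


ticks = Ticker()
seen, finished = [], []
subscribe_for(ticks, 2, seen.append, lambda: finished.append(True))
for n in range(5):
    ticks.emit(n)                # only the first two values are forwarded

print(seen, finished)
```

The subscription disposes of itself after the second value, so the remaining three values are never delivered.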

Installing Rx

This topic describes where you can download the Reactive Extensions (Rx) SDK.

·To download Rx

Reactive Extensions is available for different platforms such as Python, Ruby, .NET Framework 3.5 and 4.0, Silverlight 3 and 4, JavaScript, as well as Windows Phone 7. You can download the libraries, as well as learn about their prerequisites, at the Rx MSDN Developer Center.

Differences Between Versions of Rx

This topic describes the various platforms for which you can develop solutions using Reactive Extensions.

To get the latest release of Rx, as well as learn about its prerequisites, please visit the Rx MSDN Developer Center.

·Python

The Reactive Extensions for Python (Rx.py) is a set of libraries for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators in Python. Using Rx.py, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx.py = Observables + LINQ + Schedulers. Rx.py is written for Python 3 only.

·Ruby

Rx for Ruby (Rx.rb) allows you to use LINQ operators to create push-based observable collections in Ruby.

·.NET Framework

The core Rx interfaces, Observable<T> and Observer<T>, ship as part of .NET Framework 4. If you are running on .NET Framework 3.5 SP1, or if you want to take advantage of the LINQ operators implemented in the Rx.Linq.Observable type, as well as many other features such as schedulers, you can download the Rx assemblies from the Rx MSDN Developer Center.

·Silverlight

Silverlight does not allow cross-thread calls, so you cannot use a background thread to update the UI. Instead of writing verbose code using the Dispatcher.BeginInvoke call to explicitly execute code on the main UI thread, you can use the factory Observable.Start method provided by the Rx assemblies to invoke an action asynchronously. Cross-threading is taken care of transparently by Rx under the hood.

You can also use the various Observable operator overloads that take in a Scheduler, and specify the Rx.Concurrency.DispatcherScheduler to be used.

·JavaScript

Rx for JavaScript (RxJS) allows you to use LINQ operators in JavaScript. It provides easy-to-use conversions from existing DOM, XMLHttpRequest (AJAX), and jQuery events to push-based observable collections, allowing users to seamlessly integrate Rx into their existing JavaScript-based websites.

RxJS brings similar capabilities to client script and integrates with jQuery events (Rx.Observable.FromJQueryEvent). It also supports Script#.

·Windows Phone

Windows Phone 7 ships with a version of the Reactive Extensions baked into the ROM of the device. For more information, see Reactive Extensions for .NET Overview for Windows Phone. Documentation for this version of the Reactive Extensions can be found in Windows Phone API library at Microsoft.Phone.Reactive Namespace.

The Rx MSDN Developer Center also contains an updated version of Rx for WP7, which has new definitions in the System.Reactive.Linq namespace. Note that the new APIs will not clash with the library built in to the phone (nor do they replace the version in the ROM). For more information on the differences of these 2 versions, see this Rx team blog post.

Exploring The Major Classes in Rx

This topic describes the major Reactive Extensions (Rx) interfaces used to represent observable sequences and subscribe to them.

·Observable/Observer

Rx exposes asynchronous and event-based data sources as push-based, observable sequences abstracted by the new Observable class. It represents a data source that can be observed, meaning that it can send data to anyone who is interested. It maintains a list of dependent Observer implementations representing such interested listeners, and notifies them automatically of any state changes.

An implementation of the Observable class can be viewed as a collection of elements of type T. Therefore, an Observable<int> can be viewed as a collection of integers, in which integers will be pushed to the subscribed observers.

As described in What is Rx, the other half of the push model is represented by the Observer interface, which represents an observer who registers an interest through a subscription. Items are subsequently handed to the observer from the observable sequence to which it subscribes.

In order to receive notifications from an observable collection, you use the subscribe method of Observable to hand it an Observer object. In return for this observer, the subscribe method returns a disposable object that acts as a handle for the subscription. This allows you to clean up the subscription after you are done.  Calling dispose on this object detaches the observer from the source so that notifications are no longer delivered. As you can infer, in Rx you do not need to explicitly unsubscribe from an event.

Observers support three publication events, reflected by the interface’s methods. on_next can be called zero or more times, when the observable data source has data available. For example, an observable data source used for mouse move events can send out a Point object every time the mouse has moved. The other two methods, on_completed and on_error, are used to indicate completion or errors.
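
The three publication events can be illustrated with a small standard-library sketch. RecordingObserver, push_all and failing are hypothetical names made up for this example and are not part of Rx.py. The sketch assumes the usual contract: zero or more on_next calls, followed by at most one on_completed or on_error.

```python
class RecordingObserver:
    """Hypothetical observer that records the notifications it receives."""

    def __init__(self):
        self.log = []

    def on_next(self, value):
        self.log.append(("next", value))

    def on_error(self, error):
        self.log.append(("error", str(error)))

    def on_completed(self):
        self.log.append(("completed",))


def push_all(values, observer):
    """Push every value, then signal either completion or an error."""
    try:
        for value in values:
            observer.on_next(value)    # called zero or more times
    except Exception as error:
        observer.on_error(error)       # terminal: nothing follows
    else:
        observer.on_completed()        # terminal: nothing follows


def failing():
    """Illustrative source that yields one value and then fails."""
    yield 1
    raise ValueError("boom")


ok = RecordingObserver()
push_all([1, 2], ok)

bad = RecordingObserver()
push_all(failing(), bad)

print(ok.log)
print(bad.log)
```

A sequence that fails never reports completion, and a sequence that completes never reports an error afterwards.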

The following abridged listing shows the Observable and Observer classes. AbstractObserver, noop and default_error are helpers that this excerpt uses but does not define.

class Observable(object):
    """Represents a push-style collection."""
    initializers = []

    def subscribe(self, on_next=None, on_error=None, on_completed=None):
        ...  # body omitted in this listing

class Observer(AbstractObserver):
    """Supports push-style iteration over an observable sequence."""

    def __init__(self, on_next=None, on_error=None, on_completed=None):
        super(Observer, self).__init__()

        # Fall back to default handlers for any callback that is not supplied.
        self.next = on_next or noop
        self.error = on_error or default_error
        self.completed = on_completed or noop

Rx also provides subscribe extension methods so that you can avoid implementing the Observer interface yourself. For each publication event (on_next, on_error, on_completed) of an observable sequence, you can specify a delegate that will be invoked, as shown in the following example. If you do not specify an action for an event, the default behavior will occur.

res = Observable.range(0, 10)  # creates an observable sequence of 10 integers, starting from 0
subscription = res.subscribe(
    lambda x: print("Observer 1: OnNext: ", x),
    lambda ex: print("Observer 1: OnError: ", ex),
    lambda: print("Observer 1: OnCompleted"))

You can treat the observable sequence (such as a sequence of mouse-over events) as if it were a normal collection. Thus you can write LINQ queries over the collection to do things like filtering, grouping, composing, etc. To make observable sequences more useful, the Rx assemblies provide many factory LINQ operators so that you do not need to implement any of these on your own. This will be covered in the Querying Observable Sequences using LINQ Operators topic.

Creating and Subscribing to Simple Observable Sequences

You do not need to implement the Observable interface manually to create an observable sequence. Similarly, you do not need to implement Observer either to subscribe to a sequence. By installing the Reactive Extensions assemblies, you can take advantage of the Rx.Linq.Observable type, which provides many static LINQ operators for you to create a simple sequence with zero, one or more elements. In addition, Rx provides Subscribe extension methods that take various combinations of on_next, on_error and on_completed handlers in terms of delegates.

·Creating and subscribing to a simple sequence

The following sample uses the Range operator of the Rx.Linq.Observable type to create a simple observable collection of numbers. The observer subscribes to this collection using the Subscribe method of the Rx.Linq.Observable class, and provides delegates that handle on_next, on_error and on_completed.

The Range operator has several overloads. In our example, range(x, y) creates a sequence of y sequential integers starting with x, so range(0, 10) produces the ten integers 0 through 9.

As soon as the subscription happens, the values are sent to the observer. The on_next delegate then prints out the values.

res = Observable.range(0, 10)
subscription = res.subscribe(
    lambda x: print("Observer 1: OnNext: ", x),
    lambda ex: print("Observer 1: OnError: ", ex),
    lambda: print("Observer 1: OnCompleted"))

When an observer subscribes to an observable sequence, the thread calling the Subscribe method can be different from the thread in which the sequence runs until completion. Therefore, the subscribe call is asynchronous in that the caller is not blocked until the observation of the sequence completes. This will be covered in more detail in the Using Schedulers topic.

Notice that the subscribe method returns a disposable, so that you can unsubscribe from a sequence and dispose of it easily. When you invoke the dispose method on this disposable, the observer will stop listening to the observable for data. Normally, you do not need to explicitly call dispose unless you need to unsubscribe early, or when the source observable sequence has a longer life span than the observer. Subscriptions in Rx are designed for fire-and-forget scenarios without the usage of a finalizer. When the disposable instance is collected by the garbage collector, Rx does not automatically dispose of the subscription. However, note that the default behavior of the Observable operators is to dispose of the subscription as soon as possible (i.e., when an on_completed or on_error message is published). For example, the code x = Observable.zip(a, b).subscribe() will subscribe x to both sequences a and b. If a throws an error, x will immediately be unsubscribed from b.

In addition to creating an observable sequence from scratch, you can convert existing enumerators, Python events and asynchronous patterns into observable sequences. The other topics in this section will show you how to do this.

Notice that this topic only shows you a few operators that can create an observable sequence from scratch. To learn more about other LINQ operators, see Querying Observable Sequences using LINQ Operators.

·Converting an Enumerable Collection to an Observable Sequence

Using the from_array operator, you can convert a generic enumerable collection to an observable sequence and subscribe to it.

e = [1, 2, 3, 4, 5]
res = Observable.from_array(e)
subscription = res.subscribe(
    lambda x: print("Observer 1: OnNext: ", x),
    lambda ex: print("Observer 1: OnError: ", ex),
    lambda: print("Observer 1: OnCompleted"))

Querying Observable Sequences using LINQ Operators

In this topic, we will look at the first-class nature of observable sequences as Observable<T> objects, in which generic LINQ operators are supplied by the Rx assemblies to manipulate these objects. Most operators take an observable sequence and perform some logic on it and output another observable sequence. In addition, as you can see from our code samples, you can even chain multiple operators on a source sequence to tweak the resulting sequence to your exact requirement.

·Using Different Operators

We have already used the range and from_array operators in previous topics to create and return simple sequences. In this topic, we will use other static LINQ operators of the Observable type so that you can filter, group and transform data. Such operators take observable sequence(s) as input, and produce observable sequence(s) as output.

·Combining different sequences

In this section, we will examine some of the operators that combine various observable sequences into a single observable sequence. Notice that data are not transformed when we combine sequences.

In the following sample, we use the Concat operator to combine two sequences into a single sequence and subscribe to it. For illustration purposes, we will use the very simple Range(x, y) operator to create a sequence of y sequential integers starting with x.

source1 = Observable.range(1, 3)
source2 = Observable.range(1, 3)
x = source1.concat(source2)
x.subscribe(lambda value: print(value))  # subscribe to the combined sequence

Notice that the resultant sequence is 1,2,3,1,2,3. This is because when you use the concat operator, the 2nd sequence (source2) will not be active until after the 1st sequence (source1) has finished pushing all its values. Only after source1 has completed will source2 start to push values to the resultant sequence. The subscriber will then get all the values from the resultant sequence.

Compare this with the merge operator. If you run the following sample code, you will get 1,1,2,2,3,3. This is because the two sequences are active at the same time and values are pushed out as they occur in the sources. The resultant sequence only completes when the last source sequence has finished pushing values.

Notice that for Merge to work, all the source observable sequences need to be of the same type of Observable<T>. The resultant sequence will be of the type Observable<T>. If source1 produces an on_error in the middle of the sequence, then the resultant sequence will terminate immediately with that error.

source1 = Observable.range(1, 3)
source2 = Observable.range(1, 3)
x = source1.merge(source2)
x.subscribe(lambda value: print(value))  # subscribe to the merged sequence

Finally, let’s look at on_error_resume_next. This operator will move on to source2 even if source1 cannot be completed due to an error. In the following example, even though source1 represents a sequence that terminates with an exception (by using the throw_exception operator), the subscriber will receive the values (4,5,6) published by source2. Therefore, if you expect either source sequence to produce an error, it is a safer bet to use on_error_resume_next to guarantee that the subscriber will still receive some values.

source1 = Observable.throw_exception('Error')
source2 = Observable.range(4, 3)
x = source1.on_error_resume_next(source2)

Notice that for all these combination operators to work, all the observable sequences need to be of the same type.
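
The difference between concat and on_error_resume_next can be sketched without Rx by modelling a sequence as a plain function that accepts three callbacks. All names below (from_list, fail_with, concat, on_error_resume_next, collect) are hypothetical helpers written for this illustration. As a simplifying assumption, the sketch forwards an error from the second sequence unchanged, which may differ from the library's behavior.

```python
def from_list(values):
    """Hypothetical cold source: pushes each value, then completes."""
    def subscribe(on_next, on_error, on_completed):
        for value in values:
            on_next(value)
        on_completed()
    return subscribe


def fail_with(message):
    """Hypothetical source that terminates immediately with an error."""
    def subscribe(on_next, on_error, on_completed):
        on_error(Exception(message))
    return subscribe


def concat(first, second):
    """Subscribe to `second` only after `first` has completed."""
    def subscribe(on_next, on_error, on_completed):
        first(on_next, on_error,
              lambda: second(on_next, on_error, on_completed))
    return subscribe


def on_error_resume_next(first, second):
    """Move on to `second` whether `first` completes or fails."""
    def subscribe(on_next, on_error, on_completed):
        def move_on(*_ignored):
            second(on_next, on_error, on_completed)
        first(on_next, move_on, move_on)
    return subscribe


def collect(source):
    """Subscribe and return the list of notifications received."""
    log = []
    source(log.append,
           lambda error: log.append("error: %s" % error),
           lambda: log.append("completed"))
    return log


concatenated = collect(concat(from_list([1, 2, 3]), from_list([1, 2, 3])))
failed = collect(concat(fail_with("Error"), from_list([4, 5, 6])))
resumed = collect(on_error_resume_next(fail_with("Error"), from_list([4, 5, 6])))

print(concatenated)
print(failed)
print(resumed)
```

With concat, an error in the first sequence ends the result before the second sequence is ever subscribed to. With on_error_resume_next, the subscriber still receives the values of the second sequence.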

·Filtering

In the following example, we use the generate operator to create a simple observable sequence of numbers. The generate operator has several overloads. In our example, it takes an initial state (0 in our example), a conditional function to terminate (fewer than 10 times), an iterator (+1), and a result selector (a square function of the current value). The subscriber then prints only those values smaller than 15.

def checknum(x):
    # Print only the values smaller than 15.
    if x < 15:
        print(x)

seq = Observable.generate(0, lambda i: i < 10, lambda i: i + 1, lambda i: i * i)
seq.subscribe(checknum)  # output is 0, 1, 4, 9
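
The example above filters inside the subscriber. As a hedged sketch of the alternative, the filtering can be moved into an operator that takes a sequence and returns a new sequence. The generate and where functions below are hypothetical stand-ins written with plain Python functions, not the Rx.py operators, and they ignore errors for brevity.

```python
def generate(initial, condition, iterate, result_selector):
    """Hypothetical stand-in for the generate operator described above."""
    def subscribe(on_next, on_completed=lambda: None):
        state = initial
        while condition(state):
            on_next(result_selector(state))
            state = iterate(state)
        on_completed()
    return subscribe


def where(source, predicate):
    """Forward only the values for which predicate(value) is true."""
    def subscribe(on_next, on_completed=lambda: None):
        def forward(value):
            if predicate(value):
                on_next(value)
        source(forward, on_completed)
    return subscribe


squares = generate(0, lambda i: i < 10, lambda i: i + 1, lambda i: i * i)
small = []
where(squares, lambda x: x < 15)(small.append)

print(small)
```

Because where returns another sequence, further operators could be chained onto its result, and the subscriber no longer needs to know about the filter.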

·LINQ Operators by Categories

The LINQ Operators by Categories topic lists all major LINQ operators implemented by the Rx.Linq.Observable type by their categories, specifically: creation, combine, functional, mathematical, time, miscellaneous, selection and primitives.

·See Also

LINQ Operators by Categories

Rx.Linq.Observable

LINQ Operators by Categories

This topic lists all major LINQ operators implemented by the Rx.Linq.Observable type by their categories, specifically: creation, conversion, combine, functional, mathematical, time, exceptions, miscellaneous, selection and primitives.

Usage: Operators

Creating an observable sequence: create, generate, defer, range

Combining multiple observable sequences into a single sequence: amb, concat, join, merge, repeat, zip

Functional: let, publish, replay

Mathematical operators on sequences: aggregate, reduce, count

Time-based operations: delay, interval, time_interval

Handling Exceptions: catch, finally, retry, on_error_resume_next

Miscellaneous operators: do_action

Filtering and selecting values in a sequence: take, take_until/take_while, select, select_many, skip

Primitives: never, empty, return, throw_exception

·See Also

Querying Observable Sequences using LINQ Operators

Rx.Linq.Observable

Subjects

This section describes the Subject<T> type implemented by Reactive Extensions. It also describes various implementations of Subject<T> which serve different purposes.

Reference

Rx

Rx.Linq

Rx.Subjects

Rx.Concurrency

Using Subjects

The Subject type implements both Observable and Observer, in the sense that it is both an observer and an observable. You can use a subject to subscribe all the observers, and then subscribe the subject to a backend data source. In this way, the subject can act as a proxy for a group of subscribers and a source. You can use subjects to implement a custom observable with caching, buffering and time shifting. In addition, you can use subjects to broadcast data to multiple subscribers.

By default, subjects do not perform any synchronization across threads. They do not take a scheduler but rather assume that all serialization and grammatical correctness (that is, zero or more on_next calls followed by at most one on_error or on_completed) are handled by the caller of the subject. A subject simply broadcasts to all subscribed observers in the thread-safe list of subscribers. Doing so has the advantage of reducing overhead and improving performance.

·Using Subjects

In the following example, we create a subject, subscribe to that subject and then use the same subject to publish values to the observer. By doing so, we combine the publication and subscription into the same source.

In addition to taking an Observer, the subscribe method also has an overload that takes an action for on_next, which means that the action will be executed every time an item is published. In our sample, whenever on_next is invoked, the item will be written to the console.

# Assumes Subject is importable from rx.subjects (see Rx.Subjects under Reference)
from rx.subjects import Subject

subject = Subject()
subscription = subject.subscribe(
    lambda x: print("Observer 1: OnNext: ", x),
    lambda ex: print("Observer 1: OnError: ", ex),
    lambda: print("Observer 1: OnCompleted"))

subject.on_next(1)
subject.on_next(2)
subject.on_completed()
subscription.dispose()
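
To show what it means for a subject to be both an observer and an observable, here is a minimal standard-library sketch. SimpleSubject is a hypothetical class written for this illustration, not the Rx.py Subject. It omits on_error and, like the subjects described above, performs no synchronization across threads.

```python
class SimpleSubject:
    """Hypothetical subject: an observer and an observable at once."""

    def __init__(self):
        self._observers = []
        self._stopped = False

    # Observable side: others subscribe to the subject.
    def subscribe(self, on_next, on_completed=lambda: None):
        entry = (on_next, on_completed)
        self._observers.append(entry)

        def dispose():
            if entry in self._observers:
                self._observers.remove(entry)
        return dispose

    # Observer side: a source (or the caller) pushes into the subject.
    def on_next(self, value):
        if not self._stopped:
            for next_handler, _ in list(self._observers):
                next_handler(value)

    def on_completed(self):
        if not self._stopped:
            self._stopped = True
            for _, completed_handler in list(self._observers):
                completed_handler()
            self._observers.clear()


subject = SimpleSubject()
first, second = [], []
subject.subscribe(first.append, lambda: first.append("completed"))
subject.on_next(1)
subject.subscribe(second.append)   # a late subscriber misses the value 1
subject.on_next(2)                 # broadcast to both subscribers
subject.on_completed()
subject.on_next(3)                 # ignored: the subject has completed

print(first)
print(second)
```

Each value pushed into the subject is broadcast to every observer subscribed at that moment, which is how a subject can act as a proxy between one source and a group of subscribers.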

 

Testing and Debugging Observable Sequences

·Testing your Rx application

If you have an observable sequence that publishes values over an extended period of time, testing it in real time can be impractical. The Reactive Extensions library provides the Rx.Testing.TestScheduler type to assist in testing this kind of time-dependent code without actually waiting for time to pass. The TestScheduler inherits VirtualScheduler and allows you to create, publish and subscribe to sequences in emulated time. For example, you can compact a publication which takes 5 days to complete into a 2 minute run, while maintaining the correct scale. You can also take a sequence which actually has happened in the past (e.g., a sequence of stock ticks for a previous year) and compute or subscribe to it as if it were pushing out new values in real time.

The factory method start executes all scheduled tasks until the queue is empty, or you can specify a time so that queued-up tasks are only executed up to the specified time.
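
The idea of emulated time can be sketched with a priority queue from the standard library. VirtualTimeScheduler, schedule and the until parameter are hypothetical names chosen for this illustration; they are not the TestScheduler API, and time is measured in arbitrary ticks (treated as seconds below).

```python
import heapq
import itertools


class VirtualTimeScheduler:
    """Hypothetical scheduler that runs tasks in emulated time."""

    def __init__(self):
        self.clock = 0                      # current virtual time
        self._queue = []                    # heap of (due_time, order, action)
        self._order = itertools.count()     # tie-breaker keeps insertion order

    def schedule(self, due_time, action):
        heapq.heappush(self._queue, (due_time, next(self._order), action))

    def start(self, until=None):
        """Run queued tasks in time order, optionally stopping at `until`."""
        while self._queue and (until is None or self._queue[0][0] <= until):
            due_time, _, action = heapq.heappop(self._queue)
            self.clock = due_time           # jump straight to the task's time
            action()
        if until is not None:
            self.clock = max(self.clock, until)


scheduler = VirtualTimeScheduler()
events = []
five_days = 5 * 24 * 60 * 60                # five days of virtual seconds
scheduler.schedule(five_days, lambda: events.append("day 5"))
scheduler.schedule(60, lambda: events.append("minute 1"))

scheduler.start(until=3600)                 # only tasks due within the first hour
first_hour = list(events)
scheduler.start()                           # run everything that is left

print(first_hour, events, scheduler.clock)
```

No real waiting takes place: the clock jumps from one due time to the next, so a schedule spanning five days runs in a fraction of a second.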

 

·See Also

Creating and Subscribing to Simple Observable Sequences

Querying Observable Sequences using LINQ Operators

Using Schedulers

Implementing Your Own Operators for Observable

You can extend Rx by adding new operators for operations that are not provided by the LINQ library, or by creating your own implementation of standard query operators to improve readability and performance. Writing a customized version of a standard LINQ operator is useful when you want to operate with in-memory objects and when the intended customization does not require a comprehensive view of the query.

·Creating New Operators

LINQ offers a full set of operators that cover most of the possible operations on a set of entities. However, you might need an operator to add a particular semantic meaning to your query—especially if you can reuse that same operator several times in your code.

Many existing LINQ operators are in fact built using other basic LINQ operators. For example, the select_many operator is built by composing the select and merge operators.

By reusing existing LINQ operators when you build a new one, you can take advantage of the existing performance or exception handling capabilities implemented in the Rx libraries.

When writing a custom operator, it is good practice not to leave any disposables unused; otherwise, you may find that resources could actually be leaked and cancellation may not work correctly.
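
The remark above that select_many is built from select and merge can be sketched as follows. This is a standard-library illustration under stated assumptions: sequences are modelled as plain functions taking on_next and on_completed callbacks, all names are hypothetical rather than the Rx.py operators, and error handling and disposables are left out for brevity.

```python
def from_list(values):
    """Hypothetical cold source: pushes each value, then completes."""
    def subscribe(on_next, on_completed=lambda: None):
        for value in values:
            on_next(value)
        on_completed()
    return subscribe


def select(source, selector):
    """Project each value through `selector`."""
    def subscribe(on_next, on_completed=lambda: None):
        source(lambda value: on_next(selector(value)), on_completed)
    return subscribe


def merge_all(source_of_sources):
    """Flatten a sequence of sequences into one sequence."""
    def subscribe(on_next, on_completed=lambda: None):
        active = [1]                     # the outer sequence counts as one

        def finished():
            active[0] -= 1
            if active[0] == 0:           # outer and all inner sequences done
                on_completed()

        def on_inner(inner):
            active[0] += 1
            inner(on_next, finished)

        source_of_sources(on_inner, finished)
    return subscribe


def select_many(source, selector):
    """New operator built purely by composing select and merge_all."""
    return merge_all(select(source, selector))


flattened = []
pairs = select_many(from_list([1, 2, 3]), lambda n: from_list([n, n * 10]))
pairs(flattened.append, lambda: flattened.append("completed"))

print(flattened)
```

The new operator adds no subscription logic of its own, so it inherits whatever the two building blocks already handle. A fuller version would also propagate errors and return a disposable from subscribe.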

·Customizing Existing Operators

Adding new operators to LINQ is a way to extend its capabilities. However, you can also improve code readability by wrapping existing operators into more specialized and meaningful ones.

Last edited Jul 11, 2013 at 9:36 PM by Snesha, version 3
