Mastering Python Design Patterns

The Observer Pattern in Reactive Programming

by 자동매매 2023. 3. 22.


In the previous chapter, we covered the last four in our list of behavioral patterns. That chapter also marked the end of the list of patterns presented by the Gang of Four in their book.

Among the patterns we have discussed so far, one is particularly relevant to this chapter: the Observer pattern (covered in Chapter 11, The Observer Pattern), which is useful for notifying an object or a group of objects when the state of a given object changes. This traditional Observer applies the publish-subscribe principle, allowing us to react to an object's change events. It works well in many cases, but when we have to deal with many events, some of them depending on each other, the traditional approach can lead to complicated, difficult-to-maintain code. That is where another paradigm, reactive programming, gives us an interesting option. In simple terms, the idea of reactive programming is to react to many events, or streams of events, while keeping our code clean.
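As a reminder of what the traditional, publish-subscribe style of Observer looks like, here is a minimal sketch. The Publisher class and the temperature example are hypothetical names chosen for illustration; they are not taken from the earlier chapter.

```python
class Publisher:
    """Holds a value and notifies subscribers whenever it changes."""

    def __init__(self, value=None):
        self._value = value
        self._subscribers = []

    def subscribe(self, callback):
        # Register a callable that will receive every new value.
        self._subscribers.append(callback)

    @property
    def value(self):
        return self._value

    @value.setter
    def value(self, new_value):
        self._value = new_value
        for callback in self._subscribers:
            callback(new_value)


received = []
temperature = Publisher(20)
temperature.subscribe(received.append)
temperature.subscribe(lambda v: print(f"Temperature is now {v}"))

temperature.value = 21
temperature.value = 25
```

Each subscriber reacts to one object's changes. Once several such objects depend on each other, the callbacks have to coordinate among themselves, which is the kind of complexity reactive programming tries to tame.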

In this chapter, we will focus on a framework called ReactiveX (http://reactivex.io), one implementation of reactive programming. The core entity in ReactiveX is called an Observable. As the official website puts it, ReactiveX is an API for asynchronous programming with observable streams. In addition to the Observable, we also have the Observer.

You can think of an Observable as a stream that can push or emit data to the Observer. And it can also emit events.
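To make the word "push" concrete, the following sketch contrasts it with ordinary pull-based iteration. It uses plain Python only, and the function names pull_numbers() and push_numbers() are hypothetical; this is not ReactiveX code.

```python
def pull_numbers():
    """Pull-based: the consumer asks for each item with next() or a for loop."""
    yield from (1, 2, 3)


def push_numbers(on_next, on_completed):
    """Push-based: the producer decides when to hand each item to the consumer."""
    for n in (1, 2, 3):
        on_next(n)
    on_completed()


# Pull: the consumer drives the loop.
pulled = [n * 10 for n in pull_numbers()]

# Push: the producer calls the consumer back.
pushed = []
push_numbers(
    on_next=lambda n: pushed.append(n * 10),
    on_completed=lambda: pushed.append("done"),
)

print(pulled)
print(pushed)
```

In the push version the consumer only supplies callbacks, so the same code works whether the items come from a list, a timer, or user input.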


Here are two quotes from the documentation available, giving a definition of Observables:

*"An Observable is the core type in ReactiveX. It serially pushes items, known as emissions, through a series of operators until it finally arrives at an Observer, where they are consumed."*

*"Push-based (rather than pull-based) iteration opens up powerful new possibilities to express code and concurrency much more quickly. Because an Observable treats events as data and data as events, composing the two together becomes trivial."*

In this chapter, we will discuss:

  • Real-world examples
  • Use cases
  • Implementation

Real-world examples

In real life, a stream of water that accumulates somewhere resembles an Observable. We have quite a few examples in terms of software:

  • A spreadsheet application can be seen as an example of reactive programming, based on its internal behavior. In virtually all spreadsheet applications, interactively changing any one cell in the sheet will result in immediately reevaluating all formulas that directly or indirectly depend on that cell and updating the display to reflect these reevaluations.
  • The ReactiveX concept is implemented in a variety of languages, including Java (RxJava), Python (RxPY), and JavaScript (RxJS).
  • The Angular Framework uses RxJS to implement the Observable pattern.
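The spreadsheet behavior described in the first item can be sketched in a few lines of plain Python. The Cell class below is a hypothetical illustration, not how any real spreadsheet is implemented, and its naive propagation may re-evaluate a formula more than once when dependencies form a diamond.

```python
class Cell:
    """A spreadsheet-like cell: either a plain value or a formula over other cells."""

    def __init__(self, value=None, formula=None, inputs=()):
        self.formula = formula
        self.inputs = list(inputs)
        self.dependents = []
        for cell in self.inputs:
            cell.dependents.append(self)
        self.value = value if formula is None else self._evaluate()

    def _evaluate(self):
        return self.formula(*(cell.value for cell in self.inputs))

    def set(self, value):
        """Change the cell and re-evaluate everything that depends on it."""
        self.value = value
        self._notify()

    def _notify(self):
        for cell in self.dependents:
            cell.value = cell._evaluate()
            cell._notify()  # indirect dependents are refreshed too


a1 = Cell(2)
a2 = Cell(3)
total = Cell(formula=lambda x, y: x + y, inputs=(a1, a2))
doubled = Cell(formula=lambda t: t * 2, inputs=(total,))

a1.set(10)
print(total.value, doubled.value)
```

Changing a1 updates total directly and doubled indirectly, without the caller asking for either to be recomputed.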

Use cases

One use case is the idea of the Collection Pipeline discussed by Martin Fowler on his blog (https://martinfowler.com/articles/collection-pipeline):

*"Collection pipelines are a programming pattern where you organize some computation as a sequence of operations which compose by taking a collection as output of one operation and feeding it into the next."*

We can use an Observable to do operations such as map and reduce or groupby on sequences of objects when processing data.
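For comparison, here is what map, reduce, and groupby look like on an ordinary in-memory sequence, using only the Python standard library. The sample data is made up; an Observable applies the same kind of operations to items as they arrive rather than to a finished collection.

```python
from functools import reduce
from itertools import groupby

words = ["apple", "avocado", "banana", "blueberry", "cherry"]

# map: transform every item
lengths = list(map(len, words))

# reduce: fold the sequence into a single value
total_chars = reduce(lambda acc, n: acc + n, lengths, 0)

# groupby: group consecutive items by a key
# (the input is already sorted by first letter, which groupby requires)
by_letter = {
    letter: list(group)
    for letter, group in groupby(words, key=lambda w: w[0])
}

print(lengths)
print(total_chars)
print(by_letter)
```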

Observables can be created from diverse sources such as button events, requests, and RSS or Twitter feeds.

Implementation

Instead of building a complete implementation here, we will explore several possibilities and see how you can use them.

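To get started, install RxPY in your Python environment using the pip install rx command. Note that the examples in this chapter use the RxPY 1.x API (for instance, from rx import Observable, Observer). Later major versions of RxPY changed this API, so you may need to install a 1.x release to run the code as shown.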

A first example

To start, let's take the example from the RxPY documentation and write a more fun variant. We are going to observe a stream built from the Zen of Python quotes by Tim Peters (https://www.python.org/dev/peps/pep-0020/).

Normally, you can see the quotes by running import this in the Python console:

>>> import this

Then, the question is how to get that list of quotes from within a Python program. A quick search gives us an answer on Stack Overflow. Basically, you can redirect the output of the import this statement to an io.StringIO instance and then access it and print it using print(), as follows:

import contextlib, io

zen = io.StringIO()
with contextlib.redirect_stdout(zen):
    import this

print(zen.getvalue())

Now, to really be able to start using our example code (in the rx_example1.py file), we need to import Observable and Observer classes from the rx module, as follows:

from rx import Observable, Observer

Then, we create a function, get_quotes(), using the contextlib.redirect_stdout() trick, with the code we previously showed slightly adapted, to get and return the quotes:

def get_quotes():
    import contextlib, io

    zen = io.StringIO()
    with contextlib.redirect_stdout(zen):
        import this

    # Drop the title line; the remaining lines are the quotes (some are empty)
    quotes = zen.getvalue().split('\n')[1:]
    return quotes
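One caveat with the redirect trick: Python runs a module's top-level code only the first time the module is imported in a process, so a second call to get_quotes() captures nothing and returns an empty list. The sketch below is one possible workaround. It assumes CPython's this module, which keeps the text ROT13-encoded in an attribute named s; that attribute is an implementation detail rather than a documented API.

```python
import codecs
import contextlib
import io


def get_quotes():
    """Return the Zen of Python aphorisms, however often the function is called."""
    # Importing `this` prints the text once; silence it so nothing leaks to stdout.
    with contextlib.redirect_stdout(io.StringIO()):
        import this
    # Assumption: CPython's `this` module stores the text ROT13-encoded in `s`.
    text = codecs.decode(this.s, "rot13")
    # Skip the title line and drop empty lines.
    return [line for line in text.split("\n")[1:] if line]


first = get_quotes()
second = get_quotes()  # still works: no dependence on import-time output
print(len(first), first[0])
```

Unlike the chapter's version, this variant already filters out empty lines, so callers would not need the if q check.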

That is done! Now, we want to create an Observable from the list of quotes we get. We can do this as follows:

  1. Define a function that hands data items to the Observer
  2. Use an Observable.create() factory, and pass it that function, to set up the source or stream of data
  3. Make the Observer subscribe to the source

The Observer class itself has three methods used for this type of communication:

  • on_next() is used to pass items
  • on_completed() signals that no more items are coming
  • on_error() signals an error
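The three steps and the three Observer methods can be illustrated without RxPY. The sketch below is plain Python under stated assumptions: SimpleObservable and SimpleObserver are hypothetical stand-ins, and routing an exception from the source function to on_error() is a choice made for this sketch, not a claim about how RxPY behaves internally.

```python
class SimpleObserver:
    """Collects what it receives so the sequence of calls is easy to inspect."""

    def __init__(self):
        self.log = []

    def on_next(self, value):
        self.log.append(("next", value))

    def on_completed(self):
        self.log.append(("completed",))

    def on_error(self, error):
        self.log.append(("error", str(error)))


class SimpleObservable:
    """Wraps a function that hands data items to whichever Observer subscribes."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    def subscribe(self, observer):
        # Nothing is produced until someone subscribes.
        try:
            self._subscribe_fn(observer)
        except Exception as exc:
            observer.on_error(exc)


def push_numbers(obs):
    for n in (1, 2, 3):
        obs.on_next(n)
    obs.on_completed()


def push_then_fail(obs):
    obs.on_next(1)
    raise ValueError("source broke")


ok = SimpleObserver()
SimpleObservable(push_numbers).subscribe(ok)

failed = SimpleObserver()
SimpleObservable(push_then_fail).subscribe(failed)

print(ok.log)
print(failed.log)
```

A stream delivers zero or more on_next() calls and then ends with either on_completed() or on_error(), never both.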

So let's create a function, push_quotes(), that takes an Observer object obs as input, and, using the sequence of quotes, sends each quote using the on_next() and signals the end (after the last quote has been sent) using on_completed(). The function is as follows:

def push_quotes(obs):
    quotes = get_quotes()
    for q in quotes:
        if q:  # skip empty lines
            obs.on_next(q)
    obs.on_completed()

We implement the Observer to be used, using a subclass of the Observer base class, as follows:

class ZenQuotesObserver(Observer):

    def on_next(self, value):
        print(f"Received: {value}")

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print(f"Error Occurred: {error}")

Next, we define the source to be observed, as follows:

source = Observable.create(push_quotes)

Finally, we define the subscription to the Observable, without which nothing would happen:

source.subscribe(ZenQuotesObserver())

Now we are ready to see the results of the code. Here's what we get when executing python rx_example1.py:

A second example

Let's see (in the rx_example2.py file) another way to write the code and obtain a result similar to the first example.

We adapt the get_quotes() function so that it returns an enumeration of the sequence, using Python's built-in enumerate() function, as follows:

def get_quotes():
    import contextlib, io

    zen = io.StringIO()
    with contextlib.redirect_stdout(zen):
        import this

    quotes = zen.getvalue().split('\n')[1:]
    return enumerate(quotes)

We can call that function and store its result in a variable, zen_quotes:

zen_quotes = get_quotes()

We create the Observable using the special Observable.from_() function and chain operations such as filter() on the sequence, and finally use subscribe() to subscribe to the Observable.

That last snippet of code is as follows:

Observable.from_(zen_quotes) \
    .filter(lambda q: len(q[1]) > 0) \
    .subscribe(lambda value: print(f"Received: {value[0]} - {value[1]}"))
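To see why such a chain works, it can help to look at a toy version in which every operator returns a new stream that wraps the previous one. The Stream class below is a hypothetical plain-Python sketch that borrows the method names used in this chapter. It is not the RxPY implementation and supports only an on_next callback.

```python
class Stream:
    """A tiny push-based stream whose operators each return a new Stream."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    @classmethod
    def from_(cls, iterable):
        def subscribe_fn(on_next):
            for item in iterable:
                on_next(item)
        return cls(subscribe_fn)

    def filter(self, predicate):
        def subscribe_fn(on_next):
            # Forward an item downstream only when the predicate accepts it.
            self.subscribe(lambda item: on_next(item) if predicate(item) else None)
        return Stream(subscribe_fn)

    def map(self, transform):
        def subscribe_fn(on_next):
            self.subscribe(lambda item: on_next(transform(item)))
        return Stream(subscribe_fn)

    def subscribe(self, on_next):
        self._subscribe_fn(on_next)


quotes = ["Beautiful is better than ugly.", "", "Simple is better than complex."]
received = []

Stream.from_(enumerate(quotes)) \
    .filter(lambda q: len(q[1]) > 0) \
    .map(lambda q: f"{q[0]} - {q[1]}") \
    .subscribe(received.append)

print(received)
```

Nothing runs until subscribe() is called at the end of the chain. Because the index comes from enumerate() before filtering, the numbers keep their original positions and skip the empty lines.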

Here's what we get when executing python rx_example2.py:

A third example

Let's see a similar example (in the rx_example3.py file) where we react to the Observable (the stream of quotes created using the same get_quotes() function), using a chain of flat_map(), filter(), and map() operations.

The main difference from the previous example is that we schedule the streaming of items so that a new item is emitted every five seconds (the interval), using the Observable.interval() function. Furthermore, we use the flat_map() method to map each emission to an Observable (Observable.from_(zen_quotes) for example) and merge their emissions together into a single Observable.

The main part of the code is as follows:

Observable.interval(5000) \
    .flat_map(lambda seq: Observable.from_(zen_quotes)) \
    .flat_map(lambda q: Observable.from_(q[1].split())) \
    .filter(lambda s: len(s) > 2) \
    .map(lambda s: s.replace('.', '').replace(',', '').replace('!', '').replace('-', '')) \
    .map(lambda s: s.lower()) \
    .subscribe(lambda value: print(f"Received: {value}"))

We also add the following line at the end, using the input() function, to make sure we can stop the execution when the user wants:

input("Starting... Press any key to quit\n")
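To check what the word-level part of that chain produces, here is a synchronous plain-Python equivalent applied to a single quote. It leaves out the interval and RxPY entirely; the clean() helper is a hypothetical name that performs the same four replace() calls.

```python
from itertools import chain

quotes = ["There should be one-- and preferably only one --obvious way to do it."]


def clean(word):
    """Remove the same punctuation characters as the chained replace() calls."""
    for char in ".,!-":
        word = word.replace(char, "")
    return word


# flat_map: each quote becomes several words, merged into one flat sequence
words = chain.from_iterable(q.split() for q in quotes)
# filter: keep tokens longer than two characters (punctuation still attached)
words = filter(lambda s: len(s) > 2, words)
# map: strip punctuation, then lowercase
words = map(clean, words)
words = map(str.lower, words)

result = list(words)
print(result)
```

Because the length check runs before the punctuation is stripped, a token such as "it." passes the filter and ends up as the two-letter word "it".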

Let's execute the example using the python rx_example3.py command. Every five seconds, we see new items in the console (in this case, the words from the quotes whose raw tokens are at least three characters long, stripped of punctuation characters, and converted to lowercase). Here is the output:

Also, another output screenshot is as follows:

Since we do not have a huge number of words in this case, the output comes quickly, but you can quit the program by pressing the Enter key (input() returns only once Enter is pressed) and re-run it to get a better sense of what happens.

A fourth example

Let's build a stream from a list of people, and an Observable based on it.

Here is another trick: To help you handle the data source, we will use a third-party module called Faker (https://pypi.org/project/Faker) to generate the names of people.

You can install the Faker module using the pip install faker command.

In the peoplelist.py file, we have the following code, with a populate() function that leverages a Faker instance to generate the first name and last name of fictitious people:

from faker import Faker

fake = Faker()

def populate():
    persons = []
    for _ in range(0, 20):
        p = {'firstname': fake.first_name(), 'lastname': fake.last_name()}
        persons.append(p)
    return iter(persons)

For the main part of the program, we write the names of the people in the list that was generated, in the text file people.txt:

if __name__ == '__main__':
    new_persons = populate()
    new_data = [f"{p['firstname']} {p['lastname']}" for p in new_persons]
    new_data = ", ".join(new_data) + ", "
    with open('people.txt', 'a') as f:
        f.write(new_data)

Before going further, let's pause and strategize! It seems a good idea to do things in an incremental way, since there are a lot of new concepts and techniques flying around. So we will work on a first implementation for our Observable, and later extend it.

In the rx_peoplelist_1.py file, let's write the first version of our code.

First, we define a function, firstnames_from_db(), which returns an Observable from the text file (reading the content of the file) containing the names, with transformations (as we have already seen) using flat_map(), filter(), and map() methods, and a new operation, group_by(), to emit items from another sequence: each first name found in the file, with its number of occurrences:

from rx import Observable

def firstnames_from_db(file_name):
    file = open(file_name)

    # collect and push stored people firstnames
    return Observable.from_(file) \
        .flat_map(lambda content: content.split(', ')) \
        .filter(lambda name: name != '') \
        .map(lambda name: name.split()[0]) \
        .group_by(lambda firstname: firstname) \
        .flat_map(lambda grp: grp.count().map(lambda ct: (grp.key, ct)))
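The result of this chain is easier to picture with a synchronous equivalent. The sketch below uses collections.Counter from the standard library on a made-up string laid out the way peoplelist.py writes the file. It shows the shape of the emitted (first name, count) pairs, not the RxPY mechanics.

```python
from collections import Counter

# Same layout that peoplelist.py writes: names separated by ", " with a trailing ", "
content = "Ann Lee, Bob Ray, Ann Kim, Cy Fox, Ann Roe, Bob Orr, "

# split + filter: the trailing separator leaves one empty string to discard
full_names = [name for name in content.split(", ") if name != ""]
# map: keep only the first name
first_names = [name.split()[0] for name in full_names]
# group_by + count: one (first name, count) pair per distinct first name
counts = Counter(first_names)

pairs = list(counts.items())
print(pairs)
```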

Then we define an Observable, as in the previous example, which emits data every five seconds, merging its emission with what is returned from firstnames_from_db(db_file), after setting the db_file to the people.txt file, as follows:

db_file = "people.txt"

# Emit data every 5 seconds
Observable.interval(5000) \
    .flat_map(lambda i: firstnames_from_db(db_file)) \
    .subscribe(lambda value: print(str(value)))

input("Starting... Press any key to quit\n")

Now, let's see what happens when we execute both programs (peoplelist.py and rx_peoplelist_1.py).

From one command-line window or terminal, you can generate the people's names by executing python peoplelist.py. The people.txt file is created with the names in it, separated by a comma. Each time you run that command again, a new set of names is added to the file.

From a second command-line window, you can run the first version of the program that implements Observable via the python rx_peoplelist_1.py command. You will get an output similar to this:

By re-executing the first command several times and monitoring what happens in the second window, we can see that the people.txt file is continuously read to extract the names, get the first names from those full names, and make the transformation needed to push the items consisting of the first names, each with their number of occurrences.

To improve what we just achieved, we will try to emit only the first names that are present at least four times. In the rx_peoplelist_2.py file, we need another function that returns an Observable filtered that way. Let's call that function frequent_firstnames_from_db(). Compared to the one we used in the first version, we have to use the filter() operator to keep only the first name groups for which the count of occurrences (ct) is greater than three. If you check the code again, for each group we get a tuple containing the group's key as the first element and the count as the second element, using the lambda function lambda grp: grp.count().map(lambda ct: (grp.key, ct)), which is emitted thanks to the .flat_map() operator. So the next thing to do in the function is to filter further using .filter(lambda name_and_ct: name_and_ct[1] > 3), in order to get only the first names that currently appear at least four times.

Here is the code for this new function:

def frequent_firstnames_from_db(file_name):
    file = open(file_name)

    # collect and push only the frequent firstnames
    return Observable.from_(file) \
        .flat_map(lambda content: content.split(', ')) \
        .filter(lambda name: name != '') \
        .map(lambda name: name.split()[0]) \
        .group_by(lambda firstname: firstname) \
        .flat_map(lambda grp: grp.count().map(lambda ct: (grp.key, ct))) \
        .filter(lambda name_and_ct: name_and_ct[1] > 3)

And we add almost the same code for the interval Observable; we only change the name of the referenced function accordingly. The new code for that last bit (as can be seen in the rx_peoplelist_2.py file) is as follows:

# Emit data every 5 seconds
Observable.interval(5000) \
    .flat_map(lambda i: frequent_firstnames_from_db(db_file)) \
    .subscribe(lambda value: print(str(value)))

# Keep alive until user presses any key
input("Starting... Press any key to quit\n")

Using the same protocol to execute the example, when you run the python rx_peoplelist_2.py command in the second window or terminal, you should get an output similar to this:

We can see the pairs of first names and counts emitted; there is a new emission every five seconds, and the values change whenever we re-run the peoplelist program (using python peoplelist.py) from the first shell window. Nice result!

Finally, in the rx_peoplelist_3.py file, we reuse most of the code to show just a variant of the previous example, with a small change: the use of the distinct() operation applied to the interval Observable, just before the Observer's subscription to it. The new version of that code snippet is as follows:

# Emit data every 5 seconds, but only if it changed
Observable.interval(5000) \
    .flat_map(lambda i: frequent_firstnames_from_db(db_file)) \
    .distinct() \
    .subscribe(lambda value: print(str(value)))

Similar to what we did before, when executing python rx_peoplelist_3.py you should get an output similar to this:

Do you notice the difference?

Again, we can see the emission of pairs of first name and count. But this time, there is less change happening, and depending on the case, you may even have the impression that the data does not change for more than 10 seconds. To see more change, you have to re-run the peoplelist.py program, maybe several times, so that the counts of a few frequent first names increase. The explanation for that behavior is that, since we added the .distinct() operation to the interval Observable, an item is emitted only if an identical item has not been emitted before. That's why we have less data being emitted, and why, for each first name, we do not see the same count twice.
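The effect of distinct() can be reproduced with a short generator in plain Python. The sketch below is an illustration under stated assumptions: the tick_1, tick_2, and tick_3 lists are made-up emissions standing in for three consecutive reads of people.txt, and distinct() here is a hypothetical helper, not the RxPY operator itself.

```python
def distinct(items):
    """Yield each item the first time it is seen and skip every later repeat."""
    seen = set()
    for item in items:
        if item not in seen:
            seen.add(item)
            yield item


# Hypothetical emissions over three ticks of the interval
tick_1 = [("Ann", 4), ("Bob", 5)]
tick_2 = [("Ann", 4), ("Bob", 5)]  # nothing new in people.txt
tick_3 = [("Ann", 6), ("Bob", 5)]  # peoplelist.py was run again

emitted = list(distinct(tick_1 + tick_2 + tick_3))
print(emitted)
```

The second tick produces no output at all, and the third produces only the pair whose count is new.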

With this series of examples, we discovered how Observables offer a clever way to do things that are difficult to do using traditional patterns, and that is very nice! We have only scratched the surface, though, and the interested reader can pursue the exploration of ReactiveX and reactive programming further.

Summary

In this chapter, we introduced the Observer pattern in reactive programming.

The core idea of this type of Observer pattern is to react to a stream of data and events, as with the stream of water we see in nature. The computing world offers many examples of this idea, including the ReactiveX technique and its extensions for various programming languages (RxJava, RxJS, RxPY, and so on). Modern JavaScript frameworks such as Angular are other examples we mentioned.

We have discussed examples of RxPY that can be used to build functionality and that serve as an introduction to this programming paradigm. Readers can continue their own research via the official RxPY documentation and examples, as well as existing code on GitHub.

In the next chapter, we will cover the Microservices pattern and other patterns for the Cloud.
