본문 바로가기
Practical Python Design Patterns

PUBLISH–SUBSCRIBE PATTERN

by 자동매매 2023. 3. 29.

CHAPTER PUBLISH–SUBSCRIBE PATTERN

def unregister(self, callback):

self.callbacks.discard(callback)

def unregister_all(self):

self.callbacks = set()

def poll_for_change(self): if self.changed: self.update_all

def update_all(self):

for callback in self.callbacks:

callback(self)

Although the observer pattern allows us to decouple the objects that are observed from knowing anything about the objects observing them, the observer objects still need to know which objects they need to observe. So, we still have more coupling than we saw in the previous chapter, where we were able to change the view code without needing

to change the controller or model code, and the same goes for the model code. Even the controller, which sits between the models and the views, is fairly decoupled from both. When we changed the view, we had no need to also update the controller, and vice versa.

The observer pattern we dealt with previously works well when a number of potential observable classes that an observer needs to register with is limited. As with many things in programming, we are often faced with challenges that do not comply with the ideal case, and so we need to adapt.

What we are going in search of in this chapter is a way to decouple the observer from the observable even more. We want neither the observer nor the observable to know anything about each other. Each class and its instances should be able to change without any changes being needed on the other side of the equation. New observers and observables should not require alterations in the code of the other classes in the system, as this would cause our code to be less flexible. In an attempt to reach this ideal, we will be looking at ways to extend the one-to-many methodologies of the observer pattern to

a many-to-many relationship between observables and observers, such that one class of objects does not need to know much about the other.

These blind observables will be called publishers, and the disconnected observers will be named subscribers.

316
CHAPTER PUBLISH–SUBSCRIBE PATTERN

The first step might seem trivial, but you will soon realize how simple it is to change the way you think about things by just changing what you call them.

class Subscriber(object):

def update(self, observed):

print("Observing: {}".format(observed))

class Publisher(object):

def __init__(self):

self.callbacks = set() self.changed = False

def register(self, callback):

self.callbacks.add(callback)

def unregister(self, callback):

self.callbacks.discard(callback)

def unregister_all(self):

self.callbacks = set()

def poll_for_change(self): if self.changed: self.update_all

def update_all(self):

for callback in self.callbacks:

callback(self)

Like I said, this makes no difference in what the program can actually do, but what it does do is help us see things differently. We could now take another step forward and rename the methods in both these classes to reflect the publisher subscriber model more closely.

317
CHAPTER PUBLISH–SUBSCRIBE PATTERN

class Subscriber(object):

def process(self, observed):

print("Observing: {}".format(observed))

class Publisher(object):

def __init__(self):

self.subscribers = set()

def subscribe(self, subscriber):

self.subscribers.add(subscriber)

def unsubscribe(self, subscriber):

self.subscribers.discard(subscriber)

def unsubscribe_all(self):

self.subscribers = set()

def publish(self):

for subscriber in self.subscribers: subscriber.process(self)

Now it is clear that we are dealing with two classes, one that deals with publishing and another focused completely on processing something. In the code snippet, it is not yet clear what should be published and what should be processed. We proceed in our attempt to clear up any uncertainty in our model definition by adding a Message class that could be published and processed.

class Message(object):

def __init__(self):

self.payload = None

class Subscriber(object):

def process(self, message):

print("Message: {}".format(message.payload))

318
CHAPTER PUBLISH–SUBSCRIBE PATTERN

class Publisher(object):

def __init__(self):

self.subscribers = set()

def subscribe(self, subscriber):

self.subscribers.add(subscriber)

def unsubscribe(self, subscriber):

self.subscribers.discard(subscriber)

def unsubscribe_all(self):

self.subscribers = set()

def publish(self, message):

for subscriber in self.subscribers: subscriber.process(message)

Next, we remove the final link between the publisher and the subscriber by adding a single dispatcher class through which all messages will pass. The aim is to have a single location to which publishers send messages. The same location keeps an index of all the subscribers. The result is that the number of publishers and subscribers can vary without having any impact on the rest of the system. This gives us a very clean and decoupled architecture to work with.

class Message(object):

def __init__(self):

self.payload = None

class Subscriber(object):

def __init__(self, dispatcher):

dispatcher.subscribe(self)

def process(self, message):

print("Message: {}".format(message.payload))

class Publisher(object):

def __init__(self, dispatcher):

self.dispatcher = dispatcher

319
CHAPTER PUBLISH–SUBSCRIBE PATTERN

def publish(self, message):

self.dispatcher.send(message)

class Dispatcher(object):

def __init__(self):

self.subscribers = set()

def subscribe(self, subscriber):

self.subscribers.add(subscriber)

def unsubscribe(self, subscriber):

self.subscribers.discard(subscriber)

def unsubscribe_all(self):

self.subscribers = set()

def send(self, message):

for subscriber in self.subscribers: subscriber.process(message)

Not all subscribers are interested in all the messages from all publishers. Since the point of this whole exercise was to decouple subscribers from publishers, we do not want to couple them together in any way. The solution we are looking for should allow the dispatcher to send categories of messages to specific subscribers. We are going to add a topic to the messages. Publishers will now add a topic to the messages they publish, and subscribers can then subscribe to specific topics.

class Message(object):

def __init__(self):

self.payload = None self.topic = "all"

class Subscriber(object):

def __init__(self, dispatcher, topic):

dispatcher.subscribe(self, topic)

def process(self, message):

print("Message: {}".format(message.payload))

320
CHAPTER 21 PUBLISH–SUBSCRIBE PATTERN

class Publisher(object):

def __init__(self, dispatcher):

self.dispatcher = dispatcher

def publish(self, message):

self.dispatcher.send(message)

class Dispatcher(object):

def __init__(self):

self.topic_subscribers = dict()

def subscribe(self, subscriber, topic):

self.topic_subscribers.setdefault(topic, set()).add(subscriber)

def unsubscribe(self, subscriber, topic):

self.topic_subscribers.setdefault(topic, set()).discard(subscriber)

def unsubscribe_all(self, topic):

self.subscribers = self.topic_subscribers[topic] = set()

def send(self, message):

for subscriber in self.topic_subscribers[message.topic]:

subscriber.process(message)

def main():

dispatcher = Dispatcher()

publisher_1 = Publisher(dispatcher)

subscriber_1 = Subscriber(dispatcher, ’topic1’)

message = Message() message.payload = "My Payload" message.topic = ’topic1’

publisher_1.publish(message)

if __name__ == "__main__":

main()

Now we see the code resulting in the message (shown below) printed by the subscriber. Message: My Payload

321
CHAPTER PUBLISH–SUBSCRIBE PATTERN

Distributed Message Sender

Sending messages from many machines. Now that we have arrived at a clean implementation of the PubSub pattern, let s use the ideas we have developed so far to build our own simple message sender. For this, we are going to use the socket package from the Python standard library. We use XML-RPC to handle the remote connections; it lets us make calls to remote procedures as if they were part of the local system. In order to make our dispatcher work with messages, we need to send dictionaries as parameters rather than as Python objects, which we do. The code below illustrates how we do that.

dispatcher.py

from xmlrpc.client import ServerProxy

from xmlrpc.server import SimpleXMLRPCServer

class Dispatcher(SimpleXMLRPCServer):

def __init__(self):

self.topic_subscribers = dict()

super(Dispatcher, self).__init__(("localhost", 9000)) print("Listening on port 9000...")

self.register_function(self.subscribe, "subscribe") self.register_function(self.unsubscribe, "unsubscribe") self.register_function(self.unsubscribe_all, "unsubscribe_all")

self.register_function(self.send, "send")

def subscribe(self, subscriber, topic):

print(’Subscribing {} to {}’.format(subscriber, topic)) self.topic_subscribers.setdefault(topic, set()).add(subscriber) return "OK"

def unsubscribe(self, subscriber, topic):

print(’Unsubscribing {} from {}’.format(subscriber, topic)) self.topic_subscribers.setdefault(topic, set()).discard(subscriber) return "OK"

def unsubscribe_all(self, topic):

print(’unsubscribing all from {}’.format(topic))

322
CHAPTER PUBLISH–SUBSCRIBE PATTERN

self.subscribers = self.topic_subscribers[topic] = set() return "OK"

def send(self, message):

print("Sending Message:\nTopic: {}\nPayload: {}". format(message["topic"], message["payload"]))

for subscriber in self.topic_subscribers [message.get("topic", "all")]:

with ServerProxy(subscriber) as subscriber_proxy:

subscriber_proxy.process(message)

return "OK"

def main():

dispatch_server = Dispatcher() dispatch_server.serve_forever()

if __name__ == "__main__":

main()

publisher.py

from xmlrpc.client import ServerProxy

class Publisher(object):

def __init__(self, dispatcher):

self.dispatcher = dispatcher

def publish(self, message):

with ServerProxy(self.dispatcher) as dispatch:

dispatch.send(message)

def main():

message = {"topic": "MessageTopic", "payload": "This is an awesome payload"}

publisher = Publisher("http://localhost:9000") publisher.publish(message)

if __name__ == "__main__":

main()

323
CHAPTER PUBLISH–SUBSCRIBE PATTERN

subscriber.py

from xmlrpc.client import ServerProxy

from xmlrpc.server import SimpleXMLRPCServer

class Subscriber(SimpleXMLRPCServer):

def __init__(self, dispatcher, topic):

super(Subscriber, self).__init__(("localhost", 9001)) print("Listening on port 9001...") self.register_function(self.process, "process")

self.subscribe(dispatcher, topic)

def subscribe(self, dispatcher, topic):

with ServerProxy(dispatcher) as dispatch:

dispatch.subscribe("http://localhost:9001", topic)

def process(self, message):

print("Message: {}".format(message.get("payload", "Default message")))

return "OK"

def main():

subscriber_server = Subscriber("http://localhost:9000", "MessageTopic") subscriber_server.serve_forever()

if __name__ == "__main__":

main()

To run the program and see its true effects, you have to open three terminal windows and activate the virtual environment for your code in each of the windows. We will run each part of the application in its own window to simulate the interaction between independent systems. Each command that follows assumes that the virtualenv is already activated.

window 1: dispatcher

$ python dispatcher.py

window 2: subscriber $ python subscriber.py

324
CHAPTER PUBLISH–SUBSCRIBE PATTERN

window 3: publisher $ python publisher.py

With each of the windows running its process, let s enter a message into the publisher window and see what happens.

We see the dispatcher received the message and sent it to the subscriber.

Listening on port 9000...

Subscribing http://localhost:9001 to MessageTopic

127.0.0.1 - - [16/Aug/2017 20:59:06] "POST /RPC2 HTTP/1.1" 200 - Sending Message:

Topic: MessageTopic

Payload: This is an awesome payload

127.0.0.1 - - [16/Aug/2017 20:59:09] "POST /RPC2 HTTP/1.1" 200 -

The subscriber receives the message and prints the message.

Listening on port 9001...

Message: This is an awesome payload

127.0.0.1 - - [16/Aug/2017 20:59:09] "POST /RPC2 HTTP/1.1" 200 -

Parting Shots

The PubSub pattern has many uses, especially as systems expand and grow into the cloud. This process of growing, expanding, and evolving is one of the reasons I chose to start with the simple implementation of the observer pattern we saw in Chapter

14 and then move through the steps of improved design to the final full PubSub implementation. This is the same pattern of growth that you will repeat in your own projects. A simple project grows a little bit, and with every bit of functionality that you add, you should be asking yourself what ideas from this book, or elsewhere, will make this project easier to maintain in the future. Once you hit upon such an idea, do not hesitate improve the code. You will quickly come to realize that a bit of time spent at the tail end of a project will make your development time significantly shorter in future phases of the project.

325
CHAPTER PUBLISH–SUBSCRIBE PATTERN

As Jocko Willink (Navy SEAL Commander - Task Unit Bruser) likes to say, discipline equals freedom. The discipline you apply to your coding practice will lead to freedom from the pain of maintaining cumbersome systems. You will also avoid the inevitable dump and rewrite that most software products head for from the moment the first line of code is written.

Exercises

See if you can find information about proto buffers and how they

could be used to send data to remote procedure calls.

Read up on queueing software like ZeroMQ and RabbitMQ, together

with packages like Celery. Implement the simple chat application using one of these instead of the Python implementation from this chapter.

Write a proposal for how you would use the publish subscribe

pattern to enable the addition of gamification to an existing Software as a service - SaaS application.

326

'Practical Python Design Patterns' 카테고리의 다른 글

Design Pattern Quick Reference  (0) 2023.03.29
Model-View-Controller Pattern  (0) 2023.03.29
Visitor Pattern  (0) 2023.03.29
Template Method Pattern  (0) 2023.03.29
Strategy Pattern  (0) 2023.03.29

댓글