Pub/Sub pattern

Publish/Subscribe is another classic pattern in which senders of messages, called publishers, do not address their messages to specific receivers, called subscribers. Messages are published without any knowledge of which subscribers exist, or whether any exist at all.

Scenario #2 is the better-known, general pattern, in which multiple subscribers subscribe to messages/topics published by a single publisher. Scenario #1 is the more interesting one. Just as a ZMQ.REQ socket can connect to multiple ZMQ.REP sockets, a ZMQ.SUB socket can connect to multiple ZMQ.PUB sockets (publishers). No single publisher overwhelms the subscriber; the messages from both publishers are interleaved.


Publishers are created with the ZMQ.PUB socket type.

import zmq
import random
import sys
import time

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

Data is published along with a topic. Subscribers usually set a filter on these topics to select the ones they are interested in.

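while True:
    # Pick a random topic (zip code) and a random reading to publish.
    topic = random.randrange(9999, 10005)
    messagedata = random.randrange(1, 215) - 80
    print("%d %d" % (topic, messagedata))
    socket.send_string("%d %d" % (topic, messagedata))
    # Pause between messages (the one-second interval is an assumption).
    time.sleep(1)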

Subscribers are created with the ZMQ.SUB socket type. Note that a single subscriber can connect to many publishers.

import sys
import zmq

port = "5556"
if len(sys.argv) > 1:
    port =  sys.argv[1]
if len(sys.argv) > 2:
    port1 =  sys.argv[2]

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server...")
socket.connect("tcp://localhost:%s" % port)

# Connect to a second publisher if a second port was given.
if len(sys.argv) > 2:
    socket.connect("tcp://localhost:%s" % port1)

A subscriber selects the messages it wants by topic. The filter is a prefix match on the message and is set with the zmq.SUBSCRIBE socket option. A SUB socket with no subscription receives no messages.

# Subscribe to zipcode, default is NYC, 10001
topicfilter = "10001"
# The option value must be bytes, so encode the filter string.
socket.setsockopt(zmq.SUBSCRIBE, topicfilter.encode("ascii"))
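
Because the filter is a prefix match on the message bytes, a filter of "10001" also lets through a message whose topic is "100010". The sketch below models that rule in plain Python. The helper name matches_subscription and the sample messages are made up for illustration and are not part of pyzmq, which performs the matching internally.

```python
def matches_subscription(prefix, message):
    """Model of the SUB filter rule: a message passes if it starts with the prefix.

    Hypothetical helper for illustration only; it is not a pyzmq function.
    """
    return message.startswith(prefix)


# Sample messages in the "<topic> <value>" layout used by the publisher.
messages = [b"10001 42", b"10002 -7", b"100010 13", b"9999 80"]

# The filter used by the subscriber in this section.
passed = [m for m in messages if matches_subscription(b"10001", m)]
print(passed)

# An empty prefix subscribes to every message.
everything = [m for m in messages if matches_subscription(b"", m)]
print(len(everything))
```

If the accidental match on "100010" matters, one option is to include the separator in the filter, for example subscribing to "10001 " with a trailing space.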

# Process 5 updates
total_value = 0
update_count = 5
for update_nbr in range(update_count):
    string = socket.recv_string()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print("%s %s" % (topic, messagedata))

# Divide by the number of updates received, not by the last loop index.
print("Average messagedata value for topic '%s' was %dF" % (topicfilter, total_value / update_count))

Pub/Sub communication is asynchronous. If a publisher is already running when a subscriber starts, the subscriber will not receive the messages that were published before it connected. Publishers and subscribers can be started independently of each other, in any order.
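
The effect of a late-starting subscriber can be reproduced in a single process. The following is a minimal sketch, assuming Python 3 and a recent pyzmq; the message texts "early" and "late" and the use of a randomly chosen port are choices made for this illustration only.

```python
import zmq

context = zmq.Context()

publisher = context.socket(zmq.PUB)
# Let pyzmq pick a free port so the sketch does not clash with port 5556.
port = publisher.bind_to_random_port("tcp://127.0.0.1")

# Published before any subscriber exists, so this message is dropped.
publisher.send_string("10001 early")

subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "10001")
subscriber.connect("tcp://127.0.0.1:%d" % port)

# Keep publishing until the connection and subscription are established.
received = None
for attempt in range(100):
    publisher.send_string("10001 late")
    if subscriber.poll(timeout=50):  # wait up to 50 ms for a message
        received = subscriber.recv_string()
        break

print(received)

subscriber.close(linger=0)
publisher.close(linger=0)
context.term()
```

The subscriber only ever sees a "late" message. The retry loop is there because the connection is set up in the background, so the first few messages sent after connect() may also be missed.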

A subscriber can in fact connect to more than one publisher, using one ‘connect’ call each time. Data will then arrive and be interleaved so that no single publisher drowns out the others:

python 5556
python 5546
python 5556 5546
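
The same two-publisher, one-subscriber arrangement can be sketched inside one process, which is convenient for experimenting. This is an illustration under assumptions, not the scripts from this section: it assumes Python 3, uses randomly chosen ports as stand-ins for 5556 and 5546, and the message texts are invented.

```python
import zmq

context = zmq.Context()

# Two publishers, each bound to its own randomly chosen port.
publishers = []
ports = []
for _ in range(2):
    pub = context.socket(zmq.PUB)
    ports.append(pub.bind_to_random_port("tcp://127.0.0.1"))
    publishers.append(pub)

# One SUB socket, with one connect() call per publisher.
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "10001")
for port in ports:
    subscriber.connect("tcp://127.0.0.1:%d" % port)

# Publish from both until a message from each publisher has been seen.
seen = set()
for attempt in range(200):
    for index, pub in enumerate(publishers):
        pub.send_string("10001 from-publisher-%d" % index)
    # Drain whatever has arrived; stop when nothing comes in for 20 ms.
    while subscriber.poll(timeout=20):
        seen.add(subscriber.recv_string())
    if len(seen) == 2:
        break

print(sorted(seen))

subscriber.close(linger=0)
for pub in publishers:
    pub.close(linger=0)
context.term()
```

Messages from both publishers arrive on the same SUB socket, and the receiving code does not need to know which connection a message came from.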

Other things to note:

  • If a publisher has no connected subscribers, it simply drops all messages.
  • If you’re using TCP and a subscriber is slow, messages will queue up on the publisher.
  • In ØMQ 2.x, filtering happens on the subscriber side. From ØMQ 3.x onward, when using a connected protocol (tcp or ipc), filtering happens on the publisher side.
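
The first two notes can be illustrated with a short sketch, assuming Python 3 and a recent pyzmq. A PUB socket never blocks on send: with no subscriber connected the messages are discarded, and for a slow subscriber the publisher-side queue is bounded by the send high-water mark (zmq.SNDHWM), beyond which further messages for that subscriber are dropped. The limit of 1000 and the message count used here are arbitrary values for the example.

```python
import zmq

context = zmq.Context()
publisher = context.socket(zmq.PUB)

# Cap the outgoing queue kept per subscriber; set it before bind() so it applies.
publisher.setsockopt(zmq.SNDHWM, 1000)
publisher.bind_to_random_port("tcp://127.0.0.1")

# No subscriber is connected, so each send returns immediately and the
# message is discarded rather than queued.
sent = 0
for i in range(10000):
    publisher.send_string("10001 %d" % i)
    sent += 1

print(sent)

publisher.close(linger=0)
context.term()
```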