Forwarder¶
Forwarder device
Just like QUEUE, which is like the request-reply broker, FORWARDER is like the pub-sub proxy server. It allows both publishers and subscribers to be moving parts and it self becomes the stable hub for interconnecting them.
FORWARDER collects messages from a set of publishers and forwards these to a set of subscribers.
You will notice that two zmq sockets, pub and sub are bound to well known ports. The frontend speaks to publishers and the backend speaks to subscribers. You should use ZMQ_FORWARDER with a ZMQ_SUB socket for the frontend and a ZMQ_PUB socket for the backend.
Another important thing to notice is that we want all the published messages to reach to the various subscribers, hence message filtering should be off in the forwarder device. See line no 11.
forwarder_device.py
import zmq
def main():
try:
context = zmq.Context(1)
# Socket facing clients
frontend = context.socket(zmq.SUB)
frontend.bind("tcp://*:5559")
frontend.setsockopt(zmq.SUBSCRIBE, "")
# Socket facing services
backend = context.socket(zmq.PUB)
backend.bind("tcp://*:5560")
zmq.device(zmq.FORWARDER, frontend, backend)
except Exception, e:
print e
print "bringing down zmq device"
finally:
pass
frontend.close()
backend.close()
context.term()
if __name__ == "__main__":
main()
Only thing that changes here is that publisher connects to the intermediary and is not bound to any well known port.
forwarder_server.py
import zmq
import random
import sys
import time
port = "5559"
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:%s" % port)
publisher_id = random.randrange(0,9999)
while True:
topic = random.randrange(1,10)
messagedata = "server#%s" % publisher_id
print "%s %s" % (topic, messagedata)
socket.send("%d %s" % (topic, messagedata))
time.sleep(1)
The subscribers are completely unaffected by introduction of intermediary - “forwarder device” and gains the ability to get messages from different publishers at no cost.
forwarder_subscriber.py
import sys
import zmq
port = "5560"
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print "Collecting updates from server..."
socket.connect ("tcp://localhost:%s" % port)
topicfilter = "9"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)
for update_nbr in range(10):
string = socket.recv()
topic, messagedata = string.split()
print topic, messagedata
Executing these programs from separate shell:
python forwarder_device.py
python forwarder_subscriber.py
python forwarder_server.py
python forwarder_server.py
Output on the subscriber:
Collecting updates from server...
9 server#3581
9 server#9578
9 server#3581
9 server#9578
9 server#3581
9 server#9578
9 server#3581
9 server#3581
9 server#9578
9 server#3581