Monitor Queue


MonitoredQueue lets you create a queue device whose incoming and outgoing messages are also published on a third socket.

import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from zmq.devices.monitoredqueuedevice import MonitoredQueue
from zmq.utils.strtypes import asbytes
from multiprocessing import Process
import random

frontend_port = 5559
backend_port = 5560
monitor_port = 5562
number_of_workers = 2

Like a typical ØMQ device, MonitoredQueue takes socket types for its in and out sides (types, not socket instances). It also takes a third socket type, which can conveniently be zmq.PUB. Every message that passes through the in or out socket is then published on this third socket, tagged with a prefix, for monitoring purposes.
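
As a rough mental model of what the device does with each message, the pure-Python sketch below forwards the frames unchanged and builds a prefixed copy for the monitor socket. It does not use pyzmq and is not how the library implements the device; `relay` and its parameters are hypothetical names chosen for illustration, and it is written for Python 3.

```python
def relay(frames, prefix):
    """Return (forwarded, monitored) for one multipart message.

    forwarded: the frames as they would be passed on to the opposite socket.
    monitored: the same frames with `prefix` prepended, as they would be
               published on the monitor socket.
    """
    forwarded = list(frames)
    monitored = [prefix] + forwarded
    return forwarded, monitored


# A request arriving on the "in" side: identity frame, empty delimiter, payload.
request = [b"client-identity", b"", b"Request #1 from client#0"]
forwarded, monitored = relay(request, b"in")
print(forwarded)
print(monitored)
```

The monitored copy has the same shape as the lists printed by the monitoring client in this example: prefix, identity, empty delimiter, payload.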

You should also read the PyZMQ documentation page "pyzmq and unicode". As it explains, PyZMQ is a wrapper for a C library, so you should pass bytes rather than text strings, which in Python 3 are unicode. We will correct some of our examples for this later on.
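
To make the bytes-versus-text point concrete, here is a minimal sketch, assuming Python 3, of encoding a message before handing it to a socket. `to_bytes` is a hypothetical helper written for this illustration, not a pyzmq function.

```python
def to_bytes(text, encoding="utf-8"):
    """Encode text to bytes; pass values that are already bytes through unchanged."""
    if isinstance(text, bytes):
        return text
    return text.encode(encoding)


# Build the request as text, then encode it once, just before sending.
request = to_bytes("Request #%s from client#%s" % (1, 0))
print(request)  # b'Request #1 from client#0'
```

The encoded value is what you would pass to `socket.send()`; the receiving side gets bytes back and decodes them if it needs text.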

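def monitordevice():
    # Prefixes prepended to each message copy published on the monitor socket.
    in_prefix = asbytes('in')
    out_prefix = asbytes('out')
    monitoringdevice = MonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, in_prefix, out_prefix)
    # Assumes everything runs on the local machine (127.0.0.1).
    monitoringdevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
    monitoringdevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
    monitoringdevice.bind_mon("tcp://127.0.0.1:%d" % monitor_port)
    monitoringdevice.setsockopt_in(zmq.HWM, 1)
    monitoringdevice.setsockopt_out(zmq.HWM, 1)
    # start() runs the device in this process and blocks while it is running.
    monitoringdevice.start()
    print "Program: Monitoring device has started"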

This is a simple server that receives a request and sends a reply.

def server(backend_port):
    print "Program: Server connecting to device"
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    # Assumes the device runs on the local machine (127.0.0.1).
    socket.connect("tcp://127.0.0.1:%s" % backend_port)
    server_id = random.randrange(1,10005)
    while True:
        message = socket.recv()
        print "Server: Received - %s" % message  
        socket.send("Response from server #%s" % server_id)

This is a simple client that sends a request, receives and prints the reply.

def client(frontend_port, client_id):
    print "Program: Worker #%s connecting to device" % client_id
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    # Assumes the device runs on the local machine (127.0.0.1).
    socket.connect("tcp://127.0.0.1:%s" % frontend_port)
    request_num = 1
    socket.send("Request #%s from client#%s" % (request_num, client_id))
    #  Get the reply.
    message = socket.recv_multipart()
    print "Client: Received - %s" % message

This is a monitoring client that connects to the device's publisher socket, subscribes to everything and prints the monitoring information it receives.

def monitor():
    print "Starting monitoring process"
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    print "Collecting updates from server..."
    # Assumes the device runs on the local machine (127.0.0.1).
    socket.connect("tcp://127.0.0.1:%s" % monitor_port)
    socket.setsockopt(zmq.SUBSCRIBE, "")
    while True:
        string = socket.recv_multipart()
        print "Monitoring Client: %s" % string
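
The empty subscription in `monitor()` matches every message. A SUB socket filters by prefix match on the first frame, so subscribing to `in` instead would deliver only the inbound copies. The sketch below models that matching rule in plain Python 3 without pyzmq; `matches` is a hypothetical helper, not a library call.

```python
def matches(subscription, first_frame):
    """Return True if a subscription prefix accepts a message with this first frame."""
    return first_frame.startswith(subscription)


frames_in = [b"in", b"identity", b"", b"Request #1 from client#0"]
frames_out = [b"out", b"identity", b"", b"Response from server #4431"]

print(matches(b"", frames_in[0]))     # True: the empty prefix matches everything
print(matches(b"in", frames_in[0]))   # True
print(matches(b"in", frames_out[0]))  # False
```

Because the match is on a prefix, a subscription of `in` would also accept any other first frame that merely starts with those bytes, so prefixes are best chosen so that none is a prefix of another.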

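Here we start the device, the server, the monitoring client and the clients as separate processes. The device, server and monitoring client loop forever, so they run until their processes are terminated.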

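monitoring_p = Process(target=monitordevice)
monitoring_p.start()
server_p = Process(target=server, args=(backend_port,))
server_p.start()
monitorclient_p = Process(target=monitor)
monitorclient_p.start()
# Give the device, server and monitor time to start before sending requests.
time.sleep(2)

for client_id in range(number_of_workers):
    Process(target=client, args=(frontend_port, client_id,)).start()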

Running the program prints output like the following. The socket identities and the server number differ from run to run, and lines from different processes may interleave differently.



Program: Server connecting to device
Starting monitoring process
Collecting updates from server...
Program: Worker #0 connecting to device
Program: Worker #1 connecting to device
Server: Received - Request #1 from client#0
Monitoring Client: ['in', '\x00\xcb\xc5J9<$E9\xac\xf6\r:\x82\x92EU', '', 'Request #1 from client#0']
Monitoring Client: ['out', '\x00\xcb\xc5J9<$E9\xac\xf6\r:\x82\x92EU', '', 'Response from server #4431']
Client: Received - ['Response from server #4431']
Server: Received - Request #1 from client#1
Monitoring Client: ['in', "\x00\r'C\x0f\xf6TO\x84\xbe\xe3\x85\xf6(\x07<\xab", '', 'Request #1 from client#1']
Client: Received - ['Response from server #4431']
Monitoring Client: ['out', "\x00\r'C\x0f\xf6TO\x84\xbe\xe3\x85\xf6(\x07<\xab", '', 'Response from server #4431']
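
Each monitored message in the output above has the same layout: the prefix, the client's identity frame, an empty delimiter frame, then the payload. The following sketch, assuming Python 3 and bytes frames, splits such a list into named fields. `MonitorEvent` and `parse_monitor_frames` are hypothetical names introduced here, and the layout check reflects only what this example's output shows.

```python
from collections import namedtuple

MonitorEvent = namedtuple("MonitorEvent", "direction identity payload")


def parse_monitor_frames(frames):
    """Split [prefix, identity, empty delimiter, payload...] into named fields."""
    if len(frames) < 4 or frames[2] != b"":
        raise ValueError("unexpected frame layout: %r" % (frames,))
    direction, identity = frames[0], frames[1]
    payload = frames[3:]
    return MonitorEvent(direction.decode("ascii"), identity, payload)


# First monitored message from the sample output, written as bytes.
sample = [
    b"in",
    b"\x00\xcb\xc5J9<$E9\xac\xf6\r:\x82\x92EU",
    b"",
    b"Request #1 from client#0",
]
event = parse_monitor_frames(sample)
print(event.direction, event.payload)
```

Matching the `identity` field of an `in` event against a later `out` event is one way to pair a request with its response when reading the monitor stream.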