Queue
Here we use ProcessDevice to create a QUEUE device that connects clients and servers.
queuedevice.py
import time
import zmq
from zmq.devices.basedevice import ProcessDevice
from multiprocessing import Process
import random
frontend_port = 5559
backend_port = 5560
number_of_workers = 2
As noted earlier, ProcessDevice takes socket types, not socket instances. We also respect the constraint of the request/reply pattern by setting the high water mark to 1 on both sockets.
# Pass socket types; the device creates the sockets in its own process.
queuedevice = ProcessDevice(zmq.QUEUE, zmq.XREP, zmq.XREQ)
queuedevice.bind_in("tcp://127.0.0.1:%d" % frontend_port)
queuedevice.bind_out("tcp://127.0.0.1:%d" % backend_port)
queuedevice.setsockopt_in(zmq.HWM, 1)
queuedevice.setsockopt_out(zmq.HWM, 1)
queuedevice.start()
# Give the device time to start before any peer connects.
time.sleep(2)
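To make the device's role concrete, here is a minimal sketch in plain Python (no ZeroMQ involved) of the routing idea behind a QUEUE device: the frontend remembers which client sent a request, the backend hands the request to the next worker in turn, and the reply goes back to the originating client. The names `QueueDeviceModel`, `handle` and `make_worker` are hypothetical and exist only for this illustration. The real device does this with message envelopes over sockets, not function calls.

```python
from collections import deque


class QueueDeviceModel:
    """Toy model of a QUEUE device (illustrative only, not pyzmq API)."""

    def __init__(self, workers):
        # Workers are plain callables here; a real device talks to REP sockets.
        self.workers = deque(workers)

    def handle(self, client_id, request):
        # Frontend: keep the sender's identity alongside the request.
        envelope = (client_id, request)
        # Backend: pick the next worker in round-robin order.
        worker = self.workers[0]
        self.workers.rotate(-1)
        reply = worker(envelope[1])
        # Route the reply back to the client recorded in the envelope.
        return envelope[0], reply


def make_worker(server_id):
    # Mirrors the tutorial's server: reply with the server's id.
    return lambda request: "Response from %s" % server_id


device = QueueDeviceModel([make_worker(1), make_worker(2)])
results = [device.handle(cid, "Request from client: %s" % cid) for cid in (0, 1, 0)]
print(results)
```

With two workers, consecutive requests alternate between them, and every reply is paired with the client that asked for it.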
The server waits for a request and replies to it.
def server(backend_port):
    print "Connecting a server to queue device"
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://127.0.0.1:%s" % backend_port)
    server_id = random.randrange(1,10005)
    while True:
        message = socket.recv()
        print "Received request: ", message
        socket.send("Response from %s" % server_id)
The client makes a request and waits for a reply.
def client(frontend_port, client_id):
    print "Connecting a worker #%s to queue device" % client_id
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://127.0.0.1:%s" % frontend_port)
    # Do 4 requests, waiting each time for a response
    for request in range(1, 5):
        print "Sending request #%s" % request
        socket.send("Request fron client: %s" % client_id)
        # Get the reply.
        message = socket.recv()
        print "Received reply ", request, "[", message, "]"
We have already started our device. Now we bring up the server, and then the clients. Each client makes a few requests to the server through our device.
Process(target=server, args=(backend_port,)).start()
time.sleep(2)
for client_id in range(number_of_workers):
    Process(target=client, args=(frontend_port, client_id,)).start()
Output:
Connecting a server to queue device
Connecting a worker #0 to queue device
Sending request #1
Connecting a worker #1 to queue device
Received request: Request fron client: 0
Received reply 1 [ Response from 6548 ]
Sending request #2
Received request: Request fron client: 0
Sending request #1
Received reply 2 [ Response from 6548 ]
Sending request #3
Received request: Request fron client: 0
Received request: Request fron client: 1
Received reply 3 [ Response from 6548 ]
Sending request #4
Received request: Request fron client: 0
Received reply 1 [ Response from 6548 ]
Sending request #2
Received request: Request fron client: 1
Received reply 4 [ Response from 6548 ]
Received reply 2 [ Response from 6548 ]
Sending request #3
Received request: Request fron client: 1
Received reply 3 [ Response from 6548 ]
Sending request #4
Received request: Request fron client: 1
Received reply 4 [ Response from 6548 ]
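The interleaving in the output above follows from the request/reply lockstep: each client blocks on its reply before sending its next request, while requests from different clients may reach the server in any order. The following is a rough simulation using only the standard library. Threads and `queue.Queue` objects stand in for processes and sockets, and `run_demo` with its parameters is a hypothetical name, not part of pyzmq. It shows that each client sees its own replies in order, whatever the interleaving.

```python
import queue
import threading


def run_demo(num_clients=2, requests_per_client=4):
    requests = queue.Queue()  # stands in for the queue device
    replies = [queue.Queue() for _ in range(num_clients)]
    log = []
    log_lock = threading.Lock()

    def server():
        while True:
            item = requests.get()
            if item is None:  # sentinel: no more requests
                return
            client_id, number = item
            replies[client_id].put(
                "Response to client %d, request #%d" % (client_id, number)
            )

    def client(client_id):
        for number in range(1, requests_per_client + 1):
            requests.put((client_id, number))
            reply = replies[client_id].get()  # block until the reply arrives
            with log_lock:
                log.append((client_id, number, reply))

    server_thread = threading.Thread(target=server)
    server_thread.start()
    clients = [threading.Thread(target=client, args=(i,)) for i in range(num_clients)]
    for t in clients:
        t.start()
    for t in clients:
        t.join()
    requests.put(None)  # tell the server to stop
    server_thread.join()
    return log


log = run_demo()
for client_id in range(2):
    print(client_id, [number for cid, number, _ in log if cid == client_id])
```

The order of entries across clients in `log` can differ from run to run, but each client's request numbers always appear as 1, 2, 3, 4.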