中间件

就像生意里的批发商,渠道商,从上游进货,发售给下游,其存在是为了让复杂的网络变简单,更高效.

在 ZeroMQ 里,根据使用的背景不同,名称可多了:proxies, queues, forwarders, device, or brokers…

下面是列举了一些实际场景,看 Zeromq 怎么解决:

m-n Pub-sub(The Dynamic Discovery Problem)

最简单的情况 1 pub <-> n subs,但是 m to n 呢?

2018-09-12-09-24-27

直接上原文好了,和 push-pull 有区别吗?mute 的策略不同.

We need XPUB and XSUB sockets because ZeroMQ does subscription forwarding from subscribers to publishers. XSUB and XPUB are exactly like SUB and PUB except they expose subscriptions as special messages. The proxy has to forward these subscription messages from subscriber side to publisher side, by reading them from the XSUB socket and writing them to the XPUB socket. This is the main use case for XSUB and XPUB.

publisher:

import zmq
import time

context = zmq.Context()
sender = context.socket(zmq.PUB)
sender.connect("tcp://127.0.0.1:5559")

# send 3 but only 2 received by peer.
for i in range(3):
    sender.send(b'Hello there')
    time.sleep(1)

subscriber:

import zmq

context = zmq.Context()
receiver = context.socket(zmq.SUB)
# Subscribe any topic
receiver.setsockopt(zmq.SUBSCRIBE, b'')

receiver.connect("tcp://127.0.0.1:5560")
while True:
    msg = receiver.recv_multipart()
    print('Subscriber got ', msg)

proxy:

import zmq

context = zmq.Context()

frontend = context.socket(zmq.XSUB)
backend = context.socket(zmq.XPUB)

# Frontend as client , means forwarding one net to another.
# Frontend work as server, means a simple queue or broker
# fontend.connect("tcp://127.0.0.1:5559")
frontend.bind("tcp://127.0.0.1:5559")
backend.bind("tcp://127.0.0.1:5560")

zmq.proxy(frontend, backend)

# We never get here…
frontend.close()
backend.close()
context.term()

第 1 条 pub message 丢失

上面的例子是 subscriber 先运行,再 proxy,最后 publisher.subscriber 先运行时,publisher 的第 1 条消息并没有收到, 更稳健的做法,增加一个 req-rep 的通知渠道,进行 1 次握手.

2018-09-12-17-26-54

这个REQ-REP也可以直接用 proxy 的方式.subscriber 收到 publisher 的 hello 后,给个 yes,然后 publisher 才发送真正的消息.

stable-subscriber:

import zmq

context = zmq.Context()

receiver = context.socket(zmq.SUB)

# Subscribe any topic
receiver.setsockopt(zmq.SUBSCRIBE, b'')
receiver.connect("tcp://127.0.0.1:5560")

controller = context.socket(zmq.REP)
controller.connect("tcp://127.0.0.1:5562")

req = controller.recv()
print('req = ', req)
if req == b'Hello':
    controller.send(b'yes')
    while True:
        msg = receiver.recv_multipart()
        print('Subscriber got ', msg)
else:
    print('Some error')

stable-publisher:

import zmq
import time

context = zmq.Context()

sender = context.socket(zmq.PUB)
sender.connect("tcp://127.0.0.1:5559")

controller = context.socket(zmq.REQ)
controller.connect("tcp://127.0.0.1:5561")

controller.send(b'Hello')
rep = controller.recv()
print('rep = ', rep)
if rep == b'yes':
    for i in range(3):
        sender.send(b'Blablabla')
else:
    print("Some error")

rep-req proxy:

import zmq

context = zmq.Context()

frontend = context.socket(zmq.ROUTER)
backend = context.socket(zmq.DEALER)

# Frontend as client , means forwarding one net to another.
# Frontend work as server, means a simple queue or broker
# fontend.connect("tcp://127.0.0.1:5559")
frontend.bind("tcp://127.0.0.1:5561")
backend.bind("tcp://127.0.0.1:5562")

zmq.proxy(frontend, backend)

# We never get here…
frontend.close()
backend.close()
context.term()

pub-sub proxy 和上面一致.

还可以进一步扩展下,publisher 可根据收到的回应。控制 subsriber 的数量等.就是前面讲过的Node Cordinnation

Transport Bridging(Forwarder)

和上面唯一不同,就是 proxy 里的 XSUB 变成了 client,即 XSUB connect 到 PUB(server)上,然后 forward 到 external network. 也可以叫bridge,作用就是从 1 个网络到另 1 个网络的中间件.

也可以叫 network 的 forward,即 connect 10.1.1.0:8100的 subscirber 也能和内部192.168.55.210:5556网一样 subscribe 相同的 topic.

2018-09-12-11-15-28

Req-rep Shared Queue (DEALER and ROUTER sockets)

REQ-REP模型仅适用的简单的场景,而 DEALER 和 ROUTER 其实是扩展REQ-REP的功能,比如模拟一个类似 AMQP 里的 borker,具有 routing 等功能, 实现了 many-to-many 的REQ-REP通信模型,需要增加服务能力的时候,只需要增加borker就可以扩展系统性能.

REQ(client)只需要和 ROUTER 对话

REP(server)只需要和 DEALER 对话;

中间的 broker 来做负载均衡,消息路由等.逻辑业务并没有改变,但 server 的能力大大增强,反向代理就这么个玩法?

各种 client 的请求是怎么分发到不同的 server backend?这个就是策略问题了.

在云框架下,可不只是 web 反向代理这么简单,应该是非常常见的:

2018-09-11-10-09-47

来个直接实现的 borker:

import zmq

# context = zmq.Context)
context = zmq.Context.instance()

frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://127.0.0.1:5559")

backend = context.socket(zmq.DEALER)
backend.bind("tcp://127.0.0.1:5560")

# Initialize poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

# router got req, forward to dealer
# dealer got rep , forward to router
# multiply req and rep and can be welly handled .
while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        backend.send_multipart(message)

    if socks.get(backend) == zmq.POLLIN:
        message = backend.recv_multipart()
        frontend.send_multipart(message)

Asynchronous client/server

这个和上面的区别在于用 DEALER 代替 client 的 REQ 和 server 的 REP,实现全部消息来回的异步 client-server,更灵活,不再局限于 REQ-REP 的 send.recv 的模式.

asyncsrv实现:

import zmq
import sys
import threading
import time
from random import randint, random

__author__ = "Felipe Cruz <felipecruz@loogica.net>"
__license__ = "MIT/X11"

def tprint(msg):
    """like print, but won't get newlines confused with multiple threads"""
    sys.stdout.write(msg + '\n')
    sys.stdout.flush()

class ClientTask(threading.Thread):
    """ClientTask"""
    def __init__(self, id):
        self.id = id
        threading.Thread.__init__ (self)

    def run(self):
        context = zmq.Context()
        socket = context.socket(zmq.DEALER)
        identity = u'----%d' % self.id
        socket.identity = identity.encode('ascii')
        socket.connect('tcp://localhost:5570')
        print('Client %s started' % (identity))
        poll = zmq.Poller()
        poll.register(socket, zmq.POLLIN)
        reqs = 0
        while True:
            reqs = reqs + 1
            print('Req #%d sent..' % (reqs))
            socket.send_string(u'request #%d' % (reqs))
            for i in range(5):
                sockets = dict(poll.poll(1000))
                if socket in sockets:
                    msg = socket.recv()
                    tprint('Client %s received: %s' % (identity, msg))

        socket.close()
        context.term()

class ServerTask(threading.Thread):
    """ServerTask"""
    def __init__(self):
        threading.Thread.__init__ (self)

    def run(self):
        context = zmq.Context()
        frontend = context.socket(zmq.ROUTER)
        frontend.bind('tcp://*:5570')

        backend = context.socket(zmq.DEALER)
        backend.bind('inproc://backend')

        workers = []
        for i in range(5):
            worker = ServerWorker(context)
            worker.start()
            workers.append(worker)

        zmq.proxy(frontend, backend)

        frontend.close()
        backend.close()
        context.term()

class ServerWorker(threading.Thread):
    """ServerWorker"""
    def __init__(self, context,id):
        threading.Thread.__init__ (self)
        self.context = context
        self.id = id

    def run(self):
        worker = self.context.socket(zmq.DEALER)
        worker.connect('inproc://backend')
        tprint('Worker-%d started'%self.id)
        while True:
            ident, msg = worker.recv_multipart()
            tprint('Worker-%d received %s from %s' % (self.id,msg, ident))
            replies = randint(0,4)
            for i in range(replies):
                time.sleep(1. / (randint(1,10)))
                worker.send_multipart([ident, msg])

        worker.close()

def main():
    """main function"""
    server = ServerTask()
    server.start()
    for i in range(3):
        client = ClientTask(i)
        client.start()

    server.join()

if __name__ == "__main__":
    main()

上面代码的架构是 DEALER<->(DEALER<->ROUTER)<->DEALER

输出

Worker-0 started
Client ---0 started
Req #1 sent..
Client ---1 started
Req #1 sent..
Worker-1 started
Worker-2 started
Worker-3 started
Worker-4 started
Worker-server-0 received b'request #1' from b'---0'
Worker send 3 replys
Worker-server-1 received b'request #1' from b'---1'
Worker send 4 replys
Client ---2 started
Req #1 sent..
Worker-server-2 received b'request #1' from b'---2'
Worker send 0 replys
Client ---0 received: b'request #1'
Client ---1 received: b'request #1'
Client ---1 received: b'request #1'
Client ---0 received: b'request #1'
Client ---1 received: b'request #1'
Client ---1 received: b'request #1'
Client ---0 received: b'request #1'
Req #2 sent..
Worker-server-3 received b'request #2' from b'---1'
Worker send 4 replys
Client ---1 received: b'request #2'
Client ---1 received: b'request #2'
Client ---1 received: b'request #2'
Client ---1 received: b'request #2'
Req #2 sent..
Worker-server-4 received b'request #2' from b'---0'
Worker send 1 replys
Client ---0 received: b'request #2'
Req #3 sent..
Worker-server-0 received b'request #3' from b'---1'
Worker send 3 replys
Client ---1 received: b'request #3'
Client ---1 received: b'request #3'
Client ---1 received: b'request #3'
Req #2 sent..
Worker-server-1 received b'request #2' from b'---2'
Worker send 0 replys
Req #4 sent..
Worker-server-2 received b'request #4' from b'---1'
Worker send 3 replys
Client ---1 received: b'request #4'
Client ---1 received: b'request #4'
Req #3 sent..
Worker-server-3 received b'request #3' from b'---0'
Worker send 1 replys
Client ---1 received: b'request #4'
Client ---0 received: b'request #3'

3 个 client,  5 个 worker(server)处理. clients sent 后,由中间的 proxy round robin 依次分配到每一个 worker,worker 随机回复 n 个 reply.

zmq_proxy

来来来,zmq_proxy终极封装,同样是实现上面各种功能的 borker,比如 shared queue:

import zmq
context = zmq.Context.instance()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://127.0.0.1:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://127.0.0.1:5560")
zmq.proxy(frontend, backend)
# We never get here…
frontend.close()
backend.close()
context.term()

另外还可以做:

fronend=XSUB, backend=XPUB 的 Forwarder;

fronend=PULL, backend=PUSH 的 Streamer;