当前位置:网站首页>一文走近ZMQ
一文走近ZMQ
2022-06-22 09:10:00 【InfoQ】
一、我们为什么需要ZMQ
- 如何处理I/O?是让程序阻塞等待响应,还是在后台处理这些事?这是软件设计的关键因素。阻塞式的I/O操作会让程序架构难以扩展,而后台处理I/O也是比较困难的。
- 如何处理那些临时的、来去自由的组件?我们是否要将组件分为客户端和服务端两种,并要求服务端永不消失?那如果我们想要将服务端相连怎么办?我们要每隔几秒就进行重连吗?
- 如何表示一条消息?我们怎样通过拆分消息,让其变得易读易写,不用担心缓存溢出,既能高效地传输小消息,又能胜任视频等大型文件的传输?
- 如何处理那些不能立刻发送出去的消息?比如我们需要等待一个网络组件重新连接的时候?我们是直接丢弃该条消息,还是将它存入数据库,或是内存中的一个队列?
- 要在哪里保存消息队列?如果某个组件读取消息队列的速度很慢,造成消息的堆积怎么办?我们要采取什么样的策略?
- 如何处理丢失的消息?我们是等待新的数据,请求重发,还是需要建立一套新的可靠性机制以保证消息不会丢失?如果这个机制自身崩溃了呢?
- 如果想换一种网络连接协议,如用广播代替TCP单播?或者改用IPv6?我们是否需要重写所有的应用程序,或者将这种协议抽象到一个单独的层中?
- 如何对消息进行路由?我们可以将消息同时发送给多个节点吗?是否能将应答消息返回给请求的发送方?
- 如何为另一种语言写一个API?我们是否需要完全重写某项协议,还是重新打包一个类库?
- 怎样才能做到在不同的架构之间传送消息?是否需要为消息规定一种编码?
- 如何处理网络通信错误?等待并重试,还是直接忽略或取消?
- ZMQ会在后台线程异步地处理I/O操作,它使用一种不会死锁的数据结构来存储消息。
- 网络组件可以来去自如,ZMQ会负责自动重连,这就意味着你可以以任何顺序启动组件;用它创建的面向服务架构(SOA)中,服务端可以随意地加入或退出网络。
- ZMQ会在有必要的情况下自动将消息放入队列中保存,一旦建立了连接就开始发送。
- ZMQ有阈值(HWM)的机制,可以避免消息溢出。当队列已满,ZMQ会自动阻塞发送者,或丢弃部分消息,这些行为取决于你所使用的消息模式。
- ZMQ可以让你用不同的通信协议进行连接,如TCP、广播、进程内、进程间。改变通信协议时你不需要去修改代码。
- ZMQ会恰当地处理速度较慢的节点,会根据消息模式使用不同的策略。
- ZMQ提供了多种模式进行消息路由,如请求-应答模式、发布-订阅模式等。这些模式可以用来搭建网络拓扑结构。
- ZMQ中可以根据消息模式建立起一些中间装置(很小巧),可以用来降低网络的复杂程度。
- ZMQ会发送整个消息,使用消息帧的机制来传递。如果你发送了10KB大小的消息,你就会收到10KB大小的消息。
- ZMQ不强制使用某种消息格式,消息可以是0字节的,或是大到GB级的数据。当你表示这些消息时,可以选用诸如谷歌的protocol buffers,XDR等序列化产品。
- ZMQ能够智能地处理网络错误,有时它会进行重试,有时会告知你某项操作发生了错误。
- ZMQ甚至可以降低对环境的污染,因为节省了CPU时间意味着节省了电能。
二、ZeroMQ 背景介绍
三、ZMQ是什么
四、三种模型
4.1 应答模式
4.2 订阅发布模式
4.3 基于分布式处理(管道模式)
package cn.edu.ujn.pub_sub;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
* Pubsub envelope publisher
*/
public class psenvpub {
public static void main (String[] args) throws Exception {
// Prepare our context and publisher
Context context = ZMQ.context(1);
Socket publisher = context.socket(ZMQ.PUB);
publisher.bind("tcp://*:5563");
while (!Thread.currentThread ().isInterrupted ()) {
// Write two messages, each with an envelope and content
publisher.sendMore ("A");
publisher.send ("We don't want to see this");
publisher.sendMore ("B");
publisher.send("We would like to see this");
}
publisher.close ();
context.term ();
}
}context.socket(ZMQ.PUB)package cn.edu.ujn.pub_sub;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;
/**
* Pubsub envelope subscriber
*/
public class psenvsub {
public static void main (String[] args) {
// Prepare our context and subscriber
Context context = ZMQ.context(1);
Socket subscriber = context.socket(ZMQ.SUB);
subscriber.connect("tcp://localhost:5563");
subscriber.subscribe("B".getBytes());
while (!Thread.currentThread ().isInterrupted ()) {
// Read envelope with address
String address = subscriber.recvStr ();
// Read message contents
String contents = subscriber.recvStr ();
System.out.println(address + " : " + contents);
}
subscriber.close ();
context.term ();
}
}package cn.edu.ujn.req_rep;
//
// Hello World server in Java
// Binds REP socket to tcp://*:5555
// Expects "Hello" from client, replies with "World"
//
import org.zeromq.ZMQ;
public class hwserver {
private static int i = 0;
public static void main(String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to clients
ZMQ.Socket responder = context.socket(ZMQ.REP);
responder.bind("tcp://*:5555");
while (!Thread.currentThread().isInterrupted()) {
// Wait for next request from the client
byte[] request = responder.recv(0);
System.out.println("Received " + new String(request) + i++);
// Do some 'work'
Thread.sleep(1000);
// Send reply back to client
String reply = "World";
responder.send(reply.getBytes(), 0);
}
responder.close();
context.term();
}
}package cn.edu.ujn.req_rep;
// Hello World client in Java
// Connects REQ socket to tcp://localhost:5555
// Sends "Hello" to server, expects "World" back
import org.zeromq.ZMQ;
public class hwclient {
public static void main(String[] args) {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to server
System.out.println("Connecting to hello world server…");
ZMQ.Socket requester = context.socket(ZMQ.REQ);
requester.connect("tcp://localhost:5555");
for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
String request = "Hello";
System.out.println("Sending Hello " + requestNbr);
requester.send(request.getBytes(), 0);
byte[] reply = requester.recv(0);
System.out.println("Received " + new String(reply) + " " + requestNbr);
}
requester.close();
context.term();
}
}package cn.edu.ujn.para_pipe;
import java.util.Random;
import org.zeromq.ZMQ;
// Task ventilator in Java
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket
public class taskvent {
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to send messages on
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.bind("tcp://*:5557");
// Socket to send messages on
ZMQ.Socket sink = context.socket(ZMQ.PUSH);
sink.connect("tcp://localhost:5558");
System.out.println("Press Enter when the workers are ready: ");
System.in.read();
System.out.println("Sending tasks to workers\n");
// The first message is "0" and signals start of batch
sink.send("0", 0);
// Initialize random number generator
Random srandom = new Random(System.currentTimeMillis());
// Send 100 tasks
int task_nbr;
int total_msec = 0; // Total expected cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
int workload;
// Random workload from 1 to 100msecs
workload = srandom.nextInt(100) + 1;
total_msec += workload;
System.out.print(workload + ".");
String string = String.format("%d", workload);
sender.send(string, 0);
}
System.out.println("Total expected cost: " + total_msec + " msec");
Thread.sleep(1000); // Give 0MQ time to deliver
sink.close();
sender.close();
context.term();
}
}package cn.edu.ujn.para_pipe;
import org.zeromq.ZMQ;
// Task worker in Java
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket
public class taskwork {
public static void main (String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to receive messages on
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.connect("tcp://localhost:5557");
// Socket to send messages to
ZMQ.Socket sender = context.socket(ZMQ.PUSH);
sender.connect("tcp://localhost:5558");
// Process tasks forever
while (!Thread.currentThread ().isInterrupted ()) {
String string = new String(receiver.recv(0)).trim();
long msec = Long.parseLong(string);
// Simple progress indicator for the viewer
System.out.flush();
System.out.print(string + '.');
// Do the work
Thread.sleep(msec);
// Send results to sink
sender.send("".getBytes(), 0);
}
sender.close();
receiver.close();
context.term();
}
}package cn.edu.ujn.para_pipe;
import org.zeromq.ZMQ;
// Task sink in Java
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket
public class tasksink {
public static void main (String[] args) throws Exception {
// Prepare our context and socket
ZMQ.Context context = ZMQ.context(1);
ZMQ.Socket receiver = context.socket(ZMQ.PULL);
receiver.bind("tcp://*:5558");
// Wait for start of batch
String string = new String(receiver.recv(0));
// Start our clock now
long tstart = System.currentTimeMillis();
// Process 100 confirmations
int task_nbr;
int total_msec = 0; // Total calculated cost in msecs
for (task_nbr = 0; task_nbr < 100; task_nbr++) {
string = new String(receiver.recv(0)).trim();
if ((task_nbr / 10) * 10 == task_nbr) {
System.out.print(":");
} else {
System.out.print(".");
}
}
// Calculate and report duration of batch
long tend = System.currentTimeMillis();
System.out.println("\nTotal elapsed time: " + (tend - tstart) + " msec");
receiver.close();
context.term();
}
}五、注意事项
5.1 正确地使用上下文
5.2 正确地退出和清理
- 处理完消息后,记得用zmq_msg_close()函数关闭消息;
- 如果你同时打开或关闭了很多套接字,那可能需要重新规划一下程序的结构了;
- 退出程序时,应该先关闭所有的套接字,最后调用zmq_term()函数,销毁上下文对象。
5.3 你的想法可能会被颠覆
边栏推荐
- 无线路由攻击和WiFi密码破解实战[渗透技术]
- [path planning] auxiliary points and multi segment Bessel smoothing RRT
- Detailed explanation and Simulation of string and memory operation functions
- OpenCV每日函数 直方图相关(3)
- It is hoped that more and more women will engage in scientific and technological work
- Read all files under the folder in the jar package
- 微表情数据集汇总(全)
- Yolov5 export GPU inference model export
- 【node】脚手架搭建服务器,完成token验证
- Final典型案例
猜你喜欢

性能优化专题

Continuous training on tensorflow breakpoint (principle + code explanation)

Medical information management system database mysql

Didi's two-sided summary

It is hoped that more and more women will engage in scientific and technological work

【node】理论+实践让你拿下session、cookie

三个不同线程顺序打印ABC十种写法【并发编程JUC】

前馈和反向传播

Deeply analyze the usage of final keyword

DOM programming
随机推荐
PHP de duplication and arrange complete small functions in descending order of length
【uni-app】实战总结(含多端打包)
Navicat for MySQL连接MySQL数据库时各种错误解决
pytorch OSError: DLL load failed: 问题解决方法
What is defi and what mode is defi?
Interview shock 59: can there be multiple auto increment columns in a table?
800+ PHP grammar and words are proficient only after you have used them
The third-party libraries commonly used in golang development are not the most complete, but more complete
IP address (IPv4)
How much do you know about the required encryption industry terms in 2022?
STM32 crashes when upgrading bootloader to jump app Online
版本问题导致“无法定位程序输入点OPENSSL_sk_new_reserve于动态链接库C:\Users...\libssl-1_1-x64.dll”
Why can MySQL indexes improve query efficiency so much?
Curl grasping stock information
Yolov5 export GPU inference model export
前馈和反向传播
逻辑回归和线性回归
进程状态汇总
XSS vulnerability attack
General ASP reads CSV files and displays all rows and columns as tables