[Rust contribution] Implementing message-oriented middleware from scratch (6): the client
2022-06-25 03:53:00 【51CTO】
Functional design
The client's job is fairly simple: it sends pub messages to the server, sends sub messages to subscribe to subjects, and notifies the handler of a subscribed subject whenever a message for it arrives. That gives three functions:
- Provide a pub interface
- Provide a sub interface
- Handle the messages received after a sub
Data structure definition
The three items above are the interfaces offered to users.
To implement them, the client needs a writer and a handler. One client can hold multiple subscriptions and each needs a unique id, so sid is the counter used for numbering them. stop is used to shut the client down cleanly.
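As a rough illustration of why the client keeps a sid counter and a handler table, here is an I/O-free sketch. Subscriptions, subscribe and dispatch are hypothetical names chosen for this example, not part of the article's code; only the shape of MessageHandler follows the implementation listed under "Code implementation".

```rust
use std::collections::HashMap;

/// Boxed callback that receives the payload of a message.
pub type MessageHandler = Box<dyn FnMut(&[u8]) -> Result<(), ()> + Send + Sync>;

/// Hypothetical stand-in for the bookkeeping half of the client (no networking).
#[derive(Default)]
pub struct Subscriptions {
    sid: u64,
    handlers: HashMap<String, MessageHandler>,
}

impl Subscriptions {
    /// Registers a handler for `subject` and returns the id assigned to it.
    pub fn subscribe(&mut self, subject: &str, handler: MessageHandler) -> u64 {
        self.sid += 1;
        self.handlers.insert(subject.to_string(), handler);
        self.sid
    }

    /// Calls the handler registered for `subject`; returns false if there is none.
    pub fn dispatch(&mut self, subject: &str, payload: &[u8]) -> bool {
        match self.handlers.get_mut(subject) {
            Some(handler) => {
                // The handler's own result is ignored, as in process_message.
                let _ = handler(payload);
                true
            }
            None => false,
        }
    }
}
```

Because the table is keyed by subject, a second subscription to the same subject replaces the earlier handler. The insert call in the article's sub_message behaves the same way.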
Interface: connect
connect creates the connection to the server and starts a background task that handles the TCP connection, mainly by receiving messages.
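The split-then-spawn shape of connect can be tried without any dependencies. The sketch below swaps tokio for std::net and an OS thread, so it is a blocking analogue rather than the article's async code. connect_blocking and its byte-count return value are assumptions made for the example.

```rust
use std::io::{self, Read};
use std::net::TcpStream;
use std::thread::{self, JoinHandle};

/// Connects to `addr`, keeps one handle for writing and hands a clone of the
/// socket to a background thread that only reads.
/// The thread returns how many bytes it received before the peer closed.
pub fn connect_blocking(addr: &str) -> io::Result<(TcpStream, JoinHandle<usize>)> {
    let writer = TcpStream::connect(addr)?;
    // try_clone gives a second handle to the same socket, playing the role
    // of the read half.
    let mut reader = writer.try_clone()?;
    let background = thread::spawn(move || {
        let mut buf = [0u8; 512];
        let mut total = 0;
        loop {
            match reader.read(&mut buf) {
                // 0 bytes means the peer closed; errors also end the task.
                Ok(0) | Err(_) => return total,
                Ok(n) => total += n,
            }
        }
    });
    Ok((writer, background))
}
```

The caller keeps the writer for publishing while the background thread owns the reading side, which is the same division of labour as the writer field and receive_task in the article.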
Interface: pub_message
Sends a pub message to the server.
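According to the comment and the writes in the implementation, the wire format is `PUB subject size\r\n{message}` followed by a closing `\r\n`. A small sketch of building that frame in one buffer, with encode_pub as a hypothetical helper that the article does not define:

```rust
/// Builds a complete PUB frame: header line, raw payload, trailing CRLF.
pub fn encode_pub(subject: &str, payload: &[u8]) -> Vec<u8> {
    let header = format!("PUB {} {}\r\n", subject, payload.len());
    let mut frame = Vec::with_capacity(header.len() + payload.len() + 2);
    frame.extend_from_slice(header.as_bytes());
    // The payload is copied as-is; its length was announced in the header.
    frame.extend_from_slice(payload);
    frame.extend_from_slice(b"\r\n");
    frame
}
```

Assembling the frame first would let a client send it with a single write_all call instead of three separate writes.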
Interface: sub_message
Sends a sub message to the server, then waits for the server to push matching messages.
Note that the subject and queue parameters have no real need to be String; &str would be enough. This looks like a rustc bug: the &str version fails to compile on both 1.41 and nightly 1.43. As a fallback, String is used.
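The frame itself is `SUB subject {queue} {sid}\r\n`, where the queue is optional. In a plain synchronous function, borrowing the arguments as &str is unproblematic, as this sketch shows. encode_sub is a hypothetical helper, not a function from the article.

```rust
/// Builds a SUB frame; the queue name is included only when one is given.
pub fn encode_sub(subject: &str, queue: Option<&str>, sid: u64) -> String {
    match queue {
        Some(queue) => format!("SUB {} {} {}\r\n", subject, queue, sid),
        None => format!("SUB {} {}\r\n", subject, sid),
    }
}
```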
receive_task
receive_task receives messages, parses them, and dispatches each one to the matching handler.
This is the most complicated part of the module, but overall it is still fairly intuitive.
There are two main points:
- The futures::select macro is used to wait on multiple futures at the same time
- If a read on the TcpStream returns a size of 0, the connection has been closed and there is no need to continue
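The second point can be sketched with the blocking std::io::Read trait, which follows the same convention that a read of 0 bytes signals the end of the stream. drain is a hypothetical name, and the 512-byte buffer matches the one in receive_task. The select-based cancellation from the first point is not covered here.

```rust
use std::io::{self, Read};

/// Reads from `reader` in 512-byte chunks until a read returns 0 bytes,
/// passing each chunk to `on_chunk`. Returns the total number of bytes read.
pub fn drain<R: Read>(mut reader: R, mut on_chunk: impl FnMut(&[u8])) -> io::Result<usize> {
    let mut buf = [0u8; 512];
    let mut total = 0;
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            // End of stream: the other side closed, so stop reading.
            return Ok(total);
        }
        // Only the first n bytes of the buffer are valid for this read.
        on_chunk(&buf[..n]);
        total += n;
    }
}
```

In the article's loop the chunk handed to on_chunk corresponds to the slice `&buf[..n]` that is fed to the parser.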
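API usage
pub: call pub_message(subject, msg) with the subject and the payload bytes.
sub: call sub_message(subject, queue, handler) with the subject, an optional queue name, and a boxed handler that is invoked for each message received on that subject.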
Code implementation
use std::collections::HashMap;
use std::sync::Arc;

use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::sync::{oneshot, Mutex};

// Parser, ParseResult and MsgArg are the parser types from earlier in this
// series; the module path below is an assumption.
use crate::parser::{MsgArg, ParseResult, Parser};

// Callback invoked with the payload of every message received on a subject.
type MessageHandler = Box<dyn FnMut(&[u8]) -> std::result::Result<(), ()> + Sync + Send>;

//#[derive(Debug)]
pub struct Client {
    addr: String,
    writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    pub stop: Option<oneshot::Sender<()>>,
    sid: u64,
    handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
}

impl Client {
    // 1. Establish a connection to the server
    // 2. Start the background task
    pub async fn connect(addr: &str) -> std::io::Result<Client> {
        let conn = TcpStream::connect(addr).await?;
        let (reader, writer) = tokio::io::split(conn);
        let (tx, rx) = tokio::sync::oneshot::channel();
        let c = Client {
            addr: addr.into(),
            writer: Arc::new(Mutex::new(writer)),
            stop: Some(tx),
            sid: 0,
            handler: Arc::new(Default::default()),
        };
        let handler = c.handler.clone();
        let writer = c.writer.clone();
        // tokio::spawn plays roughly the same role as `go func(){}()` in Go.
        tokio::spawn(async move {
            Self::receive_task(reader, rx, handler, writer).await;
        });
        Ok(c)
    }

    // Receive messages pushed by the server and forward them to the
    // relevant subscribers.
    async fn receive_task(
        mut reader: ReadHalf<TcpStream>,
        stop: oneshot::Receiver<()>,
        handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
        writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    ) {
        use futures::{select, FutureExt};

        let mut buf = [0u8; 512];
        let mut parser = Parser::new();
        let mut stop = stop.fuse();
        loop {
            select! {
                _ = stop => {
                    println!("client closed");
                    return;
                }
                r = reader.read(&mut buf[..]).fuse() => {
                    let n = match r {
                        Err(e) => {
                            println!("read err {}", e);
                            let _ = writer.lock().await.shutdown().await;
                            return;
                        }
                        Ok(n) => n,
                    };
                    if n == 0 {
                        // EOF: the other side has closed the connection
                        return;
                    }
                    let mut buf2 = &buf[..n];
                    loop {
                        let (result, consumed) = match parser.parse(buf2) {
                            Err(e) => {
                                println!("parse err {}", e);
                                let _ = writer.lock().await.shutdown().await;
                                return;
                            }
                            Ok(r) => r,
                        };
                        // println!("receive msg {:?}", result);
                        match result {
                            ParseResult::NoMsg => {
                                break;
                            }
                            ParseResult::MsgArg(msg) => {
                                Self::process_message(msg, &handler).await;
                                parser.clear_msg_buf();
                            }
                        }
                        // The bytes from this read are fully consumed.
                        // (The original compared against buf.len(), which only
                        // matched when a full 512-byte read was parsed at once.)
                        if consumed == buf2.len() {
                            break;
                        }
                        buf2 = &buf2[consumed..];
                    }
                }
            }
        }
    }

    // Look up the subscriber by the message's subject and push the
    // message to it.
    pub async fn process_message(
        msg: MsgArg<'_>,
        handler: &Arc<Mutex<HashMap<String, MessageHandler>>>,
    ) {
        // println!("broadcast msg {}", msg.subject);
        let mut handler = handler.lock().await;
        let h = handler.get_mut(msg.subject);
        if let Some(h) = h {
            let _ = h(msg.msg);
        }
    }

    // The pub message format is PUB subject size\r\n{message}\r\n
    pub async fn pub_message(&self, subject: &str, msg: &[u8]) -> std::io::Result<()> {
        let mut writer = self.writer.lock().await;
        let m = format!("PUB {} {}\r\n", subject, msg.len());
        // Propagate write errors instead of silently discarding them.
        writer.write_all(m.as_bytes()).await?;
        writer.write_all(msg).await?;
        writer.write_all(b"\r\n").await
    }

    // The sub message format is SUB subject {queue} {sid}\r\n
    // Probably because of a rustc bug, taking subject as &str can fail with
    // error E0700, so String is used for now.
    pub async fn sub_message(
        &mut self,
        subject: String,
        queue: Option<String>,
        handler: MessageHandler,
    ) -> std::io::Result<()> {
        self.sid += 1;
        let mut writer = self.writer.lock().await;
        let m = if let Some(queue) = queue {
            format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid)
        } else {
            format!("SUB {} {}\r\n", subject.as_str(), self.sid)
        };
        self.handler.lock().await.insert(subject, handler);
        writer.write_all(m.as_bytes()).await
    }

    // Signal the background task to stop.
    pub fn close(&mut self) {
        if let Some(stop) = self.stop.take() {
            if let Err(e) = stop.send(()) {
                println!("stop err {:?}", e);
            }
        }
    }
}
Other
The code is on my GitHub, in the rnats repository. Feel free to take a look.