当前位置:网站首页>[rust contribution] implement Message Oriented Middleware (6) -client from zero
[rust contribution] implement Message Oriented Middleware (6) -client from zero
2022-06-25 03:53:00 【51CTO】
Functional design
client The implementation function is relatively simple , Is to be able to send messages to the server pub news , Then it will say subscribe to the message , The subscribed topic can be notified after receiving the message . Therefore, the following three functions can be summarized :
- Provide pub Interface
- Provide sub Interface
- Handle sub Messages received after
Data structure definition
The interfaces provided to users are the above three ,
To implement these three interfaces ,client There must be writer as well as handler. and sid Because of a client There can be multiple sub, every last sub Have a unique id, It is mainly used for numbering . stop For the sake of client Normally closed for use .
Interface -connect
connect The function of is to create a connection with the server , At the same time, the background will start a task to process tcp Connect , It mainly receives messages .
Interface -pub_message
Publish a message to the server pub news
Interface -sub_message
Publish a message to the server sub news , Then wait for the server to push relevant messages .
It should be noted that the parameters here subject and queue There is absolutely no need to use String,&str that will do . This should be rust One of the bug, stay 1.41 and nightly 1.43 Are compiled in the past . So back to second place , Used String.
receive_task
receive_task Mainly for receiving messages , analysis , And send messages to the appropriate handler.
This is actually the most complicated part of this module , On the whole, it is quite intuitive .
There are two main points
- Use futures::select This macro is used to assist in monitoring multiple at the same time future
- TcpStream If read To size by 0, Indicates that the connection has been closed , There is no need to continue
API Use
pub
sub
Code implementation
type MessageHandler = Box
<
dyn
FnMut(&[u8])
-
> std::result::Result
<
(),
()
> + Sync + Send>;
//#[derive(Debug)]
pub struct Client {
addr: String,
writer: Arc
<
Mutex
<WriteHalf
<
TcpStream
>>>,
pub stop: Option
<
oneshot::Sender
<()>>,
sid: u64,
handler: Arc
<
Mutex
<HashMap
<
String,
MessageHandler
>>>,
}
impl Client {
//1. Establishing a connection to the server
//2. Start background tasks
pub async fn connect(addr:
&str) -> std::io::Result
<
Client
> {
let conn = TcpStream::connect(addr).await?;
let (reader, writer) = tokio::io::split(conn);
let (tx, rx) = tokio::sync::oneshot::channel();
let c = Client {
addr: addr.into(),
writer: Arc::new(Mutex::new(writer)),
stop: Some(tx),
sid: 0,
handler: Arc::new(Default::default()),
};
let handler = c.handler.clone();
let writer = c.writer.clone();
/*
tokio::spawn It can be said that and go In language
go func(){}()
*/
tokio::spawn(async move {
Self::receive_task(reader, rx, handler, writer).await;
});
Ok(c)
}
/*
Receive... From server pub news
And then push it to the relevant subscribers .
*/
async fn receive_task(
mut reader: ReadHalf
<
TcpStream
>,
stop: oneshot::Receiver
<
()
>,
handler: Arc
<
Mutex
<HashMap
<
String,
MessageHandler
>>>,
writer: Arc
<
Mutex
<WriteHalf
<
TcpStream
>>>,
) {
let mut buf = [0 as u8; 512];
let mut parser = Parser::new();
use futures::*;
let mut stop = stop.fuse();
loop {
select! {
_=stop=>{
println!("client closed");
return;
}
r = reader.read(
&mut buf[..]).fuse()=>{
let n = {
match r {
Err(e) => {
println!("read err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(n) => n,
}
};
if n == 0 {
//EOF, It means that the other party has closed the connection
return;
}
let mut buf2 =
&buf[..n];
loop {
let r = parser.parse(buf2);
let (r, n) = match r {
Err(e) => {
println!("parse err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(r) => r,
};
// println!("receive msg {:?}", r);
match r {
ParseResult::NoMsg => {
break;
}
ParseResult::MsgArg(msg) => {
Self::process_message(msg,
&handler).await;
parser.clear_msg_buf();
}
}
// Buffer processing completed
if n == buf.len() {
break;
}
buf2 =
&buf2[n..]
}
}
}
}
}
/*
According to the news subject, Found subscriber ,
Then push it to them
*/
pub async fn process_message(
msg: MsgArg
<
'_>,
handler: &Arc<Mutex<HashMap<String, MessageHandler>>>,
) {
// println!("broadcast msg {}", msg.subject);
let mut handler = handler.lock().await;
let h = handler.get_mut(msg.subject);
if let Some(h) = h {
let _ = h(msg.msg);
}
}
//pub The message format is PUB subject size\r\n{message}
pub async fn pub_message(&self, subject: &str, msg: &[u8]) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
let m = format!("PUB {} {}\r\n", subject, msg.len());
let _ = writer.write_all(m.as_bytes()).await;
let _ = writer.write_all(msg).await;
writer.write_all("\r\n".as_bytes()).await
}
//sub The message format is SUB subject {queue} {sid}\r\n
// Probably because of rustc Of bug, Cause if subject yes &str, May be an error E0700, Temporary use String To replace
pub async fn sub_message(
&mut self,
subject: String,
queue: Option<String>,
handler: MessageHandler,
) -> std::io::Result<()> {
self.sid += 1;
let mut writer = self.writer.lock().await;
let m = if let Some(queue) = queue {
format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid)
} else {
format!("SUB {} {}\r\n", subject.as_str(), self.sid)
};
self.handler.lock().await.insert(subject, handler);
writer.write_all(m.as_bytes()).await
}
pub fn close(&mut self) {
if let Some(stop) = self.stop.take() {
if let Err(e) = stop.send(()) {
println!("stop err {:?}", e);
}
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
- 106.
- 107.
- 108.
- 109.
- 110.
- 111.
- 112.
- 113.
- 114.
- 115.
- 116.
- 117.
- 118.
- 119.
- 120.
- 121.
- 122.
- 123.
- 124.
- 125.
- 126.
- 127.
- 128.
- 129.
- 130.
- 131.
- 132.
- 133.
- 134.
- 135.
- 136.
- 137.
- 138.
- 139.
- 140.
- 141.
- 142.
- 143.
- 144.
- 145.
- 146.
- 147.
- 148.
- 149.
- 150.
- 151.
other
The relevant code is in my github rnats Welcome to watch
边栏推荐
- 教你如何在winpe里安装win11系统
- Is it safe to open an account with flush securities?
- 严重的PHP缺陷可导致QNAP NAS 设备遭RCE攻击
- The file attributes downloaded by the browser are protected. How to remove them
- 在华泰证券上面开股票账户好不好,安不安全?
- OpenSUSE installation pit log
- Mstp+vrrp+ospf implements a three-tier architecture
- Is it safe to open an account on your mobile phone?
- Configuration source code
- 居家办公之后才明白的时间管理 | 社区征文
猜你喜欢

香蕉为什么能做随机数生成器?因为,它是水果界的“辐射之王”

Redis related-03

后台页制作01《ivx低代码签到系统制作》

中国天眼发现地外文明可疑信号,马斯克称星舰7月开始轨道试飞,网信办:APP不得强制要求用户同意处理个人信息,今日更多大新闻在此...

Nacos practice record

How to play well in the PMP Exam?

扎克伯格最新VR原型机来了,要让人混淆虚拟与现实的那种

Does it count as staying up late to sleep at 2:00 and get up at 10:00? Unless you can do it every day

Peking University has a new president! Gongqihuang, academician of the Chinese Academy of Sciences, took over and was admitted to the Physics Department of Peking University at the age of 15

MySQL根据表前缀批量修改、删除表
随机推荐
windows 2003 64位系统php运行报错:1% 不是有效的 win32 应用程序
协作+安全+存储,云盒子助力深圳爱德泰重构数据中心
Rebeco:使用机器学习预测股票崩盘风险
【Harmony OS】【ArkUI】ets开发 图形与动画绘制
马斯克:推特要学习微信,让10亿人「活在上面」成为超级APP
居家办公之后才明白的时间管理 | 社区征文
Amazon's other side in China
Peking University has a new president! Gongqihuang, academician of the Chinese Academy of Sciences, took over and was admitted to the Physics Department of Peking University at the age of 15
AI writes its own code to let agents evolve! The big model of openai has the flavor of "human thought"
Nacos practice record
一文搞懂php中的(DI)依赖注入
Xidian AI ranked higher than Qingbei in terms of AI majors, and Nantah ranked the first in China in 2022 in terms of soft science majors
Sun Wu plays Warcraft? There is a picture and a truth
Sleep more, you can lose weight. According to the latest research from the University of Chicago, sleeping more than 1 hour a day is equivalent to eating less than one fried chicken leg
zabbix的安装避坑指南
The more AI evolves, the more it resembles the human brain! Meta found the "prefrontal cortex" of the machine. AI scholars and neuroscientists were surprised
DateTimeFormat放到@RequestBody下是无效的
Program. Launch (xxx) open file
北大换新校长!中国科学院院士龚旗煌接任,15岁考上北大物理系
The release function completed 02 "IVX low code sign in system production"