当前位置:网站首页>[rust contribution] implement Message Oriented Middleware (6) -client from zero
[rust contribution] implement Message Oriented Middleware (6) -client from zero
2022-06-25 03:53:00 【51CTO】
Functional design
client The implementation function is relatively simple , Is to be able to send messages to the server pub news , Then it will say subscribe to the message , The subscribed topic can be notified after receiving the message . Therefore, the following three functions can be summarized :
- Provide pub Interface
- Provide sub Interface
- Handle sub Messages received after
Data structure definition
The interfaces provided to users are the above three ,
To implement these three interfaces ,client There must be writer as well as handler. and sid Because of a client There can be multiple sub, every last sub Have a unique id, It is mainly used for numbering . stop For the sake of client Normally closed for use .
Interface -connect
connect The function of is to create a connection with the server , At the same time, the background will start a task to process tcp Connect , It mainly receives messages .
Interface -pub_message
Publish a message to the server pub news
Interface -sub_message
Publish a message to the server sub news , Then wait for the server to push relevant messages .
It should be noted that the parameters here subject and queue There is absolutely no need to use String,&str that will do . This should be rust One of the bug, stay 1.41 and nightly 1.43 Are compiled in the past . So back to second place , Used String.
receive_task
receive_task Mainly for receiving messages , analysis , And send messages to the appropriate handler.
This is actually the most complicated part of this module , On the whole, it is quite intuitive .
There are two main points
- Use futures::select This macro is used to assist in monitoring multiple at the same time future
- TcpStream If read To size by 0, Indicates that the connection has been closed , There is no need to continue
API Use
pub
sub
Code implementation
type MessageHandler = Box
<
dyn
FnMut(&[u8])
-
> std::result::Result
<
(),
()
> + Sync + Send>;
//#[derive(Debug)]
pub struct Client {
addr: String,
writer: Arc
<
Mutex
<WriteHalf
<
TcpStream
>>>,
pub stop: Option
<
oneshot::Sender
<()>>,
sid: u64,
handler: Arc
<
Mutex
<HashMap
<
String,
MessageHandler
>>>,
}
impl Client {
//1. Establishing a connection to the server
//2. Start background tasks
pub async fn connect(addr:
&str) -> std::io::Result
<
Client
> {
let conn = TcpStream::connect(addr).await?;
let (reader, writer) = tokio::io::split(conn);
let (tx, rx) = tokio::sync::oneshot::channel();
let c = Client {
addr: addr.into(),
writer: Arc::new(Mutex::new(writer)),
stop: Some(tx),
sid: 0,
handler: Arc::new(Default::default()),
};
let handler = c.handler.clone();
let writer = c.writer.clone();
/*
tokio::spawn It can be said that and go In language
go func(){}()
*/
tokio::spawn(async move {
Self::receive_task(reader, rx, handler, writer).await;
});
Ok(c)
}
/*
Receive... From server pub news
And then push it to the relevant subscribers .
*/
async fn receive_task(
mut reader: ReadHalf
<
TcpStream
>,
stop: oneshot::Receiver
<
()
>,
handler: Arc
<
Mutex
<HashMap
<
String,
MessageHandler
>>>,
writer: Arc
<
Mutex
<WriteHalf
<
TcpStream
>>>,
) {
let mut buf = [0 as u8; 512];
let mut parser = Parser::new();
use futures::*;
let mut stop = stop.fuse();
loop {
select! {
_=stop=>{
println!("client closed");
return;
}
r = reader.read(
&mut buf[..]).fuse()=>{
let n = {
match r {
Err(e) => {
println!("read err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(n) => n,
}
};
if n == 0 {
//EOF, It means that the other party has closed the connection
return;
}
let mut buf2 =
&buf[..n];
loop {
let r = parser.parse(buf2);
let (r, n) = match r {
Err(e) => {
println!("parse err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(r) => r,
};
// println!("receive msg {:?}", r);
match r {
ParseResult::NoMsg => {
break;
}
ParseResult::MsgArg(msg) => {
Self::process_message(msg,
&handler).await;
parser.clear_msg_buf();
}
}
// Buffer processing completed
if n == buf.len() {
break;
}
buf2 =
&buf2[n..]
}
}
}
}
}
/*
According to the news subject, Found subscriber ,
Then push it to them
*/
pub async fn process_message(
msg: MsgArg
<
'_>,
handler: &Arc<Mutex<HashMap<String, MessageHandler>>>,
) {
// println!("broadcast msg {}", msg.subject);
let mut handler = handler.lock().await;
let h = handler.get_mut(msg.subject);
if let Some(h) = h {
let _ = h(msg.msg);
}
}
//pub The message format is PUB subject size\r\n{message}
pub async fn pub_message(&self, subject: &str, msg: &[u8]) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
let m = format!("PUB {} {}\r\n", subject, msg.len());
let _ = writer.write_all(m.as_bytes()).await;
let _ = writer.write_all(msg).await;
writer.write_all("\r\n".as_bytes()).await
}
//sub The message format is SUB subject {queue} {sid}\r\n
// Probably because of rustc Of bug, Cause if subject yes &str, May be an error E0700, Temporary use String To replace
pub async fn sub_message(
&mut self,
subject: String,
queue: Option<String>,
handler: MessageHandler,
) -> std::io::Result<()> {
self.sid += 1;
let mut writer = self.writer.lock().await;
let m = if let Some(queue) = queue {
format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid)
} else {
format!("SUB {} {}\r\n", subject.as_str(), self.sid)
};
self.handler.lock().await.insert(subject, handler);
writer.write_all(m.as_bytes()).await
}
pub fn close(&mut self) {
if let Some(stop) = self.stop.take() {
if let Err(e) = stop.send(()) {
println!("stop err {:?}", e);
}
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
- 106.
- 107.
- 108.
- 109.
- 110.
- 111.
- 112.
- 113.
- 114.
- 115.
- 116.
- 117.
- 118.
- 119.
- 120.
- 121.
- 122.
- 123.
- 124.
- 125.
- 126.
- 127.
- 128.
- 129.
- 130.
- 131.
- 132.
- 133.
- 134.
- 135.
- 136.
- 137.
- 138.
- 139.
- 140.
- 141.
- 142.
- 143.
- 144.
- 145.
- 146.
- 147.
- 148.
- 149.
- 150.
- 151.
other
The relevant code is in my github rnats Welcome to watch
边栏推荐
- Administrator如何禁止另一个人踢掉自己?
- Musk: Twitter should learn from wechat and make 1billion people "live on it" into a super app
- Dr. Sun Jian was commemorated at the CVPR conference. The best student thesis was awarded to Tongji Ali. Lifeifei won the huangxutao Memorial Award
- Mobile mall project operation
- Sleep more, you can lose weight. According to the latest research from the University of Chicago, sleeping more than 1 hour a day is equivalent to eating less than one fried chicken leg
- The sign in function completes 03 "IVX low code sign in system production"
- Is it safe to open an account online? Online and other answers
- 华为上诉失败,被禁止在瑞典销售 5G 设备;苹果公司市值重获全球第一;Deno 完成 2100 万美元 A 轮融资|极客头条
- Musk was sued for $258billion in MLM claims. TSMC announced the 2nm process. The Chinese Academy of Sciences found that the lunar soil contained water in the form of hydroxyl. Today, more big news is
- Sorting of poor cattle (winter vacation daily question 40)
猜你喜欢

Lao Ye's blessing

Randla net: efficient semantic segmentation of large scale point clouds

亚马逊在中国的另一面

Background page production 01 production of IVX low code sign in system

谷歌创始人布林二婚破裂:被曝1月已提出与华裔妻子离婚,目前身家6314亿美元...

The programmer reality show is coming again! Hulan, as the host, carried the lamp to fill the knowledge. The SSS boss had a bachelor's degree in pharmacy. Zhu Jun and Zhang Min from Tsinghua joined th

协作+安全+存储,云盒子助力深圳爱德泰重构数据中心

Perfect shuffle problem

What is an SSL certificate and what are the benefits of having an SSL certificate?

Two common OEE monitoring methods for equipment utilization
随机推荐
股票在网上开户安全吗?在线等答案
Perfect shuffle problem
DateTimeFormat放到@RequestBody下是无效的
Sleep more, you can lose weight. According to the latest research from the University of Chicago, sleeping more than 1 hour a day is equivalent to eating less than one fried chicken leg
Apple's legendary design team disbanded after jobs refused to obey cook
Lu Qi invests in quantum computing for the first time
Preparedstatement principle of preventing SQL injection
Maintenant, les oreilles vont entrer dans le métacosme.
Is it safe to open an account on your mobile phone?
Two common OEE monitoring methods for equipment utilization
Wuenda, the new course of machine learning is coming again! Free auditing, Xiaobai friendly
可能是拿反了的原因
Three key explanations of overseas e-commerce operation in 2022
程序员真人秀又来了!呼兰当主持挑灯狂补知识,SSS大佬本科竟是药学,清华朱军张敏等加入导师团...
Tencent Open Source Project "Yinglong" est devenu un projet Apache de haut niveau: l'ancien Service à long terme Wechat payment, peut maintenir un million de milliards de niveaux de traitement de flux
A new generation of cascadable Ethernet Remote i/o data acquisition module
中国天眼发现地外文明可疑信号,马斯克称星舰7月开始轨道试飞,网信办:APP不得强制要求用户同意处理个人信息,今日更多大新闻在此...
Redis related-02
Create SQLite table with shell script and add SQL statement -- General
Randla net: efficient semantic segmentation of large scale point clouds