当前位置:网站首页>【Rust投稿】从零实现消息中间件(6)-CLIENT
【Rust投稿】从零实现消息中间件(6)-CLIENT
2022-06-25 03:33:00 【51CTO】
功能设计
client实现功能相对比较单一,就是能够向服务器pub消息,然后就会说订阅消息,订阅的主题收到消息以后能够得到通知.因此总结起来就是下面三个功能:
- 提供pub接口
- 提供sub接口
- 处理sub后收到的消息
数据结构定义
提供给用户的接口是上面的三个,
为了实现这三个接口,client一定要有的就是writer
以及handler
. 而sid则是因为一个client可以有多个sub,每一个sub要有唯一的id,主要是编号用. stop
则是为了client正常关闭使用.
接口-connect
connect的功能非常直白就是创建和服务器的连接,同时后台会启动一个任务来处理tcp连接,主要是接收消息.
接口-pub_message
向服务器发布一条pub消息
接口-sub_message
向服务器发布一条sub消息,然后等待服务器推送相关消息.
需要说明的是这里的参数subject
和queue
完全没有必要使用String,&str即可. 这应该是rust的一个bug,在1.41和nightly 1.43都是编译不过去的.所以退而求其次,使用了String.
receive_task
receive_task
主要是做消息的接收,解析,以及将消息派发给合适的handler.
这个其实是本模块最复杂的地方,总体上比较直观.
主要有以下两点
- 使用futures::select这个宏来辅助实现同时监控多个future
- TcpStream如果read到size为0,说明连接已经关闭,无需继续
API的使用
pub
sub
代码实现
type MessageHandler = Box < dyn FnMut(&[u8]) - > std::result::Result < (), () > + Sync + Send>;
//#[derive(Debug)]
pub struct Client {
addr: String,
writer: Arc < Mutex <WriteHalf < TcpStream >>>,
pub stop: Option < oneshot::Sender <()>>,
sid: u64,
handler: Arc < Mutex <HashMap < String, MessageHandler >>>,
}
impl Client {
//1. 建立到服务器的连接
//2. 启动后台任务
pub async fn connect(addr: &str) -> std::io::Result < Client > {
let conn = TcpStream::connect(addr).await?;
let (reader, writer) = tokio::io::split(conn);
let (tx, rx) = tokio::sync::oneshot::channel();
let c = Client {
addr: addr.into(),
writer: Arc::new(Mutex::new(writer)),
stop: Some(tx),
sid: 0,
handler: Arc::new(Default::default()),
};
let handler = c.handler.clone();
let writer = c.writer.clone();
/*
tokio::spawn 可以认为和go语言中的
go func(){}()
*/
tokio::spawn(async move {
Self::receive_task(reader, rx, handler, writer).await;
});
Ok(c)
}
/*
从服务器接收pub消息
然后推送给相关的订阅方。
*/
async fn receive_task(
mut reader: ReadHalf < TcpStream >,
stop: oneshot::Receiver < () >,
handler: Arc < Mutex <HashMap < String, MessageHandler >>>,
writer: Arc < Mutex <WriteHalf < TcpStream >>>,
) {
let mut buf = [0 as u8; 512];
let mut parser = Parser::new();
use futures::*;
let mut stop = stop.fuse();
loop {
select! {
_=stop=>{
println!("client closed");
return;
}
r = reader.read( &mut buf[..]).fuse()=>{
let n = {
match r {
Err(e) => {
println!("read err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(n) => n,
}
};
if n == 0 {
//EOF,说明对方关闭了连接
return;
}
let mut buf2 = &buf[..n];
loop {
let r = parser.parse(buf2);
let (r, n) = match r {
Err(e) => {
println!("parse err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(r) => r,
};
// println!("receive msg {:?}", r);
match r {
ParseResult::NoMsg => {
break;
}
ParseResult::MsgArg(msg) => {
Self::process_message(msg, &handler).await;
parser.clear_msg_buf();
}
}
//缓冲区处理完毕
if n == buf.len() {
break;
}
buf2 = &buf2[n..]
}
}
}
}
}
/*
根据消息的subject,找到订阅方,
然后推送给他们
*/
pub async fn process_message(
msg: MsgArg < '_>,
handler: &Arc<Mutex<HashMap<String, MessageHandler>>>,
) {
// println!("broadcast msg {}", msg.subject);
let mut handler = handler.lock().await;
let h = handler.get_mut(msg.subject);
if let Some(h) = h {
let _ = h(msg.msg);
}
}
//pub消息格式为PUB subject size\r\n{message}
pub async fn pub_message(&self, subject: &str, msg: &[u8]) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
let m = format!("PUB {} {}\r\n", subject, msg.len());
let _ = writer.write_all(m.as_bytes()).await;
let _ = writer.write_all(msg).await;
writer.write_all("\r\n".as_bytes()).await
}
//sub消息格式为SUB subject {queue} {sid}\r\n
//可能由于rustc的bug,导致如果subject是&str,则会报错E0700,暂时使用String来替代
pub async fn sub_message(
&mut self,
subject: String,
queue: Option<String>,
handler: MessageHandler,
) -> std::io::Result<()> {
self.sid += 1;
let mut writer = self.writer.lock().await;
let m = if let Some(queue) = queue {
format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid)
} else {
format!("SUB {} {}\r\n", subject.as_str(), self.sid)
};
self.handler.lock().await.insert(subject, handler);
writer.write_all(m.as_bytes()).await
}
pub fn close(&mut self) {
if let Some(stop) = self.stop.take() {
if let Err(e) = stop.send(()) {
println!("stop err {:?}", e);
}
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
- 106.
- 107.
- 108.
- 109.
- 110.
- 111.
- 112.
- 113.
- 114.
- 115.
- 116.
- 117.
- 118.
- 119.
- 120.
- 121.
- 122.
- 123.
- 124.
- 125.
- 126.
- 127.
- 128.
- 129.
- 130.
- 131.
- 132.
- 133.
- 134.
- 135.
- 136.
- 137.
- 138.
- 139.
- 140.
- 141.
- 142.
- 143.
- 144.
- 145.
- 146.
- 147.
- 148.
- 149.
- 150.
- 151.
其他
相关代码都在我的github rnats 欢迎围观
边栏推荐
- Internet Explorer died, and netizens started to build a true tombstone
- Two common OEE monitoring methods for equipment utilization
- Musk was sued for $258billion in MLM claims. TSMC announced the 2nm process. The Chinese Academy of Sciences found that the lunar soil contained water in the form of hydroxyl. Today, more big news is
- 北大换新校长!中国科学院院士龚旗煌接任,15岁考上北大物理系
- 用CPU方案打破内存墙?学PayPal堆傲腾扩容量,漏查欺诈交易量可降至1/30
- TensorFlow,危!抛弃者正是谷歌自己
- Is it safe to open an account with flush securities?
- Insurance app aging service evaluation analysis 2022 issue 06
- 浏览器下载的文件属性里都有保护,如何去掉
- 在线股票开户安全吗?
猜你喜欢
ICML 2022 | ByteDance AI Lab proposes a multimodal model: x-vlm, learning multi granularity alignment of vision and language
[FPGA] serial port controls temperature acquisition by command
孙武玩《魔兽》?有图有真相
Dr. Sun Jian was commemorated at the CVPR conference. The best student thesis was awarded to Tongji Ali. Lifeifei won the huangxutao Memorial Award
Tencent Open Source Project "Yinglong" est devenu un projet Apache de haut niveau: l'ancien Service à long terme Wechat payment, peut maintenir un million de milliards de niveaux de traitement de flux
Peking University has a new president! Gongqihuang, academician of the Chinese Academy of Sciences, took over and was admitted to the Physics Department of Peking University at the age of 15
Single case of hungry and lazy mode
Copilot免费时代结束!正式版67元/月,学生党和热门开源项目维护者可白嫖
How to play well in the PMP Exam?
大咖说*计算讲谈社|如何提出关键问题?
随机推荐
MySQL installation tutorial
TC object structure and abbreviation
站在风暴中心:如何给飞奔中的腾讯更换引擎
在华泰证券上面开股票账户好不好,安不安全?
Li Kou daily question - day 26 -506 Relative rank
Xiaomi routing R4A Gigabit version installation feed+openwrt tutorial (the full script does not need to be hard modified)
Void* pointer
ACM. Hj70 matrix multiplication calculation amount estimation ●●
Wechat development related
多睡觉,能减肥,芝加哥大学最新研究:每天多睡1小时,等于少吃一根炸鸡腿...
西电AI专业排名超清北,南大蝉联全国第一 | 2022软科中国大学专业排名
Huawei failed to appeal and was prohibited from selling 5g equipment in Sweden; Apple regained the first place in the world in terms of market value; DeNO completes round a financing of USD 21million
Apple's legendary design team disbanded after jobs refused to obey cook
MCN institutions are blooming everywhere: bloggers and authors should sign contracts carefully, and the industry is very deep
IE寿终正寝,网友们搞起了真·墓碑……
The era of copilot free is over! The official version is 67 yuan / month, and the student party and the defenders of popular open source projects can prostitute for nothing
How does the administrator prohibit another person from kicking himself?
Demonstration of combination of dream CAD cloud map and GIS
The programmer reality show is coming again! Hulan, as the host, carried the lamp to fill the knowledge. The SSS boss had a bachelor's degree in pharmacy. Zhu Jun and Zhang Min from Tsinghua joined th
CVPR大会现场纪念孙剑博士,最佳学生论文授予同济阿里,李飞飞获黄煦涛纪念奖...