当前位置:网站首页>【Rust投稿】从零实现消息中间件(6)-CLIENT
【Rust投稿】从零实现消息中间件(6)-CLIENT
2022-06-25 03:33:00 【51CTO】
功能设计
client实现功能相对比较单一,就是能够向服务器pub消息,然后就会说订阅消息,订阅的主题收到消息以后能够得到通知.因此总结起来就是下面三个功能:
- 提供pub接口
- 提供sub接口
- 处理sub后收到的消息
数据结构定义
提供给用户的接口是上面的三个,
为了实现这三个接口,client一定要有的就是writer以及handler. 而sid则是因为一个client可以有多个sub,每一个sub要有唯一的id,主要是编号用. stop则是为了client正常关闭使用.
接口-connect
connect的功能非常直白就是创建和服务器的连接,同时后台会启动一个任务来处理tcp连接,主要是接收消息.
接口-pub_message
向服务器发布一条pub消息
接口-sub_message
向服务器发布一条sub消息,然后等待服务器推送相关消息.
需要说明的是这里的参数subject和queue完全没有必要使用String,&str即可. 这应该是rust的一个bug,在1.41和nightly 1.43都是编译不过去的.所以退而求其次,使用了String.
receive_task
receive_task主要是做消息的接收,解析,以及将消息派发给合适的handler.
这个其实是本模块最复杂的地方,总体上比较直观.
主要有以下两点
- 使用futures::select这个宏来辅助实现同时监控多个future
- TcpStream如果read到size为0,说明连接已经关闭,无需继续
API的使用
pub
sub
代码实现
type MessageHandler = Box
<
dyn
FnMut(&[u8])
-
> std::result::Result
<
(),
()
> + Sync + Send>;
//#[derive(Debug)]
pub struct Client {
addr: String,
writer: Arc
<
Mutex
<WriteHalf
<
TcpStream
>>>,
pub stop: Option
<
oneshot::Sender
<()>>,
sid: u64,
handler: Arc
<
Mutex
<HashMap
<
String,
MessageHandler
>>>,
}
impl Client {
//1. 建立到服务器的连接
//2. 启动后台任务
pub async fn connect(addr:
&str) -> std::io::Result
<
Client
> {
let conn = TcpStream::connect(addr).await?;
let (reader, writer) = tokio::io::split(conn);
let (tx, rx) = tokio::sync::oneshot::channel();
let c = Client {
addr: addr.into(),
writer: Arc::new(Mutex::new(writer)),
stop: Some(tx),
sid: 0,
handler: Arc::new(Default::default()),
};
let handler = c.handler.clone();
let writer = c.writer.clone();
/*
tokio::spawn 可以认为和go语言中的
go func(){}()
*/
tokio::spawn(async move {
Self::receive_task(reader, rx, handler, writer).await;
});
Ok(c)
}
/*
从服务器接收pub消息
然后推送给相关的订阅方。
*/
async fn receive_task(
mut reader: ReadHalf
<
TcpStream
>,
stop: oneshot::Receiver
<
()
>,
handler: Arc
<
Mutex
<HashMap
<
String,
MessageHandler
>>>,
writer: Arc
<
Mutex
<WriteHalf
<
TcpStream
>>>,
) {
let mut buf = [0 as u8; 512];
let mut parser = Parser::new();
use futures::*;
let mut stop = stop.fuse();
loop {
select! {
_=stop=>{
println!("client closed");
return;
}
r = reader.read(
&mut buf[..]).fuse()=>{
let n = {
match r {
Err(e) => {
println!("read err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(n) => n,
}
};
if n == 0 {
//EOF,说明对方关闭了连接
return;
}
let mut buf2 =
&buf[..n];
loop {
let r = parser.parse(buf2);
let (r, n) = match r {
Err(e) => {
println!("parse err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(r) => r,
};
// println!("receive msg {:?}", r);
match r {
ParseResult::NoMsg => {
break;
}
ParseResult::MsgArg(msg) => {
Self::process_message(msg,
&handler).await;
parser.clear_msg_buf();
}
}
//缓冲区处理完毕
if n == buf.len() {
break;
}
buf2 =
&buf2[n..]
}
}
}
}
}
/*
根据消息的subject,找到订阅方,
然后推送给他们
*/
pub async fn process_message(
msg: MsgArg
<
'_>,
handler: &Arc<Mutex<HashMap<String, MessageHandler>>>,
) {
// println!("broadcast msg {}", msg.subject);
let mut handler = handler.lock().await;
let h = handler.get_mut(msg.subject);
if let Some(h) = h {
let _ = h(msg.msg);
}
}
//pub消息格式为PUB subject size\r\n{message}
pub async fn pub_message(&self, subject: &str, msg: &[u8]) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
let m = format!("PUB {} {}\r\n", subject, msg.len());
let _ = writer.write_all(m.as_bytes()).await;
let _ = writer.write_all(msg).await;
writer.write_all("\r\n".as_bytes()).await
}
//sub消息格式为SUB subject {queue} {sid}\r\n
//可能由于rustc的bug,导致如果subject是&str,则会报错E0700,暂时使用String来替代
pub async fn sub_message(
&mut self,
subject: String,
queue: Option<String>,
handler: MessageHandler,
) -> std::io::Result<()> {
self.sid += 1;
let mut writer = self.writer.lock().await;
let m = if let Some(queue) = queue {
format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid)
} else {
format!("SUB {} {}\r\n", subject.as_str(), self.sid)
};
self.handler.lock().await.insert(subject, handler);
writer.write_all(m.as_bytes()).await
}
pub fn close(&mut self) {
if let Some(stop) = self.stop.take() {
if let Err(e) = stop.send(()) {
println!("stop err {:?}", e);
}
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
- 106.
- 107.
- 108.
- 109.
- 110.
- 111.
- 112.
- 113.
- 114.
- 115.
- 116.
- 117.
- 118.
- 119.
- 120.
- 121.
- 122.
- 123.
- 124.
- 125.
- 126.
- 127.
- 128.
- 129.
- 130.
- 131.
- 132.
- 133.
- 134.
- 135.
- 136.
- 137.
- 138.
- 139.
- 140.
- 141.
- 142.
- 143.
- 144.
- 145.
- 146.
- 147.
- 148.
- 149.
- 150.
- 151.
其他
相关代码都在我的github rnats 欢迎围观
边栏推荐
- 网上开户股票安全吗?怎么开户呢?
- Zuckerberg's latest VR prototype is coming. It is necessary to confuse virtual reality with reality
- Huawei failed to appeal and was prohibited from selling 5g equipment in Sweden; Apple regained the first place in the world in terms of market value; DeNO completes round a financing of USD 21million
- 在Microsoft Exchange Server 2007中安装SSL证书的教程
- [FPGA] serial port controls temperature acquisition by command
- Li Kou daily question - day 26 -506 Relative rank
- Rebeco: using machine learning to predict stock crash risk
- Winxp kernel driver debugging
- The era of copilot free is over! Student party and defenders of popular open source projects can prostitute for nothing
- Amazon's other side in China
猜你喜欢

About sizeof() and strlen in array

ICML 2022 | 字节跳动 AI Lab 提出多模态模型:X-VLM,学习视觉和语言的多粒度对齐...

AI writes its own code to let agents evolve! The big model of openai has the flavor of "human thought"

2点睡10点起不算熬夜?除非你每天都能执行

Rebeco:使用机器学习预测股票崩盘风险

What is an SSL certificate and what are the benefits of having an SSL certificate?

北大换新校长!中国科学院院士龚旗煌接任,15岁考上北大物理系

AI自己写代码让智能体进化!OpenAI的大模型有“人类思想”那味了

马斯克被诉传销索赔2580亿美元,台积电公布2nm制程,中科院发现月壤中含有羟基形式的水,今日更多大新闻在此...

China's SkyEye found suspicious signals of extraterrestrial civilization. Musk said that the Starship began its orbital test flight in July. Netinfo office: app should not force users to agree to proc
随机推荐
The programmer reality show is coming again! Hulan, as the host, carried the lamp to fill the knowledge. The SSS boss had a bachelor's degree in pharmacy. Zhu Jun and Zhang Min from Tsinghua joined th
Svn deployment
浏览器下载的文件属性里都有保护,如何去掉
中国天眼发现地外文明可疑信号,马斯克称星舰7月开始轨道试飞,网信办:APP不得强制要求用户同意处理个人信息,今日更多大新闻在此...
在Microsoft Exchange Server 2007中安装SSL证书的教程
Is it safe to open an account on the compass? Is it reliable?
马斯克:推特要学习微信,让10亿人「活在上面」成为超级APP
Eggservice builds the basic service of wechat official account
西电AI专业排名超清北,南大蝉联全国第一 | 2022软科中国大学专业排名
Sleep more, you can lose weight. According to the latest research from the University of Chicago, sleeping more than 1 hour a day is equivalent to eating less than one fried chicken leg
跨境电商新手如何防止店铺关联?用什么工具好?
Two way combination of business and technology to build a bank data security management system
Self cultivation and learning encouragement
Sorting of poor cattle (winter vacation daily question 40)
程序员真人秀又来了!呼兰当主持挑灯狂补知识,SSS大佬本科竟是药学,清华朱军张敏等加入导师团...
How far is the memory computing integrated chip from popularization? Listen to what practitioners say | collision school x post friction intelligence
Li Kou daily question - day 26 -506 Relative rank
单例的饥饿、懒汉模式案例
PHP uses getid3 to obtain the duration of MP3, MP4, WAV and other media files
Dr. Sun Jian was commemorated at the CVPR conference. The best student thesis was awarded to Tongji Ali. Lifeifei won the huangxutao Memorial Award