当前位置:网站首页>【Rust投稿】从零实现消息中间件(6)-CLIENT

【Rust投稿】从零实现消息中间件(6)-CLIENT

2022-06-25 03:33:00 51CTO

功能设计

client实现功能相对比较单一,就是能够向服务器pub消息,然后就会说订阅消息,订阅的主题收到消息以后能够得到通知.因此总结起来就是下面三个功能:

  1. 提供pub接口
  2. 提供sub接口
  3. 处理sub后收到的消息

数据结构定义

提供给用户的接口是上面的三个,
为了实现这三个接口,client一定要有的就是
writer以及handler. 而sid则是因为一个client可以有多个sub,每一个sub要有唯一的id,主要是编号用. stop则是为了client正常关闭使用.

      
      
#[derive(Debug)]
pub struct Client {
addr: String,
writer: Arc < Mutex <WriteHalf < TcpStream >>>,
pub stop: oneshot::Sender < () >,
sid: u64,
handler: Arc < Mutex <HashMap < String,
mpsc::UnboundedSender <Vec < u8 >>>>>,
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.


接口-connect

connect的功能非常直白就是创建和服务器的连接,同时后台会启动一个任务来处理tcp连接,主要是接收消息.

      
      
pub async fn connect(addr: &str) -> std::io::Result < Client > {}
  • 1.


接口-pub_message

向服务器发布一条pub消息

      
      
pub async fn pub_message( &mut self,
subject: &str,
msg: &[u8])
-> std::io::Result < () > {}
  • 1.
  • 2.
  • 3.
  • 4.


接口-sub_message

向服务器发布一条sub消息,然后等待服务器推送相关消息.
需要说明的是这里的参数
subjectqueue完全没有必要使用String,&str即可. 这应该是rust的一个bug,在1.41和nightly 1.43都是编译不过去的.所以退而求其次,使用了String.

      
      
//sub消息格式为SUB subject {queue} {sid}\r\n
pub async fn sub_message(
&mut self,
subject: String,
queue: Option < String >,
handler: MessageHandler,
) -> std::io::Result < () > {}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.


receive_task

receive_task主要是做消息的接收,解析,以及将消息派发给合适的handler.
这个其实是本模块最复杂的地方,总体上比较直观.
主要有以下两点

  1. 使用futures::select这个宏来辅助实现同时监控多个future
  2. TcpStream如果read到size为0,说明连接已经关闭,无需继续
      
      
async fn receive_task(
mut reader: ReadHalf < TcpStream >,
stop: oneshot::Receiver < () >,
handler: Arc < Mutex <HashMap < String,
mpsc::UnboundedSender <Vec < u8 >>>>>,
writer: Arc < Mutex <WriteHalf < TcpStream >>>,
)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.


API的使用

pub

      
      
c.pub_message("test", format!("hello{}", i).as_bytes())
.await?;
  • 1.
  • 2.


sub

      
      
c.sub_message(
"test".into(),
None,
Box::new(move |msg| {
println!("recevied:{}", unsafe { std::str::from_utf8_unchecked(msg) });
Ok(())
}),
)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.


代码实现

      
      
type MessageHandler = Box < dyn FnMut(&[u8]) - > std::result::Result < (), () > + Sync + Send>;
//#[derive(Debug)]
pub struct Client {
addr: String,
writer: Arc < Mutex <WriteHalf < TcpStream >>>,
pub stop: Option < oneshot::Sender <()>>,
sid: u64,
handler: Arc < Mutex <HashMap < String, MessageHandler >>>,
}

impl Client {
//1. 建立到服务器的连接
//2. 启动后台任务
pub async fn connect(addr: &str) -> std::io::Result < Client > {
let conn = TcpStream::connect(addr).await?;
let (reader, writer) = tokio::io::split(conn);
let (tx, rx) = tokio::sync::oneshot::channel();
let c = Client {
addr: addr.into(),
writer: Arc::new(Mutex::new(writer)),
stop: Some(tx),
sid: 0,
handler: Arc::new(Default::default()),
};
let handler = c.handler.clone();
let writer = c.writer.clone();
/*
tokio::spawn 可以认为和go语言中的
go func(){}()
*/
tokio::spawn(async move {
Self::receive_task(reader, rx, handler, writer).await;
});
Ok(c)
}
/*
从服务器接收pub消息
然后推送给相关的订阅方。
*/
async fn receive_task(
mut reader: ReadHalf < TcpStream >,
stop: oneshot::Receiver < () >,
handler: Arc < Mutex <HashMap < String, MessageHandler >>>,
writer: Arc < Mutex <WriteHalf < TcpStream >>>,
) {
let mut buf = [0 as u8; 512];
let mut parser = Parser::new();
use futures::*;
let mut stop = stop.fuse();
loop {
select! {
_=stop=>{
println!("client closed");
return;
}
r = reader.read( &mut buf[..]).fuse()=>{
let n = {
match r {
Err(e) => {
println!("read err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(n) => n,
}
};
if n == 0 {
//EOF,说明对方关闭了连接
return;
}
let mut buf2 = &buf[..n];
loop {
let r = parser.parse(buf2);
let (r, n) = match r {
Err(e) => {
println!("parse err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(r) => r,
};
// println!("receive msg {:?}", r);
match r {
ParseResult::NoMsg => {
break;
}
ParseResult::MsgArg(msg) => {
Self::process_message(msg, &handler).await;
parser.clear_msg_buf();
}
}
//缓冲区处理完毕
if n == buf.len() {
break;
}
buf2 = &buf2[n..]
}
}
}
}
}
/*
根据消息的subject,找到订阅方,
然后推送给他们
*/
pub async fn process_message(
msg: MsgArg < '_>,
handler: &Arc<Mutex<HashMap<String, MessageHandler>>>,
) {
// println!("broadcast msg {}", msg.subject);
let mut handler = handler.lock().await;
let h = handler.get_mut(msg.subject);
if let Some(h) = h {
let _ = h(msg.msg);
}
}
//pub消息格式为PUB subject size\r\n{message}
pub async fn pub_message(&self, subject: &str, msg: &[u8]) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
let m = format!("PUB {} {}\r\n", subject, msg.len());
let _ = writer.write_all(m.as_bytes()).await;
let _ = writer.write_all(msg).await;
writer.write_all("\r\n".as_bytes()).await
}

//sub消息格式为SUB subject {queue} {sid}\r\n
//可能由于rustc的bug,导致如果subject是&str,则会报错E0700,暂时使用String来替代
pub async fn sub_message(
&mut self,
subject: String,
queue: Option<String>,
handler: MessageHandler,
) -> std::io::Result<()> {
self.sid += 1;
let mut writer = self.writer.lock().await;
let m = if let Some(queue) = queue {
format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid)
} else {
format!("SUB {} {}\r\n", subject.as_str(), self.sid)
};
self.handler.lock().await.insert(subject, handler);
writer.write_all(m.as_bytes()).await
}
pub fn close(&mut self) {
if let Some(stop) = self.stop.take() {
if let Err(e) = stop.send(()) {
println!("stop err {:?}", e);
}
}
}
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.


其他

相关代码都在我的github rnats 欢迎围观

 ​https://github.com/nkbai/learnrustbynats​

原网站

版权声明
本文为[51CTO]所创,转载请带上原文链接,感谢
https://blog.51cto.com/u_15683898/5417590

随机推荐