[Rust contribution] Implementing message-oriented middleware from scratch (6): the client

2022-06-25 03:53:00 51CTO

Functional design

The client's job is fairly simple: it sends PUB messages to the server, subscribes to subjects with SUB messages, and is notified whenever a message arrives on a subscribed subject. That boils down to three functions:

  1. Provide a pub interface
  2. Provide a sub interface
  3. Handle the messages received after a sub

Data structure definition

The three functions above are the interface exposed to users.
To implement them, the client needs a writer and a handler.
A single client can hold multiple subscriptions and each one has a unique id, so sid serves as the counter that numbers them. stop is used to shut the client down cleanly.

      
      
#[derive(Debug)]
pub struct Client {
    addr: String,
    writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    pub stop: oneshot::Sender<()>,
    sid: u64,
    handler: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<Vec<u8>>>>>,
}


Interface: connect

connect establishes a connection to the server and starts a background task that handles the TCP connection, mainly to receive messages.

      
      
pub async fn connect(addr: &str) -> std::io::Result<Client> {}


Interface: pub_message

Sends a PUB message to the server.

      
      
pub async fn pub_message(
    &mut self,
    subject: &str,
    msg: &[u8],
) -> std::io::Result<()> {}
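The bytes that pub_message puts on the wire can be sketched without any networking. This is only an illustration under assumptions: the helper name `encode_pub` is hypothetical and not part of the original client, and the layout (`PUB subject size\r\n`, then the payload, then a trailing `\r\n`) is taken from what the implementation in this article writes.

```rust
/// Hypothetical helper (not in the original client): builds the bytes of one
/// PUB frame, `PUB <subject> <size>\r\n<payload>\r\n`.
fn encode_pub(subject: &str, msg: &[u8]) -> Vec<u8> {
    let header = format!("PUB {} {}\r\n", subject, msg.len());
    let mut frame = Vec::with_capacity(header.len() + msg.len() + 2);
    frame.extend_from_slice(header.as_bytes());
    frame.extend_from_slice(msg);
    frame.extend_from_slice(b"\r\n");
    frame
}

fn main() {
    let frame = encode_pub("test", b"hello0");
    // Debug formatting makes the line endings visible.
    println!("{:?}", String::from_utf8_lossy(&frame));
}
```

Building the whole frame in memory first is one possible design: the client could then hand it to a single write call instead of three separate ones.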


Interface: sub_message

Sends a SUB message to the server, then waits for the server to push matching messages.
Note that the subject and queue parameters should not need to be String at all; &str ought to be enough. However, the &str version fails to compile on both Rust 1.41 and nightly 1.43, which looks like a rustc bug, so the code falls back to String.

      
      
// The SUB message format is: SUB subject {queue} {sid}\r\n
pub async fn sub_message(
    &mut self,
    subject: String,
    queue: Option<String>,
    handler: MessageHandler,
) -> std::io::Result<()> {}
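The SUB line format described above, with its optional queue, can be sketched in isolation. The helper name `encode_sub` and the queue name `workers` are hypothetical and exist only for this illustration; the two format strings mirror the ones used by sub_message in the implementation.

```rust
/// Hypothetical helper (not in the original client): builds one SUB line,
/// `SUB <subject> [queue] <sid>\r\n`. The queue part is omitted when absent.
fn encode_sub(subject: &str, queue: Option<&str>, sid: u64) -> String {
    match queue {
        Some(q) => format!("SUB {} {} {}\r\n", subject, q, sid),
        None => format!("SUB {} {}\r\n", subject, sid),
    }
}

fn main() {
    // Each subscription gets the next sid, as the client does with `self.sid += 1`.
    let mut sid = 0u64;
    for queue in [None, Some("workers")] {
        sid += 1;
        print!("{}", encode_sub("test", queue, sid));
    }
}
```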


receive_task

receive_task receives data, parses it, and dispatches each message to the matching handler.
It is the most complex part of this module, but it is still fairly intuitive overall.
There are two main points:

  1. The futures::select macro is used to wait on multiple futures at the same time.
  2. If a read on the TcpStream returns a size of 0, the connection has been closed and there is no need to continue.
      
      
async fn receive_task(
    mut reader: ReadHalf<TcpStream>,
    stop: oneshot::Receiver<()>,
    handler: Arc<Mutex<HashMap<String, mpsc::UnboundedSender<Vec<u8>>>>>,
    writer: Arc<Mutex<WriteHalf<TcpStream>>>,
)
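The second point, treating a read of size 0 as a closed connection, can be shown with a synchronous stand-in. This sketch assumes `std::io::Read` over an in-memory `Cursor` in place of the async `TcpStream`; the function name `drain` and the 4-byte buffer are hypothetical choices made to keep the example small. The zero-length check itself is the same one receive_task performs.

```rust
use std::io::{Cursor, Read};

/// Reads until `read` returns 0 (EOF), then reports the total number of bytes
/// and the number of non-empty reads. A synchronous stand-in for the async loop.
fn drain<R: Read>(mut reader: R) -> std::io::Result<(usize, usize)> {
    let mut buf = [0u8; 4];
    let (mut total, mut reads) = (0, 0);
    loop {
        let n = reader.read(&mut buf)?;
        if n == 0 {
            // EOF: the peer closed the connection, so stop reading.
            return Ok((total, reads));
        }
        total += n;
        reads += 1;
    }
}

fn main() {
    let fake_socket = Cursor::new(b"PUB test 5\r\nhello\r\n".to_vec());
    let (total, reads) = drain(fake_socket).expect("in-memory reads do not fail");
    println!("read {} bytes in {} chunks before EOF", total, reads);
}
```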


API usage

pub

      
      
c.pub_message("test", format!("hello{}", i).as_bytes())
    .await?;


sub

      
      
c.sub_message(
    "test".into(),
    None,
    Box::new(move |msg| {
        // A lossy conversion avoids undefined behavior on non-UTF-8 payloads.
        println!("received: {}", String::from_utf8_lossy(msg));
        Ok(())
    }),
)
.await?;
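How a registered handler ends up being called can be sketched with a plain map, leaving out the `Arc<Mutex<...>>` wrapper and the network. The `MessageHandler` alias has the same shape as the one in the implementation; the function name `dispatch` and its boolean return value are hypothetical additions for this illustration.

```rust
use std::collections::HashMap;

// Same shape as the `MessageHandler` alias used by the client.
type MessageHandler = Box<dyn FnMut(&[u8]) -> Result<(), ()> + Send + Sync>;

/// Hypothetical, lock-free stand-in for `process_message`: looks up the handler
/// registered for `subject` and calls it. Returns whether a handler was found.
fn dispatch(handlers: &mut HashMap<String, MessageHandler>, subject: &str, msg: &[u8]) -> bool {
    match handlers.get_mut(subject) {
        Some(h) => {
            // The handler's own result is ignored, as in the client.
            let _ = h(msg);
            true
        }
        None => false,
    }
}

fn main() {
    let mut handlers: HashMap<String, MessageHandler> = HashMap::new();
    handlers.insert(
        "test".to_string(),
        Box::new(|msg| {
            println!("received: {}", String::from_utf8_lossy(msg));
            Ok(())
        }),
    );
    dispatch(&mut handlers, "test", b"hello0");
    // No handler is registered for this subject, so the message is dropped.
    dispatch(&mut handlers, "other", b"ignored");
}
```

Because the map is keyed by subject, registering a second handler for the same subject replaces the first one.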


Code implementation

      
      
// Imports inferred from how the code below uses them (tokio with its async Mutex).
// Parser, ParseResult and MsgArg come from this project's own parser module.
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::sync::{oneshot, Mutex};

type MessageHandler = Box<dyn FnMut(&[u8]) -> std::result::Result<(), ()> + Sync + Send>;

// Debug cannot be derived here because MessageHandler is a boxed closure.
//#[derive(Debug)]
pub struct Client {
    addr: String,
    writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    pub stop: Option<oneshot::Sender<()>>,
    sid: u64,
    handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
}

impl Client {
    // 1. Establish a connection to the server
    // 2. Start the background task
    pub async fn connect(addr: &str) -> std::io::Result<Client> {
        let conn = TcpStream::connect(addr).await?;
        let (reader, writer) = tokio::io::split(conn);
        let (tx, rx) = tokio::sync::oneshot::channel();
        let c = Client {
            addr: addr.into(),
            writer: Arc::new(Mutex::new(writer)),
            stop: Some(tx),
            sid: 0,
            handler: Arc::new(Default::default()),
        };
        let handler = c.handler.clone();
        let writer = c.writer.clone();
        // tokio::spawn can be thought of as the counterpart of
        // `go func(){}()` in Go.
        tokio::spawn(async move {
            Self::receive_task(reader, rx, handler, writer).await;
        });
        Ok(c)
    }

    // Receives messages pushed by the server
    // and forwards them to the matching subscribers.
    async fn receive_task(
        mut reader: ReadHalf<TcpStream>,
        stop: oneshot::Receiver<()>,
        handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
        writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    ) {
        let mut buf = [0u8; 512];
        let mut parser = Parser::new();
        use futures::*;
        let mut stop = stop.fuse();
        loop {
            select! {
                _ = stop => {
                    println!("client closed");
                    return;
                }
                r = reader.read(&mut buf[..]).fuse() => {
                    let n = match r {
                        Err(e) => {
                            println!("read err {}", e);
                            let _ = writer.lock().await.shutdown().await;
                            return;
                        }
                        Ok(n) => n,
                    };
                    if n == 0 {
                        // EOF: the peer has closed the connection
                        return;
                    }
                    let mut buf2 = &buf[..n];
                    loop {
                        let (r, consumed) = match parser.parse(buf2) {
                            Err(e) => {
                                println!("parse err {}", e);
                                let _ = writer.lock().await.shutdown().await;
                                return;
                            }
                            Ok(r) => r,
                        };
                        // println!("receive msg {:?}", r);
                        match r {
                            ParseResult::NoMsg => {
                                break;
                            }
                            ParseResult::MsgArg(msg) => {
                                Self::process_message(msg, &handler).await;
                                parser.clear_msg_buf();
                            }
                        }
                        // Everything read so far has been consumed
                        // (compare against the remaining slice, not the whole buffer)
                        if consumed == buf2.len() {
                            break;
                        }
                        buf2 = &buf2[consumed..];
                    }
                }
            }
        }
    }

    // Finds the subscriber by the message's subject
    // and pushes the message to it.
    pub async fn process_message(
        msg: MsgArg<'_>,
        handler: &Arc<Mutex<HashMap<String, MessageHandler>>>,
    ) {
        // println!("broadcast msg {}", msg.subject);
        let mut handler = handler.lock().await;
        if let Some(h) = handler.get_mut(msg.subject) {
            let _ = h(msg.msg);
        }
    }

    // The PUB message format is: PUB subject size\r\n{message}\r\n
    pub async fn pub_message(&self, subject: &str, msg: &[u8]) -> std::io::Result<()> {
        let mut writer = self.writer.lock().await;
        let m = format!("PUB {} {}\r\n", subject, msg.len());
        // Propagate write errors instead of silently discarding them.
        writer.write_all(m.as_bytes()).await?;
        writer.write_all(msg).await?;
        writer.write_all(b"\r\n").await
    }

    // The SUB message format is: SUB subject {queue} {sid}\r\n
    // Probably because of a rustc bug, declaring subject as &str can fail
    // with error E0700, so String is used for now.
    pub async fn sub_message(
        &mut self,
        subject: String,
        queue: Option<String>,
        handler: MessageHandler,
    ) -> std::io::Result<()> {
        self.sid += 1;
        let mut writer = self.writer.lock().await;
        let m = if let Some(queue) = queue {
            format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid)
        } else {
            format!("SUB {} {}\r\n", subject.as_str(), self.sid)
        };
        self.handler.lock().await.insert(subject, handler);
        writer.write_all(m.as_bytes()).await
    }

    pub fn close(&mut self) {
        if let Some(stop) = self.stop.take() {
            if let Err(e) = stop.send(()) {
                println!("stop err {:?}", e);
            }
        }
    }
}


Other

The code is in my GitHub repository rnats. You are welcome to take a look.

https://github.com/nkbai/learnrustbynats

Copyright notice
This article was created by [51CTO]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/176/202206250332316059.html
