Summary of Rust high-concurrency programming
2022-06-24 03:32:00 【beyondma】
The concept of Serverless has caught fire. The industry no longer debates whether to adopt Serverless; it shouts the slogan of "Serverless First" and embraces it. Having no servers is not the essence of Serverless; being able to work efficiently without having to care about servers is the core reason Serverless wins. Traffic in the Internet era surges and ebbs, and even technology giants have been knocked over by traffic spikes. After Bilibili's crash a few months ago I wrote an article, roughly "Bilibili's Front End Crashed, Back End Don't Panic", to analyze what happened. Serverless, with its automatic elasticity and rapid scale-out, can absorb this kind of shock calmly, which is why the technology is all the rage.
Behind the Serverless buzz, Rust seems to have firmly taken center stage. In fact there are many patterns and idioms worth summarizing under the topic of high concurrency, especially around a dedicated framework like Tokio, which is a great help for writing high-performance programs. This article therefore supplements and summarizes the relevant Tokio knowledge points.
What exactly is a Future
Simply put, a Future is not a value but a type whose value can only be obtained at some point in the future. A Future must implement the std::future::Future trait from the Rust standard library, and its associated type Output is the value the Future produces once it completes. In Rust, the executor drives a Future forward by calling Future::poll. A Future is essentially a state machine, and Futures can be nested. Consider the example below: in the main function we instantiate a MainFuture and call .await on it; besides migrating between its own states, MainFuture also polls a nested Delay future.
MainFuture starts in the State0 state. When the scheduler calls poll, MainFuture tries to advance its state as far as it can. If the future completes, it returns Poll::Ready; if MainFuture is not finished because the Delay future it waits on has not reached the Ready state, it returns Poll::Pending. On receiving Pending, the scheduler puts MainFuture back into the queue of tasks to be scheduled and calls poll again later to push the Future forward. The code is as follows:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Not ready yet: ask to be polled again right away.
            // This busy-wakes the task; the waker-based fix comes later.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
enum MainFuture {
    // Initialized, never polled
    State0,
    // Waiting on the nested Delay
    State1(Delay),
    // The future has completed
    Terminated,
}

impl Future for MainFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        use MainFuture::*;

        loop {
            match *self {
                State0 => {
                    let when = Instant::now() + Duration::from_millis(1);
                    let future = Delay { when };
                    println!("init status");
                    // Advance the state machine: start waiting on Delay.
                    *self = State1(future);
                }
                State1(ref mut my_future) => {
                    match Pin::new(my_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            println!("delay finished this future is ready");
                            *self = Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            println!("not ready");
                            return Poll::Pending;
                        }
                    }
                }
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}
#[tokio::main]
async fn main() {
    let main_future = MainFuture::State0;
    main_future.await;
}

Of course, this Future implementation has an obvious problem. The output shows that the executor performs many poll operations while it still has to wait; ideally, poll should only be called when the Future can actually make progress. Constantly polling like this degenerates into inefficient busy-waiting.
The fix lies in the Context parameter of the poll function. The Context carries the Future's waker(); by calling the waker we can signal the executor that this task should be polled. When the Future's state advances, it calls wake to notify the executor. That is the right approach, so part of the Delay code has to change (the else branch hands its waker to a timer thread instead of waking itself immediately):
// Inside the else branch of Delay::poll (requires `use std::thread;`).
let waker = cx.waker().clone();
let when = self.when;

// Spawn a timer thread that calls wake() once the deadline has passed.
// (A production implementation would not spawn a thread on every poll.)
thread::spawn(move || {
    let now = Instant::now();
    if now < when {
        thread::sleep(when - now);
    }
    waker.wake();
});

No matter which high-concurrency framework you look at, it is in essence a scheduler built on this task/poll mechanism, and the essential job of poll is to monitor the completion status of the tasks upstream in the chain.
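To make the task/poll mechanism concrete, here is a minimal executor sketch in the spirit of the mini-tokio example from the Tokio tutorial. It is an illustration, not Tokio's real scheduler: it assumes the futures crate for the ArcWake helper, and the names Task and MiniTokio are local to this sketch. A woken task re-queues itself on a channel, and the executor polls only the tasks that appear on that channel.

use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::task::Context;
use futures::task::{self, ArcWake};

// One spawned future plus a handle back to the executor's queue.
struct Task {
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
    queue: Mutex<Sender<Arc<Task>>>,
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Waking a task just pushes it back onto the executor's queue.
        let _ = arc_self.queue.lock().unwrap().send(arc_self.clone());
    }
}

struct MiniTokio {
    scheduled: Receiver<Arc<Task>>,
    sender: Sender<Arc<Task>>,
}

impl MiniTokio {
    fn new() -> MiniTokio {
        let (sender, scheduled) = channel();
        MiniTokio { scheduled, sender }
    }

    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let task = Arc::new(Task {
            future: Mutex::new(Box::pin(future)),
            queue: Mutex::new(self.sender.clone()),
        });
        // Queue the task once so it gets an initial poll.
        let _ = self.sender.send(task);
    }

    fn run(&self) {
        // Only woken tasks ever show up here, so the executor never
        // busy-polls a future that cannot make progress. (This simple
        // loop runs forever because the executor keeps a Sender alive.)
        while let Ok(task) = self.scheduled.recv() {
            let waker = task::waker(task.clone());
            let mut cx = Context::from_waker(&waker);
            let mut future = task.future.lock().unwrap();
            let _ = future.as_mut().poll(&mut cx);
        }
    }
}

Spawning the MainFuture from earlier onto this executor and calling run() would drive it exactly as described above: the Delay timer thread calls waker.wake(), the task lands back on the queue, and only then is it polled again.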
Used well, the poll mechanism avoids the kind of event loop that has to traverse the whole event queue periodically: the essence of poll is that a transition to the ready state notifies the handler registered for the event. With a poll-based framework such as Tokio, application programmers do not need to care about the message plumbing at all; they only need methods like and_then and spawn to build the task chain and let the system do the work.
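As a small illustration of that point, here is a hedged sketch of the classic Tokio pattern: one task per connection via tokio::spawn, with no hand-written event loop. The address 127.0.0.1:6142 is arbitrary and chosen only for the example.

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:6142").await?;

    loop {
        let (mut socket, _) = listener.accept().await?;

        // Each connection becomes an independent task; the scheduler only
        // polls it when its socket is ready, thanks to the waker mechanism.
        tokio::spawn(async move {
            let mut buf = [0u8; 1024];
            loop {
                match socket.read(&mut buf).await {
                    Ok(0) => return, // peer closed the connection
                    Ok(n) => {
                        // Echo the bytes back; give up on write errors.
                        if socket.write_all(&buf[..n]).await.is_err() {
                            return;
                        }
                    }
                    Err(_) => return,
                }
            }
        });
    }
}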
Implementing framing
A frame is the smallest unit of data transmission; bytes below frame granularity are meaningless to the application, and incomplete frames must be filtered out at the framing layer. The read_frame method waits until an entire frame has been received before returning. A single call to TcpStream::read() can return an arbitrary amount of data: it may contain a whole frame, a partial frame, or several frames. If a partial frame is received, the data is buffered and more data is read from the socket. If multiple frames are received, the first frame is returned and the remaining data is buffered until the next call to read_frame. To make this work, Connection needs a read-buffer field: data is read from the socket into the read buffer, and when a frame is parsed, the corresponding data is removed from the buffer. We will use BytesMut as the buffer type.
use bytes::BytesMut;
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: BytesMut,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            // Allocate the buffer with 4kb of capacity.
            buffer: BytesMut::with_capacity(4096),
        }
    }
}

The read_frame function tries to parse a frame. If the buffer holds enough data, the frame is returned to the caller of read_frame(); otherwise it tries to read more data from the socket into the buffer and then calls parse_frame() again (a sketch of parse_frame itself is shown after read_frame below), and this time parsing may succeed if enough data has arrived. When reading from the stream, a return value of 0 means the peer is no longer sending data. If the read buffer still contains data at that point, a partial frame has been received and the connection is being terminated abruptly; this is an error condition, and an Err is returned.
use mini_redis::{Frame, Result};
use tokio::io::AsyncReadExt;

pub async fn read_frame(&mut self) -> Result<Option<Frame>> {
    loop {
        // Try to parse a frame from the buffered data first.
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // Not enough buffered data; read more from the socket.
        // `read_buf` appends to the BytesMut buffer and returns the
        // number of bytes read; 0 means the peer closed the connection.
        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            if self.buffer.is_empty() {
                // Clean shutdown: no partial frame left behind.
                return Ok(None);
            } else {
                // The peer closed the socket in the middle of a frame.
                return Err("connection reset by peer".into());
            }
        }
    }
}
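For completeness, here is roughly how parse_frame can be written, following the framing chapter of the Tokio mini-redis tutorial: Frame::check scans the buffer to see whether a complete frame is present, Frame::parse actually builds it, and Buf::advance discards the consumed bytes. Treat this as a sketch; it belongs in the same impl Connection block as read_frame.

use bytes::Buf;
use mini_redis::frame::Error::Incomplete;
use std::io::Cursor;

fn parse_frame(&mut self) -> Result<Option<Frame>> {
    // Wrap the buffered bytes in a cursor so we can scan without consuming.
    let mut buf = Cursor::new(&self.buffer[..]);

    match Frame::check(&mut buf) {
        Ok(_) => {
            // The check advanced the cursor to the end of the frame,
            // so its position is the frame's length in bytes.
            let len = buf.position() as usize;

            // Rewind and parse the frame for real this time.
            buf.set_position(0);
            let frame = Frame::parse(&mut buf)?;

            // Drop the parsed bytes from the read buffer.
            self.buffer.advance(len);

            Ok(Some(frame))
        }
        // Not enough data buffered yet: not an error, just "no frame yet".
        Err(Incomplete) => Ok(None),
        // Anything else is a real protocol error.
        Err(e) => Err(e.into()),
    }
}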
Be careful with select

Another point worth noting is select. When you wait on more than one channel, any of them may complete first. The select! macro waits on all of its branches and takes the value from whichever channel returns first. Be careful: once select! has its first result, the other, unfinished branches are dropped, which cancels them. For example:
use tokio::sync::oneshot;

// Helper kept from the original example; it is not used below.
#[allow(dead_code)]
async fn some_operation() -> String {
    "hello beyondma".to_string()
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    tokio::spawn(async {
        let _ = tx1.send("hello beyondma");
    });

    tokio::spawn(async {
        let _ = tx2.send("hi beyondma");
    });

    tokio::select! {
        val = rx1 => {
            println!("rx1 completed first with {:?}", val);
        }
        val = rx2 => {
            println!("rx2 completed first with {:?}", val);
        }
    }
}

Running the code above prints either a line containing "hello beyondma" or a line containing "hi beyondma", never both: as soon as one branch completes, the other is cancelled.
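Here is a minimal sketch of that cancellation behaviour using only Tokio's timer (the durations are arbitrary): the slower branch is dropped as soon as the faster one finishes, so its println! never runs.

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    tokio::select! {
        _ = sleep(Duration::from_millis(10)) => {
            println!("the 10ms timer won");
        }
        _ = async {
            sleep(Duration::from_secs(1)).await;
            println!("this line never runs");
        } => {
            println!("the 1s branch won");
        }
    }
    // The losing future was dropped here, which is how select! cancels
    // the work that did not finish first.
}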
To explain how select works, we can design a MySelect future by hand. When MySelect is polled, its first branch is polled first; if that branch is ready, its value is used and MySelect completes. Once MySelect.await receives Ready, the whole future is dropped. The code is as follows:
use tokio::sync::oneshot;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct MySelect {
    rx1: oneshot::Receiver<&'static str>,
    rx2: oneshot::Receiver<&'static str>,
}

impl Future for MySelect {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Poll the first branch; if it is ready, MySelect completes
        // and the second receiver is simply dropped (cancelled).
        if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) {
            println!("rx1 completed first with {:?}", val);
            return Poll::Ready(());
        }

        if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) {
            println!("rx2 completed first with {:?}", val);
            return Poll::Ready(());
        }

        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let (tx1, rx1) = oneshot::channel();
    let (tx2, rx2) = oneshot::channel();

    // use tx1 and tx2
    tokio::spawn(async {
        let _ = tx1.send("hello beyondma");
    });

    tokio::spawn(async {
        let _ = tx2.send("hi beyondma");
    });

    MySelect {
        rx1,
        rx2,
    }.await;
}

Rust high-concurrency summary
Rust is a new language that has risen in recent years along with Serverless. On the surface it looks like C: there is no JVM-style virtual machine and no garbage collector. Look closer, though, and it is not C. Rust does not trust the programmer; it tries to have the compiler kill the bugs in a program at the build stage, before an executable is ever produced. Because there is no GC, Rust has built its own system of variable lifetimes and borrowing, and developers must constantly watch whether a variable's lifetime is valid.
Rust can also feel as hard as a Martian language: a channel sender has to be cloned before it is used in several places, a locked hash map has to be unwrapped before use, and many idioms are completely different from Java or Go. But it is precisely because of these strict rules that the goroutine problems we mentioned for Go cannot happen in the Rust world: the patterns that cause them do not pass Rust's lifetime checks, so they never compile in the first place. A minimal sketch of these idioms follows.
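To make those two complaints concrete, here is a hedged sketch of the pattern Rust pushes you toward (the names and the channel capacity are arbitrary): clone the sender for every producer task, and reach shared data only through Arc<Mutex<...>> plus lock().unwrap().

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Shared mutable state must be wrapped in Arc<Mutex<...>>; the compiler
    // refuses to let two tasks hold it any other way.
    let db: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));

    let (tx, mut rx) = mpsc::channel::<String>(32);

    for i in 0..2 {
        // Every producer task needs its own clone of the sender and the map;
        // moving the originals into the first task would fail the borrow check.
        let tx = tx.clone();
        let db = db.clone();
        tokio::spawn(async move {
            // The guard is dropped before the .await below, so the task stays Send.
            db.lock().unwrap().insert(format!("key{}", i), "value".to_string());
            let _ = tx.send(format!("task {} done", i)).await;
        });
    }
    drop(tx); // close the channel once the clones in the tasks are gone

    while let Some(msg) = rx.recv().await {
        println!("{}", msg);
    }
    println!("entries in the map: {}", db.lock().unwrap().len());
}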
So Rust is a bit like an exclusive martial-arts school: extremely hard to get into, but anyone who manages to graduate, that is, whose programs actually compile, is pretty much guaranteed to be a master. It is a language with a high entry threshold and a high ceiling.
At present the most representative high-concurrency programming framework for Rust is Tokio. The Future examples at the beginning of this article are written against Tokio, so there is no need to repeat them here.
According to the official documentation, each Tokio task takes only about 64 bytes of memory, which is orders of magnitude more efficient than naively spawning a thread per network request. With the help of such a high-concurrency framework, developers can squeeze the hardware's performance close to its limit.
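As a rough illustration of how cheap tasks are (the numbers are arbitrary and this is not a benchmark), spawning ten thousand Tokio tasks is an ordinary thing to do, whereas ten thousand OS threads would strain most machines:

#[tokio::main]
async fn main() {
    let mut handles = Vec::with_capacity(10_000);

    // Each spawn is a small heap allocation, not an OS thread with its own stack.
    for i in 0..10_000u64 {
        handles.push(tokio::spawn(async move { i * 2 }));
    }

    let mut sum: u64 = 0;
    for handle in handles {
        sum += handle.await.unwrap();
    }
    println!("sum of doubled values: {}", sum);
}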