0
点赞
收藏
分享

微信扫一扫

【Rust投稿】从零实现消息中间件(6)-CLIENT

功能设计

client实现功能相对比较单一,就是能够向服务器pub消息,然后就会说订阅消息,订阅的主题收到消息以后能够得到通知.因此总结起来就是下面三个功能:

  1. 提供pub接口
  2. 提供sub接口
  3. 处理sub后收到的消息

数据结构定义

提供给用户的接口是上面的三个,
为了实现这三个接口,client一定要有的就是
writer以及handler. 而sid则是因为一个client可以有多个sub,每一个sub要有唯一的id,主要是编号用. stop则是为了client正常关闭使用.

#[derive(Debug)]
pub struct Client {
addr: String,
writer: Arc<Mutex<WriteHalf<TcpStream>>>,
pub stop: oneshot::Sender<()>,
sid: u64,
handler: Arc<Mutex<HashMap<String,
mpsc::UnboundedSender<Vec<u8>>>>>,
}


接口-connect

connect的功能非常直白就是创建和服务器的连接,同时后台会启动一个任务来处理tcp连接,主要是接收消息.

pub async fn connect(addr: &str) -> std::io::Result<Client> {}


接口-pub_message

向服务器发布一条pub消息

pub async fn pub_message(&mut self, 
subject: &str,
msg: &[u8])
-> std::io::Result<()> {}


接口-sub_message

向服务器发布一条sub消息,然后等待服务器推送相关消息.
需要说明的是这里的参数
subjectqueue完全没有必要使用String,&str即可. 这应该是rust的一个bug,在1.41和nightly 1.43都是编译不过去的.所以退而求其次,使用了String.

//sub消息格式为SUB subject {queue} {sid}\r\n
pub async fn sub_message(
&mut self,
subject: String,
queue: Option<String>,
handler: MessageHandler,
) -> std::io::Result<()> {}


receive_task

receive_task主要是做消息的接收,解析,以及将消息派发给合适的handler.
这个其实是本模块最复杂的地方,总体上比较直观.
主要有以下两点

  1. 使用futures::select这个宏来辅助实现同时监控多个future
  2. TcpStream如果read到size为0,说明连接已经关闭,无需继续
async fn receive_task(
mut reader: ReadHalf<TcpStream>,
stop: oneshot::Receiver<()>,
handler: Arc<Mutex<HashMap<String,
mpsc::UnboundedSender<Vec<u8>>>>>,
writer: Arc<Mutex<WriteHalf<TcpStream>>>,
)


API的使用

pub

c.pub_message("test", format!("hello{}", i).as_bytes())
.await?;


sub

c.sub_message(
"test".into(),
None,
Box::new(move |msg| {
println!("recevied:{}", unsafe { std::str::from_utf8_unchecked(msg) });
Ok(())
}),
)


代码实现

type MessageHandler = Box<dyn FnMut(&[u8]) -> std::result::Result<(), ()> + Sync + Send>;
//#[derive(Debug)]
pub struct Client {
addr: String,
writer: Arc<Mutex<WriteHalf<TcpStream>>>,
pub stop: Option<oneshot::Sender<()>>,
sid: u64,
handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
}

impl Client {
//1. 建立到服务器的连接
//2. 启动后台任务
pub async fn connect(addr: &str) -> std::io::Result<Client> {
let conn = TcpStream::connect(addr).await?;
let (reader, writer) = tokio::io::split(conn);
let (tx, rx) = tokio::sync::oneshot::channel();
let c = Client {
addr: addr.into(),
writer: Arc::new(Mutex::new(writer)),
stop: Some(tx),
sid: 0,
handler: Arc::new(Default::default()),
};
let handler = c.handler.clone();
let writer = c.writer.clone();
/*
tokio::spawn 可以认为和go语言中的
go func(){}()
*/
tokio::spawn(async move {
Self::receive_task(reader, rx, handler, writer).await;
});
Ok(c)
}
/*
从服务器接收pub消息
然后推送给相关的订阅方。
*/
async fn receive_task(
mut reader: ReadHalf<TcpStream>,
stop: oneshot::Receiver<()>,
handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
writer: Arc<Mutex<WriteHalf<TcpStream>>>,
) {
let mut buf = [0 as u8; 512];
let mut parser = Parser::new();
use futures::*;
let mut stop = stop.fuse();
loop {
select! {
_=stop=>{
println!("client closed");
return;
}
r = reader.read(&mut buf[..]).fuse()=>{
let n = {
match r {
Err(e) => {
println!("read err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(n) => n,
}
};
if n == 0 {
//EOF,说明对方关闭了连接
return;
}
let mut buf2 = &buf[..n];
loop {
let r = parser.parse(buf2);
let (r, n) = match r {
Err(e) => {
println!("parse err {}", e);
let _ = writer.lock().await.shutdown().await;
return;
}
Ok(r) => r,
};
// println!("receive msg {:?}", r);
match r {
ParseResult::NoMsg => {
break;
}
ParseResult::MsgArg(msg) => {
Self::process_message(msg, &handler).await;
parser.clear_msg_buf();
}
}
//缓冲区处理完毕
if n == buf.len() {
break;
}
buf2 = &buf2[n..]
}
}
}
}
}
/*
根据消息的subject,找到订阅方,
然后推送给他们
*/
pub async fn process_message(
msg: MsgArg<'_>,
handler: &Arc<Mutex<HashMap<String, MessageHandler>>>,
) {
// println!("broadcast msg {}", msg.subject);
let mut handler = handler.lock().await;
let h = handler.get_mut(msg.subject);
if let Some(h) = h {
let _ = h(msg.msg);
}
}
//pub消息格式为PUB subject size\r\n{message}
pub async fn pub_message(&self, subject: &str, msg: &[u8]) -> std::io::Result<()> {
let mut writer = self.writer.lock().await;
let m = format!("PUB {} {}\r\n", subject, msg.len());
let _ = writer.write_all(m.as_bytes()).await;
let _ = writer.write_all(msg).await;
writer.write_all("\r\n".as_bytes()).await
}

//sub消息格式为SUB subject {queue} {sid}\r\n
//可能由于rustc的bug,导致如果subject是&str,则会报错E0700,暂时使用String来替代
pub async fn sub_message(
&mut self,
subject: String,
queue: Option<String>,
handler: MessageHandler,
) -> std::io::Result<()> {
self.sid += 1;
let mut writer = self.writer.lock().await;
let m = if let Some(queue) = queue {
format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid)
} else {
format!("SUB {} {}\r\n", subject.as_str(), self.sid)
};
self.handler.lock().await.insert(subject, handler);
writer.write_all(m.as_bytes()).await
}
pub fn close(&mut self) {
if let Some(stop) = self.stop.take() {
if let Err(e) = stop.send(()) {
println!("stop err {:?}", e);
}
}
}
}


其他

相关代码都在我的github rnats 欢迎围观

​​https://github.com/nkbai/learnrustbynats​​

举报

相关推荐

0 条评论