导航
Golang RPC实现
- day01 我们实现了简单的服务端和客户端。
- 我们简单总结一下day01的模式。
- 服务端按顺序处理客户端过来的请求,按顺序响应客户端的请求。
- 客户端以同步的方式发送请求,不能并发发出请求。
- 那么我们day02干的事情就是,让客户端异步并发的发出请求(请求顺序变得随机),服务端依然是按请求顺序进行处理,处理完某一个请求就返回,可以不按请求的顺序响应数据,但是响应数据是要上锁的,否则会发生响应数据并发安全问题。
- 主要逻辑是修改了客户端的代码,服务端和day01没有变化
一、客户端异步并发多个请求
1、 客户端结构体
type Client struct {
cc codec.Codec
opt *Option
sending sync.Mutex
header codec.Header
mu sync.Mutex
seq uint64
pending map[uint64]*Call
closing bool
shutdown bool
}
2、 一个客户端,异步发送多个请求,使用call结构体代表客户端的每次请求
type Call struct {
Seq uint64
ServiceMethod string
Args interface{}
Reply interface{}
Error error
Done chan *Call
}
3、客户端并发多个请求
func main() {
log.SetFlags(0)
addr := make(chan string)
go startServer(addr)
client, _ := geerpc.Dial("tcp", <-addr)
defer func() { _ = client.Close() }()
time.Sleep(time.Second)
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
args := fmt.Sprintf("geerpc req %d", i)
var reply string
if err := client.Call("Foo.Sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
log.Println("reply:", reply)
}(i)
}
wg.Wait()
}
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10)
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}
call := &Call{
ServiceMethod: serviceMethod,
Args: args,
Reply: reply,
Done: done,
}
client.send(call)
return call
}
func (client *Client) send(call *Call) {
client.sending.Lock()
defer client.sending.Unlock()
seq, err := client.registerCall(call)
if err != nil {
call.Error = err
call.done()
return
}
client.header.ServiceMethod = call.ServiceMethod
client.header.Seq = seq
client.header.Error = ""
if err := client.cc.Write(&client.header, call.Args); err != nil {
call := client.removeCall(seq)
if call != nil {
call.Error = err
call.done()
}
}
}
4、客户端接收请求
func (client *Client) receive() {
var err error
for err == nil {
var h codec.Header
if err = client.cc.ReadHeader(&h); err != nil {
break
}
call := client.removeCall(h.Seq)
switch {
case call == nil:
err = client.cc.ReadBody(nil)
case h.Error != "":
call.Error = fmt.Errorf(h.Error)
err = client.cc.ReadBody(nil)
call.done()
default:
err = client.cc.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
client.terminateCalls(err)
}
