(1)当客户端向etcd集群发送了一次请求之后,请求中的封装Entry记录会先被提交给etcd-raft模块进行处理,其中,etcd-raft模块会先将Entry记录保存到raftLog.unstable中。
我们以ETCD数据库源码分析——服务端PUT流程为例,kvServer结构体Put函数处理流程如下:首先会对请求消息进行各方面的检查,检查完之后会将所有的请求交给其内封装的RaftKV接口进行处理,待处理完成得到响应消息之后,会通过header.fill()方法填充响应的头信息,最后将完整的响应消息返回给客户端。因此,往下追踪 srv.(KVServer).Put(ctx, in)其实就是调用(s *EtcdServer) Put()函数。
从raftRequestOnce流程看出最终调用的是 (s *EtcdServer) processInternalRaftRequestOnce(…)函数,在该函数里面有一句关键调用 s.r.Propose(cctx, data)。s是EtcdServer, r是其里面的成员变量raftNode, 这就是进入raft协议相关的节奏了。通过对Propose函数的追踪,我们可以看到最终函数走到了stepWithWaitOption
// raft/node.go
func (n *node) Propose(ctx context.Context, data []byte) error { return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) } // 这里给Message加上了Type为pb.MsgProp,Entry为初入的PutRequest
func (n *node) stepWait(ctx context.Context, m pb.Message) error { return n.stepWithWaitOption(ctx, m, true) }
// Step advances the state machine using msgs. The ctx.Err() will be returned, if any.
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
if m.Type != pb.MsgProp { // 这里不会走
select {
case n.recvc <- m: return nil
case <-ctx.Done(): return ctx.Err()
case <-n.done: return ErrStopped
}
}
ch := n.propc // 取出node结构体提供的propc通道
pm := msgWithResult{m: m}
if wait { pm.result = make(chan error, 1) }
select {
case ch <- pm: if !wait { return nil } // 将带有msg的pm结构体放入propc通道
case <-ctx.Done(): return ctx.Err()
case <-n.done: return ErrStopped
}
select {
case err := <-pm.result:
if err != nil { return err }
case <-ctx.Done(): return ctx.Err()
case <-n.done: return ErrStopped
}
return nil
}
从raft/node.go的run函数可以看出,当从propc通道中取出msgWithResult,在从其中取出消息msg,设置msg的来源节点为本节点id。然后调用raft模块的Step函数驱动状态机。
(2)etcd-raft模块将该Entry记录封装到前面介绍的Ready实例中,返回给上层模块进行持久化。
(3)当上层模块收到待持久化的Entry记录之后,会先将其记录到WAL日志文件中,然后进行持久化操作,最后通知etcd-raft模块进行处理。
(4)此时etcd-raft模块就会将该Entry记录从unstable移动到storage中保存。
(5)待该Entry记录被复制到集群中的半数以上节点时,该Entry记录会被Leader节点确认为已提交(committed),并封装进Ready实例返回给上层模块。
(6)此时上层模块即可将该Ready实例中携带的待应用Entry记录应用到状态机中。