voiddoLogReplication(){ appendEntriesArgs->set_term(currentTerm); appendEntriesArgs->set_leaderid(id); appendEntriesArgs->set_prevlogindex(prevlogindex); appendEntriesArgs->set_prevlogterm(prevlogterm); appendEntriesArgs->set_leadercommit(leaderCommit); appendEntriesArgs->set_entries(request_log); heartBeatTimer.reset(); for (each server in server_list){ if (status!=Leader) break; thread_pool_add_work(sendAppendEntries,server_id,appendEntriesArgs); } }
appendEntriesArgs->set_prevlogindex(-1); appendEntriesArgs->set_prevlogterm(-1); appendEntriesArgs->set_entries([]); heartBeatTimer.reset(); for (each server in server_list){ if (status!=Leader) break; // 如果日志状态不同步,则发送最后一条日志来逐个同步 if (nextIndex[server_id] != prevlogindex){ appendEntriesArgs->set_prevlogindex(prevlogindex); appendEntriesArgs->set_prevlogterm(prevlogterm); appendEntriesArgs->set_entries(entries); } thread_pool_add_work(sendAppendEntries,server_id,appendEntriesArgs); } }
Status sendAppendEntries(int server_id, AppendEntriesArgs &appendEntriesArgs){ Status status = stub_->appendEntriesRPC(&context, appendEntriesArgs, &reply); if (!status.ok()) return connectFailed;
// 清空选举列表(用列表记录所有结点的投票结果,而不只统计票数,防止一个结点因为网络重传等原因投了多票) vote_list.reset(); tickets = 1; // 投给自己 for (each server in server_list){ if (status != Candidate) // 在选举过程中可能自己会退化为Follower,发送前判断避免无效通信 break; // 使用线程池异步发送选举请求(若采用同步则每次至少需要等2个RTT) thread_pool_add_work(sendRequestVote,server_id,requestVoteArgs,vote_list,tickets); } }
Status sendRequestVote(int server_id,RequestVoteArgs *requestVoteArgs, vector<bool> &vote_list,int &tickets){ // client调用requestVoteRPC服务 Status status = stub_->requestVoteRPC(&context, requestVoteArgs, &reply); if (!status.ok()) return connectFailed;
// Follower term > Candidate term(说明当前Candidate已落后集群中至少一半结点,退化为Follower) if (currentTerm < reply->term()){ status = Follower; currentTerm = reply.term; voteFor = -1; persist(); if (status!=Follower) thread_pool_add_work(becomeFollower,NULL); return failedCampaign; }
// 其他拒绝的情况(详见server的requestVoteRPC服务) if (reply->voteGranted() == false){ if (status != Follower) thread_pool_add_work(becomeFollower,NULL); return failedCampaign; }
// 获得选票 if (vote_list[server_id]!=0){ tickets++; // 获得集群中所有结点超过半数选票则当选,成为Leader if (tickets >= server_list.size() / 2 + 1){ if (status != Leader) thread_pool_add_work(becomeLeader,NULL); return winCampaign; } } }
voidpersist(){ if (status not changed) // copy on write return; persistRaftNode.currentTerm = currentTerm; persistRaftNode.votedFor = votedFor; for (item in entries) { persistRaftNode.log.push_back(item); } write_to_file(file_path,persistRaftNode.SerializeAsString()); }
在节点数能够达到 quorum 的分区中,选举流程会正常进行,该分区中的所有节点的 term 最终会稳定为新选举出的 Leader 节点的term。不幸的是,在节点数无法达到 quorum 的分区中,如果该分区中没有 Leader 节点,因为节点总是无法收到数量达到quorum的投票而不会选举出新的 Leader ,所以该分区中的节点在 election timeout 超时后,会增大 term 并发起下一轮选举,这导致该分区中的节点的term会不断增大。
如果网络一直没有恢复,这是没有问题的。但是,如果网络分区恢复,此时,达不到 quorum 的分区中的节点的 term 值会远大于能够达到 quorum 的分区中的节点的 term ,这会导致能够达到 quorum 的分区的 Leader退位(step down)并增大自己的 term 到更大的term,使集群产生一轮不必要的选举。
Pre-Vote 机制就是为了解决这一问题而设计的,其解决的思路在于不允许达不到 quorum 的分区正常进入投票流程,也就避免了其 term 号的增大。为此,Pre-Vote 引入了“预投票”,也就是说,当节点 election timeout 超时时,它们不会立即增大自身的 term 并请求投票,而是先发起一轮预投票。收到预投票请求的节点不会退位。只有当节点收到了达到 quorum 的预投票响应时,节点才能增大自身term 号并发起投票请求。这样,达不到 quorum 的分区中的节点永远无法增大 term ,也就不会在分区恢复后引起不必要的一轮投票。