目录

Raft 算法

复制(Replication)

一致性算法的目的是让多台服务器/状态机能够计算得到相同的状态,同时,如果有部分机器宕机,集群作为一个整体依然能继续工作。

状态转移(State Transfer):Primary 将自己完整状态,比如说内存中的内容,拷贝并发送给 Backup。Backup 会保存收到的最近一次状态,所以 Backup 会有所有的数据。当 Primary 故障了,Backup 就可以从它所保存的最新状态开始运行。所以,状态转移就是发送 Primary 的状态。这种机制下,每过一会,Primary 就需要对自身的内存做拷贝(这里可以只发 diff),并通过网络将其发送到 Backup。

复制状态机(Replicated State Machine):复制状态机将来自客户端的操作日志从 Primary 顺序传输到 Backup,这样多台计算机从相同的状态开始,以相同的顺序,执行了确定的相同指令,达到最终一致性。这里也有一些例外,有一些指令并不是确定的,比如随机数生成器,当前时间,UUID 等,这些指令会将指令结果一起同步过去。

脑裂(Split Brain)

在一些中间件集群环境中,普遍都会有一个「大脑」去做决策。这个大脑一般通过选举产生,但是当出现网络分区的时候就有可能出现多个大脑,也就是我们说的「脑裂」

过半票决(Majority Vote)

过半票决首先得保证集群数量为奇数,这样当出现网络分区的时候,保证最多只有一个分区的数量会过半。

其次就是任何时候执行任何操作,都需要超过半数服务器批准后才能执行(注意这里服务器数量指的是所有服务器数量,故障机器也包括在内)。这种模式下,2*F+1 的节点总数,可接受的最多故障节点数就是 F。

过半票决还有一个很微妙的特性,当 Leader 易主的时候,新 Leader 一定是拿到了过半的服务器选票,那么和上一任的 Leader 选举产生的过半服务器之间必然会有重叠,也就是说在新 Leader 的过半服务器中一定有包含了旧 Leader 的所有操作的节点

Raft 基础

Raft 节点状态转换: https://static.imlgw.top/blog/20220601202043.png

Follower:从节点,初始都是 Follower。比较被动,只对其他 Server 的请求做响应,不会主动发起请求。如果选举定时器(election timeout)超时并且没有收到 Leader 任何消息,就会转变为 Candidates,发起选举请求。

Candidate:候选节点,只存在于选举阶段。选举获得多数票就会转换为 Leader。如果发现已经有了 Leader 或者自己的 Term 过时就会重新转换为 Follower。

Leader:主节点,负责响应客户端发起的请求。如果客户端请求到了 Follower,也会将请求转到 Leader 上。Leader 如果发现自己的 Term 过时就会重新转换为 Follower。

https://static.imlgw.top/blog/20220601221351.png

Raft 生命周期中可能会有多个 Leader,使用 Term 来区分多个 Leader,Raft 保证每个 Term 最多只有一个 Leader,如果选票比较分散,也可能没有 Leader(t3),则直接进入下一轮选举。

Term 可以看作 Raft 中的逻辑时间,每个节点都存有当前 Term 编号。节点可以通过 Term 来检查过期的消息,对齐各个节点进度。当节点进行通信的时候,如果发现其他节点的 Term 比自己大,就会将当前节点 Term 更新为较大的 Term,如果节点是 Leader 或者 Candidates 则会进入 Follower 状态。如果节点 Term 比自己小,则会直接忽略掉。

https://static.imlgw.top/blog/20220605205548.png

Raft 规范

  1. Election Safety:在一个任期内,最多可以选出一个 Leader
  2. Leader Append-Only:Leader 不会覆盖或者删除自己的 log,只会追加
  3. Log Matching:如果两个节点中的两个条目有相同的 log index 和 term,则它们之前的所有日志也一定相同。
  4. Leader Completeness:如果某条日志在某一任期被提交了,那么该日志一定会出现在所有更高任期的 Leader 日志中。
  5. State Machine Safety:如果一个服务器在其状态机上应用了一个给定索引的日志条目,那么没有其他服务器会在同一索引上应用一个不同的日志条目

Raft 选主

Raft 使用心跳机制来触发领导选举。一开始所有节点都是 Follower,只要能收到 Leader 的心跳信息或者 Candidate 的投票信息,就会一直处于 Follower 状态。

如果选举定时器超时了 Follower 还是没有收到任何信息,就认为 Leader 已经挂了,随即进入选举阶段,增加自己的 Term,进入 Candidate 状态,向其他节点发送 RequestVote。

func (rf *Raft) leaderElection() {
    // 增加 Term
    rf.currentTerm++
    // 变为 Candidate 状态
    rf.state = Candidate
    rf.votedFor = rf.me
    rf.persist()
    rf.resetElectionTimer()
    term := rf.currentTerm
    // 获得选票总数,首先投给自己
    voteCounter := 1
    // 当前节点最后一条 log
    lastLog := rf.log.lastLog()
    args := RequestVoteArgs{
        Term:         term,
        CandidateId:  rf.me,
        LastLogIndex: lastLog.Index,
        LastLogTerm:  lastLog.Term,
    }

    var becomeLeader sync.Once
    for serverId, _ := range rf.peers {
        if serverId != rf.me {
            // 发送 RequestVote
            go rf.candidateRequestVote(serverId, &args, &voteCounter, &becomeLeader)
        }
    }
}

发起投票后会发生下列三种情况:

  1. 获得了多数票,成为新的 Leader
  2. 其他节点已经成为了 Leader,收到了新 Leader 的心跳信息,且 Term 大于自己
  3. 选举失败,票数分散,没有 Leader 产生
func (rf *Raft) candidateRequestVote(serverId int, args *RequestVoteArgs,    voteCounter *int, becomeLeader *sync.Once) {
    reply := RequestVoteReply{}
    // 并发的发起 rpc 投票请求
    ok := rf.sendRequestVote(serverId, args, &reply)
    if !ok {
        return
    }
    // 同步的处理投票结果
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // 存在更新的 Term,选举无效,更新 Term,转换为 Follower
    if reply.Term > args.Term {
        rf.setNewTerm(reply.Term)
        rf.setState(Follower)
        return
    }

    if !reply.VoteGranted {
        return
    }

    // 收到 Follower 投票
    *voteCounter++
    if *voteCounter > len(rf.peers)/2 &&
        rf.currentTerm == args.Term &&
        rf.state == Candidate {

        // 获得半数以上选票,且节点状态没变,转换为 Leader
        becomeLeader.Do(func() {
            rf.state = Leader
            lastLogIndex := rf.log.lastLog().Index
            for i, _ := range rf.peers {
                rf.nextIndex[i] = lastLogIndex + 1
                rf.matchIndex[i] = 0
            }
            rf.appendEntries(true)
        })
    }
}

当节点收到 RequestVote RPC 后会根据 Term 和 Log 进行判断是否要投票。

这里就涉及到一个选主的限制了,Raft 中并非所有节点都能成为 Leader。

这里我们考虑一个场景:A、B、C 三个节点,如果 A 为主节点期间 C 挂了,此时消息被多数节点(A,B)接收,所以 A 会提交这些日志。此时若 A 挂了,而 C 恢复且被选为主节点,则 A 已经提交的日志会被 C 的日志覆盖,从而导致状态机的状态不一致。

所以在 Raft 中限制了,只有包含了所有已经提交日志的节点,才能成为 Leader。

前面提到了日志要提交,首先需要被多数节点所接受,同时要成为 Leader 也需要获得多数节点的投票,所以两者之间必然会有重叠的节点,也就是说投票的多数节点中至少有一个节点是包含了所有已经提交的日志的。接着我们从这半数节点中选出一个“最新”的节点出来作为 Leader,就一定是包含了所有已经提交的日志的节点。

这里的”最新“比较的是两个节点最后一条 log 的 Term,如果 Term 一样,就看 Term 内最后一条 log,谁更大谁就更“新”

TODO: 画例子

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // 候选节点任期大于当前节点,变为 Follower
    if args.Term > rf.currentTerm {
        rf.setNewTerm(args.Term)
        rf.setState(Follower)
    }

    // 候选节点任期比当前节点小,忽略
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.VoteGranted = false
        return
    }

    myLastLog := rf.log.lastLog()
    // 候选节点最后一条 log 任期比当前节点大
    // 或者任期相同,但是候选节点最后一条 logIndex 比当前节点大,说明候选节点比本节点日志新
    upToDate := args.LastLogTerm > myLastLog.Term ||
        (args.LastLogTerm == myLastLog.Term && args.LastLogIndex >= myLastLog.Index)
    // 每个节点每一轮选举只能投给一个节点
    if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && upToDate {
        reply.VoteGranted = true
        rf.votedFor = args.CandidateId
        rf.persist()
        rf.resetElectionTimer()
    } else {
        reply.VoteGranted = false
    }
    reply.Term = rf.currentTerm
}

Raft 日志复制

当节点被选为 Leader 后就开始响应客户端请求。每个客户端请求都包含了一个复制状态机需要执行的命令。Leader 将命令作为一个新的条目附加到它的 Logs 队列中,然后发送 Append Entries RPCs 给其他节点。当 log 被大多数节点复制后,Leader 会将 log 提交到状态机,同时也会将之前 Term 的 Leader 创建的 log 条目提交,并将执行结果返回给客户端。

如果 Follower 崩溃或运行缓慢,或者网络数据包丢失,Leader 会一直重试 Append Entries RPCs(甚至在它响应了客户端之后),直到所有 Follower 最终存储所有的日志条目。

但是在复杂的网络环境下,节点之间的日志可能会不一致,下面的情况都有可能发生:

https://static.imlgw.top/blog/20220603225731.png

要处理不一致首先要找到不一致的位置,Leader 为每个节点维护了一个 nextIndex[peer], 表示 Leader 将从哪里开始发送 log 给 peer。当 Leader 上台的时候就会将所有节点的 nextIndex 设置为自己的 lastLog.Index + 1

当 Leader 发送 Append Entries RPCs 给 Follower 后,Follower 会进行一致性检查,也就是判断 nextIndex[peer] 前一条 log 的 Term 和 Index 是否和当前节点 peer 一致,如果不一致就会拒绝该消息,一致就会添加新 log 到节点的日志队列中,nextIndex[peer]+1

如果 Leader 的消息被 peer 拒绝,Leader 会使 nextIndex[peer] 递减,然后再次发送 Append Entries RPCs,直到找到一致的位置,然后用 Leader 的 log[nextIndex, lastLog.Index],覆盖 Follower 的 log[nextIndex...]

通过上面的机制 Raft 就能保证:

  • 如果两个日志条目有相同的 log index 和 term,则它们的内容一定相同。
  • 如果两个节点中的两个条目有相同的 log index 和 term,则它们之前的所有日志一定相同。

Leader 发送 Append Entries RPCs

func (rf *Raft) appendEntries() {
    lastLog := rf.log.lastLog()
    for peer, _ := range rf.peers {
        if peer == rf.me {
            rf.resetElectionTimer()
            continue
        }
        // 从 nextIndex 开始发送
        nextIndex := rf.nextIndex[peer]
        if nextIndex <= 0 {
            nextIndex = 1
        }
        // 上图(Raft 论文 Figure7)中的情况 cdf,包含额外的未提交的条目
        // 直接回退到 lastLog.Index
        if lastLog.Index+1 < nextIndex {
            nextIndex = lastLog.Index
        }
        // 前一条 log 的 index 和 Term,用于 Follower 校验日志是否一致
        prevLog := rf.log.at(nextIndex - 1)
        args := AppendEntriesArgs{
            Term:         rf.currentTerm,
            LeaderId:     rf.me,
            PrevLogIndex: prevLog.Index,
            PrevLogTerm:  prevLog.Term,
            Entries:      make([]Entry, lastLog.Index-nextIndex+1),
            // 目前 Leader 提交到状态机的 logIndex
            LeaderCommit: rf.commitIndex,
        }
        // 发送 [nextIndex, lastLog.Index]
        copy(args.Entries, rf.log.slice(nextIndex))
        go rf.leaderSendEntries(peer, &args)
    }
}

快速恢复(Fast Backup)

raft 论文中日志回退是一条一条回退的,如果 Follower 掉线时间过长,一条条回退会特别慢,原文中有提到优化方法,但是没有细说

If desired, the protocol can be optimized to reduce the number of rejected AppendEntries RPCs. For example, when rejecting an AppendEntries request, the follower can include the term of the conflicting entry and the first index it stores for that term. With this information, the leader can decrement nextIndex to bypass all of the conflicting entries in that term; one AppendEntries RPC will be required for each term with conflicting entries, rather than one RPC per entry. In practice, we doubt this optimization is necessary, since failures happen infrequently and it is unlikely that there will be many inconsistent entries.

如果需要,该协议可以被优化以减少被拒绝的 AppendEntries RPC 的数量。例如,当拒绝一个 AppendEntries 请求时,跟随者可以包括冲突条目的任期和它为该任期存储的第一个索引。有了这些信息,领导者可以递减 nextIndex 以绕过该任期中的所有冲突条目;每个有冲突条目的任期将需要一个 AppendEntries RPC,而不是每个条目一个 RPC。在实践中,我们怀疑这种优化是否有必要,因为故障不常发生,而且不太可能有很多不一致的条目。

mit6.824 中 Morris 提到了一种优化方式:

在 Follower 回复 Leader 的 AppendEntriesReply 中添加几个字段

  • XTerm: 冲突的 Term,Follower 拒绝 Leader 后会将 XTerm 设置为自己的 Term,如果 Follower 日志较少,对应位置没有 log,设置 XTerm 为 -1
  • XIndex: XTerm 的第一条 log 的 index
  • XLen: Follower 日志较少,XTerm = -1 时,log 日志长度+1,Leader 下次就从 Xlen 开始发

Follower 收到消息后对应处理

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    reply.Success = false
    reply.Term = rf.currentTerm

    // 忽略,Leader 的 Term 比自己小
    if args.Term < rf.currentTerm {
        return
    }

    // 比自己任期大,更新任期
    if args.Term > rf.currentTerm {
        rf.setNewTerm(args.Term)
        rf.setState(Follower)
        reply.Term = args.Term
    }

    // 重置 Election 定时器
    rf.resetElectionTimer()

    // Candidate 收到了 AppendEntries 请求,转换为 Follower
    if rf.state == Candidate {
        rf.setState(Follower)
    }

    // Follower 日志较少,Pre 处没有 log
    if rf.log.lastLog().Index < args.PrevLogIndex {
        reply.XTerm = -1
        reply.XIndex = -1
        reply.XLen = rf.log.len()
        return
    }

    // Pre 处有 log 但是 Term 不一样
    if rf.log.at(args.PrevLogIndex).Term != args.PrevLogTerm {
        // 获取 Follower 冲突位置的 Term
        xTerm := rf.log.at(args.PrevLogIndex).Term
        for xIndex := args.PrevLogIndex; xIndex > 0; xIndex-- {
            // XTerm 第一条 log 的 index
            if rf.log.at(xIndex-1).Term != xTerm {
                reply.XIndex = xIndex
                break
            }
        }
        reply.XTerm = xTerm
        reply.XLen = rf.log.len()
        return
    }

    // 匹配上了
    for idx, entry := range args.Entries {
        // 正常情况下 entry.Index 肯定是大于 lastLog.Index 的
        // 这里小于等于,且任期不一致,说明 Follower 有额外的未提交的日志,需要覆盖掉
        if entry.Index <= rf.log.lastLog().Index && rf.log.at(entry.Index).Term != entry.Term {
            // 截断 Index 后的日志
            rf.log.truncate(entry.Index)
            rf.persist()
        }
        // Follower 复制 log
        if entry.Index > rf.log.lastLog().Index {
            rf.log.append(args.Entries[idx:]...)
            rf.persist()
            break
        }
    }

    // 设置当前节点提交进度
    if args.LeaderCommit > rf.commitIndex {
        rf.commitIndex = min(args.LeaderCommit, rf.log.lastLog().Index)
        // 应用到状态机
        rf.apply()
    }
    reply.Success = true
}

Leader 收到回复后就可以针对不同情况,快速回滚

func (rf *Raft) leaderSendEntries(serverId int, args *AppendEntriesArgs) {
    var reply AppendEntriesReply
    ok := rf.sendAppendEntries(serverId, args, &reply)
    if !ok {
        return
    }
    // 加锁,同步处理结果
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // 返回的 Term 比 Leader 大,当前 Leader 节点过时了
    // 更新 Term 转换为 Follower
    if reply.Term > rf.currentTerm {
        rf.setNewTerm(reply.Term)
        rf.setState(Follower)
        return
    }
    // 发送到接收需要一段时间,确认状态,确认还是在自己的任期
    if args.Term == rf.currentTerm {
        // 没有冲突,校验成功
        if reply.Success {
            // 已知的 Follower 复制到的位置
            match := args.PrevLogIndex + len(args.Entries)
            next := match + 1
            rf.nextIndex[serverId] = max(rf.nextIndex[serverId], next)
            rf.matchIndex[serverId] = max(rf.matchIndex[serverId], match)
        } else {
            if reply.XTerm == -1 {
                // 有冲突,且 Follower 日志少,下一次从 XLen 开始
                // i: 1    2    3   4   5
                //        XLen
                // F: 4  
                // L: 4    6    6   6
                //             Pre     Next
                rf.nextIndex[serverId] = reply.XLen
            } else {
                // 找到 Leader 中 XTerm 中后一条日志的 Index,从这条再开始
                lastLogInXTerm := rf.findNextLogInTerm(reply.XTerm)
                if lastLogInXTerm > 0 {
                    // i: 1    2    3    4    5
                    // F: 4    4    4
                    // L: 4    6    6    6
                    //       Last  Pre       Next
                    rf.nextIndex[serverId] = lastLogInXTerm
                } else {
                    // Leader 没有 XTerm 直接从 XIndex 开始
                    // i: 1    2    3    4    5
                    //      XIndex
                    // F: 4    5    5
                    // L: 4    6    6   6
                    //             Pre      Next
                    rf.nextIndex[serverId] = reply.XIndex
                }
            }
        }
        // 提交 log
        rf.leaderCommit()
    }
}

Leader 最终提交 log

func (rf *Raft) leaderCommit() {
    // 确认状态
    if rf.state != Leader {
        return
    }

    // 从上次 commitIndex 开始提交
    for n := rf.commitIndex + 1; n <= rf.log.lastLog().Index; n++ {
        // 只关心自己任期内的 log 是否复制到多数节点
        // 当前任期内日志的提交后,上一任的间接的也就提交了
        if rf.log.at(n).Term != rf.currentTerm {
            continue
        }
        counter := 1
        for serverId := 0; serverId < len(rf.peers); serverId++ {
            // follower 已经复制的 Index >= n
            if serverId != rf.me && rf.matchIndex[serverId] >= n {
                counter++
            }
            // 超过半数已经复制了 log[n]
            if counter > len(rf.peers)/2 {
                // 提交日志,并且应用到状态机
                rf.commitIndex = n
                rf.apply()
                break
            }
        }
    }
}

Raft 响应

Raft Start,只能由 Leader 响应,接收一个 command,添加到 Leader 日志队列中。然后等待下一次 AppendEntries 进而同步给其他节点。这里也可以 Start 后直接发送 AppendEntries,但是如果 Start 太频繁可能会造成 RPC 频繁,需要根据不同场景进行调整,论文中也并没有明确说明。

func (rf *Raft) Start(command interface{}) (int, int, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if rf.state != Leader {
        return -1, rf.currentTerm, false
    }
    index := rf.log.lastLog().Index + 1
    term := rf.currentTerm

    log := Entry{
        Command: command,
        Index:   index,
        Term:    term,
    }
    rf.log.append(log)
    rf.persist()

    return index, term, true
}

Raft 持久化

持久化是为了在某些情况下,当服务器故障重启时,能继续之前的状态,避免丢失数据。

在 Raft 论文中,标识了 currentTerm、voteFor、logs 是需要持久化的数据。

logs 需要被持久化很好理解,这是唯一记录了应用程序状态的地方,而 voteFor 和 currentTerm 需要被持久化的原因都是为了避免一个 Term 出现两个 Leader。假设一个节点刚给一个候选人投了票,结果自己 down 了,然后重启后发现自己 voteFor 是空的,那么就还可能给其他候选人投票,就违背了一个节点一个任期只能投一张票的原则。currentTerm 也类似,如果节点重启后不知道当前的任期,就可能从一个历史的任期开始工作,就可能会导致一个任期多个 Leader。

Raft 日志快照

Raft 成员变更