首先, 获取"集群"的 ReadIndex. 注意, 不是某个节点上面的某个状态, 而集群的多个节点共同确定的一个变量. 这个变量如何获取, 下面说明.
func GetClusterReadIndex(){ foreach(peers as peer){ i = peer.rpc_GetLastIndex(); ret = max(ret, i); if(recv_majority){ break; } } return ret; }
获取至少半数以上成员的 LastIndex, 也就是每个节点持久化的最新一条日志的 index. 一条日志只要在一个节点上被持久化, 那么这条日志要么被集群 commit, 要么被覆盖之后再 commit, 没有其它的选项.
由此可见, ReadIndex 不一定已经被 commit, 但一定会被 commit, 最终也将会 apply. 所以, 拿到集群的 ReadIndex 之后, 我们只需要等, 等它被任意一个节点 apply 即可.
ReadIndex = cluster.GetReadIndex(); while(1){ foreach(peers as peer){ if(ReadIndex <= peer.LastApplied()){ return peer.GetValue(); } } }
不断地重复轮询全部节点, 如果谁 apply 了, 就以谁的值为准. 还有一种方案, 那就是等本地的状态机 apply, 这样避免网络传输.
ReadIndex = cluster.GetReadIndex(); while(1){ if(ReadIndex <= self.LastApplied()){ return self.GetValue(); } }
还有一种优化方案, 是客户端判断. 客户端请求多数节点, 获取 max pending 和 max committed, 如果 committed 大于等于 pending, 那么取对应的 value. 如果 pending 大于 committed, 说明集群中有"脏写"(未决的数据), 客户端要做的就是等待. 如果不等待, 那就做 Paxos phase 2 所讲的"读修复".
没什么玄幻的, 道理非常简单: 第一步, 查询集群的状态; 第二步, 根据状态判断没有脏数据就返回, 有的话要么主动修复, 要么等待; 这是一致性的基础, 也是一致性的全部.