Lab 链接:https://pdos.csail.mit.edu/6.824/labs/lab-kvraft.html

lab4是实现一个基于raft的k/v数据库,相比于lab2,lab4会存在多个server,因此需要raft协议来实现一致性。为了保证一致性,我们还需要lab2中的思想,利用rpc的序列号来判断重复请求,保证重复的Put/Append请求只在第一次出现时应用到状态机。

具体而言,系统的整体运行流程如下:

  • 客户端将请求发送给Leader
  • Leader的应用层(K/V数据库)将操作向下发送到自己的Raft层
  • Leader的Raft层将操作发送到多个副本Server
  • 副本回复Leader日志拷贝成功
  • Leader的Raft收到过半回复向上层应用层服务提示操作可执行
  • 应用层真正执行操作并告知Client结果
  • 其余副本也需要想自己的应用层应用已提交的日志

4A 无快照的KVRaft

4A是实现无快照的K/Vserver,我们延续在lab2中的基本设计,Rpc结构如下:

1
2
3
4
5
6
7
8
9
10
11
type PutAppendArgs struct {
Key string
Value string
ClientId int64
RpcSeq uint64
}
type GetArgs struct {
Key string
ClientId int64
RpcSeq uint64
}

同时添加一些用于判断的错误信息:

1
2
3
4
5
6
7
const (
OK = "OK"
ErrNoKey = "ErrNoKey"
ErrWrongLeader = "ErrWrongLeader"
ErrHandleOpTimeout = "ErrHandleOpTimeout"
ErrChanClosed = "ErrChanClosed"
)

对于Client端,与lab2类似,我们不断轮询Server即可,不过由于Raft协议,我们需要增加一个leaderId,客户端只能与leader进行访问,代码比较简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
args := &PutAppendArgs{Key: key, Value: value, Op: op, Seq: ck.GetSeq(), Identifier: ck.identifier}

for {
reply := &PutAppendReply{}
ok := ck.servers[ck.leaderId].Call("KVServer.PutAppend", args, reply)
if !ok || reply.Err == ErrNotLeader || reply.Err == ErrLeaderOutDated {
ck.leaderId += 1
ck.leaderId %= len(ck.servers)
continue
}

switch reply.Err {
case ErrChanClose:
continue
case ErrHandleOpTimeOut:
continue
}
return
}
}

Server端,我们知道每一个请求信息通过接口Start交付给Raft层后,必须在raft层commit后才能进行应用,这之间存在一定的等待时间。因此,必然需要一个协程不断接收raft层的commit日志来进行一系列的处理,包括重复Rpc的判断、数据库操作等。

Server结构体如下:在lab2的基础上我们增加了一个raft当中的index 到 chan LastRpc的map 来记录等待commit信息的RPC handler(Get、Put等操作)的通道。
而在LastRpc中增加一个ApplyTerm,记录commit被apply时的term, 因为其可能与Start相比发生了变化, 需要将这一信息返回给客户端。
因此LastRpc不再仅仅作为上一次RPC的记录,而是作为已发送过的Rpc的记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type LastRpc struct {
LastRpcSeq uint64
Err Err
Value string
ApplyTerm int
}

type KVServer struct {
mu sync.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()

maxraftstate int // snapshot if log grows this big

// Your definitions here.
kvMap map[string]string
lastRpc map[int64]LastRpc // clientId -> lastRpcSeq
waiCh map[int]chan LastRpc
}

Get、Put等Rpc Handler的设计结构都比较简单,如下:

1
2
3
4
5
6
7
func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
opArgs := &Op{OpType: OpPut, RpcSeq: args.RpcSeq, Key: args.Key, Value: args.Value, ClientId: args.ClientId}

res := kv.OpHandler(opArgs)
reply.Err = res.Err
}

Get、Put和Append都将请求封装成Op结构体,然后调用OpHandler函数进行处理。OpHandler利用通道等待ApplyHandler处理完消息后返回结果给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (kv *KVServer) OpHandler(opArgs *Op) (res LastRpc) {
kv.mu.Lock()
if rpc, ok := kv.lastRpc[opArgs.ClientId]; ok && rpc.LastRpcSeq == opArgs.RpcSeq {
kv.mu.Unlock()
return rpc
}
kv.mu.Unlock()

startIndex, startTerm, isLeader := kv.rf.Start(*opArgs)
if !isLeader {
return LastRpc{Err: ErrWrongLeader, Value: ""}
}

kv.mu.Lock()
newCh := make(chan LastRpc)
kv.waiCh[startIndex] = newCh
kv.mu.Unlock()

defer func() {
kv.mu.Lock()
delete(kv.waiCh, startIndex)
close(newCh)
kv.mu.Unlock()
}()

select {
case <-time.After(HandleOpTimeout):
res.Err = ErrHandleOpTimeout
return
case msg, success := <-newCh:
if success && msg.ApplyTerm == startTerm {
res = msg
return
} else if !success {
res.Err = ErrChanClosed
return
} else {
res.Err = ErrWrongLeader
res.Value = ""
return
}
}
}

ApplyHandler需要不断等待kv.applyCh发来新的已commited并且可以apply的消息:

  • 判断log请求的ClientId和RpcSeq是否在历史记录lastRpc中是否存在, 如果存在就直接返回历史记录
  • 不存在就需要应用到状态机, 并更新历史记录
  • 如果log请求的CommandIndex对应的key在waiCh中存在, 表面当前节点可能是一个Leader, 需要将结果发送给RPC handler。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
func (kv *KVServer) ApplyHandler() {
for !kv.killed() {
log := <-kv.applyCh
if log.CommandValid {
op := log.Command.(Op)
kv.mu.Lock()

needApply := false
var res LastRpc
if lastrpc, ok := kv.lastRpc[op.ClientId]; ok {
if lastrpc.LastRpcSeq == op.RpcSeq {
res = lastrpc
} else if lastrpc.LastRpcSeq < op.RpcSeq {
needApply = true
}
} else {
needApply = true
}

if needApply {
res = kv.KvMapExecute(&op)
res.ApplyTerm = log.SnapshotTerm
kv.lastRpc[op.ClientId] = res
}

ch, ok := kv.waiCh[log.CommandIndex]
if !ok {
kv.mu.Unlock()
continue
}
kv.mu.Unlock()
res.ApplyTerm = log.SnapshotTerm
ch <- res
}
}
}

func (kv *KVServer) KvMapExecute(op *Op) (res LastRpc) {
res.LastRpcSeq = op.RpcSeq
switch op.OpType {
case OpGet:
value, ok := kv.kvMap[op.Key]
if ok {
res.Value = value
return
} else {
res.Err = ErrNoKey
res.Value = ""
return
}
case OpPut:
kv.kvMap[op.Key] = op.Value
return
case OpAppend:
value, ok := kv.kvMap[op.Key]
if ok {
kv.kvMap[op.Key] = value + op.Value
return
} else {
kv.kvMap[op.Key] = op.Value
return
}
}
return
}

测试结果如下:

4B 有快照的KVRaft

lab 4B是在4A的基础上实现快照。当底层raft的log过大时生成快照并截断日志,并将快照持久化存储到本地。

实际逻辑并不复杂,但是正如lab中所说的一样,测试时raft的一些边界情况和小细节会暴露出来,进而需要调试很久。

快照中应该包括内存的数据库、每个clerk序列号的记录信息、同时也应该记录最近一次应用到状态机的日志索引, 凡是低于这个索引的日志都是包含在快照中的。

因此在server结构体中添加以下成员:

1
2
persister    *raft.Persister
lastApplied int

生成和加载快照的函数防止raft中的写即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (kv *KVServer) SnapShot() []byte {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)

e.Encode(kv.kvMap)
e.Encode(kv.lastRpc)

snapshot := w.Bytes()
return snapshot
}

func (kv *KVServer) ReadSnapShot(snapShot []byte) {
if len(snapShot) == 0 || snapShot == nil {
// DPrintf("server %v LoadSnapShot: snapShot is nil", kv.me)
return
}

r := bytes.NewBuffer(snapShot)
d := labgob.NewDecoder(r)

kvmap := make(map[string]string)
lastrpc := make(map[int64]LastRpc)

if d.Decode(&kvmap) != nil || d.Decode(&lastrpc) != nil {
// DPrintf("server %v LoadSnapShot: decode error", kv.me)s
} else {
kv.kvMap = kvmap
kv.lastRpc = lastrpc
}
}

什么时候我们需要生成快照呢?即当我们收到log进行判断即可,低于lastApplied索引的日志都是包含在快照中d ,因此我们直接跳过即可,对于kv.maxraftstate != -1 && kv.persister.RaftStateSize() >= kv.maxraftstate,则表示需要生成日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (kv *KVServer) ApplyHandler() {
...
if log.CommandIndex <= kv.lastApplied {
kv.mu.Unlock()
continue
}
...
if kv.maxraftstate != -1 && kv.persister.RaftStateSize() >= kv.maxraftstate {
snapShot := kv.SnapShot()
kv.rf.Snapshot(log.CommandIndex, snapShot)
}
kv.mu.Unlock()
...
}

什么时候加载日志呢?则是当ApplyHandler从通道收到的日志信息表明这是一个快照时,则需要加载日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (kv *KVServer) ApplyHandler() {
for !kv.killed() {
log := <-kv.applyCh
if log.CommandValid {
...
} else if log.SnapshotValid {
kv.mu.Lock()
if log.SnapshotIndex >= kv.lastApplied {
kv.ReadSnapShot(log.Snapshot)
kv.lastApplied = log.SnapshotIndex
}
kv.mu.Unlock()
}
}
}

K/V server部分的代码就完成了,但是不出意外就报错了,在测试时出现了logs were not trimmed的错误,在检测了没有死锁和持锁接发通道消息等问题后,发现应该是raft层无法承受高并发而导致clerk无限重发Rpc,因此我们要减少raft发送Rpc的数量并且让clerk在失败时等待一会

在start中,让Raft等待一段时间,积累多个请求后一次发送,避免发送过多的Rpc

1
2
3
4
5
6
7
8
9
func (rf *Raft) Start(command interface{}) (int, int, bool) {
...
rf.mu.Lock()
defer func() {
rf.ResetHeartTimer(15)
rf.mu.Unlock()
}()
...
}

减少Raft中发送InstallSnapshot RPC的数量,原来在handleAppendEntries中,如果发现follower需要的日志项被快照截断则立即调用协程发送快照。我们将对这种情况都做一定修改,让其仅设置rf.nextIndex[server] = rf.lastIncludedIndex,让leader在下一次心跳时再调用协程发送快照。

除此之外我们让clerk先sleep再重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (ck *Clerk) Get(key string) string {
...
for {
...
if !ok || getReply.Err == ErrWrongLeader {
ck.leaderId = (ck.leaderId + 1) % len(ck.servers) // try next server if not ok or wrong leader
time.Sleep(50 * time.Millisecond)
continue
}
switch getReply.Err {
case ErrChanClosed:
time.Sleep(time.Microsecond * 50)
continue
case ErrHandleOpTimeout:
time.Sleep(time.Microsecond * 50)
continue
...
}
}
}

测试结果如下: