lab2官网 : https://pdos.csail.mit.edu/6.824/labs/lab-kvsrv.html

这是一个新的lab,但是感觉是官方觉得直接上raft太难了,所以加了个稍微简单点的。主要任务是实现一个key/value服务,该服务应该能够存储和检索键值对,并支持并发访问。


一、 概述

lab2 只需完成src/kvsrv文件夹下的代码,不需要修改其他文件。首先研究一下lab给出的基本框架,可以看到给了我们4种rpc报文:PutAppendArgs、PutAppendReply、GetArgs、GetReply,前两者负责put和append的通信,后两者负责Get的通信。而对于Server和Client,lab给出了基本框架,我们只需要实现其中的PutGetAppend三个方法即可。

1.1 client

从Lab官网的提示我们可以看到,要求我们能够分辨不同的客户端,并且能够处理重复的rpc请求,特别是对append的处理,因此我们引入clientId int64 与rpcId uint32两个变量来区分不同的客户端和rpc请求。新的Clerk结构如下:

1
2
3
4
5
6
type Clerk struct {
server *labrpc.ClientEnd
// You will have to modify this struct.
clientId int64
rpcId uint32 // rpc id for each client request
}

PutAppendArgs 报文结构如下:

1
2
3
4
5
6
7
8
9
10
type PutAppendArgs struct {
Key string
Value string
// You'll have to add definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
ClientId int64
RpcId uint32
}

nrand()函数是lab给我们提供的随机生成int64类型数的函数,我们可以利用他来生成ClientId,而RpcId则可以由我们自行递增。

Client端的几个函数都很简单,调用一下call即可,以PutApepend这个函数为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (ck *Clerk) PutAppend(key string, value string, op string) string {
// You will have to modify this function.

ck.rpcId++
// rpcId++ // increment rpc id for each RPC request
putAppendArgs := PutAppendArgs{key, value, ck.clientId, ck.rpcId}
putAppendReply := PutAppendReply{}
for {
ok := ck.server.Call("KVServer."+op, &putAppendArgs, &putAppendReply)
if ok {
return putAppendReply.Value
}
}
}

1.2 server

在server端,我们需要利用两个map,一个map用来存储K\V键值对,另外一个用来存储client 和他的上次所发的Rpc的关系

1
2
3
4
5
6
7
8
9
10
11
12
13
type LastRpc struct {
value string
rpcId uint32
}

type KVServer struct {
mu sync.Mutex

// Your definitions here.
kvMap map[string]string
lastRpc map[int64]LastRpc // clientId -> lastRpcId
}

tips: 为什么这里需要存储value呢?因为在append时,返回的是旧的value值,而如果遇到的是重复的报文,我们返回的依然是第一次执行时返回的旧value,而不是K\Vmap中的最新值

设计了这个结构后,其他的函数的实现就比较简单了,我只贴一下append 的代码,对于并发的控制,我就通过一把大锁来解决。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.

kv.mu.Lock()
defer kv.mu.Unlock()
curClientId := args.ClientId
curRpcId := args.RpcId
lastRpc, ok := kv.lastRpc[curClientId]
if ok && lastRpc.rpcId >= curRpcId { // duplicate rpc request, ignore it.
reply.Value = lastRpc.value
return
}
curValue, ok := kv.kvMap[args.Key]
if ok {
kv.kvMap[args.Key] = curValue + args.Value
reply.Value = curValue // return the old value after append.
} else {
kv.kvMap[args.Key] = args.Value // if the key does not exist, just append the value.
reply.Value = ""
}
kv.lastRpc[curClientId] = LastRpc{curValue, curRpcId}
}

二 、一些问题

2.1

测试时,一个客户端和多个客户端的测试会出现 linearizability check timed out, assuming history is ok 的warn

试着找了找原因,在test.go下

1
2
3
4
5
6
res, info := porcupine.CheckOperationsVerbose(models.KvModel, opLog.Read(), linearizabilityCheckTimeout)
if res == porcupine.Illegal {
...
} else if res == porcupine.Unknown {
fmt.Println("info: linearizability check timed out, assuming history is ok")
}

porcupine.Unknown 我看了下 porcupine 的源码,porcupine最后会调用checkParallel函数来并行的检测每个goroutine的结果,如果1秒没有返回,则会回报TimeOut,打印上述的这个warn,我将test中的linearizabilityCheckTimeout改为2s,则不再出现此问题:

更深层的原因,推测是设计的结构还是不够精简,导致系统运行的效率较低,后续可以改良一下

2.2

关于lab中要求实现线性一致性,由于lab2只涉及到了一个server,没有用于容错的副本server,所以对于线性一致性的的实现我们只需要记得加锁、合理处理重复报文即可。更复杂线性一致性涉及的可能会在后续lab中涉及