Lab link: https://pdos.csail.mit.edu/6.824/labs/lab-shard.html

In Lab 4 we built a simple fault-tolerant K/V server on top of Raft. Lab 5 goes a step further and builds a sharded key/value database on top of Raft.

The sharded database consists of one controller plus several replica groups. The data is partitioned into shards, and each shard is stored by one replica group; each replica group is itself a Raft cluster. The controller manages the shards, i.e. it manages the configuration (the shard-to-group assignment). Every shard is served by exactly one group, while a single group may serve several shards, so adding groups improves throughput as long as the number of groups does not exceed the number of shards.
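For reference, a client routes a key to a shard roughly the way the lab skeleton's key2shard helper does (a sketch; NShards is 10 in the lab):

// key2shard picks a shard for a key based on its first byte,
// mirroring the helper in the lab's shardkv client skeleton.
func key2shard(key string) int {
	shard := 0
	if len(key) > 0 {
		shard = int(key[0])
	}
	shard %= NShards // NShards == 10
	return shard
}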


5A

Lab 5A implements the shard controller. server.go already defines the Config struct:

type Config struct {
	Num    int              // config number
	Shards [NShards]int     // shard -> gid
	Groups map[int][]string // gid -> servers[]
}

Num is the configuration number, Shards maps each shard to the gid of the group that owns it, and Groups maps each gid to its list of server names.

The controller exposes four RPCs:

  • Join: add new replica groups to the system, create a new configuration, and rebalance the shards
  • Leave: remove groups from the configuration and rebalance
  • Move: assign a given shard to a given group
  • Query: return the configuration with the given number; if the number is -1 or does not index into the configuration slice, return the latest configuration

5A Implementation

Lab 5A follows the same skeleton as Lab 4; the main change is that Put/Get are replaced by Join, Leave, Move, and Query.
For the RPC argument structs, we only need to add two fields for duplicate detection to the ones the skeleton provides:

	RpcSeq   uint64
	ClientId int64

That is all the argument structs need.
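Concretely, the argument structs end up looking roughly like this (Servers, GIDs, Shard, GID, and Num come from the skeleton's common.go; RpcSeq and ClientId are the added fields):

type JoinArgs struct {
	Servers  map[int][]string // new gid -> server names
	RpcSeq   uint64
	ClientId int64
}

type LeaveArgs struct {
	GIDs     []int
	RpcSeq   uint64
	ClientId int64
}

type MoveArgs struct {
	Shard    int
	GID      int
	RpcSeq   uint64
	ClientId int64
}

type QueryArgs struct {
	Num      int // desired config number; -1 means "latest"
	RpcSeq   uint64
	ClientId int64
}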

client.go needs to implement Join, Leave, Move, and Query; each one simply calls the corresponding ShardCtrler RPC. Query, for example:

func (ck *Clerk) Query(num int) Config {
	args := QueryArgs{Num: num, ClientId: ck.clientId, RpcSeq: ck.rpcSeq}
	ck.rpcSeq++
	for {
		// try each known server, starting from the cached leader
		reply := QueryReply{}
		ok := ck.servers[ck.leaderId].Call("ShardCtrler.Query", &args, &reply)
		if !ok || reply.Err == ErrWrongLeader {
			// try the next server if the call failed or this one is not the leader
			ck.leaderId = (ck.leaderId + 1) % len(ck.servers)
			time.Sleep(RetryInterval)
			continue
		}
		switch reply.Err {
		case ErrChanClosed, ErrHandleOpTimeout:
			time.Sleep(RetryInterval)
			continue
		}
		return reply.Config
	}
}

The server has the same structure as the Lab 4 server, with Put/Get replaced by Join and friends. Taking Join as an example: the Join RPC handler calls a generic OpHandler, OpHandler submits the operation to Raft via Start so it is agreed on as a log entry, and an ApplyHandler goroutine keeps polling for committed entries waiting to be applied. The overall flow is the same as before; what remains is the concrete implementation of the four operations.
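As a rough sketch of that flow for Join (the Op fields and the OpHandler signature are my own guesses at the structure described above, not code from this post):

func (sc *ShardCtrler) Join(args *JoinArgs, reply *JoinReply) {
	op := Op{
		OpType:   JoinType,
		Servers:  args.Servers,
		ClientId: args.ClientId,
		RpcSeq:   args.RpcSeq,
	}
	// OpHandler hands the op to Raft with Start(), then waits on a channel
	// keyed by the log index until ApplyHandler has applied it (or a timeout fires).
	reply.Err = sc.OpHandler(op)
	reply.WrongLeader = reply.Err == ErrWrongLeader
}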

5A Join

Join builds a new configuration: it adds the new groups to the configuration and then rebalances the shards.
Note that copying a Go map with = is a shallow copy: it copies only the reference to the underlying data structure, not the data itself. So we must not assign the old configuration's map directly to the new configuration; we have to build a fresh map and copy the entries, as the short example below shows.
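A quick, self-contained illustration of the difference (maps copy by reference, arrays copy by value):

package main

import "fmt"

func main() {
	oldGroups := map[int][]string{1: {"s1"}}
	aliased := oldGroups        // copies only the map header; both names share the data
	aliased[2] = []string{"s2"} // mutates oldGroups as well
	fmt.Println(len(oldGroups)) // 2

	var oldShards [10]int
	oldShards[0] = 1
	copiedShards := oldShards // arrays are values, so this is a real copy
	copiedShards[0] = 2
	fmt.Println(oldShards[0]) // still 1
}

With that in mind, JoinNewGroups copies the group map entry by entry: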

func (sc *ShardCtrler) JoinNewGroups(servers map[int][]string) Config {
	lastConfig := sc.configs[len(sc.configs)-1]

	// deep-copy the old group map, then add the joining groups
	newGroups := make(map[int][]string)
	for gid, serverList := range lastConfig.Groups {
		newGroups[gid] = serverList
	}
	for gid, serverList := range servers {
		newGroups[gid] = serverList
	}

	// GroupMap: gid -> number of shards currently assigned to that group
	GroupMap := make(map[int]int)
	for gid := range newGroups {
		GroupMap[gid] = 0
	}
	for _, gid := range lastConfig.Shards {
		if gid != 0 {
			GroupMap[gid]++
		}
	}

	if len(GroupMap) == 0 {
		return Config{
			Num:    lastConfig.Num + 1,
			Shards: [NShards]int{},
			Groups: newGroups,
		}
	}
	return Config{
		Num:    lastConfig.Num + 1,
		Shards: sc.loadBalance(GroupMap, lastConfig.Shards),
		Groups: newGroups,
	}
}

5A Leave

Leave removes the given groups from the configuration, rebalances the remaining shards, and returns a new configuration.

func (sc *ShardCtrler) RemoveGroups(gids []int) Config {
	lastConfig := sc.configs[len(sc.configs)-1]

	RemoveMap := make(map[int]bool)
	for _, gid := range gids {
		RemoveMap[gid] = true
	}

	// deep-copy the old group map, then drop the leaving groups
	newGroups := make(map[int][]string)
	for gid, serverList := range lastConfig.Groups {
		newGroups[gid] = serverList
	}
	for _, leaveGid := range gids {
		delete(newGroups, leaveGid)
	}

	// GroupMap: gid -> number of shards still assigned to a remaining group
	GroupMap := make(map[int]int)
	var newShard [NShards]int
	copy(newShard[:], lastConfig.Shards[:])
	for gid := range newGroups {
		if !RemoveMap[gid] {
			GroupMap[gid] = 0
		}
	}

	for shard, gid := range lastConfig.Shards {
		if gid != 0 {
			if RemoveMap[gid] {
				newShard[shard] = 0 // orphan the shards of leaving groups
			} else {
				GroupMap[gid]++
			}
		}
	}

	if len(GroupMap) == 0 {
		return Config{
			Num:    lastConfig.Num + 1,
			Shards: [NShards]int{},
			Groups: newGroups,
		}
	}
	return Config{
		Num:    lastConfig.Num + 1,
		Shards: sc.loadBalance(GroupMap, newShard),
		Groups: newGroups,
	}
}

5A Move

Move assigns the given shard to the given group and returns a new configuration.

func (sc *ShardCtrler) MoveShards(shard int, gid int) Config {
	lastConfig := sc.configs[len(sc.configs)-1]
	newGroups := make(map[int][]string)

	var newShard [NShards]int
	copy(newShard[:], lastConfig.Shards[:])
	newShard[shard] = gid // move the shard to gid

	for gid, servers := range lastConfig.Groups {
		newGroups[gid] = servers
	}

	return Config{
		Num:    lastConfig.Num + 1,
		Shards: newShard,
		Groups: newGroups,
	}
}

5A Query

Query returns the configuration with the given number, or the latest configuration.

func (sc *ShardCtrler) QueryConfig(num int) Config {
	if num == -1 || num >= len(sc.configs) {
		return sc.configs[len(sc.configs)-1] // return the latest config
	}
	return sc.configs[num]
}

A strange bug showed up during testing: the Move test failed with "test_test.go:152: Move should increase Config.Num". In that test, the Query issued right after a Move returned an empty configuration; oddly, inserting an extra Query in between made the result correct, and Query behaved fine in every other test.
After debugging for a long time without finding the root cause, I changed the server-side Query handler from:

func (sc *ShardCtrler) Query(args *QueryArgs, reply *QueryReply) {
	...
	if res.Err == ErrWrongLeader {
		reply.WrongLeader = true
	}
	reply.Config = res.lastConfig
	...
}

to:

func (sc *ShardCtrler) Query(args *QueryArgs, reply *QueryReply) {
	...
	if res.Err == ErrWrongLeader {
		reply.WrongLeader = true
	}
	if opArgs.Num == -1 || opArgs.Num >= len(sc.configs) {
		reply.Config = sc.configs[len(sc.configs)-1]
	} else {
		reply.Config = sc.configs[opArgs.Num]
	}
	...
}

With this change, a reply that comes back OK can never carry an empty Config. Why Query returns an empty config right after a Move in the first place I still have not figured out; I will revisit it later.

5A Load Balancing

This is probably the hardest part of 5A. Load balancing means each group should be responsible for as equal a number of shards as possible, and the result must be computed identically on every replica, otherwise the replicated state machines diverge (Go's map iteration order is randomized, so the gids have to be put into a deterministic order first). The overall logic is:

  • Sort the groups by load and then by gid, so every replica processes them in the same order
  • Walk the sorted result and move shards from overloaded groups to underloaded ones
func (sc *ShardCtrler) loadBalance(GroupMap map[int]int, lastShards [NShards]int) [NShards]int {
	length := len(GroupMap)
	ave := NShards / length
	remainder := NShards % length
	sortGids := sortGroupShard(GroupMap)

	// first pass: free shards from groups that hold more than their target
	for i := 0; i < length; i++ {
		target := ave
		if !moreAllocations(length, remainder, i) {
			target = ave + 1
		}

		if GroupMap[sortGids[i]] > target {
			overLoadGid := sortGids[i]
			changeNum := GroupMap[overLoadGid] - target
			for shard, gid := range lastShards {
				if changeNum <= 0 {
					break
				}
				if gid == overLoadGid {
					lastShards[shard] = 0
					changeNum--
				}
			}
			GroupMap[overLoadGid] = target
		}
	}

	// second pass: hand the freed (gid == 0) shards to groups below their target
	for i := 0; i < length; i++ {
		target := ave
		if !moreAllocations(length, remainder, i) {
			target = ave + 1
		}

		if GroupMap[sortGids[i]] < target {
			freeGid := sortGids[i]
			changeNum := target - GroupMap[freeGid]
			for shard, gid := range lastShards {
				if changeNum <= 0 {
					break
				}
				if gid == 0 {
					lastShards[shard] = freeGid
					changeNum--
				}
			}
			GroupMap[freeGid] = target
		}
	}
	return lastShards
}

// sortGroupShard sorts the gids by current load (ascending), breaking ties by gid,
// so every replica produces the same order despite Go's random map iteration.
func sortGroupShard(GroupMap map[int]int) []int {
	length := len(GroupMap)

	gidSlice := make([]int, 0, length)
	for gid := range GroupMap {
		gidSlice = append(gidSlice, gid)
	}

	// a simple bubble sort is plenty for NShards-sized input
	for i := 0; i < length-1; i++ {
		for j := length - 1; j > i; j-- {
			if GroupMap[gidSlice[j]] < GroupMap[gidSlice[j-1]] ||
				(GroupMap[gidSlice[j]] == GroupMap[gidSlice[j-1]] && gidSlice[j] < gidSlice[j-1]) {
				gidSlice[j], gidSlice[j-1] = gidSlice[j-1], gidSlice[j]
			}
		}
	}
	return gidSlice
}

// moreAllocations reports whether the i-th group in sorted order gets only the
// base share ave; the last `remainder` groups each get one extra shard.
func moreAllocations(length int, remainder int, i int) bool {
	return i < length-remainder
}

The 5A test results are below; the shardkv tests are covered together in 5B.
(screenshot of the 5A test output)

5B Shard Movement

Building on Lab 4, 5B uses the 5A shard controller to construct a sharded, fault-tolerant key/value store; the new work is mainly the machinery around configurations. The overall architecture is:

The client hashes each key with key2shard to find its shard, looks up the group that owns that shard, and sends the request to a server in that group. If that server is the leader of its group, it runs the operation through the group's own Raft instance, so every replica in the group applies the same shard operations and the group stays fault tolerant. The assignment of shards to groups across the whole system is coordinated by the Lab 5A shard controller.
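The 5B snippets below assume server-side state roughly like the following sketch (assembled from the fields those snippets use; imports of sync, raft, labrpc, and shardctrler are omitted, and field names such as shardsPersist and SeqMap are this write-up's own, not the skeleton's):

// Shard is one shard's data plus the config number at which it was installed.
type Shard struct {
	KvMap     map[string]string // nil means this group does not currently hold the shard
	ConfigNum int
}

type ShardKV struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	makeEnd func(string) *labrpc.ClientEnd
	gid     int
	sck     *shardctrler.Clerk // clerk for the 5A shard controller

	Config     shardctrler.Config // configuration currently being served
	LastConfig shardctrler.Config // previous configuration, needed during migration

	shardsPersist [shardctrler.NShards]Shard // shard id -> shard data
	SeqMap        map[int64]int              // clientId -> highest applied request id
	// ...plus the per-index wait channels and snapshot state from Lab 4
}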

The client side is much like the earlier labs; on top of the skeleton we only add the rpcSeq sequence number for duplicate detection:

func (ck *Clerk) PutAppend(key string, value string, op string) {
	ck.rpcSeq++
	for {
		args := PutAppendArgs{
			Key:       key,
			Value:     value,
			Op:        Operation(op),
			ClientId:  ck.clientId,
			RequestId: ck.rpcSeq,
		}
		shard := key2shard(key)
		gid := ck.config.Shards[shard]
		if servers, ok := ck.config.Groups[gid]; ok {
			for si := 0; si < len(servers); si++ {
				srv := ck.make_end(servers[si])
				var reply PutAppendReply
				ok := srv.Call("ShardKV.PutAppend", &args, &reply)
				if ok && reply.Err == OK {
					return
				}
				if ok && reply.Err == ErrWrongGroup {
					break
				}
			}
		}

		time.Sleep(100 * time.Millisecond)
		// ask the controller for the latest configuration and retry
		ck.config = ck.sm.Query(-1)
	}
}

The server side keeps the overall structure of the previous labs, but several details change.

In the loop that polls applyCh for committed messages, besides ordinary client operations we now also have to apply, within each group, the shard operations (installing a shard, and garbage-collecting a shard that has been handed off) and configuration updates, all of which go through the group's Raft consensus; a sketch of that dispatch follows, and after it the configuration-detection loop.
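A minimal sketch of that dispatch, assuming the same apply-loop and wake-up mechanism as Lab 4 (applyClientOp is a hypothetical helper standing in for the ordinary Get/Put/Append path, which this post does not show):

func (kv *ShardKV) ApplyHandler() {
	for !kv.killed() {
		msg := <-kv.applyCh
		if !msg.CommandValid {
			continue // snapshot installation omitted in this sketch
		}
		op := msg.Command.(Op)

		kv.mu.Lock()
		switch op.OpType {
		case UpConfigType:
			kv.upConfigHandler(op)
		case AddShardType:
			kv.addShardHandler(op)
		case RemoveShardType:
			kv.removeShardHandler(op)
		default:
			// ordinary Get / Put / Append: dedup with kv.SeqMap, then read or
			// write kv.shardsPersist[key2shard(op.Key)].KvMap, as in Lab 4
			kv.applyClientOp(op) // hypothetical helper, not shown in this post
		}
		kv.mu.Unlock()

		// wake up the RPC handler waiting on this log index (same channel
		// scheme used by startCommand), omitted here
	}
}

The configuration-detection loop that produces the UpConfigType and RemoveShardType commands looks like this: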

// ConfigDetectedLoop polls the shard controller for configuration changes
// and pushes shards that no longer belong to this group to their new owners.
func (kv *ShardKV) ConfigDetectedLoop() {
	kv.mu.Lock()

	curConfig := kv.Config
	rf := kv.rf
	kv.mu.Unlock()

	for !kv.killed() {
		// only the leader needs to deal with configuration tasks
		if _, isLeader := rf.GetState(); !isLeader {
			time.Sleep(UpConfigLoopInterval)
			continue
		}
		kv.mu.Lock()

		// have we handed off every shard that no longer belongs to us?
		if !kv.allSent() {
			SeqMap := make(map[int64]int)
			for k, v := range kv.SeqMap {
				SeqMap[k] = v
			}
			for shardId, gid := range kv.LastConfig.Shards {

				// send shards that the latest config assigns to someone else
				if gid == kv.gid && kv.Config.Shards[shardId] != kv.gid &&
					kv.shardsPersist[shardId].ConfigNum < kv.Config.Num {

					sendData := kv.cloneShard(kv.Config.Num, kv.shardsPersist[shardId].KvMap)

					args := SendShardArg{
						LastAppliedRequestId: SeqMap,
						ShardId:              shardId,
						Shard:                sendData,
						ClientId:             int64(gid),
						RequestId:            kv.Config.Num,
					}

					// shardId -> gid -> server names
					serversList := kv.Config.Groups[kv.Config.Shards[shardId]]
					servers := make([]*labrpc.ClientEnd, len(serversList))
					for i, name := range serversList {
						servers[i] = kv.makeEnd(name)
					}

					// push the shard to the target group in a goroutine (the shard goes
					// to another group; our own group records the removal through Raft)
					go func(servers []*labrpc.ClientEnd, args *SendShardArg) {

						index := 0
						start := time.Now()
						for {
							var reply AddShardReply
							// ask the target group to install the shard
							ok := servers[index].Call("ShardKV.AddShard", args, &reply)

							// once the shard is delivered, or we have been trying for too
							// long, GC the shard that no longer belongs to us
							if ok && reply.Err == OK || time.Now().Sub(start) >= 5*time.Second {

								kv.mu.Lock()
								command := Op{
									OpType:   RemoveShardType,
									ClientId: int64(kv.gid),
									SeqId:    kv.Config.Num,
									ShardId:  args.ShardId,
								}
								kv.mu.Unlock()
								kv.startCommand(command, RemoveShardsTimeout)
								break
							}
							index = (index + 1) % len(servers)
							if index == 0 {
								time.Sleep(UpConfigLoopInterval)
							}
						}
					}(servers, &args)
				}
			}
			kv.mu.Unlock()
			time.Sleep(UpConfigLoopInterval)
			continue
		}
		// have all shards that now belong to us arrived?
		if !kv.allReceived() {
			kv.mu.Unlock()
			time.Sleep(UpConfigLoopInterval)
			continue
		}

		// current configuration is fully installed; poll for the next configuration
		curConfig = kv.Config
		sck := kv.sck
		kv.mu.Unlock()

		newConfig := sck.Query(curConfig.Num + 1)
		if newConfig.Num != curConfig.Num+1 {
			time.Sleep(UpConfigLoopInterval)
			continue
		}

		command := Op{
			OpType:   UpConfigType,
			ClientId: int64(kv.gid),
			SeqId:    newConfig.Num,
			UpConfig: newConfig,
		}
		kv.startCommand(command, UpConfigTimeout)
	}
}
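allSent and allReceived are not shown above; one plausible implementation, consistent with how shardsPersist and the config numbers are used elsewhere in this post, is:

// allSent reports whether every shard that the new config assigns to another
// group has already been handed off (its local copy is gone or up to date).
func (kv *ShardKV) allSent() bool {
	for shard, gid := range kv.LastConfig.Shards {
		if gid == kv.gid && kv.Config.Shards[shard] != kv.gid &&
			kv.shardsPersist[shard].ConfigNum < kv.Config.Num {
			return false
		}
	}
	return true
}

// allReceived reports whether every shard that the new config assigns to this
// group has already arrived from its previous owner.
func (kv *ShardKV) allReceived() bool {
	for shard, gid := range kv.LastConfig.Shards {
		if gid != kv.gid && kv.Config.Shards[shard] == kv.gid &&
			kv.shardsPersist[shard].ConfigNum < kv.Config.Num {
			return false
		}
	}
	return true
}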

Deduplication comes up here as well. Client RPCs are deduplicated with the client-side rpcSeq counter; for configuration-related operations the configuration number itself plays that role: every such operation carries the config number it was issued under, so once the configuration has advanced, stale shard or config commands can be recognized and ignored (see the op.SeqId < kv.Config.Num checks in the handlers below).

The RPC handlers, each of which turns the request into a command and submits it to Raft via startCommand, are as follows:

func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
	shardId := key2shard(args.Key)
	kv.mu.Lock()
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.shardsPersist[shardId].KvMap == nil {
		reply.Err = ShardNotArrived
	}
	kv.mu.Unlock()
	if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
		return
	}

	command := Op{
		OpType:   GetType,
		ClientId: args.ClientId,
		SeqId:    args.RequestId,
		Key:      args.Key,
	}
	err := kv.startCommand(command, GetTimeout)
	if err != OK {
		reply.Err = err
		return
	}

	// re-check ownership after the command has been applied
	kv.mu.Lock()
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.shardsPersist[shardId].KvMap == nil {
		reply.Err = ShardNotArrived
	} else {
		reply.Err = OK
		reply.Value = kv.shardsPersist[shardId].KvMap[args.Key]
	}
	kv.mu.Unlock()
}

func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	shardId := key2shard(args.Key)
	kv.mu.Lock()
	if kv.Config.Shards[shardId] != kv.gid {
		reply.Err = ErrWrongGroup
	} else if kv.shardsPersist[shardId].KvMap == nil {
		reply.Err = ShardNotArrived
	}
	kv.mu.Unlock()
	if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
		return
	}

	command := Op{
		OpType:   args.Op,
		ClientId: args.ClientId,
		SeqId:    args.RequestId,
		Key:      args.Key,
		Value:    args.Value,
	}
	reply.Err = kv.startCommand(command, AppOrPutTimeout)
}

// AddShard moves a shard from the caller (its previous owner) to this server's group.
func (kv *ShardKV) AddShard(args *SendShardArg, reply *AddShardReply) {
	command := Op{
		OpType:   AddShardType,
		ClientId: args.ClientId,
		SeqId:    args.RequestId,
		ShardId:  args.ShardId,
		Shard:    args.Shard,
		SeqMap:   args.LastAppliedRequestId,
	}
	reply.Err = kv.startCommand(command, AddShardsTimeout)
}

Finally, the handlers that the apply loop invokes for shard installation, shard GC, and configuration updates:

// upConfigHandler installs a newer configuration.
func (kv *ShardKV) upConfigHandler(op Op) {
	curConfig := kv.Config
	upConfig := op.UpConfig
	if curConfig.Num >= upConfig.Num {
		return
	}
	for shard, gid := range upConfig.Shards {
		if gid == kv.gid && curConfig.Shards[shard] == 0 {
			// the new config gives us a shard that was previously unassigned (gid 0),
			// so we can initialize it directly instead of waiting for a transfer
			kv.shardsPersist[shard].KvMap = make(map[string]string)
			kv.shardsPersist[shard].ConfigNum = upConfig.Num
		}
	}
	kv.LastConfig = curConfig
	kv.Config = upConfig
}

func (kv *ShardKV) addShardHandler(op Op) {
	// the shard is already installed, or this is an outdated command
	if kv.shardsPersist[op.ShardId].KvMap != nil || op.Shard.ConfigNum < kv.Config.Num {
		return
	}

	kv.shardsPersist[op.ShardId] = kv.cloneShard(op.Shard.ConfigNum, op.Shard.KvMap)

	// merge the sender's dedup table so duplicate client requests stay suppressed
	for clientId, seqId := range op.SeqMap {
		if r, ok := kv.SeqMap[clientId]; !ok || r < seqId {
			kv.SeqMap[clientId] = seqId
		}
	}
}

func (kv *ShardKV) removeShardHandler(op Op) {
	if op.SeqId < kv.Config.Num {
		return // outdated GC command
	}
	kv.shardsPersist[op.ShardId].KvMap = nil
	kv.shardsPersist[op.ShardId].ConfigNum = op.SeqId
}

The 5B test results:
(screenshot of the 5B test output)