Lab link: https://pdos.csail.mit.edu/6.824/labs/lab-shard.html
In Lab 4 we built a simple K/V server on top of Raft. Lab 5 goes a step further and builds a sharded database on top of Raft.
A sharded database consists of one controller plus multiple replica groups. The controller splits the data into shards and assigns them to different clusters; each cluster is itself a Raft group. The controller manages the shards, i.e. it manages the configuration. Each shard is served by exactly one group, while one group may serve several shards; as long as the number of groups does not exceed the number of shards, adding groups therefore increases the system's throughput.
5A
Lab 5A implements the controller of the sharded database. server.go contains the Config definition:
```go
type Config struct {
    Num    int              // config number
    Shards [NShards]int     // shard -> gid
    Groups map[int][]string // gid -> servers[]
}
```
Num is the configuration number, Shards maps each shard to the group (gid) that owns it, and Groups maps each gid to its list of servers.
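For intuition, and assuming NShards == 10 as in the lab, a configuration with two (hypothetical) groups could look like this sketch:

```go
// Hypothetical example (inside the shardctrler package, NShards == 10):
// shards 0-4 are owned by gid 1, shards 5-9 by gid 2.
var exampleConfig = Config{
    Num:    2,
    Shards: [NShards]int{1, 1, 1, 1, 1, 2, 2, 2, 2, 2},
    Groups: map[int][]string{
        1: {"server-1-0", "server-1-1", "server-1-2"},
        2: {"server-2-0", "server-2-1", "server-2-2"},
    },
}
```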
The controller exposes the following four RPCs:
Join: add new groups to the system, create a new configuration, and rebalance the shards
Leave: remove groups and rebalance the shards
Move: assign a given shard to a given group
Query: return the configuration with the given number; if the number is not a valid index into the config slice, return the latest configuration.
5A Code Implementation
The Lab 5A implementation closely mirrors Lab 4: the framework is the same, with Put, Get, etc. replaced by Join and friends.
In the RPC argument structs, it is enough to add the fields
```go
RpcSeq   uint64
ClientId int64
```
to each of the given argument structs; they identify the calling client and give it a per-client sequence number for duplicate detection.
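For example, assuming the skeleton's JoinArgs shape (Servers is the map of new gid → server names), the Join arguments then look roughly like this sketch:

```go
type JoinArgs struct {
    Servers  map[int][]string // new gid -> server names
    ClientId int64            // identifies the calling clerk
    RpcSeq   uint64           // per-client sequence number for duplicate detection
}
```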
client.go needs to implement Join and the other clerk methods; each of them simply calls the corresponding ShardCtrler RPC. Taking Query as an example:
```go
func (ck *Clerk) Query(num int) Config {
    args := QueryArgs{Num: num, ClientId: ck.clientId, RpcSeq: ck.rpcSeq}
    ck.rpcSeq++
    // Your code here.
    for {
        // try each known server.
        reply := QueryReply{}
        ok := ck.servers[ck.leaderId].Call("ShardCtrler.Query", &args, &reply)
        if !ok || reply.Err == ErrWrongLeader {
            ck.leaderId = (ck.leaderId + 1) % len(ck.servers) // try next server if not ok or wrong leader
            time.Sleep(RetryInterval)
            continue
        }
        switch reply.Err {
        case ErrChanClosed:
            time.Sleep(RetryInterval)
            continue
        case ErrHandleOpTimeout:
            time.Sleep(RetryInterval)
            continue
        }
        if ok {
            return reply.Config
        }
    }
}
```
The server structure also mirrors the Lab 4 server, with Put, Get, etc. replaced by Join and friends. Taking Join as an example: the Join RPC handler calls OpHandler, OpHandler passes the operation to Raft's Start so it is replicated as a log entry, and ApplyHandler keeps polling the apply channel for committed log entries to apply. That is the overall flow; the bulk of the work is the concrete implementation of the four operations.
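A minimal sketch of that flow, under the assumption that Op carries the fields named above and that the controller has an applyCh and mutex as in Lab 4 (duplicate detection and the wake-up channel back to the RPC handler are omitted):

```go
// Sketch only: the Join RPC handler hands the operation to Raft and waits for
// the apply loop; the Op fields used here are assumptions, not the lab's exact shape.
func (sc *ShardCtrler) Join(args *JoinArgs, reply *JoinReply) {
    op := Op{OpType: "Join", Servers: args.Servers, ClientId: args.ClientId, RpcSeq: args.RpcSeq}
    reply.Err = sc.OpHandler(op) // Start() the op on Raft, then wait for it to be applied or time out
}

// ApplyHandler runs in its own goroutine and applies committed log entries.
func (sc *ShardCtrler) ApplyHandler() {
    for msg := range sc.applyCh {
        if !msg.CommandValid {
            continue
        }
        op := msg.Command.(Op)
        sc.mu.Lock()
        switch op.OpType {
        case "Join":
            sc.configs = append(sc.configs, sc.JoinNewGroups(op.Servers))
        case "Leave":
            sc.configs = append(sc.configs, sc.RemoveGroups(op.GIDs))
        }
        // Move and Query are handled analogously; dedup via ClientId/RpcSeq is omitted.
        sc.mu.Unlock()
        // notify the RPC handler waiting in OpHandler (not shown)
    }
}
```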
5A Join
Join creates a new configuration that includes the newly added groups, and then rebalances the shards.
Note: copying a Go map is a shallow copy; assigning a map only copies the reference to the underlying data structure, not the data itself. So we must not use = to populate the new configuration's map.
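A minimal sketch of the kind of copy this implies (copyGroups is a hypothetical helper name, not part of the lab code):

```go
// copyGroups makes an independent copy of a gid -> servers map so that
// mutating the new config cannot affect the old one.
func copyGroups(src map[int][]string) map[int][]string {
    dst := make(map[int][]string, len(src))
    for gid, servers := range src {
        dst[gid] = servers // note: the string slice itself is still shared
    }
    return dst
}
```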
```go
func (sc *ShardCtrler) JoinNewGroups(servers map[int][]string) Config {
    lastConfig := sc.configs[len(sc.configs)-1]
    newGroups := make(map[int][]string)
    for gid, serverList := range lastConfig.Groups {
        newGroups[gid] = serverList
    }
    for gid, serverList := range servers {
        newGroups[gid] = serverList
    }

    // GroupMap: groupId -> shards
    GroupMap := make(map[int]int)
    for gid := range newGroups {
        GroupMap[gid] = 0
    }
    for _, gid := range lastConfig.Shards {
        if gid != 0 {
            GroupMap[gid]++
        }
    }

    if len(GroupMap) == 0 {
        return Config{
            Num:    lastConfig.Num + 1,
            Shards: [10]int{},
            Groups: newGroups,
        }
    }

    return Config{
        Num:    lastConfig.Num + 1,
        Shards: sc.loadBalance(GroupMap, lastConfig.Shards),
        Groups: newGroups,
    }
}
```
5A Leave
Leave removes the given groups from the configuration, rebalances the shards, and returns a new configuration.
```go
func (sc *ShardCtrler) RemoveGroups(gids []int) Config {
    lastConfig := sc.configs[len(sc.configs)-1]
    newGroups := make(map[int][]string)

    RemoveMap := make(map[int]bool)
    for _, gid := range gids {
        RemoveMap[gid] = true
    }

    for gid, serverList := range lastConfig.Groups {
        newGroups[gid] = serverList
    }
    for _, leaveGid := range gids {
        delete(newGroups, leaveGid)
    }

    GroupMap := make(map[int]int)
    var newShard [10]int
    copy(newShard[:], lastConfig.Shards[:])
    // newShard := lastConfig.Shards
    for gid := range newGroups {
        if !RemoveMap[gid] {
            GroupMap[gid] = 0
        }
    }
    for shard, gid := range lastConfig.Shards {
        if gid != 0 {
            if RemoveMap[gid] {
                newShard[shard] = 0
            } else {
                GroupMap[gid]++
            }
        }
    }
    if len(GroupMap) == 0 {
        return Config{
            Num:    lastConfig.Num + 1,
            Shards: [10]int{},
            Groups: newGroups,
        }
    }
    return Config{
        Num:    lastConfig.Num + 1,
        Shards: sc.loadBalance(GroupMap, newShard),
        Groups: newGroups,
    }
}
```
5A Move
Move assigns the given shard to the given group and returns a new configuration.
```go
func (sc *ShardCtrler) MoveShards(shard int, gid int) Config {
    lastConfig := sc.configs[len(sc.configs)-1]
    newGroups := make(map[int][]string)

    var newShard [10]int
    copy(newShard[:], lastConfig.Shards[:])
    newShard[shard] = gid // move shard to gid.

    for gid, servers := range lastConfig.Groups {
        newGroups[gid] = servers
    }

    return Config{
        Num:    lastConfig.Num + 1,
        Shards: newShard,
        Groups: newGroups,
    }
}
```
5A Query
Query returns the configuration with the given number, or the latest configuration.
```go
func (sc *ShardCtrler) QueryConfig(num int) Config {
    if num == -1 || num >= len(sc.configs) {
        return sc.configs[len(sc.configs)-1] // return the latest config.
    }
    return sc.configs[num]
}
```
A strange bug showed up in testing: the Move test failed with test_test.go:152: Move should increase Config.Num
In that test, the Query issued after the Move returned an empty configuration; if an extra Query was inserted in between, the result became correct again, and Query behaved normally in all other tests.
After a long debugging session I still could not find the root cause, so in the end I changed the server-side Query handler. The original code:
```go
func (sc *ShardCtrler) Query(args *QueryArgs, reply *QueryReply) {
    ...
    if res.Err == ErrWrongLeader {
        reply.WrongLeader = true
    }
    reply.Config = res.lastConfig
    ...
}
```
was changed to:
```go
func (sc *ShardCtrler) Query(args *QueryArgs, reply *QueryReply) {
    // Your code here.
    ...
    if res.Err == ErrWrongLeader {
        reply.WrongLeader = true
    }
    if opArgs.Num == -1 || opArgs.Num >= len(sc.configs) {
        reply.Config = sc.configs[len(sc.configs)-1]
    } else {
        reply.Config = sc.configs[opArgs.Num]
    }
    ...
}
```
This guarantees that when the RPC returns OK it never hands back an empty config. Why Query returns an empty config right after a Move is still unclear to me; I will revisit it later.
5A Load Balancing
This is arguably the hardest part of 5A. Load balancing means the number of shards each group is responsible for should be as even as possible, and the result must be identical on every replica, otherwise the replicas' state would diverge. The overall logic:
Sort the groups by shard count and then by gid, so every replica computes the same order (Go's map iteration order is not deterministic).
Walk the sorted groups and move shards from overloaded groups to the groups with spare capacity. For example, with NShards = 10 and three groups, ave = 3 and remainder = 1, so two groups end up with 3 shards and one group with 4.
```go
func (sc *ShardCtrler) loadBalance(GroupMap map[int]int, lastShards [NShards]int) [NShards]int {
    length := len(GroupMap)
    ave := NShards / length
    remainder := NShards % length
    sortGids := sortGroupShard(GroupMap)

    // first pass: take shards away from overloaded groups
    for i := 0; i < length; i++ {
        target := ave
        // the last `remainder` groups in the sorted order get one extra shard
        if !moreAllocations(length, remainder, i) {
            target = ave + 1
        }
        if GroupMap[sortGids[i]] > target {
            overLoadGid := sortGids[i]
            changeNum := GroupMap[overLoadGid] - target
            for shard, gid := range lastShards {
                if changeNum <= 0 {
                    break
                }
                if gid == overLoadGid {
                    lastShards[shard] = 0
                    changeNum--
                }
            }
            GroupMap[overLoadGid] = target
        }
    }

    // second pass: hand the freed shards to groups below their target
    for i := 0; i < length; i++ {
        target := ave
        if !moreAllocations(length, remainder, i) {
            target = ave + 1
        }
        if GroupMap[sortGids[i]] < target {
            freeGid := sortGids[i]
            changeNum := target - GroupMap[freeGid]
            for shard, gid := range lastShards {
                if changeNum <= 0 {
                    break
                }
                if gid == 0 {
                    lastShards[shard] = freeGid
                    changeNum--
                }
            }
            GroupMap[freeGid] = target
        }
    }
    return lastShards
}

// sortGroupShard sorts gids ascending by shard count, breaking ties by gid,
// so every replica produces the same order.
func sortGroupShard(GroupMap map[int]int) []int {
    length := len(GroupMap)
    gidSlice := make([]int, 0, length)
    for gid := range GroupMap {
        gidSlice = append(gidSlice, gid)
    }
    for i := 0; i < length-1; i++ {
        for j := length - 1; j > i; j-- {
            if GroupMap[gidSlice[j]] < GroupMap[gidSlice[j-1]] ||
                (GroupMap[gidSlice[j]] == GroupMap[gidSlice[j-1]] && gidSlice[j] < gidSlice[j-1]) {
                gidSlice[j], gidSlice[j-1] = gidSlice[j-1], gidSlice[j]
            }
        }
    }
    return gidSlice
}

func moreAllocations(length int, remainder int, i int) bool {
    return i < length-remainder
}
```
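A quick sanity check for loadBalance, written as a test sketch in the shardctrler package and assuming NShards == 10: two groups, with gid 1 initially owning every shard, should end up with five shards each.

```go
// TestLoadBalanceSketch is a hypothetical test, not part of the lab's test suite.
func TestLoadBalanceSketch(t *testing.T) {
    sc := &ShardCtrler{}
    groupMap := map[int]int{1: 10, 2: 0} // gid 1 owns 10 shards, gid 2 owns none
    var shards [NShards]int
    for i := range shards {
        shards[i] = 1
    }
    got := sc.loadBalance(groupMap, shards)
    counts := map[int]int{}
    for _, gid := range got {
        counts[gid]++
    }
    if counts[1] != 5 || counts[2] != 5 {
        t.Fatalf("expected 5 shards per group, got %v", counts)
    }
}
```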
The 5A test results are shown below; the shardkv part is handled together with 5B.
5B Shard Movement
Building on Lab 4, we use the 5A shard controller to construct a sharded, fault-tolerant key/value store; the main addition is a set of configuration-related operations. The overall architecture:
The client maps each request's key to a shard with key2shard and sends it to a server in the group that owns that shard. If that server is the leader of its group, it runs the operation through its own Raft group, so every server in the group applies the group's shard operations in the same order and the group stays fault tolerant. Group membership and shard ownership for the whole system are managed by the Lab 5A shard controller.
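For reference, the skeleton's key2shard maps a key to a shard by looking at its first byte; it is roughly the following (a sketch inside the shardkv package, importing the shardctrler package):

```go
// key2shard picks a shard for a key by taking its first byte modulo NShards.
func key2shard(key string) int {
    shard := 0
    if len(key) > 0 {
        shard = int(key[0])
    }
    shard %= shardctrler.NShards
    return shard
}
```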
The client side is almost the same as in the previous labs; on top of the given code we add an rpcSeq sequence number for duplicate detection:
```go
func (ck *Clerk) PutAppend(key string, value string, op string) {
    ck.rpcSeq++
    for {
        args := PutAppendArgs{
            Key:       key,
            Value:     value,
            Op:        Operation(op),
            ClientId:  ck.clientId,
            RequestId: ck.rpcSeq,
        }
        shard := key2shard(key)
        gid := ck.config.Shards[shard]
        if servers, ok := ck.config.Groups[gid]; ok {
            for si := 0; si < len(servers); si++ {
                srv := ck.make_end(servers[si])
                var reply PutAppendReply
                ok := srv.Call("ShardKV.PutAppend", &args, &reply)
                if ok && reply.Err == OK {
                    return
                }
                if ok && reply.Err == ErrWrongGroup {
                    break
                }
            }
        }
        time.Sleep(100 * time.Millisecond)
        ck.config = ck.sm.Query(-1)
    }
}
```
The overall server structure is also similar to the previous labs, but several details change.
In the loop that polls for applied messages we additionally need to reach consensus, within a group, on shard operations (adding a shard and garbage-collecting it afterwards) and on configuration updates.
```go
// ConfigDetectedLoop polls for configuration changes and pushes shards that no
// longer belong to this group to their new owners.
func (kv *ShardKV) ConfigDetectedLoop() {
    kv.mu.Lock()
    curConfig := kv.Config
    rf := kv.rf
    kv.mu.Unlock()

    for !kv.killed() {
        // only leader needs to deal with configuration tasks
        if _, isLeader := rf.GetState(); !isLeader {
            time.Sleep(UpConfigLoopInterval)
            continue
        }
        kv.mu.Lock()

        // check whether all shards that no longer belong to us have been handed off
        if !kv.allSent() {
            SeqMap := make(map[int64]int)
            for k, v := range kv.SeqMap {
                SeqMap[k] = v
            }
            for shardId, gid := range kv.LastConfig.Shards {
                // hand off every shard that is not ours in the latest config
                if gid == kv.gid && kv.Config.Shards[shardId] != kv.gid &&
                    kv.shardsPersist[shardId].ConfigNum < kv.Config.Num {
                    sendData := kv.cloneShard(kv.Config.Num, kv.shardsPersist[shardId].KvMap)
                    args := SendShardArg{
                        LastAppliedRequestId: SeqMap,
                        ShardId:              shardId,
                        Shard:                sendData,
                        ClientId:             int64(gid),
                        RequestId:            kv.Config.Num,
                    }

                    // shardId -> gid -> server names
                    serversList := kv.Config.Groups[kv.Config.Shards[shardId]]
                    servers := make([]*labrpc.ClientEnd, len(serversList))
                    for i, name := range serversList {
                        servers[i] = kv.makeEnd(name)
                    }

                    // send the shard to the target group in a goroutine (the receivers are
                    // servers of the other group; our own group changes its state via Raft)
                    go func(servers []*labrpc.ClientEnd, args *SendShardArg) {
                        index := 0
                        start := time.Now()
                        for {
                            var reply AddShardReply
                            // ask a server of the target group to add the shard
                            ok := servers[index].Call("ShardKV.AddShard", args, &reply)

                            // if the shard was handed over successfully, or we have timed out,
                            // GC the shard that no longer belongs to us
                            if ok && reply.Err == OK || time.Now().Sub(start) >= 5*time.Second {
                                // submit a RemoveShard op to our own Raft group
                                kv.mu.Lock()
                                command := Op{
                                    OpType:   RemoveShardType,
                                    ClientId: int64(kv.gid),
                                    SeqId:    kv.Config.Num,
                                    ShardId:  args.ShardId,
                                }
                                kv.mu.Unlock()
                                kv.startCommand(command, RemoveShardsTimeout)
                                break
                            }
                            index = (index + 1) % len(servers)
                            if index == 0 {
                                time.Sleep(UpConfigLoopInterval)
                            }
                        }
                    }(servers, &args)
                }
            }
            kv.mu.Unlock()
            time.Sleep(UpConfigLoopInterval)
            continue
        }
        if !kv.allReceived() {
            kv.mu.Unlock()
            time.Sleep(UpConfigLoopInterval)
            continue
        }

        // the current configuration is fully applied; poll for the next configuration
        curConfig = kv.Config
        sck := kv.sck
        kv.mu.Unlock()

        newConfig := sck.Query(curConfig.Num + 1)
        if newConfig.Num != curConfig.Num+1 {
            time.Sleep(UpConfigLoopInterval)
            continue
        }

        command := Op{
            OpType:   UpConfigType,
            ClientId: int64(kv.gid),
            SeqId:    newConfig.Num,
            UpConfig: newConfig,
        }
        kv.startCommand(command, UpConfigTimeout)
    }
}
```
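allSent and allReceived are referenced above but not shown; given the conditions used in the loop, they presumably look something like this sketch (field names follow the code above):

```go
// allSent reports whether every shard we owned in LastConfig but no longer own
// in Config has already been handed off (its stored ConfigNum has caught up).
func (kv *ShardKV) allSent() bool {
    for shard, gid := range kv.LastConfig.Shards {
        if gid == kv.gid && kv.Config.Shards[shard] != kv.gid &&
            kv.shardsPersist[shard].ConfigNum < kv.Config.Num {
            return false
        }
    }
    return true
}

// allReceived reports whether every shard we gained in Config has arrived.
func (kv *ShardKV) allReceived() bool {
    for shard, gid := range kv.LastConfig.Shards {
        if gid != kv.gid && kv.Config.Shards[shard] == kv.gid &&
            kv.shardsPersist[shard].ConfigNum < kv.Config.Num {
            return false
        }
    }
    return true
}
```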
Deduplication comes up here as well. Client RPCs are deduplicated via the rpcSeq counter incremented on the client side; for configuration-related operations the configuration number serves the same purpose: whenever the configuration advances, all of the resulting shard operations are tagged with the latest configuration number.
The RPC handlers that feed requests into Raft are as follows:
```go
func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
    shardId := key2shard(args.Key)
    kv.mu.Lock()
    if kv.Config.Shards[shardId] != kv.gid {
        reply.Err = ErrWrongGroup
    } else if kv.shardsPersist[shardId].KvMap == nil {
        reply.Err = ShardNotArrived
    }
    kv.mu.Unlock()
    if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
        return
    }
    command := Op{
        OpType:   GetType,
        ClientId: args.ClientId,
        SeqId:    args.RequestId,
        Key:      args.Key,
    }
    err := kv.startCommand(command, GetTimeout)
    if err != OK {
        reply.Err = err
        return
    }
    kv.mu.Lock()
    if kv.Config.Shards[shardId] != kv.gid {
        reply.Err = ErrWrongGroup
    } else if kv.shardsPersist[shardId].KvMap == nil {
        reply.Err = ShardNotArrived
    } else {
        reply.Err = OK
        reply.Value = kv.shardsPersist[shardId].KvMap[args.Key]
    }
    kv.mu.Unlock()
    return
}

func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    shardId := key2shard(args.Key)
    kv.mu.Lock()
    if kv.Config.Shards[shardId] != kv.gid {
        reply.Err = ErrWrongGroup
    } else if kv.shardsPersist[shardId].KvMap == nil {
        reply.Err = ShardNotArrived
    }
    kv.mu.Unlock()
    if reply.Err == ErrWrongGroup || reply.Err == ShardNotArrived {
        return
    }
    command := Op{
        OpType:   args.Op,
        ClientId: args.ClientId,
        SeqId:    args.RequestId,
        Key:      args.Key,
        Value:    args.Value,
    }
    reply.Err = kv.startCommand(command, AppOrPutTimeout)
    return
}

// AddShard move shards from caller to this server
func (kv *ShardKV) AddShard(args *SendShardArg, reply *AddShardReply) {
    command := Op{
        OpType:   AddShardType,
        ClientId: args.ClientId,
        SeqId:    args.RequestId,
        ShardId:  args.ShardId,
        Shard:    args.Shard,
        SeqMap:   args.LastAppliedRequestId,
    }
    reply.Err = kv.startCommand(command, AddShardsTimeout)
    return
}
```
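startCommand is used everywhere above but not shown. A sketch of what it might look like, assuming the apply loop signals completion through a per-log-index channel (getWaitCh and ErrOverTime are hypothetical names; the real implementation may differ):

```go
// startCommand submits an Op to Raft and waits until the apply loop has applied
// it, or gives up after the timeout.
func (kv *ShardKV) startCommand(command Op, timeout time.Duration) Err {
    kv.mu.Lock()
    index, _, isLeader := kv.rf.Start(command)
    if !isLeader {
        kv.mu.Unlock()
        return ErrWrongLeader
    }
    ch := kv.getWaitCh(index) // hypothetical helper: one notification channel per log index
    kv.mu.Unlock()

    timer := time.NewTimer(timeout)
    defer timer.Stop()

    select {
    case appliedOp := <-ch:
        // the entry applied at this index must be ours, otherwise we lost leadership
        if appliedOp.ClientId != command.ClientId || appliedOp.SeqId != command.SeqId {
            return ErrWrongLeader
        }
        return OK
    case <-timer.C:
        return ErrOverTime // hypothetical timeout error value
    }
}
```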
Finally, the handlers that the apply loop calls for shard movement and configuration updates:
```go
// upConfigHandler applies a configuration-update operation
func (kv *ShardKV) upConfigHandler(op Op) {
    curConfig := kv.Config
    upConfig := op.UpConfig
    if curConfig.Num >= upConfig.Num {
        return
    }
    for shard, gid := range upConfig.Shards {
        if gid == kv.gid && curConfig.Shards[shard] == 0 {
            // the shard is newly assigned to this group and was previously unowned
            kv.shardsPersist[shard].KvMap = make(map[string]string)
            kv.shardsPersist[shard].ConfigNum = upConfig.Num
        }
    }
    kv.LastConfig = curConfig
    kv.Config = upConfig
}

func (kv *ShardKV) addShardHandler(op Op) {
    // this shard is already added or the command is outdated
    if kv.shardsPersist[op.ShardId].KvMap != nil || op.Shard.ConfigNum < kv.Config.Num {
        return
    }

    kv.shardsPersist[op.ShardId] = kv.cloneShard(op.Shard.ConfigNum, op.Shard.KvMap)

    for clientId, seqId := range op.SeqMap {
        if r, ok := kv.SeqMap[clientId]; !ok || r < seqId {
            kv.SeqMap[clientId] = seqId
        }
    }
}

func (kv *ShardKV) removeShardHandler(op Op) {
    if op.SeqId < kv.Config.Num {
        return
    }
    kv.shardsPersist[op.ShardId].KvMap = nil
    kv.shardsPersist[op.ShardId].ConfigNum = op.SeqId
}
```
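These handlers are called from the apply loop. A sketch of that dispatch, assuming the Op and SeqMap shapes used above (the wake-up of the RPC handler waiting in startCommand is omitted):

```go
// applyMsgHandlerLoop is a sketch of the apply-loop dispatch: client operations
// are deduplicated via SeqMap, while config and shard ops rely on the config number.
func (kv *ShardKV) applyMsgHandlerLoop() {
    for msg := range kv.applyCh {
        if !msg.CommandValid {
            continue // snapshot messages are handled elsewhere
        }
        op := msg.Command.(Op)
        kv.mu.Lock()
        switch op.OpType {
        case UpConfigType:
            kv.upConfigHandler(op)
        case AddShardType:
            kv.addShardHandler(op)
        case RemoveShardType:
            kv.removeShardHandler(op)
        default:
            // Get/Put/Append: apply only if this is not a duplicate of an
            // already-applied request from the same client
            if seq, ok := kv.SeqMap[op.ClientId]; !ok || seq < op.SeqId {
                kv.SeqMap[op.ClientId] = op.SeqId
                // update kv.shardsPersist[key2shard(op.Key)].KvMap here
            }
        }
        kv.mu.Unlock()
        // notify the RPC handler waiting in startCommand (not shown)
    }
}
```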
The test results are as follows: