MIT 6.824 : Spring 2015 lab3 训练笔记

摘要: 源代码参见我的github:https://github.com/YaoZengzeng/MIT-6.824

Lab3: Paxos-based Key/Value Service

Introduction

在Lab2中我们依靠单个的master view server来获取primary。如果view server不可用(崩溃了或者有网络问题),那么key/value service就不工作了,即使primary和backup都是可用的。而且它在处理server(primary或者backup)的时候还有一个严重的问题,当它们只是短暂地不可达的时候,系统就简单地对阻塞或者宣布它们已经死了,而后一种处理方案是非常昂贵的,因为它需要对完整的数据库进行传输。

在本实验中,我们将使用Paxos来管理key/value存储的备份操作。我们不需要有任何和master view server对应的东西。相反,一系列的replica会以相同的顺序来执行所有来自client的请求,并且使用Paxos来协调。Paxos会在一些replica不可用,或者存在不可靠的网络,或者一部分的replica被隔离在它们自己的网络中的时候,也能正确执行操作。只要Paxos能编排大多数的replica,它就能对client的请求进行处理。那些之前不可用的replica也可以在之后询问Paxos来获取它们遗失的操作。

我们的系统将由以下几部分组成:clients, kvpaxos servers 以及Paxos peers。Clients发送Put(), Append()以及Get() RPCs给key/value server(或者叫kvpaxos servers)。client可以给任何kvpaxos servers发送RPC,并且在遇到故障的时候需要通过发送给其他server进行重试。每个kvpaxos server都包含一个key/value数据库的备份以及一个Paxos peer,处理来自client的Get(),Put()以及Append() RPC。Paxos在每个kvpaxos server中以库的形式存在。每个kvpaxos server通过方法调用和本地的Paxos peer进行交互。不同的Paxos peer通过RPC来和其他peer进行交互,来达成对每个操作的协议。

Paxos库的接口支持无限数目的agreement "instance"。instance按顺序进行标号。每个instance要么处于"decided",要么处于"not yet decided"的状态。一个处于decided状态的instance会有一个value。如果一个instance处于decided状态,那么所有发现该instance已经decided的Paxos peer会对该instance有相同的value。Paxos库接口允许kvpaxos给instance建议一个value,并且让kvpaxos能发现instance是否已经被decided以及该instance的value是多少。

kvpaxos servers会使用Paxos来确认来自client的Put(),Append()以及Get()操作的执行顺序。每次kvpaxos server获取一个Put()/Append()/Get() RPC,它会使用Paxos来让一些Paxos instance的value作为这一操作的描述。instance的序号决定了该操作的执行相对于其他操作的顺序。为了获取Get()对应的结果,kvpaxos首先应当执行在该Get()之前的所有Puts()和Append()s操作。

我们应该认为kvpaxos使用Paxos来实现了对Put/Append/Get操作的日志化。这意味着,每个Paxos instance都是一个log element,而log中操作的顺序就是kvpaxos server将操作写入key/value 数据库的顺序。Paxos会确保所有kvpaxos server都同意这一顺序。

client和server,不同的server之间以及不同的client之间只能通过RPC来交互。比如,一台server上不同的instance是不能共享Go变量或者文件的。

我们的Paxos-based key/value存储系统为了能成为一个严密的系统还有一些问题需要处理。它不能处理崩溃的情况,因为它既没有将key/value数据库的状态也没有将Paxos的状态存储在磁盘上。它需要一组服务器是固定的,所以我们不能对老的server进行替换。最后,它非常慢,因为每个client操作都需要交换很多Paxos message。所有这些问题都要处理。

Part A: Paxos

我们必须实现以下这些接口:

px = paxos.Make(peers []string, me int)

px.Start(seq int, v interface{})  // start agreement on new instance

px.Status(seq int, v interface{})  // get info about an instance

px.Done(seq int)  // ok to forget all instances <= seq

px.Max() int // highest instance seq known, or -1

px.Min() int // instances before this have been forgotten

首先调用Make(peers,me)来创建一个Paxos peer。形参peers包含了所有peer的ports(包括当前这个peer),而me是当前peer在peers数组中的索引。Start(seq, v)让Paxos来启动对instance seq的agreement,并且建议的value为v。Start()必须立即返回,而不需要等待agreement的完成。接着调用Status(seq)来确认Paxos peer是否认为instance已经到达了agreement,如果到达了,那么value是多少。Status访问的是本地的Paxos peer并且应该立即返回,它不能和其他peer进行交互。我们可以调用Status来获取老的instance。

我们的实现必须能同时对多个instance进行agreement。比如,peer同时以不同的序号调用了多个Start(),我们的实现必须为它们并行地运行Paxos协议。我们不应该在instance i的agreement完成之后再启动对instance i+1的协议。每个instance都应该独立地执行Paxos协议。

一个长期运行的Paxos-based server必须丢弃那些不再需要的instance,从而释放存储那些instance信息的内存。如果还需要调用那些实例的Status()或者还有一个Paxos peer还没有对这个instance达到agreement。我们应该以如下方式来释放instance。当一个特定的peer应用不再需要对<=x的instance调用Status时,它应该调用Done(x)。但是Paxos peer还不能删除这个instance,因为可能还有其他的Paxos peer还没有对这个instance达到agreement。因此,每个Paxos peer都需要告诉其他peer,它能提供给Done的最大的参数。每个Paxos peer都会有一个来自于所有其他peer的Done value。它需要找到其中最小的,然后释放所有序号小于该最小值的序号。Min()方法返回最小序号加一。

我们的Paxos可以在agreement协议包中搭载Done value。这意味着允许peer P1在下次P2向P1发送agreement信息的时候才获得P2最新的Done值。如果调用Start()的序号小于Min(),那么该Start()调用将被忽略。如果调用Status()的序号小于Min(),那么Status()将会返回Forgotten。

接下来是Paxos伪代码(对于单个实例):

proposer(v):

  while not decided:

    choose n, unique and higher than any n seen so far

    send prepare(n) to all servers including self

    if prepare_ok(n, n_a, v_a) from majority:

      v'=v_a with highest n_a; choose own v otherwise

      send accept(n, v') to all

      if accept_ok(n) from majority:

        send decided(v') to all

acceptor's state:

  n_p (highest prepare seen)

  n_a, v_a (highest accept seen)

acceptor's prepare(n) handler:

  if n > n_p

    n_p = n

    reply prepare_ok(n, n_a, v_a)

  else

    reply prepare_reject

acceptor's accept(n, v) handler:

  if n >= n_p

    n_p = n

    n_a = n

    n_v = v

    reply accept_ok(n)

  else

    reply accept_reject

Here's a reasonable plan of attack:

1、根据伪代码,往paxos.go的Paxos结构中加入一些元素维护状态。我们需要定义一个结构用于保存每一个agreement instance的信息

2、根据伪代码,定义Paxos protocal message的RPC argument/reply 类型

3、写一个proposer函数用于为一个实例驱动Paxos协议以及一个PRC handler作为acceptor。对于每一个实例,在一个单独的线程启动proposer函数,如果需要的话。(比如在Start()函数里)

4、到此为止,一ing个能通过开始的一部分测试

5、现在可以实现forgetting

Hint: 在给定时刻可能有不止一个Paxos实例在执行,它们Start()或者decided的顺序可能不一致。(seq 10可能在seq 5之前决定)

Hint:为了通过那些假设网络不可靠的测试,我们应该使用函数调用而不是RPC来调用本地的acceptor

Hint:可能有多个peer针对同一个实例调用Start(),它们的建议的value可能不同。甚至还有可能针对一个已经decided的实例调用Start()

Hint:在开始写代码之前,先思考一下怎么让paxos忘掉老的实例信息。每一个Paxos peer需要在某些数据结构中存储实例信息,从而允许单个的实例信息能够被删除。(因此Go的GC能释放/重用内存)

Hint:我们不需要写代码对Paxos peer崩溃之后又重启的情况进行处理。如果有一个Paxos peer崩溃了,它永远不会重启

Hint:为每一个un-decided的实例启动一个线程,它的作用是扮演proposer,并最终促使实例达到共识

Hint:一个Paxos peer可能同时扮演同一个实例的acceptor和proposer的角色。尽可能将这两种活动分开

Hint:proposer需要一个方法来选择一个更高的proposer number。这是对proposer和acceptor应该尽量分开的一个合理的例外。如果propose handler 能够在拒绝一个RPC的时候返回一个最高的已知proposal number,从而让调用者能够在下次选用一个更高的number,这将是非常有用的。每个Paxos peer的px.me是不同的,所以我们可以使用px.me来确保proposal number是唯一的。

Hint:搞清楚在不存在故障的情况下,Paxos达到共识需要的最少的信息数目,并且让我们的实现使用这个最小值

Hint:当我们想要我们的Paxos关闭的时候就调用Kill(),Kill()会设置px.dead。在任何一个需要运行一定时间的循环里都要调用px.isdead(),并且在px.isdead()为true的时候跳出循环。这对于我们创建的需要长时间运行的线程格外重要。

Part B: Paxos-based Key/Value Server

我们现在将构建kvpaxos,一个fault-tolerant key/value storage system。kvpaxos replica需要保持一致;唯一的例外是有些replica因为unreachable而落后于其他replica。如果一个replica处于unreachalbe一段时间之后,然后又重新变得reachable,它需要最终能够catch up(重新获取所有遗失的操作)。

我们必须kvpaxos client需要尝试不同的replica,直到其中一个回复。一个kvpaxos replica,作为replicas的majority的一部分并且相互之间能互相通信,需要能够处理client request。

我们的storage system必须为使用它的接口的应用提供sequential consistency。这意味着,当完整的应用调用了kvpaxos/client.go中的Clerk.Get(),Clerk.Put(),Clerk.Append()之后,必须以同样的顺序影响所有的replica并且符合at-most-once semantics。一个Clerk.Get()操作需要能看到最近对于相同的key的Clerk.Put()或者Clerk.Append()操作写的值。这样做的一个结果是,我们必须保证Clerk.Put()或者Clerk.Append()只能操作一次(i.e,只写一次key/value database),即使client.go事实上已经发送了多个RPC,直到它找到了一个回复它的kvpaxos server replica。

接下来是实施方案:

1、用"value"信息填充server.go中的Op结构,其中"value"是kvpaxos用paxos来对每个client request达成共识的信息。Op的字段必须以大写字母开始。我们应该使用Op结构作为agreed-on values——比如,我们应该将Op结构传输给Paxos Start()。Go的RPC可以marshall/unmarshall Op结构;StartServer()中对gob.Register()的调用说明了这是怎么完成的。

2、实现server.go中的PutAppend() handler。它应该将Put或者Append Op加入Paxos log(i.e.,使用Paxos分配一个Paxos instance,它的值包括key和value(因此其他的kvpaxos就知道了该Put()或Append()))。一个Append Paxos log entry应该包含Append的参数,而不包括resulting value,因为result可能会很大。

3、实现Get() handler。它应该将Get Op加入Paxos log,并且在这之前"解析"该log从而确保它的key/value database包含了所有的Put()。

4、添加代码来处理重复的client request,包括client向一个kvpaxos replica发送一个请求,在等待回复的时候超时了,之后又向另一个不同的replica发送同样的请求。client的请求只能被执行一次。要确保重复检测方案能快速释放内存,比如通过让client告诉server哪些RPC它已经收到了回复。我们可以将这一信息包含在下一个client request中。

Hint:对于每一个client RPC,server应该为它们分配下一个可用的Paxos instance(sequence number)。但是一些其他的kvpaxos replica也可能将这个instance用于其他不同的client的操作。因此,kvpaxos server应该要能够尝试不同的instance。

Hint:kvpaxos server之间不能直接通信;它们应该通过Paxos log互相通信

Hint:和Lab 2中类似,我们应该唯一地确认一个client operation从而确保它们只被操作一次。同样和Lab 2类似,我们可以假设每个clerk当前只进行一个Put,Get或者Append操作。

Hint:在没有得到majority之前,kvpaxos server不能完成一个Get() RPC(从而确保不会输出过时的数据)。这意味着每个Get()(Put()和Append()也一样)必须包括Paxos agreement。

Hint:不要忘了调用Paxos Done()方法,在kvpaxos已经处理了一个instance并且不再需要它以及之前的instance的时候

Hint:我们的代码要能等待Paxos instance能完成agreement。唯一的方法是阶段性地调用Status(),并且在调用间隙休眠。那怎么休眠呢?一个比较好的方法是在一开始检查地频繁一些,然后渐渐变慢:

to := 10 * time.Millisecond

for {

  statusm _ := kv.px.Status(seq)

  if status == paxos.Decided{

    ...

    return

  }

  time.Sleep(to)

  if to < 10 * time.Second {

    to *= 2

  }

}

Hint:如果一个kvpaxos server落后了(i.e. 没有参加一些instance的agreement),它之后要获取需要共识哪些东西。一个合理的方法是调用Start(),它能够发现之前的agreed-to value或者导致agreement发生。思考一下在这种情况下应该给Start()传什么值。

Hint:当test 失败的时候,检查log中的gob error(e.g. "rpc: writing response: gob: type not registered for interface ..."),因为go不会考虑error fatal,虽然这对于lab来说是致命的

Paxos源码分析

Paxos的数据结构如下所示:

type Paxos struct {

  mu    sync.Mutext
  l      net.Listener
  dead    int32  // for testing
  unreliable int32  // for testing
  rpcCount  int32  // for testing
  peers    []string
  me      int  // index into peers[]


  // Your data here

}

  

// the application wants to create a paxos peer.

// the ports of all the paxos peers (including this one)

// are in peers[]. this servers port is peers[me].

func Make(peers []string, me int, rpcs *rpc.Server)

(1)初始化一个Paxos结构

(2)当rpcs 不为nil时,直接调用rpcs.Register(px),否则新建一个rpc server并注册使用

// the application wants paxos to start agreement on

// instance seq, with proposed value v.

// Start() returns right away; the application wil

// call Status() to find out if/when agreement is reached

func (px *Paxos) Start(seq int, v interface{})

// the application wants to know whether this

// peer thinks an instance has been decided,

// and if so what the agreed value is. Status()

// should just inspect the local peer state;

// it should not contact other Paxos peers.

func (px *Paxos) Status(seq int) (Fate, interface{})

// the application wants to know the highest instance sequence known to this peer

func (px *Paxos)  Max() int

// Min() should return one more than the minimum among z_i,

// where z_i is the highest number ever passed

// to Done() on peer i. A peers z_i is -1 if it has never called Done().

//

// Paxos is required to have forgotten all information about any instances it knows that are < Min().

// The point is to free up memory in long-running Paxos-based servers 

// Paxos peers need to exchange their highest Done() arguments in order to implement Min().

// These exchanges can be piggybacked on ordinary Paxos agreement protocol messages, so it is OK

// if one peers Min does not reflect another Peers Done() until after the next instance is agreed to.

//

// The fact that Min() is defined as a minimum over all Paxos peers means that Min() cannot increase

// until all peers have been heard from.So if a peer is dead or unreachable, other peers Min()s will not

// increase even if all reachable peers call Done.The reason for that is that when the unreachable peer

// comes back to life, it will need to catch up on instances that is missed -- the other peers therefor cannot

// forget these instances

func (px *Paxos) Min() int

KVPaxos 源码分析

KVPaxos结构如下所示:

type KVPaxos struct {

  mu    sync.Mutext
  l      net.Listener
  me    int
  dead    int32  // for testing
  unrealiable int32  // for testing

  px      *paxos.Paxos


  // Your definitions here.

}

  

// servers[] contains the ports of the set of servers that will cooperate via Paxos to

// form the fault-tolerant key/value service. me is the index of the current server in servers[].

func StartServer(servers []string, me int) *KVPaxos

(1)、先调用gob.Register(Op{})

(2)、创建并填充KVPaxos结构

(3)、创建一个RPC server并监听

(4)、调用kv.px = paxos.Make(servers, me, rpcs)

Clerk结构如下所示:

type Clerk struct {
  servers   []string
  // You will have to modify this struct.

}

  

---------------------------------------------- 测试框架分析 ------------------------------------------------

Part A:

func waitn(t *testing.T, pxa []*Paxos, seq int, wanted int):

每隔to := 10 * time.Millisecond调用一次ndecided(t, pxa, seq)函数,直到返回结果大于等于wanted的时候跳出,或者迭代次数超过30次。最后再调用一次ndecided(t, pxa, seq),如果小于wanted就报错

func waitmajority(t *testing.T, pxa []*Paxos, seq int)

该函数只是简单地调用wait(t, pxa, seq, (len(pxa)/2)+1)

func ndecided(t *testing.T pxa []*Paxos, seq int):

循环len(pxa)次,当pxa[i]不为空时,调用decided, v1 := pxa[i].Status(seq),如果状态为Decided,并且新得到的v1和之前得到的v1不同,就报错

func part(t *testing.T, tag string, npaxos int, p1 []int, p2 int[], p3 int[])

针对p1,p2,p3分别,ij := pp(tag, p[i], p[j]),pj := port(tag, p[j])和os.Link(pj, ij)

func checkmax(t *testing.T, pxa []*Paxos, seq int, max int)

先休眠3秒,再调用nd := ndecided(t, pxa, seq),若nd > max则报错

func TestBasic(t *testing.T):

创建常量npaxos为3,npaxos个Paxos实例,npaxos大小的pxh []string,

Test: Single proposer ...

调用pxa[0].Start(0, "hello")并调用waitn(t, pxa, 0, npaxos)

Test:Many proposers, same value ...

调用npaxos次pxa[i].Start(1, 77)并且调用waitn(t, pxa, 1, npaxos)

Test: Many proposers, different values...

分别调用pxa[0].Start(2, 100),pxa[1].Start(2, 101)和pxa[2](2, 102),再调用waitn(t, pxa, 2, npaxos)

Test: Out-of-order instances ...

(1)、依次调用pxa[0].Start(7, 700),pxa[0].Start(6, 600)和pxa[1].Start(5, 500),再调用waitn(t, pxa, 7, npaxos)

(2)、再调用pxa[0].Start(4, 400)和pxa[1].Start(3, 300),再依次针对seq 6, 5, 4, 3调用waitn

func TestDeaf(t *testing.T)

创建常量npaxos=5,npaxos个大小的Paxos实例,npaxos大小的phx []string

Test: Deaf proposer ...

(1)、调用pxa[0].Start(0, "hello")并调用waitn(t, pxa, 0, npaxos)达到共识

(2)、删除port,phx[0]和phx[npaxos-1]

(3)、调用pxa[1].Start(1, "goodbye")并调用waitmajority(t, pxa, 1),休眠一秒之后,若ndecided(t, pxa, 1) != npaxos-2则报错

(4)、调用pxa[0].Start(1, "xxx")并调用waitn(t, pxa, 1, npaxos-1),休眠一秒之后,若ndecided(t, pxa, 1) !=npaxos-1则报错

(5)、调用pxa[npaxos-1].Start(1, "yyy")并调用waitn(t, pxa, 1, npaxos)

func TestForget(t *testing.T)

创建常量npaxos=6,npaxos个大小的Paxos实例,npaxos大小的phx []string

Test: Forgetting...

(1)、测试每个Paxos实例的Min()是否初始化正确

(2)、调用pxa[0].Start(0, "00"),pxa[1].Start(1, "11"),pxa[2].Start(2, "22"),pxa[0].Start(6, "66")和pxa[1].Start(7, "77"),再调用waitn(t, pxa, 0, npaxos),并检测每个Paxos实例的Min()是否正确

(3)、pxa 0~npaxos-1调用Done(0),pxa 1~npaxos-1调用Done(1),pxa 0~npaxos-1调用Start(8+i, "xx")

(4)、迭代12次,直到所有的pxa的Min()都为1时退出,否则出错

func TestManyForget(t *testing.T)

创建常量npaxos=3,npaxos个大小的Paxos实例并且setunreliable(true),npaxos大小的phx []string

Test: Lots of forgettting...

(1)、启动一个goroutine,随机选取pxa来start(seq, v),共执行maxseq=20个instances

(2)、启动一个goroutine,随机选取seq和i,当seq >= pxa[i].Min()并且pxa[i].Status(seq)为Decided时,调用pxa[i].Done(seq),运行五秒结束

(3)、对每个pxa调用setunreliable(false),再休眠2s

(4)、遍历maxseq,对于每个大于等于pxa[i].Min()的seq调用pxa[i].Status(seq)函数

// does paxos forgetting actually free the memory?

func TestForgetMem(t *testing.T)

创建常量npaxos=3,npaxos个大小的Paxos实例,npaxos大小的phx []string

Test: Paxos frees forgotten instance memory...

(1)、调用pxa[0].Start(0, "x")并达到一致,再调用runtime.GC()和runtime.ReadMemStats(&m0)

(2)、循环十次,每次调用pxa[0].Start(i, string(big)),其中big是一个长度为1M的字符串,再调用一次runtime.GC()以及runtime.ReadMemStats(&m1)

(3)、每个pxa调用Done(10),再对每个pxa调用Start(11+i, "z"),并休眠三秒之后检验pxa[i].Min()不为11,则报错

(4)、调用runtime.GC()以及runtime.ReadMemStats(&m2),若m2.Alloc > (m1.Alloc / 2)则报错

(5)、再对已经Forgotten的instance进行检测,并且尝试对这些old instance进行Start(),检测instance的状态是否为Forgotten以及value是否有更新

// many agreements (without failures)

func TestMany(t *testing.T)

创建常量npaxos = 3, npaxos个大小的Paxos实例并且Start(0, 0)

设置创建ninst = 50,让npaxos个实例分别对每个实例propose不同的value并直到达到共识为止

// many agreements, with unreliable RPC

func TestManyUnreliable(t *testing.T)

创建常量npaxos=3,npaxos个大小的Paxos实例,且都调用setunreliable(true)和Start(0, 0)

设置常量ninst = 50,让npaxos个实例分别对每个实例propose不同的value并直到达到共识为止

func TestPartition(t *testing.T)

创建常量npaxos = 5,npaxos个大小的Paxos实例,但是每个实例的port列表都不同,需要通过调用part函数,其中分在同一个part中的peer才能互相通信。

Test: No decision if partitioned ...

调用part(t, tag, npaxos, []int{0, 2}, []int{1, 3}, []int{4}),pxa[1].Start(seq, 111),再调用checkmax(t, pxa, seq, 0)

Test: Decision in majority partition...

调用part(t, tag, npaxos, []int{0}, []int{1, 2, 3}, []int{4}),此时之前的pxa[1].Start开始生效,休眠两秒,调用waitmajority(t, pxa, seq)

Test: All agree after full heal...

调用pxa[0].Start(seq, 1000)和pxa[4].Start(seq, 1004),最后再调用part(t, tag, npaxos, []int{0, 1, 2, 3, 4}, []int{}, []int{})从而让整个集群联通,最后调用waitn(t, pxa, seq, npaxos)

Test: One peer switches partitions...

(1)、遍历20次,每次先调用part(t, tag, npaxos, []int{0, 1, 2}, []int{3, 4}, []int{}

(2)、再调用pxa[0].Start(seq, seq*10)和pxa[3].Start(seq, (seq*10)+1),以及waitmajority(t, pxa, seq)

(3)、再调用part(t, tag, npaxos, []int{0, 1}, []int{2, 3, 4}, []int{})和waitn(t, pxa, seq, npaxos) 

Test: One peer switches partitions, unreliable...
(1)、遍历20次,对每个pxa调用setunreliable(true)

(2)、调用part(t, tag, npaxos, []int{0, 1, 2}, []int{3, 4}, []int{})

(3)、针对npaxos个paxos调用pxa[i].Start(seq, (seq*10)+i),再调用waitn(t, pxa, seq, 3),如果ndecided(t, pxa, seq) > 3则报错

(4)、再调用part(t, tag, npaxos, []int{0, 1}, []int{2, 3, 4}, []int{})

(5)、最后对每个pxa调用setunreliable(false)函数,并调用waitn(t, pxa, seq, 5)。

Part B:

func TestBasic(t *testing.T):

(1)、设置常量nserver = 3,创建并启动nserver个KVPaxos实例

(2)、调用ck := MakeClerk(kvh),以及nservers个cka[i] = MakeClerk([]string{kvh[i]})

Test:Basic put/append/get ...

对ck调用两个append以及一个Put操作,再对cka[1]调用一个Put操作

Test:Concurrent clients...

迭代做20次操作,对一个key并行地进行Put或者Get操作,最后对每个server做Get操作,判断结果是否一致

func TestDone(t *testing.T):

原文地址:https://www.cnblogs.com/YaoDD/p/6144983.html