etcd系列-----raft协议:重要数据结构介绍(Entry、
Message、stor。。。
⼀些基础结构体介绍:
Entry
Entry记录:在前⾯介绍Raft协议时提到,节点之间传递的是消息(Message),每条消息中可以携带多条Entry记录,每条Entry记录对
应⼀个独⽴的操作。
在Entry中其中封装了如下信息:
:Term(uint64类型):该Entry所在的任期号。
:Index(uint64类型):该Entry对应的索引号。
:Type(EntryType类型):该Entry记录的类型。该字段有两个可选项:⼀个是EntryNormal,表⽰普通的数据操作;另⼀个是
EntryConfChange,表⽰集群的变更操作。
:Data([]byte类型):具体操作使⽤的数据。
记录的本地Log的基本单位也是Entry记录。有的⽂章也会将Entry记录称为“⽇志记录”,在etcd中还有⼀个WAL⽇志的概念,这两者并
⾮完全等价,所以需要注意⼀下,避免两者混淆。
Message
在每个节点中在raft模块中另⼀个⽐较重要结构体就是e。在raft模块的实现中,Message是所有消息的抽象,包括了各种
类型消息所需要的字段,其中核⼼宇段的含义如下。
:Type(MessageType类型):该字段定义了消息的类型,raft实现中就是通过该字段区分不同的消息井进⾏分类处理
的,MessageType中共定义了19种消息类型,后⾯会介绍每种消息类型的含义及相应的处理⽅式。
:From(uint64类型):发送消息的节点ID。在集群中,每个节点都拥有⼀个唯⼀ID作为标识。
:To(uint64类型):消息的⽬标节点ID。
:Term(uint64类型):发送消息的节点的Term值。如果Term值为0,则为本地消息,在etcd刊负模块的实现中,对本地消息进⾏
特殊处理。
:Entries([]Entry类型):如果是MsgApp类型的消息,则该字段中保存了Leader节点复制到Follower节点的Entry记录。在其他类型
消息中,该字段的含义后⾯会详细介绍。
:LogTerm(uint64类型):该消息携带的第⼀条Entry记录的Term值。
:Index(uint64类型):记录索引值,该索引值的具体含义与消息的类型相关。例如,MsgApp消息的Index宇段保存了其携带的
Entry记录(即Entries字段)中前⼀条记录的Index值,⽽MsgAppResp消息的Index字段则是Follower节点提⽰Leader节点下次从哪个
位置开始发送Entry记录。
:Commit(uint64类型):消息发送节点的提交位置(commitlndex)。
:Snapshot(Snapshot类型):在传输快照时,该字段保存了快照数据。
:Reject(bool类型):主要⽤于响应类型的消息,表⽰是否拒绝收到的消息。例如,Follower节点收到Leader节点发来的MsgApp
消息,如果Follower节点发现MsgApp消息携带的Entry记录并不能直接追加到本地的raftLog中,则会将响应消息的Reject宇段设置为
true,并且会在RejectHint字段中记录合适的Entry索引值,供Leader节点参考。
:RejectHint(uint64类型):在Follower节点拒绝Leader节点的消息之后,会在该字段记录⼀个Entry索引值供Leader节点。
raft结构体
在etcd-ra负模块中,raft结构体是其核⼼数据结构,在结构体raft中封装了当前节点所有的
核⼼数据。先看其核⼼字段。
:id(uint64类型):当前节点在集群的ID
:Term(uint64类型):当前任期号。如果Message的Term字段为0,则表⽰该消息是本地消息,例如,MsgHup、MsgProp、
MsgReadlndex等消息,都属于本地消息。
:Vote(uint64类型):当前任期中当前节点将选票投给了哪个节点,未投票时,该字段为None。
:raftlog(*raftlog类型):在前⾯介绍过,在Ra企协议中的每个节点都会记录本地Log,在raft模块中,使⽤结构体raftLog表⽰本地
Log,在raftLog中还涉及⽇志的缓存等相关内容,后⾯会介绍。
:maxlnflight(int类型):对于当前节点来说,⼰经发送出去但未收到响应的消息个数上限。如果处于该状态的消息超过maxlnflight
这个|现值,则暂停当前节点的消息发送,这是为了防⽌集群中的某个节点不断发送消息,引起⽹络阻塞或是压垮其他节点,从⽽影响其
他节点的正常运⾏。
:maxMsgSize(uint64类型):单条消息的最⼤字节数。
:prs(map[uint64]*Progress类型):在前⾯介绍Raft协议时提到过,Leader节点会记录集群中其他节点的⽇志复制情况
(Nextlndex和Matchlndex)。在raft模块中,每个Follower节点对应的Nextlndex值和Matchlndex值都封装在Progress实例中,除此之
外,每个Progress实例中还封装了对应Follower节点的相关信息,这⾥简单介绍主要字段。
.Match(uint64类型):对应Follower节点当前⼰经成功复制的Entry记录的索引值。
.Next(uint64类型):对应Follower节点下⼀个待复制的Entry记录的索引值。
.State(ProgressState丁ype类型):对应Follower节点的复制状态,其可选项的含义后⾯详细介绍。
.Paud(booI类型):当前Leader节点是否可以向该Progress实例对应的Follower节点发送消息。
.PendingSnapshot(uint64类型):当前正在发送的快照数据信息。
.RecentActive(bool类型):从当前Leader节点的⾓度来看,该Progress实例对应的Follower节点是否存活。
.ins(*inflights类型):记录了⼰经发送出去但未收到响应的消息信息。
:state(StateType类型):当前节点在集群中的⾓⾊,可选值分为StateFollower、StateCandidate、StateLeader和
StatePreCandidat巳四种状态。
:votes(map[uint64]bool类型):在选举过程,如果当前节点收到了来⾃某个节点的投票,则会将votes中对应的值设置为true,通
过统计votes这个map,就可以确定当前节点收到的投票是否超过半数。
:msgs([]e类型):缓存了当前节点等待发送的消息。
:lead(uint64类型):当前集群中Leader节点的ID。
:electionElapd(int类型):选举计时器的指针,其单位是逻辑时钟的刻度,逻辑时钟每推进⼀次,该字段值就会增加1。
:electionTimeout(int类型):选举超时时间,当electionE!apd宇段值到达该值时,就会触发新⼀轮的选举。
:heartbeatElapd(int类型):⼼跳计时器的指针,其单位也是逻辑时钟的刻度,逻辑时钟每推进⼀次,该字段值就会增加1。
:heartbeatTimeout(int类型):⼼跳超时时间,当heartbeatElapd字段值到达该值时,就会触发Leader节点发送⼀条⼼跳消息。
:tick(func()类型):当前节点推进逻辑时钟的函数。如果当前节点是Leader,则指向artbeat()函数,如果当前节点
是Follower或是Candidate,则指向ection()函数。
:step(stepFunc类型):当前节点收到消息时的处理函数。如果是Leader节点,则该字段指向stepLeader()函数,如果是
Follower节点,则该字段指向stepFollower()函数,如果是处于preVote阶段的节点或是Candidate节点,则该字段指向
stepCandidate()函数。
Config结构体
Config结构体主要⽤于配置参数的传递,在创建raft实例时需要的参数会通过Config实例传递进去。Config的主要字段如下。
:ID(uint64类型):当前节点的ID。
:peers([]uint64类型):记录了集群中所有节点的ID。
:ElectionTick(int类型):⽤于初始化onTimeout,即逻辑时钟连续推进多少次后,就会触发Follower节点的状态切换及新
⼀轮的Leader选举。
:HeartbeatTick(int类型):⽤于初始化raftheartbeatTimeout,即逻辑时钟连续推进多少次后,就触发Leader节点发送⼼跳消息。
:Storage(Storage类型):当前节点保存raft⽇志记录使⽤的存储,后⾯会j接收其接⼝及其实现。
:Applied(uint64类型):当前已经应⽤的记录位置(⼰应⽤的最后⼀条Entry记录的索引值),该值在节点重启时需要设置,否则会重
新应⽤⼰经应⽤过ntry记录。
:MaxSizePerMsg(uint64类型):⽤于初始化Size字段,每条消息的最⼤字节数。
:MaxlnflightMsgs(int类型):⽤于初始化ra丘maxlnflight,即已经发送出去且未收到响应的最⼤消息个数。
Storage
MemoryStorage是raft模块为Storage接⼝提供的⼀个实现,从名字也能看出,MemoryStorage在内存中维护上述状态信息
(hardState字段)、快照数据(snapshot宇段)及所有的Entry记录(ents字段,[]类型〕,在
字段中维护了快照数据之后的所有Entry记录。另外需要注意的是,MemoryStorage继承了,MemoryStorage中的⼤部分操
作是需要加锁同步的。通过这⾥的介绍,我们⼤概可以了解MemoryStorage的结构,如图⽰
MemoryStorage中追加Entry记录,该功能主要由()⽅法完成:
func(ms*MemoryStorage)Append(entries[])error{
iflen(entries)==0{
returnnil
}
()
()
first:=ndex()
last:=entries[0].Index+uint64(len(entries))-1
//shortcutifthereisnonewentry.
iflast
returnnil//entries切⽚中所有的Entry都已经过时,⽆须添加任何Entry
}
//first之前的Entry已经记⼊Snapshot中,不应该再记录到ents中,所以将这部分Entry截掉
iffirst>entries[0].Index{
entries=entries[first-entries[0].Index:]
}
//计算entries切⽚中第⼀条可⽤的Entry与first之间的差距
offt:=entries[0].[0].Index
switch{
cauint64(len())>offt:
//保留中first~offt的部分,offt之后的部分被抛弃
//然后将待追加的Entry追加到中
=append([]{},[:offt]...)
=append(,entries...)
cauint64(len())==offt:
//直接将待追加的⽇志记录(entries)追加到MemoryStorage中
=append(,entries...)
default:
("missinglogentry[last:%d,appendat:%d]",
dex(),entries[0].Index)
}
returnnil
}
随着系统的运⾏,中保存的En位y记录会不断增加,为了减⼩内存的压⼒,定期创建快照来记录当前节点的状态并
压缩数组的空间是⾮常有必要的,这样就可以降低内存使⽤。这个过程中涉及三个⽅法,⾸先是
CreateSnapshot()⽅法,它会接收当前集群状态,以及SnapShot相关数据来更新snapshot字段,具体实现如下:
//简单说明该⽅法的参数:i是新建Snapshot包含的最⼤的索引值,cs是当前集群的状态,data是新建Snapshot的具体数据
func(ms*MemoryStorage)CreateSnapshot(iuint64,cs*ate,data[]byte)(ot,error){
()
()
//边界检查,l必须⼤于当前Snapshot包含的最⼤Index佳,并且⼩于MemoryStorage的LastIndex佳,否则抛出异常
ifi<={
ot{},ErrSnapOutOfDate
}
offt:=[0].Index
ifi>dex(){
("snapshot%disoutofboundlastindex(%d)",i,dex())
}
//更新ot的元数据
=i
=[i-offt].Term
ifcs!=nil{
ate=*cs
}
//更新具体的快照数据
=data
ot,nil
}
新建Snapshot之后,⼀般会调⽤t()⽅法将中指定索引之前的Entry记录全部抛弃,从
⽽实现压缩的⽬的,具体实现如下:
func(ms*MemoryStorage)Compact(compactIndexuint64)error{
()
()
offt:=[0].Index
//边界检测
ifcompactIndex<=offt{
returnErrCompacted
}
ifcompactIndex>dex(){
("compact%disoutofboundlastindex(%d)",compactIndex,dex())
}
//创建新的切⽚,⽤来存储compactIndex之后的Entry
i:=compactIndex-offt
ents:=make([],1,1+uint64(len())-i)
ents[0].Index=[i].Index
ents[0].Term=[i].Term
//将compactlndex之后的Entry拷贝到ents中,并更新字段
ents=append(ents,[i+1:]...)
=ents
returnnil
}
最后,上层模块可以通过ot()⽅法获取SnapShot。
unstable结构体
unstable使⽤内存数组维护其中所有的Entry记录,对于Leader节点⽽⾔,它维护了客户端请求对应的Entry记录;对于Follower节点⽽
⾔,它维护的是从Leader节点复制来的Entry记录。⽆论是Leader节点还是Follower节点,对于刚刚接收到的Entry记录⾸先都会被存储在
unstable中。然后按照Raft协议将unstable中缓存的这些Entry记录交给上层模块进⾏处理,上层模块会将这些Entry记录发送到集群其他
节点或进⾏保存(写⼊Storage中)。之后,上层模块会调⽤Advance()⽅法通知底层的raft模块将unstable中对应的Entry记录删除
(因为⼰经保存到了Storage中)。正因为unstable中保存的Entry记录并未进⾏持久化,可能会因节点故障⽽意外丢失,所以被称为
unstable。
unstable中的主要字段。
:entries([]类型):⽤于保存未写⼊Storage中的Entry记录。
:offt(uint64类型):entries中的第⼀条Entry记录的索引值。
:snapshot(ot类型):快照数据,该快照数据也是未写⼊Storage中的。
在unstable中提供了很多与Storage类似的⽅法,在raftLog中,很多⽅法都是先尝试调⽤unstable的相应⽅法,在其失败后(unstable
的⽅法返回(0,fal)即表⽰失败),再尝试调⽤Storage的对应⽅法。
irstlndex()⽅法会尝试获取unstable的第⼀条Entry记录的索引值,astlndex()⽅法会尝试获
取unstable的最后⼀条Entry记录的索引值,如果获取失败则返回(0,fal),erm()⽅法的主要功能是尝试获取指
定Entry记录的Term值,根据条件查找指定的Entry记录的位置。
当s中的Entry记录⼰经被写⼊Storage之后,会调⽤To()⽅法清除entries中对应的Entry记
录,stableTo()⽅法的具体实现如下:
func(u*unstable)stableTo(i,tuint64){
//查找指定Entry记录的Term佳,若查找失败则表⽰对应的Entry不在unstable中,直接返回
gt,ok:=erm(i)
if!ok{
return
}
//ifi
//onlyupdatetheunstableentriesiftermismatchedwith
//anunstableentry.
ifgt==t&&i>={
//指定索引位之前的Entry记录都已经完成持久化,则将其之前的全部Entry记录删除
s=s[i+:]
=i+1
//随着多次追加⽇志和截断⽇志的操作s底层的数组会越来越⼤,shrinkEntriesArray⽅法会在底层数组长度超过实际占⽤的两倍时,对底层数据进⾏缩
EntriesArray()
}
}
func(u*unstable)shrinkEntriesArray(){
//Wereplacethearrayifwe'reusinglessthanhalfofthespacein
//mberisfairlyarbitrary,chonasanattempttobalance
//dprobablybeimproved
//withsomefocudtuning.
constlenMultiple=2
iflen(s)==0{
s=nil
}eliflen(s)*lenMultiple
//重新创建切⽚,复制原有切⽚中的数据,重直entries字段
newEntries:=make([],len(s))
copy(newEntries,s)
s=newEntries
}
}
同理,当ot字段指向的快照被写⼊Storage之后,会调⽤SnapTo()⽅法将snapshot字段清
空.teAndAppend()⽅法的主要功能是向s中追加Entry记录其实现与()⽅法类似也
会涉及截断的场景:
func(u*unstable)truncateAndAppend(ents[]){
//获取第⼀条待追加的Entry记录的索引值
after:=ents[0].Index
switch{
caafter==+uint64(len(s)):
//s
//directlyappend
//若待追加的记录与e⼝tries中的记录正好连续,则可以直接向entries中追加
s=append(s,ents...)
caafter<=:
//直接⽤待追加的Entry记录替换当前的entries字段,并⽀新offt
("replacetheunstableentriesfromindex%d",after)
//Thelogisbeingtruncatedtobeforeourcurrentofft
//portion,sottheofftandreplacetheentries
=after
s=ents
default:
//after在offt~last之间,则after~last之间的Entry记录冲突。这⾥会将offt~after之间的记录保留,抛弃after之后的记录,然后完成追加操作
//()⽅法会检测after是否合法,并返回offt~after的切⽚
//s
//thenappend
("truncatetheunstableentriesbeforeindex%d",after)
s=append([]{},(,after)...)
s=append(s,ents...)
}
}
本文发布于:2023-01-02 15:04:34,感谢您对本站的认可!
本文链接:http://www.wtabcd.cn/fanwen/fan/90/78436.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
| 留言与评论(共有 0 条评论) |