entry

更新时间:2023-01-02 15:04:34 阅读: 评论:0


2023年1月2日发(作者:英文dtad什么意思)

etcd系列-----raft协议:重要数据结构介绍(Entry、

Message、stor。。。

⼀些基础结构体介绍:

Entry

Entry记录:在前⾯介绍Raft协议时提到,节点之间传递的是消息(Message),每条消息中可以携带多条Entry记录,每条Entry记录对

应⼀个独⽴的操作。

在Entry中其中封装了如下信息:

:Term(uint64类型):该Entry所在的任期号。

:Index(uint64类型):该Entry对应的索引号。

:Type(EntryType类型):该Entry记录的类型。该字段有两个可选项:⼀个是EntryNormal,表⽰普通的数据操作;另⼀个是

EntryConfChange,表⽰集群的变更操作。

:Data([]byte类型):具体操作使⽤的数据。

记录的本地Log的基本单位也是Entry记录。有的⽂章也会将Entry记录称为“⽇志记录”,在etcd中还有⼀个WAL⽇志的概念,这两者并

⾮完全等价,所以需要注意⼀下,避免两者混淆。

Message

在每个节点中在raft模块中另⼀个⽐较重要结构体就是e。在raft模块的实现中,Message是所有消息的抽象,包括了各种

类型消息所需要的字段,其中核⼼宇段的含义如下。

:Type(MessageType类型):该字段定义了消息的类型,raft实现中就是通过该字段区分不同的消息井进⾏分类处理

的,MessageType中共定义了19种消息类型,后⾯会介绍每种消息类型的含义及相应的处理⽅式。

:From(uint64类型):发送消息的节点ID。在集群中,每个节点都拥有⼀个唯⼀ID作为标识。

:To(uint64类型):消息的⽬标节点ID。

:Term(uint64类型):发送消息的节点的Term值。如果Term值为0,则为本地消息,在etcd刊负模块的实现中,对本地消息进⾏

特殊处理。

:Entries([]Entry类型):如果是MsgApp类型的消息,则该字段中保存了Leader节点复制到Follower节点的Entry记录。在其他类型

消息中,该字段的含义后⾯会详细介绍。

:LogTerm(uint64类型):该消息携带的第⼀条Entry记录的Term值。

:Index(uint64类型):记录索引值,该索引值的具体含义与消息的类型相关。例如,MsgApp消息的Index宇段保存了其携带的

Entry记录(即Entries字段)中前⼀条记录的Index值,⽽MsgAppResp消息的Index字段则是Follower节点提⽰Leader节点下次从哪个

位置开始发送Entry记录。

:Commit(uint64类型):消息发送节点的提交位置(commitlndex)。

:Snapshot(Snapshot类型):在传输快照时,该字段保存了快照数据。

:Reject(bool类型):主要⽤于响应类型的消息,表⽰是否拒绝收到的消息。例如,Follower节点收到Leader节点发来的MsgApp

消息,如果Follower节点发现MsgApp消息携带的Entry记录并不能直接追加到本地的raftLog中,则会将响应消息的Reject宇段设置为

true,并且会在RejectHint字段中记录合适的Entry索引值,供Leader节点参考。

:RejectHint(uint64类型):在Follower节点拒绝Leader节点的消息之后,会在该字段记录⼀个Entry索引值供Leader节点。

raft结构体

在etcd-ra负模块中,raft结构体是其核⼼数据结构,在结构体raft中封装了当前节点所有的

核⼼数据。先看其核⼼字段。

:id(uint64类型):当前节点在集群的ID

:Term(uint64类型):当前任期号。如果Message的Term字段为0,则表⽰该消息是本地消息,例如,MsgHup、MsgProp、

MsgReadlndex等消息,都属于本地消息。

:Vote(uint64类型):当前任期中当前节点将选票投给了哪个节点,未投票时,该字段为None。

:raftlog(*raftlog类型):在前⾯介绍过,在Ra企协议中的每个节点都会记录本地Log,在raft模块中,使⽤结构体raftLog表⽰本地

Log,在raftLog中还涉及⽇志的缓存等相关内容,后⾯会介绍。

:maxlnflight(int类型):对于当前节点来说,⼰经发送出去但未收到响应的消息个数上限。如果处于该状态的消息超过maxlnflight

这个|现值,则暂停当前节点的消息发送,这是为了防⽌集群中的某个节点不断发送消息,引起⽹络阻塞或是压垮其他节点,从⽽影响其

他节点的正常运⾏。

:maxMsgSize(uint64类型):单条消息的最⼤字节数。

:prs(map[uint64]*Progress类型):在前⾯介绍Raft协议时提到过,Leader节点会记录集群中其他节点的⽇志复制情况

(Nextlndex和Matchlndex)。在raft模块中,每个Follower节点对应的Nextlndex值和Matchlndex值都封装在Progress实例中,除此之

外,每个Progress实例中还封装了对应Follower节点的相关信息,这⾥简单介绍主要字段。

.Match(uint64类型):对应Follower节点当前⼰经成功复制的Entry记录的索引值。

.Next(uint64类型):对应Follower节点下⼀个待复制的Entry记录的索引值。

.State(ProgressState丁ype类型):对应Follower节点的复制状态,其可选项的含义后⾯详细介绍。

.Paud(booI类型):当前Leader节点是否可以向该Progress实例对应的Follower节点发送消息。

.PendingSnapshot(uint64类型):当前正在发送的快照数据信息。

.RecentActive(bool类型):从当前Leader节点的⾓度来看,该Progress实例对应的Follower节点是否存活。

.ins(*inflights类型):记录了⼰经发送出去但未收到响应的消息信息。

:state(StateType类型):当前节点在集群中的⾓⾊,可选值分为StateFollower、StateCandidate、StateLeader和

StatePreCandidat巳四种状态。

:votes(map[uint64]bool类型):在选举过程,如果当前节点收到了来⾃某个节点的投票,则会将votes中对应的值设置为true,通

过统计votes这个map,就可以确定当前节点收到的投票是否超过半数。

:msgs([]e类型):缓存了当前节点等待发送的消息。

:lead(uint64类型):当前集群中Leader节点的ID。

:electionElapd(int类型):选举计时器的指针,其单位是逻辑时钟的刻度,逻辑时钟每推进⼀次,该字段值就会增加1。

:electionTimeout(int类型):选举超时时间,当electionE!apd宇段值到达该值时,就会触发新⼀轮的选举。

:heartbeatElapd(int类型):⼼跳计时器的指针,其单位也是逻辑时钟的刻度,逻辑时钟每推进⼀次,该字段值就会增加1。

:heartbeatTimeout(int类型):⼼跳超时时间,当heartbeatElapd字段值到达该值时,就会触发Leader节点发送⼀条⼼跳消息。

:tick(func()类型):当前节点推进逻辑时钟的函数。如果当前节点是Leader,则指向artbeat()函数,如果当前节点

是Follower或是Candidate,则指向ection()函数。

:step(stepFunc类型):当前节点收到消息时的处理函数。如果是Leader节点,则该字段指向stepLeader()函数,如果是

Follower节点,则该字段指向stepFollower()函数,如果是处于preVote阶段的节点或是Candidate节点,则该字段指向

stepCandidate()函数。

Config结构体

Config结构体主要⽤于配置参数的传递,在创建raft实例时需要的参数会通过Config实例传递进去。Config的主要字段如下。

:ID(uint64类型):当前节点的ID。

:peers([]uint64类型):记录了集群中所有节点的ID。

:ElectionTick(int类型):⽤于初始化onTimeout,即逻辑时钟连续推进多少次后,就会触发Follower节点的状态切换及新

⼀轮的Leader选举。

:HeartbeatTick(int类型):⽤于初始化raftheartbeatTimeout,即逻辑时钟连续推进多少次后,就触发Leader节点发送⼼跳消息。

:Storage(Storage类型):当前节点保存raft⽇志记录使⽤的存储,后⾯会j接收其接⼝及其实现。

:Applied(uint64类型):当前已经应⽤的记录位置(⼰应⽤的最后⼀条Entry记录的索引值),该值在节点重启时需要设置,否则会重

新应⽤⼰经应⽤过ntry记录。

:MaxSizePerMsg(uint64类型):⽤于初始化Size字段,每条消息的最⼤字节数。

:MaxlnflightMsgs(int类型):⽤于初始化ra丘maxlnflight,即已经发送出去且未收到响应的最⼤消息个数。

Storage

MemoryStorage是raft模块为Storage接⼝提供的⼀个实现,从名字也能看出,MemoryStorage在内存中维护上述状态信息

(hardState字段)、快照数据(snapshot宇段)及所有的Entry记录(ents字段,[]类型〕,在

字段中维护了快照数据之后的所有Entry记录。另外需要注意的是,MemoryStorage继承了,MemoryStorage中的⼤部分操

作是需要加锁同步的。通过这⾥的介绍,我们⼤概可以了解MemoryStorage的结构,如图⽰

MemoryStorage中追加Entry记录,该功能主要由()⽅法完成:

func(ms*MemoryStorage)Append(entries[])error{

iflen(entries)==0{

returnnil

}

()

()

first:=ndex()

last:=entries[0].Index+uint64(len(entries))-1

//shortcutifthereisnonewentry.

iflast

returnnil//entries切⽚中所有的Entry都已经过时,⽆须添加任何Entry

}

//first之前的Entry已经记⼊Snapshot中,不应该再记录到ents中,所以将这部分Entry截掉

iffirst>entries[0].Index{

entries=entries[first-entries[0].Index:]

}

//计算entries切⽚中第⼀条可⽤的Entry与first之间的差距

offt:=entries[0].[0].Index

switch{

cauint64(len())>offt:

//保留中first~offt的部分,offt之后的部分被抛弃

//然后将待追加的Entry追加到中

=append([]{},[:offt]...)

=append(,entries...)

cauint64(len())==offt:

//直接将待追加的⽇志记录(entries)追加到MemoryStorage中

=append(,entries...)

default:

("missinglogentry[last:%d,appendat:%d]",

dex(),entries[0].Index)

}

returnnil

}

随着系统的运⾏,中保存的En位y记录会不断增加,为了减⼩内存的压⼒,定期创建快照来记录当前节点的状态并

压缩数组的空间是⾮常有必要的,这样就可以降低内存使⽤。这个过程中涉及三个⽅法,⾸先是

CreateSnapshot()⽅法,它会接收当前集群状态,以及SnapShot相关数据来更新snapshot字段,具体实现如下:

//简单说明该⽅法的参数:i是新建Snapshot包含的最⼤的索引值,cs是当前集群的状态,data是新建Snapshot的具体数据

func(ms*MemoryStorage)CreateSnapshot(iuint64,cs*ate,data[]byte)(ot,error){

()

()

//边界检查,l必须⼤于当前Snapshot包含的最⼤Index佳,并且⼩于MemoryStorage的LastIndex佳,否则抛出异常

ifi<={

ot{},ErrSnapOutOfDate

}

offt:=[0].Index

ifi>dex(){

("snapshot%disoutofboundlastindex(%d)",i,dex())

}

//更新ot的元数据

=i

=[i-offt].Term

ifcs!=nil{

ate=*cs

}

//更新具体的快照数据

=data

ot,nil

}

新建Snapshot之后,⼀般会调⽤t()⽅法将中指定索引之前的Entry记录全部抛弃,从

⽽实现压缩的⽬的,具体实现如下:

func(ms*MemoryStorage)Compact(compactIndexuint64)error{

()

()

offt:=[0].Index

//边界检测

ifcompactIndex<=offt{

returnErrCompacted

}

ifcompactIndex>dex(){

("compact%disoutofboundlastindex(%d)",compactIndex,dex())

}

//创建新的切⽚,⽤来存储compactIndex之后的Entry

i:=compactIndex-offt

ents:=make([],1,1+uint64(len())-i)

ents[0].Index=[i].Index

ents[0].Term=[i].Term

//将compactlndex之后的Entry拷贝到ents中,并更新字段

ents=append(ents,[i+1:]...)

=ents

returnnil

}

最后,上层模块可以通过ot()⽅法获取SnapShot。

unstable结构体

unstable使⽤内存数组维护其中所有的Entry记录,对于Leader节点⽽⾔,它维护了客户端请求对应的Entry记录;对于Follower节点⽽

⾔,它维护的是从Leader节点复制来的Entry记录。⽆论是Leader节点还是Follower节点,对于刚刚接收到的Entry记录⾸先都会被存储在

unstable中。然后按照Raft协议将unstable中缓存的这些Entry记录交给上层模块进⾏处理,上层模块会将这些Entry记录发送到集群其他

节点或进⾏保存(写⼊Storage中)。之后,上层模块会调⽤Advance()⽅法通知底层的raft模块将unstable中对应的Entry记录删除

(因为⼰经保存到了Storage中)。正因为unstable中保存的Entry记录并未进⾏持久化,可能会因节点故障⽽意外丢失,所以被称为

unstable。

unstable中的主要字段。

:entries([]类型):⽤于保存未写⼊Storage中的Entry记录。

:offt(uint64类型):entries中的第⼀条Entry记录的索引值。

:snapshot(ot类型):快照数据,该快照数据也是未写⼊Storage中的。

在unstable中提供了很多与Storage类似的⽅法,在raftLog中,很多⽅法都是先尝试调⽤unstable的相应⽅法,在其失败后(unstable

的⽅法返回(0,fal)即表⽰失败),再尝试调⽤Storage的对应⽅法。

irstlndex()⽅法会尝试获取unstable的第⼀条Entry记录的索引值,astlndex()⽅法会尝试获

取unstable的最后⼀条Entry记录的索引值,如果获取失败则返回(0,fal),erm()⽅法的主要功能是尝试获取指

定Entry记录的Term值,根据条件查找指定的Entry记录的位置。

当s中的Entry记录⼰经被写⼊Storage之后,会调⽤To()⽅法清除entries中对应的Entry记

录,stableTo()⽅法的具体实现如下:

func(u*unstable)stableTo(i,tuint64){

//查找指定Entry记录的Term佳,若查找失败则表⽰对应的Entry不在unstable中,直接返回

gt,ok:=erm(i)

if!ok{

return

}

//ifi

//onlyupdatetheunstableentriesiftermismatchedwith

//anunstableentry.

ifgt==t&&i>={

//指定索引位之前的Entry记录都已经完成持久化,则将其之前的全部Entry记录删除

s=s[i+:]

=i+1

//随着多次追加⽇志和截断⽇志的操作s底层的数组会越来越⼤,shrinkEntriesArray⽅法会在底层数组长度超过实际占⽤的两倍时,对底层数据进⾏缩

EntriesArray()

}

}

func(u*unstable)shrinkEntriesArray(){

//Wereplacethearrayifwe'reusinglessthanhalfofthespacein

//mberisfairlyarbitrary,chonasanattempttobalance

//dprobablybeimproved

//withsomefocudtuning.

constlenMultiple=2

iflen(s)==0{

s=nil

}eliflen(s)*lenMultiple

//重新创建切⽚,复制原有切⽚中的数据,重直entries字段

newEntries:=make([],len(s))

copy(newEntries,s)

s=newEntries

}

}

同理,当ot字段指向的快照被写⼊Storage之后,会调⽤SnapTo()⽅法将snapshot字段清

空.teAndAppend()⽅法的主要功能是向s中追加Entry记录其实现与()⽅法类似也

会涉及截断的场景:

func(u*unstable)truncateAndAppend(ents[]){

//获取第⼀条待追加的Entry记录的索引值

after:=ents[0].Index

switch{

caafter==+uint64(len(s)):

//s

//directlyappend

//若待追加的记录与e⼝tries中的记录正好连续,则可以直接向entries中追加

s=append(s,ents...)

caafter<=:

//直接⽤待追加的Entry记录替换当前的entries字段,并⽀新offt

("replacetheunstableentriesfromindex%d",after)

//Thelogisbeingtruncatedtobeforeourcurrentofft

//portion,sottheofftandreplacetheentries

=after

s=ents

default:

//after在offt~last之间,则after~last之间的Entry记录冲突。这⾥会将offt~after之间的记录保留,抛弃after之后的记录,然后完成追加操作

//()⽅法会检测after是否合法,并返回offt~after的切⽚

//s

//thenappend

("truncatetheunstableentriesbeforeindex%d",after)

s=append([]{},(,after)...)

s=append(s,ents...)

}

}

本文发布于:2023-01-02 15:04:34,感谢您对本站的认可!

本文链接:http://www.wtabcd.cn/fanwen/fan/90/78436.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

上一篇:scheme
下一篇:purpose
标签:entry
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图