Orderer 节点对排序后消息的处理
经过 Kafka 排序后的消息,在网络中已经达成了对顺序的共识,后面可以执行提交动作。Orderer 会不断从 Kafka 获取排序后消息,进行提交处理(包括打包为区块,更新本地账本结构等)。
主要过程
仍以 Kafka 模式为例,Orderer 节点启动后,会为每个账本结构调用 orderer/consensus/kafka.chainImpl 结构体的 processMessagesToBlocks() ([]uint64, error) 方法,持续获取 Kafka 对应分区中的消息并进行处理。该方法内是一个 for 循环,不断从对应 Kafka 分区中提取(Consume)消息,满足分块条件(超时或消息的个数或大小满足预设条件)时,还会发送 TimeToCut(TTC) 消息到 Kakfa 对应分区中。
主要逻辑如下图所示。

主要实现代码如下所示。
其中,除了出错和超时外,最核心过程为处理从 Kakfa 收到的正常消息,主要包括三种类型:
Connect 消息:Producer 启动后发出,刷新与 Kafka 的连接。目前为忽略该消息。
Fabric 交易消息(即 Regular 消息):根据消息内容进行进一步处理,包括普通交易消息和配置消息。
TimeToCut 消息:收到该消息意味着当前可以进行分块。
对于不同消息,分别调用对应方法进行处理。
Fabric 交易消息的处理
对于 Fabric 相关消息(包括普通交易消息和配置消息),具体会调用 chainImpl 结构体的 processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error 方法进行处理。
主要过程如下所示:

该方法的核心代码如下:
普通交易消息
普通交易消息,会检查是否合法(offset、配置版本号是否匹配),通过检查则扔给内部的 commitNormalMsg(env) 方法来处理。
commitNormalMsg(env) 将消息扔给本地缓冲,并检查是否需要进行切块:batches, pending := chain.BlockCutter().Ordered(message)。
如果不需要切块,则更新 lastOriginalOffsetProcessed 后返回;否则尝试重置 BatchTimeout 计时器,进行切块处理。
切块处理主要调用 orderer/common/multichannel.BlockWriter 结构体的 CreateNextBlock(messages []*cb.Envelope) *cb.Block、WriteBlock(block *cb.Block, encodedMetadataValue []byte) 两个方法。
CreateNextBlock(messages []*cb.Envelope) *cb.Block 方法基本过程十分简单,创建新的区块,将传入的交易信封结构直接序列化到 block.Data.Data[] 数组中,并创建区块头信息(包括配置序列号、前导区块头 Hash、本区块数据内容的 Hash)。
WriteBlock(block *cb.Block, encodedMetadataValue []byte) 方法则进一步调用 commitBlock(encodedMetadataValue []byte) 方法,填写 ORDERER、SIGNATURES、LAST_CONFIG 三个元数据域的内容。最终,调用 common/ledger/blkstorage/fsblkstorage/blockfileMgr.addBlock(block *common.Block) error 方法将区块写入到本地的账本文件中,并更新索引数据库中内容(包括区块号、区块 Hash、区块所在文件指针、交易的偏移量和区块元数据。
ORDERER 相关的元数据包括:
LastOffsetPersisted:上次消息的偏移量;
LastOriginalOffsetProcessed:本条消息被重复处理时,最新的偏移量;
LastResubmittedConfigOffset:上次提交的配置消息的偏移量。
配置交易消息
首先会检查消息中配置版本号是否跟当前链上的配置版本号一致。
如果不一致,则会更新后生成新的配置信封消息,扔回到后端的共识模块(如 Kafka),并且阻塞新的 Broadcast 消息直到重新提交的消息得到处理。代码片段如下:
如果版本一致,则调用内部的 commitConfigMsg(env) 方法根据信封结构来产生区块,代码如下:
由于每个配置消息都需要单独生成区块。因此,如果之前已经收到了一些普通交易消息,会先把这些消息生成区块。
接下来,调用 orderer/common/multichannel 模块中 BlockWriter 结构体的 CreateNextBlock(messages []*cb.Envelope) *cb.Block 方法和 WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte) 方法来分别打包区块和更新账本结构。
其中,WriteConfigBlock() 方法执行解析消息和处理的主要逻辑(包括创建新的应用通道和更新已有通道的配置两种),核心代码如下所示。
创建新的应用通道
创建新的本地账本结构并启动对应的轮询消息过程,实际调用 orderer/common/multichannel/Registrar.newChain(configtx *cb.Envelope) 方法。
该方法首先根据当前给定的配置来创建账本结构,之后初始化该账本对应的 ChainSupport 数据结构,最后调用 cs.start() 启动对应通道的消息处理服务。
更新已有通道配置
首先,检查配置信息是否合法,实际调用 common/configtx/validator.ValidatorImpl.Validate(configEnv *cb.ConfigEnvelope) error 方法,主要核对:
版本号需要比现在的递增 1;
发起人拥有对应更新的权限;
配置更新交易结构正确。
Last updated
Was this helpful?