Deliver,意味着客户端通过 gRPC 接口从 Ordering 服务获取数据(例如指定区块的数据)。
Orderer 节点收到请求消息,会首先交给 orderer.common.server
包中 server 结构体的 Deliver(srv ab.AtomicBroadcast_DeliverServer) error
方法处理。该方法进一步调用 orderer.common.deliver
包中 deliverServer 结构的 Handle(srv ab.AtomicBroadcast_DeliverServer) error
方法进行处理。
deliverServer 结构体十分重要,完成对 Deliver 请求的处理过程。
type deliverServer struct {
sm SupportManager
}
func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error
整体过程
整体处理过程如下图所示。
Handle(srv ab.AtomicBroadcast_DeliverServer) error
方法会开启一个循环来从 srv 中不断读取请求消息并进行处理,直到结束。
核心代码如下所示,包括提取消息和对消息进行处理两个步骤。
for {
envelope, err := srv.Recv() // 从请求中提取一个 Envelope 消息
ds.deliverBlocks(srv, envelope) // 对消息进行处理并答复,核心过程
}
可见,对单个请求的处理都在 deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, envelope *cb.Envelope)
方法中。该方法的处理过程包括解析消息、检查合法性、发送区块以及返回响应四个步骤。
下面具体对其进行具体分析。
解析消息
首先,从请求的 Envelope 结构中提取载荷(Payload),进一步从载荷中提取通道头部信息。利用通道头部信息获取对应的本地链结构,并获取当前最新的配置序列号。
// 提取载荷
payload, err := utils.UnmarshalPayload(envelope.Payload)
// 提取通道头
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
// 获取链结构,映射到 orderer.common.multichannel 包中 Registrar 结构体中对应方法
chain, ok := ds.sm.GetChain(chdr.ChannelId)
// 获取当前配置序列号
lastConfigSequence := chain.Sequence()
检查合法性
包括对权限和 seekInfo 数据进行检查。
首先,检查请求方是否对通道拥有读权限。
sf := msgprocessor.NewSigFilter(policies.ChannelReaders, chain.PolicyManager())
if err := sf.Apply(envelope); err != nil {
logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}
接下来,从 Envelope 结构的 payload.data 域中解析出 seekInfo 结构,并检查其合法性。
proto.Unmarshal(payload.Data, seekInfo)
chain.Reader().Iterator(seekInfo.Start)
// 检查 seekInfo 的
cursor, number := chain.Reader().Iterator(seekInfo.Start)
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest: // 截止到最早的区块
stopNum = number
case *ab.SeekPosition_Newest: // 截止到最新的区块
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified: // 截止到特定的区块
stopNum = stop.Specified.Number
if stopNum < number {
logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}
发送区块
在指定的起始和截止范围内,逐个从本地账本读取区块,并发送对应的区块数据,
核心代码如下所示。
for {
block, status := cursor.Next() // 获取区块
sendBlockReply(srv, block) // 发送区块
if stopNum == block.Header.Number {
break
}
}
返回响应
如果处理成功,则返回成功响应消息。
sendStatusReply(srv, cb.Status_SUCCESS)