Orderer 节点 Deliver gRPC 调用处理
// deliverServer is the receiver type of the Handle method that processes
// Deliver streams (ab.AtomicBroadcast_DeliverServer).
type deliverServer struct {
// sm is the support manager; the handler calls sm.GetChain with a channel ID
// to look up the chain a request refers to.
sm SupportManager
}
func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error

整体过程

解析消息
检查合法性
发送区块
返回响应
Last updated
// deliverServer is the receiver type of the Handle method that processes
// Deliver streams (ab.AtomicBroadcast_DeliverServer).
type deliverServer struct {
// sm is the support manager; the handler calls sm.GetChain with a channel ID
// to look up the chain a request refers to.
sm SupportManager
}
func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error
Last updated
for {
envelope, err := srv.Recv() // 从请求中提取一个 Envelope 消息
ds.deliverBlocks(srv, envelope) // 对消息进行处理并答复,核心过程
}

// 提取载荷
payload, err := utils.UnmarshalPayload(envelope.Payload)
// 提取通道头
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
// 获取链结构,映射到 orderer.common.multichannel 包中 Registrar 结构体中对应方法
chain, ok := ds.sm.GetChain(chdr.ChannelId)
// 获取当前配置序列号
lastConfigSequence := chain.Sequence()

sf := msgprocessor.NewSigFilter(policies.ChannelReaders, chain.PolicyManager())
if err := sf.Apply(envelope); err != nil {
logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
}

proto.Unmarshal(payload.Data, seekInfo)
chain.Reader().Iterator(seekInfo.Start)
// 检查 seekInfo 的起止位置,并据此确定截止区块号 stopNum
cursor, number := chain.Reader().Iterator(seekInfo.Start)
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest: // 截止到最早的区块
stopNum = number
case *ab.SeekPosition_Newest: // 截止到最新的区块
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified: // 截止到特定的区块
stopNum = stop.Specified.Number
if stopNum < number {
logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}

for {
block, status := cursor.Next() // 获取区块
sendBlockReply(srv, block) // 发送区块
if stopNum == block.Header.Number {
break
}
}

sendStatusReply(srv, cb.Status_SUCCESS)