handler.go

主要提供:

  • Handler 结构体:对所关联的链码容器进行相应,内部有一个状态机。

  • HandleChaincodeStream() 方法:对外提供初始化的 Handler 结构体,并进入循环,不断接收来自链码容器的消息。

Handler 结构体

Peer 侧会为每一个 chaincode 维护一个 Handler 结构,具体响应所绑定的 chaincode 容器过来的各种消息,通过内部状态机进行处理。

Handler 结构实现了 MessageHandler 接口,主要提供一个 HandleMessage(msg *pb.ChaincodeMessage) error 方法,作为处理各个消息的入口方法。

type Handler struct {
    sync.RWMutex
    //peer to shim grpc serializer. User only in serialSend
    serialLock  sync.Mutex
    ChatStream  ccintf.ChaincodeStream
    FSM         *fsm.FSM
    ChaincodeID *pb.ChaincodeID
    ccInstance  *sysccprovider.ChaincodeInstance

    chaincodeSupport *ChaincodeSupport
    registered       bool
    readyNotify      chan bool
    // Map of tx txid to either invoke tx. Each tx will be
    // added prior to execute and remove when done execute
    txCtxs map[string]*transactionContext

    txidMap map[string]bool

    // used to do Send after making sure the state transition is complete
    nextState chan *nextStateInfo
}

chaincode 容器启动后,会调用到服务端的 Register() 方法,该方法进一步调用到 HandleChaincodeStream(),创建 Handler 结构体,进入接收消息循环。

newChaincodeSupportHandler 方法中会初始化 FSM。

之后,调用 handler.processStream() 进入对来自 chaincode 容器消息处理的主循环。

handler.processStream() 主消息循环

Peer 侧维护一个到 cc 的双向流,循环处理消息。主要在 func (handler *Handler) processStream() error 方法中(cc 到 peer 注册后会自动调用到该方法)。

主循环过程代码如下:

首先是利用 select 结构尝试读取各种消息。包括:

  • case in = <-msgAvail:从 cc 侧读取到请求消息;

  • case nsInfo = <-handler.nextState:读取切换到下个状态的附加消息。

  • case <-handler.waitForKeepaliveTimer():定期发出心跳刷新消息。

读取到合法消息后,会分别调用 handler.HandleMessage(in) 处理 cc 消息;以及检查状态切换消息(仅允许消息类型为 READY,意味着此时 cc 在正常运行状态),是否要发送给 cc 侧(sendToCC 为 True)。

FSM

定义的状态、事件主要在 func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStream ccintf.ChaincodeStream) *Handler 方法中。

一般对应 GET_STATE、GET_STATE_BY_RANGE 等简单事件,调用 handleXXX 方法。

PUT_STATE、DEL_STATE、INVOKE_CHAINCODE 三个事件,则会触发 enterBusyState() 方法。

Last updated

Was this helpful?