typeHandlerstruct {sync.RWMutex//peer to shim grpc serializer. User only in serialSend serialLock sync.Mutex ChatStream ccintf.ChaincodeStream FSM *fsm.FSM ChaincodeID *pb.ChaincodeID ccInstance *sysccprovider.ChaincodeInstance chaincodeSupport *ChaincodeSupport registered bool readyNotify chanbool// Map of tx txid to either invoke tx. Each tx will be// added prior to execute and remove when done execute txCtxs map[string]*transactionContext txidMap map[string]bool// used to do Send after making sure the state transition is complete nextState chan*nextStateInfo}
// Message loop. Every iteration waits for exactly one event and reacts to it:
//   - errc:              result of an earlier asynchronous send;
//   - msgAvail:          a message received from the shim via ChatStream.Recv;
//   - handler.nextState: a queued state-transition message;
//   - keepalive timer:   time to send a KEEPALIVE message.
//
// Any `return` below leaves the enclosing function with a non-nil error.
for {
	// Reset the per-iteration state.
	in = nil
	err = nil
	nsInfo = nil

	if recv {
		// Keep at most one Recv outstanding: recv is switched back to true
		// only after the previous message has been taken from msgAvail.
		recv = false
		go func() {
			var in2 *pb.ChaincodeMessage
			// NOTE(review): err is the loop's own variable. It is written here
			// while the loop may concurrently reset it (err = nil) or assign
			// the handleMessage result to it. This looks like a data race that
			// can also drop a Recv error; consider delivering the error over
			// the channel together with the message.
			in2, err = handler.ChatStream.Recv()
			msgAvail <- in2
		}()
	}

	select {
	case sendErr := <-errc:
		if sendErr != nil {
			return sendErr
		}
		//send was successful, just continue
		continue

	case in = <-msgAvail:
		// Inspect the outcome of the Recv that produced this message; any
		// failure (EOF, other error, nil message) ends the stream.
		switch {
		case err == io.EOF:
			err = errors.Wrapf(err, "received EOF, ending chaincode support stream")
			chaincodeLogger.Debugf("%+v", err)
			return err
		case err != nil:
			chaincodeLogger.Errorf("Error handling chaincode support stream: %+v", err)
			return err
		case in == nil:
			err = errors.New("received nil message, ending chaincode support stream")
			chaincodeLogger.Debugf("%+v", err)
			return err
		}

		chaincodeLogger.Debugf("[%s]Received message %s from shim", shorttxid(in.Txid), in.Type.String())
		// ERROR messages are logged here and still passed on to handleMessage.
		if in.Type == pb.ChaincodeMessage_ERROR {
			chaincodeLogger.Errorf("Got error: %s", string(in.Payload))
		}

		// we can spin off another Recv again
		recv = true

		if in.Type == pb.ChaincodeMessage_KEEPALIVE {
			chaincodeLogger.Debug("Received KEEPALIVE Response")
			// Received a keep alive message, we don't do anything with it for now
			// and it does not touch the state machine
			continue
		}

	case nsInfo = <-handler.nextState:
		// A state-transition message; it is handled like a received message
		// and afterwards optionally forwarded to the chaincode (see below).
		in = nsInfo.msg
		if in == nil {
			err = errors.New("next state nil message, ending chaincode support stream")
			chaincodeLogger.Debugf("%+v", err)
			return err
		}
		chaincodeLogger.Debugf("[%s]Move state message %s", shorttxid(in.Txid), in.Type.String())

	case <-handler.waitForKeepaliveTimer():
		if handler.chaincodeSupport.keepalive <= 0 {
			chaincodeLogger.Errorf("Invalid select: keepalive not on (keepalive=%d)", handler.chaincodeSupport.keepalive)
			continue
		}

		//if no error message from serialSend, KEEPALIVE happy, and don't care about error
		//(maybe it'll work later)
		handler.serialSendAsync(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE}, nil)
		continue
	}

	// Only the msgAvail and nextState cases fall through to here, with in != nil.
	err = handler.handleMessage(in)
	if err != nil {
		err = errors.WithMessage(err, "error handling message, ending stream")
		chaincodeLogger.Errorf("[%s] %+v", shorttxid(in.Txid), err)
		return err
	}

	// nsInfo is non-nil only when the message came from handler.nextState.
	if nsInfo != nil && nsInfo.sendToCC {
		chaincodeLogger.Debugf("[%s]sending state message %s", shorttxid(in.Txid), in.Type.String())
		//ready messages are sent sync
		if nsInfo.sendSync {
			if in.Type != pb.ChaincodeMessage_READY {
				panic(fmt.Sprintf("[%s]Sync send can only be for READY state %s\n", shorttxid(in.Txid), in.Type.String()))
			}
			// No trailing colon in the message: WithMessage already joins
			// message and cause with ": ".
			if err = handler.serialSend(in); err != nil {
				return errors.WithMessage(err, fmt.Sprintf("[%s]error sending ready message, ending stream", shorttxid(in.Txid)))
			}
		} else {
			//if error bail in select
			handler.serialSendAsync(in, errc)
		}
	}
}
首先是利用 select 结构尝试读取各种消息。包括:
case in = <-msgAvail:从 cc 侧读取到请求消息;
case nsInfo = <-handler.nextState:读取切换到下个状态的附加消息。
case <-handler.waitForKeepaliveTimer():定期发出心跳刷新消息。
读取到合法消息后,会先调用 handler.handleMessage(in) 处理该消息;对于状态切换消息,再检查是否需要发送给 cc 侧(sendToCC 为 true)。其中同步发送(sendSync 为 true)仅允许消息类型为 READY(意味着此时 cc 在正常运行状态),其余消息则通过 serialSendAsync 异步发送,发送结果由 select 中的 errc 分支处理。