orderer 启动后的 main 方法会调用到这里的 Main()
方法。
核心代码非常简单。
Copy // Main is the entry point of orderer process
func Main () {
fullCmd := kingpin. MustParse (app. Parse (os.Args[ 1 :]))
// "version" command
if fullCmd == version. FullCommand () {
fmt. Println (metadata. GetVersionInfo ())
return
}
conf := config. Load ()
initializeLoggingLevel (conf)
initializeLocalMsp (conf)
Start (fullCmd, conf)
}
整体的调用流程如下图所示。
其中:
config.Load():从本地配置文件和环境变量中读取配置信息,构建配置树结构。
initializeLoggingLevel(conf):配置日志级别。
initializeLocalMsp(conf):配置 MSP 结构。
initializeLocalMsp
根据 orderer.yaml 配置中路径,读取本地的 msp 数据。
Copy func initializeLocalMsp (conf * config . TopLevel ) {
// Load local MSP
err := mspmgmt. LoadLocalMsp (conf.General.LocalMSPDir, conf.General.BCCSP, conf.General.LocalMSPID)
if err != nil { // Handle errors reading the config file
logger. Fatal ( "Failed to initialize local MSP:" , err)
}
}
Start()
Start() 完成了 Orderer 节点主要的启动过程。
首先,利用 localmsp,创建签名者结构。
接下来是初始化各个账本结构。如果本地不存在旧的账本文件时,需要初始化系统通道,判断是否需要从外部 genesis 区块文件读入数据,还是根据给定配置初始化 genesis 区块结构。
接下来,新建 grpc server,其中包括 Broadcast() 和 Deliver() 两个服务接口。
最后,启动 grpc 服务。
Copy func Start (cmd string , conf * config . TopLevel ) {
signer := localmsp. NewSigner ()
manager := initializeMultichannelRegistrar (conf, signer)
server := NewServer (manager, signer, & conf.Debug)
switch cmd {
case start. FullCommand (): // "start" command
logger. Infof ( "Starting %s " , metadata. GetVersionInfo ())
initializeProfilingService (conf)
grpcServer := initializeGrpcServer (conf)
ab. RegisterAtomicBroadcastServer (grpcServer. Server (), server)
logger. Info ( "Beginning to serve requests" )
grpcServer. Start ()
case benchmark. FullCommand (): // "benchmark" command
logger. Info ( "Starting orderer in benchmark mode" )
benchmarkServer := performance. GetBenchmarkServer ()
benchmarkServer. RegisterService (server)
benchmarkServer. Start ()
}
}
initializeGrpcServer
创建 grpc 的服务器。
Copy grpcServer, err := comm. NewGRPCServerFromListener (lis, secureConfig)
if err != nil {
logger. Fatal ( "Failed to return new GRPC server:" , err)
}
initializeMultichannelRegistrar
这一部分是十分核心的初始化步骤。
初始化一个 multichannel.Registrar 结构,是整个服务的访问和控制核心结构。
Copy type Registrar struct {
chains map [ string ] * ChainSupport
consenters map [ string ] consensus . Consenter
ledgerFactory blockledger . Factory
signer crypto . LocalSigner
systemChannelID string
systemChannel * ChainSupport
templator msgprocessor . ChannelConfigTemplator
callbacks [] func (bundle * channelconfig . Bundle )
}
通过 createLedgerFactory() 初始化账本结构,包括 file、json、ram 等类型。file 的话会在本地指定目录(/var/production/chains)下创建账本结构。账本所关联的链名称会自动命名为 chain_FILENAME。之后通过指定初始区块,或自动生成来初始化区块链结构。至此,账本结构初始化完成。
接下来,初始化 consenter 部分,初始化 solo、kafka 两种类型。
账本、consenter,再加上传入的签名者结构,通过 multichannel.NewRegistrar() 方法构造一个 multichannel.Registrar 结构,负责处理消息。
multichannel.NewRegistrar() 方法会查看本地已有的链结构文件(例如重启后),并挨个执行如下过程:
如果配置中包括联盟信息,说明该链结构是系统链,调用。
如果配置中不包括联盟信息,说明该链结构是应用链,同样调用 newChainSupport() 方法生成链支持结构,并通过链的 start() 方法启动。
NewServer
新建一个 Server 结构,包括一个 broadcast 的处理句柄,以及一个 deliver 的处理句柄。
Copy type server struct {
bh broadcast . Handler
dh deliver . Handler
debug * localconfig . Debug
* multichannel . Registrar
}
NewServer 分别初始化这两个句柄,挂载上前面初始化的 multichannel.Registrar 结构。broadcast 句柄还需要初始化一个配置更新的处理器,负责处理 CONFIG_UPDATE 交易。
Copy func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug, timeWindow time.Duration, mutualTLS bool) ab.AtomicBroadcastServer {
s := & server {
dh: deliver. NewHandlerImpl ( deliverSupport {Registrar: r}, timeWindow, mutualTLS),
bh: broadcast. NewHandlerImpl ( broadcastSupport {Registrar: r}),
debug: debug,
Registrar: r,
}
return s
}