Hyperledger 源码分析之 Fabric
  • 前言
  • 修订记录
  • 如何贡献
  • 整体结构
  • 核心过程
    • 链码容器启动
    • Peer 节点启动过程
    • Peer 节点提案背书过程
    • Peer 节点链码调用过程
    • Peer 节点提交交易过程
    • Peer 节点 Gossip 过程
    • 排序服务核心原理和工作过程
    • Orderer 节点启动过程
    • Orderer 节点 Broadcast gRPC 调用处理
    • Orderer 节点 Deliver gRPC 调用处理
    • Orderer 节点对排序后消息的处理
    • 客户端执行创建通道
    • 客户端执行加入通道
    • 客户端更新锚节点配置
    • 客户端执行链码安装
    • 客户端执行链码实例化
    • 客户端执行链码调用
    • 客户端执行链码查询
  • bccsp
    • factory
      • factory.go
      • nopkcs11.go
      • opts.go
      • pkcs11.go
      • pkcs11factory.go
      • pluginfactory.go
      • swfactory.go
    • idemix
      • bridge
        • credential.go
        • credrequest.go
        • issuer.go
        • math.go
        • nymsignaturescheme.go
        • rand.go
        • revocation.go
        • signaturescheme.go
        • user.go
      • handlers
        • cred.go
        • idemix.go
        • issuer.go
        • nym.go
        • nymsigner.go
        • revocation.go
        • signer.go
        • user.go
      • bccsp.go
    • pkcs11
      • conf.go
      • ecdsa.go
      • ecdsakey.go
      • impl.go
      • pkcs11.go
    • signer
      • signer.go
    • sw
      • aes.go
      • aeskey.go
      • conf.go
      • dummyks.go
      • ecdsa.go
      • ecdsakey.go
      • fileks.go
      • hash.go
      • impl.go
      • inmemoryks.go
      • internals.go
      • keyderiv.go
      • keygen.go
      • keyimport.go
      • new.go
      • rsa.go
      • rsakey.go
    • utils
      • ecdsa.go
      • errs.go
      • io.go
      • keys.go
      • slice.go
      • x509.go
    • aesopts.go
    • bccsp.go
    • ecdsaopts.go
    • hashopts.go
    • idemixerrs.go
    • idemixopts.go
    • keystore.go
    • opts.go
    • rsaopts.go
  • cmd
    • common
      • comm
        • client.go
        • config.go
      • signer
        • signer.go
      • cli.go
      • config.go
    • discover
      • main.go
  • common
    • attrmgr
      • attrmgr.go
    • capabilities
      • application.go
      • capabilities.go
      • channel.go
      • orderer.go
    • cauthdsl
      • cauthdsl.go
      • cauthdsl_builder.go
      • policy.go
      • policyparser.go
    • chaincode
      • metadata.go
    • channelconfig
      • acls.go
      • api.go
      • application.go
      • applicationorg.go
      • bundle.go
      • bundlesource.go
      • channel.go
      • consortium.go
      • consortiums.go
      • logsanitychecks.go
      • msp.go
      • orderer.go
      • organization.go
      • standardvalues.go
      • util.go
    • config
      • api.go
    • configtx
      • compare.go
      • configmap.go
      • configtx.go
      • update.go
      • util.go
      • validator.go
    • crypto
      • tlsgen
        • ca.go
        • key.go
      • expiration.go
      • random.go
      • signer.go
    • deliver
      • acl.go
      • deliver.go
      • metrics.go
    • diag
      • goroutine.go
    • errors
      • errors.go
    • flogging
      • fabenc
        • color.go
        • encoder.go
        • formatter.go
      • httpadmin
        • fakes
        • spec.go
      • metrics
        • observer.go
      • core.go
      • global.go
      • legacy.go
      • levels.go
      • loggerlevels.go
      • logging.go
      • zap.go
    • genesis
      • genesis.go
    • graph
      • choose.go
      • graph.go
      • perm.go
      • tree.go
    • grpclogging
      • fakes
        • echo_service.go
        • leveler.go
      • context.go
      • fields.go
      • server.go
    • grpcmetrics
      • fakes
        • echo_service.go
      • interceptor.go
      • metrics.go
    • ledger
      • blkstorage
        • fsblkstorage
        • blockstorage.go
      • blockledger
        • file
        • json
        • ram
        • ledger.go
        • util.go
      • util
        • leveldbhelper
        • ioutil.go
        • protobuf_util.go
        • util.go
      • ledger_interface.go
    • localmsp
      • signer.go
    • metadata
      • metadata.go
    • metrics
      • cmd
        • gendoc
      • disabled
        • provider.go
      • gendoc
        • options.go
        • table.go
      • internal
        • namer
      • metricsfakes
        • counter.go
        • gauge.go
        • histogram.go
        • provider.go
      • prometheus
        • provider.go
      • statsd
        • goruntime
        • provider.go
      • provider.go
    • policies
      • inquire
        • compare.go
        • inquire.go
        • merge.go
      • implicitmeta.go
      • implicitmeta_util.go
      • implicitmetaparser.go
      • policy.go
      • util.go
    • semaphore
      • semaphore.go
    • tools
      • configtxgen
        • encoder
        • localconfig
        • metadata
        • main.go
      • configtxlator
        • integration
        • metadata
        • rest
        • sanitycheck
        • update
        • main.go
      • cryptogen
        • ca
        • csp
        • metadata
        • msp
        • main.go
      • idemixgen
        • idemixca
        • metadata
        • idemixgen.go
      • protolator
        • integration
        • api.go
        • dynamic.go
        • json.go
        • nested.go
        • statically_opaque.go
        • variably_opaque.go
    • util
      • net.go
      • utils.go
    • viperutil
      • config_util.go
  • core
    • aclmgmt
      • resources
        • resources.go
      • aclmgmt.go
      • aclmgmtimpl.go
      • defaultaclprovider.go
      • resourceprovider.go
    • admin
      • admin.go
      • validate.go
    • cclifecycle
      • lifecycle.go
      • subscription.go
      • util.go
    • chaincode
      • accesscontrol
        • access.go
        • interceptor.go
        • mapper.go
      • lib
        • cid
      • lifecycle
        • lifecycle.go
        • scc.go
        • serializer.go
      • persistence
        • chaincode_package.go
        • package_provider.go
        • persistence.go
      • platforms
        • car
        • ccmetadata
        • golang
        • java
        • node
        • util
        • platforms.go
      • shim
        • ext
        • chaincode.go
        • handler.go
        • inprocstream.go
        • interfaces.go
        • mockstub.go
        • response.go
      • active_transactions.go
      • ccproviderimpl.go
      • chaincode_support.go
      • chaincodetest.yaml
      • config.go
      • container_runtime.go
      • handler.go
      • handler_registry.go
      • metrics.go
      • pending_query_result.go
      • query_response_generator.go
      • runtime_launcher.go
      • transaction_context.go
      • transaction_contexts.go
    • comm
      • client.go
      • config.go
      • connection.go
      • creds.go
      • healthcheck.go
      • metrics.go
      • producer.go
      • server.go
      • serverstatshandler.go
      • throttle.go
      • util.go
    • committer
      • txvalidator
        • plugin
        • v14
        • v20
        • router.go
      • committer.go
      • committer_impl.go
    • common
      • ccpackage
        • ccpackage.go
      • ccprovider
        • cc_info_provider.go
        • cc_statedb_artifacts_provider.go
        • ccinfocache.go
        • ccprovider.go
        • cdspackage.go
        • common.go
        • sigcdspackage.go
      • privdata
        • collection.go
        • membershipinfo.go
        • simplecollection.go
        • store.go
        • util.go
      • sysccprovider
        • sysccprovider.go
      • validation
        • statebased
        • msgvalidation.go
    • config
      • config.go
    • container
      • ccintf
        • ccintf.go
      • dockercontroller
        • dockercontroller.go
        • metrics.go
      • inproccontroller
        • inproccontroller.go
        • inprocstream.go
      • util
        • dockerutil.go
        • writer.go
      • controller.go
    • deliverservice
      • blocksprovider
        • blocksprovider.go
      • client.go
      • deliveryclient.go
      • requester.go
    • dispatcher
      • dispatcher.go
      • protobuf.go
    • endorser
      • endorser.go
      • metrics.go
      • plugin_endorser.go
      • pvtrwset_assembler.go
      • state.go
      • support.go
    • handlers
      • auth
        • filter
        • plugin
        • auth.go
      • decoration
        • decorator
        • plugin
        • decoration.go
      • endorsement
        • api
        • builtin
        • plugin
      • library
        • library.go
        • registry.go
      • validation
        • api
        • builtin
        • token
    • ledger
      • cceventmgmt
        • defs.go
        • lsccstate_listener.go
        • mgr.go
      • confighistory
        • db_helper.go
        • mgr.go
      • customtx
        • custom_tx_processor.go
        • interface.go
        • test_export.go
      • kvledger
        • bookkeeping
        • history
        • txmgmt
        • coll_elg_notifier.go
        • hashcheck_pvtdata.go
        • kv_ledger.go
        • kv_ledger_provider.go
        • metrics.go
        • recovery.go
      • ledgerconfig
        • ledger_config.go
      • ledgermgmt
        • ledger_mgmt.go
        • ledger_mgmt_test_exports.go
      • ledgerstorage
        • store.go
      • pvtdatapolicy
        • btlpolicy.go
      • pvtdatastorage
        • helper.go
        • kv_encoding.go
        • persistent_msgs.pb.go
        • persistent_msgs.proto
        • persistent_msgs_helper.go
        • store.go
        • store_impl.go
        • test_exports.go
        • v11.go
      • util
        • couchdb
        • txvalidationflags.go
        • uint64_encoding.go
        • util.go
      • ledger_interface.go
    • middleware
      • fakes
        • http_handler.go
      • chain.go
      • request_id.go
      • require_cert.go
    • operations
      • fakes
        • healthchecker.go
        • logger.go
      • metrics.go
      • system.go
      • tls.go
    • peer
      • config.go
      • configtx_processor.go
      • configtx_util.go
      • deliverevents.go
      • mock_helpers.go
      • peer.go
      • peer_impl.go
      • support.go
    • policy
      • policy.go
    • policyprovider
      • provider.go
    • scc
      • cscc
        • configure.go
      • lscc
        • deployedcc_infoprovider.go
        • errors.go
        • lscc.go
        • support.go
      • qscc
        • query.go
      • importsysccs.go
      • loadsysccs.go
      • register.go
      • register_pluginsenabled.go
      • sccproviderimpl.go
      • sysccapi.go
    • transientstore
      • store.go
      • store_helper.go
      • test_exports.go
  • discovery
    • client
      • api.go
      • client.go
      • selection.go
      • signer.go
    • cmd
      • cmd.go
      • config.go
      • endorsers.go
      • peers.go
      • stub.go
    • endorsement
      • collection.go
      • endorsement.go
    • support
      • acl
        • support.go
      • chaincode
        • support.go
      • config
        • support.go
      • gossip
        • support.go
      • support.go
    • api.go
    • authcache.go
    • service.go
  • docs
    • custom_theme
      • searchbox.html
    • source
      • Gerrit
      • Style-guides
      • _static
        • css
        • images
      • _templates
        • footer.html
        • layout.html
      • commands
      • dev-setup
        • headers.txt
      • developapps
        • diagrams.pptx
      • diagrams
        • diagrams.pptx
      • glossary
        • diagrams.pptx
      • identity
      • images
        • SideDBTutorialImages.pptx
      • ledger
      • membership
      • network
      • peers
      • private-data
        • PrivateDataConceptImages.pptx
      • tutorial
        • diagrams.pptx
      • DCO1.1.txt
      • conf.py
      • mdtorst.sh
      • metrics_reference.rst.tmpl
      • requirements.txt
    • wrappers
    • Makefile
    • requirements.txt
  • examples
    • chaincode
      • go
        • enccc_example
        • eventsender
        • example01
        • example02
        • example03
        • example04
        • example05
        • invokereturnsvalue
        • map
        • marbles02
        • passthru
        • sleeper
    • cluster
      • compose
        • compose-up.sh.in
        • configure.sh.in
        • docker-compose.yaml.in
      • config
        • configtx.yaml
        • core.yaml
        • cryptogen.yaml
        • fabric-ca-server-config.yaml
        • fabric-tlsca-server-config.yaml
        • orderer.yaml
      • Makefile
      • usage.txt
    • configtxupdate
      • bootstrap_batchsize
        • script.sh
      • common_scripts
        • common.sh
      • reconfig_batchsize
        • script.sh
      • reconfig_membership
        • script.sh
    • events
      • eventsclient
        • eventsclient.go
    • plugins
      • bccsp
        • plugin.go
      • scc
        • plugin.go
  • gossip
    • api
      • channel.go
      • crypto.go
      • subchannel.go
    • comm
      • ack.go
      • comm.go
      • comm_impl.go
      • conn.go
      • crypto.go
      • demux.go
      • msg.go
    • common
      • cert.go
      • common.go
    • discovery
      • discovery.go
      • discovery_impl.go
    • election
      • adapter.go
      • election.go
    • filter
      • filter.go
    • gossip
      • algo
        • pull.go
      • channel
        • channel.go
      • msgstore
        • msgs.go
      • pull
        • pullstore.go
      • batcher.go
      • certstore.go
      • chanstate.go
      • gossip.go
      • gossip_impl.go
    • identity
      • identity.go
    • integration
      • integration.go
    • privdata
      • common
        • common.go
      • coordinator.go
      • dataretriever.go
      • distributor.go
      • pull.go
      • reconcile.go
      • util.go
    • service
      • eventer.go
      • gossip_service.go
    • state
      • payloads_buffer.go
      • state.go
    • util
      • logging.go
      • misc.go
      • msgs.go
      • privdata.go
      • pubsub.go
  • gotools
  • idemix
    • credential.go
    • credrequest.go
    • idemix.pb.go
    • issuerkey.go
    • nonrevocation-prover.go
    • nonrevocation-verifier.go
    • nymsignature.go
    • revocation_authority.go
    • signature.go
    • util.go
    • weak-bb.go
  • images
    • ccenv
      • Dockerfile
      • main.go
    • orderer
      • Dockerfile
    • peer
      • Dockerfile
    • tools
      • Dockerfile
  • integration
    • chaincode
      • keylevelep
        • cmd
        • chaincode.go
      • marbles_private
        • cmd
        • chaincode.go
      • simple
        • cmd
        • chaincode.go
    • discovery
    • e2e
    • helpers
      • images.go
    • nwo
      • commands
        • configtxgen.go
        • cryptogen.go
        • discover.go
        • peer.go
      • fabricconfig
        • core.go
        • orderer.go
      • command.go
      • components.go
      • config.go
      • configblock.go
      • configtx_template.go
      • core_template.go
      • crypto_template.go
      • deploy.go
      • discover.go
      • network.go
      • orderer_template.go
      • solo.yaml
      • standard_networks.go
      • templates.go
    • pluggable
      • plugin_activation.go
    • pvtdata
    • runner
      • couchdb.go
      • defaults.go
      • kafka.go
      • zookeeper.go
    • sbe
    • token
  • msp
    • cache
      • cache.go
      • second_chance.go
    • mgmt
      • deserializer.go
      • mgmt.go
      • principal.go
    • cert.go
    • configbuilder.go
    • factory.go
    • idemix_roles.go
    • idemixmsp.go
    • identities.go
    • msp.go
    • mspimpl.go
    • mspimplsetup.go
    • mspimplvalidate.go
    • mspmgrimpl.go
  • orderer
    • common
      • blockcutter
        • blockcutter.go
        • metrics.go
      • bootstrap
        • file
        • bootstrap.go
      • broadcast
        • broadcast.go
        • metrics.go
      • cluster
        • comm.go
        • connections.go
        • deliver.go
        • replication.go
        • rpc.go
        • service.go
        • util.go
      • localconfig
        • config.go
      • metadata
        • metadata.go
      • msgprocessor
        • expiration.go
        • filter.go
        • msgprocessor.go
        • sigfilter.go
        • sizefilter.go
        • standardchannel.go
        • systemchannel.go
        • systemchannelfilter.go
      • multichannel
        • blockwriter.go
        • chainsupport.go
        • registrar.go
      • server
        • docker-compose.yml
        • main.go
        • onboarding.go
        • sched.go
        • server.go
        • signals.go
        • signals_windows.go
        • util.go
    • consensus
      • etcdraft
        • blockcreator.go
        • chain.go
        • consenter.go
        • dispatcher.go
        • node.go
        • storage.go
        • util.go
      • inactive
        • inactive_chain.go
      • kafka
        • chain.go
        • channel.go
        • config.go
        • consenter.go
        • logger.go
        • metrics.go
        • partitioner.go
        • retry.go
      • solo
        • consensus.go
      • consensus.go
    • sample_clients
      • broadcast_config
        • client.go
        • newchain.go
      • broadcast_msg
        • client.go
      • deliver_stdout
        • client.go
    • main.go
  • peer
    • chaincode
      • api
        • api.go
      • chaincode.go
      • common.go
      • install.go
      • instantiate.go
      • invoke.go
      • list.go
      • package.go
      • query.go
      • signpackage.go
      • upgrade.go
    • channel
      • channel.go
      • create.go
      • fetch.go
      • getinfo.go
      • join.go
      • list.go
      • signconfigtx.go
      • update.go
    • clilogging
      • common.go
      • getlevel.go
      • getlogspec.go
      • logging.go
      • revertlevels.go
      • setlevel.go
      • setlogspec.go
    • common
      • api
        • api.go
      • broadcastclient.go
      • common.go
      • deliverclient.go
      • mockclient.go
      • networkconfig.go
      • ordererclient.go
      • ordererenv.go
      • peerclient.go
      • peerdeliverclient.go
    • gossip
      • mcs.go
      • sa.go
    • node
      • node.go
      • signals.go
      • signals_windows.go
      • start.go
      • status.go
    • version
      • version.go
    • main.go
  • protos
    • common
      • block.go
      • collection.pb.go
      • collection.proto
      • common.go
      • common.pb.go
      • common.proto
      • configtx.go
      • configtx.pb.go
      • configtx.proto
      • configuration.go
      • configuration.pb.go
      • configuration.proto
      • ledger.pb.go
      • ledger.proto
      • policies.go
      • policies.pb.go
      • policies.proto
      • signed_data.go
    • discovery
      • extensions.go
      • protocol.pb.go
      • protocol.proto
    • gossip
      • extensions.go
      • message.pb.go
      • message.proto
    • idemix
      • idemix.proto
    • ledger
      • queryresult
        • kv_query_result.pb.go
        • kv_query_result.proto
      • rwset
        • kvrwset
        • rwset.go
        • rwset.pb.go
        • rwset.proto
    • msp
      • identities.pb.go
      • identities.proto
      • msp_config.go
      • msp_config.pb.go
      • msp_config.proto
      • msp_principal.go
      • msp_principal.pb.go
      • msp_principal.proto
    • orderer
      • etcdraft
        • configuration.go
        • configuration.pb.go
        • configuration.proto
      • ab.pb.go
      • ab.proto
      • cluster.pb.go
      • cluster.proto
      • configuration.go
      • configuration.pb.go
      • configuration.proto
      • kafka.pb.go
      • kafka.proto
    • peer
      • lifecycle
        • db.pb.go
        • db.proto
        • lifecycle.pb.go
        • lifecycle.proto
      • admin.pb.go
      • admin.proto
      • chaincode.go
      • chaincode.pb.go
      • chaincode.proto
      • chaincode_event.pb.go
      • chaincode_event.proto
      • chaincode_shim.pb.go
      • chaincode_shim.proto
      • chaincodeunmarshall.go
      • configuration.go
      • configuration.pb.go
      • configuration.proto
      • events.pb.go
      • events.proto
      • peer.pb.go
      • peer.proto
      • proposal.go
      • proposal.pb.go
      • proposal.proto
      • proposal_response.go
      • proposal_response.pb.go
      • proposal_response.proto
      • query.pb.go
      • query.proto
      • resources.pb.go
      • resources.proto
      • signed_cc_dep_spec.pb.go
      • signed_cc_dep_spec.proto
      • transaction.go
      • transaction.pb.go
      • transaction.proto
    • token
      • expectations.pb.go
      • expectations.proto
      • prover.pb.go
      • prover.proto
      • transaction.pb.go
      • transaction.proto
    • transientstore
      • transientstore.pb.go
      • transientstore.proto
    • utils
      • blockutils.go
      • chaincodeutils.go
      • commonutils.go
      • proputils.go
      • txutils.go
  • release
    • templates
      • get-docker-images.in
  • release_notes
    • v1.0.0-rc1.txt
    • v1.0.0.txt
    • v1.0.1.txt
    • v1.0.2.txt
    • v1.0.3.txt
    • v1.1.0-alpha.txt
    • v1.1.0-preview.txt
    • v1.1.0-rc1.txt
    • v1.2.0-rc1.txt
    • v1.3.0-rc1.txt
    • v1.4.0.txt
  • sampleconfig
    • msp
      • admincerts
        • admincert.pem
      • cacerts
        • cacert.pem
      • keystore
        • key.pem
      • signcerts
        • peer.pem
      • tlscacerts
        • tlsroot.pem
      • tlsintermediatecerts
        • tlsintermediate.pem
      • config.yaml
    • configtx.yaml
    • core.yaml
    • orderer.yaml
  • scripts
    • bootstrap.sh
    • changelog.sh
    • check_deps.sh
    • check_license.sh
    • check_spelling.sh
    • check_trailingspaces.sh
    • compile_protos.sh
    • generateHelpDocs.sh
    • goListFiles.sh
    • golinter.sh
    • metrics_doc.sh
    • multiarch.sh
    • pull_build_artifacts.sh
    • run-integration-tests.sh
    • run-unit-tests.sh
  • token
    • client
      • client.go
      • config.go
      • deliver_client.go
      • grpc.go
      • orderer_client.go
      • prover.go
      • tx_submitter.go
    • identity
      • identity.go
    • ledger
      • ledger.go
    • server
      • accesscontrol.go
      • capability_checker.go
      • ledgermanager.go
      • manager.go
      • marshal.go
      • msp.go
      • prover.go
      • tms.go
    • tms
      • manager
        • manager.go
        • policy.go
      • plain
        • issuer.go
        • ledger.go
        • manager.go
        • pool.go
        • transactor.go
        • verifier.go
      • transactiondata.go
    • transaction
      • marshalling.go
      • processor.go
      • tms.go
    • identity.go
  • vagrant
    • Vagrantfile
    • docker.sh
    • essentials.sh
    • golang.sh
    • softhsm.sh
    • srcdir.sh
    • user.sh
  • Gopkg.lock
  • Gopkg.toml
  • LICENSE
  • Makefile
  • ci.properties
  • docker-env.mk
  • fabric.sublime-project
  • fabric.sublime-workspace
  • gotools.mk
  • settings.gradle
  • tox.ini
Powered by GitBook
On this page
  • 整体过程
  • 解析消息
  • 处理普通交易消息
  • 消息格式检查
  • 入队列操作
  • 处理配置交易消息
  • 合并配置更新
  • 入队列操作
  • 返回响应

Was this helpful?

  1. 核心过程

Orderer 节点 Broadcast gRPC 调用处理

PreviousOrderer 节点启动过程NextOrderer 节点 Deliver gRPC 调用处理

Last updated 5 years ago

Was this helpful?

Broadcast,意味着客户端将请求消息(例如完成背书后的交易)通过 gRPC 接口发送给 Ordering 服务。Orderer 进行本地验证处理后,会转化为入队消息发给后端共识模块(如 Kafka)。

发给 Orderer 的 Broadcast 请求消息包括链码的实例化、调用;通道的创建、更新。

来自客户端的请求消息,会首先交给 orderer.common.server 包中 server 结构体的 Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error 方法处理。该方法主要会调用到 orderer.common.broadcast 包中 handlerImpl 结构的 Handle(srv ab.AtomicBroadcast_BroadcastServer) error 方法。

handlerImpl 结构体十分重要,在 Orderer 整个处理过程中都会用到。

type handlerImpl struct {
    sm ChannelSupportRegistrar
}

func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error

整体过程

Broadcast 请求的整体处理过程如下图所示。

Handle(srv ab.AtomicBroadcast_BroadcastServer) error 方法会开启一个循环来从 srv 中读取请求消息并进行处理,直到结束。主要包括解析消息、处理消息(包括配置消息和非配置消息)和返回响应三个步骤。

核心代码如下所示(位于 orderer/common/broadcast/broadcast.go#handlerImpl.Handle()):

for {
    msg, error := srv.Recv() // 从请求中提取一个 Envelope 消息

    // 解析消息:判断是否为配置消息;获取对应本地账本结构:由通道头部中指定的通道 ID 决定,本地对应账本结构不存在时(如新建应用通道)则由系统通道来处理
    chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg) 
    // 检查是否被之前重新提交的消息阻塞
    processor.WaitReady()

    // 对应的通道结构对消息进行处理
    if !isConfig { // 普通消息
        configSeq, err := processor.ProcessNormalMsg(msg) //消息检查
        processor.Order(msg, configSeq) //入队列操作
    } else { // 配置消息,目前只有 CONFIG_UPDATE 类型,如创建、更新通道,或获取配置区块
        config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) // 合并配置更新消息
        processor.Configure(config, configSeq) //入队列操作:相关处理后发给 Kafka
    }

    srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS}) // 返回响应消息
}

分为三个步骤:

  • 解析消息:判断是否为配置消息,决定消息应由哪个通道结构进行处理,注意对于创建应用通道消息,处理器指定为系统的通道结构;

  • 处理消息:选用对应的通道结构对消息进行处理,包括普通消息和配置消息;

  • 返回响应消息给请求方。

下面分别进行剖析。

解析消息

首先,解析消息,获取消息通道头、是否为配置消息、获取对应处理器结构(链结构)。

chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)

实际上,会映射到 orderer.common.server 包中 broadcastSupport 结构体的 BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, broadcast.ChannelSupport, error) 方法,进一步调用到 orderer.common.multichannel 包中 Registrar 结构体的对应方法。

// orderer/common/multichannel/registrar.go
func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
    chdr, err := utils.ChannelHeader(msg)
    if err != nil {
        return nil, false, nil, fmt.Errorf("could not determine channel ID: %s", err)
    }

    cs, ok := r.chains[chdr.ChannelId] // 应用通道、系统通道
    if !ok {
        cs = r.systemChannel // 空,则默认为系统通道。如收到新建应用通道请求时,Orderer 本地并没有该应用通道对应结构
    }

    isConfig := false
    switch cs.ClassifyMsg(chdr) { // 只有 CONFIG_UPDATE 会返回 ConfigUpdateMsg
    case msgprocessor.ConfigUpdateMsg: // CONFIG_UPDATE 消息,包括创建、更新通道,获取配置区块等
        isConfig = true
    default:
    }

    return chdr, isConfig, cs, nil
}

channel 头部从消息信封结构中解析出来;是否为配置信息根据消息头中通道类型进行判断(是否为 cb.HeaderType_CONFIG_UPDATE);通过字典结构查到对应的 ChainSupport 结构(应用通道、系统通道)作为处理器。

之后,利用解析后的结果,分别对不同类型的消息(普通消息、配置消息)进行不同处理。

下面默认以常见的应用通道场景进行介绍。

处理普通交易消息

对于普通交易消息,主要执行如下两个操作:消息格式检查和入队列操作。

configSeq, err := processor.ProcessNormalMsg(msg) //消息检查
processor.Order(msg, configSeq) //入队列操作

消息格式检查

消息检查方法会映射到 orderer.common.msgprocessor 包中 StandardChannel/SystemChannel 结构体的 ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) 方法,以应用通道为例,实现如下。

// orderer/common/msgprocessor/standardchannel.go
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
    configSeq = s.support.Sequence() // 获取配置的序列号,映射到 common.configtx 包中 configManager 结构体的对应方法
    err = s.filters.Apply(env) // 进行过滤检查,实现为 orderer.common.msgprocessor 包中 RuleSet 结构体的对应方法。
    return
}

其中,过滤器会在创建 ChainSupport 结构时候初始化:

  • 应用通道:orderer.common.mspprocessor 包中的 CreateStandardChannelFilters(filterSupport channelconfig.Resources) *RuleSet 方法,包括 EmptyRejectRule、SizeFilter 和 SigFilter(ChannelWriters 角色)。

  • 系统通道:orderer.common.mspprocessor 包中的 CreateSystemChannelFilters(chainCreator ChainCreator, ledgerResources channelconfig.Resources) *RuleSet 方法,包括 EmptyRejectRule、SizeFilter、SigFilter(ChannelWriters 角色)和 SystemChannelFilter。

入队列操作

入队列操作会根据 consensus 配置的不同映射到 orderer.consensus.solo 包或 orderer.consensus.kafka 包中的方法。

以 kafka 情况为例,会映射到 chainImpl 结构体的对应方法。该方法会将消息进一步封装为 sarama.ProducerMessage 类型消息,通过 enqueue 方法发给 Kafka 后端。

// orderer/consensus/kafka/chain.go#chainImpl.Order()
func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
    return chain.order(env, configSeq, int64(0))
}

func (chain *chainImpl) order(env *cb.Envelope, configSeq uint64, originalOffset int64) error {
    marshaledEnv, err := utils.Marshal(env)
    if err != nil {
        return fmt.Errorf("cannot enqueue, unable to marshal envelope because = %s", err)
    }
    if !chain.enqueue(newNormalMessage(marshaledEnv, configSeq, originalOffset)) {
        return fmt.Errorf("cannot enqueue")
    }
    return nil
}

处理配置交易消息

对于配置交易消息(CONFIG_UPDATE 类型消息,包括创建、更新通道,获取配置区块等),处理过程与正常消息略有不同,包括合并配置更新消息和入队列操作两个操作。

合并配置更新

主要过程包括如下两个步骤:

config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) // 合并配置更新,生成新的配置信封结构
processor.Configure(config, configSeq) //入队列操作,将生成的配置信封结构消息扔给后端队列(如 Kafka)

其中,合并配置更新消息方法会映射到 orderer.common.msgprocessor 包中 StandardChannel/SystemChannel 结构体的 ProcessConfigUpdateMsg(env *cb.Envelope) (configSeq uint64, err error) 方法,计算合并后的配置和配置编号。

以应用通道为例,实现如下。

// orderer/common/msgprocessor/standardchannel.go
func (s *StandardChannel) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
    logger.Debugf("Processing config update message for channel %s", s.support.ChainID())

    seq := s.support.Sequence() // 获取当前配置的版本号
    err = s.filters.Apply(env) // 校验权限,是否可以更新配置
    if err != nil {
        return nil, 0, err
    }

    // 根据输入的更新配置交易消息生成配置信封结构:Config 为更新后配置字典;LastUpdate 为输入的更新配置交易
    // 最终调用 `common/configtx` 包下 `ValidatorImpl.ProposeConfigUpdate()` 方法。
    configEnvelope, err := s.support.ProposeConfigUpdate(env)
    if err != nil {
        return nil, 0, err
    }

    // 生成签名的配置信封结构,通道头类型为 HeaderType_CONFIG。即排序后消息类型将由 CONFIG_UPDATE 变更为 CONFIG
    config, err = utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, s.support.ChainID(), s.support.Signer(), configEnvelope, msgVersion, epoch)
    if err != nil {
        return nil, 0, err
    }

    err = s.filters.Apply(config) // 校验生成的配置消息是否合法
    if err != nil {
        return nil, 0, err
    }

    return config, seq, nil
}

对于系统通道情况,除了调用普通通道结构的对应方法来处理普通的更新配置交易外,还会负责新建应用通道请求。

// orderer/common/msgprocessor/systemchannel.go
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
    channelID, err := utils.ChannelID(envConfigUpdate)
    if channelID == s.support.ChainID() { // 更新系统通道的配置交易,与普通通道相同处理
        return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
    }

    // 从系统通道中获取当前最新的配置
    // orderer/common/msgprocessor/systemchannel.go#DefaultTemplator.NewChannelConfig()
    bundle, err := s.templator.NewChannelConfig(envConfigUpdate)

    // 合并来自客户端的配置更新信封结构,创建配置信封结构 ConfigEnvelope
    newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)

    // 封装新的签名信封结构,其 Payload.Data 是 newChannelConfigEnv
    newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)

    // 处理新建应用通道请求,封装为 ORDERER_TRANSACTION 类型消息
    wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)

    s.StandardChannel.filters.Apply(wrappedOrdererTransaction) // 再次校验配置

    // 返回封装后的签名信封结构
    return wrappedOrdererTransaction, s.support.Sequence(), nil
}

入队列操作

入队列操作会根据 consensus 配置的不同映射到 orderer.consensus.solo 包或 orderer.consensus.kafka 包中的方法。

以 kafka 情况为例,会映射到 chainImpl 结构体的 Configure(config *cb.Envelope, configSeq uint64) 方法。该方法会调用 configure(config *cb.Envelope, configSeq uint64, originalOffset int64) 方法,将消息进一步封装为 KafkaMessage_Regular 类型消息,通过 enqueue 方法发给 Kafka 后端。

// orderer/consensus/kafka/chain.go
func (chain *chainImpl) configure(config *cb.Envelope, configSeq uint64, originalOffset int64) error {
    marshaledConfig, err := utils.Marshal(config)
    if err != nil {
        return fmt.Errorf("cannot enqueue, unable to marshal config because %s", err)
    }

    // 封装为 `KafkaMessageRegular_CONFIG` 类型消息,并通过 producer 发给 Kafka
    if !chain.enqueue(newConfigMessage(marshaledConfig, configSeq, originalOffset)) {
        return fmt.Errorf("cannot enqueue")
    }
    return nil
}

其中,封装为 KafkaMessageRegular_CONFIG 类型消息过程十分简单。

// orderer/consensus/kafka/chain.go
func newConfigMessage(config []byte, configSeq uint64, originalOffset int64) *ab.KafkaMessage {
    return &ab.KafkaMessage{
        Type: &ab.KafkaMessage_Regular{
            Regular: &ab.KafkaMessageRegular{
                Payload:        config,
                ConfigSeq:      configSeq,
                Class:          ab.KafkaMessageRegular_CONFIG,
                OriginalOffset: originalOffset,
            },
        },
    }
}

返回响应

如果处理成功,则返回成功响应消息。

srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})

之后 Orderer 将再次从 Kakfa 获取到共识(这里主要是排序)完成的 KafkaMessageRegular_CONFIG 消息,进行解析和处理。具体可以参考 。

Orderer 节点对排序后消息的处理过程
Orderer 节点 Broadcast 处理过程