Fabric source code study notes

Hyperledger Fabric Orderer source code analysis

Notes

Orderer data structures

Preface

The Orderer receives transactions, packs them into blocks, gets those blocks agreed upon by all Orderer nodes, and then distributes them to Peers. This involves:

  • Network: receiving transactions over gRPC (Broadcast) and sending blocks to peers (Deliver)
  • Cutting: packing transactions into blocks according to configured rules
  • Consensus: getting all Orderer nodes to agree

Put simply, Orderer and Peer form something like a publish/subscribe, producer/consumer relationship.

Registrar

|   |-- multichannel
|   |   |-- blockwriter.go
|   |   |-- blockwriter_test.go
|   |   |-- chainsupport.go
|   |   |-- chainsupport_test.go
|   |   |-- registrar.go
|   |   |-- registrar_test.go
|   |   `-- util_test.go

Registrar serves as a point of access and control for the individual channel resources.

It is the access and control point for each channel's resources: any operation on a channel (chain) starts here. (A channel and a chain are both, in essence, a blockchain.)

type Registrar struct {
	lock               sync.RWMutex
	// Every chain, keyed by chainID; each chain is represented in the Orderer by a
	// ChainSupport, so a lookup by chainID is the entry point for all further operations
	chains             map[string]*ChainSupport
	config             localconfig.TopLevel
	// All consensus plugins; each plugin is a Consenter. Fabric 1.4 ships Solo, Kafka and EtcdRaft
	consenters         map[string]consensus.Consenter
	// Reads and creates the ledgers of chains
	ledgerFactory      blockledger.Factory
	// Signs the Orderer's data and creates SignatureHeaders
	signer             crypto.LocalSigner
	blockcutterMetrics *blockcutter.Metrics
	// System channel ID
	systemChannelID    string
	// System channel instance
	systemChannel      *ChainSupport
	templator          msgprocessor.ChannelConfigTemplator
	callbacks          []channelconfig.BundleActor
}
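
As a quick illustration of how the lock and the chains map work together, here is a minimal, runnable sketch of the read-locked lookup behind Registrar.GetChain in registrar.go; ChainSupport here is a simplified stand-in, not the real type:

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the real types, just to show the pattern.
type ChainSupport struct{ ChainID string }

type Registrar struct {
	lock   sync.RWMutex
	chains map[string]*ChainSupport
}

// GetChain mirrors the shape of the real lookup: a read-locked map access.
func (r *Registrar) GetChain(chainID string) *ChainSupport {
	r.lock.RLock()
	defer r.lock.RUnlock()
	return r.chains[chainID]
}

func main() {
	r := &Registrar{chains: map[string]*ChainSupport{
		"mychannel": {ChainID: "mychannel"},
	}}
	if cs := r.GetChain("mychannel"); cs != nil {
		fmt.Println("found chain:", cs.ChainID)
	}
}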

ChainSupport

ChainSupport gathers all the resources a channel needs, so one ChainSupport represents one chain.

// ChainSupport holds the resources for a particular channel.
type ChainSupport struct {
	// Ledger read/write
	*ledgerResources
	// Processes transaction messages; handling differs by message type
	msgprocessor.Processor
	// Writes blocks to the ledger; the source allows only one thread to commit
	// at a time, so goroutine coordination is involved
	*BlockWriter
	// This channel's consensus instance; e.g. each chain has its own Raft
	// instance, independent of the others
	consensus.Chain
	// Cuts transactions into blocks
	cutter blockcutter.Receiver
	crypto.LocalSigner
}

Chain


Chain is an interface. An implementation is not a chain itself but a chain's consensus instance, which can be Solo, Kafka, or EtcdRaft. It runs in its own goroutine and communicates with ChainSupport over Go channels; it calls other interfaces to cut blocks and to bring all Orderer nodes to agreement on transactions.

type Chain interface {
	// Orders a normal message/transaction
	Order(env *cb.Envelope, configSeq uint64) error

	// Orders a config message/transaction
	Configure(config *cb.Envelope, configSeq uint64) error

	// Waits for the ordering cluster to become available
	WaitReady() error

	// The returned channel is closed when the ordering cluster hits an error
	Errored() <-chan struct{}

	// Starts this chain
	Start()

	// Halts this chain and releases its resources
	Halt()
}
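
To make the "runs in its own goroutine, talks over Go channels" point concrete, here is a minimal solo-style sketch of the Chain contract. Envelope and the channel names sendC/exitC/doneC are simplified stand-ins, not the real Fabric types:

package main

import (
	"errors"
	"fmt"
)

// Envelope stands in for *cb.Envelope.
type Envelope struct{ Payload []byte }

// toyChain runs its consensus loop in its own goroutine and receives
// transactions from the ChainSupport side over a Go channel.
type toyChain struct {
	sendC chan *Envelope
	exitC chan struct{}
	doneC chan struct{}
}

func newToyChain() *toyChain {
	return &toyChain{
		sendC: make(chan *Envelope),
		exitC: make(chan struct{}),
		doneC: make(chan struct{}),
	}
}

// Start launches the ordering loop.
func (c *toyChain) Start() {
	go func() {
		defer close(c.doneC)
		for {
			select {
			case env := <-c.sendC:
				// a real chain would hand env to the block cutter here
				fmt.Printf("ordered %d bytes\n", len(env.Payload))
			case <-c.exitC:
				return
			}
		}
	}()
}

// Order submits a transaction to the loop.
func (c *toyChain) Order(env *Envelope, configSeq uint64) error {
	select {
	case c.sendC <- env:
		return nil
	case <-c.exitC:
		return errors.New("chain halted")
	}
}

// Halt stops the loop and waits for it to exit.
func (c *toyChain) Halt() {
	close(c.exitC)
	<-c.doneC
}

func main() {
	c := newToyChain()
	c.Start()
	_ = c.Order(&Envelope{Payload: []byte("tx")}, 0)
	c.Halt()
}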

Consenter

type Consenter interface {
	HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

Consenter is also an interface, with a single job: creating a Chain. Each consensus plugin has its own Consenter implementation, creating a Solo, Kafka, or EtcdRaft instance respectively.
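
A toy illustration of that factory role, again with simplified stand-in types (the real signature takes consensus.ConsenterSupport and *cb.Metadata):

package main

import "fmt"

// Simplified stand-ins for consensus.Chain and consensus.ConsenterSupport.
type Chain interface {
	Start()
	Halt()
}

type ConsenterSupport interface{ ChainID() string }

// Consenter mirrors the one-method factory contract.
type Consenter interface {
	HandleChain(support ConsenterSupport, metadata interface{}) (Chain, error)
}

type noopChain struct{ id string }

func (c *noopChain) Start() { fmt.Println("consensus started for", c.id) }
func (c *noopChain) Halt()  {}

// soloLike plays the plugin role: its only job is building the
// per-channel consensus instance.
type soloLike struct{}

func (soloLike) HandleChain(support ConsenterSupport, _ interface{}) (Chain, error) {
	return &noopChain{id: support.ChainID()}, nil
}

type staticSupport string

func (s staticSupport) ChainID() string { return string(s) }

func main() {
	// The Registrar keeps one Consenter per consensus type and picks by name,
	// much like the consenters map seen earlier.
	consenters := map[string]Consenter{"solo": soloLike{}}
	chain, _ := consenters["solo"].HandleChain(staticSupport("mychannel"), nil)
	chain.Start()
}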

ConsenterSupport

type ConsenterSupport interface {
	crypto.LocalSigner
	msgprocessor.Processor

	// VerifyBlockSignature verifies a signature of a block with a given optional
	// configuration (can be nil).
	VerifyBlockSignature([]*cb.SignedData, *cb.ConfigEnvelope) error

	// BlockCutter returns the block cutting helper for this channel.
	// Cuts messages into blocks
	BlockCutter() blockcutter.Receiver

	// SharedConfig provides the shared config from the channel's current config block.
	// The current chain's Orderer config
	SharedConfig() channelconfig.Orderer

	// ChannelConfig provides the channel config from the channel's current config block.
	// The current chain's channel config
	ChannelConfig() channelconfig.Channel

	// CreateNextBlock takes a list of messages and creates the next block based on the block with highest block number committed to the ledger
	// Note that either WriteBlock or WriteConfigBlock must be called before invoking this method a second time.
	// Creates a block
	CreateNextBlock(messages []*cb.Envelope) *cb.Block

	// Block returns a block with the given number,
	// or nil if such a block doesn't exist.
	// Reads a block
	Block(number uint64) *cb.Block

	// WriteBlock commits a block to the ledger.
	// Writes a block
	WriteBlock(block *cb.Block, encodedMetadataValue []byte)

	// WriteConfigBlock commits a block to the ledger, and applies the config update inside.
	// Writes a block and applies the config update
	WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)

	// Sequence returns the current config sequence.
	Sequence() uint64

	// ChainID returns the channel ID this support is associated with.
	ChainID() string

	// Height returns the number of blocks in the chain this channel is associated with.
	Height() uint64

	// Append appends a new block to the ledger in its raw form,
	// unlike WriteBlock that also mutates its metadata.
	// Appends a block in raw form, without mutating metadata the way WriteBlock does
	Append(block *cb.Block) error
}

Summary

  • Registrar ties everything together, chiefly ChainSupport and Consenter; the Consenter is pluggable
  • ChainSupport represents a chain and points to that chain's consensus instance, which is created by the Consenter of the matching consensus type
  • A consensus instance uses ConsenterSupport to access resources outside the consensus itself

Orderer main modules

orderer
├── README.md
├── common
│   ├── blockcutter   buffers pending transactions; cuts blocks
│   ├── bootstrap     replaces the channel genesis block at startup
│   ├── broadcast     the orderer's Broadcast interface
│   ├── cluster       (Raft) cluster service
│   ├── localconfig   parses the orderer config file orderer.yaml
│   ├── metadata      fills in block metadata
│   ├── msgprocessor  transaction checking
│   ├── multichannel  multi-channel support: Registrar, chainSupport, block writing
│   └── server        the Orderer node's server program
├── consensus         consensus plugins
│   ├── consensus.go  interfaces a consensus plugin must implement, etc.
│   ├── etcdraft      Raft consensus plugin
│   ├── inactive      Raft while not yet active
│   ├── kafka         Kafka consensus plugin
│   ├── mocks         mock consensus plugin for tests
│   └── solo          Solo consensus plugin
├── main.go           orderer program entry point
├── mocks
│   ├── common
│   └── util
└── sample_clients    sample orderer client programs
    ├── broadcast_config
    ├── broadcast_msg
    └── deliver_stdout

Orderer startup process

Steps

  • Load the config file
  • Set up the logger
  • Set up the local MSP
  • Core startup:
    • Load the genesis block
    • Create the ledger factory
    • Create the local gRPC server
    • If consensus needs a cluster (Raft), create the cluster gRPC server
    • Create the Registrar: wire up the consensus plugins and start every channel; if consensus is Raft, also register the cluster gRPC handler Step
    • Create the local server: the atomic broadcast service combining the Broadcast handler, the Deliver handler, and the registrar
    • Enable profiling
    • Start the cluster gRPC service
    • Start the local gRPC service

Startup entry point: Main

github.com/hyperledger/fabric/orderer/common/server/main.go

// Main is the entry point of orderer process
func Main() {
	fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

	// "version" command
	if fullCmd == version.FullCommand() {
		fmt.Println(metadata.GetVersionInfo())
		return
	}

	// Read config from the local config file and environment variables, building the config tree
	conf, err := localconfig.Load()
	if err != nil {
		logger.Error("failed to parse config: ", err)
		os.Exit(1)
	}
	// Configure the log level
	initializeLogging()
	// Set up the local MSP structures
	initializeLocalMsp(conf)

	prettyPrintStruct(conf)
	// Do the core startup work
	Start(fullCmd, conf)
}

Start

github.com/hyperledger/fabric/orderer/common/server/main.go

// Start provides a layer of abstraction for benchmark test
func Start(cmd string, conf *localconfig.TopLevel) {
	// Load/create the bootstrap (genesis) block
	bootstrapBlock := extractBootstrapBlock(conf)
	if err := ValidateBootstrapBlock(bootstrapBlock); err != nil {
		logger.Panicf("Failed validating bootstrap block: %v", err)
	}

	opsSystem := newOperationsSystem(conf.Operations, conf.Metrics)
	err := opsSystem.Start()
	if err != nil {
		logger.Panicf("failed to initialize operations subsystem: %s", err)
	}
	defer opsSystem.Stop()
	metricsProvider := opsSystem.Provider

	// Create the ledger factory
	lf, _ := createLedgerFactory(conf, metricsProvider)
	sysChanLastConfigBlock := extractSysChanLastConfig(lf, bootstrapBlock)
	clusterBootBlock := selectClusterBootBlock(bootstrapBlock, sysChanLastConfigBlock)

	// Initialize the signer
	signer := localmsp.NewSigner()

	logObserver := floggingmetrics.NewObserver(metricsProvider)
	flogging.Global.SetObserver(logObserver)

	serverConfig := initializeServerConfig(conf, metricsProvider)
	// Create the local gRPC server
	grpcServer := initializeGrpcServer(conf, serverConfig)
	caSupport := &comm.CredentialSupport{
		AppRootCAsByChain:           make(map[string]comm.CertificateBundle),
		OrdererRootCAsByChainAndOrg: make(comm.OrgRootCAs),
		ClientRootCAs:               serverConfig.SecOpts.ClientRootCAs,
	}

	var r *replicationInitiator
	clusterServerConfig := serverConfig
	clusterGRPCServer := grpcServer // by default, cluster shares the same grpc server
	clusterClientConfig := comm.ClientConfig{SecOpts: &comm.SecureOptions{}, KaOpts: &comm.KeepaliveOptions{}}
	var clusterDialer *cluster.PredicateDialer

	var reuseGrpcListener bool
	typ := consensusType(bootstrapBlock)
	var serversToUpdate []*comm.GRPCServer

	clusterType := isClusterType(clusterBootBlock)
	// If consensus needs a cluster (Raft), create the cluster gRPC server
	if clusterType {
		logger.Infof("Setting up cluster for orderer type %s", typ)

		clusterClientConfig = initializeClusterClientConfig(conf)
		clusterDialer = &cluster.PredicateDialer{
			ClientConfig: clusterClientConfig,
		}

		r = createReplicator(lf, bootstrapBlock, conf, clusterClientConfig.SecOpts, signer)
		// Only clusters that are equipped with a recent config block can replicate.
		if conf.General.GenesisMethod == "file" {
			r.replicateIfNeeded(bootstrapBlock)
		}

		if reuseGrpcListener = reuseListener(conf, typ); !reuseGrpcListener {
			clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, ioutil.ReadFile)
		}

		// If we have a separate gRPC server for the cluster,
		// we need to update its TLS CA certificate pool.
		serversToUpdate = append(serversToUpdate, clusterGRPCServer)
	}

	// if cluster is reusing client-facing server, then it is already
	// appended to serversToUpdate at this point.
	if grpcServer.MutualTLSRequired() && !reuseGrpcListener {
		serversToUpdate = append(serversToUpdate, grpcServer)
	}

	tlsCallback := func(bundle *channelconfig.Bundle) {
		logger.Debug("Executing callback to update root CAs")
		updateTrustedRoots(caSupport, bundle, serversToUpdate...)
		if clusterType {
			updateClusterDialer(caSupport, clusterDialer, clusterClientConfig.SecOpts.ServerRootCAs)
		}
	}

	sigHdr, err := signer.NewSignatureHeader()
	if err != nil {
		logger.Panicf("Failed creating a signature header: %v", err)
	}

	expirationLogger := flogging.MustGetLogger("certmonitor")
	crypto.TrackExpiration(
		serverConfig.SecOpts.UseTLS,
		serverConfig.SecOpts.Certificate,
		[][]byte{clusterClientConfig.SecOpts.Certificate},
		sigHdr.Creator,
		expirationLogger.Warnf, // This can be used to piggyback a metric event in the future
		time.Now(),
		time.AfterFunc)

	// Initialize the Registrar
	manager := initializeMultichannelRegistrar(clusterBootBlock, r, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
	mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
	expiration := conf.General.Authentication.NoExpirationChecks
	// Initialize the gRPC server struct,
	// which serves the Deliver() and Broadcast() gRPC calls
	server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS, expiration)

	logger.Infof("Starting %s", metadata.GetVersionInfo())
	go handleSignals(addPlatformSignals(map[os.Signal]func(){
		syscall.SIGTERM: func() {
			grpcServer.Stop()
			if clusterGRPCServer != grpcServer {
				clusterGRPCServer.Stop()
			}
		},
	}))

	if !reuseGrpcListener && clusterType {
		logger.Info("Starting cluster listener on", clusterGRPCServer.Address())
		go clusterGRPCServer.Start()
	}
	// Enable the profiling service
	initializeProfilingService(conf)
	// Register the gRPC service and start serving
	ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
	logger.Info("Beginning to serve requests")
	grpcServer.Start()
}

extractSysChanLastConfig

github.com/hyperledger/fabric/orderer/common/server/main.go

// Extract system channel last config block
func extractSysChanLastConfig(lf blockledger.Factory, bootstrapBlock *cb.Block) *cb.Block {
	// Are we bootstrapping?
	chainCount := len(lf.ChainIDs())
	// On first startup there are no channels yet; the system channel's local ledger structure gets created by default
	if chainCount == 0 {
		logger.Info("Bootstrapping because no existing channels")
		return nil
	}
    ...
}

initializeMultichannelRegistrar

github.com/hyperledger/fabric/orderer/common/server/main.go

func initializeMultichannelRegistrar(
	bootstrapBlock *cb.Block,
	ri *replicationInitiator,
	clusterDialer *cluster.PredicateDialer,
	srvConf comm.ServerConfig,
	srv *comm.GRPCServer,
	conf *localconfig.TopLevel,
	signer crypto.LocalSigner,
	metricsProvider metrics.Provider,
	healthChecker healthChecker,
	lf blockledger.Factory,
	callbacks ...channelconfig.BundleActor,
) *multichannel.Registrar {
	genesisBlock := extractBootstrapBlock(conf)
	// Are we bootstrapping?
	if len(lf.ChainIDs()) == 0 {
		initializeBootstrapChannel(genesisBlock, lf)
	} else {
		logger.Info("Not bootstrapping because of existing channels")
	}

	consenters := make(map[string]consensus.Consenter)

	// Create the Registrar
	registrar := multichannel.NewRegistrar(*conf, lf, signer, metricsProvider, callbacks...)

	var icr etcdraft.InactiveChainRegistry
	if isClusterType(bootstrapBlock) {
		etcdConsenter := initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
		icr = etcdConsenter.InactiveChainRegistry
	}

	// Register the consensus plugins
	consenters["solo"] = solo.New()
	var kafkaMetrics *kafka.Metrics
	consenters["kafka"], kafkaMetrics = kafka.New(conf.Kafka, metricsProvider, healthChecker, icr, registrar.CreateChain)
	// Note, we pass a 'nil' channel here, we could pass a channel that
	// closes if we wished to cleanup this routine on exit.
	go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)

	registrar.Initialize(consenters)
	return registrar
}

Initialize

github.com/hyperledger/fabric/orderer/common/multichannel/registrar.go

// Start consensus
func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
	r.consenters = consenters
	existingChains := r.ledgerFactory.ChainIDs()

	// Start the consensus process for every local ledger
	for _, chainID := range existingChains {
		rl, err := r.ledgerFactory.GetOrCreate(chainID)
		if err != nil {
			logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
		}
		configTx := configTx(rl)
		if configTx == nil {
			logger.Panic("Programming error, configTx should never be nil here")
		}
		ledgerResources := r.newLedgerResources(configTx)
		chainID := ledgerResources.ConfigtxValidator().ChainID()

		// If this is the system ledger (created automatically on first startup by default),
		// start its consensus process
		if _, ok := ledgerResources.ConsortiumsConfig(); ok {
			if r.systemChannelID != "" {
				logger.Panicf("There appear to be two system chains %s and %s", r.systemChannelID, chainID)
			}

			chain := newChainSupport(
				r,
				ledgerResources,
				r.consenters,
				r.signer,
				r.blockcutterMetrics,
			)
			r.templator = msgprocessor.NewDefaultTemplator(chain)
			chain.Processor = msgprocessor.NewSystemChannel(chain, r.templator, msgprocessor.CreateSystemChannelFilters(r, chain, r.config))

			// Retrieve genesis block to log its hash. See FAB-5450 for the purpose
			iter, pos := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}})
			defer iter.Close()
			if pos != uint64(0) {
				logger.Panicf("Error iterating over system channel: '%s', expected position 0, got %d", chainID, pos)
			}
			genesisBlock, status := iter.Next()
			if status != cb.Status_SUCCESS {
				logger.Panicf("Error reading genesis block of system channel '%s'", chainID)
			}
			logger.Infof("Starting system channel '%s' with genesis block hash %x and orderer type %s",
				chainID, genesisBlock.Header.Hash(), chain.SharedConfig().ConsensusType())

			r.chains[chainID] = chain
			r.systemChannelID = chainID
			r.systemChannel = chain
			// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
			// Start the consensus process
			defer chain.start()
		} else { // application ledger
			logger.Debugf("Starting chain: %s", chainID)
			chain := newChainSupport(
				r,
				ledgerResources,
				r.consenters,
				r.signer,
				r.blockcutterMetrics,
			)
			r.chains[chainID] = chain
			chain.start()
		}

	}

	if r.systemChannelID == "" {
		logger.Panicf("No system chain found.  If bootstrapping, does your system channel contain a consortiums group definition?")
	}
}

chain.start()

In 1.0 it was like this, run as a goroutine:

func (chain *chainImpl) Start() {
    go startThread(chain)
}

In 1.4 the handling appears to differ per consensus plugin.

// TODO: figure out how the pluggability actually works

// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
	c.logger.Infof("Starting Raft node")

	if err := c.configureComm(); err != nil {
		c.logger.Errorf("Failed to start chain, aborting: +%v", err)
		close(c.doneC)
		return
	}

	isJoin := c.support.Height() > 1
	if isJoin && c.opts.MigrationInit {
		isJoin = false
		c.logger.Infof("Consensus-type migration detected, starting new raft node on an existing channel; height=%d", c.support.Height())
	}
	c.Node.start(c.fresh, isJoin)

	close(c.startC)
	close(c.errorC)

	go c.gc()
	go c.serveRequest()

	es := c.newEvictionSuspector()

	interval := DefaultLeaderlessCheckInterval
	if c.opts.LeaderCheckInterval != 0 {
		interval = c.opts.LeaderCheckInterval
	}

	c.periodicChecker = &PeriodicCheck{
		Logger:        c.logger,
		Report:        es.confirmSuspicion,
		CheckInterval: interval,
		Condition:     c.suspectEviction,
	}
	c.periodicChecker.Run()
}

Summary

Startup splits into two parts: initializing the configuration, and the core startup work.

Orderer overall architecture

Clients submit endorsed transactions to the Orderer through the Broadcast interface; clients (cli, peer) subscribe to block events through the Deliver interface and fetch blocks from the Orderer.

Architecture diagram

Multi-channel

Fabric supports multiple channels, and the Orderer is a core part of that feature. Multi-channel support is built from Registrar, ChainSupport, BlockWriter, and a few other important parts:

  • Registrar aggregates the resources of all channels; every access to a channel goes through it
  • ChainSupport represents a single channel and bundles all of that channel's resources
  • BlockWriter is the interface the Orderer uses to write a block to the ledger once consensus on it is reached

Consensus plugins

Fabric's consensus is pluggable: the consensus interface the Orderer uses is abstracted out, so any consensus plugin that satisfies the interface can work with the Fabric Orderer.

gRPC communication

  • Broadcast: receives transactions submitted by clients for ordering
  • Deliver: clients use it to receive consensus-agreed blocks from the Orderer node

Local Config

Parses the orderer node's config file: orderer.yaml

// Load parses the orderer YAML file and environment, producing
// a struct suitable for config use, returning error on failure.
func Load() (*TopLevel, error) {
	config := viper.New()
	coreconfig.InitViper(config, "orderer")
	...
}

Local config covers settings that do not need to be uniform across nodes (a usage sketch follows these lists):

  • Network-related settings
  • Ledger type and location
  • Raft file locations
  • ...

Channel config (configured on-chain):

  • Consensus type
  • Block size
  • Block-cutting timeout
  • Transactions per block
  • Per-consensus settings
  • ...
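
A minimal sketch of consuming the local config, assuming Fabric 1.4's import path and TopLevel field names (General.ListenAddress, General.ListenPort, FileLedger.Location):

package main

import (
	"fmt"

	"github.com/hyperledger/fabric/orderer/common/localconfig"
)

func main() {
	// Load reads orderer.yaml plus ORDERER_-prefixed environment overrides.
	conf, err := localconfig.Load()
	if err != nil {
		panic(err)
	}
	// Field names here are assumed from Fabric 1.4's TopLevel struct.
	fmt.Println("listen:", conf.General.ListenAddress, conf.General.ListenPort)
	fmt.Println("ledger dir:", conf.FileLedger.Location)
}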

Metadata

// TODO: not yet clear what this is for; blocks can still be modified after generation, and the point of that is unclear.

MsgProcessor

After receiving a transaction the orderer runs a number of checks on it. Different channels can be configured with different MsgProcessors, and therefore with different checks.

There are currently two Processors:

  • StandardChannel for application channels
  • SystemChannel for the system channel
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
	oc, ok := s.support.OrdererConfig()
	if !ok {
		logger.Panicf("Missing orderer config")
	}
	if oc.Capabilities().ConsensusTypeMigration() {
		if oc.ConsensusState() != orderer.ConsensusType_STATE_NORMAL {
			return 0, errors.WithMessage(
				ErrMaintenanceMode, "normal transactions are rejected")
		}
	}

	configSeq = s.support.Sequence()
	err = s.filters.Apply(env)
	return
}

This is where the various validation steps run:

	err = s.filters.Apply(env)
func (rs *RuleSet) Apply(message *ab.Envelope) error {
	for _, rule := range rs.rules {
		err := rule.Apply(message)
		if err != nil {
			return err
		}
	}
	return nil
}
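
The Rule contract is small enough to show with a toy rule. This sketch uses a simplified Envelope stand-in, but the chaining has the same shape as RuleSet.Apply above; Fabric's own EmptyRejectRule does a similar empty-payload check:

package main

import (
	"errors"
	"fmt"
)

// Envelope stands in for *cb.Envelope.
type Envelope struct{ Payload []byte }

// Rule mirrors the msgprocessor filter contract: one check per rule.
type Rule interface {
	Apply(*Envelope) error
}

// RuleSet chains rules exactly as shown above: the first failure wins.
type RuleSet struct{ rules []Rule }

func (rs *RuleSet) Apply(msg *Envelope) error {
	for _, rule := range rs.rules {
		if err := rule.Apply(msg); err != nil {
			return err
		}
	}
	return nil
}

// emptyReject mimics the spirit of Fabric's EmptyRejectRule:
// refuse envelopes with no payload.
type emptyReject struct{}

func (emptyReject) Apply(msg *Envelope) error {
	if len(msg.Payload) == 0 {
		return errors.New("empty payload rejected")
	}
	return nil
}

func main() {
	rs := &RuleSet{rules: []Rule{emptyReject{}}}
	fmt.Println(rs.Apply(&Envelope{}))                      // empty payload rejected
	fmt.Println(rs.Apply(&Envelope{Payload: []byte("tx")})) // <nil>
}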

BlockCutter

BlockCutter groups incoming transactions; each group gets packed into one block. The grouping is the "cutting"; each group is called a Batch, and a buffer holds the transactions waiting to be cut.

type receiver struct {
	// If the buffer receives a config transaction, it must go into its own block;
	// any transactions already buffered are first cut into a block of their own
	sharedConfigFetcher   OrdererConfigFetcher
	// Buffered transactions; a cut happens when the count reaches the per-block
	// transaction limit (default 500)
	pendingBatch          []*cb.Envelope
	// Total size of buffered transactions; a cut happens at the block size limit (default 10MB)
	pendingBatchSizeBytes uint32
	// How long the buffer has held transactions without producing a block; a cut
	// happens at the batch timeout (default 2s)
	PendingBatchStartTime time.Time
	ChannelID             string
	Metrics               *Metrics
}

This component is really just a packing machine and does no reordering of its own: whichever transaction enters the BlockCutter's buffer first comes out first. A minimal sketch of the cutting rule follows.
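
A minimal sketch of the count-based cutting rule, with simplified types; the real receiver also enforces the byte-size limits and relies on an external timer for the timeout cut:

package main

import "fmt"

// Envelope stands in for *cb.Envelope.
type Envelope struct{ Payload []byte }

type receiver struct {
	maxMessageCount int
	pendingBatch    []*Envelope
}

// Ordered appends msg to the buffer and cuts a batch once the count limit
// is reached, mirroring the count rule of the real Ordered method.
func (r *receiver) Ordered(msg *Envelope) (batches [][]*Envelope, pending bool) {
	r.pendingBatch = append(r.pendingBatch, msg)
	if len(r.pendingBatch) >= r.maxMessageCount {
		batches = append(batches, r.pendingBatch)
		r.pendingBatch = nil
	}
	return batches, len(r.pendingBatch) > 0
}

func main() {
	r := &receiver{maxMessageCount: 2}
	for i := 1; i <= 3; i++ {
		batches, pending := r.Ordered(&Envelope{Payload: []byte{byte(i)}})
		fmt.Printf("tx%d -> cut %d batch(es), pending=%v\n", i, len(batches), pending)
	}
}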

BlockWriter

Writes blocks.
This part covers block creation, writing, and flushing to disk; a small sketch of the header chaining follows.
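
A small sketch of the create-and-chain step, with simplified types and hashing (the real code hashes the ASN.1-encoded header; this is an illustration of the linkage, not Fabric's hashing scheme):

package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

type BlockHeader struct {
	Number       uint64
	PreviousHash []byte
	DataHash     []byte
}

type Block struct {
	Header *BlockHeader
	Data   [][]byte
}

// hashHeader condenses the header into a digest; the real code hashes the
// ASN.1-encoded header instead.
func hashHeader(h *BlockHeader) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, h.Number)
	sum := sha256.Sum256(append(append(buf, h.PreviousHash...), h.DataHash...))
	return sum[:]
}

// nextBlock sketches the create-next-block step: the new header gets number
// prev+1, links to the previous header's hash, and carries the batch as the body.
func nextBlock(prev *BlockHeader, batch [][]byte) *Block {
	h := sha256.New()
	for _, tx := range batch {
		h.Write(tx)
	}
	return &Block{
		Header: &BlockHeader{
			Number:       prev.Number + 1,
			PreviousHash: hashHeader(prev),
			DataHash:     h.Sum(nil),
		},
		Data: batch,
	}
}

func main() {
	genesis := &BlockHeader{Number: 0}
	b1 := nextBlock(genesis, [][]byte{[]byte("tx1"), []byte("tx2")})
	fmt.Printf("block %d prevHash=%x\n", b1.Header.Number, b1.Header.PreviousHash[:8])
}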

Summary

The Orderer's responsibilities and its components.

How the Orderer processes transactions

The structure of received messages

// Envelope wraps a Payload with a signature so that the message may be authenticated
type Envelope struct {
	// A marshaled Payload
	// The message body
	Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
	// A signature by the creator specified in the Payload header
	// The sender's signature
	Signature            []byte   `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
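
To see the nesting this structure implies, here is a sketch that builds and then peels an Envelope down to its ChannelHeader, assuming Fabric 1.4's protos/common package:

package main

import (
	"errors"
	"fmt"

	"github.com/golang/protobuf/proto"
	cb "github.com/hyperledger/fabric/protos/common"
)

// peel walks Envelope.Payload -> Payload.Header.ChannelHeader -> ChannelHeader,
// the same unwrapping the Broadcast handler performs.
func peel(env *cb.Envelope) (*cb.ChannelHeader, error) {
	payload := &cb.Payload{}
	if err := proto.Unmarshal(env.Payload, payload); err != nil {
		return nil, err
	}
	if payload.Header == nil {
		return nil, errors.New("header not set")
	}
	chdr := &cb.ChannelHeader{}
	if err := proto.Unmarshal(payload.Header.ChannelHeader, chdr); err != nil {
		return nil, err
	}
	return chdr, nil
}

func main() {
	chdrBytes, _ := proto.Marshal(&cb.ChannelHeader{
		ChannelId: "mychannel",
		Type:      int32(cb.HeaderType_ENDORSER_TRANSACTION),
	})
	payloadBytes, _ := proto.Marshal(&cb.Payload{Header: &cb.Header{ChannelHeader: chdrBytes}})
	env := &cb.Envelope{Payload: payloadBytes}

	chdr, err := peel(env)
	fmt.Println(chdr.GetChannelId(), cb.HeaderType_name[chdr.GetType()], err)
}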

Flow of a normal transaction through the Orderer

  • The Orderer's Broadcast interface receives a transaction submitted by a client, looks up the resources of the chain the transaction belongs to, runs the initial checks, hands the transaction to that chain's consensus for ordering, and finally sends a response to the client.
// broadcast.go

// Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	// Get the sender's address
	addr := util.ExtractRemoteAddress(srv.Context())
	logger.Debugf("Starting new broadcast loop for %s", addr)
	for {
		// Loop waiting for requests; this is where the Broadcast interface receives client-submitted transactions
		msg, err := srv.Recv()
		if err == io.EOF {
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		}
		if err != nil {
			logger.Warningf("Error reading from %s: %s", addr, err)
			return err
		}

		// Validate and process the message
		resp := bh.ProcessMessage(msg, addr)
		err = srv.Send(resp)
		if resp.Status != cb.Status_SUCCESS {
			return err
		}

		if err != nil {
			logger.Warningf("Error sending to %s: %s", addr, err)
			return err
		}
	}

}

Flow of a config transaction through the Orderer

Basically the same as above.

Receiving and broadcasting blocks between nodes

Core process

With Kafka as the consensus plugin:

  • The client sends transactions over gRPC to the orderer node's Broadcast() interface
  • The Orderer receives the request, extracts and parses the message, checks it, and once it passes validation wraps it and produces it to Kafka
  • The Orderer meanwhile keeps pulling the ordered messages back from Kafka and packs them into blocks (a toy sketch of this pipeline follows the list)
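
A toy sketch of that produce/order/consume pipeline, with a buffered Go channel standing in for the Kafka partition. Illustration only; the real path goes through sarama producers and consumers:

package main

import "fmt"

func main() {
	topic := make(chan string, 16) // stands in for the Kafka partition
	done := make(chan struct{})

	// Consumer side: pull messages back in order and cut a "block"
	// every 2 transactions.
	go func() {
		defer close(done)
		var batch []string
		for tx := range topic {
			batch = append(batch, tx)
			if len(batch) == 2 {
				fmt.Println("cut block:", batch)
				batch = nil
			}
		}
		if len(batch) > 0 {
			fmt.Println("cut final block:", batch)
		}
	}()

	// Producer side: Broadcast enqueues validated transactions.
	for _, tx := range []string{"tx1", "tx2", "tx3"} {
		topic <- tx
	}
	close(topic)
	<-done
}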

Broadcast

Flow chart


Broadcast means a client sends request messages (e.g. endorsed transactions) to the ordering service over gRPC. The Orderer validates them locally, then hands them off to the consensus module.

The main messages the Orderer receives: chaincode instantiation and invocation; channel creation and update.

Code flow

Parsing the message
  • Messages from clients are first handled by server
// Broadcast receives a stream of messages from a client for ordering
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
	logger.Debugf("Starting new Broadcast handler")
	defer func() {
		if r := recover(); r != nil {
			logger.Criticalf("Broadcast client triggered panic: %s\n%s", r, debug.Stack())
		}
		logger.Debugf("Closing Broadcast stream")
	}()
	return s.bh.Handle(&broadcastMsgTracer{
		AtomicBroadcast_BroadcastServer: srv,
		msgTracer: msgTracer{
			debug:    s.debug,
			function: "Broadcast",
		},
	})
}

The corresponding gRPC service is the Broadcast stream's Recv:

type AtomicBroadcast_BroadcastServer interface {
	Send(*BroadcastResponse) error
	Recv() (*common.Envelope, error)
	grpc.ServerStream
}

Messages from the client to the orderer are wrapped in an Envelope:

type Envelope struct {
	// A marshaled Payload
	Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
	// A signature by the creator specified in the Payload header
	Signature            []byte   `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

At this point the orderer's gRPC receiving is done; next come parsing, validation, and ordering of the message, followed by a response back to the client.

The processing steps:

  • broadcast.Handler()
    • Parse the message: determine whether it is a config message, which decides which channel structure should process it; note that a create-application-channel message is assigned to the system channel structure
    • Process the message: the chosen channel structure processes it (normal or config message)
    • Return a response to the requester
// Handler is designed to handle connections from Broadcast AB gRPC service
type Handler struct {
	SupportRegistrar ChannelSupportRegistrar
	Metrics          *Metrics
}

// Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	// Get the sender's address
	addr := util.ExtractRemoteAddress(srv.Context())
	logger.Debugf("Starting new broadcast loop for %s", addr)
	for {
		// Loop waiting for requests; the Broadcast interface receives client-submitted transactions here
		msg, err := srv.Recv()
		if err == io.EOF {
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		}
		if err != nil {
			logger.Warningf("Error reading from %s: %s", addr, err)
			return err
		}

		// Validate and process the received message
		resp := bh.ProcessMessage(msg, addr)
		// Send the response back to the client
		err = srv.Send(resp)
		if resp.Status != cb.Status_SUCCESS {
			return err
		}

		if err != nil {
			logger.Warningf("Error sending to %s: %s", addr, err)
			return err
		}
	}

}
  • Registrar.BroadcastChannelSupport()

    • The channel header is parsed out of the message envelope
    • Whether this is a config message is decided by the channel type in the header (is it cb.HeaderType_CONFIG_UPDATE?)
    • The matching ChainSupport structure (application channel or system channel) is looked up in the map to serve as the processor
// BroadcastChannelSupport returns the message channel header, whether the message is a config update
// and the channel resources for a message or an error if the message is not a message which can
// be processed directly (like CONFIG and ORDERER_TRANSACTION messages)
func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
	// Parse the channel header out of the envelope; whether this is a config message is decided by the channel type in the header (cb.HeaderType_CONFIG_UPDATE)
	chdr, err := utils.ChannelHeader(msg)
	if err != nil {
		return nil, false, nil, fmt.Errorf("could not determine channel ID: %s", err)
	}

	// Look up the channel
	cs := r.GetChain(chdr.ChannelId)
	// New channel creation
	// If nil, default to the system channel; e.g. for a create-channel request the Orderer has no local structure for that channel yet, so this is nil.
	if cs == nil {
		cs = r.systemChannel
	}

	isConfig := false
	switch cs.ClassifyMsg(chdr) {
	// Only CONFIG_UPDATE returns ConfigUpdateMsg
	case msgprocessor.ConfigUpdateMsg:
		isConfig = true
	case msgprocessor.ConfigMsg:
		return chdr, false, nil, errors.New("message is of type that cannot be processed directly")
	default:
	}

	return chdr, isConfig, cs, nil
}
  • commonutils.ChannelHeader()
// ChannelHeader returns the *cb.ChannelHeader for a given *cb.Envelope.
func ChannelHeader(env *cb.Envelope) (*cb.ChannelHeader, error) {
    // Unmarshal the payload
	envPayload, err := UnmarshalPayload(env.Payload)
	if err != nil {
		return nil, err
	}

	if envPayload.Header == nil {
		return nil, errors.New("header not set")
	}

	if envPayload.Header.ChannelHeader == nil {
		return nil, errors.New("channel header not set")
	}
    // Unmarshal the actual channel header
	chdr, err := UnmarshalChannelHeader(envPayload.Header.ChannelHeader)
	if err != nil {
		return nil, errors.WithMessage(err, "error unmarshaling channel header")
	}

	return chdr, nil
}
Based on the parse result, the transaction is processed.

Normal transactions:
// ProcessMessage validates and enqueues a single message
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
        ...
        // message check
		configSeq, err := processor.ProcessNormalMsg(msg)

        // This branches on the message type sent by the client or peer, classified earlier;
        // a create-channel message, for instance, is a config message
        if !isConfig {
        ...
        // consensus ordering
		err = processor.Order(msg, configSeq)
        ...
  • Message check
// ProcessNormalMsg will check the validity of a message based on the current configuration.  It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
	oc, ok := s.support.OrdererConfig()
	if !ok {
		logger.Panicf("Missing orderer config")
	}
	if oc.Capabilities().ConsensusTypeMigration() {
		if oc.ConsensusState() != orderer.ConsensusType_STATE_NORMAL {
			return 0, errors.WithMessage(
				ErrMaintenanceMode, "normal transactions are rejected")
		}
	}
	// Get the config sequence number; maps to the corresponding method of the configManager struct in the common.configtx package
	configSeq = s.support.Sequence()
	// Run the filter checks, implemented by the RuleSet struct in the orderer.common.msgprocessor package.
	err = s.filters.Apply(env)
	return
}

The filters are initialized when the ChainSupport structure is created:

Application channels, in the orderer.common.msgprocessor package:

CreateStandardChannelFilters(filterSupport channelconfig.Resources) *RuleSet

System channel, in the orderer.common.msgprocessor package:

 CreateSystemChannelFilters(chainCreator ChainCreator, ledgerResources channelconfig.Resources) *RuleSet
  • Consensus ordering

The concrete logic differs per consensus plugin.
// TODO: to be filled in

Config transactions:
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) // merge the config update, producing a new config envelope
processor.Configure(config, configSeq) // enqueue: hand the generated config envelope to the backend queue (e.g. Kafka)
  • standardchannel.ProcessConfigUpdateMsg()
// orderer/common/msgprocessor/standardchannel.go
func (s *StandardChannel) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
    logger.Debugf("Processing config update message for channel %s", s.support.ChainID())

    seq := s.support.Sequence() // get the current config sequence number
    err = s.filters.Apply(env) // check permissions: may the config be updated?
    if err != nil {
        return nil, 0, err
    }

    // Build a config envelope from the submitted config update transaction: Config is the updated
    // config map; LastUpdate is the submitted update.
    // Ultimately calls the ValidatorImpl.ProposeConfigUpdate() method in the common/configtx package.
    configEnvelope, err := s.support.ProposeConfigUpdate(env)
    if err != nil {
        return nil, 0, err
    }

    // Build a signed config envelope with channel header type HeaderType_CONFIG; i.e. after ordering, the message type changes from CONFIG_UPDATE to CONFIG
    config, err = utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, s.support.ChainID(), s.support.Signer(), configEnvelope, msgVersion, epoch)
    if err != nil {
        return nil, 0, err
    }

    err = s.filters.Apply(config) // validate that the generated config message is legal
    if err != nil {
        return nil, 0, err
    }

    return config, seq, nil
}
  • systemchannel.ProcessConfigUpdateMsg()

For the system channel, besides delegating ordinary config update transactions to the standard channel's method, this also handles requests to create new application channels.

func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
    // First get the channel ID from the message
    channelID, err := utils.ChannelID(envConfigUpdate)
    
    // If the target channel ID is this channel's own ID, it is an ordinary config update: delegate to StandardChannel's ProcessConfigUpdateMsg()
    if channelID == s.support.ChainID() { 
        return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
    }

    // Otherwise get the latest config from the system channel; channel creation, for example, goes through here
    // orderer/common/msgprocessor/systemchannel.go#DefaultTemplator.NewChannelConfig()
    bundle, err := s.templator.NewChannelConfig(envConfigUpdate)

    // Merge the client's config update envelope to create the ConfigEnvelope
    newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)

    // Wrap a new signed envelope whose Payload.Data is newChannelConfigEnv
    newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)

    // Handle the new-application-channel request: wrap it as an ORDERER_TRANSACTION message
    wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)

    s.StandardChannel.filters.Apply(wrappedOrdererTransaction) // validate the config again

    // Return the wrapped signed envelope
    return wrappedOrdererTransaction, s.support.Sequence(), nil
}
  • DefaultTemplator.NewChannelConfig()
// NewChannelConfig creates a new template channel configuration based on the current config in the ordering system channel.
func (dt *DefaultTemplator) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) {
	// Unmarshal the payload
	configUpdatePayload, err := utils.UnmarshalPayload(envConfigUpdate.Payload)
	if err != nil {
		return nil, fmt.Errorf("Failing initial channel config creation because of payload unmarshaling error: %s", err)
	}
	// Unmarshal the config update envelope
	configUpdateEnv, err := configtx.UnmarshalConfigUpdateEnvelope(configUpdatePayload.Data)
	if err != nil {
		return nil, fmt.Errorf("Failing initial channel config creation because of config update envelope unmarshaling error: %s", err)
	}

	if configUpdatePayload.Header == nil {
		return nil, fmt.Errorf("Failed initial channel config creation because config update header was missing")
	}

	// Get the channel header
	channelHeader, err := utils.UnmarshalChannelHeader(configUpdatePayload.Header.ChannelHeader)
	if err != nil {
		return nil, fmt.Errorf("Failed initial channel config creation because channel header was malformed: %s", err)
	}

	// Unmarshal the config update
	configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
	if err != nil {
		return nil, fmt.Errorf("Failing initial channel config creation because of config update unmarshaling error: %s", err)
	}

	if configUpdate.ChannelId != channelHeader.ChannelId {
		return nil, fmt.Errorf("Failing initial channel config creation: mismatched channel IDs: '%s' != '%s'", configUpdate.ChannelId, channelHeader.ChannelId)
	}

	if configUpdate.WriteSet == nil {
		return nil, fmt.Errorf("Config update has an empty writeset")
	}

	if configUpdate.WriteSet.Groups == nil || configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey] == nil {
		return nil, fmt.Errorf("Config update has missing application group")
	}

	if uv := configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Version; uv != 1 {
		return nil, fmt.Errorf("Config update for channel creation does not set application group version to 1, was %d", uv)
	}

	// Configure the channel according to the previously defined policies; see configtx.yaml for the concrete policies
	consortiumConfigValue, ok := configUpdate.WriteSet.Values[channelconfig.ConsortiumKey]
	if !ok {
		return nil, fmt.Errorf("Consortium config value missing")
	}

	consortium := &cb.Consortium{}
	err = proto.Unmarshal(consortiumConfigValue.Value, consortium)
	if err != nil {
		return nil, fmt.Errorf("Error reading unmarshaling consortium name: %s", err)
	}

	applicationGroup := cb.NewConfigGroup()
	consortiumsConfig, ok := dt.support.ConsortiumsConfig()
	if !ok {
		return nil, fmt.Errorf("The ordering system channel does not appear to support creating channels")
	}

	consortiumConf, ok := consortiumsConfig.Consortiums()[consortium.Name]
	if !ok {
		return nil, fmt.Errorf("Unknown consortium name: %s", consortium.Name)
	}

	applicationGroup.Policies[channelconfig.ChannelCreationPolicyKey] = &cb.ConfigPolicy{
		Policy: consortiumConf.ChannelCreationPolicy(),
	}
	applicationGroup.ModPolicy = channelconfig.ChannelCreationPolicyKey

	// Get the current system channel config
	// Get the current system channel config
	systemChannelGroup := dt.support.ConfigtxValidator().ConfigProto().ChannelGroup

	// If the consortium group has no members, allow the source request to have no members.  However,
	// if the consortium group has any members, there must be at least one member in the source request
	if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 &&
		len(configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups) == 0 {
		return nil, fmt.Errorf("Proposed configuration has no application group members, but consortium contains members")
	}

	// If the consortium has no members, allow the source request to contain arbitrary members
	// Otherwise, require that the supplied members are a subset of the consortium members
	if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 {
		for orgName := range configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups {
			consortiumGroup, ok := systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups[orgName]
			if !ok {
				return nil, fmt.Errorf("Attempted to include a member which is not in the consortium")
			}
			applicationGroup.Groups[orgName] = proto.Clone(consortiumGroup).(*cb.ConfigGroup)
		}
	}

	channelGroup := cb.NewConfigGroup()

	// Copy the system channel Channel level config to the new config
	// Copy the system channel's channel-level config values
	for key, value := range systemChannelGroup.Values {
		channelGroup.Values[key] = proto.Clone(value).(*cb.ConfigValue)
		if key == channelconfig.ConsortiumKey {
			// Do not set the consortium name, we do this later
			continue
		}
	}

	for key, policy := range systemChannelGroup.Policies {
		channelGroup.Policies[key] = proto.Clone(policy).(*cb.ConfigPolicy)
	}

	// Set the new config orderer group to the system channel orderer group and the application group to the new application group
	// The new config's Orderer group copies the system channel's; the application group defined above is assigned into the new config
	channelGroup.Groups[channelconfig.OrdererGroupKey] = proto.Clone(systemChannelGroup.Groups[channelconfig.OrdererGroupKey]).(*cb.ConfigGroup)
	channelGroup.Groups[channelconfig.ApplicationGroupKey] = applicationGroup
	channelGroup.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{
		Value:     utils.MarshalOrPanic(channelconfig.ConsortiumValue(consortium.Name).Value()),
		ModPolicy: channelconfig.AdminsPolicyKey,
	}

	// Non-backwards compatible bugfix introduced in v1.1
	// The capability check should be removed once v1.0 is deprecated
	if oc, ok := dt.support.OrdererConfig(); ok && oc.Capabilities().PredictableChannelTemplate() {
		channelGroup.ModPolicy = systemChannelGroup.ModPolicy
		zeroVersions(channelGroup)
	}

	// Pack the newly created config into a Bundle
	bundle, err := channelconfig.NewBundle(channelHeader.ChannelId, &cb.Config{
		ChannelGroup: channelGroup,
	})

	if err != nil {
		return nil, err
	}

	return bundle, nil
}
  • Consensus part
    // TODO: to be filled in

  • Return a success response; this is the status response to the client indicating the message was accepted

return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}

Deliver

The Deliver side sends the generated blocks to peers. It resembles a producer/consumer model: the orderer produces blocks and the peer consumes them.

The orderer-side logic lives mainly in deliverBlocks; part of it follows:

	for {
		if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
			if number > chain.Reader().Height()-1 {
				return cb.Status_NOT_FOUND, nil
			}
		}

		var block *cb.Block
		var status cb.Status

		iterCh := make(chan struct{})

		// The goroutine here seems intended to avoid unnecessary blocking: the select below can abort the wait
		go func() {
			block, status = cursor.Next()
			close(iterCh)
		}()

		select {
		case <-ctx.Done():
			logger.Debugf("Context canceled, aborting wait for next block")
			return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
		case <-erroredChan:
			// TODO, today, the only user of the errorChan is the orderer consensus implementations.  If the peer ever reports
			// this error, we will need to update this error message, possibly finding a way to signal what error text to return.
			logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
			return cb.Status_SERVICE_UNAVAILABLE, nil
		case <-iterCh:
			// Iterator has set the block and status vars
		}

		if status != cb.Status_SUCCESS {
			logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
			return status, nil
		}

		// increment block number to support FAIL_IF_NOT_READY deliver behavior
		number++

		if err := accessControl.Evaluate(); err != nil {
			logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
			return cb.Status_FORBIDDEN, nil
		}

		// Answer the peer's block request
		logger.Infof("Start delivering block [channel: %s, txid: %s, type: %s, from: %s, blockNum: %d, prevHash: %s] at (%s)", chdr.ChannelId, chdr.TxId, cb.HeaderType_name[chdr.Type], addr, number, base64.StdEncoding.EncodeToString(block.Header.DataHash[:]), strconv.FormatInt(time.Now().UnixNano(), 10))
		if err := srv.SendBlockResponse(block); err != nil {
			logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
			return cb.Status_INTERNAL_SERVER_ERROR, err
		}

		logger.Infof("End delivering block [channel: %s, txid: %s, type: %s, from: %s, blockNum: %d, prevHash: %s] at (%s)", chdr.ChannelId, chdr.TxId, cb.HeaderType_name[chdr.Type], addr, number, base64.StdEncoding.EncodeToString(block.Header.DataHash[:]), strconv.FormatInt(time.Now().UnixNano(), 10))

		h.Metrics.BlocksSent.With(labels...).Add(1)

		// Break out once the stop position given in the request message is reached; the peer's initial request sends a very large stopNum, so this for loop keeps iterating
		if stopNum == block.Header.Number {
			break
		}

	}

This is a concurrent, asynchronous blocking model: each block fetch runs in its own goroutine, and the select decides whether execution may proceed to the next step.

func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
	addr := util.ExtractRemoteAddress(srv.Context())
	logger.Debugf("Starting new broadcast loop for %s", addr)
	for {
		msg, err := srv.Recv()
		//logger.Infof("Recving a new message at %d", time.Now().UnixNano())
		if err == io.EOF {
			logger.Debugf("Received EOF from %s, hangup", addr)
			return nil
		}
		if err != nil {
			logger.Warningf("Error reading from %s: %s", addr, err)
			return err
		}
		logger.Infof("Start Processing message at %d", time.Now().UnixNano())
		resp := bh.ProcessMessage(msg, addr)
		logger.Debugf("End Processing message at %d", time.Now().UnixNano())

		err = srv.Send(resp)
		if resp.Status != cb.Status_SUCCESS {
			return err
		}

		if err != nil {
			logger.Warningf("Error sending to %s: %s", addr, err)
			return err
		}
	}

}

When the orderer receives a peer's block request over the gRPC service, it sends blocks back through SendBlockResponse:

// ResponseSender defines the interface a handler must implement to send
// responses.
type ResponseSender interface {
	SendStatusResponse(status cb.Status) error
	SendBlockResponse(block *cb.Block) error
}
type AtomicBroadcast_DeliverServer interface {
	Send(*DeliverResponse) error
	Recv() (*common.Envelope, error)
	grpc.ServerStream
}

Flow chart

The diagram below gives a macro view of the Orderer delivering blocks to Peers, showing several channels passing blocks between Orderer and Peer:

Single-channel block synchronization

Block generation flow

Overall steps

  • Receive messages from the client
  • Process and validate the messages
  • Hand them to the consensus part for ordering and packing
  • Send the blocks to peers

See the earlier sections for the details of each step.

The concurrency (high-throughput) sample in fabric-samples

This is an official Fabric sample for concurrency testing. Its advantage is quick deployment of a complete network plus ready-made scripts; the drawback is that it can currently only be deployed on a single machine.

Steps

  • cd into the first-network folder within fabric-samples, e.g. cd ~/fabric-samples/first-network
  • Open docker-compose-cli.yaml in your favorite editor, and edit the following lines:
    • In the volumes section of the cli container, edit the second line which refers to the chaincode folder to point to the chaincode folder within the high-throughput folder,
      • e.g../../chaincode/:/opt/gopath/src/github.com/hyperledger/fabric/examples/chaincode/go --> ./../high-throughput/chaincode/:/opt/gopath/src/github.com/hyperledger/fabric/examples/chaincode/go
    • Again in the volumes section, edit the fourth line which refers to the scripts folder so it points to the scripts folder within the high-throughput folder, e.g.
      • ./scripts:/opt/gopath/src/github.com/hyperledger/fabric/peer/scripts/ --> ./../high-throughput/scripts/:/opt/gopath/src/github.com/hyperledger/fabric/peer/scripts/
    • Finally, comment out the docker exec cli scripts/script.sh command from the byfn.sh script by placing a # before it so that the standard BYFN end to end script doesn't run, e.g.
      • docker exec cli scripts/script.sh $CHANNEL_NAME $CLI_DELAY $LANGUAGE $CLI_TIMEOUT $VERBOSE
  • We can now bring our network up by typing in ./byfn.sh -m up -c mychannel
  • Open a new terminal window and enter the CLI container using docker exec -it cli bash, all operations on the network will happen within this container from now on.

If everything went well, you can now operate on the network; see the official documentation for details.

Testing this network with wrk

  • Download and build wrk
  • Start a listening service

A simple example:

package main

import (
	"fmt"
	"log"

	"github.com/gin-gonic/gin"
	"github.com/hyperledger/fabric-sdk-go/pkg/client/channel"
	"github.com/hyperledger/fabric-sdk-go/pkg/core/config"
	"github.com/hyperledger/fabric-sdk-go/pkg/fabsdk"
)

func main() {
	r := gin.Default()
	r.GET("/update", func(c *gin.Context) {
		// Read the config file and create the SDK object
		configProvider := config.FromFile("/root/fabric-samples/first-network/test/config.yaml")
		sdk, err := fabsdk.New(configProvider)
		if err != nil {
			log.Fatalf("create sdk fail: %s\n", err.Error())
		}
		fmt.Println("create a new fabsdk")

		// Invoke the chaincode
		channelProvider := sdk.ChannelContext("mychannel",
			fabsdk.WithUser("Admin"),
			fabsdk.WithOrg("org1"))

		channelClient, err := channel.New(channelProvider)
		if err != nil {
			log.Fatalf("create channel client fail: %s\n", err.Error())
		}
		fmt.Println("create channelClient succeed")

		// Build the function arguments
		var args [][]byte
		args = append(args, []byte("myvar"), []byte("10"), []byte("+"))

		targets := []string{"peer0.org1.example.com", "peer1.org2.example.com"}

		request := channel.Request{
			ChaincodeID: "bigdatacc",
			Fcn:         "update",
			Args:        args,
		}
		// Execute the request through the channel client
		response, err := channelClient.Execute(request, channel.WithTargetEndpoints(targets...))
		if err != nil {
			log.Fatal("query fail: ", err.Error())
		}
		fmt.Printf("response is [%s]\n", string(response.Payload))
	})

	r.Run(":8080")
}

  • Use wrk to fire test requests at the network:
wrk -t10 -c150 -d10 --timeout 100 http://0.0.0.0:8080/update

protobuf objects in the Fabric source

A rough look at the protobuf definitions under orderer and common.

orderer

ab.proto

  • Broadcast response

The client sends a message to the orderer, and the orderer returns this response:

message BroadcastResponse {
    // Status code, which may be used to programatically respond to success/failure
    common.Status status = 1;
    // Info string which may contain additional information about the status returned
    string info = 2;
}
  • Block request messages

These are the messages a peer sends to the orderer to request blocks (an example of building one follows the definition):

message SeekNewest { }
message SeekOldest { }
message SeekPosition {
    oneof Type {
        SeekNewest newest = 1;
        SeekOldest oldest = 2;
        SeekSpecified specified = 3;
    }
}
message SeekInfo {
   // If BLOCK_UNTIL_READY is specified, the reply will block until the requested blocks are available,
   // if FAIL_IF_NOT_READY is specified, the reply will return an error indicating that the block is not
   // found.  To request that all blocks be returned indefinitely as they are created, behavior should be
   // set to BLOCK_UNTIL_READY and the stop should be set to specified with a number of MAX_UINT64
    enum SeekBehavior {
        BLOCK_UNTIL_READY = 0;
        FAIL_IF_NOT_READY = 1;
    }

    // SeekErrorTolerance indicates to the server how block provider errors should be tolerated.  By default,
    // if the deliver service detects a problem in the underlying block source (typically, in the orderer,
    // a consenter error), it will begin to reject deliver requests.  This is to prevent a client from waiting
    // for blocks from an orderer which is stuck in an errored state.  This is almost always the desired behavior
    // and clients should stick with the default STRICT checking behavior.  However, in some scenarios, particularly
    // when attempting to recover from a crash or other corruption, it's desirable to force an orderer to respond
    // with blocks on a best effort basis, even if the backing consensus implementation is in an errored state.
    // In this case, set the SeekErrorResponse to BEST_EFFORT to ignore the consenter errors.
    enum SeekErrorResponse {
        STRICT = 0;
        BEST_EFFORT = 1;
    }
    SeekPosition start = 1;               // The position to start the deliver from
    SeekPosition stop = 2;                // The position to stop the deliver
    SeekBehavior behavior = 3;            // The behavior when a missing block is encountered
    SeekErrorResponse error_response = 4; // How to respond to errors reported to the deliver service
}
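
For example, the payload a client sends to Deliver to stream all blocks indefinitely uses exactly the combination described in the comment above; this sketch assumes Fabric 1.4's protos/orderer import path:

package main

import (
	"fmt"
	"math"

	ab "github.com/hyperledger/fabric/protos/orderer"
)

func main() {
	// Start at the newest block and never stop: stop is MAX_UINT64 with
	// BLOCK_UNTIL_READY, so Deliver blocks until each next block exists.
	seek := &ab.SeekInfo{
		Start:    &ab.SeekPosition{Type: &ab.SeekPosition_Newest{Newest: &ab.SeekNewest{}}},
		Stop:     &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: math.MaxUint64}}},
		Behavior: ab.SeekInfo_BLOCK_UNTIL_READY,
	}
	fmt.Println(seek)
}
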
  • Deliver response

This is the orderer's response to a peer's block request:

message DeliverResponse {
    oneof Type {
        common.Status status = 1;
        common.Block block = 2;
    }
}
  • Definition of the Broadcast and Deliver rpc services
service AtomicBroadcast {
    // broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure
    rpc Broadcast(stream common.Envelope) returns (stream BroadcastResponse) {}

    // deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a mashaled SeekInfo message, then a stream of block replies is received.
    rpc Deliver(stream common.Envelope) returns (stream DeliverResponse) {}
}

configuration.proto

  • Consensus type definition
message ConsensusType {
    // The consensus type: "solo", "kafka" or "etcdraft".
    string type = 1;
    // Opaque metadata, dependent on the consensus type.
    bytes metadata = 2;

    // State defines the orderer mode of operation, typically for consensus-type migration.
    // NORMAL is during normal operation, when consensus-type migration is not, and can not, take place.
    // MAINTENANCE is when the consensus-type can be changed.
    enum State {
        STATE_NORMAL = 0;
        STATE_MAINTENANCE = 1;
    }
    // The state signals the ordering service to go into maintenance mode, typically for consensus-type migration.
    State state = 3;
}
  • Block-cutting configuration
message BatchSize {
    // Simply specified as number of messages for now, in the future
    // we may want to allow this to be specified by size in bytes
    uint32 max_message_count = 1;
    // The byte count of the serialized messages in a batch cannot
    // exceed this value.
    uint32 absolute_max_bytes = 2;
    // The byte count of the serialized messages in a batch should not
    // exceed this value.
    uint32 preferred_max_bytes = 3;
}

common.proto

  • HTTP-style status codes
enum Status {
    UNKNOWN = 0;
    SUCCESS = 200;
    BAD_REQUEST = 400;
    FORBIDDEN = 403;
    NOT_FOUND = 404;
    REQUEST_ENTITY_TOO_LARGE = 413;
    INTERNAL_SERVER_ERROR = 500;
    NOT_IMPLEMENTED = 501;
    SERVICE_UNAVAILABLE = 503;
}
  • HeaderType

The type of a ChannelHeader, for instance, is looked up from here:

enum HeaderType {
    // Prevent removed tag re-use
    // Uncomment after fabric-baseimage moves to 3.5.1
    // reserved 7;
    // reserved "PEER_RESOURCE_UPDATE";

    MESSAGE = 0;                   // Used for messages which are signed but opaque
    CONFIG = 1;                    // Used for messages which express the channel config
    CONFIG_UPDATE = 2;             // Used for transactions which update the channel config
    ENDORSER_TRANSACTION = 3;      // Used by the SDK to submit endorser based transactions
    ORDERER_TRANSACTION = 4;       // Used internally by the orderer for management
    DELIVER_SEEK_INFO = 5;         // Used as the type for Envelope messages submitted to instruct the Deliver API to seek
    CHAINCODE_PACKAGE = 6;         // Used for packaging chaincode artifacts for install
    PEER_ADMIN_OPERATION = 8;      // Used for invoking an administrative operation on a peer
    TOKEN_TRANSACTION = 9;         // Used to denote transactions that invoke token management operations
}
  • Header

The signature header and channel header:

message Header {
    bytes channel_header = 1;
    bytes signature_header = 2;
}
  • ChannelHeader
    The channel header: channel_id, tx_id, version, and so on
message ChannelHeader {
    int32 type = 1; // Header types 0-10000 are reserved and defined by HeaderType

    // Version indicates message protocol version
    int32 version = 2;

    // Timestamp is the local time when the message was created
    // by the sender
    google.protobuf.Timestamp timestamp = 3;

    // Identifier of the channel this message is bound for
    string channel_id = 4;

    // An unique identifier that is used end-to-end.
    //  -  set by higher layers such as end user or SDK
    //  -  passed to the endorser (which will check for uniqueness)
    //  -  as the header is passed along unchanged, it will be
    //     be retrieved by the committer (uniqueness check here as well)
    //  -  to be stored in the ledger
    string tx_id = 5;

    // The epoch in which this header was generated, where epoch is defined based on block height
    // Epoch in which the response has been generated. This field identifies a
    // logical window of time. A proposal response is accepted by a peer only if
    // two conditions hold:
    // 1. the epoch specified in the message is the current epoch
    // 2. this message has been only seen once during this epoch (i.e. it hasn't
    //    been replayed)
    uint64 epoch = 6;

    // Extension that may be attached based on the header type
    bytes extension = 7;

    // If mutual TLS is employed, this represents
    // the hash of the client's TLS certificate
    bytes tls_cert_hash = 8;
}
  • SignatureHeader

The signature header:

message SignatureHeader {
    // Creator of the message, a marshaled msp.SerializedIdentity
    bytes creator = 1;

    // Arbitrary number that may only be used once. Can be used to detect replay attacks.
    bytes nonce = 2;
}
  • Payload

The message content:

message Payload {

    // Header is included to provide identity and prevent replay
    Header header = 1;

    // Data, the encoding of which is defined by the type in the header
    bytes data = 2;
}
  • Envelope

An envelope wraps the payload with a signature so the message can be authenticated:

message Envelope {
    // A marshaled Payload
    bytes payload = 1;

    // A signature by the creator specified in the Payload header
    bytes signature = 2;
}
  • Block

A block consists of a header, the block data, and block metadata:

message Block {
    BlockHeader header = 1;
    BlockData data = 2;
    BlockMetadata metadata = 3;
}
message BlockHeader {
    uint64 number = 1; // The position in the blockchain
    bytes previous_hash = 2; // The hash of the previous block header
    bytes data_hash = 3; // The hash of the BlockData, by MerkleTree
}
  • BlockData

Note it is a two-dimensional byte array, matching the need to pack multiple transactions into one block (a packing example follows the definitions):

message BlockData {
    repeated bytes data = 1;
}
message BlockMetadata {
    repeated bytes metadata = 1;
}
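
A short sketch of that packing: marshaled Envelopes become the rows of BlockData.Data (assuming Fabric 1.4's protos/common import path):

package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
	cb "github.com/hyperledger/fabric/protos/common"
)

func main() {
	envs := []*cb.Envelope{
		{Payload: []byte("tx1")},
		{Payload: []byte("tx2")},
	}
	// Each marshaled Envelope becomes one row of the 2-D Data array,
	// which is how a block carries multiple transactions.
	data := &cb.BlockData{}
	for _, env := range envs {
		b, err := proto.Marshal(env)
		if err != nil {
			panic(err)
		}
		data.Data = append(data.Data, b)
	}
	fmt.Printf("block data holds %d transactions\n", len(data.Data))
}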

ledger.proto

The ledger structure is simple: block height and hashes.

message BlockchainInfo {
    uint64 height = 1;
    bytes currentBlockHash = 2;
    bytes previousBlockHash = 3;

}

Main reference

Original article: https://www.cnblogs.com/CherryTab/p/13796254.html