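Hyperledger v0.6 deployment source code analysis

The complete chaincode deployment flow

This post walks through the whole flow in which the main node runs peer node start --peer-chaincodedev, the chaincode side runs CORE_CHAINCODE_ID_NAME=mycc CORE_PEER_ADDRESS=0.0.0.0:7051 ./chaincode_example02 and waits to be deployed and invoked, and the deploying side then calls peer chaincode deploy -n mycc -c '{"Args": ["init", "a","100", "b", "200"]}'. The consensus plugin is noops, and security is not enabled.

Deploying the chaincode

Deploying side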

peer/chaincode/deploy.go
func chaincodeDeploy(cmd *cobra.Command, args []string) error

chaincodeDeploy deploys the chaincode. On success, the chaincode name (hash) is printed to STDOUT for use by subsequent chaincode-related CLI commands.

func chaincodeDeploy(cmd *cobra.Command, args []string) error {
	spec, err := getChaincodeSpecification(cmd)
	if err != nil {
		return err
	}

	devopsClient, err := common.GetDevopsClient(cmd)
	if err != nil {
		return fmt.Errorf("Error building %s: %s", chainFuncName, err)
	}

	chaincodeDeploymentSpec, err := devopsClient.Deploy(context.Background(), spec)
	if err != nil {
		return fmt.Errorf("Error building %s: %s\n", chainFuncName, err)
	}
	logger.Infof("Deploy result: %s", chaincodeDeploymentSpec.ChaincodeSpec)
	fmt.Printf("Deploy chaincode: %s\n", chaincodeDeploymentSpec.ChaincodeSpec.ChaincodeID.Name)

	return nil
}

The entry point is chaincodeDeploy, which in turn calls:

  1. getChaincodeSpecification(cmd *cobra.Command) (*pb.ChaincodeSpec, error)
    The source describes the returned ChaincodeSpec as follows:

A transaction is always associated with a chaincode specification which
defines the chaincode and the execution environment such as language and
security context. Currently there is an implementation that uses Golang
for writing chaincode. Other languages may be added in the future.

It is defined as follows:

// protos/chaincode.pb.go
type ChaincodeSpec struct {
	Type                 ChaincodeSpec_Type  
	ChaincodeID          *ChaincodeID
	CtorMsg              *ChaincodeInput
	Timeout              int32
	SecureContext        string
	ConfidentialityLevel ConfidentialityLevel
	Metadata             []byte
	Attributes           []string
}

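chaincodeCtorJSON (here carrying "init", "a", "100", "b", "200") and chaincodeAttributesJSON (empty here) are used to build the ChaincodeInput (ChaincodeSpec.CtorMsg) and the Attributes respectively, and the language read from chaincodeLang goes into Type.

Finally: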

spec = &pb.ChaincodeSpec{
	Type:        pb.ChaincodeSpec_Type(pb.ChaincodeSpec_Type_value[chaincodeLang]),
	ChaincodeID: &pb.ChaincodeID{Path: chaincodePath, Name: chaincodeName},
	CtorMsg:     input,
	Attributes:  attributes,
}
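
To make the first step concrete, here is a minimal sketch of turning the -c JSON into a constructor input. It assumes a simplified, hypothetical ctorInput struct with string arguments; the real pb.ChaincodeInput is generated from protobuf and is not reproduced here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ctorInput is a hypothetical stand-in for pb.ChaincodeInput.
type ctorInput struct {
	Args []string `json:"Args"`
}

// parseCtor decodes the JSON passed with -c into a ctorInput.
func parseCtor(ctorJSON string) (*ctorInput, error) {
	input := &ctorInput{}
	if err := json.Unmarshal([]byte(ctorJSON), input); err != nil {
		return nil, fmt.Errorf("chaincode argument error: %s", err)
	}
	return input, nil
}

func main() {
	input, err := parseCtor(`{"Args": ["init", "a", "100", "b", "200"]}`)
	if err != nil {
		fmt.Println(err)
		return
	}
	// The first argument is the function name, the rest are its parameters.
	fmt.Println(len(input.Args), input.Args[0])
}
```

Malformed JSON is reported as an error before any gRPC call is made, which is the point of parsing on the client side.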

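TODO: security-mode handling

  2. Build the devopsClient

devopsClient, err := common.GetDevopsClient(cmd)

common.GetDevopsClient first opens a connection with NewPeerClientConnection(); if that succeeds, it creates the devopsClient with pb.NewDevopsClient (gRPC-generated code):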

// GetDevopsClient returns a new client connection for this peer
func GetDevopsClient(cmd *cobra.Command) (pb.DevopsClient, error) {
	clientConn, err := peer.NewPeerClientConnection()
	if err != nil {
		return nil, fmt.Errorf("Error trying to connect to local peer: %s", err)
	}
	devopsClient := pb.NewDevopsClient(clientConn)
	return devopsClient, nil
}

  3. Call the main node's function over gRPC

chaincodeDeploymentSpec, err := devopsClient.Deploy(context.Background(), spec)

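  4. Print the chaincode ID

fmt.Printf("Deploy chaincode: %s\n", chaincodeDeploymentSpec.ChaincodeSpec.ChaincodeID.Name)

So once the deploy call returns, the CLI prints the chaincode name; for the command above that should be Deploy chaincode: mycc.
[Screenshot: CLI output of the deploy command]
Note that the chaincode is not necessarily deployed at this point: the client has only submitted a deploy request to the VP, and the chaincode is actually deployed only after consensus completes.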

Main node

gRPC registration

To use gRPC, the implementation has to be registered on the server side. The registration happens in peer/node/start.go, in
serve(args []string) error

// Register Devops server
serverDevops := core.NewDevopsServer(peerServer)
pb.RegisterDevopsServer(grpcServer, serverDevops)

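So the functions are implemented by serverDevops, which is the *Devops returned by core.NewDevopsServer(peerServer). Devops implements the DevopsServer interface, and peerServer is of type *peer.Impl.
DevopsServer declares the chaincode-related functions: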

// Server API for Devops service

type DevopsServer interface {
	// Log in - passed Secret object and returns Response object, where
	// msg is the security context to be used in subsequent invocations
	Login(context.Context, *Secret) (*Response, error)
	// Build the chaincode package.
	Build(context.Context, *ChaincodeSpec) (*ChaincodeDeploymentSpec, error)
	// Deploy the chaincode package to the chain.
	Deploy(context.Context, *ChaincodeSpec) (*ChaincodeDeploymentSpec, error)
	// Invoke chaincode.
	Invoke(context.Context, *ChaincodeInvocationSpec) (*Response, error)
	// Query chaincode.
	Query(context.Context, *ChaincodeInvocationSpec) (*Response, error)
	// Retrieve a TCert.
	EXP_GetApplicationTCert(context.Context, *Secret) (*Response, error)
	// Prepare for performing a TX, which will return a binding that can later be used to sign and then execute a transaction.
	EXP_PrepareForTx(context.Context, *Secret) (*Response, error)
	// Prepare for performing a TX, which will return a binding that can later be used to sign and then execute a transaction.
	EXP_ProduceSigma(context.Context, *SigmaInput) (*Response, error)
	// Execute a transaction with a specific binding
	EXP_ExecuteWithBinding(context.Context, *ExecuteWithBinding) (*Response, error)
}

The deployment starts in Deploy, the function the client calls over gRPC:

Deployment on the main node

core/devops.go
Function: (d *Devops) Deploy(ctx context.Context, spec *pb.ChaincodeSpec) (*pb.ChaincodeDeploymentSpec, error)

// Deploy deploys the supplied chaincode image to the validators through a transaction
func (d *Devops) Deploy(ctx context.Context, spec *pb.ChaincodeSpec) (*pb.ChaincodeDeploymentSpec, error) {
	// get the deployment spec
	chaincodeDeploymentSpec, err := d.getChaincodeBytes(ctx, spec)

	if err != nil {
		devopsLogger.Error(fmt.Sprintf("Error deploying chaincode spec: %v\n\n error: %s", spec, err))
		return nil, err
	}

	// Now create the Transactions message and send to Peer.

	transID := chaincodeDeploymentSpec.ChaincodeSpec.ChaincodeID.Name

	var tx *pb.Transaction
	var sec crypto.Client

	if peer.SecurityEnabled() {
		if devopsLogger.IsEnabledFor(logging.DEBUG) {
			devopsLogger.Debugf("Initializing secure devops using context %s", spec.SecureContext)
		}
		sec, err = crypto.InitClient(spec.SecureContext, nil)
		defer crypto.CloseClient(sec)

		// remove the security context since we are no longer need it down stream
		spec.SecureContext = ""

		if nil != err {
			return nil, err
		}

		if devopsLogger.IsEnabledFor(logging.DEBUG) {
			devopsLogger.Debugf("Creating secure transaction %s", transID)
		}
		tx, err = sec.NewChaincodeDeployTransaction(chaincodeDeploymentSpec, transID, spec.Attributes...)
		if nil != err {
			return nil, err
		}
	} else {
		if devopsLogger.IsEnabledFor(logging.DEBUG) {
			devopsLogger.Debugf("Creating deployment transaction (%s)", transID)
		}
		tx, err = pb.NewChaincodeDeployTransaction(chaincodeDeploymentSpec, transID)
		if err != nil {
			return nil, fmt.Errorf("Error deploying chaincode: %s ", err)
		}
	}

	if devopsLogger.IsEnabledFor(logging.DEBUG) {
		devopsLogger.Debugf("Sending deploy transaction (%s) to validator", tx.Txid)
	}
	resp := d.coord.ExecuteTransaction(tx)
	if resp.Status == pb.Response_FAILURE {
		err = fmt.Errorf(string(resp.Msg))
	}

	return chaincodeDeploymentSpec, err
}

It first calls:

  • func (*Devops) getChaincodeBytes(context context.Context, spec
    *pb.ChaincodeSpec) (*pb.ChaincodeDeploymentSpec, error)

  It first reads the mode (currently "dev").
  If the mode is not dev: TODO...
  In dev mode, steps such as CheckSpec are skipped,
  and a ChaincodeDeploymentSpec built from the ChaincodeSpec is returned directly:

// in dev mode CodePackage is empty
chaincodeDeploymentSpec := &pb.ChaincodeDeploymentSpec{ChaincodeSpec: spec, CodePackage: codePackageBytes}

If security is enabled... TODO
Otherwise a *pb.Transaction (tx) is built:

  • NewChaincodeDeployTransaction(chaincodeDeploymentSpec *ChaincodeDeploymentSpec, uuid string) (*Transaction, error)

  transaction.Type is set to Transaction_CHAINCODE_DEPLOY
  transaction.Txid is set to uuid (mycc)
  a timestamp is created
  transaction.ChaincodeID is set to the serialized cID
  transaction.Payload is set to the serialized chaincodeDeploymentSpec

Once the data is built, Devops.coord.ExecuteTransaction(tx) is called to execute the transaction:

core/peer/peer.go

  • ExecuteTransaction(transaction *pb.Transaction) (response
    *pb.Response)
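    Devops.coord.ExecuteTransaction(tx) does only one thing: it checks whether the current node is a VP. If it is, it calls Impl.sendTransactionsToLocalEngine(transaction); otherwise it picks a random VP and sends the transaction to it: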
//ExecuteTransaction executes transactions decides to do execute in dev or prod mode
func (p *Impl) ExecuteTransaction(transaction *pb.Transaction) (response *pb.Response) {
	if p.isValidator {
		response = p.sendTransactionsToLocalEngine(transaction)
	} else {
		peerAddresses := p.discHelper.GetRandomNodes(1)
		response = p.SendTransactionsToPeer(peerAddresses[0], transaction)
	}
	return response
}

core/peer/peer.go

  • (p *Impl) sendTransactionsToLocalEngine(transaction *pb.Transaction)
    *pb.Response

sendTransactionsToLocalEngine send the transaction to the localengine (This Peer is a validator)

Control now passes to p.sendTransactionsToLocalEngine(transaction):

func (p *Impl) sendTransactionsToLocalEngine(transaction *pb.Transaction) *pb.Response {

	peerLogger.Debugf("Marshalling transaction %s to send to local engine", transaction.Type)
	data, err := proto.Marshal(transaction)
	if err != nil {
		return &pb.Response{Status: pb.Response_FAILURE, Msg: []byte(fmt.Sprintf("Error sending transaction to local engine: %s", err))}
	}

	var response *pb.Response
	msg := &pb.Message{Type: pb.Message_CHAIN_TRANSACTION, Payload: data, Timestamp: util.CreateUtcTimestamp()}
	peerLogger.Debugf("Sending message %s with timestamp %v to local engine", msg.Type, msg.Timestamp)
	response = p.engine.ProcessTransactionMsg(msg, transaction)

	return response
}

The transaction is first serialized into data. Then a &pb.Message (msg) is built from Type: pb.Message_CHAIN_TRANSACTION, Payload: data and the current timestamp. Finally p.engine.ProcessTransactionMsg(msg, transaction) is called to obtain the response.

/consensus/helper/engine.go

  • (eng *EngineImpl) ProcessTransactionMsg(msg *pb.Message, tx
    *pb.Transaction) (response *pb.Response)

    If the transaction is a query: TODO....
    If it is not a query:

Pass the message to the consenter (eg. PBFT) NOTE: Make sure engine has been initialized
TODO, do we want to put these requests into a queue? This will block until
the consenter gets around to handling the message, but it also provides some
natural feedback to the REST API to determine how long it takes to queue messages

The message is forwarded to the consensus plugin:

err := eng.consenter.RecvMsg(msg, eng.peerEndpoint.ID)

eng.peerEndpoint.ID is the configured peer ID (default: jdoe).
The call eng.consenter.RecvMsg(msg, eng.peerEndpoint.ID) lands in noops:

  • RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error

RecvMsg is called for Message_CHAIN_TRANSACTION and Message_CONSENSUS messages.

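For a transaction message, i.broadcastConsensusMsg(msg) is executed. It changes the message type in place, so the same message is then also caught by if msg.Type == pb.Message_CONSENSUS...
For a consensus message, the transaction is put on the channel: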

	if msg.Type == pb.Message_CHAIN_TRANSACTION {
		if err := i.broadcastConsensusMsg(msg); nil != err {
			return err
		}
	}
	if msg.Type == pb.Message_CONSENSUS {
		tx, err := i.getTxFromMsg(msg)
		if nil != err {
			return err
		}
		if logger.IsEnabledFor(logging.DEBUG) {
			logger.Debugf("Sending to channel tx uuid: %s", tx.Txid)
		}
		i.channel <- tx
	}

/consensus/noops/noops.go

  • broadcastConsensusMsg(msg *pb.Message) error

First the message msg is deserialized into t

Change the msg type to consensus and broadcast to the network so that
other validators may execute the transaction

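The message type is changed to pb.Message_CONSENSUS and the message is broadcast to the network.
The payload is rebuilt, and i.stack.Broadcast(msg, pb.PeerEndpoint_VALIDATOR) is called to forward the message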
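
The way a transaction message falls through both if statements of RecvMsg can be sketched as follows. This is a simplified model, not the noops code itself: message, noops and the broadcast counter are hypothetical stand-ins, and the payload is a plain string instead of a serialized protobuf.

```go
package main

import "fmt"

type msgType int

const (
	chainTransaction msgType = iota
	consensus
)

// message is a hypothetical, simplified stand-in for pb.Message.
type message struct {
	Type    msgType
	Payload string
}

type noops struct {
	channel    chan string // receives transactions ready for batching
	broadcasts int         // counts broadcasts instead of using a network
}

// broadcastConsensusMsg retypes the message in place and "broadcasts" it.
func (n *noops) broadcastConsensusMsg(msg *message) error {
	msg.Type = consensus
	n.broadcasts++
	return nil
}

// recvMsg mirrors the two consecutive if statements of RecvMsg.
func (n *noops) recvMsg(msg *message) error {
	if msg.Type == chainTransaction {
		if err := n.broadcastConsensusMsg(msg); err != nil {
			return err
		}
	}
	// A message retyped above also satisfies this check.
	if msg.Type == consensus {
		n.channel <- msg.Payload
	}
	return nil
}

func main() {
	n := &noops{channel: make(chan string, 1)}
	_ = n.recvMsg(&message{Type: chainTransaction, Payload: "mycc"})
	fmt.Println(n.broadcasts, <-n.channel) // prints "1 mycc"
}
```

Because the second check is a plain if rather than an else branch, the node that received the transaction queues it locally in the same call that broadcasts it.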

/consensus/helper/helper.go

  • Broadcast(msg *pb.Message, peerType pb.PeerEndpoint_Type) error
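    It broadcasts the message and gets back a slice of errors. If the slice contains at least one error, it returns
fmt.Errorf("Couldn't broadcast successfully")

otherwise it returns nil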

core/peer/peer.go

  • (p *Impl) Broadcast(msg *pb.Message, typ pb.PeerEndpoint_Type)
    []error
// Broadcast broadcast a message to each of the currently registered PeerEndpoints of given type
// Broadcast will broadcast to all registered PeerEndpoints if the type is PeerEndpoint_UNDEFINED
func (p *Impl) Broadcast(msg *pb.Message, typ pb.PeerEndpoint_Type) []error {
	cloneMap := p.cloneHandlerMap(typ)
	errorsFromHandlers := make(chan error, len(cloneMap))
	var bcWG sync.WaitGroup

	start := time.Now()

	for _, msgHandler := range cloneMap {
		bcWG.Add(1)
		go func(msgHandler MessageHandler) {
			defer bcWG.Done()
			host, _ := msgHandler.To()
			t1 := time.Now()
			err := msgHandler.SendMessage(msg)
			if err != nil {
				toPeerEndpoint, _ := msgHandler.To()
				errorsFromHandlers <- fmt.Errorf("Error broadcasting msg (%s) to PeerEndpoint (%s): %s", msg.Type, toPeerEndpoint, err)
			}
			peerLogger.Debugf("Sending %d bytes to %s took %v", len(msg.Payload), host.Address, time.Since(t1))

		}(msgHandler)

	}
	bcWG.Wait()
	close(errorsFromHandlers)
	var returnedErrors []error
	for err := range errorsFromHandlers {
		returnedErrors = append(returnedErrors, err)
	}

	elapsed := time.Since(start)
	peerLogger.Debugf("Broadcast took %v", elapsed)

	return returnedErrors
}

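First the set of peer handlers is obtained (cloneHandlerMap).
The message is then sent to each handler in its own goroutine, and the errors are collected into a slice and returned. If there are no errors, the client receives the response at this point. This takes us back to where the deployment started, i.e. the place where the deploying side prints the deploy result; it does not mean that the chaincode is 100% deployed.

The main node handles the consensus message

When a node receives a consensus message, it first puts the transaction on the channel. A block is cut only when the transaction queue is full or the configured timeout expires. Because noops has no validation phase, all received messages are processed directly.

The node's consensus entry point is the if msg.Type == pb.Message_CONSENSUS branch of
RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error: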

	if msg.Type == pb.Message_CONSENSUS {
		tx, err := i.getTxFromMsg(msg)
		if nil != err {
			return err
		}
		if logger.IsEnabledFor(logging.DEBUG) {
			logger.Debugf("Sending to channel tx uuid: %s", tx.Txid)
		}
		i.channel <- tx
	}

The transaction is put on i.channel

The consensus loop handleChannels then keeps checking whether the conditions for cutting a block are met

func (i *Noops) handleChannels() {
	// Noops is a singleton object and only exits when peer exits, so we
	// don't need a condition to exit this loop
	for {
		select {
		case tx := <-i.channel:
			if i.canProcessBlock(tx) {
				if logger.IsEnabledFor(logging.DEBUG) {
					logger.Debug("Process block due to size")
				}
				if err := i.processBlock(); nil != err {
					logger.Error(err.Error())
				}
			}
		case <-i.timer.C:
			if logger.IsEnabledFor(logging.DEBUG) {
				logger.Debug("Process block due to time")
			}
			if err := i.processBlock(); nil != err {
				logger.Error(err.Error())
			}
		}
	}
}

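i.canProcessBlock(tx) first appends the transaction to txQ. If it is the first transaction in the queue, the timer is restarted (not on every transaction). The function then reports whether the block is full: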

func (i *Noops) canProcessBlock(tx *pb.Transaction) bool {
	// For NOOPS, if we have completed the sync since we last connected,
	// we can assume that we are at the current state; otherwise, we need to
	// wait for the sync process to complete before we can exec the transactions

	// TODO: Ask coordinator if we need to start sync

	i.txQ.append(tx)

	// start timer if we get a tx
	if i.txQ.size() == 1 {
		i.timer.Reset(i.duration)
	}
	return i.txQ.isFull()
}
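
The size-or-timeout batching can be sketched without the rest of the consenter. The batcher type below is hypothetical and keeps cut blocks in memory; skipping an empty queue in processBlock is a choice made for this sketch, not something taken from the source.

```go
package main

import (
	"fmt"
	"time"
)

// batcher is a hypothetical, simplified stand-in for the Noops consenter.
type batcher struct {
	queue    []string
	capacity int           // block size limit
	duration time.Duration // block timeout
	timer    *time.Timer
	blocks   [][]string // cut blocks, kept in memory for the example
}

func newBatcher(capacity int, d time.Duration) *batcher {
	t := time.NewTimer(d)
	t.Stop() // the timer is only armed once a transaction arrives
	return &batcher{capacity: capacity, duration: d, timer: t}
}

// canProcessBlock queues tx, arms the timer on the first queued tx,
// and reports whether the queue is full.
func (b *batcher) canProcessBlock(tx string) bool {
	b.queue = append(b.queue, tx)
	if len(b.queue) == 1 {
		b.timer.Reset(b.duration)
	}
	return len(b.queue) >= b.capacity
}

// processBlock cuts a block from whatever is currently queued.
func (b *batcher) processBlock() {
	b.timer.Stop()
	if len(b.queue) == 0 {
		return
	}
	b.blocks = append(b.blocks, b.queue)
	b.queue = nil
}

func main() {
	b := newBatcher(2, 50*time.Millisecond)
	for _, tx := range []string{"tx1", "tx2", "tx3"} {
		if b.canProcessBlock(tx) {
			b.processBlock() // block cut due to size
		}
	}
	<-b.timer.C      // tx3 is still queued; wait for the timeout
	b.processBlock() // block cut due to time
	fmt.Println(len(b.blocks))
}
```

Arming the timer only on the first queued transaction bounds how long that transaction can wait, regardless of how many follow it.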

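When the block is full or the timeout expires, (i *Noops) processBlock() error is executed

Cutting a block

The overall flow is: i.processTransactions() processes the transactions, then getBlockData fetches the block data, and finally i.notifyBlockAdded(data, delta) announces the new block.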

  • processTransactions() error

Grab all transactions from the FIFO queue and run them in order
That is, all queued transactions are executed in order.

First
i.stack.BeginTxBatch(timestamp) TODO..
Then i.stack.ExecTxs(timestamp, txarr) is called to execute all the transactions

consensus does not need to understand transaction errors, errors here are actual ledger errors, and often irrecoverable

If an error occurs, the batch is rolled back:

	_, err := i.stack.ExecTxs(timestamp, txarr)

	//consensus does not need to understand transaction errors, errors here are
	//actual ledger errors, and often irrecoverable
	if err != nil {
		logger.Debugf("Rolling back TX batch with timestamp: %v", timestamp)
		i.stack.RollbackTxBatch(timestamp)
		return fmt.Errorf("Fail to execute transactions: %v", err)
	}

Otherwise the batch is committed:
i.stack.CommitTxBatch(timestamp, nil)
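
The begin, execute, then rollback-or-commit sequence can be summarized in a small sketch. ledgerStack is a hypothetical in-memory stand-in for the consensus stack; the real methods also take a batch id and return more values.

```go
package main

import (
	"errors"
	"fmt"
)

// ledgerStack is a hypothetical stand-in for the consensus stack.
type ledgerStack struct {
	pending   []string // executed but not yet committed
	committed []string
	failExec  bool // simulates a ledger-level failure
}

func (s *ledgerStack) beginTxBatch()    { s.pending = nil }
func (s *ledgerStack) rollbackTxBatch() { s.pending = nil }

func (s *ledgerStack) commitTxBatch() {
	s.committed = append(s.committed, s.pending...)
	s.pending = nil
}

func (s *ledgerStack) execTxs(txs []string) error {
	if s.failExec {
		return errors.New("ledger error")
	}
	s.pending = append(s.pending, txs...)
	return nil
}

// processTransactions runs a batch and either commits it or rolls it back.
func processTransactions(s *ledgerStack, txs []string) error {
	s.beginTxBatch()
	if err := s.execTxs(txs); err != nil {
		s.rollbackTxBatch()
		return fmt.Errorf("fail to execute transactions: %v", err)
	}
	s.commitTxBatch()
	return nil
}

func main() {
	s := &ledgerStack{}
	fmt.Println(processTransactions(s, []string{"mycc"}), s.committed)
}
```

Only a ledger-level error triggers the rollback here; per-transaction failures are reported separately and do not abort the batch, as the source comment quoted above points out.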

Continuing with the execution of the transactions:
/consensus/helper/helper.go

  • i.stack.ExecTxs(timestamp, txarr)

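It calls ExecuteTransactions(ctxt context.Context, cname ChainName, xacts []*pb.Transaction) to execute the batch,
appends the successful transactions to h.curBatch and obtains the candidate global state hash.
The per-transaction errors are copied into txresults := make([]*pb.TransactionResult, len(txerrs)), and res (the hash) is returned: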

func (h *Helper) ExecTxs(id interface{}, txs []*pb.Transaction) ([]byte, error) {
	// TODO id is currently ignored, fix once the underlying implementation accepts id

	// The secHelper is set during creat ChaincodeSupport, so we don't need this step
	// cxt := context.WithValue(context.Background(), "security", h.coordinator.GetSecHelper())
	// TODO return directly once underlying implementation no longer returns []error

	succeededTxs, res, ccevents, txerrs, err := chaincode.ExecuteTransactions(context.Background(), chaincode.DefaultChain, txs)

	h.curBatch = append(h.curBatch, succeededTxs...) // TODO, remove after issue 579

	//copy errs to result
	txresults := make([]*pb.TransactionResult, len(txerrs))

	//process errors for each transaction
	for i, e := range txerrs {
		//NOTE- it'll be nice if we can have error values. For now success == 0, error == 1
		if txerrs[i] != nil {
			txresults[i] = &pb.TransactionResult{Txid: txs[i].Txid, Error: e.Error(), ErrorCode: 1, ChaincodeEvent: ccevents[i]}
		} else {
			txresults[i] = &pb.TransactionResult{Txid: txs[i].Txid, ChaincodeEvent: ccevents[i]}
		}
	}
	h.curBatchErrs = append(h.curBatchErrs, txresults...) // TODO, remove after issue 579

	return res, err
}

Continuing with the execution:
core/chaincode/exectransaction.go

  • ExecuteTransactions(ctxt context.Context, cname ChainName, xacts
    []*pb.Transaction)

ExecuteTransactions - will execute transactions on the array one by one will return an array of errors one for each transaction. If the execution succeeded, array element will be nil. returns []byte of state hash or error

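First the chain is looked up by name (currently default).
The transactions are then iterated in order, and Execute(ctxt, chain, t) is called for each one.
Here:
ctxt: the context (intended to carry the security context)
chain: the chain, i.e. the *ChaincodeSupport returned by GetChain
t: a single transaction
If execution succeeds, the transaction is appended to succeededTxs.
Otherwise:

//TODO
sendTxRejectedEvent(xacts[i], txerrs[i].Error())

Then the *ledger.Ledger is obtained and lgr.GetTempStateHash() is called to get the hash.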

//ExecuteTransactions - will execute transactions on the array one by one
//will return an array of errors one for each transaction. If the execution
//succeeded, array element will be nil. returns []byte of state hash or
//error
func ExecuteTransactions(ctxt context.Context, cname ChainName, xacts []*pb.Transaction) (succeededTXs []*pb.Transaction, stateHash []byte, ccevents []*pb.ChaincodeEvent, txerrs []error, err error) {
	var chain = GetChain(cname)
	if chain == nil {
		// TODO: We should never get here, but otherwise a good reminder to better handle
		panic(fmt.Sprintf("[ExecuteTransactions]Chain %s not found\n", cname))
	}

	txerrs = make([]error, len(xacts))
	ccevents = make([]*pb.ChaincodeEvent, len(xacts))
	var succeededTxs = make([]*pb.Transaction, 0)
	for i, t := range xacts {
		_, ccevents[i], txerrs[i] = Execute(ctxt, chain, t)
		if txerrs[i] == nil {
			succeededTxs = append(succeededTxs, t)
		} else {
			sendTxRejectedEvent(xacts[i], txerrs[i].Error())
		}
	}

	var lgr *ledger.Ledger
	lgr, err = ledger.GetLedger()
	if err == nil {
		stateHash, err = lgr.GetTempStateHash()
	}

	return succeededTxs, stateHash, ccevents, txerrs, err
}

The ledger calls ledger.state.GetHash()

  • (state *State) GetHash() ([]byte, error)

GetHash computes new state hash if the stateDelta is to be applied.
Recomputes only if stateDelta has changed after most recent call to this function

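It first checks whether the delta has changed, then calls state.stateImpl.PrepareWorkingSet(state.stateDelta),
and finally calls state.stateImpl.ComputeCryptoHash() to compute the state's crypto hash.

state: to be traced further...... TODO

Back to chaincode execution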
core/chaincode/exectransaction.go

  • Execute(ctxt context.Context, chain *ChaincodeSupport, t
    *pb.Transaction)

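Execution enters the if t.Type == pb.Transaction_CHAINCODE_DEPLOY branch
and calls (chaincodeSupport *ChaincodeSupport) Deploy(context context.Context, t *pb.Transaction). The launch that follows is wrapped in a begin marker and a finish marker, which makes it easy to compute the state delta: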

if t.Type == pb.Transaction_CHAINCODE_DEPLOY {
		_, err := chain.Deploy(ctxt, t)
		if err != nil {
			return nil, nil, fmt.Errorf("Failed to deploy chaincode spec(%s)", err)
		}

		//launch and wait for ready
		markTxBegin(ledger, t)
		_, _, err = chain.Launch(ctxt, t)
		if err != nil {
			markTxFinish(ledger, t, false)
			return nil, nil, fmt.Errorf("%s", err)
		}
		markTxFinish(ledger, t, true)
	}
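
What the begin and finish markers buy can be shown with a toy model. The sketch below assumes that a finish marker with success merges the transaction's writes into the batch delta and that a finish marker without success discards them; the state type and its methods are hypothetical and much simpler than the ledger's state.

```go
package main

import "fmt"

// state is a hypothetical, simplified model of per-transaction delta tracking.
type state struct {
	batchDelta map[string]string // changes accepted for the current batch
	txDelta    map[string]string // changes made by the transaction in flight
}

func newState() *state {
	return &state{batchDelta: map[string]string{}}
}

// txBegin starts collecting changes for one transaction.
func (s *state) txBegin() { s.txDelta = map[string]string{} }

// set records a write made while the transaction is running.
func (s *state) set(key, value string) { s.txDelta[key] = value }

// txFinish merges the transaction's changes on success and drops them otherwise.
func (s *state) txFinish(success bool) {
	if success {
		for k, v := range s.txDelta {
			s.batchDelta[k] = v
		}
	}
	s.txDelta = nil
}

func main() {
	s := newState()
	s.txBegin()
	s.set("a", "100")
	s.txFinish(true) // kept
	s.txBegin()
	s.set("b", "200")
	s.txFinish(false) // discarded
	fmt.Println(len(s.batchDelta), s.batchDelta["a"]) // prints "1 100"
}
```

Keeping a separate per-transaction delta is what lets a failed launch leave the batch's accumulated state untouched.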

Following the call further:
core/chaincode/chaincode_support.go

  • (chaincodeSupport *ChaincodeSupport) Deploy(context context.Context, t *pb.Transaction) (*pb.ChaincodeDeploymentSpec, error)

Deploy deploys the chaincode if not in development mode where user is running the chaincode.

It deserializes the payload into cds := &pb.ChaincodeDeploymentSpec{}
If the chaincodeSupport.userRunsCC switch is on (it is here), it returns nil right away.....

After it returns, markTxBegin(ledger, t) marks the start of a new transaction,
and the chaincode is launched with (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, t *pb.Transaction):

  • (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, t *pb.Transaction) (*pb.ChaincodeID, *pb.ChaincodeInput, error)

    Launch will launch the chaincode if not running (if running return nil) and will wait for handler of the chaincode to get into FSM ready state.

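It first deserializes the payload and checks whether this chaincode is already running,
then checks whether the chaincode has registered.
It calls chaincodeSupport.sendInitOrReady(context, t.Txid, chaincode, initargs, chaincodeSupport.ccStartupTimeout, t, depTx):
  this again checks whether the chaincode has been started,
  then calls chrte.handler.initOrReady(txid, initArgs, tx, depTx) to start the chaincode (in dev mode the chaincode process was already started by the user, so no container is launched here)
  If startup reports an error or waiting for the response times out, an error is returned; otherwise nil

The end of the transaction is marked.
Back in (i *Noops) processTransactions(): when the transactions have succeeded, it tries to commit them
by calling i.stack.CommitTxBatch(timestamp, nil)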

/consensus/helper/helper.go

  • (h *Helper) CommitTxBatch(id interface{}, metadata []byte)
    (*pb.Block, error)

CommitTxBatch gets invoked when the current transaction-batch needs to be committed. This function returns successfully iff the
transactions details and state changes (that may have happened
during execution of this transaction-batch) have been committed to permanent storage.

It first obtains the ledger, then calls ledger.CommitTxBatch(id, h.curBatch, h.curBatchErrs, metadata)

core/ledger/ledger.go

  • CommitTxBatch(id interface{}, transactions []*protos.Transaction,
    transactionResults []*protos.TransactionResult, metadata []byte)
    error

    CommitTxBatch - gets invoked when the current transaction-batch needs to be committed
    This function returns successfully iff the transactions details and state changes (that
    may have happened during execution of this transaction-batch) have been committed to permanent storage

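First the state hash is obtained.
Then a database write batch is created. The metadata is nil here because noops calls i.stack.CommitTxBatch(timestamp, nil); the source describes the field as: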

consensusMetadata - Consensus modules may optionally store any)

Then the chaincode events (ccEvents) are built:
> We need the index so we can map the chaincode event to the transaction that generated it.
Hence need an entry for cc event even if one
wasn't generated for the transaction. We cannot
use a nil cc event as protobuf does not like
elements of a repeated array to be nil.

> We should discard empty events without chaincode
 ID when sending out events.

ccEvents is stored in block.NonHashData.
Then addPersistenceChangesForNewBlock(ctx context.Context, block *protos.Block, stateHash []byte, writeBatch *gorocksdb.WriteBatch) is called

core/ledger/blockchain.go

  • addPersistenceChangesForNewBlock(ctx context.Context, block
    *protos.Block, stateHash []byte, writeBatch *gorocksdb.WriteBatch) (uint64, error)

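  The block is linked to the chain (previousBlockHash is set),
  then a timestamp is added to block.NonHashData.
  The current block number is set with blockNumber := blockchain.size.
  Two key-value pairs are written to the DB column family:
  1. (block number, block contents)
  2. (blockCountKey (a constant), block number + 1)
  Then the indexes are created with blockchain.indexer.createIndexes(block, blockNumber, blockHash, writeBatch):
    first openchainDB.IndexesCF is obtained,
    then (blockhash, blocknumber) is written.
    For each transaction, TxID -> (blockNumber + indexOfBlockTx) is written.
    Finally (encodeAddressBlockNumCompositeKey(address, blockNumber), encodeListTxIndexes(txsIndexes)) is written (not implemented)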
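
The linking and indexing steps can be pictured with an in-memory model. Everything here is hypothetical: block and chain stand in for protos.Block and the RocksDB-backed blockchain, maps replace the column families, and SHA-256 is used only for illustration and is not claimed to be the ledger's hash function.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// block is a hypothetical, simplified stand-in for protos.Block.
type block struct {
	PreviousBlockHash []byte
	TxIDs             []string
}

// hash uses SHA-256 purely for illustration.
func (b *block) hash() []byte {
	h := sha256.New()
	h.Write(b.PreviousBlockHash)
	for _, id := range b.TxIDs {
		h.Write([]byte(id))
	}
	return h.Sum(nil)
}

// chain keeps the blocks and the two indexes in memory.
type chain struct {
	blocks       []*block
	lastHash     []byte
	numberByHash map[string]uint64
	locByTxID    map[string][2]uint64 // txid -> (block number, index in block)
}

func newChain() *chain {
	return &chain{numberByHash: map[string]uint64{}, locByTxID: map[string][2]uint64{}}
}

// addBlock links the block to its predecessor, stores it and indexes it.
func (c *chain) addBlock(b *block) uint64 {
	b.PreviousBlockHash = c.lastHash
	number := uint64(len(c.blocks)) // block number = current chain size
	c.blocks = append(c.blocks, b)
	c.lastHash = b.hash()
	c.numberByHash[string(c.lastHash)] = number
	for i, id := range b.TxIDs {
		c.locByTxID[id] = [2]uint64{number, uint64(i)}
	}
	return number
}

func main() {
	c := newChain()
	c.addBlock(&block{TxIDs: []string{"genesis"}})
	n := c.addBlock(&block{TxIDs: []string{"mycc"}})
	fmt.Println(n, len(c.blocks), c.locByTxID["mycc"]) // prints "1 2 [1 0]"
}
```

Using the chain size as the next block number is why the stored block count is always the last block number plus one.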
    
Next ledger.state.AddChangesForPersistence(newBlockNumber, writeBatch) is called:

AddChangesForPersistence state implementation to add all the key-value pair that it needs
to persist for committing the stateDelta (passed in PrepareWorkingSet method) to DB.
In addition to the information in the StateDelta, the implementation may also want to
persist intermediate results for faster crypto-hash computation

  The serialized delta serializedStateDelta is obtained and written to StateDeltaCF as (blockNumber, serializedStateDelta)
  

Control the number state deltas that are maintained. This takes additional
disk space, but allow the state to be rolled backwards and forwards
without the need to replay transactions.
deltaHistorySize: 500

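  There is a blockNumber >= state.historyStateDeltaSize check here. deltaHistorySize=500 means that at most 500 deltas are kept; once that limit is reached,
  the block number whose delta has to be deleted is computed, much like a ring buffer....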
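
A minimal sketch of such a sliding window follows. It assumes the delta to delete is the one at blockNumber minus the history size, which matches the ring-buffer description but is not quoted from the source; deltaStore is a hypothetical in-memory stand-in for the state-delta column family.

```go
package main

import "fmt"

// deltaStore is a hypothetical in-memory stand-in for StateDeltaCF.
type deltaStore struct {
	historySize uint64
	deltas      map[uint64]string // block number -> serialized state delta
}

// add stores the delta for blockNumber and evicts the one that has
// fallen out of the history window.
func (s *deltaStore) add(blockNumber uint64, delta string) {
	s.deltas[blockNumber] = delta
	if blockNumber >= s.historySize {
		delete(s.deltas, blockNumber-s.historySize)
	}
}

func main() {
	s := &deltaStore{historySize: 3, deltas: map[uint64]string{}}
	for n := uint64(0); n < 5; n++ {
		s.add(n, fmt.Sprintf("delta-%d", n))
	}
	_, has1 := s.deltas[1]
	_, has2 := s.deltas[2]
	fmt.Println(len(s.deltas), has1, has2) // prints "3 false true"
}
```

With a history size of 3, blocks 0 and 1 are evicted once blocks 3 and 4 arrive, so only the last three deltas remain available for rolling the state back or forward.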

Next sendProducerBlockEvent(block *protos.Block) is called:
> Remove payload from deploy transactions. This is done to make block
events more lightweight as the payload for these types of transactions
can be very large.

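  First the CodePackage of the deploy transaction's ChaincodeDeploymentSpec is removed,
  then producer.Send(producer.CreateBlockEvent(block)) is called

After a successful commit, the blockchain size is read (indexes start at 0),
then the last block is fetched and returned

Next data, delta, err = i.getBlockData() is called:
  it fetches the last block as data (so the block is fetched a second time here)
  and the last delta

A goroutine is started for i.notifyBlockAdded(data, delta):
  it iterates over block.Transactions and drops the Payload to reduce the size
  Broadcasting Message_SYNC_BLOCK_ADDED to non-validators
  it wraps the block and the delta in a BlockState and sends it to the NVPs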
  

Broadcast SYNC_BLOCK_ADDED to connected NVPs
VPs already know about this newly added block since they participate
in the execution. That is, they can compare their current block with
the network block

The message is built and broadcast with i.stack.Broadcast(msg, pb.PeerEndpoint_NON_VALIDATOR)
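
Dropping the payloads before the block is sent to non-validators can be sketched as below. The transaction and block types are hypothetical stand-ins, and the sketch builds a copy instead of clearing the payloads in place, which is a choice made here to keep the original block intact.

```go
package main

import "fmt"

// transaction and block are hypothetical, simplified stand-ins for the
// protobuf types.
type transaction struct {
	Txid    string
	Payload []byte
}

type block struct {
	Transactions []*transaction
}

// stripPayloads returns a copy of the block whose transactions carry no
// payload; the input block is left untouched.
func stripPayloads(b *block) *block {
	out := &block{}
	for _, tx := range b.Transactions {
		out.Transactions = append(out.Transactions, &transaction{Txid: tx.Txid})
	}
	return out
}

func main() {
	b := &block{Transactions: []*transaction{{Txid: "mycc", Payload: []byte("large deployment spec")}}}
	light := stripPayloads(b)
	fmt.Println(light.Transactions[0].Txid, len(light.Transactions[0].Payload))
}
```

The transaction IDs survive, so a receiver can still tell which transactions a block contains even though it cannot replay them from the message.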

Original post: https://www.cnblogs.com/xiaodeshan/p/7813130.html