以太坊源码分析 插入新区块流程 insertChain

BlockChain 模块实现了插入一个新区块的流程。

1. 新区块来源

新区块有两种来源:

  • 本节点挖矿成功,要调用BlockChain模块向本地区块链上插入
  • 从网络上的其他节点收到一个区块,调用BlockChain模块插入本地区块链。

2. insertChain 函数

将一个区块插入区块链是调用 BlockChain 的 insertChain 函数。insertChain 函数功能:

  • 验证每一个区块的 header;
  • 验证每一个区块的 body;
  • 处理验证 header和 body所产生的错误;
  • 若是验证经过,对待插入区块的交易状态进行验证,不然退出;
  • 若是验证经过,调用 WriteBlockWithState 将第 n 个区块插入区块链,写入数据库,而后写入规范链,同时处理分叉问题。

inertChain 函数是将一组区块批量插入区块链,inesrtChain 函数会检查这一组区块是否是首尾相接。检查无误后会校验区块头和区块体,然后还需要校验状态是不是正确。最后将区块插入区块链,需要注意的是能插入区块链不一定能插入规范链,在插入的时候会判断是否能插入规范链,如果不能插入规范链就是一条分叉。

func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*types.Log, error) {
 // 如果传入的区块切片上长度为空,直接返回
if len(chain) == 0 {
     return 0, nil, nil, nil
}
//1. 确保这组区块的是首尾相接的,并且区块号连续递增,如果不是则直接返回
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
     if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() {
         // Chain broke ancestry, log a message (programming error) and skip insertion
         log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(),
             "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash())
         return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x…], item %d is #%d [%x…] (parent [%x…])", i-1, chain[i-1].NumberU64(),
             chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4])
     }
}
// Pre-checks passed, start the full block imports
bc.wg.Add(1)
defer bc.wg.Done()
 
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
 
// A queued approach to delivering events. This is generally
// faster than direct delivery and requires much less mutex
// acquiring.
var (
     stats         = insertStats{startTime: mclock.Now()}
     events        = make([]interface{}, 0, len(chain))
     lastCanon     *types.Block
     coalescedLogs []*types.Log
)
// Start the parallel header verifier
headers := make([]*types.Header, len(chain))
seals := make([]bool, len(chain))
//2. 并行验证这组区块的区块头
for i, block := range chain {
     headers[i] = block.Header()
     seals[i] = true
}
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort)
 
// Start a parallel signature recovery
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
 
// Iterate over the blocks and insert when the verifier permits
//3. 验证这组区块的区块体
for i, block := range chain {
     // If the chain is terminating, stop processing blocks
     // 如果有人打断验证直接返回
     if atomic.LoadInt32(&bc.procInterrupt) == 1 {
         log.Debug("Premature abort during blocks processing")
         break
     }
     // 如果这个区块在bad区块列表里,说明这个区块不能插入区块链,直接返回
     // If the header is a banned one, straight out abort
     if BadHashes[block.Hash()] {
         bc.reportBlock(block, nil, ErrBlacklistedHash)
         return i, events, coalescedLogs, ErrBlacklistedHash
     }
     // Wait for the block's verification to complete
     bstart := time.Now()
     //4. 接收区块头的验证结果 
     err := <-results
     if err == nil {
         // 如果区块头没有问题,则验证区块体
         err = bc.Validator().ValidateBody(block)
     }
     //5. 处理区块头或区块体的验证错误
     switch {
     case err == ErrKnownBlock:
         // Block and state both already known. However if the current block is below
         // this number we did a rollback and we should reimport it nonetheless.
         //6. 待插入的区块在数据库中已经存在,如果当前的区块链的头区块高度比待插入的区块大,则直接忽略这个区块,否则继续向下执行插入流程
        if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
             stats.ignored++
             continue
         }
 
     case err == consensus.ErrFutureBlock:
         //7. 如果待插入区块是一个未来区块(大于当前时间15秒),则判断是否是小于30s,如果是则将区块放入futureBlocks列表
         // Allow up to MaxFuture second in the future blocks. If this limit is exceeded
         // the chain is discarded and processed at a later time if given.
         max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks)
         if block.Time().Cmp(max) > 0 {
             return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max)
         }
         bc.futureBlocks.Add(block.Hash(), block)
         stats.queued++
         continue
 
     case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()):
         //8. 数据库里面找不到这个区块的父亲区块,并且未来待处理区块缓冲里面有它的父区块,就将它放入到futureBlocks列表中
         bc.futureBlocks.Add(block.Hash(), block)
         stats.queued++
         continue
 
     case err == consensus.ErrPrunedAncestor:
         //9. 如果待插入区块的祖先是一个精简分支(所谓精简分支就是一条分叉,只有区块头和区块体,但是没有状态),看这个区块的总难度是否大于本地规范链头区块的总难度,如果大于,则将这条精简分支上的所有没有状态的区块重新做一次插入,插入的过程会产生状态,并将这条精简分支升级为规范链,否则如果不大于则将这个区块不带状态的情况下插入这条精简分支。
         // Block competing with the canonical chain, store in the db, but don't process
         // until the competitor TD goes above the canonical TD
         currentBlock := bc.CurrentBlock()
         localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
         externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
         if localTd.Cmp(externTd) > 0 {
             if err = bc.WriteBlockWithoutState(block, externTd); err != nil {
                 return i, events, coalescedLogs, err
             }
             continue
         }
         // Competitor chain beat canonical, gather all blocks from the common ancestor
         var winner []*types.Block
 
         parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
         for !bc.HasState(parent.Root()) {
             winner = append(winner, parent)
             parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
         }
         // 将精简分支上的区块收集好后,做一个倒序,因为插入的时候要安从小到大的顺序插入
         for j := 0; j < len(winner)/2; j++ {
             winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
         }
         // Import all the pruned blocks to make the state available
         bc.chainmu.Unlock()
         _, evs, logs, err := bc.insertChain(winner)
         bc.chainmu.Lock()
         events, coalescedLogs = evs, logs
 
         if err != nil {
             return i, events, coalescedLogs, err
         }
 
     case err != nil:
         // 无法处理的错误,直接返回
         bc.reportBlock(block, nil, err)
         return i, events, coalescedLogs, err
     }
     //10. 验证区块的状态
     // Create a new statedb using the parent block and report an
     // error if it fails.
     var parent *types.Block
     if i == 0 {
         parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
     } else {
         parent = chain[i-1]
     }
     state, err := state.New(parent.Root(), bc.stateCache)
     if err != nil {
         return i, events, coalescedLogs, err
     }
     // Process block using the parent state as reference point.
     receipts, logs, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
     if err != nil {
         bc.reportBlock(block, receipts, err)
         return i, events, coalescedLogs, err
     }
     // Validate the state using the default validator
     err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
     if err != nil {
         bc.reportBlock(block, receipts, err)
         return i, events, coalescedLogs, err
     }
     proctime := time.Since(bstart)
     //11. 调用WriteBlockWithState将区块写入区块链,返回值如果是CanonStatTy,表示写入了规范链,如果是SideStatTy表示写入了分叉
     // Write the block to the chain and get the status.
     status, err := bc.WriteBlockWithState(block, receipts, state)
     if err != nil {
         return i, events, coalescedLogs, err
     }
     switch status {
     case CanonStatTy:
         log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
             "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
 
         coalescedLogs = append(coalescedLogs, logs...)
         blockInsertTimer.UpdateSince(bstart)
         events = append(events, ChainEvent{block, block.Hash(), logs})
         lastCanon = block
 
         // Only count canonical blocks for GC processing time
         bc.gcproc += proctime
 
     case SideStatTy:
         log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
             common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))
 
         blockInsertTimer.UpdateSince(bstart)
         events = append(events, ChainSideEvent{block})
     }
     stats.processed++
     stats.usedGas += usedGas
 
     cache, _ := bc.stateCache.TrieDB().Size()
     stats.report(chain, i, cache)
}
// Append a single chain head event if we've progressed the chain
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
     events = append(events, ChainHeadEvent{lastCanon})
}
// 返回插入过程中的事件,BlockChain会将这个事件post出去,让其他监听的模块可以获知这些事件
return 0, events, coalescedLogs, nil
}

上面代码的第2步,使用了bc.engine.VerifyHeaders函数去验证区块的区块头,这个函数传入了两个切片:

headers := make([]*types.Header, len(chain))
seals := make([]bool, len(chain))

这两个切片的长度是相同的,第一个切片传入的是这组区块的区块头 ,第二组切片指定要验证哪些区块头,如果某个区块要验证,则在seals切片上对应位置置为true。我们可以看到代码里面把所有位置都置为了true,表示要验证headers切片里的所有区块头。

for i, block := range chain {
     headers[i] = block.Header()
     seals[i] = true
}

另外bc.engine.VerifyHeaders是异步检查,调完之后会直接返回,继续往下执行。它会返回两个管道abort和result,第一个管道可以命令VerifyHeaders函数停止验证,第二个管道是可以从其中等待验证结果,VerifyHeaders函数会保证验证结果返回的顺序和我们传入的headers切片的顺序相同。所以第3步验证区块头的时候重新用for循环遍历chain切片时,for循环的第一次执行,result管道返回的必然是chain中第一个区块的区块头的验证结果。

第6步处理ErrFutureBlock错误返回值时,如果待插入的区块在数据库中已经存在,说明它是有可能是一个分叉,如果它的区块高度比当前规范链的头区块要大,那么就重新在插入一下,因为有可能这个区块的所在的分叉的总难度比当前规范链大,如果真是这样的话,需要把这条分叉升级为规范链,重新插入的过程会检查是否是规范链,如果是就会升级。

第7步,当收到一个区块它的时间戳大于当前时间15秒,小于30秒,节点不会将这个区块丢弃, 而是将这个区块放入到futureBlocks列表,我们在上一章节讲到NewBlockChain函数最后会启动一个go程定时来检查这些区块能不能插入到区块链,如果能插,就会再次调用insertChain来插入。

第 10步是验证区块的状态,验证区块的状态流程是基于父区块的世界状态去执行待插入区块的里的所有交易,生成新的世界状态,然后调用bc.Validator().ValidateState去验证新的状态是不是正确,其中重要的一个环节就是比较新生成的状态树树根和区块头中的状态树树根是否相同。

第11步调用WriteBlockWithState将区块写入区块链,这个函数会去判断这个区块写入的是一个分叉还是规范链,当然这个插入的区块有可能将一个原来的分叉升级为规范链,原来的规范链变成一条分叉。

3. 总结

本章节主要介绍insertChain的流程,从上面的分析可以看出insertChain函数里面主要是实现了对区块的校验,包括区块头和区块体,校验通过之后会调用WriteBlockWithState函数将区块写入区块链,而真正写入的过程在WriteBlockWithState中。下一章节我们分析WriteBlockWithState函数流程。

1. blockchain 关键元素db:持久化到底层数据储存,即 leveldb;genesisBlock:创始区块;currentBlock:当前区块,blockchain 中并不是储存链所有的bl ...