以太坊源码解读(11)广播和同步 同步模块

以太坊网络中不断重复着广播和同步,这样才能保证对以太坊全网的规范链的维护和统一。

一、广播

【广播】主要是广播区块、区块hash和交易,分别通过ProtocolManager.BroadcastBlock和ProtocolManager.BroadcastTxs两个方法执行。广播有下面几种情形:

  • 1、minedBroadcastLoop()监听到新区块事件后,把新区块和区块hash分别广播出去;
  • 2、从远程节点同步完成后,将CurrentBlock广播出去,此时广播的是区块hash;
  • 3、txBlockcastLoop()监听到区块池的新增交易事件时会广播交易;

ProtocolManager.BroadcastBlock

  • 1、筛选p2p节点中不含当前区块的节点;
  • 2、如果propagate为true,将区块block和总难度td发送给一部分节点,节点数为根号n;
  • 3、如果propagate为false,将区块的hash发送给所有的节点。
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
	hash := block.Hash()
	peers := pm.peers.PeersWithoutBlock(hash)

	// If propagation is requested, send to a subset of the peer
	if propagate {
		// Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
		var td *big.Int
		if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
			td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
		} else {
			log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
			return
		}
		// Send the block to a subset of our peers
		transfer := peers[:int(math.Sqrt(float64(len(peers))))]
		for _, peer := range transfer {
			peer.AsyncSendNewBlock(block, td)
		}
		log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
		return
	}
	// Otherwise if the block is indeed in out own chain, announce it
	if pm.blockchain.HasBlock(hash, block.NumberU64()) {
		for _, peer := range peers {
			peer.AsyncSendNewBlockHash(block)
		}
		log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
	}
}

BroadcastBlock的调用时机:

1、挖矿成功:

Ethereum.Start() —> ProtocolManager.Start()
                 —> ProtocolManager.minedBroadcastLoop
                 —> ProtocolManager.BroadcastBlock 

2、同步新区块

ProtocolManager.synchronise —> ProtocolManager.BroadcastBlock

ProtocolManager.BroadcastTxs

func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
	var txset = make(map[*peer]types.Transactions)

	// Broadcast transactions to a batch of peers not knowing about it
	for _, tx := range txs {
		peers := pm.peers.PeersWithoutTx(tx.Hash())
		for _, peer := range peers {
			txset[peer] = append(txset[peer], tx)
		}
		log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
	}
	// FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
	for peer, txs := range txset {
		peer.AsyncSendTransactions(txs)
	}
}

该方法将针对每一个交易,准确的将它发送到所有没有该交易的节点。

BroadcastTxs的调用时机:接收到交易池添加新交易的事件

Ethereum.Start() —> ProtocolManager.Start
                 —> ProtocolManager.txBroadcastLoop
                 —> ProtocolManager.BraodcastTxs

二、同步

ProtocolManager.synchronise是负责同步的函数:

  • 1、确保当前区块的高度小于拥有的最高区块高度的p2p节点的高度;
  • 2、获取同步模式:fastSync或fullSync;
  • 3、使用该同步模式进行一次同步;
  • 4、同步完成后打开交易处理阀门atomic.StoreUint32(&pm.acceptTxs, 1),允许以太坊节点接受其他节点广播的交易;
  • 5、将CurrentBlock广播出去。
// synchronise tries to sync up our local block chain with a remote peer.
func (pm *ProtocolManager) synchronise(peer *peer) {
	// Short circuit if no peers are available
	if peer == nil {
		return
	}
	// Make sure the peer's TD is higher than our own
	currentBlock := pm.blockchain.CurrentBlock()
	td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())

	pHead, pTd := peer.Head()
	if pTd.Cmp(td) <= 0 {
		return
	}
	// Otherwise try to sync with the downloader
	mode := downloader.FullSync
	if atomic.LoadUint32(&pm.fastSync) == 1 {
		// Fast sync was explicitly requested, and explicitly granted
		mode = downloader.FastSync
	} else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 {
		// The database seems empty as the current block is the genesis. Yet the fast
		// block is ahead, so fast sync was enabled for this node at a certain point.
		// The only scenario where this can happen is if the user manually (or via a
		// bad block) rolled back a fast sync node below the sync point. In this case
		// however it's safe to reenable fast sync.
		atomic.StoreUint32(&pm.fastSync, 1)
		mode = downloader.FastSync
	}

	if mode == downloader.FastSync {
		// Make sure the peer's total difficulty we are synchronizing is higher.
		if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 {
			return
		}
	}

	// Run the sync cycle, and disable fast sync if we've went past the pivot block
	if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
		return
	}
	if atomic.LoadUint32(&pm.fastSync) == 1 {
		log.Info("Fast sync complete, auto disabling")
		atomic.StoreUint32(&pm.fastSync, 0)
	}
	atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
	if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
		// We've completed a sync cycle, notify all peers of new state. This path is
		// essential in star-topology networks where a gateway node needs to notify
		// all its out-of-date peers of the availability of a new block. This failure
		// scenario will most often crop up in private and hackathon networks with
		// degenerate connectivity, but it should be healthy for the mainnet too to
		// more reliably update peers or the local TD state.
		go pm.BroadcastBlock(head, false)
	}
}

ProtocolManger负责区块链的同步,其中有两个工具用于同步区块链:【Downloader】【Fetcher】

Fetcher:简单来说,fetcher模块收集其他Peer通知它的信息:NewBlockMsg或NewBlockHashMsg。根据通知的消息,获取完整的区块,然后传递给eth模块把区块插入区块链。如果是完整区块,就可以传递给eth插入区块,如果只有区块Hash,则需要从其他的Peer获取此完整的区块,然后再传递给eth插入区块。

Downloader:负责区块链同步的主要工作,在最开始的同步中从远端节点下载区块链信息,上面的ProtocolManager.synchronise就是用downloader从其他节点下载区块的。

Downloader同步有两种模式,分别是fastSync和fullSync模式。

fastSync模式:是一种不需要执行交易,而是直接下载状态和收据的同步方式,用于使本地链快速的跟上规范链的更新速度。

它的总体逻辑是:

  • 1、从k桶中拥有最高td的节点,同步所有区块头;
  • 2、同步了区块头后,从其他节点同步状态,收据和交易;
  • 3、到达(距离最高区块高度64)时,关闭fastSync,采用fullSync模式;
  • 4、同步完成后,广播自己的头区块。

fullSync模式:验证所有的交易并执行交易,在本地生成状态和收据。这种同步方式缓慢且消耗CPU和磁盘。

Fetcher是基于广播的同步工具,所以Fetcher必然是通过protocolManager的handleMsg()来调用的。下面就是handleMsg中收到NewBlockHashesM ...