div#pop_ad { opacity: 0; }

type TxPoolConfig struct { Locals []common.Address // 本地账户地址存放 NoLocals bool // 是否开启本地交易机制 Journal string // 本地交易存放路径 Rejournal time.Duration // 持久化本地交易的间隔 PriceLimit uint64 // 价格超出比例,若想覆盖一笔交易的时候,若价格上涨比例达不到要求,那么不能覆盖 PriceBump uint64 // 替换现有交易的最低价格涨幅百分比(一次) AccountSlots uint64 // 每个账户的可执行交易限制 GlobalSlots uint64 // 全部账户最大可执行交易 AccountQueue uint64 // 单个账户不可执行的交易限制 GlobalQueue uint64 // 全部账户最大非执行交易限制 Lifetime time.Duration // 一个账户在queue中的交易可以存活的时间}type TxPool struct { config TxPoolConfig // 交易池配置 chainconfig *params.ChainConfig // 区块链配置 chain blockChain // 定义blockchain接口 gasPrice *big.Int txFeed event.Feed //时间流 scope event.SubscriptionScope // 订阅范围 signer types.Signer //签名 mu sync.RWMutex istanbul bool // Fork indicator whether we are in the istanbul stage. currentState *state.StateDB // 当前头区块对应的状态 pendingNonces *txNoncer // Pending state tracking virtual nonces currentMaxGas uint64 // Current gas limit for transaction caps locals *accountSet // Set of local transaction to exempt from eviction rules journal *txJournal // Journal of local transaction to back up to disk pending map[common.Address]*txList // All currently processable transactions queue map[common.Address]*txList // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account all *txLookup // All transactions to allow lookups priced *txPricedList // All transactions sorted by price chainHeadCh chan ChainHeadEvent chainHeadSub event.Subscription reqResetCh chan *txpoolResetRequest reqPromoteCh chan *accountSet queueTxEventCh chan *types.Transaction reorgDoneCh chan chan struct{} reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop wg sync.WaitGroup // tracks loop, scheduleReorgLoop}txpool初始化config = (&config).sanitize()
pool.locals = newAccountSet(pool.signer)
pool.locals.add(addr)
pool.reset(nil, chain.CurrentBlock().Header())
pool.priced = newTxPricedList(pool.all)
if !config.NoLocals && config.Journal != { pool.journal = newTxJournal(config.Journal) if err := pool.journal.load(pool.AddLocals); err != nil { log.Warn( Failed to load transaction journal , err , err) } if err := pool.journal.rotate(pool.local()); err != nil { log.Warn( Failed to rotate transaction journal , err , err) } }pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
go pool.loop()

if pool.all.Get(tx.Hash()) != nil { errs[i] = fmt.Errorf( known transaction: %x , tx.Hash()) knownTxMeter.Mark(1) continue }newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
进入到addTxsLocked函数中:replaced, err := pool.add(tx, local)
if pool.all.Get(hash) != nil { log.Trace( Discarding already known transaction , hash , hash) knownTxMeter.Mark(1) return false, fmt.Errorf( known transaction: %x , hash) }validateTx: 主要做了以下几件事- 交易大小不能超过32kb- 交易金额不能为负- 交易gas值不能超出当前交易池设定的gaslimit- 交易签名必须正确- 如果交易为远程交易,则需验证其gasprice是否小于交易池gasprice最小值,如果是本地,优先打包,不管gasprice- 判断当前交易nonce值是否过低- 交易所需花费的转帐手续费是否大于帐户余额 cost == V + GP * GL- 判断交易花费gas是否小于其预估花费gas
if uint64(pool.all.Count()) = pool.config.GlobalSlots+pool.config.GlobalQueue { if !local && pool.priced.Underpriced(tx, pool.locals) { ... } drop := pool.priced.Discard(pool.all.Count()-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals) for _, tx := range drop { ... pool.removeTx(tx.Hash(), false) } } if list := pool.pending[from]; list != nil && list.Overlaps(tx) { // Nonce already pending, check if required price bump is met inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { pendingDiscardMeter.Mark(1) return false, ErrReplaceUnderpriced } // New transaction is better, replace old one if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed(1) pendingReplaceMeter.Mark(1) } pool.all.Add(tx) pool.priced.Put(tx) pool.journalTx(from, tx) pool.queueTxEvent(tx) log.Trace( Pooled new executable transaction , hash , hash, from , from, to , tx.To()) return old != nil, nil } // New transaction isn t replacing a pending one, push into queue replaced, err = pool.enqueueTx(hash, tx)提升交易主要把交易从queue扔到pending中,我们在接下来的里面重点讲
done := pool.requestPromoteExecutables(dirtyAddrs)交易升级

forwards := list.Forward(pool.currentState.GetNonce(addr)) for _, tx := range forwards { hash := tx.Hash() pool.all.Remove(hash) log.Trace( Removed old queued transaction , hash , hash) }drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() pool.all.Remove(hash) log.Trace( Removed unpayable queued transaction , hash , hash) }readies := list.Ready(pool.pendingNonces.get(addr)) for _, tx := range readies { hash := tx.Hash() if pool.promoteTx(addr, hash, tx) { log.Trace( Promoting queued transaction , hash , hash) promoted = append(promoted, tx) } }inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { // An older transaction was better, discard this // 老的交易更好,删除这个交易 pool.all.Remove(hash) pool.priced.Removed(1) pendingDiscardMeter.Mark(1) return false } // Otherwise discard any previous transaction and mark this // 现在这个交易更好,删除旧的交易 if old != nil { pool.all.Remove(old.Hash()) pool.priced.Removed(1) pendingReplaceMeter.Mark(1) } else { ... }if !pool.locals.contains(addr) { caps = list.Cap(int(pool.config.AccountQueue)) for _, tx := range caps { hash := tx.Hash() pool.all.Remove(hash) ... }if list.Empty() { delete(pool.queue, addr) }for addr, list := range pool.pending { ...}olds := list.Forward(nonce) for _, tx := range olds { hash := tx.Hash() pool.all.Remove(hash) log.Trace( Removed old pending transaction , hash , hash) }drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() log.Trace( Removed unpayable pending transaction , hash , hash) pool.all.Remove(hash) } pool.priced.Removed(len(olds) + len(drops)) pendingNofundsMeter.Mark(int64(len(drops))) for _, tx := range invalids { hash := tx.Hash() log.Trace( Demoting pending transaction , hash , hash) pool.enqueueTx(hash, tx) }if list.Len() 0 && list.txs.Get(nonce) == nil { gapped := list.Cap(0) for _, tx := range gapped { hash := tx.Hash() log.Error( Demoting invalidated transaction , hash , hash) pool.enqueueTx(hash, tx) } pendingGauge.Dec(int64(len(gapped))) }
if oldHead != nil && oldHead.Hash() != newHead.ParentHash {}if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth 64 { log.Debug( Skipping deep transaction reorg , depth , depth)}for rem.NumberU64() add.NumberU64() { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error( Unrooted old chain seen by tx pool , block , oldHead.Number, hash , oldHead.Hash()) return } }for add.NumberU64() rem.NumberU64() { included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error( Unrooted new chain seen by tx pool , block , newHead.Number, hash , newHead.Hash()) return } }for rem.Hash() != add.Hash() { discarded = append(discarded, rem.Transactions()...) if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error( Unrooted old chain seen by tx pool , block , oldHead.Number, hash , oldHead.Hash()) return } included = append(included, add.Transactions()...) if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error( Unrooted new chain seen by tx pool , block , newHead.Number, hash , newHead.Hash()) return } } statedb, err := pool.chain.StateAt(newHead.Root) if err != nil { log.Error( Failed to reset txpool state , err , err) return } pool.currentState = statedb pool.pendingNonces = newTxNoncer(statedb) pool.currentMaxGas = newHead.GasLimitsenderCacher.recover(pool.signer, reinject)pool.addTxsLocked(reinject, false)
添加新手交流群:币种分析、每日早晚盘分析
添加助理微信,一对一亲自指导:YoYo8abc