Skip to content

refactor: cleanup scroll_worker.go #823

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 9 additions & 78 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ const (

// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
chainHeadChanSize = 10

// chainSideChanSize is the size of channel listening to ChainSideEvent.
chainSideChanSize = 10

// minRecommitInterval is the minimal time interval to recreate the mining block with
// any newly arrived transactions.
minRecommitInterval = 1 * time.Second
)

var (
Expand All @@ -77,12 +70,6 @@ var (
commitGasCounter = metrics.NewRegisteredCounter("miner/commit_gas", nil)
)

// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
type newWorkReq struct {
noempty bool
timestamp int64
}

// prioritizedTransaction represents a single transaction that
// should be processed as the first transaction in the next block.
type prioritizedTransaction struct {
Expand All @@ -108,13 +95,10 @@ type worker struct {
txsSub event.Subscription
chainHeadCh chan core.ChainHeadEvent
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription

// Channels
newWorkCh chan *newWorkReq
startCh chan struct{}
exitCh chan struct{}
startCh chan struct{}
exitCh chan struct{}

wg sync.WaitGroup

Expand All @@ -131,9 +115,8 @@ type worker struct {
snapshotState *state.StateDB

// atomic status counters
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.
newL1Msgs int32 // New arrival L1 message count since last sealing work submitting.
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.

// noempty is the flag used to control whether the feature of pre-seal empty
// block is enabled. The default value is false(pre-seal is enabled by default).
Expand Down Expand Up @@ -163,8 +146,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
isLocalBlock: isLocalBlock,
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
exitCh: make(chan struct{}),
startCh: make(chan struct{}, 1),
circuitCapacityChecker: circuitcapacitychecker.NewCircuitCapacityChecker(true),
Expand All @@ -176,24 +157,15 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus

// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)

// Sanitize recommit interval if the user-specified one is too short.
recommit := worker.config.Recommit
if recommit < minRecommitInterval {
log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
recommit = minRecommitInterval
}

// Sanitize account fetch limit.
if worker.config.MaxAccountsNum == 0 {
log.Warn("Sanitizing miner account fetch limit", "provided", worker.config.MaxAccountsNum, "updated", math.MaxInt)
worker.config.MaxAccountsNum = math.MaxInt
}

worker.wg.Add(2)
worker.wg.Add(1)
go worker.mainLoop()
go worker.newWorkLoop(recommit)

// Submit first work to initialize pending state.
if init {
Expand Down Expand Up @@ -289,44 +261,11 @@ func (w *worker) close() {
w.wg.Wait()
}

// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
func (w *worker) newWorkLoop(recommit time.Duration) {
defer w.wg.Done()
var (
timestamp int64 // timestamp for each round of mining.
)

// commit aborts in-flight transaction execution with given signal and resubmits a new one.
commit := func(noempty bool) {
select {
case w.newWorkCh <- &newWorkReq{noempty: noempty, timestamp: timestamp}:
case <-w.exitCh:
return
}
atomic.StoreInt32(&w.newTxs, 0)
atomic.StoreInt32(&w.newL1Msgs, 0)
}

for {
select {
case <-w.startCh:
timestamp = time.Now().Unix()
commit(false)
case <-w.chainHeadCh:
timestamp = time.Now().Unix()
commit(true)
case <-w.exitCh:
return
}
}
}

// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
defer w.wg.Done()
defer w.txsSub.Unsubscribe()
defer w.chainHeadSub.Unsubscribe()
defer w.chainSideSub.Unsubscribe()

deadCh := make(chan *pipeline.Result)
pipelineResultCh := func() <-chan *pipeline.Result {
Expand All @@ -338,8 +277,10 @@ func (w *worker) mainLoop() {

for {
select {
case req := <-w.newWorkCh:
w.startNewPipeline(req.timestamp)
case <-w.startCh:
w.startNewPipeline(time.Now().Unix())
case <-w.chainHeadCh:
w.startNewPipeline(time.Now().Unix())
case result := <-pipelineResultCh():
w.handlePipelineResult(result)
case ev := <-w.txsCh:
Expand Down Expand Up @@ -369,8 +310,6 @@ func (w *worker) mainLoop() {
return
case <-w.chainHeadSub.Err():
return
case <-w.chainSideSub.Err():
return
}
}
}
Expand Down Expand Up @@ -798,14 +737,6 @@ func copyReceipts(receipts []*types.Receipt) []*types.Receipt {
return result
}

// postSideBlock fires a side chain event, only use it for testing.
func (w *worker) postSideBlock(event core.ChainSideEvent) {
select {
case w.chainSideCh <- event:
case <-w.exitCh:
}
}

func (w *worker) onTxFailingInPipeline(txIndex int, tx *types.Transaction, err error) bool {
if !w.isRunning() {
return false
Expand Down
2 changes: 0 additions & 2 deletions miner/scroll_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,6 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool) {
for i := 0; i < 5; i++ {
b.txPool.AddLocal(b.newRandomTx(true))
b.txPool.AddLocal(b.newRandomTx(false))
w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()})
w.postSideBlock(core.ChainSideEvent{Block: b.newRandomUncle()})

select {
case ev := <-sub.Chan():
Expand Down
Loading