Skip to content

Commit 18f26cf

Browse files
authored
Improve queue and logger context (#24924)
Before there was a "graceful function": RunWithShutdownFns, it's mainly for some modules which doesn't support context. The old queue system doesn't work well with context, so the old queues need it. After the queue refactoring, the new queue works with context well, so, use Golang context as much as possible, the `RunWithShutdownFns` could be removed (replaced by RunWithCancel for context cancel mechanism), the related code could be simplified. This PR also fixes some legacy queue-init problems, eg: * typo : archiver: "unable to create codes indexer queue" => "unable to create repo-archive queue" * no nil check for failed queues, which causes unfriendly panic After this PR, many goroutines could have better display name: ![image](https://github.com/go-gitea/gitea/assets/2114189/701b2a9b-8065-4137-aeaa-0bda2b34604a) ![image](https://github.com/go-gitea/gitea/assets/2114189/f1d5f50f-0534-40f0-b0be-f2c9daa5fe92)
1 parent e4922d4 commit 18f26cf

31 files changed

+204
-263
lines changed

modules/graceful/context.go

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,51 +5,8 @@ package graceful
55

66
import (
77
"context"
8-
"time"
98
)
109

11-
// ChannelContext is a context that wraps a channel and error as a context
12-
type ChannelContext struct {
13-
done <-chan struct{}
14-
err error
15-
}
16-
17-
// NewChannelContext creates a ChannelContext from a channel and error
18-
func NewChannelContext(done <-chan struct{}, err error) *ChannelContext {
19-
return &ChannelContext{
20-
done: done,
21-
err: err,
22-
}
23-
}
24-
25-
// Deadline returns the time when work done on behalf of this context
26-
// should be canceled. There is no Deadline for a ChannelContext
27-
func (ctx *ChannelContext) Deadline() (deadline time.Time, ok bool) {
28-
return deadline, ok
29-
}
30-
31-
// Done returns the channel provided at the creation of this context.
32-
// When closed, work done on behalf of this context should be canceled.
33-
func (ctx *ChannelContext) Done() <-chan struct{} {
34-
return ctx.done
35-
}
36-
37-
// Err returns nil, if Done is not closed. If Done is closed,
38-
// Err returns the error provided at the creation of this context
39-
func (ctx *ChannelContext) Err() error {
40-
select {
41-
case <-ctx.done:
42-
return ctx.err
43-
default:
44-
return nil
45-
}
46-
}
47-
48-
// Value returns nil for all calls as no values are or can be associated with this context
49-
func (ctx *ChannelContext) Value(key interface{}) interface{} {
50-
return nil
51-
}
52-
5310
// ShutdownContext returns a context.Context that is Done at shutdown
5411
// Callers using this context should ensure that they are registered as a running server
5512
// in order that they are waited for.

modules/graceful/manager.go

Lines changed: 11 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ const (
2323
stateTerminate
2424
)
2525

26+
type RunCanceler interface {
27+
Run()
28+
Cancel()
29+
}
30+
2631
// There are some places that could inherit sockets:
2732
//
2833
// * HTTP or HTTPS main listener
@@ -55,46 +60,19 @@ func InitManager(ctx context.Context) {
5560
})
5661
}
5762

58-
// WithCallback is a runnable to call when the caller has finished
59-
type WithCallback func(callback func())
60-
61-
// RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate
62-
// After the callback to atShutdown is called and is complete, the main function must return.
63-
// Similarly the callback function provided to atTerminate must return once termination is complete.
64-
// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
65-
// - users must therefore be careful to only call these as necessary.
66-
type RunnableWithShutdownFns func(atShutdown, atTerminate func(func()))
67-
68-
// RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks
69-
// After the callback to atShutdown is called and is complete, the main function must return.
70-
// Similarly the callback function provided to atTerminate must return once termination is complete.
71-
// Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals
72-
// - users must therefore be careful to only call these as necessary.
73-
func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) {
63+
// RunWithCancel helps to run a function with a custom context, the Cancel function will be called at shutdown
64+
// The Cancel function should stop the Run function in predictable time.
65+
func (g *Manager) RunWithCancel(rc RunCanceler) {
66+
g.RunAtShutdown(context.Background(), rc.Cancel)
7467
g.runningServerWaitGroup.Add(1)
7568
defer g.runningServerWaitGroup.Done()
7669
defer func() {
7770
if err := recover(); err != nil {
78-
log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
71+
log.Critical("PANIC during RunWithCancel: %v\nStacktrace: %s", err, log.Stack(2))
7972
g.doShutdown()
8073
}
8174
}()
82-
run(func(atShutdown func()) {
83-
g.lock.Lock()
84-
defer g.lock.Unlock()
85-
g.toRunAtShutdown = append(g.toRunAtShutdown,
86-
func() {
87-
defer func() {
88-
if err := recover(); err != nil {
89-
log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2))
90-
g.doShutdown()
91-
}
92-
}()
93-
atShutdown()
94-
})
95-
}, func(atTerminate func()) {
96-
g.RunAtTerminate(atTerminate)
97-
})
75+
rc.Run()
9876
}
9977

10078
// RunWithShutdownContext takes a function that has a context to watch for shutdown.
@@ -151,21 +129,6 @@ func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) {
151129
})
152130
}
153131

154-
// RunAtHammer creates a go-routine to run the provided function at shutdown
155-
func (g *Manager) RunAtHammer(hammer func()) {
156-
g.lock.Lock()
157-
defer g.lock.Unlock()
158-
g.toRunAtHammer = append(g.toRunAtHammer,
159-
func() {
160-
defer func() {
161-
if err := recover(); err != nil {
162-
log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2))
163-
}
164-
}()
165-
hammer()
166-
})
167-
}
168-
169132
func (g *Manager) doShutdown() {
170133
if !g.setStateTransition(stateRunning, stateShuttingDown) {
171134
g.DoImmediateHammer()
@@ -206,9 +169,6 @@ func (g *Manager) doHammerTime(d time.Duration) {
206169
g.hammerCtxCancel()
207170
atHammerCtx := pprof.WithLabels(g.terminateCtx, pprof.Labels("graceful-lifecycle", "post-hammer"))
208171
pprof.SetGoroutineLabels(atHammerCtx)
209-
for _, fn := range g.toRunAtHammer {
210-
go fn()
211-
}
212172
}
213173
g.lock.Unlock()
214174
}

modules/graceful/manager_unix.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ type Manager struct {
4141
terminateWaitGroup sync.WaitGroup
4242

4343
toRunAtShutdown []func()
44-
toRunAtHammer []func()
4544
toRunAtTerminate []func()
4645
}
4746

modules/graceful/manager_windows.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ type Manager struct {
5050
shutdownRequested chan struct{}
5151

5252
toRunAtShutdown []func()
53-
toRunAtHammer []func()
5453
toRunAtTerminate []func()
5554
}
5655

modules/indexer/code/indexer.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ func Init() {
166166
handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
167167
idx, err := indexer.get()
168168
if idx == nil || err != nil {
169-
log.Error("Codes indexer handler: unable to get indexer!")
169+
log.Warn("Codes indexer handler: indexer is not ready, retry later.")
170170
return items
171171
}
172172

@@ -201,7 +201,7 @@ func Init() {
201201
return unhandled
202202
}
203203

204-
indexerQueue = queue.CreateUniqueQueue("code_indexer", handler)
204+
indexerQueue = queue.CreateUniqueQueue(ctx, "code_indexer", handler)
205205
if indexerQueue == nil {
206206
log.Fatal("Unable to create codes indexer queue")
207207
}
@@ -259,7 +259,7 @@ func Init() {
259259
indexer.set(rIndexer)
260260

261261
// Start processing the queue
262-
go graceful.GetManager().RunWithShutdownFns(indexerQueue.Run)
262+
go graceful.GetManager().RunWithCancel(indexerQueue)
263263

264264
if populate {
265265
go graceful.GetManager().RunWithShutdownContext(populateRepoIndexer)

modules/indexer/issues/indexer.go

Lines changed: 32 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,15 @@ var (
102102
func InitIssueIndexer(syncReindex bool) {
103103
ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false)
104104

105-
waitChannel := make(chan time.Duration, 1)
105+
indexerInitWaitChannel := make(chan time.Duration, 1)
106106

107107
// Create the Queue
108108
switch setting.Indexer.IssueType {
109109
case "bleve", "elasticsearch", "meilisearch":
110110
handler := func(items ...*IndexerData) (unhandled []*IndexerData) {
111111
indexer := holder.get()
112112
if indexer == nil {
113-
log.Error("Issue indexer handler: unable to get indexer.")
113+
log.Warn("Issue indexer handler: indexer is not ready, retry later.")
114114
return items
115115
}
116116
toIndex := make([]*IndexerData, 0, len(items))
@@ -138,15 +138,17 @@ func InitIssueIndexer(syncReindex bool) {
138138
return unhandled
139139
}
140140

141-
issueIndexerQueue = queue.CreateSimpleQueue("issue_indexer", handler)
141+
issueIndexerQueue = queue.CreateSimpleQueue(ctx, "issue_indexer", handler)
142142

143143
if issueIndexerQueue == nil {
144144
log.Fatal("Unable to create issue indexer queue")
145145
}
146146
default:
147-
issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData]("issue_indexer", nil)
147+
issueIndexerQueue = queue.CreateSimpleQueue[*IndexerData](ctx, "issue_indexer", nil)
148148
}
149149

150+
graceful.GetManager().RunAtTerminate(finished)
151+
150152
// Create the Indexer
151153
go func() {
152154
pprof.SetGoroutineLabels(ctx)
@@ -178,51 +180,41 @@ func InitIssueIndexer(syncReindex bool) {
178180
if issueIndexer != nil {
179181
issueIndexer.Close()
180182
}
181-
finished()
182183
log.Info("PID: %d Issue Indexer closed", os.Getpid())
183184
})
184185
log.Debug("Created Bleve Indexer")
185186
case "elasticsearch":
186-
graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
187-
pprof.SetGoroutineLabels(ctx)
188-
issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
189-
if err != nil {
190-
log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
191-
}
192-
exist, err := issueIndexer.Init()
193-
if err != nil {
194-
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
195-
}
196-
populate = !exist
197-
holder.set(issueIndexer)
198-
atTerminate(finished)
199-
})
187+
issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
188+
if err != nil {
189+
log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
190+
}
191+
exist, err := issueIndexer.Init()
192+
if err != nil {
193+
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
194+
}
195+
populate = !exist
196+
holder.set(issueIndexer)
200197
case "db":
201198
issueIndexer := &DBIndexer{}
202199
holder.set(issueIndexer)
203-
graceful.GetManager().RunAtTerminate(finished)
204200
case "meilisearch":
205-
graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) {
206-
pprof.SetGoroutineLabels(ctx)
207-
issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
208-
if err != nil {
209-
log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
210-
}
211-
exist, err := issueIndexer.Init()
212-
if err != nil {
213-
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
214-
}
215-
populate = !exist
216-
holder.set(issueIndexer)
217-
atTerminate(finished)
218-
})
201+
issueIndexer, err := NewMeilisearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
202+
if err != nil {
203+
log.Fatal("Unable to initialize Meilisearch Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err)
204+
}
205+
exist, err := issueIndexer.Init()
206+
if err != nil {
207+
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
208+
}
209+
populate = !exist
210+
holder.set(issueIndexer)
219211
default:
220212
holder.cancel()
221213
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
222214
}
223215

224216
// Start processing the queue
225-
go graceful.GetManager().RunWithShutdownFns(issueIndexerQueue.Run)
217+
go graceful.GetManager().RunWithCancel(issueIndexerQueue)
226218

227219
// Populate the index
228220
if populate {
@@ -232,13 +224,14 @@ func InitIssueIndexer(syncReindex bool) {
232224
go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
233225
}
234226
}
235-
waitChannel <- time.Since(start)
236-
close(waitChannel)
227+
228+
indexerInitWaitChannel <- time.Since(start)
229+
close(indexerInitWaitChannel)
237230
}()
238231

239232
if syncReindex {
240233
select {
241-
case <-waitChannel:
234+
case <-indexerInitWaitChannel:
242235
case <-graceful.GetManager().IsShutdown():
243236
}
244237
} else if setting.Indexer.StartupTimeout > 0 {
@@ -249,7 +242,7 @@ func InitIssueIndexer(syncReindex bool) {
249242
timeout += setting.GracefulHammerTime
250243
}
251244
select {
252-
case duration := <-waitChannel:
245+
case duration := <-indexerInitWaitChannel:
253246
log.Info("Issue Indexer Initialization took %v", duration)
254247
case <-graceful.GetManager().IsShutdown():
255248
log.Warn("Shutdown occurred before issue index initialisation was complete")

modules/indexer/stats/queue.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@ func handler(items ...int64) []int64 {
2929
}
3030

3131
func initStatsQueue() error {
32-
statsQueue = queue.CreateUniqueQueue("repo_stats_update", handler)
32+
statsQueue = queue.CreateUniqueQueue(graceful.GetManager().ShutdownContext(), "repo_stats_update", handler)
3333
if statsQueue == nil {
34-
return fmt.Errorf("Unable to create repo_stats_update Queue")
34+
return fmt.Errorf("unable to create repo_stats_update queue")
3535
}
36-
37-
go graceful.GetManager().RunWithShutdownFns(statsQueue.Run)
38-
36+
go graceful.GetManager().RunWithCancel(statsQueue)
3937
return nil
4038
}
4139

modules/log/event_writer_base.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"io"
1010
"regexp"
11+
"runtime/pprof"
1112
"time"
1213
)
1314

@@ -143,9 +144,17 @@ func eventWriterStartGo(ctx context.Context, w EventWriter, shared bool) {
143144
}
144145
w.Base().shared = shared
145146
w.Base().stopped = make(chan struct{})
147+
148+
ctxDesc := "Logger: EventWriter: " + w.GetWriterName()
149+
if shared {
150+
ctxDesc = "Logger: EventWriter (shared): " + w.GetWriterName()
151+
}
152+
writerCtx, writerCancel := newContext(ctx, ctxDesc)
146153
go func() {
154+
defer writerCancel()
147155
defer close(w.Base().stopped)
148-
w.Run(ctx)
156+
pprof.SetGoroutineLabels(writerCtx)
157+
w.Run(writerCtx)
149158
}()
150159
}
151160

modules/log/event_writer_conn_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestConnLogger(t *testing.T) {
4040
level := INFO
4141
flags := LstdFlags | LUTC | Lfuncname
4242

43-
logger := NewLoggerWithWriters(context.Background(), NewEventWriterConn("test-conn", WriterMode{
43+
logger := NewLoggerWithWriters(context.Background(), "test", NewEventWriterConn("test-conn", WriterMode{
4444
Level: level,
4545
Prefix: prefix,
4646
Flags: FlagsFromBits(flags),

0 commit comments

Comments
 (0)