Skip to content

Commit 130f2a2

Browse files
committed
support concurrency
1 parent 2d7e6e9 commit 130f2a2

File tree

15 files changed

+726
-270
lines changed

15 files changed

+726
-270
lines changed

models/actions/run.go

Lines changed: 76 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"code.gitea.io/gitea/modules/util"
2121
webhook_module "code.gitea.io/gitea/modules/webhook"
2222

23-
"github.com/nektos/act/pkg/jobparser"
2423
"xorm.io/builder"
2524
)
2625

@@ -47,6 +46,8 @@ type ActionRun struct {
4746
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4847
Status Status `xorm:"index"`
4948
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
49+
ConcurrencyGroup string
50+
ConcurrencyCancel bool
5051
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5152
Started timeutil.TimeStamp
5253
Stopped timeutil.TimeStamp
@@ -168,7 +169,7 @@ func (run *ActionRun) IsSchedule() bool {
168169
return run.ScheduleID > 0
169170
}
170171

171-
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
172+
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
172173
_, err := db.GetEngine(ctx).ID(repo.ID).
173174
SetExpr("num_action_runs",
174175
builder.Select("count(*)").From("action_run").
@@ -196,13 +197,20 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
196197
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
197198
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
198199
// Find all runs in the specified repository, reference, and workflow with non-final status
199-
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
200+
opts := &FindRunOptions{
200201
RepoID: repoID,
201202
Ref: ref,
202203
WorkflowID: workflowID,
203204
TriggerEvent: event,
204205
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
205-
})
206+
}
207+
return CancelPreviousJobsWithOpts(ctx, opts)
208+
}
209+
210+
// CancelPreviousJobs cancels all previous jobs with opts
211+
func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error {
212+
// Find all runs by opts
213+
runs, total, err := db.FindAndCount[ActionRun](ctx, opts)
206214
if err != nil {
207215
return err
208216
}
@@ -222,119 +230,50 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
222230
return err
223231
}
224232

225-
// Iterate over each job and attempt to cancel it.
226-
for _, job := range jobs {
227-
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
228-
status := job.Status
229-
if status.IsDone() {
230-
continue
231-
}
232-
233-
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
234-
if job.TaskID == 0 {
235-
job.Status = StatusCancelled
236-
job.Stopped = timeutil.TimeStampNow()
237-
238-
// Update the job's status and stopped time in the database.
239-
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
240-
if err != nil {
241-
return err
242-
}
243-
244-
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
245-
if n == 0 {
246-
return fmt.Errorf("job has changed, try again")
247-
}
248-
249-
// Continue with the next job.
250-
continue
251-
}
252-
253-
// If the job has an associated task, try to stop the task, effectively cancelling the job.
254-
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
255-
return err
256-
}
233+
if err := CancelJobs(ctx, jobs); err != nil {
234+
return err
257235
}
258236
}
259237

260238
// Return nil to indicate successful cancellation of all running and waiting jobs.
261239
return nil
262240
}
263241

264-
// InsertRun inserts a run
265-
// The title will be cut off at 255 characters if it's longer than 255 characters.
266-
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
267-
ctx, committer, err := db.TxContext(ctx)
268-
if err != nil {
269-
return err
270-
}
271-
defer committer.Close()
272-
273-
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
274-
if err != nil {
275-
return err
276-
}
277-
run.Index = index
278-
run.Title, _ = util.SplitStringAtByteN(run.Title, 255)
242+
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
243+
// Iterate over each job and attempt to cancel it.
244+
for _, job := range jobs {
245+
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
246+
status := job.Status
247+
if status.IsDone() {
248+
continue
249+
}
279250

280-
if err := db.Insert(ctx, run); err != nil {
281-
return err
282-
}
251+
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
252+
if job.TaskID == 0 {
253+
job.Status = StatusCancelled
254+
job.Stopped = timeutil.TimeStampNow()
283255

284-
if run.Repo == nil {
285-
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
286-
if err != nil {
287-
return err
288-
}
289-
run.Repo = repo
290-
}
256+
// Update the job's status and stopped time in the database.
257+
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
258+
if err != nil {
259+
return err
260+
}
291261

292-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
293-
return err
294-
}
262+
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
263+
if n == 0 {
264+
return fmt.Errorf("job has changed, try again")
265+
}
295266

296-
runJobs := make([]*ActionRunJob, 0, len(jobs))
297-
var hasWaiting bool
298-
for _, v := range jobs {
299-
id, job := v.Job()
300-
needs := job.Needs()
301-
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
302-
return err
267+
// Continue with the next job.
268+
continue
303269
}
304-
payload, _ := v.Marshal()
305-
status := StatusWaiting
306-
if len(needs) > 0 || run.NeedApproval {
307-
status = StatusBlocked
308-
} else {
309-
hasWaiting = true
310-
}
311-
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
312-
runJobs = append(runJobs, &ActionRunJob{
313-
RunID: run.ID,
314-
RepoID: run.RepoID,
315-
OwnerID: run.OwnerID,
316-
CommitSHA: run.CommitSHA,
317-
IsForkPullRequest: run.IsForkPullRequest,
318-
Name: job.Name,
319-
WorkflowPayload: payload,
320-
JobID: id,
321-
Needs: needs,
322-
RunsOn: job.RunsOn(),
323-
Status: status,
324-
})
325-
}
326-
if err := db.Insert(ctx, runJobs); err != nil {
327-
return err
328-
}
329270

330-
// if there is a job in the waiting status, increase tasks version.
331-
if hasWaiting {
332-
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
271+
// If the job has an associated task, try to stop the task, effectively cancelling the job.
272+
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
333273
return err
334274
}
335275
}
336-
337-
return committer.Commit()
276+
return nil
338277
}
339278

340279
func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
@@ -426,7 +365,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
426365
}
427366
run.Repo = repo
428367
}
429-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
368+
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
430369
return err
431370
}
432371
}
@@ -435,3 +374,38 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
435374
}
436375

437376
type ActionRunIndex db.ResourceIndex
377+
378+
func CancelConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) error {
379+
// cancel previous jobs in the same concurrency group
380+
previousJobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
381+
RepoID: actionRunJob.RepoID,
382+
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
383+
Statuses: []Status{
384+
StatusRunning,
385+
StatusWaiting,
386+
StatusBlocked,
387+
},
388+
})
389+
if err != nil {
390+
return fmt.Errorf("find previous jobs: %w", err)
391+
}
392+
393+
return CancelJobs(ctx, previousJobs)
394+
}
395+
396+
func ShouldJobBeBlockedByConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) (bool, error) {
397+
if actionRunJob.ConcurrencyCancel {
398+
return false, CancelConcurrentJobs(ctx, actionRunJob)
399+
}
400+
401+
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
402+
RepoID: actionRunJob.RepoID,
403+
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
404+
Statuses: []Status{StatusRunning, StatusWaiting},
405+
})
406+
if err != nil {
407+
return false, fmt.Errorf("count waiting jobs: %w", err)
408+
}
409+
410+
return concurrentJobsNum > 0, nil
411+
}

models/actions/run_job.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,16 @@ type ActionRunJob struct {
3333
RunsOn []string `xorm:"JSON TEXT"`
3434
TaskID int64 // the latest task of the job
3535
Status Status `xorm:"index"`
36-
Started timeutil.TimeStamp
37-
Stopped timeutil.TimeStamp
38-
Created timeutil.TimeStamp `xorm:"created"`
39-
Updated timeutil.TimeStamp `xorm:"updated index"`
36+
37+
RawConcurrencyGroup string // raw concurrency.group
38+
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
39+
ConcurrencyGroup string // evaluated concurrency.group
40+
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
41+
42+
Started timeutil.TimeStamp
43+
Stopped timeutil.TimeStamp
44+
Created timeutil.TimeStamp `xorm:"created"`
45+
Updated timeutil.TimeStamp `xorm:"updated index"`
4046
}
4147

4248
func init() {

models/actions/run_job_list.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,21 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
4646
return jobs.LoadRuns(ctx, withRepo)
4747
}
4848

49+
func GetRunsByIDs(ctx context.Context, runIDs []int64) (RunList, error) {
50+
runList := make(RunList, 0, len(runIDs))
51+
err := db.GetEngine(ctx).In("id", runIDs).Find(&runList)
52+
return runList, err
53+
}
54+
4955
type FindRunJobOptions struct {
5056
db.ListOptions
51-
RunID int64
52-
RepoID int64
53-
OwnerID int64
54-
CommitSHA string
55-
Statuses []Status
56-
UpdatedBefore timeutil.TimeStamp
57+
RunID int64
58+
RepoID int64
59+
OwnerID int64
60+
CommitSHA string
61+
Statuses []Status
62+
UpdatedBefore timeutil.TimeStamp
63+
ConcurrencyGroup string
5764
}
5865

5966
func (opts FindRunJobOptions) ToConds() builder.Cond {
@@ -76,5 +83,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
7683
if opts.UpdatedBefore > 0 {
7784
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
7885
}
86+
if opts.ConcurrencyGroup != "" {
87+
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
88+
}
7989
return cond
8090
}

models/actions/run_list.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,16 @@ func (runs RunList) LoadRepos(ctx context.Context) error {
6363

6464
type FindRunOptions struct {
6565
db.ListOptions
66-
RepoID int64
67-
OwnerID int64
68-
WorkflowID string
69-
Ref string // the commit/tag/… that caused this workflow
70-
TriggerUserID int64
71-
TriggerEvent webhook_module.HookEventType
72-
Approved bool // not util.OptionalBool, it works only when it's true
73-
Status []Status
66+
RepoID int64
67+
OwnerID int64
68+
WorkflowID string
69+
Ref string // the commit/tag/… that caused this workflow
70+
TriggerUserID int64
71+
TriggerEvent webhook_module.HookEventType
72+
Approved bool // not util.OptionalBool, it works only when it's true
73+
Status []Status
74+
SortType string
75+
ConcurrencyGroup string
7476
}
7577

7678
func (opts FindRunOptions) ToConds() builder.Cond {
@@ -99,11 +101,21 @@ func (opts FindRunOptions) ToConds() builder.Cond {
99101
if opts.TriggerEvent != "" {
100102
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
101103
}
104+
if len(opts.ConcurrencyGroup) > 0 {
105+
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
106+
}
102107
return cond
103108
}
104109

105110
func (opts FindRunOptions) ToOrders() string {
106-
return "`id` DESC"
111+
switch opts.SortType {
112+
case "oldest":
113+
return "created ASC"
114+
case "newest":
115+
return "created DESC"
116+
default:
117+
return "`id` DESC"
118+
}
107119
}
108120

109121
type StatusInfo struct {

models/migrations/migrations.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ func prepareMigrationTasks() []*migration {
369369
newMigration(309, "Improve Notification table indices", v1_23.ImproveNotificationTableIndices),
370370
newMigration(310, "Add Priority to ProtectedBranch", v1_23.AddPriorityToProtectedBranch),
371371
newMigration(311, "Add TimeEstimate to Issue table", v1_23.AddTimeEstimateColumnToIssueTable),
372+
// TODO: AddActionsConcurrency
372373
}
373374
return preparedMigrations
374375
}

models/migrations/v1_23/v312.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright 2024 The Gitea Authors. All rights reserved.
2+
// SPDX-License-Identifier: MIT
3+
4+
package v1_23 //nolint
5+
6+
import (
7+
"xorm.io/xorm"
8+
)
9+
10+
func AddActionsConcurrency(x *xorm.Engine) error {
11+
type ActionRun struct {
12+
ConcurrencyGroup string
13+
ConcurrencyCancel bool
14+
}
15+
16+
if err := x.Sync(new(ActionRun)); err != nil {
17+
return err
18+
}
19+
20+
type ActionRunJob struct {
21+
RawConcurrencyGroup string
22+
RawConcurrencyCancel string
23+
ConcurrencyGroup string
24+
ConcurrencyCancel bool
25+
}
26+
27+
return x.Sync(new(ActionRunJob))
28+
}

0 commit comments

Comments
 (0)