Closed as not planned
Description
This is a draft test for understanding queue's behavior (only testing, no assertion). Quote from #23050 (comment)
The output could be:
TestPullRequest_QueueBehavior completed1=4, executed2=0, has2=20, executed3=110, has3=0
TestPullRequest_QueueBehavior completed1=3, executed2=0, has2=20, executed3=107, has3=0
I haven't fully understand some behaviors (I do not know whether they are right or wrong), so just write down the phenomenon for information.
- The
q2
always outputsexecuted2=0, has2=20
, does it mean that: if noPush
after the startup, the tasks in the queue would never be executed? - If the
q3
usesHas
to limit the tasks only being executed once (maybe like thecheckAndUpdateStatus
in code?), the output will beexecuted3=107, has3=0
, it means that many tasks are still re-executed even if there is aHas
check. I know that there is concurrency, but is it clear to future developers? Is the existing code likecheckAndUpdateStatus
which relies onHas
working as expected in all cases?
Are these two behaviors by design? If yes, I think some comments would be very useful.
func TestPullRequest_QueueBehavior(t *testing.T) {
setting_module.AppWorkPath = "/tmp"
_ = util.RemoveAll("/tmp/data")
unittest.PrepareTestEnv(t)
setting_module.InitProviderAndLoadCommonSettingsForTest()
setting_module.LoadQueueSettings()
q1 := func() (completedTasks []string) {
startWhen100Ready := make(chan struct{}) // only start data cnosuming when the 100 tasks are all pushed into queue
stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item
testHandler := func(data ...queue.Data) []queue.Data {
<-startWhen100Ready
time.Sleep(100 * time.Millisecond)
for _, datum := range data {
s := datum.(string)
completedTasks = append(completedTasks, s)
if s == "task-20" {
close(stopAt20Shutdown)
return nil
}
}
return nil
}
q := queue.CreateUniqueQueue("pr_patch_checker_test", testHandler, "")
q.Run(func(atShutdown func()) { go func() { <-stopAt20Shutdown; atShutdown() }() }, func(atTerminate func()) {})
// add 100 tasks to the queue
for i := 0; i < 100; i++ {
_ = q.Push("task-" + strconv.Itoa(i))
}
close(startWhen100Ready)
<-stopAt20Shutdown
return
}
q2 := func() (executedTasks []string, hasTasks []string) {
stop := make(chan struct{})
// collect the tasks that have been executed
testHandler := func(data ...queue.Data) []queue.Data {
for _, datum := range data {
executedTasks = append(executedTasks, datum.(string))
}
return nil
}
q := queue.CreateUniqueQueue("pr_patch_checker_test", testHandler, "")
q.Run(func(atShutdown func()) { go func() { <-stop; atShutdown() }() }, func(atTerminate func()) {})
// do not push anything, just wait for a while to see whether there are tasks to get executed.
time.Sleep(1 * time.Second)
// check whether the tasks are still in the queue
for i := 0; i < 100; i++ {
if has, _ := q.Has("task-" + strconv.Itoa(i)); has {
hasTasks = append(hasTasks, "task-"+strconv.Itoa(i))
}
}
close(stop)
return
}
q3 := func() (executedTasks []string, hasTasks []string) {
stop := make(chan struct{})
var q queue.UniqueQueue
// q3 test handler to use `Has` to only executed one task
testHandler := func(data ...queue.Data) []queue.Data {
for _, datum := range data {
s := datum.(string)
if has, _ := q.Has(s); !has {
executedTasks = append(executedTasks, s)
}
}
return nil
}
q = queue.CreateUniqueQueue("pr_patch_checker_test", testHandler, "")
q.Run(func(atShutdown func()) { go func() { <-stop; atShutdown() }() }, func(atTerminate func()) {})
// re-run all tasks
for i := 0; i < 100; i++ {
_ = q.Push("task-" + strconv.Itoa(i))
}
// wait for a while
time.Sleep(1 * time.Second)
// check whether the tasks are still in the queue
for i := 0; i < 100; i++ {
if has, _ := q.Has("task-" + strconv.Itoa(i)); has {
hasTasks = append(hasTasks, "task-"+strconv.Itoa(i))
}
}
close(stop)
return
}
completedTasks1 := q1() // run some tasks and shutdown at an intermediate point
time.Sleep(time.Second)
executedTasks2, hasTasks2 := q2() // restart the queue to check the tasks in it
time.Sleep(time.Second)
executedTasks3, hasTasks3 := q3() // try to re-run all tasks
log.Warn("TestPullRequest_QueueStuck completed1=%v, executed2=%v, has2=%v, executed3=%v, has3=%v",
len(completedTasks1), len(executedTasks2), len(hasTasks2), len(executedTasks3), len(hasTasks3))
}