Rework queue algorithm

This commit is contained in:
Jan-Lukas Else 2022-07-17 11:26:27 +02:00
parent f0d3f1c84f
commit 0a8086e682
1 changed files with 27 additions and 19 deletions

View File

@ -78,32 +78,39 @@ func (a *goBlog) peekQueue(ctx context.Context, name string) (*queueItem, error)
type queueProcessFunc func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) type queueProcessFunc func(qi *queueItem, dequeue func(), reschedule func(time.Duration))
func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process queueProcessFunc) { func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process queueProcessFunc) {
if process == nil {
return
}
endQueue := false
queueContext, cancelQueueContext := context.WithCancel(context.Background())
var wg sync.WaitGroup
a.shutdown.Add(func() {
endQueue = true
cancelQueueContext()
wg.Wait()
})
wg.Add(1)
go func() { go func() {
done := false queueLoop:
var wg sync.WaitGroup for {
wg.Add(1) if endQueue {
ctx, cancel := context.WithCancel(context.Background()) break queueLoop
defer cancel() }
a.shutdown.Add(func() { qi, err := a.peekQueue(queueContext, queueName)
done = true
cancel()
wg.Wait()
log.Println("Stopped queue:", queueName)
})
for !done {
qi, err := a.peekQueue(ctx, queueName)
if err != nil { if err != nil {
// log.Println("queue peek error:", err.Error()) log.Println("queue peek error:", err.Error())
continue continue queueLoop
} }
if qi == nil { if qi == nil {
// No item in the queue, wait a moment // No item in the queue, wait a moment
select { select {
case <-time.After(wait): case <-time.After(wait):
continue continue queueLoop
case <-ctx.Done(): case <-queueContext.Done():
done = true break queueLoop
continue
} }
} }
process( process(
@ -120,6 +127,7 @@ func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process que
}, },
) )
} }
log.Println("stopped queue:", queueName)
wg.Done() wg.Done()
}() }()
} }