From 0a8086e682049b2f717a6d65a93c92bb77c5d9b2 Mon Sep 17 00:00:00 2001 From: Jan-Lukas Else Date: Sun, 17 Jul 2022 11:26:27 +0200 Subject: [PATCH] Rework queue algorithm --- queue.go | 46 +++++++++++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/queue.go b/queue.go index 46fe0a0..2d1e8a2 100644 --- a/queue.go +++ b/queue.go @@ -78,32 +78,39 @@ func (a *goBlog) peekQueue(ctx context.Context, name string) (*queueItem, error) type queueProcessFunc func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process queueProcessFunc) { + if process == nil { + return + } + + endQueue := false + queueContext, cancelQueueContext := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + a.shutdown.Add(func() { + endQueue = true + cancelQueueContext() + wg.Wait() + }) + + wg.Add(1) go func() { - done := false - var wg sync.WaitGroup - wg.Add(1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - a.shutdown.Add(func() { - done = true - cancel() - wg.Wait() - log.Println("Stopped queue:", queueName) - }) - for !done { - qi, err := a.peekQueue(ctx, queueName) + queueLoop: + for { + if endQueue { + break queueLoop + } + qi, err := a.peekQueue(queueContext, queueName) if err != nil { - // log.Println("queue peek error:", err.Error()) - continue + log.Println("queue peek error:", err.Error()) + continue queueLoop } if qi == nil { // No item in the queue, wait a moment select { case <-time.After(wait): - continue - case <-ctx.Done(): - done = true - continue + continue queueLoop + case <-queueContext.Done(): + break queueLoop } } process( @@ -120,6 +127,7 @@ func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process que }, ) } + log.Println("stopped queue:", queueName) wg.Done() }() }