|
|
|
@ -31,10 +31,6 @@ func (a *goBlog) enqueue(name string, content []byte, schedule time.Time) error
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
// Trigger all queue listeners
|
|
|
|
|
for _, trigger := range a.queueTriggers {
|
|
|
|
|
trigger <- struct{}{}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -82,10 +78,6 @@ func (a *goBlog) peekQueue(ctx context.Context, name string) (*queueItem, error)
|
|
|
|
|
type queueProcessFunc func(qi *queueItem, dequeue func(), reschedule func(time.Duration))
|
|
|
|
|
|
|
|
|
|
func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process queueProcessFunc) {
|
|
|
|
|
// Queue trigger
|
|
|
|
|
trigger := make(chan struct{})
|
|
|
|
|
a.queueTriggers = append(a.queueTriggers, trigger)
|
|
|
|
|
// Start goroutine to listen on queue
|
|
|
|
|
go func() {
|
|
|
|
|
done := false
|
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
@ -109,8 +101,6 @@ func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process que
|
|
|
|
|
select {
|
|
|
|
|
case <-time.After(wait):
|
|
|
|
|
continue
|
|
|
|
|
case <-trigger:
|
|
|
|
|
continue
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
done = true
|
|
|
|
|
continue
|
|
|
|
|