|
|
|
@ -18,21 +18,28 @@ type queueItem struct {
|
|
|
|
|
id int |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (db *database) enqueue(name string, content []byte, schedule time.Time) error { |
|
|
|
|
func (a *goBlog) enqueue(name string, content []byte, schedule time.Time) error { |
|
|
|
|
if len(content) == 0 { |
|
|
|
|
return errors.New("empty content") |
|
|
|
|
} |
|
|
|
|
_, err := db.exec( |
|
|
|
|
_, err := a.db.exec( |
|
|
|
|
"insert into queue (name, content, schedule) values (@name, @content, @schedule)", |
|
|
|
|
sql.Named("name", name), |
|
|
|
|
sql.Named("content", content), |
|
|
|
|
sql.Named("schedule", schedule.UTC().Format(time.RFC3339Nano)), |
|
|
|
|
) |
|
|
|
|
return err |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// Trigger all queue listeners
|
|
|
|
|
for _, trigger := range a.queueTriggers { |
|
|
|
|
trigger <- struct{}{} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (db *database) reschedule(qi *queueItem, dur time.Duration) error { |
|
|
|
|
_, err := db.exec( |
|
|
|
|
func (a *goBlog) reschedule(qi *queueItem, dur time.Duration) error { |
|
|
|
|
_, err := a.db.exec( |
|
|
|
|
"update queue set schedule = @schedule, content = @content where id = @id", |
|
|
|
|
sql.Named("schedule", qi.schedule.Add(dur).UTC().Format(time.RFC3339Nano)), |
|
|
|
|
sql.Named("content", qi.content), |
|
|
|
@ -41,13 +48,13 @@ func (db *database) reschedule(qi *queueItem, dur time.Duration) error {
|
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (db *database) dequeue(qi *queueItem) error { |
|
|
|
|
_, err := db.exec("delete from queue where id = @id", sql.Named("id", qi.id)) |
|
|
|
|
func (a *goBlog) dequeue(qi *queueItem) error { |
|
|
|
|
_, err := a.db.exec("delete from queue where id = @id", sql.Named("id", qi.id)) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (db *database) peekQueue(ctx context.Context, name string) (*queueItem, error) { |
|
|
|
|
row, err := db.queryRowContext( |
|
|
|
|
func (a *goBlog) peekQueue(ctx context.Context, name string) (*queueItem, error) { |
|
|
|
|
row, err := a.db.queryRowContext( |
|
|
|
|
ctx, |
|
|
|
|
"select id, name, content, schedule from queue where schedule <= @schedule and name = @name order by schedule asc limit 1", |
|
|
|
|
sql.Named("name", name), |
|
|
|
@ -75,6 +82,10 @@ func (db *database) peekQueue(ctx context.Context, name string) (*queueItem, err
|
|
|
|
|
type queueProcessFunc func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) |
|
|
|
|
|
|
|
|
|
func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process queueProcessFunc) { |
|
|
|
|
// Queue trigger
|
|
|
|
|
trigger := make(chan struct{}) |
|
|
|
|
a.queueTriggers = append(a.queueTriggers, trigger) |
|
|
|
|
// Start goroutine to listen on queue
|
|
|
|
|
go func() { |
|
|
|
|
done := false |
|
|
|
|
var wg sync.WaitGroup |
|
|
|
@ -88,9 +99,9 @@ func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process que
|
|
|
|
|
log.Println("Stopped queue:", queueName) |
|
|
|
|
}) |
|
|
|
|
for !done { |
|
|
|
|
qi, err := a.db.peekQueue(ctx, queueName) |
|
|
|
|
qi, err := a.peekQueue(ctx, queueName) |
|
|
|
|
if err != nil { |
|
|
|
|
log.Println("queue error:", err.Error()) |
|
|
|
|
log.Println("queue peek error:", err.Error()) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if qi == nil { |
|
|
|
@ -98,19 +109,22 @@ func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process que
|
|
|
|
|
select { |
|
|
|
|
case <-time.After(wait): |
|
|
|
|
continue |
|
|
|
|
case <-trigger: |
|
|
|
|
continue |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
done = true |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
process( |
|
|
|
|
qi, |
|
|
|
|
func() { |
|
|
|
|
if err := a.db.dequeue(qi); err != nil { |
|
|
|
|
if err := a.dequeue(qi); err != nil { |
|
|
|
|
log.Println("queue dequeue error:", err.Error()) |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|
func(dur time.Duration) { |
|
|
|
|
if err := a.db.reschedule(qi, dur); err != nil { |
|
|
|
|
if err := a.reschedule(qi, dur); err != nil { |
|
|
|
|
log.Println("queue reschedule error:", err.Error()) |
|
|
|
|
} |
|
|
|
|
}, |
|
|
|
|