SQLite based queue (option to reschedule item in queue)

This commit is contained in:
Jan-Lukas Else 2021-05-24 09:12:46 +02:00
parent deb3189673
commit bc1a978793
7 changed files with 133 additions and 92 deletions

View File

@ -3,44 +3,23 @@ package main
import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"time"
"github.com/joncrlsn/dque"
)
var apQueue *dque.DQue
type apRequest struct {
BlogIri, To string
Activity []byte
Try int
LastTry int64
}
func apRequestBuilder() interface{} {
return &apRequest{}
}
func initAPSendQueue() (err error) {
queuePath := "queues"
if _, err := os.Stat(queuePath); os.IsNotExist(err) {
if err = os.Mkdir(queuePath, 0755); err != nil {
return err
}
} else if err != nil {
return err
}
apQueue, err = dque.NewOrOpen("activitypub", queuePath, 1000, apRequestBuilder)
if err != nil {
return err
}
startAPSendQueue()
return nil
}
@ -48,35 +27,34 @@ func initAPSendQueue() (err error) {
func startAPSendQueue() {
go func() {
for {
if rInterface, err := apQueue.PeekBlock(); err == nil {
if rInterface == nil {
// Empty request
_, _ = apQueue.Dequeue()
time.Sleep(3 * time.Second)
qi, err := peekQueue("ap")
if err != nil {
log.Println(err.Error())
continue
} else if qi != nil {
var r apRequest
err = gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&r)
if err != nil {
log.Println(err.Error())
_ = qi.dequeue()
continue
}
if r, ok := rInterface.(*apRequest); ok {
if r.LastTry != 0 && time.Now().Before(time.Unix(r.LastTry, 0).Add(time.Duration(r.Try)*10*time.Minute)) {
_ = apQueue.Enqueue(r)
if err := apSendSigned(r.BlogIri, r.To, r.Activity); err != nil {
if r.Try++; r.Try < 20 {
// Try it again
qi.content, _ = r.encode()
_ = qi.reschedule(time.Duration(r.Try) * 10 * time.Minute)
continue
} else {
// Send request
if err := apSendSigned(r.BlogIri, r.To, r.Activity); err != nil {
if r.Try++; r.Try < 21 {
// Try it again
r.LastTry = time.Now().Unix()
_ = apQueue.Enqueue(r)
} else {
log.Printf("Request to %s failed for the 20th time", r.To)
log.Println()
_ = apRemoveInbox(r.To)
}
}
log.Printf("Request to %s failed for the 20th time", r.To)
log.Println()
_ = apRemoveInbox(r.To)
}
// Finish
_, _ = apQueue.Dequeue()
time.Sleep(1 * time.Second)
} else {
// Invalid type
_, _ = apQueue.Dequeue()
}
err = qi.dequeue()
if err != nil {
log.Println(err.Error())
}
}
}
@ -88,11 +66,24 @@ func apQueueSendSigned(blogIri, to string, activity interface{}) error {
if err != nil {
return err
}
return apQueue.Enqueue(&apRequest{
b, err := (&apRequest{
BlogIri: blogIri,
To: to,
Activity: body,
})
}).encode()
if err != nil {
return err
}
return enqueue("ap", b, time.Now())
}
func (r *apRequest) encode() ([]byte, error) {
var buf bytes.Buffer
err := gob.NewEncoder(&buf).Encode(r)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func apSendSigned(blogIri, to string, activity []byte) error {

View File

@ -157,6 +157,15 @@ func migrateDb() error {
return err
},
},
&migrator.Migration{
Name: "00014",
Func: func(tx *sql.Tx) error {
_, err := tx.Exec(`
create table queue (id integer primary key autoincrement, name text not null, content blob, schedule text not null);
`)
return err
},
},
),
)
if err != nil {

2
go.mod
View File

@ -20,7 +20,6 @@ require (
github.com/go-chi/chi/v5 v5.0.3
github.com/go-fed/httpsig v1.1.0
github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/gofrs/flock v0.8.0 // indirect
github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f // indirect
github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect
github.com/google/go-cmp v0.5.4 // indirect
@ -30,7 +29,6 @@ require (
github.com/gorilla/securecookie v1.1.1
github.com/gorilla/sessions v1.2.1
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/joncrlsn/dque v0.0.0-20200702023911-3e80e3146ce5
github.com/kaorimatz/go-opml v0.0.0-20210201121027-bc8e2852d7f9
github.com/kr/text v0.2.0 // indirect
github.com/kyokomi/emoji/v2 v2.2.8

5
go.sum
View File

@ -99,9 +99,6 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.6.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/flock v0.8.0 h1:MSdYClljsF3PbENUUEx85nkWfJSGfzYI9yEBZOJz6CY=
github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/gddo v0.0.0-20180823221919-9d8ff1c67be5/go.mod h1:xEhNfoBDX1hzLm2Nf80qUvZ2sVwoMZ8d6IE2SrsQfh4=
@ -181,8 +178,6 @@ github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ=
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/joncrlsn/dque v0.0.0-20200702023911-3e80e3146ce5 h1:bo1aoO6l128nKJCBrFflOj9s+KPqMM7ErNyB5GGBNDs=
github.com/joncrlsn/dque v0.0.0-20200702023911-3e80e3146ce5/go.mod h1:dNKs71rs2VJGBAmttu7fouEsRQlRjxy0p1Sx+T5wbpY=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=

57
queue.go Normal file
View File

@ -0,0 +1,57 @@
package main
import (
"database/sql"
"errors"
"time"
"github.com/araddon/dateparse"
)
func enqueue(name string, content []byte, schedule time.Time) error {
if len(content) == 0 {
return errors.New("empty content")
}
_, err := appDbExec("insert into queue (name, content, schedule) values (@name, @content, @schedule)",
sql.Named("name", name), sql.Named("content", content), sql.Named("schedule", schedule.UTC().String()))
return err
}
type queueItem struct {
id int
name string
content []byte
schedule *time.Time
}
func (qi *queueItem) reschedule(dur time.Duration) error {
_, err := appDbExec("update queue set schedule = @schedule, content = @content where id = @id", sql.Named("schedule", qi.schedule.Add(dur).UTC().String()), sql.Named("content", qi.content), sql.Named("id", qi.id))
return err
}
func (qi *queueItem) dequeue() error {
_, err := appDbExec("delete from queue where id = @id", sql.Named("id", qi.id))
return err
}
func peekQueue(name string) (*queueItem, error) {
row, err := appDbQueryRow("select id, name, content, schedule from queue where schedule <= @schedule and name = @name order by schedule asc limit 1", sql.Named("name", name), sql.Named("schedule", time.Now().UTC().String()))
if err != nil {
return nil, err
}
qi := &queueItem{}
var timeString string
if err = row.Scan(&qi.id, &qi.name, &qi.content, &timeString); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
t, err := dateparse.ParseLocal(timeString)
if err != nil {
return nil, err
}
t = t.Local()
qi.schedule = &t
return qi, nil
}

View File

@ -121,9 +121,9 @@ func reverifyWebmention(id int) error {
return err
}
if len(m) > 0 {
queueMention(m[0])
err = queueMention(m[0])
}
return nil
return err
}
type webmentionsRequestConfig struct {

View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"database/sql"
"encoding/gob"
"errors"
"fmt"
"io"
@ -11,34 +12,15 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"time"
"github.com/PuerkitoBio/goquery"
"github.com/joncrlsn/dque"
"github.com/thoas/go-funk"
"willnorris.com/go/microformats"
)
var wmQueue *dque.DQue
func wmMentionBuilder() interface{} {
return &mention{}
}
func initWebmentionQueue() (err error) {
queuePath := "queues"
if _, err := os.Stat(queuePath); os.IsNotExist(err) {
if err := os.Mkdir(queuePath, 0755); err != nil {
return err
}
} else if err != nil {
return err
}
wmQueue, err = dque.NewOrOpen("webmention", queuePath, 5, wmMentionBuilder)
if err != nil {
return err
}
startWebmentionQueue()
return nil
}
@ -46,21 +28,26 @@ func initWebmentionQueue() (err error) {
func startWebmentionQueue() {
go func() {
for {
if i, err := wmQueue.PeekBlock(); err == nil {
if i == nil {
// Empty request
_, _ = wmQueue.Dequeue()
time.Sleep(10 * time.Second)
qi, err := peekQueue("wm")
if err != nil {
log.Println(err.Error())
continue
} else if qi != nil {
var m mention
err = gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&m)
if err != nil {
log.Println(err.Error())
_ = qi.dequeue()
continue
}
if m, ok := i.(*mention); ok {
err = m.verifyMention()
if err != nil {
log.Println(fmt.Sprintf("Failed to verify webmention from %s to %s: %s", m.Source, m.Target, err.Error()))
}
_, _ = wmQueue.Dequeue()
} else {
// Invalid type
_, _ = wmQueue.Dequeue()
err = m.verifyMention()
if err != nil {
log.Println(fmt.Sprintf("Failed to verify webmention from %s to %s: %s", m.Source, m.Target, err.Error()))
}
err = qi.dequeue()
if err != nil {
log.Println(err.Error())
}
}
}
@ -71,7 +58,11 @@ func queueMention(m *mention) error {
if wm := appConfig.Webmention; wm != nil && wm.DisableReceiving {
return errors.New("webmention receiving disabled")
}
return wmQueue.Enqueue(m)
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(m); err != nil {
return err
}
return enqueue("wm", buf.Bytes(), time.Now())
}
func (m *mention) verifyMention() error {