From bc1a97879357c17ae1928c3b1d4a7f3411d0b8cc Mon Sep 17 00:00:00 2001 From: Jan-Lukas Else Date: Mon, 24 May 2021 09:12:46 +0200 Subject: [PATCH] SQLite based queue (option to reschedule item in queue) --- activityPubSending.go | 89 ++++++++++++++++++--------------------- databaseMigrations.go | 9 ++++ go.mod | 2 - go.sum | 5 --- queue.go | 57 +++++++++++++++++++++++++ webmention.go | 4 +- webmentionVerification.go | 59 +++++++++++--------------- 7 files changed, 133 insertions(+), 92 deletions(-) create mode 100644 queue.go diff --git a/activityPubSending.go b/activityPubSending.go index f152fa7..3d51daa 100644 --- a/activityPubSending.go +++ b/activityPubSending.go @@ -3,44 +3,23 @@ package main import ( "bytes" "context" + "encoding/gob" "encoding/json" "fmt" "io" "log" "net/http" "net/url" - "os" "time" - - "github.com/joncrlsn/dque" ) -var apQueue *dque.DQue - type apRequest struct { BlogIri, To string Activity []byte Try int - LastTry int64 -} - -func apRequestBuilder() interface{} { - return &apRequest{} } func initAPSendQueue() (err error) { - queuePath := "queues" - if _, err := os.Stat(queuePath); os.IsNotExist(err) { - if err = os.Mkdir(queuePath, 0755); err != nil { - return err - } - } else if err != nil { - return err - } - apQueue, err = dque.NewOrOpen("activitypub", queuePath, 1000, apRequestBuilder) - if err != nil { - return err - } startAPSendQueue() return nil } @@ -48,35 +27,34 @@ func initAPSendQueue() (err error) { func startAPSendQueue() { go func() { for { - if rInterface, err := apQueue.PeekBlock(); err == nil { - if rInterface == nil { - // Empty request - _, _ = apQueue.Dequeue() + time.Sleep(3 * time.Second) + qi, err := peekQueue("ap") + if err != nil { + log.Println(err.Error()) + continue + } else if qi != nil { + var r apRequest + err = gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&r) + if err != nil { + log.Println(err.Error()) + _ = qi.dequeue() continue } - if r, ok := rInterface.(*apRequest); ok { - if r.LastTry != 0 && time.Now().Before(time.Unix(r.LastTry, 0).Add(time.Duration(r.Try)*10*time.Minute)) { - _ = apQueue.Enqueue(r) + if err := apSendSigned(r.BlogIri, r.To, r.Activity); err != nil { + if r.Try++; r.Try < 20 { + // Try it again + qi.content, _ = r.encode() + _ = qi.reschedule(time.Duration(r.Try) * 10 * time.Minute) + continue } else { - // Send request - if err := apSendSigned(r.BlogIri, r.To, r.Activity); err != nil { - if r.Try++; r.Try < 21 { - // Try it again - r.LastTry = time.Now().Unix() - _ = apQueue.Enqueue(r) - } else { - log.Printf("Request to %s failed for the 20th time", r.To) - log.Println() - _ = apRemoveInbox(r.To) - } - } + log.Printf("Request to %s failed for the 20th time", r.To) + log.Println() + _ = apRemoveInbox(r.To) } - // Finish - _, _ = apQueue.Dequeue() - time.Sleep(1 * time.Second) - } else { - // Invalid type - _, _ = apQueue.Dequeue() + } + err = qi.dequeue() + if err != nil { + log.Println(err.Error()) } } } @@ -88,11 +66,24 @@ func apQueueSendSigned(blogIri, to string, activity interface{}) error { if err != nil { return err } - return apQueue.Enqueue(&apRequest{ + b, err := (&apRequest{ BlogIri: blogIri, To: to, Activity: body, - }) + }).encode() + if err != nil { + return err + } + return enqueue("ap", b, time.Now()) +} + +func (r *apRequest) encode() ([]byte, error) { + var buf bytes.Buffer + err := gob.NewEncoder(&buf).Encode(r) + if err != nil { + return nil, err + } + return buf.Bytes(), nil } func apSendSigned(blogIri, to string, activity []byte) error { diff --git a/databaseMigrations.go b/databaseMigrations.go index 62f4882..eee1806 100644 --- a/databaseMigrations.go +++ b/databaseMigrations.go @@ -157,6 +157,15 @@ func migrateDb() error { return err }, }, + &migrator.Migration{ + Name: "00014", + Func: func(tx *sql.Tx) error { + _, err := tx.Exec(` + create table queue (id integer primary key autoincrement, name text not null, content blob, schedule text not null); + `) + return err + }, + }, ), ) if err != nil { diff --git a/go.mod b/go.mod index 1af8412..2bf6ccb 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,6 @@ require ( github.com/go-chi/chi/v5 v5.0.3 github.com/go-fed/httpsig v1.1.0 github.com/go-sql-driver/mysql v1.5.0 // indirect - github.com/gofrs/flock v0.8.0 // indirect github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f // indirect github.com/golang/glog v0.0.0-20210429001901-424d2337a529 // indirect github.com/google/go-cmp v0.5.4 // indirect @@ -30,7 +29,6 @@ require ( github.com/gorilla/securecookie v1.1.1 github.com/gorilla/sessions v1.2.1 github.com/jonboulle/clockwork v0.2.2 // indirect - github.com/joncrlsn/dque v0.0.0-20200702023911-3e80e3146ce5 github.com/kaorimatz/go-opml v0.0.0-20210201121027-bc8e2852d7f9 github.com/kr/text v0.2.0 // indirect github.com/kyokomi/emoji/v2 v2.2.8 diff --git a/go.sum b/go.sum index c72bdc4..23063f0 100644 --- a/go.sum +++ b/go.sum @@ -99,9 +99,6 @@ github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gG github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.6.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= -github.com/gofrs/flock v0.8.0 h1:MSdYClljsF3PbENUUEx85nkWfJSGfzYI9yEBZOJz6CY= -github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/gddo v0.0.0-20180823221919-9d8ff1c67be5/go.mod h1:xEhNfoBDX1hzLm2Nf80qUvZ2sVwoMZ8d6IE2SrsQfh4= @@ -181,8 +178,6 @@ github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= -github.com/joncrlsn/dque v0.0.0-20200702023911-3e80e3146ce5 h1:bo1aoO6l128nKJCBrFflOj9s+KPqMM7ErNyB5GGBNDs= -github.com/joncrlsn/dque v0.0.0-20200702023911-3e80e3146ce5/go.mod h1:dNKs71rs2VJGBAmttu7fouEsRQlRjxy0p1Sx+T5wbpY= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= diff --git a/queue.go b/queue.go new file mode 100644 index 0000000..04b3bad --- /dev/null +++ b/queue.go @@ -0,0 +1,57 @@ +package main + +import ( + "database/sql" + "errors" + "time" + + "github.com/araddon/dateparse" +) + +func enqueue(name string, content []byte, schedule time.Time) error { + if len(content) == 0 { + return errors.New("empty content") + } + _, err := appDbExec("insert into queue (name, content, schedule) values (@name, @content, @schedule)", + sql.Named("name", name), sql.Named("content", content), sql.Named("schedule", schedule.UTC().String())) + return err +} + +type queueItem struct { + id int + name string + content []byte + schedule *time.Time +} + +func (qi *queueItem) reschedule(dur time.Duration) error { + _, err := appDbExec("update queue set schedule = @schedule, content = @content where id = @id", sql.Named("schedule", qi.schedule.Add(dur).UTC().String()), sql.Named("content", qi.content), sql.Named("id", qi.id)) + return err +} + +func (qi *queueItem) dequeue() error { + _, err := appDbExec("delete from queue where id = @id", sql.Named("id", qi.id)) + return err +} + +func peekQueue(name string) (*queueItem, error) { + row, err := appDbQueryRow("select id, name, content, schedule from queue where schedule <= @schedule and name = @name order by schedule asc limit 1", sql.Named("name", name), sql.Named("schedule", time.Now().UTC().String())) + if err != nil { + return nil, err + } + qi := &queueItem{} + var timeString string + if err = row.Scan(&qi.id, &qi.name, &qi.content, &timeString); err != nil { + if err == sql.ErrNoRows { + return nil, nil + } + return nil, err + } + t, err := dateparse.ParseLocal(timeString) + if err != nil { + return nil, err + } + t = t.Local() + qi.schedule = &t + return qi, nil +} diff --git a/webmention.go b/webmention.go index d471f49..494ea22 100644 --- a/webmention.go +++ b/webmention.go @@ -121,9 +121,9 @@ func reverifyWebmention(id int) error { return err } if len(m) > 0 { - queueMention(m[0]) + err = queueMention(m[0]) } - return nil + return err } type webmentionsRequestConfig struct { diff --git a/webmentionVerification.go b/webmentionVerification.go index e834abe..acc1bd9 100644 --- a/webmentionVerification.go +++ b/webmentionVerification.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "database/sql" + "encoding/gob" "errors" "fmt" "io" @@ -11,34 +12,15 @@ import ( "net/http" "net/http/httptest" "net/url" - "os" "strings" + "time" "github.com/PuerkitoBio/goquery" - "github.com/joncrlsn/dque" "github.com/thoas/go-funk" "willnorris.com/go/microformats" ) -var wmQueue *dque.DQue - -func wmMentionBuilder() interface{} { - return &mention{} -} - func initWebmentionQueue() (err error) { - queuePath := "queues" - if _, err := os.Stat(queuePath); os.IsNotExist(err) { - if err := os.Mkdir(queuePath, 0755); err != nil { - return err - } - } else if err != nil { - return err - } - wmQueue, err = dque.NewOrOpen("webmention", queuePath, 5, wmMentionBuilder) - if err != nil { - return err - } startWebmentionQueue() return nil } @@ -46,21 +28,26 @@ func initWebmentionQueue() (err error) { func startWebmentionQueue() { go func() { for { - if i, err := wmQueue.PeekBlock(); err == nil { - if i == nil { - // Empty request - _, _ = wmQueue.Dequeue() + time.Sleep(10 * time.Second) + qi, err := peekQueue("wm") + if err != nil { + log.Println(err.Error()) + continue + } else if qi != nil { + var m mention + err = gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&m) + if err != nil { + log.Println(err.Error()) + _ = qi.dequeue() continue } - if m, ok := i.(*mention); ok { - err = m.verifyMention() - if err != nil { - log.Println(fmt.Sprintf("Failed to verify webmention from %s to %s: %s", m.Source, m.Target, err.Error())) - } - _, _ = wmQueue.Dequeue() - } else { - // Invalid type - _, _ = wmQueue.Dequeue() + err = m.verifyMention() + if err != nil { + log.Println(fmt.Sprintf("Failed to verify webmention from %s to %s: %s", m.Source, m.Target, err.Error())) + } + err = qi.dequeue() + if err != nil { + log.Println(err.Error()) } } } @@ -71,7 +58,11 @@ func queueMention(m *mention) error { if wm := appConfig.Webmention; wm != nil && wm.DisableReceiving { return errors.New("webmention receiving disabled") } - return wmQueue.Enqueue(m) + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(m); err != nil { + return err + } + return enqueue("wm", buf.Bytes(), time.Now()) } func (m *mention) verifyMention() error {