More linters and some contexts, improved queue listening

This commit is contained in:
Jan-Lukas Else 2022-02-23 10:23:26 +01:00
parent 0437f61ba9
commit 890e226ef6
9 changed files with 136 additions and 99 deletions

View File

@ -26,3 +26,13 @@ linters:
- prealloc
- unparam
- durationcheck
- bidichk
- containedctx
- contextcheck
linters-settings:
gosimple:
go: "1.17"
checks: ["all"]
gostatichcheck:
go: "1.17"
checks: ["all"]

View File

@ -10,7 +10,6 @@ import (
"log"
"net/http"
"net/url"
"sync"
"time"
"go.goblog.app/app/pkgs/bufferpool"
@ -24,51 +23,28 @@ type apRequest struct {
}
func (a *goBlog) initAPSendQueue() {
go func() {
done := false
var wg sync.WaitGroup
wg.Add(1)
a.shutdown.Add(func() {
done = true
wg.Wait()
log.Println("Stopped AP send queue")
})
for !done {
qi, err := a.db.peekQueue("ap")
if err != nil {
log.Println("activitypub send queue:", err.Error())
continue
}
if qi == nil {
// No item in the queue, wait a moment
time.Sleep(5 * time.Second)
continue
}
a.listenOnQueue("ap", 15*time.Second, func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) {
var r apRequest
if err = gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&r); err != nil {
log.Println("activitypub send queue:", err.Error())
_ = a.db.dequeue(qi)
continue
if err := gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&r); err != nil {
log.Println("activitypub queue:", err.Error())
dequeue()
return
}
if err = a.apSendSigned(r.BlogIri, r.To, r.Activity); err != nil {
if err := a.apSendSigned(r.BlogIri, r.To, r.Activity); err != nil {
if r.Try++; r.Try < 20 {
// Try it again
buf := bufferpool.Get()
_ = r.encode(buf)
qi.content = buf.Bytes()
_ = a.db.reschedule(qi, time.Duration(r.Try)*10*time.Minute)
reschedule(time.Duration(r.Try) * 10 * time.Minute)
bufferpool.Put(buf)
continue
return
}
log.Println("AP request failed for the 20th time:", r.To)
_ = a.db.apRemoveInbox(r.To)
}
if err = a.db.dequeue(qi); err != nil {
log.Println("activitypub send queue:", err.Error())
}
}
wg.Done()
}()
dequeue()
})
}
func (db *database) apQueueSendSigned(blogIri, to string, activity interface{}) error {

View File

@ -187,6 +187,10 @@ func (db *database) prepare(query string, args ...interface{}) (*sql.Stmt, []int
const dbNoCache = "nocache"
func (db *database) exec(query string, args ...interface{}) (sql.Result, error) {
return db.execContext(context.Background(), query, args...)
}
func (db *database) execContext(c context.Context, query string, args ...interface{}) (sql.Result, error) {
if db == nil || db.db == nil {
return nil, errors.New("database not initialized")
}
@ -196,7 +200,7 @@ func (db *database) exec(query string, args ...interface{}) (sql.Result, error)
db.em.Lock()
defer db.em.Unlock()
// Prepare context, call hook
ctx := db.dbBefore(context.Background(), query, args...)
ctx := db.dbBefore(c, query, args...)
defer db.dbAfter(ctx, query, args...)
// Execute
if st != nil {
@ -222,13 +226,17 @@ func (db *database) query(query string, args ...interface{}) (*sql.Rows, error)
}
func (db *database) queryRow(query string, args ...interface{}) (*sql.Row, error) {
return db.queryRowContext(context.Background(), query, args...)
}
func (db *database) queryRowContext(c context.Context, query string, args ...interface{}) (*sql.Row, error) {
if db == nil || db.db == nil {
return nil, errors.New("database not initialized")
}
// Maybe prepare
st, args, _ := db.prepare(query, args...)
// Prepare context, call hook
ctx := db.dbBefore(context.Background(), query, args...)
ctx := db.dbBefore(c, query, args...)
defer db.dbAfter(ctx, query, args...)
// Query
if st != nil {

View File

@ -21,11 +21,11 @@ func (c *httpsCache) check() error {
return nil
}
func (c *httpsCache) Get(_ context.Context, key string) ([]byte, error) {
func (c *httpsCache) Get(ctx context.Context, key string) ([]byte, error) {
if err := c.check(); err != nil {
return nil, err
}
d, err := c.db.retrievePersistentCache("https_" + key)
d, err := c.db.retrievePersistentCacheContext(ctx, "https_"+key)
if d == nil && err == nil {
return nil, autocert.ErrCacheMiss
} else if err != nil {
@ -34,16 +34,16 @@ func (c *httpsCache) Get(_ context.Context, key string) ([]byte, error) {
return d, nil
}
func (c *httpsCache) Put(_ context.Context, key string, data []byte) error {
func (c *httpsCache) Put(ctx context.Context, key string, data []byte) error {
if err := c.check(); err != nil {
return err
}
return c.db.cachePersistently("https_"+key, data)
return c.db.cachePersistentlyContext(ctx, "https_"+key, data)
}
func (c *httpsCache) Delete(_ context.Context, key string) error {
func (c *httpsCache) Delete(ctx context.Context, key string) error {
if err := c.check(); err != nil {
return err
}
return c.db.clearPersistentCache("https_" + key)
return c.db.clearPersistentCacheContext(ctx, "https_"+key)
}

View File

@ -1,24 +1,33 @@
package main
import (
"context"
"database/sql"
"errors"
)
func (db *database) cachePersistently(key string, data []byte) error {
return db.cachePersistentlyContext(context.Background(), key, data)
}
func (db *database) cachePersistentlyContext(ctx context.Context, key string, data []byte) error {
if db == nil {
return errors.New("database is nil")
}
_, err := db.exec("insert or replace into persistent_cache(key, data, date) values(@key, @data, @date)", sql.Named("key", key), sql.Named("data", data), sql.Named("date", utcNowString()))
_, err := db.execContext(ctx, "insert or replace into persistent_cache(key, data, date) values(@key, @data, @date)", sql.Named("key", key), sql.Named("data", data), sql.Named("date", utcNowString()))
return err
}
func (db *database) retrievePersistentCache(key string) (data []byte, err error) {
return db.retrievePersistentCacheContext(context.Background(), key)
}
func (db *database) retrievePersistentCacheContext(c context.Context, key string) (data []byte, err error) {
if db == nil {
return nil, errors.New("database is nil")
}
d, err, _ := db.pc.Do(key, func() (interface{}, error) {
if row, err := db.queryRow("select data from persistent_cache where key = @key", sql.Named("key", key)); err != nil {
if row, err := db.queryRowContext(c, "select data from persistent_cache where key = @key", sql.Named("key", key)); err != nil {
return nil, err
} else {
err = row.Scan(&data)
@ -38,6 +47,10 @@ func (db *database) retrievePersistentCache(key string) (data []byte, err error)
}
func (db *database) clearPersistentCache(pattern string) error {
_, err := db.exec("delete from persistent_cache where key like @pattern", sql.Named("pattern", pattern))
return db.clearPersistentCacheContext(context.Background(), pattern)
}
func (db *database) clearPersistentCacheContext(c context.Context, pattern string) error {
_, err := db.execContext(c, "delete from persistent_cache where key like @pattern", sql.Named("pattern", pattern))
return err
}

View File

@ -1,8 +1,11 @@
package main
import (
"context"
"database/sql"
"errors"
"log"
"sync"
"time"
"github.com/araddon/dateparse"
@ -43,8 +46,9 @@ func (db *database) dequeue(qi *queueItem) error {
return err
}
func (db *database) peekQueue(name string) (*queueItem, error) {
row, err := db.queryRow(
func (db *database) peekQueue(ctx context.Context, name string) (*queueItem, error) {
row, err := db.queryRowContext(
ctx,
"select id, name, content, schedule from queue where schedule <= @schedule and name = @name order by schedule asc limit 1",
sql.Named("name", name),
sql.Named("schedule", time.Now().UTC().Format(time.RFC3339Nano)),
@ -67,3 +71,51 @@ func (db *database) peekQueue(name string) (*queueItem, error) {
qi.schedule = t
return qi, nil
}
type queueProcessFunc func(qi *queueItem, dequeue func(), reschedule func(time.Duration))
func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process queueProcessFunc) {
go func() {
done := false
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a.shutdown.Add(func() {
done = true
cancel()
wg.Wait()
log.Println("Stopped queue:", queueName)
})
for !done {
qi, err := a.db.peekQueue(ctx, queueName)
if err != nil {
log.Println("queue error:", err.Error())
continue
}
if qi == nil {
// No item in the queue, wait a moment
select {
case <-time.After(wait):
continue
case <-ctx.Done():
continue
}
}
process(
qi,
func() {
if err := a.db.dequeue(qi); err != nil {
log.Println("queue dequeue error:", err.Error())
}
},
func(dur time.Duration) {
if err := a.db.reschedule(qi, dur); err != nil {
log.Println("queue reschedule error:", err.Error())
}
},
)
}
wg.Done()
}()
}

View File

@ -1,6 +1,7 @@
package main
import (
"context"
"path/filepath"
"testing"
"time"
@ -30,11 +31,11 @@ func Test_queue(t *testing.T) {
err = db.enqueue("test", []byte("2"), time.Now())
require.NoError(t, err)
qi, err := db.peekQueue("abc")
qi, err := db.peekQueue(context.Background(), "abc")
require.NoError(t, err)
require.Nil(t, qi)
qi, err = db.peekQueue("test")
qi, err = db.peekQueue(context.Background(), "test")
require.NoError(t, err)
require.NotNil(t, qi)
require.Equal(t, []byte("1"), qi.content)
@ -42,7 +43,7 @@ func Test_queue(t *testing.T) {
err = db.reschedule(qi, 1*time.Second)
require.NoError(t, err)
qi, err = db.peekQueue("test")
qi, err = db.peekQueue(context.Background(), "test")
require.NoError(t, err)
require.NotNil(t, qi)
require.Equal(t, []byte("2"), qi.content)
@ -50,13 +51,13 @@ func Test_queue(t *testing.T) {
err = db.dequeue(qi)
require.NoError(t, err)
qi, err = db.peekQueue("test")
qi, err = db.peekQueue(context.Background(), "test")
require.NoError(t, err)
require.Nil(t, qi)
time.Sleep(1 * time.Second)
qi, err = db.peekQueue("test")
qi, err = db.peekQueue(context.Background(), "test")
require.NoError(t, err)
require.NotNil(t, qi)
require.Equal(t, []byte("1"), qi.content)

View File

@ -336,6 +336,7 @@ func saveToFile(reader io.Reader, fileName string) error {
return err
}
//nolint:containedctx
type valueOnlyContext struct {
context.Context
}

View File

@ -11,7 +11,6 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/PuerkitoBio/goquery"
@ -22,41 +21,18 @@ import (
)
func (a *goBlog) initWebmentionQueue() {
go func() {
done := false
var wg sync.WaitGroup
wg.Add(1)
a.shutdown.Add(func() {
done = true
wg.Wait()
log.Println("Stopped webmention queue")
})
for !done {
qi, err := a.db.peekQueue("wm")
if err != nil {
log.Println("webmention queue:", err.Error())
continue
}
if qi == nil {
// No item in the queue, wait a moment
time.Sleep(5 * time.Second)
continue
}
a.listenOnQueue("wm", 15*time.Second, func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) {
var m mention
if err = gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&m); err != nil {
if err := gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&m); err != nil {
log.Println("webmention queue:", err.Error())
_ = a.db.dequeue(qi)
continue
dequeue()
return
}
if err = a.verifyMention(&m); err != nil {
if err := a.verifyMention(&m); err != nil {
log.Println(fmt.Sprintf("Failed to verify webmention from %s to %s: %s", m.Source, m.Target, err.Error()))
}
if err = a.db.dequeue(qi); err != nil {
log.Println("webmention queue:", err.Error())
}
}
wg.Done()
}()
dequeue()
})
}
func (a *goBlog) queueMention(m *mention) error {