From 890e226ef62f13c43797461639a4e3fd2d0df84d Mon Sep 17 00:00:00 2001 From: Jan-Lukas Else Date: Wed, 23 Feb 2022 10:23:26 +0100 Subject: [PATCH] More linters and some contexts, improved queue listening --- .golangci.yml | 12 ++++++- activityPubSending.go | 66 +++++++++++++-------------------------- database.go | 12 +++++-- httpsCache.go | 12 +++---- persistentCache.go | 19 +++++++++-- queue.go | 56 +++++++++++++++++++++++++++++++-- queue_test.go | 11 ++++--- utils.go | 1 + webmentionVerification.go | 46 +++++++-------------------- 9 files changed, 136 insertions(+), 99 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index d79e883..811e135 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -25,4 +25,14 @@ linters: - noctx - prealloc - unparam - - durationcheck \ No newline at end of file + - durationcheck + - bidichk + - containedctx + - contextcheck +linters-settings: + gosimple: + go: "1.17" + checks: ["all"] + gostatichcheck: + go: "1.17" + checks: ["all"] \ No newline at end of file diff --git a/activityPubSending.go b/activityPubSending.go index 3d3288f..91861f5 100644 --- a/activityPubSending.go +++ b/activityPubSending.go @@ -10,7 +10,6 @@ import ( "log" "net/http" "net/url" - "sync" "time" "go.goblog.app/app/pkgs/bufferpool" @@ -24,51 +23,28 @@ type apRequest struct { } func (a *goBlog) initAPSendQueue() { - go func() { - done := false - var wg sync.WaitGroup - wg.Add(1) - a.shutdown.Add(func() { - done = true - wg.Wait() - log.Println("Stopped AP send queue") - }) - for !done { - qi, err := a.db.peekQueue("ap") - if err != nil { - log.Println("activitypub send queue:", err.Error()) - continue - } - if qi == nil { - // No item in the queue, wait a moment - time.Sleep(5 * time.Second) - continue - } - var r apRequest - if err = gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&r); err != nil { - log.Println("activitypub send queue:", err.Error()) - _ = a.db.dequeue(qi) - continue - } - if err = a.apSendSigned(r.BlogIri, r.To, r.Activity); err != nil { - if r.Try++; r.Try < 20 { - // Try it again - buf := bufferpool.Get() - _ = r.encode(buf) - qi.content = buf.Bytes() - _ = a.db.reschedule(qi, time.Duration(r.Try)*10*time.Minute) - bufferpool.Put(buf) - continue - } - log.Println("AP request failed for the 20th time:", r.To) - _ = a.db.apRemoveInbox(r.To) - } - if err = a.db.dequeue(qi); err != nil { - log.Println("activitypub send queue:", err.Error()) - } + a.listenOnQueue("ap", 15*time.Second, func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) { + var r apRequest + if err := gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&r); err != nil { + log.Println("activitypub queue:", err.Error()) + dequeue() + return } - wg.Done() - }() + if err := a.apSendSigned(r.BlogIri, r.To, r.Activity); err != nil { + if r.Try++; r.Try < 20 { + // Try it again + buf := bufferpool.Get() + _ = r.encode(buf) + qi.content = buf.Bytes() + reschedule(time.Duration(r.Try) * 10 * time.Minute) + bufferpool.Put(buf) + return + } + log.Println("AP request failed for the 20th time:", r.To) + _ = a.db.apRemoveInbox(r.To) + } + dequeue() + }) } func (db *database) apQueueSendSigned(blogIri, to string, activity interface{}) error { diff --git a/database.go b/database.go index 6eef4fa..8efffb0 100644 --- a/database.go +++ b/database.go @@ -187,6 +187,10 @@ func (db *database) prepare(query string, args ...interface{}) (*sql.Stmt, []int const dbNoCache = "nocache" func (db *database) exec(query string, args ...interface{}) (sql.Result, error) { + return db.execContext(context.Background(), query, args...) +} + +func (db *database) execContext(c context.Context, query string, args ...interface{}) (sql.Result, error) { if db == nil || db.db == nil { return nil, errors.New("database not initialized") } @@ -196,7 +200,7 @@ func (db *database) exec(query string, args ...interface{}) (sql.Result, error) db.em.Lock() defer db.em.Unlock() // Prepare context, call hook - ctx := db.dbBefore(context.Background(), query, args...) + ctx := db.dbBefore(c, query, args...) defer db.dbAfter(ctx, query, args...) // Execute if st != nil { @@ -222,13 +226,17 @@ func (db *database) query(query string, args ...interface{}) (*sql.Rows, error) } func (db *database) queryRow(query string, args ...interface{}) (*sql.Row, error) { + return db.queryRowContext(context.Background(), query, args...) +} + +func (db *database) queryRowContext(c context.Context, query string, args ...interface{}) (*sql.Row, error) { if db == nil || db.db == nil { return nil, errors.New("database not initialized") } // Maybe prepare st, args, _ := db.prepare(query, args...) // Prepare context, call hook - ctx := db.dbBefore(context.Background(), query, args...) + ctx := db.dbBefore(c, query, args...) defer db.dbAfter(ctx, query, args...) // Query if st != nil { diff --git a/httpsCache.go b/httpsCache.go index b47a410..a4941fc 100644 --- a/httpsCache.go +++ b/httpsCache.go @@ -21,11 +21,11 @@ func (c *httpsCache) check() error { return nil } -func (c *httpsCache) Get(_ context.Context, key string) ([]byte, error) { +func (c *httpsCache) Get(ctx context.Context, key string) ([]byte, error) { if err := c.check(); err != nil { return nil, err } - d, err := c.db.retrievePersistentCache("https_" + key) + d, err := c.db.retrievePersistentCacheContext(ctx, "https_"+key) if d == nil && err == nil { return nil, autocert.ErrCacheMiss } else if err != nil { @@ -34,16 +34,16 @@ func (c *httpsCache) Get(_ context.Context, key string) ([]byte, error) { return d, nil } -func (c *httpsCache) Put(_ context.Context, key string, data []byte) error { +func (c *httpsCache) Put(ctx context.Context, key string, data []byte) error { if err := c.check(); err != nil { return err } - return c.db.cachePersistently("https_"+key, data) + return c.db.cachePersistentlyContext(ctx, "https_"+key, data) } -func (c *httpsCache) Delete(_ context.Context, key string) error { +func (c *httpsCache) Delete(ctx context.Context, key string) error { if err := c.check(); err != nil { return err } - return c.db.clearPersistentCache("https_" + key) + return c.db.clearPersistentCacheContext(ctx, "https_"+key) } diff --git a/persistentCache.go b/persistentCache.go index 1699fa7..44722af 100644 --- a/persistentCache.go +++ b/persistentCache.go @@ -1,24 +1,33 @@ package main import ( + "context" "database/sql" "errors" ) func (db *database) cachePersistently(key string, data []byte) error { + return db.cachePersistentlyContext(context.Background(), key, data) +} + +func (db *database) cachePersistentlyContext(ctx context.Context, key string, data []byte) error { if db == nil { return errors.New("database is nil") } - _, err := db.exec("insert or replace into persistent_cache(key, data, date) values(@key, @data, @date)", sql.Named("key", key), sql.Named("data", data), sql.Named("date", utcNowString())) + _, err := db.execContext(ctx, "insert or replace into persistent_cache(key, data, date) values(@key, @data, @date)", sql.Named("key", key), sql.Named("data", data), sql.Named("date", utcNowString())) return err } func (db *database) retrievePersistentCache(key string) (data []byte, err error) { + return db.retrievePersistentCacheContext(context.Background(), key) +} + +func (db *database) retrievePersistentCacheContext(c context.Context, key string) (data []byte, err error) { if db == nil { return nil, errors.New("database is nil") } d, err, _ := db.pc.Do(key, func() (interface{}, error) { - if row, err := db.queryRow("select data from persistent_cache where key = @key", sql.Named("key", key)); err != nil { + if row, err := db.queryRowContext(c, "select data from persistent_cache where key = @key", sql.Named("key", key)); err != nil { return nil, err } else { err = row.Scan(&data) @@ -38,6 +47,10 @@ func (db *database) retrievePersistentCache(key string) (data []byte, err error) } func (db *database) clearPersistentCache(pattern string) error { - _, err := db.exec("delete from persistent_cache where key like @pattern", sql.Named("pattern", pattern)) + return db.clearPersistentCacheContext(context.Background(), pattern) +} + +func (db *database) clearPersistentCacheContext(c context.Context, pattern string) error { + _, err := db.execContext(c, "delete from persistent_cache where key like @pattern", sql.Named("pattern", pattern)) return err } diff --git a/queue.go b/queue.go index 493c4ec..ff05a54 100644 --- a/queue.go +++ b/queue.go @@ -1,8 +1,11 @@ package main import ( + "context" "database/sql" "errors" + "log" + "sync" "time" "github.com/araddon/dateparse" @@ -43,8 +46,9 @@ func (db *database) dequeue(qi *queueItem) error { return err } -func (db *database) peekQueue(name string) (*queueItem, error) { - row, err := db.queryRow( +func (db *database) peekQueue(ctx context.Context, name string) (*queueItem, error) { + row, err := db.queryRowContext( + ctx, "select id, name, content, schedule from queue where schedule <= @schedule and name = @name order by schedule asc limit 1", sql.Named("name", name), sql.Named("schedule", time.Now().UTC().Format(time.RFC3339Nano)), @@ -67,3 +71,51 @@ func (db *database) peekQueue(name string) (*queueItem, error) { qi.schedule = t return qi, nil } + +type queueProcessFunc func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) + +func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process queueProcessFunc) { + go func() { + done := false + var wg sync.WaitGroup + wg.Add(1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + a.shutdown.Add(func() { + done = true + cancel() + wg.Wait() + log.Println("Stopped queue:", queueName) + }) + for !done { + qi, err := a.db.peekQueue(ctx, queueName) + if err != nil { + log.Println("queue error:", err.Error()) + continue + } + if qi == nil { + // No item in the queue, wait a moment + select { + case <-time.After(wait): + continue + case <-ctx.Done(): + continue + } + } + process( + qi, + func() { + if err := a.db.dequeue(qi); err != nil { + log.Println("queue dequeue error:", err.Error()) + } + }, + func(dur time.Duration) { + if err := a.db.reschedule(qi, dur); err != nil { + log.Println("queue reschedule error:", err.Error()) + } + }, + ) + } + wg.Done() + }() +} diff --git a/queue_test.go b/queue_test.go index 7c06304..9c68a2c 100644 --- a/queue_test.go +++ b/queue_test.go @@ -1,6 +1,7 @@ package main import ( + "context" "path/filepath" "testing" "time" @@ -30,11 +31,11 @@ func Test_queue(t *testing.T) { err = db.enqueue("test", []byte("2"), time.Now()) require.NoError(t, err) - qi, err := db.peekQueue("abc") + qi, err := db.peekQueue(context.Background(), "abc") require.NoError(t, err) require.Nil(t, qi) - qi, err = db.peekQueue("test") + qi, err = db.peekQueue(context.Background(), "test") require.NoError(t, err) require.NotNil(t, qi) require.Equal(t, []byte("1"), qi.content) @@ -42,7 +43,7 @@ func Test_queue(t *testing.T) { err = db.reschedule(qi, 1*time.Second) require.NoError(t, err) - qi, err = db.peekQueue("test") + qi, err = db.peekQueue(context.Background(), "test") require.NoError(t, err) require.NotNil(t, qi) require.Equal(t, []byte("2"), qi.content) @@ -50,13 +51,13 @@ func Test_queue(t *testing.T) { err = db.dequeue(qi) require.NoError(t, err) - qi, err = db.peekQueue("test") + qi, err = db.peekQueue(context.Background(), "test") require.NoError(t, err) require.Nil(t, qi) time.Sleep(1 * time.Second) - qi, err = db.peekQueue("test") + qi, err = db.peekQueue(context.Background(), "test") require.NoError(t, err) require.NotNil(t, qi) require.Equal(t, []byte("1"), qi.content) diff --git a/utils.go b/utils.go index 3c0f02c..50e03fe 100644 --- a/utils.go +++ b/utils.go @@ -336,6 +336,7 @@ func saveToFile(reader io.Reader, fileName string) error { return err } +//nolint:containedctx type valueOnlyContext struct { context.Context } diff --git a/webmentionVerification.go b/webmentionVerification.go index e37f31c..8e1d8af 100644 --- a/webmentionVerification.go +++ b/webmentionVerification.go @@ -11,7 +11,6 @@ import ( "net/http" "net/url" "strings" - "sync" "time" "github.com/PuerkitoBio/goquery" @@ -22,41 +21,18 @@ import ( ) func (a *goBlog) initWebmentionQueue() { - go func() { - done := false - var wg sync.WaitGroup - wg.Add(1) - a.shutdown.Add(func() { - done = true - wg.Wait() - log.Println("Stopped webmention queue") - }) - for !done { - qi, err := a.db.peekQueue("wm") - if err != nil { - log.Println("webmention queue:", err.Error()) - continue - } - if qi == nil { - // No item in the queue, wait a moment - time.Sleep(5 * time.Second) - continue - } - var m mention - if err = gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&m); err != nil { - log.Println("webmention queue:", err.Error()) - _ = a.db.dequeue(qi) - continue - } - if err = a.verifyMention(&m); err != nil { - log.Println(fmt.Sprintf("Failed to verify webmention from %s to %s: %s", m.Source, m.Target, err.Error())) - } - if err = a.db.dequeue(qi); err != nil { - log.Println("webmention queue:", err.Error()) - } + a.listenOnQueue("wm", 15*time.Second, func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) { + var m mention + if err := gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&m); err != nil { + log.Println("webmention queue:", err.Error()) + dequeue() + return } - wg.Done() - }() + if err := a.verifyMention(&m); err != nil { + log.Println(fmt.Sprintf("Failed to verify webmention from %s to %s: %s", m.Source, m.Target, err.Error())) + } + dequeue() + }) } func (a *goBlog) queueMention(m *mention) error {