From 5f969f8ba81734f3caddf9d3140a856890b0beb4 Mon Sep 17 00:00:00 2001 From: Jan-Lukas Else Date: Thu, 31 Mar 2022 14:55:36 +0200 Subject: [PATCH] Fix un-deletion for activitypub / mastodon, improve queue handling --- activityPub.go | 36 +++++++++++++++++------------------ activityPubSending.go | 4 ++-- activityStreams.go | 12 +++++++++++- app.go | 2 ++ blogroll.go | 3 ++- geoTrack.go | 3 +-- go.mod | 3 ++- go.sum | 4 ++-- micropub.go | 2 +- nodeinfo.go | 5 +++-- pkgs/mp3merge/mp3merge.go | 2 +- queue.go | 40 ++++++++++++++++++++++++++------------- queue_test.go | 28 +++++++++++++-------------- utils.go | 4 ++++ utils_test.go | 10 ++++++++++ webmentionVerification.go | 2 +- webmention_test.go | 3 ++- 17 files changed, 102 insertions(+), 61 deletions(-) diff --git a/activityPub.go b/activityPub.go index 64eb5de..c33cf0f 100644 --- a/activityPub.go +++ b/activityPub.go @@ -300,7 +300,7 @@ func (a *goBlog) apPost(p *post) { a.apSendToAllFollowers(p.Blog, map[string]any{ "@context": []string{asContext}, "actor": a.apIri(a.cfg.Blogs[p.Blog]), - "id": a.fullPostURL(p), + "id": a.activityPubId(p), "published": n.Published, "type": "Create", "object": n, @@ -311,7 +311,7 @@ func (a *goBlog) apUpdate(p *post) { a.apSendToAllFollowers(p.Blog, map[string]any{ "@context": []string{asContext}, "actor": a.apIri(a.cfg.Blogs[p.Blog]), - "id": a.fullPostURL(p), + "id": a.activityPubId(p), "published": time.Now().Format("2006-01-02T15:04:05-07:00"), "type": "Update", "object": a.toASNote(p), @@ -323,22 +323,22 @@ func (a *goBlog) apDelete(p *post) { "@context": []string{asContext}, "actor": a.apIri(a.cfg.Blogs[p.Blog]), "type": "Delete", - "object": a.fullPostURL(p), + "object": a.activityPubId(p), }) } func (a *goBlog) apUndelete(p *post) { - a.apSendToAllFollowers(p.Blog, map[string]any{ - "@context": []string{asContext}, - "actor": a.apIri(a.cfg.Blogs[p.Blog]), - "type": "Undo", - "object": map[string]any{ - "@context": []string{asContext}, - "actor": a.apIri(a.cfg.Blogs[p.Blog]), - "type": "Delete", - "object": a.fullPostURL(p), - }, - }) + // The optimal way to do this would be to send a "Undo Delete" activity, + // but that doesn't work with Mastodon yet. + // see: + // https://socialhub.activitypub.rocks/t/soft-deletes-and-restoring-deleted-posts/2318 + // https://github.com/mastodon/mastodon/issues/17553 + + // Update "activityPubVersion" parameter to current timestamp in nanoseconds + p.Parameters[activityPubVersionParam] = []string{fmt.Sprintf("%d", utcNowNanos())} + a.db.replacePostParam(p.Path, activityPubVersionParam, p.Parameters[activityPubVersionParam]) + // Post as new post + a.apPost(p) } func (a *goBlog) apAccept(blogName string, blog *configBlog, follow map[string]any) { @@ -373,7 +373,7 @@ func (a *goBlog) apAccept(blogName string, blog *configBlog, follow map[string]a "object": follow, } _, accept["id"] = a.apNewID(blog) - _ = a.db.apQueueSendSigned(a.apIri(blog), inbox, accept) + _ = a.apQueueSendSigned(a.apIri(blog), inbox, accept) } func (a *goBlog) apSendToAllFollowers(blog string, activity any) { @@ -382,13 +382,13 @@ func (a *goBlog) apSendToAllFollowers(blog string, activity any) { log.Println("Failed to retrieve inboxes:", err.Error()) return } - a.db.apSendTo(a.apIri(a.cfg.Blogs[blog]), activity, inboxes) + a.apSendTo(a.apIri(a.cfg.Blogs[blog]), activity, inboxes) } -func (db *database) apSendTo(blogIri string, activity any, inboxes []string) { +func (a *goBlog) apSendTo(blogIri string, activity any, inboxes []string) { for _, i := range inboxes { go func(inbox string) { - _ = db.apQueueSendSigned(blogIri, inbox, activity) + _ = a.apQueueSendSigned(blogIri, inbox, activity) }(i) } } diff --git a/activityPubSending.go b/activityPubSending.go index 027e52e..a62cf65 100644 --- a/activityPubSending.go +++ b/activityPubSending.go @@ -47,7 +47,7 @@ func (a *goBlog) initAPSendQueue() { }) } -func (db *database) apQueueSendSigned(blogIri, to string, activity any) error { +func (a *goBlog) apQueueSendSigned(blogIri, to string, activity any) error { body, err := json.Marshal(activity) if err != nil { return err @@ -61,7 +61,7 @@ func (db *database) apQueueSendSigned(blogIri, to string, activity any) error { }).encode(buf); err != nil { return err } - return db.enqueue("ap", buf.Bytes(), time.Now()) + return a.enqueue("ap", buf.Bytes(), time.Now()) } func (r *apRequest) encode(w io.Writer) error { diff --git a/activityStreams.go b/activityStreams.go index f6e5472..148b366 100644 --- a/activityStreams.go +++ b/activityStreams.go @@ -101,7 +101,7 @@ func (a *goBlog) toASNote(p *post) *asNote { Context: []string{asContext}, To: []string{"https://www.w3.org/ns/activitystreams#Public"}, MediaType: contenttype.HTML, - ID: a.fullPostURL(p), + ID: a.activityPubId(p), URL: a.fullPostURL(p), AttributedTo: a.apIri(a.cfg.Blogs[p.Blog]), } @@ -152,6 +152,16 @@ func (a *goBlog) toASNote(p *post) *asNote { return as } +const activityPubVersionParam = "activitypubversion" + +func (a *goBlog) activityPubId(p *post) string { + fu := a.fullPostURL(p) + if version := p.firstParameter(activityPubVersionParam); version != "" { + return fu + "?activitypubversion=" + version + } + return fu +} + func (a *goBlog) serveActivityStreams(blog string, w http.ResponseWriter, r *http.Request) { b := a.cfg.Blogs[blog] publicKeyDer, err := x509.MarshalPKIXPublicKey(&(a.apPrivateKey.PublicKey)) diff --git a/app.go b/app.go index 6d71c5d..dba0b6b 100644 --- a/app.go +++ b/app.go @@ -73,6 +73,8 @@ type goBlog struct { regexRedirects []*regexRedirect // Sessions loginSessions, captchaSessions *dbSessionStore + // Queue triggers + queueTriggers []chan struct{} // Shutdown shutdown shutdowner.Shutdowner // Template strings diff --git a/blogroll.go b/blogroll.go index 492d510..e40e85a 100644 --- a/blogroll.go +++ b/blogroll.go @@ -3,6 +3,7 @@ package main import ( "bytes" "context" + "io" "log" "net/http" "sort" @@ -66,7 +67,7 @@ func (a *goBlog) serveBlogrollExport(w http.ResponseWriter, r *http.Request) { return } w.Header().Set(contentType, contenttype.XMLUTF8) - _, _ = opmlBuf.WriteTo(w) + _, _ = io.Copy(w, opmlBuf) } func (a *goBlog) getBlogrollOutlines(blog string) ([]*opml.Outline, error) { diff --git a/geoTrack.go b/geoTrack.go index 1aafd0b..1f16262 100644 --- a/geoTrack.go +++ b/geoTrack.go @@ -5,7 +5,6 @@ import ( "errors" "log" "math" - "strings" "github.com/tkrajina/gpxgo/gpx" "golang.org/x/text/language" @@ -102,7 +101,7 @@ func trackParseGPX(gpxString string) (result *trackParseResult, err error) { points []*trackPoint } - result.gpxData, err = gpx.Parse(strings.NewReader(gpxString)) + result.gpxData, err = gpx.ParseString(gpxString) if err != nil { return nil, err } diff --git a/go.mod b/go.mod index a04307d..f4ffa51 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,8 @@ require ( github.com/spf13/viper v1.10.1 github.com/stretchr/testify v1.7.1 github.com/tdewolff/minify/v2 v2.10.0 - github.com/tkrajina/gpxgo v1.2.1 + // master + github.com/tkrajina/gpxgo v1.2.2-0.20220217201249-321f19554eec github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80 github.com/vcraescu/go-paginator v1.0.1-0.20201114172518-2cfc59fe05c2 github.com/yuin/goldmark v1.4.11 diff --git a/go.sum b/go.sum index e6528dc..e6860f4 100644 --- a/go.sum +++ b/go.sum @@ -438,8 +438,8 @@ github.com/tdewolff/parse/v2 v2.5.27/go.mod h1:WzaJpRSbwq++EIQHYIRTpbYKNA3gn9it1 github.com/tdewolff/test v1.0.6 h1:76mzYJQ83Op284kMT+63iCNCI7NEERsIN8dLM+RiKr4= github.com/tdewolff/test v1.0.6/go.mod h1:6DAvZliBAAnD7rhVgwaM7DE5/d9NMOAJ09SqYqeK4QE= github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= -github.com/tkrajina/gpxgo v1.2.1 h1:MJJtT4Re5btDGg89brFDrUP3EWz+cBmyo8pQwV0ZOak= -github.com/tkrajina/gpxgo v1.2.1/go.mod h1:795sjVRFo5wWyN6oOZp0RYienGGBJjpAlgOz2nCngA0= +github.com/tkrajina/gpxgo v1.2.2-0.20220217201249-321f19554eec h1:o5aL1yX+/xzvK4QfZe/iDcCoRrFoOQ/Dn43jV/thKBM= +github.com/tkrajina/gpxgo v1.2.2-0.20220217201249-321f19554eec/go.mod h1:795sjVRFo5wWyN6oOZp0RYienGGBJjpAlgOz2nCngA0= github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80 h1:nrZ3ySNYwJbSpD6ce9duiP+QkD3JuLCcWkdaehUS/3Y= github.com/tomnomnom/linkheader v0.0.0-20180905144013-02ca5825eb80/go.mod h1:iFyPdL66DjUD96XmzVL3ZntbzcflLnznH0fr99w5VqE= github.com/u-root/uio v0.0.0-20210528114334-82958018845c h1:BFvcl34IGnw8yvJi8hlqLFo9EshRInwWBs2M5fGWzQA= diff --git a/micropub.go b/micropub.go index f5deea7..0e71676 100644 --- a/micropub.go +++ b/micropub.go @@ -81,7 +81,7 @@ func (a *goBlog) serveMicropubQuery(w http.ResponseWriter, r *http.Request) { return } w.Header().Set(contentType, contenttype.JSONUTF8) - _, _ = buf.WriteTo(w) + _, _ = io.Copy(w, buf) } func (a *goBlog) serveMicropubPost(w http.ResponseWriter, r *http.Request) { diff --git a/nodeinfo.go b/nodeinfo.go index 3d1746a..543c455 100644 --- a/nodeinfo.go +++ b/nodeinfo.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "io" "net/http" "go.goblog.app/app/pkgs/bufferpool" @@ -25,7 +26,7 @@ func (a *goBlog) serveNodeInfoDiscover(w http.ResponseWriter, r *http.Request) { } w.Header().Set(contentType, contenttype.JSONUTF8) mw := a.min.Writer(contenttype.JSON, w) - _, _ = buf.WriteTo(mw) + _, _ = io.Copy(mw, buf) _ = mw.Close() } @@ -60,6 +61,6 @@ func (a *goBlog) serveNodeInfo(w http.ResponseWriter, r *http.Request) { } w.Header().Set(contentType, contenttype.JSONUTF8) mw := a.min.Writer(contenttype.JSON, w) - _, _ = buf.WriteTo(mw) + _, _ = io.Copy(mw, buf) _ = mw.Close() } diff --git a/pkgs/mp3merge/mp3merge.go b/pkgs/mp3merge/mp3merge.go index b043b9a..fb05cb9 100644 --- a/pkgs/mp3merge/mp3merge.go +++ b/pkgs/mp3merge/mp3merge.go @@ -74,6 +74,6 @@ func MergeMP3(out io.Writer, in ...io.Reader) error { } // Copy the temporary output to the output - _, err := tmpOut.WriteTo(out) + _, err := io.Copy(out, tmpOut) return err } diff --git a/queue.go b/queue.go index ff05a54..6caae12 100644 --- a/queue.go +++ b/queue.go @@ -18,21 +18,28 @@ type queueItem struct { id int } -func (db *database) enqueue(name string, content []byte, schedule time.Time) error { +func (a *goBlog) enqueue(name string, content []byte, schedule time.Time) error { if len(content) == 0 { return errors.New("empty content") } - _, err := db.exec( + _, err := a.db.exec( "insert into queue (name, content, schedule) values (@name, @content, @schedule)", sql.Named("name", name), sql.Named("content", content), sql.Named("schedule", schedule.UTC().Format(time.RFC3339Nano)), ) - return err + if err != nil { + return err + } + // Trigger all queue listeners + for _, trigger := range a.queueTriggers { + trigger <- struct{}{} + } + return nil } -func (db *database) reschedule(qi *queueItem, dur time.Duration) error { - _, err := db.exec( +func (a *goBlog) reschedule(qi *queueItem, dur time.Duration) error { + _, err := a.db.exec( "update queue set schedule = @schedule, content = @content where id = @id", sql.Named("schedule", qi.schedule.Add(dur).UTC().Format(time.RFC3339Nano)), sql.Named("content", qi.content), @@ -41,13 +48,13 @@ func (db *database) reschedule(qi *queueItem, dur time.Duration) error { return err } -func (db *database) dequeue(qi *queueItem) error { - _, err := db.exec("delete from queue where id = @id", sql.Named("id", qi.id)) +func (a *goBlog) dequeue(qi *queueItem) error { + _, err := a.db.exec("delete from queue where id = @id", sql.Named("id", qi.id)) return err } -func (db *database) peekQueue(ctx context.Context, name string) (*queueItem, error) { - row, err := db.queryRowContext( +func (a *goBlog) peekQueue(ctx context.Context, name string) (*queueItem, error) { + row, err := a.db.queryRowContext( ctx, "select id, name, content, schedule from queue where schedule <= @schedule and name = @name order by schedule asc limit 1", sql.Named("name", name), @@ -75,6 +82,10 @@ func (db *database) peekQueue(ctx context.Context, name string) (*queueItem, err type queueProcessFunc func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process queueProcessFunc) { + // Queue trigger + trigger := make(chan struct{}) + a.queueTriggers = append(a.queueTriggers, trigger) + // Start goroutine to listen on queue go func() { done := false var wg sync.WaitGroup @@ -88,9 +99,9 @@ func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process que log.Println("Stopped queue:", queueName) }) for !done { - qi, err := a.db.peekQueue(ctx, queueName) + qi, err := a.peekQueue(ctx, queueName) if err != nil { - log.Println("queue error:", err.Error()) + log.Println("queue peek error:", err.Error()) continue } if qi == nil { @@ -98,19 +109,22 @@ func (a *goBlog) listenOnQueue(queueName string, wait time.Duration, process que select { case <-time.After(wait): continue + case <-trigger: + continue case <-ctx.Done(): + done = true continue } } process( qi, func() { - if err := a.db.dequeue(qi); err != nil { + if err := a.dequeue(qi); err != nil { log.Println("queue dequeue error:", err.Error()) } }, func(dur time.Duration) { - if err := a.db.reschedule(qi, dur); err != nil { + if err := a.reschedule(qi, dur); err != nil { log.Println("queue reschedule error:", err.Error()) } }, diff --git a/queue_test.go b/queue_test.go index 82a19fd..7537b3c 100644 --- a/queue_test.go +++ b/queue_test.go @@ -20,47 +20,46 @@ func Test_queue(t *testing.T) { } _ = app.initDatabase(false) defer app.db.close() - db := app.db time1 := time.Now() - err := db.enqueue("test", []byte(""), time.Now()) + err := app.enqueue("test", []byte(""), time.Now()) require.Error(t, err) - err = db.enqueue("test", []byte("1"), time1) + err = app.enqueue("test", []byte("1"), time1) require.NoError(t, err) - err = db.enqueue("test", []byte("2"), time.Now()) + err = app.enqueue("test", []byte("2"), time.Now()) require.NoError(t, err) - qi, err := db.peekQueue(context.Background(), "abc") + qi, err := app.peekQueue(context.Background(), "abc") require.NoError(t, err) require.Nil(t, qi) - qi, err = db.peekQueue(context.Background(), "test") + qi, err = app.peekQueue(context.Background(), "test") require.NoError(t, err) require.NotNil(t, qi) require.Equal(t, []byte("1"), qi.content) require.Equal(t, time1.UTC(), qi.schedule.UTC()) - err = db.reschedule(qi, 1*time.Second) + err = app.reschedule(qi, 1*time.Second) require.NoError(t, err) - qi, err = db.peekQueue(context.Background(), "test") + qi, err = app.peekQueue(context.Background(), "test") require.NoError(t, err) require.NotNil(t, qi) require.Equal(t, []byte("2"), qi.content) - err = db.dequeue(qi) + err = app.dequeue(qi) require.NoError(t, err) - qi, err = db.peekQueue(context.Background(), "test") + qi, err = app.peekQueue(context.Background(), "test") require.NoError(t, err) require.Nil(t, qi) time.Sleep(1 * time.Second) - qi, err = db.peekQueue(context.Background(), "test") + qi, err = app.peekQueue(context.Background(), "test") require.NoError(t, err) require.NotNil(t, qi) require.Equal(t, []byte("1"), qi.content) @@ -77,20 +76,19 @@ func Benchmark_queue(b *testing.B) { } _ = app.initDatabase(false) defer app.db.close() - db := app.db - err := db.enqueue("test", []byte("1"), time.Now()) + err := app.enqueue("test", []byte("1"), time.Now()) require.NoError(b, err) b.Run("Peek with item", func(b *testing.B) { for i := 0; i < b.N; i++ { - _, _ = db.peekQueue(context.Background(), "test") + _, _ = app.peekQueue(context.Background(), "test") } }) b.Run("Peek without item", func(b *testing.B) { for i := 0; i < b.N; i++ { - _, _ = db.peekQueue(context.Background(), "abc") + _, _ = app.peekQueue(context.Background(), "abc") } }) } diff --git a/utils.go b/utils.go index c5ef74d..a4a516e 100644 --- a/utils.go +++ b/utils.go @@ -193,6 +193,10 @@ func utcNowString() string { return time.Now().UTC().Format(time.RFC3339) } +func utcNowNanos() int64 { + return time.Now().UTC().UnixNano() +} + type stringPair struct { First, Second string } diff --git a/utils_test.go b/utils_test.go index af15daa..6b28f4f 100644 --- a/utils_test.go +++ b/utils_test.go @@ -144,3 +144,13 @@ func Test_lowerUnescaptedPath(t *testing.T) { assert.Equal(t, "/de/posts/fahrradanhänger", lowerUnescapedPath("/de/posts/fahrradanh%C3%84nger")) assert.Equal(t, "/de/posts/fahrradanhänger", lowerUnescapedPath("/de/posts/fahrradanhÄnger")) } + +func Fuzz_lowerUnescaptedPath(f *testing.F) { + f.Add("/de/posts/fahrradanh%C3%84nger") + f.Fuzz(func(t *testing.T, str string) { + out := lowerUnescapedPath(str) + if out == "" { + t.Error("Empty output") + } + }) +} diff --git a/webmentionVerification.go b/webmentionVerification.go index c909849..01a2c68 100644 --- a/webmentionVerification.go +++ b/webmentionVerification.go @@ -44,7 +44,7 @@ func (a *goBlog) queueMention(m *mention) error { if err := gob.NewEncoder(buf).Encode(m); err != nil { return err } - return a.db.enqueue("wm", buf.Bytes(), time.Now()) + return a.enqueue("wm", buf.Bytes(), time.Now()) } func (a *goBlog) verifyMention(m *mention) error { diff --git a/webmention_test.go b/webmention_test.go index 5adb70e..2488f42 100644 --- a/webmention_test.go +++ b/webmention_test.go @@ -76,10 +76,11 @@ func Test_webmentions(t *testing.T) { mentions = app.db.getWebmentionsByAddress("https://example.com/t%C3%A4st") assert.Len(t, mentions, 1) - app.db.deleteWebmention(&mention{ + err = app.db.deleteWebmention(&mention{ Source: "https://example.net/test", Target: "https://example.com/T%C3%84ST", }) + assert.NoError(t, err) mentions = app.db.getWebmentionsByAddress("https://example.com/täst") assert.Len(t, mentions, 0)