From 004c4fd2c1929717f0e1be3e503fbd5e5d8b7238 Mon Sep 17 00:00:00 2001 From: Jan-Lukas Else Date: Wed, 25 Nov 2020 12:36:14 +0100 Subject: [PATCH] Verify webmentions using a queue --- activityPubSending.go | 3 +- main.go | 6 +- webmention.go | 28 +++++----- webmentionVerification.go | 114 ++++++++++++++++++++++++-------------- 4 files changed, 92 insertions(+), 59 deletions(-) diff --git a/activityPubSending.go b/activityPubSending.go index 9c688c1..728dbb0 100644 --- a/activityPubSending.go +++ b/activityPubSending.go @@ -83,12 +83,11 @@ func apQueueSendSigned(blogIri, to string, activity interface{}) error { if err != nil { return err } - err = apQueue.Enqueue(&apRequest{ + return apQueue.Enqueue(&apRequest{ BlogIri: blogIri, To: to, Activity: body, }) - return err } func apSendSigned(blogIri, to string, activity []byte) error { diff --git a/main.go b/main.go index a858268..ccc65a8 100644 --- a/main.go +++ b/main.go @@ -59,8 +59,12 @@ func main() { log.Fatal(err) return } + err = initWebmention() + if err != nil { + log.Fatal(err) + return + } initTelegram() - initWebmention() initNodeInfo() // Start cron hooks diff --git a/webmention.go b/webmention.go index a505e37..7cb1c0b 100644 --- a/webmention.go +++ b/webmention.go @@ -32,7 +32,7 @@ type mention struct { Author string } -func initWebmention() { +func initWebmention() error { // Add hooks hookFunc := func(p *post) { p.sendWebmentions() @@ -41,7 +41,7 @@ func initWebmention() { postHooks[postUpdateHook] = append(postHooks[postUpdateHook], hookFunc) postHooks[postDeleteHook] = append(postHooks[postDeleteHook], hookFunc) // Start verifier - startWebmentionVerifier() + return initWebmentionQueue() } func handleWebmention(w http.ResponseWriter, r *http.Request) { @@ -54,7 +54,7 @@ func handleWebmention(w http.ResponseWriter, r *http.Request) { http.Error(w, "target not allowed", http.StatusBadRequest) return } - if err = createWebmention(m.Source, m.Target); err != nil { + if err = queueMention(m); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -71,13 +71,14 @@ func extractMention(r *http.Request) (*mention, error) { return nil, err } source := r.Form.Get("source") - target := r.Form.Get("target") + target := unescapedPath(r.Form.Get("target")) if source == "" || target == "" || !isAbsoluteURL(source) || !isAbsoluteURL(target) { return nil, errors.New("Invalid request") } return &mention{ - Source: source, - Target: target, + Source: source, + Target: target, + Created: time.Now().Unix(), }, nil } @@ -138,7 +139,7 @@ func webmentionAdminApprove(w http.ResponseWriter, r *http.Request) { func webmentionExists(source, target string) bool { result := 0 - row, err := appDbQueryRow("select exists(select 1 from webmentions where source = ? and target = ?)", source, unescapedPath(target)) + row, err := appDbQueryRow("select exists(select 1 from webmentions where source = ? and target = ?)", source, target) if err != nil { return false } @@ -149,16 +150,15 @@ func webmentionExists(source, target string) bool { } func createWebmention(source, target string) (err error) { - if webmentionExists(source, target) { - _, err = appDbExec("update webmentions set status = ? where source = ? and target = ?", webmentionStatusRenew, source, unescapedPath(target)) - } else { - _, err = appDbExec("insert into webmentions (source, target, created) values (?, ?, ?)", source, unescapedPath(target), time.Now().Unix()) - } - return err + return queueMention(&mention{ + Source: source, + Target: unescapedPath(target), + Created: time.Now().Unix(), + }) } func deleteWebmention(id int) error { - _, err := appDbExec("delete from webmentions where id = ?", id) + _, err := appDbExec("delete from webmentions where id = @id", sql.Named("id", id)) return err } diff --git a/webmentionVerification.go b/webmentionVerification.go index cf9bd56..18e27a7 100644 --- a/webmentionVerification.go +++ b/webmentionVerification.go @@ -6,56 +6,65 @@ import ( "errors" "fmt" "io" + "log" "net/http" "net/url" + "os" "strings" - "time" "github.com/PuerkitoBio/goquery" + "github.com/joncrlsn/dque" "willnorris.com/go/microformats" ) -func startWebmentionVerifier() { +var wmQueue *dque.DQue + +func wmMentionBuilder() interface{} { + return &mention{} +} + +func initWebmentionQueue() (err error) { + queuePath := "queues" + if _, err := os.Stat(queuePath); os.IsNotExist(err) { + os.Mkdir(queuePath, 0755) + } + wmQueue, err = dque.NewOrOpen("webmention", queuePath, 5, wmMentionBuilder) + if err != nil { + return err + } + startWebmentionQueue() + return nil +} + +func startWebmentionQueue() { go func() { for { - time.Sleep(30 * time.Second) - verifyNextWebmention() + if i, err := wmQueue.PeekBlock(); err == nil { + if i == nil { + // Empty request + _, _ = wmQueue.Dequeue() + continue + } + if m, ok := i.(*mention); ok { + err = m.verifyMention() + if err != nil { + log.Println(fmt.Sprintf("Failed to verify webmention from %s to %s: %s", m.Source, m.Target, err.Error())) + } + _, _ = wmQueue.Dequeue() + } else { + // Invalid type + _, _ = wmQueue.Dequeue() + } + } } }() } -func verifyNextWebmention() error { - m := &mention{} - oldStatus := "" - row, err := appDbQueryRow("select id, source, target, status from webmentions where (status = ? or status = ?) limit 1", webmentionStatusNew, webmentionStatusRenew) - if err != nil { - return err - } - if err := row.Scan(&m.ID, &m.Source, &m.Target, &oldStatus); err == sql.ErrNoRows { - return nil - } else if err != nil { - return err - } - if err := wmVerify(m); err != nil { - // Invalid - return deleteWebmention(m.ID) - } - if len(m.Content) > 500 { - m.Content = m.Content[0:497] + "…" - } - newStatus := webmentionStatusVerified - if strings.HasPrefix(m.Source, appConfig.Server.PublicAddress) { - // Approve if it's server-intern - newStatus = webmentionStatusApproved - } - _, err = appDbExec("update webmentions set status = ?, title = ?, content = ?, author = ? where id = ?", newStatus, m.Title, m.Content, m.Author, m.ID) - if oldStatus == string(webmentionStatusNew) { - sendNotification(fmt.Sprintf("New webmention from %s to %s", m.Source, m.Target)) - } - return err +func queueMention(m *mention) error { + return wmQueue.Enqueue(m) } -func wmVerify(m *mention) error { +func (m *mention) verifyMention() error { req, err := http.NewRequest(http.MethodGet, m.Source, nil) if err != nil { return err @@ -65,11 +74,32 @@ func wmVerify(m *mention) error { if err != nil { return err } - defer resp.Body.Close() - return wmVerifyReader(resp.Body, m) + err = m.verifyReader(resp.Body) + _ = resp.Body.Close() + if err != nil { + _, err := appDbExec("delete from webmentions where source = @source and target = @target", sql.Named("source", m.Source), sql.Named("target", m.Target)) + return err + } + if len(m.Content) > 500 { + m.Content = m.Content[0:497] + "…" + } + newStatus := webmentionStatusVerified + if strings.HasPrefix(m.Source, appConfig.Server.PublicAddress) { + // Approve if it's server-intern + newStatus = webmentionStatusApproved + } + if webmentionExists(m.Source, m.Target) { + _, err = appDbExec("update webmentions set status = @status, title = @title, content = @content, author = @author where source = @source and target = @target", + sql.Named("status", newStatus), sql.Named("title", m.Title), sql.Named("content", m.Content), sql.Named("author", m.Author), sql.Named("source", m.Source), sql.Named("target", m.Target)) + } else { + _, err = appDbExec("insert into webmentions (source, target, created, status, title, content, author) values (@source, @target, @created, @status, @title, @content, @author)", + sql.Named("source", m.Source), sql.Named("target", m.Target), sql.Named("created", m.Created), sql.Named("status", newStatus), sql.Named("title", m.Title), sql.Named("content", m.Content), sql.Named("author", m.Author)) + sendNotification(fmt.Sprintf("New webmention from %s to %s", m.Source, m.Target)) + } + return err } -func wmVerifyReader(body io.Reader, m *mention) error { +func (m *mention) verifyReader(body io.Reader) error { var linksBuffer, gqBuffer, mfBuffer bytes.Buffer io.Copy(io.MultiWriter(&linksBuffer, &gqBuffer, &mfBuffer), body) // Check if source mentions target @@ -100,17 +130,17 @@ func wmVerifyReader(body io.Reader, m *mention) error { if err != nil { return err } - mfFillMentionFromData(m, microformats.Parse(&mfBuffer, sourceURL)) + m.fillFromData(microformats.Parse(&mfBuffer, sourceURL)) return nil } -func mfFillMentionFromData(m *mention, mf *microformats.Data) { +func (m *mention) fillFromData(mf *microformats.Data) { for _, i := range mf.Items { - mfFillMention(m, i) + m.fill(i) } } -func mfFillMention(m *mention, mf *microformats.Microformat) bool { +func (m *mention) fill(mf *microformats.Microformat) bool { if mfHasType(mf, "h-entry") { if name, ok := mf.Properties["name"]; ok && len(name) > 0 { if title, ok := name[0].(string); ok { @@ -136,7 +166,7 @@ func mfFillMention(m *mention, mf *microformats.Microformat) bool { return true } else if len(mf.Children) > 0 { for _, mfc := range mf.Children { - if mfFillMention(m, mfc) { + if m.fill(mfc) { return true } }