Verify webmentions using a queue

This commit is contained in:
Jan-Lukas Else 2020-11-25 12:36:14 +01:00
parent c6200001b9
commit 004c4fd2c1
4 changed files with 92 additions and 59 deletions

View File

@ -83,12 +83,11 @@ func apQueueSendSigned(blogIri, to string, activity interface{}) error {
if err != nil { if err != nil {
return err return err
} }
err = apQueue.Enqueue(&apRequest{ return apQueue.Enqueue(&apRequest{
BlogIri: blogIri, BlogIri: blogIri,
To: to, To: to,
Activity: body, Activity: body,
}) })
return err
} }
func apSendSigned(blogIri, to string, activity []byte) error { func apSendSigned(blogIri, to string, activity []byte) error {

View File

@ -59,8 +59,12 @@ func main() {
log.Fatal(err) log.Fatal(err)
return return
} }
err = initWebmention()
if err != nil {
log.Fatal(err)
return
}
initTelegram() initTelegram()
initWebmention()
initNodeInfo() initNodeInfo()
// Start cron hooks // Start cron hooks

View File

@ -32,7 +32,7 @@ type mention struct {
Author string Author string
} }
func initWebmention() { func initWebmention() error {
// Add hooks // Add hooks
hookFunc := func(p *post) { hookFunc := func(p *post) {
p.sendWebmentions() p.sendWebmentions()
@ -41,7 +41,7 @@ func initWebmention() {
postHooks[postUpdateHook] = append(postHooks[postUpdateHook], hookFunc) postHooks[postUpdateHook] = append(postHooks[postUpdateHook], hookFunc)
postHooks[postDeleteHook] = append(postHooks[postDeleteHook], hookFunc) postHooks[postDeleteHook] = append(postHooks[postDeleteHook], hookFunc)
// Start verifier // Start verifier
startWebmentionVerifier() return initWebmentionQueue()
} }
func handleWebmention(w http.ResponseWriter, r *http.Request) { func handleWebmention(w http.ResponseWriter, r *http.Request) {
@ -54,7 +54,7 @@ func handleWebmention(w http.ResponseWriter, r *http.Request) {
http.Error(w, "target not allowed", http.StatusBadRequest) http.Error(w, "target not allowed", http.StatusBadRequest)
return return
} }
if err = createWebmention(m.Source, m.Target); err != nil { if err = queueMention(m); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
@ -71,13 +71,14 @@ func extractMention(r *http.Request) (*mention, error) {
return nil, err return nil, err
} }
source := r.Form.Get("source") source := r.Form.Get("source")
target := r.Form.Get("target") target := unescapedPath(r.Form.Get("target"))
if source == "" || target == "" || !isAbsoluteURL(source) || !isAbsoluteURL(target) { if source == "" || target == "" || !isAbsoluteURL(source) || !isAbsoluteURL(target) {
return nil, errors.New("Invalid request") return nil, errors.New("Invalid request")
} }
return &mention{ return &mention{
Source: source, Source: source,
Target: target, Target: target,
Created: time.Now().Unix(),
}, nil }, nil
} }
@ -138,7 +139,7 @@ func webmentionAdminApprove(w http.ResponseWriter, r *http.Request) {
func webmentionExists(source, target string) bool { func webmentionExists(source, target string) bool {
result := 0 result := 0
row, err := appDbQueryRow("select exists(select 1 from webmentions where source = ? and target = ?)", source, unescapedPath(target)) row, err := appDbQueryRow("select exists(select 1 from webmentions where source = ? and target = ?)", source, target)
if err != nil { if err != nil {
return false return false
} }
@ -149,16 +150,15 @@ func webmentionExists(source, target string) bool {
} }
func createWebmention(source, target string) (err error) { func createWebmention(source, target string) (err error) {
if webmentionExists(source, target) { return queueMention(&mention{
_, err = appDbExec("update webmentions set status = ? where source = ? and target = ?", webmentionStatusRenew, source, unescapedPath(target)) Source: source,
} else { Target: unescapedPath(target),
_, err = appDbExec("insert into webmentions (source, target, created) values (?, ?, ?)", source, unescapedPath(target), time.Now().Unix()) Created: time.Now().Unix(),
} })
return err
} }
func deleteWebmention(id int) error { func deleteWebmention(id int) error {
_, err := appDbExec("delete from webmentions where id = ?", id) _, err := appDbExec("delete from webmentions where id = @id", sql.Named("id", id))
return err return err
} }

View File

@ -6,56 +6,65 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log"
"net/http" "net/http"
"net/url" "net/url"
"os"
"strings" "strings"
"time"
"github.com/PuerkitoBio/goquery" "github.com/PuerkitoBio/goquery"
"github.com/joncrlsn/dque"
"willnorris.com/go/microformats" "willnorris.com/go/microformats"
) )
func startWebmentionVerifier() { var wmQueue *dque.DQue
func wmMentionBuilder() interface{} {
return &mention{}
}
func initWebmentionQueue() (err error) {
queuePath := "queues"
if _, err := os.Stat(queuePath); os.IsNotExist(err) {
os.Mkdir(queuePath, 0755)
}
wmQueue, err = dque.NewOrOpen("webmention", queuePath, 5, wmMentionBuilder)
if err != nil {
return err
}
startWebmentionQueue()
return nil
}
func startWebmentionQueue() {
go func() { go func() {
for { for {
time.Sleep(30 * time.Second) if i, err := wmQueue.PeekBlock(); err == nil {
verifyNextWebmention() if i == nil {
// Empty request
_, _ = wmQueue.Dequeue()
continue
}
if m, ok := i.(*mention); ok {
err = m.verifyMention()
if err != nil {
log.Println(fmt.Sprintf("Failed to verify webmention from %s to %s: %s", m.Source, m.Target, err.Error()))
}
_, _ = wmQueue.Dequeue()
} else {
// Invalid type
_, _ = wmQueue.Dequeue()
}
}
} }
}() }()
} }
func verifyNextWebmention() error { func queueMention(m *mention) error {
m := &mention{} return wmQueue.Enqueue(m)
oldStatus := ""
row, err := appDbQueryRow("select id, source, target, status from webmentions where (status = ? or status = ?) limit 1", webmentionStatusNew, webmentionStatusRenew)
if err != nil {
return err
}
if err := row.Scan(&m.ID, &m.Source, &m.Target, &oldStatus); err == sql.ErrNoRows {
return nil
} else if err != nil {
return err
}
if err := wmVerify(m); err != nil {
// Invalid
return deleteWebmention(m.ID)
}
if len(m.Content) > 500 {
m.Content = m.Content[0:497] + "…"
}
newStatus := webmentionStatusVerified
if strings.HasPrefix(m.Source, appConfig.Server.PublicAddress) {
// Approve if it's server-intern
newStatus = webmentionStatusApproved
}
_, err = appDbExec("update webmentions set status = ?, title = ?, content = ?, author = ? where id = ?", newStatus, m.Title, m.Content, m.Author, m.ID)
if oldStatus == string(webmentionStatusNew) {
sendNotification(fmt.Sprintf("New webmention from %s to %s", m.Source, m.Target))
}
return err
} }
func wmVerify(m *mention) error { func (m *mention) verifyMention() error {
req, err := http.NewRequest(http.MethodGet, m.Source, nil) req, err := http.NewRequest(http.MethodGet, m.Source, nil)
if err != nil { if err != nil {
return err return err
@ -65,11 +74,32 @@ func wmVerify(m *mention) error {
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close() err = m.verifyReader(resp.Body)
return wmVerifyReader(resp.Body, m) _ = resp.Body.Close()
if err != nil {
_, err := appDbExec("delete from webmentions where source = @source and target = @target", sql.Named("source", m.Source), sql.Named("target", m.Target))
return err
}
if len(m.Content) > 500 {
m.Content = m.Content[0:497] + "…"
}
newStatus := webmentionStatusVerified
if strings.HasPrefix(m.Source, appConfig.Server.PublicAddress) {
// Approve if it's server-intern
newStatus = webmentionStatusApproved
}
if webmentionExists(m.Source, m.Target) {
_, err = appDbExec("update webmentions set status = @status, title = @title, content = @content, author = @author where source = @source and target = @target",
sql.Named("status", newStatus), sql.Named("title", m.Title), sql.Named("content", m.Content), sql.Named("author", m.Author), sql.Named("source", m.Source), sql.Named("target", m.Target))
} else {
_, err = appDbExec("insert into webmentions (source, target, created, status, title, content, author) values (@source, @target, @created, @status, @title, @content, @author)",
sql.Named("source", m.Source), sql.Named("target", m.Target), sql.Named("created", m.Created), sql.Named("status", newStatus), sql.Named("title", m.Title), sql.Named("content", m.Content), sql.Named("author", m.Author))
sendNotification(fmt.Sprintf("New webmention from %s to %s", m.Source, m.Target))
}
return err
} }
func wmVerifyReader(body io.Reader, m *mention) error { func (m *mention) verifyReader(body io.Reader) error {
var linksBuffer, gqBuffer, mfBuffer bytes.Buffer var linksBuffer, gqBuffer, mfBuffer bytes.Buffer
io.Copy(io.MultiWriter(&linksBuffer, &gqBuffer, &mfBuffer), body) io.Copy(io.MultiWriter(&linksBuffer, &gqBuffer, &mfBuffer), body)
// Check if source mentions target // Check if source mentions target
@ -100,17 +130,17 @@ func wmVerifyReader(body io.Reader, m *mention) error {
if err != nil { if err != nil {
return err return err
} }
mfFillMentionFromData(m, microformats.Parse(&mfBuffer, sourceURL)) m.fillFromData(microformats.Parse(&mfBuffer, sourceURL))
return nil return nil
} }
func mfFillMentionFromData(m *mention, mf *microformats.Data) { func (m *mention) fillFromData(mf *microformats.Data) {
for _, i := range mf.Items { for _, i := range mf.Items {
mfFillMention(m, i) m.fill(i)
} }
} }
func mfFillMention(m *mention, mf *microformats.Microformat) bool { func (m *mention) fill(mf *microformats.Microformat) bool {
if mfHasType(mf, "h-entry") { if mfHasType(mf, "h-entry") {
if name, ok := mf.Properties["name"]; ok && len(name) > 0 { if name, ok := mf.Properties["name"]; ok && len(name) > 0 {
if title, ok := name[0].(string); ok { if title, ok := name[0].(string); ok {
@ -136,7 +166,7 @@ func mfFillMention(m *mention, mf *microformats.Microformat) bool {
return true return true
} else if len(mf.Children) > 0 { } else if len(mf.Children) > 0 {
for _, mfc := range mf.Children { for _, mfc := range mf.Children {
if mfFillMention(m, mfc) { if m.fill(mfc) {
return true return true
} }
} }