Save shared inbox to reduce amount of messages

This commit is contained in:
Jan-Lukas Else 2020-11-25 11:29:36 +01:00
parent 899caf9aaa
commit c6200001b9
5 changed files with 41 additions and 27 deletions

View File

@ -258,21 +258,21 @@ func apGetRemoteActor(iri string) (*asPerson, int, error) {
return actor, 0, nil return actor, 0, nil
} }
func apGetAllFollowers(blog string) (map[string]string, error) { func apGetAllInboxes(blog string) ([]string, error) {
rows, err := appDbQuery("select follower, inbox from activitypub_followers where blog = @blog", sql.Named("blog", blog)) rows, err := appDbQuery("select distinct inbox from activitypub_followers where blog = @blog", sql.Named("blog", blog))
if err != nil { if err != nil {
return nil, err return nil, err
} }
followers := map[string]string{} inboxes := []string{}
for rows.Next() { for rows.Next() {
var follower, inbox string var inbox string
err = rows.Scan(&follower, &inbox) err = rows.Scan(&inbox)
if err != nil { if err != nil {
return nil, err return nil, err
} }
followers[follower] = inbox inboxes = append(inboxes, inbox)
} }
return followers, nil return inboxes, nil
} }
func apAddFollower(blog, follower, inbox string) error { func apAddFollower(blog, follower, inbox string) error {
@ -285,6 +285,11 @@ func apRemoveFollower(blog, follower string) error {
return err return err
} }
func apRemoveInbox(inbox string) error {
_, err := appDbExec("delete from activitypub_followers where inbox = @inbox", sql.Named("inbox", inbox))
return err
}
func (p *post) apPost() { func (p *post) apPost() {
n := p.toASNote() n := p.toASNote()
createActivity := make(map[string]interface{}) createActivity := make(map[string]interface{})
@ -354,7 +359,11 @@ func apAccept(blogName string, blog *configBlog, follow map[string]interface{})
return return
} }
// Add or update follower // Add or update follower
apAddFollower(blogName, follower.ID, follower.Inbox) inbox := follower.Inbox
if endpoints := follower.Endpoints; endpoints != nil && endpoints.SharedInbox != "" {
inbox = endpoints.SharedInbox
}
apAddFollower(blogName, follower.ID, inbox)
// remove @context from the inner activity // remove @context from the inner activity
delete(follow, "@context") delete(follow, "@context")
accept := make(map[string]interface{}) accept := make(map[string]interface{})
@ -368,17 +377,16 @@ func apAccept(blogName string, blog *configBlog, follow map[string]interface{})
} }
func apSendToAllFollowers(blog string, activity interface{}) { func apSendToAllFollowers(blog string, activity interface{}) {
followers, err := apGetAllFollowers(blog) inboxes, err := apGetAllInboxes(blog)
if err != nil { if err != nil {
log.Println("Failed to retrieve followers:", err.Error()) log.Println("Failed to retrieve inboxes:", err.Error())
return return
} }
apSendTo(appConfig.Blogs[blog].apIri(), activity, followers) apSendTo(appConfig.Blogs[blog].apIri(), activity, inboxes)
} }
func apSendTo(blogIri string, activity interface{}, followers map[string]string) { func apSendTo(blogIri string, activity interface{}, inboxes []string) {
for _, i := range followers { for _, i := range inboxes {
go func(inbox string) { go func(inbox string) {
apQueueSendSigned(blogIri, inbox, activity) apQueueSendSigned(blogIri, inbox, activity)
}(i) }(i)

View File

@ -32,7 +32,7 @@ func initAPSendQueue() (err error) {
if _, err := os.Stat(queuePath); os.IsNotExist(err) { if _, err := os.Stat(queuePath); os.IsNotExist(err) {
os.Mkdir(queuePath, 0755) os.Mkdir(queuePath, 0755)
} }
apQueue, err = dque.NewOrOpen("activitypub", queuePath, 5, apRequestBuilder) apQueue, err = dque.NewOrOpen("activitypub", queuePath, 1000, apRequestBuilder)
if err != nil { if err != nil {
return err return err
} }
@ -50,18 +50,19 @@ func startAPSendQueue() {
continue continue
} }
if r, ok := rInterface.(*apRequest); ok { if r, ok := rInterface.(*apRequest); ok {
if r.LastTry != 0 && time.Now().Before(time.Unix(r.LastTry, 0).Add(time.Duration(r.Try)*5*time.Minute)) { if r.LastTry != 0 && time.Now().Before(time.Unix(r.LastTry, 0).Add(time.Duration(r.Try)*10*time.Minute)) {
apQueue.Enqueue(r) apQueue.Enqueue(r)
} else { } else {
// Send request // Send request
if err := apSendSigned(r.BlogIri, r.To, r.Activity); err != nil { if err := apSendSigned(r.BlogIri, r.To, r.Activity); err != nil {
if r.Try++; r.Try < 4 { if r.Try++; r.Try < 21 {
// Try it again // Try it again
r.LastTry = time.Now().Unix() r.LastTry = time.Now().Unix()
apQueue.Enqueue(r) apQueue.Enqueue(r)
} else { } else {
log.Printf("Request to %s failed for the 3rd time", r.To) log.Printf("Request to %s failed for the 20th time", r.To)
log.Println() log.Println()
apRemoveInbox(r.To)
} }
} }
} }

View File

@ -49,6 +49,7 @@ type asPerson struct {
Icon *asAttachment `json:"icon,omitempty"` Icon *asAttachment `json:"icon,omitempty"`
Inbox string `json:"inbox,omitempty"` Inbox string `json:"inbox,omitempty"`
PublicKey *asPublicKey `json:"publicKey,omitempty"` PublicKey *asPublicKey `json:"publicKey,omitempty"`
Endpoints *asEndpoints `json:"endpoints,omitempty"`
} }
type asAttachment struct { type asAttachment struct {
@ -62,6 +63,10 @@ type asPublicKey struct {
PublicKeyPem string `json:"publicKeyPem,omitempty"` PublicKeyPem string `json:"publicKeyPem,omitempty"`
} }
type asEndpoints struct {
SharedInbox string `json:"sharedInbox,omitempty"`
}
func (p *post) serveActivityStreams(w http.ResponseWriter) { func (p *post) serveActivityStreams(w http.ResponseWriter) {
// Send JSON // Send JSON
w.Header().Add(contentType, contentTypeASUTF8) w.Header().Add(contentType, contentTypeASUTF8)

6
go.mod
View File

@ -14,7 +14,7 @@ require (
github.com/go-sql-driver/mysql v1.5.0 // indirect github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/gofrs/flock v0.8.0 // indirect github.com/gofrs/flock v0.8.0 // indirect
github.com/goodsign/monday v1.0.1-0.20201007115131-c065b60ec611 github.com/goodsign/monday v1.0.1-0.20201007115131-c065b60ec611
github.com/google/go-cmp v0.5.3 // indirect github.com/google/go-cmp v0.5.4 // indirect
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
github.com/gorilla/feeds v1.1.1 github.com/gorilla/feeds v1.1.1
github.com/gorilla/handlers v1.5.1 github.com/gorilla/handlers v1.5.1
@ -49,13 +49,13 @@ require (
github.com/yuin/goldmark-emoji v1.0.1 github.com/yuin/goldmark-emoji v1.0.1
go.uber.org/multierr v1.6.0 // indirect go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0 // indirect go.uber.org/zap v1.16.0 // indirect
golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9 golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b // indirect golang.org/x/net v0.0.0-20201110031124-69a78807bb2b // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect golang.org/x/sys v0.0.0-20201119102817-f84b799fce68 // indirect
golang.org/x/text v0.3.4 // indirect golang.org/x/text v0.3.4 // indirect
golang.org/x/tools v0.0.0-20201121010211-780cb80bd7fb // indirect golang.org/x/tools v0.0.0-20201124202034-299f270db459 // indirect
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
gopkg.in/ini.v1 v1.62.0 // indirect gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect gopkg.in/yaml.v2 v2.3.0 // indirect

12
go.sum
View File

@ -100,8 +100,8 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3 h1:x95R7cp+rSeeqAMI2knLtQ0DKlaBhv2NrtrOvafPHRo= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@ -353,8 +353,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9 h1:phUcVbl53swtrUN8kQEXFhUxPlIlWyBfKmidCu7P95o= golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392 h1:xYJJ3S178yv++9zXV/hnr29plCAGO9vAFG9dorqaFQc=
golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20201124201722-c8d3bf9c5392/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@ -474,8 +474,8 @@ golang.org/x/tools v0.0.0-20191216052735-49a3e744a425 h1:VvQyQJN0tSuecqgcIxMWnnf
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200410194907-79a7a3126eef/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200410194907-79a7a3126eef/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20201121010211-780cb80bd7fb h1:z5+u0pkAUPUWd3taoTialQ2JAMo4Wo1Z3L25U4ZV9r0= golang.org/x/tools v0.0.0-20201124202034-299f270db459 h1:XrUnpqJ8xqeZHrgPu3FuYCv9/O3MrxnIKh5/+MLDE8Q=
golang.org/x/tools v0.0.0-20201121010211-780cb80bd7fb/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201124202034-299f270db459/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=