diff --git a/.gitignore b/.gitignore index a919301..9c23928 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ /config.yaml /data /GoBlog -/tmp_assets \ No newline at end of file +/tmp_assets +/queues \ No newline at end of file diff --git a/activityPub.go b/activityPub.go index b0a70d0..c499bf3 100644 --- a/activityPub.go +++ b/activityPub.go @@ -1,8 +1,6 @@ package main import ( - "bytes" - "context" "crypto/rsa" "crypto/x509" "database/sql" @@ -66,6 +64,10 @@ func initActivityPub() error { if err != nil { return err } + // Init send queue + if err = initAPSendQueue(); err != nil { + return err + } return nil } @@ -362,12 +364,7 @@ func apAccept(blogName string, blog *configBlog, follow map[string]interface{}) accept["actor"] = blog.apIri() accept["object"] = follow accept["type"] = "Accept" - err = apSendSigned(blog, accept, follower.Inbox) - if err != nil { - log.Printf("Failed to accept: %s\n%s\n", follower.ID, err.Error()) - return - } - log.Println("Follower accepted:", follower.ID) + apQueueSendSigned(blog.apIri(), follower.Inbox, accept) } func apSendToAllFollowers(blog string, activity interface{}) { @@ -377,64 +374,17 @@ func apSendToAllFollowers(blog string, activity interface{}) { return } - apSendTo(appConfig.Blogs[blog], activity, followers) + apSendTo(appConfig.Blogs[blog].apIri(), activity, followers) } -func apSendTo(blog *configBlog, activity interface{}, followers map[string]string) { +func apSendTo(blogIri string, activity interface{}, followers map[string]string) { for _, i := range followers { go func(inbox string) { - _ = apSendSigned(blog, activity, inbox) + apQueueSendSigned(blogIri, inbox, activity) }(i) } } -func apSendSigned(blog *configBlog, activity interface{}, to string) error { - // Marshal to json - body, err := json.Marshal(activity) - if err != nil { - return err - } - // Copy body to sign it - bodyCopy := make([]byte, len(body)) - copy(bodyCopy, body) - // Create request context with timeout - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - // Create request - r, err := http.NewRequestWithContext(ctx, http.MethodPost, to, bytes.NewBuffer(body)) - if err != nil { - return err - } - iri, err := url.Parse(to) - if err != nil { - return err - } - r.Header.Set("Accept-Charset", "utf-8") - r.Header.Set("Date", time.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT") - r.Header.Set(userAgent, appUserAgent) - r.Header.Set("Accept", contentTypeASUTF8) - r.Header.Set(contentType, contentTypeASUTF8) - r.Header.Set("Host", iri.Host) - // Sign request - apPostSignMutex.Lock() - err = apPostSigner.SignRequest(apPrivateKey, blog.apIri()+"#main-key", r, bodyCopy) - apPostSignMutex.Unlock() - if err != nil { - return err - } - // Do request - resp, err := http.DefaultClient.Do(r) - if err != nil { - return err - } - if !apRequestIsSuccess(resp.StatusCode) { - body, _ := ioutil.ReadAll(resp.Body) - _ = resp.Body.Close() - return fmt.Errorf("signed request failed with status %d: %s", resp.StatusCode, string(body)) - } - return nil -} - func apNewID(blog *configBlog) (hash string, url string) { return hash, blog.apIri() + generateRandomString(16) } diff --git a/activityPubSending.go b/activityPubSending.go new file mode 100644 index 0000000..0786047 --- /dev/null +++ b/activityPubSending.go @@ -0,0 +1,132 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "os" + "time" + + "github.com/joncrlsn/dque" +) + +var apQueue *dque.DQue + +type apRequest struct { + BlogIri, To string + Activity []byte + Try int + LastTry int64 +} + +func apRequestBuilder() interface{} { + return &apRequest{} +} + +func initAPSendQueue() (err error) { + queuePath := "queues" + if _, err := os.Stat(queuePath); os.IsNotExist(err) { + os.Mkdir(queuePath, 0755) + } + apQueue, err = dque.NewOrOpen("activitypub", queuePath, 5, apRequestBuilder) + if err != nil { + return err + } + startAPSendQueue() + return nil +} + +func startAPSendQueue() { + go func() { + for { + if rInterface, err := apQueue.PeekBlock(); err == nil { + if rInterface == nil { + // Empty request + _, _ = apQueue.Dequeue() + continue + } + if r, ok := rInterface.(*apRequest); ok { + if r.LastTry != 0 && time.Now().Unix()+r.LastTry < int64(r.Try*5*60) { + apQueue.Enqueue(r) + } else { + // Send request + if err := apSendSigned(r.BlogIri, r.To, r.Activity); err != nil { + if r.Try++; r.Try < 4 { + // Try it again + r.LastTry = time.Now().Unix() + apQueue.Enqueue(r) + } else { + log.Println("Failed for the 3rd time:", err.Error()) + } + } + } + // Finish + _, _ = apQueue.Dequeue() + time.Sleep(1 * time.Second) + } else { + // Invalid type + _, _ = apQueue.Dequeue() + } + } + } + }() +} + +func apQueueSendSigned(blogIri, to string, activity interface{}) error { + body, err := json.Marshal(activity) + if err != nil { + return err + } + err = apQueue.Enqueue(&apRequest{ + BlogIri: blogIri, + To: to, + Activity: body, + }) + return err +} + +func apSendSigned(blogIri, to string, activity []byte) error { + // Copy activity to sign it + activityCopy := make([]byte, len(activity)) + copy(activityCopy, activity) + // Create request context with timeout + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + // Create request + r, err := http.NewRequestWithContext(ctx, http.MethodPost, to, bytes.NewBuffer(activity)) + if err != nil { + return err + } + iri, err := url.Parse(to) + if err != nil { + return err + } + r.Header.Set("Accept-Charset", "utf-8") + r.Header.Set("Date", time.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT") + r.Header.Set(userAgent, appUserAgent) + r.Header.Set("Accept", contentTypeASUTF8) + r.Header.Set(contentType, contentTypeASUTF8) + r.Header.Set("Host", iri.Host) + // Sign request + apPostSignMutex.Lock() + err = apPostSigner.SignRequest(apPrivateKey, blogIri+"#main-key", r, activityCopy) + apPostSignMutex.Unlock() + if err != nil { + return err + } + // Do request + resp, err := http.DefaultClient.Do(r) + if err != nil { + return err + } + if !apRequestIsSuccess(resp.StatusCode) { + body, _ := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + return fmt.Errorf("signed request failed with status %d: %s", resp.StatusCode, string(body)) + } + return nil +} diff --git a/go.mod b/go.mod index f790759..5108ecc 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/go-chi/chi v4.1.2+incompatible github.com/go-fed/httpsig v1.0.1-0.20200711113112-812070f75b67 github.com/go-sql-driver/mysql v1.5.0 // indirect + github.com/gofrs/flock v0.8.0 // indirect github.com/goodsign/monday v1.0.1-0.20201007115131-c065b60ec611 github.com/google/go-cmp v0.5.3 // indirect github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect @@ -19,6 +20,7 @@ require ( github.com/gorilla/handlers v1.5.1 github.com/hashicorp/golang-lru v0.5.4 github.com/jeremywohl/flatten v1.0.1 + github.com/joncrlsn/dque v0.0.0-20200702023911-3e80e3146ce5 github.com/json-iterator/go v1.1.10 github.com/klauspost/cpuid v1.3.1 // indirect github.com/kr/pretty v0.2.1 // indirect @@ -32,7 +34,6 @@ require ( github.com/mitchellh/mapstructure v1.3.3 // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/pelletier/go-toml v1.8.1 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/smartystreets/assertions v1.2.0 // indirect github.com/snabb/sitemap v1.0.0 github.com/spf13/afero v1.4.1 // indirect diff --git a/go.sum b/go.sum index 9552864..7c0e2dc 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,10 @@ github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gofrs/flock v0.7.1 h1:DP+LD/t0njgoPBvT5MJLeliUIVQR03hiKR6vezdwHlc= +github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= +github.com/gofrs/flock v0.8.0 h1:MSdYClljsF3PbENUUEx85nkWfJSGfzYI9yEBZOJz6CY= +github.com/gofrs/flock v0.8.0/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -148,6 +152,8 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/joncrlsn/dque v0.0.0-20200702023911-3e80e3146ce5 h1:bo1aoO6l128nKJCBrFflOj9s+KPqMM7ErNyB5GGBNDs= +github.com/joncrlsn/dque v0.0.0-20200702023911-3e80e3146ce5/go.mod h1:dNKs71rs2VJGBAmttu7fouEsRQlRjxy0p1Sx+T5wbpY= github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= @@ -167,6 +173,7 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -501,6 +508,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/templateAssets.go b/templateAssets.go index f6a66ec..79e1c26 100644 --- a/templateAssets.go +++ b/templateAssets.go @@ -18,7 +18,6 @@ var assetFiles map[string]string func initTemplateAssets() (err error) { compiledAssetsFolder, err = ioutil.TempDir("", "goblog-assets-*") - // err = os.MkdirAll(compiledAssetsFolder, 0755) if err != nil { return }