mirror of https://github.com/jlelse/GoBlog
Improve activitypub sending
parent
baf9c63e56
commit
2e9b0ef4df
@ -0,0 +1,132 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/joncrlsn/dque"
|
||||
)
|
||||
|
||||
var apQueue *dque.DQue
|
||||
|
||||
type apRequest struct {
|
||||
BlogIri, To string
|
||||
Activity []byte
|
||||
Try int
|
||||
LastTry int64
|
||||
}
|
||||
|
||||
func apRequestBuilder() interface{} {
|
||||
return &apRequest{}
|
||||
}
|
||||
|
||||
func initAPSendQueue() (err error) {
|
||||
queuePath := "queues"
|
||||
if _, err := os.Stat(queuePath); os.IsNotExist(err) {
|
||||
os.Mkdir(queuePath, 0755)
|
||||
}
|
||||
apQueue, err = dque.NewOrOpen("activitypub", queuePath, 5, apRequestBuilder)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
startAPSendQueue()
|
||||
return nil
|
||||
}
|
||||
|
||||
func startAPSendQueue() {
|
||||
go func() {
|
||||
for {
|
||||
if rInterface, err := apQueue.PeekBlock(); err == nil {
|
||||
if rInterface == nil {
|
||||
// Empty request
|
||||
_, _ = apQueue.Dequeue()
|
||||
continue
|
||||
}
|
||||
if r, ok := rInterface.(*apRequest); ok {
|
||||
if r.LastTry != 0 && time.Now().Unix()+r.LastTry < int64(r.Try*5*60) {
|
||||
apQueue.Enqueue(r)
|
||||
} else {
|
||||
// Send request
|
||||
if err := apSendSigned(r.BlogIri, r.To, r.Activity); err != nil {
|
||||
if r.Try++; r.Try < 4 {
|
||||
// Try it again
|
||||
r.LastTry = time.Now().Unix()
|
||||
apQueue.Enqueue(r)
|
||||
} else {
|
||||
log.Println("Failed for the 3rd time:", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
// Finish
|
||||
_, _ = apQueue.Dequeue()
|
||||
time.Sleep(1 * time.Second)
|
||||
} else {
|
||||
// Invalid type
|
||||
_, _ = apQueue.Dequeue()
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func apQueueSendSigned(blogIri, to string, activity interface{}) error {
|
||||
body, err := json.Marshal(activity)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = apQueue.Enqueue(&apRequest{
|
||||
BlogIri: blogIri,
|
||||
To: to,
|
||||
Activity: body,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func apSendSigned(blogIri, to string, activity []byte) error {
|
||||
// Copy activity to sign it
|
||||
activityCopy := make([]byte, len(activity))
|
||||
copy(activityCopy, activity)
|
||||
// Create request context with timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
// Create request
|
||||
r, err := http.NewRequestWithContext(ctx, http.MethodPost, to, bytes.NewBuffer(activity))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
iri, err := url.Parse(to)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Header.Set("Accept-Charset", "utf-8")
|
||||
r.Header.Set("Date", time.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
|
||||
r.Header.Set(userAgent, appUserAgent)
|
||||
r.Header.Set("Accept", contentTypeASUTF8)
|
||||
r.Header.Set(contentType, contentTypeASUTF8)
|
||||
r.Header.Set("Host", iri.Host)
|
||||
// Sign request
|
||||
apPostSignMutex.Lock()
|
||||
err = apPostSigner.SignRequest(apPrivateKey, blogIri+"#main-key", r, activityCopy)
|
||||
apPostSignMutex.Unlock()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Do request
|
||||
resp, err := http.DefaultClient.Do(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !apRequestIsSuccess(resp.StatusCode) {
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
_ = resp.Body.Close()
|
||||
return fmt.Errorf("signed request failed with status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue