1
mirror of https://github.com/jlelse/GoBlog synced 2024-06-15 11:37:09 +00:00
GoBlog/activityPubSending.go

139 lines
3.1 KiB
Go
Raw Normal View History

2020-11-22 15:10:59 +00:00
package main
import (
"bytes"
"context"
2021-01-21 16:59:47 +00:00
"encoding/json"
2020-11-22 15:10:59 +00:00
"fmt"
2021-02-17 07:23:03 +00:00
"io"
2020-11-22 15:10:59 +00:00
"log"
"net/http"
"net/url"
"os"
"time"
"github.com/joncrlsn/dque"
)
var apQueue *dque.DQue
type apRequest struct {
BlogIri, To string
Activity []byte
Try int
LastTry int64
}
func apRequestBuilder() interface{} {
return &apRequest{}
}
func initAPSendQueue() (err error) {
queuePath := "queues"
if _, err := os.Stat(queuePath); os.IsNotExist(err) {
2021-02-08 17:51:07 +00:00
if err = os.Mkdir(queuePath, 0755); err != nil {
return err
}
} else if err != nil {
return err
2020-11-22 15:10:59 +00:00
}
apQueue, err = dque.NewOrOpen("activitypub", queuePath, 1000, apRequestBuilder)
2020-11-22 15:10:59 +00:00
if err != nil {
return err
}
startAPSendQueue()
return nil
}
func startAPSendQueue() {
go func() {
for {
if rInterface, err := apQueue.PeekBlock(); err == nil {
if rInterface == nil {
// Empty request
_, _ = apQueue.Dequeue()
continue
}
if r, ok := rInterface.(*apRequest); ok {
if r.LastTry != 0 && time.Now().Before(time.Unix(r.LastTry, 0).Add(time.Duration(r.Try)*10*time.Minute)) {
2021-02-08 17:51:07 +00:00
_ = apQueue.Enqueue(r)
2020-11-22 15:10:59 +00:00
} else {
// Send request
if err := apSendSigned(r.BlogIri, r.To, r.Activity); err != nil {
if r.Try++; r.Try < 21 {
2020-11-22 15:10:59 +00:00
// Try it again
r.LastTry = time.Now().Unix()
2021-02-08 17:51:07 +00:00
_ = apQueue.Enqueue(r)
2020-11-22 15:10:59 +00:00
} else {
log.Printf("Request to %s failed for the 20th time", r.To)
2020-11-22 17:29:46 +00:00
log.Println()
2021-02-08 17:51:07 +00:00
_ = apRemoveInbox(r.To)
2020-11-22 15:10:59 +00:00
}
}
}
// Finish
_, _ = apQueue.Dequeue()
time.Sleep(1 * time.Second)
} else {
// Invalid type
_, _ = apQueue.Dequeue()
}
}
}
}()
}
func apQueueSendSigned(blogIri, to string, activity interface{}) error {
body, err := json.Marshal(activity)
if err != nil {
return err
}
2020-11-25 11:36:14 +00:00
return apQueue.Enqueue(&apRequest{
2020-11-22 15:10:59 +00:00
BlogIri: blogIri,
To: to,
Activity: body,
})
}
func apSendSigned(blogIri, to string, activity []byte) error {
// Copy activity to sign it
activityCopy := make([]byte, len(activity))
copy(activityCopy, activity)
// Create request context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// Create request
r, err := http.NewRequestWithContext(ctx, http.MethodPost, to, bytes.NewBuffer(activity))
if err != nil {
return err
}
iri, err := url.Parse(to)
if err != nil {
return err
}
r.Header.Set("Accept-Charset", "utf-8")
r.Header.Set("Date", time.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
r.Header.Set(userAgent, appUserAgent)
r.Header.Set("Accept", contentTypeASUTF8)
r.Header.Set(contentType, contentTypeASUTF8)
r.Header.Set("Host", iri.Host)
// Sign request
apPostSignMutex.Lock()
err = apPostSigner.SignRequest(apPrivateKey, blogIri+"#main-key", r, activityCopy)
apPostSignMutex.Unlock()
if err != nil {
return err
}
// Do request
resp, err := http.DefaultClient.Do(r)
if err != nil {
return err
}
if !apRequestIsSuccess(resp.StatusCode) {
2021-02-17 07:23:03 +00:00
body, _ := io.ReadAll(resp.Body)
2020-11-22 15:10:59 +00:00
_ = resp.Body.Close()
return fmt.Errorf("signed request failed with status %d: %s", resp.StatusCode, string(body))
}
return nil
}