GoBlog/activityPubSending.go

98 lines
2.3 KiB
Go
Raw Normal View History

2020-11-22 15:10:59 +00:00
package main
import (
"bytes"
"context"
"encoding/gob"
2020-11-22 15:10:59 +00:00
"fmt"
"io"
2020-11-22 15:10:59 +00:00
"log"
"net/http"
"time"
ap "github.com/go-ap/activitypub"
"github.com/go-ap/jsonld"
"go.goblog.app/app/pkgs/bufferpool"
"go.goblog.app/app/pkgs/contenttype"
2020-11-22 15:10:59 +00:00
)
type apRequest struct {
BlogIri, To string
Activity []byte
Try int
}
func (a *goBlog) initAPSendQueue() {
a.listenOnQueue("ap", 30*time.Second, func(qi *queueItem, dequeue func(), reschedule func(time.Duration)) {
var r apRequest
if err := gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&r); err != nil {
log.Println("activitypub queue:", err.Error())
dequeue()
return
}
if err := a.apSendSigned(r.BlogIri, r.To, r.Activity); err != nil {
if r.Try++; r.Try < 20 {
// Try it again
buf := bufferpool.Get()
_ = r.encode(buf)
qi.content = buf.Bytes()
reschedule(time.Duration(r.Try) * 10 * time.Minute)
bufferpool.Put(buf)
return
2020-11-22 15:10:59 +00:00
}
log.Println("AP request failed for the 20th time:", r.To)
_ = a.db.apRemoveInbox(r.To)
2020-11-22 15:10:59 +00:00
}
dequeue()
})
2020-11-22 15:10:59 +00:00
}
func (a *goBlog) apQueueSendSigned(blogIri, to string, activity any) error {
body, err := jsonld.WithContext(jsonld.IRI(ap.ActivityBaseURI), jsonld.IRI(ap.SecurityContextURI)).Marshal(activity)
2020-11-22 15:10:59 +00:00
if err != nil {
return err
}
buf := bufferpool.Get()
defer bufferpool.Put(buf)
if err := (&apRequest{
2020-11-22 15:10:59 +00:00
BlogIri: blogIri,
To: to,
Activity: body,
}).encode(buf); err != nil {
return err
}
return a.enqueue("ap", buf.Bytes(), time.Now())
}
func (r *apRequest) encode(w io.Writer) error {
return gob.NewEncoder(w).Encode(r)
2020-11-22 15:10:59 +00:00
}
func (a *goBlog) apSendSigned(blogIri, to string, activity []byte) error {
2020-11-22 15:10:59 +00:00
// Create request context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// Create request
r, err := http.NewRequestWithContext(ctx, http.MethodPost, to, bytes.NewReader(activity))
2020-11-22 15:10:59 +00:00
if err != nil {
return err
}
r.Header.Set("Accept-Charset", "utf-8")
r.Header.Set("Accept", contenttype.ASUTF8)
r.Header.Set(contentType, contenttype.ASUTF8)
2020-11-22 15:10:59 +00:00
// Sign request
if err = a.signRequest(r, blogIri); err != nil {
2020-11-22 15:10:59 +00:00
return err
}
// Do request
2021-06-19 06:37:16 +00:00
resp, err := a.httpClient.Do(r)
2020-11-22 15:10:59 +00:00
if err != nil {
return err
}
_ = resp.Body.Close()
2020-11-22 15:10:59 +00:00
if !apRequestIsSuccess(resp.StatusCode) {
2022-02-11 23:48:59 +00:00
return fmt.Errorf("signed request failed with status %d", resp.StatusCode)
2020-11-22 15:10:59 +00:00
}
return nil
}