GoBlog/activityPubSending.go

131 lines
2.8 KiB
Go
Raw Normal View History

2020-11-22 15:10:59 +00:00
package main
import (
"bytes"
"context"
"encoding/gob"
2021-01-21 16:59:47 +00:00
"encoding/json"
2020-11-22 15:10:59 +00:00
"fmt"
2021-02-17 07:23:03 +00:00
"io"
2020-11-22 15:10:59 +00:00
"log"
"net/http"
"net/url"
"time"
)
// apRequest is one queued outgoing ActivityPub delivery. It is gob-encoded
// for storage in the "ap" queue and decoded again by the send queue worker.
type apRequest struct {
	// BlogIri is the IRI of the sending blog; the signing key ID is derived
	// from it by appending "#main-key".
	BlogIri string
	// To is the URL the activity is POSTed to.
	To string
	// Activity is the already JSON-marshaled activity used as request body.
	Activity []byte
	// Try counts the delivery attempts that have failed so far.
	Try int
}
// initAPSendQueue starts the ActivityPub send queue worker by calling
// startAPSendQueue. The returned error is always nil.
func initAPSendQueue() (err error) {
	startAPSendQueue()
	// err is never assigned, so the bare return yields nil.
	return
}
func startAPSendQueue() {
go func() {
for {
time.Sleep(3 * time.Second)
qi, err := peekQueue("ap")
if err != nil {
log.Println(err.Error())
continue
} else if qi != nil {
var r apRequest
err = gob.NewDecoder(bytes.NewReader(qi.content)).Decode(&r)
if err != nil {
log.Println(err.Error())
_ = qi.dequeue()
2020-11-22 15:10:59 +00:00
continue
}
if err := apSendSigned(r.BlogIri, r.To, r.Activity); err != nil {
if r.Try++; r.Try < 20 {
// Try it again
qi.content, _ = r.encode()
_ = qi.reschedule(time.Duration(r.Try) * 10 * time.Minute)
continue
2020-11-22 15:10:59 +00:00
} else {
log.Printf("Request to %s failed for the 20th time", r.To)
log.Println()
_ = apRemoveInbox(r.To)
2020-11-22 15:10:59 +00:00
}
}
err = qi.dequeue()
if err != nil {
log.Println(err.Error())
2020-11-22 15:10:59 +00:00
}
}
}
}()
}
func apQueueSendSigned(blogIri, to string, activity interface{}) error {
body, err := json.Marshal(activity)
if err != nil {
return err
}
b, err := (&apRequest{
2020-11-22 15:10:59 +00:00
BlogIri: blogIri,
To: to,
Activity: body,
}).encode()
if err != nil {
return err
}
return enqueue("ap", b, time.Now())
}
func (r *apRequest) encode() ([]byte, error) {
var buf bytes.Buffer
err := gob.NewEncoder(&buf).Encode(r)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
2020-11-22 15:10:59 +00:00
}
func apSendSigned(blogIri, to string, activity []byte) error {
// Create request context with timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
// Create request
2021-03-19 13:26:45 +00:00
var requestBuffer bytes.Buffer
requestBuffer.Write(activity)
r, err := http.NewRequestWithContext(ctx, http.MethodPost, to, &requestBuffer)
2020-11-22 15:10:59 +00:00
if err != nil {
return err
}
iri, err := url.Parse(to)
if err != nil {
return err
}
r.Header.Set("Accept-Charset", "utf-8")
r.Header.Set("Date", time.Now().UTC().Format("Mon, 02 Jan 2006 15:04:05")+" GMT")
r.Header.Set(userAgent, appUserAgent)
r.Header.Set("Accept", contentTypeASUTF8)
r.Header.Set(contentType, contentTypeASUTF8)
r.Header.Set("Host", iri.Host)
// Sign request
apPostSignMutex.Lock()
2021-03-19 13:26:45 +00:00
err = apPostSigner.SignRequest(apPrivateKey, blogIri+"#main-key", r, activity)
2020-11-22 15:10:59 +00:00
apPostSignMutex.Unlock()
if err != nil {
return err
}
// Do request
2021-03-31 07:29:52 +00:00
resp, err := appHttpClient.Do(r)
2020-11-22 15:10:59 +00:00
if err != nil {
return err
}
2021-03-31 07:29:52 +00:00
defer resp.Body.Close()
2020-11-22 15:10:59 +00:00
if !apRequestIsSuccess(resp.StatusCode) {
2021-02-17 07:23:03 +00:00
body, _ := io.ReadAll(resp.Body)
2020-11-22 15:10:59 +00:00
return fmt.Errorf("signed request failed with status %d: %s", resp.StatusCode, string(body))
2021-03-31 07:29:52 +00:00
} else {
_, _ = io.Copy(io.Discard, resp.Body)
2020-11-22 15:10:59 +00:00
}
return nil
}