diff --git a/check.go b/check.go index 77156c3..25e2d9f 100644 --- a/check.go +++ b/check.go @@ -1,6 +1,7 @@ package main import ( + "context" "crypto/tls" "fmt" "io" @@ -8,95 +9,135 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "time" + + "golang.org/x/sync/singleflight" ) func (a *goBlog) checkAllExternalLinks() { - allPosts, err := a.db.getPosts(&postsRequestConfig{status: statusPublished, withoutParameters: true}) + // Get all published posts without parameters + posts, err := a.db.getPosts(&postsRequestConfig{status: statusPublished, withoutParameters: true}) if err != nil { log.Println(err.Error()) return } - wg := new(sync.WaitGroup) - linkChan := make(chan stringPair) + a.checkLinks(log.Writer(), posts...) +} + +func (a *goBlog) checkLinks(w io.Writer, posts ...*post) error { + // Get all links + allLinks, err := a.allLinks(posts...) + if err != nil { + return err + } + fmt.Fprintln(w, "Checking", len(allLinks), "links") + // Cancel context + var canceled, finished atomic.Value + canceled.Store(false) + finished.Store(false) + cancelContext, cancelFunc := context.WithCancel(context.Background()) + a.shutdown.Add(func() { + if finished.Load().(bool) { + return + } + canceled.Store(true) + cancelFunc() + fmt.Fprintln(w, "Canceled link check") + }) + // Create HTTP client client := &http.Client{ Timeout: 30 * time.Second, Transport: &http.Transport{ - DisableKeepAlives: true, TLSClientConfig: &tls.Config{ InsecureSkipVerify: true, }, + // Limits + DisableKeepAlives: true, + MaxConnsPerHost: 1, }, } - responses := map[string]int{} - rm := sync.RWMutex{} - processFunc := func() { - defer wg.Done() + // Process all links + var wg sync.WaitGroup + var sm sync.Map + var sg singleflight.Group + con := make(chan bool, 5) + for _, l := range allLinks { + con <- true // This waits until there's space in the buffered channel + // Check if check is canceled + if canceled.Load().(bool) { + break + } + // Increase wait group wg.Add(1) - for postLinkPair := range linkChan { - if strings.HasPrefix(postLinkPair.Second, a.cfg.Server.PublicAddress) { - continue + // Start link check + go func(link *stringPair) { + defer func() { + <-con // Read from channel, to let next routine execute + wg.Done() + }() + // Check if link is internal + if strings.HasPrefix(link.Second, a.cfg.Server.PublicAddress) { + return } - rm.RLock() - _, ok := responses[postLinkPair.Second] - rm.RUnlock() - if !ok { - req, err := http.NewRequest(http.MethodGet, postLinkPair.Second, nil) - if err != nil { - fmt.Println(err.Error()) - continue + // Process link + r, err, _ := sg.Do(link.Second, func() (interface{}, error) { + // Check if already cached + if mr, ok := sm.Load(link.Second); ok { + return mr, nil + } + // Do request + req, err := http.NewRequestWithContext(cancelContext, http.MethodGet, link.Second, nil) + if err != nil { + return nil, err } - // User-Agent from Tor req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; rv:60.0) Gecko/20100101 Firefox/60.0") req.Header.Set("Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8") req.Header.Set("Accept-Language", "en-US,en;q=0.5") resp, err := client.Do(req) if err != nil { - fmt.Println(postLinkPair.Second+" ("+postLinkPair.First+"):", err.Error()) - continue + return nil, err } - status := resp.StatusCode _, _ = io.Copy(io.Discard, resp.Body) - resp.Body.Close() - rm.Lock() - responses[postLinkPair.Second] = status - rm.Unlock() + _ = resp.Body.Close() + // Cache result + sm.Store(link.Second, resp.StatusCode) + // Return result + return resp.StatusCode, nil + }) + // Check error + if err != nil { + if !strings.Contains(err.Error(), "context canceled") { + fmt.Fprintln(w, "Error:", link.Second, err.Error()) + } + return } - rm.RLock() - if response, ok := responses[postLinkPair.Second]; ok && !checkSuccessStatus(response) { - fmt.Println(postLinkPair.Second+" ("+postLinkPair.First+"):", response) + // Check status code + if statusCode := r.(int); !successStatus(statusCode) { + fmt.Fprintln(w, link.Second, "in", link.First, statusCode, http.StatusText(statusCode)) } - rm.RUnlock() - } - } - for i := 0; i < 20; i++ { - go processFunc() - } - err = a.getExternalLinks(allPosts, linkChan) - if err != nil { - log.Println(err.Error()) - return + }(l) } + // Wait for all links to finish wg.Wait() -} - -func checkSuccessStatus(status int) bool { - return status >= 200 && status < 400 -} - -func (a *goBlog) getExternalLinks(posts []*post, linkChan chan<- stringPair) error { - wg := new(sync.WaitGroup) - for _, p := range posts { - wg.Add(1) - go func(p *post) { - defer wg.Done() - links, _ := allLinksFromHTMLString(string(a.absolutePostHTML(p)), a.fullPostURL(p)) - for _, link := range links { - linkChan <- stringPair{a.fullPostURL(p), link} - } - }(p) - } - wg.Wait() - close(linkChan) + // Finish + finished.Store(true) return nil } + +func (a *goBlog) allLinks(posts ...*post) (allLinks []*stringPair, err error) { + for _, p := range posts { + links, err := allLinksFromHTMLString(string(a.absolutePostHTML(p)), a.fullPostURL(p)) + if err != nil { + return nil, err + } + for _, link := range links { + allLinks = append(allLinks, &stringPair{a.fullPostURL(p), link}) + } + } + return allLinks, nil +} + +func successStatus(status int) bool { + return status >= 200 && status < 400 +}