This is a follow-up to my earlier post:
http://stackoverflow.com/questions/34736825/goroutine-exit-status-2-what-does-it-mean-why-is-it-happening?noredirect=1#comment57238789_34736825
After reading multiple topics and articles, both on and off SO, I'm still having trouble figuring out where the channels should be closed.
This program opens a list of files, creates an output file for each input file (with the same name), visits all the URLs listed in each input file, and extracts every href link from the responses, saving them to the corresponding output file. However, I'm getting an error (the full code is on the playground):
http://play.golang.org/p/8X-1rM3aXC
The linkGetter and getHref functions are mainly for processing. head and tail run as separate goroutines, and worker does the processing.
package main

import (
    "bufio"
    "bytes"
    "fmt"
    "golang.org/x/net/html"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "path/filepath"
    "regexp"
    "sync"
)
type Work struct {
    Link     string
    Filename string
}

type Output struct {
    Href     string
    Filename string
}
func getHref(t html.Token) (href string, ok bool) {
    // Iterate over all of the Token's attributes until we find an "href"
    for _, a := range t.Attr {
        if a.Key == "href" {
            href = a.Val
            ok = true
        }
    }
    return
}
func linkGetter(out chan<- Output, r io.Reader, filename string) {
    z := html.NewTokenizer(r)
    for {
        tt := z.Next()
        switch {
        case tt == html.ErrorToken:
            return
        case tt == html.StartTagToken:
            t := z.Token()
            isAnchor := t.Data == "a"
            if !isAnchor {
                continue
            }
            // Extract the href value, if there is one
            url, ok := getHref(t)
            if !ok {
                continue
            }
            out <- Output{url, filename}
        }
    }
}
func worker(out chan<- Output, in <-chan Work, wg *sync.WaitGroup) {
    defer wg.Done()
    for work := range in {
        resp, err := http.Get(work.Link)
        if err != nil {
            continue
        }
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            continue
        }
        if err = resp.Body.Close(); err != nil {
            fmt.Println(err)
        }
        linkGetter(out, bytes.NewReader(body), work.Filename)
    }
}
func head(c chan<- Work) {
    r, _ := regexp.Compile("(.*)(?:.json)")
    files, _ := filepath.Glob("*.json")
    for _, elem := range files {
        res := r.FindStringSubmatch(elem)
        for k, v := range res {
            if k == 0 {
                outpath, _ := filepath.Abs(fmt.Sprintf("go_tester/%s", v))
                abspath, _ := filepath.Abs(fmt.Sprintf("url_links/%s", v))
                f, _ := os.Open(abspath)
                scanner := bufio.NewScanner(f)
                for scanner.Scan() {
                    c <- Work{outpath, scanner.Text()}
                }
            }
        }
    }
}
func tail(c <-chan Output) {
    currentfile := ""
    var f *os.File
    var err error
    for out := range c {
        if out.Filename != currentfile {
            if err = f.Close(); err != nil { // might cause an error on first run
                fmt.Println(err)
            }
            f, err = os.OpenFile(out.Filename, os.O_APPEND|os.O_WRONLY, 0600)
            if err != nil {
                log.Fatal(err)
            }
            currentfile = out.Filename
        }
        if _, err = f.WriteString(out.Href + "\n"); err != nil {
            fmt.Println(err)
        }
    }
}
const (
    nworkers = 80
)

func main() {
    //fmt.Println("hi")
    in := make(chan Work)
    out := make(chan Output)
    go head(in)
    go tail(out)
    var wg sync.WaitGroup
    for i := 0; i < 85; i++ {
        wg.Add(1)
        go worker(out, in, &wg)
    }
    close(in)
    close(out)
    wg.Wait()
}

What is wrong with the way the channels are closed?
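Is the fix something along these lines? This is just a rough, untested sketch of main (keeping head, tail, and worker exactly as above); the done channel is my own guess at how main could wait for tail to finish, not something from the original code.

func main() {
    in := make(chan Work)
    out := make(chan Output)
    done := make(chan struct{}) // hypothetical: lets main wait for tail to finish

    // Close 'in' only after head has finished sending on it.
    go func() {
        head(in)
        close(in)
    }()

    // Signal 'done' only after tail has drained 'out' completely.
    go func() {
        tail(out)
        close(done)
    }()

    var wg sync.WaitGroup
    for i := 0; i < nworkers; i++ {
        wg.Add(1)
        go worker(out, in, &wg)
    }

    wg.Wait()  // all workers have returned, so nothing sends on 'out' any more
    close(out) // now it should be safe to close 'out', ending tail's range loop
    <-done     // wait for tail to finish writing before main exits
}

The idea (as I understand it) is that only the sender closes a channel, and only after it is done sending. Is that the right way to think about it here, or am I missing something?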