I'd like to `tail -f` several log files in /var/log (I think one goroutine per log file would be fine). Each goroutine should keep watching its log forever and send every new line to Kafka using the sarama client. Here is my code (it does not work):
package main
import (
"flag"
"github.com/Shopify/sarama"
"log"
"os"
"fmt"
"strings"
"github.com/hpcloud/tail"
"github.com/spf13/viper"
//"io/ioutil"
"reflect"
)
func produce(producer sarama.SyncProducer, cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail){
logger.Printf("Entering produce")
logger.Println(strings.Split(*brokers, ","))
logger.Println(reflect.TypeOf(strings.Split(*brokers, ",")))
logger.Println(log)
/*t, err := tail.TailFile(log, tail.Config{Follow: true, ReOpen: true})
if err != nil {
fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
}*/
for line := range t.Lines {
//logger.Println(line)
//logger.Println(line.Text)
msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(line.Text)}
_, _, err := producer.SendMessage(msg)
if err != nil {
logger.Printf("FAILED to send message: %s
", err)
}
}
}
func main() {
//Getting config file params
viper.SetConfigName("config")
viper.AddConfigPath("/root/work/src/linux2kafka/")
err := viper.ReadInConfig()
if err != nil {
panic(err)
}
viper.WatchConfig()
logList := viper.Get("log_list")
//logListString, err := ioutil.ReadFile(logList.(string))
//fmt.Println(logList)
//fmt.Println(reflect.TypeOf(logList))
logsConfig := strings.Split(logList.(string),",")
// print logs to watch
/*for i := range logsConfig {
fmt.Println(logsConfig[i])
}*/
brokerList := viper.Get("brokerList")
brokers := flag.String("brokers", brokerList.(string), "Comma separated kafka brokers list") //must be set in config.toml
topic := flag.String("topic", "test0", "Kafka topic to send messages to")
flag.Parse()
logger := log.New(os.Stdout, "producer ", log.Lmicroseconds)
cfg := sarama.NewConfig()
//Wait for replication
cfg.Producer.RequiredAcks = -1
cfg.Producer.Flush.Frequency = 333
cfg.Producer.Flush.Messages = 1000
cfg.Producer.Flush.MaxMessages = 3000
producer, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), cfg)
if err != nil {
logger.Fatalln(err)
}
defer func() {
if err := producer.Close(); err != nil {
logger.Fatalln(err)
}
}()
for i := range logsConfig {
fmt.Println("go")
t, err := tail.TailFile(logsConfig[i], tail.Config{Follow: true, ReOpen: true})
if err != nil {
fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
}
go produce(producer, cfg, brokers, *topic, logger, logsConfig[i], t)
}
}
And here are my errors (nothing is received by the consumer):
root@home:~/work/src/linux2kafka# go run main.go
go
producer 15:54:44.297745 Entering produce
root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go
producer 15:55:01.951155 Entering produce
producer 15:55:01.951193 [localhost:9092 localhost:9092]
producer 15:55:01.951205 []string
producer 15:55:01.951214 /root/work/src/linux2kafka/test/log
panic: send on closed channel
goroutine 56 [running]:
panic(0x756440, 0xc820164290)
/usr/local/go/src/runtime/panic.go:464 +0x3e6
github.com/Shopify/sarama.(*syncProducer).SendMessage(0xc8201742a0, 0xc820176300, 0x0, 0x0, 0x0, 0x0)
/root/work/src/github.com/Shopify/sarama/sync_producer.go:66 +0x156
main.produce(0x7f65528661b8, 0xc8201742a0, 0xc82008ea20, 0xc82000b230, 0x8855c0, 0x5, 0xc8200789b0, 0xc820011320, 0x23, 0xc82017e000)
/root/work/src/linux2kafka/main.go:31 +0x5d2
created by main.main
/root/work/src/linux2kafka/main.go:85 +0x9f7
exit status 2
I cannot understand what I'm doing wrong. Thanks.
Here is the modified code (each goroutine now creates its own producer). Is this OK?
package main
import (
"flag"
"github.com/Shopify/sarama"
"log"
"os"
"fmt"
"strings"
"github.com/hpcloud/tail"
"github.com/spf13/viper"
//"io/ioutil"
//"reflect"
)
//func produce(producer sarama.SyncProducer, cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail){
func produce(cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail){
logger.Println("Entering produce")
/*logger.Println(strings.Split(*brokers, ","))
logger.Println(reflect.TypeOf(strings.Split(*brokers, ",")))
logger.Println(log)*/
logger.Printf("sarama.NewSyncProducer")
producer, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), cfg)
if err != nil {
logger.Fatalln(err)
}
defer func() {
if err := producer.Close(); err != nil {
logger.Fatalln(err)
}
}()
/*t, err := tail.TailFile(log, tail.Config{Follow: true, ReOpen: true})
if err != nil {
fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
}*/
for line := range t.Lines {
//logger.Println(line)
//logger.Println(line.Text)
logger.Printf("ProduceMessage")
msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(line.Text)}
_, _, err := producer.SendMessage(msg)
if err != nil {
logger.Printf("FAILED to send message: %s
", err)
}
}
}
func main() {
//Getting config file params
viper.SetConfigName("config")
viper.AddConfigPath("/root/work/src/linux2kafka/")
err := viper.ReadInConfig()
if err != nil {
panic(err)
}
viper.WatchConfig()
logList := viper.Get("log_list")
//logListString, err := ioutil.ReadFile(logList.(string))
//fmt.Println(logList)
//fmt.Println(reflect.TypeOf(logList))
logsConfig := strings.Split(logList.(string),",")
// print logs to watch
/*for i := range logsConfig {
fmt.Println(logsConfig[i])
}*/
brokerList := viper.Get("brokerList")
brokers := flag.String("brokers", brokerList.(string), "Comma separated kafka brokers list") //must be set in config.toml
topic := flag.String("topic", "test0", "Kafka topic to send messages to")
flag.Parse()
logger := log.New(os.Stdout, "producer ", log.Lmicroseconds)
cfg := sarama.NewConfig()
//Wait for replication
cfg.Producer.RequiredAcks = -1
cfg.Producer.Flush.Frequency = 333
cfg.Producer.Flush.Messages = 1000
cfg.Producer.Flush.MaxMessages = 3000
for i := range logsConfig {
fmt.Println("go")
t, err := tail.TailFile(logsConfig[i], tail.Config{Follow: true, ReOpen: true})
if err != nil {
fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
}
go produce(cfg, brokers, *topic, logger, logsConfig[i], t)
}
}
But it is still not working: it does not even print the first Println in produce ("Entering produce").
root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go