douzong6649 2016-04-20 14:02
浏览 99

tail -f几个日志,然后使用sarama将每行换发到kafka

I'd like to tail -f several log files in /var/log (I thing one goroutine per log would be fine) and every goroutine will keep on "watching" forever on log and send every new line with kafka client sarama. Here is my code (does not work) :

package main

import (
    "flag"
    "github.com/Shopify/sarama"
    "log"
    "os"
    "fmt"
    "strings"
    "github.com/hpcloud/tail"
    "github.com/spf13/viper"
    //"io/ioutil"
    "reflect"
)



func produce(producer sarama.SyncProducer, cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail){
    logger.Printf("Entering produce")
    logger.Println(strings.Split(*brokers, ","))
    logger.Println(reflect.TypeOf(strings.Split(*brokers, ",")))
    logger.Println(log)
    /*t, err := tail.TailFile(log, tail.Config{Follow: true, ReOpen: true})
    if err != nil {
        fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
    }*/
    for line := range t.Lines {
        //logger.Println(line)
        //logger.Println(line.Text)
        msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(line.Text)}
        _, _, err := producer.SendMessage(msg)
        if err != nil {
            logger.Printf("FAILED to send message: %s
", err)
        }
    }

}


func main() {
    //Getting config file params
    viper.SetConfigName("config") 
    viper.AddConfigPath("/root/work/src/linux2kafka/")
    err := viper.ReadInConfig()
    if err != nil {
        panic(err)
    }
    viper.WatchConfig()
    logList := viper.Get("log_list")
    //logListString, err := ioutil.ReadFile(logList.(string))
    //fmt.Println(logList)
    //fmt.Println(reflect.TypeOf(logList))
    logsConfig := strings.Split(logList.(string),",")
    // print logs to watch
    /*for i := range logsConfig {
        fmt.Println(logsConfig[i])
    }*/
    brokerList := viper.Get("brokerList")
    brokers := flag.String("brokers", brokerList.(string), "Comma separated kafka brokers list") //must be set in config.toml
    topic := flag.String("topic", "test0", "Kafka topic to send messages to")
    flag.Parse()
    logger := log.New(os.Stdout, "producer ", log.Lmicroseconds)
    cfg := sarama.NewConfig()
    //Wait for replication
    cfg.Producer.RequiredAcks = -1
    cfg.Producer.Flush.Frequency = 333
    cfg.Producer.Flush.Messages = 1000
    cfg.Producer.Flush.MaxMessages = 3000
    producer, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), cfg)
    if err != nil {
        logger.Fatalln(err)
    }

    defer func() {
        if err := producer.Close(); err != nil {
            logger.Fatalln(err)
        }
    }()
    for i := range logsConfig {
        fmt.Println("go")
        t, err := tail.TailFile(logsConfig[i], tail.Config{Follow: true, ReOpen: true})
        if err != nil {
            fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
        }
        go produce(producer, cfg, brokers, *topic, logger, logsConfig[i], t)
        }
}

And here are my errors (nothing is received by the consumer):

root@home:~/work/src/linux2kafka# go run main.go
go
producer 15:54:44.297745 Entering produce
root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go
producer 15:55:01.951155 Entering produce
producer 15:55:01.951193 [localhost:9092 localhost:9092]
producer 15:55:01.951205 []string
producer 15:55:01.951214 /root/work/src/linux2kafka/test/log
panic: send on closed channel

goroutine 56 [running]:
panic(0x756440, 0xc820164290)
    /usr/local/go/src/runtime/panic.go:464 +0x3e6
github.com/Shopify/sarama.(*syncProducer).SendMessage(0xc8201742a0, 0xc820176300, 0x0, 0x0, 0x0, 0x0)
    /root/work/src/github.com/Shopify/sarama/sync_producer.go:66 +0x156
main.produce(0x7f65528661b8, 0xc8201742a0, 0xc82008ea20, 0xc82000b230, 0x8855c0, 0x5, 0xc8200789b0, 0xc820011320, 0x23, 0xc82017e000)
    /root/work/src/linux2kafka/main.go:31 +0x5d2
created by main.main
    /root/work/src/linux2kafka/main.go:85 +0x9f7
exit status 2

I cannot understand what I'm doing wrong. thx

Here is the modified code, is that ok ? :

package main

import (
    "flag"
    "github.com/Shopify/sarama"
    "log"
    "os"
    "fmt"
    "strings"
    "github.com/hpcloud/tail"
    "github.com/spf13/viper"
    //"io/ioutil"
    //"reflect"
)



//func produce(producer sarama.SyncProducer, cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail){
func produce(cfg *sarama.Config, brokers *string, topic string, logger *log.Logger, log string, t *tail.Tail){
    logger.Println("Entering produce")
    /*logger.Println(strings.Split(*brokers, ","))
    logger.Println(reflect.TypeOf(strings.Split(*brokers, ",")))
    logger.Println(log)*/
    logger.Printf("sarama.NewSyncProducer")
    producer, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), cfg)
    if err != nil {
        logger.Fatalln(err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            logger.Fatalln(err)
        }
    }()

    /*t, err := tail.TailFile(log, tail.Config{Follow: true, ReOpen: true})
    if err != nil {
        fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
    }*/
    for line := range t.Lines {
        //logger.Println(line)
        //logger.Println(line.Text)
        logger.Printf("ProduceMessage")
        msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(line.Text)}
        _, _, err := producer.SendMessage(msg)
        if err != nil {
            logger.Printf("FAILED to send message: %s
", err)
        }
    }

}


func main() {
    //Getting config file params
    viper.SetConfigName("config") 
    viper.AddConfigPath("/root/work/src/linux2kafka/")
    err := viper.ReadInConfig()
    if err != nil {
        panic(err)
    }
    viper.WatchConfig()
    logList := viper.Get("log_list")
    //logListString, err := ioutil.ReadFile(logList.(string))
    //fmt.Println(logList)
    //fmt.Println(reflect.TypeOf(logList))
    logsConfig := strings.Split(logList.(string),",")
    // print logs to watch
    /*for i := range logsConfig {
        fmt.Println(logsConfig[i])
    }*/
    brokerList := viper.Get("brokerList")
    brokers := flag.String("brokers", brokerList.(string), "Comma separated kafka brokers list") //must be set in config.toml
    topic := flag.String("topic", "test0", "Kafka topic to send messages to")
    flag.Parse()
    logger := log.New(os.Stdout, "producer ", log.Lmicroseconds)
    cfg := sarama.NewConfig()
    //Wait for replication
    cfg.Producer.RequiredAcks = -1
    cfg.Producer.Flush.Frequency = 333
    cfg.Producer.Flush.Messages = 1000
    cfg.Producer.Flush.MaxMessages = 3000
    for i := range logsConfig {
        fmt.Println("go")
        t, err := tail.TailFile(logsConfig[i], tail.Config{Follow: true, ReOpen: true})
        if err != nil {
            fmt.Println(fmt.Errorf("Error with tail: %v
", err.Error()))
        }
        go produce(cfg, brokers, *topic, logger, logsConfig[i], t)
        }
}

But still not working... it's not printing the first Println

root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go
root@home:~/work/src/linux2kafka# go run main.go
go
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 python变量和列表之间的相互影响
    • ¥15 写一个方法checkPerson,入参实体类Person,出参布尔值
    • ¥15 我想咨询一下路面纹理三维点云数据处理的一些问题,上传的坐标文件里是怎么对无序点进行编号的,以及xy坐标在处理的时候是进行整体模型分片处理的吗
    • ¥15 CSAPPattacklab
    • ¥15 一直显示正在等待HID—ISP
    • ¥15 Python turtle 画图
    • ¥15 关于大棚监测的pcb板设计
    • ¥15 stm32开发clion时遇到的编译问题
    • ¥15 lna设计 源简并电感型共源放大器
    • ¥15 如何用Labview在myRIO上做LCD显示?(语言-开发语言)