dragon7088 2019-09-10 03:45
浏览 195

如何使用Jaeger追踪Kafka生产者和消费者

I want to tracing kafka producer and consumer by using jaeger and Go, but I can not get the chain of the span for the producer's tracer and consumer's tracer. Here is the producer's code:

func (p *Producer) WriteMessages(ctx context.Context, messages ...kafka.Message) error {
    topic := p.Stats().Topic
    for _, message := range messages {
        // span := opentracing.GlobalTracer().StartSpan("TO_"+topic, ext.SpanKindProducer)
        span, _ := opentracing.StartSpanFromContext(ctx, "TO_"+topic, ext.SpanKindProducer)
        ext.Component.Set(span, "golang-kafka")
        ext.PeerService.Set(span, "kafka")
        ext.MessageBusDestination.Set(span, topic)
        headers := make(map[string]string)
        opentracing.GlobalTracer().Inject(
            span.Context(),
            opentracing.TextMap,
            opentracing.TextMapCarrier(headers),
        )
        for key, value := range headers {
            header := kafka.Header{
                Key:   key,
                Value: []byte(value),
            }
            message.Headers = append(message.Headers, header)
        }
        span.Finish()
    }

    return p.Writer.WriteMessages(ctx, messages...)
}

and the consumer's code:

func (c *Consumer) ReadMessage(ctx context.Context) (kafka.Message, error) {
    message, err := c.Reader.ReadMessage(ctx)
    if err != nil {
        return kafka.Message{}, err
    }
    topic, partition, offset := message.Topic, message.Partition, message.Offset
    headers := make(map[string]string)
    for _, header := range message.Headers {
        headers[header.Key] = string(header.Value)
    }
    spanContext, _ := opentracing.GlobalTracer().Extract(
        opentracing.TextMap,
        opentracing.TextMapCarrier(headers),
    )
    span, _ := opentracing.StartSpanFromContext(ctx, "FROM_"+topic, opentracing.FollowsFrom(spanContext), ext.SpanKindConsumer)
    // span := opentracing.StartSpan("FROM_"+topic, opentracing.FollowsFrom(spanContext), ext.SpanKindConsumer)
    ext.Component.Set(span, "golang-kafka")
    ext.PeerService.Set(span, "kafka")
    span.SetTag("topic", topic)
    span.SetTag("partition", partition)
    span.SetTag("offset", offset)
    span.Finish()

    return message, err
}

I can get the producer chain and the consumer chain, but I can not chain them together.

  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 拟通过pc下指令到安卓系统,如果追求响应速度,尽可能无延迟,是不是用安卓模拟器会优于实体的安卓手机?如果是,可以快多少毫秒?
    • ¥20 神经网络Sequential name=sequential, built=False
    • ¥16 Qphython 用xlrd读取excel报错
    • ¥15 单片机学习顺序问题!!
    • ¥15 ikuai客户端多拨vpn,重启总是有个别重拨不上
    • ¥20 关于#anlogic#sdram#的问题,如何解决?(关键词-performance)
    • ¥15 相敏解调 matlab
    • ¥15 求lingo代码和思路
    • ¥15 公交车和无人机协同运输
    • ¥15 stm32代码移植没反应